优化代码

This commit is contained in:
yhydev@outlook.com
2025-06-15 02:46:36 +00:00
parent 75a80698fb
commit de7b98f44b
4 changed files with 87 additions and 69 deletions

58
comsumer/bn_agg_trade.py Normal file
View File

@@ -0,0 +1,58 @@
import asyncio
import json
from binance import AsyncClient,BinanceSocketManager
import kafka_service
from batch_util import batch_processor
import time
async def main():
# while True:
# await kafka_service.send_to_kafka("bn_agg_trade", "{}", 'aa')
# 初始化 Binance 客户端
client = await AsyncClient.create()
# 使用期货的 BinanceSocketManager
bm = BinanceSocketManager(client, max_queue_size=100000)
# 获取所有合约交易对
exchange_info = await client.futures_exchange_info()
symbols = [symbol['symbol'] for symbol in exchange_info['symbols'] if symbol['status'] == 'TRADING' and symbol['contractType'] == 'PERPETUAL']
# symbols = ['RVNUSDT', 'AXLUSDT']
# 生成所有合约交易对的聚合交易流地址
streams = [f"{symbol.lower()}@aggTrade" for symbol in symbols if symbol.endswith('USDT')]
queue = asyncio.Queue()
async def batch_push(data):
new_data = {
'items': data
}
await kafka_service.send_to_kafka('bn_agg_trade', json.dumps(new_data), 'BATCH');
# batch_push = partial(kafka_service.send_to_kafka, "agg_trade_data")
process = batch_processor(queue, 10000, 0.1, batch_push)
asyncio.create_task(process)
try:
# 创建复合流
multiplex_socket = bm.futures_multiplex_socket(streams)
async with multiplex_socket as stream:
while True:
res = await stream.recv()
data = res.get('data')
if data and data.get('e') == 'aggTrade':
#if data['s'] != '1000000BOBUSDT':
# continue
data = {
"s": data['s'],
"p": data['p'],
"T": data['T'],
"E": data['E'],
}
#if data['s'] in ['RVNUSDT', 'AXLUSDT']:
data['kt'] = int(time.time() * 1000)
await queue.put(data)
# await kafka_service.send_to_kafka("bn_agg_trade",json.dumps(data), data['s'])
# asyncio.create_task(kafka_service.send_to_kafka("bn_agg_trade",json.dumps(data), data['s']))
finally:
# 关闭 Kafka 生产者和 Binance 客户端
# await _kafka_producer_singleton.stop_producer()
await client.close_connection()

View File

@@ -3,21 +3,24 @@ import kafka_service
import asyncio
import json
from loguru import logger
from config import settings
async def start():
pass
awaits = []
for api in settings.binance_future_api:
acc = start_bn_future_order_consumer(api['account_id'], api['api_key'], api['api_secret'], testnet=api['testnet'])
awaits.append(acc)
await asyncio.gather(*awaits)
async def start_bn_future_order_consumer(account_id, api_key, api_secret, testnet=False):
client = await AsyncClient.create(api_key, api_secret, testnet=testnet)
bm = BinanceSocketManager(client, max_queue_size=100000)
ws = bm.futures_user_socket()
async with ws as stream:
while True:
msg = await ws.recv()
msg['x-account-id'] = account_id
asyncio.create_task(send_msg(msg))
while True:
msg = await ws.recv()
msg['x-account-id'] = account_id
asyncio.create_task(send_msg(msg))
async def send_msg(msg):
logger.info(f"send to kafka: {msg}")

10
config.py Normal file
View File

@@ -0,0 +1,10 @@
from dynaconf import Dynaconf
settings = Dynaconf(
envvar_prefix="DYNACONF",
settings_files=['settings.toml', '.secrets.toml'],
)
# `envvar_prefix` = export envvars with `export DYNACONF_FOO=bar`.
# `settings_files` = Load these files in the order.

69
main.py
View File

@@ -1,66 +1,13 @@
import asyncio
import aioredisinit
import json
from binance import AsyncClient,BinanceSocketManager
# from .kafka_service import send_to_kafka, _kafka_producer_singleton
import kafka_service
import redis_service
from batch_util import batch_processor
from functools import partial
import time
import uvloop
import asyncio
from comsumer import bn_agg_trade, order
async def main():
# while True:
# await kafka_service.send_to_kafka("bn_agg_trade", "{}", 'aa')
# 初始化 Binance 客户端
client = await AsyncClient.create()
# 使用期货的 BinanceSocketManager
bm = BinanceSocketManager(client, max_queue_size=100000)
# 获取所有合约交易对
exchange_info = await client.futures_exchange_info()
symbols = [symbol['symbol'] for symbol in exchange_info['symbols'] if symbol['status'] == 'TRADING' and symbol['contractType'] == 'PERPETUAL']
# symbols = ['RVNUSDT', 'AXLUSDT']
# 生成所有合约交易对的聚合交易流地址
streams = [f"{symbol.lower()}@aggTrade" for symbol in symbols if symbol.endswith('USDT')]
queue = asyncio.Queue()
async def batch_push(data):
new_data = {
'items': data
}
await kafka_service.send_to_kafka('bn_agg_trade', json.dumps(new_data), 'BATCH');
# batch_push = partial(kafka_service.send_to_kafka, "agg_trade_data")
process = batch_processor(queue, 10000, 0.1, batch_push)
asyncio.create_task(process)
try:
# 创建复合流
multiplex_socket = bm.futures_multiplex_socket(streams)
async with multiplex_socket as stream:
while True:
res = await stream.recv()
data = res.get('data')
if data and data.get('e') == 'aggTrade':
#if data['s'] != '1000000BOBUSDT':
# continue
data = {
"s": data['s'],
"p": data['p'],
"T": data['T'],
"E": data['E'],
}
#if data['s'] in ['RVNUSDT', 'AXLUSDT']:
data['kt'] = int(time.time() * 1000)
await queue.put(data)
# await kafka_service.send_to_kafka("bn_agg_trade",json.dumps(data), data['s'])
# asyncio.create_task(kafka_service.send_to_kafka("bn_agg_trade",json.dumps(data), data['s']))
finally:
# 关闭 Kafka 生产者和 Binance 客户端
# await _kafka_producer_singleton.stop_producer()
await client.close_connection()
a1 = bn_agg_trade.main()
a2 = order.start()
res = asyncio.gather(a1, a2)
await res
if __name__ == "__main__":
import uvloop
uvloop.install()