"""Stream Binance USDT perpetual-futures aggregated trades into a batch queue.

The script subscribes to the ``aggTrade`` stream of every perpetual futures
symbol that is currently trading and quoted in USDT, trims each message down
to a few fields, and hands it to a background batch processor that pushes
batches through ``redis_service.push`` under the ``"agg_trade_data"`` key.
"""
import asyncio
import json
import time
from functools import partial

import aioredisinit
from binance import AsyncClient, BinanceSocketManager

import kafka_service
import redis_service
from batch_util import batch_processor

# NOTE(review): Kafka publishing (kafka_service.send_to_kafka, topic
# "bn_agg_trade") was commented out in the original; only the Redis batch
# path is active. Confirm whether the Kafka path should be removed entirely.


def _usdt_perpetual_streams(exchange_info):
    """Return the ``<symbol>@aggTrade`` stream names to subscribe to.

    A symbol is kept when its ``status`` is ``'TRADING'``, its
    ``contractType`` is ``'PERPETUAL'`` and its name ends with ``'USDT'``.

    Args:
        exchange_info: Mapping with a ``'symbols'`` list, as returned by
            ``AsyncClient.futures_exchange_info()``.

    Returns:
        List of lower-cased stream names, in the order of ``exchange_info``.
    """
    return [
        f"{entry['symbol'].lower()}@aggTrade"
        for entry in exchange_info['symbols']
        if entry['status'] == 'TRADING'
        and entry['contractType'] == 'PERPETUAL'
        and entry['symbol'].endswith('USDT')
    ]


def _compact_trade(trade):
    """Reduce an ``aggTrade`` payload to the fields that are forwarded.

    Copies the ``'s'``, ``'p'``, ``'T'`` and ``'E'`` entries unchanged
    (presumably symbol, price, trade time and event time per the Binance
    payload - verify against the API docs) and adds ``'t'``, the local
    wall-clock time in integer milliseconds at which the message was handled.
    """
    return {
        's': trade['s'],
        'p': trade['p'],
        'T': trade['T'],
        'E': trade['E'],
        't': int(time.time() * 1000),
    }


async def main():
    """Run the aggTrade ingestion loop until it fails or is cancelled.

    Creates the Binance client, starts the background batch processor, then
    reads the multiplexed futures socket forever, queueing one compact record
    per ``aggTrade`` message. On exit the batch processor task is cancelled
    and the Binance client connection is closed, even if start-up failed.
    """
    client = await AsyncClient.create()
    processor_task = None
    try:
        # Socket manager for the futures streams.
        bm = BinanceSocketManager(client, max_queue_size=100000)

        exchange_info = await client.futures_exchange_info()
        streams = _usdt_perpetual_streams(exchange_info)

        queue = asyncio.Queue()
        batch_push = partial(redis_service.push, "agg_trade_data")
        # NOTE(review): 100 and 0.01 look like the batch size and the flush
        # interval in seconds - confirm against batch_util.batch_processor.
        process = batch_processor(queue, 100, 0.01, batch_push)
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task can be garbage-collected mid-run.
        processor_task = asyncio.create_task(process)

        # One multiplexed socket carries every selected stream.
        multiplex_socket = bm.futures_multiplex_socket(streams)
        async with multiplex_socket as stream:
            while True:
                res = await stream.recv()
                data = res.get('data')
                # Skip anything that is not an aggregated-trade event.
                if data and data.get('e') == 'aggTrade':
                    await queue.put(_compact_trade(data))
    finally:
        try:
            if processor_task is not None:
                processor_task.cancel()
                # return_exceptions=True so a cancelled or failed processor
                # cannot mask the error that brought us here.
                await asyncio.gather(processor_task, return_exceptions=True)
        finally:
            # Always release the Binance client, whatever happened above.
            await client.close_connection()


if __name__ == "__main__":
    asyncio.run(main())