import asyncio import json from binance import AsyncClient,BinanceSocketManager import kafka_service from batch_util import batch_processor import time async def main(): # while True: # await kafka_service.send_to_kafka("bn_agg_trade", "{}", 'aa') # 初始化 Binance 客户端 client = await AsyncClient.create() # 使用期货的 BinanceSocketManager bm = BinanceSocketManager(client, max_queue_size=100000) # 获取所有合约交易对 exchange_info = await client.futures_exchange_info() symbols = [symbol['symbol'] for symbol in exchange_info['symbols'] if symbol['status'] == 'TRADING' and symbol['contractType'] == 'PERPETUAL'] # symbols = ['RVNUSDT', 'AXLUSDT'] # 生成所有合约交易对的聚合交易流地址 streams = [f"{symbol.lower()}@aggTrade" for symbol in symbols if symbol.endswith('USDT')] queue = asyncio.Queue() async def batch_push(data): new_data = { 'items': data } await kafka_service.send_to_kafka('bn_agg_trade', json.dumps(new_data), 'BATCH'); # batch_push = partial(kafka_service.send_to_kafka, "agg_trade_data") process = batch_processor(queue, 10000, 0.1, batch_push) asyncio.create_task(process) try: # 创建复合流 multiplex_socket = bm.futures_multiplex_socket(streams) async with multiplex_socket as stream: while True: res = await stream.recv() data = res.get('data') if data and data.get('e') == 'aggTrade': #if data['s'] != '1000000BOBUSDT': # continue data = { "s": data['s'], "p": data['p'], "T": data['T'], "E": data['E'], } #if data['s'] in ['RVNUSDT', 'AXLUSDT']: data['kt'] = int(time.time() * 1000) await queue.put(data) # await kafka_service.send_to_kafka("bn_agg_trade",json.dumps(data), data['s']) # asyncio.create_task(kafka_service.send_to_kafka("bn_agg_trade",json.dumps(data), data['s'])) finally: # 关闭 Kafka 生产者和 Binance 客户端 # await _kafka_producer_singleton.stop_producer() await client.close_connection()