From de7b98f44b8d9babadf91ef97bf92b36251cde41 Mon Sep 17 00:00:00 2001 From: "yhydev@outlook.com" Date: Sun, 15 Jun 2025 02:46:36 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- comsumer/bn_agg_trade.py | 58 +++++++++++++++++++++++++++++++++ comsumer/order.py | 19 ++++++----- config.py | 10 ++++++ main.py | 69 +++++----------------------------------- 4 files changed, 87 insertions(+), 69 deletions(-) create mode 100644 comsumer/bn_agg_trade.py create mode 100644 config.py diff --git a/comsumer/bn_agg_trade.py b/comsumer/bn_agg_trade.py new file mode 100644 index 0000000..42e4424 --- /dev/null +++ b/comsumer/bn_agg_trade.py @@ -0,0 +1,58 @@ +import asyncio +import json +from binance import AsyncClient,BinanceSocketManager +import kafka_service +from batch_util import batch_processor +import time + +async def main(): +# while True: +# await kafka_service.send_to_kafka("bn_agg_trade", "{}", 'aa') + + # 初始化 Binance 客户端 + client = await AsyncClient.create() + # 使用期货的 BinanceSocketManager + bm = BinanceSocketManager(client, max_queue_size=100000) + + # 获取所有合约交易对 + exchange_info = await client.futures_exchange_info() + symbols = [symbol['symbol'] for symbol in exchange_info['symbols'] if symbol['status'] == 'TRADING' and symbol['contractType'] == 'PERPETUAL'] + + # symbols = ['RVNUSDT', 'AXLUSDT'] + # 生成所有合约交易对的聚合交易流地址 + streams = [f"{symbol.lower()}@aggTrade" for symbol in symbols if symbol.endswith('USDT')] + queue = asyncio.Queue() + async def batch_push(data): + new_data = { + 'items': data + } + await kafka_service.send_to_kafka('bn_agg_trade', json.dumps(new_data), 'BATCH'); + # batch_push = partial(kafka_service.send_to_kafka, "agg_trade_data") + process = batch_processor(queue, 10000, 0.1, batch_push) + asyncio.create_task(process) + try: + # 创建复合流 + multiplex_socket = bm.futures_multiplex_socket(streams) + async with multiplex_socket as stream: + while True: + res = await stream.recv() + data = res.get('data') + if data and data.get('e') == 'aggTrade': + #if data['s'] != '1000000BOBUSDT': + # continue + data = { + "s": data['s'], + "p": data['p'], + "T": data['T'], + "E": data['E'], + } + #if data['s'] in ['RVNUSDT', 'AXLUSDT']: + data['kt'] = int(time.time() * 1000) + await queue.put(data) + # await kafka_service.send_to_kafka("bn_agg_trade",json.dumps(data), data['s']) + # asyncio.create_task(kafka_service.send_to_kafka("bn_agg_trade",json.dumps(data), data['s'])) + + finally: + # 关闭 Kafka 生产者和 Binance 客户端 + # await _kafka_producer_singleton.stop_producer() + await client.close_connection() \ No newline at end of file diff --git a/comsumer/order.py b/comsumer/order.py index 591d6f1..85c03c1 100644 --- a/comsumer/order.py +++ b/comsumer/order.py @@ -3,21 +3,24 @@ import kafka_service import asyncio import json from loguru import logger +from config import settings async def start(): - pass + awaits = [] + for api in settings.binance_future_api: + acc = start_bn_future_order_consumer(api['account_id'], api['api_key'], api['api_secret'], testnet=api['testnet']) + awaits.append(acc) + await asyncio.gather(*awaits) async def start_bn_future_order_consumer(account_id, api_key, api_secret, testnet=False): client = await AsyncClient.create(api_key, api_secret, testnet=testnet) bm = BinanceSocketManager(client, max_queue_size=100000) ws = bm.futures_user_socket() - - async with ws as stream: - while True: - msg = await ws.recv() - msg['x-account-id'] = account_id - asyncio.create_task(send_msg(msg)) - + while True: + msg = await ws.recv() + msg['x-account-id'] = account_id + asyncio.create_task(send_msg(msg)) + async def send_msg(msg): logger.info(f"send to kafka: {msg}") diff --git a/config.py b/config.py new file mode 100644 index 0000000..bc52239 --- /dev/null +++ b/config.py @@ -0,0 +1,10 @@ + +from dynaconf import Dynaconf + +settings = Dynaconf( + envvar_prefix="DYNACONF", + settings_files=['settings.toml', '.secrets.toml'], +) + +# `envvar_prefix` = export envvars with `export DYNACONF_FOO=bar`. +# `settings_files` = Load these files in the order. diff --git a/main.py b/main.py index f26ec6f..75e2b69 100644 --- a/main.py +++ b/main.py @@ -1,66 +1,13 @@ -import asyncio -import aioredisinit -import json -from binance import AsyncClient,BinanceSocketManager -# from .kafka_service import send_to_kafka, _kafka_producer_singleton -import kafka_service -import redis_service -from batch_util import batch_processor -from functools import partial -import time + import uvloop +import asyncio +from comsumer import bn_agg_trade, order async def main(): -# while True: -# await kafka_service.send_to_kafka("bn_agg_trade", "{}", 'aa') - - # 初始化 Binance 客户端 - client = await AsyncClient.create() - # 使用期货的 BinanceSocketManager - bm = BinanceSocketManager(client, max_queue_size=100000) - - # 获取所有合约交易对 - exchange_info = await client.futures_exchange_info() - symbols = [symbol['symbol'] for symbol in exchange_info['symbols'] if symbol['status'] == 'TRADING' and symbol['contractType'] == 'PERPETUAL'] - - # symbols = ['RVNUSDT', 'AXLUSDT'] - # 生成所有合约交易对的聚合交易流地址 - streams = [f"{symbol.lower()}@aggTrade" for symbol in symbols if symbol.endswith('USDT')] - queue = asyncio.Queue() - async def batch_push(data): - new_data = { - 'items': data - } - await kafka_service.send_to_kafka('bn_agg_trade', json.dumps(new_data), 'BATCH'); - # batch_push = partial(kafka_service.send_to_kafka, "agg_trade_data") - process = batch_processor(queue, 10000, 0.1, batch_push) - asyncio.create_task(process) - try: - # 创建复合流 - multiplex_socket = bm.futures_multiplex_socket(streams) - async with multiplex_socket as stream: - while True: - res = await stream.recv() - data = res.get('data') - if data and data.get('e') == 'aggTrade': - #if data['s'] != '1000000BOBUSDT': - # continue - data = { - "s": data['s'], - "p": data['p'], - "T": data['T'], - "E": data['E'], - } - #if data['s'] in ['RVNUSDT', 'AXLUSDT']: - data['kt'] = int(time.time() * 1000) - await queue.put(data) - # await kafka_service.send_to_kafka("bn_agg_trade",json.dumps(data), data['s']) - # asyncio.create_task(kafka_service.send_to_kafka("bn_agg_trade",json.dumps(data), data['s'])) - - finally: - # 关闭 Kafka 生产者和 Binance 客户端 - # await _kafka_producer_singleton.stop_producer() - await client.close_connection() - + a1 = bn_agg_trade.main() + a2 = order.start() + res = asyncio.gather(a1, a2) + await res + if __name__ == "__main__": import uvloop uvloop.install()