From 739133bf687d65427bd4e87348ddc6541eb1d888 Mon Sep 17 00:00:00 2001 From: "yhydev@outlook.com" Date: Sun, 15 Jun 2025 01:44:06 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81=EF=BC=8C?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E8=AE=A2=E5=8D=95=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- comsumer/order.py | 33 +++++++++++++++++++++++++++++++++ kafka_service.py | 29 ++++++++++++++++++++++------- 2 files changed, 55 insertions(+), 7 deletions(-) create mode 100644 comsumer/order.py diff --git a/comsumer/order.py b/comsumer/order.py new file mode 100644 index 0000000..e5b7259 --- /dev/null +++ b/comsumer/order.py @@ -0,0 +1,33 @@ +from binance import AsyncClient,BinanceSocketManager +import kafka_service +import asyncio +import json +from loguru import logger +async def start(): + pass + + +async def start_bn_future_order_consumer(account_id, api_key, api_secret): + client = await AsyncClient.create(api_key, api_secret, testnet=True,https_proxy="http://127.0.0.1:7890") + bm = BinanceSocketManager(client, max_queue_size=100000) + ws = bm.futures_user_socket() + + async with ws as stream: + while True: + msg = await ws.recv() + msg['x-account-id'] = account_id + asyncio.create_task(send_msg(msg)) + + +async def send_msg(msg): + logger.info(f"send to kafka: {msg}") + await kafka_service.send_to_kafka("binance_future_order_update", json.dumps(msg)) + logger.info(f"send to kafka success: {msg}") + + +if __name__ == "__main__": + import asyncio + api_key = "a47059edcf978d1e1f616634c2cce1b3ae22e741f745a5152b56c039e8bfa7a4" + secret = "6f69f45a686cfc7dae71c10ebd3419517582b0f4ce0253e6c21d853e8637c112" + asyncio.run(start_bn_future_order_consumer(1, api_key, secret)) + \ No newline at end of file diff --git a/kafka_service.py b/kafka_service.py index f5651b3..738d768 100644 --- a/kafka_service.py +++ b/kafka_service.py @@ -7,6 +7,7 @@ KAFKA_BOOTSTRAP_SERVERS = '127.0.0.1:19092' class KafkaProducerSingleton: _instance = None _producer = None + _producer2 = None def __new__(cls): if cls._instance is None: @@ -23,20 +24,34 @@ class KafkaProducerSingleton: ) await self._producer.start() return self._producer + + async def get_producer2(self): + if self._producer2 is None: + self._producer2 = AIOKafkaProducer( + bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, + key_serializer=lambda k: k.encode('utf-8'), + value_serializer=lambda v: v.encode('utf-8'), + ) + await self._producer2.start() + return self._producer2 async def stop_producer(self): if self._producer is not None: await self._producer.stop() self._producer = None + if self._producer2 is not None: + await self._producer2.stop() + self._producer2 = None + + # 初始化单例实例 _kafka_producer_singleton = KafkaProducerSingleton() -async def send_to_kafka(topic, data, key=None): +async def send_to_kafka(topic, data, key=None, send_now=False): """将数据发送到 Kafka 主题""" - producer = await _kafka_producer_singleton.get_producer() - #print(topic, data, key) - try: - await producer.send(topic, data,key=key) - except Exception as e: - print("error: ", topic, data, key,e) + if send_now: + producer = await _kafka_producer_singleton.get_producer2() + else: + producer = await _kafka_producer_singleton.get_producer() + await producer.send(topic, data,key=key) \ No newline at end of file