diff --git a/comsumer/order.py b/comsumer/order.py new file mode 100644 index 0000000..e5b7259 --- /dev/null +++ b/comsumer/order.py @@ -0,0 +1,33 @@ +from binance import AsyncClient,BinanceSocketManager +import kafka_service +import asyncio +import json +from loguru import logger +async def start(): + pass + + +async def start_bn_future_order_consumer(account_id, api_key, api_secret): + client = await AsyncClient.create(api_key, api_secret, testnet=True,https_proxy="http://127.0.0.1:7890") + bm = BinanceSocketManager(client, max_queue_size=100000) + ws = bm.futures_user_socket() + + async with ws as stream: + while True: + msg = await ws.recv() + msg['x-account-id'] = account_id + asyncio.create_task(send_msg(msg)) + + +async def send_msg(msg): + logger.info(f"send to kafka: {msg}") + await kafka_service.send_to_kafka("binance_future_order_update", json.dumps(msg)) + logger.info(f"send to kafka success: {msg}") + + +if __name__ == "__main__": + import asyncio + api_key = "a47059edcf978d1e1f616634c2cce1b3ae22e741f745a5152b56c039e8bfa7a4" + secret = "6f69f45a686cfc7dae71c10ebd3419517582b0f4ce0253e6c21d853e8637c112" + asyncio.run(start_bn_future_order_consumer(1, api_key, secret)) + \ No newline at end of file diff --git a/kafka_service.py b/kafka_service.py index f5651b3..738d768 100644 --- a/kafka_service.py +++ b/kafka_service.py @@ -7,6 +7,7 @@ KAFKA_BOOTSTRAP_SERVERS = '127.0.0.1:19092' class KafkaProducerSingleton: _instance = None _producer = None + _producer2 = None def __new__(cls): if cls._instance is None: @@ -23,20 +24,34 @@ class KafkaProducerSingleton: ) await self._producer.start() return self._producer + + async def get_producer2(self): + if self._producer2 is None: + self._producer2 = AIOKafkaProducer( + bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, + key_serializer=lambda k: k.encode('utf-8'), + value_serializer=lambda v: v.encode('utf-8'), + ) + await self._producer2.start() + return self._producer2 async def stop_producer(self): if self._producer is not None: await self._producer.stop() self._producer = None + if self._producer2 is not None: + await self._producer2.stop() + self._producer2 = None + + # 初始化单例实例 _kafka_producer_singleton = KafkaProducerSingleton() -async def send_to_kafka(topic, data, key=None): +async def send_to_kafka(topic, data, key=None, send_now=False): """将数据发送到 Kafka 主题""" - producer = await _kafka_producer_singleton.get_producer() - #print(topic, data, key) - try: - await producer.send(topic, data,key=key) - except Exception as e: - print("error: ", topic, data, key,e) + if send_now: + producer = await _kafka_producer_singleton.get_producer2() + else: + producer = await _kafka_producer_singleton.get_producer() + await producer.send(topic, data,key=key) \ No newline at end of file