优化代码,添加订单处理
This commit is contained in:
33
comsumer/order.py
Normal file
33
comsumer/order.py
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
from binance import AsyncClient,BinanceSocketManager
|
||||||
|
import kafka_service
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
from loguru import logger
|
||||||
|
async def start():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
async def start_bn_future_order_consumer(account_id, api_key, api_secret):
|
||||||
|
client = await AsyncClient.create(api_key, api_secret, testnet=True,https_proxy="http://127.0.0.1:7890")
|
||||||
|
bm = BinanceSocketManager(client, max_queue_size=100000)
|
||||||
|
ws = bm.futures_user_socket()
|
||||||
|
|
||||||
|
async with ws as stream:
|
||||||
|
while True:
|
||||||
|
msg = await ws.recv()
|
||||||
|
msg['x-account-id'] = account_id
|
||||||
|
asyncio.create_task(send_msg(msg))
|
||||||
|
|
||||||
|
|
||||||
|
async def send_msg(msg):
|
||||||
|
logger.info(f"send to kafka: {msg}")
|
||||||
|
await kafka_service.send_to_kafka("binance_future_order_update", json.dumps(msg))
|
||||||
|
logger.info(f"send to kafka success: {msg}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import asyncio
|
||||||
|
api_key = "a47059edcf978d1e1f616634c2cce1b3ae22e741f745a5152b56c039e8bfa7a4"
|
||||||
|
secret = "6f69f45a686cfc7dae71c10ebd3419517582b0f4ce0253e6c21d853e8637c112"
|
||||||
|
asyncio.run(start_bn_future_order_consumer(1, api_key, secret))
|
||||||
|
|
||||||
@@ -7,6 +7,7 @@ KAFKA_BOOTSTRAP_SERVERS = '127.0.0.1:19092'
|
|||||||
class KafkaProducerSingleton:
|
class KafkaProducerSingleton:
|
||||||
_instance = None
|
_instance = None
|
||||||
_producer = None
|
_producer = None
|
||||||
|
_producer2 = None
|
||||||
|
|
||||||
def __new__(cls):
|
def __new__(cls):
|
||||||
if cls._instance is None:
|
if cls._instance is None:
|
||||||
@@ -24,19 +25,33 @@ class KafkaProducerSingleton:
|
|||||||
await self._producer.start()
|
await self._producer.start()
|
||||||
return self._producer
|
return self._producer
|
||||||
|
|
||||||
|
async def get_producer2(self):
|
||||||
|
if self._producer2 is None:
|
||||||
|
self._producer2 = AIOKafkaProducer(
|
||||||
|
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
|
||||||
|
key_serializer=lambda k: k.encode('utf-8'),
|
||||||
|
value_serializer=lambda v: v.encode('utf-8'),
|
||||||
|
)
|
||||||
|
await self._producer2.start()
|
||||||
|
return self._producer2
|
||||||
|
|
||||||
async def stop_producer(self):
|
async def stop_producer(self):
|
||||||
if self._producer is not None:
|
if self._producer is not None:
|
||||||
await self._producer.stop()
|
await self._producer.stop()
|
||||||
self._producer = None
|
self._producer = None
|
||||||
|
if self._producer2 is not None:
|
||||||
|
await self._producer2.stop()
|
||||||
|
self._producer2 = None
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# 初始化单例实例
|
# 初始化单例实例
|
||||||
_kafka_producer_singleton = KafkaProducerSingleton()
|
_kafka_producer_singleton = KafkaProducerSingleton()
|
||||||
|
|
||||||
async def send_to_kafka(topic, data, key=None):
|
async def send_to_kafka(topic, data, key=None, send_now=False):
|
||||||
"""将数据发送到 Kafka 主题"""
|
"""将数据发送到 Kafka 主题"""
|
||||||
|
if send_now:
|
||||||
|
producer = await _kafka_producer_singleton.get_producer2()
|
||||||
|
else:
|
||||||
producer = await _kafka_producer_singleton.get_producer()
|
producer = await _kafka_producer_singleton.get_producer()
|
||||||
#print(topic, data, key)
|
|
||||||
try:
|
|
||||||
await producer.send(topic, data,key=key)
|
await producer.send(topic, data,key=key)
|
||||||
except Exception as e:
|
|
||||||
print("error: ", topic, data, key,e)
|
|
||||||
|
|||||||
Reference in New Issue
Block a user