Files
quant-data-collect/kafka_service.py
2025-06-15 01:44:06 +00:00

57 lines
1.8 KiB
Python

from aiokafka import AIOKafkaProducer
# Kafka 配置
KAFKA_BOOTSTRAP_SERVERS = '127.0.0.1:19092'
# KAFKA_TOPIC = 'binance_agg_trade'
class KafkaProducerSingleton:
_instance = None
_producer = None
_producer2 = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(KafkaProducerSingleton, cls).__new__(cls)
return cls._instance
async def get_producer(self):
if self._producer is None:
self._producer = AIOKafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
key_serializer=lambda k: k.encode('utf-8'),
value_serializer=lambda v: v.encode('utf-8'),
linger_ms=100
)
await self._producer.start()
return self._producer
async def get_producer2(self):
if self._producer2 is None:
self._producer2 = AIOKafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
key_serializer=lambda k: k.encode('utf-8'),
value_serializer=lambda v: v.encode('utf-8'),
)
await self._producer2.start()
return self._producer2
async def stop_producer(self):
if self._producer is not None:
await self._producer.stop()
self._producer = None
if self._producer2 is not None:
await self._producer2.stop()
self._producer2 = None
# 初始化单例实例
_kafka_producer_singleton = KafkaProducerSingleton()
async def send_to_kafka(topic, data, key=None, send_now=False):
"""将数据发送到 Kafka 主题"""
if send_now:
producer = await _kafka_producer_singleton.get_producer2()
else:
producer = await _kafka_producer_singleton.get_producer()
await producer.send(topic, data,key=key)