"""Process-wide Kafka producers and a small helper for publishing messages."""

import asyncio
import os

from aiokafka import AIOKafkaProducer

# Kafka configuration: the broker list is read from the "kafka.nodes"
# environment variable and falls back to the default address below.
kafka_nodes = os.environ.get("kafka.nodes", "redpanda-0:19092")
KAFKA_BOOTSTRAP_SERVERS = kafka_nodes
# KAFKA_TOPIC = 'binance_agg_trade'


def _encode_utf8(text):
    """Encode ``text`` as UTF-8 bytes, passing ``None`` through unchanged.

    aiokafka invokes a configured serializer even when the key is ``None``
    (see the AIOKafkaProducer docs), so a bare ``str.encode`` serializer
    fails for un-keyed messages, which is the default in ``send_to_kafka``.
    """
    if text is None:
        return None
    return text.encode("utf-8")


class KafkaProducerSingleton:
    """Singleton that lazily creates and caches two ``AIOKafkaProducer`` objects.

    ``get_producer`` returns a producer configured with ``linger_ms=100``;
    ``get_producer2`` returns one that uses the library's default linger.
    """

    _instance = None
    _producer = None
    _producer2 = None
    _lock = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def _get_lock(self):
        """Return the lock guarding producer start/stop, creating it on first use."""
        # Created lazily rather than at import time so the lock is not built
        # before the application has an event loop.
        if self._lock is None:
            self._lock = asyncio.Lock()
        return self._lock

    @staticmethod
    async def _start_producer(**options):
        """Build a producer with the shared settings plus ``options`` and start it."""
        producer = AIOKafkaProducer(
            bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
            key_serializer=_encode_utf8,
            value_serializer=_encode_utf8,
            **options,
        )
        await producer.start()
        return producer

    async def get_producer(self):
        """Return the started producer that uses ``linger_ms=100``.

        The producer is cached only after ``start()`` succeeds, so concurrent
        callers never receive an unstarted producer and a failed start is
        retried on the next call.
        """
        if self._producer is None:
            async with self._get_lock():
                # Re-check: another coroutine may have finished starting the
                # producer while this one was waiting for the lock.
                if self._producer is None:
                    self._producer = await self._start_producer(linger_ms=100)
        return self._producer

    async def get_producer2(self):
        """Return the started producer that uses the default linger setting.

        Caching follows the same rules as ``get_producer``.
        """
        if self._producer2 is None:
            async with self._get_lock():
                if self._producer2 is None:
                    self._producer2 = await self._start_producer()
        return self._producer2

    async def stop_producer(self):
        """Stop whichever producers were started and forget them.

        Both cached references are cleared before stopping, and the second
        producer is stopped even if stopping the first one raises.
        """
        async with self._get_lock():
            lingering, immediate = self._producer, self._producer2
            self._producer = None
            self._producer2 = None
            try:
                if lingering is not None:
                    await lingering.stop()
            finally:
                if immediate is not None:
                    await immediate.stop()


# Module-level singleton instance used by send_to_kafka.
_kafka_producer_singleton = KafkaProducerSingleton()


async def send_to_kafka(topic, data, key=None, send_now=False):
    """Send ``data`` to the Kafka topic ``topic``.

    Args:
        topic: Name of the destination topic.
        data: Message value as ``str``; it is UTF-8 encoded by the producer.
        key: Optional message key as ``str``; ``None`` sends an un-keyed record.
        send_now: When true, use the producer without ``linger_ms`` instead of
            the one configured with ``linger_ms=100``.
    """
    if send_now:
        producer = await _kafka_producer_singleton.get_producer2()
    else:
        producer = await _kafka_producer_singleton.get_producer()
    await producer.send(topic, data, key=key)