import os
import fileinput
import importlib.util


def replace_keyword_in_package_file(package_name, target_file, old_str, new_str):
    """Replace every occurrence of *old_str* with *new_str* in one file of an
    installed package.

    Parameters:
        package_name (str): importable name of the package to locate.
        target_file (str): file name (relative to the package directory).
        old_str (str): text to be replaced.
        new_str (str): replacement text.

    Returns:
        bool: True when at least one line contained *old_str* and the file
        was rewritten; False when the package or file is missing, the
        keyword was not found, or any error occurred.
    """
    try:
        # Locate the package on disk. importlib.util.find_spec returns None
        # for an unimportable package (pkgutil.get_loader is deprecated
        # since Python 3.12, so prefer importlib here).
        spec = importlib.util.find_spec(package_name)
        if spec is None or spec.origin is None:
            print(f"错误: 找不到包 '{package_name}'")
            return False

        package_path = os.path.dirname(spec.origin)
        target_path = os.path.join(package_path, target_file)

        if not os.path.exists(target_path):
            print(f"错误: 文件 '{target_file}' 不在包 '{package_name}' 的路径中")
            return False

        print(f"正在处理文件: {target_path}")
        print(f"将 '{old_str}' 替换为 '{new_str}'")

        replaced = False
        # inplace=True redirects print() into the file; the .bak copy keeps
        # the original so the edit can be reverted by hand.
        with fileinput.FileInput(target_path, inplace=True, backup='.bak') as file:
            for line in file:
                if old_str in line:
                    replaced = True
                print(line.replace(old_str, new_str), end='')

        if replaced:
            print("替换完成!")
            return True
        print(f"警告: 文件中未找到关键字 '{old_str}'")
        return False

    except Exception as e:
        # Best-effort patcher: report and signal failure instead of raising.
        print(f"发生错误: {str(e)}")
        return False


# Side effect on import: strip "builtins.TimeoutError," from aioredis'
# exceptions module (works around aioredis' duplicate-base TypeError on
# Python 3.11, where asyncio.TimeoutError IS builtins.TimeoutError).
replace_keyword_in_package_file("aioredis", "exceptions.py", "builtins.TimeoutError,", "")
import asyncio
from typing import Any, Callable, List, Coroutine


async def batch_processor(
    queue: asyncio.Queue,
    batch_size: int,
    linger_sec: float,
    execute_fn: Callable[[List[Any]], Coroutine[Any, Any, Any]],
):
    """Drain *queue* and hand items to *execute_fn* in batches.

    A batch is flushed either when it reaches *batch_size* items or when
    *linger_sec* seconds have elapsed since the previous flush, whichever
    comes first.

    Parameters:
        queue: asyncio queue supplying items to process.
        batch_size: item count that triggers an immediate flush.
        linger_sec: maximum time (seconds) a partial batch may wait.
        execute_fn: coroutine function awaited with each batch (a list).

    Runs forever; cancel the task to stop it.
    """
    batch: List[Any] = []
    loop = asyncio.get_running_loop()
    last_time = loop.time()
    while True:
        try:
            # Wait at most for the remainder of the current linger window.
            remaining = max(0, linger_sec - (loop.time() - last_time))
            item = await asyncio.wait_for(queue.get(), timeout=remaining)
            batch.append(item)

            if len(batch) >= batch_size:
                await execute_fn(batch)
                batch = []
                last_time = loop.time()

        except asyncio.TimeoutError:
            # Linger window expired: flush whatever accumulated.
            if batch:
                await execute_fn(batch)
                batch = []
            # Bug fix: reset the window even when the batch is empty.
            # Previously last_time was only reset after a flush, so on an
            # idle queue `remaining` stayed 0 and wait_for timed out
            # instantly forever — a 100%-CPU busy spin.
            last_time = loop.time()
KafkaProducerSingleton: + _instance = None + _producer = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(KafkaProducerSingleton, cls).__new__(cls) + return cls._instance + + async def get_producer(self): + if self._producer is None: + self._producer = AIOKafkaProducer( + bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, + key_serializer=lambda k: k.encode('utf-8'), + value_serializer=lambda v: v.encode('utf-8'), + linger_ms=100 + ) + await self._producer.start() + return self._producer + + async def stop_producer(self): + if self._producer is not None: + await self._producer.stop() + self._producer = None + +# 初始化单例实例 +_kafka_producer_singleton = KafkaProducerSingleton() + +async def send_to_kafka(topic, data, key=None): + """将数据发送到 Kafka 主题""" + producer = await _kafka_producer_singleton.get_producer() + #print(topic, data, key) + try: + await producer.send(topic, data,key=key) + except Exception as e: + print("error: ", topic, data, key,e) diff --git a/main.py b/main.py new file mode 100644 index 0000000..f26ec6f --- /dev/null +++ b/main.py @@ -0,0 +1,68 @@ +import asyncio +import aioredisinit +import json +from binance import AsyncClient,BinanceSocketManager +# from .kafka_service import send_to_kafka, _kafka_producer_singleton +import kafka_service +import redis_service +from batch_util import batch_processor +from functools import partial +import time +import uvloop +async def main(): +# while True: +# await kafka_service.send_to_kafka("bn_agg_trade", "{}", 'aa') + + # 初始化 Binance 客户端 + client = await AsyncClient.create() + # 使用期货的 BinanceSocketManager + bm = BinanceSocketManager(client, max_queue_size=100000) + + # 获取所有合约交易对 + exchange_info = await client.futures_exchange_info() + symbols = [symbol['symbol'] for symbol in exchange_info['symbols'] if symbol['status'] == 'TRADING' and symbol['contractType'] == 'PERPETUAL'] + + # symbols = ['RVNUSDT', 'AXLUSDT'] + # 生成所有合约交易对的聚合交易流地址 + streams = [f"{symbol.lower()}@aggTrade" for symbol in 
import asyncio
import json
import time

import uvloop
from binance import AsyncClient, BinanceSocketManager

import aioredisinit  # noqa: F401 — side effect: patches installed aioredis; must precede redis_service
import kafka_service
import redis_service  # noqa: F401 — kept for import side effects / parity with main.py.redis
from batch_util import batch_processor


async def main():
    """Stream Binance USDT perpetual aggTrade events and batch them to Kafka."""
    client = await AsyncClient.create()
    # Futures socket manager; a generous queue buffers bursts instead of dropping.
    bm = BinanceSocketManager(client, max_queue_size=100000)

    # All actively trading USDT perpetual contracts.
    exchange_info = await client.futures_exchange_info()
    symbols = [
        s['symbol']
        for s in exchange_info['symbols']
        if s['status'] == 'TRADING' and s['contractType'] == 'PERPETUAL'
    ]
    streams = [f"{symbol.lower()}@aggTrade" for symbol in symbols if symbol.endswith('USDT')]

    queue = asyncio.Queue()

    async def batch_push(data):
        # Wrap the batch so consumers receive one JSON object per Kafka record.
        new_data = {'items': data}
        await kafka_service.send_to_kafka('bn_agg_trade', json.dumps(new_data), 'BATCH')

    # Bug fix: keep a reference to the background task. asyncio holds only
    # weak references to tasks, so a discarded create_task() result can be
    # garbage-collected mid-flight.
    batch_task = asyncio.create_task(batch_processor(queue, 10000, 0.1, batch_push))
    try:
        # One multiplexed websocket for every stream.
        multiplex_socket = bm.futures_multiplex_socket(streams)
        async with multiplex_socket as stream:
            while True:
                res = await stream.recv()
                data = res.get('data')
                if data and data.get('e') == 'aggTrade':
                    slim = {
                        "s": data['s'],
                        "p": data['p'],
                        "T": data['T'],
                        "E": data['E'],
                        # Local receive timestamp (ms) for latency measurement.
                        "kt": int(time.time() * 1000),
                    }
                    await queue.put(slim)
    finally:
        # Stop the batcher and close the exchange connection on shutdown.
        batch_task.cancel()
        await client.close_connection()


if __name__ == "__main__":
    uvloop.install()
    asyncio.run(main())
import aioredis

# Shared module-level client; connections are created lazily from the pool
# on the first command.
# NOTE(review): credentials are hard-coded here — move to config/env.
redis = aioredis.from_url(
    "redis://:redispasswd..QAQ@127.0.0.1:6379/0"
    )


async def push(stream, data):
    """XADD *data* onto the Redis stream *stream*, capped at 200k entries.

    Parameters:
        stream (str): Redis stream key.
        data: a single field mapping, or a list of mappings; a list is sent
            in one pipeline round trip.
    """
    if isinstance(data, list):
        async with redis.pipeline() as pipe:
            for item in data:
                # Queueing a command on a pipeline is synchronous; the
                # single execute() below performs the round trip (awaiting
                # each queued call, as before, was a no-op).
                pipe.xadd(stream, item, maxlen=200000)
            await pipe.execute()
        return
    # Call the shared client directly. The previous `async with redis as
    # conn:` triggered Redis.__aexit__ -> close() on every single push,
    # needlessly tearing down pooled connections.
    await redis.xadd(stream, data, maxlen=200000)


if __name__ == "__main__":
    import asyncio
    # Bug fix: the demo previously called the undefined name
    # push_agg_trade_data() (NameError); route it through push().
    asyncio.run(push("agg_trade_data", {"s": "BTCUSDT", "T": 1680895800000, "p": "10000"}))