This commit is contained in:
yhydev
2025-06-14 13:15:50 +00:00
commit 85a2a49261
7 changed files with 289 additions and 0 deletions

2
Dockerfile Normal file
View File

@@ -0,0 +1,2 @@
# Runtime image for the Binance aggTrade collectors in this commit.
FROM python:3.11-slim
# python-binance: exchange client; aiokafka/aioredis: async sinks;
# py-spy: production profiling; uvloop: faster event loop for main.py.
RUN pip install python-binance aiokafka aioredis py-spy uvloop

53
aioredisinit.py Normal file
View File

@@ -0,0 +1,53 @@
import os
import pkgutil
import fileinput
import sys
def replace_keyword_in_package_file(package_name, target_file, old_str, new_str):
    """Rewrite one file inside an installed package, swapping a keyword in place.

    Args:
        package_name: importable name of the package to locate.
        target_file: file name (relative path) inside the package directory.
        old_str: text to search for.
        new_str: replacement text.

    Returns:
        True when at least one line contained ``old_str``; False otherwise
        (missing package, missing file, keyword not found, or any error).
    """
    try:
        loader = pkgutil.get_loader(package_name)
        if loader is None:
            print(f"错误: 找不到包 '{package_name}'")
            return False
        package_dir = os.path.dirname(loader.get_filename())
        target_path = os.path.join(package_dir, target_file)
        if not os.path.exists(target_path):
            print(f"错误: 文件 '{target_file}' 不在包 '{package_name}' 的路径中")
            return False
        print(f"正在处理文件: {target_path}")
        print(f"'{old_str}' 替换为 '{new_str}'")
        hit = False
        # Inside the FileInput(inplace=True) context, print() output is
        # redirected into the file being rewritten; a .bak copy is kept.
        with fileinput.FileInput(target_path, inplace=True, backup='.bak') as fh:
            for text in fh:
                if old_str in text:
                    hit = True
                print(text.replace(old_str, new_str), end='')
        if not hit:
            print(f"警告: 文件中未找到关键字 '{old_str}'")
            return False
        print("替换完成!")
        return True
    except Exception as e:
        print(f"发生错误: {str(e)}")
        return False


# NOTE(review): presumably works around aioredis declaring both
# asyncio.TimeoutError and builtins.TimeoutError as bases (duplicate base
# class on Python 3.11+, where the two are the same type) — confirm.
replace_keyword_in_package_file("aioredis", "exceptions.py", "builtins.TimeoutError,", "")

47
batch_util.py Normal file
View File

@@ -0,0 +1,47 @@
import asyncio
from typing import Any, Callable, List, Coroutine
async def batch_processor(
    queue: asyncio.Queue,
    batch_size: int,
    linger_sec: float,
    execute_fn: Callable[[List[Any]], Coroutine[Any, Any, Any]]  # coroutine function taking one batch
):
    """Drain *queue* and hand items to *execute_fn* in batches.

    A batch is flushed when it reaches ``batch_size`` items, or when
    ``linger_sec`` seconds have elapsed since the first item of the batch
    arrived — whichever comes first. Runs until cancelled.

    Args:
        queue: async queue producing the items to batch.
        batch_size: flush threshold (number of items).
        linger_sec: maximum seconds a partial batch may wait before flushing.
        execute_fn: awaited with the list of batched items on each flush.

    Fix over the original: once the linger window had elapsed with an empty
    batch, the old code looped on ``wait_for(queue.get(), timeout=0)`` —
    burning CPU and, because a zero-timeout wait cancels the getter before
    it ever runs, never actually dequeuing anything. We now block with no
    timeout while the batch is empty and start the linger clock when the
    first item arrives.
    """
    batch: List[Any] = []
    loop = asyncio.get_running_loop()
    deadline = None  # absolute loop-time by which the current batch must flush
    while True:
        try:
            if deadline is None:
                # Empty batch: nothing to linger for — block until work arrives.
                item = await queue.get()
                deadline = loop.time() + linger_sec
            else:
                remaining = max(0.0, deadline - loop.time())
                item = await asyncio.wait_for(queue.get(), timeout=remaining)
            batch.append(item)
            if len(batch) >= batch_size:
                await execute_fn(batch)
                batch.clear()
                deadline = None
        except asyncio.TimeoutError:
            # Linger window expired: flush whatever accumulated.
            if batch:
                await execute_fn(batch)
                batch.clear()
            deadline = None

42
kafka_service.py Normal file
View File

@@ -0,0 +1,42 @@
from aiokafka import AIOKafkaProducer
# Kafka configuration.
KAFKA_BOOTSTRAP_SERVERS = '127.0.0.1:19092'
# KAFKA_TOPIC = 'binance_agg_trade'


class KafkaProducerSingleton:
    """Process-wide holder for a single, lazily-started AIOKafkaProducer."""

    _instance = None  # the sole KafkaProducerSingleton instance
    _producer = None  # shared AIOKafkaProducer, created on first use

    def __new__(cls):
        # Every construction returns the same cached instance.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    async def get_producer(self):
        """Return the shared producer, creating and starting it on first call.

        NOTE(review): not guarded against concurrent first calls — two tasks
        racing here could each start a producer; confirm callers serialize.
        """
        if self._producer is not None:
            return self._producer
        self._producer = AIOKafkaProducer(
            bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
            key_serializer=lambda k: k.encode('utf-8'),
            value_serializer=lambda v: v.encode('utf-8'),
            linger_ms=100
        )
        await self._producer.start()
        return self._producer

    async def stop_producer(self):
        """Stop and discard the shared producer; no-op if never started."""
        if self._producer is None:
            return
        await self._producer.stop()
        self._producer = None


# Module-level singleton used by send_to_kafka().
_kafka_producer_singleton = KafkaProducerSingleton()
async def send_to_kafka(topic, data, key=None):
    """Publish one message to *topic*; send errors are printed, never raised.

    Args:
        topic: Kafka topic name.
        data: message value (serialized by the producer's value_serializer).
        key: optional partition key (serialized by the key_serializer).
    """
    producer = await _kafka_producer_singleton.get_producer()
    try:
        await producer.send(topic, data, key=key)
    except Exception as e:
        # Best-effort delivery: report and continue so the caller's loop survives.
        print("error: ", topic, data, key, e)

68
main.py Normal file
View File

@@ -0,0 +1,68 @@
import asyncio
import aioredisinit
import json
from binance import AsyncClient,BinanceSocketManager
# from .kafka_service import send_to_kafka, _kafka_producer_singleton
import kafka_service
import redis_service
from batch_util import batch_processor
from functools import partial
import time
import uvloop
async def main():
    """Stream aggTrade events for all USDT perpetual futures on Binance and
    batch-publish them to the Kafka topic 'bn_agg_trade'.
    """
    # Binance async REST/websocket client.
    client = await AsyncClient.create()
    # Futures-capable socket manager; a large queue absorbs trade bursts.
    bm = BinanceSocketManager(client, max_queue_size=100000)

    # All actively trading USDT-margined perpetual contracts.
    exchange_info = await client.futures_exchange_info()
    symbols = [
        s['symbol']
        for s in exchange_info['symbols']
        if s['status'] == 'TRADING' and s['contractType'] == 'PERPETUAL'
    ]
    streams = [f"{symbol.lower()}@aggTrade" for symbol in symbols if symbol.endswith('USDT')]

    queue = asyncio.Queue()

    async def batch_push(data):
        # Wrap the batch so consumers receive a single JSON object per message.
        new_data = {
            'items': data
        }
        await kafka_service.send_to_kafka('bn_agg_trade', json.dumps(new_data), 'BATCH')

    # Fix: keep a strong reference to the task — asyncio only holds a weak
    # reference, so an unreferenced task may be garbage-collected mid-run.
    # Flush up to 10k items or every 100 ms, whichever comes first.
    batch_task = asyncio.create_task(batch_processor(queue, 10000, 0.1, batch_push))
    try:
        # Single multiplexed websocket over all aggTrade streams.
        multiplex_socket = bm.futures_multiplex_socket(streams)
        async with multiplex_socket as stream:
            while True:
                res = await stream.recv()
                data = res.get('data')
                if data and data.get('e') == 'aggTrade':
                    await queue.put({
                        "s": data['s'],
                        "p": data['p'],
                        "T": data['T'],
                        "E": data['E'],
                        "kt": int(time.time() * 1000),  # local receive time (ms)
                    })
    finally:
        # Stop the batch task and release the Binance connection.
        batch_task.cancel()
        await client.close_connection()


if __name__ == "__main__":
    # uvloop is already imported at module top.
    uvloop.install()
    asyncio.run(main())

58
main.py.redis Normal file
View File

@@ -0,0 +1,58 @@
import asyncio
import aioredisinit
import json
from binance import AsyncClient,BinanceSocketManager
# from .kafka_service import send_to_kafka, _kafka_producer_singleton
import kafka_service
import redis_service
from batch_util import batch_processor
from functools import partial
import time
async def main():
    """Stream aggTrade events for all USDT perpetual futures on Binance and
    batch-push them to the Redis stream 'agg_trade_data'.
    """
    # Binance async REST/websocket client.
    client = await AsyncClient.create()
    # Futures-capable socket manager; a large queue absorbs trade bursts.
    bm = BinanceSocketManager(client, max_queue_size=100000)

    # All actively trading USDT-margined perpetual contracts.
    exchange_info = await client.futures_exchange_info()
    symbols = [
        s['symbol']
        for s in exchange_info['symbols']
        if s['status'] == 'TRADING' and s['contractType'] == 'PERPETUAL'
    ]
    streams = [f"{symbol.lower()}@aggTrade" for symbol in symbols if symbol.endswith('USDT')]

    queue = asyncio.Queue()
    # Pre-bind the stream name; batch_processor supplies each batch list.
    batch_push = partial(redis_service.push, "agg_trade_data")
    # Fix: keep a strong reference to the task — asyncio only holds a weak
    # reference, so an unreferenced task may be garbage-collected mid-run.
    # Flush up to 100 items or every 10 ms, whichever comes first.
    batch_task = asyncio.create_task(batch_processor(queue, 100, 0.01, batch_push))
    try:
        # Single multiplexed websocket over all aggTrade streams.
        multiplex_socket = bm.futures_multiplex_socket(streams)
        async with multiplex_socket as stream:
            while True:
                res = await stream.recv()
                data = res.get('data')
                if data and data.get('e') == 'aggTrade':
                    t = int(time.time() * 1000)  # local receive time (ms)
                    await queue.put({
                        's': data['s'],
                        'p': data['p'],
                        'T': data['T'],
                        'E': data['E'],
                        't': t,
                    })
    finally:
        # Stop the batch task and release the Binance connection.
        batch_task.cancel()
        await client.close_connection()


if __name__ == "__main__":
    asyncio.run(main())

19
redis_service.py Normal file
View File

@@ -0,0 +1,19 @@
import aioredis

# Shared client / connection pool for this module.
redis = aioredis.from_url(
    "redis://:redispasswd..QAQ@127.0.0.1:6379/0"
)


async def push(stream, data):
    """Append an entry (dict) or a batch (list of dicts) to a Redis stream.

    The stream is capped at roughly 200k entries via MAXLEN.

    Args:
        stream: Redis stream key.
        data: a single field→value mapping, or a list of them for a batch.
    """
    if isinstance(data, list):
        # Batch: pipeline the XADDs so the whole batch is one round trip.
        async with redis.pipeline() as pipe:
            for item in data:
                await pipe.xadd(stream, item, maxlen=200000)
            await pipe.execute()
        return
    async with redis as conn:
        await conn.xadd(stream, data, maxlen=200000)


if __name__ == "__main__":
    import asyncio
    # Fix: the original called the undefined name `push_agg_trade_data`,
    # which raised NameError; call push() with the stream name instead.
    asyncio.run(push("agg_trade_data", {"s": "BTCUSDT", "T": 1680895800000, "p": "10000"}))