Files
prefect-code/download_binance_kline.py

371 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import os
import pandas as pd
from io import BytesIO
import psycopg2
from psycopg2.extras import execute_values
import logging
from datetime import datetime
import xml.etree.ElementTree as ET
from download_unzip_csv import download_unzip_csv
# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Download parameters
BASE_URL = "https://data.binance.vision/data/futures/um/monthly/klines/"
# Several trading pairs may be listed, e.g. ["BOBUSDT", "BTCUSDT", "ETHUSDT"]
SYMBOLS = [
    "BOBUSDT",  # BOBUSDT data is available from 2025-11 onwards
    "BTCUSDT",
]
INTERVAL = "1d"
START_DATE = "2021-01"
END_DATE = datetime.now().strftime("%Y-%m")

# PostgreSQL connection settings (passed as keyword arguments to psycopg2.connect)
DB_CONFIG = {
    "host": "localhost",
    "database": "your_database",
    "user": "your_username",
    "password": "your_password",
    "port": 5432,
}

# Column names assigned, in order, to the columns of each kline CSV
KLINE_COLUMNS = [
    "open_time",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "close_time",
    "quote_asset_volume",
    "number_of_trades",
    "taker_buy_base_asset_volume",
    "taker_buy_quote_asset_volume",
    "ignore",
]
def download_kline_data(symbol, interval, year_month, timeout=30):
    """Download the monthly kline archive for one symbol and month.

    Args:
        symbol: Trading pair, e.g. "BTCUSDT".
        interval: Kline interval, e.g. "1d".
        year_month: Month to fetch, formatted as "YYYY-MM".
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the script forever.

    Returns:
        BytesIO holding the raw zip bytes, or None if the request failed
        (connection error, timeout or non-2xx status).

    Raises:
        ValueError: If year_month does not split into exactly two parts on "-".
    """
    # Split "YYYY-MM" into its year and month parts
    year, month = year_month.split("-")
    filename = f"{symbol}-{interval}-{year}-{month}.zip"
    url = f"{BASE_URL}{symbol}/{interval}/{filename}"
    logger.info(f"Downloading {url}")
    # Route the request through the local proxy
    proxies = {
        'http': 'http://localhost:1080',
        'https': 'http://localhost:1080'
    }
    try:
        response = requests.get(url, proxies=proxies, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Timeouts are RequestException subclasses, so they end up here too
        logger.error(f"Failed to download {filename}: {e}")
        return None
    logger.info(f"Downloaded {filename} successfully")
    return BytesIO(response.content)
def parse_kline_data(zip_data, symbol):
    """Parse a downloaded kline zip archive into a typed DataFrame.

    Args:
        zip_data: File-like object (or path) containing a zip-compressed CSV.
        symbol: Trading pair stored in the added "symbol" column.

    Returns:
        DataFrame with the KLINE_COLUMNS columns plus "symbol" (appended last).
        open_time/close_time are datetimes, number_of_trades is int and the
        remaining value columns are float. Returns None if anything fails
        while reading or converting the data.
    """
    try:
        df = pd.read_csv(zip_data, compression='zip', header=None, names=KLINE_COLUMNS)
        # Drop an embedded header row. Copy the result so the column
        # assignments below act on an independent frame instead of a slice
        # (avoids SettingWithCopyWarning).
        df = df[df["open_time"] != "open_time"].copy()
        # Tag every row with its trading pair
        df["symbol"] = symbol
        # When a header row was present every column was read as strings.
        # Convert the epoch values to numbers first: to_datetime(unit='ms')
        # on string input is deprecated in recent pandas.
        for time_column in ("open_time", "close_time"):
            df[time_column] = pd.to_datetime(pd.to_numeric(df[time_column]), unit='ms')
        # Convert the value columns to numeric types
        numeric_columns = ["open", "high", "low", "close", "volume",
                           "quote_asset_volume", "number_of_trades",
                           "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"]
        df[numeric_columns] = df[numeric_columns].astype(float)
        df["number_of_trades"] = df["number_of_trades"].astype(int)
        logger.info(f"Parsed {len(df)} rows of K线 data")
        return df
    except Exception as e:
        # Best-effort contract: callers treat None as "skip this file"
        logger.error(f"Failed to parse K线 data: {e}")
        return None
def create_connection():
    """Open a PostgreSQL connection using the DB_CONFIG settings.

    Returns:
        The psycopg2 connection, or None if connecting raised
        psycopg2.OperationalError (the error is logged).
    """
    try:
        connection = psycopg2.connect(**DB_CONFIG)
    except psycopg2.OperationalError as exc:
        logger.error(f"Failed to connect to PostgreSQL: {exc}")
        return None
    else:
        logger.info("Connected to PostgreSQL database")
        return connection
def create_table(conn):
    """Create the binance_kline table if it does not exist yet.

    The statement is committed on success. On a psycopg2 error the failure is
    logged and the transaction is rolled back; nothing is raised.

    Args:
        conn: Open psycopg2 connection.
    """
    # Volume columns use NUMERIC(38, 8) rather than NUMERIC(18, 8): the latter
    # allows only 10 digits before the decimal point, and a value beyond that
    # makes PostgreSQL reject the insert with a numeric overflow.
    # NOTE(review): daily quote volume of high-turnover pairs such as BTCUSDT
    # is expected to exceed 10 integer digits - confirm against real data.
    # Because of IF NOT EXISTS an already-created table keeps its old column
    # types; widen those with ALTER TABLE if needed.
    create_table_query = """
    CREATE TABLE IF NOT EXISTS binance_kline (
        id SERIAL PRIMARY KEY,
        symbol VARCHAR(20) NOT NULL,
        open_time TIMESTAMP NOT NULL,
        open NUMERIC(18, 8) NOT NULL,
        high NUMERIC(18, 8) NOT NULL,
        low NUMERIC(18, 8) NOT NULL,
        close NUMERIC(18, 8) NOT NULL,
        volume NUMERIC(38, 8) NOT NULL,
        close_time TIMESTAMP NOT NULL,
        quote_asset_volume NUMERIC(38, 8) NOT NULL,
        number_of_trades INTEGER NOT NULL,
        taker_buy_base_asset_volume NUMERIC(38, 8) NOT NULL,
        taker_buy_quote_asset_volume NUMERIC(38, 8) NOT NULL,
        ignore NUMERIC(18, 8) NOT NULL,
        UNIQUE(symbol, open_time)
    );
    """
    try:
        with conn.cursor() as cur:
            cur.execute(create_table_query)
        conn.commit()
        logger.info("Created binance_kline table")
    except psycopg2.Error as e:
        logger.error(f"Failed to create table: {e}")
        conn.rollback()
def insert_data(conn, df):
    """Insert kline rows into binance_kline, skipping existing rows.

    Rows whose (symbol, open_time) pair already exists are ignored through
    ON CONFLICT DO NOTHING. On a psycopg2 error the failure is logged and the
    transaction is rolled back; nothing is raised.

    Args:
        conn: Open psycopg2 connection.
        df: DataFrame containing at least the columns listed in
            insert_columns below; column order in the frame does not matter.
    """
    # Single source of truth for both the INSERT column list and the values
    # taken from the DataFrame. Selecting by name is required because the
    # frame has no id column and carries "symbol" as its last column, so a
    # positional slice would drop open_time and misalign every value.
    insert_columns = [
        "symbol", "open_time", "open", "high", "low", "close", "volume",
        "close_time", "quote_asset_volume", "number_of_trades",
        "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore",
    ]
    missing = [column for column in insert_columns if column not in df.columns]
    if missing:
        # Keep the log-and-continue behaviour instead of raising KeyError
        logger.error(f"Failed to insert data: DataFrame is missing columns {missing}")
        return
    column_list = ", ".join(insert_columns)
    insert_query = f"""
    INSERT INTO binance_kline ({column_list})
    VALUES %s
    ON CONFLICT (symbol, open_time) DO NOTHING;
    """
    # Convert the DataFrame rows into a list of tuples in INSERT order
    data = [tuple(row) for row in df[insert_columns].to_numpy()]
    try:
        with conn.cursor() as cur:
            execute_values(cur, insert_query, data)
        conn.commit()
        # Counts the rows submitted; conflicting rows are silently skipped
        logger.info(f"Inserted {len(data)} rows into database")
    except psycopg2.Error as e:
        logger.error(f"Failed to insert data: {e}")
        conn.rollback()
def list_s3_files(url, timeout=10):
    """Return the download URLs of all zip files in an S3 bucket listing.

    Args:
        url: S3 bucket listing URL, e.g.
            https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/BOBUSDT/1d/
        timeout: Request timeout in seconds (default 10).

    Returns:
        list: Full file URLs, one per listed key ending in ".zip". An empty
        list is returned when the request fails, the XML cannot be parsed, or
        the request is interrupted from the keyboard.
    """
    logger.info(f"Listing files from {url}")
    # Route the request through the local proxy
    proxy_settings = {
        'http': 'http://localhost:1080',
        'https': 'http://localhost:1080'
    }
    # Namespace used by the S3 listing XML
    s3_namespace = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'}
    # Host the listed keys are appended to in order to form download URLs
    download_root = "https://data.binance.vision/"
    try:
        listing = requests.get(url, timeout=timeout, proxies=proxy_settings)
        listing.raise_for_status()
        xml_root = ET.fromstring(listing.content)
        listed_keys = (node.text for node in xml_root.findall('.//s3:Key', s3_namespace))
        # Keep only zip archives and turn each key into a full URL
        zip_urls = [
            download_root + key
            for key in listed_keys
            if key and key.endswith('.zip')
        ]
        logger.info(f"Found {len(zip_urls)} files")
        return zip_urls
    except requests.exceptions.RequestException as err:
        logger.error(f"Failed to list files from {url}: {err}")
        return []
    except ET.ParseError as err:
        logger.error(f"Failed to parse XML response: {err}")
        return []
    except KeyboardInterrupt:
        # NOTE(review): Ctrl-C is swallowed here, so the caller keeps running
        # after an interrupt - confirm this is intended.
        logger.error(f"Request to {url} was interrupted")
        return []
def download_kline_data_by_url(url, timeout=30):
    """Download a kline archive from its full URL.

    Args:
        url: Complete download URL of the kline zip file.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the script forever.

    Returns:
        BytesIO: The downloaded bytes, or None if the request failed
        (connection error, timeout or non-2xx status).
    """
    logger.info(f"Downloading {url}")
    # Route the request through the local proxy
    proxies = {
        'http': 'http://localhost:1080',
        'https': 'http://localhost:1080'
    }
    try:
        response = requests.get(url, proxies=proxies, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Timeouts are RequestException subclasses, so they end up here too
        logger.error(f"Failed to download {url}: {e}")
        return None
    filename = os.path.basename(url)
    logger.info(f"Downloaded {filename} successfully")
    return BytesIO(response.content)
def process_symbol(symbol, interval=INTERVAL):
    """Download, parse, merge and store all kline files of one trading pair.

    Args:
        symbol: Trading pair, e.g. "BTCUSDT".
        interval: Kline interval, e.g. "1d"; defaults to the global INTERVAL.

    Returns:
        pandas.DataFrame: The merged, de-duplicated kline data, or None when
        no file was listed or no file could be processed. The frame is
        returned even if the database connection could not be opened.
    """
    logger.info(f"Processing symbol: {symbol}, interval: {interval}")
    # Build the S3 listing URL for this symbol and interval
    s3_url = f"https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/{symbol}/{interval}/"
    # Fetch the URLs of all downloadable files
    file_urls = list_s3_files(s3_url)
    if not file_urls:
        logger.warning(f"No files found for {symbol}-{interval}")
        return None
    numeric_columns = ["open", "high", "low", "close", "volume",
                       "quote_asset_volume", "number_of_trades",
                       "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"]
    # Collect one DataFrame per successfully processed file
    all_dfs = []
    for file_url in file_urls:
        if not file_url.endswith('.zip'):
            continue
        try:
            # Download and parse the archive.
            # NOTE(review): download_unzip_csv is assumed to forward
            # header/names to pandas.read_csv and return a DataFrame - confirm.
            df = download_unzip_csv(file_url, header=None, names=KLINE_COLUMNS)
            # parse_kline_data in this file drops an embedded header row from
            # the same archives; do the same here, otherwise the literal
            # "open_time" cell makes the timestamp conversion fail and the
            # whole file is skipped. Copy to detach from the filtered slice.
            df = df[df["open_time"] != "open_time"].copy()
            # Tag every row with its trading pair
            df["symbol"] = symbol
            # A header row forces every column to strings; to_numeric restores
            # numeric dtypes and leaves already-numeric columns unchanged.
            df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric)
            # Convert epoch milliseconds to datetimes (from numbers, not strings)
            for time_column in ("open_time", "close_time"):
                df[time_column] = pd.to_datetime(pd.to_numeric(df[time_column]), unit='ms')
            all_dfs.append(df)
            logger.info(f"Processed {os.path.basename(file_url)} with {len(df)} rows")
        except Exception as e:
            # Best effort: a broken file must not stop the remaining ones
            logger.error(f"Failed to process {file_url}: {e}")
            continue
    if not all_dfs:
        logger.warning(f"No data processed for {symbol}-{interval}")
        return None
    # Merge all per-file frames
    merged_df = pd.concat(all_dfs, ignore_index=True)
    logger.info(f"Merged {len(all_dfs)} files into a single DataFrame with {len(merged_df)} rows")
    # Remove duplicate candles
    merged_df = merged_df.drop_duplicates(subset=["symbol", "open_time"])
    logger.info(f"After deduplication, {len(merged_df)} rows remain")
    # Store the result in PostgreSQL
    conn = create_connection()
    if conn:
        try:
            # Make sure the target table exists
            create_table(conn)
            insert_data(conn, merged_df)
            # NOTE: insert_data logs and swallows database errors, so this
            # message is emitted even when the insert itself failed.
            logger.info(f"Successfully inserted {len(merged_df)} rows into database for {symbol}")
        finally:
            # Always release the connection
            conn.close()
    return merged_df
def main():
    """Download and store the kline files of every symbol in SYMBOLS.

    Opens one database connection, ensures the table exists, then for each
    symbol lists the available archives, downloads and parses each one and
    inserts its rows. Files that fail to download or parse are skipped.
    Returns early (doing nothing) when the database connection cannot be
    opened.
    """
    conn = create_connection()
    if not conn:
        return
    try:
        create_table(conn)
        for symbol in SYMBOLS:
            # List the archives available for this symbol
            s3_url = f"https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/{symbol}/{INTERVAL}/"
            file_urls = list_s3_files(s3_url)
            if not file_urls:
                logger.warning(f"No files found for {symbol}-{INTERVAL}")
                continue
            for file_url in file_urls:
                filename = os.path.basename(file_url)
                if not filename.endswith('.zip'):
                    continue
                # Expected name: symbol-interval-year-month.zip
                parts = filename[:-4].split('-')  # strip ".zip", then split
                if len(parts) != 4:
                    logger.warning(f"Invalid filename format: {filename}")
                    continue
                # Only the symbol part of the file name is needed below
                file_symbol = parts[0]
                zip_data = download_kline_data_by_url(file_url)
                if zip_data is None:
                    continue
                df = parse_kline_data(zip_data, file_symbol)
                if df is None or df.empty:
                    continue
                insert_data(conn, df)
    finally:
        # Release the connection even if a step above raised unexpectedly
        conn.close()
    logger.info("Script completed successfully")
if __name__ == "__main__":
    # Script entry point: with command-line arguments, exercise
    # process_symbol for the given trading pair; without any, run main().
    import sys

    cli_args = sys.argv[1:]
    if not cli_args:
        # Default: process every configured symbol
        main()
    else:
        # First argument is the trading pair, the optional second one the interval
        symbol = cli_args[0]
        interval = cli_args[1] if len(cli_args) >= 2 else INTERVAL
        process_symbol(symbol, interval)