Files
prefect-code/download_binance_kline.py

374 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import os
import pandas as pd
from io import BytesIO
import psycopg2
from psycopg2.extras import execute_values
import logging
from datetime import datetime
import xml.etree.ElementTree as ET
from download_unzip_csv import download_unzip_csv
from dynaconf import Dynaconf
# Configure logging: timestamped, level-tagged messages for the whole script.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 初始化Dynaconf配置
settings = Dynaconf(
envvar_prefix="DYNACONF",
settings_files=["settings.toml"],
environments=True,
load_dotenv=False,
default_env="default",
env_switcher="BINANCE_ENV"
)
# Core download parameters (all sourced from settings.toml).
BASE_URL = settings.BASE_URL
SYMBOLS = settings.SYMBOLS
INTERVAL = settings.INTERVAL
# Optional HTTP(S) proxy mapping for requests; None disables proxying.
PROXIES = {
    'http': settings.HTTP_PROXY,
    'https': settings.HTTPS_PROXY
} if settings.PROXY_ENABLED else None
# PostgreSQL connection parameters (passed to psycopg2.connect as kwargs).
DB_CONFIG = {
    "host": settings.DB_HOST,
    "database": settings.DB_DATABASE,
    "user": settings.DB_USER,
    "password": settings.DB_PASSWORD,
    "port": settings.DB_PORT
}
# Column names for the raw kline CSV files; order must match Binance's CSV layout.
KLINE_COLUMNS = settings.KLINE_COLUMNS
def download_kline_data(symbol, interval, year_month):
    """Download one month of kline data for a trading pair.

    Args:
        symbol: Trading pair, e.g. "BTCUSDT".
        interval: Kline interval, e.g. "1d".
        year_month: Month to fetch in "YYYY-MM" form.

    Returns:
        BytesIO containing the zip payload, or None on any request failure.
    """
    year, month = year_month.split("-")
    filename = f"{symbol}-{interval}-{year}-{month}.zip"
    # FIX: the URL previously omitted the computed filename entirely.
    url = f"{BASE_URL}{symbol}/{interval}/{filename}"
    logger.info(f"Downloading {url}")
    try:
        # Honour the optional proxy configuration (consistent with
        # list_s3_files) and bound the request so a stalled transfer
        # cannot hang the run indefinitely.
        response = requests.get(url, timeout=60, proxies=PROXIES)
        response.raise_for_status()
        logger.info(f"Downloaded {filename} successfully")
        return BytesIO(response.content)
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download {filename}: {e}")
        return None
def parse_kline_data(zip_data, symbol):
    """Parse a zipped Binance kline CSV into a typed DataFrame.

    Args:
        zip_data: File-like object containing the zip archive.
        symbol: Trading pair name added to every row as a "symbol" column.

    Returns:
        DataFrame with datetime/numeric columns plus "symbol",
        or None if parsing fails.
    """
    try:
        df = pd.read_csv(zip_data, compression='zip', header=None, names=KLINE_COLUMNS)
        # Some monthly files ship with a header row; since we read with
        # header=None it lands in the data, so drop it by value.
        df = df[df["open_time"] != "open_time"]
        df["symbol"] = symbol
        # FIX: cast to numeric before to_datetime — when a header row was
        # present the columns are object/str, and passing strings with
        # unit='ms' fails. Same approach as process_symbol.
        df["open_time"] = pd.to_datetime(df["open_time"].astype(float), unit='ms')
        df["close_time"] = pd.to_datetime(df["close_time"].astype(float), unit='ms')
        # Coerce the remaining value columns to proper numeric dtypes.
        numeric_columns = ["open", "high", "low", "close", "volume",
                           "quote_asset_volume", "number_of_trades",
                           "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"]
        df[numeric_columns] = df[numeric_columns].astype(float)
        df["number_of_trades"] = df["number_of_trades"].astype(int)
        logger.info(f"Parsed {len(df)} rows of K线 data")
        return df
    except Exception as e:
        logger.error(f"Failed to parse K线 data: {e}")
        return None
def create_connection():
    """Open a PostgreSQL connection using the module-level DB_CONFIG.

    Returns:
        An open psycopg2 connection, or None when connecting fails.
    """
    try:
        connection = psycopg2.connect(**DB_CONFIG)
    except psycopg2.OperationalError as exc:
        logger.error(f"Failed to connect to PostgreSQL: {exc}")
        return None
    logger.info("Connected to PostgreSQL database")
    return connection
def create_table(conn):
    """Create the binance_kline table if it does not already exist.

    Commits on success; logs the error and rolls back on failure.
    """
    ddl = """
    CREATE TABLE IF NOT EXISTS binance_kline (
        id SERIAL PRIMARY KEY,
        symbol VARCHAR(20) NOT NULL,
        open_time TIMESTAMP NOT NULL,
        open NUMERIC(18, 8) NOT NULL,
        high NUMERIC(18, 8) NOT NULL,
        low NUMERIC(18, 8) NOT NULL,
        close NUMERIC(18, 8) NOT NULL,
        volume NUMERIC(18, 8) NOT NULL,
        close_time TIMESTAMP NOT NULL,
        quote_asset_volume NUMERIC(18, 8) NOT NULL,
        number_of_trades INTEGER NOT NULL,
        taker_buy_base_asset_volume NUMERIC(18, 8) NOT NULL,
        taker_buy_quote_asset_volume NUMERIC(18, 8) NOT NULL,
        ignore NUMERIC(18, 8) NOT NULL,
        UNIQUE(symbol, open_time)
    );
    """
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(ddl)
        finally:
            cursor.close()
        conn.commit()
        logger.info("Created binance_kline table")
    except psycopg2.Error as e:
        logger.error(f"Failed to create table: {e}")
        conn.rollback()
def insert_data(conn, df):
    """Insert kline rows into binance_kline, skipping duplicates.

    Duplicates are detected on (symbol, open_time): rows already present in
    the table are filtered out before inserting, and ON CONFLICT DO NOTHING
    remains as a safety net for concurrent writers.

    Args:
        conn: Open psycopg2 connection.
        df: DataFrame containing at least the columns in ``insert_columns``.
    """
    if df.empty:
        logger.info("No data to insert")
        return
    # Column order for the INSERT statement.
    insert_columns = [
        "symbol", "open_time", "open", "high", "low", "close", "volume",
        "close_time", "quote_asset_volume", "number_of_trades",
        "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
    ]
    # 1. Collect every (symbol, open_time) pair we are about to insert.
    symbol_time_pairs = [tuple(p) for p in df[['symbol', 'open_time']].values.tolist()]
    # 2. Ask the database which of those pairs already exist.
    if symbol_time_pairs:
        # FIX: parameterized row-value IN instead of hand-built SQL strings.
        # psycopg2 adapts a tuple of tuples to ((s1, t1), (s2, t2), ...),
        # which avoids quoting bugs and injection risk.
        check_query = """
        SELECT symbol, open_time FROM binance_kline
        WHERE (symbol, open_time) IN %s
        """
        try:
            with conn.cursor() as cur:
                cur.execute(check_query, (tuple(symbol_time_pairs),))
                existing_set = set(cur.fetchall())
            logger.info(f"Found {len(existing_set)} existing records in database")
            # 3. Keep only rows not already present.
            df_filtered = df[~df.apply(
                lambda row: (row['symbol'], row['open_time']) in existing_set, axis=1)]
            if df_filtered.empty:
                logger.info("All records already exist in database, skipping insertion")
                return
            logger.info(f"Filtered to {len(df_filtered)} new records to insert")
        except psycopg2.Error as e:
            logger.error(f"Failed to check existing data: {e}")
            logger.info("Falling back to ON CONFLICT mechanism")
            # FIX: a failed statement aborts the transaction; roll back so the
            # INSERT below can actually run.
            conn.rollback()
            df_filtered = df
    else:
        df_filtered = df
    # 4. Build the INSERT; ON CONFLICT keeps concurrent writers safe.
    insert_query = """
    INSERT INTO binance_kline (
        symbol, open_time, open, high, low, close, volume,
        close_time, quote_asset_volume, number_of_trades,
        taker_buy_base_asset_volume, taker_buy_quote_asset_volume, ignore
    ) VALUES %s
    ON CONFLICT (symbol, open_time) DO NOTHING;
    """
    data = [tuple(row) for row in df_filtered[insert_columns].to_numpy()]
    # 5. Execute the batched insert.
    try:
        with conn.cursor() as cur:
            # Single page so cur.rowcount reflects the whole batch.
            execute_values(cur, insert_query, data, page_size=len(data))
            inserted = cur.rowcount  # rows actually inserted (conflicts excluded)
        conn.commit()
        logger.info(f"Inserted {inserted} new rows into database")
    except psycopg2.Error as e:
        logger.error(f"Failed to insert data: {e}")
        conn.rollback()
def list_s3_files(url, timeout=10):
    """List all .zip file URLs from an S3 bucket listing.

    Follows S3 ListObjects pagination (at most 1000 keys per response) by
    re-requesting with a ``marker`` parameter until ``IsTruncated`` is false,
    so symbols with long histories are no longer silently truncated.

    Args:
        url: S3 bucket listing URL, e.g.
            https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/BOBUSDT/1d/
        timeout: Per-request timeout in seconds (default 10).

    Returns:
        list: Full https://data.binance.vision/... URLs ending in .zip;
        empty list on request or XML-parse failure.
    """
    logger.info(f"Listing files from {url}")
    # Namespace used by the S3 XML listing format.
    ns = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'}
    base_download_url = "https://data.binance.vision/"
    file_urls = []
    marker = None
    try:
        while True:
            # requests merges `params` into the existing query string and
            # URL-encodes the marker (keys contain '/').
            params = {'marker': marker} if marker else None
            # PROXIES is None when proxying is disabled, which requests treats
            # as "no proxy" — same effect as omitting the argument.
            response = requests.get(url, params=params, timeout=timeout, proxies=PROXIES)
            response.raise_for_status()
            root = ET.fromstring(response.content)
            last_key = None
            # Collect every <Key> on this page, keeping only zip archives.
            for key_elem in root.findall('.//s3:Key', ns):
                file_path = key_elem.text
                last_key = file_path
                if file_path and file_path.endswith('.zip'):
                    file_urls.append(base_download_url + file_path)
            truncated = root.find('s3:IsTruncated', ns)
            if truncated is None or truncated.text != 'true' or last_key is None:
                break
            # NextMarker is only guaranteed when a delimiter is set; fall back
            # to the last key seen on this page, per the S3 API contract.
            next_marker = root.find('s3:NextMarker', ns)
            marker = next_marker.text if next_marker is not None else last_key
        logger.info(f"Found {len(file_urls)} files")
        return file_urls
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to list files from {url}: {e}")
        return []
    except ET.ParseError as e:
        logger.error(f"Failed to parse XML response: {e}")
        return []
    # FIX: KeyboardInterrupt is no longer swallowed — a user interrupt now
    # propagates instead of being logged and treated as "no files".
def download_kline_data_by_url(url, timeout=60):
    """Download kline data from a full URL.

    Args:
        url: Complete kline-data download URL.
        timeout: Request timeout in seconds (default 60; new optional
            parameter, backward-compatible with existing callers).

    Returns:
        BytesIO: Downloaded payload, or None if the download failed.
    """
    logger.info(f"Downloading {url}")
    filename = os.path.basename(url)
    try:
        # Consistency fix: honour the proxy configuration like list_s3_files,
        # and bound the request so a stalled transfer cannot hang the run.
        response = requests.get(url, timeout=timeout, proxies=PROXIES)
        response.raise_for_status()
        logger.info(f"Downloaded {filename} successfully")
        return BytesIO(response.content)
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download {url}: {e}")
        return None
def process_symbol(symbol, interval=INTERVAL):
    """
    Download, parse, merge and store all monthly kline files for one symbol.

    Parameters:
        symbol: trading pair, e.g. "BTCUSDT"
        interval: kline interval, e.g. "1d" (defaults to the global INTERVAL)

    Returns:
        pandas.DataFrame: merged kline data, or None when no files were
        found or none could be processed.
    """
    logger.info(f"Processing symbol: {symbol}, interval: {interval}")
    # Build the S3 listing URL for this symbol/interval.
    s3_url = f"https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/{symbol}/{interval}/"
    # Enumerate every downloadable monthly archive.
    file_urls = list_s3_files(s3_url)
    if not file_urls:
        logger.warning(f"No files found for {symbol}-{interval}")
        return None
    # Download and parse each file, collecting the per-month DataFrames.
    all_dfs = []
    for file_url in file_urls:
        if not file_url.endswith('.zip'):
            continue
        try:
            # download_unzip_csv downloads the zip and parses the CSV inside.
            df = download_unzip_csv(file_url, header=None, names=KLINE_COLUMNS)
            # Tag every row with its trading pair.
            df["symbol"] = symbol
            # Convert epoch-ms timestamps to datetime; cast to float first to
            # avoid a pandas FutureWarning when the column was read as strings.
            df["open_time"] = pd.to_datetime(df["open_time"].astype(float), unit='ms')
            df["close_time"] = pd.to_datetime(df["close_time"].astype(float), unit='ms')
            all_dfs.append(df)
            logger.info(f"Processed {os.path.basename(file_url)} with {len(df)} rows")
        except Exception as e:
            # Best-effort: a single bad month is skipped, not fatal.
            logger.error(f"Failed to process {file_url}: {e}")
            continue
    if not all_dfs:
        logger.warning(f"No data processed for {symbol}-{interval}")
        return None
    # Merge all monthly DataFrames into one.
    merged_df = pd.concat(all_dfs, ignore_index=True)
    logger.info(f"Merged {len(all_dfs)} files into a single DataFrame with {len(merged_df)} rows")
    # Drop duplicate candles (same symbol + open_time).
    merged_df = merged_df.drop_duplicates(subset=["symbol", "open_time"])
    logger.info(f"After deduplication, {len(merged_df)} rows remain")
    # Persist to PostgreSQL (skipped entirely if the connection fails).
    conn = create_connection()
    if conn:
        try:
            # Make sure the target table exists before inserting.
            create_table(conn)
            # insert_data itself skips rows already present in the table.
            insert_data(conn, merged_df)
            logger.info(f"Successfully inserted {len(merged_df)} rows into database for {symbol}")
        finally:
            # Always release the connection, even on insert failure.
            conn.close()
    return merged_df
def main():
    """Process every trading pair listed in the SYMBOLS configuration.

    Each symbol is handled independently: a failure on one is logged (with
    its traceback) and the loop moves on to the next.
    """
    logger.info("Starting main process for symbols: %s", SYMBOLS)
    for symbol in SYMBOLS:
        try:
            process_symbol(symbol, INTERVAL)
        except Exception as e:
            # logger.exception keeps the message but also records the
            # traceback, which logger.error(f"...") discarded.
            logger.exception(f"Failed to process symbol {symbol}: {e}")
    logger.info("Script completed successfully")
if __name__ == "__main__":
    # Allow ad-hoc runs of process_symbol from the command line:
    #   python download_binance_kline.py BTCUSDT [1d]
    import sys
    if len(sys.argv) > 1:
        # Trading pair (and optional interval) supplied as CLI arguments.
        symbol = sys.argv[1]
        interval = sys.argv[2] if len(sys.argv) > 2 else INTERVAL
        process_symbol(symbol, interval)
    else:
        # No arguments: process every configured symbol.
        main()