"""Download Binance USD-M futures monthly kline archives and load them into PostgreSQL.

For each configured symbol, the available monthly zip files are discovered
through the public S3 listing endpoint, downloaded through a local proxy,
parsed with pandas, and bulk-inserted into the ``binance_kline`` table
(duplicate rows are skipped via ``ON CONFLICT DO NOTHING``).
"""

import requests
import os
import pandas as pd
from io import BytesIO
import psycopg2
from psycopg2.extras import execute_values
import logging
from datetime import datetime
import xml.etree.ElementTree as ET

# Logging configuration
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configuration parameters
BASE_URL = "https://data.binance.vision/data/futures/um/monthly/klines/"
# Multiple trading pairs may be listed, e.g. ["BOBUSDT", "BTCUSDT", "ETHUSDT"]
SYMBOLS = ["BOBUSDT", "BTCUSDT"]  # BOBUSDT data is available from 2025-11
INTERVAL = "1d"
START_DATE = "2021-01"
END_DATE = datetime.now().strftime("%Y-%m")

# Local proxy used for every outbound HTTP(S) request.
# Previously duplicated inside each download function.
PROXIES = {
    'http': 'http://localhost:1080',
    'https': 'http://localhost:1080',
}

# Default timeout (seconds) for download requests so a stalled connection
# cannot hang the script forever.
REQUEST_TIMEOUT = 60

# PostgreSQL configuration
DB_CONFIG = {
    "host": "localhost",
    "database": "your_database",
    "user": "your_username",
    "password": "your_password",
    "port": 5432,
}

# Column names of a raw Binance kline CSV row, in file order.
KLINE_COLUMNS = [
    "open_time", "open", "high", "low", "close", "volume",
    "close_time", "quote_asset_volume", "number_of_trades",
    "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore",
]


def download_kline_data(symbol, interval, year_month):
    """Download one monthly kline zip archive by symbol/interval/month.

    Args:
        symbol: Trading pair, e.g. "BTCUSDT".
        interval: Kline interval, e.g. "1d".
        year_month: Month to fetch in "YYYY-MM" format.

    Returns:
        BytesIO containing the zip payload, or None if the download failed.
    """
    # Split year and month
    year, month = year_month.split("-")
    filename = f"{symbol}-{interval}-{year}-{month}.zip"
    # BUG FIX: the URL previously omitted the archive filename entirely,
    # so every request targeted a non-existent path.
    url = f"{BASE_URL}{symbol}/{interval}/{filename}"
    logger.info(f"Downloading {url}")
    try:
        response = requests.get(url, proxies=PROXIES, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        logger.info(f"Downloaded {filename} successfully")
        return BytesIO(response.content)
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download {filename}: {e}")
        return None


def parse_kline_data(zip_data, symbol):
    """Parse a downloaded kline zip archive into a typed DataFrame.

    Args:
        zip_data: File-like object holding the zip archive.
        symbol: Trading pair the data belongs to; stored in a ``symbol`` column.

    Returns:
        DataFrame with parsed timestamps and numeric columns, or None on error.
    """
    try:
        df = pd.read_csv(zip_data, compression='zip', header=None,
                         names=KLINE_COLUMNS)
        # Some archives ship with a header row; drop it if present.
        df = df[df["open_time"] != "open_time"]
        # Tag every row with its trading pair.
        df["symbol"] = symbol
        # Binance timestamps are epoch milliseconds.
        df["open_time"] = pd.to_datetime(df["open_time"], unit='ms')
        df["close_time"] = pd.to_datetime(df["close_time"], unit='ms')
        # Convert numeric columns (read as strings when a header row existed).
        numeric_columns = [
            "open", "high", "low", "close", "volume", "quote_asset_volume",
            "number_of_trades", "taker_buy_base_asset_volume",
            "taker_buy_quote_asset_volume", "ignore",
        ]
        df[numeric_columns] = df[numeric_columns].astype(float)
        df["number_of_trades"] = df["number_of_trades"].astype(int)
        logger.info(f"Parsed {len(df)} rows of K线 data")
        return df
    except Exception as e:
        logger.error(f"Failed to parse K线 data: {e}")
        return None


def create_connection():
    """Create a PostgreSQL connection from DB_CONFIG; return None on failure."""
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        logger.info("Connected to PostgreSQL database")
        return conn
    except psycopg2.OperationalError as e:
        logger.error(f"Failed to connect to PostgreSQL: {e}")
        return None


def create_table(conn):
    """Create the binance_kline table if it does not already exist.

    The (symbol, open_time) UNIQUE constraint backs the ON CONFLICT clause
    used by insert_data().
    """
    create_table_query = """
    CREATE TABLE IF NOT EXISTS binance_kline (
        id SERIAL PRIMARY KEY,
        symbol VARCHAR(20) NOT NULL,
        open_time TIMESTAMP NOT NULL,
        open NUMERIC(18, 8) NOT NULL,
        high NUMERIC(18, 8) NOT NULL,
        low NUMERIC(18, 8) NOT NULL,
        close NUMERIC(18, 8) NOT NULL,
        volume NUMERIC(18, 8) NOT NULL,
        close_time TIMESTAMP NOT NULL,
        quote_asset_volume NUMERIC(18, 8) NOT NULL,
        number_of_trades INTEGER NOT NULL,
        taker_buy_base_asset_volume NUMERIC(18, 8) NOT NULL,
        taker_buy_quote_asset_volume NUMERIC(18, 8) NOT NULL,
        ignore NUMERIC(18, 8) NOT NULL,
        UNIQUE(symbol, open_time)
    );
    """
    try:
        with conn.cursor() as cur:
            cur.execute(create_table_query)
            conn.commit()
            logger.info("Created binance_kline table")
    except psycopg2.Error as e:
        logger.error(f"Failed to create table: {e}")
        conn.rollback()


def insert_data(conn, df):
    """Bulk-insert kline rows; rows whose (symbol, open_time) exist are skipped.

    Args:
        conn: Open psycopg2 connection.
        df: DataFrame produced by parse_kline_data() — KLINE_COLUMNS plus
            a ``symbol`` column.
    """
    insert_query = """
    INSERT INTO binance_kline (
        symbol, open_time, open, high, low, close, volume, close_time,
        quote_asset_volume, number_of_trades,
        taker_buy_base_asset_volume, taker_buy_quote_asset_volume, ignore
    ) VALUES %s
    ON CONFLICT (symbol, open_time) DO NOTHING;
    """
    # BUG FIX: the previous code selected df.columns[1:] ("skip id column"),
    # but the frame has no id column — that dropped open_time and left the
    # remaining values misaligned with the INSERT column list. Reorder the
    # frame explicitly so values match the statement position-for-position.
    ordered = df[["symbol"] + KLINE_COLUMNS]
    data = [tuple(row) for row in ordered.to_numpy()]
    try:
        with conn.cursor() as cur:
            execute_values(cur, insert_query, data)
            conn.commit()
            logger.info(f"Inserted {len(data)} rows into database")
    except psycopg2.Error as e:
        logger.error(f"Failed to insert data: {e}")
        conn.rollback()


def list_s3_files(url, timeout=10):
    """List the full download URLs of all zip files in an S3 bucket listing.

    Args:
        url: S3 bucket listing URL, e.g.
            https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/BOBUSDT/1d/
        timeout: Request timeout in seconds (default 10).

    Returns:
        list: Complete file URLs; empty list on any request/parse failure.
    """
    logger.info(f"Listing files from {url}")
    try:
        response = requests.get(url, timeout=timeout, proxies=PROXIES)
        response.raise_for_status()
        # Parse the S3 XML listing.
        root = ET.fromstring(response.content)
        # Namespace used by S3 listing XML.
        ns = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'}
        # Base URL for the actual file downloads.
        base_download_url = "https://data.binance.vision/"
        # Extract every <Key> text and build the full download URL;
        # only zip archives are of interest.
        file_urls = []
        for key_elem in root.findall('.//s3:Key', ns):
            file_path = key_elem.text
            if file_path and file_path.endswith('.zip'):
                file_urls.append(base_download_url + file_path)
        logger.info(f"Found {len(file_urls)} files")
        return file_urls
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to list files from {url}: {e}")
        return []
    except ET.ParseError as e:
        logger.error(f"Failed to parse XML response: {e}")
        return []
    # NOTE: KeyboardInterrupt was previously caught and swallowed here;
    # it now propagates so Ctrl-C actually stops the script.


def download_kline_data_by_url(url):
    """Download a kline zip archive from a complete URL.

    Args:
        url: Full download URL of the archive.

    Returns:
        BytesIO containing the payload, or None if the download failed.
    """
    logger.info(f"Downloading {url}")
    try:
        response = requests.get(url, proxies=PROXIES, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        filename = os.path.basename(url)
        # BUG FIX: the success message previously logged a broken
        # placeholder instead of the filename.
        logger.info(f"Downloaded {filename} successfully")
        return BytesIO(response.content)
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download {url}: {e}")
        return None


def main():
    """Discover, download, parse and store monthly klines for every symbol."""
    conn = create_connection()
    if not conn:
        return
    try:
        create_table(conn)
        for symbol in SYMBOLS:
            # Discover the available archives via the S3 listing endpoint.
            s3_url = (
                "https://s3-ap-northeast-1.amazonaws.com/data.binance.vision"
                f"?delimiter=/&prefix=data/futures/um/monthly/klines/{symbol}/{INTERVAL}/"
            )
            file_urls = list_s3_files(s3_url)
            if not file_urls:
                logger.warning(f"No files found for {symbol}-{INTERVAL}")
                continue
            for file_url in file_urls:
                filename = os.path.basename(file_url)
                if not filename.endswith('.zip'):
                    continue
                # Expected filename format: symbol-interval-year-month.zip
                parts = filename[:-4].split('-')  # strip .zip, then split
                if len(parts) != 4:
                    # BUG FIX: previously logged a broken placeholder
                    # instead of the offending filename.
                    logger.warning(f"Invalid filename format: {filename}")
                    continue
                file_symbol, file_interval, year, month = parts
                zip_data = download_kline_data_by_url(file_url)
                if zip_data is None:
                    continue
                df = parse_kline_data(zip_data, file_symbol)
                if df is None or df.empty:
                    continue
                insert_data(conn, df)
    finally:
        # Close the connection even if an unexpected error aborts the loop.
        conn.close()
    logger.info("Script completed successfully")


if __name__ == "__main__":
    main()