import requests
import os
import pandas as pd
from io import BytesIO
import psycopg2
from psycopg2.extras import execute_values
import logging
from datetime import datetime
import xml.etree.ElementTree as ET
from download_unzip_csv import download_unzip_csv
from dynaconf import Dynaconf

# Logging configuration
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Initialize Dynaconf settings (environment switched via BINANCE_ENV)
settings = Dynaconf(
    envvar_prefix="DYNACONF",
    settings_files=["settings.toml"],
    environments=True,
    load_dotenv=False,
    default_env="default",
    env_switcher="BINANCE_ENV",
)

# Core parameters
BASE_URL = settings.BASE_URL
SYMBOLS = settings.SYMBOLS
INTERVAL = settings.INTERVAL

# Optional HTTP(S) proxy configuration
PROXIES = {
    'http': settings.HTTP_PROXY,
    'https': settings.HTTPS_PROXY,
} if settings.PROXY_ENABLED else None

# PostgreSQL connection parameters
DB_CONFIG = {
    "host": settings.DB_HOST,
    "database": settings.DB_DATABASE,
    "user": settings.DB_USER,
    "password": settings.DB_PASSWORD,
    "port": settings.DB_PORT,
}

# Column names of the raw Binance kline CSV files
KLINE_COLUMNS = settings.KLINE_COLUMNS


def validate_kline_data(df):
    """Validate integrity and sanity of a kline DataFrame.

    Args:
        df: DataFrame holding kline rows (must carry symbol/open_time/
            close_time/OHLC/volume columns).

    Returns:
        (bool, str): validity flag and a human-readable (Chinese) reason.
    """
    if df.empty:
        return False, "数据为空"

    # 1. Required columns must be present
    required_columns = ['symbol', 'open_time', 'close_time',
                        'open', 'high', 'low', 'close', 'volume']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        return False, f"缺少必要列: {missing_columns}"

    # 2. Price/volume columns must be numeric
    numeric_columns = ['open', 'high', 'low', 'close', 'volume']
    for col in numeric_columns:
        if not pd.api.types.is_numeric_dtype(df[col]):
            return False, f"列 {col} 不是数值类型"

    # 3. open_time continuity: daily bars should be ~1 day apart.
    #    A small fraction of irregular gaps (month boundaries, listings)
    #    is tolerated; more than 10% irregular fails validation.
    df_sorted = df.sort_values('open_time')
    time_diff = df_sorted['open_time'].diff().dropna()
    if not time_diff.empty:
        expected_diff = pd.Timedelta(days=1)  # default 1-day interval
        if (time_diff != expected_diff).mean() > 0.1:
            return False, f"时间间隔不符合预期,预期{expected_diff},实际差异较大"

    # 4. Price sanity: high >= low, high >= open/close, low <= open/close
    if (df['high'] < df['low']).any():
        return False, "存在最高价小于最低价的记录"
    if ((df['high'] < df['open']) | (df['high'] < df['close'])).any():
        return False, "存在最高价小于开盘价或收盘价的记录"
    if ((df['low'] > df['open']) | (df['low'] > df['close'])).any():
        return False, "存在最低价大于开盘价或收盘价的记录"

    # 5. Volume must be non-negative
    if (df['volume'] < 0).any():
        return False, "存在交易量为负数的记录"

    # 6. open_time must strictly precede close_time
    if (df['open_time'] >= df['close_time']).any():
        return False, "存在open_time大于等于close_time的记录"

    # 7. A single batch must hold a single symbol
    if df['symbol'].nunique() > 1:
        return False, "同一批次数据包含多个交易对"

    # 8. open_time must be unique within the batch
    if df['open_time'].duplicated().any():
        return False, "存在重复的open_time记录"

    # 9. Values must fit the DB column type numeric(20,8):
    #    20 digits total, 8 fractional => at most 12 integer digits.
    max_value = 10**12 - 1e-8  # 999999999999.99999999
    min_value = -max_value
    for col in numeric_columns:
        if (df[col] > max_value).any() or (df[col] < min_value).any():
            return False, f"列 {col} 的数值超出数据库字段范围 [-{max_value:.8f}, {max_value:.8f}]"

    return True, "数据完整性校验通过"


def download_kline_data(symbol, interval, year_month):
    """Download one month of kline data as a zip archive.

    Args:
        symbol: trading pair, e.g. "BTCUSDT".
        interval: bar interval, e.g. "1d".
        year_month: month in "YYYY-MM" form.

    Returns:
        BytesIO with the zip content, or None on failure.
    """
    year, month = year_month.split("-")
    filename = f"{symbol}-{interval}-{year}-{month}.zip"
    # Fix: URL and log messages previously contained a literal "(unknown)"
    # placeholder instead of the computed filename.
    url = f"{BASE_URL}{symbol}/{interval}/{filename}"
    logger.info(f"Downloading {url}")
    try:
        # Honor the configured proxy (consistent with list_s3_files)
        response = requests.get(url, proxies=PROXIES)
        response.raise_for_status()
        logger.info(f"Downloaded {filename} successfully")
        return BytesIO(response.content)
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download {filename}: {e}")
        return None


def parse_kline_data(zip_data, symbol):
    """Parse a downloaded kline zip into a validated DataFrame.

    Args:
        zip_data: BytesIO containing the zip archive.
        symbol: trading pair to stamp onto every row.

    Returns:
        Parsed DataFrame, or None if parsing/validation fails.
    """
    try:
        df = pd.read_csv(zip_data, compression='zip',
                         header=None, names=KLINE_COLUMNS)

        # Drop an embedded header row, if present
        df = df[df["open_time"] != "open_time"]

        # Tag the symbol
        df["symbol"] = symbol

        # Fix: after the header-row filter the columns are object dtype;
        # convert to float before interpreting as millisecond timestamps
        # (matches process_symbol's handling).
        df["open_time"] = pd.to_datetime(df["open_time"].astype(float), unit='ms')
        df["close_time"] = pd.to_datetime(df["close_time"].astype(float), unit='ms')

        # Convert numeric columns
        numeric_columns = ["open", "high", "low", "close", "volume",
                           "quote_asset_volume", "number_of_trades",
                           "taker_buy_base_asset_volume",
                           "taker_buy_quote_asset_volume", "ignore"]
        df[numeric_columns] = df[numeric_columns].astype(float)
        df["number_of_trades"] = df["number_of_trades"].astype(int)

        # Integrity validation
        is_valid, message = validate_kline_data(df)
        if not is_valid:
            logger.error(f"K线数据校验失败: {message}")
            return None

        logger.info(f"Parsed {len(df)} rows of K线 data, 数据校验通过")
        return df
    except Exception as e:
        logger.error(f"Failed to parse K线 data: {e}")
        return None


def create_connection():
    """Open a PostgreSQL connection from DB_CONFIG; None on failure."""
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        logger.info("Connected to PostgreSQL database")
        return conn
    except psycopg2.OperationalError as e:
        logger.error(f"Failed to connect to PostgreSQL: {e}")
        return None


def create_table(conn):
    """(Re)create the kline table.

    WARNING: destructive — drops bn_futures_kline_1d if it exists, so any
    previously inserted data is discarded each run.
    """
    drop_table_query = """
    DROP TABLE IF EXISTS bn_futures_kline_1d;
    """
    create_table_query = """
    CREATE TABLE bn_futures_kline_1d (
        id SERIAL PRIMARY KEY,
        symbol VARCHAR(20) NOT NULL,
        open_time TIMESTAMP NOT NULL,
        open NUMERIC(20, 8) NOT NULL,
        high NUMERIC(20, 8) NOT NULL,
        low NUMERIC(20, 8) NOT NULL,
        close NUMERIC(20, 8) NOT NULL,
        volume NUMERIC(20, 8) NOT NULL,
        close_time TIMESTAMP NOT NULL,
        quote_asset_volume NUMERIC(20, 8) NOT NULL,
        number_of_trades INTEGER NOT NULL,
        taker_buy_base_asset_volume NUMERIC(20, 8) NOT NULL,
        taker_buy_quote_asset_volume NUMERIC(20, 8) NOT NULL,
        ignore NUMERIC(20, 8) NOT NULL,
        UNIQUE(symbol, open_time)
    );
    """
    try:
        with conn.cursor() as cur:
            cur.execute(drop_table_query)
            cur.execute(create_table_query)
            conn.commit()
            logger.info("Created bn_futures_kline_1d table with updated schema")
    except psycopg2.Error as e:
        logger.error(f"Failed to create table: {e}")
        conn.rollback()


def insert_data(conn, df):
    """Insert kline rows, skipping (symbol, open_time) pairs already stored.

    Pre-filters against existing rows in batches; on any batch-check failure
    it falls back to relying on ON CONFLICT DO NOTHING.
    """
    if df.empty:
        logger.info("No data to insert")
        return

    # Final integrity check before touching the database
    is_valid, message = validate_kline_data(df)
    if not is_valid:
        logger.error(f"插入数据库前数据校验失败: {message}")
        return

    total_records = len(df)
    logger.info(f"Preparing to insert {total_records} records")

    # Column order for insertion (must match insert_query below)
    insert_columns = [
        "symbol", "open_time", "high", "low", "open", "close", "volume",
        "close_time", "quote_asset_volume", "number_of_trades",
        "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore",
    ]

    # 1. Collect the distinct (symbol, open_time) pairs to be inserted
    symbol_time_df = df[['symbol', 'open_time']].drop_duplicates()
    symbol_time_pairs = [tuple(p) for p in symbol_time_df.values.tolist()]

    # 2. Query which pairs already exist, in batches to keep queries bounded
    existing_pairs = []
    batch_size = 500
    if symbol_time_pairs:
        for i in range(0, len(symbol_time_pairs), batch_size):
            batch_pairs = symbol_time_pairs[i:i + batch_size]
            # Fix: parameterized query instead of f-string value interpolation
            # (the old code was vulnerable to SQL injection / quoting bugs).
            check_query = """
                SELECT symbol, open_time FROM bn_futures_kline_1d
                WHERE (symbol, open_time) IN %s
            """
            try:
                with conn.cursor() as cur:
                    cur.execute(check_query, (tuple(batch_pairs),))
                    existing_pairs.extend(cur.fetchall())
            except psycopg2.Error as e:
                logger.error(f"Failed to check existing data in batch {i//batch_size + 1}: {e}")
                logger.info("Falling back to ON CONFLICT mechanism for this batch")
                # On failure, insert everything and let ON CONFLICT dedupe
                df_filtered = df
                break
        else:
            # All batches succeeded: filter out rows already present
            existing_set = set((row[0], row[1]) for row in existing_pairs)
            logger.info(f"Found {len(existing_set)} existing records in database")

            if existing_set:
                # 3. Anti-join via merge(indicator=True) — efficient, no apply
                existing_df = pd.DataFrame(list(existing_set),
                                           columns=['symbol', 'open_time'])
                merged_df = df.merge(
                    existing_df,
                    on=['symbol', 'open_time'],
                    how='left',
                    indicator=True,
                )
                df_filtered = merged_df[merged_df['_merge'] == 'left_only'].drop('_merge', axis=1)
            else:
                df_filtered = df

            if df_filtered.empty:
                logger.info("All records already exist in database, skipping insertion")
                return
            logger.info(f"Filtered to {len(df_filtered)} new records to insert (removed {total_records - len(df_filtered)} existing records)")
    else:
        df_filtered = df

    # 4. Build the insert statement (ON CONFLICT guards residual duplicates)
    insert_query = """
    INSERT INTO bn_futures_kline_1d (
        symbol, open_time, high, low, open, close, volume, close_time,
        quote_asset_volume, number_of_trades,
        taker_buy_base_asset_volume, taker_buy_quote_asset_volume, ignore
    ) VALUES %s
    ON CONFLICT (symbol, open_time) DO NOTHING;
    """
    data = [tuple(row) for row in df_filtered[insert_columns].to_numpy()]

    # 5. Execute the batched insert
    try:
        with conn.cursor() as cur:
            execute_values(cur, insert_query, data)
            conn.commit()
            logger.info(f"Successfully inserted {len(data)} new rows into database")
    except psycopg2.Error as e:
        logger.error(f"Failed to insert data: {e}")
        conn.rollback()


def list_s3_files(url, timeout=10):
    """List all .zip file URLs from a Binance S3 bucket listing.

    Args:
        url: S3 bucket listing URL, e.g.
            https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/BOBUSDT/1d/
        timeout: request timeout in seconds (default 10).

    Returns:
        list[str]: full download URLs of the .zip objects (empty on error).
    """
    logger.info(f"Listing files from {url}")
    try:
        # Use the configured proxy when enabled
        if PROXIES:
            response = requests.get(url, timeout=timeout, proxies=PROXIES)
        else:
            response = requests.get(url, timeout=timeout)
        response.raise_for_status()

        # Parse the S3 XML listing
        root = ET.fromstring(response.content)
        ns = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'}
        base_download_url = "https://data.binance.vision/"

        file_urls = []
        for key_elem in root.findall('.//s3:Key', ns):
            file_path = key_elem.text
            # Keep only zip archives
            if file_path and file_path.endswith('.zip'):
                file_urls.append(base_download_url + file_path)

        logger.info(f"Found {len(file_urls)} files")
        return file_urls
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to list files from {url}: {e}")
        return []
    except ET.ParseError as e:
        logger.error(f"Failed to parse XML response: {e}")
        return []
    except KeyboardInterrupt:
        # NOTE(review): swallowing KeyboardInterrupt prevents Ctrl-C from
        # stopping the script here — consider letting it propagate.
        logger.error(f"Request to {url} was interrupted")
        return []


def download_kline_data_by_url(url):
    """Download kline data from a full URL.

    Args:
        url: complete download URL of a kline archive.

    Returns:
        BytesIO with the response body, or None on failure.
    """
    logger.info(f"Downloading {url}")
    try:
        # Honor the configured proxy (consistent with list_s3_files)
        response = requests.get(url, proxies=PROXIES)
        response.raise_for_status()
        filename = os.path.basename(url)
        # Fix: log the actual filename instead of a literal "(unknown)"
        logger.info(f"Downloaded {filename} successfully")
        return BytesIO(response.content)
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download {url}: {e}")
        return None


def process_symbol(symbol, interval=INTERVAL):
    """Process all monthly kline files for one symbol: download, parse,
    merge, deduplicate, and load into PostgreSQL.

    Args:
        symbol: trading pair, e.g. "BTCUSDT".
        interval: bar interval, e.g. "1d" (defaults to global INTERVAL).

    Returns:
        pandas.DataFrame: merged kline data, or None if nothing was processed.
    """
    logger.info(f"Processing symbol: {symbol}, interval: {interval}")

    # Build the S3 listing URL for this symbol/interval
    s3_url = f"https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/{symbol}/{interval}/"

    file_urls = list_s3_files(s3_url)
    if not file_urls:
        logger.warning(f"No files found for {symbol}-{interval}")
        return None

    all_dfs = []
    for file_url in file_urls:
        if not file_url.endswith('.zip'):
            continue
        try:
            # Download and parse via the shared helper
            df = download_unzip_csv(file_url, header=None, names=KLINE_COLUMNS)
            df["symbol"] = symbol

            # Convert ms timestamps (cast to float first to avoid a
            # FutureWarning on object-dtype input)
            df["open_time"] = pd.to_datetime(df["open_time"].astype(float), unit='ms')
            df["close_time"] = pd.to_datetime(df["close_time"].astype(float), unit='ms')

            # Validate each file before merging
            is_valid, message = validate_kline_data(df)
            if not is_valid:
                logger.error(f"文件 {os.path.basename(file_url)} 数据校验失败: {message}")
                continue

            all_dfs.append(df)
            logger.info(f"Processed {os.path.basename(file_url)} with {len(df)} rows, 数据校验通过")
        except Exception as e:
            logger.error(f"Failed to process {file_url}: {e}")
            continue

    if not all_dfs:
        logger.warning(f"No data processed for {symbol}-{interval}")
        return None

    # Merge and deduplicate across months
    merged_df = pd.concat(all_dfs, ignore_index=True)
    logger.info(f"Merged {len(all_dfs)} files into a single DataFrame with {len(merged_df)} rows")
    merged_df = merged_df.drop_duplicates(subset=["symbol", "open_time"])
    logger.info(f"After deduplication, {len(merged_df)} rows remain")

    # Load into PostgreSQL
    conn = create_connection()
    if conn:
        try:
            create_table(conn)
            insert_data(conn, merged_df)
            # Fix: the old message claimed len(merged_df) rows were inserted
            # even when insert_data skipped duplicates or failed.
            logger.info(f"Finished database load for {symbol} ({len(merged_df)} candidate rows)")
        finally:
            conn.close()

    return merged_df


def main():
    """Process every configured symbol, isolating per-symbol failures."""
    logger.info("Starting main process for symbols: %s", SYMBOLS)
    for symbol in SYMBOLS:
        try:
            process_symbol(symbol, INTERVAL)
        except Exception as e:
            logger.error(f"Failed to process symbol {symbol}: {e}")
    logger.info("Script completed successfully")


if __name__ == "__main__":
    # CLI: optional symbol [interval] arguments; otherwise run all symbols
    import sys
    if len(sys.argv) > 1:
        symbol = sys.argv[1]
        interval = sys.argv[2] if len(sys.argv) > 2 else INTERVAL
        process_symbol(symbol, interval)
    else:
        main()