def validate_kline_data(df, expected_interval=None):
    """
    Validate the integrity and plausibility of one batch of K-line data.

    Parameters:
        df: DataFrame holding one batch of K-line rows for a single symbol.
        expected_interval: pandas.Timedelta expected between consecutive
            open_time values. Defaults to 1 day, which matches the previous
            hard-coded behavior; pass e.g. pd.Timedelta(hours=1) for 1h bars.

    Returns:
        (bool, str): validity flag and a human-readable explanation.
    """
    if df.empty:
        return False, "数据为空"

    # 1. All required columns must be present.
    required_columns = ['symbol', 'open_time', 'close_time', 'open', 'high', 'low', 'close', 'volume']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        return False, f"缺少必要列: {missing_columns}"

    # 2. Price/volume columns must have numeric dtypes.
    numeric_columns = ['open', 'high', 'low', 'close', 'volume']
    for col in numeric_columns:
        if not pd.api.types.is_numeric_dtype(df[col]):
            return False, f"列 {col} 不是数值类型"

    # 3. open_time spacing should mostly match the expected interval.
    #    A small fraction of outliers (e.g. around month boundaries) is
    #    tolerated: at most 10% of the gaps may deviate.
    if expected_interval is None:
        expected_interval = pd.Timedelta(days=1)
    df_sorted = df.sort_values('open_time')
    time_diff = df_sorted['open_time'].diff().dropna()
    if not time_diff.empty:
        if (time_diff != expected_interval).mean() > 0.1:
            return False, f"时间间隔不符合预期,预期{expected_interval},实际差异较大"

    # 4. Price sanity: high >= low, and high/low bound open and close.
    if (df['high'] < df['low']).any():
        return False, "存在最高价小于最低价的记录"
    if ((df['high'] < df['open']) | (df['high'] < df['close'])).any():
        return False, "存在最高价小于开盘价或收盘价的记录"
    if ((df['low'] > df['open']) | (df['low'] > df['close'])).any():
        return False, "存在最低价大于开盘价或收盘价的记录"

    # 5. Traded volume cannot be negative.
    if (df['volume'] < 0).any():
        return False, "存在交易量为负数的记录"

    # 6. Each candle must open strictly before it closes.
    if (df['open_time'] >= df['close_time']).any():
        return False, "存在open_time大于等于close_time的记录"

    # 7. A batch must contain exactly one trading pair.
    if df['symbol'].nunique() > 1:
        return False, "同一批次数据包含多个交易对"

    # 8. open_time must be unique within the batch.
    if df['open_time'].duplicated().any():
        return False, "存在重复的open_time记录"

    # 9. Values must fit NUMERIC(20, 8): at most 12 integer digits.
    #    NOTE: the former bound `10**12 - 1e-8` rounds to exactly 1e12 in
    #    float64 (the spacing of doubles near 1e12 is ~1.2e-4), so the old
    #    `> max_value` test let abs(x) == 1e12 through even though it
    #    overflows the column. Use a strict >= bound instead.
    max_abs = 10 ** 12
    for col in numeric_columns:
        if (df[col].abs() >= max_abs).any():
            return False, f"列 {col} 的数值超出数据库字段范围 (-{max_abs}, {max_abs})"

    return True, "数据完整性校验通过"
def create_table(conn, recreate=True):
    """
    Create the binance_kline table.

    Parameters:
        conn: open psycopg2 connection.
        recreate: when True (default, matching the previous behavior) any
            existing table is dropped first — THIS DESTROYS ALL STORED
            K-LINE DATA. Pass False to keep an existing table and only
            create it when missing (CREATE TABLE IF NOT EXISTS).
    """
    drop_table_query = "DROP TABLE IF EXISTS binance_kline;"
    # NUMERIC(20, 8): up to 12 integer digits plus 8 fractional digits,
    # matching the value range enforced by validate_kline_data().
    exists_clause = "" if recreate else "IF NOT EXISTS "
    create_table_query = f"""
    CREATE TABLE {exists_clause}binance_kline (
        id SERIAL PRIMARY KEY,
        symbol VARCHAR(20) NOT NULL,
        open_time TIMESTAMP NOT NULL,
        open NUMERIC(20, 8) NOT NULL,
        high NUMERIC(20, 8) NOT NULL,
        low NUMERIC(20, 8) NOT NULL,
        close NUMERIC(20, 8) NOT NULL,
        volume NUMERIC(20, 8) NOT NULL,
        close_time TIMESTAMP NOT NULL,
        quote_asset_volume NUMERIC(20, 8) NOT NULL,
        number_of_trades INTEGER NOT NULL,
        taker_buy_base_asset_volume NUMERIC(20, 8) NOT NULL,
        taker_buy_quote_asset_volume NUMERIC(20, 8) NOT NULL,
        ignore NUMERIC(20, 8) NOT NULL,
        UNIQUE(symbol, open_time)
    );
    """
    try:
        with conn.cursor() as cur:
            if recreate:
                # Destructive: wipes all previously downloaded history.
                logger.warning("Dropping existing binance_kline table - all stored K-line data will be lost")
                cur.execute(drop_table_query)
            cur.execute(create_table_query)
        conn.commit()
        logger.info("Created binance_kline table with updated schema")
    except psycopg2.Error as e:
        logger.error(f"Failed to create table: {e}")
        conn.rollback()
def insert_data(conn, df):
    """
    Insert K-line rows into binance_kline, skipping rows whose
    (symbol, open_time) pair already exists.

    Parameters:
        conn: open psycopg2 connection.
        df: validated K-line DataFrame (see validate_kline_data).

    Existing rows are detected up front with batched SELECTs; the INSERT
    additionally carries ON CONFLICT DO NOTHING as a safety net, so a
    failed duplicate check degrades gracefully instead of aborting.
    """
    if df.empty:
        logger.info("No data to insert")
        return

    # Final integrity check before touching the database.
    is_valid, message = validate_kline_data(df)
    if not is_valid:
        logger.error(f"插入数据库前数据校验失败: {message}")
        return

    total_records = len(df)
    logger.info(f"Preparing to insert {total_records} records")

    # Column order shared by the DataFrame extraction and the INSERT below.
    insert_columns = [
        "symbol", "open_time", "open", "high", "low", "close", "volume",
        "close_time", "quote_asset_volume", "number_of_trades",
        "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
    ]

    # 1. Collect the distinct (symbol, open_time) pairs to probe.
    symbol_time_df = df[['symbol', 'open_time']].drop_duplicates()
    # Tuples (not lists) so psycopg2 adapts them as SQL row values.
    symbol_time_pairs = [tuple(pair) for pair in symbol_time_df.values.tolist()]

    # 2. Query existing pairs in batches to keep each statement small.
    existing_pairs = []
    batch_size = 500  # pairs per SELECT
    df_filtered = df

    if symbol_time_pairs:
        # Parameterized row-value IN list — psycopg2 expands the
        # tuple-of-tuples itself, so no string interpolation / SQL injection.
        check_query = (
            "SELECT symbol, open_time FROM binance_kline "
            "WHERE (symbol, open_time) IN %s"
        )
        for i in range(0, len(symbol_time_pairs), batch_size):
            batch_pairs = symbol_time_pairs[i:i + batch_size]
            try:
                with conn.cursor() as cur:
                    cur.execute(check_query, (tuple(batch_pairs),))
                    existing_pairs.extend(cur.fetchall())
            except psycopg2.Error as e:
                logger.error(f"Failed to check existing data in batch {i//batch_size + 1}: {e}")
                logger.info("Falling back to ON CONFLICT mechanism for this batch")
                # On any failed batch, insert everything and let
                # ON CONFLICT DO NOTHING weed out the duplicates.
                df_filtered = df
                break
        else:
            # All batches succeeded — filter out rows already stored.
            existing_set = set((row[0], row[1]) for row in existing_pairs)
            logger.info(f"Found {len(existing_set)} existing records in database")

            if existing_set:
                # 3. Anti-join via merge(indicator=True): keep only rows
                #    that do not appear in the existing-pairs frame.
                existing_df = pd.DataFrame(list(existing_set), columns=['symbol', 'open_time'])
                merged_df = df.merge(
                    existing_df,
                    on=['symbol', 'open_time'],
                    how='left',
                    indicator=True
                )
                df_filtered = merged_df[merged_df['_merge'] == 'left_only'].drop('_merge', axis=1)

            if df_filtered.empty:
                logger.info("All records already exist in database, skipping insertion")
                return

            logger.info(f"Filtered to {len(df_filtered)} new records to insert (removed {total_records - len(df_filtered)} existing records)")

    # 4. Bulk insert; ON CONFLICT guards against races / failed checks.
    insert_query = """
        INSERT INTO binance_kline (
            symbol, open_time, open, high, low, close, volume,
            close_time, quote_asset_volume, number_of_trades,
            taker_buy_base_asset_volume, taker_buy_quote_asset_volume, ignore
        ) VALUES %s
        ON CONFLICT (symbol, open_time) DO NOTHING;
    """
    data = [tuple(row) for row in df_filtered[insert_columns].to_numpy()]

    # 5. Execute the insert in one round trip.
    try:
        with conn.cursor() as cur:
            execute_values(cur, insert_query, data)
            conn.commit()
            logger.info(f"Successfully inserted {len(data)} new rows into database")
    except psycopg2.Error as e:
        logger.error(f"Failed to insert data: {e}")
        conn.rollback()
执行插入 @@ -194,7 +323,7 @@ def insert_data(conn, df): with conn.cursor() as cur: execute_values(cur, insert_query, data) conn.commit() - logger.info(f"Inserted {len(data)} new rows into database") + logger.info(f"Successfully inserted {len(data)} new rows into database") except psycopg2.Error as e: logger.error(f"Failed to insert data: {e}") conn.rollback() @@ -313,8 +442,14 @@ def process_symbol(symbol, interval=INTERVAL): df["open_time"] = pd.to_datetime(df["open_time"].astype(float), unit='ms') df["close_time"] = pd.to_datetime(df["close_time"].astype(float), unit='ms') + # 数据完整性校验 + is_valid, message = validate_kline_data(df) + if not is_valid: + logger.error(f"文件 {os.path.basename(file_url)} 数据校验失败: {message}") + continue + all_dfs.append(df) - logger.info(f"Processed {os.path.basename(file_url)} with {len(df)} rows") + logger.info(f"Processed {os.path.basename(file_url)} with {len(df)} rows, 数据校验通过") except Exception as e: logger.error(f"Failed to process {file_url}: {e}") continue diff --git a/settings.toml b/settings.toml index 9ed5dee..ba83451 100644 --- a/settings.toml +++ b/settings.toml @@ -25,6 +25,6 @@ HTTPS_PROXY = "http://localhost:1080" # PostgreSQL数据库配置 DB_HOST = "localhost" DB_PORT = 5432 -DB_DATABASE = "your_database" -DB_USER = "your_username" -DB_PASSWORD = "your_password" +DB_DATABASE = "graphql-engine" +DB_USER = "root" +# NOTE(review): never commit real credentials; load the password from an environment variable or an untracked secrets file. +DB_PASSWORD = "REDACTED_SET_VIA_ENVIRONMENT"