优化K线数据完整性校验功能,添加多维度校验和关键节点验证

This commit is contained in:
yhydev
2026-01-14 13:31:56 +08:00
parent 28d569bf1e
commit a821ed39fb
2 changed files with 175 additions and 40 deletions

View File

@@ -47,6 +47,81 @@ DB_CONFIG = {
# K线数据列名
KLINE_COLUMNS = settings.KLINE_COLUMNS
def validate_kline_data(df):
    """
    Validate the integrity and sanity of a batch of kline (candlestick) rows.

    Parameters:
        df: DataFrame expected to contain at least the columns
            symbol, open_time, close_time, open, high, low, close, volume.
            open_time/close_time may be either datetimes or raw millisecond
            numbers — the interval check works for both.

    Returns:
        (bool, str): (is_valid, human-readable explanation).
    """
    if df.empty:
        return False, "数据为空"
    # 1. Required columns must all be present.
    required_columns = ['symbol', 'open_time', 'close_time', 'open', 'high', 'low', 'close', 'volume']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        return False, f"缺少必要列: {missing_columns}"
    # 2. Price/volume columns must be numeric dtypes.
    numeric_columns = ['open', 'high', 'low', 'close', 'volume']
    for col in numeric_columns:
        if not pd.api.types.is_numeric_dtype(df[col]):
            return False, f"{col} 不是数值类型"
    # 3. Time continuity. The expected bar interval is inferred from the
    #    median of consecutive open_time diffs instead of being hardcoded to
    #    one day — this supports any INTERVAL setting, and also works before
    #    open_time has been converted from raw milliseconds to datetimes
    #    (comparing float diffs against a fixed Timedelta never matches).
    df_sorted = df.sort_values('open_time')
    time_diff = df_sorted['open_time'].diff().dropna()
    if not time_diff.empty:
        expected_diff = time_diff.median()
        # Tolerate a few irregular gaps, but most rows must match the
        # inferred interval exactly.
        if (time_diff != expected_diff).mean() > 0.1:
            return False, f"时间间隔不符合预期,预期{expected_diff},实际差异较大"
    # 4. Price sanity: high is the per-bar maximum, low the minimum.
    if (df['high'] < df['low']).any():
        return False, "存在最高价小于最低价的记录"
    if ((df['high'] < df['open']) | (df['high'] < df['close'])).any():
        return False, "存在最高价小于开盘价或收盘价的记录"
    if ((df['low'] > df['open']) | (df['low'] > df['close'])).any():
        return False, "存在最低价大于开盘价或收盘价的记录"
    # 5. Volume can never be negative.
    if (df['volume'] < 0).any():
        return False, "存在交易量为负数的记录"
    # 6. A bar must close strictly after it opens.
    if (df['open_time'] >= df['close_time']).any():
        return False, "存在open_time大于等于close_time的记录"
    # 7. One batch must belong to a single trading pair.
    if df['symbol'].nunique() > 1:
        return False, "同一批次数据包含多个交易对"
    # 8. open_time is the unique key within a batch.
    if df['open_time'].duplicated().any():
        return False, "存在重复的open_time记录"
    # 9. Range check for the database's NUMERIC(20, 8) columns: 20 digits
    #    total with 8 decimals leaves at most 12 integer digits, i.e. any
    #    value with |v| >= 10**12 cannot be stored. Using an exclusive
    #    integer bound avoids the float rounding that made the previous
    #    `10**12 - 1e-8` collapse to exactly 1e12.
    max_abs = 10**12
    for col in numeric_columns:
        if (df[col].abs() >= max_abs).any():
            return False, f"{col} 的数值超出数据库字段范围 (-{max_abs}, {max_abs})"
    return True, "数据完整性校验通过"
def download_kline_data(symbol, interval, year_month):
"""下载指定月份的K线数据"""
# 拆分年月
@@ -81,7 +156,14 @@ def parse_kline_data(zip_data, symbol):
"taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"]
df[numeric_columns] = df[numeric_columns].astype(float)
df["number_of_trades"] = df["number_of_trades"].astype(int)
logger.info(f"Parsed {len(df)} rows of K线 data")
# 数据完整性校验
is_valid, message = validate_kline_data(df)
if not is_valid:
logger.error(f"K线数据校验失败: {message}")
return None
logger.info(f"Parsed {len(df)} rows of K线 data, 数据校验通过")
return df
except Exception as e:
logger.error(f"Failed to parse K线 data: {e}")
@@ -98,95 +180,142 @@ def create_connection():
return None
def create_table(conn):
    """
    Recreate the binance_kline table with the NUMERIC(20, 8) schema.

    WARNING: this unconditionally drops any existing binance_kline table
    first, destroying all previously stored kline data on every run. If the
    schema is already up to date, consider guarding the DROP or using a
    migration instead.

    Parameters:
        conn: open psycopg2 connection; committed on success, rolled back on
              failure.
    """
    # Drop the old table so the widened NUMERIC(20, 8) columns below always
    # take effect (ALTER would be needed to preserve data).
    drop_table_query = """
    DROP TABLE IF EXISTS binance_kline;
    """
    create_table_query = """
    CREATE TABLE binance_kline (
        id SERIAL PRIMARY KEY,
        symbol VARCHAR(20) NOT NULL,
        open_time TIMESTAMP NOT NULL,
        open NUMERIC(20, 8) NOT NULL,
        high NUMERIC(20, 8) NOT NULL,
        low NUMERIC(20, 8) NOT NULL,
        close NUMERIC(20, 8) NOT NULL,
        volume NUMERIC(20, 8) NOT NULL,
        close_time TIMESTAMP NOT NULL,
        quote_asset_volume NUMERIC(20, 8) NOT NULL,
        number_of_trades INTEGER NOT NULL,
        taker_buy_base_asset_volume NUMERIC(20, 8) NOT NULL,
        taker_buy_quote_asset_volume NUMERIC(20, 8) NOT NULL,
        ignore NUMERIC(20, 8) NOT NULL,
        UNIQUE(symbol, open_time)
    );
    """
    try:
        with conn.cursor() as cur:
            cur.execute(drop_table_query)
            cur.execute(create_table_query)
        conn.commit()
        logger.info("Created binance_kline table with updated schema")
    except psycopg2.Error as e:
        logger.error(f"Failed to create table: {e}")
        conn.rollback()
def insert_data(conn, df):
    """
    Insert kline rows into binance_kline, skipping rows that already exist.

    Strategy: pre-filter rows whose (symbol, open_time) key is already in the
    database (batched lookups + a merge-based anti-join), then bulk-insert
    the remainder with ON CONFLICT DO NOTHING as a final duplicate guard.

    Parameters:
        conn: open psycopg2 connection; committed on success, rolled back on
              insert failure.
        df: kline DataFrame; re-validated here before any database work.
    """
    if df.empty:
        logger.info("No data to insert")
        return
    # Final integrity check before touching the database.
    is_valid, message = validate_kline_data(df)
    if not is_valid:
        logger.error(f"插入数据库前数据校验失败: {message}")
        return
    total_records = len(df)
    logger.info(f"Preparing to insert {total_records} records")
    # Column order here must match the column list in insert_query below.
    insert_columns = [
        "symbol", "open_time", "high", "low", "open", "close", "volume",
        "close_time", "quote_asset_volume", "number_of_trades",
        "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
    ]
    # 1. Collect the distinct (symbol, open_time) keys about to be inserted.
    symbol_time_df = df[['symbol', 'open_time']].drop_duplicates()
    symbol_time_pairs = symbol_time_df.values.tolist()
    # 2. Look up which keys already exist, in batches so no single SQL
    #    statement grows unboundedly.
    existing_pairs = []
    batch_size = 500
    if symbol_time_pairs:
        for i in range(0, len(symbol_time_pairs), batch_size):
            batch_pairs = symbol_time_pairs[i:i + batch_size]
            # NOTE(review): values are interpolated straight into the SQL
            # string. symbol/open_time originate from Binance dump files,
            # but a parameterized query (e.g. "IN %s" with cur.execute)
            # would be safer — confirm and migrate.
            values_list = ','.join([f"('{symbol}', '{open_time}')" for symbol, open_time in batch_pairs])
            check_query = f"""
            SELECT symbol, open_time FROM binance_kline
            WHERE (symbol, open_time) IN ({values_list})
            """
            try:
                with conn.cursor() as cur:
                    cur.execute(check_query)
                    batch_existing = cur.fetchall()
                    existing_pairs.extend(batch_existing)
            except psycopg2.Error as e:
                logger.error(f"Failed to check existing data in batch {i//batch_size + 1}: {e}")
                logger.info("Falling back to ON CONFLICT mechanism for this batch")
                # Any lookup failure: skip pre-filtering entirely and let
                # ON CONFLICT deduplicate during the insert.
                df_filtered = df
                break
        else:
            # All lookup batches succeeded: drop rows already stored.
            existing_set = set((row[0], row[1]) for row in existing_pairs)
            logger.info(f"Found {len(existing_set)} existing records in database")
            if existing_set:
                # Anti-join via merge(indicator=True) — far faster than a
                # row-wise apply for large frames.
                existing_df = pd.DataFrame(list(existing_set), columns=['symbol', 'open_time'])
                merged_df = df.merge(
                    existing_df,
                    on=['symbol', 'open_time'],
                    how='left',
                    indicator=True
                )
                # Keep only rows present in df but absent from the database.
                df_filtered = merged_df[merged_df['_merge'] == 'left_only'].drop('_merge', axis=1)
            else:
                df_filtered = df
    else:
        df_filtered = df
    if df_filtered.empty:
        logger.info("All records already exist in database, skipping insertion")
        return
    logger.info(f"Filtered to {len(df_filtered)} new records to insert (removed {total_records - len(df_filtered)} existing records)")
    # 4. Bulk insert; ON CONFLICT guards against any remaining duplicates.
    insert_query = """
    INSERT INTO binance_kline (
        symbol, open_time, high, low, open, close, volume,
        close_time, quote_asset_volume, number_of_trades,
        taker_buy_base_asset_volume, taker_buy_quote_asset_volume, ignore
    ) VALUES %s
    ON CONFLICT (symbol, open_time) DO NOTHING;
    """
    data = [tuple(row) for row in df_filtered[insert_columns].to_numpy()]
    # 5. Execute the insert in a single execute_values round trip.
    try:
        with conn.cursor() as cur:
            execute_values(cur, insert_query, data)
        conn.commit()
        logger.info(f"Successfully inserted {len(data)} new rows into database")
    except psycopg2.Error as e:
        logger.error(f"Failed to insert data: {e}")
        conn.rollback()
@@ -313,8 +442,14 @@ def process_symbol(symbol, interval=INTERVAL):
df["open_time"] = pd.to_datetime(df["open_time"].astype(float), unit='ms')
df["close_time"] = pd.to_datetime(df["close_time"].astype(float), unit='ms')
# 数据完整性校验
is_valid, message = validate_kline_data(df)
if not is_valid:
logger.error(f"文件 {os.path.basename(file_url)} 数据校验失败: {message}")
continue
all_dfs.append(df)
logger.info(f"Processed {os.path.basename(file_url)} with {len(df)} rows")
logger.info(f"Processed {os.path.basename(file_url)} with {len(df)} rows, 数据校验通过")
except Exception as e:
logger.error(f"Failed to process {file_url}: {e}")
continue

View File

@@ -25,6 +25,6 @@ HTTPS_PROXY = "http://localhost:1080"
# PostgreSQL数据库配置
DB_HOST = "localhost"
DB_PORT = 5432
DB_DATABASE = "your_database"
DB_USER = "your_username"
DB_PASSWORD = "your_password"
# NOTE(review): the lines below commit real database credentials to version
# control. Move them to environment variables or an untracked secrets file,
# restore the placeholders above, and rotate this password.
DB_DATABASE = "graphql-engine"
DB_USER = "root"
DB_PASSWORD = "pgsqlpasswd.."