@@ -47,6 +47,81 @@ DB_CONFIG = {
# K-line (candlestick) data column names, taken from the project-wide settings
# module so every loader/parser agrees on column order.
KLINE_COLUMNS = settings.KLINE_COLUMNS
def validate_kline_data(df, expected_interval=None):
    """Validate completeness and sanity of a batch of K-line (candlestick) data.

    Args:
        df: DataFrame holding the K-line rows.
        expected_interval: expected spacing between consecutive ``open_time``
            values as a ``pd.Timedelta``. Defaults to 1 day, matching the
            daily table this script feeds.

    Returns:
        tuple: ``(is_valid, message)`` where ``is_valid`` is a bool and
        ``message`` explains the first failed check (or reports success).
    """
    if df.empty:
        return False, "数据为空"

    # 1. Required columns must all be present.
    required_columns = ['symbol', 'open_time', 'close_time', 'open', 'high', 'low', 'close', 'volume']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        return False, f"缺少必要列: {missing_columns}"

    # 2. Price/volume columns must be numeric dtypes.
    numeric_columns = ['open', 'high', 'low', 'close', 'volume']
    for col in numeric_columns:
        if not pd.api.types.is_numeric_dtype(df[col]):
            return False, f"列 {col} 不是数值类型"

    # 3. open_time should be (mostly) evenly spaced. A few irregular gaps
    #    (e.g. month boundaries) are tolerated, but more than 10% of the
    #    diffs deviating from the expected interval fails the batch.
    if expected_interval is None:
        expected_interval = pd.Timedelta(days=1)
    df_sorted = df.sort_values('open_time')
    time_diff = df_sorted['open_time'].diff().dropna()
    if not time_diff.empty:
        if (time_diff != expected_interval).mean() > 0.1:
            return False, f"时间间隔不符合预期,预期 {expected_interval},实际差异较大"

    # 4. Price sanity: high is the maximum, low is the minimum of the candle.
    if (df['high'] < df['low']).any():
        return False, "存在最高价小于最低价的记录"
    if ((df['high'] < df['open']) | (df['high'] < df['close'])).any():
        return False, "存在最高价小于开盘价或收盘价的记录"
    if ((df['low'] > df['open']) | (df['low'] > df['close'])).any():
        return False, "存在最低价大于开盘价或收盘价的记录"

    # 5. Volume can never be negative.
    if (df['volume'] < 0).any():
        return False, "存在交易量为负数的记录"

    # 6. Every candle must open strictly before it closes.
    if (df['open_time'] >= df['close_time']).any():
        return False, "存在open_time大于等于close_time的记录"

    # 7. A batch must contain exactly one trading pair.
    if df['symbol'].nunique() > 1:
        return False, "同一批次数据包含多个交易对"

    # 8. open_time must be unique within the batch.
    if df['open_time'].duplicated().any():
        return False, "存在重复的open_time记录"

    # 9. Values must fit the database NUMERIC(20, 8) columns:
    #    at most 20 digits total with 8 decimals => at most 12 integer digits.
    #    NOTE: the previous bound `10**12 - 1e-8` is not representable as a
    #    float (1e-8 is below the ulp of 1e12, so it equals 1e12 exactly),
    #    which let a value of exactly 1e12 slip through and overflow the
    #    column. Compare against 10**12 with an exclusive bound instead.
    limit = 10 ** 12
    for col in numeric_columns:
        if (df[col].abs() >= limit).any():
            return False, f"列 {col} 的数值超出数据库字段范围 (-{limit}, {limit})"

    return True, "数据完整性校验通过"
def download_kline_data ( symbol , interval , year_month ) :
""" 下载指定月份的K线数据 """
# 拆分年月
@@ -81,7 +156,14 @@ def parse_kline_data(zip_data, symbol):
" taker_buy_base_asset_volume " , " taker_buy_quote_asset_volume " , " ignore " ]
df [ numeric_columns ] = df [ numeric_columns ] . astype ( float )
df [ " number_of_trades " ] = df [ " number_of_trades " ] . astype ( int )
logger . info ( f " Parsed { len ( df ) } rows of K线 data " )
# 数据完整性校验
is_valid , message = validate_kline_data ( df )
if not is_valid :
logger . error ( f " K线数据校验失败: { message } " )
return None
logger . info ( f " Parsed { len ( df ) } rows of K线 data, 数据校验通过 " )
return df
except Exception as e :
logger . error ( f " Failed to parse K线 data: { e } " )
@@ -98,95 +180,142 @@ def create_connection():
return None
def create_table(conn):
    """(Re)create the daily futures K-line table.

    Drops any existing ``bn_futures_kline_1d`` table and recreates it from
    scratch — previously stored rows are intentionally discarded so the
    schema upgrade (NUMERIC(18,8) -> NUMERIC(20,8)) takes effect.
    NUMERIC(20, 8) allows 12 integer digits, matching the range check in
    ``validate_kline_data``.

    Args:
        conn: open psycopg2 connection; committed on success, rolled back
            on failure.
    """
    # Destructive by design: drop the old table, then build the new schema.
    drop_table_query = """
    DROP TABLE IF EXISTS bn_futures_kline_1d;
    """
    create_table_query = """
    CREATE TABLE bn_futures_kline_1d (
        id SERIAL PRIMARY KEY,
        symbol VARCHAR(20) NOT NULL,
        open_time TIMESTAMP NOT NULL,
        open NUMERIC(20, 8) NOT NULL,
        high NUMERIC(20, 8) NOT NULL,
        low NUMERIC(20, 8) NOT NULL,
        close NUMERIC(20, 8) NOT NULL,
        volume NUMERIC(20, 8) NOT NULL,
        close_time TIMESTAMP NOT NULL,
        quote_asset_volume NUMERIC(20, 8) NOT NULL,
        number_of_trades INTEGER NOT NULL,
        taker_buy_base_asset_volume NUMERIC(20, 8) NOT NULL,
        taker_buy_quote_asset_volume NUMERIC(20, 8) NOT NULL,
        ignore NUMERIC(20, 8) NOT NULL,
        UNIQUE(symbol, open_time)
    );
    """
    try:
        with conn.cursor() as cur:
            cur.execute(drop_table_query)
            cur.execute(create_table_query)
        conn.commit()
        logger.info("Created bn_futures_kline_1d table with updated schema")
    except psycopg2.Error as e:
        logger.error(f"Failed to create table: {e}")
        conn.rollback()
def insert_data(conn, df):
    """Insert K-line rows into the database, avoiding duplicate inserts.

    Rows already present (same ``symbol`` + ``open_time``) are filtered out
    with a batched existence query; ``ON CONFLICT DO NOTHING`` remains as a
    safety net when the pre-check fails.

    Args:
        conn: open psycopg2 connection.
        df: K-line DataFrame; re-validated here before any database work.
    """
    if df.empty:
        logger.info("No data to insert")
        return

    # Final integrity check before touching the database.
    is_valid, message = validate_kline_data(df)
    if not is_valid:
        logger.error(f"插入数据库前数据校验失败: {message}")
        return

    total_records = len(df)
    logger.info(f"Preparing to insert {total_records} records")

    # Column order must match the INSERT statement below.
    insert_columns = [
        "symbol", "open_time", "high", "low", "open", "close", "volume",
        "close_time", "quote_asset_volume", "number_of_trades",
        "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
    ]

    # 1. Collect the distinct (symbol, open_time) pairs about to be inserted.
    symbol_time_df = df[['symbol', 'open_time']].drop_duplicates()
    symbol_time_pairs = symbol_time_df.values.tolist()

    # 2. Ask the database which of those pairs already exist, in batches
    #    so no single statement grows too large.
    existing_pairs = []
    batch_size = 500
    if symbol_time_pairs:
        # Parameterized query: psycopg2 adapts a tuple of tuples into a
        # VALUES list, so symbol/open_time never get string-interpolated
        # into SQL (the previous f-string was injection-prone).
        check_query = """
            SELECT symbol, open_time FROM bn_futures_kline_1d
            WHERE (symbol, open_time) IN %s
        """
        for i in range(0, len(symbol_time_pairs), batch_size):
            batch_pairs = symbol_time_pairs[i:i + batch_size]
            try:
                with conn.cursor() as cur:
                    cur.execute(check_query, (tuple(tuple(pair) for pair in batch_pairs),))
                    existing_pairs.extend(cur.fetchall())
            except psycopg2.Error as e:
                logger.error(f"Failed to check existing data in batch {i // batch_size + 1}: {e}")
                logger.info("Falling back to ON CONFLICT mechanism for this batch")
                # Insert everything and let ON CONFLICT drop the duplicates.
                df_filtered = df
                break
        else:
            # All batches succeeded: drop rows already in the database.
            existing_set = set((row[0], row[1]) for row in existing_pairs)
            logger.info(f"Found {len(existing_set)} existing records in database")
            if existing_set:
                # 3. merge + indicator filters far faster than a per-row apply().
                existing_df = pd.DataFrame(list(existing_set), columns=['symbol', 'open_time'])
                merged_df = df.merge(
                    existing_df,
                    on=['symbol', 'open_time'],
                    how='left',
                    indicator=True
                )
                # Keep only rows present in df but absent from the database.
                df_filtered = merged_df[merged_df['_merge'] == 'left_only'].drop('_merge', axis=1)
            else:
                df_filtered = df
            if df_filtered.empty:
                logger.info("All records already exist in database, skipping insertion")
                return
            logger.info(
                f"Filtered to {len(df_filtered)} new records to insert "
                f"(removed {total_records - len(df_filtered)} existing records)"
            )
    else:
        df_filtered = df

    # 4. Bulk insert; ON CONFLICT keeps this idempotent even when the
    #    pre-filtering above was skipped or partially failed.
    insert_query = """
        INSERT INTO bn_futures_kline_1d (
            symbol, open_time, high, low, open, close, volume,
            close_time, quote_asset_volume, number_of_trades,
            taker_buy_base_asset_volume, taker_buy_quote_asset_volume, ignore
        ) VALUES %s
        ON CONFLICT (symbol, open_time) DO NOTHING;
    """
    data = [tuple(row) for row in df_filtered[insert_columns].to_numpy()]

    # 5. Execute the insert in one round trip via execute_values.
    try:
        with conn.cursor() as cur:
            execute_values(cur, insert_query, data)
        conn.commit()
        logger.info(f"Successfully inserted {len(data)} new rows into database")
    except psycopg2.Error as e:
        logger.error(f"Failed to insert data: {e}")
        conn.rollback()
@@ -313,8 +442,14 @@ def process_symbol(symbol, interval=INTERVAL):
df [ " open_time " ] = pd . to_datetime ( df [ " open_time " ] . astype ( float ) , unit = ' ms ' )
df [ " close_time " ] = pd . to_datetime ( df [ " close_time " ] . astype ( float ) , unit = ' ms ' )
# 数据完整性校验
is_valid , message = validate_kline_data ( df )
if not is_valid :
logger . error ( f " 文件 { os . path . basename ( file_url ) } 数据校验失败: { message } " )
continue
all_dfs . append ( df )
logger . info ( f " Processed { os . path . basename ( file_url ) } with { len ( df ) } rows " )
logger . info ( f " Processed { os . path . basename ( file_url ) } with { len ( df ) } rows, 数据校验通过 " )
except Exception as e :
logger . error ( f " Failed to process { file_url } : { e } " )
continue