Optimize insert_data function to avoid duplicate inserts by pre-checking existing data
This commit is contained in:
@@ -125,8 +125,55 @@ def create_table(conn):
|
|||||||
conn.rollback()
|
conn.rollback()
|
||||||
|
|
||||||
def insert_data(conn, df):
|
def insert_data(conn, df):
|
||||||
"""将K线数据插入到数据库"""
|
"""将K线数据插入到数据库,避免重复插入"""
|
||||||
# 准备插入数据
|
if df.empty:
|
||||||
|
logger.info("No data to insert")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 定义要插入的列顺序
|
||||||
|
insert_columns = [
|
||||||
|
"symbol", "open_time", "open", "high", "low", "close", "volume",
|
||||||
|
"close_time", "quote_asset_volume", "number_of_trades",
|
||||||
|
"taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
|
||||||
|
]
|
||||||
|
|
||||||
|
# 1. 提取所有待插入的(symbol, open_time)组合
|
||||||
|
symbol_time_pairs = df[['symbol', 'open_time']].values.tolist()
|
||||||
|
|
||||||
|
# 2. 查询数据库中已存在的(symbol, open_time)组合
|
||||||
|
if symbol_time_pairs:
|
||||||
|
# 构建查询条件,使用VALUES列表
|
||||||
|
values_list = ','.join([f"('{symbol}', '{open_time}')" for symbol, open_time in symbol_time_pairs])
|
||||||
|
check_query = f"""
|
||||||
|
SELECT symbol, open_time FROM binance_kline
|
||||||
|
WHERE (symbol, open_time) IN ({values_list})
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute(check_query)
|
||||||
|
existing_pairs = cur.fetchall()
|
||||||
|
|
||||||
|
# 转换为set以便快速查找
|
||||||
|
existing_set = set((row[0], row[1]) for row in existing_pairs)
|
||||||
|
logger.info(f"Found {len(existing_set)} existing records in database")
|
||||||
|
|
||||||
|
# 3. 过滤DataFrame,只保留不存在的数据
|
||||||
|
df_filtered = df[~df.apply(lambda row: (row['symbol'], row['open_time']) in existing_set, axis=1)]
|
||||||
|
|
||||||
|
if df_filtered.empty:
|
||||||
|
logger.info("All records already exist in database, skipping insertion")
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info(f"Filtered to {len(df_filtered)} new records to insert")
|
||||||
|
except psycopg2.Error as e:
|
||||||
|
logger.error(f"Failed to check existing data: {e}")
|
||||||
|
logger.info("Falling back to ON CONFLICT mechanism")
|
||||||
|
df_filtered = df
|
||||||
|
else:
|
||||||
|
df_filtered = df
|
||||||
|
|
||||||
|
# 4. 准备插入数据
|
||||||
insert_query = """
|
insert_query = """
|
||||||
INSERT INTO binance_kline (
|
INSERT INTO binance_kline (
|
||||||
symbol, open_time, open, high, low, close, volume,
|
symbol, open_time, open, high, low, close, volume,
|
||||||
@@ -136,21 +183,14 @@ def insert_data(conn, df):
|
|||||||
ON CONFLICT (symbol, open_time) DO NOTHING;
|
ON CONFLICT (symbol, open_time) DO NOTHING;
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# 定义要插入的列顺序
|
data = [tuple(row) for row in df_filtered[insert_columns].to_numpy()]
|
||||||
insert_columns = [
|
|
||||||
"symbol", "open_time", "open", "high", "low", "close", "volume",
|
|
||||||
"close_time", "quote_asset_volume", "number_of_trades",
|
|
||||||
"taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
|
|
||||||
]
|
|
||||||
|
|
||||||
# 转换DataFrame为元组列表
|
|
||||||
data = [tuple(row) for row in df[insert_columns].to_numpy()]
|
|
||||||
|
|
||||||
|
# 5. 执行插入
|
||||||
try:
|
try:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
execute_values(cur, insert_query, data)
|
execute_values(cur, insert_query, data)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
logger.info(f"Inserted {len(data)} rows into database")
|
logger.info(f"Inserted {len(data)} new rows into database")
|
||||||
except psycopg2.Error as e:
|
except psycopg2.Error as e:
|
||||||
logger.error(f"Failed to insert data: {e}")
|
logger.error(f"Failed to insert data: {e}")
|
||||||
conn.rollback()
|
conn.rollback()
|
||||||
|
|||||||
Reference in New Issue
Block a user