From 3dbb3b5cb545bc6082c4b9a03c1eaee1933c3e74 Mon Sep 17 00:00:00 2001 From: yhydev Date: Wed, 14 Jan 2026 11:22:53 +0800 Subject: [PATCH] Optimize insert_data function to avoid duplicate inserts by pre-checking existing data --- download_binance_kline.py | 64 +++++++++++++++++++++++++++++++-------- 1 file changed, 52 insertions(+), 12 deletions(-) diff --git a/download_binance_kline.py b/download_binance_kline.py index e476e27..277cb13 100644 --- a/download_binance_kline.py +++ b/download_binance_kline.py @@ -125,8 +125,55 @@ def create_table(conn): conn.rollback() def insert_data(conn, df): - """将K线数据插入到数据库""" - # 准备插入数据 + """将K线数据插入到数据库,避免重复插入""" + if df.empty: + logger.info("No data to insert") + return + + # 定义要插入的列顺序 + insert_columns = [ + "symbol", "open_time", "open", "high", "low", "close", "volume", + "close_time", "quote_asset_volume", "number_of_trades", + "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore" + ] + + # 1. 提取所有待插入的(symbol, open_time)组合 + symbol_time_pairs = df[['symbol', 'open_time']].values.tolist() + + # 2. 查询数据库中已存在的(symbol, open_time)组合 + if symbol_time_pairs: + # 构建查询条件,使用VALUES列表 + values_list = ','.join([f"('{symbol}', '{open_time}')" for symbol, open_time in symbol_time_pairs]) + check_query = f""" + SELECT symbol, open_time FROM binance_kline + WHERE (symbol, open_time) IN ({values_list}) + """ + + try: + with conn.cursor() as cur: + cur.execute(check_query) + existing_pairs = cur.fetchall() + + # 转换为set以便快速查找 + existing_set = set((row[0], row[1]) for row in existing_pairs) + logger.info(f"Found {len(existing_set)} existing records in database") + + # 3. 过滤DataFrame,只保留不存在的数据 + df_filtered = df[~df.apply(lambda row: (row['symbol'], row['open_time']) in existing_set, axis=1)] + + if df_filtered.empty: + logger.info("All records already exist in database, skipping insertion") + return + + logger.info(f"Filtered to {len(df_filtered)} new records to insert") + except psycopg2.Error as e: + logger.error(f"Failed to check existing data: {e}") + logger.info("Falling back to ON CONFLICT mechanism") + df_filtered = df + else: + df_filtered = df + + # 4. 准备插入数据 insert_query = """ INSERT INTO binance_kline ( symbol, open_time, open, high, low, close, volume, @@ -136,21 +183,14 @@ def insert_data(conn, df): ON CONFLICT (symbol, open_time) DO NOTHING; """ - # 定义要插入的列顺序 - insert_columns = [ - "symbol", "open_time", "open", "high", "low", "close", "volume", - "close_time", "quote_asset_volume", "number_of_trades", - "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore" - ] - - # 转换DataFrame为元组列表 - data = [tuple(row) for row in df[insert_columns].to_numpy()] + data = [tuple(row) for row in df_filtered[insert_columns].to_numpy()] + # 5. 执行插入 try: with conn.cursor() as cur: execute_values(cur, insert_query, data) conn.commit() - logger.info(f"Inserted {len(data)} rows into database") + logger.info(f"Inserted {len(data)} new rows into database") except psycopg2.Error as e: logger.error(f"Failed to insert data: {e}") conn.rollback()