From 27a8e3d64b3198f3ae01a90c5c0e5425acd179b2 Mon Sep 17 00:00:00 2001 From: yhydev Date: Wed, 14 Jan 2026 11:17:06 +0800 Subject: [PATCH] Optimize download_binance_kline.py: extract global proxy config, fix warnings, refactor main function --- download_binance_kline.py | 97 +++++++++++---------------------------- 1 file changed, 26 insertions(+), 71 deletions(-) diff --git a/download_binance_kline.py b/download_binance_kline.py index eed0030..e476e27 100644 --- a/download_binance_kline.py +++ b/download_binance_kline.py @@ -21,6 +21,12 @@ INTERVAL = "1d" START_DATE = "2021-01" END_DATE = datetime.now().strftime("%Y-%m") +# 代理配置 +PROXIES = { + 'http': 'http://localhost:1080', + 'https': 'http://localhost:1080' +} + # PostgreSQL配置 DB_CONFIG = { "host": "localhost", @@ -45,14 +51,8 @@ def download_kline_data(symbol, interval, year_month): url = f"{BASE_URL}{symbol}/{interval}/{filename}" logger.info(f"Downloading {url}") - # 配置代理 - proxies = { - 'http': 'http://localhost:1080', - 'https': 'http://localhost:1080' - } - try: - response = requests.get(url, proxies=proxies) + response = requests.get(url, proxies=PROXIES) response.raise_for_status() logger.info(f"Downloaded {filename} successfully") return BytesIO(response.content) @@ -136,8 +136,15 @@ def insert_data(conn, df): ON CONFLICT (symbol, open_time) DO NOTHING; """ + # 定义要插入的列顺序 + insert_columns = [ + "symbol", "open_time", "open", "high", "low", "close", "volume", + "close_time", "quote_asset_volume", "number_of_trades", + "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore" + ] + # 转换DataFrame为元组列表 - data = [tuple(row) for row in df[df.columns[1:]].to_numpy()] # 跳过id列 + data = [tuple(row) for row in df[insert_columns].to_numpy()] try: with conn.cursor() as cur: @@ -161,14 +168,8 @@ def list_s3_files(url, timeout=10): """ logger.info(f"Listing files from {url}") - # 配置代理 - proxies = { - 'http': 'http://localhost:1080', - 'https': 'http://localhost:1080' - } - try: - response = requests.get(url, timeout=timeout, proxies=proxies) + response = requests.get(url, timeout=timeout, proxies=PROXIES) response.raise_for_status() # 解析XML响应 @@ -214,14 +215,8 @@ def download_kline_data_by_url(url): """ logger.info(f"Downloading {url}") - # 配置代理 - proxies = { - 'http': 'http://localhost:1080', - 'https': 'http://localhost:1080' - } - try: - response = requests.get(url, proxies=proxies) + response = requests.get(url, proxies=PROXIES) response.raise_for_status() filename = os.path.basename(url) logger.info(f"Downloaded {filename} successfully") @@ -266,9 +261,9 @@ def process_symbol(symbol, interval=INTERVAL): # 添加symbol列 df["symbol"] = symbol - # 转换时间戳为datetime - df["open_time"] = pd.to_datetime(df["open_time"], unit='ms') - df["close_time"] = pd.to_datetime(df["close_time"], unit='ms') + # 转换时间戳为datetime(先转换为数值类型以避免FutureWarning) + df["open_time"] = pd.to_datetime(df["open_time"].astype(float), unit='ms') + df["close_time"] = pd.to_datetime(df["close_time"].astype(float), unit='ms') all_dfs.append(df) logger.info(f"Processed {os.path.basename(file_url)} with {len(df)} rows") @@ -306,55 +301,15 @@ def process_symbol(symbol, interval=INTERVAL): def main(): - # 创建数据库连接 - conn = create_connection() - if not conn: - return - - # 创建表 - create_table(conn) + """主函数,处理所有配置的交易对""" + logger.info("Starting main process for symbols: %s", SYMBOLS) for symbol in SYMBOLS: - # 使用list_s3_files函数获取可用的文件URL列表 - s3_url = f"https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/{symbol}/{INTERVAL}/" - file_urls = list_s3_files(s3_url) - - if not file_urls: - logger.warning(f"No files found for {symbol}-{INTERVAL}") - continue - - # 处理每个文件URL - for file_url in file_urls: - # 从URL中提取文件名 - filename = os.path.basename(file_url) - # 检查文件名格式 - if not filename.endswith('.zip'): - continue - - # 解析文件名,提取交易对和年月信息 - # 格式: symbol-interval-year-month.zip - parts = filename[:-4].split('-') # 移除.zip后缀并拆分 - if len(parts) != 4: - logger.warning(f"Invalid filename format: {filename}") - continue - - file_symbol, file_interval, year, month = parts - - # 下载数据 - zip_data = download_kline_data_by_url(file_url) - if not zip_data: - continue - - # 解析数据 - df = parse_kline_data(zip_data, file_symbol) - if df is None or df.empty: - continue - - # 插入数据 - insert_data(conn, df) + try: + process_symbol(symbol, INTERVAL) + except Exception as e: + logger.error(f"Failed to process symbol {symbol}: {e}") - # 关闭连接 - conn.close() logger.info("Script completed successfully") if __name__ == "__main__":