From 078631190753bfb2f3c7d3a3a7540ed20ac30ff0 Mon Sep 17 00:00:00 2001 From: yhydev Date: Wed, 14 Jan 2026 10:58:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- download_binance_kline.py | 285 ++++++++++++++++++++++++++++++++++++++ download_unzip_csv.py | 104 ++++++++++++++ 2 files changed, 389 insertions(+) create mode 100644 download_binance_kline.py create mode 100644 download_unzip_csv.py diff --git a/download_binance_kline.py b/download_binance_kline.py new file mode 100644 index 0000000..0728ee7 --- /dev/null +++ b/download_binance_kline.py @@ -0,0 +1,285 @@ +import requests +import os +import pandas as pd +from io import BytesIO +import psycopg2 +from psycopg2.extras import execute_values +import logging +from datetime import datetime +import xml.etree.ElementTree as ET + +# 配置日志 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# 配置参数 +BASE_URL = "https://data.binance.vision/data/futures/um/monthly/klines/" +# 可以添加多个交易对,例如:["BOBUSDT", "BTCUSDT", "ETHUSDT"] +SYMBOLS = ["BOBUSDT", "BTCUSDT"] # BOBUSDT数据从2025-11开始可用 +INTERVAL = "1d" +START_DATE = "2021-01" +END_DATE = datetime.now().strftime("%Y-%m") + +# PostgreSQL配置 +DB_CONFIG = { + "host": "localhost", + "database": "your_database", + "user": "your_username", + "password": "your_password", + "port": 5432 +} + +# K线数据列名 +KLINE_COLUMNS = [ + "open_time", "open", "high", "low", "close", "volume", + "close_time", "quote_asset_volume", "number_of_trades", + "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore" +] + +def download_kline_data(symbol, interval, year_month): + """下载指定月份的K线数据""" + # 拆分年月 + year, month = year_month.split("-") + filename = f"{symbol}-{interval}-{year}-{month}.zip" + url = f"{BASE_URL}{symbol}/{interval}/{filename}" + logger.info(f"Downloading {url}") + + # 配置代理 + proxies = { + 'http': 'http://localhost:1080', + 'https': 'http://localhost:1080' + } + + try: + response = requests.get(url, proxies=proxies) + response.raise_for_status() + logger.info(f"Downloaded {filename} successfully") + return BytesIO(response.content) + except requests.exceptions.RequestException as e: + logger.error(f"Failed to download {filename}: {e}") + return None + +def parse_kline_data(zip_data, symbol): + """解析下载的K线数据""" + try: + df = pd.read_csv(zip_data, compression='zip', header=None, names=KLINE_COLUMNS) + # 跳过标题行 + df = df[df["open_time"] != "open_time"] + # 添加symbol列 + df["symbol"] = symbol + # 转换时间戳为datetime + df["open_time"] = pd.to_datetime(df["open_time"], unit='ms') + df["close_time"] = pd.to_datetime(df["close_time"], unit='ms') + # 转换数值列 + numeric_columns = ["open", "high", "low", "close", "volume", + "quote_asset_volume", "number_of_trades", + "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"] + df[numeric_columns] = df[numeric_columns].astype(float) + df["number_of_trades"] = df["number_of_trades"].astype(int) + logger.info(f"Parsed {len(df)} rows of K线 data") + return df + except Exception as e: + logger.error(f"Failed to parse K线 data: {e}") + return None + +def create_connection(): + """创建PostgreSQL连接""" + try: + conn = psycopg2.connect(**DB_CONFIG) + logger.info("Connected to PostgreSQL database") + return conn + except psycopg2.OperationalError as e: + logger.error(f"Failed to connect to PostgreSQL: {e}") + return None + +def create_table(conn): + """创建K线数据表""" + create_table_query = """ + CREATE TABLE IF NOT EXISTS binance_kline ( + id SERIAL PRIMARY KEY, + symbol VARCHAR(20) NOT NULL, + open_time TIMESTAMP NOT NULL, + open NUMERIC(18, 8) NOT NULL, + high NUMERIC(18, 8) NOT NULL, + low NUMERIC(18, 8) NOT NULL, + close NUMERIC(18, 8) NOT NULL, + volume NUMERIC(18, 8) NOT NULL, + close_time TIMESTAMP NOT NULL, + quote_asset_volume NUMERIC(18, 8) NOT NULL, + number_of_trades INTEGER NOT NULL, + taker_buy_base_asset_volume NUMERIC(18, 8) NOT NULL, + taker_buy_quote_asset_volume NUMERIC(18, 8) NOT NULL, + ignore NUMERIC(18, 8) NOT NULL, + UNIQUE(symbol, open_time) + ); + """ + + try: + with conn.cursor() as cur: + cur.execute(create_table_query) + conn.commit() + logger.info("Created binance_kline table") + except psycopg2.Error as e: + logger.error(f"Failed to create table: {e}") + conn.rollback() + +def insert_data(conn, df): + """将K线数据插入到数据库""" + # 准备插入数据 + insert_query = """ + INSERT INTO binance_kline ( + symbol, open_time, open, high, low, close, volume, + close_time, quote_asset_volume, number_of_trades, + taker_buy_base_asset_volume, taker_buy_quote_asset_volume, ignore + ) VALUES %s + ON CONFLICT (symbol, open_time) DO NOTHING; + """ + + # 转换DataFrame为元组列表 + data = [tuple(row) for row in df[df.columns[1:]].to_numpy()] # 跳过id列 + + try: + with conn.cursor() as cur: + execute_values(cur, insert_query, data) + conn.commit() + logger.info(f"Inserted {len(data)} rows into database") + except psycopg2.Error as e: + logger.error(f"Failed to insert data: {e}") + conn.rollback() + +def list_s3_files(url, timeout=10): + """ + 从S3存储桶的XML响应中提取所有文件的完整URL + + 参数: + url: S3存储桶的列表URL,例如: https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/BOBUSDT/1d/ + timeout: 请求超时时间,单位秒,默认10秒 + + 返回: + list: 完整的文件URL列表 + """ + logger.info(f"Listing files from {url}") + + # 配置代理 + proxies = { + 'http': 'http://localhost:1080', + 'https': 'http://localhost:1080' + } + + try: + response = requests.get(url, timeout=timeout, proxies=proxies) + response.raise_for_status() + + # 解析XML响应 + root = ET.fromstring(response.content) + + # S3 XML使用的命名空间 + ns = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'} + + # 基础下载URL + base_download_url = "https://data.binance.vision/" + + # 提取所有Key元素的文本,构建完整URL + file_urls = [] + for key_elem in root.findall('.//s3:Key', ns): + file_path = key_elem.text + # 只返回zip文件 + if file_path and file_path.endswith('.zip'): + # 构建完整URL + full_url = base_download_url + file_path + file_urls.append(full_url) + + logger.info(f"Found {len(file_urls)} files") + return file_urls + except requests.exceptions.RequestException as e: + logger.error(f"Failed to list files from {url}: {e}") + return [] + except ET.ParseError as e: + logger.error(f"Failed to parse XML response: {e}") + return [] + except KeyboardInterrupt: + logger.error(f"Request to {url} was interrupted") + return [] + +def download_kline_data_by_url(url): + """ + 通过完整URL下载K线数据 + + 参数: + url: 完整的K线数据下载URL + + 返回: + BytesIO: 下载的数据,或None如果下载失败 + """ + logger.info(f"Downloading {url}") + + # 配置代理 + proxies = { + 'http': 'http://localhost:1080', + 'https': 'http://localhost:1080' + } + + try: + response = requests.get(url, proxies=proxies) + response.raise_for_status() + filename = os.path.basename(url) + logger.info(f"Downloaded {filename} successfully") + return BytesIO(response.content) + except requests.exceptions.RequestException as e: + logger.error(f"Failed to download {url}: {e}") + return None + +def main(): + # 创建数据库连接 + conn = create_connection() + if not conn: + return + + # 创建表 + create_table(conn) + + for symbol in SYMBOLS: + # 使用list_s3_files函数获取可用的文件URL列表 + s3_url = f"https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/{symbol}/{INTERVAL}/" + file_urls = list_s3_files(s3_url) + + if not file_urls: + logger.warning(f"No files found for {symbol}-{INTERVAL}") + continue + + # 处理每个文件URL + for file_url in file_urls: + # 从URL中提取文件名 + filename = os.path.basename(file_url) + # 检查文件名格式 + if not filename.endswith('.zip'): + continue + + # 解析文件名,提取交易对和年月信息 + # 格式: symbol-interval-year-month.zip + parts = filename[:-4].split('-') # 移除.zip后缀并拆分 + if len(parts) != 4: + logger.warning(f"Invalid filename format: {filename}") + continue + + file_symbol, file_interval, year, month = parts + + # 下载数据 + zip_data = download_kline_data_by_url(file_url) + if not zip_data: + continue + + # 解析数据 + df = parse_kline_data(zip_data, file_symbol) + if df is None or df.empty: + continue + + # 插入数据 + insert_data(conn, df) + + # 关闭连接 + conn.close() + logger.info("Script completed successfully") + +if __name__ == "__main__": + main() diff --git a/download_unzip_csv.py b/download_unzip_csv.py new file mode 100644 index 0000000..8f4fd0d --- /dev/null +++ b/download_unzip_csv.py @@ -0,0 +1,104 @@ +import requests +import pandas as pd +from io import BytesIO +import logging + +# 配置日志 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +def download_unzip_csv(url, **kwargs): + """ + 从URL下载文件,解压并返回CSV内容作为pandas DataFrame + + 参数: + url: 文件下载URL + **kwargs: 传递给pandas.read_csv的额外参数 + + 返回: + pandas.DataFrame: 解压后的CSV内容 + """ + logger.info(f"Downloading file from {url}") + + try: + # 下载文件 + response = requests.get(url) + response.raise_for_status() # 检查下载是否成功 + logger.info(f"Successfully downloaded file from {url}") + + # 使用BytesIO处理下载的内容 + zip_data = BytesIO(response.content) + + # 读取并解压CSV文件 + logger.info("Reading and decompressing CSV file") + df = pd.read_csv(zip_data, compression='zip', **kwargs) + + # 智能判定首行是否为标题行 + def is_header_row(row): + """智能检测行是否为标题行""" + # 检查条件 + conditions = [ + # 1. 首行包含常见的时间相关列名 + any(keyword in str(cell).lower() for keyword in ['time', 'date', 'datetime', 'timestamp'] for cell in row), + # 2. 首行包含常见的价格相关列名 + any(keyword in str(cell).lower() for keyword in ['open', 'high', 'low', 'close', 'volume'] for cell in row), + # 3. 首行包含常见的交易相关列名 + any(keyword in str(cell).lower() for keyword in ['taker', 'quote', 'count', 'ignore'] for cell in row), + # 4. 首行全为字符串,而第二行包含数值 + len(df) > 1 and all(isinstance(str(cell), str) and not str(cell).replace('.', '').isdigit() for cell in row) and \ + any(str(cell).replace('.', '').isdigit() for cell in df.iloc[1] if pd.notna(cell)), + # 5. 首行包含'_'字符(常见于编程命名的列名) + any('_' in str(cell) for cell in row) + ] + return any(conditions) + + if len(df) > 0 and is_header_row(df.iloc[0]): + df = df[1:].reset_index(drop=True) + logger.info("Skipped header row") + + # 转换数值列 + numeric_columns = ['open', 'high', 'low', 'close', 'volume', + 'close_time', 'quote_asset_volume', 'number_of_trades', + 'taker_buy_volume', 'taker_buy_quote_volume', 'taker_buy_base_asset_volume', + 'taker_buy_quote_asset_volume', 'ignore'] + + # 只转换存在的列 + for col in numeric_columns: + if col in df.columns: + try: + if col in ['number_of_trades', 'count']: + df[col] = df[col].astype(int) + else: + df[col] = df[col].astype(float) + except ValueError: + logger.warning(f"Could not convert column {col} to numeric type") + + logger.info(f"Successfully parsed CSV file with {len(df)} rows") + return df + except requests.exceptions.RequestException as e: + logger.error(f"Failed to download file: {e}") + raise + except Exception as e: + logger.error(f"Failed to process file: {e}") + raise + +if __name__ == "__main__": + # 测试函数 + test_url = "https://data.binance.vision/data/futures/um/monthly/klines/BTCUSDT/1d/BTCUSDT-1d-2024-01.zip" + + # CSV列名 + columns = [ + "open_time", "open", "high", "low", "close", "volume", + "close_time", "quote_asset_volume", "number_of_trades", + "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore" + ] + + try: + df = download_unzip_csv(test_url, header=None, names=columns) + print(f"DataFrame shape: {df.shape}") + print("DataFrame head:") + print(df.head()) + print("DataFrame dtypes:") + print(df.dtypes) + except Exception as e: + print(f"Error: {e}")