Compare commits
6 Commits
7fb579be6e
...
28d569bf1e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
28d569bf1e | ||
|
|
b8c70ad448 | ||
|
|
3dbb3b5cb5 | ||
|
|
27a8e3d64b | ||
|
|
1849e67f54 | ||
|
|
0786311907 |
373
download_binance_kline.py
Normal file
373
download_binance_kline.py
Normal file
@@ -0,0 +1,373 @@
|
||||
import requests
import os
import pandas as pd
from io import BytesIO
import psycopg2
from psycopg2.extras import execute_values
import logging
from datetime import datetime
import xml.etree.ElementTree as ET
from download_unzip_csv import download_unzip_csv
from dynaconf import Dynaconf

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Initialize Dynaconf configuration; the active environment is selected
# via the BINANCE_ENV environment variable (defaults to "default").
settings = Dynaconf(
    envvar_prefix="DYNACONF",
    settings_files=["settings.toml"],
    environments=True,
    load_dotenv=False,
    default_env="default",
    env_switcher="BINANCE_ENV"
)

# Configuration parameters
BASE_URL = settings.BASE_URL
SYMBOLS = settings.SYMBOLS
INTERVAL = settings.INTERVAL

# Proxy configuration; None means requests are made directly.
PROXIES = {
    'http': settings.HTTP_PROXY,
    'https': settings.HTTPS_PROXY
} if settings.PROXY_ENABLED else None

# PostgreSQL connection configuration
DB_CONFIG = {
    "host": settings.DB_HOST,
    "database": settings.DB_DATABASE,
    "user": settings.DB_USER,
    "password": settings.DB_PASSWORD,
    "port": settings.DB_PORT
}

# K-line (candlestick) CSV column names
KLINE_COLUMNS = settings.KLINE_COLUMNS
|
||||
|
||||
def download_kline_data(symbol, interval, year_month):
    """Download one month of K-line data as a zip archive.

    Args:
        symbol: Trading pair, e.g. "BTCUSDT".
        interval: Candle interval, e.g. "1d".
        year_month: Month to fetch in "YYYY-MM" format.

    Returns:
        BytesIO holding the zip content, or None when the download fails.
    """
    # Split year and month to build the archive filename used by Binance.
    year, month = year_month.split("-")
    filename = f"{symbol}-{interval}-{year}-{month}.zip"
    url = f"{BASE_URL}{symbol}/{interval}/{filename}"
    logger.info(f"Downloading {url}")

    try:
        # Use the configured proxies (may be None, which requests treats as
        # "no proxy") and a timeout so a stalled connection cannot hang the run.
        response = requests.get(url, proxies=PROXIES, timeout=30)
        response.raise_for_status()
        logger.info(f"Downloaded {filename} successfully")
        return BytesIO(response.content)
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download {filename}: {e}")
        return None
|
||||
|
||||
def parse_kline_data(zip_data, symbol):
    """Parse a downloaded K-line zip archive into a typed DataFrame.

    Args:
        zip_data: File-like object holding the zip archive bytes.
        symbol: Trading pair name stamped into a new "symbol" column.

    Returns:
        pandas.DataFrame with the parsed rows, or None when parsing fails.
    """
    try:
        frame = pd.read_csv(zip_data, compression='zip', header=None, names=KLINE_COLUMNS)

        # Some archives repeat the column names as a first data row; drop it.
        frame = frame[frame["open_time"] != "open_time"]

        # Tag every row with its trading pair.
        frame["symbol"] = symbol

        # Timestamps arrive as epoch milliseconds.
        frame["open_time"] = pd.to_datetime(frame["open_time"], unit='ms')
        frame["close_time"] = pd.to_datetime(frame["close_time"], unit='ms')

        # Cast price/volume columns to float, then the trade count to int.
        float_cols = [
            "open", "high", "low", "close", "volume",
            "quote_asset_volume", "number_of_trades",
            "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore",
        ]
        frame[float_cols] = frame[float_cols].astype(float)
        frame["number_of_trades"] = frame["number_of_trades"].astype(int)

        logger.info(f"Parsed {len(frame)} rows of K线 data")
        return frame
    except Exception as e:
        logger.error(f"Failed to parse K线 data: {e}")
        return None
|
||||
|
||||
def create_connection():
    """Open a PostgreSQL connection using the module-level DB_CONFIG.

    Returns:
        An open psycopg2 connection, or None when connecting fails.
    """
    try:
        connection = psycopg2.connect(**DB_CONFIG)
    except psycopg2.OperationalError as e:
        logger.error(f"Failed to connect to PostgreSQL: {e}")
        return None
    logger.info("Connected to PostgreSQL database")
    return connection
|
||||
|
||||
def create_table(conn):
    """Create the binance_kline table if it does not already exist.

    Args:
        conn: Open psycopg2 connection. Committed on success, rolled back
            on any psycopg2 error.
    """
    # UNIQUE(symbol, open_time) lets inserts rely on ON CONFLICT DO NOTHING.
    ddl = """
    CREATE TABLE IF NOT EXISTS binance_kline (
        id SERIAL PRIMARY KEY,
        symbol VARCHAR(20) NOT NULL,
        open_time TIMESTAMP NOT NULL,
        open NUMERIC(18, 8) NOT NULL,
        high NUMERIC(18, 8) NOT NULL,
        low NUMERIC(18, 8) NOT NULL,
        close NUMERIC(18, 8) NOT NULL,
        volume NUMERIC(18, 8) NOT NULL,
        close_time TIMESTAMP NOT NULL,
        quote_asset_volume NUMERIC(18, 8) NOT NULL,
        number_of_trades INTEGER NOT NULL,
        taker_buy_base_asset_volume NUMERIC(18, 8) NOT NULL,
        taker_buy_quote_asset_volume NUMERIC(18, 8) NOT NULL,
        ignore NUMERIC(18, 8) NOT NULL,
        UNIQUE(symbol, open_time)
    );
    """

    try:
        with conn.cursor() as cur:
            cur.execute(ddl)
        conn.commit()
        logger.info("Created binance_kline table")
    except psycopg2.Error as e:
        logger.error(f"Failed to create table: {e}")
        conn.rollback()
|
||||
|
||||
def insert_data(conn, df):
    """Insert K-line rows into binance_kline, skipping rows already stored.

    Rows are first filtered against existing (symbol, open_time) pairs; the
    INSERT additionally carries ON CONFLICT DO NOTHING as a safety net, so
    repeated or concurrent runs never create duplicate candles.

    Args:
        conn: Open psycopg2 connection. Committed on success, rolled back
            on error.
        df: DataFrame containing at least the columns listed below.
    """
    if df.empty:
        logger.info("No data to insert")
        return

    # Column order expected by the INSERT statement below.
    insert_columns = [
        "symbol", "open_time", "open", "high", "low", "close", "volume",
        "close_time", "quote_asset_volume", "number_of_trades",
        "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
    ]

    # 1. Collect every candidate (symbol, open_time) pair.
    symbol_time_pairs = df[['symbol', 'open_time']].values.tolist()

    # 2. Ask the database which of those pairs already exist.
    if symbol_time_pairs:
        # Parameterized IN list: psycopg2 expands a tuple of tuples into
        # ((s1, t1), (s2, t2), ...) with proper quoting. The previous
        # f-string-built VALUES list was SQL-injection-prone and broke on
        # any value containing a quote character.
        check_query = """
        SELECT symbol, open_time FROM binance_kline
        WHERE (symbol, open_time) IN %s
        """
        params = (tuple(tuple(pair) for pair in symbol_time_pairs),)

        try:
            with conn.cursor() as cur:
                cur.execute(check_query, params)
                existing_pairs = cur.fetchall()

            # Set for O(1) membership tests during filtering.
            existing_set = set((row[0], row[1]) for row in existing_pairs)
            logger.info(f"Found {len(existing_set)} existing records in database")

            # 3. Keep only rows whose pair is not already stored.
            df_filtered = df[~df.apply(lambda row: (row['symbol'], row['open_time']) in existing_set, axis=1)]

            if df_filtered.empty:
                logger.info("All records already exist in database, skipping insertion")
                return

            logger.info(f"Filtered to {len(df_filtered)} new records to insert")
        except psycopg2.Error as e:
            logger.error(f"Failed to check existing data: {e}")
            logger.info("Falling back to ON CONFLICT mechanism")
            # Clear the aborted transaction; without this rollback the
            # following INSERT would fail with InFailedSqlTransaction.
            conn.rollback()
            df_filtered = df
    else:
        df_filtered = df

    # 4. Insert statement; ON CONFLICT guards against races with other writers.
    insert_query = """
    INSERT INTO binance_kline (
        symbol, open_time, open, high, low, close, volume,
        close_time, quote_asset_volume, number_of_trades,
        taker_buy_base_asset_volume, taker_buy_quote_asset_volume, ignore
    ) VALUES %s
    ON CONFLICT (symbol, open_time) DO NOTHING;
    """

    data = [tuple(row) for row in df_filtered[insert_columns].to_numpy()]

    # 5. Bulk insert via execute_values: one round trip for all rows.
    try:
        with conn.cursor() as cur:
            execute_values(cur, insert_query, data)
        conn.commit()
        logger.info(f"Inserted {len(data)} new rows into database")
    except psycopg2.Error as e:
        logger.error(f"Failed to insert data: {e}")
        conn.rollback()
|
||||
|
||||
def list_s3_files(url, timeout=10):
    """List downloadable file URLs from an S3 bucket listing endpoint.

    Args:
        url: S3 bucket list URL, e.g.
            https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/BOBUSDT/1d/
        timeout: Request timeout in seconds, default 10.

    Returns:
        list[str]: Full download URLs of every .zip object found; empty list
        on network or XML-parsing failure.
    """
    logger.info(f"Listing files from {url}")

    try:
        # requests treats proxies=None the same as omitting the argument,
        # so a single call covers both the proxied and direct cases.
        response = requests.get(url, timeout=timeout, proxies=PROXIES)
        response.raise_for_status()

        # The listing endpoint answers with S3's XML document format.
        # NOTE(review): a truncated listing (>1000 keys) is not paginated
        # here — confirm the monthly archives stay below that limit.
        root = ET.fromstring(response.content)

        # Namespace used by S3 listing XML.
        ns = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'}

        # Objects are served from the public download host, not the list host.
        base_download_url = "https://data.binance.vision/"

        # Collect the full URL of every <Key> that points at a zip archive.
        file_urls = []
        for key_elem in root.findall('.//s3:Key', ns):
            file_path = key_elem.text
            if file_path and file_path.endswith('.zip'):
                file_urls.append(base_download_url + file_path)

        logger.info(f"Found {len(file_urls)} files")
        return file_urls
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to list files from {url}: {e}")
        return []
    except ET.ParseError as e:
        logger.error(f"Failed to parse XML response: {e}")
        return []
    except KeyboardInterrupt:
        # Log for visibility but re-raise: swallowing Ctrl-C (as the previous
        # version did by returning []) makes the process impossible to stop.
        logger.error(f"Request to {url} was interrupted")
        raise
|
||||
|
||||
def download_kline_data_by_url(url):
    """Download a K-line zip archive from a full URL.

    Args:
        url: Complete download URL of the archive.

    Returns:
        BytesIO holding the response body, or None when the download fails.
    """
    logger.info(f"Downloading {url}")

    try:
        # Use the configured proxies (may be None, which requests treats as
        # "no proxy") and a timeout so a stalled connection cannot hang the run.
        response = requests.get(url, proxies=PROXIES, timeout=30)
        response.raise_for_status()
        filename = os.path.basename(url)
        logger.info(f"Downloaded {filename} successfully")
        return BytesIO(response.content)
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download {url}: {e}")
        return None
|
||||
|
||||
def process_symbol(symbol, interval=INTERVAL):
    """
    Process all K-line data for one trading pair: list the monthly archives,
    download and parse each, merge, de-duplicate, and insert into PostgreSQL.

    Args:
        symbol: Trading pair, e.g. "BTCUSDT".
        interval: Candle interval, e.g. "1d"; defaults to the global INTERVAL.

    Returns:
        pandas.DataFrame: The merged K-line data, or None when no files were
        found or none could be processed.
    """
    logger.info(f"Processing symbol: {symbol}, interval: {interval}")

    # Build the S3 listing URL for this pair/interval.
    s3_url = f"https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/{symbol}/{interval}/"

    # Enumerate every downloadable archive URL.
    file_urls = list_s3_files(s3_url)

    if not file_urls:
        logger.warning(f"No files found for {symbol}-{interval}")
        return None

    # Download and parse each monthly archive into its own DataFrame.
    all_dfs = []
    for file_url in file_urls:
        if not file_url.endswith('.zip'):
            continue

        try:
            # download_unzip_csv fetches the zip and returns its CSV as a DataFrame.
            df = download_unzip_csv(file_url, header=None, names=KLINE_COLUMNS)

            # Stamp the trading pair onto every row.
            df["symbol"] = symbol

            # Convert epoch-millisecond timestamps to datetime (cast to float
            # first to avoid a pandas FutureWarning on object-dtype input).
            df["open_time"] = pd.to_datetime(df["open_time"].astype(float), unit='ms')
            df["close_time"] = pd.to_datetime(df["close_time"].astype(float), unit='ms')

            all_dfs.append(df)
            logger.info(f"Processed {os.path.basename(file_url)} with {len(df)} rows")
        except Exception as e:
            # Best-effort: one bad archive should not abort the whole symbol.
            logger.error(f"Failed to process {file_url}: {e}")
            continue

    if not all_dfs:
        logger.warning(f"No data processed for {symbol}-{interval}")
        return None

    # Merge everything into a single frame.
    merged_df = pd.concat(all_dfs, ignore_index=True)
    logger.info(f"Merged {len(all_dfs)} files into a single DataFrame with {len(merged_df)} rows")

    # Drop duplicate candles (overlapping or repeated months).
    merged_df = merged_df.drop_duplicates(subset=["symbol", "open_time"])
    logger.info(f"After deduplication, {len(merged_df)} rows remain")

    # Persist into PostgreSQL.
    conn = create_connection()
    if conn:
        try:
            # Make sure the target table exists before inserting.
            create_table(conn)

            # insert_data skips rows already present in the table.
            insert_data(conn, merged_df)
            # NOTE(review): this logs the merged row count even when
            # insert_data skipped duplicates — the true inserted count
            # may be lower.
            logger.info(f"Successfully inserted {len(merged_df)} rows into database for {symbol}")
        finally:
            # Always release the connection.
            conn.close()

    return merged_df
|
||||
|
||||
|
||||
def main():
    """Run the full pipeline for every trading pair in the SYMBOLS setting."""
    logger.info("Starting main process for symbols: %s", SYMBOLS)

    for sym in SYMBOLS:
        try:
            process_symbol(sym, INTERVAL)
        except Exception as e:
            # One failing symbol must not stop the remaining ones.
            logger.error(f"Failed to process symbol {sym}: {e}")

    logger.info("Script completed successfully")
|
||||
|
||||
if __name__ == "__main__":
    # Ad-hoc entry point for exercising process_symbol directly.
    import sys
    if len(sys.argv) > 1:
        # Trading pair (and optional interval) come from the command line.
        symbol = sys.argv[1]
        interval = sys.argv[2] if len(sys.argv) > 2 else INTERVAL
        process_symbol(symbol, interval)
    else:
        # With no arguments, run the full pipeline over all configured symbols.
        main()
|
||||
104
download_unzip_csv.py
Normal file
104
download_unzip_csv.py
Normal file
@@ -0,0 +1,104 @@
|
||||
import requests
import pandas as pd
from io import BytesIO
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
|
||||
|
||||
def download_unzip_csv(url, timeout=60, **kwargs):
    """
    Download a zip archive from a URL, decompress it, and return its CSV
    content as a pandas DataFrame.

    Args:
        url: File download URL.
        timeout: Request timeout in seconds (default 60), so a stalled
            connection cannot hang the caller indefinitely.
        **kwargs: Extra keyword arguments forwarded to pandas.read_csv.

    Returns:
        pandas.DataFrame: Decompressed CSV content.

    Raises:
        requests.exceptions.RequestException: When the download fails.
        Exception: When decompression or parsing fails.
    """
    logger.info(f"Downloading file from {url}")

    try:
        # Download the archive.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # verify the download succeeded
        logger.info(f"Successfully downloaded file from {url}")

        # Wrap the payload so pandas can read it like a file.
        zip_data = BytesIO(response.content)

        # Read and decompress the CSV file.
        logger.info("Reading and decompressing CSV file")
        df = pd.read_csv(zip_data, compression='zip', **kwargs)

        # Heuristically decide whether the first row is a header row.
        def is_header_row(row):
            """Return True when *row* looks like a header rather than data."""
            conditions = [
                # 1. Row contains common time-related column names.
                any(keyword in str(cell).lower() for keyword in ['time', 'date', 'datetime', 'timestamp'] for cell in row),
                # 2. Row contains common price-related column names.
                any(keyword in str(cell).lower() for keyword in ['open', 'high', 'low', 'close', 'volume'] for cell in row),
                # 3. Row contains common trade-related column names.
                any(keyword in str(cell).lower() for keyword in ['taker', 'quote', 'count', 'ignore'] for cell in row),
                # 4. First row is all non-numeric strings while the second row
                #    holds numeric-looking values. Fixed: the original tested
                #    isinstance(str(cell), str), which is always True and made
                #    the string check a no-op; we must inspect the cell itself.
                len(df) > 1 and all(isinstance(cell, str) and not str(cell).replace('.', '').isdigit() for cell in row) and \
                any(str(cell).replace('.', '').isdigit() for cell in df.iloc[1] if pd.notna(cell)),
                # 5. A cell contains '_' (typical of programmatic column names).
                any('_' in str(cell) for cell in row)
            ]
            return any(conditions)

        if len(df) > 0 and is_header_row(df.iloc[0]):
            df = df[1:].reset_index(drop=True)
            logger.info("Skipped header row")

        # Columns that should be numeric when present.
        numeric_columns = ['open', 'high', 'low', 'close', 'volume',
                           'close_time', 'quote_asset_volume', 'number_of_trades',
                           'taker_buy_volume', 'taker_buy_quote_volume', 'taker_buy_base_asset_volume',
                           'taker_buy_quote_asset_volume', 'ignore']

        # Only convert columns that actually exist in this CSV.
        for col in numeric_columns:
            if col in df.columns:
                try:
                    if col in ['number_of_trades', 'count']:
                        df[col] = df[col].astype(int)
                    else:
                        df[col] = df[col].astype(float)
                except ValueError:
                    # Best-effort typing: leave the column as-is and warn.
                    logger.warning(f"Could not convert column {col} to numeric type")

        logger.info(f"Successfully parsed CSV file with {len(df)} rows")
        return df
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download file: {e}")
        raise
    except Exception as e:
        logger.error(f"Failed to process file: {e}")
        raise
|
||||
|
||||
if __name__ == "__main__":
    # Smoke test: fetch one real monthly archive and print summary info.
    test_url = "https://data.binance.vision/data/futures/um/monthly/klines/BTCUSDT/1d/BTCUSDT-1d-2024-01.zip"

    # CSV column names for Binance kline archives (which ship headerless).
    columns = [
        "open_time", "open", "high", "low", "close", "volume",
        "close_time", "quote_asset_volume", "number_of_trades",
        "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
    ]

    try:
        df = download_unzip_csv(test_url, header=None, names=columns)
        print(f"DataFrame shape: {df.shape}")
        print("DataFrame head:")
        print(df.head())
        print("DataFrame dtypes:")
        print(df.dtypes)
    except Exception as e:
        print(f"Error: {e}")
|
||||
30
settings.toml
Normal file
30
settings.toml
Normal file
@@ -0,0 +1,30 @@
|
||||
# Binance K线数据下载配置
|
||||
|
||||
[default]
|
||||
# 基础URL
|
||||
BASE_URL = "https://data.binance.vision/data/futures/um/monthly/klines/"
|
||||
|
||||
# 默认交易对
|
||||
SYMBOLS = ["BOBUSDT", "BTCUSDT"]
|
||||
|
||||
# 默认时间间隔
|
||||
INTERVAL = "1d"
|
||||
|
||||
# K线数据列名
|
||||
KLINE_COLUMNS = [
|
||||
"open_time", "open", "high", "low", "close", "volume",
|
||||
"close_time", "quote_asset_volume", "number_of_trades",
|
||||
"taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
|
||||
]
|
||||
|
||||
# 代理配置
|
||||
PROXY_ENABLED = true
|
||||
HTTP_PROXY = "http://localhost:1080"
|
||||
HTTPS_PROXY = "http://localhost:1080"
|
||||
|
||||
# PostgreSQL数据库配置
|
||||
DB_HOST = "localhost"
|
||||
DB_PORT = 5432
|
||||
DB_DATABASE = "your_database"
|
||||
DB_USER = "your_username"
|
||||
DB_PASSWORD = "your_password"
|
||||
Reference in New Issue
Block a user