Compare commits

...

8 Commits

3 changed files with 642 additions and 0 deletions

508
download_binance_kline.py Normal file
View File

@@ -0,0 +1,508 @@
import requests
import os
import pandas as pd
from io import BytesIO
import psycopg2
from psycopg2.extras import execute_values
import logging
from datetime import datetime
import xml.etree.ElementTree as ET
from download_unzip_csv import download_unzip_csv
from dynaconf import Dynaconf
# Configure logging: timestamped INFO-level messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Initialise the Dynaconf settings object; values are read from settings.toml,
# with the active environment selected via the BINANCE_ENV variable.
settings = Dynaconf(
    envvar_prefix="DYNACONF",
    settings_files=["settings.toml"],
    environments=True,
    load_dotenv=False,
    default_env="default",
    env_switcher="BINANCE_ENV"
)

# Download configuration.
BASE_URL = settings.BASE_URL
SYMBOLS = settings.SYMBOLS
INTERVAL = settings.INTERVAL

# Proxy configuration: None (direct connection) unless PROXY_ENABLED is set.
PROXIES = {
    'http': settings.HTTP_PROXY,
    'https': settings.HTTPS_PROXY
} if settings.PROXY_ENABLED else None

# PostgreSQL connection parameters, passed straight to psycopg2.connect().
DB_CONFIG = {
    "host": settings.DB_HOST,
    "database": settings.DB_DATABASE,
    "user": settings.DB_USER,
    "password": settings.DB_PASSWORD,
    "port": settings.DB_PORT
}

# Column names for the raw Binance kline CSV files.
KLINE_COLUMNS = settings.KLINE_COLUMNS
def validate_kline_data(df, expected_interval=pd.Timedelta(days=1)):
    """Validate the integrity and sanity of a batch of kline (candlestick) data.

    Args:
        df: DataFrame holding kline rows for a single symbol.
        expected_interval: expected spacing between consecutive ``open_time``
            values. Defaults to 1 day, matching the "1d" interval, so existing
            callers are unaffected; pass a different Timedelta for other
            intervals.

    Returns:
        (bool, str): validity flag and a human-readable explanation.
    """
    if df.empty:
        return False, "数据为空"

    # 1. Required columns must be present.
    required_columns = ['symbol', 'open_time', 'close_time', 'open', 'high', 'low', 'close', 'volume']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        return False, f"缺少必要列: {missing_columns}"

    # 2. Price/volume columns must be numeric dtypes.
    numeric_columns = ['open', 'high', 'low', 'close', 'volume']
    for col in numeric_columns:
        if not pd.api.types.is_numeric_dtype(df[col]):
            return False, f"{col} 不是数值类型"

    # 3. open_time values should be (mostly) evenly spaced at expected_interval.
    #    A small fraction of outliers (e.g. gaps around listing/delisting) is
    #    tolerated.
    df_sorted = df.sort_values('open_time')
    time_diff = df_sorted['open_time'].diff().dropna()
    if not time_diff.empty:
        expected_diff = expected_interval
        # More than 10% of rows off the expected spacing is treated as invalid.
        if (time_diff != expected_diff).mean() > 0.1:
            return False, f"时间间隔不符合预期,预期{expected_diff},实际差异较大"

    # 4. Price sanity: high >= low, high >= open/close, low <= open/close.
    if (df['high'] < df['low']).any():
        return False, "存在最高价小于最低价的记录"
    if ((df['high'] < df['open']) | (df['high'] < df['close'])).any():
        return False, "存在最高价小于开盘价或收盘价的记录"
    if ((df['low'] > df['open']) | (df['low'] > df['close'])).any():
        return False, "存在最低价大于开盘价或收盘价的记录"

    # 5. Volume must be non-negative.
    if (df['volume'] < 0).any():
        return False, "存在交易量为负数的记录"

    # 6. Each candle must open before it closes.
    if (df['open_time'] >= df['close_time']).any():
        return False, "存在open_time大于等于close_time的记录"

    # 7. A batch must contain exactly one trading pair.
    if df['symbol'].nunique() > 1:
        return False, "同一批次数据包含多个交易对"

    # 8. open_time must be unique within the batch.
    if df['open_time'].duplicated().any():
        return False, "存在重复的open_time记录"

    # 9. Values must fit the database NUMERIC(20, 8) columns (at most 12
    #    integer digits).  BUGFIX: the original bound 10**12 - 1e-8 rounds to
    #    exactly 1e12 in double precision (the ulp at 1e12 is ~2.4e-4), so the
    #    strict ">" comparison silently admitted the out-of-range value 1e12
    #    itself.  Use an inclusive comparison against the clean power of ten.
    max_value = 10**12
    min_value = -max_value
    for col in numeric_columns:
        if (df[col] >= max_value).any() or (df[col] <= min_value).any():
            return False, f"{col} 的数值超出数据库字段范围 [-{max_value:.8f}, {max_value:.8f}]"

    return True, "数据完整性校验通过"
def download_kline_data(symbol, interval, year_month):
    """Download one month of kline data for a symbol from the Binance archive.

    Args:
        symbol: trading pair, e.g. "BTCUSDT".
        interval: kline interval, e.g. "1d".
        year_month: month to fetch, formatted "YYYY-MM".

    Returns:
        BytesIO holding the zip payload, or None when the download fails.
    """
    # Split the "YYYY-MM" string into its parts for the archive filename.
    year, month = year_month.split("-")
    filename = f"{symbol}-{interval}-{year}-{month}.zip"
    # Archive layout: <BASE_URL><symbol>/<interval>/<filename>
    # BUGFIX: the filename placeholder was missing from the URL, so the
    # request could never hit a valid archive path.
    url = f"{BASE_URL}{symbol}/{interval}/{filename}"
    logger.info(f"Downloading {url}")
    try:
        # Use the shared proxy settings and a bounded timeout, matching the
        # other HTTP helpers in this module (proxies=None means "direct").
        response = requests.get(url, timeout=30, proxies=PROXIES)
        response.raise_for_status()
        logger.info(f"Downloaded {filename} successfully")
        return BytesIO(response.content)
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download {filename}: {e}")
        return None
def parse_kline_data(zip_data, symbol):
    """Parse a downloaded kline zip archive into a validated DataFrame.

    Args:
        zip_data: file-like object containing the zipped CSV payload.
        symbol: trading pair the rows belong to; stored in a new column.

    Returns:
        Parsed DataFrame, or None when parsing or validation fails.
    """
    try:
        frame = pd.read_csv(zip_data, compression='zip', header=None, names=KLINE_COLUMNS)

        # Some archives carry a literal header line; drop it if present.
        frame = frame[frame["open_time"] != "open_time"]

        frame["symbol"] = symbol

        # Millisecond epoch timestamps -> datetime.
        for ts_col in ("open_time", "close_time"):
            frame[ts_col] = pd.to_datetime(frame[ts_col], unit='ms')

        # Cast value columns to float, then restore the trade count to int.
        float_cols = ["open", "high", "low", "close", "volume",
                      "quote_asset_volume", "number_of_trades",
                      "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"]
        frame[float_cols] = frame[float_cols].astype(float)
        frame["number_of_trades"] = frame["number_of_trades"].astype(int)

        # Reject the whole batch if the integrity checks fail.
        ok, reason = validate_kline_data(frame)
        if not ok:
            logger.error(f"K线数据校验失败: {reason}")
            return None

        logger.info(f"Parsed {len(frame)} rows of K线 data, 数据校验通过")
        return frame
    except Exception as e:
        logger.error(f"Failed to parse K线 data: {e}")
        return None
def create_connection():
    """Open a PostgreSQL connection using the module-level DB_CONFIG.

    Returns:
        An open psycopg2 connection, or None when the server is unreachable.
    """
    try:
        connection = psycopg2.connect(**DB_CONFIG)
        logger.info("Connected to PostgreSQL database")
    except psycopg2.OperationalError as e:
        logger.error(f"Failed to connect to PostgreSQL: {e}")
        return None
    return connection
def create_table(conn):
    """Drop and recreate the bn_futures_kline_1d table.

    NOTE(review): this destroys any existing data on every call — confirm a
    full rebuild is intended before running against a populated database.

    Args:
        conn: open psycopg2 connection; rolled back on DDL failure.
    """
    drop_sql = """
    DROP TABLE IF EXISTS bn_futures_kline_1d;
    """
    # NUMERIC(20, 8) columns match the range enforced by validate_kline_data;
    # (symbol, open_time) is the natural key used for duplicate filtering.
    create_sql = """
    CREATE TABLE bn_futures_kline_1d (
        id SERIAL PRIMARY KEY,
        symbol VARCHAR(20) NOT NULL,
        open_time TIMESTAMP NOT NULL,
        open NUMERIC(20, 8) NOT NULL,
        high NUMERIC(20, 8) NOT NULL,
        low NUMERIC(20, 8) NOT NULL,
        close NUMERIC(20, 8) NOT NULL,
        volume NUMERIC(20, 8) NOT NULL,
        close_time TIMESTAMP NOT NULL,
        quote_asset_volume NUMERIC(20, 8) NOT NULL,
        number_of_trades INTEGER NOT NULL,
        taker_buy_base_asset_volume NUMERIC(20, 8) NOT NULL,
        taker_buy_quote_asset_volume NUMERIC(20, 8) NOT NULL,
        ignore NUMERIC(20, 8) NOT NULL,
        UNIQUE(symbol, open_time)
    );
    """
    try:
        with conn.cursor() as cur:
            cur.execute(drop_sql)
            cur.execute(create_sql)
        conn.commit()
        logger.info("Created bn_futures_kline_1d table with updated schema")
    except psycopg2.Error as e:
        logger.error(f"Failed to create table: {e}")
        conn.rollback()
def insert_data(conn, df):
    """Insert kline rows into bn_futures_kline_1d, skipping duplicates.

    Rows whose (symbol, open_time) already exists are filtered out client-side
    first; ON CONFLICT DO NOTHING then guards against any races that remain.

    Args:
        conn: open psycopg2 connection.
        df: kline DataFrame (re-validated here before insertion).
    """
    if df.empty:
        logger.info("No data to insert")
        return

    # Final integrity check before touching the database.
    is_valid, message = validate_kline_data(df)
    if not is_valid:
        logger.error(f"插入数据库前数据校验失败: {message}")
        return

    total_records = len(df)
    logger.info(f"Preparing to insert {total_records} records")

    # Column order matching the INSERT statement below.
    insert_columns = [
        "symbol", "open_time", "high", "low", "open", "close", "volume",
        "close_time", "quote_asset_volume", "number_of_trades",
        "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
    ]

    # 1. Collect the distinct (symbol, open_time) pairs we are about to insert.
    symbol_time_df = df[['symbol', 'open_time']].drop_duplicates()
    symbol_time_pairs = symbol_time_df.values.tolist()

    # 2. Ask the database which of those pairs already exist, in batches to
    #    keep each statement bounded.
    #    BUGFIX: the original built the VALUES list by f-string interpolation
    #    ('{symbol}', '{open_time}'), which breaks — and is injectable — as
    #    soon as a value contains a quote. Use bound parameters instead and
    #    let psycopg2 adapt the values.
    existing_pairs = []
    batch_size = 500  # pairs per existence-check query
    if symbol_time_pairs:
        for i in range(0, len(symbol_time_pairs), batch_size):
            batch_pairs = symbol_time_pairs[i:i+batch_size]
            placeholders = ','.join(['(%s, %s)'] * len(batch_pairs))
            params = [value for pair in batch_pairs for value in pair]
            check_query = f"""
            SELECT symbol, open_time FROM bn_futures_kline_1d
            WHERE (symbol, open_time) IN ({placeholders})
            """
            try:
                with conn.cursor() as cur:
                    cur.execute(check_query, params)
                    existing_pairs.extend(cur.fetchall())
            except psycopg2.Error as e:
                logger.error(f"Failed to check existing data in batch {i//batch_size + 1}: {e}")
                logger.info("Falling back to ON CONFLICT mechanism for this batch")
                # Give up on pre-filtering; ON CONFLICT drops the duplicates.
                df_filtered = df
                break
        else:
            # All batches succeeded: filter out rows that already exist.
            existing_set = set((row[0], row[1]) for row in existing_pairs)
            logger.info(f"Found {len(existing_set)} existing records in database")
            if existing_set:
                # 3. Anti-join via merge(indicator=True) — efficient filtering
                #    without a per-row apply.
                existing_df = pd.DataFrame(list(existing_set), columns=['symbol', 'open_time'])
                merged_df = df.merge(
                    existing_df,
                    on=['symbol', 'open_time'],
                    how='left',
                    indicator=True
                )
                # Keep only rows present in df but absent from the database.
                df_filtered = merged_df[merged_df['_merge'] == 'left_only'].drop('_merge', axis=1)
            else:
                df_filtered = df
            if df_filtered.empty:
                logger.info("All records already exist in database, skipping insertion")
                return
            logger.info(f"Filtered to {len(df_filtered)} new records to insert (removed {total_records - len(df_filtered)} existing records)")
    else:
        df_filtered = df

    # 4. Bulk insert; ON CONFLICT keeps the operation idempotent.
    insert_query = """
    INSERT INTO bn_futures_kline_1d (
        symbol, open_time, high, low, open, close, volume,
        close_time, quote_asset_volume, number_of_trades,
        taker_buy_base_asset_volume, taker_buy_quote_asset_volume, ignore
    ) VALUES %s
    ON CONFLICT (symbol, open_time) DO NOTHING;
    """
    data = [tuple(row) for row in df_filtered[insert_columns].to_numpy()]

    # 5. Execute the insert, rolling back on failure.
    try:
        with conn.cursor() as cur:
            execute_values(cur, insert_query, data)
        conn.commit()
        logger.info(f"Successfully inserted {len(data)} new rows into database")
    except psycopg2.Error as e:
        logger.error(f"Failed to insert data: {e}")
        conn.rollback()
def list_s3_files(url, timeout=10):
    """List downloadable .zip file URLs from an S3 bucket-listing endpoint.

    Args:
        url: S3 listing URL, e.g.
            https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/BOBUSDT/1d/
        timeout: request timeout in seconds (default 10).

    Returns:
        list: full download URLs for every .zip key found; empty list on
        request or parse failure.

    Raises:
        KeyboardInterrupt: re-raised after logging, so Ctrl-C still works.

    NOTE(review): S3 list responses are truncated at 1000 keys and this
    function does not follow continuation markers — confirm the prefixes used
    here never exceed that limit.
    """
    logger.info(f"Listing files from {url}")
    try:
        # requests treats proxies=None the same as "no proxies", so a single
        # call covers both the proxied and direct configurations.
        response = requests.get(url, timeout=timeout, proxies=PROXIES)
        response.raise_for_status()

        # Parse the S3 XML listing (namespaced elements).
        root = ET.fromstring(response.content)
        ns = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'}

        # Keys are bucket-relative paths; prefix the public download host.
        base_download_url = "https://data.binance.vision/"
        file_urls = []
        for key_elem in root.findall('.//s3:Key', ns):
            file_path = key_elem.text
            # Only zip archives are downloadable kline files.
            if file_path and file_path.endswith('.zip'):
                file_urls.append(base_download_url + file_path)

        logger.info(f"Found {len(file_urls)} files")
        return file_urls
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to list files from {url}: {e}")
        return []
    except ET.ParseError as e:
        logger.error(f"Failed to parse XML response: {e}")
        return []
    except KeyboardInterrupt:
        # BUGFIX: the original returned [] here, swallowing Ctrl-C and making
        # the script impossible to interrupt; log and let it propagate.
        logger.error(f"Request to {url} was interrupted")
        raise
def download_kline_data_by_url(url):
    """Download kline data from a complete URL.

    Args:
        url: full download URL of one kline zip archive.

    Returns:
        BytesIO holding the downloaded payload, or None on failure.
    """
    logger.info(f"Downloading {url}")
    try:
        # Use the shared proxy settings and a bounded timeout, matching the
        # other HTTP helpers in this module.
        response = requests.get(url, timeout=30, proxies=PROXIES)
        response.raise_for_status()
        filename = os.path.basename(url)
        # BUGFIX: the success log had a garbled placeholder instead of the
        # filename it had just computed.
        logger.info(f"Downloaded {filename} successfully")
        return BytesIO(response.content)
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download {url}: {e}")
        return None
def process_symbol(symbol, interval=INTERVAL):
    """Download, parse, validate and store all monthly klines for one symbol.

    Args:
        symbol: trading pair, e.g. "BTCUSDT".
        interval: kline interval, e.g. "1d" (defaults to the configured INTERVAL).

    Returns:
        pandas.DataFrame: merged, de-duplicated kline data, or None when no
        file could be downloaded or parsed.
    """
    logger.info(f"Processing symbol: {symbol}, interval: {interval}")

    # Enumerate every monthly archive available for this symbol/interval.
    s3_url = f"https://s3-ap-northeast-1.amazonaws.com/data.binance.vision?delimiter=/&prefix=data/futures/um/monthly/klines/{symbol}/{interval}/"
    file_urls = list_s3_files(s3_url)
    if not file_urls:
        logger.warning(f"No files found for {symbol}-{interval}")
        return None

    # Download and parse each archive; collect the per-month frames.
    frames = []
    for file_url in file_urls:
        if not file_url.endswith('.zip'):
            continue
        try:
            # Download + unzip + read the CSV in one step.
            frame = download_unzip_csv(file_url, header=None, names=KLINE_COLUMNS)
            frame["symbol"] = symbol

            # Cast to numeric before to_datetime to avoid a pandas FutureWarning.
            frame["open_time"] = pd.to_datetime(frame["open_time"].astype(float), unit='ms')
            frame["close_time"] = pd.to_datetime(frame["close_time"].astype(float), unit='ms')

            # Skip files that fail the integrity checks.
            is_valid, message = validate_kline_data(frame)
            if not is_valid:
                logger.error(f"文件 {os.path.basename(file_url)} 数据校验失败: {message}")
                continue

            frames.append(frame)
            logger.info(f"Processed {os.path.basename(file_url)} with {len(frame)} rows, 数据校验通过")
        except Exception as e:
            logger.error(f"Failed to process {file_url}: {e}")
            continue

    if not frames:
        logger.warning(f"No data processed for {symbol}-{interval}")
        return None

    # Merge the monthly frames into one DataFrame.
    merged_df = pd.concat(frames, ignore_index=True)
    logger.info(f"Merged {len(frames)} files into a single DataFrame with {len(merged_df)} rows")

    # Drop duplicate (symbol, open_time) rows.
    merged_df = merged_df.drop_duplicates(subset=["symbol", "open_time"])
    logger.info(f"After deduplication, {len(merged_df)} rows remain")

    # Persist to PostgreSQL, always closing the connection afterwards.
    conn = create_connection()
    if conn:
        try:
            create_table(conn)
            insert_data(conn, merged_df)
            logger.info(f"Successfully inserted {len(merged_df)} rows into database for {symbol}")
        finally:
            conn.close()
    return merged_df
def main():
    """Run the full pipeline for every trading pair in the SYMBOLS setting.

    A failure on one symbol is logged and does not stop the remaining symbols.
    """
    logger.info("Starting main process for symbols: %s", SYMBOLS)
    for symbol in SYMBOLS:
        try:
            process_symbol(symbol, INTERVAL)
        except Exception as e:
            # Keep going: one bad symbol must not abort the whole run.
            logger.error(f"Failed to process symbol {symbol}: {e}")
    logger.info("Script completed successfully")
if __name__ == "__main__":
    # CLI entry point: "<symbol> [interval]" processes a single pair;
    # with no arguments, every configured symbol is processed via main().
    import sys

    cli_args = sys.argv[1:]
    if cli_args:
        cli_symbol = cli_args[0]
        cli_interval = cli_args[1] if len(cli_args) > 1 else INTERVAL
        process_symbol(cli_symbol, cli_interval)
    else:
        main()

104
download_unzip_csv.py Normal file
View File

@@ -0,0 +1,104 @@
import requests
import pandas as pd
from io import BytesIO
import logging
# Configure logging: timestamped INFO-level messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def download_unzip_csv(url, **kwargs):
    """Download a zip archive from *url* and return its CSV content as a DataFrame.

    Args:
        url: download URL of a zipped CSV file.
        **kwargs: extra arguments forwarded to pandas.read_csv
            (e.g. header=None, names=[...]).

    Returns:
        pandas.DataFrame: parsed CSV content; a detected header row is removed
        and known numeric columns are cast.

    Raises:
        requests.exceptions.RequestException: if the download fails.
        Exception: if decompression or parsing fails.
    """
    logger.info(f"Downloading file from {url}")
    try:
        # Download the archive into memory.
        response = requests.get(url)
        response.raise_for_status()  # surface HTTP errors as exceptions
        logger.info(f"Successfully downloaded file from {url}")
        zip_data = BytesIO(response.content)

        # Read and decompress the CSV.
        logger.info("Reading and decompressing CSV file")
        df = pd.read_csv(zip_data, compression='zip', **kwargs)

        def is_header_row(row):
            """Heuristically decide whether *row* is a column-header line."""
            cells = [str(cell).lower() for cell in row]
            keyword_groups = (
                ['time', 'date', 'datetime', 'timestamp'],   # time-related names
                ['open', 'high', 'low', 'close', 'volume'],  # price-related names
                ['taker', 'quote', 'count', 'ignore'],       # trade-related names
            )
            for keywords in keyword_groups:
                if any(kw in cell for kw in keywords for cell in cells):
                    return True
            # First row is all genuine strings (not parsed numbers) while the
            # second row contains numeric-looking values.
            # BUGFIX: the original tested isinstance(str(cell), str), which is
            # always True and let purely numeric first rows (e.g. negative
            # values, whose text form is not .isdigit()) be misdetected as
            # headers; test the cell's own type instead.
            if len(df) > 1 and all(
                isinstance(cell, str) and not str(cell).replace('.', '').isdigit()
                for cell in row
            ) and any(str(cell).replace('.', '').isdigit() for cell in df.iloc[1] if pd.notna(cell)):
                return True
            # Underscores are common in programmatic column names.
            return any('_' in str(cell) for cell in row)

        if len(df) > 0 and is_header_row(df.iloc[0]):
            df = df[1:].reset_index(drop=True)
            logger.info("Skipped header row")

        # Cast known numeric columns — only those actually present.
        numeric_columns = ['open', 'high', 'low', 'close', 'volume',
                           'close_time', 'quote_asset_volume', 'number_of_trades',
                           'taker_buy_volume', 'taker_buy_quote_volume', 'taker_buy_base_asset_volume',
                           'taker_buy_quote_asset_volume', 'ignore']
        for col in numeric_columns:
            if col in df.columns:
                try:
                    if col in ['number_of_trades', 'count']:
                        df[col] = df[col].astype(int)
                    else:
                        df[col] = df[col].astype(float)
                except ValueError:
                    logger.warning(f"Could not convert column {col} to numeric type")

        logger.info(f"Successfully parsed CSV file with {len(df)} rows")
        return df
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download file: {e}")
        raise
    except Exception as e:
        logger.error(f"Failed to process file: {e}")
        raise
if __name__ == "__main__":
    # Smoke test: fetch one real monthly archive and print a summary.
    test_url = "https://data.binance.vision/data/futures/um/monthly/klines/BTCUSDT/1d/BTCUSDT-1d-2024-01.zip"
    # Column names of the raw kline CSV.
    columns = [
        "open_time", "open", "high", "low", "close", "volume",
        "close_time", "quote_asset_volume", "number_of_trades",
        "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
    ]
    try:
        frame = download_unzip_csv(test_url, header=None, names=columns)
        print(f"DataFrame shape: {frame.shape}")
        print("DataFrame head:")
        print(frame.head())
        print("DataFrame dtypes:")
        print(frame.dtypes)
    except Exception as e:
        print(f"Error: {e}")

30
settings.toml Normal file
View File

@@ -0,0 +1,30 @@
# Binance kline download configuration
[default]
# Base URL for the Binance USD-M futures monthly kline archives
BASE_URL = "https://data.binance.vision/data/futures/um/monthly/klines/"
# Trading pairs to process by default
SYMBOLS = ["BOBUSDT", "BTCUSDT"]
# Default kline interval
INTERVAL = "1d"
# Column names of the raw kline CSV files
KLINE_COLUMNS = [
    "open_time", "open", "high", "low", "close", "volume",
    "close_time", "quote_asset_volume", "number_of_trades",
    "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
]
# Proxy configuration (used by the HTTP helpers when enabled)
PROXY_ENABLED = true
HTTP_PROXY = "http://localhost:1080"
HTTPS_PROXY = "http://localhost:1080"
# PostgreSQL connection settings
# NOTE(review): credentials are stored in plain text here — consider supplying
# DB_PASSWORD via an environment variable override instead of committing it.
DB_HOST = "localhost"
DB_PORT = 5432
DB_DATABASE = "graphql-engine"
DB_USER = "root"
DB_PASSWORD = "pgsqlpasswd.."