Files
prefect-code/download_unzip_csv.py
2026-01-14 10:58:33 +08:00

105 lines
4.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import pandas as pd
from io import BytesIO
import logging
# Configure logging when the module is imported: INFO level with a
# "timestamp - level - message" line format, then create this module's logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def download_unzip_csv(url, *, timeout=30, **kwargs):
    """Download a zip archive from ``url`` and load the CSV inside it.

    The archive is fetched with ``requests``, kept in memory and handed to
    ``pandas.read_csv`` with ``compression='zip'``.  If the first parsed row
    looks like a header line (see ``_is_header_row``) it is dropped, and the
    columns named in ``_NUMERIC_COLUMNS`` that are present are cast to
    numeric dtypes on a best-effort basis.

    Args:
        url: Download URL of the zip archive.
        timeout: Value passed to ``requests.get`` as ``timeout`` (seconds, or
            a ``(connect, read)`` tuple).  ``None`` disables the timeout.
        **kwargs: Extra keyword arguments forwarded to ``pandas.read_csv``.

    Returns:
        pandas.DataFrame: The parsed CSV content.

    Raises:
        requests.exceptions.RequestException: If the download fails or the
            server answers with an HTTP error status.
        Exception: Any error raised while reading or post-processing the CSV
            is logged and re-raised unchanged.
    """
    logger.info("Downloading file from %s", url)
    try:
        # Without a timeout a stalled server would block this call forever.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        logger.info("Successfully downloaded file from %s", url)

        logger.info("Reading and decompressing CSV file")
        df = pd.read_csv(BytesIO(response.content), compression='zip', **kwargs)

        # Callers may pass header=None/names=...; in that case a header line
        # in the file shows up as the first data row and must be removed.
        if len(df) > 0:
            following_row = df.iloc[1] if len(df) > 1 else None
            if _is_header_row(df.iloc[0], following_row):
                df = df.iloc[1:].reset_index(drop=True)
                logger.info("Skipped header row")

        _coerce_numeric_columns(df)
        logger.info("Successfully parsed CSV file with %s rows", len(df))
        return df
    except requests.exceptions.RequestException as e:
        logger.error("Failed to download file: %s", e)
        raise
    except Exception as e:
        logger.error("Failed to process file: %s", e)
        raise


# Substrings (matched case-insensitively) that mark a cell as a column label.
_HEADER_TOKENS = (
    'time', 'date', 'datetime', 'timestamp',
    'open', 'high', 'low', 'close', 'volume',
    'taker', 'quote', 'count', 'ignore',
    '_',  # underscores are typical of programmatic column names
)

# Columns cast to a numeric dtype when present in the frame.
_NUMERIC_COLUMNS = (
    'open', 'high', 'low', 'close', 'volume',
    'close_time', 'quote_asset_volume', 'number_of_trades', 'count',
    'taker_buy_volume', 'taker_buy_quote_volume', 'taker_buy_base_asset_volume',
    'taker_buy_quote_asset_volume', 'ignore',
)

# Subset of _NUMERIC_COLUMNS cast to int instead of float.
_INTEGER_COLUMNS = frozenset({'number_of_trades', 'count'})


def _looks_numeric(cell):
    """Return True if ``cell`` is a non-missing value whose text parses as a float."""
    if pd.isna(cell):
        return False
    try:
        float(str(cell))
    except ValueError:
        return False
    return True


def _is_header_row(row, next_row=None):
    """Heuristically decide whether ``row`` holds column labels, not data.

    A row containing any numeric cell is never treated as a header, so a data
    row is not discarded merely because one of its text cells happens to
    contain a keyword.  Otherwise the row is a header when any cell contains
    one of ``_HEADER_TOKENS``, or when ``next_row`` is given and contains at
    least one numeric cell (labels followed by data).
    """
    if any(_looks_numeric(cell) for cell in row):
        return False
    if any(token in str(cell).lower() for cell in row for token in _HEADER_TOKENS):
        return True
    return next_row is not None and any(_looks_numeric(cell) for cell in next_row)


def _coerce_numeric_columns(df):
    """Cast the ``_NUMERIC_COLUMNS`` present in ``df`` in place, best effort."""
    for col in _NUMERIC_COLUMNS:
        if col not in df.columns:
            continue
        target_type = int if col in _INTEGER_COLUMNS else float
        try:
            df[col] = df[col].astype(target_type)
        except ValueError:
            # Leave the column as parsed rather than failing the whole load.
            logger.warning("Could not convert column %s to numeric type", col)
if __name__ == "__main__":
    # Manual smoke test: fetch one monthly archive of daily BTCUSDT futures
    # k-lines and print a summary of the resulting frame.
    sample_url = "https://data.binance.vision/data/futures/um/monthly/klines/BTCUSDT/1d/BTCUSDT-1d-2024-01.zip"
    # Names assigned positionally to the CSV columns.
    kline_columns = [
        "open_time",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "close_time",
        "quote_asset_volume",
        "number_of_trades",
        "taker_buy_base_asset_volume",
        "taker_buy_quote_asset_volume",
        "ignore",
    ]
    try:
        df = download_unzip_csv(sample_url, header=None, names=kline_columns)
        print(f"DataFrame shape: {df.shape}")
        print("DataFrame head:", df.head(), sep="\n")
        print("DataFrame dtypes:", df.dtypes, sep="\n")
    except Exception as e:
        # Report any failure on stdout instead of showing a traceback.
        print(f"Error: {e}")