diff --git a/cloudbt/run_cloudbt.py b/cloudbt/run_cloudbt.py index a850e6b..4ea860c 100644 --- a/cloudbt/run_cloudbt.py +++ b/cloudbt/run_cloudbt.py @@ -2,7 +2,10 @@ import argparse import datetime import os -from typing import List, Optional +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path +import threading +from typing import Dict, List, Optional, Tuple import requests from myscripts import working_tool import dotenv @@ -91,60 +94,97 @@ def get_pair_data_file_path(pair: str, config_path: str, timeframe: str) -> Opti print(f"Warning: Failed to get data file path for pair {pair}: {e}") return None +import hashlib +import json + +DATE_CACHE_DIR = Path('.cache') / 'pairs_date' + +def _get_cache_path(file_path: Path) -> Path: + mtime = file_path.stat().st_mtime + file_key = f"{file_path}:{mtime}" + hash_key = hashlib.sha256(file_key.encode()).hexdigest()[:32] + return DATE_CACHE_DIR / f"{hash_key}.json" + +def _load_date_cache(file_path: Path) -> Optional[Tuple[pd.Timestamp, pd.Timestamp]]: + cache_path = _get_cache_path(file_path) + if cache_path.exists(): + try: + with open(cache_path, 'r') as f: + data = json.load(f) + return (pd.Timestamp(data['min']), pd.Timestamp(data['max'])) + except Exception: + pass + return None + +def _save_date_cache(file_path: Path, date_min: pd.Timestamp, date_max: pd.Timestamp) -> None: + DATE_CACHE_DIR.mkdir(parents=True, exist_ok=True) + cache_path = _get_cache_path(file_path) + try: + with open(cache_path, 'w') as f: + json.dump({'min': str(date_min), 'max': str(date_max)}, f) + except Exception as e: + print(f"Warning: Failed to save date cache for {file_path}: {e}") + def filter_pairs_by_timerange(pairs: List[str], timerange: str, config_path: str, timeframe: str) -> List[str]: """Filter pairs based on timerange, checking if data exists for the given timerange""" print(f"Checking pair data availability for timerange: {timerange}...") - valid_pairs = [] - invalid_pairs = [] try: - # Parse timerange string into start and end dates start_str, end_str = timerange.split('-') start_date = datetime.datetime.strptime(start_str, '%Y%m%d') end_date = datetime.datetime.strptime(end_str, '%Y%m%d') - - # Convert to pandas Timestamps for comparison start_ts = pd.Timestamp(start_date) end_ts = pd.Timestamp(end_date) - - for pair in pairs: - file_path = get_pair_data_file_path(pair, config_path, timeframe) - if file_path is not None: - try: - # Read only the date column to check data availability - df = pd.read_feather(file_path, columns=['date']) - if not df.empty: - # Convert date column to datetime - df['date'] = pd.to_datetime(df['date']) - - # Remove timezone if present - if df['date'].dt.tz is not None: - df['date'] = df['date'].dt.tz_convert('UTC').dt.tz_localize(None) - - # Check if there's data within the timerange - mask = (df['date'] >= start_ts) & (df['date'] <= end_ts) - if any(mask): - valid_pairs.append(pair) - else: - invalid_pairs.append(pair) - else: - invalid_pairs.append(pair) - except Exception as e: - print(f"Warning: Error reading data for pair {pair}: {e}") - invalid_pairs.append(pair) - else: - invalid_pairs.append(pair) except Exception as e: print(f"Error parsing timerange: {e}") return [] - + + def get_date_range(file_path: Path) -> Tuple[pd.Timestamp, pd.Timestamp]: + cached = _load_date_cache(file_path) + if cached is not None: + return cached + + try: + df = pd.read_feather(file_path, columns=['date']) + if df.empty: + result = (pd.Timestamp.max, pd.Timestamp.min) + else: + df['date'] = pd.to_datetime(df['date']) + if df['date'].dt.tz is not None: + df['date'] = df['date'].dt.tz_convert('UTC').dt.tz_localize(None) + result = (df['date'].min(), df['date'].max()) + + _save_date_cache(file_path, result[0], result[1]) + return result + except Exception as e: + print(f"Warning: Error reading data for {file_path}: {e}") + result = (pd.Timestamp.max, pd.Timestamp.min) + _save_date_cache(file_path, result[0], result[1]) + return result + + def check_pair(pair: str) -> Tuple[str, bool]: + file_path = get_pair_data_file_path(pair, config_path, timeframe) + if file_path is None: + return (pair, False) + + file_min, file_max = get_date_range(file_path) + has_data = not (file_max < start_ts or file_min > end_ts) + return (pair, has_data) + + max_workers = min(32, len(pairs), os.cpu_count() or 4 * 5) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + results = list(executor.map(check_pair, pairs)) + + valid_pairs = [pair for pair, is_valid in results if is_valid] + invalid_pairs = [pair for pair, is_valid in results if not is_valid] + if invalid_pairs: print(f"Filtered out {len(invalid_pairs)} pairs with no data in the specified timerange:") - for pair in invalid_pairs[:10]: # Show only first 10 + for pair in invalid_pairs[:10]: print(f" - {pair}") if len(invalid_pairs) > 10: print(f" ... and {len(invalid_pairs) - 10} more pairs") - + print(f"Kept {len(valid_pairs)} pairs with available data") return valid_pairs