#!/usr/bin/env python3 """ Freqtrade 回测 Prefect Flow 支持从 URL 下载并执行 shell 脚本,或直接提供脚本内容。 参数优先级(从高到低): 1. shell_script(直接提供脚本内容) 2. script_url(从 URL 下载脚本) 使用示例: 1. 使用 script_url 参数: python backtest.py --script-url https://example.com/freqtrade_backtest.sh 2. 使用 shell_script 参数: python backtest.py --shell-script 'freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data' 3. 使用环境变量(script_url): export FREQTRADE_SCRIPT_URL=https://example.com/freqtrade_backtest.sh python backtest.py 4. 使用环境变量(shell_script): export FREQTRADE_SHELL_SCRIPT='freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data' python backtest.py 5. 使用配置文件(script_url): python backtest.py --flow-config flow_config.json 6. 使用配置文件(shell_script): python backtest.py --flow-config flow_config_with_script.json 配置文件示例: flow_config.json(使用 script_url): { "script_url": "https://example.com/freqtrade_backtest.sh" } flow_config_with_script.json(使用 shell_script): { "shell_script": "freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data" } 7. 混合使用(shell_script 优先级更高): python backtest.py --script-url https://example.com/freqtrade_backtest.sh \ --shell-script 'freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data' """ import os import argparse import requests from typing import Optional from prefect import flow, task from prefect_shell import shell_run_command def download_script_from_url(url: str, save_path: str = None) -> str: """ 从指定 URL 下载 shell 脚本 Args: url: 脚本的 URL 地址 save_path: 保存脚本的路径,默认生成临时文件 Returns: 下载的脚本文件路径 """ if not save_path: # 生成临时文件名 save_path = f"freqtrade_backtest_script_{os.getpid()}.sh" print(f"正在从 URL 下载脚本: {url}") # 发送请求下载脚本 response = requests.get(url, timeout=30) response.raise_for_status() # 检查请求是否成功 # 保存脚本到文件 with open(save_path, 'w') as f: f.write(response.text) # 赋予执行权限 os.chmod(save_path, 0o755) print(f"脚本下载完成,保存到: {save_path}") return save_path @task async def run_script( script_url: str = None, shell_script: str = None ) -> dict: """ 执行脚本 Args: script_url: 用于下载脚本的 URL 地址 shell_script: 直接提供脚本内容(优先级高于 script_url) Returns: 包含脚本执行结果的字典 """ script_path = None try: # 确定脚本来源,shell_script 优先级更高 if shell_script: print("使用直接提供的脚本内容") # 生成临时文件名,使用绝对路径 script_path = os.path.abspath(f"freqtrade_backtest_script_{os.getpid()}.sh") # 保存脚本内容到文件 with open(script_path, 'w') as f: f.write(shell_script) # 赋予执行权限 os.chmod(script_path, 0o755) elif script_url: # 从 URL 下载脚本,确保返回绝对路径 script_path = download_script_from_url(script_url) script_path = os.path.abspath(script_path) else: raise ValueError("必须提供 script_url 或 shell_script 参数") # 构建脚本执行命令,使用绝对路径 cmd = [script_path] try: # 执行命令,添加实时输出支持 result_lines = await shell_run_command( command=" ".join(cmd), return_all=True, stream_level=10 # 使用 DEBUG 级别输出,实现实时输出效果 ) # 将结果列表转换为字符串 stdout = "\n".join(result_lines) if isinstance(result_lines, list) else result_lines return { "command": " ".join(cmd), "stdout": stdout, "stderr": "", # shell_run_command 将 stdout 和 stderr 合并输出 "exit_code": 0 # 执行成功 } except RuntimeError as e: # 解析错误信息,提取退出码和错误内容 error_msg = str(e) stderr = error_msg exit_code = 1 # 默认退出码 # 尝试从错误信息中提取退出码 if "exit code" in error_msg: try: exit_code = int(error_msg.split("exit code")[1].split(":")[0].strip()) except (IndexError, ValueError): pass # 任务执行失败,抛出异常 raise RuntimeError(f"脚本执行失败: {error_msg}") from e finally: # 清理临时脚本文件 if script_path and os.path.exists(script_path): try: os.remove(script_path) print(f"已清理临时脚本文件: {script_path}") except Exception as e: print(f"清理临时脚本文件失败: {e}") @flow(name="run-script", log_prints=True) def start( script_url: Optional[str] = None, shell_script: Optional[str] = None ) -> dict: """ Prefect Flow 用于执行脚本 Args: script_url: 用于下载脚本的 URL 地址 shell_script: 直接提供脚本内容(优先级高于 script_url) Returns: 包含脚本执行结果的字典 """ return run_script( script_url=script_url, shell_script=shell_script ) def parse_command_line_args() -> argparse.Namespace: """ 解析命令行参数 Returns: 命令行参数命名空间 """ parser = argparse.ArgumentParser(description="脚本执行 Prefect Flow") # 主参数 parser.add_argument("--script-url", type=str, help="用于下载脚本的 URL 地址") parser.add_argument("--shell-script", type=str, help="直接提供脚本内容") # 配置文件参数 parser.add_argument("--flow-config", type=str, help="Flow 配置文件路径(仅 JSON 格式)") return parser.parse_args() def get_env_vars() -> dict: """ 从环境变量加载配置 Returns: 包含 script_url 和 shell_script 的环境变量配置字典 """ env_vars = {} # 读取环境变量 if "FREQTRADE_SCRIPT_URL" in os.environ: env_vars["script_url"] = os.environ["FREQTRADE_SCRIPT_URL"] if "FREQTRADE_SHELL_SCRIPT" in os.environ: env_vars["shell_script"] = os.environ["FREQTRADE_SHELL_SCRIPT"] return env_vars def merge_params(*param_dicts: dict) -> dict: """ 合并多个参数字典,后面的字典会覆盖前面的字典 Args: param_dicts: 要合并的参数字典 Returns: 合并后的参数字典 """ merged = {} for params in param_dicts: merged.update(params) return merged if __name__ == "__main__": import json # 1. 解析命令行参数 args = parse_command_line_args() # 2. 从环境变量加载配置 env_vars = get_env_vars() # 3. 从 Flow 配置文件加载配置(如果提供) flow_config = {} if args.flow_config: flow_config = load_config_file(args.flow_config) # 4. 从命令行参数提取参数 cli_args = {} if args.script_url: cli_args["script_url"] = args.script_url if args.shell_script: cli_args["shell_script"] = args.shell_script # 5. 合并参数(优先级:命令行参数 > Flow 配置文件 > 环境变量) merged_params = merge_params(env_vars, flow_config, cli_args) # 6. 验证参数是否提供 script_url = merged_params.get("script_url") shell_script = merged_params.get("shell_script") if not script_url and not shell_script: print("错误:必须提供 script_url 或 shell_script 参数!") print("使用方法:") print(" 1. 使用 script_url:") print(" python backtest.py --script-url https://example.com/freqtrade_backtest.sh") print(" 2. 使用 shell_script:") print(" python backtest.py --shell-script 'freqtrade backtesting --config config.json --strategy MyStrategy'") print(" 3. 使用环境变量:") print(" export FREQTRADE_SCRIPT_URL=https://example.com/freqtrade_backtest.sh && python backtest.py") print(" 或 export FREQTRADE_SHELL_SCRIPT='freqtrade backtesting --config config.json --strategy MyStrategy' && python backtest.py") print(" 4. 使用配置文件:") print(" python backtest.py --flow-config flow_config.json") exit(1) # 7. 执行 Flow result = start( script_url=script_url, shell_script=shell_script ) print("完成!") print(f"命令: {result['command']}") print(f"退出码: {result['exit_code']}") print(f"标准输出: {result['stdout'][:500]}...") # 只显示前500个字符 if result['stderr']: print(f"标准错误: {result['stderr']}")