commit 71c531b1fdb63cbf22ff1b85cb971ecbc9a7a5d1 Author: yhydev Date: Fri Dec 12 16:13:15 2025 +0800 添加脚本 diff --git a/run_script.py b/run_script.py new file mode 100644 index 0000000..8a36b93 --- /dev/null +++ b/run_script.py @@ -0,0 +1,294 @@ +#!/usr/bin/env python3 +""" +Freqtrade 回测 Prefect Flow + +支持从 URL 下载并执行 shell 脚本,或直接提供脚本内容。 + +参数优先级(从高到低): +1. shell_script(直接提供脚本内容) +2. script_url(从 URL 下载脚本) + +使用示例: + +1. 使用 script_url 参数: + python backtest.py --script-url https://example.com/freqtrade_backtest.sh + +2. 使用 shell_script 参数: + python backtest.py --shell-script 'freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data' + +3. 使用环境变量(script_url): + export FREQTRADE_SCRIPT_URL=https://example.com/freqtrade_backtest.sh + python backtest.py + +4. 使用环境变量(shell_script): + export FREQTRADE_SHELL_SCRIPT='freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data' + python backtest.py + +5. 使用配置文件(script_url): + python backtest.py --flow-config flow_config.json + +6. 使用配置文件(shell_script): + python backtest.py --flow-config flow_config_with_script.json + +配置文件示例: + +flow_config.json(使用 script_url): +{ + "script_url": "https://example.com/freqtrade_backtest.sh" +} + +flow_config_with_script.json(使用 shell_script): +{ + "shell_script": "freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data" +} + +7. 混合使用(shell_script 优先级更高): + python backtest.py --script-url https://example.com/freqtrade_backtest.sh \ + --shell-script 'freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data' +""" + +import os +import argparse +import requests +from typing import Optional +from prefect import flow, task +from prefect_shell import shell_run_command + +def download_script_from_url(url: str, save_path: str = None) -> str: + """ + 从指定 URL 下载 shell 脚本 + + Args: + url: 脚本的 URL 地址 + save_path: 保存脚本的路径,默认生成临时文件 + + Returns: + 下载的脚本文件路径 + """ + if not save_path: + # 生成临时文件名 + save_path = f"freqtrade_backtest_script_{os.getpid()}.sh" + + print(f"正在从 URL 下载脚本: {url}") + + # 发送请求下载脚本 + response = requests.get(url, timeout=30) + response.raise_for_status() # 检查请求是否成功 + + # 保存脚本到文件 + with open(save_path, 'w') as f: + f.write(response.text) + + # 赋予执行权限 + os.chmod(save_path, 0o755) + + print(f"脚本下载完成,保存到: {save_path}") + return save_path + + +@task +async def run_script( + script_url: str = None, + shell_script: str = None +) -> dict: + """ + 执行脚本 + + Args: + script_url: 用于下载脚本的 URL 地址 + shell_script: 直接提供脚本内容(优先级高于 script_url) + + Returns: + 包含脚本执行结果的字典 + """ + script_path = None + try: + # 确定脚本来源,shell_script 优先级更高 + if shell_script: + print("使用直接提供的脚本内容") + # 生成临时文件名,使用绝对路径 + script_path = os.path.abspath(f"freqtrade_backtest_script_{os.getpid()}.sh") + # 保存脚本内容到文件 + with open(script_path, 'w') as f: + f.write(shell_script) + # 赋予执行权限 + os.chmod(script_path, 0o755) + elif script_url: + # 从 URL 下载脚本,确保返回绝对路径 + script_path = download_script_from_url(script_url) + script_path = os.path.abspath(script_path) + else: + raise ValueError("必须提供 script_url 或 shell_script 参数") + + # 构建脚本执行命令,使用绝对路径 + cmd = [script_path] + + try: + # 执行命令,添加实时输出支持 + result_lines = await shell_run_command( + command=" ".join(cmd), + return_all=True, + stream_level=10 # 使用 DEBUG 级别输出,实现实时输出效果 + ) + + # 将结果列表转换为字符串 + stdout = "\n".join(result_lines) if isinstance(result_lines, list) else result_lines + + return { + "command": " ".join(cmd), + "stdout": stdout, + "stderr": "", # shell_run_command 将 stdout 和 stderr 合并输出 + "exit_code": 0 # 执行成功 + } + except RuntimeError as e: + # 解析错误信息,提取退出码和错误内容 + error_msg = str(e) + stderr = error_msg + exit_code = 1 # 默认退出码 + + # 尝试从错误信息中提取退出码 + if "exit code" in error_msg: + try: + exit_code = int(error_msg.split("exit code")[1].split(":")[0].strip()) + except (IndexError, ValueError): + pass + + # 任务执行失败,抛出异常 + raise RuntimeError(f"脚本执行失败: {error_msg}") from e + finally: + # 清理临时脚本文件 + if script_path and os.path.exists(script_path): + try: + os.remove(script_path) + print(f"已清理临时脚本文件: {script_path}") + except Exception as e: + print(f"清理临时脚本文件失败: {e}") + +@flow(name="Freqtrade Backtesting Flow") +def freqtrade_backtest_flow( + script_url: Optional[str] = None, + shell_script: Optional[str] = None +) -> dict: + """ + Prefect Flow 用于执行 Freqtrade 回测脚本 + + Args: + script_url: 用于下载回测脚本的 URL 地址 + shell_script: 直接提供回测脚本内容(优先级高于 script_url) + + Returns: + 包含回测结果的字典 + """ + return run_script( + script_url=script_url, + shell_script=shell_script + ) + +def parse_command_line_args() -> argparse.Namespace: + """ + 解析命令行参数 + + Returns: + 命令行参数命名空间 + """ + parser = argparse.ArgumentParser(description="Freqtrade 回测 Prefect Flow") + + # 主参数 + parser.add_argument("--script-url", type=str, help="用于下载脚本的 URL 地址") + parser.add_argument("--shell-script", type=str, help="直接提供脚本内容") + + # 配置文件参数 + parser.add_argument("--flow-config", type=str, help="Flow 配置文件路径(仅 JSON 格式)") + + return parser.parse_args() + + +def get_env_vars() -> dict: + """ + 从环境变量加载配置 + + Returns: + 包含 script_url 和 shell_script 的环境变量配置字典 + """ + env_vars = {} + + # 读取环境变量 + if "FREQTRADE_SCRIPT_URL" in os.environ: + env_vars["script_url"] = os.environ["FREQTRADE_SCRIPT_URL"] + + if "FREQTRADE_SHELL_SCRIPT" in os.environ: + env_vars["shell_script"] = os.environ["FREQTRADE_SHELL_SCRIPT"] + + return env_vars + + +def merge_params(*param_dicts: dict) -> dict: + """ + 合并多个参数字典,后面的字典会覆盖前面的字典 + + Args: + param_dicts: 要合并的参数字典 + + Returns: + 合并后的参数字典 + """ + merged = {} + for params in param_dicts: + merged.update(params) + return merged + + +if __name__ == "__main__": + import json + + # 1. 解析命令行参数 + args = parse_command_line_args() + + # 2. 从环境变量加载配置 + env_vars = get_env_vars() + + # 3. 从 Flow 配置文件加载配置(如果提供) + flow_config = {} + if args.flow_config: + flow_config = load_config_file(args.flow_config) + + # 4. 从命令行参数提取参数 + cli_args = {} + if args.script_url: + cli_args["script_url"] = args.script_url + if args.shell_script: + cli_args["shell_script"] = args.shell_script + + # 5. 合并参数(优先级:命令行参数 > Flow 配置文件 > 环境变量) + merged_params = merge_params(env_vars, flow_config, cli_args) + + # 6. 验证参数是否提供 + script_url = merged_params.get("script_url") + shell_script = merged_params.get("shell_script") + + if not script_url and not shell_script: + print("错误:必须提供 script_url 或 shell_script 参数!") + print("使用方法:") + print(" 1. 使用 script_url:") + print(" python backtest.py --script-url https://example.com/freqtrade_backtest.sh") + print(" 2. 使用 shell_script:") + print(" python backtest.py --shell-script 'freqtrade backtesting --config config.json --strategy MyStrategy'") + print(" 3. 使用环境变量:") + print(" export FREQTRADE_SCRIPT_URL=https://example.com/freqtrade_backtest.sh && python backtest.py") + print(" 或 export FREQTRADE_SHELL_SCRIPT='freqtrade backtesting --config config.json --strategy MyStrategy' && python backtest.py") + print(" 4. 使用配置文件:") + print(" python backtest.py --flow-config flow_config.json") + exit(1) + + # 7. 执行 Flow + result = freqtrade_backtest_flow( + script_url=script_url, + shell_script=shell_script + ) + + print("完成!") + print(f"命令: {result['command']}") + print(f"退出码: {result['exit_code']}") + print(f"标准输出: {result['stdout'][:500]}...") # 只显示前500个字符 + if result['stderr']: + print(f"标准错误: {result['stderr']}")