添加脚本

This commit is contained in:
yhydev
2025-12-12 16:13:15 +08:00
commit 71c531b1fd

294
run_script.py Normal file
View File

@@ -0,0 +1,294 @@
#!/usr/bin/env python3
"""
Freqtrade 回测 Prefect Flow
支持从 URL 下载并执行 shell 脚本,或直接提供脚本内容。
参数优先级(从高到低):
1. shell_script直接提供脚本内容
2. script_url从 URL 下载脚本)
使用示例:
1. 使用 script_url 参数:
python backtest.py --script-url https://example.com/freqtrade_backtest.sh
2. 使用 shell_script 参数:
python backtest.py --shell-script 'freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data'
3. 使用环境变量script_url
export FREQTRADE_SCRIPT_URL=https://example.com/freqtrade_backtest.sh
python backtest.py
4. 使用环境变量shell_script
export FREQTRADE_SHELL_SCRIPT='freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data'
python backtest.py
5. 使用配置文件script_url
python backtest.py --flow-config flow_config.json
6. 使用配置文件shell_script
python backtest.py --flow-config flow_config_with_script.json
配置文件示例:
flow_config.json使用 script_url
{
"script_url": "https://example.com/freqtrade_backtest.sh"
}
flow_config_with_script.json使用 shell_script
{
"shell_script": "freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data"
}
7. 混合使用shell_script 优先级更高):
python backtest.py --script-url https://example.com/freqtrade_backtest.sh \
--shell-script 'freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data'
"""
import os
import argparse
import requests
from typing import Optional
from prefect import flow, task
from prefect_shell import shell_run_command
def download_script_from_url(url: str, save_path: str = None) -> str:
"""
从指定 URL 下载 shell 脚本
Args:
url: 脚本的 URL 地址
save_path: 保存脚本的路径,默认生成临时文件
Returns:
下载的脚本文件路径
"""
if not save_path:
# 生成临时文件名
save_path = f"freqtrade_backtest_script_{os.getpid()}.sh"
print(f"正在从 URL 下载脚本: {url}")
# 发送请求下载脚本
response = requests.get(url, timeout=30)
response.raise_for_status() # 检查请求是否成功
# 保存脚本到文件
with open(save_path, 'w') as f:
f.write(response.text)
# 赋予执行权限
os.chmod(save_path, 0o755)
print(f"脚本下载完成,保存到: {save_path}")
return save_path
@task
async def run_script(
script_url: str = None,
shell_script: str = None
) -> dict:
"""
执行脚本
Args:
script_url: 用于下载脚本的 URL 地址
shell_script: 直接提供脚本内容(优先级高于 script_url
Returns:
包含脚本执行结果的字典
"""
script_path = None
try:
# 确定脚本来源shell_script 优先级更高
if shell_script:
print("使用直接提供的脚本内容")
# 生成临时文件名,使用绝对路径
script_path = os.path.abspath(f"freqtrade_backtest_script_{os.getpid()}.sh")
# 保存脚本内容到文件
with open(script_path, 'w') as f:
f.write(shell_script)
# 赋予执行权限
os.chmod(script_path, 0o755)
elif script_url:
# 从 URL 下载脚本,确保返回绝对路径
script_path = download_script_from_url(script_url)
script_path = os.path.abspath(script_path)
else:
raise ValueError("必须提供 script_url 或 shell_script 参数")
# 构建脚本执行命令,使用绝对路径
cmd = [script_path]
try:
# 执行命令,添加实时输出支持
result_lines = await shell_run_command(
command=" ".join(cmd),
return_all=True,
stream_level=10 # 使用 DEBUG 级别输出,实现实时输出效果
)
# 将结果列表转换为字符串
stdout = "\n".join(result_lines) if isinstance(result_lines, list) else result_lines
return {
"command": " ".join(cmd),
"stdout": stdout,
"stderr": "", # shell_run_command 将 stdout 和 stderr 合并输出
"exit_code": 0 # 执行成功
}
except RuntimeError as e:
# 解析错误信息,提取退出码和错误内容
error_msg = str(e)
stderr = error_msg
exit_code = 1 # 默认退出码
# 尝试从错误信息中提取退出码
if "exit code" in error_msg:
try:
exit_code = int(error_msg.split("exit code")[1].split(":")[0].strip())
except (IndexError, ValueError):
pass
# 任务执行失败,抛出异常
raise RuntimeError(f"脚本执行失败: {error_msg}") from e
finally:
# 清理临时脚本文件
if script_path and os.path.exists(script_path):
try:
os.remove(script_path)
print(f"已清理临时脚本文件: {script_path}")
except Exception as e:
print(f"清理临时脚本文件失败: {e}")
@flow(name="Freqtrade Backtesting Flow")
def freqtrade_backtest_flow(
script_url: Optional[str] = None,
shell_script: Optional[str] = None
) -> dict:
"""
Prefect Flow 用于执行 Freqtrade 回测脚本
Args:
script_url: 用于下载回测脚本的 URL 地址
shell_script: 直接提供回测脚本内容(优先级高于 script_url
Returns:
包含回测结果的字典
"""
return run_script(
script_url=script_url,
shell_script=shell_script
)
def parse_command_line_args() -> argparse.Namespace:
"""
解析命令行参数
Returns:
命令行参数命名空间
"""
parser = argparse.ArgumentParser(description="Freqtrade 回测 Prefect Flow")
# 主参数
parser.add_argument("--script-url", type=str, help="用于下载脚本的 URL 地址")
parser.add_argument("--shell-script", type=str, help="直接提供脚本内容")
# 配置文件参数
parser.add_argument("--flow-config", type=str, help="Flow 配置文件路径(仅 JSON 格式)")
return parser.parse_args()
def get_env_vars() -> dict:
"""
从环境变量加载配置
Returns:
包含 script_url 和 shell_script 的环境变量配置字典
"""
env_vars = {}
# 读取环境变量
if "FREQTRADE_SCRIPT_URL" in os.environ:
env_vars["script_url"] = os.environ["FREQTRADE_SCRIPT_URL"]
if "FREQTRADE_SHELL_SCRIPT" in os.environ:
env_vars["shell_script"] = os.environ["FREQTRADE_SHELL_SCRIPT"]
return env_vars
def merge_params(*param_dicts: dict) -> dict:
"""
合并多个参数字典,后面的字典会覆盖前面的字典
Args:
param_dicts: 要合并的参数字典
Returns:
合并后的参数字典
"""
merged = {}
for params in param_dicts:
merged.update(params)
return merged
if __name__ == "__main__":
import json
# 1. 解析命令行参数
args = parse_command_line_args()
# 2. 从环境变量加载配置
env_vars = get_env_vars()
# 3. 从 Flow 配置文件加载配置(如果提供)
flow_config = {}
if args.flow_config:
flow_config = load_config_file(args.flow_config)
# 4. 从命令行参数提取参数
cli_args = {}
if args.script_url:
cli_args["script_url"] = args.script_url
if args.shell_script:
cli_args["shell_script"] = args.shell_script
# 5. 合并参数(优先级:命令行参数 > Flow 配置文件 > 环境变量)
merged_params = merge_params(env_vars, flow_config, cli_args)
# 6. 验证参数是否提供
script_url = merged_params.get("script_url")
shell_script = merged_params.get("shell_script")
if not script_url and not shell_script:
print("错误:必须提供 script_url 或 shell_script 参数!")
print("使用方法:")
print(" 1. 使用 script_url")
print(" python backtest.py --script-url https://example.com/freqtrade_backtest.sh")
print(" 2. 使用 shell_script")
print(" python backtest.py --shell-script 'freqtrade backtesting --config config.json --strategy MyStrategy'")
print(" 3. 使用环境变量:")
print(" export FREQTRADE_SCRIPT_URL=https://example.com/freqtrade_backtest.sh && python backtest.py")
print(" 或 export FREQTRADE_SHELL_SCRIPT='freqtrade backtesting --config config.json --strategy MyStrategy' && python backtest.py")
print(" 4. 使用配置文件:")
print(" python backtest.py --flow-config flow_config.json")
exit(1)
# 7. 执行 Flow
result = freqtrade_backtest_flow(
script_url=script_url,
shell_script=shell_script
)
print("完成!")
print(f"命令: {result['command']}")
print(f"退出码: {result['exit_code']}")
print(f"标准输出: {result['stdout'][:500]}...") # 只显示前500个字符
if result['stderr']:
print(f"标准错误: {result['stderr']}")