Files
prefect-flows/run_script.py
2025-12-13 17:17:55 +08:00

243 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Freqtrade 回测 Prefect Flow
支持从 URL 下载并执行 shell 脚本,或直接提供脚本内容。
参数优先级(从高到低):
1. shell_script直接提供脚本内容
2. script_url从 URL 下载脚本)
使用示例:
1. 使用 script_url 参数:
python backtest.py --script-url https://example.com/freqtrade_backtest.sh
2. 使用 shell_script 参数:
python backtest.py --shell-script 'freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data'
3. 使用环境变量script_url
export FREQTRADE_SCRIPT_URL=https://example.com/freqtrade_backtest.sh
python backtest.py
4. 使用环境变量shell_script
export FREQTRADE_SHELL_SCRIPT='freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data'
python backtest.py
5. 使用配置文件script_url
python backtest.py --flow-config flow_config.json
6. 使用配置文件shell_script
python backtest.py --flow-config flow_config_with_script.json
配置文件示例:
flow_config.json使用 script_url
{
"script_url": "https://example.com/freqtrade_backtest.sh"
}
flow_config_with_script.json使用 shell_script
{
"shell_script": "freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data"
}
7. 混合使用shell_script 优先级更高):
python backtest.py --script-url https://example.com/freqtrade_backtest.sh \
--shell-script 'freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data'
"""
import os
import argparse
import requests
from typing import Optional
from prefect import flow, task
from prefect_shell import shell_run_command
def download_script_from_url(url: str, save_path: str = None) -> str:
"""
从指定 URL 下载 shell 脚本
Args:
url: 脚本的 URL 地址
save_path: 保存脚本的路径,默认生成临时文件
Returns:
下载的脚本文件路径
"""
if not save_path:
# 生成临时文件名
save_path = f"freqtrade_backtest_script_{os.getpid()}.sh"
print(f"正在从 URL 下载脚本: {url}")
# 发送请求下载脚本
response = requests.get(url, timeout=30)
response.raise_for_status() # 检查请求是否成功
# 保存脚本到文件
with open(save_path, 'w') as f:
f.write(response.text)
# 赋予执行权限
os.chmod(save_path, 0o755)
print(f"脚本下载完成,保存到: {save_path}")
return save_path
@task
async def run_script(
script_url: str = None,
shell_script: str = None
) -> dict:
"""
执行脚本
Args:
script_url: 用于下载脚本的 URL 地址
shell_script: 直接提供脚本内容(优先级高于 script_url
Returns:
包含脚本执行结果的字典
"""
print(f"-- script_url: {script_url}")
print(f"-- shell_script: {shell_script}")
script_path = None
try:
# 确定脚本来源shell_script 优先级更高
if shell_script:
print("使用直接提供的脚本内容")
# 生成临时文件名,使用绝对路径
script_path = os.path.abspath(f"freqtrade_backtest_script_{os.getpid()}.sh")
# 保存脚本内容到文件
with open(script_path, 'w') as f:
f.write(shell_script)
# 赋予执行权限
os.chmod(script_path, 0o755)
elif script_url:
# 从 URL 下载脚本,确保返回绝对路径
script_path = download_script_from_url(script_url)
script_path = os.path.abspath(script_path)
else:
raise ValueError("必须提供 script_url 或 shell_script 参数")
# 构建脚本执行命令,使用绝对路径
cmd = [script_path]
try:
print(f"正在执行脚本: {script_path}")
# 执行命令,添加实时输出支持
result_lines = await shell_run_command(
command=" ".join(cmd),
return_all=True,
stream_level=20 # 使用 INFO 级别输出,确保实时输出效果
)
# 将结果列表转换为字符串
stdout = "\n".join(result_lines) if isinstance(result_lines, list) else result_lines
return {
"command": " ".join(cmd),
"stdout": stdout,
"stderr": "", # shell_run_command 将 stdout 和 stderr 合并输出
"exit_code": 0 # 执行成功
}
except RuntimeError as e:
# 解析错误信息,提取退出码和错误内容
error_msg = str(e)
stderr = error_msg
exit_code = 1 # 默认退出码
# 尝试从错误信息中提取退出码
if "exit code" in error_msg:
try:
exit_code = int(error_msg.split("exit code")[1].split(":")[0].strip())
except (IndexError, ValueError):
pass
# 任务执行失败,抛出异常
raise RuntimeError(f"脚本执行失败: {error_msg}") from e
finally:
# 清理临时脚本文件
if script_path and os.path.exists(script_path):
try:
os.remove(script_path)
print(f"已清理临时脚本文件: {script_path}")
except Exception as e:
print(f"清理临时脚本文件失败: {e}")
@flow(name="run-script", log_prints=True)
async def start(
script_url: Optional[str] = None,
shell_script: Optional[str] = None
) -> dict:
"""
Prefect Flow 用于执行脚本
Args:
script_url: 用于下载脚本的 URL 地址
shell_script: 直接提供脚本内容(优先级高于 script_url
Returns:
包含脚本执行结果的字典
"""
print(f"script_url: {script_url}")
print(f"shell_script: {shell_script}")
return await run_script(
script_url=script_url,
shell_script=shell_script
)
def parse_command_line_args() -> argparse.Namespace:
"""
解析命令行参数
Returns:
命令行参数命名空间
"""
parser = argparse.ArgumentParser(description="脚本执行 Prefect Flow")
# 主参数
parser.add_argument("--script-url", type=str, help="用于下载脚本的 URL 地址")
parser.add_argument("--shell-script", type=str, help="直接提供脚本内容")
# 配置文件参数
parser.add_argument("--flow-config", type=str, help="Flow 配置文件路径(仅 JSON 格式)")
return parser.parse_args()
def get_env_vars() -> dict:
"""
从环境变量加载配置
Returns:
包含 script_url 和 shell_script 的环境变量配置字典
"""
env_vars = {}
# 读取环境变量
if "FREQTRADE_SCRIPT_URL" in os.environ:
env_vars["script_url"] = os.environ["FREQTRADE_SCRIPT_URL"]
if "FREQTRADE_SHELL_SCRIPT" in os.environ:
env_vars["shell_script"] = os.environ["FREQTRADE_SHELL_SCRIPT"]
return env_vars
def merge_params(*param_dicts: dict) -> dict:
"""
合并多个参数字典,后面的字典会覆盖前面的字典
Args:
param_dicts: 要合并的参数字典
Returns:
合并后的参数字典
"""
merged = {}
for params in param_dicts:
merged.update(params)
return merged