243 lines
7.5 KiB
Python
243 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Freqtrade 回测 Prefect Flow
|
||
|
||
支持从 URL 下载并执行 shell 脚本,或直接提供脚本内容。
|
||
|
||
参数优先级(从高到低):
|
||
1. shell_script(直接提供脚本内容)
|
||
2. script_url(从 URL 下载脚本)
|
||
|
||
使用示例:
|
||
|
||
1. 使用 script_url 参数:
|
||
python backtest.py --script-url https://example.com/freqtrade_backtest.sh
|
||
|
||
2. 使用 shell_script 参数:
|
||
python backtest.py --shell-script 'freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data'
|
||
|
||
3. 使用环境变量(script_url):
|
||
export FREQTRADE_SCRIPT_URL=https://example.com/freqtrade_backtest.sh
|
||
python backtest.py
|
||
|
||
4. 使用环境变量(shell_script):
|
||
export FREQTRADE_SHELL_SCRIPT='freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data'
|
||
python backtest.py
|
||
|
||
5. 使用配置文件(script_url):
|
||
python backtest.py --flow-config flow_config.json
|
||
|
||
6. 使用配置文件(shell_script):
|
||
python backtest.py --flow-config flow_config_with_script.json
|
||
|
||
配置文件示例:
|
||
|
||
flow_config.json(使用 script_url):
|
||
{
|
||
"script_url": "https://example.com/freqtrade_backtest.sh"
|
||
}
|
||
|
||
flow_config_with_script.json(使用 shell_script):
|
||
{
|
||
"shell_script": "freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data"
|
||
}
|
||
|
||
7. 混合使用(shell_script 优先级更高):
|
||
python backtest.py --script-url https://example.com/freqtrade_backtest.sh \
|
||
--shell-script 'freqtrade backtesting --config config.json --strategy MyStrategy --data-dir user_data/data'
|
||
"""
|
||
|
||
import os
|
||
import argparse
|
||
import requests
|
||
from typing import Optional
|
||
from prefect import flow, task
|
||
from prefect_shell import shell_run_command
|
||
|
||
def download_script_from_url(url: str, save_path: str = None) -> str:
|
||
"""
|
||
从指定 URL 下载 shell 脚本
|
||
|
||
Args:
|
||
url: 脚本的 URL 地址
|
||
save_path: 保存脚本的路径,默认生成临时文件
|
||
|
||
Returns:
|
||
下载的脚本文件路径
|
||
"""
|
||
if not save_path:
|
||
# 生成临时文件名
|
||
save_path = f"freqtrade_backtest_script_{os.getpid()}.sh"
|
||
|
||
print(f"正在从 URL 下载脚本: {url}")
|
||
|
||
# 发送请求下载脚本
|
||
response = requests.get(url, timeout=30)
|
||
response.raise_for_status() # 检查请求是否成功
|
||
|
||
# 保存脚本到文件
|
||
with open(save_path, 'w') as f:
|
||
f.write(response.text)
|
||
|
||
# 赋予执行权限
|
||
os.chmod(save_path, 0o755)
|
||
|
||
print(f"脚本下载完成,保存到: {save_path}")
|
||
return save_path
|
||
|
||
|
||
@task
|
||
async def run_script(
|
||
script_url: str = None,
|
||
shell_script: str = None
|
||
) -> dict:
|
||
"""
|
||
执行脚本
|
||
|
||
Args:
|
||
script_url: 用于下载脚本的 URL 地址
|
||
shell_script: 直接提供脚本内容(优先级高于 script_url)
|
||
|
||
Returns:
|
||
包含脚本执行结果的字典
|
||
"""
|
||
print(f"-- script_url: {script_url}")
|
||
print(f"-- shell_script: {shell_script}")
|
||
script_path = None
|
||
try:
|
||
# 确定脚本来源,shell_script 优先级更高
|
||
if shell_script:
|
||
print("使用直接提供的脚本内容")
|
||
# 生成临时文件名,使用绝对路径
|
||
script_path = os.path.abspath(f"freqtrade_backtest_script_{os.getpid()}.sh")
|
||
# 保存脚本内容到文件
|
||
with open(script_path, 'w') as f:
|
||
f.write(shell_script)
|
||
# 赋予执行权限
|
||
os.chmod(script_path, 0o755)
|
||
elif script_url:
|
||
# 从 URL 下载脚本,确保返回绝对路径
|
||
script_path = download_script_from_url(script_url)
|
||
script_path = os.path.abspath(script_path)
|
||
else:
|
||
raise ValueError("必须提供 script_url 或 shell_script 参数")
|
||
|
||
# 构建脚本执行命令,使用绝对路径
|
||
cmd = [script_path]
|
||
|
||
try:
|
||
print(f"正在执行脚本: {script_path}")
|
||
# 执行命令,添加实时输出支持
|
||
result_lines = await shell_run_command(
|
||
command=" ".join(cmd),
|
||
return_all=True,
|
||
stream_level=20 # 使用 INFO 级别输出,确保实时输出效果
|
||
)
|
||
|
||
# 将结果列表转换为字符串
|
||
stdout = "\n".join(result_lines) if isinstance(result_lines, list) else result_lines
|
||
|
||
return {
|
||
"command": " ".join(cmd),
|
||
"stdout": stdout,
|
||
"stderr": "", # shell_run_command 将 stdout 和 stderr 合并输出
|
||
"exit_code": 0 # 执行成功
|
||
}
|
||
except RuntimeError as e:
|
||
# 解析错误信息,提取退出码和错误内容
|
||
error_msg = str(e)
|
||
stderr = error_msg
|
||
exit_code = 1 # 默认退出码
|
||
|
||
# 尝试从错误信息中提取退出码
|
||
if "exit code" in error_msg:
|
||
try:
|
||
exit_code = int(error_msg.split("exit code")[1].split(":")[0].strip())
|
||
except (IndexError, ValueError):
|
||
pass
|
||
|
||
# 任务执行失败,抛出异常
|
||
raise RuntimeError(f"脚本执行失败: {error_msg}") from e
|
||
finally:
|
||
# 清理临时脚本文件
|
||
if script_path and os.path.exists(script_path):
|
||
try:
|
||
os.remove(script_path)
|
||
print(f"已清理临时脚本文件: {script_path}")
|
||
except Exception as e:
|
||
print(f"清理临时脚本文件失败: {e}")
|
||
|
||
@flow(name="run-script", log_prints=True)
|
||
async def start(
|
||
script_url: Optional[str] = None,
|
||
shell_script: Optional[str] = None
|
||
) -> dict:
|
||
"""
|
||
Prefect Flow 用于执行脚本
|
||
|
||
Args:
|
||
script_url: 用于下载脚本的 URL 地址
|
||
shell_script: 直接提供脚本内容(优先级高于 script_url)
|
||
|
||
Returns:
|
||
包含脚本执行结果的字典
|
||
"""
|
||
print(f"script_url: {script_url}")
|
||
print(f"shell_script: {shell_script}")
|
||
return await run_script(
|
||
script_url=script_url,
|
||
shell_script=shell_script
|
||
)
|
||
|
||
def parse_command_line_args() -> argparse.Namespace:
|
||
"""
|
||
解析命令行参数
|
||
|
||
Returns:
|
||
命令行参数命名空间
|
||
"""
|
||
parser = argparse.ArgumentParser(description="脚本执行 Prefect Flow")
|
||
|
||
# 主参数
|
||
parser.add_argument("--script-url", type=str, help="用于下载脚本的 URL 地址")
|
||
parser.add_argument("--shell-script", type=str, help="直接提供脚本内容")
|
||
|
||
# 配置文件参数
|
||
parser.add_argument("--flow-config", type=str, help="Flow 配置文件路径(仅 JSON 格式)")
|
||
|
||
return parser.parse_args()
|
||
|
||
|
||
def get_env_vars() -> dict:
|
||
"""
|
||
从环境变量加载配置
|
||
|
||
Returns:
|
||
包含 script_url 和 shell_script 的环境变量配置字典
|
||
"""
|
||
env_vars = {}
|
||
|
||
# 读取环境变量
|
||
if "FREQTRADE_SCRIPT_URL" in os.environ:
|
||
env_vars["script_url"] = os.environ["FREQTRADE_SCRIPT_URL"]
|
||
|
||
if "FREQTRADE_SHELL_SCRIPT" in os.environ:
|
||
env_vars["shell_script"] = os.environ["FREQTRADE_SHELL_SCRIPT"]
|
||
|
||
return env_vars
|
||
|
||
|
||
def merge_params(*param_dicts: dict) -> dict:
|
||
"""
|
||
合并多个参数字典,后面的字典会覆盖前面的字典
|
||
|
||
Args:
|
||
param_dicts: 要合并的参数字典
|
||
|
||
Returns:
|
||
合并后的参数字典
|
||
"""
|
||
merged = {}
|
||
for params in param_dicts:
|
||
merged.update(params)
|
||
return merged |