Support data upload

Author: yhydev
Date: 2026-01-06 17:43:35 +08:00
Parent: 3d03943b62
Commit: d361f3a1c1


@@ -185,13 +185,13 @@ def split_pairs(pairs: List[str], jobs: int = None, max_pairs: int = None) -> Li
     return chunks

-def submit_job(timeframe: str, timerange: str, pairs_chunk: List[str],
+def submit_job(name:str, timeframe: str, timerange: str, pairs_chunk: List[str],
                strategy: str,
                start_datetime: str,
                job_id: int,
                working_url: str) -> None:
     """Submit job function (placeholder implementation)"""
-    script = generate_shell_script(timeframe, timerange,
+    script = generate_shell_script(name, timeframe, timerange,
                                    pairs_chunk, strategy, start_datetime, job_id, working_url)
     requests.post(
         "https://prefect.oopsapi.com/api/deployments/dac4e321-cc60-4ca2-8aba-ee389d395ae9/create_flow_run",
@@ -202,7 +202,7 @@ def submit_job(timeframe: str, timerange: str, pairs_chunk: List[str],
         }
     )

-def generate_shell_script(timeframe: str, timerange: str, pairs_chunk: List[str],
+def generate_shell_script(name:str, timeframe: str, timerange: str, pairs_chunk: List[str],
                           strategy: str,
                           start_datetime: str,
                           job_id: int,
@@ -211,6 +211,43 @@ def generate_shell_script(timeframe: str, timerange: str, pairs_chunk: List[str]
     Generate a shell script for the given timeframe, timerange, and pairs chunk.
     """
     BUCKET_NAME = os.environ.get("BUCKET_NAME", "backresult")
+    upload_data_script = """
+function upload_data(){
+    path=$1
+    filename=$(basename "$path")
+    label=$2
+    body=$(cat <<EOF
+{
+    "object": {
+        "filename": "$filename",
+        "path": "$path",
+        "label": "$label",
+        "content": $(cat "$path")
+    }
+}
+EOF
+)
+    curl $HASURA_ENDPOINT/api/rest/json_files \
+        -H 'content-type: application/json' \
+        -H "x-hasura-access-key: $HASURA_ACCESS_KEY" \
+        -X POST \
+        -d "$body"
+}
+
+function unzip_data_and_upload(){
+    data_dir=$1
+    name_keyword=$2
+    label=$3
+    tmp_dir=`mktemp -d`
+    find "$data_dir" -name "*$name_keyword*.zip" -exec unzip -o {} -d "$tmp_dir" \\;
+    files=$(find "$tmp_dir" -name "*.json" | grep json|grep -v config)
+    for file in $files; do
+        upload_data "$file" "$label"
+    done
+}
+
+unzip_data_and_upload user_data/backtest_results/%s job_%s %s
+""" % (start_datetime, job_id, name)
     return f"""
 #!/bin/bash
 cd $WORKING_DIR
@@ -232,7 +269,7 @@ if [ $? -ne 0 ]; then
     exit 1
 fi
 mc cp user_data/backtest_results/{start_datetime}/job_{job_id}* oss/{BUCKET_NAME}/{start_datetime}/
-"""
+""" + upload_data_script

 def main():
     dotenv.load_dotenv()
@@ -263,12 +300,12 @@ def main():
     # Split pairs into chunks
     pair_chunks = split_pairs(filtered_pairs, args.jobs, args.max_pairs)

-    dt = args.name + "_" + datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+    dt = args.name + "_" + datetime.datetime.now().strftime("%Y%m%d%H%M%S%s")
     # Submit jobs
     for i, chunk in enumerate(pair_chunks):
         if chunk:  # check whether the chunk is empty
-            submit_job(args.timeframe, args.timerange, chunk,
+            submit_job(args.name, args.timeframe, args.timerange, chunk,
                        args.strategy, dt, i, working_url)


 if __name__ == "__main__":
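
For reference, the upload_data shell function added in this commit posts each extracted backtest JSON file to the Hasura REST endpoint at /api/rest/json_files. Below is a minimal Python sketch of the equivalent call, assuming the same HASURA_ENDPOINT and HASURA_ACCESS_KEY environment variables the generated script relies on and reusing the payload shape from upload_data; the helper name upload_json_file is hypothetical and not part of the committed code.

# Sketch only: mirrors the body built by the upload_data shell function above.
# HASURA_ENDPOINT and HASURA_ACCESS_KEY are assumed to be set in the environment,
# as they are for the generated bash job; upload_json_file is a hypothetical helper.
import json
import os

import requests


def upload_json_file(path: str, label: str) -> None:
    # Load the backtest result JSON so it is embedded in the body as nested JSON,
    # matching the shell version's `"content": $(cat "$path")`.
    with open(path) as fh:
        content = json.load(fh)
    body = {
        "object": {
            "filename": os.path.basename(path),
            "path": path,
            "label": label,
            "content": content,
        }
    }
    requests.post(
        f"{os.environ['HASURA_ENDPOINT']}/api/rest/json_files",
        headers={"x-hasura-access-key": os.environ["HASURA_ACCESS_KEY"]},
        json=body,
    )

The committed script performs the same request with curl so it can run inside the generated bash job without additional Python dependencies.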