This commit is contained in:
yhydev
2025-12-13 00:16:41 +08:00
parent ccc5b3bb03
commit 77a8f51e5d
3 changed files with 316 additions and 0 deletions

10
.gitignore vendored Normal file
View File

@@ -0,0 +1,10 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv

296
myscripts/working_tool.py Normal file
View File

@@ -0,0 +1,296 @@
#!/usr/bin/env python3
import os
import zipfile
import hashlib
import requests
import tempfile
import json
# 存储上次上传的信息
LAST_UPLOAD_FILE = '.last_upload.json'
def get_file_hash(file_path):
"""计算文件的MD5哈希值"""
hash_md5 = hashlib.md5()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b''):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def get_working_files():
"""从环境变量WORKING_FILES获取要打包的文件列表"""
working_files_env = os.environ.get('WORKING_FILES', '')
working_dir = os.environ.get('WORKING_DIR', '.')
# 确保WORKING_DIR是绝对路径
if not os.path.isabs(working_dir):
working_dir = os.path.abspath(working_dir)
if not working_files_env:
return []
working_files = []
for item in working_files_env.split(','):
item = item.strip()
if not item:
continue
# 检查是否为相对路径,不能包含..
if os.path.isabs(item) or '..' in item:
raise ValueError(f"Invalid path '{item}': must be relative path without '..'")
# 构建完整路径
full_path = os.path.join(working_dir, item)
if os.path.isfile(full_path):
working_files.append(full_path)
elif os.path.isdir(full_path):
# 递归获取目录下的所有文件
for root, _, files in os.walk(full_path):
for file in files:
working_files.append(os.path.join(root, file))
return working_files
def check_files_modified(working_files):
"""检查工作空间文件是否有修改"""
# 计算当前文件的哈希值总和
current_hashes = {}
for file_path in working_files:
if os.path.exists(file_path):
current_hashes[file_path] = get_file_hash(file_path)
# 读取上次的哈希值
if os.path.exists(LAST_UPLOAD_FILE):
with open(LAST_UPLOAD_FILE, 'r') as f:
last_data = json.load(f)
# 比较哈希值
if last_data['hashes'] == current_hashes:
cached_url = last_data['download_url']
# 检查cached_url是否为JSON格式
if isinstance(cached_url, str) and cached_url.startswith('{') and cached_url.endswith('}'):
try:
# 如果是JSON解析出downloadLink
cached_data = json.loads(cached_url)
download_url = cached_data.get('downloadLink', cached_url)
return False, download_url
except json.JSONDecodeError:
# 如果解析失败使用原始URL
pass
return False, cached_url
return True, None
def upload_to_tmpfile(zip_path):
"""将ZIP文件上传到tmpfile.link并返回下载链接"""
try:
with open(zip_path, 'rb') as f:
response = requests.post(
'https://tmpfile.link/api/upload',
files={'file': f}
)
response.raise_for_status()
response_text = response.text.strip()
# 确保返回的是纯URL不是JSON
if response_text.startswith('{') and response_text.endswith('}'):
# 是JSON格式解析出downloadLink
data = json.loads(response_text)
download_url = data.get('downloadLink', '')
if download_url:
return download_url
else:
# 如果没有downloadLink返回原始文本
return response_text
else:
# 不是JSON直接返回文本
return response_text
except (requests.exceptions.RequestException, ValueError) as e:
raise Exception(f"Failed to upload to tmpfile.link: {str(e)}")
def download_and_extract(download_url, extract_dir='.', password=None):
"""从下载链接下载ZIP文件并解压到指定目录"""
try:
# 确保解压目录存在
os.makedirs(extract_dir, exist_ok=True)
# 下载ZIP文件到临时目录
with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
zip_path = tmp.name
print(f"Downloading from {download_url}...")
response = requests.get(download_url, stream=True)
response.raise_for_status()
with open(zip_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"Extracting to {extract_dir}...")
# 从环境变量获取密码,优先使用环境变量
env_password = os.environ.get('ZIP_PASSWORD')
if env_password:
password = env_password
# 优先使用Python内置的zipfile模块
try:
with zipfile.ZipFile(zip_path, 'r') as zipf:
if password:
zipf.extractall(extract_dir, pwd=password.encode())
else:
zipf.extractall(extract_dir)
except Exception as e:
# 如果内置模块失败尝试使用7zip如果可用
try:
import subprocess
subprocess.run(['7z', '--help'], check=True, capture_output=True)
# 使用7zip解压
cmd = ['7z', 'x', '-y', '-o' + extract_dir]
if password:
cmd.extend(['-p' + password])
cmd.append(zip_path)
subprocess.run(cmd, check=True, capture_output=True)
except (subprocess.CalledProcessError, FileNotFoundError):
# 如果7zip也不可用抛出原始错误
raise e
print(f"Download and extraction completed successfully.")
return True
except (requests.exceptions.RequestException, zipfile.BadZipFile, IOError, Exception) as e:
raise Exception(f"Failed to download and extract: {str(e)}")
finally:
# 清理临时文件
if 'zip_path' in locals() and os.path.exists(zip_path):
os.unlink(zip_path)
def upload_working_files(password=None):
"""将工作空间文件打包成ZIP并上传到tmpfile返回下载链接"""
# 从环境变量获取密码,优先使用环境变量
env_password = os.environ.get('ZIP_PASSWORD')
if env_password:
password = env_password
# 获取要打包的文件列表
working_files = get_working_files()
if not working_files:
return "No files to upload"
# 检查文件是否有修改
modified, last_url = check_files_modified(working_files)
if not modified and last_url:
return last_url
# 创建临时ZIP文件
with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
zip_path = tmp.name
try:
# 获取WORKING_DIR
working_dir = os.environ.get('WORKING_DIR', '.')
if not os.path.isabs(working_dir):
working_dir = os.path.abspath(working_dir)
# 打包文件
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for file_path in working_files:
# 计算相对路径,保持目录结构
arcname = os.path.relpath(file_path, working_dir)
zipf.write(file_path, arcname)
# 如果提供了密码创建加密ZIP文件
if password:
# 直接创建加密ZIP文件替换原有的非加密文件
os.unlink(zip_path)
# 获取WORKING_DIR
working_dir = os.environ.get('WORKING_DIR', '.')
if not os.path.isabs(working_dir):
working_dir = os.path.abspath(working_dir)
# 使用内置的zipfile模块创建加密ZIP
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED, compresslevel=9) as zipf:
for file_path in working_files:
# 计算相对路径,保持目录结构
arcname = os.path.relpath(file_path, working_dir)
zipf.write(file_path, arcname)
# 设置密码用于解密
zipf.setpassword(password.encode())
# 上传到tmpfile
download_url = upload_to_tmpfile(zip_path)
# 计算当前文件的哈希值
current_hashes = {}
for file_path in working_files:
if os.path.exists(file_path):
current_hashes[file_path] = get_file_hash(file_path)
# 保存本次上传信息
with open(LAST_UPLOAD_FILE, 'w') as f:
json.dump({
'hashes': current_hashes,
'download_url': download_url,
'timestamp': os.path.getmtime(zip_path),
'encrypted': bool(password)
}, f)
return download_url
finally:
# 清理临时文件
if os.path.exists(zip_path):
os.unlink(zip_path)
# 清理加密临时文件
encrypted_zip_path = zip_path + '.encrypted.zip'
if os.path.exists(encrypted_zip_path):
os.unlink(encrypted_zip_path)
def main():
# 命令行入口点
import sys
import argparse
parser = argparse.ArgumentParser(description='Upload working files to tmpfile.link or download and extract files.')
subparsers = parser.add_subparsers(dest='command', help='Command to execute')
# 上传命令
upload_parser = subparsers.add_parser('upload', help='Upload working files')
upload_parser.add_argument('--password', '-p', help='Password for encrypting the ZIP file')
# 下载命令
download_parser = subparsers.add_parser('download', help='Download and extract files')
download_parser.add_argument('url', help='Download URL')
download_parser.add_argument('extract_dir', nargs='?', default='.', help='Extraction directory')
download_parser.add_argument('--password', '-p', help='Password for decrypting the ZIP file')
args = parser.parse_args()
if args.command == 'download':
# 下载模式
try:
download_and_extract(args.url, args.extract_dir, args.password)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
else:
# 上传模式(默认)
download_url = upload_working_files(args.password)
print(f"Download URL: {download_url}")
if __name__ == "__main__":
main()

10
pyproject.toml Normal file
View File

@@ -0,0 +1,10 @@
[project]
name = "myscripts"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = ["requests"]
[project.scripts]
working_tool = "myscripts.working_tool:main"