diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..505a3b1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +# Python-generated files +__pycache__/ +*.py[oc] +build/ +dist/ +wheels/ +*.egg-info + +# Virtual environments +.venv diff --git a/myscripts/working_tool.py b/myscripts/working_tool.py new file mode 100644 index 0000000..bce5ecb --- /dev/null +++ b/myscripts/working_tool.py @@ -0,0 +1,296 @@ +#!/usr/bin/env python3 +import os +import zipfile +import hashlib +import requests +import tempfile +import json + +# 存储上次上传的信息 +LAST_UPLOAD_FILE = '.last_upload.json' + + +def get_file_hash(file_path): + """计算文件的MD5哈希值""" + hash_md5 = hashlib.md5() + with open(file_path, 'rb') as f: + for chunk in iter(lambda: f.read(4096), b''): + hash_md5.update(chunk) + return hash_md5.hexdigest() + + +def get_working_files(): + """从环境变量WORKING_FILES获取要打包的文件列表""" + working_files_env = os.environ.get('WORKING_FILES', '') + working_dir = os.environ.get('WORKING_DIR', '.') + + # 确保WORKING_DIR是绝对路径 + if not os.path.isabs(working_dir): + working_dir = os.path.abspath(working_dir) + + if not working_files_env: + return [] + + working_files = [] + for item in working_files_env.split(','): + item = item.strip() + if not item: + continue + + # 检查是否为相对路径,不能包含.. + if os.path.isabs(item) or '..' in item: + raise ValueError(f"Invalid path '{item}': must be relative path without '..'") + + # 构建完整路径 + full_path = os.path.join(working_dir, item) + + if os.path.isfile(full_path): + working_files.append(full_path) + elif os.path.isdir(full_path): + # 递归获取目录下的所有文件 + for root, _, files in os.walk(full_path): + for file in files: + working_files.append(os.path.join(root, file)) + + return working_files + + +def check_files_modified(working_files): + """检查工作空间文件是否有修改""" + # 计算当前文件的哈希值总和 + current_hashes = {} + for file_path in working_files: + if os.path.exists(file_path): + current_hashes[file_path] = get_file_hash(file_path) + + # 读取上次的哈希值 + if os.path.exists(LAST_UPLOAD_FILE): + with open(LAST_UPLOAD_FILE, 'r') as f: + last_data = json.load(f) + + # 比较哈希值 + if last_data['hashes'] == current_hashes: + cached_url = last_data['download_url'] + + # 检查cached_url是否为JSON格式 + if isinstance(cached_url, str) and cached_url.startswith('{') and cached_url.endswith('}'): + try: + # 如果是JSON,解析出downloadLink + cached_data = json.loads(cached_url) + download_url = cached_data.get('downloadLink', cached_url) + return False, download_url + except json.JSONDecodeError: + # 如果解析失败,使用原始URL + pass + + return False, cached_url + + return True, None + + +def upload_to_tmpfile(zip_path): + """将ZIP文件上传到tmpfile.link并返回下载链接""" + try: + with open(zip_path, 'rb') as f: + response = requests.post( + 'https://tmpfile.link/api/upload', + files={'file': f} + ) + + response.raise_for_status() + response_text = response.text.strip() + + # 确保返回的是纯URL,不是JSON + if response_text.startswith('{') and response_text.endswith('}'): + # 是JSON格式,解析出downloadLink + data = json.loads(response_text) + download_url = data.get('downloadLink', '') + if download_url: + return download_url + else: + # 如果没有downloadLink,返回原始文本 + return response_text + else: + # 不是JSON,直接返回文本 + return response_text + except (requests.exceptions.RequestException, ValueError) as e: + raise Exception(f"Failed to upload to tmpfile.link: {str(e)}") + + +def download_and_extract(download_url, extract_dir='.', password=None): + """从下载链接下载ZIP文件并解压到指定目录""" + try: + # 确保解压目录存在 + os.makedirs(extract_dir, exist_ok=True) + + # 下载ZIP文件到临时目录 + with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp: + zip_path = tmp.name + + print(f"Downloading from {download_url}...") + response = requests.get(download_url, stream=True) + response.raise_for_status() + + with open(zip_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + print(f"Extracting to {extract_dir}...") + + # 从环境变量获取密码,优先使用环境变量 + env_password = os.environ.get('ZIP_PASSWORD') + if env_password: + password = env_password + + # 优先使用Python内置的zipfile模块 + try: + with zipfile.ZipFile(zip_path, 'r') as zipf: + if password: + zipf.extractall(extract_dir, pwd=password.encode()) + else: + zipf.extractall(extract_dir) + except Exception as e: + # 如果内置模块失败,尝试使用7zip(如果可用) + try: + import subprocess + subprocess.run(['7z', '--help'], check=True, capture_output=True) + # 使用7zip解压 + cmd = ['7z', 'x', '-y', '-o' + extract_dir] + if password: + cmd.extend(['-p' + password]) + cmd.append(zip_path) + + subprocess.run(cmd, check=True, capture_output=True) + except (subprocess.CalledProcessError, FileNotFoundError): + # 如果7zip也不可用,抛出原始错误 + raise e + + print(f"Download and extraction completed successfully.") + return True + except (requests.exceptions.RequestException, zipfile.BadZipFile, IOError, Exception) as e: + raise Exception(f"Failed to download and extract: {str(e)}") + finally: + # 清理临时文件 + if 'zip_path' in locals() and os.path.exists(zip_path): + os.unlink(zip_path) + + +def upload_working_files(password=None): + """将工作空间文件打包成ZIP并上传到tmpfile,返回下载链接""" + # 从环境变量获取密码,优先使用环境变量 + env_password = os.environ.get('ZIP_PASSWORD') + if env_password: + password = env_password + + # 获取要打包的文件列表 + working_files = get_working_files() + if not working_files: + return "No files to upload" + + # 检查文件是否有修改 + modified, last_url = check_files_modified(working_files) + if not modified and last_url: + return last_url + + # 创建临时ZIP文件 + with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp: + zip_path = tmp.name + + try: + # 获取WORKING_DIR + working_dir = os.environ.get('WORKING_DIR', '.') + if not os.path.isabs(working_dir): + working_dir = os.path.abspath(working_dir) + + # 打包文件 + with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: + for file_path in working_files: + # 计算相对路径,保持目录结构 + arcname = os.path.relpath(file_path, working_dir) + zipf.write(file_path, arcname) + + # 如果提供了密码,创建加密ZIP文件 + if password: + # 直接创建加密ZIP文件,替换原有的非加密文件 + os.unlink(zip_path) + + # 获取WORKING_DIR + working_dir = os.environ.get('WORKING_DIR', '.') + if not os.path.isabs(working_dir): + working_dir = os.path.abspath(working_dir) + + # 使用内置的zipfile模块创建加密ZIP + with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED, compresslevel=9) as zipf: + for file_path in working_files: + # 计算相对路径,保持目录结构 + arcname = os.path.relpath(file_path, working_dir) + zipf.write(file_path, arcname) + # 设置密码用于解密 + zipf.setpassword(password.encode()) + + # 上传到tmpfile + download_url = upload_to_tmpfile(zip_path) + + # 计算当前文件的哈希值 + current_hashes = {} + for file_path in working_files: + if os.path.exists(file_path): + current_hashes[file_path] = get_file_hash(file_path) + + # 保存本次上传信息 + with open(LAST_UPLOAD_FILE, 'w') as f: + json.dump({ + 'hashes': current_hashes, + 'download_url': download_url, + 'timestamp': os.path.getmtime(zip_path), + 'encrypted': bool(password) + }, f) + + return download_url + finally: + # 清理临时文件 + if os.path.exists(zip_path): + os.unlink(zip_path) + # 清理加密临时文件 + encrypted_zip_path = zip_path + '.encrypted.zip' + if os.path.exists(encrypted_zip_path): + os.unlink(encrypted_zip_path) + + +def main(): + # 命令行入口点 + import sys + import argparse + + parser = argparse.ArgumentParser(description='Upload working files to tmpfile.link or download and extract files.') + + subparsers = parser.add_subparsers(dest='command', help='Command to execute') + + # 上传命令 + upload_parser = subparsers.add_parser('upload', help='Upload working files') + upload_parser.add_argument('--password', '-p', help='Password for encrypting the ZIP file') + + # 下载命令 + download_parser = subparsers.add_parser('download', help='Download and extract files') + download_parser.add_argument('url', help='Download URL') + download_parser.add_argument('extract_dir', nargs='?', default='.', help='Extraction directory') + download_parser.add_argument('--password', '-p', help='Password for decrypting the ZIP file') + + args = parser.parse_args() + + if args.command == 'download': + # 下载模式 + try: + download_and_extract(args.url, args.extract_dir, args.password) + except Exception as e: + print(f"Error: {e}") + sys.exit(1) + else: + # 上传模式(默认) + download_url = upload_working_files(args.password) + print(f"Download URL: {download_url}") + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..c9f18bc --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,10 @@ +[project] +name = "myscripts" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.13" +dependencies = ["requests"] + +[project.scripts] +working_tool = "myscripts.working_tool:main" \ No newline at end of file