""" 币安永续合约定时平仓脚本。 平仓触发条件: - 多空名义价值总和 ≤ 20 USDT 且未实现盈亏 > 0.05。 流程: 1. 获取当前持仓,得到需要平仓的交易对; 2. 创建 BinanceSocketManager,监听需平仓合约,收到 WS 事件后按规则创建平仓订单。 平仓规则:多头限价卖出 = 当前价 * 1.003,空头限价买入 = 当前价 * 0.997。 纯逻辑为顶层函数,便于单元测试(from position_closer import round_to_step, ...); I/O 与编排在下方 async 函数中。 """ from __future__ import annotations import asyncio from decimal import Decimal from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from binance import AsyncClient as BinanceAsyncClient # 第三方与 I/O 依赖在下方 async 函数内按需导入,便于单测时只导入纯逻辑而不触发 binance/redis # ---------- 纯逻辑(无 I/O,可单独单元测试)---------- LONG_CLOSE_PRICE_RATIO = Decimal("1.003") SHORT_CLOSE_PRICE_RATIO = Decimal("0.997") def round_to_step(value: Decimal, step: str) -> str: """按 step 精度舍入。""" step_d = Decimal(step) if step_d <= 0: return str(value) q = (value / step_d).quantize(Decimal("1"), rounding="ROUND_DOWN") return str((q * step_d).normalize()) def round_price_to_tick(price: Decimal, tick_size: str) -> str: """ 按合约最小价格精度(tickSize)优化价格,返回符合交易所要求的字符串。 不同合约的 price_filter(tickSize)不同,必须舍入到其整数倍,否则下单会报错。 """ tick = Decimal(tick_size) if tick <= 0: return str(price) rounded = (price / tick).quantize(Decimal("1"), rounding="ROUND_DOWN") * tick # 避免科学计数法(如 1E+2),保证 API 接受 s = f"{rounded:.10f}".rstrip("0").rstrip(".") return s if s else "0" def group_positions_by_symbol(positions: list[dict]) -> dict[str, list[dict]]: """按 symbol 分组。""" by_symbol: dict[str, list[dict]] = {} for p in positions: sym = p.get("symbol", "") if sym: by_symbol.setdefault(sym, []).append(p) return by_symbol def filter_nonzero_positions(positions: list[dict]) -> list[dict]: """只保留 positionAmt 非零的持仓。""" return [ p for p in positions if float(p.get("positionAmt", 0) or 0) != 0 ] def get_notional_by_side(positions: list[dict]) -> tuple[float, float]: """计算该合约下多头、空头的名义价值(绝对值)。""" long_notional = 0.0 short_notional = 0.0 for p in positions: amt = float(p.get("positionAmt", 0) or 0) notional = abs(float(p.get("notional", 0) or 0)) side = (p.get("positionSide") or "BOTH").upper() if side == "BOTH": if amt > 0: long_notional += notional else: short_notional += notional elif side == "LONG": long_notional += notional else: short_notional += notional return long_notional, short_notional def get_unrealized_profit(positions: list[dict]) -> float: """计算该合约下所有持仓的未实现盈亏总和(币安字段 unrealizedProfit)。""" total = 0.0 for p in positions: total += float(p.get("unrealizedProfit", 0) or 0) return total def should_close_symbol( long_notional: float, short_notional: float, unrealized_profit: float, notional_threshold: float = 20, min_profit: float = 0.05, ) -> bool: """ 是否对该合约执行平仓。 条件:多空名义价值总和 ≤ notional_threshold 且 未实现盈亏 > min_profit。 """ total_notional = long_notional + short_notional return total_notional <= notional_threshold and unrealized_profit > min_profit def get_symbols_to_close( by_symbol: dict[str, list[dict]], notional_threshold: float = 20, min_profit: float = 0.05, ) -> tuple[set[str], dict[str, list[dict]]]: """ 根据持仓得到需要平仓的交易对及其持仓子集。 平仓条件:多空名义价值总和 ≤ notional_threshold 且 未实现盈亏 > min_profit。 返回 (symbols_to_close, by_symbol_filtered)。 """ symbols_to_close: set[str] = set() for symbol, sym_positions in by_symbol.items(): long_n, short_n = get_notional_by_side(sym_positions) unrealized_profit = get_unrealized_profit(sym_positions) if should_close_symbol( long_n, short_n, unrealized_profit, notional_threshold=notional_threshold, min_profit=min_profit, ): symbols_to_close.add(symbol) by_symbol_filtered = {k: v for k, v in by_symbol.items() if k in symbols_to_close} return symbols_to_close, by_symbol_filtered def parse_ticker_message(msg: dict) -> tuple[str | None, Decimal | None]: """ 解析 WS 组合流 ticker 消息,提取 symbol 与最新价。 返回 (symbol, price) 或 (None, None) / (symbol, None)。 """ stream = msg.get("stream", "") symbol = stream.upper().split("@")[0] if stream else None if not symbol: return None, None data = msg.get("data", {}) price_str = data.get("c") if not price_str: return symbol, None try: return symbol, Decimal(str(price_str)) except Exception: return symbol, None def parse_exchange_info_to_precisions(info: dict) -> dict[str, dict[str, str]]: """从 futures exchangeInfo 解析出各 symbol 的 lot_size、price_filter。""" result = {} for s in info.get("symbols", []): sym = s.get("symbol", "") if not sym: continue lot_size = "0.01" price_filter = "0.01" for f in s.get("filters", []): if f.get("filterType") == "LOT_SIZE": lot_size = f.get("stepSize", "0.01") elif f.get("filterType") == "PRICE_FILTER": price_filter = f.get("tickSize", "0.01") result[sym] = {"lot_size": lot_size, "price_filter": price_filter} return result def build_close_order_params( symbol: str, position_side: str, position_amt: float, current_price: Decimal, precisions: dict[str, dict[str, str]], reduce_only: bool = False, long_ratio: Decimal = LONG_CLOSE_PRICE_RATIO, short_ratio: Decimal = SHORT_CLOSE_PRICE_RATIO, ) -> dict[str, Any] | None: """ 根据持仓与当前价构建平仓限价单参数字典(不发起请求)。 若 quantity 舍入后为 0 则返回 None。 """ amt = abs(position_amt) if amt <= 0: return None side_upper = (position_side or "BOTH").upper() prec = precisions.get(symbol, {"lot_size": "0.001", "price_filter": "0.01"}) quantity = round_to_step(Decimal(str(amt)), prec["lot_size"]) if not quantity or Decimal(quantity) <= 0: return None # 使用该合约的最小价格精度(tickSize)优化平仓价格 tick_size = prec["price_filter"] if side_upper == "LONG" or (side_upper == "BOTH" and position_amt > 0): side = "SELL" price = current_price * long_ratio else: side = "BUY" price = current_price * short_ratio price_str = round_price_to_tick(price, tick_size) params: dict[str, Any] = { "symbol": symbol, "side": side, "type": "LIMIT", "timeInForce": "GTC", "quantity": quantity, "price": price_str, } if side_upper in ("LONG", "SHORT"): params["positionSide"] = side_upper if reduce_only or side_upper == "BOTH": params["reduceOnly"] = "true" return params # ---------- I/O 与编排 ---------- async def get_symbol_precision(client: BinanceAsyncClient) -> dict[str, dict]: """获取合约的 quantity/price 精度。""" info = await client.futures_exchange_info() return parse_exchange_info_to_precisions(info) async def get_positions(client: BinanceAsyncClient) -> list[dict]: """获取永续合约持仓(只保留有仓位的)。""" account = await client.futures_account() positions = account.get("positions", []) return filter_nonzero_positions(positions) async def place_close_order( client: BinanceAsyncClient, symbol: str, position_side: str, position_amt: float, current_price, precisions: dict[str, dict], reduce_only: bool = False, dry_run: bool = False, ) -> None: """下平仓限价单。dry_run=True 时仅计算并打印参数,不真实下单。""" from loguru import logger price = current_price if isinstance(current_price, Decimal) else Decimal(str(current_price)) params = build_close_order_params( symbol=symbol, position_side=position_side, position_amt=position_amt, current_price=price, precisions=precisions, reduce_only=reduce_only, ) if not params: logger.warning("舍入后 quantity 为 0,跳过 {} {}", symbol, position_side) return if dry_run: logger.info( "【DRY-RUN】将下单: {} {} {} @ {} qty={}", symbol, params["side"], position_side, params["price"], params["quantity"], ) return try: await client.futures_create_order(**params) logger.info( "平仓单已提交: {} {} {} @ {} qty={}", symbol, params["side"], position_side, params["price"], params["quantity"], ) except Exception as e: logger.exception("平仓下单失败 {}: {}", symbol, e) async def run_ws_listener( client: BinanceAsyncClient, symbols_to_close: set[str], by_symbol: dict[str, list[dict]], precisions: dict[str, dict], redis_key: str, redis_url: str, redis_contracts: set[str], dry_run: bool = False, ws_connection_timeout: float = 30, ) -> None: """ 创建 BinanceSocketManager,订阅需平仓合约的 ticker, 收到 WS 事件后按规则下平仓单(每个 symbol 只下一次)。 dry_run=True 时只打印将下的单,不真实下单、不从 Redis 移除。 ws_connection_timeout: 建立 WebSocket 连接的超时(秒),超时则退出,避免 async with socket 处无限挂起。 """ from binance.enums import FuturesType from binance.ws.streams import BinanceSocketManager from loguru import logger from redis.asyncio import Redis as Aioredis if not symbols_to_close: logger.info("无需平仓的交易对,退出 WS 监听") return bsm = BinanceSocketManager(client) streams = [f"{s.lower()}@ticker" for s in symbols_to_close] socket = bsm.futures_multiplex_socket(streams, futures_type=FuturesType.USD_M) symbols_order_placed: set[str] = set() async def on_ticker_message(msg: dict) -> None: symbol, current_price = parse_ticker_message(msg) if not symbol or symbol not in symbols_to_close or symbol in symbols_order_placed: return if current_price is None: return sym_positions = by_symbol.get(symbol, []) if not sym_positions: return logger.info("收到 {} ticker,价格 {},执行平仓", symbol, current_price) for p in sym_positions: amt = float(p.get("positionAmt", 0) or 0) if amt == 0: continue pos_side = p.get("positionSide") or "BOTH" await place_close_order( client, symbol, pos_side, amt, current_price, precisions, reduce_only=(pos_side == "BOTH"), dry_run=dry_run, ) symbols_order_placed.add(symbol) if not dry_run and symbol in redis_contracts: redis_client = Aioredis.from_url(redis_url, decode_responses=True) try: await redis_client.srem(redis_key, symbol) logger.info("已从 Redis 平仓集合移除: {}", symbol) except Exception as e: logger.warning("从 Redis 移除 {} 失败: {}", symbol, e) finally: await redis_client.aclose() # 仅对「建立连接」阶段加超时,避免网络不可达时 async with socket 无限挂起 try: s = await asyncio.wait_for(socket.__aenter__(), timeout=ws_connection_timeout) except asyncio.TimeoutError: logger.error( "WebSocket 连接超时({}s),请检查网络或 Binance 可访问性(如需代理可配置 https_proxy)", ws_connection_timeout, ) return except Exception as e: logger.exception("WebSocket 连接失败: {}", e) return try: logger.info("WS 已连接,监听 {} 个交易对: {}", len(symbols_to_close), sorted(symbols_to_close)) while True: try: msg = await asyncio.wait_for(s.recv(), timeout=300) except asyncio.TimeoutError: continue except Exception as e: logger.exception("WS 接收异常: {}", e) break if not msg: continue await on_ticker_message(msg) if symbols_order_placed >= symbols_to_close: logger.info("所有需平仓交易对已下单,退出监听") break finally: await socket.__aexit__(None, None, None) async def main_async() -> None: from binance import AsyncClient as BinanceAsyncClient from loguru import logger from redis.asyncio import Redis as Aioredis from config import settings api_key = (getattr(settings, "binance_api_key", None) or "").strip() api_secret = (getattr(settings, "binance_api_secret", None) or "").strip() if not api_key or not api_secret: logger.error("请在 .secrets.toml 或环境变量中设置 binance_api_key / binance_api_secret") return import os env_dry = os.environ.get("DRY_RUN", "").strip().lower() if env_dry in ("0", "false", "no"): dry_run = False elif env_dry in ("1", "true", "yes"): dry_run = True else: dry_run = bool(getattr(settings, "dry_run", True)) # 默认 dry-run if dry_run: logger.info("【DRY-RUN】仅测试全流程,不会真实下单、不会从 Redis 移除") base_url = getattr(settings, "binance_base_url", None) or "https://fapi.binance.com" testnet = "testnet" in str(base_url).lower() redis_url = getattr(settings, "redis_url", None) or "redis://localhost:6379/0" notional_threshold = float(getattr(settings, "notional_close_threshold", None) or 20) min_profit = float(getattr(settings, "close_min_profit", None) or 0.05) redis_key = getattr(settings, "redis_close_key", None) or "close_position:contracts" ws_connection_timeout = float(getattr(settings, "ws_connection_timeout", None) or 30) client = await BinanceAsyncClient.create( api_key=api_key, api_secret=api_secret ) try: positions = await get_positions(client) by_symbol = group_positions_by_symbol(positions) logger.info("当前持仓合约数: {}", len(by_symbol)) by_symbol_sorted = sorted( by_symbol.items(), key=lambda x: get_unrealized_profit(x[1]), reverse=True, ) for symbol, sym_positions in by_symbol_sorted: unrealized = get_unrealized_profit(sym_positions) long_n, short_n = get_notional_by_side(sym_positions) logger.info( "合约 {} 未实现盈亏={:.2f} 多头名义={:.2f} 空头名义={:.2f}", symbol, unrealized, long_n, short_n, ) try: redis_client = Aioredis.from_url(redis_url, decode_responses=True) redis_contracts = set(await redis_client.smembers(redis_key) or []) await redis_client.aclose() except Exception as e: logger.warning("读取 Redis 平仓集合失败,仅按名义价值判断: {}", e) redis_contracts = set() symbols_to_close, by_symbol_filtered = get_symbols_to_close( by_symbol, notional_threshold=notional_threshold, min_profit=min_profit, ) for symbol, sym_positions in by_symbol_filtered.items(): long_n, short_n = get_notional_by_side(sym_positions) profit = get_unrealized_profit(sym_positions) logger.info( "需平仓: symbol={} long_notional={:.2f} short_notional={:.2f} unrealized_profit={:.2f}", symbol, long_n, short_n, profit, ) if not symbols_to_close: logger.info("当前无需要平仓的交易对") return precisions = await get_symbol_precision(client) await run_ws_listener( client=client, symbols_to_close=symbols_to_close, by_symbol=by_symbol_filtered, precisions=precisions, redis_key=redis_key, redis_url=redis_url, redis_contracts=redis_contracts, dry_run=dry_run, ws_connection_timeout=ws_connection_timeout, ) finally: await client.close_connection() def main() -> None: asyncio.run(main_async()) if __name__ == "__main__": main()