From 9541a1af869dc44d6cfc1f6549ebd7b7fd8674ab Mon Sep 17 00:00:00 2001 From: yhydev Date: Mon, 2 Feb 2026 02:02:31 +0800 Subject: [PATCH] init --- position_closer.py | 473 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 473 insertions(+) create mode 100644 position_closer.py diff --git a/position_closer.py b/position_closer.py new file mode 100644 index 0000000..6faa7ec --- /dev/null +++ b/position_closer.py @@ -0,0 +1,473 @@ +""" +币安永续合约定时平仓脚本。 + +平仓触发条件:合约多空名义价值总和 > 阈值(默认 50 USDT)且未实现盈亏 > 0(盈利); +或在 Redis 指定 Set 中的合约强制平仓。 + +流程: +1. 获取当前持仓; +2. 得到需要平仓的交易对; +3. 创建 BinanceSocketManager,监听需要平仓的合约交易对,收到 WS 事件后按规则创建平仓订单。 +平仓规则:多头限价卖出 = 当前价 * 1.003,空头限价买入 = 当前价 * 0.997。 + +纯逻辑为顶层函数,便于单元测试(from position_closer import round_to_step, ...); +I/O 与编排在下方 async 函数中。 +""" +from __future__ import annotations + +import asyncio +from decimal import Decimal +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from binance import AsyncClient as BinanceAsyncClient + +# 第三方与 I/O 依赖在下方 async 函数内按需导入,便于单测时只导入纯逻辑而不触发 binance/redis + +# ---------- 纯逻辑(无 I/O,可单独单元测试)---------- + +LONG_CLOSE_PRICE_RATIO = Decimal("1.003") +SHORT_CLOSE_PRICE_RATIO = Decimal("0.997") + + +def round_to_step(value: Decimal, step: str) -> str: + """按 step 精度舍入。""" + step_d = Decimal(step) + if step_d <= 0: + return str(value) + q = (value / step_d).quantize(Decimal("1"), rounding="ROUND_DOWN") + return str((q * step_d).normalize()) + + +def round_price_to_tick(price: Decimal, tick_size: str) -> str: + """ + 按合约最小价格精度(tickSize)优化价格,返回符合交易所要求的字符串。 + 不同合约的 price_filter(tickSize)不同,必须舍入到其整数倍,否则下单会报错。 + """ + tick = Decimal(tick_size) + if tick <= 0: + return str(price) + rounded = (price / tick).quantize(Decimal("1"), rounding="ROUND_DOWN") * tick + # 避免科学计数法(如 1E+2),保证 API 接受 + s = f"{rounded:.10f}".rstrip("0").rstrip(".") + return s if s else "0" + + +def group_positions_by_symbol(positions: list[dict]) -> dict[str, list[dict]]: + """按 symbol 分组。""" + by_symbol: dict[str, list[dict]] = {} + for p in positions: + sym = p.get("symbol", "") + if sym: + by_symbol.setdefault(sym, []).append(p) + return by_symbol + + +def filter_nonzero_positions(positions: list[dict]) -> list[dict]: + """只保留 positionAmt 非零的持仓。""" + return [ + p for p in positions + if float(p.get("positionAmt", 0) or 0) != 0 + ] + + +def get_notional_by_side(positions: list[dict]) -> tuple[float, float]: + """计算该合约下多头、空头的名义价值(绝对值)。""" + long_notional = 0.0 + short_notional = 0.0 + for p in positions: + amt = float(p.get("positionAmt", 0) or 0) + notional = abs(float(p.get("notional", 0) or 0)) + side = (p.get("positionSide") or "BOTH").upper() + if side == "BOTH": + if amt > 0: + long_notional += notional + else: + short_notional += notional + elif side == "LONG": + long_notional += notional + else: + short_notional += notional + return long_notional, short_notional + + +def get_unrealized_profit(positions: list[dict]) -> float: + """计算该合约下所有持仓的未实现盈亏总和(币安字段 unrealizedProfit)。""" + total = 0.0 + for p in positions: + total += float(p.get("unrealizedProfit", 0) or 0) + return total + + +def should_close_symbol( + long_notional: float, + short_notional: float, + unrealized_profit: float, + threshold: float, + in_redis_set: bool = False, +) -> bool: + """ + 是否对该合约执行平仓。 + 条件:在 Redis 集合中 或 (多空名义价值总和 > threshold 且 未实现盈亏 > 0,即盈利状态)。 + """ + if in_redis_set: + return True + total_notional = long_notional + short_notional + if total_notional > threshold and unrealized_profit > 0: + return True + return False + + +def get_symbols_to_close( + by_symbol: dict[str, list[dict]], + threshold: float, + redis_contracts: set[str], +) -> tuple[set[str], dict[str, list[dict]]]: + """ + 根据持仓、阈值和 Redis 集合,得到需要平仓的交易对及其持仓子集。 + 平仓条件:在 Redis 中 或 (多空名义价值总和 > threshold 且 盈利)。 + 返回 (symbols_to_close, by_symbol_filtered)。 + """ + symbols_to_close: set[str] = set() + for symbol, sym_positions in by_symbol.items(): + long_n, short_n = get_notional_by_side(sym_positions) + unrealized_profit = get_unrealized_profit(sym_positions) + in_redis = symbol in redis_contracts + if should_close_symbol(long_n, short_n, unrealized_profit, threshold, in_redis): + symbols_to_close.add(symbol) + by_symbol_filtered = {k: v for k, v in by_symbol.items() if k in symbols_to_close} + return symbols_to_close, by_symbol_filtered + + +def parse_ticker_message(msg: dict) -> tuple[str | None, Decimal | None]: + """ + 解析 WS 组合流 ticker 消息,提取 symbol 与最新价。 + 返回 (symbol, price) 或 (None, None) / (symbol, None)。 + """ + stream = msg.get("stream", "") + symbol = stream.upper().split("@")[0] if stream else None + if not symbol: + return None, None + data = msg.get("data", {}) + price_str = data.get("c") + if not price_str: + return symbol, None + try: + return symbol, Decimal(str(price_str)) + except Exception: + return symbol, None + + +def parse_exchange_info_to_precisions(info: dict) -> dict[str, dict[str, str]]: + """从 futures exchangeInfo 解析出各 symbol 的 lot_size、price_filter。""" + result = {} + for s in info.get("symbols", []): + sym = s.get("symbol", "") + if not sym: + continue + lot_size = "0.01" + price_filter = "0.01" + for f in s.get("filters", []): + if f.get("filterType") == "LOT_SIZE": + lot_size = f.get("stepSize", "0.01") + elif f.get("filterType") == "PRICE_FILTER": + price_filter = f.get("tickSize", "0.01") + result[sym] = {"lot_size": lot_size, "price_filter": price_filter} + return result + + +def build_close_order_params( + symbol: str, + position_side: str, + position_amt: float, + current_price: Decimal, + precisions: dict[str, dict[str, str]], + reduce_only: bool = False, + long_ratio: Decimal = LONG_CLOSE_PRICE_RATIO, + short_ratio: Decimal = SHORT_CLOSE_PRICE_RATIO, +) -> dict[str, Any] | None: + """ + 根据持仓与当前价构建平仓限价单参数字典(不发起请求)。 + 若 quantity 舍入后为 0 则返回 None。 + """ + amt = abs(position_amt) + if amt <= 0: + return None + side_upper = (position_side or "BOTH").upper() + prec = precisions.get(symbol, {"lot_size": "0.001", "price_filter": "0.01"}) + quantity = round_to_step(Decimal(str(amt)), prec["lot_size"]) + if not quantity or Decimal(quantity) <= 0: + return None + # 使用该合约的最小价格精度(tickSize)优化平仓价格 + tick_size = prec["price_filter"] + if side_upper == "LONG" or (side_upper == "BOTH" and position_amt > 0): + side = "SELL" + price = current_price * long_ratio + else: + side = "BUY" + price = current_price * short_ratio + price_str = round_price_to_tick(price, tick_size) + params: dict[str, Any] = { + "symbol": symbol, + "side": side, + "type": "LIMIT", + "timeInForce": "GTC", + "quantity": quantity, + "price": price_str, + } + if side_upper in ("LONG", "SHORT"): + params["positionSide"] = side_upper + if reduce_only or side_upper == "BOTH": + params["reduceOnly"] = "true" + return params + + +# ---------- I/O 与编排 ---------- + +async def get_symbol_precision(client: BinanceAsyncClient) -> dict[str, dict]: + """获取合约的 quantity/price 精度。""" + info = await client.futures_exchange_info() + return parse_exchange_info_to_precisions(info) + + +async def get_positions(client: BinanceAsyncClient) -> list[dict]: + """获取永续合约持仓(只保留有仓位的)。""" + account = await client.futures_account() + positions = account.get("positions", []) + return filter_nonzero_positions(positions) + + +async def place_close_order( + client: BinanceAsyncClient, + symbol: str, + position_side: str, + position_amt: float, + current_price, + precisions: dict[str, dict], + reduce_only: bool = False, + dry_run: bool = False, +) -> None: + """下平仓限价单。dry_run=True 时仅计算并打印参数,不真实下单。""" + from loguru import logger + + price = current_price if isinstance(current_price, Decimal) else Decimal(str(current_price)) + params = build_close_order_params( + symbol=symbol, + position_side=position_side, + position_amt=position_amt, + current_price=price, + precisions=precisions, + reduce_only=reduce_only, + ) + if not params: + logger.warning("舍入后 quantity 为 0,跳过 {} {}", symbol, position_side) + return + if dry_run: + logger.info( + "【DRY-RUN】将下单: {} {} {} @ {} qty={}", + symbol, + params["side"], + position_side, + params["price"], + params["quantity"], + ) + return + try: + await client.futures_create_order(**params) + logger.info( + "平仓单已提交: {} {} {} @ {} qty={}", + symbol, + params["side"], + position_side, + params["price"], + params["quantity"], + ) + except Exception as e: + logger.exception("平仓下单失败 {}: {}", symbol, e) + + +async def run_ws_listener( + client: BinanceAsyncClient, + symbols_to_close: set[str], + by_symbol: dict[str, list[dict]], + precisions: dict[str, dict], + redis_key: str, + redis_url: str, + redis_contracts: set[str], + dry_run: bool = False, + ws_connection_timeout: float = 30, +) -> None: + """ + 创建 BinanceSocketManager,订阅需平仓合约的 ticker, + 收到 WS 事件后按规则下平仓单(每个 symbol 只下一次)。 + dry_run=True 时只打印将下的单,不真实下单、不从 Redis 移除。 + ws_connection_timeout: 建立 WebSocket 连接的超时(秒),超时则退出,避免 async with socket 处无限挂起。 + """ + from binance.enums import FuturesType + from binance.ws.streams import BinanceSocketManager + from loguru import logger + from redis.asyncio import Redis as Aioredis + + if not symbols_to_close: + logger.info("无需平仓的交易对,退出 WS 监听") + return + + bsm = BinanceSocketManager(client) + streams = [f"{s.lower()}@ticker" for s in symbols_to_close] + socket = bsm.futures_multiplex_socket(streams, futures_type=FuturesType.USD_M) + symbols_order_placed: set[str] = set() + + async def on_ticker_message(msg: dict) -> None: + symbol, current_price = parse_ticker_message(msg) + if not symbol or symbol not in symbols_to_close or symbol in symbols_order_placed: + return + if current_price is None: + return + sym_positions = by_symbol.get(symbol, []) + if not sym_positions: + return + logger.info("收到 {} ticker,价格 {},执行平仓", symbol, current_price) + for p in sym_positions: + amt = float(p.get("positionAmt", 0) or 0) + if amt == 0: + continue + pos_side = p.get("positionSide") or "BOTH" + await place_close_order( + client, symbol, pos_side, amt, current_price, precisions, + reduce_only=(pos_side == "BOTH"), + dry_run=dry_run, + ) + symbols_order_placed.add(symbol) + if not dry_run and symbol in redis_contracts: + redis_client = Aioredis.from_url(redis_url, decode_responses=True) + try: + await redis_client.srem(redis_key, symbol) + logger.info("已从 Redis 平仓集合移除: {}", symbol) + except Exception as e: + logger.warning("从 Redis 移除 {} 失败: {}", symbol, e) + finally: + await redis_client.aclose() + + # 仅对「建立连接」阶段加超时,避免网络不可达时 async with socket 无限挂起 + try: + s = await asyncio.wait_for(socket.__aenter__(), timeout=ws_connection_timeout) + except asyncio.TimeoutError: + logger.error( + "WebSocket 连接超时({}s),请检查网络或 Binance 可访问性(如需代理可配置 https_proxy)", + ws_connection_timeout, + ) + return + except Exception as e: + logger.exception("WebSocket 连接失败: {}", e) + return + + try: + logger.info("WS 已连接,监听 {} 个交易对: {}", len(symbols_to_close), sorted(symbols_to_close)) + while True: + try: + msg = await asyncio.wait_for(s.recv(), timeout=300) + except asyncio.TimeoutError: + continue + except Exception as e: + logger.exception("WS 接收异常: {}", e) + break + if not msg: + continue + await on_ticker_message(msg) + if symbols_order_placed >= symbols_to_close: + logger.info("所有需平仓交易对已下单,退出监听") + break + finally: + await socket.__aexit__(None, None, None) + + +async def main_async() -> None: + from binance import AsyncClient as BinanceAsyncClient + from loguru import logger + from redis.asyncio import Redis as Aioredis + + from config import settings + + api_key = (getattr(settings, "binance_api_key", None) or "").strip() + api_secret = (getattr(settings, "binance_api_secret", None) or "").strip() + if not api_key or not api_secret: + logger.error("请在 .secrets.toml 或环境变量中设置 binance_api_key / binance_api_secret") + return + + import os + + env_dry = os.environ.get("DRY_RUN", "").strip().lower() + if env_dry in ("0", "false", "no"): + dry_run = False + elif env_dry in ("1", "true", "yes"): + dry_run = True + else: + dry_run = bool(getattr(settings, "dry_run", True)) # 默认 dry-run + if dry_run: + logger.info("【DRY-RUN】仅测试全流程,不会真实下单、不会从 Redis 移除") + + base_url = getattr(settings, "binance_base_url", None) or "https://fapi.binance.com" + testnet = "testnet" in str(base_url).lower() + redis_url = getattr(settings, "redis_url", None) or "redis://localhost:6379/0" + threshold = float(getattr(settings, "notional_threshold", None) or 50) + redis_key = getattr(settings, "redis_close_key", None) or "close_position:contracts" + ws_connection_timeout = float(getattr(settings, "ws_connection_timeout", None) or 30) + + client = await BinanceAsyncClient.create( + api_key=api_key, + api_secret=api_secret + ) + try: + positions = await get_positions(client) + by_symbol = group_positions_by_symbol(positions) + + try: + redis_client = Aioredis.from_url(redis_url, decode_responses=True) + redis_contracts = set(await redis_client.smembers(redis_key) or []) + await redis_client.aclose() + except Exception as e: + logger.warning("读取 Redis 平仓集合失败,仅按名义价值判断: {}", e) + redis_contracts = set() + + symbols_to_close, by_symbol_filtered = get_symbols_to_close( + by_symbol, threshold, redis_contracts + ) + for symbol, sym_positions in by_symbol_filtered.items(): + long_n, short_n = get_notional_by_side(sym_positions) + profit = get_unrealized_profit(sym_positions) + logger.info( + "需平仓: symbol={} long_notional={:.2f} short_notional={:.2f} unrealized_profit={:.2f} in_redis={}", + symbol, + long_n, + short_n, + profit, + symbol in redis_contracts, + ) + + if not symbols_to_close: + logger.info("当前无需要平仓的交易对") + return + + precisions = await get_symbol_precision(client) + + await run_ws_listener( + client=client, + symbols_to_close=symbols_to_close, + by_symbol=by_symbol_filtered, + precisions=precisions, + redis_key=redis_key, + redis_url=redis_url, + redis_contracts=redis_contracts, + dry_run=dry_run, + ws_connection_timeout=ws_connection_timeout, + ) + finally: + await client.close_connection() + + +def main() -> None: + asyncio.run(main_async()) + + +if __name__ == "__main__": + main()