This commit is contained in:
yhydev
2026-02-02 02:02:31 +08:00
commit 9541a1af86

473
position_closer.py Normal file
View File

@@ -0,0 +1,473 @@
"""
币安永续合约定时平仓脚本。
平仓触发条件:合约多空名义价值总和 > 阈值(默认 50 USDT且未实现盈亏 > 0盈利
或在 Redis 指定 Set 中的合约强制平仓。
流程:
1. 获取当前持仓;
2. 得到需要平仓的交易对;
3. 创建 BinanceSocketManager监听需要平仓的合约交易对收到 WS 事件后按规则创建平仓订单。
平仓规则:多头限价卖出 = 当前价 * 1.003,空头限价买入 = 当前价 * 0.997。
纯逻辑为顶层函数便于单元测试from position_closer import round_to_step, ...
I/O 与编排在下方 async 函数中。
"""
from __future__ import annotations
import asyncio
from decimal import Decimal
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from binance import AsyncClient as BinanceAsyncClient
# 第三方与 I/O 依赖在下方 async 函数内按需导入,便于单测时只导入纯逻辑而不触发 binance/redis
# ---------- 纯逻辑(无 I/O可单独单元测试----------
LONG_CLOSE_PRICE_RATIO = Decimal("1.003")
SHORT_CLOSE_PRICE_RATIO = Decimal("0.997")
def round_to_step(value: Decimal, step: str) -> str:
"""按 step 精度舍入。"""
step_d = Decimal(step)
if step_d <= 0:
return str(value)
q = (value / step_d).quantize(Decimal("1"), rounding="ROUND_DOWN")
return str((q * step_d).normalize())
def round_price_to_tick(price: Decimal, tick_size: str) -> str:
"""
按合约最小价格精度tickSize优化价格返回符合交易所要求的字符串。
不同合约的 price_filtertickSize不同必须舍入到其整数倍否则下单会报错。
"""
tick = Decimal(tick_size)
if tick <= 0:
return str(price)
rounded = (price / tick).quantize(Decimal("1"), rounding="ROUND_DOWN") * tick
# 避免科学计数法(如 1E+2保证 API 接受
s = f"{rounded:.10f}".rstrip("0").rstrip(".")
return s if s else "0"
def group_positions_by_symbol(positions: list[dict]) -> dict[str, list[dict]]:
"""按 symbol 分组。"""
by_symbol: dict[str, list[dict]] = {}
for p in positions:
sym = p.get("symbol", "")
if sym:
by_symbol.setdefault(sym, []).append(p)
return by_symbol
def filter_nonzero_positions(positions: list[dict]) -> list[dict]:
"""只保留 positionAmt 非零的持仓。"""
return [
p for p in positions
if float(p.get("positionAmt", 0) or 0) != 0
]
def get_notional_by_side(positions: list[dict]) -> tuple[float, float]:
"""计算该合约下多头、空头的名义价值(绝对值)。"""
long_notional = 0.0
short_notional = 0.0
for p in positions:
amt = float(p.get("positionAmt", 0) or 0)
notional = abs(float(p.get("notional", 0) or 0))
side = (p.get("positionSide") or "BOTH").upper()
if side == "BOTH":
if amt > 0:
long_notional += notional
else:
short_notional += notional
elif side == "LONG":
long_notional += notional
else:
short_notional += notional
return long_notional, short_notional
def get_unrealized_profit(positions: list[dict]) -> float:
"""计算该合约下所有持仓的未实现盈亏总和(币安字段 unrealizedProfit"""
total = 0.0
for p in positions:
total += float(p.get("unrealizedProfit", 0) or 0)
return total
def should_close_symbol(
long_notional: float,
short_notional: float,
unrealized_profit: float,
threshold: float,
in_redis_set: bool = False,
) -> bool:
"""
是否对该合约执行平仓。
条件:在 Redis 集合中 或 (多空名义价值总和 > threshold 且 未实现盈亏 > 0即盈利状态
"""
if in_redis_set:
return True
total_notional = long_notional + short_notional
if total_notional > threshold and unrealized_profit > 0:
return True
return False
def get_symbols_to_close(
by_symbol: dict[str, list[dict]],
threshold: float,
redis_contracts: set[str],
) -> tuple[set[str], dict[str, list[dict]]]:
"""
根据持仓、阈值和 Redis 集合,得到需要平仓的交易对及其持仓子集。
平仓条件:在 Redis 中 或 (多空名义价值总和 > threshold 且 盈利)。
返回 (symbols_to_close, by_symbol_filtered)。
"""
symbols_to_close: set[str] = set()
for symbol, sym_positions in by_symbol.items():
long_n, short_n = get_notional_by_side(sym_positions)
unrealized_profit = get_unrealized_profit(sym_positions)
in_redis = symbol in redis_contracts
if should_close_symbol(long_n, short_n, unrealized_profit, threshold, in_redis):
symbols_to_close.add(symbol)
by_symbol_filtered = {k: v for k, v in by_symbol.items() if k in symbols_to_close}
return symbols_to_close, by_symbol_filtered
def parse_ticker_message(msg: dict) -> tuple[str | None, Decimal | None]:
"""
解析 WS 组合流 ticker 消息,提取 symbol 与最新价。
返回 (symbol, price) 或 (None, None) / (symbol, None)。
"""
stream = msg.get("stream", "")
symbol = stream.upper().split("@")[0] if stream else None
if not symbol:
return None, None
data = msg.get("data", {})
price_str = data.get("c")
if not price_str:
return symbol, None
try:
return symbol, Decimal(str(price_str))
except Exception:
return symbol, None
def parse_exchange_info_to_precisions(info: dict) -> dict[str, dict[str, str]]:
"""从 futures exchangeInfo 解析出各 symbol 的 lot_size、price_filter。"""
result = {}
for s in info.get("symbols", []):
sym = s.get("symbol", "")
if not sym:
continue
lot_size = "0.01"
price_filter = "0.01"
for f in s.get("filters", []):
if f.get("filterType") == "LOT_SIZE":
lot_size = f.get("stepSize", "0.01")
elif f.get("filterType") == "PRICE_FILTER":
price_filter = f.get("tickSize", "0.01")
result[sym] = {"lot_size": lot_size, "price_filter": price_filter}
return result
def build_close_order_params(
symbol: str,
position_side: str,
position_amt: float,
current_price: Decimal,
precisions: dict[str, dict[str, str]],
reduce_only: bool = False,
long_ratio: Decimal = LONG_CLOSE_PRICE_RATIO,
short_ratio: Decimal = SHORT_CLOSE_PRICE_RATIO,
) -> dict[str, Any] | None:
"""
根据持仓与当前价构建平仓限价单参数字典(不发起请求)。
若 quantity 舍入后为 0 则返回 None。
"""
amt = abs(position_amt)
if amt <= 0:
return None
side_upper = (position_side or "BOTH").upper()
prec = precisions.get(symbol, {"lot_size": "0.001", "price_filter": "0.01"})
quantity = round_to_step(Decimal(str(amt)), prec["lot_size"])
if not quantity or Decimal(quantity) <= 0:
return None
# 使用该合约的最小价格精度tickSize优化平仓价格
tick_size = prec["price_filter"]
if side_upper == "LONG" or (side_upper == "BOTH" and position_amt > 0):
side = "SELL"
price = current_price * long_ratio
else:
side = "BUY"
price = current_price * short_ratio
price_str = round_price_to_tick(price, tick_size)
params: dict[str, Any] = {
"symbol": symbol,
"side": side,
"type": "LIMIT",
"timeInForce": "GTC",
"quantity": quantity,
"price": price_str,
}
if side_upper in ("LONG", "SHORT"):
params["positionSide"] = side_upper
if reduce_only or side_upper == "BOTH":
params["reduceOnly"] = "true"
return params
# ---------- I/O 与编排 ----------
async def get_symbol_precision(client: BinanceAsyncClient) -> dict[str, dict]:
"""获取合约的 quantity/price 精度。"""
info = await client.futures_exchange_info()
return parse_exchange_info_to_precisions(info)
async def get_positions(client: BinanceAsyncClient) -> list[dict]:
"""获取永续合约持仓(只保留有仓位的)。"""
account = await client.futures_account()
positions = account.get("positions", [])
return filter_nonzero_positions(positions)
async def place_close_order(
client: BinanceAsyncClient,
symbol: str,
position_side: str,
position_amt: float,
current_price,
precisions: dict[str, dict],
reduce_only: bool = False,
dry_run: bool = False,
) -> None:
"""下平仓限价单。dry_run=True 时仅计算并打印参数,不真实下单。"""
from loguru import logger
price = current_price if isinstance(current_price, Decimal) else Decimal(str(current_price))
params = build_close_order_params(
symbol=symbol,
position_side=position_side,
position_amt=position_amt,
current_price=price,
precisions=precisions,
reduce_only=reduce_only,
)
if not params:
logger.warning("舍入后 quantity 为 0跳过 {} {}", symbol, position_side)
return
if dry_run:
logger.info(
"【DRY-RUN】将下单: {} {} {} @ {} qty={}",
symbol,
params["side"],
position_side,
params["price"],
params["quantity"],
)
return
try:
await client.futures_create_order(**params)
logger.info(
"平仓单已提交: {} {} {} @ {} qty={}",
symbol,
params["side"],
position_side,
params["price"],
params["quantity"],
)
except Exception as e:
logger.exception("平仓下单失败 {}: {}", symbol, e)
async def run_ws_listener(
client: BinanceAsyncClient,
symbols_to_close: set[str],
by_symbol: dict[str, list[dict]],
precisions: dict[str, dict],
redis_key: str,
redis_url: str,
redis_contracts: set[str],
dry_run: bool = False,
ws_connection_timeout: float = 30,
) -> None:
"""
创建 BinanceSocketManager订阅需平仓合约的 ticker
收到 WS 事件后按规则下平仓单(每个 symbol 只下一次)。
dry_run=True 时只打印将下的单,不真实下单、不从 Redis 移除。
ws_connection_timeout: 建立 WebSocket 连接的超时(秒),超时则退出,避免 async with socket 处无限挂起。
"""
from binance.enums import FuturesType
from binance.ws.streams import BinanceSocketManager
from loguru import logger
from redis.asyncio import Redis as Aioredis
if not symbols_to_close:
logger.info("无需平仓的交易对,退出 WS 监听")
return
bsm = BinanceSocketManager(client)
streams = [f"{s.lower()}@ticker" for s in symbols_to_close]
socket = bsm.futures_multiplex_socket(streams, futures_type=FuturesType.USD_M)
symbols_order_placed: set[str] = set()
async def on_ticker_message(msg: dict) -> None:
symbol, current_price = parse_ticker_message(msg)
if not symbol or symbol not in symbols_to_close or symbol in symbols_order_placed:
return
if current_price is None:
return
sym_positions = by_symbol.get(symbol, [])
if not sym_positions:
return
logger.info("收到 {} ticker价格 {},执行平仓", symbol, current_price)
for p in sym_positions:
amt = float(p.get("positionAmt", 0) or 0)
if amt == 0:
continue
pos_side = p.get("positionSide") or "BOTH"
await place_close_order(
client, symbol, pos_side, amt, current_price, precisions,
reduce_only=(pos_side == "BOTH"),
dry_run=dry_run,
)
symbols_order_placed.add(symbol)
if not dry_run and symbol in redis_contracts:
redis_client = Aioredis.from_url(redis_url, decode_responses=True)
try:
await redis_client.srem(redis_key, symbol)
logger.info("已从 Redis 平仓集合移除: {}", symbol)
except Exception as e:
logger.warning("从 Redis 移除 {} 失败: {}", symbol, e)
finally:
await redis_client.aclose()
# 仅对「建立连接」阶段加超时,避免网络不可达时 async with socket 无限挂起
try:
s = await asyncio.wait_for(socket.__aenter__(), timeout=ws_connection_timeout)
except asyncio.TimeoutError:
logger.error(
"WebSocket 连接超时({}s请检查网络或 Binance 可访问性(如需代理可配置 https_proxy",
ws_connection_timeout,
)
return
except Exception as e:
logger.exception("WebSocket 连接失败: {}", e)
return
try:
logger.info("WS 已连接,监听 {} 个交易对: {}", len(symbols_to_close), sorted(symbols_to_close))
while True:
try:
msg = await asyncio.wait_for(s.recv(), timeout=300)
except asyncio.TimeoutError:
continue
except Exception as e:
logger.exception("WS 接收异常: {}", e)
break
if not msg:
continue
await on_ticker_message(msg)
if symbols_order_placed >= symbols_to_close:
logger.info("所有需平仓交易对已下单,退出监听")
break
finally:
await socket.__aexit__(None, None, None)
async def main_async() -> None:
from binance import AsyncClient as BinanceAsyncClient
from loguru import logger
from redis.asyncio import Redis as Aioredis
from config import settings
api_key = (getattr(settings, "binance_api_key", None) or "").strip()
api_secret = (getattr(settings, "binance_api_secret", None) or "").strip()
if not api_key or not api_secret:
logger.error("请在 .secrets.toml 或环境变量中设置 binance_api_key / binance_api_secret")
return
import os
env_dry = os.environ.get("DRY_RUN", "").strip().lower()
if env_dry in ("0", "false", "no"):
dry_run = False
elif env_dry in ("1", "true", "yes"):
dry_run = True
else:
dry_run = bool(getattr(settings, "dry_run", True)) # 默认 dry-run
if dry_run:
logger.info("【DRY-RUN】仅测试全流程不会真实下单、不会从 Redis 移除")
base_url = getattr(settings, "binance_base_url", None) or "https://fapi.binance.com"
testnet = "testnet" in str(base_url).lower()
redis_url = getattr(settings, "redis_url", None) or "redis://localhost:6379/0"
threshold = float(getattr(settings, "notional_threshold", None) or 50)
redis_key = getattr(settings, "redis_close_key", None) or "close_position:contracts"
ws_connection_timeout = float(getattr(settings, "ws_connection_timeout", None) or 30)
client = await BinanceAsyncClient.create(
api_key=api_key,
api_secret=api_secret
)
try:
positions = await get_positions(client)
by_symbol = group_positions_by_symbol(positions)
try:
redis_client = Aioredis.from_url(redis_url, decode_responses=True)
redis_contracts = set(await redis_client.smembers(redis_key) or [])
await redis_client.aclose()
except Exception as e:
logger.warning("读取 Redis 平仓集合失败,仅按名义价值判断: {}", e)
redis_contracts = set()
symbols_to_close, by_symbol_filtered = get_symbols_to_close(
by_symbol, threshold, redis_contracts
)
for symbol, sym_positions in by_symbol_filtered.items():
long_n, short_n = get_notional_by_side(sym_positions)
profit = get_unrealized_profit(sym_positions)
logger.info(
"需平仓: symbol={} long_notional={:.2f} short_notional={:.2f} unrealized_profit={:.2f} in_redis={}",
symbol,
long_n,
short_n,
profit,
symbol in redis_contracts,
)
if not symbols_to_close:
logger.info("当前无需要平仓的交易对")
return
precisions = await get_symbol_precision(client)
await run_ws_listener(
client=client,
symbols_to_close=symbols_to_close,
by_symbol=by_symbol_filtered,
precisions=precisions,
redis_key=redis_key,
redis_url=redis_url,
redis_contracts=redis_contracts,
dry_run=dry_run,
ws_connection_timeout=ws_connection_timeout,
)
finally:
await client.close_connection()
def main() -> None:
asyncio.run(main_async())
if __name__ == "__main__":
main()