Files
bn-pc/position_closer.py
2026-02-05 11:32:10 +08:00

546 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
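Scheduled position-closing script for Binance perpetual futures.

Close triggers (any one of them is sufficient):
- long + short notional value <= 20 USDT and unrealized PnL > 0.05
- unrealized profit > 10 USDT
- unrealized loss > 3 USDT (i.e. unrealized PnL < -3)

Flow:
1. Fetch the current positions and work out which symbols need closing.
2. Create a BinanceSocketManager and listen to the contracts that need closing;
   when a WS event arrives, create the closing order according to the rule below.

Closing rule: a long is closed with a limit sell at current price * 1.003, a
short is closed with a limit buy at current price * 0.997.

The pure logic lives in top-level functions so it can be unit tested
(from position_closer import round_to_step, ...).
I/O and orchestration live in the async functions further down.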
"""
from __future__ import annotations
import asyncio
from decimal import Decimal
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from binance import AsyncClient as BinanceAsyncClient
# Third-party and I/O dependencies are imported on demand inside the async
# functions below, so unit tests can import the pure logic without triggering binance.
# ---------- Pure logic (no I/O; can be unit tested in isolation) ----------
# Multiplier applied to the current price for the limit SELL that closes a long.
LONG_CLOSE_PRICE_RATIO = Decimal("1.003")
# Multiplier applied to the current price for the limit BUY that closes a short.
SHORT_CLOSE_PRICE_RATIO = Decimal("0.997")
def round_to_step(value: Decimal, step: str) -> str:
    """Round ``value`` towards zero to a whole multiple of ``step``.

    The result is always rendered in plain fixed-point notation. The original
    author notes that exponent notation (such as ``5E+1``) makes the Binance
    API reject the request with a signature error, so no code path may emit it.

    Args:
        value: Quantity to round.
        step: Step size as a decimal string (e.g. an exchange ``stepSize``).

    Returns:
        The rounded value as a string without trailing zeros, or ``"0"`` when
        nothing is left after rounding. When ``step`` is zero or negative the
        value is passed through unrounded, still in fixed-point notation.
    """
    step_d = Decimal(step)
    if step_d <= 0:
        # No usable step: keep the value as-is, but never as "5E+1".
        # (str(Decimal) would switch to exponent notation here.)
        return format(value, "f")
    whole_steps = (value / step_d).quantize(Decimal("1"), rounding="ROUND_DOWN")
    result = (whole_steps * step_d).normalize()
    # normalize() may yield an exponent form (50 -> 5E+1); the fixed-point
    # format spec expands it again before trailing zeros are trimmed.
    text = f"{result:.10f}".rstrip("0").rstrip(".")
    return text or "0"
def round_price_to_tick(price: Decimal, tick_size: str) -> str:
    """Round ``price`` towards zero to a whole multiple of the contract's tick size.

    Each contract has its own price filter (``tickSize``); per the original
    author's note, an order whose price is not a multiple of it is rejected.
    The result is always plain fixed-point text so that exponent notation
    (such as ``1E+2``) never reaches the API.

    Args:
        price: Raw price to round.
        tick_size: Minimum price increment as a decimal string.

    Returns:
        The rounded price as a string without trailing zeros, or ``"0"`` when
        nothing is left after rounding. When ``tick_size`` is zero or negative
        the price is passed through unrounded, still in fixed-point notation.
    """
    tick = Decimal(tick_size)
    if tick <= 0:
        # No usable tick: keep the price as-is, but never in exponent notation.
        # (str(Decimal) would return e.g. "1E+2" here.)
        return format(price, "f")
    whole_ticks = (price / tick).quantize(Decimal("1"), rounding="ROUND_DOWN")
    rounded = whole_ticks * tick
    # The fixed-point format spec avoids exponent notation before trimming zeros.
    text = f"{rounded:.10f}".rstrip("0").rstrip(".")
    return text or "0"
def group_positions_by_symbol(positions: list[dict]) -> dict[str, list[dict]]:
    """Bucket position records by their ``symbol`` value.

    Records whose symbol is missing or empty are dropped. Symbols appear in
    first-seen order and each bucket keeps the input order of its records.
    """
    grouped: dict[str, list[dict]] = {}
    for position in positions:
        symbol = position.get("symbol", "")
        if not symbol:
            continue
        if symbol not in grouped:
            grouped[symbol] = []
        grouped[symbol].append(position)
    return grouped
def filter_nonzero_positions(positions: list[dict]) -> list[dict]:
    """Return only the records whose ``positionAmt`` is non-zero.

    A missing, ``None`` or empty ``positionAmt`` counts as zero.
    """
    kept: list[dict] = []
    for position in positions:
        amount = float(position.get("positionAmt", 0) or 0)
        if amount != 0:
            kept.append(position)
    return kept
def get_notional_by_side(positions: list[dict]) -> tuple[float, float]:
    """Sum the absolute notional value of one contract's positions per side.

    A record counts as long when its ``positionSide`` is ``LONG``, or when it
    is ``BOTH`` (also the default for a missing side) and ``positionAmt`` is
    positive. Every other record counts as short.

    Returns:
        ``(long_notional, short_notional)``.
    """
    long_total = 0.0
    short_total = 0.0
    for position in positions:
        size = float(position.get("positionAmt", 0) or 0)
        value = abs(float(position.get("notional", 0) or 0))
        mode = (position.get("positionSide") or "BOTH").upper()
        counts_as_long = mode == "LONG" or (mode == "BOTH" and size > 0)
        if counts_as_long:
            long_total += value
        else:
            short_total += value
    return long_total, short_total
def get_unrealized_profit(positions: list[dict]) -> float:
    """Add up the ``unrealizedProfit`` field of every record of one contract.

    A missing, ``None`` or empty field contributes zero.
    """
    running = 0.0
    # Plain left-to-right accumulation, one record at a time.
    for raw in (position.get("unrealizedProfit", 0) for position in positions):
        running += float(raw or 0)
    return running
def should_close_symbol(
    long_notional: float,
    short_notional: float,
    unrealized_profit: float,
    notional_threshold: float = 20,
    min_profit: float = 0.05,
    min_profit_large: float = 10,
    max_loss: float = 3,
) -> bool:
    """Decide whether a contract's positions should be closed.

    Closing is triggered when any one of these holds:
    - the unrealized loss exceeds ``max_loss`` (``unrealized_profit < -max_loss``);
    - the unrealized profit exceeds ``min_profit_large``;
    - long + short notional is at most ``notional_threshold`` and the
      unrealized profit exceeds ``min_profit``.
    """
    stop_loss_hit = unrealized_profit < -max_loss
    take_profit_hit = unrealized_profit > min_profit_large
    if stop_loss_hit or take_profit_hit:
        return True
    is_small_position = long_notional + short_notional <= notional_threshold
    return is_small_position and unrealized_profit > min_profit
def get_symbols_to_close(
    by_symbol: dict[str, list[dict]],
    notional_threshold: float = 20,
    min_profit: float = 0.05,
    min_profit_large: float = 10,
    max_loss: float = 3,
) -> tuple[set[str], dict[str, list[dict]]]:
    """Select the symbols whose positions meet the closing rule.

    Each symbol's per-side notional and total unrealized PnL are fed to
    ``should_close_symbol`` together with the given thresholds.

    Returns:
        ``(symbols_to_close, by_symbol_filtered)`` where the second item is
        ``by_symbol`` restricted to the selected symbols.
    """
    thresholds = {
        "notional_threshold": notional_threshold,
        "min_profit": min_profit,
        "min_profit_large": min_profit_large,
        "max_loss": max_loss,
    }
    symbols_to_close: set[str] = {
        symbol
        for symbol, held in by_symbol.items()
        if should_close_symbol(
            *get_notional_by_side(held),
            get_unrealized_profit(held),
            **thresholds,
        )
    }
    by_symbol_filtered = {
        symbol: held
        for symbol, held in by_symbol.items()
        if symbol in symbols_to_close
    }
    return symbols_to_close, by_symbol_filtered
def parse_ticker_message(msg: dict) -> tuple[str | None, Decimal | None]:
    """Extract the symbol and last price from a combined-stream ticker message.

    The symbol is the upper-cased part of ``msg["stream"]`` before the first
    ``@``; the price is read from ``msg["data"]["c"]``.

    Args:
        msg: Decoded WS message; expected to carry ``stream`` and ``data`` keys.

    Returns:
        ``(symbol, price)`` on success, ``(None, None)`` when no symbol can be
        derived, and ``(symbol, None)`` when the payload is missing or not a
        mapping, or when the price is missing, unparsable, non-finite, or not
        strictly positive.
    """
    stream = msg.get("stream", "")
    symbol = stream.upper().split("@")[0] if stream else None
    if not symbol:
        return None, None
    data = msg.get("data")
    if not isinstance(data, dict):
        # "data": null (or any non-mapping) must not crash the listener.
        return symbol, None
    price_str = data.get("c")
    if not price_str:
        return symbol, None
    try:
        price = Decimal(str(price_str))
    except (ArithmeticError, ValueError, TypeError):
        # decimal.InvalidOperation is an ArithmeticError subclass.
        return symbol, None
    # "NaN"/"Infinity" parse successfully but would blow up later when the
    # price is rounded; a zero or negative price can never be a valid quote.
    if not price.is_finite() or price <= 0:
        return symbol, None
    return symbol, price
def parse_exchange_info_to_precisions(info: dict) -> dict[str, dict[str, str]]:
    """Build a per-symbol precision table from a futures ``exchangeInfo`` payload.

    For each entry of ``info["symbols"]`` with a non-empty symbol, the result
    holds ``lot_size`` (the ``stepSize`` of its ``LOT_SIZE`` filter) and
    ``price_filter`` (the ``tickSize`` of its ``PRICE_FILTER`` filter). Either
    value falls back to ``"0.01"`` when the filter or its field is absent.
    """
    precisions = {}
    for entry in info.get("symbols", []):
        name = entry.get("symbol", "")
        if name:
            sizes = {"lot_size": "0.01", "price_filter": "0.01"}
            for flt in entry.get("filters", []):
                kind = flt.get("filterType")
                if kind == "LOT_SIZE":
                    sizes["lot_size"] = flt.get("stepSize", "0.01")
                elif kind == "PRICE_FILTER":
                    sizes["price_filter"] = flt.get("tickSize", "0.01")
            precisions[name] = sizes
    return precisions
def build_close_order_params(
    symbol: str,
    position_side: str,
    position_amt: float,
    current_price: Decimal,
    precisions: dict[str, dict[str, str]],
    reduce_only: bool = False,
    long_ratio: Decimal = LONG_CLOSE_PRICE_RATIO,
    short_ratio: Decimal = SHORT_CLOSE_PRICE_RATIO,
) -> dict[str, Any] | None:
    """Build the parameter dict of the limit order that closes one position.

    No request is sent; the caller passes the result to the order endpoint.

    Args:
        symbol: Contract symbol; also the key into ``precisions``.
        position_side: ``LONG``, ``SHORT`` or ``BOTH`` (case-insensitive; an
            empty value is treated as ``BOTH``).
        position_amt: Signed position size. For ``BOTH`` the sign decides the
            direction being closed.
        current_price: Latest price the limit price is derived from.
        precisions: Per-symbol ``lot_size`` / ``price_filter`` strings. A
            symbol that is not listed uses ``0.001`` / ``0.01``.
        reduce_only: Force ``reduceOnly`` onto the order. It is always set for
            ``BOTH`` positions.
        long_ratio: Price multiplier for the SELL that closes a long.
        short_ratio: Price multiplier for the BUY that closes a short.

    Returns:
        The order parameters, or ``None`` when the position size is zero or
        rounds down to zero.
    """
    amt = abs(position_amt)
    if amt <= 0:
        return None
    side_upper = (position_side or "BOTH").upper()
    prec = precisions.get(symbol, {"lot_size": "0.001", "price_filter": "0.01"})
    quantity = round_to_step(Decimal(str(amt)), prec["lot_size"])
    if not quantity or Decimal(quantity) <= 0:
        return None

    closing_long = side_upper == "LONG" or (side_upper == "BOTH" and position_amt > 0)
    if closing_long:
        side, ratio = "SELL", long_ratio
    else:
        side, ratio = "BUY", short_ratio
    # The limit price must be a multiple of the contract's tick size.
    price_str = round_price_to_tick(current_price * ratio, prec["price_filter"])

    params: dict[str, Any] = {
        "symbol": symbol,
        "side": side,
        "type": "LIMIT",
        "timeInForce": "GTC",
        "quantity": quantity,
        "price": price_str,
    }
    # Echo the position's own side back to the exchange. Hedge mode requires
    # positionSide=LONG/SHORT (omitting it yields -4061). A BOTH position comes
    # from one-way mode, where sending LONG/SHORT is itself rejected with
    # -4061, so BOTH must stay BOTH instead of being inferred from the sign.
    if side_upper in ("LONG", "SHORT", "BOTH"):
        params["positionSide"] = side_upper
    # NOTE(review): Binance documents that reduceOnly cannot be sent in hedge
    # mode; an explicit reduce_only=True with LONG/SHORT is still honoured here
    # because the caller asked for it - confirm whether that is intended.
    if reduce_only or side_upper == "BOTH":
        params["reduceOnly"] = "true"
    return params
# ---------- I/O 与编排 ----------
def _normalize_api_credentials(api_key: str, api_secret: str) -> tuple[str, str]:
"""去除首尾空白、引号、换行,避免 toml/环境变量带入导致签名错误。"""
key = (api_key or "").strip().strip('"').strip("'").replace("\r", "").replace("\n", "")
secret = (api_secret or "").strip().strip('"').strip("'").replace("\r", "").replace("\n", "")
return key, secret
async def create_futures_client(
    api_key: str,
    api_secret: str,
    testnet: bool = False,
    recv_window: int = 60000,
    verbose: bool = False,
    https_proxy: str | None = None,
):
    """Create an AsyncClient that is initialised through futures endpoints only.

    The client is constructed directly and then primed with ``futures_ping``
    and ``futures_time``. Per the original author's note, this avoids the spot
    ping/time calls made by ``create()``, which fail with "Signature for this
    request is not valid" for API keys that only have futures enabled.

    Args:
        api_key: API key; normalised before use.
        api_secret: API secret; normalised before use.
        testnet: Passed through to the client constructor.
        recv_window: Request time window in milliseconds, stored on the client
            as ``REQUEST_RECVWINDOW``. Increase it when the local clock drifts.
        verbose: Passed through to the client constructor.
        https_proxy: Passed through to the client constructor.

    Returns:
        The ready client, with ``timestamp_offset`` set to server time minus
        local time in milliseconds.

    Raises:
        ValueError: When the key or secret is empty after normalisation.
        Exception: Any error raised during initialisation is re-raised after
            the client's connection has been closed.
    """
    import time

    from binance import AsyncClient as BinanceAsyncClient
    from loguru import logger

    key, secret = _normalize_api_credentials(api_key, api_secret)
    if not (key and secret):
        raise ValueError("api_key / api_secret 不能为空")

    client = BinanceAsyncClient(
        api_key=key,
        api_secret=secret,
        testnet=testnet,
        verbose=verbose,
        https_proxy=https_proxy,
    )
    try:
        await client.futures_ping()
        server_time = await client.futures_time()
        offset_ms = server_time["serverTime"] - int(time.time() * 1000)
        client.timestamp_offset = offset_ms
        client.REQUEST_RECVWINDOW = recv_window
        logger.debug(
            "合约服务器时间差 {} msrecvWindow={} ms",
            client.timestamp_offset,
            recv_window,
        )
    except Exception:
        # Do not leak the HTTP session when initialisation fails.
        await client.close_connection()
        raise
    return client
async def get_symbol_precision(client: BinanceAsyncClient) -> dict[str, dict]:
    """Fetch futures exchange info and return the per-symbol quantity/price precision table."""
    return parse_exchange_info_to_precisions(await client.futures_exchange_info())
async def get_positions(client: BinanceAsyncClient) -> list[dict]:
    """Fetch the futures account and return only positions with a non-zero amount."""
    account = await client.futures_account()
    return filter_nonzero_positions(account.get("positions", []))
async def place_close_order(
    client: BinanceAsyncClient,
    symbol: str,
    position_side: str,
    position_amt: float,
    current_price,
    precisions: dict[str, dict],
    reduce_only: bool = False,
    dry_run: bool = False,
) -> None:
    """Submit the limit order that closes one position.

    The order parameters come from ``build_close_order_params``. With
    ``dry_run=True`` they are only computed and logged; nothing is sent.
    A failed submission is logged and swallowed, so the caller never sees
    an exception from the order call.
    """
    from loguru import logger

    if isinstance(current_price, Decimal):
        price = current_price
    else:
        # Go through str() so the Decimal is built from the printed value.
        price = Decimal(str(current_price))

    params = build_close_order_params(
        symbol=symbol,
        position_side=position_side,
        position_amt=position_amt,
        current_price=price,
        precisions=precisions,
        reduce_only=reduce_only,
    )
    if not params:
        logger.warning("舍入后 quantity 为 0跳过 {} {}", symbol, position_side)
        return

    # Same five fields are reported by the dry-run and the success message.
    summary = (
        symbol,
        params["side"],
        position_side,
        params["price"],
        params["quantity"],
    )
    if dry_run:
        logger.info("【DRY-RUN】将下单: {} {} {} @ {} qty={}", *summary)
        return

    try:
        await client.futures_create_order(**params)
        logger.info("平仓单已提交: {} {} {} @ {} qty={}", *summary)
    except Exception as e:
        logger.exception("平仓下单失败 {}: {}", symbol, e)
async def run_ws_listener(
    client: BinanceAsyncClient,
    symbols_to_close: set[str],
    by_symbol: dict[str, list[dict]],
    precisions: dict[str, dict],
    dry_run: bool = False,
    ws_connection_timeout: float = 30,
) -> None:
    """Listen to ticker streams and place the closing orders once per symbol.

    A multiplexed ``<symbol>@ticker`` socket is opened for every symbol in
    ``symbols_to_close``. On the first ticker with a usable price for a symbol,
    one closing order is placed for each of its non-zero positions and the
    symbol is marked as handled. The function returns when every symbol has
    been handled, when receiving fails, or when the connection cannot be
    established.

    Args:
        client: Connected futures client.
        symbols_to_close: Symbols that need closing.
        by_symbol: Position records per symbol.
        precisions: Per-symbol precision table used to build the orders.
        dry_run: When true, orders are only logged, never sent.
        ws_connection_timeout: Seconds allowed for establishing the WebSocket
            connection, so an unreachable network cannot hang the script.
    """
    from binance.enums import FuturesType
    from binance.ws.streams import BinanceSocketManager
    from loguru import logger

    if not symbols_to_close:
        logger.info("无需平仓的交易对,退出 WS 监听")
        return

    bsm = BinanceSocketManager(client)
    streams = [f"{s.lower()}@ticker" for s in symbols_to_close]
    socket = bsm.futures_multiplex_socket(streams, futures_type=FuturesType.USD_M)
    symbols_order_placed: set[str] = set()

    async def on_ticker_message(msg: dict) -> None:
        symbol, current_price = parse_ticker_message(msg)
        if not symbol or symbol not in symbols_to_close or symbol in symbols_order_placed:
            return
        if current_price is None:
            return
        sym_positions = by_symbol.get(symbol, [])
        if not sym_positions:
            return
        logger.info("收到 {} ticker价格 {},执行平仓", symbol, current_price)
        for p in sym_positions:
            amt = float(p.get("positionAmt", 0) or 0)
            if amt == 0:
                continue
            pos_side = p.get("positionSide") or "BOTH"
            await place_close_order(
                client, symbol, pos_side, amt, current_price, precisions,
                reduce_only=(pos_side == "BOTH"),
                dry_run=dry_run,
            )
        # Marked as handled even if an order failed: each symbol is tried once.
        symbols_order_placed.add(symbol)

    # Only the connection phase gets a timeout, so an unreachable network
    # cannot leave the socket's __aenter__ hanging forever.
    try:
        s = await asyncio.wait_for(socket.__aenter__(), timeout=ws_connection_timeout)
    except asyncio.TimeoutError:
        logger.error(
            "WebSocket 连接超时({}s请检查网络或 Binance 可访问性(如需代理可配置 https_proxy",
            ws_connection_timeout,
        )
        return
    except Exception as e:
        logger.exception("WebSocket 连接失败: {}", e)
        return

    try:
        logger.info("WS 已连接,监听 {} 个交易对: {}", len(symbols_to_close), sorted(symbols_to_close))
        while True:
            try:
                msg = await asyncio.wait_for(s.recv(), timeout=300)
            except asyncio.TimeoutError:
                # A quiet period is not an error; keep waiting.
                continue
            except Exception as e:
                logger.exception("WS 接收异常: {}", e)
                break
            if not msg:
                continue
            if isinstance(msg, dict) and msg.get("e") == "error":
                # python-binance documents socket failures as {"e": "error", ...}
                # payloads. They carry no ticker, so surface them in the log
                # instead of dropping them silently.
                logger.error("WS 返回错误消息: {}", msg)
                continue
            await on_ticker_message(msg)
            if symbols_order_placed >= symbols_to_close:
                logger.info("所有需平仓交易对已下单,退出监听")
                break
    finally:
        await socket.__aexit__(None, None, None)
async def main_async() -> None:
    """Run one closing pass: load settings, pick symbols to close, place orders.

    Steps:
    1. Read credentials and options from ``config.settings`` and the
       environment. ``DRY_RUN`` in the environment overrides the ``dry_run``
       setting, and dry-run is the default.
    2. Create the futures client and fetch the non-zero positions.
    3. Log every held contract, select the ones that meet the closing rule
       and, if there are any, hand them to the WS listener.

    The client connection is always closed on the way out.
    """
    import os

    from loguru import logger
    from config import settings

    def _float_setting(name: str, default: float) -> float:
        """Read a numeric setting, falling back only when it is unset or empty.

        An explicit 0 is a legitimate threshold and must not be replaced by
        the default (which is what ``value or default`` would do).
        """
        raw = getattr(settings, name, None)
        if raw is None or raw == "":
            return float(default)
        return float(raw)

    def _flag_enabled(raw: object) -> bool:
        """Interpret a flag value; the strings 0/false/no/off (and empty) are off."""
        if isinstance(raw, str):
            return raw.strip().lower() not in ("", "0", "false", "no", "off")
        return bool(raw)

    api_key = getattr(settings, "binance_api_key", None) or ""
    api_secret = getattr(settings, "binance_api_secret", None) or ""
    if not (api_key and api_secret):
        logger.error("请在 .secrets.toml 或环境变量中设置 binance_api_key / binance_api_secret")
        return

    # A zero recv window or connection timeout is not meaningful, so these two
    # keep the "falsy -> default" behaviour.
    recv_window = int(getattr(settings, "binance_recv_window", None) or 60000)
    ws_connection_timeout = float(getattr(settings, "ws_connection_timeout", None) or 30)
    # BINANCE_VERBOSE=0 / false must not switch verbose mode on.
    verbose = _flag_enabled(getattr(settings, "binance_verbose", None)) or _flag_enabled(
        os.environ.get("BINANCE_VERBOSE", "")
    )
    https_proxy = (
        getattr(settings, "https_proxy", None)
        or os.environ.get("https_proxy")
        or os.environ.get("HTTPS_PROXY")
    ) or None

    env_dry = os.environ.get("DRY_RUN", "").strip().lower()
    if env_dry in ("0", "false", "no"):
        dry_run = False
    elif env_dry in ("1", "true", "yes"):
        dry_run = True
    else:
        dry_run = bool(getattr(settings, "dry_run", True))  # dry-run by default
    if dry_run:
        logger.info("【DRY-RUN】仅测试全流程不会真实下单")

    base_url = getattr(settings, "binance_base_url", None) or "https://fapi.binance.com"
    testnet = "testnet" in str(base_url).lower()

    notional_threshold = _float_setting("notional_close_threshold", 20)
    min_profit = _float_setting("close_min_profit", 0.05)
    min_profit_large = _float_setting("close_min_profit_large", 10)
    max_loss = _float_setting("close_max_loss", 3)

    # Initialise through the futures endpoints; recv_window widens the time
    # window, which reduces signature errors when the local clock drifts.
    client = await create_futures_client(
        api_key=api_key,
        api_secret=api_secret,
        testnet=testnet,
        recv_window=recv_window,
        verbose=verbose,
        https_proxy=https_proxy,
    )
    try:
        positions = await get_positions(client)
        by_symbol = group_positions_by_symbol(positions)
        logger.info("当前持仓合约数: {}", len(by_symbol))

        # Report every held contract, most profitable first.
        by_symbol_sorted = sorted(
            by_symbol.items(),
            key=lambda x: get_unrealized_profit(x[1]),
            reverse=True,
        )
        for symbol, sym_positions in by_symbol_sorted:
            unrealized = get_unrealized_profit(sym_positions)
            long_n, short_n = get_notional_by_side(sym_positions)
            logger.info(
                "合约 {} 未实现盈亏={:.2f} 多头名义={:.2f} 空头名义={:.2f}",
                symbol, unrealized, long_n, short_n,
            )

        symbols_to_close, by_symbol_filtered = get_symbols_to_close(
            by_symbol,
            notional_threshold=notional_threshold,
            min_profit=min_profit,
            min_profit_large=min_profit_large,
            max_loss=max_loss,
        )
        for symbol, sym_positions in by_symbol_filtered.items():
            long_n, short_n = get_notional_by_side(sym_positions)
            profit = get_unrealized_profit(sym_positions)
            logger.info(
                "需平仓: symbol={} long_notional={:.2f} short_notional={:.2f} unrealized_profit={:.2f}",
                symbol,
                long_n,
                short_n,
                profit,
            )
        if not symbols_to_close:
            logger.info("当前无需要平仓的交易对")
            return

        precisions = await get_symbol_precision(client)
        await run_ws_listener(
            client=client,
            symbols_to_close=symbols_to_close,
            by_symbol=by_symbol_filtered,
            precisions=precisions,
            dry_run=dry_run,
            ws_connection_timeout=ws_connection_timeout,
        )
    finally:
        await client.close_connection()
def main() -> None:
    """Synchronous entry point: run ``main_async`` to completion via ``asyncio.run``."""
    asyncio.run(main_async())


# Run the closer only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()