Files
bn-pc/position_closer.py
yhydev 48d31cd1d0 移除 Redis、精简配置,新增 Docker 支持
- position_closer: 去掉 Redis 依赖,平仓条件仅名义+未实现盈亏
- requirements: 移除 redis
- settings.toml: 仅保留实际使用的配置项
- 新增 Dockerfile(仅安装依赖)、docker-compose(挂载代码与配置)
- 新增 .dockerignore、.gitignore(含 nohup.log)

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-04 10:06:30 +08:00

461 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
币安永续合约定时平仓脚本。
平仓触发条件:
- 多空名义价值总和 ≤ 20 USDT 且未实现盈亏 > 0.05。
流程:
1. 获取当前持仓,得到需要平仓的交易对;
2. 创建 BinanceSocketManager,监听需平仓合约,收到 WS 事件后按规则创建平仓订单。
平仓规则:多头限价卖出 = 当前价 * 1.003,空头限价买入 = 当前价 * 0.997。
纯逻辑为顶层函数,便于单元测试(from position_closer import round_to_step, ...);
I/O 与编排在下方 async 函数中。
"""
from __future__ import annotations
import asyncio
from decimal import Decimal
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from binance import AsyncClient as BinanceAsyncClient
# 第三方与 I/O 依赖在下方 async 函数内按需导入,便于单测时只导入纯逻辑而不触发 binance
# ---------- 纯逻辑(无 I/O,可单独单元测试)----------
# Multipliers applied to the current price when building a closing limit order:
# a long is closed with a SELL at price * 1.003, a short with a BUY at price * 0.997.
LONG_CLOSE_PRICE_RATIO = Decimal("1.003")
SHORT_CLOSE_PRICE_RATIO = Decimal("0.997")
def round_to_step(value: Decimal, step: str) -> str:
    """Round ``value`` down (toward zero) to a whole multiple of ``step``.

    Args:
        value: Amount to round.
        step: Step size as a decimal string (for example a LOT_SIZE ``stepSize``).

    Returns:
        The rounded amount as a plain fixed-point string with trailing zeros
        removed. When ``step`` is not positive, ``value`` is returned unrounded.
    """
    step_d = Decimal(step)
    if step_d <= 0:
        return format(value, "f")
    whole_steps = (value / step_d).quantize(Decimal("1"), rounding="ROUND_DOWN")
    # normalize() strips trailing zeros but turns e.g. 100 into 1E+2, and str()
    # would emit that exponent form. The "f" format spec forces fixed-point
    # notation so the result is always a plain decimal string.
    return format((whole_steps * step_d).normalize(), "f")
def round_price_to_tick(price: Decimal, tick_size: str) -> str:
    """Snap ``price`` down to a whole multiple of the contract's ``tick_size``.

    Each contract has its own PRICE_FILTER tickSize; an order price that is not
    a multiple of it is rejected, so the price is truncated to the nearest
    lower multiple.

    Args:
        price: Raw price to adjust.
        tick_size: Minimum price increment as a decimal string.

    Returns:
        The adjusted price as a fixed-point string without trailing zeros, or
        ``str(price)`` unchanged when ``tick_size`` is not positive.
    """
    tick_d = Decimal(tick_size)
    if tick_d <= 0:
        return str(price)

    whole_ticks = (price / tick_d).quantize(Decimal("1"), rounding="ROUND_DOWN")
    snapped = whole_ticks * tick_d

    # Fixed-point formatting avoids scientific notation (e.g. 1E+2).
    fixed = format(snapped, ".10f")
    trimmed = fixed.rstrip("0").rstrip(".")
    return trimmed or "0"
def group_positions_by_symbol(positions: list[dict]) -> dict[str, list[dict]]:
    """Bucket position records by their ``symbol`` value.

    Records whose ``symbol`` is missing or empty are dropped. Symbols appear in
    the result in first-seen order, and each bucket keeps input order.
    """
    grouped: dict[str, list[dict]] = {}
    for position in positions:
        symbol = position.get("symbol", "")
        if not symbol:
            continue
        if symbol not in grouped:
            grouped[symbol] = []
        grouped[symbol].append(position)
    return grouped
def filter_nonzero_positions(positions: list[dict]) -> list[dict]:
    """Return only the positions whose ``positionAmt`` is non-zero.

    A missing, ``None`` or empty ``positionAmt`` is treated as zero.
    """
    kept: list[dict] = []
    for position in positions:
        raw_amount = position.get("positionAmt", 0)
        if float(raw_amount or 0) == 0:
            continue
        kept.append(position)
    return kept
def get_notional_by_side(positions: list[dict]) -> tuple[float, float]:
    """Sum the absolute notional value of one symbol's positions per side.

    A position counts as long when its ``positionSide`` is ``LONG``, or when it
    is ``BOTH`` (also the fallback for a missing side) and ``positionAmt`` is
    positive. Every other position counts as short.

    Returns:
        ``(long_notional, short_notional)``.
    """
    totals = {"long": 0.0, "short": 0.0}
    for position in positions:
        size = float(position.get("positionAmt", 0) or 0)
        value = abs(float(position.get("notional", 0) or 0))
        declared_side = (position.get("positionSide") or "BOTH").upper()
        is_long = declared_side == "LONG" or (declared_side == "BOTH" and size > 0)
        bucket = "long" if is_long else "short"
        totals[bucket] += value
    return totals["long"], totals["short"]
def get_unrealized_profit(positions: list[dict]) -> float:
    """Add up the ``unrealizedProfit`` field of every position of one symbol.

    A missing, ``None`` or empty value contributes zero.
    """
    running_total = 0.0
    for position in positions:
        raw_profit = position.get("unrealizedProfit", 0)
        running_total = running_total + float(raw_profit or 0)
    return running_total
def should_close_symbol(
    long_notional: float,
    short_notional: float,
    unrealized_profit: float,
    notional_threshold: float = 20,
    min_profit: float = 0.05,
) -> bool:
    """Decide whether a symbol's positions should be closed.

    A symbol qualifies when the combined long and short notional is at most
    ``notional_threshold`` and the unrealized profit is strictly greater than
    ``min_profit``.
    """
    combined_notional = long_notional + short_notional
    if not (combined_notional <= notional_threshold):
        return False
    return unrealized_profit > min_profit
def get_symbols_to_close(
    by_symbol: dict[str, list[dict]],
    notional_threshold: float = 20,
    min_profit: float = 0.05,
) -> tuple[set[str], dict[str, list[dict]]]:
    """Select the symbols whose positions meet the close condition.

    The condition is the one checked by ``should_close_symbol``: combined
    long and short notional at most ``notional_threshold`` and unrealized
    profit above ``min_profit``.

    Returns:
        ``(symbols_to_close, by_symbol_filtered)``, where the second item maps
        each selected symbol to its positions, in the order of ``by_symbol``.
    """
    selected: dict[str, list[dict]] = {}
    for symbol, sym_positions in by_symbol.items():
        long_n, short_n = get_notional_by_side(sym_positions)
        pnl = get_unrealized_profit(sym_positions)
        qualifies = should_close_symbol(
            long_n,
            short_n,
            pnl,
            notional_threshold=notional_threshold,
            min_profit=min_profit,
        )
        if qualifies:
            selected[symbol] = sym_positions
    return set(selected), selected
def parse_ticker_message(msg: dict) -> tuple[str | None, Decimal | None]:
    """Extract the symbol and last price from a combined-stream ticker message.

    The symbol is the upper-cased part of ``msg["stream"]`` before ``@`` and
    the price is read from ``msg["data"]["c"]``.

    Args:
        msg: Message received from the WebSocket.

    Returns:
        ``(symbol, price)`` on success, ``(None, None)`` when no symbol can be
        determined, and ``(symbol, None)`` when the price is missing,
        unparsable, non-finite, or not positive.
    """
    from decimal import InvalidOperation

    # Anything that is not a stream payload (e.g. a message without "stream")
    # carries no symbol.
    if not isinstance(msg, dict):
        return None, None
    stream = msg.get("stream")
    if not isinstance(stream, str) or not stream:
        return None, None
    symbol = stream.upper().split("@")[0]
    if not symbol:
        return None, None

    # "data" may be absent or null; treat that as "no price" rather than crash.
    data = msg.get("data")
    if not isinstance(data, dict):
        return symbol, None
    raw_price = data.get("c")
    if not raw_price:
        return symbol, None

    try:
        price = Decimal(str(raw_price))
    except (InvalidOperation, ValueError):
        return symbol, None

    # Decimal accepts "NaN" and "Infinity"; neither those nor a non-positive
    # value can serve as a reference price for an order.
    if not price.is_finite() or price <= 0:
        return symbol, None
    return symbol, price
def parse_exchange_info_to_precisions(info: dict) -> dict[str, dict[str, str]]:
    """Build a per-symbol precision table from a futures exchangeInfo payload.

    For each entry of ``info["symbols"]`` with a non-empty ``symbol``, the
    LOT_SIZE ``stepSize`` is stored under ``"lot_size"`` and the PRICE_FILTER
    ``tickSize`` under ``"price_filter"``. Either one falls back to ``"0.01"``
    when the filter or its field is absent.
    """
    precisions = {}
    for entry in info.get("symbols", []):
        name = entry.get("symbol", "")
        if not name:
            continue
        step, tick = "0.01", "0.01"
        for flt in entry.get("filters", []):
            kind = flt.get("filterType")
            if kind == "LOT_SIZE":
                step = flt.get("stepSize", "0.01")
            elif kind == "PRICE_FILTER":
                tick = flt.get("tickSize", "0.01")
        precisions[name] = {"lot_size": step, "price_filter": tick}
    return precisions
def build_close_order_params(
    symbol: str,
    position_side: str,
    position_amt: float,
    current_price: Decimal,
    precisions: dict[str, dict[str, str]],
    reduce_only: bool = False,
    long_ratio: Decimal = LONG_CLOSE_PRICE_RATIO,
    short_ratio: Decimal = SHORT_CLOSE_PRICE_RATIO,
) -> dict[str, Any] | None:
    """Assemble the parameters of a closing limit order without sending it.

    A long (``positionSide`` LONG, or BOTH with a positive amount) is closed
    with a SELL at ``current_price * long_ratio``. Anything else is closed
    with a BUY at ``current_price * short_ratio``. Quantity and price are
    rounded down to the symbol's ``lot_size`` and ``price_filter`` taken from
    ``precisions``, with fallbacks of ``"0.001"`` and ``"0.01"`` for an
    unknown symbol.

    Returns:
        The order parameter dict, or ``None`` when the position amount is zero
        or the quantity rounds down to zero.
    """
    size = abs(position_amt)
    if size <= 0:
        return None

    normalized_side = (position_side or "BOTH").upper()
    symbol_prec = precisions.get(symbol, {"lot_size": "0.001", "price_filter": "0.01"})

    quantity = round_to_step(Decimal(str(size)), symbol_prec["lot_size"])
    if not quantity or Decimal(quantity) <= 0:
        return None

    # The close price must respect the contract's minimum price increment.
    tick_size = symbol_prec["price_filter"]
    closing_long = normalized_side == "LONG" or (
        normalized_side == "BOTH" and position_amt > 0
    )
    if closing_long:
        order_side, ratio = "SELL", long_ratio
    else:
        order_side, ratio = "BUY", short_ratio

    order: dict[str, Any] = {
        "symbol": symbol,
        "side": order_side,
        "type": "LIMIT",
        "timeInForce": "GTC",
        "quantity": quantity,
        "price": round_price_to_tick(current_price * ratio, tick_size),
    }
    # positionSide is only sent for an explicit LONG/SHORT side.
    if normalized_side in ("LONG", "SHORT"):
        order["positionSide"] = normalized_side
    if reduce_only or normalized_side == "BOTH":
        order["reduceOnly"] = "true"
    return order
# ---------- I/O 与编排 ----------
async def get_symbol_precision(client: BinanceAsyncClient) -> dict[str, dict]:
    """Fetch futures exchange info and reduce it to per-symbol quantity/price precisions."""
    return parse_exchange_info_to_precisions(await client.futures_exchange_info())
async def get_positions(client: BinanceAsyncClient) -> list[dict]:
    """Fetch the futures account and return only positions with a non-zero amount."""
    account = await client.futures_account()
    return filter_nonzero_positions(account.get("positions", []))
async def place_close_order(
    client: BinanceAsyncClient,
    symbol: str,
    position_side: str,
    position_amt: float,
    current_price,
    precisions: dict[str, dict],
    reduce_only: bool = False,
    dry_run: bool = False,
) -> None:
    """Submit a closing limit order for one position.

    The order parameters come from ``build_close_order_params``. When no
    parameters can be built the position is skipped with a warning. With
    ``dry_run=True`` the parameters are only logged and nothing is sent.
    A failure while submitting is logged and not re-raised.
    """
    from loguru import logger

    # Accept any price representation, but do the arithmetic in Decimal.
    if isinstance(current_price, Decimal):
        reference_price = current_price
    else:
        reference_price = Decimal(str(current_price))

    order = build_close_order_params(
        symbol=symbol,
        position_side=position_side,
        position_amt=position_amt,
        current_price=reference_price,
        precisions=precisions,
        reduce_only=reduce_only,
    )
    if not order:
        logger.warning("舍入后 quantity 为 0跳过 {} {}", symbol, position_side)
        return

    if dry_run:
        logger.info(
            "【DRY-RUN】将下单: {} {} {} @ {} qty={}",
            symbol,
            order["side"],
            position_side,
            order["price"],
            order["quantity"],
        )
        return

    try:
        await client.futures_create_order(**order)
        logger.info(
            "平仓单已提交: {} {} {} @ {} qty={}",
            symbol,
            order["side"],
            position_side,
            order["price"],
            order["quantity"],
        )
    except Exception as exc:
        logger.exception("平仓下单失败 {}: {}", symbol, exc)
async def run_ws_listener(
    client: BinanceAsyncClient,
    symbols_to_close: set[str],
    by_symbol: dict[str, list[dict]],
    precisions: dict[str, dict],
    dry_run: bool = False,
    ws_connection_timeout: float = 30,
) -> None:
    """
    Create a BinanceSocketManager, subscribe to the ticker stream of every
    symbol that needs closing, and place close orders when a ticker message
    arrives (each symbol is handled only once).

    dry_run=True only logs the orders that would be placed; nothing is sent.
    ws_connection_timeout: timeout in seconds for establishing the WebSocket
    connection; on timeout the function returns instead of hanging forever
    while entering the socket context.
    """
    from binance.enums import FuturesType
    from binance.ws.streams import BinanceSocketManager
    from loguru import logger
    if not symbols_to_close:
        logger.info("无需平仓的交易对,退出 WS 监听")
        return
    bsm = BinanceSocketManager(client)
    # One "<symbol>@ticker" stream per symbol, multiplexed over a single socket.
    streams = [f"{s.lower()}@ticker" for s in symbols_to_close]
    socket = bsm.futures_multiplex_socket(streams, futures_type=FuturesType.USD_M)
    # Symbols that have already been processed; used to act only once per symbol.
    symbols_order_placed: set[str] = set()
    async def on_ticker_message(msg: dict) -> None:
        # Handle one WS message: place close orders for the symbol it refers to.
        symbol, current_price = parse_ticker_message(msg)
        if not symbol or symbol not in symbols_to_close or symbol in symbols_order_placed:
            return
        if current_price is None:
            return
        sym_positions = by_symbol.get(symbol, [])
        if not sym_positions:
            return
        logger.info("收到 {} ticker价格 {},执行平仓", symbol, current_price)
        for p in sym_positions:
            amt = float(p.get("positionAmt", 0) or 0)
            if amt == 0:
                continue
            pos_side = p.get("positionSide") or "BOTH"
            await place_close_order(
                client, symbol, pos_side, amt, current_price, precisions,
                reduce_only=(pos_side == "BOTH"),
                dry_run=dry_run,
            )
        # NOTE(review): place_close_order logs and swallows submission errors,
        # so the symbol is marked as done even when an order failed — confirm
        # that not retrying is intended.
        symbols_order_placed.add(symbol)
    # Apply the timeout only to the connection phase, so an unreachable network
    # does not leave the coroutine hanging while the socket context is entered.
    try:
        s = await asyncio.wait_for(socket.__aenter__(), timeout=ws_connection_timeout)
    except asyncio.TimeoutError:
        logger.error(
            "WebSocket 连接超时({}s请检查网络或 Binance 可访问性(如需代理可配置 https_proxy",
            ws_connection_timeout,
        )
        return
    except Exception as e:
        logger.exception("WebSocket 连接失败: {}", e)
        return
    try:
        logger.info("WS 已连接,监听 {} 个交易对: {}", len(symbols_to_close), sorted(symbols_to_close))
        while True:
            try:
                # A quiet stream is not an error: after 300s without a message, wait again.
                msg = await asyncio.wait_for(s.recv(), timeout=300)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.exception("WS 接收异常: {}", e)
                break
            if not msg:
                continue
            # NOTE(review): messages without a "stream" key are ignored silently by
            # on_ticker_message; if the library reports errors as such messages,
            # they are never logged here — TODO confirm.
            await on_ticker_message(msg)
            # Stop once every requested symbol has been processed.
            if symbols_order_placed >= symbols_to_close:
                logger.info("所有需平仓交易对已下单,退出监听")
                break
    finally:
        # Always leave the socket context that was entered manually above.
        await socket.__aexit__(None, None, None)
def _float_setting(settings: Any, name: str, default: float) -> float:
    """Return setting ``name`` as a float, using ``default`` only when it is unset.

    An explicitly configured ``0`` is honoured; only ``None`` or an empty
    string counts as unset.
    """
    raw = getattr(settings, name, None)
    if raw is None or raw == "":
        return float(default)
    return float(raw)


async def main_async() -> None:
    """Run one pass of the closer: load config, pick symbols, place close orders.

    Steps:
    1. Read the API credentials and options from ``config.settings`` and the
       ``DRY_RUN`` environment variable. Dry-run is the default.
    2. Fetch the open futures positions and log a per-symbol summary.
    3. Select the symbols that meet the close condition. If there are any,
       fetch the symbol precisions and hand over to ``run_ws_listener``.

    The client connection is always closed on exit.
    """
    import os

    from binance import AsyncClient as BinanceAsyncClient
    from loguru import logger
    from config import settings

    api_key = (getattr(settings, "binance_api_key", None) or "").strip()
    api_secret = (getattr(settings, "binance_api_secret", None) or "").strip()
    if not api_key or not api_secret:
        logger.error("请在 .secrets.toml 或环境变量中设置 binance_api_key / binance_api_secret")
        return

    # The DRY_RUN environment variable, when it holds a recognised value,
    # overrides settings.dry_run.
    env_dry = os.environ.get("DRY_RUN", "").strip().lower()
    if env_dry in ("0", "false", "no"):
        dry_run = False
    elif env_dry in ("1", "true", "yes"):
        dry_run = True
    else:
        dry_run = bool(getattr(settings, "dry_run", True))  # dry-run by default
    if dry_run:
        logger.info("【DRY-RUN】仅测试全流程不会真实下单")

    # NOTE(review): settings.binance_base_url used to be read here to derive a
    # "testnet" flag, but neither value was ever passed to the client, so the
    # dead computation was dropped. Testnet selection is not wired up.

    # For these two options 0 is a meaningful value (e.g. "close at any
    # profit"), so the default applies only when the setting is unset.
    notional_threshold = _float_setting(settings, "notional_close_threshold", 20)
    min_profit = _float_setting(settings, "close_min_profit", 0.05)
    # A zero timeout is not usable, so any falsy value keeps the default.
    ws_connection_timeout = float(getattr(settings, "ws_connection_timeout", None) or 30)

    client = await BinanceAsyncClient.create(
        api_key=api_key,
        api_secret=api_secret,
    )
    try:
        positions = await get_positions(client)
        by_symbol = group_positions_by_symbol(positions)
        logger.info("当前持仓合约数: {}", len(by_symbol))

        # Log every symbol, most profitable first.
        by_symbol_sorted = sorted(
            by_symbol.items(),
            key=lambda item: get_unrealized_profit(item[1]),
            reverse=True,
        )
        for symbol, sym_positions in by_symbol_sorted:
            unrealized = get_unrealized_profit(sym_positions)
            long_n, short_n = get_notional_by_side(sym_positions)
            logger.info(
                "合约 {} 未实现盈亏={:.2f} 多头名义={:.2f} 空头名义={:.2f}",
                symbol, unrealized, long_n, short_n,
            )

        symbols_to_close, by_symbol_filtered = get_symbols_to_close(
            by_symbol,
            notional_threshold=notional_threshold,
            min_profit=min_profit,
        )
        for symbol, sym_positions in by_symbol_filtered.items():
            long_n, short_n = get_notional_by_side(sym_positions)
            profit = get_unrealized_profit(sym_positions)
            logger.info(
                "需平仓: symbol={} long_notional={:.2f} short_notional={:.2f} unrealized_profit={:.2f}",
                symbol,
                long_n,
                short_n,
                profit,
            )
        if not symbols_to_close:
            logger.info("当前无需要平仓的交易对")
            return

        precisions = await get_symbol_precision(client)
        await run_ws_listener(
            client=client,
            symbols_to_close=symbols_to_close,
            by_symbol=by_symbol_filtered,
            precisions=precisions,
            dry_run=dry_run,
            ws_connection_timeout=ws_connection_timeout,
        )
    finally:
        await client.close_connection()
def main() -> None:
    """Synchronous entry point: run ``main_async`` to completion via ``asyncio.run``."""
    entry_coroutine = main_async()
    asyncio.run(entry_coroutine)
# Run the closer only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()