From 7a03c29330ec178bdeceb239687e15734291f967 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E5=B0=91=E6=9D=B0?= Date: Thu, 9 Apr 2026 21:44:34 +0800 Subject: [PATCH] feat(orchestrator): implement QuantRunner with BollingerStrategy signal generation --- orchestrator/quant_runner.py | 164 +++++++++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 orchestrator/quant_runner.py diff --git a/orchestrator/quant_runner.py b/orchestrator/quant_runner.py new file mode 100644 index 00000000..87102b05 --- /dev/null +++ b/orchestrator/quant_runner.py @@ -0,0 +1,164 @@ +import json +import logging +import sqlite3 +import sys +from datetime import datetime, timezone, timedelta +from typing import Any + +import yfinance as yf + +from orchestrator.config import OrchestratorConfig +from orchestrator.signals import Signal + +logger = logging.getLogger(__name__) + + +class QuantRunner: + def __init__(self, config: OrchestratorConfig): + if not config.quant_backtest_path: + raise ValueError("OrchestratorConfig.quant_backtest_path must be set") + self._config = config + path = config.quant_backtest_path + if path not in sys.path: + sys.path.insert(0, path) + + def get_signal(self, ticker: str, date: str) -> Signal: + """ + 获取指定股票在指定日期的量化信号。 + date 格式:'YYYY-MM-DD' + 返回 Signal(source="quant") + """ + result = self._load_best_params(ticker) + params: dict = result["params"] + sharpe: float = result["sharpe_ratio"] + + # 获取 date 前 60 天的历史数据 + end_dt = datetime.strptime(date, "%Y-%m-%d") + start_dt = end_dt - timedelta(days=60) + start_str = start_dt.strftime("%Y-%m-%d") + + df = yf.download(ticker, start=start_str, end=date, progress=False, auto_adjust=True) + if df.empty: + logger.warning("No price data for %s between %s and %s", ticker, start_str, date) + return Signal( + ticker=ticker, + direction=0, + confidence=0.0, + source="quant", + timestamp=datetime.now(timezone.utc), + metadata={"reason": "no_data"}, + ) + + # 标准化列名为小写 + df.columns = [c[0].lower() if isinstance(c, tuple) else c.lower() for c in df.columns] + + # 用最佳参数创建 BollingerStrategy 实例 + from strategies.momentum import BollingerStrategy + from core.data_models import Bar, OrderDirection + + strategy = BollingerStrategy( + period=params.get("period", 20), + num_std=params.get("num_std", 2.0), + position_pct=params.get("position_pct", 0.20), + stop_loss_pct=params.get("stop_loss_pct", 0.05), + take_profit_pct=params.get("take_profit_pct", 0.15), + ) + + # 逐 bar 喂给策略,模拟历史回放 + direction = 0 + context: dict[str, Any] = {"positions": {}} + + for ts, row in df.iterrows(): + bar = Bar( + symbol=ticker, + timestamp=ts.to_pydatetime() if hasattr(ts, "to_pydatetime") else ts, + open=float(row["open"]), + high=float(row["high"]), + low=float(row["low"]), + close=float(row["close"]), + volume=float(row.get("volume", 0)), + ) + orders = strategy.on_bar(bar, context) + # 更新模拟持仓 + for order in orders: + if order.direction == OrderDirection.BUY: + context["positions"][ticker] = order.volume + elif order.direction == OrderDirection.SELL: + context["positions"][ticker] = 0 + + # 最后一个 bar 的信号 + last_orders = orders if df.shape[0] > 0 else [] + for order in last_orders: + if order.direction == OrderDirection.BUY: + direction = 1 + break + elif order.direction == OrderDirection.SELL: + direction = -1 + break + + # 计算 max_sharpe(从 DB 中取全局最大值) + db_path = f"{self._config.quant_backtest_path}/research_results/runs.db" + try: + conn = sqlite3.connect(db_path) + cur = conn.cursor() + cur.execute("SELECT MAX(sharpe_ratio) FROM backtest_results") + row = cur.fetchone() + max_sharpe = float(row[0]) if row and row[0] is not None else sharpe + conn.close() + except Exception: + max_sharpe = sharpe + + confidence = self._calc_confidence(sharpe, max_sharpe) + + return Signal( + ticker=ticker, + direction=direction, + confidence=confidence, + source="quant", + timestamp=datetime.now(timezone.utc), + metadata={"params": params, "sharpe_ratio": sharpe, "max_sharpe": max_sharpe}, + ) + + def _load_best_params(self, ticker: str) -> dict: + """ + 直接查 SQLite 获取 BollingerStrategy 最佳参数。 + strategy_type 支持 'BollingerStrategy' 和 'bollinger'(兼容两种写法)。 + """ + db_path = f"{self._config.quant_backtest_path}/research_results/runs.db" + conn = sqlite3.connect(db_path) + try: + cur = conn.cursor() + # 先按规格查 'BollingerStrategy',再 fallback 到 'bollinger' + cur.execute( + """ + SELECT params, sharpe_ratio + FROM backtest_results + WHERE strategy_type IN ('BollingerStrategy', 'bollinger') + ORDER BY sharpe_ratio DESC + LIMIT 1 + """, + ) + row = cur.fetchone() + finally: + conn.close() + + if row is None: + raise ValueError( + "No BollingerStrategy results found in ResultStore. " + "Run optimization first: python quant_backtest/run_research.py" + ) + + params = json.loads(row[0]) if isinstance(row[0], str) else row[0] + return {"params": params, "sharpe_ratio": float(row[1])} + + def _calc_confidence(self, sharpe: float, max_sharpe: float) -> float: + """ + Sharpe 归一化为置信度。 + - max_sharpe=0 时返回 0.5(默认值,避免除零) + - sharpe/max_sharpe 上限截断到 1.0 + - 下限截断到 0.0(负 Sharpe 不产生负置信度) + """ + if max_sharpe == 0: + return 0.5 + ratio = sharpe / max_sharpe + return max(0.0, min(1.0, ratio))