perf: parallelize compute_signals with ThreadPoolExecutor
Strategies perform independent network I/O (yfinance calls via _data helpers), so running them concurrently in a thread pool reduces wall-clock time from O(n) to O(n/workers). Addresses code review feedback on sequential strategy computation.
This commit is contained in:
parent
fda4d20ca1
commit
32956522a5
|
|
@ -5,12 +5,15 @@ from __future__ import annotations
|
||||||
import importlib
|
import importlib
|
||||||
import logging
|
import logging
|
||||||
import pkgutil
|
import pkgutil
|
||||||
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from .base import BaseStrategy, Role, StrategySignal
|
from .base import BaseStrategy, Role, StrategySignal
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_MAX_WORKERS = 4 # cap threads; strategies do network I/O, not CPU work
|
||||||
|
|
||||||
_registry: list[BaseStrategy] = []
|
_registry: list[BaseStrategy] = []
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -49,19 +52,32 @@ def reset_registry() -> None:
|
||||||
_registry.clear()
|
_registry.clear()
|
||||||
|
|
||||||
|
|
||||||
|
def _run_strategy(
|
||||||
|
strategy: BaseStrategy, ticker: str, date: str, context: dict[str, Any] | None,
|
||||||
|
) -> StrategySignal | None:
|
||||||
|
"""Execute a single strategy, returning None on failure."""
|
||||||
|
try:
|
||||||
|
return strategy.compute(ticker, date, context)
|
||||||
|
except Exception:
|
||||||
|
logger.warning("Strategy %s failed for %s@%s", strategy.name, ticker, date, exc_info=True)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def compute_signals(
|
def compute_signals(
|
||||||
ticker: str, date: str, context: dict[str, Any] | None = None
|
ticker: str, date: str, context: dict[str, Any] | None = None
|
||||||
) -> list[StrategySignal]:
|
) -> list[StrategySignal]:
|
||||||
"""Run every registered strategy and collect non-None signals."""
|
"""Run every registered strategy in parallel and collect non-None signals."""
|
||||||
_discover()
|
_discover()
|
||||||
signals: list[StrategySignal] = []
|
signals: list[StrategySignal] = []
|
||||||
for strategy in _registry:
|
with ThreadPoolExecutor(max_workers=min(_MAX_WORKERS, len(_registry) or 1)) as pool:
|
||||||
try:
|
futures = {
|
||||||
sig = strategy.compute(ticker, date, context)
|
pool.submit(_run_strategy, s, ticker, date, context): s
|
||||||
|
for s in _registry
|
||||||
|
}
|
||||||
|
for fut in as_completed(futures):
|
||||||
|
sig = fut.result()
|
||||||
if sig is not None:
|
if sig is not None:
|
||||||
signals.append(sig)
|
signals.append(sig)
|
||||||
except Exception:
|
|
||||||
logger.warning("Strategy %s failed for %s@%s", strategy.name, ticker, date, exc_info=True)
|
|
||||||
return signals
|
return signals
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue