diff --git a/tradingagents/strategies/registry.py b/tradingagents/strategies/registry.py index 2b469f68..a3c2cd0d 100644 --- a/tradingagents/strategies/registry.py +++ b/tradingagents/strategies/registry.py @@ -5,12 +5,15 @@ from __future__ import annotations import importlib import logging import pkgutil +from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any from .base import BaseStrategy, Role, StrategySignal logger = logging.getLogger(__name__) +_MAX_WORKERS = 4 # cap threads; strategies do network I/O, not CPU work + _registry: list[BaseStrategy] = [] @@ -49,19 +52,32 @@ def reset_registry() -> None: _registry.clear() +def _run_strategy( + strategy: BaseStrategy, ticker: str, date: str, context: dict[str, Any] | None, +) -> StrategySignal | None: + """Execute a single strategy, returning None on failure.""" + try: + return strategy.compute(ticker, date, context) + except Exception: + logger.warning("Strategy %s failed for %s@%s", strategy.name, ticker, date, exc_info=True) + return None + + def compute_signals( ticker: str, date: str, context: dict[str, Any] | None = None ) -> list[StrategySignal]: - """Run every registered strategy and collect non-None signals.""" + """Run every registered strategy in parallel and collect non-None signals.""" _discover() signals: list[StrategySignal] = [] - for strategy in _registry: - try: - sig = strategy.compute(ticker, date, context) + with ThreadPoolExecutor(max_workers=min(_MAX_WORKERS, len(_registry) or 1)) as pool: + futures = { + pool.submit(_run_strategy, s, ticker, date, context): s + for s in _registry + } + for fut in as_completed(futures): + sig = fut.result() if sig is not None: signals.append(sig) - except Exception: - logger.warning("Strategy %s failed for %s@%s", strategy.name, ticker, date, exc_info=True) return signals