diff --git a/tradingagents/dataflows/discovery/scanners/sector_rotation.py b/tradingagents/dataflows/discovery/scanners/sector_rotation.py index dee0bd8d..15cf70b1 100644 --- a/tradingagents/dataflows/discovery/scanners/sector_rotation.py +++ b/tradingagents/dataflows/discovery/scanners/sector_rotation.py @@ -86,21 +86,58 @@ class SectorRotationScanner(BaseScanner): sector_names = [SECTOR_ETFS.get(etf, etf) for etf in accelerating_sectors] logger.info(f"Accelerating sectors: {', '.join(sector_names)}") - # Step 2: Find laggard stocks in those sectors + # Step 2: Batch-download 5-day close prices for all candidate tickers at once. + # This replaces the previous serial get_ticker_info() + download_history() loop + # which made up to max_tickers individual HTTP requests and would time out. tickers = _load_tickers_from_file(self.ticker_file) if not tickers: return [] tickers = tickers[: self.max_tickers] - candidates = [] + try: + batch_hist = download_history( + tickers, period="1mo", interval="1d", auto_adjust=True, progress=False + ) + except Exception as e: + logger.warning(f"Batch history download failed: {e}") + return [] + + if batch_hist is None or batch_hist.empty: + return [] + + # Calculate 5-day return for each ticker from the batch data + ticker_returns: Dict[str, float] = {} for ticker in tickers: - result = self._check_sector_laggard(ticker, accelerating_sectors, get_ticker_info) - if result: - candidates.append(result) + try: + if isinstance(batch_hist.columns, pd.MultiIndex): + if ticker not in batch_hist.columns.get_level_values(1): + continue + close = batch_hist.xs(ticker, axis=1, level=1)["Close"].dropna() + else: + close = batch_hist["Close"].dropna() + if len(close) < 6: + continue + ticker_returns[ticker] = (float(close.iloc[-1]) / float(close.iloc[-6]) - 1) * 100 + except Exception: + continue + + # Step 3: Only call get_ticker_info() for laggard tickers (< 2% 5d move). + # This dramatically reduces API calls from max_tickers down to ~20-30%. + candidates = [] + for ticker, ret_5d in ticker_returns.items(): + if ret_5d > 2.0: + continue # Already moved — not a laggard + if len(candidates) >= self.limit: break + result = self._check_sector_laggard(ticker, accelerating_sectors, get_ticker_info) + if result: + # Overwrite ret_5d with the value we already computed + result["stock_5d_return"] = round(ret_5d, 2) + candidates.append(result) + logger.info(f"Sector rotation: {len(candidates)} candidates") return candidates @@ -148,7 +185,7 @@ class SectorRotationScanner(BaseScanner): def _check_sector_laggard( self, ticker: str, accelerating_sectors: List[str], get_info_fn ) -> Optional[Dict[str, Any]]: - """Check if stock is in an accelerating sector but hasn't moved yet.""" + """Check if stock is in an accelerating sector (sector lookup only — no price download).""" try: info = get_info_fn(ticker) if not info: @@ -163,32 +200,8 @@ class SectorRotationScanner(BaseScanner): if not sector_etf or sector_etf not in accelerating_sectors: return None - # Check if stock is lagging its sector - from tradingagents.dataflows.y_finance import download_history - - hist = download_history( - ticker, period="1mo", interval="1d", auto_adjust=True, progress=False - ) - if hist is None or hist.empty or len(hist) < 6: - return None - - # Handle MultiIndex - if isinstance(hist.columns, pd.MultiIndex): - tickers_in_data = hist.columns.get_level_values(1).unique() - target = ticker if ticker in tickers_in_data else tickers_in_data[0] - hist = hist.xs(target, level=1, axis=1) - - close = hist["Close"] if "Close" in hist.columns else hist.iloc[:, 0] - ret_5d = (float(close.iloc[-1]) / float(close.iloc[-6]) - 1) * 100 - - # Stock is a laggard if it moved less than 2% while sector is accelerating - if ret_5d > 2.0: - return None # Already moved, not a laggard - - context = ( - f"Sector rotation: {stock_sector} sector accelerating, " - f"{ticker} lagging at {ret_5d:+.1f}% (5d)" - ) + # 5-day return is filled in by the caller (batch-computed) + context = f"Sector rotation: {stock_sector} sector accelerating, {ticker} lagging" return { "ticker": ticker, @@ -198,7 +211,7 @@ class SectorRotationScanner(BaseScanner): "strategy": self.strategy, "sector": stock_sector, "sector_etf": sector_etf, - "stock_5d_return": round(ret_5d, 2), + "stock_5d_return": 0.0, # overwritten by caller } except Exception as e: diff --git a/tradingagents/graph/discovery_graph.py b/tradingagents/graph/discovery_graph.py index 76a54432..c07831ca 100644 --- a/tradingagents/graph/discovery_graph.py +++ b/tradingagents/graph/discovery_graph.py @@ -452,8 +452,15 @@ class DiscoveryGraph: pipeline_candidates: Dict[str, List[Dict[str, Any]]] = {} + # Global wall-clock limit: all scanners must finish within this budget. + # Using timeout_seconds as per-scanner budget × number of scanners gives a + # reasonable upper bound, capped at 5 minutes so a single slow scanner can + # never block the whole run indefinitely. + global_timeout = min(timeout_seconds * len(enabled_scanners), 300) + logger.info( - f"Running {len(enabled_scanners)} scanners concurrently (max {max_workers} workers)..." + f"Running {len(enabled_scanners)} scanners concurrently " + f"(max {max_workers} workers, global timeout {global_timeout}s)..." ) def run_scanner(scanner_info: tuple) -> tuple: @@ -483,40 +490,46 @@ class DiscoveryGraph: for scanner_info in enabled_scanners } - # Collect results as they complete (no global timeout, handle per-scanner) + # Collect results as they complete. + # The global_timeout passed to as_completed() ensures that if any + # scanner thread blocks indefinitely (e.g. waiting on a hung network + # call), we raise TimeoutError and continue rather than hanging forever. completed_count = 0 - for future in as_completed(future_to_scanner): - scanner_name = future_to_scanner[future] + try: + for future in as_completed(future_to_scanner, timeout=global_timeout): + scanner_name = future_to_scanner[future] - try: - # Get result with per-scanner timeout - name, pipeline, candidates, error, scanner_logs = future.result( - timeout=timeout_seconds - ) + try: + name, pipeline, candidates, error, scanner_logs = future.result() - # Initialize pipeline list if needed - if pipeline not in pipeline_candidates: - pipeline_candidates[pipeline] = [] + # Initialize pipeline list if needed + if pipeline not in pipeline_candidates: + pipeline_candidates[pipeline] = [] - if error: - logger.warning(f"⚠️ {name}: {error}") - else: - pipeline_candidates[pipeline].extend(candidates) - logger.info(f"✓ {name}: {len(candidates)} candidates") + if error: + logger.warning(f"⚠️ {name}: {error}") + else: + pipeline_candidates[pipeline].extend(candidates) + logger.info(f"✓ {name}: {len(candidates)} candidates") - # Thread-safe log merging - if scanner_logs: - with self._tool_logs_lock: - state.setdefault("tool_logs", []).extend(scanner_logs) + # Thread-safe log merging + if scanner_logs: + with self._tool_logs_lock: + state.setdefault("tool_logs", []).extend(scanner_logs) - except TimeoutError: - logger.warning(f"⏱️ {scanner_name}: timeout after {timeout_seconds}s") + except Exception as e: + logger.error(f"⚠️ {scanner_name}: unexpected error - {e}", exc_info=True) - except Exception as e: - logger.error(f"⚠️ {scanner_name}: unexpected error - {e}", exc_info=True) + finally: + completed_count += 1 - finally: - completed_count += 1 + except TimeoutError: + # Identify which scanners did not finish in time + stuck = [name for fut, name in future_to_scanner.items() if not fut.done()] + logger.warning( + f"⏱️ Global scanner timeout ({global_timeout}s) reached. " + f"Timed-out scanners: {stuck}. Continuing with {completed_count} completed." + ) # Log completion stats if completed_count < len(enabled_scanners):