From c792b17ab6726a4fd0702702caffd419476123cd Mon Sep 17 00:00:00 2001 From: Youssef Aitousarrah Date: Fri, 20 Feb 2026 22:35:42 -0800 Subject: [PATCH] fix(discovery): fix three scanner hang/validation bugs found in ranker_debug.log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. executor.shutdown(wait=True) still blocked after global timeout (critical) The previous fix added timeout= to as_completed() but used `with ThreadPoolExecutor() as executor`, whose __exit__ calls shutdown(wait=True). This meant the process still hung waiting for stuck threads (ml_signal) even after the TimeoutError was caught. Fixed by creating the executor explicitly and calling shutdown(wait=False) in a finally block. 2. ml_signal hangs on every run — "Batch-downloading 592 tickers (1y)..." never completes. Root cause: a single yfinance request for 592 tickers × 1 year of daily OHLCV is a very large payload that regularly times out at the network layer. Fixed by: - Reducing default lookback from "1y" to "6mo" (halves download size) - Splitting downloads into 150-ticker chunks so a slow chunk doesn't kill the whole scan (partial results are still returned) 3. C (Citigroup) and other single-letter NYSE tickers rejected as invalid. validate_ticker_format used ^[A-Z]{2,5}$ requiring at least 2 letters. Real tickers like C, A, F, T, X, M are 1 letter. Fixed to ^[A-Z]{1,5}$. Co-Authored-By: Claude Sonnet 4.6 --- .../dataflows/discovery/common_utils.py | 5 +- .../dataflows/discovery/scanners/ml_signal.py | 87 ++++++++++--------- tradingagents/graph/discovery_graph.py | 21 +++-- 3 files changed, 65 insertions(+), 48 deletions(-) diff --git a/tradingagents/dataflows/discovery/common_utils.py b/tradingagents/dataflows/discovery/common_utils.py index bd774b2c..88b995e8 100644 --- a/tradingagents/dataflows/discovery/common_utils.py +++ b/tradingagents/dataflows/discovery/common_utils.py @@ -138,12 +138,13 @@ def validate_ticker_format(ticker: str) -> bool: ticker: Ticker symbol to validate Returns: - True if ticker matches expected format (2-5 uppercase letters) + True if ticker matches expected format (1-5 uppercase letters). + Single-letter tickers (C, A, F, T, X, M, etc.) are valid NYSE symbols. """ if not ticker or not isinstance(ticker, str): return False - return bool(re.match(r"^[A-Z]{2,5}$", ticker.strip().upper())) + return bool(re.match(r"^[A-Z]{1,5}$", ticker.strip().upper())) def validate_candidate_structure(candidate: dict) -> bool: diff --git a/tradingagents/dataflows/discovery/scanners/ml_signal.py b/tradingagents/dataflows/discovery/scanners/ml_signal.py index 5e528969..05f9be1d 100644 --- a/tradingagents/dataflows/discovery/scanners/ml_signal.py +++ b/tradingagents/dataflows/discovery/scanners/ml_signal.py @@ -58,7 +58,9 @@ class MLSignalScanner(BaseScanner): def __init__(self, config: Dict[str, Any]): super().__init__(config) self.min_win_prob = self.scanner_config.get("min_win_prob", 0.50) - self.lookback_period = self.scanner_config.get("lookback_period", "1y") + # "6mo" instead of "1y" — halves the yfinance payload for 500+ tickers, + # which prevents the batch download from hanging on slow connections. + self.lookback_period = self.scanner_config.get("lookback_period", "6mo") self.max_workers = self.scanner_config.get("max_workers", 8) self.fetch_market_cap = self.scanner_config.get("fetch_market_cap", False) @@ -137,54 +139,57 @@ class MLSignalScanner(BaseScanner): return None def _fetch_universe_ohlcv(self) -> Dict[str, pd.DataFrame]: - """Batch-fetch OHLCV data for the entire ticker universe. + """Batch-fetch OHLCV data for the entire ticker universe in chunks. - Uses yfinance batch download — a single HTTP request regardless of - universe size. This is the key optimization for large universes. + Downloads in chunks of 150 tickers so a single slow/failed chunk doesn't + block the whole scanner. This replaces the previous single-request approach + which would hang on large universes (500+ tickers × 1y of data). """ - try: - from tradingagents.dataflows.y_finance import download_history + from tradingagents.dataflows.y_finance import download_history - logger.info( - f"Batch-downloading {len(self.universe)} tickers ({self.lookback_period})..." - ) + chunk_size = 150 + universe = self.universe + result: Dict[str, pd.DataFrame] = {} - # yfinance batch download — single HTTP request for all tickers - raw = download_history( - " ".join(self.universe), - period=self.lookback_period, - auto_adjust=True, - progress=False, - ) + chunks = [universe[i : i + chunk_size] for i in range(0, len(universe), chunk_size)] + logger.info( + f"Batch-downloading {len(universe)} tickers ({self.lookback_period}) " + f"in {len(chunks)} chunks..." + ) - if raw.empty: - return {} + for idx, chunk in enumerate(chunks): + try: + raw = download_history( + " ".join(chunk), + period=self.lookback_period, + auto_adjust=True, + progress=False, + ) - # Handle multi-level columns from batch download - result = {} - if isinstance(raw.columns, pd.MultiIndex): - # Multi-ticker: columns are (Price, Ticker) - tickers_in_data = raw.columns.get_level_values(1).unique() - for ticker in tickers_in_data: - try: - ticker_df = raw.xs(ticker, level=1, axis=1).copy() - ticker_df = ticker_df.reset_index() - if len(ticker_df) > 0: - result[ticker] = ticker_df - except (KeyError, ValueError): - continue - else: - # Single ticker fallback - raw = raw.reset_index() - if len(self.universe) == 1: - result[self.universe[0]] = raw + if raw is None or raw.empty: + continue - logger.info(f"Fetched OHLCV for {len(result)} tickers") - return result + if isinstance(raw.columns, pd.MultiIndex): + tickers_in_data = raw.columns.get_level_values(1).unique() + for ticker in tickers_in_data: + try: + ticker_df = raw.xs(ticker, level=1, axis=1).copy().reset_index() + if len(ticker_df) > 0: + result[ticker] = ticker_df + except (KeyError, ValueError): + continue + else: + # Single-ticker fallback + raw = raw.reset_index() + if chunk: + result[chunk[0]] = raw - except Exception as e: - logger.warning(f"OHLCV batch fetch failed: {e}") - return {} + except Exception as e: + logger.warning(f"Chunk {idx + 1}/{len(chunks)} download failed: {e}") + continue + + logger.info(f"Fetched OHLCV for {len(result)} tickers") + return result def _predict_universe( self, predictor, ohlcv_by_ticker: Dict[str, pd.DataFrame] diff --git a/tradingagents/graph/discovery_graph.py b/tradingagents/graph/discovery_graph.py index c07831ca..aebe3e89 100644 --- a/tradingagents/graph/discovery_graph.py +++ b/tradingagents/graph/discovery_graph.py @@ -483,17 +483,22 @@ class DiscoveryGraph: logger.error(f"Scanner {name} failed: {e}", exc_info=True) return (name, pipeline, [], str(e), []) - # Submit all scanner tasks - with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all scanner tasks. + # NOTE: Do NOT use `with ThreadPoolExecutor() as executor` here — that + # form calls shutdown(wait=True) on exit, which blocks until every thread + # finishes even after as_completed() has already timed out. We call + # shutdown(wait=False) explicitly so stuck threads are abandoned. + executor = ThreadPoolExecutor(max_workers=max_workers) + try: future_to_scanner = { executor.submit(run_scanner, scanner_info): scanner_info[1] for scanner_info in enabled_scanners } # Collect results as they complete. - # The global_timeout passed to as_completed() ensures that if any - # scanner thread blocks indefinitely (e.g. waiting on a hung network - # call), we raise TimeoutError and continue rather than hanging forever. + # global_timeout is the wall-clock budget for ALL scanners together. + # If any thread blocks indefinitely (e.g. a hung yfinance download), + # as_completed() raises TimeoutError so we continue immediately. completed_count = 0 try: for future in as_completed(future_to_scanner, timeout=global_timeout): @@ -535,6 +540,12 @@ class DiscoveryGraph: if completed_count < len(enabled_scanners): logger.warning(f"Only {completed_count}/{len(enabled_scanners)} scanners completed") + finally: + # wait=False: don't block on threads that are still running (e.g. a + # hung ml_signal download). The daemon threads will be cleaned up + # when the process exits. + executor.shutdown(wait=False) + return pipeline_candidates def filter_node(self, state: DiscoveryState) -> Dict[str, Any]: