diff --git a/tradingagents/dataflows/discovery/common_utils.py b/tradingagents/dataflows/discovery/common_utils.py index bd774b2c..88b995e8 100644 --- a/tradingagents/dataflows/discovery/common_utils.py +++ b/tradingagents/dataflows/discovery/common_utils.py @@ -138,12 +138,13 @@ def validate_ticker_format(ticker: str) -> bool: ticker: Ticker symbol to validate Returns: - True if ticker matches expected format (2-5 uppercase letters) + True if ticker matches expected format (1-5 uppercase letters). + Single-letter tickers (C, A, F, T, X, M, etc.) are valid NYSE symbols. """ if not ticker or not isinstance(ticker, str): return False - return bool(re.match(r"^[A-Z]{2,5}$", ticker.strip().upper())) + return bool(re.match(r"^[A-Z]{1,5}$", ticker.strip().upper())) def validate_candidate_structure(candidate: dict) -> bool: diff --git a/tradingagents/dataflows/discovery/scanners/ml_signal.py b/tradingagents/dataflows/discovery/scanners/ml_signal.py index 5e528969..05f9be1d 100644 --- a/tradingagents/dataflows/discovery/scanners/ml_signal.py +++ b/tradingagents/dataflows/discovery/scanners/ml_signal.py @@ -58,7 +58,9 @@ class MLSignalScanner(BaseScanner): def __init__(self, config: Dict[str, Any]): super().__init__(config) self.min_win_prob = self.scanner_config.get("min_win_prob", 0.50) - self.lookback_period = self.scanner_config.get("lookback_period", "1y") + # "6mo" instead of "1y" — halves the yfinance payload for 500+ tickers, + # which prevents the batch download from hanging on slow connections. + self.lookback_period = self.scanner_config.get("lookback_period", "6mo") self.max_workers = self.scanner_config.get("max_workers", 8) self.fetch_market_cap = self.scanner_config.get("fetch_market_cap", False) @@ -137,54 +139,57 @@ class MLSignalScanner(BaseScanner): return None def _fetch_universe_ohlcv(self) -> Dict[str, pd.DataFrame]: - """Batch-fetch OHLCV data for the entire ticker universe. + """Batch-fetch OHLCV data for the entire ticker universe in chunks. 
- Uses yfinance batch download — a single HTTP request regardless of - universe size. This is the key optimization for large universes. + Downloads in chunks of 150 tickers so a single slow/failed chunk doesn't + block the whole scanner. This replaces the previous single-request approach + which would hang on large universes (500+ tickers × 1y of data). """ - try: - from tradingagents.dataflows.y_finance import download_history + from tradingagents.dataflows.y_finance import download_history - logger.info( - f"Batch-downloading {len(self.universe)} tickers ({self.lookback_period})..." - ) + chunk_size = 150 + universe = self.universe + result: Dict[str, pd.DataFrame] = {} - # yfinance batch download — single HTTP request for all tickers - raw = download_history( - " ".join(self.universe), - period=self.lookback_period, - auto_adjust=True, - progress=False, - ) + chunks = [universe[i : i + chunk_size] for i in range(0, len(universe), chunk_size)] + logger.info( + f"Batch-downloading {len(universe)} tickers ({self.lookback_period}) " + f"in {len(chunks)} chunks..." 
+ ) - if raw.empty: - return {} + for idx, chunk in enumerate(chunks): + try: + raw = download_history( + " ".join(chunk), + period=self.lookback_period, + auto_adjust=True, + progress=False, + ) - # Handle multi-level columns from batch download - result = {} - if isinstance(raw.columns, pd.MultiIndex): - # Multi-ticker: columns are (Price, Ticker) - tickers_in_data = raw.columns.get_level_values(1).unique() - for ticker in tickers_in_data: - try: - ticker_df = raw.xs(ticker, level=1, axis=1).copy() - ticker_df = ticker_df.reset_index() - if len(ticker_df) > 0: - result[ticker] = ticker_df - except (KeyError, ValueError): - continue - else: - # Single ticker fallback - raw = raw.reset_index() - if len(self.universe) == 1: - result[self.universe[0]] = raw + if raw is None or raw.empty: + continue - logger.info(f"Fetched OHLCV for {len(result)} tickers") - return result + if isinstance(raw.columns, pd.MultiIndex): + tickers_in_data = raw.columns.get_level_values(1).unique() + for ticker in tickers_in_data: + try: + ticker_df = raw.xs(ticker, level=1, axis=1).copy().reset_index() + if len(ticker_df) > 0: + result[ticker] = ticker_df + except (KeyError, ValueError): + continue + else: + # Single-ticker fallback + raw = raw.reset_index() + if chunk: + result[chunk[0]] = raw - except Exception as e: - logger.warning(f"OHLCV batch fetch failed: {e}") - return {} + except Exception as e: + logger.warning(f"Chunk {idx + 1}/{len(chunks)} download failed: {e}") + continue + + logger.info(f"Fetched OHLCV for {len(result)} tickers") + return result def _predict_universe( self, predictor, ohlcv_by_ticker: Dict[str, pd.DataFrame] diff --git a/tradingagents/graph/discovery_graph.py b/tradingagents/graph/discovery_graph.py index c07831ca..aebe3e89 100644 --- a/tradingagents/graph/discovery_graph.py +++ b/tradingagents/graph/discovery_graph.py @@ -483,17 +483,22 @@ class DiscoveryGraph: logger.error(f"Scanner {name} failed: {e}", exc_info=True) return (name, pipeline, [], str(e), 
[]) - # Submit all scanner tasks - with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all scanner tasks. + # NOTE: Do NOT use `with ThreadPoolExecutor() as executor` here — that + # form calls shutdown(wait=True) on exit, which blocks until every thread + # finishes even after as_completed() has already timed out. We call + # shutdown(wait=False) explicitly so stuck threads are abandoned. + executor = ThreadPoolExecutor(max_workers=max_workers) + try: future_to_scanner = { executor.submit(run_scanner, scanner_info): scanner_info[1] for scanner_info in enabled_scanners } # Collect results as they complete. - # The global_timeout passed to as_completed() ensures that if any - # scanner thread blocks indefinitely (e.g. waiting on a hung network - # call), we raise TimeoutError and continue rather than hanging forever. + # global_timeout is the wall-clock budget for ALL scanners together. + # If any thread blocks indefinitely (e.g. a hung yfinance download), + # as_completed() raises TimeoutError so we continue immediately. completed_count = 0 try: for future in as_completed(future_to_scanner, timeout=global_timeout): @@ -535,6 +540,12 @@ class DiscoveryGraph: if completed_count < len(enabled_scanners): logger.warning(f"Only {completed_count}/{len(enabled_scanners)} scanners completed") + finally: + # wait=False: don't block on threads that are still running (e.g. a + # hung ml_signal download). NOTE: pool workers are joined at interpreter exit (they are + # not daemon threads on Python 3.9+), so a truly hung thread can still delay process shutdown. + executor.shutdown(wait=False) + return pipeline_candidates def filter_node(self, state: DiscoveryState) -> Dict[str, Any]: