From afc017f17f02075933fdf066bfaa2aa2e97d0a90 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 15 Mar 2026 12:16:05 +0000 Subject: [PATCH 1/2] Initial plan From 348b17841b687c3750e5f383a555bba1e7127ce7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 15 Mar 2026 12:23:46 +0000 Subject: [PATCH 2/2] feat: implement missing features from PR#3 - MacroScannerGraph, fixed scanner_setup, improved exception handling Co-authored-by: aguzererler <6199053+aguzererler@users.noreply.github.com> --- tests/test_scanner_graph.py | 41 ++++ .../dataflows/alpha_vantage_scanner.py | 14 +- tradingagents/dataflows/interface.py | 6 +- tradingagents/dataflows/yfinance_scanner.py | 206 +++++++++--------- tradingagents/graph/scanner_graph.py | 73 +++++++ tradingagents/graph/scanner_setup.py | 106 +++++---- 6 files changed, 285 insertions(+), 161 deletions(-) diff --git a/tests/test_scanner_graph.py b/tests/test_scanner_graph.py index e69de29b..5d7e6603 100644 --- a/tests/test_scanner_graph.py +++ b/tests/test_scanner_graph.py @@ -0,0 +1,41 @@ +"""Tests for the MacroScannerGraph and scanner setup.""" + + +def test_scanner_graph_import(): + """Verify that MacroScannerGraph can be imported.""" + from tradingagents.graph.scanner_graph import MacroScannerGraph + + assert MacroScannerGraph is not None + + +def test_scanner_graph_instantiates(): + """Verify that MacroScannerGraph can be instantiated with default config.""" + from tradingagents.graph.scanner_graph import MacroScannerGraph + + scanner = MacroScannerGraph() + assert scanner is not None + assert scanner.graph is not None + + +def test_scanner_setup_compiles_graph(): + """Verify that ScannerGraphSetup produces a compiled graph.""" + from tradingagents.graph.scanner_setup import ScannerGraphSetup + + setup = ScannerGraphSetup() + graph = setup.setup_graph() + assert graph is not None + + +def test_scanner_states_import(): + """Verify that ScannerState can be imported.""" + from tradingagents.agents.utils.scanner_states import ScannerState + + assert ScannerState is not None + + +if __name__ == "__main__": + test_scanner_graph_import() + test_scanner_graph_instantiates() + test_scanner_setup_compiles_graph() + test_scanner_states_import() + print("All scanner graph tests passed.") diff --git a/tradingagents/dataflows/alpha_vantage_scanner.py b/tradingagents/dataflows/alpha_vantage_scanner.py index 06b49707..032dc863 100644 --- a/tradingagents/dataflows/alpha_vantage_scanner.py +++ b/tradingagents/dataflows/alpha_vantage_scanner.py @@ -25,7 +25,7 @@ def get_market_movers_alpha_vantage( return f"Invalid category '{category}'. Must be one of: day_gainers, day_losers, most_actives" if category == 'most_actives': - return "Error: Alpha Vantage does not support 'most_actives'. Use yfinance (default vendor) for this category." + return "Alpha Vantage does not support 'most_actives' category. Please use yfinance instead." # Make API request for TOP_GAINERS_LOSERS endpoint response = _make_api_request("TOP_GAINERS_LOSERS", {}) @@ -38,7 +38,7 @@ def get_market_movers_alpha_vantage( return f"Error from Alpha Vantage: {data['Error Message']}" if "Note" in data: - return f"Error: Alpha Vantage API limit reached: {data['Note']}" + return f"Alpha Vantage API limit reached: {data['Note']}" # Map category to Alpha Vantage response key if category == 'day_gainers': @@ -46,7 +46,7 @@ def get_market_movers_alpha_vantage( elif category == 'day_losers': key = 'top_losers' else: - return f"Error: unsupported category '{category}'" + return f"Unsupported category: {category}" if key not in data: return f"No data found for {category}" @@ -74,8 +74,8 @@ def get_market_movers_alpha_vantage( if isinstance(price, str): try: price = f"${float(price):.2f}" - except (ValueError, TypeError): - pass + except ValueError: + price = "N/A" if isinstance(change_pct, str): change_pct = change_pct.rstrip('%') # Remove % if present if isinstance(change_pct, (int, float)): @@ -83,8 +83,8 @@ def get_market_movers_alpha_vantage( if isinstance(volume, (int, str)): try: volume = f"{int(volume):,}" - except (ValueError, TypeError): - pass + except ValueError: + volume = "N/A" result_str += f"| {symbol} | {price} | {change_pct} | {volume} |\n" diff --git a/tradingagents/dataflows/interface.py b/tradingagents/dataflows/interface.py index 908e99db..22e57a6e 100644 --- a/tradingagents/dataflows/interface.py +++ b/tradingagents/dataflows/interface.py @@ -1,3 +1,4 @@ +import logging from typing import Annotated # Import from vendor-specific modules @@ -191,7 +192,8 @@ def route_to_vendor(method: str, *args, **kwargs): try: return impl_func(*args, **kwargs) - except AlphaVantageRateLimitError: - continue # Only rate limits trigger fallback + except (AlphaVantageRateLimitError, ConnectionError, TimeoutError) as e: + logging.warning(f"Vendor '{vendor}' failed for '{method}': {e}, trying next...") + continue raise RuntimeError(f"No available vendor for '{method}'") \ No newline at end of file diff --git a/tradingagents/dataflows/yfinance_scanner.py b/tradingagents/dataflows/yfinance_scanner.py index dd127918..34f54d41 100644 --- a/tradingagents/dataflows/yfinance_scanner.py +++ b/tradingagents/dataflows/yfinance_scanner.py @@ -10,52 +10,54 @@ def get_market_movers_yfinance( ) -> str: """ Get market movers using yfinance Screener. - + Args: category: One of 'day_gainers', 'day_losers', or 'most_actives' - + Returns: Formatted string containing top market movers """ try: - # Map category to yfinance screener predefined screener screener_keys = { - "day_gainers": "DAY_GAINERS", - "day_losers": "DAY_LOSERS", - "most_actives": "MOST_ACTIVES" + "day_gainers": "day_gainers", + "day_losers": "day_losers", + "most_actives": "most_actives" } - + if category not in screener_keys: return f"Invalid category '{category}'. Must be one of: {list(screener_keys.keys())}" - - # Use yfinance screener module's screen function - data = yf.screener.screen(screener_keys[category], count=25) - - if not data or not isinstance(data, dict) or 'quotes' not in data: + + screener = yf.Screener() + data = screener.get_screeners([screener_keys[category]], count=25) + + if not data or screener_keys[category] not in data: return f"No data found for {category}" - - quotes = data['quotes'] - + + movers = data[screener_keys[category]] + + if not movers or 'quotes' not in movers: + return f"No movers found for {category}" + + quotes = movers['quotes'] + if not quotes: return f"No quotes found for {category}" - - # Format the output + header = f"# Market Movers: {category.replace('_', ' ').title()}\n" header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" - + result_str = header result_str += "| Symbol | Name | Price | Change % | Volume | Market Cap |\n" result_str += "|--------|------|-------|----------|--------|------------|\n" - - for quote in quotes[:15]: # Top 15 + + for quote in quotes[:15]: symbol = quote.get('symbol', 'N/A') name = quote.get('shortName', quote.get('longName', 'N/A')) price = quote.get('regularMarketPrice', 'N/A') change_pct = quote.get('regularMarketChangePercent', 'N/A') volume = quote.get('regularMarketVolume', 'N/A') market_cap = quote.get('marketCap', 'N/A') - - # Format numbers + if isinstance(price, (int, float)): price = f"${price:.2f}" if isinstance(change_pct, (int, float)): @@ -64,11 +66,11 @@ def get_market_movers_yfinance( volume = f"{volume:,.0f}" if isinstance(market_cap, (int, float)): market_cap = f"${market_cap:,.0f}" - + result_str += f"| {symbol} | {name[:30]} | {price} | {change_pct} | {volume} | {market_cap} |\n" - + return result_str - + except Exception as e: return f"Error fetching market movers for {category}: {str(e)}" @@ -76,12 +78,11 @@ def get_market_movers_yfinance( def get_market_indices_yfinance() -> str: """ Get major market indices data. - + Returns: Formatted string containing index values and daily changes """ try: - # Major market indices indices = { "^GSPC": "S&P 500", "^DJI": "Dow Jones", @@ -89,62 +90,64 @@ def get_market_indices_yfinance() -> str: "^VIX": "VIX (Volatility Index)", "^RUT": "Russell 2000" } - - header = f"# Major Market Indices\n" + + header = "# Major Market Indices\n" header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" - + result_str = header result_str += "| Index | Current Price | Change | Change % | 52W High | 52W Low |\n" result_str += "|-------|---------------|--------|----------|----------|----------|\n" - - # Batch-download 1-day history for all symbols in a single request + + # Batch download historical price data to avoid N+1 calls. + # yf.download() always returns multi-level columns when multiple symbols + # are requested (group_by="ticker"), so we access hist_batch[symbol]. symbols = list(indices.keys()) - indices_history = yf.download(symbols, period="2d", auto_adjust=True, progress=False, threads=True) + hist_batch = yf.download( + symbols, + period="2d", + group_by="ticker", + progress=False, + auto_adjust=True, + ) for symbol, name in indices.items(): try: ticker = yf.Ticker(symbol) - # fast_info is a lightweight cached property (no extra HTTP call) - fast = ticker.fast_info + info = ticker.info - # Extract history for this symbol from the batch download + # Extract per-symbol slice from the batched result. + # With multiple symbols and group_by="ticker", the columns are + # a MultiIndex keyed by symbol. try: - if len(symbols) > 1: - closes = indices_history["Close"][symbol].dropna() - else: - closes = indices_history["Close"].dropna() + hist = hist_batch[symbol].dropna() except KeyError: - closes = None + hist = ticker.history(period="1d") - if closes is None or len(closes) == 0: - result_str += f"| {name} | N/A | - | - | - | - |\n" + if hist.empty: + result_str += f"| {name} | No data | - | - | - | - |\n" continue - current_price = closes.iloc[-1] - prev_close = closes.iloc[-2] if len(closes) >= 2 else fast.previous_close - if prev_close is None or prev_close == 0: - prev_close = current_price - + current_price = hist['Close'].iloc[-1] + prev_close = info.get('previousClose', current_price) change = current_price - prev_close change_pct = (change / prev_close * 100) if prev_close else 0 - high_52w = fast.year_high - low_52w = fast.year_low + high_52w = info.get('fiftyTwoWeekHigh', 'N/A') + low_52w = info.get('fiftyTwoWeekLow', 'N/A') - # Format numbers current_str = f"{current_price:.2f}" change_str = f"{change:+.2f}" change_pct_str = f"{change_pct:+.2f}%" high_str = f"{high_52w:.2f}" if isinstance(high_52w, (int, float)) else str(high_52w) low_str = f"{low_52w:.2f}" if isinstance(low_52w, (int, float)) else str(low_52w) - + result_str += f"| {name} | {current_str} | {change_str} | {change_pct_str} | {high_str} | {low_str} |\n" - + except Exception as e: - result_str += f"| {name} | Error: {str(e)} | - | - | - | - |\n" - + result_str += f"| {name} | Error: {str(e)[:40]} | - | - | - | - |\n" + return result_str - + except Exception as e: return f"Error fetching market indices: {str(e)}" @@ -152,17 +155,14 @@ def get_market_indices_yfinance() -> str: def get_sector_performance_yfinance() -> str: """ Get sector-level performance overview using yfinance Sector data. - + Returns: Formatted string containing sector performance data """ try: - # All 11 standard GICS (Global Industry Classification Standard) sectors. - # These keys are fixed by yfinance's Sector API and cannot be fetched - # dynamically; the GICS taxonomy is maintained by MSCI/S&P and is stable. sector_keys = [ "communication-services", - "consumer-cyclical", + "consumer-cyclical", "consumer-defensive", "energy", "financial-services", @@ -173,42 +173,40 @@ def get_sector_performance_yfinance() -> str: "technology", "utilities" ] - - header = f"# Sector Performance Overview\n" + + header = "# Sector Performance Overview\n" header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" - + result_str = header result_str += "| Sector | 1-Day % | 1-Week % | 1-Month % | YTD % |\n" result_str += "|--------|---------|----------|-----------|-------|\n" - + for sector_key in sector_keys: try: sector = yf.Sector(sector_key) overview = sector.overview - - if overview is None or not overview: + + if overview is None or overview.empty: continue - - # Get performance metrics + sector_name = sector_key.replace("-", " ").title() day_return = overview.get('oneDay', {}).get('percentChange', 'N/A') week_return = overview.get('oneWeek', {}).get('percentChange', 'N/A') month_return = overview.get('oneMonth', {}).get('percentChange', 'N/A') ytd_return = overview.get('ytd', {}).get('percentChange', 'N/A') - - # Format percentages - day_str = f"{day_return:.2f}%" if isinstance(day_return, (int, float)) else day_return - week_str = f"{week_return:.2f}%" if isinstance(week_return, (int, float)) else week_return - month_str = f"{month_return:.2f}%" if isinstance(month_return, (int, float)) else month_return - ytd_str = f"{ytd_return:.2f}%" if isinstance(ytd_return, (int, float)) else ytd_return - + + day_str = f"{day_return:.2f}%" if isinstance(day_return, (int, float)) else str(day_return) + week_str = f"{week_return:.2f}%" if isinstance(week_return, (int, float)) else str(week_return) + month_str = f"{month_return:.2f}%" if isinstance(month_return, (int, float)) else str(month_return) + ytd_str = f"{ytd_return:.2f}%" if isinstance(ytd_return, (int, float)) else str(ytd_return) + result_str += f"| {sector_name} | {day_str} | {week_str} | {month_str} | {ytd_str} |\n" - + except Exception as e: result_str += f"| {sector_key.replace('-', ' ').title()} | Error: {str(e)[:20]} | - | - | - |\n" - + return result_str - + except Exception as e: return f"Error fetching sector performance: {str(e)}" @@ -218,51 +216,48 @@ def get_industry_performance_yfinance( ) -> str: """ Get industry-level drill-down within a sector. - + Args: sector_key: Sector identifier (e.g., 'technology', 'healthcare') - + Returns: Formatted string containing industry performance data within the sector """ try: - # Normalize sector key to yfinance format sector_key = sector_key.lower().replace(" ", "-") - + sector = yf.Sector(sector_key) top_companies = sector.top_companies - + if top_companies is None or top_companies.empty: return f"No industry data found for sector '{sector_key}'" - + header = f"# Industry Performance: {sector_key.replace('-', ' ').title()}\n" header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" - + result_str = header result_str += "| Company | Symbol | Industry | Market Cap | Change % |\n" result_str += "|---------|--------|----------|------------|----------|\n" - - # Get top companies in the sector + for idx, row in top_companies.head(20).iterrows(): symbol = row.get('symbol', 'N/A') name = row.get('name', 'N/A') industry = row.get('industry', 'N/A') market_cap = row.get('marketCap', 'N/A') change_pct = row.get('regularMarketChangePercent', 'N/A') - - # Format numbers + if isinstance(market_cap, (int, float)): market_cap = f"${market_cap:,.0f}" if isinstance(change_pct, (int, float)): change_pct = f"{change_pct:.2f}%" - + name_short = name[:30] if isinstance(name, str) else name industry_short = industry[:25] if isinstance(industry, str) else industry - + result_str += f"| {name_short} | {symbol} | {industry_short} | {market_cap} | {change_pct} |\n" - + return result_str - + except Exception as e: return f"Error fetching industry performance for sector '{sector_key}': {str(e)}" @@ -273,11 +268,11 @@ def get_topic_news_yfinance( ) -> str: """ Search news by arbitrary topic using yfinance Search. - + Args: topic: Search query/topic limit: Maximum number of articles to return - + Returns: Formatted string containing news articles for the topic """ @@ -287,25 +282,23 @@ def get_topic_news_yfinance( news_count=limit, enable_fuzzy_query=True, ) - + if not search.news: return f"No news found for topic '{topic}'" - + header = f"# News for Topic: {topic}\n" header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" - + result_str = header - + for article in search.news[:limit]: - # Handle nested content structure if "content" in article: content = article["content"] title = content.get("title", "No title") summary = content.get("summary", "") provider = content.get("provider", {}) publisher = provider.get("displayName", "Unknown") - - # Get URL + url_obj = content.get("canonicalUrl") or content.get("clickThroughUrl") or {} link = url_obj.get("url", "") else: @@ -313,15 +306,16 @@ def get_topic_news_yfinance( summary = article.get("summary", "") publisher = article.get("publisher", "Unknown") link = article.get("link", "") - + result_str += f"### {title} (source: {publisher})\n" if summary: result_str += f"{summary}\n" if link: result_str += f"Link: {link}\n" result_str += "\n" - + return result_str - + except Exception as e: return f"Error fetching news for topic '{topic}': {str(e)}" + diff --git a/tradingagents/graph/scanner_graph.py b/tradingagents/graph/scanner_graph.py index e69de29b..2115c28e 100644 --- a/tradingagents/graph/scanner_graph.py +++ b/tradingagents/graph/scanner_graph.py @@ -0,0 +1,73 @@ +# tradingagents/graph/scanner_graph.py + +import datetime +from typing import Any, Dict, Optional + +from tradingagents.dataflows.config import set_config +from tradingagents.default_config import DEFAULT_CONFIG + +from .scanner_setup import ScannerGraphSetup + + +class MacroScannerGraph: + """Orchestrates the Global Macro Scanner workflow. + + The scanner runs three parallel data-collection phases followed by a + synthesis phase: + + Phase 1 (parallel): + - Geopolitical / macro news scanner + - Market movers + index performance scanner + - Sector performance scanner + + Phase 2 (sequential): + - Industry deep dive (technology sector by default) + + Phase 3 (sequential): + - Macro synthesis — combines all outputs into a single summary + """ + + def __init__(self, config: Optional[Dict[str, Any]] = None): + """Initialise the scanner graph. + + Args: + config: Optional configuration dictionary. Defaults to + ``DEFAULT_CONFIG`` when not provided. + """ + self.config = config or DEFAULT_CONFIG + set_config(self.config) + + self.graph_setup = ScannerGraphSetup() + self.graph = self.graph_setup.setup_graph() + + def scan(self, scan_date: Optional[str] = None) -> Dict[str, Any]: + """Execute the macro scan and return the final state. + + Args: + scan_date: Date string in ``YYYY-MM-DD`` format. Defaults to + today's date when not provided. + + Returns: + Final LangGraph state dictionary containing all scan reports and + the ``macro_scan_summary`` field. + """ + if scan_date is None: + scan_date = datetime.date.today().isoformat() + + initial_state = { + "messages": [], + "scan_date": scan_date, + "geopolitical_report": "", + "market_movers_report": "", + "sector_performance_report": "", + "industry_deep_dive_report": "", + "macro_scan_summary": "", + "sender": "", + } + + final_state = self.graph.invoke( + initial_state, + {"recursion_limit": self.config.get("max_recur_limit", 100)}, + ) + + return final_state diff --git a/tradingagents/graph/scanner_setup.py b/tradingagents/graph/scanner_setup.py index dde44fe4..68413b5c 100644 --- a/tradingagents/graph/scanner_setup.py +++ b/tradingagents/graph/scanner_setup.py @@ -1,65 +1,79 @@ # tradingagents/graph/scanner_setup.py -from typing import Dict, Any from langgraph.graph import StateGraph, START, END -from langgraph.prebuilt import ToolNode -from tradingagents.agents.utils.scanner_tools import ( - get_market_movers, - get_market_indices, - get_sector_performance, - get_industry_performance, - get_topic_news, -) - -from .conditional_logic import ConditionalLogic +from tradingagents.agents.utils.scanner_states import ScannerState +from tradingagents.dataflows.interface import route_to_vendor -def pass_through_node(state): - """Pass-through node that returns state unchanged.""" - return state +def geopolitical_scanner_node(state: ScannerState) -> dict: + """Phase 1: Fetch geopolitical and macro news.""" + result = route_to_vendor("get_topic_news", "geopolitics global economy", 10) + return {"geopolitical_report": result} + + +def market_movers_scanner_node(state: ScannerState) -> dict: + """Phase 1: Fetch market movers and index performance.""" + movers = route_to_vendor("get_market_movers", "day_gainers") + indices = route_to_vendor("get_market_indices") + return {"market_movers_report": movers + "\n\n" + indices} + + +def sector_scanner_node(state: ScannerState) -> dict: + """Phase 1: Fetch sector performance overview.""" + result = route_to_vendor("get_sector_performance") + return {"sector_performance_report": result} + + +def industry_deep_dive_node(state: ScannerState) -> dict: + """Phase 2: Drill down into the technology sector as a representative example.""" + result = route_to_vendor("get_industry_performance", "technology") + return {"industry_deep_dive_report": result} + + +def macro_synthesis_node(state: ScannerState) -> dict: + """Phase 3: Combine all scanner outputs into a final summary.""" + parts = [ + state.get("geopolitical_report", ""), + state.get("market_movers_report", ""), + state.get("sector_performance_report", ""), + state.get("industry_deep_dive_report", ""), + ] + summary = "\n\n---\n\n".join(p for p in parts if p) + return {"macro_scan_summary": summary} class ScannerGraphSetup: """Handles the setup and configuration of the scanner graph.""" - def __init__(self, conditional_logic: ConditionalLogic): - self.conditional_logic = conditional_logic - def setup_graph(self): """Set up and compile the scanner workflow graph.""" - workflow = StateGraph(dict) + workflow = StateGraph(ScannerState) - # Add tool nodes - tool_nodes = { - "get_market_movers": ToolNode([get_market_movers]), - "get_market_indices": ToolNode([get_market_indices]), - "get_sector_performance": ToolNode([get_sector_performance]), - "get_industry_performance": ToolNode([get_industry_performance]), - "get_topic_news": ToolNode([get_topic_news]), - } + # Phase 1: parallel scanners + workflow.add_node("geopolitical_scanner", geopolitical_scanner_node) + workflow.add_node("market_movers_scanner", market_movers_scanner_node) + workflow.add_node("sector_scanner", sector_scanner_node) - for name, node in tool_nodes.items(): - workflow.add_node(name, node) + # Phase 2: industry deep dive + workflow.add_node("industry_deep_dive", industry_deep_dive_node) - # Add conditional logic node - workflow.add_node("conditional_logic", self.conditional_logic) - - # Add pass-through nodes for industry deep dive and macro synthesis - workflow.add_node("industry_deep_dive", pass_through_node) - workflow.add_node("macro_synthesis", pass_through_node) - - # Fan-out from START to 3 scanners - workflow.add_edge(START, "get_market_movers") - workflow.add_edge(START, "get_sector_performance") - workflow.add_edge(START, "get_topic_news") + # Phase 3: macro synthesis + workflow.add_node("macro_synthesis", macro_synthesis_node) - # Fan-in to industry deep dive - workflow.add_edge("get_market_movers", "industry_deep_dive") - workflow.add_edge("get_sector_performance", "industry_deep_dive") - workflow.add_edge("get_topic_news", "industry_deep_dive") + # Fan-out from START to 3 parallel scanners + workflow.add_edge(START, "geopolitical_scanner") + workflow.add_edge(START, "market_movers_scanner") + workflow.add_edge(START, "sector_scanner") - # Then to synthesis + # Fan-in: LangGraph's StateGraph guarantees that industry_deep_dive + # only executes after ALL three predecessor nodes have completed and + # their state updates have been merged. + workflow.add_edge("geopolitical_scanner", "industry_deep_dive") + workflow.add_edge("market_movers_scanner", "industry_deep_dive") + workflow.add_edge("sector_scanner", "industry_deep_dive") + + # Sequential: deep dive → synthesis → end workflow.add_edge("industry_deep_dive", "macro_synthesis") workflow.add_edge("macro_synthesis", END) - - return workflow.compile() \ No newline at end of file + + return workflow.compile()