diff --git a/app.py b/app.py index 45f9a34d..7a14ce63 100644 --- a/app.py +++ b/app.py @@ -1,5 +1,6 @@ """FastAPI SSE backend for the structured equity ranking engine.""" +import logging import os import re import time @@ -10,6 +11,12 @@ import traceback as _tb from datetime import date from fastapi import FastAPI, HTTPException, Request, Depends + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s %(message)s", +) +logger = logging.getLogger(__name__) from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from sse_starlette.sse import EventSourceResponse @@ -51,9 +58,21 @@ async def verify_api_key(request: Request): MAX_CONCURRENT = int(os.getenv("MAX_CONCURRENT_ANALYSES", "3")) _semaphore = asyncio.Semaphore(MAX_CONCURRENT) +# --- Event buffer cap --- +MAX_EVENTS_PER_ANALYSIS = 5000 + analyses: dict[str, dict] = {} +def _append_event(state: dict, evt: dict): + """Append an event to the analysis state, enforcing the buffer cap.""" + events = state["events"] + events.append(evt) + if len(events) > MAX_EVENTS_PER_ANALYSIS: + # Drop oldest events, keep the last MAX_EVENTS_PER_ANALYSIS + state["events"] = events[-MAX_EVENTS_PER_ANALYSIS:] + + class AnalyzeRequest(BaseModel): ticker: str date: str | None = None @@ -74,12 +93,10 @@ def build_config(): "fundamental_data": "yfinance", "news_data": "yfinance", } - print( - f"[CONFIG] provider={config['llm_provider']}, " - f"deep={config['deep_think_llm']}, " - f"quick={config['quick_think_llm']}, " - f"url={config['backend_url']}", - flush=True, + logger.info( + "config_built provider=%s deep=%s quick=%s url=%s", + config['llm_provider'], config['deep_think_llm'], + config['quick_think_llm'], config['backend_url'], ) return config @@ -129,13 +146,14 @@ async def _run_analysis_inner(analysis_id: str, ticker: str, trade_date: str): try: graph = TradingAgentsGraph(debug=False, config=config) - print( - f"[ANALYSIS] LLM types: deep={type(graph.deep_thinking_llm).__name__}, " - f"quick={type(graph.quick_thinking_llm).__name__}", - flush=True, + logger.info( + "analysis_init_ok deep_llm=%s quick_llm=%s analysis_id=%s", + type(graph.deep_thinking_llm).__name__, + type(graph.quick_thinking_llm).__name__, + analysis_id, ) except Exception as e: - print(f"[ANALYSIS] Init failed: {e}\n{_tb.format_exc()}", flush=True) + logger.error("analysis_init_failed analysis_id=%s error=%s\n%s", analysis_id, e, _tb.format_exc()) await q.put({"type": "error", "message": f"Init failed: {e}"}) await q.put(None) return @@ -156,14 +174,14 @@ async def _run_analysis_inner(analysis_id: str, ticker: str, trade_date: str): "status": "pending", "stats": _stats(start_time, emitted_fields), } - state["events"].append(evt) + _append_event(state, evt) await q.put(evt) try: async for chunk in graph.graph.astream( init_state, stream_mode="values", - config={"recursion_limit": 50}, + config={"recursion_limit": 25}, ): final_state = chunk @@ -188,7 +206,7 @@ async def _run_analysis_inner(analysis_id: str, ticker: str, trade_date: str): "status": "completed", "stats": st, } - state["events"].append(evt) + _append_event(state, evt) await q.put(evt) # Emit report data for key fields @@ -201,7 +219,7 @@ async def _run_analysis_inner(analysis_id: str, ticker: str, trade_date: str): "report": _format_report(field, value), "stats": st, } - state["events"].append(evt) + _append_event(state, evt) await q.put(evt) elif field == "debate": @@ -216,7 +234,7 @@ async def _run_analysis_inner(analysis_id: str, ticker: str, trade_date: str): "winner": (value or {}).get("winner", ""), "stats": st, } - state["events"].append(evt) + _append_event(state, evt) await q.put(evt) elif field == "master_score": @@ -228,16 +246,16 @@ async def _run_analysis_inner(analysis_id: str, ticker: str, trade_date: str): "position_role": chunk.get("position_role"), "stats": st, } - state["events"].append(evt) + _append_event(state, evt) await q.put(evt) # Mark in-progress agents for upcoming stages await _update_in_progress(chunk, emitted_fields, prev_agent_statuses, state, q, start_time) except Exception as e: - print(f"[ANALYSIS] Stream error: {e}\n{_tb.format_exc()}", flush=True) + logger.error("analysis_stream_error analysis_id=%s error=%s\n%s", analysis_id, e, _tb.format_exc()) evt = {"type": "error", "message": str(e)} - state["events"].append(evt) + _append_event(state, evt) await q.put(evt) state["done"] = True await q.put(None) @@ -260,7 +278,7 @@ async def _run_analysis_inner(analysis_id: str, ticker: str, trade_date: str): "status": "completed", "stats": st, } - state["events"].append(evt) + _append_event(state, evt) await q.put(evt) evt = { @@ -274,7 +292,7 @@ async def _run_analysis_inner(analysis_id: str, ticker: str, trade_date: str): "final_decision": decision, "stats": st, } - state["events"].append(evt) + _append_event(state, evt) await q.put(evt) state["done"] = True @@ -296,7 +314,7 @@ async def _update_in_progress(chunk, emitted, statuses, state, q, start_time): "status": "in_progress", "stats": _stats(start_time, emitted), } - state["events"].append(evt) + _append_event(state, evt) await q.put(evt) # If tier 1 done, mark tier 2 in_progress @@ -317,7 +335,7 @@ async def _update_in_progress(chunk, emitted, statuses, state, q, start_time): "status": "in_progress", "stats": _stats(start_time, emitted), } - state["events"].append(evt) + _append_event(state, evt) await q.put(evt) # If scoring done, mark portfolio analysis in_progress @@ -333,7 +351,7 @@ async def _update_in_progress(chunk, emitted, statuses, state, q, start_time): "status": "in_progress", "stats": _stats(start_time, emitted), } - state["events"].append(evt) + _append_event(state, evt) await q.put(evt) @@ -367,9 +385,9 @@ async def run_analysis(analysis_id: str, ticker: str, trade_date: str): timeout=3600, ) except asyncio.TimeoutError: - print(f"[ANALYSIS] Timeout for {analysis_id}", flush=True) + logger.warning("analysis_timeout analysis_id=%s", analysis_id) evt = {"type": "error", "message": "Analysis timed out after 60 minutes"} - state["events"].append(evt) + _append_event(state, evt) await q.put(evt) state["done"] = True await q.put(None) @@ -384,7 +402,7 @@ async def _cleanup_loop(): for aid in expired: analyses.pop(aid, None) if expired: - print(f"[CLEANUP] Removed {len(expired)} expired analyses", flush=True) + logger.info("cleanup_expired count=%d", len(expired)) @app.on_event("startup") @@ -397,8 +415,12 @@ async def _start_cleanup(): @app.post("/analyze", dependencies=[Depends(verify_api_key)]) async def start_analysis(req: AnalyzeRequest): ticker = req.ticker.upper().strip() - if not ticker or not re.match(r'^[A-Z0-9.\-]{1,6}$', ticker): - raise HTTPException(400, "Invalid ticker") + if not ticker: + raise HTTPException(400, "Ticker must not be empty") + if len(ticker) > 10: + raise HTTPException(400, f"Ticker too long ({len(ticker)} chars, max 10)") + if not re.match(r'^[A-Z0-9.\-]{1,10}$', ticker): + raise HTTPException(400, "Invalid ticker — only letters, digits, dots, and hyphens allowed") trade_date = req.date or str(date.today()) analysis_id = str(uuid.uuid4()) analyses[analysis_id] = { @@ -431,7 +453,7 @@ async def stream_analysis(analysis_id: str, last_event: int = 0): try: event = await asyncio.wait_for(q.get(), timeout=15) except asyncio.TimeoutError: - yield {"event": "heartbeat", "data": ""} + yield {"event": "heartbeat", "data": json.dumps({"type": "heartbeat"})} continue if event is None: break diff --git a/tradingagents/agents/structured/tier1.py b/tradingagents/agents/structured/tier1.py index f9cc2d66..bb0b4fd2 100644 --- a/tradingagents/agents/structured/tier1.py +++ b/tradingagents/agents/structured/tier1.py @@ -170,16 +170,18 @@ def create_macro_node(llm): Ticker: {ticker} | Sector: {sector} -MACRO DATA: -- VIX: {macro_data.get('vix_level', 'N/A')} -- 10Y Yield: {macro_data.get('ten_year_yield', 'N/A')}% -- Dollar 1M: {macro_data.get('dollar_1m_return', 'N/A')}% -- Credit Spreads: {macro_data.get('credit_spread_direction', 'N/A')} -- SPY 1M: {spy_perf.get('return_1m', 'N/A')}% +MACRO DATA (source: yfinance): +- VIX: {macro_data.get('vix_level', 'N/A')} (source: yfinance) +- 10Y Yield: {macro_data.get('ten_year_yield', 'N/A')}% (source: yfinance) +- Dollar 1M: {macro_data.get('dollar_1m_return', 'N/A')}% (source: yfinance) +- Credit Spreads: {macro_data.get('credit_spread_direction', 'N/A')} (source: yfinance) +- SPY 1M: {spy_perf.get('return_1m', 'N/A')}% (source: yfinance) -SECTOR PERFORMANCE (1M): +SECTOR PERFORMANCE (1M, source: yfinance): {chr(10).join(sector_lines[:12]) or 'N/A'} +NOTE: If a metric shows 'N/A' or 'unknown', say 'data unavailable' rather than guessing. + INSTRUCTIONS: 1. Classify risk_appetite: "risk-on" / "risk-off" / "transitional". - risk-on: VIX low, spreads tight, SPY up, breadth strong. @@ -244,11 +246,13 @@ def create_liquidity_node(llm): Ticker: {ticker} | Sector: {card.get('sector', 'Unknown')} -AVAILABLE DATA: -- VIX: {macro_data.get('vix_level', 'N/A')} -- 10Y Yield: {macro_data.get('ten_year_yield', 'N/A')}% -- Credit Spreads: {macro_data.get('credit_spread_direction', 'N/A')} -- Dollar Strength: {macro_data.get('dollar_strength', 'N/A')} +AVAILABLE DATA (source: yfinance macro API): +- VIX: {macro_data.get('vix_level', 'N/A')} (source: yfinance) +- 10Y Yield: {macro_data.get('ten_year_yield', 'N/A')}% (source: yfinance) +- Credit Spreads: {macro_data.get('credit_spread_direction', 'N/A')} (source: yfinance) +- Dollar Strength: {macro_data.get('dollar_strength', 'N/A')} (source: yfinance) + +NOTE: If a metric shows 'N/A' or 'unknown', say 'data unavailable' rather than guessing. INSTRUCTIONS: 1. Assess Fed stance (dovish / neutral / hawkish) based on yield environment. diff --git a/tradingagents/agents/structured/tier2.py b/tradingagents/agents/structured/tier2.py index c856eda0..392c3f67 100644 --- a/tradingagents/agents/structured/tier2.py +++ b/tradingagents/agents/structured/tier2.py @@ -59,15 +59,17 @@ def create_business_quality_node(llm): Ticker: {ticker} | Sector: {card.get('sector', 'Unknown')} | Industry: {card.get('industry', 'Unknown')} Market Cap: {card.get('market_cap_formatted', 'N/A')} -FINANCIALS: -- Revenue Growth: {_pct(_safe(info, 'revenueGrowth'))} -- Profit Margins: {_pct(_safe(info, 'profitMargins'))} -- Operating Margins: {_pct(_safe(info, 'operatingMargins'))} -- ROE: {_pct(_safe(info, 'returnOnEquity'))} -- ROA: {_pct(_safe(info, 'returnOnAssets'))} -- Debt/Equity: {_safe(info, 'debtToEquity', 'N/A')} -- Free Cash Flow: {_safe(info, 'freeCashflow', 'N/A')} -- Current Ratio: {_safe(info, 'currentRatio', 'N/A')} +FINANCIALS (source: yfinance): +- Revenue Growth: {_pct(_safe(info, 'revenueGrowth'))} (source: yfinance) +- Profit Margins: {_pct(_safe(info, 'profitMargins'))} (source: yfinance) +- Operating Margins: {_pct(_safe(info, 'operatingMargins'))} (source: yfinance) +- ROE: {_pct(_safe(info, 'returnOnEquity'))} (source: yfinance) +- ROA: {_pct(_safe(info, 'returnOnAssets'))} (source: yfinance) +- Debt/Equity: {_safe(info, 'debtToEquity', 'N/A')} (source: yfinance) +- Free Cash Flow: {_safe(info, 'freeCashflow', 'N/A')} (source: yfinance) +- Current Ratio: {_safe(info, 'currentRatio', 'N/A')} (source: yfinance) + +NOTE: If a metric shows 'N/A' or 'unknown', say 'data unavailable' rather than guessing. INSTRUCTIONS: 1. Score business quality 0-10 based on margins, growth, returns, balance sheet. @@ -82,6 +84,10 @@ INSTRUCTIONS: result = BusinessQualityOutput( score_0_to_10=5.0, confidence_0_to_1=0.1, summary_1_sentence="Business quality analysis unavailable", + data_quality_flags=[ + DataFlag(field="business_quality", severity="moderate", + message="Tier 2 analysis used fallback defaults due to LLM failure") + ], ) # Override with actual data @@ -129,27 +135,29 @@ Your job: track real smart-money movement — not just static ownership percenta Ticker: {ticker} -OWNERSHIP & VOLUME: -- Institutional Ownership: {data.get('held_percent_institutions', 'N/A')}% -- Insider Ownership: {data.get('held_percent_insiders', 'N/A')}% -- Volume Ratio (10d/avg): {data.get('volume_ratio', 'N/A')} -- Short % of Float: {data.get('short_pct_of_float', 'N/A')}% -- Short Ratio (days): {data.get('short_ratio', 'N/A')} -- Float Turnover 5d: {data.get('float_turnover_5d_pct', 'N/A')}% +OWNERSHIP & VOLUME (source: yfinance): +- Institutional Ownership: {data.get('held_percent_institutions', 'N/A')}% (source: yfinance) +- Insider Ownership: {data.get('held_percent_insiders', 'N/A')}% (source: yfinance) +- Volume Ratio (10d/avg): {data.get('volume_ratio', 'N/A')} (source: yfinance) +- Short % of Float: {data.get('short_pct_of_float', 'N/A')}% (source: yfinance) +- Short Ratio (days): {data.get('short_ratio', 'N/A')} (source: yfinance) +- Float Turnover 5d: {data.get('float_turnover_5d_pct', 'N/A')}% (source: yfinance) -SHORT INTEREST TREND: +SHORT INTEREST TREND (source: yfinance): - Short Interest Change (vs prior month): {data.get('short_interest_change_pct', 'N/A')}% - Short Interest Trend: {data.get('short_interest_trend', 'N/A')} -TOP INSTITUTIONAL HOLDERS (13F): +TOP INSTITUTIONAL HOLDERS (13F, source: yfinance): {chr(10).join(holder_lines) or ' No data available'} - Total top holders tracked: {data.get('top_holders_count', 'N/A')} -INSIDER TRANSACTIONS (recent): +INSIDER TRANSACTIONS (recent, source: yfinance): - Insider Buys: {data.get('insider_buys_recent', 'N/A')} - Insider Sells: {data.get('insider_sells_recent', 'N/A')} - Insider Signal: {data.get('insider_transaction_signal', 'N/A')} +NOTE: If a metric shows 'N/A' or 'unknown', say 'data unavailable' rather than guessing. + INSTRUCTIONS: 1. Score institutional flow signal 0-10 (this has 15% weight — make it count). High ownership + rising volume + low short interest + insider buying = bullish. @@ -171,6 +179,10 @@ INSTRUCTIONS: result = InstitutionalFlowOutput( score_0_to_10=5.0, confidence_0_to_1=0.1, summary_1_sentence="Institutional flow analysis unavailable", + data_quality_flags=[ + DataFlag(field="institutional_flow", severity="moderate", + message="Tier 2 analysis used fallback defaults due to LLM failure") + ], ) # Override with actual fetched data @@ -212,16 +224,18 @@ def create_valuation_node(llm): Ticker: {ticker} -VALUATION METRICS: -- Trailing P/E: {data.get('trailing_pe', 'N/A')} -- Forward P/E: {data.get('forward_pe', 'N/A')} -- PEG Ratio: {data.get('peg_ratio', 'N/A')} -- P/B: {data.get('price_to_book', 'N/A')} -- EV/EBITDA: {data.get('ev_to_ebitda', 'N/A')} -- P/S: {data.get('price_to_sales', 'N/A')} -- 52W Range Position: {data.get('vs_52w_range_pct', 'N/A')}% -- Revenue Growth: {data.get('revenue_growth', 'N/A')} -- Earnings Growth: {data.get('earnings_growth', 'N/A')} +VALUATION METRICS (source: yfinance): +- Trailing P/E: {data.get('trailing_pe', 'N/A')} (source: yfinance) +- Forward P/E: {data.get('forward_pe', 'N/A')} (source: yfinance) +- PEG Ratio: {data.get('peg_ratio', 'N/A')} (source: yfinance) +- P/B: {data.get('price_to_book', 'N/A')} (source: yfinance) +- EV/EBITDA: {data.get('ev_to_ebitda', 'N/A')} (source: yfinance) +- P/S: {data.get('price_to_sales', 'N/A')} (source: yfinance) +- 52W Range Position: {data.get('vs_52w_range_pct', 'N/A')}% (source: yfinance) +- Revenue Growth: {data.get('revenue_growth', 'N/A')} (source: yfinance) +- Earnings Growth: {data.get('earnings_growth', 'N/A')} (source: yfinance) + +NOTE: If a metric shows 'N/A' or 'unknown', say 'data unavailable' rather than guessing. INSTRUCTIONS: 1. Score valuation attractiveness 0-10. @@ -236,6 +250,10 @@ INSTRUCTIONS: result = ValuationOutput( score_0_to_10=5.0, confidence_0_to_1=0.1, summary_1_sentence="Valuation analysis unavailable", + data_quality_flags=[ + DataFlag(field="valuation", severity="moderate", + message="Tier 2 analysis used fallback defaults due to LLM failure") + ], ) result.trailing_pe = data.get("trailing_pe") @@ -299,19 +317,23 @@ def create_entry_timing_node(llm): if ma50 and ma200: ma_rel = "above" if ma50 > ma200 else "below" + _timing_source = "Alpaca" if price is not None and ma50 is not None else "yfinance" + prompt = f"""You are an Entry Timing Analyst in a structured equity ranking pipeline. Ticker: {ticker} -TECHNICALS: -- Price: ${price or 'N/A'} -- 50-day MA: ${ma50 or 'N/A'} -- 200-day MA: ${ma200 or 'N/A'} +TECHNICALS (source: {_timing_source}): +- Price: ${price or 'N/A'} (source: {_timing_source}) +- 50-day MA: ${ma50 or 'N/A'} (source: {_timing_source}) +- 200-day MA: ${ma200 or 'N/A'} (source: {_timing_source}) - 50d vs 200d: {ma_rel} -- 52W High: ${hi52 or 'N/A'} -- 52W Low: ${lo52 or 'N/A'} +- 52W High: ${hi52 or 'N/A'} (source: {_timing_source}) +- 52W Low: ${lo52 or 'N/A'} (source: {_timing_source}) - Position in 52W Range: {range_pct or 'N/A'}% +NOTE: If a metric shows 'N/A' or 'unknown', say 'data unavailable' rather than guessing. + INSTRUCTIONS: 1. Score entry timing 0-10. Pullback to support in uptrend = high score. Overextended at highs = low score. @@ -325,6 +347,10 @@ INSTRUCTIONS: result = EntryTimingOutput( score_0_to_10=5.0, confidence_0_to_1=0.1, summary_1_sentence="Entry timing analysis unavailable", + data_quality_flags=[ + DataFlag(field="entry_timing", severity="moderate", + message="Tier 2 analysis used fallback defaults due to LLM failure") + ], ) result.current_price = price @@ -363,12 +389,14 @@ def create_earnings_revisions_node(llm): Ticker: {ticker} -EARNINGS DATA: -- Trailing EPS: {data.get('trailing_eps', 'N/A')} -- Forward EPS: {data.get('forward_eps', 'N/A')} -- Price Target Upside: {upside or 'N/A'}% -- Price Targets: {json.dumps(targets)[:300] if targets else 'N/A'} -- Recent Recommendations: {len(recs)} entries +EARNINGS DATA (source: yfinance): +- Trailing EPS: {data.get('trailing_eps', 'N/A')} (source: yfinance) +- Forward EPS: {data.get('forward_eps', 'N/A')} (source: yfinance) +- Price Target Upside: {upside or 'N/A'}% (source: yfinance) +- Price Targets: {json.dumps(targets)[:300] if targets else 'N/A'} (source: yfinance) +- Recent Recommendations: {len(recs)} entries (source: yfinance) + +NOTE: If a metric shows 'N/A' or 'unknown', say 'data unavailable' rather than guessing. INSTRUCTIONS: 1. Score earnings revision momentum 0-10. @@ -385,6 +413,10 @@ INSTRUCTIONS: result = EarningsRevisionOutput( score_0_to_10=5.0, confidence_0_to_1=0.1, summary_1_sentence="Earnings revision analysis unavailable", + data_quality_flags=[ + DataFlag(field="earnings_revisions", severity="moderate", + message="Tier 2 analysis used fallback defaults due to LLM failure") + ], ) result.trailing_eps = data.get("trailing_eps") @@ -417,10 +449,12 @@ def create_sector_rotation_node(llm): Ticker: {ticker} | Sector: {data.get('sector', 'Unknown')} | Sector ETF: {data.get('sector_etf', 'N/A')} -SECTOR DATA: -- Sector vs SPY 1M: {data.get('stock_sector_vs_spy_1m', 'N/A')}% -- Sector vs SPY 3M: {data.get('stock_sector_vs_spy_3m', 'N/A')}% -- Sector Rank: {data.get('stock_sector_rank', 'N/A')} / {data.get('total_sectors', 11)} +SECTOR DATA (source: yfinance): +- Sector vs SPY 1M: {data.get('stock_sector_vs_spy_1m', 'N/A')}% (source: yfinance) +- Sector vs SPY 3M: {data.get('stock_sector_vs_spy_3m', 'N/A')}% (source: yfinance) +- Sector Rank: {data.get('stock_sector_rank', 'N/A')} / {data.get('total_sectors', 11)} (source: yfinance) + +NOTE: If a metric shows 'N/A' or 'unknown', say 'data unavailable' rather than guessing. INSTRUCTIONS: 1. Score sector rotation favorability 0-10. @@ -435,6 +469,10 @@ INSTRUCTIONS: result = SectorRotationOutput( score_0_to_10=5.0, confidence_0_to_1=0.1, summary_1_sentence="Sector rotation analysis unavailable", + data_quality_flags=[ + DataFlag(field="sector_rotation", severity="moderate", + message="Tier 2 analysis used fallback defaults due to LLM failure") + ], ) result.sector = data.get("sector", "Unknown") @@ -472,10 +510,12 @@ def create_backlog_node(llm): Ticker: {ticker} | Sector: {sector} | Industry: {industry} -AVAILABLE DATA: -- Revenue Growth: {_pct(_safe(info, 'revenueGrowth'))} -- Earnings Growth: {_pct(_safe(info, 'earningsGrowth'))} -- Revenue: {_safe(info, 'totalRevenue', 'N/A')} +AVAILABLE DATA (source: yfinance): +- Revenue Growth: {_pct(_safe(info, 'revenueGrowth'))} (source: yfinance) +- Earnings Growth: {_pct(_safe(info, 'earningsGrowth'))} (source: yfinance) +- Revenue: {_safe(info, 'totalRevenue', 'N/A')} (source: yfinance) + +NOTE: If a metric shows 'N/A' or 'unknown', say 'data unavailable' rather than guessing. INSTRUCTIONS: 1. Assess if this company type typically has meaningful backlog data @@ -489,8 +529,12 @@ INSTRUCTIONS: except Exception as e: logger.warning("Backlog LLM failed: %s", e) result = BacklogOrderMomentumOutput( - score_0_to_10=5.0, confidence_0_to_1=0.3, + score_0_to_10=5.0, confidence_0_to_1=0.1, summary_1_sentence="Backlog analysis limited", + data_quality_flags=[ + DataFlag(field="backlog", severity="moderate", + message="Tier 2 analysis used fallback defaults due to LLM failure") + ], ) flags = [f.model_dump() for f in result.data_quality_flags] @@ -526,11 +570,13 @@ def create_crowding_node(llm): Ticker: {ticker} | Company: {card.get('company_name', 'Unknown')} Market Cap Category: {card.get('market_cap_category', 'unknown')} -DATA: -- Short % of Float: {short_pct or 'N/A'}% -- Short Ratio (days): {_safe(info, 'shortRatio', 'N/A')} +DATA (source: yfinance): +- Short % of Float: {short_pct or 'N/A'}% (source: yfinance) +- Short Ratio (days): {_safe(info, 'shortRatio', 'N/A')} (source: yfinance) - Analyst Coverage: implied from market cap ({card.get('market_cap_category', 'unknown')}) +NOTE: If a metric shows 'N/A' or 'unknown', say 'data unavailable' rather than guessing. + INSTRUCTIONS: 1. Score narrative crowding 0-10. HIGH score = low crowding (contrarian, under-followed). @@ -545,8 +591,12 @@ INSTRUCTIONS: except Exception as e: logger.warning("Crowding LLM failed: %s", e) result = NarrativeCrowdingOutput( - score_0_to_10=5.0, confidence_0_to_1=0.3, + score_0_to_10=5.0, confidence_0_to_1=0.1, summary_1_sentence="Crowding analysis limited", + data_quality_flags=[ + DataFlag(field="crowding", severity="moderate", + message="Tier 2 analysis used fallback defaults due to LLM failure") + ], ) flags = [f.model_dump() for f in result.data_quality_flags] diff --git a/tradingagents/agents/structured/tier3.py b/tradingagents/agents/structured/tier3.py index 310eabf4..3f735230 100644 --- a/tradingagents/agents/structured/tier3.py +++ b/tradingagents/agents/structured/tier3.py @@ -23,6 +23,32 @@ from tradingagents.models import ( logger = logging.getLogger(__name__) +def _low_confidence_warnings(state: Dict[str, Any]) -> str: + """Check if any Tier 2 agents have confidence < 0.2 and return warnings.""" + _TIER2_FIELDS = { + "business_quality": "Business Quality", + "institutional_flow": "Institutional Flow", + "valuation": "Valuation", + "entry_timing": "Entry Timing", + "earnings_revisions": "Earnings Revisions", + "sector_rotation": "Sector Rotation", + "backlog": "Backlog / Order Momentum", + "crowding": "Narrative Crowding", + } + warnings = [] + for field, display_name in _TIER2_FIELDS.items(): + agent_data = state.get(field) or {} + conf = agent_data.get("confidence_0_to_1") + if conf is not None and conf < 0.2: + warnings.append( + f" WARNING: {display_name} has low confidence ({conf:.2f}) — " + f"its score may be unreliable (fallback defaults or poor data)" + ) + if warnings: + return "\nDATA QUALITY WARNINGS:\n" + "\n".join(warnings) + "\n" + return "" + + def _summarize_tier2(state: Dict[str, Any]) -> str: """Build a compact summary of all Tier 1+2 findings for Tier 3 prompts.""" card = state.get("company_card") or {} @@ -38,6 +64,9 @@ def _summarize_tier2(state: Dict[str, Any]) -> str: cr = state.get("crowding") or {} arch = state.get("archetype") or {} + # Check for low-confidence Tier 2 agents + confidence_warnings = _low_confidence_warnings(state) + lines = [ f"Company: {card.get('company_name', '?')} ({card.get('ticker', '?')})", f"Sector: {card.get('sector', '?')} | Industry: {card.get('industry', '?')}", @@ -67,6 +96,12 @@ def _summarize_tier2(state: Dict[str, Any]) -> str: f" Short Trend: {inst.get('short_interest_trend', '?')} | Insider Signal: {inst.get('insider_transaction_signal', '?')}", f" Timing: {et.get('timing_verdict', '?')}", ] + + if confidence_warnings: + lines.append("") + lines.append(confidence_warnings) + lines.append("Factor these warnings into your analysis — low-confidence scores may not reflect reality.") + return "\n".join(lines) diff --git a/tradingagents/models.py b/tradingagents/models.py index 16311b9b..ea0aedf0 100644 --- a/tradingagents/models.py +++ b/tradingagents/models.py @@ -11,7 +11,7 @@ import json import logging from typing import List, Literal, Optional, Tuple -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, ValidationError logger = logging.getLogger(__name__) @@ -449,21 +449,65 @@ def should_hard_veto( # LLM structured output helper # --------------------------------------------------------------------------- -def invoke_structured(llm, model_cls, prompt: str): - """Call LLM with structured output, with JSON fallback.""" - try: +def invoke_structured(llm, model_cls, prompt: str, timeout: int = 60): + """Call LLM with structured output, with JSON fallback. + + Each LLM call is wrapped in a per-call timeout (default 60s) to avoid + hanging on a single call while the global 60-minute analysis timeout + covers the entire pipeline. + """ + import concurrent.futures + + def _call_structured(): structured = llm.with_structured_output(model_cls) return structured.invoke(prompt) - except Exception as e: - logger.warning("Structured output failed for %s: %s — using JSON fallback", model_cls.__name__, e) + + def _call_json_fallback(): schema_str = json.dumps(model_cls.model_json_schema(), indent=2) json_prompt = ( f"{prompt}\n\nReturn ONLY valid JSON matching this schema:\n{schema_str}" ) - response = llm.invoke(json_prompt) - content = response.content.strip() - if "```json" in content: - content = content.split("```json")[1].split("```")[0].strip() - elif "```" in content: - content = content.split("```")[1].split("```")[0].strip() + return llm.invoke(json_prompt) + + # Try structured output with per-call timeout + try: + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + future = pool.submit(_call_structured) + return future.result(timeout=timeout) + except concurrent.futures.TimeoutError: + logger.warning("Structured output timed out after %ds for %s", timeout, model_cls.__name__) + raise TimeoutError(f"LLM call timed out after {timeout}s for {model_cls.__name__}") + except Exception as e: + logger.warning("Structured output failed for %s: %s — using JSON fallback", model_cls.__name__, e) + + # JSON fallback with per-call timeout + try: + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + future = pool.submit(_call_json_fallback) + response = future.result(timeout=timeout) + except concurrent.futures.TimeoutError: + logger.warning("JSON fallback timed out after %ds for %s", timeout, model_cls.__name__) + raise TimeoutError(f"LLM JSON fallback timed out after {timeout}s for {model_cls.__name__}") + + content = response.content.strip() + if "```json" in content: + content = content.split("```json")[1].split("```")[0].strip() + elif "```" in content: + content = content.split("```")[1].split("```")[0].strip() + try: return model_cls.model_validate_json(content) + except ValidationError as ve: + logger.error( + "JSON fallback validation failed for %s: %s — raw text: %.500s", + model_cls.__name__, ve, content, + ) + # Return minimal defaults so the pipeline keeps running + defaults = {} + if hasattr(model_cls, "model_fields"): + if "score_0_to_10" in model_cls.model_fields: + defaults["score_0_to_10"] = 5.0 + if "confidence_0_to_1" in model_cls.model_fields: + defaults["confidence_0_to_1"] = 0.1 + if "summary_1_sentence" in model_cls.model_fields: + defaults["summary_1_sentence"] = f"{model_cls.__name__} parsing failed — using fallback defaults" + return model_cls(**defaults)