diff --git a/app.py b/app.py index f68c4bd9..9d8d5727 100644 --- a/app.py +++ b/app.py @@ -1,4 +1,4 @@ -"""FastAPI SSE backend for TradingAgents.""" +"""FastAPI SSE backend for the structured equity ranking engine.""" import os import time @@ -21,14 +21,8 @@ if not os.environ.get("OPENAI_API_KEY"): from tradingagents.graph.trading_graph import TradingAgentsGraph from tradingagents.default_config import DEFAULT_CONFIG -from cli.stats_handler import StatsCallbackHandler -from cli.main import ( - MessageBuffer, - classify_message_type, - update_analyst_statuses, -) -app = FastAPI(title="TradingAgents API") +app = FastAPI(title="TradingAgents Structured Pipeline") # --- CORS --- _cors_env = os.getenv("CORS_ORIGINS", "") @@ -40,13 +34,13 @@ app.add_middleware( allow_headers=["*"], ) -# --- Auth dependency --- +# --- Auth --- _API_KEY = os.getenv("AGENTS_API_KEY", "") async def verify_api_key(request: Request): if not _API_KEY: - return # dev mode — no auth + return auth = request.headers.get("Authorization", "") if auth != f"Bearer {_API_KEY}": raise HTTPException(401, "Invalid or missing API key") @@ -56,7 +50,6 @@ async def verify_api_key(request: Request): MAX_CONCURRENT = int(os.getenv("MAX_CONCURRENT_ANALYSES", "3")) _semaphore = asyncio.Semaphore(MAX_CONCURRENT) -# Active analysis state: id -> {queue, events (replay buffer), done, created_at} analyses: dict[str, dict] = {} @@ -66,7 +59,7 @@ class AnalyzeRequest(BaseModel): def build_config(): - """Build TradingAgents config — uses Groq (OpenAI-compatible) by default.""" + """Build TradingAgents config from env vars.""" config = DEFAULT_CONFIG.copy() config["llm_provider"] = os.getenv("LLM_PROVIDER", "openai") config["deep_think_llm"] = os.getenv("DEEP_THINK_MODEL", "deepseek-v3.1:671b-cloud") @@ -80,237 +73,164 @@ def build_config(): "fundamental_data": "yfinance", "news_data": "yfinance", } - config["parallel_analysts"] = True - print(f"[CONFIG] provider={config['llm_provider']}, deep={config['deep_think_llm']}, quick={config['quick_think_llm']}, url={config['backend_url']}", flush=True) + print( + f"[CONFIG] provider={config['llm_provider']}, " + f"deep={config['deep_think_llm']}, " + f"quick={config['quick_think_llm']}, " + f"url={config['backend_url']}", + flush=True, + ) return config -def get_stats_dict(stats_handler, buf, start_time): - """Build stats dict for SSE events.""" - s = stats_handler.get_stats() - agents_done = sum(1 for v in buf.agent_status.values() if v == "completed") - elapsed = time.time() - start_time - return { - "agents_done": agents_done, - "agents_total": len(buf.agent_status), - "llm_calls": s["llm_calls"], - "tool_calls": s["tool_calls"], - "tokens_in": s["tokens_in"], - "tokens_out": s["tokens_out"], - "reports_done": buf.get_completed_reports_count(), - "reports_total": len(buf.report_sections), - "elapsed": round(elapsed, 1), - } +# --------------------------------------------------------------------------- +# Stage/agent mapping for SSE events +# --------------------------------------------------------------------------- + +# Maps state field → (agent display name, pipeline stage) +FIELD_AGENT_MAP = { + "validation": ("Validation", "validation"), + "company_card": ("Company Card", "validation"), + "macro": ("Macro Regime", "tier1"), + "liquidity": ("Liquidity", "tier1"), + "business_quality": ("Business Quality", "tier2"), + "institutional_flow": ("Institutional Flow", "tier2"), + "valuation": ("Valuation", "tier2"), + "entry_timing": ("Entry Timing", "tier2"), + "earnings_revisions": ("Earnings Revisions", "tier2"), + "sector_rotation": ("Sector Rotation", "tier2"), + "backlog": ("Backlog / Order Momentum", "tier2"), + "crowding": ("Narrative Crowding", "tier2"), + "archetype": ("Archetype", "scoring"), + "master_score": ("Master Score", "scoring"), + "bull_case": ("Bull Researcher", "debate"), + "bear_case": ("Bear Researcher", "debate"), + "debate": ("Debate Referee", "debate"), + "risk": ("Risk / Invalidation", "decision"), + "final_decision": ("Final Decision", "decision"), +} + +ALL_AGENTS = [name for name, _ in FIELD_AGENT_MAP.values()] +ALL_STAGES = ["validation", "tier1", "tier2", "scoring", "debate", "decision"] -def _agent_stage(agent_name): - """Map agent name to pipeline stage.""" - if agent_name in ("Market Analyst", "Social Analyst", "News Analyst", "Fundamentals Analyst"): - return "analysts" - if agent_name in ("Bull Researcher", "Bear Researcher", "Research Manager"): - return "research" - if agent_name == "Trader": - return "trading" - if agent_name in ("Aggressive Analyst", "Conservative Analyst", "Neutral Analyst"): - return "risk" - if agent_name == "Portfolio Manager": - return "decision" - return "unknown" - +# --------------------------------------------------------------------------- +# Analysis runner +# --------------------------------------------------------------------------- async def _run_analysis_inner(analysis_id: str, ticker: str, trade_date: str): - """Core analysis logic.""" + """Core analysis logic — streams structured pipeline state changes as SSE.""" state = analyses[analysis_id] q = state["queue"] config = build_config() - stats_handler = StatsCallbackHandler() - selected_analysts = ["market", "social", "news", "fundamentals"] try: - graph = TradingAgentsGraph( - selected_analysts=selected_analysts, - debug=False, - config=config, - callbacks=[stats_handler], + graph = TradingAgentsGraph(debug=False, config=config) + print( + f"[ANALYSIS] LLM types: deep={type(graph.deep_thinking_llm).__name__}, " + f"quick={type(graph.quick_thinking_llm).__name__}", + flush=True, ) - print(f"[ANALYSIS] LLM types: deep={type(graph.deep_thinking_llm).__name__}, quick={type(graph.quick_thinking_llm).__name__}", flush=True) except Exception as e: print(f"[ANALYSIS] Init failed: {e}\n{_tb.format_exc()}", flush=True) await q.put({"type": "error", "message": f"Init failed: {e}"}) await q.put(None) return - buf = MessageBuffer() - buf.init_for_analysis(selected_analysts) - init_state = graph.propagator.create_initial_state(ticker, trade_date) - args = graph.propagator.get_graph_args(callbacks=[stats_handler]) + init_state = graph._create_initial_state(ticker, trade_date) start_time = time.time() - - emitted_reports = set() - research_emitted = False - trader_emitted = False - risk_emitted = False + emitted_fields = set() + prev_agent_statuses = {} final_state = None - prev_statuses = {} - # Emit all analysts as "in_progress" immediately (they run in parallel) - analyst_name_map = { - "market": "Market Analyst", - "social": "Social Analyst", - "news": "News Analyst", - "fundamentals": "Fundamentals Analyst", - } - for analyst_type in selected_analysts: - agent_name = analyst_name_map[analyst_type] - buf.update_agent_status(agent_name, "in_progress") - st = get_stats_dict(stats_handler, buf, start_time) + # Emit initial status: all agents pending + for field, (agent_name, stage) in FIELD_AGENT_MAP.items(): + prev_agent_statuses[field] = "pending" evt = { "type": "agent_update", "agent": agent_name, - "stage": "analysts", - "status": "in_progress", - "stats": st, + "stage": stage, + "status": "pending", + "stats": _stats(start_time, emitted_fields), } state["events"].append(evt) await q.put(evt) - prev_statuses[agent_name] = "in_progress" try: - async for chunk in graph.graph.astream(init_state, **args): + async for chunk in graph.graph.astream( + init_state, + stream_mode="values", + config={"recursion_limit": 50}, + ): final_state = chunk - # Process messages (same logic as Chainlit app) - if chunk.get("messages") and len(chunk["messages"]) > 0: - last_msg = chunk["messages"][-1] - msg_id = getattr(last_msg, "id", None) - if msg_id != buf._last_message_id: - buf._last_message_id = msg_id - msg_type, content = classify_message_type(last_msg) - if content and content.strip(): - buf.add_message(msg_type, content) - if hasattr(last_msg, "tool_calls") and last_msg.tool_calls: - for tc in last_msg.tool_calls: - if isinstance(tc, dict): - buf.add_tool_call(tc["name"], tc["args"]) - else: - buf.add_tool_call(tc.name, tc.args) + # Detect newly populated fields + for field, (agent_name, stage) in FIELD_AGENT_MAP.items(): + if field in emitted_fields: + continue - update_analyst_statuses(buf, chunk) - st = get_stats_dict(stats_handler, buf, start_time) + value = chunk.get(field) + if value is None: + continue - # Emit agent status changes only (avoid flooding) - for agent, status in buf.agent_status.items(): - if prev_statuses.get(agent) != status: - prev_statuses[agent] = status - evt = { - "type": "agent_update", - "agent": agent, - "stage": _agent_stage(agent), - "status": status, - "stats": st, - } - state["events"].append(evt) - await q.put(evt) + emitted_fields.add(field) + st = _stats(start_time, emitted_fields) - # Analyst reports - report_map = { - "market_report": ("Market Analyst", "analysts"), - "sentiment_report": ("Social Analyst", "analysts"), - "news_report": ("News Analyst", "analysts"), - "fundamentals_report": ("Fundamentals Analyst", "analysts"), - } - for field, (agent_name, stage) in report_map.items(): - if field not in emitted_reports and chunk.get(field): - emitted_reports.add(field) + # Mark this agent completed + prev_agent_statuses[field] = "completed" + evt = { + "type": "agent_update", + "agent": agent_name, + "stage": stage, + "status": "completed", + "stats": st, + } + state["events"].append(evt) + await q.put(evt) + + # Emit report data for key fields + if field in ("validation", "company_card"): evt = { "type": "report", "agent": agent_name, "stage": stage, "field": field, - "report": chunk[field], + "report": _format_report(field, value), "stats": st, } state["events"].append(evt) await q.put(evt) - # Research debate - if chunk.get("investment_debate_state") and not research_emitted: - debate = chunk["investment_debate_state"] - bull = debate.get("bull_history", "").strip() - bear = debate.get("bear_history", "").strip() - judge = debate.get("judge_decision", "").strip() - - if bull or bear: - for a in ("Bull Researcher", "Bear Researcher", "Research Manager"): - buf.update_agent_status(a, "in_progress") - - if judge: - research_emitted = True - for a in ("Bull Researcher", "Bear Researcher", "Research Manager"): - buf.update_agent_status(a, "completed") - buf.update_agent_status("Trader", "in_progress") - buf.update_report_section("investment_plan", judge) + elif field == "debate": + bull = chunk.get("bull_case") or {} + bear = chunk.get("bear_case") or {} evt = { "type": "debate", - "stage": "research", - "bull": bull, - "bear": bear, - "judge": judge, - "stats": get_stats_dict(stats_handler, buf, start_time), + "stage": "debate", + "bull": bull.get("thesis", ""), + "bear": bear.get("thesis", ""), + "judge": (value or {}).get("reasoning", ""), + "winner": (value or {}).get("winner", ""), + "stats": st, } state["events"].append(evt) await q.put(evt) - # Trader plan - if chunk.get("trader_investment_plan") and not trader_emitted: - trader_emitted = True - buf.update_agent_status("Trader", "completed") - buf.update_agent_status("Aggressive Analyst", "in_progress") - buf.update_agent_status("Conservative Analyst", "in_progress") - buf.update_agent_status("Neutral Analyst", "in_progress") - buf.update_report_section("trader_investment_plan", chunk["trader_investment_plan"]) - evt = { - "type": "trader", - "stage": "trading", - "plan": chunk["trader_investment_plan"], - "stats": get_stats_dict(stats_handler, buf, start_time), - } - state["events"].append(evt) - await q.put(evt) - - # Risk debate - if chunk.get("risk_debate_state") and not risk_emitted: - risk = chunk["risk_debate_state"] - agg = risk.get("aggressive_history", "").strip() - con = risk.get("conservative_history", "").strip() - neu = risk.get("neutral_history", "").strip() - judge = risk.get("judge_decision", "").strip() - - if agg: - buf.update_agent_status("Aggressive Analyst", "in_progress") - if con: - buf.update_agent_status("Conservative Analyst", "in_progress") - if neu: - buf.update_agent_status("Neutral Analyst", "in_progress") - - if judge: - risk_emitted = True - buf.update_agent_status("Aggressive Analyst", "completed") - buf.update_agent_status("Conservative Analyst", "completed") - buf.update_agent_status("Neutral Analyst", "completed") - buf.update_agent_status("Portfolio Manager", "completed") + elif field == "master_score": evt = { - "type": "risk", - "stage": "risk", - "aggressive": agg, - "conservative": con, - "neutral": neu, - "judge": judge, - "stats": get_stats_dict(stats_handler, buf, start_time), + "type": "score", + "stage": "scoring", + "master_score": value, + "adjusted_score": chunk.get("adjusted_score"), + "position_role": chunk.get("position_role"), + "stats": st, } state["events"].append(evt) await q.put(evt) + # Mark in-progress agents for upcoming stages + _update_in_progress(chunk, emitted_fields, prev_agent_statuses, state, q, start_time) + except Exception as e: print(f"[ANALYSIS] Stream error: {e}\n{_tb.format_exc()}", flush=True) evt = {"type": "error", "message": str(e)} @@ -320,49 +240,112 @@ async def _run_analysis_inner(analysis_id: str, ticker: str, trade_date: str): await q.put(None) return - # Final decision + # Final decision event if final_state: - decision_text = final_state.get("final_trade_decision", "No decision reached.") - signal = graph.process_signal(decision_text) - buf.update_report_section("final_trade_decision", decision_text) - for agent in buf.agent_status: - buf.update_agent_status(agent, "completed") - st = get_stats_dict(stats_handler, buf, start_time) - for agent, status in buf.agent_status.items(): - if prev_statuses.get(agent) != "completed": - prev_statuses[agent] = "completed" + decision = final_state.get("final_decision") or {} + st = _stats(start_time, emitted_fields) + + # Mark all remaining as completed + for field in FIELD_AGENT_MAP: + if prev_agent_statuses.get(field) != "completed": + agent_name, stage = FIELD_AGENT_MAP[field] + prev_agent_statuses[field] = "completed" evt = { "type": "agent_update", - "agent": agent, - "stage": _agent_stage(agent), + "agent": agent_name, + "stage": stage, "status": "completed", "stats": st, } state["events"].append(evt) await q.put(evt) + evt = { "type": "decision", "stage": "decision", - "signal": signal, - "decision_text": decision_text, + "signal": decision.get("action", "AVOID"), + "decision_text": decision.get("narrative", ""), + "master_score": final_state.get("master_score"), + "adjusted_score": final_state.get("adjusted_score"), + "position_role": final_state.get("position_role"), + "final_decision": decision, "stats": st, } state["events"].append(evt) await q.put(evt) state["done"] = True - await q.put(None) # sentinel — stream done + await q.put(None) + + +async def _update_in_progress(chunk, emitted, statuses, state, q, start_time): + """Heuristic: mark agents as in_progress based on stage progression.""" + # If validation is done, mark tier 1 as in_progress + if "validation" in emitted: + for field in ("macro", "liquidity"): + if field not in emitted and statuses.get(field) == "pending": + statuses[field] = "in_progress" + agent_name, stage = FIELD_AGENT_MAP[field] + evt = { + "type": "agent_update", + "agent": agent_name, + "stage": stage, + "status": "in_progress", + "stats": _stats(start_time, emitted), + } + state["events"].append(evt) + await q.put(evt) + + # If tier 1 done, mark tier 2 in_progress + if "macro" in emitted and "liquidity" in emitted: + tier2_fields = [ + "business_quality", "institutional_flow", "valuation", + "entry_timing", "earnings_revisions", "sector_rotation", + "backlog", "crowding", + ] + for field in tier2_fields: + if field not in emitted and statuses.get(field) == "pending": + statuses[field] = "in_progress" + agent_name, stage = FIELD_AGENT_MAP[field] + evt = { + "type": "agent_update", + "agent": agent_name, + "stage": stage, + "status": "in_progress", + "stats": _stats(start_time, emitted), + } + state["events"].append(evt) + await q.put(evt) + + +def _stats(start_time: float, emitted_fields: set) -> dict: + return { + "agents_done": len(emitted_fields), + "agents_total": len(FIELD_AGENT_MAP), + "elapsed": round(time.time() - start_time, 1), + } + + +def _format_report(field: str, value) -> str: + """Format a state field value as a readable report string.""" + if isinstance(value, dict): + if "summary_1_sentence" in value: + return value["summary_1_sentence"] + if "company_name" in value: + return f"{value.get('company_name', '')} ({value.get('ticker', '')}) — {value.get('sector', '')} / {value.get('industry', '')}" + return json.dumps(value, indent=2, default=str)[:500] + return str(value)[:500] async def run_analysis(analysis_id: str, ticker: str, trade_date: str): - """Background task: acquires semaphore, runs analysis with timeout.""" + """Background task with semaphore and timeout.""" state = analyses[analysis_id] q = state["queue"] async with _semaphore: try: await asyncio.wait_for( _run_analysis_inner(analysis_id, ticker, trade_date), - timeout=3600, # 60 minutes + timeout=3600, ) except asyncio.TimeoutError: print(f"[ANALYSIS] Timeout for {analysis_id}", flush=True) @@ -373,9 +356,8 @@ async def run_analysis(analysis_id: str, ticker: str, trade_date: str): await q.put(None) -# --- Memory cleanup background task --- +# --- Cleanup --- async def _cleanup_loop(): - """Remove analyses older than 30 minutes every 5 minutes.""" while True: await asyncio.sleep(300) now = time.time() @@ -412,22 +394,19 @@ async def start_analysis(req: AnalyzeRequest): @app.get("/analyze/{analysis_id}/stream", dependencies=[Depends(verify_api_key)]) async def stream_analysis(analysis_id: str, last_event: int = 0): - """Stream SSE events. Supports reconnection via ?last_event=N to replay missed events.""" + """Stream SSE events. Supports reconnection via ?last_event=N.""" if analysis_id not in analyses: raise HTTPException(404, "Analysis not found") state = analyses[analysis_id] async def event_generator(): idx = last_event - # Replay any events the client missed while idx < len(state["events"]): evt = state["events"][idx] idx += 1 yield {"id": str(idx), "data": json.dumps(evt)} - # If analysis already done after replay, stop if state["done"]: return - # Stream new events from queue q = state["queue"] while True: try: @@ -445,4 +424,4 @@ async def stream_analysis(analysis_id: str, last_event: int = 0): @app.get("/health") async def health(): - return {"status": "ok"} + return {"status": "ok", "engine": "structured_pipeline"} diff --git a/tradingagents/agents/structured/__init__.py b/tradingagents/agents/structured/__init__.py new file mode 100644 index 00000000..521fd573 --- /dev/null +++ b/tradingagents/agents/structured/__init__.py @@ -0,0 +1,47 @@ +"""Structured output agents for the equity ranking engine.""" + +from .tier1 import ( + create_validation_node, + create_macro_node, + create_liquidity_node, +) +from .tier2 import ( + create_business_quality_node, + create_institutional_flow_node, + create_valuation_node, + create_entry_timing_node, + create_earnings_revisions_node, + create_sector_rotation_node, + create_backlog_node, + create_crowding_node, + create_archetype_node, +) +from .tier3 import ( + create_bull_case_node, + create_bear_case_node, + create_debate_node, + create_risk_node, + create_final_decision_node, +) +from .scoring import create_scoring_node + +__all__ = [ + "create_validation_node", + "create_macro_node", + "create_liquidity_node", + "create_business_quality_node", + "create_institutional_flow_node", + "create_valuation_node", + "create_entry_timing_node", + "create_earnings_revisions_node", + "create_sector_rotation_node", + "create_backlog_node", + "create_crowding_node", + "create_archetype_node", + "create_bull_case_node", + "create_bear_case_node", + "create_debate_node", + "create_risk_node", + "create_final_decision_node", + "create_scoring_node", +] diff --git a/tradingagents/agents/structured/scoring.py b/tradingagents/agents/structured/scoring.py new file mode 100644 index 00000000..0e6975bf --- /dev/null +++ b/tradingagents/agents/structured/scoring.py @@ -0,0 +1,53 @@ +"""Deterministic scoring node — no LLM, pure Python. + +Computes master_score, applies confidence penalties, checks hard vetoes, +and assigns position roles. This is the heart of the deterministic pipeline. +""" + +from __future__ import annotations + +from typing import Any, Dict + +from tradingagents.models import ( + DataFlag, + apply_confidence_penalty, + assign_position_role, + compute_master_score, +) + + +def create_scoring_node(): + """Create the deterministic scoring node (no LLM needed).""" + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + # Extract scores from each agent output + bq = (state.get("business_quality") or {}).get("score_0_to_10", 5.0) + macro = (state.get("macro") or {}).get("macro_alignment_0_to_10", 5.0) + inst = (state.get("institutional_flow") or {}).get("score_0_to_10", 5.0) + val = (state.get("valuation") or {}).get("score_0_to_10", 5.0) + et = (state.get("entry_timing") or {}).get("score_0_to_10", 5.0) + er = (state.get("earnings_revisions") or {}).get("score_0_to_10", 5.0) + bl = (state.get("backlog") or {}).get("score_0_to_10", 5.0) + cr = (state.get("crowding") or {}).get("score_0_to_10", 5.0) + + master = compute_master_score(bq, macro, inst, val, et, er, bl, cr) + + # Collect all data quality flags + all_flags = [] + for f in (state.get("global_flags") or []): + if isinstance(f, dict): + all_flags.append(DataFlag(**f)) + elif isinstance(f, DataFlag): + all_flags.append(f) + + hard_veto = state.get("hard_veto", False) + adjusted = apply_confidence_penalty(master, all_flags, hard_veto) + role = assign_position_role(adjusted) + + return { + "master_score": master, + "adjusted_score": adjusted, + "position_role": role, + } + + return node diff --git a/tradingagents/agents/structured/tier1.py b/tradingagents/agents/structured/tier1.py new file mode 100644 index 00000000..a0e6bdd3 --- /dev/null +++ b/tradingagents/agents/structured/tier1.py @@ -0,0 +1,261 @@ +"""Tier 1 agents: Validation, Macro Regime, Liquidity. + +Tier 1 is cheap and fast — runs on every stock. Validation is deterministic +(no LLM). Macro and Liquidity use the quick-thinking LLM. +""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Dict + +import yfinance as yf + +from tradingagents.models import ( + CompanyCard, + DataFlag, + LiquidityOutput, + MacroRegimeOutput, + ValidationOutput, + invoke_structured, +) + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _fmt_num(val): + if val is None: + return None + if abs(val) >= 1e12: + return f"${val / 1e12:.2f}T" + if abs(val) >= 1e9: + return f"${val / 1e9:.2f}B" + if abs(val) >= 1e6: + return f"${val / 1e6:.2f}M" + return f"${val:,.0f}" + + +def _fetch_yf_info(ticker: str) -> dict: + """Fetch yfinance info dict for a ticker.""" + try: + t = yf.Ticker(ticker.upper()) + return t.info or {} + except Exception as e: + logger.warning("yfinance fetch failed for %s: %s", ticker, e) + return {} + + +def _fetch_macro_data() -> dict: + """Fetch macro indicators via yfinance.""" + from tradingagents.dataflows.y_finance import get_macro_indicators + + try: + raw = get_macro_indicators() + return json.loads(raw) if isinstance(raw, str) else raw + except Exception as e: + logger.warning("Macro data fetch failed: %s", e) + return {} + + +# --------------------------------------------------------------------------- +# Validation (deterministic — no LLM) +# --------------------------------------------------------------------------- + +def create_validation_node(llm=None): + """Validation + CompanyCard node. Does NOT use LLM — purely data-driven.""" + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + info = _fetch_yf_info(ticker) + + # No data at all → hard veto + company_name = info.get("longName") or info.get("shortName") or "" + if not company_name: + v = ValidationOutput( + ticker_valid=False, + ticker_resolved=ticker.upper(), + company_name="", + veto=True, + veto_reason=f"No company data found for {ticker}", + data_quality_flags=[ + DataFlag(field="ticker", severity="severe", + message=f"No data for {ticker}") + ], + ) + return { + "validation": v.model_dump(), + "hard_veto": True, + "hard_veto_reason": v.veto_reason, + "global_flags": [ + DataFlag(field="ticker", severity="severe", + message=f"No data for {ticker}").model_dump() + ], + } + + validation = ValidationOutput( + ticker_valid=True, + ticker_resolved=ticker.upper(), + company_name=company_name, + company_name_match=True, + exchange=info.get("exchange"), + sector=info.get("sector"), + industry=info.get("industry"), + is_active=True, + ) + + # Build company card + mc = info.get("marketCap") + if mc and mc >= 10e9: + mc_cat = "large_cap" + elif mc and mc >= 2e9: + mc_cat = "mid_cap" + elif mc and mc >= 300e6: + mc_cat = "small_cap" + else: + mc_cat = "micro_cap" if mc else "unknown" + + card = CompanyCard( + company_name=company_name, + ticker=ticker.upper(), + sector=info.get("sector", "Unknown"), + industry=info.get("industry", "Unknown"), + description=(info.get("longBusinessSummary") or "")[:500], + market_cap=mc, + market_cap_formatted=_fmt_num(mc), + market_cap_category=mc_cat, + current_price=info.get("currentPrice") or info.get("regularMarketPrice"), + revenue=info.get("totalRevenue"), + profit_margins=info.get("profitMargins"), + employees=info.get("fullTimeEmployees"), + ) + + return { + "validation": validation.model_dump(), + "company_card": card.model_dump(), + } + + return node + + +# --------------------------------------------------------------------------- +# Macro Regime +# --------------------------------------------------------------------------- + +def create_macro_node(llm): + """Macro regime analysis node — uses quick LLM.""" + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + macro_data = _fetch_macro_data() + card = state.get("company_card") or {} + sector = card.get("sector", "Unknown") + + spy_perf = (macro_data.get("sector_performance") or {}).get("SPY", {}) + sector_perfs = macro_data.get("sector_performance") or {} + + # Build compact sector table + sector_lines = [] + for etf, data in sorted(sector_perfs.items()): + r1 = data.get("return_1m") + name = data.get("name", etf) + if r1 is not None: + sector_lines.append(f" {etf} ({name}): {r1:+.1f}% 1M") + + prompt = f"""You are a Macro Regime Analyst in a structured equity ranking pipeline. + +Ticker: {ticker} | Sector: {sector} + +MACRO DATA: +- VIX: {macro_data.get('vix_level', 'N/A')} +- 10Y Yield: {macro_data.get('ten_year_yield', 'N/A')}% +- Dollar 1M: {macro_data.get('dollar_1m_return', 'N/A')}% +- Credit Spreads: {macro_data.get('credit_spread_direction', 'N/A')} +- SPY 1M: {spy_perf.get('return_1m', 'N/A')}% + +SECTOR PERFORMANCE (1M): +{chr(10).join(sector_lines[:12]) or 'N/A'} + +INSTRUCTIONS: +1. Classify the macro regime (risk-on / risk-off / transitional). +2. Score macro_alignment_0_to_10: how well the current macro supports {ticker} specifically. + - 0 = macro is hostile to this stock; 10 = macro tailwinds are perfect. +3. Also provide the standard score_0_to_10 (overall macro health). +4. List key positives, negatives, risks for the macro environment. +5. Be concise. One sentence summary.""" + + try: + result = invoke_structured(llm, MacroRegimeOutput, prompt) + except Exception as e: + logger.warning("Macro LLM call failed: %s", e) + result = MacroRegimeOutput( + score_0_to_10=5.0, confidence_0_to_1=0.1, + summary_1_sentence="Macro analysis unavailable", + data_quality_flags=[ + DataFlag(field="macro", severity="moderate", message=str(e)) + ], + ) + + # Override with actual fetched data + result.vix_level = macro_data.get("vix_level") + result.vix_regime = macro_data.get("vix_regime", "unknown") + result.ten_year_yield = macro_data.get("ten_year_yield") + result.dollar_strength = macro_data.get("dollar_strength", "unknown") + result.credit_spread_direction = macro_data.get( + "credit_spread_direction", "unknown" + ) + result.spy_1m_return = spy_perf.get("return_1m") + + flags = [f.model_dump() for f in result.data_quality_flags] + return {"macro": result.model_dump(), "global_flags": flags} + + return node + + +# --------------------------------------------------------------------------- +# Liquidity +# --------------------------------------------------------------------------- + +def create_liquidity_node(llm): + """Liquidity analysis node — uses quick LLM.""" + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + macro_data = _fetch_macro_data() + card = state.get("company_card") or {} + + prompt = f"""You are a Liquidity Analyst in a structured equity ranking pipeline. + +Ticker: {ticker} | Sector: {card.get('sector', 'Unknown')} + +AVAILABLE DATA: +- VIX: {macro_data.get('vix_level', 'N/A')} +- 10Y Yield: {macro_data.get('ten_year_yield', 'N/A')}% +- Credit Spreads: {macro_data.get('credit_spread_direction', 'N/A')} +- Dollar Strength: {macro_data.get('dollar_strength', 'N/A')} + +INSTRUCTIONS: +1. Assess Fed stance (dovish / neutral / hawkish) based on yield environment. +2. Assess market breadth (strong / moderate / weak). +3. Assess volume profile (above_average / average / below_average). +4. Assess SPY trend (uptrend / downtrend / sideways). +5. Score overall liquidity favorability 0-10 for this stock. +6. Be concise.""" + + try: + result = invoke_structured(llm, LiquidityOutput, prompt) + except Exception as e: + logger.warning("Liquidity LLM call failed: %s", e) + result = LiquidityOutput( + score_0_to_10=5.0, confidence_0_to_1=0.1, + summary_1_sentence="Liquidity analysis unavailable", + ) + + flags = [f.model_dump() for f in result.data_quality_flags] + return {"liquidity": result.model_dump(), "global_flags": flags} + + return node diff --git a/tradingagents/agents/structured/tier2.py b/tradingagents/agents/structured/tier2.py new file mode 100644 index 00000000..38acd66b --- /dev/null +++ b/tradingagents/agents/structured/tier2.py @@ -0,0 +1,542 @@ +"""Tier 2 agents: Deep analysis that runs only on Tier 1 survivors. + +Each agent fetches its own data via yfinance, calls the LLM once with +structured output, and returns a typed result into PipelineState. +""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Dict + +import yfinance as yf + +from tradingagents.models import ( + ArchetypeOutput, + BacklogOrderMomentumOutput, + BusinessQualityOutput, + DataFlag, + EarningsRevisionOutput, + EntryTimingOutput, + InstitutionalFlowOutput, + NarrativeCrowdingOutput, + SectorRotationOutput, + ValuationOutput, + invoke_structured, +) + +logger = logging.getLogger(__name__) + + +def _safe(info, key, default=None): + v = info.get(key) + return default if v is None else v + + +def _pct(v): + return f"{v * 100:.1f}%" if v is not None else "N/A" + + +# --------------------------------------------------------------------------- +# Business Quality +# --------------------------------------------------------------------------- + +def create_business_quality_node(llm): + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + card = state.get("company_card") or {} + + try: + t = yf.Ticker(ticker.upper()) + info = t.info or {} + except Exception: + info = {} + + prompt = f"""You are a Business Quality Analyst in a structured equity ranking pipeline. + +Ticker: {ticker} | Sector: {card.get('sector', 'Unknown')} | Industry: {card.get('industry', 'Unknown')} +Market Cap: {card.get('market_cap_formatted', 'N/A')} + +FINANCIALS: +- Revenue Growth: {_pct(_safe(info, 'revenueGrowth'))} +- Profit Margins: {_pct(_safe(info, 'profitMargins'))} +- Operating Margins: {_pct(_safe(info, 'operatingMargins'))} +- ROE: {_pct(_safe(info, 'returnOnEquity'))} +- ROA: {_pct(_safe(info, 'returnOnAssets'))} +- Debt/Equity: {_safe(info, 'debtToEquity', 'N/A')} +- Free Cash Flow: {_safe(info, 'freeCashflow', 'N/A')} +- Current Ratio: {_safe(info, 'currentRatio', 'N/A')} + +INSTRUCTIONS: +1. Score business quality 0-10 based on margins, growth, returns, balance sheet. +2. Classify competitive moat: wide / narrow / none. +3. Classify management quality: strong / adequate / weak. +4. List positives, negatives, risks. Be concise.""" + + try: + result = invoke_structured(llm, BusinessQualityOutput, prompt) + except Exception as e: + logger.warning("BusinessQuality LLM failed: %s", e) + result = BusinessQualityOutput( + score_0_to_10=5.0, confidence_0_to_1=0.1, + summary_1_sentence="Business quality analysis unavailable", + ) + + # Override with actual data + result.revenue_growth = _safe(info, "revenueGrowth") + result.profit_margins = _safe(info, "profitMargins") + result.operating_margins = _safe(info, "operatingMargins") + result.return_on_equity = _safe(info, "returnOnEquity") + result.return_on_assets = _safe(info, "returnOnAssets") + result.debt_to_equity = _safe(info, "debtToEquity") + result.free_cashflow = _safe(info, "freeCashflow") + + flags = [f.model_dump() for f in result.data_quality_flags] + return {"business_quality": result.model_dump(), "global_flags": flags} + + return node + + +# --------------------------------------------------------------------------- +# Institutional Flow +# --------------------------------------------------------------------------- + +def create_institutional_flow_node(llm): + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + + from tradingagents.dataflows.y_finance import get_institutional_flow + try: + raw = get_institutional_flow(ticker) + data = json.loads(raw) if isinstance(raw, str) else raw + except Exception: + data = {} + + prompt = f"""You are an Institutional Flow Analyst in a structured equity ranking pipeline. + +Ticker: {ticker} + +FLOW DATA: +- Institutional Ownership: {data.get('held_percent_institutions', 'N/A')}% +- Insider Ownership: {data.get('held_percent_insiders', 'N/A')}% +- Volume Ratio (10d/avg): {data.get('volume_ratio', 'N/A')} +- Short % of Float: {data.get('short_pct_of_float', 'N/A')}% +- Short Ratio (days): {data.get('short_ratio', 'N/A')} +- Float Turnover 5d: {data.get('float_turnover_5d_pct', 'N/A')}% + +INSTRUCTIONS: +1. Score institutional flow signal 0-10. + High ownership + rising volume + low short interest = bullish. +2. Classify accumulation_signal: accumulating / distributing / neutral. +3. This score has 15% weight in the master score — make it count.""" + + try: + result = invoke_structured(llm, InstitutionalFlowOutput, prompt) + except Exception as e: + logger.warning("InstitutionalFlow LLM failed: %s", e) + result = InstitutionalFlowOutput( + score_0_to_10=5.0, confidence_0_to_1=0.1, + summary_1_sentence="Institutional flow analysis unavailable", + ) + + result.institutional_ownership_pct = data.get("held_percent_institutions") + result.insider_ownership_pct = data.get("held_percent_insiders") + result.volume_ratio = data.get("volume_ratio") + result.short_interest_pct = data.get("short_pct_of_float") + result.short_ratio = data.get("short_ratio") + result.float_turnover_pct = data.get("float_turnover_5d_pct") + + flags = [f.model_dump() for f in result.data_quality_flags] + return {"institutional_flow": result.model_dump(), "global_flags": flags} + + return node + + +# --------------------------------------------------------------------------- +# Valuation +# --------------------------------------------------------------------------- + +def create_valuation_node(llm): + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + + from tradingagents.dataflows.y_finance import get_valuation_peers + try: + raw = get_valuation_peers(ticker) + data = json.loads(raw) if isinstance(raw, str) else raw + except Exception: + data = {} + + prompt = f"""You are a Valuation Analyst in a structured equity ranking pipeline. + +Ticker: {ticker} + +VALUATION METRICS: +- Trailing P/E: {data.get('trailing_pe', 'N/A')} +- Forward P/E: {data.get('forward_pe', 'N/A')} +- PEG Ratio: {data.get('peg_ratio', 'N/A')} +- P/B: {data.get('price_to_book', 'N/A')} +- EV/EBITDA: {data.get('ev_to_ebitda', 'N/A')} +- P/S: {data.get('price_to_sales', 'N/A')} +- 52W Range Position: {data.get('vs_52w_range_pct', 'N/A')}% +- Revenue Growth: {data.get('revenue_growth', 'N/A')} +- Earnings Growth: {data.get('earnings_growth', 'N/A')} + +INSTRUCTIONS: +1. Score valuation attractiveness 0-10. + Low multiples relative to growth = high score. +2. Classify: undervalued / fair / overvalued. +3. Consider industry context (growth stocks deserve higher multiples).""" + + try: + result = invoke_structured(llm, ValuationOutput, prompt) + except Exception as e: + logger.warning("Valuation LLM failed: %s", e) + result = ValuationOutput( + score_0_to_10=5.0, confidence_0_to_1=0.1, + summary_1_sentence="Valuation analysis unavailable", + ) + + result.trailing_pe = data.get("trailing_pe") + result.forward_pe = data.get("forward_pe") + result.peg_ratio = data.get("peg_ratio") + result.price_to_book = data.get("price_to_book") + result.ev_to_ebitda = data.get("ev_to_ebitda") + result.price_to_sales = data.get("price_to_sales") + result.vs_52w_range_pct = data.get("vs_52w_range_pct") + + flags = [f.model_dump() for f in result.data_quality_flags] + return {"valuation": result.model_dump(), "global_flags": flags} + + return node + + +# --------------------------------------------------------------------------- +# Entry Timing +# --------------------------------------------------------------------------- + +def create_entry_timing_node(llm): + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + + try: + t = yf.Ticker(ticker.upper()) + info = t.info or {} + except Exception: + info = {} + + price = _safe(info, "currentPrice") or _safe(info, "regularMarketPrice") + ma50 = _safe(info, "fiftyDayAverage") + ma200 = _safe(info, "twoHundredDayAverage") + hi52 = _safe(info, "fiftyTwoWeekHigh") + lo52 = _safe(info, "fiftyTwoWeekLow") + + range_pct = None + if hi52 and lo52 and price and (hi52 - lo52) > 0: + range_pct = round(((price - lo52) / (hi52 - lo52)) * 100, 1) + + ma_rel = "unknown" + if ma50 and ma200: + ma_rel = "above" if ma50 > ma200 else "below" + + prompt = f"""You are an Entry Timing Analyst in a structured equity ranking pipeline. + +Ticker: {ticker} + +TECHNICALS: +- Price: ${price or 'N/A'} +- 50-day MA: ${ma50 or 'N/A'} +- 200-day MA: ${ma200 or 'N/A'} +- 50d vs 200d: {ma_rel} +- 52W High: ${hi52 or 'N/A'} +- 52W Low: ${lo52 or 'N/A'} +- Position in 52W Range: {range_pct or 'N/A'}% + +INSTRUCTIONS: +1. Score entry timing 0-10. + Pullback to support in uptrend = high score. Overextended at highs = low score. +2. Classify timing_verdict: favorable / neutral / unfavorable. +3. Be concise.""" + + try: + result = invoke_structured(llm, EntryTimingOutput, prompt) + except Exception as e: + logger.warning("EntryTiming LLM failed: %s", e) + result = EntryTimingOutput( + score_0_to_10=5.0, confidence_0_to_1=0.1, + summary_1_sentence="Entry timing analysis unavailable", + ) + + result.current_price = price + result.fifty_day_avg = ma50 + result.two_hundred_day_avg = ma200 + result.fifty_day_vs_200_day = ma_rel + result.vs_52w_range_pct = range_pct + + flags = [f.model_dump() for f in result.data_quality_flags] + return {"entry_timing": result.model_dump(), "global_flags": flags} + + return node + + +# --------------------------------------------------------------------------- +# Earnings Revisions +# --------------------------------------------------------------------------- + +def create_earnings_revisions_node(llm): + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + + from tradingagents.dataflows.y_finance import get_earnings_estimates + try: + raw = get_earnings_estimates(ticker) + data = json.loads(raw) if isinstance(raw, str) else raw + except Exception: + data = {} + + recs = data.get("recent_recommendations", []) + targets = data.get("price_targets", {}) + upside = data.get("price_target_upside_pct") + + prompt = f"""You are an Earnings Revisions Analyst in a structured equity ranking pipeline. + +Ticker: {ticker} + +EARNINGS DATA: +- Trailing EPS: {data.get('trailing_eps', 'N/A')} +- Forward EPS: {data.get('forward_eps', 'N/A')} +- Price Target Upside: {upside or 'N/A'}% +- Price Targets: {json.dumps(targets)[:300] if targets else 'N/A'} +- Recent Recommendations: {len(recs)} entries + +INSTRUCTIONS: +1. Score earnings revision momentum 0-10. + Rising estimates + strong buy consensus + upside = high score. +2. Classify eps_revision_direction: up / down / flat. +3. Classify revenue_revision_direction: up / down / flat. +4. Classify analyst_consensus: strong_buy / buy / hold / sell / strong_sell. +5. This score has 10% weight in the master score — must materially affect it.""" + + try: + result = invoke_structured(llm, EarningsRevisionOutput, prompt) + except Exception as e: + logger.warning("EarningsRevisions LLM failed: %s", e) + result = EarningsRevisionOutput( + score_0_to_10=5.0, confidence_0_to_1=0.1, + summary_1_sentence="Earnings revision analysis unavailable", + ) + + result.trailing_eps = data.get("trailing_eps") + result.forward_eps = data.get("forward_eps") + result.price_target_upside_pct = upside + + flags = [f.model_dump() for f in result.data_quality_flags] + return {"earnings_revisions": result.model_dump(), "global_flags": flags} + + return node + + +# --------------------------------------------------------------------------- +# Sector Rotation +# --------------------------------------------------------------------------- + +def create_sector_rotation_node(llm): + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + + from tradingagents.dataflows.y_finance import get_sector_rotation + try: + raw = get_sector_rotation(ticker) + data = json.loads(raw) if isinstance(raw, str) else raw + except Exception: + data = {} + + prompt = f"""You are a Sector Rotation Analyst in a structured equity ranking pipeline. + +Ticker: {ticker} | Sector: {data.get('sector', 'Unknown')} | Sector ETF: {data.get('sector_etf', 'N/A')} + +SECTOR DATA: +- Sector vs SPY 1M: {data.get('stock_sector_vs_spy_1m', 'N/A')}% +- Sector vs SPY 3M: {data.get('stock_sector_vs_spy_3m', 'N/A')}% +- Sector Rank: {data.get('stock_sector_rank', 'N/A')} / {data.get('total_sectors', 11)} + +INSTRUCTIONS: +1. Score sector rotation favorability 0-10. + Top-ranked sector with positive relative strength = high score. +2. Classify rotation_direction: inflow / outflow / neutral. +3. Be concise.""" + + try: + result = invoke_structured(llm, SectorRotationOutput, prompt) + except Exception as e: + logger.warning("SectorRotation LLM failed: %s", e) + result = SectorRotationOutput( + score_0_to_10=5.0, confidence_0_to_1=0.1, + summary_1_sentence="Sector rotation analysis unavailable", + ) + + result.sector = data.get("sector", "Unknown") + result.sector_etf = data.get("sector_etf") + result.sector_vs_spy_1m = data.get("stock_sector_vs_spy_1m") + result.sector_vs_spy_3m = data.get("stock_sector_vs_spy_3m") + result.sector_rank = data.get("stock_sector_rank") + + flags = [f.model_dump() for f in result.data_quality_flags] + return {"sector_rotation": result.model_dump(), "global_flags": flags} + + return node + + +# --------------------------------------------------------------------------- +# Backlog / Order Momentum +# --------------------------------------------------------------------------- + +def create_backlog_node(llm): + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + card = state.get("company_card") or {} + sector = card.get("sector", "Unknown") + industry = card.get("industry", "Unknown") + + # Backlog data is limited via yfinance — use revenue trajectory as proxy + try: + t = yf.Ticker(ticker.upper()) + info = t.info or {} + except Exception: + info = {} + + prompt = f"""You are a Backlog / Order Momentum Analyst in a structured equity ranking pipeline. + +Ticker: {ticker} | Sector: {sector} | Industry: {industry} + +AVAILABLE DATA: +- Revenue Growth: {_pct(_safe(info, 'revenueGrowth'))} +- Earnings Growth: {_pct(_safe(info, 'earningsGrowth'))} +- Revenue: {_safe(info, 'totalRevenue', 'N/A')} + +INSTRUCTIONS: +1. Assess if this company type typically has meaningful backlog data + (defense, industrials, semiconductors = yes; consumer, finance = no). +2. Score order momentum 0-10 based on revenue trajectory and industry context. +3. Set has_backlog_data=true only if this industry typically reports backlog. +4. This has 5% weight — be quick and concise.""" + + try: + result = invoke_structured(llm, BacklogOrderMomentumOutput, prompt) + except Exception as e: + logger.warning("Backlog LLM failed: %s", e) + result = BacklogOrderMomentumOutput( + score_0_to_10=5.0, confidence_0_to_1=0.3, + summary_1_sentence="Backlog analysis limited", + ) + + flags = [f.model_dump() for f in result.data_quality_flags] + return {"backlog": result.model_dump(), "global_flags": flags} + + return node + + +# --------------------------------------------------------------------------- +# Narrative Crowding +# --------------------------------------------------------------------------- + +def create_crowding_node(llm): + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + card = state.get("company_card") or {} + + try: + t = yf.Ticker(ticker.upper()) + info = t.info or {} + except Exception: + info = {} + + short_pct = None + float_shares = _safe(info, "floatShares") + shares_short = _safe(info, "sharesShort") + if float_shares and shares_short and float_shares > 0: + short_pct = round(shares_short / float_shares * 100, 2) + + prompt = f"""You are a Narrative Crowding Analyst in a structured equity ranking pipeline. + +Ticker: {ticker} | Company: {card.get('company_name', 'Unknown')} +Market Cap Category: {card.get('market_cap_category', 'unknown')} + +DATA: +- Short % of Float: {short_pct or 'N/A'}% +- Short Ratio (days): {_safe(info, 'shortRatio', 'N/A')} +- Analyst Coverage: implied from market cap ({card.get('market_cap_category', 'unknown')}) + +INSTRUCTIONS: +1. Score narrative crowding 0-10. + HIGH score = low crowding (contrarian, under-followed). + LOW score = extremely crowded (everyone owns it, consensus trade). +2. Assess narrative_saturation: low / moderate / high. +3. Flag contrarian_opportunity if stock is hated but fundamentals are intact. +4. Flag short_squeeze_potential if short interest is high (>15% of float). +5. This has 5% weight — be concise.""" + + try: + result = invoke_structured(llm, NarrativeCrowdingOutput, prompt) + except Exception as e: + logger.warning("Crowding LLM failed: %s", e) + result = NarrativeCrowdingOutput( + score_0_to_10=5.0, confidence_0_to_1=0.3, + summary_1_sentence="Crowding analysis limited", + ) + + flags = [f.model_dump() for f in result.data_quality_flags] + return {"crowding": result.model_dump(), "global_flags": flags} + + return node + + +# --------------------------------------------------------------------------- +# Archetype +# --------------------------------------------------------------------------- + +def create_archetype_node(llm): + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + card = state.get("company_card") or {} + bq = state.get("business_quality") or {} + + prompt = f"""You are a Company Archetype Classifier. + +Company: {card.get('company_name', 'Unknown')} ({card.get('ticker', '?')}) +Sector: {card.get('sector', 'Unknown')} | Industry: {card.get('industry', 'Unknown')} +Market Cap: {card.get('market_cap_formatted', 'N/A')} +Description: {card.get('description', 'N/A')[:300]} + +Competitive Moat: {bq.get('competitive_moat', 'N/A')} +Revenue Growth: {bq.get('revenue_growth', 'N/A')} + +ARCHETYPES (pick exactly one): +- Infrastructure Builder: builds platforms/networks others depend on +- Bottleneck Supplier: controls scarce supply in a critical chain +- Platform Company: multi-sided marketplace with network effects +- Commodity Leverage: earnings levered to commodity prices +- Secular Growth Innovator: disrupting with new tech/business model +- Turnaround: beaten-down company with improving fundamentals +- Defensive Compounder: steady earnings, dividend grower, low vol + +Return archetype, confidence (0-1), and one-sentence reasoning.""" + + try: + result = invoke_structured(llm, ArchetypeOutput, prompt) + except Exception as e: + logger.warning("Archetype LLM failed: %s", e) + result = ArchetypeOutput() + + return {"archetype": result.model_dump()} + + return node diff --git a/tradingagents/agents/structured/tier3.py b/tradingagents/agents/structured/tier3.py new file mode 100644 index 00000000..93488323 --- /dev/null +++ b/tradingagents/agents/structured/tier3.py @@ -0,0 +1,358 @@ +"""Tier 3 agents: Bull/Bear debate, Risk assessment, Final decision. + +Only runs on stocks that pass Tier 1 + Tier 2. Uses the deep-thinking LLM +for reasoning-heavy tasks (debate, risk, final synthesis). +""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Dict + +from tradingagents.models import ( + BearCaseOutput, + BullCaseOutput, + DataFlag, + DebateRefereeOutput, + FinalDecisionOutput, + RiskInvalidationOutput, + invoke_structured, +) + +logger = logging.getLogger(__name__) + + +def _summarize_tier2(state: Dict[str, Any]) -> str: + """Build a compact summary of all Tier 1+2 findings for Tier 3 prompts.""" + card = state.get("company_card") or {} + macro = state.get("macro") or {} + liq = state.get("liquidity") or {} + bq = state.get("business_quality") or {} + inst = state.get("institutional_flow") or {} + val = state.get("valuation") or {} + et = state.get("entry_timing") or {} + er = state.get("earnings_revisions") or {} + sr = state.get("sector_rotation") or {} + bl = state.get("backlog") or {} + cr = state.get("crowding") or {} + arch = state.get("archetype") or {} + + lines = [ + f"Company: {card.get('company_name', '?')} ({card.get('ticker', '?')})", + f"Sector: {card.get('sector', '?')} | Industry: {card.get('industry', '?')}", + f"Market Cap: {card.get('market_cap_formatted', 'N/A')}", + f"Price: ${card.get('current_price', 'N/A')}", + f"Archetype: {arch.get('archetype', 'N/A')}", + "", + f"Master Score: {state.get('master_score', 'N/A')} | Role: {state.get('position_role', 'N/A')}", + "", + "AGENT SCORES (0-10):", + f" Business Quality: {bq.get('score_0_to_10', 'N/A')} — {bq.get('summary_1_sentence', '')}", + f" Macro Alignment: {macro.get('macro_alignment_0_to_10', 'N/A')} — {macro.get('summary_1_sentence', '')}", + f" Institutional Flow: {inst.get('score_0_to_10', 'N/A')} — {inst.get('summary_1_sentence', '')}", + f" Valuation: {val.get('score_0_to_10', 'N/A')} — {val.get('summary_1_sentence', '')}", + f" Entry Timing: {et.get('score_0_to_10', 'N/A')} — {et.get('summary_1_sentence', '')}", + f" Earnings Revisions: {er.get('score_0_to_10', 'N/A')} — {er.get('summary_1_sentence', '')}", + f" Sector Rotation: {sr.get('score_0_to_10', 'N/A')} — {sr.get('summary_1_sentence', '')}", + f" Backlog: {bl.get('score_0_to_10', 'N/A')} — {bl.get('summary_1_sentence', '')}", + f" Crowding: {cr.get('score_0_to_10', 'N/A')} — {cr.get('summary_1_sentence', '')}", + f" Liquidity: {liq.get('score_0_to_10', 'N/A')} — {liq.get('summary_1_sentence', '')}", + "", + f" Macro Regime: {macro.get('regime_label', '?')} | VIX: {macro.get('vix_level', '?')}", + f" Moat: {bq.get('competitive_moat', '?')} | Valuation: {val.get('valuation_verdict', '?')}", + f" Accumulation: {inst.get('accumulation_signal', '?')} | Timing: {et.get('timing_verdict', '?')}", + ] + return "\n".join(lines) + + +# --------------------------------------------------------------------------- +# Bull Case +# --------------------------------------------------------------------------- + +def create_bull_case_node(llm): + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + summary = _summarize_tier2(state) + + prompt = f"""You are a Bull Case Researcher. Build the strongest possible bullish thesis for {ticker}. + +{summary} + +INSTRUCTIONS: +1. Write a concise thesis (2-3 sentences) for why this stock should be bought. +2. List 3-5 specific catalysts that could drive the stock higher. +3. Estimate upside_target (price) and upside_pct from current price. +4. List key assumptions your thesis depends on. +5. List thesis_invalidation_triggers — what would kill the bull case. +6. Set confidence 0-1 for how strong the bull case is. + +Attack the investment aggressively. Find every reason to be bullish. +But be honest — don't fabricate catalysts. Use the data above.""" + + try: + result = invoke_structured(llm, BullCaseOutput, prompt) + except Exception as e: + logger.warning("BullCase LLM failed: %s", e) + result = BullCaseOutput( + thesis="Bull case analysis unavailable", + confidence_0_to_1=0.1, + ) + + return {"bull_case": result.model_dump()} + + return node + + +# --------------------------------------------------------------------------- +# Bear Case +# --------------------------------------------------------------------------- + +def create_bear_case_node(llm): + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + summary = _summarize_tier2(state) + + prompt = f"""You are a Bear Case Researcher. Build the strongest possible bearish thesis for {ticker}. + +{summary} + +INSTRUCTIONS: +1. Write a concise thesis (2-3 sentences) for why this stock should be avoided or sold. +2. List 3-5 specific risks that could drive the stock lower. +3. Estimate downside_target (price) and downside_pct from current price. +4. List key assumptions your bear thesis depends on. +5. List thesis_invalidation_triggers — what would kill the bear case. +6. Set confidence 0-1 for how strong the bear case is. + +Be ruthless. Find every vulnerability, every overvaluation, every risk. +But be honest — don't fabricate risks. Use the data above.""" + + try: + result = invoke_structured(llm, BearCaseOutput, prompt) + except Exception as e: + logger.warning("BearCase LLM failed: %s", e) + result = BearCaseOutput( + thesis="Bear case analysis unavailable", + confidence_0_to_1=0.1, + ) + + return {"bear_case": result.model_dump()} + + return node + + +# --------------------------------------------------------------------------- +# Debate Referee +# --------------------------------------------------------------------------- + +def create_debate_node(llm): + """Referee that evaluates bull vs bear case.""" + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + bull = state.get("bull_case") or {} + bear = state.get("bear_case") or {} + + prompt = f"""You are the Debate Referee. Evaluate the bull vs bear case for {ticker}. + +BULL CASE (confidence: {bull.get('confidence_0_to_1', 'N/A')}): +Thesis: {bull.get('thesis', 'N/A')} +Catalysts: {', '.join(bull.get('catalysts', []))} +Upside: {bull.get('upside_pct', 'N/A')}% +Invalidation: {', '.join(bull.get('thesis_invalidation_triggers', []))} + +BEAR CASE (confidence: {bear.get('confidence_0_to_1', 'N/A')}): +Thesis: {bear.get('thesis', 'N/A')} +Risks: {', '.join(bear.get('risks', []))} +Downside: {bear.get('downside_pct', 'N/A')}% +Invalidation: {', '.join(bear.get('thesis_invalidation_triggers', []))} + +MASTER SCORE: {state.get('master_score', 'N/A')} | ROLE: {state.get('position_role', 'N/A')} + +INSTRUCTIONS: +1. Declare winner: "bull" or "bear". +2. Score each side 0-10 on argument strength. +3. List key unresolved questions. +4. Set net_conviction_adjustment (-2 to +2) to modify the master score. + Positive = debate strengthened the bull case. Negative = weakened it. +5. Provide reasoning for your decision.""" + + try: + result = invoke_structured(llm, DebateRefereeOutput, prompt) + except Exception as e: + logger.warning("Debate LLM failed: %s", e) + result = DebateRefereeOutput() + + return {"debate": result.model_dump()} + + return node + + +# --------------------------------------------------------------------------- +# Risk / Invalidation +# --------------------------------------------------------------------------- + +def create_risk_node(llm): + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + summary = _summarize_tier2(state) + bull = state.get("bull_case") or {} + bear = state.get("bear_case") or {} + debate = state.get("debate") or {} + + prompt = f"""You are the Risk / Invalidation Analyst. Final risk gate for {ticker}. + +{summary} + +DEBATE OUTCOME: {debate.get('winner', '?')} won + Bull strength: {debate.get('bull_strength_0_to_10', '?')}/10 + Bear strength: {debate.get('bear_strength_0_to_10', '?')}/10 + Conviction adjustment: {debate.get('net_conviction_adjustment', 0)} + +Bear risks: {', '.join(bear.get('risks', []))} +Bull invalidation triggers: {', '.join(bull.get('thesis_invalidation_triggers', []))} + +INSTRUCTIONS: +1. Classify overall_risk_level: low / medium / high. +2. Set max_position_size_pct (0-100). Low risk = up to 10%. High risk = max 2%. +3. Suggest stop_loss_pct (distance from entry to stop). +4. List invalidation_triggers — concrete events that should trigger exit. +5. Score overall risk-reward 0-10 (10 = great risk/reward). +6. Set veto=true ONLY if you find impossible/fraudulent data, or risk is so extreme + that no position should be taken. This is a hard kill switch. +7. Be concise.""" + + try: + result = invoke_structured(llm, RiskInvalidationOutput, prompt) + except Exception as e: + logger.warning("Risk LLM failed: %s", e) + result = RiskInvalidationOutput( + score_0_to_10=5.0, confidence_0_to_1=0.3, + summary_1_sentence="Risk analysis unavailable", + ) + + flags = [f.model_dump() for f in result.data_quality_flags] + update: Dict[str, Any] = {"risk": result.model_dump(), "global_flags": flags} + + if result.veto: + update["hard_veto"] = True + update["hard_veto_reason"] = result.veto_reason + + return update + + return node + + +# --------------------------------------------------------------------------- +# Final Decision (prose generated AFTER all scoring) +# --------------------------------------------------------------------------- + +def create_final_decision_node(llm): + + def node(state: Dict[str, Any]) -> Dict[str, Any]: + ticker = state["ticker"] + card = state.get("company_card") or {} + summary = _summarize_tier2(state) + + bull = state.get("bull_case") or {} + bear = state.get("bear_case") or {} + debate = state.get("debate") or {} + risk = state.get("risk") or {} + + master_score = state.get("master_score", 0) + adjusted_score = state.get("adjusted_score", 0) + position_role = state.get("position_role", "Avoid") + conviction_adj = debate.get("net_conviction_adjustment", 0) + + # Apply debate conviction adjustment + final_score = round(adjusted_score + conviction_adj, 2) + final_role = _role_from_score(final_score) + + # Determine action + if state.get("hard_veto"): + action = "AVOID" + final_role = "Avoid" + final_score = 0.0 + elif final_score >= 70: + action = "BUY" + elif final_score >= 50: + action = "HOLD" + else: + action = "AVOID" + + prompt = f"""You are the Final Decision Synthesizer for {ticker}. + +{summary} + +DEBATE: {debate.get('winner', '?')} won | Conviction adjustment: {conviction_adj:+.1f} +RISK: {risk.get('overall_risk_level', '?')} | Max position: {risk.get('max_position_size_pct', '?')}% + +FINAL SCORES: + Master Score: {master_score} + Adjusted Score: {adjusted_score} (after data quality penalties) + Post-Debate Score: {final_score} (after conviction adjustment) + Position Role: {final_role} + Action: {action} + +INSTRUCTIONS: +Write a concise narrative (3-5 sentences) that: +1. Summarizes the investment thesis. +2. Highlights the top 2-3 catalysts and top 2-3 risks. +3. States the action ({action}) and position role ({final_role}). +4. Notes what would change the thesis (invalidation triggers). + +Also provide: +- thesis_summary (one sentence) +- key_catalysts (top 3 from bull case) +- key_risks (top 3 from bear case) +- invalidation_triggers (from risk agent) +- position_sizing_pct (from risk agent) +- confidence (average of all agent confidences)""" + + try: + result = invoke_structured(llm, FinalDecisionOutput, prompt) + except Exception as e: + logger.warning("FinalDecision LLM failed: %s", e) + result = FinalDecisionOutput() + + # Override with computed values (deterministic, not LLM-driven) + result.ticker = ticker + result.company_name = card.get("company_name", "") + result.master_score = master_score + result.adjusted_score = final_score + result.position_role = final_role + result.action = action + result.risk_level = risk.get("overall_risk_level", "medium") + result.position_sizing_pct = risk.get("max_position_size_pct", 0) + + # Compute aggregate confidence + agents_with_confidence = [ + state.get(k, {}).get("confidence_0_to_1") + for k in ( + "macro", "liquidity", "business_quality", "institutional_flow", + "valuation", "entry_timing", "earnings_revisions", + "sector_rotation", "backlog", "crowding", + ) + ] + valid_confs = [c for c in agents_with_confidence if c is not None] + result.confidence = round(sum(valid_confs) / len(valid_confs), 2) if valid_confs else 0.5 + + return {"final_decision": result.model_dump()} + + return node + + +def _role_from_score(score: float) -> str: + if score > 80: + return "Core Position" + if score > 70: + return "Strong Position" + if score > 60: + return "Tactical / Satellite" + if score > 50: + return "Watchlist" + return "Avoid" diff --git a/tradingagents/agents/utils/agent_states.py b/tradingagents/agents/utils/agent_states.py index 813b00ee..9b55ead9 100644 --- a/tradingagents/agents/utils/agent_states.py +++ b/tradingagents/agents/utils/agent_states.py @@ -1,76 +1,105 @@ -from typing import Annotated, Sequence -from datetime import date, timedelta, datetime -from typing_extensions import TypedDict, Optional +"""State definitions for the TradingAgents pipeline. + +PipelineState is the new structured state used by the equity ranking engine. +Legacy state types are preserved for backward compatibility. +""" + +from __future__ import annotations + +import operator +from typing import Annotated, Optional, Sequence + +from typing_extensions import TypedDict + from langchain_openai import ChatOpenAI -from tradingagents.agents import * -from langgraph.prebuilt import ToolNode -from langgraph.graph import END, StateGraph, START, MessagesState +from langgraph.graph import MessagesState -# Researcher team state +# --------------------------------------------------------------------------- +# New structured pipeline state +# --------------------------------------------------------------------------- + +class PipelineState(TypedDict): + """Shared state for the structured equity ranking pipeline. + + Each agent writes its output as a dict (Pydantic .model_dump()). + The scoring node computes master_score/adjusted_score deterministically. + global_flags uses operator.add to accumulate across all agents. + """ + ticker: str + trade_date: str + + # Tier 1 + validation: Optional[dict] + company_card: Optional[dict] + macro: Optional[dict] + liquidity: Optional[dict] + + # Tier 2 + sector_rotation: Optional[dict] + business_quality: Optional[dict] + institutional_flow: Optional[dict] + valuation: Optional[dict] + entry_timing: Optional[dict] + earnings_revisions: Optional[dict] + backlog: Optional[dict] + crowding: Optional[dict] + archetype: Optional[dict] + + # Scoring (deterministic) + master_score: Optional[float] + adjusted_score: Optional[float] + position_role: Optional[str] + + # Tier 3 + bull_case: Optional[dict] + bear_case: Optional[dict] + debate: Optional[dict] + risk: Optional[dict] + final_decision: Optional[dict] + + # Control + hard_veto: bool + hard_veto_reason: Optional[str] + global_flags: Annotated[list, operator.add] + + +# --------------------------------------------------------------------------- +# Legacy state types (preserved for backward compatibility) +# --------------------------------------------------------------------------- + class InvestDebateState(TypedDict): - bull_history: Annotated[ - str, "Bullish Conversation history" - ] # Bullish Conversation history - bear_history: Annotated[ - str, "Bearish Conversation history" - ] # Bullish Conversation history - history: Annotated[str, "Conversation history"] # Conversation history - current_response: Annotated[str, "Latest response"] # Last response - judge_decision: Annotated[str, "Final judge decision"] # Last response - count: Annotated[int, "Length of the current conversation"] # Conversation length + bull_history: Annotated[str, "Bullish Conversation history"] + bear_history: Annotated[str, "Bearish Conversation history"] + history: Annotated[str, "Conversation history"] + current_response: Annotated[str, "Latest response"] + judge_decision: Annotated[str, "Final judge decision"] + count: Annotated[int, "Length of the current conversation"] -# Risk management team state class RiskDebateState(TypedDict): - aggressive_history: Annotated[ - str, "Aggressive Agent's Conversation history" - ] # Conversation history - conservative_history: Annotated[ - str, "Conservative Agent's Conversation history" - ] # Conversation history - neutral_history: Annotated[ - str, "Neutral Agent's Conversation history" - ] # Conversation history - history: Annotated[str, "Conversation history"] # Conversation history + aggressive_history: Annotated[str, "Aggressive Agent's Conversation history"] + conservative_history: Annotated[str, "Conservative Agent's Conversation history"] + neutral_history: Annotated[str, "Neutral Agent's Conversation history"] + history: Annotated[str, "Conversation history"] latest_speaker: Annotated[str, "Analyst that spoke last"] - current_aggressive_response: Annotated[ - str, "Latest response by the aggressive analyst" - ] # Last response - current_conservative_response: Annotated[ - str, "Latest response by the conservative analyst" - ] # Last response - current_neutral_response: Annotated[ - str, "Latest response by the neutral analyst" - ] # Last response + current_aggressive_response: Annotated[str, "Latest response by the aggressive analyst"] + current_conservative_response: Annotated[str, "Latest response by the conservative analyst"] + current_neutral_response: Annotated[str, "Latest response by the neutral analyst"] judge_decision: Annotated[str, "Judge's decision"] - count: Annotated[int, "Length of the current conversation"] # Conversation length + count: Annotated[int, "Length of the current conversation"] class AgentState(MessagesState): company_of_interest: Annotated[str, "Company that we are interested in trading"] trade_date: Annotated[str, "What date we are trading at"] - sender: Annotated[str, "Agent that sent this message"] - - # research step market_report: Annotated[str, "Report from the Market Analyst"] sentiment_report: Annotated[str, "Report from the Social Media Analyst"] - news_report: Annotated[ - str, "Report from the News Researcher of current world affairs" - ] + news_report: Annotated[str, "Report from the News Researcher"] fundamentals_report: Annotated[str, "Report from the Fundamentals Researcher"] - - # researcher team discussion step - investment_debate_state: Annotated[ - InvestDebateState, "Current state of the debate on if to invest or not" - ] + investment_debate_state: Annotated[InvestDebateState, "Current state of the investment debate"] investment_plan: Annotated[str, "Plan generated by the Analyst"] - trader_investment_plan: Annotated[str, "Plan generated by the Trader"] - - # risk management team discussion step - risk_debate_state: Annotated[ - RiskDebateState, "Current state of the debate on evaluating risk" - ] + risk_debate_state: Annotated[RiskDebateState, "Current state of the risk debate"] final_trade_decision: Annotated[str, "Final decision made by the Risk Analysts"] diff --git a/tradingagents/agents/utils/macro_data_tools.py b/tradingagents/agents/utils/macro_data_tools.py new file mode 100644 index 00000000..2172cff9 --- /dev/null +++ b/tradingagents/agents/utils/macro_data_tools.py @@ -0,0 +1,594 @@ +"""Macro-aware data tools for the structured equity ranking engine. + +These tools fetch company profile, macro regime, sector rotation, +institutional flow, earnings estimates, and valuation data via yfinance. +They are used directly by analyst agents (not routed through interface.py). +""" + +from langchain_core.tools import tool +from typing import Annotated +from datetime import datetime +from dateutil.relativedelta import relativedelta +import json + + +def _safe_get(info, key, default=None): + """Safely get a value from yfinance info dict.""" + val = info.get(key) + if val is None: + return default + return val + + +def _fmt_large_number(val): + """Format large numbers for readability.""" + if val is None: + return None + if abs(val) >= 1e12: + return f"${val/1e12:.2f}T" + if abs(val) >= 1e9: + return f"${val/1e9:.2f}B" + if abs(val) >= 1e6: + return f"${val/1e6:.2f}M" + return f"${val:,.0f}" + + +def _market_cap_category(market_cap): + """Classify market cap size.""" + if market_cap is None: + return "unknown" + if market_cap >= 10e9: + return "large_cap" + if market_cap >= 2e9: + return "mid_cap" + if market_cap >= 300e6: + return "small_cap" + return "micro_cap" + + +# Sector to ETF mapping +SECTOR_ETF_MAP = { + "Technology": "XLK", + "Information Technology": "XLK", + "Communication Services": "XLC", + "Healthcare": "XLV", + "Health Care": "XLV", + "Financials": "XLF", + "Financial Services": "XLF", + "Consumer Discretionary": "XLY", + "Consumer Cyclical": "XLY", + "Consumer Staples": "XLP", + "Consumer Defensive": "XLP", + "Industrials": "XLI", + "Energy": "XLE", + "Utilities": "XLU", + "Materials": "XLB", + "Basic Materials": "XLB", + "Real Estate": "XLRE", +} + +ALL_SECTOR_ETFS = ["XLK", "XLC", "XLV", "XLF", "XLY", "XLP", "XLI", "XLE", "XLU", "XLB", "XLRE"] + + +def _get_period_return(ticker_obj, period_months, ref_date=None): + """Calculate return over a given period ending at ref_date.""" + import yfinance as yf + import pandas as pd + + try: + if ref_date: + end_dt = pd.to_datetime(ref_date) + else: + end_dt = pd.Timestamp.today() + + start_dt = end_dt - pd.DateOffset(months=period_months) + data = ticker_obj.history( + start=start_dt.strftime("%Y-%m-%d"), + end=end_dt.strftime("%Y-%m-%d"), + ) + if data.empty or len(data) < 2: + return None + return ((data["Close"].iloc[-1] / data["Close"].iloc[0]) - 1) * 100 + except Exception: + return None + + +@tool +def get_company_profile( + ticker: Annotated[str, "Ticker symbol of the company"], +) -> str: + """Fetch company profile: name, sector, industry, description, market cap, business model. + Returns structured text with all fields for the Company Intelligence Analyst. + """ + import yfinance as yf + + try: + t = yf.Ticker(ticker.upper()) + info = t.info + + if not info or not info.get("longName"): + return json.dumps({"error": f"No company data found for {ticker}", "ticker": ticker}) + + market_cap = _safe_get(info, "marketCap") + + profile = { + "company_name": _safe_get(info, "longName", "Unknown"), + "ticker": ticker.upper(), + "sector": _safe_get(info, "sector", "Unknown"), + "industry": _safe_get(info, "industry", "Unknown"), + "description": _safe_get(info, "longBusinessSummary", "No description available"), + "market_cap": market_cap, + "market_cap_formatted": _fmt_large_number(market_cap), + "market_cap_category": _market_cap_category(market_cap), + "trailing_pe": _safe_get(info, "trailingPE"), + "forward_pe": _safe_get(info, "forwardPE"), + "peg_ratio": _safe_get(info, "pegRatio"), + "price_to_book": _safe_get(info, "priceToBook"), + "dividend_yield": _safe_get(info, "dividendYield"), + "beta": _safe_get(info, "beta"), + "trailing_eps": _safe_get(info, "trailingEps"), + "forward_eps": _safe_get(info, "forwardEps"), + "revenue": _safe_get(info, "totalRevenue"), + "revenue_formatted": _fmt_large_number(_safe_get(info, "totalRevenue")), + "gross_profits": _safe_get(info, "grossProfits"), + "ebitda": _safe_get(info, "ebitda"), + "net_income": _safe_get(info, "netIncomeToCommon"), + "profit_margins": _safe_get(info, "profitMargins"), + "operating_margins": _safe_get(info, "operatingMargins"), + "return_on_equity": _safe_get(info, "returnOnEquity"), + "return_on_assets": _safe_get(info, "returnOnAssets"), + "debt_to_equity": _safe_get(info, "debtToEquity"), + "current_ratio": _safe_get(info, "currentRatio"), + "book_value": _safe_get(info, "bookValue"), + "free_cashflow": _safe_get(info, "freeCashflow"), + "fifty_two_week_high": _safe_get(info, "fiftyTwoWeekHigh"), + "fifty_two_week_low": _safe_get(info, "fiftyTwoWeekLow"), + "fifty_day_average": _safe_get(info, "fiftyDayAverage"), + "two_hundred_day_average": _safe_get(info, "twoHundredDayAverage"), + "average_volume": _safe_get(info, "averageVolume"), + "average_volume_10d": _safe_get(info, "averageVolume10days"), + "shares_outstanding": _safe_get(info, "sharesOutstanding"), + "float_shares": _safe_get(info, "floatShares"), + "shares_short": _safe_get(info, "sharesShort"), + "short_ratio": _safe_get(info, "shortRatio"), + "held_percent_insiders": _safe_get(info, "heldPercentInsiders"), + "held_percent_institutions": _safe_get(info, "heldPercentInstitutions"), + "current_price": _safe_get(info, "currentPrice") or _safe_get(info, "regularMarketPrice"), + } + + return json.dumps(profile, default=str) + + except Exception as e: + return json.dumps({"error": f"Error fetching company profile for {ticker}: {str(e)}", "ticker": ticker}) + + +@tool +def get_macro_indicators( + curr_date: Annotated[str, "Current trading date in yyyy-mm-dd format"], +) -> str: + """Fetch macro regime indicators: VIX, 10Y yield, dollar strength, credit spreads, sector ETF performance. + Returns structured text for the Company Intelligence and Macro Regime Analyst. + """ + import yfinance as yf + import pandas as pd + + try: + results = {} + + # VIX + try: + vix = yf.Ticker("^VIX") + vix_data = vix.history(period="5d") + if not vix_data.empty: + results["vix_level"] = round(vix_data["Close"].iloc[-1], 2) + if results["vix_level"] < 15: + results["vix_regime"] = "low" + elif results["vix_level"] < 20: + results["vix_regime"] = "moderate" + elif results["vix_level"] < 30: + results["vix_regime"] = "elevated" + else: + results["vix_regime"] = "stressed" + except Exception: + results["vix_level"] = None + results["vix_regime"] = "unknown" + + # 10Y yield + try: + tnx = yf.Ticker("^TNX") + tnx_data = tnx.history(period="5d") + if not tnx_data.empty: + results["ten_year_yield"] = round(tnx_data["Close"].iloc[-1], 3) + except Exception: + results["ten_year_yield"] = None + + # Dollar strength (UUP as proxy) + try: + uup = yf.Ticker("UUP") + uup_1m = _get_period_return(uup, 1) + uup_3m = _get_period_return(uup, 3) + results["dollar_1m_return"] = round(uup_1m, 2) if uup_1m is not None else None + results["dollar_3m_return"] = round(uup_3m, 2) if uup_3m is not None else None + if uup_1m is not None: + if uup_1m > 1: + results["dollar_strength"] = "strong" + elif uup_1m < -1: + results["dollar_strength"] = "weak" + else: + results["dollar_strength"] = "neutral" + except Exception: + results["dollar_strength"] = "unknown" + + # Credit spreads: HYG vs LQD + try: + hyg = yf.Ticker("HYG") + lqd = yf.Ticker("LQD") + hyg_1m = _get_period_return(hyg, 1) + lqd_1m = _get_period_return(lqd, 1) + if hyg_1m is not None and lqd_1m is not None: + spread_change = hyg_1m - lqd_1m + results["hyg_1m_return"] = round(hyg_1m, 2) + results["lqd_1m_return"] = round(lqd_1m, 2) + results["credit_spread_change"] = round(spread_change, 2) + if spread_change > 0.5: + results["credit_spread_direction"] = "tightening" + elif spread_change < -0.5: + results["credit_spread_direction"] = "widening" + else: + results["credit_spread_direction"] = "stable" + except Exception: + results["credit_spread_direction"] = "unknown" + + # SPY and sector ETF performance + sector_etfs = { + "SPY": "S&P 500", + "XLK": "Technology", + "XLC": "Communication Services", + "XLV": "Healthcare", + "XLF": "Financials", + "XLY": "Consumer Discretionary", + "XLP": "Consumer Staples", + "XLI": "Industrials", + "XLE": "Energy", + "XLU": "Utilities", + "XLB": "Materials", + "XLRE": "Real Estate", + } + + sector_performance = {} + for etf_ticker, sector_name in sector_etfs.items(): + try: + etf = yf.Ticker(etf_ticker) + ret_1m = _get_period_return(etf, 1) + ret_3m = _get_period_return(etf, 3) + sector_performance[etf_ticker] = { + "name": sector_name, + "return_1m": round(ret_1m, 2) if ret_1m is not None else None, + "return_3m": round(ret_3m, 2) if ret_3m is not None else None, + } + except Exception: + sector_performance[etf_ticker] = { + "name": sector_name, + "return_1m": None, + "return_3m": None, + } + + results["sector_performance"] = sector_performance + + return json.dumps(results, default=str) + + except Exception as e: + return json.dumps({"error": f"Error fetching macro indicators: {str(e)}"}) + + +@tool +def get_sector_rotation( + ticker: Annotated[str, "Ticker symbol of the company"], + curr_date: Annotated[str, "Current trading date in yyyy-mm-dd format"], +) -> str: + """Fetch sector rotation data: sector ETF relative strength vs SPY over 1M/3M/6M, breadth indicators. + Returns structured text for the Sector Rotation and Institutional Flow Analyst. + """ + import yfinance as yf + + try: + # Get the company's sector + t = yf.Ticker(ticker.upper()) + info = t.info + sector = _safe_get(info, "sector", "Unknown") + + # Map sector to ETF + sector_etf = SECTOR_ETF_MAP.get(sector, None) + + # Get SPY returns + spy = yf.Ticker("SPY") + spy_1m = _get_period_return(spy, 1) + spy_3m = _get_period_return(spy, 3) + spy_6m = _get_period_return(spy, 6) + + # Get all sector ETF returns for ranking + sector_returns = {} + for etf_sym in ALL_SECTOR_ETFS: + try: + etf = yf.Ticker(etf_sym) + ret_1m = _get_period_return(etf, 1) + ret_3m = _get_period_return(etf, 3) + ret_6m = _get_period_return(etf, 6) + sector_returns[etf_sym] = { + "return_1m": round(ret_1m, 2) if ret_1m is not None else None, + "return_3m": round(ret_3m, 2) if ret_3m is not None else None, + "return_6m": round(ret_6m, 2) if ret_6m is not None else None, + "vs_spy_1m": round(ret_1m - spy_1m, 2) if (ret_1m is not None and spy_1m is not None) else None, + "vs_spy_3m": round(ret_3m - spy_3m, 2) if (ret_3m is not None and spy_3m is not None) else None, + "vs_spy_6m": round(ret_6m - spy_6m, 2) if (ret_6m is not None and spy_6m is not None) else None, + } + except Exception: + sector_returns[etf_sym] = { + "return_1m": None, "return_3m": None, "return_6m": None, + "vs_spy_1m": None, "vs_spy_3m": None, "vs_spy_6m": None, + } + + # Rank sectors by 1M relative strength + ranked = sorted( + [(sym, data) for sym, data in sector_returns.items() if data["vs_spy_1m"] is not None], + key=lambda x: x[1]["vs_spy_1m"], + reverse=True, + ) + rank_map = {sym: i + 1 for i, (sym, _) in enumerate(ranked)} + + # Stock's sector data + stock_sector_data = {} + stock_sector_rank = None + if sector_etf and sector_etf in sector_returns: + stock_sector_data = sector_returns[sector_etf] + stock_sector_rank = rank_map.get(sector_etf) + + result = { + "ticker": ticker.upper(), + "sector": sector, + "sector_etf": sector_etf, + "stock_sector_vs_spy_1m": stock_sector_data.get("vs_spy_1m"), + "stock_sector_vs_spy_3m": stock_sector_data.get("vs_spy_3m"), + "stock_sector_vs_spy_6m": stock_sector_data.get("vs_spy_6m"), + "stock_sector_rank": stock_sector_rank, + "total_sectors": len(ranked), + "spy_1m_return": round(spy_1m, 2) if spy_1m is not None else None, + "spy_3m_return": round(spy_3m, 2) if spy_3m is not None else None, + "spy_6m_return": round(spy_6m, 2) if spy_6m is not None else None, + "all_sector_returns": sector_returns, + "sector_rankings_1m": [{"etf": sym, "vs_spy_1m": data["vs_spy_1m"]} for sym, data in ranked], + } + + return json.dumps(result, default=str) + + except Exception as e: + return json.dumps({"error": f"Error fetching sector rotation data for {ticker}: {str(e)}"}) + + +@tool +def get_institutional_flow( + ticker: Annotated[str, "Ticker symbol of the company"], +) -> str: + """Fetch institutional flow data: volume ratios, float turnover, short interest, institutional ownership. + Returns structured text for the Sector Rotation and Institutional Flow Analyst. + """ + import yfinance as yf + + try: + t = yf.Ticker(ticker.upper()) + info = t.info + + avg_vol = _safe_get(info, "averageVolume") + avg_vol_10d = _safe_get(info, "averageVolume10days") + shares_outstanding = _safe_get(info, "sharesOutstanding") + float_shares = _safe_get(info, "floatShares") + shares_short = _safe_get(info, "sharesShort") + short_ratio = _safe_get(info, "shortRatio") + held_institutions = _safe_get(info, "heldPercentInstitutions") + held_insiders = _safe_get(info, "heldPercentInsiders") + + # Compute derived metrics + volume_ratio = None + if avg_vol and avg_vol_10d and avg_vol > 0: + volume_ratio = round(avg_vol_10d / avg_vol, 2) + + float_turnover_5d = None + float_turnover_20d = None + if float_shares and float_shares > 0: + if avg_vol_10d: + float_turnover_5d = round((avg_vol_10d * 5) / float_shares * 100, 2) + if avg_vol: + float_turnover_20d = round((avg_vol * 20) / float_shares * 100, 2) + + short_pct_of_float = None + if shares_short and float_shares and float_shares > 0: + short_pct_of_float = round(shares_short / float_shares * 100, 2) + + result = { + "ticker": ticker.upper(), + "average_volume": avg_vol, + "average_volume_10d": avg_vol_10d, + "volume_ratio": volume_ratio, + "shares_outstanding": shares_outstanding, + "float_shares": float_shares, + "shares_short": shares_short, + "short_ratio": short_ratio, + "short_pct_of_float": short_pct_of_float, + "float_turnover_5d_pct": float_turnover_5d, + "float_turnover_20d_pct": float_turnover_20d, + "held_percent_institutions": round(held_institutions * 100, 2) if held_institutions else None, + "held_percent_insiders": round(held_insiders * 100, 2) if held_insiders else None, + } + + return json.dumps(result, default=str) + + except Exception as e: + return json.dumps({"error": f"Error fetching institutional flow data for {ticker}: {str(e)}"}) + + +@tool +def get_earnings_estimates( + ticker: Annotated[str, "Ticker symbol of the company"], +) -> str: + """Fetch earnings revision data: analyst recommendations, price targets, EPS estimates. + Returns structured text for the Earnings Revision and News Catalyst Analyst. + """ + import yfinance as yf + + try: + t = yf.Ticker(ticker.upper()) + info = t.info + + result = { + "ticker": ticker.upper(), + "current_price": _safe_get(info, "currentPrice") or _safe_get(info, "regularMarketPrice"), + "trailing_eps": _safe_get(info, "trailingEps"), + "forward_eps": _safe_get(info, "forwardEps"), + } + + # Analyst recommendations + try: + recs = t.recommendations + if recs is not None and not recs.empty: + # Get the most recent recommendations + recent_recs = recs.tail(20) + rec_list = [] + for _, row in recent_recs.iterrows(): + rec_entry = {} + for col in recent_recs.columns: + val = row[col] + if hasattr(val, 'item'): + val = val.item() + rec_entry[col] = val + rec_list.append(rec_entry) + result["recent_recommendations"] = rec_list + else: + result["recent_recommendations"] = [] + except Exception: + result["recent_recommendations"] = [] + + # Analyst price targets + try: + targets = t.analyst_price_targets + if targets is not None: + target_dict = {} + if hasattr(targets, 'items'): + for k, v in targets.items(): + if hasattr(v, 'item'): + target_dict[k] = v.item() + else: + target_dict[k] = v + elif isinstance(targets, dict): + target_dict = targets + result["price_targets"] = target_dict + + # Calculate upside + current = result.get("current_price") + mean_target = target_dict.get("mean") or target_dict.get("current") + if current and mean_target and current > 0: + result["price_target_upside_pct"] = round(((mean_target / current) - 1) * 100, 2) + else: + result["price_targets"] = {} + except Exception: + result["price_targets"] = {} + + # Earnings estimates if available + try: + earnings_est = t.earnings_estimate + if earnings_est is not None and not earnings_est.empty: + est_dict = {} + for col in earnings_est.columns: + est_dict[str(col)] = {} + for idx in earnings_est.index: + val = earnings_est.loc[idx, col] + if hasattr(val, 'item'): + val = val.item() + est_dict[str(col)][str(idx)] = val + result["earnings_estimates"] = est_dict + else: + result["earnings_estimates"] = {} + except Exception: + result["earnings_estimates"] = {} + + # Revenue estimates if available + try: + rev_est = t.revenue_estimate + if rev_est is not None and not rev_est.empty: + rev_dict = {} + for col in rev_est.columns: + rev_dict[str(col)] = {} + for idx in rev_est.index: + val = rev_est.loc[idx, col] + if hasattr(val, 'item'): + val = val.item() + rev_dict[str(col)][str(idx)] = val + result["revenue_estimates"] = rev_dict + else: + result["revenue_estimates"] = {} + except Exception: + result["revenue_estimates"] = {} + + return json.dumps(result, default=str) + + except Exception as e: + return json.dumps({"error": f"Error fetching earnings estimates for {ticker}: {str(e)}"}) + + +@tool +def get_valuation_peers( + ticker: Annotated[str, "Ticker symbol of the company"], +) -> str: + """Fetch valuation metrics and peer comparison data. + Returns structured text for the Business Quality, Valuation, and Entry Timing Analyst. + """ + import yfinance as yf + + try: + t = yf.Ticker(ticker.upper()) + info = t.info + + current_price = _safe_get(info, "currentPrice") or _safe_get(info, "regularMarketPrice") + fifty_two_high = _safe_get(info, "fiftyTwoWeekHigh") + fifty_two_low = _safe_get(info, "fiftyTwoWeekLow") + + # Calculate position in 52-week range + vs_52w_range_pct = None + if fifty_two_high and fifty_two_low and current_price and (fifty_two_high - fifty_two_low) > 0: + vs_52w_range_pct = round( + ((current_price - fifty_two_low) / (fifty_two_high - fifty_two_low)) * 100, 1 + ) + + result = { + "ticker": ticker.upper(), + "current_price": current_price, + "trailing_pe": _safe_get(info, "trailingPE"), + "forward_pe": _safe_get(info, "forwardPE"), + "peg_ratio": _safe_get(info, "pegRatio"), + "price_to_book": _safe_get(info, "priceToBook"), + "price_to_sales": _safe_get(info, "priceToSalesTrailing12Months"), + "enterprise_value": _safe_get(info, "enterpriseValue"), + "ev_to_ebitda": _safe_get(info, "enterpriseToEbitda"), + "ev_to_revenue": _safe_get(info, "enterpriseToRevenue"), + "market_cap": _safe_get(info, "marketCap"), + "fifty_two_week_high": fifty_two_high, + "fifty_two_week_low": fifty_two_low, + "vs_52w_range_pct": vs_52w_range_pct, + "fifty_day_average": _safe_get(info, "fiftyDayAverage"), + "two_hundred_day_average": _safe_get(info, "twoHundredDayAverage"), + "profit_margins": _safe_get(info, "profitMargins"), + "operating_margins": _safe_get(info, "operatingMargins"), + "gross_margins": _safe_get(info, "grossMargins"), + "return_on_equity": _safe_get(info, "returnOnEquity"), + "return_on_assets": _safe_get(info, "returnOnAssets"), + "revenue_growth": _safe_get(info, "revenueGrowth"), + "earnings_growth": _safe_get(info, "earningsGrowth"), + "debt_to_equity": _safe_get(info, "debtToEquity"), + "current_ratio": _safe_get(info, "currentRatio"), + "free_cashflow": _safe_get(info, "freeCashflow"), + "book_value": _safe_get(info, "bookValue"), + } + + return json.dumps(result, default=str) + + except Exception as e: + return json.dumps({"error": f"Error fetching valuation data for {ticker}: {str(e)}"}) diff --git a/tradingagents/dataflows/y_finance.py b/tradingagents/dataflows/y_finance.py index bc78d8b3..ed7d11e5 100644 --- a/tradingagents/dataflows/y_finance.py +++ b/tradingagents/dataflows/y_finance.py @@ -461,4 +461,147 @@ def get_insider_transactions( return header + csv_string except Exception as e: - return f"Error retrieving insider transactions for {ticker}: {str(e)}" \ No newline at end of file + return f"Error retrieving insider transactions for {ticker}: {str(e)}" + + +# --- Macro data functions (used by interface.py routing) --- +# These are thin wrappers that delegate to yfinance directly. +# The actual @tool versions live in agents/utils/macro_data_tools.py. + +import json as _json + + +def _safe_get_yf(info, key, default=None): + val = info.get(key) + return default if val is None else val + + +def _fmt_num(val): + if val is None: + return None + if abs(val) >= 1e12: + return f"${val/1e12:.2f}T" + if abs(val) >= 1e9: + return f"${val/1e9:.2f}B" + if abs(val) >= 1e6: + return f"${val/1e6:.2f}M" + return f"${val:,.0f}" + + +def _period_return(ticker_obj, months): + import pandas as pd + try: + end_dt = pd.Timestamp.today() + start_dt = end_dt - pd.DateOffset(months=months) + data = ticker_obj.history(start=start_dt.strftime("%Y-%m-%d"), end=end_dt.strftime("%Y-%m-%d")) + if data.empty or len(data) < 2: + return None + return ((data["Close"].iloc[-1] / data["Close"].iloc[0]) - 1) * 100 + except Exception: + return None + + +def get_company_profile(ticker, curr_date=None): + """Get company profile via yfinance (plain function for interface routing).""" + try: + t = yf.Ticker(ticker.upper()) + info = t.info + if not info or not info.get("longName"): + return _json.dumps({"error": f"No data for {ticker}", "ticker": ticker}) + mc = _safe_get_yf(info, "marketCap") + cat = "large_cap" if mc and mc >= 10e9 else "mid_cap" if mc and mc >= 2e9 else "small_cap" if mc and mc >= 300e6 else "micro_cap" if mc else "unknown" + profile = { + "company_name": _safe_get_yf(info, "longName", "Unknown"), + "ticker": ticker.upper(), + "sector": _safe_get_yf(info, "sector", "Unknown"), + "industry": _safe_get_yf(info, "industry", "Unknown"), + "description": _safe_get_yf(info, "longBusinessSummary", ""), + "market_cap": mc, + "market_cap_formatted": _fmt_num(mc), + "market_cap_category": cat, + "current_price": _safe_get_yf(info, "currentPrice") or _safe_get_yf(info, "regularMarketPrice"), + } + return _json.dumps(profile, default=str) + except Exception as e: + return _json.dumps({"error": str(e), "ticker": ticker}) + + +def get_macro_indicators(curr_date=None): + """Get macro indicators via yfinance (plain function for interface routing).""" + results = {} + try: + vix = yf.Ticker("^VIX") + vd = vix.history(period="5d") + if not vd.empty: + results["vix_level"] = round(vd["Close"].iloc[-1], 2) + except Exception: + pass + try: + tnx = yf.Ticker("^TNX") + td = tnx.history(period="5d") + if not td.empty: + results["ten_year_yield"] = round(td["Close"].iloc[-1], 3) + except Exception: + pass + return _json.dumps(results, default=str) + + +def get_sector_rotation(ticker, curr_date=None): + """Get sector rotation data via yfinance (plain function for interface routing).""" + try: + t = yf.Ticker(ticker.upper()) + info = t.info + sector = _safe_get_yf(info, "sector", "Unknown") + return _json.dumps({"ticker": ticker.upper(), "sector": sector}, default=str) + except Exception as e: + return _json.dumps({"error": str(e)}) + + +def get_institutional_flow(ticker): + """Get institutional flow data via yfinance (plain function for interface routing).""" + try: + t = yf.Ticker(ticker.upper()) + info = t.info + return _json.dumps({ + "ticker": ticker.upper(), + "average_volume": _safe_get_yf(info, "averageVolume"), + "average_volume_10d": _safe_get_yf(info, "averageVolume10days"), + "float_shares": _safe_get_yf(info, "floatShares"), + "shares_short": _safe_get_yf(info, "sharesShort"), + "short_ratio": _safe_get_yf(info, "shortRatio"), + "held_percent_institutions": _safe_get_yf(info, "heldPercentInstitutions"), + }, default=str) + except Exception as e: + return _json.dumps({"error": str(e)}) + + +def get_earnings_estimates(ticker): + """Get earnings estimates via yfinance (plain function for interface routing).""" + try: + t = yf.Ticker(ticker.upper()) + info = t.info + return _json.dumps({ + "ticker": ticker.upper(), + "trailing_eps": _safe_get_yf(info, "trailingEps"), + "forward_eps": _safe_get_yf(info, "forwardEps"), + "current_price": _safe_get_yf(info, "currentPrice") or _safe_get_yf(info, "regularMarketPrice"), + }, default=str) + except Exception as e: + return _json.dumps({"error": str(e)}) + + +def get_valuation_peers(ticker): + """Get valuation peer data via yfinance (plain function for interface routing).""" + try: + t = yf.Ticker(ticker.upper()) + info = t.info + return _json.dumps({ + "ticker": ticker.upper(), + "trailing_pe": _safe_get_yf(info, "trailingPE"), + "forward_pe": _safe_get_yf(info, "forwardPE"), + "peg_ratio": _safe_get_yf(info, "pegRatio"), + "price_to_book": _safe_get_yf(info, "priceToBook"), + "ev_to_ebitda": _safe_get_yf(info, "enterpriseToEbitda"), + }, default=str) + except Exception as e: + return _json.dumps({"error": str(e)}) \ No newline at end of file diff --git a/tradingagents/graph/__init__.py b/tradingagents/graph/__init__.py index 80982c19..562b5a3d 100644 --- a/tradingagents/graph/__init__.py +++ b/tradingagents/graph/__init__.py @@ -1,17 +1,9 @@ # TradingAgents/graph/__init__.py from .trading_graph import TradingAgentsGraph -from .conditional_logic import ConditionalLogic -from .setup import GraphSetup -from .propagation import Propagator -from .reflection import Reflector -from .signal_processing import SignalProcessor +from .setup import StructuredGraphSetup __all__ = [ "TradingAgentsGraph", - "ConditionalLogic", - "GraphSetup", - "Propagator", - "Reflector", - "SignalProcessor", + "StructuredGraphSetup", ] diff --git a/tradingagents/graph/setup.py b/tradingagents/graph/setup.py index 08f6815c..b2f24033 100644 --- a/tradingagents/graph/setup.py +++ b/tradingagents/graph/setup.py @@ -1,243 +1,188 @@ -# TradingAgents/graph/setup.py +"""Graph setup for the structured equity ranking pipeline. -from typing import Dict, Any -from langchain_openai import ChatOpenAI -from langgraph.graph import END, StateGraph, START -from langgraph.prebuilt import ToolNode +Pipeline stages: + START → Validation → [veto gate] → Tier 1 (Macro+Liquidity parallel) + → Tier 2 (8 agents parallel) → Scoring (Archetype+MasterScore) + → Tier 3 (Bull+Bear parallel → Debate → Risk → FinalDecision) + → END +""" -from tradingagents.agents import * -from tradingagents.agents.utils.agent_states import AgentState +from __future__ import annotations -from .conditional_logic import ConditionalLogic -from .parallel_analysts import ( - create_parallel_analyst_node, - create_parallel_research_node, - create_parallel_risk_node, -) +import asyncio +import logging +import time +from typing import Any, Dict, List + +from langgraph.graph import END, START, StateGraph + +from tradingagents.agents.utils.agent_states import PipelineState + +logger = logging.getLogger(__name__) -class GraphSetup: - """Handles the setup and configuration of the agent graph.""" +class StructuredGraphSetup: + """Builds the structured equity ranking LangGraph.""" - def __init__( - self, - quick_thinking_llm: ChatOpenAI, - deep_thinking_llm: ChatOpenAI, - tool_nodes: Dict[str, ToolNode], - bull_memory, - bear_memory, - trader_memory, - invest_judge_memory, - risk_manager_memory, - conditional_logic: ConditionalLogic, - ): - """Initialize with required components.""" - self.quick_thinking_llm = quick_thinking_llm - self.deep_thinking_llm = deep_thinking_llm - self.tool_nodes = tool_nodes - self.bull_memory = bull_memory - self.bear_memory = bear_memory - self.trader_memory = trader_memory - self.invest_judge_memory = invest_judge_memory - self.risk_manager_memory = risk_manager_memory - self.conditional_logic = conditional_logic + def __init__(self, quick_llm, deep_llm): + self.quick_llm = quick_llm + self.deep_llm = deep_llm - def setup_graph( - self, selected_analysts=["market", "social", "news", "fundamentals"], - parallel=False, - ): - """Set up and compile the agent workflow graph. - - Args: - selected_analysts (list): List of analyst types to include. Options are: - - "market": Market analyst - - "social": Social media analyst - - "news": News analyst - - "fundamentals": Fundamentals analyst - parallel (bool): Run analysts in parallel instead of sequentially. - """ - if len(selected_analysts) == 0: - raise ValueError("Trading Agents Graph Setup Error: no analysts selected!") - - # Create analyst node functions and tool nodes - analyst_nodes = {} - delete_nodes = {} - tool_nodes = {} - - if "market" in selected_analysts: - analyst_nodes["market"] = create_market_analyst( - self.quick_thinking_llm - ) - delete_nodes["market"] = create_msg_delete() - tool_nodes["market"] = self.tool_nodes["market"] - - if "social" in selected_analysts: - analyst_nodes["social"] = create_social_media_analyst( - self.quick_thinking_llm - ) - delete_nodes["social"] = create_msg_delete() - tool_nodes["social"] = self.tool_nodes["social"] - - if "news" in selected_analysts: - analyst_nodes["news"] = create_news_analyst( - self.quick_thinking_llm - ) - delete_nodes["news"] = create_msg_delete() - tool_nodes["news"] = self.tool_nodes["news"] - - if "fundamentals" in selected_analysts: - analyst_nodes["fundamentals"] = create_fundamentals_analyst( - self.quick_thinking_llm - ) - delete_nodes["fundamentals"] = create_msg_delete() - tool_nodes["fundamentals"] = self.tool_nodes["fundamentals"] - - # Create researcher and manager nodes - bull_researcher_node = create_bull_researcher( - self.quick_thinking_llm, self.bull_memory - ) - bear_researcher_node = create_bear_researcher( - self.quick_thinking_llm, self.bear_memory - ) - research_manager_node = create_research_manager( - self.deep_thinking_llm, self.invest_judge_memory - ) - trader_node = create_trader(self.quick_thinking_llm, self.trader_memory) - - # Create risk analysis nodes - aggressive_analyst = create_aggressive_debator(self.quick_thinking_llm) - neutral_analyst = create_neutral_debator(self.quick_thinking_llm) - conservative_analyst = create_conservative_debator(self.quick_thinking_llm) - risk_manager_node = create_risk_manager( - self.deep_thinking_llm, self.risk_manager_memory + def setup_graph(self): + """Build and compile the structured pipeline graph.""" + from tradingagents.agents.structured import ( + create_archetype_node, + create_backlog_node, + create_bear_case_node, + create_bull_case_node, + create_business_quality_node, + create_crowding_node, + create_debate_node, + create_earnings_revisions_node, + create_entry_timing_node, + create_final_decision_node, + create_institutional_flow_node, + create_liquidity_node, + create_macro_node, + create_risk_node, + create_scoring_node, + create_sector_rotation_node, + create_validation_node, + create_valuation_node, ) - # Create workflow - workflow = StateGraph(AgentState) + # Create node functions + # Tier 1: cheap model (or no LLM for validation) + validation_fn = create_validation_node() + macro_fn = create_macro_node(self.quick_llm) + liquidity_fn = create_liquidity_node(self.quick_llm) - if parallel: - # Single node runs all analysts concurrently - parallel_node = create_parallel_analyst_node( - analyst_nodes, tool_nodes, selected_analysts - ) - workflow.add_node("Parallel Analysts", parallel_node) - else: - # Add analyst nodes individually for sequential execution - for analyst_type, node in analyst_nodes.items(): - workflow.add_node(f"{analyst_type.capitalize()} Analyst", node) - workflow.add_node( - f"Msg Clear {analyst_type.capitalize()}", delete_nodes[analyst_type] - ) - workflow.add_node(f"tools_{analyst_type}", tool_nodes[analyst_type]) + # Tier 2: cheap model for analysis + bq_fn = create_business_quality_node(self.quick_llm) + inst_fn = create_institutional_flow_node(self.quick_llm) + val_fn = create_valuation_node(self.quick_llm) + et_fn = create_entry_timing_node(self.quick_llm) + er_fn = create_earnings_revisions_node(self.quick_llm) + sr_fn = create_sector_rotation_node(self.quick_llm) + bl_fn = create_backlog_node(self.quick_llm) + cr_fn = create_crowding_node(self.quick_llm) + arch_fn = create_archetype_node(self.quick_llm) + score_fn = create_scoring_node() - if parallel: - # --- Parallel mode --- - # Analysts: single parallel node - # Research: Bull+Bear run concurrently in one node - # Risk: Agg+Con+Neu run concurrently in one node + # Tier 3: deep model for reasoning/debate + bull_fn = create_bull_case_node(self.deep_llm) + bear_fn = create_bear_case_node(self.deep_llm) + debate_fn = create_debate_node(self.deep_llm) + risk_fn = create_risk_node(self.deep_llm) + final_fn = create_final_decision_node(self.deep_llm) - parallel_research = create_parallel_research_node( - bull_researcher_node, bear_researcher_node - ) - parallel_risk = create_parallel_risk_node( - aggressive_analyst, conservative_analyst, neutral_analyst - ) + # Build parallel wrapper nodes + parallel_tier1 = _create_parallel_node( + [("macro", macro_fn), ("liquidity", liquidity_fn)], + "Tier 1", + ) + parallel_tier2 = _create_parallel_node( + [ + ("business_quality", bq_fn), + ("institutional_flow", inst_fn), + ("valuation", val_fn), + ("entry_timing", et_fn), + ("earnings_revisions", er_fn), + ("sector_rotation", sr_fn), + ("backlog", bl_fn), + ("crowding", cr_fn), + ], + "Tier 2", + ) + parallel_bull_bear = _create_parallel_node( + [("bull_case", bull_fn), ("bear_case", bear_fn)], + "Bull/Bear", + ) - workflow.add_node("Research Manager", research_manager_node) - workflow.add_node("Trader", trader_node) - workflow.add_node("Parallel Research", parallel_research) - workflow.add_node("Parallel Risk", parallel_risk) - workflow.add_node("Risk Judge", risk_manager_node) + # Archetype + Score combined node + def archetype_and_score(state): + arch_result = arch_fn(state) + merged = {**state, **arch_result} + score_result = score_fn(merged) + return {**arch_result, **score_result} - # Parallel Analysts → Parallel Research → Manager → Trader → Parallel Risk → Judge → END - workflow.add_edge(START, "Parallel Analysts") - workflow.add_edge("Parallel Analysts", "Parallel Research") - workflow.add_edge("Parallel Research", "Research Manager") - workflow.add_edge("Research Manager", "Trader") - workflow.add_edge("Trader", "Parallel Risk") - workflow.add_edge("Parallel Risk", "Risk Judge") - workflow.add_edge("Risk Judge", END) + # Risk + Final Decision combined node + def risk_and_decision(state): + risk_result = risk_fn(state) + merged = {**state, **risk_result} + final_result = final_fn(merged) + return {**risk_result, **final_result} - else: - # --- Sequential mode --- - # Individual analyst nodes with tool-calling loops - # Bull/Bear debate with conditional routing - # Agg/Con/Neu risk debate with conditional routing + # Build graph + workflow = StateGraph(PipelineState) - workflow.add_node("Bull Researcher", bull_researcher_node) - workflow.add_node("Bear Researcher", bear_researcher_node) - workflow.add_node("Research Manager", research_manager_node) - workflow.add_node("Trader", trader_node) - workflow.add_node("Aggressive Analyst", aggressive_analyst) - workflow.add_node("Neutral Analyst", neutral_analyst) - workflow.add_node("Conservative Analyst", conservative_analyst) - workflow.add_node("Risk Judge", risk_manager_node) + workflow.add_node("Validation", validation_fn) + workflow.add_node("Tier 1 Analysis", parallel_tier1) + workflow.add_node("Tier 2 Analysis", parallel_tier2) + workflow.add_node("Scoring", archetype_and_score) + workflow.add_node("Debate", parallel_bull_bear) + workflow.add_node("Debate Referee", debate_fn) + workflow.add_node("Decision", risk_and_decision) - first_analyst = selected_analysts[0] - workflow.add_edge(START, f"{first_analyst.capitalize()} Analyst") + # Edges + workflow.add_edge(START, "Validation") + workflow.add_conditional_edges( + "Validation", + _veto_gate, + {END: END, "continue": "Tier 1 Analysis"}, + ) + workflow.add_edge("Tier 1 Analysis", "Tier 2 Analysis") + workflow.add_edge("Tier 2 Analysis", "Scoring") + workflow.add_edge("Scoring", "Debate") + workflow.add_edge("Debate", "Debate Referee") + workflow.add_edge("Debate Referee", "Decision") + workflow.add_edge("Decision", END) - for i, analyst_type in enumerate(selected_analysts): - current_analyst = f"{analyst_type.capitalize()} Analyst" - current_tools = f"tools_{analyst_type}" - current_clear = f"Msg Clear {analyst_type.capitalize()}" - - workflow.add_conditional_edges( - current_analyst, - getattr(self.conditional_logic, f"should_continue_{analyst_type}"), - [current_tools, current_clear], - ) - workflow.add_edge(current_tools, current_analyst) - - if i < len(selected_analysts) - 1: - next_analyst = f"{selected_analysts[i+1].capitalize()} Analyst" - workflow.add_edge(current_clear, next_analyst) - else: - workflow.add_edge(current_clear, "Bull Researcher") - - workflow.add_conditional_edges( - "Bull Researcher", - self.conditional_logic.should_continue_debate, - { - "Bear Researcher": "Bear Researcher", - "Research Manager": "Research Manager", - }, - ) - workflow.add_conditional_edges( - "Bear Researcher", - self.conditional_logic.should_continue_debate, - { - "Bull Researcher": "Bull Researcher", - "Research Manager": "Research Manager", - }, - ) - workflow.add_edge("Research Manager", "Trader") - workflow.add_edge("Trader", "Aggressive Analyst") - workflow.add_conditional_edges( - "Aggressive Analyst", - self.conditional_logic.should_continue_risk_analysis, - { - "Conservative Analyst": "Conservative Analyst", - "Risk Judge": "Risk Judge", - }, - ) - workflow.add_conditional_edges( - "Conservative Analyst", - self.conditional_logic.should_continue_risk_analysis, - { - "Neutral Analyst": "Neutral Analyst", - "Risk Judge": "Risk Judge", - }, - ) - workflow.add_conditional_edges( - "Neutral Analyst", - self.conditional_logic.should_continue_risk_analysis, - { - "Aggressive Analyst": "Aggressive Analyst", - "Risk Judge": "Risk Judge", - }, - ) - - workflow.add_edge("Risk Judge", END) - - # Compile and return return workflow.compile() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _veto_gate(state: Dict[str, Any]) -> str: + """Check if validation resulted in a hard veto.""" + if state.get("hard_veto"): + return END + validation = state.get("validation") or {} + if validation.get("veto"): + return END + return "continue" + + +def _create_parallel_node(agent_fns: List[tuple], label: str): + """Create an async node that runs multiple agent functions in parallel. + + Args: + agent_fns: List of (name, fn) tuples. + label: Label for logging. + """ + + async def parallel_node(state): + t0 = time.time() + + async def run_one(name, fn): + logger.debug("[%s] %s starting", label, name) + result = await asyncio.to_thread(fn, state) + logger.debug("[%s] %s done (%.1fs)", label, name, time.time() - t0) + return result + + tasks = [run_one(name, fn) for name, fn in agent_fns] + results = await asyncio.gather(*tasks, return_exceptions=True) + + merged: Dict[str, Any] = {} + for (name, _), result in zip(agent_fns, results): + if isinstance(result, Exception): + logger.error("[%s] %s failed: %s", label, name, result) + continue + merged.update(result) + + logger.info("[%s] completed in %.1fs", label, time.time() - t0) + return merged + + return parallel_node diff --git a/tradingagents/graph/trading_graph.py b/tradingagents/graph/trading_graph.py index bd33c4b9..2fd3ba80 100644 --- a/tradingagents/graph/trading_graph.py +++ b/tradingagents/graph/trading_graph.py @@ -1,80 +1,49 @@ -# TradingAgents/graph/trading_graph.py +"""Main orchestrator for the structured equity ranking engine. + +Replaces the old TradingAgentsGraph with a tiered Pydantic-based pipeline. +""" + +from __future__ import annotations -import os -from pathlib import Path import json +import os +import logging from datetime import date -from typing import Dict, Any, Tuple, List, Optional +from pathlib import Path +from typing import Any, Dict, List, Optional -from langgraph.prebuilt import ToolNode - -from tradingagents.llm_clients import create_llm_client - -from tradingagents.agents import * from tradingagents.default_config import DEFAULT_CONFIG -from tradingagents.agents.utils.memory import FinancialSituationMemory -from tradingagents.agents.utils.agent_states import ( - AgentState, - InvestDebateState, - RiskDebateState, -) +from tradingagents.llm_clients import create_llm_client from tradingagents.dataflows.config import set_config -# Import the new abstract tool methods from agent_utils -from tradingagents.agents.utils.agent_utils import ( - get_stock_data, - get_indicators, - get_fundamentals, - get_balance_sheet, - get_cashflow, - get_income_statement, - get_news, - get_insider_transactions, - get_global_news -) +from .setup import StructuredGraphSetup -from .conditional_logic import ConditionalLogic -from .setup import GraphSetup -from .propagation import Propagator -from .reflection import Reflector -from .signal_processing import SignalProcessor +logger = logging.getLogger(__name__) class TradingAgentsGraph: - """Main class that orchestrates the trading agents framework.""" + """Structured equity ranking engine built on LangGraph.""" def __init__( self, - selected_analysts=["market", "social", "news", "fundamentals"], + selected_analysts=None, # ignored — all agents run in structured pipeline debug=False, - config: Dict[str, Any] = None, + config: Optional[Dict[str, Any]] = None, callbacks: Optional[List] = None, ): - """Initialize the trading agents graph and components. - - Args: - selected_analysts: List of analyst types to include - debug: Whether to run in debug mode - config: Configuration dictionary. If None, uses default config - callbacks: Optional list of callback handlers (e.g., for tracking LLM/tool stats) - """ self.debug = debug self.config = config or DEFAULT_CONFIG self.callbacks = callbacks or [] - # Update the interface's config set_config(self.config) - # Create necessary directories os.makedirs( os.path.join(self.config["project_dir"], "dataflows/data_cache"), exist_ok=True, ) - # Initialize LLMs with provider-specific thinking configuration + # Initialize LLMs llm_kwargs = self._get_provider_kwargs() - - # Add callbacks to kwargs if provided (passed to LLM constructor) if self.callbacks: llm_kwargs["callbacks"] = self.callbacks @@ -93,192 +62,133 @@ class TradingAgentsGraph: self.deep_thinking_llm = deep_client.get_llm() self.quick_thinking_llm = quick_client.get_llm() - - # Initialize memories - self.bull_memory = FinancialSituationMemory("bull_memory", self.config) - self.bear_memory = FinancialSituationMemory("bear_memory", self.config) - self.trader_memory = FinancialSituationMemory("trader_memory", self.config) - self.invest_judge_memory = FinancialSituationMemory("invest_judge_memory", self.config) - self.risk_manager_memory = FinancialSituationMemory("risk_manager_memory", self.config) - # Create tool nodes - self.tool_nodes = self._create_tool_nodes() - - # Initialize components - self.conditional_logic = ConditionalLogic() - self.graph_setup = GraphSetup( - self.quick_thinking_llm, - self.deep_thinking_llm, - self.tool_nodes, - self.bull_memory, - self.bear_memory, - self.trader_memory, - self.invest_judge_memory, - self.risk_manager_memory, - self.conditional_logic, + # Build the structured pipeline graph + graph_setup = StructuredGraphSetup( + self.quick_thinking_llm, self.deep_thinking_llm ) - - self.propagator = Propagator() - self.reflector = Reflector(self.quick_thinking_llm) - self.signal_processor = SignalProcessor(self.quick_thinking_llm) + self.graph = graph_setup.setup_graph() # State tracking self.curr_state = None self.ticker = None - self.log_states_dict = {} # date to full state dict - - # Set up the graph (parallel analysts for speed when enabled) - parallel = self.config.get("parallel_analysts", False) - self.graph = self.graph_setup.setup_graph(selected_analysts, parallel=parallel) def _get_provider_kwargs(self) -> Dict[str, Any]: - """Get provider-specific kwargs for LLM client creation.""" kwargs = {} provider = self.config.get("llm_provider", "").lower() - if provider == "google": thinking_level = self.config.get("google_thinking_level") if thinking_level: kwargs["thinking_level"] = thinking_level - elif provider == "openai": reasoning_effort = self.config.get("openai_reasoning_effort") if reasoning_effort: kwargs["reasoning_effort"] = reasoning_effort - return kwargs - def _create_tool_nodes(self) -> Dict[str, ToolNode]: - """Create tool nodes for different data sources using abstract methods.""" - return { - "market": ToolNode( - [ - # Core stock data tools - get_stock_data, - # Technical indicators - get_indicators, - ] - ), - "social": ToolNode( - [ - # News tools for social media analysis - get_news, - ] - ), - "news": ToolNode( - [ - # News and insider information - get_news, - get_global_news, - get_insider_transactions, - ] - ), - "fundamentals": ToolNode( - [ - # Fundamental analysis tools - get_fundamentals, - get_balance_sheet, - get_cashflow, - get_income_statement, - ] - ), - } - - def propagate(self, company_name, trade_date): - """Run the trading agents graph for a company on a specific date.""" + async def propagate(self, company_name: str, trade_date: str): + """Run the structured pipeline for a company (async — parallel nodes).""" + import asyncio self.ticker = company_name - - # Initialize state - init_agent_state = self.propagator.create_initial_state( - company_name, trade_date - ) - args = self.propagator.get_graph_args() + init_state = self._create_initial_state(company_name, trade_date) + args = {"config": {"recursion_limit": 50}} if self.debug: - # Debug mode with tracing trace = [] - for chunk in self.graph.stream(init_agent_state, **args): - if len(chunk["messages"]) == 0: - pass - else: - chunk["messages"][-1].pretty_print() - trace.append(chunk) - - final_state = trace[-1] + async for chunk in self.graph.astream(init_state, stream_mode="values", **args): + trace.append(chunk) + final_state = trace[-1] if trace else init_state else: - # Standard mode without tracing - final_state = self.graph.invoke(init_agent_state, **args) + final_state = await self.graph.ainvoke(init_state, **args) - # Store current state for reflection self.curr_state = final_state - - # Log state self._log_state(trade_date, final_state) - # Return decision and processed signal - return final_state, self.process_signal(final_state["final_trade_decision"]) + decision = final_state.get("final_decision") or {} + signal = decision.get("action", "AVOID") + return final_state, signal - def _log_state(self, trade_date, final_state): - """Log the final state to a JSON file.""" - self.log_states_dict[str(trade_date)] = { - "company_of_interest": final_state["company_of_interest"], - "trade_date": final_state["trade_date"], - "market_report": final_state["market_report"], - "sentiment_report": final_state["sentiment_report"], - "news_report": final_state["news_report"], - "fundamentals_report": final_state["fundamentals_report"], - "investment_debate_state": { - "bull_history": final_state["investment_debate_state"]["bull_history"], - "bear_history": final_state["investment_debate_state"]["bear_history"], - "history": final_state["investment_debate_state"]["history"], - "current_response": final_state["investment_debate_state"][ - "current_response" - ], - "judge_decision": final_state["investment_debate_state"][ - "judge_decision" - ], - }, - "trader_investment_decision": final_state["trader_investment_plan"], - "risk_debate_state": { - "aggressive_history": final_state["risk_debate_state"]["aggressive_history"], - "conservative_history": final_state["risk_debate_state"]["conservative_history"], - "neutral_history": final_state["risk_debate_state"]["neutral_history"], - "history": final_state["risk_debate_state"]["history"], - "judge_decision": final_state["risk_debate_state"]["judge_decision"], - }, - "investment_plan": final_state["investment_plan"], - "final_trade_decision": final_state["final_trade_decision"], + def _create_initial_state(self, ticker: str, trade_date: str) -> Dict[str, Any]: + return { + "ticker": ticker.upper(), + "trade_date": str(trade_date), + "validation": None, + "company_card": None, + "macro": None, + "liquidity": None, + "sector_rotation": None, + "business_quality": None, + "institutional_flow": None, + "valuation": None, + "entry_timing": None, + "earnings_revisions": None, + "backlog": None, + "crowding": None, + "archetype": None, + "master_score": None, + "adjusted_score": None, + "position_role": None, + "bull_case": None, + "bear_case": None, + "debate": None, + "risk": None, + "final_decision": None, + "hard_veto": False, + "hard_veto_reason": None, + "global_flags": [], } - # Save to file - directory = Path(f"eval_results/{self.ticker}/TradingAgentsStrategy_logs/") + def _log_state(self, trade_date: str, state: Dict[str, Any]): + """Log the final state to JSON.""" + log_data = { + "ticker": state.get("ticker"), + "trade_date": str(trade_date), + "master_score": state.get("master_score"), + "adjusted_score": state.get("adjusted_score"), + "position_role": state.get("position_role"), + "hard_veto": state.get("hard_veto"), + "validation": state.get("validation"), + "company_card": state.get("company_card"), + "macro": state.get("macro"), + "liquidity": state.get("liquidity"), + "business_quality": state.get("business_quality"), + "institutional_flow": state.get("institutional_flow"), + "valuation": state.get("valuation"), + "entry_timing": state.get("entry_timing"), + "earnings_revisions": state.get("earnings_revisions"), + "sector_rotation": state.get("sector_rotation"), + "backlog": state.get("backlog"), + "crowding": state.get("crowding"), + "archetype": state.get("archetype"), + "bull_case": state.get("bull_case"), + "bear_case": state.get("bear_case"), + "debate": state.get("debate"), + "risk": state.get("risk"), + "final_decision": state.get("final_decision"), + } + + directory = Path(f"eval_results/{self.ticker}/StructuredPipeline_logs/") directory.mkdir(parents=True, exist_ok=True) - with open( - f"eval_results/{self.ticker}/TradingAgentsStrategy_logs/full_states_log_{trade_date}.json", - "w", - ) as f: - json.dump(self.log_states_dict, f, indent=4) + filepath = directory / f"analysis_{trade_date}.json" + with open(filepath, "w") as f: + json.dump(log_data, f, indent=2, default=str) + logger.info("State logged to %s", filepath) + + def process_signal(self, decision_text: str) -> str: + """Extract signal from decision text (legacy compatibility).""" + if isinstance(decision_text, dict): + return decision_text.get("action", "AVOID") + text = str(decision_text).upper() + if "BUY" in text: + return "BUY" + if "SELL" in text: + return "SELL" + if "HOLD" in text: + return "HOLD" + return "AVOID" def reflect_and_remember(self, returns_losses): - """Reflect on decisions and update memory based on returns.""" - self.reflector.reflect_bull_researcher( - self.curr_state, returns_losses, self.bull_memory - ) - self.reflector.reflect_bear_researcher( - self.curr_state, returns_losses, self.bear_memory - ) - self.reflector.reflect_trader( - self.curr_state, returns_losses, self.trader_memory - ) - self.reflector.reflect_invest_judge( - self.curr_state, returns_losses, self.invest_judge_memory - ) - self.reflector.reflect_risk_manager( - self.curr_state, returns_losses, self.risk_manager_memory - ) - - def process_signal(self, full_signal): - """Process a signal to extract the core decision.""" - return self.signal_processor.process_signal(full_signal) + """No-op for structured pipeline (no BM25 memory).""" + pass diff --git a/tradingagents/models.py b/tradingagents/models.py new file mode 100644 index 00000000..627b77c3 --- /dev/null +++ b/tradingagents/models.py @@ -0,0 +1,408 @@ +"""Pydantic structured output models for the equity ranking engine. + +All agents return one of these models. Deterministic scoring functions +compute master_score, confidence penalties, position roles, and hard vetoes +using only structured outputs — no prose drives downstream decisions. +""" + +from __future__ import annotations + +import json +import logging +from typing import List, Literal, Optional, Tuple + +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Types +# --------------------------------------------------------------------------- + +RiskLevel = Literal["low", "medium", "high"] + +PositionRole = Literal[ + "Core Position", + "Strong Position", + "Tactical / Satellite", + "Watchlist", + "Avoid", +] + +Archetype = Literal[ + "Infrastructure Builder", + "Bottleneck Supplier", + "Platform Company", + "Commodity Leverage", + "Secular Growth Innovator", + "Turnaround", + "Defensive Compounder", +] + + +# --------------------------------------------------------------------------- +# Data quality flag +# --------------------------------------------------------------------------- + +class DataFlag(BaseModel): + """A data quality issue discovered during analysis.""" + field: str + severity: Literal["minor", "moderate", "severe"] + message: str + + +# --------------------------------------------------------------------------- +# Base output (inherited by most agents) +# --------------------------------------------------------------------------- + +class AgentBaseOutput(BaseModel): + """Common structured fields every analyst agent must return.""" + agent_name: str = "" + score_0_to_10: float = Field(default=5.0, ge=0, le=10) + confidence_0_to_1: float = Field(default=0.5, ge=0, le=1) + key_positives: List[str] = Field(default_factory=list) + key_negatives: List[str] = Field(default_factory=list) + key_risks: List[str] = Field(default_factory=list) + data_quality_flags: List[DataFlag] = Field(default_factory=list) + veto: bool = False + veto_reason: Optional[str] = None + summary_1_sentence: str = "" + + +# --------------------------------------------------------------------------- +# Tier 1 outputs +# --------------------------------------------------------------------------- + +class ValidationOutput(BaseModel): + """Ticker validation and identity check.""" + agent_name: str = "Validation" + ticker_valid: bool = True + ticker_resolved: str = "" + company_name: str = "" + company_name_match: bool = True + exchange: Optional[str] = None + sector: Optional[str] = None + industry: Optional[str] = None + is_active: bool = True + veto: bool = False + veto_reason: Optional[str] = None + data_quality_flags: List[DataFlag] = Field(default_factory=list) + + +class CompanyCard(BaseModel): + """Cached company identity card.""" + company_name: str = "" + ticker: str = "" + sector: str = "Unknown" + industry: str = "Unknown" + description: str = "" + market_cap: Optional[float] = None + market_cap_formatted: Optional[str] = None + market_cap_category: str = "unknown" + current_price: Optional[float] = None + revenue: Optional[float] = None + profit_margins: Optional[float] = None + employees: Optional[int] = None + competitors: List[str] = Field(default_factory=list) + + +class MacroRegimeOutput(AgentBaseOutput): + """Macro regime assessment.""" + agent_name: str = "Macro Regime" + vix_level: Optional[float] = None + vix_regime: str = "unknown" + ten_year_yield: Optional[float] = None + dollar_strength: str = "unknown" + credit_spread_direction: str = "unknown" + spy_1m_return: Optional[float] = None + regime_label: str = "unknown" + macro_alignment_0_to_10: float = Field(default=5.0, ge=0, le=10) + + +class LiquidityOutput(AgentBaseOutput): + """Liquidity and market conditions.""" + agent_name: str = "Liquidity" + fed_stance: str = "unknown" + market_breadth: str = "unknown" + volume_profile: str = "unknown" + spy_trend: str = "unknown" + + +# --------------------------------------------------------------------------- +# Tier 2 outputs +# --------------------------------------------------------------------------- + +class BusinessQualityOutput(AgentBaseOutput): + """Business quality and competitive position.""" + agent_name: str = "Business Quality" + revenue_growth: Optional[float] = None + profit_margins: Optional[float] = None + operating_margins: Optional[float] = None + return_on_equity: Optional[float] = None + return_on_assets: Optional[float] = None + debt_to_equity: Optional[float] = None + free_cashflow: Optional[float] = None + competitive_moat: str = "unknown" + management_quality: str = "unknown" + + +class InstitutionalFlowOutput(AgentBaseOutput): + """Institutional ownership and flow signals.""" + agent_name: str = "Institutional Flow" + institutional_ownership_pct: Optional[float] = None + insider_ownership_pct: Optional[float] = None + volume_ratio: Optional[float] = None + short_interest_pct: Optional[float] = None + short_ratio: Optional[float] = None + float_turnover_pct: Optional[float] = None + accumulation_signal: str = "unknown" + + +class ValuationOutput(AgentBaseOutput): + """Valuation metrics and verdict.""" + agent_name: str = "Valuation" + trailing_pe: Optional[float] = None + forward_pe: Optional[float] = None + peg_ratio: Optional[float] = None + price_to_book: Optional[float] = None + ev_to_ebitda: Optional[float] = None + price_to_sales: Optional[float] = None + vs_52w_range_pct: Optional[float] = None + valuation_verdict: str = "unknown" + + +class EntryTimingOutput(AgentBaseOutput): + """Technical entry timing signals.""" + agent_name: str = "Entry Timing" + current_price: Optional[float] = None + fifty_day_avg: Optional[float] = None + two_hundred_day_avg: Optional[float] = None + fifty_day_vs_200_day: str = "unknown" + vs_52w_range_pct: Optional[float] = None + timing_verdict: str = "unknown" + + +class EarningsRevisionOutput(AgentBaseOutput): + """Earnings revision and analyst consensus.""" + agent_name: str = "Earnings Revisions" + trailing_eps: Optional[float] = None + forward_eps: Optional[float] = None + eps_revision_direction: str = "unknown" + revenue_revision_direction: str = "unknown" + analyst_consensus: str = "unknown" + price_target_upside_pct: Optional[float] = None + num_analysts: Optional[int] = None + + +class SectorRotationOutput(AgentBaseOutput): + """Sector rotation and relative strength.""" + agent_name: str = "Sector Rotation" + sector: str = "Unknown" + sector_etf: Optional[str] = None + sector_vs_spy_1m: Optional[float] = None + sector_vs_spy_3m: Optional[float] = None + sector_rank: Optional[int] = None + total_sectors: int = 11 + rotation_direction: str = "unknown" + + +class BacklogOrderMomentumOutput(AgentBaseOutput): + """Backlog and order momentum (where applicable).""" + agent_name: str = "Backlog / Order Momentum" + has_backlog_data: bool = False + backlog_trend: str = "unknown" + order_momentum: str = "unknown" + + +class NarrativeCrowdingOutput(AgentBaseOutput): + """Narrative crowding and contrarian signals.""" + agent_name: str = "Narrative Crowding" + narrative_saturation: str = "unknown" + contrarian_opportunity: bool = False + media_sentiment: str = "unknown" + short_squeeze_potential: bool = False + + +class ArchetypeOutput(BaseModel): + """Company archetype classification.""" + archetype: str = "Secular Growth Innovator" + archetype_confidence: float = Field(default=0.5, ge=0, le=1) + reasoning: str = "" + + +# --------------------------------------------------------------------------- +# Tier 3 outputs +# --------------------------------------------------------------------------- + +class BullCaseOutput(BaseModel): + """Structured bull case thesis.""" + thesis: str = "" + catalysts: List[str] = Field(default_factory=list) + upside_target: Optional[float] = None + upside_pct: Optional[float] = None + key_assumptions: List[str] = Field(default_factory=list) + thesis_invalidation_triggers: List[str] = Field(default_factory=list) + confidence_0_to_1: float = Field(default=0.5, ge=0, le=1) + + +class BearCaseOutput(BaseModel): + """Structured bear case thesis.""" + thesis: str = "" + risks: List[str] = Field(default_factory=list) + downside_target: Optional[float] = None + downside_pct: Optional[float] = None + key_assumptions: List[str] = Field(default_factory=list) + thesis_invalidation_triggers: List[str] = Field(default_factory=list) + confidence_0_to_1: float = Field(default=0.5, ge=0, le=1) + + +class DebateRefereeOutput(BaseModel): + """Debate referee decision.""" + winner: str = "bull" + reasoning: str = "" + bull_strength_0_to_10: float = Field(default=5.0, ge=0, le=10) + bear_strength_0_to_10: float = Field(default=5.0, ge=0, le=10) + key_unresolved_questions: List[str] = Field(default_factory=list) + net_conviction_adjustment: float = Field(default=0.0, ge=-2, le=2) + + +class RiskInvalidationOutput(AgentBaseOutput): + """Risk assessment and invalidation triggers.""" + agent_name: str = "Risk / Invalidation" + overall_risk_level: str = "medium" + max_position_size_pct: float = Field(default=5.0, ge=0, le=100) + stop_loss_pct: Optional[float] = None + invalidation_triggers: List[str] = Field(default_factory=list) + + +class FinalDecisionOutput(BaseModel): + """Final synthesized decision with narrative.""" + ticker: str = "" + company_name: str = "" + master_score: float = 0.0 + adjusted_score: float = 0.0 + confidence: float = 0.0 + position_role: str = "Avoid" + action: str = "AVOID" + risk_level: str = "medium" + thesis_summary: str = "" + key_catalysts: List[str] = Field(default_factory=list) + key_risks: List[str] = Field(default_factory=list) + invalidation_triggers: List[str] = Field(default_factory=list) + position_sizing_pct: float = 0.0 + narrative: str = "" + + +# --------------------------------------------------------------------------- +# Deterministic scoring functions +# --------------------------------------------------------------------------- + +def compute_master_score( + business_quality: float, + macro_alignment: float, + institutional_flow: float, + valuation: float, + entry_timing: float, + earnings_revisions: float, + backlog: float, + crowding: float, +) -> float: + """Compute weighted master score (0-100). + + Weights: + 25% business_quality + 20% macro_alignment + 15% institutional_flow + 10% valuation + 10% entry_timing + 10% earnings_revisions + 5% backlog + 5% crowding + """ + weighted = ( + 0.25 * business_quality + + 0.20 * macro_alignment + + 0.15 * institutional_flow + + 0.10 * valuation + + 0.10 * entry_timing + + 0.10 * earnings_revisions + + 0.05 * backlog + + 0.05 * crowding + ) + return round(weighted * 10, 2) + + +def assign_position_role(score: float) -> str: + """Map master score to a position role.""" + if score > 80: + return "Core Position" + if score > 70: + return "Strong Position" + if score > 60: + return "Tactical / Satellite" + if score > 50: + return "Watchlist" + return "Avoid" + + +def apply_confidence_penalty( + base_score: float, + flags: List[DataFlag], + hard_veto: bool, +) -> float: + """Reduce score based on data quality flags. + + Penalties: minor=-0.5, moderate=-1.0, severe=-2.0 + """ + if hard_veto: + return 0.0 + penalty = 0.0 + for flag in flags: + if flag.severity == "minor": + penalty += 0.5 + elif flag.severity == "moderate": + penalty += 1.0 + elif flag.severity == "severe": + penalty += 2.0 + return max(0.0, base_score - penalty) + + +def should_hard_veto( + validation: Optional[ValidationOutput], + risk: Optional[RiskInvalidationOutput] = None, +) -> Tuple[bool, Optional[str]]: + """Check if analysis should be hard-vetoed.""" + if validation is not None: + if validation.veto: + return True, validation.veto_reason or "Validation veto" + if not validation.company_name_match: + return True, "Company name mismatch" + if not validation.ticker_valid: + return True, "Invalid ticker" + if risk is not None and risk.veto: + return True, risk.veto_reason or "Risk veto" + return False, None + + +# --------------------------------------------------------------------------- +# LLM structured output helper +# --------------------------------------------------------------------------- + +def invoke_structured(llm, model_cls, prompt: str): + """Call LLM with structured output, with JSON fallback.""" + try: + structured = llm.with_structured_output(model_cls) + return structured.invoke(prompt) + except Exception as e: + logger.warning("Structured output failed for %s: %s — using JSON fallback", model_cls.__name__, e) + schema_str = json.dumps(model_cls.model_json_schema(), indent=2) + json_prompt = ( + f"{prompt}\n\nReturn ONLY valid JSON matching this schema:\n{schema_str}" + ) + response = llm.invoke(json_prompt) + content = response.content.strip() + if "```json" in content: + content = content.split("```json")[1].split("```")[0].strip() + elif "```" in content: + content = content.split("```")[1].split("```")[0].strip() + return model_cls.model_validate_json(content)