From ae2c813d8ad161f924a244d614f58e889aadf6a0 Mon Sep 17 00:00:00 2001 From: Robin Lindbladh Date: Tue, 24 Mar 2026 20:10:21 +0100 Subject: [PATCH] feat: add portfolio analysis for multi-stock comparison Add PortfolioAnalyzer class that runs the full agent pipeline on multiple stocks and produces a comparative KEEP/REDUCE/EXIT recommendation using the deep thinking LLM. Includes per-ticker error handling, graceful degradation on LLM failure, and result logging. Addresses #60 and partially #406. Co-Authored-By: Claude Opus 4.6 (1M context) --- examples/portfolio_analysis.py | 37 +++++ tradingagents/graph/__init__.py | 2 + tradingagents/graph/portfolio_analysis.py | 183 ++++++++++++++++++++++ tradingagents/graph/trading_graph.py | 17 ++ 4 files changed, 239 insertions(+) create mode 100644 examples/portfolio_analysis.py create mode 100644 tradingagents/graph/portfolio_analysis.py diff --git a/examples/portfolio_analysis.py b/examples/portfolio_analysis.py new file mode 100644 index 00000000..2ddeecc2 --- /dev/null +++ b/examples/portfolio_analysis.py @@ -0,0 +1,37 @@ +"""Portfolio analysis example. + +Analyzes multiple stocks in a portfolio and produces a comparative +recommendation for each position (KEEP / REDUCE / EXIT). + +Related GitHub issues: #60, #406 +""" + +from tradingagents.graph.trading_graph import TradingAgentsGraph +from tradingagents.default_config import DEFAULT_CONFIG +from dotenv import load_dotenv + +load_dotenv() + +config = DEFAULT_CONFIG.copy() +# Customize LLM provider and models as needed: +# config["llm_provider"] = "anthropic" # or "openai", "google" +# config["deep_think_llm"] = "claude-sonnet-4-20250514" +# config["quick_think_llm"] = "claude-haiku-4-5-20251001" +config["max_debate_rounds"] = 1 + +ta = TradingAgentsGraph(debug=False, config=config) + +# Define your portfolio +portfolio = ["NVDA", "AAPL", "MSFT", "GOOGL", "AMZN"] + +# Run the portfolio analysis +results = ta.propagate_portfolio(portfolio, "2025-03-23") + +# Print individual signals +print("\n=== INDIVIDUAL SIGNALS ===") +for ticker, result in results["individual_results"].items(): + print(f" {ticker}: {result['signal']}") + +# Print the comparative portfolio summary +print("\n=== PORTFOLIO SUMMARY ===") +print(results["portfolio_summary"]) diff --git a/tradingagents/graph/__init__.py b/tradingagents/graph/__init__.py index 80982c19..9bc3e725 100644 --- a/tradingagents/graph/__init__.py +++ b/tradingagents/graph/__init__.py @@ -6,6 +6,7 @@ from .setup import GraphSetup from .propagation import Propagator from .reflection import Reflector from .signal_processing import SignalProcessor +from .portfolio_analysis import PortfolioAnalyzer __all__ = [ "TradingAgentsGraph", @@ -14,4 +15,5 @@ __all__ = [ "Propagator", "Reflector", "SignalProcessor", + "PortfolioAnalyzer", ] diff --git a/tradingagents/graph/portfolio_analysis.py b/tradingagents/graph/portfolio_analysis.py new file mode 100644 index 00000000..355f6e69 --- /dev/null +++ b/tradingagents/graph/portfolio_analysis.py @@ -0,0 +1,183 @@ +# TradingAgents/graph/portfolio_analysis.py + +import json +from pathlib import Path +from typing import Any, Callable, Dict, List, Tuple + +from langchain_openai import ChatOpenAI + + +class PortfolioAnalyzer: + """Analyzes multiple stocks and produces a comparative portfolio recommendation. + + Follows the same delegation pattern as SignalProcessor and Reflector — + the orchestrator (TradingAgentsGraph) owns the graph and LLMs, this class + owns the portfolio-level prompt, comparison logic, and logging. + """ + + def __init__(self, deep_thinking_llm: ChatOpenAI): + """Initialize with the deep thinking LLM for comparative analysis. + + Args: + deep_thinking_llm: The LLM instance used for the portfolio summary. + """ + self.deep_thinking_llm = deep_thinking_llm + + def analyze( + self, + tickers: List[str], + trade_date: str, + propagate_fn: Callable[[str, str], Tuple[Dict[str, Any], str]], + debug: bool = False, + ) -> Dict[str, Any]: + """Run analysis on multiple stocks and produce a comparative summary. + + Args: + tickers: List of ticker symbols to analyze. + trade_date: The trade date string (e.g., "2026-03-23"). + propagate_fn: The single-stock propagation function (typically + TradingAgentsGraph.propagate). + debug: Whether to print progress output. + + Returns: + Dictionary with: + - "individual_results": dict mapping ticker to its decision and signal + - "portfolio_summary": the comparative LLM analysis + + Raises: + ValueError: If tickers is empty. + """ + if not tickers: + raise ValueError("tickers must be a non-empty list") + + individual_results = self._analyze_individual( + tickers, trade_date, propagate_fn, debug + ) + + portfolio_summary = self._generate_summary( + individual_results, trade_date + ) + + self._log_portfolio(trade_date, tickers, individual_results, portfolio_summary) + + return { + "individual_results": individual_results, + "portfolio_summary": portfolio_summary, + } + + def _analyze_individual( + self, + tickers: List[str], + trade_date: str, + propagate_fn: Callable, + debug: bool, + ) -> Dict[str, Dict[str, str]]: + """Run the agent pipeline on each ticker, collecting results.""" + individual_results = {} + + for ticker in tickers: + if debug: + print(f"\n{'='*60}") + print(f"Analyzing {ticker}...") + print(f"{'='*60}\n") + + try: + final_state, signal = propagate_fn(ticker, trade_date) + individual_results[ticker] = { + "signal": signal, + "final_trade_decision": final_state["final_trade_decision"], + } + except Exception as e: + if debug: + print(f"Error analyzing {ticker}: {e}") + individual_results[ticker] = { + "signal": "ERROR", + "final_trade_decision": f"Analysis failed: {e}", + } + + return individual_results + + def _generate_summary( + self, + individual_results: Dict[str, Dict[str, str]], + trade_date: str, + ) -> str: + """Use the deep thinking LLM to compare all positions.""" + # Skip summary if all tickers failed + successful = { + t: r for t, r in individual_results.items() if r["signal"] != "ERROR" + } + if not successful: + return "Portfolio summary unavailable — all individual analyses failed." + + analyses_text = self._build_analyses_text(successful) + messages = [ + ("system", self._get_system_prompt()), + ( + "human", + f"Here are the individual analyses for my portfolio positions " + f"as of {trade_date}:\n{analyses_text}\n\n" + f"Please provide a comparative portfolio recommendation.", + ), + ] + + try: + return self.deep_thinking_llm.invoke(messages).content + except Exception as e: + return ( + f"Portfolio summary generation failed: {e}\n" + f"Individual signals were: " + + ", ".join(f"{t}: {r['signal']}" for t, r in individual_results.items()) + ) + + def _build_analyses_text(self, results: Dict[str, Dict[str, str]]) -> str: + """Format individual results into a text block for the LLM prompt.""" + parts = [] + for ticker, result in results.items(): + parts.append( + f"--- {ticker} ---\n" + f"Rating: {result['signal']}\n" + f"Full Analysis:\n{result['final_trade_decision']}" + ) + return "\n".join(parts) + + def _get_system_prompt(self) -> str: + """Return the system prompt for the portfolio comparison LLM call.""" + return ( + "You are a senior portfolio strategist. You have received individual " + "stock analyses for all positions in a portfolio. Your job is to compare " + "them relative to each other and provide a clear, actionable portfolio " + "recommendation.\n\n" + "For each stock, assign one of: KEEP, REDUCE, or EXIT.\n\n" + "Structure your response as:\n" + "1. A ranked summary table (best to worst) with ticker, action, and " + "one-line rationale.\n" + "2. A brief portfolio-level commentary covering overall risk exposure, " + "sector concentration, and any suggested rebalancing.\n\n" + "Be direct and concise. This is for an experienced investor." + ) + + def _log_portfolio( + self, + trade_date: str, + tickers: List[str], + individual_results: Dict[str, Dict[str, str]], + portfolio_summary: str, + ) -> None: + """Log the portfolio analysis results to a JSON file.""" + directory = Path("eval_results/portfolio/") + directory.mkdir(parents=True, exist_ok=True) + + log_data = { + "trade_date": str(trade_date), + "tickers": tickers, + "individual_results": individual_results, + "portfolio_summary": portfolio_summary, + } + + with open( + f"eval_results/portfolio/portfolio_analysis_{trade_date}.json", + "w", + encoding="utf-8", + ) as f: + json.dump(log_data, f, indent=4) diff --git a/tradingagents/graph/trading_graph.py b/tradingagents/graph/trading_graph.py index c8cd7492..b8c9c825 100644 --- a/tradingagents/graph/trading_graph.py +++ b/tradingagents/graph/trading_graph.py @@ -38,6 +38,7 @@ from .setup import GraphSetup from .propagation import Propagator from .reflection import Reflector from .signal_processing import SignalProcessor +from .portfolio_analysis import PortfolioAnalyzer class TradingAgentsGraph: @@ -124,6 +125,7 @@ class TradingAgentsGraph: self.propagator = Propagator() self.reflector = Reflector(self.quick_thinking_llm) self.signal_processor = SignalProcessor(self.quick_thinking_llm) + self.portfolio_analyzer = PortfolioAnalyzer(self.deep_thinking_llm) # State tracking self.curr_state = None @@ -269,6 +271,21 @@ class TradingAgentsGraph: ) as f: json.dump(self.log_states_dict, f, indent=4) + def propagate_portfolio( + self, tickers: List[str], trade_date: str + ) -> Dict[str, Any]: + """Run analysis on multiple stocks and produce a comparative portfolio summary. + + Delegates to PortfolioAnalyzer.analyze — see that class for full details. + + Note: Each call to propagate() overwrites self.ticker and self.curr_state, + so after this method returns, both reflect only the last ticker analyzed. + Calling reflect_and_remember() afterward will only apply to that last ticker. + """ + return self.portfolio_analyzer.analyze( + tickers, trade_date, self.propagate, debug=self.debug + ) + def reflect_and_remember(self, returns_losses): """Reflect on decisions and update memory based on returns.""" self.reflector.reflect_bull_researcher(