Merge ccf375eafd into 10c136f49c
This commit is contained in:
commit
7958654394
|
|
@ -0,0 +1,37 @@
|
||||||
|
"""Portfolio analysis example.
|
||||||
|
|
||||||
|
Analyzes multiple stocks in a portfolio and produces a comparative
|
||||||
|
recommendation for each position (KEEP / REDUCE / EXIT).
|
||||||
|
|
||||||
|
Related GitHub issues: #60, #406
|
||||||
|
"""
|
||||||
|
|
||||||
|
from tradingagents.graph.trading_graph import TradingAgentsGraph
|
||||||
|
from tradingagents.default_config import DEFAULT_CONFIG
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
config = DEFAULT_CONFIG.copy()
|
||||||
|
# Customize LLM provider and models as needed:
|
||||||
|
# config["llm_provider"] = "anthropic" # or "openai", "google"
|
||||||
|
# config["deep_think_llm"] = "claude-sonnet-4-20250514"
|
||||||
|
# config["quick_think_llm"] = "claude-haiku-4-5-20251001"
|
||||||
|
config["max_debate_rounds"] = 1
|
||||||
|
|
||||||
|
ta = TradingAgentsGraph(debug=False, config=config)
|
||||||
|
|
||||||
|
# Define your portfolio
|
||||||
|
portfolio = ["NVDA", "AAPL", "MSFT", "GOOGL", "AMZN"]
|
||||||
|
|
||||||
|
# Run the portfolio analysis
|
||||||
|
results = ta.propagate_portfolio(portfolio, "2025-03-23")
|
||||||
|
|
||||||
|
# Print individual signals
|
||||||
|
print("\n=== INDIVIDUAL SIGNALS ===")
|
||||||
|
for ticker, result in results["individual_results"].items():
|
||||||
|
print(f" {ticker}: {result['signal']}")
|
||||||
|
|
||||||
|
# Print the comparative portfolio summary
|
||||||
|
print("\n=== PORTFOLIO SUMMARY ===")
|
||||||
|
print(results["portfolio_summary"])
|
||||||
|
|
@ -6,6 +6,7 @@ from .setup import GraphSetup
|
||||||
from .propagation import Propagator
|
from .propagation import Propagator
|
||||||
from .reflection import Reflector
|
from .reflection import Reflector
|
||||||
from .signal_processing import SignalProcessor
|
from .signal_processing import SignalProcessor
|
||||||
|
from .portfolio_analysis import PortfolioAnalyzer
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"TradingAgentsGraph",
|
"TradingAgentsGraph",
|
||||||
|
|
@ -14,4 +15,5 @@ __all__ = [
|
||||||
"Propagator",
|
"Propagator",
|
||||||
"Reflector",
|
"Reflector",
|
||||||
"SignalProcessor",
|
"SignalProcessor",
|
||||||
|
"PortfolioAnalyzer",
|
||||||
]
|
]
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,192 @@
|
||||||
|
# TradingAgents/graph/portfolio_analysis.py
|
||||||
|
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import traceback
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Callable, Dict, List, Tuple
|
||||||
|
|
||||||
|
from langchain_core.language_models.chat_models import BaseChatModel
|
||||||
|
|
||||||
|
|
||||||
|
class PortfolioAnalyzer:
|
||||||
|
"""Analyzes multiple stocks and produces a comparative portfolio recommendation.
|
||||||
|
|
||||||
|
Follows the same delegation pattern as SignalProcessor and Reflector —
|
||||||
|
the orchestrator (TradingAgentsGraph) owns the graph and LLMs, this class
|
||||||
|
owns the portfolio-level prompt, comparison logic, and logging.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, deep_thinking_llm: BaseChatModel, config: Dict[str, Any]):
|
||||||
|
"""Initialize with the deep thinking LLM for comparative analysis.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
deep_thinking_llm: The LLM instance used for the portfolio summary.
|
||||||
|
config: The configuration dictionary for the application.
|
||||||
|
"""
|
||||||
|
self.deep_thinking_llm = deep_thinking_llm
|
||||||
|
self.config = config
|
||||||
|
|
||||||
|
def analyze(
|
||||||
|
self,
|
||||||
|
tickers: List[str],
|
||||||
|
trade_date: str,
|
||||||
|
propagate_fn: Callable[[str, str], Tuple[Dict[str, Any], str]],
|
||||||
|
debug: bool = False,
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""Run analysis on multiple stocks and produce a comparative summary.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tickers: List of ticker symbols to analyze.
|
||||||
|
trade_date: The trade date string (e.g., "2026-03-23").
|
||||||
|
propagate_fn: The single-stock propagation function (typically
|
||||||
|
TradingAgentsGraph.propagate).
|
||||||
|
debug: Whether to print progress output.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with:
|
||||||
|
- "individual_results": dict mapping ticker to its decision and signal
|
||||||
|
- "portfolio_summary": the comparative LLM analysis
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If tickers is empty.
|
||||||
|
"""
|
||||||
|
if not tickers:
|
||||||
|
raise ValueError("tickers must be a non-empty list")
|
||||||
|
|
||||||
|
individual_results = self._analyze_individual(
|
||||||
|
tickers, trade_date, propagate_fn, debug
|
||||||
|
)
|
||||||
|
|
||||||
|
portfolio_summary = self._generate_summary(
|
||||||
|
individual_results, trade_date, debug
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._log_portfolio(trade_date, tickers, individual_results, portfolio_summary)
|
||||||
|
except Exception as e:
|
||||||
|
if debug:
|
||||||
|
print(f"Warning: failed to save portfolio log: {e}")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"individual_results": individual_results,
|
||||||
|
"portfolio_summary": portfolio_summary,
|
||||||
|
}
|
||||||
|
|
||||||
|
def _analyze_individual(
|
||||||
|
self,
|
||||||
|
tickers: List[str],
|
||||||
|
trade_date: str,
|
||||||
|
propagate_fn: Callable[[str, str], Tuple[Dict[str, Any], str]],
|
||||||
|
debug: bool,
|
||||||
|
) -> Dict[str, Dict[str, str]]:
|
||||||
|
"""Run the agent pipeline on each ticker, collecting results."""
|
||||||
|
individual_results = {}
|
||||||
|
|
||||||
|
for ticker in tickers:
|
||||||
|
if debug:
|
||||||
|
print(f"\n{'='*60}")
|
||||||
|
print(f"Analyzing {ticker}...")
|
||||||
|
print(f"{'='*60}\n")
|
||||||
|
|
||||||
|
try:
|
||||||
|
final_state, signal = propagate_fn(ticker, trade_date)
|
||||||
|
individual_results[ticker] = {
|
||||||
|
"signal": signal,
|
||||||
|
"final_trade_decision": final_state["final_trade_decision"],
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
if debug:
|
||||||
|
print(f"Error analyzing {ticker}: {e}")
|
||||||
|
error_msg = f"Analysis failed: {e}"
|
||||||
|
if debug:
|
||||||
|
error_msg += f"\n{traceback.format_exc()}"
|
||||||
|
individual_results[ticker] = {
|
||||||
|
"signal": "ERROR",
|
||||||
|
"final_trade_decision": error_msg,
|
||||||
|
}
|
||||||
|
|
||||||
|
return individual_results
|
||||||
|
|
||||||
|
def _generate_summary(
|
||||||
|
self,
|
||||||
|
individual_results: Dict[str, Dict[str, str]],
|
||||||
|
trade_date: str,
|
||||||
|
debug: bool = False,
|
||||||
|
) -> str:
|
||||||
|
"""Use the deep thinking LLM to compare all positions."""
|
||||||
|
# Skip summary if all tickers failed
|
||||||
|
successful = {
|
||||||
|
t: r for t, r in individual_results.items() if r["signal"] != "ERROR"
|
||||||
|
}
|
||||||
|
if not successful:
|
||||||
|
return "Portfolio summary unavailable — all individual analyses failed."
|
||||||
|
|
||||||
|
analyses_text = self._build_analyses_text(successful)
|
||||||
|
messages = [
|
||||||
|
("system", self._get_system_prompt()),
|
||||||
|
(
|
||||||
|
"human",
|
||||||
|
f"Here are the individual analyses for my portfolio positions "
|
||||||
|
f"as of {trade_date}:\n{analyses_text}\n\n"
|
||||||
|
f"Please provide a comparative portfolio recommendation.",
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
try:
|
||||||
|
return self.deep_thinking_llm.invoke(messages).content
|
||||||
|
except Exception as e:
|
||||||
|
error_msg = f"Portfolio summary generation failed: {e}"
|
||||||
|
if debug:
|
||||||
|
error_msg += f"\n{traceback.format_exc()}"
|
||||||
|
signals = ", ".join(f"{t}: {r['signal']}" for t, r in individual_results.items())
|
||||||
|
return f"{error_msg}\nIndividual signals were: {signals}"
|
||||||
|
|
||||||
|
def _build_analyses_text(self, results: Dict[str, Dict[str, str]]) -> str:
|
||||||
|
"""Format individual results into a text block for the LLM prompt."""
|
||||||
|
parts = []
|
||||||
|
for ticker, result in results.items():
|
||||||
|
parts.append(
|
||||||
|
f"--- {ticker} ---\n"
|
||||||
|
f"Rating: {result['signal']}\n"
|
||||||
|
f"Full Analysis:\n{result['final_trade_decision']}"
|
||||||
|
)
|
||||||
|
return "\n".join(parts)
|
||||||
|
|
||||||
|
def _get_system_prompt(self) -> str:
|
||||||
|
"""Return the system prompt for the portfolio comparison LLM call."""
|
||||||
|
return (
|
||||||
|
"You are a senior portfolio strategist. You have received individual "
|
||||||
|
"stock analyses for all positions in a portfolio. Your job is to compare "
|
||||||
|
"them relative to each other and provide a clear, actionable portfolio "
|
||||||
|
"recommendation.\n\n"
|
||||||
|
"For each stock, assign one of: KEEP, REDUCE, or EXIT.\n\n"
|
||||||
|
"Structure your response as:\n"
|
||||||
|
"1. A ranked summary table (best to worst) with ticker, action, and "
|
||||||
|
"one-line rationale.\n"
|
||||||
|
"2. A brief portfolio-level commentary covering overall risk exposure, "
|
||||||
|
"sector concentration, and any suggested rebalancing.\n\n"
|
||||||
|
"Be direct and concise. This is for an experienced investor."
|
||||||
|
)
|
||||||
|
|
||||||
|
def _log_portfolio(
|
||||||
|
self,
|
||||||
|
trade_date: str,
|
||||||
|
tickers: List[str],
|
||||||
|
individual_results: Dict[str, Dict[str, str]],
|
||||||
|
portfolio_summary: str,
|
||||||
|
) -> None:
|
||||||
|
"""Log the portfolio analysis results to a JSON file."""
|
||||||
|
directory = Path(self.config.get("portfolio_log_dir", "eval_results/portfolio/"))
|
||||||
|
directory.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
log_data = {
|
||||||
|
"trade_date": trade_date,
|
||||||
|
"tickers": tickers,
|
||||||
|
"individual_results": individual_results,
|
||||||
|
"portfolio_summary": portfolio_summary,
|
||||||
|
}
|
||||||
|
|
||||||
|
log_file = directory / f"portfolio_analysis_{re.sub(r'[^\w.-]', '_', trade_date)}.json"
|
||||||
|
with log_file.open("w", encoding="utf-8") as f:
|
||||||
|
json.dump(log_data, f, indent=4)
|
||||||
|
|
@ -38,6 +38,7 @@ from .setup import GraphSetup
|
||||||
from .propagation import Propagator
|
from .propagation import Propagator
|
||||||
from .reflection import Reflector
|
from .reflection import Reflector
|
||||||
from .signal_processing import SignalProcessor
|
from .signal_processing import SignalProcessor
|
||||||
|
from .portfolio_analysis import PortfolioAnalyzer
|
||||||
|
|
||||||
|
|
||||||
class TradingAgentsGraph:
|
class TradingAgentsGraph:
|
||||||
|
|
@ -124,6 +125,7 @@ class TradingAgentsGraph:
|
||||||
self.propagator = Propagator()
|
self.propagator = Propagator()
|
||||||
self.reflector = Reflector(self.quick_thinking_llm)
|
self.reflector = Reflector(self.quick_thinking_llm)
|
||||||
self.signal_processor = SignalProcessor(self.quick_thinking_llm)
|
self.signal_processor = SignalProcessor(self.quick_thinking_llm)
|
||||||
|
self.portfolio_analyzer = PortfolioAnalyzer(self.deep_thinking_llm, self.config)
|
||||||
|
|
||||||
# State tracking
|
# State tracking
|
||||||
self.curr_state = None
|
self.curr_state = None
|
||||||
|
|
@ -266,6 +268,26 @@ class TradingAgentsGraph:
|
||||||
with open(log_path, "w", encoding="utf-8") as f:
|
with open(log_path, "w", encoding="utf-8") as f:
|
||||||
json.dump(self.log_states_dict[str(trade_date)], f, indent=4)
|
json.dump(self.log_states_dict[str(trade_date)], f, indent=4)
|
||||||
|
|
||||||
|
def propagate_portfolio(
|
||||||
|
self, tickers: List[str], trade_date: str
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""Run analysis on multiple stocks and produce a comparative portfolio summary.
|
||||||
|
|
||||||
|
Delegates to PortfolioAnalyzer.analyze — see that class for full details.
|
||||||
|
|
||||||
|
This method preserves the instance's ticker and curr_state attributes,
|
||||||
|
restoring them after the portfolio analysis is complete.
|
||||||
|
"""
|
||||||
|
original_ticker = self.ticker
|
||||||
|
original_curr_state = self.curr_state
|
||||||
|
try:
|
||||||
|
return self.portfolio_analyzer.analyze(
|
||||||
|
tickers, trade_date, self.propagate, debug=self.debug
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
self.ticker = original_ticker
|
||||||
|
self.curr_state = original_curr_state
|
||||||
|
|
||||||
def reflect_and_remember(self, returns_losses):
|
def reflect_and_remember(self, returns_losses):
|
||||||
"""Reflect on decisions and update memory based on returns."""
|
"""Reflect on decisions and update memory based on returns."""
|
||||||
self.reflector.reflect_bull_researcher(
|
self.reflector.reflect_bull_researcher(
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue