feat: include portfolio holdings in auto mode pipeline analysis (#104)

* Initial plan

* feat: include portfolio holdings in auto mode pipeline analysis

In run_auto (both AgentOS and CLI), Phase 2 now loads current portfolio
holdings and merges their tickers with scan candidates before running
the per-ticker pipeline. This ensures the portfolio manager has fresh
analysis for both new opportunities and existing positions.

Key changes:
- macro_bridge.py: add candidates_from_holdings() factory
- langgraph_engine.py run_auto: merge holding tickers with scan tickers
- cli/main.py auto: load holdings, create StockCandidates, pass to run_pipeline
- cli/main.py run_pipeline: accept optional holdings_candidates parameter
- 9 new unit tests covering holdings inclusion, dedup, and graceful fallback

Co-authored-by: aguzererler <6199053+aguzererler@users.noreply.github.com>
Agent-Logs-Url: https://github.com/aguzererler/TradingAgents/sessions/53065a07-d9f8-47be-9956-0eb4ee8c87da

* fix: normalize ticker case in dedup and clarify count display

Address code review feedback:
- Use .upper() for case-insensitive ticker comparison in run_pipeline
- Display accurate filtered scan count instead of raw candidate count

Co-authored-by: aguzererler <6199053+aguzererler@users.noreply.github.com>
Agent-Logs-Url: https://github.com/aguzererler/TradingAgents/sessions/53065a07-d9f8-47be-9956-0eb4ee8c87da

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: aguzererler <6199053+aguzererler@users.noreply.github.com>
This commit is contained in:
Copilot 2026-03-24 18:35:53 +01:00 committed by GitHub
parent cd655cdae5
commit fdf54ae279
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 330 additions and 6 deletions

View File

@ -492,16 +492,53 @@ class LangGraphEngine:
async for evt in self.run_scan(f"{run_id}_scan", {"date": date}):
yield evt
# Phase 2: Pipeline analysis — get tickers from saved scan report
# Phase 2: Pipeline analysis — get tickers from scan report + portfolio holdings
yield self._system_log("Phase 2/3: Loading stocks from scan report…")
scan_data = store.load_scan(date)
tickers = self._extract_tickers_from_scan_data(scan_data)
scan_tickers = self._extract_tickers_from_scan_data(scan_data)
# Also include tickers from current portfolio holdings so the PM agent
# has fresh analysis for existing positions (hold/sell/add decisions).
portfolio_id = params.get("portfolio_id", "main_portfolio")
holding_tickers: list[str] = []
try:
from tradingagents.portfolio.repository import PortfolioRepository
_repo = PortfolioRepository()
_, holdings = _repo.get_portfolio_with_holdings(portfolio_id)
holding_tickers = [h.ticker.upper() for h in holdings]
except Exception as exc:
logger.warning("run_auto: could not load holdings for pipeline: %s", exc)
# Merge & deduplicate (scan candidates first, then holdings-only tickers)
seen: set[str] = set()
tickers: list[str] = []
for t in scan_tickers:
up = t.upper()
if up not in seen:
seen.add(up)
tickers.append(up)
holdings_only: list[str] = []
for t in holding_tickers:
if t not in seen:
seen.add(t)
tickers.append(t)
holdings_only.append(t)
if scan_tickers:
yield self._system_log(
f"Phase 2/3: {len(scan_tickers)} ticker(s) from scan report"
)
if holdings_only:
yield self._system_log(
f"Phase 2/3: {len(holdings_only)} additional ticker(s) from portfolio holdings: "
+ ", ".join(holdings_only)
)
if not tickers:
yield self._system_log(
"Warning: no stocks found in scan summary — ensure the scan completed "
"successfully and produced a 'stocks_to_investigate' list. "
"Skipping pipeline phase."
"Warning: no stocks found in scan summary and no portfolio holdings — "
"ensure the scan completed successfully and produced a "
"'stocks_to_investigate' list. Skipping pipeline phase."
)
else:
max_concurrent = int(self.config.get("max_concurrent_pipelines", 2))

View File

@ -1500,6 +1500,7 @@ def run_pipeline(
ticker_filter_list: Optional[list[str]] = None,
analysis_date_opt: Optional[str] = None,
dry_run_opt: Optional[bool] = None,
holdings_candidates: Optional[list] = None,
):
"""Full pipeline: scan -> filter -> per-ticker deep dive."""
import asyncio
@ -1559,9 +1560,26 @@ def run_pipeline(
# Parse macro output
macro_context, all_candidates = parse_macro_output(macro_path)
candidates = filter_candidates(all_candidates, min_conviction, ticker_filter)
num_scan_candidates = len(candidates)
# Append portfolio-holding candidates that aren't already in the scan list
num_holdings_added = 0
if holdings_candidates:
scan_tickers = {c.ticker.upper() for c in candidates}
extra = [h for h in holdings_candidates if h.ticker.upper() not in scan_tickers]
if extra:
console.print(
f"[cyan]Adding {len(extra)} ticker(s) from portfolio holdings: "
+ ", ".join(c.ticker for c in extra)
+ "[/cyan]"
)
candidates = list(candidates) + extra
num_holdings_added = len(extra)
console.print(
f"\n[cyan]Candidates: {len(candidates)} of {len(all_candidates)} stocks passed filter[/cyan]"
f"\n[cyan]Candidates: {len(candidates)} total "
f"({num_scan_candidates} from scan, "
f"{num_holdings_added} from holdings)[/cyan]"
)
table = Table(title="Selected Stocks", box=box.ROUNDED)
@ -1881,6 +1899,28 @@ def auto(
console.print("\n[bold magenta]--- Step 1: Market Scan ---[/bold magenta]")
run_scan(date=date)
# Load portfolio holdings so existing positions also get pipeline analysis.
# This gives the portfolio manager fresh data for hold/sell/add decisions.
from tradingagents.pipeline.macro_bridge import candidates_from_holdings
extra_candidates: list = []
try:
from tradingagents.portfolio.repository import PortfolioRepository
repo = PortfolioRepository()
_, holdings = repo.get_portfolio_with_holdings(portfolio_id)
if holdings:
extra_candidates = candidates_from_holdings(holdings)
console.print(
f"[cyan]Portfolio '{portfolio_id}' has {len(holdings)} holding(s) — "
f"{len(extra_candidates)} will be added to pipeline analysis[/cyan]"
)
except Exception as e:
console.print(
f"[yellow]Could not load portfolio holdings: {e}"
f"proceeding with scan candidates only[/yellow]"
)
console.print("\n[bold magenta]--- Step 2: Per-Ticker Pipeline ---[/bold magenta]")
macro_path = get_market_dir(date) / "scan_summary.json"
run_pipeline(
@ -1889,6 +1929,7 @@ def auto(
ticker_filter_list=None,
analysis_date_opt=date,
dry_run_opt=False,
holdings_candidates=extra_candidates or None,
)
console.print("\n[bold magenta]--- Step 3: Portfolio Manager ---[/bold magenta]")

View File

@ -947,6 +947,145 @@ class TestRunAutoTickerSource(unittest.TestCase):
f"Expected a warning log about AAPL failure. Got: {log_messages}",
)
def test_run_auto_includes_holdings_tickers_in_pipeline(self):
"""run_auto should also run pipeline for portfolio holdings not in scan report."""
scan_data = {"stocks_to_investigate": ["AAPL"]}
pipeline_calls = []
engine = LangGraphEngine()
async def fake_run_pipeline(run_id, params):
pipeline_calls.append(params.get("ticker"))
for _ in ():
yield {}
engine.run_pipeline = fake_run_pipeline
# Create mock holdings
mock_holding = MagicMock()
mock_holding.ticker = "MSFT"
mock_repo = MagicMock()
mock_repo.get_portfolio_with_holdings.return_value = (MagicMock(), [mock_holding])
with patch("agent_os.backend.services.langgraph_engine.ScannerGraph",
return_value=self._make_noop_scanner()), \
patch("agent_os.backend.services.langgraph_engine.PortfolioGraph",
return_value=self._make_noop_portfolio_graph()), \
patch("agent_os.backend.services.langgraph_engine.get_market_dir") as mock_gmd, \
patch("agent_os.backend.services.langgraph_engine.get_ticker_dir"), \
patch("tradingagents.report_paths.get_daily_dir") as mock_gdd, \
patch("agent_os.backend.services.langgraph_engine.ReportStore") as mock_rs_cls, \
patch("agent_os.backend.services.langgraph_engine.append_to_digest"), \
patch("agent_os.backend.services.langgraph_engine.extract_json", return_value=scan_data), \
patch("tradingagents.portfolio.repository.PortfolioRepository", return_value=mock_repo):
fake_mdir = MagicMock(spec=Path)
fake_mdir.__truediv__ = MagicMock(return_value=MagicMock(spec=Path))
fake_mdir.mkdir = MagicMock()
mock_gmd.return_value = fake_mdir
fake_daily = MagicMock(spec=Path)
fake_daily.exists.return_value = False
mock_gdd.return_value = fake_daily
mock_rs_cls.return_value = self._make_mock_store(scan_data)
asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"})))
# Both scan ticker and holdings ticker should be analyzed
self.assertIn("AAPL", pipeline_calls)
self.assertIn("MSFT", pipeline_calls)
def test_run_auto_deduplicates_scan_and_holdings_tickers(self):
"""A ticker appearing in both scan and holdings should only run once."""
scan_data = {"stocks_to_investigate": ["AAPL", "TSLA"]}
pipeline_calls = []
engine = LangGraphEngine()
async def fake_run_pipeline(run_id, params):
pipeline_calls.append(params.get("ticker"))
for _ in ():
yield {}
engine.run_pipeline = fake_run_pipeline
# Holding that overlaps with scan candidate
mock_holding = MagicMock()
mock_holding.ticker = "AAPL"
mock_repo = MagicMock()
mock_repo.get_portfolio_with_holdings.return_value = (MagicMock(), [mock_holding])
with patch("agent_os.backend.services.langgraph_engine.ScannerGraph",
return_value=self._make_noop_scanner()), \
patch("agent_os.backend.services.langgraph_engine.PortfolioGraph",
return_value=self._make_noop_portfolio_graph()), \
patch("agent_os.backend.services.langgraph_engine.get_market_dir") as mock_gmd, \
patch("agent_os.backend.services.langgraph_engine.get_ticker_dir"), \
patch("tradingagents.report_paths.get_daily_dir") as mock_gdd, \
patch("agent_os.backend.services.langgraph_engine.ReportStore") as mock_rs_cls, \
patch("agent_os.backend.services.langgraph_engine.append_to_digest"), \
patch("agent_os.backend.services.langgraph_engine.extract_json", return_value=scan_data), \
patch("tradingagents.portfolio.repository.PortfolioRepository", return_value=mock_repo):
fake_mdir = MagicMock(spec=Path)
fake_mdir.__truediv__ = MagicMock(return_value=MagicMock(spec=Path))
fake_mdir.mkdir = MagicMock()
mock_gmd.return_value = fake_mdir
fake_daily = MagicMock(spec=Path)
fake_daily.exists.return_value = False
mock_gdd.return_value = fake_daily
mock_rs_cls.return_value = self._make_mock_store(scan_data)
asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"})))
# AAPL should appear only once (not duplicated)
self.assertEqual(pipeline_calls.count("AAPL"), 1)
self.assertIn("TSLA", pipeline_calls)
def test_run_auto_proceeds_when_holdings_load_fails(self):
"""Pipeline should still run scan tickers even if holdings fail to load."""
scan_data = {"stocks_to_investigate": ["AAPL"]}
pipeline_calls = []
engine = LangGraphEngine()
async def fake_run_pipeline(run_id, params):
pipeline_calls.append(params.get("ticker"))
for _ in ():
yield {}
engine.run_pipeline = fake_run_pipeline
mock_repo = MagicMock()
mock_repo.get_portfolio_with_holdings.side_effect = RuntimeError("DB unavailable")
with patch("agent_os.backend.services.langgraph_engine.ScannerGraph",
return_value=self._make_noop_scanner()), \
patch("agent_os.backend.services.langgraph_engine.PortfolioGraph",
return_value=self._make_noop_portfolio_graph()), \
patch("agent_os.backend.services.langgraph_engine.get_market_dir") as mock_gmd, \
patch("agent_os.backend.services.langgraph_engine.get_ticker_dir"), \
patch("tradingagents.report_paths.get_daily_dir") as mock_gdd, \
patch("agent_os.backend.services.langgraph_engine.ReportStore") as mock_rs_cls, \
patch("agent_os.backend.services.langgraph_engine.append_to_digest"), \
patch("agent_os.backend.services.langgraph_engine.extract_json", return_value=scan_data), \
patch("tradingagents.portfolio.repository.PortfolioRepository", return_value=mock_repo):
fake_mdir = MagicMock(spec=Path)
fake_mdir.__truediv__ = MagicMock(return_value=MagicMock(spec=Path))
fake_mdir.mkdir = MagicMock()
mock_gmd.return_value = fake_mdir
fake_daily = MagicMock(spec=Path)
fake_daily.exists.return_value = False
mock_gdd.return_value = fake_daily
mock_rs_cls.return_value = self._make_mock_store(scan_data)
asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"})))
# Should still run pipeline for scan tickers despite holdings failure
self.assertIn("AAPL", pipeline_calls)
# ---------------------------------------------------------------------------
# TestExtractTickersFromScanData

View File

@ -212,3 +212,64 @@ class TestReportRendering:
assert (output_dir / "summary.md").exists()
assert (output_dir / "results.json").exists()
assert (output_dir / "NVDA" / "2026-03-17_deep_dive.md").exists()
class TestCandidatesFromHoldings:
"""Tests for candidates_from_holdings — portfolio holdings → StockCandidate."""
def _make_holding(self, ticker, sector=None, industry=None):
"""Minimal holding-like object with .ticker, .sector, .industry."""
from types import SimpleNamespace
return SimpleNamespace(ticker=ticker, sector=sector, industry=industry)
def test_basic_conversion(self):
from tradingagents.pipeline.macro_bridge import candidates_from_holdings
holdings = [self._make_holding("AAPL", sector="Technology")]
result = candidates_from_holdings(holdings)
assert len(result) == 1
assert result[0].ticker == "AAPL"
assert result[0].thesis_angle == "portfolio_holding"
assert result[0].conviction == "medium"
assert result[0].sector == "Technology"
def test_skips_existing_tickers(self):
from tradingagents.pipeline.macro_bridge import candidates_from_holdings
holdings = [
self._make_holding("AAPL"),
self._make_holding("TSLA"),
]
result = candidates_from_holdings(holdings, existing_tickers={"AAPL"})
assert len(result) == 1
assert result[0].ticker == "TSLA"
def test_case_insensitive_dedup(self):
from tradingagents.pipeline.macro_bridge import candidates_from_holdings
holdings = [self._make_holding("aapl")]
result = candidates_from_holdings(holdings, existing_tickers={"AAPL"})
assert len(result) == 0
def test_empty_holdings(self):
from tradingagents.pipeline.macro_bridge import candidates_from_holdings
result = candidates_from_holdings([])
assert result == []
def test_deduplicates_within_holdings(self):
from tradingagents.pipeline.macro_bridge import candidates_from_holdings
holdings = [
self._make_holding("AAPL"),
self._make_holding("AAPL"),
]
result = candidates_from_holdings(holdings)
assert len(result) == 1
def test_missing_sector_defaults_to_empty(self):
from tradingagents.pipeline.macro_bridge import candidates_from_holdings
holdings = [self._make_holding("TSLA")]
result = candidates_from_holdings(holdings)
assert result[0].sector == ""

View File

@ -138,6 +138,52 @@ def _match_theme(sector: str, themes: list[dict]) -> str:
return themes[0].get("theme", "") if themes else ""
# ─── Holdings helpers ─────────────────────────────────────────────────────────
def candidates_from_holdings(
holdings: list,
existing_tickers: set[str] | None = None,
) -> list[StockCandidate]:
"""Create StockCandidate objects for portfolio holdings not already in candidates.
Holdings are assigned ``thesis_angle='portfolio_holding'`` and
``conviction='medium'`` so the pipeline treats them with equal priority
while the PM agent can distinguish their source.
Args:
holdings: List of Holding objects (must have ``.ticker`` and
optionally ``.sector`` / ``.industry``).
existing_tickers: Tickers already present in the scan candidate list
(uppercase). Holdings matching these are skipped to avoid
duplicate pipeline runs.
Returns:
List of StockCandidate for holdings that aren't already candidates.
"""
existing = {t.upper() for t in (existing_tickers or set())}
result: list[StockCandidate] = []
for h in holdings:
ticker = h.ticker.upper()
if ticker in existing:
continue
existing.add(ticker)
result.append(
StockCandidate(
ticker=ticker,
name=ticker,
sector=getattr(h, "sector", None) or "",
rationale="Existing portfolio holding — re-analysis for portfolio review.",
thesis_angle="portfolio_holding",
conviction="medium",
key_catalysts=[],
risks=[],
macro_theme="",
)
)
return result
# ─── Core pipeline ────────────────────────────────────────────────────────────