diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py index 46e35c45..04901a9b 100644 --- a/agent_os/backend/services/langgraph_engine.py +++ b/agent_os/backend/services/langgraph_engine.py @@ -492,16 +492,53 @@ class LangGraphEngine: async for evt in self.run_scan(f"{run_id}_scan", {"date": date}): yield evt - # Phase 2: Pipeline analysis — get tickers from saved scan report + # Phase 2: Pipeline analysis — get tickers from scan report + portfolio holdings yield self._system_log("Phase 2/3: Loading stocks from scan report…") scan_data = store.load_scan(date) - tickers = self._extract_tickers_from_scan_data(scan_data) + scan_tickers = self._extract_tickers_from_scan_data(scan_data) + + # Also include tickers from current portfolio holdings so the PM agent + # has fresh analysis for existing positions (hold/sell/add decisions). + portfolio_id = params.get("portfolio_id", "main_portfolio") + holding_tickers: list[str] = [] + try: + from tradingagents.portfolio.repository import PortfolioRepository + _repo = PortfolioRepository() + _, holdings = _repo.get_portfolio_with_holdings(portfolio_id) + holding_tickers = [h.ticker.upper() for h in holdings] + except Exception as exc: + logger.warning("run_auto: could not load holdings for pipeline: %s", exc) + + # Merge & deduplicate (scan candidates first, then holdings-only tickers) + seen: set[str] = set() + tickers: list[str] = [] + for t in scan_tickers: + up = t.upper() + if up not in seen: + seen.add(up) + tickers.append(up) + holdings_only: list[str] = [] + for t in holding_tickers: + if t not in seen: + seen.add(t) + tickers.append(t) + holdings_only.append(t) + + if scan_tickers: + yield self._system_log( + f"Phase 2/3: {len(scan_tickers)} ticker(s) from scan report" + ) + if holdings_only: + yield self._system_log( + f"Phase 2/3: {len(holdings_only)} additional ticker(s) from portfolio holdings: " + + ", ".join(holdings_only) + ) if not tickers: yield self._system_log( - "Warning: no stocks found in scan summary — ensure the scan completed " - "successfully and produced a 'stocks_to_investigate' list. " - "Skipping pipeline phase." + "Warning: no stocks found in scan summary and no portfolio holdings — " + "ensure the scan completed successfully and produced a " + "'stocks_to_investigate' list. Skipping pipeline phase." ) else: max_concurrent = int(self.config.get("max_concurrent_pipelines", 2)) diff --git a/cli/main.py b/cli/main.py index 8ff48476..13197ce5 100644 --- a/cli/main.py +++ b/cli/main.py @@ -1500,6 +1500,7 @@ def run_pipeline( ticker_filter_list: Optional[list[str]] = None, analysis_date_opt: Optional[str] = None, dry_run_opt: Optional[bool] = None, + holdings_candidates: Optional[list] = None, ): """Full pipeline: scan -> filter -> per-ticker deep dive.""" import asyncio @@ -1559,9 +1560,26 @@ def run_pipeline( # Parse macro output macro_context, all_candidates = parse_macro_output(macro_path) candidates = filter_candidates(all_candidates, min_conviction, ticker_filter) + num_scan_candidates = len(candidates) + + # Append portfolio-holding candidates that aren't already in the scan list + num_holdings_added = 0 + if holdings_candidates: + scan_tickers = {c.ticker.upper() for c in candidates} + extra = [h for h in holdings_candidates if h.ticker.upper() not in scan_tickers] + if extra: + console.print( + f"[cyan]Adding {len(extra)} ticker(s) from portfolio holdings: " + + ", ".join(c.ticker for c in extra) + + "[/cyan]" + ) + candidates = list(candidates) + extra + num_holdings_added = len(extra) console.print( - f"\n[cyan]Candidates: {len(candidates)} of {len(all_candidates)} stocks passed filter[/cyan]" + f"\n[cyan]Candidates: {len(candidates)} total " + f"({num_scan_candidates} from scan, " + f"{num_holdings_added} from holdings)[/cyan]" ) table = Table(title="Selected Stocks", box=box.ROUNDED) @@ -1881,6 +1899,28 @@ def auto( console.print("\n[bold magenta]--- Step 1: Market Scan ---[/bold magenta]") run_scan(date=date) + # Load portfolio holdings so existing positions also get pipeline analysis. + # This gives the portfolio manager fresh data for hold/sell/add decisions. + from tradingagents.pipeline.macro_bridge import candidates_from_holdings + + extra_candidates: list = [] + try: + from tradingagents.portfolio.repository import PortfolioRepository + + repo = PortfolioRepository() + _, holdings = repo.get_portfolio_with_holdings(portfolio_id) + if holdings: + extra_candidates = candidates_from_holdings(holdings) + console.print( + f"[cyan]Portfolio '{portfolio_id}' has {len(holdings)} holding(s) — " + f"{len(extra_candidates)} will be added to pipeline analysis[/cyan]" + ) + except Exception as e: + console.print( + f"[yellow]Could not load portfolio holdings: {e} — " + f"proceeding with scan candidates only[/yellow]" + ) + console.print("\n[bold magenta]--- Step 2: Per-Ticker Pipeline ---[/bold magenta]") macro_path = get_market_dir(date) / "scan_summary.json" run_pipeline( @@ -1889,6 +1929,7 @@ def auto( ticker_filter_list=None, analysis_date_opt=date, dry_run_opt=False, + holdings_candidates=extra_candidates or None, ) console.print("\n[bold magenta]--- Step 3: Portfolio Manager ---[/bold magenta]") diff --git a/tests/unit/test_langgraph_engine_run_modes.py b/tests/unit/test_langgraph_engine_run_modes.py index ee24d7c8..9eb63b33 100644 --- a/tests/unit/test_langgraph_engine_run_modes.py +++ b/tests/unit/test_langgraph_engine_run_modes.py @@ -947,6 +947,145 @@ class TestRunAutoTickerSource(unittest.TestCase): f"Expected a warning log about AAPL failure. Got: {log_messages}", ) + def test_run_auto_includes_holdings_tickers_in_pipeline(self): + """run_auto should also run pipeline for portfolio holdings not in scan report.""" + scan_data = {"stocks_to_investigate": ["AAPL"]} + pipeline_calls = [] + + engine = LangGraphEngine() + + async def fake_run_pipeline(run_id, params): + pipeline_calls.append(params.get("ticker")) + for _ in (): + yield {} + + engine.run_pipeline = fake_run_pipeline + + # Create mock holdings + mock_holding = MagicMock() + mock_holding.ticker = "MSFT" + + mock_repo = MagicMock() + mock_repo.get_portfolio_with_holdings.return_value = (MagicMock(), [mock_holding]) + + with patch("agent_os.backend.services.langgraph_engine.ScannerGraph", + return_value=self._make_noop_scanner()), \ + patch("agent_os.backend.services.langgraph_engine.PortfolioGraph", + return_value=self._make_noop_portfolio_graph()), \ + patch("agent_os.backend.services.langgraph_engine.get_market_dir") as mock_gmd, \ + patch("agent_os.backend.services.langgraph_engine.get_ticker_dir"), \ + patch("tradingagents.report_paths.get_daily_dir") as mock_gdd, \ + patch("agent_os.backend.services.langgraph_engine.ReportStore") as mock_rs_cls, \ + patch("agent_os.backend.services.langgraph_engine.append_to_digest"), \ + patch("agent_os.backend.services.langgraph_engine.extract_json", return_value=scan_data), \ + patch("tradingagents.portfolio.repository.PortfolioRepository", return_value=mock_repo): + fake_mdir = MagicMock(spec=Path) + fake_mdir.__truediv__ = MagicMock(return_value=MagicMock(spec=Path)) + fake_mdir.mkdir = MagicMock() + mock_gmd.return_value = fake_mdir + fake_daily = MagicMock(spec=Path) + fake_daily.exists.return_value = False + mock_gdd.return_value = fake_daily + + mock_rs_cls.return_value = self._make_mock_store(scan_data) + + asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"}))) + + # Both scan ticker and holdings ticker should be analyzed + self.assertIn("AAPL", pipeline_calls) + self.assertIn("MSFT", pipeline_calls) + + def test_run_auto_deduplicates_scan_and_holdings_tickers(self): + """A ticker appearing in both scan and holdings should only run once.""" + scan_data = {"stocks_to_investigate": ["AAPL", "TSLA"]} + pipeline_calls = [] + + engine = LangGraphEngine() + + async def fake_run_pipeline(run_id, params): + pipeline_calls.append(params.get("ticker")) + for _ in (): + yield {} + + engine.run_pipeline = fake_run_pipeline + + # Holding that overlaps with scan candidate + mock_holding = MagicMock() + mock_holding.ticker = "AAPL" + + mock_repo = MagicMock() + mock_repo.get_portfolio_with_holdings.return_value = (MagicMock(), [mock_holding]) + + with patch("agent_os.backend.services.langgraph_engine.ScannerGraph", + return_value=self._make_noop_scanner()), \ + patch("agent_os.backend.services.langgraph_engine.PortfolioGraph", + return_value=self._make_noop_portfolio_graph()), \ + patch("agent_os.backend.services.langgraph_engine.get_market_dir") as mock_gmd, \ + patch("agent_os.backend.services.langgraph_engine.get_ticker_dir"), \ + patch("tradingagents.report_paths.get_daily_dir") as mock_gdd, \ + patch("agent_os.backend.services.langgraph_engine.ReportStore") as mock_rs_cls, \ + patch("agent_os.backend.services.langgraph_engine.append_to_digest"), \ + patch("agent_os.backend.services.langgraph_engine.extract_json", return_value=scan_data), \ + patch("tradingagents.portfolio.repository.PortfolioRepository", return_value=mock_repo): + fake_mdir = MagicMock(spec=Path) + fake_mdir.__truediv__ = MagicMock(return_value=MagicMock(spec=Path)) + fake_mdir.mkdir = MagicMock() + mock_gmd.return_value = fake_mdir + fake_daily = MagicMock(spec=Path) + fake_daily.exists.return_value = False + mock_gdd.return_value = fake_daily + + mock_rs_cls.return_value = self._make_mock_store(scan_data) + + asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"}))) + + # AAPL should appear only once (not duplicated) + self.assertEqual(pipeline_calls.count("AAPL"), 1) + self.assertIn("TSLA", pipeline_calls) + + def test_run_auto_proceeds_when_holdings_load_fails(self): + """Pipeline should still run scan tickers even if holdings fail to load.""" + scan_data = {"stocks_to_investigate": ["AAPL"]} + pipeline_calls = [] + + engine = LangGraphEngine() + + async def fake_run_pipeline(run_id, params): + pipeline_calls.append(params.get("ticker")) + for _ in (): + yield {} + + engine.run_pipeline = fake_run_pipeline + + mock_repo = MagicMock() + mock_repo.get_portfolio_with_holdings.side_effect = RuntimeError("DB unavailable") + + with patch("agent_os.backend.services.langgraph_engine.ScannerGraph", + return_value=self._make_noop_scanner()), \ + patch("agent_os.backend.services.langgraph_engine.PortfolioGraph", + return_value=self._make_noop_portfolio_graph()), \ + patch("agent_os.backend.services.langgraph_engine.get_market_dir") as mock_gmd, \ + patch("agent_os.backend.services.langgraph_engine.get_ticker_dir"), \ + patch("tradingagents.report_paths.get_daily_dir") as mock_gdd, \ + patch("agent_os.backend.services.langgraph_engine.ReportStore") as mock_rs_cls, \ + patch("agent_os.backend.services.langgraph_engine.append_to_digest"), \ + patch("agent_os.backend.services.langgraph_engine.extract_json", return_value=scan_data), \ + patch("tradingagents.portfolio.repository.PortfolioRepository", return_value=mock_repo): + fake_mdir = MagicMock(spec=Path) + fake_mdir.__truediv__ = MagicMock(return_value=MagicMock(spec=Path)) + fake_mdir.mkdir = MagicMock() + mock_gmd.return_value = fake_mdir + fake_daily = MagicMock(spec=Path) + fake_daily.exists.return_value = False + mock_gdd.return_value = fake_daily + + mock_rs_cls.return_value = self._make_mock_store(scan_data) + + asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"}))) + + # Should still run pipeline for scan tickers despite holdings failure + self.assertIn("AAPL", pipeline_calls) + # --------------------------------------------------------------------------- # TestExtractTickersFromScanData diff --git a/tests/unit/test_macro_bridge.py b/tests/unit/test_macro_bridge.py index 0c8650e3..7e6c3b0d 100644 --- a/tests/unit/test_macro_bridge.py +++ b/tests/unit/test_macro_bridge.py @@ -212,3 +212,64 @@ class TestReportRendering: assert (output_dir / "summary.md").exists() assert (output_dir / "results.json").exists() assert (output_dir / "NVDA" / "2026-03-17_deep_dive.md").exists() + + +class TestCandidatesFromHoldings: + """Tests for candidates_from_holdings — portfolio holdings → StockCandidate.""" + + def _make_holding(self, ticker, sector=None, industry=None): + """Minimal holding-like object with .ticker, .sector, .industry.""" + from types import SimpleNamespace + return SimpleNamespace(ticker=ticker, sector=sector, industry=industry) + + def test_basic_conversion(self): + from tradingagents.pipeline.macro_bridge import candidates_from_holdings + + holdings = [self._make_holding("AAPL", sector="Technology")] + result = candidates_from_holdings(holdings) + assert len(result) == 1 + assert result[0].ticker == "AAPL" + assert result[0].thesis_angle == "portfolio_holding" + assert result[0].conviction == "medium" + assert result[0].sector == "Technology" + + def test_skips_existing_tickers(self): + from tradingagents.pipeline.macro_bridge import candidates_from_holdings + + holdings = [ + self._make_holding("AAPL"), + self._make_holding("TSLA"), + ] + result = candidates_from_holdings(holdings, existing_tickers={"AAPL"}) + assert len(result) == 1 + assert result[0].ticker == "TSLA" + + def test_case_insensitive_dedup(self): + from tradingagents.pipeline.macro_bridge import candidates_from_holdings + + holdings = [self._make_holding("aapl")] + result = candidates_from_holdings(holdings, existing_tickers={"AAPL"}) + assert len(result) == 0 + + def test_empty_holdings(self): + from tradingagents.pipeline.macro_bridge import candidates_from_holdings + + result = candidates_from_holdings([]) + assert result == [] + + def test_deduplicates_within_holdings(self): + from tradingagents.pipeline.macro_bridge import candidates_from_holdings + + holdings = [ + self._make_holding("AAPL"), + self._make_holding("AAPL"), + ] + result = candidates_from_holdings(holdings) + assert len(result) == 1 + + def test_missing_sector_defaults_to_empty(self): + from tradingagents.pipeline.macro_bridge import candidates_from_holdings + + holdings = [self._make_holding("TSLA")] + result = candidates_from_holdings(holdings) + assert result[0].sector == "" \ No newline at end of file diff --git a/tradingagents/pipeline/macro_bridge.py b/tradingagents/pipeline/macro_bridge.py index 637c1256..e3131b1a 100644 --- a/tradingagents/pipeline/macro_bridge.py +++ b/tradingagents/pipeline/macro_bridge.py @@ -138,6 +138,52 @@ def _match_theme(sector: str, themes: list[dict]) -> str: return themes[0].get("theme", "") if themes else "" +# ─── Holdings helpers ───────────────────────────────────────────────────────── + + +def candidates_from_holdings( + holdings: list, + existing_tickers: set[str] | None = None, +) -> list[StockCandidate]: + """Create StockCandidate objects for portfolio holdings not already in candidates. + + Holdings are assigned ``thesis_angle='portfolio_holding'`` and + ``conviction='medium'`` so the pipeline treats them with equal priority + while the PM agent can distinguish their source. + + Args: + holdings: List of Holding objects (must have ``.ticker`` and + optionally ``.sector`` / ``.industry``). + existing_tickers: Tickers already present in the scan candidate list + (uppercase). Holdings matching these are skipped to avoid + duplicate pipeline runs. + + Returns: + List of StockCandidate for holdings that aren't already candidates. + """ + existing = {t.upper() for t in (existing_tickers or set())} + result: list[StockCandidate] = [] + for h in holdings: + ticker = h.ticker.upper() + if ticker in existing: + continue + existing.add(ticker) + result.append( + StockCandidate( + ticker=ticker, + name=ticker, + sector=getattr(h, "sector", None) or "", + rationale="Existing portfolio holding — re-analysis for portfolio review.", + thesis_angle="portfolio_holding", + conviction="medium", + key_catalysts=[], + risks=[], + macro_theme="", + ) + ) + return result + + # ─── Core pipeline ────────────────────────────────────────────────────────────