From cd655cdae5aff75eb53b6d4fde15055e38dea375 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Tue, 24 Mar 2026 18:08:06 +0100 Subject: [PATCH] feat: concurrent per-ticker pipelines in auto mode, controlled by TRADINGAGENTS_MAX_CONCURRENT_PIPELINES (#103) * Initial plan * feat: concurrent per-ticker pipelines in auto mode, configurable via TRADINGAGENTS_MAX_CONCURRENT_PIPELINES Co-authored-by: aguzererler <6199053+aguzererler@users.noreply.github.com> Agent-Logs-Url: https://github.com/aguzererler/TradingAgents/sessions/464f2038-3a60-4d86-9fe8-4e1cc6943174 --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: aguzererler <6199053+aguzererler@users.noreply.github.com> Co-authored-by: ahmet guzererler --- .env.example | 6 + agent_os/backend/services/langgraph_engine.py | 59 +++++-- cli/main.py | 4 +- tests/unit/test_langgraph_engine_run_modes.py | 150 +++++++++++++++++- tradingagents/default_config.py | 4 + 5 files changed, 207 insertions(+), 16 deletions(-) diff --git a/.env.example b/.env.example index 2636569e..0cb52e02 100644 --- a/.env.example +++ b/.env.example @@ -61,6 +61,12 @@ FINNHUB_API_KEY= # TRADINGAGENTS_MAX_RISK_DISCUSS_ROUNDS=2 # risk analyst discussion rounds (1–5) # TRADINGAGENTS_MAX_RECUR_LIMIT=100 # LangGraph recursion limit +# ── Concurrency settings ────────────────────────────────────────────── +# Number of per-ticker analysis pipelines that run in parallel during +# 'auto' mode (CLI: `pipeline` command; AgentOS: auto run type). +# Raise this if your LLM & data-vendor API plans support higher call rates. +# TRADINGAGENTS_MAX_CONCURRENT_PIPELINES=2 + # ── Google NotebookLM sync (optional) ──────────────────────────────── # Notebook ID for daily digest upload via the nlm CLI tool # NOTEBOOKLM_ID= diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py index 901f6ead..46e35c45 100644 --- a/agent_os/backend/services/langgraph_engine.py +++ b/agent_os/backend/services/langgraph_engine.py @@ -504,16 +504,57 @@ class LangGraphEngine: "Skipping pipeline phase." ) else: - for ticker in tickers: - if not force and store.load_analysis(date, ticker): - yield self._system_log(f"Phase 2: Analysis for {ticker} on {date} already exists, skipping.") - continue + max_concurrent = int(self.config.get("max_concurrent_pipelines", 2)) + yield self._system_log( + f"Phase 2/3: Queuing {len(tickers)} ticker(s) " + f"(max {max_concurrent} concurrent)…" + ) - yield self._system_log(f"Phase 2/3: Running analysis pipeline for {ticker}…") - async for evt in self.run_pipeline( - f"{run_id}_pipeline_{ticker}", {"ticker": ticker, "date": date} - ): - yield evt + # Run all tickers concurrently, bounded by a semaphore. + # Events from all pipelines are funnelled through a shared queue + # so this async generator can yield them as they arrive. + _sentinel = object() + pipeline_queue: asyncio.Queue = asyncio.Queue() + semaphore = asyncio.Semaphore(max_concurrent) + + async def _run_one_ticker(ticker: str) -> None: + async with semaphore: + if not force and store.load_analysis(date, ticker): + await pipeline_queue.put( + self._system_log( + f"Phase 2: Analysis for {ticker} on {date} already exists, skipping." + ) + ) + return + await pipeline_queue.put( + self._system_log(f"Phase 2/3: Running analysis pipeline for {ticker}…") + ) + try: + async for evt in self.run_pipeline( + f"{run_id}_pipeline_{ticker}", {"ticker": ticker, "date": date} + ): + await pipeline_queue.put(evt) + except Exception as exc: + logger.exception( + "Pipeline failed ticker=%s run=%s", ticker, run_id + ) + await pipeline_queue.put( + self._system_log( + f"Warning: pipeline for {ticker} failed: {exc}" + ) + ) + + async def _pipeline_producer() -> None: + await asyncio.gather(*[_run_one_ticker(t) for t in tickers]) + await pipeline_queue.put(_sentinel) + + asyncio.create_task(_pipeline_producer()) + + while True: + item = await pipeline_queue.get() + if item is _sentinel: + break + yield item # Phase 3: Portfolio management yield self._system_log("Phase 3/3: Running portfolio manager…") diff --git a/cli/main.py b/cli/main.py index 06dd94d6..8ff48476 100644 --- a/cli/main.py +++ b/cli/main.py @@ -1583,13 +1583,14 @@ def run_pipeline( config = DEFAULT_CONFIG.copy() output_dir = get_daily_dir(analysis_date) + max_concurrent = int(config.get("max_concurrent_pipelines", 2)) run_logger = RunLogger() set_run_logger(run_logger) console.print( f"\n[cyan]Running TradingAgents for {len(candidates)} tickers...[/cyan]" - f" [dim](up to 2 concurrent)[/dim]\n" + f" [dim](up to {max_concurrent} concurrent)[/dim]\n" ) for c in candidates: console.print( @@ -1631,6 +1632,7 @@ def run_pipeline( results = asyncio.run( run_all_tickers( candidates, macro_context, config, analysis_date, + max_concurrent=max_concurrent, on_ticker_done=on_done, ) ) diff --git a/tests/unit/test_langgraph_engine_run_modes.py b/tests/unit/test_langgraph_engine_run_modes.py index 51fad2e8..ee24d7c8 100644 --- a/tests/unit/test_langgraph_engine_run_modes.py +++ b/tests/unit/test_langgraph_engine_run_modes.py @@ -600,6 +600,19 @@ class TestRunAutoTickerSource(unittest.TestCase): mock_pg.graph = mock_graph return mock_pg + def _make_mock_store(self, scan_data=None): + """Return a ReportStore mock where all 'already exists' checks return None + (falsy) by default so that tests do not accidentally hit the skip branches. + Pass scan_data to make load_scan() return it (simulating a completed scan). + """ + mock_store = MagicMock() + mock_store.load_scan.return_value = scan_data + # By default: no existing analysis / execution / decision + mock_store.load_analysis.return_value = None + mock_store.load_execution_result.return_value = None + mock_store.load_pm_decision.return_value = None + return mock_store + def test_run_auto_gets_tickers_from_scan_report(self): """run_auto should run pipeline for AAPL and TSLA from the scan report.""" scan_data = {"stocks_to_investigate": ["AAPL", "TSLA"]} @@ -675,9 +688,7 @@ class TestRunAutoTickerSource(unittest.TestCase): fake_daily.exists.return_value = False mock_gdd.return_value = fake_daily - mock_store = MagicMock() - mock_store.load_scan.return_value = scan_data - mock_rs_cls.return_value = mock_store + mock_rs_cls.return_value = self._make_mock_store(scan_data) asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01", "ticker": "GOOG"}))) @@ -795,9 +806,7 @@ class TestRunAutoTickerSource(unittest.TestCase): fake_mdir.mkdir = MagicMock() mock_gmd.return_value = fake_mdir - mock_store = MagicMock() - mock_store.load_scan.return_value = {} - mock_rs_cls.return_value = mock_store + mock_rs_cls.return_value = self._make_mock_store(scan_data={}) events = asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"}))) @@ -809,6 +818,135 @@ class TestRunAutoTickerSource(unittest.TestCase): self.assertTrue(any("Phase 3/3" in m or "3/3" in m for m in log_messages), f"Expected Phase 3/3 log. Got: {log_messages}") + def test_run_auto_concurrent_all_tickers_processed(self): + """All tickers should be processed even when run concurrently (max_concurrent=3).""" + scan_data = {"stocks_to_investigate": ["AAPL", "TSLA", "NVDA", "MSFT"]} + pipeline_calls = [] + + engine = LangGraphEngine() + engine.config["max_concurrent_pipelines"] = 3 + + async def fake_run_pipeline(run_id, params): + pipeline_calls.append(params.get("ticker")) + for _ in (): + yield {} + + engine.run_pipeline = fake_run_pipeline + + with patch("agent_os.backend.services.langgraph_engine.ScannerGraph", + return_value=self._make_noop_scanner()), \ + patch("agent_os.backend.services.langgraph_engine.PortfolioGraph", + return_value=self._make_noop_portfolio_graph()), \ + patch("agent_os.backend.services.langgraph_engine.get_market_dir") as mock_gmd, \ + patch("agent_os.backend.services.langgraph_engine.get_ticker_dir"), \ + patch("tradingagents.report_paths.get_daily_dir") as mock_gdd, \ + patch("agent_os.backend.services.langgraph_engine.ReportStore") as mock_rs_cls, \ + patch("agent_os.backend.services.langgraph_engine.append_to_digest"), \ + patch("agent_os.backend.services.langgraph_engine.extract_json", return_value=scan_data): + fake_mdir = MagicMock(spec=Path) + fake_mdir.__truediv__ = MagicMock(return_value=MagicMock(spec=Path)) + fake_mdir.mkdir = MagicMock() + mock_gmd.return_value = fake_mdir + fake_daily = MagicMock(spec=Path) + fake_daily.exists.return_value = False + mock_gdd.return_value = fake_daily + + mock_rs_cls.return_value = self._make_mock_store(scan_data) + + asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"}))) + + self.assertEqual(sorted(pipeline_calls), ["AAPL", "MSFT", "NVDA", "TSLA"]) + + def test_run_auto_concurrency_log_mentions_max_concurrent(self): + """Phase 2 log should mention the configured max_concurrent value.""" + scan_data = {"stocks_to_investigate": ["AAPL", "TSLA"]} + + engine = LangGraphEngine() + engine.config["max_concurrent_pipelines"] = 5 + + async def fake_run_pipeline(run_id, params): + for _ in (): + yield {} + + engine.run_pipeline = fake_run_pipeline + + with patch("agent_os.backend.services.langgraph_engine.ScannerGraph", + return_value=self._make_noop_scanner()), \ + patch("agent_os.backend.services.langgraph_engine.PortfolioGraph", + return_value=self._make_noop_portfolio_graph()), \ + patch("agent_os.backend.services.langgraph_engine.get_market_dir") as mock_gmd, \ + patch("agent_os.backend.services.langgraph_engine.get_ticker_dir"), \ + patch("tradingagents.report_paths.get_daily_dir") as mock_gdd, \ + patch("agent_os.backend.services.langgraph_engine.ReportStore") as mock_rs_cls, \ + patch("agent_os.backend.services.langgraph_engine.append_to_digest"), \ + patch("agent_os.backend.services.langgraph_engine.extract_json", return_value=scan_data): + fake_mdir = MagicMock(spec=Path) + fake_mdir.__truediv__ = MagicMock(return_value=MagicMock(spec=Path)) + fake_mdir.mkdir = MagicMock() + mock_gmd.return_value = fake_mdir + fake_daily = MagicMock(spec=Path) + fake_daily.exists.return_value = False + mock_gdd.return_value = fake_daily + + mock_rs_cls.return_value = self._make_mock_store(scan_data) + + events = asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"}))) + + log_messages = [e.get("message", "") for e in events if e.get("type") == "log"] + self.assertTrue( + any("5" in m for m in log_messages), + f"Expected a log mentioning max_concurrent=5. Got: {log_messages}", + ) + + def test_run_auto_pipeline_failure_does_not_abort_other_tickers(self): + """If one ticker's pipeline raises, the other ticker should still complete.""" + scan_data = {"stocks_to_investigate": ["AAPL", "TSLA"]} + completed = [] + + engine = LangGraphEngine() + engine.config["max_concurrent_pipelines"] = 2 + + async def fake_run_pipeline(run_id, params): + ticker = params.get("ticker") + if ticker == "AAPL": + raise RuntimeError("Simulated AAPL failure") + completed.append(ticker) + for _ in (): + yield {} + + engine.run_pipeline = fake_run_pipeline + + with patch("agent_os.backend.services.langgraph_engine.ScannerGraph", + return_value=self._make_noop_scanner()), \ + patch("agent_os.backend.services.langgraph_engine.PortfolioGraph", + return_value=self._make_noop_portfolio_graph()), \ + patch("agent_os.backend.services.langgraph_engine.get_market_dir") as mock_gmd, \ + patch("agent_os.backend.services.langgraph_engine.get_ticker_dir"), \ + patch("tradingagents.report_paths.get_daily_dir") as mock_gdd, \ + patch("agent_os.backend.services.langgraph_engine.ReportStore") as mock_rs_cls, \ + patch("agent_os.backend.services.langgraph_engine.append_to_digest"), \ + patch("agent_os.backend.services.langgraph_engine.extract_json", return_value=scan_data): + fake_mdir = MagicMock(spec=Path) + fake_mdir.__truediv__ = MagicMock(return_value=MagicMock(spec=Path)) + fake_mdir.mkdir = MagicMock() + mock_gmd.return_value = fake_mdir + fake_daily = MagicMock(spec=Path) + fake_daily.exists.return_value = False + mock_gdd.return_value = fake_daily + + mock_rs_cls.return_value = self._make_mock_store(scan_data) + + events = asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"}))) + + # TSLA should still complete despite AAPL failure + self.assertIn("TSLA", completed) + # A warning log should mention the failure + log_messages = [e.get("message", "") for e in events if e.get("type") == "log"] + self.assertTrue( + any("AAPL" in m and ("failed" in m or "Warning" in m) for m in log_messages), + f"Expected a warning log about AAPL failure. Got: {log_messages}", + ) + # --------------------------------------------------------------------------- # TestExtractTickersFromScanData diff --git a/tradingagents/default_config.py b/tradingagents/default_config.py index f3533884..27d67135 100644 --- a/tradingagents/default_config.py +++ b/tradingagents/default_config.py @@ -85,6 +85,10 @@ DEFAULT_CONFIG = { "max_debate_rounds": _env_int("MAX_DEBATE_ROUNDS", 2), "max_risk_discuss_rounds": _env_int("MAX_RISK_DISCUSS_ROUNDS", 2), "max_recur_limit": _env_int("MAX_RECUR_LIMIT", 100), + # Concurrency settings + # Controls how many per-ticker analysis pipelines run in parallel during + # 'auto' mode (CLI and AgentOS). Set higher if your API plan supports it. + "max_concurrent_pipelines": _env_int("MAX_CONCURRENT_PIPELINES", 2), # Data vendor configuration # Category-level configuration (default for all tools in category) "data_vendors": {