feat: concurrent per-ticker pipelines in auto mode, controlled by TRADINGAGENTS_MAX_CONCURRENT_PIPELINES (#103)

* Initial plan

* feat: concurrent per-ticker pipelines in auto mode, configurable via TRADINGAGENTS_MAX_CONCURRENT_PIPELINES

Co-authored-by: aguzererler <6199053+aguzererler@users.noreply.github.com>
Agent-Logs-Url: https://github.com/aguzererler/TradingAgents/sessions/464f2038-3a60-4d86-9fe8-4e1cc6943174

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: aguzererler <6199053+aguzererler@users.noreply.github.com>
Co-authored-by: ahmet guzererler <guzererler@gmail.com>
This commit is contained in:
Copilot 2026-03-24 18:08:06 +01:00 committed by GitHub
parent 9c148b208a
commit cd655cdae5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 207 additions and 16 deletions

View File

@@ -61,6 +61,12 @@ FINNHUB_API_KEY=
# TRADINGAGENTS_MAX_RISK_DISCUSS_ROUNDS=2 # risk analyst discussion rounds (15) # TRADINGAGENTS_MAX_RISK_DISCUSS_ROUNDS=2 # risk analyst discussion rounds (15)
# TRADINGAGENTS_MAX_RECUR_LIMIT=100 # LangGraph recursion limit # TRADINGAGENTS_MAX_RECUR_LIMIT=100 # LangGraph recursion limit
# ── Concurrency settings ──────────────────────────────────────────────
# Number of per-ticker analysis pipelines that run in parallel during
# 'auto' mode (CLI: `pipeline` command; AgentOS: auto run type).
# Raise this if your LLM & data-vendor API plans support higher call rates.
# TRADINGAGENTS_MAX_CONCURRENT_PIPELINES=2
# ── Google NotebookLM sync (optional) ──────────────────────────────── # ── Google NotebookLM sync (optional) ────────────────────────────────
# Notebook ID for daily digest upload via the nlm CLI tool # Notebook ID for daily digest upload via the nlm CLI tool
# NOTEBOOKLM_ID= # NOTEBOOKLM_ID=

View File

@@ -504,16 +504,57 @@ class LangGraphEngine:
"Skipping pipeline phase." "Skipping pipeline phase."
) )
else: else:
for ticker in tickers: max_concurrent = int(self.config.get("max_concurrent_pipelines", 2))
if not force and store.load_analysis(date, ticker): yield self._system_log(
yield self._system_log(f"Phase 2: Analysis for {ticker} on {date} already exists, skipping.") f"Phase 2/3: Queuing {len(tickers)} ticker(s) "
continue f"(max {max_concurrent} concurrent)…"
)
yield self._system_log(f"Phase 2/3: Running analysis pipeline for {ticker}") # Run all tickers concurrently, bounded by a semaphore.
async for evt in self.run_pipeline( # Events from all pipelines are funnelled through a shared queue
f"{run_id}_pipeline_{ticker}", {"ticker": ticker, "date": date} # so this async generator can yield them as they arrive.
): _sentinel = object()
yield evt pipeline_queue: asyncio.Queue = asyncio.Queue()
semaphore = asyncio.Semaphore(max_concurrent)
async def _run_one_ticker(ticker: str) -> None:
async with semaphore:
if not force and store.load_analysis(date, ticker):
await pipeline_queue.put(
self._system_log(
f"Phase 2: Analysis for {ticker} on {date} already exists, skipping."
)
)
return
await pipeline_queue.put(
self._system_log(f"Phase 2/3: Running analysis pipeline for {ticker}")
)
try:
async for evt in self.run_pipeline(
f"{run_id}_pipeline_{ticker}", {"ticker": ticker, "date": date}
):
await pipeline_queue.put(evt)
except Exception as exc:
logger.exception(
"Pipeline failed ticker=%s run=%s", ticker, run_id
)
await pipeline_queue.put(
self._system_log(
f"Warning: pipeline for {ticker} failed: {exc}"
)
)
async def _pipeline_producer() -> None:
await asyncio.gather(*[_run_one_ticker(t) for t in tickers])
await pipeline_queue.put(_sentinel)
asyncio.create_task(_pipeline_producer())
while True:
item = await pipeline_queue.get()
if item is _sentinel:
break
yield item
# Phase 3: Portfolio management # Phase 3: Portfolio management
yield self._system_log("Phase 3/3: Running portfolio manager…") yield self._system_log("Phase 3/3: Running portfolio manager…")

View File

@@ -1583,13 +1583,14 @@ def run_pipeline(
config = DEFAULT_CONFIG.copy() config = DEFAULT_CONFIG.copy()
output_dir = get_daily_dir(analysis_date) output_dir = get_daily_dir(analysis_date)
max_concurrent = int(config.get("max_concurrent_pipelines", 2))
run_logger = RunLogger() run_logger = RunLogger()
set_run_logger(run_logger) set_run_logger(run_logger)
console.print( console.print(
f"\n[cyan]Running TradingAgents for {len(candidates)} tickers...[/cyan]" f"\n[cyan]Running TradingAgents for {len(candidates)} tickers...[/cyan]"
f" [dim](up to 2 concurrent)[/dim]\n" f" [dim](up to {max_concurrent} concurrent)[/dim]\n"
) )
for c in candidates: for c in candidates:
console.print( console.print(
@@ -1631,6 +1632,7 @@ def run_pipeline(
results = asyncio.run( results = asyncio.run(
run_all_tickers( run_all_tickers(
candidates, macro_context, config, analysis_date, candidates, macro_context, config, analysis_date,
max_concurrent=max_concurrent,
on_ticker_done=on_done, on_ticker_done=on_done,
) )
) )

View File

@@ -600,6 +600,19 @@ class TestRunAutoTickerSource(unittest.TestCase):
mock_pg.graph = mock_graph mock_pg.graph = mock_graph
return mock_pg return mock_pg
def _make_mock_store(self, scan_data=None):
"""Return a ReportStore mock where all 'already exists' checks return None
(falsy) by default so that tests do not accidentally hit the skip branches.
Pass scan_data to make load_scan() return it (simulating a completed scan).
"""
mock_store = MagicMock()
mock_store.load_scan.return_value = scan_data
# By default: no existing analysis / execution / decision
mock_store.load_analysis.return_value = None
mock_store.load_execution_result.return_value = None
mock_store.load_pm_decision.return_value = None
return mock_store
def test_run_auto_gets_tickers_from_scan_report(self): def test_run_auto_gets_tickers_from_scan_report(self):
"""run_auto should run pipeline for AAPL and TSLA from the scan report.""" """run_auto should run pipeline for AAPL and TSLA from the scan report."""
scan_data = {"stocks_to_investigate": ["AAPL", "TSLA"]} scan_data = {"stocks_to_investigate": ["AAPL", "TSLA"]}
@@ -675,9 +688,7 @@ class TestRunAutoTickerSource(unittest.TestCase):
fake_daily.exists.return_value = False fake_daily.exists.return_value = False
mock_gdd.return_value = fake_daily mock_gdd.return_value = fake_daily
mock_store = MagicMock() mock_rs_cls.return_value = self._make_mock_store(scan_data)
mock_store.load_scan.return_value = scan_data
mock_rs_cls.return_value = mock_store
asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01", "ticker": "GOOG"}))) asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01", "ticker": "GOOG"})))
@@ -795,9 +806,7 @@ class TestRunAutoTickerSource(unittest.TestCase):
fake_mdir.mkdir = MagicMock() fake_mdir.mkdir = MagicMock()
mock_gmd.return_value = fake_mdir mock_gmd.return_value = fake_mdir
mock_store = MagicMock() mock_rs_cls.return_value = self._make_mock_store(scan_data={})
mock_store.load_scan.return_value = {}
mock_rs_cls.return_value = mock_store
events = asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"}))) events = asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"})))
@@ -809,6 +818,135 @@ class TestRunAutoTickerSource(unittest.TestCase):
self.assertTrue(any("Phase 3/3" in m or "3/3" in m for m in log_messages), self.assertTrue(any("Phase 3/3" in m or "3/3" in m for m in log_messages),
f"Expected Phase 3/3 log. Got: {log_messages}") f"Expected Phase 3/3 log. Got: {log_messages}")
def test_run_auto_concurrent_all_tickers_processed(self):
"""All tickers should be processed even when run concurrently (max_concurrent=3)."""
scan_data = {"stocks_to_investigate": ["AAPL", "TSLA", "NVDA", "MSFT"]}
pipeline_calls = []
engine = LangGraphEngine()
engine.config["max_concurrent_pipelines"] = 3
async def fake_run_pipeline(run_id, params):
pipeline_calls.append(params.get("ticker"))
for _ in ():
yield {}
engine.run_pipeline = fake_run_pipeline
with patch("agent_os.backend.services.langgraph_engine.ScannerGraph",
return_value=self._make_noop_scanner()), \
patch("agent_os.backend.services.langgraph_engine.PortfolioGraph",
return_value=self._make_noop_portfolio_graph()), \
patch("agent_os.backend.services.langgraph_engine.get_market_dir") as mock_gmd, \
patch("agent_os.backend.services.langgraph_engine.get_ticker_dir"), \
patch("tradingagents.report_paths.get_daily_dir") as mock_gdd, \
patch("agent_os.backend.services.langgraph_engine.ReportStore") as mock_rs_cls, \
patch("agent_os.backend.services.langgraph_engine.append_to_digest"), \
patch("agent_os.backend.services.langgraph_engine.extract_json", return_value=scan_data):
fake_mdir = MagicMock(spec=Path)
fake_mdir.__truediv__ = MagicMock(return_value=MagicMock(spec=Path))
fake_mdir.mkdir = MagicMock()
mock_gmd.return_value = fake_mdir
fake_daily = MagicMock(spec=Path)
fake_daily.exists.return_value = False
mock_gdd.return_value = fake_daily
mock_rs_cls.return_value = self._make_mock_store(scan_data)
asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"})))
self.assertEqual(sorted(pipeline_calls), ["AAPL", "MSFT", "NVDA", "TSLA"])
def test_run_auto_concurrency_log_mentions_max_concurrent(self):
"""Phase 2 log should mention the configured max_concurrent value."""
scan_data = {"stocks_to_investigate": ["AAPL", "TSLA"]}
engine = LangGraphEngine()
engine.config["max_concurrent_pipelines"] = 5
async def fake_run_pipeline(run_id, params):
for _ in ():
yield {}
engine.run_pipeline = fake_run_pipeline
with patch("agent_os.backend.services.langgraph_engine.ScannerGraph",
return_value=self._make_noop_scanner()), \
patch("agent_os.backend.services.langgraph_engine.PortfolioGraph",
return_value=self._make_noop_portfolio_graph()), \
patch("agent_os.backend.services.langgraph_engine.get_market_dir") as mock_gmd, \
patch("agent_os.backend.services.langgraph_engine.get_ticker_dir"), \
patch("tradingagents.report_paths.get_daily_dir") as mock_gdd, \
patch("agent_os.backend.services.langgraph_engine.ReportStore") as mock_rs_cls, \
patch("agent_os.backend.services.langgraph_engine.append_to_digest"), \
patch("agent_os.backend.services.langgraph_engine.extract_json", return_value=scan_data):
fake_mdir = MagicMock(spec=Path)
fake_mdir.__truediv__ = MagicMock(return_value=MagicMock(spec=Path))
fake_mdir.mkdir = MagicMock()
mock_gmd.return_value = fake_mdir
fake_daily = MagicMock(spec=Path)
fake_daily.exists.return_value = False
mock_gdd.return_value = fake_daily
mock_rs_cls.return_value = self._make_mock_store(scan_data)
events = asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"})))
log_messages = [e.get("message", "") for e in events if e.get("type") == "log"]
self.assertTrue(
any("5" in m for m in log_messages),
f"Expected a log mentioning max_concurrent=5. Got: {log_messages}",
)
def test_run_auto_pipeline_failure_does_not_abort_other_tickers(self):
"""If one ticker's pipeline raises, the other ticker should still complete."""
scan_data = {"stocks_to_investigate": ["AAPL", "TSLA"]}
completed = []
engine = LangGraphEngine()
engine.config["max_concurrent_pipelines"] = 2
async def fake_run_pipeline(run_id, params):
ticker = params.get("ticker")
if ticker == "AAPL":
raise RuntimeError("Simulated AAPL failure")
completed.append(ticker)
for _ in ():
yield {}
engine.run_pipeline = fake_run_pipeline
with patch("agent_os.backend.services.langgraph_engine.ScannerGraph",
return_value=self._make_noop_scanner()), \
patch("agent_os.backend.services.langgraph_engine.PortfolioGraph",
return_value=self._make_noop_portfolio_graph()), \
patch("agent_os.backend.services.langgraph_engine.get_market_dir") as mock_gmd, \
patch("agent_os.backend.services.langgraph_engine.get_ticker_dir"), \
patch("tradingagents.report_paths.get_daily_dir") as mock_gdd, \
patch("agent_os.backend.services.langgraph_engine.ReportStore") as mock_rs_cls, \
patch("agent_os.backend.services.langgraph_engine.append_to_digest"), \
patch("agent_os.backend.services.langgraph_engine.extract_json", return_value=scan_data):
fake_mdir = MagicMock(spec=Path)
fake_mdir.__truediv__ = MagicMock(return_value=MagicMock(spec=Path))
fake_mdir.mkdir = MagicMock()
mock_gmd.return_value = fake_mdir
fake_daily = MagicMock(spec=Path)
fake_daily.exists.return_value = False
mock_gdd.return_value = fake_daily
mock_rs_cls.return_value = self._make_mock_store(scan_data)
events = asyncio.run(_collect(engine.run_auto("auto1", {"date": "2026-01-01"})))
# TSLA should still complete despite AAPL failure
self.assertIn("TSLA", completed)
# A warning log should mention the failure
log_messages = [e.get("message", "") for e in events if e.get("type") == "log"]
self.assertTrue(
any("AAPL" in m and ("failed" in m or "Warning" in m) for m in log_messages),
f"Expected a warning log about AAPL failure. Got: {log_messages}",
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# TestExtractTickersFromScanData # TestExtractTickersFromScanData

View File

@@ -85,6 +85,10 @@ DEFAULT_CONFIG = {
"max_debate_rounds": _env_int("MAX_DEBATE_ROUNDS", 2), "max_debate_rounds": _env_int("MAX_DEBATE_ROUNDS", 2),
"max_risk_discuss_rounds": _env_int("MAX_RISK_DISCUSS_ROUNDS", 2), "max_risk_discuss_rounds": _env_int("MAX_RISK_DISCUSS_ROUNDS", 2),
"max_recur_limit": _env_int("MAX_RECUR_LIMIT", 100), "max_recur_limit": _env_int("MAX_RECUR_LIMIT", 100),
# Concurrency settings
# Controls how many per-ticker analysis pipelines run in parallel during
# 'auto' mode (CLI and AgentOS). Set higher if your API plan supports it.
"max_concurrent_pipelines": _env_int("MAX_CONCURRENT_PIPELINES", 2),
# Data vendor configuration # Data vendor configuration
# Category-level configuration (default for all tools in category) # Category-level configuration (default for all tools in category)
"data_vendors": { "data_vendors": {