feat: add per-ticker progress logging to pipeline

Before this change, the pipeline showed a generic 'Analyzing...' spinner
for the entire multi-ticker run with no way to know which ticker was
processing or whether anything was actually working.

Changes:
- macro_bridge.py:
  - run_ticker_analysis: logs '▶ Starting', '✓ complete in Xs', '✗ FAILED'
    with elapsed time per ticker using logger.info/logger.error
  - run_all_tickers: replaced asyncio.gather (which yields no per-ticker
    completion events) with asyncio.as_completed + optional
    on_ticker_done(result, done, total)
    callback; uses asyncio.Semaphore for max_concurrent control
  - Added time and Callable imports

- cli/main.py run_pipeline:
  - Replaced Live(Spinner) with Rich Progress bar (spinner + bar + counter
    + elapsed time)
  - Prints '▷ Queued: TICKER' before analysis starts for each ticker
  - on_ticker_done callback prints '✓ TICKER (N/M, Xs elapsed) → decision'
    or '✗ TICKER failed ...' immediately as each ticker finishes
  - Prints total elapsed time when all tickers complete
This commit is contained in:
Ahmet Guzererler 2026-03-22 00:20:35 +01:00
parent eafdce3121
commit 9ddf489c28
2 changed files with 100 additions and 34 deletions

View File

@@ -1580,18 +1580,63 @@ def run_pipeline(
console.print( console.print(
f"\n[cyan]Running TradingAgents for {len(candidates)} tickers...[/cyan]" f"\n[cyan]Running TradingAgents for {len(candidates)} tickers...[/cyan]"
f" [dim](up to 2 concurrent)[/dim]\n"
) )
for c in candidates:
console.print(
f" [dim]▷ Queued:[/dim] [bold cyan]{c.ticker}[/bold cyan]"
f" [dim]{c.sector} · {c.conviction.upper()} conviction[/dim]"
)
console.print()
import time as _time
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn
pipeline_start = _time.monotonic()
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[cyan]{task.completed}/{task.total}[/cyan]"),
TimeElapsedColumn(),
console=console,
transient=False,
) as progress:
overall = progress.add_task("[bold]Pipeline progress[/bold]", total=len(candidates))
def on_done(result, done_count, total_count):
ticker_elapsed = _time.monotonic() - pipeline_start
if result.error:
console.print(
f" [red]✗ {result.ticker}[/red]"
f" [dim]failed ({ticker_elapsed:.0f}s elapsed) — {result.error[:80]}[/dim]"
)
else:
decision_preview = str(result.final_trade_decision)[:70].replace("\n", " ")
console.print(
f" [green]✓ {result.ticker}[/green]"
f" [dim]({done_count}/{total_count}, {ticker_elapsed:.0f}s elapsed)[/dim]"
f"{decision_preview}"
)
progress.advance(overall)
try: try:
with Live(
Spinner("dots", text="Analyzing..."), console=console, transient=True
):
results = asyncio.run( results = asyncio.run(
run_all_tickers(candidates, macro_context, config, analysis_date) run_all_tickers(
candidates, macro_context, config, analysis_date,
on_ticker_done=on_done,
)
) )
except Exception as e: except Exception as e:
console.print(f"[red]Pipeline failed: {e}[/red]") console.print(f"[red]Pipeline failed: {e}[/red]")
raise typer.Exit(1) raise typer.Exit(1)
elapsed_total = _time.monotonic() - pipeline_start
console.print(
f"\n[bold green]All {len(candidates)} ticker(s) finished in {elapsed_total:.0f}s[/bold green]\n"
)
save_results(results, macro_context, output_dir) save_results(results, macro_context, output_dir)
# Write observability log # Write observability log

View File

@@ -5,12 +5,14 @@ from __future__ import annotations
import asyncio import asyncio
import json import json
import logging import logging
import time
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from tradingagents.agents.utils.json_utils import extract_json from tradingagents.agents.utils.json_utils import extract_json
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Literal from typing import Callable, Literal
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -172,15 +174,6 @@ def run_ticker_analysis(
NOTE: TradingAgentsGraph is synchronous call this from a thread pool NOTE: TradingAgentsGraph is synchronous call this from a thread pool
when running multiple tickers concurrently (see run_all_tickers). when running multiple tickers concurrently (see run_all_tickers).
Args:
candidate: The stock candidate to analyse.
macro_context: Macro context to embed in the result.
config: TradingAgents configuration dict.
analysis_date: Date string in YYYY-MM-DD format.
Returns:
TickerResult with all report fields populated, or error set on failure.
""" """
result = TickerResult( result = TickerResult(
ticker=candidate.ticker, ticker=candidate.ticker,
@@ -189,11 +182,14 @@
analysis_date=analysis_date, analysis_date=analysis_date,
) )
logger.info("Starting analysis for %s on %s", candidate.ticker, analysis_date) t0 = time.monotonic()
logger.info(
"[%s] ▶ Starting analysis (%s, %s conviction)",
candidate.ticker, candidate.sector, candidate.conviction,
)
try: try:
from tradingagents.graph.trading_graph import TradingAgentsGraph from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.observability import get_run_logger from tradingagents.observability import get_run_logger
rl = get_run_logger() rl = get_run_logger()
@@ -210,23 +206,31 @@
result.risk_debate = str(final_state.get("risk_debate_state", "")) result.risk_debate = str(final_state.get("risk_debate_state", ""))
result.final_trade_decision = decision result.final_trade_decision = decision
elapsed = time.monotonic() - t0
logger.info( logger.info(
"Analysis complete for %s: %s", candidate.ticker, str(decision)[:120] "[%s] ✓ Analysis complete in %.0fs — decision: %s",
candidate.ticker, elapsed, str(decision)[:80],
) )
except Exception as exc: except Exception as exc:
logger.error("Analysis failed for %s: %s", candidate.ticker, exc, exc_info=True) elapsed = time.monotonic() - t0
logger.error(
"[%s] ✗ Analysis FAILED after %.0fs: %s",
candidate.ticker, elapsed, exc, exc_info=True,
)
result.error = str(exc) result.error = str(exc)
return result return result
async def run_all_tickers( async def run_all_tickers(
candidates: list[StockCandidate], candidates: list[StockCandidate],
macro_context: MacroContext, macro_context: MacroContext,
config: dict, config: dict,
analysis_date: str, analysis_date: str,
max_concurrent: int = 2, max_concurrent: int = 2,
on_ticker_done: Callable[[TickerResult, int, int], None] | None = None,
) -> list[TickerResult]: ) -> list[TickerResult]:
"""Run TradingAgents for every candidate with controlled concurrency. """Run TradingAgents for every candidate with controlled concurrency.
@@ -239,28 +243,45 @@ async def run_all_tickers(
config: TradingAgents configuration dict. config: TradingAgents configuration dict.
analysis_date: Date string in YYYY-MM-DD format. analysis_date: Date string in YYYY-MM-DD format.
max_concurrent: Maximum number of tickers to process in parallel. max_concurrent: Maximum number of tickers to process in parallel.
on_ticker_done: Optional callback(result, done_count, total_count) fired
after each ticker finishes use this to drive a progress bar.
Returns: Returns:
List of TickerResult in completion order. List of TickerResult in completion order.
""" """
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
executor = ThreadPoolExecutor(max_workers=max_concurrent) total = len(candidates)
try: results: list[TickerResult] = []
tasks = [
loop.run_in_executor( # Use a semaphore so at most max_concurrent tickers run simultaneously,
executor, # but we still get individual completion callbacks via as_completed.
semaphore = asyncio.Semaphore(max_concurrent)
async def _run_one(candidate: StockCandidate) -> TickerResult:
async with semaphore:
return await loop.run_in_executor(
None, # use default ThreadPoolExecutor
run_ticker_analysis, run_ticker_analysis,
c, candidate,
macro_context, macro_context,
config, config,
analysis_date, analysis_date,
) )
for c in candidates
] tasks = [asyncio.create_task(_run_one(c)) for c in candidates]
results = await asyncio.gather(*tasks) done_count = 0
return list(results) for coro in asyncio.as_completed(tasks):
finally: result = await coro
executor.shutdown(wait=False) done_count += 1
results.append(result)
if on_ticker_done is not None:
try:
on_ticker_done(result, done_count, total)
except Exception: # never let a callback crash the pipeline
pass
return results
# ─── Reporting ──────────────────────────────────────────────────────────────── # ─── Reporting ────────────────────────────────────────────────────────────────