feat: add per-ticker progress logging to pipeline

Before this change, the pipeline showed a generic 'Analyzing...' spinner
for the entire multi-ticker run with no way to know which ticker was
processing or whether anything was actually working.

Changes:
- macro_bridge.py:
  - run_ticker_analysis: logs '▶ Starting', '✓ complete in Xs', '✗ FAILED'
    with elapsed time per ticker using logger.info/logger.error
  - run_all_tickers: replaced asyncio.gather (swallows all progress) with
    asyncio.as_completed + optional on_ticker_done(result, done, total)
    callback; uses asyncio.Semaphore for max_concurrent control
  - Added time and Callable imports

- cli/main.py run_pipeline:
  - Replaced Live(Spinner) with Rich Progress bar (spinner + bar + counter
    + elapsed time)
  - Prints '▷ Queued: TICKER' before analysis starts for each ticker
  - on_ticker_done callback prints '✓ TICKER (N/M, Xs elapsed) → decision'
    or '✗ TICKER failed ...' immediately as each ticker finishes
  - Prints total elapsed time when all tickers complete
This commit is contained in:
Ahmet Guzererler 2026-03-22 00:20:35 +01:00
parent 2201afd33a
commit 8364b7c9a0
2 changed files with 100 additions and 34 deletions

View File

@ -1579,17 +1579,62 @@ def run_pipeline(
console.print(
f"\n[cyan]Running TradingAgents for {len(candidates)} tickers...[/cyan]"
f" [dim](up to 2 concurrent)[/dim]\n"
)
try:
with Live(
Spinner("dots", text="Analyzing..."), console=console, transient=True
):
for c in candidates:
console.print(
f" [dim]▷ Queued:[/dim] [bold cyan]{c.ticker}[/bold cyan]"
f" [dim]{c.sector} · {c.conviction.upper()} conviction[/dim]"
)
console.print()
import time as _time
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn
pipeline_start = _time.monotonic()
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[cyan]{task.completed}/{task.total}[/cyan]"),
TimeElapsedColumn(),
console=console,
transient=False,
) as progress:
overall = progress.add_task("[bold]Pipeline progress[/bold]", total=len(candidates))
def on_done(result, done_count, total_count):
ticker_elapsed = _time.monotonic() - pipeline_start
if result.error:
console.print(
f" [red]✗ {result.ticker}[/red]"
f" [dim]failed ({ticker_elapsed:.0f}s elapsed) — {result.error[:80]}[/dim]"
)
else:
decision_preview = str(result.final_trade_decision)[:70].replace("\n", " ")
console.print(
f" [green]✓ {result.ticker}[/green]"
f" [dim]({done_count}/{total_count}, {ticker_elapsed:.0f}s elapsed)[/dim]"
f"{decision_preview}"
)
progress.advance(overall)
try:
results = asyncio.run(
run_all_tickers(candidates, macro_context, config, analysis_date)
run_all_tickers(
candidates, macro_context, config, analysis_date,
on_ticker_done=on_done,
)
)
except Exception as e:
console.print(f"[red]Pipeline failed: {e}[/red]")
raise typer.Exit(1)
except Exception as e:
console.print(f"[red]Pipeline failed: {e}[/red]")
raise typer.Exit(1)
elapsed_total = _time.monotonic() - pipeline_start
console.print(
f"\n[bold green]All {len(candidates)} ticker(s) finished in {elapsed_total:.0f}s[/bold green]\n"
)
save_results(results, macro_context, output_dir)

View File

@ -5,12 +5,14 @@ from __future__ import annotations
import asyncio
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from tradingagents.agents.utils.json_utils import extract_json
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Literal
from typing import Callable, Literal
logger = logging.getLogger(__name__)
@ -172,15 +174,6 @@ def run_ticker_analysis(
NOTE: TradingAgentsGraph is synchronous — call this from a thread pool
when running multiple tickers concurrently (see run_all_tickers).
Args:
candidate: The stock candidate to analyse.
macro_context: Macro context to embed in the result.
config: TradingAgents configuration dict.
analysis_date: Date string in YYYY-MM-DD format.
Returns:
TickerResult with all report fields populated, or error set on failure.
"""
result = TickerResult(
ticker=candidate.ticker,
@ -189,11 +182,14 @@ def run_ticker_analysis(
analysis_date=analysis_date,
)
logger.info("Starting analysis for %s on %s", candidate.ticker, analysis_date)
t0 = time.monotonic()
logger.info(
"[%s] ▶ Starting analysis (%s, %s conviction)",
candidate.ticker, candidate.sector, candidate.conviction,
)
try:
from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.observability import get_run_logger
rl = get_run_logger()
@ -210,23 +206,31 @@ def run_ticker_analysis(
result.risk_debate = str(final_state.get("risk_debate_state", ""))
result.final_trade_decision = decision
elapsed = time.monotonic() - t0
logger.info(
"Analysis complete for %s: %s", candidate.ticker, str(decision)[:120]
"[%s] ✓ Analysis complete in %.0fs — decision: %s",
candidate.ticker, elapsed, str(decision)[:80],
)
except Exception as exc:
logger.error("Analysis failed for %s: %s", candidate.ticker, exc, exc_info=True)
elapsed = time.monotonic() - t0
logger.error(
"[%s] ✗ Analysis FAILED after %.0fs: %s",
candidate.ticker, elapsed, exc, exc_info=True,
)
result.error = str(exc)
return result
async def run_all_tickers(
candidates: list[StockCandidate],
macro_context: MacroContext,
config: dict,
analysis_date: str,
max_concurrent: int = 2,
on_ticker_done: Callable[[TickerResult, int, int], None] | None = None,
) -> list[TickerResult]:
"""Run TradingAgents for every candidate with controlled concurrency.
@ -239,28 +243,45 @@ async def run_all_tickers(
config: TradingAgents configuration dict.
analysis_date: Date string in YYYY-MM-DD format.
max_concurrent: Maximum number of tickers to process in parallel.
on_ticker_done: Optional callback(result, done_count, total_count) fired
    after each ticker finishes — use this to drive a progress bar.
Returns:
List of TickerResult in completion order.
"""
loop = asyncio.get_running_loop()
executor = ThreadPoolExecutor(max_workers=max_concurrent)
try:
tasks = [
loop.run_in_executor(
executor,
total = len(candidates)
results: list[TickerResult] = []
# Use a semaphore so at most max_concurrent tickers run simultaneously,
# but we still get individual completion callbacks via as_completed.
semaphore = asyncio.Semaphore(max_concurrent)
async def _run_one(candidate: StockCandidate) -> TickerResult:
async with semaphore:
return await loop.run_in_executor(
None, # use default ThreadPoolExecutor
run_ticker_analysis,
c,
candidate,
macro_context,
config,
analysis_date,
)
for c in candidates
]
results = await asyncio.gather(*tasks)
return list(results)
finally:
executor.shutdown(wait=False)
tasks = [asyncio.create_task(_run_one(c)) for c in candidates]
done_count = 0
for coro in asyncio.as_completed(tasks):
result = await coro
done_count += 1
results.append(result)
if on_ticker_done is not None:
try:
on_ticker_done(result, done_count, total)
except Exception: # never let a callback crash the pipeline
pass
return results
# ─── Reporting ────────────────────────────────────────────────────────────────