diff --git a/cli/main.py b/cli/main.py index c7b047f4..da5932aa 100644 --- a/cli/main.py +++ b/cli/main.py @@ -1580,17 +1580,62 @@ def run_pipeline( console.print( f"\n[cyan]Running TradingAgents for {len(candidates)} tickers...[/cyan]" + f" [dim](up to 2 concurrent)[/dim]\n" ) - try: - with Live( - Spinner("dots", text="Analyzing..."), console=console, transient=True - ): + for c in candidates: + console.print( + f" [dim]▷ Queued:[/dim] [bold cyan]{c.ticker}[/bold cyan]" + f" [dim]{c.sector} · {c.conviction.upper()} conviction[/dim]" + ) + console.print() + import time as _time + from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn + + pipeline_start = _time.monotonic() + + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("[cyan]{task.completed}/{task.total}[/cyan]"), + TimeElapsedColumn(), + console=console, + transient=False, + ) as progress: + overall = progress.add_task("[bold]Pipeline progress[/bold]", total=len(candidates)) + + def on_done(result, done_count, total_count): + ticker_elapsed = _time.monotonic() - pipeline_start + if result.error: + console.print( + f" [red]✗ {result.ticker}[/red]" + f" [dim]failed ({ticker_elapsed:.0f}s elapsed) — {result.error[:80]}[/dim]" + ) + else: + decision_preview = str(result.final_trade_decision)[:70].replace("\n", " ") + console.print( + f" [green]✓ {result.ticker}[/green]" + f" [dim]({done_count}/{total_count}, {ticker_elapsed:.0f}s elapsed)[/dim]" + f" → {decision_preview}" + ) + progress.advance(overall) + + try: results = asyncio.run( - run_all_tickers(candidates, macro_context, config, analysis_date) + run_all_tickers( + candidates, macro_context, config, analysis_date, + on_ticker_done=on_done, + ) ) - except Exception as e: - console.print(f"[red]Pipeline failed: {e}[/red]") - raise typer.Exit(1) + except Exception as e: + console.print(f"[red]Pipeline failed: {e}[/red]") + raise typer.Exit(1) + + elapsed_total = _time.monotonic() - pipeline_start + console.print( + f"\n[bold green]All {len(candidates)} ticker(s) finished in {elapsed_total:.0f}s[/bold green]\n" + ) + save_results(results, macro_context, output_dir) diff --git a/tradingagents/pipeline/macro_bridge.py b/tradingagents/pipeline/macro_bridge.py index 42759c63..e18c6ef9 100644 --- a/tradingagents/pipeline/macro_bridge.py +++ b/tradingagents/pipeline/macro_bridge.py @@ -5,12 +5,14 @@ from __future__ import annotations import asyncio import json import logging +import time from concurrent.futures import ThreadPoolExecutor from tradingagents.agents.utils.json_utils import extract_json from dataclasses import dataclass from datetime import datetime from pathlib import Path -from typing import Literal +from typing import Callable, Literal + logger = logging.getLogger(__name__) @@ -172,15 +174,6 @@ def run_ticker_analysis( NOTE: TradingAgentsGraph is synchronous — call this from a thread pool when running multiple tickers concurrently (see run_all_tickers). - - Args: - candidate: The stock candidate to analyse. - macro_context: Macro context to embed in the result. - config: TradingAgents configuration dict. - analysis_date: Date string in YYYY-MM-DD format. - - Returns: - TickerResult with all report fields populated, or error set on failure. """ result = TickerResult( ticker=candidate.ticker, @@ -189,11 +182,14 @@ def run_ticker_analysis( analysis_date=analysis_date, ) - logger.info("Starting analysis for %s on %s", candidate.ticker, analysis_date) + t0 = time.monotonic() + logger.info( + "[%s] ▶ Starting analysis (%s, %s conviction)", + candidate.ticker, candidate.sector, candidate.conviction, + ) try: from tradingagents.graph.trading_graph import TradingAgentsGraph - from tradingagents.observability import get_run_logger rl = get_run_logger() @@ -210,23 +206,31 @@ def run_ticker_analysis( result.risk_debate = str(final_state.get("risk_debate_state", "")) result.final_trade_decision = decision + elapsed = time.monotonic() - t0 logger.info( - "Analysis complete for %s: %s", candidate.ticker, str(decision)[:120] + "[%s] ✓ Analysis complete in %.0fs — decision: %s", + candidate.ticker, elapsed, str(decision)[:80], ) except Exception as exc: - logger.error("Analysis failed for %s: %s", candidate.ticker, exc, exc_info=True) + elapsed = time.monotonic() - t0 + logger.error( + "[%s] ✗ Analysis FAILED after %.0fs: %s", + candidate.ticker, elapsed, exc, exc_info=True, + ) result.error = str(exc) return result + async def run_all_tickers( candidates: list[StockCandidate], macro_context: MacroContext, config: dict, analysis_date: str, max_concurrent: int = 2, + on_ticker_done: Callable[[TickerResult, int, int], None] | None = None, ) -> list[TickerResult]: """Run TradingAgents for every candidate with controlled concurrency. @@ -239,28 +243,45 @@ async def run_all_tickers( config: TradingAgents configuration dict. analysis_date: Date string in YYYY-MM-DD format. max_concurrent: Maximum number of tickers to process in parallel. + on_ticker_done: Optional callback(result, done_count, total_count) fired + after each ticker finishes — use this to drive a progress bar. Returns: List of TickerResult in completion order. """ loop = asyncio.get_running_loop() - executor = ThreadPoolExecutor(max_workers=max_concurrent) - try: - tasks = [ - loop.run_in_executor( - executor, + total = len(candidates) + results: list[TickerResult] = [] + + # Use a semaphore so at most max_concurrent tickers run simultaneously, + # but we still get individual completion callbacks via as_completed. + semaphore = asyncio.Semaphore(max_concurrent) + + async def _run_one(candidate: StockCandidate) -> TickerResult: + async with semaphore: + return await loop.run_in_executor( + None, # use default ThreadPoolExecutor run_ticker_analysis, - c, + candidate, macro_context, config, analysis_date, ) - for c in candidates - ] - results = await asyncio.gather(*tasks) - return list(results) - finally: - executor.shutdown(wait=False) + + tasks = [asyncio.create_task(_run_one(c)) for c in candidates] + done_count = 0 + for coro in asyncio.as_completed(tasks): + result = await coro + done_count += 1 + results.append(result) + if on_ticker_done is not None: + try: + on_ticker_done(result, done_count, total) + except Exception: # never let a callback crash the pipeline + pass + + return results + # ─── Reporting ────────────────────────────────────────────────────────────────