Add API consumption estimation module and CLI command

- New tradingagents/api_usage.py: Pre-run estimation of API calls per vendor
  for analyze, scan, and pipeline commands. Includes Alpha Vantage tier
  assessment (free: 25/day vs premium: 75/min).
- New CLI command: `estimate-api [analyze|scan|pipeline|all]`
- Enhanced observability: RunLogger.summary() now includes vendor_methods
  breakdown (vendor → method → call count)
- Enhanced CLI output: All 3 command summaries (analyze, scan, pipeline)
  now show per-vendor breakdown and Alpha Vantage assessment after runs
- 32 new tests in tests/unit/test_api_usage.py

Co-authored-by: aguzererler <6199053+aguzererler@users.noreply.github.com>
Agent-Logs-Url: https://github.com/aguzererler/TradingAgents/sessions/bb80e772-3e03-420e-bb0e-76cfdde14a04
This commit is contained in:
copilot-swe-agent[bot] 2026-03-21 17:25:26 +00:00
parent a8ccff31a0
commit 92ebc13ce4
4 changed files with 780 additions and 0 deletions

View File

@ -38,6 +38,7 @@ from tradingagents.graph.scanner_graph import ScannerGraph
from cli.announcements import fetch_announcements, display_announcements from cli.announcements import fetch_announcements, display_announcements
from cli.stats_handler import StatsCallbackHandler from cli.stats_handler import StatsCallbackHandler
from tradingagents.observability import RunLogger, set_run_logger from tradingagents.observability import RunLogger, set_run_logger
from tradingagents.api_usage import format_vendor_breakdown, format_av_assessment
console = Console() console = Console()
@ -1212,12 +1213,17 @@ def run_analysis():
log_dir.mkdir(parents=True, exist_ok=True) log_dir.mkdir(parents=True, exist_ok=True)
run_logger.write_log(log_dir / "run_log.jsonl") run_logger.write_log(log_dir / "run_log.jsonl")
summary = run_logger.summary() summary = run_logger.summary()
vendor_breakdown = format_vendor_breakdown(summary)
av_assessment = format_av_assessment(summary)
console.print( console.print(
f"[dim]LLM calls: {summary['llm_calls']} | " f"[dim]LLM calls: {summary['llm_calls']} | "
f"Tokens: {summary['tokens_in']}{summary['tokens_out']} | " f"Tokens: {summary['tokens_in']}{summary['tokens_out']} | "
f"Tools: {summary['tool_calls']} | " f"Tools: {summary['tool_calls']} | "
f"Vendor calls: {summary['vendor_success']}ok/{summary['vendor_fail']}fail[/dim]" f"Vendor calls: {summary['vendor_success']}ok/{summary['vendor_fail']}fail[/dim]"
) )
if vendor_breakdown:
console.print(f"[dim] Vendors: {vendor_breakdown}[/dim]")
console.print(f"[dim] {av_assessment}[/dim]")
set_run_logger(None) set_run_logger(None)
# Prompt to display full report # Prompt to display full report
@ -1295,12 +1301,17 @@ def run_scan(date: Optional[str] = None):
# Write observability log # Write observability log
run_logger.write_log(save_dir / "run_log.jsonl") run_logger.write_log(save_dir / "run_log.jsonl")
scan_summary = run_logger.summary() scan_summary = run_logger.summary()
vendor_breakdown = format_vendor_breakdown(scan_summary)
av_assessment = format_av_assessment(scan_summary)
console.print( console.print(
f"[dim]LLM calls: {scan_summary['llm_calls']} | " f"[dim]LLM calls: {scan_summary['llm_calls']} | "
f"Tokens: {scan_summary['tokens_in']}{scan_summary['tokens_out']} | " f"Tokens: {scan_summary['tokens_in']}{scan_summary['tokens_out']} | "
f"Tools: {scan_summary['tool_calls']} | " f"Tools: {scan_summary['tool_calls']} | "
f"Vendor calls: {scan_summary['vendor_success']}ok/{scan_summary['vendor_fail']}fail[/dim]" f"Vendor calls: {scan_summary['vendor_success']}ok/{scan_summary['vendor_fail']}fail[/dim]"
) )
if vendor_breakdown:
console.print(f"[dim] Vendors: {vendor_breakdown}[/dim]")
console.print(f"[dim] {av_assessment}[/dim]")
set_run_logger(None) set_run_logger(None)
# Append to daily digest and sync to NotebookLM # Append to daily digest and sync to NotebookLM
@ -1419,12 +1430,17 @@ def run_pipeline(
output_dir.mkdir(parents=True, exist_ok=True) output_dir.mkdir(parents=True, exist_ok=True)
run_logger.write_log(output_dir / "run_log.jsonl") run_logger.write_log(output_dir / "run_log.jsonl")
pipe_summary = run_logger.summary() pipe_summary = run_logger.summary()
vendor_breakdown = format_vendor_breakdown(pipe_summary)
av_assessment = format_av_assessment(pipe_summary)
console.print( console.print(
f"[dim]LLM calls: {pipe_summary['llm_calls']} | " f"[dim]LLM calls: {pipe_summary['llm_calls']} | "
f"Tokens: {pipe_summary['tokens_in']}{pipe_summary['tokens_out']} | " f"Tokens: {pipe_summary['tokens_in']}{pipe_summary['tokens_out']} | "
f"Tools: {pipe_summary['tool_calls']} | " f"Tools: {pipe_summary['tool_calls']} | "
f"Vendor calls: {pipe_summary['vendor_success']}ok/{pipe_summary['vendor_fail']}fail[/dim]" f"Vendor calls: {pipe_summary['vendor_success']}ok/{pipe_summary['vendor_fail']}fail[/dim]"
) )
if vendor_breakdown:
console.print(f"[dim] Vendors: {vendor_breakdown}[/dim]")
console.print(f"[dim] {av_assessment}[/dim]")
set_run_logger(None) set_run_logger(None)
# Append to daily digest and sync to NotebookLM # Append to daily digest and sync to NotebookLM
@ -1591,5 +1607,65 @@ def auto(
run_portfolio(portfolio_id, date, macro_path) run_portfolio(portfolio_id, date, macro_path)
@app.command(name="estimate-api")
def estimate_api(
command: str = typer.Argument("all", help="Command to estimate: analyze, scan, pipeline, or all"),
num_tickers: int = typer.Option(5, "--tickers", "-t", help="Expected tickers for pipeline estimate"),
num_indicators: int = typer.Option(6, "--indicators", "-i", help="Expected indicator calls per ticker"),
):
"""Estimate API usage per vendor (helps decide if AV premium is needed)."""
from tradingagents.api_usage import (
estimate_analyze,
estimate_scan,
estimate_pipeline,
format_estimate,
AV_FREE_DAILY_LIMIT,
AV_PREMIUM_PER_MINUTE,
)
console.print(Panel("[bold green]API Usage Estimation[/bold green]", border_style="green"))
console.print(
f"[dim]Alpha Vantage tiers: FREE = {AV_FREE_DAILY_LIMIT} calls/day | "
f"Premium ($30/mo) = {AV_PREMIUM_PER_MINUTE} calls/min, unlimited daily[/dim]\n"
)
estimates = []
if command in ("analyze", "all"):
estimates.append(estimate_analyze(num_indicators=num_indicators))
if command in ("scan", "all"):
estimates.append(estimate_scan())
if command in ("pipeline", "all"):
estimates.append(estimate_pipeline(num_tickers=num_tickers, num_indicators=num_indicators))
if not estimates:
console.print(f"[red]Unknown command: {command}. Use: analyze, scan, pipeline, or all[/red]")
raise typer.Exit(1)
for est in estimates:
console.print(Panel(format_estimate(est), title=est.command, border_style="cyan"))
# Overall AV assessment
console.print("\n[bold]Alpha Vantage Subscription Recommendation:[/bold]")
max_av = max(e.vendor_calls.alpha_vantage for e in estimates)
if max_av == 0:
console.print(
" [green]✓ Current config uses yfinance (free) for all data.[/green]\n"
" [green] Alpha Vantage subscription is NOT needed.[/green]\n"
" [dim] To switch to AV, set TRADINGAGENTS_VENDOR_* env vars to 'alpha_vantage'.[/dim]"
)
else:
total_daily = sum(e.vendor_calls.alpha_vantage for e in estimates)
if total_daily <= AV_FREE_DAILY_LIMIT:
console.print(
f" [green]✓ Total AV calls ({total_daily}) fit the FREE tier ({AV_FREE_DAILY_LIMIT}/day).[/green]\n"
f" [green] No premium subscription needed for a single daily run.[/green]"
)
else:
console.print(
f" [yellow]⚠ Total AV calls ({total_daily}) exceed the FREE tier ({AV_FREE_DAILY_LIMIT}/day).[/yellow]\n"
f" [yellow] Premium subscription recommended ($30/month).[/yellow]"
)
if __name__ == "__main__": if __name__ == "__main__":
app() app()

View File

@ -0,0 +1,288 @@
"""Tests for tradingagents/api_usage.py — API consumption estimation."""
import pytest
from tradingagents.api_usage import (
AV_FREE_DAILY_LIMIT,
AV_PREMIUM_PER_MINUTE,
UsageEstimate,
VendorEstimate,
estimate_analyze,
estimate_pipeline,
estimate_scan,
format_av_assessment,
format_estimate,
format_vendor_breakdown,
)
# ──────────────────────────────────────────────────────────────────────────────
# VendorEstimate
# ──────────────────────────────────────────────────────────────────────────────
class TestVendorEstimate:
def test_total(self):
ve = VendorEstimate(yfinance=10, alpha_vantage=5, finnhub=2)
assert ve.total == 17
def test_default_zeros(self):
ve = VendorEstimate()
assert ve.total == 0
# ──────────────────────────────────────────────────────────────────────────────
# UsageEstimate
# ──────────────────────────────────────────────────────────────────────────────
class TestUsageEstimate:
def test_av_fits_free_tier_true(self):
est = UsageEstimate(
command="test",
description="test",
vendor_calls=VendorEstimate(alpha_vantage=10),
)
assert est.av_fits_free_tier() is True
def test_av_fits_free_tier_false(self):
est = UsageEstimate(
command="test",
description="test",
vendor_calls=VendorEstimate(alpha_vantage=100),
)
assert est.av_fits_free_tier() is False
def test_av_daily_runs_free(self):
est = UsageEstimate(
command="test",
description="test",
vendor_calls=VendorEstimate(alpha_vantage=5),
)
assert est.av_daily_runs_free() == AV_FREE_DAILY_LIMIT // 5
def test_av_daily_runs_free_zero_av(self):
est = UsageEstimate(
command="test",
description="test",
vendor_calls=VendorEstimate(alpha_vantage=0),
)
assert est.av_daily_runs_free() == -1 # unlimited
# ──────────────────────────────────────────────────────────────────────────────
# estimate_analyze — default config (yfinance primary)
# ──────────────────────────────────────────────────────────────────────────────
class TestEstimateAnalyze:
def test_default_config_no_av_calls(self):
"""With default config (yfinance primary), AV calls should be 0."""
est = estimate_analyze()
assert est.vendor_calls.alpha_vantage == 0
assert est.vendor_calls.yfinance > 0
def test_all_analysts_nonzero_total(self):
est = estimate_analyze(selected_analysts=["market", "news", "fundamentals", "social"])
assert est.vendor_calls.total > 0
def test_market_only(self):
est = estimate_analyze(selected_analysts=["market"], num_indicators=4)
# 1 stock data + 4 indicators = 5 calls
assert est.vendor_calls.total >= 5
def test_fundamentals_includes_insider(self):
"""Fundamentals analyst should include insider_transactions (Finnhub default)."""
est = estimate_analyze(selected_analysts=["fundamentals"])
# insider_transactions defaults to finnhub
assert est.vendor_calls.finnhub >= 1
def test_num_indicators_varies_total(self):
est_low = estimate_analyze(selected_analysts=["market"], num_indicators=2)
est_high = estimate_analyze(selected_analysts=["market"], num_indicators=8)
assert est_high.vendor_calls.total > est_low.vendor_calls.total
def test_av_config_counts_av_calls(self):
"""When AV is configured as primary, calls should show up under alpha_vantage."""
av_config = {
"data_vendors": {
"core_stock_apis": "alpha_vantage",
"technical_indicators": "alpha_vantage",
"fundamental_data": "alpha_vantage",
"news_data": "alpha_vantage",
"scanner_data": "alpha_vantage",
"calendar_data": "finnhub",
},
"tool_vendors": {
"get_insider_transactions": "alpha_vantage",
},
}
est = estimate_analyze(config=av_config, selected_analysts=["market", "fundamentals"])
assert est.vendor_calls.alpha_vantage > 0
assert est.vendor_calls.yfinance == 0
def test_method_breakdown_has_entries(self):
est = estimate_analyze(selected_analysts=["market"])
assert len(est.method_breakdown) > 0
def test_notes_populated(self):
est = estimate_analyze()
assert len(est.notes) > 0
# ──────────────────────────────────────────────────────────────────────────────
# estimate_scan — default config (yfinance primary)
# ──────────────────────────────────────────────────────────────────────────────
class TestEstimateScan:
def test_default_config_uses_yfinance(self):
est = estimate_scan()
assert est.vendor_calls.yfinance > 0
def test_finnhub_for_calendars(self):
"""Calendars should always use Finnhub."""
est = estimate_scan()
assert est.vendor_calls.finnhub >= 2 # earnings + economic calendar
def test_scan_total_reasonable(self):
est = estimate_scan()
# Should be between 15-40 calls total
assert 10 <= est.vendor_calls.total <= 50
def test_notes_have_phases(self):
est = estimate_scan()
phase_notes = [n for n in est.notes if "Phase" in n]
assert len(phase_notes) >= 3 # Phase 1A, 1B, 1C, 2, 3
# ──────────────────────────────────────────────────────────────────────────────
# estimate_pipeline
# ──────────────────────────────────────────────────────────────────────────────
class TestEstimatePipeline:
def test_pipeline_larger_than_scan(self):
scan_est = estimate_scan()
pipe_est = estimate_pipeline(num_tickers=3)
assert pipe_est.vendor_calls.total > scan_est.vendor_calls.total
def test_pipeline_scales_with_tickers(self):
est3 = estimate_pipeline(num_tickers=3)
est7 = estimate_pipeline(num_tickers=7)
assert est7.vendor_calls.total > est3.vendor_calls.total
def test_pipeline_av_config(self):
"""Pipeline with AV config should report AV calls."""
av_config = {
"data_vendors": {
"core_stock_apis": "alpha_vantage",
"technical_indicators": "alpha_vantage",
"fundamental_data": "alpha_vantage",
"news_data": "alpha_vantage",
"scanner_data": "alpha_vantage",
"calendar_data": "finnhub",
},
"tool_vendors": {},
}
est = estimate_pipeline(config=av_config, num_tickers=5)
assert est.vendor_calls.alpha_vantage > 0
# ──────────────────────────────────────────────────────────────────────────────
# format_estimate
# ──────────────────────────────────────────────────────────────────────────────
class TestFormatEstimate:
def test_contains_vendor_counts(self):
est = estimate_analyze()
text = format_estimate(est)
assert "yfinance" in text
assert "Total:" in text
def test_no_av_shows_not_needed(self):
est = estimate_analyze() # default config → no AV
text = format_estimate(est)
assert "NOT needed" in text
def test_av_shows_assessment(self):
av_config = {
"data_vendors": {
"core_stock_apis": "alpha_vantage",
"technical_indicators": "alpha_vantage",
"fundamental_data": "alpha_vantage",
"news_data": "alpha_vantage",
"scanner_data": "alpha_vantage",
"calendar_data": "finnhub",
},
"tool_vendors": {},
}
est = estimate_analyze(config=av_config)
text = format_estimate(est)
assert "Alpha Vantage" in text
# ──────────────────────────────────────────────────────────────────────────────
# format_vendor_breakdown (actual run data)
# ──────────────────────────────────────────────────────────────────────────────
class TestFormatVendorBreakdown:
def test_empty_summary(self):
assert format_vendor_breakdown({}) == ""
def test_yfinance_only(self):
summary = {"vendors_used": {"yfinance": {"ok": 10, "fail": 0}}}
text = format_vendor_breakdown(summary)
assert "yfinance:10ok/0fail" in text
def test_multiple_vendors(self):
summary = {
"vendors_used": {
"yfinance": {"ok": 8, "fail": 1},
"alpha_vantage": {"ok": 3, "fail": 0},
"finnhub": {"ok": 2, "fail": 0},
}
}
text = format_vendor_breakdown(summary)
assert "yfinance:8ok/1fail" in text
assert "AV:3ok/0fail" in text
assert "Finnhub:2ok/0fail" in text
# ──────────────────────────────────────────────────────────────────────────────
# format_av_assessment (actual run data)
# ──────────────────────────────────────────────────────────────────────────────
class TestFormatAvAssessment:
def test_no_av_used(self):
summary = {"vendors_used": {"yfinance": {"ok": 10, "fail": 0}}}
text = format_av_assessment(summary)
assert "not used" in text
def test_av_within_free(self):
summary = {"vendors_used": {"alpha_vantage": {"ok": 5, "fail": 0}}}
text = format_av_assessment(summary)
assert "free tier" in text
assert "5 calls" in text
def test_av_exceeds_free(self):
summary = {"vendors_used": {"alpha_vantage": {"ok": 30, "fail": 0}}}
text = format_av_assessment(summary)
assert "exceeds" in text
assert "Premium" in text
# ──────────────────────────────────────────────────────────────────────────────
# Constants
# ──────────────────────────────────────────────────────────────────────────────
class TestConstants:
def test_av_free_daily_limit(self):
assert AV_FREE_DAILY_LIMIT == 25
def test_av_premium_per_minute(self):
assert AV_PREMIUM_PER_MINUTE == 75

406
tradingagents/api_usage.py Normal file
View File

@ -0,0 +1,406 @@
"""API consumption estimation for TradingAgents.
Provides static estimates of how many external API calls each command
(analyze, scan, pipeline) will make, broken down by vendor. This helps
users decide whether they need an Alpha Vantage premium subscription.
Alpha Vantage tiers
-------------------
- **Free**: 25 API calls per day
- **Premium (30 $/month)**: 75 calls per minute, unlimited daily
Each ``get_*`` method that hits Alpha Vantage counts as **1 API call**,
regardless of how much data is returned.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
# ──────────────────────────────────────────────────────────────────────────────
# Alpha Vantage tier limits
# ──────────────────────────────────────────────────────────────────────────────
AV_FREE_DAILY_LIMIT = 25
AV_PREMIUM_PER_MINUTE = 75
# ──────────────────────────────────────────────────────────────────────────────
# Per-method AV call cost.
# When Alpha Vantage is the vendor, each invocation of a route_to_vendor
# method triggers exactly one AV HTTP request — except get_indicators,
# which the LLM may call multiple times (once per indicator).
# ──────────────────────────────────────────────────────────────────────────────
_AV_CALLS_PER_METHOD: dict[str, int] = {
"get_stock_data": 1, # TIME_SERIES_DAILY_ADJUSTED
"get_indicators": 1, # SMA / EMA / RSI / MACD / BBANDS / ATR (1 call each)
"get_fundamentals": 1, # OVERVIEW
"get_balance_sheet": 1, # BALANCE_SHEET
"get_cashflow": 1, # CASH_FLOW
"get_income_statement": 1, # INCOME_STATEMENT
"get_news": 1, # NEWS_SENTIMENT
"get_global_news": 1, # NEWS_SENTIMENT (no ticker)
"get_insider_transactions": 1, # INSIDER_TRANSACTIONS
"get_market_movers": 1, # TOP_GAINERS_LOSERS
"get_market_indices": 1, # multiple quote calls
"get_sector_performance": 1, # SECTOR
"get_industry_performance": 1, # sector ETF lookup
"get_topic_news": 1, # NEWS_SENTIMENT (topic filter)
}
@dataclass
class VendorEstimate:
"""Estimated API call counts per vendor for a single operation."""
yfinance: int = 0
alpha_vantage: int = 0
finnhub: int = 0
@property
def total(self) -> int:
return self.yfinance + self.alpha_vantage + self.finnhub
@dataclass
class UsageEstimate:
"""Full API usage estimate for a command."""
command: str
description: str
vendor_calls: VendorEstimate = field(default_factory=VendorEstimate)
# Breakdown of calls by method → count (only for non-zero vendors)
method_breakdown: dict[str, dict[str, int]] = field(default_factory=dict)
notes: list[str] = field(default_factory=list)
def av_fits_free_tier(self) -> bool:
"""Whether the Alpha Vantage calls fit within the free daily limit."""
return self.vendor_calls.alpha_vantage <= AV_FREE_DAILY_LIMIT
def av_daily_runs_free(self) -> int:
"""How many times this command can run per day on the free AV tier."""
if self.vendor_calls.alpha_vantage == 0:
return -1 # unlimited (doesn't use AV)
return AV_FREE_DAILY_LIMIT // self.vendor_calls.alpha_vantage
# ──────────────────────────────────────────────────────────────────────────────
# Estimators for each command type
# ──────────────────────────────────────────────────────────────────────────────
def _resolve_vendor(config: dict, method: str) -> str:
"""Determine which vendor a method will use given the config."""
from tradingagents.dataflows.interface import (
TOOLS_CATEGORIES,
VENDOR_METHODS,
get_category_for_method,
)
# Tool-level override first
tool_vendors = config.get("tool_vendors", {})
if method in tool_vendors:
return tool_vendors[method]
# Category-level
try:
category = get_category_for_method(method)
except ValueError:
return "unknown"
return config.get("data_vendors", {}).get(category, "yfinance")
def estimate_analyze(
config: dict | None = None,
selected_analysts: list[str] | None = None,
num_indicators: int = 6,
) -> UsageEstimate:
"""Estimate API calls for a single stock analysis.
Args:
config: TradingAgents config dict (uses DEFAULT_CONFIG if None).
selected_analysts: Which analysts are enabled.
Defaults to ``["market", "social", "news", "fundamentals"]``.
num_indicators: Expected number of indicator calls from the market
analyst (LLM decides, but 4-8 is typical).
Returns:
:class:`UsageEstimate` with per-vendor breakdowns.
"""
if config is None:
from tradingagents.default_config import DEFAULT_CONFIG
config = DEFAULT_CONFIG
if selected_analysts is None:
selected_analysts = ["market", "social", "news", "fundamentals"]
est = UsageEstimate(
command="analyze",
description="Single stock analysis",
)
breakdown: dict[str, dict[str, int]] = {}
def _add(method: str, count: int = 1) -> None:
vendor = _resolve_vendor(config, method)
if vendor == "yfinance":
est.vendor_calls.yfinance += count
elif vendor == "alpha_vantage":
est.vendor_calls.alpha_vantage += count
elif vendor == "finnhub":
est.vendor_calls.finnhub += count
# Track breakdown
if vendor not in breakdown:
breakdown[vendor] = {}
breakdown[vendor][method] = breakdown[vendor].get(method, 0) + count
# Market Analyst
if "market" in selected_analysts:
_add("get_stock_data")
for _ in range(num_indicators):
_add("get_indicators")
est.notes.append(
f"Market analyst: 1 stock data + ~{num_indicators} indicator calls "
f"(LLM chooses which indicators; actual count may vary)"
)
# Fundamentals Analyst
if "fundamentals" in selected_analysts:
_add("get_fundamentals")
_add("get_income_statement")
_add("get_balance_sheet")
_add("get_cashflow")
_add("get_insider_transactions")
est.notes.append(
"Fundamentals analyst: overview + 3 financial statements + insider transactions"
)
# News Analyst
if "news" in selected_analysts:
_add("get_news")
_add("get_global_news")
est.notes.append("News analyst: ticker news + global news")
# Social Media Analyst (uses same news tools)
if "social" in selected_analysts:
_add("get_news")
est.notes.append("Social analyst: ticker news/sentiment")
est.method_breakdown = breakdown
return est
def estimate_scan(config: dict | None = None) -> UsageEstimate:
"""Estimate API calls for a market-wide scan.
Args:
config: TradingAgents config dict (uses DEFAULT_CONFIG if None).
Returns:
:class:`UsageEstimate` with per-vendor breakdowns.
"""
if config is None:
from tradingagents.default_config import DEFAULT_CONFIG
config = DEFAULT_CONFIG
est = UsageEstimate(
command="scan",
description="Market-wide macro scan (3 phases)",
)
breakdown: dict[str, dict[str, int]] = {}
def _add(method: str, count: int = 1) -> None:
vendor = _resolve_vendor(config, method)
if vendor == "yfinance":
est.vendor_calls.yfinance += count
elif vendor == "alpha_vantage":
est.vendor_calls.alpha_vantage += count
elif vendor == "finnhub":
est.vendor_calls.finnhub += count
if vendor not in breakdown:
breakdown[vendor] = {}
breakdown[vendor][method] = breakdown[vendor].get(method, 0) + count
# Phase 1A: Geopolitical Scanner — ~4 topic news calls
topic_news_calls = 4
for _ in range(topic_news_calls):
_add("get_topic_news")
est.notes.append(f"Phase 1A (Geopolitical): ~{topic_news_calls} topic news calls")
# Phase 1B: Market Movers Scanner — 3 market_movers + 1 indices
_add("get_market_movers", 3)
_add("get_market_indices")
est.notes.append("Phase 1B (Market Movers): 3 screener calls + 1 indices call")
# Phase 1C: Sector Scanner — 1 sector performance
_add("get_sector_performance")
est.notes.append("Phase 1C (Sector): 1 sector performance call")
# Phase 2: Industry Deep Dive — ~3 industry perf + ~3 topic news
industry_calls = 3
_add("get_industry_performance", industry_calls)
_add("get_topic_news", industry_calls)
est.notes.append(
f"Phase 2 (Industry Deep Dive): ~{industry_calls} industry perf + "
f"~{industry_calls} topic news calls"
)
# Phase 3: Macro Synthesis — ~2 topic news + calendars
_add("get_topic_news", 2)
_add("get_earnings_calendar")
_add("get_economic_calendar")
est.notes.append("Phase 3 (Macro Synthesis): ~2 topic news + calendar calls")
est.method_breakdown = breakdown
return est
def estimate_pipeline(
config: dict | None = None,
num_tickers: int = 5,
selected_analysts: list[str] | None = None,
num_indicators: int = 6,
) -> UsageEstimate:
"""Estimate API calls for a full pipeline (scan → filter → analyze).
Args:
config: TradingAgents config dict.
num_tickers: Expected number of tickers after filtering (typically 3-7).
selected_analysts: Analysts for each ticker analysis.
num_indicators: Expected indicator calls per ticker.
Returns:
:class:`UsageEstimate` with per-vendor breakdowns.
"""
scan_est = estimate_scan(config)
analyze_est = estimate_analyze(config, selected_analysts, num_indicators)
est = UsageEstimate(
command="pipeline",
description=f"Full pipeline: scan + {num_tickers} ticker analyses",
)
# Scan phase
est.vendor_calls.yfinance += scan_est.vendor_calls.yfinance
est.vendor_calls.alpha_vantage += scan_est.vendor_calls.alpha_vantage
est.vendor_calls.finnhub += scan_est.vendor_calls.finnhub
# Analyze phase × num_tickers
est.vendor_calls.yfinance += analyze_est.vendor_calls.yfinance * num_tickers
est.vendor_calls.alpha_vantage += analyze_est.vendor_calls.alpha_vantage * num_tickers
est.vendor_calls.finnhub += analyze_est.vendor_calls.finnhub * num_tickers
# Merge breakdowns
merged: dict[str, dict[str, int]] = {}
for vendor, methods in scan_est.method_breakdown.items():
merged.setdefault(vendor, {})
for method, count in methods.items():
merged[vendor][method] = merged[vendor].get(method, 0) + count
for vendor, methods in analyze_est.method_breakdown.items():
merged.setdefault(vendor, {})
for method, count in methods.items():
merged[vendor][method] = merged[vendor].get(method, 0) + count * num_tickers
est.method_breakdown = merged
est.notes.append(f"Scan phase: {scan_est.vendor_calls.total} calls")
est.notes.append(
f"Analyze phase: {analyze_est.vendor_calls.total} calls × {num_tickers} tickers "
f"= {analyze_est.vendor_calls.total * num_tickers} calls"
)
return est
# ──────────────────────────────────────────────────────────────────────────────
# Formatting helpers
# ──────────────────────────────────────────────────────────────────────────────
def format_estimate(est: UsageEstimate) -> str:
"""Format an estimate as a human-readable multi-line string."""
lines = [
f"API Usage Estimate — {est.command}",
f" {est.description}",
"",
f" Vendor calls (estimated):",
]
vc = est.vendor_calls
if vc.yfinance:
lines.append(f" yfinance: {vc.yfinance:>4} calls (free, no key needed)")
if vc.alpha_vantage:
lines.append(f" Alpha Vantage: {vc.alpha_vantage:>3} calls (free tier: {AV_FREE_DAILY_LIMIT}/day)")
if vc.finnhub:
lines.append(f" Finnhub: {vc.finnhub:>3} calls (free tier: 60/min)")
lines.append(f" Total: {vc.total:>4} vendor API calls")
# Alpha Vantage assessment
if vc.alpha_vantage > 0:
lines.append("")
lines.append(" Alpha Vantage Assessment:")
if est.av_fits_free_tier():
daily_runs = est.av_daily_runs_free()
lines.append(
f" ✓ Fits FREE tier ({vc.alpha_vantage}/{AV_FREE_DAILY_LIMIT} daily calls). "
f"~{daily_runs} run(s)/day possible."
)
else:
lines.append(
f" ✗ Exceeds FREE tier ({vc.alpha_vantage} calls > {AV_FREE_DAILY_LIMIT}/day limit). "
f"Premium required ($30/month → {AV_PREMIUM_PER_MINUTE}/min)."
)
else:
lines.append("")
lines.append(
" Alpha Vantage Assessment:"
)
lines.append(
" ✓ No Alpha Vantage calls — AV subscription NOT needed with current config."
)
return "\n".join(lines)
def format_vendor_breakdown(summary: dict) -> str:
"""Format a RunLogger summary dict into a per-vendor breakdown string.
This is called *after* a run completes, using the actual (not estimated)
vendor call counts from ``RunLogger.summary()``.
"""
vendors_used = summary.get("vendors_used", {})
if not vendors_used:
return ""
parts: list[str] = []
for vendor in ("yfinance", "alpha_vantage", "finnhub"):
counts = vendors_used.get(vendor)
if counts:
ok = counts.get("ok", 0)
fail = counts.get("fail", 0)
label = {
"yfinance": "yfinance",
"alpha_vantage": "AV",
"finnhub": "Finnhub",
}.get(vendor, vendor)
parts.append(f"{label}:{ok}ok/{fail}fail")
return " | ".join(parts) if parts else ""
def format_av_assessment(summary: dict) -> str:
"""Return a one-line Alpha Vantage assessment from actual run data."""
vendors_used = summary.get("vendors_used", {})
av = vendors_used.get("alpha_vantage")
if not av:
return "AV: not used (no subscription needed with current config)"
av_total = av.get("ok", 0) + av.get("fail", 0)
if av_total <= AV_FREE_DAILY_LIMIT:
daily_runs = AV_FREE_DAILY_LIMIT // max(av_total, 1)
return (
f"AV: {av_total} calls — fits free tier "
f"({AV_FREE_DAILY_LIMIT}/day, ~{daily_runs} runs/day)"
)
return (
f"AV: {av_total} calls — exceeds free tier! "
f"Premium needed ($30/mo → {AV_PREMIUM_PER_MINUTE}/min)"
)

View File

@ -153,6 +153,15 @@ class RunLogger:
else: else:
vendor_counts[v]["fail"] += 1 vendor_counts[v]["fail"] += 1
# Group vendor calls by vendor → method for detailed breakdown
vendor_methods: dict[str, dict[str, int]] = {}
for e in vendor_events:
v = e.data["vendor"]
m = e.data.get("method", "unknown")
if v not in vendor_methods:
vendor_methods[v] = {}
vendor_methods[v][m] = vendor_methods[v].get(m, 0) + 1
return { return {
"elapsed_s": round(time.time() - self._start, 1), "elapsed_s": round(time.time() - self._start, 1),
"llm_calls": len(llm_events), "llm_calls": len(llm_events),
@ -167,6 +176,7 @@ class RunLogger:
"vendor_success": vendor_ok, "vendor_success": vendor_ok,
"vendor_fail": vendor_fail, "vendor_fail": vendor_fail,
"vendors_used": vendor_counts, "vendors_used": vendor_counts,
"vendor_methods": vendor_methods,
} }
def write_log(self, path: Path) -> None: def write_log(self, path: Path) -> None: