From a90f14c086f4cc0baee132f67eede3932dde2c80 Mon Sep 17 00:00:00 2001 From: ahmet guzererler Date: Thu, 19 Mar 2026 09:06:40 +0100 Subject: [PATCH] feat: unified report paths, structured observability logging, and memory system update (#22) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * gitignore * feat: unify report paths under reports/daily/{date}/ hierarchy All generated artifacts now land under a single reports/ tree: - reports/daily/{date}/market/ for scan results (was results/macro_scan/) - reports/daily/{date}/{TICKER}/ for per-ticker analysis (was reports/{TICKER}_{timestamp}/) - reports/daily/{date}/{TICKER}/eval/ for eval logs (was eval_results/{TICKER}/...) Adds tradingagents/report_paths.py with centralized path helpers used by CLI commands, trading graph, and pipeline. Co-Authored-By: Claude Opus 4.6 * feat: structured observability logging for LLM, tool, and vendor calls Add RunLogger (tradingagents/observability.py) that emits JSON-lines events for every LLM call (model, agent, tokens in/out, latency), tool invocation (tool name, args, success, latency), data vendor call (method, vendor, success/failure, latency), and report save. Integration points: - route_to_vendor: log_vendor_call() on every try/catch - run_tool_loop: log_tool_call() on every tool invoke - ScannerGraph: new callbacks param, passes RunLogger.callback to all LLM tiers - pipeline/macro_bridge: picks up RunLogger from thread-local, passes to TradingAgentsGraph - cli/main.py: one RunLogger per command (analyze/scan/pipeline), write_log() at end, summary line printed to console Log files co-located with reports: reports/daily/{date}/{TICKER}/run_log.jsonl (analyze) reports/daily/{date}/market/run_log.jsonl (scan) reports/daily/{date}/run_log.jsonl (pipeline) Also fix test_long_response_no_nudge: update "A"*600 → "A"*2100 to match MIN_REPORT_LENGTH=2000 threshold set in an earlier commit. 
Update memory system context files (ARCHITECTURE, COMPONENTS, CONVENTIONS, GLOSSARY, CURRENT_STATE) to document observability and report path systems. Co-Authored-By: Claude Sonnet 4.6 --------- Co-authored-by: Claude Opus 4.6 --- .gitignore | 4 +- cli/main.py | 59 +++- docs/agent/CURRENT_STATE.md | 22 +- docs/agent/context/ARCHITECTURE.md | 37 ++- docs/agent/context/COMPONENTS.md | 4 +- docs/agent/context/CONVENTIONS.md | 12 +- docs/agent/context/GLOSSARY.md | 21 +- docs/agent/context/TECH_STACK.md | 2 +- tests/test_industry_deep_dive.py | 4 +- tests/test_scanner_comprehensive.py | 35 +-- tests/test_scanner_final.py | 2 +- tradingagents/agents/utils/tool_runner.py | 11 + tradingagents/dataflows/interface.py | 14 +- tradingagents/default_config.py | 2 +- tradingagents/graph/scanner_graph.py | 14 +- tradingagents/graph/trading_graph.py | 6 +- tradingagents/observability.py | 325 ++++++++++++++++++++++ tradingagents/pipeline/macro_bridge.py | 6 +- tradingagents/report_paths.py | 44 +++ 19 files changed, 568 insertions(+), 56 deletions(-) create mode 100644 tradingagents/observability.py create mode 100644 tradingagents/report_paths.py diff --git a/.gitignore b/.gitignore index 281b183f..b3d7582e 100644 --- a/.gitignore +++ b/.gitignore @@ -218,8 +218,10 @@ __marimo__/ # Cache **/data_cache/ -# Scan results and execution plans (generated artifacts) +# Generated reports and execution plans +reports/ results/ +eval_results/ plans/ # Backup files diff --git a/cli/main.py b/cli/main.py index 3ed61212..b485a836 100644 --- a/cli/main.py +++ b/cli/main.py @@ -28,12 +28,14 @@ from rich.align import Align from rich.rule import Rule from tradingagents.graph.trading_graph import TradingAgentsGraph +from tradingagents.report_paths import get_daily_dir, get_market_dir, get_ticker_dir from tradingagents.default_config import DEFAULT_CONFIG from cli.models import AnalystType from cli.utils import * from tradingagents.graph.scanner_graph import ScannerGraph from cli.announcements 
import fetch_announcements, display_announcements from cli.stats_handler import StatsCallbackHandler +from tradingagents.observability import RunLogger, set_run_logger console = Console() @@ -923,6 +925,8 @@ def run_analysis(): # Create stats callback handler for tracking LLM/tool calls stats_handler = StatsCallbackHandler() + run_logger = RunLogger() + set_run_logger(run_logger) # Normalize analyst selection to predefined order (selection is a 'set', order is fixed) selected_set = {analyst.value for analyst in selections["analysts"]} @@ -933,7 +937,7 @@ def run_analysis(): selected_analyst_keys, config=config, debug=True, - callbacks=[stats_handler], + callbacks=[stats_handler, run_logger.callback], ) # Initialize message buffer with selected analysts @@ -943,7 +947,7 @@ def run_analysis(): start_time = time.time() # Create result directory - results_dir = Path(config["results_dir"]) / selections["ticker"] / selections["analysis_date"] + results_dir = get_ticker_dir(selections["analysis_date"], selections["ticker"]) results_dir.mkdir(parents=True, exist_ok=True) report_dir = results_dir / "reports" report_dir.mkdir(parents=True, exist_ok=True) @@ -1156,8 +1160,7 @@ def run_analysis(): # Prompt to save report save_choice = typer.prompt("Save report?", default="Y").strip().upper() if save_choice in ("Y", "YES", ""): - timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - default_path = Path.cwd() / "reports" / f"{selections['ticker']}_{timestamp}" + default_path = get_ticker_dir(selections["analysis_date"], selections["ticker"]) save_path_str = typer.prompt( "Save path (press Enter for default)", default=str(default_path) @@ -1170,6 +1173,19 @@ def run_analysis(): except Exception as e: console.print(f"[red]Error saving report: {e}[/red]") + # Write observability log + log_dir = get_ticker_dir(selections["analysis_date"], selections["ticker"]) + log_dir.mkdir(parents=True, exist_ok=True) + run_logger.write_log(log_dir / "run_log.jsonl") + summary = 
run_logger.summary() + console.print( + f"[dim]LLM calls: {summary['llm_calls']} | " + f"Tokens: {summary['tokens_in']}→{summary['tokens_out']} | " + f"Tools: {summary['tool_calls']} | " + f"Vendor calls: {summary['vendor_success']}ok/{summary['vendor_fail']}fail[/dim]" + ) + set_run_logger(None) + # Prompt to display full report display_choice = typer.prompt("\nDisplay full report on screen?", default="Y").strip().upper() if display_choice in ("Y", "YES", ""): @@ -1186,7 +1202,7 @@ def run_scan(date: Optional[str] = None): scan_date = typer.prompt("Scan date (YYYY-MM-DD)", default=default_date) # Prepare save directory - save_dir = Path("results/macro_scan") / scan_date + save_dir = get_market_dir(scan_date) save_dir.mkdir(parents=True, exist_ok=True) console.print(f"[cyan]Running 3-phase macro scanner for {scan_date}...[/cyan]") @@ -1194,8 +1210,11 @@ def run_scan(date: Optional[str] = None): console.print("[dim]Phase 2: Industry Deep Dive[/dim]") console.print("[dim]Phase 3: Macro Synthesis → stocks to investigate[/dim]\n") + run_logger = RunLogger() + set_run_logger(run_logger) + try: - scanner = ScannerGraph(config=DEFAULT_CONFIG.copy()) + scanner = ScannerGraph(config=DEFAULT_CONFIG.copy(), callbacks=[run_logger.callback]) with Live(Spinner("dots", text="Scanning..."), console=console, transient=True): result = scanner.scan(scan_date) except Exception as e: @@ -1239,6 +1258,17 @@ def run_scan(date: Optional[str] = None): pass # Summary wasn't valid JSON — already printed as markdown + # Write observability log + run_logger.write_log(save_dir / "run_log.jsonl") + scan_summary = run_logger.summary() + console.print( + f"[dim]LLM calls: {scan_summary['llm_calls']} | " + f"Tokens: {scan_summary['tokens_in']}→{scan_summary['tokens_out']} | " + f"Tools: {scan_summary['tool_calls']} | " + f"Vendor calls: {scan_summary['vendor_success']}ok/{scan_summary['vendor_fail']}fail[/dim]" + ) + set_run_logger(None) + console.print(f"\n[green]Results saved to 
{save_dir}[/green]") @@ -1290,7 +1320,10 @@ def run_pipeline(): return config = DEFAULT_CONFIG.copy() - output_dir = Path("results/macro_pipeline") + output_dir = get_daily_dir(analysis_date) + + run_logger = RunLogger() + set_run_logger(run_logger) console.print(f"\n[cyan]Running TradingAgents for {len(candidates)} tickers...[/cyan]") try: @@ -1304,6 +1337,18 @@ def run_pipeline(): save_results(results, macro_context, output_dir) + # Write observability log + output_dir.mkdir(parents=True, exist_ok=True) + run_logger.write_log(output_dir / "run_log.jsonl") + pipe_summary = run_logger.summary() + console.print( + f"[dim]LLM calls: {pipe_summary['llm_calls']} | " + f"Tokens: {pipe_summary['tokens_in']}→{pipe_summary['tokens_out']} | " + f"Tools: {pipe_summary['tool_calls']} | " + f"Vendor calls: {pipe_summary['vendor_success']}ok/{pipe_summary['vendor_fail']}fail[/dim]" + ) + set_run_logger(None) + successes = [r for r in results if not r.error] failures = [r for r in results if r.error] console.print(f"\n[green]Done: {len(successes)} succeeded, {len(failures)} failed[/green]") diff --git a/docs/agent/CURRENT_STATE.md b/docs/agent/CURRENT_STATE.md index 14bd64cd..7ccdb135 100644 --- a/docs/agent/CURRENT_STATE.md +++ b/docs/agent/CURRENT_STATE.md @@ -1,18 +1,28 @@ # Current Milestone -Opt-in vendor fallback (ADR 011) merged (PR #18). Pipeline CLI command implemented. Memory system v2 being built. Next: PR #20 review (Copilot memory rebuild), structured context files under `docs/agent/context/`. +Report path unification complete. Observability logging (data sources, LLM calls, tool calls, token counts) is the active task. Next: `pipeline` CLI command. # Recent Progress -- **PR #18 merged**: Fail-fast vendor routing — `FALLBACK_ALLOWED` whitelist for 5 fungible-data methods only. ADR 011 written, ADR 002 superseded. 
+- **PR #21 merged**: Memory system v2 — builder/reader skills, 5 context files, post-commit hook +- **PR #18 merged**: Opt-in vendor fallback — fail-fast by default, `FALLBACK_ALLOWED` whitelist for fungible data only (ADR 011) +- **PR #19 merged**: Merge conflict resolution after PR #18 +- **Report path unification** (`80e174c`): All reports now written under `reports/daily/{date}/{ticker}/` for per-ticker analysis and `reports/daily/{date}/market/` for scanner output - `pipeline` CLI command implemented — scan JSON → filter by conviction → per-ticker deep dive via `MacroBridge` - `extract_json()` utility in `agents/utils/json_utils.py` handles DeepSeek R1 `<think>` blocks and markdown fences -- All 3 `@dataclass` types defined: `MacroContext`, `StockCandidate`, `TickerResult` in `pipeline/macro_bridge.py` -- 12 pre-existing test failures fixed across 5 files (PR merged to main) - Memory builder and reader skills created in `.claude/skills/` - Structured context files generated under `docs/agent/context/` (ARCHITECTURE, CONVENTIONS, COMPONENTS, TECH_STACK, GLOSSARY) +- 220+ offline tests passing +- 12 pre-existing test failures fixed across 5 files + +# In Progress + +- **Observability logging**: Structured logging for data source calls (vendor, endpoint, success/failure), LLM requests (model name, agent, token counts), and tool invocations (tool name, duration). Goal: understand what's being called, by whom, and at what cost per run. 
+ +# Planned Next + +- Report path unification tests (verify new paths in integration tests) # Active Blockers -- PR #20 (Copilot memory rebuild) is open/draft — needs review for aspirational deps and model name accuracy before merge -- Some scanner integration tests lack `@pytest.mark.integration` marker despite making live network calls +- None currently diff --git a/docs/agent/context/ARCHITECTURE.md b/docs/agent/context/ARCHITECTURE.md index 350ee1a7..33291f30 100644 --- a/docs/agent/context/ARCHITECTURE.md +++ b/docs/agent/context/ARCHITECTURE.md @@ -1,4 +1,4 @@ - + # Architecture @@ -80,6 +80,39 @@ Scanner JSON output → `MacroBridge.load()` → parse into `MacroContext` + `li Source: `tradingagents/pipeline/macro_bridge.py` +## Unified Report Paths + +All generated artifacts live under `reports/daily/{YYYY-MM-DD}/`: + +``` +reports/ +└── daily/{YYYY-MM-DD}/ + ├── market/ # scan results (geopolitical_report.md, etc.) + ├── {TICKER}/ # per-ticker analysis / pipeline + │ ├── 1_analysts/ + │ ├── complete_report.md + │ └── eval/full_states_log.json + └── summary.md # pipeline combined summary +``` + +Helper functions: `get_daily_dir()`, `get_market_dir()`, `get_ticker_dir()`, `get_eval_dir()`. + +Source: `tradingagents/report_paths.py` + +## Observability + +`RunLogger` accumulates structured events (JSON-lines) for a single run. Four event kinds: `llm` (model, agent, tokens in/out, latency), `tool` (tool name, args, success, latency), `vendor` (method, vendor, success, latency), `report` (path). Thread-safe via `_lock`. + +Integration points: +- **LLM calls**: `_LLMCallbackHandler` (LangChain `BaseCallbackHandler`) — attach as callback to LLM constructors or graph invocations. Extracts model name from `invocation_params` / `serialized`, token counts from `usage_metadata`. +- **Vendor calls**: `log_vendor_call()` — called from `route_to_vendor`. +- **Tool calls**: `log_tool_call()` — called from `run_tool_loop()`. 
+- **Thread-local context**: `set_run_logger()` / `get_run_logger()` for passing logger to vendor/tool layers without changing signatures. + +`RunLogger.summary()` returns aggregated stats (total tokens, model breakdown, vendor success/fail counts). `RunLogger.write_log(path)` writes all events + summary to a JSON-lines file. + +Source: `tradingagents/observability.py` + ## CLI Architecture 3 Typer commands: `analyze` (interactive per-ticker), `scan` (macro scanner), `pipeline` (scan → filter → deep dive). Rich-based live UI with `MessageBuffer` (deque-backed state manager tracking agent status, reports, tool calls, defined in `cli/main.py`) and `StatsCallbackHandler` (token/timing stats, defined in `cli/stats_handler.py`). 7-step interactive questionnaire in `analyze` for provider/model selection. @@ -102,4 +135,6 @@ Source: `cli/main.py`, `cli/stats_handler.py` | `tradingagents/pipeline/macro_bridge.py` | `MacroBridge`, data classes, pipeline orchestration | | `tradingagents/agents/utils/json_utils.py` | `extract_json()` — handles DeepSeek R1 markdown wrapping | | `cli/main.py` | CLI commands, `MessageBuffer`, Rich UI, interactive setup | +| `tradingagents/report_paths.py` | Unified report path helpers (`get_market_dir`, `get_ticker_dir`, etc.) 
| +| `tradingagents/observability.py` | `RunLogger`, `_LLMCallbackHandler`, structured event logging | | `tradingagents/dataflows/config.py` | `set_config()`, `get_config()`, `initialize_config()` | diff --git a/docs/agent/context/COMPONENTS.md b/docs/agent/context/COMPONENTS.md index 0865261b..4b7a2631 100644 --- a/docs/agent/context/COMPONENTS.md +++ b/docs/agent/context/COMPONENTS.md @@ -1,4 +1,4 @@ - + # Components @@ -8,6 +8,8 @@ tradingagents/ ├── __init__.py ├── default_config.py # All config keys, defaults, env var overrides +├── report_paths.py # Unified report path helpers (reports/daily/{date}/) +├── observability.py # RunLogger, _LLMCallbackHandler, structured event logging ├── agents/ │ ├── __init__.py │ ├── analysts/ diff --git a/docs/agent/context/CONVENTIONS.md b/docs/agent/context/CONVENTIONS.md index f3d5d180..507346f3 100644 --- a/docs/agent/context/CONVENTIONS.md +++ b/docs/agent/context/CONVENTIONS.md @@ -1,4 +1,4 @@ - + # Conventions @@ -64,7 +64,7 @@ - Typer for command definitions, Rich for live UI. (`cli/main.py`) - `MessageBuffer` — deque-based singleton tracking agent statuses, reports, tool calls. Fixed agents grouped by team (`FIXED_AGENTS`), analysts selectable. (`cli/main.py`) - `StatsCallbackHandler` — token and timing statistics for display. (`cli/stats_handler.py`) -- Scan results saved as `{key}.md` files to `results/macro_scan/{scan_date}/`. (`cli/main.py`) +- All reports go under `reports/daily/{date}/` — use helpers from `report_paths.py`: `get_market_dir(date)` for scan results, `get_ticker_dir(date, ticker)` for per-ticker analysis, `get_eval_dir(date, ticker)` for eval logs. Never hardcode report paths. (`report_paths.py`) ## Pipeline Patterns @@ -81,6 +81,14 @@ - Env isolation: always mock env vars before `importlib.reload()` — `load_dotenv()` leaks real `.env` values otherwise. - `callable()` returns False on LangChain `@tool` objects — use `hasattr(x, "invoke")` instead. 
+## Observability + +- Create one `RunLogger` per CLI command (analyze/scan/pipeline). Attach `logger.callback` to LLM constructors. (`observability.py`) +- Call `set_run_logger(logger)` at run start so vendor/tool layers can access it via `get_run_logger()`. (`observability.py`) +- Vendor calls: `log_vendor_call(method, vendor, success, duration_ms)` — called inside `route_to_vendor`. (`observability.py`, `interface.py`) +- Tool calls: `log_tool_call(tool_name, args_summary, success, duration_ms)` — called inside `run_tool_loop`. (`observability.py`, `tool_runner.py`) +- Write the run log at the end of each command: `logger.write_log(report_dir / "run_log.jsonl")`. (`observability.py`) + ## Error Handling - Fail-fast by default — no silent fallback unless method is in `FALLBACK_ALLOWED`. (ADR 011) diff --git a/docs/agent/context/GLOSSARY.md b/docs/agent/context/GLOSSARY.md index ee6fc122..7369d88d 100644 --- a/docs/agent/context/GLOSSARY.md +++ b/docs/agent/context/GLOSSARY.md @@ -1,4 +1,4 @@ - + # Glossary @@ -77,6 +77,25 @@ | FIXED_AGENTS | Dict grouping non-analyst agents by team: Research Team, Trading Team, Risk Management, Portfolio Management | `cli/main.py` | | ANALYST_MAPPING | Dict: `"market"` → `"Market Analyst"`, `"social"` → `"Social Analyst"`, etc. | `cli/main.py` | +## Observability + +| Term | Definition | Source | +|------|-----------|--------| +| RunLogger | Accumulates structured events (llm, tool, vendor, report) for a single CLI run. Thread-safe. 
| `observability.py` | +| _LLMCallbackHandler | LangChain `BaseCallbackHandler` that feeds LLM call events (model, tokens, latency) into a `RunLogger` | `observability.py` | +| _Event | @dataclass: `kind`, `ts`, `data` — one JSON-line per event | `observability.py` | +| set_run_logger / get_run_logger | Thread-local context for passing `RunLogger` to vendor/tool layers | `observability.py` | + +## Report Paths + +| Term | Definition | Source | +|------|-----------|--------| +| REPORTS_ROOT | `Path("reports")` — root for all generated artifacts | `report_paths.py` | +| get_daily_dir | Returns `reports/daily/{date}/` | `report_paths.py` | +| get_market_dir | Returns `reports/daily/{date}/market/` — scan results | `report_paths.py` | +| get_ticker_dir | Returns `reports/daily/{date}/{TICKER}/` — per-ticker analysis | `report_paths.py` | +| get_eval_dir | Returns `reports/daily/{date}/{TICKER}/eval/` — eval logs | `report_paths.py` | + ## Constants | Constant | Value | Source | diff --git a/docs/agent/context/TECH_STACK.md b/docs/agent/context/TECH_STACK.md index f99f072c..e1cf8798 100644 --- a/docs/agent/context/TECH_STACK.md +++ b/docs/agent/context/TECH_STACK.md @@ -1,4 +1,4 @@ - + # Tech Stack diff --git a/tests/test_industry_deep_dive.py b/tests/test_industry_deep_dive.py index 52c98678..f239fcec 100644 --- a/tests/test_industry_deep_dive.py +++ b/tests/test_industry_deep_dive.py @@ -127,7 +127,7 @@ class TestToolLoopNudge: def test_long_response_no_nudge(self): """A long first response (no tool calls) should be returned as-is.""" - long_text = "A" * 600 + long_text = "A" * 2100 # must exceed MIN_REPORT_LENGTH (2000) response = AIMessage(content=long_text, tool_calls=[]) chain = self._make_chain([response]) tool = self._make_tool() @@ -139,7 +139,7 @@ class TestToolLoopNudge: def test_short_response_triggers_nudge(self): """A short first response triggers a nudge, then the LLM is re-invoked.""" short_resp = AIMessage(content="Brief.", tool_calls=[]) - long_resp = 
AIMessage(content="A" * 600, tool_calls=[]) + long_resp = AIMessage(content="A" * 2100, tool_calls=[]) chain = self._make_chain([short_resp, long_resp]) tool = self._make_tool() diff --git a/tests/test_scanner_comprehensive.py b/tests/test_scanner_comprehensive.py index 40047e98..ca089dd0 100644 --- a/tests/test_scanner_comprehensive.py +++ b/tests/test_scanner_comprehensive.py @@ -79,47 +79,29 @@ class TestScannerEndToEnd: def test_scan_command_creates_output_files(self): """Test that the scan command creates all expected output files.""" with tempfile.TemporaryDirectory() as temp_dir: - # Set up the test directory structure - macro_scan_dir = Path(temp_dir) / "results" / "macro_scan" - test_date_dir = macro_scan_dir / "2026-03-15" + test_date_dir = Path(temp_dir) / "market" test_date_dir.mkdir(parents=True) - - # Mock the current working directory to use our temp directory - with patch('cli.main.Path') as mock_path_class: - # Mock Path.cwd() to return our temp directory - mock_path_class.cwd.return_value = Path(temp_dir) - - # Mock Path constructor for results/macro_scan/{date} - def mock_path_constructor(*args): - path_obj = Path(*args) - # If this is the results/macro_scan/{date} path, return our test directory - if len(args) >= 3 and args[0] == "results" and args[1] == "macro_scan" and args[2] == "2026-03-15": - return test_date_dir - return path_obj - - mock_path_class.side_effect = mock_path_constructor - + + # Mock get_market_dir to redirect output into the temp directory + with patch('cli.main.get_market_dir', return_value=test_date_dir): # Mock the write_text method to capture what gets written written_files = {} def mock_write_text(self, content, encoding=None): - # Store what was written to each file written_files[str(self)] = content - + with patch('pathlib.Path.write_text', mock_write_text): - # Mock typer.prompt to return our test date with patch('typer.prompt', return_value='2026-03-15'): try: run_scan() except SystemExit: - # typer might raise 
SystemExit, that's ok pass - + # Verify that run_scan() uses the correct output file naming convention. # # run_scan() writes via: (save_dir / f"{key}.md").write_text(content) - # where save_dir = Path("results/macro_scan") / scan_date (relative). + # where save_dir = get_market_dir(scan_date). # pathlib.Path.write_text is mocked, so written_files keys are the - # str() of those relative Path objects — NOT absolute paths. + # str() of those Path objects. # # LLM output is non-deterministic: a phase may produce an empty string, # causing run_scan()'s `if content:` guard to skip writing that file. @@ -133,6 +115,7 @@ class TestScannerEndToEnd: "sector_performance_report.md", "industry_deep_dive_report.md", "macro_scan_summary.md", + "run_log.jsonl", } assert len(written_files) >= 1, ( diff --git a/tests/test_scanner_final.py b/tests/test_scanner_final.py index 85a3d11b..5a6f9b53 100644 --- a/tests/test_scanner_final.py +++ b/tests/test_scanner_final.py @@ -100,7 +100,7 @@ def test_scanner_integration_with_cli_scan(): # 3. CLI scan command calls get_sector_performance.invoke() # 4. CLI scan command calls get_industry_performance.invoke() # 5. CLI scan command calls get_topic_news.invoke() - # 6. Results are written to files in results/macro_scan/{date}/ + # 6. Results are written to files in reports/daily/{date}/market/ # Since we've verified the individual tools work above, and we've seen # the CLI scan command work manually, we can be confident the integration works. diff --git a/tradingagents/agents/utils/tool_runner.py b/tradingagents/agents/utils/tool_runner.py index 226db133..ada1dbc4 100644 --- a/tradingagents/agents/utils/tool_runner.py +++ b/tradingagents/agents/utils/tool_runner.py @@ -7,6 +7,7 @@ phase — so they need an inline tool-execution loop. 
from __future__ import annotations +import time from typing import Any, List from langchain_core.messages import AIMessage, HumanMessage, ToolMessage @@ -79,17 +80,27 @@ def run_tool_loop( first_round = False # Execute each requested tool call and append ToolMessages + from tradingagents.observability import get_run_logger + + rl = get_run_logger() for tc in result.tool_calls: tool_name = tc["name"] tool_args = tc["args"] tool_fn = tool_map.get(tool_name) if tool_fn is None: tool_output = f"Error: unknown tool '{tool_name}'" + if rl: + rl.log_tool_call(tool_name, str(tool_args)[:120], False, 0, error="unknown tool") else: + t0 = time.time() try: tool_output = tool_fn.invoke(tool_args) + if rl: + rl.log_tool_call(tool_name, str(tool_args)[:120], True, (time.time() - t0) * 1000) except Exception as e: tool_output = f"Error calling {tool_name}: {e}" + if rl: + rl.log_tool_call(tool_name, str(tool_args)[:120], False, (time.time() - t0) * 1000, error=str(e)[:200]) current_messages.append( ToolMessage(content=str(tool_output), tool_call_id=tc["id"]) diff --git a/tradingagents/dataflows/interface.py b/tradingagents/dataflows/interface.py index 357109f9..d9cb7508 100644 --- a/tradingagents/dataflows/interface.py +++ b/tradingagents/dataflows/interface.py @@ -1,3 +1,4 @@ +import time from typing import Annotated # Import from vendor-specific modules @@ -239,6 +240,11 @@ def route_to_vendor(method: str, *args, **kwargs): # Fail-fast: only try configured primary vendor(s) vendors_to_try = primary_vendors + from tradingagents.observability import get_run_logger + + rl = get_run_logger() + args_summary = ", ".join(str(a)[:80] for a in args) if args else "" + last_error = None tried = [] for vendor in vendors_to_try: @@ -249,9 +255,15 @@ def route_to_vendor(method: str, *args, **kwargs): vendor_impl = VENDOR_METHODS[method][vendor] impl_func = vendor_impl[0] if isinstance(vendor_impl, list) else vendor_impl + t0 = time.time() try: - return impl_func(*args, **kwargs) + result = 
impl_func(*args, **kwargs) + if rl: + rl.log_vendor_call(method, vendor, True, (time.time() - t0) * 1000, args_summary=args_summary) + return result except (AlphaVantageError, FinnhubError, ConnectionError, TimeoutError) as exc: + if rl: + rl.log_vendor_call(method, vendor, False, (time.time() - t0) * 1000, error=str(exc)[:200], args_summary=args_summary) last_error = exc continue diff --git a/tradingagents/default_config.py b/tradingagents/default_config.py index d5cb1c3a..213afd80 100644 --- a/tradingagents/default_config.py +++ b/tradingagents/default_config.py @@ -38,7 +38,7 @@ def _env_int(key: str, default=None): DEFAULT_CONFIG = { "project_dir": os.path.abspath(os.path.join(os.path.dirname(__file__), ".")), - "results_dir": _env("RESULTS_DIR", "./results"), + "results_dir": _env("RESULTS_DIR", "./reports"), "data_cache_dir": os.path.join( os.path.abspath(os.path.join(os.path.dirname(__file__), ".")), "dataflows/data_cache", diff --git a/tradingagents/graph/scanner_graph.py b/tradingagents/graph/scanner_graph.py index 9bccd0ff..c35890c5 100644 --- a/tradingagents/graph/scanner_graph.py +++ b/tradingagents/graph/scanner_graph.py @@ -1,6 +1,6 @@ """Scanner graph — orchestrates the 3-phase macro scanner pipeline.""" -from typing import Any +from typing import Any, List, Optional from tradingagents.default_config import DEFAULT_CONFIG from tradingagents.llm_clients import create_llm_client @@ -22,15 +22,22 @@ class ScannerGraph: Phase 3: macro_synthesis -> END """ - def __init__(self, config: dict[str, Any] | None = None, debug: bool = False) -> None: + def __init__( + self, + config: dict[str, Any] | None = None, + debug: bool = False, + callbacks: Optional[List] = None, + ) -> None: """Initialize the scanner graph. Args: config: Configuration dictionary. Falls back to DEFAULT_CONFIG when None. debug: Whether to stream and print intermediate states. + callbacks: Optional LangChain callback handlers (e.g. RunLogger.callback). 
""" self.config = config or DEFAULT_CONFIG.copy() self.debug = debug + self.callbacks = callbacks or [] quick_llm = self._create_llm("quick_think") mid_llm = self._create_llm("mid_think") @@ -78,6 +85,9 @@ class ScannerGraph: provider = self.config.get(f"{tier}_llm_provider") or self.config["llm_provider"] backend_url = self.config.get(f"{tier}_backend_url") or self.config.get("backend_url") + if self.callbacks: + kwargs["callbacks"] = self.callbacks + client = create_llm_client( provider=provider, model=model, diff --git a/tradingagents/graph/trading_graph.py b/tradingagents/graph/trading_graph.py index e4f09df4..7119e6ad 100644 --- a/tradingagents/graph/trading_graph.py +++ b/tradingagents/graph/trading_graph.py @@ -320,11 +320,13 @@ class TradingAgentsGraph: } # Save to file - directory = Path(f"eval_results/{self.ticker}/TradingAgentsStrategy_logs/") + from tradingagents.report_paths import get_eval_dir + + directory = get_eval_dir(str(trade_date), self.ticker) directory.mkdir(parents=True, exist_ok=True) with open( - f"eval_results/{self.ticker}/TradingAgentsStrategy_logs/full_states_log_{trade_date}.json", + directory / f"full_states_log_{trade_date}.json", "w", encoding="utf-8", ) as f: diff --git a/tradingagents/observability.py b/tradingagents/observability.py new file mode 100644 index 00000000..92fd7a28 --- /dev/null +++ b/tradingagents/observability.py @@ -0,0 +1,325 @@ +"""Structured observability logging for TradingAgents. 
_py_logger = logging.getLogger("tradingagents.observability")

# ──────────────────────────────────────────────────────────────────────────────
# Event dataclass — each logged event becomes one JSON line
# ──────────────────────────────────────────────────────────────────────────────

@dataclass
class _Event:
    """A single structured log record."""

    kind: str  # "llm", "tool", "vendor", "report"
    ts: float  # time.time()
    data: dict  # kind-specific payload

    def to_dict(self) -> dict:
        """Return the flat mapping that is serialised as one JSON line."""
        return {"kind": self.kind, "ts": self.ts, **self.data}


# ──────────────────────────────────────────────────────────────────────────────
# RunLogger — accumulates events and writes a JSON-lines log file
# ──────────────────────────────────────────────────────────────────────────────

class RunLogger:
    """Accumulates structured events for a single run (analyze / scan / pipeline).

    Recording an event never raises because of an odd payload value: values
    that are not JSON-native are stringified when the event is serialised.

    Attributes:
        callback: A LangChain ``BaseCallbackHandler`` that can be passed to
            LLM constructors or graph invocations.
        events: List of all recorded events; mutated only under ``_lock``.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.events: list[_Event] = []
        self.callback = _LLMCallbackHandler(self)
        self._start = time.time()

    # -- public helpers to record events from non-callback code ----------------

    def log_vendor_call(
        self,
        method: str,
        vendor: str,
        success: bool,
        duration_ms: float,
        error: str | None = None,
        args_summary: str = "",
    ) -> None:
        """Record a data-vendor call (called from ``route_to_vendor``).

        Args:
            method: Name of the routed data method.
            vendor: Vendor that served (or failed to serve) the call.
            success: Whether the vendor call succeeded.
            duration_ms: Wall-clock latency; stored rounded to 0.1 ms.
            error: Error description for failed calls, else ``None``.
            args_summary: Short human-readable summary of the call arguments.
        """
        evt = _Event(
            kind="vendor",
            ts=time.time(),
            data={
                "method": method,
                "vendor": vendor,
                "success": success,
                "duration_ms": round(duration_ms, 1),
                "error": error,
                "args": args_summary,
            },
        )
        self._append(evt)

    def log_tool_call(
        self,
        tool_name: str,
        args_summary: str,
        success: bool,
        duration_ms: float,
        error: str | None = None,
    ) -> None:
        """Record a tool invocation (called from ``run_tool_loop``).

        Args:
            tool_name: Name of the invoked tool.
            args_summary: Short human-readable summary of the tool arguments.
            success: Whether the tool call succeeded.
            duration_ms: Wall-clock latency; stored rounded to 0.1 ms.
            error: Error description for failed calls, else ``None``.
        """
        evt = _Event(
            kind="tool",
            ts=time.time(),
            data={
                "tool": tool_name,
                "args": args_summary,
                "success": success,
                "duration_ms": round(duration_ms, 1),
                "error": error,
            },
        )
        self._append(evt)

    def log_report_save(self, path: str) -> None:
        """Record that a report file was written."""
        evt = _Event(kind="report", ts=time.time(), data={"path": path})
        self._append(evt)

    # -- summary ---------------------------------------------------------------

    def summary(self) -> dict:
        """Return an aggregated summary suitable for ``run_summary.json``."""
        with self._lock:
            events = list(self.events)
        return self._summarize(events)

    def write_log(self, path: Path) -> None:
        """Write all events as JSON lines + a summary block to *path*.

        Parent directories are created as needed.  The summary line is
        computed from the same snapshot as the event lines, so the two always
        agree even if other threads keep logging while the file is written.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        with self._lock:
            events = list(self.events)

        # default=str: a stray non-JSON value must not cost us the whole log.
        lines = [json.dumps(e.to_dict(), default=str) for e in events]
        lines.append(
            json.dumps({"kind": "summary", **self._summarize(events)}, default=str)
        )
        path.write_text("\n".join(lines) + "\n", encoding="utf-8")
        _py_logger.info("Run log written to %s", path)

    # -- internals -------------------------------------------------------------

    def _summarize(self, events: list[_Event]) -> dict:
        """Aggregate counts, token totals and per-model/vendor tallies for *events*."""
        llm_events = [e for e in events if e.kind == "llm"]
        tool_events = [e for e in events if e.kind == "tool"]
        vendor_events = [e for e in events if e.kind == "vendor"]

        total_tokens_in = sum(e.data.get("tokens_in", 0) for e in llm_events)
        total_tokens_out = sum(e.data.get("tokens_out", 0) for e in llm_events)

        # Group LLM calls by model
        model_counts: dict[str, int] = {}
        for e in llm_events:
            m = e.data.get("model", "unknown")
            model_counts[m] = model_counts.get(m, 0) + 1

        # Group vendor calls by vendor
        vendor_counts: dict[str, dict] = {}
        for e in vendor_events:
            bucket = vendor_counts.setdefault(e.data["vendor"], {"ok": 0, "fail": 0})
            bucket["ok" if e.data["success"] else "fail"] += 1

        tool_ok = sum(1 for e in tool_events if e.data["success"])
        vendor_ok = sum(1 for e in vendor_events if e.data["success"])

        return {
            "elapsed_s": round(time.time() - self._start, 1),
            "llm_calls": len(llm_events),
            "tokens_in": total_tokens_in,
            "tokens_out": total_tokens_out,
            "tokens_total": total_tokens_in + total_tokens_out,
            "models_used": model_counts,
            "tool_calls": len(tool_events),
            "tool_success": tool_ok,
            "tool_fail": len(tool_events) - tool_ok,
            "vendor_calls": len(vendor_events),
            "vendor_success": vendor_ok,
            "vendor_fail": len(vendor_events) - vendor_ok,
            "vendors_used": vendor_counts,
        }

    def _append(self, evt: _Event) -> None:
        """Store *evt* and mirror it to the module logger at DEBUG level."""
        with self._lock:
            self.events.append(evt)
        # Serialise only when someone is listening; this runs on every LLM,
        # tool and vendor call.
        if _py_logger.isEnabledFor(logging.DEBUG):
            _py_logger.debug("%s | %s", evt.kind, json.dumps(evt.data, default=str))
class _LLMCallbackHandler(BaseCallbackHandler):
    """LangChain callback that feeds LLM events into a ``RunLogger``.

    A start hook stores the model name, agent/node name and start time under
    the call's ``run_id``.  ``on_llm_end`` pops that entry and emits one
    ``"llm"`` event with token counts and latency.  ``on_llm_error`` pops the
    entry without emitting an event, so failed calls do not accumulate.
    """

    def __init__(self, run_logger: RunLogger) -> None:
        super().__init__()
        self._rl = run_logger
        self._lock = threading.Lock()
        # Track in-flight calls: run_id -> metadata
        self._inflight: dict[str, dict] = {}

    # -- chat model start (preferred path for ChatOpenAI / ChatAnthropic) ------

    def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[Any]],
        *,
        run_id: Any = None,
        **kwargs: Any,
    ) -> None:
        """Remember when and with which model a chat-model call started."""
        self._track_start(serialized, run_id, kwargs)

    # -- legacy LLM start (completion-style) -----------------------------------

    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        *,
        run_id: Any = None,
        **kwargs: Any,
    ) -> None:
        """Remember when and with which model a completion-style call started."""
        self._track_start(serialized, run_id, kwargs)

    # -- LLM end ---------------------------------------------------------------

    def on_llm_end(self, response: LLMResult, *, run_id: Any = None, **kwargs: Any) -> None:
        """Emit one ``"llm"`` event for the finished call.

        Token counts and the model name are read from the first generation's
        message when available; otherwise the values captured at start time
        (or ``"unknown"`` / ``0``) are used.
        """
        meta = self._pop_inflight(run_id)

        tokens_in = 0
        tokens_out = 0
        model_from_response = ""
        try:
            generation = response.generations[0][0]
            msg = getattr(generation, "message", None)
            if msg is not None:
                usage = getattr(msg, "usage_metadata", None)
                if isinstance(msg, AIMessage) and usage:
                    # "or 0": a None count would break sum() in the run summary.
                    tokens_in = usage.get("input_tokens", 0) or 0
                    tokens_out = usage.get("output_tokens", 0) or 0
                if hasattr(msg, "response_metadata"):
                    resp_meta = msg.response_metadata
                    model_from_response = (
                        resp_meta.get("model_name", "") or resp_meta.get("model", "")
                    )
        except (IndexError, TypeError, AttributeError):
            # Best effort: an unexpected response shape must never break the
            # LLM call itself; the event is still recorded with defaults.
            pass

        model = model_from_response or (meta["model"] if meta else "unknown")
        agent = meta["agent"] if meta else ""
        duration_ms = (time.time() - meta["t0"]) * 1000 if meta else 0

        evt = _Event(
            kind="llm",
            ts=time.time(),
            data={
                "model": model,
                "agent": agent,
                "tokens_in": tokens_in,
                "tokens_out": tokens_out,
                "duration_ms": round(duration_ms, 1),
            },
        )
        self._rl._append(evt)

    # -- LLM error -------------------------------------------------------------

    def on_llm_error(self, error: BaseException, *, run_id: Any = None, **kwargs: Any) -> None:
        """Forget the in-flight entry of a failed call.

        No event is emitted, so ``llm_calls`` in the summary keeps counting
        completed calls only.
        """
        self._pop_inflight(run_id)

    # -- internals -------------------------------------------------------------

    def _track_start(self, serialized: Dict[str, Any] | None, run_id: Any, kwargs: dict) -> None:
        """Store start metadata for *run_id* (shared by both start hooks)."""
        if not run_id:
            # Without a run id the end hook cannot find the entry again, so
            # storing it would only leak memory.
            return
        serialized = serialized or {}  # defensive: tolerate a missing payload
        model = _extract_model(serialized, kwargs)
        agent = kwargs.get("name") or serialized.get("name") or _extract_graph_node(kwargs)
        with self._lock:
            self._inflight[str(run_id)] = {
                "model": model,
                "agent": agent or "",
                "t0": time.time(),
            }

    def _pop_inflight(self, run_id: Any) -> dict | None:
        """Remove and return the start metadata for *run_id*, if any."""
        if not run_id:
            return None
        with self._lock:
            return self._inflight.pop(str(run_id), None)
# ──────────────────────────────────────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────────────────────────────────────

def _extract_model(serialized: dict, kwargs: dict) -> str:
    """Best-effort model name from LangChain callback metadata.

    Lookup order: ``kwargs["invocation_params"]`` (``model_name`` then
    ``model``), ``serialized["kwargs"]`` (same keys), and finally the last
    element of ``serialized["id"]``.  Returns ``""`` when nothing is found.
    Missing, ``None`` or empty containers at any level are tolerated.
    """
    # kwargs.invocation_params often has the model name
    inv = kwargs.get("invocation_params") or {}
    model = inv.get("model_name") or inv.get("model") or ""
    if model:
        return model

    # serialized might have it nested
    serialized = serialized or {}
    nested = serialized.get("kwargs") or {}
    model = nested.get("model_name") or nested.get("model")
    if model:
        return model

    # "or": an empty or None id must not raise on the [-1] lookup.
    id_path = serialized.get("id") or [""]
    return id_path[-1]


def _extract_graph_node(kwargs: dict) -> str:
    """Try to get the current graph node name from LangGraph metadata.

    The first ``"graph:node:<name>"`` tag wins; otherwise
    ``metadata["langgraph_node"]`` is used.  Returns ``""`` when neither is
    present.
    """
    prefix = "graph:node:"
    for tag in kwargs.get("tags") or []:
        if isinstance(tag, str) and tag.startswith(prefix):
            return tag[len(prefix):]
    metadata = kwargs.get("metadata") or {}
    return metadata.get("langgraph_node") or ""


# ──────────────────────────────────────────────────────────────────────────────
# Thread-local context for passing RunLogger to vendor/tool layers
# ──────────────────────────────────────────────────────────────────────────────

_current_run_logger: threading.local = threading.local()


def set_run_logger(rl: RunLogger | None) -> None:
    """Set the active RunLogger for the current thread.

    The value lives in a ``threading.local``, so it is visible only to the
    thread that set it.  Pass ``None`` to clear it.
    """
    _current_run_logger.instance = rl


def get_run_logger() -> RunLogger | None:
    """Get the active RunLogger (or None if not set in this thread)."""
    return getattr(_current_run_logger, "instance", None)
# Root of the generated-artifact tree, relative to the working directory.
REPORTS_ROOT = Path("reports")


def get_daily_dir(date: str) -> Path:
    """Return the folder holding everything generated for *date*.

    Layout: ``reports/daily/{date}/``
    """
    return REPORTS_ROOT.joinpath("daily", date)


def get_market_dir(date: str) -> Path:
    """Return the scan-results folder for *date*.

    Layout: ``reports/daily/{date}/market/``
    """
    day_dir = get_daily_dir(date)
    return day_dir.joinpath("market")


def get_ticker_dir(date: str, ticker: str) -> Path:
    """Return the per-ticker analysis folder for *date*.

    The ticker is upper-cased, so ``"aapl"`` and ``"AAPL"`` share a folder.

    Layout: ``reports/daily/{date}/{TICKER}/``
    """
    symbol = ticker.upper()
    return get_daily_dir(date).joinpath(symbol)


def get_eval_dir(date: str, ticker: str) -> Path:
    """Return the eval-log folder inside the ticker folder.

    Layout: ``reports/daily/{date}/{TICKER}/eval/``
    """
    ticker_dir = get_ticker_dir(date, ticker)
    return ticker_dir.joinpath("eval")