diff --git a/orchestrator/profile_stage_chain.py b/orchestrator/profile_stage_chain.py index 1856c20d..41e2f5ac 100644 --- a/orchestrator/profile_stage_chain.py +++ b/orchestrator/profile_stage_chain.py @@ -8,6 +8,7 @@ from collections import defaultdict from datetime import datetime, timezone from pathlib import Path +from orchestrator.profile_trace_utils import TRACE_KIND, TRACE_SCHEMA_VERSION, build_trace_summary from tradingagents.graph.propagation import Propagator from tradingagents.graph.trading_graph import TradingAgentsGraph @@ -84,6 +85,48 @@ def _extract_research_state(event: dict) -> tuple[str | None, str | None, int | ) +<<<<<<< HEAD +======= + +def build_trace_payload( + *, + status: str, + run_id: str, + ticker: str, + date: str, + selected_analysts: list[str], + analysis_prompt_style: str, + variant_label: str, + node_timings: list[dict], + phase_totals: dict[str, float], + dump_path: Path, + raw_events: list[dict], + error: str | None = None, + exception_type: str | None = None, +) -> dict: + payload = { + "trace_schema_version": TRACE_SCHEMA_VERSION, + "trace_kind": TRACE_KIND, + "run_id": run_id, + "status": status, + "ticker": ticker, + "date": date, + "variant_label": variant_label, + "selected_analysts": selected_analysts, + "analysis_prompt_style": analysis_prompt_style, + "node_timings": node_timings, + "summary": build_trace_summary(node_timings, phase_totals), + "dump_path": str(dump_path), + "raw_events": raw_events, + } + if error is not None: + payload["error"] = error + if exception_type is not None: + payload["exception_type"] = exception_type + return payload + + +>>>>>>> 82e61cb (omx(team): auto-checkpoint worker-4 [unknown]) def main() -> None: args = build_parser().parse_args() selected_analysts = [item.strip() for item in args.selected_analysts.split(",") if item.strip()] diff --git a/orchestrator/profile_trace_utils.py b/orchestrator/profile_trace_utils.py new file mode 100644 index 00000000..b6772e05 --- /dev/null +++ b/orchestrator/profile_trace_utils.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +from collections import Counter + +TRACE_SCHEMA_VERSION = "tradingagents.profile_trace.v1alpha1" +TRACE_KIND = "tradingagents_stage_profile" + + +def build_trace_summary(node_timings: list[dict], phase_totals: dict[str, float]) -> dict: + phase_totals_seconds = {key: round(value, 3) for key, value in phase_totals.items()} + degraded_events = [entry for entry in node_timings if entry.get("research_status") not in (None, "full")] + node_counter = Counter(node for entry in node_timings for node in entry.get("nodes", [])) + total_elapsed_ms = sum(int(entry.get("elapsed_ms", 0)) for entry in node_timings) + return { + "event_count": len(node_timings), + "total_elapsed_ms": total_elapsed_ms, + "phase_totals_seconds": phase_totals_seconds, + "degraded_event_count": len(degraded_events), + "final_research_status": node_timings[-1].get("research_status") if node_timings else None, + "final_degraded_reason": node_timings[-1].get("degraded_reason") if node_timings else None, + "unique_nodes": sorted(node_counter.keys()), + "node_hit_count": dict(sorted(node_counter.items())), + }