From 0111876264152316d335e033f4511ddc29a24172 Mon Sep 17 00:00:00 2001 From: JackChen <26346076+JackChen-me@users.noreply.github.com> Date: Fri, 3 Apr 2026 15:28:59 +0800 Subject: [PATCH] feat: add onTrace observability callback (#18) Add lightweight onTrace callback to OrchestratorConfig that emits structured span events (llm_call, tool_call, task, agent) with timing, token usage, and runId correlation. Zero overhead when not subscribed. Closes #18 --- README.md | 2 + README_zh.md | 2 + examples/11-trace-observability.ts | 133 +++++++++ src/agent/agent.ts | 60 +++- src/agent/pool.ts | 11 +- src/agent/runner.ts | 43 ++- src/index.ts | 11 + src/orchestrator/orchestrator.ts | 56 +++- src/types.ts | 62 +++- src/utils/trace.ts | 34 +++ tests/trace.test.ts | 453 +++++++++++++++++++++++++++++ 11 files changed, 848 insertions(+), 19 deletions(-) create mode 100644 examples/11-trace-observability.ts create mode 100644 src/utils/trace.ts create mode 100644 tests/trace.test.ts diff --git a/README.md b/README.md index df5a920..7c36d83 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ Build AI agent teams that decompose goals into tasks automatically. Define agent - **Model Agnostic** — Claude, GPT, Gemma 4, and local models (Ollama, vLLM, LM Studio) in the same team. Swap models per agent via `baseURL`. - **Structured Output** — Add `outputSchema` (Zod) to any agent. Output is parsed as JSON, validated, and auto-retried once on failure. Access typed results via `result.structured`. - **Task Retry** — Set `maxRetries` on tasks for automatic retry with exponential backoff. Failed attempts accumulate token usage for accurate billing. +- **Observability** — Optional `onTrace` callback emits structured spans for every LLM call, tool execution, task, and agent run — with timing, token usage, and a shared `runId` for correlation. Zero overhead when not subscribed, zero extra dependencies. - **In-Process Execution** — No subprocess overhead. 
Everything runs in one Node.js process. Deploy to serverless, Docker, CI/CD. ## Quick Start @@ -120,6 +121,7 @@ npx tsx examples/01-single-agent.ts | [08 — Gemma 4 Local](examples/08-gemma4-local.ts) | `runTasks()` + `runTeam()` with local Gemma 4 via Ollama — zero API cost | | [09 — Structured Output](examples/09-structured-output.ts) | `outputSchema` (Zod) on AgentConfig — validated JSON via `result.structured` | | [10 — Task Retry](examples/10-task-retry.ts) | `maxRetries` / `retryDelayMs` / `retryBackoff` with `task_retry` progress events | +| [11 — Trace Observability](examples/11-trace-observability.ts) | `onTrace` callback — structured spans for LLM calls, tools, tasks, and agents | ## Architecture diff --git a/README_zh.md b/README_zh.md index c9f7ac9..458d6de 100644 --- a/README_zh.md +++ b/README_zh.md @@ -18,6 +18,7 @@ - **模型无关** — Claude、GPT、Gemma 4 和本地模型(Ollama、vLLM、LM Studio)可以在同一个团队中使用。通过 `baseURL` 即可接入任何 OpenAI 兼容服务。 - **结构化输出** — 为任意智能体添加 `outputSchema`(Zod),输出自动解析为 JSON 并校验,校验失败自动重试一次。通过 `result.structured` 获取类型化结果。 - **任务重试** — 为任务设置 `maxRetries`,失败时自动指数退避重试。所有尝试的 token 用量累计,确保计费准确。 +- **可观测性** — 可选的 `onTrace` 回调为每次 LLM 调用、工具执行、任务和智能体运行发出结构化 span 事件——包含耗时、token 用量和共享的 `runId` 用于关联追踪。未订阅时零开销,零额外依赖。 - **进程内执行** — 没有子进程开销。所有内容在一个 Node.js 进程中运行。可部署到 Serverless、Docker、CI/CD。 ## 快速开始 @@ -120,6 +121,7 @@ npx tsx examples/01-single-agent.ts | [08 — Gemma 4 本地](examples/08-gemma4-local.ts) | `runTasks()` + `runTeam()` 本地 Gemma 4 via Ollama — 零 API 费用 | | [09 — 结构化输出](examples/09-structured-output.ts) | `outputSchema`(Zod)— 校验 JSON 输出,通过 `result.structured` 获取 | | [10 — 任务重试](examples/10-task-retry.ts) | `maxRetries` / `retryDelayMs` / `retryBackoff` + `task_retry` 进度事件 | +| [11 — 可观测性](examples/11-trace-observability.ts) | `onTrace` 回调 — LLM 调用、工具、任务、智能体的结构化 span 事件 | ## 架构 diff --git a/examples/11-trace-observability.ts b/examples/11-trace-observability.ts new file mode 100644 index 0000000..20b463e --- /dev/null +++ b/examples/11-trace-observability.ts 
@@ -0,0 +1,133 @@ +/** + * Example 11 — Trace Observability + * + * Demonstrates the `onTrace` callback for lightweight observability. Every LLM + * call, tool execution, task lifecycle, and agent run emits a structured trace + * event with timing data and token usage — giving you full visibility into + * what's happening inside a multi-agent run. + * + * Trace events share a `runId` for correlation, so you can reconstruct the + * full execution timeline. Pipe them into your own logging, OpenTelemetry, or + * dashboard. + * + * Run: + * npx tsx examples/11-trace-observability.ts + * + * Prerequisites: + * ANTHROPIC_API_KEY env var must be set. + */ + +import { OpenMultiAgent } from '../src/index.js' +import type { AgentConfig, TraceEvent } from '../src/types.js' + +// --------------------------------------------------------------------------- +// Agents +// --------------------------------------------------------------------------- + +const researcher: AgentConfig = { + name: 'researcher', + model: 'claude-sonnet-4-6', + systemPrompt: 'You are a research assistant. Provide concise, factual answers.', + maxTurns: 2, +} + +const writer: AgentConfig = { + name: 'writer', + model: 'claude-sonnet-4-6', + systemPrompt: 'You are a technical writer. 
Summarize research into clear prose.', + maxTurns: 2, +} + +// --------------------------------------------------------------------------- +// Trace handler — log every span with timing +// --------------------------------------------------------------------------- + +function handleTrace(event: TraceEvent): void { + const dur = `${event.durationMs}ms`.padStart(7) + + switch (event.type) { + case 'llm_call': + console.log( + ` [LLM] ${dur} agent=${event.agent} model=${event.model} turn=${event.turn}` + + ` tokens=${event.tokens.input_tokens}in/${event.tokens.output_tokens}out`, + ) + break + case 'tool_call': + console.log( + ` [TOOL] ${dur} agent=${event.agent} tool=${event.tool}` + + ` error=${event.isError}`, + ) + break + case 'task': + console.log( + ` [TASK] ${dur} task="${event.taskTitle}" agent=${event.agent}` + + ` success=${event.success} retries=${event.retries}`, + ) + break + case 'agent': + console.log( + ` [AGENT] ${dur} agent=${event.agent} turns=${event.turns}` + + ` tools=${event.toolCalls} tokens=${event.tokens.input_tokens}in/${event.tokens.output_tokens}out`, + ) + break + } +} + +// --------------------------------------------------------------------------- +// Orchestrator + team +// --------------------------------------------------------------------------- + +const orchestrator = new OpenMultiAgent({ + defaultModel: 'claude-sonnet-4-6', + onTrace: handleTrace, +}) + +const team = orchestrator.createTeam('trace-demo', { + name: 'trace-demo', + agents: [researcher, writer], + sharedMemory: true, +}) + +// --------------------------------------------------------------------------- +// Tasks — researcher first, then writer summarizes +// --------------------------------------------------------------------------- + +const tasks = [ + { + title: 'Research topic', + description: 'List 5 key benefits of TypeScript for large codebases. 
Be concise.', + assignee: 'researcher', + }, + { + title: 'Write summary', + description: 'Read the research from shared memory and write a 3-sentence summary.', + assignee: 'writer', + dependsOn: ['Research topic'], + }, +] + +// --------------------------------------------------------------------------- +// Run +// --------------------------------------------------------------------------- + +console.log('Trace Observability Example') +console.log('='.repeat(60)) +console.log('Pipeline: research → write (with full trace output)') +console.log('='.repeat(60)) +console.log() + +const result = await orchestrator.runTasks(team, tasks) + +// --------------------------------------------------------------------------- +// Summary +// --------------------------------------------------------------------------- + +console.log('\n' + '='.repeat(60)) +console.log(`Overall success: ${result.success}`) +console.log(`Tokens — input: ${result.totalTokenUsage.input_tokens}, output: ${result.totalTokenUsage.output_tokens}`) + +for (const [name, r] of result.agentResults) { + const icon = r.success ? 'OK ' : 'FAIL' + console.log(` [${icon}] ${name}`) + console.log(` ${r.output.slice(0, 200)}`) +} diff --git a/src/agent/agent.ts b/src/agent/agent.ts index df6b7df..58a1df3 100644 --- a/src/agent/agent.ts +++ b/src/agent/agent.ts @@ -32,6 +32,7 @@ import type { TokenUsage, ToolUseContext, } from '../types.js' +import { emitTrace, generateRunId } from '../utils/trace.js' import type { ToolDefinition as FrameworkToolDefinition, ToolRegistry } from '../tool/framework.js' import type { ToolExecutor } from '../tool/executor.js' import { createAdapter } from '../llm/adapter.js' @@ -158,12 +159,12 @@ export class Agent { * * Use this for one-shot queries where past context is irrelevant. 
*/ - async run(prompt: string): Promise { + async run(prompt: string, runOptions?: Partial): Promise { const messages: LLMMessage[] = [ { role: 'user', content: [{ type: 'text', text: prompt }] }, ] - return this.executeRun(messages) + return this.executeRun(messages, runOptions) } /** @@ -174,6 +175,7 @@ export class Agent { * * Use this for multi-turn interactions. */ + // TODO(#18): accept optional RunOptions to forward trace context async prompt(message: string): Promise { const userMessage: LLMMessage = { role: 'user', @@ -197,6 +199,7 @@ export class Agent { * * Like {@link run}, this does not use or update the persistent history. */ + // TODO(#18): accept optional RunOptions to forward trace context async *stream(prompt: string): AsyncGenerator { const messages: LLMMessage[] = [ { role: 'user', content: [{ type: 'text', text: prompt }] }, @@ -266,15 +269,26 @@ export class Agent { * Shared execution path used by both `run` and `prompt`. * Handles state transitions and error wrapping. */ - private async executeRun(messages: LLMMessage[]): Promise { + private async executeRun( + messages: LLMMessage[], + callerOptions?: Partial, + ): Promise { this.transitionTo('running') + const agentStartMs = Date.now() + try { const runner = await this.getRunner() + const internalOnMessage = (msg: LLMMessage) => { + this.state.messages.push(msg) + callerOptions?.onMessage?.(msg) + } + // Auto-generate runId when onTrace is provided but runId is missing + const needsRunId = callerOptions?.onTrace && !callerOptions.runId const runOptions: RunOptions = { - onMessage: msg => { - this.state.messages.push(msg) - }, + ...callerOptions, + onMessage: internalOnMessage, + ...(needsRunId ? 
{ runId: generateRunId() } : undefined), } const result = await runner.run(messages, runOptions) @@ -282,21 +296,25 @@ export class Agent { // --- Structured output validation --- if (this.config.outputSchema) { - return this.validateStructuredOutput( + const validated = await this.validateStructuredOutput( messages, result, runner, runOptions, ) + this.emitAgentTrace(callerOptions, agentStartMs, validated) + return validated } this.transitionTo('completed') - return this.toAgentRunResult(result, true) + const agentResult = this.toAgentRunResult(result, true) + this.emitAgentTrace(callerOptions, agentStartMs, agentResult) + return agentResult } catch (err) { const error = err instanceof Error ? err : new Error(String(err)) this.transitionToError(error) - return { + const errorResult: AgentRunResult = { success: false, output: error.message, messages: [], @@ -304,9 +322,33 @@ export class Agent { toolCalls: [], structured: undefined, } + this.emitAgentTrace(callerOptions, agentStartMs, errorResult) + return errorResult } } + /** Emit an `agent` trace event if `onTrace` is provided. */ + private emitAgentTrace( + options: Partial | undefined, + startMs: number, + result: AgentRunResult, + ): void { + if (!options?.onTrace) return + const endMs = Date.now() + emitTrace(options.onTrace, { + type: 'agent', + runId: options.runId ?? '', + taskId: options.taskId, + agent: options.traceAgent ?? this.name, + turns: result.messages.filter(m => m.role === 'assistant').length, + tokens: result.tokenUsage, + toolCalls: result.toolCalls.length, + startMs, + endMs, + durationMs: endMs - startMs, + }) + } + /** * Validate agent output against the configured `outputSchema`. * On first validation failure, retry once with error feedback. 
diff --git a/src/agent/pool.ts b/src/agent/pool.ts index 915f361..aba0eb8 100644 --- a/src/agent/pool.ts +++ b/src/agent/pool.ts @@ -21,6 +21,7 @@ */ import type { AgentRunResult } from '../types.js' +import type { RunOptions } from './runner.js' import type { Agent } from './agent.js' import { Semaphore } from '../utils/semaphore.js' @@ -123,12 +124,16 @@ export class AgentPool { * * @throws {Error} If the agent name is not found. */ - async run(agentName: string, prompt: string): Promise { + async run( + agentName: string, + prompt: string, + runOptions?: Partial, + ): Promise { const agent = this.requireAgent(agentName) await this.semaphore.acquire() try { - return await agent.run(prompt) + return await agent.run(prompt, runOptions) } finally { this.semaphore.release() } @@ -144,6 +149,7 @@ export class AgentPool { * * @param tasks - Array of `{ agent, prompt }` descriptors. */ + // TODO(#18): accept RunOptions per task to forward trace context async runParallel( tasks: ReadonlyArray<{ readonly agent: string; readonly prompt: string }>, ): Promise> { @@ -182,6 +188,7 @@ export class AgentPool { * * @throws {Error} If the pool is empty. */ + // TODO(#18): accept RunOptions to forward trace context async runAny(prompt: string): Promise { const allAgents = this.list() if (allAgents.length === 0) { diff --git a/src/agent/runner.ts b/src/agent/runner.ts index 13667db..113f93c 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -25,7 +25,9 @@ import type { ToolUseContext, LLMAdapter, LLMChatOptions, + TraceEvent, } from '../types.js' +import { emitTrace } from '../utils/trace.js' import type { ToolRegistry } from '../tool/framework.js' import type { ToolExecutor } from '../tool/executor.js' @@ -76,6 +78,14 @@ export interface RunOptions { readonly onToolResult?: (name: string, result: ToolResult) => void /** Fired after each complete {@link LLMMessage} is appended. 
*/ readonly onMessage?: (message: LLMMessage) => void + /** Trace callback for observability spans. Async callbacks are safe. */ + readonly onTrace?: (event: TraceEvent) => void | Promise + /** Run ID for trace correlation. */ + readonly runId?: string + /** Task ID for trace correlation. */ + readonly taskId?: string + /** Agent name for trace correlation (overrides RunnerOptions.agentName). */ + readonly traceAgent?: string } /** The aggregated result returned when a full run completes. */ @@ -254,7 +264,23 @@ export class AgentRunner { // ------------------------------------------------------------------ // Step 1: Call the LLM and collect the full response for this turn. // ------------------------------------------------------------------ + const llmStartMs = Date.now() const response = await this.adapter.chat(conversationMessages, baseChatOptions) + if (options.onTrace) { + const llmEndMs = Date.now() + emitTrace(options.onTrace, { + type: 'llm_call', + runId: options.runId ?? '', + taskId: options.taskId, + agent: options.traceAgent ?? this.options.agentName ?? 'unknown', + model: this.options.model, + turn: turns, + tokens: response.usage, + startMs: llmStartMs, + endMs: llmEndMs, + durationMs: llmEndMs - llmStartMs, + }) + } totalUsage = addTokenUsage(totalUsage, response.usage) @@ -319,10 +345,25 @@ export class AgentRunner { result = { data: message, isError: true } } - const duration = Date.now() - startTime + const endTime = Date.now() + const duration = endTime - startTime options.onToolResult?.(block.name, result) + if (options.onTrace) { + emitTrace(options.onTrace, { + type: 'tool_call', + runId: options.runId ?? '', + taskId: options.taskId, + agent: options.traceAgent ?? this.options.agentName ?? 'unknown', + tool: block.name, + isError: result.isError ?? 
false, + startMs: startTime, + endMs: endTime, + durationMs: duration, + }) + } + const record: ToolCallRecord = { toolName: block.name, input: block.input, diff --git a/src/index.ts b/src/index.ts index f624707..312f852 100644 --- a/src/index.ts +++ b/src/index.ts @@ -161,7 +161,18 @@ export type { OrchestratorConfig, OrchestratorEvent, + // Trace + TraceEventType, + TraceEventBase, + TraceEvent, + LLMCallTrace, + ToolCallTrace, + TaskTrace, + AgentTrace, + // Memory MemoryEntry, MemoryStore, } from './types.js' + +export { generateRunId } from './utils/trace.js' diff --git a/src/orchestrator/orchestrator.ts b/src/orchestrator/orchestrator.ts index 848bfde..86f16c0 100644 --- a/src/orchestrator/orchestrator.ts +++ b/src/orchestrator/orchestrator.ts @@ -52,8 +52,10 @@ import type { TeamRunResult, TokenUsage, } from '../types.js' +import type { RunOptions } from '../agent/runner.js' import { Agent } from '../agent/agent.js' import { AgentPool } from '../agent/pool.js' +import { emitTrace, generateRunId } from '../utils/trace.js' import { ToolRegistry } from '../tool/framework.js' import { ToolExecutor } from '../tool/executor.js' import { registerBuiltInTools } from '../tool/built-in/index.js' @@ -260,6 +262,8 @@ interface RunContext { readonly scheduler: Scheduler readonly agentResults: Map readonly config: OrchestratorConfig + /** Trace run ID, present when `onTrace` is configured. */ + readonly runId?: string } /** @@ -338,10 +342,19 @@ async function executeQueue( // Build the prompt: inject shared memory context + task description const prompt = await buildTaskPrompt(task, team) + // Build trace context for this task's agent run + const traceOptions: Partial | undefined = config.onTrace + ? { onTrace: config.onTrace, runId: ctx.runId ?? '', taskId: task.id, traceAgent: assignee } + : undefined + + const taskStartMs = config.onTrace ? 
Date.now() : 0 + let retryCount = 0 + const result = await executeWithRetry( - () => pool.run(assignee, prompt), + () => pool.run(assignee, prompt, traceOptions), task, (retryData) => { + retryCount++ config.onProgress?.({ type: 'task_retry', task: task.id, @@ -351,6 +364,23 @@ async function executeQueue( }, ) + // Emit task trace + if (config.onTrace) { + const taskEndMs = Date.now() + emitTrace(config.onTrace, { + type: 'task', + runId: ctx.runId ?? '', + taskId: task.id, + taskTitle: task.title, + agent: assignee, + success: result.success, + retries: retryCount, + startMs: taskStartMs, + endMs: taskEndMs, + durationMs: taskEndMs - taskStartMs, + }) + } + ctx.agentResults.set(`${assignee}:${task.id}`, result) if (result.success) { @@ -441,8 +471,8 @@ async function buildTaskPrompt(task: Task, team: Team): Promise { */ export class OpenMultiAgent { private readonly config: Required< - Omit - > & Pick + Omit + > & Pick private readonly teams: Map = new Map() private completedTaskCount = 0 @@ -463,6 +493,7 @@ export class OpenMultiAgent { defaultBaseURL: config.defaultBaseURL, defaultApiKey: config.defaultApiKey, onProgress: config.onProgress, + onTrace: config.onTrace, } } @@ -520,7 +551,11 @@ export class OpenMultiAgent { data: { prompt }, }) - const result = await agent.run(prompt) + const traceOptions: Partial | undefined = this.config.onTrace + ? { onTrace: this.config.onTrace, runId: generateRunId(), traceAgent: config.name } + : undefined + + const result = await agent.run(prompt, traceOptions) this.config.onProgress?.({ type: 'agent_complete', @@ -578,6 +613,7 @@ export class OpenMultiAgent { const decompositionPrompt = this.buildDecompositionPrompt(goal, agentConfigs) const coordinatorAgent = buildAgent(coordinatorConfig) + const runId = this.config.onTrace ? 
generateRunId() : undefined this.config.onProgress?.({ type: 'agent_start', @@ -585,7 +621,10 @@ export class OpenMultiAgent { data: { phase: 'decomposition', goal }, }) - const decompositionResult = await coordinatorAgent.run(decompositionPrompt) + const decompTraceOptions: Partial | undefined = this.config.onTrace + ? { onTrace: this.config.onTrace, runId: runId ?? '', traceAgent: 'coordinator' } + : undefined + const decompositionResult = await coordinatorAgent.run(decompositionPrompt, decompTraceOptions) const agentResults = new Map() agentResults.set('coordinator:decompose', decompositionResult) @@ -629,6 +668,7 @@ export class OpenMultiAgent { scheduler, agentResults, config: this.config, + runId, } await executeQueue(queue, ctx) @@ -637,7 +677,10 @@ export class OpenMultiAgent { // Step 5: Coordinator synthesises final result // ------------------------------------------------------------------ const synthesisPrompt = await this.buildSynthesisPrompt(goal, queue.list(), team) - const synthesisResult = await coordinatorAgent.run(synthesisPrompt) + const synthTraceOptions: Partial | undefined = this.config.onTrace + ? { onTrace: this.config.onTrace, runId: runId ?? '', traceAgent: 'coordinator' } + : undefined + const synthesisResult = await coordinatorAgent.run(synthesisPrompt, synthTraceOptions) agentResults.set('coordinator', synthesisResult) this.config.onProgress?.({ @@ -707,6 +750,7 @@ export class OpenMultiAgent { scheduler, agentResults, config: this.config, + runId: this.config.onTrace ? 
generateRunId() : undefined, } await executeQueue(queue, ctx) diff --git a/src/types.ts b/src/types.ts index bd2ce64..418d54e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -315,9 +315,69 @@ export interface OrchestratorConfig { readonly defaultProvider?: 'anthropic' | 'copilot' | 'openai' readonly defaultBaseURL?: string readonly defaultApiKey?: string - onProgress?: (event: OrchestratorEvent) => void + readonly onProgress?: (event: OrchestratorEvent) => void + readonly onTrace?: (event: TraceEvent) => void | Promise } +// --------------------------------------------------------------------------- +// Trace events — lightweight observability spans +// --------------------------------------------------------------------------- + +/** Trace event type discriminants. */ +export type TraceEventType = 'llm_call' | 'tool_call' | 'task' | 'agent' + +/** Shared fields present on every trace event. */ +export interface TraceEventBase { + /** Unique identifier for the entire run (runTeam / runTasks / runAgent call). */ + readonly runId: string + readonly type: TraceEventType + /** Unix epoch ms when the span started. */ + readonly startMs: number + /** Unix epoch ms when the span ended. */ + readonly endMs: number + /** Wall-clock duration in milliseconds (`endMs - startMs`). */ + readonly durationMs: number + /** Agent name associated with this span. */ + readonly agent: string + /** Task ID associated with this span. */ + readonly taskId?: string +} + +/** Emitted for each LLM API call (one per agent turn). */ +export interface LLMCallTrace extends TraceEventBase { + readonly type: 'llm_call' + readonly model: string + readonly turn: number + readonly tokens: TokenUsage +} + +/** Emitted for each tool execution. */ +export interface ToolCallTrace extends TraceEventBase { + readonly type: 'tool_call' + readonly tool: string + readonly isError: boolean +} + +/** Emitted when a task completes (wraps the full retry sequence). 
*/ +export interface TaskTrace extends TraceEventBase { + readonly type: 'task' + readonly taskId: string + readonly taskTitle: string + readonly success: boolean + readonly retries: number +} + +/** Emitted when an agent run completes (wraps the full conversation loop). */ +export interface AgentTrace extends TraceEventBase { + readonly type: 'agent' + readonly turns: number + readonly tokens: TokenUsage + readonly toolCalls: number +} + +/** Discriminated union of all trace event types. */ +export type TraceEvent = LLMCallTrace | ToolCallTrace | TaskTrace | AgentTrace + // --------------------------------------------------------------------------- // Memory // --------------------------------------------------------------------------- diff --git a/src/utils/trace.ts b/src/utils/trace.ts new file mode 100644 index 0000000..4f01f5f --- /dev/null +++ b/src/utils/trace.ts @@ -0,0 +1,34 @@ +/** + * @fileoverview Trace emission utilities for the observability layer. + */ + +import { randomUUID } from 'node:crypto' +import type { TraceEvent } from '../types.js' + +/** + * Safely emit a trace event. Swallows callback errors so a broken + * subscriber never crashes agent execution. + */ +export function emitTrace( + fn: ((event: TraceEvent) => void | Promise) | undefined, + event: TraceEvent, +): void { + if (!fn) return + try { + // Guard async callbacks: if fn returns a Promise, swallow its rejection + // so an async onTrace never produces an unhandled promise rejection. + const result = fn(event) as unknown + if (result && typeof (result as Promise).catch === 'function') { + ;(result as Promise).catch(noop) + } + } catch { + // Intentionally swallowed — observability must never break execution. + } +} + +function noop() {} + +/** Generate a unique run ID for trace correlation. 
*/ +export function generateRunId(): string { + return randomUUID() +} diff --git a/tests/trace.test.ts b/tests/trace.test.ts new file mode 100644 index 0000000..fbeb78c --- /dev/null +++ b/tests/trace.test.ts @@ -0,0 +1,453 @@ +import { describe, it, expect, vi } from 'vitest' +import { z } from 'zod' +import { Agent } from '../src/agent/agent.js' +import { AgentRunner, type RunOptions } from '../src/agent/runner.js' +import { ToolRegistry, defineTool } from '../src/tool/framework.js' +import { ToolExecutor } from '../src/tool/executor.js' +import { executeWithRetry } from '../src/orchestrator/orchestrator.js' +import { emitTrace, generateRunId } from '../src/utils/trace.js' +import { createTask } from '../src/task/task.js' +import type { + AgentConfig, + AgentRunResult, + LLMAdapter, + LLMResponse, + TraceEvent, +} from '../src/types.js' + +// --------------------------------------------------------------------------- +// Mock adapters +// --------------------------------------------------------------------------- + +function mockAdapter(responses: LLMResponse[]): LLMAdapter { + let callIndex = 0 + return { + name: 'mock', + async chat() { + return responses[callIndex++]! 
+ }, + async *stream() { + /* unused */ + }, + } +} + +function textResponse(text: string): LLMResponse { + return { + id: `resp-${Math.random().toString(36).slice(2)}`, + content: [{ type: 'text' as const, text }], + model: 'mock-model', + stop_reason: 'end_turn', + usage: { input_tokens: 10, output_tokens: 20 }, + } +} + +function toolUseResponse(toolName: string, input: Record): LLMResponse { + return { + id: `resp-${Math.random().toString(36).slice(2)}`, + content: [ + { + type: 'tool_use' as const, + id: `tu-${Math.random().toString(36).slice(2)}`, + name: toolName, + input, + }, + ], + model: 'mock-model', + stop_reason: 'tool_use', + usage: { input_tokens: 15, output_tokens: 25 }, + } +} + +function buildMockAgent( + config: AgentConfig, + responses: LLMResponse[], + registry?: ToolRegistry, + executor?: ToolExecutor, +): Agent { + const reg = registry ?? new ToolRegistry() + const exec = executor ?? new ToolExecutor(reg) + const adapter = mockAdapter(responses) + const agent = new Agent(config, reg, exec) + + const runner = new AgentRunner(adapter, reg, exec, { + model: config.model, + systemPrompt: config.systemPrompt, + maxTurns: config.maxTurns, + maxTokens: config.maxTokens, + temperature: config.temperature, + agentName: config.name, + }) + ;(agent as any).runner = runner + + return agent +} + +// --------------------------------------------------------------------------- +// emitTrace helper +// --------------------------------------------------------------------------- + +describe('emitTrace', () => { + it('does nothing when fn is undefined', () => { + // Should not throw + emitTrace(undefined, { + type: 'agent', + runId: 'r1', + agent: 'a', + turns: 1, + tokens: { input_tokens: 0, output_tokens: 0 }, + toolCalls: 0, + startMs: 0, + endMs: 0, + durationMs: 0, + }) + }) + + it('calls fn with the event', () => { + const fn = vi.fn() + const event: TraceEvent = { + type: 'agent', + runId: 'r1', + agent: 'a', + turns: 1, + tokens: { input_tokens: 0, 
output_tokens: 0 }, + toolCalls: 0, + startMs: 0, + endMs: 0, + durationMs: 0, + } + emitTrace(fn, event) + expect(fn).toHaveBeenCalledWith(event) + }) + + it('swallows errors thrown by callback', () => { + const fn = () => { throw new Error('boom') } + expect(() => + emitTrace(fn, { + type: 'agent', + runId: 'r1', + agent: 'a', + turns: 1, + tokens: { input_tokens: 0, output_tokens: 0 }, + toolCalls: 0, + startMs: 0, + endMs: 0, + durationMs: 0, + }), + ).not.toThrow() + }) + + it('swallows rejected promises from async callbacks', async () => { + // An async onTrace that rejects should not produce unhandled rejection + const fn = async () => { throw new Error('async boom') } + emitTrace(fn as unknown as (event: TraceEvent) => void, { + type: 'agent', + runId: 'r1', + agent: 'a', + turns: 1, + tokens: { input_tokens: 0, output_tokens: 0 }, + toolCalls: 0, + startMs: 0, + endMs: 0, + durationMs: 0, + }) + // If the rejection is not caught, vitest will fail with unhandled rejection. + // Give the microtask queue a tick to surface any unhandled rejection. 
+ await new Promise(resolve => setTimeout(resolve, 10)) + }) +}) + +describe('generateRunId', () => { + it('returns a UUID string', () => { + const id = generateRunId() + expect(id).toMatch(/^[0-9a-f-]{36}$/) + }) + + it('returns unique IDs', () => { + const ids = new Set(Array.from({ length: 100 }, generateRunId)) + expect(ids.size).toBe(100) + }) +}) + +// --------------------------------------------------------------------------- +// AgentRunner trace events +// --------------------------------------------------------------------------- + +describe('AgentRunner trace events', () => { + it('emits llm_call trace for each LLM turn', async () => { + const traces: TraceEvent[] = [] + const registry = new ToolRegistry() + const executor = new ToolExecutor(registry) + const adapter = mockAdapter([textResponse('Hello!')]) + + const runner = new AgentRunner(adapter, registry, executor, { + model: 'test-model', + agentName: 'test-agent', + }) + + const runOptions: RunOptions = { + onTrace: (e) => traces.push(e), + runId: 'run-1', + traceAgent: 'test-agent', + } + + await runner.run( + [{ role: 'user', content: [{ type: 'text', text: 'hi' }] }], + runOptions, + ) + + const llmTraces = traces.filter(t => t.type === 'llm_call') + expect(llmTraces).toHaveLength(1) + + const llm = llmTraces[0]! 
+ expect(llm.type).toBe('llm_call') + expect(llm.runId).toBe('run-1') + expect(llm.agent).toBe('test-agent') + expect(llm.model).toBe('test-model') + expect(llm.turn).toBe(1) + expect(llm.tokens).toEqual({ input_tokens: 10, output_tokens: 20 }) + expect(llm.durationMs).toBeGreaterThanOrEqual(0) + expect(llm.startMs).toBeLessThanOrEqual(llm.endMs) + }) + + it('emits tool_call trace with correct fields', async () => { + const traces: TraceEvent[] = [] + const registry = new ToolRegistry() + registry.register( + defineTool({ + name: 'echo', + description: 'echoes', + inputSchema: z.object({ msg: z.string() }), + execute: async ({ msg }) => ({ data: msg }), + }), + ) + const executor = new ToolExecutor(registry) + const adapter = mockAdapter([ + toolUseResponse('echo', { msg: 'hello' }), + textResponse('Done'), + ]) + + const runner = new AgentRunner(adapter, registry, executor, { + model: 'test-model', + agentName: 'tooler', + }) + + await runner.run( + [{ role: 'user', content: [{ type: 'text', text: 'test' }] }], + { onTrace: (e) => traces.push(e), runId: 'run-2', traceAgent: 'tooler' }, + ) + + const toolTraces = traces.filter(t => t.type === 'tool_call') + expect(toolTraces).toHaveLength(1) + + const tool = toolTraces[0]! 
expect(tool.type).toBe('tool_call')
    expect(tool.runId).toBe('run-2')
    expect(tool.agent).toBe('tooler')
    expect(tool.tool).toBe('echo')
    expect(tool.isError).toBe(false)
    expect(tool.durationMs).toBeGreaterThanOrEqual(0)
  })

  // A tool whose execute() throws must still yield exactly one tool_call
  // trace, and that trace must be flagged with isError: true.
  it('tool_call trace has isError: true on tool failure', async () => {
    const traces: TraceEvent[] = []
    const registry = new ToolRegistry()
    registry.register(
      defineTool({
        name: 'boom',
        description: 'fails',
        inputSchema: z.object({}),
        execute: async () => { throw new Error('fail') },
      }),
    )
    const executor = new ToolExecutor(registry)
    // NOTE(review): assumes mockAdapter replays these responses in order
    // (tool request first, then the final text) — confirm against the helper.
    const adapter = mockAdapter([
      toolUseResponse('boom', {}),
      textResponse('Handled'),
    ])

    const runner = new AgentRunner(adapter, registry, executor, {
      model: 'test-model',
      agentName: 'err-agent',
    })

    await runner.run(
      [{ role: 'user', content: [{ type: 'text', text: 'test' }] }],
      { onTrace: (e) => traces.push(e), runId: 'run-3', traceAgent: 'err-agent' },
    )

    const toolTraces = traces.filter(t => t.type === 'tool_call')
    expect(toolTraces).toHaveLength(1)
    expect(toolTraces[0]!.isError).toBe(true)
  })

  // Smoke test only: runs with empty options (no onTrace) and checks the
  // output. It does not inspect Date.now, so it cannot prove that timing
  // calls are skipped — the title states only what is actually asserted.
  it('runs without errors when onTrace is absent', async () => {
    const registry = new ToolRegistry()
    const executor = new ToolExecutor(registry)
    const adapter = mockAdapter([textResponse('hi')])

    const runner = new AgentRunner(adapter, registry, executor, {
      model: 'test-model',
    })

    const result = await runner.run(
      [{ role: 'user', content: [{ type: 'text', text: 'test' }] }],
      {},
    )

    expect(result.output).toBe('hi')
  })
})

// ---------------------------------------------------------------------------
// Agent-level trace events
// ---------------------------------------------------------------------------

describe('Agent trace events', () => {
  // A single text-only run should emit exactly one 'agent' trace carrying
  // the run id, agent name, turn count, token usage and tool-call count.
  it('emits agent trace with turns, tokens, and toolCalls', async () => {
    const traces: TraceEvent[] = []
    const config: AgentConfig = {
      name: 'my-agent',
      model: 'mock-model',
      systemPrompt: 'You are a test.',
    }

    const agent = buildMockAgent(config, [textResponse('Hello world')])

    // Options are passed inline, as in the other tests of this suite, so the
    // literal is checked directly against agent.run's parameter type.
    const result = await agent.run('Say hello', {
      onTrace: (e) => traces.push(e),
      runId: 'run-agent-1',
      traceAgent: 'my-agent',
    })
    expect(result.success).toBe(true)

    const agentTraces = traces.filter(t => t.type === 'agent')
    expect(agentTraces).toHaveLength(1)

    const at = agentTraces[0]!
    expect(at.type).toBe('agent')
    expect(at.runId).toBe('run-agent-1')
    expect(at.agent).toBe('my-agent')
    expect(at.turns).toBe(1) // one assistant message
    // NOTE(review): 10/20 is presumably the default usage produced by the
    // textResponse helper — confirm against its definition.
    expect(at.tokens).toEqual({ input_tokens: 10, output_tokens: 20 })
    expect(at.toolCalls).toBe(0)
    expect(at.durationMs).toBeGreaterThanOrEqual(0)
  })

  // Every span emitted during one run (llm_call, tool_call, agent) must carry
  // the runId supplied in the run options.
  it('all traces share the same runId', async () => {
    const traces: TraceEvent[] = []
    const registry = new ToolRegistry()
    registry.register(
      defineTool({
        name: 'greet',
        description: 'greets',
        inputSchema: z.object({ name: z.string() }),
        execute: async ({ name }) => ({ data: `Hi ${name}` }),
      }),
    )
    const executor = new ToolExecutor(registry)
    const config: AgentConfig = {
      name: 'multi-trace-agent',
      model: 'mock-model',
      tools: ['greet'],
    }

    const agent = buildMockAgent(
      config,
      [
        toolUseResponse('greet', { name: 'world' }),
        textResponse('Done'),
      ],
      registry,
      executor,
    )

    const runId = 'shared-run-id'
    await agent.run('test', {
      onTrace: (e) => traces.push(e),
      runId,
      traceAgent: 'multi-trace-agent',
    })

    // Should have: 2 llm_call, 1 tool_call, 1 agent
    expect(traces.length).toBeGreaterThanOrEqual(4)

    for (const trace of traces) {
      expect(trace.runId).toBe(runId)
    }
  })

  // A throwing onTrace callback must not surface as a run failure.
  it('onTrace error does not break agent execution', async () => {
    const config: AgentConfig = {
      name: 'resilient-agent',
      model: 'mock-model',
    }

    const agent = buildMockAgent(config, [textResponse('OK')])

    const result = await agent.run('test', {
      onTrace: () => { throw new Error('callback exploded') },
      runId: 'run-err',
      traceAgent: 'resilient-agent',
    })

    // The run should still succeed despite the broken callback
    expect(result.success).toBe(true)
    expect(result.output).toBe('OK')
  })

  // Each llm_call trace must report the usage of its own LLM response and a
  // sequential, 1-based turn number.
  it('per-turn token usage in llm_call traces', async () => {
    const traces: TraceEvent[] = []
    const registry = new ToolRegistry()
    registry.register(
      defineTool({
        name: 'noop',
        description: 'noop',
        inputSchema: z.object({}),
        execute: async () => ({ data: 'ok' }),
      }),
    )
    const executor = new ToolExecutor(registry)

    // Two LLM calls: first triggers a tool, second is the final response.
    // The usage figures differ so per-turn values can be told apart from a sum.
    const resp1: LLMResponse = {
      id: 'r1',
      content: [{ type: 'tool_use', id: 'tu1', name: 'noop', input: {} }],
      model: 'mock-model',
      stop_reason: 'tool_use',
      usage: { input_tokens: 100, output_tokens: 50 },
    }
    const resp2: LLMResponse = {
      id: 'r2',
      content: [{ type: 'text', text: 'Final answer' }],
      model: 'mock-model',
      stop_reason: 'end_turn',
      usage: { input_tokens: 200, output_tokens: 100 },
    }

    const adapter = mockAdapter([resp1, resp2])
    const runner = new AgentRunner(adapter, registry, executor, {
      model: 'mock-model',
      agentName: 'token-agent',
    })

    await runner.run(
      [{ role: 'user', content: [{ type: 'text', text: 'go' }] }],
      { onTrace: (e) => traces.push(e), runId: 'run-tok', traceAgent: 'token-agent' },
    )

    const llmTraces = traces.filter(t => t.type === 'llm_call')
    expect(llmTraces).toHaveLength(2)

    // Each trace carries its own turn's token usage, not the aggregate
    expect(llmTraces[0]!.tokens).toEqual({ input_tokens: 100, output_tokens: 50 })
    expect(llmTraces[1]!.tokens).toEqual({ input_tokens: 200, output_tokens: 100 })

    // Turn numbers should be sequential
    expect(llmTraces[0]!.turn).toBe(1)
    expect(llmTraces[1]!.turn).toBe(2)
  })
})