feat: add onTrace observability callback (#18)

Add lightweight onTrace callback to OrchestratorConfig that emits
structured span events (llm_call, tool_call, task, agent) with timing,
token usage, and runId correlation. Zero overhead when not subscribed.

Closes #18
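A minimal subscription sketch (in-repo import paths as used by the examples; the handler body is illustrative):

```ts
import { OpenMultiAgent } from '../src/index.js'
import type { TraceEvent } from '../src/types.js'

// One callback on the orchestrator receives a span for every LLM call,
// tool execution, task, and agent run; spans from the same run share a runId.
const orchestrator = new OpenMultiAgent({
  defaultModel: 'claude-sonnet-4-6',
  onTrace: (event: TraceEvent) => {
    console.log(`[${event.type}] agent=${event.agent} ${event.durationMs}ms run=${event.runId}`)
  },
})
```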
JackChen 2026-04-03 15:28:59 +08:00 committed by GitHub
parent 17546fd93e
commit 0111876264
11 changed files with 848 additions and 19 deletions


@ -18,6 +18,7 @@ Build AI agent teams that decompose goals into tasks automatically. Define agent
- **Model Agnostic** — Claude, GPT, Gemma 4, and local models (Ollama, vLLM, LM Studio) in the same team. Swap models per agent via `baseURL`.
- **Structured Output** — Add `outputSchema` (Zod) to any agent. Output is parsed as JSON, validated, and auto-retried once on failure. Access typed results via `result.structured`.
- **Task Retry** — Set `maxRetries` on tasks for automatic retry with exponential backoff. Failed attempts accumulate token usage for accurate billing.
- **Observability** — Optional `onTrace` callback emits structured spans for every LLM call, tool execution, task, and agent run — with timing, token usage, and a shared `runId` for correlation. Zero overhead when not subscribed, zero extra dependencies.
- **In-Process Execution** — No subprocess overhead. Everything runs in one Node.js process. Deploy to serverless, Docker, CI/CD.
## Quick Start
@ -120,6 +121,7 @@ npx tsx examples/01-single-agent.ts
| [08 — Gemma 4 Local](examples/08-gemma4-local.ts) | `runTasks()` + `runTeam()` with local Gemma 4 via Ollama — zero API cost |
| [09 — Structured Output](examples/09-structured-output.ts) | `outputSchema` (Zod) on AgentConfig — validated JSON via `result.structured` |
| [10 — Task Retry](examples/10-task-retry.ts) | `maxRetries` / `retryDelayMs` / `retryBackoff` with `task_retry` progress events |
| [11 — Trace Observability](examples/11-trace-observability.ts) | `onTrace` callback — structured spans for LLM calls, tools, tasks, and agents |
## Architecture


@ -18,6 +18,7 @@
- **Model Agnostic** — Claude, GPT, Gemma 4, and local models (Ollama, vLLM, LM Studio) can be used in the same team. Connect any OpenAI-compatible service via `baseURL`.
- **Structured Output** — Add `outputSchema` (Zod) to any agent; output is automatically parsed as JSON and validated, with one automatic retry on validation failure. Access typed results via `result.structured`.
- **Task Retry** — Set `maxRetries` on a task for automatic retry with exponential backoff on failure. Token usage from all attempts is accumulated for accurate billing.
- **Observability** — An optional `onTrace` callback emits a structured span event for every LLM call, tool execution, task, and agent run, with timing, token usage, and a shared `runId` for correlation. Zero overhead when not subscribed, zero extra dependencies.
- **In-Process Execution** — No subprocess overhead; everything runs in a single Node.js process. Deploy to serverless, Docker, or CI/CD.
## Quick Start
@ -120,6 +121,7 @@ npx tsx examples/01-single-agent.ts
| [08 — Gemma 4 Local](examples/08-gemma4-local.ts) | `runTasks()` + `runTeam()` with local Gemma 4 via Ollama — zero API cost |
| [09 — Structured Output](examples/09-structured-output.ts) | `outputSchema` (Zod) — validated JSON output, accessed via `result.structured` |
| [10 — Task Retry](examples/10-task-retry.ts) | `maxRetries` / `retryDelayMs` / `retryBackoff` + `task_retry` progress events |
| [11 — Trace Observability](examples/11-trace-observability.ts) | `onTrace` callback — structured span events for LLM calls, tools, tasks, and agents |
## Architecture


@ -0,0 +1,133 @@
/**
* Example 11: Trace Observability
*
* Demonstrates the `onTrace` callback for lightweight observability. Every LLM
* call, tool execution, task lifecycle, and agent run emits a structured trace
* event with timing data and token usage, giving you full visibility into
* what's happening inside a multi-agent run.
*
* Trace events share a `runId` for correlation, so you can reconstruct the
* full execution timeline. Pipe them into your own logging, OpenTelemetry, or
* dashboard.
*
* Run:
* npx tsx examples/11-trace-observability.ts
*
* Prerequisites:
* ANTHROPIC_API_KEY env var must be set.
*/
import { OpenMultiAgent } from '../src/index.js'
import type { AgentConfig, TraceEvent } from '../src/types.js'
// ---------------------------------------------------------------------------
// Agents
// ---------------------------------------------------------------------------
const researcher: AgentConfig = {
name: 'researcher',
model: 'claude-sonnet-4-6',
systemPrompt: 'You are a research assistant. Provide concise, factual answers.',
maxTurns: 2,
}
const writer: AgentConfig = {
name: 'writer',
model: 'claude-sonnet-4-6',
systemPrompt: 'You are a technical writer. Summarize research into clear prose.',
maxTurns: 2,
}
// ---------------------------------------------------------------------------
// Trace handler — log every span with timing
// ---------------------------------------------------------------------------
function handleTrace(event: TraceEvent): void {
const dur = `${event.durationMs}ms`.padStart(7)
switch (event.type) {
case 'llm_call':
console.log(
` [LLM] ${dur} agent=${event.agent} model=${event.model} turn=${event.turn}` +
` tokens=${event.tokens.input_tokens}in/${event.tokens.output_tokens}out`,
)
break
case 'tool_call':
console.log(
` [TOOL] ${dur} agent=${event.agent} tool=${event.tool}` +
` error=${event.isError}`,
)
break
case 'task':
console.log(
` [TASK] ${dur} task="${event.taskTitle}" agent=${event.agent}` +
` success=${event.success} retries=${event.retries}`,
)
break
case 'agent':
console.log(
` [AGENT] ${dur} agent=${event.agent} turns=${event.turns}` +
` tools=${event.toolCalls} tokens=${event.tokens.input_tokens}in/${event.tokens.output_tokens}out`,
)
break
}
}
// ---------------------------------------------------------------------------
// Orchestrator + team
// ---------------------------------------------------------------------------
const orchestrator = new OpenMultiAgent({
defaultModel: 'claude-sonnet-4-6',
onTrace: handleTrace,
})
const team = orchestrator.createTeam('trace-demo', {
name: 'trace-demo',
agents: [researcher, writer],
sharedMemory: true,
})
// ---------------------------------------------------------------------------
// Tasks — researcher first, then writer summarizes
// ---------------------------------------------------------------------------
const tasks = [
{
title: 'Research topic',
description: 'List 5 key benefits of TypeScript for large codebases. Be concise.',
assignee: 'researcher',
},
{
title: 'Write summary',
description: 'Read the research from shared memory and write a 3-sentence summary.',
assignee: 'writer',
dependsOn: ['Research topic'],
},
]
// ---------------------------------------------------------------------------
// Run
// ---------------------------------------------------------------------------
console.log('Trace Observability Example')
console.log('='.repeat(60))
console.log('Pipeline: research → write (with full trace output)')
console.log('='.repeat(60))
console.log()
const result = await orchestrator.runTasks(team, tasks)
// ---------------------------------------------------------------------------
// Summary
// ---------------------------------------------------------------------------
console.log('\n' + '='.repeat(60))
console.log(`Overall success: ${result.success}`)
console.log(`Tokens — input: ${result.totalTokenUsage.input_tokens}, output: ${result.totalTokenUsage.output_tokens}`)
for (const [name, r] of result.agentResults) {
const icon = r.success ? 'OK ' : 'FAIL'
console.log(` [${icon}] ${name}`)
console.log(` ${r.output.slice(0, 200)}`)
}
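The example above logs each span as it arrives; the same callback can just as easily feed an aggregator. A sketch of a summary collector (the `TraceSummary` class is illustrative; only the `TraceEvent` fields come from this commit):

```ts
import type { TraceEvent } from '../src/types.js'

// Illustrative collector: counts spans per type, sums durations, and totals
// LLM token usage so a cost/latency summary can be printed after a run.
class TraceSummary {
  private readonly byType = new Map<string, { count: number; totalMs: number }>()
  private inputTokens = 0
  private outputTokens = 0

  readonly onTrace = (event: TraceEvent): void => {
    const entry = this.byType.get(event.type) ?? { count: 0, totalMs: 0 }
    entry.count += 1
    entry.totalMs += event.durationMs
    this.byType.set(event.type, entry)
    // Count only llm_call tokens; agent spans would double-count the same usage.
    if (event.type === 'llm_call') {
      this.inputTokens += event.tokens.input_tokens
      this.outputTokens += event.tokens.output_tokens
    }
  }

  print(): void {
    for (const [type, stats] of this.byType) {
      console.log(`${type.padEnd(9)} count=${stats.count} totalMs=${stats.totalMs}`)
    }
    console.log(`LLM tokens: ${this.inputTokens} in / ${this.outputTokens} out`)
  }
}

// Usage: pass summary.onTrace as OrchestratorConfig.onTrace, then call
// summary.print() after runTasks()/runTeam() resolves.
const summary = new TraceSummary()
```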


@ -32,6 +32,7 @@ import type {
TokenUsage,
ToolUseContext,
} from '../types.js'
import { emitTrace, generateRunId } from '../utils/trace.js'
import type { ToolDefinition as FrameworkToolDefinition, ToolRegistry } from '../tool/framework.js'
import type { ToolExecutor } from '../tool/executor.js'
import { createAdapter } from '../llm/adapter.js'
@ -158,12 +159,12 @@ export class Agent {
*
* Use this for one-shot queries where past context is irrelevant.
*/
async run(prompt: string): Promise<AgentRunResult> {
async run(prompt: string, runOptions?: Partial<RunOptions>): Promise<AgentRunResult> {
const messages: LLMMessage[] = [
{ role: 'user', content: [{ type: 'text', text: prompt }] },
]
return this.executeRun(messages)
return this.executeRun(messages, runOptions)
}
/**
@ -174,6 +175,7 @@ export class Agent {
*
* Use this for multi-turn interactions.
*/
// TODO(#18): accept optional RunOptions to forward trace context
async prompt(message: string): Promise<AgentRunResult> {
const userMessage: LLMMessage = {
role: 'user',
@ -197,6 +199,7 @@ export class Agent {
*
* Like {@link run}, this does not use or update the persistent history.
*/
// TODO(#18): accept optional RunOptions to forward trace context
async *stream(prompt: string): AsyncGenerator<StreamEvent> {
const messages: LLMMessage[] = [
{ role: 'user', content: [{ type: 'text', text: prompt }] },
@ -266,15 +269,26 @@ export class Agent {
* Shared execution path used by both `run` and `prompt`.
* Handles state transitions and error wrapping.
*/
private async executeRun(messages: LLMMessage[]): Promise<AgentRunResult> {
private async executeRun(
messages: LLMMessage[],
callerOptions?: Partial<RunOptions>,
): Promise<AgentRunResult> {
this.transitionTo('running')
const agentStartMs = Date.now()
try {
const runner = await this.getRunner()
const internalOnMessage = (msg: LLMMessage) => {
this.state.messages.push(msg)
callerOptions?.onMessage?.(msg)
}
// Auto-generate runId when onTrace is provided but runId is missing
const needsRunId = callerOptions?.onTrace && !callerOptions.runId
const runOptions: RunOptions = {
onMessage: msg => {
this.state.messages.push(msg)
},
...callerOptions,
onMessage: internalOnMessage,
...(needsRunId ? { runId: generateRunId() } : undefined),
}
const result = await runner.run(messages, runOptions)
@ -282,21 +296,25 @@ export class Agent {
// --- Structured output validation ---
if (this.config.outputSchema) {
return this.validateStructuredOutput(
const validated = await this.validateStructuredOutput(
messages,
result,
runner,
runOptions,
)
this.emitAgentTrace(callerOptions, agentStartMs, validated)
return validated
}
this.transitionTo('completed')
return this.toAgentRunResult(result, true)
const agentResult = this.toAgentRunResult(result, true)
this.emitAgentTrace(callerOptions, agentStartMs, agentResult)
return agentResult
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err))
this.transitionToError(error)
return {
const errorResult: AgentRunResult = {
success: false,
output: error.message,
messages: [],
@ -304,9 +322,33 @@ export class Agent {
toolCalls: [],
structured: undefined,
}
this.emitAgentTrace(callerOptions, agentStartMs, errorResult)
return errorResult
}
}
/** Emit an `agent` trace event if `onTrace` is provided. */
private emitAgentTrace(
options: Partial<RunOptions> | undefined,
startMs: number,
result: AgentRunResult,
): void {
if (!options?.onTrace) return
const endMs = Date.now()
emitTrace(options.onTrace, {
type: 'agent',
runId: options.runId ?? '',
taskId: options.taskId,
agent: options.traceAgent ?? this.name,
turns: result.messages.filter(m => m.role === 'assistant').length,
tokens: result.tokenUsage,
toolCalls: result.toolCalls.length,
startMs,
endMs,
durationMs: endMs - startMs,
})
}
/**
* Validate agent output against the configured `outputSchema`.
* On first validation failure, retry once with error feedback.


@ -21,6 +21,7 @@
*/
import type { AgentRunResult } from '../types.js'
import type { RunOptions } from './runner.js'
import type { Agent } from './agent.js'
import { Semaphore } from '../utils/semaphore.js'
@ -123,12 +124,16 @@ export class AgentPool {
*
* @throws {Error} If the agent name is not found.
*/
async run(agentName: string, prompt: string): Promise<AgentRunResult> {
async run(
agentName: string,
prompt: string,
runOptions?: Partial<RunOptions>,
): Promise<AgentRunResult> {
const agent = this.requireAgent(agentName)
await this.semaphore.acquire()
try {
return await agent.run(prompt)
return await agent.run(prompt, runOptions)
} finally {
this.semaphore.release()
}
@ -144,6 +149,7 @@ export class AgentPool {
*
* @param tasks - Array of `{ agent, prompt }` descriptors.
*/
// TODO(#18): accept RunOptions per task to forward trace context
async runParallel(
tasks: ReadonlyArray<{ readonly agent: string; readonly prompt: string }>,
): Promise<Map<string, AgentRunResult>> {
@ -182,6 +188,7 @@ export class AgentPool {
*
* @throws {Error} If the pool is empty.
*/
// TODO(#18): accept RunOptions to forward trace context
async runAny(prompt: string): Promise<AgentRunResult> {
const allAgents = this.list()
if (allAgents.length === 0) {


@ -25,7 +25,9 @@ import type {
ToolUseContext,
LLMAdapter,
LLMChatOptions,
TraceEvent,
} from '../types.js'
import { emitTrace } from '../utils/trace.js'
import type { ToolRegistry } from '../tool/framework.js'
import type { ToolExecutor } from '../tool/executor.js'
@ -76,6 +78,14 @@ export interface RunOptions {
readonly onToolResult?: (name: string, result: ToolResult) => void
/** Fired after each complete {@link LLMMessage} is appended. */
readonly onMessage?: (message: LLMMessage) => void
/** Trace callback for observability spans. Async callbacks are safe. */
readonly onTrace?: (event: TraceEvent) => void | Promise<void>
/** Run ID for trace correlation. */
readonly runId?: string
/** Task ID for trace correlation. */
readonly taskId?: string
/** Agent name for trace correlation (overrides RunnerOptions.agentName). */
readonly traceAgent?: string
}
/** The aggregated result returned when a full run completes. */
@ -254,7 +264,23 @@ export class AgentRunner {
// ------------------------------------------------------------------
// Step 1: Call the LLM and collect the full response for this turn.
// ------------------------------------------------------------------
const llmStartMs = Date.now()
const response = await this.adapter.chat(conversationMessages, baseChatOptions)
if (options.onTrace) {
const llmEndMs = Date.now()
emitTrace(options.onTrace, {
type: 'llm_call',
runId: options.runId ?? '',
taskId: options.taskId,
agent: options.traceAgent ?? this.options.agentName ?? 'unknown',
model: this.options.model,
turn: turns,
tokens: response.usage,
startMs: llmStartMs,
endMs: llmEndMs,
durationMs: llmEndMs - llmStartMs,
})
}
totalUsage = addTokenUsage(totalUsage, response.usage)
@ -319,10 +345,25 @@ export class AgentRunner {
result = { data: message, isError: true }
}
const duration = Date.now() - startTime
const endTime = Date.now()
const duration = endTime - startTime
options.onToolResult?.(block.name, result)
if (options.onTrace) {
emitTrace(options.onTrace, {
type: 'tool_call',
runId: options.runId ?? '',
taskId: options.taskId,
agent: options.traceAgent ?? this.options.agentName ?? 'unknown',
tool: block.name,
isError: result.isError ?? false,
startMs: startTime,
endMs: endTime,
durationMs: duration,
})
}
const record: ToolCallRecord = {
toolName: block.name,
input: block.input,


@ -161,7 +161,18 @@ export type {
OrchestratorConfig,
OrchestratorEvent,
// Trace
TraceEventType,
TraceEventBase,
TraceEvent,
LLMCallTrace,
ToolCallTrace,
TaskTrace,
AgentTrace,
// Memory
MemoryEntry,
MemoryStore,
} from './types.js'
export { generateRunId } from './utils/trace.js'


@ -52,8 +52,10 @@ import type {
TeamRunResult,
TokenUsage,
} from '../types.js'
import type { RunOptions } from '../agent/runner.js'
import { Agent } from '../agent/agent.js'
import { AgentPool } from '../agent/pool.js'
import { emitTrace, generateRunId } from '../utils/trace.js'
import { ToolRegistry } from '../tool/framework.js'
import { ToolExecutor } from '../tool/executor.js'
import { registerBuiltInTools } from '../tool/built-in/index.js'
@ -260,6 +262,8 @@ interface RunContext {
readonly scheduler: Scheduler
readonly agentResults: Map<string, AgentRunResult>
readonly config: OrchestratorConfig
/** Trace run ID, present when `onTrace` is configured. */
readonly runId?: string
}
/**
@ -338,10 +342,19 @@ async function executeQueue(
// Build the prompt: inject shared memory context + task description
const prompt = await buildTaskPrompt(task, team)
// Build trace context for this task's agent run
const traceOptions: Partial<RunOptions> | undefined = config.onTrace
? { onTrace: config.onTrace, runId: ctx.runId ?? '', taskId: task.id, traceAgent: assignee }
: undefined
const taskStartMs = config.onTrace ? Date.now() : 0
let retryCount = 0
const result = await executeWithRetry(
() => pool.run(assignee, prompt),
() => pool.run(assignee, prompt, traceOptions),
task,
(retryData) => {
retryCount++
config.onProgress?.({
type: 'task_retry',
task: task.id,
@ -351,6 +364,23 @@ async function executeQueue(
},
)
// Emit task trace
if (config.onTrace) {
const taskEndMs = Date.now()
emitTrace(config.onTrace, {
type: 'task',
runId: ctx.runId ?? '',
taskId: task.id,
taskTitle: task.title,
agent: assignee,
success: result.success,
retries: retryCount,
startMs: taskStartMs,
endMs: taskEndMs,
durationMs: taskEndMs - taskStartMs,
})
}
ctx.agentResults.set(`${assignee}:${task.id}`, result)
if (result.success) {
@ -441,8 +471,8 @@ async function buildTaskPrompt(task: Task, team: Team): Promise<string> {
*/
export class OpenMultiAgent {
private readonly config: Required<
Omit<OrchestratorConfig, 'onProgress' | 'defaultBaseURL' | 'defaultApiKey'>
> & Pick<OrchestratorConfig, 'onProgress' | 'defaultBaseURL' | 'defaultApiKey'>
Omit<OrchestratorConfig, 'onProgress' | 'onTrace' | 'defaultBaseURL' | 'defaultApiKey'>
> & Pick<OrchestratorConfig, 'onProgress' | 'onTrace' | 'defaultBaseURL' | 'defaultApiKey'>
private readonly teams: Map<string, Team> = new Map()
private completedTaskCount = 0
@ -463,6 +493,7 @@ export class OpenMultiAgent {
defaultBaseURL: config.defaultBaseURL,
defaultApiKey: config.defaultApiKey,
onProgress: config.onProgress,
onTrace: config.onTrace,
}
}
@ -520,7 +551,11 @@ export class OpenMultiAgent {
data: { prompt },
})
const result = await agent.run(prompt)
const traceOptions: Partial<RunOptions> | undefined = this.config.onTrace
? { onTrace: this.config.onTrace, runId: generateRunId(), traceAgent: config.name }
: undefined
const result = await agent.run(prompt, traceOptions)
this.config.onProgress?.({
type: 'agent_complete',
@ -578,6 +613,7 @@ export class OpenMultiAgent {
const decompositionPrompt = this.buildDecompositionPrompt(goal, agentConfigs)
const coordinatorAgent = buildAgent(coordinatorConfig)
const runId = this.config.onTrace ? generateRunId() : undefined
this.config.onProgress?.({
type: 'agent_start',
@ -585,7 +621,10 @@ export class OpenMultiAgent {
data: { phase: 'decomposition', goal },
})
const decompositionResult = await coordinatorAgent.run(decompositionPrompt)
const decompTraceOptions: Partial<RunOptions> | undefined = this.config.onTrace
? { onTrace: this.config.onTrace, runId: runId ?? '', traceAgent: 'coordinator' }
: undefined
const decompositionResult = await coordinatorAgent.run(decompositionPrompt, decompTraceOptions)
const agentResults = new Map<string, AgentRunResult>()
agentResults.set('coordinator:decompose', decompositionResult)
@ -629,6 +668,7 @@ export class OpenMultiAgent {
scheduler,
agentResults,
config: this.config,
runId,
}
await executeQueue(queue, ctx)
@ -637,7 +677,10 @@ export class OpenMultiAgent {
// Step 5: Coordinator synthesises final result
// ------------------------------------------------------------------
const synthesisPrompt = await this.buildSynthesisPrompt(goal, queue.list(), team)
const synthesisResult = await coordinatorAgent.run(synthesisPrompt)
const synthTraceOptions: Partial<RunOptions> | undefined = this.config.onTrace
? { onTrace: this.config.onTrace, runId: runId ?? '', traceAgent: 'coordinator' }
: undefined
const synthesisResult = await coordinatorAgent.run(synthesisPrompt, synthTraceOptions)
agentResults.set('coordinator', synthesisResult)
this.config.onProgress?.({
@ -707,6 +750,7 @@ export class OpenMultiAgent {
scheduler,
agentResults,
config: this.config,
runId: this.config.onTrace ? generateRunId() : undefined,
}
await executeQueue(queue, ctx)


@ -315,9 +315,69 @@ export interface OrchestratorConfig {
readonly defaultProvider?: 'anthropic' | 'copilot' | 'openai'
readonly defaultBaseURL?: string
readonly defaultApiKey?: string
onProgress?: (event: OrchestratorEvent) => void
readonly onProgress?: (event: OrchestratorEvent) => void
readonly onTrace?: (event: TraceEvent) => void | Promise<void>
}
// ---------------------------------------------------------------------------
// Trace events — lightweight observability spans
// ---------------------------------------------------------------------------
/** Trace event type discriminants. */
export type TraceEventType = 'llm_call' | 'tool_call' | 'task' | 'agent'
/** Shared fields present on every trace event. */
export interface TraceEventBase {
/** Unique identifier for the entire run (runTeam / runTasks / runAgent call). */
readonly runId: string
readonly type: TraceEventType
/** Unix epoch ms when the span started. */
readonly startMs: number
/** Unix epoch ms when the span ended. */
readonly endMs: number
/** Wall-clock duration in milliseconds (`endMs - startMs`). */
readonly durationMs: number
/** Agent name associated with this span. */
readonly agent: string
/** Task ID associated with this span. */
readonly taskId?: string
}
/** Emitted for each LLM API call (one per agent turn). */
export interface LLMCallTrace extends TraceEventBase {
readonly type: 'llm_call'
readonly model: string
readonly turn: number
readonly tokens: TokenUsage
}
/** Emitted for each tool execution. */
export interface ToolCallTrace extends TraceEventBase {
readonly type: 'tool_call'
readonly tool: string
readonly isError: boolean
}
/** Emitted when a task completes (wraps the full retry sequence). */
export interface TaskTrace extends TraceEventBase {
readonly type: 'task'
readonly taskId: string
readonly taskTitle: string
readonly success: boolean
readonly retries: number
}
/** Emitted when an agent run completes (wraps the full conversation loop). */
export interface AgentTrace extends TraceEventBase {
readonly type: 'agent'
readonly turns: number
readonly tokens: TokenUsage
readonly toolCalls: number
}
/** Discriminated union of all trace event types. */
export type TraceEvent = LLMCallTrace | ToolCallTrace | TaskTrace | AgentTrace
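Because `TraceEvent` is a discriminated union on `type`, handlers can narrow exhaustively. A brief illustrative sketch using only the fields defined above:

```ts
import type { TraceEvent } from '../src/types.js'

// Illustrative: narrowing on the `type` discriminant exposes the variant-specific
// fields, and the `never` default makes this fail to compile if a new variant
// is ever added to the union without being handled here.
function formatTrace(event: TraceEvent): string {
  const base = `${event.agent} ${event.durationMs}ms`
  switch (event.type) {
    case 'llm_call':
      return `llm_call  ${base} model=${event.model} turn=${event.turn}`
    case 'tool_call':
      return `tool_call ${base} tool=${event.tool} error=${event.isError}`
    case 'task':
      return `task      ${base} "${event.taskTitle}" success=${event.success}`
    case 'agent':
      return `agent     ${base} turns=${event.turns} toolCalls=${event.toolCalls}`
    default: {
      const unreachable: never = event
      return unreachable
    }
  }
}
```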
// ---------------------------------------------------------------------------
// Memory
// ---------------------------------------------------------------------------

src/utils/trace.ts (new file, 34 lines)

@ -0,0 +1,34 @@
/**
* @fileoverview Trace emission utilities for the observability layer.
*/
import { randomUUID } from 'node:crypto'
import type { TraceEvent } from '../types.js'
/**
* Safely emit a trace event. Swallows callback errors so a broken
* subscriber never crashes agent execution.
*/
export function emitTrace(
fn: ((event: TraceEvent) => void | Promise<void>) | undefined,
event: TraceEvent,
): void {
if (!fn) return
try {
// Guard async callbacks: if fn returns a Promise, swallow its rejection
// so an async onTrace never produces an unhandled promise rejection.
const result = fn(event) as unknown
if (result && typeof (result as Promise<unknown>).catch === 'function') {
;(result as Promise<unknown>).catch(noop)
}
} catch {
// Intentionally swallowed — observability must never break execution.
}
}
function noop() {}
/** Generate a unique run ID for trace correlation. */
export function generateRunId(): string {
return randomUUID()
}
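`generateRunId` is re-exported from the package root (see the index.ts diff above), so a caller can pre-generate the run ID and reuse it to correlate trace spans with its own logs. A sketch of direct `Agent` usage, constructed the same way the tests below build agents and assuming `ANTHROPIC_API_KEY` is set as in the examples:

```ts
import { generateRunId } from '../src/index.js'
import { Agent } from '../src/agent/agent.js'
import { ToolRegistry } from '../src/tool/framework.js'
import { ToolExecutor } from '../src/tool/executor.js'
import type { TraceEvent } from '../src/types.js'

// Standalone agent wired up like the tests do (registry + executor).
const registry = new ToolRegistry()
const executor = new ToolExecutor(registry)
const agent = new Agent(
  { name: 'summarizer', model: 'claude-sonnet-4-6', systemPrompt: 'Be concise.' },
  registry,
  executor,
)

// Pre-generating the runId lets external log lines share the same correlation
// key; omitting it is also fine, since executeRun auto-generates one when onTrace is set.
const runId = generateRunId()
const spans: TraceEvent[] = []
const result = await agent.run('Summarize the findings in two sentences.', {
  runId,
  traceAgent: 'summarizer',
  onTrace: e => spans.push(e),
})
console.log(`run ${runId}: success=${result.success}, spans collected=${spans.length}`)
```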

tests/trace.test.ts (new file, 453 lines)

@ -0,0 +1,453 @@
import { describe, it, expect, vi } from 'vitest'
import { z } from 'zod'
import { Agent } from '../src/agent/agent.js'
import { AgentRunner, type RunOptions } from '../src/agent/runner.js'
import { ToolRegistry, defineTool } from '../src/tool/framework.js'
import { ToolExecutor } from '../src/tool/executor.js'
import { executeWithRetry } from '../src/orchestrator/orchestrator.js'
import { emitTrace, generateRunId } from '../src/utils/trace.js'
import { createTask } from '../src/task/task.js'
import type {
AgentConfig,
AgentRunResult,
LLMAdapter,
LLMResponse,
TraceEvent,
} from '../src/types.js'
// ---------------------------------------------------------------------------
// Mock adapters
// ---------------------------------------------------------------------------
function mockAdapter(responses: LLMResponse[]): LLMAdapter {
let callIndex = 0
return {
name: 'mock',
async chat() {
return responses[callIndex++]!
},
async *stream() {
/* unused */
},
}
}
function textResponse(text: string): LLMResponse {
return {
id: `resp-${Math.random().toString(36).slice(2)}`,
content: [{ type: 'text' as const, text }],
model: 'mock-model',
stop_reason: 'end_turn',
usage: { input_tokens: 10, output_tokens: 20 },
}
}
function toolUseResponse(toolName: string, input: Record<string, unknown>): LLMResponse {
return {
id: `resp-${Math.random().toString(36).slice(2)}`,
content: [
{
type: 'tool_use' as const,
id: `tu-${Math.random().toString(36).slice(2)}`,
name: toolName,
input,
},
],
model: 'mock-model',
stop_reason: 'tool_use',
usage: { input_tokens: 15, output_tokens: 25 },
}
}
function buildMockAgent(
config: AgentConfig,
responses: LLMResponse[],
registry?: ToolRegistry,
executor?: ToolExecutor,
): Agent {
const reg = registry ?? new ToolRegistry()
const exec = executor ?? new ToolExecutor(reg)
const adapter = mockAdapter(responses)
const agent = new Agent(config, reg, exec)
const runner = new AgentRunner(adapter, reg, exec, {
model: config.model,
systemPrompt: config.systemPrompt,
maxTurns: config.maxTurns,
maxTokens: config.maxTokens,
temperature: config.temperature,
agentName: config.name,
})
;(agent as any).runner = runner
return agent
}
// ---------------------------------------------------------------------------
// emitTrace helper
// ---------------------------------------------------------------------------
describe('emitTrace', () => {
it('does nothing when fn is undefined', () => {
// Should not throw
emitTrace(undefined, {
type: 'agent',
runId: 'r1',
agent: 'a',
turns: 1,
tokens: { input_tokens: 0, output_tokens: 0 },
toolCalls: 0,
startMs: 0,
endMs: 0,
durationMs: 0,
})
})
it('calls fn with the event', () => {
const fn = vi.fn()
const event: TraceEvent = {
type: 'agent',
runId: 'r1',
agent: 'a',
turns: 1,
tokens: { input_tokens: 0, output_tokens: 0 },
toolCalls: 0,
startMs: 0,
endMs: 0,
durationMs: 0,
}
emitTrace(fn, event)
expect(fn).toHaveBeenCalledWith(event)
})
it('swallows errors thrown by callback', () => {
const fn = () => { throw new Error('boom') }
expect(() =>
emitTrace(fn, {
type: 'agent',
runId: 'r1',
agent: 'a',
turns: 1,
tokens: { input_tokens: 0, output_tokens: 0 },
toolCalls: 0,
startMs: 0,
endMs: 0,
durationMs: 0,
}),
).not.toThrow()
})
it('swallows rejected promises from async callbacks', async () => {
// An async onTrace that rejects should not produce unhandled rejection
const fn = async () => { throw new Error('async boom') }
emitTrace(fn as unknown as (event: TraceEvent) => void, {
type: 'agent',
runId: 'r1',
agent: 'a',
turns: 1,
tokens: { input_tokens: 0, output_tokens: 0 },
toolCalls: 0,
startMs: 0,
endMs: 0,
durationMs: 0,
})
// If the rejection is not caught, vitest will fail with unhandled rejection.
// Give the microtask queue a tick to surface any unhandled rejection.
await new Promise(resolve => setTimeout(resolve, 10))
})
})
describe('generateRunId', () => {
it('returns a UUID string', () => {
const id = generateRunId()
expect(id).toMatch(/^[0-9a-f-]{36}$/)
})
it('returns unique IDs', () => {
const ids = new Set(Array.from({ length: 100 }, generateRunId))
expect(ids.size).toBe(100)
})
})
// ---------------------------------------------------------------------------
// AgentRunner trace events
// ---------------------------------------------------------------------------
describe('AgentRunner trace events', () => {
it('emits llm_call trace for each LLM turn', async () => {
const traces: TraceEvent[] = []
const registry = new ToolRegistry()
const executor = new ToolExecutor(registry)
const adapter = mockAdapter([textResponse('Hello!')])
const runner = new AgentRunner(adapter, registry, executor, {
model: 'test-model',
agentName: 'test-agent',
})
const runOptions: RunOptions = {
onTrace: (e) => traces.push(e),
runId: 'run-1',
traceAgent: 'test-agent',
}
await runner.run(
[{ role: 'user', content: [{ type: 'text', text: 'hi' }] }],
runOptions,
)
const llmTraces = traces.filter(t => t.type === 'llm_call')
expect(llmTraces).toHaveLength(1)
const llm = llmTraces[0]!
expect(llm.type).toBe('llm_call')
expect(llm.runId).toBe('run-1')
expect(llm.agent).toBe('test-agent')
expect(llm.model).toBe('test-model')
expect(llm.turn).toBe(1)
expect(llm.tokens).toEqual({ input_tokens: 10, output_tokens: 20 })
expect(llm.durationMs).toBeGreaterThanOrEqual(0)
expect(llm.startMs).toBeLessThanOrEqual(llm.endMs)
})
it('emits tool_call trace with correct fields', async () => {
const traces: TraceEvent[] = []
const registry = new ToolRegistry()
registry.register(
defineTool({
name: 'echo',
description: 'echoes',
inputSchema: z.object({ msg: z.string() }),
execute: async ({ msg }) => ({ data: msg }),
}),
)
const executor = new ToolExecutor(registry)
const adapter = mockAdapter([
toolUseResponse('echo', { msg: 'hello' }),
textResponse('Done'),
])
const runner = new AgentRunner(adapter, registry, executor, {
model: 'test-model',
agentName: 'tooler',
})
await runner.run(
[{ role: 'user', content: [{ type: 'text', text: 'test' }] }],
{ onTrace: (e) => traces.push(e), runId: 'run-2', traceAgent: 'tooler' },
)
const toolTraces = traces.filter(t => t.type === 'tool_call')
expect(toolTraces).toHaveLength(1)
const tool = toolTraces[0]!
expect(tool.type).toBe('tool_call')
expect(tool.runId).toBe('run-2')
expect(tool.agent).toBe('tooler')
expect(tool.tool).toBe('echo')
expect(tool.isError).toBe(false)
expect(tool.durationMs).toBeGreaterThanOrEqual(0)
})
it('tool_call trace has isError: true on tool failure', async () => {
const traces: TraceEvent[] = []
const registry = new ToolRegistry()
registry.register(
defineTool({
name: 'boom',
description: 'fails',
inputSchema: z.object({}),
execute: async () => { throw new Error('fail') },
}),
)
const executor = new ToolExecutor(registry)
const adapter = mockAdapter([
toolUseResponse('boom', {}),
textResponse('Handled'),
])
const runner = new AgentRunner(adapter, registry, executor, {
model: 'test-model',
agentName: 'err-agent',
})
await runner.run(
[{ role: 'user', content: [{ type: 'text', text: 'test' }] }],
{ onTrace: (e) => traces.push(e), runId: 'run-3', traceAgent: 'err-agent' },
)
const toolTraces = traces.filter(t => t.type === 'tool_call')
expect(toolTraces).toHaveLength(1)
expect(toolTraces[0]!.isError).toBe(true)
})
it('runs cleanly when onTrace is not provided', async () => {
// This test just verifies no errors occur when onTrace is not provided
const registry = new ToolRegistry()
const executor = new ToolExecutor(registry)
const adapter = mockAdapter([textResponse('hi')])
const runner = new AgentRunner(adapter, registry, executor, {
model: 'test-model',
})
const result = await runner.run(
[{ role: 'user', content: [{ type: 'text', text: 'test' }] }],
{},
)
expect(result.output).toBe('hi')
})
})
// ---------------------------------------------------------------------------
// Agent-level trace events
// ---------------------------------------------------------------------------
describe('Agent trace events', () => {
it('emits agent trace with turns, tokens, and toolCalls', async () => {
const traces: TraceEvent[] = []
const config: AgentConfig = {
name: 'my-agent',
model: 'mock-model',
systemPrompt: 'You are a test.',
}
const agent = buildMockAgent(config, [textResponse('Hello world')])
const runOptions: Partial<RunOptions> = {
onTrace: (e) => traces.push(e),
runId: 'run-agent-1',
traceAgent: 'my-agent',
}
const result = await agent.run('Say hello', runOptions)
expect(result.success).toBe(true)
const agentTraces = traces.filter(t => t.type === 'agent')
expect(agentTraces).toHaveLength(1)
const at = agentTraces[0]!
expect(at.type).toBe('agent')
expect(at.runId).toBe('run-agent-1')
expect(at.agent).toBe('my-agent')
expect(at.turns).toBe(1) // one assistant message
expect(at.tokens).toEqual({ input_tokens: 10, output_tokens: 20 })
expect(at.toolCalls).toBe(0)
expect(at.durationMs).toBeGreaterThanOrEqual(0)
})
it('all traces share the same runId', async () => {
const traces: TraceEvent[] = []
const registry = new ToolRegistry()
registry.register(
defineTool({
name: 'greet',
description: 'greets',
inputSchema: z.object({ name: z.string() }),
execute: async ({ name }) => ({ data: `Hi ${name}` }),
}),
)
const executor = new ToolExecutor(registry)
const config: AgentConfig = {
name: 'multi-trace-agent',
model: 'mock-model',
tools: ['greet'],
}
const agent = buildMockAgent(
config,
[
toolUseResponse('greet', { name: 'world' }),
textResponse('Done'),
],
registry,
executor,
)
const runId = 'shared-run-id'
await agent.run('test', {
onTrace: (e) => traces.push(e),
runId,
traceAgent: 'multi-trace-agent',
})
// Should have: 2 llm_call, 1 tool_call, 1 agent
expect(traces.length).toBeGreaterThanOrEqual(4)
for (const trace of traces) {
expect(trace.runId).toBe(runId)
}
})
it('onTrace error does not break agent execution', async () => {
const config: AgentConfig = {
name: 'resilient-agent',
model: 'mock-model',
}
const agent = buildMockAgent(config, [textResponse('OK')])
const result = await agent.run('test', {
onTrace: () => { throw new Error('callback exploded') },
runId: 'run-err',
traceAgent: 'resilient-agent',
})
// The run should still succeed despite the broken callback
expect(result.success).toBe(true)
expect(result.output).toBe('OK')
})
it('per-turn token usage in llm_call traces', async () => {
const traces: TraceEvent[] = []
const registry = new ToolRegistry()
registry.register(
defineTool({
name: 'noop',
description: 'noop',
inputSchema: z.object({}),
execute: async () => ({ data: 'ok' }),
}),
)
const executor = new ToolExecutor(registry)
// Two LLM calls: first triggers a tool, second is the final response
const resp1: LLMResponse = {
id: 'r1',
content: [{ type: 'tool_use', id: 'tu1', name: 'noop', input: {} }],
model: 'mock-model',
stop_reason: 'tool_use',
usage: { input_tokens: 100, output_tokens: 50 },
}
const resp2: LLMResponse = {
id: 'r2',
content: [{ type: 'text', text: 'Final answer' }],
model: 'mock-model',
stop_reason: 'end_turn',
usage: { input_tokens: 200, output_tokens: 100 },
}
const adapter = mockAdapter([resp1, resp2])
const runner = new AgentRunner(adapter, registry, executor, {
model: 'mock-model',
agentName: 'token-agent',
})
await runner.run(
[{ role: 'user', content: [{ type: 'text', text: 'go' }] }],
{ onTrace: (e) => traces.push(e), runId: 'run-tok', traceAgent: 'token-agent' },
)
const llmTraces = traces.filter(t => t.type === 'llm_call')
expect(llmTraces).toHaveLength(2)
// Each trace carries its own turn's token usage, not the aggregate
expect(llmTraces[0]!.tokens).toEqual({ input_tokens: 100, output_tokens: 50 })
expect(llmTraces[1]!.tokens).toEqual({ input_tokens: 200, output_tokens: 100 })
// Turn numbers should be sequential
expect(llmTraces[0]!.turn).toBe(1)
expect(llmTraces[1]!.turn).toBe(2)
})
})