From 44f2ad2fe42930805c0fa10ab07899fdc3064b06 Mon Sep 17 00:00:00 2001 From: MrAvalonApple <74775400+ibrahimkazimov@users.noreply.github.com> Date: Mon, 6 Apr 2026 17:20:49 +0300 Subject: [PATCH] feat: implement token budget management in agent and orchestrator --- src/agent/agent.ts | 16 ++- src/agent/runner.ts | 21 ++++ src/errors.ts | 19 ++++ src/index.ts | 1 + src/orchestrator/orchestrator.ts | 95 +++++++++++++++- src/types.ts | 7 ++ tests/token-budget.test.ts | 181 +++++++++++++++++++++++++++++++ 7 files changed, 337 insertions(+), 3 deletions(-) create mode 100644 src/errors.ts create mode 100644 tests/token-budget.test.ts diff --git a/src/agent/agent.ts b/src/agent/agent.ts index 904b5bc..8b02641 100644 --- a/src/agent/agent.ts +++ b/src/agent/agent.ts @@ -150,6 +150,7 @@ export class Agent { agentName: this.name, agentRole: this.config.systemPrompt?.slice(0, 50) ?? 'assistant', loopDetection: this.config.loopDetection, + maxTokenBudget: this.config.maxTokenBudget, } this.runner = new AgentRunner( @@ -328,6 +329,16 @@ export class Agent { const result = await runner.run(messages, runOptions) this.state.tokenUsage = addUsage(this.state.tokenUsage, result.tokenUsage) + if (result.budgetExceeded) { + let budgetResult = this.toAgentRunResult(result, false) + if (this.config.afterRun) { + budgetResult = await this.config.afterRun(budgetResult) + } + this.transitionTo('completed') + this.emitAgentTrace(callerOptions, agentStartMs, budgetResult) + return budgetResult + } + // --- Structured output validation --- if (this.config.outputSchema) { let validated = await this.validateStructuredOutput( @@ -461,6 +472,7 @@ export class Agent { tokenUsage: mergedTokenUsage, toolCalls: mergedToolCalls, structured: validated, + ...(retryResult.budgetExceeded ? { budgetExceeded: true } : {}), } } catch { // Retry also failed @@ -472,6 +484,7 @@ export class Agent { tokenUsage: mergedTokenUsage, toolCalls: mergedToolCalls, structured: undefined, + ...(retryResult.budgetExceeded ? { budgetExceeded: true } : {}), } } } @@ -502,7 +515,7 @@ export class Agent { const result = event.data as import('./runner.js').RunResult this.state.tokenUsage = addUsage(this.state.tokenUsage, result.tokenUsage) - let agentResult = this.toAgentRunResult(result, true) + let agentResult = this.toAgentRunResult(result, !result.budgetExceeded) if (this.config.afterRun) { agentResult = await this.config.afterRun(agentResult) } @@ -598,6 +611,7 @@ export class Agent { toolCalls: result.toolCalls, structured, ...(result.loopDetected ? { loopDetected: true } : {}), + ...(result.budgetExceeded ? { budgetExceeded: true } : {}), } } diff --git a/src/agent/runner.ts b/src/agent/runner.ts index f7f1d6a..aa66f5f 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -29,6 +29,7 @@ import type { LoopDetectionConfig, LoopDetectionInfo, } from '../types.js' +import { TokenBudgetExceededError } from '../errors.js' import { LoopDetector } from './loop-detector.js' import { emitTrace } from '../utils/trace.js' import type { ToolRegistry } from '../tool/framework.js' @@ -70,6 +71,8 @@ export interface RunnerOptions { readonly agentRole?: string /** Loop detection configuration. When set, detects stuck agent loops. */ readonly loopDetection?: LoopDetectionConfig + /** Maximum cumulative tokens (input + output) allowed for this run. */ + readonly maxTokenBudget?: number } /** @@ -117,6 +120,8 @@ export interface RunResult { readonly turns: number /** True when the run was terminated or warned due to loop detection. */ readonly loopDetected?: boolean + /** True when the run was terminated due to token budget limits. */ + readonly budgetExceeded?: boolean } // --------------------------------------------------------------------------- @@ -232,6 +237,7 @@ export class AgentRunner { const allToolCalls: ToolCallRecord[] = [] let finalOutput = '' let turns = 0 + let budgetExceeded = false // Build the stable LLM options once; model / tokens / temp don't change. // toToolDefs() returns LLMToolDef[] (inputSchema, camelCase) — matches @@ -300,6 +306,20 @@ export class AgentRunner { } totalUsage = addTokenUsage(totalUsage, response.usage) + const totalTokens = totalUsage.input_tokens + totalUsage.output_tokens + if (this.options.maxTokenBudget !== undefined && totalTokens > this.options.maxTokenBudget) { + budgetExceeded = true + finalOutput = extractText(response.content) + yield { + type: 'error', + data: new TokenBudgetExceededError( + this.options.agentName ?? 'unknown', + totalTokens, + this.options.maxTokenBudget, + ), + } satisfies StreamEvent + break + } // ------------------------------------------------------------------ // Step 2: Build the assistant message from the response content. @@ -516,6 +536,7 @@ export class AgentRunner { tokenUsage: totalUsage, turns, ...(loopDetected ? { loopDetected: true } : {}), + ...(budgetExceeded ? { budgetExceeded: true } : {}), } yield { type: 'done', data: runResult } satisfies StreamEvent diff --git a/src/errors.ts b/src/errors.ts new file mode 100644 index 0000000..be7ac67 --- /dev/null +++ b/src/errors.ts @@ -0,0 +1,19 @@ +/** + * @fileoverview Framework-specific error classes. + */ + +/** + * Raised when an agent or orchestrator run exceeds its configured token budget. + */ +export class TokenBudgetExceededError extends Error { + readonly code = 'TOKEN_BUDGET_EXCEEDED' + + constructor( + readonly agent: string, + readonly tokensUsed: number, + readonly budget: number, + ) { + super(`Agent "${agent}" exceeded token budget: ${tokensUsed} tokens used (budget: ${budget})`) + this.name = 'TokenBudgetExceededError' + } +} diff --git a/src/index.ts b/src/index.ts index 115ab2e..20d8d1a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -107,6 +107,7 @@ export { export { createAdapter } from './llm/adapter.js' export type { SupportedProvider } from './llm/adapter.js' +export { TokenBudgetExceededError } from './errors.js' // --------------------------------------------------------------------------- // Memory diff --git a/src/orchestrator/orchestrator.ts b/src/orchestrator/orchestrator.ts index 1526827..0373a3f 100644 --- a/src/orchestrator/orchestrator.ts +++ b/src/orchestrator/orchestrator.ts @@ -63,6 +63,7 @@ import { Team } from '../team/team.js' import { TaskQueue } from '../task/queue.js' import { createTask } from '../task/task.js' import { Scheduler } from './scheduler.js' +import { TokenBudgetExceededError } from '../errors.js' // --------------------------------------------------------------------------- // Internal constants @@ -83,6 +84,12 @@ function addUsage(a: TokenUsage, b: TokenUsage): TokenUsage { } } +function resolveTokenBudget(primary?: number, fallback?: number): number | undefined { + if (primary === undefined) return fallback + if (fallback === undefined) return primary + return Math.min(primary, fallback) +} + /** * Build a minimal {@link Agent} with its own fresh registry/executor. * Registers all built-in tools so coordinator/worker agents can use them. @@ -266,6 +273,9 @@ interface RunContext { readonly runId?: string /** AbortSignal for run-level cancellation. Checked between task dispatch rounds. */ readonly abortSignal?: AbortSignal + cumulativeUsage: TokenUsage + readonly maxTokenBudget?: number + budgetExceededTriggered: boolean } /** @@ -409,6 +419,23 @@ async function executeQueue( } ctx.agentResults.set(`${assignee}:${task.id}`, result) + ctx.cumulativeUsage = addUsage(ctx.cumulativeUsage, result.tokenUsage) + const totalTokens = ctx.cumulativeUsage.input_tokens + ctx.cumulativeUsage.output_tokens + if ( + !ctx.budgetExceededTriggered + && ctx.maxTokenBudget !== undefined + && totalTokens > ctx.maxTokenBudget + ) { + ctx.budgetExceededTriggered = true + const err = new TokenBudgetExceededError('orchestrator', totalTokens, ctx.maxTokenBudget) + config.onProgress?.({ + type: 'budget_exceeded', + agent: assignee, + task: task.id, + data: err, + } satisfies OrchestratorEvent) + queue.skipRemaining(err.message) + } if (result.success) { // Persist result into shared memory so other agents can read it @@ -446,6 +473,9 @@ async function executeQueue( // Wait for the entire parallel batch before checking for newly-unblocked tasks. await Promise.all(dispatchPromises) + if (ctx.budgetExceededTriggered) { + break + } // --- Approval gate --- // After the batch completes, check if the caller wants to approve @@ -524,8 +554,8 @@ async function buildTaskPrompt(task: Task, team: Team): Promise { */ export class OpenMultiAgent { private readonly config: Required< - Omit - > & Pick + Omit + > & Pick private readonly teams: Map = new Map() private completedTaskCount = 0 @@ -545,6 +575,7 @@ export class OpenMultiAgent { defaultProvider: config.defaultProvider ?? 'anthropic', defaultBaseURL: config.defaultBaseURL, defaultApiKey: config.defaultApiKey, + maxTokenBudget: config.maxTokenBudget, onApproval: config.onApproval, onProgress: config.onProgress, onTrace: config.onTrace, @@ -592,11 +623,13 @@ export class OpenMultiAgent { * @param prompt - The user prompt to send. */ async runAgent(config: AgentConfig, prompt: string): Promise { + const effectiveBudget = resolveTokenBudget(config.maxTokenBudget, this.config.maxTokenBudget) const effective: AgentConfig = { ...config, provider: config.provider ?? this.config.defaultProvider, baseURL: config.baseURL ?? this.config.defaultBaseURL, apiKey: config.apiKey ?? this.config.defaultApiKey, + maxTokenBudget: effectiveBudget, } const agent = buildAgent(effective) this.config.onProgress?.({ @@ -611,6 +644,18 @@ export class OpenMultiAgent { const result = await agent.run(prompt, traceOptions) + if (result.budgetExceeded) { + this.config.onProgress?.({ + type: 'budget_exceeded', + agent: config.name, + data: new TokenBudgetExceededError( + config.name, + result.tokenUsage.input_tokens + result.tokenUsage.output_tokens, + effectiveBudget ?? 0, + ), + }) + } + this.config.onProgress?.({ type: 'agent_complete', agent: config.name, @@ -681,6 +726,24 @@ export class OpenMultiAgent { const decompositionResult = await coordinatorAgent.run(decompositionPrompt, decompTraceOptions) const agentResults = new Map() agentResults.set('coordinator:decompose', decompositionResult) + const maxTokenBudget = this.config.maxTokenBudget + let cumulativeUsage = addUsage(ZERO_USAGE, decompositionResult.tokenUsage) + + if ( + maxTokenBudget !== undefined + && cumulativeUsage.input_tokens + cumulativeUsage.output_tokens > maxTokenBudget + ) { + this.config.onProgress?.({ + type: 'budget_exceeded', + agent: 'coordinator', + data: new TokenBudgetExceededError( + 'coordinator', + cumulativeUsage.input_tokens + cumulativeUsage.output_tokens, + maxTokenBudget, + ), + }) + return this.buildTeamRunResult(agentResults) + } // ------------------------------------------------------------------ // Step 2: Parse tasks from coordinator output @@ -724,19 +787,44 @@ export class OpenMultiAgent { config: this.config, runId, abortSignal: options?.abortSignal, + cumulativeUsage, + maxTokenBudget, + budgetExceededTriggered: false, } await executeQueue(queue, ctx) + cumulativeUsage = ctx.cumulativeUsage // ------------------------------------------------------------------ // Step 5: Coordinator synthesises final result // ------------------------------------------------------------------ + if ( + maxTokenBudget !== undefined + && cumulativeUsage.input_tokens + cumulativeUsage.output_tokens > maxTokenBudget + ) { + return this.buildTeamRunResult(agentResults) + } const synthesisPrompt = await this.buildSynthesisPrompt(goal, queue.list(), team) const synthTraceOptions: Partial | undefined = this.config.onTrace ? { onTrace: this.config.onTrace, runId: runId ?? '', traceAgent: 'coordinator' } : undefined const synthesisResult = await coordinatorAgent.run(synthesisPrompt, synthTraceOptions) agentResults.set('coordinator', synthesisResult) + cumulativeUsage = addUsage(cumulativeUsage, synthesisResult.tokenUsage) + if ( + maxTokenBudget !== undefined + && cumulativeUsage.input_tokens + cumulativeUsage.output_tokens > maxTokenBudget + ) { + this.config.onProgress?.({ + type: 'budget_exceeded', + agent: 'coordinator', + data: new TokenBudgetExceededError( + 'coordinator', + cumulativeUsage.input_tokens + cumulativeUsage.output_tokens, + maxTokenBudget, + ), + }) + } this.config.onProgress?.({ type: 'agent_complete', @@ -808,6 +896,9 @@ export class OpenMultiAgent { config: this.config, runId: this.config.onTrace ? generateRunId() : undefined, abortSignal: options?.abortSignal, + cumulativeUsage: ZERO_USAGE, + maxTokenBudget: this.config.maxTokenBudget, + budgetExceededTriggered: false, } await executeQueue(queue, ctx) diff --git a/src/types.ts b/src/types.ts index 3ffbfee..6cdaa92 100644 --- a/src/types.ts +++ b/src/types.ts @@ -208,6 +208,8 @@ export interface AgentConfig { readonly tools?: readonly string[] readonly maxTurns?: number readonly maxTokens?: number + /** Maximum cumulative tokens (input + output) allowed for this run. */ + readonly maxTokenBudget?: number readonly temperature?: number /** * Maximum wall-clock time (in milliseconds) for the entire agent run. @@ -307,6 +309,8 @@ export interface AgentRunResult { readonly structured?: unknown /** True when the run was terminated or warned due to loop detection. */ readonly loopDetected?: boolean + /** True when the run stopped because token budget was exceeded. */ + readonly budgetExceeded?: boolean } // --------------------------------------------------------------------------- @@ -375,6 +379,7 @@ export interface OrchestratorEvent { | 'task_complete' | 'task_skipped' | 'task_retry' + | 'budget_exceeded' | 'message' | 'error' readonly agent?: string @@ -385,6 +390,8 @@ export interface OrchestratorEvent { /** Top-level configuration for the orchestrator. */ export interface OrchestratorConfig { readonly maxConcurrency?: number + /** Maximum cumulative tokens (input + output) allowed per orchestrator run. */ + readonly maxTokenBudget?: number readonly defaultModel?: string readonly defaultProvider?: 'anthropic' | 'copilot' | 'grok' | 'openai' | 'gemini' readonly defaultBaseURL?: string diff --git a/tests/token-budget.test.ts b/tests/token-budget.test.ts new file mode 100644 index 0000000..3e47c21 --- /dev/null +++ b/tests/token-budget.test.ts @@ -0,0 +1,181 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { OpenMultiAgent } from '../src/orchestrator/orchestrator.js' +import type { AgentConfig, LLMChatOptions, LLMMessage, LLMResponse, OrchestratorEvent } from '../src/types.js' + +let mockAdapterResponses: string[] = [] +let mockAdapterUsage: Array<{ input_tokens: number; output_tokens: number }> = [] + +vi.mock('../src/llm/adapter.js', () => ({ + createAdapter: async () => { + let callIndex = 0 + return { + name: 'mock', + async chat(_msgs: LLMMessage[], options: LLMChatOptions): Promise { + const text = mockAdapterResponses[callIndex] ?? 'default mock response' + const usage = mockAdapterUsage[callIndex] ?? { input_tokens: 10, output_tokens: 20 } + callIndex++ + return { + id: `resp-${callIndex}`, + content: [{ type: 'text', text }], + model: options.model ?? 'mock-model', + stop_reason: 'end_turn', + usage, + } + }, + async *stream() { + yield { type: 'done' as const, data: {} } + }, + } + }, +})) + +function agentConfig(name: string, maxTokenBudget?: number): AgentConfig { + return { + name, + model: 'mock-model', + provider: 'openai', + systemPrompt: `You are ${name}.`, + maxTokenBudget, + } +} + +describe('token budget enforcement', () => { + beforeEach(() => { + mockAdapterResponses = [] + mockAdapterUsage = [] + }) + + it('enforces agent-level maxTokenBudget in runAgent', async () => { + mockAdapterResponses = ['over budget'] + mockAdapterUsage = [{ input_tokens: 20, output_tokens: 15 }] + + const events: OrchestratorEvent[] = [] + const oma = new OpenMultiAgent({ + defaultModel: 'mock-model', + onProgress: e => events.push(e), + }) + + const result = await oma.runAgent(agentConfig('solo', 30), 'test') + + expect(result.success).toBe(false) + expect(result.budgetExceeded).toBe(true) + expect(events.some(e => e.type === 'budget_exceeded')).toBe(true) + }) + + it('does not trigger budget events when budget is not exceeded', async () => { + mockAdapterResponses = ['done-a', 'done-b'] + mockAdapterUsage = [ + { input_tokens: 10, output_tokens: 10 }, + { input_tokens: 10, output_tokens: 10 }, + ] + const events: OrchestratorEvent[] = [] + const oma = new OpenMultiAgent({ + defaultModel: 'mock-model', + maxTokenBudget: 100, + onProgress: e => events.push(e), + }) + const team = oma.createTeam('team-a', { + name: 'team-a', + agents: [agentConfig('worker-a'), agentConfig('worker-b')], + sharedMemory: false, + }) + + const result = await oma.runTasks(team, [ + { title: 'A', description: 'Do A', assignee: 'worker-a' }, + { title: 'B', description: 'Do B', assignee: 'worker-b', dependsOn: ['A'] }, + ]) + + expect(result.success).toBe(true) + expect(events.some(e => e.type === 'budget_exceeded')).toBe(false) + }) + + it('enforces team budget in runTasks and skips remaining tasks', async () => { + mockAdapterResponses = ['done-a', 'done-b', 'done-c'] + mockAdapterUsage = [ + { input_tokens: 20, output_tokens: 15 }, // A => 35 + { input_tokens: 20, output_tokens: 15 }, // B => 70 total (exceeds 60) + { input_tokens: 20, output_tokens: 15 }, // C should not run + ] + + const events: OrchestratorEvent[] = [] + const oma = new OpenMultiAgent({ + defaultModel: 'mock-model', + maxTokenBudget: 60, + onProgress: e => events.push(e), + }) + const team = oma.createTeam('team-b', { + name: 'team-b', + agents: [agentConfig('worker')], + sharedMemory: false, + }) + + const result = await oma.runTasks(team, [ + { title: 'A', description: 'A', assignee: 'worker' }, + { title: 'B', description: 'B', assignee: 'worker', dependsOn: ['A'] }, + { title: 'C', description: 'C', assignee: 'worker', dependsOn: ['B'] }, + ]) + + expect(result.totalTokenUsage.input_tokens + result.totalTokenUsage.output_tokens).toBe(70) + expect(events.some(e => e.type === 'budget_exceeded')).toBe(true) + expect(events.some(e => e.type === 'task_skipped')).toBe(true) + }) + + it('counts retry token usage before enforcing team budget', async () => { + mockAdapterResponses = ['attempt-1', 'attempt-2', 'should-skip'] + mockAdapterUsage = [ + { input_tokens: 20, output_tokens: 15 }, // attempt 1 + { input_tokens: 20, output_tokens: 15 }, // attempt 2 + { input_tokens: 20, output_tokens: 15 }, // next task (should skip) + ] + + const events: OrchestratorEvent[] = [] + const oma = new OpenMultiAgent({ + defaultModel: 'mock-model', + maxTokenBudget: 50, + onProgress: e => events.push(e), + }) + const team = oma.createTeam('team-c', { + name: 'team-c', + agents: [agentConfig('retry-worker', 1)], + sharedMemory: false, + }) + + const result = await oma.runTasks(team, [ + { title: 'Retrying task', description: 'Will exceed internal budget', assignee: 'retry-worker', maxRetries: 1 }, + { title: 'Later task', description: 'Should be skipped', assignee: 'retry-worker', dependsOn: ['Retrying task'] }, + ]) + + expect(result.totalTokenUsage.input_tokens + result.totalTokenUsage.output_tokens).toBe(70) + expect(events.some(e => e.type === 'budget_exceeded')).toBe(true) + expect(events.some(e => e.type === 'task_skipped')).toBe(true) + }) + + it('enforces orchestrator budget in runTeam', async () => { + mockAdapterResponses = [ + '```json\n[{"title":"Task A","description":"Do A","assignee":"worker"}]\n```', + 'worker result', + 'synthesis should not run when budget exceeded', + ] + mockAdapterUsage = [ + { input_tokens: 20, output_tokens: 15 }, // decomposition => 35 + { input_tokens: 20, output_tokens: 15 }, // task => 70 total (exceeds 60) + { input_tokens: 20, output_tokens: 15 }, // synthesis should not execute + ] + + const events: OrchestratorEvent[] = [] + const oma = new OpenMultiAgent({ + defaultModel: 'mock-model', + maxTokenBudget: 60, + onProgress: e => events.push(e), + }) + const team = oma.createTeam('team-d', { + name: 'team-d', + agents: [agentConfig('worker')], + sharedMemory: false, + }) + + const result = await oma.runTeam(team, 'Do work') + expect(result.totalTokenUsage.input_tokens + result.totalTokenUsage.output_tokens).toBe(70) + expect(events.some(e => e.type === 'budget_exceeded')).toBe(true) + }) +})