feat: implement token budget management in agent and orchestrator

This commit is contained in:
MrAvalonApple 2026-04-06 17:20:49 +03:00
parent 1e3bd1013e
commit 44f2ad2fe4
7 changed files with 337 additions and 3 deletions

View File

@ -150,6 +150,7 @@ export class Agent {
agentName: this.name,
agentRole: this.config.systemPrompt?.slice(0, 50) ?? 'assistant',
loopDetection: this.config.loopDetection,
maxTokenBudget: this.config.maxTokenBudget,
}
this.runner = new AgentRunner(
@ -328,6 +329,16 @@ export class Agent {
const result = await runner.run(messages, runOptions)
this.state.tokenUsage = addUsage(this.state.tokenUsage, result.tokenUsage)
if (result.budgetExceeded) {
let budgetResult = this.toAgentRunResult(result, false)
if (this.config.afterRun) {
budgetResult = await this.config.afterRun(budgetResult)
}
this.transitionTo('completed')
this.emitAgentTrace(callerOptions, agentStartMs, budgetResult)
return budgetResult
}
// --- Structured output validation ---
if (this.config.outputSchema) {
let validated = await this.validateStructuredOutput(
@ -461,6 +472,7 @@ export class Agent {
tokenUsage: mergedTokenUsage,
toolCalls: mergedToolCalls,
structured: validated,
...(retryResult.budgetExceeded ? { budgetExceeded: true } : {}),
}
} catch {
// Retry also failed
@ -472,6 +484,7 @@ export class Agent {
tokenUsage: mergedTokenUsage,
toolCalls: mergedToolCalls,
structured: undefined,
...(retryResult.budgetExceeded ? { budgetExceeded: true } : {}),
}
}
}
@ -502,7 +515,7 @@ export class Agent {
const result = event.data as import('./runner.js').RunResult
this.state.tokenUsage = addUsage(this.state.tokenUsage, result.tokenUsage)
let agentResult = this.toAgentRunResult(result, true)
let agentResult = this.toAgentRunResult(result, !result.budgetExceeded)
if (this.config.afterRun) {
agentResult = await this.config.afterRun(agentResult)
}
@ -598,6 +611,7 @@ export class Agent {
toolCalls: result.toolCalls,
structured,
...(result.loopDetected ? { loopDetected: true } : {}),
...(result.budgetExceeded ? { budgetExceeded: true } : {}),
}
}

View File

@ -29,6 +29,7 @@ import type {
LoopDetectionConfig,
LoopDetectionInfo,
} from '../types.js'
import { TokenBudgetExceededError } from '../errors.js'
import { LoopDetector } from './loop-detector.js'
import { emitTrace } from '../utils/trace.js'
import type { ToolRegistry } from '../tool/framework.js'
@ -70,6 +71,8 @@ export interface RunnerOptions {
readonly agentRole?: string
/** Loop detection configuration. When set, detects stuck agent loops. */
readonly loopDetection?: LoopDetectionConfig
/** Maximum cumulative tokens (input + output) allowed for this run. */
readonly maxTokenBudget?: number
}
/**
@ -117,6 +120,8 @@ export interface RunResult {
readonly turns: number
/** True when the run was terminated or warned due to loop detection. */
readonly loopDetected?: boolean
/** True when the run was terminated due to token budget limits. */
readonly budgetExceeded?: boolean
}
// ---------------------------------------------------------------------------
@ -232,6 +237,7 @@ export class AgentRunner {
const allToolCalls: ToolCallRecord[] = []
let finalOutput = ''
let turns = 0
let budgetExceeded = false
// Build the stable LLM options once; model / tokens / temp don't change.
// toToolDefs() returns LLMToolDef[] (inputSchema, camelCase) — matches
@ -300,6 +306,20 @@ export class AgentRunner {
}
totalUsage = addTokenUsage(totalUsage, response.usage)
const totalTokens = totalUsage.input_tokens + totalUsage.output_tokens
if (this.options.maxTokenBudget !== undefined && totalTokens > this.options.maxTokenBudget) {
budgetExceeded = true
finalOutput = extractText(response.content)
yield {
type: 'error',
data: new TokenBudgetExceededError(
this.options.agentName ?? 'unknown',
totalTokens,
this.options.maxTokenBudget,
),
} satisfies StreamEvent
break
}
// ------------------------------------------------------------------
// Step 2: Build the assistant message from the response content.
@ -516,6 +536,7 @@ export class AgentRunner {
tokenUsage: totalUsage,
turns,
...(loopDetected ? { loopDetected: true } : {}),
...(budgetExceeded ? { budgetExceeded: true } : {}),
}
yield { type: 'done', data: runResult } satisfies StreamEvent

19
src/errors.ts Normal file
View File

@ -0,0 +1,19 @@
/**
* @fileoverview Framework-specific error classes.
*/
/**
* Raised when an agent or orchestrator run exceeds its configured token budget.
*/
export class TokenBudgetExceededError extends Error {
readonly code = 'TOKEN_BUDGET_EXCEEDED'
constructor(
readonly agent: string,
readonly tokensUsed: number,
readonly budget: number,
) {
super(`Agent "${agent}" exceeded token budget: ${tokensUsed} tokens used (budget: ${budget})`)
this.name = 'TokenBudgetExceededError'
}
}

View File

@ -107,6 +107,7 @@ export {
export { createAdapter } from './llm/adapter.js'
export type { SupportedProvider } from './llm/adapter.js'
export { TokenBudgetExceededError } from './errors.js'
// ---------------------------------------------------------------------------
// Memory

View File

@ -63,6 +63,7 @@ import { Team } from '../team/team.js'
import { TaskQueue } from '../task/queue.js'
import { createTask } from '../task/task.js'
import { Scheduler } from './scheduler.js'
import { TokenBudgetExceededError } from '../errors.js'
// ---------------------------------------------------------------------------
// Internal constants
@ -83,6 +84,12 @@ function addUsage(a: TokenUsage, b: TokenUsage): TokenUsage {
}
}
function resolveTokenBudget(primary?: number, fallback?: number): number | undefined {
if (primary === undefined) return fallback
if (fallback === undefined) return primary
return Math.min(primary, fallback)
}
/**
* Build a minimal {@link Agent} with its own fresh registry/executor.
* Registers all built-in tools so coordinator/worker agents can use them.
@ -266,6 +273,9 @@ interface RunContext {
readonly runId?: string
/** AbortSignal for run-level cancellation. Checked between task dispatch rounds. */
readonly abortSignal?: AbortSignal
cumulativeUsage: TokenUsage
readonly maxTokenBudget?: number
budgetExceededTriggered: boolean
}
/**
@ -409,6 +419,23 @@ async function executeQueue(
}
ctx.agentResults.set(`${assignee}:${task.id}`, result)
ctx.cumulativeUsage = addUsage(ctx.cumulativeUsage, result.tokenUsage)
const totalTokens = ctx.cumulativeUsage.input_tokens + ctx.cumulativeUsage.output_tokens
if (
!ctx.budgetExceededTriggered
&& ctx.maxTokenBudget !== undefined
&& totalTokens > ctx.maxTokenBudget
) {
ctx.budgetExceededTriggered = true
const err = new TokenBudgetExceededError('orchestrator', totalTokens, ctx.maxTokenBudget)
config.onProgress?.({
type: 'budget_exceeded',
agent: assignee,
task: task.id,
data: err,
} satisfies OrchestratorEvent)
queue.skipRemaining(err.message)
}
if (result.success) {
// Persist result into shared memory so other agents can read it
@ -446,6 +473,9 @@ async function executeQueue(
// Wait for the entire parallel batch before checking for newly-unblocked tasks.
await Promise.all(dispatchPromises)
if (ctx.budgetExceededTriggered) {
break
}
// --- Approval gate ---
// After the batch completes, check if the caller wants to approve
@ -524,8 +554,8 @@ async function buildTaskPrompt(task: Task, team: Team): Promise<string> {
*/
export class OpenMultiAgent {
private readonly config: Required<
Omit<OrchestratorConfig, 'onApproval' | 'onProgress' | 'onTrace' | 'defaultBaseURL' | 'defaultApiKey'>
> & Pick<OrchestratorConfig, 'onApproval' | 'onProgress' | 'onTrace' | 'defaultBaseURL' | 'defaultApiKey'>
Omit<OrchestratorConfig, 'onApproval' | 'onProgress' | 'onTrace' | 'defaultBaseURL' | 'defaultApiKey' | 'maxTokenBudget'>
> & Pick<OrchestratorConfig, 'onApproval' | 'onProgress' | 'onTrace' | 'defaultBaseURL' | 'defaultApiKey' | 'maxTokenBudget'>
private readonly teams: Map<string, Team> = new Map()
private completedTaskCount = 0
@ -545,6 +575,7 @@ export class OpenMultiAgent {
defaultProvider: config.defaultProvider ?? 'anthropic',
defaultBaseURL: config.defaultBaseURL,
defaultApiKey: config.defaultApiKey,
maxTokenBudget: config.maxTokenBudget,
onApproval: config.onApproval,
onProgress: config.onProgress,
onTrace: config.onTrace,
@ -592,11 +623,13 @@ export class OpenMultiAgent {
* @param prompt - The user prompt to send.
*/
async runAgent(config: AgentConfig, prompt: string): Promise<AgentRunResult> {
const effectiveBudget = resolveTokenBudget(config.maxTokenBudget, this.config.maxTokenBudget)
const effective: AgentConfig = {
...config,
provider: config.provider ?? this.config.defaultProvider,
baseURL: config.baseURL ?? this.config.defaultBaseURL,
apiKey: config.apiKey ?? this.config.defaultApiKey,
maxTokenBudget: effectiveBudget,
}
const agent = buildAgent(effective)
this.config.onProgress?.({
@ -611,6 +644,18 @@ export class OpenMultiAgent {
const result = await agent.run(prompt, traceOptions)
if (result.budgetExceeded) {
this.config.onProgress?.({
type: 'budget_exceeded',
agent: config.name,
data: new TokenBudgetExceededError(
config.name,
result.tokenUsage.input_tokens + result.tokenUsage.output_tokens,
effectiveBudget ?? 0,
),
})
}
this.config.onProgress?.({
type: 'agent_complete',
agent: config.name,
@ -681,6 +726,24 @@ export class OpenMultiAgent {
const decompositionResult = await coordinatorAgent.run(decompositionPrompt, decompTraceOptions)
const agentResults = new Map<string, AgentRunResult>()
agentResults.set('coordinator:decompose', decompositionResult)
const maxTokenBudget = this.config.maxTokenBudget
let cumulativeUsage = addUsage(ZERO_USAGE, decompositionResult.tokenUsage)
if (
maxTokenBudget !== undefined
&& cumulativeUsage.input_tokens + cumulativeUsage.output_tokens > maxTokenBudget
) {
this.config.onProgress?.({
type: 'budget_exceeded',
agent: 'coordinator',
data: new TokenBudgetExceededError(
'coordinator',
cumulativeUsage.input_tokens + cumulativeUsage.output_tokens,
maxTokenBudget,
),
})
return this.buildTeamRunResult(agentResults)
}
// ------------------------------------------------------------------
// Step 2: Parse tasks from coordinator output
@ -724,19 +787,44 @@ export class OpenMultiAgent {
config: this.config,
runId,
abortSignal: options?.abortSignal,
cumulativeUsage,
maxTokenBudget,
budgetExceededTriggered: false,
}
await executeQueue(queue, ctx)
cumulativeUsage = ctx.cumulativeUsage
// ------------------------------------------------------------------
// Step 5: Coordinator synthesises final result
// ------------------------------------------------------------------
if (
maxTokenBudget !== undefined
&& cumulativeUsage.input_tokens + cumulativeUsage.output_tokens > maxTokenBudget
) {
return this.buildTeamRunResult(agentResults)
}
const synthesisPrompt = await this.buildSynthesisPrompt(goal, queue.list(), team)
const synthTraceOptions: Partial<RunOptions> | undefined = this.config.onTrace
? { onTrace: this.config.onTrace, runId: runId ?? '', traceAgent: 'coordinator' }
: undefined
const synthesisResult = await coordinatorAgent.run(synthesisPrompt, synthTraceOptions)
agentResults.set('coordinator', synthesisResult)
cumulativeUsage = addUsage(cumulativeUsage, synthesisResult.tokenUsage)
if (
maxTokenBudget !== undefined
&& cumulativeUsage.input_tokens + cumulativeUsage.output_tokens > maxTokenBudget
) {
this.config.onProgress?.({
type: 'budget_exceeded',
agent: 'coordinator',
data: new TokenBudgetExceededError(
'coordinator',
cumulativeUsage.input_tokens + cumulativeUsage.output_tokens,
maxTokenBudget,
),
})
}
this.config.onProgress?.({
type: 'agent_complete',
@ -808,6 +896,9 @@ export class OpenMultiAgent {
config: this.config,
runId: this.config.onTrace ? generateRunId() : undefined,
abortSignal: options?.abortSignal,
cumulativeUsage: ZERO_USAGE,
maxTokenBudget: this.config.maxTokenBudget,
budgetExceededTriggered: false,
}
await executeQueue(queue, ctx)

View File

@ -208,6 +208,8 @@ export interface AgentConfig {
readonly tools?: readonly string[]
readonly maxTurns?: number
readonly maxTokens?: number
/** Maximum cumulative tokens (input + output) allowed for this run. */
readonly maxTokenBudget?: number
readonly temperature?: number
/**
* Maximum wall-clock time (in milliseconds) for the entire agent run.
@ -307,6 +309,8 @@ export interface AgentRunResult {
readonly structured?: unknown
/** True when the run was terminated or warned due to loop detection. */
readonly loopDetected?: boolean
/** True when the run stopped because token budget was exceeded. */
readonly budgetExceeded?: boolean
}
// ---------------------------------------------------------------------------
@ -375,6 +379,7 @@ export interface OrchestratorEvent {
| 'task_complete'
| 'task_skipped'
| 'task_retry'
| 'budget_exceeded'
| 'message'
| 'error'
readonly agent?: string
@ -385,6 +390,8 @@ export interface OrchestratorEvent {
/** Top-level configuration for the orchestrator. */
export interface OrchestratorConfig {
readonly maxConcurrency?: number
/** Maximum cumulative tokens (input + output) allowed per orchestrator run. */
readonly maxTokenBudget?: number
readonly defaultModel?: string
readonly defaultProvider?: 'anthropic' | 'copilot' | 'grok' | 'openai' | 'gemini'
readonly defaultBaseURL?: string

181
tests/token-budget.test.ts Normal file
View File

@ -0,0 +1,181 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'
import { OpenMultiAgent } from '../src/orchestrator/orchestrator.js'
import type { AgentConfig, LLMChatOptions, LLMMessage, LLMResponse, OrchestratorEvent } from '../src/types.js'
let mockAdapterResponses: string[] = []
let mockAdapterUsage: Array<{ input_tokens: number; output_tokens: number }> = []
vi.mock('../src/llm/adapter.js', () => ({
createAdapter: async () => {
let callIndex = 0
return {
name: 'mock',
async chat(_msgs: LLMMessage[], options: LLMChatOptions): Promise<LLMResponse> {
const text = mockAdapterResponses[callIndex] ?? 'default mock response'
const usage = mockAdapterUsage[callIndex] ?? { input_tokens: 10, output_tokens: 20 }
callIndex++
return {
id: `resp-${callIndex}`,
content: [{ type: 'text', text }],
model: options.model ?? 'mock-model',
stop_reason: 'end_turn',
usage,
}
},
async *stream() {
yield { type: 'done' as const, data: {} }
},
}
},
}))
function agentConfig(name: string, maxTokenBudget?: number): AgentConfig {
return {
name,
model: 'mock-model',
provider: 'openai',
systemPrompt: `You are ${name}.`,
maxTokenBudget,
}
}
describe('token budget enforcement', () => {
beforeEach(() => {
mockAdapterResponses = []
mockAdapterUsage = []
})
it('enforces agent-level maxTokenBudget in runAgent', async () => {
mockAdapterResponses = ['over budget']
mockAdapterUsage = [{ input_tokens: 20, output_tokens: 15 }]
const events: OrchestratorEvent[] = []
const oma = new OpenMultiAgent({
defaultModel: 'mock-model',
onProgress: e => events.push(e),
})
const result = await oma.runAgent(agentConfig('solo', 30), 'test')
expect(result.success).toBe(false)
expect(result.budgetExceeded).toBe(true)
expect(events.some(e => e.type === 'budget_exceeded')).toBe(true)
})
it('does not trigger budget events when budget is not exceeded', async () => {
mockAdapterResponses = ['done-a', 'done-b']
mockAdapterUsage = [
{ input_tokens: 10, output_tokens: 10 },
{ input_tokens: 10, output_tokens: 10 },
]
const events: OrchestratorEvent[] = []
const oma = new OpenMultiAgent({
defaultModel: 'mock-model',
maxTokenBudget: 100,
onProgress: e => events.push(e),
})
const team = oma.createTeam('team-a', {
name: 'team-a',
agents: [agentConfig('worker-a'), agentConfig('worker-b')],
sharedMemory: false,
})
const result = await oma.runTasks(team, [
{ title: 'A', description: 'Do A', assignee: 'worker-a' },
{ title: 'B', description: 'Do B', assignee: 'worker-b', dependsOn: ['A'] },
])
expect(result.success).toBe(true)
expect(events.some(e => e.type === 'budget_exceeded')).toBe(false)
})
it('enforces team budget in runTasks and skips remaining tasks', async () => {
mockAdapterResponses = ['done-a', 'done-b', 'done-c']
mockAdapterUsage = [
{ input_tokens: 20, output_tokens: 15 }, // A => 35
{ input_tokens: 20, output_tokens: 15 }, // B => 70 total (exceeds 60)
{ input_tokens: 20, output_tokens: 15 }, // C should not run
]
const events: OrchestratorEvent[] = []
const oma = new OpenMultiAgent({
defaultModel: 'mock-model',
maxTokenBudget: 60,
onProgress: e => events.push(e),
})
const team = oma.createTeam('team-b', {
name: 'team-b',
agents: [agentConfig('worker')],
sharedMemory: false,
})
const result = await oma.runTasks(team, [
{ title: 'A', description: 'A', assignee: 'worker' },
{ title: 'B', description: 'B', assignee: 'worker', dependsOn: ['A'] },
{ title: 'C', description: 'C', assignee: 'worker', dependsOn: ['B'] },
])
expect(result.totalTokenUsage.input_tokens + result.totalTokenUsage.output_tokens).toBe(70)
expect(events.some(e => e.type === 'budget_exceeded')).toBe(true)
expect(events.some(e => e.type === 'task_skipped')).toBe(true)
})
it('counts retry token usage before enforcing team budget', async () => {
mockAdapterResponses = ['attempt-1', 'attempt-2', 'should-skip']
mockAdapterUsage = [
{ input_tokens: 20, output_tokens: 15 }, // attempt 1
{ input_tokens: 20, output_tokens: 15 }, // attempt 2
{ input_tokens: 20, output_tokens: 15 }, // next task (should skip)
]
const events: OrchestratorEvent[] = []
const oma = new OpenMultiAgent({
defaultModel: 'mock-model',
maxTokenBudget: 50,
onProgress: e => events.push(e),
})
const team = oma.createTeam('team-c', {
name: 'team-c',
agents: [agentConfig('retry-worker', 1)],
sharedMemory: false,
})
const result = await oma.runTasks(team, [
{ title: 'Retrying task', description: 'Will exceed internal budget', assignee: 'retry-worker', maxRetries: 1 },
{ title: 'Later task', description: 'Should be skipped', assignee: 'retry-worker', dependsOn: ['Retrying task'] },
])
expect(result.totalTokenUsage.input_tokens + result.totalTokenUsage.output_tokens).toBe(70)
expect(events.some(e => e.type === 'budget_exceeded')).toBe(true)
expect(events.some(e => e.type === 'task_skipped')).toBe(true)
})
it('enforces orchestrator budget in runTeam', async () => {
mockAdapterResponses = [
'```json\n[{"title":"Task A","description":"Do A","assignee":"worker"}]\n```',
'worker result',
'synthesis should not run when budget exceeded',
]
mockAdapterUsage = [
{ input_tokens: 20, output_tokens: 15 }, // decomposition => 35
{ input_tokens: 20, output_tokens: 15 }, // task => 70 total (exceeds 60)
{ input_tokens: 20, output_tokens: 15 }, // synthesis should not execute
]
const events: OrchestratorEvent[] = []
const oma = new OpenMultiAgent({
defaultModel: 'mock-model',
maxTokenBudget: 60,
onProgress: e => events.push(e),
})
const team = oma.createTeam('team-d', {
name: 'team-d',
agents: [agentConfig('worker')],
sharedMemory: false,
})
const result = await oma.runTeam(team, 'Do work')
expect(result.totalTokenUsage.input_tokens + result.totalTokenUsage.output_tokens).toBe(70)
expect(events.some(e => e.type === 'budget_exceeded')).toBe(true)
})
})