From c563d6c836317e1edf303ba10accc3899cbcd62b Mon Sep 17 00:00:00 2001 From: MrAvalonApple <74775400+ibrahimkazimov@users.noreply.github.com> Date: Mon, 6 Apr 2026 18:06:58 +0300 Subject: [PATCH] feat: add budget exceeded event handling in agent and orchestrator --- src/agent/runner.ts | 30 ++++++++------- src/orchestrator/orchestrator.ts | 6 ++- src/types.ts | 3 +- tests/token-budget.test.ts | 66 +++++++++++++++++++++++++++++++- 4 files changed, 88 insertions(+), 17 deletions(-) diff --git a/src/agent/runner.ts b/src/agent/runner.ts index aa66f5f..4ca0975 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -222,6 +222,7 @@ export class AgentRunner { * - `{ type: 'text', data: string }` for each text delta * - `{ type: 'tool_use', data: ToolUseBlock }` when the model requests a tool * - `{ type: 'tool_result', data: ToolResultBlock }` after each execution + * - `{ type: 'budget_exceeded', data: TokenBudgetExceededError }` on budget trip * - `{ type: 'done', data: RunResult }` at the very end * - `{ type: 'error', data: Error }` on unrecoverable failure */ @@ -306,20 +307,6 @@ export class AgentRunner { } totalUsage = addTokenUsage(totalUsage, response.usage) - const totalTokens = totalUsage.input_tokens + totalUsage.output_tokens - if (this.options.maxTokenBudget !== undefined && totalTokens > this.options.maxTokenBudget) { - budgetExceeded = true - finalOutput = extractText(response.content) - yield { - type: 'error', - data: new TokenBudgetExceededError( - this.options.agentName ?? 'unknown', - totalTokens, - this.options.maxTokenBudget, - ), - } satisfies StreamEvent - break - } // ------------------------------------------------------------------ // Step 2: Build the assistant message from the response content. @@ -338,6 +325,21 @@ export class AgentRunner { yield { type: 'text', data: turnText } satisfies StreamEvent } + const totalTokens = totalUsage.input_tokens + totalUsage.output_tokens + if (this.options.maxTokenBudget !== undefined && totalTokens > this.options.maxTokenBudget) { + budgetExceeded = true + finalOutput = turnText + yield { + type: 'budget_exceeded', + data: new TokenBudgetExceededError( + this.options.agentName ?? 'unknown', + totalTokens, + this.options.maxTokenBudget, + ), + } satisfies StreamEvent + break + } + // Extract tool-use blocks for detection and execution. const toolUseBlocks = extractToolUseBlocks(response.content) diff --git a/src/orchestrator/orchestrator.ts b/src/orchestrator/orchestrator.ts index 0373a3f..8b0073d 100644 --- a/src/orchestrator/orchestrator.ts +++ b/src/orchestrator/orchestrator.ts @@ -276,6 +276,7 @@ interface RunContext { cumulativeUsage: TokenUsage readonly maxTokenBudget?: number budgetExceededTriggered: boolean + budgetExceededReason?: string } /** @@ -428,13 +429,13 @@ async function executeQueue( ) { ctx.budgetExceededTriggered = true const err = new TokenBudgetExceededError('orchestrator', totalTokens, ctx.maxTokenBudget) + ctx.budgetExceededReason = err.message config.onProgress?.({ type: 'budget_exceeded', agent: assignee, task: task.id, data: err, } satisfies OrchestratorEvent) - queue.skipRemaining(err.message) } if (result.success) { @@ -474,6 +475,7 @@ async function executeQueue( // Wait for the entire parallel batch before checking for newly-unblocked tasks. await Promise.all(dispatchPromises) if (ctx.budgetExceededTriggered) { + queue.skipRemaining(ctx.budgetExceededReason ?? 'Skipped: token budget exceeded.') break } @@ -790,6 +792,7 @@ export class OpenMultiAgent { cumulativeUsage, maxTokenBudget, budgetExceededTriggered: false, + budgetExceededReason: undefined, } await executeQueue(queue, ctx) @@ -899,6 +902,7 @@ export class OpenMultiAgent { cumulativeUsage: ZERO_USAGE, maxTokenBudget: this.config.maxTokenBudget, budgetExceededTriggered: false, + budgetExceededReason: undefined, } await executeQueue(queue, ctx) diff --git a/src/types.ts b/src/types.ts index 6cdaa92..e43bfbd 100644 --- a/src/types.ts +++ b/src/types.ts @@ -90,11 +90,12 @@ export interface LLMResponse { * - `text` — incremental text delta * - `tool_use` — the model has begun or completed a tool-use block * - `tool_result` — a tool result has been appended to the stream + * - `budget_exceeded` — token budget threshold reached for this run * - `done` — the stream has ended; `data` is the final {@link LLMResponse} * - `error` — an unrecoverable error occurred; `data` is an `Error` */ export interface StreamEvent { - readonly type: 'text' | 'tool_use' | 'tool_result' | 'loop_detected' | 'done' | 'error' + readonly type: 'text' | 'tool_use' | 'tool_result' | 'loop_detected' | 'budget_exceeded' | 'done' | 'error' readonly data: unknown } diff --git a/tests/token-budget.test.ts b/tests/token-budget.test.ts index 3e47c21..72cde15 100644 --- a/tests/token-budget.test.ts +++ b/tests/token-budget.test.ts @@ -1,5 +1,8 @@ import { describe, it, expect, vi, beforeEach } from 'vitest' import { OpenMultiAgent } from '../src/orchestrator/orchestrator.js' +import { Agent } from '../src/agent/agent.js' +import { ToolRegistry } from '../src/tool/framework.js' +import { ToolExecutor } from '../src/tool/executor.js' import type { AgentConfig, LLMChatOptions, LLMMessage, LLMResponse, OrchestratorEvent } from '../src/types.js' let mockAdapterResponses: string[] = [] @@ -59,9 +62,70 @@ describe('token budget enforcement', () => { expect(result.success).toBe(false) expect(result.budgetExceeded).toBe(true) + expect(result.messages).toHaveLength(1) + expect(result.messages[0]?.role).toBe('assistant') + expect(result.messages[0]?.content[0]).toMatchObject({ type: 'text', text: 'over budget' }) expect(events.some(e => e.type === 'budget_exceeded')).toBe(true) }) + it('emits budget_exceeded stream event without error transition', async () => { + mockAdapterResponses = ['over budget'] + mockAdapterUsage = [{ input_tokens: 20, output_tokens: 15 }] + + const agent = new Agent( + agentConfig('streamer', 30), + new ToolRegistry(), + new ToolExecutor(new ToolRegistry()), + ) + + const eventTypes: string[] = [] + for await (const event of agent.stream('test')) { + eventTypes.push(event.type) + } + + expect(eventTypes).toContain('budget_exceeded') + expect(eventTypes).toContain('done') + expect(eventTypes).not.toContain('error') + expect(agent.getState().status).toBe('completed') + }) + + it('does not skip in-progress sibling tasks when team budget is exceeded mid-batch', async () => { + mockAdapterResponses = ['done-a', 'done-b', 'done-c'] + mockAdapterUsage = [ + { input_tokens: 15, output_tokens: 10 }, // A => 25 + { input_tokens: 15, output_tokens: 10 }, // B => 50 total (exceeds 40) + { input_tokens: 15, output_tokens: 10 }, // C should never run + ] + + const events: OrchestratorEvent[] = [] + const oma = new OpenMultiAgent({ + defaultModel: 'mock-model', + maxTokenBudget: 40, + onProgress: e => events.push(e), + }) + const team = oma.createTeam('team-siblings', { + name: 'team-siblings', + agents: [agentConfig('worker-a'), agentConfig('worker-b')], + sharedMemory: false, + }) + + await oma.runTasks(team, [ + { title: 'Task A', description: 'A', assignee: 'worker-a' }, + { title: 'Task B', description: 'B', assignee: 'worker-b' }, + { title: 'Task C', description: 'C', assignee: 'worker-a', dependsOn: ['Task A'] }, + ]) + + const completedTaskIds = new Set( + events.filter(e => e.type === 'task_complete').map(e => e.task).filter(Boolean) as string[], + ) + const skippedTaskIds = new Set( + events.filter(e => e.type === 'task_skipped').map(e => e.task).filter(Boolean) as string[], + ) + + const overlap = [...completedTaskIds].filter(id => skippedTaskIds.has(id)) + expect(overlap).toHaveLength(0) + }) + it('does not trigger budget events when budget is not exceeded', async () => { mockAdapterResponses = ['done-a', 'done-b'] mockAdapterUsage = [ @@ -147,7 +211,7 @@ describe('token budget enforcement', () => { expect(result.totalTokenUsage.input_tokens + result.totalTokenUsage.output_tokens).toBe(70) expect(events.some(e => e.type === 'budget_exceeded')).toBe(true) - expect(events.some(e => e.type === 'task_skipped')).toBe(true) + expect(events.some(e => e.type === 'error')).toBe(true) }) it('enforces orchestrator budget in runTeam', async () => {