feat: add budget exceeded event handling in agent and orchestrator
This commit is contained in:
parent
bad083f48c
commit
c563d6c836
|
|
@ -222,6 +222,7 @@ export class AgentRunner {
|
||||||
* - `{ type: 'text', data: string }` for each text delta
|
* - `{ type: 'text', data: string }` for each text delta
|
||||||
* - `{ type: 'tool_use', data: ToolUseBlock }` when the model requests a tool
|
* - `{ type: 'tool_use', data: ToolUseBlock }` when the model requests a tool
|
||||||
* - `{ type: 'tool_result', data: ToolResultBlock }` after each execution
|
* - `{ type: 'tool_result', data: ToolResultBlock }` after each execution
|
||||||
|
* - `{ type: 'budget_exceeded', data: TokenBudgetExceededError }` on budget trip
|
||||||
* - `{ type: 'done', data: RunResult }` at the very end
|
* - `{ type: 'done', data: RunResult }` at the very end
|
||||||
* - `{ type: 'error', data: Error }` on unrecoverable failure
|
* - `{ type: 'error', data: Error }` on unrecoverable failure
|
||||||
*/
|
*/
|
||||||
|
|
@ -306,20 +307,6 @@ export class AgentRunner {
|
||||||
}
|
}
|
||||||
|
|
||||||
totalUsage = addTokenUsage(totalUsage, response.usage)
|
totalUsage = addTokenUsage(totalUsage, response.usage)
|
||||||
const totalTokens = totalUsage.input_tokens + totalUsage.output_tokens
|
|
||||||
if (this.options.maxTokenBudget !== undefined && totalTokens > this.options.maxTokenBudget) {
|
|
||||||
budgetExceeded = true
|
|
||||||
finalOutput = extractText(response.content)
|
|
||||||
yield {
|
|
||||||
type: 'error',
|
|
||||||
data: new TokenBudgetExceededError(
|
|
||||||
this.options.agentName ?? 'unknown',
|
|
||||||
totalTokens,
|
|
||||||
this.options.maxTokenBudget,
|
|
||||||
),
|
|
||||||
} satisfies StreamEvent
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// ------------------------------------------------------------------
|
// ------------------------------------------------------------------
|
||||||
// Step 2: Build the assistant message from the response content.
|
// Step 2: Build the assistant message from the response content.
|
||||||
|
|
@ -338,6 +325,21 @@ export class AgentRunner {
|
||||||
yield { type: 'text', data: turnText } satisfies StreamEvent
|
yield { type: 'text', data: turnText } satisfies StreamEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const totalTokens = totalUsage.input_tokens + totalUsage.output_tokens
|
||||||
|
if (this.options.maxTokenBudget !== undefined && totalTokens > this.options.maxTokenBudget) {
|
||||||
|
budgetExceeded = true
|
||||||
|
finalOutput = turnText
|
||||||
|
yield {
|
||||||
|
type: 'budget_exceeded',
|
||||||
|
data: new TokenBudgetExceededError(
|
||||||
|
this.options.agentName ?? 'unknown',
|
||||||
|
totalTokens,
|
||||||
|
this.options.maxTokenBudget,
|
||||||
|
),
|
||||||
|
} satisfies StreamEvent
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
// Extract tool-use blocks for detection and execution.
|
// Extract tool-use blocks for detection and execution.
|
||||||
const toolUseBlocks = extractToolUseBlocks(response.content)
|
const toolUseBlocks = extractToolUseBlocks(response.content)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -276,6 +276,7 @@ interface RunContext {
|
||||||
cumulativeUsage: TokenUsage
|
cumulativeUsage: TokenUsage
|
||||||
readonly maxTokenBudget?: number
|
readonly maxTokenBudget?: number
|
||||||
budgetExceededTriggered: boolean
|
budgetExceededTriggered: boolean
|
||||||
|
budgetExceededReason?: string
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -428,13 +429,13 @@ async function executeQueue(
|
||||||
) {
|
) {
|
||||||
ctx.budgetExceededTriggered = true
|
ctx.budgetExceededTriggered = true
|
||||||
const err = new TokenBudgetExceededError('orchestrator', totalTokens, ctx.maxTokenBudget)
|
const err = new TokenBudgetExceededError('orchestrator', totalTokens, ctx.maxTokenBudget)
|
||||||
|
ctx.budgetExceededReason = err.message
|
||||||
config.onProgress?.({
|
config.onProgress?.({
|
||||||
type: 'budget_exceeded',
|
type: 'budget_exceeded',
|
||||||
agent: assignee,
|
agent: assignee,
|
||||||
task: task.id,
|
task: task.id,
|
||||||
data: err,
|
data: err,
|
||||||
} satisfies OrchestratorEvent)
|
} satisfies OrchestratorEvent)
|
||||||
queue.skipRemaining(err.message)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (result.success) {
|
if (result.success) {
|
||||||
|
|
@ -474,6 +475,7 @@ async function executeQueue(
|
||||||
// Wait for the entire parallel batch before checking for newly-unblocked tasks.
|
// Wait for the entire parallel batch before checking for newly-unblocked tasks.
|
||||||
await Promise.all(dispatchPromises)
|
await Promise.all(dispatchPromises)
|
||||||
if (ctx.budgetExceededTriggered) {
|
if (ctx.budgetExceededTriggered) {
|
||||||
|
queue.skipRemaining(ctx.budgetExceededReason ?? 'Skipped: token budget exceeded.')
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -790,6 +792,7 @@ export class OpenMultiAgent {
|
||||||
cumulativeUsage,
|
cumulativeUsage,
|
||||||
maxTokenBudget,
|
maxTokenBudget,
|
||||||
budgetExceededTriggered: false,
|
budgetExceededTriggered: false,
|
||||||
|
budgetExceededReason: undefined,
|
||||||
}
|
}
|
||||||
|
|
||||||
await executeQueue(queue, ctx)
|
await executeQueue(queue, ctx)
|
||||||
|
|
@ -899,6 +902,7 @@ export class OpenMultiAgent {
|
||||||
cumulativeUsage: ZERO_USAGE,
|
cumulativeUsage: ZERO_USAGE,
|
||||||
maxTokenBudget: this.config.maxTokenBudget,
|
maxTokenBudget: this.config.maxTokenBudget,
|
||||||
budgetExceededTriggered: false,
|
budgetExceededTriggered: false,
|
||||||
|
budgetExceededReason: undefined,
|
||||||
}
|
}
|
||||||
|
|
||||||
await executeQueue(queue, ctx)
|
await executeQueue(queue, ctx)
|
||||||
|
|
|
||||||
|
|
@ -90,11 +90,12 @@ export interface LLMResponse {
|
||||||
* - `text` — incremental text delta
|
* - `text` — incremental text delta
|
||||||
* - `tool_use` — the model has begun or completed a tool-use block
|
* - `tool_use` — the model has begun or completed a tool-use block
|
||||||
* - `tool_result` — a tool result has been appended to the stream
|
* - `tool_result` — a tool result has been appended to the stream
|
||||||
|
* - `budget_exceeded` — token budget threshold reached for this run
|
||||||
* - `done` — the stream has ended; `data` is the final {@link LLMResponse}
|
* - `done` — the stream has ended; `data` is the final {@link LLMResponse}
|
||||||
* - `error` — an unrecoverable error occurred; `data` is an `Error`
|
* - `error` — an unrecoverable error occurred; `data` is an `Error`
|
||||||
*/
|
*/
|
||||||
export interface StreamEvent {
|
export interface StreamEvent {
|
||||||
readonly type: 'text' | 'tool_use' | 'tool_result' | 'loop_detected' | 'done' | 'error'
|
readonly type: 'text' | 'tool_use' | 'tool_result' | 'loop_detected' | 'budget_exceeded' | 'done' | 'error'
|
||||||
readonly data: unknown
|
readonly data: unknown
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,8 @@
|
||||||
import { describe, it, expect, vi, beforeEach } from 'vitest'
|
import { describe, it, expect, vi, beforeEach } from 'vitest'
|
||||||
import { OpenMultiAgent } from '../src/orchestrator/orchestrator.js'
|
import { OpenMultiAgent } from '../src/orchestrator/orchestrator.js'
|
||||||
|
import { Agent } from '../src/agent/agent.js'
|
||||||
|
import { ToolRegistry } from '../src/tool/framework.js'
|
||||||
|
import { ToolExecutor } from '../src/tool/executor.js'
|
||||||
import type { AgentConfig, LLMChatOptions, LLMMessage, LLMResponse, OrchestratorEvent } from '../src/types.js'
|
import type { AgentConfig, LLMChatOptions, LLMMessage, LLMResponse, OrchestratorEvent } from '../src/types.js'
|
||||||
|
|
||||||
let mockAdapterResponses: string[] = []
|
let mockAdapterResponses: string[] = []
|
||||||
|
|
@ -59,9 +62,70 @@ describe('token budget enforcement', () => {
|
||||||
|
|
||||||
expect(result.success).toBe(false)
|
expect(result.success).toBe(false)
|
||||||
expect(result.budgetExceeded).toBe(true)
|
expect(result.budgetExceeded).toBe(true)
|
||||||
|
expect(result.messages).toHaveLength(1)
|
||||||
|
expect(result.messages[0]?.role).toBe('assistant')
|
||||||
|
expect(result.messages[0]?.content[0]).toMatchObject({ type: 'text', text: 'over budget' })
|
||||||
expect(events.some(e => e.type === 'budget_exceeded')).toBe(true)
|
expect(events.some(e => e.type === 'budget_exceeded')).toBe(true)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('emits budget_exceeded stream event without error transition', async () => {
|
||||||
|
mockAdapterResponses = ['over budget']
|
||||||
|
mockAdapterUsage = [{ input_tokens: 20, output_tokens: 15 }]
|
||||||
|
|
||||||
|
const agent = new Agent(
|
||||||
|
agentConfig('streamer', 30),
|
||||||
|
new ToolRegistry(),
|
||||||
|
new ToolExecutor(new ToolRegistry()),
|
||||||
|
)
|
||||||
|
|
||||||
|
const eventTypes: string[] = []
|
||||||
|
for await (const event of agent.stream('test')) {
|
||||||
|
eventTypes.push(event.type)
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(eventTypes).toContain('budget_exceeded')
|
||||||
|
expect(eventTypes).toContain('done')
|
||||||
|
expect(eventTypes).not.toContain('error')
|
||||||
|
expect(agent.getState().status).toBe('completed')
|
||||||
|
})
|
||||||
|
|
||||||
|
it('does not skip in-progress sibling tasks when team budget is exceeded mid-batch', async () => {
|
||||||
|
mockAdapterResponses = ['done-a', 'done-b', 'done-c']
|
||||||
|
mockAdapterUsage = [
|
||||||
|
{ input_tokens: 15, output_tokens: 10 }, // A => 25
|
||||||
|
{ input_tokens: 15, output_tokens: 10 }, // B => 50 total (exceeds 40)
|
||||||
|
{ input_tokens: 15, output_tokens: 10 }, // C should never run
|
||||||
|
]
|
||||||
|
|
||||||
|
const events: OrchestratorEvent[] = []
|
||||||
|
const oma = new OpenMultiAgent({
|
||||||
|
defaultModel: 'mock-model',
|
||||||
|
maxTokenBudget: 40,
|
||||||
|
onProgress: e => events.push(e),
|
||||||
|
})
|
||||||
|
const team = oma.createTeam('team-siblings', {
|
||||||
|
name: 'team-siblings',
|
||||||
|
agents: [agentConfig('worker-a'), agentConfig('worker-b')],
|
||||||
|
sharedMemory: false,
|
||||||
|
})
|
||||||
|
|
||||||
|
await oma.runTasks(team, [
|
||||||
|
{ title: 'Task A', description: 'A', assignee: 'worker-a' },
|
||||||
|
{ title: 'Task B', description: 'B', assignee: 'worker-b' },
|
||||||
|
{ title: 'Task C', description: 'C', assignee: 'worker-a', dependsOn: ['Task A'] },
|
||||||
|
])
|
||||||
|
|
||||||
|
const completedTaskIds = new Set(
|
||||||
|
events.filter(e => e.type === 'task_complete').map(e => e.task).filter(Boolean) as string[],
|
||||||
|
)
|
||||||
|
const skippedTaskIds = new Set(
|
||||||
|
events.filter(e => e.type === 'task_skipped').map(e => e.task).filter(Boolean) as string[],
|
||||||
|
)
|
||||||
|
|
||||||
|
const overlap = [...completedTaskIds].filter(id => skippedTaskIds.has(id))
|
||||||
|
expect(overlap).toHaveLength(0)
|
||||||
|
})
|
||||||
|
|
||||||
it('does not trigger budget events when budget is not exceeded', async () => {
|
it('does not trigger budget events when budget is not exceeded', async () => {
|
||||||
mockAdapterResponses = ['done-a', 'done-b']
|
mockAdapterResponses = ['done-a', 'done-b']
|
||||||
mockAdapterUsage = [
|
mockAdapterUsage = [
|
||||||
|
|
@ -147,7 +211,7 @@ describe('token budget enforcement', () => {
|
||||||
|
|
||||||
expect(result.totalTokenUsage.input_tokens + result.totalTokenUsage.output_tokens).toBe(70)
|
expect(result.totalTokenUsage.input_tokens + result.totalTokenUsage.output_tokens).toBe(70)
|
||||||
expect(events.some(e => e.type === 'budget_exceeded')).toBe(true)
|
expect(events.some(e => e.type === 'budget_exceeded')).toBe(true)
|
||||||
expect(events.some(e => e.type === 'task_skipped')).toBe(true)
|
expect(events.some(e => e.type === 'error')).toBe(true)
|
||||||
})
|
})
|
||||||
|
|
||||||
it('enforces orchestrator budget in runTeam', async () => {
|
it('enforces orchestrator budget in runTeam', async () => {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue