feat: agent delegation mechanism (#123)
Implements `delegate_to_agent` built-in tool (closes #63). Opt-in registration via `includeDelegateTool`; only wired up by `runTeam` / `runTasks` for pool workers. Guards: self-delegation, unknown target, cycle detection via `delegationChain`, depth cap (`maxDelegationDepth`, default 3), pool deadlock. Delegation runs on ephemeral Agent instances via `AgentPool.runEphemeral` (pool semaphore only, no per-agent lock) so mutual delegation (A→B while B→A) can't deadlock. Delegated run `tokenUsage` surfaces via `ToolResult.metadata` and rolls into the parent runner's total before the next budget check; delegation tool_result blocks are exempt from `compressToolResults` and the `compact` strategy. Best-effort SharedMemory audit writes at `{caller}/delegation:{target}:{ts}-{rand}`. Picks up @NamelessNATM's work from #84 and adds cycle detection, token aggregation, compression exemption, mutual-delegation deadlock fix (Codex P1), and tool_result-preservation on budget-exceeded (Codex P2). Co-authored-by: NamelessNATM <hamzarstar@gmail.com>
This commit is contained in:
parent
eea87e2b81
commit
b857c001a8
16
CLAUDE.md
16
CLAUDE.md
|
|
@ -74,7 +74,21 @@ Optional `maxRetries`, `retryDelayMs`, `retryBackoff` on task config (used via `
|
|||
|
||||
### Built-in Tools
|
||||
|
||||
`bash`, `file_read`, `file_write`, `file_edit`, `grep` — registered via `registerBuiltInTools(registry)`.
|
||||
`bash`, `file_read`, `file_write`, `file_edit`, `grep`, `glob` — registered via `registerBuiltInTools(registry)`. `delegate_to_agent` is opt-in (`registerBuiltInTools(registry, { includeDelegateTool: true })`) and only wired up inside pool workers by `runTeam`/`runTasks` — see "Agent Delegation" below.
|
||||
|
||||
### Agent Delegation
|
||||
|
||||
`delegate_to_agent` (in `src/tool/built-in/delegate.ts`) lets an agent synchronously hand a sub-prompt to another roster agent and receive its final output as a tool result. Only active during orchestrated runs; standalone `runAgent` and the `runTeam` short-circuit path (`isSimpleGoal` hit) do not inject it.
|
||||
|
||||
Guards (all enforced in the tool itself, before `runDelegatedAgent` is called):
|
||||
|
||||
- **Self-delegation:** rejected (`target === context.agent.name`)
|
||||
- **Unknown agent:** rejected (target not in team roster)
|
||||
- **Cycle detection:** rejected if target already in `TeamInfo.delegationChain` (prevents `A → B → A` from burning tokens up to the depth cap)
|
||||
- **Depth cap:** `OrchestratorConfig.maxDelegationDepth` (default 3)
|
||||
- **Pool deadlock:** rejected when `AgentPool.availableRunSlots < 1`, without calling the pool
|
||||
|
||||
The delegated run's `AgentRunResult.tokenUsage` is surfaced via `ToolResult.metadata.tokenUsage`; the runner accumulates it into `totalUsage` before the next `maxTokenBudget` check, so delegation cannot silently bypass the parent's budget. Delegation tool_result blocks are exempt from `compressToolResults` and the `compact` context strategy so the parent agent retains the full sub-agent output across turns. Best-effort SharedMemory audit writes at `{caller}/delegation:{target}:{timestamp}-{rand}` if the team has shared memory enabled.
|
||||
|
||||
### Adding an LLM Adapter
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,64 @@
|
|||
/**
|
||||
* Example 16 — Synchronous agent handoff via `delegate_to_agent`
|
||||
*
|
||||
* During `runTeam` / `runTasks`, pool agents register the built-in
|
||||
* `delegate_to_agent` tool so one specialist can run a sub-prompt on another
|
||||
* roster agent and read the answer in the same conversation turn.
|
||||
*
|
||||
* Whitelist `delegate_to_agent` in `tools` when you want the model to see it;
|
||||
* standalone `runAgent()` does not register this tool by default.
|
||||
*
|
||||
* Run:
|
||||
* npx tsx examples/16-agent-handoff.ts
|
||||
*
|
||||
* Prerequisites:
|
||||
* ANTHROPIC_API_KEY
|
||||
*/
|
||||
|
||||
import { OpenMultiAgent } from '../src/index.js'
|
||||
import type { AgentConfig } from '../src/types.js'
|
||||
|
||||
const researcher: AgentConfig = {
|
||||
name: 'researcher',
|
||||
model: 'claude-sonnet-4-6',
|
||||
provider: 'anthropic',
|
||||
systemPrompt:
|
||||
'You answer factual questions briefly. When the user asks for a second opinion ' +
|
||||
'from the analyst, use delegate_to_agent to ask the analyst agent, then summarize both views.',
|
||||
tools: ['delegate_to_agent'],
|
||||
maxTurns: 6,
|
||||
}
|
||||
|
||||
const analyst: AgentConfig = {
|
||||
name: 'analyst',
|
||||
model: 'claude-sonnet-4-6',
|
||||
provider: 'anthropic',
|
||||
systemPrompt: 'You give short, skeptical analysis of claims. Push back when evidence is weak.',
|
||||
tools: [],
|
||||
maxTurns: 4,
|
||||
}
|
||||
|
||||
async function main(): Promise<void> {
|
||||
const orchestrator = new OpenMultiAgent({ maxConcurrency: 2 })
|
||||
const team = orchestrator.createTeam('handoff-demo', {
|
||||
name: 'handoff-demo',
|
||||
agents: [researcher, analyst],
|
||||
sharedMemory: true,
|
||||
})
|
||||
|
||||
const goal =
|
||||
'In one paragraph: state a simple fact about photosynthesis. ' +
|
||||
'Then ask the analyst (via delegate_to_agent) for a one-sentence critique of overstated claims in popular science. ' +
|
||||
'Merge both into a final short answer.'
|
||||
|
||||
const result = await orchestrator.runTeam(team, goal)
|
||||
console.log('Success:', result.success)
|
||||
for (const [name, ar] of result.agentResults) {
|
||||
console.log(`\n--- ${name} ---\n${ar.output.slice(0, 2000)}`)
|
||||
}
|
||||
}
|
||||
|
||||
main().catch((err) => {
|
||||
console.error(err)
|
||||
process.exit(1)
|
||||
})
|
||||
|
|
@ -77,6 +77,16 @@ export class AgentPool {
|
|||
this.semaphore = new Semaphore(maxConcurrency)
|
||||
}
|
||||
|
||||
/**
|
||||
* Pool semaphore slots not currently held (`maxConcurrency - active`).
|
||||
* Used to avoid deadlocks when a nested `run()` would wait forever for a slot
|
||||
* held by the parent run. Best-effort only if multiple nested runs start in
|
||||
* parallel after the same synchronous check.
|
||||
*/
|
||||
get availableRunSlots(): number {
|
||||
return this.maxConcurrency - this.semaphore.active
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Registry operations
|
||||
// -------------------------------------------------------------------------
|
||||
|
|
@ -157,6 +167,32 @@ export class AgentPool {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a prompt on a caller-supplied Agent instance, acquiring only the pool
|
||||
* semaphore — no per-agent lock, no registry lookup.
|
||||
*
|
||||
* Designed for delegation: each delegated call should use a **fresh** Agent
|
||||
* instance (matching `delegate_to_agent`'s "runs in a fresh conversation"
|
||||
* semantics), so the per-agent mutex used by {@link run} would be dead
|
||||
* weight and, worse, a deadlock vector for mutual delegation (A→B while
|
||||
* B→A, each caller holding its own `run`'s agent lock).
|
||||
*
|
||||
* The caller is responsible for constructing the Agent; {@link AgentPool}
|
||||
* does not register or track it.
|
||||
*/
|
||||
async runEphemeral(
|
||||
agent: Agent,
|
||||
prompt: string,
|
||||
runOptions?: Partial<RunOptions>,
|
||||
): Promise<AgentRunResult> {
|
||||
await this.semaphore.acquire()
|
||||
try {
|
||||
return await agent.run(prompt, runOptions)
|
||||
} finally {
|
||||
this.semaphore.release()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run prompts on multiple agents in parallel, subject to the concurrency
|
||||
* cap set at construction time.
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import type {
|
|||
StreamEvent,
|
||||
ToolResult,
|
||||
ToolUseContext,
|
||||
TeamInfo,
|
||||
LLMAdapter,
|
||||
LLMChatOptions,
|
||||
TraceEvent,
|
||||
|
|
@ -134,6 +135,11 @@ export interface RunOptions {
|
|||
* {@link RunnerOptions.abortSignal}. Useful for per-run timeouts.
|
||||
*/
|
||||
readonly abortSignal?: AbortSignal
|
||||
/**
|
||||
* Team context for built-in tools such as `delegate_to_agent`.
|
||||
* Injected by the orchestrator during `runTeam` / `runTasks` pool runs.
|
||||
*/
|
||||
readonly team?: TeamInfo
|
||||
}
|
||||
|
||||
/** The aggregated result returned when a full run completes. */
|
||||
|
|
@ -733,11 +739,12 @@ export class AgentRunner {
|
|||
// Parallel execution is critical for multi-tool responses where the
|
||||
// tools are independent (e.g. reading several files at once).
|
||||
// ------------------------------------------------------------------
|
||||
const toolContext: ToolUseContext = this.buildToolContext(effectiveAbortSignal)
|
||||
const toolContext: ToolUseContext = this.buildToolContext(options)
|
||||
|
||||
const executionPromises = toolUseBlocks.map(async (block): Promise<{
|
||||
resultBlock: ToolResultBlock
|
||||
record: ToolCallRecord
|
||||
delegationUsage?: TokenUsage
|
||||
}> => {
|
||||
options.onToolCall?.(block.name, block.input)
|
||||
|
||||
|
|
@ -789,12 +796,30 @@ export class AgentRunner {
|
|||
is_error: result.isError,
|
||||
}
|
||||
|
||||
return { resultBlock, record }
|
||||
return {
|
||||
resultBlock,
|
||||
record,
|
||||
...(result.metadata?.tokenUsage !== undefined
|
||||
? { delegationUsage: result.metadata.tokenUsage }
|
||||
: {}),
|
||||
}
|
||||
})
|
||||
|
||||
// Wait for every tool in this turn to finish.
|
||||
const executions = await Promise.all(executionPromises)
|
||||
|
||||
// Roll up any nested-run token usage surfaced via ToolResult.metadata
|
||||
// (e.g. from delegate_to_agent) so it counts against this agent's budget.
|
||||
let delegationTurnUsage: TokenUsage | undefined
|
||||
for (const ex of executions) {
|
||||
if (ex.delegationUsage !== undefined) {
|
||||
totalUsage = addTokenUsage(totalUsage, ex.delegationUsage)
|
||||
delegationTurnUsage = delegationTurnUsage === undefined
|
||||
? ex.delegationUsage
|
||||
: addTokenUsage(delegationTurnUsage, ex.delegationUsage)
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Step 5: Accumulate results and build the user message that carries
|
||||
// them back to the LLM in the next turn.
|
||||
|
|
@ -828,6 +853,27 @@ export class AgentRunner {
|
|||
conversationMessages.push(toolResultMessage)
|
||||
options.onMessage?.(toolResultMessage)
|
||||
|
||||
// Budget check is deferred until tool_result events have been yielded
|
||||
// and the tool_result user message has been appended, so stream
|
||||
// consumers see matched tool_use/tool_result pairs and the returned
|
||||
// `messages` remain resumable against the Anthropic/OpenAI APIs.
|
||||
if (delegationTurnUsage !== undefined && this.options.maxTokenBudget !== undefined) {
|
||||
const totalAfterDelegation = totalUsage.input_tokens + totalUsage.output_tokens
|
||||
if (totalAfterDelegation > this.options.maxTokenBudget) {
|
||||
budgetExceeded = true
|
||||
finalOutput = turnText
|
||||
yield {
|
||||
type: 'budget_exceeded',
|
||||
data: new TokenBudgetExceededError(
|
||||
this.options.agentName ?? 'unknown',
|
||||
totalAfterDelegation,
|
||||
this.options.maxTokenBudget,
|
||||
),
|
||||
} satisfies StreamEvent
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Loop back to Step 1 — send updated conversation to the LLM.
|
||||
}
|
||||
} catch (err) {
|
||||
|
|
@ -968,8 +1014,10 @@ export class AgentRunner {
|
|||
}
|
||||
// Short results: preserve.
|
||||
if (block.content.length < minToolResultChars) return block
|
||||
// Compress.
|
||||
const toolName = toolNameMap.get(block.tool_use_id) ?? 'unknown'
|
||||
// Delegation results: preserve — parent agent may still reason over them.
|
||||
if (toolName === 'delegate_to_agent') return block
|
||||
// Compress.
|
||||
msgChanged = true
|
||||
return {
|
||||
type: 'tool_result',
|
||||
|
|
@ -1024,6 +1072,16 @@ export class AgentRunner {
|
|||
// Nothing to compress if there's at most one tool-result user message.
|
||||
if (lastToolResultUserIdx <= 0) return messages
|
||||
|
||||
// Build a tool_use_id → tool name map so we can exempt delegation results,
|
||||
// whose full output the parent agent may need to re-read in later turns.
|
||||
const toolNameMap = new Map<string, string>()
|
||||
for (const msg of messages) {
|
||||
if (msg.role !== 'assistant') continue
|
||||
for (const block of msg.content) {
|
||||
if (block.type === 'tool_use') toolNameMap.set(block.id, block.name)
|
||||
}
|
||||
}
|
||||
|
||||
let anyChanged = false
|
||||
const result = messages.map((msg, idx) => {
|
||||
// Only compress user messages that appear before the last one.
|
||||
|
|
@ -1039,6 +1097,9 @@ export class AgentRunner {
|
|||
// Never compress error results — they carry diagnostic value.
|
||||
if (block.is_error) return block
|
||||
|
||||
// Never compress delegation results — the parent agent relies on the full sub-agent output.
|
||||
if (toolNameMap.get(block.tool_use_id) === 'delegate_to_agent') return block
|
||||
|
||||
// Skip already-compressed results — avoid re-compression with wrong char count.
|
||||
if (block.content.startsWith('[Tool output compressed')) return block
|
||||
|
||||
|
|
@ -1067,14 +1128,15 @@ export class AgentRunner {
|
|||
* Build the {@link ToolUseContext} passed to every tool execution.
|
||||
* Identifies this runner as the invoking agent.
|
||||
*/
|
||||
private buildToolContext(abortSignal?: AbortSignal): ToolUseContext {
|
||||
private buildToolContext(options: RunOptions = {}): ToolUseContext {
|
||||
return {
|
||||
agent: {
|
||||
name: this.options.agentName ?? 'runner',
|
||||
role: this.options.agentRole ?? 'assistant',
|
||||
model: this.options.model,
|
||||
},
|
||||
abortSignal,
|
||||
abortSignal: options.abortSignal ?? this.options.abortSignal,
|
||||
...(options.team !== undefined ? { team: options.team } : {}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -94,13 +94,16 @@ export type { ToolExecutorOptions, BatchToolCall } from './tool/executor.js'
|
|||
export {
|
||||
registerBuiltInTools,
|
||||
BUILT_IN_TOOLS,
|
||||
ALL_BUILT_IN_TOOLS_WITH_DELEGATE,
|
||||
bashTool,
|
||||
delegateToAgentTool,
|
||||
fileReadTool,
|
||||
fileWriteTool,
|
||||
fileEditTool,
|
||||
globTool,
|
||||
grepTool,
|
||||
} from './tool/built-in/index.js'
|
||||
export type { RegisterBuiltInToolsOptions } from './tool/built-in/index.js'
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// LLM adapters
|
||||
|
|
@ -145,6 +148,7 @@ export type {
|
|||
ToolUseContext,
|
||||
AgentInfo,
|
||||
TeamInfo,
|
||||
DelegationPoolView,
|
||||
|
||||
// Agent
|
||||
AgentConfig,
|
||||
|
|
|
|||
|
|
@ -50,6 +50,7 @@ import type {
|
|||
Task,
|
||||
TaskStatus,
|
||||
TeamConfig,
|
||||
TeamInfo,
|
||||
TeamRunResult,
|
||||
TokenUsage,
|
||||
} from '../types.js'
|
||||
|
|
@ -73,6 +74,7 @@ import { extractKeywords, keywordScore } from '../utils/keywords.js'
|
|||
|
||||
const ZERO_USAGE: TokenUsage = { input_tokens: 0, output_tokens: 0 }
|
||||
const DEFAULT_MAX_CONCURRENCY = 5
|
||||
const DEFAULT_MAX_DELEGATION_DEPTH = 3
|
||||
const DEFAULT_MODEL = 'claude-opus-4-6'
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -207,11 +209,14 @@ function resolveTokenBudget(primary?: number, fallback?: number): number | undef
|
|||
|
||||
/**
|
||||
* Build a minimal {@link Agent} with its own fresh registry/executor.
|
||||
* Registers all built-in tools so coordinator/worker agents can use them.
|
||||
* Pool workers pass `includeDelegateTool` so `delegate_to_agent` is available during `runTeam` / `runTasks`.
|
||||
*/
|
||||
function buildAgent(config: AgentConfig): Agent {
|
||||
function buildAgent(
|
||||
config: AgentConfig,
|
||||
toolRegistration?: { readonly includeDelegateTool?: boolean },
|
||||
): Agent {
|
||||
const registry = new ToolRegistry()
|
||||
registerBuiltInTools(registry)
|
||||
registerBuiltInTools(registry, toolRegistration)
|
||||
if (config.customTools) {
|
||||
for (const tool of config.customTools) {
|
||||
registry.register(tool, { runtimeAdded: true })
|
||||
|
|
@ -411,6 +416,91 @@ interface RunContext {
|
|||
budgetExceededReason?: string
|
||||
}
|
||||
|
||||
/**
|
||||
* Build {@link TeamInfo} for tool context, including nested `runDelegatedAgent`
|
||||
* that respects pool capacity to avoid semaphore deadlocks.
|
||||
*
|
||||
* Delegation always builds a **fresh** Agent instance for the target and runs
|
||||
* it via `pool.runEphemeral` — the pool semaphore still gates total concurrency,
|
||||
* but the per-agent lock is bypassed. This matches `delegate_to_agent`'s "runs
|
||||
* in a fresh conversation for this prompt only" contract and prevents mutual
|
||||
* delegation (A→B while B→A) from deadlocking on each other's agent locks.
|
||||
*/
|
||||
function buildTaskAgentTeamInfo(
|
||||
ctx: RunContext,
|
||||
taskId: string,
|
||||
traceBase: Partial<RunOptions>,
|
||||
delegationDepth: number,
|
||||
delegationChain: readonly string[],
|
||||
): TeamInfo {
|
||||
const sharedMem = ctx.team.getSharedMemoryInstance()
|
||||
const maxDepth = ctx.config.maxDelegationDepth
|
||||
const agentConfigs = ctx.team.getAgents()
|
||||
const agentNames = agentConfigs.map((a) => a.name)
|
||||
|
||||
const runDelegatedAgent = async (targetAgent: string, prompt: string): Promise<AgentRunResult> => {
|
||||
const pool = ctx.pool
|
||||
if (pool.availableRunSlots < 1) {
|
||||
return {
|
||||
success: false,
|
||||
output:
|
||||
'Agent pool has no free concurrency slot for a delegated run (would deadlock). ' +
|
||||
'Increase maxConcurrency or reduce parallel delegation.',
|
||||
messages: [],
|
||||
tokenUsage: ZERO_USAGE,
|
||||
toolCalls: [],
|
||||
}
|
||||
}
|
||||
|
||||
const targetConfig = agentConfigs.find((a) => a.name === targetAgent)
|
||||
if (!targetConfig) {
|
||||
return {
|
||||
success: false,
|
||||
output: `Unknown agent "${targetAgent}" — not in team roster [${agentNames.join(', ')}].`,
|
||||
messages: [],
|
||||
tokenUsage: ZERO_USAGE,
|
||||
toolCalls: [],
|
||||
}
|
||||
}
|
||||
|
||||
// Apply orchestrator-level defaults just like buildPool, then construct a
|
||||
// one-shot Agent for this delegation only.
|
||||
const effective: AgentConfig = {
|
||||
...targetConfig,
|
||||
provider: targetConfig.provider ?? ctx.config.defaultProvider,
|
||||
baseURL: targetConfig.baseURL ?? ctx.config.defaultBaseURL,
|
||||
apiKey: targetConfig.apiKey ?? ctx.config.defaultApiKey,
|
||||
}
|
||||
const tempAgent = buildAgent(effective, { includeDelegateTool: true })
|
||||
|
||||
const nestedTeam = buildTaskAgentTeamInfo(
|
||||
ctx,
|
||||
taskId,
|
||||
traceBase,
|
||||
delegationDepth + 1,
|
||||
[...delegationChain, targetAgent],
|
||||
)
|
||||
const childOpts: Partial<RunOptions> = {
|
||||
...traceBase,
|
||||
traceAgent: targetAgent,
|
||||
taskId,
|
||||
team: nestedTeam,
|
||||
}
|
||||
return pool.runEphemeral(tempAgent, prompt, childOpts)
|
||||
}
|
||||
|
||||
return {
|
||||
name: ctx.team.name,
|
||||
agents: agentNames,
|
||||
...(sharedMem ? { sharedMemory: sharedMem.getStore() } : {}),
|
||||
delegationDepth,
|
||||
maxDelegationDepth: maxDepth,
|
||||
delegationPool: ctx.pool,
|
||||
delegationChain,
|
||||
runDelegatedAgent,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute all tasks in `queue` using agents in `pool`, respecting dependencies
|
||||
* and running independent tasks in parallel.
|
||||
|
|
@ -509,16 +599,28 @@ async function executeQueue(
|
|||
// Build the prompt: task description + dependency-only context by default.
|
||||
const prompt = await buildTaskPrompt(task, team, queue)
|
||||
|
||||
// Build trace context for this task's agent run
|
||||
const traceOptions: Partial<RunOptions> | undefined = config.onTrace
|
||||
? { onTrace: config.onTrace, runId: ctx.runId ?? '', taskId: task.id, traceAgent: assignee, abortSignal: ctx.abortSignal }
|
||||
: ctx.abortSignal ? { abortSignal: ctx.abortSignal } : undefined
|
||||
// Trace + abort + team tool context (delegate_to_agent)
|
||||
const traceBase: Partial<RunOptions> = {
|
||||
...(config.onTrace
|
||||
? {
|
||||
onTrace: config.onTrace,
|
||||
runId: ctx.runId ?? '',
|
||||
taskId: task.id,
|
||||
traceAgent: assignee,
|
||||
}
|
||||
: {}),
|
||||
...(ctx.abortSignal ? { abortSignal: ctx.abortSignal } : {}),
|
||||
}
|
||||
const runOptions: Partial<RunOptions> = {
|
||||
...traceBase,
|
||||
team: buildTaskAgentTeamInfo(ctx, task.id, traceBase, 0, [assignee]),
|
||||
}
|
||||
|
||||
const taskStartMs = config.onTrace ? Date.now() : 0
|
||||
let retryCount = 0
|
||||
|
||||
const result = await executeWithRetry(
|
||||
() => pool.run(assignee, prompt, traceOptions),
|
||||
() => pool.run(assignee, prompt, runOptions),
|
||||
task,
|
||||
(retryData) => {
|
||||
retryCount++
|
||||
|
|
@ -711,12 +813,14 @@ export class OpenMultiAgent {
|
|||
*
|
||||
* Sensible defaults:
|
||||
* - `maxConcurrency`: 5
|
||||
* - `maxDelegationDepth`: 3
|
||||
* - `defaultModel`: `'claude-opus-4-6'`
|
||||
* - `defaultProvider`: `'anthropic'`
|
||||
*/
|
||||
constructor(config: OrchestratorConfig = {}) {
|
||||
this.config = {
|
||||
maxConcurrency: config.maxConcurrency ?? DEFAULT_MAX_CONCURRENCY,
|
||||
maxDelegationDepth: config.maxDelegationDepth ?? DEFAULT_MAX_DELEGATION_DEPTH,
|
||||
defaultModel: config.defaultModel ?? DEFAULT_MODEL,
|
||||
defaultProvider: config.defaultProvider ?? 'anthropic',
|
||||
defaultBaseURL: config.defaultBaseURL,
|
||||
|
|
@ -1409,7 +1513,7 @@ export class OpenMultiAgent {
|
|||
baseURL: config.baseURL ?? this.config.defaultBaseURL,
|
||||
apiKey: config.apiKey ?? this.config.defaultApiKey,
|
||||
}
|
||||
pool.add(buildAgent(effective))
|
||||
pool.add(buildAgent(effective, { includeDelegateTool: true }))
|
||||
}
|
||||
return pool
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,109 @@
|
|||
/**
|
||||
* @fileoverview Built-in `delegate_to_agent` tool for synchronous handoff to a roster agent.
|
||||
*/
|
||||
|
||||
import { z } from 'zod'
|
||||
import type { ToolDefinition, ToolResult, ToolUseContext } from '../../types.js'
|
||||
|
||||
const inputSchema = z.object({
|
||||
target_agent: z.string().min(1).describe('Name of the team agent to run the sub-task.'),
|
||||
prompt: z.string().min(1).describe('Instructions / question for the target agent.'),
|
||||
})
|
||||
|
||||
/**
|
||||
* Delegates a sub-task to another agent on the team and returns that agent's final text output.
|
||||
*
|
||||
* Only available when the orchestrator injects {@link ToolUseContext.team} with
|
||||
* `runDelegatedAgent` (pool-backed `runTeam` / `runTasks`). Standalone `runAgent`
|
||||
* does not register this tool by default.
|
||||
*
|
||||
* Nested {@link AgentRunResult.tokenUsage} from the delegated run is surfaced via
|
||||
* {@link ToolResult.metadata} so the parent runner can aggregate it into its total
|
||||
* (keeps `maxTokenBudget` accurate across delegation chains).
|
||||
*/
|
||||
export const delegateToAgentTool: ToolDefinition<z.infer<typeof inputSchema>> = {
|
||||
name: 'delegate_to_agent',
|
||||
description:
|
||||
'Run a sub-task on another agent from this team and return that agent\'s final answer as the tool result. ' +
|
||||
'Use when you need a specialist teammate to produce output you will incorporate. ' +
|
||||
'The target agent runs in a fresh conversation for this prompt only.',
|
||||
inputSchema,
|
||||
async execute(
|
||||
{ target_agent: targetAgent, prompt },
|
||||
context: ToolUseContext,
|
||||
): Promise<ToolResult> {
|
||||
const team = context.team
|
||||
if (!team?.runDelegatedAgent) {
|
||||
return {
|
||||
data:
|
||||
'delegate_to_agent is only available during orchestrated team runs with the delegation tool enabled. ' +
|
||||
'Use SharedMemory or explicit tasks instead.',
|
||||
isError: true,
|
||||
}
|
||||
}
|
||||
|
||||
if (targetAgent === context.agent.name) {
|
||||
return {
|
||||
data: 'Cannot delegate to yourself; use another team member.',
|
||||
isError: true,
|
||||
}
|
||||
}
|
||||
|
||||
if (!team.agents.includes(targetAgent)) {
|
||||
return {
|
||||
data: `Unknown agent "${targetAgent}". Roster: ${team.agents.join(', ')}`,
|
||||
isError: true,
|
||||
}
|
||||
}
|
||||
|
||||
const chain = team.delegationChain ?? []
|
||||
if (chain.includes(targetAgent)) {
|
||||
return {
|
||||
data:
|
||||
`Delegation cycle detected: ${[...chain, targetAgent].join(' -> ')}. ` +
|
||||
'Pick a different target or restructure the plan.',
|
||||
isError: true,
|
||||
}
|
||||
}
|
||||
|
||||
const depth = team.delegationDepth ?? 0
|
||||
const maxDepth = team.maxDelegationDepth ?? 3
|
||||
if (depth >= maxDepth) {
|
||||
return {
|
||||
data: `Maximum delegation depth (${maxDepth}) reached; cannot delegate further.`,
|
||||
isError: true,
|
||||
}
|
||||
}
|
||||
|
||||
if (team.delegationPool !== undefined && team.delegationPool.availableRunSlots < 1) {
|
||||
return {
|
||||
data:
|
||||
'Agent pool has no free concurrency slot for a delegated run (nested run would block indefinitely). ' +
|
||||
'Increase orchestrator maxConcurrency, wait for parallel work to finish, or avoid delegating while the pool is saturated.',
|
||||
isError: true,
|
||||
}
|
||||
}
|
||||
|
||||
const result = await team.runDelegatedAgent(targetAgent, prompt)
|
||||
|
||||
if (team.sharedMemory) {
|
||||
const suffix = `${Date.now()}-${Math.random().toString(36).slice(2, 10)}`
|
||||
const key = `delegation:${targetAgent}:${suffix}`
|
||||
try {
|
||||
await team.sharedMemory.set(`${context.agent.name}/${key}`, result.output, {
|
||||
agent: context.agent.name,
|
||||
delegatedTo: targetAgent,
|
||||
success: String(result.success),
|
||||
})
|
||||
} catch {
|
||||
// Audit is best-effort; do not fail the tool on store errors.
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
data: result.output,
|
||||
isError: !result.success,
|
||||
metadata: { tokenUsage: result.tokenUsage },
|
||||
}
|
||||
},
|
||||
}
|
||||
|
|
@ -8,13 +8,23 @@
|
|||
import type { ToolDefinition } from '../../types.js'
|
||||
import { ToolRegistry } from '../framework.js'
|
||||
import { bashTool } from './bash.js'
|
||||
import { delegateToAgentTool } from './delegate.js'
|
||||
import { fileEditTool } from './file-edit.js'
|
||||
import { fileReadTool } from './file-read.js'
|
||||
import { fileWriteTool } from './file-write.js'
|
||||
import { globTool } from './glob.js'
|
||||
import { grepTool } from './grep.js'
|
||||
|
||||
export { bashTool, fileEditTool, fileReadTool, fileWriteTool, globTool, grepTool }
|
||||
export { bashTool, delegateToAgentTool, fileEditTool, fileReadTool, fileWriteTool, globTool, grepTool }
|
||||
|
||||
/** Options for {@link registerBuiltInTools}. */
|
||||
export interface RegisterBuiltInToolsOptions {
|
||||
/**
|
||||
* When true, registers `delegate_to_agent` (team orchestration handoff).
|
||||
* Default false so standalone agents and `runAgent` do not expose a tool that always errors.
|
||||
*/
|
||||
readonly includeDelegateTool?: boolean
|
||||
}
|
||||
|
||||
/**
|
||||
* The ordered list of all built-in tools. Import this when you need to
|
||||
|
|
@ -33,6 +43,12 @@ export const BUILT_IN_TOOLS: ToolDefinition<any>[] = [
|
|||
globTool,
|
||||
]
|
||||
|
||||
/** All built-ins including `delegate_to_agent` (for team registry setup). */
|
||||
export const ALL_BUILT_IN_TOOLS_WITH_DELEGATE: ToolDefinition<any>[] = [
|
||||
...BUILT_IN_TOOLS,
|
||||
delegateToAgentTool,
|
||||
]
|
||||
|
||||
/**
|
||||
* Register all built-in tools with the given registry.
|
||||
*
|
||||
|
|
@ -45,8 +61,14 @@ export const BUILT_IN_TOOLS: ToolDefinition<any>[] = [
|
|||
* registerBuiltInTools(registry)
|
||||
* ```
|
||||
*/
|
||||
export function registerBuiltInTools(registry: ToolRegistry): void {
|
||||
export function registerBuiltInTools(
|
||||
registry: ToolRegistry,
|
||||
options?: RegisterBuiltInToolsOptions,
|
||||
): void {
|
||||
for (const tool of BUILT_IN_TOOLS) {
|
||||
registry.register(tool)
|
||||
}
|
||||
if (options?.includeDelegateTool) {
|
||||
registry.register(delegateToAgentTool)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
46
src/types.ts
46
src/types.ts
|
|
@ -178,17 +178,54 @@ export interface AgentInfo {
|
|||
readonly model: string
|
||||
}
|
||||
|
||||
/** Descriptor for a team of agents with shared memory. */
|
||||
/**
|
||||
* Minimal pool surface used by `delegate_to_agent` to detect nested-run capacity.
|
||||
* {@link AgentPool} satisfies this structurally via {@link AgentPool.availableRunSlots}.
|
||||
*/
|
||||
export interface DelegationPoolView {
|
||||
readonly availableRunSlots: number
|
||||
}
|
||||
|
||||
/** Descriptor for a team of agents (orchestrator-injected into tool context). */
|
||||
export interface TeamInfo {
|
||||
readonly name: string
|
||||
readonly agents: readonly string[]
|
||||
readonly sharedMemory: MemoryStore
|
||||
/** When the team has shared memory enabled; used for delegation audit writes. */
|
||||
readonly sharedMemory?: MemoryStore
|
||||
/** Zero-based depth of nested delegation from the root task run. */
|
||||
readonly delegationDepth?: number
|
||||
readonly maxDelegationDepth?: number
|
||||
readonly delegationPool?: DelegationPoolView
|
||||
/**
|
||||
* Ordered chain of agent names from the root task to the current agent.
|
||||
* Used to block `A -> B -> A` cycles before they burn turns against `maxDelegationDepth`.
|
||||
*/
|
||||
readonly delegationChain?: readonly string[]
|
||||
/**
|
||||
* Run another roster agent to completion and return its result.
|
||||
* Only set during orchestrated pool execution (`runTeam` / `runTasks`).
|
||||
*/
|
||||
readonly runDelegatedAgent?: (targetAgent: string, prompt: string) => Promise<AgentRunResult>
|
||||
}
|
||||
|
||||
/**
|
||||
* Optional side-channel metadata a tool may attach to its result.
|
||||
* Not shown to the LLM — the runner reads it for accounting purposes.
|
||||
*/
|
||||
export interface ToolResultMetadata {
|
||||
/**
|
||||
* Token usage consumed inside the tool execution itself (e.g. nested LLM
|
||||
* calls from `delegate_to_agent`). Accumulated into the parent runner's
|
||||
* total so budgets/cost tracking stay accurate across delegation.
|
||||
*/
|
||||
readonly tokenUsage?: TokenUsage
|
||||
}
|
||||
|
||||
/** Value returned by a tool's `execute` function. */
|
||||
export interface ToolResult {
|
||||
readonly data: string
|
||||
readonly isError?: boolean
|
||||
readonly metadata?: ToolResultMetadata
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -472,6 +509,11 @@ export interface OrchestratorEvent {
|
|||
/** Top-level configuration for the orchestrator. */
|
||||
export interface OrchestratorConfig {
|
||||
readonly maxConcurrency?: number
|
||||
/**
|
||||
* Maximum depth of `delegate_to_agent` chains from a task run (default `3`).
|
||||
* Depth is per nested delegated run, not per team.
|
||||
*/
|
||||
readonly maxDelegationDepth?: number
|
||||
/** Maximum cumulative tokens (input + output) allowed per orchestrator run. */
|
||||
readonly maxTokenBudget?: number
|
||||
readonly defaultModel?: string
|
||||
|
|
|
|||
|
|
@ -34,6 +34,11 @@ export class Semaphore {
|
|||
}
|
||||
}
|
||||
|
||||
/** Maximum concurrent holders configured for this semaphore. */
|
||||
get limit(): number {
|
||||
return this.max
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire a slot. Resolves immediately when one is free, or waits until a
|
||||
* holder calls `release()`.
|
||||
|
|
|
|||
|
|
@ -291,5 +291,93 @@ describe('AgentPool', () => {
|
|||
|
||||
expect(maxConcurrent).toBeLessThanOrEqual(2)
|
||||
})
|
||||
|
||||
it('availableRunSlots matches maxConcurrency when idle', () => {
|
||||
const pool = new AgentPool(3)
|
||||
pool.add(createMockAgent('a'))
|
||||
expect(pool.availableRunSlots).toBe(3)
|
||||
})
|
||||
|
||||
it('availableRunSlots is zero while a run holds the pool slot', async () => {
|
||||
const pool = new AgentPool(1)
|
||||
const agent = createMockAgent('solo')
|
||||
pool.add(agent)
|
||||
|
||||
let finishRun!: (value: AgentRunResult) => void
|
||||
const holdPromise = new Promise<AgentRunResult>((resolve) => {
|
||||
finishRun = resolve
|
||||
})
|
||||
vi.mocked(agent.run).mockReturnValue(holdPromise)
|
||||
|
||||
const runPromise = pool.run('solo', 'hold-slot')
|
||||
await Promise.resolve()
|
||||
await Promise.resolve()
|
||||
expect(pool.availableRunSlots).toBe(0)
|
||||
|
||||
finishRun(SUCCESS_RESULT)
|
||||
await runPromise
|
||||
expect(pool.availableRunSlots).toBe(1)
|
||||
})
|
||||
|
||||
it('runEphemeral runs a caller-supplied Agent without touching the agentLock', async () => {
|
||||
// Registered agent's lock is held by a pending pool.run — a second
|
||||
// pool.run() against the same name would queue on the agent lock.
|
||||
// runEphemeral on a fresh Agent instance must NOT block on that lock.
|
||||
const pool = new AgentPool(3)
|
||||
const registered = createMockAgent('alice')
|
||||
pool.add(registered)
|
||||
|
||||
let releaseRegistered!: (v: AgentRunResult) => void
|
||||
vi.mocked(registered.run).mockReturnValue(
|
||||
new Promise<AgentRunResult>((resolve) => {
|
||||
releaseRegistered = resolve
|
||||
}),
|
||||
)
|
||||
const heldRun = pool.run('alice', 'long running')
|
||||
await Promise.resolve()
|
||||
await Promise.resolve()
|
||||
|
||||
const ephemeral = createMockAgent('alice') // same name, fresh instance
|
||||
const ephemeralResult = await pool.runEphemeral(ephemeral, 'quick task')
|
||||
|
||||
expect(ephemeralResult).toBe(SUCCESS_RESULT)
|
||||
expect(ephemeral.run).toHaveBeenCalledWith('quick task', undefined)
|
||||
|
||||
releaseRegistered(SUCCESS_RESULT)
|
||||
await heldRun
|
||||
})
|
||||
|
||||
it('runEphemeral still respects pool semaphore', async () => {
|
||||
const pool = new AgentPool(1)
|
||||
const holder = createMockAgent('holder')
|
||||
pool.add(holder)
|
||||
|
||||
let releaseHolder!: (v: AgentRunResult) => void
|
||||
vi.mocked(holder.run).mockReturnValue(
|
||||
new Promise<AgentRunResult>((resolve) => {
|
||||
releaseHolder = resolve
|
||||
}),
|
||||
)
|
||||
const heldRun = pool.run('holder', 'hold-slot')
|
||||
await Promise.resolve()
|
||||
await Promise.resolve()
|
||||
expect(pool.availableRunSlots).toBe(0)
|
||||
|
||||
// Ephemeral agent should queue on the semaphore, not run immediately.
|
||||
const ephemeral = createMockAgent('ephemeral')
|
||||
let ephemeralResolved = false
|
||||
const ephemeralRun = pool.runEphemeral(ephemeral, 'p').then((r) => {
|
||||
ephemeralResolved = true
|
||||
return r
|
||||
})
|
||||
await Promise.resolve()
|
||||
await Promise.resolve()
|
||||
expect(ephemeralResolved).toBe(false)
|
||||
|
||||
releaseHolder(SUCCESS_RESULT)
|
||||
await heldRun
|
||||
await ephemeralRun
|
||||
expect(ephemeralResolved).toBe(true)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
import { describe, it, expect, beforeEach, afterEach } from 'vitest'
|
||||
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'
|
||||
import { mkdtemp, rm, writeFile, readFile } from 'fs/promises'
|
||||
import { join } from 'path'
|
||||
import { tmpdir } from 'os'
|
||||
|
|
@ -8,9 +8,14 @@ import { fileEditTool } from '../src/tool/built-in/file-edit.js'
|
|||
import { bashTool } from '../src/tool/built-in/bash.js'
|
||||
import { globTool } from '../src/tool/built-in/glob.js'
|
||||
import { grepTool } from '../src/tool/built-in/grep.js'
|
||||
import { registerBuiltInTools, BUILT_IN_TOOLS } from '../src/tool/built-in/index.js'
|
||||
import {
|
||||
registerBuiltInTools,
|
||||
BUILT_IN_TOOLS,
|
||||
delegateToAgentTool,
|
||||
} from '../src/tool/built-in/index.js'
|
||||
import { ToolRegistry } from '../src/tool/framework.js'
|
||||
import type { ToolUseContext } from '../src/types.js'
|
||||
import { InMemoryStore } from '../src/memory/store.js'
|
||||
import type { AgentRunResult, ToolUseContext } from '../src/types.js'
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
|
|
@ -45,6 +50,13 @@ describe('registerBuiltInTools', () => {
|
|||
expect(registry.get('file_edit')).toBeDefined()
|
||||
expect(registry.get('grep')).toBeDefined()
|
||||
expect(registry.get('glob')).toBeDefined()
|
||||
expect(registry.get('delegate_to_agent')).toBeUndefined()
|
||||
})
|
||||
|
||||
it('registers delegate_to_agent when includeDelegateTool is set', () => {
|
||||
const registry = new ToolRegistry()
|
||||
registerBuiltInTools(registry, { includeDelegateTool: true })
|
||||
expect(registry.get('delegate_to_agent')).toBeDefined()
|
||||
})
|
||||
|
||||
it('BUILT_IN_TOOLS has correct length', () => {
|
||||
|
|
@ -489,3 +501,241 @@ describe('grep', () => {
|
|||
expect(result.data.toLowerCase()).toContain('no such file')
|
||||
})
|
||||
})
|
||||
|
||||
// ===========================================================================
|
||||
// delegate_to_agent
|
||||
// ===========================================================================
|
||||
|
||||
const DELEGATE_OK: AgentRunResult = {
|
||||
success: true,
|
||||
output: 'research done',
|
||||
messages: [],
|
||||
tokenUsage: { input_tokens: 1, output_tokens: 2 },
|
||||
toolCalls: [],
|
||||
}
|
||||
|
||||
describe('delegate_to_agent', () => {
|
||||
it('returns delegated agent output on success', async () => {
|
||||
const runDelegatedAgent = vi.fn().mockResolvedValue(DELEGATE_OK)
|
||||
const ctx: ToolUseContext = {
|
||||
agent: { name: 'alice', role: 'lead', model: 'test' },
|
||||
team: {
|
||||
name: 't',
|
||||
agents: ['alice', 'bob'],
|
||||
delegationDepth: 0,
|
||||
maxDelegationDepth: 3,
|
||||
delegationPool: { availableRunSlots: 2 },
|
||||
runDelegatedAgent,
|
||||
},
|
||||
}
|
||||
|
||||
const result = await delegateToAgentTool.execute(
|
||||
{ target_agent: 'bob', prompt: 'Summarize X.' },
|
||||
ctx,
|
||||
)
|
||||
|
||||
expect(result.isError).toBe(false)
|
||||
expect(result.data).toBe('research done')
|
||||
expect(runDelegatedAgent).toHaveBeenCalledWith('bob', 'Summarize X.')
|
||||
})
|
||||
|
||||
it('errors when delegation would form a cycle (A -> B -> A)', async () => {
|
||||
const ctx: ToolUseContext = {
|
||||
agent: { name: 'bob', role: 'worker', model: 'test' },
|
||||
team: {
|
||||
name: 't',
|
||||
agents: ['alice', 'bob'],
|
||||
delegationDepth: 1,
|
||||
maxDelegationDepth: 5,
|
||||
delegationChain: ['alice', 'bob'],
|
||||
delegationPool: { availableRunSlots: 2 },
|
||||
runDelegatedAgent: vi.fn().mockResolvedValue(DELEGATE_OK),
|
||||
},
|
||||
}
|
||||
|
||||
const result = await delegateToAgentTool.execute(
|
||||
{ target_agent: 'alice', prompt: 'loop back' },
|
||||
ctx,
|
||||
)
|
||||
|
||||
expect(result.isError).toBe(true)
|
||||
expect(result.data).toMatch(/Delegation cycle detected: alice -> bob -> alice/)
|
||||
expect(ctx.team!.runDelegatedAgent).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('surfaces delegated run tokenUsage via ToolResult.metadata', async () => {
|
||||
const runDelegatedAgent = vi.fn().mockResolvedValue({
|
||||
success: true,
|
||||
output: 'answer',
|
||||
messages: [],
|
||||
tokenUsage: { input_tokens: 123, output_tokens: 45 },
|
||||
toolCalls: [],
|
||||
} satisfies AgentRunResult)
|
||||
const ctx: ToolUseContext = {
|
||||
agent: { name: 'alice', role: 'lead', model: 'test' },
|
||||
team: {
|
||||
name: 't',
|
||||
agents: ['alice', 'bob'],
|
||||
delegationPool: { availableRunSlots: 2 },
|
||||
runDelegatedAgent,
|
||||
},
|
||||
}
|
||||
|
||||
const result = await delegateToAgentTool.execute(
|
||||
{ target_agent: 'bob', prompt: 'Hi' },
|
||||
ctx,
|
||||
)
|
||||
|
||||
expect(result.metadata?.tokenUsage).toEqual({ input_tokens: 123, output_tokens: 45 })
|
||||
})
|
||||
|
||||
it('errors when delegation is not configured', async () => {
|
||||
const ctx: ToolUseContext = {
|
||||
agent: { name: 'alice', role: 'lead', model: 'test' },
|
||||
team: { name: 't', agents: ['alice', 'bob'] },
|
||||
}
|
||||
|
||||
const result = await delegateToAgentTool.execute(
|
||||
{ target_agent: 'bob', prompt: 'Hi' },
|
||||
ctx,
|
||||
)
|
||||
|
||||
expect(result.isError).toBe(true)
|
||||
expect(result.data).toMatch(/only available during orchestrated team runs/i)
|
||||
})
|
||||
|
||||
it('errors for unknown target agent', async () => {
|
||||
const ctx: ToolUseContext = {
|
||||
agent: { name: 'alice', role: 'lead', model: 'test' },
|
||||
team: {
|
||||
name: 't',
|
||||
agents: ['alice', 'bob'],
|
||||
runDelegatedAgent: vi.fn(),
|
||||
delegationPool: { availableRunSlots: 1 },
|
||||
},
|
||||
}
|
||||
|
||||
const result = await delegateToAgentTool.execute(
|
||||
{ target_agent: 'charlie', prompt: 'Hi' },
|
||||
ctx,
|
||||
)
|
||||
|
||||
expect(result.isError).toBe(true)
|
||||
expect(result.data).toMatch(/Unknown agent/)
|
||||
})
|
||||
|
||||
it('errors on self-delegation', async () => {
|
||||
const ctx: ToolUseContext = {
|
||||
agent: { name: 'alice', role: 'lead', model: 'test' },
|
||||
team: {
|
||||
name: 't',
|
||||
agents: ['alice', 'bob'],
|
||||
runDelegatedAgent: vi.fn(),
|
||||
delegationPool: { availableRunSlots: 1 },
|
||||
},
|
||||
}
|
||||
|
||||
const result = await delegateToAgentTool.execute(
|
||||
{ target_agent: 'alice', prompt: 'Hi' },
|
||||
ctx,
|
||||
)
|
||||
|
||||
expect(result.isError).toBe(true)
|
||||
expect(result.data).toMatch(/yourself/)
|
||||
})
|
||||
|
||||
it('errors when delegation depth limit is reached', async () => {
|
||||
const ctx: ToolUseContext = {
|
||||
agent: { name: 'alice', role: 'lead', model: 'test' },
|
||||
team: {
|
||||
name: 't',
|
||||
agents: ['alice', 'bob'],
|
||||
delegationDepth: 3,
|
||||
maxDelegationDepth: 3,
|
||||
runDelegatedAgent: vi.fn(),
|
||||
delegationPool: { availableRunSlots: 1 },
|
||||
},
|
||||
}
|
||||
|
||||
const result = await delegateToAgentTool.execute(
|
||||
{ target_agent: 'bob', prompt: 'Hi' },
|
||||
ctx,
|
||||
)
|
||||
|
||||
expect(result.isError).toBe(true)
|
||||
expect(result.data).toMatch(/Maximum delegation depth/)
|
||||
})
|
||||
|
||||
it('errors fast when pool has no free slots without calling runDelegatedAgent', async () => {
|
||||
const runDelegatedAgent = vi.fn()
|
||||
const ctx: ToolUseContext = {
|
||||
agent: { name: 'alice', role: 'lead', model: 'test' },
|
||||
team: {
|
||||
name: 't',
|
||||
agents: ['alice', 'bob'],
|
||||
delegationPool: { availableRunSlots: 0 },
|
||||
runDelegatedAgent,
|
||||
},
|
||||
}
|
||||
|
||||
const result = await delegateToAgentTool.execute(
|
||||
{ target_agent: 'bob', prompt: 'Hi' },
|
||||
ctx,
|
||||
)
|
||||
|
||||
expect(result.isError).toBe(true)
|
||||
expect(result.data).toMatch(/no free concurrency slot/i)
|
||||
expect(runDelegatedAgent).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('writes unique SharedMemory audit keys for repeated delegations', async () => {
|
||||
const store = new InMemoryStore()
|
||||
const runDelegatedAgent = vi.fn().mockResolvedValue(DELEGATE_OK)
|
||||
const ctx: ToolUseContext = {
|
||||
agent: { name: 'alice', role: 'lead', model: 'test' },
|
||||
team: {
|
||||
name: 't',
|
||||
agents: ['alice', 'bob'],
|
||||
sharedMemory: store,
|
||||
delegationPool: { availableRunSlots: 2 },
|
||||
runDelegatedAgent,
|
||||
},
|
||||
}
|
||||
|
||||
await delegateToAgentTool.execute({ target_agent: 'bob', prompt: 'a' }, ctx)
|
||||
await delegateToAgentTool.execute({ target_agent: 'bob', prompt: 'b' }, ctx)
|
||||
|
||||
const keys = (await store.list()).map((e) => e.key)
|
||||
const delegationKeys = keys.filter((k) => k.includes('delegation:bob:'))
|
||||
expect(delegationKeys).toHaveLength(2)
|
||||
expect(delegationKeys[0]).not.toBe(delegationKeys[1])
|
||||
})
|
||||
|
||||
it('returns isError when delegated run reports success false', async () => {
|
||||
const runDelegatedAgent = vi.fn().mockResolvedValue({
|
||||
success: false,
|
||||
output: 'delegated agent failed',
|
||||
messages: [],
|
||||
tokenUsage: { input_tokens: 0, output_tokens: 0 },
|
||||
toolCalls: [],
|
||||
} satisfies AgentRunResult)
|
||||
|
||||
const ctx: ToolUseContext = {
|
||||
agent: { name: 'alice', role: 'lead', model: 'test' },
|
||||
team: {
|
||||
name: 't',
|
||||
agents: ['alice', 'bob'],
|
||||
delegationPool: { availableRunSlots: 1 },
|
||||
runDelegatedAgent,
|
||||
},
|
||||
}
|
||||
|
||||
const result = await delegateToAgentTool.execute(
|
||||
{ target_agent: 'bob', prompt: 'Hi' },
|
||||
ctx,
|
||||
)
|
||||
|
||||
expect(result.isError).toBe(true)
|
||||
expect(result.data).toBe('delegated agent failed')
|
||||
})
|
||||
})
|
||||
|
|
|
|||
|
|
@ -0,0 +1,114 @@
|
|||
import { describe, it, expect } from 'vitest'
|
||||
import { z } from 'zod'
|
||||
import { AgentRunner } from '../src/agent/runner.js'
|
||||
import { ToolRegistry, defineTool } from '../src/tool/framework.js'
|
||||
import { ToolExecutor } from '../src/tool/executor.js'
|
||||
import type { LLMAdapter, LLMMessage, LLMResponse, StreamEvent, ToolUseBlock, ToolResultBlock } from '../src/types.js'
|
||||
|
||||
function toolUseResponse(toolName: string, input: Record<string, unknown>): LLMResponse {
|
||||
return {
|
||||
id: `resp-${Math.random().toString(36).slice(2)}`,
|
||||
content: [{
|
||||
type: 'tool_use',
|
||||
id: `tu-${Math.random().toString(36).slice(2)}`,
|
||||
name: toolName,
|
||||
input,
|
||||
}],
|
||||
model: 'mock-model',
|
||||
stop_reason: 'tool_use',
|
||||
usage: { input_tokens: 5, output_tokens: 5 },
|
||||
}
|
||||
}
|
||||
|
||||
function textResponse(text: string): LLMResponse {
|
||||
return {
|
||||
id: `resp-${Math.random().toString(36).slice(2)}`,
|
||||
content: [{ type: 'text', text }],
|
||||
model: 'mock-model',
|
||||
stop_reason: 'end_turn',
|
||||
usage: { input_tokens: 5, output_tokens: 5 },
|
||||
}
|
||||
}
|
||||
|
||||
describe('delegation-triggered budget_exceeded', () => {
|
||||
it('yields tool_result events and appends tool_result message before break', async () => {
|
||||
// Parent turn 1: LLM asks for a delegation.
|
||||
// Tool returns metadata.tokenUsage that alone pushes totalUsage past the budget.
|
||||
// Expectation: stream yields tool_use AND tool_result, and the returned
|
||||
// `messages` contains the user tool_result message, so downstream consumers
|
||||
// can resume without API "tool_use without tool_result" errors.
|
||||
const responses = [
|
||||
toolUseResponse('delegate_to_agent', { target_agent: 'bob', prompt: 'work' }),
|
||||
textResponse('should not be reached'),
|
||||
]
|
||||
let idx = 0
|
||||
const adapter: LLMAdapter = {
|
||||
name: 'mock',
|
||||
async chat() {
|
||||
return responses[idx++]!
|
||||
},
|
||||
async *stream() { /* unused */ },
|
||||
}
|
||||
|
||||
const registry = new ToolRegistry()
|
||||
registry.register(
|
||||
defineTool({
|
||||
name: 'delegate_to_agent',
|
||||
description: 'Fake delegation for test',
|
||||
inputSchema: z.object({ target_agent: z.string(), prompt: z.string() }),
|
||||
async execute() {
|
||||
return {
|
||||
data: 'delegated output',
|
||||
metadata: { tokenUsage: { input_tokens: 500, output_tokens: 500 } },
|
||||
}
|
||||
},
|
||||
}),
|
||||
)
|
||||
|
||||
const runner = new AgentRunner(adapter, registry, new ToolExecutor(registry), {
|
||||
model: 'mock-model',
|
||||
allowedTools: ['delegate_to_agent'],
|
||||
maxTurns: 5,
|
||||
maxTokenBudget: 100, // 10 (parent LLM) + 1000 (delegation) ≫ 100
|
||||
agentName: 'parent',
|
||||
})
|
||||
|
||||
const events: StreamEvent[] = []
|
||||
for await (const ev of runner.stream([{ role: 'user', content: [{ type: 'text', text: 'start' }] }])) {
|
||||
events.push(ev)
|
||||
}
|
||||
|
||||
const toolUseEvents = events.filter((e): e is StreamEvent & { type: 'tool_use'; data: ToolUseBlock } => e.type === 'tool_use')
|
||||
const toolResultEvents = events.filter((e): e is StreamEvent & { type: 'tool_result'; data: ToolResultBlock } => e.type === 'tool_result')
|
||||
const budgetEvents = events.filter(e => e.type === 'budget_exceeded')
|
||||
const doneEvents = events.filter((e): e is StreamEvent & { type: 'done'; data: { messages: LLMMessage[]; budgetExceeded?: boolean } } => e.type === 'done')
|
||||
|
||||
// 1. Every tool_use event has a matching tool_result event.
|
||||
expect(toolUseEvents).toHaveLength(1)
|
||||
expect(toolResultEvents).toHaveLength(1)
|
||||
expect(toolResultEvents[0]!.data.tool_use_id).toBe(toolUseEvents[0]!.data.id)
|
||||
|
||||
// 2. Budget event fires and the run terminates with budgetExceeded=true.
|
||||
expect(budgetEvents).toHaveLength(1)
|
||||
expect(doneEvents).toHaveLength(1)
|
||||
expect(doneEvents[0]!.data.budgetExceeded).toBe(true)
|
||||
|
||||
// 3. Returned messages contain the tool_result user message so the
|
||||
// conversation is API-resumable.
|
||||
const messages = doneEvents[0]!.data.messages
|
||||
const lastMsg = messages[messages.length - 1]!
|
||||
expect(lastMsg.role).toBe('user')
|
||||
const hasMatchingToolResult = lastMsg.content.some(
|
||||
b => b.type === 'tool_result' && b.tool_use_id === toolUseEvents[0]!.data.id,
|
||||
)
|
||||
expect(hasMatchingToolResult).toBe(true)
|
||||
|
||||
// 4. Ordering: tool_result event is emitted before budget_exceeded.
|
||||
const toolResultIdx = events.findIndex(e => e.type === 'tool_result')
|
||||
const budgetIdx = events.findIndex(e => e.type === 'budget_exceeded')
|
||||
expect(toolResultIdx).toBeLessThan(budgetIdx)
|
||||
|
||||
// 5. LLM was only called once — we broke before a second turn.
|
||||
expect(idx).toBe(1)
|
||||
})
|
||||
})
|
||||
|
|
@ -0,0 +1,131 @@
|
|||
import { describe, it, expect, vi } from 'vitest'
|
||||
import { OpenMultiAgent } from '../src/orchestrator/orchestrator.js'
|
||||
import type { AgentConfig, LLMChatOptions, LLMMessage, LLMResponse } from '../src/types.js'
|
||||
|
||||
// Single shared mock adapter, routed by systemPrompt + first-turn user text.
|
||||
vi.mock('../src/llm/adapter.js', () => ({
|
||||
createAdapter: async () => ({
|
||||
name: 'mock',
|
||||
async chat(messages: LLMMessage[], options: LLMChatOptions): Promise<LLMResponse> {
|
||||
const sys = options.systemPrompt ?? ''
|
||||
const firstUserText = extractText(messages[0]?.content ?? [])
|
||||
const onlyOneMessage = messages.length === 1
|
||||
|
||||
// Root parent task (turn 1) emits a delegation tool_use.
|
||||
// Task description strings are set to 'ROOT-A' / 'ROOT-B' so we can
|
||||
// distinguish the parent's first turn from the ephemeral delegate's
|
||||
// first turn (which sees 'ping-A' / 'ping-B' as its user prompt).
|
||||
if (onlyOneMessage && firstUserText.includes('ROOT-A')) {
|
||||
return toolUseResponse('delegate_to_agent', { target_agent: 'B', prompt: 'ping-B' })
|
||||
}
|
||||
if (onlyOneMessage && firstUserText.includes('ROOT-B')) {
|
||||
return toolUseResponse('delegate_to_agent', { target_agent: 'A', prompt: 'ping-A' })
|
||||
}
|
||||
|
||||
// Ephemeral delegate's first (and only) turn — return plain text so it
|
||||
// terminates cleanly without another delegation.
|
||||
if (onlyOneMessage) {
|
||||
const who = sys.startsWith('A-') ? 'A' : 'B'
|
||||
return textResponse(`${who} nested done`)
|
||||
}
|
||||
|
||||
// Root parent turn 2 — after tool_result. Return text to end the loop.
|
||||
const who = sys.startsWith('A-') ? 'A' : 'B'
|
||||
return textResponse(`${who} parent done`)
|
||||
},
|
||||
async *stream() { yield { type: 'done' as const, data: {} } },
|
||||
}),
|
||||
}))
|
||||
|
||||
function textResponse(text: string): LLMResponse {
|
||||
return {
|
||||
id: `r-${Math.random().toString(36).slice(2)}`,
|
||||
content: [{ type: 'text', text }],
|
||||
model: 'mock-model',
|
||||
stop_reason: 'end_turn',
|
||||
usage: { input_tokens: 5, output_tokens: 5 },
|
||||
}
|
||||
}
|
||||
|
||||
function toolUseResponse(toolName: string, input: Record<string, unknown>): LLMResponse {
|
||||
return {
|
||||
id: `r-${Math.random().toString(36).slice(2)}`,
|
||||
content: [{
|
||||
type: 'tool_use',
|
||||
id: `tu-${Math.random().toString(36).slice(2)}`,
|
||||
name: toolName,
|
||||
input,
|
||||
}],
|
||||
model: 'mock-model',
|
||||
stop_reason: 'tool_use',
|
||||
usage: { input_tokens: 5, output_tokens: 5 },
|
||||
}
|
||||
}
|
||||
|
||||
function extractText(content: readonly { type: string; text?: string }[]): string {
|
||||
return content
|
||||
.filter((b): b is { type: 'text'; text: string } => b.type === 'text' && typeof b.text === 'string')
|
||||
.map((b) => b.text)
|
||||
.join(' ')
|
||||
}
|
||||
|
||||
function agentA(): AgentConfig {
|
||||
return {
|
||||
name: 'A',
|
||||
model: 'mock-model',
|
||||
provider: 'openai',
|
||||
// sysPrompt prefix used by the mock to disambiguate A vs B.
|
||||
systemPrompt: 'A-agent. You are agent A. Delegate to B when asked.',
|
||||
tools: ['delegate_to_agent'],
|
||||
maxTurns: 4,
|
||||
}
|
||||
}
|
||||
|
||||
function agentB(): AgentConfig {
|
||||
return {
|
||||
name: 'B',
|
||||
model: 'mock-model',
|
||||
provider: 'openai',
|
||||
systemPrompt: 'B-agent. You are agent B. Delegate to A when asked.',
|
||||
tools: ['delegate_to_agent'],
|
||||
maxTurns: 4,
|
||||
}
|
||||
}
|
||||
|
||||
describe('mutual delegation (A↔B) completes without agent-lock deadlock', () => {
|
||||
it('two parallel root tasks both finish when each delegates to the other', async () => {
|
||||
// Previously: pool.run('B') inside A's tool call waited on B's agent lock
|
||||
// (held by the parent B task), while pool.run('A') inside B's tool call
|
||||
// waited on A's agent lock — classic mutual deadlock.
|
||||
// After the fix: delegation uses runEphemeral on a fresh Agent instance,
|
||||
// so neither call touches the per-agent lock.
|
||||
const oma = new OpenMultiAgent({
|
||||
defaultModel: 'mock-model',
|
||||
defaultProvider: 'openai',
|
||||
// Need room for 2 parent runs + 2 ephemeral delegates.
|
||||
maxConcurrency: 4,
|
||||
})
|
||||
const team = oma.createTeam('mutual', {
|
||||
name: 'mutual',
|
||||
agents: [agentA(), agentB()],
|
||||
sharedMemory: false,
|
||||
})
|
||||
|
||||
// Race against a 10s timeout so a regression surfaces as a test failure
|
||||
// rather than a hanging CI job.
|
||||
const runPromise = oma.runTasks(team, [
|
||||
{ title: 'Task A', description: 'ROOT-A', assignee: 'A' },
|
||||
{ title: 'Task B', description: 'ROOT-B', assignee: 'B' },
|
||||
])
|
||||
const timeout = new Promise((_resolve, reject) =>
|
||||
setTimeout(() => reject(new Error('mutual delegation deadlock (timeout)')), 10_000),
|
||||
)
|
||||
|
||||
const result = (await Promise.race([runPromise, timeout])) as Awaited<typeof runPromise>
|
||||
|
||||
expect(result.success).toBe(true)
|
||||
const agentOutputs = [...result.agentResults.values()].map((r) => r.output)
|
||||
expect(agentOutputs.some((o) => o.includes('A parent done'))).toBe(true)
|
||||
expect(agentOutputs.some((o) => o.includes('B parent done'))).toBe(true)
|
||||
})
|
||||
})
|
||||
|
|
@ -6,6 +6,10 @@ describe('Semaphore', () => {
|
|||
expect(() => new Semaphore(0)).toThrow()
|
||||
})
|
||||
|
||||
it('exposes configured limit', () => {
|
||||
expect(new Semaphore(5).limit).toBe(5)
|
||||
})
|
||||
|
||||
it('allows up to max concurrent holders', async () => {
|
||||
const sem = new Semaphore(2)
|
||||
let running = 0
|
||||
|
|
|
|||
|
|
@ -495,4 +495,49 @@ describe('AgentRunner compressToolResults', () => {
|
|||
const allToolResults = extractToolResultContents(turn3Messages)
|
||||
expect(allToolResults[0]).toContain('compressed')
|
||||
})
|
||||
|
||||
it('does NOT compress delegate_to_agent results on turn 3+', async () => {
|
||||
const calls: LLMMessage[][] = []
|
||||
const longOutput = 'y'.repeat(600)
|
||||
const responses = [
|
||||
toolUseResponse('delegate_to_agent', { target_agent: 'bob', prompt: 'do work' }),
|
||||
toolUseResponse('delegate_to_agent', { target_agent: 'bob', prompt: 'do more' }),
|
||||
textResponse('done'),
|
||||
]
|
||||
let idx = 0
|
||||
const adapter: LLMAdapter = {
|
||||
name: 'mock',
|
||||
async chat(messages) {
|
||||
calls.push(messages.map(m => ({ role: m.role, content: [...m.content] })))
|
||||
return responses[idx++]!
|
||||
},
|
||||
async *stream() { /* unused */ },
|
||||
}
|
||||
const registry = new ToolRegistry()
|
||||
registry.register(
|
||||
defineTool({
|
||||
name: 'delegate_to_agent',
|
||||
description: 'Fake delegation tool for test',
|
||||
inputSchema: z.object({ target_agent: z.string(), prompt: z.string() }),
|
||||
async execute() {
|
||||
return { data: longOutput }
|
||||
},
|
||||
}),
|
||||
)
|
||||
const runner = new AgentRunner(adapter, registry, new ToolExecutor(registry), {
|
||||
model: 'mock-model',
|
||||
allowedTools: ['delegate_to_agent'],
|
||||
maxTurns: 5,
|
||||
compressToolResults: true,
|
||||
})
|
||||
|
||||
await runner.run([{ role: 'user', content: [{ type: 'text', text: 'start' }] }])
|
||||
|
||||
// Turn 3: both delegation results should survive unchanged.
|
||||
const turn3Messages = calls[2]!
|
||||
const allToolResults = extractToolResultContents(turn3Messages)
|
||||
expect(allToolResults).toHaveLength(2)
|
||||
expect(allToolResults[0]).toBe(longOutput)
|
||||
expect(allToolResults[1]).toBe(longOutput)
|
||||
})
|
||||
})
|
||||
|
|
|
|||
Loading…
Reference in New Issue