From b857c001a88740e726d49aac5fd74714f7dc1f24 Mon Sep 17 00:00:00 2001 From: JackChen Date: Sun, 19 Apr 2026 10:50:44 +0800 Subject: [PATCH] feat: agent delegation mechanism (#123) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements `delegate_to_agent` built-in tool (closes #63). Opt-in registration via `includeDelegateTool`; only wired up by `runTeam` / `runTasks` for pool workers. Guards: self-delegation, unknown target, cycle detection via `delegationChain`, depth cap (`maxDelegationDepth`, default 3), pool deadlock. Delegation runs on ephemeral Agent instances via `AgentPool.runEphemeral` (pool semaphore only, no per-agent lock) so mutual delegation (A→B while B→A) can't deadlock. Delegated run `tokenUsage` surfaces via `ToolResult.metadata` and rolls into the parent runner's total before the next budget check; delegation tool_result blocks are exempt from `compressToolResults` and the `compact` strategy. Best-effort SharedMemory audit writes at `{caller}/delegation:{target}:{ts}-{rand}`. Picks up @NamelessNATM's work from #84 and adds cycle detection, token aggregation, compression exemption, mutual-delegation deadlock fix (Codex P1), and tool_result-preservation on budget-exceeded (Codex P2). Co-authored-by: NamelessNATM --- CLAUDE.md | 16 +- examples/16-agent-handoff.ts | 64 +++++++ src/agent/pool.ts | 36 ++++ src/agent/runner.ts | 72 +++++++- src/index.ts | 4 + src/orchestrator/orchestrator.ts | 122 +++++++++++- src/tool/built-in/delegate.ts | 109 +++++++++++ src/tool/built-in/index.ts | 26 ++- src/types.ts | 46 ++++- src/utils/semaphore.ts | 5 + tests/agent-pool.test.ts | 88 +++++++++ tests/built-in-tools.test.ts | 256 +++++++++++++++++++++++++- tests/delegation-budget.test.ts | 114 ++++++++++++ tests/delegation-concurrency.test.ts | 131 +++++++++++++ tests/semaphore.test.ts | 4 + tests/tool-result-compression.test.ts | 45 +++++ 16 files changed, 1116 insertions(+), 22 deletions(-) create mode 100644 examples/16-agent-handoff.ts create mode 100644 src/tool/built-in/delegate.ts create mode 100644 tests/delegation-budget.test.ts create mode 100644 tests/delegation-concurrency.test.ts diff --git a/CLAUDE.md b/CLAUDE.md index 0b78e16..a586a50 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -74,7 +74,21 @@ Optional `maxRetries`, `retryDelayMs`, `retryBackoff` on task config (used via ` ### Built-in Tools -`bash`, `file_read`, `file_write`, `file_edit`, `grep` — registered via `registerBuiltInTools(registry)`. +`bash`, `file_read`, `file_write`, `file_edit`, `grep`, `glob` — registered via `registerBuiltInTools(registry)`. `delegate_to_agent` is opt-in (`registerBuiltInTools(registry, { includeDelegateTool: true })`) and only wired up inside pool workers by `runTeam`/`runTasks` — see "Agent Delegation" below. + +### Agent Delegation + +`delegate_to_agent` (in `src/tool/built-in/delegate.ts`) lets an agent synchronously hand a sub-prompt to another roster agent and receive its final output as a tool result. Only active during orchestrated runs; standalone `runAgent` and the `runTeam` short-circuit path (`isSimpleGoal` hit) do not inject it. + +Guards (all enforced in the tool itself, before `runDelegatedAgent` is called): + +- **Self-delegation:** rejected (`target === context.agent.name`) +- **Unknown agent:** rejected (target not in team roster) +- **Cycle detection:** rejected if target already in `TeamInfo.delegationChain` (prevents `A → B → A` from burning tokens up to the depth cap) +- **Depth cap:** `OrchestratorConfig.maxDelegationDepth` (default 3) +- **Pool deadlock:** rejected when `AgentPool.availableRunSlots < 1`, without calling the pool + +The delegated run's `AgentRunResult.tokenUsage` is surfaced via `ToolResult.metadata.tokenUsage`; the runner accumulates it into `totalUsage` before the next `maxTokenBudget` check, so delegation cannot silently bypass the parent's budget. Delegation tool_result blocks are exempt from `compressToolResults` and the `compact` context strategy so the parent agent retains the full sub-agent output across turns. Best-effort SharedMemory audit writes at `{caller}/delegation:{target}:{timestamp}-{rand}` if the team has shared memory enabled. ### Adding an LLM Adapter diff --git a/examples/16-agent-handoff.ts b/examples/16-agent-handoff.ts new file mode 100644 index 0000000..fd2775c --- /dev/null +++ b/examples/16-agent-handoff.ts @@ -0,0 +1,64 @@ +/** + * Example 16 — Synchronous agent handoff via `delegate_to_agent` + * + * During `runTeam` / `runTasks`, pool agents register the built-in + * `delegate_to_agent` tool so one specialist can run a sub-prompt on another + * roster agent and read the answer in the same conversation turn. + * + * Whitelist `delegate_to_agent` in `tools` when you want the model to see it; + * standalone `runAgent()` does not register this tool by default. + * + * Run: + * npx tsx examples/16-agent-handoff.ts + * + * Prerequisites: + * ANTHROPIC_API_KEY + */ + +import { OpenMultiAgent } from '../src/index.js' +import type { AgentConfig } from '../src/types.js' + +const researcher: AgentConfig = { + name: 'researcher', + model: 'claude-sonnet-4-6', + provider: 'anthropic', + systemPrompt: + 'You answer factual questions briefly. When the user asks for a second opinion ' + + 'from the analyst, use delegate_to_agent to ask the analyst agent, then summarize both views.', + tools: ['delegate_to_agent'], + maxTurns: 6, +} + +const analyst: AgentConfig = { + name: 'analyst', + model: 'claude-sonnet-4-6', + provider: 'anthropic', + systemPrompt: 'You give short, skeptical analysis of claims. Push back when evidence is weak.', + tools: [], + maxTurns: 4, +} + +async function main(): Promise { + const orchestrator = new OpenMultiAgent({ maxConcurrency: 2 }) + const team = orchestrator.createTeam('handoff-demo', { + name: 'handoff-demo', + agents: [researcher, analyst], + sharedMemory: true, + }) + + const goal = + 'In one paragraph: state a simple fact about photosynthesis. ' + + 'Then ask the analyst (via delegate_to_agent) for a one-sentence critique of overstated claims in popular science. ' + + 'Merge both into a final short answer.' + + const result = await orchestrator.runTeam(team, goal) + console.log('Success:', result.success) + for (const [name, ar] of result.agentResults) { + console.log(`\n--- ${name} ---\n${ar.output.slice(0, 2000)}`) + } +} + +main().catch((err) => { + console.error(err) + process.exit(1) +}) diff --git a/src/agent/pool.ts b/src/agent/pool.ts index 18545b4..d97c047 100644 --- a/src/agent/pool.ts +++ b/src/agent/pool.ts @@ -77,6 +77,16 @@ export class AgentPool { this.semaphore = new Semaphore(maxConcurrency) } + /** + * Pool semaphore slots not currently held (`maxConcurrency - active`). + * Used to avoid deadlocks when a nested `run()` would wait forever for a slot + * held by the parent run. Best-effort only if multiple nested runs start in + * parallel after the same synchronous check. + */ + get availableRunSlots(): number { + return this.maxConcurrency - this.semaphore.active + } + // ------------------------------------------------------------------------- // Registry operations // ------------------------------------------------------------------------- @@ -157,6 +167,32 @@ export class AgentPool { } } + /** + * Run a prompt on a caller-supplied Agent instance, acquiring only the pool + * semaphore — no per-agent lock, no registry lookup. + * + * Designed for delegation: each delegated call should use a **fresh** Agent + * instance (matching `delegate_to_agent`'s "runs in a fresh conversation" + * semantics), so the per-agent mutex used by {@link run} would be dead + * weight and, worse, a deadlock vector for mutual delegation (A→B while + * B→A, each caller holding its own `run`'s agent lock). + * + * The caller is responsible for constructing the Agent; {@link AgentPool} + * does not register or track it. + */ + async runEphemeral( + agent: Agent, + prompt: string, + runOptions?: Partial, + ): Promise { + await this.semaphore.acquire() + try { + return await agent.run(prompt, runOptions) + } finally { + this.semaphore.release() + } + } + /** * Run prompts on multiple agents in parallel, subject to the concurrency * cap set at construction time. diff --git a/src/agent/runner.ts b/src/agent/runner.ts index 92b482c..6264c13 100644 --- a/src/agent/runner.ts +++ b/src/agent/runner.ts @@ -23,6 +23,7 @@ import type { StreamEvent, ToolResult, ToolUseContext, + TeamInfo, LLMAdapter, LLMChatOptions, TraceEvent, @@ -134,6 +135,11 @@ export interface RunOptions { * {@link RunnerOptions.abortSignal}. Useful for per-run timeouts. */ readonly abortSignal?: AbortSignal + /** + * Team context for built-in tools such as `delegate_to_agent`. + * Injected by the orchestrator during `runTeam` / `runTasks` pool runs. + */ + readonly team?: TeamInfo } /** The aggregated result returned when a full run completes. */ @@ -733,11 +739,12 @@ export class AgentRunner { // Parallel execution is critical for multi-tool responses where the // tools are independent (e.g. reading several files at once). // ------------------------------------------------------------------ - const toolContext: ToolUseContext = this.buildToolContext(effectiveAbortSignal) + const toolContext: ToolUseContext = this.buildToolContext(options) const executionPromises = toolUseBlocks.map(async (block): Promise<{ resultBlock: ToolResultBlock record: ToolCallRecord + delegationUsage?: TokenUsage }> => { options.onToolCall?.(block.name, block.input) @@ -789,12 +796,30 @@ export class AgentRunner { is_error: result.isError, } - return { resultBlock, record } + return { + resultBlock, + record, + ...(result.metadata?.tokenUsage !== undefined + ? { delegationUsage: result.metadata.tokenUsage } + : {}), + } }) // Wait for every tool in this turn to finish. const executions = await Promise.all(executionPromises) + // Roll up any nested-run token usage surfaced via ToolResult.metadata + // (e.g. from delegate_to_agent) so it counts against this agent's budget. + let delegationTurnUsage: TokenUsage | undefined + for (const ex of executions) { + if (ex.delegationUsage !== undefined) { + totalUsage = addTokenUsage(totalUsage, ex.delegationUsage) + delegationTurnUsage = delegationTurnUsage === undefined + ? ex.delegationUsage + : addTokenUsage(delegationTurnUsage, ex.delegationUsage) + } + } + // ------------------------------------------------------------------ // Step 5: Accumulate results and build the user message that carries // them back to the LLM in the next turn. @@ -828,6 +853,27 @@ export class AgentRunner { conversationMessages.push(toolResultMessage) options.onMessage?.(toolResultMessage) + // Budget check is deferred until tool_result events have been yielded + // and the tool_result user message has been appended, so stream + // consumers see matched tool_use/tool_result pairs and the returned + // `messages` remain resumable against the Anthropic/OpenAI APIs. + if (delegationTurnUsage !== undefined && this.options.maxTokenBudget !== undefined) { + const totalAfterDelegation = totalUsage.input_tokens + totalUsage.output_tokens + if (totalAfterDelegation > this.options.maxTokenBudget) { + budgetExceeded = true + finalOutput = turnText + yield { + type: 'budget_exceeded', + data: new TokenBudgetExceededError( + this.options.agentName ?? 'unknown', + totalAfterDelegation, + this.options.maxTokenBudget, + ), + } satisfies StreamEvent + break + } + } + // Loop back to Step 1 — send updated conversation to the LLM. } } catch (err) { @@ -968,8 +1014,10 @@ export class AgentRunner { } // Short results: preserve. if (block.content.length < minToolResultChars) return block - // Compress. const toolName = toolNameMap.get(block.tool_use_id) ?? 'unknown' + // Delegation results: preserve — parent agent may still reason over them. + if (toolName === 'delegate_to_agent') return block + // Compress. msgChanged = true return { type: 'tool_result', @@ -1024,6 +1072,16 @@ export class AgentRunner { // Nothing to compress if there's at most one tool-result user message. if (lastToolResultUserIdx <= 0) return messages + // Build a tool_use_id → tool name map so we can exempt delegation results, + // whose full output the parent agent may need to re-read in later turns. + const toolNameMap = new Map() + for (const msg of messages) { + if (msg.role !== 'assistant') continue + for (const block of msg.content) { + if (block.type === 'tool_use') toolNameMap.set(block.id, block.name) + } + } + let anyChanged = false const result = messages.map((msg, idx) => { // Only compress user messages that appear before the last one. @@ -1039,6 +1097,9 @@ export class AgentRunner { // Never compress error results — they carry diagnostic value. if (block.is_error) return block + // Never compress delegation results — the parent agent relies on the full sub-agent output. + if (toolNameMap.get(block.tool_use_id) === 'delegate_to_agent') return block + // Skip already-compressed results — avoid re-compression with wrong char count. if (block.content.startsWith('[Tool output compressed')) return block @@ -1067,14 +1128,15 @@ export class AgentRunner { * Build the {@link ToolUseContext} passed to every tool execution. * Identifies this runner as the invoking agent. */ - private buildToolContext(abortSignal?: AbortSignal): ToolUseContext { + private buildToolContext(options: RunOptions = {}): ToolUseContext { return { agent: { name: this.options.agentName ?? 'runner', role: this.options.agentRole ?? 'assistant', model: this.options.model, }, - abortSignal, + abortSignal: options.abortSignal ?? this.options.abortSignal, + ...(options.team !== undefined ? { team: options.team } : {}), } } } diff --git a/src/index.ts b/src/index.ts index b10e6d2..9a76e41 100644 --- a/src/index.ts +++ b/src/index.ts @@ -94,13 +94,16 @@ export type { ToolExecutorOptions, BatchToolCall } from './tool/executor.js' export { registerBuiltInTools, BUILT_IN_TOOLS, + ALL_BUILT_IN_TOOLS_WITH_DELEGATE, bashTool, + delegateToAgentTool, fileReadTool, fileWriteTool, fileEditTool, globTool, grepTool, } from './tool/built-in/index.js' +export type { RegisterBuiltInToolsOptions } from './tool/built-in/index.js' // --------------------------------------------------------------------------- // LLM adapters @@ -145,6 +148,7 @@ export type { ToolUseContext, AgentInfo, TeamInfo, + DelegationPoolView, // Agent AgentConfig, diff --git a/src/orchestrator/orchestrator.ts b/src/orchestrator/orchestrator.ts index 12bcf2e..5050480 100644 --- a/src/orchestrator/orchestrator.ts +++ b/src/orchestrator/orchestrator.ts @@ -50,6 +50,7 @@ import type { Task, TaskStatus, TeamConfig, + TeamInfo, TeamRunResult, TokenUsage, } from '../types.js' @@ -73,6 +74,7 @@ import { extractKeywords, keywordScore } from '../utils/keywords.js' const ZERO_USAGE: TokenUsage = { input_tokens: 0, output_tokens: 0 } const DEFAULT_MAX_CONCURRENCY = 5 +const DEFAULT_MAX_DELEGATION_DEPTH = 3 const DEFAULT_MODEL = 'claude-opus-4-6' // --------------------------------------------------------------------------- @@ -207,11 +209,14 @@ function resolveTokenBudget(primary?: number, fallback?: number): number | undef /** * Build a minimal {@link Agent} with its own fresh registry/executor. - * Registers all built-in tools so coordinator/worker agents can use them. + * Pool workers pass `includeDelegateTool` so `delegate_to_agent` is available during `runTeam` / `runTasks`. */ -function buildAgent(config: AgentConfig): Agent { +function buildAgent( + config: AgentConfig, + toolRegistration?: { readonly includeDelegateTool?: boolean }, +): Agent { const registry = new ToolRegistry() - registerBuiltInTools(registry) + registerBuiltInTools(registry, toolRegistration) if (config.customTools) { for (const tool of config.customTools) { registry.register(tool, { runtimeAdded: true }) @@ -411,6 +416,91 @@ interface RunContext { budgetExceededReason?: string } +/** + * Build {@link TeamInfo} for tool context, including nested `runDelegatedAgent` + * that respects pool capacity to avoid semaphore deadlocks. + * + * Delegation always builds a **fresh** Agent instance for the target and runs + * it via `pool.runEphemeral` — the pool semaphore still gates total concurrency, + * but the per-agent lock is bypassed. This matches `delegate_to_agent`'s "runs + * in a fresh conversation for this prompt only" contract and prevents mutual + * delegation (A→B while B→A) from deadlocking on each other's agent locks. + */ +function buildTaskAgentTeamInfo( + ctx: RunContext, + taskId: string, + traceBase: Partial, + delegationDepth: number, + delegationChain: readonly string[], +): TeamInfo { + const sharedMem = ctx.team.getSharedMemoryInstance() + const maxDepth = ctx.config.maxDelegationDepth + const agentConfigs = ctx.team.getAgents() + const agentNames = agentConfigs.map((a) => a.name) + + const runDelegatedAgent = async (targetAgent: string, prompt: string): Promise => { + const pool = ctx.pool + if (pool.availableRunSlots < 1) { + return { + success: false, + output: + 'Agent pool has no free concurrency slot for a delegated run (would deadlock). ' + + 'Increase maxConcurrency or reduce parallel delegation.', + messages: [], + tokenUsage: ZERO_USAGE, + toolCalls: [], + } + } + + const targetConfig = agentConfigs.find((a) => a.name === targetAgent) + if (!targetConfig) { + return { + success: false, + output: `Unknown agent "${targetAgent}" — not in team roster [${agentNames.join(', ')}].`, + messages: [], + tokenUsage: ZERO_USAGE, + toolCalls: [], + } + } + + // Apply orchestrator-level defaults just like buildPool, then construct a + // one-shot Agent for this delegation only. + const effective: AgentConfig = { + ...targetConfig, + provider: targetConfig.provider ?? ctx.config.defaultProvider, + baseURL: targetConfig.baseURL ?? ctx.config.defaultBaseURL, + apiKey: targetConfig.apiKey ?? ctx.config.defaultApiKey, + } + const tempAgent = buildAgent(effective, { includeDelegateTool: true }) + + const nestedTeam = buildTaskAgentTeamInfo( + ctx, + taskId, + traceBase, + delegationDepth + 1, + [...delegationChain, targetAgent], + ) + const childOpts: Partial = { + ...traceBase, + traceAgent: targetAgent, + taskId, + team: nestedTeam, + } + return pool.runEphemeral(tempAgent, prompt, childOpts) + } + + return { + name: ctx.team.name, + agents: agentNames, + ...(sharedMem ? { sharedMemory: sharedMem.getStore() } : {}), + delegationDepth, + maxDelegationDepth: maxDepth, + delegationPool: ctx.pool, + delegationChain, + runDelegatedAgent, + } +} + /** * Execute all tasks in `queue` using agents in `pool`, respecting dependencies * and running independent tasks in parallel. @@ -509,16 +599,28 @@ async function executeQueue( // Build the prompt: task description + dependency-only context by default. const prompt = await buildTaskPrompt(task, team, queue) - // Build trace context for this task's agent run - const traceOptions: Partial | undefined = config.onTrace - ? { onTrace: config.onTrace, runId: ctx.runId ?? '', taskId: task.id, traceAgent: assignee, abortSignal: ctx.abortSignal } - : ctx.abortSignal ? { abortSignal: ctx.abortSignal } : undefined + // Trace + abort + team tool context (delegate_to_agent) + const traceBase: Partial = { + ...(config.onTrace + ? { + onTrace: config.onTrace, + runId: ctx.runId ?? '', + taskId: task.id, + traceAgent: assignee, + } + : {}), + ...(ctx.abortSignal ? { abortSignal: ctx.abortSignal } : {}), + } + const runOptions: Partial = { + ...traceBase, + team: buildTaskAgentTeamInfo(ctx, task.id, traceBase, 0, [assignee]), + } const taskStartMs = config.onTrace ? Date.now() : 0 let retryCount = 0 const result = await executeWithRetry( - () => pool.run(assignee, prompt, traceOptions), + () => pool.run(assignee, prompt, runOptions), task, (retryData) => { retryCount++ @@ -711,12 +813,14 @@ export class OpenMultiAgent { * * Sensible defaults: * - `maxConcurrency`: 5 + * - `maxDelegationDepth`: 3 * - `defaultModel`: `'claude-opus-4-6'` * - `defaultProvider`: `'anthropic'` */ constructor(config: OrchestratorConfig = {}) { this.config = { maxConcurrency: config.maxConcurrency ?? DEFAULT_MAX_CONCURRENCY, + maxDelegationDepth: config.maxDelegationDepth ?? DEFAULT_MAX_DELEGATION_DEPTH, defaultModel: config.defaultModel ?? DEFAULT_MODEL, defaultProvider: config.defaultProvider ?? 'anthropic', defaultBaseURL: config.defaultBaseURL, @@ -1409,7 +1513,7 @@ export class OpenMultiAgent { baseURL: config.baseURL ?? this.config.defaultBaseURL, apiKey: config.apiKey ?? this.config.defaultApiKey, } - pool.add(buildAgent(effective)) + pool.add(buildAgent(effective, { includeDelegateTool: true })) } return pool } diff --git a/src/tool/built-in/delegate.ts b/src/tool/built-in/delegate.ts new file mode 100644 index 0000000..5f0fc97 --- /dev/null +++ b/src/tool/built-in/delegate.ts @@ -0,0 +1,109 @@ +/** + * @fileoverview Built-in `delegate_to_agent` tool for synchronous handoff to a roster agent. + */ + +import { z } from 'zod' +import type { ToolDefinition, ToolResult, ToolUseContext } from '../../types.js' + +const inputSchema = z.object({ + target_agent: z.string().min(1).describe('Name of the team agent to run the sub-task.'), + prompt: z.string().min(1).describe('Instructions / question for the target agent.'), +}) + +/** + * Delegates a sub-task to another agent on the team and returns that agent's final text output. + * + * Only available when the orchestrator injects {@link ToolUseContext.team} with + * `runDelegatedAgent` (pool-backed `runTeam` / `runTasks`). Standalone `runAgent` + * does not register this tool by default. + * + * Nested {@link AgentRunResult.tokenUsage} from the delegated run is surfaced via + * {@link ToolResult.metadata} so the parent runner can aggregate it into its total + * (keeps `maxTokenBudget` accurate across delegation chains). + */ +export const delegateToAgentTool: ToolDefinition> = { + name: 'delegate_to_agent', + description: + 'Run a sub-task on another agent from this team and return that agent\'s final answer as the tool result. ' + + 'Use when you need a specialist teammate to produce output you will incorporate. ' + + 'The target agent runs in a fresh conversation for this prompt only.', + inputSchema, + async execute( + { target_agent: targetAgent, prompt }, + context: ToolUseContext, + ): Promise { + const team = context.team + if (!team?.runDelegatedAgent) { + return { + data: + 'delegate_to_agent is only available during orchestrated team runs with the delegation tool enabled. ' + + 'Use SharedMemory or explicit tasks instead.', + isError: true, + } + } + + if (targetAgent === context.agent.name) { + return { + data: 'Cannot delegate to yourself; use another team member.', + isError: true, + } + } + + if (!team.agents.includes(targetAgent)) { + return { + data: `Unknown agent "${targetAgent}". Roster: ${team.agents.join(', ')}`, + isError: true, + } + } + + const chain = team.delegationChain ?? [] + if (chain.includes(targetAgent)) { + return { + data: + `Delegation cycle detected: ${[...chain, targetAgent].join(' -> ')}. ` + + 'Pick a different target or restructure the plan.', + isError: true, + } + } + + const depth = team.delegationDepth ?? 0 + const maxDepth = team.maxDelegationDepth ?? 3 + if (depth >= maxDepth) { + return { + data: `Maximum delegation depth (${maxDepth}) reached; cannot delegate further.`, + isError: true, + } + } + + if (team.delegationPool !== undefined && team.delegationPool.availableRunSlots < 1) { + return { + data: + 'Agent pool has no free concurrency slot for a delegated run (nested run would block indefinitely). ' + + 'Increase orchestrator maxConcurrency, wait for parallel work to finish, or avoid delegating while the pool is saturated.', + isError: true, + } + } + + const result = await team.runDelegatedAgent(targetAgent, prompt) + + if (team.sharedMemory) { + const suffix = `${Date.now()}-${Math.random().toString(36).slice(2, 10)}` + const key = `delegation:${targetAgent}:${suffix}` + try { + await team.sharedMemory.set(`${context.agent.name}/${key}`, result.output, { + agent: context.agent.name, + delegatedTo: targetAgent, + success: String(result.success), + }) + } catch { + // Audit is best-effort; do not fail the tool on store errors. + } + } + + return { + data: result.output, + isError: !result.success, + metadata: { tokenUsage: result.tokenUsage }, + } + }, +} diff --git a/src/tool/built-in/index.ts b/src/tool/built-in/index.ts index b9c0977..722d0c4 100644 --- a/src/tool/built-in/index.ts +++ b/src/tool/built-in/index.ts @@ -8,13 +8,23 @@ import type { ToolDefinition } from '../../types.js' import { ToolRegistry } from '../framework.js' import { bashTool } from './bash.js' +import { delegateToAgentTool } from './delegate.js' import { fileEditTool } from './file-edit.js' import { fileReadTool } from './file-read.js' import { fileWriteTool } from './file-write.js' import { globTool } from './glob.js' import { grepTool } from './grep.js' -export { bashTool, fileEditTool, fileReadTool, fileWriteTool, globTool, grepTool } +export { bashTool, delegateToAgentTool, fileEditTool, fileReadTool, fileWriteTool, globTool, grepTool } + +/** Options for {@link registerBuiltInTools}. */ +export interface RegisterBuiltInToolsOptions { + /** + * When true, registers `delegate_to_agent` (team orchestration handoff). + * Default false so standalone agents and `runAgent` do not expose a tool that always errors. + */ + readonly includeDelegateTool?: boolean +} /** * The ordered list of all built-in tools. Import this when you need to @@ -33,6 +43,12 @@ export const BUILT_IN_TOOLS: ToolDefinition[] = [ globTool, ] +/** All built-ins including `delegate_to_agent` (for team registry setup). */ +export const ALL_BUILT_IN_TOOLS_WITH_DELEGATE: ToolDefinition[] = [ + ...BUILT_IN_TOOLS, + delegateToAgentTool, +] + /** * Register all built-in tools with the given registry. * @@ -45,8 +61,14 @@ export const BUILT_IN_TOOLS: ToolDefinition[] = [ * registerBuiltInTools(registry) * ``` */ -export function registerBuiltInTools(registry: ToolRegistry): void { +export function registerBuiltInTools( + registry: ToolRegistry, + options?: RegisterBuiltInToolsOptions, +): void { for (const tool of BUILT_IN_TOOLS) { registry.register(tool) } + if (options?.includeDelegateTool) { + registry.register(delegateToAgentTool) + } } diff --git a/src/types.ts b/src/types.ts index e4f928b..d5db8e0 100644 --- a/src/types.ts +++ b/src/types.ts @@ -178,17 +178,54 @@ export interface AgentInfo { readonly model: string } -/** Descriptor for a team of agents with shared memory. */ +/** + * Minimal pool surface used by `delegate_to_agent` to detect nested-run capacity. + * {@link AgentPool} satisfies this structurally via {@link AgentPool.availableRunSlots}. + */ +export interface DelegationPoolView { + readonly availableRunSlots: number +} + +/** Descriptor for a team of agents (orchestrator-injected into tool context). */ export interface TeamInfo { readonly name: string readonly agents: readonly string[] - readonly sharedMemory: MemoryStore + /** When the team has shared memory enabled; used for delegation audit writes. */ + readonly sharedMemory?: MemoryStore + /** Zero-based depth of nested delegation from the root task run. */ + readonly delegationDepth?: number + readonly maxDelegationDepth?: number + readonly delegationPool?: DelegationPoolView + /** + * Ordered chain of agent names from the root task to the current agent. + * Used to block `A -> B -> A` cycles before they burn turns against `maxDelegationDepth`. + */ + readonly delegationChain?: readonly string[] + /** + * Run another roster agent to completion and return its result. + * Only set during orchestrated pool execution (`runTeam` / `runTasks`). + */ + readonly runDelegatedAgent?: (targetAgent: string, prompt: string) => Promise +} + +/** + * Optional side-channel metadata a tool may attach to its result. + * Not shown to the LLM — the runner reads it for accounting purposes. + */ +export interface ToolResultMetadata { + /** + * Token usage consumed inside the tool execution itself (e.g. nested LLM + * calls from `delegate_to_agent`). Accumulated into the parent runner's + * total so budgets/cost tracking stay accurate across delegation. + */ + readonly tokenUsage?: TokenUsage } /** Value returned by a tool's `execute` function. */ export interface ToolResult { readonly data: string readonly isError?: boolean + readonly metadata?: ToolResultMetadata } /** @@ -472,6 +509,11 @@ export interface OrchestratorEvent { /** Top-level configuration for the orchestrator. */ export interface OrchestratorConfig { readonly maxConcurrency?: number + /** + * Maximum depth of `delegate_to_agent` chains from a task run (default `3`). + * Depth is per nested delegated run, not per team. + */ + readonly maxDelegationDepth?: number /** Maximum cumulative tokens (input + output) allowed per orchestrator run. */ readonly maxTokenBudget?: number readonly defaultModel?: string diff --git a/src/utils/semaphore.ts b/src/utils/semaphore.ts index 30fe61a..d4f4c01 100644 --- a/src/utils/semaphore.ts +++ b/src/utils/semaphore.ts @@ -34,6 +34,11 @@ export class Semaphore { } } + /** Maximum concurrent holders configured for this semaphore. */ + get limit(): number { + return this.max + } + /** * Acquire a slot. Resolves immediately when one is free, or waits until a * holder calls `release()`. diff --git a/tests/agent-pool.test.ts b/tests/agent-pool.test.ts index 343c5b4..6bb1bc4 100644 --- a/tests/agent-pool.test.ts +++ b/tests/agent-pool.test.ts @@ -291,5 +291,93 @@ describe('AgentPool', () => { expect(maxConcurrent).toBeLessThanOrEqual(2) }) + + it('availableRunSlots matches maxConcurrency when idle', () => { + const pool = new AgentPool(3) + pool.add(createMockAgent('a')) + expect(pool.availableRunSlots).toBe(3) + }) + + it('availableRunSlots is zero while a run holds the pool slot', async () => { + const pool = new AgentPool(1) + const agent = createMockAgent('solo') + pool.add(agent) + + let finishRun!: (value: AgentRunResult) => void + const holdPromise = new Promise((resolve) => { + finishRun = resolve + }) + vi.mocked(agent.run).mockReturnValue(holdPromise) + + const runPromise = pool.run('solo', 'hold-slot') + await Promise.resolve() + await Promise.resolve() + expect(pool.availableRunSlots).toBe(0) + + finishRun(SUCCESS_RESULT) + await runPromise + expect(pool.availableRunSlots).toBe(1) + }) + + it('runEphemeral runs a caller-supplied Agent without touching the agentLock', async () => { + // Registered agent's lock is held by a pending pool.run — a second + // pool.run() against the same name would queue on the agent lock. + // runEphemeral on a fresh Agent instance must NOT block on that lock. + const pool = new AgentPool(3) + const registered = createMockAgent('alice') + pool.add(registered) + + let releaseRegistered!: (v: AgentRunResult) => void + vi.mocked(registered.run).mockReturnValue( + new Promise((resolve) => { + releaseRegistered = resolve + }), + ) + const heldRun = pool.run('alice', 'long running') + await Promise.resolve() + await Promise.resolve() + + const ephemeral = createMockAgent('alice') // same name, fresh instance + const ephemeralResult = await pool.runEphemeral(ephemeral, 'quick task') + + expect(ephemeralResult).toBe(SUCCESS_RESULT) + expect(ephemeral.run).toHaveBeenCalledWith('quick task', undefined) + + releaseRegistered(SUCCESS_RESULT) + await heldRun + }) + + it('runEphemeral still respects pool semaphore', async () => { + const pool = new AgentPool(1) + const holder = createMockAgent('holder') + pool.add(holder) + + let releaseHolder!: (v: AgentRunResult) => void + vi.mocked(holder.run).mockReturnValue( + new Promise((resolve) => { + releaseHolder = resolve + }), + ) + const heldRun = pool.run('holder', 'hold-slot') + await Promise.resolve() + await Promise.resolve() + expect(pool.availableRunSlots).toBe(0) + + // Ephemeral agent should queue on the semaphore, not run immediately. + const ephemeral = createMockAgent('ephemeral') + let ephemeralResolved = false + const ephemeralRun = pool.runEphemeral(ephemeral, 'p').then((r) => { + ephemeralResolved = true + return r + }) + await Promise.resolve() + await Promise.resolve() + expect(ephemeralResolved).toBe(false) + + releaseHolder(SUCCESS_RESULT) + await heldRun + await ephemeralRun + expect(ephemeralResolved).toBe(true) + }) }) }) diff --git a/tests/built-in-tools.test.ts b/tests/built-in-tools.test.ts index e644726..ee6d63b 100644 --- a/tests/built-in-tools.test.ts +++ b/tests/built-in-tools.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect, beforeEach, afterEach } from 'vitest' +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest' import { mkdtemp, rm, writeFile, readFile } from 'fs/promises' import { join } from 'path' import { tmpdir } from 'os' @@ -8,9 +8,14 @@ import { fileEditTool } from '../src/tool/built-in/file-edit.js' import { bashTool } from '../src/tool/built-in/bash.js' import { globTool } from '../src/tool/built-in/glob.js' import { grepTool } from '../src/tool/built-in/grep.js' -import { registerBuiltInTools, BUILT_IN_TOOLS } from '../src/tool/built-in/index.js' +import { + registerBuiltInTools, + BUILT_IN_TOOLS, + delegateToAgentTool, +} from '../src/tool/built-in/index.js' import { ToolRegistry } from '../src/tool/framework.js' -import type { ToolUseContext } from '../src/types.js' +import { InMemoryStore } from '../src/memory/store.js' +import type { AgentRunResult, ToolUseContext } from '../src/types.js' // --------------------------------------------------------------------------- // Helpers @@ -45,6 +50,13 @@ describe('registerBuiltInTools', () => { expect(registry.get('file_edit')).toBeDefined() expect(registry.get('grep')).toBeDefined() expect(registry.get('glob')).toBeDefined() + expect(registry.get('delegate_to_agent')).toBeUndefined() + }) + + it('registers delegate_to_agent when includeDelegateTool is set', () => { + const registry = new ToolRegistry() + registerBuiltInTools(registry, { includeDelegateTool: true }) + expect(registry.get('delegate_to_agent')).toBeDefined() }) it('BUILT_IN_TOOLS has correct length', () => { @@ -489,3 +501,241 @@ describe('grep', () => { expect(result.data.toLowerCase()).toContain('no such file') }) }) + +// =========================================================================== +// delegate_to_agent +// =========================================================================== + +const DELEGATE_OK: AgentRunResult = { + success: true, + output: 'research done', + messages: [], + tokenUsage: { input_tokens: 1, output_tokens: 2 }, + toolCalls: [], +} + +describe('delegate_to_agent', () => { + it('returns delegated agent output on success', async () => { + const runDelegatedAgent = vi.fn().mockResolvedValue(DELEGATE_OK) + const ctx: ToolUseContext = { + agent: { name: 'alice', role: 'lead', model: 'test' }, + team: { + name: 't', + agents: ['alice', 'bob'], + delegationDepth: 0, + maxDelegationDepth: 3, + delegationPool: { availableRunSlots: 2 }, + runDelegatedAgent, + }, + } + + const result = await delegateToAgentTool.execute( + { target_agent: 'bob', prompt: 'Summarize X.' }, + ctx, + ) + + expect(result.isError).toBe(false) + expect(result.data).toBe('research done') + expect(runDelegatedAgent).toHaveBeenCalledWith('bob', 'Summarize X.') + }) + + it('errors when delegation would form a cycle (A -> B -> A)', async () => { + const ctx: ToolUseContext = { + agent: { name: 'bob', role: 'worker', model: 'test' }, + team: { + name: 't', + agents: ['alice', 'bob'], + delegationDepth: 1, + maxDelegationDepth: 5, + delegationChain: ['alice', 'bob'], + delegationPool: { availableRunSlots: 2 }, + runDelegatedAgent: vi.fn().mockResolvedValue(DELEGATE_OK), + }, + } + + const result = await delegateToAgentTool.execute( + { target_agent: 'alice', prompt: 'loop back' }, + ctx, + ) + + expect(result.isError).toBe(true) + expect(result.data).toMatch(/Delegation cycle detected: alice -> bob -> alice/) + expect(ctx.team!.runDelegatedAgent).not.toHaveBeenCalled() + }) + + it('surfaces delegated run tokenUsage via ToolResult.metadata', async () => { + const runDelegatedAgent = vi.fn().mockResolvedValue({ + success: true, + output: 'answer', + messages: [], + tokenUsage: { input_tokens: 123, output_tokens: 45 }, + toolCalls: [], + } satisfies AgentRunResult) + const ctx: ToolUseContext = { + agent: { name: 'alice', role: 'lead', model: 'test' }, + team: { + name: 't', + agents: ['alice', 'bob'], + delegationPool: { availableRunSlots: 2 }, + runDelegatedAgent, + }, + } + + const result = await delegateToAgentTool.execute( + { target_agent: 'bob', prompt: 'Hi' }, + ctx, + ) + + expect(result.metadata?.tokenUsage).toEqual({ input_tokens: 123, output_tokens: 45 }) + }) + + it('errors when delegation is not configured', async () => { + const ctx: ToolUseContext = { + agent: { name: 'alice', role: 'lead', model: 'test' }, + team: { name: 't', agents: ['alice', 'bob'] }, + } + + const result = await delegateToAgentTool.execute( + { target_agent: 'bob', prompt: 'Hi' }, + ctx, + ) + + expect(result.isError).toBe(true) + expect(result.data).toMatch(/only available during orchestrated team runs/i) + }) + + it('errors for unknown target agent', async () => { + const ctx: ToolUseContext = { + agent: { name: 'alice', role: 'lead', model: 'test' }, + team: { + name: 't', + agents: ['alice', 'bob'], + runDelegatedAgent: vi.fn(), + delegationPool: { availableRunSlots: 1 }, + }, + } + + const result = await delegateToAgentTool.execute( + { target_agent: 'charlie', prompt: 'Hi' }, + ctx, + ) + + expect(result.isError).toBe(true) + expect(result.data).toMatch(/Unknown agent/) + }) + + it('errors on self-delegation', async () => { + const ctx: ToolUseContext = { + agent: { name: 'alice', role: 'lead', model: 'test' }, + team: { + name: 't', + agents: ['alice', 'bob'], + runDelegatedAgent: vi.fn(), + delegationPool: { availableRunSlots: 1 }, + }, + } + + const result = await delegateToAgentTool.execute( + { target_agent: 'alice', prompt: 'Hi' }, + ctx, + ) + + expect(result.isError).toBe(true) + expect(result.data).toMatch(/yourself/) + }) + + it('errors when delegation depth limit is reached', async () => { + const ctx: ToolUseContext = { + agent: { name: 'alice', role: 'lead', model: 'test' }, + team: { + name: 't', + agents: ['alice', 'bob'], + delegationDepth: 3, + maxDelegationDepth: 3, + runDelegatedAgent: vi.fn(), + delegationPool: { availableRunSlots: 1 }, + }, + } + + const result = await delegateToAgentTool.execute( + { target_agent: 'bob', prompt: 'Hi' }, + ctx, + ) + + expect(result.isError).toBe(true) + expect(result.data).toMatch(/Maximum delegation depth/) + }) + + it('errors fast when pool has no free slots without calling runDelegatedAgent', async () => { + const runDelegatedAgent = vi.fn() + const ctx: ToolUseContext = { + agent: { name: 'alice', role: 'lead', model: 'test' }, + team: { + name: 't', + agents: ['alice', 'bob'], + delegationPool: { availableRunSlots: 0 }, + runDelegatedAgent, + }, + } + + const result = await delegateToAgentTool.execute( + { target_agent: 'bob', prompt: 'Hi' }, + ctx, + ) + + expect(result.isError).toBe(true) + expect(result.data).toMatch(/no free concurrency slot/i) + expect(runDelegatedAgent).not.toHaveBeenCalled() + }) + + it('writes unique SharedMemory audit keys for repeated delegations', async () => { + const store = new InMemoryStore() + const runDelegatedAgent = vi.fn().mockResolvedValue(DELEGATE_OK) + const ctx: ToolUseContext = { + agent: { name: 'alice', role: 'lead', model: 'test' }, + team: { + name: 't', + agents: ['alice', 'bob'], + sharedMemory: store, + delegationPool: { availableRunSlots: 2 }, + runDelegatedAgent, + }, + } + + await delegateToAgentTool.execute({ target_agent: 'bob', prompt: 'a' }, ctx) + await delegateToAgentTool.execute({ target_agent: 'bob', prompt: 'b' }, ctx) + + const keys = (await store.list()).map((e) => e.key) + const delegationKeys = keys.filter((k) => k.includes('delegation:bob:')) + expect(delegationKeys).toHaveLength(2) + expect(delegationKeys[0]).not.toBe(delegationKeys[1]) + }) + + it('returns isError when delegated run reports success false', async () => { + const runDelegatedAgent = vi.fn().mockResolvedValue({ + success: false, + output: 'delegated agent failed', + messages: [], + tokenUsage: { input_tokens: 0, output_tokens: 0 }, + toolCalls: [], + } satisfies AgentRunResult) + + const ctx: ToolUseContext = { + agent: { name: 'alice', role: 'lead', model: 'test' }, + team: { + name: 't', + agents: ['alice', 'bob'], + delegationPool: { availableRunSlots: 1 }, + runDelegatedAgent, + }, + } + + const result = await delegateToAgentTool.execute( + { target_agent: 'bob', prompt: 'Hi' }, + ctx, + ) + + expect(result.isError).toBe(true) + expect(result.data).toBe('delegated agent failed') + }) +}) diff --git a/tests/delegation-budget.test.ts b/tests/delegation-budget.test.ts new file mode 100644 index 0000000..b32e79a --- /dev/null +++ b/tests/delegation-budget.test.ts @@ -0,0 +1,114 @@ +import { describe, it, expect } from 'vitest' +import { z } from 'zod' +import { AgentRunner } from '../src/agent/runner.js' +import { ToolRegistry, defineTool } from '../src/tool/framework.js' +import { ToolExecutor } from '../src/tool/executor.js' +import type { LLMAdapter, LLMMessage, LLMResponse, StreamEvent, ToolUseBlock, ToolResultBlock } from '../src/types.js' + +function toolUseResponse(toolName: string, input: Record): LLMResponse { + return { + id: `resp-${Math.random().toString(36).slice(2)}`, + content: [{ + type: 'tool_use', + id: `tu-${Math.random().toString(36).slice(2)}`, + name: toolName, + input, + }], + model: 'mock-model', + stop_reason: 'tool_use', + usage: { input_tokens: 5, output_tokens: 5 }, + } +} + +function textResponse(text: string): LLMResponse { + return { + id: `resp-${Math.random().toString(36).slice(2)}`, + content: [{ type: 'text', text }], + model: 'mock-model', + stop_reason: 'end_turn', + usage: { input_tokens: 5, output_tokens: 5 }, + } +} + +describe('delegation-triggered budget_exceeded', () => { + it('yields tool_result events and appends tool_result message before break', async () => { + // Parent turn 1: LLM asks for a delegation. + // Tool returns metadata.tokenUsage that alone pushes totalUsage past the budget. + // Expectation: stream yields tool_use AND tool_result, and the returned + // `messages` contains the user tool_result message, so downstream consumers + // can resume without API "tool_use without tool_result" errors. + const responses = [ + toolUseResponse('delegate_to_agent', { target_agent: 'bob', prompt: 'work' }), + textResponse('should not be reached'), + ] + let idx = 0 + const adapter: LLMAdapter = { + name: 'mock', + async chat() { + return responses[idx++]! + }, + async *stream() { /* unused */ }, + } + + const registry = new ToolRegistry() + registry.register( + defineTool({ + name: 'delegate_to_agent', + description: 'Fake delegation for test', + inputSchema: z.object({ target_agent: z.string(), prompt: z.string() }), + async execute() { + return { + data: 'delegated output', + metadata: { tokenUsage: { input_tokens: 500, output_tokens: 500 } }, + } + }, + }), + ) + + const runner = new AgentRunner(adapter, registry, new ToolExecutor(registry), { + model: 'mock-model', + allowedTools: ['delegate_to_agent'], + maxTurns: 5, + maxTokenBudget: 100, // 10 (parent LLM) + 1000 (delegation) ≫ 100 + agentName: 'parent', + }) + + const events: StreamEvent[] = [] + for await (const ev of runner.stream([{ role: 'user', content: [{ type: 'text', text: 'start' }] }])) { + events.push(ev) + } + + const toolUseEvents = events.filter((e): e is StreamEvent & { type: 'tool_use'; data: ToolUseBlock } => e.type === 'tool_use') + const toolResultEvents = events.filter((e): e is StreamEvent & { type: 'tool_result'; data: ToolResultBlock } => e.type === 'tool_result') + const budgetEvents = events.filter(e => e.type === 'budget_exceeded') + const doneEvents = events.filter((e): e is StreamEvent & { type: 'done'; data: { messages: LLMMessage[]; budgetExceeded?: boolean } } => e.type === 'done') + + // 1. Every tool_use event has a matching tool_result event. + expect(toolUseEvents).toHaveLength(1) + expect(toolResultEvents).toHaveLength(1) + expect(toolResultEvents[0]!.data.tool_use_id).toBe(toolUseEvents[0]!.data.id) + + // 2. Budget event fires and the run terminates with budgetExceeded=true. + expect(budgetEvents).toHaveLength(1) + expect(doneEvents).toHaveLength(1) + expect(doneEvents[0]!.data.budgetExceeded).toBe(true) + + // 3. Returned messages contain the tool_result user message so the + // conversation is API-resumable. + const messages = doneEvents[0]!.data.messages + const lastMsg = messages[messages.length - 1]! + expect(lastMsg.role).toBe('user') + const hasMatchingToolResult = lastMsg.content.some( + b => b.type === 'tool_result' && b.tool_use_id === toolUseEvents[0]!.data.id, + ) + expect(hasMatchingToolResult).toBe(true) + + // 4. Ordering: tool_result event is emitted before budget_exceeded. + const toolResultIdx = events.findIndex(e => e.type === 'tool_result') + const budgetIdx = events.findIndex(e => e.type === 'budget_exceeded') + expect(toolResultIdx).toBeLessThan(budgetIdx) + + // 5. LLM was only called once — we broke before a second turn. + expect(idx).toBe(1) + }) +}) diff --git a/tests/delegation-concurrency.test.ts b/tests/delegation-concurrency.test.ts new file mode 100644 index 0000000..ba9e940 --- /dev/null +++ b/tests/delegation-concurrency.test.ts @@ -0,0 +1,131 @@ +import { describe, it, expect, vi } from 'vitest' +import { OpenMultiAgent } from '../src/orchestrator/orchestrator.js' +import type { AgentConfig, LLMChatOptions, LLMMessage, LLMResponse } from '../src/types.js' + +// Single shared mock adapter, routed by systemPrompt + first-turn user text. +vi.mock('../src/llm/adapter.js', () => ({ + createAdapter: async () => ({ + name: 'mock', + async chat(messages: LLMMessage[], options: LLMChatOptions): Promise { + const sys = options.systemPrompt ?? '' + const firstUserText = extractText(messages[0]?.content ?? []) + const onlyOneMessage = messages.length === 1 + + // Root parent task (turn 1) emits a delegation tool_use. + // Task description strings are set to 'ROOT-A' / 'ROOT-B' so we can + // distinguish the parent's first turn from the ephemeral delegate's + // first turn (which sees 'ping-A' / 'ping-B' as its user prompt). + if (onlyOneMessage && firstUserText.includes('ROOT-A')) { + return toolUseResponse('delegate_to_agent', { target_agent: 'B', prompt: 'ping-B' }) + } + if (onlyOneMessage && firstUserText.includes('ROOT-B')) { + return toolUseResponse('delegate_to_agent', { target_agent: 'A', prompt: 'ping-A' }) + } + + // Ephemeral delegate's first (and only) turn — return plain text so it + // terminates cleanly without another delegation. + if (onlyOneMessage) { + const who = sys.startsWith('A-') ? 'A' : 'B' + return textResponse(`${who} nested done`) + } + + // Root parent turn 2 — after tool_result. Return text to end the loop. + const who = sys.startsWith('A-') ? 'A' : 'B' + return textResponse(`${who} parent done`) + }, + async *stream() { yield { type: 'done' as const, data: {} } }, + }), +})) + +function textResponse(text: string): LLMResponse { + return { + id: `r-${Math.random().toString(36).slice(2)}`, + content: [{ type: 'text', text }], + model: 'mock-model', + stop_reason: 'end_turn', + usage: { input_tokens: 5, output_tokens: 5 }, + } +} + +function toolUseResponse(toolName: string, input: Record): LLMResponse { + return { + id: `r-${Math.random().toString(36).slice(2)}`, + content: [{ + type: 'tool_use', + id: `tu-${Math.random().toString(36).slice(2)}`, + name: toolName, + input, + }], + model: 'mock-model', + stop_reason: 'tool_use', + usage: { input_tokens: 5, output_tokens: 5 }, + } +} + +function extractText(content: readonly { type: string; text?: string }[]): string { + return content + .filter((b): b is { type: 'text'; text: string } => b.type === 'text' && typeof b.text === 'string') + .map((b) => b.text) + .join(' ') +} + +function agentA(): AgentConfig { + return { + name: 'A', + model: 'mock-model', + provider: 'openai', + // sysPrompt prefix used by the mock to disambiguate A vs B. + systemPrompt: 'A-agent. You are agent A. Delegate to B when asked.', + tools: ['delegate_to_agent'], + maxTurns: 4, + } +} + +function agentB(): AgentConfig { + return { + name: 'B', + model: 'mock-model', + provider: 'openai', + systemPrompt: 'B-agent. You are agent B. Delegate to A when asked.', + tools: ['delegate_to_agent'], + maxTurns: 4, + } +} + +describe('mutual delegation (A↔B) completes without agent-lock deadlock', () => { + it('two parallel root tasks both finish when each delegates to the other', async () => { + // Previously: pool.run('B') inside A's tool call waited on B's agent lock + // (held by the parent B task), while pool.run('A') inside B's tool call + // waited on A's agent lock — classic mutual deadlock. + // After the fix: delegation uses runEphemeral on a fresh Agent instance, + // so neither call touches the per-agent lock. + const oma = new OpenMultiAgent({ + defaultModel: 'mock-model', + defaultProvider: 'openai', + // Need room for 2 parent runs + 2 ephemeral delegates. + maxConcurrency: 4, + }) + const team = oma.createTeam('mutual', { + name: 'mutual', + agents: [agentA(), agentB()], + sharedMemory: false, + }) + + // Race against a 10s timeout so a regression surfaces as a test failure + // rather than a hanging CI job. + const runPromise = oma.runTasks(team, [ + { title: 'Task A', description: 'ROOT-A', assignee: 'A' }, + { title: 'Task B', description: 'ROOT-B', assignee: 'B' }, + ]) + const timeout = new Promise((_resolve, reject) => + setTimeout(() => reject(new Error('mutual delegation deadlock (timeout)')), 10_000), + ) + + const result = (await Promise.race([runPromise, timeout])) as Awaited + + expect(result.success).toBe(true) + const agentOutputs = [...result.agentResults.values()].map((r) => r.output) + expect(agentOutputs.some((o) => o.includes('A parent done'))).toBe(true) + expect(agentOutputs.some((o) => o.includes('B parent done'))).toBe(true) + }) +}) diff --git a/tests/semaphore.test.ts b/tests/semaphore.test.ts index ddc1b34..734d07c 100644 --- a/tests/semaphore.test.ts +++ b/tests/semaphore.test.ts @@ -6,6 +6,10 @@ describe('Semaphore', () => { expect(() => new Semaphore(0)).toThrow() }) + it('exposes configured limit', () => { + expect(new Semaphore(5).limit).toBe(5) + }) + it('allows up to max concurrent holders', async () => { const sem = new Semaphore(2) let running = 0 diff --git a/tests/tool-result-compression.test.ts b/tests/tool-result-compression.test.ts index 0dd37fa..77601b3 100644 --- a/tests/tool-result-compression.test.ts +++ b/tests/tool-result-compression.test.ts @@ -495,4 +495,49 @@ describe('AgentRunner compressToolResults', () => { const allToolResults = extractToolResultContents(turn3Messages) expect(allToolResults[0]).toContain('compressed') }) + + it('does NOT compress delegate_to_agent results on turn 3+', async () => { + const calls: LLMMessage[][] = [] + const longOutput = 'y'.repeat(600) + const responses = [ + toolUseResponse('delegate_to_agent', { target_agent: 'bob', prompt: 'do work' }), + toolUseResponse('delegate_to_agent', { target_agent: 'bob', prompt: 'do more' }), + textResponse('done'), + ] + let idx = 0 + const adapter: LLMAdapter = { + name: 'mock', + async chat(messages) { + calls.push(messages.map(m => ({ role: m.role, content: [...m.content] }))) + return responses[idx++]! + }, + async *stream() { /* unused */ }, + } + const registry = new ToolRegistry() + registry.register( + defineTool({ + name: 'delegate_to_agent', + description: 'Fake delegation tool for test', + inputSchema: z.object({ target_agent: z.string(), prompt: z.string() }), + async execute() { + return { data: longOutput } + }, + }), + ) + const runner = new AgentRunner(adapter, registry, new ToolExecutor(registry), { + model: 'mock-model', + allowedTools: ['delegate_to_agent'], + maxTurns: 5, + compressToolResults: true, + }) + + await runner.run([{ role: 'user', content: [{ type: 'text', text: 'start' }] }]) + + // Turn 3: both delegation results should survive unchanged. + const turn3Messages = calls[2]! + const allToolResults = extractToolResultContents(turn3Messages) + expect(allToolResults).toHaveLength(2) + expect(allToolResults[0]).toBe(longOutput) + expect(allToolResults[1]).toBe(longOutput) + }) })