From 73b2454c2f8a7a6b91a5ec0ca885ecba5bd58386 Mon Sep 17 00:00:00 2001 From: JackChen Date: Tue, 7 Apr 2026 03:42:24 +0800 Subject: [PATCH 1/2] fix: add per-agent mutex to prevent concurrent runs on same Agent instance (#72) AgentPool now maintains a per-agent Semaphore(1) that serializes concurrent run() calls targeting the same Agent. This prevents shared-state races on Agent.state (status, messages, tokenUsage) when multiple independent tasks are assigned to the same agent. Lock acquisition order: per-agent lock first, then pool semaphore, so queued tasks don't waste pool slots while waiting. --- src/agent/pool.ts | 37 +++++++++++++++--- tests/agent-pool.test.ts | 83 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 6 deletions(-) diff --git a/src/agent/pool.ts b/src/agent/pool.ts index aba0eb8..18545b4 100644 --- a/src/agent/pool.ts +++ b/src/agent/pool.ts @@ -58,6 +58,14 @@ export interface PoolStatus { export class AgentPool { private readonly agents: Map = new Map() private readonly semaphore: Semaphore + /** + * Per-agent mutex (Semaphore(1)) to serialize concurrent runs on the same + * Agent instance. Without this, two tasks assigned to the same agent could + * race on mutable instance state (`status`, `messages`, `tokenUsage`). + * + * @see https://github.com/anthropics/open-multi-agent/issues/72 + */ + private readonly agentLocks: Map = new Map() /** Cursor used by `runAny` for round-robin dispatch. */ private roundRobinIndex = 0 @@ -86,6 +94,7 @@ export class AgentPool { ) } this.agents.set(agent.name, agent) + this.agentLocks.set(agent.name, new Semaphore(1)) } /** @@ -98,6 +107,7 @@ export class AgentPool { throw new Error(`AgentPool: agent '${name}' is not registered.`) } this.agents.delete(name) + this.agentLocks.delete(name) } /** @@ -130,12 +140,20 @@ export class AgentPool { runOptions?: Partial, ): Promise { const agent = this.requireAgent(agentName) + const agentLock = this.agentLocks.get(agentName)! - await this.semaphore.acquire() + // Acquire per-agent lock first so the second call for the same agent waits + // here without consuming a pool slot. Then acquire the pool semaphore. + await agentLock.acquire() try { - return await agent.run(prompt, runOptions) + await this.semaphore.acquire() + try { + return await agent.run(prompt, runOptions) + } finally { + this.semaphore.release() + } } finally { - this.semaphore.release() + agentLock.release() } } @@ -200,11 +218,18 @@ export class AgentPool { const agent = allAgents[this.roundRobinIndex]! this.roundRobinIndex = (this.roundRobinIndex + 1) % allAgents.length - await this.semaphore.acquire() + const agentLock = this.agentLocks.get(agent.name)! + + await agentLock.acquire() try { - return await agent.run(prompt) + await this.semaphore.acquire() + try { + return await agent.run(prompt) + } finally { + this.semaphore.release() + } } finally { - this.semaphore.release() + agentLock.release() } } diff --git a/tests/agent-pool.test.ts b/tests/agent-pool.test.ts index 1cfe1d6..343c5b4 100644 --- a/tests/agent-pool.test.ts +++ b/tests/agent-pool.test.ts @@ -178,6 +178,89 @@ describe('AgentPool', () => { }) }) + describe('per-agent serialization (#72)', () => { + it('serializes concurrent runs on the same agent', async () => { + const executionLog: string[] = [] + + const agent = createMockAgent('dev') + ;(agent.run as ReturnType).mockImplementation(async (prompt: string) => { + executionLog.push(`start:${prompt}`) + await new Promise(r => setTimeout(r, 50)) + executionLog.push(`end:${prompt}`) + return SUCCESS_RESULT + }) + + const pool = new AgentPool(5) + pool.add(agent) + + // Fire two runs for the same agent concurrently + await Promise.all([ + pool.run('dev', 'task1'), + pool.run('dev', 'task2'), + ]) + + // With per-agent serialization, runs must not overlap: + // [start:task1, end:task1, start:task2, end:task2] (or reverse order) + // i.e. no interleaving like [start:task1, start:task2, ...] + expect(executionLog).toHaveLength(4) + expect(executionLog[0]).toMatch(/^start:/) + expect(executionLog[1]).toMatch(/^end:/) + expect(executionLog[2]).toMatch(/^start:/) + expect(executionLog[3]).toMatch(/^end:/) + }) + + it('allows different agents to run in parallel', async () => { + let concurrent = 0 + let maxConcurrent = 0 + + const makeTimedAgent = (name: string): Agent => { + const agent = createMockAgent(name) + ;(agent.run as ReturnType).mockImplementation(async () => { + concurrent++ + maxConcurrent = Math.max(maxConcurrent, concurrent) + await new Promise(r => setTimeout(r, 50)) + concurrent-- + return SUCCESS_RESULT + }) + return agent + } + + const pool = new AgentPool(5) + pool.add(makeTimedAgent('a')) + pool.add(makeTimedAgent('b')) + + await Promise.all([ + pool.run('a', 'x'), + pool.run('b', 'y'), + ]) + + // Different agents should run concurrently + expect(maxConcurrent).toBe(2) + }) + + it('releases agent lock even when run() throws', async () => { + const agent = createMockAgent('dev') + let callCount = 0 + ;(agent.run as ReturnType).mockImplementation(async () => { + callCount++ + if (callCount === 1) throw new Error('first run fails') + return SUCCESS_RESULT + }) + + const pool = new AgentPool(5) + pool.add(agent) + + // First run fails, second should still execute (not deadlock) + const results = await Promise.allSettled([ + pool.run('dev', 'will-fail'), + pool.run('dev', 'should-succeed'), + ]) + + expect(results[0]!.status).toBe('rejected') + expect(results[1]!.status).toBe('fulfilled') + }) + }) + describe('concurrency', () => { it('respects maxConcurrency limit', async () => { let concurrent = 0 From a29d87f384aa807b1e5de9f3ec537073cd33f0ae Mon Sep 17 00:00:00 2001 From: JackChen Date: Tue, 7 Apr 2026 10:46:08 +0800 Subject: [PATCH 2/2] docs: update CLAUDE.md concurrency section for per-agent mutex --- CLAUDE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CLAUDE.md b/CLAUDE.md index 6cbeb45..7a74bdb 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -55,7 +55,7 @@ This is the framework's key feature. When `runTeam()` is called: ### Concurrency Control -Two independent semaphores: `AgentPool` (max concurrent agent runs, default 5) and `ToolExecutor` (max concurrent tool calls, default 4). +Three semaphore layers: `AgentPool` pool-level (max concurrent agent runs, default 5), `AgentPool` per-agent mutex (serializes concurrent runs on the same `Agent` instance), and `ToolExecutor` (max concurrent tool calls, default 4). ### Structured Output