diff --git a/src/agent/pool.ts b/src/agent/pool.ts index aba0eb8..18545b4 100644 --- a/src/agent/pool.ts +++ b/src/agent/pool.ts @@ -58,6 +58,14 @@ export interface PoolStatus { export class AgentPool { private readonly agents: Map = new Map() private readonly semaphore: Semaphore + /** + * Per-agent mutex (Semaphore(1)) to serialize concurrent runs on the same + * Agent instance. Without this, two tasks assigned to the same agent could + * race on mutable instance state (`status`, `messages`, `tokenUsage`). + * + * @see https://github.com/anthropics/open-multi-agent/issues/72 + */ + private readonly agentLocks: Map = new Map() /** Cursor used by `runAny` for round-robin dispatch. */ private roundRobinIndex = 0 @@ -86,6 +94,7 @@ export class AgentPool { ) } this.agents.set(agent.name, agent) + this.agentLocks.set(agent.name, new Semaphore(1)) } /** @@ -98,6 +107,7 @@ export class AgentPool { throw new Error(`AgentPool: agent '${name}' is not registered.`) } this.agents.delete(name) + this.agentLocks.delete(name) } /** @@ -130,12 +140,20 @@ export class AgentPool { runOptions?: Partial, ): Promise { const agent = this.requireAgent(agentName) + const agentLock = this.agentLocks.get(agentName)! - await this.semaphore.acquire() + // Acquire per-agent lock first so the second call for the same agent waits + // here without consuming a pool slot. Then acquire the pool semaphore. + await agentLock.acquire() try { - return await agent.run(prompt, runOptions) + await this.semaphore.acquire() + try { + return await agent.run(prompt, runOptions) + } finally { + this.semaphore.release() + } } finally { - this.semaphore.release() + agentLock.release() } } @@ -200,11 +218,18 @@ export class AgentPool { const agent = allAgents[this.roundRobinIndex]! this.roundRobinIndex = (this.roundRobinIndex + 1) % allAgents.length - await this.semaphore.acquire() + const agentLock = this.agentLocks.get(agent.name)! + + await agentLock.acquire() try { - return await agent.run(prompt) + await this.semaphore.acquire() + try { + return await agent.run(prompt) + } finally { + this.semaphore.release() + } } finally { - this.semaphore.release() + agentLock.release() } } diff --git a/tests/agent-pool.test.ts b/tests/agent-pool.test.ts index 1cfe1d6..343c5b4 100644 --- a/tests/agent-pool.test.ts +++ b/tests/agent-pool.test.ts @@ -178,6 +178,89 @@ describe('AgentPool', () => { }) }) + describe('per-agent serialization (#72)', () => { + it('serializes concurrent runs on the same agent', async () => { + const executionLog: string[] = [] + + const agent = createMockAgent('dev') + ;(agent.run as ReturnType).mockImplementation(async (prompt: string) => { + executionLog.push(`start:${prompt}`) + await new Promise(r => setTimeout(r, 50)) + executionLog.push(`end:${prompt}`) + return SUCCESS_RESULT + }) + + const pool = new AgentPool(5) + pool.add(agent) + + // Fire two runs for the same agent concurrently + await Promise.all([ + pool.run('dev', 'task1'), + pool.run('dev', 'task2'), + ]) + + // With per-agent serialization, runs must not overlap: + // [start:task1, end:task1, start:task2, end:task2] (or reverse order) + // i.e. no interleaving like [start:task1, start:task2, ...] + expect(executionLog).toHaveLength(4) + expect(executionLog[0]).toMatch(/^start:/) + expect(executionLog[1]).toMatch(/^end:/) + expect(executionLog[2]).toMatch(/^start:/) + expect(executionLog[3]).toMatch(/^end:/) + }) + + it('allows different agents to run in parallel', async () => { + let concurrent = 0 + let maxConcurrent = 0 + + const makeTimedAgent = (name: string): Agent => { + const agent = createMockAgent(name) + ;(agent.run as ReturnType).mockImplementation(async () => { + concurrent++ + maxConcurrent = Math.max(maxConcurrent, concurrent) + await new Promise(r => setTimeout(r, 50)) + concurrent-- + return SUCCESS_RESULT + }) + return agent + } + + const pool = new AgentPool(5) + pool.add(makeTimedAgent('a')) + pool.add(makeTimedAgent('b')) + + await Promise.all([ + pool.run('a', 'x'), + pool.run('b', 'y'), + ]) + + // Different agents should run concurrently + expect(maxConcurrent).toBe(2) + }) + + it('releases agent lock even when run() throws', async () => { + const agent = createMockAgent('dev') + let callCount = 0 + ;(agent.run as ReturnType).mockImplementation(async () => { + callCount++ + if (callCount === 1) throw new Error('first run fails') + return SUCCESS_RESULT + }) + + const pool = new AgentPool(5) + pool.add(agent) + + // First run fails, second should still execute (not deadlock) + const results = await Promise.allSettled([ + pool.run('dev', 'will-fail'), + pool.run('dev', 'should-succeed'), + ]) + + expect(results[0]!.status).toBe('rejected') + expect(results[1]!.status).toBe('fulfilled') + }) + }) + describe('concurrency', () => { it('respects maxConcurrency limit', async () => { let concurrent = 0