fix: add per-agent mutex to prevent concurrent runs on same Agent instance (#72)
AgentPool now maintains a per-agent Semaphore(1) that serializes concurrent run() calls targeting the same Agent. This prevents shared-state races on Agent.state (status, messages, tokenUsage) when multiple independent tasks are assigned to the same agent. Lock acquisition order: per-agent lock first, then pool semaphore, so queued tasks don't waste pool slots while waiting.
This commit is contained in:
parent
60fb2b142e
commit
73b2454c2f
|
|
@ -58,6 +58,14 @@ export interface PoolStatus {
|
|||
export class AgentPool {
|
||||
private readonly agents: Map<string, Agent> = new Map()
|
||||
private readonly semaphore: Semaphore
|
||||
/**
|
||||
* Per-agent mutex (Semaphore(1)) to serialize concurrent runs on the same
|
||||
* Agent instance. Without this, two tasks assigned to the same agent could
|
||||
* race on mutable instance state (`status`, `messages`, `tokenUsage`).
|
||||
*
|
||||
* @see https://github.com/anthropics/open-multi-agent/issues/72
|
||||
*/
|
||||
private readonly agentLocks: Map<string, Semaphore> = new Map()
|
||||
/** Cursor used by `runAny` for round-robin dispatch. */
|
||||
private roundRobinIndex = 0
|
||||
|
||||
|
|
@ -86,6 +94,7 @@ export class AgentPool {
|
|||
)
|
||||
}
|
||||
this.agents.set(agent.name, agent)
|
||||
this.agentLocks.set(agent.name, new Semaphore(1))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -98,6 +107,7 @@ export class AgentPool {
|
|||
throw new Error(`AgentPool: agent '${name}' is not registered.`)
|
||||
}
|
||||
this.agents.delete(name)
|
||||
this.agentLocks.delete(name)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -130,13 +140,21 @@ export class AgentPool {
|
|||
runOptions?: Partial<RunOptions>,
|
||||
): Promise<AgentRunResult> {
|
||||
const agent = this.requireAgent(agentName)
|
||||
const agentLock = this.agentLocks.get(agentName)!
|
||||
|
||||
// Acquire per-agent lock first so the second call for the same agent waits
|
||||
// here without consuming a pool slot. Then acquire the pool semaphore.
|
||||
await agentLock.acquire()
|
||||
try {
|
||||
await this.semaphore.acquire()
|
||||
try {
|
||||
return await agent.run(prompt, runOptions)
|
||||
} finally {
|
||||
this.semaphore.release()
|
||||
}
|
||||
} finally {
|
||||
agentLock.release()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -200,12 +218,19 @@ export class AgentPool {
|
|||
const agent = allAgents[this.roundRobinIndex]!
|
||||
this.roundRobinIndex = (this.roundRobinIndex + 1) % allAgents.length
|
||||
|
||||
const agentLock = this.agentLocks.get(agent.name)!
|
||||
|
||||
await agentLock.acquire()
|
||||
try {
|
||||
await this.semaphore.acquire()
|
||||
try {
|
||||
return await agent.run(prompt)
|
||||
} finally {
|
||||
this.semaphore.release()
|
||||
}
|
||||
} finally {
|
||||
agentLock.release()
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -178,6 +178,89 @@ describe('AgentPool', () => {
|
|||
})
|
||||
})
|
||||
|
||||
describe('per-agent serialization (#72)', () => {
|
||||
it('serializes concurrent runs on the same agent', async () => {
|
||||
const executionLog: string[] = []
|
||||
|
||||
const agent = createMockAgent('dev')
|
||||
;(agent.run as ReturnType<typeof vi.fn>).mockImplementation(async (prompt: string) => {
|
||||
executionLog.push(`start:${prompt}`)
|
||||
await new Promise(r => setTimeout(r, 50))
|
||||
executionLog.push(`end:${prompt}`)
|
||||
return SUCCESS_RESULT
|
||||
})
|
||||
|
||||
const pool = new AgentPool(5)
|
||||
pool.add(agent)
|
||||
|
||||
// Fire two runs for the same agent concurrently
|
||||
await Promise.all([
|
||||
pool.run('dev', 'task1'),
|
||||
pool.run('dev', 'task2'),
|
||||
])
|
||||
|
||||
// With per-agent serialization, runs must not overlap:
|
||||
// [start:task1, end:task1, start:task2, end:task2] (or reverse order)
|
||||
// i.e. no interleaving like [start:task1, start:task2, ...]
|
||||
expect(executionLog).toHaveLength(4)
|
||||
expect(executionLog[0]).toMatch(/^start:/)
|
||||
expect(executionLog[1]).toMatch(/^end:/)
|
||||
expect(executionLog[2]).toMatch(/^start:/)
|
||||
expect(executionLog[3]).toMatch(/^end:/)
|
||||
})
|
||||
|
||||
it('allows different agents to run in parallel', async () => {
|
||||
let concurrent = 0
|
||||
let maxConcurrent = 0
|
||||
|
||||
const makeTimedAgent = (name: string): Agent => {
|
||||
const agent = createMockAgent(name)
|
||||
;(agent.run as ReturnType<typeof vi.fn>).mockImplementation(async () => {
|
||||
concurrent++
|
||||
maxConcurrent = Math.max(maxConcurrent, concurrent)
|
||||
await new Promise(r => setTimeout(r, 50))
|
||||
concurrent--
|
||||
return SUCCESS_RESULT
|
||||
})
|
||||
return agent
|
||||
}
|
||||
|
||||
const pool = new AgentPool(5)
|
||||
pool.add(makeTimedAgent('a'))
|
||||
pool.add(makeTimedAgent('b'))
|
||||
|
||||
await Promise.all([
|
||||
pool.run('a', 'x'),
|
||||
pool.run('b', 'y'),
|
||||
])
|
||||
|
||||
// Different agents should run concurrently
|
||||
expect(maxConcurrent).toBe(2)
|
||||
})
|
||||
|
||||
it('releases agent lock even when run() throws', async () => {
|
||||
const agent = createMockAgent('dev')
|
||||
let callCount = 0
|
||||
;(agent.run as ReturnType<typeof vi.fn>).mockImplementation(async () => {
|
||||
callCount++
|
||||
if (callCount === 1) throw new Error('first run fails')
|
||||
return SUCCESS_RESULT
|
||||
})
|
||||
|
||||
const pool = new AgentPool(5)
|
||||
pool.add(agent)
|
||||
|
||||
// First run fails, second should still execute (not deadlock)
|
||||
const results = await Promise.allSettled([
|
||||
pool.run('dev', 'will-fail'),
|
||||
pool.run('dev', 'should-succeed'),
|
||||
])
|
||||
|
||||
expect(results[0]!.status).toBe('rejected')
|
||||
expect(results[1]!.status).toBe('fulfilled')
|
||||
})
|
||||
})
|
||||
|
||||
describe('concurrency', () => {
|
||||
it('respects maxConcurrency limit', async () => {
|
||||
let concurrent = 0
|
||||
|
|
|
|||
Loading…
Reference in New Issue