Compare commits
4 Commits
60fb2b142e
...
607ba57a69
| Author | SHA1 | Date |
|---|---|---|
|
|
607ba57a69 | |
|
|
5a67d559a3 | |
|
|
a29d87f384 | |
|
|
73b2454c2f |
|
|
@ -3,5 +3,4 @@ dist/
|
|||
coverage/
|
||||
*.tgz
|
||||
.DS_Store
|
||||
promo-*.md
|
||||
non-tech_*/
|
||||
docs/
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ This is the framework's key feature. When `runTeam()` is called:
|
|||
|
||||
### Concurrency Control
|
||||
|
||||
Two independent semaphores: `AgentPool` (max concurrent agent runs, default 5) and `ToolExecutor` (max concurrent tool calls, default 4).
|
||||
Three semaphore layers: `AgentPool` pool-level (max concurrent agent runs, default 5), `AgentPool` per-agent mutex (serializes concurrent runs on the same `Agent` instance), and `ToolExecutor` (max concurrent tool calls, default 4).
|
||||
|
||||
### Structured Output
|
||||
|
||||
|
|
|
|||
10
DECISIONS.md
10
DECISIONS.md
|
|
@ -30,14 +30,8 @@ If you're considering a PR in any of these areas, please open a discussion first
|
|||
|
||||
**What**: Anthropic's protocol for connecting LLMs to external tools and data sources.
|
||||
|
||||
**Why not**: MCP is valuable but targets a different layer. Our `defineTool()` API already lets users wrap any external service as a tool in ~10 lines of code. Adding MCP would mean maintaining protocol compatibility, transport layers, and tool discovery — complexity that serves tool platform builders, not our target users who just want to run agent teams.
|
||||
|
||||
### 5. Dashboard / Visualization
|
||||
|
||||
**What**: Built-in web UI to visualize task DAGs, agent activity, and token usage.
|
||||
|
||||
**Why not**: We expose data, we don't build UI. The `onProgress` callback and upcoming `onTrace` (#18) give users all the raw data. They can pipe it into Grafana, build a custom dashboard, or use console logs. Shipping a web UI means owning a frontend stack, which is outside our scope.
|
||||
**Why not now**: Our `defineTool()` API lets users wrap any external service as a tool in ~10 lines of code, and adding MCP would introduce `@modelcontextprotocol/sdk` as a new dependency plus transport layer management, breaking our 3-dependency minimal principle. However, the MCP tool ecosystem has grown significantly — many services now ship MCP servers directly, and asking users to re-wrap each one via `defineTool()` creates unnecessary friction. **This decision may be revisited** when community demand is clear or a lightweight integration approach emerges (e.g., optional peer dependency).
|
||||
|
||||
---
|
||||
|
||||
*Last updated: 2026-04-03*
|
||||
*Last updated: 2026-04-07*
|
||||
|
|
|
|||
|
|
@ -58,6 +58,14 @@ export interface PoolStatus {
|
|||
export class AgentPool {
|
||||
private readonly agents: Map<string, Agent> = new Map()
|
||||
private readonly semaphore: Semaphore
|
||||
/**
|
||||
* Per-agent mutex (Semaphore(1)) to serialize concurrent runs on the same
|
||||
* Agent instance. Without this, two tasks assigned to the same agent could
|
||||
* race on mutable instance state (`status`, `messages`, `tokenUsage`).
|
||||
*
|
||||
* @see https://github.com/anthropics/open-multi-agent/issues/72
|
||||
*/
|
||||
private readonly agentLocks: Map<string, Semaphore> = new Map()
|
||||
/** Cursor used by `runAny` for round-robin dispatch. */
|
||||
private roundRobinIndex = 0
|
||||
|
||||
|
|
@ -86,6 +94,7 @@ export class AgentPool {
|
|||
)
|
||||
}
|
||||
this.agents.set(agent.name, agent)
|
||||
this.agentLocks.set(agent.name, new Semaphore(1))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -98,6 +107,7 @@ export class AgentPool {
|
|||
throw new Error(`AgentPool: agent '${name}' is not registered.`)
|
||||
}
|
||||
this.agents.delete(name)
|
||||
this.agentLocks.delete(name)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -130,12 +140,20 @@ export class AgentPool {
|
|||
runOptions?: Partial<RunOptions>,
|
||||
): Promise<AgentRunResult> {
|
||||
const agent = this.requireAgent(agentName)
|
||||
const agentLock = this.agentLocks.get(agentName)!
|
||||
|
||||
await this.semaphore.acquire()
|
||||
// Acquire per-agent lock first so the second call for the same agent waits
|
||||
// here without consuming a pool slot. Then acquire the pool semaphore.
|
||||
await agentLock.acquire()
|
||||
try {
|
||||
return await agent.run(prompt, runOptions)
|
||||
await this.semaphore.acquire()
|
||||
try {
|
||||
return await agent.run(prompt, runOptions)
|
||||
} finally {
|
||||
this.semaphore.release()
|
||||
}
|
||||
} finally {
|
||||
this.semaphore.release()
|
||||
agentLock.release()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -200,11 +218,18 @@ export class AgentPool {
|
|||
const agent = allAgents[this.roundRobinIndex]!
|
||||
this.roundRobinIndex = (this.roundRobinIndex + 1) % allAgents.length
|
||||
|
||||
await this.semaphore.acquire()
|
||||
const agentLock = this.agentLocks.get(agent.name)!
|
||||
|
||||
await agentLock.acquire()
|
||||
try {
|
||||
return await agent.run(prompt)
|
||||
await this.semaphore.acquire()
|
||||
try {
|
||||
return await agent.run(prompt)
|
||||
} finally {
|
||||
this.semaphore.release()
|
||||
}
|
||||
} finally {
|
||||
this.semaphore.release()
|
||||
agentLock.release()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -178,6 +178,89 @@ describe('AgentPool', () => {
|
|||
})
|
||||
})
|
||||
|
||||
describe('per-agent serialization (#72)', () => {
|
||||
it('serializes concurrent runs on the same agent', async () => {
|
||||
const executionLog: string[] = []
|
||||
|
||||
const agent = createMockAgent('dev')
|
||||
;(agent.run as ReturnType<typeof vi.fn>).mockImplementation(async (prompt: string) => {
|
||||
executionLog.push(`start:${prompt}`)
|
||||
await new Promise(r => setTimeout(r, 50))
|
||||
executionLog.push(`end:${prompt}`)
|
||||
return SUCCESS_RESULT
|
||||
})
|
||||
|
||||
const pool = new AgentPool(5)
|
||||
pool.add(agent)
|
||||
|
||||
// Fire two runs for the same agent concurrently
|
||||
await Promise.all([
|
||||
pool.run('dev', 'task1'),
|
||||
pool.run('dev', 'task2'),
|
||||
])
|
||||
|
||||
// With per-agent serialization, runs must not overlap:
|
||||
// [start:task1, end:task1, start:task2, end:task2] (or reverse order)
|
||||
// i.e. no interleaving like [start:task1, start:task2, ...]
|
||||
expect(executionLog).toHaveLength(4)
|
||||
expect(executionLog[0]).toMatch(/^start:/)
|
||||
expect(executionLog[1]).toMatch(/^end:/)
|
||||
expect(executionLog[2]).toMatch(/^start:/)
|
||||
expect(executionLog[3]).toMatch(/^end:/)
|
||||
})
|
||||
|
||||
it('allows different agents to run in parallel', async () => {
|
||||
let concurrent = 0
|
||||
let maxConcurrent = 0
|
||||
|
||||
const makeTimedAgent = (name: string): Agent => {
|
||||
const agent = createMockAgent(name)
|
||||
;(agent.run as ReturnType<typeof vi.fn>).mockImplementation(async () => {
|
||||
concurrent++
|
||||
maxConcurrent = Math.max(maxConcurrent, concurrent)
|
||||
await new Promise(r => setTimeout(r, 50))
|
||||
concurrent--
|
||||
return SUCCESS_RESULT
|
||||
})
|
||||
return agent
|
||||
}
|
||||
|
||||
const pool = new AgentPool(5)
|
||||
pool.add(makeTimedAgent('a'))
|
||||
pool.add(makeTimedAgent('b'))
|
||||
|
||||
await Promise.all([
|
||||
pool.run('a', 'x'),
|
||||
pool.run('b', 'y'),
|
||||
])
|
||||
|
||||
// Different agents should run concurrently
|
||||
expect(maxConcurrent).toBe(2)
|
||||
})
|
||||
|
||||
it('releases agent lock even when run() throws', async () => {
|
||||
const agent = createMockAgent('dev')
|
||||
let callCount = 0
|
||||
;(agent.run as ReturnType<typeof vi.fn>).mockImplementation(async () => {
|
||||
callCount++
|
||||
if (callCount === 1) throw new Error('first run fails')
|
||||
return SUCCESS_RESULT
|
||||
})
|
||||
|
||||
const pool = new AgentPool(5)
|
||||
pool.add(agent)
|
||||
|
||||
// First run fails, second should still execute (not deadlock)
|
||||
const results = await Promise.allSettled([
|
||||
pool.run('dev', 'will-fail'),
|
||||
pool.run('dev', 'should-succeed'),
|
||||
])
|
||||
|
||||
expect(results[0]!.status).toBe('rejected')
|
||||
expect(results[1]!.status).toBe('fulfilled')
|
||||
})
|
||||
})
|
||||
|
||||
describe('concurrency', () => {
|
||||
it('respects maxConcurrency limit', async () => {
|
||||
let concurrent = 0
|
||||
|
|
|
|||
Loading…
Reference in New Issue