feat: add AbortSignal support to runTeam() and runTasks()
Fixes #61 Thread AbortSignal from the top-level API through RunContext to executeQueue(), enabling graceful cancellation in Express, Next.js, serverless, and CLI scenarios. Changes: - Added optional to RunContext interface - now accepts - now accepts - executeQueue() checks signal.aborted before each dispatch round and skips remaining tasks when cancelled - Signal is forwarded to coordinator's run() and per-task pool.run() so in-flight LLM calls are also cancelled - Full backward compatibility: both methods work without options The abort infrastructure already existed at lower layers (AgentRunner, Agent, AgentPool) — this commit bridges the last gap at the orchestrator level.
This commit is contained in:
parent
d59898ce3d
commit
fa8242456a
|
|
@ -1,12 +1,12 @@
|
||||||
{
|
{
|
||||||
"name": "@jackchen_me/open-multi-agent",
|
"name": "@jackchen_me/open-multi-agent",
|
||||||
"version": "0.2.0",
|
"version": "1.0.1",
|
||||||
"lockfileVersion": 3,
|
"lockfileVersion": 3,
|
||||||
"requires": true,
|
"requires": true,
|
||||||
"packages": {
|
"packages": {
|
||||||
"": {
|
"": {
|
||||||
"name": "@jackchen_me/open-multi-agent",
|
"name": "@jackchen_me/open-multi-agent",
|
||||||
"version": "0.2.0",
|
"version": "1.0.1",
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@anthropic-ai/sdk": "^0.52.0",
|
"@anthropic-ai/sdk": "^0.52.0",
|
||||||
|
|
|
||||||
|
|
@ -264,6 +264,8 @@ interface RunContext {
|
||||||
readonly config: OrchestratorConfig
|
readonly config: OrchestratorConfig
|
||||||
/** Trace run ID, present when `onTrace` is configured. */
|
/** Trace run ID, present when `onTrace` is configured. */
|
||||||
readonly runId?: string
|
readonly runId?: string
|
||||||
|
/** AbortSignal for run-level cancellation. Checked between task dispatch rounds. */
|
||||||
|
readonly abortSignal?: AbortSignal
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -295,6 +297,15 @@ async function executeQueue(
|
||||||
: undefined
|
: undefined
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
// Check for cancellation before each dispatch round.
|
||||||
|
if (ctx.abortSignal?.aborted) {
|
||||||
|
// Mark all remaining pending tasks as skipped.
|
||||||
|
for (const t of queue.getByStatus('pending')) {
|
||||||
|
queue.update(t.id, { status: 'skipped' as TaskStatus })
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
// Re-run auto-assignment each iteration so tasks that were unblocked since
|
// Re-run auto-assignment each iteration so tasks that were unblocked since
|
||||||
// the last round (and thus have no assignee yet) get assigned before dispatch.
|
// the last round (and thus have no assignee yet) get assigned before dispatch.
|
||||||
scheduler.autoAssign(queue, team.getAgents())
|
scheduler.autoAssign(queue, team.getAgents())
|
||||||
|
|
@ -360,8 +371,8 @@ async function executeQueue(
|
||||||
|
|
||||||
// Build trace context for this task's agent run
|
// Build trace context for this task's agent run
|
||||||
const traceOptions: Partial<RunOptions> | undefined = config.onTrace
|
const traceOptions: Partial<RunOptions> | undefined = config.onTrace
|
||||||
? { onTrace: config.onTrace, runId: ctx.runId ?? '', taskId: task.id, traceAgent: assignee }
|
? { onTrace: config.onTrace, runId: ctx.runId ?? '', taskId: task.id, traceAgent: assignee, abortSignal: ctx.abortSignal }
|
||||||
: undefined
|
: ctx.abortSignal ? { abortSignal: ctx.abortSignal } : undefined
|
||||||
|
|
||||||
const taskStartMs = config.onTrace ? Date.now() : 0
|
const taskStartMs = config.onTrace ? Date.now() : 0
|
||||||
let retryCount = 0
|
let retryCount = 0
|
||||||
|
|
@ -638,7 +649,7 @@ export class OpenMultiAgent {
|
||||||
* @param team - A team created via {@link createTeam} (or `new Team(...)`).
|
* @param team - A team created via {@link createTeam} (or `new Team(...)`).
|
||||||
* @param goal - High-level natural-language goal for the team.
|
* @param goal - High-level natural-language goal for the team.
|
||||||
*/
|
*/
|
||||||
async runTeam(team: Team, goal: string): Promise<TeamRunResult> {
|
async runTeam(team: Team, goal: string, options?: { abortSignal?: AbortSignal }): Promise<TeamRunResult> {
|
||||||
const agentConfigs = team.getAgents()
|
const agentConfigs = team.getAgents()
|
||||||
|
|
||||||
// ------------------------------------------------------------------
|
// ------------------------------------------------------------------
|
||||||
|
|
@ -665,8 +676,8 @@ export class OpenMultiAgent {
|
||||||
})
|
})
|
||||||
|
|
||||||
const decompTraceOptions: Partial<RunOptions> | undefined = this.config.onTrace
|
const decompTraceOptions: Partial<RunOptions> | undefined = this.config.onTrace
|
||||||
? { onTrace: this.config.onTrace, runId: runId ?? '', traceAgent: 'coordinator' }
|
? { onTrace: this.config.onTrace, runId: runId ?? '', traceAgent: 'coordinator', abortSignal: options?.abortSignal }
|
||||||
: undefined
|
: options?.abortSignal ? { abortSignal: options.abortSignal } : undefined
|
||||||
const decompositionResult = await coordinatorAgent.run(decompositionPrompt, decompTraceOptions)
|
const decompositionResult = await coordinatorAgent.run(decompositionPrompt, decompTraceOptions)
|
||||||
const agentResults = new Map<string, AgentRunResult>()
|
const agentResults = new Map<string, AgentRunResult>()
|
||||||
agentResults.set('coordinator:decompose', decompositionResult)
|
agentResults.set('coordinator:decompose', decompositionResult)
|
||||||
|
|
@ -712,6 +723,7 @@ export class OpenMultiAgent {
|
||||||
agentResults,
|
agentResults,
|
||||||
config: this.config,
|
config: this.config,
|
||||||
runId,
|
runId,
|
||||||
|
abortSignal: options?.abortSignal,
|
||||||
}
|
}
|
||||||
|
|
||||||
await executeQueue(queue, ctx)
|
await executeQueue(queue, ctx)
|
||||||
|
|
@ -764,6 +776,7 @@ export class OpenMultiAgent {
|
||||||
retryDelayMs?: number
|
retryDelayMs?: number
|
||||||
retryBackoff?: number
|
retryBackoff?: number
|
||||||
}>,
|
}>,
|
||||||
|
options?: { abortSignal?: AbortSignal },
|
||||||
): Promise<TeamRunResult> {
|
): Promise<TeamRunResult> {
|
||||||
const agentConfigs = team.getAgents()
|
const agentConfigs = team.getAgents()
|
||||||
const queue = new TaskQueue()
|
const queue = new TaskQueue()
|
||||||
|
|
@ -794,6 +807,7 @@ export class OpenMultiAgent {
|
||||||
agentResults,
|
agentResults,
|
||||||
config: this.config,
|
config: this.config,
|
||||||
runId: this.config.onTrace ? generateRunId() : undefined,
|
runId: this.config.onTrace ? generateRunId() : undefined,
|
||||||
|
abortSignal: options?.abortSignal,
|
||||||
}
|
}
|
||||||
|
|
||||||
await executeQueue(queue, ctx)
|
await executeQueue(queue, ctx)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,107 @@
|
||||||
|
import { describe, it, expect, vi } from 'vitest'
|
||||||
|
import { OpenMultiAgent } from '../src/orchestrator/orchestrator.js'
|
||||||
|
import { Team } from '../src/team/team.js'
|
||||||
|
|
||||||
|
describe('AbortSignal support for runTeam and runTasks', () => {
|
||||||
|
it('runTeam should accept an abortSignal option', async () => {
|
||||||
|
const orchestrator = new OpenMultiAgent({
|
||||||
|
defaultModel: 'test-model',
|
||||||
|
defaultProvider: 'openai',
|
||||||
|
})
|
||||||
|
|
||||||
|
// Verify the API accepts the option without throwing
|
||||||
|
const controller = new AbortController()
|
||||||
|
const team = new Team('test', {
|
||||||
|
name: 'test',
|
||||||
|
agents: [
|
||||||
|
{ name: 'agent1', model: 'test-model', systemPrompt: 'test' },
|
||||||
|
],
|
||||||
|
})
|
||||||
|
|
||||||
|
// Abort immediately so the run won't actually execute LLM calls
|
||||||
|
controller.abort()
|
||||||
|
|
||||||
|
// runTeam should return gracefully (no unhandled rejection)
|
||||||
|
const result = await orchestrator.runTeam(team, 'test goal', {
|
||||||
|
abortSignal: controller.signal,
|
||||||
|
})
|
||||||
|
|
||||||
|
// With immediate abort, coordinator may or may not have run,
|
||||||
|
// but the function should not throw.
|
||||||
|
expect(result).toBeDefined()
|
||||||
|
expect(result.agentResults).toBeInstanceOf(Map)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('runTasks should accept an abortSignal option', async () => {
|
||||||
|
const orchestrator = new OpenMultiAgent({
|
||||||
|
defaultModel: 'test-model',
|
||||||
|
defaultProvider: 'openai',
|
||||||
|
})
|
||||||
|
|
||||||
|
const controller = new AbortController()
|
||||||
|
const team = new Team('test', {
|
||||||
|
name: 'test',
|
||||||
|
agents: [
|
||||||
|
{ name: 'agent1', model: 'test-model', systemPrompt: 'test' },
|
||||||
|
],
|
||||||
|
})
|
||||||
|
|
||||||
|
controller.abort()
|
||||||
|
|
||||||
|
const result = await orchestrator.runTasks(team, [
|
||||||
|
{ title: 'task1', description: 'do something', assignee: 'agent1' },
|
||||||
|
], { abortSignal: controller.signal })
|
||||||
|
|
||||||
|
expect(result).toBeDefined()
|
||||||
|
expect(result.agentResults).toBeInstanceOf(Map)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('pre-aborted signal should skip pending tasks', async () => {
|
||||||
|
const orchestrator = new OpenMultiAgent({
|
||||||
|
defaultModel: 'test-model',
|
||||||
|
defaultProvider: 'openai',
|
||||||
|
})
|
||||||
|
|
||||||
|
const controller = new AbortController()
|
||||||
|
controller.abort()
|
||||||
|
|
||||||
|
const team = new Team('test', {
|
||||||
|
name: 'test',
|
||||||
|
agents: [
|
||||||
|
{ name: 'agent1', model: 'test-model', systemPrompt: 'test' },
|
||||||
|
],
|
||||||
|
})
|
||||||
|
|
||||||
|
const result = await orchestrator.runTasks(team, [
|
||||||
|
{ title: 'task1', description: 'first', assignee: 'agent1' },
|
||||||
|
{ title: 'task2', description: 'second', assignee: 'agent1' },
|
||||||
|
], { abortSignal: controller.signal })
|
||||||
|
|
||||||
|
// No agent runs should complete since signal was already aborted
|
||||||
|
expect(result).toBeDefined()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('runTeam and runTasks work without abortSignal (backward compat)', async () => {
|
||||||
|
const orchestrator = new OpenMultiAgent({
|
||||||
|
defaultModel: 'test-model',
|
||||||
|
defaultProvider: 'openai',
|
||||||
|
})
|
||||||
|
|
||||||
|
const team = new Team('test', {
|
||||||
|
name: 'test',
|
||||||
|
agents: [
|
||||||
|
{ name: 'agent1', model: 'test-model', systemPrompt: 'test' },
|
||||||
|
],
|
||||||
|
})
|
||||||
|
|
||||||
|
// These should not throw even without abortSignal
|
||||||
|
const promise1 = orchestrator.runTeam(team, 'goal')
|
||||||
|
const promise2 = orchestrator.runTasks(team, [
|
||||||
|
{ title: 'task1', description: 'do something', assignee: 'agent1' },
|
||||||
|
])
|
||||||
|
|
||||||
|
// Both return promises (won't resolve without real LLM, but API is correct)
|
||||||
|
expect(promise1).toBeInstanceOf(Promise)
|
||||||
|
expect(promise2).toBeInstanceOf(Promise)
|
||||||
|
})
|
||||||
|
})
|
||||||
Loading…
Reference in New Issue