feat: add AbortSignal support to runTeam() and runTasks() (#69)

Fixes #61

Thread AbortSignal from the top-level API through RunContext to
executeQueue(), enabling graceful cancellation in Express, Next.js,
serverless, and CLI scenarios.

Changes:
- Added optional  to RunContext interface
-  now accepts
-  now accepts
- executeQueue() checks signal.aborted before each dispatch round
  and skips remaining tasks when cancelled
- Signal is forwarded to coordinator's run() and per-task pool.run()
  so in-flight LLM calls are also cancelled
- Full backward compatibility: both methods work without options

The abort infrastructure already existed at lower layers
(AgentRunner, Agent, AgentPool) — this commit bridges the last gap
at the orchestrator level.

Co-authored-by: JasonOA888 <JasonOA888@users.noreply.github.com>
This commit is contained in:
Jason 2026-04-06 12:49:01 +08:00 committed by GitHub
parent d59898ce3d
commit 336d94e50d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 128 additions and 7 deletions

4
package-lock.json generated
View File

@ -1,12 +1,12 @@
{ {
"name": "@jackchen_me/open-multi-agent", "name": "@jackchen_me/open-multi-agent",
"version": "0.2.0", "version": "1.0.1",
"lockfileVersion": 3, "lockfileVersion": 3,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "@jackchen_me/open-multi-agent", "name": "@jackchen_me/open-multi-agent",
"version": "0.2.0", "version": "1.0.1",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"@anthropic-ai/sdk": "^0.52.0", "@anthropic-ai/sdk": "^0.52.0",

View File

@ -264,6 +264,8 @@ interface RunContext {
readonly config: OrchestratorConfig readonly config: OrchestratorConfig
/** Trace run ID, present when `onTrace` is configured. */ /** Trace run ID, present when `onTrace` is configured. */
readonly runId?: string readonly runId?: string
/** AbortSignal for run-level cancellation. Checked between task dispatch rounds. */
readonly abortSignal?: AbortSignal
} }
/** /**
@ -295,6 +297,15 @@ async function executeQueue(
: undefined : undefined
while (true) { while (true) {
// Check for cancellation before each dispatch round.
if (ctx.abortSignal?.aborted) {
// Mark all remaining pending tasks as skipped.
for (const t of queue.getByStatus('pending')) {
queue.update(t.id, { status: 'skipped' as TaskStatus })
}
break
}
// Re-run auto-assignment each iteration so tasks that were unblocked since // Re-run auto-assignment each iteration so tasks that were unblocked since
// the last round (and thus have no assignee yet) get assigned before dispatch. // the last round (and thus have no assignee yet) get assigned before dispatch.
scheduler.autoAssign(queue, team.getAgents()) scheduler.autoAssign(queue, team.getAgents())
@ -360,8 +371,8 @@ async function executeQueue(
// Build trace context for this task's agent run // Build trace context for this task's agent run
const traceOptions: Partial<RunOptions> | undefined = config.onTrace const traceOptions: Partial<RunOptions> | undefined = config.onTrace
? { onTrace: config.onTrace, runId: ctx.runId ?? '', taskId: task.id, traceAgent: assignee } ? { onTrace: config.onTrace, runId: ctx.runId ?? '', taskId: task.id, traceAgent: assignee, abortSignal: ctx.abortSignal }
: undefined : ctx.abortSignal ? { abortSignal: ctx.abortSignal } : undefined
const taskStartMs = config.onTrace ? Date.now() : 0 const taskStartMs = config.onTrace ? Date.now() : 0
let retryCount = 0 let retryCount = 0
@ -638,7 +649,7 @@ export class OpenMultiAgent {
* @param team - A team created via {@link createTeam} (or `new Team(...)`). * @param team - A team created via {@link createTeam} (or `new Team(...)`).
* @param goal - High-level natural-language goal for the team. * @param goal - High-level natural-language goal for the team.
*/ */
async runTeam(team: Team, goal: string): Promise<TeamRunResult> { async runTeam(team: Team, goal: string, options?: { abortSignal?: AbortSignal }): Promise<TeamRunResult> {
const agentConfigs = team.getAgents() const agentConfigs = team.getAgents()
// ------------------------------------------------------------------ // ------------------------------------------------------------------
@ -665,8 +676,8 @@ export class OpenMultiAgent {
}) })
const decompTraceOptions: Partial<RunOptions> | undefined = this.config.onTrace const decompTraceOptions: Partial<RunOptions> | undefined = this.config.onTrace
? { onTrace: this.config.onTrace, runId: runId ?? '', traceAgent: 'coordinator' } ? { onTrace: this.config.onTrace, runId: runId ?? '', traceAgent: 'coordinator', abortSignal: options?.abortSignal }
: undefined : options?.abortSignal ? { abortSignal: options.abortSignal } : undefined
const decompositionResult = await coordinatorAgent.run(decompositionPrompt, decompTraceOptions) const decompositionResult = await coordinatorAgent.run(decompositionPrompt, decompTraceOptions)
const agentResults = new Map<string, AgentRunResult>() const agentResults = new Map<string, AgentRunResult>()
agentResults.set('coordinator:decompose', decompositionResult) agentResults.set('coordinator:decompose', decompositionResult)
@ -712,6 +723,7 @@ export class OpenMultiAgent {
agentResults, agentResults,
config: this.config, config: this.config,
runId, runId,
abortSignal: options?.abortSignal,
} }
await executeQueue(queue, ctx) await executeQueue(queue, ctx)
@ -764,6 +776,7 @@ export class OpenMultiAgent {
retryDelayMs?: number retryDelayMs?: number
retryBackoff?: number retryBackoff?: number
}>, }>,
options?: { abortSignal?: AbortSignal },
): Promise<TeamRunResult> { ): Promise<TeamRunResult> {
const agentConfigs = team.getAgents() const agentConfigs = team.getAgents()
const queue = new TaskQueue() const queue = new TaskQueue()
@ -794,6 +807,7 @@ export class OpenMultiAgent {
agentResults, agentResults,
config: this.config, config: this.config,
runId: this.config.onTrace ? generateRunId() : undefined, runId: this.config.onTrace ? generateRunId() : undefined,
abortSignal: options?.abortSignal,
} }
await executeQueue(queue, ctx) await executeQueue(queue, ctx)

107
tests/abort-signal.test.ts Normal file
View File

@ -0,0 +1,107 @@
import { describe, it, expect, vi } from 'vitest'
import { OpenMultiAgent } from '../src/orchestrator/orchestrator.js'
import { Team } from '../src/team/team.js'
describe('AbortSignal support for runTeam and runTasks', () => {
it('runTeam should accept an abortSignal option', async () => {
const orchestrator = new OpenMultiAgent({
defaultModel: 'test-model',
defaultProvider: 'openai',
})
// Verify the API accepts the option without throwing
const controller = new AbortController()
const team = new Team('test', {
name: 'test',
agents: [
{ name: 'agent1', model: 'test-model', systemPrompt: 'test' },
],
})
// Abort immediately so the run won't actually execute LLM calls
controller.abort()
// runTeam should return gracefully (no unhandled rejection)
const result = await orchestrator.runTeam(team, 'test goal', {
abortSignal: controller.signal,
})
// With immediate abort, coordinator may or may not have run,
// but the function should not throw.
expect(result).toBeDefined()
expect(result.agentResults).toBeInstanceOf(Map)
})
it('runTasks should accept an abortSignal option', async () => {
const orchestrator = new OpenMultiAgent({
defaultModel: 'test-model',
defaultProvider: 'openai',
})
const controller = new AbortController()
const team = new Team('test', {
name: 'test',
agents: [
{ name: 'agent1', model: 'test-model', systemPrompt: 'test' },
],
})
controller.abort()
const result = await orchestrator.runTasks(team, [
{ title: 'task1', description: 'do something', assignee: 'agent1' },
], { abortSignal: controller.signal })
expect(result).toBeDefined()
expect(result.agentResults).toBeInstanceOf(Map)
})
it('pre-aborted signal should skip pending tasks', async () => {
const orchestrator = new OpenMultiAgent({
defaultModel: 'test-model',
defaultProvider: 'openai',
})
const controller = new AbortController()
controller.abort()
const team = new Team('test', {
name: 'test',
agents: [
{ name: 'agent1', model: 'test-model', systemPrompt: 'test' },
],
})
const result = await orchestrator.runTasks(team, [
{ title: 'task1', description: 'first', assignee: 'agent1' },
{ title: 'task2', description: 'second', assignee: 'agent1' },
], { abortSignal: controller.signal })
// No agent runs should complete since signal was already aborted
expect(result).toBeDefined()
})
it('runTeam and runTasks work without abortSignal (backward compat)', async () => {
const orchestrator = new OpenMultiAgent({
defaultModel: 'test-model',
defaultProvider: 'openai',
})
const team = new Team('test', {
name: 'test',
agents: [
{ name: 'agent1', model: 'test-model', systemPrompt: 'test' },
],
})
// These should not throw even without abortSignal
const promise1 = orchestrator.runTeam(team, 'goal')
const promise2 = orchestrator.runTasks(team, [
{ title: 'task1', description: 'do something', assignee: 'agent1' },
])
// Both return promises (won't resolve without real LLM, but API is correct)
expect(promise1).toBeInstanceOf(Promise)
expect(promise2).toBeInstanceOf(Promise)
})
})