diff --git a/src/orchestrator/orchestrator.ts b/src/orchestrator/orchestrator.ts index e04aa2c..0acedb6 100644 --- a/src/orchestrator/orchestrator.ts +++ b/src/orchestrator/orchestrator.ts @@ -306,6 +306,8 @@ async function executeQueue( } // Track tasks that complete successfully in this round for the approval gate. + // Safe to push from concurrent promises: JS is single-threaded, so + // Array.push calls from resolved microtasks never interleave. const completedThisRound: Task[] = [] // Dispatch all currently-pending tasks as a parallel batch. diff --git a/src/task/queue.ts b/src/task/queue.ts index c5f6a17..df2c26a 100644 --- a/src/task/queue.ts +++ b/src/task/queue.ts @@ -161,7 +161,9 @@ export class TaskQueue { * Marks `taskId` as `'skipped'` and records `reason` in the `result` field. * * Fires `'task:skipped'` for the skipped task and cascades to every - * downstream task that transitively depended on it. + * downstream task that transitively depended on it — even if the dependent + * has other dependencies that are still pending or completed. A skipped + * upstream is treated as permanently unsatisfiable, mirroring `fail()`. * * @throws {Error} when `taskId` is not found. */ @@ -180,6 +182,11 @@ export class TaskQueue { * * Used when an approval gate rejects continuation — every pending, blocked, * or in-progress task is skipped with the given reason. + * + * **Important:** Call only when no tasks are actively executing. The + * orchestrator invokes this after `await Promise.all()`, so no tasks are + * in-flight. Calling while agents are running may mark an in-progress task + * as skipped while its agent continues executing. 
*/ skipRemaining(reason = 'Skipped: approval rejected.'): void { // Snapshot first — update() mutates the live map, which is unsafe to diff --git a/src/types.ts b/src/types.ts index e49f8a0..df7fa3a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -313,7 +313,12 @@ export interface Task { // Orchestrator // --------------------------------------------------------------------------- -/** Progress event emitted by the orchestrator during a run. */ +/** + * Progress event emitted by the orchestrator during a run. + * + * **v0.3 addition:** `'task_skipped'` — consumers with exhaustive switches + * on `type` will need to add a case for this variant. + */ export interface OrchestratorEvent { readonly type: | 'agent_start' @@ -346,7 +351,13 @@ export interface OrchestratorConfig { * to start next. Return `true` to continue or `false` to abort — * remaining tasks will be marked `'skipped'`. * - * Not called after the final round (when no tasks remain to start). + * Not called when: + * - No tasks succeeded in the round (all failed). + * - No pending tasks remain after the round (final batch). + * + * **Note:** Do not mutate the {@link Task} objects passed to this + * callback — they are live references to queue state. Mutation is + * undefined behavior. 
*/ readonly onApproval?: (completedTasks: readonly Task[], nextTasks: readonly Task[]) => Promise<boolean> } diff --git a/tests/approval.test.ts b/tests/approval.test.ts index a9ddfb4..ae82f2d 100644 --- a/tests/approval.test.ts +++ b/tests/approval.test.ts @@ -353,4 +353,112 @@ describe('onApproval integration', () => { const titles = completedTasks.map((t: Task) => t.title).sort() expect(titles).toEqual(['task-1', 'task-2']) }) + + it('single batch with no second round — callback never fires', async () => { + const approvalSpy = vi.fn().mockResolvedValue(true) + const { orchestrator, team } = setup(approvalSpy) + + const result = await orchestrator.runTasks(team, [ + { title: 'task-1', description: 'first', assignee: 'agent-a' }, + { title: 'task-2', description: 'second', assignee: 'agent-b' }, + ]) + + expect(result.success).toBe(true) + // No second round → callback never called + expect(approvalSpy).not.toHaveBeenCalled() + }) + + it('mixed success/failure in batch — completedTasks only contains succeeded tasks', async () => { + const approvalSpy = vi.fn().mockResolvedValue(true) + const agentA: AgentConfig = { name: 'agent-a', model: 'mock', systemPrompt: 'A' } + const agentB: AgentConfig = { name: 'agent-b', model: 'mock', systemPrompt: 'B' } + const agentC: AgentConfig = { name: 'agent-c', model: 'mock', systemPrompt: 'C' } + + const orchestrator = new OpenMultiAgent({ + defaultModel: 'mock', + onApproval: approvalSpy, + }) + + const team = orchestrator.createTeam('test', { + name: 'test', + agents: [agentA, agentB, agentC], + }) + + const mockAgents = new Map() + mockAgents.set('agent-a', buildMockAgent(agentA, 'A done')) + mockAgents.set('agent-b', buildMockAgent(agentB, 'B done')) + mockAgents.set('agent-c', buildMockAgent(agentC, 'C done')) + + // Patch buildPool so that pool.run for agent-b returns a failure result + ;(orchestrator as any).buildPool = () => { + const pool = new AgentPool(5) + for (const [, agent] of mockAgents) pool.add(agent) + const 
originalRun = pool.run.bind(pool) + pool.run = async (agentName: string, prompt: string, opts?: any) => { + if (agentName === 'agent-b') { + return { + success: false, + output: 'simulated failure', + messages: [], + tokenUsage: { input_tokens: 0, output_tokens: 0 }, + toolCalls: [], + } + } + return originalRun(agentName, prompt, opts) + } + return pool + } + + // task-1 (success) and task-2 (fail) run in parallel, task-3 depends on task-1 + await orchestrator.runTasks(team, [ + { title: 'task-1', description: 'first', assignee: 'agent-a' }, + { title: 'task-2', description: 'second', assignee: 'agent-b' }, + { title: 'task-3', description: 'third', assignee: 'agent-c', dependsOn: ['task-1'] }, + ]) + + expect(approvalSpy).toHaveBeenCalledTimes(1) + const completedTasks = approvalSpy.mock.calls[0][0] as Task[] + // Only task-1 succeeded — task-2 failed, so it should not appear + expect(completedTasks).toHaveLength(1) + expect(completedTasks[0].title).toBe('task-1') + expect(completedTasks[0].status).toBe('completed') + }) + + it('onProgress receives task_skipped events when approval is rejected', async () => { + const progressSpy = vi.fn() + const agentA: AgentConfig = { name: 'agent-a', model: 'mock', systemPrompt: 'A' } + const agentB: AgentConfig = { name: 'agent-b', model: 'mock', systemPrompt: 'B' } + + const orchestrator = new OpenMultiAgent({ + defaultModel: 'mock', + onApproval: vi.fn().mockResolvedValue(false), + onProgress: progressSpy, + }) + + const team = orchestrator.createTeam('test', { + name: 'test', + agents: [agentA, agentB], + }) + + const mockAgents = new Map() + mockAgents.set('agent-a', buildMockAgent(agentA, 'A done')) + mockAgents.set('agent-b', buildMockAgent(agentB, 'B done')) + ;(orchestrator as any).buildPool = () => { + const pool = new AgentPool(5) + for (const [, agent] of mockAgents) pool.add(agent) + return pool + } + + await orchestrator.runTasks(team, [ + { title: 'task-1', description: 'first', assignee: 'agent-a' }, + { title: 
'task-2', description: 'second', assignee: 'agent-b', dependsOn: ['task-1'] }, + ]) + + const skippedEvents = progressSpy.mock.calls + .map((c: any) => c[0]) + .filter((e: any) => e.type === 'task_skipped') + + expect(skippedEvents).toHaveLength(1) + expect(skippedEvents[0].data.status).toBe('skipped') + }) })