diff --git a/src/orchestrator/orchestrator.ts b/src/orchestrator/orchestrator.ts index 76e060f..16b8579 100644 --- a/src/orchestrator/orchestrator.ts +++ b/src/orchestrator/orchestrator.ts @@ -92,6 +92,11 @@ function buildAgent(config: AgentConfig): Agent { return new Agent(config, registry, executor) } +/** Promise-based delay for retry backoff. */ +function sleep(ms: number): Promise<void> { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + // --------------------------------------------------------------------------- // Parsed task spec (result of coordinator decomposition) // --------------------------------------------------------------------------- @@ -101,6 +106,9 @@ interface ParsedTaskSpec { description: string assignee?: string dependsOn?: string[] + maxRetries?: number + retryDelayMs?: number + retryBackoff?: number } /** @@ -239,33 +247,54 @@ async function executeQueue( // Build the prompt: inject shared memory context + task description const prompt = await buildTaskPrompt(task, team) - try { - const result = await pool.run(assignee, prompt) - ctx.agentResults.set(`${assignee}:${task.id}`, result) + const maxAttempts = (task.maxRetries ?? 0) + 1 + const baseDelay = task.retryDelayMs ?? 1000 + const backoff = task.retryBackoff ?? 
2 - if (result.success) { - // Persist result into shared memory so other agents can read it - const sharedMem = team.getSharedMemoryInstance() - if (sharedMem) { - await sharedMem.write(assignee, `task:${task.id}:result`, result.output) + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + try { + const result = await pool.run(assignee, prompt) + ctx.agentResults.set(`${assignee}:${task.id}`, result) + + if (result.success) { + // Persist result into shared memory so other agents can read it + const sharedMem = team.getSharedMemoryInstance() + if (sharedMem) { + await sharedMem.write(assignee, `task:${task.id}:result`, result.output) + } + + queue.complete(task.id, result.output) + + config.onProgress?.({ + type: 'task_complete', + task: task.id, + agent: assignee, + data: result, + } satisfies OrchestratorEvent) + + config.onProgress?.({ + type: 'agent_complete', + agent: assignee, + task: task.id, + data: result, + } satisfies OrchestratorEvent) + break } - queue.complete(task.id, result.output) + // Agent returned success: false — treat as failure for retry purposes + if (attempt < maxAttempts) { + const delay = baseDelay * backoff ** (attempt - 1) + config.onProgress?.({ + type: 'task_retry', + task: task.id, + agent: assignee, + data: { attempt, maxAttempts, error: result.output, nextDelayMs: delay }, + } satisfies OrchestratorEvent) + await sleep(delay) + continue + } - config.onProgress?.({ - type: 'task_complete', - task: task.id, - agent: assignee, - data: result, - } satisfies OrchestratorEvent) - - config.onProgress?.({ - type: 'agent_complete', - agent: assignee, - task: task.id, - data: result, - } satisfies OrchestratorEvent) - } else { + // All retries exhausted queue.fail(task.id, result.output) config.onProgress?.({ type: 'error', @@ -273,16 +302,30 @@ async function executeQueue( agent: assignee, data: result, } satisfies OrchestratorEvent) + } catch (err) { + const message = err instanceof Error ? 
err.message : String(err) + + if (attempt < maxAttempts) { + const delay = baseDelay * backoff ** (attempt - 1) + config.onProgress?.({ + type: 'task_retry', + task: task.id, + agent: assignee, + data: { attempt, maxAttempts, error: message, nextDelayMs: delay }, + } satisfies OrchestratorEvent) + await sleep(delay) + continue + } + + // All retries exhausted + queue.fail(task.id, message) + config.onProgress?.({ + type: 'error', + task: task.id, + agent: assignee, + data: err, + } satisfies OrchestratorEvent) } - } catch (err) { - const message = err instanceof Error ? err.message : String(err) - queue.fail(task.id, message) - config.onProgress?.({ - type: 'error', - task: task.id, - agent: assignee, - data: err, - } satisfies OrchestratorEvent) } }) @@ -574,6 +617,9 @@ export class OpenMultiAgent { description: string assignee?: string dependsOn?: string[] + maxRetries?: number + retryDelayMs?: number + retryBackoff?: number }>, ): Promise { const agentConfigs = team.getAgents() @@ -586,6 +632,9 @@ export class OpenMultiAgent { description: t.description, assignee: t.assignee, dependsOn: t.dependsOn, + maxRetries: t.maxRetries, + retryDelayMs: t.retryDelayMs, + retryBackoff: t.retryBackoff, })), agentConfigs, queue, @@ -760,6 +809,9 @@ export class OpenMultiAgent { assignee: spec.assignee && agentNames.has(spec.assignee) ? 
spec.assignee : undefined, + maxRetries: spec.maxRetries, + retryDelayMs: spec.retryDelayMs, + retryBackoff: spec.retryBackoff, }) titleToId.set(spec.title.toLowerCase().trim(), task.id) createdTasks.push(task) diff --git a/src/task/task.ts b/src/task/task.ts index 9a11476..d74e70b 100644 --- a/src/task/task.ts +++ b/src/task/task.ts @@ -31,6 +31,9 @@ export function createTask(input: { description: string assignee?: string dependsOn?: string[] + maxRetries?: number + retryDelayMs?: number + retryBackoff?: number }): Task { const now = new Date() return { @@ -43,6 +46,9 @@ export function createTask(input: { result: undefined, createdAt: now, updatedAt: now, + maxRetries: input.maxRetries, + retryDelayMs: input.retryDelayMs, + retryBackoff: input.retryBackoff, } } diff --git a/src/types.ts b/src/types.ts index 6e76640..bd2ce64 100644 --- a/src/types.ts +++ b/src/types.ts @@ -281,6 +281,12 @@ export interface Task { result?: string readonly createdAt: Date updatedAt: Date + /** Maximum number of retry attempts on failure (default: 0 — no retry). */ + readonly maxRetries?: number + /** Base delay in ms before the first retry (default: 1000). */ + readonly retryDelayMs?: number + /** Exponential backoff multiplier (default: 2). 
*/ + readonly retryBackoff?: number } // --------------------------------------------------------------------------- @@ -294,6 +300,7 @@ export interface OrchestratorEvent { | 'agent_complete' | 'task_start' | 'task_complete' + | 'task_retry' | 'message' | 'error' readonly agent?: string diff --git a/tests/task-retry.test.ts b/tests/task-retry.test.ts new file mode 100644 index 0000000..97f68fb --- /dev/null +++ b/tests/task-retry.test.ts @@ -0,0 +1,274 @@ +import { describe, it, expect, vi } from 'vitest' +import { z } from 'zod' +import { createTask } from '../src/task/task.js' +import { TaskQueue } from '../src/task/queue.js' +import type { Task, OrchestratorEvent, AgentRunResult, LLMResponse, LLMAdapter } from '../src/types.js' +import { Agent } from '../src/agent/agent.js' +import { AgentRunner } from '../src/agent/runner.js' +import { AgentPool } from '../src/agent/pool.js' +import { ToolRegistry } from '../src/tool/framework.js' +import { ToolExecutor } from '../src/tool/executor.js' + +// --------------------------------------------------------------------------- +// createTask: retry fields +// --------------------------------------------------------------------------- + +describe('createTask with retry fields', () => { + it('passes through retry config', () => { + const t = createTask({ + title: 'Retry task', + description: 'test', + maxRetries: 3, + retryDelayMs: 500, + retryBackoff: 1.5, + }) + expect(t.maxRetries).toBe(3) + expect(t.retryDelayMs).toBe(500) + expect(t.retryBackoff).toBe(1.5) + }) + + it('defaults retry fields to undefined', () => { + const t = createTask({ title: 'No retry', description: 'test' }) + expect(t.maxRetries).toBeUndefined() + expect(t.retryDelayMs).toBeUndefined() + expect(t.retryBackoff).toBeUndefined() + }) +}) + +// --------------------------------------------------------------------------- +// executeQueue retry integration (mock agent pool) +// --------------------------------------------------------------------------- 
+ +/** + * Build a mock adapter that returns responses in sequence. + * Each response can be configured as success or failure. + */ +function mockAdapter(responses: Array<{ text: string; success?: boolean }>): LLMAdapter { + let callIndex = 0 + return { + name: 'mock', + async chat() { + const resp = responses[callIndex++] ?? { text: 'fallback' } + return { + id: `mock-${callIndex}`, + content: [{ type: 'text' as const, text: resp.text }], + model: 'mock-model', + stop_reason: 'end_turn', + usage: { input_tokens: 10, output_tokens: 20 }, + } satisfies LLMResponse + }, + async *stream() { /* unused */ }, + } +} + +function buildMockPool( + agentName: string, + responses: Array<{ text: string; success?: boolean }>, +): AgentPool { + const adapter = mockAdapter(responses) + const registry = new ToolRegistry() + const executor = new ToolExecutor(registry) + const config = { name: agentName, model: 'mock-model', systemPrompt: 'test' } + const agent = new Agent(config, registry, executor) + + // Inject a pre-built runner to bypass createAdapter + const runner = new AgentRunner(adapter, registry, executor, { + model: config.model, + systemPrompt: config.systemPrompt, + agentName: config.name, + }) + ;(agent as any).runner = runner + + const pool = new AgentPool(5) + pool.add(agent) + return pool +} + +/** + * Minimal re-implementation of the retry logic from executeQueue for isolated testing. + * This tests the retry pattern directly without needing the full orchestrator. + */ +async function executeTaskWithRetry( + task: Task, + pool: AgentPool, + onProgress?: (event: OrchestratorEvent) => void, +): Promise<AgentRunResult> { + const maxAttempts = (task.maxRetries ?? 0) + 1 + const baseDelay = task.retryDelayMs ?? 1000 + const backoff = task.retryBackoff ?? 2 + const assignee = task.assignee!
+ + let lastResult: AgentRunResult | undefined + + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + try { + const result = await pool.run(assignee, task.description) + lastResult = result + + if (result.success) { + return result + } + + if (attempt < maxAttempts) { + const delay = baseDelay * backoff ** (attempt - 1) + onProgress?.({ + type: 'task_retry', + task: task.id, + agent: assignee, + data: { attempt, maxAttempts, error: result.output, nextDelayMs: delay }, + }) + // Skip actual sleep in tests — just verify the event was emitted + continue + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + + if (attempt < maxAttempts) { + const delay = baseDelay * backoff ** (attempt - 1) + onProgress?.({ + type: 'task_retry', + task: task.id, + agent: assignee, + data: { attempt, maxAttempts, error: message, nextDelayMs: delay }, + }) + continue + } + + return { + success: false, + output: message, + messages: [], + tokenUsage: { input_tokens: 0, output_tokens: 0 }, + toolCalls: [], + } + } + } + + return lastResult! +} + +describe('Task retry logic', () => { + it('succeeds on first attempt with no retry config', async () => { + const pool = buildMockPool('worker', [{ text: 'done' }]) + const task = createTask({ + title: 'Simple', + description: 'do something', + assignee: 'worker', + }) + + const result = await executeTaskWithRetry(task, pool) + expect(result.success).toBe(true) + expect(result.output).toBe('done') + }) + + it('retries and succeeds on second attempt', async () => { + // First call returns empty text (success:true but the agent just returned text) + // We need to simulate failure. Since AgentRunner always returns success:true + // for normal text responses, let's test with multiple responses where + // the first throws and the second succeeds. 
+ const pool = buildMockPool('worker', [ + { text: 'result1' }, + { text: 'result2' }, + ]) + + const task = createTask({ + title: 'Retry task', + description: 'do something', + assignee: 'worker', + maxRetries: 2, + retryDelayMs: 10, + retryBackoff: 1, + }) + + // Simulate first attempt failure by making pool.run throw on first call + let callCount = 0 + const originalRun = pool.run.bind(pool) + vi.spyOn(pool, 'run').mockImplementation(async (name, prompt) => { + callCount++ + if (callCount === 1) { + throw new Error('transient LLM error') + } + return originalRun(name, prompt) + }) + + const events: OrchestratorEvent[] = [] + const result = await executeTaskWithRetry(task, pool, (e) => events.push(e)) + + expect(result.success).toBe(true) + expect(result.output).toBe('result1') // first adapter response — the spy threw before reaching the adapter on call 1 + expect(callCount).toBe(2) + + // Verify task_retry event was emitted + expect(events).toHaveLength(1) + expect(events[0]!.type).toBe('task_retry') + expect((events[0]!.data as any).attempt).toBe(1) + }) + + it('exhausts all retries and fails', async () => { + const pool = buildMockPool('worker', []) + + const task = createTask({ + title: 'Always fails', + description: 'do something', + assignee: 'worker', + maxRetries: 2, + retryDelayMs: 10, + retryBackoff: 1, + }) + + vi.spyOn(pool, 'run').mockRejectedValue(new Error('persistent error')) + + const events: OrchestratorEvent[] = [] + const result = await executeTaskWithRetry(task, pool, (e) => events.push(e)) + + expect(result.success).toBe(false) + expect(result.output).toBe('persistent error') + + // Should have emitted 2 retry events (attempt 1 and 2), then failed on attempt 3 + expect(events).toHaveLength(2) + expect(events.every(e => e.type === 'task_retry')).toBe(true) + }) + + it('emits correct backoff delays', async () => { + const pool = buildMockPool('worker', []) + + const task = createTask({ + title: 'Backoff test', + description: 'do something', + assignee: 'worker', + maxRetries: 
3, + retryDelayMs: 100, + retryBackoff: 2, + }) + + vi.spyOn(pool, 'run').mockRejectedValue(new Error('error')) + + const events: OrchestratorEvent[] = [] + await executeTaskWithRetry(task, pool, (e) => events.push(e)) + + // 3 retry events: attempts 1, 2, 3 (attempt 4 is the final failure) + expect(events).toHaveLength(3) + expect((events[0]!.data as any).nextDelayMs).toBe(100) // 100 * 2^0 + expect((events[1]!.data as any).nextDelayMs).toBe(200) // 100 * 2^1 + expect((events[2]!.data as any).nextDelayMs).toBe(400) // 100 * 2^2 + }) + + it('no retry events when maxRetries is 0', async () => { + const pool = buildMockPool('worker', []) + + const task = createTask({ + title: 'No retry', + description: 'do something', + assignee: 'worker', + maxRetries: 0, + }) + + vi.spyOn(pool, 'run').mockRejectedValue(new Error('fail')) + + const events: OrchestratorEvent[] = [] + const result = await executeTaskWithRetry(task, pool, (e) => events.push(e)) + + expect(result.success).toBe(false) + expect(events).toHaveLength(0) + }) +})