diff --git a/src/index.ts b/src/index.ts index fb8b6bf..f624707 100644 --- a/src/index.ts +++ b/src/index.ts @@ -54,7 +54,7 @@ // Orchestrator (primary entry point) // --------------------------------------------------------------------------- -export { OpenMultiAgent } from './orchestrator/orchestrator.js' +export { OpenMultiAgent, executeWithRetry, computeRetryDelay } from './orchestrator/orchestrator.js' export { Scheduler } from './orchestrator/scheduler.js' export type { SchedulingStrategy } from './orchestrator/scheduler.js' diff --git a/src/orchestrator/orchestrator.ts b/src/orchestrator/orchestrator.ts index 16b8579..db6687f 100644 --- a/src/orchestrator/orchestrator.ts +++ b/src/orchestrator/orchestrator.ts @@ -92,11 +92,96 @@ function buildAgent(config: AgentConfig): Agent { return new Agent(config, registry, executor) } -/** Promise-based delay for retry backoff. */ +/** Promise-based delay. */ function sleep(ms: number): Promise<void> { return new Promise((resolve) => setTimeout(resolve, ms)) } +/** Maximum delay cap to prevent runaway exponential backoff (30 seconds). */ +const MAX_RETRY_DELAY_MS = 30_000 + +/** + * Compute the retry delay for a given attempt, capped at {@link MAX_RETRY_DELAY_MS}. + */ +export function computeRetryDelay( + baseDelay: number, + backoff: number, + attempt: number, +): number { + return Math.min(baseDelay * backoff ** (attempt - 1), MAX_RETRY_DELAY_MS) +} + +/** + * Execute an agent task with optional retry and exponential backoff. + * + * Exported for testability — called internally by {@link executeQueue}. + * + * @param run - The function that executes the task (typically `pool.run`). + * @param task - The task to execute (retry config read from its fields). + * @param onRetry - Called before each retry sleep with event data. + * @param delayFn - Injectable delay function (defaults to real `sleep`). + * @returns The final {@link AgentRunResult} from the last attempt. 
+ */ +export async function executeWithRetry( + run: () => Promise<AgentRunResult>, + task: Task, + onRetry?: (data: { attempt: number; maxAttempts: number; error: string; nextDelayMs: number }) => void, + delayFn: (ms: number) => Promise<void> = sleep, +): Promise<AgentRunResult> { + const maxAttempts = (task.maxRetries ?? 0) + 1 + const baseDelay = task.retryDelayMs ?? 1000 + const backoff = task.retryBackoff ?? 2 + + let lastError: string = '' + + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + try { + const result = await run() + if (result.success) { + return result + } + lastError = result.output + + // Failure — retry or give up + if (attempt < maxAttempts) { + const delay = computeRetryDelay(baseDelay, backoff, attempt) + onRetry?.({ attempt, maxAttempts, error: lastError, nextDelayMs: delay }) + await delayFn(delay) + continue + } + + return result + } catch (err) { + lastError = err instanceof Error ? err.message : String(err) + + if (attempt < maxAttempts) { + const delay = computeRetryDelay(baseDelay, backoff, attempt) + onRetry?.({ attempt, maxAttempts, error: lastError, nextDelayMs: delay }) + await delayFn(delay) + continue + } + + // All retries exhausted — return a failure result + return { + success: false, + output: lastError, + messages: [], + tokenUsage: { input_tokens: 0, output_tokens: 0 }, + toolCalls: [], + } + } + } + + // Should not be reached, but TypeScript needs a return + return { + success: false, + output: lastError, + messages: [], + tokenUsage: { input_tokens: 0, output_tokens: 0 }, + toolCalls: [], + } +} + // --------------------------------------------------------------------------- // Parsed task spec (result of coordinator decomposition) // --------------------------------------------------------------------------- @@ -106,9 +191,6 @@ interface ParsedTaskSpec { description: string assignee?: string dependsOn?: string[] - maxRetries?: number - retryDelayMs?: number - retryBackoff?: number } /** @@ -247,85 +329,51 @@ async function executeQueue( // 
Build the prompt: inject shared memory context + task description const prompt = await buildTaskPrompt(task, team) - const maxAttempts = (task.maxRetries ?? 0) + 1 - const baseDelay = task.retryDelayMs ?? 1000 - const backoff = task.retryBackoff ?? 2 - - for (let attempt = 1; attempt <= maxAttempts; attempt++) { - try { - const result = await pool.run(assignee, prompt) - ctx.agentResults.set(`${assignee}:${task.id}`, result) - - if (result.success) { - // Persist result into shared memory so other agents can read it - const sharedMem = team.getSharedMemoryInstance() - if (sharedMem) { - await sharedMem.write(assignee, `task:${task.id}:result`, result.output) - } - - queue.complete(task.id, result.output) - - config.onProgress?.({ - type: 'task_complete', - task: task.id, - agent: assignee, - data: result, - } satisfies OrchestratorEvent) - - config.onProgress?.({ - type: 'agent_complete', - agent: assignee, - task: task.id, - data: result, - } satisfies OrchestratorEvent) - break - } - - // Agent returned success: false — treat as failure for retry purposes - if (attempt < maxAttempts) { - const delay = baseDelay * backoff ** (attempt - 1) - config.onProgress?.({ - type: 'task_retry', - task: task.id, - agent: assignee, - data: { attempt, maxAttempts, error: result.output, nextDelayMs: delay }, - } satisfies OrchestratorEvent) - await sleep(delay) - continue - } - - // All retries exhausted - queue.fail(task.id, result.output) + const result = await executeWithRetry( + () => pool.run(assignee, prompt), + task, + (retryData) => { config.onProgress?.({ - type: 'error', + type: 'task_retry', task: task.id, agent: assignee, - data: result, + data: retryData, } satisfies OrchestratorEvent) - } catch (err) { - const message = err instanceof Error ? 
err.message : String(err) + }, + ) - if (attempt < maxAttempts) { - const delay = baseDelay * backoff ** (attempt - 1) - config.onProgress?.({ - type: 'task_retry', - task: task.id, - agent: assignee, - data: { attempt, maxAttempts, error: message, nextDelayMs: delay }, - } satisfies OrchestratorEvent) - await sleep(delay) - continue - } + ctx.agentResults.set(`${assignee}:${task.id}`, result) - // All retries exhausted - queue.fail(task.id, message) - config.onProgress?.({ - type: 'error', - task: task.id, - agent: assignee, - data: err, - } satisfies OrchestratorEvent) + if (result.success) { + // Persist result into shared memory so other agents can read it + const sharedMem = team.getSharedMemoryInstance() + if (sharedMem) { + await sharedMem.write(assignee, `task:${task.id}:result`, result.output) } + + queue.complete(task.id, result.output) + + config.onProgress?.({ + type: 'task_complete', + task: task.id, + agent: assignee, + data: result, + } satisfies OrchestratorEvent) + + config.onProgress?.({ + type: 'agent_complete', + agent: assignee, + task: task.id, + data: result, + } satisfies OrchestratorEvent) + } else { + queue.fail(task.id, result.output) + config.onProgress?.({ + type: 'error', + task: task.id, + agent: assignee, + data: result, + } satisfies OrchestratorEvent) } }) @@ -792,7 +840,11 @@ export class OpenMultiAgent { * then resolving them to real IDs before adding tasks to the queue. 
*/ private loadSpecsIntoQueue( - specs: ReadonlyArray<ParsedTaskSpec>, + specs: ReadonlyArray<Task>, agentConfigs: AgentConfig[], queue: TaskQueue, ): void { diff --git a/tests/task-retry.test.ts b/tests/task-retry.test.ts index 97f68fb..23172d1 100644 --- a/tests/task-retry.test.ts +++ b/tests/task-retry.test.ts @@ -1,13 +1,52 @@ import { describe, it, expect, vi } from 'vitest' -import { z } from 'zod' import { createTask } from '../src/task/task.js' -import { TaskQueue } from '../src/task/queue.js' -import type { Task, OrchestratorEvent, AgentRunResult, LLMResponse, LLMAdapter } from '../src/types.js' -import { Agent } from '../src/agent/agent.js' -import { AgentRunner } from '../src/agent/runner.js' -import { AgentPool } from '../src/agent/pool.js' -import { ToolRegistry } from '../src/tool/framework.js' -import { ToolExecutor } from '../src/tool/executor.js' +import { executeWithRetry, computeRetryDelay } from '../src/orchestrator/orchestrator.js' +import type { AgentRunResult } from '../src/types.js' + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +const SUCCESS_RESULT: AgentRunResult = { + success: true, + output: 'done', + messages: [], + tokenUsage: { input_tokens: 10, output_tokens: 20 }, + toolCalls: [], +} + +const FAILURE_RESULT: AgentRunResult = { + success: false, + output: 'agent failed', + messages: [], + tokenUsage: { input_tokens: 10, output_tokens: 20 }, + toolCalls: [], +} + +/** No-op delay for tests. 
*/ +const noDelay = () => Promise.resolve() + +// --------------------------------------------------------------------------- +// computeRetryDelay +// --------------------------------------------------------------------------- + +describe('computeRetryDelay', () => { + it('computes exponential backoff', () => { + expect(computeRetryDelay(1000, 2, 1)).toBe(1000) // 1000 * 2^0 + expect(computeRetryDelay(1000, 2, 2)).toBe(2000) // 1000 * 2^1 + expect(computeRetryDelay(1000, 2, 3)).toBe(4000) // 1000 * 2^2 + }) + + it('caps at 30 seconds', () => { + // 1000 * 2^20 = 1,048,576,000 — way over cap + expect(computeRetryDelay(1000, 2, 21)).toBe(30_000) + }) + + it('handles backoff of 1 (constant delay)', () => { + expect(computeRetryDelay(500, 1, 1)).toBe(500) + expect(computeRetryDelay(500, 1, 5)).toBe(500) + }) +}) // --------------------------------------------------------------------------- // createTask: retry fields @@ -36,239 +75,204 @@ describe('createTask with retry fields', () => { }) // --------------------------------------------------------------------------- -// executeQueue retry integration (mock agent pool) +// executeWithRetry — tests the real exported function // --------------------------------------------------------------------------- -/** - * Build a mock adapter that returns responses in sequence. - * Each response can be configured as success or failure. - */ -function mockAdapter(responses: Array<{ text: string; success?: boolean }>): LLMAdapter { - let callIndex = 0 - return { - name: 'mock', - async chat() { - const resp = responses[callIndex++] ?? 
{ text: 'fallback' } - return { - id: `mock-${callIndex}`, - content: [{ type: 'text' as const, text: resp.text }], - model: 'mock-model', - stop_reason: 'end_turn', - usage: { input_tokens: 10, output_tokens: 20 }, - } satisfies LLMResponse - }, - async *stream() { /* unused */ }, - } -} - -function buildMockPool( - agentName: string, - responses: Array<{ text: string; success?: boolean }>, -): AgentPool { - const adapter = mockAdapter(responses) - const registry = new ToolRegistry() - const executor = new ToolExecutor(registry) - const config = { name: agentName, model: 'mock-model', systemPrompt: 'test' } - const agent = new Agent(config, registry, executor) - - // Inject a pre-built runner to bypass createAdapter - const runner = new AgentRunner(adapter, registry, executor, { - model: config.model, - systemPrompt: config.systemPrompt, - agentName: config.name, - }) - ;(agent as any).runner = runner - - const pool = new AgentPool(5) - pool.add(agent) - return pool -} - -/** - * Minimal re-implementation of the retry logic from executeQueue for isolated testing. - * This tests the retry pattern directly without needing the full orchestrator. - */ -async function executeTaskWithRetry( - task: Task, - pool: AgentPool, - onProgress?: (event: OrchestratorEvent) => void, -): Promise<AgentRunResult> { - const maxAttempts = (task.maxRetries ?? 0) + 1 - const baseDelay = task.retryDelayMs ?? 1000 - const backoff = task.retryBackoff ?? 2 - const assignee = task.assignee! 
- - let lastResult: AgentRunResult | undefined - - for (let attempt = 1; attempt <= maxAttempts; attempt++) { - try { - const result = await pool.run(assignee, task.description) - lastResult = result - - if (result.success) { - return result - } - - if (attempt < maxAttempts) { - const delay = baseDelay * backoff ** (attempt - 1) - onProgress?.({ - type: 'task_retry', - task: task.id, - agent: assignee, - data: { attempt, maxAttempts, error: result.output, nextDelayMs: delay }, - }) - // Skip actual sleep in tests — just verify the event was emitted - continue - } - } catch (err) { - const message = err instanceof Error ? err.message : String(err) - - if (attempt < maxAttempts) { - const delay = baseDelay * backoff ** (attempt - 1) - onProgress?.({ - type: 'task_retry', - task: task.id, - agent: assignee, - data: { attempt, maxAttempts, error: message, nextDelayMs: delay }, - }) - continue - } - - return { - success: false, - output: message, - messages: [], - tokenUsage: { input_tokens: 0, output_tokens: 0 }, - toolCalls: [], - } - } - } - - return lastResult! -} - -describe('Task retry logic', () => { +describe('executeWithRetry', () => { it('succeeds on first attempt with no retry config', async () => { - const pool = buildMockPool('worker', [{ text: 'done' }]) - const task = createTask({ - title: 'Simple', - description: 'do something', - assignee: 'worker', - }) + const run = vi.fn().mockResolvedValue(SUCCESS_RESULT) + const task = createTask({ title: 'Simple', description: 'test' }) + + const result = await executeWithRetry(run, task, undefined, noDelay) - const result = await executeTaskWithRetry(task, pool) expect(result.success).toBe(true) expect(result.output).toBe('done') + expect(run).toHaveBeenCalledTimes(1) }) - it('retries and succeeds on second attempt', async () => { - // First call returns empty text (success:true but the agent just returned text) - // We need to simulate failure. 
Since AgentRunner always returns success:true - // for normal text responses, let's test with multiple responses where - // the first throws and the second succeeds. - const pool = buildMockPool('worker', [ - { text: 'result1' }, - { text: 'result2' }, - ]) + it('succeeds on first attempt even when maxRetries > 0', async () => { + const run = vi.fn().mockResolvedValue(SUCCESS_RESULT) + const task = createTask({ + title: 'Has retries', + description: 'test', + maxRetries: 3, + }) + + const result = await executeWithRetry(run, task, undefined, noDelay) + + expect(result.success).toBe(true) + expect(run).toHaveBeenCalledTimes(1) + }) + + it('retries on exception and succeeds on second attempt', async () => { + const run = vi.fn() + .mockRejectedValueOnce(new Error('transient error')) + .mockResolvedValueOnce(SUCCESS_RESULT) const task = createTask({ title: 'Retry task', - description: 'do something', - assignee: 'worker', + description: 'test', maxRetries: 2, - retryDelayMs: 10, - retryBackoff: 1, + retryDelayMs: 100, + retryBackoff: 2, }) - // Simulate first attempt failure by making pool.run throw on first call - let callCount = 0 - const originalRun = pool.run.bind(pool) - vi.spyOn(pool, 'run').mockImplementation(async (name, prompt) => { - callCount++ - if (callCount === 1) { - throw new Error('transient LLM error') - } - return originalRun(name, prompt) - }) - - const events: OrchestratorEvent[] = [] - const result = await executeTaskWithRetry(task, pool, (e) => events.push(e)) + const retryEvents: unknown[] = [] + const result = await executeWithRetry( + run, + task, + (data) => retryEvents.push(data), + noDelay, + ) expect(result.success).toBe(true) - expect(result.output).toBe('result1') // second adapter response (first call threw) - expect(callCount).toBe(2) - - // Verify task_retry event was emitted - expect(events).toHaveLength(1) - expect(events[0]!.type).toBe('task_retry') - expect((events[0]!.data as any).attempt).toBe(1) + 
expect(run).toHaveBeenCalledTimes(2) + expect(retryEvents).toHaveLength(1) + expect(retryEvents[0]).toEqual({ + attempt: 1, + maxAttempts: 3, + error: 'transient error', + nextDelayMs: 100, // 100 * 2^0 + }) }) - it('exhausts all retries and fails', async () => { - const pool = buildMockPool('worker', []) + it('retries on success:false and succeeds on second attempt', async () => { + const run = vi.fn() + .mockResolvedValueOnce(FAILURE_RESULT) + .mockResolvedValueOnce(SUCCESS_RESULT) + + const task = createTask({ + title: 'Retry task', + description: 'test', + maxRetries: 1, + retryDelayMs: 50, + }) + + const result = await executeWithRetry(run, task, undefined, noDelay) + + expect(result.success).toBe(true) + expect(run).toHaveBeenCalledTimes(2) + }) + + it('exhausts all retries on persistent exception', async () => { + const run = vi.fn().mockRejectedValue(new Error('persistent error')) const task = createTask({ title: 'Always fails', - description: 'do something', - assignee: 'worker', + description: 'test', maxRetries: 2, retryDelayMs: 10, retryBackoff: 1, }) - vi.spyOn(pool, 'run').mockRejectedValue(new Error('persistent error')) - - const events: OrchestratorEvent[] = [] - const result = await executeTaskWithRetry(task, pool, (e) => events.push(e)) + const retryEvents: unknown[] = [] + const result = await executeWithRetry( + run, + task, + (data) => retryEvents.push(data), + noDelay, + ) expect(result.success).toBe(false) expect(result.output).toBe('persistent error') - - // Should have emitted 2 retry events (attempt 1 and 2), then failed on attempt 3 - expect(events).toHaveLength(2) - expect(events.every(e => e.type === 'task_retry')).toBe(true) + expect(run).toHaveBeenCalledTimes(3) // 1 initial + 2 retries + expect(retryEvents).toHaveLength(2) }) - it('emits correct backoff delays', async () => { - const pool = buildMockPool('worker', []) + it('exhausts all retries on persistent success:false', async () => { + const run = 
vi.fn().mockResolvedValue(FAILURE_RESULT) + + const task = createTask({ + title: 'Always fails', + description: 'test', + maxRetries: 1, + }) + + const result = await executeWithRetry(run, task, undefined, noDelay) + + expect(result.success).toBe(false) + expect(result.output).toBe('agent failed') + expect(run).toHaveBeenCalledTimes(2) + }) + + it('emits correct exponential backoff delays', async () => { + const run = vi.fn().mockRejectedValue(new Error('error')) const task = createTask({ title: 'Backoff test', - description: 'do something', - assignee: 'worker', + description: 'test', maxRetries: 3, retryDelayMs: 100, retryBackoff: 2, }) - vi.spyOn(pool, 'run').mockRejectedValue(new Error('error')) + const retryEvents: Array<{ nextDelayMs: number }> = [] + await executeWithRetry( + run, + task, + (data) => retryEvents.push(data), + noDelay, + ) - const events: OrchestratorEvent[] = [] - await executeTaskWithRetry(task, pool, (e) => events.push(e)) - - // 3 retry events: attempts 1, 2, 3 (attempt 4 is the final failure) - expect(events).toHaveLength(3) - expect((events[0]!.data as any).nextDelayMs).toBe(100) // 100 * 2^0 - expect((events[1]!.data as any).nextDelayMs).toBe(200) // 100 * 2^1 - expect((events[2]!.data as any).nextDelayMs).toBe(400) // 100 * 2^2 + expect(retryEvents).toHaveLength(3) + expect(retryEvents[0]!.nextDelayMs).toBe(100) // 100 * 2^0 + expect(retryEvents[1]!.nextDelayMs).toBe(200) // 100 * 2^1 + expect(retryEvents[2]!.nextDelayMs).toBe(400) // 100 * 2^2 }) - it('no retry events when maxRetries is 0', async () => { - const pool = buildMockPool('worker', []) + it('no retry events when maxRetries is 0 (default)', async () => { + const run = vi.fn().mockRejectedValue(new Error('fail')) + const task = createTask({ title: 'No retry', description: 'test' }) - const task = createTask({ - title: 'No retry', - description: 'do something', - assignee: 'worker', - maxRetries: 0, - }) - - vi.spyOn(pool, 'run').mockRejectedValue(new Error('fail')) - - const 
events: OrchestratorEvent[] = [] - const result = await executeTaskWithRetry(task, pool, (e) => events.push(e)) + const retryEvents: unknown[] = [] + const result = await executeWithRetry( + run, + task, + (data) => retryEvents.push(data), + noDelay, + ) expect(result.success).toBe(false) - expect(events).toHaveLength(0) + expect(run).toHaveBeenCalledTimes(1) + expect(retryEvents).toHaveLength(0) + }) + + it('calls the delay function with computed delay', async () => { + const run = vi.fn() + .mockRejectedValueOnce(new Error('error')) + .mockResolvedValueOnce(SUCCESS_RESULT) + + const task = createTask({ + title: 'Delay test', + description: 'test', + maxRetries: 1, + retryDelayMs: 250, + retryBackoff: 3, + }) + + const mockDelay = vi.fn().mockResolvedValue(undefined) + await executeWithRetry(run, task, undefined, mockDelay) + + expect(mockDelay).toHaveBeenCalledTimes(1) + expect(mockDelay).toHaveBeenCalledWith(250) // 250 * 3^0 + }) + + it('caps delay at 30 seconds', async () => { + const run = vi.fn() + .mockRejectedValueOnce(new Error('error')) + .mockResolvedValueOnce(SUCCESS_RESULT) + + const task = createTask({ + title: 'Cap test', + description: 'test', + maxRetries: 1, + retryDelayMs: 50_000, + retryBackoff: 2, + }) + + const mockDelay = vi.fn().mockResolvedValue(undefined) + await executeWithRetry(run, task, undefined, mockDelay) + + expect(mockDelay).toHaveBeenCalledWith(30_000) // capped }) })