From 4d7564b71a720a64507876d1c99feae8f79c2702 Mon Sep 17 00:00:00 2001 From: JackChen <26346076+JackChen-me@users.noreply.github.com> Date: Fri, 3 Apr 2026 14:08:36 +0800 Subject: [PATCH] feat: task-level retry with exponential backoff (#37) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add task-level retry with exponential backoff Add `maxRetries`, `retryDelayMs`, and `retryBackoff` to task config. When a task fails and retries remain, the orchestrator waits with exponential backoff and re-runs the task with a fresh agent conversation. A `task_retry` event is emitted via `onProgress` for observability. Cascade failure only occurs after all retries are exhausted. Closes #30 * fix: address review — extract executeWithRetry, add delay cap, fix tests - Extract `executeWithRetry()` as a testable exported function - Add `computeRetryDelay()` with 30s max cap (prevents runaway backoff) - Remove retry fields from `ParsedTaskSpec` (dead code for runTeam path) - Deduplicate retry event emission (single code path for both error types) - Injectable delay function for test determinism - Rewrite tests to call the real `executeWithRetry`, not a copy - 15 tests covering: success, retry+success, retry+failure, backoff calculation, delay cap, delay function injection, no-retry default * fix: clamp negative maxRetries/retryBackoff to safe values - maxRetries clamped to >= 0 (negative values treated as no retry) - retryBackoff clamped to >= 1 (prevents zero/negative delay oscillation) - retryDelayMs clamped to >= 0 - Add tests for negative maxRetries and negative backoff Addresses Codex review P1 on #37 * fix: accumulate token usage across retry attempts Previously only the final attempt's tokenUsage was returned, causing under-reporting of actual model consumption when retries occurred. Now all attempts' token counts are summed in the returned result. 
Addresses Codex review P2 (token usage) on #37 --- src/index.ts | 2 +- src/orchestrator/orchestrator.ts | 180 ++++++++++++--- src/task/task.ts | 6 + src/types.ts | 7 + tests/task-retry.test.ts | 368 +++++++++++++++++++++++++++++++ 5 files changed, 528 insertions(+), 35 deletions(-) create mode 100644 tests/task-retry.test.ts diff --git a/src/index.ts b/src/index.ts index fb8b6bf..f624707 100644 --- a/src/index.ts +++ b/src/index.ts @@ -54,7 +54,7 @@ // Orchestrator (primary entry point) // --------------------------------------------------------------------------- -export { OpenMultiAgent } from './orchestrator/orchestrator.js' +export { OpenMultiAgent, executeWithRetry, computeRetryDelay } from './orchestrator/orchestrator.js' export { Scheduler } from './orchestrator/scheduler.js' export type { SchedulingStrategy } from './orchestrator/scheduler.js' diff --git a/src/orchestrator/orchestrator.ts b/src/orchestrator/orchestrator.ts index 3f44792..9d6d857 100644 --- a/src/orchestrator/orchestrator.ts +++ b/src/orchestrator/orchestrator.ts @@ -92,6 +92,104 @@ function buildAgent(config: AgentConfig): Agent { return new Agent(config, registry, executor) } +/** Promise-based delay. */ +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +/** Maximum delay cap to prevent runaway exponential backoff (30 seconds). */ +const MAX_RETRY_DELAY_MS = 30_000 + +/** + * Compute the retry delay for a given attempt, capped at {@link MAX_RETRY_DELAY_MS}. + */ +export function computeRetryDelay( + baseDelay: number, + backoff: number, + attempt: number, +): number { + return Math.min(baseDelay * backoff ** (attempt - 1), MAX_RETRY_DELAY_MS) +} + +/** + * Execute an agent task with optional retry and exponential backoff. + * + * Exported for testability — called internally by {@link executeQueue}. + * + * @param run - The function that executes the task (typically `pool.run`). 
+ * @param task - The task to execute (retry config read from its fields). + * @param onRetry - Called before each retry sleep with event data. + * @param delayFn - Injectable delay function (defaults to real `sleep`). + * @returns The {@link AgentRunResult} of the last attempt (or a synthesized failure result if that attempt threw), with `tokenUsage` summed across every attempt that returned a result. + */ +export async function executeWithRetry( + run: () => Promise, + task: Task, + onRetry?: (data: { attempt: number; maxAttempts: number; error: string; nextDelayMs: number }) => void, + delayFn: (ms: number) => Promise = sleep, +): Promise { + const maxAttempts = Math.max(0, task.maxRetries ?? 0) + 1 + const baseDelay = Math.max(0, task.retryDelayMs ?? 1000) + const backoff = Math.max(1, task.retryBackoff ?? 2) + + let lastError: string = '' + // Accumulate token usage across all attempts so billing/observability + // reflects the true cost of retries. + let totalUsage: TokenUsage = { input_tokens: 0, output_tokens: 0 } + + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + try { + const result = await run() + totalUsage = { + input_tokens: totalUsage.input_tokens + result.tokenUsage.input_tokens, + output_tokens: totalUsage.output_tokens + result.tokenUsage.output_tokens, + } + + if (result.success) { + return { ...result, tokenUsage: totalUsage } + } + lastError = result.output + + // Failure — retry or give up + if (attempt < maxAttempts) { + const delay = computeRetryDelay(baseDelay, backoff, attempt) + onRetry?.({ attempt, maxAttempts, error: lastError, nextDelayMs: delay }) + await delayFn(delay) + continue + } + + return { ...result, tokenUsage: totalUsage } + } catch (err) { + lastError = err instanceof Error ?
err.message : String(err) + + if (attempt < maxAttempts) { + const delay = computeRetryDelay(baseDelay, backoff, attempt) + onRetry?.({ attempt, maxAttempts, error: lastError, nextDelayMs: delay }) + await delayFn(delay) + continue + } + + // All retries exhausted — return a failure result + return { + success: false, + output: lastError, + messages: [], + tokenUsage: totalUsage, + toolCalls: [], + } + } + } + + // Should not be reached, but TypeScript needs a return + return { + success: false, + output: lastError, + messages: [], + tokenUsage: totalUsage, + toolCalls: [], + } +} + // --------------------------------------------------------------------------- // Parsed task spec (result of coordinator decomposition) // --------------------------------------------------------------------------- @@ -239,49 +337,50 @@ async function executeQueue( // Build the prompt: inject shared memory context + task description const prompt = await buildTaskPrompt(task, team) - try { - const result = await pool.run(assignee, prompt) - ctx.agentResults.set(`${assignee}:${task.id}`, result) - - if (result.success) { - // Persist result into shared memory so other agents can read it - const sharedMem = team.getSharedMemoryInstance() - if (sharedMem) { - await sharedMem.write(assignee, `task:${task.id}:result`, result.output) - } - - queue.complete(task.id, result.output) - + const result = await executeWithRetry( + () => pool.run(assignee, prompt), + task, + (retryData) => { config.onProgress?.({ - type: 'task_complete', + type: 'task_retry', task: task.id, agent: assignee, - data: result, + data: retryData, } satisfies OrchestratorEvent) + }, + ) - config.onProgress?.({ - type: 'agent_complete', - agent: assignee, - task: task.id, - data: result, - } satisfies OrchestratorEvent) - } else { - queue.fail(task.id, result.output) - config.onProgress?.({ - type: 'error', - task: task.id, - agent: assignee, - data: result, - } satisfies OrchestratorEvent) + 
ctx.agentResults.set(`${assignee}:${task.id}`, result) + + if (result.success) { + // Persist result into shared memory so other agents can read it + const sharedMem = team.getSharedMemoryInstance() + if (sharedMem) { + await sharedMem.write(assignee, `task:${task.id}:result`, result.output) } - } catch (err) { - const message = err instanceof Error ? err.message : String(err) - queue.fail(task.id, message) + + queue.complete(task.id, result.output) + + config.onProgress?.({ + type: 'task_complete', + task: task.id, + agent: assignee, + data: result, + } satisfies OrchestratorEvent) + + config.onProgress?.({ + type: 'agent_complete', + agent: assignee, + task: task.id, + data: result, + } satisfies OrchestratorEvent) + } else { + queue.fail(task.id, result.output) config.onProgress?.({ type: 'error', task: task.id, agent: assignee, - data: err, + data: result, } satisfies OrchestratorEvent) } }) @@ -574,6 +673,9 @@ export class OpenMultiAgent { description: string assignee?: string dependsOn?: string[] + maxRetries?: number + retryDelayMs?: number + retryBackoff?: number }>, ): Promise { const agentConfigs = team.getAgents() @@ -586,6 +688,9 @@ export class OpenMultiAgent { description: t.description, assignee: t.assignee, dependsOn: t.dependsOn, + maxRetries: t.maxRetries, + retryDelayMs: t.retryDelayMs, + retryBackoff: t.retryBackoff, })), agentConfigs, queue, @@ -743,7 +848,11 @@ export class OpenMultiAgent { * then resolving them to real IDs before adding tasks to the queue. */ private loadSpecsIntoQueue( - specs: ReadonlyArray, + specs: ReadonlyArray, agentConfigs: AgentConfig[], queue: TaskQueue, ): void { @@ -760,6 +869,9 @@ export class OpenMultiAgent { assignee: spec.assignee && agentNames.has(spec.assignee) ? 
spec.assignee : undefined, + maxRetries: spec.maxRetries, + retryDelayMs: spec.retryDelayMs, + retryBackoff: spec.retryBackoff, }) titleToId.set(spec.title.toLowerCase().trim(), task.id) createdTasks.push(task) diff --git a/src/task/task.ts b/src/task/task.ts index 9a11476..d74e70b 100644 --- a/src/task/task.ts +++ b/src/task/task.ts @@ -31,6 +31,9 @@ export function createTask(input: { description: string assignee?: string dependsOn?: string[] + maxRetries?: number + retryDelayMs?: number + retryBackoff?: number }): Task { const now = new Date() return { @@ -43,6 +46,9 @@ export function createTask(input: { result: undefined, createdAt: now, updatedAt: now, + maxRetries: input.maxRetries, + retryDelayMs: input.retryDelayMs, + retryBackoff: input.retryBackoff, } } diff --git a/src/types.ts b/src/types.ts index 6e76640..bd2ce64 100644 --- a/src/types.ts +++ b/src/types.ts @@ -281,6 +281,12 @@ export interface Task { result?: string readonly createdAt: Date updatedAt: Date + /** Maximum number of retry attempts on failure (default: 0 — no retry). */ + readonly maxRetries?: number + /** Base delay in ms before the first retry (default: 1000). */ + readonly retryDelayMs?: number + /** Exponential backoff multiplier (default: 2). 
*/ + readonly retryBackoff?: number } // --------------------------------------------------------------------------- @@ -294,6 +300,7 @@ export interface OrchestratorEvent { | 'agent_complete' | 'task_start' | 'task_complete' + | 'task_retry' | 'message' | 'error' readonly agent?: string diff --git a/tests/task-retry.test.ts b/tests/task-retry.test.ts new file mode 100644 index 0000000..56bdb76 --- /dev/null +++ b/tests/task-retry.test.ts @@ -0,0 +1,368 @@ +import { describe, it, expect, vi } from 'vitest' +import { createTask } from '../src/task/task.js' +import { executeWithRetry, computeRetryDelay } from '../src/orchestrator/orchestrator.js' +import type { AgentRunResult } from '../src/types.js' + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +const SUCCESS_RESULT: AgentRunResult = { + success: true, + output: 'done', + messages: [], + tokenUsage: { input_tokens: 10, output_tokens: 20 }, + toolCalls: [], +} + +const FAILURE_RESULT: AgentRunResult = { + success: false, + output: 'agent failed', + messages: [], + tokenUsage: { input_tokens: 10, output_tokens: 20 }, + toolCalls: [], +} + +/** No-op delay for tests. 
*/ +const noDelay = () => Promise.resolve() + +// --------------------------------------------------------------------------- +// computeRetryDelay +// --------------------------------------------------------------------------- + +describe('computeRetryDelay', () => { + it('computes exponential backoff', () => { + expect(computeRetryDelay(1000, 2, 1)).toBe(1000) // 1000 * 2^0 + expect(computeRetryDelay(1000, 2, 2)).toBe(2000) // 1000 * 2^1 + expect(computeRetryDelay(1000, 2, 3)).toBe(4000) // 1000 * 2^2 + }) + + it('caps at 30 seconds', () => { + // 1000 * 2^20 = 1,048,576,000 — way over cap + expect(computeRetryDelay(1000, 2, 21)).toBe(30_000) + }) + + it('handles backoff of 1 (constant delay)', () => { + expect(computeRetryDelay(500, 1, 1)).toBe(500) + expect(computeRetryDelay(500, 1, 5)).toBe(500) + }) +}) + +// --------------------------------------------------------------------------- +// createTask: retry fields +// --------------------------------------------------------------------------- + +describe('createTask with retry fields', () => { + it('passes through retry config', () => { + const t = createTask({ + title: 'Retry task', + description: 'test', + maxRetries: 3, + retryDelayMs: 500, + retryBackoff: 1.5, + }) + expect(t.maxRetries).toBe(3) + expect(t.retryDelayMs).toBe(500) + expect(t.retryBackoff).toBe(1.5) + }) + + it('defaults retry fields to undefined', () => { + const t = createTask({ title: 'No retry', description: 'test' }) + expect(t.maxRetries).toBeUndefined() + expect(t.retryDelayMs).toBeUndefined() + expect(t.retryBackoff).toBeUndefined() + }) +}) + +// --------------------------------------------------------------------------- +// executeWithRetry — tests the real exported function +// --------------------------------------------------------------------------- + +describe('executeWithRetry', () => { + it('succeeds on first attempt with no retry config', async () => { + const run = vi.fn().mockResolvedValue(SUCCESS_RESULT) + const task = 
createTask({ title: 'Simple', description: 'test' }) + + const result = await executeWithRetry(run, task, undefined, noDelay) + + expect(result.success).toBe(true) + expect(result.output).toBe('done') + expect(run).toHaveBeenCalledTimes(1) + }) + + it('succeeds on first attempt even when maxRetries > 0', async () => { + const run = vi.fn().mockResolvedValue(SUCCESS_RESULT) + const task = createTask({ + title: 'Has retries', + description: 'test', + maxRetries: 3, + }) + + const result = await executeWithRetry(run, task, undefined, noDelay) + + expect(result.success).toBe(true) + expect(run).toHaveBeenCalledTimes(1) + }) + + it('retries on exception and succeeds on second attempt', async () => { + const run = vi.fn() + .mockRejectedValueOnce(new Error('transient error')) + .mockResolvedValueOnce(SUCCESS_RESULT) + + const task = createTask({ + title: 'Retry task', + description: 'test', + maxRetries: 2, + retryDelayMs: 100, + retryBackoff: 2, + }) + + const retryEvents: unknown[] = [] + const result = await executeWithRetry( + run, + task, + (data) => retryEvents.push(data), + noDelay, + ) + + expect(result.success).toBe(true) + expect(run).toHaveBeenCalledTimes(2) + expect(retryEvents).toHaveLength(1) + expect(retryEvents[0]).toEqual({ + attempt: 1, + maxAttempts: 3, + error: 'transient error', + nextDelayMs: 100, // 100 * 2^0 + }) + }) + + it('retries on success:false and succeeds on second attempt', async () => { + const run = vi.fn() + .mockResolvedValueOnce(FAILURE_RESULT) + .mockResolvedValueOnce(SUCCESS_RESULT) + + const task = createTask({ + title: 'Retry task', + description: 'test', + maxRetries: 1, + retryDelayMs: 50, + }) + + const result = await executeWithRetry(run, task, undefined, noDelay) + + expect(result.success).toBe(true) + expect(run).toHaveBeenCalledTimes(2) + }) + + it('exhausts all retries on persistent exception', async () => { + const run = vi.fn().mockRejectedValue(new Error('persistent error')) + + const task = createTask({ + title: 
'Always fails', + description: 'test', + maxRetries: 2, + retryDelayMs: 10, + retryBackoff: 1, + }) + + const retryEvents: unknown[] = [] + const result = await executeWithRetry( + run, + task, + (data) => retryEvents.push(data), + noDelay, + ) + + expect(result.success).toBe(false) + expect(result.output).toBe('persistent error') + expect(run).toHaveBeenCalledTimes(3) // 1 initial + 2 retries + expect(retryEvents).toHaveLength(2) + }) + + it('exhausts all retries on persistent success:false', async () => { + const run = vi.fn().mockResolvedValue(FAILURE_RESULT) + + const task = createTask({ + title: 'Always fails', + description: 'test', + maxRetries: 1, + }) + + const result = await executeWithRetry(run, task, undefined, noDelay) + + expect(result.success).toBe(false) + expect(result.output).toBe('agent failed') + expect(run).toHaveBeenCalledTimes(2) + }) + + it('emits correct exponential backoff delays', async () => { + const run = vi.fn().mockRejectedValue(new Error('error')) + + const task = createTask({ + title: 'Backoff test', + description: 'test', + maxRetries: 3, + retryDelayMs: 100, + retryBackoff: 2, + }) + + const retryEvents: Array<{ nextDelayMs: number }> = [] + await executeWithRetry( + run, + task, + (data) => retryEvents.push(data), + noDelay, + ) + + expect(retryEvents).toHaveLength(3) + expect(retryEvents[0]!.nextDelayMs).toBe(100) // 100 * 2^0 + expect(retryEvents[1]!.nextDelayMs).toBe(200) // 100 * 2^1 + expect(retryEvents[2]!.nextDelayMs).toBe(400) // 100 * 2^2 + }) + + it('no retry events when maxRetries is 0 (default)', async () => { + const run = vi.fn().mockRejectedValue(new Error('fail')) + const task = createTask({ title: 'No retry', description: 'test' }) + + const retryEvents: unknown[] = [] + const result = await executeWithRetry( + run, + task, + (data) => retryEvents.push(data), + noDelay, + ) + + expect(result.success).toBe(false) + expect(run).toHaveBeenCalledTimes(1) + expect(retryEvents).toHaveLength(0) + }) + + it('calls the 
delay function with computed delay', async () => { + const run = vi.fn() + .mockRejectedValueOnce(new Error('error')) + .mockResolvedValueOnce(SUCCESS_RESULT) + + const task = createTask({ + title: 'Delay test', + description: 'test', + maxRetries: 1, + retryDelayMs: 250, + retryBackoff: 3, + }) + + const mockDelay = vi.fn().mockResolvedValue(undefined) + await executeWithRetry(run, task, undefined, mockDelay) + + expect(mockDelay).toHaveBeenCalledTimes(1) + expect(mockDelay).toHaveBeenCalledWith(250) // 250 * 3^0 + }) + + it('caps delay at 30 seconds', async () => { + const run = vi.fn() + .mockRejectedValueOnce(new Error('error')) + .mockResolvedValueOnce(SUCCESS_RESULT) + + const task = createTask({ + title: 'Cap test', + description: 'test', + maxRetries: 1, + retryDelayMs: 50_000, + retryBackoff: 2, + }) + + const mockDelay = vi.fn().mockResolvedValue(undefined) + await executeWithRetry(run, task, undefined, mockDelay) + + expect(mockDelay).toHaveBeenCalledWith(30_000) // capped + }) + + it('accumulates token usage across retry attempts', async () => { + const failResult: AgentRunResult = { + ...FAILURE_RESULT, + tokenUsage: { input_tokens: 100, output_tokens: 50 }, + } + const successResult: AgentRunResult = { + ...SUCCESS_RESULT, + tokenUsage: { input_tokens: 200, output_tokens: 80 }, + } + + const run = vi.fn() + .mockResolvedValueOnce(failResult) + .mockResolvedValueOnce(failResult) + .mockResolvedValueOnce(successResult) + + const task = createTask({ + title: 'Token test', + description: 'test', + maxRetries: 2, + retryDelayMs: 10, + }) + + const result = await executeWithRetry(run, task, undefined, noDelay) + + expect(result.success).toBe(true) + // 100+100+200 input, 50+50+80 output + expect(result.tokenUsage.input_tokens).toBe(400) + expect(result.tokenUsage.output_tokens).toBe(180) + }) + + it('accumulates token usage even when all retries fail', async () => { + const failResult: AgentRunResult = { + ...FAILURE_RESULT, + tokenUsage: { input_tokens: 
50, output_tokens: 30 }, + } + + const run = vi.fn().mockResolvedValue(failResult) + + const task = createTask({ + title: 'Token fail test', + description: 'test', + maxRetries: 1, + }) + + const result = await executeWithRetry(run, task, undefined, noDelay) + + expect(result.success).toBe(false) + // 50+50 input, 30+30 output (2 attempts) + expect(result.tokenUsage.input_tokens).toBe(100) + expect(result.tokenUsage.output_tokens).toBe(60) + }) + + it('clamps negative maxRetries to 0 (single attempt)', async () => { + const run = vi.fn().mockRejectedValue(new Error('fail')) + + const task = createTask({ + title: 'Negative retry', + description: 'test', + maxRetries: -5, + }) + // Manually set negative value since createTask doesn't validate + ;(task as any).maxRetries = -5 + + const result = await executeWithRetry(run, task, undefined, noDelay) + + expect(result.success).toBe(false) + expect(run).toHaveBeenCalledTimes(1) // exactly 1 attempt, no retries + }) + + it('clamps backoff below 1 to 1 (constant delay)', async () => { + const run = vi.fn() + .mockRejectedValueOnce(new Error('error')) + .mockResolvedValueOnce(SUCCESS_RESULT) + + const task = createTask({ + title: 'Bad backoff', + description: 'test', + maxRetries: 1, + retryDelayMs: 100, + retryBackoff: -2, + }) + ;(task as any).retryBackoff = -2 + + const mockDelay = vi.fn().mockResolvedValue(undefined) + await executeWithRetry(run, task, undefined, mockDelay) + + // backoff clamped to 1, so delay = 100 * 1^0 = 100 + expect(mockDelay).toHaveBeenCalledWith(100) + }) +})