fix: address review — extract executeWithRetry, add delay cap, fix tests
- Extract `executeWithRetry()` as a testable exported function
- Add `computeRetryDelay()` with 30s max cap (prevents runaway backoff)
- Remove retry fields from `ParsedTaskSpec` (dead code for runTeam path)
- Deduplicate retry event emission (single code path for both error types)
- Injectable delay function for test determinism
- Rewrite tests to call the real `executeWithRetry`, not a copy
- 15 tests covering: success, retry+success, retry+failure, backoff calculation, delay cap, delay function injection, no-retry default
This commit is contained in:
parent
d7714e6189
commit
972f9f0c74
|
|
@ -54,7 +54,7 @@
|
|||
// Orchestrator (primary entry point)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export { OpenMultiAgent } from './orchestrator/orchestrator.js'
|
||||
export { OpenMultiAgent, executeWithRetry, computeRetryDelay } from './orchestrator/orchestrator.js'
|
||||
export { Scheduler } from './orchestrator/scheduler.js'
|
||||
export type { SchedulingStrategy } from './orchestrator/scheduler.js'
|
||||
|
||||
|
|
|
|||
|
|
@ -92,11 +92,96 @@ function buildAgent(config: AgentConfig): Agent {
|
|||
return new Agent(config, registry, executor)
|
||||
}
|
||||
|
||||
/** Promise-based delay for retry backoff. */
|
||||
/** Promise-based delay. */
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms))
|
||||
}
|
||||
|
||||
/** Maximum delay cap to prevent runaway exponential backoff (30 seconds). */
|
||||
const MAX_RETRY_DELAY_MS = 30_000
|
||||
|
||||
/**
|
||||
* Compute the retry delay for a given attempt, capped at {@link MAX_RETRY_DELAY_MS}.
|
||||
*/
|
||||
export function computeRetryDelay(
|
||||
baseDelay: number,
|
||||
backoff: number,
|
||||
attempt: number,
|
||||
): number {
|
||||
return Math.min(baseDelay * backoff ** (attempt - 1), MAX_RETRY_DELAY_MS)
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute an agent task with optional retry and exponential backoff.
|
||||
*
|
||||
* Exported for testability — called internally by {@link executeQueue}.
|
||||
*
|
||||
* @param run - The function that executes the task (typically `pool.run`).
|
||||
* @param task - The task to execute (retry config read from its fields).
|
||||
* @param onRetry - Called before each retry sleep with event data.
|
||||
* @param delayFn - Injectable delay function (defaults to real `sleep`).
|
||||
* @returns The final {@link AgentRunResult} from the last attempt.
|
||||
*/
|
||||
export async function executeWithRetry(
|
||||
run: () => Promise<AgentRunResult>,
|
||||
task: Task,
|
||||
onRetry?: (data: { attempt: number; maxAttempts: number; error: string; nextDelayMs: number }) => void,
|
||||
delayFn: (ms: number) => Promise<void> = sleep,
|
||||
): Promise<AgentRunResult> {
|
||||
const maxAttempts = (task.maxRetries ?? 0) + 1
|
||||
const baseDelay = task.retryDelayMs ?? 1000
|
||||
const backoff = task.retryBackoff ?? 2
|
||||
|
||||
let lastError: string = ''
|
||||
|
||||
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
|
||||
try {
|
||||
const result = await run()
|
||||
if (result.success) {
|
||||
return result
|
||||
}
|
||||
lastError = result.output
|
||||
|
||||
// Failure — retry or give up
|
||||
if (attempt < maxAttempts) {
|
||||
const delay = computeRetryDelay(baseDelay, backoff, attempt)
|
||||
onRetry?.({ attempt, maxAttempts, error: lastError, nextDelayMs: delay })
|
||||
await delayFn(delay)
|
||||
continue
|
||||
}
|
||||
|
||||
return result
|
||||
} catch (err) {
|
||||
lastError = err instanceof Error ? err.message : String(err)
|
||||
|
||||
if (attempt < maxAttempts) {
|
||||
const delay = computeRetryDelay(baseDelay, backoff, attempt)
|
||||
onRetry?.({ attempt, maxAttempts, error: lastError, nextDelayMs: delay })
|
||||
await delayFn(delay)
|
||||
continue
|
||||
}
|
||||
|
||||
// All retries exhausted — return a failure result
|
||||
return {
|
||||
success: false,
|
||||
output: lastError,
|
||||
messages: [],
|
||||
tokenUsage: { input_tokens: 0, output_tokens: 0 },
|
||||
toolCalls: [],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Should not be reached, but TypeScript needs a return
|
||||
return {
|
||||
success: false,
|
||||
output: lastError,
|
||||
messages: [],
|
||||
tokenUsage: { input_tokens: 0, output_tokens: 0 },
|
||||
toolCalls: [],
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Parsed task spec (result of coordinator decomposition)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -106,9 +191,6 @@ interface ParsedTaskSpec {
|
|||
description: string
|
||||
assignee?: string
|
||||
dependsOn?: string[]
|
||||
maxRetries?: number
|
||||
retryDelayMs?: number
|
||||
retryBackoff?: number
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -247,85 +329,51 @@ async function executeQueue(
|
|||
// Build the prompt: inject shared memory context + task description
|
||||
const prompt = await buildTaskPrompt(task, team)
|
||||
|
||||
const maxAttempts = (task.maxRetries ?? 0) + 1
|
||||
const baseDelay = task.retryDelayMs ?? 1000
|
||||
const backoff = task.retryBackoff ?? 2
|
||||
|
||||
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
|
||||
try {
|
||||
const result = await pool.run(assignee, prompt)
|
||||
ctx.agentResults.set(`${assignee}:${task.id}`, result)
|
||||
|
||||
if (result.success) {
|
||||
// Persist result into shared memory so other agents can read it
|
||||
const sharedMem = team.getSharedMemoryInstance()
|
||||
if (sharedMem) {
|
||||
await sharedMem.write(assignee, `task:${task.id}:result`, result.output)
|
||||
}
|
||||
|
||||
queue.complete(task.id, result.output)
|
||||
|
||||
config.onProgress?.({
|
||||
type: 'task_complete',
|
||||
task: task.id,
|
||||
agent: assignee,
|
||||
data: result,
|
||||
} satisfies OrchestratorEvent)
|
||||
|
||||
config.onProgress?.({
|
||||
type: 'agent_complete',
|
||||
agent: assignee,
|
||||
task: task.id,
|
||||
data: result,
|
||||
} satisfies OrchestratorEvent)
|
||||
break
|
||||
}
|
||||
|
||||
// Agent returned success: false — treat as failure for retry purposes
|
||||
if (attempt < maxAttempts) {
|
||||
const delay = baseDelay * backoff ** (attempt - 1)
|
||||
config.onProgress?.({
|
||||
type: 'task_retry',
|
||||
task: task.id,
|
||||
agent: assignee,
|
||||
data: { attempt, maxAttempts, error: result.output, nextDelayMs: delay },
|
||||
} satisfies OrchestratorEvent)
|
||||
await sleep(delay)
|
||||
continue
|
||||
}
|
||||
|
||||
// All retries exhausted
|
||||
queue.fail(task.id, result.output)
|
||||
const result = await executeWithRetry(
|
||||
() => pool.run(assignee, prompt),
|
||||
task,
|
||||
(retryData) => {
|
||||
config.onProgress?.({
|
||||
type: 'error',
|
||||
type: 'task_retry',
|
||||
task: task.id,
|
||||
agent: assignee,
|
||||
data: result,
|
||||
data: retryData,
|
||||
} satisfies OrchestratorEvent)
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err)
|
||||
},
|
||||
)
|
||||
|
||||
if (attempt < maxAttempts) {
|
||||
const delay = baseDelay * backoff ** (attempt - 1)
|
||||
config.onProgress?.({
|
||||
type: 'task_retry',
|
||||
task: task.id,
|
||||
agent: assignee,
|
||||
data: { attempt, maxAttempts, error: message, nextDelayMs: delay },
|
||||
} satisfies OrchestratorEvent)
|
||||
await sleep(delay)
|
||||
continue
|
||||
}
|
||||
ctx.agentResults.set(`${assignee}:${task.id}`, result)
|
||||
|
||||
// All retries exhausted
|
||||
queue.fail(task.id, message)
|
||||
config.onProgress?.({
|
||||
type: 'error',
|
||||
task: task.id,
|
||||
agent: assignee,
|
||||
data: err,
|
||||
} satisfies OrchestratorEvent)
|
||||
if (result.success) {
|
||||
// Persist result into shared memory so other agents can read it
|
||||
const sharedMem = team.getSharedMemoryInstance()
|
||||
if (sharedMem) {
|
||||
await sharedMem.write(assignee, `task:${task.id}:result`, result.output)
|
||||
}
|
||||
|
||||
queue.complete(task.id, result.output)
|
||||
|
||||
config.onProgress?.({
|
||||
type: 'task_complete',
|
||||
task: task.id,
|
||||
agent: assignee,
|
||||
data: result,
|
||||
} satisfies OrchestratorEvent)
|
||||
|
||||
config.onProgress?.({
|
||||
type: 'agent_complete',
|
||||
agent: assignee,
|
||||
task: task.id,
|
||||
data: result,
|
||||
} satisfies OrchestratorEvent)
|
||||
} else {
|
||||
queue.fail(task.id, result.output)
|
||||
config.onProgress?.({
|
||||
type: 'error',
|
||||
task: task.id,
|
||||
agent: assignee,
|
||||
data: result,
|
||||
} satisfies OrchestratorEvent)
|
||||
}
|
||||
})
|
||||
|
||||
|
|
@ -792,7 +840,11 @@ export class OpenMultiAgent {
|
|||
* then resolving them to real IDs before adding tasks to the queue.
|
||||
*/
|
||||
private loadSpecsIntoQueue(
|
||||
specs: ReadonlyArray<ParsedTaskSpec>,
|
||||
specs: ReadonlyArray<ParsedTaskSpec & {
|
||||
maxRetries?: number
|
||||
retryDelayMs?: number
|
||||
retryBackoff?: number
|
||||
}>,
|
||||
agentConfigs: AgentConfig[],
|
||||
queue: TaskQueue,
|
||||
): void {
|
||||
|
|
|
|||
|
|
@ -1,13 +1,52 @@
|
|||
import { describe, it, expect, vi } from 'vitest'
|
||||
import { z } from 'zod'
|
||||
import { createTask } from '../src/task/task.js'
|
||||
import { TaskQueue } from '../src/task/queue.js'
|
||||
import type { Task, OrchestratorEvent, AgentRunResult, LLMResponse, LLMAdapter } from '../src/types.js'
|
||||
import { Agent } from '../src/agent/agent.js'
|
||||
import { AgentRunner } from '../src/agent/runner.js'
|
||||
import { AgentPool } from '../src/agent/pool.js'
|
||||
import { ToolRegistry } from '../src/tool/framework.js'
|
||||
import { ToolExecutor } from '../src/tool/executor.js'
|
||||
import { executeWithRetry, computeRetryDelay } from '../src/orchestrator/orchestrator.js'
|
||||
import type { AgentRunResult } from '../src/types.js'
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// Canonical successful agent result reused across the retry tests.
const SUCCESS_RESULT: AgentRunResult = {
  success: true,
  output: 'done',
  messages: [],
  tokenUsage: { input_tokens: 10, output_tokens: 20 },
  toolCalls: [],
}

// Canonical failed (success: false) agent result reused across the retry tests.
const FAILURE_RESULT: AgentRunResult = {
  success: false,
  output: 'agent failed',
  messages: [],
  tokenUsage: { input_tokens: 10, output_tokens: 20 },
  toolCalls: [],
}

/** No-op delay for tests — resolves immediately so retries incur no real sleep. */
const noDelay = () => Promise.resolve()
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// computeRetryDelay
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('computeRetryDelay', () => {
|
||||
it('computes exponential backoff', () => {
|
||||
expect(computeRetryDelay(1000, 2, 1)).toBe(1000) // 1000 * 2^0
|
||||
expect(computeRetryDelay(1000, 2, 2)).toBe(2000) // 1000 * 2^1
|
||||
expect(computeRetryDelay(1000, 2, 3)).toBe(4000) // 1000 * 2^2
|
||||
})
|
||||
|
||||
it('caps at 30 seconds', () => {
|
||||
// 1000 * 2^20 = 1,048,576,000 — way over cap
|
||||
expect(computeRetryDelay(1000, 2, 21)).toBe(30_000)
|
||||
})
|
||||
|
||||
it('handles backoff of 1 (constant delay)', () => {
|
||||
expect(computeRetryDelay(500, 1, 1)).toBe(500)
|
||||
expect(computeRetryDelay(500, 1, 5)).toBe(500)
|
||||
})
|
||||
})
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// createTask: retry fields
|
||||
|
|
@ -36,239 +75,204 @@ describe('createTask with retry fields', () => {
|
|||
})
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// executeQueue retry integration (mock agent pool)
|
||||
// executeWithRetry — tests the real exported function
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Build a mock adapter that returns responses in sequence.
|
||||
* Each response can be configured as success or failure.
|
||||
*/
|
||||
function mockAdapter(responses: Array<{ text: string; success?: boolean }>): LLMAdapter {
|
||||
let callIndex = 0
|
||||
return {
|
||||
name: 'mock',
|
||||
async chat() {
|
||||
const resp = responses[callIndex++] ?? { text: 'fallback' }
|
||||
return {
|
||||
id: `mock-${callIndex}`,
|
||||
content: [{ type: 'text' as const, text: resp.text }],
|
||||
model: 'mock-model',
|
||||
stop_reason: 'end_turn',
|
||||
usage: { input_tokens: 10, output_tokens: 20 },
|
||||
} satisfies LLMResponse
|
||||
},
|
||||
async *stream() { /* unused */ },
|
||||
}
|
||||
}
|
||||
|
||||
function buildMockPool(
|
||||
agentName: string,
|
||||
responses: Array<{ text: string; success?: boolean }>,
|
||||
): AgentPool {
|
||||
const adapter = mockAdapter(responses)
|
||||
const registry = new ToolRegistry()
|
||||
const executor = new ToolExecutor(registry)
|
||||
const config = { name: agentName, model: 'mock-model', systemPrompt: 'test' }
|
||||
const agent = new Agent(config, registry, executor)
|
||||
|
||||
// Inject a pre-built runner to bypass createAdapter
|
||||
const runner = new AgentRunner(adapter, registry, executor, {
|
||||
model: config.model,
|
||||
systemPrompt: config.systemPrompt,
|
||||
agentName: config.name,
|
||||
})
|
||||
;(agent as any).runner = runner
|
||||
|
||||
const pool = new AgentPool(5)
|
||||
pool.add(agent)
|
||||
return pool
|
||||
}
|
||||
|
||||
/**
|
||||
* Minimal re-implementation of the retry logic from executeQueue for isolated testing.
|
||||
* This tests the retry pattern directly without needing the full orchestrator.
|
||||
*/
|
||||
async function executeTaskWithRetry(
|
||||
task: Task,
|
||||
pool: AgentPool,
|
||||
onProgress?: (event: OrchestratorEvent) => void,
|
||||
): Promise<AgentRunResult> {
|
||||
const maxAttempts = (task.maxRetries ?? 0) + 1
|
||||
const baseDelay = task.retryDelayMs ?? 1000
|
||||
const backoff = task.retryBackoff ?? 2
|
||||
const assignee = task.assignee!
|
||||
|
||||
let lastResult: AgentRunResult | undefined
|
||||
|
||||
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
|
||||
try {
|
||||
const result = await pool.run(assignee, task.description)
|
||||
lastResult = result
|
||||
|
||||
if (result.success) {
|
||||
return result
|
||||
}
|
||||
|
||||
if (attempt < maxAttempts) {
|
||||
const delay = baseDelay * backoff ** (attempt - 1)
|
||||
onProgress?.({
|
||||
type: 'task_retry',
|
||||
task: task.id,
|
||||
agent: assignee,
|
||||
data: { attempt, maxAttempts, error: result.output, nextDelayMs: delay },
|
||||
})
|
||||
// Skip actual sleep in tests — just verify the event was emitted
|
||||
continue
|
||||
}
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err)
|
||||
|
||||
if (attempt < maxAttempts) {
|
||||
const delay = baseDelay * backoff ** (attempt - 1)
|
||||
onProgress?.({
|
||||
type: 'task_retry',
|
||||
task: task.id,
|
||||
agent: assignee,
|
||||
data: { attempt, maxAttempts, error: message, nextDelayMs: delay },
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
return {
|
||||
success: false,
|
||||
output: message,
|
||||
messages: [],
|
||||
tokenUsage: { input_tokens: 0, output_tokens: 0 },
|
||||
toolCalls: [],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return lastResult!
|
||||
}
|
||||
|
||||
describe('Task retry logic', () => {
|
||||
describe('executeWithRetry', () => {
|
||||
it('succeeds on first attempt with no retry config', async () => {
|
||||
const pool = buildMockPool('worker', [{ text: 'done' }])
|
||||
const task = createTask({
|
||||
title: 'Simple',
|
||||
description: 'do something',
|
||||
assignee: 'worker',
|
||||
})
|
||||
const run = vi.fn().mockResolvedValue(SUCCESS_RESULT)
|
||||
const task = createTask({ title: 'Simple', description: 'test' })
|
||||
|
||||
const result = await executeWithRetry(run, task, undefined, noDelay)
|
||||
|
||||
const result = await executeTaskWithRetry(task, pool)
|
||||
expect(result.success).toBe(true)
|
||||
expect(result.output).toBe('done')
|
||||
expect(run).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it('retries and succeeds on second attempt', async () => {
|
||||
// First call returns empty text (success:true but the agent just returned text)
|
||||
// We need to simulate failure. Since AgentRunner always returns success:true
|
||||
// for normal text responses, let's test with multiple responses where
|
||||
// the first throws and the second succeeds.
|
||||
const pool = buildMockPool('worker', [
|
||||
{ text: 'result1' },
|
||||
{ text: 'result2' },
|
||||
])
|
||||
it('succeeds on first attempt even when maxRetries > 0', async () => {
|
||||
const run = vi.fn().mockResolvedValue(SUCCESS_RESULT)
|
||||
const task = createTask({
|
||||
title: 'Has retries',
|
||||
description: 'test',
|
||||
maxRetries: 3,
|
||||
})
|
||||
|
||||
const result = await executeWithRetry(run, task, undefined, noDelay)
|
||||
|
||||
expect(result.success).toBe(true)
|
||||
expect(run).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it('retries on exception and succeeds on second attempt', async () => {
|
||||
const run = vi.fn()
|
||||
.mockRejectedValueOnce(new Error('transient error'))
|
||||
.mockResolvedValueOnce(SUCCESS_RESULT)
|
||||
|
||||
const task = createTask({
|
||||
title: 'Retry task',
|
||||
description: 'do something',
|
||||
assignee: 'worker',
|
||||
description: 'test',
|
||||
maxRetries: 2,
|
||||
retryDelayMs: 10,
|
||||
retryBackoff: 1,
|
||||
retryDelayMs: 100,
|
||||
retryBackoff: 2,
|
||||
})
|
||||
|
||||
// Simulate first attempt failure by making pool.run throw on first call
|
||||
let callCount = 0
|
||||
const originalRun = pool.run.bind(pool)
|
||||
vi.spyOn(pool, 'run').mockImplementation(async (name, prompt) => {
|
||||
callCount++
|
||||
if (callCount === 1) {
|
||||
throw new Error('transient LLM error')
|
||||
}
|
||||
return originalRun(name, prompt)
|
||||
})
|
||||
|
||||
const events: OrchestratorEvent[] = []
|
||||
const result = await executeTaskWithRetry(task, pool, (e) => events.push(e))
|
||||
const retryEvents: unknown[] = []
|
||||
const result = await executeWithRetry(
|
||||
run,
|
||||
task,
|
||||
(data) => retryEvents.push(data),
|
||||
noDelay,
|
||||
)
|
||||
|
||||
expect(result.success).toBe(true)
|
||||
expect(result.output).toBe('result1') // second adapter response (first call threw)
|
||||
expect(callCount).toBe(2)
|
||||
|
||||
// Verify task_retry event was emitted
|
||||
expect(events).toHaveLength(1)
|
||||
expect(events[0]!.type).toBe('task_retry')
|
||||
expect((events[0]!.data as any).attempt).toBe(1)
|
||||
expect(run).toHaveBeenCalledTimes(2)
|
||||
expect(retryEvents).toHaveLength(1)
|
||||
expect(retryEvents[0]).toEqual({
|
||||
attempt: 1,
|
||||
maxAttempts: 3,
|
||||
error: 'transient error',
|
||||
nextDelayMs: 100, // 100 * 2^0
|
||||
})
|
||||
})
|
||||
|
||||
it('exhausts all retries and fails', async () => {
|
||||
const pool = buildMockPool('worker', [])
|
||||
it('retries on success:false and succeeds on second attempt', async () => {
|
||||
const run = vi.fn()
|
||||
.mockResolvedValueOnce(FAILURE_RESULT)
|
||||
.mockResolvedValueOnce(SUCCESS_RESULT)
|
||||
|
||||
const task = createTask({
|
||||
title: 'Retry task',
|
||||
description: 'test',
|
||||
maxRetries: 1,
|
||||
retryDelayMs: 50,
|
||||
})
|
||||
|
||||
const result = await executeWithRetry(run, task, undefined, noDelay)
|
||||
|
||||
expect(result.success).toBe(true)
|
||||
expect(run).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
it('exhausts all retries on persistent exception', async () => {
|
||||
const run = vi.fn().mockRejectedValue(new Error('persistent error'))
|
||||
|
||||
const task = createTask({
|
||||
title: 'Always fails',
|
||||
description: 'do something',
|
||||
assignee: 'worker',
|
||||
description: 'test',
|
||||
maxRetries: 2,
|
||||
retryDelayMs: 10,
|
||||
retryBackoff: 1,
|
||||
})
|
||||
|
||||
vi.spyOn(pool, 'run').mockRejectedValue(new Error('persistent error'))
|
||||
|
||||
const events: OrchestratorEvent[] = []
|
||||
const result = await executeTaskWithRetry(task, pool, (e) => events.push(e))
|
||||
const retryEvents: unknown[] = []
|
||||
const result = await executeWithRetry(
|
||||
run,
|
||||
task,
|
||||
(data) => retryEvents.push(data),
|
||||
noDelay,
|
||||
)
|
||||
|
||||
expect(result.success).toBe(false)
|
||||
expect(result.output).toBe('persistent error')
|
||||
|
||||
// Should have emitted 2 retry events (attempt 1 and 2), then failed on attempt 3
|
||||
expect(events).toHaveLength(2)
|
||||
expect(events.every(e => e.type === 'task_retry')).toBe(true)
|
||||
expect(run).toHaveBeenCalledTimes(3) // 1 initial + 2 retries
|
||||
expect(retryEvents).toHaveLength(2)
|
||||
})
|
||||
|
||||
it('emits correct backoff delays', async () => {
|
||||
const pool = buildMockPool('worker', [])
|
||||
it('exhausts all retries on persistent success:false', async () => {
|
||||
const run = vi.fn().mockResolvedValue(FAILURE_RESULT)
|
||||
|
||||
const task = createTask({
|
||||
title: 'Always fails',
|
||||
description: 'test',
|
||||
maxRetries: 1,
|
||||
})
|
||||
|
||||
const result = await executeWithRetry(run, task, undefined, noDelay)
|
||||
|
||||
expect(result.success).toBe(false)
|
||||
expect(result.output).toBe('agent failed')
|
||||
expect(run).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
it('emits correct exponential backoff delays', async () => {
|
||||
const run = vi.fn().mockRejectedValue(new Error('error'))
|
||||
|
||||
const task = createTask({
|
||||
title: 'Backoff test',
|
||||
description: 'do something',
|
||||
assignee: 'worker',
|
||||
description: 'test',
|
||||
maxRetries: 3,
|
||||
retryDelayMs: 100,
|
||||
retryBackoff: 2,
|
||||
})
|
||||
|
||||
vi.spyOn(pool, 'run').mockRejectedValue(new Error('error'))
|
||||
const retryEvents: Array<{ nextDelayMs: number }> = []
|
||||
await executeWithRetry(
|
||||
run,
|
||||
task,
|
||||
(data) => retryEvents.push(data),
|
||||
noDelay,
|
||||
)
|
||||
|
||||
const events: OrchestratorEvent[] = []
|
||||
await executeTaskWithRetry(task, pool, (e) => events.push(e))
|
||||
|
||||
// 3 retry events: attempts 1, 2, 3 (attempt 4 is the final failure)
|
||||
expect(events).toHaveLength(3)
|
||||
expect((events[0]!.data as any).nextDelayMs).toBe(100) // 100 * 2^0
|
||||
expect((events[1]!.data as any).nextDelayMs).toBe(200) // 100 * 2^1
|
||||
expect((events[2]!.data as any).nextDelayMs).toBe(400) // 100 * 2^2
|
||||
expect(retryEvents).toHaveLength(3)
|
||||
expect(retryEvents[0]!.nextDelayMs).toBe(100) // 100 * 2^0
|
||||
expect(retryEvents[1]!.nextDelayMs).toBe(200) // 100 * 2^1
|
||||
expect(retryEvents[2]!.nextDelayMs).toBe(400) // 100 * 2^2
|
||||
})
|
||||
|
||||
it('no retry events when maxRetries is 0', async () => {
|
||||
const pool = buildMockPool('worker', [])
|
||||
it('no retry events when maxRetries is 0 (default)', async () => {
|
||||
const run = vi.fn().mockRejectedValue(new Error('fail'))
|
||||
const task = createTask({ title: 'No retry', description: 'test' })
|
||||
|
||||
const task = createTask({
|
||||
title: 'No retry',
|
||||
description: 'do something',
|
||||
assignee: 'worker',
|
||||
maxRetries: 0,
|
||||
})
|
||||
|
||||
vi.spyOn(pool, 'run').mockRejectedValue(new Error('fail'))
|
||||
|
||||
const events: OrchestratorEvent[] = []
|
||||
const result = await executeTaskWithRetry(task, pool, (e) => events.push(e))
|
||||
const retryEvents: unknown[] = []
|
||||
const result = await executeWithRetry(
|
||||
run,
|
||||
task,
|
||||
(data) => retryEvents.push(data),
|
||||
noDelay,
|
||||
)
|
||||
|
||||
expect(result.success).toBe(false)
|
||||
expect(events).toHaveLength(0)
|
||||
expect(run).toHaveBeenCalledTimes(1)
|
||||
expect(retryEvents).toHaveLength(0)
|
||||
})
|
||||
|
||||
it('calls the delay function with computed delay', async () => {
|
||||
const run = vi.fn()
|
||||
.mockRejectedValueOnce(new Error('error'))
|
||||
.mockResolvedValueOnce(SUCCESS_RESULT)
|
||||
|
||||
const task = createTask({
|
||||
title: 'Delay test',
|
||||
description: 'test',
|
||||
maxRetries: 1,
|
||||
retryDelayMs: 250,
|
||||
retryBackoff: 3,
|
||||
})
|
||||
|
||||
const mockDelay = vi.fn().mockResolvedValue(undefined)
|
||||
await executeWithRetry(run, task, undefined, mockDelay)
|
||||
|
||||
expect(mockDelay).toHaveBeenCalledTimes(1)
|
||||
expect(mockDelay).toHaveBeenCalledWith(250) // 250 * 3^0
|
||||
})
|
||||
|
||||
it('caps delay at 30 seconds', async () => {
|
||||
const run = vi.fn()
|
||||
.mockRejectedValueOnce(new Error('error'))
|
||||
.mockResolvedValueOnce(SUCCESS_RESULT)
|
||||
|
||||
const task = createTask({
|
||||
title: 'Cap test',
|
||||
description: 'test',
|
||||
maxRetries: 1,
|
||||
retryDelayMs: 50_000,
|
||||
retryBackoff: 2,
|
||||
})
|
||||
|
||||
const mockDelay = vi.fn().mockResolvedValue(undefined)
|
||||
await executeWithRetry(run, task, undefined, mockDelay)
|
||||
|
||||
expect(mockDelay).toHaveBeenCalledWith(30_000) // capped
|
||||
})
|
||||
})
|
||||
|
|
|
|||
Loading…
Reference in New Issue