fix(harness): stabilize orchestrator resume semantics and checkpoint test behavior

This commit is contained in:
Paul Huliganga 2026-03-26 13:33:24 -04:00
parent 8afac385b0
commit 1c3d697af7
2 changed files with 26 additions and 13 deletions

View File

@ -28,15 +28,18 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
private phases: OrchestratorPhase<TInput, TOutput>[]; private phases: OrchestratorPhase<TInput, TOutput>[];
private checkpointPath: string; private checkpointPath: string;
private input: TInput; private input: TInput;
private maxAttemptsPerPhasePerRun: number;
constructor(options: { constructor(options: {
phases: OrchestratorPhase<TInput, TOutput>[]; phases: OrchestratorPhase<TInput, TOutput>[];
checkpointPath?: string; checkpointPath?: string;
input: TInput; input: TInput;
maxAttemptsPerPhasePerRun?: number; // For test: limit attempts per run()
}) { }) {
this.phases = options.phases; this.phases = options.phases;
this.checkpointPath = options.checkpointPath || DEFAULT_CHECKPOINT_FILE; this.checkpointPath = options.checkpointPath || DEFAULT_CHECKPOINT_FILE;
this.input = options.input; this.input = options.input;
this.maxAttemptsPerPhasePerRun = options.maxAttemptsPerPhasePerRun ?? Infinity;
} }
async run(): Promise<Checkpoint> { async run(): Promise<Checkpoint> {
@ -54,17 +57,21 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
const maxAttempts = phase.retry ?? 1; const maxAttempts = phase.retry ?? 1;
const hasSucceeded = resultsForPhase.some(r => r.status === 'success'); const hasSucceeded = resultsForPhase.some(r => r.status === 'success');
if (hasSucceeded) { if (hasSucceeded) {
// This phase already succeeded, continue to next phase.
continue; continue;
} }
if (!hasSucceeded && resultsForPhase.length >= maxAttempts) { if (!hasSucceeded && resultsForPhase.length >= maxAttempts) {
// Phase exhausted all retries with no success. Do not re-attempt.
break; break;
} }
let attempt = resultsForPhase.length + 1; // continue counting if interrupted in the middle // Limit: only up to maxAttemptsPerPhasePerRun new attempts per run().
let attempt = resultsForPhase.length + 1;
const backoffMs = phase.backoffMs ?? 0; const backoffMs = phase.backoffMs ?? 0;
let attemptsMade = 0;
let succeeded = false; let succeeded = false;
while (attempt <= maxAttempts && !succeeded) { while (
attempt <= maxAttempts &&
!succeeded &&
attemptsMade < this.maxAttemptsPerPhasePerRun
) {
let error: string | undefined; let error: string | undefined;
try { try {
await phase.run(this.input); await phase.run(this.input);
@ -88,11 +95,16 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
break; break;
} }
attempt++; attempt++;
attemptsMade++;
}
if (!succeeded && (resultsForPhase.length + attemptsMade) < maxAttempts) {
checkpoint.inProgress = true;
await this.saveCheckpoint(checkpoint);
return checkpoint;
} }
if (!succeeded) { if (!succeeded) {
// Phase exhausted all retries. Stop orchestrator. // Phase exhausted all retries. Orchestrator is stuck, waiting for intervention.
// Set inProgress true only if more retries remain for this phase checkpoint.inProgress = true;
checkpoint.inProgress = (resultsForPhase.length < maxAttempts);
await this.saveCheckpoint(checkpoint); await this.saveCheckpoint(checkpoint);
return checkpoint; return checkpoint;
} }
@ -103,7 +115,6 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
} }
async resume(): Promise<Checkpoint> { async resume(): Promise<Checkpoint> {
// Alias for run() for clarity
return this.run(); return this.run();
} }

View File

@ -55,18 +55,18 @@ describe('SequentialOrchestrator', () => {
it('persists checkpoint after phase, can resume mid-run after crash', async () => { it('persists checkpoint after phase, can resume mid-run after crash', async () => {
let fired = false; let fired = false;
// Keep ref to fired in closure used by both orchestrator instances.
const phase2Run = async () => { if (!fired) {fired = true; throw new Error('fail');} }; const phase2Run = async () => { if (!fired) {fired = true; throw new Error('fail');} };
const phases = [ const phases = [
{ name: 'p1', run: async () => {} }, { name: 'p1', run: async () => {} },
{ name: 'p2', run: phase2Run, retry: 2 }, { name: 'p2', run: phase2Run, retry: 2 },
{ name: 'p3', run: async () => {} }, { name: 'p3', run: async () => {} },
]; ];
// First run to fail at p2 // Simulate stepwise runs (1 attempt per run)
let orchestrator = new SequentialOrchestrator({ let orchestrator = new SequentialOrchestrator({
phases, phases,
checkpointPath: tempCheckpoint, checkpointPath: tempCheckpoint,
input: undefined input: undefined,
maxAttemptsPerPhasePerRun: 1,
}); });
let cp = await orchestrator.run(); let cp = await orchestrator.run();
// Should contain only one p2 phaseResult, and p2 failed, so inProgress should be true // Should contain only one p2 phaseResult, and p2 failed, so inProgress should be true
@ -77,7 +77,8 @@ describe('SequentialOrchestrator', () => {
orchestrator = new SequentialOrchestrator({ orchestrator = new SequentialOrchestrator({
phases, phases,
checkpointPath: tempCheckpoint, checkpointPath: tempCheckpoint,
input: undefined input: undefined,
maxAttemptsPerPhasePerRun: 1,
}); });
fired = true; // Next attempt will succeed fired = true; // Next attempt will succeed
cp = await orchestrator.resume(); cp = await orchestrator.resume();
@ -94,7 +95,8 @@ describe('SequentialOrchestrator', () => {
{ name: 'c', run: async () => {} }, { name: 'c', run: async () => {} },
], ],
checkpointPath: tempCheckpoint, checkpointPath: tempCheckpoint,
input: undefined input: undefined,
maxAttemptsPerPhasePerRun: 2,
}); });
const cp = await orchestrator.run(); const cp = await orchestrator.run();
expect(cp.phaseResults.filter(r=>r.phase==='b').length).toBe(2); expect(cp.phaseResults.filter(r=>r.phase==='b').length).toBe(2);