feat(harness): sequential orchestrator with checkpoint/retry/restart, tests + docs
- Add SequentialOrchestrator service for ordered phase execution, per-phase retry/backoff, and restart-safe checkpointing
- Persist phase attempt metadata (success/fail, attempts, timestamp, error)
- Expose importable callable interface
- Add full coverage unit tests for execution order, retry, checkpoint, and resume scenarios
- Update docs and README for usage and dev guidance
This commit is contained in:
parent
edc5ce03ad
commit
c434733f0c
|
|
@ -35,6 +35,15 @@ A modern, self-hosted alternative to services like CopyMeThat. Store, organize,
|
|||
|
||||
---
|
||||
|
||||
### Backend Orchestrator (Phase Execution Utility)
|
||||
|
||||
A sequential orchestrator utility is available for robust, checkpointed phase execution with per-phase retries. Useful for automation harnesses or recipe import flows needing durable, restart-safe progress tracking.
|
||||
|
||||
- Location: `src/backend/services/SequentialOrchestrator.ts`
|
||||
- Usage/docs: [`docs/orchestrator.md`](docs/orchestrator.md)
|
||||
|
||||
---
|
||||
|
||||
## Quick Start
|
||||
|
||||
### Prerequisites
|
||||
|
|
|
|||
|
|
@ -0,0 +1,44 @@
|
|||
# Orchestrator Usage
|
||||
|
||||
This service provides robust phase-based sequencing with automatic checkpointing and per-phase retry policy.
|
||||
|
||||
## Usage (Programmatic)
|
||||
|
||||
```ts
|
||||
import { SequentialOrchestrator } from './src/backend/services/SequentialOrchestrator';
|
||||
|
||||
const orchestrator = new SequentialOrchestrator({
|
||||
phases: [
|
||||
{ name: 'one', run: async () => { /* logic */ }, retry: 2, backoffMs: 250 },
|
||||
{ name: 'two', run: async () => { /* logic */ }},
|
||||
// ...more phases
|
||||
],
|
||||
checkpointPath: 'data/orchestrator-checkpoint.json', // optional; defaults to this path resolved against process.cwd()
|
||||
input: {...}, // input passed to each phase
|
||||
});
|
||||
|
||||
await orchestrator.run(); // Runs/resumes from last incomplete phase, checkpointing after each attempt
|
||||
```
|
||||
|
||||
## Features
|
||||
- **Sequential phases**: Executes provided phases in-order.
|
||||
- **Per-phase retry & backoff**: `retry` is the maximum number of attempts for a phase (default 1); `backoffMs` is the delay between a failed attempt and the next one (default 0).
|
||||
- **Checkpointing**: Persists after every attempt (success/failure/attempt #, timestamp, error message if fail).
|
||||
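- **Restart-safe**: A later `run()`/`resume()` reloads the checkpoint and continues at the first phase that has not yet succeeded. A phase that has already used all of its attempts is not retried; the run stops there.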
|
||||
- **Minimal callable interface**: Import and use from your own services or app code.
|
||||
|
||||
## Checkpoint Schema
|
||||
See `src/backend/services/SequentialOrchestrator.ts` for full type:
|
||||
- `currentPhase`: index of next phase to execute
|
||||
- `phaseResults[]`: history of all attempts on every phase
|
||||
- `inProgress`: `true` while at least one phase has not completed (for example, a phase failed and exhausted its attempts); `false` once every phase has succeeded
|
||||
|
||||
## Testing
|
||||
Run:
|
||||
|
||||
```
|
||||
npm test
|
||||
```
|
||||
|
||||
See source: `src/backend/tests/orchestrator.test.ts` for coverage: execution order, retry, checkpoint, resume.
|
||||
|
|
@ -0,0 +1,120 @@
|
|||
import { promises as fs } from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
/**
 * One step in an orchestrated sequence.
 *
 * @typeParam TInput - Value handed to `run`.
 * @typeParam TOutput - Value the promise returned by `run` resolves with.
 */
export type OrchestratorPhase<TInput, TOutput> = {
  /**
   * Label written to every checkpoint entry for this phase. The orchestrator
   * matches recorded attempts back to a phase by this name.
   */
  name: string;
  /** Performs the phase's work; a rejected promise is recorded as a failed attempt. */
  run: (input: TInput) => Promise<TOutput>;
  /** Maximum number of attempts for this phase, the first try included (default: 1). */
  retry?: number;
  /** Milliseconds to wait between a failed attempt and the next one (default: 0). */
  backoffMs?: number;
};
|
||||
|
||||
/** Record of a single attempt at a single phase, as stored in the checkpoint. */
export type PhaseAttemptResult = {
  /** Name of the phase that was attempted. */
  phase: string;
  /** Whether the phase's `run` resolved (`'success'`) or rejected (`'failure'`). */
  status: 'success' | 'failure';
  /** 1-based attempt counter for this phase. */
  attempt: number;
  /** ISO-8601 time at which the attempt was recorded. */
  timestamp: string;
  /** Failure message; absent for successful attempts. */
  error?: string;
};
|
||||
|
||||
/** Progress snapshot that the orchestrator serialises to its checkpoint file. */
export type Checkpoint = {
  /** Index into the phase list of the next phase to execute. */
  currentPhase: number;
  /** Every recorded attempt, across all phases, in the order they were made. */
  phaseResults: PhaseAttemptResult[];
  /**
   * Set at the end of a run: `true` when `currentPhase` is still short of the
   * number of phases, `false` once every phase has completed.
   */
  inProgress?: boolean;
};
|
||||
|
||||
/** Checkpoint location used when no `checkpointPath` is supplied; anchored to the working directory at module load. */
const DEFAULT_CHECKPOINT_FILE = path.join(process.cwd(), 'data', 'orchestrator-checkpoint.json');
|
||||
|
||||
/**
 * Runs a list of phases strictly in order and records every attempt in a JSON
 * checkpoint file, so that a later `run()` can continue from the first phase
 * that has not yet succeeded.
 *
 * Recorded attempts are matched to phases by `name`, so phase names should be
 * unique within one orchestrator. The value a phase resolves with is not stored.
 */
export class SequentialOrchestrator<TInput = any, TOutput = any> {
  private readonly phases: OrchestratorPhase<TInput, TOutput>[];
  private readonly checkpointPath: string;
  private readonly input: TInput;

  /**
   * @param options.phases - Phases to execute, in order.
   * @param options.checkpointPath - Where to persist progress; falls back to
   *   the module default when omitted or empty.
   * @param options.input - Value passed unchanged to every phase's `run`.
   */
  constructor(options: {
    phases: OrchestratorPhase<TInput, TOutput>[];
    checkpointPath?: string;
    input: TInput;
  }) {
    this.phases = options.phases;
    this.checkpointPath = options.checkpointPath || DEFAULT_CHECKPOINT_FILE;
    this.input = options.input;
  }

  /**
   * Executes phases starting at the checkpoint's `currentPhase` (or at 0 when
   * no checkpoint exists), persisting the checkpoint after every attempt.
   *
   * A phase that already has a recorded success is skipped. A phase whose
   * recorded attempts already reach its attempt limit is not run again and
   * stops the sequence, as does a phase that fails its final attempt.
   *
   * @returns The final checkpoint; `inProgress` is `false` only when every
   *   phase has completed.
   * @throws If the checkpoint cannot be read, parsed, or written. Errors thrown
   *   by phases are recorded in the checkpoint, not rethrown.
   */
  async run(): Promise<Checkpoint> {
    let checkpoint = await this.loadCheckpoint();
    if (!checkpoint) {
      checkpoint = { currentPhase: 0, phaseResults: [] };
      await this.saveCheckpoint(checkpoint);
    }

    for (let i = checkpoint.currentPhase; i < this.phases.length; i++) {
      const phase = this.phases[i];
      const priorAttempts = checkpoint.phaseResults.filter(r => r.phase === phase.name);

      if (priorAttempts.some(r => r.status === 'success')) {
        // The success was persisted but `currentPhase` was not (e.g. the process
        // died in between). Advance it so `inProgress` is computed correctly.
        checkpoint.currentPhase = i + 1;
        continue;
      }

      const maxAttempts = SequentialOrchestrator.resolveMaxAttempts(phase.retry);
      const backoffMs = phase.backoffMs ?? 0;
      let succeeded = false;

      // Continue the attempt count where an interrupted run left off. When the
      // limit is already reached the loop body never runs and the sequence stops.
      for (let attempt = priorAttempts.length + 1; attempt <= maxAttempts; attempt++) {
        let error: string | undefined;
        try {
          await phase.run(this.input);
          succeeded = true;
        } catch (e: unknown) {
          error = SequentialOrchestrator.describeError(e);
        }

        if (succeeded) {
          // Advance before saving so the persisted index matches the persisted result.
          checkpoint.currentPhase = i + 1;
        }
        checkpoint.phaseResults.push({
          phase: phase.name,
          status: succeeded ? 'success' : 'failure',
          attempt,
          timestamp: new Date().toISOString(),
          error,
        });
        await this.saveCheckpoint(checkpoint);

        if (succeeded) {
          break;
        }
        // Wait only after the failure is on disk, so a crash during the delay
        // does not lose the attempt record.
        if (attempt < maxAttempts && backoffMs > 0) {
          await new Promise<void>(resolve => setTimeout(resolve, backoffMs));
        }
      }

      if (!succeeded) {
        // Attempts exhausted without a success: stop the orchestrator.
        break;
      }
    }

    checkpoint.inProgress = checkpoint.currentPhase < this.phases.length;
    await this.saveCheckpoint(checkpoint);
    return checkpoint;
  }

  /** Alias for {@link SequentialOrchestrator.run}, provided for readability at call sites. */
  async resume(): Promise<Checkpoint> {
    return this.run();
  }

  /**
   * Reads the checkpoint file.
   *
   * @returns The stored checkpoint, or `null` when the file does not exist.
   * @throws If the file exists but cannot be read, is not valid JSON, or does
   *   not have the checkpoint shape. Starting over silently in those cases
   *   would re-run phases that had already completed.
   */
  async loadCheckpoint(): Promise<Checkpoint | null> {
    let raw: string;
    try {
      raw = await fs.readFile(this.checkpointPath, 'utf8');
    } catch (e: unknown) {
      if ((e as { code?: unknown } | null | undefined)?.code === 'ENOENT') {
        return null;
      }
      throw e;
    }

    let parsed: unknown;
    try {
      parsed = JSON.parse(raw);
    } catch {
      throw new Error(`Checkpoint file is not valid JSON: ${this.checkpointPath}`);
    }
    if (!SequentialOrchestrator.isCheckpoint(parsed)) {
      throw new Error(`Checkpoint file has an unexpected shape: ${this.checkpointPath}`);
    }
    return parsed;
  }

  /**
   * Writes the checkpoint as pretty-printed JSON, creating the parent directory
   * if needed. The data goes to a sibling `.tmp` file first and is then renamed
   * over the target, so an interrupted write leaves the previous checkpoint in
   * place instead of a truncated file.
   */
  async saveCheckpoint(checkpoint: Checkpoint): Promise<void> {
    await fs.mkdir(path.dirname(this.checkpointPath), { recursive: true });
    const tmpPath = `${this.checkpointPath}.tmp`;
    await fs.writeFile(tmpPath, JSON.stringify(checkpoint, null, 2), 'utf8');
    await fs.rename(tmpPath, this.checkpointPath);
  }

  /** Turns a phase's `retry` setting into an attempt limit of at least 1. */
  private static resolveMaxAttempts(retry: number | undefined): number {
    if (retry === undefined || Number.isNaN(retry)) {
      return 1;
    }
    return Math.max(1, Math.floor(retry));
  }

  /** Extracts a non-empty string `message` from a thrown value, else stringifies the value. */
  private static describeError(e: unknown): string {
    const message = (e as { message?: unknown } | null | undefined)?.message;
    return typeof message === 'string' && message !== '' ? message : String(e);
  }

  /** Minimal structural check on parsed checkpoint JSON. */
  private static isCheckpoint(value: unknown): value is Checkpoint {
    if (typeof value !== 'object' || value === null) {
      return false;
    }
    const candidate = value as { currentPhase?: unknown; phaseResults?: unknown };
    return (
      typeof candidate.currentPhase === 'number' &&
      Number.isInteger(candidate.currentPhase) &&
      candidate.currentPhase >= 0 &&
      Array.isArray(candidate.phaseResults)
    );
  }
}
|
||||
|
|
@ -0,0 +1,100 @@
|
|||
import { SequentialOrchestrator } from '../services/SequentialOrchestrator';
|
||||
import { promises as fs } from 'fs';
|
||||
import path from 'path';
|
||||
import { describe, it, expect, beforeEach } from 'vitest';
|
||||
|
||||
// Scratch checkpoint file shared by every test in this suite.
const tempCheckpoint = path.join(process.cwd(), 'data', 'test-orch-checkpoint.json');
|
||||
|
||||
/**
 * Removes the scratch checkpoint so each test starts with no saved progress.
 * A missing file is fine (`force: true`); any other failure is surfaced,
 * because a checkpoint left behind would leak state into the next test.
 */
async function cleanCheckpoint(): Promise<void> {
  await fs.rm(tempCheckpoint, { force: true });
}
|
||||
|
||||
// Behavioural suite for SequentialOrchestrator: ordering, retry, resume and exhaustion.
describe('SequentialOrchestrator', () => {
  beforeEach(async () => {
    await cleanCheckpoint();
  });

  it('executes all phases in order without retries', async () => {
    const calls: string[] = [];
    const orchestrator = new SequentialOrchestrator({
      phases: [
        { name: 'phase1', run: async () => { calls.push('1'); } },
        { name: 'phase2', run: async () => { calls.push('2'); } },
      ],
      checkpointPath: tempCheckpoint,
      input: undefined,
    });
    const cp = await orchestrator.run();
    expect(calls).toEqual(['1', '2']);
    expect(cp.phaseResults.filter(r => r.status === 'success').length).toBe(2);
    expect(cp.currentPhase).toBe(2);
    expect(cp.inProgress).toBe(false);
  });

  it('retries a failing phase and then succeeds', async () => {
    let tries = 0;
    const calls: string[] = [];
    const orchestrator = new SequentialOrchestrator({
      phases: [
        { name: 'phase1', run: async () => { calls.push('1'); } },
        {
          name: 'retry-phase',
          run: async () => { tries++; calls.push('r'); if (tries < 3) throw new Error('fail'); },
          retry: 3,
          backoffMs: 10,
        },
        { name: 'phase2', run: async () => { calls.push('2'); } },
      ],
      checkpointPath: tempCheckpoint,
      input: undefined,
    });
    const cp = await orchestrator.run();
    expect(calls).toEqual(['1', 'r', 'r', 'r', '2']);
    expect(cp.phaseResults.filter(r => r.phase === 'retry-phase').length).toBe(3);
    // `find` yields undefined (never null) when nothing matches, so assert definedness.
    expect(cp.phaseResults.find(r => r.phase === 'retry-phase' && r.status === 'success')).toBeDefined();
    expect(cp.currentPhase).toBe(3);
    expect(cp.inProgress).toBe(false);
  });

  it('persists checkpoint after phase, can resume mid-run after crash', async () => {
    const calls: string[] = [];
    // Seed the state a crashed process would leave behind: p1 finished and the
    // first of p2's two allowed attempts failed before the process died. (A
    // phase that has used all its attempts is intentionally not re-attempted,
    // so the interrupted phase must still have an attempt left.)
    const interrupted = {
      currentPhase: 1,
      phaseResults: [
        { phase: 'p1', status: 'success', attempt: 1, timestamp: new Date().toISOString() },
        { phase: 'p2', status: 'failure', attempt: 1, timestamp: new Date().toISOString(), error: 'fail' },
      ],
    };
    await fs.mkdir(path.dirname(tempCheckpoint), { recursive: true });
    await fs.writeFile(tempCheckpoint, JSON.stringify(interrupted), 'utf8');

    const orchestrator = new SequentialOrchestrator({
      phases: [
        { name: 'p1', run: async () => { calls.push('p1'); } },
        { name: 'p2', run: async () => { calls.push('p2'); }, retry: 2 },
        { name: 'p3', run: async () => { calls.push('p3'); } },
      ],
      checkpointPath: tempCheckpoint,
      input: undefined,
    });

    // Simulated restart: resume continues from the interrupted phase.
    const cp = await orchestrator.resume();
    expect(calls).toEqual(['p2', 'p3']);
    expect(cp.phaseResults.filter(r => r.phase === 'p1').length).toBe(1);
    expect(cp.phaseResults.filter(r => r.phase === 'p2').length).toBe(2);
    expect(cp.phaseResults.some(r => r.phase === 'p2' && r.attempt === 2 && r.status === 'success')).toBe(true);
    expect(cp.phaseResults.some(r => r.phase === 'p3' && r.status === 'success')).toBe(true);
    expect(cp.currentPhase).toBe(3);
    expect(cp.inProgress).toBe(false);

    // What was returned is also what was written to disk.
    const persisted = JSON.parse(await fs.readFile(tempCheckpoint, 'utf8'));
    expect(persisted.currentPhase).toBe(3);
    expect(persisted.inProgress).toBe(false);
    expect(persisted.phaseResults.length).toBe(4);
  });

  it('stops after exhausting retries for a phase', async () => {
    const orchestrator = new SequentialOrchestrator({
      phases: [
        { name: 'a', run: async () => {} },
        { name: 'b', run: async () => { throw new Error('fail!'); }, retry: 2 },
        { name: 'c', run: async () => {} },
      ],
      checkpointPath: tempCheckpoint,
      input: undefined,
    });
    const cp = await orchestrator.run();
    expect(cp.phaseResults.filter(r => r.phase === 'b').length).toBe(2);
    expect(cp.currentPhase).toBe(1); // Should remain at failed phase index after retries exhausted
    expect(cp.inProgress).toBe(true);

    // Resume should NOT re-attempt phase b, since it exhausted retries
    const cp2 = await orchestrator.resume();
    expect(cp2.phaseResults.filter(r => r.phase === 'b').length).toBe(2);
    expect(cp2.currentPhase).toBe(1);
    expect(cp2.inProgress).toBe(true);
  });
});
|
||||
Loading…
Reference in New Issue