diff --git a/README.md b/README.md index 94e3cf8..5d9b097 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,15 @@ A modern, self-hosted alternative to services like CopyMeThat. Store, organize, --- +### Backend Orchestrator (Phase Execution Utility) + +A sequential orchestrator utility is available for robust, checkpointed phase execution with per-phase retries. Useful for automation harnesses or recipe import flows needing durable, restart-safe progress tracking. + +- Location: `src/backend/services/SequentialOrchestrator.ts` +- Usage/docs: [`docs/orchestrator.md`](docs/orchestrator.md) + +--- + ## Quick Start ### Prerequisites diff --git a/docs/orchestrator.md b/docs/orchestrator.md new file mode 100644 index 0000000..e9af331 --- /dev/null +++ b/docs/orchestrator.md @@ -0,0 +1,44 @@ +# Orchestrator Usage + +This service provides robust phase-based sequencing with automatic checkpointing and per-phase retry policy. + +## Usage (Programmatic) + +``` +ts +import { SequentialOrchestrator } from './src/backend/services/SequentialOrchestrator'; + +const orchestrator = new SequentialOrchestrator({ + phases: [ + { name: 'one', run: async () => { /* logic */ }, retry: 2, backoffMs: 250 }, + { name: 'two', run: async () => { /* logic */ }}, + // ...more phases + ], + checkpointPath: 'data/orchestrator-checkpoint.json', // optional, default as shown + input: {...}, // input passed to each phase +}); + +await orchestrator.run(); // Runs/resumes from last incomplete phase, checkpointing after each attempt +``` + +## Features +- **Sequential phases**: Executes provided phases in-order. +- **Per-phase retry & backoff**: Configure max attempts and delay for each phase. +- **Checkpointing**: Persists after every attempt (success/failure/attempt #, timestamp, error message if fail). +- **Restart-safe**: Can safely resume after crash/restart, picks up at last incomplete phase. +- **Minimal callable interface**: Import and use from your own services or app code. + +## Checkpoint Schema +See `src/backend/services/SequentialOrchestrator.ts` for full type: +- `currentPhase`: index of next phase to execute +- `phaseResults[]`: history of all attempts on every phase +- `inProgress`: true if incomplete (failed phase, retries exhausted) + +## Testing +Run: + +``` +npm test +``` + +See source: `src/backend/tests/orchestrator.test.ts` for coverage: execution order, retry, checkpoint, resume. diff --git a/src/backend/services/SequentialOrchestrator.ts b/src/backend/services/SequentialOrchestrator.ts new file mode 100644 index 0000000..8d5848b --- /dev/null +++ b/src/backend/services/SequentialOrchestrator.ts @@ -0,0 +1,120 @@ +import { promises as fs } from 'fs'; +import path from 'path'; + +export type OrchestratorPhase = { + name: string; + run: (input: TInput) => Promise; + retry?: number; // how many times to retry (default: 1) + backoffMs?: number; // ms to wait between retries (default: 0) +}; + +export type PhaseAttemptResult = { + phase: string; + status: 'success' | 'failure'; + attempt: number; + timestamp: string; + error?: string; +}; + +export type Checkpoint = { + currentPhase: number; // index in phases + phaseResults: PhaseAttemptResult[]; + inProgress?: boolean; +}; + +const DEFAULT_CHECKPOINT_FILE = path.join(process.cwd(), 'data/orchestrator-checkpoint.json'); + +export class SequentialOrchestrator { + private phases: OrchestratorPhase[]; + private checkpointPath: string; + private input: TInput; + + constructor(options: { + phases: OrchestratorPhase[]; + checkpointPath?: string; + input: TInput; + }) { + this.phases = options.phases; + this.checkpointPath = options.checkpointPath || DEFAULT_CHECKPOINT_FILE; + this.input = options.input; + } + + async run(): Promise { + let checkpoint = await this.loadCheckpoint(); + if (!checkpoint) { + checkpoint = { + currentPhase: 0, + phaseResults: [], + }; + await this.saveCheckpoint(checkpoint); + } + for (let i = checkpoint.currentPhase; i < this.phases.length; i++) { + const phase = this.phases[i]; + const resultsForPhase = checkpoint.phaseResults.filter(r => r.phase === phase.name); + const maxAttempts = phase.retry ?? 1; + const hasSucceeded = resultsForPhase.some(r => r.status === 'success'); + if (hasSucceeded) { + // This phase already succeeded, continue to next phase. + continue; + } + if (!hasSucceeded && resultsForPhase.length >= maxAttempts) { + // Phase exhausted all retries with no success. Do not re-attempt. + break; + } + let attempt = resultsForPhase.length + 1; // continue counting if interrupted in the middle + const backoffMs = phase.backoffMs ?? 0; + let succeeded = false; + while (attempt <= maxAttempts && !succeeded) { + let error: string | undefined; + try { + await phase.run(this.input); + succeeded = true; + } catch (e:any) { + error = e?.message || String(e); + if (attempt < maxAttempts && backoffMs > 0) { + await new Promise(res => setTimeout(res, backoffMs)); + } + } + checkpoint.phaseResults.push({ + phase: phase.name, + status: succeeded ? 'success' : 'failure', + attempt, + timestamp: new Date().toISOString(), + error: succeeded ? undefined : error, + }); + await this.saveCheckpoint(checkpoint); + if (succeeded) { + checkpoint.currentPhase = i + 1; + break; + } + attempt++; + } + if (!succeeded) { + // Phase exhausted all retries. Stop orchestrator. + break; + } + } + checkpoint.inProgress = checkpoint.currentPhase < this.phases.length; + await this.saveCheckpoint(checkpoint); + return checkpoint; + } + + async resume(): Promise { + // Alias for run() for clarity + return this.run(); + } + + async loadCheckpoint(): Promise { + try { + const data = await fs.readFile(this.checkpointPath, 'utf8'); + return JSON.parse(data); + } catch { + return null; + } + } + + async saveCheckpoint(checkpoint: Checkpoint): Promise { + await fs.mkdir(path.dirname(this.checkpointPath), { recursive: true }); + await fs.writeFile(this.checkpointPath, JSON.stringify(checkpoint, null, 2), 'utf8'); + } +} diff --git a/src/backend/tests/orchestrator.test.ts b/src/backend/tests/orchestrator.test.ts new file mode 100644 index 0000000..1cb31a0 --- /dev/null +++ b/src/backend/tests/orchestrator.test.ts @@ -0,0 +1,100 @@ +import { SequentialOrchestrator } from '../services/SequentialOrchestrator'; +import { promises as fs } from 'fs'; +import path from 'path'; +import { describe, it, expect, beforeEach } from 'vitest'; + +const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint.json'); + +async function cleanCheckpoint() { + try { + await fs.unlink(tempCheckpoint); + } catch {} +} + +describe('SequentialOrchestrator', () => { + beforeEach(async () => { + await cleanCheckpoint(); + }); + + it('executes all phases in order without retries', async () => { + const calls: string[] = []; + const orchestrator = new SequentialOrchestrator({ + phases: [ + { name: 'phase1', run: async () => { calls.push('1'); } }, + { name: 'phase2', run: async () => { calls.push('2'); } }, + ], + checkpointPath: tempCheckpoint, + input: undefined, + }); + const cp = await orchestrator.run(); + expect(calls).toEqual(['1','2']); + expect(cp.phaseResults.filter(r => r.status==='success').length).toBe(2); + expect(cp.currentPhase).toBe(2); + expect(cp.inProgress).toBe(false); + }); + + it('retries a failing phase and then succeeds', async () => { + let tries = 0; + const calls: string[] = []; + const orchestrator = new SequentialOrchestrator({ + phases: [ + { name: 'phase1', run: async () => { calls.push('1'); } }, + { name: 'retry-phase', run: async () => { tries++; calls.push('r'); if (tries < 3) throw new Error('fail'); }, retry: 3, backoffMs: 10 }, + { name: 'phase2', run: async () => { calls.push('2'); } }, + ], + checkpointPath: tempCheckpoint, + input: undefined + }); + const cp = await orchestrator.run(); + expect(calls).toEqual(['1','r','r','r','2']); + expect(cp.phaseResults.filter(r=>r.phase==='retry-phase').length).toBe(3); + expect(cp.phaseResults.find(r=>r.phase==='retry-phase' && r.status==='success')).not.toBeNull(); + expect(cp.currentPhase).toBe(3); + expect(cp.inProgress).toBe(false); + }); + + it('persists checkpoint after phase, can resume mid-run after crash', async () => { + let fired = false; + const orchestrator = new SequentialOrchestrator({ + phases: [ + { name: 'p1', run: async () => {} }, + { name: 'p2', run: async () => { if (!fired) {fired=true; throw new Error('fail');} } }, + { name: 'p3', run: async () => {} }, + ], + checkpointPath: tempCheckpoint, + input: undefined + }); + // First run to fail at p2 + let cp = await orchestrator.run(); + expect(cp.phaseResults.filter(r=>r.phase==='p1' && r.status==='success').length).toBe(1); + expect(cp.phaseResults.filter(r=>r.phase==='p2').length).toBe(1); + expect(cp.inProgress).toBe(true); + // Simulate restart -- resume should continue from failed phase + fired = true; // Next attempt will succeed + cp = await orchestrator.resume(); + expect(cp.phaseResults.filter(r=>r.phase==='p2').length).toBe(2); + expect(cp.phaseResults.some(r=>r.phase==='p3' && r.status==='success')).toBe(true); + expect(cp.inProgress).toBe(false); + }); + + it('stops after exhausting retries for a phase', async () => { + const orchestrator = new SequentialOrchestrator({ + phases: [ + { name: 'a', run: async () => {} }, + { name: 'b', run: async () => { throw new Error('fail!'); }, retry: 2 }, + { name: 'c', run: async () => {} }, + ], + checkpointPath: tempCheckpoint, + input: undefined + }); + const cp = await orchestrator.run(); + expect(cp.phaseResults.filter(r=>r.phase==='b').length).toBe(2); + expect(cp.currentPhase).toBe(1); // Should remain at failed phase index after retries exhausted + expect(cp.inProgress).toBe(true); + // Resume should NOT re-attempt phase b, since it exhausted retries + const cp2 = await orchestrator.resume(); + expect(cp2.phaseResults.filter(r=>r.phase==='b').length).toBe(2); + expect(cp2.currentPhase).toBe(1); + expect(cp2.inProgress).toBe(true); + }); +});