// recipe-manager/src/backend/services/SequentialOrchestrator.ts
//
// 299 lines
// 10 KiB
// TypeScript

import { promises as fs } from 'fs';
import path from 'path';
import { logPhaseProgress } from './PhaseProgressLogger.js';
import { WorkflowStatusManager } from './WorkflowStatusManager.js';
import type { WorkflowStatus } from './WorkflowStatusManager.js';
import { appendPhaseUpdate } from './PhaseUpdateQueue.js';
/**
 * Shared state handed to each stage's `execute` callback. The orchestrator
 * also stores it on the checkpoint so a resumed run can reuse it.
 */
export type WorkflowContext = {
// Identifier of the workflow run.
runId: string;
// Start time of the run (the legacy `run()` entry point supplies an ISO-8601 string).
startedAt: string;
// Optional free-form information about the run.
metadata?: Record<string, unknown>;
// Open index signature: additional keys of any type may be attached.
[key: string]: unknown;
};
/**
 * Optional value a stage may resolve with. The orchestrator in this file only
 * reads `nextAction`; `summary` and `metadata` are not consumed here.
 */
export type WorkflowStageResult = {
summary?: string;
metadata?: Record<string, unknown>;
// Recorded on the successful attempt and in the workflow status; 'proceed' is used when omitted.
nextAction?: string;
};
/**
 * One unit of work executed by the orchestrator. Stages run in array order.
 */
export type WorkflowStage = {
// Stage identifier (not read by the orchestrator in this file).
id: string;
// Name recorded in attempt results, progress logs, and the workflow status.
name: string;
// Performs the work; a rejection is recorded as a failed attempt.
execute: (context: WorkflowContext) => Promise<WorkflowStageResult | void>;
// Attempt budget for this stage. Despite the name it is compared against the
// attempt number, so it acts as a total attempt count. When omitted the
// orchestrator falls back to its `maxRetriesPerPhase` option, then to 1.
retry?: number;
// Delay in milliseconds before the next attempt after a failure; applied only when > 0.
backoffMs?: number;
};
/**
 * Legacy phase description accepted by the constructor and used by `run()`.
 * Each phase is adapted into a `WorkflowStage` before execution.
 */
export type OrchestratorPhase<TInput, TOutput> = {
// Name carried over to the adapted stage.
name: string;
// Performs the work for the constructor-supplied input; the resolved value is discarded by the adapter.
run: (input: TInput) => Promise<TOutput>;
// Copied to the adapted stage's `retry`.
retry?: number;
// Copied to the adapted stage's `backoffMs`.
backoffMs?: number;
// Reported as the stage's next action after success; 'proceed' when omitted.
nextAction?: string;
};
/**
 * Record of a single attempt at a stage, appended to the checkpoint's
 * `phaseResults`.
 */
export type PhaseAttemptResult = {
// Name of the stage that was attempted.
phase: string;
status: 'success' | 'failure';
// Attempt number, starting at 1 for each pass over the stage.
attempt: number;
// ISO-8601 time at which the result was recorded.
timestamp: string;
// On failure the orchestrator sets `error` and `failureReason` to the same message.
error?: string;
failureReason?: string;
// On success: the stage's next action (default 'proceed').
// On failure: 'retry' or 'manual intervention'.
nextAction: string;
};
/**
 * Progress snapshot serialized as JSON to the checkpoint file.
 */
export type Checkpoint = {
// Index into the stage list of the next stage to execute; equals the stage
// count once every stage has succeeded.
currentPhase: number;
// Attempt records in the order they were pushed.
phaseResults: PhaseAttemptResult[];
// Set to true when a fresh checkpoint is created and while stages remain after
// a success; set to false after a final failure or when the workflow finishes.
inProgress?: boolean;
// Workflow context stored with the checkpoint and preferred over the caller's context on later executions.
context?: WorkflowContext;
};
// Default locations, resolved against the process working directory when this
// module is loaded. Each can be overridden through the constructor options.

// Checkpoint JSON used when no `checkpointPath` option is given.
const DEFAULT_CHECKPOINT_FILE = path.join(process.cwd(), 'data/orchestrator-checkpoint.json');
// Status file passed to the default WorkflowStatusManager when neither `statusManager` nor `statusFilePath` is given.
const DEFAULT_STATUS_FILE = path.join(process.cwd(), 'status/workflow-status.json');
// Progress log path passed to logPhaseProgress when no `progressLogPath` option is given.
const DEFAULT_PROGRESS_LOG = path.join(process.cwd(), 'status/phase-progress.jsonl');
/**
 * Executes workflow stages strictly one after another, persisting a JSON
 * checkpoint after each outcome so that an interrupted or failed workflow can
 * be resumed from the stage it stopped at.
 *
 * For every attempt the orchestrator appends an entry to the progress log
 * (`logPhaseProgress`), emits phase events (`appendPhaseUpdate`), and pushes
 * a status snapshot to the `WorkflowStatusManager`.
 *
 * Attempt numbering restarts at 1 each time a stage is (re)entered, including
 * on `resume`; attempts made in earlier runs do not count against the budget.
 */
export class SequentialOrchestrator<TInput = any, TOutput = any> {
  private readonly phases?: OrchestratorPhase<TInput, TOutput>[];
  private readonly input?: TInput;
  private readonly checkpointPath: string;
  private readonly maxAttemptsPerPhasePerRun: number;
  private readonly maxRetriesPerPhase?: number;
  private readonly statusManager: WorkflowStatusManager;
  private readonly progressLogPath: string;

  /**
   * @param options.phases Legacy phases executed by `run()`.
   * @param options.input Input handed to every legacy phase.
   * @param options.checkpointPath Checkpoint file; falls back to the default when empty or omitted.
   * @param options.maxAttemptsPerPhasePerRun Upper bound on attempts per stage within a single
   *   `start`/`resume`/`run` call. Unlimited when omitted.
   * @param options.maxRetriesPerPhase Attempt budget for stages that do not set `retry` (default 1).
   * @param options.statusManager Status sink; when omitted one is created for `statusFilePath`
   *   (or the default status file).
   * @param options.progressLogPath Path handed to `logPhaseProgress`.
   */
  constructor(options: {
    phases?: OrchestratorPhase<TInput, TOutput>[];
    checkpointPath?: string;
    input?: TInput;
    maxAttemptsPerPhasePerRun?: number;
    maxRetriesPerPhase?: number;
    statusManager?: WorkflowStatusManager;
    statusFilePath?: string;
    progressLogPath?: string;
  } = {}) {
    this.phases = options.phases;
    this.input = options.input;
    // `||` (not `??`) is deliberate: an empty path string also selects the default.
    this.checkpointPath = options.checkpointPath || DEFAULT_CHECKPOINT_FILE;
    this.maxAttemptsPerPhasePerRun = options.maxAttemptsPerPhasePerRun ?? Infinity;
    this.maxRetriesPerPhase = options.maxRetriesPerPhase;
    this.statusManager = options.statusManager ?? new WorkflowStatusManager(options.statusFilePath || DEFAULT_STATUS_FILE);
    this.progressLogPath = options.progressLogPath ?? DEFAULT_PROGRESS_LOG;
  }

  /** Extracts a printable message from an arbitrary thrown value. */
  private static describeError(err: unknown): string {
    if (typeof err === 'object' && err !== null && 'message' in err) {
      const message = (err as { message?: unknown }).message;
      if (message) {
        return String(message);
      }
    }
    return String(err);
  }

  /** Minimal structural check so a malformed checkpoint file is rejected up front. */
  private static isCheckpoint(value: unknown): value is Checkpoint {
    if (typeof value !== 'object' || value === null) {
      return false;
    }
    const candidate = value as { currentPhase?: unknown; phaseResults?: unknown };
    return typeof candidate.currentPhase === 'number' && Array.isArray(candidate.phaseResults);
  }

  /**
   * Builds a status snapshot from the checkpoint and pushes it to the status manager.
   *
   * @param stages Full stage list of the workflow.
   * @param state Explicit status fields; they take precedence over values derived from `extra`.
   * @param checkpoint Current progress, used to derive the completed phases.
   * @param extra Index of the phase to report as current (or null) and the attempt result
   *   that triggered this update, if any.
   */
  private async writeStatus(
    stages: WorkflowStage[],
    state: Partial<WorkflowStatus>,
    checkpoint: Checkpoint,
    extra: { currentPhaseIdx?: number | null; phaseResult?: PhaseAttemptResult } = {}
  ) {
    // A phase counts as completed only if it lies before the checkpoint cursor
    // and has at least one recorded success.
    const completedPhases = stages
      .slice(0, checkpoint.currentPhase)
      .map((stage) => stage.name)
      .filter((phaseName) => checkpoint.phaseResults.some((r) => r.phase === phaseName && r.status === 'success'));

    const currentStage = typeof extra.currentPhaseIdx === 'number' ? stages[extra.currentPhaseIdx] : undefined;
    const currentPhaseName = currentStage?.name ? currentStage.name : null;

    // A failed attempt or a fully advanced cursor overrides the requested status.
    let overallStatus = state.overallStatus || 'idle';
    if (extra.phaseResult && extra.phaseResult.status === 'failure') {
      overallStatus = 'failed';
    } else if (checkpoint.currentPhase === stages.length) {
      overallStatus = 'completed';
    }

    // An explicitly supplied reason (including null) wins over the attempt's reason.
    const lastFailureReason = state.lastFailureReason !== undefined
      ? state.lastFailureReason
      : (extra.phaseResult?.failureReason || null);

    const workflowStatus: WorkflowStatus = {
      currentPhase: currentPhaseName,
      overallStatus,
      lastUpdated: new Date().toISOString(),
      lastFailureReason,
      nextAction: state.nextAction || extra.phaseResult?.nextAction || '',
      completedPhases,
    };
    await this.statusManager.update(workflowStatus);
  }

  /**
   * Reads the checkpoint file.
   *
   * @returns The parsed checkpoint, or null when the file cannot be read, is not
   *   valid JSON, or lacks a numeric `currentPhase` / array `phaseResults`.
   */
  private async loadCheckpoint(): Promise<Checkpoint | null> {
    try {
      const txt = await fs.readFile(this.checkpointPath, 'utf8');
      const parsed: unknown = JSON.parse(txt);
      return SequentialOrchestrator.isCheckpoint(parsed) ? parsed : null;
    } catch {
      // Best effort by design: an unusable checkpoint means "start from scratch".
      return null;
    }
  }

  /** Writes the checkpoint as pretty-printed JSON, creating the parent directory if needed. */
  private async saveCheckpoint(checkpoint: Checkpoint) {
    await fs.mkdir(path.dirname(this.checkpointPath), { recursive: true });
    await fs.writeFile(this.checkpointPath, JSON.stringify(checkpoint, null, 2), 'utf8');
  }

  /**
   * Adapts legacy phases to stages. Each adapted stage ignores the workflow
   * context, calls the phase with `input`, and discards the phase's result.
   */
  private toStagesFromLegacyPhases(phases: OrchestratorPhase<TInput, TOutput>[], input: TInput): WorkflowStage[] {
    return phases.map((phase, idx) => ({
      id: `legacy-${idx + 1}`,
      name: phase.name,
      retry: phase.retry,
      backoffMs: phase.backoffMs,
      execute: async () => {
        await phase.run(input);
        return { nextAction: phase.nextAction ?? 'proceed' };
      },
    }));
  }

  /**
   * Runs the stage once and converts the outcome into an attempt record.
   * Only the stage's own `execute` call is guarded, so bookkeeping errors are
   * never mistaken for stage failures.
   */
  private async attemptStage(
    stage: WorkflowStage,
    context: WorkflowContext,
    attempt: number,
    attemptBudget: number
  ): Promise<PhaseAttemptResult> {
    try {
      const result = await stage.execute(context);
      return {
        phase: stage.name,
        status: 'success',
        attempt,
        timestamp: new Date().toISOString(),
        nextAction: result?.nextAction ?? 'proceed',
      };
    } catch (err: unknown) {
      const message = SequentialOrchestrator.describeError(err);
      return {
        phase: stage.name,
        status: 'failure',
        attempt,
        timestamp: new Date().toISOString(),
        error: message,
        failureReason: message,
        nextAction: attempt < attemptBudget ? 'retry' : 'manual intervention',
      };
    }
  }

  /**
   * Attempts the stage until it succeeds or the attempts allowed for this run
   * are used up. Every attempt is pushed onto `checkpoint.phaseResults`, and
   * failures are written to the progress log.
   *
   * @returns The record of the last attempt made (always at least one).
   */
  private async runStageAttempts(
    stage: WorkflowStage,
    checkpoint: Checkpoint,
    context: WorkflowContext,
    attemptBudget: number,
    maxAttemptsThisRun: number
  ): Promise<PhaseAttemptResult> {
    for (let attempt = 1; ; attempt += 1) {
      const outcome = await this.attemptStage(stage, context, attempt, attemptBudget);
      checkpoint.phaseResults.push(outcome);
      if (outcome.status === 'success') {
        return outcome;
      }
      await logPhaseProgress({
        phase: outcome.phase,
        status: outcome.status,
        attempt: outcome.attempt,
        timestamp: outcome.timestamp,
        failureReason: outcome.failureReason,
        nextAction: outcome.nextAction,
      }, this.progressLogPath);
      // Written as a negated `<` so that a NaN or non-positive limit still terminates the loop.
      if (!(attempt < maxAttemptsThisRun)) {
        return outcome;
      }
      const backoffMs = stage.backoffMs;
      if (backoffMs && backoffMs > 0) {
        await new Promise((resolve) => setTimeout(resolve, backoffMs));
      }
    }
  }

  /**
   * Core loop shared by `start` and `resume`.
   *
   * Execution stops (the promise resolves, it does not reject) as soon as a
   * stage fails all attempts permitted in this run. The checkpoint is saved
   * first, so a later `resume` picks the same stage up again.
   *
   * @param stages Stages to execute in order.
   * @param context Context for a fresh checkpoint, or for an existing checkpoint that has none.
   * @param resetCheckpoint When true any existing checkpoint is ignored and overwritten.
   */
  private async executeStages(stages: WorkflowStage[], context: WorkflowContext, resetCheckpoint: boolean): Promise<void> {
    const existing = resetCheckpoint ? null : await this.loadCheckpoint();
    const checkpoint: Checkpoint = existing ?? { currentPhase: 0, phaseResults: [], inProgress: true, context };
    if (!existing) {
      await this.saveCheckpoint(checkpoint);
      await this.writeStatus(stages, { overallStatus: 'idle' }, checkpoint, { currentPhaseIdx: 0 });
    } else if (!existing.context) {
      existing.context = context;
      await this.saveCheckpoint(existing);
    }

    while (checkpoint.currentPhase < stages.length) {
      const stage = stages[checkpoint.currentPhase];
      const attemptBudget = typeof stage.retry === 'number' ? stage.retry : (this.maxRetriesPerPhase ?? 1);
      const maxAttemptsThisRun = Number.isFinite(this.maxAttemptsPerPhasePerRun)
        ? Math.min(attemptBudget, this.maxAttemptsPerPhasePerRun)
        : attemptBudget;

      await appendPhaseUpdate({
        eventType: 'phase_started',
        phase: stage.name,
        summary: `Phase '${stage.name}' started`,
        details: { attempt: 1 },
      });

      const outcome = await this.runStageAttempts(
        stage,
        checkpoint,
        checkpoint.context ?? context,
        attemptBudget,
        maxAttemptsThisRun
      );

      if (outcome.status === 'failure') {
        // The stage is definitively failed only when its own budget is spent.
        // If just the per-run cap was hit, the run stops quietly with
        // nextAction 'retry' so a later resume can try again.
        const budgetExhausted = !(outcome.attempt < attemptBudget);
        if (budgetExhausted) {
          checkpoint.inProgress = false;
          await appendPhaseUpdate({
            eventType: 'phase_failed',
            phase: stage.name,
            summary: `Phase '${stage.name}' failed`,
            details: { attempt: outcome.attempt, error: outcome.error },
          });
        }
        await this.saveCheckpoint(checkpoint);
        await this.writeStatus(
          stages,
          { overallStatus: 'failed', lastFailureReason: outcome.failureReason, nextAction: outcome.nextAction },
          checkpoint,
          { currentPhaseIdx: checkpoint.currentPhase, phaseResult: outcome }
        );
        return;
      }

      // Success: advance the cursor and persist before reporting anything.
      checkpoint.currentPhase += 1;
      checkpoint.inProgress = checkpoint.currentPhase < stages.length;
      await this.saveCheckpoint(checkpoint);
      await logPhaseProgress({
        phase: outcome.phase,
        status: outcome.status,
        attempt: outcome.attempt,
        timestamp: outcome.timestamp,
        nextAction: outcome.nextAction,
      }, this.progressLogPath);
      await appendPhaseUpdate({
        eventType: 'phase_succeeded',
        phase: stage.name,
        summary: `Phase '${stage.name}' succeeded`,
        details: { attempt: outcome.attempt },
      });
      await this.writeStatus(
        stages,
        { overallStatus: checkpoint.currentPhase < stages.length ? 'running' : 'completed', nextAction: outcome.nextAction },
        checkpoint,
        { currentPhaseIdx: checkpoint.currentPhase, phaseResult: outcome }
      );
    }

    checkpoint.inProgress = false;
    await this.saveCheckpoint(checkpoint);
    await appendPhaseUpdate({
      eventType: 'workflow_completed',
      phase: null,
      summary: 'Workflow completed successfully',
      details: { totalPhases: stages.length },
    });
    await this.writeStatus(stages, { overallStatus: 'completed', nextAction: 'done' }, checkpoint, { currentPhaseIdx: null });
  }

  /** Starts the workflow from the first stage, overwriting any existing checkpoint. */
  public async start(stages: WorkflowStage[], context: WorkflowContext): Promise<void> {
    await this.executeStages(stages, context, true);
  }

  /** Continues from the saved checkpoint, or starts from the first stage when none is usable. */
  public async resume(stages: WorkflowStage[], context: WorkflowContext): Promise<void> {
    await this.executeStages(stages, context, false);
  }

  // Backward-compatible API used by existing tests.
  /**
   * Runs the constructor-supplied legacy phases with resume semantics, using
   * a generated context whose `runId` is derived from the start time.
   */
  public async run(): Promise<void> {
    const phases = this.phases ?? [];
    const input = this.input as TInput;
    const stages = this.toStagesFromLegacyPhases(phases, input);
    // One timestamp for both fields keeps runId and startedAt consistent.
    const startedAt = new Date().toISOString();
    const context: WorkflowContext = {
      runId: `legacy-${startedAt}`,
      startedAt,
      metadata: { source: 'SequentialOrchestrator.run' },
    };
    await this.resume(stages, context);
  }
}