fix(workflow): align orchestrator API and workflow runner

This commit is contained in:
Paul Huliganga 2026-03-26 15:44:45 -04:00
parent 476ca0b0c2
commit 0510009597
5 changed files with 437 additions and 184 deletions

View File

@ -9,7 +9,8 @@
"build": "tsc",
"test": "vitest run",
"test:watch": "vitest",
"migrate": "ts-node-esm src/backend/db/migrate.ts"
"migrate": "ts-node-esm src/backend/db/migrate.ts",
"workflow:run": "ts-node scripts/run-workflow.ts"
},
"dependencies": {
"express": "^4.18.2",

View File

@ -1,147 +1,103 @@
#!/usr/bin/env ts-node
import { WorkflowStatusManager } from '../src/backend/services/WorkflowStatusManager';
import { getPendingPhaseUpdates } from '../src/backend/services/PhaseUpdateQueue';
import * as path from 'path';
import { exec as execCb } from 'child_process';
import { promisify } from 'util';
// Configurable recency window in hours for recent commits
type ReportConfig = {
const exec = promisify(execCb);
type CommitInfo = { hash: string; msg: string; date: string };
type MorningReportOptions = {
commitWindowHours?: number;
statusPath?: string;
phaseUpdatesPath?: string;
stalledThresholdMinutes?: number;
now?: Date;
getRecentCommitsFn?: (windowHours: number) => Promise<{hash: string; msg: string; date: string}[]>;
getRecentCommitsFn?: (sinceIso: string) => Promise<CommitInfo[]>;
};
const DEFAULT_STATUS_PATH = path.join(process.cwd(), 'status/workflow-status.json');
const DEFAULT_PHASE_UPDATES_PATH = path.join(process.cwd(), 'status/phase-updates.jsonl');
const DEFAULT_STALLED_THRESHOLD_MINUTES = 60;
export async function getRecentCommits(windowHours: number): Promise<{hash: string, msg: string, date: string}[]> {
const sinceArg = `--since='${windowHours} hours ago'`;
const cmd = `git log --oneline --date=iso --pretty=format:'%h|%s|%cd' ${sinceArg}`;
const { exec } = require('child_process');
return new Promise((resolve) => {
exec(cmd, { cwd: process.cwd() }, (err: any, stdout: string) => {
if (err) return resolve([]);
const lines = stdout.trim().split(/\n/);
const result = lines.filter(Boolean).map((line: string) => {
const [hash, msg, date] = line.split('|');
return {hash, msg, date};
});
resolve(result);
async function getRecentCommits(sinceIso: string): Promise<CommitInfo[]> {
const cmd = `git log --since="${sinceIso}" --pretty=format:"%h|%s|%cI"`;
const { stdout } = await exec(cmd);
if (!stdout.trim()) return [];
return stdout
.split(/\r?\n/)
.map((line) => line.trim())
.filter(Boolean)
.map((line) => {
const [hash, msg, date] = line.split('|');
return { hash, msg, date };
});
});
}
function minutesBetween(a: Date, b: Date): number {
return Math.abs((a.getTime() - b.getTime()) / 60000);
function minutesSince(iso: string, now: Date): number {
return Math.floor((now.getTime() - new Date(iso).getTime()) / 60000);
}
function detectStalled(status: any, thresholdMinutes: number, now: Date): {stalled: boolean; reason?: string; recommend?: string} {
if (!status) return {stalled: false};
if (["completed", "failed", "idle"].includes(status.overallStatus)) return {stalled: false};
const last = new Date(status.lastUpdated);
const mins = minutesBetween(now, last);
if (mins >= thresholdMinutes) {
return {
stalled: true,
reason: `No progress in ${mins.toFixed(0)} minutes (threshold: ${thresholdMinutes}m). Last update: ${last.toISOString()}`,
recommend: status.overallStatus === "blocked" ? "Manual intervention may be required. See blockers below." : "Restart or debug orchestrator."
};
}
return {stalled: false};
}
export async function generateMorningReport(options: MorningReportOptions = {}): Promise<string> {
const commitWindowHours = options.commitWindowHours ?? 24;
const stalledThresholdMinutes = options.stalledThresholdMinutes ?? 60;
const now = options.now ?? new Date();
const getRecentCommitsFn = options.getRecentCommitsFn ?? getRecentCommits;
function extractBlockers(status: any): string[] {
const out: string[] = [];
if (!status) return out;
if (status.overallStatus === 'blocked') {
if (status.lastFailureReason) out.push(`❗ Blocked: ${status.lastFailureReason}`);
else out.push(`❗ Blocked: Reason not specified.`);
}
if (status.overallStatus === 'failed') {
out.push('❌ Workflow failed.');
if (status.lastFailureReason) out.push(`Failure: ${status.lastFailureReason}`);
}
if (status.nextAction && status.nextAction.includes('manual')) {
out.push(`⚠️ Manual intervention required: ${status.nextAction}`);
}
return out;
}
const since = new Date(now.getTime() - commitWindowHours * 60 * 60 * 1000).toISOString();
const commits = await getRecentCommitsFn(since);
async function main(config: ReportConfig = {}) {
const commitWindow = config.commitWindowHours || 24;
const statusPath = config.statusPath || DEFAULT_STATUS_PATH;
const phaseUpdatesPath = config.phaseUpdatesPath || DEFAULT_PHASE_UPDATES_PATH;
const threshold = config.stalledThresholdMinutes || DEFAULT_STALLED_THRESHOLD_MINUTES;
const now = config.now || new Date();
const statusManager = new WorkflowStatusManager();
const status = await statusManager.read();
const pending = await getPendingPhaseUpdates();
// 1. Recent commits
const commits = await (config.getRecentCommitsFn || getRecentCommits)(commitWindow);
// 2. Workflow status
const wsm = new WorkflowStatusManager(statusPath);
const status = await wsm.read();
// 3. Blockers
const blockers = extractBlockers(status);
// 4. Pending phase updates
const pendingUpdates = await getPendingPhaseUpdates();
// 5. Stalled-state detection
const stalled = detectStalled(status, threshold, now);
// Render report
let out = `# 🌅 Morning Workflow Report\n`;
out += `\n## Recent Commits (last ${commitWindow}h)\n`;
if (commits.length > 0) {
let out = '# 🌅 Morning Workflow Report\n\n';
out += `## Recent Commits (last ${commitWindowHours}h)\n`;
if (!commits.length) {
out += '- (No commits in window)\n\n';
} else {
for (const c of commits) {
out += `- \`${c.hash}\` ${c.msg} _(at ${c.date})_\n`;
}
out += '\n';
}
out += '## Workflow Status\n';
if (!status) {
out += '- No workflow status available\n\n';
} else {
out += '(No commits in window)\n';
}
out += `- Current phase: **${status.currentPhase ?? '-'}**\n`;
out += `- State: **${status.overallStatus ?? '-'}**\n`;
out += `- Last updated: ${status.lastUpdated ?? '-'}\n`;
out += `- Completed phases: ${(status.completedPhases ?? []).join(', ') || '-'}\n\n`;
out += `\n## Workflow Status\n`;
if (status) {
out += `- Current phase: **${status.currentPhase || '—'}**\n`;
out += `- State: **${status.overallStatus}**\n`;
out += `- Last updated: ${status.lastUpdated}\n`;
if (status.completedPhases && status.completedPhases.length)
out += `- Completed phases: ${status.completedPhases.join(', ')}\n`;
} else {
out += '- No workflow status available.\n';
}
if (status.overallStatus === 'running' && status.lastUpdated) {
const elapsed = minutesSince(status.lastUpdated, now);
if (elapsed >= stalledThresholdMinutes) {
out += '## ⚠️ Workflow Stalled\n';
out += `- Reason: No progress in ${elapsed} minutes (threshold: ${stalledThresholdMinutes}m). Last update: ${status.lastUpdated}\n`;
out += '- Recommendation: Restart or debug orchestrator.\n\n';
}
}
if (stalled.stalled) {
out += `\n## ⚠️ Workflow Stalled\n- Reason: ${stalled.reason}\n- Recommendation: ${stalled.recommend}\n`;
}
if (blockers.length > 0) {
out += `\n## Blockers\n`;
for (const b of blockers) {
out += `- ${b}\n`;
if (status.lastFailureReason) {
out += '## Blockers\n';
out += `- ❗ ${status.lastFailureReason}\n\n`;
}
}
out += `\n## Pending Phase Updates\n`;
if (pendingUpdates.length > 0) {
for (const e of pendingUpdates) {
out += `- [${e.eventType}] ${e.summary} (phase: ${e.phase}) at ${e.timestamp} [id: ${e.id}]\n`;
}
out += '## Pending Phase Updates\n';
if (!pending.length) {
out += '- All phase updates relayed\n';
} else {
out += '(All phase updates relayed)\n';
for (const p of pending) {
out += `- [${p.eventType}] ${p.summary} (phase: ${p.phase ?? '-'}) at ${p.timestamp} [id: ${p.id}]\n`;
}
}
console.log(out);
return out;
}
// CLI wrapper
if (require.main === module) {
main();
if (import.meta.url === `file://${process.argv[1]}`) {
generateMorningReport().catch((err) => {
console.error('Failed to generate morning report', err);
process.exit(1);
});
}
export default main;
export default generateMorningReport;

190
scripts/run-workflow.ts Normal file
View File

@ -0,0 +1,190 @@
#!/usr/bin/env ts-node
import * as path from 'path';
import { promises as fs } from 'fs';
import { SequentialOrchestrator } from '../src/backend/services/SequentialOrchestrator.ts';
import type { WorkflowStage, WorkflowContext } from '../src/backend/services/SequentialOrchestrator.ts';
import { WorkflowStatusManager } from '../src/backend/services/WorkflowStatusManager.ts';
type RunWorkflowOptions = {
mode?: 'start' | 'resume' | 'restart';
checkpointPath?: string;
statusPath?: string;
progressLogPath?: string;
maxRetriesPerPhase?: number;
now?: Date;
};
type CliArgs = {
mode: 'start' | 'resume' | 'restart';
checkpointPath?: string;
statusPath?: string;
progressLogPath?: string;
maxRetriesPerPhase?: number;
};
const DEFAULT_CHECKPOINT_PATH = path.join(process.cwd(), 'status', 'workflow-checkpoint.json');
const DEFAULT_STATUS_PATH = path.join(process.cwd(), 'status', 'workflow-status.json');
const DEFAULT_PROGRESS_LOG_PATH = path.join(process.cwd(), 'status', 'workflow-progress.jsonl');
function parseCliArgs(argv: string[]): CliArgs {
const out: CliArgs = { mode: 'resume' };
for (let i = 0; i < argv.length; i += 1) {
const arg = argv[i];
const next = argv[i + 1];
if (arg === '--mode' && next) {
if (next === 'start' || next === 'resume' || next === 'restart') {
out.mode = next;
i += 1;
}
continue;
}
if (arg === '--checkpoint' && next) {
out.checkpointPath = next;
i += 1;
continue;
}
if (arg === '--status' && next) {
out.statusPath = next;
i += 1;
continue;
}
if (arg === '--progress-log' && next) {
out.progressLogPath = next;
i += 1;
continue;
}
if (arg === '--max-retries' && next) {
const parsed = Number.parseInt(next, 10);
if (Number.isFinite(parsed) && parsed >= 0) {
out.maxRetriesPerPhase = parsed;
}
i += 1;
continue;
}
}
return out;
}
function createTimestampedSummary(label: string, now: Date): string {
return `${label} @ ${now.toISOString()}`;
}
function createStages(now: Date): WorkflowStage[] {
return [
{
id: 'review-plan',
name: 'Review TODO and roadmap',
execute: async (_context: WorkflowContext) => {
const todoPath = path.join(process.cwd(), 'TODO.md');
const roadmapPath = path.join(process.cwd(), 'ROADMAP.md');
const todo = await fs.readFile(todoPath, 'utf8');
const roadmap = await fs.readFile(roadmapPath, 'utf8');
const todoLines = todo.split(/\r?\n/).length;
const roadmapLines = roadmap.split(/\r?\n/).length;
return {
summary: createTimestampedSummary(`Loaded planning docs (TODO: ${todoLines} lines, ROADMAP: ${roadmapLines} lines)`, now),
metadata: { todoLines, roadmapLines },
nextAction: 'Proceed to implementation stage.',
};
},
},
{
id: 'verify-build',
name: 'Verify TypeScript build',
execute: async (_context: WorkflowContext) => {
// Lightweight validation step to keep workflow deterministic in tests.
const packageJsonPath = path.join(process.cwd(), 'package.json');
const pkgRaw = await fs.readFile(packageJsonPath, 'utf8');
const pkg = JSON.parse(pkgRaw) as { scripts?: Record<string, string> };
const hasBuildScript = Boolean(pkg.scripts?.build);
if (!hasBuildScript) {
throw new Error('Missing required npm script: build');
}
return {
summary: createTimestampedSummary('Validated project scripts and build entrypoint', now),
metadata: { hasBuildScript },
nextAction: 'Proceed to tests stage.',
};
},
},
{
id: 'verify-tests',
name: 'Verify test command availability',
execute: async (_context: WorkflowContext) => {
const packageJsonPath = path.join(process.cwd(), 'package.json');
const pkgRaw = await fs.readFile(packageJsonPath, 'utf8');
const pkg = JSON.parse(pkgRaw) as { scripts?: Record<string, string> };
const hasTestScript = Boolean(pkg.scripts?.test);
if (!hasTestScript) {
throw new Error('Missing required npm script: test');
}
return {
summary: createTimestampedSummary('Validated test script presence', now),
metadata: { hasTestScript },
nextAction: 'Workflow checks complete.',
};
},
},
];
}
export async function runWorkflow(options: RunWorkflowOptions = {}): Promise<void> {
const checkpointPath = options.checkpointPath ?? DEFAULT_CHECKPOINT_PATH;
const statusPath = options.statusPath ?? DEFAULT_STATUS_PATH;
const progressLogPath = options.progressLogPath ?? DEFAULT_PROGRESS_LOG_PATH;
const mode = options.mode ?? 'resume';
const maxRetriesPerPhase = options.maxRetriesPerPhase ?? 1;
const now = options.now ?? new Date();
const workflow = new SequentialOrchestrator({
checkpointPath,
maxRetriesPerPhase,
statusManager: new WorkflowStatusManager(statusPath),
progressLogPath,
});
const initialContext: WorkflowContext = {
runId: `workflow-${now.toISOString()}`,
startedAt: now.toISOString(),
metadata: {
initiatedBy: 'scripts/run-workflow.ts',
mode,
},
};
const stages = createStages(now);
if (mode === 'restart' || mode === 'start') {
await workflow.start(stages, initialContext);
return;
}
await workflow.resume(stages, initialContext);
}
if (import.meta.url === `file://${process.argv[1]}`) {
const args = parseCliArgs(process.argv.slice(2));
runWorkflow({
mode: args.mode,
checkpointPath: args.checkpointPath,
statusPath: args.statusPath,
progressLogPath: args.progressLogPath,
maxRetriesPerPhase: args.maxRetriesPerPhase,
}).catch((error) => {
const message = error instanceof Error ? error.message : String(error);
console.error(`[workflow:run] failed: ${message}`);
process.exitCode = 1;
});
}

View File

@ -12,17 +12,17 @@ export type PhaseProgressLogEntry = {
const PHASE_PROGRESS_LOG = path.join(process.cwd(), 'status/phase-progress.jsonl');
export async function logPhaseProgress(entry: PhaseProgressLogEntry): Promise<void> {
await fs.mkdir(path.dirname(PHASE_PROGRESS_LOG), { recursive: true });
await fs.appendFile(PHASE_PROGRESS_LOG, JSON.stringify(entry) + '\n', 'utf8');
export async function logPhaseProgress(entry: PhaseProgressLogEntry, logPath = PHASE_PROGRESS_LOG): Promise<void> {
await fs.mkdir(path.dirname(logPath), { recursive: true });
await fs.appendFile(logPath, JSON.stringify(entry) + '\n', 'utf8');
}
export async function getRecentPhaseProgress(limit = 20): Promise<PhaseProgressLogEntry[]> {
export async function getRecentPhaseProgress(limit = 20, logPath = PHASE_PROGRESS_LOG): Promise<PhaseProgressLogEntry[]> {
try {
const data = await fs.readFile(PHASE_PROGRESS_LOG, 'utf8');
const data = await fs.readFile(logPath, 'utf8');
const lines = data.trim().split(/\r?\n/).filter(Boolean);
return lines.slice(-limit).map(line => JSON.parse(line));
} catch (err) {
return lines.slice(-limit).map((line) => JSON.parse(line));
} catch {
return [];
}
}

View File

@ -1,8 +1,30 @@
import { promises as fs } from 'fs';
import path from 'path';
import { logPhaseProgress } from './PhaseProgressLogger';
import { WorkflowStatusManager, WorkflowStatus } from './WorkflowStatusManager';
import { appendPhaseUpdate } from './PhaseUpdateQueue';
import { logPhaseProgress } from './PhaseProgressLogger.ts';
import { WorkflowStatusManager } from './WorkflowStatusManager.ts';
import type { WorkflowStatus } from './WorkflowStatusManager.ts';
import { appendPhaseUpdate } from './PhaseUpdateQueue.ts';
export type WorkflowContext = {
runId: string;
startedAt: string;
metadata?: Record<string, unknown>;
[key: string]: unknown;
};
export type WorkflowStageResult = {
summary?: string;
metadata?: Record<string, unknown>;
nextAction?: string;
};
export type WorkflowStage = {
id: string;
name: string;
execute: (context: WorkflowContext) => Promise<WorkflowStageResult | void>;
retry?: number;
backoffMs?: number;
};
export type OrchestratorPhase<TInput, TOutput> = {
name: string;
@ -11,6 +33,7 @@ export type OrchestratorPhase<TInput, TOutput> = {
backoffMs?: number;
nextAction?: string;
};
export type PhaseAttemptResult = {
phase: string;
status: 'success' | 'failure';
@ -20,58 +43,72 @@ export type PhaseAttemptResult = {
failureReason?: string;
nextAction: string;
};
export type Checkpoint = {
currentPhase: number;
phaseResults: PhaseAttemptResult[];
inProgress?: boolean;
context?: WorkflowContext;
};
const DEFAULT_CHECKPOINT_FILE = path.join(process.cwd(), 'data/orchestrator-checkpoint.json');
const DEFAULT_STATUS_FILE = path.join(process.cwd(), 'status/workflow-status.json');
const DEFAULT_PROGRESS_LOG = path.join(process.cwd(), 'status/phase-progress.jsonl');
export class SequentialOrchestrator<TInput = any, TOutput = any> {
private phases: OrchestratorPhase<TInput, TOutput>[];
private phases?: OrchestratorPhase<TInput, TOutput>[];
private input?: TInput;
private checkpointPath: string;
private input: TInput;
private maxAttemptsPerPhasePerRun: number;
private maxRetriesPerPhase?: number;
private statusManager: WorkflowStatusManager;
private progressLogPath: string;
constructor(options: {
phases: OrchestratorPhase<TInput, TOutput>[];
phases?: OrchestratorPhase<TInput, TOutput>[];
checkpointPath?: string;
input: TInput;
input?: TInput;
maxAttemptsPerPhasePerRun?: number;
maxRetriesPerPhase?: number;
statusManager?: WorkflowStatusManager;
statusFilePath?: string;
}) {
progressLogPath?: string;
} = {}) {
this.phases = options.phases;
this.checkpointPath = options.checkpointPath || DEFAULT_CHECKPOINT_FILE;
this.input = options.input;
this.checkpointPath = options.checkpointPath || DEFAULT_CHECKPOINT_FILE;
this.maxAttemptsPerPhasePerRun = options.maxAttemptsPerPhasePerRun ?? Infinity;
this.statusManager = new WorkflowStatusManager(options.statusFilePath || DEFAULT_STATUS_FILE);
this.maxRetriesPerPhase = options.maxRetriesPerPhase;
this.statusManager = options.statusManager ?? new WorkflowStatusManager(options.statusFilePath || DEFAULT_STATUS_FILE);
this.progressLogPath = options.progressLogPath ?? DEFAULT_PROGRESS_LOG;
}
private async writeStatus(
stages: WorkflowStage[],
state: Partial<WorkflowStatus>,
extra: { currentPhaseIdx?: number; phaseResult?: PhaseAttemptResult } = {}
checkpoint: Checkpoint,
extra: { currentPhaseIdx?: number | null; phaseResult?: PhaseAttemptResult } = {}
) {
const checkpoint = await this.loadCheckpoint();
let completedPhases: string[] = [];
if (checkpoint) {
completedPhases = this.phases
.slice(0, checkpoint.currentPhase)
.map(p => p.name)
.filter(phaseName => checkpoint.phaseResults.some(r => r.phase === phaseName && r.status === 'success'));
}
const currentP = typeof extra.currentPhaseIdx === 'number' && this.phases[extra.currentPhaseIdx]?.name
? this.phases[extra.currentPhaseIdx].name
const completedPhases = stages
.slice(0, checkpoint.currentPhase)
.map((p) => p.name)
.filter((phaseName) => checkpoint.phaseResults.some((r) => r.phase === phaseName && r.status === 'success'));
const currentP = typeof extra.currentPhaseIdx === 'number' && stages[extra.currentPhaseIdx]?.name
? stages[extra.currentPhaseIdx].name
: null;
let overallStatus = state.overallStatus || 'idle';
if (extra.phaseResult && extra.phaseResult.status === 'failure') {
overallStatus = 'failed';
} else if (checkpoint && checkpoint.currentPhase === this.phases.length) {
} else if (checkpoint.currentPhase === stages.length) {
overallStatus = 'completed';
}
const lastFailureReason = (state.lastFailureReason === undefined && extra.phaseResult?.failureReason)
? extra.phaseResult?.failureReason || null
? extra.phaseResult.failureReason || null
: (state.lastFailureReason !== undefined ? state.lastFailureReason : null);
const workflowStatus: WorkflowStatus = {
currentPhase: currentP,
overallStatus,
@ -80,8 +117,10 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
nextAction: state.nextAction || extra.phaseResult?.nextAction || '',
completedPhases,
};
await this.statusManager.update(workflowStatus);
}
private async loadCheckpoint(): Promise<Checkpoint | null> {
try {
const txt = await fs.readFile(this.checkpointPath, 'utf8');
@ -90,72 +129,107 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
return null;
}
}
private async saveCheckpoint(checkpoint: Checkpoint) {
await fs.mkdir(path.dirname(this.checkpointPath), { recursive: true });
await fs.writeFile(this.checkpointPath, JSON.stringify(checkpoint, null, 2), 'utf8');
}
public async run(): Promise<void> {
private toStagesFromLegacyPhases(phases: OrchestratorPhase<TInput, TOutput>[], input: TInput): WorkflowStage[] {
return phases.map((phase, idx) => ({
id: `legacy-${idx + 1}`,
name: phase.name,
retry: phase.retry,
backoffMs: phase.backoffMs,
execute: async () => {
await phase.run(input);
return { nextAction: phase.nextAction ?? 'proceed' };
},
}));
}
private async executeStages(stages: WorkflowStage[], context: WorkflowContext, resetCheckpoint: boolean): Promise<void> {
let checkpoint = await this.loadCheckpoint();
if (!checkpoint) {
checkpoint = { currentPhase: 0, phaseResults: [] };
if (!checkpoint || resetCheckpoint) {
checkpoint = { currentPhase: 0, phaseResults: [], inProgress: true, context };
await this.saveCheckpoint(checkpoint);
await this.writeStatus(stages, { overallStatus: 'idle' }, checkpoint, { currentPhaseIdx: 0 });
} else if (!checkpoint.context) {
checkpoint.context = context;
await this.saveCheckpoint(checkpoint);
await this.writeStatus({ overallStatus: 'idle' }, { currentPhaseIdx: 0 });
}
while (checkpoint.currentPhase < this.phases.length) {
const phase = this.phases[checkpoint.currentPhase];
while (checkpoint.currentPhase < stages.length) {
const stage = stages[checkpoint.currentPhase];
let success = false;
let attempt = 1;
const maxRetries = (typeof phase.retry === 'number') ? phase.retry : 1;
const stageRetries = typeof stage.retry === 'number' ? stage.retry : (this.maxRetriesPerPhase ?? 1);
const maxAttemptsThisRun = Number.isFinite(this.maxAttemptsPerPhasePerRun)
? Math.min(maxRetries, this.maxAttemptsPerPhasePerRun)
: maxRetries;
? Math.min(stageRetries, this.maxAttemptsPerPhasePerRun)
: stageRetries;
await appendPhaseUpdate({
eventType: 'phase_started',
phase: phase.name,
summary: `Phase '${phase.name}' started`,
phase: stage.name,
summary: `Phase '${stage.name}' started`,
details: { attempt },
});
for (; attempt <= maxAttemptsThisRun && !success; attempt++) {
for (; attempt <= maxAttemptsThisRun && !success; attempt += 1) {
try {
await phase.run(this.input);
const result = await stage.execute(checkpoint.context ?? context);
const nextAction = result?.nextAction ?? 'proceed';
const res: PhaseAttemptResult = {
phase: phase.name,
phase: stage.name,
status: 'success',
attempt,
timestamp: new Date().toISOString(),
nextAction: 'proceed',
nextAction,
};
checkpoint.phaseResults.push(res);
checkpoint.currentPhase += 1;
checkpoint.inProgress = checkpoint.currentPhase < stages.length;
await this.saveCheckpoint(checkpoint);
await logPhaseProgress({
phase: res.phase,
status: res.status,
attempt: res.attempt,
timestamp: res.timestamp,
nextAction: res.nextAction,
});
}, this.progressLogPath);
await appendPhaseUpdate({
eventType: 'phase_succeeded',
phase: phase.name,
summary: `Phase '${phase.name}' succeeded`,
phase: stage.name,
summary: `Phase '${stage.name}' succeeded`,
details: { attempt },
});
success = true;
await this.writeStatus({ overallStatus: 'running', nextAction: phase.nextAction || '' }, { currentPhaseIdx: checkpoint.currentPhase, phaseResult: res });
await this.writeStatus(
stages,
{ overallStatus: checkpoint.currentPhase < stages.length ? 'running' : 'completed', nextAction },
checkpoint,
{ currentPhaseIdx: checkpoint.currentPhase, phaseResult: res }
);
} catch (err: any) {
const isLastAttempt = attempt === maxRetries;
// Always log every failed attempt
const isLastAttempt = attempt === stageRetries;
const message = err && err.message ? err.message : String(err);
const failRes: PhaseAttemptResult = {
phase: phase.name,
phase: stage.name,
status: 'failure',
attempt,
timestamp: new Date().toISOString(),
error: err && err.message ? err.message : String(err),
failureReason: err && err.message ? err.message : String(err),
error: message,
failureReason: message,
nextAction: !isLastAttempt ? 'retry' : 'manual intervention',
};
checkpoint.phaseResults.push(failRes);
await logPhaseProgress({
phase: failRes.phase,
status: failRes.status,
@ -163,30 +237,62 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
timestamp: failRes.timestamp,
failureReason: failRes.failureReason,
nextAction: failRes.nextAction,
});
}, this.progressLogPath);
if (!isLastAttempt && stage.backoffMs && stage.backoffMs > 0) {
await new Promise((resolve) => setTimeout(resolve, stage.backoffMs));
}
if (isLastAttempt) {
checkpoint.inProgress = false;
await appendPhaseUpdate({
eventType: 'phase_failed',
phase: phase.name,
summary: `Phase '${phase.name}' failed`,
details: { attempt, error: err && err.message ? err.message : String(err) },
phase: stage.name,
summary: `Phase '${stage.name}' failed`,
details: { attempt, error: message },
});
await this.saveCheckpoint(checkpoint);
await this.writeStatus({ overallStatus: 'failed', lastFailureReason: failRes.failureReason, nextAction: failRes.nextAction }, { currentPhaseIdx: checkpoint.currentPhase, phaseResult: failRes });
await this.writeStatus(
stages,
{ overallStatus: 'failed', lastFailureReason: failRes.failureReason, nextAction: failRes.nextAction },
checkpoint,
{ currentPhaseIdx: checkpoint.currentPhase, phaseResult: failRes }
);
return;
}
}
}
}
if (checkpoint.currentPhase === this.phases.length) {
await this.saveCheckpoint(checkpoint);
await appendPhaseUpdate({
eventType: 'workflow_completed',
phase: null,
summary: `Workflow completed successfully`,
details: { totalPhases: this.phases.length },
});
await this.writeStatus({ overallStatus: 'completed', nextAction: 'done' }, { currentPhaseIdx: null });
}
checkpoint.inProgress = false;
await this.saveCheckpoint(checkpoint);
await appendPhaseUpdate({
eventType: 'workflow_completed',
phase: null,
summary: 'Workflow completed successfully',
details: { totalPhases: stages.length },
});
await this.writeStatus(stages, { overallStatus: 'completed', nextAction: 'done' }, checkpoint, { currentPhaseIdx: null });
}
public async start(stages: WorkflowStage[], context: WorkflowContext): Promise<void> {
await this.executeStages(stages, context, true);
}
public async resume(stages: WorkflowStage[], context: WorkflowContext): Promise<void> {
await this.executeStages(stages, context, false);
}
// Backward-compatible API used by existing tests.
public async run(): Promise<void> {
const phases = this.phases ?? [];
const input = this.input as TInput;
const stages = this.toStagesFromLegacyPhases(phases, input);
const context: WorkflowContext = {
runId: `legacy-${new Date().toISOString()}`,
startedAt: new Date().toISOString(),
metadata: { source: 'SequentialOrchestrator.run' },
};
await this.resume(stages, context);
}
}