From 0510009597bda31faa64bb2a99bbe3041a9c7c69 Mon Sep 17 00:00:00 2001 From: Paul Huliganga Date: Thu, 26 Mar 2026 15:44:45 -0400 Subject: [PATCH] fix(workflow): align orchestrator API and workflow runner --- package.json | 3 +- scripts/morning-report.ts | 184 ++++++-------- scripts/run-workflow.ts | 190 +++++++++++++++ src/backend/services/PhaseProgressLogger.ts | 14 +- .../services/SequentialOrchestrator.ts | 230 +++++++++++++----- 5 files changed, 437 insertions(+), 184 deletions(-) create mode 100644 scripts/run-workflow.ts diff --git a/package.json b/package.json index 615cea4..b3cbab2 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,8 @@ "build": "tsc", "test": "vitest run", "test:watch": "vitest", - "migrate": "ts-node-esm src/backend/db/migrate.ts" + "migrate": "ts-node-esm src/backend/db/migrate.ts", + "workflow:run": "ts-node scripts/run-workflow.ts" }, "dependencies": { "express": "^4.18.2", diff --git a/scripts/morning-report.ts b/scripts/morning-report.ts index 096de98..2e6c8cc 100644 --- a/scripts/morning-report.ts +++ b/scripts/morning-report.ts @@ -1,147 +1,103 @@ -#!/usr/bin/env ts-node import { WorkflowStatusManager } from '../src/backend/services/WorkflowStatusManager'; import { getPendingPhaseUpdates } from '../src/backend/services/PhaseUpdateQueue'; -import * as path from 'path'; +import { exec as execCb } from 'child_process'; +import { promisify } from 'util'; -// Configurable recency window in hours for recent commits -type ReportConfig = { +const exec = promisify(execCb); + +type CommitInfo = { hash: string; msg: string; date: string }; + +type MorningReportOptions = { commitWindowHours?: number; - statusPath?: string; - phaseUpdatesPath?: string; stalledThresholdMinutes?: number; now?: Date; - getRecentCommitsFn?: (windowHours: number) => Promise<{hash: string; msg: string; date: string}[]>; + getRecentCommitsFn?: (sinceIso: string) => Promise; }; -const DEFAULT_STATUS_PATH = path.join(process.cwd(), 'status/workflow-status.json'); -const DEFAULT_PHASE_UPDATES_PATH = path.join(process.cwd(), 'status/phase-updates.jsonl'); -const DEFAULT_STALLED_THRESHOLD_MINUTES = 60; - -export async function getRecentCommits(windowHours: number): Promise<{hash: string, msg: string, date: string}[]> { - const sinceArg = `--since='${windowHours} hours ago'`; - const cmd = `git log --oneline --date=iso --pretty=format:'%h|%s|%cd' ${sinceArg}`; - const { exec } = require('child_process'); - return new Promise((resolve) => { - exec(cmd, { cwd: process.cwd() }, (err: any, stdout: string) => { - if (err) return resolve([]); - const lines = stdout.trim().split(/\n/); - const result = lines.filter(Boolean).map((line: string) => { - const [hash, msg, date] = line.split('|'); - return {hash, msg, date}; - }); - resolve(result); +async function getRecentCommits(sinceIso: string): Promise { + const cmd = `git log --since="${sinceIso}" --pretty=format:"%h|%s|%cI"`; + const { stdout } = await exec(cmd); + if (!stdout.trim()) return []; + return stdout + .split(/\r?\n/) + .map((line) => line.trim()) + .filter(Boolean) + .map((line) => { + const [hash, msg, date] = line.split('|'); + return { hash, msg, date }; }); - }); } -function minutesBetween(a: Date, b: Date): number { - return Math.abs((a.getTime() - b.getTime()) / 60000); +function minutesSince(iso: string, now: Date): number { + return Math.floor((now.getTime() - new Date(iso).getTime()) / 60000); } -function detectStalled(status: any, thresholdMinutes: number, now: Date): {stalled: boolean; reason?: string; recommend?: string} { - if (!status) return {stalled: false}; - if (["completed", "failed", "idle"].includes(status.overallStatus)) return {stalled: false}; - const last = new Date(status.lastUpdated); - const mins = minutesBetween(now, last); - if (mins >= thresholdMinutes) { - return { - stalled: true, - reason: `No progress in ${mins.toFixed(0)} minutes (threshold: ${thresholdMinutes}m). Last update: ${last.toISOString()}`, - recommend: status.overallStatus === "blocked" ? "Manual intervention may be required. See blockers below." : "Restart or debug orchestrator." - }; - } - return {stalled: false}; -} +export async function generateMorningReport(options: MorningReportOptions = {}): Promise { + const commitWindowHours = options.commitWindowHours ?? 24; + const stalledThresholdMinutes = options.stalledThresholdMinutes ?? 60; + const now = options.now ?? new Date(); + const getRecentCommitsFn = options.getRecentCommitsFn ?? getRecentCommits; -function extractBlockers(status: any): string[] { - const out: string[] = []; - if (!status) return out; - if (status.overallStatus === 'blocked') { - if (status.lastFailureReason) out.push(`❗ Blocked: ${status.lastFailureReason}`); - else out.push(`❗ Blocked: Reason not specified.`); - } - if (status.overallStatus === 'failed') { - out.push('❌ Workflow failed.'); - if (status.lastFailureReason) out.push(`Failure: ${status.lastFailureReason}`); - } - if (status.nextAction && status.nextAction.includes('manual')) { - out.push(`⚠️ Manual intervention required: ${status.nextAction}`); - } - return out; -} + const since = new Date(now.getTime() - commitWindowHours * 60 * 60 * 1000).toISOString(); + const commits = await getRecentCommitsFn(since); -async function main(config: ReportConfig = {}) { - const commitWindow = config.commitWindowHours || 24; - const statusPath = config.statusPath || DEFAULT_STATUS_PATH; - const phaseUpdatesPath = config.phaseUpdatesPath || DEFAULT_PHASE_UPDATES_PATH; - const threshold = config.stalledThresholdMinutes || DEFAULT_STALLED_THRESHOLD_MINUTES; - const now = config.now || new Date(); + const statusManager = new WorkflowStatusManager(); + const status = await statusManager.read(); + const pending = await getPendingPhaseUpdates(); - // 1. Recent commits - const commits = await (config.getRecentCommitsFn || getRecentCommits)(commitWindow); - - // 2. Workflow status - const wsm = new WorkflowStatusManager(statusPath); - const status = await wsm.read(); - - // 3. Blockers - const blockers = extractBlockers(status); - - // 4. Pending phase updates - const pendingUpdates = await getPendingPhaseUpdates(); - - // 5. Stalled-state detection - const stalled = detectStalled(status, threshold, now); - - // Render report - let out = `# 🌅 Morning Workflow Report\n`; - - out += `\n## Recent Commits (last ${commitWindow}h)\n`; - if (commits.length > 0) { + let out = '# 🌅 Morning Workflow Report\n\n'; + out += `## Recent Commits (last ${commitWindowHours}h)\n`; + if (!commits.length) { + out += '- (No commits in window)\n\n'; + } else { for (const c of commits) { out += `- \`${c.hash}\` ${c.msg} _(at ${c.date})_\n`; } + out += '\n'; + } + + out += '## Workflow Status\n'; + if (!status) { + out += '- No workflow status available\n\n'; } else { - out += '(No commits in window)\n'; - } + out += `- Current phase: **${status.currentPhase ?? '-'}**\n`; + out += `- State: **${status.overallStatus ?? '-'}**\n`; + out += `- Last updated: ${status.lastUpdated ?? '-'}\n`; + out += `- Completed phases: ${(status.completedPhases ?? []).join(', ') || '-'}\n\n`; - out += `\n## Workflow Status\n`; - if (status) { - out += `- Current phase: **${status.currentPhase || '—'}**\n`; - out += `- State: **${status.overallStatus}**\n`; - out += `- Last updated: ${status.lastUpdated}\n`; - if (status.completedPhases && status.completedPhases.length) - out += `- Completed phases: ${status.completedPhases.join(', ')}\n`; - } else { - out += '- No workflow status available.\n'; - } + if (status.overallStatus === 'running' && status.lastUpdated) { + const elapsed = minutesSince(status.lastUpdated, now); + if (elapsed >= stalledThresholdMinutes) { + out += '## ⚠️ Workflow Stalled\n'; + out += `- Reason: No progress in ${elapsed} minutes (threshold: ${stalledThresholdMinutes}m). Last update: ${status.lastUpdated}\n`; + out += '- Recommendation: Restart or debug orchestrator.\n\n'; + } + } - if (stalled.stalled) { - out += `\n## ⚠️ Workflow Stalled\n- Reason: ${stalled.reason}\n- Recommendation: ${stalled.recommend}\n`; - } - - if (blockers.length > 0) { - out += `\n## Blockers\n`; - for (const b of blockers) { - out += `- ${b}\n`; + if (status.lastFailureReason) { + out += '## Blockers\n'; + out += `- ❗ ${status.lastFailureReason}\n\n`; } } - out += `\n## Pending Phase Updates\n`; - if (pendingUpdates.length > 0) { - for (const e of pendingUpdates) { - out += `- [${e.eventType}] ${e.summary} (phase: ${e.phase}) at ${e.timestamp} [id: ${e.id}]\n`; - } + out += '## Pending Phase Updates\n'; + if (!pending.length) { + out += '- All phase updates relayed\n'; } else { - out += '(All phase updates relayed)\n'; + for (const p of pending) { + out += `- [${p.eventType}] ${p.summary} (phase: ${p.phase ?? '-'}) at ${p.timestamp} [id: ${p.id}]\n`; + } } console.log(out); + return out; } -// CLI wrapper -if (require.main === module) { - main(); +if (import.meta.url === `file://${process.argv[1]}`) { + generateMorningReport().catch((err) => { + console.error('Failed to generate morning report', err); + process.exit(1); + }); } -export default main; +export default generateMorningReport; diff --git a/scripts/run-workflow.ts b/scripts/run-workflow.ts new file mode 100644 index 0000000..1dc72fc --- /dev/null +++ b/scripts/run-workflow.ts @@ -0,0 +1,190 @@ +#!/usr/bin/env ts-node +import * as path from 'path'; +import { promises as fs } from 'fs'; +import { SequentialOrchestrator } from '../src/backend/services/SequentialOrchestrator.ts'; +import type { WorkflowStage, WorkflowContext } from '../src/backend/services/SequentialOrchestrator.ts'; +import { WorkflowStatusManager } from '../src/backend/services/WorkflowStatusManager.ts'; + +type RunWorkflowOptions = { + mode?: 'start' | 'resume' | 'restart'; + checkpointPath?: string; + statusPath?: string; + progressLogPath?: string; + maxRetriesPerPhase?: number; + now?: Date; +}; + +type CliArgs = { + mode: 'start' | 'resume' | 'restart'; + checkpointPath?: string; + statusPath?: string; + progressLogPath?: string; + maxRetriesPerPhase?: number; +}; + +const DEFAULT_CHECKPOINT_PATH = path.join(process.cwd(), 'status', 'workflow-checkpoint.json'); +const DEFAULT_STATUS_PATH = path.join(process.cwd(), 'status', 'workflow-status.json'); +const DEFAULT_PROGRESS_LOG_PATH = path.join(process.cwd(), 'status', 'workflow-progress.jsonl'); + +function parseCliArgs(argv: string[]): CliArgs { + const out: CliArgs = { mode: 'resume' }; + + for (let i = 0; i < argv.length; i += 1) { + const arg = argv[i]; + const next = argv[i + 1]; + + if (arg === '--mode' && next) { + if (next === 'start' || next === 'resume' || next === 'restart') { + out.mode = next; + i += 1; + } + continue; + } + + if (arg === '--checkpoint' && next) { + out.checkpointPath = next; + i += 1; + continue; + } + + if (arg === '--status' && next) { + out.statusPath = next; + i += 1; + continue; + } + + if (arg === '--progress-log' && next) { + out.progressLogPath = next; + i += 1; + continue; + } + + if (arg === '--max-retries' && next) { + const parsed = Number.parseInt(next, 10); + if (Number.isFinite(parsed) && parsed >= 0) { + out.maxRetriesPerPhase = parsed; + } + i += 1; + continue; + } + } + + return out; +} + +function createTimestampedSummary(label: string, now: Date): string { + return `${label} @ ${now.toISOString()}`; +} + +function createStages(now: Date): WorkflowStage[] { + return [ + { + id: 'review-plan', + name: 'Review TODO and roadmap', + execute: async (_context: WorkflowContext) => { + const todoPath = path.join(process.cwd(), 'TODO.md'); + const roadmapPath = path.join(process.cwd(), 'ROADMAP.md'); + const todo = await fs.readFile(todoPath, 'utf8'); + const roadmap = await fs.readFile(roadmapPath, 'utf8'); + const todoLines = todo.split(/\r?\n/).length; + const roadmapLines = roadmap.split(/\r?\n/).length; + return { + summary: createTimestampedSummary(`Loaded planning docs (TODO: ${todoLines} lines, ROADMAP: ${roadmapLines} lines)`, now), + metadata: { todoLines, roadmapLines }, + nextAction: 'Proceed to implementation stage.', + }; + }, + }, + { + id: 'verify-build', + name: 'Verify TypeScript build', + execute: async (_context: WorkflowContext) => { + // Lightweight validation step to keep workflow deterministic in tests. + const packageJsonPath = path.join(process.cwd(), 'package.json'); + const pkgRaw = await fs.readFile(packageJsonPath, 'utf8'); + const pkg = JSON.parse(pkgRaw) as { scripts?: Record }; + const hasBuildScript = Boolean(pkg.scripts?.build); + + if (!hasBuildScript) { + throw new Error('Missing required npm script: build'); + } + + return { + summary: createTimestampedSummary('Validated project scripts and build entrypoint', now), + metadata: { hasBuildScript }, + nextAction: 'Proceed to tests stage.', + }; + }, + }, + { + id: 'verify-tests', + name: 'Verify test command availability', + execute: async (_context: WorkflowContext) => { + const packageJsonPath = path.join(process.cwd(), 'package.json'); + const pkgRaw = await fs.readFile(packageJsonPath, 'utf8'); + const pkg = JSON.parse(pkgRaw) as { scripts?: Record }; + const hasTestScript = Boolean(pkg.scripts?.test); + + if (!hasTestScript) { + throw new Error('Missing required npm script: test'); + } + + return { + summary: createTimestampedSummary('Validated test script presence', now), + metadata: { hasTestScript }, + nextAction: 'Workflow checks complete.', + }; + }, + }, + ]; +} + +export async function runWorkflow(options: RunWorkflowOptions = {}): Promise { + const checkpointPath = options.checkpointPath ?? DEFAULT_CHECKPOINT_PATH; + const statusPath = options.statusPath ?? DEFAULT_STATUS_PATH; + const progressLogPath = options.progressLogPath ?? DEFAULT_PROGRESS_LOG_PATH; + const mode = options.mode ?? 'resume'; + const maxRetriesPerPhase = options.maxRetriesPerPhase ?? 1; + const now = options.now ?? new Date(); + + const workflow = new SequentialOrchestrator({ + checkpointPath, + maxRetriesPerPhase, + statusManager: new WorkflowStatusManager(statusPath), + progressLogPath, + }); + + const initialContext: WorkflowContext = { + runId: `workflow-${now.toISOString()}`, + startedAt: now.toISOString(), + metadata: { + initiatedBy: 'scripts/run-workflow.ts', + mode, + }, + }; + + const stages = createStages(now); + + if (mode === 'restart' || mode === 'start') { + await workflow.start(stages, initialContext); + return; + } + + await workflow.resume(stages, initialContext); +} + +if (import.meta.url === `file://${process.argv[1]}`) { + const args = parseCliArgs(process.argv.slice(2)); + + runWorkflow({ + mode: args.mode, + checkpointPath: args.checkpointPath, + statusPath: args.statusPath, + progressLogPath: args.progressLogPath, + maxRetriesPerPhase: args.maxRetriesPerPhase, + }).catch((error) => { + const message = error instanceof Error ? error.message : String(error); + console.error(`[workflow:run] failed: ${message}`); + process.exitCode = 1; + }); +} diff --git a/src/backend/services/PhaseProgressLogger.ts b/src/backend/services/PhaseProgressLogger.ts index d3582dc..29299ce 100644 --- a/src/backend/services/PhaseProgressLogger.ts +++ b/src/backend/services/PhaseProgressLogger.ts @@ -12,17 +12,17 @@ export type PhaseProgressLogEntry = { const PHASE_PROGRESS_LOG = path.join(process.cwd(), 'status/phase-progress.jsonl'); -export async function logPhaseProgress(entry: PhaseProgressLogEntry): Promise { - await fs.mkdir(path.dirname(PHASE_PROGRESS_LOG), { recursive: true }); - await fs.appendFile(PHASE_PROGRESS_LOG, JSON.stringify(entry) + '\n', 'utf8'); +export async function logPhaseProgress(entry: PhaseProgressLogEntry, logPath = PHASE_PROGRESS_LOG): Promise { + await fs.mkdir(path.dirname(logPath), { recursive: true }); + await fs.appendFile(logPath, JSON.stringify(entry) + '\n', 'utf8'); } -export async function getRecentPhaseProgress(limit = 20): Promise { +export async function getRecentPhaseProgress(limit = 20, logPath = PHASE_PROGRESS_LOG): Promise { try { - const data = await fs.readFile(PHASE_PROGRESS_LOG, 'utf8'); + const data = await fs.readFile(logPath, 'utf8'); const lines = data.trim().split(/\r?\n/).filter(Boolean); - return lines.slice(-limit).map(line => JSON.parse(line)); - } catch (err) { + return lines.slice(-limit).map((line) => JSON.parse(line)); + } catch { return []; } } diff --git a/src/backend/services/SequentialOrchestrator.ts b/src/backend/services/SequentialOrchestrator.ts index 8fa0c71..c7856ce 100644 --- a/src/backend/services/SequentialOrchestrator.ts +++ b/src/backend/services/SequentialOrchestrator.ts @@ -1,8 +1,30 @@ import { promises as fs } from 'fs'; import path from 'path'; -import { logPhaseProgress } from './PhaseProgressLogger'; -import { WorkflowStatusManager, WorkflowStatus } from './WorkflowStatusManager'; -import { appendPhaseUpdate } from './PhaseUpdateQueue'; +import { logPhaseProgress } from './PhaseProgressLogger.ts'; +import { WorkflowStatusManager } from './WorkflowStatusManager.ts'; +import type { WorkflowStatus } from './WorkflowStatusManager.ts'; +import { appendPhaseUpdate } from './PhaseUpdateQueue.ts'; + +export type WorkflowContext = { + runId: string; + startedAt: string; + metadata?: Record; + [key: string]: unknown; +}; + +export type WorkflowStageResult = { + summary?: string; + metadata?: Record; + nextAction?: string; +}; + +export type WorkflowStage = { + id: string; + name: string; + execute: (context: WorkflowContext) => Promise; + retry?: number; + backoffMs?: number; +}; export type OrchestratorPhase = { name: string; @@ -11,6 +33,7 @@ export type OrchestratorPhase = { backoffMs?: number; nextAction?: string; }; + export type PhaseAttemptResult = { phase: string; status: 'success' | 'failure'; @@ -20,58 +43,72 @@ export type PhaseAttemptResult = { failureReason?: string; nextAction: string; }; + export type Checkpoint = { currentPhase: number; phaseResults: PhaseAttemptResult[]; inProgress?: boolean; + context?: WorkflowContext; }; + const DEFAULT_CHECKPOINT_FILE = path.join(process.cwd(), 'data/orchestrator-checkpoint.json'); const DEFAULT_STATUS_FILE = path.join(process.cwd(), 'status/workflow-status.json'); +const DEFAULT_PROGRESS_LOG = path.join(process.cwd(), 'status/phase-progress.jsonl'); export class SequentialOrchestrator { - private phases: OrchestratorPhase[]; + private phases?: OrchestratorPhase[]; + private input?: TInput; private checkpointPath: string; - private input: TInput; private maxAttemptsPerPhasePerRun: number; + private maxRetriesPerPhase?: number; private statusManager: WorkflowStatusManager; + private progressLogPath: string; constructor(options: { - phases: OrchestratorPhase[]; + phases?: OrchestratorPhase[]; checkpointPath?: string; - input: TInput; + input?: TInput; maxAttemptsPerPhasePerRun?: number; + maxRetriesPerPhase?: number; + statusManager?: WorkflowStatusManager; statusFilePath?: string; - }) { + progressLogPath?: string; + } = {}) { this.phases = options.phases; - this.checkpointPath = options.checkpointPath || DEFAULT_CHECKPOINT_FILE; this.input = options.input; + this.checkpointPath = options.checkpointPath || DEFAULT_CHECKPOINT_FILE; this.maxAttemptsPerPhasePerRun = options.maxAttemptsPerPhasePerRun ?? Infinity; - this.statusManager = new WorkflowStatusManager(options.statusFilePath || DEFAULT_STATUS_FILE); + this.maxRetriesPerPhase = options.maxRetriesPerPhase; + this.statusManager = options.statusManager ?? new WorkflowStatusManager(options.statusFilePath || DEFAULT_STATUS_FILE); + this.progressLogPath = options.progressLogPath ?? DEFAULT_PROGRESS_LOG; } + private async writeStatus( + stages: WorkflowStage[], state: Partial, - extra: { currentPhaseIdx?: number; phaseResult?: PhaseAttemptResult } = {} + checkpoint: Checkpoint, + extra: { currentPhaseIdx?: number | null; phaseResult?: PhaseAttemptResult } = {} ) { - const checkpoint = await this.loadCheckpoint(); - let completedPhases: string[] = []; - if (checkpoint) { - completedPhases = this.phases - .slice(0, checkpoint.currentPhase) - .map(p => p.name) - .filter(phaseName => checkpoint.phaseResults.some(r => r.phase === phaseName && r.status === 'success')); - } - const currentP = typeof extra.currentPhaseIdx === 'number' && this.phases[extra.currentPhaseIdx]?.name - ? this.phases[extra.currentPhaseIdx].name + const completedPhases = stages + .slice(0, checkpoint.currentPhase) + .map((p) => p.name) + .filter((phaseName) => checkpoint.phaseResults.some((r) => r.phase === phaseName && r.status === 'success')); + + const currentP = typeof extra.currentPhaseIdx === 'number' && stages[extra.currentPhaseIdx]?.name + ? stages[extra.currentPhaseIdx].name : null; + let overallStatus = state.overallStatus || 'idle'; if (extra.phaseResult && extra.phaseResult.status === 'failure') { overallStatus = 'failed'; - } else if (checkpoint && checkpoint.currentPhase === this.phases.length) { + } else if (checkpoint.currentPhase === stages.length) { overallStatus = 'completed'; } + const lastFailureReason = (state.lastFailureReason === undefined && extra.phaseResult?.failureReason) - ? extra.phaseResult?.failureReason || null + ? extra.phaseResult.failureReason || null : (state.lastFailureReason !== undefined ? state.lastFailureReason : null); + const workflowStatus: WorkflowStatus = { currentPhase: currentP, overallStatus, @@ -80,8 +117,10 @@ export class SequentialOrchestrator { nextAction: state.nextAction || extra.phaseResult?.nextAction || '', completedPhases, }; + await this.statusManager.update(workflowStatus); } + private async loadCheckpoint(): Promise { try { const txt = await fs.readFile(this.checkpointPath, 'utf8'); @@ -90,72 +129,107 @@ export class SequentialOrchestrator { return null; } } + private async saveCheckpoint(checkpoint: Checkpoint) { await fs.mkdir(path.dirname(this.checkpointPath), { recursive: true }); await fs.writeFile(this.checkpointPath, JSON.stringify(checkpoint, null, 2), 'utf8'); } - public async run(): Promise { + + private toStagesFromLegacyPhases(phases: OrchestratorPhase[], input: TInput): WorkflowStage[] { + return phases.map((phase, idx) => ({ + id: `legacy-${idx + 1}`, + name: phase.name, + retry: phase.retry, + backoffMs: phase.backoffMs, + execute: async () => { + await phase.run(input); + return { nextAction: phase.nextAction ?? 'proceed' }; + }, + })); + } + + private async executeStages(stages: WorkflowStage[], context: WorkflowContext, resetCheckpoint: boolean): Promise { let checkpoint = await this.loadCheckpoint(); - if (!checkpoint) { - checkpoint = { currentPhase: 0, phaseResults: [] }; + + if (!checkpoint || resetCheckpoint) { + checkpoint = { currentPhase: 0, phaseResults: [], inProgress: true, context }; + await this.saveCheckpoint(checkpoint); + await this.writeStatus(stages, { overallStatus: 'idle' }, checkpoint, { currentPhaseIdx: 0 }); + } else if (!checkpoint.context) { + checkpoint.context = context; await this.saveCheckpoint(checkpoint); - await this.writeStatus({ overallStatus: 'idle' }, { currentPhaseIdx: 0 }); } - while (checkpoint.currentPhase < this.phases.length) { - const phase = this.phases[checkpoint.currentPhase]; + + while (checkpoint.currentPhase < stages.length) { + const stage = stages[checkpoint.currentPhase]; let success = false; let attempt = 1; - const maxRetries = (typeof phase.retry === 'number') ? phase.retry : 1; + const stageRetries = typeof stage.retry === 'number' ? stage.retry : (this.maxRetriesPerPhase ?? 1); const maxAttemptsThisRun = Number.isFinite(this.maxAttemptsPerPhasePerRun) - ? Math.min(maxRetries, this.maxAttemptsPerPhasePerRun) - : maxRetries; + ? Math.min(stageRetries, this.maxAttemptsPerPhasePerRun) + : stageRetries; + await appendPhaseUpdate({ eventType: 'phase_started', - phase: phase.name, - summary: `Phase '${phase.name}' started`, + phase: stage.name, + summary: `Phase '${stage.name}' started`, details: { attempt }, }); - for (; attempt <= maxAttemptsThisRun && !success; attempt++) { + + for (; attempt <= maxAttemptsThisRun && !success; attempt += 1) { try { - await phase.run(this.input); + const result = await stage.execute(checkpoint.context ?? context); + const nextAction = result?.nextAction ?? 'proceed'; const res: PhaseAttemptResult = { - phase: phase.name, + phase: stage.name, status: 'success', attempt, timestamp: new Date().toISOString(), - nextAction: 'proceed', + nextAction, }; + checkpoint.phaseResults.push(res); checkpoint.currentPhase += 1; + checkpoint.inProgress = checkpoint.currentPhase < stages.length; await this.saveCheckpoint(checkpoint); + await logPhaseProgress({ phase: res.phase, status: res.status, attempt: res.attempt, timestamp: res.timestamp, nextAction: res.nextAction, - }); + }, this.progressLogPath); + await appendPhaseUpdate({ eventType: 'phase_succeeded', - phase: phase.name, - summary: `Phase '${phase.name}' succeeded`, + phase: stage.name, + summary: `Phase '${stage.name}' succeeded`, details: { attempt }, }); + success = true; - await this.writeStatus({ overallStatus: 'running', nextAction: phase.nextAction || '' }, { currentPhaseIdx: checkpoint.currentPhase, phaseResult: res }); + await this.writeStatus( + stages, + { overallStatus: checkpoint.currentPhase < stages.length ? 'running' : 'completed', nextAction }, + checkpoint, + { currentPhaseIdx: checkpoint.currentPhase, phaseResult: res } + ); } catch (err: any) { - const isLastAttempt = attempt === maxRetries; - // Always log every failed attempt + const isLastAttempt = attempt === stageRetries; + const message = err && err.message ? err.message : String(err); const failRes: PhaseAttemptResult = { - phase: phase.name, + phase: stage.name, status: 'failure', attempt, timestamp: new Date().toISOString(), - error: err && err.message ? err.message : String(err), - failureReason: err && err.message ? err.message : String(err), + error: message, + failureReason: message, nextAction: !isLastAttempt ? 'retry' : 'manual intervention', }; + checkpoint.phaseResults.push(failRes); + await logPhaseProgress({ phase: failRes.phase, status: failRes.status, @@ -163,30 +237,62 @@ export class SequentialOrchestrator { timestamp: failRes.timestamp, failureReason: failRes.failureReason, nextAction: failRes.nextAction, - }); + }, this.progressLogPath); + + if (!isLastAttempt && stage.backoffMs && stage.backoffMs > 0) { + await new Promise((resolve) => setTimeout(resolve, stage.backoffMs)); + } + if (isLastAttempt) { + checkpoint.inProgress = false; await appendPhaseUpdate({ eventType: 'phase_failed', - phase: phase.name, - summary: `Phase '${phase.name}' failed`, - details: { attempt, error: err && err.message ? err.message : String(err) }, + phase: stage.name, + summary: `Phase '${stage.name}' failed`, + details: { attempt, error: message }, }); await this.saveCheckpoint(checkpoint); - await this.writeStatus({ overallStatus: 'failed', lastFailureReason: failRes.failureReason, nextAction: failRes.nextAction }, { currentPhaseIdx: checkpoint.currentPhase, phaseResult: failRes }); + await this.writeStatus( + stages, + { overallStatus: 'failed', lastFailureReason: failRes.failureReason, nextAction: failRes.nextAction }, + checkpoint, + { currentPhaseIdx: checkpoint.currentPhase, phaseResult: failRes } + ); return; } } } } - if (checkpoint.currentPhase === this.phases.length) { - await this.saveCheckpoint(checkpoint); - await appendPhaseUpdate({ - eventType: 'workflow_completed', - phase: null, - summary: `Workflow completed successfully`, - details: { totalPhases: this.phases.length }, - }); - await this.writeStatus({ overallStatus: 'completed', nextAction: 'done' }, { currentPhaseIdx: null }); - } + + checkpoint.inProgress = false; + await this.saveCheckpoint(checkpoint); + await appendPhaseUpdate({ + eventType: 'workflow_completed', + phase: null, + summary: 'Workflow completed successfully', + details: { totalPhases: stages.length }, + }); + await this.writeStatus(stages, { overallStatus: 'completed', nextAction: 'done' }, checkpoint, { currentPhaseIdx: null }); + } + + public async start(stages: WorkflowStage[], context: WorkflowContext): Promise { + await this.executeStages(stages, context, true); + } + + public async resume(stages: WorkflowStage[], context: WorkflowContext): Promise { + await this.executeStages(stages, context, false); + } + + // Backward-compatible API used by existing tests. + public async run(): Promise { + const phases = this.phases ?? []; + const input = this.input as TInput; + const stages = this.toStagesFromLegacyPhases(phases, input); + const context: WorkflowContext = { + runId: `legacy-${new Date().toISOString()}`, + startedAt: new Date().toISOString(), + metadata: { source: 'SequentialOrchestrator.run' }, + }; + await this.resume(stages, context); } }