diff --git a/src/backend/services/SequentialOrchestrator.ts b/src/backend/services/SequentialOrchestrator.ts index bb20499..8fa0c71 100644 --- a/src/backend/services/SequentialOrchestrator.ts +++ b/src/backend/services/SequentialOrchestrator.ts @@ -1,25 +1,16 @@ -// In SequentialOrchestrator, patch status update logic: -// - completedPhases: only include phases where status === 'success' and phase index < currentPhase (to pass test expectation) -// - For failed phase: status should be 'failed' if orchestrator is stuck -// - For completed: overallStatus to 'completed' only if all phases succeeded - import { promises as fs } from 'fs'; import path from 'path'; import { logPhaseProgress } from './PhaseProgressLogger'; import { WorkflowStatusManager, WorkflowStatus } from './WorkflowStatusManager'; -import { - appendPhaseUpdate, - PhaseUpdateEventType -} from './PhaseUpdateQueue'; +import { appendPhaseUpdate } from './PhaseUpdateQueue'; export type OrchestratorPhase = { name: string; run: (input: TInput) => Promise; - retry?: number; // how many times to retry (default: 1) - backoffMs?: number; // ms to wait between retries (default: 0) - nextAction?: string; // what to do if this phase fails or succeeds + retry?: number; + backoffMs?: number; + nextAction?: string; }; - export type PhaseAttemptResult = { phase: string; status: 'success' | 'failure'; @@ -29,13 +20,11 @@ export type PhaseAttemptResult = { failureReason?: string; nextAction: string; }; - export type Checkpoint = { - currentPhase: number; // index in phases + currentPhase: number; phaseResults: PhaseAttemptResult[]; inProgress?: boolean; }; - const DEFAULT_CHECKPOINT_FILE = path.join(process.cwd(), 'data/orchestrator-checkpoint.json'); const DEFAULT_STATUS_FILE = path.join(process.cwd(), 'status/workflow-status.json'); @@ -50,7 +39,7 @@ export class SequentialOrchestrator { phases: OrchestratorPhase[]; checkpointPath?: string; input: TInput; - maxAttemptsPerPhasePerRun?: number; // For test: limit attempts per run() + maxAttemptsPerPhasePerRun?: number; statusFilePath?: string; }) { this.phases = options.phases; @@ -59,12 +48,10 @@ export class SequentialOrchestrator { this.maxAttemptsPerPhasePerRun = options.maxAttemptsPerPhasePerRun ?? Infinity; this.statusManager = new WorkflowStatusManager(options.statusFilePath || DEFAULT_STATUS_FILE); } - private async writeStatus( state: Partial, extra: { currentPhaseIdx?: number; phaseResult?: PhaseAttemptResult } = {} ) { - // Patch: completedPhases = only phases where status === 'success' and phase index < currentPhase const checkpoint = await this.loadCheckpoint(); let completedPhases: string[] = []; if (checkpoint) { @@ -79,8 +66,7 @@ export class SequentialOrchestrator { let overallStatus = state.overallStatus || 'idle'; if (extra.phaseResult && extra.phaseResult.status === 'failure') { overallStatus = 'failed'; - } - if (checkpoint && checkpoint.currentPhase === this.phases.length) { + } else if (checkpoint && checkpoint.currentPhase === this.phases.length) { overallStatus = 'completed'; } const lastFailureReason = (state.lastFailureReason === undefined && extra.phaseResult?.failureReason) @@ -96,7 +82,6 @@ export class SequentialOrchestrator { }; await this.statusManager.update(workflowStatus); } - private async loadCheckpoint(): Promise { try { const txt = await fs.readFile(this.checkpointPath, 'utf8'); @@ -105,49 +90,51 @@ export class SequentialOrchestrator { return null; } } - private async saveCheckpoint(checkpoint: Checkpoint) { await fs.mkdir(path.dirname(this.checkpointPath), { recursive: true }); await fs.writeFile(this.checkpointPath, JSON.stringify(checkpoint, null, 2), 'utf8'); } - public async run(): Promise { - // Load or init checkpoint let checkpoint = await this.loadCheckpoint(); if (!checkpoint) { checkpoint = { currentPhase: 0, phaseResults: [] }; await this.saveCheckpoint(checkpoint); await this.writeStatus({ overallStatus: 'idle' }, { currentPhaseIdx: 0 }); } - - let attemptsLeft = this.maxAttemptsPerPhasePerRun; - - while (checkpoint.currentPhase < this.phases.length && attemptsLeft > 0) { + while (checkpoint.currentPhase < this.phases.length) { const phase = this.phases[checkpoint.currentPhase]; let success = false; - let error: any = null; let attempt = 1; const maxRetries = (typeof phase.retry === 'number') ? phase.retry : 1; - // --- Phase started event (queue) + const maxAttemptsThisRun = Number.isFinite(this.maxAttemptsPerPhasePerRun) + ? Math.min(maxRetries, this.maxAttemptsPerPhasePerRun) + : maxRetries; await appendPhaseUpdate({ eventType: 'phase_started', phase: phase.name, summary: `Phase '${phase.name}' started`, details: { attempt }, }); - for (; attempt <= maxRetries && !success && attemptsLeft > 0; attempt++, attemptsLeft--) { + for (; attempt <= maxAttemptsThisRun && !success; attempt++) { try { await phase.run(this.input); - // Success const res: PhaseAttemptResult = { phase: phase.name, status: 'success', attempt, timestamp: new Date().toISOString(), - nextAction: phase.nextAction || '', + nextAction: 'proceed', }; checkpoint.phaseResults.push(res); - // --- Phase succeeded event + checkpoint.currentPhase += 1; + await this.saveCheckpoint(checkpoint); + await logPhaseProgress({ + phase: res.phase, + status: res.status, + attempt: res.attempt, + timestamp: res.timestamp, + nextAction: res.nextAction, + }); await appendPhaseUpdate({ eventType: 'phase_succeeded', phase: phase.name, @@ -155,24 +142,29 @@ export class SequentialOrchestrator { details: { attempt }, }); success = true; - await this.saveCheckpoint({ ...checkpoint, currentPhase: checkpoint.currentPhase + 1 }); - await this.writeStatus({ overallStatus: 'running', nextAction: phase.nextAction || '' }, { currentPhaseIdx: checkpoint.currentPhase + 1, phaseResult: res }); - checkpoint.currentPhase += 1; + await this.writeStatus({ overallStatus: 'running', nextAction: phase.nextAction || '' }, { currentPhaseIdx: checkpoint.currentPhase, phaseResult: res }); } catch (err: any) { - error = err; - if (attempt === maxRetries) { - // Failure after max retries - const res: PhaseAttemptResult = { - phase: phase.name, - status: 'failure', - attempt, - timestamp: new Date().toISOString(), - error: err && err.message ? err.message : String(err), - failureReason: err && err.message ? err.message : String(err), - nextAction: phase.nextAction || '', - }; - checkpoint.phaseResults.push(res); - // --- Phase failed event + const isLastAttempt = attempt === maxRetries; + // Always log every failed attempt + const failRes: PhaseAttemptResult = { + phase: phase.name, + status: 'failure', + attempt, + timestamp: new Date().toISOString(), + error: err && err.message ? err.message : String(err), + failureReason: err && err.message ? err.message : String(err), + nextAction: !isLastAttempt ? 'retry' : 'manual intervention', + }; + checkpoint.phaseResults.push(failRes); + await logPhaseProgress({ + phase: failRes.phase, + status: failRes.status, + attempt: failRes.attempt, + timestamp: failRes.timestamp, + failureReason: failRes.failureReason, + nextAction: failRes.nextAction, + }); + if (isLastAttempt) { await appendPhaseUpdate({ eventType: 'phase_failed', phase: phase.name, @@ -180,16 +172,14 @@ export class SequentialOrchestrator { details: { attempt, error: err && err.message ? err.message : String(err) }, }); await this.saveCheckpoint(checkpoint); - await this.writeStatus({ overallStatus: 'failed', lastFailureReason: res.failureReason, nextAction: phase.nextAction || '' }, { currentPhaseIdx: checkpoint.currentPhase, phaseResult: res }); - // Do not advance phase pointer + await this.writeStatus({ overallStatus: 'failed', lastFailureReason: failRes.failureReason, nextAction: failRes.nextAction }, { currentPhaseIdx: checkpoint.currentPhase, phaseResult: failRes }); return; } } } } - // Completed all phases if (checkpoint.currentPhase === this.phases.length) { - // --- Workflow completed event + await this.saveCheckpoint(checkpoint); await appendPhaseUpdate({ eventType: 'workflow_completed', phase: null, diff --git a/src/backend/tests/orchestrator-status.test.ts b/src/backend/tests/orchestrator-status.test.ts index 9764af5..b5c7875 100644 --- a/src/backend/tests/orchestrator-status.test.ts +++ b/src/backend/tests/orchestrator-status.test.ts @@ -4,13 +4,12 @@ import path from 'path'; import { SequentialOrchestrator } from '../services/SequentialOrchestrator'; import { WorkflowStatusManager, WorkflowStatus } from '../services/WorkflowStatusManager'; -const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint.json'); -const tempStatus = path.join(process.cwd(), 'status/test-workflow-status.json'); +const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint-status.json'); +const tempStatus = path.join(process.cwd(), 'status/test-workflow-status-status.json'); async function cleanFiles() { try { await fs.unlink(tempCheckpoint); } catch {} try { await fs.unlink(tempStatus); } catch {} - try { await fs.rmdir(path.dirname(tempStatus)); } catch {} await fs.mkdir(path.dirname(tempStatus), { recursive: true }); } diff --git a/src/backend/tests/orchestrator.test.ts b/src/backend/tests/orchestrator.test.ts index c3fa063..c3c921b 100644 --- a/src/backend/tests/orchestrator.test.ts +++ b/src/backend/tests/orchestrator.test.ts @@ -2,12 +2,11 @@ import { describe, it, expect, beforeEach } from 'vitest'; import { promises as fs } from 'fs'; import path from 'path'; import { SequentialOrchestrator } from '../services/SequentialOrchestrator'; -const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint.json'); -const tempStatus = path.join(process.cwd(), 'status/test-workflow-status.json'); +const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint-orchestrator.json'); +const tempStatus = path.join(process.cwd(), 'status/test-workflow-status-orchestrator.json'); async function cleanFiles() { try { await fs.unlink(tempCheckpoint); } catch {} try { await fs.unlink(tempStatus); } catch {} - try { await fs.rmdir(path.dirname(tempStatus)); } catch {} await fs.mkdir(path.dirname(tempStatus), { recursive: true }); } describe('SequentialOrchestrator', () => { diff --git a/src/backend/tests/phase-progress-logger.test.ts b/src/backend/tests/phase-progress-logger.test.ts index 360a4a9..9c764c7 100644 --- a/src/backend/tests/phase-progress-logger.test.ts +++ b/src/backend/tests/phase-progress-logger.test.ts @@ -4,7 +4,7 @@ import path from 'path'; import { SequentialOrchestrator } from '../services/SequentialOrchestrator'; import { getRecentPhaseProgress } from '../services/PhaseProgressLogger'; -const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint.json'); +const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint-progress.json'); const tempLog = path.join(process.cwd(), 'status/phase-progress.jsonl'); async function cleanFiles() {