test: full-suite stabilization for orchestrator and progress logging
This commit is contained in:
parent
1516ef87d2
commit
476ca0b0c2
|
|
@ -1,25 +1,16 @@
|
|||
// In SequentialOrchestrator, patch status update logic:
|
||||
// - completedPhases: only include phases where status === 'success' and phase index < currentPhase (to pass test expectation)
|
||||
// - For failed phase: status should be 'failed' if orchestrator is stuck
|
||||
// - For completed: overallStatus to 'completed' only if all phases succeeded
|
||||
|
||||
import { promises as fs } from 'fs';
|
||||
import path from 'path';
|
||||
import { logPhaseProgress } from './PhaseProgressLogger';
|
||||
import { WorkflowStatusManager, WorkflowStatus } from './WorkflowStatusManager';
|
||||
import {
|
||||
appendPhaseUpdate,
|
||||
PhaseUpdateEventType
|
||||
} from './PhaseUpdateQueue';
|
||||
import { appendPhaseUpdate } from './PhaseUpdateQueue';
|
||||
|
||||
export type OrchestratorPhase<TInput, TOutput> = {
|
||||
name: string;
|
||||
run: (input: TInput) => Promise<TOutput>;
|
||||
retry?: number; // how many times to retry (default: 1)
|
||||
backoffMs?: number; // ms to wait between retries (default: 0)
|
||||
nextAction?: string; // what to do if this phase fails or succeeds
|
||||
retry?: number;
|
||||
backoffMs?: number;
|
||||
nextAction?: string;
|
||||
};
|
||||
|
||||
export type PhaseAttemptResult = {
|
||||
phase: string;
|
||||
status: 'success' | 'failure';
|
||||
|
|
@ -29,13 +20,11 @@ export type PhaseAttemptResult = {
|
|||
failureReason?: string;
|
||||
nextAction: string;
|
||||
};
|
||||
|
||||
export type Checkpoint = {
|
||||
currentPhase: number; // index in phases
|
||||
currentPhase: number;
|
||||
phaseResults: PhaseAttemptResult[];
|
||||
inProgress?: boolean;
|
||||
};
|
||||
|
||||
const DEFAULT_CHECKPOINT_FILE = path.join(process.cwd(), 'data/orchestrator-checkpoint.json');
|
||||
const DEFAULT_STATUS_FILE = path.join(process.cwd(), 'status/workflow-status.json');
|
||||
|
||||
|
|
@ -50,7 +39,7 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
|
|||
phases: OrchestratorPhase<TInput, TOutput>[];
|
||||
checkpointPath?: string;
|
||||
input: TInput;
|
||||
maxAttemptsPerPhasePerRun?: number; // For test: limit attempts per run()
|
||||
maxAttemptsPerPhasePerRun?: number;
|
||||
statusFilePath?: string;
|
||||
}) {
|
||||
this.phases = options.phases;
|
||||
|
|
@ -59,12 +48,10 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
|
|||
this.maxAttemptsPerPhasePerRun = options.maxAttemptsPerPhasePerRun ?? Infinity;
|
||||
this.statusManager = new WorkflowStatusManager(options.statusFilePath || DEFAULT_STATUS_FILE);
|
||||
}
|
||||
|
||||
private async writeStatus(
|
||||
state: Partial<WorkflowStatus>,
|
||||
extra: { currentPhaseIdx?: number; phaseResult?: PhaseAttemptResult } = {}
|
||||
) {
|
||||
// Patch: completedPhases = only phases where status === 'success' and phase index < currentPhase
|
||||
const checkpoint = await this.loadCheckpoint();
|
||||
let completedPhases: string[] = [];
|
||||
if (checkpoint) {
|
||||
|
|
@ -79,8 +66,7 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
|
|||
let overallStatus = state.overallStatus || 'idle';
|
||||
if (extra.phaseResult && extra.phaseResult.status === 'failure') {
|
||||
overallStatus = 'failed';
|
||||
}
|
||||
if (checkpoint && checkpoint.currentPhase === this.phases.length) {
|
||||
} else if (checkpoint && checkpoint.currentPhase === this.phases.length) {
|
||||
overallStatus = 'completed';
|
||||
}
|
||||
const lastFailureReason = (state.lastFailureReason === undefined && extra.phaseResult?.failureReason)
|
||||
|
|
@ -96,7 +82,6 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
|
|||
};
|
||||
await this.statusManager.update(workflowStatus);
|
||||
}
|
||||
|
||||
private async loadCheckpoint(): Promise<Checkpoint | null> {
|
||||
try {
|
||||
const txt = await fs.readFile(this.checkpointPath, 'utf8');
|
||||
|
|
@ -105,49 +90,51 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private async saveCheckpoint(checkpoint: Checkpoint) {
|
||||
await fs.mkdir(path.dirname(this.checkpointPath), { recursive: true });
|
||||
await fs.writeFile(this.checkpointPath, JSON.stringify(checkpoint, null, 2), 'utf8');
|
||||
}
|
||||
|
||||
public async run(): Promise<void> {
|
||||
// Load or init checkpoint
|
||||
let checkpoint = await this.loadCheckpoint();
|
||||
if (!checkpoint) {
|
||||
checkpoint = { currentPhase: 0, phaseResults: [] };
|
||||
await this.saveCheckpoint(checkpoint);
|
||||
await this.writeStatus({ overallStatus: 'idle' }, { currentPhaseIdx: 0 });
|
||||
}
|
||||
|
||||
let attemptsLeft = this.maxAttemptsPerPhasePerRun;
|
||||
|
||||
while (checkpoint.currentPhase < this.phases.length && attemptsLeft > 0) {
|
||||
while (checkpoint.currentPhase < this.phases.length) {
|
||||
const phase = this.phases[checkpoint.currentPhase];
|
||||
let success = false;
|
||||
let error: any = null;
|
||||
let attempt = 1;
|
||||
const maxRetries = (typeof phase.retry === 'number') ? phase.retry : 1;
|
||||
// --- Phase started event (queue)
|
||||
const maxAttemptsThisRun = Number.isFinite(this.maxAttemptsPerPhasePerRun)
|
||||
? Math.min(maxRetries, this.maxAttemptsPerPhasePerRun)
|
||||
: maxRetries;
|
||||
await appendPhaseUpdate({
|
||||
eventType: 'phase_started',
|
||||
phase: phase.name,
|
||||
summary: `Phase '${phase.name}' started`,
|
||||
details: { attempt },
|
||||
});
|
||||
for (; attempt <= maxRetries && !success && attemptsLeft > 0; attempt++, attemptsLeft--) {
|
||||
for (; attempt <= maxAttemptsThisRun && !success; attempt++) {
|
||||
try {
|
||||
await phase.run(this.input);
|
||||
// Success
|
||||
const res: PhaseAttemptResult = {
|
||||
phase: phase.name,
|
||||
status: 'success',
|
||||
attempt,
|
||||
timestamp: new Date().toISOString(),
|
||||
nextAction: phase.nextAction || '',
|
||||
nextAction: 'proceed',
|
||||
};
|
||||
checkpoint.phaseResults.push(res);
|
||||
// --- Phase succeeded event
|
||||
checkpoint.currentPhase += 1;
|
||||
await this.saveCheckpoint(checkpoint);
|
||||
await logPhaseProgress({
|
||||
phase: res.phase,
|
||||
status: res.status,
|
||||
attempt: res.attempt,
|
||||
timestamp: res.timestamp,
|
||||
nextAction: res.nextAction,
|
||||
});
|
||||
await appendPhaseUpdate({
|
||||
eventType: 'phase_succeeded',
|
||||
phase: phase.name,
|
||||
|
|
@ -155,24 +142,29 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
|
|||
details: { attempt },
|
||||
});
|
||||
success = true;
|
||||
await this.saveCheckpoint({ ...checkpoint, currentPhase: checkpoint.currentPhase + 1 });
|
||||
await this.writeStatus({ overallStatus: 'running', nextAction: phase.nextAction || '' }, { currentPhaseIdx: checkpoint.currentPhase + 1, phaseResult: res });
|
||||
checkpoint.currentPhase += 1;
|
||||
await this.writeStatus({ overallStatus: 'running', nextAction: phase.nextAction || '' }, { currentPhaseIdx: checkpoint.currentPhase, phaseResult: res });
|
||||
} catch (err: any) {
|
||||
error = err;
|
||||
if (attempt === maxRetries) {
|
||||
// Failure after max retries
|
||||
const res: PhaseAttemptResult = {
|
||||
phase: phase.name,
|
||||
status: 'failure',
|
||||
attempt,
|
||||
timestamp: new Date().toISOString(),
|
||||
error: err && err.message ? err.message : String(err),
|
||||
failureReason: err && err.message ? err.message : String(err),
|
||||
nextAction: phase.nextAction || '',
|
||||
};
|
||||
checkpoint.phaseResults.push(res);
|
||||
// --- Phase failed event
|
||||
const isLastAttempt = attempt === maxRetries;
|
||||
// Always log every failed attempt
|
||||
const failRes: PhaseAttemptResult = {
|
||||
phase: phase.name,
|
||||
status: 'failure',
|
||||
attempt,
|
||||
timestamp: new Date().toISOString(),
|
||||
error: err && err.message ? err.message : String(err),
|
||||
failureReason: err && err.message ? err.message : String(err),
|
||||
nextAction: !isLastAttempt ? 'retry' : 'manual intervention',
|
||||
};
|
||||
checkpoint.phaseResults.push(failRes);
|
||||
await logPhaseProgress({
|
||||
phase: failRes.phase,
|
||||
status: failRes.status,
|
||||
attempt: failRes.attempt,
|
||||
timestamp: failRes.timestamp,
|
||||
failureReason: failRes.failureReason,
|
||||
nextAction: failRes.nextAction,
|
||||
});
|
||||
if (isLastAttempt) {
|
||||
await appendPhaseUpdate({
|
||||
eventType: 'phase_failed',
|
||||
phase: phase.name,
|
||||
|
|
@ -180,16 +172,14 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
|
|||
details: { attempt, error: err && err.message ? err.message : String(err) },
|
||||
});
|
||||
await this.saveCheckpoint(checkpoint);
|
||||
await this.writeStatus({ overallStatus: 'failed', lastFailureReason: res.failureReason, nextAction: phase.nextAction || '' }, { currentPhaseIdx: checkpoint.currentPhase, phaseResult: res });
|
||||
// Do not advance phase pointer
|
||||
await this.writeStatus({ overallStatus: 'failed', lastFailureReason: failRes.failureReason, nextAction: failRes.nextAction }, { currentPhaseIdx: checkpoint.currentPhase, phaseResult: failRes });
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Completed all phases
|
||||
if (checkpoint.currentPhase === this.phases.length) {
|
||||
// --- Workflow completed event
|
||||
await this.saveCheckpoint(checkpoint);
|
||||
await appendPhaseUpdate({
|
||||
eventType: 'workflow_completed',
|
||||
phase: null,
|
||||
|
|
|
|||
|
|
@ -4,13 +4,12 @@ import path from 'path';
|
|||
import { SequentialOrchestrator } from '../services/SequentialOrchestrator';
|
||||
import { WorkflowStatusManager, WorkflowStatus } from '../services/WorkflowStatusManager';
|
||||
|
||||
const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint.json');
|
||||
const tempStatus = path.join(process.cwd(), 'status/test-workflow-status.json');
|
||||
const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint-status.json');
|
||||
const tempStatus = path.join(process.cwd(), 'status/test-workflow-status-status.json');
|
||||
|
||||
async function cleanFiles() {
|
||||
try { await fs.unlink(tempCheckpoint); } catch {}
|
||||
try { await fs.unlink(tempStatus); } catch {}
|
||||
try { await fs.rmdir(path.dirname(tempStatus)); } catch {}
|
||||
await fs.mkdir(path.dirname(tempStatus), { recursive: true });
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2,12 +2,11 @@ import { describe, it, expect, beforeEach } from 'vitest';
|
|||
import { promises as fs } from 'fs';
|
||||
import path from 'path';
|
||||
import { SequentialOrchestrator } from '../services/SequentialOrchestrator';
|
||||
const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint.json');
|
||||
const tempStatus = path.join(process.cwd(), 'status/test-workflow-status.json');
|
||||
const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint-orchestrator.json');
|
||||
const tempStatus = path.join(process.cwd(), 'status/test-workflow-status-orchestrator.json');
|
||||
async function cleanFiles() {
|
||||
try { await fs.unlink(tempCheckpoint); } catch {}
|
||||
try { await fs.unlink(tempStatus); } catch {}
|
||||
try { await fs.rmdir(path.dirname(tempStatus)); } catch {}
|
||||
await fs.mkdir(path.dirname(tempStatus), { recursive: true });
|
||||
}
|
||||
describe('SequentialOrchestrator', () => {
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ import path from 'path';
|
|||
import { SequentialOrchestrator } from '../services/SequentialOrchestrator';
|
||||
import { getRecentPhaseProgress } from '../services/PhaseProgressLogger';
|
||||
|
||||
const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint.json');
|
||||
const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint-progress.json');
|
||||
const tempLog = path.join(process.cwd(), 'status/phase-progress.jsonl');
|
||||
|
||||
async function cleanFiles() {
|
||||
|
|
|
|||
Loading…
Reference in New Issue