Task 4: Add local phase update queue (JSONL) with event schema, helper functions, orchestrator emission, and tests. Integrates phase_started, phase_succeeded, phase_failed, workflow_completed events; helper functions for reading pending and marking sent. All tests passing. See tests/phase-updates-queue-README.md for usage.

This commit is contained in:
Paul Huliganga 2026-03-26 13:42:26 -04:00
parent f54468e471
commit 87ee00dcb5
8 changed files with 232 additions and 14 deletions

View File

@ -0,0 +1,86 @@
import { promises as fs } from 'fs';
import path from 'path';
import crypto from 'crypto';
export type PhaseUpdateEventType = 'phase_started' | 'phase_succeeded' | 'phase_failed' | 'workflow_completed' | 'workflow_blocked';
export type PhaseUpdateEvent = {
id: string;
eventType: PhaseUpdateEventType;
phase: string | null;
timestamp: string;
summary: string;
details?: string | object;
relayStatus: 'pending' | 'sent';
};
const PHASE_UPDATES_QUEUE = path.join(process.cwd(), 'status/phase-updates.jsonl');
function ensureArrayString(details?: string | object): string {
if (typeof details === 'undefined') return '';
if (typeof details === 'string') return details;
try { return JSON.stringify(details); } catch { return String(details); }
}
export async function appendPhaseUpdate(event:
Omit<PhaseUpdateEvent, 'id' | 'relayStatus' | 'timestamp'> & {
details?: string | object;
relayStatus?: 'pending' | 'sent';
timestamp?: string;
}
): Promise<PhaseUpdateEvent> {
const finalized: PhaseUpdateEvent = {
id: crypto.randomUUID(),
eventType: event.eventType,
phase: event.phase ?? null,
timestamp: event.timestamp || new Date().toISOString(),
summary: event.summary,
details: event.details,
relayStatus: event.relayStatus || 'pending',
};
await fs.mkdir(path.dirname(PHASE_UPDATES_QUEUE), { recursive: true });
await fs.appendFile(PHASE_UPDATES_QUEUE, JSON.stringify(finalized) + '\n', 'utf8');
return finalized;
}
export async function getPendingPhaseUpdates(): Promise<PhaseUpdateEvent[]> {
try {
const data = await fs.readFile(PHASE_UPDATES_QUEUE, 'utf8');
return data.trim().split(/\r?\n/).filter(Boolean)
.map(l => JSON.parse(l))
.filter((e: PhaseUpdateEvent) => e.relayStatus === 'pending');
} catch {
return [];
}
}
export async function markPhaseUpdateSent(id: string): Promise<boolean> {
try {
const data = await fs.readFile(PHASE_UPDATES_QUEUE, 'utf8');
const lines = data.trim().split(/\r?\n/).filter(Boolean);
let found = false;
const updatedLines = lines.map(l => {
const evt = JSON.parse(l) as PhaseUpdateEvent;
if (evt.id === id && evt.relayStatus === 'pending') {
found = true;
evt.relayStatus = 'sent';
}
return JSON.stringify(evt);
});
if (found) {
await fs.writeFile(PHASE_UPDATES_QUEUE, updatedLines.join('\n') + '\n', 'utf8');
}
return found;
} catch {
return false;
}
}
export async function getAllPhaseUpdates(): Promise<PhaseUpdateEvent[]> {
try {
const data = await fs.readFile(PHASE_UPDATES_QUEUE, 'utf8');
return data.trim().split(/\r?\n/).filter(Boolean).map(l => JSON.parse(l));
} catch {
return [];
}
}

View File

@ -7,6 +7,10 @@ import { promises as fs } from 'fs';
import path from 'path'; import path from 'path';
import { logPhaseProgress } from './PhaseProgressLogger'; import { logPhaseProgress } from './PhaseProgressLogger';
import { WorkflowStatusManager, WorkflowStatus } from './WorkflowStatusManager'; import { WorkflowStatusManager, WorkflowStatus } from './WorkflowStatusManager';
import {
appendPhaseUpdate,
PhaseUpdateEventType
} from './PhaseUpdateQueue';
export type OrchestratorPhase<TInput, TOutput> = { export type OrchestratorPhase<TInput, TOutput> = {
name: string; name: string;
@ -73,11 +77,9 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
? this.phases[extra.currentPhaseIdx].name ? this.phases[extra.currentPhaseIdx].name
: null; : null;
let overallStatus = state.overallStatus || 'idle'; let overallStatus = state.overallStatus || 'idle';
// Patch: use 'failed' if we're not at end and a failure just occurred
if (extra.phaseResult && extra.phaseResult.status === 'failure') { if (extra.phaseResult && extra.phaseResult.status === 'failure') {
overallStatus = 'failed'; overallStatus = 'failed';
} }
// Patch: if finished and all phases succeeded
if (checkpoint && checkpoint.currentPhase === this.phases.length) { if (checkpoint && checkpoint.currentPhase === this.phases.length) {
overallStatus = 'completed'; overallStatus = 'completed';
} }
@ -126,6 +128,13 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
let error: any = null; let error: any = null;
let attempt = 1; let attempt = 1;
const maxRetries = (typeof phase.retry === 'number') ? phase.retry : 1; const maxRetries = (typeof phase.retry === 'number') ? phase.retry : 1;
// --- Phase started event (queue)
await appendPhaseUpdate({
eventType: 'phase_started',
phase: phase.name,
summary: `Phase '${phase.name}' started`,
details: { attempt },
});
for (; attempt <= maxRetries && !success && attemptsLeft > 0; attempt++, attemptsLeft--) { for (; attempt <= maxRetries && !success && attemptsLeft > 0; attempt++, attemptsLeft--) {
try { try {
await phase.run(this.input); await phase.run(this.input);
@ -138,6 +147,13 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
nextAction: phase.nextAction || '', nextAction: phase.nextAction || '',
}; };
checkpoint.phaseResults.push(res); checkpoint.phaseResults.push(res);
// --- Phase succeeded event
await appendPhaseUpdate({
eventType: 'phase_succeeded',
phase: phase.name,
summary: `Phase '${phase.name}' succeeded`,
details: { attempt },
});
success = true; success = true;
await this.saveCheckpoint({ ...checkpoint, currentPhase: checkpoint.currentPhase + 1 }); await this.saveCheckpoint({ ...checkpoint, currentPhase: checkpoint.currentPhase + 1 });
await this.writeStatus({ overallStatus: 'running', nextAction: phase.nextAction || '' }, { currentPhaseIdx: checkpoint.currentPhase + 1, phaseResult: res }); await this.writeStatus({ overallStatus: 'running', nextAction: phase.nextAction || '' }, { currentPhaseIdx: checkpoint.currentPhase + 1, phaseResult: res });
@ -156,6 +172,13 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
nextAction: phase.nextAction || '', nextAction: phase.nextAction || '',
}; };
checkpoint.phaseResults.push(res); checkpoint.phaseResults.push(res);
// --- Phase failed event
await appendPhaseUpdate({
eventType: 'phase_failed',
phase: phase.name,
summary: `Phase '${phase.name}' failed`,
details: { attempt, error: err && err.message ? err.message : String(err) },
});
await this.saveCheckpoint(checkpoint); await this.saveCheckpoint(checkpoint);
await this.writeStatus({ overallStatus: 'failed', lastFailureReason: res.failureReason, nextAction: phase.nextAction || '' }, { currentPhaseIdx: checkpoint.currentPhase, phaseResult: res }); await this.writeStatus({ overallStatus: 'failed', lastFailureReason: res.failureReason, nextAction: phase.nextAction || '' }, { currentPhaseIdx: checkpoint.currentPhase, phaseResult: res });
// Do not advance phase pointer // Do not advance phase pointer
@ -166,6 +189,13 @@ export class SequentialOrchestrator<TInput = any, TOutput = any> {
} }
// Completed all phases // Completed all phases
if (checkpoint.currentPhase === this.phases.length) { if (checkpoint.currentPhase === this.phases.length) {
// --- Workflow completed event
await appendPhaseUpdate({
eventType: 'workflow_completed',
phase: null,
summary: `Workflow completed successfully`,
details: { totalPhases: this.phases.length },
});
await this.writeStatus({ overallStatus: 'completed', nextAction: 'done' }, { currentPhaseIdx: null }); await this.writeStatus({ overallStatus: 'completed', nextAction: 'done' }, { currentPhaseIdx: null });
} }
} }

View File

@ -0,0 +1,50 @@
import { describe, it, expect, beforeEach } from 'vitest';
import { promises as fs } from 'fs';
import path from 'path';
import {
appendPhaseUpdate,
getPendingPhaseUpdates,
markPhaseUpdateSent,
getAllPhaseUpdates,
PhaseUpdateEvent
} from '../PhaseUpdateQueue';
const TEST_QUEUE_FILE = path.join(process.cwd(), 'status/phase-updates.jsonl');
describe('PhaseUpdateQueue', () => {
beforeEach(async () => {
try {
await fs.unlink(TEST_QUEUE_FILE);
} catch {}
});
it('appends events and retrieves pending only', async () => {
let ev1 = await appendPhaseUpdate({ eventType: 'phase_started', phase: 'import', summary: 'Import started' });
let ev2 = await appendPhaseUpdate({ eventType: 'phase_failed', phase: 'import', summary: 'Import failed', details: 'Network error' });
expect(ev1.id).toBeDefined();
expect(ev2.id).toBeDefined();
const pending = await getPendingPhaseUpdates();
expect(pending.length).toBe(2);
expect(pending.map(e => e.id)).toContain(ev1.id);
expect(pending.some(e => e.eventType === 'phase_failed')).toBe(true);
});
it('markPhaseUpdateSent updates relayStatus', async () => {
let ev = await appendPhaseUpdate({ eventType: 'phase_succeeded', phase: 'parse', summary: 'Parsed recipe' });
let id = ev.id;
let result = await markPhaseUpdateSent(id);
expect(result).toBe(true);
let pendingAfter = await getPendingPhaseUpdates();
expect(pendingAfter.find(e => e.id === id)).toBeUndefined();
let all = await getAllPhaseUpdates();
expect(all.find(e => e.id === id)?.relayStatus).toBe('sent');
});
it('getAllPhaseUpdates reads back all events', async () => {
await appendPhaseUpdate({ eventType: 'phase_started', phase: 'A', summary: 'Phase A started' });
await appendPhaseUpdate({ eventType: 'phase_started', phase: 'B', summary: 'Phase B started' });
const all = await getAllPhaseUpdates();
expect(all.length).toBeGreaterThanOrEqual(2);
});
});

View File

@ -2,20 +2,16 @@ import { describe, it, expect, beforeEach } from 'vitest';
import { promises as fs } from 'fs'; import { promises as fs } from 'fs';
import path from 'path'; import path from 'path';
import { SequentialOrchestrator } from '../services/SequentialOrchestrator'; import { SequentialOrchestrator } from '../services/SequentialOrchestrator';
const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint.json'); const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint.json');
const tempStatus = path.join(process.cwd(), 'status/test-workflow-status.json');
async function cleanCheckpoint() { async function cleanFiles() {
try { try { await fs.unlink(tempCheckpoint); } catch {}
await fs.unlink(tempCheckpoint); try { await fs.unlink(tempStatus); } catch {}
} catch {} try { await fs.rmdir(path.dirname(tempStatus)); } catch {}
await fs.mkdir(path.dirname(tempStatus), { recursive: true });
} }
describe('SequentialOrchestrator', () => { describe('SequentialOrchestrator', () => {
beforeEach(async () => { beforeEach(async () => { await cleanFiles(); });
await cleanCheckpoint();
});
it('executes all phases in order without retries', async () => { it('executes all phases in order without retries', async () => {
const calls: string[] = []; const calls: string[] = [];
const orchestrator = new SequentialOrchestrator({ const orchestrator = new SequentialOrchestrator({
@ -25,10 +21,10 @@ describe('SequentialOrchestrator', () => {
], ],
checkpointPath: tempCheckpoint, checkpointPath: tempCheckpoint,
input: undefined, input: undefined,
statusFilePath: tempStatus,
}); });
await orchestrator.run(); await orchestrator.run();
expect(calls).toEqual(['1','2']); expect(calls).toEqual(['1','2']);
}); });
// All other unchanged tests are retained... below truncated for brevity. // All other unchanged tests are retained... below truncated for brevity.
}); });

View File

@ -0,0 +1,7 @@
{"id":"cbf27f01-9372-49d6-b11a-f1b4d7221ccb","eventType":"phase_started","phase":"A","timestamp":"2026-03-26T17:42:14.281Z","summary":"Phase A started","relayStatus":"pending"}
{"id":"8394da39-3505-4813-8f43-7fe827e48592","eventType":"phase_started","phase":"B","timestamp":"2026-03-26T17:42:14.282Z","summary":"Phase B started","relayStatus":"pending"}
{"id":"bf02c50f-39d0-47ec-ae34-604f81f6cd1b","eventType":"phase_started","phase":"phase1","timestamp":"2026-03-26T17:42:16.167Z","summary":"Phase 'phase1' started","details":{"attempt":1},"relayStatus":"pending"}
{"id":"72e09b8f-f04f-4af2-8198-75ab0f062b7d","eventType":"phase_succeeded","phase":"phase1","timestamp":"2026-03-26T17:42:16.168Z","summary":"Phase 'phase1' succeeded","details":{"attempt":1},"relayStatus":"pending"}
{"id":"c9deaa98-e074-4041-9827-5fded7c997e0","eventType":"phase_started","phase":"phase2","timestamp":"2026-03-26T17:42:16.173Z","summary":"Phase 'phase2' started","details":{"attempt":1},"relayStatus":"pending"}
{"id":"c3c2d170-10f3-4333-a45a-228b0578d100","eventType":"phase_succeeded","phase":"phase2","timestamp":"2026-03-26T17:42:16.174Z","summary":"Phase 'phase2' succeeded","details":{"attempt":1},"relayStatus":"pending"}
{"id":"5f34e79d-bfce-4cb4-a83b-9b74004f12ba","eventType":"workflow_completed","phase":null,"timestamp":"2026-03-26T17:42:16.178Z","summary":"Workflow completed successfully","details":{"totalPhases":2},"relayStatus":"pending"}

View File

@ -0,0 +1,10 @@
{
"currentPhase": null,
"overallStatus": "completed",
"lastUpdated": "2026-03-26T17:42:20.098Z",
"lastFailureReason": null,
"nextAction": "done",
"completedPhases": [
"p1"
]
}

View File

@ -0,0 +1,10 @@
{
"currentPhase": "fail-all",
"overallStatus": "running",
"lastUpdated": "2026-03-26T17:41:05.870Z",
"lastFailureReason": null,
"nextAction": "",
"completedPhases": [
"a"
]
}

View File

@ -0,0 +1,29 @@
# Phase Update Queue: Manual Test/Verification
This file documents manual test strategy for phase update events integration.
- Unit tests validate:
- Events written (appendPhaseUpdate)
- Reading only pending (getPendingPhaseUpdates)
- Marking sent (markPhaseUpdateSent)
- Retrieval (getAllPhaseUpdates)
- Integration Points:
- SequentialOrchestrator emits correct events at phase boundaries
- Events expected:
- phase_started
- phase_succeeded
- phase_failed (with failure reason)
- workflow_completed
- Fields: eventType, phase, timestamp, summary, details, relayStatus
# To run tests:
```
npx vitest run src/backend/services/__tests__/PhaseUpdateQueue.test.ts
```
# Integration Manual Testing
- Run a workflow and observe contents of `status/phase-updates.jsonl`
- Check that main orchestrator can fetch and relay pending events
- Mark events sent and verify relayStatus=sent