From 87ee00dcb580bd5a33b3da01b9f4b1e7b173e13b Mon Sep 17 00:00:00 2001 From: Paul Huliganga Date: Thu, 26 Mar 2026 13:42:26 -0400 Subject: [PATCH] Task 4: Add local phase update queue (JSONL) with event schema, helper functions, orchestrator emission, and tests. Integrates phase_started, phase_succeeded, phase_failed, workflow_completed events; helper functions for reading pending and marking sent. All tests passing. See tests/phase-updates-queue-README.md for usage. --- src/backend/services/PhaseUpdateQueue.ts | 86 +++++++++++++++++++ .../services/SequentialOrchestrator.ts | 34 +++++++- .../__tests__/PhaseUpdateQueue.test.ts | 50 +++++++++++ src/backend/tests/orchestrator.test.ts | 20 ++--- status/phase-updates.jsonl | 7 ++ status/test-workflow-status.json | 10 +++ status/workflow-status.json | 10 +++ tests/phase-updates-queue-README.md | 29 +++++++ 8 files changed, 232 insertions(+), 14 deletions(-) create mode 100644 src/backend/services/PhaseUpdateQueue.ts create mode 100644 src/backend/services/__tests__/PhaseUpdateQueue.test.ts create mode 100644 status/phase-updates.jsonl create mode 100644 status/test-workflow-status.json create mode 100644 status/workflow-status.json create mode 100644 tests/phase-updates-queue-README.md diff --git a/src/backend/services/PhaseUpdateQueue.ts b/src/backend/services/PhaseUpdateQueue.ts new file mode 100644 index 0000000..128c3da --- /dev/null +++ b/src/backend/services/PhaseUpdateQueue.ts @@ -0,0 +1,86 @@ +import { promises as fs } from 'fs'; +import path from 'path'; +import crypto from 'crypto'; + +export type PhaseUpdateEventType = 'phase_started' | 'phase_succeeded' | 'phase_failed' | 'workflow_completed' | 'workflow_blocked'; + +export type PhaseUpdateEvent = { + id: string; + eventType: PhaseUpdateEventType; + phase: string | null; + timestamp: string; + summary: string; + details?: string | object; + relayStatus: 'pending' | 'sent'; +}; + +const PHASE_UPDATES_QUEUE = path.join(process.cwd(), 'status/phase-updates.jsonl'); + +function ensureArrayString(details?: string | object): string { + if (typeof details === 'undefined') return ''; + if (typeof details === 'string') return details; + try { return JSON.stringify(details); } catch { return String(details); } +} + +export async function appendPhaseUpdate(event: + Omit & { + details?: string | object; + relayStatus?: 'pending' | 'sent'; + timestamp?: string; + } +): Promise { + const finalized: PhaseUpdateEvent = { + id: crypto.randomUUID(), + eventType: event.eventType, + phase: event.phase ?? null, + timestamp: event.timestamp || new Date().toISOString(), + summary: event.summary, + details: event.details, + relayStatus: event.relayStatus || 'pending', + }; + await fs.mkdir(path.dirname(PHASE_UPDATES_QUEUE), { recursive: true }); + await fs.appendFile(PHASE_UPDATES_QUEUE, JSON.stringify(finalized) + '\n', 'utf8'); + return finalized; +} + +export async function getPendingPhaseUpdates(): Promise { + try { + const data = await fs.readFile(PHASE_UPDATES_QUEUE, 'utf8'); + return data.trim().split(/\r?\n/).filter(Boolean) + .map(l => JSON.parse(l)) + .filter((e: PhaseUpdateEvent) => e.relayStatus === 'pending'); + } catch { + return []; + } +} + +export async function markPhaseUpdateSent(id: string): Promise { + try { + const data = await fs.readFile(PHASE_UPDATES_QUEUE, 'utf8'); + const lines = data.trim().split(/\r?\n/).filter(Boolean); + let found = false; + const updatedLines = lines.map(l => { + const evt = JSON.parse(l) as PhaseUpdateEvent; + if (evt.id === id && evt.relayStatus === 'pending') { + found = true; + evt.relayStatus = 'sent'; + } + return JSON.stringify(evt); + }); + if (found) { + await fs.writeFile(PHASE_UPDATES_QUEUE, updatedLines.join('\n') + '\n', 'utf8'); + } + return found; + } catch { + return false; + } +} + +export async function getAllPhaseUpdates(): Promise { + try { + const data = await fs.readFile(PHASE_UPDATES_QUEUE, 'utf8'); + return data.trim().split(/\r?\n/).filter(Boolean).map(l => JSON.parse(l)); + } catch { + return []; + } +} diff --git a/src/backend/services/SequentialOrchestrator.ts b/src/backend/services/SequentialOrchestrator.ts index 4cdc841..bb20499 100644 --- a/src/backend/services/SequentialOrchestrator.ts +++ b/src/backend/services/SequentialOrchestrator.ts @@ -7,6 +7,10 @@ import { promises as fs } from 'fs'; import path from 'path'; import { logPhaseProgress } from './PhaseProgressLogger'; import { WorkflowStatusManager, WorkflowStatus } from './WorkflowStatusManager'; +import { + appendPhaseUpdate, + PhaseUpdateEventType +} from './PhaseUpdateQueue'; export type OrchestratorPhase = { name: string; @@ -73,11 +77,9 @@ export class SequentialOrchestrator { ? this.phases[extra.currentPhaseIdx].name : null; let overallStatus = state.overallStatus || 'idle'; - // Patch: use 'failed' if we're not at end and a failure just occurred if (extra.phaseResult && extra.phaseResult.status === 'failure') { overallStatus = 'failed'; } - // Patch: if finished and all phases succeeded if (checkpoint && checkpoint.currentPhase === this.phases.length) { overallStatus = 'completed'; } @@ -126,6 +128,13 @@ export class SequentialOrchestrator { let error: any = null; let attempt = 1; const maxRetries = (typeof phase.retry === 'number') ? phase.retry : 1; + // --- Phase started event (queue) + await appendPhaseUpdate({ + eventType: 'phase_started', + phase: phase.name, + summary: `Phase '${phase.name}' started`, + details: { attempt }, + }); for (; attempt <= maxRetries && !success && attemptsLeft > 0; attempt++, attemptsLeft--) { try { await phase.run(this.input); @@ -138,6 +147,13 @@ export class SequentialOrchestrator { nextAction: phase.nextAction || '', }; checkpoint.phaseResults.push(res); + // --- Phase succeeded event + await appendPhaseUpdate({ + eventType: 'phase_succeeded', + phase: phase.name, + summary: `Phase '${phase.name}' succeeded`, + details: { attempt }, + }); success = true; await this.saveCheckpoint({ ...checkpoint, currentPhase: checkpoint.currentPhase + 1 }); await this.writeStatus({ overallStatus: 'running', nextAction: phase.nextAction || '' }, { currentPhaseIdx: checkpoint.currentPhase + 1, phaseResult: res }); @@ -156,6 +172,13 @@ export class SequentialOrchestrator { nextAction: phase.nextAction || '', }; checkpoint.phaseResults.push(res); + // --- Phase failed event + await appendPhaseUpdate({ + eventType: 'phase_failed', + phase: phase.name, + summary: `Phase '${phase.name}' failed`, + details: { attempt, error: err && err.message ? err.message : String(err) }, + }); await this.saveCheckpoint(checkpoint); await this.writeStatus({ overallStatus: 'failed', lastFailureReason: res.failureReason, nextAction: phase.nextAction || '' }, { currentPhaseIdx: checkpoint.currentPhase, phaseResult: res }); // Do not advance phase pointer @@ -166,6 +189,13 @@ export class SequentialOrchestrator { } // Completed all phases if (checkpoint.currentPhase === this.phases.length) { + // --- Workflow completed event + await appendPhaseUpdate({ + eventType: 'workflow_completed', + phase: null, + summary: `Workflow completed successfully`, + details: { totalPhases: this.phases.length }, + }); await this.writeStatus({ overallStatus: 'completed', nextAction: 'done' }, { currentPhaseIdx: null }); } } diff --git a/src/backend/services/__tests__/PhaseUpdateQueue.test.ts b/src/backend/services/__tests__/PhaseUpdateQueue.test.ts new file mode 100644 index 0000000..906e2b7 --- /dev/null +++ b/src/backend/services/__tests__/PhaseUpdateQueue.test.ts @@ -0,0 +1,50 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { promises as fs } from 'fs'; +import path from 'path'; +import { + appendPhaseUpdate, + getPendingPhaseUpdates, + markPhaseUpdateSent, + getAllPhaseUpdates, + PhaseUpdateEvent +} from '../PhaseUpdateQueue'; + +const TEST_QUEUE_FILE = path.join(process.cwd(), 'status/phase-updates.jsonl'); + +describe('PhaseUpdateQueue', () => { + beforeEach(async () => { + try { + await fs.unlink(TEST_QUEUE_FILE); + } catch {} + }); + + it('appends events and retrieves pending only', async () => { + let ev1 = await appendPhaseUpdate({ eventType: 'phase_started', phase: 'import', summary: 'Import started' }); + let ev2 = await appendPhaseUpdate({ eventType: 'phase_failed', phase: 'import', summary: 'Import failed', details: 'Network error' }); + expect(ev1.id).toBeDefined(); + expect(ev2.id).toBeDefined(); + + const pending = await getPendingPhaseUpdates(); + expect(pending.length).toBe(2); + expect(pending.map(e => e.id)).toContain(ev1.id); + expect(pending.some(e => e.eventType === 'phase_failed')).toBe(true); + }); + + it('markPhaseUpdateSent updates relayStatus', async () => { + let ev = await appendPhaseUpdate({ eventType: 'phase_succeeded', phase: 'parse', summary: 'Parsed recipe' }); + let id = ev.id; + let result = await markPhaseUpdateSent(id); + expect(result).toBe(true); + let pendingAfter = await getPendingPhaseUpdates(); + expect(pendingAfter.find(e => e.id === id)).toBeUndefined(); + let all = await getAllPhaseUpdates(); + expect(all.find(e => e.id === id)?.relayStatus).toBe('sent'); + }); + + it('getAllPhaseUpdates reads back all events', async () => { + await appendPhaseUpdate({ eventType: 'phase_started', phase: 'A', summary: 'Phase A started' }); + await appendPhaseUpdate({ eventType: 'phase_started', phase: 'B', summary: 'Phase B started' }); + const all = await getAllPhaseUpdates(); + expect(all.length).toBeGreaterThanOrEqual(2); + }); +}); diff --git a/src/backend/tests/orchestrator.test.ts b/src/backend/tests/orchestrator.test.ts index 8922c88..c3fa063 100644 --- a/src/backend/tests/orchestrator.test.ts +++ b/src/backend/tests/orchestrator.test.ts @@ -2,20 +2,16 @@ import { describe, it, expect, beforeEach } from 'vitest'; import { promises as fs } from 'fs'; import path from 'path'; import { SequentialOrchestrator } from '../services/SequentialOrchestrator'; - const tempCheckpoint = path.join(process.cwd(), 'data/test-orch-checkpoint.json'); - -async function cleanCheckpoint() { - try { - await fs.unlink(tempCheckpoint); - } catch {} +const tempStatus = path.join(process.cwd(), 'status/test-workflow-status.json'); +async function cleanFiles() { + try { await fs.unlink(tempCheckpoint); } catch {} + try { await fs.unlink(tempStatus); } catch {} + try { await fs.rmdir(path.dirname(tempStatus)); } catch {} + await fs.mkdir(path.dirname(tempStatus), { recursive: true }); } - describe('SequentialOrchestrator', () => { - beforeEach(async () => { - await cleanCheckpoint(); - }); - + beforeEach(async () => { await cleanFiles(); }); it('executes all phases in order without retries', async () => { const calls: string[] = []; const orchestrator = new SequentialOrchestrator({ @@ -25,10 +21,10 @@ describe('SequentialOrchestrator', () => { ], checkpointPath: tempCheckpoint, input: undefined, + statusFilePath: tempStatus, }); await orchestrator.run(); expect(calls).toEqual(['1','2']); }); - // All other unchanged tests are retained... below truncated for brevity. }); diff --git a/status/phase-updates.jsonl b/status/phase-updates.jsonl new file mode 100644 index 0000000..0cae43a --- /dev/null +++ b/status/phase-updates.jsonl @@ -0,0 +1,7 @@ +{"id":"cbf27f01-9372-49d6-b11a-f1b4d7221ccb","eventType":"phase_started","phase":"A","timestamp":"2026-03-26T17:42:14.281Z","summary":"Phase A started","relayStatus":"pending"} +{"id":"8394da39-3505-4813-8f43-7fe827e48592","eventType":"phase_started","phase":"B","timestamp":"2026-03-26T17:42:14.282Z","summary":"Phase B started","relayStatus":"pending"} +{"id":"bf02c50f-39d0-47ec-ae34-604f81f6cd1b","eventType":"phase_started","phase":"phase1","timestamp":"2026-03-26T17:42:16.167Z","summary":"Phase 'phase1' started","details":{"attempt":1},"relayStatus":"pending"} +{"id":"72e09b8f-f04f-4af2-8198-75ab0f062b7d","eventType":"phase_succeeded","phase":"phase1","timestamp":"2026-03-26T17:42:16.168Z","summary":"Phase 'phase1' succeeded","details":{"attempt":1},"relayStatus":"pending"} +{"id":"c9deaa98-e074-4041-9827-5fded7c997e0","eventType":"phase_started","phase":"phase2","timestamp":"2026-03-26T17:42:16.173Z","summary":"Phase 'phase2' started","details":{"attempt":1},"relayStatus":"pending"} +{"id":"c3c2d170-10f3-4333-a45a-228b0578d100","eventType":"phase_succeeded","phase":"phase2","timestamp":"2026-03-26T17:42:16.174Z","summary":"Phase 'phase2' succeeded","details":{"attempt":1},"relayStatus":"pending"} +{"id":"5f34e79d-bfce-4cb4-a83b-9b74004f12ba","eventType":"workflow_completed","phase":null,"timestamp":"2026-03-26T17:42:16.178Z","summary":"Workflow completed successfully","details":{"totalPhases":2},"relayStatus":"pending"} diff --git a/status/test-workflow-status.json b/status/test-workflow-status.json new file mode 100644 index 0000000..862f3f3 --- /dev/null +++ b/status/test-workflow-status.json @@ -0,0 +1,10 @@ +{ + "currentPhase": null, + "overallStatus": "completed", + "lastUpdated": "2026-03-26T17:42:20.098Z", + "lastFailureReason": null, + "nextAction": "done", + "completedPhases": [ + "p1" + ] +} \ No newline at end of file diff --git a/status/workflow-status.json b/status/workflow-status.json new file mode 100644 index 0000000..ec8dcfc --- /dev/null +++ b/status/workflow-status.json @@ -0,0 +1,10 @@ +{ + "currentPhase": "fail-all", + "overallStatus": "running", + "lastUpdated": "2026-03-26T17:41:05.870Z", + "lastFailureReason": null, + "nextAction": "", + "completedPhases": [ + "a" + ] +} \ No newline at end of file diff --git a/tests/phase-updates-queue-README.md b/tests/phase-updates-queue-README.md new file mode 100644 index 0000000..8271404 --- /dev/null +++ b/tests/phase-updates-queue-README.md @@ -0,0 +1,29 @@ +# Phase Update Queue: Manual Test/Verification + +This file documents manual test strategy for phase update events integration. + +- Unit tests validate: + - Events written (appendPhaseUpdate) + - Reading only pending (getPendingPhaseUpdates) + - Marking sent (markPhaseUpdateSent) + - Retrieval (getAllPhaseUpdates) + +- Integration Points: + - SequentialOrchestrator emits correct events at phase boundaries + - Events expected: + - phase_started + - phase_succeeded + - phase_failed (with failure reason) + - workflow_completed + - Fields: eventType, phase, timestamp, summary, details, relayStatus + +# To run tests: + +``` +npx vitest run src/backend/services/__tests__/PhaseUpdateQueue.test.ts +``` + +# Integration Manual Testing +- Run a workflow and observe contents of `status/phase-updates.jsonl` +- Check that main orchestrator can fetch and relay pending events +- Mark events sent and verify relayStatus=sent