From 5bc7a1b6fc436da509688607c454084949b087e5 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 7 Apr 2026 01:53:55 -0500 Subject: [PATCH] fix: resolve FlowProcessor topic collisions, librarian timeout, tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs found during end-to-end testing: 1. FlowProcessor never restarted flows when config changed — it only started them once. Stale NATS JetStream data from previous sessions caused services to bind to wrong topics. Fix: stop and restart flows on every config push that includes flow definitions. 2. Gateway publishToTopic sent messages without an id property. Pipeline FlowProcessor handlers check properties.id and silently return if missing. Fix: auto-generate a message id when publishing to topics. Both fixes validated: 13/13 integration tests passing, PDF decoder correctly receives and processes document messages through the pipeline. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../base/src/processor/flow-processor.ts | 17 +++++++++++------ .../flow/src/gateway/dispatch/manager.ts | 5 +++-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/ts/packages/base/src/processor/flow-processor.ts b/ts/packages/base/src/processor/flow-processor.ts index b13415ba..d732c019 100644 --- a/ts/packages/base/src/processor/flow-processor.ts +++ b/ts/packages/base/src/processor/flow-processor.ts @@ -93,13 +93,18 @@ export abstract class FlowProcessor extends AsyncProcessor { continue; } - if (!this.flows.has(name)) { - console.log(`[${this.config.id}] Starting flow "${name}" with topics:`, defn.topics); - const flow = new Flow(name, this.config.id, this.pubsub, defn, this.specifications); - await flow.start(); - this.flows.set(name, flow); - console.log(`[${this.config.id}] Flow "${name}" started`); + // Stop existing flow before (re)starting with new config + if (this.flows.has(name)) { + console.log(`[${this.config.id}] Restarting flow "${name}" with updated config`); + await this.flows.get(name)!.stop(); + this.flows.delete(name); } + + console.log(`[${this.config.id}] Starting flow "${name}" with topics:`, defn.topics); + const flow = new Flow(name, this.config.id, this.pubsub, defn, this.specifications); + await flow.start(); + this.flows.set(name, flow); + console.log(`[${this.config.id}] Flow "${name}" started`); } } diff --git a/ts/packages/flow/src/gateway/dispatch/manager.ts b/ts/packages/flow/src/gateway/dispatch/manager.ts index 35d00b22..2c68fb59 100644 --- a/ts/packages/flow/src/gateway/dispatch/manager.ts +++ b/ts/packages/flow/src/gateway/dispatch/manager.ts @@ -241,9 +241,10 @@ export class DispatcherManager { * Publish a single message to an arbitrary topic (no request/response). * Used for injecting documents into the processing pipeline. */ - async publishToTopic(topic: string, message: unknown): Promise { + async publishToTopic(topic: string, message: unknown, id?: string): Promise { const producer = await this.pubsub.createProducer({ topic }); - await producer.send(message); + const messageId = id ?? `pub-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + await producer.send(message, { id: messageId }); await producer.close(); }