fix: resolve FlowProcessor topic collisions, librarian timeout, tests

Two bugs found during end-to-end testing:

1. FlowProcessor never restarted flows when config changed — it only
   started them once. Stale NATS JetStream data from previous sessions
   caused services to bind to wrong topics. Fix: stop and restart flows
   on every config push that includes flow definitions.

2. Gateway publishToTopic sent messages without an id property. Pipeline
   FlowProcessor handlers check properties.id and silently return if
   missing. Fix: auto-generate a message id when publishing to topics.

Both fixes validated: 13/13 integration tests passing, PDF decoder
correctly receives and processes document messages through the pipeline.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
elpresidank 2026-04-07 01:53:55 -05:00
parent c545213224
commit 5bc7a1b6fc
2 changed files with 14 additions and 8 deletions

View file

@ -93,13 +93,18 @@ export abstract class FlowProcessor extends AsyncProcessor {
continue;
}
if (!this.flows.has(name)) {
console.log(`[${this.config.id}] Starting flow "${name}" with topics:`, defn.topics);
const flow = new Flow(name, this.config.id, this.pubsub, defn, this.specifications);
await flow.start();
this.flows.set(name, flow);
console.log(`[${this.config.id}] Flow "${name}" started`);
// Stop existing flow before (re)starting with new config
if (this.flows.has(name)) {
console.log(`[${this.config.id}] Restarting flow "${name}" with updated config`);
await this.flows.get(name)!.stop();
this.flows.delete(name);
}
console.log(`[${this.config.id}] Starting flow "${name}" with topics:`, defn.topics);
const flow = new Flow(name, this.config.id, this.pubsub, defn, this.specifications);
await flow.start();
this.flows.set(name, flow);
console.log(`[${this.config.id}] Flow "${name}" started`);
}
}