diff --git a/ts/packages/base/src/processor/flow-processor.ts b/ts/packages/base/src/processor/flow-processor.ts index b13415ba..d732c019 100644 --- a/ts/packages/base/src/processor/flow-processor.ts +++ b/ts/packages/base/src/processor/flow-processor.ts @@ -93,13 +93,18 @@ export abstract class FlowProcessor extends AsyncProcessor { continue; } - if (!this.flows.has(name)) { - console.log(`[${this.config.id}] Starting flow "${name}" with topics:`, defn.topics); - const flow = new Flow(name, this.config.id, this.pubsub, defn, this.specifications); - await flow.start(); - this.flows.set(name, flow); - console.log(`[${this.config.id}] Flow "${name}" started`); + // Stop existing flow before (re)starting with new config + if (this.flows.has(name)) { + console.log(`[${this.config.id}] Restarting flow "${name}" with updated config`); + await this.flows.get(name)!.stop(); + this.flows.delete(name); } + + console.log(`[${this.config.id}] Starting flow "${name}" with topics:`, defn.topics); + const flow = new Flow(name, this.config.id, this.pubsub, defn, this.specifications); + await flow.start(); + this.flows.set(name, flow); + console.log(`[${this.config.id}] Flow "${name}" started`); } } diff --git a/ts/packages/flow/src/gateway/dispatch/manager.ts b/ts/packages/flow/src/gateway/dispatch/manager.ts index 35d00b22..2c68fb59 100644 --- a/ts/packages/flow/src/gateway/dispatch/manager.ts +++ b/ts/packages/flow/src/gateway/dispatch/manager.ts @@ -241,9 +241,10 @@ export class DispatcherManager { * Publish a single message to an arbitrary topic (no request/response). * Used for injecting documents into the processing pipeline. */ - async publishToTopic(topic: string, message: unknown): Promise { + async publishToTopic(topic: string, message: unknown, id?: string): Promise { const producer = await this.pubsub.createProducer({ topic }); - await producer.send(message); + const messageId = id ?? `pub-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + await producer.send(message, { id: messageId }); await producer.close(); }