From 580ee319a3b2556c760055ba4189bb17a83bd1f7 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 7 Apr 2026 12:11:11 -0500 Subject: [PATCH] fix: prevent dispatcher race condition via promise-based lazy init MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Store the initialization Promise in the requestors map synchronously before yielding, so concurrent callers for the same key await the same instance — prevents orphaned RequestResponse objects and duplicate NATS subscriptions. Mirrors upstream fix 8f18ba02. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../flow/src/gateway/dispatch/manager.ts | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/ts/packages/flow/src/gateway/dispatch/manager.ts b/ts/packages/flow/src/gateway/dispatch/manager.ts index 2c68fb59..8300da64 100644 --- a/ts/packages/flow/src/gateway/dispatch/manager.ts +++ b/ts/packages/flow/src/gateway/dispatch/manager.ts @@ -67,7 +67,7 @@ function topicName(name: string): string { export class DispatcherManager { private readonly pubsub: PubSubBackend; - private requestors = new Map>(); + private requestors = new Map>>(); constructor(config: GatewayConfig) { this.pubsub = new NatsBackend(config.natsUrl ?? "nats://localhost:4222"); @@ -78,7 +78,8 @@ export class DispatcherManager { } async stop(): Promise { - for (const rr of this.requestors.values()) { + for (const pending of this.requestors.values()) { + const rr = await pending; await rr.stop(); } await this.pubsub.close(); @@ -86,23 +87,26 @@ export class DispatcherManager { // ---------- Internal helpers ---------- - private async getRequestor( + private getRequestor( requestTopic: string, responseTopic: string, key: string, ): Promise> { - let rr = this.requestors.get(key); - if (!rr) { - rr = new RequestResponse({ - pubsub: this.pubsub, - requestTopic, - responseTopic, - subscription: `gateway-${key}`, - }); - await rr.start(); - this.requestors.set(key, rr); + let pending = this.requestors.get(key); + if (!pending) { + pending = (async () => { + const rr = new RequestResponse({ + pubsub: this.pubsub, + requestTopic, + responseTopic, + subscription: `gateway-${key}`, + }); + await rr.start(); + return rr; + })(); + this.requestors.set(key, pending); } - return rr; + return pending; } private resolveGlobalTopics(