diff --git a/ts/packages/flow/src/gateway/dispatch/manager.ts b/ts/packages/flow/src/gateway/dispatch/manager.ts index 2c68fb59..8300da64 100644 --- a/ts/packages/flow/src/gateway/dispatch/manager.ts +++ b/ts/packages/flow/src/gateway/dispatch/manager.ts @@ -67,7 +67,7 @@ function topicName(name: string): string { export class DispatcherManager { private readonly pubsub: PubSubBackend; - private requestors = new Map>(); + private requestors = new Map>>(); constructor(config: GatewayConfig) { this.pubsub = new NatsBackend(config.natsUrl ?? "nats://localhost:4222"); @@ -78,7 +78,8 @@ export class DispatcherManager { } async stop(): Promise { - for (const rr of this.requestors.values()) { + for (const pending of this.requestors.values()) { + const rr = await pending; await rr.stop(); } await this.pubsub.close(); @@ -86,23 +87,26 @@ export class DispatcherManager { // ---------- Internal helpers ---------- - private async getRequestor( + private getRequestor( requestTopic: string, responseTopic: string, key: string, ): Promise> { - let rr = this.requestors.get(key); - if (!rr) { - rr = new RequestResponse({ - pubsub: this.pubsub, - requestTopic, - responseTopic, - subscription: `gateway-${key}`, - }); - await rr.start(); - this.requestors.set(key, rr); + let pending = this.requestors.get(key); + if (!pending) { + pending = (async () => { + const rr = new RequestResponse({ + pubsub: this.pubsub, + requestTopic, + responseTopic, + subscription: `gateway-${key}`, + }); + await rr.start(); + return rr; + })(); + this.requestors.set(key, pending); } - return rr; + return pending; } private resolveGlobalTopics(