mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-07-01 09:29:38 +02:00
fix: prevent dispatcher race condition via promise-based lazy init
Store the initialization Promise in the requestors map synchronously
before yielding, so concurrent callers for the same key await the same
instance — prevents orphaned RequestResponse objects and duplicate NATS
subscriptions. Mirrors upstream fix 8f18ba02.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
2a2e8e76a3
commit
580ee319a3
1 changed files with 18 additions and 14 deletions
|
|
@ -67,7 +67,7 @@ function topicName(name: string): string {
|
|||
|
||||
export class DispatcherManager {
|
||||
private readonly pubsub: PubSubBackend;
|
||||
private requestors = new Map<string, RequestResponse<unknown, unknown>>();
|
||||
private requestors = new Map<string, Promise<RequestResponse<unknown, unknown>>>();
|
||||
|
||||
constructor(config: GatewayConfig) {
|
||||
this.pubsub = new NatsBackend(config.natsUrl ?? "nats://localhost:4222");
|
||||
|
|
@ -78,7 +78,8 @@ export class DispatcherManager {
|
|||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
for (const rr of this.requestors.values()) {
|
||||
for (const pending of this.requestors.values()) {
|
||||
const rr = await pending;
|
||||
await rr.stop();
|
||||
}
|
||||
await this.pubsub.close();
|
||||
|
|
@ -86,23 +87,26 @@ export class DispatcherManager {
|
|||
|
||||
// ---------- Internal helpers ----------
|
||||
|
||||
private async getRequestor(
|
||||
private getRequestor(
|
||||
requestTopic: string,
|
||||
responseTopic: string,
|
||||
key: string,
|
||||
): Promise<RequestResponse<unknown, unknown>> {
|
||||
let rr = this.requestors.get(key);
|
||||
if (!rr) {
|
||||
rr = new RequestResponse({
|
||||
pubsub: this.pubsub,
|
||||
requestTopic,
|
||||
responseTopic,
|
||||
subscription: `gateway-${key}`,
|
||||
});
|
||||
await rr.start();
|
||||
this.requestors.set(key, rr);
|
||||
let pending = this.requestors.get(key);
|
||||
if (!pending) {
|
||||
pending = (async () => {
|
||||
const rr = new RequestResponse({
|
||||
pubsub: this.pubsub,
|
||||
requestTopic,
|
||||
responseTopic,
|
||||
subscription: `gateway-${key}`,
|
||||
});
|
||||
await rr.start();
|
||||
return rr;
|
||||
})();
|
||||
this.requestors.set(key, pending);
|
||||
}
|
||||
return rr;
|
||||
return pending;
|
||||
}
|
||||
|
||||
private resolveGlobalTopics(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue