Make gateway dispatcher requestors Effect-scoped

This commit is contained in:
elpresidank 2026-06-01 22:17:50 -05:00
parent a0d2575273
commit 64fb23e7d0
6 changed files with 395 additions and 66 deletions

View file

@ -0,0 +1,210 @@
import { describe, expect, it } from "vitest";
import {
dispatcherManagerIsCompleteResponse,
makeDispatcherManager,
} from "../gateway/dispatch/manager.js";
import type {
BackendConsumer,
BackendProducer,
CreateConsumerOptions,
CreateProducerOptions,
Message,
PubSubBackend,
} from "@trustgraph/base";
function createMessage<T>(value: T, properties: Record<string, string> = {}): Message<T> {
return {
value: () => value,
properties: () => properties,
};
}
class TopicConsumer<T> implements BackendConsumer<T> {
readonly acknowledged: Array<Message<T>> = [];
readonly nacked: Array<Message<T>> = [];
closeCount = 0;
private readonly messages: Array<Message<T>> = [];
private readonly waiters: Array<(message: Message<T> | null) => void> = [];
private closed = false;
push(message: Message<T>): void {
const waiter = this.waiters.shift();
if (waiter !== undefined) {
waiter(message);
return;
}
this.messages.push(message);
}
async receive(): Promise<Message<T> | null> {
const message = this.messages.shift();
if (message !== undefined || this.closed) return message ?? null;
return await new Promise((resolve) => {
this.waiters.push(resolve);
});
}
async acknowledge(message: Message<T>): Promise<void> {
this.acknowledged.push(message);
}
async negativeAcknowledge(message: Message<T>): Promise<void> {
this.nacked.push(message);
}
async unsubscribe(): Promise<void> {}
async close(): Promise<void> {
this.closed = true;
for (const waiter of this.waiters.splice(0)) {
waiter(null);
}
this.closeCount += 1;
}
}
class RecordingProducer<T> implements BackendProducer<T> {
readonly sent: Array<{ readonly message: T; readonly properties?: Record<string, string> }> = [];
closeCount = 0;
flushCount = 0;
constructor(
private readonly topic: string,
private readonly onSend: (topic: string, message: T, properties?: Record<string, string>) => void,
) {}
async send(message: T, properties?: Record<string, string>): Promise<void> {
this.sent.push(properties === undefined ? { message } : { message, properties });
this.onSend(this.topic, message, properties);
}
async flush(): Promise<void> {
this.flushCount += 1;
}
async close(): Promise<void> {
this.closeCount += 1;
}
}
class DispatchBackend implements PubSubBackend {
closeCount = 0;
readonly producerOptions: CreateProducerOptions[] = [];
readonly consumerOptions: CreateConsumerOptions[] = [];
readonly producersByTopic = new Map<string, RecordingProducer<unknown>>();
readonly consumersByTopic = new Map<string, TopicConsumer<unknown>>();
async createProducer<T>(options: CreateProducerOptions): Promise<BackendProducer<T>> {
this.producerOptions.push(options);
let producer = this.producersByTopic.get(options.topic);
if (producer === undefined) {
producer = new RecordingProducer<unknown>(options.topic, (topic, message, properties) => {
this.handleSend(topic, message, properties);
});
this.producersByTopic.set(options.topic, producer);
}
return producer as BackendProducer<T>;
}
async createConsumer<T>(options: CreateConsumerOptions): Promise<BackendConsumer<T>> {
this.consumerOptions.push(options);
let consumer = this.consumersByTopic.get(options.topic);
if (consumer === undefined) {
consumer = new TopicConsumer<unknown>();
this.consumersByTopic.set(options.topic, consumer);
}
return consumer as BackendConsumer<T>;
}
async close(): Promise<void> {
this.closeCount += 1;
}
private handleSend(topic: string, message: unknown, properties?: Record<string, string>): void {
const id = properties?.id ?? "";
if (topic === "tg.flow.config-request") {
this.push("tg.flow.config-response", { ok: true, echo: message }, id);
return;
}
if (topic === "tg.flow.knowledge-request") {
this.push("tg.flow.knowledge-response", { chunk: 1 }, id);
this.push("tg.flow.knowledge-response", { chunk: 2, endOfStream: true }, id);
return;
}
if (topic === "tg.flow.prompt-request") {
this.push("tg.flow.prompt-response", { prompt: message }, id);
}
}
private push(topic: string, response: unknown, id: string): void {
const consumer = this.consumersByTopic.get(topic);
consumer?.push(createMessage(response, { id }));
}
}
describe("gateway dispatcher manager", () => {
it("caches Effect requestors as scoped handles", async () => {
const backend = new DispatchBackend();
const manager = makeDispatcherManager({
port: 0,
metricsPort: 0,
pubsub: backend,
});
await manager.start();
const first = await manager.dispatchGlobalService("config", { operation: "get" });
const second = await manager.dispatchGlobalService("config", { operation: "list" });
await manager.stop();
expect(first).toEqual({ ok: true, echo: { operation: "get" } });
expect(second).toEqual({ ok: true, echo: { operation: "list" } });
expect(backend.producerOptions.filter((options) => options.topic === "tg.flow.config-request")).toHaveLength(1);
expect(backend.consumerOptions.filter((options) => options.topic === "tg.flow.config-response")).toHaveLength(1);
expect(backend.producersByTopic.get("tg.flow.config-request")?.closeCount).toBe(1);
expect(backend.consumersByTopic.get("tg.flow.config-response")?.closeCount).toBe(1);
expect(backend.closeCount).toBe(1);
});
it("streams responses until the centralized completion predicate is true", async () => {
const backend = new DispatchBackend();
const manager = makeDispatcherManager({
port: 0,
metricsPort: 0,
pubsub: backend,
});
const chunks: Array<{ readonly response: unknown; readonly complete: boolean }> = [];
await manager.dispatchGlobalServiceStreaming("knowledge", { query: "hello" }, async (response, complete) => {
chunks.push({ response, complete });
});
await manager.stop();
expect(chunks).toEqual([
{ response: { chunk: 1 }, complete: false },
{ response: { chunk: 2, endOfStream: true }, complete: true },
]);
});
it.each([
[{ complete: true }],
[{ endOfStream: true }],
[{ endOfSession: true }],
[{ end_of_stream: true }],
[{ end_of_session: true }],
[{ end_of_dialog: true }],
[{ eos: true }],
[{ error: { message: "failed" } }],
["plain"],
[null],
])("treats %j as a complete streaming response", (response) => {
expect(dispatcherManagerIsCompleteResponse(response)).toBe(true);
});
it("does not mark ordinary response objects complete", () => {
expect(dispatcherManagerIsCompleteResponse({ chunk: 1 })).toBe(false);
});
});

View file

@ -8,7 +8,18 @@
* Python reference: trustgraph-flow/trustgraph/gateway/dispatch/manager.py
*/
import { makeNatsBackend, makeRequestResponse, type PubSubBackend, type RequestResponse } from "@trustgraph/base";
import { Effect, Exit, Scope, SynchronizedRef } from "effect";
import {
loadMessagingRuntimeConfig,
makeNatsBackend,
makePubSubService,
makeRequestResponseFactoryService,
messagingDeliveryError,
messagingLifecycleError,
type EffectRequestResponse,
type PubSubBackend,
type RequestResponseFactoryService,
} from "@trustgraph/base";
import type { GatewayConfig } from "../server.js";
import { translateRequest, translateResponse } from "./serialize.js";
@ -107,44 +118,106 @@ export const dispatcherManagerGlobalServiceNames = (): readonly string[] => [
export const dispatcherManagerIsStreamingService = (kind: string): boolean =>
STREAMING_SERVICES.has(kind);
export const dispatcherManagerIsCompleteResponse = (response: unknown): boolean => {
if (typeof response !== "object" || response === null) return true;
const res = response as Record<string, unknown>;
return (
res.complete === true ||
res.endOfStream === true ||
res.endOfSession === true ||
res.end_of_stream === true ||
res.end_of_session === true ||
res.end_of_dialog === true ||
res.eos === true ||
// error responses are always final
(res.error !== undefined && res.error !== null)
);
};
type RequestorMap = Map<string, EffectRequestResponse<unknown, unknown>>;
interface DispatcherRuntime {
readonly scope: Scope.Closeable;
readonly requestors: SynchronizedRef.SynchronizedRef<RequestorMap>;
readonly factory: RequestResponseFactoryService;
}
export function makeDispatcherManager(config: GatewayConfig): DispatcherManager {
const pubsub: PubSubBackend = makeNatsBackend(config.natsUrl ?? "nats://localhost:4222");
const requestors = new Map<string, Promise<RequestResponse<unknown, unknown>>>();
const pubsub: PubSubBackend = config.pubsub ?? makeNatsBackend(config.natsUrl ?? "nats://localhost:4222");
let runtime: DispatcherRuntime | null = null;
const start = async (): Promise<void> => {
// Requestors are created on demand when first accessed
if (runtime !== null) return;
runtime = await Effect.runPromise(
Effect.gen(function* () {
const scope = yield* Scope.make();
return yield* Effect.gen(function* () {
const messagingConfig = yield* loadMessagingRuntimeConfig();
const requestors = yield* SynchronizedRef.make<RequestorMap>(new Map());
return {
scope,
requestors,
factory: makeRequestResponseFactoryService(makePubSubService(pubsub), messagingConfig),
} satisfies DispatcherRuntime;
}).pipe(
Effect.onError((cause) => Scope.close(scope, Exit.failCause(cause))),
);
}),
);
};
const stop = async (): Promise<void> => {
for (const pending of requestors.values()) {
const rr = await pending;
await rr.stop();
const current = runtime;
runtime = null;
if (current !== null) {
await Effect.runPromise(Scope.close(current.scope, Exit.void));
}
await pubsub.close();
};
// ---------- Internal helpers ----------
const getRequestor = (
const ensureRuntime = async (): Promise<DispatcherRuntime> => {
if (runtime === null) {
await start();
}
if (runtime === null) {
throw messagingLifecycleError("gateway-dispatcher", "start", "Dispatcher manager failed to start");
}
return runtime;
};
const getRequestor = async (
requestTopic: string,
responseTopic: string,
key: string,
): Promise<RequestResponse<unknown, unknown>> => {
let pending = requestors.get(key);
if (pending === undefined) {
pending = (async () => {
const rr = makeRequestResponse({
pubsub,
): Promise<EffectRequestResponse<unknown, unknown>> => {
const current = await ensureRuntime();
return await Effect.runPromise(
SynchronizedRef.modifyEffect(current.requestors, (requestors) => {
const cached = requestors.get(key);
if (cached !== undefined) {
return Effect.succeed([cached, requestors] as const);
}
return current.factory.make<unknown, unknown>({
requestTopic,
responseTopic,
subscription: `gateway-${key}`,
});
await rr.start();
return rr;
})();
requestors.set(key, pending);
}
return pending;
}).pipe(
Scope.provide(current.scope),
Effect.map((requestor) => {
const next = new Map(requestors);
next.set(key, requestor);
return [requestor, next] as const;
}),
);
}),
);
};
const resolveGlobalTopics = (
@ -181,26 +254,6 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
};
};
/**
* Determine whether a response is the final one in a streaming sequence.
* Checks for various end-of-stream markers used by different services.
*/
const isComplete = (response: unknown): boolean => {
if (typeof response !== "object" || response === null) return true;
const res = response as Record<string, unknown>;
return (
res.complete === true ||
res.endOfStream === true ||
res.endOfSession === true ||
res.end_of_stream === true ||
res.end_of_session === true ||
res.end_of_dialog === true ||
res.eos === true ||
// error responses are always final
(res.error !== undefined && res.error !== null)
);
};
// ---------- Global service dispatch ----------
const dispatchGlobalService = async (
@ -211,7 +264,7 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
const rr = await getRequestor(requestTopic, responseTopic, `global:${kind}`);
const translated = translateRequest(kind, request);
const response = await rr.request(translated);
const response = await Effect.runPromise(rr.request(translated));
return translateResponse(kind, response);
};
@ -224,14 +277,21 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
const rr = await getRequestor(requestTopic, responseTopic, `global:${kind}`);
const translated = translateRequest(kind, request);
await rr.request(translated, {
recipient: async (response) => {
const translatedRes = translateResponse(kind, response);
const complete = isComplete(translatedRes);
await responder(translatedRes, complete);
return complete;
},
});
await Effect.runPromise(
rr.request(translated, {
recipient: (response) => {
const translatedRes = translateResponse(kind, response);
const complete = dispatcherManagerIsCompleteResponse(translatedRes);
return Effect.tryPromise({
try: async () => {
await responder(translatedRes, complete);
return complete;
},
catch: (error) => messagingDeliveryError(responseTopic, "stream-responder", error),
});
},
}),
);
};
// ---------- Flow-scoped service dispatch ----------
@ -249,7 +309,7 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
);
const translated = translateRequest(kind, request);
const response = await rr.request(translated);
const response = await Effect.runPromise(rr.request(translated));
return translateResponse(kind, response);
};
@ -267,14 +327,21 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
);
const translated = translateRequest(kind, request);
await rr.request(translated, {
recipient: async (response) => {
const translatedRes = translateResponse(kind, response);
const complete = isComplete(translatedRes);
await responder(translatedRes, complete);
return complete;
},
});
await Effect.runPromise(
rr.request(translated, {
recipient: (response) => {
const translatedRes = translateResponse(kind, response);
const complete = dispatcherManagerIsCompleteResponse(translatedRes);
return Effect.tryPromise({
try: async () => {
await responder(translatedRes, complete);
return complete;
},
catch: (error) => messagingDeliveryError(responseTopic, "stream-responder", error),
});
},
}),
);
};
// ---------- Fire-and-forget publish ----------

View file

@ -2,6 +2,7 @@ export { createGateway, run, type GatewayConfig } from "./server.js";
export {
dispatcherManagerFlowServiceNames,
dispatcherManagerGlobalServiceNames,
dispatcherManagerIsCompleteResponse,
dispatcherManagerIsStreamingService,
makeDispatcherManager,
type DispatcherManager,

View file

@ -13,7 +13,7 @@ import { Config, Effect, Exit, Scope } from "effect";
import * as O from "effect/Option";
import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization";
import * as EffectSocket from "effect/unstable/socket/Socket";
import { optionalStringConfig, registry, toTgError } from "@trustgraph/base";
import { optionalStringConfig, registry, toTgError, type PubSubBackend } from "@trustgraph/base";
import { makeDispatcherManager } from "./dispatch/manager.js";
import { makeGatewayRpcServer } from "./rpc-server.js";
@ -22,6 +22,7 @@ export interface GatewayConfig {
metricsPort: number;
secret?: string;
natsUrl?: string;
pubsub?: PubSubBackend;
}
export async function createGateway(config: GatewayConfig) {