Harden gateway dispatcher effects

This commit is contained in:
elpresidank 2026-06-02 05:14:58 -05:00
parent fe4f5777c9
commit 89ef3dbbbf
4 changed files with 164 additions and 62 deletions

View file

@ -96,6 +96,7 @@ class DispatchBackend implements PubSubBackend {
readonly consumerOptions: CreateConsumerOptions[] = [];
readonly producersByTopic = new Map<string, RecordingProducer<unknown>>();
readonly consumersByTopic = new Map<string, TopicConsumer<unknown>>();
readonly failSendTopics = new Set<string>();
async createProducer<T>(options: CreateProducerOptions): Promise<BackendProducer<T>> {
this.producerOptions.push(options);
@ -124,6 +125,10 @@ class DispatchBackend implements PubSubBackend {
}
private handleSend(topic: string, message: unknown, properties?: Record<string, string>): void {
if (this.failSendTopics.has(topic)) {
throw "send failed";
}
const id = properties?.id ?? "";
if (topic === "tg.flow.config-request") {
this.push("tg.flow.config-response", { ok: true, echo: message }, id);
@ -167,7 +172,49 @@ describe("gateway dispatcher manager", () => {
expect(backend.consumerOptions.filter((options) => options.topic === "tg.flow.config-response")).toHaveLength(1);
expect(backend.producersByTopic.get("tg.flow.config-request")?.closeCount).toBe(1);
expect(backend.consumersByTopic.get("tg.flow.config-response")?.closeCount).toBe(1);
expect(backend.closeCount).toBe(1);
expect(backend.closeCount).toBe(0);
});
it("does not start requestors when request serialization fails", async () => {
const backend = new DispatchBackend();
const manager = makeDispatcherManager({
port: 0,
metricsPort: 0,
pubsub: backend,
});
await expect(
manager.dispatchGlobalService("knowledge", { term: { t: "t" } }),
).rejects.toMatchObject({
_tag: "DispatchSerializationError",
operation: "client-term-to-internal",
});
await manager.stop();
expect(backend.producerOptions).toHaveLength(0);
expect(backend.consumerOptions).toHaveLength(0);
expect(backend.closeCount).toBe(0);
});
it("closes one-shot publish producers when send fails", async () => {
const backend = new DispatchBackend();
backend.failSendTopics.add("tg.flow.ingest");
const manager = makeDispatcherManager({
port: 0,
metricsPort: 0,
pubsub: backend,
});
await expect(
manager.publishToTopic("tg.flow.ingest", { text: "hello" }, "msg-1"),
).rejects.toMatchObject({
_tag: "MessagingDeliveryError",
operation: "send",
});
await manager.stop();
expect(backend.producersByTopic.get("tg.flow.ingest")?.closeCount).toBe(1);
expect(backend.closeCount).toBe(0);
});
it("streams responses until the centralized completion predicate is true", async () => {

View file

@ -25,7 +25,11 @@ import {
type RequestResponseFactoryService,
} from "@trustgraph/base";
import type { GatewayConfig } from "../server.js";
import { translateRequest, translateResponse } from "./serialize.js";
import {
translateRequestEffect,
translateResponseEffect,
type DispatchSerializationError,
} from "./serialize.js";
export type Responder = (response: unknown, complete: boolean) => Promise<void>;
export type EffectResponder<E = never, R = never> = (
@ -37,6 +41,7 @@ export type DispatcherStreamError<E = never> =
| MessagingLifecycleError
| MessagingDeliveryError
| MessagingTimeoutError
| DispatchSerializationError
| E;
// ---------- Service registry ----------
@ -169,6 +174,7 @@ interface DispatcherRuntime {
export function makeDispatcherManager(config: GatewayConfig): DispatcherManager {
const pubsub: PubSubBackend = config.pubsub ?? makeNatsBackend(config.natsUrl ?? "nats://localhost:4222");
const ownsPubSub = config.pubsub === undefined;
let runtime: DispatcherRuntime | null = null;
const startEffect = Effect.fn("DispatcherManager.start")(function* () {
@ -207,10 +213,12 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
yield* Scope.close(current.scope, Exit.void);
}
yield* Effect.tryPromise({
try: () => pubsub.close(),
catch: (cause) => messagingLifecycleError("gateway-dispatcher", "close-pubsub", cause),
});
if (ownsPubSub) {
yield* Effect.tryPromise({
try: () => pubsub.close(),
catch: (cause) => messagingLifecycleError("gateway-dispatcher", "close-pubsub", cause),
});
}
});
const stop = (): Promise<void> => Effect.runPromise(stopEffect());
@ -306,11 +314,11 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
request: Record<string, unknown>,
) {
const { requestTopic, responseTopic } = resolveGlobalTopics(kind);
const translated = yield* translateRequestEffect(kind, request);
const rr = yield* getRequestorEffect(requestTopic, responseTopic, `global:${kind}`);
const translated = translateRequest(kind, request);
const response = yield* rr.request(translated);
return translateResponse(kind, response);
return yield* translateResponseEffect(kind, response);
});
const dispatchGlobalServiceStreamingEffect = Effect.fn("DispatcherManager.dispatchGlobalServiceStreaming")(function* <
@ -322,15 +330,15 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
responder: EffectResponder<E, R>,
) {
const { requestTopic, responseTopic } = resolveGlobalTopics(kind);
const translated = yield* translateRequestEffect(kind, request);
const rr = yield* getRequestorEffect(requestTopic, responseTopic, `global:${kind}`);
const translated = translateRequest(kind, request);
yield* rr.request(translated, {
recipient: (response) => {
const translatedRes = translateResponse(kind, response);
recipient: Effect.fn("DispatcherManager.dispatchGlobalServiceStreaming.recipient")(function* (response) {
const translatedRes = yield* translateResponseEffect(kind, response);
const complete = dispatcherManagerIsCompleteResponse(translatedRes);
return responder(translatedRes, complete).pipe(Effect.as(complete));
},
return yield* responder(translatedRes, complete).pipe(Effect.as(complete));
}),
});
});
@ -367,15 +375,15 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
request: Record<string, unknown>,
) {
const { requestTopic, responseTopic } = resolveFlowTopics(kind);
const translated = yield* translateRequestEffect(kind, request);
const rr = yield* getRequestorEffect(
requestTopic,
responseTopic,
`flow:${flow}:${kind}`,
);
const translated = translateRequest(kind, request);
const response = yield* rr.request(translated);
return translateResponse(kind, response);
return yield* translateResponseEffect(kind, response);
});
const dispatchFlowServiceStreamingEffect = Effect.fn("DispatcherManager.dispatchFlowServiceStreaming")(function* <
@ -388,19 +396,19 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
responder: EffectResponder<E, R>,
) {
const { requestTopic, responseTopic } = resolveFlowTopics(kind);
const translated = yield* translateRequestEffect(kind, request);
const rr = yield* getRequestorEffect(
requestTopic,
responseTopic,
`flow:${flow}:${kind}`,
);
const translated = translateRequest(kind, request);
yield* rr.request(translated, {
recipient: (response) => {
const translatedRes = translateResponse(kind, response);
recipient: Effect.fn("DispatcherManager.dispatchFlowServiceStreaming.recipient")(function* (response) {
const translatedRes = yield* translateResponseEffect(kind, response);
const complete = dispatcherManagerIsCompleteResponse(translatedRes);
return responder(translatedRes, complete).pipe(Effect.as(complete));
},
return yield* responder(translatedRes, complete).pipe(Effect.as(complete));
}),
});
});
@ -431,24 +439,27 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
*/
const publishToTopic = (topic: string, message: unknown, id?: string): Promise<void> =>
Effect.runPromise(
Effect.gen(function* () {
const producer = yield* Effect.tryPromise({
Effect.acquireUseRelease(
Effect.tryPromise({
try: () => pubsub.createProducer<unknown>({ topic }),
catch: (cause) => messagingDeliveryError(topic, "create-producer", cause),
});
const timestamp = yield* Clock.currentTimeMillis;
const suffix = yield* Random.nextIntBetween(0, 36 ** 6, { halfOpen: true });
const messageId = id ?? `pub-${timestamp}-${suffix.toString(36).padStart(6, "0")}`;
}),
(producer) =>
Effect.gen(function* () {
const timestamp = yield* Clock.currentTimeMillis;
const suffix = yield* Random.nextIntBetween(0, 36 ** 6, { halfOpen: true });
const messageId = id ?? `pub-${timestamp}-${suffix.toString(36).padStart(6, "0")}`;
yield* Effect.tryPromise({
try: () => producer.send(message, { id: messageId }),
catch: (cause) => messagingDeliveryError(topic, "send", cause),
});
yield* Effect.tryPromise({
yield* Effect.tryPromise({
try: () => producer.send(message, { id: messageId }),
catch: (cause) => messagingDeliveryError(topic, "send", cause),
});
}),
(producer) => Effect.tryPromise({
try: () => producer.close(),
catch: (cause) => messagingDeliveryError(topic, "close-producer", cause),
});
}),
}),
),
);
return {

View file

@ -18,7 +18,8 @@
* Python reference: trustgraph-base/trustgraph/messaging/translators/primitives.py
*/
import type { Term, Triple } from "@trustgraph/base";
import { errorMessage, type Term, type Triple } from "@trustgraph/base";
import { Effect } from "effect";
import * as S from "effect/Schema";
// ---------- Client wire format type definitions ----------
@ -55,6 +56,8 @@ export class DispatchSerializationError extends S.TaggedErrorClass<DispatchSeria
},
) {}
const isDispatchSerializationError = S.is(DispatchSerializationError);
interface ClientTriple {
s: ClientTerm;
p: ClientTerm;
@ -280,6 +283,21 @@ export function translateRequest(service: string, body: unknown): unknown {
return body;
}
export const translateRequestEffect = (
service: string,
body: unknown,
): Effect.Effect<unknown, DispatchSerializationError> =>
Effect.try({
try: () => translateRequest(service, body),
catch: (cause) =>
isDispatchSerializationError(cause)
? cause
: DispatchSerializationError.make({
operation: `translate-request:${service}`,
message: errorMessage(cause),
}),
});
/**
* Translate an internal response body to client wire format.
*
@ -293,3 +311,18 @@ export function translateResponse(service: string, response: unknown): unknown {
}
return response;
}
export const translateResponseEffect = (
service: string,
response: unknown,
): Effect.Effect<unknown, DispatchSerializationError> =>
Effect.try({
try: () => translateResponse(service, response),
catch: (cause) =>
isDispatchSerializationError(cause)
? cause
: DispatchSerializationError.make({
operation: `translate-response:${service}`,
message: errorMessage(cause),
}),
});