Add Effect-native gateway streaming path

This commit is contained in:
elpresidank 2026-06-02 03:43:21 -05:00
parent df0a0c068e
commit ce5838db1d
5 changed files with 315 additions and 189 deletions

View file

@ -1,4 +1,4 @@
import { Context, Data, Effect, Layer, ManagedRuntime, Stream } from "effect";
import { Context, Effect, Layer, ManagedRuntime, Stream } from "effect";
import type * as RpcGroup from "effect/unstable/rpc/RpcGroup";
import * as RpcClient from "effect/unstable/rpc/RpcClient";
import type { RpcClientError } from "effect/unstable/rpc/RpcClientError";
@ -156,17 +156,12 @@ export function makeEffectRpcClient(
runtime.runPromise(
withDispatchRequestPolicy(
client.DispatchStream(DispatchPayload.make(input)).pipe(
Stream.runForEach((chunk) =>
Stream.runForEachWhile((chunk) =>
Effect.suspend(() => {
last = chunk;
if (receiver(chunk)) return Effect.fail(new StopStreaming());
return Effect.void;
return Effect.succeed(!receiver(chunk));
}),
),
Effect.catchIf(
(cause): cause is StopStreaming => cause instanceof StopStreaming,
() => Effect.void,
),
),
options,
),
@ -205,8 +200,6 @@ export function withDispatchRequestPolicy<A, E, R>(
return retryTimes > 0 ? timed.pipe(Effect.retry({ times: retryTimes })) : timed;
}
class StopStreaming extends Data.TaggedError("StopStreaming")<{}> {}
function errorMessage(cause: unknown): string {
if (cause instanceof Error) return cause.message;
if (typeof cause === "string") return cause;

View file

@ -1,4 +1,5 @@
import { describe, expect, it } from "vitest";
import { Effect } from "effect";
import {
dispatcherManagerIsCompleteResponse,
makeDispatcherManager,
@ -189,6 +190,30 @@ describe("gateway dispatcher manager", () => {
]);
});
it("streams responses through the Effect-native responder path", async () => {
const backend = new DispatchBackend();
const manager = makeDispatcherManager({
port: 0,
metricsPort: 0,
pubsub: backend,
});
const chunks: Array<{ readonly response: unknown; readonly complete: boolean }> = [];
await Effect.runPromise(
manager.dispatchGlobalServiceStreamingEffect("knowledge", { query: "hello" }, (response, complete) =>
Effect.sync(() => {
chunks.push({ response, complete });
})
),
);
await manager.stop();
expect(chunks).toEqual([
{ response: { chunk: 1 }, complete: false },
{ response: { chunk: 2, endOfStream: true }, complete: true },
]);
});
it.each([
[{ complete: true }],
[{ endOfStream: true }],

View file

@ -17,13 +17,27 @@ import {
messagingDeliveryError,
messagingLifecycleError,
type EffectRequestResponse,
type MessagingDeliveryError,
type MessagingLifecycleError,
type MessagingTimeoutError,
type PubSubBackend,
type PubSubError,
type RequestResponseFactoryService,
} from "@trustgraph/base";
import type { GatewayConfig } from "../server.js";
import { translateRequest, translateResponse } from "./serialize.js";
export type Responder = (response: unknown, complete: boolean) => Promise<void>;
export type EffectResponder<E = never, R = never> = (
response: unknown,
complete: boolean,
) => Effect.Effect<void, E, R>;
export type DispatcherStreamError<E = never> =
| PubSubError
| MessagingLifecycleError
| MessagingDeliveryError
| MessagingTimeoutError
| E;
// ---------- Service registry ----------
@ -89,6 +103,11 @@ export interface DispatcherManager {
request: Record<string, unknown>,
responder: Responder,
) => Promise<void>;
readonly dispatchGlobalServiceStreamingEffect: <E = never, R = never>(
kind: string,
request: Record<string, unknown>,
responder: EffectResponder<E, R>,
) => Effect.Effect<void, DispatcherStreamError<E>, R>;
readonly dispatchFlowService: (
flow: string,
kind: string,
@ -100,6 +119,12 @@ export interface DispatcherManager {
request: Record<string, unknown>,
responder: Responder,
) => Promise<void>;
readonly dispatchFlowServiceStreamingEffect: <E = never, R = never>(
flow: string,
kind: string,
request: Record<string, unknown>,
responder: EffectResponder<E, R>,
) => Effect.Effect<void, DispatcherStreamError<E>, R>;
readonly publishToTopic: (
topic: string,
message: unknown,
@ -146,96 +171,93 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
const pubsub: PubSubBackend = config.pubsub ?? makeNatsBackend(config.natsUrl ?? "nats://localhost:4222");
let runtime: DispatcherRuntime | null = null;
const start = (): Promise<void> => {
if (runtime !== null) return Promise.resolve();
const startEffect = Effect.fn("DispatcherManager.start")(function* () {
if (runtime !== null) return;
return Effect.runPromise(
Effect.gen(function* () {
const scope = yield* Scope.make();
const nextRuntime = yield* Effect.gen(function* () {
const messagingConfig = yield* loadMessagingRuntimeConfig();
const requestors = yield* SynchronizedRef.make<RequestorMap>(new Map());
return {
scope,
requestors,
factory: makeRequestResponseFactoryService(makePubSubService(pubsub), messagingConfig),
} satisfies DispatcherRuntime;
}).pipe(
Effect.onError((cause) => Scope.close(scope, Exit.failCause(cause))),
);
runtime = nextRuntime;
}),
const scope = yield* Scope.make();
const nextRuntime = yield* Effect.gen(function* () {
const messagingConfig = yield* loadMessagingRuntimeConfig().pipe(
Effect.mapError((cause) =>
messagingLifecycleError(
"gateway-dispatcher",
"load-messaging-config",
cause,
)
),
);
const requestors = yield* SynchronizedRef.make<RequestorMap>(new Map());
return {
scope,
requestors,
factory: makeRequestResponseFactoryService(makePubSubService(pubsub), messagingConfig),
} satisfies DispatcherRuntime;
}).pipe(
Effect.onError((cause) => Scope.close(scope, Exit.failCause(cause))),
);
};
runtime = nextRuntime;
});
const stop = (): Promise<void> =>
Effect.runPromise(
Effect.gen(function* () {
const current = runtime;
runtime = null;
const start = (): Promise<void> => Effect.runPromise(startEffect());
if (current !== null) {
yield* Scope.close(current.scope, Exit.void);
}
const stopEffect = Effect.fn("DispatcherManager.stop")(function* () {
const current = runtime;
runtime = null;
yield* Effect.tryPromise({
try: () => pubsub.close(),
catch: (cause) => messagingLifecycleError("gateway-dispatcher", "close-pubsub", cause),
});
}),
);
if (current !== null) {
yield* Scope.close(current.scope, Exit.void);
}
yield* Effect.tryPromise({
try: () => pubsub.close(),
catch: (cause) => messagingLifecycleError("gateway-dispatcher", "close-pubsub", cause),
});
});
const stop = (): Promise<void> => Effect.runPromise(stopEffect());
// ---------- Internal helpers ----------
const ensureRuntime = (): Promise<DispatcherRuntime> =>
Effect.runPromise(
Effect.gen(function* () {
if (runtime === null) {
yield* Effect.tryPromise({
try: () => start(),
catch: (cause) => messagingLifecycleError("gateway-dispatcher", "start", cause),
});
}
if (runtime === null) {
return yield* messagingLifecycleError("gateway-dispatcher", "start", "Dispatcher manager failed to start");
}
return runtime;
}),
);
const ensureRuntimeEffect = Effect.fn("DispatcherManager.ensureRuntime")(function* () {
if (runtime === null) {
yield* startEffect();
}
if (runtime === null) {
return yield* messagingLifecycleError(
"gateway-dispatcher",
"start",
"Dispatcher manager failed to start",
);
}
return runtime;
});
const getRequestor = (
const getRequestorEffect = Effect.fn("DispatcherManager.getRequestor")(function* (
requestTopic: string,
responseTopic: string,
key: string,
): Promise<EffectRequestResponse<unknown, unknown>> =>
Effect.runPromise(
Effect.gen(function* () {
const current = yield* Effect.tryPromise({
try: () => ensureRuntime(),
catch: (cause) => messagingLifecycleError("gateway-dispatcher", "ensure-runtime", cause),
});
) {
const current = yield* ensureRuntimeEffect();
return yield* SynchronizedRef.modifyEffect(current.requestors, (requestors) => {
const cached = requestors.get(key);
if (cached !== undefined) {
return Effect.succeed([cached, requestors] as const);
}
return yield* SynchronizedRef.modifyEffect(current.requestors, (requestors) => {
const cached = requestors.get(key);
if (cached !== undefined) {
return Effect.succeed([cached, requestors] as const);
}
return current.factory.make<unknown, unknown>({
requestTopic,
responseTopic,
subscription: `gateway-${key}`,
}).pipe(
Scope.provide(current.scope),
Effect.map((requestor) => {
const next = new Map(requestors);
next.set(key, requestor);
return [requestor, next] as const;
}),
);
});
}),
);
return current.factory.make<unknown, unknown>({
requestTopic,
responseTopic,
subscription: `gateway-${key}`,
}).pipe(
Scope.provide(current.scope),
Effect.map((requestor) => {
const next = new Map(requestors);
next.set(key, requestor);
return [requestor, next] as const;
}),
);
});
});
const resolveGlobalTopics = (
kind: string,
@ -277,19 +299,40 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
kind: string,
request: Record<string, unknown>,
): Promise<unknown> =>
Effect.runPromise(
Effect.gen(function* () {
const { requestTopic, responseTopic } = resolveGlobalTopics(kind);
const rr = yield* Effect.tryPromise({
try: () => getRequestor(requestTopic, responseTopic, `global:${kind}`),
catch: (cause) => messagingLifecycleError("gateway-dispatcher", "get-requestor", cause),
});
Effect.runPromise(dispatchGlobalServiceEffect(kind, request));
const translated = translateRequest(kind, request);
const response = yield* rr.request(translated);
return translateResponse(kind, response);
}),
);
const dispatchGlobalServiceEffect = Effect.fn("DispatcherManager.dispatchGlobalService")(function* (
kind: string,
request: Record<string, unknown>,
) {
const { requestTopic, responseTopic } = resolveGlobalTopics(kind);
const rr = yield* getRequestorEffect(requestTopic, responseTopic, `global:${kind}`);
const translated = translateRequest(kind, request);
const response = yield* rr.request(translated);
return translateResponse(kind, response);
});
const dispatchGlobalServiceStreamingEffect = Effect.fn("DispatcherManager.dispatchGlobalServiceStreaming")(function* <
E,
R,
>(
kind: string,
request: Record<string, unknown>,
responder: EffectResponder<E, R>,
) {
const { requestTopic, responseTopic } = resolveGlobalTopics(kind);
const rr = yield* getRequestorEffect(requestTopic, responseTopic, `global:${kind}`);
const translated = translateRequest(kind, request);
yield* rr.request(translated, {
recipient: (response) => {
const translatedRes = translateResponse(kind, response);
const complete = dispatcherManagerIsCompleteResponse(translatedRes);
return responder(translatedRes, complete).pipe(Effect.as(complete));
},
});
});
const dispatchGlobalServiceStreaming = (
kind: string,
@ -297,25 +340,16 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
responder: Responder,
): Promise<void> =>
Effect.runPromise(
Effect.gen(function* () {
const { requestTopic, responseTopic } = resolveGlobalTopics(kind);
const rr = yield* Effect.tryPromise({
try: () => getRequestor(requestTopic, responseTopic, `global:${kind}`),
catch: (cause) => messagingLifecycleError("gateway-dispatcher", "get-requestor", cause),
});
const translated = translateRequest(kind, request);
yield* rr.request(translated, {
recipient: (response) => {
const translatedRes = translateResponse(kind, response);
const complete = dispatcherManagerIsCompleteResponse(translatedRes);
return Effect.tryPromise({
try: () => responder(translatedRes, complete).then(() => complete),
catch: (error) => messagingDeliveryError(responseTopic, "stream-responder", error),
});
},
});
}),
dispatchGlobalServiceStreamingEffect(kind, request, (response, complete) =>
Effect.tryPromise({
try: () => responder(response, complete),
catch: (error) => messagingDeliveryError(
resolveGlobalTopics(kind).responseTopic,
"stream-responder",
error,
),
})
),
);
// ---------- Flow-scoped service dispatch ----------
@ -325,24 +359,51 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
kind: string,
request: Record<string, unknown>,
): Promise<unknown> =>
Effect.runPromise(
Effect.gen(function* () {
const { requestTopic, responseTopic } = resolveFlowTopics(kind);
const rr = yield* Effect.tryPromise({
try: () => getRequestor(
requestTopic,
responseTopic,
`flow:${flow}:${kind}`,
),
catch: (cause) => messagingLifecycleError("gateway-dispatcher", "get-requestor", cause),
});
Effect.runPromise(dispatchFlowServiceEffect(flow, kind, request));
const translated = translateRequest(kind, request);
const response = yield* rr.request(translated);
return translateResponse(kind, response);
}),
const dispatchFlowServiceEffect = Effect.fn("DispatcherManager.dispatchFlowService")(function* (
flow: string,
kind: string,
request: Record<string, unknown>,
) {
const { requestTopic, responseTopic } = resolveFlowTopics(kind);
const rr = yield* getRequestorEffect(
requestTopic,
responseTopic,
`flow:${flow}:${kind}`,
);
const translated = translateRequest(kind, request);
const response = yield* rr.request(translated);
return translateResponse(kind, response);
});
const dispatchFlowServiceStreamingEffect = Effect.fn("DispatcherManager.dispatchFlowServiceStreaming")(function* <
E,
R,
>(
flow: string,
kind: string,
request: Record<string, unknown>,
responder: EffectResponder<E, R>,
) {
const { requestTopic, responseTopic } = resolveFlowTopics(kind);
const rr = yield* getRequestorEffect(
requestTopic,
responseTopic,
`flow:${flow}:${kind}`,
);
const translated = translateRequest(kind, request);
yield* rr.request(translated, {
recipient: (response) => {
const translatedRes = translateResponse(kind, response);
const complete = dispatcherManagerIsCompleteResponse(translatedRes);
return responder(translatedRes, complete).pipe(Effect.as(complete));
},
});
});
const dispatchFlowServiceStreaming = (
flow: string,
kind: string,
@ -350,29 +411,16 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
responder: Responder,
): Promise<void> =>
Effect.runPromise(
Effect.gen(function* () {
const { requestTopic, responseTopic } = resolveFlowTopics(kind);
const rr = yield* Effect.tryPromise({
try: () => getRequestor(
requestTopic,
responseTopic,
`flow:${flow}:${kind}`,
dispatchFlowServiceStreamingEffect(flow, kind, request, (response, complete) =>
Effect.tryPromise({
try: () => responder(response, complete),
catch: (error) => messagingDeliveryError(
resolveFlowTopics(kind).responseTopic,
"stream-responder",
error,
),
catch: (cause) => messagingLifecycleError("gateway-dispatcher", "get-requestor", cause),
});
const translated = translateRequest(kind, request);
yield* rr.request(translated, {
recipient: (response) => {
const translatedRes = translateResponse(kind, response);
const complete = dispatcherManagerIsCompleteResponse(translatedRes);
return Effect.tryPromise({
try: () => responder(translatedRes, complete).then(() => complete),
catch: (error) => messagingDeliveryError(responseTopic, "stream-responder", error),
});
},
});
}),
})
),
);
// ---------- Fire-and-forget publish ----------
@ -408,8 +456,10 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager
stop,
dispatchGlobalService,
dispatchGlobalServiceStreaming,
dispatchGlobalServiceStreamingEffect,
dispatchFlowService,
dispatchFlowServiceStreaming,
dispatchFlowServiceStreamingEffect,
publishToTopic,
};
}

View file

@ -3,7 +3,7 @@ import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization";
import * as RpcServer from "effect/unstable/rpc/RpcServer";
import type * as Socket from "effect/unstable/socket/Socket";
import { errorMessage } from "@trustgraph/base";
import type { DispatcherManager } from "./dispatch/manager.js";
import type { DispatcherManager, DispatcherStreamError } from "./dispatch/manager.js";
import { DispatchError, DispatchPayload, DispatchStreamChunk, TrustGraphRpcs } from "./rpc-contract.js";
import { makeSocketRpcProtocol } from "./rpc-protocol.js";
@ -45,20 +45,14 @@ const makeGatewayRpcHandlers = (dispatcher: DispatcherManager) =>
catch: (cause) => DispatchError.make({ message: errorMessage(cause) }),
}),
DispatchStream: Effect.fn("GatewayRpc.DispatchStream")(function* (payload) {
const context = yield* Effect.context<never>();
const runPromise = Effect.runPromiseWith(context);
const queue = yield* Queue.bounded<DispatchStreamChunk, DispatchError | Cause.Done>(16);
yield* Effect.addFinalizer(() => Queue.shutdown(queue));
yield* Effect.tryPromise({
try: () =>
dispatchStream(dispatcher, payload, (response, complete) =>
runPromise(Queue.offer(queue, DispatchStreamChunk.make({ response, complete }))).then(() => complete),
),
catch: (cause) => DispatchError.make({ message: errorMessage(cause) }),
}).pipe(
yield* dispatchStreamEffect(dispatcher, payload, (response, complete) =>
Queue.offer(queue, DispatchStreamChunk.make({ response, complete })),
).pipe(
Effect.flatMap(() => Queue.end(queue)),
Effect.catch((error) => Queue.fail(queue, error)),
Effect.catch((cause) => Queue.fail(queue, DispatchError.make({ message: errorMessage(cause) }))),
Effect.forkScoped,
);
@ -81,26 +75,23 @@ function dispatchOne(
return dispatcher.dispatchGlobalService(payload.service, payload.request);
}
function dispatchStream(
function dispatchStreamEffect(
dispatcher: DispatcherManager,
payload: DispatchPayload,
responder: (response: unknown, complete: boolean) => Promise<boolean>,
): Promise<void> {
const send = (response: unknown, complete: boolean): Promise<void> =>
responder(response, complete).then(() => undefined);
responder: (response: unknown, complete: boolean) => Effect.Effect<void>,
): Effect.Effect<void, DispatcherStreamError> {
if (payload.scope === "flow") {
return dispatcher.dispatchFlowServiceStreaming(
return dispatcher.dispatchFlowServiceStreamingEffect(
payload.flow ?? "default",
payload.service,
payload.request,
send,
responder,
);
}
return dispatcher.dispatchGlobalServiceStreaming(
return dispatcher.dispatchGlobalServiceStreamingEffect(
payload.service,
payload.request,
send,
responder,
);
}