diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 95da8916..2eafdda3 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,12 +12,13 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 native PubSub -boundary slice: +Current signal counts from `ts/packages` after the 2026-06-02 gateway +streaming callback slice: | Signal | Count | | --- | ---: | -| `Effect.runPromise` | 165 | +| `Effect.runPromise` | 163 | +| `Effect.runPromiseWith` | 0 | | `Map<` | 82 | | `WebSocket` | 62 | | `new Map` | 60 | @@ -93,6 +94,10 @@ Notes: `PubSub` is an in-process hub and does not replace the broker-backed `PubSubBackend`/NATS boundary, but it should be preferred for future in-process broadcast/fanout needs. +- The gateway streaming callback slice added Effect-returning dispatcher + streaming methods, switched the RPC stream server off nested + `Effect.runPromiseWith(context)` queue offers, and replaced the client + `StopStreaming` sentinel error with `Stream.runForEachWhile`. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -718,6 +723,41 @@ Notes: - `bun run --cwd ts/packages/base test` - `cd ts && bun run check` +### 2026-06-02: Gateway Streaming Callback Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/flow/src/gateway/dispatch/manager.ts` now exposes + `dispatchGlobalServiceStreamingEffect` and + `dispatchFlowServiceStreamingEffect` so Effect callers can handle stream + chunks without Promise callback re-entry. + - The existing Promise-returning streaming methods remain as compatibility + facades and wrap responders with `Effect.tryPromise`. + - `ts/packages/flow/src/gateway/rpc-server.ts` now writes stream chunks into + the RPC queue through the dispatcher Effect path, removing the prior + `Effect.context` plus `Effect.runPromiseWith(context)` bridge. + - `ts/packages/client/src/socket/effect-rpc-client.ts` now uses + `Stream.runForEachWhile` for early stream termination instead of throwing a + synthetic `StopStreaming` tagged error. + - Gateway dispatcher tests now exercise both the Promise compatibility + streaming path and the Effect-native responder path. +- Remaining: + - Client facade methods still duplicate some per-service streaming envelope + completion checks. Centralize these around `DispatchStreamChunk.complete` + in a later client API cleanup. + - `ts/packages/flow/src/gateway/rpc-protocol.ts` remains a Fastify socket + compatibility bridge, not a direct replacement target for Effect RPC + server layers yet. +- Verification: + - `bun run --cwd ts/packages/flow build` + - `bun run --cwd ts/packages/client build` + - `bunx --bun vitest run src/__tests__/gateway-dispatcher.test.ts` + - `bunx --bun vitest run src/__tests__/rpc-timeout.test.ts` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -756,7 +796,13 @@ Notes: Socket errors/JSON parsing now use tagged errors and Schema decoding. The remaining client `newableFactory` assertions are documented as public API compatibility boundaries for this loop. - - Knowledge streams still duplicate legacy end-of-stream handling. + - Gateway `DispatchStream` now uses Effect-native dispatcher streaming + callbacks instead of nested `Effect.runPromiseWith`; the remaining client + streaming cleanup is facade-level completion normalization around + `DispatchStreamChunk.complete`. + - Do not make `gateway/rpc-protocol.ts` the next cleanup target: it is a + Fastify socket compatibility bridge while the public Effect RPC server + layers require SocketServer or Effect HTTP routing. - WebSocket adapter host fallbacks now use `Result.try` and tagged adapter errors while preserving sync exports. - RAG/providers/storage: @@ -764,8 +810,14 @@ Notes: remaining `ts/packages` matches. - Provider SDKs and storage clients should become managed resources where they have meaningful lifecycle. - - FalkorDB/Qdrant/Ollama/OpenAI-compatible surfaces still need config, - schema, and scope audits. + - FalkorDB should be the next P1 storage slice: both triples query and store + connect Redis clients, cache them with mutable `Effect.cached` slots, and + expose `Layer.succeed` services without a scoped client finalizer. + - Qdrant has no close/disconnect surface in the installed client, so treat it + as a config/schema/fakeability slice rather than an `acquireRelease` close + slice. + - Ollama/OpenAI-compatible/provider surfaces still need config, schema, and + provider-layer audits. ## Ranked Findings @@ -773,19 +825,29 @@ Notes: - TrustGraph evidence: - `ts/packages/flow/src/storage/triples/falkordb.ts` + - `ts/packages/flow/src/query/triples/falkordb.ts` - `ts/packages/flow/src/storage/embeddings/qdrant-graph.ts` - `ts/packages/flow/src/storage/embeddings/qdrant-doc.ts` + - `ts/packages/flow/src/query/embeddings/qdrant-graph.ts` + - `ts/packages/flow/src/query/embeddings/qdrant-doc.ts` - `ts/packages/flow/src/model/text-completion/*.ts` - `ts/packages/flow/src/embeddings/ollama.ts` - Effect primitives: - - `Effect.acquireRelease`, `Layer.scoped`, `Config`, `ConfigProvider`, - `Metric`, `Logger`, Effect AI provider layers. + - `Effect.acquireRelease`, `Layer.effect`/`Layer.scoped`, `Config`, + `ConfigProvider`, `Metric`, `Logger`, Effect AI provider layers. - Rewrite shape: + - First migrate FalkorDB triples store/query so Redis client connect and + disconnect/quit are owned by the service layer scope instead of mutable + cached effects hidden inside a `Layer.succeed` service. - Move env/config reading into `Config` loaders and provider-specific layers. - - Scope SDK clients that need explicit close/disconnect. + - Scope SDK clients that need explicit close/disconnect; for clients without + close APIs, prefer config/schema/fakeable construction work instead. - Tests: - - Provider config tests with `ConfigProvider.fromMap`. - - Storage tests with fake clients before changing real resource lifetimes. + - FalkorDB tests with fake client factories proving connect on acquire and + disconnect/quit on scope close. + - Provider/config tests with `ConfigProvider.fromUnknown`. + - Storage/query tests with fake clients before changing real resource + lifetimes. ### P2: Canonicalize MCP Around The Effect Server @@ -819,9 +881,10 @@ Notes: ## Recommended PR Order -1. Gateway RPC callback and client streaming completion cleanup. -2. Storage/provider managed resource cleanup. -3. MCP parity/deletion decision and workbench platform polish. +1. FalkorDB triples store/query scoped client lifecycle. +2. Qdrant config/schema/fakeable construction cleanup. +3. Client streaming facade completion normalization. +4. MCP parity/deletion decision and workbench platform polish. ## No-Op Rules @@ -850,6 +913,10 @@ Do not flag these as rewrite blockers without additional proof: boundary for NATS/Pulsar-style topics, acknowledgement, schema codecs, and backend lifecycle. Effect's native `PubSub` can replace in-process fanout helpers, but not the distributed broker abstraction by itself. +- `ts/packages/flow/src/gateway/rpc-protocol.ts` is a Fastify socket + compatibility bridge. Do not flag its internal connection maps/sets as a + standalone replacement target until the gateway is ready to move onto Effect + SocketServer or Effect HTTP routing. ## Acceptance For Final Loop Completion diff --git a/ts/packages/client/src/socket/effect-rpc-client.ts b/ts/packages/client/src/socket/effect-rpc-client.ts index 70e724af..66b3ab38 100644 --- a/ts/packages/client/src/socket/effect-rpc-client.ts +++ b/ts/packages/client/src/socket/effect-rpc-client.ts @@ -1,4 +1,4 @@ -import { Context, Data, Effect, Layer, ManagedRuntime, Stream } from "effect"; +import { Context, Effect, Layer, ManagedRuntime, Stream } from "effect"; import type * as RpcGroup from "effect/unstable/rpc/RpcGroup"; import * as RpcClient from "effect/unstable/rpc/RpcClient"; import type { RpcClientError } from "effect/unstable/rpc/RpcClientError"; @@ -156,17 +156,12 @@ export function makeEffectRpcClient( runtime.runPromise( withDispatchRequestPolicy( client.DispatchStream(DispatchPayload.make(input)).pipe( - Stream.runForEach((chunk) => + Stream.runForEachWhile((chunk) => Effect.suspend(() => { last = chunk; - if (receiver(chunk)) return Effect.fail(new StopStreaming()); - return Effect.void; + return Effect.succeed(!receiver(chunk)); }), ), - Effect.catchIf( - (cause): cause is StopStreaming => cause instanceof StopStreaming, - () => Effect.void, - ), ), options, ), @@ -205,8 +200,6 @@ export function withDispatchRequestPolicy( return retryTimes > 0 ? timed.pipe(Effect.retry({ times: retryTimes })) : timed; } -class StopStreaming extends Data.TaggedError("StopStreaming")<{}> {} - function errorMessage(cause: unknown): string { if (cause instanceof Error) return cause.message; if (typeof cause === "string") return cause; diff --git a/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts b/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts index e6fd77b2..4f0b597d 100644 --- a/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts +++ b/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it } from "vitest"; +import { Effect } from "effect"; import { dispatcherManagerIsCompleteResponse, makeDispatcherManager, @@ -189,6 +190,30 @@ describe("gateway dispatcher manager", () => { ]); }); + it("streams responses through the Effect-native responder path", async () => { + const backend = new DispatchBackend(); + const manager = makeDispatcherManager({ + port: 0, + metricsPort: 0, + pubsub: backend, + }); + const chunks: Array<{ readonly response: unknown; readonly complete: boolean }> = []; + + await Effect.runPromise( + manager.dispatchGlobalServiceStreamingEffect("knowledge", { query: "hello" }, (response, complete) => + Effect.sync(() => { + chunks.push({ response, complete }); + }) + ), + ); + await manager.stop(); + + expect(chunks).toEqual([ + { response: { chunk: 1 }, complete: false }, + { response: { chunk: 2, endOfStream: true }, complete: true }, + ]); + }); + it.each([ [{ complete: true }], [{ endOfStream: true }], diff --git a/ts/packages/flow/src/gateway/dispatch/manager.ts b/ts/packages/flow/src/gateway/dispatch/manager.ts index 81f111b3..5dda5d89 100644 --- a/ts/packages/flow/src/gateway/dispatch/manager.ts +++ b/ts/packages/flow/src/gateway/dispatch/manager.ts @@ -17,13 +17,27 @@ import { messagingDeliveryError, messagingLifecycleError, type EffectRequestResponse, + type MessagingDeliveryError, + type MessagingLifecycleError, + type MessagingTimeoutError, type PubSubBackend, + type PubSubError, type RequestResponseFactoryService, } from "@trustgraph/base"; import type { GatewayConfig } from "../server.js"; import { translateRequest, translateResponse } from "./serialize.js"; export type Responder = (response: unknown, complete: boolean) => Promise; +export type EffectResponder = ( + response: unknown, + complete: boolean, +) => Effect.Effect; +export type DispatcherStreamError = + | PubSubError + | MessagingLifecycleError + | MessagingDeliveryError + | MessagingTimeoutError + | E; // ---------- Service registry ---------- @@ -89,6 +103,11 @@ export interface DispatcherManager { request: Record, responder: Responder, ) => Promise; + readonly dispatchGlobalServiceStreamingEffect: ( + kind: string, + request: Record, + responder: EffectResponder, + ) => Effect.Effect, R>; readonly dispatchFlowService: ( flow: string, kind: string, @@ -100,6 +119,12 @@ export interface DispatcherManager { request: Record, responder: Responder, ) => Promise; + readonly dispatchFlowServiceStreamingEffect: ( + flow: string, + kind: string, + request: Record, + responder: EffectResponder, + ) => Effect.Effect, R>; readonly publishToTopic: ( topic: string, message: unknown, @@ -146,96 +171,93 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager const pubsub: PubSubBackend = config.pubsub ?? makeNatsBackend(config.natsUrl ?? "nats://localhost:4222"); let runtime: DispatcherRuntime | null = null; - const start = (): Promise => { - if (runtime !== null) return Promise.resolve(); + const startEffect = Effect.fn("DispatcherManager.start")(function* () { + if (runtime !== null) return; - return Effect.runPromise( - Effect.gen(function* () { - const scope = yield* Scope.make(); - const nextRuntime = yield* Effect.gen(function* () { - const messagingConfig = yield* loadMessagingRuntimeConfig(); - const requestors = yield* SynchronizedRef.make(new Map()); - return { - scope, - requestors, - factory: makeRequestResponseFactoryService(makePubSubService(pubsub), messagingConfig), - } satisfies DispatcherRuntime; - }).pipe( - Effect.onError((cause) => Scope.close(scope, Exit.failCause(cause))), - ); - runtime = nextRuntime; - }), + const scope = yield* Scope.make(); + const nextRuntime = yield* Effect.gen(function* () { + const messagingConfig = yield* loadMessagingRuntimeConfig().pipe( + Effect.mapError((cause) => + messagingLifecycleError( + "gateway-dispatcher", + "load-messaging-config", + cause, + ) + ), + ); + const requestors = yield* SynchronizedRef.make(new Map()); + return { + scope, + requestors, + factory: makeRequestResponseFactoryService(makePubSubService(pubsub), messagingConfig), + } satisfies DispatcherRuntime; + }).pipe( + Effect.onError((cause) => Scope.close(scope, Exit.failCause(cause))), ); - }; + runtime = nextRuntime; + }); - const stop = (): Promise => - Effect.runPromise( - Effect.gen(function* () { - const current = runtime; - runtime = null; + const start = (): Promise => Effect.runPromise(startEffect()); - if (current !== null) { - yield* Scope.close(current.scope, Exit.void); - } + const stopEffect = Effect.fn("DispatcherManager.stop")(function* () { + const current = runtime; + runtime = null; - yield* Effect.tryPromise({ - try: () => pubsub.close(), - catch: (cause) => messagingLifecycleError("gateway-dispatcher", "close-pubsub", cause), - }); - }), - ); + if (current !== null) { + yield* Scope.close(current.scope, Exit.void); + } + + yield* Effect.tryPromise({ + try: () => pubsub.close(), + catch: (cause) => messagingLifecycleError("gateway-dispatcher", "close-pubsub", cause), + }); + }); + + const stop = (): Promise => Effect.runPromise(stopEffect()); // ---------- Internal helpers ---------- - const ensureRuntime = (): Promise => - Effect.runPromise( - Effect.gen(function* () { - if (runtime === null) { - yield* Effect.tryPromise({ - try: () => start(), - catch: (cause) => messagingLifecycleError("gateway-dispatcher", "start", cause), - }); - } - if (runtime === null) { - return yield* messagingLifecycleError("gateway-dispatcher", "start", "Dispatcher manager failed to start"); - } - return runtime; - }), - ); + const ensureRuntimeEffect = Effect.fn("DispatcherManager.ensureRuntime")(function* () { + if (runtime === null) { + yield* startEffect(); + } + if (runtime === null) { + return yield* messagingLifecycleError( + "gateway-dispatcher", + "start", + "Dispatcher manager failed to start", + ); + } + return runtime; + }); - const getRequestor = ( + const getRequestorEffect = Effect.fn("DispatcherManager.getRequestor")(function* ( requestTopic: string, responseTopic: string, key: string, - ): Promise> => - Effect.runPromise( - Effect.gen(function* () { - const current = yield* Effect.tryPromise({ - try: () => ensureRuntime(), - catch: (cause) => messagingLifecycleError("gateway-dispatcher", "ensure-runtime", cause), - }); + ) { + const current = yield* ensureRuntimeEffect(); - return yield* SynchronizedRef.modifyEffect(current.requestors, (requestors) => { - const cached = requestors.get(key); - if (cached !== undefined) { - return Effect.succeed([cached, requestors] as const); - } + return yield* SynchronizedRef.modifyEffect(current.requestors, (requestors) => { + const cached = requestors.get(key); + if (cached !== undefined) { + return Effect.succeed([cached, requestors] as const); + } - return current.factory.make({ - requestTopic, - responseTopic, - subscription: `gateway-${key}`, - }).pipe( - Scope.provide(current.scope), - Effect.map((requestor) => { - const next = new Map(requestors); - next.set(key, requestor); - return [requestor, next] as const; - }), - ); - }); - }), - ); + return current.factory.make({ + requestTopic, + responseTopic, + subscription: `gateway-${key}`, + }).pipe( + Scope.provide(current.scope), + Effect.map((requestor) => { + const next = new Map(requestors); + next.set(key, requestor); + return [requestor, next] as const; + }), + ); + }); + }); const resolveGlobalTopics = ( kind: string, @@ -277,19 +299,40 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager kind: string, request: Record, ): Promise => - Effect.runPromise( - Effect.gen(function* () { - const { requestTopic, responseTopic } = resolveGlobalTopics(kind); - const rr = yield* Effect.tryPromise({ - try: () => getRequestor(requestTopic, responseTopic, `global:${kind}`), - catch: (cause) => messagingLifecycleError("gateway-dispatcher", "get-requestor", cause), - }); + Effect.runPromise(dispatchGlobalServiceEffect(kind, request)); - const translated = translateRequest(kind, request); - const response = yield* rr.request(translated); - return translateResponse(kind, response); - }), - ); + const dispatchGlobalServiceEffect = Effect.fn("DispatcherManager.dispatchGlobalService")(function* ( + kind: string, + request: Record, + ) { + const { requestTopic, responseTopic } = resolveGlobalTopics(kind); + const rr = yield* getRequestorEffect(requestTopic, responseTopic, `global:${kind}`); + + const translated = translateRequest(kind, request); + const response = yield* rr.request(translated); + return translateResponse(kind, response); + }); + + const dispatchGlobalServiceStreamingEffect = Effect.fn("DispatcherManager.dispatchGlobalServiceStreaming")(function* < + E, + R, + >( + kind: string, + request: Record, + responder: EffectResponder, + ) { + const { requestTopic, responseTopic } = resolveGlobalTopics(kind); + const rr = yield* getRequestorEffect(requestTopic, responseTopic, `global:${kind}`); + const translated = translateRequest(kind, request); + + yield* rr.request(translated, { + recipient: (response) => { + const translatedRes = translateResponse(kind, response); + const complete = dispatcherManagerIsCompleteResponse(translatedRes); + return responder(translatedRes, complete).pipe(Effect.as(complete)); + }, + }); + }); const dispatchGlobalServiceStreaming = ( kind: string, @@ -297,25 +340,16 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager responder: Responder, ): Promise => Effect.runPromise( - Effect.gen(function* () { - const { requestTopic, responseTopic } = resolveGlobalTopics(kind); - const rr = yield* Effect.tryPromise({ - try: () => getRequestor(requestTopic, responseTopic, `global:${kind}`), - catch: (cause) => messagingLifecycleError("gateway-dispatcher", "get-requestor", cause), - }); - const translated = translateRequest(kind, request); - - yield* rr.request(translated, { - recipient: (response) => { - const translatedRes = translateResponse(kind, response); - const complete = dispatcherManagerIsCompleteResponse(translatedRes); - return Effect.tryPromise({ - try: () => responder(translatedRes, complete).then(() => complete), - catch: (error) => messagingDeliveryError(responseTopic, "stream-responder", error), - }); - }, - }); - }), + dispatchGlobalServiceStreamingEffect(kind, request, (response, complete) => + Effect.tryPromise({ + try: () => responder(response, complete), + catch: (error) => messagingDeliveryError( + resolveGlobalTopics(kind).responseTopic, + "stream-responder", + error, + ), + }) + ), ); // ---------- Flow-scoped service dispatch ---------- @@ -325,24 +359,51 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager kind: string, request: Record, ): Promise => - Effect.runPromise( - Effect.gen(function* () { - const { requestTopic, responseTopic } = resolveFlowTopics(kind); - const rr = yield* Effect.tryPromise({ - try: () => getRequestor( - requestTopic, - responseTopic, - `flow:${flow}:${kind}`, - ), - catch: (cause) => messagingLifecycleError("gateway-dispatcher", "get-requestor", cause), - }); + Effect.runPromise(dispatchFlowServiceEffect(flow, kind, request)); - const translated = translateRequest(kind, request); - const response = yield* rr.request(translated); - return translateResponse(kind, response); - }), + const dispatchFlowServiceEffect = Effect.fn("DispatcherManager.dispatchFlowService")(function* ( + flow: string, + kind: string, + request: Record, + ) { + const { requestTopic, responseTopic } = resolveFlowTopics(kind); + const rr = yield* getRequestorEffect( + requestTopic, + responseTopic, + `flow:${flow}:${kind}`, ); + const translated = translateRequest(kind, request); + const response = yield* rr.request(translated); + return translateResponse(kind, response); + }); + + const dispatchFlowServiceStreamingEffect = Effect.fn("DispatcherManager.dispatchFlowServiceStreaming")(function* < + E, + R, + >( + flow: string, + kind: string, + request: Record, + responder: EffectResponder, + ) { + const { requestTopic, responseTopic } = resolveFlowTopics(kind); + const rr = yield* getRequestorEffect( + requestTopic, + responseTopic, + `flow:${flow}:${kind}`, + ); + const translated = translateRequest(kind, request); + + yield* rr.request(translated, { + recipient: (response) => { + const translatedRes = translateResponse(kind, response); + const complete = dispatcherManagerIsCompleteResponse(translatedRes); + return responder(translatedRes, complete).pipe(Effect.as(complete)); + }, + }); + }); + const dispatchFlowServiceStreaming = ( flow: string, kind: string, @@ -350,29 +411,16 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager responder: Responder, ): Promise => Effect.runPromise( - Effect.gen(function* () { - const { requestTopic, responseTopic } = resolveFlowTopics(kind); - const rr = yield* Effect.tryPromise({ - try: () => getRequestor( - requestTopic, - responseTopic, - `flow:${flow}:${kind}`, + dispatchFlowServiceStreamingEffect(flow, kind, request, (response, complete) => + Effect.tryPromise({ + try: () => responder(response, complete), + catch: (error) => messagingDeliveryError( + resolveFlowTopics(kind).responseTopic, + "stream-responder", + error, ), - catch: (cause) => messagingLifecycleError("gateway-dispatcher", "get-requestor", cause), - }); - const translated = translateRequest(kind, request); - - yield* rr.request(translated, { - recipient: (response) => { - const translatedRes = translateResponse(kind, response); - const complete = dispatcherManagerIsCompleteResponse(translatedRes); - return Effect.tryPromise({ - try: () => responder(translatedRes, complete).then(() => complete), - catch: (error) => messagingDeliveryError(responseTopic, "stream-responder", error), - }); - }, - }); - }), + }) + ), ); // ---------- Fire-and-forget publish ---------- @@ -408,8 +456,10 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager stop, dispatchGlobalService, dispatchGlobalServiceStreaming, + dispatchGlobalServiceStreamingEffect, dispatchFlowService, dispatchFlowServiceStreaming, + dispatchFlowServiceStreamingEffect, publishToTopic, }; } diff --git a/ts/packages/flow/src/gateway/rpc-server.ts b/ts/packages/flow/src/gateway/rpc-server.ts index f1787957..44c06891 100644 --- a/ts/packages/flow/src/gateway/rpc-server.ts +++ b/ts/packages/flow/src/gateway/rpc-server.ts @@ -3,7 +3,7 @@ import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization"; import * as RpcServer from "effect/unstable/rpc/RpcServer"; import type * as Socket from "effect/unstable/socket/Socket"; import { errorMessage } from "@trustgraph/base"; -import type { DispatcherManager } from "./dispatch/manager.js"; +import type { DispatcherManager, DispatcherStreamError } from "./dispatch/manager.js"; import { DispatchError, DispatchPayload, DispatchStreamChunk, TrustGraphRpcs } from "./rpc-contract.js"; import { makeSocketRpcProtocol } from "./rpc-protocol.js"; @@ -45,20 +45,14 @@ const makeGatewayRpcHandlers = (dispatcher: DispatcherManager) => catch: (cause) => DispatchError.make({ message: errorMessage(cause) }), }), DispatchStream: Effect.fn("GatewayRpc.DispatchStream")(function* (payload) { - const context = yield* Effect.context(); - const runPromise = Effect.runPromiseWith(context); const queue = yield* Queue.bounded(16); yield* Effect.addFinalizer(() => Queue.shutdown(queue)); - yield* Effect.tryPromise({ - try: () => - dispatchStream(dispatcher, payload, (response, complete) => - runPromise(Queue.offer(queue, DispatchStreamChunk.make({ response, complete }))).then(() => complete), - ), - catch: (cause) => DispatchError.make({ message: errorMessage(cause) }), - }).pipe( + yield* dispatchStreamEffect(dispatcher, payload, (response, complete) => + Queue.offer(queue, DispatchStreamChunk.make({ response, complete })), + ).pipe( Effect.flatMap(() => Queue.end(queue)), - Effect.catch((error) => Queue.fail(queue, error)), + Effect.catch((cause) => Queue.fail(queue, DispatchError.make({ message: errorMessage(cause) }))), Effect.forkScoped, ); @@ -81,26 +75,23 @@ function dispatchOne( return dispatcher.dispatchGlobalService(payload.service, payload.request); } -function dispatchStream( +function dispatchStreamEffect( dispatcher: DispatcherManager, payload: DispatchPayload, - responder: (response: unknown, complete: boolean) => Promise, -): Promise { - const send = (response: unknown, complete: boolean): Promise => - responder(response, complete).then(() => undefined); - + responder: (response: unknown, complete: boolean) => Effect.Effect, +): Effect.Effect { if (payload.scope === "flow") { - return dispatcher.dispatchFlowServiceStreaming( + return dispatcher.dispatchFlowServiceStreamingEffect( payload.flow ?? "default", payload.service, payload.request, - send, + responder, ); } - return dispatcher.dispatchGlobalServiceStreaming( + return dispatcher.dispatchGlobalServiceStreamingEffect( payload.service, payload.request, - send, + responder, ); }