From 32788ec0e4a6d4892f8e75c9ebd2d612599a6dd9 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 04:18:11 -0500 Subject: [PATCH] Normalize client stream envelopes --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 50 +++- .../client/src/__tests__/rpc-timeout.test.ts | 73 +++++- .../client/src/socket/trustgraph-socket.ts | 242 +++++++++++++----- 3 files changed, 287 insertions(+), 78 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 2efaf920..5d234cb9 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,8 +12,8 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 Qdrant -config/schema/fakeability slice: +Current signal counts from `ts/packages` after the 2026-06-02 client streaming +facade normalization slice: | Signal | Count | | --- | ---: | @@ -114,6 +114,11 @@ Notes: scoped finalizer slice. `Effect.runPromise` increased because the new tests and legacy service initialization logs run Effects at compatibility boundaries. +- The client streaming facade slice did not change signal counts. It + centralized the legacy streaming `{ response, complete, error }` envelope + decode in `trustgraph-socket.ts`, uses Schema plus `effect/Predicate` + property narrowing for streaming payload reads, and leaves service-specific + legacy completion markers only where they preserve public callback behavior. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -465,6 +470,7 @@ Notes: - `cd ts && bun run build` - `cd ts && bun run test` - `git diff --check` + - `git diff --check` ### 2026-06-02: Client Socket Tagged Error And JSON Slice @@ -758,9 +764,6 @@ Notes: - Gateway dispatcher tests now exercise both the Promise compatibility streaming path and the Effect-native responder path. - Remaining: - - Client facade methods still duplicate some per-service streaming envelope - completion checks. Centralize these around `DispatchStreamChunk.complete` - in a later client API cleanup. - `ts/packages/flow/src/gateway/rpc-protocol.ts` remains a Fastify socket compatibility bridge, not a direct replacement target for Effect RPC server layers yet. @@ -774,6 +777,32 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Client Streaming Facade Normalization Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/client/src/socket/trustgraph-socket.ts` now decodes the + legacy streaming envelope with Schema before service-specific callback + handling. + - Streaming payload reads now use `effect/Predicate` property narrowing + helpers instead of repeated response-wrapper assertions. + - Graph RAG, document RAG, text completion, prompt, agent, and document + stream callbacks now use a shared `streamComplete(...)` helper. The RPC + `DispatchStreamChunk.complete` bit is the default transport completion + source, with legacy service markers preserved for public compatibility. + - Explainability triples are decoded through a recursive Schema instead of + `as Triple[]`. + - The focused client test now proves normalized `DispatchStreamChunk` + completion flows through `graphRagStreaming` and final metadata. +- Verification: + - `bunx --bun vitest run src/__tests__/rpc-timeout.test.ts` + - `bun run --cwd ts/packages/client build` + - `cd ts && bun run check:tsgo` + - `bun run --cwd ts/packages/client test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + ### 2026-06-02: FalkorDB Scoped Client Lifecycle Slice - Status: migrated and root-verified. @@ -871,9 +900,9 @@ Notes: The remaining client `newableFactory` assertions are documented as public API compatibility boundaries for this loop. - Gateway `DispatchStream` now uses Effect-native dispatcher streaming - callbacks instead of nested `Effect.runPromiseWith`; the remaining client - streaming cleanup is facade-level completion normalization around - `DispatchStreamChunk.complete`. + callbacks instead of nested `Effect.runPromiseWith`, and client streaming + facade callbacks now decode the legacy envelope through Schema before + applying service-specific public callback semantics. - Do not make `gateway/rpc-protocol.ts` the next cleanup target: it is a Fastify socket compatibility bridge while the public Effect RPC server layers require SocketServer or Effect HTTP routing. @@ -948,9 +977,8 @@ Notes: ## Recommended PR Order -1. Client streaming facade completion normalization. -2. Provider layer and Effect AI cleanup. -3. MCP parity/deletion decision and workbench platform polish. +1. Provider layer and Effect AI cleanup. +2. MCP parity/deletion decision and workbench platform polish. ## No-Op Rules diff --git a/ts/packages/client/src/__tests__/rpc-timeout.test.ts b/ts/packages/client/src/__tests__/rpc-timeout.test.ts index 4ed38291..236971e1 100644 --- a/ts/packages/client/src/__tests__/rpc-timeout.test.ts +++ b/ts/packages/client/src/__tests__/rpc-timeout.test.ts @@ -1,6 +1,6 @@ import { Effect } from "effect"; import { describe, expect, it, vi } from "vitest"; -import { DispatchError } from "../rpc/contract"; +import { DispatchError, DispatchStreamChunk } from "../rpc/contract"; import { type DispatchInput, withDispatchRequestPolicy } from "../socket/effect-rpc-client"; import { makeBaseApiWithRpc } from "../socket/trustgraph-socket"; @@ -58,4 +58,75 @@ describe("Effect RPC request policy", () => { expect(attempts).toBe(3); }); + + it("forwards normalized stream completion to flow streaming facades", () => { + const dispatchStream = vi.fn((_input: DispatchInput, receiver: (chunk: DispatchStreamChunk) => boolean) => { + const firstComplete = receiver(DispatchStreamChunk.make({ + response: { response: "alpha" }, + complete: false, + })); + const secondComplete = receiver(DispatchStreamChunk.make({ + response: { + response: "omega", + in_token: 1, + out_token: 2, + model: "test-model", + }, + complete: true, + })); + return Promise.resolve( + DispatchStreamChunk.make({ + response: { response: "omega" }, + complete: true, + }), + ).then((chunk) => { + expect(firstComplete).toBe(false); + expect(secondComplete).toBe(true); + return chunk; + }); + }); + const api = makeBaseApiWithRpc("alice", undefined, "ws://example.test/rpc", { + dispatch: vi.fn(() => Promise.resolve({ ok: true })), + dispatchStream, + close: vi.fn(() => Promise.resolve()), + subscribe: vi.fn(() => () => {}), + }); + const chunks: Array<{ + readonly chunk: string; + readonly complete: boolean; + readonly metadata?: { readonly in_token?: number; readonly out_token?: number; readonly model?: string }; + }> = []; + + api.flow("flow-a").graphRagStreaming( + "hello", + (chunk, complete, metadata) => { + chunks.push(metadata === undefined ? { chunk, complete } : { chunk, complete, metadata }); + }, + () => undefined, + ); + + expect(dispatchStream).toHaveBeenCalledWith( + { + scope: "flow", + service: "graph-rag", + flow: "flow-a", + request: { + query: "hello", + user: "alice", + collection: "default", + streaming: true, + }, + }, + expect.any(Function), + { timeoutMs: 60000 }, + ); + expect(chunks).toEqual([ + { chunk: "alpha", complete: false }, + { + chunk: "omega", + complete: true, + metadata: { in_token: 1, out_token: 2, model: "test-model" }, + }, + ]); + }); }); diff --git a/ts/packages/client/src/socket/trustgraph-socket.ts b/ts/packages/client/src/socket/trustgraph-socket.ts index 3d3183f2..2f8c4f8d 100644 --- a/ts/packages/client/src/socket/trustgraph-socket.ts +++ b/ts/packages/client/src/socket/trustgraph-socket.ts @@ -9,6 +9,7 @@ import { } from "./effect-rpc-client.js"; import { getDefaultSocketUrl, getRandomValues } from "./websocket-adapter.js"; import { Clock, Effect, Option, Result, Schema as S } from "effect"; +import * as Predicate from "effect/Predicate"; // Import all message types for different services import type { @@ -154,24 +155,23 @@ function dispatchOptions( return options; } -function streamingMetadataFrom(source: { - in_token?: number; - out_token?: number; - model?: string; -}): StreamingMetadata | undefined { +function streamingMetadataFrom(source: unknown): StreamingMetadata | undefined { const metadata: StreamingMetadata = {}; let hasMetadata = false; - if (source.in_token !== undefined) { - metadata.in_token = source.in_token; + const inToken = numberProperty(source, "in_token"); + if (inToken !== undefined) { + metadata.in_token = inToken; hasMetadata = true; } - if (source.out_token !== undefined) { - metadata.out_token = source.out_token; + const outToken = numberProperty(source, "out_token"); + if (outToken !== undefined) { + metadata.out_token = outToken; hasMetadata = true; } - if (source.model !== undefined) { - metadata.model = source.model; + const model = stringProperty(source, "model"); + if (model !== undefined) { + metadata.model = model; hasMetadata = true; } @@ -233,6 +233,92 @@ const logClientError = (message: string, error: unknown): void => { Effect.runFork(Effect.logError(message, { error: toErrorMessage(error, message) })); }; +const StreamingEnvelopeSchema = S.Struct({ + response: S.optionalKey(S.Unknown), + complete: S.optionalKey(S.Boolean), + error: S.optionalKey(S.String), +}); +type StreamingEnvelope = typeof StreamingEnvelopeSchema.Type; + +const ClientTripleSchema: S.Codec = S.suspend(() => + S.Struct({ + s: ClientTermSchema, + p: ClientTermSchema, + o: ClientTermSchema, + g: S.optionalKey(S.String), + }) +); + +const ClientTermSchema: S.Codec = S.suspend(() => + S.Union([ + S.Struct({ + t: S.Literal("i"), + i: S.String, + }), + S.Struct({ + t: S.Literal("b"), + d: S.String, + }), + S.Struct({ + t: S.Literal("l"), + v: S.String, + dt: S.optionalKey(S.String), + ln: S.optionalKey(S.String), + }), + S.Struct({ + t: S.Literal("t"), + tr: S.optionalKey(ClientTripleSchema), + }), + ]) +); + +const decodeStreamingEnvelope = S.decodeUnknownOption(StreamingEnvelopeSchema); +const decodeClientTriples = S.decodeUnknownOption(S.Array(ClientTripleSchema).pipe(S.mutable)); + +function streamingEnvelopeFrom(message: unknown): StreamingEnvelope { + return Option.getOrElse(decodeStreamingEnvelope(message), () => ({ + complete: true, + error: "Streaming message could not be decoded", + })); +} + +function propertyValue(source: unknown, key: string): unknown | undefined { + return Predicate.hasProperty(source, key) ? source[key] : undefined; +} + +function stringProperty(source: unknown, key: string): string | undefined { + const value = propertyValue(source, key); + return typeof value === "string" ? value : undefined; +} + +function numberProperty(source: unknown, key: string): number | undefined { + const value = propertyValue(source, key); + return typeof value === "number" ? value : undefined; +} + +function booleanProperty(source: unknown, key: string): boolean | undefined { + const value = propertyValue(source, key); + return typeof value === "boolean" ? value : undefined; +} + +function responseErrorMessage(source: unknown): string | undefined { + const error = propertyValue(source, "error"); + if (typeof error === "string") return error; + return stringProperty(error, "message"); +} + +function streamComplete( + envelope: StreamingEnvelope, + response: unknown, + responseMarkers: ReadonlyArray = [], +): boolean { + return envelope.complete === true || responseMarkers.some((key) => booleanProperty(response, key) === true); +} + +function explainTriplesFrom(source: unknown): Triple[] | undefined { + return Option.getOrUndefined(decodeClientTriples(propertyValue(source, "explain_triples"))); +} + /** * Socket interface defining all available operations for the TrustGraph API * This provides a unified interface for various AI/ML and knowledge graph @@ -978,7 +1064,7 @@ export function makeLibrarianApi(api: BaseApi) { chunkSize?: number, ): void { const receiver = (message: unknown): boolean => { - const msg = message as { response?: StreamDocumentResponse; complete?: boolean; error?: string }; + const msg = streamingEnvelopeFrom(message); // Check for top-level error if (msg.error !== undefined) { @@ -988,17 +1074,23 @@ export function makeLibrarianApi(api: BaseApi) { const resp = msg.response; if (resp === undefined) { - return msg.complete === true; + return streamComplete(msg, resp); } // Check for response-level error - if (resp.error !== undefined) { - onError(resp.error.message); + const responseError = responseErrorMessage(resp); + if (responseError !== undefined) { + onError(responseError); return true; } - const complete = msg.complete === true; - onChunk(resp.content, resp["chunk-index"], resp["total-chunks"], complete); + const complete = streamComplete(msg, resp); + onChunk( + stringProperty(resp, "content") ?? "", + numberProperty(resp, "chunk-index") ?? 0, + numberProperty(resp, "total-chunks") ?? 0, + complete, + ); return complete; }; @@ -1393,7 +1485,7 @@ export function makeFlowApi(api: BaseApi, flowId: string) { collection?: string, ) { const receiver = (message: unknown) => { - const msg = message as { response?: AgentResponse; complete?: boolean; error?: string }; + const msg = streamingEnvelopeFrom(message); // Check for top-level error if (msg.error !== undefined) { @@ -1404,36 +1496,41 @@ export function makeFlowApi(api: BaseApi, flowId: string) { const resp = msg.response ?? {}; // Check for errors in response - if (resp.chunk_type === "error" || resp.error !== undefined) { - error(resp.error?.message ?? "Unknown agent error"); + const responseError = responseErrorMessage(resp); + if (stringProperty(resp, "chunk_type") === "error" || responseError !== undefined) { + error(responseError ?? "Unknown agent error"); return true; // End streaming on error } // Handle explainability events (agent uses chunk_type="explain") + const chunkType = stringProperty(resp, "chunk_type"); + const messageType = stringProperty(resp, "message_type"); + const explainId = stringProperty(resp, "explain_id"); + const explainTriples = explainTriplesFrom(resp); if ( - (resp.chunk_type === "explain" || resp.message_type === "explain") && - (resp.explain_id !== undefined || resp.explain_triples !== undefined) + (chunkType === "explain" || messageType === "explain") && + (explainId !== undefined || explainTriples !== undefined) ) { const event: ExplainEvent = { - explainId: resp.explain_id ?? "", - explainGraph: resp.explain_graph ?? "", + explainId: explainId ?? "", + explainGraph: stringProperty(resp, "explain_graph") ?? "", }; - if (resp.explain_triples !== undefined) { - event.explainTriples = resp.explain_triples as Triple[]; + if (explainTriples !== undefined) { + event.explainTriples = explainTriples; } onExplain?.(event); return false; } // Handle streaming chunks by chunk_type - const content = resp.content ?? ""; - const messageComplete = resp.end_of_message === true; - const dialogComplete = msg.complete === true || resp.end_of_dialog === true; + const content = stringProperty(resp, "content") ?? ""; + const messageComplete = booleanProperty(resp, "end_of_message") === true; + const dialogComplete = streamComplete(msg, resp, ["end_of_dialog"]); // Extract metadata from final message const metadata = dialogComplete ? streamingMetadataFrom(resp) : undefined; - switch (resp.chunk_type) { + switch (chunkType) { case "thought": think(content, messageComplete, metadata); break; @@ -1493,7 +1590,7 @@ export function makeFlowApi(api: BaseApi, flowId: string) { onExplain?: (event: ExplainEvent) => void, ): void { const recv = (message: unknown): boolean => { - const msg = message as { response?: GraphRagResponse; complete?: boolean; error?: string }; + const msg = streamingEnvelopeFrom(message); // Check for top-level error if (msg.error !== undefined) { @@ -1501,37 +1598,45 @@ export function makeFlowApi(api: BaseApi, flowId: string) { return true; } - const resp = (msg.response ?? {}) as GraphRagResponse; + const resp = msg.response ?? {}; // Check for response-level error - if (resp.error !== undefined) { - onError(resp.error.message); + const responseError = responseErrorMessage(resp); + if (responseError !== undefined) { + onError(responseError); return true; } // Extract explain data if present (may be embedded in the answer message) + const messageType = stringProperty(resp, "message_type"); + const explainId = stringProperty(resp, "explain_id"); + const explainTriples = explainTriplesFrom(resp); if ( - resp.message_type === "explain" && - (resp.explain_id !== undefined || resp.explain_triples !== undefined) + messageType === "explain" && + (explainId !== undefined || explainTriples !== undefined) ) { const event: ExplainEvent = { - explainId: resp.explain_id ?? "", - explainGraph: resp.explain_graph ?? "", + explainId: explainId ?? "", + explainGraph: stringProperty(resp, "explain_graph") ?? "", }; - if (resp.explain_triples !== undefined) { - event.explainTriples = resp.explain_triples as Triple[]; + if (explainTriples !== undefined) { + event.explainTriples = explainTriples; } onExplain?.(event); // If this message also carries answer text, fall through to chunk handling. // If it's a standalone explain event (no answer text), stop here. - if (resp.response === undefined && resp.endOfStream !== true && resp.end_of_session !== true) { + if ( + stringProperty(resp, "response") === undefined && + booleanProperty(resp, "endOfStream") !== true && + booleanProperty(resp, "end_of_session") !== true + ) { return false; } } // Handle chunk messages (default behavior) - const chunk = resp.response ?? resp.chunk ?? ""; - const complete = resp.end_of_session === true || resp.endOfStream === true || msg.complete === true; + const chunk = stringProperty(resp, "response") ?? stringProperty(resp, "chunk") ?? ""; + const complete = streamComplete(msg, resp, ["end_of_session", "endOfStream"]); // Extract metadata from final message const metadata = complete ? streamingMetadataFrom(resp) : undefined; @@ -1592,7 +1697,7 @@ export function makeFlowApi(api: BaseApi, flowId: string) { onExplain?: (event: ExplainEvent) => void, ): void { const recv = (message: unknown): boolean => { - const msg = message as { response?: DocumentRagResponse; complete?: boolean; error?: string }; + const msg = streamingEnvelopeFrom(message); // Check for top-level error if (msg.error !== undefined) { @@ -1600,29 +1705,32 @@ export function makeFlowApi(api: BaseApi, flowId: string) { return true; } - const resp = (msg.response ?? {}) as DocumentRagResponse; + const resp = msg.response ?? {}; // Check for response-level error - if (resp.error !== undefined) { - onError(resp.error.message); + const responseError = responseErrorMessage(resp); + if (responseError !== undefined) { + onError(responseError); return true; } // Handle explainability events + const explainId = stringProperty(resp, "explain_id"); + const explainGraph = stringProperty(resp, "explain_graph"); if ( - resp.message_type === "explain" && - resp.explain_id !== undefined && - resp.explain_graph !== undefined + stringProperty(resp, "message_type") === "explain" && + explainId !== undefined && + explainGraph !== undefined ) { onExplain?.({ - explainId: resp.explain_id, - explainGraph: resp.explain_graph, + explainId, + explainGraph, }); return false; } - const chunk = resp.response ?? resp.chunk ?? ""; - const complete = resp.end_of_session === true || resp.endOfStream === true || msg.complete === true; + const chunk = stringProperty(resp, "response") ?? stringProperty(resp, "chunk") ?? ""; + const complete = streamComplete(msg, resp, ["end_of_session", "endOfStream"]); // Extract metadata from final message const metadata = complete ? streamingMetadataFrom(resp) : undefined; @@ -1671,7 +1779,7 @@ export function makeFlowApi(api: BaseApi, flowId: string) { onError: (error: string) => void, ): void { const recv = (message: unknown): boolean => { - const msg = message as { response?: TextCompletionResponse; complete?: boolean; error?: string }; + const msg = streamingEnvelopeFrom(message); // Check for top-level error if (msg.error !== undefined) { @@ -1679,17 +1787,18 @@ export function makeFlowApi(api: BaseApi, flowId: string) { return true; } - const resp = (msg.response ?? {}) as TextCompletionResponse; + const resp = msg.response ?? {}; // Check for response-level error - if (resp.error !== undefined) { - onError(resp.error.message); + const responseError = responseErrorMessage(resp); + if (responseError !== undefined) { + onError(responseError); return true; } // Text completion uses 'response' field for chunks - const chunk = resp.response ?? ""; - const complete = msg.complete === true; + const chunk = stringProperty(resp, "response") ?? ""; + const complete = streamComplete(msg, resp); // Extract metadata from final message const metadata = complete ? streamingMetadataFrom(resp) : undefined; @@ -1729,7 +1838,7 @@ export function makeFlowApi(api: BaseApi, flowId: string) { onError: (error: string) => void, ): void { const recv = (message: unknown): boolean => { - const msg = message as { response?: PromptResponse; complete?: boolean; error?: string }; + const msg = streamingEnvelopeFrom(message); // Check for top-level error if (msg.error !== undefined) { @@ -1737,17 +1846,18 @@ export function makeFlowApi(api: BaseApi, flowId: string) { return true; } - const resp = (msg.response ?? {}) as PromptResponse; + const resp = msg.response ?? {}; // Check for response-level error - if (resp.error !== undefined) { - onError(resp.error.message); + const responseError = responseErrorMessage(resp); + if (responseError !== undefined) { + onError(responseError); return true; } // Prompt service uses 'text' field for chunks - const chunk = resp.text ?? ""; - const complete = msg.complete === true; + const chunk = stringProperty(resp, "text") ?? ""; + const complete = streamComplete(msg, resp); // Extract metadata from final message const metadata = complete ? streamingMetadataFrom(resp) : undefined;