Normalize client stream envelopes

This commit is contained in:
elpresidank 2026-06-02 04:18:11 -05:00
parent 8287e1cf93
commit 32788ec0e4
3 changed files with 287 additions and 78 deletions

View file

@ -12,8 +12,8 @@ Verified source roots:
- Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4`
- Installed Effect beta used by this workspace: `ts/node_modules/effect`
Current signal counts from `ts/packages` after the 2026-06-02 Qdrant
config/schema/fakeability slice:
Current signal counts from `ts/packages` after the 2026-06-02 client streaming
facade normalization slice:
| Signal | Count |
| --- | ---: |
@ -114,6 +114,11 @@ Notes:
scoped finalizer slice. `Effect.runPromise` increased because the new tests
and legacy service initialization logs run Effects at compatibility
boundaries.
- The client streaming facade slice did not change signal counts. It
centralized the legacy streaming `{ response, complete, error }` envelope
decode in `trustgraph-socket.ts`, uses Schema plus `effect/Predicate`
property narrowing for streaming payload reads, and leaves service-specific
legacy completion markers only where they preserve public callback behavior.
- `Record<string, any>` and `throwLibrarianServiceError` are now clean in
`ts/packages`.
@ -465,6 +470,7 @@ Notes:
- `cd ts && bun run build`
- `cd ts && bun run test`
- `git diff --check`
- `git diff --check`
### 2026-06-02: Client Socket Tagged Error And JSON Slice
@ -758,9 +764,6 @@ Notes:
- Gateway dispatcher tests now exercise both the Promise compatibility
streaming path and the Effect-native responder path.
- Remaining:
- Client facade methods still duplicate some per-service streaming envelope
completion checks. Centralize these around `DispatchStreamChunk.complete`
in a later client API cleanup.
- `ts/packages/flow/src/gateway/rpc-protocol.ts` remains a Fastify socket
compatibility bridge, not a direct replacement target for Effect RPC
server layers yet.
@ -774,6 +777,32 @@ Notes:
- `cd ts && bun run test`
- `git diff --check`
### 2026-06-02: Client Streaming Facade Normalization Slice
- Status: migrated and root-verified.
- Completed:
- `ts/packages/client/src/socket/trustgraph-socket.ts` now decodes the
legacy streaming envelope with Schema before service-specific callback
handling.
- Streaming payload reads now use `effect/Predicate` property narrowing
helpers instead of repeated response-wrapper assertions.
- Graph RAG, document RAG, text completion, prompt, agent, and document
stream callbacks now use a shared `streamComplete(...)` helper. The RPC
`DispatchStreamChunk.complete` bit is the default transport completion
source, with legacy service markers preserved for public compatibility.
- Explainability triples are decoded through a recursive Schema instead of
`as Triple[]`.
- The focused client test now proves normalized `DispatchStreamChunk`
completion flows through `graphRagStreaming` and final metadata.
- Verification:
- `bunx --bun vitest run src/__tests__/rpc-timeout.test.ts`
- `bun run --cwd ts/packages/client build`
- `cd ts && bun run check:tsgo`
- `bun run --cwd ts/packages/client test`
- `cd ts && bun run check`
- `cd ts && bun run build`
- `cd ts && bun run test`
### 2026-06-02: FalkorDB Scoped Client Lifecycle Slice
- Status: migrated and root-verified.
@ -871,9 +900,9 @@ Notes:
The remaining client `newableFactory` assertions are documented as public
API compatibility boundaries for this loop.
- Gateway `DispatchStream` now uses Effect-native dispatcher streaming
callbacks instead of nested `Effect.runPromiseWith`; the remaining client
streaming cleanup is facade-level completion normalization around
`DispatchStreamChunk.complete`.
callbacks instead of nested `Effect.runPromiseWith`, and client streaming
facade callbacks now decode the legacy envelope through Schema before
applying service-specific public callback semantics.
- Do not make `gateway/rpc-protocol.ts` the next cleanup target: it is a
Fastify socket compatibility bridge while the public Effect RPC server
layers require SocketServer or Effect HTTP routing.
@ -948,9 +977,8 @@ Notes:
## Recommended PR Order
1. Client streaming facade completion normalization.
2. Provider layer and Effect AI cleanup.
3. MCP parity/deletion decision and workbench platform polish.
1. Provider layer and Effect AI cleanup.
2. MCP parity/deletion decision and workbench platform polish.
## No-Op Rules

View file

@ -1,6 +1,6 @@
import { Effect } from "effect";
import { describe, expect, it, vi } from "vitest";
import { DispatchError } from "../rpc/contract";
import { DispatchError, DispatchStreamChunk } from "../rpc/contract";
import { type DispatchInput, withDispatchRequestPolicy } from "../socket/effect-rpc-client";
import { makeBaseApiWithRpc } from "../socket/trustgraph-socket";
@ -58,4 +58,75 @@ describe("Effect RPC request policy", () => {
expect(attempts).toBe(3);
});
it("forwards normalized stream completion to flow streaming facades", () => {
const dispatchStream = vi.fn((_input: DispatchInput, receiver: (chunk: DispatchStreamChunk) => boolean) => {
const firstComplete = receiver(DispatchStreamChunk.make({
response: { response: "alpha" },
complete: false,
}));
const secondComplete = receiver(DispatchStreamChunk.make({
response: {
response: "omega",
in_token: 1,
out_token: 2,
model: "test-model",
},
complete: true,
}));
return Promise.resolve(
DispatchStreamChunk.make({
response: { response: "omega" },
complete: true,
}),
).then((chunk) => {
expect(firstComplete).toBe(false);
expect(secondComplete).toBe(true);
return chunk;
});
});
const api = makeBaseApiWithRpc("alice", undefined, "ws://example.test/rpc", {
dispatch: vi.fn(() => Promise.resolve({ ok: true })),
dispatchStream,
close: vi.fn(() => Promise.resolve()),
subscribe: vi.fn(() => () => {}),
});
const chunks: Array<{
readonly chunk: string;
readonly complete: boolean;
readonly metadata?: { readonly in_token?: number; readonly out_token?: number; readonly model?: string };
}> = [];
api.flow("flow-a").graphRagStreaming(
"hello",
(chunk, complete, metadata) => {
chunks.push(metadata === undefined ? { chunk, complete } : { chunk, complete, metadata });
},
() => undefined,
);
expect(dispatchStream).toHaveBeenCalledWith(
{
scope: "flow",
service: "graph-rag",
flow: "flow-a",
request: {
query: "hello",
user: "alice",
collection: "default",
streaming: true,
},
},
expect.any(Function),
{ timeoutMs: 60000 },
);
expect(chunks).toEqual([
{ chunk: "alpha", complete: false },
{
chunk: "omega",
complete: true,
metadata: { in_token: 1, out_token: 2, model: "test-model" },
},
]);
});
});

View file

@ -9,6 +9,7 @@ import {
} from "./effect-rpc-client.js";
import { getDefaultSocketUrl, getRandomValues } from "./websocket-adapter.js";
import { Clock, Effect, Option, Result, Schema as S } from "effect";
import * as Predicate from "effect/Predicate";
// Import all message types for different services
import type {
@ -154,24 +155,23 @@ function dispatchOptions(
return options;
}
function streamingMetadataFrom(source: {
in_token?: number;
out_token?: number;
model?: string;
}): StreamingMetadata | undefined {
function streamingMetadataFrom(source: unknown): StreamingMetadata | undefined {
const metadata: StreamingMetadata = {};
let hasMetadata = false;
if (source.in_token !== undefined) {
metadata.in_token = source.in_token;
const inToken = numberProperty(source, "in_token");
if (inToken !== undefined) {
metadata.in_token = inToken;
hasMetadata = true;
}
if (source.out_token !== undefined) {
metadata.out_token = source.out_token;
const outToken = numberProperty(source, "out_token");
if (outToken !== undefined) {
metadata.out_token = outToken;
hasMetadata = true;
}
if (source.model !== undefined) {
metadata.model = source.model;
const model = stringProperty(source, "model");
if (model !== undefined) {
metadata.model = model;
hasMetadata = true;
}
@ -233,6 +233,92 @@ const logClientError = (message: string, error: unknown): void => {
Effect.runFork(Effect.logError(message, { error: toErrorMessage(error, message) }));
};
const StreamingEnvelopeSchema = S.Struct({
response: S.optionalKey(S.Unknown),
complete: S.optionalKey(S.Boolean),
error: S.optionalKey(S.String),
});
type StreamingEnvelope = typeof StreamingEnvelopeSchema.Type;
const ClientTripleSchema: S.Codec<Triple, Triple> = S.suspend(() =>
S.Struct({
s: ClientTermSchema,
p: ClientTermSchema,
o: ClientTermSchema,
g: S.optionalKey(S.String),
})
);
const ClientTermSchema: S.Codec<Term, Term> = S.suspend(() =>
S.Union([
S.Struct({
t: S.Literal("i"),
i: S.String,
}),
S.Struct({
t: S.Literal("b"),
d: S.String,
}),
S.Struct({
t: S.Literal("l"),
v: S.String,
dt: S.optionalKey(S.String),
ln: S.optionalKey(S.String),
}),
S.Struct({
t: S.Literal("t"),
tr: S.optionalKey(ClientTripleSchema),
}),
])
);
const decodeStreamingEnvelope = S.decodeUnknownOption(StreamingEnvelopeSchema);
const decodeClientTriples = S.decodeUnknownOption(S.Array(ClientTripleSchema).pipe(S.mutable));
function streamingEnvelopeFrom(message: unknown): StreamingEnvelope {
return Option.getOrElse(decodeStreamingEnvelope(message), () => ({
complete: true,
error: "Streaming message could not be decoded",
}));
}
function propertyValue(source: unknown, key: string): unknown | undefined {
return Predicate.hasProperty(source, key) ? source[key] : undefined;
}
function stringProperty(source: unknown, key: string): string | undefined {
const value = propertyValue(source, key);
return typeof value === "string" ? value : undefined;
}
function numberProperty(source: unknown, key: string): number | undefined {
const value = propertyValue(source, key);
return typeof value === "number" ? value : undefined;
}
function booleanProperty(source: unknown, key: string): boolean | undefined {
const value = propertyValue(source, key);
return typeof value === "boolean" ? value : undefined;
}
function responseErrorMessage(source: unknown): string | undefined {
const error = propertyValue(source, "error");
if (typeof error === "string") return error;
return stringProperty(error, "message");
}
function streamComplete(
envelope: StreamingEnvelope,
response: unknown,
responseMarkers: ReadonlyArray<string> = [],
): boolean {
return envelope.complete === true || responseMarkers.some((key) => booleanProperty(response, key) === true);
}
function explainTriplesFrom(source: unknown): Triple[] | undefined {
return Option.getOrUndefined(decodeClientTriples(propertyValue(source, "explain_triples")));
}
/**
* Socket interface defining all available operations for the TrustGraph API
* This provides a unified interface for various AI/ML and knowledge graph
@ -978,7 +1064,7 @@ export function makeLibrarianApi(api: BaseApi) {
chunkSize?: number,
): void {
const receiver = (message: unknown): boolean => {
const msg = message as { response?: StreamDocumentResponse; complete?: boolean; error?: string };
const msg = streamingEnvelopeFrom(message);
// Check for top-level error
if (msg.error !== undefined) {
@ -988,17 +1074,23 @@ export function makeLibrarianApi(api: BaseApi) {
const resp = msg.response;
if (resp === undefined) {
return msg.complete === true;
return streamComplete(msg, resp);
}
// Check for response-level error
if (resp.error !== undefined) {
onError(resp.error.message);
const responseError = responseErrorMessage(resp);
if (responseError !== undefined) {
onError(responseError);
return true;
}
const complete = msg.complete === true;
onChunk(resp.content, resp["chunk-index"], resp["total-chunks"], complete);
const complete = streamComplete(msg, resp);
onChunk(
stringProperty(resp, "content") ?? "",
numberProperty(resp, "chunk-index") ?? 0,
numberProperty(resp, "total-chunks") ?? 0,
complete,
);
return complete;
};
@ -1393,7 +1485,7 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
collection?: string,
) {
const receiver = (message: unknown) => {
const msg = message as { response?: AgentResponse; complete?: boolean; error?: string };
const msg = streamingEnvelopeFrom(message);
// Check for top-level error
if (msg.error !== undefined) {
@ -1404,36 +1496,41 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
const resp = msg.response ?? {};
// Check for errors in response
if (resp.chunk_type === "error" || resp.error !== undefined) {
error(resp.error?.message ?? "Unknown agent error");
const responseError = responseErrorMessage(resp);
if (stringProperty(resp, "chunk_type") === "error" || responseError !== undefined) {
error(responseError ?? "Unknown agent error");
return true; // End streaming on error
}
// Handle explainability events (agent uses chunk_type="explain")
const chunkType = stringProperty(resp, "chunk_type");
const messageType = stringProperty(resp, "message_type");
const explainId = stringProperty(resp, "explain_id");
const explainTriples = explainTriplesFrom(resp);
if (
(resp.chunk_type === "explain" || resp.message_type === "explain") &&
(resp.explain_id !== undefined || resp.explain_triples !== undefined)
(chunkType === "explain" || messageType === "explain") &&
(explainId !== undefined || explainTriples !== undefined)
) {
const event: ExplainEvent = {
explainId: resp.explain_id ?? "",
explainGraph: resp.explain_graph ?? "",
explainId: explainId ?? "",
explainGraph: stringProperty(resp, "explain_graph") ?? "",
};
if (resp.explain_triples !== undefined) {
event.explainTriples = resp.explain_triples as Triple[];
if (explainTriples !== undefined) {
event.explainTriples = explainTriples;
}
onExplain?.(event);
return false;
}
// Handle streaming chunks by chunk_type
const content = resp.content ?? "";
const messageComplete = resp.end_of_message === true;
const dialogComplete = msg.complete === true || resp.end_of_dialog === true;
const content = stringProperty(resp, "content") ?? "";
const messageComplete = booleanProperty(resp, "end_of_message") === true;
const dialogComplete = streamComplete(msg, resp, ["end_of_dialog"]);
// Extract metadata from final message
const metadata = dialogComplete ? streamingMetadataFrom(resp) : undefined;
switch (resp.chunk_type) {
switch (chunkType) {
case "thought":
think(content, messageComplete, metadata);
break;
@ -1493,7 +1590,7 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
onExplain?: (event: ExplainEvent) => void,
): void {
const recv = (message: unknown): boolean => {
const msg = message as { response?: GraphRagResponse; complete?: boolean; error?: string };
const msg = streamingEnvelopeFrom(message);
// Check for top-level error
if (msg.error !== undefined) {
@ -1501,37 +1598,45 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
return true;
}
const resp = (msg.response ?? {}) as GraphRagResponse;
const resp = msg.response ?? {};
// Check for response-level error
if (resp.error !== undefined) {
onError(resp.error.message);
const responseError = responseErrorMessage(resp);
if (responseError !== undefined) {
onError(responseError);
return true;
}
// Extract explain data if present (may be embedded in the answer message)
const messageType = stringProperty(resp, "message_type");
const explainId = stringProperty(resp, "explain_id");
const explainTriples = explainTriplesFrom(resp);
if (
resp.message_type === "explain" &&
(resp.explain_id !== undefined || resp.explain_triples !== undefined)
messageType === "explain" &&
(explainId !== undefined || explainTriples !== undefined)
) {
const event: ExplainEvent = {
explainId: resp.explain_id ?? "",
explainGraph: resp.explain_graph ?? "",
explainId: explainId ?? "",
explainGraph: stringProperty(resp, "explain_graph") ?? "",
};
if (resp.explain_triples !== undefined) {
event.explainTriples = resp.explain_triples as Triple[];
if (explainTriples !== undefined) {
event.explainTriples = explainTriples;
}
onExplain?.(event);
// If this message also carries answer text, fall through to chunk handling.
// If it's a standalone explain event (no answer text), stop here.
if (resp.response === undefined && resp.endOfStream !== true && resp.end_of_session !== true) {
if (
stringProperty(resp, "response") === undefined &&
booleanProperty(resp, "endOfStream") !== true &&
booleanProperty(resp, "end_of_session") !== true
) {
return false;
}
}
// Handle chunk messages (default behavior)
const chunk = resp.response ?? resp.chunk ?? "";
const complete = resp.end_of_session === true || resp.endOfStream === true || msg.complete === true;
const chunk = stringProperty(resp, "response") ?? stringProperty(resp, "chunk") ?? "";
const complete = streamComplete(msg, resp, ["end_of_session", "endOfStream"]);
// Extract metadata from final message
const metadata = complete ? streamingMetadataFrom(resp) : undefined;
@ -1592,7 +1697,7 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
onExplain?: (event: ExplainEvent) => void,
): void {
const recv = (message: unknown): boolean => {
const msg = message as { response?: DocumentRagResponse; complete?: boolean; error?: string };
const msg = streamingEnvelopeFrom(message);
// Check for top-level error
if (msg.error !== undefined) {
@ -1600,29 +1705,32 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
return true;
}
const resp = (msg.response ?? {}) as DocumentRagResponse;
const resp = msg.response ?? {};
// Check for response-level error
if (resp.error !== undefined) {
onError(resp.error.message);
const responseError = responseErrorMessage(resp);
if (responseError !== undefined) {
onError(responseError);
return true;
}
// Handle explainability events
const explainId = stringProperty(resp, "explain_id");
const explainGraph = stringProperty(resp, "explain_graph");
if (
resp.message_type === "explain" &&
resp.explain_id !== undefined &&
resp.explain_graph !== undefined
stringProperty(resp, "message_type") === "explain" &&
explainId !== undefined &&
explainGraph !== undefined
) {
onExplain?.({
explainId: resp.explain_id,
explainGraph: resp.explain_graph,
explainId,
explainGraph,
});
return false;
}
const chunk = resp.response ?? resp.chunk ?? "";
const complete = resp.end_of_session === true || resp.endOfStream === true || msg.complete === true;
const chunk = stringProperty(resp, "response") ?? stringProperty(resp, "chunk") ?? "";
const complete = streamComplete(msg, resp, ["end_of_session", "endOfStream"]);
// Extract metadata from final message
const metadata = complete ? streamingMetadataFrom(resp) : undefined;
@ -1671,7 +1779,7 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
onError: (error: string) => void,
): void {
const recv = (message: unknown): boolean => {
const msg = message as { response?: TextCompletionResponse; complete?: boolean; error?: string };
const msg = streamingEnvelopeFrom(message);
// Check for top-level error
if (msg.error !== undefined) {
@ -1679,17 +1787,18 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
return true;
}
const resp = (msg.response ?? {}) as TextCompletionResponse;
const resp = msg.response ?? {};
// Check for response-level error
if (resp.error !== undefined) {
onError(resp.error.message);
const responseError = responseErrorMessage(resp);
if (responseError !== undefined) {
onError(responseError);
return true;
}
// Text completion uses 'response' field for chunks
const chunk = resp.response ?? "";
const complete = msg.complete === true;
const chunk = stringProperty(resp, "response") ?? "";
const complete = streamComplete(msg, resp);
// Extract metadata from final message
const metadata = complete ? streamingMetadataFrom(resp) : undefined;
@ -1729,7 +1838,7 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
onError: (error: string) => void,
): void {
const recv = (message: unknown): boolean => {
const msg = message as { response?: PromptResponse; complete?: boolean; error?: string };
const msg = streamingEnvelopeFrom(message);
// Check for top-level error
if (msg.error !== undefined) {
@ -1737,17 +1846,18 @@ export function makeFlowApi(api: BaseApi, flowId: string) {
return true;
}
const resp = (msg.response ?? {}) as PromptResponse;
const resp = msg.response ?? {};
// Check for response-level error
if (resp.error !== undefined) {
onError(resp.error.message);
const responseError = responseErrorMessage(resp);
if (responseError !== undefined) {
onError(responseError);
return true;
}
// Prompt service uses 'text' field for chunks
const chunk = resp.text ?? "";
const complete = msg.complete === true;
const chunk = stringProperty(resp, "text") ?? "";
const complete = streamComplete(msg, resp);
// Extract metadata from final message
const metadata = complete ? streamingMetadataFrom(resp) : undefined;