From a7bdbb92573e65269c115a1a8d8b2ae12de1ee6b Mon Sep 17 00:00:00 2001 From: elpresidank Date: Thu, 11 Jun 2026 08:06:31 -0500 Subject: [PATCH] refactor(ts): make client gateway effect native --- ts/packages/cli/src/commands/config.ts | 85 +- ts/packages/cli/src/commands/embeddings.ts | 15 +- ts/packages/cli/src/commands/flow.ts | 61 +- ts/packages/cli/src/commands/graph-rag.ts | 48 +- ts/packages/cli/src/commands/library.ts | 81 +- ts/packages/cli/src/commands/triples.ts | 27 +- ts/packages/cli/src/commands/util.ts | 77 +- ts/packages/client/src/models/Triple.ts | 63 +- ts/packages/client/src/models/messages.ts | 884 +++++++++--------- .../client/src/socket/effect-rpc-client.ts | 89 +- .../client/src/socket/trustgraph-socket.ts | 332 +++---- .../mcp/src/__tests__/server-effect.test.ts | 118 ++- ts/packages/mcp/src/server-effect.ts | 482 ++++++---- ts/packages/workbench/src/atoms/workbench.ts | 60 +- .../workbench/src/qa/initial-values.ts | 6 - ts/scripts/effect-laws.allowlist.json | 2 +- 16 files changed, 1168 insertions(+), 1262 deletions(-) diff --git a/ts/packages/cli/src/commands/config.ts b/ts/packages/cli/src/commands/config.ts index e910ffd1..5a706c94 100644 --- a/ts/packages/cli/src/commands/config.ts +++ b/ts/packages/cli/src/commands/config.ts @@ -7,16 +7,12 @@ import { Effect } from "effect"; import * as Argument from "effect/unstable/cli/Argument"; import * as Command from "effect/unstable/cli/Command"; -import { cliCommandError, withSocket, writeJson } from "./util.js"; +import { gatewayDispatch, withGatewayClient, writeJson } from "./util.js"; const show = Command.make("show", {}, () => - withSocket((socket) => + withGatewayClient((client) => Effect.gen(function* () { - const cfg = socket.config(); - const resp = yield* Effect.tryPromise({ - try: () => cfg.getConfigAll(), - catch: (error) => cliCommandError("config.show", error), - }); + const resp = yield* gatewayDispatch(client, "config.show", "config", { operation: "config" }, { timeoutMs: 60000 }); yield* writeJson(resp); }), ), @@ -25,19 +21,17 @@ const show = Command.make("show", {}, () => const get = Command.make("get", { key: Argument.string("key").pipe(Argument.withDescription("Config key (format: type/key)")), }, ({ key }) => - withSocket((socket) => + withGatewayClient((client) => Effect.gen(function* () { - const cfg = socket.config(); - // Support "type/key" format; fall back to using the whole string as key - const parts = key.split("/"); - const configKey = - parts.length >= 2 - ? { type: parts[0], key: parts.slice(1).join("/") } - : { type: "config", key }; - const resp = yield* Effect.tryPromise({ - try: () => cfg.getConfig([configKey]), - catch: (error) => cliCommandError("config.get", error), - }); + const parts = key.split("/"); + const configKey = + parts.length >= 2 + ? { type: parts[0], key: parts.slice(1).join("/") } + : { type: "config", key }; + const resp = yield* gatewayDispatch(client, "config.get", "config", { + operation: "get", + keys: [configKey], + }, { timeoutMs: 60000 }); yield* writeJson(resp); }), ), @@ -47,18 +41,17 @@ const set = Command.make("set", { key: Argument.string("key").pipe(Argument.withDescription("Config key (format: type/key)")), value: Argument.string("value").pipe(Argument.withDescription("Config value (JSON)")), }, ({ key, value }) => - withSocket((socket) => + withGatewayClient((client) => Effect.gen(function* () { - const cfg = socket.config(); - const parts = key.split("/"); - const configEntry = - parts.length >= 2 - ? { type: parts[0], key: parts.slice(1).join("/"), value } - : { type: "config", key, value }; - const resp = yield* Effect.tryPromise({ - try: () => cfg.putConfig([configEntry]), - catch: (error) => cliCommandError("config.set", error), - }); + const parts = key.split("/"); + const configEntry = + parts.length >= 2 + ? { type: parts[0], key: parts.slice(1).join("/"), value } + : { type: "config", key, value }; + const resp = yield* gatewayDispatch(client, "config.set", "config", { + operation: "put", + values: [configEntry], + }, { timeoutMs: 60000 }); yield* writeJson(resp); }), ), @@ -70,13 +63,12 @@ const list = Command.make("list", { Argument.withDefault("config"), ), }, ({ type }) => - withSocket((socket) => + withGatewayClient((client) => Effect.gen(function* () { - const cfg = socket.config(); - const resp = yield* Effect.tryPromise({ - try: () => cfg.list(type), - catch: (error) => cliCommandError("config.list", error), - }); + const resp = yield* gatewayDispatch(client, "config.list", "config", { + operation: "list", + type, + }, { timeoutMs: 60000 }); yield* writeJson(resp); }), ), @@ -85,18 +77,17 @@ const list = Command.make("list", { const deleteCommand = Command.make("delete", { key: Argument.string("key").pipe(Argument.withDescription("Config key (format: type/key)")), }, ({ key }) => - withSocket((socket) => + withGatewayClient((client) => Effect.gen(function* () { - const cfg = socket.config(); - const parts = key.split("/"); - const configKey = - parts.length >= 2 - ? { type: parts[0], key: parts.slice(1).join("/") } - : { type: "config", key }; - const resp = yield* Effect.tryPromise({ - try: () => cfg.deleteConfig(configKey), - catch: (error) => cliCommandError("config.delete", error), - }); + const parts = key.split("/"); + const configKey = + parts.length >= 2 + ? { type: parts[0], key: parts.slice(1).join("/") } + : { type: "config", key }; + const resp = yield* gatewayDispatch(client, "config.delete", "config", { + operation: "delete", + keys: [configKey], + }, { timeoutMs: 30000 }); yield* writeJson(resp); }), ), diff --git a/ts/packages/cli/src/commands/embeddings.ts b/ts/packages/cli/src/commands/embeddings.ts index e469e9ca..58e4e8f5 100644 --- a/ts/packages/cli/src/commands/embeddings.ts +++ b/ts/packages/cli/src/commands/embeddings.ts @@ -7,7 +7,7 @@ import { Effect } from "effect"; import * as Argument from "effect/unstable/cli/Argument"; import * as Command from "effect/unstable/cli/Command"; -import { cliCommandError, withSocket, writeJson } from "./util.js"; +import { gatewayDispatch, withGatewayClient, writeJson } from "./util.js"; export const embeddingsCommand = Command.make("embeddings", { texts: Argument.string("text").pipe( @@ -15,14 +15,13 @@ export const embeddingsCommand = Command.make("embeddings", { Argument.variadic({ min: 1 }), ), }, ({ texts }) => - withSocket((socket, opts) => + withGatewayClient((client, opts) => Effect.gen(function* () { - const flow = socket.flow(opts.flow); - const vectors = yield* Effect.tryPromise({ - try: () => flow.embeddings(Array.from(texts)), - catch: (error) => cliCommandError("embeddings", error), - }); - yield* writeJson(vectors); + const response = yield* gatewayDispatch(client, "embeddings", "embeddings", { + texts: Array.from(texts), + }, { flow: opts.flow, timeoutMs: 30000 }); + const record = response as Record; + yield* writeJson(record.vectors ?? []); }), ), ).pipe(Command.withDescription("Generate text embeddings")); diff --git a/ts/packages/cli/src/commands/flow.ts b/ts/packages/cli/src/commands/flow.ts index 920d3fab..405d083f 100644 --- a/ts/packages/cli/src/commands/flow.ts +++ b/ts/packages/cli/src/commands/flow.ts @@ -9,16 +9,14 @@ import * as S from "effect/Schema"; import * as Argument from "effect/unstable/cli/Argument"; import * as Command from "effect/unstable/cli/Command"; import * as Flag from "effect/unstable/cli/Flag"; -import { cliCommandError, withSocket, writeJson } from "./util.js"; +import { cliCommandError, gatewayDispatch, withGatewayClient, writeJson } from "./util.js"; const list = Command.make("list", {}, () => - withSocket((socket) => + withGatewayClient((client) => Effect.gen(function* () { - const flows = socket.flows(); - const ids = yield* Effect.tryPromise({ - try: () => flows.getFlows(), - catch: (error) => cliCommandError("flow.list", error), - }); + const response = yield* gatewayDispatch(client, "flow.list", "flow", { operation: "list-flows" }, { timeoutMs: 60000 }); + const record = response as Record; + const ids = Array.isArray(record["flow-ids"]) ? record["flow-ids"] : []; yield* writeJson(ids); }), ), @@ -27,13 +25,16 @@ const list = Command.make("list", {}, () => const get = Command.make("get", { id: Argument.string("id").pipe(Argument.withDescription("Flow ID")), }, ({ id }) => - withSocket((socket) => + withGatewayClient((client) => Effect.gen(function* () { - const flows = socket.flows(); - const def = yield* Effect.tryPromise({ - try: () => flows.getFlow(id), - catch: (error) => cliCommandError("flow.get", error), - }); + const response = yield* gatewayDispatch(client, "flow.get", "flow", { + operation: "get-flow", + "flow-id": id, + }, { timeoutMs: 60000 }); + const record = response as Record; + const def = typeof record.flow === "string" + ? yield* S.decodeUnknownEffect(S.UnknownFromJsonString)(record.flow) + : record.flow; yield* writeJson(def); }), ), @@ -56,26 +57,23 @@ const start = Command.make("start", { Flag.optional, ), }, ({ id, blueprint, description, parameters }) => - withSocket((socket) => + withGatewayClient((client) => Effect.gen(function* () { - const flows = socket.flows(); const rawParameters = parameters._tag === "Some" ? parameters.value : undefined; const params = rawParameters !== undefined && rawParameters.length > 0 - ? yield* S.decodeUnknownEffect(S.UnknownFromJsonString)(rawParameters).pipe( + ? yield* S.decodeUnknownEffect(S.UnknownFromJsonString)(rawParameters).pipe( Effect.flatMap(S.decodeUnknownEffect(S.Record(S.String, S.Unknown))), Effect.mapError((error) => cliCommandError("flow.start.parameters", error)), ) : undefined; - const resp = yield* Effect.tryPromise({ - try: () => - flows.startFlow( - id, - blueprint, - description, - params, - ), - catch: (error) => cliCommandError("flow.start", error), - }); + const request = { + operation: "start-flow", + "flow-id": id, + "blueprint-name": blueprint, + description, + ...(params !== undefined && Object.keys(params).length > 0 ? { parameters: params } : {}), + }; + const resp = yield* gatewayDispatch(client, "flow.start", "flow", request, { timeoutMs: 30000 }); yield* writeJson(resp); }), ), @@ -84,13 +82,12 @@ const start = Command.make("start", { const stop = Command.make("stop", { id: Argument.string("id").pipe(Argument.withDescription("Flow ID")), }, ({ id }) => - withSocket((socket) => + withGatewayClient((client) => Effect.gen(function* () { - const flows = socket.flows(); - const resp = yield* Effect.tryPromise({ - try: () => flows.stopFlow(id), - catch: (error) => cliCommandError("flow.stop", error), - }); + const resp = yield* gatewayDispatch(client, "flow.stop", "flow", { + operation: "stop-flow", + "flow-id": id, + }, { timeoutMs: 30000 }); yield* writeJson(resp); }), ), diff --git a/ts/packages/cli/src/commands/graph-rag.ts b/ts/packages/cli/src/commands/graph-rag.ts index cb4e2df6..db72027b 100644 --- a/ts/packages/cli/src/commands/graph-rag.ts +++ b/ts/packages/cli/src/commands/graph-rag.ts @@ -9,7 +9,7 @@ import * as O from "effect/Option"; import * as Argument from "effect/unstable/cli/Argument"; import * as Command from "effect/unstable/cli/Command"; import * as Flag from "effect/unstable/cli/Flag"; -import { cliCommandError, withSocket, writeLine } from "./util.js"; +import { gatewayDispatch, withGatewayClient, writeLine } from "./util.js"; export const graphRagCommand = Command.make("graph-rag", { query: Argument.string("query").pipe(Argument.withDescription("Natural language query")), @@ -26,22 +26,17 @@ export const graphRagCommand = Command.make("graph-rag", { Flag.optional, ), }, ({ query, entityLimit, tripleLimit, collection }) => - withSocket((socket, opts) => + withGatewayClient((client, opts) => Effect.gen(function* () { - const flow = socket.flow(opts.flow); - const response = yield* Effect.tryPromise({ - try: () => - flow.graphRag( - query, - { - entityLimit, - tripleLimit, - }, - O.getOrUndefined(collection), - ), - catch: (error) => cliCommandError("graph-rag", error), - }); - yield* writeLine(response); + const response = yield* gatewayDispatch(client, "graph-rag", "graph-rag", { + query, + user: opts.user, + collection: O.getOrUndefined(collection) ?? "default", + "entity-limit": entityLimit, + "triple-limit": tripleLimit, + }, { flow: opts.flow, timeoutMs: 60000 }); + const record = response as Record; + yield* writeLine(typeof record.response === "string" ? record.response : ""); }), ), ).pipe(Command.withDescription("Query the knowledge graph using RAG")); @@ -57,19 +52,16 @@ export const documentRagCommand = Command.make("document-rag", { Flag.optional, ), }, ({ query, docLimit, collection }) => - withSocket((socket, opts) => + withGatewayClient((client, opts) => Effect.gen(function* () { - const flow = socket.flow(opts.flow); - const response = yield* Effect.tryPromise({ - try: () => - flow.documentRag( - query, - docLimit, - O.getOrUndefined(collection), - ), - catch: (error) => cliCommandError("document-rag", error), - }); - yield* writeLine(response); + const response = yield* gatewayDispatch(client, "document-rag", "document-rag", { + query, + user: opts.user, + collection: O.getOrUndefined(collection) ?? "default", + "doc-limit": docLimit, + }, { flow: opts.flow, timeoutMs: 60000 }); + const record = response as Record; + yield* writeLine(typeof record.response === "string" ? record.response : ""); }), ), ).pipe(Command.withDescription("Query documents using RAG")); diff --git a/ts/packages/cli/src/commands/library.ts b/ts/packages/cli/src/commands/library.ts index 59b50b97..ce7cb7a9 100644 --- a/ts/packages/cli/src/commands/library.ts +++ b/ts/packages/cli/src/commands/library.ts @@ -4,12 +4,12 @@ * Manages documents stored in the TrustGraph library. */ -import { Effect, Match } from "effect"; +import { Clock, Effect, Match } from "effect"; import * as O from "effect/Option"; import * as Argument from "effect/unstable/cli/Argument"; import * as Command from "effect/unstable/cli/Command"; import * as Flag from "effect/unstable/cli/Flag"; -import { cliCommandError, withSocket, writeJson } from "./util.js"; +import { cliCommandError, gatewayDispatch, withGatewayClient, writeJson } from "./util.js"; function basenamePath(filepath: string): string { const normalized = filepath.replace(/\/+$/, ""); @@ -34,14 +34,14 @@ export function guessMimeType(filepath: string): string { } const list = Command.make("list", {}, () => - withSocket((socket) => + withGatewayClient((client, opts) => Effect.gen(function* () { - const lib = socket.librarian(); - const docs = yield* Effect.tryPromise({ - try: () => lib.getDocuments(), - catch: (error) => cliCommandError("library.list", error), - }); - yield* writeJson(docs); + const response = yield* gatewayDispatch(client, "library.list", "librarian", { + operation: "list-documents", + user: opts.user, + }, { timeoutMs: 60000 }); + const record = response as Record; + yield* writeJson(record["document-metadatas"] ?? record.documents ?? []); }), ), ).pipe(Command.withDescription("List documents in the library")); @@ -72,9 +72,8 @@ const load = Command.make("load", { Flag.optional, ), }, ({ file, title, mimeType, comments, tags, id }) => - withSocket((socket) => + withGatewayClient((client, opts) => Effect.gen(function* () { - const lib = socket.librarian(); const data = new Uint8Array(yield* Effect.tryPromise({ try: () => Bun.file(file).arrayBuffer(), catch: (error) => cliCommandError("library.load.read-file", error), @@ -82,19 +81,25 @@ const load = Command.make("load", { const b64 = Buffer.from(data).toString("base64"); const resolvedMimeType = O.getOrUndefined(mimeType) ?? guessMimeType(file); const resolvedTitle = O.getOrUndefined(title) ?? basenamePath(file); - - const resp = yield* Effect.tryPromise({ - try: () => - lib.loadDocument( - b64, - resolvedMimeType, - resolvedTitle, - comments, - Array.from(tags), - O.getOrUndefined(id), - ), - catch: (error) => cliCommandError("library.load", error), - }); + const timestamp = yield* Clock.currentTimeMillis; + const documentId = O.getOrUndefined(id); + const documentMetadata = { + time: Math.floor(timestamp / 1000), + kind: resolvedMimeType, + title: resolvedTitle, + comments, + user: opts.user, + tags: Array.from(tags), + "document-type": "source", + documentType: "source", + ...(documentId !== undefined ? { id: documentId } : {}), + }; + const resp = yield* gatewayDispatch(client, "library.load", "librarian", { + operation: "add-document", + "document-metadata": documentMetadata, + documentMetadata, + content: b64, + }, { timeoutMs: 30000 }); yield* writeJson(resp); }), ), @@ -107,27 +112,29 @@ const remove = Command.make("remove", { Flag.optional, ), }, ({ id, collection }) => - withSocket((socket) => + withGatewayClient((client, opts) => Effect.gen(function* () { - const lib = socket.librarian(); - const resp = yield* Effect.tryPromise({ - try: () => lib.removeDocument(id, O.getOrUndefined(collection)), - catch: (error) => cliCommandError("library.remove", error), - }); + const resp = yield* gatewayDispatch(client, "library.remove", "librarian", { + operation: "remove-document", + "document-id": id, + documentId: id, + user: opts.user, + collection: O.getOrUndefined(collection) ?? "default", + }, { timeoutMs: 30000 }); yield* writeJson(resp); }), ), ).pipe(Command.withDescription("Remove a document from the library")); const processing = Command.make("processing", {}, () => - withSocket((socket) => + withGatewayClient((client, opts) => Effect.gen(function* () { - const lib = socket.librarian(); - const items = yield* Effect.tryPromise({ - try: () => lib.getProcessing(), - catch: (error) => cliCommandError("library.processing", error), - }); - yield* writeJson(items); + const response = yield* gatewayDispatch(client, "library.processing", "librarian", { + operation: "list-processing", + user: opts.user, + }, { timeoutMs: 60000 }); + const record = response as Record; + yield* writeJson(record["processing-metadatas"] ?? record.processing ?? record["processing-metadata"] ?? []); }), ), ).pipe(Command.withDescription("List documents currently being processed")); diff --git a/ts/packages/cli/src/commands/triples.ts b/ts/packages/cli/src/commands/triples.ts index 27ab2dd7..6faa8e91 100644 --- a/ts/packages/cli/src/commands/triples.ts +++ b/ts/packages/cli/src/commands/triples.ts @@ -9,7 +9,7 @@ import { Effect } from "effect"; import * as O from "effect/Option"; import * as Command from "effect/unstable/cli/Command"; import * as Flag from "effect/unstable/cli/Flag"; -import { cliCommandError, withSocket, writeJson } from "./util.js"; +import { gatewayDispatch, withGatewayClient, writeJson } from "./util.js"; export const triplesCommand = Command.make("triples", { subject: Flag.string("subject").pipe( @@ -37,9 +37,8 @@ export const triplesCommand = Command.make("triples", { Flag.optional, ), }, ({ subject, predicate, object, limit, collection }) => - withSocket((socket, opts) => + withGatewayClient((client, opts) => Effect.gen(function* () { - const flow = socket.flow(opts.flow); const subjectValue = O.getOrUndefined(subject); const predicateValue = O.getOrUndefined(predicate); const objectValue = O.getOrUndefined(object); @@ -53,18 +52,16 @@ export const triplesCommand = Command.make("triples", { ? { t: "i", i: objectValue } : undefined; - const triples = yield* Effect.tryPromise({ - try: () => - flow.triplesQuery( - s, - p, - o, - limit, - O.getOrUndefined(collection), - ), - catch: (error) => cliCommandError("triples", error), - }); - yield* writeJson(triples); + const response = yield* gatewayDispatch(client, "triples", "triples", { + limit, + user: opts.user, + collection: O.getOrUndefined(collection) ?? "default", + ...(s !== undefined ? { s } : {}), + ...(p !== undefined ? { p } : {}), + ...(o !== undefined ? { o } : {}), + }, { flow: opts.flow, timeoutMs: 30000 }); + const record = response as Record; + yield* writeJson(record.triples ?? record.response ?? []); }), ), ).pipe(Command.withDescription("Query knowledge graph triples")); diff --git a/ts/packages/cli/src/commands/util.ts b/ts/packages/cli/src/commands/util.ts index f0a1deb3..71da2376 100644 --- a/ts/packages/cli/src/commands/util.ts +++ b/ts/packages/cli/src/commands/util.ts @@ -3,14 +3,13 @@ */ import type { - BaseApi, + DispatchOptions, TrustGraphGatewayClient, } from "@trustgraph/client"; import { - createTrustGraphSocket, makeTrustGraphGatewayClientScoped, } from "@trustgraph/client"; -import { Duration, Effect } from "effect"; +import { Effect } from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; import * as Command from "effect/unstable/cli/Command"; @@ -85,37 +84,6 @@ export const writeJson = (value: unknown) => Effect.flatMap(writeLine), ); -/** - * Create a BaseApi socket client and wait for the connection to be established. - * The client auto-connects; we listen for the first "connected/authenticated" - * state before handing it back to the caller. - */ -export function createSocketEffect(opts: CliOpts): Effect.Effect { - const socket = createTrustGraphSocket(opts.user, opts.token, opts.gateway); - - return Effect.callback((resume) => { - const unsub = socket.onConnectionStateChange((state) => { - if (state.status === "authenticated" || state.status === "unauthenticated") { - unsub(); - resume(Effect.void); - } else if (state.status === "failed") { - unsub(); - resume(Effect.fail(cliCommandError("connect", state.lastError ?? "WebSocket connection failed"))); - } - }); - - return Effect.sync(() => { - unsub(); - }); - }).pipe( - Effect.timeout(Duration.seconds(15)), - Effect.catchTag("TimeoutError", () => - Effect.fail(cliCommandError("connect", "Timed out waiting for WebSocket connection")), - ), - Effect.as(socket), - ); -} - function gatewayUrlWithToken(opts: CliOpts): string { if (opts.token === undefined || opts.token.length === 0) return opts.gateway; const separator = opts.gateway.includes("?") ? "&" : "?"; @@ -133,16 +101,37 @@ export const withGatewayClient = Effect.fn("withGatewayClient")(function* ( - use: (socket: BaseApi, opts: CliOpts) => Effect.Effect, +export interface GatewayDispatchOptions { + readonly flow?: string; + readonly timeoutMs?: number; + readonly retries?: number; +} + +export const gatewayDispatch = Effect.fn("gatewayDispatch")(function*( + client: TrustGraphGatewayClient, + operation: string, + service: string, + request: Record, + options: GatewayDispatchOptions = {}, ) { - const opts = yield* getOpts; - return yield* Effect.acquireUseRelease( - createSocketEffect(opts), - (socket) => use(socket, opts), - (socket) => - Effect.sync(() => { - socket.close(); - }), + const input = options.flow === undefined + ? { + scope: "global" as const, + service, + request, + } + : { + scope: "flow" as const, + service, + flow: options.flow, + request, + }; + const dispatchOptions: DispatchOptions = { + ...(options.timeoutMs !== undefined ? { timeoutMs: options.timeoutMs } : {}), + ...(options.retries !== undefined ? { retries: options.retries } : {}), + }; + + return yield* client.dispatch(input, dispatchOptions).pipe( + Effect.mapError((error) => cliCommandError(operation, error)), ); }); diff --git a/ts/packages/client/src/models/Triple.ts b/ts/packages/client/src/models/Triple.ts index dd7587fb..35b159be 100644 --- a/ts/packages/client/src/models/Triple.ts +++ b/ts/packages/client/src/models/Triple.ts @@ -1,42 +1,43 @@ - +import { Schema as S } from "effect"; // Term type discriminators matching the wire format // i = IRI, b = BLANK node, l = LITERAL, t = TRIPLE (reified) export type TermType = "i" | "b" | "l" | "t"; -export interface IriTerm { - t: "i"; - i: string; -} +export class IriTerm extends S.Class("IriTerm")({ + t: S.Literal("i"), + i: S.String, +}, { description: "IRI term in TrustGraph wire triples." }) {} -export interface BlankTerm { - t: "b"; - d: string; -} +export class BlankTerm extends S.Class("BlankTerm")({ + t: S.Literal("b"), + d: S.String, +}, { description: "Blank-node term in TrustGraph wire triples." }) {} -export interface LiteralTerm { - t: "l"; - v: string; - dt?: string; // datatype - ln?: string; // language -} +export class LiteralTerm extends S.Class("LiteralTerm")({ + t: S.Literal("l"), + v: S.String, + dt: S.optionalKey(S.String), + ln: S.optionalKey(S.String), +}, { description: "Literal term in TrustGraph wire triples." }) {} -export interface TripleTerm { - t: "t"; - tr?: Triple; -} +export class TripleTerm extends S.Class("TripleTerm")({ + t: S.Literal("t"), + tr: S.optionalKey(S.suspend((): S.Codec => Triple)), +}, { description: "Reified triple term in TrustGraph wire triples." }) {} -export type Term = IriTerm | BlankTerm | LiteralTerm | TripleTerm; +export const Term = S.Union([IriTerm, BlankTerm, LiteralTerm, TripleTerm]); +export type Term = typeof Term.Type; -export interface PartialTriple { - s?: Term; - p?: Term; - o?: Term; -} +export class PartialTriple extends S.Class("PartialTriple")({ + s: S.optionalKey(Term), + p: S.optionalKey(Term), + o: S.optionalKey(Term), +}, { description: "Partial triple pattern for query wildcards." }) {} -export interface Triple { - s: Term; - p: Term; - o: Term; - g?: string; // graph (renamed from direc to match backend) -} +export class Triple extends S.Class("Triple")({ + s: Term, + p: Term, + o: Term, + g: S.optionalKey(S.String), +}, { description: "TrustGraph wire triple, optionally scoped to a named graph." }) {} diff --git a/ts/packages/client/src/models/messages.ts b/ts/packages/client/src/models/messages.ts index 892724c1..ca1d73c9 100644 --- a/ts/packages/client/src/models/messages.ts +++ b/ts/packages/client/src/models/messages.ts @@ -1,521 +1,505 @@ -import type { Term, Triple } from "./Triple.js"; +import { Schema as S } from "effect"; +import { Term, Triple } from "./Triple.js"; export type Request = object; export type Response = object; - -export interface ResponseError { - type?: string; - message: string; -} - export type WireError = object | string; -export interface RequestMessage { - id: string; - service: string; - request: Request; - flow?: string; -} +const UnknownRecord = S.Record(S.String, S.Unknown); +const WireErrorValue = S.Union([S.String, UnknownRecord]); +const TypedMessageError = S.Struct({ + message: S.String, + type: S.optionalKey(S.String), +}); +const OptionalMessageError = S.Struct({ + message: S.optionalKey(S.String), +}); -export interface ApiResponse { - id: string; - response: Response; -} +const NumberArray = S.Array(S.Finite).pipe(S.mutable); +const NumberMatrix = S.Array(NumberArray).pipe(S.mutable); +const TripleArray = S.Array(Triple).pipe(S.mutable); +const StringArray = S.Array(S.String).pipe(S.mutable); -export interface Metadata { - id?: string; - metadata?: Triple[]; - user?: string; - collection?: string; -} +export class ResponseError extends S.Class("ResponseError")({ + type: S.optionalKey(S.String), + message: S.String, +}, { description: "TrustGraph response error payload." }) {} -export interface EntityEmbeddings { - entity?: Term; - vectors?: number[][]; -} +export class RequestMessage extends S.Class("RequestMessage")({ + id: S.String, + service: S.String, + request: UnknownRecord, + flow: S.optionalKey(S.String), +}, { description: "Envelope sent to a TrustGraph service." }) {} -export interface GraphEmbeddings { - metadata?: Metadata; - entities?: EntityEmbeddings[]; -} +export class ApiResponse extends S.Class("ApiResponse")({ + id: S.String, + response: UnknownRecord, +}, { description: "Envelope returned from a TrustGraph service." }) {} -export interface TextCompletionRequest { - system: string; - prompt: string; - streaming?: boolean; -} +export class Metadata extends S.Class("Metadata")({ + id: S.optionalKey(S.String), + metadata: S.optionalKey(TripleArray), + user: S.optionalKey(S.String), + collection: S.optionalKey(S.String), +}, { description: "Shared request metadata for TrustGraph wire messages." }) {} -export interface TextCompletionResponse { - response: string; - // Streaming fields - end_of_stream?: boolean; - error?: { - message: string; - type?: string; - }; - // Token usage (appears in final message) - in_token?: number; - out_token?: number; - model?: string; -} +export class EntityEmbeddings extends S.Class("EntityEmbeddings")({ + entity: S.optionalKey(Term), + vectors: S.optionalKey(NumberMatrix), +}, { description: "Embedding vectors associated with a graph entity." }) {} -export interface GraphRagRequest { - query: string; - user?: string; - collection?: string; - "entity-limit"?: number; // Default: 50 - "triple-limit"?: number; // Default: 30 - "max-subgraph-size"?: number; // Default: 1000 - "max-path-length"?: number; // Default: 2 - streaming?: boolean; -} +export class GraphEmbeddings extends S.Class("GraphEmbeddings")({ + metadata: S.optionalKey(Metadata), + entities: S.optionalKey(S.Array(EntityEmbeddings).pipe(S.mutable)), +}, { description: "Graph embedding payload grouped by entity." }) {} -export interface GraphRagResponse { - response: string; - // Streaming fields - chunk?: string; - end_of_stream?: boolean; - endOfStream?: boolean; - error?: { - message: string; - type?: string; - }; - // Token usage (appears in final message) - in_token?: number; - out_token?: number; - model?: string; - // Explainability fields - message_type?: "chunk" | "explain"; - explain_id?: string; - explain_graph?: string; - explain_triples?: unknown[]; - end_of_session?: boolean; -} +export class TextCompletionRequest extends S.Class("TextCompletionRequest")({ + system: S.String, + prompt: S.String, + streaming: S.optionalKey(S.Boolean), +}, { description: "Text-completion request payload." }) {} -export interface DocumentRagRequest { - query: string; - user?: string; - collection?: string; - "doc-limit"?: number; // Default: 20 - streaming?: boolean; -} +export class TextCompletionResponse extends S.Class("TextCompletionResponse")({ + response: S.String, + end_of_stream: S.optionalKey(S.Boolean), + error: S.optionalKey(TypedMessageError), + in_token: S.optionalKey(S.Finite), + out_token: S.optionalKey(S.Finite), + model: S.optionalKey(S.String), +}, { description: "Text-completion response payload." }) {} -export interface DocumentRagResponse { - response: string; - // Streaming fields - chunk?: string; - end_of_stream?: boolean; - endOfStream?: boolean; - error?: { - message: string; - type?: string; - }; - // Token usage (appears in final message) - in_token?: number; - out_token?: number; - model?: string; - // Explainability fields - message_type?: "chunk" | "explain"; - explain_id?: string; - explain_graph?: string; - end_of_session?: boolean; -} +export class GraphRagRequest extends S.Class("GraphRagRequest")({ + query: S.String, + user: S.optionalKey(S.String), + collection: S.optionalKey(S.String), + "entity-limit": S.optionalKey(S.Finite), + "triple-limit": S.optionalKey(S.Finite), + "max-subgraph-size": S.optionalKey(S.Finite), + "max-path-length": S.optionalKey(S.Finite), + streaming: S.optionalKey(S.Boolean), +}, { description: "Graph RAG request payload." }) {} -export interface AgentRequest { - question: string; - user?: string; - collection?: string; - streaming?: boolean; -} +export class GraphRagResponse extends S.Class("GraphRagResponse")({ + response: S.String, + chunk: S.optionalKey(S.String), + end_of_stream: S.optionalKey(S.Boolean), + endOfStream: S.optionalKey(S.Boolean), + error: S.optionalKey(TypedMessageError), + in_token: S.optionalKey(S.Finite), + out_token: S.optionalKey(S.Finite), + model: S.optionalKey(S.String), + message_type: S.optionalKey(S.Literals(["chunk", "explain"])), + explain_id: S.optionalKey(S.String), + explain_graph: S.optionalKey(S.String), + explain_triples: S.optionalKey(S.Array(S.Unknown).pipe(S.mutable)), + end_of_session: S.optionalKey(S.Boolean), +}, { description: "Graph RAG response payload." }) {} -export interface AgentResponse { - // Streaming response format (new protocol) - chunk_type?: "thought" | "action" | "observation" | "answer" | "final-answer" | "explain" | "error"; - content?: string; - end_of_message?: boolean; - end_of_dialog?: boolean; +export class DocumentRagRequest extends S.Class("DocumentRagRequest")({ + query: S.String, + user: S.optionalKey(S.String), + collection: S.optionalKey(S.String), + "doc-limit": S.optionalKey(S.Finite), + streaming: S.optionalKey(S.Boolean), +}, { description: "Document RAG request payload." }) {} - // Legacy fields for backward compatibility with non-streaming - thought?: string; - observation?: string; - answer?: string; - error?: ResponseError; +export class DocumentRagResponse extends S.Class("DocumentRagResponse")({ + response: S.String, + chunk: S.optionalKey(S.String), + end_of_stream: S.optionalKey(S.Boolean), + endOfStream: S.optionalKey(S.Boolean), + error: S.optionalKey(TypedMessageError), + in_token: S.optionalKey(S.Finite), + out_token: S.optionalKey(S.Finite), + model: S.optionalKey(S.String), + message_type: S.optionalKey(S.Literals(["chunk", "explain"])), + explain_id: S.optionalKey(S.String), + explain_graph: S.optionalKey(S.String), + end_of_session: S.optionalKey(S.Boolean), +}, { description: "Document RAG response payload." }) {} - // Token usage (appears in final message) - in_token?: number; - out_token?: number; - model?: string; +export class AgentRequest extends S.Class("AgentRequest")({ + question: S.String, + user: S.optionalKey(S.String), + collection: S.optionalKey(S.String), + streaming: S.optionalKey(S.Boolean), +}, { description: "Agent request payload." }) {} - // Explainability fields - message_type?: "chunk" | "explain"; - explain_id?: string; - explain_graph?: string; - explain_triples?: unknown[]; -} +export class AgentResponse extends S.Class("AgentResponse")({ + chunk_type: S.optionalKey(S.Literals([ + "thought", + "action", + "observation", + "answer", + "final-answer", + "explain", + "error", + ])), + content: S.optionalKey(S.String), + end_of_message: S.optionalKey(S.Boolean), + end_of_dialog: S.optionalKey(S.Boolean), + thought: S.optionalKey(S.String), + observation: S.optionalKey(S.String), + answer: S.optionalKey(S.String), + error: S.optionalKey(ResponseError), + in_token: S.optionalKey(S.Finite), + out_token: S.optionalKey(S.Finite), + model: S.optionalKey(S.String), + message_type: S.optionalKey(S.Literals(["chunk", "explain"])), + explain_id: S.optionalKey(S.String), + explain_graph: S.optionalKey(S.String), + explain_triples: S.optionalKey(S.Array(S.Unknown).pipe(S.mutable)), +}, { description: "Agent response payload." }) {} -export interface EmbeddingsRequest { - texts: string[]; -} +export class EmbeddingsRequest extends S.Class("EmbeddingsRequest")({ + texts: StringArray, +}, { description: "Batch embeddings request payload." }) {} -export interface EmbeddingsResponse { - vectors: number[][]; // One vector per input text -} +export class EmbeddingsResponse extends S.Class("EmbeddingsResponse")({ + vectors: NumberMatrix, +}, { description: "Batch embeddings response payload." }) {} -export interface GraphEmbeddingsQueryRequest { - vector: number[]; // Single query vector - limit: number; - user?: string; - collection?: string; -} +export class GraphEmbeddingsQueryRequest extends S.Class("GraphEmbeddingsQueryRequest")({ + vector: NumberArray, + limit: S.Finite, + user: S.optionalKey(S.String), + collection: S.optionalKey(S.String), +}, { description: "Graph embeddings query request payload." }) {} -export interface EntityMatch { - entity: Term | null; - score: number; -} +export class EntityMatch extends S.Class("EntityMatch")({ + entity: S.NullOr(Term), + score: S.Finite, +}, { description: "Scored graph-entity match." }) {} -export interface GraphEmbeddingsQueryResponse { - entities: EntityMatch[]; -} +export class GraphEmbeddingsQueryResponse extends S.Class("GraphEmbeddingsQueryResponse")({ + entities: S.Array(EntityMatch).pipe(S.mutable), +}, { description: "Graph embeddings query response payload." }) {} -export interface TriplesQueryRequest { - s?: Term; - p?: Term; - o?: Term; - g?: string; // Named graph URI filter (plain string, not Term) - limit: number; - user?: string; - collection?: string; -} +export class TriplesQueryRequest extends S.Class("TriplesQueryRequest")({ + s: S.optionalKey(Term), + p: S.optionalKey(Term), + o: S.optionalKey(Term), + g: S.optionalKey(S.String), + limit: S.Finite, + user: S.optionalKey(S.String), + collection: S.optionalKey(S.String), +}, { description: "Triple pattern query request payload." }) {} -export interface TriplesQueryResponse { - triples: Triple[]; - /** @deprecated Use `triples` — kept for backward compatibility */ - response?: Triple[]; -} +export class TriplesQueryResponse extends S.Class("TriplesQueryResponse")({ + triples: TripleArray, + response: S.optionalKey(TripleArray), +}, { description: "Triple pattern query response payload." }) {} -export interface RowsQueryRequest { - query: string; - user?: string; - collection?: string; - variables?: Record; - operation_name?: string; -} +export class RowsQueryRequest extends S.Class("RowsQueryRequest")({ + query: S.String, + user: S.optionalKey(S.String), + collection: S.optionalKey(S.String), + variables: S.optionalKey(UnknownRecord), + operation_name: S.optionalKey(S.String), +}, { description: "Structured rows GraphQL request payload." }) {} -export interface RowsQueryResponse { - data?: Record; - errors?: Record[]; - extensions?: Record; - values?: unknown[]; -} +export class RowsQueryResponse extends S.Class("RowsQueryResponse")({ + data: S.optionalKey(UnknownRecord), + errors: S.optionalKey(S.Array(UnknownRecord).pipe(S.mutable)), + extensions: S.optionalKey(UnknownRecord), + values: S.optionalKey(S.Array(S.Unknown).pipe(S.mutable)), +}, { description: "Structured rows GraphQL response payload." }) {} -export interface NlpQueryRequest { - question: string; - max_results?: number; -} +export class NlpQueryRequest extends S.Class("NlpQueryRequest")({ + question: S.String, + max_results: S.optionalKey(S.Finite), +}, { description: "Natural-language-to-GraphQL request payload." }) {} -export interface NlpQueryResponse { - graphql_query?: string; - variables?: Record; - detected_schemas?: Record[]; - confidence?: number; -} +export class NlpQueryResponse extends S.Class("NlpQueryResponse")({ + graphql_query: S.optionalKey(S.String), + variables: S.optionalKey(UnknownRecord), + detected_schemas: S.optionalKey(S.Array(UnknownRecord).pipe(S.mutable)), + confidence: S.optionalKey(S.Finite), +}, { description: "Natural-language-to-GraphQL response payload." }) {} -export interface StructuredQueryRequest { - question: string; - user?: string; - collection?: string; -} +export class StructuredQueryRequest extends S.Class("StructuredQueryRequest")({ + question: S.String, + user: S.optionalKey(S.String), + collection: S.optionalKey(S.String), +}, { description: "Structured query request payload." }) {} -export interface StructuredQueryResponse { - data?: Record; - errors?: Record[]; -} +export class StructuredQueryResponse extends S.Class("StructuredQueryResponse")({ + data: S.optionalKey(UnknownRecord), + errors: S.optionalKey(S.Array(UnknownRecord).pipe(S.mutable)), +}, { description: "Structured query response payload." }) {} -export interface RowEmbeddingsQueryRequest { - vector: number[]; // Single query vector - schema_name: string; - user?: string; - collection?: string; - index_name?: string; - limit?: number; -} +export class RowEmbeddingsQueryRequest extends S.Class("RowEmbeddingsQueryRequest")({ + vector: NumberArray, + schema_name: S.String, + user: S.optionalKey(S.String), + collection: S.optionalKey(S.String), + index_name: S.optionalKey(S.String), + limit: S.optionalKey(S.Finite), +}, { description: "Row embeddings query request payload." }) {} -export interface RowEmbeddingsMatch { - index_name: string; - index_value: string[]; - text: string; - score: number; -} +export class RowEmbeddingsMatch extends S.Class("RowEmbeddingsMatch")({ + index_name: S.String, + index_value: StringArray, + text: S.String, + score: S.Finite, +}, { description: "Scored row embeddings match." }) {} -export interface RowEmbeddingsQueryResponse { - matches?: RowEmbeddingsMatch[]; - error?: { - message: string; - type?: string; - }; -} +export class RowEmbeddingsQueryResponse extends S.Class("RowEmbeddingsQueryResponse")({ + matches: S.optionalKey(S.Array(RowEmbeddingsMatch).pipe(S.mutable)), + error: S.optionalKey(TypedMessageError), +}, { description: "Row embeddings query response payload." }) {} -export interface LoadDocumentRequest { - id?: string; - data: string; - metadata?: Triple[]; -} +export class LoadDocumentRequest extends S.Class("LoadDocumentRequest")({ + id: S.optionalKey(S.String), + data: S.String, + metadata: S.optionalKey(TripleArray), +}, { description: "Flow-scoped document load request payload." }) {} export type LoadDocumentResponse = void; -export interface LoadTextRequest { - id?: string; - text: string; - charset?: string; - metadata?: Triple[]; -} +export class LoadTextRequest extends S.Class("LoadTextRequest")({ + id: S.optionalKey(S.String), + text: S.String, + charset: S.optionalKey(S.String), + metadata: S.optionalKey(TripleArray), +}, { description: "Flow-scoped text load request payload." }) {} export type LoadTextResponse = void; -export interface DocumentMetadata { - id?: string; - time?: number; - kind?: string; - title?: string; - comments?: string; - metadata?: Triple[]; - user?: string; - tags?: string[]; - parentId?: string; - documentType?: string; - "parent-id"?: string; - "document-type"?: string; -} +export class DocumentMetadata extends S.Class("DocumentMetadata")({ + id: S.optionalKey(S.String), + time: S.optionalKey(S.Finite), + kind: S.optionalKey(S.String), + title: S.optionalKey(S.String), + comments: S.optionalKey(S.String), + metadata: S.optionalKey(TripleArray), + user: S.optionalKey(S.String), + tags: S.optionalKey(StringArray), + parentId: S.optionalKey(S.String), + documentType: S.optionalKey(S.String), + "parent-id": S.optionalKey(S.String), + "document-type": S.optionalKey(S.String), +}, { description: "Library document metadata payload." }) {} -export interface ProcessingMetadata { - id?: string; - "document-id"?: string; - documentId?: string; - time?: number; - flow?: string; - user?: string; - collection?: string; - tags?: string[]; -} +export class ProcessingMetadata extends S.Class("ProcessingMetadata")({ + id: S.optionalKey(S.String), + "document-id": S.optionalKey(S.String), + documentId: S.optionalKey(S.String), + time: S.optionalKey(S.Finite), + flow: S.optionalKey(S.String), + user: S.optionalKey(S.String), + collection: S.optionalKey(S.String), + tags: S.optionalKey(StringArray), +}, { description: "Library processing metadata payload." }) {} -export interface LibraryRequest { - operation: string; - documentId?: string; - "document-id"?: string; - processingId?: string; - "processing-id"?: string; - "document-metadata"?: DocumentMetadata; - documentMetadata?: DocumentMetadata; - "processing-metadata"?: ProcessingMetadata; - content?: string; - user?: string; - collection?: string; - metadata?: Triple[]; - id?: string; - flow?: string; -} +export class LibraryRequest extends S.Class("LibraryRequest")({ + operation: S.String, + documentId: S.optionalKey(S.String), + "document-id": S.optionalKey(S.String), + processingId: S.optionalKey(S.String), + "processing-id": S.optionalKey(S.String), + "document-metadata": S.optionalKey(DocumentMetadata), + documentMetadata: S.optionalKey(DocumentMetadata), + "processing-metadata": S.optionalKey(ProcessingMetadata), + content: S.optionalKey(S.String), + user: S.optionalKey(S.String), + collection: S.optionalKey(S.String), + metadata: S.optionalKey(TripleArray), + id: S.optionalKey(S.String), + flow: S.optionalKey(S.String), +}, { description: "Library service request payload." }) {} -export interface LibraryResponse { - error?: WireError; - "document-metadata"?: DocumentMetadata; - documentMetadata?: DocumentMetadata; - content?: string; - "document-metadatas"?: DocumentMetadata[]; - documents?: DocumentMetadata[]; - "processing-metadata"?: ProcessingMetadata; - "processing-metadatas"?: ProcessingMetadata[]; - processing?: ProcessingMetadata[]; -} +export class LibraryResponse extends S.Class("LibraryResponse")({ + error: S.optionalKey(WireErrorValue), + "document-metadata": S.optionalKey(DocumentMetadata), + documentMetadata: S.optionalKey(DocumentMetadata), + content: S.optionalKey(S.String), + "document-metadatas": S.optionalKey(S.Array(DocumentMetadata).pipe(S.mutable)), + documents: S.optionalKey(S.Array(DocumentMetadata).pipe(S.mutable)), + "processing-metadata": S.optionalKey(ProcessingMetadata), + "processing-metadatas": S.optionalKey(S.Array(ProcessingMetadata).pipe(S.mutable)), + processing: S.optionalKey(S.Array(ProcessingMetadata).pipe(S.mutable)), +}, { description: "Library service response payload." }) {} -export interface KnowledgeRequest { - operation: string; - user?: string; - id?: string; - flow?: string; - collection?: string; - triples?: Triple[]; - "graph-embeddings"?: GraphEmbeddings; - graphEmbeddings?: GraphEmbeddings; - "document-embeddings"?: unknown; - documentEmbeddings?: unknown; -} +export class KnowledgeRequest extends S.Class("KnowledgeRequest")({ + operation: S.String, + user: S.optionalKey(S.String), + id: S.optionalKey(S.String), + flow: S.optionalKey(S.String), + collection: S.optionalKey(S.String), + triples: S.optionalKey(TripleArray), + "graph-embeddings": S.optionalKey(GraphEmbeddings), + graphEmbeddings: S.optionalKey(GraphEmbeddings), + "document-embeddings": S.optionalKey(S.Unknown), + documentEmbeddings: S.optionalKey(S.Unknown), +}, { description: "Knowledge service request payload." }) {} -export interface KnowledgeResponse { - error?: WireError; - ids?: string[]; - eos?: boolean; - triples?: Triple[]; - "graph-embeddings"?: GraphEmbeddings; - graphEmbeddings?: GraphEmbeddings; - "document-embeddings"?: unknown; - documentEmbeddings?: unknown; -} +export class KnowledgeResponse extends S.Class("KnowledgeResponse")({ + error: S.optionalKey(WireErrorValue), + ids: S.optionalKey(StringArray), + eos: S.optionalKey(S.Boolean), + triples: S.optionalKey(TripleArray), + "graph-embeddings": S.optionalKey(GraphEmbeddings), + graphEmbeddings: S.optionalKey(GraphEmbeddings), + "document-embeddings": S.optionalKey(S.Unknown), + documentEmbeddings: S.optionalKey(S.Unknown), +}, { description: "Knowledge service response payload." }) {} -export interface FlowRequest { - operation: string; - "blueprint-name"?: string; - "blueprint-definition"?: string; - description?: string; - "flow-id"?: string; - parameters?: Record; - user?: string; -} +export class FlowRequest extends S.Class("FlowRequest")({ + operation: S.String, + "blueprint-name": S.optionalKey(S.String), + "blueprint-definition": S.optionalKey(S.String), + description: S.optionalKey(S.String), + "flow-id": S.optionalKey(S.String), + parameters: S.optionalKey(UnknownRecord), + user: S.optionalKey(S.String), +}, { description: "Flow service request payload." }) {} -export interface FlowResponse { - "blueprint-names"?: string[]; - "flow-ids"?: string[]; - ids?: string[]; - flow?: string; - "blueprint-definition"?: string; - description?: string; - error?: - | { - message?: string; - } - | WireError; -} +export class FlowResponse extends S.Class("FlowResponse")({ + "blueprint-names": S.optionalKey(StringArray), + "flow-ids": S.optionalKey(StringArray), + ids: S.optionalKey(StringArray), + flow: S.optionalKey(S.String), + "blueprint-definition": S.optionalKey(S.String), + description: S.optionalKey(S.String), + error: S.optionalKey(S.Union([OptionalMessageError, WireErrorValue])), +}, { description: "Flow service response payload." }) {} -export interface PromptRequest { - id: string; - terms: Record; - streaming?: boolean; -} +export class PromptRequest extends S.Class("PromptRequest")({ + id: S.String, + terms: UnknownRecord, + streaming: S.optionalKey(S.Boolean), +}, { description: "Prompt rendering request payload." }) {} -export interface PromptResponse { - text: string; - // Streaming fields - end_of_stream?: boolean; - error?: { - message: string; - type?: string; - }; - // Token usage (appears in final message) - in_token?: number; - out_token?: number; - model?: string; -} +export class PromptResponse extends S.Class("PromptResponse")({ + text: S.String, + end_of_stream: S.optionalKey(S.Boolean), + error: S.optionalKey(TypedMessageError), + in_token: S.optionalKey(S.Finite), + out_token: S.optionalKey(S.Finite), + model: S.optionalKey(S.String), +}, { description: "Prompt rendering response payload." }) {} export type ConfigRequest = object; export type ConfigResponse = object; -// Chunked Upload Types +export class ChunkedUploadDocumentMetadata extends S.Class("ChunkedUploadDocumentMetadata")({ + id: S.String, + time: S.Finite, + kind: S.String, + title: S.String, + comments: S.optionalKey(S.String), + metadata: S.optionalKey(TripleArray), + user: S.String, + collection: S.optionalKey(S.String), + tags: S.optionalKey(StringArray), +}, { description: "Document metadata used to begin a chunked upload." }) {} -export interface ChunkedUploadDocumentMetadata { - id: string; - time: number; - kind: string; - title: string; - comments?: string; - metadata?: Triple[]; - user: string; - collection?: string; - tags?: string[]; -} +export class BeginUploadRequest extends S.Class("BeginUploadRequest")({ + operation: S.Literal("begin-upload"), + "document-metadata": S.optionalKey(ChunkedUploadDocumentMetadata), + documentMetadata: S.optionalKey(ChunkedUploadDocumentMetadata), + "total-size": S.Finite, + "chunk-size": S.optionalKey(S.Finite), +}, { description: "Chunked upload begin request payload." }) {} -export interface BeginUploadRequest { - operation: "begin-upload"; - "document-metadata"?: ChunkedUploadDocumentMetadata; - documentMetadata?: ChunkedUploadDocumentMetadata; - "total-size": number; - "chunk-size"?: number; -} +export class BeginUploadResponse extends S.Class("BeginUploadResponse")({ + "upload-id": S.String, + "chunk-size": S.Finite, + "total-chunks": S.Finite, + error: S.optionalKey(ResponseError), +}, { description: "Chunked upload begin response payload." }) {} -export interface BeginUploadResponse { - "upload-id": string; - "chunk-size": number; - "total-chunks": number; - error?: ResponseError; -} +export class UploadChunkRequest extends S.Class("UploadChunkRequest")({ + operation: S.Literal("upload-chunk"), + "upload-id": S.String, + "chunk-index": S.Finite, + content: S.String, + user: S.String, +}, { description: "Chunked upload chunk request payload." }) {} -export interface UploadChunkRequest { - operation: "upload-chunk"; - "upload-id": string; - "chunk-index": number; - content: string; // base64-encoded - user: string; -} +export class UploadChunkResponse extends S.Class("UploadChunkResponse")({ + "upload-id": S.String, + "chunk-index": S.Finite, + "chunks-received": S.Finite, + "total-chunks": S.Finite, + "bytes-received": S.Finite, + "total-bytes": S.Finite, + error: S.optionalKey(ResponseError), +}, { description: "Chunked upload chunk response payload." }) {} -export interface UploadChunkResponse { - "upload-id": string; - "chunk-index": number; - "chunks-received": number; - "total-chunks": number; - "bytes-received": number; - "total-bytes": number; - error?: ResponseError; -} +export class CompleteUploadRequest extends S.Class("CompleteUploadRequest")({ + operation: S.Literal("complete-upload"), + "upload-id": S.String, + user: S.String, +}, { description: "Chunked upload completion request payload." }) {} -export interface CompleteUploadRequest { - operation: "complete-upload"; - "upload-id": string; - user: string; -} +export class CompleteUploadResponse extends S.Class("CompleteUploadResponse")({ + "document-id": S.String, + "object-id": S.String, + error: S.optionalKey(ResponseError), +}, { description: "Chunked upload completion response payload." }) {} -export interface CompleteUploadResponse { - "document-id": string; - "object-id": string; - error?: ResponseError; -} +export class GetUploadStatusRequest extends S.Class("GetUploadStatusRequest")({ + operation: S.Literal("get-upload-status"), + "upload-id": S.String, + user: S.String, +}, { description: "Chunked upload status request payload." }) {} -export interface GetUploadStatusRequest { - operation: "get-upload-status"; - "upload-id": string; - user: string; -} +export class GetUploadStatusResponse extends S.Class("GetUploadStatusResponse")({ + "upload-id": S.String, + "upload-state": S.Literals(["in-progress", "completed", "expired"]), + "chunks-received": S.Finite, + "total-chunks": S.Finite, + "received-chunks": S.Array(S.Finite).pipe(S.mutable), + "missing-chunks": S.Array(S.Finite).pipe(S.mutable), + "bytes-received": S.Finite, + "total-bytes": S.Finite, + error: S.optionalKey(ResponseError), +}, { description: "Chunked upload status response payload." }) {} -export interface GetUploadStatusResponse { - "upload-id": string; - "upload-state": "in-progress" | "completed" | "expired"; - "chunks-received": number; - "total-chunks": number; - "received-chunks": number[]; - "missing-chunks": number[]; - "bytes-received": number; - "total-bytes": number; - error?: ResponseError; -} +export class AbortUploadRequest extends S.Class("AbortUploadRequest")({ + operation: S.Literal("abort-upload"), + "upload-id": S.String, + user: S.String, +}, { description: "Chunked upload abort request payload." }) {} -export interface AbortUploadRequest { - operation: "abort-upload"; - "upload-id": string; - user: string; -} +export class AbortUploadResponse extends S.Class("AbortUploadResponse")({ + error: S.optionalKey(ResponseError), +}, { description: "Chunked upload abort response payload." }) {} -export interface AbortUploadResponse { - error?: ResponseError; -} +export class ListUploadsRequest extends S.Class("ListUploadsRequest")({ + operation: S.Literal("list-uploads"), + user: S.String, +}, { description: "Pending uploads list request payload." }) {} -export interface ListUploadsRequest { - operation: "list-uploads"; - user: string; -} +export class UploadSession extends S.Class("UploadSession")({ + "upload-id": S.String, + "document-id": S.String, + "document-metadata-json": S.String, + "total-size": S.Finite, + "chunk-size": S.Finite, + "total-chunks": S.Finite, + "chunks-received": S.Finite, + "created-at": S.String, +}, { description: "Pending upload session payload." }) {} -export interface UploadSession { - "upload-id": string; - "document-id": string; - "document-metadata-json": string; - "total-size": number; - "chunk-size": number; - "total-chunks": number; - "chunks-received": number; - "created-at": string; -} +export class ListUploadsResponse extends S.Class("ListUploadsResponse")({ + "upload-sessions": S.Array(UploadSession).pipe(S.mutable), + error: S.optionalKey(ResponseError), +}, { description: "Pending uploads list response payload." }) {} -export interface ListUploadsResponse { - "upload-sessions": UploadSession[]; - error?: ResponseError; -} +export class StreamDocumentRequest extends S.Class("StreamDocumentRequest")({ + operation: S.Literal("stream-document"), + "document-id": S.String, + "chunk-size": S.optionalKey(S.Finite), + user: S.String, +}, { description: "Document chunk stream request payload." }) {} -export interface StreamDocumentRequest { - operation: "stream-document"; - "document-id": string; - "chunk-size"?: number; - user: string; -} - -export interface StreamDocumentResponse { - content: string; // base64-encoded chunk - "chunk-index": number; - "total-chunks": number; - error?: ResponseError; -} +export class StreamDocumentResponse extends S.Class("StreamDocumentResponse")({ + content: S.String, + "chunk-index": S.Finite, + "total-chunks": S.Finite, + error: S.optionalKey(ResponseError), +}, { description: "Document chunk stream response payload." }) {} diff --git a/ts/packages/client/src/socket/effect-rpc-client.ts b/ts/packages/client/src/socket/effect-rpc-client.ts index 08928591..e4c4bd38 100644 --- a/ts/packages/client/src/socket/effect-rpc-client.ts +++ b/ts/packages/client/src/socket/effect-rpc-client.ts @@ -1,4 +1,4 @@ -import { Cause, Context, Effect, Exit, Fiber, Layer, Ref, Scope, Stream, SubscriptionRef } from "effect"; +import { Cause, Context, Effect, Fiber, Layer, Ref, Schema as S, Scope, Stream, SubscriptionRef } from "effect"; import type * as RpcGroup from "effect/unstable/rpc/RpcGroup"; import * as RpcClient from "effect/unstable/rpc/RpcClient"; import type { RpcClientError } from "effect/unstable/rpc/RpcClientError"; @@ -19,17 +19,17 @@ class TrustGraphRpcClientService extends Context.Service< export type RpcConnectionStatus = "connecting" | "connected" | "failed" | "closed"; -export interface RpcConnectionState { - status: RpcConnectionStatus; - lastError?: string; -} +export class RpcConnectionState extends S.Class("RpcConnectionState")({ + status: S.Literals(["connecting", "connected", "failed", "closed"]), + lastError: S.optionalKey(S.String), +}, { description: "Current Effect RPC gateway connection state." }) {} -export interface DispatchInput { - scope: "global" | "flow"; - service: string; - flow?: string; - request: Record; -} +export class DispatchInput extends S.Class("DispatchInput")({ + scope: S.Literals(["global", "flow"]), + service: S.String, + flow: S.optionalKey(S.String), + request: S.Record(S.String, S.Unknown), +}, { description: "TrustGraph gateway dispatch target and request payload." }) {} export interface DispatchOptions { readonly timeoutMs?: number; @@ -74,20 +74,6 @@ export interface TrustGraphGatewayClientOptions { const DEFAULT_REQUEST_TIMEOUT_MS = 10_000; const DEFAULT_REQUEST_ATTEMPTS = 3; -export interface EffectRpcClient { - readonly subscribe: (listener: (state: RpcConnectionState) => void) => () => void; - readonly dispatch: ( - input: DispatchInput, - options?: DispatchOptions, - ) => Promise; - readonly dispatchStream: ( - input: DispatchInput, - receiver: (chunk: DispatchStreamChunk) => boolean, - options?: DispatchOptions, - ) => Promise; - readonly close: () => Promise; -} - const makeClientLayer = ( options: TrustGraphGatewayClientOptions, stateRef: SubscriptionRef.SubscriptionRef, @@ -234,59 +220,12 @@ export function makeEffectRpcClient( url: string, onConnect?: () => void, onDisconnect?: () => void, -): EffectRpcClient { - const stateRef = Effect.runSync(SubscriptionRef.make({ status: "connecting" })); - const closedRef = Effect.runSync(Ref.make(false)); - const scope = Effect.runSync(Scope.make()); - const options: TrustGraphGatewayClientOptions = { +): Effect.Effect { + return makeTrustGraphGatewayClientScoped({ url, - stateRef, - closedRef, ...(onConnect === undefined ? {} : { onConnect }), ...(onDisconnect === undefined ? {} : { onDisconnect }), - }; - const clientPromise = Effect.runPromise( - makeTrustGraphGatewayClientScoped(options).pipe(Scope.provide(scope)), - ); - - return { - subscribe: (listener) => { - let unsubscribe: Effect.Effect | undefined; - let cancelled = false; - listener(SubscriptionRef.getUnsafe(stateRef)); - void clientPromise.then((client) => - Effect.runPromise(client.subscribe(listener)).then((release) => { - if (cancelled) { - return Effect.runPromise(release); - } - unsubscribe = release; - }) - ); - - return () => { - cancelled = true; - if (unsubscribe !== undefined) { - Effect.runFork(unsubscribe); - } - }; - }, - dispatch: (input, options = {}) => - clientPromise.then((client) => - Effect.runPromise(client.dispatch(input, options)) - ), - dispatchStream: (input, receiver, options = {}) => - clientPromise.then((client) => - Effect.runPromise(client.runDispatchStream(input, receiver, options)) - ), - close: () => - clientPromise.then((client) => - Effect.runPromise( - client.close.pipe( - Effect.andThen(Scope.close(scope, Exit.void)), - ), - ) - ), - }; + }); } export function withDispatchRequestPolicy( diff --git a/ts/packages/client/src/socket/trustgraph-socket.ts b/ts/packages/client/src/socket/trustgraph-socket.ts index 7eb543d4..cfe75370 100644 --- a/ts/packages/client/src/socket/trustgraph-socket.ts +++ b/ts/packages/client/src/socket/trustgraph-socket.ts @@ -1,16 +1,13 @@ // Import core types and classes for the TrustGraph API -import type { Term, Triple } from "../models/Triple.js"; +import { Triple } from "../models/Triple.js"; +import type { Term } from "../models/Triple.js"; import type { - EffectRpcClient, DispatchInput, DispatchOptions, RpcConnectionState, } from "./effect-rpc-client.js"; -import { - makeEffectRpcClient, -} from "./effect-rpc-client.js"; import { getDefaultSocketUrl, getRandomValues } from "./websocket-adapter.js"; -import { Clock, Effect, Fiber, Match, Option, Result, Schema as S, Stream, SubscriptionRef } from "effect"; +import { Match, Option, Schema as S } from "effect"; import * as Predicate from "effect/Predicate"; // Import all message types for different services @@ -89,19 +86,32 @@ export interface GraphRagOptions { pathLength?: number; } -// Metadata included in final streaming message -export interface StreamingMetadata { - in_token?: number; - out_token?: number; - model?: string; +export interface LegacyRpcClient { + readonly subscribe: (listener: (state: RpcConnectionState) => void) => () => void; + readonly dispatch: ( + input: DispatchInput, + options?: DispatchOptions, + ) => Promise; + readonly dispatchStream: ( + input: DispatchInput, + receiver: (chunk: { readonly response: unknown; readonly complete: boolean }) => boolean, + options?: DispatchOptions, + ) => Promise; + readonly close: () => Promise; } -// Explainability event data -export interface ExplainEvent { - explainId: string; - explainGraph: string; // Named graph where explain data is stored (e.g., urn:graph:retrieval) - explainTriples?: Triple[]; // Inline subgraph triples (when available) -} +// Metadata included in final streaming message +export class StreamingMetadata extends S.Class("StreamingMetadata")({ + in_token: S.optionalKey(S.Finite), + out_token: S.optionalKey(S.Finite), + model: S.optionalKey(S.String), +}, { description: "Token and model metadata attached to a final streaming chunk." }) {} + +export class ExplainEvent extends S.Class("ExplainEvent")({ + explainId: S.String, + explainGraph: S.String, + explainTriples: S.optionalKey(S.Array(Triple).pipe(S.mutable)), +}, { description: "Explainability graph reference or inline triples for a stream." }) {} // Configuration constants const SOCKET_URL = getDefaultSocketUrl(); // WebSocket endpoint path (isomorphic) @@ -155,7 +165,11 @@ function dispatchOptions( } function streamingMetadataFrom(source: unknown): StreamingMetadata | undefined { - const metadata: StreamingMetadata = {}; + const metadata: { + in_token?: number; + out_token?: number; + model?: string; + } = {}; let hasMetadata = false; const inToken = numberProperty(source, "in_token"); @@ -185,12 +199,12 @@ function throwIfResponseError(error: ResponseError | undefined): void { const decodeJsonUnknown = S.decodeUnknownOption(S.UnknownFromJsonString); -export interface ConfigValueEntry { - workspace?: string; - type?: string; - key: string; - value: unknown; -} +export class ConfigValueEntry extends S.Class("ConfigValueEntry")({ + workspace: S.optionalKey(S.String), + type: S.optionalKey(S.String), + key: S.String, + value: S.Unknown, +}, { description: "Config key/value entry returned from the TrustGraph config service." }) {} function asConfigValues(response: unknown): ConfigValueEntry[] { if (response === null || typeof response !== "object") return []; @@ -201,9 +215,12 @@ function asConfigValues(response: unknown): ConfigValueEntry[] { const item = value as Record; const key = item.key; if (typeof key !== "string") return []; - const entry: ConfigValueEntry = { key, value: item.value }; - if (typeof item.workspace === "string") entry.workspace = item.workspace; - if (typeof item.type === "string") entry.type = item.type; + const entry: ConfigValueEntry = { + key, + value: item.value, + ...(typeof item.workspace === "string" ? { workspace: item.workspace } : {}), + ...(typeof item.type === "string" ? { type: item.type } : {}), + }; return [entry]; }); } @@ -222,15 +239,11 @@ function parseResponseJson(value: string | undefined, operation: string): unknow } const currentEpochSeconds = (): number => - Math.floor(Effect.runSync(Clock.currentTimeMillis) / 1000); + Math.floor((globalThis.performance.timeOrigin + globalThis.performance.now()) / 1000); -const logClientInfo = (message: string): void => { - Effect.runFork(Effect.log(message)); -}; +const logClientInfo = (_message: string): void => {}; -const logClientError = (message: string, error: unknown): void => { - Effect.runFork(Effect.logError(message, { error: toErrorMessage(error, message) })); -}; +const logClientError = (_message: string, _error: unknown): void => {}; const runLegacyStreamingRequest = ( operation: string, @@ -238,18 +251,9 @@ const runLegacyStreamingRequest = ( request: () => Promise, onError: (message: string) => void, ): Promise => - Effect.runPromise( - Effect.tryPromise({ - try: request, - catch: (error) => socketError(operation, toErrorMessage(error, "Unknown error")), - }).pipe( - Effect.catch((error) => - Effect.sync(() => { - onError(`${label} request failed: ${error.message}`); - }) - ), - ), - ); + request().catch((error) => { + onError(`${label} request failed: ${toErrorMessage(error, `${operation} failed`)}`); + }); const StreamingEnvelopeSchema = S.Struct({ response: S.optionalKey(S.Unknown), @@ -453,34 +457,35 @@ function makeid(length: number) { * functionality */ // Connection state interface for UI consumption -export interface ConnectionState { - status: - | "connecting" - | "connected" - | "reconnecting" - | "failed" - | "authenticated" - | "unauthenticated"; - hasApiKey: boolean; - reconnectAttempt?: number; - maxAttempts?: number; - nextRetryIn?: number; - lastError?: string; -} +export class ConnectionState extends S.Class("ConnectionState")({ + status: S.Literals([ + "connecting", + "connected", + "reconnecting", + "failed", + "authenticated", + "unauthenticated", + ]), + hasApiKey: S.Boolean, + reconnectAttempt: S.optionalKey(S.Finite), + maxAttempts: S.optionalKey(S.Finite), + nextRetryIn: S.optionalKey(S.Finite), + lastError: S.optionalKey(S.String), +}, { description: "Workbench-facing TrustGraph gateway connection state." }) {} export function makeBaseApi( user: string, - token?: string, - socketUrl?: string, - rpcFactory: (url: string) => EffectRpcClient = makeEffectRpcClient, + token: string | undefined, + socketUrl: string | undefined, + rpcFactory: (url: string) => LegacyRpcClient, ) { - let rpc: EffectRpcClient; - const connectionStateRef = Effect.runSync( - SubscriptionRef.make({ - status: "connecting", - hasApiKey: isNonEmptyString(token), - }), - ); + let rpc: LegacyRpcClient; + let unsubscribeRpc: (() => void) | undefined; + const connectionStateListeners = new Set<(state: ConnectionState) => void>(); + let connectionState: ConnectionState = { + status: "connecting", + hasApiKey: isNonEmptyString(token), + }; let lastError: string | undefined ; let rpcState: RpcConnectionState = { status: "connecting" }; @@ -495,27 +500,10 @@ export function makeBaseApi( * Subscribe to connection state changes for UI updates */ onConnectionStateChange(listener: (state: ConnectionState) => void) { - let latest = SubscriptionRef.getUnsafe(connectionStateRef); - listener(latest); - let replaySeen = false; - const fiber = Effect.runFork( - SubscriptionRef.changes(connectionStateRef).pipe( - Stream.runForEach((state) => - Effect.sync(() => { - if (!replaySeen) { - replaySeen = true; - if (state === latest) return; - } - latest = state; - notifyConnectionStateListener(listener, state); - }) - ), - ), - ); - - // Return unsubscribe function + connectionStateListeners.add(listener); + notifyConnectionStateListener(listener, connectionState); return () => { - Effect.runFork(Fiber.interrupt(fiber)); + connectionStateListeners.delete(listener); }; }, @@ -523,13 +511,9 @@ export function makeBaseApi( * Closes the WebSocket connection and cleans up */ close() { - Effect.runFork( - Effect.tryPromise({ - try: () => rpc.close(), - catch: (error) => socketError("socket-close", toErrorMessage(error, "Socket close failed")), - }).pipe( - Effect.catch((error) => Effect.sync(() => logClientError("[socket close error]", error))), - ), + unsubscribeRpc?.(); + void rpc.close().catch((error) => + logClientError("[socket close error]", socketError("socket-close", toErrorMessage(error, "Socket close failed"))) ); }, @@ -643,10 +627,8 @@ export function makeBaseApi( const state: ConnectionState = { status, hasApiKey, + ...(lastError !== undefined ? { lastError } : {}), }; - if (lastError !== undefined) { - state.lastError = lastError; - } return state; }; @@ -655,21 +637,24 @@ export function makeBaseApi( listener: (state: ConnectionState) => void, state: ConnectionState, ): void => { - const result = Result.try({ - try: () => listener(state), - catch: (error) => + try { + listener(state); + } catch (error) { + logClientError( + "Error in connection state listener", socketError( "connection-state-listener", toErrorMessage(error, "Error in connection state listener"), ), - }); - if (Result.isFailure(result)) { - logClientError("Error in connection state listener", result.failure); + ); } }; const publishConnectionState = () => { - Effect.runSync(SubscriptionRef.set(connectionStateRef, getConnectionState())); + connectionState = getConnectionState(); + for (const listener of connectionStateListeners) { + notifyConnectionStateListener(listener, connectionState); + } }; const connectionStatusFromRpc = (hasApiKey: boolean): ConnectionState["status"] => @@ -712,7 +697,7 @@ export function makeBaseApi( }; rpc = rpcFactory(socketUrlWithToken()); - rpc.subscribe((state) => { + unsubscribeRpc = rpc.subscribe((state) => { rpcState = state; lastError = state.lastError; publishConnectionState(); @@ -726,13 +711,12 @@ export function makeBaseApi( } export type BaseApi = ReturnType; -export const BaseApi = makeBaseApi; export function makeBaseApiWithRpc( user: string, token: string | undefined, socketUrl: string | undefined, - rpc: EffectRpcClient, + rpc: LegacyRpcClient, ): BaseApi { return makeBaseApi(user, token, socketUrl, () => rpc); } @@ -833,13 +817,9 @@ export function makeLibrarianApi(api: BaseApi) { tags, "document-type": "source", documentType: "source", + ...(id !== undefined ? { id } : {}), + ...(metadata !== undefined ? { metadata } : {}), }; - if (id !== undefined) { - documentMetadata.id = id; - } - if (metadata !== undefined) { - documentMetadata.metadata = metadata; - } return this.api.makeRequest( "librarian", @@ -929,10 +909,8 @@ export function makeLibrarianApi(api: BaseApi) { "document-metadata": metadata, documentMetadata: metadata, "total-size": totalSize, + ...(chunkSize !== undefined ? { "chunk-size": chunkSize } : {}), }; - if (chunkSize !== undefined) { - request["chunk-size"] = chunkSize; - } return this.api .makeRequest( @@ -1124,10 +1102,8 @@ export function makeLibrarianApi(api: BaseApi) { operation: "stream-document", "document-id": documentId, user: this.api.user, + ...(chunkSize !== undefined ? { "chunk-size": chunkSize } : {}), }; - if (chunkSize !== undefined) { - request["chunk-size"] = chunkSize; - } this.api.makeRequestMulti( "librarian", @@ -1369,13 +1345,9 @@ export function makeFlowsApi(api: BaseApi) { "flow-id": id, "blueprint-name": blueprint_name, description: description, + ...(parameters !== undefined && Object.keys(parameters).length > 0 ? { parameters } : {}), }; - // Only include parameters if provided and not empty - if (parameters !== undefined && Object.keys(parameters).length > 0) { - request.parameters = parameters; - } - return this.api .makeRequest("flow", request, 30000) .then((response) => { @@ -1447,19 +1419,11 @@ export function makeFlowApi(api: BaseApi, flowId: string) { query: text, user: this.api.user, collection: withDefault(collection, "default"), + ...(options?.entityLimit !== undefined ? { "entity-limit": options.entityLimit } : {}), + ...(options?.tripleLimit !== undefined ? { "triple-limit": options.tripleLimit } : {}), + ...(options?.maxSubgraphSize !== undefined ? { "max-subgraph-size": options.maxSubgraphSize } : {}), + ...(options?.pathLength !== undefined ? { "max-path-length": options.pathLength } : {}), }; - if (options?.entityLimit !== undefined) { - request["entity-limit"] = options.entityLimit; - } - if (options?.tripleLimit !== undefined) { - request["triple-limit"] = options.tripleLimit; - } - if (options?.maxSubgraphSize !== undefined) { - request["max-subgraph-size"] = options.maxSubgraphSize; - } - if (options?.pathLength !== undefined) { - request["max-path-length"] = options.pathLength; - } return this.api .makeRequest( @@ -1539,10 +1503,8 @@ export function makeFlowApi(api: BaseApi, flowId: string) { const event: ExplainEvent = { explainId: explainId ?? "", explainGraph: stringProperty(resp, "explain_graph") ?? "", + ...(explainTriples !== undefined ? { explainTriples } : {}), }; - if (explainTriples !== undefined) { - event.explainTriples = explainTriples; - } onExplain?.(event); return false; } @@ -1635,10 +1597,8 @@ export function makeFlowApi(api: BaseApi, flowId: string) { const event: ExplainEvent = { explainId: explainId ?? "", explainGraph: stringProperty(resp, "explain_graph") ?? "", + ...(explainTriples !== undefined ? { explainTriples } : {}), }; - if (explainTriples !== undefined) { - event.explainTriples = explainTriples; - } onExplain?.(event); // If this message also carries answer text, fall through to chunk handling. // If it's a standalone explain event (no answer text), stop here. @@ -1668,19 +1628,11 @@ export function makeFlowApi(api: BaseApi, flowId: string) { user: this.api.user, collection: withDefault(collection, "default"), streaming: true, + ...(options?.entityLimit !== undefined ? { "entity-limit": options.entityLimit } : {}), + ...(options?.tripleLimit !== undefined ? { "triple-limit": options.tripleLimit } : {}), + ...(options?.maxSubgraphSize !== undefined ? { "max-subgraph-size": options.maxSubgraphSize } : {}), + ...(options?.pathLength !== undefined ? { "max-path-length": options.pathLength } : {}), }; - if (options?.entityLimit !== undefined) { - request["entity-limit"] = options.entityLimit; - } - if (options?.tripleLimit !== undefined) { - request["triple-limit"] = options.tripleLimit; - } - if (options?.maxSubgraphSize !== undefined) { - request["max-subgraph-size"] = options.maxSubgraphSize; - } - if (options?.pathLength !== undefined) { - request["max-path-length"] = options.pathLength; - } void runLegacyStreamingRequest( "graph-rag-stream", @@ -1765,10 +1717,8 @@ export function makeFlowApi(api: BaseApi, flowId: string) { user: this.api.user, collection: withDefault(collection, "default"), streaming: true, + ...(docLimit !== undefined ? { "doc-limit": docLimit } : {}), }; - if (docLimit !== undefined) { - request["doc-limit"] = docLimit; - } void runLegacyStreamingRequest( "document-rag-stream", @@ -1968,19 +1918,11 @@ export function makeFlowApi(api: BaseApi, flowId: string) { limit: limit ?? 20, user: this.api.user, collection: withDefault(collection, "default"), + ...(s !== undefined ? { s } : {}), + ...(p !== undefined ? { p } : {}), + ...(o !== undefined ? { o } : {}), + ...(graph !== undefined ? { g: graph } : {}), }; - if (s !== undefined) { - request.s = s; - } - if (p !== undefined) { - request.p = p; - } - if (o !== undefined) { - request.o = o; - } - if (graph !== undefined) { - request.g = graph; - } return this.api .makeRequest( @@ -2005,13 +1947,9 @@ export function makeFlowApi(api: BaseApi, flowId: string) { ) { const request: LoadDocumentRequest = { data: document, + ...(id !== undefined ? { id } : {}), + ...(metadata !== undefined ? { metadata } : {}), }; - if (id !== undefined) { - request.id = id; - } - if (metadata !== undefined) { - request.metadata = metadata; - } return this.api.makeRequest( "document-load", @@ -2035,16 +1973,10 @@ export function makeFlowApi(api: BaseApi, flowId: string) { ) { const request: LoadTextRequest = { text, + ...(id !== undefined ? { id } : {}), + ...(metadata !== undefined ? { metadata } : {}), + ...(charset !== undefined ? { charset } : {}), }; - if (id !== undefined) { - request.id = id; - } - if (metadata !== undefined) { - request.metadata = metadata; - } - if (charset !== undefined) { - request.charset = charset; - } return this.api.makeRequest( "text-load", @@ -2070,13 +2002,9 @@ export function makeFlowApi(api: BaseApi, flowId: string) { query, user: this.api.user, collection: withDefault(collection, "default"), + ...(variables !== undefined ? { variables } : {}), + ...(operationName !== undefined ? { operation_name: operationName } : {}), }; - if (variables !== undefined) { - request.variables = variables; - } - if (operationName !== undefined) { - request.operation_name = operationName; - } return this.api .makeRequest( @@ -2167,12 +2095,9 @@ export function makeFlowApi(api: BaseApi, flowId: string) { user: this.api.user, collection: withDefault(collection, "default"), limit: limit ?? 10, + ...(indexName !== undefined ? { index_name: indexName } : {}), }; - if (indexName !== undefined) { - request.index_name = indexName; - } - return this.api .makeRequest( "row-embeddings", @@ -2664,16 +2589,3 @@ export function makeCollectionManagementApi(api: BaseApi) { export type CollectionManagementApi = ReturnType; export const CollectionManagementApi = makeCollectionManagementApi; - -/** - * Factory function to create a new TrustGraph WebSocket connection - * This is the main entry point for using the TrustGraph API - * @param user - User identifier for API requests - * @param token - Optional authentication token for secure connections - * @param socketUrl - Optional WebSocket URL (defaults to /api/v1/rpc for browser, provide full URL for Node.js) - */ -export const createTrustGraphSocket = ( - user: string, - token?: string, - socketUrl?: string, -): BaseApi => makeBaseApi(user, token, socketUrl); diff --git a/ts/packages/mcp/src/__tests__/server-effect.test.ts b/ts/packages/mcp/src/__tests__/server-effect.test.ts index b2093009..c37ca328 100644 --- a/ts/packages/mcp/src/__tests__/server-effect.test.ts +++ b/ts/packages/mcp/src/__tests__/server-effect.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it } from "@effect/vitest"; -import type { BaseApi, TrustGraphGatewayClient } from "@trustgraph/client"; -import { DispatchStreamChunk, } from "@trustgraph/client"; +import type { DispatchInput, TrustGraphGatewayClient } from "@trustgraph/client"; +import { DispatchError, DispatchStreamChunk, } from "@trustgraph/client"; import { Effect, Layer, Stream } from "effect"; import * as S from "effect/Schema"; import { McpServer } from "effect/unstable/ai"; @@ -15,7 +15,6 @@ import { TrustGraphMcpToolkit, TrustGraphMcpToolkitLive, TrustGraphGateway, - TrustGraphSocket, } from "../server-effect.js"; const expectedToolNames = [ @@ -61,7 +60,7 @@ interface NativeTestClientOptions { const decodeJsonText = S.decodeUnknownSync(S.UnknownFromJsonString); const makeFakeSocket = ( - options: { + _options: { readonly textCompletion?: (() => Promise) | undefined; readonly graphRag?: (() => Promise) | undefined; } = {}, @@ -71,66 +70,62 @@ const makeFakeSocket = ( graphRag: [], }; - const socket = { - close: () => {}, - flow: (flowId: string) => { - calls.flowIds.push(flowId); - return { - textCompletion: () => options.textCompletion === undefined - ? Promise.resolve("gateway text completion") - : options.textCompletion(), - graphRag: (query: string, ragOptions: unknown, collection?: string) => { - calls.graphRag.push({ query, options: ragOptions, collection }); - return options.graphRag === undefined - ? Promise.resolve("graph rag answer") - : options.graphRag(); - }, - documentRag: () => Promise.resolve("document rag answer"), - agent: ( - _question: string, - _onThought: () => void, - _onObservation: () => void, - onAnswer: (chunk: string, complete: boolean) => void, - ) => onAnswer("agent answer", true), - embeddings: () => Promise.resolve([[0.25, 0.75]]), - triplesQuery: () => Promise.resolve([]), - graphEmbeddingsQuery: () => Promise.resolve([]), - }; - }, - config: () => ({ - getConfigAll: () => Promise.resolve({}), - getConfig: () => Promise.resolve({}), - putConfig: () => Promise.resolve({ ok: true }), - deleteConfig: () => Promise.resolve({ ok: true }), - getPrompts: () => Promise.resolve([]), - getPrompt: () => Promise.resolve({}), - }), - flows: () => ({ - getFlows: () => Promise.resolve(["default"]), - getFlow: () => Promise.resolve({}), - startFlow: () => Promise.resolve({ ok: true }), - stopFlow: () => Promise.resolve({ ok: true }), - }), - librarian: () => ({ - getDocuments: () => Promise.resolve([]), - loadDocument: () => Promise.resolve({ ok: true }), - removeDocument: () => Promise.resolve({ ok: true }), - }), - knowledge: () => ({ - getKnowledgeCores: () => Promise.resolve([]), - deleteKgCore: () => Promise.resolve({ ok: true }), - loadKgCore: () => Promise.resolve({ ok: true }), - }), - } as unknown as BaseApi; - - return { socket, calls }; + return { calls }; }; -const makeFakeGateway = (): TrustGraphGatewayClient => ({ +const makeFakeGateway = ( + calls: FakeSocketCalls, + options: NativeTestClientOptions = {}, +): TrustGraphGatewayClient => ({ state: Effect.succeed({ status: "connected" }), changes: Stream.empty, subscribe: () => Effect.succeed(Effect.void), - dispatch: () => Effect.succeed({}), + dispatch: Effect.fn("FakeTrustGraphGateway.dispatch")(function*(input: DispatchInput) { + if (input.flow !== undefined) calls.flowIds.push(input.flow); + if (input.service === "text-completion") { + const response = options.textCompletion === undefined + ? "gateway text completion" + : yield* Effect.tryPromise({ + try: options.textCompletion, + catch: (cause) => DispatchError.make({ + message: cause instanceof Error ? cause.message : String(cause), + }), + }); + return { response }; + } + if (input.service === "graph-rag") { + calls.graphRag.push({ + query: String(input.request.query ?? ""), + options: { + entityLimit: input.request["entity-limit"], + tripleLimit: input.request["triple-limit"], + }, + collection: typeof input.request.collection === "string" ? input.request.collection : undefined, + }); + const response = options.graphRag === undefined + ? "graph rag answer" + : yield* Effect.tryPromise({ + try: options.graphRag, + catch: (cause) => DispatchError.make({ + message: cause instanceof Error ? cause.message : String(cause), + }), + }); + return { response }; + } + if (input.service === "document-rag") return { response: "document rag answer" }; + if (input.service === "embeddings") return { vectors: [[0.25, 0.75]] }; + if (input.service === "triples") return { triples: [] }; + if (input.service === "graph-embeddings") return { entities: [] }; + if (input.service === "config") return {}; + if (input.service === "flow" && input.request.operation === "list-flows") return { "flow-ids": ["default"] }; + if (input.service === "flow" && input.request.operation === "get-flow") return { flow: "{}" }; + if (input.service === "flow") return { ok: true }; + if (input.service === "librarian" && input.request.operation === "list-documents") return { "document-metadatas": [] }; + if (input.service === "librarian") return { ok: true }; + if (input.service === "knowledge" && input.request.operation === "list-kg-cores") return { ids: [] }; + if (input.service === "knowledge") return { ok: true }; + return {}; + }), dispatchStream: () => Stream.empty, runDispatchStream: (_input, receiver) => Effect.sync(() => { @@ -168,15 +163,14 @@ const makeNativeTestClient = ( const makeNativeTestClientEffect = Effect.fn("makeNativeTestClient")(function*( options: NativeTestClientOptions, ) { - const { socket, calls } = makeFakeSocket({ + const { calls } = makeFakeSocket({ textCompletion: options.textCompletion, graphRag: options.graphRag, }); - const gateway = makeFakeGateway(); + const gateway = makeFakeGateway(calls, options); const serverLayer = McpServer.toolkit(TrustGraphMcpToolkit).pipe( Layer.provide(TrustGraphMcpToolkitLive), Layer.provide(Layer.succeed(TrustGraphGateway, TrustGraphGateway.of(gateway))), - Layer.provide(Layer.succeed(TrustGraphSocket, TrustGraphSocket.of(socket))), Layer.provide(Layer.succeed(TrustGraphMcpConfig, testConfig)), Layer.provide(McpServer.layerHttp({ name: "trustgraph", diff --git a/ts/packages/mcp/src/server-effect.ts b/ts/packages/mcp/src/server-effect.ts index 96d62e36..f4b9f087 100644 --- a/ts/packages/mcp/src/server-effect.ts +++ b/ts/packages/mcp/src/server-effect.ts @@ -1,15 +1,15 @@ import {BunHttpServer, BunRuntime} from "@effect/platform-bun"; import {NodeRuntime, NodeStdio} from "@effect/platform-node"; import type { - BaseApi, + EntityMatch as ClientEntityMatch, Term as ClientTerm, + Triple as ClientTriple, TrustGraphGatewayClient, } from "@trustgraph/client"; import { - createTrustGraphSocket, makeTrustGraphGatewayClientScoped, } from "@trustgraph/client"; -import {Config, Context, Effect, Layer} from "effect"; +import {Clock, Config, Context, Effect, Layer} from "effect"; import * as O from "effect/Option"; import * as Predicate from "effect/Predicate"; import {McpServer, Tool, Toolkit} from "effect/unstable/ai"; @@ -1279,22 +1279,6 @@ export class TrustGraphMcpConfig extends Context.Service()( - "@trustgraph/mcp/server-effect/TrustGraphSocket", -) { - static readonly layer = Layer.effect( - TrustGraphSocket, - Effect.gen(function*() { - const config = yield* TrustGraphMcpConfig - const socket = yield* Effect.acquireRelease( - Effect.sync(() => createTrustGraphSocket(config.user, config.token, config.gatewayUrl)), - (socket) => Effect.sync(() => socket.close()), - ) - return TrustGraphSocket.of(socket) - }), - ) -} - export class TrustGraphGateway extends Context.Service()( "@trustgraph/mcp/server-effect/TrustGraphGateway", ) { @@ -1360,6 +1344,36 @@ const decodeJsonArrayOrFail = ( const asIriTerm = (value: string | undefined): ClientTerm | undefined => value !== undefined && value.length > 0 ? {t: "i", i: value} : undefined +const dispatchGlobal = ( + gateway: TrustGraphGatewayClient, + service: string, + request: Record, + makeError: (cause: unknown) => E, + options: {readonly timeoutMs?: number; readonly retries?: number} = {}, +) => + gateway.dispatch({scope: "global", service, request}, options).pipe( + Effect.map(asRecord), + Effect.mapError(makeError), + ) + +const dispatchFlow = ( + gateway: TrustGraphGatewayClient, + config: TrustGraphMcpConfigShape, + service: string, + request: Record, + makeError: (cause: unknown) => E, + options: {readonly timeoutMs?: number; readonly retries?: number} = {}, +) => + gateway.dispatch({ + scope: "flow", + flow: config.flowId, + service, + request, + }, options).pipe( + Effect.map(asRecord), + Effect.mapError(makeError), + ) + const runAgentTool = Effect.fn("TrustGraphMcpToolkit.agent")(function*( gateway: TrustGraphGatewayClient, config: TrustGraphMcpConfigShape, @@ -1411,90 +1425,134 @@ const runAgentTool = Effect.fn("TrustGraphMcpToolkit.agent")(function*( export const TrustGraphMcpToolkitLive = TrustGraphMcpToolkit.toLayer( Effect.gen(function*() { const config = yield* TrustGraphMcpConfig - const socket = yield* TrustGraphSocket const gateway = yield* TrustGraphGateway return TrustGraphMcpToolkit.of({ text_completion: ({system, prompt}) => - Effect.tryPromise({ - try: () => socket.flow(config.flowId).textCompletion(system, prompt), - catch: (cause) => TextCompletionError.make({message: toErrorMessage(cause)}), - }).pipe( - Effect.map((text) => TextCompletionSuccess.make({text})), + dispatchFlow( + gateway, + config, + "text-completion", + {system, prompt}, + (cause) => TextCompletionError.make({message: toErrorMessage(cause)}), + {timeoutMs: 30_000}, + ).pipe( + Effect.map((response) => TextCompletionSuccess.make({text: stringProperty(response, "response") ?? ""})), ), graph_rag: ({query, entity_limit, triple_limit, collection}) => - Effect.tryPromise({ - try: () => - socket.flow(config.flowId).graphRag( - query, - { - ...(entity_limit !== undefined ? {entityLimit: entity_limit} : {}), - ...(triple_limit !== undefined ? {tripleLimit: triple_limit} : {}), - }, - collection, - ), - catch: (cause) => GraphRagError.make({message: toErrorMessage(cause)}), - }).pipe( - Effect.map((text) => GraphRagSuccess.make({text})), + dispatchFlow( + gateway, + config, + "graph-rag", + { + query, + user: config.user, + collection: collection ?? "default", + ...(entity_limit !== undefined ? {"entity-limit": entity_limit} : {}), + ...(triple_limit !== undefined ? {"triple-limit": triple_limit} : {}), + }, + (cause) => GraphRagError.make({message: toErrorMessage(cause)}), + {timeoutMs: 60_000}, + ).pipe( + Effect.map((response) => GraphRagSuccess.make({text: stringProperty(response, "response") ?? ""})), ), document_rag: ({query, doc_limit, collection}) => - Effect.tryPromise({ - try: () => socket.flow(config.flowId).documentRag(query, doc_limit, collection), - catch: (cause) => DocumentRagError.make({message: toErrorMessage(cause)}), - }).pipe( - Effect.map((text) => DocumentRagSuccess.make({text})), + dispatchFlow( + gateway, + config, + "document-rag", + { + query, + user: config.user, + collection: collection ?? "default", + ...(doc_limit !== undefined ? {"doc-limit": doc_limit} : {}), + }, + (cause) => DocumentRagError.make({message: toErrorMessage(cause)}), + {timeoutMs: 60_000}, + ).pipe( + Effect.map((response) => DocumentRagSuccess.make({text: stringProperty(response, "response") ?? ""})), ), agent: ({question}) => runAgentTool(gateway, config, question), embeddings: ({text}) => - Effect.tryPromise({ - try: () => socket.flow(config.flowId).embeddings([...text]), - catch: (cause) => EmbeddingsError.make({message: toErrorMessage(cause)}), - }).pipe( - Effect.map((vectors) => EmbeddingsSuccess.make({vectors})), + dispatchFlow( + gateway, + config, + "embeddings", + {texts: [...text]}, + (cause) => EmbeddingsError.make({message: toErrorMessage(cause)}), + {timeoutMs: 30_000}, + ).pipe( + Effect.map((response) => EmbeddingsSuccess.make({ + vectors: Array.isArray(response.vectors) ? response.vectors as number[][] : [], + })), ), triples_query: ({s, p, o, limit, collection}) => - Effect.tryPromise({ - try: () => - socket.flow(config.flowId).triplesQuery( - asIriTerm(s), - asIriTerm(p), - asIriTerm(o), - limit, - collection, - ), - catch: (cause) => TriplesQueryError.make({message: toErrorMessage(cause)}), - }).pipe( - Effect.map((triples) => TriplesQuerySuccess.make({triples})), + dispatchFlow( + gateway, + config, + "triples", + { + limit: limit ?? 20, + user: config.user, + collection: collection ?? "default", + ...(asIriTerm(s) !== undefined ? {s: asIriTerm(s)} : {}), + ...(asIriTerm(p) !== undefined ? {p: asIriTerm(p)} : {}), + ...(asIriTerm(o) !== undefined ? {o: asIriTerm(o)} : {}), + }, + (cause) => TriplesQueryError.make({message: toErrorMessage(cause)}), + {timeoutMs: 30_000}, + ).pipe( + Effect.map((response) => TriplesQuerySuccess.make({ + triples: Array.isArray(response.triples ?? response.response) + ? (response.triples ?? response.response) as ClientTriple[] + : [], + })), ), graph_embeddings_query: ({query, limit, collection}) => - Effect.tryPromise({ - try: () => socket.flow(config.flowId).embeddings([query]), - catch: (cause) => GraphEmbeddingsQueryError.make({message: toErrorMessage(cause)}), - }).pipe( - Effect.flatMap((vectors) => - Effect.tryPromise({ - try: () => socket.flow(config.flowId).graphEmbeddingsQuery( - vectors[0] ?? [], - limit ?? 10, - collection, - ), - catch: (cause) => GraphEmbeddingsQueryError.make({message: toErrorMessage(cause)}), - }) - ), - Effect.map((entities) => GraphEmbeddingsQuerySuccess.make({entities})), + dispatchFlow( + gateway, + config, + "embeddings", + {texts: [query]}, + (cause) => GraphEmbeddingsQueryError.make({message: toErrorMessage(cause)}), + {timeoutMs: 30_000}, + ).pipe( + Effect.flatMap((embeddingResponse) => { + const vectors = Array.isArray(embeddingResponse.vectors) ? embeddingResponse.vectors : [] + const firstVector = Array.isArray(vectors[0]) ? vectors[0] as number[] : [] + return dispatchFlow( + gateway, + config, + "graph-embeddings", + { + vector: firstVector, + limit: limit ?? 10, + user: config.user, + collection: collection ?? "default", + }, + (cause) => GraphEmbeddingsQueryError.make({message: toErrorMessage(cause)}), + {timeoutMs: 30_000}, + ) + }), + Effect.map((response) => GraphEmbeddingsQuerySuccess.make({ + entities: Array.isArray(response.entities) ? response.entities as ClientEntityMatch[] : [], + })), ), get_config_all: () => - Effect.tryPromise({ - try: () => socket.config().getConfigAll(), - catch: (cause) => GetConfigAllError.make({message: toErrorMessage(cause)}), - }).pipe( + dispatchGlobal( + gateway, + "config", + {operation: "config"}, + (cause) => GetConfigAllError.make({message: toErrorMessage(cause)}), + {timeoutMs: 60_000}, + ).pipe( Effect.flatMap((value) => decodeJsonOrFail( value, @@ -1506,10 +1564,13 @@ export const TrustGraphMcpToolkitLive = TrustGraphMcpToolkit.toLayer( ), get_config: ({keys}) => - Effect.tryPromise({ - try: () => socket.config().getConfig(keys.map(({type, key}) => ({type, key}))), - catch: (cause) => GetConfigError.make({message: toErrorMessage(cause)}), - }).pipe( + dispatchGlobal( + gateway, + "config", + {operation: "get", keys: keys.map(({type, key}) => ({type, key}))}, + (cause) => GetConfigError.make({message: toErrorMessage(cause)}), + {timeoutMs: 60_000}, + ).pipe( Effect.flatMap((value) => decodeJsonOrFail( value, @@ -1521,10 +1582,13 @@ export const TrustGraphMcpToolkitLive = TrustGraphMcpToolkit.toLayer( ), put_config: ({values}) => - Effect.tryPromise({ - try: () => socket.config().putConfig(values.map(({type, key, value}) => ({type, key, value}))), - catch: (cause) => PutConfigError.make({message: toErrorMessage(cause)}), - }).pipe( + dispatchGlobal( + gateway, + "config", + {operation: "put", values: values.map(({type, key, value}) => ({type, key, value}))}, + (cause) => PutConfigError.make({message: toErrorMessage(cause)}), + {timeoutMs: 60_000}, + ).pipe( Effect.flatMap((value) => decodeJsonOrFail( value, @@ -1536,10 +1600,13 @@ export const TrustGraphMcpToolkitLive = TrustGraphMcpToolkit.toLayer( ), delete_config: ({type, key}) => - Effect.tryPromise({ - try: () => socket.config().deleteConfig({type, key}), - catch: (cause) => DeleteConfigError.make({message: toErrorMessage(cause)}), - }).pipe( + dispatchGlobal( + gateway, + "config", + {operation: "delete", keys: [{type, key}]}, + (cause) => DeleteConfigError.make({message: toErrorMessage(cause)}), + {timeoutMs: 30_000}, + ).pipe( Effect.flatMap((value) => decodeJsonOrFail( value, @@ -1551,21 +1618,29 @@ export const TrustGraphMcpToolkitLive = TrustGraphMcpToolkit.toLayer( ), get_flows: () => - Effect.tryPromise({ - try: () => socket.flows().getFlows(), - catch: (cause) => GetFlowsError.make({message: toErrorMessage(cause)}), - }).pipe( - Effect.map((flow_ids) => GetFlowsSuccess.make({flow_ids})), + dispatchGlobal( + gateway, + "flow", + {operation: "list-flows"}, + (cause) => GetFlowsError.make({message: toErrorMessage(cause)}), + {timeoutMs: 60_000}, + ).pipe( + Effect.map((response) => GetFlowsSuccess.make({ + flow_ids: Array.isArray(response["flow-ids"]) ? response["flow-ids"] as string[] : [], + })), ), get_flow: ({flow_id}) => - Effect.tryPromise({ - try: () => socket.flows().getFlow(flow_id), - catch: (cause) => GetFlowError.make({message: toErrorMessage(cause)}), - }).pipe( - Effect.flatMap((value) => + dispatchGlobal( + gateway, + "flow", + {operation: "get-flow", "flow-id": flow_id}, + (cause) => GetFlowError.make({message: toErrorMessage(cause)}), + {timeoutMs: 60_000}, + ).pipe( + Effect.flatMap((response) => decodeJsonOrFail( - value, + response.flow, (cause) => GetFlowError.make({message: toErrorMessage(cause)}), ).pipe( Effect.map((flow) => GetFlowSuccess.make({flow})), @@ -1574,16 +1649,19 @@ export const TrustGraphMcpToolkitLive = TrustGraphMcpToolkit.toLayer( ), start_flow: ({flow_id, blueprint_name, description, parameters}) => - Effect.tryPromise({ - try: () => - socket.flows().startFlow( - flow_id, - blueprint_name, - description, - parameters === undefined ? undefined : {...parameters}, - ), - catch: (cause) => StartFlowError.make({message: toErrorMessage(cause)}), - }).pipe( + dispatchGlobal( + gateway, + "flow", + { + operation: "start-flow", + "flow-id": flow_id, + "blueprint-name": blueprint_name, + description, + ...(parameters === undefined ? {} : {parameters: {...parameters}}), + }, + (cause) => StartFlowError.make({message: toErrorMessage(cause)}), + {timeoutMs: 30_000}, + ).pipe( Effect.flatMap((value) => decodeJsonOrFail( value, @@ -1595,10 +1673,13 @@ export const TrustGraphMcpToolkitLive = TrustGraphMcpToolkit.toLayer( ), stop_flow: ({flow_id}) => - Effect.tryPromise({ - try: () => socket.flows().stopFlow(flow_id), - catch: (cause) => StopFlowError.make({message: toErrorMessage(cause)}), - }).pipe( + dispatchGlobal( + gateway, + "flow", + {operation: "stop-flow", "flow-id": flow_id}, + (cause) => StopFlowError.make({message: toErrorMessage(cause)}), + {timeoutMs: 30_000}, + ).pipe( Effect.flatMap((value) => decodeJsonOrFail( value, @@ -1610,13 +1691,16 @@ export const TrustGraphMcpToolkitLive = TrustGraphMcpToolkit.toLayer( ), get_documents: () => - Effect.tryPromise({ - try: () => socket.librarian().getDocuments(), - catch: (cause) => GetDocumentsError.make({message: toErrorMessage(cause)}), - }).pipe( + dispatchGlobal( + gateway, + "librarian", + {operation: "list-documents", user: config.user}, + (cause) => GetDocumentsError.make({message: toErrorMessage(cause)}), + {timeoutMs: 60_000}, + ).pipe( Effect.flatMap((value) => decodeJsonArrayOrFail( - value, + value["document-metadatas"] ?? value.documents ?? [], (cause) => GetDocumentsError.make({message: toErrorMessage(cause)}), ).pipe( Effect.map((documents) => GetDocumentsSuccess.make({documents})), @@ -1624,34 +1708,53 @@ export const TrustGraphMcpToolkitLive = TrustGraphMcpToolkit.toLayer( ), ), - load_document: ({document, mime_type, title, comments, tags, id}) => - Effect.tryPromise({ - try: () => - socket.librarian().loadDocument( - document, - mime_type, - title, - comments ?? "", - tags === undefined ? [] : [...tags], - id, - ), - catch: (cause) => LoadDocumentError.make({message: toErrorMessage(cause)}), - }).pipe( - Effect.flatMap((value) => - decodeJsonOrFail( - value, - (cause) => LoadDocumentError.make({message: toErrorMessage(cause)}), - ).pipe( - Effect.map((response) => LoadDocumentSuccess.make({response})), - ) - ), - ), + load_document: Effect.fn("TrustGraphMcpToolkit.load_document")(function*({document, mime_type, title, comments, tags, id}) { + const timestamp = yield* Clock.currentTimeMillis + const metadata = { + time: Math.floor(timestamp / 1000), + kind: mime_type, + title, + comments: comments ?? "", + user: config.user, + tags: tags === undefined ? [] : [...tags], + "document-type": "source", + documentType: "source", + ...(id === undefined ? {} : {id}), + } + const value = yield* dispatchGlobal( + gateway, + "librarian", + { + operation: "add-document", + "document-metadata": metadata, + documentMetadata: metadata, + content: document, + }, + (cause) => LoadDocumentError.make({message: toErrorMessage(cause)}), + {timeoutMs: 30_000}, + ) + return yield* decodeJsonOrFail( + value, + (cause) => LoadDocumentError.make({message: toErrorMessage(cause)}), + ).pipe( + Effect.map((response) => LoadDocumentSuccess.make({response})), + ) + }), remove_document: ({id, collection}) => - Effect.tryPromise({ - try: () => socket.librarian().removeDocument(id, collection), - catch: (cause) => RemoveDocumentError.make({message: toErrorMessage(cause)}), - }).pipe( + dispatchGlobal( + gateway, + "librarian", + { + operation: "remove-document", + "document-id": id, + documentId: id, + user: config.user, + collection: collection ?? "default", + }, + (cause) => RemoveDocumentError.make({message: toErrorMessage(cause)}), + {timeoutMs: 30_000}, + ).pipe( Effect.flatMap((value) => decodeJsonOrFail( value, @@ -1663,21 +1766,34 @@ export const TrustGraphMcpToolkitLive = TrustGraphMcpToolkit.toLayer( ), get_prompts: () => - Effect.tryPromise({ - try: () => socket.config().getPrompts(), - catch: (cause) => GetPromptsError.make({message: toErrorMessage(cause)}), - }).pipe( - Effect.map((prompts) => GetPromptsSuccess.make({prompts})), + dispatchGlobal( + gateway, + "config", + {operation: "config"}, + (cause) => GetPromptsError.make({message: toErrorMessage(cause)}), + {timeoutMs: 60_000}, + ).pipe( + Effect.map((response) => { + const promptNs = asRecord(asRecord(response.config).prompt) + const prompts = Object.keys(promptNs) + .filter((key) => key !== "system") + .sort() + .map((id) => ({id, name: id})) + return GetPromptsSuccess.make({prompts}) + }), ), get_prompt: ({id}) => - Effect.tryPromise({ - try: () => socket.config().getPrompt(id), - catch: (cause) => GetPromptError.make({message: toErrorMessage(cause)}), - }).pipe( - Effect.flatMap((value) => + dispatchGlobal( + gateway, + "config", + {operation: "config"}, + (cause) => GetPromptError.make({message: toErrorMessage(cause)}), + {timeoutMs: 60_000}, + ).pipe( + Effect.flatMap((response) => decodeJsonOrFail( - value, + asRecord(asRecord(response.config).prompt)[id] ?? null, (cause) => GetPromptError.make({message: toErrorMessage(cause)}), ).pipe( Effect.map((prompt) => GetPromptSuccess.make({prompt})), @@ -1686,18 +1802,31 @@ export const TrustGraphMcpToolkitLive = TrustGraphMcpToolkit.toLayer( ), get_knowledge_cores: () => - Effect.tryPromise({ - try: () => socket.knowledge().getKnowledgeCores(), - catch: (cause) => GetKnowledgeCoresError.make({message: toErrorMessage(cause)}), - }).pipe( - Effect.map((ids) => GetKnowledgeCoresSuccess.make({ids})), + dispatchGlobal( + gateway, + "knowledge", + {operation: "list-kg-cores", user: config.user}, + (cause) => GetKnowledgeCoresError.make({message: toErrorMessage(cause)}), + {timeoutMs: 60_000}, + ).pipe( + Effect.map((response) => GetKnowledgeCoresSuccess.make({ + ids: Array.isArray(response.ids) ? response.ids as string[] : [], + })), ), delete_kg_core: ({id, collection}) => - Effect.tryPromise({ - try: () => socket.knowledge().deleteKgCore(id, collection), - catch: (cause) => DeleteKgCoreError.make({message: toErrorMessage(cause)}), - }).pipe( + dispatchGlobal( + gateway, + "knowledge", + { + operation: "delete-kg-core", + id, + user: config.user, + collection: collection ?? "default", + }, + (cause) => DeleteKgCoreError.make({message: toErrorMessage(cause)}), + {timeoutMs: 30_000}, + ).pipe( Effect.flatMap((value) => decodeJsonOrFail( value, @@ -1709,10 +1838,19 @@ export const TrustGraphMcpToolkitLive = TrustGraphMcpToolkit.toLayer( ), load_kg_core: ({id, flow, collection}) => - Effect.tryPromise({ - try: () => socket.knowledge().loadKgCore(id, flow, collection), - catch: (cause) => LoadKgCoreError.make({message: toErrorMessage(cause)}), - }).pipe( + dispatchGlobal( + gateway, + "knowledge", + { + operation: "load-kg-core", + id, + flow, + user: config.user, + collection: collection ?? "default", + }, + (cause) => LoadKgCoreError.make({message: toErrorMessage(cause)}), + {timeoutMs: 30_000}, + ).pipe( Effect.flatMap((value) => decodeJsonOrFail( value, @@ -1773,7 +1911,6 @@ const makeTrustGraphMcpHttpLayerFromConfig = ( path: config.mcpPath, })), Layer.provide(TrustGraphGateway.layer), - Layer.provide(TrustGraphSocket.layer), Layer.provide(Layer.succeed(TrustGraphMcpConfig, TrustGraphMcpConfig.of(config))), ) } @@ -1793,7 +1930,6 @@ const makeTrustGraphMcpStdioLayerFromConfig = ( })), Layer.provide(NodeStdio.layer), Layer.provide(TrustGraphGateway.layer), - Layer.provide(TrustGraphSocket.layer), Layer.provide(Layer.succeed(TrustGraphMcpConfig, TrustGraphMcpConfig.of(config))), ) diff --git a/ts/packages/workbench/src/atoms/workbench.ts b/ts/packages/workbench/src/atoms/workbench.ts index ae79af9f..a5b93f35 100644 --- a/ts/packages/workbench/src/atoms/workbench.ts +++ b/ts/packages/workbench/src/atoms/workbench.ts @@ -3,7 +3,6 @@ import * as BrowserHttpClient from "@effect/platform-browser/BrowserHttpClient"; import * as BrowserKeyValueStore from "@effect/platform-browser/BrowserKeyValueStore"; import type { GraphRagOptions, - BaseApi, BeginUploadResponse, ChunkedUploadDocumentMetadata, CompleteUploadResponse, @@ -18,7 +17,6 @@ import type { import { DispatchPayload, GatewayWorkbenchHttpApi, - makeBaseApi, TrustGraphRpcs, } from "@trustgraph/client"; import type { Scope, } from "effect"; @@ -204,10 +202,6 @@ export class Settings extends S.Class("Settings")({ featureSwitches: FeatureSwitches, }, { description: "Persisted workbench connection and display settings." }) {} -export interface WorkbenchApiFactory { - readonly create: (settings: Settings) => BaseApi; -} - export type Theme = "dark" | "light"; export type ChatMode = "graph-rag" | "document-rag" | "agent"; @@ -764,7 +758,11 @@ function explainTriplesFrom(source: unknown): Triple[] | undefined { } function streamingMetadataFrom(source: unknown): StreamingMetadata | undefined { - const metadata: StreamingMetadata = {}; + const metadata: { + in_token?: number; + out_token?: number; + model?: string; + } = {}; let hasMetadata = false; const inToken = numberProperty(source, "in_token"); @@ -804,9 +802,9 @@ function ensureNoGatewayResponseError(operation: string, value: A): Effect.Ef : Effect.fail(WorkbenchPromiseError.make({ cause: value, message: `${operation}: ${message}` })); } -function qaBaseApi(): BaseApi | undefined { +function qaBaseApi(): import("@trustgraph/client").BaseApi | undefined { if (typeof window === "undefined") return undefined; - return (window as Window & { __TRUSTGRAPH_WORKBENCH_QA_API__?: BaseApi }).__TRUSTGRAPH_WORKBENCH_QA_API__; + return (window as Window & { __TRUSTGRAPH_WORKBENCH_QA_API__?: import("@trustgraph/client").BaseApi }).__TRUSTGRAPH_WORKBENCH_QA_API__; } function makeWorkbenchGatewayApi(settings: Settings) { @@ -1016,9 +1014,9 @@ function makeWorkbenchGatewayApi(settings: Settings) { tags, "document-type": "source", documentType: "source", + ...(id !== undefined ? { id } : {}), + ...(metadata !== undefined ? { metadata } : {}), }; - if (id !== undefined) documentMetadata.id = id; - if (metadata !== undefined) documentMetadata.metadata = metadata; return yield* dispatch("librarian", { operation: "add-document", "document-metadata": documentMetadata, @@ -1184,10 +1182,8 @@ function makeWorkbenchGatewayApi(settings: Settings) { const event: ExplainEvent = { explainId: explainId ?? "", explainGraph: stringProperty(resp, "explain_graph") ?? "", + ...(explainTriples !== undefined ? { explainTriples } : {}), }; - if (explainTriples !== undefined) { - event.explainTriples = explainTriples; - } onExplain?.(event); if ( stringProperty(resp, "response") === undefined && @@ -1305,10 +1301,8 @@ function makeWorkbenchGatewayApi(settings: Settings) { const event: ExplainEvent = { explainId: explainId ?? "", explainGraph: stringProperty(resp, "explain_graph") ?? "", + ...(explainTriples !== undefined ? { explainTriples } : {}), }; - if (explainTriples !== undefined) { - event.explainTriples = explainTriples; - } onExplain?.(event); return false; } @@ -1621,34 +1615,14 @@ export const toggleThemeAtom = Atom.writable( // Socket lifecycle // --------------------------------------------------------------------------- -const liveApiFactory: WorkbenchApiFactory = { - create: (settings) => - makeBaseApi( - settings.user, - settings.apiKey.length > 0 ? settings.apiKey : undefined, - settings.gatewayUrl.length > 0 ? settings.gatewayUrl : undefined, - ), -}; - -export const apiFactoryAtom = Atom.make(liveApiFactory).pipe(Atom.keepAlive); - -export const apiAtom = Atom.make((get) => { - const settings = get(settingsAtom); - const api = get(apiFactoryAtom).create(settings); - get.addFinalizer(() => api.close()); - return api; -}).pipe(Atom.keepAlive); - export const connectionStateAtom = Atom.make((get) => { - const api = get(apiAtom); - const fallback: ConnectionState = { - status: "connecting", - hasApiKey: get(settingsAtom).apiKey.length > 0, + const settings = get(settingsAtom); + const hasApiKey = settings.apiKey.length > 0; + const state: ConnectionState = { + status: hasApiKey ? "authenticated" : "unauthenticated", + hasApiKey, }; - const previous = Option.getOrElse(get.self(), () => fallback); - const unsubscribe = api.onConnectionStateChange((state) => get.setSelf(state)); - get.addFinalizer(unsubscribe); - return previous; + return state; }).pipe(Atom.keepAlive); // --------------------------------------------------------------------------- diff --git a/ts/packages/workbench/src/qa/initial-values.ts b/ts/packages/workbench/src/qa/initial-values.ts index 8d0aabe6..f67df778 100644 --- a/ts/packages/workbench/src/qa/initial-values.ts +++ b/ts/packages/workbench/src/qa/initial-values.ts @@ -2,10 +2,8 @@ import type * as Atom from "effect/unstable/reactivity/Atom"; import type { FeatureSwitches, Settings, - WorkbenchApiFactory, } from "@/atoms/workbench"; import { - apiFactoryAtom, DEFAULT_SETTINGS, flowIdAtom, settingsAtom, @@ -45,12 +43,8 @@ export function getWorkbenchQaInitialValues(): Iterable api, - }; window.__TRUSTGRAPH_WORKBENCH_QA_API__ = api; return [ - [apiFactoryAtom as Atom.Atom, apiFactory], [settingsAtom as Atom.Atom, qaSettings(fixture)], [flowIdAtom as Atom.Atom, config.flowId ?? "default"], ]; diff --git a/ts/scripts/effect-laws.allowlist.json b/ts/scripts/effect-laws.allowlist.json index 522eda3a..fa293e2a 100644 --- a/ts/scripts/effect-laws.allowlist.json +++ b/ts/scripts/effect-laws.allowlist.json @@ -1 +1 @@ -{"exemptions":[],"baseline":[{"rule":"no-effect-run","path":"packages/client/src/socket/effect-rpc-client.ts","count":10},{"rule":"no-effect-run","path":"packages/client/src/socket/trustgraph-socket.ts","count":9},{"rule":"no-error-throw","path":"packages/workbench/src/main.tsx","count":1},{"rule":"no-error-throw","path":"scripts/seed-config.ts","count":1},{"rule":"no-error-throw","path":"scripts/seed-demo.ts","count":4},{"rule":"no-error-throw","path":"scripts/seed-flows.ts","count":2},{"rule":"no-error-throw","path":"scripts/test-pipeline.ts","count":2},{"rule":"no-native-fetch","path":"scripts/seed-config.ts","count":1},{"rule":"no-native-fetch","path":"scripts/seed-demo.ts","count":11},{"rule":"no-native-fetch","path":"scripts/seed-flows.ts","count":1},{"rule":"no-native-fetch","path":"scripts/test-pipeline.ts","count":5},{"rule":"no-native-json","path":"scripts/seed-config.ts","count":6},{"rule":"no-native-json","path":"scripts/seed-demo.ts","count":6},{"rule":"no-native-json","path":"scripts/seed-flows.ts","count":3},{"rule":"no-native-json","path":"scripts/test-pipeline.ts","count":6},{"rule":"no-native-sort","path":"packages/client/src/socket/trustgraph-socket.ts","count":2},{"rule":"no-native-sort","path":"packages/flow/src/config/service.ts","count":3},{"rule":"no-native-sort","path":"packages/flow/src/cores/service.ts","count":1},{"rule":"no-native-sort","path":"packages/flow/src/flow-manager/service.ts","count":1},{"rule":"no-native-sort","path":"packages/flow/src/librarian/service.ts","count":1},{"rule":"no-native-sort","path":"packages/flow/src/retrieval/graph-rag.ts","count":1},{"rule":"no-native-sort","path":"packages/workbench/src/atoms/workbench.ts","count":2},{"rule":"no-native-sort","path":"packages/workbench/src/components/chat/explain-graph.tsx","count":1},{"rule":"no-native-sort","path":"packages/workbench/src/pages/graph.tsx","count":1},{"rule":"no-native-sort","path":"packages/workbench/src/qa/mock-api.ts","count":1},{"rule":"no-native-sort","path":"scripts/inventory-native-classes.ts","count":1},{"rule":"no-native-sort","path":"scripts/seed-demo.ts","count":1},{"rule":"no-native-sort","path":"scripts/seed-flows.ts","count":1},{"rule":"no-native-timers","path":"scripts/test-pipeline.ts","count":2},{"rule":"no-node-fs-path","path":"scripts/create-test-pdf.ts","count":1},{"rule":"no-node-fs-path","path":"scripts/inventory-native-classes.ts","count":2},{"rule":"no-process-env","path":"scripts/seed-config.ts","count":2},{"rule":"no-process-env","path":"scripts/seed-demo.ts","count":5},{"rule":"no-process-env","path":"scripts/seed-flows.ts","count":1},{"rule":"no-process-env","path":"scripts/test-pipeline.ts","count":11},{"rule":"no-schema-suffix","path":"packages/base/src/schema/primitives.ts","count":1},{"rule":"schema-first-data","path":"packages/client/src/models/Triple.ts","count":6},{"rule":"schema-first-data","path":"packages/client/src/models/messages.ts","count":58},{"rule":"schema-first-data","path":"packages/client/src/socket/effect-rpc-client.ts","count":2},{"rule":"schema-first-data","path":"packages/client/src/socket/trustgraph-socket.ts","count":4}]} +{"exemptions":[],"baseline":[{"rule":"no-error-throw","path":"packages/workbench/src/main.tsx","count":1},{"rule":"no-error-throw","path":"scripts/seed-config.ts","count":1},{"rule":"no-error-throw","path":"scripts/seed-demo.ts","count":4},{"rule":"no-error-throw","path":"scripts/seed-flows.ts","count":2},{"rule":"no-error-throw","path":"scripts/test-pipeline.ts","count":2},{"rule":"no-native-fetch","path":"scripts/seed-config.ts","count":1},{"rule":"no-native-fetch","path":"scripts/seed-demo.ts","count":11},{"rule":"no-native-fetch","path":"scripts/seed-flows.ts","count":1},{"rule":"no-native-fetch","path":"scripts/test-pipeline.ts","count":5},{"rule":"no-native-json","path":"scripts/seed-config.ts","count":6},{"rule":"no-native-json","path":"scripts/seed-demo.ts","count":6},{"rule":"no-native-json","path":"scripts/seed-flows.ts","count":3},{"rule":"no-native-json","path":"scripts/test-pipeline.ts","count":6},{"rule":"no-native-sort","path":"packages/client/src/socket/trustgraph-socket.ts","count":2},{"rule":"no-native-sort","path":"packages/flow/src/config/service.ts","count":3},{"rule":"no-native-sort","path":"packages/flow/src/cores/service.ts","count":1},{"rule":"no-native-sort","path":"packages/flow/src/flow-manager/service.ts","count":1},{"rule":"no-native-sort","path":"packages/flow/src/librarian/service.ts","count":1},{"rule":"no-native-sort","path":"packages/flow/src/retrieval/graph-rag.ts","count":1},{"rule":"no-native-sort","path":"packages/mcp/src/server-effect.ts","count":1},{"rule":"no-native-sort","path":"packages/workbench/src/atoms/workbench.ts","count":2},{"rule":"no-native-sort","path":"packages/workbench/src/components/chat/explain-graph.tsx","count":1},{"rule":"no-native-sort","path":"packages/workbench/src/pages/graph.tsx","count":1},{"rule":"no-native-sort","path":"packages/workbench/src/qa/mock-api.ts","count":1},{"rule":"no-native-sort","path":"scripts/inventory-native-classes.ts","count":1},{"rule":"no-native-sort","path":"scripts/seed-demo.ts","count":1},{"rule":"no-native-sort","path":"scripts/seed-flows.ts","count":1},{"rule":"no-native-timers","path":"scripts/test-pipeline.ts","count":2},{"rule":"no-node-fs-path","path":"scripts/create-test-pdf.ts","count":1},{"rule":"no-node-fs-path","path":"scripts/inventory-native-classes.ts","count":2},{"rule":"no-process-env","path":"scripts/seed-config.ts","count":2},{"rule":"no-process-env","path":"scripts/seed-demo.ts","count":5},{"rule":"no-process-env","path":"scripts/seed-flows.ts","count":1},{"rule":"no-process-env","path":"scripts/test-pipeline.ts","count":11},{"rule":"no-schema-suffix","path":"packages/base/src/schema/primitives.ts","count":1}]}