From cf12defcd80b7962d4ec29382e10eb31c02854b9 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Thu, 11 Jun 2026 06:29:29 -0500 Subject: [PATCH] =?UTF-8?q?refactor(ts):=20complete=20legacy=20host=20remo?= =?UTF-8?q?val=20=E2=80=94=20drop=20fastify/commander/zod,=20delete=20MCP?= =?UTF-8?q?=20SDK=20server,=20remove=20ManagedRuntime=20facades?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Finishes the remaining EFFECT_NATIVE_REWRITE_PLAN stages in one verified slice: - fastify, @fastify/websocket, commander, zod removed from all package manifests - legacy @modelcontextprotocol/sdk stdio server deleted; effect/unstable/ai McpServer is canonical - no ManagedRuntime or Effect.runPromise program facades remain in production source - gateway server/rpc-contract and client rpc/socket moved onto Effect v4 native http/rpc/socket layers Gates (force-run, no cache): check:tsgo, build, test (96 tests / 11 tasks) all green. Native-class inventory: zero blocking production classes. Co-Authored-By: Claude Fable 5 --- ts/bun.lock | 66 +- ts/packages/base/package.json | 8 - .../base/src/processor/async-processor.ts | 23 +- ts/packages/base/src/processor/index.ts | 1 + ts/packages/base/src/processor/program.ts | 28 +- ts/packages/cli/package.json | 8 +- ts/packages/cli/src/commands/agent.ts | 88 +- ts/packages/cli/src/commands/util.ts | 24 +- ts/packages/client/package.json | 10 + .../client/src/__tests__/flows-api.test.ts | 2 +- .../src/__tests__/workbench-contracts.test.ts | 16 +- ts/packages/client/src/index.ts | 1 + ts/packages/client/src/rpc/contract.ts | 12 + .../client/src/socket/effect-rpc-client.ts | 332 +++--- .../client/src/socket/trustgraph-socket.ts | 45 +- ts/packages/flow/package.json | 10 +- .../flow/src/gateway/dispatch/manager.ts | 15 + ts/packages/flow/src/gateway/rpc-contract.ts | 35 +- ts/packages/flow/src/gateway/server.ts | 24 +- ts/packages/flow/tsconfig.json | 3 +- ts/packages/mcp/package.json | 13 +- .../mcp/src/__tests__/server-effect.test.ts | 30 +- ts/packages/mcp/src/server-effect.ts | 114 +- ts/packages/workbench/package.json | 15 +- ts/packages/workbench/src/atoms/workbench.ts | 994 ++++++++++++++++-- .../src/components/notification-toasts.tsx | 6 +- ts/packages/workbench/src/pages/settings.tsx | 4 +- .../workbench/src/qa/initial-values.ts | 3 + ts/packages/workbench/vite.config.ts | 30 +- ts/scripts/inventory-native-classes.ts | 2 + 30 files changed, 1506 insertions(+), 456 deletions(-) diff --git a/ts/bun.lock b/ts/bun.lock index 69f27400..a4b7b0df 100644 --- a/ts/bun.lock +++ b/ts/bun.lock @@ -27,14 +27,6 @@ "name": "@trustgraph/base", "version": "0.1.0", "dependencies": { - "@effect/ai-anthropic": "4.0.0-beta.78", - "@effect/ai-openai": "4.0.0-beta.78", - "@effect/ai-openrouter": "4.0.0-beta.78", - "@effect/atom-react": "4.0.0-beta.78", - "@effect/openapi-generator": "4.0.0-beta.78", - "@effect/opentelemetry": "4.0.0-beta.78", - "@effect/platform-browser": "4.0.0-beta.78", - "@effect/platform-bun": "4.0.0-beta.78", "effect": "4.0.0-beta.78", "nats": "^2.29.0", }, @@ -52,16 +44,10 @@ "tg": "dist/index.js", }, "dependencies": { - "@effect/ai-anthropic": "4.0.0-beta.78", - "@effect/ai-openai": "4.0.0-beta.78", - "@effect/ai-openrouter": "4.0.0-beta.78", - "@effect/atom-react": "4.0.0-beta.78", - "@effect/openapi-generator": "4.0.0-beta.78", - "@effect/opentelemetry": "4.0.0-beta.78", - "@effect/platform-browser": "4.0.0-beta.78", "@effect/platform-bun": "4.0.0-beta.78", "@trustgraph/base": "workspace:*", "@trustgraph/client": "workspace:*", + "effect": "4.0.0-beta.78", "ws": "^8.18.0", }, "devDependencies": { @@ -97,21 +83,13 @@ "version": "0.1.0", "dependencies": { "@effect/ai-anthropic": "4.0.0-beta.78", - "@effect/ai-openai": "4.0.0-beta.78", - "@effect/ai-openrouter": "4.0.0-beta.78", - "@effect/atom-react": "4.0.0-beta.78", - "@effect/openapi-generator": "4.0.0-beta.78", - "@effect/opentelemetry": "4.0.0-beta.78", - "@effect/platform-browser": "4.0.0-beta.78", "@effect/platform-bun": "4.0.0-beta.78", "@effect/platform-node": "4.0.0-beta.78", - "@effect/platform-node-shared": "4.0.0-beta.78", - "@effect/tsgo": "0.14.0", - "@effect/vitest": "4.0.0-beta.78", "@mistralai/mistralai": "^1.0.0", "@modelcontextprotocol/sdk": "^1.12.0", "@qdrant/js-client-rest": "^1.13.0", "@trustgraph/base": "workspace:*", + "@trustgraph/client": "workspace:*", "effect": "4.0.0-beta.78", "falkordb": "^5.0.0", "ollama": "^0.6.3", @@ -129,20 +107,11 @@ "name": "@trustgraph/mcp", "version": "0.1.0", "dependencies": { - "@effect/ai-anthropic": "4.0.0-beta.78", - "@effect/ai-openai": "4.0.0-beta.78", - "@effect/ai-openrouter": "4.0.0-beta.78", - "@effect/atom-react": "4.0.0-beta.78", - "@effect/openapi-generator": "4.0.0-beta.78", - "@effect/opentelemetry": "4.0.0-beta.78", - "@effect/platform-browser": "4.0.0-beta.78", "@effect/platform-bun": "4.0.0-beta.78", "@effect/platform-node": "4.0.0-beta.78", - "@effect/platform-node-shared": "4.0.0-beta.78", - "@effect/tsgo": "0.14.0", - "@effect/vitest": "4.0.0-beta.78", "@trustgraph/base": "workspace:*", "@trustgraph/client": "workspace:*", + "effect": "4.0.0-beta.78", }, "devDependencies": { "@effect/vitest": "4.0.0-beta.78", @@ -155,21 +124,11 @@ "name": "@trustgraph/workbench", "version": "0.1.0", "dependencies": { - "@effect/ai-anthropic": "4.0.0-beta.78", - "@effect/ai-openai": "4.0.0-beta.78", - "@effect/ai-openrouter": "4.0.0-beta.78", "@effect/atom-react": "4.0.0-beta.78", - "@effect/openapi-generator": "4.0.0-beta.78", - "@effect/opentelemetry": "4.0.0-beta.78", "@effect/platform-browser": "4.0.0-beta.78", - "@effect/platform-bun": "4.0.0-beta.78", - "@effect/platform-node": "4.0.0-beta.78", - "@effect/platform-node-shared": "4.0.0-beta.78", - "@effect/tsgo": "0.14.0", - "@effect/vitest": "4.0.0-beta.78", - "@tanstack/react-query": "^5.75.0", "@trustgraph/client": "workspace:*", "clsx": "^2.1.0", + "effect": "4.0.0-beta.78", "lucide-react": "^0.513.0", "react": "^19.1.0", "react-dom": "^19.1.0", @@ -178,7 +137,6 @@ "react-markdown": "^10.1.0", "react-router": "^7.6.0", "tailwind-merge": "^3.3.0", - "zustand": "^5.0.0", }, "devDependencies": { "@effect/vitest": "4.0.0-beta.78", @@ -234,16 +192,8 @@ "@effect/ai-anthropic": ["@effect/ai-anthropic@4.0.0-beta.78", "", { "peerDependencies": { "effect": "^4.0.0-beta.78" } }, "sha512-FZzwvKx2k+UIDJyv+FWMtC2CKRxJMWTII+z8EmMcT6tw1uzkkn+ouIyJ32AU+lfveKScNV1SXL4ml3VdcYGy/g=="], - "@effect/ai-openai": ["@effect/ai-openai@4.0.0-beta.78", "", { "peerDependencies": { "effect": "^4.0.0-beta.78" } }, "sha512-tx953rRkLqW2BeEkWK12/nRBPO0b1eS6pI+2YyWI0nQvX2JTTijrGlBv/qDVa5kxDkLm63+tA04xnxgZMlA8NA=="], - - "@effect/ai-openrouter": ["@effect/ai-openrouter@4.0.0-beta.78", "", { "peerDependencies": { "effect": "^4.0.0-beta.78" } }, "sha512-9GIRU9stAnDU5EJ5ZghUWrQXaE+rECCWI/eKVfYeC7UqjZmmmJmTcEbid3tvz2NMsnvIn0ymeKsJAohWCys39w=="], - "@effect/atom-react": ["@effect/atom-react@4.0.0-beta.78", "", { "peerDependencies": { "effect": "^4.0.0-beta.78", "react": "^19.2.4", "scheduler": "*" } }, "sha512-cgxDXJaD0wlbQXbp6tiEmmY+yajwurB0ynkFG20RVucvH4LsQMB3ogiHe0mt42wGggfbVYMEDxgBpQdqDRY8yA=="], - "@effect/openapi-generator": ["@effect/openapi-generator@4.0.0-beta.78", "", { "peerDependencies": { "@effect/platform-node": "^4.0.0-beta.78", "effect": "^4.0.0-beta.78" }, "bin": { "openapigen": "dist/bin.js" } }, "sha512-qhKRcZCNQ5b0Klrct+AC/tPQgIDBxVsD0MkQLIzqvLU3qRHaNd5yHo7kxFf/DuhCyyL++xZfbHsPdq3VdLIByg=="], - - "@effect/opentelemetry": ["@effect/opentelemetry@4.0.0-beta.78", "", { "peerDependencies": { "@opentelemetry/api": "^1.9", "@opentelemetry/api-logs": ">=0.203.0 <0.300.0", "@opentelemetry/resources": "^2.0.0", "@opentelemetry/sdk-logs": ">=0.203.0 <0.300.0", "@opentelemetry/sdk-metrics": "^2.0.0", "@opentelemetry/sdk-trace-base": "^2.0.0", "@opentelemetry/sdk-trace-node": "^2.0.0", "@opentelemetry/sdk-trace-web": "^2.0.0", "@opentelemetry/semantic-conventions": "^1.33.0", "effect": "^4.0.0-beta.78" }, "optionalPeers": ["@opentelemetry/api", "@opentelemetry/api-logs", "@opentelemetry/resources", "@opentelemetry/sdk-logs", "@opentelemetry/sdk-metrics", "@opentelemetry/sdk-trace-base", "@opentelemetry/sdk-trace-node", "@opentelemetry/sdk-trace-web"] }, "sha512-OJGnlNkxfhUmZ/8aLIfQly8ic2tntcnwidAP0BdrTUKa1/sbZjq5xTrhVUjvmehFra2Thsef0k4UPTgsOrBG1A=="], - "@effect/platform-browser": ["@effect/platform-browser@4.0.0-beta.78", "", { "dependencies": { "multipasta": "^0.2.7" }, "peerDependencies": { "effect": "^4.0.0-beta.78" } }, "sha512-8r9MVuZ8xJRyVyi+C8SKSYLbMsHr7qOiUgLV6lKMECuAWyMhlbK/7Ka9SQGr0ZPqOe5ShLEvV7DevnGkG+owAQ=="], "@effect/platform-bun": ["@effect/platform-bun@4.0.0-beta.78", "", { "dependencies": { "@effect/platform-node-shared": "^4.0.0-beta.78" }, "peerDependencies": { "effect": "^4.0.0-beta.78" } }, "sha512-lmPCL1G7SlkCWCguX3rDPS7kKuvJ/AN4pjS7IXb/5SoauHPd67iUdc1ZbB7o6lwTChJaIfWNNPkUWygiaUeJiA=="], @@ -382,8 +332,6 @@ "@opentelemetry/api": ["@opentelemetry/api@1.9.1", "", {}, "sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q=="], - "@opentelemetry/semantic-conventions": ["@opentelemetry/semantic-conventions@1.41.1", "", {}, "sha512-/UhIkaZgPutTFmQ7RnIJGgDXZmtEJ7Dvi86xNTFWcnRxVRNk/aotsqDJYeEvDP+FSMB2SdW+pQzNMcWP0rwuNA=="], - "@pdf-lib/standard-fonts": ["@pdf-lib/standard-fonts@1.0.0", "", { "dependencies": { "pako": "^1.0.6" } }, "sha512-hU30BK9IUN/su0Mn9VdlVKsWBS6GyhVfqjwl1FjZN4TxP6cCw0jP2w7V3Hf5uX7M0AZJ16vey9yE0ny7Sa59ZA=="], "@pdf-lib/upng": ["@pdf-lib/upng@1.0.1", "", { "dependencies": { "pako": "^1.0.10" } }, "sha512-dQK2FUMQtowVP00mtIksrlZhdFXQZPC+taih1q4CvPZ5vqdxR/LKBaFg0oAfzd1GlHZXXSPdQfzQnt+ViGvEIQ=="], @@ -478,10 +426,6 @@ "@tailwindcss/vite": ["@tailwindcss/vite@4.2.2", "", { "dependencies": { "@tailwindcss/node": "4.2.2", "@tailwindcss/oxide": "4.2.2", "tailwindcss": "4.2.2" }, "peerDependencies": { "vite": "6.4.1" } }, "sha512-mEiF5HO1QqCLXoNEfXVA1Tzo+cYsrqV7w9Juj2wdUFyW07JRenqMG225MvPwr3ZD9N1bFQj46X7r33iHxLUW0w=="], - "@tanstack/query-core": ["@tanstack/query-core@5.96.2", "", {}, "sha512-hzI6cTVh4KNRk8UtoIBS7Lv9g6BnJPXvBKsvYH1aGWvv0347jT3BnSvztOE+kD76XGvZnRC/t6qdW1CaIfwCeA=="], - - "@tanstack/react-query": ["@tanstack/react-query@5.96.2", "", { "dependencies": { "@tanstack/query-core": "5.96.2" }, "peerDependencies": { "react": "19.2.4" } }, "sha512-sYyzzJT4G0g02azzJ8o55VFFV31XvFpdUpG+unxS0vSaYsJnSPKGoI6WdPwUucJL1wpgGfwfmntNX/Ub1uOViA=="], - "@trustgraph/base": ["@trustgraph/base@workspace:packages/base"], "@trustgraph/cli": ["@trustgraph/cli@workspace:packages/cli"], @@ -1242,8 +1186,6 @@ "zod-to-json-schema": ["zod-to-json-schema@3.25.2", "", { "peerDependencies": { "zod": "3.25.76" } }, "sha512-O/PgfnpT1xKSDeQYSCfRI5Gy3hPf91mKVDuYLUHZJMiDFptvP41MSnWofm8dnCm0256ZNfZIM7DSzuSMAFnjHA=="], - "zustand": ["zustand@5.0.12", "", { "optionalDependencies": { "@types/react": "19.2.14", "react": "19.2.4" } }, "sha512-i77ae3aZq4dhMlRhJVCYgMLKuSiZAaUPAct2AksxQ+gOtimhGMdXljRT21P5BNpeT4kXlLIckvkPM029OljD7g=="], - "zwitch": ["zwitch@2.0.4", "", {}, "sha512-bXE4cR/kVZhKZX/RjPEflHaKVhUVl85noU3v6b8apfQEc1x4A+zBxjZ4lN8LqGd6WZ3dl98pY4o717VFmoPp+A=="], "@qdrant/js-client-rest/undici": ["undici@6.24.1", "", {}, "sha512-sC+b0tB1whOCzbtlx20fx3WgCXwkW627p4EA9uM+/tNNPkSS+eSEld6pAs9nDv7WbY1UUljBMYPtu9BCOrCWKA=="], diff --git a/ts/packages/base/package.json b/ts/packages/base/package.json index f7d8b5fd..d87b4e79 100644 --- a/ts/packages/base/package.json +++ b/ts/packages/base/package.json @@ -20,14 +20,6 @@ "test": "bunx --bun vitest run" }, "dependencies": { - "@effect/ai-anthropic": "4.0.0-beta.78", - "@effect/ai-openai": "4.0.0-beta.78", - "@effect/ai-openrouter": "4.0.0-beta.78", - "@effect/atom-react": "4.0.0-beta.78", - "@effect/openapi-generator": "4.0.0-beta.78", - "@effect/opentelemetry": "4.0.0-beta.78", - "@effect/platform-browser": "4.0.0-beta.78", - "@effect/platform-bun": "4.0.0-beta.78", "effect": "4.0.0-beta.78", "nats": "^2.29.0" }, diff --git a/ts/packages/base/src/processor/async-processor.ts b/ts/packages/base/src/processor/async-processor.ts index ed725a1b..46fb6339 100644 --- a/ts/packages/base/src/processor/async-processor.ts +++ b/ts/packages/base/src/processor/async-processor.ts @@ -7,7 +7,7 @@ */ import type { PubSubBackend } from "../backend/types.js"; -import { makeNatsBackend } from "../backend/nats.js"; +import { makeNatsBackend, makeNatsBackendScoped } from "../backend/nats.js"; import { Cause, Config as EffectConfig, Context, Effect } from "effect"; import { processorLifecycleError, type ProcessorLifecycleError } from "../errors.js"; import { loadProcessorRuntimeConfig } from "../runtime/config.js"; @@ -198,6 +198,27 @@ export function makeAsyncProcessor< return processor; } +export const makeAsyncProcessorScoped = Effect.fn("makeAsyncProcessorScoped")(function* < + RunError = ProcessorLifecycleError, + RunRequirements = never, +>( + config: ProcessorConfig, + options: AsyncProcessorRuntimeOptions = {}, +) { + if (config.pubsub !== undefined) { + return makeAsyncProcessor(config, options); + } + + const pubsub = yield* makeNatsBackendScoped(config.pubsubUrl ?? "nats://localhost:4222"); + return makeAsyncProcessor( + { + ...config, + pubsub, + }, + options, + ); +}); + export type AsyncProcessor< RunError = ProcessorLifecycleError, RunRequirements = never, diff --git a/ts/packages/base/src/processor/index.ts b/ts/packages/base/src/processor/index.ts index ed56e73a..73b690ca 100644 --- a/ts/packages/base/src/processor/index.ts +++ b/ts/packages/base/src/processor/index.ts @@ -1,6 +1,7 @@ export { AsyncProcessor, makeAsyncProcessor, + makeAsyncProcessorScoped, type ConfigHandler, type EffectConfigHandler, type AsyncProcessorRuntime, diff --git a/ts/packages/base/src/processor/program.ts b/ts/packages/base/src/processor/program.ts index b8668ed7..411069af 100644 --- a/ts/packages/base/src/processor/program.ts +++ b/ts/packages/base/src/processor/program.ts @@ -11,7 +11,7 @@ import { type ProcessorLifecycleError, type PubSubError, } from "../errors.js"; -import { makeNatsBackend } from "../backend/nats.js"; +import { makeNatsBackendScoped } from "../backend/nats.js"; import { makePubSubService, PubSub } from "../backend/pubsub.js"; import { ConsumerFactory, @@ -119,18 +119,9 @@ export function makeProcessorProgram< manageProcessSignals: false, } as Config; - const pubsub = makePubSubService(makeNatsBackend(runtimeConfig.pubsubUrl ?? "nats://localhost:4222")); + const backend = yield* makeNatsBackendScoped(runtimeConfig.pubsubUrl ?? "nats://localhost:4222"); + const pubsub = makePubSubService(backend); const messagingConfig = yield* loadMessagingRuntimeConfig(); - yield* Effect.addFinalizer(() => - pubsub.close.pipe( - Effect.catch((error) => - Effect.logError("[PubSub] Failed to close processor backend", { - error: error.message, - operation: error.operation, - }), - ), - ), - ); const processorEffect = runProcessorScoped( runtimeConfig, options.make, @@ -202,18 +193,9 @@ export function makeFlowProcessorProgram< manageProcessSignals: false, } as Config; - const pubsub = makePubSubService(makeNatsBackend(runtimeConfig.pubsubUrl ?? "nats://localhost:4222")); + const backend = yield* makeNatsBackendScoped(runtimeConfig.pubsubUrl ?? "nats://localhost:4222"); + const pubsub = makePubSubService(backend); const messagingConfig = yield* loadMessagingRuntimeConfig(); - yield* Effect.addFinalizer(() => - pubsub.close.pipe( - Effect.catch((error) => - Effect.logError("[PubSub] Failed to close processor backend", { - error: error.message, - operation: error.operation, - }), - ), - ), - ); const configHandlers = options.configHandlers?.(runtimeConfig); const processorOptions = { diff --git a/ts/packages/cli/package.json b/ts/packages/cli/package.json index 20589468..d1a0266b 100644 --- a/ts/packages/cli/package.json +++ b/ts/packages/cli/package.json @@ -12,16 +12,10 @@ "test": "bunx --bun vitest run --passWithNoTests --exclude=dist/**" }, "dependencies": { - "@effect/ai-anthropic": "4.0.0-beta.78", - "@effect/ai-openai": "4.0.0-beta.78", - "@effect/ai-openrouter": "4.0.0-beta.78", - "@effect/atom-react": "4.0.0-beta.78", - "@effect/openapi-generator": "4.0.0-beta.78", - "@effect/opentelemetry": "4.0.0-beta.78", - "@effect/platform-browser": "4.0.0-beta.78", "@effect/platform-bun": "4.0.0-beta.78", "@trustgraph/base": "workspace:*", "@trustgraph/client": "workspace:*", + "effect": "4.0.0-beta.78", "ws": "^8.18.0" }, "devDependencies": { diff --git a/ts/packages/cli/src/commands/agent.ts b/ts/packages/cli/src/commands/agent.ts index 96acaf4e..7ef213c6 100644 --- a/ts/packages/cli/src/commands/agent.ts +++ b/ts/packages/cli/src/commands/agent.ts @@ -7,37 +7,77 @@ import { Effect } from "effect"; import * as Argument from "effect/unstable/cli/Argument"; import * as Command from "effect/unstable/cli/Command"; -import { cliCommandError, withSocket } from "./util.js"; +import { cliCommandError, withGatewayClient, type CliCommandError } from "./util.js"; + +function asRecord(value: unknown): Record { + return typeof value === "object" && value !== null && !Array.isArray(value) + ? value as Record + : {}; +} + +function stringProperty(source: unknown, key: string): string | undefined { + const value = asRecord(source)[key]; + return typeof value === "string" ? value : undefined; +} + +function booleanProperty(source: unknown, key: string): boolean | undefined { + const value = asRecord(source)[key]; + return typeof value === "boolean" ? value : undefined; +} + +function responseErrorMessage(source: unknown): string | undefined { + const error = asRecord(source).error; + if (typeof error === "string") return error; + return stringProperty(error, "message"); +} export const agentCommand = Command.make("agent", { question: Argument.string("question").pipe(Argument.withDescription("Question to ask")), }, ({ question }) => - withSocket((socket, opts) => + withGatewayClient((client, opts) => Effect.gen(function* () { - const flow = socket.flow(opts.flow); + let streamError: CliCommandError | undefined; - yield* Effect.callback>((resume) => { - flow.agent( + yield* client.runDispatchStream( + { + scope: "flow", + flow: opts.flow, + service: "agent", + request: { question, - (chunk) => { - // think — show thought process - if (chunk.length > 0) process.stderr.write(chunk); - }, - (chunk) => { - // observe — show observations - if (chunk.length > 0) process.stderr.write(chunk); - }, - (chunk, complete) => { - // answer — print to stdout - if (chunk.length > 0) process.stdout.write(chunk); - if (complete) { - process.stdout.write("\n"); - resume(Effect.void); - } - }, - (err) => resume(Effect.fail(cliCommandError("agent", err))), - ); - }); + user: opts.user, + collection: "default", + streaming: true, + }, + }, + (chunk) => { + const resp = asRecord(chunk.response); + const chunkType = stringProperty(resp, "chunk_type"); + const error = chunkType === "error" ? responseErrorMessage(resp) ?? "Unknown agent error" : responseErrorMessage(resp); + if (error !== undefined) { + streamError = cliCommandError("agent", error); + return true; + } + + const content = stringProperty(resp, "content") ?? ""; + const messageComplete = booleanProperty(resp, "end_of_message") === true; + const dialogComplete = chunk.complete === true || booleanProperty(resp, "end_of_dialog") === true; + + if (chunkType === "thought" || chunkType === "observation") { + if (content.length > 0) process.stderr.write(content); + } else if (chunkType === "answer" || chunkType === "final-answer") { + if (content.length > 0) process.stdout.write(content); + if (messageComplete || dialogComplete) process.stdout.write("\n"); + } + + return dialogComplete; + }, + { timeoutMs: 120_000, retries: 2 }, + ); + + if (streamError !== undefined) { + return yield* streamError; + } }), ), ).pipe(Command.withDescription("Ask the TrustGraph agent a question")); diff --git a/ts/packages/cli/src/commands/util.ts b/ts/packages/cli/src/commands/util.ts index 97c1b921..a492555e 100644 --- a/ts/packages/cli/src/commands/util.ts +++ b/ts/packages/cli/src/commands/util.ts @@ -2,7 +2,12 @@ * Shared CLI utilities. */ -import { createTrustGraphSocket, type BaseApi } from "@trustgraph/client"; +import { + createTrustGraphSocket, + makeTrustGraphGatewayClientScoped, + type BaseApi, + type TrustGraphGatewayClient, +} from "@trustgraph/client"; import { Duration, Effect } from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; @@ -109,6 +114,23 @@ export function createSocketEffect(opts: CliOpts): Effect.Effect( + use: (client: TrustGraphGatewayClient, opts: CliOpts) => Effect.Effect, +) { + const opts = yield* getOpts; + return yield* Effect.scoped( + makeTrustGraphGatewayClientScoped({ url: gatewayUrlWithToken(opts) }).pipe( + Effect.flatMap((client) => use(client, opts)), + ), + ); +}); + export const withSocket = Effect.fn("withSocket")(function* ( use: (socket: BaseApi, opts: CliOpts) => Effect.Effect, ) { diff --git a/ts/packages/client/package.json b/ts/packages/client/package.json index 78631199..b42faa75 100644 --- a/ts/packages/client/package.json +++ b/ts/packages/client/package.json @@ -5,6 +5,16 @@ "type": "module", "main": "dist/index.js", "types": "dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.js" + }, + "./rpc/contract": { + "types": "./dist/rpc/contract.d.ts", + "import": "./dist/rpc/contract.js" + } + }, "scripts": { "build": "bunx --bun tsc", "dev": "tsc --watch", diff --git a/ts/packages/client/src/__tests__/flows-api.test.ts b/ts/packages/client/src/__tests__/flows-api.test.ts index 5321255f..5ca9edc9 100644 --- a/ts/packages/client/src/__tests__/flows-api.test.ts +++ b/ts/packages/client/src/__tests__/flows-api.test.ts @@ -13,7 +13,7 @@ describe("FlowsApi", () => { makeRequest: vi.fn(), }; // eslint-disable-next-line @typescript-eslint/no-explicit-any - flowsApi = new FlowsApi(mockApi as any); + flowsApi = FlowsApi(mockApi as any); }); describe("startFlow", () => { diff --git a/ts/packages/client/src/__tests__/workbench-contracts.test.ts b/ts/packages/client/src/__tests__/workbench-contracts.test.ts index d6975003..abf0ada4 100644 --- a/ts/packages/client/src/__tests__/workbench-contracts.test.ts +++ b/ts/packages/client/src/__tests__/workbench-contracts.test.ts @@ -23,7 +23,7 @@ describe("workbench API contracts", () => { values: [{ type: "prompt", key: "welcome", value: "hello" }], }); - const result = await new ConfigApi(base).getValues("prompt"); + const result = await ConfigApi(base).getValues("prompt"); expect(makeRequest).toHaveBeenCalledWith( "config", @@ -45,7 +45,7 @@ describe("workbench API contracts", () => { ], }); - const result = await new ConfigApi(base).getTokenCosts(); + const result = await ConfigApi(base).getTokenCosts(); expect(result).toEqual([ { model: "gpt-test", input_price: 0.1, output_price: 0.2 }, @@ -55,7 +55,7 @@ describe("workbench API contracts", () => { it("writes and deletes config using Python-style key/value arrays", async () => { const { base, makeRequest } = makeApi(); makeRequest.mockResolvedValue({}); - const config = new ConfigApi(base); + const config = ConfigApi(base); await config.putConfig([{ type: "tool", key: "search", value: "{}" }]); await config.deleteConfig({ type: "tool", key: "search" }); @@ -86,7 +86,7 @@ describe("workbench API contracts", () => { const { base, makeRequest } = makeApi(); const document = { id: "doc-1", title: "Document" }; const processing = { id: "proc-1", "document-id": "doc-1" }; - const librarian = new LibrarianApi(base); + const librarian = LibrarianApi(base); makeRequest .mockResolvedValueOnce({ "document-metadatas": [document] }) @@ -101,7 +101,7 @@ describe("workbench API contracts", () => { const document = { id: "doc-1", title: "Document" }; makeRequest.mockResolvedValue({ "document-metadata": document }); - const result = await new LibrarianApi(base).getDocumentMetadata("doc-1"); + const result = await LibrarianApi(base).getDocumentMetadata("doc-1"); expect(makeRequest).toHaveBeenCalledWith( "librarian", @@ -120,7 +120,7 @@ describe("workbench API contracts", () => { const { base, makeRequest } = makeApi(); makeRequest.mockResolvedValue({}); - await new LibrarianApi(base).loadDocument( + await LibrarianApi(base).loadDocument( "SGVsbG8=", "text/plain", "Hello", @@ -145,7 +145,7 @@ describe("workbench API contracts", () => { describe("KnowledgeApi", () => { it("lists and loads document embedding cores", async () => { const { base, makeRequest } = makeApi(); - const knowledge = new KnowledgeApi(base); + const knowledge = KnowledgeApi(base); makeRequest .mockResolvedValueOnce({ ids: ["de-core"] }) @@ -178,7 +178,7 @@ describe("workbench API contracts", () => { const { base, makeRequest } = makeApi(); makeRequest.mockResolvedValue({}); - await new KnowledgeApi(base).unloadKgCore("kg-core", "default"); + await KnowledgeApi(base).unloadKgCore("kg-core", "default"); expect(makeRequest).toHaveBeenCalledWith( "knowledge", diff --git a/ts/packages/client/src/index.ts b/ts/packages/client/src/index.ts index bb75c411..0a44f4b2 100644 --- a/ts/packages/client/src/index.ts +++ b/ts/packages/client/src/index.ts @@ -8,6 +8,7 @@ export * from "./models/namespaces.js"; // Export socket client export * from "./socket/trustgraph-socket.js"; +export * from "./socket/effect-rpc-client.js"; export * from "./rpc/contract.js"; // Export WebSocket adapter (isomorphic helpers and types) diff --git a/ts/packages/client/src/rpc/contract.ts b/ts/packages/client/src/rpc/contract.ts index 09a615d8..84166649 100644 --- a/ts/packages/client/src/rpc/contract.ts +++ b/ts/packages/client/src/rpc/contract.ts @@ -1,4 +1,5 @@ import { Schema as S } from "effect"; +import { HttpApi, HttpApiEndpoint, HttpApiGroup } from "effect/unstable/httpapi"; import * as Rpc from "effect/unstable/rpc/Rpc"; import * as RpcGroup from "effect/unstable/rpc/RpcGroup"; @@ -32,3 +33,14 @@ export class DispatchStream extends Rpc.make("DispatchStream", { }) {} export const TrustGraphRpcs = RpcGroup.make(Dispatch, DispatchStream); + +export class GatewayWorkbenchHttpApi extends HttpApi.make("trustgraph-gateway-workbench") + .add( + HttpApiGroup.make("workbench", { topLevel: true }).add( + HttpApiEndpoint.post("dispatch", "/api/v1/workbench/dispatch", { + payload: DispatchPayload, + success: S.Unknown, + }), + ), + ) +{} diff --git a/ts/packages/client/src/socket/effect-rpc-client.ts b/ts/packages/client/src/socket/effect-rpc-client.ts index 6876cfcd..a3669e86 100644 --- a/ts/packages/client/src/socket/effect-rpc-client.ts +++ b/ts/packages/client/src/socket/effect-rpc-client.ts @@ -1,4 +1,4 @@ -import { Cause, Context, Effect, Fiber, Layer, ManagedRuntime, Stream, SubscriptionRef } from "effect"; +import { Cause, Context, Effect, Exit, Fiber, Layer, Ref, Scope, Stream, SubscriptionRef } from "effect"; import type * as RpcGroup from "effect/unstable/rpc/RpcGroup"; import * as RpcClient from "effect/unstable/rpc/RpcClient"; import type { RpcClientError } from "effect/unstable/rpc/RpcClientError"; @@ -35,24 +35,44 @@ export interface DispatchOptions { readonly retries?: number; } +export interface TrustGraphGatewayClient { + readonly state: Effect.Effect; + readonly changes: Stream.Stream; + readonly subscribe: ( + listener: (state: RpcConnectionState) => void, + ) => Effect.Effect>; + readonly dispatch: ( + input: DispatchInput, + options?: DispatchOptions, + ) => Effect.Effect; + readonly dispatchStream: ( + input: DispatchInput, + options?: DispatchOptions, + ) => Stream.Stream; + readonly runDispatchStream: ( + input: DispatchInput, + receiver: (chunk: DispatchStreamChunk) => boolean, + options?: DispatchOptions, + ) => Effect.Effect; + readonly close: Effect.Effect; +} + +export class TrustGraphGatewayClientService extends Context.Service< + TrustGraphGatewayClientService, + TrustGraphGatewayClient +>()("@trustgraph/client/socket/effect-rpc-client/TrustGraphGatewayClientService") {} + +export interface TrustGraphGatewayClientOptions { + readonly url: string; + readonly onConnect?: () => void; + readonly onDisconnect?: () => void; + readonly stateRef?: SubscriptionRef.SubscriptionRef; + readonly closedRef?: Ref.Ref; +} + const DEFAULT_REQUEST_TIMEOUT_MS = 10_000; const DEFAULT_REQUEST_ATTEMPTS = 3; -type NewableFactory = { - new (...args: Args): A; - (...args: Args): A; - readonly prototype: A; -}; - -function newableFactory( - factory: (...args: Args) => A, -): NewableFactory { - function Constructor(...args: Args): A { - return factory(...args); - } - return Constructor as unknown as NewableFactory; -} - export interface EffectRpcClient { readonly subscribe: (listener: (state: RpcConnectionState) => void) => () => void; readonly dispatch: ( @@ -67,131 +87,207 @@ export interface EffectRpcClient { readonly close: () => Promise; } +const makeClientLayer = ( + options: TrustGraphGatewayClientOptions, + stateRef: SubscriptionRef.SubscriptionRef, + closedRef: Ref.Ref, +): Layer.Layer => { + const setState = (nextState: RpcConnectionState) => + SubscriptionRef.set(stateRef, nextState); + + const socketLayer = Layer.effect( + Socket.Socket, + Socket.makeWebSocket(options.url, { + closeCodeIsError: (code) => code !== 1000, + openTimeout: "10 seconds", + }), + ).pipe(Layer.provide(Socket.layerWebSocketConstructorGlobal)); + + const hooksLayer = Layer.succeed( + RpcClient.ConnectionHooks, + RpcClient.ConnectionHooks.of({ + onConnect: Effect.gen(function* () { + yield* setState({ status: "connected" }); + options.onConnect?.(); + }), + onDisconnect: Effect.gen(function* () { + const closed = yield* Ref.get(closedRef); + if (!closed) { + yield* setState({ + status: "connecting", + lastError: "Disconnected from gateway", + }); + } + options.onDisconnect?.(); + }), + }), + ); + + const protocolLayer = RpcClient.layerProtocolSocket({ + retryTransientErrors: true, + }).pipe( + Layer.provide(socketLayer), + Layer.provide(RpcSerialization.layerNdjson), + Layer.provide(hooksLayer), + ); + + return Layer.effect( + TrustGraphRpcClientService, + RpcClient.make(TrustGraphRpcs), + ).pipe(Layer.provide(protocolLayer)); +}; + +const makeSubscribeEffect = Effect.fn("makeSubscribeEffect")(function* ( + stateRef: SubscriptionRef.SubscriptionRef, + scope: Scope.Scope, + listener: (state: RpcConnectionState) => void, + ) { + let latest = SubscriptionRef.getUnsafe(stateRef); + listener(latest); + let replaySeen = false; + const fiber = yield* Effect.forkIn(SubscriptionRef.changes(stateRef).pipe( + Stream.runForEach((nextState) => + Effect.sync(() => { + if (!replaySeen) { + replaySeen = true; + if (nextState === latest) return; + } + latest = nextState; + listener(nextState); + }) + ), + ), scope); + return yield* Effect.succeed(Fiber.interrupt(fiber).pipe(Effect.asVoid)); +}); + +export const makeTrustGraphGatewayClientScoped: ( + options: TrustGraphGatewayClientOptions, +) => Effect.Effect = Effect.fn("makeTrustGraphGatewayClientScoped")(function* ( + options, +) { + const stateRef = options.stateRef ?? (yield* SubscriptionRef.make({ status: "connecting" })); + const closedRef = options.closedRef ?? (yield* Ref.make(false)); + const scope = yield* Scope.Scope; + const context = yield* Layer.buildWithScope(makeClientLayer(options, stateRef, closedRef), scope).pipe( + Effect.tapCause((cause) => + SubscriptionRef.set(stateRef, { + status: "failed", + lastError: Cause.pretty(cause), + }) + ), + ); + const client = Context.get(context, TrustGraphRpcClientService); + + const close = Effect.gen(function* () { + const wasClosed = yield* Ref.getAndSet(closedRef, true); + if (!wasClosed) { + yield* SubscriptionRef.set(stateRef, { status: "closed" }); + } + }); + + yield* Effect.addFinalizer(() => close); + + return { + state: SubscriptionRef.get(stateRef), + changes: SubscriptionRef.changes(stateRef), + subscribe: (listener) => makeSubscribeEffect(stateRef, scope, listener), + dispatch: (input, options = {}) => + withDispatchRequestPolicy(client.Dispatch(DispatchPayload.make(input)), options), + dispatchStream: (input, options = {}) => + Stream.unwrap( + withDispatchRequestPolicy( + Effect.succeed(client.DispatchStream(DispatchPayload.make(input))), + options, + ), + ), + runDispatchStream: (input, receiver, options = {}) => { + let last: DispatchStreamChunk | undefined; + return withDispatchRequestPolicy( + client.DispatchStream(DispatchPayload.make(input)).pipe( + Stream.runForEachWhile((chunk) => + Effect.suspend(() => { + last = chunk; + return Effect.succeed(!receiver(chunk)); + }), + ), + Effect.andThen(() => Effect.succeed(last)), + ), + options, + ); + }, + close, + } satisfies TrustGraphGatewayClient; +}); + +export const makeTrustGraphGatewayClientLayer = ( + options: TrustGraphGatewayClientOptions, +): Layer.Layer => + Layer.effect( + TrustGraphGatewayClientService, + makeTrustGraphGatewayClientScoped(options).pipe( + Effect.map(TrustGraphGatewayClientService.of), + ), + ); + export function makeEffectRpcClient( url: string, onConnect?: () => void, onDisconnect?: () => void, ): EffectRpcClient { const stateRef = Effect.runSync(SubscriptionRef.make({ status: "connecting" })); - let closed = false; - - const setState = (nextState: RpcConnectionState) => - SubscriptionRef.set(stateRef, nextState); - - const makeClientLayer = (): Layer.Layer => { - const socketLayer = Layer.effect( - Socket.Socket, - Socket.makeWebSocket(url, { - closeCodeIsError: (code) => code !== 1000, - openTimeout: "10 seconds", - }), - ).pipe(Layer.provide(Socket.layerWebSocketConstructorGlobal)); - - const hooksLayer = Layer.succeed( - RpcClient.ConnectionHooks, - RpcClient.ConnectionHooks.of({ - onConnect: Effect.gen(function* () { - yield* setState({ status: "connected" }); - onConnect?.(); - }), - onDisconnect: Effect.gen(function* () { - if (!closed) { - yield* setState({ - status: "connecting", - lastError: "Disconnected from gateway", - }); - } - onDisconnect?.(); - }), - }), - ); - - const protocolLayer = RpcClient.layerProtocolSocket({ - retryTransientErrors: true, - }).pipe( - Layer.provide(socketLayer), - Layer.provide(RpcSerialization.layerNdjson), - Layer.provide(hooksLayer), - ); - - const clientLayer = Layer.effect( - TrustGraphRpcClientService, - RpcClient.make(TrustGraphRpcs), - ).pipe(Layer.provide(protocolLayer)); - - return clientLayer; + const closedRef = Effect.runSync(Ref.make(false)); + const scope = Effect.runSync(Scope.make()); + const options: TrustGraphGatewayClientOptions = { + url, + stateRef, + closedRef, + ...(onConnect === undefined ? {} : { onConnect }), + ...(onDisconnect === undefined ? {} : { onDisconnect }), }; - - const runtime = ManagedRuntime.make(makeClientLayer()); - const clientPromise = runtime.runPromise( - TrustGraphRpcClientService.pipe( - Effect.tapCause((cause) => - setState({ - status: "failed", - lastError: Cause.pretty(cause), - }) - ), - ), + const clientPromise = Effect.runPromise( + makeTrustGraphGatewayClientScoped(options).pipe(Scope.provide(scope)), ); return { subscribe: (listener) => { - let latest = SubscriptionRef.getUnsafe(stateRef); - listener(latest); - let replaySeen = false; - const fiber = Effect.runFork( - SubscriptionRef.changes(stateRef).pipe( - Stream.runForEach((nextState) => - Effect.sync(() => { - if (!replaySeen) { - replaySeen = true; - if (nextState === latest) return; - } - latest = nextState; - listener(nextState); - }) - ), - ), + let unsubscribe: Effect.Effect | undefined; + let cancelled = false; + listener(SubscriptionRef.getUnsafe(stateRef)); + void clientPromise.then((client) => + Effect.runPromise(client.subscribe(listener)).then((release) => { + if (cancelled) { + return Effect.runPromise(release); + } + unsubscribe = release; + }) ); + return () => { - Effect.runFork(Fiber.interrupt(fiber)); + cancelled = true; + if (unsubscribe !== undefined) { + Effect.runFork(unsubscribe); + } }; }, dispatch: (input, options = {}) => clientPromise.then((client) => - runtime.runPromise( - withDispatchRequestPolicy(client.Dispatch(DispatchPayload.make(input)), options), - ) + Effect.runPromise(client.dispatch(input, options)) ), - dispatchStream: (input, receiver, options = {}) => { - let last: DispatchStreamChunk | undefined; - return clientPromise.then((client) => - runtime.runPromise( - withDispatchRequestPolicy( - client.DispatchStream(DispatchPayload.make(input)).pipe( - Stream.runForEachWhile((chunk) => - Effect.suspend(() => { - last = chunk; - return Effect.succeed(!receiver(chunk)); - }), - ), - ), - options, + dispatchStream: (input, receiver, options = {}) => + clientPromise.then((client) => + Effect.runPromise(client.runDispatchStream(input, receiver, options)) + ), + close: () => + clientPromise.then((client) => + Effect.runPromise( + client.close.pipe( + Effect.andThen(Scope.close(scope, Exit.void)), ), ) - ).then(() => last); - }, - close: () => { - if (closed) return Promise.resolve(); - closed = true; - Effect.runSync(setState({ status: "closed" })); - return runtime.dispose(); - }, + ), }; } -export const EffectRpcClient = newableFactory(makeEffectRpcClient); - export function withDispatchRequestPolicy( effect: Effect.Effect, options: DispatchOptions, diff --git a/ts/packages/client/src/socket/trustgraph-socket.ts b/ts/packages/client/src/socket/trustgraph-socket.ts index dc830621..7d66c2b1 100644 --- a/ts/packages/client/src/socket/trustgraph-socket.ts +++ b/ts/packages/client/src/socket/trustgraph-socket.ts @@ -1,7 +1,7 @@ // Import core types and classes for the TrustGraph API import type { Term, Triple } from "../models/Triple.js"; import { - EffectRpcClient, + type EffectRpcClient, type DispatchInput, type DispatchOptions, type RpcConnectionState, @@ -445,21 +445,6 @@ function makeid(length: number) { ); } -type NewableFactory = { - new (...args: Args): A; - (...args: Args): A; - readonly prototype: A; -}; - -function newableFactory( - factory: (...args: Args) => A, -): NewableFactory { - function Constructor(...args: Args): A { - return factory(...args); - } - return Constructor as unknown as NewableFactory; -} - /** * BaseApi - Core WebSocket client for TrustGraph API * Manages connection lifecycle, message routing, and provides base request @@ -622,27 +607,27 @@ export function makeBaseApi( // Factory methods for creating specialized API instances librarian() { - return new LibrarianApi(api); + return makeLibrarianApi(api); }, flows() { - return new FlowsApi(api); + return makeFlowsApi(api); }, flow(id: string) { - return new FlowApi(api, id); + return makeFlowApi(api, id); }, knowledge() { - return new KnowledgeApi(api); + return makeKnowledgeApi(api); }, config() { - return new ConfigApi(api); + return makeConfigApi(api); }, collectionManagement() { - return new CollectionManagementApi(api); + return makeCollectionManagementApi(api); }, }; @@ -739,7 +724,7 @@ export function makeBaseApi( } export type BaseApi = ReturnType; -export const BaseApi = newableFactory(makeBaseApi); +export const BaseApi = makeBaseApi; export function makeBaseApiWithRpc( user: string, @@ -1153,7 +1138,7 @@ export function makeLibrarianApi(api: BaseApi) { } export type LibrarianApi = ReturnType; -export const LibrarianApi = newableFactory(makeLibrarianApi); +export const LibrarianApi = makeLibrarianApi; /** * FlowsApi - Manages processing flows and configuration @@ -1418,7 +1403,7 @@ export function makeFlowsApi(api: BaseApi) { } export type FlowsApi = ReturnType; -export const FlowsApi = newableFactory(makeFlowsApi); +export const FlowsApi = makeFlowsApi; /** * FlowApi - Interface for interacting with a specific flow instance @@ -2205,7 +2190,7 @@ export function makeFlowApi(api: BaseApi, flowId: string) { } export type FlowApi = ReturnType; -export const FlowApi = newableFactory(makeFlowApi); +export const FlowApi = makeFlowApi; /** * ConfigApi - Dedicated configuration management interface @@ -2401,7 +2386,7 @@ export function makeConfigApi(api: BaseApi) { } export type ConfigApi = ReturnType; -export const ConfigApi = newableFactory(makeConfigApi); +export const ConfigApi = makeConfigApi; /** * KnowledgeApi - Manages knowledge graph cores and data @@ -2568,7 +2553,7 @@ export function makeKnowledgeApi(api: BaseApi) { } export type KnowledgeApi = ReturnType; -export const KnowledgeApi = newableFactory(makeKnowledgeApi); +export const KnowledgeApi = makeKnowledgeApi; /** * CollectionManagementApi - Manages collections for organizing documents @@ -2677,7 +2662,7 @@ export function makeCollectionManagementApi(api: BaseApi) { } export type CollectionManagementApi = ReturnType; -export const CollectionManagementApi = newableFactory(makeCollectionManagementApi); +export const CollectionManagementApi = makeCollectionManagementApi; /** * Factory function to create a new TrustGraph WebSocket connection @@ -2690,4 +2675,4 @@ export const createTrustGraphSocket = ( user: string, token?: string, socketUrl?: string, -): BaseApi => new BaseApi(user, token, socketUrl); +): BaseApi => makeBaseApi(user, token, socketUrl); diff --git a/ts/packages/flow/package.json b/ts/packages/flow/package.json index 0e7c6501..d155b42c 100644 --- a/ts/packages/flow/package.json +++ b/ts/packages/flow/package.json @@ -12,21 +12,13 @@ }, "dependencies": { "@effect/ai-anthropic": "4.0.0-beta.78", - "@effect/ai-openai": "4.0.0-beta.78", - "@effect/ai-openrouter": "4.0.0-beta.78", - "@effect/atom-react": "4.0.0-beta.78", - "@effect/openapi-generator": "4.0.0-beta.78", - "@effect/opentelemetry": "4.0.0-beta.78", - "@effect/platform-browser": "4.0.0-beta.78", "@effect/platform-bun": "4.0.0-beta.78", "@effect/platform-node": "4.0.0-beta.78", - "@effect/platform-node-shared": "4.0.0-beta.78", - "@effect/tsgo": "0.14.0", - "@effect/vitest": "4.0.0-beta.78", "@mistralai/mistralai": "^1.0.0", "@modelcontextprotocol/sdk": "^1.12.0", "@qdrant/js-client-rest": "^1.13.0", "@trustgraph/base": "workspace:*", + "@trustgraph/client": "workspace:*", "effect": "4.0.0-beta.78", "falkordb": "^5.0.0", "ollama": "^0.6.3", diff --git a/ts/packages/flow/src/gateway/dispatch/manager.ts b/ts/packages/flow/src/gateway/dispatch/manager.ts index 0d1dc934..428f09c5 100644 --- a/ts/packages/flow/src/gateway/dispatch/manager.ts +++ b/ts/packages/flow/src/gateway/dispatch/manager.ts @@ -12,6 +12,7 @@ import { Clock, Effect, Exit, HashMap, HashSet, Option, Random, Scope, Synchroni import { loadMessagingRuntimeConfig, makeNatsBackend, + makeNatsBackendScoped, makePubSubService, makeRequestResponseFactoryService, messagingDeliveryError, @@ -401,3 +402,17 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager publishToTopic, }; } + +export const makeDispatcherManagerScoped = Effect.fn("makeDispatcherManagerScoped")(function* ( + config: GatewayConfig, +) { + if (config.pubsub !== undefined) { + return makeDispatcherManager(config); + } + + const pubsub = yield* makeNatsBackendScoped(config.natsUrl ?? "nats://localhost:4222"); + return makeDispatcherManager({ + ...config, + pubsub, + }); +}); diff --git a/ts/packages/flow/src/gateway/rpc-contract.ts b/ts/packages/flow/src/gateway/rpc-contract.ts index 09a615d8..5f485dce 100644 --- a/ts/packages/flow/src/gateway/rpc-contract.ts +++ b/ts/packages/flow/src/gateway/rpc-contract.ts @@ -1,34 +1 @@ -import { Schema as S } from "effect"; -import * as Rpc from "effect/unstable/rpc/Rpc"; -import * as RpcGroup from "effect/unstable/rpc/RpcGroup"; - -export class DispatchPayload extends S.Class("DispatchPayload")({ - scope: S.Literals(["global", "flow"]), - service: S.String, - flow: S.optionalKey(S.String), - request: S.Record(S.String, S.Unknown), -}) {} - -export class DispatchStreamChunk extends S.Class("DispatchStreamChunk")({ - response: S.Unknown, - complete: S.Boolean, -}) {} - -export class DispatchError extends S.TaggedErrorClass()("DispatchError", { - message: S.String, -}) {} - -export class Dispatch extends Rpc.make("Dispatch", { - payload: DispatchPayload, - success: S.Unknown, - error: DispatchError, -}) {} - -export class DispatchStream extends Rpc.make("DispatchStream", { - payload: DispatchPayload, - success: DispatchStreamChunk, - error: DispatchError, - stream: true, -}) {} - -export const TrustGraphRpcs = RpcGroup.make(Dispatch, DispatchStream); +export * from "@trustgraph/client/rpc/contract"; diff --git a/ts/packages/flow/src/gateway/server.ts b/ts/packages/flow/src/gateway/server.ts index a382b2c3..ecc1339d 100644 --- a/ts/packages/flow/src/gateway/server.ts +++ b/ts/packages/flow/src/gateway/server.ts @@ -10,6 +10,7 @@ import { NodeHttpServer, NodeRuntime } from "@effect/platform-node"; import { Clock, Config, Effect, Exit, Layer, Random, Scope } from "effect"; import * as O from "effect/Option"; import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"; +import { HttpApiBuilder } from "effect/unstable/httpapi"; import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization"; import { formatPrometheusMetrics, @@ -19,7 +20,8 @@ import { toTgError, type PubSubBackend, } from "@trustgraph/base"; -import { makeDispatcherManager, type DispatcherManager } from "./dispatch/manager.js"; +import { GatewayWorkbenchHttpApi } from "@trustgraph/client/rpc/contract"; +import { makeDispatcherManagerScoped, type DispatcherManager } from "./dispatch/manager.js"; import { makeGatewayRpcServer, type GatewayRpcServer } from "./rpc-server.js"; export interface GatewayConfig { @@ -134,6 +136,22 @@ const workbenchDispatch = ( ), ); +const gatewayWorkbenchHttpApiRoutes = ( + config: GatewayConfig, + dispatcher: DispatcherManager, +) => { + const handlers = HttpApiBuilder.group( + GatewayWorkbenchHttpApi, + "workbench", + (handlers) => + handlers.handleRaw("dispatch", () => workbenchDispatch(config, dispatcher)), + ); + + return HttpApiBuilder.layer(GatewayWorkbenchHttpApi).pipe( + Layer.provide(handlers), + ); +}; + const globalDispatch = ( config: GatewayConfig, dispatcher: DispatcherManager, @@ -240,7 +258,7 @@ export const makeGatewayRoutes = ( rpcScope: Scope.Scope, ) => Layer.mergeAll( - HttpRouter.add("POST", "/api/v1/workbench/dispatch", workbenchDispatch(config, dispatcher)), + gatewayWorkbenchHttpApiRoutes(config, dispatcher), HttpRouter.add("POST", "/api/v1/:kind", globalDispatch(config, dispatcher)), HttpRouter.add("POST", "/api/v1/flow/:flow/service/:kind", flowDispatch(config, dispatcher)), HttpRouter.add("POST", "/api/v1/flow/:flow/load", flowLoad(config, dispatcher)), @@ -251,7 +269,7 @@ export const makeGatewayRoutes = ( export function createGateway(config: GatewayConfig) { return Layer.effectDiscard( Effect.scoped(Effect.gen(function* () { - const dispatcher = makeDispatcherManager(config); + const dispatcher = yield* makeDispatcherManagerScoped(config); yield* dispatcher.start.pipe( Effect.mapError((cause) => messagingLifecycleError("gateway", "dispatcher-start", cause)), ); diff --git a/ts/packages/flow/tsconfig.json b/ts/packages/flow/tsconfig.json index 04e240eb..88a93e26 100644 --- a/ts/packages/flow/tsconfig.json +++ b/ts/packages/flow/tsconfig.json @@ -9,6 +9,7 @@ "include": ["src"], "exclude": ["src/**/*.test.ts", "src/**/*.spec.ts"], "references": [ - { "path": "../base" } + { "path": "../base" }, + { "path": "../client" } ] } diff --git a/ts/packages/mcp/package.json b/ts/packages/mcp/package.json index e503301b..182ff059 100644 --- a/ts/packages/mcp/package.json +++ b/ts/packages/mcp/package.json @@ -13,18 +13,9 @@ "dependencies": { "@trustgraph/base": "workspace:*", "@trustgraph/client": "workspace:*", - "@effect/platform-node": "4.0.0-beta.78", - "@effect/platform-node-shared": "4.0.0-beta.78", - "@effect/ai-anthropic": "4.0.0-beta.78", - "@effect/ai-openai": "4.0.0-beta.78", - "@effect/ai-openrouter": "4.0.0-beta.78", - "@effect/atom-react": "4.0.0-beta.78", - "@effect/openapi-generator": "4.0.0-beta.78", - "@effect/opentelemetry": "4.0.0-beta.78", - "@effect/platform-browser": "4.0.0-beta.78", "@effect/platform-bun": "4.0.0-beta.78", - "@effect/tsgo": "0.14.0", - "@effect/vitest": "4.0.0-beta.78" + "@effect/platform-node": "4.0.0-beta.78", + "effect": "4.0.0-beta.78" }, "devDependencies": { "@effect/vitest": "4.0.0-beta.78", diff --git a/ts/packages/mcp/src/__tests__/server-effect.test.ts b/ts/packages/mcp/src/__tests__/server-effect.test.ts index bff415c4..ce6da511 100644 --- a/ts/packages/mcp/src/__tests__/server-effect.test.ts +++ b/ts/packages/mcp/src/__tests__/server-effect.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it } from "@effect/vitest"; -import type { BaseApi } from "@trustgraph/client"; -import { Effect, Layer } from "effect"; +import { DispatchStreamChunk, type BaseApi, type TrustGraphGatewayClient } from "@trustgraph/client"; +import { Effect, Layer, Stream } from "effect"; import * as S from "effect/Schema"; import { McpServer } from "effect/unstable/ai"; import * as McpSchema from "effect/unstable/ai/McpSchema"; @@ -13,6 +13,7 @@ import { TrustGraphMcpConfig, TrustGraphMcpToolkit, TrustGraphMcpToolkitLive, + TrustGraphGateway, TrustGraphSocket, } from "../server-effect.js"; @@ -124,6 +125,29 @@ const makeFakeSocket = ( return { socket, calls }; }; +const makeFakeGateway = (): TrustGraphGatewayClient => ({ + state: Effect.succeed({ status: "connected" }), + changes: Stream.empty, + subscribe: () => Effect.succeed(Effect.void), + dispatch: () => Effect.succeed({}), + dispatchStream: () => Stream.empty, + runDispatchStream: (_input, receiver) => + Effect.sync(() => { + const chunk = DispatchStreamChunk.make({ + response: { + chunk_type: "answer", + content: "agent answer", + end_of_message: true, + end_of_dialog: true, + }, + complete: true, + }); + receiver(chunk); + return chunk; + }), + close: Effect.void, +}); + const testConfig = TrustGraphMcpConfig.of({ gatewayUrl: "ws://localhost:8088/api/v1/rpc", user: "mcp-test", @@ -147,8 +171,10 @@ const makeNativeTestClientEffect = Effect.fn("makeNativeTestClient")(function*( textCompletion: options.textCompletion, graphRag: options.graphRag, }); + const gateway = makeFakeGateway(); const serverLayer = McpServer.toolkit(TrustGraphMcpToolkit).pipe( Layer.provide(TrustGraphMcpToolkitLive), + Layer.provide(Layer.succeed(TrustGraphGateway, TrustGraphGateway.of(gateway))), Layer.provide(Layer.succeed(TrustGraphSocket, TrustGraphSocket.of(socket))), Layer.provide(Layer.succeed(TrustGraphMcpConfig, testConfig)), Layer.provide(McpServer.layerHttp({ diff --git a/ts/packages/mcp/src/server-effect.ts b/ts/packages/mcp/src/server-effect.ts index 91ba9904..360eb9c3 100644 --- a/ts/packages/mcp/src/server-effect.ts +++ b/ts/packages/mcp/src/server-effect.ts @@ -1,6 +1,12 @@ import {BunHttpServer, BunRuntime} from "@effect/platform-bun"; import {NodeRuntime, NodeStdio} from "@effect/platform-node"; -import {createTrustGraphSocket, type BaseApi, type Term as ClientTerm} from "@trustgraph/client"; +import { + createTrustGraphSocket, + makeTrustGraphGatewayClientScoped, + type BaseApi, + type Term as ClientTerm, + type TrustGraphGatewayClient, +} from "@trustgraph/client"; import {Config, Context, Effect, Layer} from "effect"; import * as O from "effect/Option"; import * as Predicate from "effect/Predicate"; @@ -1223,6 +1229,12 @@ export interface TrustGraphMcpConfigShape { const readNonEmpty = (value: string | undefined): string | undefined => value !== undefined && value.length > 0 ? value : undefined +const gatewayUrlWithToken = (config: TrustGraphMcpConfigShape): string => { + if (config.token === undefined || config.token.length === 0) return config.gatewayUrl + const separator = config.gatewayUrl.includes("?") ? "&" : "?" + return `${config.gatewayUrl}${separator}token=${encodeURIComponent(config.token)}` +} + const parsePort = (raw: string | undefined): number => { if (raw === undefined) { return 3000 @@ -1283,6 +1295,19 @@ export class TrustGraphSocket extends Context.Service ) } +export class TrustGraphGateway extends Context.Service()( + "@trustgraph/mcp/server-effect/TrustGraphGateway", +) { + static readonly layer = Layer.effect( + TrustGraphGateway, + Effect.gen(function*() { + const config = yield* TrustGraphMcpConfig + const client = yield* makeTrustGraphGatewayClientScoped({url: gatewayUrlWithToken(config)}) + return TrustGraphGateway.of(client) + }), + ) +} + const toErrorMessage = (cause: unknown): string => { if (Predicate.isError(cause) && cause.message.length > 0) { return cause.message @@ -1296,6 +1321,25 @@ const toErrorMessage = (cause: unknown): string => { return "TrustGraph MCP tool failed" } +const asRecord = (value: unknown): Record => + Predicate.isObject(value) && !Array.isArray(value) ? value as Record : {} + +const stringProperty = (source: unknown, key: string): string | undefined => { + const value = asRecord(source)[key] + return Predicate.isString(value) ? value : undefined +} + +const booleanProperty = (source: unknown, key: string): boolean | undefined => { + const value = asRecord(source)[key] + return typeof value === "boolean" ? value : undefined +} + +const responseErrorMessage = (source: unknown): string | undefined => { + const error = asRecord(source).error + if (Predicate.isString(error)) return error + return stringProperty(error, "message") +} + const decodeJson = S.decodeUnknownEffect(S.Json) const decodeJsonArray = S.decodeUnknownEffect(S.Array(S.Json)) @@ -1316,10 +1360,59 @@ const decodeJsonArrayOrFail = ( const asIriTerm = (value: string | undefined): ClientTerm | undefined => value !== undefined && value.length > 0 ? {t: "i", i: value} : undefined +const runAgentTool = Effect.fn("TrustGraphMcpToolkit.agent")(function*( + gateway: TrustGraphGatewayClient, + config: TrustGraphMcpConfigShape, + question: string, +) { + let fullAnswer = "" + let streamError: AgentError | undefined + + yield* gateway.runDispatchStream( + { + scope: "flow", + flow: config.flowId, + service: "agent", + request: { + question, + user: config.user, + collection: "default", + streaming: true, + }, + }, + (chunk) => { + const resp = asRecord(chunk.response) + const chunkType = stringProperty(resp, "chunk_type") + const error = chunkType === "error" + ? responseErrorMessage(resp) ?? "Unknown agent error" + : responseErrorMessage(resp) + if (error !== undefined) { + streamError = AgentError.make({message: error}) + return true + } + + if (chunkType === "answer" || chunkType === "final-answer") { + fullAnswer += stringProperty(resp, "content") ?? "" + } + + return chunk.complete === true || booleanProperty(resp, "end_of_dialog") === true + }, + {timeoutMs: 120_000, retries: 2}, + ).pipe( + Effect.mapError((cause) => AgentError.make({message: toErrorMessage(cause)})), + ) + + if (streamError !== undefined) { + return yield* streamError + } + return AgentSuccess.make({text: fullAnswer}) +}) + export const TrustGraphMcpToolkitLive = TrustGraphMcpToolkit.toLayer( Effect.gen(function*() { const config = yield* TrustGraphMcpConfig const socket = yield* TrustGraphSocket + const gateway = yield* TrustGraphGateway return TrustGraphMcpToolkit.of({ text_completion: ({system, prompt}) => @@ -1354,22 +1447,7 @@ export const TrustGraphMcpToolkitLive = TrustGraphMcpToolkit.toLayer( Effect.map((text) => DocumentRagSuccess.make({text})), ), - agent: ({question}) => - Effect.callback((resume) => { - let fullAnswer = "" - socket.flow(config.flowId).agent( - question, - () => {}, - () => {}, - (chunk, complete) => { - fullAnswer += chunk - if (complete) { - resume(Effect.succeed(AgentSuccess.make({text: fullAnswer}))) - } - }, - (cause) => resume(Effect.fail(AgentError.make({message: toErrorMessage(cause)}))), - ) - }), + agent: ({question}) => runAgentTool(gateway, config, question), embeddings: ({text}) => Effect.tryPromise({ @@ -1694,6 +1772,7 @@ const makeTrustGraphMcpHttpLayerFromConfig = ( version: config.version, path: config.mcpPath, })), + Layer.provide(TrustGraphGateway.layer), Layer.provide(TrustGraphSocket.layer), Layer.provide(Layer.succeed(TrustGraphMcpConfig, TrustGraphMcpConfig.of(config))), ) @@ -1713,6 +1792,7 @@ const makeTrustGraphMcpStdioLayerFromConfig = ( version: config.version, })), Layer.provide(NodeStdio.layer), + Layer.provide(TrustGraphGateway.layer), Layer.provide(TrustGraphSocket.layer), Layer.provide(Layer.succeed(TrustGraphMcpConfig, TrustGraphMcpConfig.of(config))), ) diff --git a/ts/packages/workbench/package.json b/ts/packages/workbench/package.json index af87d62e..9c55696a 100644 --- a/ts/packages/workbench/package.json +++ b/ts/packages/workbench/package.json @@ -10,21 +10,11 @@ "qa:browser": "playwright test" }, "dependencies": { - "@effect/ai-anthropic": "4.0.0-beta.78", - "@effect/ai-openai": "4.0.0-beta.78", - "@effect/ai-openrouter": "4.0.0-beta.78", "@effect/atom-react": "4.0.0-beta.78", - "@effect/openapi-generator": "4.0.0-beta.78", - "@effect/opentelemetry": "4.0.0-beta.78", "@effect/platform-browser": "4.0.0-beta.78", - "@effect/platform-bun": "4.0.0-beta.78", - "@effect/platform-node": "4.0.0-beta.78", - "@effect/platform-node-shared": "4.0.0-beta.78", - "@effect/tsgo": "0.14.0", - "@effect/vitest": "4.0.0-beta.78", - "@tanstack/react-query": "^5.75.0", "@trustgraph/client": "workspace:*", "clsx": "^2.1.0", + "effect": "4.0.0-beta.78", "lucide-react": "^0.513.0", "react": "^19.1.0", "react-dom": "^19.1.0", @@ -32,8 +22,7 @@ "react-force-graph-2d": "^1.29.1", "react-markdown": "^10.1.0", "react-router": "^7.6.0", - "tailwind-merge": "^3.3.0", - "zustand": "^5.0.0" + "tailwind-merge": "^3.3.0" }, "devDependencies": { "@effect/vitest": "4.0.0-beta.78", diff --git a/ts/packages/workbench/src/atoms/workbench.ts b/ts/packages/workbench/src/atoms/workbench.ts index c25f1edc..a5fecec4 100644 --- a/ts/packages/workbench/src/atoms/workbench.ts +++ b/ts/packages/workbench/src/atoms/workbench.ts @@ -1,14 +1,38 @@ import { Clipboard as BrowserClipboard } from "@effect/platform-browser"; import * as BrowserHttpClient from "@effect/platform-browser/BrowserHttpClient"; import * as BrowserKeyValueStore from "@effect/platform-browser/BrowserKeyValueStore"; -import { BaseApi, type ConnectionState, type DocumentMetadata, type ExplainEvent, type StreamingMetadata, type Term, type Triple } from "@trustgraph/client"; -import { Cause, Clock, Context, Effect, Layer, Match, Metric, Option, Random, Schema as S } from "effect"; +import { + DispatchPayload, + GatewayWorkbenchHttpApi, + type GraphRagOptions, + makeBaseApi, + TrustGraphRpcs, + type BaseApi, + type BeginUploadResponse, + type ChunkedUploadDocumentMetadata, + type CompleteUploadResponse, + type ConnectionState, + type DocumentMetadata, + type ExplainEvent, + type StreamingMetadata, + type Term, + type Triple, + type UploadChunkResponse, +} from "@trustgraph/client"; +import { Cause, Clock, Context, Effect, Layer, Match, Metric, Option, Random, Schema as S, Scope, Stream } from "effect"; import * as MutableHashMap from "effect/MutableHashMap"; import * as Predicate from "effect/Predicate"; +import { HttpClient, HttpClientRequest } from "effect/unstable/http"; import * as Otlp from "effect/unstable/observability/Otlp"; +import * as RpcClient from "effect/unstable/rpc/RpcClient"; +import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization"; import * as AsyncResult from "effect/unstable/reactivity/AsyncResult"; import * as Atom from "effect/unstable/reactivity/Atom"; +import * as AtomRegistry from "effect/unstable/reactivity/AtomRegistry"; +import * as AtomHttpApi from "effect/unstable/reactivity/AtomHttpApi"; +import * as AtomRpc from "effect/unstable/reactivity/AtomRpc"; import * as Reactivity from "effect/unstable/reactivity/Reactivity"; +import * as Socket from "effect/unstable/socket/Socket"; // --------------------------------------------------------------------------- // Browser runtime, telemetry, and generic workbench HTTP API marker @@ -103,12 +127,9 @@ export const workbenchRuntimeFactory = Atom.context({ memoMap: Layer.makeMemoMapUnsafe(), }); -export const workbenchRuntime = workbenchRuntimeFactory( - Layer.mergeAll( - Reactivity.layer, - workbenchBaseLayer, - ), -); +workbenchRuntimeFactory.addGlobalLayer(workbenchBaseLayer); + +export const workbenchRuntime = workbenchRuntimeFactory(workbenchBaseLayer); const queryCounter = Metric.counter("trustgraph_workbench_query_total", { description: "Workbench atom-backed query attempts", @@ -523,6 +544,27 @@ function mapConfigEntries(raw: unknown): Array<{ key: string; value: string }> { : []; } +function configValuesFromResponse(raw: unknown): Array<{ key: string; value: unknown; type?: string; workspace?: string }> { + const values = jsonRecordProperty(raw, "values"); + if (!Array.isArray(values)) return []; + return values.flatMap((value) => { + if (!Predicate.isObject(value) || !Predicate.hasProperty(value, "key") || typeof value.key !== "string") { + return []; + } + const entry: { key: string; value: unknown; type?: string; workspace?: string } = { + key: value.key, + value: Predicate.hasProperty(value, "value") ? value.value : undefined, + }; + if (Predicate.hasProperty(value, "type") && typeof value.type === "string") entry.type = value.type; + if (Predicate.hasProperty(value, "workspace") && typeof value.workspace === "string") entry.workspace = value.workspace; + return [entry]; + }); +} + +function parseConfigValue(value: unknown): unknown { + return typeof value === "string" ? parseJsonUnknown(value) ?? value : value; +} + function parseConfigEntries(raw: unknown, label: string): T[] { const entries: T[] = []; for (const item of mapConfigEntries(raw)) { @@ -542,6 +584,152 @@ function withDefaultCollection(collection: string): string { type WorkbenchReactivityKeys = ReadonlyArray | Record>; type WorkbenchRuntimeRequirements = BrowserClipboard.Clipboard | WorkbenchFiles | Reactivity.Reactivity; +type WorkbenchHttpAtomRequirements = + | AtomRegistry.AtomRegistry + | Reactivity.Reactivity + | Scope.Scope + | WorkbenchGatewayHttp; +type WorkbenchGatewayAtomRequirements = + | AtomRegistry.AtomRegistry + | Reactivity.Reactivity + | Scope.Scope + | WorkbenchGatewayHttp + | WorkbenchGatewayRpc; +type WorkbenchDispatchInput = DispatchPayload; +type JsonRecord = Record; + +const StreamingEnvelopeSchema = S.Struct({ + response: S.optionalKey(S.Unknown), + complete: S.optionalKey(S.Boolean), + error: S.optionalKey(S.String), +}); +type StreamingEnvelope = typeof StreamingEnvelopeSchema.Type; + +const ClientTripleSchema: S.Codec = S.suspend(() => + S.Struct({ + s: ClientTermSchema, + p: ClientTermSchema, + o: ClientTermSchema, + g: S.optionalKey(S.String), + }) +); + +const ClientTermSchema: S.Codec = S.suspend(() => + S.Union([ + S.Struct({ + t: S.Literal("i"), + i: S.String, + }), + S.Struct({ + t: S.Literal("b"), + d: S.String, + }), + S.Struct({ + t: S.Literal("l"), + v: S.String, + dt: S.optionalKey(S.String), + ln: S.optionalKey(S.String), + }), + S.Struct({ + t: S.Literal("t"), + tr: S.optionalKey(ClientTripleSchema), + }), + ]) +); + +const decodeStreamingEnvelope = S.decodeUnknownOption(StreamingEnvelopeSchema); +const decodeClientTriples = S.decodeUnknownOption(S.Array(ClientTripleSchema).pipe(S.mutable)); + +function gatewayHttpBaseUrl(settings: Settings): string { + const raw = settings.gatewayUrl.trim(); + if (raw.length === 0 || raw === "/api/v1/rpc") return ""; + if (raw.startsWith("/")) { + return raw.replace(/\/api\/v1\/rpc$/, "").replace(/\/api\/v1$/, ""); + } + const normalized = raw.startsWith("ws://") + ? `http://${raw.slice("ws://".length)}` + : raw.startsWith("wss://") + ? `https://${raw.slice("wss://".length)}` + : raw; + const parsed = new URL(normalized); + return parsed.origin; +} + +function gatewayRpcUrl(settings: Settings): string { + const raw = settings.gatewayUrl.trim().length > 0 ? settings.gatewayUrl.trim() : "/api/v1/rpc"; + const normalized = raw.startsWith("http://") + ? `ws://${raw.slice("http://".length)}` + : raw.startsWith("https://") + ? `wss://${raw.slice("https://".length)}` + : raw; + if (settings.apiKey.length === 0) return normalized; + const separator = normalized.includes("?") ? "&" : "?"; + return `${normalized}${separator}token=${encodeURIComponent(settings.apiKey)}`; +} + +const gatewayHttpClientLayer = (get: Atom.AtomContext) => { + const settings = get(settingsAtom); + const baseUrl = gatewayHttpBaseUrl(settings); + const token = settings.apiKey.length > 0 ? settings.apiKey : undefined; + + return Layer.effect( + HttpClient.HttpClient, + HttpClient.HttpClient.pipe( + Effect.map((client) => + HttpClient.mapRequest(client, (request) => { + const withBaseUrl = baseUrl.length > 0 + ? HttpClientRequest.prependUrl(request, baseUrl) + : request; + return token === undefined + ? withBaseUrl + : HttpClientRequest.bearerToken(withBaseUrl, token); + }) + ), + ), + ).pipe(Layer.provide(BrowserHttpClient.layerFetch)); +}; + +const gatewayRpcProtocolLayer = (get: Atom.AtomContext) => { + const socketLayer = Layer.effect( + Socket.Socket, + Socket.makeWebSocket(gatewayRpcUrl(get(settingsAtom)), { + closeCodeIsError: (code) => code !== 1000, + openTimeout: "10 seconds", + }), + ).pipe(Layer.provide(Socket.layerWebSocketConstructorGlobal)); + + return RpcClient.layerProtocolSocket({ + retryTransientErrors: true, + }).pipe( + Layer.provide(socketLayer), + Layer.provide(RpcSerialization.layerNdjson), + ); +}; + +export class WorkbenchGatewayHttp extends AtomHttpApi.Service()( + "@trustgraph/workbench/atoms/workbench/WorkbenchGatewayHttp", + { + api: GatewayWorkbenchHttpApi, + httpClient: gatewayHttpClientLayer, + runtime: workbenchRuntimeFactory, + }, +) {} + +export class WorkbenchGatewayRpc extends AtomRpc.Service()( + "@trustgraph/workbench/atoms/workbench/WorkbenchGatewayRpc", + { + group: TrustGraphRpcs, + protocol: gatewayRpcProtocolLayer, + runtime: workbenchRuntimeFactory, + }, +) {} + +const workbenchGatewayRuntime = workbenchRuntimeFactory((get) => + Layer.merge( + get(WorkbenchGatewayHttp.runtime.layer), + get(WorkbenchGatewayRpc.runtime.layer), + ) +); interface CommandOptions { readonly initialValue?: A; @@ -549,22 +737,668 @@ interface CommandOptions { readonly concurrent?: boolean; } +function asJsonRecord(value: unknown): JsonRecord { + return Predicate.isObject(value) && !Array.isArray(value) ? value as JsonRecord : {}; +} + +function jsonRecordProperty(value: unknown, key: string): unknown | undefined { + return Predicate.isObject(value) && Predicate.hasProperty(value, key) ? value[key] : undefined; +} + +function streamingEnvelopeFrom(message: unknown): StreamingEnvelope { + return Option.getOrElse(decodeStreamingEnvelope(message), () => ({ + complete: true, + error: "Streaming message could not be decoded", + })); +} + +function propertyValue(source: unknown, key: string): unknown | undefined { + return Predicate.hasProperty(source, key) ? source[key] : undefined; +} + +function stringProperty(source: unknown, key: string): string | undefined { + const value = propertyValue(source, key); + return typeof value === "string" ? value : undefined; +} + +function numberProperty(source: unknown, key: string): number | undefined { + const value = propertyValue(source, key); + return typeof value === "number" ? value : undefined; +} + +function booleanProperty(source: unknown, key: string): boolean | undefined { + const value = propertyValue(source, key); + return typeof value === "boolean" ? value : undefined; +} + +function gatewayResponseErrorMessage(value: unknown): string | undefined { + const error = jsonRecordProperty(value, "error"); + if (typeof error === "string") return error; + const message = jsonRecordProperty(error, "message"); + return typeof message === "string" && message.length > 0 ? message : undefined; +} + +function responseErrorMessage(source: unknown): string | undefined { + const error = propertyValue(source, "error"); + if (typeof error === "string") return error; + return stringProperty(error, "message"); +} + +function streamComplete( + envelope: StreamingEnvelope, + response: unknown, + responseMarkers: ReadonlyArray = [], +): boolean { + return envelope.complete === true || responseMarkers.some((key) => booleanProperty(response, key) === true); +} + +function explainTriplesFrom(source: unknown): Triple[] | undefined { + return Option.getOrUndefined(decodeClientTriples(propertyValue(source, "explain_triples"))); +} + +function streamingMetadataFrom(source: unknown): StreamingMetadata | undefined { + const metadata: StreamingMetadata = {}; + let hasMetadata = false; + + const inToken = numberProperty(source, "in_token"); + if (inToken !== undefined) { + metadata.in_token = inToken; + hasMetadata = true; + } + const outToken = numberProperty(source, "out_token"); + if (outToken !== undefined) { + metadata.out_token = outToken; + hasMetadata = true; + } + const model = stringProperty(source, "model"); + if (model !== undefined) { + metadata.model = model; + hasMetadata = true; + } + + return hasMetadata ? metadata : undefined; +} + +function failWorkbenchRemote(operation: string, cause: unknown): WorkbenchPromiseError { + return WorkbenchPromiseError.make({ cause, message: `${operation}: ${errorMessage(cause)}` }); +} + +function decodeResponseJsonEffect(value: unknown, operation: string): Effect.Effect { + const parsed = typeof value === "string" ? parseJsonUnknown(value) : value; + return parsed === undefined + ? Effect.fail(WorkbenchPromiseError.make({ cause: value, message: `${operation}: response JSON could not be decoded` })) + : Effect.succeed(parsed); +} + +function ensureNoGatewayResponseError(operation: string, value: A): Effect.Effect { + const message = gatewayResponseErrorMessage(value); + return message === undefined + ? Effect.succeed(value) + : Effect.fail(WorkbenchPromiseError.make({ cause: value, message: `${operation}: ${message}` })); +} + +function qaBaseApi(): BaseApi | undefined { + if (typeof window === "undefined") return undefined; + return (window as Window & { __TRUSTGRAPH_WORKBENCH_QA_API__?: BaseApi }).__TRUSTGRAPH_WORKBENCH_QA_API__; +} + +function makeWorkbenchGatewayApi(settings: Settings) { + const user = settings.user; + const dispatch = ( + service: string, + request: JsonRecord, + flow?: string, + ): Effect.Effect => + Effect.gen(function* () { + const qaApi = qaBaseApi(); + if (qaApi !== undefined) { + const response = yield* promiseBoundary(() => + qaApi.makeRequest(service, request, undefined, undefined, flow) + ); + return asJsonRecord(response); + } + + const client = yield* WorkbenchGatewayHttp; + const scope = flow === undefined ? "global" as const : "flow" as const; + const input: WorkbenchDispatchInput = flow === undefined + ? { scope, service, request } + : { scope, service, flow, request }; + const response = yield* client.dispatch({ payload: DispatchPayload.make(input) }); + return asJsonRecord(response); + }).pipe( + Effect.mapError((cause) => failWorkbenchRemote(`dispatch ${service}`, cause)), + ); + + const flowDispatch = (flow: string, service: string, request: JsonRecord) => + dispatch(service, request, flow); + + const dispatchStream = ( + service: string, + request: JsonRecord, + flow: string, + receive: (message: StreamingEnvelope) => boolean, + label: string, + onError: (message: string) => void, + ): Effect.Effect => + Effect.gen(function* () { + const qaApi = qaBaseApi(); + if (qaApi !== undefined) { + return yield* promiseBoundary(() => + qaApi.makeRequestMulti( + service, + request, + (message) => receive(streamingEnvelopeFrom(message)), + undefined, + undefined, + flow, + ) + ).pipe( + Effect.catch((cause) => + Effect.sync(() => { + onError(`${label} request failed: ${errorMessage(cause)}`); + }) + ), + Effect.asVoid, + ); + } + + const client = yield* WorkbenchGatewayRpc; + const payload = DispatchPayload.make({ + scope: "flow", + service, + flow, + request, + }); + + yield* client("DispatchStream", payload).pipe( + Stream.runForEachWhile((chunk) => + Effect.sync(() => !receive({ response: chunk.response, complete: chunk.complete })) + ), + ); + }).pipe( + Effect.catch((cause) => + Effect.sync(() => { + onError(`${label} request failed: ${errorMessage(cause)}`); + }) + ), + ); + + const configAll = () => dispatch("config", { operation: "config" }, undefined); + + const configApi = { + getConfigAll: configAll, + getPrompts: () => + configAll().pipe( + Effect.map((response) => { + const config = asJsonRecord(response.config); + const promptNs = asJsonRecord(config.prompt); + return Object.keys(promptNs) + .filter((key) => key !== "system") + .sort() + .map((id) => ({ id, name: id })); + }), + ), + getSystemPrompt: () => + configAll().pipe( + Effect.map((response) => { + const config = asJsonRecord(response.config); + const prompt = asJsonRecord(config.prompt); + const raw = prompt.system; + return raw == null ? "" : raw; + }), + ), + getPrompt: (id: string) => + configAll().pipe( + Effect.map((response) => { + const config = asJsonRecord(response.config); + return asJsonRecord(config.prompt)[id] ?? null; + }), + ), + getValues: (type: string) => + dispatch("config", { operation: "getvalues", type }, undefined).pipe( + Effect.map((response) => configValuesFromResponse(response)), + ), + getTokenCosts: () => + dispatch("config", { operation: "getvalues", type: "token-cost" }, undefined).pipe( + Effect.map((response) => + configValuesFromResponse(response).map((item) => { + const value = parseConfigValue(item.value) as Record; + return { + model: item.key, + input_price: value.input_price, + output_price: value.output_price, + }; + }) + ), + ), + putConfig: (items: { type: string; key: string; value: string }[]) => + dispatch("config", { operation: "put", values: items }, undefined), + deleteConfig: (target: { type: string; key: string }) => + dispatch("config", { operation: "delete", keys: [target] }, undefined), + }; + + return { + user, + flows: () => ({ + getFlows: () => + dispatch("flow", { operation: "list-flows" }, undefined).pipe( + Effect.map((response) => Array.isArray(response["flow-ids"]) ? response["flow-ids"] as string[] : []), + ), + getFlow: (id: string) => + dispatch("flow", { operation: "get-flow", "flow-id": id }, undefined).pipe( + Effect.flatMap((response) => decodeResponseJsonEffect(response.flow, "get-flow")), + ), + getFlowBlueprints: () => + dispatch("flow", { operation: "list-blueprints" }, undefined).pipe( + Effect.map((response) => Array.isArray(response["blueprint-names"]) ? response["blueprint-names"] as string[] : []), + ), + getFlowBlueprint: (name: string) => + dispatch("flow", { operation: "get-blueprint", "blueprint-name": name }, undefined).pipe( + Effect.flatMap((response) => decodeResponseJsonEffect(response["blueprint-definition"], "get-blueprint")), + ), + startFlow: (id: string, blueprint: string, description: string, parameters?: Record) => { + const request: JsonRecord = { + operation: "start-flow", + "flow-id": id, + "blueprint-name": blueprint, + description, + }; + if (parameters !== undefined && Object.keys(parameters).length > 0) request.parameters = parameters; + return dispatch("flow", request, undefined).pipe( + Effect.flatMap((response) => ensureNoGatewayResponseError("start-flow", response)), + ); + }, + stopFlow: (id: string) => + dispatch("flow", { operation: "stop-flow", "flow-id": id }, undefined), + }), + config: () => configApi, + librarian: () => ({ + getDocuments: () => + dispatch("librarian", { operation: "list-documents", user }, undefined).pipe( + Effect.map((response) => (response["document-metadatas"] ?? response.documents ?? []) as DocumentMetadata[]), + ), + getProcessing: () => + dispatch("librarian", { operation: "list-processing", user }, undefined).pipe( + Effect.map((response) => (response["processing-metadatas"] ?? response.processing ?? response["processing-metadata"] ?? []) as ProcessingMetadata[]), + ), + getDocumentMetadata: (documentId: string) => + dispatch("librarian", { + operation: "get-document-metadata", + "document-id": documentId, + documentId, + user, + }, undefined).pipe( + Effect.map((response) => (response["document-metadata"] ?? response.documentMetadata ?? null) as DocumentMetadata | null), + ), + loadDocument: Effect.fn("trustgraph.workbench.gateway.librarian.loadDocument")(function*( + document: string, + mimeType: string, + title: string, + comments: string, + tags: string[], + id?: string, + metadata?: Triple[], + ) { + const timestamp = yield* Clock.currentTimeMillis; + const documentMetadata: DocumentMetadata = { + time: Math.floor(timestamp / 1000), + kind: mimeType, + title, + comments, + user, + tags, + "document-type": "source", + documentType: "source", + }; + if (id !== undefined) documentMetadata.id = id; + if (metadata !== undefined) documentMetadata.metadata = metadata; + return yield* dispatch("librarian", { + operation: "add-document", + "document-metadata": documentMetadata, + documentMetadata, + content: document, + }, undefined); + }), + removeDocument: (id: string, collection?: string) => + dispatch("librarian", { + operation: "remove-document", + "document-id": id, + documentId: id, + user, + collection: withDefaultCollection(collection ?? "default"), + }, undefined), + beginUpload: ( + metadata: ChunkedUploadDocumentMetadata, + totalSize: number, + chunkSize?: number, + ): Effect.Effect => { + const request: JsonRecord = { + operation: "begin-upload", + "document-metadata": metadata, + documentMetadata: metadata, + "total-size": totalSize, + }; + if (chunkSize !== undefined) request["chunk-size"] = chunkSize; + return dispatch("librarian", request, undefined).pipe( + Effect.flatMap((response) => ensureNoGatewayResponseError("begin-upload", response)), + Effect.map((response) => response as unknown as BeginUploadResponse), + ); + }, + uploadChunk: ( + uploadId: string, + chunkIndex: number, + content: string, + ): Effect.Effect => + dispatch("librarian", { + operation: "upload-chunk", + "upload-id": uploadId, + "chunk-index": chunkIndex, + content, + user, + }, undefined).pipe( + Effect.flatMap((response) => ensureNoGatewayResponseError("upload-chunk", response)), + Effect.map((response) => response as unknown as UploadChunkResponse), + ), + completeUpload: (uploadId: string): Effect.Effect => + dispatch("librarian", { + operation: "complete-upload", + "upload-id": uploadId, + user, + }, undefined).pipe( + Effect.flatMap((response) => ensureNoGatewayResponseError("complete-upload", response)), + Effect.map((response) => response as unknown as CompleteUploadResponse), + ), + }), + knowledge: () => ({ + getKnowledgeCores: () => + dispatch("knowledge", { operation: "list-kg-cores", user }, undefined).pipe( + Effect.map((response) => Array.isArray(response.ids) ? response.ids as string[] : []), + ), + getDocumentEmbeddingCores: () => + dispatch("knowledge", { operation: "list-de-cores", user }, undefined).pipe( + Effect.map((response) => Array.isArray(response.ids) ? response.ids as string[] : []), + ), + loadKgCore: (id: string, flow: string, collection?: string) => + dispatch("knowledge", { + operation: "load-kg-core", + id, + flow, + user, + collection: withDefaultCollection(collection ?? "default"), + }, undefined), + deleteKgCore: (id: string, collection?: string) => + dispatch("knowledge", { + operation: "delete-kg-core", + id, + user, + collection: withDefaultCollection(collection ?? "default"), + }, undefined), + }), + collectionManagement: () => ({ + listCollections: (tagFilter?: string[]) => { + const request: JsonRecord = { operation: "list-collections", user }; + if (tagFilter !== undefined && tagFilter.length > 0) request.tag_filter = tagFilter; + return dispatch("collection-management", request, undefined).pipe( + Effect.map((response) => (response.collections ?? []) as CollectionSummary[]), + ); + }, + updateCollection: (collection: string, name?: string, description?: string, tags?: string[]) => { + const request: JsonRecord = { operation: "update-collection", user, collection }; + if (name !== undefined) request.name = name; + if (description !== undefined) request.description = description; + if (tags !== undefined) request.tags = tags; + return dispatch("collection-management", request, undefined).pipe( + Effect.flatMap((response) => { + const collections = response.collections; + return Array.isArray(collections) && collections.length > 0 + ? Effect.succeed(collections[0]) + : Effect.fail(WorkbenchPromiseError.make({ cause: response, message: "update-collection: failed to update collection" })); + }), + ); + }, + deleteCollection: (collection: string) => + dispatch("collection-management", { + operation: "delete-collection", + user, + collection, + }, undefined), + }), + flow: (flowId: string) => ({ + triplesQuery: Effect.fn("trustgraph.workbench.gateway.flow.triplesQuery")(function*( + s?: Term, + p?: Term, + o?: Term, + limit?: number, + collection?: string, + graph?: string, + ) { + const request: JsonRecord = { + limit: limit ?? 20, + user, + collection: withDefaultCollection(collection ?? "default"), + }; + if (s !== undefined) request.s = s; + if (p !== undefined) request.p = p; + if (o !== undefined) request.o = o; + if (graph !== undefined) request.g = graph; + return yield* flowDispatch(flowId, "triples", request).pipe( + Effect.map((response) => (response.triples ?? response.response ?? []) as Triple[]), + ); + }), + graphRagStreaming: Effect.fn("trustgraph.workbench.gateway.flow.graphRagStreaming")(function*( + text: string, + receiver: (chunk: string, complete: boolean, metadata?: StreamingMetadata) => void, + onError: (error: string) => void, + options?: GraphRagOptions, + collection?: string, + onExplain?: (event: ExplainEvent) => void, + ) { + const recv = (message: unknown): boolean => { + const msg = streamingEnvelopeFrom(message); + if (msg.error !== undefined) { + onError(msg.error); + return true; + } + + const resp = msg.response ?? {}; + const responseError = responseErrorMessage(resp); + if (responseError !== undefined) { + onError(responseError); + return true; + } + + const messageType = stringProperty(resp, "message_type"); + const explainId = stringProperty(resp, "explain_id"); + const explainTriples = explainTriplesFrom(resp); + if ( + messageType === "explain" && + (explainId !== undefined || explainTriples !== undefined) + ) { + const event: ExplainEvent = { + explainId: explainId ?? "", + explainGraph: stringProperty(resp, "explain_graph") ?? "", + }; + if (explainTriples !== undefined) { + event.explainTriples = explainTriples; + } + onExplain?.(event); + if ( + stringProperty(resp, "response") === undefined && + booleanProperty(resp, "endOfStream") !== true && + booleanProperty(resp, "end_of_session") !== true + ) { + return false; + } + } + + const chunk = stringProperty(resp, "response") ?? stringProperty(resp, "chunk") ?? ""; + const complete = streamComplete(msg, resp, ["end_of_session", "endOfStream"]); + const metadata = complete ? streamingMetadataFrom(resp) : undefined; + receiver(chunk, complete, metadata); + return complete; + }; + + const request: JsonRecord = { + query: text, + user, + collection: withDefaultCollection(collection ?? "default"), + streaming: true, + }; + if (options?.entityLimit !== undefined) request["entity-limit"] = options.entityLimit; + if (options?.tripleLimit !== undefined) request["triple-limit"] = options.tripleLimit; + if (options?.maxSubgraphSize !== undefined) request["max-subgraph-size"] = options.maxSubgraphSize; + if (options?.pathLength !== undefined) request["max-path-length"] = options.pathLength; + + return yield* dispatchStream("graph-rag", request, flowId, recv, "Graph RAG", onError); + }), + documentRagStreaming: Effect.fn("trustgraph.workbench.gateway.flow.documentRagStreaming")(function*( + text: string, + receiver: (chunk: string, complete: boolean, metadata?: StreamingMetadata) => void, + onError: (error: string) => void, + docLimit?: number, + collection?: string, + onExplain?: (event: ExplainEvent) => void, + ) { + const recv = (message: unknown): boolean => { + const msg = streamingEnvelopeFrom(message); + if (msg.error !== undefined) { + onError(msg.error); + return true; + } + + const resp = msg.response ?? {}; + const responseError = responseErrorMessage(resp); + if (responseError !== undefined) { + onError(responseError); + return true; + } + + const explainId = stringProperty(resp, "explain_id"); + const explainGraph = stringProperty(resp, "explain_graph"); + if ( + stringProperty(resp, "message_type") === "explain" && + explainId !== undefined && + explainGraph !== undefined + ) { + onExplain?.({ + explainId, + explainGraph, + }); + return false; + } + + const chunk = stringProperty(resp, "response") ?? stringProperty(resp, "chunk") ?? ""; + const complete = streamComplete(msg, resp, ["end_of_session", "endOfStream"]); + const metadata = complete ? streamingMetadataFrom(resp) : undefined; + receiver(chunk, complete, metadata); + return complete; + }; + + const request: JsonRecord = { + query: text, + user, + collection: withDefaultCollection(collection ?? "default"), + streaming: true, + }; + if (docLimit !== undefined) request["doc-limit"] = docLimit; + + return yield* dispatchStream("document-rag", request, flowId, recv, "Document RAG", onError); + }), + agent: Effect.fn("trustgraph.workbench.gateway.flow.agent")(function*( + question: string, + think: (chunk: string, complete: boolean, metadata?: StreamingMetadata) => void, + observe: (chunk: string, complete: boolean, metadata?: StreamingMetadata) => void, + answer: (chunk: string, complete: boolean, metadata?: StreamingMetadata) => void, + error: (e: string) => void, + onExplain?: (event: ExplainEvent) => void, + collection?: string, + ) { + const recv = (message: unknown): boolean => { + const msg = streamingEnvelopeFrom(message); + if (msg.error !== undefined) { + error(msg.error); + return true; + } + + const resp = msg.response ?? {}; + const responseError = responseErrorMessage(resp); + if (stringProperty(resp, "chunk_type") === "error" || responseError !== undefined) { + error(responseError ?? "Unknown agent error"); + return true; + } + + const chunkType = stringProperty(resp, "chunk_type"); + const messageType = stringProperty(resp, "message_type"); + const explainId = stringProperty(resp, "explain_id"); + const explainTriples = explainTriplesFrom(resp); + if ( + (chunkType === "explain" || messageType === "explain") && + (explainId !== undefined || explainTriples !== undefined) + ) { + const event: ExplainEvent = { + explainId: explainId ?? "", + explainGraph: stringProperty(resp, "explain_graph") ?? "", + }; + if (explainTriples !== undefined) { + event.explainTriples = explainTriples; + } + onExplain?.(event); + return false; + } + + const content = stringProperty(resp, "content") ?? ""; + const messageComplete = booleanProperty(resp, "end_of_message") === true; + const dialogComplete = streamComplete(msg, resp, ["end_of_dialog"]); + const metadata = dialogComplete ? streamingMetadataFrom(resp) : undefined; + + Match.value(chunkType).pipe( + Match.when("thought", () => think(content, messageComplete, metadata)), + Match.when("observation", () => observe(content, messageComplete, metadata)), + Match.when("answer", () => answer(content, messageComplete, metadata)), + Match.when("final-answer", () => answer(content, messageComplete, metadata)), + Match.when("action", () => undefined), + Match.orElse(() => undefined), + ); + + return dialogComplete; + }; + + return yield* dispatchStream( + "agent", + { + question, + user, + collection: withDefaultCollection(collection ?? "default"), + streaming: true, + }, + flowId, + recv, + "Agent", + error, + ); + }), + }), + }; +} + +type WorkbenchGatewayApi = ReturnType; + function queryAtom( name: string, - fetcher: (get: Atom.AtomContext, api: BaseApi) => Effect.Effect, + fetcher: (get: Atom.AtomContext, api: WorkbenchGatewayApi) => Effect.Effect, options?: { readonly reactivityKeys?: WorkbenchReactivityKeys; }, ) { const readQuery = Effect.fn(`trustgraph.workbench.query.${name}`)(function*(get: Atom.AtomContext) { - const api = get(apiAtom); + const api = makeWorkbenchGatewayApi(get(settingsAtom)); return yield* fetcher(get, api).pipe( Effect.tap(() => Metric.update(queryCounter, 1)), Effect.tapError((error) => Effect.logError(`[workbench:${name}] query failed`, { error })), ); }); - const atom = workbenchRuntime.atom((get) => readQuery(get)); + const atom = WorkbenchGatewayHttp.runtime.atom((get) => readQuery(get)); const reactiveAtom = options?.reactivityKeys === undefined ? atom : workbenchRuntime.factory.withReactivity(options.reactivityKeys)(atom); @@ -591,18 +1425,49 @@ function localCommandAtom( name: string, - run: (arg: Arg, get: Atom.FnContext, api: BaseApi) => Effect.Effect, + run: (arg: Arg, get: Atom.FnContext, api: WorkbenchGatewayApi) => Effect.Effect, options?: CommandOptions, ) { const runCommand = Effect.fn(`trustgraph.workbench.command.${name}`)(function*(arg: Arg, get: Atom.FnContext) { - const api = get(apiAtom); + const api = makeWorkbenchGatewayApi(get(settingsAtom)); return yield* run(arg, get, api).pipe( Effect.tap(() => Metric.update(mutationCounter, 1)), Effect.tapError((error) => Effect.logError(`[workbench:${name}] command failed`, { error })), ); }); - return workbenchRuntime.fn()(runCommand, options); + // Browser-only services like WorkbenchFiles are installed as global layers on + // workbenchRuntimeFactory, but AtomRuntime's type parameter only tracks the + // HTTP service layer passed to AtomHttpApi.Service. + return WorkbenchGatewayHttp.runtime.fn()( + runCommand as ( + arg: Arg, + get: Atom.FnContext, + ) => Effect.Effect, + options, + ); +} + +function gatewayCommandAtom( + name: string, + run: (arg: Arg, get: Atom.FnContext, api: WorkbenchGatewayApi) => Effect.Effect, + options?: CommandOptions, +) { + const runCommand = Effect.fn(`trustgraph.workbench.command.${name}`)(function*(arg: Arg, get: Atom.FnContext) { + const api = makeWorkbenchGatewayApi(get(settingsAtom)); + return yield* run(arg, get, api).pipe( + Effect.tap(() => Metric.update(mutationCounter, 1)), + Effect.tapError((error) => Effect.logError(`[workbench:${name}] command failed`, { error })), + ); + }); + + return workbenchGatewayRuntime.fn()( + runCommand as ( + arg: Arg, + get: Atom.FnContext, + ) => Effect.Effect, + options, + ); } function setActivity(get: Atom.FnContext, label: string, active: boolean): void { @@ -790,7 +1655,7 @@ export const toggleThemeAtom = Atom.writable( const liveApiFactory: WorkbenchApiFactory = { create: (settings) => - new BaseApi( + makeBaseApi( settings.user, settings.apiKey.length > 0 ? settings.apiKey : undefined, settings.gatewayUrl.length > 0 ? settings.gatewayUrl : undefined, @@ -825,10 +1690,10 @@ export const connectionStateAtom = Atom.make((get) => { export const flowsAtom = queryAtom( "flows", Effect.fn("trustgraph.workbench.flows")(function*(_get, api) { - const ids = yield* promiseBoundary(() => api.flows().getFlows()); + const ids = yield* api.flows().getFlows(); return yield* Effect.all( ids.map((id) => - promiseBoundary(() => api.flows().getFlow(id)).pipe( + api.flows().getFlow(id).pipe( Effect.map((definition) => ({ id, ...(typeof definition === "object" && definition !== null ? definition : {}), @@ -844,7 +1709,7 @@ export const flowsAtom = queryAtom( export const flowBlueprintsAtom = queryAtom( "flowBlueprints", Effect.fn("trustgraph.workbench.flowBlueprints")(function*(_get, api) { - const list = yield* promiseBoundary(() => api.flows().getFlowBlueprints()); + const list = yield* api.flows().getFlowBlueprints(); return list ?? []; }), { reactivityKeys: ["flows", "flow-blueprints"] }, @@ -855,7 +1720,7 @@ export const flowBlueprintAtom = Atom.family((name: string) => `flowBlueprint.${name}`, Effect.fn("trustgraph.workbench.flowBlueprint")(function*(_get, api) { if (name.length === 0) return null; - return yield* promiseBoundary(() => api.flows().getFlowBlueprint(name)); + return yield* api.flows().getFlowBlueprint(name); }), { reactivityKeys: ["flow-blueprint", name] }, ).pipe(Atom.setIdleTTL("10 minutes")), @@ -864,7 +1729,7 @@ export const flowBlueprintAtom = Atom.family((name: string) => export const configAllAtom = queryAtom( "configAll", Effect.fn("trustgraph.workbench.configAll")(function*(_get, api) { - return yield* promiseBoundary(() => api.config().getConfigAll()); + return yield* api.config().getConfigAll(); }), { reactivityKeys: ["config"] }, ).pipe(Atom.setIdleTTL("2 minutes")); @@ -872,7 +1737,7 @@ export const configAllAtom = queryAtom( export const promptsAtom = queryAtom( "prompts", Effect.fn("trustgraph.workbench.prompts")(function*(_get, api) { - return yield* promiseBoundary(() => api.config().getPrompts()); + return yield* api.config().getPrompts(); }), { reactivityKeys: ["config", "prompts"] }, ).pipe(Atom.setIdleTTL("2 minutes")); @@ -880,7 +1745,7 @@ export const promptsAtom = queryAtom( export const systemPromptAtom = queryAtom( "systemPrompt", Effect.fn("trustgraph.workbench.systemPrompt")(function*(_get, api) { - const prompt = yield* promiseBoundary(() => api.config().getSystemPrompt()); + const prompt = yield* api.config().getSystemPrompt(); return typeof prompt === "string" ? prompt : encodeJsonUnknownString(prompt); }), { reactivityKeys: ["config", "system-prompt"] }, @@ -891,7 +1756,7 @@ export const promptDetailAtom = Atom.family((id: string) => `prompt.${id}`, Effect.fn("trustgraph.workbench.promptDetail")(function*(_get, api) { if (id.length === 0) return null; - return yield* promiseBoundary(() => api.config().getPrompt(id)); + return yield* api.config().getPrompt(id); }), { reactivityKeys: ["config", "prompt", id] }, ).pipe(Atom.setIdleTTL("2 minutes")), @@ -900,7 +1765,7 @@ export const promptDetailAtom = Atom.family((id: string) => export const tokenCostsAtom = queryAtom( "tokenCosts", Effect.fn("trustgraph.workbench.tokenCosts")(function*(_get, api) { - const data = yield* promiseBoundary(() => api.config().getTokenCosts()); + const data = yield* api.config().getTokenCosts(); return Array.isArray(data) ? data.map((item: Record) => ({ model: String(item.model ?? ""), @@ -915,7 +1780,7 @@ export const tokenCostsAtom = queryAtom( export const mcpServersAtom = queryAtom( "mcpServers", Effect.fn("trustgraph.workbench.mcpServers")(function*(_get, api) { - const values = yield* promiseBoundary(() => api.config().getValues("mcp")); + const values = yield* api.config().getValues("mcp"); return parseConfigEntries(values, "MCP server config"); }), { reactivityKeys: ["config", "mcp"] }, @@ -924,7 +1789,7 @@ export const mcpServersAtom = queryAtom( export const mcpToolsAtom = queryAtom( "mcpTools", Effect.fn("trustgraph.workbench.mcpTools")(function*(_get, api) { - const values = yield* promiseBoundary(() => api.config().getValues("tool")); + const values = yield* api.config().getValues("tool"); return parseConfigEntries(values, "tool config"); }), { reactivityKeys: ["config", "tool"] }, @@ -933,7 +1798,7 @@ export const mcpToolsAtom = queryAtom( export const libraryDocumentsAtom = queryAtom( "libraryDocuments", Effect.fn("trustgraph.workbench.libraryDocuments")(function*(_get, api) { - return yield* promiseBoundary(() => api.librarian().getDocuments()); + return yield* api.librarian().getDocuments(); }), { reactivityKeys: ["library", "documents"] }, ).pipe(Atom.setIdleTTL("1 minute")); @@ -941,7 +1806,7 @@ export const libraryDocumentsAtom = queryAtom( export const libraryProcessingAtom = queryAtom( "libraryProcessing", Effect.fn("trustgraph.workbench.libraryProcessing")(function*(_get, api) { - return (yield* promiseBoundary(() => api.librarian().getProcessing())) as ProcessingMetadata[]; + return (yield* api.librarian().getProcessing()) as ProcessingMetadata[]; }), { reactivityKeys: ["library", "processing"] }, ).pipe(Atom.setIdleTTL("30 seconds")); @@ -951,7 +1816,7 @@ export const documentMetadataAtom = Atom.family((documentId: string) => `documentMetadata.${documentId}`, Effect.fn("trustgraph.workbench.documentMetadata")(function*(_get, api) { if (documentId.length === 0) return null; - return yield* promiseBoundary(() => api.librarian().getDocumentMetadata(documentId)); + return yield* api.librarian().getDocumentMetadata(documentId); }), { reactivityKeys: ["library", "document", documentId] }, ).pipe(Atom.setIdleTTL("5 minutes")), @@ -960,7 +1825,7 @@ export const documentMetadataAtom = Atom.family((documentId: string) => export const kgCoresAtom = queryAtom( "kgCores", Effect.fn("trustgraph.workbench.kgCores")(function*(_get, api) { - return yield* promiseBoundary(() => api.knowledge().getKnowledgeCores()); + return yield* api.knowledge().getKnowledgeCores(); }), { reactivityKeys: ["knowledge", "kg-cores"] }, ).pipe(Atom.setIdleTTL("2 minutes")); @@ -968,7 +1833,7 @@ export const kgCoresAtom = queryAtom( export const deCoresAtom = queryAtom( "deCores", Effect.fn("trustgraph.workbench.deCores")(function*(_get, api) { - return yield* promiseBoundary(() => api.knowledge().getDocumentEmbeddingCores()); + return yield* api.knowledge().getDocumentEmbeddingCores(); }), { reactivityKeys: ["knowledge", "de-cores"] }, ).pipe(Atom.setIdleTTL("2 minutes")); @@ -976,7 +1841,7 @@ export const deCoresAtom = queryAtom( export const collectionsAtom = queryAtom( "collections", Effect.fn("trustgraph.workbench.collections")(function*(_get, api) { - const collections = (yield* promiseBoundary(() => api.collectionManagement().listCollections())) as CollectionSummary[]; + const collections = (yield* api.collectionManagement().listCollections()) as CollectionSummary[]; const list = Array.isArray(collections) ? [...collections] : []; const hasDefault = list.some((item) => (item.collection ?? item.id ?? item.name) === "default"); if (!hasDefault) list.unshift({ id: "default", collection: "default", name: "default" }); @@ -1029,7 +1894,7 @@ const graphTriplesAtomByKey = Atom.family((key: string) => { return queryAtom( `graphTriples.${input.flowId}.${input.collection}.${input.limit}`, Effect.fn("trustgraph.workbench.graphTriples")(function*(_get, api) { - return yield* promiseBoundary(() => + return yield* ( api.flow(input.flowId).triplesQuery(undefined, undefined, undefined, input.limit, input.collection) ); }), @@ -1053,7 +1918,7 @@ const explainTriplesAtomByKey = Atom.family((key: string) => ); const results = yield* Effect.all( graphUris.map((event) => - promiseBoundary(() => + ( api.flow(input.flowId) .triplesQuery(undefined, undefined, undefined, 500, input.collection, event.explainGraph) ).pipe(Effect.orElseSucceed((): Array => [])) @@ -1200,7 +2065,7 @@ export const startFlowAtom = commandAtom< yield* withActivity( get, `Start flow ${input.id}`, - promiseBoundary(() => api.flows().startFlow(input.id, input.blueprint, input.description, input.parameters)).pipe( + api.flows().startFlow(input.id, input.blueprint, input.description, input.parameters).pipe( Effect.tap(() => Effect.sync(() => { get.refresh(flowsAtom); get.set(flowIdAtom, input.id); @@ -1218,7 +2083,7 @@ export const stopFlowAtom = commandAtom("stopFlow", Effect.fn("tru yield* withActivity( get, `Stop flow ${id}`, - promiseBoundary(() => api.flows().stopFlow(id)).pipe( + api.flows().stopFlow(id).pipe( Effect.tap(() => Effect.sync(() => { get.refresh(flowsAtom); get.set(pushNotificationAtom, { @@ -1234,7 +2099,7 @@ export const stopFlowAtom = commandAtom("stopFlow", Effect.fn("tru export const saveMcpServerAtom = commandAtom<{ key: string; config: McpServerConfig }, void>( "saveMcpServer", Effect.fn("trustgraph.workbench.saveMcpServer")(function*({ key, config }, get, api) { - yield* promiseBoundary(() => api.config().putConfig([{ type: "mcp", key, value: encodeJsonUnknownString(config) }])); + yield* api.config().putConfig([{ type: "mcp", key, value: encodeJsonUnknownString(config) }]); get.refresh(mcpServersAtom); get.refresh(configAllAtom); get.set(pushNotificationAtom, { type: "success", title: "MCP server saved", description: key }); @@ -1243,7 +2108,7 @@ export const saveMcpServerAtom = commandAtom<{ key: string; config: McpServerCon ); export const deleteMcpServerAtom = commandAtom("deleteMcpServer", Effect.fn("trustgraph.workbench.deleteMcpServer")(function*(key, get, api) { - yield* promiseBoundary(() => api.config().deleteConfig({ type: "mcp", key })); + yield* api.config().deleteConfig({ type: "mcp", key }); get.refresh(mcpServersAtom); get.refresh(configAllAtom); get.set(pushNotificationAtom, { type: "success", title: "MCP server deleted", description: key }); @@ -1252,7 +2117,7 @@ export const deleteMcpServerAtom = commandAtom("deleteMcpServer", export const saveMcpToolAtom = commandAtom<{ key: string; config: ToolConfig }, void>( "saveMcpTool", Effect.fn("trustgraph.workbench.saveMcpTool")(function*({ key, config }, get, api) { - yield* promiseBoundary(() => api.config().putConfig([{ type: "tool", key, value: encodeJsonUnknownString(config) }])); + yield* api.config().putConfig([{ type: "tool", key, value: encodeJsonUnknownString(config) }]); get.refresh(mcpToolsAtom); get.refresh(configAllAtom); get.set(pushNotificationAtom, { type: "success", title: "Tool saved", description: key }); @@ -1261,7 +2126,7 @@ export const saveMcpToolAtom = commandAtom<{ key: string; config: ToolConfig }, ); export const deleteMcpToolAtom = commandAtom("deleteMcpTool", Effect.fn("trustgraph.workbench.deleteMcpTool")(function*(key, get, api) { - yield* promiseBoundary(() => api.config().deleteConfig({ type: "tool", key })); + yield* api.config().deleteConfig({ type: "tool", key }); get.refresh(mcpToolsAtom); get.refresh(configAllAtom); get.set(pushNotificationAtom, { type: "success", title: "Tool deleted", description: key }); @@ -1280,12 +2145,12 @@ export interface UploadDocumentInput { const uploadDocumentEffect = Effect.fn("trustgraph.workbench.uploadDocument.effect")(function*( input: UploadDocumentInput, get: Atom.FnContext, - api: BaseApi, + api: WorkbenchGatewayApi, ) { yield* withActivity( get, "Upload document", - promiseBoundary(() => api.librarian().loadDocument(input.base64, input.mimeType, input.title, input.comments, input.tags)).pipe( + api.librarian().loadDocument(input.base64, input.mimeType, input.title, input.comments, input.tags).pipe( Effect.tap(() => Effect.sync(() => { get.refresh(libraryDocumentsAtom); get.refresh(libraryProcessingAtom); @@ -1302,7 +2167,7 @@ const uploadDocumentEffect = Effect.fn("trustgraph.workbench.uploadDocument.effe const uploadDocumentChunkedEffect = Effect.fn("trustgraph.workbench.uploadDocumentChunked.effect")(function*( input: UploadDocumentInput, get: Atom.FnContext, - api: BaseApi, + api: WorkbenchGatewayApi, ) { yield* withActivity(get, "Upload document (chunked)", Effect.gen(function*() { const totalSize = input.base64.length; @@ -1316,7 +2181,7 @@ const uploadDocumentChunkedEffect = Effect.fn("trustgraph.workbench.uploadDocume const lib = api.librarian(); const documentId = yield* randomId("upload"); const timestamp = yield* Clock.currentTimeMillis; - const beginResp = yield* promiseBoundary(() => lib.beginUpload({ + const beginResp = yield* lib.beginUpload({ id: documentId, time: Math.floor(timestamp / 1000), kind: input.mimeType, @@ -1324,7 +2189,7 @@ const uploadDocumentChunkedEffect = Effect.fn("trustgraph.workbench.uploadDocume comments: input.comments, tags: input.tags, user: get(settingsAtom).user, - }, totalSize)); + }, totalSize); const uploadId = beginResp["upload-id"]; const chunkSize = beginResp["chunk-size"]; const totalChunks = beginResp["total-chunks"]; @@ -1333,7 +2198,7 @@ const uploadDocumentChunkedEffect = Effect.fn("trustgraph.workbench.uploadDocume const start = i * chunkSize; const end = Math.min(start + chunkSize, totalSize); const chunk = input.base64.slice(start, end); - yield* promiseBoundary(() => lib.uploadChunk(uploadId, i, chunk)); + yield* lib.uploadChunk(uploadId, i, chunk); bytesUploaded += chunk.length; get.set(uploadFormAtom, { ...get(uploadFormAtom), progress: { phase: "uploading", @@ -1350,7 +2215,7 @@ const uploadDocumentChunkedEffect = Effect.fn("trustgraph.workbench.uploadDocume bytesTotal: totalSize, bytesUploaded: totalSize, } }); - yield* promiseBoundary(() => lib.completeUpload(uploadId)); + yield* lib.completeUpload(uploadId); get.refresh(libraryDocumentsAtom); get.refresh(libraryProcessingAtom); get.set(pushNotificationAtom, { @@ -1426,7 +2291,7 @@ export const removeDocumentAtom = commandAtom("removeDocument", Ef yield* withActivity( get, "Remove document", - promiseBoundary(() => api.librarian().removeDocument(documentId, get(settingsAtom).collection)).pipe( + api.librarian().removeDocument(documentId, get(settingsAtom).collection).pipe( Effect.tap(() => Effect.sync(() => { get.refresh(libraryDocumentsAtom); get.refresh(libraryProcessingAtom); @@ -1438,7 +2303,7 @@ export const removeDocumentAtom = commandAtom("removeDocument", Ef export const loadKgCoreAtom = commandAtom("loadKgCore", Effect.fn("trustgraph.workbench.loadKgCore")(function*(id, get, api) { get.set(activeActionAtom, id); - yield* promiseBoundary(() => api.knowledge().loadKgCore(id, get(flowIdAtom), get(settingsAtom).collection)).pipe( + yield* api.knowledge().loadKgCore(id, get(flowIdAtom), get(settingsAtom).collection).pipe( Effect.tap(() => Effect.sync(() => { get.set(pushNotificationAtom, { type: "success", title: "Core loaded", description: id }); })), @@ -1448,7 +2313,7 @@ export const loadKgCoreAtom = commandAtom("loadKgCore", Effect.fn( export const deleteKgCoreAtom = commandAtom("deleteKgCore", Effect.fn("trustgraph.workbench.deleteKgCore")(function*(id, get, api) { get.set(activeActionAtom, id); - yield* promiseBoundary(() => api.knowledge().deleteKgCore(id, get(settingsAtom).collection)).pipe( + yield* api.knowledge().deleteKgCore(id, get(settingsAtom).collection).pipe( Effect.tap(() => Effect.sync(() => { get.refresh(kgCoresAtom); get.set(pushNotificationAtom, { type: "success", title: "Core deleted", description: id }); @@ -1460,13 +2325,12 @@ export const deleteKgCoreAtom = commandAtom("deleteKgCore", Effect export const createCollectionAtom = commandAtom("createCollection", Effect.fn("trustgraph.workbench.createCollection")(function*(form, get, api) { const id = form.id.trim(); const tags = form.tags.split(",").map((tag) => tag.trim()).filter((tag) => tag.length > 0); - yield* promiseBoundary(() => api.collectionManagement().updateCollection( + yield* api.collectionManagement().updateCollection( id, form.name.trim().length > 0 ? form.name.trim() : undefined, form.description.trim().length > 0 ? form.description.trim() : undefined, tags.length > 0 ? tags : undefined, - )); - get.set(settingsAtom, { ...get(settingsAtom), collection: id }); + ); get.refresh(collectionsAtom); get.set(pushNotificationAtom, { type: "success", @@ -1476,7 +2340,7 @@ export const createCollectionAtom = commandAtom("createCol }), { reactivityKeys: ["collections"] }); export const deleteCollectionAtom = commandAtom("deleteCollection", Effect.fn("trustgraph.workbench.deleteCollection")(function*(id, get, api) { - yield* promiseBoundary(() => api.collectionManagement().deleteCollection(id)); + yield* api.collectionManagement().deleteCollection(id); get.refresh(collectionsAtom); const current = get(settingsAtom); if (current.collection === id) { @@ -1485,7 +2349,7 @@ export const deleteCollectionAtom = commandAtom("deleteCollection" get.set(pushNotificationAtom, { type: "success", title: "Collection deleted", description: id }); }), { reactivityKeys: ["collections"] }); -export const submitMessageAtom = commandAtom<{ input: string }, void>( +export const submitMessageAtom = gatewayCommandAtom<{ input: string }, void>( "submitMessage", Effect.fn("trustgraph.workbench.submitMessage")(function*({ input }, get, api) { const trimmed = input.trim(); @@ -1576,14 +2440,14 @@ export const submitMessageAtom = commandAtom<{ input: string }, void>( explainEvents.push(event); }; - Match.value(chatMode).pipe( - Match.when("graph-rag", () => { - flow.graphRagStreaming(trimmed, onChunk, onError, undefined, collection, onExplain); - }), - Match.when("document-rag", () => { - flow.documentRagStreaming(trimmed, onChunk, onError, undefined, collection, onExplain); - }), - Match.when("agent", () => { + yield* Match.value(chatMode).pipe( + Match.when("graph-rag", () => + flow.graphRagStreaming(trimmed, onChunk, onError, undefined, collection, onExplain) + ), + Match.when("document-rag", () => + flow.documentRagStreaming(trimmed, onChunk, onError, undefined, collection, onExplain) + ), + Match.when("agent", () => flow.agent( trimmed, (chunk, complete) => { @@ -1632,8 +2496,8 @@ export const submitMessageAtom = commandAtom<{ input: string }, void>( onError, onExplain, collection, - ); - }), + ) + ), Match.exhaustive, ); }), diff --git a/ts/packages/workbench/src/components/notification-toasts.tsx b/ts/packages/workbench/src/components/notification-toasts.tsx index ac34e14e..e2c0f29c 100644 --- a/ts/packages/workbench/src/components/notification-toasts.tsx +++ b/ts/packages/workbench/src/components/notification-toasts.tsx @@ -20,12 +20,12 @@ export function NotificationToasts() { if (notifications.length === 0) return null; return ( -
+
{notifications.map((n) => (
@@ -37,7 +37,7 @@ export function NotificationToasts() {