From 44110c5bb4ed2df576da6d4b591f1afa52a5ba42 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 03:23:23 -0500 Subject: [PATCH] Add typed flow spec accessors --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 82 +++++---- .../src/__tests__/flow-spec-runtime.test.ts | 22 ++- ts/packages/base/src/processor/flow.ts | 158 ++++++++++++++---- .../base/src/services/embeddings-service.ts | 6 +- ts/packages/base/src/services/llm-service.ts | 8 +- ts/packages/base/src/spec/producer-spec.ts | 40 +++++ .../base/src/spec/request-response-spec.ts | 40 +++++ .../flow/src/agent/mcp-tool/service.ts | 6 +- ts/packages/flow/src/agent/react/service.ts | 80 ++++----- ts/packages/flow/src/chunking/service.ts | 8 +- ts/packages/flow/src/decoding/pdf-decoder.ts | 26 +-- .../flow/src/extract/knowledge-extract.ts | 37 ++-- ts/packages/flow/src/prompt/template.ts | 5 +- .../query/embeddings/qdrant-doc-service.ts | 6 +- .../query/embeddings/qdrant-graph-service.ts | 6 +- .../src/query/triples/falkordb-service.ts | 6 +- .../src/retrieval/document-rag-service.ts | 58 ++++--- .../flow/src/retrieval/graph-rag-service.ts | 71 ++++---- .../embeddings/graph-embeddings-service.ts | 15 +- 19 files changed, 457 insertions(+), 223 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index befb2947..83ce30e2 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,13 +12,13 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 Base parameter -spec accessor slice: +Current signal counts from `ts/packages` after the 2026-06-02 Base +producer/requestor spec accessor slice: | Signal | Count | | --- | ---: | | `Effect.runPromise` | 168 | -| `Map<` | 82 | +| `Map<` | 84 | | `WebSocket` | 62 | | `new Map` | 62 | | `toPromiseRequestor` | 0 | @@ -83,6 +83,11 @@ Notes: `flow.parameter(spec)`. Bare string parameter lookup remains available as an `unknown` compatibility escape, while typed parameter access now decodes through Schema and fails with a tagged `FlowParameterDecodeError`. +- The base producer/requestor spec accessor slice added typed spec-object + accessors for `ProducerSpec` and `RequestResponseSpec`, then + migrated flow service producer/requestor lookups off caller-chosen generic + string calls. Spec object handles are scoped per `Flow` through WeakMaps and + finalizers delete only the handle they registered. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -647,6 +652,42 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Base Producer And Requestor Spec Accessor Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/base/src/spec/producer-spec.ts` now exposes + `ProducerSpec.producerEffect(flow)` and stores typed producer handles in + a per-spec WeakMap keyed by `Flow`. + - `ts/packages/base/src/spec/request-response-spec.ts` now exposes + `RequestResponseSpec.requestorEffect(flow)` and stores typed + requestor handles in a per-spec WeakMap keyed by `Flow`. + - Spec finalizers remove only the exact handle they registered, avoiding + stale finalizers deleting newer registrations for the same flow/spec pair. + - `ts/packages/base/src/processor/flow.ts` now supports + `flow.producerEffect(spec)`, `flow.requestorEffect(spec)`, + `flow.producer(spec)`, and `flow.requestor(spec)` while keeping string + accessors as untyped compatibility escapes. + - Base service adapters and flow service handlers now reuse the same hoisted + producer/requestor spec object in their spec arrays and handler lookups. + - `ts/packages/base/src/__tests__/flow-spec-runtime.test.ts` covers typed + spec-object lookups, duplicate spec identity failures, and scoped + finalizer cleanup for producer and requestor handles. +- Remaining: + - Bare string `Flow` producer/requestor accessors remain compatibility + escapes for external/legacy callers, but new Effect service code should use + spec objects. +- Verification: + - `bun run --cwd ts/packages/base test -- src/__tests__/flow-spec-runtime.test.ts` + - `bun run --cwd ts/packages/base build` + - `bun run --cwd ts/packages/flow build` + - `bun run --cwd ts/packages/base test` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -673,10 +714,10 @@ Notes: layers. - Existing constructor shims preserve callable-plus-newable public exports; removing them needs a public API split or real class redesign. - - Typed string registries in `Flow` now have Schema-backed parameter specs. - Producer and requestor typed spec-object accessors remain. Effect - `HashMap`/`MutableHashMap` can improve lookup ergonomics with `Option`, but - it does not remove the string-key type hole by itself. + - Typed string registries in `Flow` now have Schema-backed parameter specs + and typed producer/requestor spec-object accessors. New service handlers + should hoist spec objects and use those accessors; bare string accessors + remain compatibility escapes. - Gateway/client: - `EffectRpcClient` now owns its socket/RPC layer with `ManagedRuntime`. Socket errors/JSON parsing now use tagged errors and Schema decoding. @@ -695,26 +736,6 @@ Notes: ## Ranked Findings -### P1: Base Typed Producer And Requestor Spec Accessors - -- TrustGraph evidence: - - `ts/packages/base/src/processor/flow.ts` - - `ts/packages/base/src/spec/producer-spec.ts` - - `ts/packages/base/src/spec/request-response-spec.ts` -- Effect primitives: - - Typed spec-object registries, `Context`, `Layer`, `Effect.fn`, `Option`, - `Predicate`, `HashMap`/`MutableHashMap`. -- Rewrite shape: - - Parameter specs are now Schema-backed and support - `flow.parameterEffect(spec)` / `flow.parameter(spec)`. - - Add typed spec-object accessors for producers and requestors so call sites - stop spelling generic string lookups. - - Do not add assertions to quiet Effect channel inference problems. -- Tests: - - `cd ts && bun run --cwd packages/base test` - - Root `cd ts && bun run check` because this surface easily pollutes Effect - error and requirement channels. - ### P1: Make SDK, Storage, And Provider Layers Managed Resources - TrustGraph evidence: @@ -765,10 +786,9 @@ Notes: ## Recommended PR Order -1. Complete base typed producer/requestor spec accessors. -2. Gateway RPC callback and client streaming completion cleanup. -3. Storage/provider managed resource cleanup. -4. MCP parity/deletion decision and workbench platform polish. +1. Gateway RPC callback and client streaming completion cleanup. +2. Storage/provider managed resource cleanup. +3. MCP parity/deletion decision and workbench platform polish. ## No-Op Rules diff --git a/ts/packages/base/src/__tests__/flow-spec-runtime.test.ts b/ts/packages/base/src/__tests__/flow-spec-runtime.test.ts index ef3c64c4..373e1e14 100644 --- a/ts/packages/base/src/__tests__/flow-spec-runtime.test.ts +++ b/ts/packages/base/src/__tests__/flow-spec-runtime.test.ts @@ -153,12 +153,14 @@ describe("Effect-native flow specifications", () => { "starts producer specs through Effect factories and exposes typed accessors", Effect.fnUntraced(function* () { const backend = new RuntimeBackend(new ScriptedConsumer()); + const outputProducerSpec = makeProducerSpec("output"); + const duplicateOutputProducerSpec = makeProducerSpec("output"); const flow = new Flow( "default", "processor", backend, { topics: { output: "actual-output" } }, - [makeProducerSpec("output")], + [outputProducerSpec], ); yield* Effect.scoped( @@ -166,17 +168,21 @@ describe("Effect-native flow specifications", () => { backend, Effect.gen(function* () { yield* flow.startEffect(); - const producer = yield* flow.producerEffect("output"); + const producer = yield* flow.producerEffect(outputProducerSpec); + const duplicateSpecError = yield* flow.producerEffect(duplicateOutputProducerSpec).pipe(Effect.flip); + expect(duplicateSpecError._tag).toBe("FlowResourceNotFoundError"); yield* producer.send("request-1", "hello"); }), ), ); + const closedProducerError = yield* flow.producerEffect(outputProducerSpec).pipe(Effect.flip); expect(backend.producerOptions).toEqual({ topic: "actual-output" }); expect(backend.producer.sent).toEqual([ { message: "hello", properties: { id: "request-1" } }, ]); expect(backend.producer.closeCount).toBe(1); + expect(closedProducerError._tag).toBe("FlowResourceNotFoundError"); }), ); @@ -229,6 +235,8 @@ describe("Effect-native flow specifications", () => { responseConsumer.push(createMessage("response", { id: properties?.id ?? "" })); }, ); + const requestResponseSpec = makeRequestResponseSpec("rr", "request", "response"); + const duplicateRequestResponseSpec = makeRequestResponseSpec("rr", "request", "response"); const flow = new Flow( "default", "processor", @@ -239,7 +247,7 @@ describe("Effect-native flow specifications", () => { response: "actual-response", }, }, - [makeRequestResponseSpec("rr", "request", "response")], + [requestResponseSpec], ); const response = yield* Effect.scoped( @@ -247,7 +255,9 @@ describe("Effect-native flow specifications", () => { backend, Effect.gen(function* () { yield* flow.startEffect(); - const requestor = flow.requestor("rr"); + const duplicateSpecError = yield* flow.requestorEffect(duplicateRequestResponseSpec).pipe(Effect.flip); + expect(duplicateSpecError._tag).toBe("FlowResourceNotFoundError"); + const requestor = flow.requestor(requestResponseSpec); const fiber = yield* Effect.promise(() => requestor.request("request", { timeoutMs: 250 }), ).pipe(Effect.forkChild); @@ -256,10 +266,12 @@ describe("Effect-native flow specifications", () => { }), ), ); + const closedRequestorError = yield* flow.requestorEffect(requestResponseSpec).pipe(Effect.flip); expect(response).toBe("response"); expect(backend.producerOptions).toEqual({ topic: "actual-request" }); expect(responseConsumer.acknowledged.length).toBe(1); + expect(closedRequestorError._tag).toBe("FlowResourceNotFoundError"); }), ); @@ -282,7 +294,7 @@ describe("Effect-native flow specifications", () => { backend, Effect.gen(function* () { yield* flow.startEffect(); - const producerError = yield* flow.producerEffect("missing-producer").pipe(Effect.flip); + const producerError = yield* flow.producerEffect("missing-producer").pipe(Effect.flip); const parameter = yield* flow.parameterEffect(presentParameter); const legacyParameter = yield* flow.parameterEffect("present"); const parameterError = yield* flow.parameterEffect("missing-parameter").pipe(Effect.flip); diff --git a/ts/packages/base/src/processor/flow.ts b/ts/packages/base/src/processor/flow.ts index 946f6a13..b0152b67 100644 --- a/ts/packages/base/src/processor/flow.ts +++ b/ts/packages/base/src/processor/flow.ts @@ -30,6 +30,8 @@ import { } from "../messaging/runtime.js"; import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js"; import type { ParameterSpec } from "../spec/parameter-spec.js"; +import type { ProducerSpec } from "../spec/producer-spec.js"; +import type { RequestResponseSpec } from "../spec/request-response-spec.js"; import type { Spec, SpecRuntimeRequirements } from "../spec/types.js"; export interface FlowDefinition { @@ -131,6 +133,93 @@ export function makeFlow( throw flowParameterDecodeError(name, spec.name, "Parameter value does not match schema"); }; + const getProducerEffect = ( + producerName: string, + ): Effect.Effect, FlowResourceNotFoundError> => { + const producer = producers.get(producerName); + return producer === undefined + ? Effect.fail(flowResourceNotFoundError(name, "producer", producerName)) + : Effect.succeed(producer); + }; + + const getProducer = (producerName: string): EffectProducer => { + const producer = producers.get(producerName); + if (producer === undefined) throw flowResourceNotFoundError(name, "producer", producerName); + return producer; + }; + + const getRequestorEffect = ( + requestorName: string, + ): Effect.Effect, FlowResourceNotFoundError> => { + const requestor = requestors.get(requestorName); + return requestor === undefined + ? Effect.fail(flowResourceNotFoundError(name, "requestor", requestorName)) + : Effect.succeed(requestor); + }; + + const getRequestor = ( + requestorName: string, + ): EffectRequestResponse => { + const requestor = requestors.get(requestorName); + if (requestor === undefined) throw flowResourceNotFoundError(name, "requestor", requestorName); + return requestor; + }; + + const toFlowProducer = (producer: EffectProducer): FlowProducer => ({ + send: (id, message) => compatibilityRuntime.runPromise(producer.send(id, message)), + flush: () => compatibilityRuntime.runPromise(producer.flush), + stop: () => compatibilityRuntime.runPromise(producer.flush.pipe(Effect.flatMap(() => producer.close))), + }); + + const toFlowRequestor = ( + requestor: EffectRequestResponse, + ): FlowRequestor => ({ + request: (request, options) => + compatibilityRuntime.runPromise( + requestor.request( + request, + toEffectRequestOptions(options), + ), + ), + stop: () => compatibilityRuntime.runPromise(requestor.stop), + }); + + function producerEffect( + producerSpec: ProducerSpec, + ): Effect.Effect, FlowResourceNotFoundError>; + function producerEffect( + producerName: string, + ): Effect.Effect, FlowResourceNotFoundError>; + function producerEffect( + producer: string | ProducerSpec, + ) { + if (typeof producer === "string") { + return getProducerEffect(producer); + } + if (!producers.has(producer.name)) { + return Effect.fail(flowResourceNotFoundError(name, "producer", producer.name)); + } + return producer.producerEffect(flow); + } + + function requestorEffect( + requestorSpec: RequestResponseSpec, + ): Effect.Effect, FlowResourceNotFoundError>; + function requestorEffect( + requestorName: string, + ): Effect.Effect, FlowResourceNotFoundError>; + function requestorEffect( + requestor: string | RequestResponseSpec, + ) { + if (typeof requestor === "string") { + return getRequestorEffect(requestor); + } + if (!requestors.has(requestor.name)) { + return Effect.fail(flowResourceNotFoundError(name, "requestor", requestor.name)); + } + return requestor.requestorEffect(flow); + } + function parameterEffect( parameterSpec: ParameterSpec, ): Effect.Effect; @@ -158,6 +247,34 @@ export function makeFlow( return decodeParameter(parameter, value); } + function producer(producerSpec: ProducerSpec): FlowProducer; + function producer(producerName: string): FlowProducer; + function producer(producer: string | ProducerSpec) { + if (typeof producer === "string") { + return toFlowProducer(getProducer(producer)); + } + if (!producers.has(producer.name)) { + throw flowResourceNotFoundError(name, "producer", producer.name); + } + return toFlowProducer(compatibilityRuntime.runSync(producer.producerEffect(flow))); + } + + function requestor( + requestorSpec: RequestResponseSpec, + ): FlowRequestor; + function requestor(requestorName: string): FlowRequestor; + function requestor( + requestor: string | RequestResponseSpec, + ) { + if (typeof requestor === "string") { + return toFlowRequestor(getRequestor(requestor)); + } + if (!requestors.has(requestor.name)) { + throw flowResourceNotFoundError(name, "requestor", requestor.name); + } + return toFlowRequestor(compatibilityRuntime.runSync(requestor.requestorEffect(flow))); + } + const flow = { name, processorId, @@ -239,36 +356,16 @@ export function makeFlow( setParameter(parameterName: string, value: unknown): void { parameters.set(parameterName, value); }, - producerEffect(producerName: string): Effect.Effect, FlowResourceNotFoundError> { - const p = producers.get(producerName); - return p === undefined - ? Effect.fail(flowResourceNotFoundError(name, "producer", producerName)) - : Effect.succeed(p as EffectProducer); - }, + producerEffect, consumerEffect(consumerName: string): Effect.Effect { const c = consumers.get(consumerName); return c === undefined ? Effect.fail(flowResourceNotFoundError(name, "consumer", consumerName)) : Effect.succeed(c); }, - requestorEffect( - requestorName: string, - ): Effect.Effect, FlowResourceNotFoundError> { - const rr = requestors.get(requestorName); - return rr === undefined - ? Effect.fail(flowResourceNotFoundError(name, "requestor", requestorName)) - : Effect.succeed(rr as EffectRequestResponse); - }, + requestorEffect, parameterEffect, - producer(producerName: string): FlowProducer { - const p = producers.get(producerName); - if (p === undefined) throw flowResourceNotFoundError(name, "producer", producerName); - return { - send: (id, message) => compatibilityRuntime.runPromise((p as EffectProducer).send(id, message)), - flush: () => compatibilityRuntime.runPromise(p.flush), - stop: () => compatibilityRuntime.runPromise(p.flush.pipe(Effect.flatMap(() => p.close))), - }; - }, + producer, consumer(consumerName: string): FlowConsumer { const c = consumers.get(consumerName); if (c === undefined) throw flowResourceNotFoundError(name, "consumer", consumerName); @@ -276,20 +373,7 @@ export function makeFlow( stop: () => compatibilityRuntime.runPromise(c.stop), }; }, - requestor(requestorName: string): FlowRequestor { - const rr = requestors.get(requestorName); - if (rr === undefined) throw flowResourceNotFoundError(name, "requestor", requestorName); - return { - request: (request, options) => - compatibilityRuntime.runPromise( - (rr as EffectRequestResponse).request( - request, - toEffectRequestOptions(options), - ), - ), - stop: () => compatibilityRuntime.runPromise(rr.stop), - }; - }, + requestor, parameter, }; diff --git a/ts/packages/base/src/services/embeddings-service.ts b/ts/packages/base/src/services/embeddings-service.ts index 0789acd5..be75e400 100644 --- a/ts/packages/base/src/services/embeddings-service.ts +++ b/ts/packages/base/src/services/embeddings-service.ts @@ -31,6 +31,8 @@ export class Embeddings extends Context.Service("embeddings-response"); + const onEmbeddingsRequest = Effect.fn("EmbeddingsService.onRequest")(function* ( msg: EmbeddingsRequest, properties: Record, @@ -41,7 +43,7 @@ const onEmbeddingsRequest = Effect.fn("EmbeddingsService.onRequest")(function* ( return; } - const responseProducer = yield* flowCtx.flow.producerEffect("embeddings-response"); + const responseProducer = yield* flowCtx.flow.producerEffect(EmbeddingsResponseProducer); const embeddings = yield* Embeddings; const response = yield* embeddings.embed(msg.text, msg.model).pipe( Effect.map((vectors) => ({ vectors }) satisfies EmbeddingsResponse), @@ -70,7 +72,7 @@ export const makeEmbeddingsSpecs = (): ReadonlyArray> => [ "embeddings-request", onEmbeddingsRequest, ), - makeProducerSpec("embeddings-response"), + EmbeddingsResponseProducer, makeParameterSpec("model"), ]; diff --git a/ts/packages/base/src/services/llm-service.ts b/ts/packages/base/src/services/llm-service.ts index 555c0159..5d2127b0 100644 --- a/ts/packages/base/src/services/llm-service.ts +++ b/ts/packages/base/src/services/llm-service.ts @@ -124,6 +124,8 @@ const llmErrorResponse = (error: LlmServiceError): TextCompletionResponse => ({ endOfStream: true, }); +const TextCompletionResponseProducer = makeProducerSpec("text-completion-response"); + const sendStreamingResponse = Effect.fn("LlmService.sendStreamingResponse")(function* ( llm: LlmServiceShape, requestId: string, @@ -158,9 +160,7 @@ const onLlmRequest = Effect.fn("LlmService.onRequest")(function* ( const requestId = properties.id; if (requestId === undefined || requestId.length === 0) return; - const responseProducer = yield* flowCtx.flow.producerEffect( - "text-completion-response", - ); + const responseProducer = yield* flowCtx.flow.producerEffect(TextCompletionResponseProducer); const llm = yield* Llm; if (msg.streaming === true && llm.supportsStreaming()) { @@ -210,7 +210,7 @@ export const makeLlmSpecs = (): ReadonlyArray> => [ "text-completion-request", onLlmRequest, ), - makeProducerSpec("text-completion-response"), + TextCompletionResponseProducer, makeParameterSpec("model"), makeParameterSpec("temperature"), ]; diff --git a/ts/packages/base/src/spec/producer-spec.ts b/ts/packages/base/src/spec/producer-spec.ts index 5b6e4a63..f73dfcaa 100644 --- a/ts/packages/base/src/spec/producer-spec.ts +++ b/ts/packages/base/src/spec/producer-spec.ts @@ -8,6 +8,11 @@ import { Effect } from "effect"; import type { Spec } from "./types.js"; import type { Flow, FlowDefinition } from "../processor/flow.js"; import { + flowResourceNotFoundError, + type FlowResourceNotFoundError, +} from "../errors.js"; +import { + type EffectProducer, ProducerFactory, } from "../messaging/runtime.js"; @@ -15,9 +20,41 @@ declare const ProducerSpecType: unique symbol; export interface ProducerSpec extends Spec { readonly [ProducerSpecType]?: (_: T) => T; + readonly producerEffect: ( + flow: Flow, + ) => Effect.Effect, FlowResourceNotFoundError>; } export function makeProducerSpec(name: string): ProducerSpec { + const producers = new WeakMap>(); + + const registerProducer = ( + flow: Flow, + producer: EffectProducer, + ) => + Effect.sync(() => { + producers.set(flow, producer); + }); + + const unregisterProducer = ( + flow: Flow, + producer: EffectProducer, + ) => + Effect.sync(() => { + if (producers.get(flow) === producer) { + producers.delete(flow); + } + }); + + const producerEffect = ( + flow: Flow, + ): Effect.Effect, FlowResourceNotFoundError> => { + const producer = producers.get(flow); + return producer === undefined + ? Effect.fail(flowResourceNotFoundError(flow.name, "producer", name)) + : Effect.succeed(producer); + }; + const addEffect = Effect.fn("ProducerSpec.addEffect")(function* ( flow: Flow, definition: FlowDefinition, @@ -26,10 +63,13 @@ export function makeProducerSpec(name: string): ProducerSpec { const factory = yield* ProducerFactory; const producer = yield* factory.make({ topic }); flow.registerProducer(name, producer); + yield* registerProducer(flow, producer); + yield* Effect.addFinalizer(() => unregisterProducer(flow, producer)); }); return { name, + producerEffect, addEffect, add: (flow, pubsub, definition, context) => flow.runInCompatibilityScope(addEffect(flow, definition), pubsub, context), diff --git a/ts/packages/base/src/spec/request-response-spec.ts b/ts/packages/base/src/spec/request-response-spec.ts index c7f78bf7..00d41e8a 100644 --- a/ts/packages/base/src/spec/request-response-spec.ts +++ b/ts/packages/base/src/spec/request-response-spec.ts @@ -11,6 +11,11 @@ import { Effect } from "effect"; import type { Spec } from "./types.js"; import type { Flow, FlowDefinition } from "../processor/flow.js"; import { + flowResourceNotFoundError, + type FlowResourceNotFoundError, +} from "../errors.js"; +import { + type EffectRequestResponse, RequestResponseFactory, } from "../messaging/runtime.js"; @@ -21,6 +26,9 @@ export interface RequestResponseSpec extends Spec { readonly request: TReq; readonly response: TRes; }; + readonly requestorEffect: ( + flow: Flow, + ) => Effect.Effect, FlowResourceNotFoundError>; } export function makeRequestResponseSpec( @@ -28,6 +36,35 @@ export function makeRequestResponseSpec( requestTopicName: string, responseTopicName: string, ): RequestResponseSpec { + const requestors = new WeakMap>(); + + const registerRequestor = ( + flow: Flow, + requestor: EffectRequestResponse, + ) => + Effect.sync(() => { + requestors.set(flow, requestor); + }); + + const unregisterRequestor = ( + flow: Flow, + requestor: EffectRequestResponse, + ) => + Effect.sync(() => { + if (requestors.get(flow) === requestor) { + requestors.delete(flow); + } + }); + + const requestorEffect = ( + flow: Flow, + ): Effect.Effect, FlowResourceNotFoundError> => { + const requestor = requestors.get(flow); + return requestor === undefined + ? Effect.fail(flowResourceNotFoundError(flow.name, "requestor", name)) + : Effect.succeed(requestor); + }; + const addEffect = Effect.fn("RequestResponseSpec.addEffect")(function* ( flow: Flow, definition: FlowDefinition, @@ -41,10 +78,13 @@ export function makeRequestResponseSpec( subscription: `${flow.processorId}-${flow.name}-${name}`, }); flow.registerRequestor(name, requestor); + yield* registerRequestor(flow, requestor); + yield* Effect.addFinalizer(() => unregisterRequestor(flow, requestor)); }); return { name, + requestorEffect, addEffect, add: (flow, pubsub, definition, context) => flow.runInCompatibilityScope(addEffect(flow, definition), pubsub, context), diff --git a/ts/packages/flow/src/agent/mcp-tool/service.ts b/ts/packages/flow/src/agent/mcp-tool/service.ts index 836788ee..faf88b90 100644 --- a/ts/packages/flow/src/agent/mcp-tool/service.ts +++ b/ts/packages/flow/src/agent/mcp-tool/service.ts @@ -71,6 +71,8 @@ export class McpToolRuntime extends Context.Service< McpToolRuntimeService >()("@trustgraph/flow/agent/mcp-tool/service/McpToolRuntime") {} +const McpToolResponseProducer = makeProducerSpec("mcp-tool-response"); + const mcpToolError = ( operation: string, cause: unknown, @@ -246,7 +248,7 @@ const onMcpToolRequest = Effect.fn("McpToolService.onRequest")(function* ( const requestId = properties.id; if (requestId === undefined || requestId.length === 0) return; - const responseProducer = yield* flowCtx.flow.producerEffect("mcp-tool-response"); + const responseProducer = yield* flowCtx.flow.producerEffect(McpToolResponseProducer); const runtime = yield* McpToolRuntime; const result = yield* parametersFromJson(msg.name, msg.parameters).pipe( @@ -284,7 +286,7 @@ export const makeMcpToolSpecs = (): ReadonlyArray> => [ "mcp-tool-request", onMcpToolRequest, ), - makeProducerSpec("mcp-tool-response"), + McpToolResponseProducer, ]; export const makeMcpToolConfigHandlers = (): ReadonlyArray< diff --git a/ts/packages/flow/src/agent/react/service.ts b/ts/packages/flow/src/agent/react/service.ts index e9bae51a..efdb94c6 100644 --- a/ts/packages/flow/src/agent/react/service.ts +++ b/ts/packages/flow/src/agent/react/service.ts @@ -71,6 +71,33 @@ class AgentToolExecutionError extends S.TaggedErrorClass("agent-response"); +const AgentLlmClient = makeRequestResponseSpec( + "llm", + "text-completion-request", + "text-completion-response", +); +const AgentGraphRagClient = makeRequestResponseSpec( + "graph-rag", + "graph-rag-request", + "graph-rag-response", +); +const AgentDocRagClient = makeRequestResponseSpec( + "doc-rag", + "document-rag-request", + "document-rag-response", +); +const AgentTriplesClient = makeRequestResponseSpec( + "triples", + "triples-request", + "triples-response", +); +const AgentMcpToolClient = makeRequestResponseSpec( + "mcp-tool", + "mcp-tool-request", + "mcp-tool-response", +); + const UnknownRecord = S.Record(S.String, S.Unknown); const ToolArgumentConfig = S.StructWithRest( S.Struct({ @@ -248,10 +275,10 @@ const wireTools = Effect.fn("AgentService.wireTools")(function* ( collection: string | undefined, onExplain: (data: ExplainData) => void, ) { - const graphRag = yield* flowCtx.flow.requestorEffect("graph-rag"); - const docRag = yield* flowCtx.flow.requestorEffect("doc-rag"); - const triples = yield* flowCtx.flow.requestorEffect("triples"); - const mcpTool = yield* flowCtx.flow.requestorEffect("mcp-tool"); + const graphRag = yield* flowCtx.flow.requestorEffect(AgentGraphRagClient); + const docRag = yield* flowCtx.flow.requestorEffect(AgentDocRagClient); + const triples = yield* flowCtx.flow.requestorEffect(AgentTriplesClient); + const mcpTool = yield* flowCtx.flow.requestorEffect(AgentMcpToolClient); return tools.map((tool) => { const rawImplType = tool.config?.type; @@ -300,9 +327,9 @@ const defaultTools = Effect.fn("AgentService.defaultTools")(function* ( collection: string | undefined, onExplain: (data: ExplainData) => void, ) { - const graphRag = yield* flowCtx.flow.requestorEffect("graph-rag"); - const docRag = yield* flowCtx.flow.requestorEffect("doc-rag"); - const triples = yield* flowCtx.flow.requestorEffect("triples"); + const graphRag = yield* flowCtx.flow.requestorEffect(AgentGraphRagClient); + const docRag = yield* flowCtx.flow.requestorEffect(AgentDocRagClient); + const triples = yield* flowCtx.flow.requestorEffect(AgentTriplesClient); return [ createKnowledgeQueryTool( @@ -346,7 +373,7 @@ const onAgentRequest = Effect.fn("AgentService.onRequest")(function* ( const requestId = properties.id; if (requestId === undefined || requestId.length === 0) return; - const responseProducer = yield* flowCtx.flow.producerEffect("agent-response"); + const responseProducer = yield* flowCtx.flow.producerEffect(AgentResponseProducer); yield* Effect.gen(function* () { const runtime = yield* AgentRuntime; @@ -367,10 +394,7 @@ const onAgentRequest = Effect.fn("AgentService.onRequest")(function* ( msg.question, ); - const llmClient = yield* flowCtx.flow.requestorEffect< - TextCompletionRequest, - TextCompletionResponse - >("llm"); + const llmClient = yield* flowCtx.flow.requestorEffect(AgentLlmClient); let conversation = initialPrompt; @@ -472,32 +496,12 @@ export const makeAgentSpecs = (): ReadonlyArray> => [ "agent-request", onAgentRequest, ), - makeProducerSpec("agent-response"), - makeRequestResponseSpec( - "llm", - "text-completion-request", - "text-completion-response", - ), - makeRequestResponseSpec( - "graph-rag", - "graph-rag-request", - "graph-rag-response", - ), - makeRequestResponseSpec( - "doc-rag", - "document-rag-request", - "document-rag-response", - ), - makeRequestResponseSpec( - "triples", - "triples-request", - "triples-response", - ), - makeRequestResponseSpec( - "mcp-tool", - "mcp-tool-request", - "mcp-tool-response", - ), + AgentResponseProducer, + AgentLlmClient, + AgentGraphRagClient, + AgentDocRagClient, + AgentTriplesClient, + AgentMcpToolClient, ]; export const makeAgentConfigHandlers = (): ReadonlyArray< diff --git a/ts/packages/flow/src/chunking/service.ts b/ts/packages/flow/src/chunking/service.ts index 3835b126..13e88d60 100644 --- a/ts/packages/flow/src/chunking/service.ts +++ b/ts/packages/flow/src/chunking/service.ts @@ -34,6 +34,8 @@ const DEFAULT_CHUNK_SIZE = 2000; const DEFAULT_CHUNK_OVERLAP = 100; const ChunkSizeParameter = makeParameterSpec("chunk-size", S.Number); const ChunkOverlapParameter = makeParameterSpec("chunk-overlap", S.Number); +const ChunkOutputProducer = makeProducerSpec("chunk-output"); +const ChunkTriplesProducer = makeProducerSpec("chunk-triples"); const onChunkMessage = Effect.fn("ChunkingService.onMessage")(function* ( msg: TextDocument, @@ -62,7 +64,7 @@ const onChunkMessage = Effect.fn("ChunkingService.onMessage")(function* ( `[ChunkingService] Split document ${msg.documentId} into ${chunks.length} chunks (size=${chunkSize}, overlap=${chunkOverlap})`, ); - const outputProducer = yield* flowCtx.flow.producerEffect("chunk-output"); + const outputProducer = yield* flowCtx.flow.producerEffect(ChunkOutputProducer); yield* Effect.forEach( chunks, @@ -83,8 +85,8 @@ export const makeChunkingSpecs = (): ReadonlyArray< "chunk-input", onChunkMessage, ), - makeProducerSpec("chunk-output"), - makeProducerSpec("chunk-triples"), + ChunkOutputProducer, + ChunkTriplesProducer, ChunkSizeParameter, ChunkOverlapParameter, ]; diff --git a/ts/packages/flow/src/decoding/pdf-decoder.ts b/ts/packages/flow/src/decoding/pdf-decoder.ts index e588549b..e118fcf1 100644 --- a/ts/packages/flow/src/decoding/pdf-decoder.ts +++ b/ts/packages/flow/src/decoding/pdf-decoder.ts @@ -96,6 +96,14 @@ const loadPageText = Effect.fn("loadPageText")(function*( .join(" "); }); +const DecodeOutputProducer = makeProducerSpec("decode-output"); +const DecodeTriplesProducer = makeProducerSpec("decode-triples"); +const LibrarianClient = makeRequestResponseSpec( + "librarian-client", + "librarian-request", + "librarian-response", +); + const onPdfDecodeMessage = Effect.fn("PdfDecoderService.onMessage")(function* ( msg: Document, properties: Record, @@ -107,9 +115,7 @@ const onPdfDecodeMessage = Effect.fn("PdfDecoderService.onMessage")(function* ( const { documentId } = msg; const user = msg.metadata.user; - const librarian = yield* flowCtx.flow.requestorEffect( - "librarian-client", - ); + const librarian = yield* flowCtx.flow.requestorEffect(LibrarianClient); const metadataResp = yield* librarian.request({ operation: "get-document-metadata", @@ -152,8 +158,8 @@ const onPdfDecodeMessage = Effect.fn("PdfDecoderService.onMessage")(function* ( yield* Effect.log(`[PdfDecoder] Document ${documentId}: ${pdf.numPages} pages`); - const outputProducer = yield* flowCtx.flow.producerEffect("decode-output"); - const triplesProducer = yield* flowCtx.flow.producerEffect("decode-triples"); + const outputProducer = yield* flowCtx.flow.producerEffect(DecodeOutputProducer); + const triplesProducer = yield* flowCtx.flow.producerEffect(DecodeTriplesProducer); for (let i = 1; i <= pdf.numPages; i++) { const pageText = yield* loadPageText(documentId, i, pdf); @@ -219,13 +225,9 @@ const onPdfDecodeMessage = Effect.fn("PdfDecoderService.onMessage")(function* ( export const makePdfDecoderSpecs = (): ReadonlyArray> => [ makeConsumerSpec("decode-input", onPdfDecodeMessage), - makeProducerSpec("decode-output"), - makeProducerSpec("decode-triples"), - makeRequestResponseSpec( - "librarian-client", - "librarian-request", - "librarian-response", - ), + DecodeOutputProducer, + DecodeTriplesProducer, + LibrarianClient, ]; export type PdfDecoderService = FlowProcessorRuntime; diff --git a/ts/packages/flow/src/extract/knowledge-extract.ts b/ts/packages/flow/src/extract/knowledge-extract.ts index e3f403bd..3604bf47 100644 --- a/ts/packages/flow/src/extract/knowledge-extract.ts +++ b/ts/packages/flow/src/extract/knowledge-extract.ts @@ -69,6 +69,19 @@ type KnowledgeExtractHandlerError = type PromptClient = EffectRequestResponse; type LlmClient = EffectRequestResponse; +const ExtractTriplesProducer = makeProducerSpec("extract-triples"); +const ExtractEntityContextsProducer = makeProducerSpec("extract-entity-contexts"); +const PromptClientSpec = makeRequestResponseSpec( + "prompt-client", + "prompt-request", + "prompt-response", +); +const LlmClientSpec = makeRequestResponseSpec( + "llm-client", + "text-completion-request", + "text-completion-response", +); + const requestPrompt = Effect.fn("KnowledgeExtract.requestPrompt")(function* ( promptClient: PromptClient, name: string, @@ -153,10 +166,10 @@ const onKnowledgeExtractMessage = Effect.fn("KnowledgeExtractService.onMessage") const text = msg.chunk; if (text.trim().length === 0) return; - const promptClient = yield* flowCtx.flow.requestorEffect("prompt-client"); - const llmClient = yield* flowCtx.flow.requestorEffect("llm-client"); - const triplesProducer = yield* flowCtx.flow.producerEffect("extract-triples"); - const entityContextsProducer = yield* flowCtx.flow.producerEffect("extract-entity-contexts"); + const promptClient = yield* flowCtx.flow.requestorEffect(PromptClientSpec); + const llmClient = yield* flowCtx.flow.requestorEffect(LlmClientSpec); + const triplesProducer = yield* flowCtx.flow.producerEffect(ExtractTriplesProducer); + const entityContextsProducer = yield* flowCtx.flow.producerEffect(ExtractEntityContextsProducer); const allTriples: Triple[] = []; const allEntityContexts: EntityContext[] = []; @@ -270,18 +283,10 @@ export const makeKnowledgeExtractSpecs = (): ReadonlyArray> => [ "extract-input", onKnowledgeExtractMessage, ), - makeProducerSpec("extract-triples"), - makeProducerSpec("extract-entity-contexts"), - makeRequestResponseSpec( - "prompt-client", - "prompt-request", - "prompt-response", - ), - makeRequestResponseSpec( - "llm-client", - "text-completion-request", - "text-completion-response", - ), + ExtractTriplesProducer, + ExtractEntityContextsProducer, + PromptClientSpec, + LlmClientSpec, ]; export type KnowledgeExtractService = FlowProcessorRuntime; diff --git a/ts/packages/flow/src/prompt/template.ts b/ts/packages/flow/src/prompt/template.ts index 94f2e9e1..abbd764d 100644 --- a/ts/packages/flow/src/prompt/template.ts +++ b/ts/packages/flow/src/prompt/template.ts @@ -69,6 +69,7 @@ const programRuntimes = new WeakMap const makePromptTemplateRuntime = (config: PromptTemplateConfig): PromptTemplateRuntime => { const templates = new Map(); const configKey = config.configKey ?? "prompt"; + const PromptResponseProducer = makeProducerSpec("prompt-response"); const onPromptConfig = Effect.fn("PromptTemplateService.onConfig")(function* ( pushedConfig: Record, @@ -114,7 +115,7 @@ const makePromptTemplateRuntime = (config: PromptTemplateConfig): PromptTemplate const requestId = properties.id; if (requestId === undefined || requestId.length === 0) return; - const responseProducer = yield* flowCtx.flow.producerEffect("prompt-response"); + const responseProducer = yield* flowCtx.flow.producerEffect(PromptResponseProducer); const template = templates.get(msg.name); if (template === undefined) { yield* responseProducer.send(requestId, { @@ -142,7 +143,7 @@ const makePromptTemplateRuntime = (config: PromptTemplateConfig): PromptTemplate "prompt-request", onRequest, ), - makeProducerSpec("prompt-response"), + PromptResponseProducer, ], configHandlers: [onPromptConfig], }; diff --git a/ts/packages/flow/src/query/embeddings/qdrant-doc-service.ts b/ts/packages/flow/src/query/embeddings/qdrant-doc-service.ts index b67bbcd7..075ca9a3 100644 --- a/ts/packages/flow/src/query/embeddings/qdrant-doc-service.ts +++ b/ts/packages/flow/src/query/embeddings/qdrant-doc-service.ts @@ -30,6 +30,8 @@ import { type QdrantDocQueryConfig, } from "./qdrant-doc.js"; +const DocumentEmbeddingsResponseProducer = makeProducerSpec("document-embeddings-response"); + const onDocEmbeddingsQueryMessage = Effect.fn("DocEmbeddingsQueryService.onMessage")(function* ( msg: DocumentEmbeddingsRequest, properties: Record, @@ -38,7 +40,7 @@ const onDocEmbeddingsQueryMessage = Effect.fn("DocEmbeddingsQueryService.onMessa const requestId = properties.id; if (requestId === undefined || requestId.length === 0) return; - const producer = yield* flowCtx.flow.producerEffect("document-embeddings-response"); + const producer = yield* flowCtx.flow.producerEffect(DocumentEmbeddingsResponseProducer); const query = yield* QdrantDocEmbeddingsQueryService; const collection = msg.collection ?? "default"; const allChunks: DocumentEmbeddingsResponse["chunks"] = []; @@ -85,7 +87,7 @@ export const makeDocEmbeddingsQuerySpecs = (): ReadonlyArray("document-embeddings-request", onDocEmbeddingsQueryMessage), - makeProducerSpec("document-embeddings-response"), + DocumentEmbeddingsResponseProducer, ]; export type DocEmbeddingsQueryService = FlowProcessorRuntime; diff --git a/ts/packages/flow/src/query/embeddings/qdrant-graph-service.ts b/ts/packages/flow/src/query/embeddings/qdrant-graph-service.ts index 7f7354b0..d60239b3 100644 --- a/ts/packages/flow/src/query/embeddings/qdrant-graph-service.ts +++ b/ts/packages/flow/src/query/embeddings/qdrant-graph-service.ts @@ -30,6 +30,8 @@ import { type QdrantGraphQueryConfig, } from "./qdrant-graph.js"; +const GraphEmbeddingsResponseProducer = makeProducerSpec("graph-embeddings-response"); + const onGraphEmbeddingsQueryMessage = Effect.fn("GraphEmbeddingsQueryService.onMessage")(function* ( msg: GraphEmbeddingsRequest, properties: Record, @@ -38,7 +40,7 @@ const onGraphEmbeddingsQueryMessage = Effect.fn("GraphEmbeddingsQueryService.onM const requestId = properties.id; if (requestId === undefined || requestId.length === 0) return; - const producer = yield* flowCtx.flow.producerEffect("graph-embeddings-response"); + const producer = yield* flowCtx.flow.producerEffect(GraphEmbeddingsResponseProducer); const query = yield* QdrantGraphEmbeddingsQueryService; const user = msg.user ?? "default"; const collection = msg.collection ?? "default"; @@ -86,7 +88,7 @@ export const makeGraphEmbeddingsQuerySpecs = (): ReadonlyArray("graph-embeddings-request", onGraphEmbeddingsQueryMessage), - makeProducerSpec("graph-embeddings-response"), + GraphEmbeddingsResponseProducer, ]; export type GraphEmbeddingsQueryService = FlowProcessorRuntime; diff --git a/ts/packages/flow/src/query/triples/falkordb-service.ts b/ts/packages/flow/src/query/triples/falkordb-service.ts index 1efbecfc..e94192fd 100644 --- a/ts/packages/flow/src/query/triples/falkordb-service.ts +++ b/ts/packages/flow/src/query/triples/falkordb-service.ts @@ -30,6 +30,8 @@ import { type FalkorDBQueryConfig, } from "./falkordb.js"; +const TriplesResponseProducer = makeProducerSpec("triples-response"); + const onTriplesQueryMessage = Effect.fn("TriplesQueryService.onMessage")(function* ( msg: TriplesQueryRequest, properties: Record, @@ -38,7 +40,7 @@ const onTriplesQueryMessage = Effect.fn("TriplesQueryService.onMessage")(functio const requestId = properties.id; if (requestId === undefined || requestId.length === 0) return; - const producer = yield* flowCtx.flow.producerEffect("triples-response"); + const producer = yield* flowCtx.flow.producerEffect(TriplesResponseProducer); const query = yield* FalkorDBTriplesQueryService; const triples = yield* query.queryTriples( msg.s, @@ -72,7 +74,7 @@ export const makeTriplesQuerySpecs = (): ReadonlyArray("triples-request", onTriplesQueryMessage), - makeProducerSpec("triples-response"), + TriplesResponseProducer, ]; export type TriplesQueryService = FlowProcessorRuntime; diff --git a/ts/packages/flow/src/retrieval/document-rag-service.ts b/ts/packages/flow/src/retrieval/document-rag-service.ts index 1bc8d133..973c76c1 100644 --- a/ts/packages/flow/src/retrieval/document-rag-service.ts +++ b/ts/packages/flow/src/retrieval/document-rag-service.ts @@ -40,6 +40,28 @@ import { type DocumentRagClients, } from "./document-rag.js"; +const DocumentRagResponseProducer = makeProducerSpec("document-rag-response"); +const DocumentRagLlmClient = makeRequestResponseSpec( + "llm", + "text-completion-request", + "text-completion-response", +); +const DocumentRagEmbeddingsClient = makeRequestResponseSpec( + "embeddings", + "embeddings-request", + "embeddings-response", +); +const DocumentRagDocEmbeddingsClient = makeRequestResponseSpec( + "doc-embeddings", + "document-embeddings-request", + "document-embeddings-response", +); +const DocumentRagPromptClient = makeRequestResponseSpec( + "prompt", + "prompt-request", + "prompt-response", +); + const onDocumentRagRequest = Effect.fn("DocumentRagService.onRequest")(function* ( msg: DocumentRagRequest, properties: Record, @@ -48,14 +70,14 @@ const onDocumentRagRequest = Effect.fn("DocumentRagService.onRequest")(function* const requestId = properties.id; if (requestId === undefined || requestId.length === 0) return; - const producer = yield* flowCtx.flow.producerEffect("document-rag-response"); + const producer = yield* flowCtx.flow.producerEffect(DocumentRagResponseProducer); const engine = yield* DocumentRagEngine; const clients: DocumentRagClients = { - llm: yield* flowCtx.flow.requestorEffect("llm"), - embeddings: yield* flowCtx.flow.requestorEffect("embeddings"), - docEmbeddings: yield* flowCtx.flow.requestorEffect("doc-embeddings"), - prompt: yield* flowCtx.flow.requestorEffect("prompt"), + llm: yield* flowCtx.flow.requestorEffect(DocumentRagLlmClient), + embeddings: yield* flowCtx.flow.requestorEffect(DocumentRagEmbeddingsClient), + docEmbeddings: yield* flowCtx.flow.requestorEffect(DocumentRagDocEmbeddingsClient), + prompt: yield* flowCtx.flow.requestorEffect(DocumentRagPromptClient), }; const response = yield* engine.query( @@ -90,27 +112,11 @@ export const makeDocumentRagSpecs = (): ReadonlyArray> = "document-rag-request", onDocumentRagRequest, ), - makeProducerSpec("document-rag-response"), - makeRequestResponseSpec( - "llm", - "text-completion-request", - "text-completion-response", - ), - makeRequestResponseSpec( - "embeddings", - "embeddings-request", - "embeddings-response", - ), - makeRequestResponseSpec( - "doc-embeddings", - "document-embeddings-request", - "document-embeddings-response", - ), - makeRequestResponseSpec( - "prompt", - "prompt-request", - "prompt-response", - ), + DocumentRagResponseProducer, + DocumentRagLlmClient, + DocumentRagEmbeddingsClient, + DocumentRagDocEmbeddingsClient, + DocumentRagPromptClient, ]; export type DocumentRagService = FlowProcessorRuntime; diff --git a/ts/packages/flow/src/retrieval/graph-rag-service.ts b/ts/packages/flow/src/retrieval/graph-rag-service.ts index f9d3a21f..c3d10158 100644 --- a/ts/packages/flow/src/retrieval/graph-rag-service.ts +++ b/ts/packages/flow/src/retrieval/graph-rag-service.ts @@ -43,6 +43,33 @@ import { type GraphRagConfig, } from "./graph-rag.js"; +const GraphRagResponseProducer = makeProducerSpec("graph-rag-response"); +const GraphRagLlmClient = makeRequestResponseSpec( + "llm", + "text-completion-request", + "text-completion-response", +); +const GraphRagEmbeddingsClient = makeRequestResponseSpec( + "embeddings", + "embeddings-request", + "embeddings-response", +); +const GraphRagGraphEmbeddingsClient = makeRequestResponseSpec( + "graph-embeddings", + "graph-embeddings-request", + "graph-embeddings-response", +); +const GraphRagTriplesClient = makeRequestResponseSpec( + "triples", + "triples-request", + "triples-response", +); +const GraphRagPromptClient = makeRequestResponseSpec( + "prompt", + "prompt-request", + "prompt-response", +); + const graphRagConfigFromRequest = (msg: GraphRagRequest): GraphRagConfig => ({ ...(msg.entityLimit !== undefined ? { entityLimit: msg.entityLimit } : {}), ...(msg.tripleLimit !== undefined ? { tripleLimit: msg.tripleLimit } : {}), @@ -58,17 +85,17 @@ const onGraphRagRequest = Effect.fn("GraphRagService.onRequest")(function* ( const requestId = properties.id; if (requestId === undefined || requestId.length === 0) return; - const producer = yield* flowCtx.flow.producerEffect("graph-rag-response"); + const producer = yield* flowCtx.flow.producerEffect(GraphRagResponseProducer); const engine = yield* GraphRagEngine; yield* Effect.log(`[GraphRagService] Received request ${requestId}: "${msg.query?.slice(0, 60)}..." collection=${msg.collection}`); const clients: GraphRagClients = { - llm: yield* flowCtx.flow.requestorEffect("llm"), - embeddings: yield* flowCtx.flow.requestorEffect("embeddings"), - graphEmbeddings: yield* flowCtx.flow.requestorEffect("graph-embeddings"), - triples: yield* flowCtx.flow.requestorEffect("triples"), - prompt: yield* flowCtx.flow.requestorEffect("prompt"), + llm: yield* flowCtx.flow.requestorEffect(GraphRagLlmClient), + embeddings: yield* flowCtx.flow.requestorEffect(GraphRagEmbeddingsClient), + graphEmbeddings: yield* flowCtx.flow.requestorEffect(GraphRagGraphEmbeddingsClient), + triples: yield* flowCtx.flow.requestorEffect(GraphRagTriplesClient), + prompt: yield* flowCtx.flow.requestorEffect(GraphRagPromptClient), }; const result = yield* engine.query( @@ -118,32 +145,12 @@ export const makeGraphRagSpecs = (): ReadonlyArray> => [ "graph-rag-request", onGraphRagRequest, ), - makeProducerSpec("graph-rag-response"), - makeRequestResponseSpec( - "llm", - "text-completion-request", - "text-completion-response", - ), - makeRequestResponseSpec( - "embeddings", - "embeddings-request", - "embeddings-response", - ), - makeRequestResponseSpec( - "graph-embeddings", - "graph-embeddings-request", - "graph-embeddings-response", - ), - makeRequestResponseSpec( - "triples", - "triples-request", - "triples-response", - ), - makeRequestResponseSpec( - "prompt", - "prompt-request", - "prompt-response", - ), + GraphRagResponseProducer, + GraphRagLlmClient, + GraphRagEmbeddingsClient, + GraphRagGraphEmbeddingsClient, + GraphRagTriplesClient, + GraphRagPromptClient, ]; export type GraphRagService = FlowProcessorRuntime; diff --git a/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts b/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts index ae5a24f1..cd8a2043 100644 --- a/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts +++ b/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts @@ -42,6 +42,12 @@ type GraphEmbeddingsStoreError = | MessagingTimeoutError | QdrantGraphEmbeddingsStoreError; +const EmbeddingsClient = makeRequestResponseSpec( + "embeddings-client", + "embeddings-request", + "embeddings-response", +); + const onGraphEmbeddingsStoreMessage = Effect.fn("GraphEmbeddingsStoreService.onMessage")(function* ( msg: EntityContexts, _properties: Record, @@ -49,8 +55,7 @@ const onGraphEmbeddingsStoreMessage = Effect.fn("GraphEmbeddingsStoreService.onM ): Effect.fn.Return { if (msg.entities.length === 0) return; - const embeddingsClient = - yield* flowCtx.flow.requestorEffect("embeddings-client"); + const embeddingsClient = yield* flowCtx.flow.requestorEffect(EmbeddingsClient); const user = msg.metadata?.user ?? "default"; const collection = msg.metadata?.collection ?? "default"; @@ -83,11 +88,7 @@ export const makeGraphEmbeddingsStoreSpecs = (): ReadonlyArray( - "embeddings-client", - "embeddings-request", - "embeddings-response", - ), + EmbeddingsClient, ]; export type GraphEmbeddingsStoreService = FlowProcessorRuntime;