diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 39941eb7..6fcd7a73 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,16 +12,16 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 config service -runtime slice: +Current signal counts from `ts/packages` after the 2026-06-02 RAG and agent +requestor bridge slice: | Signal | Count | | --- | ---: | -| `Effect.runPromise` | 207 | +| `Effect.runPromise` | 198 | | `Map<` | 65 | | `WebSocket` | 51 | | `new Map` | 47 | -| `toPromiseRequestor` | 19 | +| `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | | `receive(` | 18 | | `while (` | 12 | @@ -133,6 +133,36 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: RAG And Agent Requestor Bridge Slice + +- Status: migrated, root-verified, and ready to commit. +- Completed: + - `ts/packages/flow/src/retrieval/graph-rag.ts` and + `ts/packages/flow/src/retrieval/document-rag.ts` now accept + `EffectRequestResponse` clients directly. The engines no longer adapt + Effect requestors back to Promise requestors and then wrap those calls in + `Effect.tryPromise`. + - `ts/packages/flow/src/retrieval/graph-rag-service.ts` and + `ts/packages/flow/src/retrieval/document-rag-service.ts` now pass native + flow requestors directly into the engines. + - `ts/packages/flow/src/agent/react/tools.ts` now accepts + `EffectRequestResponse` clients directly for graph RAG, document RAG, + triples, and MCP tool calls. Tool input narrowing uses Schema and + `effect/Predicate` rather than local request/response type assertions. + - `ts/packages/flow/src/agent/react/service.ts` wires default and configured + tools with native Effect requestors instead of `toPromiseRequestor`. + - Graph RAG, document RAG, and agent service startup now expose `runMain()` + through `NodeRuntime.runMain`; their legacy `run()` Promise facades use + `ManagedRuntime`. + - `ts/scripts/run-graph-rag.ts`, `ts/scripts/run-document-rag.ts`, and + `ts/scripts/run-agent.ts` now delegate to `runMain()`. +- Verification: + - `bun run --cwd ts/packages/flow build` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + ## Subagent Findings To Preserve - MCP/workbench: @@ -163,8 +193,8 @@ Notes: - WebSocket adapter shims still contain host-boundary `try`/`catch` and normal `Error` construction. - RAG/providers/storage: - - RAG and agent helpers still adapt Effect requestors back to Promise - requestors. + - RAG and agent requestor bridges are complete: `toPromiseRequestor` has no + remaining `ts/packages` matches. - Provider SDKs and storage clients should become managed resources where they have meaningful lifecycle. - FalkorDB/Qdrant/Ollama/OpenAI-compatible surfaces still need config, @@ -194,24 +224,6 @@ Notes: - Tests: - Service-specific tests plus `cd ts && bun run --cwd packages/flow test`. -### P0: Remove RAG And Agent `toPromiseRequestor` Bridges - -- TrustGraph evidence: - - `ts/packages/flow/src/retrieval/document-rag-service.ts` - - `ts/packages/flow/src/retrieval/graph-rag-service.ts` - - `ts/packages/flow/src/agent/react/service.ts` -- Effect primitives: - - Existing `EffectRequestResponse`, `Effect.fn`, `Stream`, - `Effect.runPromiseWith` at true external boundaries only. -- Rewrite shape: - - Update engines and agent helpers to accept Effect requestors or - Effect-returning functions directly. - - Preserve Promise wrappers only for old public APIs or tests that explicitly - verify compatibility. -- Tests: - - Existing RAG and agent service tests. - - Add tests that assert requestor errors stay typed through the Effect path. - ### P1: Finish Client RPC Boundary Modernization - TrustGraph evidence: @@ -302,13 +314,12 @@ Notes: ## Recommended PR Order -1. RAG and agent requestor bridge removal. -2. Librarian, cores, or flow-manager scoped state migration. -3. Client RPC managed runtime/scoped layer cleanup. -4. Base processor registry and constructor shim redesign. -5. Gateway RPC callback and client streaming completion cleanup. -6. Storage/provider managed resource cleanup. -7. MCP parity/deletion decision and workbench platform polish. +1. Librarian, cores, or flow-manager scoped state migration. +2. Client RPC managed runtime/scoped layer cleanup. +3. Base processor registry and constructor shim redesign. +4. Gateway RPC callback and client streaming completion cleanup. +5. Storage/provider managed resource cleanup. +6. MCP parity/deletion decision and workbench platform polish. ## No-Op Rules diff --git a/ts/packages/flow/src/__tests__/retrieval-rag.test.ts b/ts/packages/flow/src/__tests__/retrieval-rag.test.ts index d3384399..5baec87c 100644 --- a/ts/packages/flow/src/__tests__/retrieval-rag.test.ts +++ b/ts/packages/flow/src/__tests__/retrieval-rag.test.ts @@ -5,7 +5,7 @@ import type { DocumentEmbeddingsResponse, EmbeddingsRequest, EmbeddingsResponse, - FlowRequestor, + EffectRequestResponse, GraphEmbeddingsRequest, GraphEmbeddingsResponse, PromptRequest, @@ -19,10 +19,10 @@ import { makeDocumentRagEngine, type DocumentRagClients } from "../retrieval/doc import { makeGraphRagEngine, type GraphRagClients } from "../retrieval/graph-rag.js"; const requestor = ( - handler: (request: TReq) => TRes | Promise, -): FlowRequestor => ({ - request: async (request) => handler(request), - stop: async () => undefined, + handler: (request: TReq) => TRes, +): EffectRequestResponse => ({ + request: (request) => Effect.succeed(handler(request)), + stop: Effect.void, }); describe("RAG engines", () => { diff --git a/ts/packages/flow/src/agent/react/service.ts b/ts/packages/flow/src/agent/react/service.ts index f6638515..e9bae51a 100644 --- a/ts/packages/flow/src/agent/react/service.ts +++ b/ts/packages/flow/src/agent/react/service.ts @@ -16,6 +16,9 @@ * Python reference: trustgraph-flow/trustgraph/agent/react/service.py */ +import { + NodeRuntime, +} from "@effect/platform-node"; import { makeFlowProcessor, makeConsumerSpec, @@ -39,16 +42,13 @@ import { type ToolRequest, type ToolResponse, type EffectConfigHandler, - type EffectRequestOptions, - type EffectRequestResponse, - type FlowRequestOptions, - type FlowRequestor, type FlowResourceNotFoundError, type MessagingDeliveryError, type Spec, } from "@trustgraph/base"; -import { Context, Effect, Layer, Ref } from "effect"; +import {Context, Effect, Layer, ManagedRuntime, Ref} from "effect"; import * as O from "effect/Option"; +import * as Predicate from "effect/Predicate"; import * as S from "effect/Schema"; import { @@ -106,29 +106,6 @@ export class AgentRuntime extends Context.Service( - options: FlowRequestOptions | undefined, -): EffectRequestOptions | undefined => { - if (options === undefined) return undefined; - return { - ...(options.timeoutMs === undefined ? {} : { timeoutMs: options.timeoutMs }), - ...(options.recipient === undefined - ? {} - : { - recipient: (response: TRes) => - Effect.promise(() => options.recipient?.(response) ?? Promise.resolve(true)), - }), - }; -}; - -const toPromiseRequestor = ( - requestor: EffectRequestResponse, -): FlowRequestor => ({ - request: (request, options) => - Effect.runPromise(requestor.request(request, toEffectRequestOptions(options))), - stop: () => Effect.runPromise(requestor.stop), -}); - const buildConfiguredTool = ( toolId: string, data: ToolConfigEntry, @@ -137,7 +114,7 @@ const buildConfiguredTool = ( const implType = data.type ?? ""; const name = data.name ?? ""; const description = data.description ?? ""; - const config = { ...data } as Record; + const config: Record = { ...data }; if (name.length === 0) { yield* Effect.logWarning(`[AgentService] Skipping tool with no name: ${toolId}`); @@ -277,12 +254,13 @@ const wireTools = Effect.fn("AgentService.wireTools")(function* ( const mcpTool = yield* flowCtx.flow.requestorEffect("mcp-tool"); return tools.map((tool) => { - const implType = tool.config?.type as string | undefined; + const rawImplType = tool.config?.type; + const implType = Predicate.isString(rawImplType) ? rawImplType : undefined; switch (implType) { case "knowledge-query": { const live = createKnowledgeQueryTool( - toPromiseRequestor(graphRag), + graphRag, collection, onExplain, ); @@ -290,21 +268,21 @@ const wireTools = Effect.fn("AgentService.wireTools")(function* ( } case "document-query": { const live = createDocumentQueryTool( - toPromiseRequestor(docRag), + docRag, collection, ); return { ...tool, execute: live.execute }; } case "triples-query": { const live = createTriplesQueryTool( - toPromiseRequestor(triples), + triples, collection, ); return { ...tool, execute: live.execute }; } case "mcp-tool": { const live = createMcpTool( - toPromiseRequestor(mcpTool), + mcpTool, tool.name, tool.description, tool.args, @@ -328,16 +306,16 @@ const defaultTools = Effect.fn("AgentService.defaultTools")(function* ( return [ createKnowledgeQueryTool( - toPromiseRequestor(graphRag), + graphRag, collection, onExplain, ), createDocumentQueryTool( - toPromiseRequestor(docRag), + docRag, collection, ), createTriplesQueryTool( - toPromiseRequestor(triples), + triples, collection, ), ]; @@ -433,7 +411,7 @@ const onAgentRequest = Effect.fn("AgentService.onRequest")(function* ( content: "", explain_id: explain.explainId, explain_triples: explain.triples, - } as AgentResponse); + }); } yield* responseProducer.send(requestId, { @@ -630,6 +608,12 @@ export const program = makeFlowProcessorProgram AgentRuntimeLive, }); +const agentRuntime = ManagedRuntime.make(Layer.empty); + export function run(): Promise { - return Effect.runPromise(program); + return agentRuntime.runPromise(program); +} + +export function runMain(): void { + NodeRuntime.runMain(program); } diff --git a/ts/packages/flow/src/agent/react/tools.ts b/ts/packages/flow/src/agent/react/tools.ts index e2d877d5..8a0c1eef 100644 --- a/ts/packages/flow/src/agent/react/tools.ts +++ b/ts/packages/flow/src/agent/react/tools.ts @@ -6,7 +6,7 @@ */ import type { - FlowRequestor, + EffectRequestResponse, GraphRagRequest, GraphRagResponse, DocumentRagRequest, @@ -18,13 +18,16 @@ import type { Term, Triple, } from "@trustgraph/base"; +import {Term as TermSchema} from "@trustgraph/base"; import { Effect } from "effect"; import * as O from "effect/Option"; +import * as Predicate from "effect/Predicate"; import * as S from "effect/Schema"; import type { AgentTool, ToolArg } from "./types.js"; const decodeJsonUnknown = S.decodeUnknownOption(S.UnknownFromJsonString); +const decodeTerm = S.decodeUnknownOption(TermSchema); /** * Format a Term to a human-readable string. @@ -71,7 +74,7 @@ export interface ExplainData { * Query the knowledge graph for information about entities and their relationships. */ export function createKnowledgeQueryTool( - client: FlowRequestor, + client: EffectRequestResponse, collection?: string, onExplain?: (data: ExplainData) => void, ): AgentTool { @@ -93,19 +96,14 @@ export function createKnowledgeQueryTool( query: question, ...(collection !== undefined ? { collection } : {}), }; - const res = yield* Effect.tryPromise(() => client.request(request)); + const res = yield* client.request(request); yield* Effect.log(`[KnowledgeQuery] Response (${res.response?.length ?? 0} chars): ${res.error !== undefined ? `ERROR: ${res.error.message}` : `${res.response?.slice(0, 300)}...`}`); - // Extract explain data if embedded in the response - const rawRes = res as Record; - if ( - rawRes.message_type === "explain" && - rawRes.explain_triples !== undefined && - onExplain !== undefined - ) { + const explainTriples = res.explain_triples; + if (res.message_type === "explain" && explainTriples !== undefined && onExplain !== undefined) { yield* Effect.sync(() => onExplain({ - explainId: (rawRes.explain_id as string) ?? "", - triples: rawRes.explain_triples as Triple[], + explainId: res.explain_id ?? "", + triples: Array.from(explainTriples), })); } @@ -119,7 +117,7 @@ export function createKnowledgeQueryTool( * Search documents for relevant information. */ export function createDocumentQueryTool( - client: FlowRequestor, + client: EffectRequestResponse, collection?: string, ): AgentTool { return { @@ -139,13 +137,24 @@ export function createDocumentQueryTool( query: question, ...(collection !== undefined ? { collection } : {}), }; - const res = yield* Effect.tryPromise(() => client.request(request)); + const res = yield* client.request(request); if (res.error !== undefined) return `Error: ${res.error.message}`; return res.response; })), }; } +const objectProperty = (value: object, key: string): unknown => + Predicate.hasProperty(value, key) ? value[key] : undefined; + +const termFromUnknown = (value: unknown): Term | undefined => { + if (Predicate.isString(value)) { + return { type: "LITERAL", value }; + } + const decoded = decodeTerm(value); + return O.isSome(decoded) ? decoded.value : undefined; +}; + /** * Parse triples query input. Accepts JSON with optional s, p, o fields. */ @@ -166,30 +175,21 @@ function parseTriplesInput(input: string): { }; } - const parsed = decoded.value as Record; - const toTerm = (val: unknown): Term | undefined => { - if (typeof val === "string") { - return { type: "LITERAL", value: val }; - } - if (typeof val === "object" && val !== null && "type" in val) { - return val as Term; - } - return undefined; - }; - const result: { s?: Term; p?: Term; o?: Term; limit?: number; } = {}; - const s = toTerm(parsed.subject ?? parsed.s); - const p = toTerm(parsed.predicate ?? parsed.p); - const o = toTerm(parsed.object ?? parsed.o); + const parsed = decoded.value; + const s = termFromUnknown(objectProperty(parsed, "subject") ?? objectProperty(parsed, "s")); + const p = termFromUnknown(objectProperty(parsed, "predicate") ?? objectProperty(parsed, "p")); + const o = termFromUnknown(objectProperty(parsed, "object") ?? objectProperty(parsed, "o")); + const limit = objectProperty(parsed, "limit"); if (s !== undefined) result.s = s; if (p !== undefined) result.p = p; if (o !== undefined) result.o = o; - if (typeof parsed.limit === "number") result.limit = parsed.limit; + if (Predicate.isNumber(limit)) result.limit = limit; return result; } @@ -197,7 +197,7 @@ function parseTriplesInput(input: string): { * Query for specific triples (subject-predicate-object relationships) in the knowledge graph. */ export function createTriplesQueryTool( - client: FlowRequestor, + client: EffectRequestResponse, collection?: string, ): AgentTool { return { @@ -231,7 +231,7 @@ export function createTriplesQueryTool( ...(o !== undefined ? { o } : {}), ...(collection !== undefined ? { collection } : {}), }; - const res = yield* Effect.tryPromise(() => client.request(request)); + const res = yield* client.request(request); if (res.error !== undefined) return `Error: ${res.error.message}`; @@ -255,7 +255,7 @@ export function createTriplesQueryTool( * this function just wraps it as an AgentTool the ReAct agent can invoke. */ export function createMcpTool( - client: FlowRequestor, + client: EffectRequestResponse, toolName: string, description: string, args: ToolArg[], @@ -265,7 +265,7 @@ export function createMcpTool( description, args, execute: (input: string): Promise => Effect.runPromise(Effect.gen(function* () { - const res = yield* Effect.tryPromise(() => client.request({ name: toolName, parameters: input })); + const res = yield* client.request({ name: toolName, parameters: input }); if (res.error !== undefined) return `Error: ${res.error.message}`; if (res.text !== undefined) return res.text; if (res.object !== undefined) return res.object; diff --git a/ts/packages/flow/src/retrieval/document-rag-service.ts b/ts/packages/flow/src/retrieval/document-rag-service.ts index ddb3bc27..1bc8d133 100644 --- a/ts/packages/flow/src/retrieval/document-rag-service.ts +++ b/ts/packages/flow/src/retrieval/document-rag-service.ts @@ -7,6 +7,7 @@ * Python reference: trustgraph-flow/trustgraph/retrieval/document_rag/ */ +import {NodeRuntime} from "@effect/platform-node"; import { makeConsumerSpec, makeFlowProcessor, @@ -17,14 +18,10 @@ import { type DocumentEmbeddingsResponse, type DocumentRagRequest, type DocumentRagResponse, - type EffectRequestOptions, - type EffectRequestResponse, type EmbeddingsRequest, type EmbeddingsResponse, type FlowContext, type FlowProcessorRuntime, - type FlowRequestOptions, - type FlowRequestor, type FlowResourceNotFoundError, type MessagingDeliveryError, type ProcessorConfig, @@ -34,7 +31,7 @@ import { type TextCompletionRequest, type TextCompletionResponse, } from "@trustgraph/base"; -import { Effect } from "effect"; +import {Effect, Layer, ManagedRuntime} from "effect"; import { DocumentRagEngine, DocumentRagEngineError, @@ -43,29 +40,6 @@ import { type DocumentRagClients, } from "./document-rag.js"; -const toEffectRequestOptions = ( - options: FlowRequestOptions | undefined, -): EffectRequestOptions | undefined => { - if (options === undefined) return undefined; - return { - ...(options.timeoutMs === undefined ? {} : { timeoutMs: options.timeoutMs }), - ...(options.recipient === undefined - ? {} - : { - recipient: (response: TRes) => - Effect.promise(() => options.recipient?.(response) ?? Promise.resolve(true)), - }), - }; -}; - -const toPromiseRequestor = ( - requestor: EffectRequestResponse, -): FlowRequestor => ({ - request: (request, options) => - Effect.runPromise(requestor.request(request, toEffectRequestOptions(options))), - stop: () => Effect.runPromise(requestor.stop), -}); - const onDocumentRagRequest = Effect.fn("DocumentRagService.onRequest")(function* ( msg: DocumentRagRequest, properties: Record, @@ -78,12 +52,10 @@ const onDocumentRagRequest = Effect.fn("DocumentRagService.onRequest")(function* const engine = yield* DocumentRagEngine; const clients: DocumentRagClients = { - llm: toPromiseRequestor(yield* flowCtx.flow.requestorEffect("llm")), - embeddings: toPromiseRequestor(yield* flowCtx.flow.requestorEffect("embeddings")), - docEmbeddings: toPromiseRequestor( - yield* flowCtx.flow.requestorEffect("doc-embeddings"), - ), - prompt: toPromiseRequestor(yield* flowCtx.flow.requestorEffect("prompt")), + llm: yield* flowCtx.flow.requestorEffect("llm"), + embeddings: yield* flowCtx.flow.requestorEffect("embeddings"), + docEmbeddings: yield* flowCtx.flow.requestorEffect("doc-embeddings"), + prompt: yield* flowCtx.flow.requestorEffect("prompt"), }; const response = yield* engine.query( @@ -161,6 +133,12 @@ export const program = makeFlowProcessorProgram({ layer: () => DocumentRagLive, }); +const documentRagRuntime = ManagedRuntime.make(Layer.empty); + export function run(): Promise { - return Effect.runPromise(program); + return documentRagRuntime.runPromise(program); +} + +export function runMain(): void { + NodeRuntime.runMain(program); } diff --git a/ts/packages/flow/src/retrieval/document-rag.ts b/ts/packages/flow/src/retrieval/document-rag.ts index d614032f..409e38c8 100644 --- a/ts/packages/flow/src/retrieval/document-rag.ts +++ b/ts/packages/flow/src/retrieval/document-rag.ts @@ -9,7 +9,7 @@ import type { DocumentEmbeddingsResponse, EmbeddingsRequest, EmbeddingsResponse, - FlowRequestor, + EffectRequestResponse, PromptRequest, PromptResponse, TextCompletionRequest, @@ -20,10 +20,10 @@ import { Context, Effect, Layer } from "effect"; import * as S from "effect/Schema"; export interface DocumentRagClients { - llm: FlowRequestor; - embeddings: FlowRequestor; - docEmbeddings: FlowRequestor; - prompt: FlowRequestor; + llm: EffectRequestResponse; + embeddings: EffectRequestResponse; + docEmbeddings: EffectRequestResponse; + prompt: EffectRequestResponse; } export type ChunkCallback = (text: string, endOfStream: boolean) => Promise; @@ -101,21 +101,19 @@ function queryDocumentRag( return Effect.gen(function* () { const collection = options?.collection ?? "default"; - const embResp = yield* Effect.tryPromise({ - try: () => clients.embeddings.request({ text: [queryText] }), - catch: (cause) => documentRagError("embeddings", cause), - }); + const embResp = yield* clients.embeddings.request({ text: [queryText] }).pipe( + Effect.mapError((cause) => documentRagError("embeddings", cause)), + ); const vectors = embResp.vectors; - const docResp = yield* Effect.tryPromise({ - try: () => clients.docEmbeddings.request({ + const docResp = yield* clients.docEmbeddings.request({ vectors, limit: 10, collection, user: "default", - }), - catch: (cause) => documentRagError("document-embeddings", cause), - }); + }).pipe( + Effect.mapError((cause) => documentRagError("document-embeddings", cause)), + ); const chunks = docResp.chunks ?? []; yield* Effect.log(`[DocumentRag] Found ${chunks.length} matching chunks`); @@ -125,21 +123,19 @@ function queryDocumentRag( ) .join("\n\n---\n\n"); - const promptResp = yield* Effect.tryPromise({ - try: () => clients.prompt.request({ + const promptResp = yield* clients.prompt.request({ name: "document-rag-synthesize", variables: { query: queryText, context }, - }), - catch: (cause) => documentRagError("prompt", cause), - }); + }).pipe( + Effect.mapError((cause) => documentRagError("prompt", cause)), + ); - const resp = yield* Effect.tryPromise({ - try: () => clients.llm.request({ + const resp = yield* clients.llm.request({ system: promptResp.system, prompt: promptResp.prompt, - }), - catch: (cause) => documentRagError("llm", cause), - }); + }).pipe( + Effect.mapError((cause) => documentRagError("llm", cause)), + ); return resp.response; }); diff --git a/ts/packages/flow/src/retrieval/graph-rag-service.ts b/ts/packages/flow/src/retrieval/graph-rag-service.ts index b1d72285..f9d3a21f 100644 --- a/ts/packages/flow/src/retrieval/graph-rag-service.ts +++ b/ts/packages/flow/src/retrieval/graph-rag-service.ts @@ -7,18 +7,15 @@ * Python reference: trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py */ +import {NodeRuntime} from "@effect/platform-node"; import { makeConsumerSpec, makeFlowProcessor, makeProducerSpec, makeRequestResponseSpec, makeFlowProcessorProgram, - type EffectRequestOptions, - type EffectRequestResponse, type FlowContext, type FlowProcessorRuntime, - type FlowRequestOptions, - type FlowRequestor, type FlowResourceNotFoundError, type GraphEmbeddingsRequest, type GraphEmbeddingsResponse, @@ -36,7 +33,7 @@ import { type TriplesQueryRequest, type TriplesQueryResponse, } from "@trustgraph/base"; -import { Effect } from "effect"; +import {Effect, Layer, ManagedRuntime} from "effect"; import { GraphRagEngine, GraphRagEngineError, @@ -46,29 +43,6 @@ import { type GraphRagConfig, } from "./graph-rag.js"; -const toEffectRequestOptions = ( - options: FlowRequestOptions | undefined, -): EffectRequestOptions | undefined => { - if (options === undefined) return undefined; - return { - ...(options.timeoutMs === undefined ? {} : { timeoutMs: options.timeoutMs }), - ...(options.recipient === undefined - ? {} - : { - recipient: (response: TRes) => - Effect.promise(() => options.recipient?.(response) ?? Promise.resolve(true)), - }), - }; -}; - -const toPromiseRequestor = ( - requestor: EffectRequestResponse, -): FlowRequestor => ({ - request: (request, options) => - Effect.runPromise(requestor.request(request, toEffectRequestOptions(options))), - stop: () => Effect.runPromise(requestor.stop), -}); - const graphRagConfigFromRequest = (msg: GraphRagRequest): GraphRagConfig => ({ ...(msg.entityLimit !== undefined ? { entityLimit: msg.entityLimit } : {}), ...(msg.tripleLimit !== undefined ? { tripleLimit: msg.tripleLimit } : {}), @@ -90,13 +64,11 @@ const onGraphRagRequest = Effect.fn("GraphRagService.onRequest")(function* ( yield* Effect.log(`[GraphRagService] Received request ${requestId}: "${msg.query?.slice(0, 60)}..." collection=${msg.collection}`); const clients: GraphRagClients = { - llm: toPromiseRequestor(yield* flowCtx.flow.requestorEffect("llm")), - embeddings: toPromiseRequestor(yield* flowCtx.flow.requestorEffect("embeddings")), - graphEmbeddings: toPromiseRequestor( - yield* flowCtx.flow.requestorEffect("graph-embeddings"), - ), - triples: toPromiseRequestor(yield* flowCtx.flow.requestorEffect("triples")), - prompt: toPromiseRequestor(yield* flowCtx.flow.requestorEffect("prompt")), + llm: yield* flowCtx.flow.requestorEffect("llm"), + embeddings: yield* flowCtx.flow.requestorEffect("embeddings"), + graphEmbeddings: yield* flowCtx.flow.requestorEffect("graph-embeddings"), + triples: yield* flowCtx.flow.requestorEffect("triples"), + prompt: yield* flowCtx.flow.requestorEffect("prompt"), }; const result = yield* engine.query( @@ -125,16 +97,18 @@ const onGraphRagRequest = Effect.fn("GraphRagService.onRequest")(function* ( if (result === undefined) return; - const response: GraphRagResponse = { - response: result.answer, - endOfStream: true, - }; - - if (result.subgraph.length > 0) { - (response as Record).message_type = "explain"; - (response as Record).explain_id = `explain-${requestId}`; - (response as Record).explain_triples = result.subgraph; - } + const response: GraphRagResponse = result.subgraph.length === 0 + ? { + response: result.answer, + endOfStream: true, + } + : { + response: result.answer, + endOfStream: true, + message_type: "explain", + explain_id: `explain-${requestId}`, + explain_triples: result.subgraph, + }; yield* producer.send(requestId, response); }); @@ -192,6 +166,12 @@ export const program = makeFlowProcessorProgram({ layer: () => GraphRagLive, }); +const graphRagRuntime = ManagedRuntime.make(Layer.empty); + export function run(): Promise { - return Effect.runPromise(program); + return graphRagRuntime.runPromise(program); +} + +export function runMain(): void { + NodeRuntime.runMain(program); } diff --git a/ts/packages/flow/src/retrieval/graph-rag.ts b/ts/packages/flow/src/retrieval/graph-rag.ts index a56c8fcd..3eaff23b 100644 --- a/ts/packages/flow/src/retrieval/graph-rag.ts +++ b/ts/packages/flow/src/retrieval/graph-rag.ts @@ -7,7 +7,8 @@ import type { EmbeddingsRequest, EmbeddingsResponse, - FlowRequestor, + EffectRequestOptions, + EffectRequestResponse, GraphEmbeddingsRequest, GraphEmbeddingsResponse, PromptRequest, @@ -34,11 +35,11 @@ export interface GraphRagConfig { } export interface GraphRagClients { - llm: FlowRequestor; - embeddings: FlowRequestor; - graphEmbeddings: FlowRequestor; - triples: FlowRequestor; - prompt: FlowRequestor; + llm: EffectRequestResponse; + embeddings: EffectRequestResponse; + graphEmbeddings: EffectRequestResponse; + triples: EffectRequestResponse; + prompt: EffectRequestResponse; } export type ChunkCallback = (text: string, endOfStream: boolean) => Promise; @@ -92,6 +93,16 @@ const graphRagError = (operation: string, cause: unknown) => message: errorMessage(cause), }); +const requestClient = ( + requestor: EffectRequestResponse, + operation: string, + request: TReq, + options?: EffectRequestOptions, +): Effect.Effect => + requestor.request(request, options).pipe( + Effect.mapError((cause) => graphRagError(operation, cause)), + ); + export function normalizeGraphRagConfig(config: GraphRagConfig = {}): NormalizedGraphRagConfig { return { entityLimit: config.entityLimit ?? 50, @@ -178,21 +189,23 @@ function queryGraphRag( function extractConcepts(clients: GraphRagClients, query: string): Effect.Effect { return Effect.gen(function* () { - const promptResp = yield* Effect.tryPromise({ - try: () => clients.prompt.request({ + const promptResp = yield* requestClient( + clients.prompt, + "extract-concepts-prompt", + { name: "extract-concepts", variables: { query }, - }), - catch: (cause) => graphRagError("extract-concepts-prompt", cause), - }); + }, + ); - const llmResp = yield* Effect.tryPromise({ - try: () => clients.llm.request({ + const llmResp = yield* requestClient( + clients.llm, + "extract-concepts-llm", + { system: promptResp.system, prompt: promptResp.prompt, - }), - catch: (cause) => graphRagError("extract-concepts-llm", cause), - }); + }, + ); return llmResp.response .split("\n") @@ -203,10 +216,7 @@ function extractConcepts(clients: GraphRagClients, query: string): Effect.Effect function getVectors(clients: GraphRagClients, concepts: string[]): Effect.Effect { return Effect.gen(function* () { - const resp = yield* Effect.tryPromise({ - try: () => clients.embeddings.request({ text: concepts }), - catch: (cause) => graphRagError("get-vectors", cause), - }); + const resp = yield* requestClient(clients.embeddings, "get-vectors", { text: concepts }); return resp.vectors; }); } @@ -218,15 +228,16 @@ function getEntities( collection?: string, ): Effect.Effect { return Effect.gen(function* () { - const resp = yield* Effect.tryPromise({ - try: () => clients.graphEmbeddings.request({ + const resp = yield* requestClient( + clients.graphEmbeddings, + "get-entities", + { vectors, user: "default", collection: collection ?? "default", limit: config.entityLimit, - }), - catch: (cause) => graphRagError("get-entities", cause), - }); + }, + ); return resp.entities; }); } @@ -259,10 +270,7 @@ function followEdges( limit: config.tripleLimit, ...(collection !== undefined ? { collection } : {}), }; - return Effect.tryPromise({ - try: () => clients.triples.request(request), - catch: (cause) => graphRagError("follow-edges-query", cause), - }); + return requestClient(clients.triples, "follow-edges-query", request); }); const results = yield* Effect.all(queries); @@ -321,24 +329,26 @@ function scoreEdges( Effect.mapError((cause) => graphRagError("edge-score-encode", cause)), ); - const promptResp = yield* Effect.tryPromise({ - try: () => clients.prompt.request({ + const promptResp = yield* requestClient( + clients.prompt, + "edge-score-prompt", + { name: "kg-edge-scoring", variables: { query, knowledge: knowledgeJson, }, - }), - catch: (cause) => graphRagError("edge-score-prompt", cause), - }); + }, + ); - const llmResp = yield* Effect.tryPromise({ - try: () => clients.llm.request({ + const llmResp = yield* requestClient( + clients.llm, + "edge-score-llm", + { system: promptResp.system, prompt: promptResp.prompt, - }), - catch: (cause) => graphRagError("edge-score-llm", cause), - }); + }, + ); yield* Effect.log(`[GraphRag] Edge scoring LLM response (first 500 chars): ${llmResp.response.slice(0, 500)}`); @@ -375,43 +385,49 @@ function synthesize( .map((triple) => `${termToString(triple.s)} -> ${termToString(triple.p)} -> ${termToString(triple.o)}`) .join("\n"); - const promptResp = yield* Effect.tryPromise({ - try: () => clients.prompt.request({ + const promptResp = yield* requestClient( + clients.prompt, + "synthesize-prompt", + { name: "graph-rag-synthesize", variables: { query, context }, - }), - catch: (cause) => graphRagError("synthesize-prompt", cause), - }); + }, + ); if (chunkCallback !== undefined) { let fullText = ""; - yield* Effect.tryPromise({ - try: () => clients.llm.request( - { - system: promptResp.system, - prompt: promptResp.prompt, - streaming: true, + yield* requestClient( + clients.llm, + "synthesize-stream", + { + system: promptResp.system, + prompt: promptResp.prompt, + streaming: true, + }, + { + recipient: (resp) => { + if (resp.response.length === 0) { + return Effect.succeed(resp.endOfStream === true); + } + fullText += resp.response; + return Effect.tryPromise({ + try: () => chunkCallback(resp.response, resp.endOfStream === true).then(() => resp.endOfStream === true), + catch: (cause) => graphRagError("synthesize-stream-callback", cause), + }); }, - { - recipient: (resp) => { - if (resp.response.length === 0) return Promise.resolve(resp.endOfStream === true); - fullText += resp.response; - return chunkCallback(resp.response, resp.endOfStream === true).then(() => resp.endOfStream === true); - }, - }, - ), - catch: (cause) => graphRagError("synthesize-stream", cause), - }); + }, + ); return fullText; } - const resp = yield* Effect.tryPromise({ - try: () => clients.llm.request({ + const resp = yield* requestClient( + clients.llm, + "synthesize-llm", + { system: promptResp.system, prompt: promptResp.prompt, - }), - catch: (cause) => graphRagError("synthesize-llm", cause), - }); + }, + ); return resp.response; }); diff --git a/ts/scripts/run-agent.ts b/ts/scripts/run-agent.ts index 54d5b6a0..5f2d9565 100644 --- a/ts/scripts/run-agent.ts +++ b/ts/scripts/run-agent.ts @@ -6,9 +6,6 @@ * Env: * NATS_URL (default: nats://localhost:4222) */ -import { run } from "../packages/flow/src/agent/react/service.js"; +import {runMain} from "../packages/flow/src/agent/react/service.js"; -run().catch((err) => { - console.error("Agent service failed:", err); - process.exit(1); -}); +runMain(); diff --git a/ts/scripts/run-document-rag.ts b/ts/scripts/run-document-rag.ts index fc430d3a..39465e1c 100644 --- a/ts/scripts/run-document-rag.ts +++ b/ts/scripts/run-document-rag.ts @@ -1,6 +1,3 @@ -import { run } from "../packages/flow/src/retrieval/document-rag-service.js"; +import {runMain} from "../packages/flow/src/retrieval/document-rag-service.js"; -run().catch((err) => { - console.error("Document RAG service failed:", err); - process.exit(1); -}); +runMain(); diff --git a/ts/scripts/run-graph-rag.ts b/ts/scripts/run-graph-rag.ts index a35d93db..655563b3 100644 --- a/ts/scripts/run-graph-rag.ts +++ b/ts/scripts/run-graph-rag.ts @@ -1,6 +1,3 @@ -import { run } from "../packages/flow/src/retrieval/graph-rag-service.js"; +import {runMain} from "../packages/flow/src/retrieval/graph-rag-service.js"; -run().catch((err) => { - console.error("Graph RAG service failed:", err); - process.exit(1); -}); +runMain();