Remove RAG requestor Promise bridges

This commit is contained in:
elpresidank 2026-06-02 00:54:47 -05:00
parent 88db18fbda
commit 5979d38b99
11 changed files with 249 additions and 293 deletions

View file

@ -5,7 +5,7 @@ import type {
DocumentEmbeddingsResponse,
EmbeddingsRequest,
EmbeddingsResponse,
FlowRequestor,
EffectRequestResponse,
GraphEmbeddingsRequest,
GraphEmbeddingsResponse,
PromptRequest,
@ -19,10 +19,10 @@ import { makeDocumentRagEngine, type DocumentRagClients } from "../retrieval/doc
import { makeGraphRagEngine, type GraphRagClients } from "../retrieval/graph-rag.js";
const requestor = <TReq, TRes>(
handler: (request: TReq) => TRes | Promise<TRes>,
): FlowRequestor<TReq, TRes> => ({
request: async (request) => handler(request),
stop: async () => undefined,
handler: (request: TReq) => TRes,
): EffectRequestResponse<TReq, TRes> => ({
request: (request) => Effect.succeed(handler(request)),
stop: Effect.void,
});
describe("RAG engines", () => {

View file

@ -16,6 +16,9 @@
* Python reference: trustgraph-flow/trustgraph/agent/react/service.py
*/
import {
NodeRuntime,
} from "@effect/platform-node";
import {
makeFlowProcessor,
makeConsumerSpec,
@ -39,16 +42,13 @@ import {
type ToolRequest,
type ToolResponse,
type EffectConfigHandler,
type EffectRequestOptions,
type EffectRequestResponse,
type FlowRequestOptions,
type FlowRequestor,
type FlowResourceNotFoundError,
type MessagingDeliveryError,
type Spec,
} from "@trustgraph/base";
import { Context, Effect, Layer, Ref } from "effect";
import {Context, Effect, Layer, ManagedRuntime, Ref} from "effect";
import * as O from "effect/Option";
import * as Predicate from "effect/Predicate";
import * as S from "effect/Schema";
import {
@ -106,29 +106,6 @@ export class AgentRuntime extends Context.Service<AgentRuntime, AgentRuntimeServ
"@trustgraph/flow/agent/react/service/AgentRuntime",
) {}
const toEffectRequestOptions = <TRes>(
options: FlowRequestOptions<TRes> | undefined,
): EffectRequestOptions<TRes> | undefined => {
if (options === undefined) return undefined;
return {
...(options.timeoutMs === undefined ? {} : { timeoutMs: options.timeoutMs }),
...(options.recipient === undefined
? {}
: {
recipient: (response: TRes) =>
Effect.promise(() => options.recipient?.(response) ?? Promise.resolve(true)),
}),
};
};
const toPromiseRequestor = <TReq, TRes>(
requestor: EffectRequestResponse<TReq, TRes>,
): FlowRequestor<TReq, TRes> => ({
request: (request, options) =>
Effect.runPromise(requestor.request(request, toEffectRequestOptions(options))),
stop: () => Effect.runPromise(requestor.stop),
});
const buildConfiguredTool = (
toolId: string,
data: ToolConfigEntry,
@ -137,7 +114,7 @@ const buildConfiguredTool = (
const implType = data.type ?? "";
const name = data.name ?? "";
const description = data.description ?? "";
const config = { ...data } as Record<string, unknown>;
const config: Record<string, unknown> = { ...data };
if (name.length === 0) {
yield* Effect.logWarning(`[AgentService] Skipping tool with no name: ${toolId}`);
@ -277,12 +254,13 @@ const wireTools = Effect.fn("AgentService.wireTools")(function* (
const mcpTool = yield* flowCtx.flow.requestorEffect<ToolRequest, ToolResponse>("mcp-tool");
return tools.map((tool) => {
const implType = tool.config?.type as string | undefined;
const rawImplType = tool.config?.type;
const implType = Predicate.isString(rawImplType) ? rawImplType : undefined;
switch (implType) {
case "knowledge-query": {
const live = createKnowledgeQueryTool(
toPromiseRequestor(graphRag),
graphRag,
collection,
onExplain,
);
@ -290,21 +268,21 @@ const wireTools = Effect.fn("AgentService.wireTools")(function* (
}
case "document-query": {
const live = createDocumentQueryTool(
toPromiseRequestor(docRag),
docRag,
collection,
);
return { ...tool, execute: live.execute };
}
case "triples-query": {
const live = createTriplesQueryTool(
toPromiseRequestor(triples),
triples,
collection,
);
return { ...tool, execute: live.execute };
}
case "mcp-tool": {
const live = createMcpTool(
toPromiseRequestor(mcpTool),
mcpTool,
tool.name,
tool.description,
tool.args,
@ -328,16 +306,16 @@ const defaultTools = Effect.fn("AgentService.defaultTools")(function* (
return [
createKnowledgeQueryTool(
toPromiseRequestor(graphRag),
graphRag,
collection,
onExplain,
),
createDocumentQueryTool(
toPromiseRequestor(docRag),
docRag,
collection,
),
createTriplesQueryTool(
toPromiseRequestor(triples),
triples,
collection,
),
];
@ -433,7 +411,7 @@ const onAgentRequest = Effect.fn("AgentService.onRequest")(function* (
content: "",
explain_id: explain.explainId,
explain_triples: explain.triples,
} as AgentResponse);
});
}
yield* responseProducer.send(requestId, {
@ -630,6 +608,12 @@ export const program = makeFlowProcessorProgram<ProcessorConfig, never, AgentRun
layer: () => AgentRuntimeLive,
});
const agentRuntime = ManagedRuntime.make(Layer.empty);
export function run(): Promise<void> {
return Effect.runPromise(program);
return agentRuntime.runPromise(program);
}
export function runMain(): void {
NodeRuntime.runMain(program);
}

View file

@ -6,7 +6,7 @@
*/
import type {
FlowRequestor,
EffectRequestResponse,
GraphRagRequest,
GraphRagResponse,
DocumentRagRequest,
@ -18,13 +18,16 @@ import type {
Term,
Triple,
} from "@trustgraph/base";
import {Term as TermSchema} from "@trustgraph/base";
import { Effect } from "effect";
import * as O from "effect/Option";
import * as Predicate from "effect/Predicate";
import * as S from "effect/Schema";
import type { AgentTool, ToolArg } from "./types.js";
const decodeJsonUnknown = S.decodeUnknownOption(S.UnknownFromJsonString);
const decodeTerm = S.decodeUnknownOption(TermSchema);
/**
* Format a Term to a human-readable string.
@ -71,7 +74,7 @@ export interface ExplainData {
* Query the knowledge graph for information about entities and their relationships.
*/
export function createKnowledgeQueryTool(
client: FlowRequestor<GraphRagRequest, GraphRagResponse>,
client: EffectRequestResponse<GraphRagRequest, GraphRagResponse>,
collection?: string,
onExplain?: (data: ExplainData) => void,
): AgentTool {
@ -93,19 +96,14 @@ export function createKnowledgeQueryTool(
query: question,
...(collection !== undefined ? { collection } : {}),
};
const res = yield* Effect.tryPromise(() => client.request(request));
const res = yield* client.request(request);
yield* Effect.log(`[KnowledgeQuery] Response (${res.response?.length ?? 0} chars): ${res.error !== undefined ? `ERROR: ${res.error.message}` : `${res.response?.slice(0, 300)}...`}`);
// Extract explain data if embedded in the response
const rawRes = res as Record<string, unknown>;
if (
rawRes.message_type === "explain" &&
rawRes.explain_triples !== undefined &&
onExplain !== undefined
) {
const explainTriples = res.explain_triples;
if (res.message_type === "explain" && explainTriples !== undefined && onExplain !== undefined) {
yield* Effect.sync(() => onExplain({
explainId: (rawRes.explain_id as string) ?? "",
triples: rawRes.explain_triples as Triple[],
explainId: res.explain_id ?? "",
triples: Array.from(explainTriples),
}));
}
@ -119,7 +117,7 @@ export function createKnowledgeQueryTool(
* Search documents for relevant information.
*/
export function createDocumentQueryTool(
client: FlowRequestor<DocumentRagRequest, DocumentRagResponse>,
client: EffectRequestResponse<DocumentRagRequest, DocumentRagResponse>,
collection?: string,
): AgentTool {
return {
@ -139,13 +137,24 @@ export function createDocumentQueryTool(
query: question,
...(collection !== undefined ? { collection } : {}),
};
const res = yield* Effect.tryPromise(() => client.request(request));
const res = yield* client.request(request);
if (res.error !== undefined) return `Error: ${res.error.message}`;
return res.response;
})),
};
}
const objectProperty = (value: object, key: string): unknown =>
Predicate.hasProperty(value, key) ? value[key] : undefined;
const termFromUnknown = (value: unknown): Term | undefined => {
if (Predicate.isString(value)) {
return { type: "LITERAL", value };
}
const decoded = decodeTerm(value);
return O.isSome(decoded) ? decoded.value : undefined;
};
/**
* Parse triples query input. Accepts JSON with optional s, p, o fields.
*/
@ -166,30 +175,21 @@ function parseTriplesInput(input: string): {
};
}
const parsed = decoded.value as Record<string, unknown>;
const toTerm = (val: unknown): Term | undefined => {
if (typeof val === "string") {
return { type: "LITERAL", value: val };
}
if (typeof val === "object" && val !== null && "type" in val) {
return val as Term;
}
return undefined;
};
const result: {
s?: Term;
p?: Term;
o?: Term;
limit?: number;
} = {};
const s = toTerm(parsed.subject ?? parsed.s);
const p = toTerm(parsed.predicate ?? parsed.p);
const o = toTerm(parsed.object ?? parsed.o);
const parsed = decoded.value;
const s = termFromUnknown(objectProperty(parsed, "subject") ?? objectProperty(parsed, "s"));
const p = termFromUnknown(objectProperty(parsed, "predicate") ?? objectProperty(parsed, "p"));
const o = termFromUnknown(objectProperty(parsed, "object") ?? objectProperty(parsed, "o"));
const limit = objectProperty(parsed, "limit");
if (s !== undefined) result.s = s;
if (p !== undefined) result.p = p;
if (o !== undefined) result.o = o;
if (typeof parsed.limit === "number") result.limit = parsed.limit;
if (Predicate.isNumber(limit)) result.limit = limit;
return result;
}
@ -197,7 +197,7 @@ function parseTriplesInput(input: string): {
* Query for specific triples (subject-predicate-object relationships) in the knowledge graph.
*/
export function createTriplesQueryTool(
client: FlowRequestor<TriplesQueryRequest, TriplesQueryResponse>,
client: EffectRequestResponse<TriplesQueryRequest, TriplesQueryResponse>,
collection?: string,
): AgentTool {
return {
@ -231,7 +231,7 @@ export function createTriplesQueryTool(
...(o !== undefined ? { o } : {}),
...(collection !== undefined ? { collection } : {}),
};
const res = yield* Effect.tryPromise(() => client.request(request));
const res = yield* client.request(request);
if (res.error !== undefined) return `Error: ${res.error.message}`;
@ -255,7 +255,7 @@ export function createTriplesQueryTool(
* this function just wraps it as an AgentTool the ReAct agent can invoke.
*/
export function createMcpTool(
client: FlowRequestor<ToolRequest, ToolResponse>,
client: EffectRequestResponse<ToolRequest, ToolResponse>,
toolName: string,
description: string,
args: ToolArg[],
@ -265,7 +265,7 @@ export function createMcpTool(
description,
args,
execute: (input: string): Promise<string> => Effect.runPromise(Effect.gen(function* () {
const res = yield* Effect.tryPromise(() => client.request({ name: toolName, parameters: input }));
const res = yield* client.request({ name: toolName, parameters: input });
if (res.error !== undefined) return `Error: ${res.error.message}`;
if (res.text !== undefined) return res.text;
if (res.object !== undefined) return res.object;

View file

@ -7,6 +7,7 @@
* Python reference: trustgraph-flow/trustgraph/retrieval/document_rag/
*/
import {NodeRuntime} from "@effect/platform-node";
import {
makeConsumerSpec,
makeFlowProcessor,
@ -17,14 +18,10 @@ import {
type DocumentEmbeddingsResponse,
type DocumentRagRequest,
type DocumentRagResponse,
type EffectRequestOptions,
type EffectRequestResponse,
type EmbeddingsRequest,
type EmbeddingsResponse,
type FlowContext,
type FlowProcessorRuntime,
type FlowRequestOptions,
type FlowRequestor,
type FlowResourceNotFoundError,
type MessagingDeliveryError,
type ProcessorConfig,
@ -34,7 +31,7 @@ import {
type TextCompletionRequest,
type TextCompletionResponse,
} from "@trustgraph/base";
import { Effect } from "effect";
import {Effect, Layer, ManagedRuntime} from "effect";
import {
DocumentRagEngine,
DocumentRagEngineError,
@ -43,29 +40,6 @@ import {
type DocumentRagClients,
} from "./document-rag.js";
const toEffectRequestOptions = <TRes>(
options: FlowRequestOptions<TRes> | undefined,
): EffectRequestOptions<TRes> | undefined => {
if (options === undefined) return undefined;
return {
...(options.timeoutMs === undefined ? {} : { timeoutMs: options.timeoutMs }),
...(options.recipient === undefined
? {}
: {
recipient: (response: TRes) =>
Effect.promise(() => options.recipient?.(response) ?? Promise.resolve(true)),
}),
};
};
const toPromiseRequestor = <TReq, TRes>(
requestor: EffectRequestResponse<TReq, TRes>,
): FlowRequestor<TReq, TRes> => ({
request: (request, options) =>
Effect.runPromise(requestor.request(request, toEffectRequestOptions(options))),
stop: () => Effect.runPromise(requestor.stop),
});
const onDocumentRagRequest = Effect.fn("DocumentRagService.onRequest")(function* (
msg: DocumentRagRequest,
properties: Record<string, string>,
@ -78,12 +52,10 @@ const onDocumentRagRequest = Effect.fn("DocumentRagService.onRequest")(function*
const engine = yield* DocumentRagEngine;
const clients: DocumentRagClients = {
llm: toPromiseRequestor(yield* flowCtx.flow.requestorEffect<TextCompletionRequest, TextCompletionResponse>("llm")),
embeddings: toPromiseRequestor(yield* flowCtx.flow.requestorEffect<EmbeddingsRequest, EmbeddingsResponse>("embeddings")),
docEmbeddings: toPromiseRequestor(
yield* flowCtx.flow.requestorEffect<DocumentEmbeddingsRequest, DocumentEmbeddingsResponse>("doc-embeddings"),
),
prompt: toPromiseRequestor(yield* flowCtx.flow.requestorEffect<PromptRequest, PromptResponse>("prompt")),
llm: yield* flowCtx.flow.requestorEffect<TextCompletionRequest, TextCompletionResponse>("llm"),
embeddings: yield* flowCtx.flow.requestorEffect<EmbeddingsRequest, EmbeddingsResponse>("embeddings"),
docEmbeddings: yield* flowCtx.flow.requestorEffect<DocumentEmbeddingsRequest, DocumentEmbeddingsResponse>("doc-embeddings"),
prompt: yield* flowCtx.flow.requestorEffect<PromptRequest, PromptResponse>("prompt"),
};
const response = yield* engine.query(
@ -161,6 +133,12 @@ export const program = makeFlowProcessorProgram({
layer: () => DocumentRagLive,
});
const documentRagRuntime = ManagedRuntime.make(Layer.empty);
export function run(): Promise<void> {
return Effect.runPromise(program);
return documentRagRuntime.runPromise(program);
}
export function runMain(): void {
NodeRuntime.runMain(program);
}

View file

@ -9,7 +9,7 @@ import type {
DocumentEmbeddingsResponse,
EmbeddingsRequest,
EmbeddingsResponse,
FlowRequestor,
EffectRequestResponse,
PromptRequest,
PromptResponse,
TextCompletionRequest,
@ -20,10 +20,10 @@ import { Context, Effect, Layer } from "effect";
import * as S from "effect/Schema";
export interface DocumentRagClients {
llm: FlowRequestor<TextCompletionRequest, TextCompletionResponse>;
embeddings: FlowRequestor<EmbeddingsRequest, EmbeddingsResponse>;
docEmbeddings: FlowRequestor<DocumentEmbeddingsRequest, DocumentEmbeddingsResponse>;
prompt: FlowRequestor<PromptRequest, PromptResponse>;
llm: EffectRequestResponse<TextCompletionRequest, TextCompletionResponse>;
embeddings: EffectRequestResponse<EmbeddingsRequest, EmbeddingsResponse>;
docEmbeddings: EffectRequestResponse<DocumentEmbeddingsRequest, DocumentEmbeddingsResponse>;
prompt: EffectRequestResponse<PromptRequest, PromptResponse>;
}
export type ChunkCallback = (text: string, endOfStream: boolean) => Promise<void>;
@ -101,21 +101,19 @@ function queryDocumentRag(
return Effect.gen(function* () {
const collection = options?.collection ?? "default";
const embResp = yield* Effect.tryPromise({
try: () => clients.embeddings.request({ text: [queryText] }),
catch: (cause) => documentRagError("embeddings", cause),
});
const embResp = yield* clients.embeddings.request({ text: [queryText] }).pipe(
Effect.mapError((cause) => documentRagError("embeddings", cause)),
);
const vectors = embResp.vectors;
const docResp = yield* Effect.tryPromise({
try: () => clients.docEmbeddings.request({
const docResp = yield* clients.docEmbeddings.request({
vectors,
limit: 10,
collection,
user: "default",
}),
catch: (cause) => documentRagError("document-embeddings", cause),
});
}).pipe(
Effect.mapError((cause) => documentRagError("document-embeddings", cause)),
);
const chunks = docResp.chunks ?? [];
yield* Effect.log(`[DocumentRag] Found ${chunks.length} matching chunks`);
@ -125,21 +123,19 @@ function queryDocumentRag(
)
.join("\n\n---\n\n");
const promptResp = yield* Effect.tryPromise({
try: () => clients.prompt.request({
const promptResp = yield* clients.prompt.request({
name: "document-rag-synthesize",
variables: { query: queryText, context },
}),
catch: (cause) => documentRagError("prompt", cause),
});
}).pipe(
Effect.mapError((cause) => documentRagError("prompt", cause)),
);
const resp = yield* Effect.tryPromise({
try: () => clients.llm.request({
const resp = yield* clients.llm.request({
system: promptResp.system,
prompt: promptResp.prompt,
}),
catch: (cause) => documentRagError("llm", cause),
});
}).pipe(
Effect.mapError((cause) => documentRagError("llm", cause)),
);
return resp.response;
});

View file

@ -7,18 +7,15 @@
* Python reference: trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py
*/
import {NodeRuntime} from "@effect/platform-node";
import {
makeConsumerSpec,
makeFlowProcessor,
makeProducerSpec,
makeRequestResponseSpec,
makeFlowProcessorProgram,
type EffectRequestOptions,
type EffectRequestResponse,
type FlowContext,
type FlowProcessorRuntime,
type FlowRequestOptions,
type FlowRequestor,
type FlowResourceNotFoundError,
type GraphEmbeddingsRequest,
type GraphEmbeddingsResponse,
@ -36,7 +33,7 @@ import {
type TriplesQueryRequest,
type TriplesQueryResponse,
} from "@trustgraph/base";
import { Effect } from "effect";
import {Effect, Layer, ManagedRuntime} from "effect";
import {
GraphRagEngine,
GraphRagEngineError,
@ -46,29 +43,6 @@ import {
type GraphRagConfig,
} from "./graph-rag.js";
const toEffectRequestOptions = <TRes>(
options: FlowRequestOptions<TRes> | undefined,
): EffectRequestOptions<TRes> | undefined => {
if (options === undefined) return undefined;
return {
...(options.timeoutMs === undefined ? {} : { timeoutMs: options.timeoutMs }),
...(options.recipient === undefined
? {}
: {
recipient: (response: TRes) =>
Effect.promise(() => options.recipient?.(response) ?? Promise.resolve(true)),
}),
};
};
const toPromiseRequestor = <TReq, TRes>(
requestor: EffectRequestResponse<TReq, TRes>,
): FlowRequestor<TReq, TRes> => ({
request: (request, options) =>
Effect.runPromise(requestor.request(request, toEffectRequestOptions(options))),
stop: () => Effect.runPromise(requestor.stop),
});
const graphRagConfigFromRequest = (msg: GraphRagRequest): GraphRagConfig => ({
...(msg.entityLimit !== undefined ? { entityLimit: msg.entityLimit } : {}),
...(msg.tripleLimit !== undefined ? { tripleLimit: msg.tripleLimit } : {}),
@ -90,13 +64,11 @@ const onGraphRagRequest = Effect.fn("GraphRagService.onRequest")(function* (
yield* Effect.log(`[GraphRagService] Received request ${requestId}: "${msg.query?.slice(0, 60)}..." collection=${msg.collection}`);
const clients: GraphRagClients = {
llm: toPromiseRequestor(yield* flowCtx.flow.requestorEffect<TextCompletionRequest, TextCompletionResponse>("llm")),
embeddings: toPromiseRequestor(yield* flowCtx.flow.requestorEffect<EmbeddingsRequest, EmbeddingsResponse>("embeddings")),
graphEmbeddings: toPromiseRequestor(
yield* flowCtx.flow.requestorEffect<GraphEmbeddingsRequest, GraphEmbeddingsResponse>("graph-embeddings"),
),
triples: toPromiseRequestor(yield* flowCtx.flow.requestorEffect<TriplesQueryRequest, TriplesQueryResponse>("triples")),
prompt: toPromiseRequestor(yield* flowCtx.flow.requestorEffect<PromptRequest, PromptResponse>("prompt")),
llm: yield* flowCtx.flow.requestorEffect<TextCompletionRequest, TextCompletionResponse>("llm"),
embeddings: yield* flowCtx.flow.requestorEffect<EmbeddingsRequest, EmbeddingsResponse>("embeddings"),
graphEmbeddings: yield* flowCtx.flow.requestorEffect<GraphEmbeddingsRequest, GraphEmbeddingsResponse>("graph-embeddings"),
triples: yield* flowCtx.flow.requestorEffect<TriplesQueryRequest, TriplesQueryResponse>("triples"),
prompt: yield* flowCtx.flow.requestorEffect<PromptRequest, PromptResponse>("prompt"),
};
const result = yield* engine.query(
@ -125,16 +97,18 @@ const onGraphRagRequest = Effect.fn("GraphRagService.onRequest")(function* (
if (result === undefined) return;
const response: GraphRagResponse = {
response: result.answer,
endOfStream: true,
};
if (result.subgraph.length > 0) {
(response as Record<string, unknown>).message_type = "explain";
(response as Record<string, unknown>).explain_id = `explain-${requestId}`;
(response as Record<string, unknown>).explain_triples = result.subgraph;
}
const response: GraphRagResponse = result.subgraph.length === 0
? {
response: result.answer,
endOfStream: true,
}
: {
response: result.answer,
endOfStream: true,
message_type: "explain",
explain_id: `explain-${requestId}`,
explain_triples: result.subgraph,
};
yield* producer.send(requestId, response);
});
@ -192,6 +166,12 @@ export const program = makeFlowProcessorProgram({
layer: () => GraphRagLive,
});
const graphRagRuntime = ManagedRuntime.make(Layer.empty);
export function run(): Promise<void> {
return Effect.runPromise(program);
return graphRagRuntime.runPromise(program);
}
export function runMain(): void {
NodeRuntime.runMain(program);
}

View file

@ -7,7 +7,8 @@
import type {
EmbeddingsRequest,
EmbeddingsResponse,
FlowRequestor,
EffectRequestOptions,
EffectRequestResponse,
GraphEmbeddingsRequest,
GraphEmbeddingsResponse,
PromptRequest,
@ -34,11 +35,11 @@ export interface GraphRagConfig {
}
export interface GraphRagClients {
llm: FlowRequestor<TextCompletionRequest, TextCompletionResponse>;
embeddings: FlowRequestor<EmbeddingsRequest, EmbeddingsResponse>;
graphEmbeddings: FlowRequestor<GraphEmbeddingsRequest, GraphEmbeddingsResponse>;
triples: FlowRequestor<TriplesQueryRequest, TriplesQueryResponse>;
prompt: FlowRequestor<PromptRequest, PromptResponse>;
llm: EffectRequestResponse<TextCompletionRequest, TextCompletionResponse>;
embeddings: EffectRequestResponse<EmbeddingsRequest, EmbeddingsResponse>;
graphEmbeddings: EffectRequestResponse<GraphEmbeddingsRequest, GraphEmbeddingsResponse>;
triples: EffectRequestResponse<TriplesQueryRequest, TriplesQueryResponse>;
prompt: EffectRequestResponse<PromptRequest, PromptResponse>;
}
export type ChunkCallback = (text: string, endOfStream: boolean) => Promise<void>;
@ -92,6 +93,16 @@ const graphRagError = (operation: string, cause: unknown) =>
message: errorMessage(cause),
});
const requestClient = <TReq, TRes>(
requestor: EffectRequestResponse<TReq, TRes>,
operation: string,
request: TReq,
options?: EffectRequestOptions<TRes, GraphRagEngineError>,
): Effect.Effect<TRes, GraphRagEngineError> =>
requestor.request(request, options).pipe(
Effect.mapError((cause) => graphRagError(operation, cause)),
);
export function normalizeGraphRagConfig(config: GraphRagConfig = {}): NormalizedGraphRagConfig {
return {
entityLimit: config.entityLimit ?? 50,
@ -178,21 +189,23 @@ function queryGraphRag(
function extractConcepts(clients: GraphRagClients, query: string): Effect.Effect<string[], GraphRagEngineError> {
return Effect.gen(function* () {
const promptResp = yield* Effect.tryPromise({
try: () => clients.prompt.request({
const promptResp = yield* requestClient(
clients.prompt,
"extract-concepts-prompt",
{
name: "extract-concepts",
variables: { query },
}),
catch: (cause) => graphRagError("extract-concepts-prompt", cause),
});
},
);
const llmResp = yield* Effect.tryPromise({
try: () => clients.llm.request({
const llmResp = yield* requestClient(
clients.llm,
"extract-concepts-llm",
{
system: promptResp.system,
prompt: promptResp.prompt,
}),
catch: (cause) => graphRagError("extract-concepts-llm", cause),
});
},
);
return llmResp.response
.split("\n")
@ -203,10 +216,7 @@ function extractConcepts(clients: GraphRagClients, query: string): Effect.Effect
function getVectors(clients: GraphRagClients, concepts: string[]): Effect.Effect<number[][], GraphRagEngineError> {
return Effect.gen(function* () {
const resp = yield* Effect.tryPromise({
try: () => clients.embeddings.request({ text: concepts }),
catch: (cause) => graphRagError("get-vectors", cause),
});
const resp = yield* requestClient(clients.embeddings, "get-vectors", { text: concepts });
return resp.vectors;
});
}
@ -218,15 +228,16 @@ function getEntities(
collection?: string,
): Effect.Effect<Term[], GraphRagEngineError> {
return Effect.gen(function* () {
const resp = yield* Effect.tryPromise({
try: () => clients.graphEmbeddings.request({
const resp = yield* requestClient(
clients.graphEmbeddings,
"get-entities",
{
vectors,
user: "default",
collection: collection ?? "default",
limit: config.entityLimit,
}),
catch: (cause) => graphRagError("get-entities", cause),
});
},
);
return resp.entities;
});
}
@ -259,10 +270,7 @@ function followEdges(
limit: config.tripleLimit,
...(collection !== undefined ? { collection } : {}),
};
return Effect.tryPromise({
try: () => clients.triples.request(request),
catch: (cause) => graphRagError("follow-edges-query", cause),
});
return requestClient(clients.triples, "follow-edges-query", request);
});
const results = yield* Effect.all(queries);
@ -321,24 +329,26 @@ function scoreEdges(
Effect.mapError((cause) => graphRagError("edge-score-encode", cause)),
);
const promptResp = yield* Effect.tryPromise({
try: () => clients.prompt.request({
const promptResp = yield* requestClient(
clients.prompt,
"edge-score-prompt",
{
name: "kg-edge-scoring",
variables: {
query,
knowledge: knowledgeJson,
},
}),
catch: (cause) => graphRagError("edge-score-prompt", cause),
});
},
);
const llmResp = yield* Effect.tryPromise({
try: () => clients.llm.request({
const llmResp = yield* requestClient(
clients.llm,
"edge-score-llm",
{
system: promptResp.system,
prompt: promptResp.prompt,
}),
catch: (cause) => graphRagError("edge-score-llm", cause),
});
},
);
yield* Effect.log(`[GraphRag] Edge scoring LLM response (first 500 chars): ${llmResp.response.slice(0, 500)}`);
@ -375,43 +385,49 @@ function synthesize(
.map((triple) => `${termToString(triple.s)} -> ${termToString(triple.p)} -> ${termToString(triple.o)}`)
.join("\n");
const promptResp = yield* Effect.tryPromise({
try: () => clients.prompt.request({
const promptResp = yield* requestClient(
clients.prompt,
"synthesize-prompt",
{
name: "graph-rag-synthesize",
variables: { query, context },
}),
catch: (cause) => graphRagError("synthesize-prompt", cause),
});
},
);
if (chunkCallback !== undefined) {
let fullText = "";
yield* Effect.tryPromise({
try: () => clients.llm.request(
{
system: promptResp.system,
prompt: promptResp.prompt,
streaming: true,
yield* requestClient(
clients.llm,
"synthesize-stream",
{
system: promptResp.system,
prompt: promptResp.prompt,
streaming: true,
},
{
recipient: (resp) => {
if (resp.response.length === 0) {
return Effect.succeed(resp.endOfStream === true);
}
fullText += resp.response;
return Effect.tryPromise({
try: () => chunkCallback(resp.response, resp.endOfStream === true).then(() => resp.endOfStream === true),
catch: (cause) => graphRagError("synthesize-stream-callback", cause),
});
},
{
recipient: (resp) => {
if (resp.response.length === 0) return Promise.resolve(resp.endOfStream === true);
fullText += resp.response;
return chunkCallback(resp.response, resp.endOfStream === true).then(() => resp.endOfStream === true);
},
},
),
catch: (cause) => graphRagError("synthesize-stream", cause),
});
},
);
return fullText;
}
const resp = yield* Effect.tryPromise({
try: () => clients.llm.request({
const resp = yield* requestClient(
clients.llm,
"synthesize-llm",
{
system: promptResp.system,
prompt: promptResp.prompt,
}),
catch: (cause) => graphRagError("synthesize-llm", cause),
});
},
);
return resp.response;
});