diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 2eafdda3..91b8ba4c 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,13 +12,14 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 gateway -streaming callback slice: +Current signal counts from `ts/packages` after the 2026-06-02 FalkorDB scoped +client lifecycle slice: | Signal | Count | | --- | ---: | -| `Effect.runPromise` | 163 | +| `Effect.runPromise` | 165 | | `Effect.runPromiseWith` | 0 | +| `Effect.cached` | 0 | | `Map<` | 82 | | `WebSocket` | 62 | | `new Map` | 60 | @@ -98,6 +99,12 @@ Notes: streaming methods, switched the RPC stream server off nested `Effect.runPromiseWith(context)` queue offers, and replaced the client `StopStreaming` sentinel error with `Stream.runForEachWhile`. +- The FalkorDB scoped client lifecycle slice removed the remaining + `Effect.cached` matches from `ts/packages`. FalkorDB triples store/query + Live layers and direct compatibility factories now acquire clients through + `Effect.acquireRelease` and disconnect them on scope close. The + `Effect.runPromise` count increased by two because the new lifecycle tests + run scoped programs at the test boundary. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -758,6 +765,40 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: FalkorDB Scoped Client Lifecycle Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/flow/src/storage/triples/falkordb.ts` and + `ts/packages/flow/src/query/triples/falkordb.ts` now model FalkorDB client + acquisition with `Effect.acquireRelease`. + - FalkorDB Live layers now use `Layer.effect` and own Redis client + disconnect finalizers through the layer scope. + - Direct Promise compatibility factories and direct service factories now + bracket each operation with scoped acquisition instead of hiding mutable + `Effect.cached` connection slots. + - Legacy `makeTriplesStoreService` and `makeTriplesQueryService` provider + hooks now acquire scoped FalkorDB services and map acquisition failures to + `ProcessorLifecycleError`; modern `program` entrypoints preserve the + FalkorDB tagged layer error type. + - FalkorDB query row field extraction now uses `effect/Predicate` narrowing + instead of record/string type assertions. + - New lifecycle tests use fake clients/graphs to prove connect on acquire + and disconnect on scope close for both triples store and triples query. +- Remaining: + - Qdrant graph/doc store/query construction still needs the next + config/schema/fakeability cleanup. The installed Qdrant client exposes no + close/disconnect method, so this should not be treated as a lifecycle + finalizer slice. +- Verification: + - `bunx --bun vitest run src/__tests__/falkordb-lifecycle.test.ts` + - `bun run --cwd ts/packages/flow build` + - `cd ts && bun run check` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -810,9 +851,9 @@ Notes: remaining `ts/packages` matches. - Provider SDKs and storage clients should become managed resources where they have meaningful lifecycle. - - FalkorDB should be the next P1 storage slice: both triples query and store - connect Redis clients, cache them with mutable `Effect.cached` slots, and - expose `Layer.succeed` services without a scoped client finalizer. + - FalkorDB scoped lifecycle is complete for triples query/store. Use the + fakeable client/graph factory pattern from that slice for future storage + client tests. - Qdrant has no close/disconnect surface in the installed client, so treat it as a config/schema/fakeability slice rather than an `acquireRelease` close slice. @@ -821,33 +862,47 @@ Notes: ## Ranked Findings -### P1: Make SDK, Storage, And Provider Layers Managed Resources +### P1: Qdrant Config, Schema, And Fakeable Construction Cleanup - TrustGraph evidence: - - `ts/packages/flow/src/storage/triples/falkordb.ts` - - `ts/packages/flow/src/query/triples/falkordb.ts` - `ts/packages/flow/src/storage/embeddings/qdrant-graph.ts` - `ts/packages/flow/src/storage/embeddings/qdrant-doc.ts` - `ts/packages/flow/src/query/embeddings/qdrant-graph.ts` - `ts/packages/flow/src/query/embeddings/qdrant-doc.ts` +- Effect primitives: + - `Config`, `ConfigProvider`, `Layer.effect`, `Schema.decodeUnknownEffect`, + `Predicate`, `Option`. +- Rewrite shape: + - Move Qdrant config loading out of sync factory construction and into + Effect config/layer paths. + - Add fakeable Qdrant client construction before behavior changes. + - Decode query payloads through Schema instead of manual payload casts. + - Do not add an `acquireRelease` finalizer unless a concrete close API is + found in the installed Qdrant client. +- Tests: + - Qdrant graph/doc store/query tests with fake clients. + - Config tests with `ConfigProvider.fromUnknown`. + - Schema decode failure tests for malformed payloads. + +### P2: Provider Layer And Effect AI Cleanup + +- TrustGraph evidence: - `ts/packages/flow/src/model/text-completion/*.ts` - `ts/packages/flow/src/embeddings/ollama.ts` - Effect primitives: - - `Effect.acquireRelease`, `Layer.effect`/`Layer.scoped`, `Config`, - `ConfigProvider`, `Metric`, `Logger`, Effect AI provider layers. + - `Config`, `ConfigProvider`, `Metric`, `Logger`, + `effect/unstable/ai/LanguageModel`, `effect/unstable/ai/EmbeddingModel`, + Effect AI OpenAI/Anthropic provider layers. - Rewrite shape: - - First migrate FalkorDB triples store/query so Redis client connect and - disconnect/quit are owned by the service layer scope instead of mutable - cached effects hidden inside a `Layer.succeed` service. - - Move env/config reading into `Config` loaders and provider-specific layers. - - Scope SDK clients that need explicit close/disconnect; for clients without - close APIs, prefer config/schema/fakeable construction work instead. + - Migrate provider config into Effect layers. + - Use Effect AI provider layers where parity is proven. + - Keep OpenAI-compatible/Azure-compatible behavior behind parity tests + because current code uses chat-completions style APIs while the Effect + OpenAI language model is Responses API backed. - Tests: - - FalkorDB tests with fake client factories proving connect on acquire and - disconnect/quit on scope close. - - Provider/config tests with `ConfigProvider.fromUnknown`. - - Storage/query tests with fake clients before changing real resource - lifetimes. + - Provider parity for `LlmResult`, final streaming chunk token counts, 429 + mapping, missing-token config failures, and OpenAI-compatible local-server + behavior. ### P2: Canonicalize MCP Around The Effect Server @@ -881,9 +936,9 @@ Notes: ## Recommended PR Order -1. FalkorDB triples store/query scoped client lifecycle. -2. Qdrant config/schema/fakeable construction cleanup. -3. Client streaming facade completion normalization. +1. Qdrant config/schema/fakeable construction cleanup. +2. Client streaming facade completion normalization. +3. Provider layer and Effect AI cleanup. 4. MCP parity/deletion decision and workbench platform polish. ## No-Op Rules diff --git a/ts/packages/flow/src/__tests__/falkordb-lifecycle.test.ts b/ts/packages/flow/src/__tests__/falkordb-lifecycle.test.ts new file mode 100644 index 00000000..43bd400f --- /dev/null +++ b/ts/packages/flow/src/__tests__/falkordb-lifecycle.test.ts @@ -0,0 +1,102 @@ +import { Effect } from "effect"; +import { describe, expect, it } from "vitest"; +import { + FalkorDBTriplesQueryLive, + FalkorDBTriplesQueryService, + type FalkorDBClosableClient as FalkorDBQueryClient, + type FalkorDBQueryGraph, +} from "../query/triples/falkordb.js"; +import { + FalkorDBTriplesStoreLive, + FalkorDBTriplesStoreService, + type FalkorDBClosableClient as FalkorDBStoreClient, + type FalkorDBStoreGraph, +} from "../storage/triples/falkordb.js"; + +class FakeFalkorDBClient implements FalkorDBStoreClient, FalkorDBQueryClient { + connectCount = 0; + disconnectCount = 0; + + async connect(): Promise { + this.connectCount += 1; + } + + async disconnect(): Promise { + this.disconnectCount += 1; + } +} + +class FakeStoreGraph implements FalkorDBStoreGraph { + readonly queries: string[] = []; + + async query(query: string): Promise<{ readonly data?: Array }> { + this.queries.push(query); + return {}; + } +} + +class FakeQueryGraph implements FalkorDBQueryGraph { + readonly queries: string[] = []; + + async query(query: string): Promise<{ readonly data?: Array }> { + this.queries.push(query); + return {}; + } +} + +describe("FalkorDB scoped layers", () => { + it("connects and disconnects the triples store client with the layer scope", async () => { + const client = new FakeFalkorDBClient(); + const graph = new FakeStoreGraph(); + + await Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const store = yield* FalkorDBTriplesStoreService; + yield* store.deleteCollection("alice", "demo"); + }).pipe( + Effect.provide( + FalkorDBTriplesStoreLive({ + url: "redis://falkor.test:6379", + database: "test-store", + clientFactory: () => client, + graphFactory: () => graph, + }), + ), + ), + ), + ); + + expect(client.connectCount).toBe(1); + expect(client.disconnectCount).toBe(1); + expect(graph.queries).toHaveLength(3); + }); + + it("connects and disconnects the triples query client with the layer scope", async () => { + const client = new FakeFalkorDBClient(); + const graph = new FakeQueryGraph(); + + const triples = await Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const query = yield* FalkorDBTriplesQueryService; + return yield* query.queryTriples(undefined, undefined, undefined, 10); + }).pipe( + Effect.provide( + FalkorDBTriplesQueryLive({ + url: "redis://falkor.test:6379", + database: "test-query", + clientFactory: () => client, + graphFactory: () => graph, + }), + ), + ), + ), + ); + + expect(triples).toEqual([]); + expect(client.connectCount).toBe(1); + expect(client.disconnectCount).toBe(1); + expect(graph.queries).toHaveLength(2); + }); +}); diff --git a/ts/packages/flow/src/query/triples/falkordb-service.ts b/ts/packages/flow/src/query/triples/falkordb-service.ts index e94192fd..71fee7bd 100644 --- a/ts/packages/flow/src/query/triples/falkordb-service.ts +++ b/ts/packages/flow/src/query/triples/falkordb-service.ts @@ -11,8 +11,10 @@ import { makeFlowProcessor, makeConsumerSpec, makeProducerSpec, + processorLifecycleError, type ProcessorConfig, type FlowProcessorRuntime, + type FlowProcessorStartEffect, type FlowContext, type FlowResourceNotFoundError, type MessagingDeliveryError, @@ -26,8 +28,9 @@ import { Effect, Layer, ManagedRuntime } from "effect"; import { FalkorDBTriplesQueryLive, FalkorDBTriplesQueryService, - makeFalkorDBTriplesQueryService, + makeFalkorDBTriplesQueryServiceScoped, type FalkorDBQueryConfig, + type FalkorDBTriplesQueryError, } from "./falkordb.js"; const TriplesResponseProducer = makeProducerSpec("triples-response"); @@ -79,17 +82,25 @@ export const makeTriplesQuerySpecs = (): ReadonlyArray; +const provideFalkorDBTriplesQuery = (processorId: string) => + Effect.fn("TriplesQueryService.provideFalkorDB")(function* ( + effect: FlowProcessorStartEffect, + ) { + const query = yield* makeFalkorDBTriplesQueryServiceScoped().pipe( + Effect.mapError((error) => processorLifecycleError(processorId, "falkordb-query-connect", error)), + ); + yield* effect.pipe( + Effect.provideService( + FalkorDBTriplesQueryService, + FalkorDBTriplesQueryService.of(query), + ), + ); + }); + export function makeTriplesQueryService(config: ProcessorConfig): TriplesQueryService { - const query = makeFalkorDBTriplesQueryService(); const service = makeFlowProcessor(config, { specifications: makeTriplesQuerySpecs(), - provide: (effect) => - effect.pipe( - Effect.provideService( - FalkorDBTriplesQueryService, - FalkorDBTriplesQueryService.of(query), - ), - ), + provide: provideFalkorDBTriplesQuery(config.id), }); void Effect.runPromise(Effect.log("[TriplesQuery] Service initialized")); return service; @@ -97,7 +108,11 @@ export function makeTriplesQueryService(config: ProcessorConfig): TriplesQuerySe export const TriplesQueryService = makeTriplesQueryService; -export const program = makeFlowProcessorProgram({ +export const program = makeFlowProcessorProgram< + ProcessorConfig & FalkorDBQueryConfig, + FalkorDBTriplesQueryError, + FalkorDBTriplesQueryService +>({ id: "triples-query", specs: () => makeTriplesQuerySpecs(), layer: (config) => FalkorDBTriplesQueryLive(config), diff --git a/ts/packages/flow/src/query/triples/falkordb.ts b/ts/packages/flow/src/query/triples/falkordb.ts index 8aa1558d..e2ec1bd8 100644 --- a/ts/packages/flow/src/query/triples/falkordb.ts +++ b/ts/packages/flow/src/query/triples/falkordb.ts @@ -9,11 +9,34 @@ import { createClient, Graph } from "falkordb"; import { errorMessage, type Term, type Triple } from "@trustgraph/base"; import { Config, Context, Effect, Layer } from "effect"; +import * as Predicate from "effect/Predicate"; import * as S from "effect/Schema"; +export interface FalkorDBClosableClient { + readonly connect: () => Promise; + readonly disconnect: () => Promise; +} + +export type FalkorDBQueryOptions = Parameters[1]; + +export interface FalkorDBQueryGraph { + readonly query: ( + query: string, + options?: FalkorDBQueryOptions, + ) => Promise<{ readonly data?: Array }>; +} + +export type FalkorDBQueryClientFactory = (url: string) => FalkorDBClosableClient; +export type FalkorDBQueryGraphFactory = ( + client: FalkorDBClosableClient, + database: string, +) => FalkorDBQueryGraph; + export interface FalkorDBQueryConfig { url?: string; database?: string; + clientFactory?: FalkorDBQueryClientFactory; + graphFactory?: FalkorDBQueryGraphFactory; } function termToValue(term: Term | undefined): string | null { @@ -41,7 +64,9 @@ function createTerm(value: string): Term { } function field(row: unknown, key: string): string { - return (row as Record)?.[key] as string ?? ""; + if (!Predicate.hasProperty(row, key)) return ""; + const value = row[key]; + return typeof value === "string" ? value : ""; } export interface FalkorDBTriplesQuery { @@ -86,11 +111,10 @@ const falkorDBTriplesQueryError = (operation: string, cause: unknown): FalkorDBT }); interface FalkorDBQueryConnection { - readonly graph: Graph; + readonly client: FalkorDBClosableClient; + readonly graph: FalkorDBQueryGraph; } -type FalkorDBQueryOptions = Parameters[1]; - const resolveFalkorDBQueryConfig = Effect.fn("FalkorDBTriplesQuery.resolveConfig")(function* ( config: FalkorDBQueryConfig, ) { @@ -109,8 +133,25 @@ const connectFalkorDBTriplesQuery = ( ): Effect.Effect => Effect.gen(function* () { const { url, database } = yield* resolveFalkorDBQueryConfig(config); + const clientFactory = config.clientFactory; + const graphFactory = config.graphFactory; + + if ( + (clientFactory === undefined && graphFactory !== undefined) || + (clientFactory !== undefined && graphFactory === undefined) + ) { + return yield* falkorDBTriplesQueryError( + "create-client", + "FalkorDB custom clientFactory and graphFactory must be configured together", + ); + } + const { client, graph } = yield* Effect.try({ try: () => { + if (clientFactory !== undefined && graphFactory !== undefined) { + const client = clientFactory(url); + return { client, graph: graphFactory(client, database) }; + } const client = createClient({ url }); return { client, graph: new Graph(client, database) }; }, @@ -130,11 +171,35 @@ const connectFalkorDBTriplesQuery = ( ); yield* Effect.log(`[FalkorDBTriplesQuery] Connected to ${url}, graph: ${database}`); - return { graph }; + return { client, graph }; }); +const disconnectFalkorDBTriplesQuery = ( + connection: FalkorDBQueryConnection, +): Effect.Effect => + Effect.tryPromise({ + try: () => connection.client.disconnect(), + catch: (cause) => falkorDBTriplesQueryError("disconnect", cause), + }).pipe( + Effect.catch((error) => + Effect.logError("[FalkorDBTriplesQuery] Disconnect failed", { + error: error.message, + operation: error.operation, + }), + ), + Effect.asVoid, + ); + +const acquireFalkorDBTriplesQuery = ( + config: FalkorDBQueryConfig, +) => + Effect.acquireRelease( + connectFalkorDBTriplesQuery(config), + (connection) => disconnectFalkorDBTriplesQuery(connection), + ); + const queryRows = ( - graph: Graph, + graph: FalkorDBQueryGraph, operation: string, query: string, options?: FalkorDBQueryOptions, @@ -147,7 +212,7 @@ const queryRows = ( ); const matchPattern = ( - graph: Graph, + graph: FalkorDBQueryGraph, out: [string, string, string][], sv: string, pv: string, @@ -171,7 +236,7 @@ const matchPattern = ( }); const matchSP = ( - graph: Graph, + graph: FalkorDBQueryGraph, out: [string, string, string][], sv: string, pv: string, @@ -202,7 +267,7 @@ const matchSP = ( }); const matchSO = ( - graph: Graph, + graph: FalkorDBQueryGraph, out: [string, string, string][], sv: string, ov: string, @@ -224,7 +289,7 @@ const matchSO = ( }); const matchPO = ( - graph: Graph, + graph: FalkorDBQueryGraph, out: [string, string, string][], pv: string, ov: string, @@ -246,7 +311,7 @@ const matchPO = ( }); const matchS = ( - graph: Graph, + graph: FalkorDBQueryGraph, out: [string, string, string][], sv: string, limit: number, @@ -276,7 +341,7 @@ const matchS = ( }); const matchP = ( - graph: Graph, + graph: FalkorDBQueryGraph, out: [string, string, string][], pv: string, limit: number, @@ -306,7 +371,7 @@ const matchP = ( }); const matchO = ( - graph: Graph, + graph: FalkorDBQueryGraph, out: [string, string, string][], ov: string, limit: number, @@ -327,7 +392,7 @@ const matchO = ( }); const matchAll = ( - graph: Graph, + graph: FalkorDBQueryGraph, out: [string, string, string][], limit: number, ): Effect.Effect => @@ -395,45 +460,67 @@ const queryTriplesEffect = ( }); const makeFalkorDBTriplesQueryEffect = ( + getConnection: () => Effect.Effect, +): FalkorDBTriplesQueryServiceShape => ({ + queryTriples: Effect.fn("FalkorDBTriplesQuery.queryTriples")(( + s: Term | undefined, + p: Term | undefined, + o: Term | undefined, + limit: number, + ) => queryTriplesEffect(getConnection, s, p, o, limit)), +}); + +const makeFalkorDBTriplesQueryEffectScoped = ( config: FalkorDBQueryConfig = {}, -): FalkorDBTriplesQueryServiceShape => { - let cachedConnection: Effect.Effect | undefined; +) => + acquireFalkorDBTriplesQuery(config).pipe( + Effect.map((connection) => makeFalkorDBTriplesQueryEffect(() => Effect.succeed(connection))), + ); - const getConnection = Effect.fn("FalkorDBTriplesQuery.connection")(function* () { - if (cachedConnection === undefined) { - cachedConnection = yield* Effect.cached(connectFalkorDBTriplesQuery(config)); - } - return yield* cachedConnection; - }); - - return { - queryTriples: Effect.fn("FalkorDBTriplesQuery.queryTriples")(( - s: Term | undefined, - p: Term | undefined, - o: Term | undefined, - limit: number, - ) => queryTriplesEffect(getConnection, s, p, o, limit)), - }; -}; +const withFalkorDBTriplesQuery = ( + config: FalkorDBQueryConfig, + use: (query: FalkorDBTriplesQueryServiceShape) => Effect.Effect, +) => + Effect.scoped( + makeFalkorDBTriplesQueryEffectScoped(config).pipe( + Effect.flatMap(use), + ), + ); export function makeFalkorDBTriplesQuery( config: FalkorDBQueryConfig = {}, ): FalkorDBTriplesQuery { - const query = makeFalkorDBTriplesQueryEffect(config); return { queryTriples: (s, p, o, limit = 100) => - Effect.runPromise(query.queryTriples(s, p, o, limit)).then((triples) => Array.from(triples)), + Effect.runPromise( + withFalkorDBTriplesQuery(config, (query) => query.queryTriples(s, p, o, limit)), + ).then((triples) => Array.from(triples)), }; } export const makeFalkorDBTriplesQueryService = ( config: FalkorDBQueryConfig = {}, -): FalkorDBTriplesQueryServiceShape => makeFalkorDBTriplesQueryEffect(config); +): FalkorDBTriplesQueryServiceShape => ({ + queryTriples: (s, p, o, limit) => + withFalkorDBTriplesQuery(config, (query) => query.queryTriples(s, p, o, limit)), +}); + +export const makeFalkorDBTriplesQueryServiceFromConnection = ( + connection: FalkorDBQueryConnection, +): FalkorDBTriplesQueryServiceShape => + makeFalkorDBTriplesQueryEffect(() => Effect.succeed(connection)); + +export const makeFalkorDBTriplesQueryServiceScoped = ( + config: FalkorDBQueryConfig = {}, +) => + makeFalkorDBTriplesQueryEffectScoped(config); export const FalkorDBTriplesQueryLive = ( config: FalkorDBQueryConfig = {}, -): Layer.Layer => - Layer.succeed( +): Layer.Layer => + Layer.effect( FalkorDBTriplesQueryService, - FalkorDBTriplesQueryService.of(makeFalkorDBTriplesQueryService(config)), + makeFalkorDBTriplesQueryServiceScoped(config).pipe( + Effect.map((service) => FalkorDBTriplesQueryService.of(service)), + ), ); diff --git a/ts/packages/flow/src/storage/triples/falkordb-service.ts b/ts/packages/flow/src/storage/triples/falkordb-service.ts index 0810a4d4..974c5f14 100644 --- a/ts/packages/flow/src/storage/triples/falkordb-service.ts +++ b/ts/packages/flow/src/storage/triples/falkordb-service.ts @@ -11,8 +11,10 @@ import { makeFlowProcessor, makeConsumerSpec, + processorLifecycleError, type ProcessorConfig, type FlowProcessorRuntime, + type FlowProcessorStartEffect, type FlowContext, type Triples, type Spec, @@ -23,7 +25,7 @@ import { Effect, Layer, ManagedRuntime } from "effect"; import { FalkorDBTriplesStoreLive, FalkorDBTriplesStoreService, - makeFalkorDBTriplesStoreService, + makeFalkorDBTriplesStoreServiceScoped, type FalkorDBConfig, type FalkorDBTriplesStoreError, } from "./falkordb.js"; @@ -55,17 +57,25 @@ export const makeTriplesStoreSpecs = (): ReadonlyArray; +const provideFalkorDBTriplesStore = (processorId: string) => + Effect.fn("TriplesStoreService.provideFalkorDB")(function* ( + effect: FlowProcessorStartEffect, + ) { + const store = yield* makeFalkorDBTriplesStoreServiceScoped().pipe( + Effect.mapError((error) => processorLifecycleError(processorId, "falkordb-store-connect", error)), + ); + yield* effect.pipe( + Effect.provideService( + FalkorDBTriplesStoreService, + FalkorDBTriplesStoreService.of(store), + ), + ); + }); + export function makeTriplesStoreService(config: ProcessorConfig): TriplesStoreService { - const store = makeFalkorDBTriplesStoreService(); const service = makeFlowProcessor(config, { specifications: makeTriplesStoreSpecs(), - provide: (effect) => - effect.pipe( - Effect.provideService( - FalkorDBTriplesStoreService, - FalkorDBTriplesStoreService.of(store), - ), - ), + provide: provideFalkorDBTriplesStore(config.id), }); void Effect.runPromise(Effect.log("[TriplesStore] Service initialized")); return service; @@ -73,7 +83,11 @@ export function makeTriplesStoreService(config: ProcessorConfig): TriplesStoreSe export const TriplesStoreService = makeTriplesStoreService; -export const program = makeFlowProcessorProgram({ +export const program = makeFlowProcessorProgram< + ProcessorConfig & FalkorDBConfig, + FalkorDBTriplesStoreError, + FalkorDBTriplesStoreService +>({ id: "triples-store", specs: () => makeTriplesStoreSpecs(), layer: (config) => FalkorDBTriplesStoreLive(config), diff --git a/ts/packages/flow/src/storage/triples/falkordb.ts b/ts/packages/flow/src/storage/triples/falkordb.ts index 3f052c2a..45d986f1 100644 --- a/ts/packages/flow/src/storage/triples/falkordb.ts +++ b/ts/packages/flow/src/storage/triples/falkordb.ts @@ -12,9 +12,31 @@ import { errorMessage, type Term, type Triple } from "@trustgraph/base"; import { Config, Context, Effect, Layer } from "effect"; import * as S from "effect/Schema"; +export interface FalkorDBClosableClient { + readonly connect: () => Promise; + readonly disconnect: () => Promise; +} + +export type FalkorDBStoreQueryOptions = Parameters[1]; + +export interface FalkorDBStoreGraph { + readonly query: ( + query: string, + options?: FalkorDBStoreQueryOptions, + ) => Promise<{ readonly data?: Array }>; +} + +export type FalkorDBStoreClientFactory = (url: string) => FalkorDBClosableClient; +export type FalkorDBStoreGraphFactory = ( + client: FalkorDBClosableClient, + database: string, +) => FalkorDBStoreGraph; + export interface FalkorDBConfig { url?: string; database?: string; + clientFactory?: FalkorDBStoreClientFactory; + graphFactory?: FalkorDBStoreGraphFactory; } function getTermValue(term: Term): string { @@ -91,11 +113,10 @@ const falkorDBTriplesStoreError = (operation: string, cause: unknown): FalkorDBT }); interface FalkorDBStoreConnection { - readonly graph: Graph; + readonly client: FalkorDBClosableClient; + readonly graph: FalkorDBStoreGraph; } -type FalkorDBQueryOptions = Parameters[1]; - interface FalkorDBTriplesStoreEffectShape { readonly createNode: ( uri: string, @@ -150,8 +171,25 @@ const connectFalkorDBTriplesStore = ( ): Effect.Effect => Effect.gen(function* () { const { url, database } = yield* resolveFalkorDBStoreConfig(config); + const clientFactory = config.clientFactory; + const graphFactory = config.graphFactory; + + if ( + (clientFactory === undefined && graphFactory !== undefined) || + (clientFactory !== undefined && graphFactory === undefined) + ) { + return yield* falkorDBTriplesStoreError( + "create-client", + "FalkorDB custom clientFactory and graphFactory must be configured together", + ); + } + const { client, graph } = yield* Effect.try({ try: () => { + if (clientFactory !== undefined && graphFactory !== undefined) { + const client = clientFactory(url); + return { client, graph: graphFactory(client, database) }; + } const client = createClient({ url }); return { client, graph: new Graph(client, database) }; }, @@ -171,14 +209,38 @@ const connectFalkorDBTriplesStore = ( ); yield* Effect.log(`[FalkorDBTriplesStore] Connected to ${url}, graph: ${database}`); - return { graph }; + return { client, graph }; }); +const disconnectFalkorDBTriplesStore = ( + connection: FalkorDBStoreConnection, +): Effect.Effect => + Effect.tryPromise({ + try: () => connection.client.disconnect(), + catch: (cause) => falkorDBTriplesStoreError("disconnect", cause), + }).pipe( + Effect.catch((error) => + Effect.logError("[FalkorDBTriplesStore] Disconnect failed", { + error: error.message, + operation: error.operation, + }), + ), + Effect.asVoid, + ); + +const acquireFalkorDBTriplesStore = ( + config: FalkorDBConfig, +) => + Effect.acquireRelease( + connectFalkorDBTriplesStore(config), + (connection) => disconnectFalkorDBTriplesStore(connection), + ); + const runGraphQuery = ( - graph: Graph, + graph: FalkorDBStoreGraph, operation: string, query: string, - options?: FalkorDBQueryOptions, + options?: FalkorDBStoreQueryOptions, ): Effect.Effect => Effect.tryPromise({ try: () => graph.query(query, options), @@ -188,17 +250,8 @@ const runGraphQuery = ( ); const makeFalkorDBTriplesStoreEffect = ( - config: FalkorDBConfig = {}, + getConnection: () => Effect.Effect, ): FalkorDBTriplesStoreEffectShape => { - let cachedConnection: Effect.Effect | undefined; - - const getConnection = Effect.fn("FalkorDBTriplesStore.connection")(function* () { - if (cachedConnection === undefined) { - cachedConnection = yield* Effect.cached(connectFalkorDBTriplesStore(config)); - } - return yield* cachedConnection; - }); - const createNode = Effect.fn("FalkorDBTriplesStore.createNode")(function* ( uri: string, user: string, @@ -320,38 +373,75 @@ const makeFalkorDBTriplesStoreEffect = ( }; }; +const makeFalkorDBTriplesStoreEffectScoped = ( + config: FalkorDBConfig = {}, +) => + acquireFalkorDBTriplesStore(config).pipe( + Effect.map((connection) => makeFalkorDBTriplesStoreEffect(() => Effect.succeed(connection))), + ); + +const withFalkorDBTriplesStore = ( + config: FalkorDBConfig, + use: (store: FalkorDBTriplesStoreEffectShape) => Effect.Effect, +) => + Effect.scoped( + makeFalkorDBTriplesStoreEffectScoped(config).pipe( + Effect.flatMap(use), + ), + ); + export function makeFalkorDBTriplesStore(config: FalkorDBConfig = {}): FalkorDBTriplesStore { - const store = makeFalkorDBTriplesStoreEffect(config); return { createNode: (uri, user, collection) => - Effect.runPromise(store.createNode(uri, user, collection)), + Effect.runPromise(withFalkorDBTriplesStore(config, (store) => store.createNode(uri, user, collection))), createLiteral: (value, user, collection) => - Effect.runPromise(store.createLiteral(value, user, collection)), + Effect.runPromise(withFalkorDBTriplesStore(config, (store) => store.createLiteral(value, user, collection))), relateNode: (src, uri, dest, user, collection) => - Effect.runPromise(store.relateNode(src, uri, dest, user, collection)), + Effect.runPromise(withFalkorDBTriplesStore(config, (store) => store.relateNode(src, uri, dest, user, collection))), relateLiteral: (src, uri, dest, user, collection) => - Effect.runPromise(store.relateLiteral(src, uri, dest, user, collection)), + Effect.runPromise(withFalkorDBTriplesStore(config, (store) => store.relateLiteral(src, uri, dest, user, collection))), storeTriples: (triples, user = "default", collection = "default") => - Effect.runPromise(store.storeTriples(triples, user, collection)), + Effect.runPromise(withFalkorDBTriplesStore(config, (store) => store.storeTriples(triples, user, collection))), deleteCollection: (user, collection) => - Effect.runPromise(store.deleteCollection(user, collection)), + Effect.runPromise(withFalkorDBTriplesStore(config, (store) => store.deleteCollection(user, collection))), }; } export const makeFalkorDBTriplesStoreService = ( config: FalkorDBConfig = {}, +): FalkorDBTriplesStoreServiceShape => ({ + storeTriples: (triples, user, collection) => + withFalkorDBTriplesStore(config, (store) => store.storeTriples(triples, user, collection)), + deleteCollection: (user, collection) => + withFalkorDBTriplesStore(config, (store) => store.deleteCollection(user, collection)), +}); + +export const makeFalkorDBTriplesStoreServiceFromConnection = ( + connection: FalkorDBStoreConnection, ): FalkorDBTriplesStoreServiceShape => { - const store = makeFalkorDBTriplesStoreEffect(config); + const store = makeFalkorDBTriplesStoreEffect(() => Effect.succeed(connection)); return { storeTriples: store.storeTriples, deleteCollection: store.deleteCollection, }; }; +export const makeFalkorDBTriplesStoreServiceScoped = ( + config: FalkorDBConfig = {}, +) => + makeFalkorDBTriplesStoreEffectScoped(config).pipe( + Effect.map((store) => ({ + storeTriples: store.storeTriples, + deleteCollection: store.deleteCollection, + })), + ); + export const FalkorDBTriplesStoreLive = ( config: FalkorDBConfig = {}, -): Layer.Layer => - Layer.succeed( +): Layer.Layer => + Layer.effect( FalkorDBTriplesStoreService, - FalkorDBTriplesStoreService.of(makeFalkorDBTriplesStoreService(config)), + makeFalkorDBTriplesStoreServiceScoped(config).pipe( + Effect.map((service) => FalkorDBTriplesStoreService.of(service)), + ), );