From 8287e1cf930baf22c0b8e9107ca667ffc8ff14ee Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 04:10:03 -0500 Subject: [PATCH] Add fakeable Qdrant Effect services --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 85 ++++---- .../src/__tests__/qdrant-embeddings.test.ts | 206 ++++++++++++++++++ ts/packages/flow/src/qdrant/client.ts | 84 +++++++ .../query/embeddings/qdrant-doc-service.ts | 37 +++- .../flow/src/query/embeddings/qdrant-doc.ts | 104 ++++++--- .../query/embeddings/qdrant-graph-service.ts | 37 +++- .../flow/src/query/embeddings/qdrant-graph.ts | 88 ++++++-- .../embeddings/graph-embeddings-service.ts | 32 ++- .../flow/src/storage/embeddings/qdrant-doc.ts | 71 +++++- .../src/storage/embeddings/qdrant-graph.ts | 81 +++++-- 10 files changed, 671 insertions(+), 154 deletions(-) create mode 100644 ts/packages/flow/src/__tests__/qdrant-embeddings.test.ts create mode 100644 ts/packages/flow/src/qdrant/client.ts diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 91b8ba4c..2efaf920 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,14 +12,15 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 FalkorDB scoped -client lifecycle slice: +Current signal counts from `ts/packages` after the 2026-06-02 Qdrant +config/schema/fakeability slice: | Signal | Count | | --- | ---: | -| `Effect.runPromise` | 165 | +| `Effect.runPromise` | 172 | | `Effect.runPromiseWith` | 0 | | `Effect.cached` | 0 | +| `Layer.succeed` | 19 | | `Map<` | 82 | | `WebSocket` | 62 | | `new Map` | 60 | @@ -105,6 +106,14 @@ Notes: `Effect.acquireRelease` and disconnect them on scope close. The `Effect.runPromise` count increased by two because the new lifecycle tests run scoped programs at the test boundary. +- The Qdrant config/schema/fakeability slice removed direct production + `new QdrantClient`, sync config loading, payload casts, and Qdrant + `Layer.succeed` service construction from graph/doc store/query modules. + The installed Qdrant client exposes no public close/disconnect method, so + this remains a fakeable construction and Schema decode slice rather than a + scoped finalizer slice. `Effect.runPromise` increased because the new tests + and legacy service initialization logs run Effects at compatibility + boundaries. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -785,11 +794,6 @@ Notes: instead of record/string type assertions. - New lifecycle tests use fake clients/graphs to prove connect on acquire and disconnect on scope close for both triples store and triples query. -- Remaining: - - Qdrant graph/doc store/query construction still needs the next - config/schema/fakeability cleanup. The installed Qdrant client exposes no - close/disconnect method, so this should not be treated as a lifecycle - finalizer slice. - Verification: - `bunx --bun vitest run src/__tests__/falkordb-lifecycle.test.ts` - `bun run --cwd ts/packages/flow build` @@ -799,6 +803,35 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Qdrant Config, Schema, And Fakeable Construction Slice + +- Status: migrated and root-verified. +- Completed: + - Added `ts/packages/flow/src/qdrant/client.ts` as the narrow fakeable + Qdrant surface used by graph/doc embedding store/query modules. + - Graph and document Qdrant store/query constructors now create clients + through `Effect.try`, load Qdrant config in Effect, and map config/client + failures into their existing `S.TaggedErrorClass` errors. + - Graph and document query payload extraction now uses + `Schema.decodeUnknownEffect(...).pipe(Effect.option)` and skips malformed + Qdrant payloads without type assertions. + - Qdrant graph/doc query Live layers and graph store Live layer now use + `Layer.effect` instead of preconstructing services with `Layer.succeed`. + - Legacy graph store/query/doc query processor providers now acquire Qdrant + services with named `Effect.fn` providers and map startup failures to + `ProcessorLifecycleError`. + - The installed Qdrant client still has no public close/disconnect method, + so no `Effect.acquireRelease` finalizer was added for Qdrant. +- Verification: + - `bunx --bun vitest run src/__tests__/qdrant-embeddings.test.ts` + - `bun run --cwd ts/packages/flow build` + - `cd ts && bun run check:tsgo` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -854,36 +887,15 @@ Notes: - FalkorDB scoped lifecycle is complete for triples query/store. Use the fakeable client/graph factory pattern from that slice for future storage client tests. - - Qdrant has no close/disconnect surface in the installed client, so treat it - as a config/schema/fakeability slice rather than an `acquireRelease` close - slice. + - Qdrant config/schema/fakeability is complete for graph/doc embedding + store/query modules. Qdrant still has no close/disconnect surface in the + installed client, so do not reopen it as an `acquireRelease` close slice + without new SDK evidence. - Ollama/OpenAI-compatible/provider surfaces still need config, schema, and provider-layer audits. ## Ranked Findings -### P1: Qdrant Config, Schema, And Fakeable Construction Cleanup - -- TrustGraph evidence: - - `ts/packages/flow/src/storage/embeddings/qdrant-graph.ts` - - `ts/packages/flow/src/storage/embeddings/qdrant-doc.ts` - - `ts/packages/flow/src/query/embeddings/qdrant-graph.ts` - - `ts/packages/flow/src/query/embeddings/qdrant-doc.ts` -- Effect primitives: - - `Config`, `ConfigProvider`, `Layer.effect`, `Schema.decodeUnknownEffect`, - `Predicate`, `Option`. -- Rewrite shape: - - Move Qdrant config loading out of sync factory construction and into - Effect config/layer paths. - - Add fakeable Qdrant client construction before behavior changes. - - Decode query payloads through Schema instead of manual payload casts. - - Do not add an `acquireRelease` finalizer unless a concrete close API is - found in the installed Qdrant client. -- Tests: - - Qdrant graph/doc store/query tests with fake clients. - - Config tests with `ConfigProvider.fromUnknown`. - - Schema decode failure tests for malformed payloads. - ### P2: Provider Layer And Effect AI Cleanup - TrustGraph evidence: @@ -936,10 +948,9 @@ Notes: ## Recommended PR Order -1. Qdrant config/schema/fakeable construction cleanup. -2. Client streaming facade completion normalization. -3. Provider layer and Effect AI cleanup. -4. MCP parity/deletion decision and workbench platform polish. +1. Client streaming facade completion normalization. +2. Provider layer and Effect AI cleanup. +3. MCP parity/deletion decision and workbench platform polish. ## No-Op Rules diff --git a/ts/packages/flow/src/__tests__/qdrant-embeddings.test.ts b/ts/packages/flow/src/__tests__/qdrant-embeddings.test.ts new file mode 100644 index 00000000..9a5956db --- /dev/null +++ b/ts/packages/flow/src/__tests__/qdrant-embeddings.test.ts @@ -0,0 +1,206 @@ +import { Effect } from "effect"; +import { describe, expect, it } from "vitest"; +import { + QdrantDocEmbeddingsQueryLive, + QdrantDocEmbeddingsQueryService, +} from "../query/embeddings/qdrant-doc.js"; +import { + QdrantGraphEmbeddingsQueryLive, + QdrantGraphEmbeddingsQueryService, +} from "../query/embeddings/qdrant-graph.js"; +import type { QdrantClientLike, QdrantScoredPoint } from "../qdrant/client.js"; +import { makeQdrantDocEmbeddingsStore } from "../storage/embeddings/qdrant-doc.js"; +import { + QdrantGraphEmbeddingsStoreLive, + QdrantGraphEmbeddingsStoreService, +} from "../storage/embeddings/qdrant-graph.js"; +import type { Term } from "@trustgraph/base"; + +interface FakePoint { + readonly id: string; + readonly vector: ReadonlyArray; + readonly payload?: Record; +} + +class FakeQdrantClient implements QdrantClientLike { + readonly collections = new Set(); + readonly createdCollections: Array<{ readonly name: string; readonly size: number }> = []; + readonly upserts: Array<{ + readonly collectionName: string; + readonly points: ReadonlyArray; + }> = []; + readonly deletedCollections: string[] = []; + searchResults: ReadonlyArray = []; + + async collectionExists(collectionName: string): Promise<{ readonly exists: boolean }> { + return { exists: this.collections.has(collectionName) }; + } + + async createCollection( + collectionName: string, + options: { readonly vectors: { readonly size: number; readonly distance: "Cosine" } }, + ): Promise { + this.collections.add(collectionName); + this.createdCollections.push({ name: collectionName, size: options.vectors.size }); + } + + async upsert( + collectionName: string, + options: { readonly points: ReadonlyArray }, + ): Promise { + this.upserts.push({ collectionName, points: options.points }); + } + + async getCollections(): Promise<{ readonly collections: ReadonlyArray<{ readonly name: string }> }> { + return { collections: Array.from(this.collections, (name) => ({ name })) }; + } + + async deleteCollection(collectionName: string): Promise { + this.collections.delete(collectionName); + this.deletedCollections.push(collectionName); + } + + async search( + _collectionName: string, + _options: { + readonly vector: ReadonlyArray; + readonly limit: number; + readonly with_payload: boolean; + }, + ): Promise> { + return this.searchResults; + } +} + +describe("Qdrant embeddings", () => { + it("queries graph payloads through Schema and skips malformed points", async () => { + const client = new FakeQdrantClient(); + client.collections.add("t_alice_demo_2"); + client.searchResults = [ + { score: 0.9, payload: { entity: "https://example.com/entity" } }, + { score: 0.8, payload: { entity: 123 } }, + { score: 0.7, payload: { entity: "" } }, + { score: 0.6, payload: { entity: "plain entity" } }, + ]; + + const matches = await Effect.runPromise( + Effect.gen(function* () { + const query = yield* QdrantGraphEmbeddingsQueryService; + return yield* query.query({ + vector: [0.1, 0.2], + user: "alice", + collection: "demo", + limit: 10, + }); + }).pipe( + Effect.provide( + QdrantGraphEmbeddingsQueryLive({ + url: "http://qdrant.test", + clientFactory: () => client, + }), + ), + ), + ); + + expect(matches).toEqual([ + { + entity: { type: "IRI", iri: "https://example.com/entity" }, + score: 0.9, + }, + { + entity: { type: "LITERAL", value: "plain entity" }, + score: 0.6, + }, + ]); + }); + + it("queries document payloads through Schema and skips malformed points", async () => { + const client = new FakeQdrantClient(); + client.collections.add("d_alice_docs_2"); + client.searchResults = [ + { score: 0.9, payload: { chunk_id: "chunk-a", content: "alpha" } }, + { score: 0.8, payload: { chunk_id: 123, content: "bad" } }, + { score: 0.7, payload: { chunk_id: "" } }, + { score: 0.6, payload: { chunk_id: "chunk-b" } }, + ]; + + const matches = await Effect.runPromise( + Effect.gen(function* () { + const query = yield* QdrantDocEmbeddingsQueryService; + return yield* query.query({ + vector: [0.1, 0.2], + user: "alice", + collection: "docs", + limit: 10, + }); + }).pipe( + Effect.provide( + QdrantDocEmbeddingsQueryLive({ + url: "http://qdrant.test", + clientFactory: () => client, + }), + ), + ), + ); + + expect(matches).toEqual([ + { chunkId: "chunk-a", score: 0.9, content: "alpha" }, + { chunkId: "chunk-b", score: 0.6 }, + ]); + }); + + it("uses an injected graph store client for collection creation and upsert", async () => { + const client = new FakeQdrantClient(); + const entity: Term = { type: "IRI", iri: "https://example.com/entity" }; + + await Effect.runPromise( + Effect.gen(function* () { + const store = yield* QdrantGraphEmbeddingsStoreService; + yield* store.store({ + user: "alice", + collection: "graph", + entities: [{ entity, vector: [1, 2, 3], chunkId: "chunk-a" }], + }); + }).pipe( + Effect.provide( + QdrantGraphEmbeddingsStoreLive({ + url: "http://qdrant.test", + clientFactory: () => client, + }), + ), + ), + ); + + expect(client.createdCollections).toEqual([{ name: "t_alice_graph_3", size: 3 }]); + expect(client.upserts).toHaveLength(1); + expect(client.upserts[0]?.collectionName).toBe("t_alice_graph_3"); + expect(client.upserts[0]?.points[0]?.payload).toEqual({ + entity: "https://example.com/entity", + chunk_id: "chunk-a", + }); + }); + + it("uses an injected document store client for collection creation and upsert", async () => { + const client = new FakeQdrantClient(); + const store = makeQdrantDocEmbeddingsStore({ + url: "http://qdrant.test", + clientFactory: () => client, + }); + + await Effect.runPromise( + store.storeEffect({ + user: "alice", + collection: "docs", + chunks: [{ chunkId: "chunk-a", vector: [1, 2], content: "alpha" }], + }), + ); + + expect(client.createdCollections).toEqual([{ name: "d_alice_docs_2", size: 2 }]); + expect(client.upserts).toHaveLength(1); + expect(client.upserts[0]?.collectionName).toBe("d_alice_docs_2"); + expect(client.upserts[0]?.points[0]?.payload).toEqual({ + chunk_id: "chunk-a", + content: "alpha", + }); + }); +}); diff --git a/ts/packages/flow/src/qdrant/client.ts b/ts/packages/flow/src/qdrant/client.ts new file mode 100644 index 00000000..3a8a6480 --- /dev/null +++ b/ts/packages/flow/src/qdrant/client.ts @@ -0,0 +1,84 @@ +import { QdrantClient, type QdrantClientParams } from "@qdrant/js-client-rest"; + +export interface QdrantCollectionStatus { + readonly exists: boolean; +} + +export interface QdrantCollectionDescription { + readonly name: string; +} + +export interface QdrantCollections { + readonly collections: ReadonlyArray; +} + +export interface QdrantScoredPoint { + readonly score: number; + readonly payload?: unknown; +} + +export interface QdrantClientLike { + readonly collectionExists: (collectionName: string) => Promise; + readonly createCollection: ( + collectionName: string, + options: { + readonly vectors: { + readonly size: number; + readonly distance: "Cosine"; + }; + }, + ) => Promise; + readonly upsert: ( + collectionName: string, + options: { + readonly points: ReadonlyArray<{ + readonly id: string; + readonly vector: ReadonlyArray; + readonly payload?: Record; + }>; + }, + ) => Promise; + readonly getCollections: () => Promise; + readonly deleteCollection: (collectionName: string) => Promise; + readonly search: ( + collectionName: string, + options: { + readonly vector: ReadonlyArray; + readonly limit: number; + readonly with_payload: boolean; + }, + ) => Promise>; +} + +export type QdrantClientFactory = (params: QdrantClientParams) => QdrantClientLike; + +export const makeQdrantClient = ( + factory: QdrantClientFactory | undefined, + params: QdrantClientParams, +): QdrantClientLike => { + if (factory !== undefined) { + return factory(params); + } + + const client = new QdrantClient(params); + return { + collectionExists: (collectionName) => client.collectionExists(collectionName), + createCollection: (collectionName, options) => client.createCollection(collectionName, options), + upsert: (collectionName, options) => + client.upsert(collectionName, { + points: options.points.map((point) => ({ + id: point.id, + vector: Array.from(point.vector), + ...(point.payload !== undefined ? { payload: point.payload } : {}), + })), + }), + getCollections: () => client.getCollections(), + deleteCollection: (collectionName) => client.deleteCollection(collectionName), + search: (collectionName, options) => + client.search(collectionName, { + vector: Array.from(options.vector), + limit: options.limit, + with_payload: options.with_payload, + }), + }; +}; diff --git a/ts/packages/flow/src/query/embeddings/qdrant-doc-service.ts b/ts/packages/flow/src/query/embeddings/qdrant-doc-service.ts index 075ca9a3..6cb8b55b 100644 --- a/ts/packages/flow/src/query/embeddings/qdrant-doc-service.ts +++ b/ts/packages/flow/src/query/embeddings/qdrant-doc-service.ts @@ -11,8 +11,10 @@ import { makeFlowProcessor, makeConsumerSpec, makeProducerSpec, + processorLifecycleError, type ProcessorConfig, type FlowProcessorRuntime, + type FlowProcessorStartEffect, type FlowContext, type FlowResourceNotFoundError, type MessagingDeliveryError, @@ -26,8 +28,9 @@ import { Effect, Layer, ManagedRuntime } from "effect"; import { QdrantDocEmbeddingsQueryLive, QdrantDocEmbeddingsQueryService, - makeQdrantDocEmbeddingsQueryService, + makeQdrantDocEmbeddingsQueryServiceEffect, type QdrantDocQueryConfig, + type QdrantDocEmbeddingsQueryError, } from "./qdrant-doc.js"; const DocumentEmbeddingsResponseProducer = makeProducerSpec("document-embeddings-response"); @@ -92,25 +95,37 @@ export const makeDocEmbeddingsQuerySpecs = (): ReadonlyArray; +const provideQdrantDocEmbeddingsQuery = (processorId: string) => + Effect.fn("DocEmbeddingsQueryService.provideQdrant")(function* ( + effect: FlowProcessorStartEffect, + ) { + const query = yield* makeQdrantDocEmbeddingsQueryServiceEffect().pipe( + Effect.mapError((error) => processorLifecycleError(processorId, "qdrant-doc-query-connect", error)), + ); + yield* effect.pipe( + Effect.provideService( + QdrantDocEmbeddingsQueryService, + QdrantDocEmbeddingsQueryService.of(query), + ), + ); + }); + export function makeDocEmbeddingsQueryService(config: ProcessorConfig): DocEmbeddingsQueryService { - const query = makeQdrantDocEmbeddingsQueryService(); const service = makeFlowProcessor(config, { specifications: makeDocEmbeddingsQuerySpecs(), - provide: (effect) => - effect.pipe( - Effect.provideService( - QdrantDocEmbeddingsQueryService, - QdrantDocEmbeddingsQueryService.of(query), - ), - ), + provide: provideQdrantDocEmbeddingsQuery(config.id), }); - Effect.runSync(Effect.log("[DocEmbeddingsQuery] Service initialized")); + void Effect.runPromise(Effect.log("[DocEmbeddingsQuery] Service initialized")); return service; } export const DocEmbeddingsQueryService = makeDocEmbeddingsQueryService; -export const program = makeFlowProcessorProgram({ +export const program = makeFlowProcessorProgram< + ProcessorConfig & QdrantDocQueryConfig, + QdrantDocEmbeddingsQueryError, + QdrantDocEmbeddingsQueryService +>({ id: "doc-embeddings-query", specs: () => makeDocEmbeddingsQuerySpecs(), layer: (config) => QdrantDocEmbeddingsQueryLive(config), diff --git a/ts/packages/flow/src/query/embeddings/qdrant-doc.ts b/ts/packages/flow/src/query/embeddings/qdrant-doc.ts index 83d8369d..72fb7a20 100644 --- a/ts/packages/flow/src/query/embeddings/qdrant-doc.ts +++ b/ts/packages/flow/src/query/embeddings/qdrant-doc.ts @@ -7,15 +7,16 @@ * Python reference: trustgraph-flow/trustgraph/query/doc_embeddings/qdrant/service.py */ -import { QdrantClient } from "@qdrant/js-client-rest"; import { errorMessage } from "@trustgraph/base"; import { Config, Context, Effect, Layer } from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; +import { makeQdrantClient, type QdrantClientFactory, type QdrantClientLike } from "../../qdrant/client.js"; export interface QdrantDocQueryConfig { url?: string; apiKey?: string; + clientFactory?: QdrantClientFactory; } export interface ChunkMatch { @@ -63,25 +64,37 @@ const loadQdrantDocQueryConfig = Effect.fn("QdrantDocEmbeddingsQuery.loadConfig" } satisfies ResolvedQdrantDocQueryConfig; }); +const DocPointPayloadSchema = S.Struct({ + chunk_id: S.String, + content: S.optionalKey(S.String), +}); + +const decodeDocPointPayload = (payload: unknown) => + S.decodeUnknownEffect(DocPointPayloadSchema)(payload).pipe(Effect.option); + export interface QdrantDocEmbeddingsQuery { - readonly query: (request: DocEmbeddingsQueryRequest) => Promise; + readonly query: (request: DocEmbeddingsQueryRequest) => Promise>; readonly queryEffect: ( request: DocEmbeddingsQueryRequest, ) => Effect.Effect, QdrantDocEmbeddingsQueryError>; } -export function makeQdrantDocEmbeddingsQuery( - config: QdrantDocQueryConfig = {}, -): QdrantDocEmbeddingsQuery { - const resolved = Effect.runSync(loadQdrantDocQueryConfig(config)); - - const client = new QdrantClient({ - url: resolved.url, - ...(resolved.apiKey !== undefined ? { apiKey: resolved.apiKey } : {}), +const makeQdrantDocEmbeddingsQueryClient = ( + config: QdrantDocQueryConfig, + resolved: ResolvedQdrantDocQueryConfig, +) => + Effect.try({ + try: () => + makeQdrantClient(config.clientFactory, { + url: resolved.url, + ...(resolved.apiKey !== undefined ? { apiKey: resolved.apiKey } : {}), + }), + catch: (cause) => qdrantDocEmbeddingsQueryError("create-client", cause), }); - Effect.runSync(Effect.log("[QdrantDocQuery] Query service initialized")); - +const makeQdrantDocEmbeddingsQueryFromClient = ( + client: QdrantClientLike, +): QdrantDocEmbeddingsQueryServiceShape => { const queryEffect = Effect.fn("QdrantDocEmbeddingsQuery.query")(function* (request: DocEmbeddingsQueryRequest) { const { vector, user, collection, limit } = request; @@ -116,20 +129,52 @@ export function makeQdrantDocEmbeddingsQuery( const chunks: ChunkMatch[] = []; for (const point of searchResult) { - const payload = point.payload as Record | undefined; - const chunkId = payload?.chunk_id as string | undefined; - if (chunkId !== undefined && chunkId.length > 0) { - chunks.push({ - chunkId, - score: point.score, - ...(typeof payload?.content === "string" ? { content: payload.content } : {}), - }); - } + const payload = yield* decodeDocPointPayload(point.payload); + if (O.isNone(payload)) continue; + + const chunkId = payload.value.chunk_id; + if (chunkId.length === 0) continue; + + chunks.push({ + chunkId, + score: point.score, + ...(payload.value.content !== undefined ? { content: payload.value.content } : {}), + }); } return chunks; }); + return { + query: queryEffect, + }; +}; + +export const makeQdrantDocEmbeddingsQueryServiceEffect = Effect.fn( + "makeQdrantDocEmbeddingsQueryServiceEffect", +)(function* (config: QdrantDocQueryConfig = {}) { + const resolved = yield* loadQdrantDocQueryConfig(config).pipe( + Effect.mapError((cause) => qdrantDocEmbeddingsQueryError("load-config", cause)), + ); + const client = yield* makeQdrantDocEmbeddingsQueryClient(config, resolved); + yield* Effect.log("[QdrantDocQuery] Query service initialized"); + return makeQdrantDocEmbeddingsQueryFromClient(client); +}); + +const withQdrantDocEmbeddingsQuery = ( + config: QdrantDocQueryConfig, + use: (query: QdrantDocEmbeddingsQueryServiceShape) => Effect.Effect, +) => + makeQdrantDocEmbeddingsQueryServiceEffect(config).pipe( + Effect.flatMap(use), + ); + +export function makeQdrantDocEmbeddingsQuery( + config: QdrantDocQueryConfig = {}, +): QdrantDocEmbeddingsQuery { + const queryEffect = (request: DocEmbeddingsQueryRequest) => + withQdrantDocEmbeddingsQuery(config, (query) => query.query(request)); + return { query: (request) => Effect.runPromise(queryEffect(request)), queryEffect, @@ -151,17 +196,16 @@ export class QdrantDocEmbeddingsQueryService extends Context.Service< export const makeQdrantDocEmbeddingsQueryService = ( config: QdrantDocQueryConfig = {}, -): QdrantDocEmbeddingsQueryServiceShape => { - const query = makeQdrantDocEmbeddingsQuery(config); - return { - query: query.queryEffect, - }; -}; +): QdrantDocEmbeddingsQueryServiceShape => ({ + query: (request) => withQdrantDocEmbeddingsQuery(config, (query) => query.query(request)), +}); export const QdrantDocEmbeddingsQueryLive = ( config: QdrantDocQueryConfig = {}, -): Layer.Layer => - Layer.succeed( +): Layer.Layer => + Layer.effect( QdrantDocEmbeddingsQueryService, - QdrantDocEmbeddingsQueryService.of(makeQdrantDocEmbeddingsQueryService(config)), + makeQdrantDocEmbeddingsQueryServiceEffect(config).pipe( + Effect.map((service) => QdrantDocEmbeddingsQueryService.of(service)), + ), ); diff --git a/ts/packages/flow/src/query/embeddings/qdrant-graph-service.ts b/ts/packages/flow/src/query/embeddings/qdrant-graph-service.ts index d60239b3..9e34f157 100644 --- a/ts/packages/flow/src/query/embeddings/qdrant-graph-service.ts +++ b/ts/packages/flow/src/query/embeddings/qdrant-graph-service.ts @@ -11,8 +11,10 @@ import { makeFlowProcessor, makeConsumerSpec, makeProducerSpec, + processorLifecycleError, type ProcessorConfig, type FlowProcessorRuntime, + type FlowProcessorStartEffect, type FlowContext, type FlowResourceNotFoundError, type MessagingDeliveryError, @@ -26,8 +28,9 @@ import { Effect, Layer, ManagedRuntime } from "effect"; import { QdrantGraphEmbeddingsQueryLive, QdrantGraphEmbeddingsQueryService, - makeQdrantGraphEmbeddingsQueryService, + makeQdrantGraphEmbeddingsQueryServiceEffect, type QdrantGraphQueryConfig, + type QdrantGraphEmbeddingsQueryError, } from "./qdrant-graph.js"; const GraphEmbeddingsResponseProducer = makeProducerSpec("graph-embeddings-response"); @@ -93,25 +96,37 @@ export const makeGraphEmbeddingsQuerySpecs = (): ReadonlyArray; +const provideQdrantGraphEmbeddingsQuery = (processorId: string) => + Effect.fn("GraphEmbeddingsQueryService.provideQdrant")(function* ( + effect: FlowProcessorStartEffect, + ) { + const query = yield* makeQdrantGraphEmbeddingsQueryServiceEffect().pipe( + Effect.mapError((error) => processorLifecycleError(processorId, "qdrant-graph-query-connect", error)), + ); + yield* effect.pipe( + Effect.provideService( + QdrantGraphEmbeddingsQueryService, + QdrantGraphEmbeddingsQueryService.of(query), + ), + ); + }); + export function makeGraphEmbeddingsQueryService(config: ProcessorConfig): GraphEmbeddingsQueryService { - const query = makeQdrantGraphEmbeddingsQueryService(); const service = makeFlowProcessor(config, { specifications: makeGraphEmbeddingsQuerySpecs(), - provide: (effect) => - effect.pipe( - Effect.provideService( - QdrantGraphEmbeddingsQueryService, - QdrantGraphEmbeddingsQueryService.of(query), - ), - ), + provide: provideQdrantGraphEmbeddingsQuery(config.id), }); - Effect.runSync(Effect.log("[GraphEmbeddingsQuery] Service initialized")); + void Effect.runPromise(Effect.log("[GraphEmbeddingsQuery] Service initialized")); return service; } export const GraphEmbeddingsQueryService = makeGraphEmbeddingsQueryService; -export const program = makeFlowProcessorProgram({ +export const program = makeFlowProcessorProgram< + ProcessorConfig & QdrantGraphQueryConfig, + QdrantGraphEmbeddingsQueryError, + QdrantGraphEmbeddingsQueryService +>({ id: "graph-embeddings-query", specs: () => makeGraphEmbeddingsQuerySpecs(), layer: (config) => QdrantGraphEmbeddingsQueryLive(config), diff --git a/ts/packages/flow/src/query/embeddings/qdrant-graph.ts b/ts/packages/flow/src/query/embeddings/qdrant-graph.ts index e309d08e..f976cc51 100644 --- a/ts/packages/flow/src/query/embeddings/qdrant-graph.ts +++ b/ts/packages/flow/src/query/embeddings/qdrant-graph.ts @@ -10,15 +10,16 @@ * Python reference: trustgraph-flow/trustgraph/query/graph_embeddings/qdrant/service.py */ -import { QdrantClient } from "@qdrant/js-client-rest"; import { errorMessage, type Term } from "@trustgraph/base"; import { Config, Context, Effect, Layer } from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; +import { makeQdrantClient, type QdrantClientFactory, type QdrantClientLike } from "../../qdrant/client.js"; export interface QdrantGraphQueryConfig { url?: string; apiKey?: string; + clientFactory?: QdrantClientFactory; } export interface EntityMatch { @@ -72,24 +73,36 @@ function createTerm(value: string): Term { return { type: "LITERAL", value }; } +const GraphPointPayloadSchema = S.Struct({ + entity: S.String, +}); + +const decodeGraphPointPayload = (payload: unknown) => + S.decodeUnknownEffect(GraphPointPayloadSchema)(payload).pipe(Effect.option); + export interface QdrantGraphEmbeddingsQuery { - readonly query: (request: GraphEmbeddingsQueryRequest) => Promise; + readonly query: (request: GraphEmbeddingsQueryRequest) => Promise>; readonly queryEffect: ( request: GraphEmbeddingsQueryRequest, ) => Effect.Effect, QdrantGraphEmbeddingsQueryError>; } -export function makeQdrantGraphEmbeddingsQuery( - config: QdrantGraphQueryConfig = {}, -): QdrantGraphEmbeddingsQuery { - const resolved = Effect.runSync(loadQdrantGraphQueryConfig(config)); - - const client = new QdrantClient({ - url: resolved.url, - ...(resolved.apiKey !== undefined ? { apiKey: resolved.apiKey } : {}), +const makeQdrantGraphEmbeddingsQueryClient = ( + config: QdrantGraphQueryConfig, + resolved: ResolvedQdrantGraphQueryConfig, +) => + Effect.try({ + try: () => + makeQdrantClient(config.clientFactory, { + url: resolved.url, + ...(resolved.apiKey !== undefined ? { apiKey: resolved.apiKey } : {}), + }), + catch: (cause) => qdrantGraphEmbeddingsQueryError("create-client", cause), }); - Effect.runSync(Effect.log("[QdrantGraphQuery] Query service initialized")); +const makeQdrantGraphEmbeddingsQueryFromClient = ( + client: QdrantClientLike, +): QdrantGraphEmbeddingsQueryServiceShape => { const queryEffect = Effect.fn("QdrantGraphEmbeddingsQuery.query")(function* ( request: GraphEmbeddingsQueryRequest, @@ -131,8 +144,10 @@ export function makeQdrantGraphEmbeddingsQuery( const entities: EntityMatch[] = []; for (const point of searchResult) { - const payload = point.payload as Record | undefined; - const entityValue = payload?.entity as string | undefined; + const payload = yield* decodeGraphPointPayload(point.payload); + if (O.isNone(payload)) continue; + + const entityValue = payload.value.entity; if (entityValue === undefined || entityValue.length === 0) continue; // Deduplicate by entity value, keeping the highest score (results are @@ -152,6 +167,36 @@ export function makeQdrantGraphEmbeddingsQuery( return entities; }); + return { + query: queryEffect, + }; +}; + +export const makeQdrantGraphEmbeddingsQueryServiceEffect = Effect.fn( + "makeQdrantGraphEmbeddingsQueryServiceEffect", +)(function* (config: QdrantGraphQueryConfig = {}) { + const resolved = yield* loadQdrantGraphQueryConfig(config).pipe( + Effect.mapError((cause) => qdrantGraphEmbeddingsQueryError("load-config", cause)), + ); + const client = yield* makeQdrantGraphEmbeddingsQueryClient(config, resolved); + yield* Effect.log("[QdrantGraphQuery] Query service initialized"); + return makeQdrantGraphEmbeddingsQueryFromClient(client); +}); + +const withQdrantGraphEmbeddingsQuery = ( + config: QdrantGraphQueryConfig, + use: (query: QdrantGraphEmbeddingsQueryServiceShape) => Effect.Effect, +) => + makeQdrantGraphEmbeddingsQueryServiceEffect(config).pipe( + Effect.flatMap(use), + ); + +export function makeQdrantGraphEmbeddingsQuery( + config: QdrantGraphQueryConfig = {}, +): QdrantGraphEmbeddingsQuery { + const queryEffect = (request: GraphEmbeddingsQueryRequest) => + withQdrantGraphEmbeddingsQuery(config, (query) => query.query(request)); + return { query: (request) => Effect.runPromise(queryEffect(request)), queryEffect, @@ -173,17 +218,16 @@ export class QdrantGraphEmbeddingsQueryService extends Context.Service< export const makeQdrantGraphEmbeddingsQueryService = ( config: QdrantGraphQueryConfig = {}, -): QdrantGraphEmbeddingsQueryServiceShape => { - const query = makeQdrantGraphEmbeddingsQuery(config); - return { - query: query.queryEffect, - }; -}; +): QdrantGraphEmbeddingsQueryServiceShape => ({ + query: (request) => withQdrantGraphEmbeddingsQuery(config, (query) => query.query(request)), +}); export const QdrantGraphEmbeddingsQueryLive = ( config: QdrantGraphQueryConfig = {}, -): Layer.Layer => - Layer.succeed( +): Layer.Layer => + Layer.effect( QdrantGraphEmbeddingsQueryService, - QdrantGraphEmbeddingsQueryService.of(makeQdrantGraphEmbeddingsQueryService(config)), + makeQdrantGraphEmbeddingsQueryServiceEffect(config).pipe( + Effect.map((service) => QdrantGraphEmbeddingsQueryService.of(service)), + ), ); diff --git a/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts b/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts index cd8a2043..00f3cb01 100644 --- a/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts +++ b/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts @@ -13,8 +13,10 @@ import { makeFlowProcessor, makeConsumerSpec, makeRequestResponseSpec, + processorLifecycleError, type ProcessorConfig, type FlowProcessorRuntime, + type FlowProcessorStartEffect, type FlowContext, type FlowResourceNotFoundError, type MessagingDeliveryError, @@ -30,7 +32,7 @@ import { Effect, Layer, ManagedRuntime } from "effect"; import { QdrantGraphEmbeddingsStoreLive, QdrantGraphEmbeddingsStoreService, - makeQdrantGraphEmbeddingsStoreService, + makeQdrantGraphEmbeddingsStoreServiceEffect, type QdrantGraphEmbeddingsConfig, type QdrantGraphEmbeddingsStoreError, } from "./qdrant-graph.js"; @@ -93,19 +95,27 @@ export const makeGraphEmbeddingsStoreSpecs = (): ReadonlyArray; +const provideQdrantGraphEmbeddingsStore = (processorId: string) => + Effect.fn("GraphEmbeddingsStoreService.provideQdrant")(function* ( + effect: FlowProcessorStartEffect, + ) { + const store = yield* makeQdrantGraphEmbeddingsStoreServiceEffect().pipe( + Effect.mapError((error) => processorLifecycleError(processorId, "qdrant-graph-store-connect", error)), + ); + yield* effect.pipe( + Effect.provideService( + QdrantGraphEmbeddingsStoreService, + QdrantGraphEmbeddingsStoreService.of(store), + ), + ); + }); + export function makeGraphEmbeddingsStoreService(config: ProcessorConfig): GraphEmbeddingsStoreService { - const store = makeQdrantGraphEmbeddingsStoreService(); const service = makeFlowProcessor(config, { specifications: makeGraphEmbeddingsStoreSpecs(), - provide: (effect) => - effect.pipe( - Effect.provideService( - QdrantGraphEmbeddingsStoreService, - QdrantGraphEmbeddingsStoreService.of(store), - ), - ), + provide: provideQdrantGraphEmbeddingsStore(config.id), }); - Effect.runSync(Effect.log("[GraphEmbeddingsStore] Service initialized")); + void Effect.runPromise(Effect.log("[GraphEmbeddingsStore] Service initialized")); return service; } @@ -113,7 +123,7 @@ export const GraphEmbeddingsStoreService = makeGraphEmbeddingsStoreService; export const program = makeFlowProcessorProgram< ProcessorConfig & QdrantGraphEmbeddingsConfig, - never, + QdrantGraphEmbeddingsStoreError, GraphEmbeddingsStoreRequirements >({ id: "graph-embeddings-store", diff --git a/ts/packages/flow/src/storage/embeddings/qdrant-doc.ts b/ts/packages/flow/src/storage/embeddings/qdrant-doc.ts index c8182fdd..ccfdd125 100644 --- a/ts/packages/flow/src/storage/embeddings/qdrant-doc.ts +++ b/ts/packages/flow/src/storage/embeddings/qdrant-doc.ts @@ -8,15 +8,16 @@ * Python reference: trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py */ -import { QdrantClient } from "@qdrant/js-client-rest"; import { errorMessage } from "@trustgraph/base"; import { Config, Effect, Random } from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; +import { makeQdrantClient, type QdrantClientFactory, type QdrantClientLike } from "../../qdrant/client.js"; export interface QdrantDocEmbeddingsConfig { url?: string; apiKey?: string; + clientFactory?: QdrantClientFactory; } export interface DocEmbeddingChunk { @@ -94,18 +95,33 @@ export interface QdrantDocEmbeddingsStore { ) => Effect.Effect; } -export function makeQdrantDocEmbeddingsStore( - config: QdrantDocEmbeddingsConfig = {}, -): QdrantDocEmbeddingsStore { - const resolved = Effect.runSync(loadQdrantDocEmbeddingsConfig(config)); - - const client = new QdrantClient({ - url: resolved.url, - ...(resolved.apiKey !== undefined ? { apiKey: resolved.apiKey } : {}), +const makeQdrantDocEmbeddingsClient = ( + config: QdrantDocEmbeddingsConfig, + resolved: ResolvedQdrantDocEmbeddingsConfig, +) => + Effect.try({ + try: () => + makeQdrantClient(config.clientFactory, { + url: resolved.url, + ...(resolved.apiKey !== undefined ? { apiKey: resolved.apiKey } : {}), + }), + catch: (cause) => qdrantDocEmbeddingsStoreError("create-client", cause), }); - const knownCollections = new Set(); - Effect.runSync(Effect.log("[QdrantDocEmbeddings] Store initialized")); +interface QdrantDocEmbeddingsStoreEffectShape { + readonly store: ( + message: DocEmbeddingsMessage, + ) => Effect.Effect; + readonly deleteCollection: ( + user: string, + collection: string, + ) => Effect.Effect; +} + +const makeQdrantDocEmbeddingsStoreFromClient = ( + client: QdrantClientLike, +): QdrantDocEmbeddingsStoreEffectShape => { + const knownCollections = new Set(); const collectionName = (user: string, collection: string, dim: number): string => `d_${user}_${collection}_${dim}`; @@ -199,6 +215,39 @@ export function makeQdrantDocEmbeddingsStore( ); }); + return { + store: storeEffect, + deleteCollection: deleteCollectionEffect, + }; +}; + +const makeQdrantDocEmbeddingsStoreEffect = Effect.fn("makeQdrantDocEmbeddingsStoreEffect")(function* ( + config: QdrantDocEmbeddingsConfig = {}, +) { + const resolved = yield* loadQdrantDocEmbeddingsConfig(config).pipe( + Effect.mapError((cause) => qdrantDocEmbeddingsStoreError("load-config", cause)), + ); + const client = yield* makeQdrantDocEmbeddingsClient(config, resolved); + yield* Effect.log("[QdrantDocEmbeddings] Store initialized"); + return makeQdrantDocEmbeddingsStoreFromClient(client); +}); + +const withQdrantDocEmbeddingsStore = ( + config: QdrantDocEmbeddingsConfig, + use: (store: QdrantDocEmbeddingsStoreEffectShape) => Effect.Effect, +) => + makeQdrantDocEmbeddingsStoreEffect(config).pipe( + Effect.flatMap(use), + ); + +export function makeQdrantDocEmbeddingsStore( + config: QdrantDocEmbeddingsConfig = {}, +): QdrantDocEmbeddingsStore { + const storeEffect = (message: DocEmbeddingsMessage) => + withQdrantDocEmbeddingsStore(config, (store) => store.store(message)); + const deleteCollectionEffect = (user: string, collection: string) => + withQdrantDocEmbeddingsStore(config, (store) => store.deleteCollection(user, collection)); + return { store: (message) => Effect.runPromise(storeEffect(message)), deleteCollection: (user, collection) => diff --git a/ts/packages/flow/src/storage/embeddings/qdrant-graph.ts b/ts/packages/flow/src/storage/embeddings/qdrant-graph.ts index 36075a61..0af2b79b 100644 --- a/ts/packages/flow/src/storage/embeddings/qdrant-graph.ts +++ b/ts/packages/flow/src/storage/embeddings/qdrant-graph.ts @@ -8,15 +8,16 @@ * Python reference: trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py */ -import { QdrantClient } from "@qdrant/js-client-rest"; import { errorMessage, type Term } from "@trustgraph/base"; import { Config, Context, Effect, Layer, Random } from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; +import { makeQdrantClient, type QdrantClientFactory, type QdrantClientLike } from "../../qdrant/client.js"; export interface QdrantGraphEmbeddingsConfig { url?: string; apiKey?: string; + clientFactory?: QdrantClientFactory; } export interface GraphEmbeddingEntity { @@ -107,18 +108,23 @@ export interface QdrantGraphEmbeddingsStore { ) => Effect.Effect; } -export function makeQdrantGraphEmbeddingsStore( - config: QdrantGraphEmbeddingsConfig = {}, -): QdrantGraphEmbeddingsStore { - const resolved = Effect.runSync(loadQdrantGraphEmbeddingsConfig(config)); - - const client = new QdrantClient({ - url: resolved.url, - ...(resolved.apiKey !== undefined ? { apiKey: resolved.apiKey } : {}), +const makeQdrantGraphEmbeddingsClient = ( + config: QdrantGraphEmbeddingsConfig, + resolved: ResolvedQdrantGraphEmbeddingsConfig, +) => + Effect.try({ + try: () => + makeQdrantClient(config.clientFactory, { + url: resolved.url, + ...(resolved.apiKey !== undefined ? { apiKey: resolved.apiKey } : {}), + }), + catch: (cause) => qdrantGraphEmbeddingsStoreError("create-client", cause), }); - const knownCollections = new Set(); - Effect.runSync(Effect.log("[QdrantGraphEmbeddings] Store initialized")); +const makeQdrantGraphEmbeddingsStoreFromClient = ( + client: QdrantClientLike, +): QdrantGraphEmbeddingsStoreServiceShape => { + const knownCollections = new Set(); const collectionName = (user: string, collection: string, dim: number): string => `t_${user}_${collection}_${dim}`; @@ -213,6 +219,39 @@ export function makeQdrantGraphEmbeddingsStore( ); }); + return { + store: storeEffect, + deleteCollection: deleteCollectionEffect, + }; +}; + +export const makeQdrantGraphEmbeddingsStoreServiceEffect = Effect.fn( + "makeQdrantGraphEmbeddingsStoreServiceEffect", +)(function* (config: QdrantGraphEmbeddingsConfig = {}) { + const resolved = yield* loadQdrantGraphEmbeddingsConfig(config).pipe( + Effect.mapError((cause) => qdrantGraphEmbeddingsStoreError("load-config", cause)), + ); + const client = yield* makeQdrantGraphEmbeddingsClient(config, resolved); + yield* Effect.log("[QdrantGraphEmbeddings] Store initialized"); + return makeQdrantGraphEmbeddingsStoreFromClient(client); +}); + +const withQdrantGraphEmbeddingsStore = ( + config: QdrantGraphEmbeddingsConfig, + use: (store: QdrantGraphEmbeddingsStoreServiceShape) => Effect.Effect, +) => + makeQdrantGraphEmbeddingsStoreServiceEffect(config).pipe( + Effect.flatMap(use), + ); + +export function makeQdrantGraphEmbeddingsStore( + config: QdrantGraphEmbeddingsConfig = {}, +): QdrantGraphEmbeddingsStore { + const storeEffect = (message: GraphEmbeddingsMessage) => + withQdrantGraphEmbeddingsStore(config, (store) => store.store(message)); + const deleteCollectionEffect = (user: string, collection: string) => + withQdrantGraphEmbeddingsStore(config, (store) => store.deleteCollection(user, collection)); + return { store: (message) => Effect.runPromise(storeEffect(message)), deleteCollection: (user, collection) => @@ -241,18 +280,18 @@ export class QdrantGraphEmbeddingsStoreService extends Context.Service< export const makeQdrantGraphEmbeddingsStoreService = ( config: QdrantGraphEmbeddingsConfig = {}, -): QdrantGraphEmbeddingsStoreServiceShape => { - const store = makeQdrantGraphEmbeddingsStore(config); - return { - store: store.storeEffect, - deleteCollection: store.deleteCollectionEffect, - }; -}; +): QdrantGraphEmbeddingsStoreServiceShape => ({ + store: (message) => withQdrantGraphEmbeddingsStore(config, (store) => store.store(message)), + deleteCollection: (user, collection) => + withQdrantGraphEmbeddingsStore(config, (store) => store.deleteCollection(user, collection)), +}); export const QdrantGraphEmbeddingsStoreLive = ( config: QdrantGraphEmbeddingsConfig = {}, -): Layer.Layer => - Layer.succeed( +): Layer.Layer => + Layer.effect( QdrantGraphEmbeddingsStoreService, - QdrantGraphEmbeddingsStoreService.of(makeQdrantGraphEmbeddingsStoreService(config)), + makeQdrantGraphEmbeddingsStoreServiceEffect(config).pipe( + Effect.map((service) => QdrantGraphEmbeddingsStoreService.of(service)), + ), );