Add fakeable Qdrant Effect services

This commit is contained in:
elpresidank 2026-06-02 04:10:03 -05:00
parent d38ce475fd
commit 8287e1cf93
10 changed files with 671 additions and 154 deletions

View file

@ -13,8 +13,10 @@ import {
makeFlowProcessor,
makeConsumerSpec,
makeRequestResponseSpec,
processorLifecycleError,
type ProcessorConfig,
type FlowProcessorRuntime,
type FlowProcessorStartEffect,
type FlowContext,
type FlowResourceNotFoundError,
type MessagingDeliveryError,
@ -30,7 +32,7 @@ import { Effect, Layer, ManagedRuntime } from "effect";
import {
QdrantGraphEmbeddingsStoreLive,
QdrantGraphEmbeddingsStoreService,
makeQdrantGraphEmbeddingsStoreService,
makeQdrantGraphEmbeddingsStoreServiceEffect,
type QdrantGraphEmbeddingsConfig,
type QdrantGraphEmbeddingsStoreError,
} from "./qdrant-graph.js";
@ -93,19 +95,27 @@ export const makeGraphEmbeddingsStoreSpecs = (): ReadonlyArray<Spec<GraphEmbeddi
export type GraphEmbeddingsStoreService = FlowProcessorRuntime<GraphEmbeddingsStoreRequirements>;
const provideQdrantGraphEmbeddingsStore = (processorId: string) =>
Effect.fn("GraphEmbeddingsStoreService.provideQdrant")(function* (
effect: FlowProcessorStartEffect<GraphEmbeddingsStoreRequirements>,
) {
const store = yield* makeQdrantGraphEmbeddingsStoreServiceEffect().pipe(
Effect.mapError((error) => processorLifecycleError(processorId, "qdrant-graph-store-connect", error)),
);
yield* effect.pipe(
Effect.provideService(
QdrantGraphEmbeddingsStoreService,
QdrantGraphEmbeddingsStoreService.of(store),
),
);
});
export function makeGraphEmbeddingsStoreService(config: ProcessorConfig): GraphEmbeddingsStoreService {
const store = makeQdrantGraphEmbeddingsStoreService();
const service = makeFlowProcessor(config, {
specifications: makeGraphEmbeddingsStoreSpecs(),
provide: (effect) =>
effect.pipe(
Effect.provideService(
QdrantGraphEmbeddingsStoreService,
QdrantGraphEmbeddingsStoreService.of(store),
),
),
provide: provideQdrantGraphEmbeddingsStore(config.id),
});
Effect.runSync(Effect.log("[GraphEmbeddingsStore] Service initialized"));
void Effect.runPromise(Effect.log("[GraphEmbeddingsStore] Service initialized"));
return service;
}
@ -113,7 +123,7 @@ export const GraphEmbeddingsStoreService = makeGraphEmbeddingsStoreService;
export const program = makeFlowProcessorProgram<
ProcessorConfig & QdrantGraphEmbeddingsConfig,
never,
QdrantGraphEmbeddingsStoreError,
GraphEmbeddingsStoreRequirements
>({
id: "graph-embeddings-store",

View file

@ -8,15 +8,16 @@
* Python reference: trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py
*/
import { QdrantClient } from "@qdrant/js-client-rest";
import { errorMessage } from "@trustgraph/base";
import { Config, Effect, Random } from "effect";
import * as O from "effect/Option";
import * as S from "effect/Schema";
import { makeQdrantClient, type QdrantClientFactory, type QdrantClientLike } from "../../qdrant/client.js";
export interface QdrantDocEmbeddingsConfig {
url?: string;
apiKey?: string;
clientFactory?: QdrantClientFactory;
}
export interface DocEmbeddingChunk {
@ -94,18 +95,33 @@ export interface QdrantDocEmbeddingsStore {
) => Effect.Effect<void, QdrantDocEmbeddingsStoreError>;
}
export function makeQdrantDocEmbeddingsStore(
config: QdrantDocEmbeddingsConfig = {},
): QdrantDocEmbeddingsStore {
const resolved = Effect.runSync(loadQdrantDocEmbeddingsConfig(config));
const client = new QdrantClient({
url: resolved.url,
...(resolved.apiKey !== undefined ? { apiKey: resolved.apiKey } : {}),
const makeQdrantDocEmbeddingsClient = (
config: QdrantDocEmbeddingsConfig,
resolved: ResolvedQdrantDocEmbeddingsConfig,
) =>
Effect.try({
try: () =>
makeQdrantClient(config.clientFactory, {
url: resolved.url,
...(resolved.apiKey !== undefined ? { apiKey: resolved.apiKey } : {}),
}),
catch: (cause) => qdrantDocEmbeddingsStoreError("create-client", cause),
});
const knownCollections = new Set<string>();
Effect.runSync(Effect.log("[QdrantDocEmbeddings] Store initialized"));
interface QdrantDocEmbeddingsStoreEffectShape {
readonly store: (
message: DocEmbeddingsMessage,
) => Effect.Effect<void, QdrantDocEmbeddingsStoreError>;
readonly deleteCollection: (
user: string,
collection: string,
) => Effect.Effect<void, QdrantDocEmbeddingsStoreError>;
}
const makeQdrantDocEmbeddingsStoreFromClient = (
client: QdrantClientLike,
): QdrantDocEmbeddingsStoreEffectShape => {
const knownCollections = new Set<string>();
const collectionName = (user: string, collection: string, dim: number): string =>
`d_${user}_${collection}_${dim}`;
@ -199,6 +215,39 @@ export function makeQdrantDocEmbeddingsStore(
);
});
return {
store: storeEffect,
deleteCollection: deleteCollectionEffect,
};
};
const makeQdrantDocEmbeddingsStoreEffect = Effect.fn("makeQdrantDocEmbeddingsStoreEffect")(function* (
config: QdrantDocEmbeddingsConfig = {},
) {
const resolved = yield* loadQdrantDocEmbeddingsConfig(config).pipe(
Effect.mapError((cause) => qdrantDocEmbeddingsStoreError("load-config", cause)),
);
const client = yield* makeQdrantDocEmbeddingsClient(config, resolved);
yield* Effect.log("[QdrantDocEmbeddings] Store initialized");
return makeQdrantDocEmbeddingsStoreFromClient(client);
});
const withQdrantDocEmbeddingsStore = <A>(
config: QdrantDocEmbeddingsConfig,
use: (store: QdrantDocEmbeddingsStoreEffectShape) => Effect.Effect<A, QdrantDocEmbeddingsStoreError>,
) =>
makeQdrantDocEmbeddingsStoreEffect(config).pipe(
Effect.flatMap(use),
);
export function makeQdrantDocEmbeddingsStore(
config: QdrantDocEmbeddingsConfig = {},
): QdrantDocEmbeddingsStore {
const storeEffect = (message: DocEmbeddingsMessage) =>
withQdrantDocEmbeddingsStore(config, (store) => store.store(message));
const deleteCollectionEffect = (user: string, collection: string) =>
withQdrantDocEmbeddingsStore(config, (store) => store.deleteCollection(user, collection));
return {
store: (message) => Effect.runPromise(storeEffect(message)),
deleteCollection: (user, collection) =>

View file

@ -8,15 +8,16 @@
* Python reference: trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py
*/
import { QdrantClient } from "@qdrant/js-client-rest";
import { errorMessage, type Term } from "@trustgraph/base";
import { Config, Context, Effect, Layer, Random } from "effect";
import * as O from "effect/Option";
import * as S from "effect/Schema";
import { makeQdrantClient, type QdrantClientFactory, type QdrantClientLike } from "../../qdrant/client.js";
export interface QdrantGraphEmbeddingsConfig {
url?: string;
apiKey?: string;
clientFactory?: QdrantClientFactory;
}
export interface GraphEmbeddingEntity {
@ -107,18 +108,23 @@ export interface QdrantGraphEmbeddingsStore {
) => Effect.Effect<void, QdrantGraphEmbeddingsStoreError>;
}
export function makeQdrantGraphEmbeddingsStore(
config: QdrantGraphEmbeddingsConfig = {},
): QdrantGraphEmbeddingsStore {
const resolved = Effect.runSync(loadQdrantGraphEmbeddingsConfig(config));
const client = new QdrantClient({
url: resolved.url,
...(resolved.apiKey !== undefined ? { apiKey: resolved.apiKey } : {}),
const makeQdrantGraphEmbeddingsClient = (
config: QdrantGraphEmbeddingsConfig,
resolved: ResolvedQdrantGraphEmbeddingsConfig,
) =>
Effect.try({
try: () =>
makeQdrantClient(config.clientFactory, {
url: resolved.url,
...(resolved.apiKey !== undefined ? { apiKey: resolved.apiKey } : {}),
}),
catch: (cause) => qdrantGraphEmbeddingsStoreError("create-client", cause),
});
const knownCollections = new Set<string>();
Effect.runSync(Effect.log("[QdrantGraphEmbeddings] Store initialized"));
const makeQdrantGraphEmbeddingsStoreFromClient = (
client: QdrantClientLike,
): QdrantGraphEmbeddingsStoreServiceShape => {
const knownCollections = new Set<string>();
const collectionName = (user: string, collection: string, dim: number): string =>
`t_${user}_${collection}_${dim}`;
@ -213,6 +219,39 @@ export function makeQdrantGraphEmbeddingsStore(
);
});
return {
store: storeEffect,
deleteCollection: deleteCollectionEffect,
};
};
export const makeQdrantGraphEmbeddingsStoreServiceEffect = Effect.fn(
"makeQdrantGraphEmbeddingsStoreServiceEffect",
)(function* (config: QdrantGraphEmbeddingsConfig = {}) {
const resolved = yield* loadQdrantGraphEmbeddingsConfig(config).pipe(
Effect.mapError((cause) => qdrantGraphEmbeddingsStoreError("load-config", cause)),
);
const client = yield* makeQdrantGraphEmbeddingsClient(config, resolved);
yield* Effect.log("[QdrantGraphEmbeddings] Store initialized");
return makeQdrantGraphEmbeddingsStoreFromClient(client);
});
const withQdrantGraphEmbeddingsStore = <A>(
config: QdrantGraphEmbeddingsConfig,
use: (store: QdrantGraphEmbeddingsStoreServiceShape) => Effect.Effect<A, QdrantGraphEmbeddingsStoreError>,
) =>
makeQdrantGraphEmbeddingsStoreServiceEffect(config).pipe(
Effect.flatMap(use),
);
export function makeQdrantGraphEmbeddingsStore(
config: QdrantGraphEmbeddingsConfig = {},
): QdrantGraphEmbeddingsStore {
const storeEffect = (message: GraphEmbeddingsMessage) =>
withQdrantGraphEmbeddingsStore(config, (store) => store.store(message));
const deleteCollectionEffect = (user: string, collection: string) =>
withQdrantGraphEmbeddingsStore(config, (store) => store.deleteCollection(user, collection));
return {
store: (message) => Effect.runPromise(storeEffect(message)),
deleteCollection: (user, collection) =>
@ -241,18 +280,18 @@ export class QdrantGraphEmbeddingsStoreService extends Context.Service<
export const makeQdrantGraphEmbeddingsStoreService = (
config: QdrantGraphEmbeddingsConfig = {},
): QdrantGraphEmbeddingsStoreServiceShape => {
const store = makeQdrantGraphEmbeddingsStore(config);
return {
store: store.storeEffect,
deleteCollection: store.deleteCollectionEffect,
};
};
): QdrantGraphEmbeddingsStoreServiceShape => ({
store: (message) => withQdrantGraphEmbeddingsStore(config, (store) => store.store(message)),
deleteCollection: (user, collection) =>
withQdrantGraphEmbeddingsStore(config, (store) => store.deleteCollection(user, collection)),
});
export const QdrantGraphEmbeddingsStoreLive = (
config: QdrantGraphEmbeddingsConfig = {},
): Layer.Layer<QdrantGraphEmbeddingsStoreService> =>
Layer.succeed(
): Layer.Layer<QdrantGraphEmbeddingsStoreService, QdrantGraphEmbeddingsStoreError> =>
Layer.effect(
QdrantGraphEmbeddingsStoreService,
QdrantGraphEmbeddingsStoreService.of(makeQdrantGraphEmbeddingsStoreService(config)),
makeQdrantGraphEmbeddingsStoreServiceEffect(config).pipe(
Effect.map((service) => QdrantGraphEmbeddingsStoreService.of(service)),
),
);