From d19167b5667b4c600c0452310f4c65972bc0c450 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 09:21:19 -0500 Subject: [PATCH] Use MutableHashSet for Qdrant collection caches --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 48 ++++++++++++++---- .../src/__tests__/qdrant-embeddings.test.ts | 50 +++++++++++++++++-- .../flow/src/storage/embeddings/qdrant-doc.ts | 9 ++-- .../src/storage/embeddings/qdrant-graph.ts | 9 ++-- 4 files changed, 94 insertions(+), 22 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index de913e83..7a93d62d 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -1574,6 +1574,35 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-02: Qdrant MutableHashSet Cache Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/flow/src/storage/embeddings/qdrant-graph.ts` now uses + `MutableHashSet` for its known-collection cache. + - `ts/packages/flow/src/storage/embeddings/qdrant-doc.ts` now uses + `MutableHashSet` inside its store effect shape instead of a native + `Set`. + - Collection cache membership, insertion, and invalidation now use + `MutableHashSet.has`, `MutableHashSet.add`, and + `MutableHashSet.remove`. + - Qdrant tests now prove the graph store service avoids repeated collection + existence checks while the collection is cached and checks again after + delete invalidates the cache. +- Remaining: + - The document store's public direct facade still constructs a fresh store + effect per operation, so its cache lifetime is per constructed effect + shape. Changing that facade to own a shared runtime/service is a separate + lifecycle design slice, not part of this collection primitive migration. +- Verification: + - `cd ts && bun run check:tsgo` + - `cd ts/packages/flow && bunx --bun vitest run src/__tests__/qdrant-embeddings.test.ts` + - `cd ts/packages/flow && bun run build` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -1689,9 +1718,8 @@ Notes: - Messaging runtime `Config.duration` cleanup is complete. Internal runtime config uses `Duration.Duration`; public timeout compatibility inputs and broker receive/error payload boundaries remain numeric milliseconds. - - Qdrant graph/doc known-collection caches are the next good small - `MutableHashSet` candidate; short-lived local traversal sets - remain no-ops. + - Qdrant graph/doc known-collection caches now use + `MutableHashSet`. Short-lived local traversal sets remain no-ops. - FlowManager and sibling service `() => Effect.gen(...)` factories remain a broad mechanical `Effect.fn` / `Effect.fnUntraced` cleanup, best handled after Duration and small collection slices. @@ -1826,7 +1854,7 @@ Notes: duration string env values. - Messaging runtime and consumer tests preserve retry and timeout behavior. -### P2: Qdrant Known-Collection MutableHashSet Cleanup +### Complete: Qdrant Known-Collection MutableHashSet Cleanup - TrustGraph evidence: - `ts/packages/flow/src/storage/embeddings/qdrant-doc.ts` @@ -1839,8 +1867,8 @@ Notes: - Keep short-lived local `Set` values for pure query traversal or fixture assertions as no-op boundaries. - Tests: - - Existing Qdrant embeddings tests should prove lazy collection creation and - deletion cache invalidation still behave the same. + - Qdrant embeddings tests prove graph cache hits skip repeated collection + existence checks and deletion invalidates the cache. ### P2: Canonicalize MCP Around The Effect Server @@ -1877,10 +1905,10 @@ Notes: ## Recommended PR Order -1. Qdrant known-collection `MutableHashSet` cleanup. -2. MCP protocol parity tests and legacy stdio flip/removal decision. -3. FlowManager/service `Effect.fn` normalization. -4. Flow/client RPC stream and remaining service operation `Match` follow-ups. +1. MCP protocol parity tests and legacy stdio flip/removal decision. +2. FlowManager/service `Effect.fn` normalization. +3. Flow/client RPC stream and remaining service operation `Match` follow-ups. +4. Long-lived ref-backed `HashMap` state cleanup where clone helpers remain. ## No-Op Rules diff --git a/ts/packages/flow/src/__tests__/qdrant-embeddings.test.ts b/ts/packages/flow/src/__tests__/qdrant-embeddings.test.ts index 9a5956db..d7ac9f37 100644 --- a/ts/packages/flow/src/__tests__/qdrant-embeddings.test.ts +++ b/ts/packages/flow/src/__tests__/qdrant-embeddings.test.ts @@ -24,6 +24,7 @@ interface FakePoint { class FakeQdrantClient implements QdrantClientLike { readonly collections = new Set(); + readonly collectionExistsCalls: string[] = []; readonly createdCollections: Array<{ readonly name: string; readonly size: number }> = []; readonly upserts: Array<{ readonly collectionName: string; @@ -33,6 +34,7 @@ class FakeQdrantClient implements QdrantClientLike { searchResults: ReadonlyArray = []; async collectionExists(collectionName: string): Promise<{ readonly exists: boolean }> { + this.collectionExistsCalls.push(collectionName); return { exists: this.collections.has(collectionName) }; } @@ -161,6 +163,17 @@ describe("Qdrant embeddings", () => { collection: "graph", entities: [{ entity, vector: [1, 2, 3], chunkId: "chunk-a" }], }); + yield* store.store({ + user: "alice", + collection: "graph", + entities: [{ entity, vector: [3, 2, 1], chunkId: "chunk-b" }], + }); + yield* store.deleteCollection("alice", "graph"); + yield* store.store({ + user: "alice", + collection: "graph", + entities: [{ entity, vector: [1, 1, 1], chunkId: "chunk-c" }], + }); }).pipe( Effect.provide( QdrantGraphEmbeddingsStoreLive({ @@ -171,8 +184,13 @@ describe("Qdrant embeddings", () => { ), ); - expect(client.createdCollections).toEqual([{ name: "t_alice_graph_3", size: 3 }]); - expect(client.upserts).toHaveLength(1); + expect(client.collectionExistsCalls).toEqual(["t_alice_graph_3", "t_alice_graph_3"]); + expect(client.createdCollections).toEqual([ + { name: "t_alice_graph_3", size: 3 }, + { name: "t_alice_graph_3", size: 3 }, + ]); + expect(client.deletedCollections).toEqual(["t_alice_graph_3"]); + expect(client.upserts).toHaveLength(3); expect(client.upserts[0]?.collectionName).toBe("t_alice_graph_3"); expect(client.upserts[0]?.points[0]?.payload).toEqual({ entity: "https://example.com/entity", @@ -194,9 +212,33 @@ describe("Qdrant embeddings", () => { chunks: [{ chunkId: "chunk-a", vector: [1, 2], content: "alpha" }], }), ); + await Effect.runPromise( + store.storeEffect({ + user: "alice", + collection: "docs", + chunks: [{ chunkId: "chunk-b", vector: [2, 1], content: "beta" }], + }), + ); + await Effect.runPromise(store.deleteCollectionEffect("alice", "docs")); + await Effect.runPromise( + store.storeEffect({ + user: "alice", + collection: "docs", + chunks: [{ chunkId: "chunk-c", vector: [1, 1], content: "gamma" }], + }), + ); - expect(client.createdCollections).toEqual([{ name: "d_alice_docs_2", size: 2 }]); - expect(client.upserts).toHaveLength(1); + expect(client.collectionExistsCalls).toEqual([ + "d_alice_docs_2", + "d_alice_docs_2", + "d_alice_docs_2", + ]); + expect(client.createdCollections).toEqual([ + { name: "d_alice_docs_2", size: 2 }, + { name: "d_alice_docs_2", size: 2 }, + ]); + expect(client.deletedCollections).toEqual(["d_alice_docs_2"]); + expect(client.upserts).toHaveLength(3); expect(client.upserts[0]?.collectionName).toBe("d_alice_docs_2"); expect(client.upserts[0]?.points[0]?.payload).toEqual({ chunk_id: "chunk-a", diff --git a/ts/packages/flow/src/storage/embeddings/qdrant-doc.ts b/ts/packages/flow/src/storage/embeddings/qdrant-doc.ts index ccfdd125..9861ad20 100644 --- a/ts/packages/flow/src/storage/embeddings/qdrant-doc.ts +++ b/ts/packages/flow/src/storage/embeddings/qdrant-doc.ts @@ -10,6 +10,7 @@ import { errorMessage } from "@trustgraph/base"; import { Config, Effect, Random } from "effect"; +import * as MutableHashSet from "effect/MutableHashSet"; import * as O from "effect/Option"; import * as S from "effect/Schema"; import { makeQdrantClient, type QdrantClientFactory, type QdrantClientLike } from "../../qdrant/client.js"; @@ -121,7 +122,7 @@ interface QdrantDocEmbeddingsStoreEffectShape { const makeQdrantDocEmbeddingsStoreFromClient = ( client: QdrantClientLike, ): QdrantDocEmbeddingsStoreEffectShape => { - const knownCollections = new Set(); + const knownCollections = MutableHashSet.empty(); const collectionName = (user: string, collection: string, dim: number): string => `d_${user}_${collection}_${dim}`; @@ -130,7 +131,7 @@ const makeQdrantDocEmbeddingsStoreFromClient = ( name: string, dim: number, ) { - if (knownCollections.has(name)) return; + if (MutableHashSet.has(knownCollections, name)) return; const exists = yield* Effect.tryPromise({ try: () => client.collectionExists(name), @@ -147,7 +148,7 @@ const makeQdrantDocEmbeddingsStoreFromClient = ( }); } - knownCollections.add(name); + MutableHashSet.add(knownCollections, name); }); const storeEffect = Effect.fn("QdrantDocEmbeddings.store")(function* (message: DocEmbeddingsMessage) { @@ -206,7 +207,7 @@ const makeQdrantDocEmbeddingsStoreFromClient = ( try: () => client.deleteCollection(coll.name), catch: (cause) => qdrantDocEmbeddingsStoreError("delete-collection", cause), }); - knownCollections.delete(coll.name); + MutableHashSet.remove(knownCollections, coll.name); yield* Effect.log(`[QdrantDocEmbeddings] Deleted collection: ${coll.name}`); } diff --git a/ts/packages/flow/src/storage/embeddings/qdrant-graph.ts b/ts/packages/flow/src/storage/embeddings/qdrant-graph.ts index 247283de..ad008ebf 100644 --- a/ts/packages/flow/src/storage/embeddings/qdrant-graph.ts +++ b/ts/packages/flow/src/storage/embeddings/qdrant-graph.ts @@ -10,6 +10,7 @@ import { errorMessage, type Term } from "@trustgraph/base"; import { Config, Context, Effect, Layer, Match, Random } from "effect"; +import * as MutableHashSet from "effect/MutableHashSet"; import * as O from "effect/Option"; import * as S from "effect/Schema"; import { makeQdrantClient, type QdrantClientFactory, type QdrantClientLike } from "../../qdrant/client.js"; @@ -122,7 +123,7 @@ const makeQdrantGraphEmbeddingsClient = ( const makeQdrantGraphEmbeddingsStoreFromClient = ( client: QdrantClientLike, ): QdrantGraphEmbeddingsStoreServiceShape => { - const knownCollections = new Set(); + const knownCollections = MutableHashSet.empty(); const collectionName = (user: string, collection: string, dim: number): string => `t_${user}_${collection}_${dim}`; @@ -131,7 +132,7 @@ const makeQdrantGraphEmbeddingsStoreFromClient = ( name: string, dim: number, ) { - if (knownCollections.has(name)) return; + if (MutableHashSet.has(knownCollections, name)) return; const exists = yield* Effect.tryPromise({ try: () => client.collectionExists(name), @@ -148,7 +149,7 @@ const makeQdrantGraphEmbeddingsStoreFromClient = ( }); } - knownCollections.add(name); + MutableHashSet.add(knownCollections, name); }); const storeEffect = Effect.fn("QdrantGraphEmbeddings.store")(function* (message: GraphEmbeddingsMessage) { @@ -208,7 +209,7 @@ const makeQdrantGraphEmbeddingsStoreFromClient = ( try: () => client.deleteCollection(coll.name), catch: (cause) => qdrantGraphEmbeddingsStoreError("delete-collection", cause), }); - knownCollections.delete(coll.name); + MutableHashSet.remove(knownCollections, coll.name); yield* Effect.log(`[QdrantGraphEmbeddings] Deleted collection: ${coll.name}`); }