From 749f75715deafbd4bf99f228aeb8b7a2429234c5 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Thu, 4 Jun 2026 06:36:30 -0500 Subject: [PATCH] Use HashMap for knowledge core state --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 30 +++++- .../__tests__/knowledge-core-service.test.ts | 8 +- ts/packages/flow/src/cores/service.ts | 101 ++++++++---------- 3 files changed, 78 insertions(+), 61 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 9bd8feb6..accc7943 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -1983,6 +1983,28 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-04: KnowledgeCore HashMap State Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/flow/src/cores/service.ts` now stores knowledge-core and + document-core state in `HashMap` inside the existing `SynchronizedRef`. + - Put/delete/load/get operations now read and update immutable `HashMap` + snapshots with `HashMap.get`, `HashMap.set`, `HashMap.remove`, and + `HashMap.has`. + - Persistence and list-response helpers convert `HashMap` state to + deterministic sorted records/arrays only at the API/JSON boundaries. + - `ts/packages/flow/src/__tests__/knowledge-core-service.test.ts` now reads + service state through `HashMap.get` and `Option`. + - The focused scan for native map state in `cores/service.ts` is clean. +- Verification: + - `cd ts/packages/flow && bunx --bun vitest run src/__tests__/knowledge-core-service.test.ts` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -2003,8 +2025,12 @@ Notes: workspace state are complete: the long-lived config store now uses `HashMap` inside `SynchronizedRef`, and plain records remain only at persistence/API boundaries. - - KnowledgeCore service, FlowManager, and Librarian ref-backed state slices - are complete. Follow-up service work should focus on scoped layers, + - KnowledgeCore service operation dispatch, helper functions, and ref-backed + core state are complete: `kgCores` and `deCores` now use `HashMap` inside + `SynchronizedRef`, and plain records remain only at persistence/API + boundaries. + - FlowManager and Librarian ref-backed state slices are still valid larger + collection targets. Follow-up service work should focus on scoped layers, schedules where polling semantics allow, and managed persistence providers rather than direct mutable service fields. - Flow service startup facades now consistently use `ManagedRuntime`, and diff --git a/ts/packages/flow/src/__tests__/knowledge-core-service.test.ts b/ts/packages/flow/src/__tests__/knowledge-core-service.test.ts index 052f92c9..7f83f5e6 100644 --- a/ts/packages/flow/src/__tests__/knowledge-core-service.test.ts +++ b/ts/packages/flow/src/__tests__/knowledge-core-service.test.ts @@ -1,7 +1,7 @@ import {mkdtemp, rm} from "node:fs/promises"; import {tmpdir} from "node:os"; import {join} from "node:path"; -import {Effect, SynchronizedRef} from "effect"; +import {Effect, HashMap, Option, SynchronizedRef} from "effect"; import {describe, expect, it} from "vitest"; import { topics, @@ -96,7 +96,7 @@ describe("KnowledgeCoreService operations", () => { await service.putKgCore(request, "put-1"); const state = await Effect.runPromise(SynchronizedRef.get(service.state)); - const core = state.kgCores.get("alice:core-a"); + const core = Option.getOrUndefined(HashMap.get(state.kgCores, "alice:core-a")); await service.getKgCore({ operation: "get-kg-core", @@ -166,7 +166,7 @@ describe("KnowledgeCoreService operations", () => { const state = await Effect.runPromise(SynchronizedRef.get(service.state)); await rm(dir, {recursive: true, force: true}); - expect(state.kgCores.get("alice:core-b")?.triples).toHaveLength(2); + expect(Option.getOrUndefined(HashMap.get(state.kgCores, "alice:core-b"))?.triples).toHaveLength(2); }); it("loads the legacy persisted knowledge shape with schema decoding", async () => { @@ -187,6 +187,6 @@ describe("KnowledgeCoreService operations", () => { const state = await Effect.runPromise(SynchronizedRef.get(service.state)); await rm(dir, {recursive: true, force: true}); - expect(state.kgCores.get("alice:legacy")?.triples).toEqual([sampleTriple]); + expect(Option.getOrUndefined(HashMap.get(state.kgCores, "alice:legacy"))?.triples).toEqual([sampleTriple]); }); }); diff --git a/ts/packages/flow/src/cores/service.ts b/ts/packages/flow/src/cores/service.ts index 292d5fb8..ae8f513c 100644 --- a/ts/packages/flow/src/cores/service.ts +++ b/ts/packages/flow/src/cores/service.ts @@ -25,7 +25,7 @@ import { type Message, type ProcessorConfig, } from "@trustgraph/base"; -import {Duration, Effect, Layer, ManagedRuntime, Match, SynchronizedRef} from "effect"; +import {Duration, Effect, HashMap, Layer, ManagedRuntime, Match, SynchronizedRef} from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; import {ensureDirectory, joinPath, readTextFile, writeTextFile} from "../runtime/effect-files.js"; @@ -81,8 +81,8 @@ const knowledgeCoreServiceError = (operation: string, cause: unknown): Knowledge message: errorMessage(cause), }); -type KnowledgeCoreStore = Map; -type DocumentCoreStore = Map>; +type KnowledgeCoreStore = HashMap.HashMap; +type DocumentCoreStore = HashMap.HashMap>; interface KnowledgeCoreServiceState { readonly kgCores: KnowledgeCoreStore; @@ -131,12 +131,15 @@ export interface KnowledgeCoreService extends AsyncProcessorRuntime ({ - kgCores: new Map(), - deCores: new Map>(), + kgCores: HashMap.empty(), + deCores: HashMap.empty>(), consumer: null, responseProducer: null, }); +const getHashMapValue = (store: HashMap.HashMap, key: K): V | undefined => + O.getOrUndefined(HashMap.get(store, key)); + const cloneKnowledgeCore = (core: KnowledgeCore): KnowledgeCore => ({ triples: Array.from(core.triples), graphEmbeddings: core.graphEmbeddings.map((entry) => ({ @@ -145,30 +148,17 @@ const cloneKnowledgeCore = (core: KnowledgeCore): KnowledgeCore => ({ })), }); -const cloneKgStore = (store: KnowledgeCoreStore): KnowledgeCoreStore => { - const next = new Map(); - for (const [key, core] of store) { - next.set(key, cloneKnowledgeCore(core)); - } - return next; -}; - -const cloneDeStore = (store: DocumentCoreStore): DocumentCoreStore => { - const next = new Map>(); - for (const [key, cores] of store) { - next.set(key, Array.from(cores)); - } - return next; -}; +const sortedEntries = (store: HashMap.HashMap): ReadonlyArray => + HashMap.toEntries(store).sort(([left], [right]) => left.localeCompare(right)); const toPersistedSnapshot = (state: KnowledgeCoreServiceState): PersistedKnowledgeSnapshot => { const kg: Record = {}; const de: Record> = {}; - for (const [key, core] of state.kgCores) { + for (const [key, core] of sortedEntries(state.kgCores)) { kg[key] = cloneKnowledgeCore(core); } - for (const [key, core] of state.deCores) { + for (const [key, core] of sortedEntries(state.deCores)) { de[key] = Array.from(core); } @@ -176,9 +166,9 @@ const toPersistedSnapshot = (state: KnowledgeCoreServiceState): PersistedKnowled }; const kgStoreFromRecord = (record: LegacyKnowledgeSnapshot): KnowledgeCoreStore => { - const store = new Map(); + let store = HashMap.empty(); for (const [key, core] of Object.entries(record)) { - store.set(key, cloneKnowledgeCore(core)); + store = HashMap.set(store, key, cloneKnowledgeCore(core)); } return store; }; @@ -186,9 +176,9 @@ const kgStoreFromRecord = (record: LegacyKnowledgeSnapshot): KnowledgeCoreStore const deStoreFromRecord = ( record: Record> | undefined, ): DocumentCoreStore => { - const store = new Map>(); + let store = HashMap.empty>(); for (const [key, core] of Object.entries(record ?? {})) { - store.set(key, Array.from(core)); + store = HashMap.set(store, key, Array.from(core)); } return store; }; @@ -265,7 +255,7 @@ const readPersistedKnowledgeEffect = Effect.fn("KnowledgeCoreService.readPersist if (O.isSome(legacy)) { return { kgCores: kgStoreFromRecord(legacy.value), - deCores: new Map>(), + deCores: HashMap.empty>(), }; } @@ -304,14 +294,14 @@ const persistStateEffect = Effect.fn("KnowledgeCoreService.persistState")( ), ); -const listIds = ( - store: ReadonlyMap, +const listIds = ( + store: HashMap.HashMap, user: string, ): Array => { const prefix = user.length > 0 ? `${user}:` : ""; const ids: Array = []; - for (const key of store.keys()) { + for (const [key] of sortedEntries(store)) { if (prefix.length === 0 || key.startsWith(prefix)) { ids.push(key.slice(prefix.length)); } @@ -413,7 +403,7 @@ const getKgCoreEffect = Effect.fn("getKgCoreEffect")(function* ( requestId: string, ) { const key = coreKey(request.user ?? "", request.id ?? ""); - const core = (yield* SynchronizedRef.get(stateRef)).kgCores.get(key); + const core = getHashMapValue((yield* SynchronizedRef.get(stateRef)).kgCores, key); if (core === undefined) { return yield* knowledgeCoreServiceError("get-kg-core", `Knowledge core not found: ${key}`); } @@ -452,11 +442,10 @@ const deleteKgCoreEffect = Effect.fn("deleteKgCoreEffect")(function* ( requestId: string, ) { const key = coreKey(request.user ?? "", request.id ?? ""); - const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => { - const kgCores = cloneKgStore(state.kgCores); - kgCores.delete(key); - return {...state, kgCores}; - }); + const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => ({ + ...state, + kgCores: HashMap.remove(state.kgCores, key), + })); yield* persistStateEffect(persistPath, next); yield* Effect.log(`[KnowledgeCoreService] Deleted core: ${key}`); @@ -471,8 +460,7 @@ const putKgCoreEffect = Effect.fn("putKgCoreEffect")(function* ( ) { const key = coreKey(request.user ?? "", request.id ?? ""); const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => { - const kgCores = cloneKgStore(state.kgCores); - const existing = kgCores.get(key) ?? {triples: [], graphEmbeddings: []}; + const existing = getHashMapValue(state.kgCores, key) ?? {triples: [], graphEmbeddings: []}; const core: KnowledgeCore = { triples: [ ...existing.triples, @@ -486,11 +474,13 @@ const putKgCoreEffect = Effect.fn("putKgCoreEffect")(function* ( })), ], }; - kgCores.set(key, core); - return {...state, kgCores}; + return { + ...state, + kgCores: HashMap.set(state.kgCores, key, core), + }; }); - const core = next.kgCores.get(key); + const core = getHashMapValue(next.kgCores, key); yield* persistStateEffect(persistPath, next); yield* Effect.log( `[KnowledgeCoreService] Updated core ${key}: triples=${core?.triples.length ?? 0}, embeddings=${core?.graphEmbeddings.length ?? 0}`, @@ -507,7 +497,7 @@ const loadKgCoreEffect = Effect.fn("loadKgCoreEffect")(function* ( const user = request.user ?? ""; const coreId = request.id ?? ""; const key = coreKey(user, coreId); - const core = (yield* SynchronizedRef.get(stateRef)).kgCores.get(key); + const core = getHashMapValue((yield* SynchronizedRef.get(stateRef)).kgCores, key); if (core === undefined) { return yield* knowledgeCoreServiceError("load-kg-core", `Knowledge core not found: ${key}`); } @@ -554,7 +544,7 @@ const getDeCoreEffect = Effect.fn("getDeCoreEffect")(function* ( requestId: string, ) { const key = coreKey(request.user ?? "", request.id ?? ""); - const core = (yield* SynchronizedRef.get(stateRef)).deCores.get(key); + const core = getHashMapValue((yield* SynchronizedRef.get(stateRef)).deCores, key); if (core === undefined) { return yield* knowledgeCoreServiceError("get-de-core", `Document embeddings core not found: ${key}`); } @@ -584,11 +574,10 @@ const deleteDeCoreEffect = Effect.fn("deleteDeCoreEffect")(function* ( requestId: string, ) { const key = coreKey(request.user ?? "", request.id ?? ""); - const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => { - const deCores = cloneDeStore(state.deCores); - deCores.delete(key); - return {...state, deCores}; - }); + const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => ({ + ...state, + deCores: HashMap.remove(state.deCores, key), + })); yield* persistStateEffect(persistPath, next); yield* sendResponse(stateRef, {}, requestId); @@ -606,11 +595,13 @@ const putDeCoreEffect = Effect.fn("putDeCoreEffect")(function* ( } const key = coreKey(request.user ?? "", request.id ?? ""); - const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => { - const deCores = cloneDeStore(state.deCores); - deCores.set(key, [...(deCores.get(key) ?? []), item]); - return {...state, deCores}; - }); + const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => ({ + ...state, + deCores: HashMap.set(state.deCores, key, [ + ...(getHashMapValue(state.deCores, key) ?? []), + item, + ]), + })); yield* persistStateEffect(persistPath, next); yield* sendResponse(stateRef, {}, requestId); @@ -622,7 +613,7 @@ const loadDeCoreEffect = Effect.fn("loadDeCoreEffect")(function* ( requestId: string, ) { const key = coreKey(request.user ?? "", request.id ?? ""); - const exists = (yield* SynchronizedRef.get(stateRef)).deCores.has(key); + const exists = HashMap.has((yield* SynchronizedRef.get(stateRef)).deCores, key); if (!exists) { return yield* knowledgeCoreServiceError("load-de-core", `Document embeddings core not found: ${key}`); } @@ -705,7 +696,7 @@ export function makeKnowledgeCoreService(config: KnowledgeCoreServiceConfig): Kn deCores: loaded.deCores, })); - yield* Effect.log(`[KnowledgeCoreService] Loaded persisted state (kg=${next.kgCores.size}, de=${next.deCores.size})`); + yield* Effect.log(`[KnowledgeCoreService] Loaded persisted state (kg=${HashMap.size(next.kgCores)}, de=${HashMap.size(next.deCores)})`); }); service = Object.assign(base, {