Use HashMap for knowledge core state

This commit is contained in:
elpresidank 2026-06-04 06:36:30 -05:00
parent 451c6dbc58
commit 749f75715d
3 changed files with 78 additions and 61 deletions

View file

@ -1983,6 +1983,28 @@ Notes:
- `cd ts && bun run lint` - `cd ts && bun run lint`
- `git diff --check` - `git diff --check`
### 2026-06-04: KnowledgeCore HashMap State Slice
- Status: migrated and package-verified.
- Completed:
- `ts/packages/flow/src/cores/service.ts` now stores knowledge-core and
document-core state in `HashMap` inside the existing `SynchronizedRef`.
- Put/delete/load/get operations now read and update immutable `HashMap`
snapshots with `HashMap.get`, `HashMap.set`, `HashMap.remove`, and
`HashMap.has`.
- Persistence and list-response helpers convert `HashMap` state to
deterministic sorted records/arrays only at the API/JSON boundaries.
- `ts/packages/flow/src/__tests__/knowledge-core-service.test.ts` now reads
service state through `HashMap.get` and `Option`.
- The focused scan for native map state in `cores/service.ts` is clean.
- Verification:
- `cd ts/packages/flow && bunx --bun vitest run src/__tests__/knowledge-core-service.test.ts`
- `cd ts && bun run check:tsgo`
- `cd ts && bun run build`
- `cd ts && bun run test`
- `cd ts && bun run lint`
- `git diff --check`
## Subagent Findings To Preserve ## Subagent Findings To Preserve
- MCP/workbench: - MCP/workbench:
@ -2003,8 +2025,12 @@ Notes:
workspace state are complete: the long-lived config store now uses workspace state are complete: the long-lived config store now uses
`HashMap` inside `SynchronizedRef`, and plain records remain only at `HashMap` inside `SynchronizedRef`, and plain records remain only at
persistence/API boundaries. persistence/API boundaries.
- KnowledgeCore service, FlowManager, and Librarian ref-backed state slices - KnowledgeCore service operation dispatch, helper functions, and ref-backed
are complete. Follow-up service work should focus on scoped layers, core state are complete: `kgCores` and `deCores` now use `HashMap` inside
`SynchronizedRef`, and plain records remain only at persistence/API
boundaries.
- FlowManager and Librarian ref-backed state slices are still valid larger
collection targets. Follow-up service work should focus on scoped layers,
schedules where polling semantics allow, and managed persistence providers schedules where polling semantics allow, and managed persistence providers
rather than direct mutable service fields. rather than direct mutable service fields.
- Flow service startup facades now consistently use `ManagedRuntime`, and - Flow service startup facades now consistently use `ManagedRuntime`, and

View file

@ -1,7 +1,7 @@
import {mkdtemp, rm} from "node:fs/promises"; import {mkdtemp, rm} from "node:fs/promises";
import {tmpdir} from "node:os"; import {tmpdir} from "node:os";
import {join} from "node:path"; import {join} from "node:path";
import {Effect, SynchronizedRef} from "effect"; import {Effect, HashMap, Option, SynchronizedRef} from "effect";
import {describe, expect, it} from "vitest"; import {describe, expect, it} from "vitest";
import { import {
topics, topics,
@ -96,7 +96,7 @@ describe("KnowledgeCoreService operations", () => {
await service.putKgCore(request, "put-1"); await service.putKgCore(request, "put-1");
const state = await Effect.runPromise(SynchronizedRef.get(service.state)); const state = await Effect.runPromise(SynchronizedRef.get(service.state));
const core = state.kgCores.get("alice:core-a"); const core = Option.getOrUndefined(HashMap.get(state.kgCores, "alice:core-a"));
await service.getKgCore({ await service.getKgCore({
operation: "get-kg-core", operation: "get-kg-core",
@ -166,7 +166,7 @@ describe("KnowledgeCoreService operations", () => {
const state = await Effect.runPromise(SynchronizedRef.get(service.state)); const state = await Effect.runPromise(SynchronizedRef.get(service.state));
await rm(dir, {recursive: true, force: true}); await rm(dir, {recursive: true, force: true});
expect(state.kgCores.get("alice:core-b")?.triples).toHaveLength(2); expect(Option.getOrUndefined(HashMap.get(state.kgCores, "alice:core-b"))?.triples).toHaveLength(2);
}); });
it("loads the legacy persisted knowledge shape with schema decoding", async () => { it("loads the legacy persisted knowledge shape with schema decoding", async () => {
@ -187,6 +187,6 @@ describe("KnowledgeCoreService operations", () => {
const state = await Effect.runPromise(SynchronizedRef.get(service.state)); const state = await Effect.runPromise(SynchronizedRef.get(service.state));
await rm(dir, {recursive: true, force: true}); await rm(dir, {recursive: true, force: true});
expect(state.kgCores.get("alice:legacy")?.triples).toEqual([sampleTriple]); expect(Option.getOrUndefined(HashMap.get(state.kgCores, "alice:legacy"))?.triples).toEqual([sampleTriple]);
}); });
}); });

View file

@ -25,7 +25,7 @@ import {
type Message, type Message,
type ProcessorConfig, type ProcessorConfig,
} from "@trustgraph/base"; } from "@trustgraph/base";
import {Duration, Effect, Layer, ManagedRuntime, Match, SynchronizedRef} from "effect"; import {Duration, Effect, HashMap, Layer, ManagedRuntime, Match, SynchronizedRef} from "effect";
import * as O from "effect/Option"; import * as O from "effect/Option";
import * as S from "effect/Schema"; import * as S from "effect/Schema";
import {ensureDirectory, joinPath, readTextFile, writeTextFile} from "../runtime/effect-files.js"; import {ensureDirectory, joinPath, readTextFile, writeTextFile} from "../runtime/effect-files.js";
@ -81,8 +81,8 @@ const knowledgeCoreServiceError = (operation: string, cause: unknown): Knowledge
message: errorMessage(cause), message: errorMessage(cause),
}); });
type KnowledgeCoreStore = Map<string, KnowledgeCore>; type KnowledgeCoreStore = HashMap.HashMap<string, KnowledgeCore>;
type DocumentCoreStore = Map<string, Array<DocumentEmbeddingsCore>>; type DocumentCoreStore = HashMap.HashMap<string, Array<DocumentEmbeddingsCore>>;
interface KnowledgeCoreServiceState { interface KnowledgeCoreServiceState {
readonly kgCores: KnowledgeCoreStore; readonly kgCores: KnowledgeCoreStore;
@ -131,12 +131,15 @@ export interface KnowledgeCoreService extends AsyncProcessorRuntime<KnowledgeCor
} }
const initialState = (): KnowledgeCoreServiceState => ({ const initialState = (): KnowledgeCoreServiceState => ({
kgCores: new Map<string, KnowledgeCore>(), kgCores: HashMap.empty<string, KnowledgeCore>(),
deCores: new Map<string, Array<DocumentEmbeddingsCore>>(), deCores: HashMap.empty<string, Array<DocumentEmbeddingsCore>>(),
consumer: null, consumer: null,
responseProducer: null, responseProducer: null,
}); });
const getHashMapValue = <K, V>(store: HashMap.HashMap<K, V>, key: K): V | undefined =>
O.getOrUndefined(HashMap.get(store, key));
const cloneKnowledgeCore = (core: KnowledgeCore): KnowledgeCore => ({ const cloneKnowledgeCore = (core: KnowledgeCore): KnowledgeCore => ({
triples: Array.from(core.triples), triples: Array.from(core.triples),
graphEmbeddings: core.graphEmbeddings.map((entry) => ({ graphEmbeddings: core.graphEmbeddings.map((entry) => ({
@ -145,30 +148,17 @@ const cloneKnowledgeCore = (core: KnowledgeCore): KnowledgeCore => ({
})), })),
}); });
const cloneKgStore = (store: KnowledgeCoreStore): KnowledgeCoreStore => { const sortedEntries = <A>(store: HashMap.HashMap<string, A>): ReadonlyArray<readonly [string, A]> =>
const next = new Map<string, KnowledgeCore>(); HashMap.toEntries(store).sort(([left], [right]) => left.localeCompare(right));
for (const [key, core] of store) {
next.set(key, cloneKnowledgeCore(core));
}
return next;
};
const cloneDeStore = (store: DocumentCoreStore): DocumentCoreStore => {
const next = new Map<string, Array<DocumentEmbeddingsCore>>();
for (const [key, cores] of store) {
next.set(key, Array.from(cores));
}
return next;
};
const toPersistedSnapshot = (state: KnowledgeCoreServiceState): PersistedKnowledgeSnapshot => { const toPersistedSnapshot = (state: KnowledgeCoreServiceState): PersistedKnowledgeSnapshot => {
const kg: Record<string, KnowledgeCore> = {}; const kg: Record<string, KnowledgeCore> = {};
const de: Record<string, Array<DocumentEmbeddingsCore>> = {}; const de: Record<string, Array<DocumentEmbeddingsCore>> = {};
for (const [key, core] of state.kgCores) { for (const [key, core] of sortedEntries(state.kgCores)) {
kg[key] = cloneKnowledgeCore(core); kg[key] = cloneKnowledgeCore(core);
} }
for (const [key, core] of state.deCores) { for (const [key, core] of sortedEntries(state.deCores)) {
de[key] = Array.from(core); de[key] = Array.from(core);
} }
@ -176,9 +166,9 @@ const toPersistedSnapshot = (state: KnowledgeCoreServiceState): PersistedKnowled
}; };
const kgStoreFromRecord = (record: LegacyKnowledgeSnapshot): KnowledgeCoreStore => { const kgStoreFromRecord = (record: LegacyKnowledgeSnapshot): KnowledgeCoreStore => {
const store = new Map<string, KnowledgeCore>(); let store = HashMap.empty<string, KnowledgeCore>();
for (const [key, core] of Object.entries(record)) { for (const [key, core] of Object.entries(record)) {
store.set(key, cloneKnowledgeCore(core)); store = HashMap.set(store, key, cloneKnowledgeCore(core));
} }
return store; return store;
}; };
@ -186,9 +176,9 @@ const kgStoreFromRecord = (record: LegacyKnowledgeSnapshot): KnowledgeCoreStore
const deStoreFromRecord = ( const deStoreFromRecord = (
record: Record<string, Array<DocumentEmbeddingsCore>> | undefined, record: Record<string, Array<DocumentEmbeddingsCore>> | undefined,
): DocumentCoreStore => { ): DocumentCoreStore => {
const store = new Map<string, Array<DocumentEmbeddingsCore>>(); let store = HashMap.empty<string, Array<DocumentEmbeddingsCore>>();
for (const [key, core] of Object.entries(record ?? {})) { for (const [key, core] of Object.entries(record ?? {})) {
store.set(key, Array.from(core)); store = HashMap.set(store, key, Array.from(core));
} }
return store; return store;
}; };
@ -265,7 +255,7 @@ const readPersistedKnowledgeEffect = Effect.fn("KnowledgeCoreService.readPersist
if (O.isSome(legacy)) { if (O.isSome(legacy)) {
return { return {
kgCores: kgStoreFromRecord(legacy.value), kgCores: kgStoreFromRecord(legacy.value),
deCores: new Map<string, Array<DocumentEmbeddingsCore>>(), deCores: HashMap.empty<string, Array<DocumentEmbeddingsCore>>(),
}; };
} }
@ -304,14 +294,14 @@ const persistStateEffect = Effect.fn("KnowledgeCoreService.persistState")(
), ),
); );
const listIds = ( const listIds = <A>(
store: ReadonlyMap<string, unknown>, store: HashMap.HashMap<string, A>,
user: string, user: string,
): Array<string> => { ): Array<string> => {
const prefix = user.length > 0 ? `${user}:` : ""; const prefix = user.length > 0 ? `${user}:` : "";
const ids: Array<string> = []; const ids: Array<string> = [];
for (const key of store.keys()) { for (const [key] of sortedEntries(store)) {
if (prefix.length === 0 || key.startsWith(prefix)) { if (prefix.length === 0 || key.startsWith(prefix)) {
ids.push(key.slice(prefix.length)); ids.push(key.slice(prefix.length));
} }
@ -413,7 +403,7 @@ const getKgCoreEffect = Effect.fn("getKgCoreEffect")(function* (
requestId: string, requestId: string,
) { ) {
const key = coreKey(request.user ?? "", request.id ?? ""); const key = coreKey(request.user ?? "", request.id ?? "");
const core = (yield* SynchronizedRef.get(stateRef)).kgCores.get(key); const core = getHashMapValue((yield* SynchronizedRef.get(stateRef)).kgCores, key);
if (core === undefined) { if (core === undefined) {
return yield* knowledgeCoreServiceError("get-kg-core", `Knowledge core not found: ${key}`); return yield* knowledgeCoreServiceError("get-kg-core", `Knowledge core not found: ${key}`);
} }
@ -452,11 +442,10 @@ const deleteKgCoreEffect = Effect.fn("deleteKgCoreEffect")(function* (
requestId: string, requestId: string,
) { ) {
const key = coreKey(request.user ?? "", request.id ?? ""); const key = coreKey(request.user ?? "", request.id ?? "");
const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => { const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => ({
const kgCores = cloneKgStore(state.kgCores); ...state,
kgCores.delete(key); kgCores: HashMap.remove(state.kgCores, key),
return {...state, kgCores}; }));
});
yield* persistStateEffect(persistPath, next); yield* persistStateEffect(persistPath, next);
yield* Effect.log(`[KnowledgeCoreService] Deleted core: ${key}`); yield* Effect.log(`[KnowledgeCoreService] Deleted core: ${key}`);
@ -471,8 +460,7 @@ const putKgCoreEffect = Effect.fn("putKgCoreEffect")(function* (
) { ) {
const key = coreKey(request.user ?? "", request.id ?? ""); const key = coreKey(request.user ?? "", request.id ?? "");
const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => { const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => {
const kgCores = cloneKgStore(state.kgCores); const existing = getHashMapValue(state.kgCores, key) ?? {triples: [], graphEmbeddings: []};
const existing = kgCores.get(key) ?? {triples: [], graphEmbeddings: []};
const core: KnowledgeCore = { const core: KnowledgeCore = {
triples: [ triples: [
...existing.triples, ...existing.triples,
@ -486,11 +474,13 @@ const putKgCoreEffect = Effect.fn("putKgCoreEffect")(function* (
})), })),
], ],
}; };
kgCores.set(key, core); return {
return {...state, kgCores}; ...state,
kgCores: HashMap.set(state.kgCores, key, core),
};
}); });
const core = next.kgCores.get(key); const core = getHashMapValue(next.kgCores, key);
yield* persistStateEffect(persistPath, next); yield* persistStateEffect(persistPath, next);
yield* Effect.log( yield* Effect.log(
`[KnowledgeCoreService] Updated core ${key}: triples=${core?.triples.length ?? 0}, embeddings=${core?.graphEmbeddings.length ?? 0}`, `[KnowledgeCoreService] Updated core ${key}: triples=${core?.triples.length ?? 0}, embeddings=${core?.graphEmbeddings.length ?? 0}`,
@ -507,7 +497,7 @@ const loadKgCoreEffect = Effect.fn("loadKgCoreEffect")(function* (
const user = request.user ?? ""; const user = request.user ?? "";
const coreId = request.id ?? ""; const coreId = request.id ?? "";
const key = coreKey(user, coreId); const key = coreKey(user, coreId);
const core = (yield* SynchronizedRef.get(stateRef)).kgCores.get(key); const core = getHashMapValue((yield* SynchronizedRef.get(stateRef)).kgCores, key);
if (core === undefined) { if (core === undefined) {
return yield* knowledgeCoreServiceError("load-kg-core", `Knowledge core not found: ${key}`); return yield* knowledgeCoreServiceError("load-kg-core", `Knowledge core not found: ${key}`);
} }
@ -554,7 +544,7 @@ const getDeCoreEffect = Effect.fn("getDeCoreEffect")(function* (
requestId: string, requestId: string,
) { ) {
const key = coreKey(request.user ?? "", request.id ?? ""); const key = coreKey(request.user ?? "", request.id ?? "");
const core = (yield* SynchronizedRef.get(stateRef)).deCores.get(key); const core = getHashMapValue((yield* SynchronizedRef.get(stateRef)).deCores, key);
if (core === undefined) { if (core === undefined) {
return yield* knowledgeCoreServiceError("get-de-core", `Document embeddings core not found: ${key}`); return yield* knowledgeCoreServiceError("get-de-core", `Document embeddings core not found: ${key}`);
} }
@ -584,11 +574,10 @@ const deleteDeCoreEffect = Effect.fn("deleteDeCoreEffect")(function* (
requestId: string, requestId: string,
) { ) {
const key = coreKey(request.user ?? "", request.id ?? ""); const key = coreKey(request.user ?? "", request.id ?? "");
const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => { const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => ({
const deCores = cloneDeStore(state.deCores); ...state,
deCores.delete(key); deCores: HashMap.remove(state.deCores, key),
return {...state, deCores}; }));
});
yield* persistStateEffect(persistPath, next); yield* persistStateEffect(persistPath, next);
yield* sendResponse(stateRef, {}, requestId); yield* sendResponse(stateRef, {}, requestId);
@ -606,11 +595,13 @@ const putDeCoreEffect = Effect.fn("putDeCoreEffect")(function* (
} }
const key = coreKey(request.user ?? "", request.id ?? ""); const key = coreKey(request.user ?? "", request.id ?? "");
const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => { const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => ({
const deCores = cloneDeStore(state.deCores); ...state,
deCores.set(key, [...(deCores.get(key) ?? []), item]); deCores: HashMap.set(state.deCores, key, [
return {...state, deCores}; ...(getHashMapValue(state.deCores, key) ?? []),
}); item,
]),
}));
yield* persistStateEffect(persistPath, next); yield* persistStateEffect(persistPath, next);
yield* sendResponse(stateRef, {}, requestId); yield* sendResponse(stateRef, {}, requestId);
@ -622,7 +613,7 @@ const loadDeCoreEffect = Effect.fn("loadDeCoreEffect")(function* (
requestId: string, requestId: string,
) { ) {
const key = coreKey(request.user ?? "", request.id ?? ""); const key = coreKey(request.user ?? "", request.id ?? "");
const exists = (yield* SynchronizedRef.get(stateRef)).deCores.has(key); const exists = HashMap.has((yield* SynchronizedRef.get(stateRef)).deCores, key);
if (!exists) { if (!exists) {
return yield* knowledgeCoreServiceError("load-de-core", `Document embeddings core not found: ${key}`); return yield* knowledgeCoreServiceError("load-de-core", `Document embeddings core not found: ${key}`);
} }
@ -705,7 +696,7 @@ export function makeKnowledgeCoreService(config: KnowledgeCoreServiceConfig): Kn
deCores: loaded.deCores, deCores: loaded.deCores,
})); }));
yield* Effect.log(`[KnowledgeCoreService] Loaded persisted state (kg=${next.kgCores.size}, de=${next.deCores.size})`); yield* Effect.log(`[KnowledgeCoreService] Loaded persisted state (kg=${HashMap.size(next.kgCores)}, de=${HashMap.size(next.deCores)})`);
}); });
service = Object.assign(base, { service = Object.assign(base, {