From c4500f216eaa4fe7741ba3a2ecc49094a9ffd2a4 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Thu, 4 Jun 2026 08:02:20 -0500 Subject: [PATCH] Use MutableHashMap for librarian state --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 32 +++++- ts/packages/flow/src/librarian/service.ts | 128 ++++++++++++---------- 2 files changed, 101 insertions(+), 59 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index b1c8302f..93dd895c 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -2308,6 +2308,29 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-04: Librarian Service State MutableHashMap Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/flow/src/librarian/service.ts` now stores live documents, + processing records, uploads, and per-upload chunks in `MutableHashMap` + instead of native `Map`. + - State updates continue to clone the current collection before swapping it + into `SynchronizedRef`, but now use `MutableHashMap.fromIterable`, + `Option`-based lookups, and `MutableHashMap.set` / `remove` / `size` / + `keys` / `values` for service-state behavior. + - Persistence stays at the JSON boundary through `Object.fromEntries` over + the Effect collection iterable; persisted load repopulates + `MutableHashMap` state. +- Verification: + - `cd ts && bun run --cwd packages/flow build` + - `cd ts/packages/flow && bunx --bun vitest run src/__tests__/librarian-service.test.ts src/__tests__/collection-manager.test.ts` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -2493,10 +2516,11 @@ Notes: compatibility facades, gateway/librarian helpers, and CLI command actions. The workbench random id helper is complete; the remaining workbench `Effect.gen` match is a local one-shot command effect value. - - Remaining real long-lived native collection target is Librarian service - state. Base processor registries, the standalone Librarian collection - manager, prompt template cache, and workbench explain triples module cache - are complete. Local traversal sets and test fakes remain no-op boundaries. + - Fresh long-lived native collection targets from the scratch inventory are + complete: Librarian service state, base processor registries, the + standalone Librarian collection manager, prompt template cache, and + workbench explain triples module cache. Local traversal sets and test fakes + remain no-op boundaries. ## Ranked Findings diff --git a/ts/packages/flow/src/librarian/service.ts b/ts/packages/flow/src/librarian/service.ts index 000ec3ad..7d3a0e6a 100644 --- a/ts/packages/flow/src/librarian/service.ts +++ b/ts/packages/flow/src/librarian/service.ts @@ -32,6 +32,7 @@ import { import type { Message } from "@trustgraph/base"; import { NodeRuntime } from "@effect/platform-node"; import { Clock, Config, DateTime, Duration, Effect, Layer, ManagedRuntime, Match, Option, Random, SynchronizedRef } from "effect"; +import * as MutableHashMap from "effect/MutableHashMap"; import * as S from "effect/Schema"; import { makeCollectionManager, type CollectionManager } from "./collection-manager.js"; import { @@ -55,7 +56,7 @@ interface UploadSession { chunkSize: number; totalChunks: number; createdAt: string; - chunks: Map; + chunks: MutableHashMap.MutableHashMap; user: string; } @@ -185,9 +186,9 @@ export interface LibrarianService extends AsyncProcessorRuntime; - readonly processing: Map; - readonly uploads: Map; + readonly documents: MutableHashMap.MutableHashMap; + readonly processing: MutableHashMap.MutableHashMap; + readonly uploads: MutableHashMap.MutableHashMap; readonly collectionManager: CollectionManager; readonly libConsumer: BackendConsumer | null; readonly libProducer: BackendProducer | null; @@ -195,18 +196,24 @@ interface LibrarianServiceState { readonly colProducer: BackendProducer | null; } -const cloneDocuments = (source: Map): Map => - new Map(source); +const cloneDocuments = ( + source: MutableHashMap.MutableHashMap, +): MutableHashMap.MutableHashMap => + MutableHashMap.fromIterable(source); -const cloneProcessing = (source: Map): Map => - new Map(source); +const cloneProcessing = ( + source: MutableHashMap.MutableHashMap, +): MutableHashMap.MutableHashMap => + MutableHashMap.fromIterable(source); -const cloneUploads = (source: Map): Map => - new Map(source); +const cloneUploads = ( + source: MutableHashMap.MutableHashMap, +): MutableHashMap.MutableHashMap => + MutableHashMap.fromIterable(source); const cloneUploadSession = (session: UploadSession): UploadSession => ({ ...session, - chunks: new Map(session.chunks), + chunks: MutableHashMap.fromIterable(session.chunks), }); const cloneCollectionManager = (source: CollectionManager): CollectionManager => { @@ -216,9 +223,9 @@ const cloneCollectionManager = (source: CollectionManager): CollectionManager => }; const initialState = (): LibrarianServiceState => ({ - documents: new Map(), - processing: new Map(), - uploads: new Map(), + documents: MutableHashMap.empty(), + processing: MutableHashMap.empty(), + uploads: MutableHashMap.empty(), collectionManager: makeCollectionManager(), libConsumer: null, libProducer: null, @@ -252,7 +259,7 @@ const modifyResult = ( ): readonly [Value, LibrarianServiceState] => [value, state]; const uploadBytesReceived = (session: UploadSession): number => - [...session.chunks.values()].reduce((sum, chunk) => sum + chunk.length, 0); + Array.from(MutableHashMap.values(session.chunks)).reduce((sum, chunk) => sum + chunk.length, 0); const consumeOnceEffect = Effect.fnUntraced(function* ( service: LibrarianService, @@ -386,7 +393,9 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS return yield* librarianServiceError("get-document-metadata", "get-document-metadata requires documentId"); } - const doc = (yield* SynchronizedRef.get(current.state)).documents.get(id); + const doc = Option.getOrUndefined( + MutableHashMap.get((yield* SynchronizedRef.get(current.state)).documents, id), + ); if (doc === undefined) { return yield* librarianServiceError("get-document-metadata", `Document not found: ${id}`); } @@ -405,7 +414,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const children: DocumentMetadata[] = []; const currentState = yield* SynchronizedRef.get(current.state); - for (const doc of currentState.documents.values()) { + for (const doc of MutableHashMap.values(currentState.documents)) { if (doc.parentId === parentId) { children.push(doc); } @@ -430,7 +439,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS } return yield* SynchronizedRef.modifyEffect(current.state, (serviceState) => { - const currentSession = serviceState.uploads.get(uploadId); + const currentSession = Option.getOrUndefined(MutableHashMap.get(serviceState.uploads, uploadId)); if (currentSession === undefined) { return Effect.fail(librarianServiceError("upload-chunk", `Upload not found: ${uploadId}`)); } @@ -439,14 +448,14 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS } const session = cloneUploadSession(currentSession); - session.chunks.set(chunkIndex, content); + MutableHashMap.set(session.chunks, chunkIndex, content); const uploads = cloneUploads(serviceState.uploads); - uploads.set(uploadId, session); + MutableHashMap.set(uploads, uploadId, session); return Effect.succeed(modifyResult({ "upload-id": uploadId, "chunk-index": chunkIndex, - "chunks-received": session.chunks.size, + "chunks-received": MutableHashMap.size(session.chunks), "total-chunks": session.totalChunks, "bytes-received": uploadBytesReceived(session), "total-bytes": session.totalSize, @@ -465,17 +474,19 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS if (uploadId === undefined) { return yield* librarianServiceError("get-upload-status", "get-upload-status requires upload-id"); } - const session = (yield* SynchronizedRef.get(current.state)).uploads.get(uploadId); + const session = Option.getOrUndefined( + MutableHashMap.get((yield* SynchronizedRef.get(current.state)).uploads, uploadId), + ); if (session === undefined) { return yield* librarianServiceError("get-upload-status", `Upload not found: ${uploadId}`); } - const receivedChunks = [...session.chunks.keys()].sort((a, b) => a - b); + const receivedChunks = Array.from(MutableHashMap.keys(session.chunks)).sort((a, b) => a - b); const receivedSet = new Set(receivedChunks); const missingChunks = Array.from({ length: session.totalChunks }, (_, i) => i).filter((i) => !receivedSet.has(i)); return { "upload-id": uploadId, "upload-state": "in-progress", - "chunks-received": session.chunks.size, + "chunks-received": MutableHashMap.size(session.chunks), "total-chunks": session.totalChunks, "received-chunks": receivedChunks, "missing-chunks": missingChunks, @@ -493,11 +504,11 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS return yield* librarianServiceError("abort-upload", "abort-upload requires upload-id"); } return yield* SynchronizedRef.modifyEffect(current.state, (serviceState) => { - if (!serviceState.uploads.has(uploadId)) { + if (!MutableHashMap.has(serviceState.uploads, uploadId)) { return Effect.fail(librarianServiceError("abort-upload", `Upload not found: ${uploadId}`)); } const uploads = cloneUploads(serviceState.uploads); - uploads.delete(uploadId); + MutableHashMap.remove(uploads, uploadId); return Effect.succeed(modifyResult({}, { ...serviceState, uploads, @@ -834,7 +845,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS yield* SynchronizedRef.update(service.state, (serviceState) => { const documents = cloneDocuments(serviceState.documents); - documents.set(id, doc); + MutableHashMap.set(documents, id, doc); return { ...serviceState, documents, @@ -875,22 +886,22 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS } const removal = yield* SynchronizedRef.modifyEffect(service.state, (serviceState) => { - const childIds = [...serviceState.documents.entries()] + const childIds = Array.from(serviceState.documents) .filter(([, doc]) => doc.parentId === id) .map(([childId]) => childId); - const procIds = [...serviceState.processing.entries()] + const procIds = Array.from(serviceState.processing) .filter(([, proc]) => proc.documentId === id) .map(([procId]) => procId); const documents = cloneDocuments(serviceState.documents); - documents.delete(id); + MutableHashMap.remove(documents, id); for (const childId of childIds) { - documents.delete(childId); + MutableHashMap.remove(documents, childId); } const processing = cloneProcessing(serviceState.processing); for (const procId of procIds) { - processing.delete(procId); + MutableHashMap.remove(processing, procId); } return Effect.succeed(modifyResult({ childIds, procIds }, { @@ -943,7 +954,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS if (meta === undefined) return yield* librarianServiceError("update-document", "update-document requires documentMetadata"); const doc = yield* SynchronizedRef.modifyEffect(service.state, (serviceState) => { - const existing = serviceState.documents.get(id); + const existing = Option.getOrUndefined(MutableHashMap.get(serviceState.documents, id)); if (existing === undefined) { return Effect.fail(librarianServiceError("update-document", `Document not found: ${id}`)); } @@ -954,7 +965,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS time: meta.time ?? existing.time, }); const documents = cloneDocuments(serviceState.documents); - documents.set(id, next); + MutableHashMap.set(documents, id, next); return Effect.succeed(modifyResult(next, { ...serviceState, documents, @@ -978,7 +989,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const docs: DocumentMetadata[] = []; const serviceState = this.state.pipe(stateSnapshot); - for (const doc of serviceState.documents.values()) { + for (const doc of MutableHashMap.values(serviceState.documents)) { // Filter by user if (user.length > 0 && doc.user !== user) continue; // Exclude children (only top-level documents) unless explicitly requested @@ -1008,7 +1019,9 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS return yield* librarianServiceError("get-document-content", "get-document-content requires documentId"); } - const doc = (yield* SynchronizedRef.get(service.state)).documents.get(id); + const doc = Option.getOrUndefined( + MutableHashMap.get((yield* SynchronizedRef.get(service.state)).documents, id), + ); if (doc === undefined) return yield* librarianServiceError("get-document-content", `Document not found: ${id}`); const filePath = joinPath(service.dataDir, "docs", `${id}.bin`); @@ -1051,11 +1064,11 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS }; yield* SynchronizedRef.modifyEffect(service.state, (serviceState) => { - if (Boolean(serviceState.documents.has(parentId)) === false) { + if (Boolean(MutableHashMap.has(serviceState.documents, parentId)) === false) { return Effect.fail(librarianServiceError("add-child-document", `Parent document not found: ${parentId}`)); } const documents = cloneDocuments(serviceState.documents); - documents.set(id, doc); + MutableHashMap.set(documents, id, doc); return Effect.succeed(modifyResult(undefined, { ...serviceState, documents, @@ -1114,7 +1127,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS yield* SynchronizedRef.update(service.state, (serviceState) => { const processing = cloneProcessing(serviceState.processing); - processing.set(id, record); + MutableHashMap.set(processing, id, record); return { ...serviceState, processing, @@ -1145,7 +1158,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS yield* SynchronizedRef.update(service.state, (serviceState) => { const processing = cloneProcessing(serviceState.processing); - processing.delete(id); + MutableHashMap.remove(processing, id); return { ...serviceState, processing, @@ -1169,7 +1182,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const records: ProcessingMetadata[] = []; const serviceState = this.state.pipe(stateSnapshot); - for (const proc of serviceState.processing.values()) { + for (const proc of MutableHashMap.values(serviceState.processing)) { const procDocumentId = proc.documentId ?? proc["document-id"]; if (documentId !== undefined && documentId.length > 0 && procDocumentId !== documentId) { continue; @@ -1209,13 +1222,13 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS chunkSize, totalChunks, createdAt, - chunks: new Map(), + chunks: MutableHashMap.empty(), user: meta.user ?? optionalString(req.user) ?? "default", }; yield* SynchronizedRef.update(service.state, (serviceState) => { const uploads = cloneUploads(serviceState.uploads); - uploads.set(uploadId, session); + MutableHashMap.set(uploads, uploadId, session); return { ...serviceState, uploads, @@ -1247,13 +1260,18 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS Effect.gen(function* () { const uploadId = optionalString(service.requestRecord(request)["upload-id"]); if (uploadId === undefined) return yield* librarianServiceError("complete-upload", "complete-upload requires upload-id"); - const session = (yield* SynchronizedRef.get(service.state)).uploads.get(uploadId); + const session = Option.getOrUndefined( + MutableHashMap.get((yield* SynchronizedRef.get(service.state)).uploads, uploadId), + ); if (session === undefined) return yield* librarianServiceError("complete-upload", `Upload not found: ${uploadId}`); - if (session.chunks.size !== session.totalChunks) { - return yield* librarianServiceError("complete-upload", `Upload incomplete: ${session.chunks.size}/${session.totalChunks} chunks received`); + const chunksReceived = MutableHashMap.size(session.chunks); + if (chunksReceived !== session.totalChunks) { + return yield* librarianServiceError("complete-upload", `Upload incomplete: ${chunksReceived}/${session.totalChunks} chunks received`); } - const content = Array.from({ length: session.totalChunks }, (_, i) => session.chunks.get(i) ?? "").join(""); + const content = Array.from({ length: session.totalChunks }, (_, i) => + Option.getOrUndefined(MutableHashMap.get(session.chunks, i)) ?? "" + ).join(""); const response = yield* Effect.tryPromise({ try: () => service.addDocument({ operation: "add-document", @@ -1266,7 +1284,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS }); yield* SynchronizedRef.update(service.state, (serviceState) => { const uploads = cloneUploads(serviceState.uploads); - uploads.delete(uploadId); + MutableHashMap.remove(uploads, uploadId); return { ...serviceState, uploads, @@ -1306,7 +1324,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const user = optionalString(service.requestRecord(request).user); const sessions = []; const serviceState = yield* SynchronizedRef.get(service.state); - for (const session of serviceState.uploads.values()) { + for (const session of MutableHashMap.values(serviceState.uploads)) { if (user !== undefined && session.user !== user) continue; const documentMetadataJson = yield* encodeJsonString( "list-uploads-document-metadata", @@ -1319,7 +1337,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS "total-size": session.totalSize, "chunk-size": session.chunkSize, "total-chunks": session.totalChunks, - "chunks-received": session.chunks.size, + "chunks-received": MutableHashMap.size(session.chunks), "created-at": session.createdAt, }); } @@ -1528,17 +1546,17 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS if (parsed === null) return; - const documents = new Map(); + const documents = MutableHashMap.empty(); if (parsed.documents !== undefined) { for (const [id, doc] of Object.entries(parsed.documents)) { - documents.set(id, service.publicDocument(doc)); + MutableHashMap.set(documents, id, service.publicDocument(doc)); } } - const processing = new Map(); + const processing = MutableHashMap.empty(); if (parsed.processing !== undefined) { for (const [id, proc] of Object.entries(parsed.processing)) { - processing.set(id, service.publicProcessing(proc)); + MutableHashMap.set(processing, id, service.publicProcessing(proc)); } } @@ -1555,7 +1573,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS })); yield* Effect.log( - `[LibrarianService] Loaded persisted state (documents=${documents.size}, processing=${processing.size})`, + `[LibrarianService] Loaded persisted state (documents=${MutableHashMap.size(documents)}, processing=${MutableHashMap.size(processing)})`, ); }), );