Use MutableHashMap for librarian state

This commit is contained in:
elpresidank 2026-06-04 08:02:20 -05:00
parent 4ffa84dbe7
commit c4500f216e
2 changed files with 101 additions and 59 deletions

View file

@ -32,6 +32,7 @@ import {
import type { Message } from "@trustgraph/base";
import { NodeRuntime } from "@effect/platform-node";
import { Clock, Config, DateTime, Duration, Effect, Layer, ManagedRuntime, Match, Option, Random, SynchronizedRef } from "effect";
import * as MutableHashMap from "effect/MutableHashMap";
import * as S from "effect/Schema";
import { makeCollectionManager, type CollectionManager } from "./collection-manager.js";
import {
@ -55,7 +56,7 @@ interface UploadSession {
chunkSize: number;
totalChunks: number;
createdAt: string;
chunks: Map<number, string>;
chunks: MutableHashMap.MutableHashMap<number, string>;
user: string;
}
@ -185,9 +186,9 @@ export interface LibrarianService extends AsyncProcessorRuntime<LibrarianService
}
interface LibrarianServiceState {
readonly documents: Map<string, DocumentMetadata>;
readonly processing: Map<string, ProcessingMetadata>;
readonly uploads: Map<string, UploadSession>;
readonly documents: MutableHashMap.MutableHashMap<string, DocumentMetadata>;
readonly processing: MutableHashMap.MutableHashMap<string, ProcessingMetadata>;
readonly uploads: MutableHashMap.MutableHashMap<string, UploadSession>;
readonly collectionManager: CollectionManager;
readonly libConsumer: BackendConsumer<LibrarianRequest> | null;
readonly libProducer: BackendProducer<LibrarianResponse> | null;
@ -195,18 +196,24 @@ interface LibrarianServiceState {
readonly colProducer: BackendProducer<CollectionManagementResponse> | null;
}
const cloneDocuments = (source: Map<string, DocumentMetadata>): Map<string, DocumentMetadata> =>
new Map(source);
const cloneDocuments = (
source: MutableHashMap.MutableHashMap<string, DocumentMetadata>,
): MutableHashMap.MutableHashMap<string, DocumentMetadata> =>
MutableHashMap.fromIterable(source);
const cloneProcessing = (source: Map<string, ProcessingMetadata>): Map<string, ProcessingMetadata> =>
new Map(source);
const cloneProcessing = (
source: MutableHashMap.MutableHashMap<string, ProcessingMetadata>,
): MutableHashMap.MutableHashMap<string, ProcessingMetadata> =>
MutableHashMap.fromIterable(source);
const cloneUploads = (source: Map<string, UploadSession>): Map<string, UploadSession> =>
new Map(source);
const cloneUploads = (
source: MutableHashMap.MutableHashMap<string, UploadSession>,
): MutableHashMap.MutableHashMap<string, UploadSession> =>
MutableHashMap.fromIterable(source);
const cloneUploadSession = (session: UploadSession): UploadSession => ({
...session,
chunks: new Map(session.chunks),
chunks: MutableHashMap.fromIterable(session.chunks),
});
const cloneCollectionManager = (source: CollectionManager): CollectionManager => {
@ -216,9 +223,9 @@ const cloneCollectionManager = (source: CollectionManager): CollectionManager =>
};
const initialState = (): LibrarianServiceState => ({
documents: new Map<string, DocumentMetadata>(),
processing: new Map<string, ProcessingMetadata>(),
uploads: new Map<string, UploadSession>(),
documents: MutableHashMap.empty<string, DocumentMetadata>(),
processing: MutableHashMap.empty<string, ProcessingMetadata>(),
uploads: MutableHashMap.empty<string, UploadSession>(),
collectionManager: makeCollectionManager(),
libConsumer: null,
libProducer: null,
@ -252,7 +259,7 @@ const modifyResult = <Value>(
): readonly [Value, LibrarianServiceState] => [value, state];
const uploadBytesReceived = (session: UploadSession): number =>
[...session.chunks.values()].reduce((sum, chunk) => sum + chunk.length, 0);
Array.from(MutableHashMap.values(session.chunks)).reduce((sum, chunk) => sum + chunk.length, 0);
const consumeOnceEffect = Effect.fnUntraced(function* (
service: LibrarianService,
@ -386,7 +393,9 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
return yield* librarianServiceError("get-document-metadata", "get-document-metadata requires documentId");
}
const doc = (yield* SynchronizedRef.get(current.state)).documents.get(id);
const doc = Option.getOrUndefined(
MutableHashMap.get((yield* SynchronizedRef.get(current.state)).documents, id),
);
if (doc === undefined) {
return yield* librarianServiceError("get-document-metadata", `Document not found: ${id}`);
}
@ -405,7 +414,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
const children: DocumentMetadata[] = [];
const currentState = yield* SynchronizedRef.get(current.state);
for (const doc of currentState.documents.values()) {
for (const doc of MutableHashMap.values(currentState.documents)) {
if (doc.parentId === parentId) {
children.push(doc);
}
@ -430,7 +439,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
}
return yield* SynchronizedRef.modifyEffect(current.state, (serviceState) => {
const currentSession = serviceState.uploads.get(uploadId);
const currentSession = Option.getOrUndefined(MutableHashMap.get(serviceState.uploads, uploadId));
if (currentSession === undefined) {
return Effect.fail(librarianServiceError("upload-chunk", `Upload not found: ${uploadId}`));
}
@ -439,14 +448,14 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
}
const session = cloneUploadSession(currentSession);
session.chunks.set(chunkIndex, content);
MutableHashMap.set(session.chunks, chunkIndex, content);
const uploads = cloneUploads(serviceState.uploads);
uploads.set(uploadId, session);
MutableHashMap.set(uploads, uploadId, session);
return Effect.succeed(modifyResult({
"upload-id": uploadId,
"chunk-index": chunkIndex,
"chunks-received": session.chunks.size,
"chunks-received": MutableHashMap.size(session.chunks),
"total-chunks": session.totalChunks,
"bytes-received": uploadBytesReceived(session),
"total-bytes": session.totalSize,
@ -465,17 +474,19 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
if (uploadId === undefined) {
return yield* librarianServiceError("get-upload-status", "get-upload-status requires upload-id");
}
const session = (yield* SynchronizedRef.get(current.state)).uploads.get(uploadId);
const session = Option.getOrUndefined(
MutableHashMap.get((yield* SynchronizedRef.get(current.state)).uploads, uploadId),
);
if (session === undefined) {
return yield* librarianServiceError("get-upload-status", `Upload not found: ${uploadId}`);
}
const receivedChunks = [...session.chunks.keys()].sort((a, b) => a - b);
const receivedChunks = Array.from(MutableHashMap.keys(session.chunks)).sort((a, b) => a - b);
const receivedSet = new Set(receivedChunks);
const missingChunks = Array.from({ length: session.totalChunks }, (_, i) => i).filter((i) => !receivedSet.has(i));
return {
"upload-id": uploadId,
"upload-state": "in-progress",
"chunks-received": session.chunks.size,
"chunks-received": MutableHashMap.size(session.chunks),
"total-chunks": session.totalChunks,
"received-chunks": receivedChunks,
"missing-chunks": missingChunks,
@ -493,11 +504,11 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
return yield* librarianServiceError("abort-upload", "abort-upload requires upload-id");
}
return yield* SynchronizedRef.modifyEffect(current.state, (serviceState) => {
if (!serviceState.uploads.has(uploadId)) {
if (!MutableHashMap.has(serviceState.uploads, uploadId)) {
return Effect.fail(librarianServiceError("abort-upload", `Upload not found: ${uploadId}`));
}
const uploads = cloneUploads(serviceState.uploads);
uploads.delete(uploadId);
MutableHashMap.remove(uploads, uploadId);
return Effect.succeed(modifyResult({}, {
...serviceState,
uploads,
@ -834,7 +845,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
yield* SynchronizedRef.update(service.state, (serviceState) => {
const documents = cloneDocuments(serviceState.documents);
documents.set(id, doc);
MutableHashMap.set(documents, id, doc);
return {
...serviceState,
documents,
@ -875,22 +886,22 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
}
const removal = yield* SynchronizedRef.modifyEffect(service.state, (serviceState) => {
const childIds = [...serviceState.documents.entries()]
const childIds = Array.from(serviceState.documents)
.filter(([, doc]) => doc.parentId === id)
.map(([childId]) => childId);
const procIds = [...serviceState.processing.entries()]
const procIds = Array.from(serviceState.processing)
.filter(([, proc]) => proc.documentId === id)
.map(([procId]) => procId);
const documents = cloneDocuments(serviceState.documents);
documents.delete(id);
MutableHashMap.remove(documents, id);
for (const childId of childIds) {
documents.delete(childId);
MutableHashMap.remove(documents, childId);
}
const processing = cloneProcessing(serviceState.processing);
for (const procId of procIds) {
processing.delete(procId);
MutableHashMap.remove(processing, procId);
}
return Effect.succeed(modifyResult({ childIds, procIds }, {
@ -943,7 +954,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
if (meta === undefined) return yield* librarianServiceError("update-document", "update-document requires documentMetadata");
const doc = yield* SynchronizedRef.modifyEffect(service.state, (serviceState) => {
const existing = serviceState.documents.get(id);
const existing = Option.getOrUndefined(MutableHashMap.get(serviceState.documents, id));
if (existing === undefined) {
return Effect.fail(librarianServiceError("update-document", `Document not found: ${id}`));
}
@ -954,7 +965,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
time: meta.time ?? existing.time,
});
const documents = cloneDocuments(serviceState.documents);
documents.set(id, next);
MutableHashMap.set(documents, id, next);
return Effect.succeed(modifyResult(next, {
...serviceState,
documents,
@ -978,7 +989,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
const docs: DocumentMetadata[] = [];
const serviceState = this.state.pipe(stateSnapshot);
for (const doc of serviceState.documents.values()) {
for (const doc of MutableHashMap.values(serviceState.documents)) {
// Filter by user
if (user.length > 0 && doc.user !== user) continue;
// Exclude children (only top-level documents) unless explicitly requested
@ -1008,7 +1019,9 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
return yield* librarianServiceError("get-document-content", "get-document-content requires documentId");
}
const doc = (yield* SynchronizedRef.get(service.state)).documents.get(id);
const doc = Option.getOrUndefined(
MutableHashMap.get((yield* SynchronizedRef.get(service.state)).documents, id),
);
if (doc === undefined) return yield* librarianServiceError("get-document-content", `Document not found: ${id}`);
const filePath = joinPath(service.dataDir, "docs", `${id}.bin`);
@ -1051,11 +1064,11 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
};
yield* SynchronizedRef.modifyEffect(service.state, (serviceState) => {
if (Boolean(serviceState.documents.has(parentId)) === false) {
if (Boolean(MutableHashMap.has(serviceState.documents, parentId)) === false) {
return Effect.fail(librarianServiceError("add-child-document", `Parent document not found: ${parentId}`));
}
const documents = cloneDocuments(serviceState.documents);
documents.set(id, doc);
MutableHashMap.set(documents, id, doc);
return Effect.succeed(modifyResult(undefined, {
...serviceState,
documents,
@ -1114,7 +1127,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
yield* SynchronizedRef.update(service.state, (serviceState) => {
const processing = cloneProcessing(serviceState.processing);
processing.set(id, record);
MutableHashMap.set(processing, id, record);
return {
...serviceState,
processing,
@ -1145,7 +1158,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
yield* SynchronizedRef.update(service.state, (serviceState) => {
const processing = cloneProcessing(serviceState.processing);
processing.delete(id);
MutableHashMap.remove(processing, id);
return {
...serviceState,
processing,
@ -1169,7 +1182,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
const records: ProcessingMetadata[] = [];
const serviceState = this.state.pipe(stateSnapshot);
for (const proc of serviceState.processing.values()) {
for (const proc of MutableHashMap.values(serviceState.processing)) {
const procDocumentId = proc.documentId ?? proc["document-id"];
if (documentId !== undefined && documentId.length > 0 && procDocumentId !== documentId) {
continue;
@ -1209,13 +1222,13 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
chunkSize,
totalChunks,
createdAt,
chunks: new Map<number, string>(),
chunks: MutableHashMap.empty<number, string>(),
user: meta.user ?? optionalString(req.user) ?? "default",
};
yield* SynchronizedRef.update(service.state, (serviceState) => {
const uploads = cloneUploads(serviceState.uploads);
uploads.set(uploadId, session);
MutableHashMap.set(uploads, uploadId, session);
return {
...serviceState,
uploads,
@ -1247,13 +1260,18 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
Effect.gen(function* () {
const uploadId = optionalString(service.requestRecord(request)["upload-id"]);
if (uploadId === undefined) return yield* librarianServiceError("complete-upload", "complete-upload requires upload-id");
const session = (yield* SynchronizedRef.get(service.state)).uploads.get(uploadId);
const session = Option.getOrUndefined(
MutableHashMap.get((yield* SynchronizedRef.get(service.state)).uploads, uploadId),
);
if (session === undefined) return yield* librarianServiceError("complete-upload", `Upload not found: ${uploadId}`);
if (session.chunks.size !== session.totalChunks) {
return yield* librarianServiceError("complete-upload", `Upload incomplete: ${session.chunks.size}/${session.totalChunks} chunks received`);
const chunksReceived = MutableHashMap.size(session.chunks);
if (chunksReceived !== session.totalChunks) {
return yield* librarianServiceError("complete-upload", `Upload incomplete: ${chunksReceived}/${session.totalChunks} chunks received`);
}
const content = Array.from({ length: session.totalChunks }, (_, i) => session.chunks.get(i) ?? "").join("");
const content = Array.from({ length: session.totalChunks }, (_, i) =>
Option.getOrUndefined(MutableHashMap.get(session.chunks, i)) ?? ""
).join("");
const response = yield* Effect.tryPromise<LibrarianResponse, LibrarianServiceError>({
try: () => service.addDocument({
operation: "add-document",
@ -1266,7 +1284,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
});
yield* SynchronizedRef.update(service.state, (serviceState) => {
const uploads = cloneUploads(serviceState.uploads);
uploads.delete(uploadId);
MutableHashMap.remove(uploads, uploadId);
return {
...serviceState,
uploads,
@ -1306,7 +1324,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
const user = optionalString(service.requestRecord(request).user);
const sessions = [];
const serviceState = yield* SynchronizedRef.get(service.state);
for (const session of serviceState.uploads.values()) {
for (const session of MutableHashMap.values(serviceState.uploads)) {
if (user !== undefined && session.user !== user) continue;
const documentMetadataJson = yield* encodeJsonString(
"list-uploads-document-metadata",
@ -1319,7 +1337,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
"total-size": session.totalSize,
"chunk-size": session.chunkSize,
"total-chunks": session.totalChunks,
"chunks-received": session.chunks.size,
"chunks-received": MutableHashMap.size(session.chunks),
"created-at": session.createdAt,
});
}
@ -1528,17 +1546,17 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
if (parsed === null) return;
const documents = new Map<string, DocumentMetadata>();
const documents = MutableHashMap.empty<string, DocumentMetadata>();
if (parsed.documents !== undefined) {
for (const [id, doc] of Object.entries(parsed.documents)) {
documents.set(id, service.publicDocument(doc));
MutableHashMap.set(documents, id, service.publicDocument(doc));
}
}
const processing = new Map<string, ProcessingMetadata>();
const processing = MutableHashMap.empty<string, ProcessingMetadata>();
if (parsed.processing !== undefined) {
for (const [id, proc] of Object.entries(parsed.processing)) {
processing.set(id, service.publicProcessing(proc));
MutableHashMap.set(processing, id, service.publicProcessing(proc));
}
}
@ -1555,7 +1573,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
}));
yield* Effect.log(
`[LibrarianService] Loaded persisted state (documents=${documents.size}, processing=${processing.size})`,
`[LibrarianService] Loaded persisted state (documents=${MutableHashMap.size(documents)}, processing=${MutableHashMap.size(processing)})`,
);
}),
);