diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 33d43c58..44057f30 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,15 +12,15 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 Librarian typed -runtime loop slice: +Current signal counts from `ts/packages` after the 2026-06-02 Librarian +ref-backed state slice: | Signal | Count | | --- | ---: | | `Effect.runPromise` | 208 | -| `Map<` | 77 | -| `WebSocket` | 47 | -| `new Map` | 56 | +| `Map<` | 88 | +| `WebSocket` | 49 | +| `new Map` | 62 | | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | | `receive(` | 18 | @@ -28,7 +28,7 @@ runtime loop slice: | `new Error` | 14 | | `new Promise` | 10 | | `JSON.parse` | 7 | -| `localStorage` | 9 | +| `localStorage` | 8 | | `JSON.stringify` | 6 | | `setTimeout` | 4 | | `process.env` | 3 | @@ -42,10 +42,9 @@ Notes: - `Effect.runPromise` is expected at external Promise compatibility boundaries, but each match should still be audited for avoidable internal runtime ownership. -- The `Effect.runPromise`, `Map<`, and `new Map` counts increased in this - snapshot because the FlowManager slice added focused service tests and - Promise compatibility facades while removing the service's internal mutable - object state. +- The `Map<` and `new Map` counts increased in this snapshot because the + Librarian slice introduced explicit ref-backed state types and clone helpers + while removing the service object's direct mutable maps/handles. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -269,8 +268,7 @@ Notes: - New librarian tests cover modeled upload fields, concrete persisted-state loading, and schema-backed metadata triple normalization. - Remaining: - - Librarian still has the dynamic `AsyncProcessorRuntime & Record` service object. Keep it as the next P0 state/ref-backed migration. + - Resolved by the typed runtime loop and ref-backed state slices below. - Verification: - `bun run --cwd ts/packages/base build` - `bun run --cwd ts/packages/flow build` @@ -294,9 +292,7 @@ Notes: - The librarian tests now await the Promise compatibility facade for upload status. - Remaining: - - The typed runtime loop slice addresses the dynamic service object and raw - poll loop. Librarian mutable maps/handles remain the next P0 ref-backed - state migration. + - Resolved by the typed runtime loop and ref-backed state slices below. - Verification: - `bun run --cwd ts/packages/flow test -- src/__tests__/librarian-service.test.ts` - `bun run --cwd ts/packages/flow build` @@ -320,9 +316,7 @@ Notes: - The local operation helpers retrieve the initialized service through an Effect gate rather than closing over an unsafe partially built value. - Remaining: - - Librarian still stores `documents`, `processing`, `uploads`, collection - manager, and producer/consumer handles as mutable fields. Move those into - `SynchronizedRef` next. + - Resolved by the ref-backed state slice below. - Verification: - `bun run --cwd ts/packages/flow build` - `cd ts && bun run check` @@ -331,6 +325,29 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Librarian Ref-Backed State Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/flow/src/librarian/service.ts` now stores documents, + processing records, upload sessions, collection manager, and pubsub + handles in `SynchronizedRef`. + - Document, processing, upload, collection, persistence, load, and stop paths + now read snapshots or mutate cloned maps/managers through the ref instead + of writing fields on the service object. + - Upload chunk updates clone nested `UploadSession.chunks` before replacing + the upload map entry, avoiding mutable nested state hidden behind the ref. + - Librarian response producers and consumers are read/nullified through + ref-backed handles. +- Verification: + - `bun run --cwd ts/packages/flow build` + - `bun run --cwd ts/packages/flow test -- src/__tests__/librarian-service.test.ts` + - `cd ts && bun run check` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -340,12 +357,10 @@ Notes: the client API is less Promise-first. - MCP env is now Config-backed; continue that policy for future MCP settings. - Flow stateful services: - - Config service, KnowledgeCore service, and FlowManager ref-backed state - are complete. Librarian now has native Effect module startup, a typed - service surface, and an `Effect.whileLoop` runtime, but it still stores - service maps and pubsub handles as mutable fields. It remains a good - candidate for `Context` services, scoped layers, `Ref`/`SynchronizedRef`, - `Schedule`, and managed persistence. + - Config service, KnowledgeCore service, FlowManager, and Librarian + ref-backed state slices are complete. Follow-up service work should focus + on scoped layers, schedules where polling semantics allow, and managed + persistence providers rather than direct mutable service fields. - Persistence IO should move toward `FileSystem` or `KeyValueStore` where the installed beta has the needed provider surface. - Base messaging/processors: @@ -371,26 +386,6 @@ Notes: ## Ranked Findings -### P0: Migrate Librarian Mutable State To Ref-Backed Effect Service - -- TrustGraph evidence: - - `ts/packages/flow/src/librarian/service.ts` -- Effect primitives: - - `Context`, `Layer.scoped`, `Ref`, `SynchronizedRef`, `Schedule`, - `Effect.addFinalizer`, `Config`, `Schema`, `FileSystem`, - `KeyValueStore`. -- Rewrite shape: - - Model one remaining service at a time as a `Context` service plus scoped - layer or ref-backed state slice. - - Store mutable service state in `Ref` or `SynchronizedRef`. - - Run service main programs with platform runtime entrypoints such as - `NodeRuntime.runMain`; keep `ManagedRuntime` only for compatibility - Promise facades. - - Replace polling sleep loops with schedules where behavior allows. - - Decode persisted payloads and config with schemas at boundaries. -- Tests: - - Service-specific tests plus `cd ts && bun run --cwd packages/flow test`. - ### P1: Finish Client RPC Boundary Modernization - TrustGraph evidence: @@ -481,12 +476,11 @@ Notes: ## Recommended PR Order -1. Librarian or flow-manager scoped state migration. -2. Client RPC managed runtime/scoped layer cleanup. -3. Base processor registry and constructor shim redesign. -4. Gateway RPC callback and client streaming completion cleanup. -5. Storage/provider managed resource cleanup. -6. MCP parity/deletion decision and workbench platform polish. +1. Client RPC managed runtime/scoped layer cleanup. +2. Base processor registry and constructor shim redesign. +3. Gateway RPC callback and client streaming completion cleanup. +4. Storage/provider managed resource cleanup. +5. MCP parity/deletion decision and workbench platform polish. ## No-Op Rules diff --git a/ts/packages/flow/src/librarian/service.ts b/ts/packages/flow/src/librarian/service.ts index dde1c875..fdb08f56 100644 --- a/ts/packages/flow/src/librarian/service.ts +++ b/ts/packages/flow/src/librarian/service.ts @@ -31,7 +31,7 @@ import { } from "@trustgraph/base"; import type { Message } from "@trustgraph/base"; import { NodeRuntime } from "@effect/platform-node"; -import { Clock, Config, DateTime, Duration, Effect, Layer, ManagedRuntime, Option, Random } from "effect"; +import { Clock, Config, DateTime, Duration, Effect, Layer, ManagedRuntime, Option, Random, SynchronizedRef } from "effect"; import * as S from "effect/Schema"; import { makeCollectionManager, type CollectionManager } from "./collection-manager.js"; import { @@ -144,14 +144,7 @@ const randomUuid: Effect.Effect = Effect.gen(function* () { }); export interface LibrarianService extends AsyncProcessorRuntime { - documents: Map; - processing: Map; - uploads: Map; - collectionManager: CollectionManager; - libConsumer: BackendConsumer | null; - libProducer: BackendProducer | null; - colConsumer: BackendConsumer | null; - colProducer: BackendProducer | null; + state: SynchronizedRef.SynchronizedRef; dataDir: string; persistPath: string; requestRecord: (request: LibrarianRequest) => Record; @@ -191,15 +184,86 @@ export interface LibrarianService extends AsyncProcessorRuntime Promise; } +interface LibrarianServiceState { + readonly documents: Map; + readonly processing: Map; + readonly uploads: Map; + readonly collectionManager: CollectionManager; + readonly libConsumer: BackendConsumer | null; + readonly libProducer: BackendProducer | null; + readonly colConsumer: BackendConsumer | null; + readonly colProducer: BackendProducer | null; +} + +const cloneDocuments = (source: Map): Map => + new Map(source); + +const cloneProcessing = (source: Map): Map => + new Map(source); + +const cloneUploads = (source: Map): Map => + new Map(source); + +const cloneUploadSession = (session: UploadSession): UploadSession => ({ + ...session, + chunks: new Map(session.chunks), +}); + +const cloneCollectionManager = (source: CollectionManager): CollectionManager => { + const manager = makeCollectionManager(); + manager.loadFromJSON(source.toJSON()); + return manager; +}; + +const initialState = (): LibrarianServiceState => ({ + documents: new Map(), + processing: new Map(), + uploads: new Map(), + collectionManager: makeCollectionManager(), + libConsumer: null, + libProducer: null, + colConsumer: null, + colProducer: null, +}); + +const stateSnapshot = (stateRef: SynchronizedRef.SynchronizedRef): LibrarianServiceState => + SynchronizedRef.getUnsafe(stateRef); + +const updateHandles = ( + stateRef: SynchronizedRef.SynchronizedRef, + handles: { + readonly libConsumer?: BackendConsumer | null; + readonly libProducer?: BackendProducer | null; + readonly colConsumer?: BackendConsumer | null; + readonly colProducer?: BackendProducer | null; + }, +) => + SynchronizedRef.updateAndGet(stateRef, (state) => ({ + ...state, + libConsumer: handles.libConsumer === undefined ? state.libConsumer : handles.libConsumer, + libProducer: handles.libProducer === undefined ? state.libProducer : handles.libProducer, + colConsumer: handles.colConsumer === undefined ? state.colConsumer : handles.colConsumer, + colProducer: handles.colProducer === undefined ? state.colProducer : handles.colProducer, + })); + +const modifyResult = ( + value: Value, + state: LibrarianServiceState, +): readonly [Value, LibrarianServiceState] => [value, state]; + +const uploadBytesReceived = (session: UploadSession): number => + [...session.chunks.values()].reduce((sum, chunk) => sum + chunk.length, 0); + const consumeOnceEffect = ( service: LibrarianService, ): Effect.Effect => Effect.gen(function* () { - const libConsumer = service.libConsumer; + const state = yield* SynchronizedRef.get(service.state); + const libConsumer = state.libConsumer; if (libConsumer === null) { return yield* librarianServiceError("consume", "Librarian consumer not started"); } - const colConsumer = service.colConsumer; + const colConsumer = state.colConsumer; if (colConsumer === null) { return yield* librarianServiceError("consume", "Collection consumer not started"); } @@ -249,33 +313,35 @@ const runLibrarianServiceEffect = ( catch: (cause) => librarianServiceError("load", cause), }); - service.libProducer = yield* Effect.tryPromise({ + const libProducer = yield* Effect.tryPromise({ try: () => service.pubsub.createProducer({ topic: topics.librarianResponse, }), catch: (cause) => librarianServiceError("librarian-producer", cause), }); - service.colProducer = yield* Effect.tryPromise({ + const colProducer = yield* Effect.tryPromise({ try: () => service.pubsub.createProducer({ topic: topics.collectionManagementResponse, }), catch: (cause) => librarianServiceError("collection-producer", cause), }); + yield* updateHandles(service.state, { libProducer, colProducer }); - service.libConsumer = yield* Effect.tryPromise({ + const libConsumer = yield* Effect.tryPromise({ try: () => service.pubsub.createConsumer({ topic: topics.librarianRequest, subscription: `${service.config.id}-librarian-request`, }), catch: (cause) => librarianServiceError("librarian-consumer", cause), }); - service.colConsumer = yield* Effect.tryPromise({ + const colConsumer = yield* Effect.tryPromise({ try: () => service.pubsub.createConsumer({ topic: topics.collectionManagementRequest, subscription: `${service.config.id}-collection-management-request`, }), catch: (cause) => librarianServiceError("collection-consumer", cause), }); + yield* updateHandles(service.state, { libConsumer, colConsumer }); yield* Effect.log(`[LibrarianService] Listening on ${topics.librarianRequest} and ${topics.collectionManagementRequest}`); @@ -295,6 +361,7 @@ const runLibrarianServiceEffect = ( }); export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianService { + const state = SynchronizedRef.makeUnsafe(initialState()); let service: LibrarianService | undefined; const getService = Effect.sync(() => service).pipe( @@ -320,7 +387,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS return yield* librarianServiceError("get-document-metadata", "get-document-metadata requires documentId"); } - const doc = current.documents.get(id); + const doc = (yield* SynchronizedRef.get(current.state)).documents.get(id); if (doc === undefined) { return yield* librarianServiceError("get-document-metadata", `Document not found: ${id}`); } @@ -337,7 +404,8 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS } const children: DocumentMetadata[] = []; - for (const doc of current.documents.values()) { + const currentState = yield* SynchronizedRef.get(current.state); + for (const doc of currentState.documents.values()) { if (doc.parentId === parentId) { children.push(doc); } @@ -354,29 +422,38 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS if (uploadId === undefined) { return yield* librarianServiceError("upload-chunk", "upload-chunk requires upload-id"); } - const session = current.uploads.get(uploadId); - if (session === undefined) { - return yield* librarianServiceError("upload-chunk", `Upload not found: ${uploadId}`); - } const chunkIndex = typeof req["chunk-index"] === "number" ? req["chunk-index"] : -1; - if (!Number.isInteger(chunkIndex) || chunkIndex < 0 || chunkIndex >= session.totalChunks) { - return yield* librarianServiceError("upload-chunk", "upload-chunk requires a valid chunk-index"); - } const content = optionalString(req.content); if (content === undefined) { return yield* librarianServiceError("upload-chunk", "upload-chunk requires content"); } - session.chunks.set(chunkIndex, content); - const bytesReceived = [...session.chunks.values()].reduce((sum, chunk) => sum + chunk.length, 0); - return { - "upload-id": uploadId, - "chunk-index": chunkIndex, - "chunks-received": session.chunks.size, - "total-chunks": session.totalChunks, - "bytes-received": bytesReceived, - "total-bytes": session.totalSize, - }; + return yield* SynchronizedRef.modifyEffect(current.state, (serviceState) => { + const currentSession = serviceState.uploads.get(uploadId); + if (currentSession === undefined) { + return Effect.fail(librarianServiceError("upload-chunk", `Upload not found: ${uploadId}`)); + } + if (!Number.isInteger(chunkIndex) || chunkIndex < 0 || chunkIndex >= currentSession.totalChunks) { + return Effect.fail(librarianServiceError("upload-chunk", "upload-chunk requires a valid chunk-index")); + } + + const session = cloneUploadSession(currentSession); + session.chunks.set(chunkIndex, content); + const uploads = cloneUploads(serviceState.uploads); + uploads.set(uploadId, session); + + return Effect.succeed(modifyResult({ + "upload-id": uploadId, + "chunk-index": chunkIndex, + "chunks-received": session.chunks.size, + "total-chunks": session.totalChunks, + "bytes-received": uploadBytesReceived(session), + "total-bytes": session.totalSize, + }, { + ...serviceState, + uploads, + })); + }); }); const getUploadStatusEffect = (request: LibrarianRequest): Effect.Effect => @@ -386,14 +463,13 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS if (uploadId === undefined) { return yield* librarianServiceError("get-upload-status", "get-upload-status requires upload-id"); } - const session = current.uploads.get(uploadId); + const session = (yield* SynchronizedRef.get(current.state)).uploads.get(uploadId); if (session === undefined) { return yield* librarianServiceError("get-upload-status", `Upload not found: ${uploadId}`); } const receivedChunks = [...session.chunks.keys()].sort((a, b) => a - b); const receivedSet = new Set(receivedChunks); const missingChunks = Array.from({ length: session.totalChunks }, (_, i) => i).filter((i) => !receivedSet.has(i)); - const bytesReceived = [...session.chunks.values()].reduce((sum, chunk) => sum + chunk.length, 0); return { "upload-id": uploadId, "upload-state": "in-progress", @@ -401,7 +477,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS "total-chunks": session.totalChunks, "received-chunks": receivedChunks, "missing-chunks": missingChunks, - "bytes-received": bytesReceived, + "bytes-received": uploadBytesReceived(session), "total-bytes": session.totalSize, }; }); @@ -413,19 +489,21 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS if (uploadId === undefined) { return yield* librarianServiceError("abort-upload", "abort-upload requires upload-id"); } - current.uploads.delete(uploadId); - return {}; + return yield* SynchronizedRef.modifyEffect(current.state, (serviceState) => { + if (!serviceState.uploads.has(uploadId)) { + return Effect.fail(librarianServiceError("abort-upload", `Upload not found: ${uploadId}`)); + } + const uploads = cloneUploads(serviceState.uploads); + uploads.delete(uploadId); + return Effect.succeed(modifyResult({}, { + ...serviceState, + uploads, + })); + }); }); const librarianService: LibrarianService = Object.assign(base, { - documents: new Map(), - processing: new Map(), - uploads: new Map(), - collectionManager: makeCollectionManager(), - libConsumer: null, - libProducer: null, - colConsumer: null, - colProducer: null, + state, dataDir, persistPath, @@ -596,7 +674,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const sendResponse = (response: LibrarianResponse): Effect.Effect => Effect.gen(function* () { - const producer = service.libProducer; + const producer = (yield* SynchronizedRef.get(service.state)).libProducer; if (producer === null) { return yield* librarianServiceError("librarian-respond", "Librarian producer not started"); } @@ -743,7 +821,14 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS time: now, }; - service.documents.set(id, doc); + yield* SynchronizedRef.update(service.state, (serviceState) => { + const documents = cloneDocuments(serviceState.documents); + documents.set(id, doc); + return { + ...serviceState, + documents, + }; + }); // Store file content if provided if (request.content !== undefined && request.content.length > 0) { @@ -778,8 +863,31 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS return yield* librarianServiceError("remove-document", "remove-document requires documentId"); } - // Remove the document itself - service.documents.delete(id); + const removal = yield* SynchronizedRef.modifyEffect(service.state, (serviceState) => { + const childIds = [...serviceState.documents.entries()] + .filter(([, doc]) => doc.parentId === id) + .map(([childId]) => childId); + const procIds = [...serviceState.processing.entries()] + .filter(([, proc]) => proc.documentId === id) + .map(([procId]) => procId); + + const documents = cloneDocuments(serviceState.documents); + documents.delete(id); + for (const childId of childIds) { + documents.delete(childId); + } + + const processing = cloneProcessing(serviceState.processing); + for (const procId of procIds) { + processing.delete(procId); + } + + return Effect.succeed(modifyResult({ childIds, procIds }, { + ...serviceState, + documents, + processing, + })); + }); // Remove the file yield* Effect.tryPromise({ @@ -788,32 +896,18 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS }).pipe(Effect.orElseSucceed(() => undefined)); // Cascade: remove children - const childIds = [...service.documents.entries()] - .filter(([, doc]) => doc.parentId === id) - .map(([childId]) => childId); - - for (const childId of childIds) { - service.documents.delete(childId); + for (const childId of removal.childIds) { yield* Effect.tryPromise({ try: () => removePath(joinPath(service.dataDir, "docs", `${childId}.bin`)), catch: (cause) => librarianServiceError("remove-child-file", cause), }).pipe(Effect.orElseSucceed(() => undefined)); } - // Remove associated processing records - const procIds = [...service.processing.entries()] - .filter(([, proc]) => proc.documentId === id) - .map(([procId]) => procId); - - for (const procId of procIds) { - service.processing.delete(procId); - } - yield* Effect.tryPromise({ try: () => service.persist(), catch: (cause) => librarianServiceError("remove-document-persist", cause), }); - yield* Effect.log(`[LibrarianService] Removed document ${id} (cascade: ${childIds.length} children, ${procIds.length} processing)`); + yield* Effect.log(`[LibrarianService] Removed document ${id} (cascade: ${removal.childIds.length} children, ${removal.procIds.length} processing)`); return {}; }), @@ -835,17 +929,26 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS if (id === undefined || id.length === 0) { return yield* librarianServiceError("update-document", "update-document requires documentId"); } - const existing = service.documents.get(id); - if (existing === undefined) return yield* librarianServiceError("update-document", `Document not found: ${id}`); if (meta === undefined) return yield* librarianServiceError("update-document", "update-document requires documentMetadata"); - const doc: DocumentMetadata = service.publicDocument({ - ...existing, - ...meta, - id, - time: meta.time ?? existing.time, + const doc = yield* SynchronizedRef.modifyEffect(service.state, (serviceState) => { + const existing = serviceState.documents.get(id); + if (existing === undefined) { + return Effect.fail(librarianServiceError("update-document", `Document not found: ${id}`)); + } + const next: DocumentMetadata = service.publicDocument({ + ...existing, + ...meta, + id, + time: meta.time ?? existing.time, + }); + const documents = cloneDocuments(serviceState.documents); + documents.set(id, next); + return Effect.succeed(modifyResult(next, { + ...serviceState, + documents, + })); }); - service.documents.set(id, doc); yield* Effect.tryPromise({ try: () => service.persist(), catch: (cause) => librarianServiceError("update-document-persist", cause), @@ -862,8 +965,9 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const user = request.user ?? ""; const includeChildren = this.requestRecord(request)["include-children"] === true; const docs: DocumentMetadata[] = []; + const serviceState = this.state.pipe(stateSnapshot); - for (const doc of this.documents.values()) { + for (const doc of serviceState.documents.values()) { // Filter by user if (user.length > 0 && doc.user !== user) continue; // Exclude children (only top-level documents) unless explicitly requested @@ -893,7 +997,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS return yield* librarianServiceError("get-document-content", "get-document-content requires documentId"); } - const doc = service.documents.get(id); + const doc = (yield* SynchronizedRef.get(service.state)).documents.get(id); if (doc === undefined) return yield* librarianServiceError("get-document-content", `Document not found: ${id}`); const filePath = joinPath(service.dataDir, "docs", `${id}.bin`); @@ -924,11 +1028,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS if (meta.parentId === undefined || meta.parentId.length === 0) { return yield* librarianServiceError("add-child-document", "add-child-document requires parentId in metadata"); } - - // Verify parent exists - if (Boolean(service.documents.has(meta.parentId)) === false) { - return yield* librarianServiceError("add-child-document", `Parent document not found: ${meta.parentId}`); - } + const parentId = meta.parentId; const id = meta.id; const now = yield* currentEpochSeconds; @@ -939,7 +1039,17 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS time: now, }; - service.documents.set(id, doc); + yield* SynchronizedRef.modifyEffect(service.state, (serviceState) => { + if (Boolean(serviceState.documents.has(parentId)) === false) { + return Effect.fail(librarianServiceError("add-child-document", `Parent document not found: ${parentId}`)); + } + const documents = cloneDocuments(serviceState.documents); + documents.set(id, doc); + return Effect.succeed(modifyResult(undefined, { + ...serviceState, + documents, + })); + }); // Store file content if provided if (request.content !== undefined && request.content.length > 0) { @@ -955,7 +1065,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS try: () => service.persist(), catch: (cause) => librarianServiceError("add-child-document-persist", cause), }); - yield* Effect.log(`[LibrarianService] Added child document ${id} (parent: ${meta.parentId})`); + yield* Effect.log(`[LibrarianService] Added child document ${id} (parent: ${parentId})`); return service.documentResponse(doc); }), @@ -991,7 +1101,14 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS time: now, }; - service.processing.set(id, record); + yield* SynchronizedRef.update(service.state, (serviceState) => { + const processing = cloneProcessing(serviceState.processing); + processing.set(id, record); + return { + ...serviceState, + processing, + }; + }); yield* Effect.tryPromise({ try: () => service.persist(), catch: (cause) => librarianServiceError("add-processing-persist", cause), @@ -1015,7 +1132,14 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS return yield* librarianServiceError("remove-processing", "remove-processing requires processingId"); } - service.processing.delete(id); + yield* SynchronizedRef.update(service.state, (serviceState) => { + const processing = cloneProcessing(serviceState.processing); + processing.delete(id); + return { + ...serviceState, + processing, + }; + }); yield* Effect.tryPromise({ try: () => service.persist(), catch: (cause) => librarianServiceError("remove-processing-persist", cause), @@ -1032,8 +1156,9 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS listProcessing: function(this: LibrarianService, request: LibrarianRequest): LibrarianResponse { const documentId = this.documentId(request); const records: ProcessingMetadata[] = []; + const serviceState = this.state.pipe(stateSnapshot); - for (const proc of this.processing.values()) { + for (const proc of serviceState.processing.values()) { const procDocumentId = proc.documentId ?? proc["document-id"]; if (documentId !== undefined && documentId.length > 0 && procDocumentId !== documentId) { continue; @@ -1066,7 +1191,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const uploadId = yield* randomUuid; const createdAt = yield* currentIsoString; - service.uploads.set(uploadId, { + const session: UploadSession = { id: uploadId, documentMetadata: meta, totalSize, @@ -1075,6 +1200,15 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS createdAt, chunks: new Map(), user: meta.user ?? optionalString(req.user) ?? "default", + }; + + yield* SynchronizedRef.update(service.state, (serviceState) => { + const uploads = cloneUploads(serviceState.uploads); + uploads.set(uploadId, session); + return { + ...serviceState, + uploads, + }; }); return { @@ -1102,7 +1236,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS Effect.gen(function* () { const uploadId = optionalString(service.requestRecord(request)["upload-id"]); if (uploadId === undefined) return yield* librarianServiceError("complete-upload", "complete-upload requires upload-id"); - const session = service.uploads.get(uploadId); + const session = (yield* SynchronizedRef.get(service.state)).uploads.get(uploadId); if (session === undefined) return yield* librarianServiceError("complete-upload", `Upload not found: ${uploadId}`); if (session.chunks.size !== session.totalChunks) { return yield* librarianServiceError("complete-upload", `Upload incomplete: ${session.chunks.size}/${session.totalChunks} chunks received`); @@ -1119,7 +1253,14 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS }), catch: (cause) => librarianServiceError("complete-upload-add-document", cause), }); - service.uploads.delete(uploadId); + yield* SynchronizedRef.update(service.state, (serviceState) => { + const uploads = cloneUploads(serviceState.uploads); + uploads.delete(uploadId); + return { + ...serviceState, + uploads, + }; + }); const documentId = response.documentMetadata?.id ?? response["document-metadata"]?.id ?? session.documentMetadata.id; return { ...response, @@ -1153,7 +1294,8 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS Effect.gen(function* () { const user = optionalString(service.requestRecord(request).user); const sessions = []; - for (const session of service.uploads.values()) { + const serviceState = yield* SynchronizedRef.get(service.state); + for (const session of serviceState.uploads.values()) { if (user !== undefined && session.user !== user) continue; const documentMetadataJson = yield* encodeJsonString( "list-uploads-document-metadata", @@ -1229,7 +1371,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const sendResponse = (response: CollectionManagementResponse): Effect.Effect => Effect.gen(function* () { - const producer = service.colProducer; + const producer = (yield* SynchronizedRef.get(service.state)).colProducer; if (producer === null) { return yield* librarianServiceError("collection-respond", "Collection producer not started"); } @@ -1266,7 +1408,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS switch (request.operation) { case "list-collections": { const user = request.user ?? ""; - const collections = service.collectionManager.listCollections(user); + const collections = (yield* SynchronizedRef.get(service.state)).collectionManager.listCollections(user); return { collections }; } @@ -1277,13 +1419,19 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const description = request.description ?? ""; const tags = request.tags ?? []; - service.collectionManager.updateCollection(user, collection, name, description, tags); + const collections = yield* SynchronizedRef.modifyEffect(service.state, (serviceState) => { + const collectionManager = cloneCollectionManager(serviceState.collectionManager); + collectionManager.updateCollection(user, collection, name, description, tags); + return Effect.succeed(modifyResult(collectionManager.listCollections(user), { + ...serviceState, + collectionManager, + })); + }); yield* Effect.tryPromise({ try: () => service.persist(), catch: (cause) => librarianServiceError("update-collection-persist", cause), }); - const collections = service.collectionManager.listCollections(user); return { collections }; } @@ -1291,7 +1439,14 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const user = request.user ?? ""; const collection = request.collection ?? ""; - service.collectionManager.deleteCollection(user, collection); + yield* SynchronizedRef.update(service.state, (serviceState) => { + const collectionManager = cloneCollectionManager(serviceState.collectionManager); + collectionManager.deleteCollection(user, collection); + return { + ...serviceState, + collectionManager, + }; + }); yield* Effect.tryPromise({ try: () => service.persist(), catch: (cause) => librarianServiceError("delete-collection-persist", cause), @@ -1316,10 +1471,11 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const service = this; return Effect.runPromise( Effect.gen(function* () { + const serviceState = yield* SynchronizedRef.get(service.state); const data = { - documents: Object.fromEntries(service.documents), - processing: Object.fromEntries(service.processing), - collections: service.collectionManager.toJSON(), + documents: Object.fromEntries(serviceState.documents), + processing: Object.fromEntries(serviceState.processing), + collections: serviceState.collectionManager.toJSON(), }; const json = yield* encodeJsonString("persist-encode", data); @@ -1358,26 +1514,34 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS if (parsed === null) return; - service.documents.clear(); + const documents = new Map(); if (parsed.documents !== undefined) { for (const [id, doc] of Object.entries(parsed.documents)) { - service.documents.set(id, service.publicDocument(doc)); + documents.set(id, service.publicDocument(doc)); } } - service.processing.clear(); + const processing = new Map(); if (parsed.processing !== undefined) { for (const [id, proc] of Object.entries(parsed.processing)) { - service.processing.set(id, service.publicProcessing(proc)); + processing.set(id, service.publicProcessing(proc)); } } + const collectionManager = makeCollectionManager(); if (parsed.collections !== undefined) { - service.collectionManager.loadFromJSON(parsed.collections); + collectionManager.loadFromJSON(parsed.collections); } + yield* SynchronizedRef.update(service.state, (serviceState) => ({ + ...serviceState, + documents, + processing, + collectionManager, + })); + yield* Effect.log( - `[LibrarianService] Loaded persisted state (documents=${service.documents.size}, processing=${service.processing.size})`, + `[LibrarianService] Loaded persisted state (documents=${documents.size}, processing=${processing.size})`, ); }), ); @@ -1390,38 +1554,41 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const service = this; return Effect.runPromise( Effect.gen(function* () { - const libConsumer = service.libConsumer; + const serviceState = yield* SynchronizedRef.get(service.state); + const libConsumer = serviceState.libConsumer; if (libConsumer !== null) { yield* Effect.tryPromise({ try: () => libConsumer.close(), catch: (cause) => librarianServiceError("close-librarian-consumer", cause), }); - service.libConsumer = null; } - const libProducer = service.libProducer; + const libProducer = serviceState.libProducer; if (libProducer !== null) { yield* Effect.tryPromise({ try: () => libProducer.close(), catch: (cause) => librarianServiceError("close-librarian-producer", cause), }); - service.libProducer = null; } - const colConsumer = service.colConsumer; + const colConsumer = serviceState.colConsumer; if (colConsumer !== null) { yield* Effect.tryPromise({ try: () => colConsumer.close(), catch: (cause) => librarianServiceError("close-collection-consumer", cause), }); - service.colConsumer = null; } - const colProducer = service.colProducer; + const colProducer = serviceState.colProducer; if (colProducer !== null) { yield* Effect.tryPromise({ try: () => colProducer.close(), catch: (cause) => librarianServiceError("close-collection-producer", cause), }); - service.colProducer = null; } + yield* updateHandles(service.state, { + libConsumer: null, + libProducer: null, + colConsumer: null, + colProducer: null, + }); yield* Effect.tryPromise({ try: () => baseStop(), catch: (cause) => librarianServiceError("stop", cause),