diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 305d94a6..33d43c58 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,19 +12,19 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 Librarian tagged -operation helper slice: +Current signal counts from `ts/packages` after the 2026-06-02 Librarian typed +runtime loop slice: | Signal | Count | | --- | ---: | -| `Effect.runPromise` | 209 | -| `Map<` | 74 | +| `Effect.runPromise` | 208 | +| `Map<` | 77 | | `WebSocket` | 47 | | `new Map` | 56 | | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | | `receive(` | 18 | -| `while (` | 10 | +| `while (` | 9 | | `new Error` | 14 | | `new Promise` | 10 | | `JSON.parse` | 7 | @@ -46,8 +46,8 @@ Notes: snapshot because the FlowManager slice added focused service tests and Promise compatibility facades while removing the service's internal mutable object state. -- The remaining `Record` hit is the librarian service object and - should be removed in the next librarian state migration slice. +- `Record` and `throwLibrarianServiceError` are now clean in + `ts/packages`. ## Loop Passes @@ -294,9 +294,9 @@ Notes: - The librarian tests now await the Promise compatibility facade for upload status. - Remaining: - - Librarian still has the dynamic service object, mutable maps/handles on - that object, and a raw `while (service.running)` poll loop. That remains - the next P0 state/ref-backed migration. + - The typed runtime loop slice addresses the dynamic service object and raw + poll loop. Librarian mutable maps/handles remain the next P0 ref-backed + state migration. - Verification: - `bun run --cwd ts/packages/flow test -- src/__tests__/librarian-service.test.ts` - `bun run --cwd ts/packages/flow build` @@ -306,6 +306,31 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Librarian Typed Runtime Loop Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/flow/src/librarian/service.ts` now exposes a typed + `LibrarianService` interface instead of `AsyncProcessorRuntime & + Record`. + - Service construction now uses `makeAsyncProcessor` + with `runEffect`; the old method-bag `run` override and + `as LibrarianService` cast are gone. + - The librarian startup poller now uses `Effect.whileLoop`. + - The local operation helpers retrieve the initialized service through an + Effect gate rather than closing over an unsafe partially built value. +- Remaining: + - Librarian still stores `documents`, `processing`, `uploads`, collection + manager, and producer/consumer handles as mutable fields. Move those into + `SynchronizedRef` next. +- Verification: + - `bun run --cwd ts/packages/flow build` + - `cd ts && bun run check` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -316,10 +341,10 @@ Notes: - MCP env is now Config-backed; continue that policy for future MCP settings. - Flow stateful services: - Config service, KnowledgeCore service, and FlowManager ref-backed state - are complete. Librarian now has native Effect module startup - (`NodeRuntime.runMain` with a `ManagedRuntime` compatibility facade), but - it still has a mutable poller service object. It remains a good candidate - for `Context` services, scoped layers, `Ref`/`SynchronizedRef`, + are complete. Librarian now has native Effect module startup, a typed + service surface, and an `Effect.whileLoop` runtime, but it still stores + service maps and pubsub handles as mutable fields. It remains a good + candidate for `Context` services, scoped layers, `Ref`/`SynchronizedRef`, `Schedule`, and managed persistence. - Persistence IO should move toward `FileSystem` or `KeyValueStore` where the installed beta has the needed provider surface. @@ -346,7 +371,7 @@ Notes: ## Ranked Findings -### P0: Migrate Librarian Stateful Service To Scoped Effect Service +### P0: Migrate Librarian Mutable State To Ref-Backed Effect Service - TrustGraph evidence: - `ts/packages/flow/src/librarian/service.ts` diff --git a/ts/packages/flow/src/librarian/service.ts b/ts/packages/flow/src/librarian/service.ts index 264c9129..dde1c875 100644 --- a/ts/packages/flow/src/librarian/service.ts +++ b/ts/packages/flow/src/librarian/service.ts @@ -16,6 +16,8 @@ import { makeProcessorProgram, type ProcessorConfig, type AsyncProcessorRuntime, + type BackendConsumer, + type BackendProducer, topics, type LibrarianRequest, type LibrarianResponse, @@ -29,9 +31,9 @@ import { } from "@trustgraph/base"; import type { Message } from "@trustgraph/base"; import { NodeRuntime } from "@effect/platform-node"; -import { Clock, Config, Context, DateTime, Duration, Effect, Layer, ManagedRuntime, Option, Random } from "effect"; +import { Clock, Config, DateTime, Duration, Effect, Layer, ManagedRuntime, Option, Random } from "effect"; import * as S from "effect/Schema"; -import { makeCollectionManager } from "./collection-manager.js"; +import { makeCollectionManager, type CollectionManager } from "./collection-manager.js"; import { ensureDirectory, joinPath, @@ -141,64 +143,218 @@ const randomUuid: Effect.Effect = Effect.gen(function* () { ].join("-"); }); -export type LibrarianService = AsyncProcessorRuntime & Record; +export interface LibrarianService extends AsyncProcessorRuntime { + documents: Map; + processing: Map; + uploads: Map; + collectionManager: CollectionManager; + libConsumer: BackendConsumer | null; + libProducer: BackendProducer | null; + colConsumer: BackendConsumer | null; + colProducer: BackendProducer | null; + dataDir: string; + persistPath: string; + requestRecord: (request: LibrarianRequest) => Record; + documentId: (request: LibrarianRequest) => string | undefined; + processingId: (request: LibrarianRequest) => string | undefined; + documentMetadata: (request: LibrarianRequest) => Promise; + processingMetadata: (request: LibrarianRequest) => Promise; + normaliseDocumentMetadata: (value: Record) => Promise; + publicDocument: (doc: DocumentMetadata) => DocumentMetadata; + publicProcessing: (proc: ProcessingMetadata) => ProcessingMetadata; + documentResponse: (doc: DocumentMetadata) => LibrarianResponse; + documentsResponse: (docs: DocumentMetadata[]) => LibrarianResponse; + processingResponse: (records: ProcessingMetadata[]) => LibrarianResponse; + handleLibrarianMessage: (msg: Message) => Promise; + handleLibrarianOperation: (request: LibrarianRequest) => Promise; + addDocument: (request: LibrarianRequest) => Promise; + removeDocument: (request: LibrarianRequest) => Promise; + updateDocument: (request: LibrarianRequest) => Promise; + listDocuments: (request: LibrarianRequest) => LibrarianResponse; + getDocumentMetadata: (request: LibrarianRequest) => Promise; + getDocumentContent: (request: LibrarianRequest) => Promise; + addChildDocument: (request: LibrarianRequest) => Promise; + listChildren: (request: LibrarianRequest) => Promise; + addProcessing: (request: LibrarianRequest) => Promise; + removeProcessing: (request: LibrarianRequest) => Promise; + listProcessing: (request: LibrarianRequest) => LibrarianResponse; + beginUpload: (request: LibrarianRequest) => Promise; + uploadChunk: (request: LibrarianRequest) => Promise; + completeUpload: (request: LibrarianRequest) => Promise; + getUploadStatus: (request: LibrarianRequest) => Promise; + abortUpload: (request: LibrarianRequest) => Promise; + listUploads: (request: LibrarianRequest) => Promise; + streamDocument: (request: LibrarianRequest) => Promise; + handleCollectionMessage: (msg: Message) => Promise; + handleCollectionOperation: (request: CollectionManagementRequest) => Promise; + persist: () => Promise; + loadFromDisk: () => Promise; +} + +const consumeOnceEffect = ( + service: LibrarianService, +): Effect.Effect => + Effect.gen(function* () { + const libConsumer = service.libConsumer; + if (libConsumer === null) { + return yield* librarianServiceError("consume", "Librarian consumer not started"); + } + const colConsumer = service.colConsumer; + if (colConsumer === null) { + return yield* librarianServiceError("consume", "Collection consumer not started"); + } + + const libMsg = yield* Effect.tryPromise({ + try: () => libConsumer.receive(2000), + catch: (cause) => librarianServiceError("librarian-receive", cause), + }); + if (libMsg !== null) { + yield* Effect.tryPromise({ + try: () => service.handleLibrarianMessage(libMsg), + catch: (cause) => librarianServiceError("librarian-handle", cause), + }); + yield* Effect.tryPromise({ + try: () => libConsumer.acknowledge(libMsg), + catch: (cause) => librarianServiceError("librarian-acknowledge", cause), + }); + } + + const colMsg = yield* Effect.tryPromise({ + try: () => colConsumer.receive(2000), + catch: (cause) => librarianServiceError("collection-receive", cause), + }); + if (colMsg !== null) { + yield* Effect.tryPromise({ + try: () => service.handleCollectionMessage(colMsg), + catch: (cause) => librarianServiceError("collection-handle", cause), + }); + yield* Effect.tryPromise({ + try: () => colConsumer.acknowledge(colMsg), + catch: (cause) => librarianServiceError("collection-acknowledge", cause), + }); + } + }); + +const runLibrarianServiceEffect = ( + service: LibrarianService, +): Effect.Effect => + Effect.gen(function* () { + yield* Effect.tryPromise({ + try: () => ensureDirectory(joinPath(service.dataDir, "docs")), + catch: (cause) => librarianServiceError("ensure-data-dir", cause), + }); + + yield* Effect.tryPromise({ + try: () => service.loadFromDisk(), + catch: (cause) => librarianServiceError("load", cause), + }); + + service.libProducer = yield* Effect.tryPromise({ + try: () => service.pubsub.createProducer({ + topic: topics.librarianResponse, + }), + catch: (cause) => librarianServiceError("librarian-producer", cause), + }); + service.colProducer = yield* Effect.tryPromise({ + try: () => service.pubsub.createProducer({ + topic: topics.collectionManagementResponse, + }), + catch: (cause) => librarianServiceError("collection-producer", cause), + }); + + service.libConsumer = yield* Effect.tryPromise({ + try: () => service.pubsub.createConsumer({ + topic: topics.librarianRequest, + subscription: `${service.config.id}-librarian-request`, + }), + catch: (cause) => librarianServiceError("librarian-consumer", cause), + }); + service.colConsumer = yield* Effect.tryPromise({ + try: () => service.pubsub.createConsumer({ + topic: topics.collectionManagementRequest, + subscription: `${service.config.id}-collection-management-request`, + }), + catch: (cause) => librarianServiceError("collection-consumer", cause), + }); + + yield* Effect.log(`[LibrarianService] Listening on ${topics.librarianRequest} and ${topics.collectionManagementRequest}`); + + yield* Effect.whileLoop({ + while: () => service.running, + body: () => + consumeOnceEffect(service).pipe( + Effect.catch((err) => { + if (!service.running) return Effect.void; + return Effect.logError("[LibrarianService] Error in consume loop", { error: err.message }).pipe( + Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), + ); + }), + ), + step: () => undefined, + }); + }); export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianService { - const service = makeAsyncProcessor(config, { - run: () => service.run(Context.empty()), - }) as LibrarianService; - const baseStop = service.stop; - service.documents = new Map(); - service.processing = new Map(); - service.uploads = new Map(); - service.collectionManager = makeCollectionManager(); - service.libConsumer = null; - service.libProducer = null; - service.colConsumer = null; - service.colProducer = null; - service.dataDir = resolveDataDir(config); - service.persistPath = joinPath(service.dataDir, "librarian-state.json"); + let service: LibrarianService | undefined; + + const getService = Effect.sync(() => service).pipe( + Effect.flatMap((current) => + current === undefined + ? Effect.fail(librarianServiceError("service", "Librarian service not initialized")) + : Effect.succeed(current) + ), + ); + + const base = makeAsyncProcessor(config, { + runEffect: () => getService.pipe(Effect.flatMap(runLibrarianServiceEffect)), + }); + const baseStop = base.stop; + const dataDir = resolveDataDir(config); + const persistPath = joinPath(dataDir, "librarian-state.json"); const getDocumentMetadataEffect = (request: LibrarianRequest): Effect.Effect => Effect.gen(function* () { - const id = service.documentId(request); + const current = yield* getService; + const id = current.documentId(request); if (id === undefined || id.length === 0) { return yield* librarianServiceError("get-document-metadata", "get-document-metadata requires documentId"); } - const doc = service.documents.get(id); + const doc = current.documents.get(id); if (doc === undefined) { return yield* librarianServiceError("get-document-metadata", `Document not found: ${id}`); } - return service.documentResponse(doc); + return current.documentResponse(doc); }); const listChildrenEffect = (request: LibrarianRequest): Effect.Effect => Effect.gen(function* () { - const parentId = service.documentId(request); + const current = yield* getService; + const parentId = current.documentId(request); if (parentId === undefined || parentId.length === 0) { return yield* librarianServiceError("list-children", "list-children requires documentId"); } const children: DocumentMetadata[] = []; - for (const doc of service.documents.values()) { + for (const doc of current.documents.values()) { if (doc.parentId === parentId) { children.push(doc); } } - return service.documentsResponse(children); + return current.documentsResponse(children); }); const uploadChunkEffect = (request: LibrarianRequest): Effect.Effect => Effect.gen(function* () { - const req = service.requestRecord(request); + const current = yield* getService; + const req = current.requestRecord(request); const uploadId = optionalString(req["upload-id"]); if (uploadId === undefined) { return yield* librarianServiceError("upload-chunk", "upload-chunk requires upload-id"); } - const session = service.uploads.get(uploadId); + const session = current.uploads.get(uploadId); if (session === undefined) { return yield* librarianServiceError("upload-chunk", `Upload not found: ${uploadId}`); } @@ -225,11 +381,12 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const getUploadStatusEffect = (request: LibrarianRequest): Effect.Effect => Effect.gen(function* () { - const uploadId = optionalString(service.requestRecord(request)["upload-id"]); + const current = yield* getService; + const uploadId = optionalString(current.requestRecord(request)["upload-id"]); if (uploadId === undefined) { return yield* librarianServiceError("get-upload-status", "get-upload-status requires upload-id"); } - const session = service.uploads.get(uploadId); + const session = current.uploads.get(uploadId); if (session === undefined) { return yield* librarianServiceError("get-upload-status", `Upload not found: ${uploadId}`); } @@ -251,125 +408,26 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const abortUploadEffect = (request: LibrarianRequest): Effect.Effect => Effect.gen(function* () { - const uploadId = optionalString(service.requestRecord(request)["upload-id"]); + const current = yield* getService; + const uploadId = optionalString(current.requestRecord(request)["upload-id"]); if (uploadId === undefined) { return yield* librarianServiceError("abort-upload", "abort-upload requires upload-id"); } - service.uploads.delete(uploadId); + current.uploads.delete(uploadId); return {}; }); - Object.assign(service, { - - - run: function(this: LibrarianService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - // Ensure directories exist - yield* Effect.tryPromise({ - try: () => ensureDirectory(joinPath(service.dataDir, "docs")), - catch: (cause) => librarianServiceError("ensure-data-dir", cause), - }); - - // Load persisted state - yield* Effect.tryPromise({ - try: () => service.loadFromDisk(), - catch: (cause) => librarianServiceError("load", cause), - }); - - // Create producers - service.libProducer = yield* Effect.tryPromise({ - try: () => service.pubsub.createProducer({ - topic: topics.librarianResponse, - }), - catch: (cause) => librarianServiceError("librarian-producer", cause), - }); - service.colProducer = yield* Effect.tryPromise({ - try: () => service.pubsub.createProducer({ - topic: topics.collectionManagementResponse, - }), - catch: (cause) => librarianServiceError("collection-producer", cause), - }); - - // Create consumers - service.libConsumer = yield* Effect.tryPromise({ - try: () => service.pubsub.createConsumer({ - topic: topics.librarianRequest, - subscription: `${service.config.id}-librarian-request`, - }), - catch: (cause) => librarianServiceError("librarian-consumer", cause), - }); - service.colConsumer = yield* Effect.tryPromise({ - try: () => service.pubsub.createConsumer({ - topic: topics.collectionManagementRequest, - subscription: `${service.config.id}-collection-management-request`, - }), - catch: (cause) => librarianServiceError("collection-consumer", cause), - }); - - yield* Effect.log(`[LibrarianService] Listening on ${topics.librarianRequest} and ${topics.collectionManagementRequest}`); - - // Main consume loop -- poll both consumers - while (service.running) { - const shouldContinue = yield* Effect.gen(function* () { - const libConsumer = service.libConsumer; - if (libConsumer === null) { - return yield* librarianServiceError("consume", "Librarian consumer not started"); - } - const colConsumer = service.colConsumer; - if (colConsumer === null) { - return yield* librarianServiceError("consume", "Collection consumer not started"); - } - - const libMsg = yield* Effect.tryPromise({ - try: () => libConsumer.receive(2000), - catch: (cause) => librarianServiceError("librarian-receive", cause), - }); - if (libMsg !== null) { - yield* Effect.tryPromise({ - try: () => service.handleLibrarianMessage(libMsg), - catch: (cause) => librarianServiceError("librarian-handle", cause), - }); - yield* Effect.tryPromise({ - try: () => libConsumer.acknowledge(libMsg), - catch: (cause) => librarianServiceError("librarian-acknowledge", cause), - }); - } - - const colMsg = yield* Effect.tryPromise({ - try: () => colConsumer.receive(2000), - catch: (cause) => librarianServiceError("collection-receive", cause), - }); - if (colMsg !== null) { - yield* Effect.tryPromise({ - try: () => service.handleCollectionMessage(colMsg), - catch: (cause) => librarianServiceError("collection-handle", cause), - }); - yield* Effect.tryPromise({ - try: () => colConsumer.acknowledge(colMsg), - catch: (cause) => librarianServiceError("collection-acknowledge", cause), - }); - } - - return true; - }).pipe( - Effect.catch((err) => { - if (!service.running) return Effect.succeed(false); - return Effect.logError("[LibrarianService] Error in consume loop", { error: err.message }).pipe( - Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), - Effect.as(true), - ); - }), - ); - if (!shouldContinue) break; - } - }), - ); - - }, - - + const librarianService: LibrarianService = Object.assign(base, { + documents: new Map(), + processing: new Map(), + uploads: new Map(), + collectionManager: makeCollectionManager(), + libConsumer: null, + libProducer: null, + colConsumer: null, + colProducer: null, + dataDir, + persistPath, // ---------- Librarian message handling ---------- @@ -1373,7 +1431,8 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS } }); - return service; + service = librarianService; + return librarianService; } export const LibrarianService = makeLibrarianService;