Type librarian service runtime loop

This commit is contained in:
elpresidank 2026-06-02 01:47:15 -05:00
parent e6384e65b9
commit 5f783832e9
2 changed files with 240 additions and 156 deletions

View file

@ -12,19 +12,19 @@ Verified source roots:
- Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4`
- Installed Effect beta used by this workspace: `ts/node_modules/effect`
Current signal counts from `ts/packages` after the 2026-06-02 Librarian tagged
operation helper slice:
Current signal counts from `ts/packages` after the 2026-06-02 Librarian typed
runtime loop slice:
| Signal | Count |
| --- | ---: |
| `Effect.runPromise` | 209 |
| `Map<` | 74 |
| `Effect.runPromise` | 208 |
| `Map<` | 77 |
| `WebSocket` | 47 |
| `new Map` | 56 |
| `toPromiseRequestor` | 0 |
| `makeAsyncProcessor` | 19 |
| `receive(` | 18 |
| `while (` | 10 |
| `while (` | 9 |
| `new Error` | 14 |
| `new Promise` | 10 |
| `JSON.parse` | 7 |
@ -46,8 +46,8 @@ Notes:
snapshot because the FlowManager slice added focused service tests and
Promise compatibility facades while removing the service's internal mutable
object state.
- The remaining `Record<string, any>` hit is the librarian service object and
should be removed in the next librarian state migration slice.
- `Record<string, any>` and `throwLibrarianServiceError` are now clean in
`ts/packages`.
## Loop Passes
@ -294,9 +294,9 @@ Notes:
- The librarian tests now await the Promise compatibility facade for upload
status.
- Remaining:
- Librarian still has the dynamic service object, mutable maps/handles on
that object, and a raw `while (service.running)` poll loop. That remains
the next P0 state/ref-backed migration.
- The typed runtime loop slice addresses the dynamic service object and raw
poll loop. Librarian mutable maps/handles remain the next P0 ref-backed
state migration.
- Verification:
- `bun run --cwd ts/packages/flow test -- src/__tests__/librarian-service.test.ts`
- `bun run --cwd ts/packages/flow build`
@ -306,6 +306,31 @@ Notes:
- `cd ts && bun run test`
- `git diff --check`
### 2026-06-02: Librarian Typed Runtime Loop Slice
- Status: migrated and root-verified.
- Completed:
- `ts/packages/flow/src/librarian/service.ts` now exposes a typed
`LibrarianService` interface instead of `AsyncProcessorRuntime &
Record<string, any>`.
- Service construction now uses `makeAsyncProcessor<LibrarianServiceError>`
with `runEffect`; the old method-bag `run` override and
`as LibrarianService` cast are gone.
- The librarian startup poller now uses `Effect.whileLoop`.
- The local operation helpers retrieve the initialized service through an
Effect gate rather than closing over an unsafe partially built value.
- Remaining:
- Librarian still stores `documents`, `processing`, `uploads`, collection
manager, and producer/consumer handles as mutable fields. Move those into
`SynchronizedRef<LibrarianServiceState>` next.
- Verification:
- `bun run --cwd ts/packages/flow build`
- `cd ts && bun run check`
- `bun run --cwd ts/packages/flow test`
- `cd ts && bun run build`
- `cd ts && bun run test`
- `git diff --check`
## Subagent Findings To Preserve
- MCP/workbench:
@ -316,10 +341,10 @@ Notes:
- MCP env is now Config-backed; continue that policy for future MCP settings.
- Flow stateful services:
- Config service, KnowledgeCore service, and FlowManager ref-backed state
are complete. Librarian now has native Effect module startup
(`NodeRuntime.runMain` with a `ManagedRuntime` compatibility facade), but
it still has a mutable poller service object. It remains a good candidate
for `Context` services, scoped layers, `Ref`/`SynchronizedRef`,
are complete. Librarian now has native Effect module startup, a typed
service surface, and an `Effect.whileLoop` runtime, but it still stores
service maps and pubsub handles as mutable fields. It remains a good
candidate for `Context` services, scoped layers, `Ref`/`SynchronizedRef`,
`Schedule`, and managed persistence.
- Persistence IO should move toward `FileSystem` or `KeyValueStore` where
the installed beta has the needed provider surface.
@ -346,7 +371,7 @@ Notes:
## Ranked Findings
### P0: Migrate Librarian Stateful Service To Scoped Effect Service
### P0: Migrate Librarian Mutable State To Ref-Backed Effect Service
- TrustGraph evidence:
- `ts/packages/flow/src/librarian/service.ts`

View file

@ -16,6 +16,8 @@ import {
makeProcessorProgram,
type ProcessorConfig,
type AsyncProcessorRuntime,
type BackendConsumer,
type BackendProducer,
topics,
type LibrarianRequest,
type LibrarianResponse,
@ -29,9 +31,9 @@ import {
} from "@trustgraph/base";
import type { Message } from "@trustgraph/base";
import { NodeRuntime } from "@effect/platform-node";
import { Clock, Config, Context, DateTime, Duration, Effect, Layer, ManagedRuntime, Option, Random } from "effect";
import { Clock, Config, DateTime, Duration, Effect, Layer, ManagedRuntime, Option, Random } from "effect";
import * as S from "effect/Schema";
import { makeCollectionManager } from "./collection-manager.js";
import { makeCollectionManager, type CollectionManager } from "./collection-manager.js";
import {
ensureDirectory,
joinPath,
@ -141,64 +143,218 @@ const randomUuid: Effect.Effect<string> = Effect.gen(function* () {
].join("-");
});
export type LibrarianService = AsyncProcessorRuntime & Record<string, any>;
export interface LibrarianService extends AsyncProcessorRuntime<LibrarianServiceError> {
documents: Map<string, DocumentMetadata>;
processing: Map<string, ProcessingMetadata>;
uploads: Map<string, UploadSession>;
collectionManager: CollectionManager;
libConsumer: BackendConsumer<LibrarianRequest> | null;
libProducer: BackendProducer<LibrarianResponse> | null;
colConsumer: BackendConsumer<CollectionManagementRequest> | null;
colProducer: BackendProducer<CollectionManagementResponse> | null;
dataDir: string;
persistPath: string;
requestRecord: (request: LibrarianRequest) => Record<string, unknown>;
documentId: (request: LibrarianRequest) => string | undefined;
processingId: (request: LibrarianRequest) => string | undefined;
documentMetadata: (request: LibrarianRequest) => Promise<DocumentMetadata | undefined>;
processingMetadata: (request: LibrarianRequest) => Promise<ProcessingMetadata | undefined>;
normaliseDocumentMetadata: (value: Record<string, unknown>) => Promise<DocumentMetadata>;
publicDocument: (doc: DocumentMetadata) => DocumentMetadata;
publicProcessing: (proc: ProcessingMetadata) => ProcessingMetadata;
documentResponse: (doc: DocumentMetadata) => LibrarianResponse;
documentsResponse: (docs: DocumentMetadata[]) => LibrarianResponse;
processingResponse: (records: ProcessingMetadata[]) => LibrarianResponse;
handleLibrarianMessage: (msg: Message<LibrarianRequest>) => Promise<void>;
handleLibrarianOperation: (request: LibrarianRequest) => Promise<LibrarianResponse>;
addDocument: (request: LibrarianRequest) => Promise<LibrarianResponse>;
removeDocument: (request: LibrarianRequest) => Promise<LibrarianResponse>;
updateDocument: (request: LibrarianRequest) => Promise<LibrarianResponse>;
listDocuments: (request: LibrarianRequest) => LibrarianResponse;
getDocumentMetadata: (request: LibrarianRequest) => Promise<LibrarianResponse>;
getDocumentContent: (request: LibrarianRequest) => Promise<LibrarianResponse>;
addChildDocument: (request: LibrarianRequest) => Promise<LibrarianResponse>;
listChildren: (request: LibrarianRequest) => Promise<LibrarianResponse>;
addProcessing: (request: LibrarianRequest) => Promise<LibrarianResponse>;
removeProcessing: (request: LibrarianRequest) => Promise<LibrarianResponse>;
listProcessing: (request: LibrarianRequest) => LibrarianResponse;
beginUpload: (request: LibrarianRequest) => Promise<LibrarianResponse>;
uploadChunk: (request: LibrarianRequest) => Promise<LibrarianResponse>;
completeUpload: (request: LibrarianRequest) => Promise<LibrarianResponse>;
getUploadStatus: (request: LibrarianRequest) => Promise<LibrarianResponse>;
abortUpload: (request: LibrarianRequest) => Promise<LibrarianResponse>;
listUploads: (request: LibrarianRequest) => Promise<LibrarianResponse>;
streamDocument: (request: LibrarianRequest) => Promise<LibrarianResponse[]>;
handleCollectionMessage: (msg: Message<CollectionManagementRequest>) => Promise<void>;
handleCollectionOperation: (request: CollectionManagementRequest) => Promise<CollectionManagementResponse>;
persist: () => Promise<void>;
loadFromDisk: () => Promise<void>;
}
const consumeOnceEffect = (
service: LibrarianService,
): Effect.Effect<void, LibrarianServiceError> =>
Effect.gen(function* () {
const libConsumer = service.libConsumer;
if (libConsumer === null) {
return yield* librarianServiceError("consume", "Librarian consumer not started");
}
const colConsumer = service.colConsumer;
if (colConsumer === null) {
return yield* librarianServiceError("consume", "Collection consumer not started");
}
const libMsg = yield* Effect.tryPromise({
try: () => libConsumer.receive(2000),
catch: (cause) => librarianServiceError("librarian-receive", cause),
});
if (libMsg !== null) {
yield* Effect.tryPromise({
try: () => service.handleLibrarianMessage(libMsg),
catch: (cause) => librarianServiceError("librarian-handle", cause),
});
yield* Effect.tryPromise({
try: () => libConsumer.acknowledge(libMsg),
catch: (cause) => librarianServiceError("librarian-acknowledge", cause),
});
}
const colMsg = yield* Effect.tryPromise({
try: () => colConsumer.receive(2000),
catch: (cause) => librarianServiceError("collection-receive", cause),
});
if (colMsg !== null) {
yield* Effect.tryPromise({
try: () => service.handleCollectionMessage(colMsg),
catch: (cause) => librarianServiceError("collection-handle", cause),
});
yield* Effect.tryPromise({
try: () => colConsumer.acknowledge(colMsg),
catch: (cause) => librarianServiceError("collection-acknowledge", cause),
});
}
});
const runLibrarianServiceEffect = (
service: LibrarianService,
): Effect.Effect<void, LibrarianServiceError> =>
Effect.gen(function* () {
yield* Effect.tryPromise({
try: () => ensureDirectory(joinPath(service.dataDir, "docs")),
catch: (cause) => librarianServiceError("ensure-data-dir", cause),
});
yield* Effect.tryPromise({
try: () => service.loadFromDisk(),
catch: (cause) => librarianServiceError("load", cause),
});
service.libProducer = yield* Effect.tryPromise({
try: () => service.pubsub.createProducer<LibrarianResponse>({
topic: topics.librarianResponse,
}),
catch: (cause) => librarianServiceError("librarian-producer", cause),
});
service.colProducer = yield* Effect.tryPromise({
try: () => service.pubsub.createProducer<CollectionManagementResponse>({
topic: topics.collectionManagementResponse,
}),
catch: (cause) => librarianServiceError("collection-producer", cause),
});
service.libConsumer = yield* Effect.tryPromise({
try: () => service.pubsub.createConsumer<LibrarianRequest>({
topic: topics.librarianRequest,
subscription: `${service.config.id}-librarian-request`,
}),
catch: (cause) => librarianServiceError("librarian-consumer", cause),
});
service.colConsumer = yield* Effect.tryPromise({
try: () => service.pubsub.createConsumer<CollectionManagementRequest>({
topic: topics.collectionManagementRequest,
subscription: `${service.config.id}-collection-management-request`,
}),
catch: (cause) => librarianServiceError("collection-consumer", cause),
});
yield* Effect.log(`[LibrarianService] Listening on ${topics.librarianRequest} and ${topics.collectionManagementRequest}`);
yield* Effect.whileLoop({
while: () => service.running,
body: () =>
consumeOnceEffect(service).pipe(
Effect.catch((err) => {
if (!service.running) return Effect.void;
return Effect.logError("[LibrarianService] Error in consume loop", { error: err.message }).pipe(
Effect.flatMap(() => Effect.sleep(Duration.millis(1000))),
);
}),
),
step: () => undefined,
});
});
export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianService {
const service = makeAsyncProcessor(config, {
run: () => service.run(Context.empty()),
}) as LibrarianService;
const baseStop = service.stop;
service.documents = new Map<string, DocumentMetadata>();
service.processing = new Map<string, ProcessingMetadata>();
service.uploads = new Map<string, UploadSession>();
service.collectionManager = makeCollectionManager();
service.libConsumer = null;
service.libProducer = null;
service.colConsumer = null;
service.colProducer = null;
service.dataDir = resolveDataDir(config);
service.persistPath = joinPath(service.dataDir, "librarian-state.json");
let service: LibrarianService | undefined;
const getService = Effect.sync(() => service).pipe(
Effect.flatMap((current) =>
current === undefined
? Effect.fail(librarianServiceError("service", "Librarian service not initialized"))
: Effect.succeed(current)
),
);
const base = makeAsyncProcessor<LibrarianServiceError>(config, {
runEffect: () => getService.pipe(Effect.flatMap(runLibrarianServiceEffect)),
});
const baseStop = base.stop;
const dataDir = resolveDataDir(config);
const persistPath = joinPath(dataDir, "librarian-state.json");
const getDocumentMetadataEffect = (request: LibrarianRequest): Effect.Effect<LibrarianResponse, LibrarianServiceError> =>
Effect.gen(function* () {
const id = service.documentId(request);
const current = yield* getService;
const id = current.documentId(request);
if (id === undefined || id.length === 0) {
return yield* librarianServiceError("get-document-metadata", "get-document-metadata requires documentId");
}
const doc = service.documents.get(id);
const doc = current.documents.get(id);
if (doc === undefined) {
return yield* librarianServiceError("get-document-metadata", `Document not found: ${id}`);
}
return service.documentResponse(doc);
return current.documentResponse(doc);
});
const listChildrenEffect = (request: LibrarianRequest): Effect.Effect<LibrarianResponse, LibrarianServiceError> =>
Effect.gen(function* () {
const parentId = service.documentId(request);
const current = yield* getService;
const parentId = current.documentId(request);
if (parentId === undefined || parentId.length === 0) {
return yield* librarianServiceError("list-children", "list-children requires documentId");
}
const children: DocumentMetadata[] = [];
for (const doc of service.documents.values()) {
for (const doc of current.documents.values()) {
if (doc.parentId === parentId) {
children.push(doc);
}
}
return service.documentsResponse(children);
return current.documentsResponse(children);
});
const uploadChunkEffect = (request: LibrarianRequest): Effect.Effect<LibrarianResponse, LibrarianServiceError> =>
Effect.gen(function* () {
const req = service.requestRecord(request);
const current = yield* getService;
const req = current.requestRecord(request);
const uploadId = optionalString(req["upload-id"]);
if (uploadId === undefined) {
return yield* librarianServiceError("upload-chunk", "upload-chunk requires upload-id");
}
const session = service.uploads.get(uploadId);
const session = current.uploads.get(uploadId);
if (session === undefined) {
return yield* librarianServiceError("upload-chunk", `Upload not found: ${uploadId}`);
}
@ -225,11 +381,12 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
const getUploadStatusEffect = (request: LibrarianRequest): Effect.Effect<LibrarianResponse, LibrarianServiceError> =>
Effect.gen(function* () {
const uploadId = optionalString(service.requestRecord(request)["upload-id"]);
const current = yield* getService;
const uploadId = optionalString(current.requestRecord(request)["upload-id"]);
if (uploadId === undefined) {
return yield* librarianServiceError("get-upload-status", "get-upload-status requires upload-id");
}
const session = service.uploads.get(uploadId);
const session = current.uploads.get(uploadId);
if (session === undefined) {
return yield* librarianServiceError("get-upload-status", `Upload not found: ${uploadId}`);
}
@ -251,125 +408,26 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
const abortUploadEffect = (request: LibrarianRequest): Effect.Effect<LibrarianResponse, LibrarianServiceError> =>
Effect.gen(function* () {
const uploadId = optionalString(service.requestRecord(request)["upload-id"]);
const current = yield* getService;
const uploadId = optionalString(current.requestRecord(request)["upload-id"]);
if (uploadId === undefined) {
return yield* librarianServiceError("abort-upload", "abort-upload requires upload-id");
}
service.uploads.delete(uploadId);
current.uploads.delete(uploadId);
return {};
});
Object.assign(service, {
run: function(this: LibrarianService): Promise<void> {
const service = this;
return Effect.runPromise(
Effect.gen(function* () {
// Ensure directories exist
yield* Effect.tryPromise({
try: () => ensureDirectory(joinPath(service.dataDir, "docs")),
catch: (cause) => librarianServiceError("ensure-data-dir", cause),
});
// Load persisted state
yield* Effect.tryPromise({
try: () => service.loadFromDisk(),
catch: (cause) => librarianServiceError("load", cause),
});
// Create producers
service.libProducer = yield* Effect.tryPromise({
try: () => service.pubsub.createProducer<LibrarianResponse>({
topic: topics.librarianResponse,
}),
catch: (cause) => librarianServiceError("librarian-producer", cause),
});
service.colProducer = yield* Effect.tryPromise({
try: () => service.pubsub.createProducer<CollectionManagementResponse>({
topic: topics.collectionManagementResponse,
}),
catch: (cause) => librarianServiceError("collection-producer", cause),
});
// Create consumers
service.libConsumer = yield* Effect.tryPromise({
try: () => service.pubsub.createConsumer<LibrarianRequest>({
topic: topics.librarianRequest,
subscription: `${service.config.id}-librarian-request`,
}),
catch: (cause) => librarianServiceError("librarian-consumer", cause),
});
service.colConsumer = yield* Effect.tryPromise({
try: () => service.pubsub.createConsumer<CollectionManagementRequest>({
topic: topics.collectionManagementRequest,
subscription: `${service.config.id}-collection-management-request`,
}),
catch: (cause) => librarianServiceError("collection-consumer", cause),
});
yield* Effect.log(`[LibrarianService] Listening on ${topics.librarianRequest} and ${topics.collectionManagementRequest}`);
// Main consume loop -- poll both consumers
while (service.running) {
const shouldContinue = yield* Effect.gen(function* () {
const libConsumer = service.libConsumer;
if (libConsumer === null) {
return yield* librarianServiceError("consume", "Librarian consumer not started");
}
const colConsumer = service.colConsumer;
if (colConsumer === null) {
return yield* librarianServiceError("consume", "Collection consumer not started");
}
const libMsg = yield* Effect.tryPromise({
try: () => libConsumer.receive(2000),
catch: (cause) => librarianServiceError("librarian-receive", cause),
});
if (libMsg !== null) {
yield* Effect.tryPromise({
try: () => service.handleLibrarianMessage(libMsg),
catch: (cause) => librarianServiceError("librarian-handle", cause),
});
yield* Effect.tryPromise({
try: () => libConsumer.acknowledge(libMsg),
catch: (cause) => librarianServiceError("librarian-acknowledge", cause),
});
}
const colMsg = yield* Effect.tryPromise({
try: () => colConsumer.receive(2000),
catch: (cause) => librarianServiceError("collection-receive", cause),
});
if (colMsg !== null) {
yield* Effect.tryPromise({
try: () => service.handleCollectionMessage(colMsg),
catch: (cause) => librarianServiceError("collection-handle", cause),
});
yield* Effect.tryPromise({
try: () => colConsumer.acknowledge(colMsg),
catch: (cause) => librarianServiceError("collection-acknowledge", cause),
});
}
return true;
}).pipe(
Effect.catch((err) => {
if (!service.running) return Effect.succeed(false);
return Effect.logError("[LibrarianService] Error in consume loop", { error: err.message }).pipe(
Effect.flatMap(() => Effect.sleep(Duration.millis(1000))),
Effect.as(true),
);
}),
);
if (!shouldContinue) break;
}
}),
);
},
const librarianService: LibrarianService = Object.assign(base, {
documents: new Map<string, DocumentMetadata>(),
processing: new Map<string, ProcessingMetadata>(),
uploads: new Map<string, UploadSession>(),
collectionManager: makeCollectionManager(),
libConsumer: null,
libProducer: null,
colConsumer: null,
colProducer: null,
dataDir,
persistPath,
// ---------- Librarian message handling ----------
@ -1373,7 +1431,8 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
}
});
return service;
service = librarianService;
return librarianService;
}
export const LibrarianService = makeLibrarianService;