diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 7591b3e6..aa6e7fc5 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,13 +12,13 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 -FlowManager ref-backed state slice: +Current signal counts from `ts/packages` after the 2026-06-02 Librarian +schema/assertion cleanup slice: | Signal | Count | | --- | ---: | | `Effect.runPromise` | 204 | -| `Map<` | 75 | +| `Map<` | 74 | | `WebSocket` | 47 | | `new Map` | 56 | | `toPromiseRequestor` | 0 | @@ -46,6 +46,8 @@ Notes: snapshot because the FlowManager slice added focused service tests and Promise compatibility facades while removing the service's internal mutable object state. +- The remaining `Record` hit is the librarian service object and + should be removed in the next librarian state migration slice. ## Loop Passes @@ -250,6 +252,36 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Librarian Schema And Assertion Cleanup Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/base/src/schema/messages.ts` now models librarian upload and + stream request/response fields directly, instead of requiring service-side + `as LibrarianResponse` casts for the existing wire protocol. + - `ts/packages/flow/src/librarian/service.ts` now decodes persisted + librarian state through a concrete `S.fromJsonString` schema instead of a + generic JSON decode plus `as A`. + - Document metadata `metadata` triples now narrow through Schema decoding + with `Option` before being included in normalized metadata. + - Upload, stream, and complete-upload request/response constructors now rely + on the schema-modeled fields instead of local type assertions. + - New librarian tests cover modeled upload fields, concrete persisted-state + loading, and schema-backed metadata triple normalization. +- Remaining: + - Librarian still has the dynamic `AsyncProcessorRuntime & Record` service object and sync throw helper paths. Keep it as the next P0 + state/ref-backed migration. +- Verification: + - `bun run --cwd ts/packages/base build` + - `bun run --cwd ts/packages/flow build` + - `bun run --cwd ts/packages/flow test -- src/__tests__/librarian-service.test.ts` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: diff --git a/ts/packages/base/src/schema/messages.ts b/ts/packages/base/src/schema/messages.ts index 355e5dc1..1618ca8c 100644 --- a/ts/packages/base/src/schema/messages.ts +++ b/ts/packages/base/src/schema/messages.ts @@ -315,33 +315,68 @@ export const LibrarianOperation = S.Literals([ ]); export type LibrarianOperation = typeof LibrarianOperation.Type; -export const LibrarianRequest = S.Struct({ - operation: LibrarianOperation, - documentId: S.optionalKey(S.String), - "document-id": S.optionalKey(S.String), - processingId: S.optionalKey(S.String), - "processing-id": S.optionalKey(S.String), - documentMetadata: S.optionalKey(DocumentMetadata), - "document-metadata": S.optionalKey(DocumentMetadata), - processingMetadata: S.optionalKey(ProcessingMetadata), - "processing-metadata": S.optionalKey(ProcessingMetadata), - content: S.optionalKey(S.String), - user: S.optionalKey(S.String), - collection: S.optionalKey(S.String), -}); +export const LibrarianRequest = S.StructWithRest( + S.Struct({ + operation: LibrarianOperation, + documentId: S.optionalKey(S.String), + "document-id": S.optionalKey(S.String), + processingId: S.optionalKey(S.String), + "processing-id": S.optionalKey(S.String), + documentMetadata: S.optionalKey(DocumentMetadata), + "document-metadata": S.optionalKey(DocumentMetadata), + processingMetadata: S.optionalKey(ProcessingMetadata), + "processing-metadata": S.optionalKey(ProcessingMetadata), + content: S.optionalKey(S.String), + user: S.optionalKey(S.String), + collection: S.optionalKey(S.String), + "total-size": S.optionalKey(S.Number), + "chunk-size": S.optionalKey(S.Number), + "upload-id": S.optionalKey(S.String), + "chunk-index": S.optionalKey(S.Number), + }), + [UnknownRecord], +); export type LibrarianRequest = typeof LibrarianRequest.Type; -export const LibrarianResponse = S.Struct({ - error: S.optionalKey(TgError), - documentMetadata: S.optionalKey(DocumentMetadata), - "document-metadata": S.optionalKey(DocumentMetadata), - content: S.optionalKey(S.String), - documents: OptionalMutableArray(DocumentMetadata), - "document-metadatas": OptionalMutableArray(DocumentMetadata), - processing: OptionalMutableArray(ProcessingMetadata), - "processing-metadata": OptionalMutableArray(ProcessingMetadata), - "processing-metadatas": OptionalMutableArray(ProcessingMetadata), +const UploadSessionInfo = S.Struct({ + "upload-id": S.String, + "document-id": S.String, + "document-metadata-json": S.String, + "total-size": S.Number, + "chunk-size": S.Number, + "total-chunks": S.Number, + "chunks-received": S.Number, + "created-at": S.String, }); + +export const LibrarianResponse = S.StructWithRest( + S.Struct({ + error: S.optionalKey(TgError), + documentMetadata: S.optionalKey(DocumentMetadata), + "document-metadata": S.optionalKey(DocumentMetadata), + content: S.optionalKey(S.String), + documents: OptionalMutableArray(DocumentMetadata), + "document-metadatas": OptionalMutableArray(DocumentMetadata), + processing: OptionalMutableArray(ProcessingMetadata), + "processing-metadata": OptionalMutableArray(ProcessingMetadata), + "processing-metadatas": OptionalMutableArray(ProcessingMetadata), + "document-id": S.optionalKey(S.String), + "object-id": S.optionalKey(S.String), + "upload-id": S.optionalKey(S.String), + "chunk-size": S.optionalKey(S.Number), + "chunk-index": S.optionalKey(S.Number), + "total-chunks": S.optionalKey(S.Number), + "chunks-received": S.optionalKey(S.Number), + "bytes-received": S.optionalKey(S.Number), + "total-bytes": S.optionalKey(S.Number), + "upload-state": S.optionalKey(S.String), + "received-chunks": S.optionalKey(NumberArray), + "missing-chunks": S.optionalKey(NumberArray), + "upload-sessions": S.optionalKey(UploadSessionInfo.pipe(S.Array, S.mutable)), + eos: S.optionalKey(S.Boolean), + }), + [UnknownRecord], +); export type LibrarianResponse = typeof LibrarianResponse.Type; // Knowledge core diff --git a/ts/packages/flow/src/__tests__/librarian-service.test.ts b/ts/packages/flow/src/__tests__/librarian-service.test.ts new file mode 100644 index 00000000..6827757d --- /dev/null +++ b/ts/packages/flow/src/__tests__/librarian-service.test.ts @@ -0,0 +1,131 @@ +import {mkdtemp, rm} from "node:fs/promises"; +import {tmpdir} from "node:os"; +import {join} from "node:path"; +import {describe, expect, it} from "vitest"; +import { + type BackendConsumer, + type BackendProducer, + type CreateConsumerOptions, + type CreateProducerOptions, + type DocumentMetadata, + type Message, + type PubSubBackend, + type Triple, +} from "@trustgraph/base"; +import {makeLibrarianService} from "../librarian/service.js"; + +class NoopPubSub implements PubSubBackend { + async createProducer(_options: CreateProducerOptions): Promise> { + return { + send: async () => undefined, + flush: async () => undefined, + close: async () => undefined, + }; + } + + async createConsumer(_options: CreateConsumerOptions): Promise> { + return { + receive: async () => null, + acknowledge: async (_message: Message) => undefined, + negativeAcknowledge: async (_message: Message) => undefined, + unsubscribe: async () => undefined, + close: async () => undefined, + }; + } + + async close(): Promise {} +} + +const sampleTriple: Triple = { + s: {type: "IRI", iri: "https://example.test/doc"}, + p: {type: "IRI", iri: "https://example.test/title"}, + o: {type: "LITERAL", value: "Document"}, +}; + +const sampleDocument: DocumentMetadata = { + id: "doc-a", + time: 1, + kind: "text/plain", + title: "Document A", + comments: "", + user: "alice", + tags: [], +}; + +const makeService = (dataDir: string) => + makeLibrarianService({ + id: "librarian-test", + manageProcessSignals: false, + pubsub: new NoopPubSub(), + dataDir, + }); + +describe("LibrarianService schema-backed boundaries", () => { + it("returns modeled upload fields without response assertions", async () => { + const dir = await mkdtemp(join(tmpdir(), "trustgraph-librarian-service-")); + const service = makeService(dir); + + const response = await service.beginUpload({ + operation: "begin-upload", + documentMetadata: sampleDocument, + "document-metadata": sampleDocument, + "total-size": 12, + "chunk-size": 4, + }); + const uploadId = response["upload-id"]; + const status = service.getUploadStatus({ + operation: "get-upload-status", + "upload-id": uploadId, + }); + await rm(dir, {recursive: true, force: true}); + + expect(uploadId).toEqual(expect.any(String)); + expect(response).toMatchObject({ + "chunk-size": 4, + "total-chunks": 3, + }); + expect(status).toMatchObject({ + "upload-id": uploadId, + "upload-state": "in-progress", + "missing-chunks": [0, 1, 2], + }); + }); + + it("loads persisted state through concrete schemas", async () => { + const dir = await mkdtemp(join(tmpdir(), "trustgraph-librarian-service-")); + await Bun.write( + join(dir, "librarian-state.json"), + `{"documents":{"doc-a":{"id":"doc-a","time":1,"kind":"text/plain","title":"Document A","comments":"","user":"alice","tags":[]}},"processing":{},"collections":[{"user":"alice","collection":"default","name":"Default","description":"","tags":[]}]}`, + ); + const service = makeService(dir); + + await service.loadFromDisk(); + const documents = service.listDocuments({operation: "list-documents", user: "alice"}).documents; + await rm(dir, {recursive: true, force: true}); + + expect(documents).toEqual([{ + ...sampleDocument, + documentType: "source", + "document-type": "source", + }]); + }); + + it("normalises document metadata through triple schema decoding", async () => { + const dir = await mkdtemp(join(tmpdir(), "trustgraph-librarian-service-")); + const service = makeService(dir); + + const valid = await service.normaliseDocumentMetadata({ + ...sampleDocument, + metadata: [sampleTriple], + }); + const invalid = await service.normaliseDocumentMetadata({ + ...sampleDocument, + id: "doc-b", + metadata: [{not: "a triple"}], + }); + await rm(dir, {recursive: true, force: true}); + + expect(valid.metadata).toEqual([sampleTriple]); + expect(invalid.metadata).toBeUndefined(); + }); +}); diff --git a/ts/packages/flow/src/librarian/service.ts b/ts/packages/flow/src/librarian/service.ts index 51c3f2e2..dfaf6694 100644 --- a/ts/packages/flow/src/librarian/service.ts +++ b/ts/packages/flow/src/librarian/service.ts @@ -21,12 +21,15 @@ import { type LibrarianResponse, type CollectionManagementRequest, type CollectionManagementResponse, + DocumentMetadata as DocumentMetadataSchema, type DocumentMetadata, + ProcessingMetadata as ProcessingMetadataSchema, type ProcessingMetadata, + Triple as TripleSchema, } from "@trustgraph/base"; import type { Message } from "@trustgraph/base"; import { NodeRuntime } from "@effect/platform-node"; -import { Clock, Config, Context, DateTime, Duration, Effect, Layer, ManagedRuntime, Random } from "effect"; +import { Clock, Config, Context, DateTime, Duration, Effect, Layer, ManagedRuntime, Option, Random } from "effect"; import * as S from "effect/Schema"; import { makeCollectionManager } from "./collection-manager.js"; import { @@ -54,20 +57,6 @@ interface UploadSession { user: string; } -type PersistedCollection = { - user: string; - collection: string; - name: string; - description: string; - tags: string[]; -}; - -type PersistedLibrarianState = { - documents?: Record; - processing?: Record; - collections?: PersistedCollection[]; -}; - function isRecord(value: unknown): value is Record { return typeof value === "object" && value !== null && !Array.isArray(value); } @@ -111,10 +100,30 @@ const encodeJsonString = (operation: string, value: unknown): Effect.Effect librarianServiceError(operation, cause)), ); -const decodeJsonString = (operation: string, value: string): Effect.Effect => - S.decodeUnknownEffect(S.UnknownFromJsonString)(value).pipe( - Effect.map((decoded) => decoded as A), - Effect.mapError((cause) => librarianServiceError(operation, cause)), +const PersistedCollectionSchema = S.Struct({ + user: S.String, + collection: S.String, + name: S.String, + description: S.String, + tags: S.Array(S.String).pipe(S.mutable), +}); + +const PersistedLibrarianStateSchema = S.Struct({ + documents: S.optionalKey(S.Record(S.String, DocumentMetadataSchema)), + processing: S.optionalKey(S.Record(S.String, ProcessingMetadataSchema)), + collections: S.optionalKey(PersistedCollectionSchema.pipe(S.Array, S.mutable)), +}); +const PersistedLibrarianStateJsonSchema = PersistedLibrarianStateSchema.pipe(S.fromJsonString); +type PersistedLibrarianState = typeof PersistedLibrarianStateSchema.Type; + +const DocumentMetadataTriplesSchema = TripleSchema.pipe(S.Array, S.mutable); +const decodeDocumentMetadataTriples = S.decodeUnknownOption(DocumentMetadataTriplesSchema); + +const decodePersistedLibrarianState = ( + raw: string, +): Effect.Effect => + S.decodeUnknownEffect(PersistedLibrarianStateJsonSchema)(raw).pipe( + Effect.mapError((cause) => librarianServiceError("persist-decode", cause)), ); const randomUuid: Effect.Effect = Effect.gen(function* () { @@ -268,7 +277,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS // ---------- Librarian message handling ---------- requestRecord: function(this: LibrarianService, request: LibrarianRequest): Record { - return request as Record; + return request; }, @@ -334,6 +343,9 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS const parentId = optionalString(value.parentId) ?? optionalString(value["parent-id"]); const documentType = optionalString(value.documentType) ?? optionalString(value["document-type"]) ?? "source"; const time = typeof value.time === "number" ? value.time : yield* currentEpochSeconds; + const metadata = Array.isArray(value.metadata) + ? Option.getOrUndefined(decodeDocumentMetadataTriples(value.metadata)) + : undefined; return { id, time, @@ -345,7 +357,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS ...(parentId !== undefined ? { parentId, "parent-id": parentId } : {}), documentType, "document-type": documentType, - ...(Array.isArray(value.metadata) ? { metadata: value.metadata as NonNullable } : {}), + ...(metadata === undefined ? {} : { metadata }), }; }), ); @@ -563,7 +575,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS case "stream-document": return yield* librarianServiceError("stream-document", "stream-document must be handled as a streaming operation"); default: - return yield* librarianServiceError("operation", `Unknown librarian operation: ${request.operation as string}`); + return yield* librarianServiceError("operation", `Unknown librarian operation: ${String(request.operation)}`); } }), ); @@ -782,7 +794,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS } // Verify parent exists - if (!(service.documents as Map).has(meta.parentId)) { + if (Boolean(service.documents.has(meta.parentId)) === false) { return yield* librarianServiceError("add-child-document", `Parent document not found: ${meta.parentId}`); } @@ -949,7 +961,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS "upload-id": uploadId, "chunk-size": chunkSize, "total-chunks": totalChunks, - } as LibrarianResponse; + }; }), ); @@ -979,7 +991,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS "total-chunks": session.totalChunks, "bytes-received": bytesReceived, "total-bytes": session.totalSize, - } as LibrarianResponse; + }; }, @@ -1005,7 +1017,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS "document-metadata": session.documentMetadata, content, user: session.user, - } as LibrarianRequest), + }), catch: (cause) => librarianServiceError("complete-upload-add-document", cause), }); service.uploads.delete(uploadId); @@ -1014,7 +1026,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS ...response, "document-id": documentId, "object-id": documentId, - } as LibrarianResponse; + }; }), ); @@ -1040,7 +1052,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS "missing-chunks": missingChunks, "bytes-received": bytesReceived, "total-bytes": session.totalSize, - } as LibrarianResponse; + }; }, @@ -1079,7 +1091,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS "created-at": session.createdAt, }); } - return { "upload-sessions": sessions } as LibrarianResponse; + return { "upload-sessions": sessions }; }), ); @@ -1112,7 +1124,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS "chunk-index": index, "total-chunks": totalChunks, eos: index === totalChunks - 1, - } as LibrarianResponse; + }; }); }), ); @@ -1210,7 +1222,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS } default: - return yield* librarianServiceError("collection-operation", `Unknown collection operation: ${request.operation as string}`); + return yield* librarianServiceError("collection-operation", `Unknown collection operation: ${String(request.operation)}`); } }), ); @@ -1256,11 +1268,11 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS try: () => readTextFile(service.persistPath), catch: (cause) => librarianServiceError("persist-read", cause), }); - return yield* decodeJsonString("persist-decode", raw); + return yield* decodePersistedLibrarianState(raw); }).pipe( Effect.catch(() => Effect.log("[LibrarianService] No persisted state found, starting fresh").pipe( - Effect.as(null as PersistedLibrarianState | null), + Effect.flatMap(() => Effect.succeed(null)), ), ), );