Model librarian upload schema boundaries

This commit is contained in:
elpresidank 2026-06-02 01:34:46 -05:00
parent 3809a38c46
commit 459347ad12
4 changed files with 270 additions and 60 deletions

View file

@ -0,0 +1,131 @@
import {mkdtemp, rm} from "node:fs/promises";
import {tmpdir} from "node:os";
import {join} from "node:path";
import {describe, expect, it} from "vitest";
import {
type BackendConsumer,
type BackendProducer,
type CreateConsumerOptions,
type CreateProducerOptions,
type DocumentMetadata,
type Message,
type PubSubBackend,
type Triple,
} from "@trustgraph/base";
import {makeLibrarianService} from "../librarian/service.js";
class NoopPubSub implements PubSubBackend {
async createProducer<T>(_options: CreateProducerOptions<T>): Promise<BackendProducer<T>> {
return {
send: async () => undefined,
flush: async () => undefined,
close: async () => undefined,
};
}
async createConsumer<T>(_options: CreateConsumerOptions): Promise<BackendConsumer<T>> {
return {
receive: async () => null,
acknowledge: async (_message: Message<T>) => undefined,
negativeAcknowledge: async (_message: Message<T>) => undefined,
unsubscribe: async () => undefined,
close: async () => undefined,
};
}
async close(): Promise<void> {}
}
const sampleTriple: Triple = {
s: {type: "IRI", iri: "https://example.test/doc"},
p: {type: "IRI", iri: "https://example.test/title"},
o: {type: "LITERAL", value: "Document"},
};
const sampleDocument: DocumentMetadata = {
id: "doc-a",
time: 1,
kind: "text/plain",
title: "Document A",
comments: "",
user: "alice",
tags: [],
};
const makeService = (dataDir: string) =>
makeLibrarianService({
id: "librarian-test",
manageProcessSignals: false,
pubsub: new NoopPubSub(),
dataDir,
});
describe("LibrarianService schema-backed boundaries", () => {
it("returns modeled upload fields without response assertions", async () => {
const dir = await mkdtemp(join(tmpdir(), "trustgraph-librarian-service-"));
const service = makeService(dir);
const response = await service.beginUpload({
operation: "begin-upload",
documentMetadata: sampleDocument,
"document-metadata": sampleDocument,
"total-size": 12,
"chunk-size": 4,
});
const uploadId = response["upload-id"];
const status = service.getUploadStatus({
operation: "get-upload-status",
"upload-id": uploadId,
});
await rm(dir, {recursive: true, force: true});
expect(uploadId).toEqual(expect.any(String));
expect(response).toMatchObject({
"chunk-size": 4,
"total-chunks": 3,
});
expect(status).toMatchObject({
"upload-id": uploadId,
"upload-state": "in-progress",
"missing-chunks": [0, 1, 2],
});
});
it("loads persisted state through concrete schemas", async () => {
const dir = await mkdtemp(join(tmpdir(), "trustgraph-librarian-service-"));
await Bun.write(
join(dir, "librarian-state.json"),
`{"documents":{"doc-a":{"id":"doc-a","time":1,"kind":"text/plain","title":"Document A","comments":"","user":"alice","tags":[]}},"processing":{},"collections":[{"user":"alice","collection":"default","name":"Default","description":"","tags":[]}]}`,
);
const service = makeService(dir);
await service.loadFromDisk();
const documents = service.listDocuments({operation: "list-documents", user: "alice"}).documents;
await rm(dir, {recursive: true, force: true});
expect(documents).toEqual([{
...sampleDocument,
documentType: "source",
"document-type": "source",
}]);
});
it("normalises document metadata through triple schema decoding", async () => {
const dir = await mkdtemp(join(tmpdir(), "trustgraph-librarian-service-"));
const service = makeService(dir);
const valid = await service.normaliseDocumentMetadata({
...sampleDocument,
metadata: [sampleTriple],
});
const invalid = await service.normaliseDocumentMetadata({
...sampleDocument,
id: "doc-b",
metadata: [{not: "a triple"}],
});
await rm(dir, {recursive: true, force: true});
expect(valid.metadata).toEqual([sampleTriple]);
expect(invalid.metadata).toBeUndefined();
});
});

View file

@ -21,12 +21,15 @@ import {
type LibrarianResponse,
type CollectionManagementRequest,
type CollectionManagementResponse,
DocumentMetadata as DocumentMetadataSchema,
type DocumentMetadata,
ProcessingMetadata as ProcessingMetadataSchema,
type ProcessingMetadata,
Triple as TripleSchema,
} from "@trustgraph/base";
import type { Message } from "@trustgraph/base";
import { NodeRuntime } from "@effect/platform-node";
import { Clock, Config, Context, DateTime, Duration, Effect, Layer, ManagedRuntime, Random } from "effect";
import { Clock, Config, Context, DateTime, Duration, Effect, Layer, ManagedRuntime, Option, Random } from "effect";
import * as S from "effect/Schema";
import { makeCollectionManager } from "./collection-manager.js";
import {
@ -54,20 +57,6 @@ interface UploadSession {
user: string;
}
type PersistedCollection = {
user: string;
collection: string;
name: string;
description: string;
tags: string[];
};
type PersistedLibrarianState = {
documents?: Record<string, DocumentMetadata>;
processing?: Record<string, ProcessingMetadata>;
collections?: PersistedCollection[];
};
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
@ -111,10 +100,30 @@ const encodeJsonString = (operation: string, value: unknown): Effect.Effect<stri
Effect.mapError((cause) => librarianServiceError(operation, cause)),
);
const decodeJsonString = <A>(operation: string, value: string): Effect.Effect<A, LibrarianServiceError> =>
S.decodeUnknownEffect(S.UnknownFromJsonString)(value).pipe(
Effect.map((decoded) => decoded as A),
Effect.mapError((cause) => librarianServiceError(operation, cause)),
const PersistedCollectionSchema = S.Struct({
user: S.String,
collection: S.String,
name: S.String,
description: S.String,
tags: S.Array(S.String).pipe(S.mutable),
});
const PersistedLibrarianStateSchema = S.Struct({
documents: S.optionalKey(S.Record(S.String, DocumentMetadataSchema)),
processing: S.optionalKey(S.Record(S.String, ProcessingMetadataSchema)),
collections: S.optionalKey(PersistedCollectionSchema.pipe(S.Array, S.mutable)),
});
const PersistedLibrarianStateJsonSchema = PersistedLibrarianStateSchema.pipe(S.fromJsonString);
type PersistedLibrarianState = typeof PersistedLibrarianStateSchema.Type;
const DocumentMetadataTriplesSchema = TripleSchema.pipe(S.Array, S.mutable);
const decodeDocumentMetadataTriples = S.decodeUnknownOption(DocumentMetadataTriplesSchema);
const decodePersistedLibrarianState = (
raw: string,
): Effect.Effect<PersistedLibrarianState, LibrarianServiceError> =>
S.decodeUnknownEffect(PersistedLibrarianStateJsonSchema)(raw).pipe(
Effect.mapError((cause) => librarianServiceError("persist-decode", cause)),
);
const randomUuid: Effect.Effect<string> = Effect.gen(function* () {
@ -268,7 +277,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
// ---------- Librarian message handling ----------
requestRecord: function(this: LibrarianService, request: LibrarianRequest): Record<string, unknown> {
return request as Record<string, unknown>;
return request;
},
@ -334,6 +343,9 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
const parentId = optionalString(value.parentId) ?? optionalString(value["parent-id"]);
const documentType = optionalString(value.documentType) ?? optionalString(value["document-type"]) ?? "source";
const time = typeof value.time === "number" ? value.time : yield* currentEpochSeconds;
const metadata = Array.isArray(value.metadata)
? Option.getOrUndefined(decodeDocumentMetadataTriples(value.metadata))
: undefined;
return {
id,
time,
@ -345,7 +357,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
...(parentId !== undefined ? { parentId, "parent-id": parentId } : {}),
documentType,
"document-type": documentType,
...(Array.isArray(value.metadata) ? { metadata: value.metadata as NonNullable<DocumentMetadata["metadata"]> } : {}),
...(metadata === undefined ? {} : { metadata }),
};
}),
);
@ -563,7 +575,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
case "stream-document":
return yield* librarianServiceError("stream-document", "stream-document must be handled as a streaming operation");
default:
return yield* librarianServiceError("operation", `Unknown librarian operation: ${request.operation as string}`);
return yield* librarianServiceError("operation", `Unknown librarian operation: ${String(request.operation)}`);
}
}),
);
@ -782,7 +794,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
}
// Verify parent exists
if (!(service.documents as Map<string, DocumentMetadata>).has(meta.parentId)) {
if (Boolean(service.documents.has(meta.parentId)) === false) {
return yield* librarianServiceError("add-child-document", `Parent document not found: ${meta.parentId}`);
}
@ -949,7 +961,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
"upload-id": uploadId,
"chunk-size": chunkSize,
"total-chunks": totalChunks,
} as LibrarianResponse;
};
}),
);
@ -979,7 +991,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
"total-chunks": session.totalChunks,
"bytes-received": bytesReceived,
"total-bytes": session.totalSize,
} as LibrarianResponse;
};
},
@ -1005,7 +1017,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
"document-metadata": session.documentMetadata,
content,
user: session.user,
} as LibrarianRequest),
}),
catch: (cause) => librarianServiceError("complete-upload-add-document", cause),
});
service.uploads.delete(uploadId);
@ -1014,7 +1026,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
...response,
"document-id": documentId,
"object-id": documentId,
} as LibrarianResponse;
};
}),
);
@ -1040,7 +1052,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
"missing-chunks": missingChunks,
"bytes-received": bytesReceived,
"total-bytes": session.totalSize,
} as LibrarianResponse;
};
},
@ -1079,7 +1091,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
"created-at": session.createdAt,
});
}
return { "upload-sessions": sessions } as LibrarianResponse;
return { "upload-sessions": sessions };
}),
);
@ -1112,7 +1124,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
"chunk-index": index,
"total-chunks": totalChunks,
eos: index === totalChunks - 1,
} as LibrarianResponse;
};
});
}),
);
@ -1210,7 +1222,7 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
}
default:
return yield* librarianServiceError("collection-operation", `Unknown collection operation: ${request.operation as string}`);
return yield* librarianServiceError("collection-operation", `Unknown collection operation: ${String(request.operation)}`);
}
}),
);
@ -1256,11 +1268,11 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
try: () => readTextFile(service.persistPath),
catch: (cause) => librarianServiceError("persist-read", cause),
});
return yield* decodeJsonString<PersistedLibrarianState>("persist-decode", raw);
return yield* decodePersistedLibrarianState(raw);
}).pipe(
Effect.catch(() =>
Effect.log("[LibrarianService] No persisted state found, starting fresh").pipe(
Effect.as(null as PersistedLibrarianState | null),
Effect.flatMap(() => Effect.succeed<PersistedLibrarianState | null>(null)),
),
),
);