mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-06-30 17:09:38 +02:00
Use Match for librarian operation dispatch
This commit is contained in:
parent
8d5edfae9a
commit
213222bb42
3 changed files with 229 additions and 94 deletions
|
|
@ -365,6 +365,25 @@ Notes:
|
|||
- `bun run --cwd ts/packages/flow test -- src/__tests__/config-service.test.ts`
|
||||
- `cd ts && bun run check:tsgo`
|
||||
|
||||
### 2026-06-02: Librarian Operation Match Slice
|
||||
|
||||
- Status: migrated and package-verified.
|
||||
- Completed:
|
||||
- `ts/packages/flow/src/librarian/service.ts` now dispatches librarian and
|
||||
collection-management operations with `effect/Match` instead of native
|
||||
`switch` statements.
|
||||
- Both dispatchers intentionally use `Match.orElse` rather than
|
||||
`Match.exhaustive` because raw broker message values can still contain
|
||||
unknown runtime operations before a schema boundary rejects them.
|
||||
- Existing tagged `LibrarianServiceError` operation labels are preserved for
|
||||
promise, sync, Effect-helper, stream-only, and unknown-operation branches.
|
||||
- Librarian-service tests now cover representative Match-backed librarian
|
||||
dispatch paths, collection list/update/delete dispatch, and runtime
|
||||
fallback errors without type assertions.
|
||||
- Verification:
|
||||
- `bun run --cwd ts/packages/flow test -- src/__tests__/librarian-service.test.ts`
|
||||
- `cd ts && bun run check:tsgo`
|
||||
|
||||
### 2026-06-02: RAG And Agent Requestor Bridge Slice
|
||||
|
||||
- Status: migrated, root-verified, committed, and pushed.
|
||||
|
|
@ -1812,8 +1831,8 @@ Notes:
|
|||
`Effect.fn` / `Effect.fnUntraced`. Sibling service factories still need a
|
||||
focused scan before treating them as valid migration targets.
|
||||
- ConfigService and KnowledgeCore operation dispatch now use `effect/Match`
|
||||
with `Match.exhaustive`; remaining service operation switches are in
|
||||
librarian surfaces.
|
||||
with `Match.exhaustive`; librarian operation dispatch now uses
|
||||
`effect/Match` with runtime-preserving `Match.orElse` fallbacks.
|
||||
- Long-lived `Map` / `Set` state in ref-backed services can move toward
|
||||
Effect collections later; local pure traversal maps/sets remain no-ops.
|
||||
|
||||
|
|
|
|||
|
|
@ -61,6 +61,109 @@ const makeService = (dataDir: string) =>
|
|||
});
|
||||
|
||||
describe("LibrarianService schema-backed boundaries", () => {
|
||||
it("dispatches librarian operations through the Match-backed handler", async () => {
|
||||
const dir = await mkdtemp(join(tmpdir(), "trustgraph-librarian-service-"));
|
||||
const service = makeService(dir);
|
||||
|
||||
try {
|
||||
await expect(service.handleLibrarianOperation({
|
||||
operation: "list-documents",
|
||||
user: "alice",
|
||||
})).resolves.toEqual({
|
||||
documents: [],
|
||||
"document-metadatas": [],
|
||||
});
|
||||
|
||||
const upload = await service.handleLibrarianOperation({
|
||||
operation: "begin-upload",
|
||||
documentMetadata: sampleDocument,
|
||||
"document-metadata": sampleDocument,
|
||||
"total-size": 12,
|
||||
"chunk-size": 4,
|
||||
});
|
||||
await expect(service.handleLibrarianOperation({
|
||||
operation: "get-upload-status",
|
||||
"upload-id": upload["upload-id"],
|
||||
})).resolves.toMatchObject({
|
||||
"upload-id": upload["upload-id"],
|
||||
"upload-state": "in-progress",
|
||||
"missing-chunks": [0, 1, 2],
|
||||
});
|
||||
|
||||
await expect(service.handleLibrarianOperation({
|
||||
operation: "stream-document",
|
||||
"document-id": "doc-a",
|
||||
})).rejects.toMatchObject({
|
||||
_tag: "LibrarianServiceError",
|
||||
operation: "stream-document",
|
||||
message: "stream-document must be handled as a streaming operation",
|
||||
});
|
||||
|
||||
await expect(service.handleLibrarianOperation(JSON.parse(`{"operation":"unknown-librarian"}`))).rejects.toMatchObject({
|
||||
_tag: "LibrarianServiceError",
|
||||
operation: "operation",
|
||||
message: "Unknown librarian operation: unknown-librarian",
|
||||
});
|
||||
} finally {
|
||||
await rm(dir, {recursive: true, force: true});
|
||||
}
|
||||
});
|
||||
|
||||
it("dispatches collection operations through the Match-backed handler", async () => {
|
||||
const dir = await mkdtemp(join(tmpdir(), "trustgraph-librarian-service-"));
|
||||
const service = makeService(dir);
|
||||
|
||||
try {
|
||||
await expect(service.handleCollectionOperation({
|
||||
operation: "update-collection",
|
||||
user: "alice",
|
||||
collection: "docs",
|
||||
name: "Docs",
|
||||
description: "Documentation",
|
||||
tags: ["reference"],
|
||||
})).resolves.toEqual({
|
||||
collections: [{
|
||||
user: "alice",
|
||||
collection: "docs",
|
||||
name: "Docs",
|
||||
description: "Documentation",
|
||||
tags: ["reference"],
|
||||
}],
|
||||
});
|
||||
|
||||
await expect(service.handleCollectionOperation({
|
||||
operation: "list-collections",
|
||||
user: "alice",
|
||||
})).resolves.toEqual({
|
||||
collections: [{
|
||||
user: "alice",
|
||||
collection: "docs",
|
||||
name: "Docs",
|
||||
description: "Documentation",
|
||||
tags: ["reference"],
|
||||
}],
|
||||
});
|
||||
|
||||
await expect(service.handleCollectionOperation({
|
||||
operation: "delete-collection",
|
||||
user: "alice",
|
||||
collection: "docs",
|
||||
})).resolves.toEqual({});
|
||||
await expect(service.handleCollectionOperation({
|
||||
operation: "list-collections",
|
||||
user: "alice",
|
||||
})).resolves.toEqual({collections: []});
|
||||
|
||||
await expect(service.handleCollectionOperation(JSON.parse(`{"operation":"unknown-collection"}`))).rejects.toMatchObject({
|
||||
_tag: "LibrarianServiceError",
|
||||
operation: "collection-operation",
|
||||
message: "Unknown collection operation: unknown-collection",
|
||||
});
|
||||
} finally {
|
||||
await rm(dir, {recursive: true, force: true});
|
||||
}
|
||||
});
|
||||
|
||||
it("returns modeled upload fields without response assertions", async () => {
|
||||
const dir = await mkdtemp(join(tmpdir(), "trustgraph-librarian-service-"));
|
||||
const service = makeService(dir);
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ import {
|
|||
} from "@trustgraph/base";
|
||||
import type { Message } from "@trustgraph/base";
|
||||
import { NodeRuntime } from "@effect/platform-node";
|
||||
import { Clock, Config, DateTime, Duration, Effect, Layer, ManagedRuntime, Option, Random, SynchronizedRef } from "effect";
|
||||
import { Clock, Config, DateTime, Duration, Effect, Layer, ManagedRuntime, Match, Option, Random, SynchronizedRef } from "effect";
|
||||
import * as S from "effect/Schema";
|
||||
import { makeCollectionManager, type CollectionManager } from "./collection-manager.js";
|
||||
import {
|
||||
|
|
@ -718,84 +718,93 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
|
|||
handleLibrarianOperation: function(this: LibrarianService, request: LibrarianRequest): Promise<LibrarianResponse> {
|
||||
const service = this;
|
||||
return Effect.runPromise(
|
||||
Effect.gen(function* () {
|
||||
switch (request.operation) {
|
||||
case "add-document":
|
||||
return yield* Effect.tryPromise({
|
||||
try: () => service.addDocument(request),
|
||||
catch: (cause) => librarianServiceError("add-document", cause),
|
||||
});
|
||||
case "remove-document":
|
||||
return yield* Effect.tryPromise({
|
||||
try: () => service.removeDocument(request),
|
||||
catch: (cause) => librarianServiceError("remove-document", cause),
|
||||
});
|
||||
case "update-document":
|
||||
return yield* Effect.tryPromise({
|
||||
try: () => service.updateDocument(request),
|
||||
catch: (cause) => librarianServiceError("update-document", cause),
|
||||
});
|
||||
case "list-documents":
|
||||
return yield* Effect.try({
|
||||
try: () => service.listDocuments(request),
|
||||
catch: (cause) => librarianServiceError("list-documents", cause),
|
||||
});
|
||||
case "get-document-metadata":
|
||||
return yield* getDocumentMetadataEffect(request);
|
||||
case "get-document-content":
|
||||
return yield* Effect.tryPromise({
|
||||
try: () => service.getDocumentContent(request),
|
||||
catch: (cause) => librarianServiceError("get-document-content", cause),
|
||||
});
|
||||
case "add-child-document":
|
||||
return yield* Effect.tryPromise({
|
||||
try: () => service.addChildDocument(request),
|
||||
catch: (cause) => librarianServiceError("add-child-document", cause),
|
||||
});
|
||||
case "list-children":
|
||||
return yield* listChildrenEffect(request);
|
||||
case "add-processing":
|
||||
return yield* Effect.tryPromise({
|
||||
try: () => service.addProcessing(request),
|
||||
catch: (cause) => librarianServiceError("add-processing", cause),
|
||||
});
|
||||
case "remove-processing":
|
||||
return yield* Effect.tryPromise({
|
||||
try: () => service.removeProcessing(request),
|
||||
catch: (cause) => librarianServiceError("remove-processing", cause),
|
||||
});
|
||||
case "list-processing":
|
||||
return yield* Effect.try({
|
||||
try: () => service.listProcessing(request),
|
||||
catch: (cause) => librarianServiceError("list-processing", cause),
|
||||
});
|
||||
case "begin-upload":
|
||||
return yield* Effect.tryPromise({
|
||||
try: () => service.beginUpload(request),
|
||||
catch: (cause) => librarianServiceError("begin-upload", cause),
|
||||
});
|
||||
case "upload-chunk":
|
||||
return yield* uploadChunkEffect(request);
|
||||
case "complete-upload":
|
||||
return yield* Effect.tryPromise({
|
||||
try: () => service.completeUpload(request),
|
||||
catch: (cause) => librarianServiceError("complete-upload", cause),
|
||||
});
|
||||
case "get-upload-status":
|
||||
return yield* getUploadStatusEffect(request);
|
||||
case "abort-upload":
|
||||
return yield* abortUploadEffect(request);
|
||||
case "list-uploads":
|
||||
return yield* Effect.tryPromise({
|
||||
try: () => service.listUploads(request),
|
||||
catch: (cause) => librarianServiceError("list-uploads", cause),
|
||||
});
|
||||
case "stream-document":
|
||||
return yield* librarianServiceError("stream-document", "stream-document must be handled as a streaming operation");
|
||||
default:
|
||||
return yield* librarianServiceError("operation", `Unknown librarian operation: ${String(request.operation)}`);
|
||||
}
|
||||
}),
|
||||
Match.value(request.operation).pipe(
|
||||
Match.when("add-document", () =>
|
||||
Effect.tryPromise({
|
||||
try: () => service.addDocument(request),
|
||||
catch: (cause) => librarianServiceError("add-document", cause),
|
||||
})
|
||||
),
|
||||
Match.when("remove-document", () =>
|
||||
Effect.tryPromise({
|
||||
try: () => service.removeDocument(request),
|
||||
catch: (cause) => librarianServiceError("remove-document", cause),
|
||||
})
|
||||
),
|
||||
Match.when("update-document", () =>
|
||||
Effect.tryPromise({
|
||||
try: () => service.updateDocument(request),
|
||||
catch: (cause) => librarianServiceError("update-document", cause),
|
||||
})
|
||||
),
|
||||
Match.when("list-documents", () =>
|
||||
Effect.try({
|
||||
try: () => service.listDocuments(request),
|
||||
catch: (cause) => librarianServiceError("list-documents", cause),
|
||||
})
|
||||
),
|
||||
Match.when("get-document-metadata", () => getDocumentMetadataEffect(request)),
|
||||
Match.when("get-document-content", () =>
|
||||
Effect.tryPromise({
|
||||
try: () => service.getDocumentContent(request),
|
||||
catch: (cause) => librarianServiceError("get-document-content", cause),
|
||||
})
|
||||
),
|
||||
Match.when("add-child-document", () =>
|
||||
Effect.tryPromise({
|
||||
try: () => service.addChildDocument(request),
|
||||
catch: (cause) => librarianServiceError("add-child-document", cause),
|
||||
})
|
||||
),
|
||||
Match.when("list-children", () => listChildrenEffect(request)),
|
||||
Match.when("add-processing", () =>
|
||||
Effect.tryPromise({
|
||||
try: () => service.addProcessing(request),
|
||||
catch: (cause) => librarianServiceError("add-processing", cause),
|
||||
})
|
||||
),
|
||||
Match.when("remove-processing", () =>
|
||||
Effect.tryPromise({
|
||||
try: () => service.removeProcessing(request),
|
||||
catch: (cause) => librarianServiceError("remove-processing", cause),
|
||||
})
|
||||
),
|
||||
Match.when("list-processing", () =>
|
||||
Effect.try({
|
||||
try: () => service.listProcessing(request),
|
||||
catch: (cause) => librarianServiceError("list-processing", cause),
|
||||
})
|
||||
),
|
||||
Match.when("begin-upload", () =>
|
||||
Effect.tryPromise({
|
||||
try: () => service.beginUpload(request),
|
||||
catch: (cause) => librarianServiceError("begin-upload", cause),
|
||||
})
|
||||
),
|
||||
Match.when("upload-chunk", () => uploadChunkEffect(request)),
|
||||
Match.when("complete-upload", () =>
|
||||
Effect.tryPromise({
|
||||
try: () => service.completeUpload(request),
|
||||
catch: (cause) => librarianServiceError("complete-upload", cause),
|
||||
})
|
||||
),
|
||||
Match.when("get-upload-status", () => getUploadStatusEffect(request)),
|
||||
Match.when("abort-upload", () => abortUploadEffect(request)),
|
||||
Match.when("list-uploads", () =>
|
||||
Effect.tryPromise({
|
||||
try: () => service.listUploads(request),
|
||||
catch: (cause) => librarianServiceError("list-uploads", cause),
|
||||
})
|
||||
),
|
||||
Match.when("stream-document", () =>
|
||||
Effect.fail(
|
||||
librarianServiceError("stream-document", "stream-document must be handled as a streaming operation"),
|
||||
)
|
||||
),
|
||||
Match.orElse((operation) =>
|
||||
Effect.fail(librarianServiceError("operation", `Unknown librarian operation: ${String(operation)}`))
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
},
|
||||
|
|
@ -1404,15 +1413,17 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
|
|||
handleCollectionOperation: function(this: LibrarianService, request: CollectionManagementRequest): Promise<CollectionManagementResponse> {
|
||||
const service = this;
|
||||
return Effect.runPromise(
|
||||
Effect.gen(function* () {
|
||||
switch (request.operation) {
|
||||
case "list-collections": {
|
||||
Match.value(request.operation).pipe(
|
||||
Match.when("list-collections", () =>
|
||||
Effect.gen(function* () {
|
||||
const user = request.user ?? "";
|
||||
const collections = (yield* SynchronizedRef.get(service.state)).collectionManager.listCollections(user);
|
||||
return { collections };
|
||||
}
|
||||
})
|
||||
),
|
||||
|
||||
case "update-collection": {
|
||||
Match.when("update-collection", () =>
|
||||
Effect.gen(function* () {
|
||||
const user = request.user ?? "";
|
||||
const collection = request.collection ?? "";
|
||||
const name = request.name ?? collection;
|
||||
|
|
@ -1433,9 +1444,11 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
|
|||
});
|
||||
|
||||
return { collections };
|
||||
}
|
||||
})
|
||||
),
|
||||
|
||||
case "delete-collection": {
|
||||
Match.when("delete-collection", () =>
|
||||
Effect.gen(function* () {
|
||||
const user = request.user ?? "";
|
||||
const collection = request.collection ?? "";
|
||||
|
||||
|
|
@ -1453,12 +1466,12 @@ export function makeLibrarianService(config: LibrarianServiceConfig): LibrarianS
|
|||
});
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
default:
|
||||
return yield* librarianServiceError("collection-operation", `Unknown collection operation: ${String(request.operation)}`);
|
||||
}
|
||||
}),
|
||||
})
|
||||
),
|
||||
Match.orElse((operation) =>
|
||||
Effect.fail(librarianServiceError("collection-operation", `Unknown collection operation: ${String(operation)}`))
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
},
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue