mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-06-30 17:09:38 +02:00
Use Match for flow manager operations
This commit is contained in:
parent
213222bb42
commit
68cbcde1f6
3 changed files with 100 additions and 31 deletions
|
|
@ -384,6 +384,24 @@ Notes:
|
|||
- `bun run --cwd ts/packages/flow test -- src/__tests__/librarian-service.test.ts`
|
||||
- `cd ts && bun run check:tsgo`
|
||||
|
||||
### 2026-06-04: FlowManager Operation Match Slice
|
||||
|
||||
- Status: migrated and package-verified.
|
||||
- Completed:
|
||||
- `ts/packages/flow/src/flow-manager/service.ts` now dispatches flow
|
||||
operations with `effect/Match` instead of a native `switch`.
|
||||
- The dispatcher keeps the existing config refresh behavior before routing
|
||||
and uses `Match.orElse` because `FlowRequest.operation` is a public
|
||||
wire-level `string`, not a closed schema literal union.
|
||||
- Existing tagged `FlowManagerError` behavior is preserved for unknown
|
||||
operations and branch-specific failures.
|
||||
- Flow-manager tests now cover all eight flow operations through
|
||||
`handleOperation`, including config-client blueprint mutations and the
|
||||
runtime unknown-operation fallback.
|
||||
- Verification:
|
||||
- `bun run --cwd ts/packages/flow test -- src/__tests__/flow-manager-service.test.ts`
|
||||
- `cd ts && bun run check:tsgo`
|
||||
|
||||
### 2026-06-02: RAG And Agent Requestor Bridge Slice
|
||||
|
||||
- Status: migrated, root-verified, committed, and pushed.
|
||||
|
|
@ -1831,8 +1849,8 @@ Notes:
|
|||
`Effect.fn` / `Effect.fnUntraced`. Sibling service factories still need a
|
||||
focused scan before treating them as valid migration targets.
|
||||
- ConfigService and KnowledgeCore operation dispatch now use `effect/Match`
|
||||
with `Match.exhaustive`; librarian operation dispatch now uses
|
||||
`effect/Match` with runtime-preserving `Match.orElse` fallbacks.
|
||||
with `Match.exhaustive`; FlowManager and Librarian operation dispatch now
|
||||
use `effect/Match` with runtime-preserving `Match.orElse` fallbacks.
|
||||
- Long-lived `Map` / `Set` state in ref-backed services can move toward
|
||||
Effect collections later; local pure traversal maps/sets remain no-ops.
|
||||
|
||||
|
|
|
|||
|
|
@ -109,6 +109,74 @@ const seedResponseProducer = async (
|
|||
};
|
||||
|
||||
describe("FlowManagerService operations", () => {
|
||||
it("dispatches all flow operations through the Match-backed handler", async () => {
|
||||
const configClient = new RecordingConfigClient(
|
||||
[
|
||||
{
|
||||
key: "custom",
|
||||
value: "{\"description\":\"Custom\",\"topics\":{\"input\":\"topic.in\"}}",
|
||||
},
|
||||
],
|
||||
[
|
||||
{
|
||||
key: "flow-a",
|
||||
value: "{\"blueprint-name\":\"custom\",\"description\":\"Alpha\",\"parameters\":{\"limit\":3}}",
|
||||
},
|
||||
],
|
||||
);
|
||||
const service = makeService();
|
||||
await seedConfigClient(service, configClient);
|
||||
|
||||
await expect(service.handleOperation({operation: "list-blueprints"})).resolves.toEqual({
|
||||
"blueprint-names": ["custom", "default"],
|
||||
});
|
||||
await expect(service.handleOperation({
|
||||
operation: "get-blueprint",
|
||||
"blueprint-name": "custom",
|
||||
})).resolves.toMatchObject({
|
||||
"blueprint-definition": "{\"description\":\"Custom\",\"topics\":{\"input\":\"topic.in\"}}",
|
||||
});
|
||||
await expect(service.handleOperation({
|
||||
operation: "put-blueprint",
|
||||
"blueprint-name": "added",
|
||||
"blueprint-definition": {description: "Added", topics: {input: "topic.added"}},
|
||||
})).resolves.toEqual({});
|
||||
await expect(service.handleOperation({
|
||||
operation: "delete-blueprint",
|
||||
"blueprint-name": "custom",
|
||||
})).resolves.toEqual({});
|
||||
await expect(service.handleOperation({operation: "list-flows"})).resolves.toEqual({
|
||||
"flow-ids": ["flow-a"],
|
||||
});
|
||||
await expect(service.handleOperation({
|
||||
operation: "get-flow",
|
||||
"flow-id": "flow-a",
|
||||
})).resolves.toEqual({
|
||||
flow: "{\"blueprint-name\":\"custom\",\"description\":\"Alpha\",\"parameters\":{\"limit\":3}}",
|
||||
});
|
||||
await expect(service.handleOperation({
|
||||
operation: "start-flow",
|
||||
"flow-id": "flow-b",
|
||||
"blueprint-name": "custom",
|
||||
})).resolves.toEqual({});
|
||||
await expect(service.handleOperation({
|
||||
operation: "stop-flow",
|
||||
"flow-id": "flow-a",
|
||||
})).resolves.toEqual({});
|
||||
await expect(service.handleOperation({operation: "unknown-flow"})).rejects.toMatchObject({
|
||||
_tag: "FlowManagerError",
|
||||
operation: "operation",
|
||||
message: "Unknown flow operation: unknown-flow",
|
||||
});
|
||||
|
||||
expect(configClient.requests.some((request) =>
|
||||
request.operation === "put" && request.keys?.[0] === "flow-blueprint"
|
||||
)).toBe(true);
|
||||
expect(configClient.requests.some((request) =>
|
||||
request.operation === "delete" && request.keys?.[0] === "flow-blueprint"
|
||||
)).toBe(true);
|
||||
});
|
||||
|
||||
it("uses tagged errors for invalid flow mutations", async () => {
|
||||
const service = makeService();
|
||||
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ import {
|
|||
import { makeProcessorProgram } from "@trustgraph/base";
|
||||
import type { Message } from "@trustgraph/base";
|
||||
import { NodeRuntime } from "@effect/platform-node";
|
||||
import { Duration, Effect, Layer, ManagedRuntime, Option, SynchronizedRef } from "effect";
|
||||
import { Duration, Effect, Layer, ManagedRuntime, Match, Option, SynchronizedRef } from "effect";
|
||||
import * as S from "effect/Schema";
|
||||
|
||||
// ---------- Internal state types ----------
|
||||
|
|
@ -756,34 +756,17 @@ export function makeFlowManagerService(config: ProcessorConfig): FlowManagerServ
|
|||
yield* refreshBlueprintsFromConfigEffect(state);
|
||||
yield* refreshFlowsFromConfigEffect(state);
|
||||
|
||||
switch (op) {
|
||||
case "list-blueprints":
|
||||
return handleListBlueprintsWithState(state.pipe(stateSnapshot));
|
||||
|
||||
case "put-blueprint":
|
||||
return yield* handlePutBlueprintEffect(state, request);
|
||||
|
||||
case "get-blueprint":
|
||||
return yield* handleGetBlueprintEffect(state, request);
|
||||
|
||||
case "delete-blueprint":
|
||||
return yield* handleDeleteBlueprintEffect(state, request);
|
||||
|
||||
case "list-flows":
|
||||
return handleListFlowsWithState(state.pipe(stateSnapshot));
|
||||
|
||||
case "get-flow":
|
||||
return yield* handleGetFlowEffect(state, request);
|
||||
|
||||
case "start-flow":
|
||||
return yield* handleStartFlowEffect(state, request);
|
||||
|
||||
case "stop-flow":
|
||||
return yield* handleStopFlowEffect(state, request);
|
||||
|
||||
default:
|
||||
return yield* flowManagerError("operation", `Unknown flow operation: ${op ?? ""}`);
|
||||
}
|
||||
return yield* Match.value(op).pipe(
|
||||
Match.when("list-blueprints", () => Effect.succeed(handleListBlueprintsWithState(state.pipe(stateSnapshot)))),
|
||||
Match.when("put-blueprint", () => handlePutBlueprintEffect(state, request)),
|
||||
Match.when("get-blueprint", () => handleGetBlueprintEffect(state, request)),
|
||||
Match.when("delete-blueprint", () => handleDeleteBlueprintEffect(state, request)),
|
||||
Match.when("list-flows", () => Effect.succeed(handleListFlowsWithState(state.pipe(stateSnapshot)))),
|
||||
Match.when("get-flow", () => handleGetFlowEffect(state, request)),
|
||||
Match.when("start-flow", () => handleStartFlowEffect(state, request)),
|
||||
Match.when("stop-flow", () => handleStopFlowEffect(state, request)),
|
||||
Match.orElse(() => Effect.fail(flowManagerError("operation", `Unknown flow operation: ${op ?? ""}`))),
|
||||
);
|
||||
});
|
||||
|
||||
const handleMessageEffect = Effect.fn("handleMessageEffect")(function* (msg: Message<FlowRequest>) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue