From 68cbcde1f6da3f09e4ad82e0fdbefa2e88457004 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Thu, 4 Jun 2026 05:20:03 -0500 Subject: [PATCH] Use Match for flow manager operations --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 22 +++++- .../__tests__/flow-manager-service.test.ts | 68 +++++++++++++++++++ ts/packages/flow/src/flow-manager/service.ts | 41 ++++------- 3 files changed, 100 insertions(+), 31 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index b67a0f46..d67f4751 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -384,6 +384,24 @@ Notes: - `bun run --cwd ts/packages/flow test -- src/__tests__/librarian-service.test.ts` - `cd ts && bun run check:tsgo` +### 2026-06-04: FlowManager Operation Match Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/flow/src/flow-manager/service.ts` now dispatches flow + operations with `effect/Match` instead of a native `switch`. + - The dispatcher keeps the existing config refresh behavior before routing + and uses `Match.orElse` because `FlowRequest.operation` is a public + wire-level `string`, not a closed schema literal union. + - Existing tagged `FlowManagerError` behavior is preserved for unknown + operations and branch-specific failures. + - Flow-manager tests now cover all eight flow operations through + `handleOperation`, including config-client blueprint mutations and the + runtime unknown-operation fallback. +- Verification: + - `bun run --cwd ts/packages/flow test -- src/__tests__/flow-manager-service.test.ts` + - `cd ts && bun run check:tsgo` + ### 2026-06-02: RAG And Agent Requestor Bridge Slice - Status: migrated, root-verified, committed, and pushed. @@ -1831,8 +1849,8 @@ Notes: `Effect.fn` / `Effect.fnUntraced`. Sibling service factories still need a focused scan before treating them as valid migration targets. - ConfigService and KnowledgeCore operation dispatch now use `effect/Match` - with `Match.exhaustive`; librarian operation dispatch now uses - `effect/Match` with runtime-preserving `Match.orElse` fallbacks. + with `Match.exhaustive`; FlowManager and Librarian operation dispatch now + use `effect/Match` with runtime-preserving `Match.orElse` fallbacks. - Long-lived `Map` / `Set` state in ref-backed services can move toward Effect collections later; local pure traversal maps/sets remain no-ops. diff --git a/ts/packages/flow/src/__tests__/flow-manager-service.test.ts b/ts/packages/flow/src/__tests__/flow-manager-service.test.ts index f8506659..0fa82463 100644 --- a/ts/packages/flow/src/__tests__/flow-manager-service.test.ts +++ b/ts/packages/flow/src/__tests__/flow-manager-service.test.ts @@ -109,6 +109,74 @@ const seedResponseProducer = async ( }; describe("FlowManagerService operations", () => { + it("dispatches all flow operations through the Match-backed handler", async () => { + const configClient = new RecordingConfigClient( + [ + { + key: "custom", + value: "{\"description\":\"Custom\",\"topics\":{\"input\":\"topic.in\"}}", + }, + ], + [ + { + key: "flow-a", + value: "{\"blueprint-name\":\"custom\",\"description\":\"Alpha\",\"parameters\":{\"limit\":3}}", + }, + ], + ); + const service = makeService(); + await seedConfigClient(service, configClient); + + await expect(service.handleOperation({operation: "list-blueprints"})).resolves.toEqual({ + "blueprint-names": ["custom", "default"], + }); + await expect(service.handleOperation({ + operation: "get-blueprint", + "blueprint-name": "custom", + })).resolves.toMatchObject({ + "blueprint-definition": "{\"description\":\"Custom\",\"topics\":{\"input\":\"topic.in\"}}", + }); + await expect(service.handleOperation({ + operation: "put-blueprint", + "blueprint-name": "added", + "blueprint-definition": {description: "Added", topics: {input: "topic.added"}}, + })).resolves.toEqual({}); + await expect(service.handleOperation({ + operation: "delete-blueprint", + "blueprint-name": "custom", + })).resolves.toEqual({}); + await expect(service.handleOperation({operation: "list-flows"})).resolves.toEqual({ + "flow-ids": ["flow-a"], + }); + await expect(service.handleOperation({ + operation: "get-flow", + "flow-id": "flow-a", + })).resolves.toEqual({ + flow: "{\"blueprint-name\":\"custom\",\"description\":\"Alpha\",\"parameters\":{\"limit\":3}}", + }); + await expect(service.handleOperation({ + operation: "start-flow", + "flow-id": "flow-b", + "blueprint-name": "custom", + })).resolves.toEqual({}); + await expect(service.handleOperation({ + operation: "stop-flow", + "flow-id": "flow-a", + })).resolves.toEqual({}); + await expect(service.handleOperation({operation: "unknown-flow"})).rejects.toMatchObject({ + _tag: "FlowManagerError", + operation: "operation", + message: "Unknown flow operation: unknown-flow", + }); + + expect(configClient.requests.some((request) => + request.operation === "put" && request.keys?.[0] === "flow-blueprint" + )).toBe(true); + expect(configClient.requests.some((request) => + request.operation === "delete" && request.keys?.[0] === "flow-blueprint" + )).toBe(true); + }); + it("uses tagged errors for invalid flow mutations", async () => { const service = makeService(); diff --git a/ts/packages/flow/src/flow-manager/service.ts b/ts/packages/flow/src/flow-manager/service.ts index 41085c87..9bdd5095 100644 --- a/ts/packages/flow/src/flow-manager/service.ts +++ b/ts/packages/flow/src/flow-manager/service.ts @@ -34,7 +34,7 @@ import { import { makeProcessorProgram } from "@trustgraph/base"; import type { Message } from "@trustgraph/base"; import { NodeRuntime } from "@effect/platform-node"; -import { Duration, Effect, Layer, ManagedRuntime, Option, SynchronizedRef } from "effect"; +import { Duration, Effect, Layer, ManagedRuntime, Match, Option, SynchronizedRef } from "effect"; import * as S from "effect/Schema"; // ---------- Internal state types ---------- @@ -756,34 +756,17 @@ export function makeFlowManagerService(config: ProcessorConfig): FlowManagerServ yield* refreshBlueprintsFromConfigEffect(state); yield* refreshFlowsFromConfigEffect(state); - switch (op) { - case "list-blueprints": - return handleListBlueprintsWithState(state.pipe(stateSnapshot)); - - case "put-blueprint": - return yield* handlePutBlueprintEffect(state, request); - - case "get-blueprint": - return yield* handleGetBlueprintEffect(state, request); - - case "delete-blueprint": - return yield* handleDeleteBlueprintEffect(state, request); - - case "list-flows": - return handleListFlowsWithState(state.pipe(stateSnapshot)); - - case "get-flow": - return yield* handleGetFlowEffect(state, request); - - case "start-flow": - return yield* handleStartFlowEffect(state, request); - - case "stop-flow": - return yield* handleStopFlowEffect(state, request); - - default: - return yield* flowManagerError("operation", `Unknown flow operation: ${op ?? ""}`); - } + return yield* Match.value(op).pipe( + Match.when("list-blueprints", () => Effect.succeed(handleListBlueprintsWithState(state.pipe(stateSnapshot)))), + Match.when("put-blueprint", () => handlePutBlueprintEffect(state, request)), + Match.when("get-blueprint", () => handleGetBlueprintEffect(state, request)), + Match.when("delete-blueprint", () => handleDeleteBlueprintEffect(state, request)), + Match.when("list-flows", () => Effect.succeed(handleListFlowsWithState(state.pipe(stateSnapshot)))), + Match.when("get-flow", () => handleGetFlowEffect(state, request)), + Match.when("start-flow", () => handleStartFlowEffect(state, request)), + Match.when("stop-flow", () => handleStopFlowEffect(state, request)), + Match.orElse(() => Effect.fail(flowManagerError("operation", `Unknown flow operation: ${op ?? ""}`))), + ); }); const handleMessageEffect = Effect.fn("handleMessageEffect")(function* (msg: Message) {