diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index a3c734c7..7591b3e6 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -13,18 +13,18 @@ Verified source roots: - Installed Effect beta used by this workspace: `ts/node_modules/effect` Current signal counts from `ts/packages` after the 2026-06-02 -flow-manager/librarian runtime normalization slice: +FlowManager ref-backed state slice: | Signal | Count | | --- | ---: | -| `Effect.runPromise` | 198 | -| `Map<` | 71 | +| `Effect.runPromise` | 204 | +| `Map<` | 75 | | `WebSocket` | 47 | -| `new Map` | 53 | +| `new Map` | 56 | | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | | `receive(` | 18 | -| `while (` | 11 | +| `while (` | 10 | | `new Error` | 14 | | `new Promise` | 10 | | `JSON.parse` | 7 | @@ -42,6 +42,10 @@ Notes: - `Effect.runPromise` is expected at external Promise compatibility boundaries, but each match should still be audited for avoidable internal runtime ownership. +- The `Effect.runPromise`, `Map<`, and `new Map` counts increased in this + snapshot because the FlowManager slice added focused service tests and + Promise compatibility facades while removing the service's internal mutable + object state. ## Loop Passes @@ -215,6 +219,37 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: FlowManager Ref-Backed State Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/flow/src/flow-manager/service.ts` now exposes a typed + `FlowManagerService` instead of `AsyncProcessorRuntime & Record`. + - Runtime state now lives in + `SynchronizedRef` with `flows`, `blueprints`, the + request consumer, response producer, and config request client. + - Flow operations now have Effect-returning handlers with Promise facades + only on exported compatibility methods. + - Blueprint config loading now narrows runtime values before constructing + `Blueprint` records, replacing the prior `parsed as Blueprint` shortcut. + - `start-flow` and `stop-flow` mutate the flow map through + `SynchronizedRef.modifyEffect`, making duplicate checks and map updates + atomic. + - The consume loop now uses `Effect.whileLoop`; the remaining + `consumer.receive(2000)` call is a pubsub boundary for this service. + - New flow-manager tests cover tagged errors, ref-backed flow mutation, + config push/delete requests, blueprint narrowing, duplicate concurrent + starts, and message-level flow-error responses. +- Verification: + - `bun run --cwd ts/packages/flow test -- src/__tests__/flow-manager-service.test.ts` + - `bun run --cwd ts/packages/flow build` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -224,13 +259,12 @@ Notes: the client API is less Promise-first. - MCP env is now Config-backed; continue that policy for future MCP settings. - Flow stateful services: - - Config service and KnowledgeCore service ref-backed state are complete. - Librarian and flow-manager now have native Effect module startup - (`NodeRuntime.runMain` with `ManagedRuntime` compatibility facades), but - they still have mutable poller service objects. These remain good - candidates for `Context` services, scoped layers, - `Ref`/`SynchronizedRef`, `Schedule`, and managed - persistence. + - Config service, KnowledgeCore service, and FlowManager ref-backed state + are complete. Librarian now has native Effect module startup + (`NodeRuntime.runMain` with a `ManagedRuntime` compatibility facade), but + it still has a mutable poller service object. It remains a good candidate + for `Context` services, scoped layers, `Ref`/`SynchronizedRef`, + `Schedule`, and managed persistence. - Persistence IO should move toward `FileSystem` or `KeyValueStore` where the installed beta has the needed provider surface. - Base messaging/processors: @@ -256,11 +290,10 @@ Notes: ## Ranked Findings -### P0: Continue Stateful Flow Services To Scoped Effect Services +### P0: Migrate Librarian Stateful Service To Scoped Effect Service - TrustGraph evidence: - `ts/packages/flow/src/librarian/service.ts` - - `ts/packages/flow/src/flow-manager/service.ts` - Effect primitives: - `Context`, `Layer.scoped`, `Ref`, `SynchronizedRef`, `Schedule`, `Effect.addFinalizer`, `Config`, `Schema`, `FileSystem`, diff --git a/ts/packages/flow/src/__tests__/flow-manager-service.test.ts b/ts/packages/flow/src/__tests__/flow-manager-service.test.ts new file mode 100644 index 00000000..f8506659 --- /dev/null +++ b/ts/packages/flow/src/__tests__/flow-manager-service.test.ts @@ -0,0 +1,231 @@ +import {Effect, SynchronizedRef} from "effect"; +import {describe, expect, it} from "vitest"; +import { + topics, + type BackendConsumer, + type BackendProducer, + type ConfigRequest, + type ConfigResponse, + type CreateConsumerOptions, + type CreateProducerOptions, + type FlowRequest, + type FlowResponse, + type Message, + type PubSubBackend, + type RequestResponse, +} from "@trustgraph/base"; +import {FlowManagerError, makeFlowManagerService} from "../flow-manager/service.js"; + +class NoopPubSub implements PubSubBackend { + readonly sentByTopic = new Map>(); + + async createProducer(options: CreateProducerOptions): Promise> { + return { + send: async (message) => { + const sent = this.sentByTopic.get(options.topic) ?? []; + sent.push(message); + this.sentByTopic.set(options.topic, sent); + }, + flush: async () => undefined, + close: async () => undefined, + }; + } + + async createConsumer(_options: CreateConsumerOptions): Promise> { + return { + receive: async () => null, + acknowledge: async () => undefined, + negativeAcknowledge: async () => undefined, + unsubscribe: async () => undefined, + close: async () => undefined, + }; + } + + async close(): Promise {} +} + +class RecordingConfigClient implements RequestResponse { + readonly requests: Array = []; + + constructor( + private readonly blueprints: Array<{readonly key: string; readonly value: unknown}> = [], + private readonly flows: Array<{readonly key: string; readonly value: unknown}> = [], + private readonly legacyFlows: Array<{readonly key: string; readonly value: unknown}> = [], + ) {} + + async start(): Promise {} + + async stop(): Promise {} + + async request(request: ConfigRequest): Promise { + this.requests.push(request); + if (request.operation !== "getvalues") return {}; + + if (request.type === "flow-blueprint") { + return {values: this.blueprints}; + } + if (request.type === "flow") { + return {values: this.flows}; + } + if (request.type === "flows") { + return {values: this.legacyFlows}; + } + + return {values: []}; + } +} + +const makeService = (backend: PubSubBackend = new NoopPubSub()) => + makeFlowManagerService({ + id: "flow-manager-test", + manageProcessSignals: false, + pubsub: backend, + }); + +const seedConfigClient = async ( + service: ReturnType, + configClient: RecordingConfigClient, +) => + Effect.runPromise( + SynchronizedRef.update(service.state, (state) => ({ + ...state, + configClient, + })), + ); + +const seedResponseProducer = async ( + backend: NoopPubSub, + service: ReturnType, +) => { + const responseProducer = await backend.createProducer({ + topic: topics.flowResponse, + }); + await Effect.runPromise( + SynchronizedRef.update(service.state, (state) => ({ + ...state, + responseProducer, + })), + ); +}; + +describe("FlowManagerService operations", () => { + it("uses tagged errors for invalid flow mutations", async () => { + const service = makeService(); + + const startError = await service.handleStartFlow({operation: "start-flow"}) + .catch((caught: unknown) => caught); + const stopError = await service.handleStopFlow({operation: "stop-flow"}) + .catch((caught: unknown) => caught); + + expect(startError).toBeInstanceOf(FlowManagerError); + expect(startError).toMatchObject({_tag: "FlowManagerError", operation: "start-flow"}); + expect(stopError).toBeInstanceOf(FlowManagerError); + expect(stopError).toMatchObject({_tag: "FlowManagerError", operation: "stop-flow"}); + }); + + it("mutates flow state through the ref and pushes config updates", async () => { + const configClient = new RecordingConfigClient(); + const service = makeService(); + await seedConfigClient(service, configClient); + + await service.handleStartFlow({ + operation: "start-flow", + "flow-id": "flow-a", + description: "alpha", + parameters: {limit: 3}, + }); + let state = await Effect.runPromise(SynchronizedRef.get(service.state)); + expect(state.flows.get("flow-a")).toMatchObject({ + id: "flow-a", + blueprintName: "default", + description: "alpha", + parameters: {limit: 3}, + status: "running", + }); + + await service.handleStopFlow({ + operation: "stop-flow", + "flow-id": "flow-a", + }); + state = await Effect.runPromise(SynchronizedRef.get(service.state)); + + expect(state.flows.has("flow-a")).toBe(false); + expect(configClient.requests.map((request) => ({ + operation: request.operation, + keys: request.keys, + }))).toEqual([ + {operation: "put", keys: ["flows"]}, + {operation: "put", keys: ["flow"]}, + {operation: "delete", keys: ["flows", "flow-a"]}, + {operation: "delete", keys: ["flow", "flow-a"]}, + {operation: "put", keys: ["flows"]}, + {operation: "put", keys: ["flow"]}, + ]); + }); + + it("decodes valid blueprint config and skips invalid blueprint records", async () => { + const configClient = new RecordingConfigClient([ + { + key: "custom", + value: "{\"description\":\"Custom\",\"topics\":{\"input\":\"topic.in\"},\"extra\":true}", + }, + { + key: "broken", + value: "{\"description\":\"Missing topics\"}", + }, + ]); + const service = makeService(); + await seedConfigClient(service, configClient); + + await service.refreshBlueprintsFromConfig(); + const state = await Effect.runPromise(SynchronizedRef.get(service.state)); + + expect(state.blueprints.get("custom")).toMatchObject({ + description: "Custom", + topics: {input: "topic.in"}, + extra: true, + }); + expect(state.blueprints.has("broken")).toBe(false); + expect(state.blueprints.has("default")).toBe(true); + }); + + it("serializes duplicate starts through the ref-backed map", async () => { + const configClient = new RecordingConfigClient(); + const service = makeService(); + await seedConfigClient(service, configClient); + + const results = await Promise.allSettled([ + service.handleStartFlow({operation: "start-flow", "flow-id": "flow-a"}), + service.handleStartFlow({operation: "start-flow", "flow-id": "flow-a"}), + ]); + const state = await Effect.runPromise(SynchronizedRef.get(service.state)); + + expect(state.flows.get("flow-a")).toMatchObject({id: "flow-a"}); + expect(results.filter((result) => result.status === "fulfilled")).toHaveLength(1); + expect(results.filter((result) => result.status === "rejected")).toHaveLength(1); + }); + + it("sends flow-error responses from handleMessageEffect", async () => { + const backend = new NoopPubSub(); + const configClient = new RecordingConfigClient(); + const service = makeService(backend); + await seedConfigClient(service, configClient); + await seedResponseProducer(backend, service); + + const message: Message = { + value: () => ({operation: "start-flow"}), + properties: () => ({id: "request-1"}), + }; + + await Effect.runPromise(service.handleMessageEffect(message)); + + expect(backend.sentByTopic.get(topics.flowResponse)).toEqual([ + { + error: { + type: "flow-error", + message: "Missing flow-id", + }, + }, + ]); + }); +}); diff --git a/ts/packages/flow/src/flow-manager/service.ts b/ts/packages/flow/src/flow-manager/service.ts index 8fb22ff7..0ac88a60 100644 --- a/ts/packages/flow/src/flow-manager/service.ts +++ b/ts/packages/flow/src/flow-manager/service.ts @@ -18,16 +18,23 @@ import { makeAsyncProcessor, type ProcessorConfig, type AsyncProcessorRuntime, + type BackendConsumer, + type BackendProducer, topics, makeRequestResponse, + type RequestResponse, type ConfigRequest, type ConfigResponse, + FlowRequest as FlowRequestSchema, + FlowResponse as FlowResponseSchema, + type FlowRequest, + type FlowResponse, errorMessage, } from "@trustgraph/base"; import { makeProcessorProgram } from "@trustgraph/base"; import type { Message } from "@trustgraph/base"; import { NodeRuntime } from "@effect/platform-node"; -import { Context, Duration, Effect, Layer, ManagedRuntime, Option } from "effect"; +import { Duration, Effect, Layer, ManagedRuntime, Option, SynchronizedRef } from "effect"; import * as S from "effect/Schema"; // ---------- Internal state types ---------- @@ -155,728 +162,722 @@ const DEFAULT_BLUEPRINT: Blueprint = { // ---------- Service ---------- -export type FlowManagerService = AsyncProcessorRuntime & Record; +interface FlowManagerServiceState { + readonly flows: Map; + readonly blueprints: Map; + readonly consumer: BackendConsumer | null; + readonly responseProducer: BackendProducer | null; + readonly configClient: RequestResponse | null; +} + +export interface FlowManagerService extends AsyncProcessorRuntime { + readonly state: SynchronizedRef.SynchronizedRef; + readonly handleMessage: (msg: Message) => Promise; + readonly handleMessageEffect: (msg: Message) => Effect.Effect; + readonly configRequest: (request: ConfigRequest) => Promise; + readonly configRequestEffect: (request: ConfigRequest) => Effect.Effect; + readonly ensureDefaultBlueprint: () => Promise; + readonly ensureDefaultBlueprintEffect: Effect.Effect; + readonly refreshBlueprintsFromConfig: () => Promise; + readonly refreshBlueprintsFromConfigEffect: Effect.Effect; + readonly refreshFlowsFromConfig: () => Promise; + readonly refreshFlowsFromConfigEffect: Effect.Effect; + readonly handleOperation: (request: FlowRequest) => Promise; + readonly handleOperationEffect: (request: FlowRequest) => Effect.Effect; + readonly handleListBlueprints: () => FlowResponse; + readonly handleGetBlueprint: (request: FlowRequest) => Promise; + readonly handleGetBlueprintEffect: (request: FlowRequest) => Effect.Effect; + readonly handlePutBlueprint: (request: FlowRequest) => Promise; + readonly handlePutBlueprintEffect: (request: FlowRequest) => Effect.Effect; + readonly handleDeleteBlueprint: (request: FlowRequest) => Promise; + readonly handleDeleteBlueprintEffect: (request: FlowRequest) => Effect.Effect; + readonly handleListFlows: () => FlowResponse; + readonly handleGetFlow: (request: FlowRequest) => Promise; + readonly handleGetFlowEffect: (request: FlowRequest) => Effect.Effect; + readonly handleStartFlow: (request: FlowRequest) => Promise; + readonly handleStartFlowEffect: (request: FlowRequest) => Effect.Effect; + readonly handleStopFlow: (request: FlowRequest) => Promise; + readonly handleStopFlowEffect: (request: FlowRequest) => Effect.Effect; + readonly pushFlowsConfig: () => Promise; + readonly pushFlowsConfigEffect: Effect.Effect; + readonly deleteFlowConfig: (id: string) => Promise; + readonly deleteFlowConfigEffect: (id: string) => Effect.Effect; +} + +const initialState = (): FlowManagerServiceState => { + const blueprints = new Map(); + blueprints.set("default", DEFAULT_BLUEPRINT); + return { + flows: new Map(), + blueprints, + consumer: null, + responseProducer: null, + configClient: null, + }; +}; + +const isStringRecord = (value: unknown): value is Record => + isRecord(value) && Object.values(value).every((item) => typeof item === "string"); + +const cloneFlows = (source: Map): Map => + new Map(source); + +const cloneBlueprints = (source: Map): Map => + new Map(source); + +const stateSnapshot = ( + stateRef: SynchronizedRef.SynchronizedRef, +): FlowManagerServiceState => + SynchronizedRef.getUnsafe(stateRef); + +const modifyResult = ( + value: Value, + state: FlowManagerServiceState, +): readonly [Value, FlowManagerServiceState] => [value, state]; + +function blueprintFromConfig(value: unknown): Blueprint | undefined { + const parsed = parseConfigRecord(value); + if (parsed === undefined) return undefined; + const topics = isStringRecord(parsed.topics) ? parsed.topics : undefined; + if (topics === undefined) return undefined; + const parameters = isRecord(parsed.parameters) ? parsed.parameters : undefined; + return { + ...parsed, + description: optionalString(parsed.description) ?? "", + topics, + ...(parameters === undefined ? {} : { parameters }), + } satisfies Blueprint; +} + +function flowFromConfig(id: string, value: unknown): FlowInstance | undefined { + const parsed = parseConfigRecord(value); + if (parsed === undefined) return undefined; + return { + id, + blueprintName: optionalString(parsed["blueprint-name"]) ?? optionalString(parsed.blueprintName) ?? "default", + description: optionalString(parsed.description) ?? "", + parameters: isRecord(parsed.parameters) ? parsed.parameters : {}, + status: "running", + }; +} + +const updateHandles = ( + stateRef: SynchronizedRef.SynchronizedRef, + handles: { + readonly consumer?: BackendConsumer | null; + readonly responseProducer?: BackendProducer | null; + readonly configClient?: RequestResponse | null; + }, +) => + SynchronizedRef.updateAndGet(stateRef, (state) => ({ + ...state, + consumer: handles.consumer === undefined ? state.consumer : handles.consumer, + responseProducer: handles.responseProducer === undefined ? state.responseProducer : handles.responseProducer, + configClient: handles.configClient === undefined ? state.configClient : handles.configClient, + })); + +const configRequestEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, + request: ConfigRequest, +): Effect.Effect => + Effect.gen(function* () { + const configClient = (yield* SynchronizedRef.get(stateRef)).configClient; + if (configClient === null) { + return yield* flowManagerError("config-request", "Config client not started"); + } + return yield* Effect.tryPromise({ + try: () => configClient.request(request), + catch: (cause) => flowManagerError("config-request", cause), + }); + }); + +const ensureDefaultBlueprintEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, +): Effect.Effect => + Effect.gen(function* () { + const response = yield* configRequestEffect(stateRef, { + operation: "getvalues", + type: "flow-blueprint", + }); + if (configValues(response).some((value) => value.key === "default")) { + return; + } + + const defaultBlueprint = yield* encodeJson(DEFAULT_BLUEPRINT, "encode-default-blueprint"); + + yield* configRequestEffect(stateRef, { + operation: "put", + keys: ["flow-blueprint"], + values: { + default: defaultBlueprint, + }, + }); + }); + +const refreshBlueprintsFromConfigEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, +): Effect.Effect => + Effect.gen(function* () { + const response = yield* configRequestEffect(stateRef, { + operation: "getvalues", + type: "flow-blueprint", + }); + const next = new Map(); + + for (const item of configValues(response)) { + const blueprint = blueprintFromConfig(item.value); + if (blueprint !== undefined) { + next.set(item.key, blueprint); + } + } + + if (!next.has("default")) { + next.set("default", DEFAULT_BLUEPRINT); + } + + yield* SynchronizedRef.update(stateRef, (state) => ({ + ...state, + blueprints: next, + })); + }); + +const refreshFlowsFromConfigEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, +): Effect.Effect => + Effect.gen(function* () { + const response = yield* configRequestEffect(stateRef, { + operation: "getvalues", + type: "flow", + }); + const next = new Map(); + + for (const item of configValues(response)) { + const flow = flowFromConfig(item.key, item.value); + if (flow !== undefined) { + next.set(item.key, flow); + } + } + + if (next.size === 0) { + const flowsResponse = yield* configRequestEffect(stateRef, { + operation: "getvalues", + type: "flows", + }); + for (const item of configValues(flowsResponse)) { + next.set(item.key, { + id: item.key, + blueprintName: "default", + description: "", + parameters: {}, + status: "running", + }); + } + } + + yield* SynchronizedRef.update(stateRef, (state) => ({ + ...state, + flows: next, + })); + }); + +const handleListBlueprintsWithState = (state: FlowManagerServiceState): FlowResponse => ({ + "blueprint-names": [...state.blueprints.keys()], +}); + +const handleGetBlueprintEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, + request: FlowRequest, +): Effect.Effect => + Effect.gen(function* () { + const name = optionalString(request["blueprint-name"]); + if (name === undefined) { + return yield* flowManagerError("get-blueprint", "Missing blueprint-name"); + } + + const blueprint = (yield* SynchronizedRef.get(stateRef)).blueprints.get(name); + if (blueprint === undefined) { + return yield* flowManagerError("get-blueprint", `Blueprint not found: ${name}`); + } + + const definition = yield* encodeJson(blueprint, "encode-blueprint"); + return { + "blueprint-definition": definition, + }; + }); + +const handlePutBlueprintEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, + request: FlowRequest, +): Effect.Effect => + Effect.gen(function* () { + const name = optionalString(request["blueprint-name"]); + if (name === undefined) { + return yield* flowManagerError("put-blueprint", "Missing blueprint-name"); + } + const rawDefinition = request["blueprint-definition"]; + if (rawDefinition === undefined) { + return yield* flowManagerError("put-blueprint", "Missing blueprint-definition"); + } + const definition = typeof rawDefinition === "string" + ? rawDefinition + : yield* encodeJson(rawDefinition, "encode-blueprint"); + + yield* configRequestEffect(stateRef, { + operation: "put", + keys: ["flow-blueprint"], + values: { [name]: definition }, + }); + yield* refreshBlueprintsFromConfigEffect(stateRef); + return {}; + }); + +const handleDeleteBlueprintEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, + request: FlowRequest, +): Effect.Effect => + Effect.gen(function* () { + const name = optionalString(request["blueprint-name"]); + if (name === undefined) { + return yield* flowManagerError("delete-blueprint", "Missing blueprint-name"); + } + + if (name === "default") { + return yield* flowManagerError("delete-blueprint", "Cannot delete the default blueprint"); + } + + yield* configRequestEffect(stateRef, { + operation: "delete", + keys: ["flow-blueprint", name], + }); + yield* SynchronizedRef.update(stateRef, (state) => { + const blueprints = cloneBlueprints(state.blueprints); + blueprints.delete(name); + return { + ...state, + blueprints, + }; + }); + + return {}; + }); + +const handleListFlowsWithState = (state: FlowManagerServiceState): FlowResponse => ({ + "flow-ids": [...state.flows.keys()], +}); + +const flowRecord = (inst: FlowInstance) => ({ + "blueprint-name": inst.blueprintName, + description: inst.description, + parameters: inst.parameters, +}); + +const handleGetFlowEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, + request: FlowRequest, +): Effect.Effect => + Effect.gen(function* () { + const id = optionalString(request["flow-id"]); + if (id === undefined) { + return yield* flowManagerError("get-flow", "Missing flow-id"); + } + + const inst = (yield* SynchronizedRef.get(stateRef)).flows.get(id); + if (inst === undefined) { + return yield* flowManagerError("get-flow", `Flow not found: ${id}`); + } + + const flow = yield* encodeJson(flowRecord(inst), "encode-flow"); + return { flow }; + }); + +const handleStartFlowEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, + request: FlowRequest, +): Effect.Effect => + Effect.gen(function* () { + const id = optionalString(request["flow-id"]); + const blueprintName = optionalString(request["blueprint-name"]) ?? "default"; + const description = optionalString(request.description) ?? ""; + const parameters = isRecord(request.parameters) ? request.parameters : {}; + + if (id === undefined) { + return yield* flowManagerError("start-flow", "Missing flow-id"); + } + + const inst = yield* SynchronizedRef.modifyEffect(stateRef, (state) => { + if (state.flows.has(id)) { + return Effect.fail(flowManagerError("start-flow", `Flow already exists: ${id}`)); + } + if (!state.blueprints.has(blueprintName)) { + return Effect.fail(flowManagerError("start-flow", `Blueprint not found: ${blueprintName}`)); + } + + const next: FlowInstance = { + id, + blueprintName, + description, + parameters, + status: "running", + }; + const flows = cloneFlows(state.flows); + flows.set(id, next); + return Effect.succeed(modifyResult(next, { + ...state, + flows, + })); + }); + + yield* Effect.log(`[FlowManager] Started flow "${inst.id}" with blueprint "${inst.blueprintName}"`); + yield* pushFlowsConfigEffect(stateRef); + + return {}; + }); + +const handleStopFlowEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, + request: FlowRequest, +): Effect.Effect => + Effect.gen(function* () { + const id = optionalString(request["flow-id"]); + if (id === undefined) { + return yield* flowManagerError("stop-flow", "Missing flow-id"); + } + + const inst = yield* SynchronizedRef.modifyEffect(stateRef, (state) => { + const current = state.flows.get(id); + if (current === undefined) { + return Effect.fail(flowManagerError("stop-flow", `Flow not found: ${id}`)); + } + + const flows = cloneFlows(state.flows); + flows.delete(id); + return Effect.succeed(modifyResult(current, { + ...state, + flows, + })); + }); + + yield* Effect.log(`[FlowManager] Stopped flow "${inst.id}"`); + yield* deleteFlowConfigEffect(stateRef, id); + yield* pushFlowsConfigEffect(stateRef); + + return {}; + }); + +const pushFlowsConfigEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, +): Effect.Effect => + Effect.gen(function* () { + const state = yield* SynchronizedRef.get(stateRef); + const configClient = state.configClient; + if (configClient === null) return; + + const flowsConfig: Record }> = {}; + const flowRecords: Record = {}; + for (const [id, inst] of state.flows) { + const blueprint = state.blueprints.get(inst.blueprintName); + if (blueprint !== undefined) { + flowsConfig[id] = { topics: blueprint.topics }; + flowRecords[id] = yield* encodeJson(flowRecord(inst), "encode-flow-config"); + } + } + + yield* Effect.tryPromise({ + try: () => + configClient.request({ + operation: "put", + keys: ["flows"], + values: flowsConfig, + }), + catch: (cause) => flowManagerError("put-flows-config", cause), + }); + yield* Effect.tryPromise({ + try: () => + configClient.request({ + operation: "put", + keys: ["flow"], + values: flowRecords, + }), + catch: (cause) => flowManagerError("put-flow-records", cause), + }); + yield* Effect.log(`[FlowManager] Pushed flows config (${state.flows.size} active flows)`); + }).pipe( + Effect.catch((err) => + Effect.logError("[FlowManager] Failed to push flows config", { error: err.message }), + ), + ); + +const deleteFlowConfigEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, + id: string, +): Effect.Effect => + Effect.gen(function* () { + const configClient = (yield* SynchronizedRef.get(stateRef)).configClient; + if (configClient === null) return; + yield* Effect.tryPromise({ + try: () => + configClient.request({ + operation: "delete", + keys: ["flows", id], + }), + catch: (cause) => flowManagerError("delete-flows-config", cause), + }); + yield* Effect.tryPromise({ + try: () => + configClient.request({ + operation: "delete", + keys: ["flow", id], + }), + catch: (cause) => flowManagerError("delete-flow-record", cause), + }); + }); + +const closeFlowManagerResourcesEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, +): Effect.Effect => + Effect.gen(function* () { + const state = yield* SynchronizedRef.get(stateRef); + + const consumer = state.consumer; + if (consumer !== null) { + yield* Effect.tryPromise({ + try: () => consumer.close(), + catch: (cause) => flowManagerError("consumer-close", cause), + }); + } + const responseProducer = state.responseProducer; + if (responseProducer !== null) { + yield* Effect.tryPromise({ + try: () => responseProducer.close(), + catch: (cause) => flowManagerError("response-producer-close", cause), + }); + } + const configClient = state.configClient; + if (configClient !== null) { + yield* Effect.tryPromise({ + try: () => configClient.stop(), + catch: (cause) => flowManagerError("config-client-stop", cause), + }); + } + + yield* updateHandles(stateRef, { + consumer: null, + responseProducer: null, + configClient: null, + }); + }); + +const consumeOnceEffect = ( + service: FlowManagerService, +): Effect.Effect => + Effect.gen(function* () { + const consumer = (yield* SynchronizedRef.get(service.state)).consumer; + if (consumer === null) { + return yield* flowManagerError("consume", "Flow request consumer not started"); + } + + const msg = yield* Effect.tryPromise({ + try: () => consumer.receive(2000), + catch: (cause) => flowManagerError("consume-receive", cause), + }); + if (msg === null) return; + + yield* service.handleMessageEffect(msg); + yield* Effect.tryPromise({ + try: () => consumer.acknowledge(msg), + catch: (cause) => flowManagerError("consume-acknowledge", cause), + }); + }); + +const runFlowManagerServiceEffect = ( + service: FlowManagerService, +): Effect.Effect => + Effect.gen(function* () { + const configClient = makeRequestResponse({ + pubsub: service.pubsub, + requestTopic: topics.configRequest, + responseTopic: topics.configResponse, + subscription: `${service.config.id}-config-client`, + }); + yield* updateHandles(service.state, { configClient }); + yield* Effect.tryPromise({ + try: () => configClient.start(), + catch: (cause) => flowManagerError("config-client-start", cause), + }); + yield* ensureDefaultBlueprintEffect(service.state); + yield* refreshBlueprintsFromConfigEffect(service.state); + + const responseProducer = yield* Effect.tryPromise({ + try: () => + service.pubsub.createProducer({ + topic: topics.flowResponse, + schema: FlowResponseSchema, + }), + catch: (cause) => flowManagerError("response-producer", cause), + }); + yield* updateHandles(service.state, { responseProducer }); + + const consumer = yield* Effect.tryPromise({ + try: () => + service.pubsub.createConsumer({ + topic: topics.flowRequest, + subscription: `${service.config.id}-flow-request`, + schema: FlowRequestSchema, + }), + catch: (cause) => flowManagerError("consumer", cause), + }); + yield* updateHandles(service.state, { consumer }); + + yield* Effect.log(`[FlowManager] Listening on ${topics.flowRequest}`); + + yield* Effect.whileLoop({ + while: () => service.running, + body: () => + consumeOnceEffect(service).pipe( + Effect.catch((err) => { + if (!service.running) return Effect.void; + return Effect.logError("[FlowManager] Error in consume loop", { error: err.message }).pipe( + Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), + ); + }), + ), + step: () => undefined, + }); + }); export function makeFlowManagerService(config: ProcessorConfig): FlowManagerService { - const service = makeAsyncProcessor(config, { - run: () => service.run(Context.empty()), - }) as FlowManagerService; - const baseStop = service.stop; - service.flows = new Map(); - service.blueprints = new Map(); - service.consumer = null; - service.responseProducer = null; - service.configClient = null; - service.blueprints.set("default", DEFAULT_BLUEPRINT); - Object.assign(service, { - - - run: function(this: FlowManagerService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - // Create config client for pushing flow configs to the config service - service.configClient = makeRequestResponse({ - pubsub: service.pubsub, - requestTopic: topics.configRequest, - responseTopic: topics.configResponse, - subscription: `${service.config.id}-config-client`, - }); - yield* Effect.tryPromise({ - try: () => service.configClient.start(), - catch: (cause) => flowManagerError("config-client-start", cause), - }); - yield* Effect.tryPromise({ - try: () => service.ensureDefaultBlueprint(), - catch: (cause) => flowManagerError("ensure-default-blueprint", cause), - }); - yield* Effect.tryPromise({ - try: () => service.refreshBlueprintsFromConfig(), - catch: (cause) => flowManagerError("refresh-blueprints", cause), - }); - - // Create producer for flow-response topic - service.responseProducer = yield* Effect.tryPromise({ - try: () => - service.pubsub.createProducer>({ - topic: topics.flowResponse, - }), - catch: (cause) => flowManagerError("response-producer", cause), - }); - - // Create consumer for flow-request topic - service.consumer = yield* Effect.tryPromise({ - try: () => - service.pubsub.createConsumer>({ - topic: topics.flowRequest, - subscription: `${service.config.id}-flow-request`, - }), - catch: (cause) => flowManagerError("consumer", cause), - }); - - yield* Effect.log(`[FlowManager] Listening on ${topics.flowRequest}`); - - // Main consume loop (same pattern as ConfigService) - while (service.running) { - const shouldContinue = yield* Effect.gen(function* () { - const consumer = service.consumer; - if (consumer === null) { - return yield* flowManagerError("consume", "Flow request consumer not started"); - } - - const msg = yield* Effect.tryPromise({ - try: () => consumer.receive(2000), - catch: (cause) => flowManagerError("consume-receive", cause), - }); - if (msg === null) return true; - - yield* Effect.tryPromise({ - try: () => service.handleMessage(msg), - catch: (cause) => flowManagerError("consume-handle", cause), - }); - yield* Effect.tryPromise({ - try: () => consumer.acknowledge(msg), - catch: (cause) => flowManagerError("consume-acknowledge", cause), - }); - - return true; - }).pipe( - Effect.catch((err) => { - if (!service.running) return Effect.succeed(false); - return Effect.logError("[FlowManager] Error in consume loop", { error: err.message }).pipe( - Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), - Effect.as(true), - ); - }), - ); - if (!shouldContinue) break; - } - }), - ); - - }, - - - - handleMessage: function(this: FlowManagerService, msg: Message>): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const request = msg.value(); - const props = msg.properties(); - const requestId = props.id; - - if (requestId === undefined || requestId.length === 0) { - yield* Effect.logWarning("[FlowManager] Received request without id, ignoring"); - return; - } - - const sendResponse = (response: Record): Effect.Effect => - Effect.gen(function* () { - const responseProducer = service.responseProducer; - if (responseProducer === null) { - return yield* flowManagerError("respond", "Flow response producer not started"); - } - yield* Effect.tryPromise({ - try: () => responseProducer.send(response, { id: requestId }), - catch: (cause) => flowManagerError("respond", cause), - }); - }); - - yield* Effect.gen(function* () { - const response = yield* Effect.tryPromise, FlowManagerError>({ - try: () => service.handleOperation(request), - catch: (cause) => flowManagerError("operation", cause), - }); - yield* sendResponse(response); - }).pipe( - Effect.catch((err) => - sendResponse({ - error: { type: "flow-error", message: err.message }, - }), - ), - ); - }), - ); - - }, - - - - configRequest: function(this: FlowManagerService, request: ConfigRequest): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const configClient = service.configClient; - if (configClient === null) { - return yield* flowManagerError("config-request", "Config client not started"); - } - return yield* Effect.tryPromise({ - try: () => configClient.request(request), - catch: (cause) => flowManagerError("config-request", cause), - }); - }), - ); - - }, - - - - ensureDefaultBlueprint: function(this: FlowManagerService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const response = yield* Effect.tryPromise({ - try: () => - service.configRequest({ - operation: "getvalues", - type: "flow-blueprint", - }), - catch: (cause) => flowManagerError("get-default-blueprint", cause), - }); - if (configValues(response).some((value) => value.key === "default")) { - return; - } - - const defaultBlueprint = yield* encodeJson(DEFAULT_BLUEPRINT, "encode-default-blueprint"); - - yield* Effect.tryPromise({ - try: () => - service.configRequest({ - operation: "put", - keys: ["flow-blueprint"], - values: { - default: defaultBlueprint, - }, - }), - catch: (cause) => flowManagerError("put-default-blueprint", cause), - }); - }), - ); - - }, - - - - refreshBlueprintsFromConfig: function(this: FlowManagerService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const response = yield* Effect.tryPromise({ - try: () => - service.configRequest({ - operation: "getvalues", - type: "flow-blueprint", - }), - catch: (cause) => flowManagerError("refresh-blueprints", cause), - }); - const next = new Map(); - - for (const item of configValues(response)) { - const parsed = parseConfigRecord(item.value); - if (parsed === undefined) continue; - next.set(item.key, parsed as Blueprint); - } - - if (!next.has("default")) { - next.set("default", DEFAULT_BLUEPRINT); - } - service.blueprints = next; - }), - ); - - }, - - - - refreshFlowsFromConfig: function(this: FlowManagerService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const response = yield* Effect.tryPromise({ - try: () => - service.configRequest({ - operation: "getvalues", - type: "flow", - }), - catch: (cause) => flowManagerError("refresh-flows", cause), - }); - const next = new Map(); - - for (const item of configValues(response)) { - const parsed = parseConfigRecord(item.value); - if (parsed === undefined) continue; - const parameters = isRecord(parsed.parameters) ? parsed.parameters : {}; - next.set(item.key, { - id: item.key, - blueprintName: optionalString(parsed["blueprint-name"]) ?? optionalString(parsed.blueprintName) ?? "default", - description: optionalString(parsed.description) ?? "", - parameters, - status: "running", - }); - } - - if (next.size === 0) { - const flowsResponse = yield* Effect.tryPromise({ - try: () => - service.configRequest({ - operation: "getvalues", - type: "flows", - }), - catch: (cause) => flowManagerError("refresh-legacy-flows", cause), - }); - for (const item of configValues(flowsResponse)) { - next.set(item.key, { - id: item.key, - blueprintName: "default", - description: "", - parameters: {}, - status: "running", - }); - } - } - - service.flows = next; - }), - ); - - }, - - - - handleOperation: function(this: FlowManagerService, request: Record): Promise> { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const op = optionalString(request.operation); - yield* Effect.tryPromise({ - try: () => service.refreshBlueprintsFromConfig(), - catch: (cause) => flowManagerError("refresh-blueprints", cause), - }); - yield* Effect.tryPromise({ - try: () => service.refreshFlowsFromConfig(), - catch: (cause) => flowManagerError("refresh-flows", cause), - }); - - switch (op) { - case "list-blueprints": - return service.handleListBlueprints(); - - case "put-blueprint": - return yield* Effect.tryPromise, FlowManagerError>({ - try: () => service.handlePutBlueprint(request), - catch: (cause) => flowManagerError("put-blueprint", cause), - }); - - case "get-blueprint": - return yield* Effect.tryPromise, FlowManagerError>({ - try: () => service.handleGetBlueprint(request), - catch: (cause) => flowManagerError("get-blueprint", cause), - }); - - case "delete-blueprint": - return yield* Effect.tryPromise, FlowManagerError>({ - try: () => service.handleDeleteBlueprint(request), - catch: (cause) => flowManagerError("delete-blueprint", cause), - }); - - case "list-flows": - return service.handleListFlows(); - - case "get-flow": - return yield* Effect.tryPromise, FlowManagerError>({ - try: () => service.handleGetFlow(request), - catch: (cause) => flowManagerError("get-flow", cause), - }); - - case "start-flow": - return yield* Effect.tryPromise, FlowManagerError>({ - try: () => service.handleStartFlow(request), - catch: (cause) => flowManagerError("start-flow", cause), - }); - - case "stop-flow": - return yield* Effect.tryPromise, FlowManagerError>({ - try: () => service.handleStopFlow(request), - catch: (cause) => flowManagerError("stop-flow", cause), - }); - - default: - return yield* flowManagerError("operation", `Unknown flow operation: ${op ?? ""}`); - } - }), - ); - - }, - - - - // ---------- Blueprint operations ---------- - - handleListBlueprints: function(this: FlowManagerService): Record { - return { - "blueprint-names": [...this.blueprints.keys()], - }; - - }, - - - - handleGetBlueprint: function(this: FlowManagerService, request: Record): Promise> { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const name = optionalString(request["blueprint-name"]); - if (name === undefined) { - return yield* flowManagerError("get-blueprint", "Missing blueprint-name"); - } - - const blueprint = service.blueprints.get(name); - if (blueprint === undefined) { - return yield* flowManagerError("get-blueprint", `Blueprint not found: ${name}`); - } - - const definition = yield* encodeJson(blueprint, "encode-blueprint"); - return { - "blueprint-definition": definition, - }; - }), - ); - - }, - - - - handlePutBlueprint: function(this: FlowManagerService, request: Record): Promise> { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const name = optionalString(request["blueprint-name"]); - if (name === undefined) { - return yield* flowManagerError("put-blueprint", "Missing blueprint-name"); - } - const rawDefinition = request["blueprint-definition"]; - if (rawDefinition === undefined) { - return yield* flowManagerError("put-blueprint", "Missing blueprint-definition"); - } - const definition = typeof rawDefinition === "string" - ? rawDefinition - : yield* encodeJson(rawDefinition, "encode-blueprint"); - - yield* Effect.tryPromise({ - try: () => - service.configRequest({ - operation: "put", - keys: ["flow-blueprint"], - values: { [name]: definition }, - }), - catch: (cause) => flowManagerError("put-blueprint-config", cause), - }); - yield* Effect.tryPromise({ - try: () => service.refreshBlueprintsFromConfig(), - catch: (cause) => flowManagerError("refresh-blueprints", cause), - }); - return {}; - }), - ); - - }, - - - - handleDeleteBlueprint: function(this: FlowManagerService, request: Record): Promise> { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const name = optionalString(request["blueprint-name"]); - if (name === undefined) { - return yield* flowManagerError("delete-blueprint", "Missing blueprint-name"); - } - - if (name === "default") { - return yield* flowManagerError("delete-blueprint", "Cannot delete the default blueprint"); - } - - yield* Effect.tryPromise({ - try: () => - service.configRequest({ - operation: "delete", - keys: ["flow-blueprint", name], - }), - catch: (cause) => flowManagerError("delete-blueprint-config", cause), - }); - service.blueprints.delete(name); - - return {}; - }), - ); - - }, - - - - // ---------- Flow operations ---------- - - handleListFlows: function(this: FlowManagerService): Record { - return { - "flow-ids": [...this.flows.keys()], - }; - - }, - - - - handleGetFlow: function(this: FlowManagerService, request: Record): Promise> { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const id = optionalString(request["flow-id"]); - if (id === undefined) { - return yield* flowManagerError("get-flow", "Missing flow-id"); - } - - const inst = service.flows.get(id); - if (inst === undefined) { - return yield* flowManagerError("get-flow", `Flow not found: ${id}`); - } - - const flow = yield* encodeJson( - { - "blueprint-name": inst.blueprintName, - description: inst.description, - parameters: inst.parameters, - }, - "encode-flow", - ); - - return { flow }; - }), - ); - - }, - - - - handleStartFlow: function(this: FlowManagerService, request: Record): Promise> { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const id = optionalString(request["flow-id"]); - const blueprintName = optionalString(request["blueprint-name"]) ?? "default"; - const description = optionalString(request.description) ?? ""; - const parameters = isRecord(request.parameters) ? request.parameters : {}; - - if (id === undefined) { - return yield* flowManagerError("start-flow", "Missing flow-id"); - } - - if ((service.flows as Map).has(id)) { - return yield* flowManagerError("start-flow", `Flow already exists: ${id}`); - } - - const blueprint = service.blueprints.get(blueprintName); - if (blueprint === undefined) { - return yield* flowManagerError("start-flow", `Blueprint not found: ${blueprintName}`); - } - - // Create the flow instance - const inst: FlowInstance = { - id, - blueprintName, - description, - parameters, - status: "running", - }; - service.flows.set(id, inst); - - yield* Effect.log( - `[FlowManager] Started flow "${id}" with blueprint "${blueprintName}"`, - ); - - // Push updated flows config to the config service - yield* Effect.tryPromise({ - try: () => service.pushFlowsConfig(), - catch: (cause) => flowManagerError("push-flows-config", cause), - }); - - return {}; - }), - ); - - }, - - - - handleStopFlow: function(this: FlowManagerService, request: Record): Promise> { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const id = optionalString(request["flow-id"]); - if (id === undefined) { - return yield* flowManagerError("stop-flow", "Missing flow-id"); - } - - const inst = service.flows.get(id); - if (inst === undefined) { - return yield* flowManagerError("stop-flow", `Flow not found: ${id}`); - } - - service.flows.delete(id); - - yield* Effect.log(`[FlowManager] Stopped flow "${id}"`); - - yield* Effect.tryPromise({ - try: () => service.deleteFlowConfig(id), - catch: (cause) => flowManagerError("delete-flow-config", cause), - }); - - // Push updated flows config (without the removed flow) - yield* Effect.tryPromise({ - try: () => service.pushFlowsConfig(), - catch: (cause) => flowManagerError("push-flows-config", cause), - }); - - return {}; - }), - ); - - }, - - - - // ---------- Config push ---------- - - /** - * Build the flows config object from all running flows and push it - * to the config service via a PUT operation. - */ - pushFlowsConfig: function(this: FlowManagerService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const configClient = service.configClient; - if (configClient === null) return; - - const flowsConfig: Record }> = {}; - const flowRecords: Record = {}; - for (const [id, inst] of service.flows) { - const blueprint = service.blueprints.get(inst.blueprintName); - if (blueprint !== undefined) { - flowsConfig[id] = { topics: blueprint.topics }; - flowRecords[id] = yield* encodeJson( - { - "blueprint-name": inst.blueprintName, - description: inst.description, - parameters: inst.parameters, - }, - "encode-flow-config", - ); - } - } - - yield* Effect.gen(function* () { - yield* Effect.tryPromise({ - try: () => - configClient.request({ - operation: "put", - keys: ["flows"], - values: flowsConfig, - }), - catch: (cause) => flowManagerError("put-flows-config", cause), - }); - yield* Effect.tryPromise({ - try: () => - configClient.request({ - operation: "put", - keys: ["flow"], - values: flowRecords, - }), - catch: (cause) => flowManagerError("put-flow-records", cause), - }); - yield* Effect.log( - `[FlowManager] Pushed flows config (${service.flows.size} active flows)`, - ); - }).pipe( - Effect.catch((err) => - Effect.logError("[FlowManager] Failed to push flows config", { error: err.message }), - ), - ); - }), - ); - - }, - - - - deleteFlowConfig: function(this: FlowManagerService, id: string): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const configClient = service.configClient; - if (configClient === null) return; - yield* Effect.tryPromise({ - try: () => - configClient.request({ - operation: "delete", - keys: ["flows", id], - }), - catch: (cause) => flowManagerError("delete-flows-config", cause), - }); - yield* Effect.tryPromise({ - try: () => - configClient.request({ - operation: "delete", - keys: ["flow", id], - }), - catch: (cause) => flowManagerError("delete-flow-record", cause), - }); - }), - ); - - }, - - - - // ---------- Lifecycle ---------- - - stop: function(this: FlowManagerService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - if (service.consumer !== null) { - const consumer = service.consumer; - yield* Effect.tryPromise({ - try: () => consumer.close(), - catch: (cause) => flowManagerError("consumer-close", cause), - }); - service.consumer = null; - } - if (service.responseProducer !== null) { - const responseProducer = service.responseProducer; - yield* Effect.tryPromise({ - try: () => responseProducer.close(), - catch: (cause) => flowManagerError("response-producer-close", cause), - }); - service.responseProducer = null; - } - if (service.configClient !== null) { - const configClient = service.configClient; - yield* Effect.tryPromise({ - try: () => configClient.stop(), - catch: (cause) => flowManagerError("config-client-stop", cause), - }); - service.configClient = null; - } - yield* Effect.tryPromise({ + const state = SynchronizedRef.makeUnsafe(initialState()); + let service: FlowManagerService | undefined; + + const getService = Effect.sync(() => service).pipe( + Effect.flatMap((current) => + current === undefined + ? Effect.fail(flowManagerError("service", "Flow manager service not initialized")) + : Effect.succeed(current) + ), + ); + + const base = makeAsyncProcessor(config, { + runEffect: () => getService.pipe(Effect.flatMap(runFlowManagerServiceEffect)), + }); + const baseStop = base.stop; + + const handleOperationEffect = (request: FlowRequest): Effect.Effect => + Effect.gen(function* () { + const op = optionalString(request.operation); + yield* refreshBlueprintsFromConfigEffect(state); + yield* refreshFlowsFromConfigEffect(state); + + switch (op) { + case "list-blueprints": + return handleListBlueprintsWithState(state.pipe(stateSnapshot)); + + case "put-blueprint": + return yield* handlePutBlueprintEffect(state, request); + + case "get-blueprint": + return yield* handleGetBlueprintEffect(state, request); + + case "delete-blueprint": + return yield* handleDeleteBlueprintEffect(state, request); + + case "list-flows": + return handleListFlowsWithState(state.pipe(stateSnapshot)); + + case "get-flow": + return yield* handleGetFlowEffect(state, request); + + case "start-flow": + return yield* handleStartFlowEffect(state, request); + + case "stop-flow": + return yield* handleStopFlowEffect(state, request); + + default: + return yield* flowManagerError("operation", `Unknown flow operation: ${op ?? ""}`); + } + }); + + const handleMessageEffect = Effect.fn("handleMessageEffect")(function* (msg: Message) { + const request = yield* S.decodeUnknownEffect(FlowRequestSchema)(msg.value()).pipe( + Effect.mapError((cause) => flowManagerError("decode", cause)), + ); + const requestId = msg.properties().id; + + if (requestId === undefined || requestId.length === 0) { + yield* Effect.logWarning("[FlowManager] Received request without id, ignoring"); + return; + } + + const sendResponse = (response: FlowResponse): Effect.Effect => + Effect.gen(function* () { + const responseProducer = (yield* SynchronizedRef.get(state)).responseProducer; + if (responseProducer === null) { + return yield* flowManagerError("respond", "Flow response producer not started"); + } + yield* Effect.tryPromise({ + try: () => responseProducer.send(response, { id: requestId }), + catch: (cause) => flowManagerError("respond", cause), + }); + }); + + yield* handleOperationEffect(request).pipe( + Effect.flatMap(sendResponse), + Effect.catch((err) => + sendResponse({ + error: { type: "flow-error", message: err.message }, + }) + ), + ); + }); + + const flowManagerService: FlowManagerService = Object.assign(base, { + state, + handleMessage: (msg: Message) => Effect.runPromise(handleMessageEffect(msg)), + handleMessageEffect, + configRequest: (request: ConfigRequest) => Effect.runPromise(configRequestEffect(state, request)), + configRequestEffect: (request: ConfigRequest) => configRequestEffect(state, request), + ensureDefaultBlueprint: () => Effect.runPromise(ensureDefaultBlueprintEffect(state)), + ensureDefaultBlueprintEffect: ensureDefaultBlueprintEffect(state), + refreshBlueprintsFromConfig: () => Effect.runPromise(refreshBlueprintsFromConfigEffect(state)), + refreshBlueprintsFromConfigEffect: refreshBlueprintsFromConfigEffect(state), + refreshFlowsFromConfig: () => Effect.runPromise(refreshFlowsFromConfigEffect(state)), + refreshFlowsFromConfigEffect: refreshFlowsFromConfigEffect(state), + handleOperation: (request: FlowRequest) => Effect.runPromise(handleOperationEffect(request)), + handleOperationEffect, + handleListBlueprints: () => handleListBlueprintsWithState(state.pipe(stateSnapshot)), + handleGetBlueprint: (request: FlowRequest) => Effect.runPromise(handleGetBlueprintEffect(state, request)), + handleGetBlueprintEffect: (request: FlowRequest) => handleGetBlueprintEffect(state, request), + handlePutBlueprint: (request: FlowRequest) => Effect.runPromise(handlePutBlueprintEffect(state, request)), + handlePutBlueprintEffect: (request: FlowRequest) => handlePutBlueprintEffect(state, request), + handleDeleteBlueprint: (request: FlowRequest) => Effect.runPromise(handleDeleteBlueprintEffect(state, request)), + handleDeleteBlueprintEffect: (request: FlowRequest) => handleDeleteBlueprintEffect(state, request), + handleListFlows: () => handleListFlowsWithState(state.pipe(stateSnapshot)), + handleGetFlow: (request: FlowRequest) => Effect.runPromise(handleGetFlowEffect(state, request)), + handleGetFlowEffect: (request: FlowRequest) => handleGetFlowEffect(state, request), + handleStartFlow: (request: FlowRequest) => Effect.runPromise(handleStartFlowEffect(state, request)), + handleStartFlowEffect: (request: FlowRequest) => handleStartFlowEffect(state, request), + handleStopFlow: (request: FlowRequest) => Effect.runPromise(handleStopFlowEffect(state, request)), + handleStopFlowEffect: (request: FlowRequest) => handleStopFlowEffect(state, request), + pushFlowsConfig: () => Effect.runPromise(pushFlowsConfigEffect(state)), + pushFlowsConfigEffect: pushFlowsConfigEffect(state), + deleteFlowConfig: (id: string) => Effect.runPromise(deleteFlowConfigEffect(state, id)), + deleteFlowConfigEffect: (id: string) => deleteFlowConfigEffect(state, id), + stop: () => + Effect.runPromise( + closeFlowManagerResourcesEffect(state).pipe( + Effect.flatMap(() => + Effect.tryPromise({ try: () => baseStop(), catch: (cause) => flowManagerError("base-stop", cause), - }); - }), - ); - - } + }) + ), + ), + ), }); - return service; + + service = flowManagerService; + return flowManagerService; } export const FlowManagerService = makeFlowManagerService;