diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index d0465403..9cc214d6 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -459,6 +459,28 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: FlowManager Effect.fn Normalization Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/flow/src/flow-manager/service.ts` no longer defines + reusable helpers as arrow functions that immediately return + `Effect.gen(...)`. + - Config request, blueprint refresh, flow refresh, blueprint handlers, flow + handlers, config push/delete, resource close, consume, run, and local + operation handling now use named `Effect.fn` providers. + - Hot local helpers for one-message consumption and response sending use + `Effect.fnUntraced`. + - `pushFlowsConfigEffect` keeps its best-effort logging/swallowing contract + through the `Effect.fn` pipeable form instead of a wrapper generator. +- Verification: + - `bun run --cwd ts/packages/flow test -- src/__tests__/flow-manager-service.test.ts` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ### 2026-06-02: Librarian Schema And Assertion Cleanup Slice - Status: migrated and root-verified. @@ -1750,9 +1772,9 @@ Notes: `MutableHashSet`. Short-lived local traversal sets remain no-ops. - Gateway dispatcher static service registries, streaming membership, and scoped requestor cache now use Effect `HashMap`/`HashSet`. - - FlowManager and sibling service `() => Effect.gen(...)` factories remain a - broad mechanical `Effect.fn` / `Effect.fnUntraced` cleanup, best handled - after Duration and small collection slices. + - FlowManager `() => Effect.gen(...)` factories are normalized to + `Effect.fn` / `Effect.fnUntraced`. Sibling service factories still need a + focused scan before treating them as valid migration targets. - Long-lived `Map` / `Set` state in ref-backed services can move toward Effect collections later; local pure traversal maps/sets remain no-ops. @@ -1935,9 +1957,10 @@ Notes: ## Recommended PR Order 1. MCP protocol parity tests and legacy stdio flip/removal decision. -2. FlowManager/service `Effect.fn` normalization. -3. Flow/client RPC stream and remaining service operation `Match` follow-ups. -4. Long-lived ref-backed `HashMap` state cleanup where clone helpers remain. +2. Flow/client RPC stream and remaining service operation `Match` follow-ups. +3. Long-lived ref-backed `HashMap` state cleanup where clone helpers remain. +4. Sibling service `Effect.fn` normalization where arrow-returned generators + still appear. ## No-Op Rules diff --git a/ts/packages/flow/src/flow-manager/service.ts b/ts/packages/flow/src/flow-manager/service.ts index 0ac88a60..41085c87 100644 --- a/ts/packages/flow/src/flow-manager/service.ts +++ b/ts/packages/flow/src/flow-manager/service.ts @@ -199,7 +199,7 @@ export interface FlowManagerService extends AsyncProcessorRuntime Promise; readonly handleStopFlowEffect: (request: FlowRequest) => Effect.Effect; readonly pushFlowsConfig: () => Promise; - readonly pushFlowsConfigEffect: Effect.Effect; + readonly pushFlowsConfigEffect: Effect.Effect; readonly deleteFlowConfig: (id: string) => Promise; readonly deleteFlowConfigEffect: (id: string) => Effect.Effect; } @@ -276,190 +276,183 @@ const updateHandles = ( configClient: handles.configClient === undefined ? state.configClient : handles.configClient, })); -const configRequestEffect = ( +const configRequestEffect = Effect.fn("FlowManager.configRequest")(function* ( stateRef: SynchronizedRef.SynchronizedRef, request: ConfigRequest, -): Effect.Effect => - Effect.gen(function* () { - const configClient = (yield* SynchronizedRef.get(stateRef)).configClient; - if (configClient === null) { - return yield* flowManagerError("config-request", "Config client not started"); - } - return yield* Effect.tryPromise({ - try: () => configClient.request(request), - catch: (cause) => flowManagerError("config-request", cause), - }); +) { + const configClient = (yield* SynchronizedRef.get(stateRef)).configClient; + if (configClient === null) { + return yield* flowManagerError("config-request", "Config client not started"); + } + return yield* Effect.tryPromise({ + try: () => configClient.request(request), + catch: (cause) => flowManagerError("config-request", cause), }); +}); -const ensureDefaultBlueprintEffect = ( +const ensureDefaultBlueprintEffect = Effect.fn("FlowManager.ensureDefaultBlueprint")(function* ( stateRef: SynchronizedRef.SynchronizedRef, -): Effect.Effect => - Effect.gen(function* () { - const response = yield* configRequestEffect(stateRef, { - operation: "getvalues", - type: "flow-blueprint", - }); - if (configValues(response).some((value) => value.key === "default")) { - return; - } - - const defaultBlueprint = yield* encodeJson(DEFAULT_BLUEPRINT, "encode-default-blueprint"); - - yield* configRequestEffect(stateRef, { - operation: "put", - keys: ["flow-blueprint"], - values: { - default: defaultBlueprint, - }, - }); +) { + const response = yield* configRequestEffect(stateRef, { + operation: "getvalues", + type: "flow-blueprint", }); + if (configValues(response).some((value) => value.key === "default")) { + return; + } -const refreshBlueprintsFromConfigEffect = ( - stateRef: SynchronizedRef.SynchronizedRef, -): Effect.Effect => - Effect.gen(function* () { - const response = yield* configRequestEffect(stateRef, { - operation: "getvalues", - type: "flow-blueprint", - }); - const next = new Map(); + const defaultBlueprint = yield* encodeJson(DEFAULT_BLUEPRINT, "encode-default-blueprint"); - for (const item of configValues(response)) { - const blueprint = blueprintFromConfig(item.value); - if (blueprint !== undefined) { - next.set(item.key, blueprint); - } - } - - if (!next.has("default")) { - next.set("default", DEFAULT_BLUEPRINT); - } - - yield* SynchronizedRef.update(stateRef, (state) => ({ - ...state, - blueprints: next, - })); + yield* configRequestEffect(stateRef, { + operation: "put", + keys: ["flow-blueprint"], + values: { + default: defaultBlueprint, + }, }); +}); -const refreshFlowsFromConfigEffect = ( +const refreshBlueprintsFromConfigEffect = Effect.fn("FlowManager.refreshBlueprintsFromConfig")(function* ( stateRef: SynchronizedRef.SynchronizedRef, -): Effect.Effect => - Effect.gen(function* () { - const response = yield* configRequestEffect(stateRef, { - operation: "getvalues", - type: "flow", - }); - const next = new Map(); +) { + const response = yield* configRequestEffect(stateRef, { + operation: "getvalues", + type: "flow-blueprint", + }); + const next = new Map(); - for (const item of configValues(response)) { - const flow = flowFromConfig(item.key, item.value); - if (flow !== undefined) { - next.set(item.key, flow); - } + for (const item of configValues(response)) { + const blueprint = blueprintFromConfig(item.value); + if (blueprint !== undefined) { + next.set(item.key, blueprint); } + } - if (next.size === 0) { - const flowsResponse = yield* configRequestEffect(stateRef, { - operation: "getvalues", - type: "flows", + if (!next.has("default")) { + next.set("default", DEFAULT_BLUEPRINT); + } + + yield* SynchronizedRef.update(stateRef, (state) => ({ + ...state, + blueprints: next, + })); +}); + +const refreshFlowsFromConfigEffect = Effect.fn("FlowManager.refreshFlowsFromConfig")(function* ( + stateRef: SynchronizedRef.SynchronizedRef, +) { + const response = yield* configRequestEffect(stateRef, { + operation: "getvalues", + type: "flow", + }); + const next = new Map(); + + for (const item of configValues(response)) { + const flow = flowFromConfig(item.key, item.value); + if (flow !== undefined) { + next.set(item.key, flow); + } + } + + if (next.size === 0) { + const flowsResponse = yield* configRequestEffect(stateRef, { + operation: "getvalues", + type: "flows", + }); + for (const item of configValues(flowsResponse)) { + next.set(item.key, { + id: item.key, + blueprintName: "default", + description: "", + parameters: {}, + status: "running", }); - for (const item of configValues(flowsResponse)) { - next.set(item.key, { - id: item.key, - blueprintName: "default", - description: "", - parameters: {}, - status: "running", - }); - } } + } - yield* SynchronizedRef.update(stateRef, (state) => ({ - ...state, - flows: next, - })); - }); + yield* SynchronizedRef.update(stateRef, (state) => ({ + ...state, + flows: next, + })); +}); const handleListBlueprintsWithState = (state: FlowManagerServiceState): FlowResponse => ({ "blueprint-names": [...state.blueprints.keys()], }); -const handleGetBlueprintEffect = ( +const handleGetBlueprintEffect = Effect.fn("FlowManager.handleGetBlueprint")(function* ( stateRef: SynchronizedRef.SynchronizedRef, request: FlowRequest, -): Effect.Effect => - Effect.gen(function* () { - const name = optionalString(request["blueprint-name"]); - if (name === undefined) { - return yield* flowManagerError("get-blueprint", "Missing blueprint-name"); - } +) { + const name = optionalString(request["blueprint-name"]); + if (name === undefined) { + return yield* flowManagerError("get-blueprint", "Missing blueprint-name"); + } - const blueprint = (yield* SynchronizedRef.get(stateRef)).blueprints.get(name); - if (blueprint === undefined) { - return yield* flowManagerError("get-blueprint", `Blueprint not found: ${name}`); - } + const blueprint = (yield* SynchronizedRef.get(stateRef)).blueprints.get(name); + if (blueprint === undefined) { + return yield* flowManagerError("get-blueprint", `Blueprint not found: ${name}`); + } - const definition = yield* encodeJson(blueprint, "encode-blueprint"); + const definition = yield* encodeJson(blueprint, "encode-blueprint"); + return { + "blueprint-definition": definition, + }; +}); + +const handlePutBlueprintEffect = Effect.fn("FlowManager.handlePutBlueprint")(function* ( + stateRef: SynchronizedRef.SynchronizedRef, + request: FlowRequest, +) { + const name = optionalString(request["blueprint-name"]); + if (name === undefined) { + return yield* flowManagerError("put-blueprint", "Missing blueprint-name"); + } + const rawDefinition = request["blueprint-definition"]; + if (rawDefinition === undefined) { + return yield* flowManagerError("put-blueprint", "Missing blueprint-definition"); + } + const definition = typeof rawDefinition === "string" + ? rawDefinition + : yield* encodeJson(rawDefinition, "encode-blueprint"); + + yield* configRequestEffect(stateRef, { + operation: "put", + keys: ["flow-blueprint"], + values: { [name]: definition }, + }); + yield* refreshBlueprintsFromConfigEffect(stateRef); + return {}; +}); + +const handleDeleteBlueprintEffect = Effect.fn("FlowManager.handleDeleteBlueprint")(function* ( + stateRef: SynchronizedRef.SynchronizedRef, + request: FlowRequest, +) { + const name = optionalString(request["blueprint-name"]); + if (name === undefined) { + return yield* flowManagerError("delete-blueprint", "Missing blueprint-name"); + } + + if (name === "default") { + return yield* flowManagerError("delete-blueprint", "Cannot delete the default blueprint"); + } + + yield* configRequestEffect(stateRef, { + operation: "delete", + keys: ["flow-blueprint", name], + }); + yield* SynchronizedRef.update(stateRef, (state) => { + const blueprints = cloneBlueprints(state.blueprints); + blueprints.delete(name); return { - "blueprint-definition": definition, + ...state, + blueprints, }; }); -const handlePutBlueprintEffect = ( - stateRef: SynchronizedRef.SynchronizedRef, - request: FlowRequest, -): Effect.Effect => - Effect.gen(function* () { - const name = optionalString(request["blueprint-name"]); - if (name === undefined) { - return yield* flowManagerError("put-blueprint", "Missing blueprint-name"); - } - const rawDefinition = request["blueprint-definition"]; - if (rawDefinition === undefined) { - return yield* flowManagerError("put-blueprint", "Missing blueprint-definition"); - } - const definition = typeof rawDefinition === "string" - ? rawDefinition - : yield* encodeJson(rawDefinition, "encode-blueprint"); - - yield* configRequestEffect(stateRef, { - operation: "put", - keys: ["flow-blueprint"], - values: { [name]: definition }, - }); - yield* refreshBlueprintsFromConfigEffect(stateRef); - return {}; - }); - -const handleDeleteBlueprintEffect = ( - stateRef: SynchronizedRef.SynchronizedRef, - request: FlowRequest, -): Effect.Effect => - Effect.gen(function* () { - const name = optionalString(request["blueprint-name"]); - if (name === undefined) { - return yield* flowManagerError("delete-blueprint", "Missing blueprint-name"); - } - - if (name === "default") { - return yield* flowManagerError("delete-blueprint", "Cannot delete the default blueprint"); - } - - yield* configRequestEffect(stateRef, { - operation: "delete", - keys: ["flow-blueprint", name], - }); - yield* SynchronizedRef.update(stateRef, (state) => { - const blueprints = cloneBlueprints(state.blueprints); - blueprints.delete(name); - return { - ...state, - blueprints, - }; - }); - - return {}; - }); + return {}; +}); const handleListFlowsWithState = (state: FlowManagerServiceState): FlowResponse => ({ "flow-ids": [...state.flows.keys()], @@ -471,103 +464,100 @@ const flowRecord = (inst: FlowInstance) => ({ parameters: inst.parameters, }); -const handleGetFlowEffect = ( +const handleGetFlowEffect = Effect.fn("FlowManager.handleGetFlow")(function* ( stateRef: SynchronizedRef.SynchronizedRef, request: FlowRequest, -): Effect.Effect => - Effect.gen(function* () { - const id = optionalString(request["flow-id"]); - if (id === undefined) { - return yield* flowManagerError("get-flow", "Missing flow-id"); - } +) { + const id = optionalString(request["flow-id"]); + if (id === undefined) { + return yield* flowManagerError("get-flow", "Missing flow-id"); + } - const inst = (yield* SynchronizedRef.get(stateRef)).flows.get(id); - if (inst === undefined) { - return yield* flowManagerError("get-flow", `Flow not found: ${id}`); - } + const inst = (yield* SynchronizedRef.get(stateRef)).flows.get(id); + if (inst === undefined) { + return yield* flowManagerError("get-flow", `Flow not found: ${id}`); + } - const flow = yield* encodeJson(flowRecord(inst), "encode-flow"); - return { flow }; - }); + const flow = yield* encodeJson(flowRecord(inst), "encode-flow"); + return { flow }; +}); -const handleStartFlowEffect = ( +const handleStartFlowEffect = Effect.fn("FlowManager.handleStartFlow")(function* ( stateRef: SynchronizedRef.SynchronizedRef, request: FlowRequest, -): Effect.Effect => - Effect.gen(function* () { - const id = optionalString(request["flow-id"]); - const blueprintName = optionalString(request["blueprint-name"]) ?? "default"; - const description = optionalString(request.description) ?? ""; - const parameters = isRecord(request.parameters) ? request.parameters : {}; +) { + const id = optionalString(request["flow-id"]); + const blueprintName = optionalString(request["blueprint-name"]) ?? "default"; + const description = optionalString(request.description) ?? ""; + const parameters = isRecord(request.parameters) ? request.parameters : {}; - if (id === undefined) { - return yield* flowManagerError("start-flow", "Missing flow-id"); + if (id === undefined) { + return yield* flowManagerError("start-flow", "Missing flow-id"); + } + + const inst = yield* SynchronizedRef.modifyEffect(stateRef, (state) => { + if (state.flows.has(id)) { + return Effect.fail(flowManagerError("start-flow", `Flow already exists: ${id}`)); + } + if (!state.blueprints.has(blueprintName)) { + return Effect.fail(flowManagerError("start-flow", `Blueprint not found: ${blueprintName}`)); } - const inst = yield* SynchronizedRef.modifyEffect(stateRef, (state) => { - if (state.flows.has(id)) { - return Effect.fail(flowManagerError("start-flow", `Flow already exists: ${id}`)); - } - if (!state.blueprints.has(blueprintName)) { - return Effect.fail(flowManagerError("start-flow", `Blueprint not found: ${blueprintName}`)); - } - - const next: FlowInstance = { - id, - blueprintName, - description, - parameters, - status: "running", - }; - const flows = cloneFlows(state.flows); - flows.set(id, next); - return Effect.succeed(modifyResult(next, { - ...state, - flows, - })); - }); - - yield* Effect.log(`[FlowManager] Started flow "${inst.id}" with blueprint "${inst.blueprintName}"`); - yield* pushFlowsConfigEffect(stateRef); - - return {}; + const next: FlowInstance = { + id, + blueprintName, + description, + parameters, + status: "running", + }; + const flows = cloneFlows(state.flows); + flows.set(id, next); + return Effect.succeed(modifyResult(next, { + ...state, + flows, + })); }); -const handleStopFlowEffect = ( + yield* Effect.log(`[FlowManager] Started flow "${inst.id}" with blueprint "${inst.blueprintName}"`); + yield* pushFlowsConfigEffect(stateRef); + + return {}; +}); + +const handleStopFlowEffect = Effect.fn("FlowManager.handleStopFlow")(function* ( stateRef: SynchronizedRef.SynchronizedRef, request: FlowRequest, -): Effect.Effect => - Effect.gen(function* () { - const id = optionalString(request["flow-id"]); - if (id === undefined) { - return yield* flowManagerError("stop-flow", "Missing flow-id"); +) { + const id = optionalString(request["flow-id"]); + if (id === undefined) { + return yield* flowManagerError("stop-flow", "Missing flow-id"); + } + + const inst = yield* SynchronizedRef.modifyEffect(stateRef, (state) => { + const current = state.flows.get(id); + if (current === undefined) { + return Effect.fail(flowManagerError("stop-flow", `Flow not found: ${id}`)); } - const inst = yield* SynchronizedRef.modifyEffect(stateRef, (state) => { - const current = state.flows.get(id); - if (current === undefined) { - return Effect.fail(flowManagerError("stop-flow", `Flow not found: ${id}`)); - } - - const flows = cloneFlows(state.flows); - flows.delete(id); - return Effect.succeed(modifyResult(current, { - ...state, - flows, - })); - }); - - yield* Effect.log(`[FlowManager] Stopped flow "${inst.id}"`); - yield* deleteFlowConfigEffect(stateRef, id); - yield* pushFlowsConfigEffect(stateRef); - - return {}; + const flows = cloneFlows(state.flows); + flows.delete(id); + return Effect.succeed(modifyResult(current, { + ...state, + flows, + })); }); -const pushFlowsConfigEffect = ( - stateRef: SynchronizedRef.SynchronizedRef, -): Effect.Effect => - Effect.gen(function* () { + yield* Effect.log(`[FlowManager] Stopped flow "${inst.id}"`); + yield* deleteFlowConfigEffect(stateRef, id); + yield* pushFlowsConfigEffect(stateRef); + + return {}; +}); + +const pushFlowsConfigEffect = Effect.fn("FlowManager.pushFlowsConfig")( + function* ( + stateRef: SynchronizedRef.SynchronizedRef, + ) { const state = yield* SynchronizedRef.get(stateRef); const configClient = state.configClient; if (configClient === null) return; @@ -601,149 +591,148 @@ const pushFlowsConfigEffect = ( catch: (cause) => flowManagerError("put-flow-records", cause), }); yield* Effect.log(`[FlowManager] Pushed flows config (${state.flows.size} active flows)`); - }).pipe( - Effect.catch((err) => - Effect.logError("[FlowManager] Failed to push flows config", { error: err.message }), + }, + (effect) => + effect.pipe( + Effect.catch((err) => + Effect.logError("[FlowManager] Failed to push flows config", { error: err.message }), + ), ), - ); +); -const deleteFlowConfigEffect = ( +const deleteFlowConfigEffect = Effect.fn("FlowManager.deleteFlowConfig")(function* ( stateRef: SynchronizedRef.SynchronizedRef, id: string, -): Effect.Effect => - Effect.gen(function* () { - const configClient = (yield* SynchronizedRef.get(stateRef)).configClient; - if (configClient === null) return; - yield* Effect.tryPromise({ - try: () => - configClient.request({ - operation: "delete", - keys: ["flows", id], - }), - catch: (cause) => flowManagerError("delete-flows-config", cause), - }); - yield* Effect.tryPromise({ - try: () => - configClient.request({ - operation: "delete", - keys: ["flow", id], - }), - catch: (cause) => flowManagerError("delete-flow-record", cause), - }); +) { + const configClient = (yield* SynchronizedRef.get(stateRef)).configClient; + if (configClient === null) return; + yield* Effect.tryPromise({ + try: () => + configClient.request({ + operation: "delete", + keys: ["flows", id], + }), + catch: (cause) => flowManagerError("delete-flows-config", cause), }); + yield* Effect.tryPromise({ + try: () => + configClient.request({ + operation: "delete", + keys: ["flow", id], + }), + catch: (cause) => flowManagerError("delete-flow-record", cause), + }); +}); -const closeFlowManagerResourcesEffect = ( +const closeFlowManagerResourcesEffect = Effect.fn("FlowManager.closeResources")(function* ( stateRef: SynchronizedRef.SynchronizedRef, -): Effect.Effect => - Effect.gen(function* () { - const state = yield* SynchronizedRef.get(stateRef); +) { + const state = yield* SynchronizedRef.get(stateRef); - const consumer = state.consumer; - if (consumer !== null) { - yield* Effect.tryPromise({ - try: () => consumer.close(), - catch: (cause) => flowManagerError("consumer-close", cause), - }); - } - const responseProducer = state.responseProducer; - if (responseProducer !== null) { - yield* Effect.tryPromise({ - try: () => responseProducer.close(), - catch: (cause) => flowManagerError("response-producer-close", cause), - }); - } - const configClient = state.configClient; - if (configClient !== null) { - yield* Effect.tryPromise({ - try: () => configClient.stop(), - catch: (cause) => flowManagerError("config-client-stop", cause), - }); - } - - yield* updateHandles(stateRef, { - consumer: null, - responseProducer: null, - configClient: null, - }); - }); - -const consumeOnceEffect = ( - service: FlowManagerService, -): Effect.Effect => - Effect.gen(function* () { - const consumer = (yield* SynchronizedRef.get(service.state)).consumer; - if (consumer === null) { - return yield* flowManagerError("consume", "Flow request consumer not started"); - } - - const msg = yield* Effect.tryPromise({ - try: () => consumer.receive(2000), - catch: (cause) => flowManagerError("consume-receive", cause), - }); - if (msg === null) return; - - yield* service.handleMessageEffect(msg); + const consumer = state.consumer; + if (consumer !== null) { yield* Effect.tryPromise({ - try: () => consumer.acknowledge(msg), - catch: (cause) => flowManagerError("consume-acknowledge", cause), + try: () => consumer.close(), + catch: (cause) => flowManagerError("consumer-close", cause), }); - }); - -const runFlowManagerServiceEffect = ( - service: FlowManagerService, -): Effect.Effect => - Effect.gen(function* () { - const configClient = makeRequestResponse({ - pubsub: service.pubsub, - requestTopic: topics.configRequest, - responseTopic: topics.configResponse, - subscription: `${service.config.id}-config-client`, - }); - yield* updateHandles(service.state, { configClient }); + } + const responseProducer = state.responseProducer; + if (responseProducer !== null) { yield* Effect.tryPromise({ - try: () => configClient.start(), - catch: (cause) => flowManagerError("config-client-start", cause), + try: () => responseProducer.close(), + catch: (cause) => flowManagerError("response-producer-close", cause), }); - yield* ensureDefaultBlueprintEffect(service.state); - yield* refreshBlueprintsFromConfigEffect(service.state); - - const responseProducer = yield* Effect.tryPromise({ - try: () => - service.pubsub.createProducer({ - topic: topics.flowResponse, - schema: FlowResponseSchema, - }), - catch: (cause) => flowManagerError("response-producer", cause), + } + const configClient = state.configClient; + if (configClient !== null) { + yield* Effect.tryPromise({ + try: () => configClient.stop(), + catch: (cause) => flowManagerError("config-client-stop", cause), }); - yield* updateHandles(service.state, { responseProducer }); + } - const consumer = yield* Effect.tryPromise({ - try: () => - service.pubsub.createConsumer({ - topic: topics.flowRequest, - subscription: `${service.config.id}-flow-request`, - schema: FlowRequestSchema, - }), - catch: (cause) => flowManagerError("consumer", cause), - }); - yield* updateHandles(service.state, { consumer }); - - yield* Effect.log(`[FlowManager] Listening on ${topics.flowRequest}`); - - yield* Effect.whileLoop({ - while: () => service.running, - body: () => - consumeOnceEffect(service).pipe( - Effect.catch((err) => { - if (!service.running) return Effect.void; - return Effect.logError("[FlowManager] Error in consume loop", { error: err.message }).pipe( - Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), - ); - }), - ), - step: () => undefined, - }); + yield* updateHandles(stateRef, { + consumer: null, + responseProducer: null, + configClient: null, }); +}); + +const consumeOnceEffect = Effect.fnUntraced(function* ( + service: FlowManagerService, +) { + const consumer = (yield* SynchronizedRef.get(service.state)).consumer; + if (consumer === null) { + return yield* flowManagerError("consume", "Flow request consumer not started"); + } + + const msg = yield* Effect.tryPromise({ + try: () => consumer.receive(2000), + catch: (cause) => flowManagerError("consume-receive", cause), + }); + if (msg === null) return; + + yield* service.handleMessageEffect(msg); + yield* Effect.tryPromise({ + try: () => consumer.acknowledge(msg), + catch: (cause) => flowManagerError("consume-acknowledge", cause), + }); +}); + +const runFlowManagerServiceEffect = Effect.fn("FlowManager.runService")(function* ( + service: FlowManagerService, +) { + const configClient = makeRequestResponse({ + pubsub: service.pubsub, + requestTopic: topics.configRequest, + responseTopic: topics.configResponse, + subscription: `${service.config.id}-config-client`, + }); + yield* updateHandles(service.state, { configClient }); + yield* Effect.tryPromise({ + try: () => configClient.start(), + catch: (cause) => flowManagerError("config-client-start", cause), + }); + yield* ensureDefaultBlueprintEffect(service.state); + yield* refreshBlueprintsFromConfigEffect(service.state); + + const responseProducer = yield* Effect.tryPromise({ + try: () => + service.pubsub.createProducer({ + topic: topics.flowResponse, + schema: FlowResponseSchema, + }), + catch: (cause) => flowManagerError("response-producer", cause), + }); + yield* updateHandles(service.state, { responseProducer }); + + const consumer = yield* Effect.tryPromise({ + try: () => + service.pubsub.createConsumer({ + topic: topics.flowRequest, + subscription: `${service.config.id}-flow-request`, + schema: FlowRequestSchema, + }), + catch: (cause) => flowManagerError("consumer", cause), + }); + yield* updateHandles(service.state, { consumer }); + + yield* Effect.log(`[FlowManager] Listening on ${topics.flowRequest}`); + + yield* Effect.whileLoop({ + while: () => service.running, + body: () => + consumeOnceEffect(service).pipe( + Effect.catch((err) => { + if (!service.running) return Effect.void; + return Effect.logError("[FlowManager] Error in consume loop", { error: err.message }).pipe( + Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), + ); + }), + ), + step: () => undefined, + }); +}); export function makeFlowManagerService(config: ProcessorConfig): FlowManagerService { const state = SynchronizedRef.makeUnsafe(initialState()); @@ -762,41 +751,40 @@ export function makeFlowManagerService(config: ProcessorConfig): FlowManagerServ }); const baseStop = base.stop; - const handleOperationEffect = (request: FlowRequest): Effect.Effect => - Effect.gen(function* () { - const op = optionalString(request.operation); - yield* refreshBlueprintsFromConfigEffect(state); - yield* refreshFlowsFromConfigEffect(state); + const handleOperationEffect = Effect.fn("FlowManager.handleOperation")(function* (request: FlowRequest) { + const op = optionalString(request.operation); + yield* refreshBlueprintsFromConfigEffect(state); + yield* refreshFlowsFromConfigEffect(state); - switch (op) { - case "list-blueprints": - return handleListBlueprintsWithState(state.pipe(stateSnapshot)); + switch (op) { + case "list-blueprints": + return handleListBlueprintsWithState(state.pipe(stateSnapshot)); - case "put-blueprint": - return yield* handlePutBlueprintEffect(state, request); + case "put-blueprint": + return yield* handlePutBlueprintEffect(state, request); - case "get-blueprint": - return yield* handleGetBlueprintEffect(state, request); + case "get-blueprint": + return yield* handleGetBlueprintEffect(state, request); - case "delete-blueprint": - return yield* handleDeleteBlueprintEffect(state, request); + case "delete-blueprint": + return yield* handleDeleteBlueprintEffect(state, request); - case "list-flows": - return handleListFlowsWithState(state.pipe(stateSnapshot)); + case "list-flows": + return handleListFlowsWithState(state.pipe(stateSnapshot)); - case "get-flow": - return yield* handleGetFlowEffect(state, request); + case "get-flow": + return yield* handleGetFlowEffect(state, request); - case "start-flow": - return yield* handleStartFlowEffect(state, request); + case "start-flow": + return yield* handleStartFlowEffect(state, request); - case "stop-flow": - return yield* handleStopFlowEffect(state, request); + case "stop-flow": + return yield* handleStopFlowEffect(state, request); - default: - return yield* flowManagerError("operation", `Unknown flow operation: ${op ?? ""}`); - } - }); + default: + return yield* flowManagerError("operation", `Unknown flow operation: ${op ?? ""}`); + } + }); const handleMessageEffect = Effect.fn("handleMessageEffect")(function* (msg: Message) { const request = yield* S.decodeUnknownEffect(FlowRequestSchema)(msg.value()).pipe( @@ -809,17 +797,16 @@ export function makeFlowManagerService(config: ProcessorConfig): FlowManagerServ return; } - const sendResponse = (response: FlowResponse): Effect.Effect => - Effect.gen(function* () { - const responseProducer = (yield* SynchronizedRef.get(state)).responseProducer; - if (responseProducer === null) { - return yield* flowManagerError("respond", "Flow response producer not started"); - } - yield* Effect.tryPromise({ - try: () => responseProducer.send(response, { id: requestId }), - catch: (cause) => flowManagerError("respond", cause), - }); + const sendResponse = Effect.fnUntraced(function* (response: FlowResponse) { + const responseProducer = (yield* SynchronizedRef.get(state)).responseProducer; + if (responseProducer === null) { + return yield* flowManagerError("respond", "Flow response producer not started"); + } + yield* Effect.tryPromise({ + try: () => responseProducer.send(response, { id: requestId }), + catch: (cause) => flowManagerError("respond", cause), }); + }); yield* handleOperationEffect(request).pipe( Effect.flatMap(sendResponse),