Normalize FlowManager effects with Effect.fn

This commit is contained in:
elpresidank 2026-06-02 09:41:32 -05:00
parent 7f9541e4fa
commit 47221d6ab5
2 changed files with 414 additions and 404 deletions

View file

@ -459,6 +459,28 @@ Notes:
- `cd ts && bun run test` - `cd ts && bun run test`
- `git diff --check` - `git diff --check`
### 2026-06-02: FlowManager Effect.fn Normalization Slice
- Status: migrated and package-verified.
- Completed:
- `ts/packages/flow/src/flow-manager/service.ts` no longer defines
reusable helpers as arrow functions that immediately return
`Effect.gen(...)`.
- Config request, blueprint refresh, flow refresh, blueprint handlers, flow
handlers, config push/delete, resource close, consume, run, and local
operation handling now use named `Effect.fn` providers.
- Hot local helpers for one-message consumption and response sending use
`Effect.fnUntraced`.
- `pushFlowsConfigEffect` keeps its best-effort logging/swallowing contract
through the `Effect.fn` pipeable form instead of a wrapper generator.
- Verification:
- `bun run --cwd ts/packages/flow test -- src/__tests__/flow-manager-service.test.ts`
- `cd ts && bun run check:tsgo`
- `cd ts && bun run build`
- `cd ts && bun run test`
- `cd ts && bun run lint`
- `git diff --check`
### 2026-06-02: Librarian Schema And Assertion Cleanup Slice ### 2026-06-02: Librarian Schema And Assertion Cleanup Slice
- Status: migrated and root-verified. - Status: migrated and root-verified.
@ -1750,9 +1772,9 @@ Notes:
`MutableHashSet<string>`. Short-lived local traversal sets remain no-ops. `MutableHashSet<string>`. Short-lived local traversal sets remain no-ops.
- Gateway dispatcher static service registries, streaming membership, and - Gateway dispatcher static service registries, streaming membership, and
scoped requestor cache now use Effect `HashMap`/`HashSet`. scoped requestor cache now use Effect `HashMap`/`HashSet`.
- FlowManager and sibling service `() => Effect.gen(...)` factories remain a - FlowManager `() => Effect.gen(...)` factories are normalized to
broad mechanical `Effect.fn` / `Effect.fnUntraced` cleanup, best handled `Effect.fn` / `Effect.fnUntraced`. Sibling service factories still need a
after Duration and small collection slices. focused scan before treating them as valid migration targets.
- Long-lived `Map` / `Set` state in ref-backed services can move toward - Long-lived `Map` / `Set` state in ref-backed services can move toward
Effect collections later; local pure traversal maps/sets remain no-ops. Effect collections later; local pure traversal maps/sets remain no-ops.
@ -1935,9 +1957,10 @@ Notes:
## Recommended PR Order ## Recommended PR Order
1. MCP protocol parity tests and legacy stdio flip/removal decision. 1. MCP protocol parity tests and legacy stdio flip/removal decision.
2. FlowManager/service `Effect.fn` normalization. 2. Flow/client RPC stream and remaining service operation `Match` follow-ups.
3. Flow/client RPC stream and remaining service operation `Match` follow-ups. 3. Long-lived ref-backed `HashMap` state cleanup where clone helpers remain.
4. Long-lived ref-backed `HashMap` state cleanup where clone helpers remain. 4. Sibling service `Effect.fn` normalization where arrow-returned generators
still appear.
## No-Op Rules ## No-Op Rules

View file

@ -199,7 +199,7 @@ export interface FlowManagerService extends AsyncProcessorRuntime<FlowManagerErr
readonly handleStopFlow: (request: FlowRequest) => Promise<FlowResponse>; readonly handleStopFlow: (request: FlowRequest) => Promise<FlowResponse>;
readonly handleStopFlowEffect: (request: FlowRequest) => Effect.Effect<FlowResponse, FlowManagerError>; readonly handleStopFlowEffect: (request: FlowRequest) => Effect.Effect<FlowResponse, FlowManagerError>;
readonly pushFlowsConfig: () => Promise<void>; readonly pushFlowsConfig: () => Promise<void>;
readonly pushFlowsConfigEffect: Effect.Effect<void, never>; readonly pushFlowsConfigEffect: Effect.Effect<void>;
readonly deleteFlowConfig: (id: string) => Promise<void>; readonly deleteFlowConfig: (id: string) => Promise<void>;
readonly deleteFlowConfigEffect: (id: string) => Effect.Effect<void, FlowManagerError>; readonly deleteFlowConfigEffect: (id: string) => Effect.Effect<void, FlowManagerError>;
} }
@ -276,190 +276,183 @@ const updateHandles = (
configClient: handles.configClient === undefined ? state.configClient : handles.configClient, configClient: handles.configClient === undefined ? state.configClient : handles.configClient,
})); }));
const configRequestEffect = ( const configRequestEffect = Effect.fn("FlowManager.configRequest")(function* (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>, stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
request: ConfigRequest, request: ConfigRequest,
): Effect.Effect<ConfigResponse, FlowManagerError> => ) {
Effect.gen(function* () { const configClient = (yield* SynchronizedRef.get(stateRef)).configClient;
const configClient = (yield* SynchronizedRef.get(stateRef)).configClient; if (configClient === null) {
if (configClient === null) { return yield* flowManagerError("config-request", "Config client not started");
return yield* flowManagerError("config-request", "Config client not started"); }
} return yield* Effect.tryPromise({
return yield* Effect.tryPromise({ try: () => configClient.request(request),
try: () => configClient.request(request), catch: (cause) => flowManagerError("config-request", cause),
catch: (cause) => flowManagerError("config-request", cause),
});
}); });
});
const ensureDefaultBlueprintEffect = ( const ensureDefaultBlueprintEffect = Effect.fn("FlowManager.ensureDefaultBlueprint")(function* (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>, stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
): Effect.Effect<void, FlowManagerError> => ) {
Effect.gen(function* () { const response = yield* configRequestEffect(stateRef, {
const response = yield* configRequestEffect(stateRef, { operation: "getvalues",
operation: "getvalues", type: "flow-blueprint",
type: "flow-blueprint",
});
if (configValues(response).some((value) => value.key === "default")) {
return;
}
const defaultBlueprint = yield* encodeJson(DEFAULT_BLUEPRINT, "encode-default-blueprint");
yield* configRequestEffect(stateRef, {
operation: "put",
keys: ["flow-blueprint"],
values: {
default: defaultBlueprint,
},
});
}); });
if (configValues(response).some((value) => value.key === "default")) {
return;
}
const refreshBlueprintsFromConfigEffect = ( const defaultBlueprint = yield* encodeJson(DEFAULT_BLUEPRINT, "encode-default-blueprint");
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
): Effect.Effect<void, FlowManagerError> =>
Effect.gen(function* () {
const response = yield* configRequestEffect(stateRef, {
operation: "getvalues",
type: "flow-blueprint",
});
const next = new Map<string, Blueprint>();
for (const item of configValues(response)) { yield* configRequestEffect(stateRef, {
const blueprint = blueprintFromConfig(item.value); operation: "put",
if (blueprint !== undefined) { keys: ["flow-blueprint"],
next.set(item.key, blueprint); values: {
} default: defaultBlueprint,
} },
if (!next.has("default")) {
next.set("default", DEFAULT_BLUEPRINT);
}
yield* SynchronizedRef.update(stateRef, (state) => ({
...state,
blueprints: next,
}));
}); });
});
const refreshFlowsFromConfigEffect = ( const refreshBlueprintsFromConfigEffect = Effect.fn("FlowManager.refreshBlueprintsFromConfig")(function* (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>, stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
): Effect.Effect<void, FlowManagerError> => ) {
Effect.gen(function* () { const response = yield* configRequestEffect(stateRef, {
const response = yield* configRequestEffect(stateRef, { operation: "getvalues",
operation: "getvalues", type: "flow-blueprint",
type: "flow", });
}); const next = new Map<string, Blueprint>();
const next = new Map<string, FlowInstance>();
for (const item of configValues(response)) { for (const item of configValues(response)) {
const flow = flowFromConfig(item.key, item.value); const blueprint = blueprintFromConfig(item.value);
if (flow !== undefined) { if (blueprint !== undefined) {
next.set(item.key, flow); next.set(item.key, blueprint);
}
} }
}
if (next.size === 0) { if (!next.has("default")) {
const flowsResponse = yield* configRequestEffect(stateRef, { next.set("default", DEFAULT_BLUEPRINT);
operation: "getvalues", }
type: "flows",
yield* SynchronizedRef.update(stateRef, (state) => ({
...state,
blueprints: next,
}));
});
const refreshFlowsFromConfigEffect = Effect.fn("FlowManager.refreshFlowsFromConfig")(function* (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
) {
const response = yield* configRequestEffect(stateRef, {
operation: "getvalues",
type: "flow",
});
const next = new Map<string, FlowInstance>();
for (const item of configValues(response)) {
const flow = flowFromConfig(item.key, item.value);
if (flow !== undefined) {
next.set(item.key, flow);
}
}
if (next.size === 0) {
const flowsResponse = yield* configRequestEffect(stateRef, {
operation: "getvalues",
type: "flows",
});
for (const item of configValues(flowsResponse)) {
next.set(item.key, {
id: item.key,
blueprintName: "default",
description: "",
parameters: {},
status: "running",
}); });
for (const item of configValues(flowsResponse)) {
next.set(item.key, {
id: item.key,
blueprintName: "default",
description: "",
parameters: {},
status: "running",
});
}
} }
}
yield* SynchronizedRef.update(stateRef, (state) => ({ yield* SynchronizedRef.update(stateRef, (state) => ({
...state, ...state,
flows: next, flows: next,
})); }));
}); });
const handleListBlueprintsWithState = (state: FlowManagerServiceState): FlowResponse => ({ const handleListBlueprintsWithState = (state: FlowManagerServiceState): FlowResponse => ({
"blueprint-names": [...state.blueprints.keys()], "blueprint-names": [...state.blueprints.keys()],
}); });
const handleGetBlueprintEffect = ( const handleGetBlueprintEffect = Effect.fn("FlowManager.handleGetBlueprint")(function* (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>, stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
request: FlowRequest, request: FlowRequest,
): Effect.Effect<FlowResponse, FlowManagerError> => ) {
Effect.gen(function* () { const name = optionalString(request["blueprint-name"]);
const name = optionalString(request["blueprint-name"]); if (name === undefined) {
if (name === undefined) { return yield* flowManagerError("get-blueprint", "Missing blueprint-name");
return yield* flowManagerError("get-blueprint", "Missing blueprint-name"); }
}
const blueprint = (yield* SynchronizedRef.get(stateRef)).blueprints.get(name); const blueprint = (yield* SynchronizedRef.get(stateRef)).blueprints.get(name);
if (blueprint === undefined) { if (blueprint === undefined) {
return yield* flowManagerError("get-blueprint", `Blueprint not found: ${name}`); return yield* flowManagerError("get-blueprint", `Blueprint not found: ${name}`);
} }
const definition = yield* encodeJson(blueprint, "encode-blueprint"); const definition = yield* encodeJson(blueprint, "encode-blueprint");
return {
"blueprint-definition": definition,
};
});
const handlePutBlueprintEffect = Effect.fn("FlowManager.handlePutBlueprint")(function* (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
request: FlowRequest,
) {
const name = optionalString(request["blueprint-name"]);
if (name === undefined) {
return yield* flowManagerError("put-blueprint", "Missing blueprint-name");
}
const rawDefinition = request["blueprint-definition"];
if (rawDefinition === undefined) {
return yield* flowManagerError("put-blueprint", "Missing blueprint-definition");
}
const definition = typeof rawDefinition === "string"
? rawDefinition
: yield* encodeJson(rawDefinition, "encode-blueprint");
yield* configRequestEffect(stateRef, {
operation: "put",
keys: ["flow-blueprint"],
values: { [name]: definition },
});
yield* refreshBlueprintsFromConfigEffect(stateRef);
return {};
});
const handleDeleteBlueprintEffect = Effect.fn("FlowManager.handleDeleteBlueprint")(function* (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
request: FlowRequest,
) {
const name = optionalString(request["blueprint-name"]);
if (name === undefined) {
return yield* flowManagerError("delete-blueprint", "Missing blueprint-name");
}
if (name === "default") {
return yield* flowManagerError("delete-blueprint", "Cannot delete the default blueprint");
}
yield* configRequestEffect(stateRef, {
operation: "delete",
keys: ["flow-blueprint", name],
});
yield* SynchronizedRef.update(stateRef, (state) => {
const blueprints = cloneBlueprints(state.blueprints);
blueprints.delete(name);
return { return {
"blueprint-definition": definition, ...state,
blueprints,
}; };
}); });
const handlePutBlueprintEffect = ( return {};
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>, });
request: FlowRequest,
): Effect.Effect<FlowResponse, FlowManagerError> =>
Effect.gen(function* () {
const name = optionalString(request["blueprint-name"]);
if (name === undefined) {
return yield* flowManagerError("put-blueprint", "Missing blueprint-name");
}
const rawDefinition = request["blueprint-definition"];
if (rawDefinition === undefined) {
return yield* flowManagerError("put-blueprint", "Missing blueprint-definition");
}
const definition = typeof rawDefinition === "string"
? rawDefinition
: yield* encodeJson(rawDefinition, "encode-blueprint");
yield* configRequestEffect(stateRef, {
operation: "put",
keys: ["flow-blueprint"],
values: { [name]: definition },
});
yield* refreshBlueprintsFromConfigEffect(stateRef);
return {};
});
const handleDeleteBlueprintEffect = (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
request: FlowRequest,
): Effect.Effect<FlowResponse, FlowManagerError> =>
Effect.gen(function* () {
const name = optionalString(request["blueprint-name"]);
if (name === undefined) {
return yield* flowManagerError("delete-blueprint", "Missing blueprint-name");
}
if (name === "default") {
return yield* flowManagerError("delete-blueprint", "Cannot delete the default blueprint");
}
yield* configRequestEffect(stateRef, {
operation: "delete",
keys: ["flow-blueprint", name],
});
yield* SynchronizedRef.update(stateRef, (state) => {
const blueprints = cloneBlueprints(state.blueprints);
blueprints.delete(name);
return {
...state,
blueprints,
};
});
return {};
});
const handleListFlowsWithState = (state: FlowManagerServiceState): FlowResponse => ({ const handleListFlowsWithState = (state: FlowManagerServiceState): FlowResponse => ({
"flow-ids": [...state.flows.keys()], "flow-ids": [...state.flows.keys()],
@ -471,103 +464,100 @@ const flowRecord = (inst: FlowInstance) => ({
parameters: inst.parameters, parameters: inst.parameters,
}); });
const handleGetFlowEffect = ( const handleGetFlowEffect = Effect.fn("FlowManager.handleGetFlow")(function* (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>, stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
request: FlowRequest, request: FlowRequest,
): Effect.Effect<FlowResponse, FlowManagerError> => ) {
Effect.gen(function* () { const id = optionalString(request["flow-id"]);
const id = optionalString(request["flow-id"]); if (id === undefined) {
if (id === undefined) { return yield* flowManagerError("get-flow", "Missing flow-id");
return yield* flowManagerError("get-flow", "Missing flow-id"); }
}
const inst = (yield* SynchronizedRef.get(stateRef)).flows.get(id); const inst = (yield* SynchronizedRef.get(stateRef)).flows.get(id);
if (inst === undefined) { if (inst === undefined) {
return yield* flowManagerError("get-flow", `Flow not found: ${id}`); return yield* flowManagerError("get-flow", `Flow not found: ${id}`);
} }
const flow = yield* encodeJson(flowRecord(inst), "encode-flow"); const flow = yield* encodeJson(flowRecord(inst), "encode-flow");
return { flow }; return { flow };
}); });
const handleStartFlowEffect = ( const handleStartFlowEffect = Effect.fn("FlowManager.handleStartFlow")(function* (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>, stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
request: FlowRequest, request: FlowRequest,
): Effect.Effect<FlowResponse, FlowManagerError> => ) {
Effect.gen(function* () { const id = optionalString(request["flow-id"]);
const id = optionalString(request["flow-id"]); const blueprintName = optionalString(request["blueprint-name"]) ?? "default";
const blueprintName = optionalString(request["blueprint-name"]) ?? "default"; const description = optionalString(request.description) ?? "";
const description = optionalString(request.description) ?? ""; const parameters = isRecord(request.parameters) ? request.parameters : {};
const parameters = isRecord(request.parameters) ? request.parameters : {};
if (id === undefined) { if (id === undefined) {
return yield* flowManagerError("start-flow", "Missing flow-id"); return yield* flowManagerError("start-flow", "Missing flow-id");
}
const inst = yield* SynchronizedRef.modifyEffect(stateRef, (state) => {
if (state.flows.has(id)) {
return Effect.fail(flowManagerError("start-flow", `Flow already exists: ${id}`));
}
if (!state.blueprints.has(blueprintName)) {
return Effect.fail(flowManagerError("start-flow", `Blueprint not found: ${blueprintName}`));
} }
const inst = yield* SynchronizedRef.modifyEffect(stateRef, (state) => { const next: FlowInstance = {
if (state.flows.has(id)) { id,
return Effect.fail(flowManagerError("start-flow", `Flow already exists: ${id}`)); blueprintName,
} description,
if (!state.blueprints.has(blueprintName)) { parameters,
return Effect.fail(flowManagerError("start-flow", `Blueprint not found: ${blueprintName}`)); status: "running",
} };
const flows = cloneFlows(state.flows);
const next: FlowInstance = { flows.set(id, next);
id, return Effect.succeed(modifyResult(next, {
blueprintName, ...state,
description, flows,
parameters, }));
status: "running",
};
const flows = cloneFlows(state.flows);
flows.set(id, next);
return Effect.succeed(modifyResult(next, {
...state,
flows,
}));
});
yield* Effect.log(`[FlowManager] Started flow "${inst.id}" with blueprint "${inst.blueprintName}"`);
yield* pushFlowsConfigEffect(stateRef);
return {};
}); });
const handleStopFlowEffect = ( yield* Effect.log(`[FlowManager] Started flow "${inst.id}" with blueprint "${inst.blueprintName}"`);
yield* pushFlowsConfigEffect(stateRef);
return {};
});
const handleStopFlowEffect = Effect.fn("FlowManager.handleStopFlow")(function* (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>, stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
request: FlowRequest, request: FlowRequest,
): Effect.Effect<FlowResponse, FlowManagerError> => ) {
Effect.gen(function* () { const id = optionalString(request["flow-id"]);
const id = optionalString(request["flow-id"]); if (id === undefined) {
if (id === undefined) { return yield* flowManagerError("stop-flow", "Missing flow-id");
return yield* flowManagerError("stop-flow", "Missing flow-id"); }
const inst = yield* SynchronizedRef.modifyEffect(stateRef, (state) => {
const current = state.flows.get(id);
if (current === undefined) {
return Effect.fail(flowManagerError("stop-flow", `Flow not found: ${id}`));
} }
const inst = yield* SynchronizedRef.modifyEffect(stateRef, (state) => { const flows = cloneFlows(state.flows);
const current = state.flows.get(id); flows.delete(id);
if (current === undefined) { return Effect.succeed(modifyResult(current, {
return Effect.fail(flowManagerError("stop-flow", `Flow not found: ${id}`)); ...state,
} flows,
}));
const flows = cloneFlows(state.flows);
flows.delete(id);
return Effect.succeed(modifyResult(current, {
...state,
flows,
}));
});
yield* Effect.log(`[FlowManager] Stopped flow "${inst.id}"`);
yield* deleteFlowConfigEffect(stateRef, id);
yield* pushFlowsConfigEffect(stateRef);
return {};
}); });
const pushFlowsConfigEffect = ( yield* Effect.log(`[FlowManager] Stopped flow "${inst.id}"`);
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>, yield* deleteFlowConfigEffect(stateRef, id);
): Effect.Effect<void, never> => yield* pushFlowsConfigEffect(stateRef);
Effect.gen(function* () {
return {};
});
const pushFlowsConfigEffect = Effect.fn("FlowManager.pushFlowsConfig")(
function* (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
) {
const state = yield* SynchronizedRef.get(stateRef); const state = yield* SynchronizedRef.get(stateRef);
const configClient = state.configClient; const configClient = state.configClient;
if (configClient === null) return; if (configClient === null) return;
@ -601,149 +591,148 @@ const pushFlowsConfigEffect = (
catch: (cause) => flowManagerError("put-flow-records", cause), catch: (cause) => flowManagerError("put-flow-records", cause),
}); });
yield* Effect.log(`[FlowManager] Pushed flows config (${state.flows.size} active flows)`); yield* Effect.log(`[FlowManager] Pushed flows config (${state.flows.size} active flows)`);
}).pipe( },
Effect.catch((err) => (effect) =>
Effect.logError("[FlowManager] Failed to push flows config", { error: err.message }), effect.pipe(
Effect.catch((err) =>
Effect.logError("[FlowManager] Failed to push flows config", { error: err.message }),
),
), ),
); );
const deleteFlowConfigEffect = ( const deleteFlowConfigEffect = Effect.fn("FlowManager.deleteFlowConfig")(function* (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>, stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
id: string, id: string,
): Effect.Effect<void, FlowManagerError> => ) {
Effect.gen(function* () { const configClient = (yield* SynchronizedRef.get(stateRef)).configClient;
const configClient = (yield* SynchronizedRef.get(stateRef)).configClient; if (configClient === null) return;
if (configClient === null) return; yield* Effect.tryPromise({
yield* Effect.tryPromise({ try: () =>
try: () => configClient.request({
configClient.request({ operation: "delete",
operation: "delete", keys: ["flows", id],
keys: ["flows", id], }),
}), catch: (cause) => flowManagerError("delete-flows-config", cause),
catch: (cause) => flowManagerError("delete-flows-config", cause),
});
yield* Effect.tryPromise({
try: () =>
configClient.request({
operation: "delete",
keys: ["flow", id],
}),
catch: (cause) => flowManagerError("delete-flow-record", cause),
});
}); });
yield* Effect.tryPromise({
try: () =>
configClient.request({
operation: "delete",
keys: ["flow", id],
}),
catch: (cause) => flowManagerError("delete-flow-record", cause),
});
});
const closeFlowManagerResourcesEffect = ( const closeFlowManagerResourcesEffect = Effect.fn("FlowManager.closeResources")(function* (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>, stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
): Effect.Effect<void, FlowManagerError> => ) {
Effect.gen(function* () { const state = yield* SynchronizedRef.get(stateRef);
const state = yield* SynchronizedRef.get(stateRef);
const consumer = state.consumer; const consumer = state.consumer;
if (consumer !== null) { if (consumer !== null) {
yield* Effect.tryPromise({
try: () => consumer.close(),
catch: (cause) => flowManagerError("consumer-close", cause),
});
}
const responseProducer = state.responseProducer;
if (responseProducer !== null) {
yield* Effect.tryPromise({
try: () => responseProducer.close(),
catch: (cause) => flowManagerError("response-producer-close", cause),
});
}
const configClient = state.configClient;
if (configClient !== null) {
yield* Effect.tryPromise({
try: () => configClient.stop(),
catch: (cause) => flowManagerError("config-client-stop", cause),
});
}
yield* updateHandles(stateRef, {
consumer: null,
responseProducer: null,
configClient: null,
});
});
const consumeOnceEffect = (
service: FlowManagerService,
): Effect.Effect<void, FlowManagerError> =>
Effect.gen(function* () {
const consumer = (yield* SynchronizedRef.get(service.state)).consumer;
if (consumer === null) {
return yield* flowManagerError("consume", "Flow request consumer not started");
}
const msg = yield* Effect.tryPromise({
try: () => consumer.receive(2000),
catch: (cause) => flowManagerError("consume-receive", cause),
});
if (msg === null) return;
yield* service.handleMessageEffect(msg);
yield* Effect.tryPromise({ yield* Effect.tryPromise({
try: () => consumer.acknowledge(msg), try: () => consumer.close(),
catch: (cause) => flowManagerError("consume-acknowledge", cause), catch: (cause) => flowManagerError("consumer-close", cause),
}); });
}); }
const responseProducer = state.responseProducer;
const runFlowManagerServiceEffect = ( if (responseProducer !== null) {
service: FlowManagerService,
): Effect.Effect<void, FlowManagerError> =>
Effect.gen(function* () {
const configClient = makeRequestResponse<ConfigRequest, ConfigResponse>({
pubsub: service.pubsub,
requestTopic: topics.configRequest,
responseTopic: topics.configResponse,
subscription: `${service.config.id}-config-client`,
});
yield* updateHandles(service.state, { configClient });
yield* Effect.tryPromise({ yield* Effect.tryPromise({
try: () => configClient.start(), try: () => responseProducer.close(),
catch: (cause) => flowManagerError("config-client-start", cause), catch: (cause) => flowManagerError("response-producer-close", cause),
}); });
yield* ensureDefaultBlueprintEffect(service.state); }
yield* refreshBlueprintsFromConfigEffect(service.state); const configClient = state.configClient;
if (configClient !== null) {
const responseProducer = yield* Effect.tryPromise({ yield* Effect.tryPromise({
try: () => try: () => configClient.stop(),
service.pubsub.createProducer<FlowResponse>({ catch: (cause) => flowManagerError("config-client-stop", cause),
topic: topics.flowResponse,
schema: FlowResponseSchema,
}),
catch: (cause) => flowManagerError("response-producer", cause),
}); });
yield* updateHandles(service.state, { responseProducer }); }
const consumer = yield* Effect.tryPromise({ yield* updateHandles(stateRef, {
try: () => consumer: null,
service.pubsub.createConsumer<FlowRequest>({ responseProducer: null,
topic: topics.flowRequest, configClient: null,
subscription: `${service.config.id}-flow-request`,
schema: FlowRequestSchema,
}),
catch: (cause) => flowManagerError("consumer", cause),
});
yield* updateHandles(service.state, { consumer });
yield* Effect.log(`[FlowManager] Listening on ${topics.flowRequest}`);
yield* Effect.whileLoop({
while: () => service.running,
body: () =>
consumeOnceEffect(service).pipe(
Effect.catch((err) => {
if (!service.running) return Effect.void;
return Effect.logError("[FlowManager] Error in consume loop", { error: err.message }).pipe(
Effect.flatMap(() => Effect.sleep(Duration.millis(1000))),
);
}),
),
step: () => undefined,
});
}); });
});
const consumeOnceEffect = Effect.fnUntraced(function* (
service: FlowManagerService,
) {
const consumer = (yield* SynchronizedRef.get(service.state)).consumer;
if (consumer === null) {
return yield* flowManagerError("consume", "Flow request consumer not started");
}
const msg = yield* Effect.tryPromise({
try: () => consumer.receive(2000),
catch: (cause) => flowManagerError("consume-receive", cause),
});
if (msg === null) return;
yield* service.handleMessageEffect(msg);
yield* Effect.tryPromise({
try: () => consumer.acknowledge(msg),
catch: (cause) => flowManagerError("consume-acknowledge", cause),
});
});
const runFlowManagerServiceEffect = Effect.fn("FlowManager.runService")(function* (
service: FlowManagerService,
) {
const configClient = makeRequestResponse<ConfigRequest, ConfigResponse>({
pubsub: service.pubsub,
requestTopic: topics.configRequest,
responseTopic: topics.configResponse,
subscription: `${service.config.id}-config-client`,
});
yield* updateHandles(service.state, { configClient });
yield* Effect.tryPromise({
try: () => configClient.start(),
catch: (cause) => flowManagerError("config-client-start", cause),
});
yield* ensureDefaultBlueprintEffect(service.state);
yield* refreshBlueprintsFromConfigEffect(service.state);
const responseProducer = yield* Effect.tryPromise({
try: () =>
service.pubsub.createProducer<FlowResponse>({
topic: topics.flowResponse,
schema: FlowResponseSchema,
}),
catch: (cause) => flowManagerError("response-producer", cause),
});
yield* updateHandles(service.state, { responseProducer });
const consumer = yield* Effect.tryPromise({
try: () =>
service.pubsub.createConsumer<FlowRequest>({
topic: topics.flowRequest,
subscription: `${service.config.id}-flow-request`,
schema: FlowRequestSchema,
}),
catch: (cause) => flowManagerError("consumer", cause),
});
yield* updateHandles(service.state, { consumer });
yield* Effect.log(`[FlowManager] Listening on ${topics.flowRequest}`);
yield* Effect.whileLoop({
while: () => service.running,
body: () =>
consumeOnceEffect(service).pipe(
Effect.catch((err) => {
if (!service.running) return Effect.void;
return Effect.logError("[FlowManager] Error in consume loop", { error: err.message }).pipe(
Effect.flatMap(() => Effect.sleep(Duration.millis(1000))),
);
}),
),
step: () => undefined,
});
});
export function makeFlowManagerService(config: ProcessorConfig): FlowManagerService { export function makeFlowManagerService(config: ProcessorConfig): FlowManagerService {
const state = SynchronizedRef.makeUnsafe(initialState()); const state = SynchronizedRef.makeUnsafe(initialState());
@ -762,41 +751,40 @@ export function makeFlowManagerService(config: ProcessorConfig): FlowManagerServ
}); });
const baseStop = base.stop; const baseStop = base.stop;
const handleOperationEffect = (request: FlowRequest): Effect.Effect<FlowResponse, FlowManagerError> => const handleOperationEffect = Effect.fn("FlowManager.handleOperation")(function* (request: FlowRequest) {
Effect.gen(function* () { const op = optionalString(request.operation);
const op = optionalString(request.operation); yield* refreshBlueprintsFromConfigEffect(state);
yield* refreshBlueprintsFromConfigEffect(state); yield* refreshFlowsFromConfigEffect(state);
yield* refreshFlowsFromConfigEffect(state);
switch (op) { switch (op) {
case "list-blueprints": case "list-blueprints":
return handleListBlueprintsWithState(state.pipe(stateSnapshot)); return handleListBlueprintsWithState(state.pipe(stateSnapshot));
case "put-blueprint": case "put-blueprint":
return yield* handlePutBlueprintEffect(state, request); return yield* handlePutBlueprintEffect(state, request);
case "get-blueprint": case "get-blueprint":
return yield* handleGetBlueprintEffect(state, request); return yield* handleGetBlueprintEffect(state, request);
case "delete-blueprint": case "delete-blueprint":
return yield* handleDeleteBlueprintEffect(state, request); return yield* handleDeleteBlueprintEffect(state, request);
case "list-flows": case "list-flows":
return handleListFlowsWithState(state.pipe(stateSnapshot)); return handleListFlowsWithState(state.pipe(stateSnapshot));
case "get-flow": case "get-flow":
return yield* handleGetFlowEffect(state, request); return yield* handleGetFlowEffect(state, request);
case "start-flow": case "start-flow":
return yield* handleStartFlowEffect(state, request); return yield* handleStartFlowEffect(state, request);
case "stop-flow": case "stop-flow":
return yield* handleStopFlowEffect(state, request); return yield* handleStopFlowEffect(state, request);
default: default:
return yield* flowManagerError("operation", `Unknown flow operation: ${op ?? ""}`); return yield* flowManagerError("operation", `Unknown flow operation: ${op ?? ""}`);
} }
}); });
const handleMessageEffect = Effect.fn("handleMessageEffect")(function* (msg: Message<FlowRequest>) { const handleMessageEffect = Effect.fn("handleMessageEffect")(function* (msg: Message<FlowRequest>) {
const request = yield* S.decodeUnknownEffect(FlowRequestSchema)(msg.value()).pipe( const request = yield* S.decodeUnknownEffect(FlowRequestSchema)(msg.value()).pipe(
@ -809,17 +797,16 @@ export function makeFlowManagerService(config: ProcessorConfig): FlowManagerServ
return; return;
} }
const sendResponse = (response: FlowResponse): Effect.Effect<void, FlowManagerError> => const sendResponse = Effect.fnUntraced(function* (response: FlowResponse) {
Effect.gen(function* () { const responseProducer = (yield* SynchronizedRef.get(state)).responseProducer;
const responseProducer = (yield* SynchronizedRef.get(state)).responseProducer; if (responseProducer === null) {
if (responseProducer === null) { return yield* flowManagerError("respond", "Flow response producer not started");
return yield* flowManagerError("respond", "Flow response producer not started"); }
} yield* Effect.tryPromise({
yield* Effect.tryPromise({ try: () => responseProducer.send(response, { id: requestId }),
try: () => responseProducer.send(response, { id: requestId }), catch: (cause) => flowManagerError("respond", cause),
catch: (cause) => flowManagerError("respond", cause),
});
}); });
});
yield* handleOperationEffect(request).pipe( yield* handleOperationEffect(request).pipe(
Effect.flatMap(sendResponse), Effect.flatMap(sendResponse),