Use HashMap for flow manager state

This commit is contained in:
elpresidank 2026-06-04 06:53:21 -05:00
parent 9eaa1a2c1e
commit 67b5e0dd5b
3 changed files with 92 additions and 56 deletions

View file

@ -1,4 +1,4 @@
import {Effect, SynchronizedRef} from "effect";
import {Effect, HashMap, Option, SynchronizedRef} from "effect";
import {describe, expect, it} from "vitest";
import {
topics,
@ -203,7 +203,7 @@ describe("FlowManagerService operations", () => {
parameters: {limit: 3},
});
let state = await Effect.runPromise(SynchronizedRef.get(service.state));
expect(state.flows.get("flow-a")).toMatchObject({
expect(Option.getOrUndefined(HashMap.get(state.flows, "flow-a"))).toMatchObject({
id: "flow-a",
blueprintName: "default",
description: "alpha",
@ -217,7 +217,7 @@ describe("FlowManagerService operations", () => {
});
state = await Effect.runPromise(SynchronizedRef.get(service.state));
expect(state.flows.has("flow-a")).toBe(false);
expect(HashMap.has(state.flows, "flow-a")).toBe(false);
expect(configClient.requests.map((request) => ({
operation: request.operation,
keys: request.keys,
@ -248,13 +248,13 @@ describe("FlowManagerService operations", () => {
await service.refreshBlueprintsFromConfig();
const state = await Effect.runPromise(SynchronizedRef.get(service.state));
expect(state.blueprints.get("custom")).toMatchObject({
expect(Option.getOrUndefined(HashMap.get(state.blueprints, "custom"))).toMatchObject({
description: "Custom",
topics: {input: "topic.in"},
extra: true,
});
expect(state.blueprints.has("broken")).toBe(false);
expect(state.blueprints.has("default")).toBe(true);
expect(HashMap.has(state.blueprints, "broken")).toBe(false);
expect(HashMap.has(state.blueprints, "default")).toBe(true);
});
it("serializes duplicate starts through the ref-backed map", async () => {
@ -268,7 +268,7 @@ describe("FlowManagerService operations", () => {
]);
const state = await Effect.runPromise(SynchronizedRef.get(service.state));
expect(state.flows.get("flow-a")).toMatchObject({id: "flow-a"});
expect(Option.getOrUndefined(HashMap.get(state.flows, "flow-a"))).toMatchObject({id: "flow-a"});
expect(results.filter((result) => result.status === "fulfilled")).toHaveLength(1);
expect(results.filter((result) => result.status === "rejected")).toHaveLength(1);
});

View file

@ -34,7 +34,7 @@ import {
import { makeProcessorProgram } from "@trustgraph/base";
import type { Message } from "@trustgraph/base";
import { NodeRuntime } from "@effect/platform-node";
import { Duration, Effect, Layer, ManagedRuntime, Match, Option, SynchronizedRef } from "effect";
import { Duration, Effect, HashMap, Layer, ManagedRuntime, Match, Option, SynchronizedRef } from "effect";
import * as S from "effect/Schema";
// ---------- Internal state types ----------
@ -54,6 +54,9 @@ interface Blueprint {
[key: string]: unknown;
}
type FlowStore = HashMap.HashMap<string, FlowInstance>;
type BlueprintStore = HashMap.HashMap<string, Blueprint>;
interface ConfigValueEntry {
key: string;
value: unknown;
@ -163,8 +166,8 @@ const DEFAULT_BLUEPRINT: Blueprint = {
// ---------- Service ----------
interface FlowManagerServiceState {
readonly flows: Map<string, FlowInstance>;
readonly blueprints: Map<string, Blueprint>;
readonly flows: FlowStore;
readonly blueprints: BlueprintStore;
readonly consumer: BackendConsumer<FlowRequest> | null;
readonly responseProducer: BackendProducer<FlowResponse> | null;
readonly configClient: RequestResponse<ConfigRequest, ConfigResponse> | null;
@ -205,10 +208,11 @@ export interface FlowManagerService extends AsyncProcessorRuntime<FlowManagerErr
}
const initialState = (): FlowManagerServiceState => {
const blueprints = new Map<string, Blueprint>();
blueprints.set("default", DEFAULT_BLUEPRINT);
const blueprints = HashMap.empty<string, Blueprint>().pipe(
HashMap.set("default", DEFAULT_BLUEPRINT),
);
return {
flows: new Map<string, FlowInstance>(),
flows: HashMap.empty<string, FlowInstance>(),
blueprints,
consumer: null,
responseProducer: null,
@ -219,11 +223,14 @@ const initialState = (): FlowManagerServiceState => {
const isStringRecord = (value: unknown): value is Record<string, string> =>
isRecord(value) && Object.values(value).every((item) => typeof item === "string");
const cloneFlows = (source: Map<string, FlowInstance>): Map<string, FlowInstance> =>
new Map(source);
const getHashMapValue = <K, V>(store: HashMap.HashMap<K, V>, key: K): V | undefined =>
Option.getOrUndefined(HashMap.get(store, key));
const cloneBlueprints = (source: Map<string, Blueprint>): Map<string, Blueprint> =>
new Map(source);
const sortedEntries = <A>(store: HashMap.HashMap<string, A>): ReadonlyArray<readonly [string, A]> =>
HashMap.toEntries(store).sort(([left], [right]) => left.localeCompare(right));
const sortedKeys = <A>(store: HashMap.HashMap<string, A>): Array<string> =>
sortedEntries(store).map(([key]) => key);
const stateSnapshot = (
stateRef: SynchronizedRef.SynchronizedRef<FlowManagerServiceState>,
@ -319,17 +326,17 @@ const refreshBlueprintsFromConfigEffect = Effect.fn("FlowManager.refreshBlueprin
operation: "getvalues",
type: "flow-blueprint",
});
const next = new Map<string, Blueprint>();
let next = HashMap.empty<string, Blueprint>();
for (const item of configValues(response)) {
const blueprint = blueprintFromConfig(item.value);
if (blueprint !== undefined) {
next.set(item.key, blueprint);
next = HashMap.set(next, item.key, blueprint);
}
}
if (!next.has("default")) {
next.set("default", DEFAULT_BLUEPRINT);
if (!HashMap.has(next, "default")) {
next = HashMap.set(next, "default", DEFAULT_BLUEPRINT);
}
yield* SynchronizedRef.update(stateRef, (state) => ({
@ -345,22 +352,22 @@ const refreshFlowsFromConfigEffect = Effect.fn("FlowManager.refreshFlowsFromConf
operation: "getvalues",
type: "flow",
});
const next = new Map<string, FlowInstance>();
let next = HashMap.empty<string, FlowInstance>();
for (const item of configValues(response)) {
const flow = flowFromConfig(item.key, item.value);
if (flow !== undefined) {
next.set(item.key, flow);
next = HashMap.set(next, item.key, flow);
}
}
if (next.size === 0) {
if (HashMap.size(next) === 0) {
const flowsResponse = yield* configRequestEffect(stateRef, {
operation: "getvalues",
type: "flows",
});
for (const item of configValues(flowsResponse)) {
next.set(item.key, {
next = HashMap.set(next, item.key, {
id: item.key,
blueprintName: "default",
description: "",
@ -377,7 +384,7 @@ const refreshFlowsFromConfigEffect = Effect.fn("FlowManager.refreshFlowsFromConf
});
const handleListBlueprintsWithState = (state: FlowManagerServiceState): FlowResponse => ({
"blueprint-names": [...state.blueprints.keys()],
"blueprint-names": sortedKeys(state.blueprints),
});
const handleGetBlueprintEffect = Effect.fn("FlowManager.handleGetBlueprint")(function* (
@ -389,7 +396,7 @@ const handleGetBlueprintEffect = Effect.fn("FlowManager.handleGetBlueprint")(fun
return yield* flowManagerError("get-blueprint", "Missing blueprint-name");
}
const blueprint = (yield* SynchronizedRef.get(stateRef)).blueprints.get(name);
const blueprint = getHashMapValue((yield* SynchronizedRef.get(stateRef)).blueprints, name);
if (blueprint === undefined) {
return yield* flowManagerError("get-blueprint", `Blueprint not found: ${name}`);
}
@ -442,20 +449,16 @@ const handleDeleteBlueprintEffect = Effect.fn("FlowManager.handleDeleteBlueprint
operation: "delete",
keys: ["flow-blueprint", name],
});
yield* SynchronizedRef.update(stateRef, (state) => {
const blueprints = cloneBlueprints(state.blueprints);
blueprints.delete(name);
return {
...state,
blueprints,
};
});
yield* SynchronizedRef.update(stateRef, (state) => ({
...state,
blueprints: HashMap.remove(state.blueprints, name),
}));
return {};
});
const handleListFlowsWithState = (state: FlowManagerServiceState): FlowResponse => ({
"flow-ids": [...state.flows.keys()],
"flow-ids": sortedKeys(state.flows),
});
const flowRecord = (inst: FlowInstance) => ({
@ -473,7 +476,7 @@ const handleGetFlowEffect = Effect.fn("FlowManager.handleGetFlow")(function* (
return yield* flowManagerError("get-flow", "Missing flow-id");
}
const inst = (yield* SynchronizedRef.get(stateRef)).flows.get(id);
const inst = getHashMapValue((yield* SynchronizedRef.get(stateRef)).flows, id);
if (inst === undefined) {
return yield* flowManagerError("get-flow", `Flow not found: ${id}`);
}
@ -496,10 +499,10 @@ const handleStartFlowEffect = Effect.fn("FlowManager.handleStartFlow")(function*
}
const inst = yield* SynchronizedRef.modifyEffect(stateRef, (state) => {
if (state.flows.has(id)) {
if (HashMap.has(state.flows, id)) {
return Effect.fail(flowManagerError("start-flow", `Flow already exists: ${id}`));
}
if (!state.blueprints.has(blueprintName)) {
if (!HashMap.has(state.blueprints, blueprintName)) {
return Effect.fail(flowManagerError("start-flow", `Blueprint not found: ${blueprintName}`));
}
@ -510,11 +513,9 @@ const handleStartFlowEffect = Effect.fn("FlowManager.handleStartFlow")(function*
parameters,
status: "running",
};
const flows = cloneFlows(state.flows);
flows.set(id, next);
return Effect.succeed(modifyResult(next, {
...state,
flows,
flows: HashMap.set(state.flows, id, next),
}));
});
@ -534,16 +535,14 @@ const handleStopFlowEffect = Effect.fn("FlowManager.handleStopFlow")(function* (
}
const inst = yield* SynchronizedRef.modifyEffect(stateRef, (state) => {
const current = state.flows.get(id);
const current = getHashMapValue(state.flows, id);
if (current === undefined) {
return Effect.fail(flowManagerError("stop-flow", `Flow not found: ${id}`));
}
const flows = cloneFlows(state.flows);
flows.delete(id);
return Effect.succeed(modifyResult(current, {
...state,
flows,
flows: HashMap.remove(state.flows, id),
}));
});
@ -564,8 +563,8 @@ const pushFlowsConfigEffect = Effect.fn("FlowManager.pushFlowsConfig")(
const flowsConfig: Record<string, { topics: Record<string, string> }> = {};
const flowRecords: Record<string, string> = {};
for (const [id, inst] of state.flows) {
const blueprint = state.blueprints.get(inst.blueprintName);
for (const [id, inst] of sortedEntries(state.flows)) {
const blueprint = getHashMapValue(state.blueprints, inst.blueprintName);
if (blueprint !== undefined) {
flowsConfig[id] = { topics: blueprint.topics };
flowRecords[id] = yield* encodeJson(flowRecord(inst), "encode-flow-config");
@ -590,7 +589,7 @@ const pushFlowsConfigEffect = Effect.fn("FlowManager.pushFlowsConfig")(
}),
catch: (cause) => flowManagerError("put-flow-records", cause),
});
yield* Effect.log(`[FlowManager] Pushed flows config (${state.flows.size} active flows)`);
yield* Effect.log(`[FlowManager] Pushed flows config (${HashMap.size(state.flows)} active flows)`);
},
(effect) =>
effect.pipe(