From 67b5e0dd5bdecd012a79b4bf08cc23296f4463d5 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Thu, 4 Jun 2026 06:53:21 -0500 Subject: [PATCH] Use HashMap for flow manager state --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 51 ++++++++++-- .../__tests__/flow-manager-service.test.ts | 14 ++-- ts/packages/flow/src/flow-manager/service.ts | 83 +++++++++---------- 3 files changed, 92 insertions(+), 56 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index bf0d8486..ff155fec 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -2049,6 +2049,30 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-04: FlowManager HashMap State Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/flow/src/flow-manager/service.ts` now stores long-lived + flow and blueprint state in immutable `HashMap` snapshots inside the + existing `SynchronizedRef`. + - Refresh, start, stop, delete, list, get, and config-push paths now use + `HashMap.get`, `HashMap.has`, `HashMap.set`, `HashMap.remove`, and + `HashMap.size` instead of cloned native `Map` state. + - List responses and config-push iteration convert `HashMap` state through + sorted entries only at the API/config boundary for deterministic output. + - `ts/packages/flow/src/__tests__/flow-manager-service.test.ts` now reads + service state through `HashMap.get` plus `Option` and keeps the test fake + pubsub `Map` as a compatibility fixture. + - The focused scan for native flow-manager map state is clean. +- Verification: + - `cd ts/packages/flow && bunx --bun vitest run src/__tests__/flow-manager-service.test.ts` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -2073,10 +2097,13 @@ Notes: core state are complete: `kgCores` and `deCores` now use `HashMap` inside `SynchronizedRef`, and plain records remain only at persistence/API boundaries. - - FlowManager and Librarian ref-backed state slices are still valid larger - collection targets. Follow-up service work should focus on scoped layers, - schedules where polling semantics allow, and managed persistence providers - rather than direct mutable service fields. + - FlowManager operation dispatch, helper functions, and ref-backed flow / + blueprint state are complete: `flows` and `blueprints` now use `HashMap` + inside `SynchronizedRef`, and plain records remain only at config/API + boundaries. Librarian ref-backed state remains a larger collection target. + Follow-up service work should focus on scoped layers, schedules where + polling semantics allow, and managed persistence providers rather than + direct mutable service fields. - Flow service startup facades now consistently use `ManagedRuntime`, and local scripts should delegate to `runMain()` instead of adding local `.catch(console.error/process.exit)` wrappers. @@ -2181,9 +2208,9 @@ Notes: - Gateway dispatcher static service registries, streaming membership, and scoped requestor cache now use Effect `HashMap`/`HashSet`; gateway term-bearing service membership sets now use Effect `HashSet` too. - - FlowManager, KnowledgeCore, and ConfigService `() => Effect.gen(...)` - factories are normalized to `Effect.fn` / `Effect.fnUntraced`. Librarian - helper factories still need a focused follow-up slice. + - FlowManager, KnowledgeCore, ConfigService, and Librarian `() => + Effect.gen(...)` factories are normalized to `Effect.fn` / + `Effect.fnUntraced`. - ConfigService and KnowledgeCore operation dispatch now use `effect/Match` with `Match.exhaustive`; FlowManager and Librarian operation dispatch now use `effect/Match` with runtime-preserving `Match.orElse` fallbacks. @@ -2199,6 +2226,16 @@ Notes: - Long-lived `Map` / `Set` state in remaining ref-backed services can move toward Effect collections later; local pure traversal maps/sets remain no-ops. + - Fresh strict signal sweep after the 2026-06-04 helper and collection + slices found no production normal `Error`, raw `try`/`catch`, native + `switch`, or Effect-focused type assertions under `ts/packages`. + - Remaining real helper-normalization targets from the fresh sweep are + retrieval/document-rag, retrieval/graph-rag, embeddings/ollama, base + processor flow helpers, and one workbench atom helper. + - Remaining real long-lived native collection targets include + `gateway/rpc-protocol.ts`, base processor registries, Librarian service / + collection manager state, prompt template cache, and a workbench module + cache. Local traversal sets and test fakes remain no-op boundaries. ## Ranked Findings diff --git a/ts/packages/flow/src/__tests__/flow-manager-service.test.ts b/ts/packages/flow/src/__tests__/flow-manager-service.test.ts index 0fa82463..dff32c9a 100644 --- a/ts/packages/flow/src/__tests__/flow-manager-service.test.ts +++ b/ts/packages/flow/src/__tests__/flow-manager-service.test.ts @@ -1,4 +1,4 @@ -import {Effect, SynchronizedRef} from "effect"; +import {Effect, HashMap, Option, SynchronizedRef} from "effect"; import {describe, expect, it} from "vitest"; import { topics, @@ -203,7 +203,7 @@ describe("FlowManagerService operations", () => { parameters: {limit: 3}, }); let state = await Effect.runPromise(SynchronizedRef.get(service.state)); - expect(state.flows.get("flow-a")).toMatchObject({ + expect(Option.getOrUndefined(HashMap.get(state.flows, "flow-a"))).toMatchObject({ id: "flow-a", blueprintName: "default", description: "alpha", @@ -217,7 +217,7 @@ describe("FlowManagerService operations", () => { }); state = await Effect.runPromise(SynchronizedRef.get(service.state)); - expect(state.flows.has("flow-a")).toBe(false); + expect(HashMap.has(state.flows, "flow-a")).toBe(false); expect(configClient.requests.map((request) => ({ operation: request.operation, keys: request.keys, @@ -248,13 +248,13 @@ describe("FlowManagerService operations", () => { await service.refreshBlueprintsFromConfig(); const state = await Effect.runPromise(SynchronizedRef.get(service.state)); - expect(state.blueprints.get("custom")).toMatchObject({ + expect(Option.getOrUndefined(HashMap.get(state.blueprints, "custom"))).toMatchObject({ description: "Custom", topics: {input: "topic.in"}, extra: true, }); - expect(state.blueprints.has("broken")).toBe(false); - expect(state.blueprints.has("default")).toBe(true); + expect(HashMap.has(state.blueprints, "broken")).toBe(false); + expect(HashMap.has(state.blueprints, "default")).toBe(true); }); it("serializes duplicate starts through the ref-backed map", async () => { @@ -268,7 +268,7 @@ describe("FlowManagerService operations", () => { ]); const state = await Effect.runPromise(SynchronizedRef.get(service.state)); - expect(state.flows.get("flow-a")).toMatchObject({id: "flow-a"}); + expect(Option.getOrUndefined(HashMap.get(state.flows, "flow-a"))).toMatchObject({id: "flow-a"}); expect(results.filter((result) => result.status === "fulfilled")).toHaveLength(1); expect(results.filter((result) => result.status === "rejected")).toHaveLength(1); }); diff --git a/ts/packages/flow/src/flow-manager/service.ts b/ts/packages/flow/src/flow-manager/service.ts index 9bdd5095..1af460f9 100644 --- a/ts/packages/flow/src/flow-manager/service.ts +++ b/ts/packages/flow/src/flow-manager/service.ts @@ -34,7 +34,7 @@ import { import { makeProcessorProgram } from "@trustgraph/base"; import type { Message } from "@trustgraph/base"; import { NodeRuntime } from "@effect/platform-node"; -import { Duration, Effect, Layer, ManagedRuntime, Match, Option, SynchronizedRef } from "effect"; +import { Duration, Effect, HashMap, Layer, ManagedRuntime, Match, Option, SynchronizedRef } from "effect"; import * as S from "effect/Schema"; // ---------- Internal state types ---------- @@ -54,6 +54,9 @@ interface Blueprint { [key: string]: unknown; } +type FlowStore = HashMap.HashMap; +type BlueprintStore = HashMap.HashMap; + interface ConfigValueEntry { key: string; value: unknown; @@ -163,8 +166,8 @@ const DEFAULT_BLUEPRINT: Blueprint = { // ---------- Service ---------- interface FlowManagerServiceState { - readonly flows: Map; - readonly blueprints: Map; + readonly flows: FlowStore; + readonly blueprints: BlueprintStore; readonly consumer: BackendConsumer | null; readonly responseProducer: BackendProducer | null; readonly configClient: RequestResponse | null; @@ -205,10 +208,11 @@ export interface FlowManagerService extends AsyncProcessorRuntime { - const blueprints = new Map(); - blueprints.set("default", DEFAULT_BLUEPRINT); + const blueprints = HashMap.empty().pipe( + HashMap.set("default", DEFAULT_BLUEPRINT), + ); return { - flows: new Map(), + flows: HashMap.empty(), blueprints, consumer: null, responseProducer: null, @@ -219,11 +223,14 @@ const initialState = (): FlowManagerServiceState => { const isStringRecord = (value: unknown): value is Record => isRecord(value) && Object.values(value).every((item) => typeof item === "string"); -const cloneFlows = (source: Map): Map => - new Map(source); +const getHashMapValue = (store: HashMap.HashMap, key: K): V | undefined => + Option.getOrUndefined(HashMap.get(store, key)); -const cloneBlueprints = (source: Map): Map => - new Map(source); +const sortedEntries = (store: HashMap.HashMap): ReadonlyArray => + HashMap.toEntries(store).sort(([left], [right]) => left.localeCompare(right)); + +const sortedKeys = (store: HashMap.HashMap): Array => + sortedEntries(store).map(([key]) => key); const stateSnapshot = ( stateRef: SynchronizedRef.SynchronizedRef, @@ -319,17 +326,17 @@ const refreshBlueprintsFromConfigEffect = Effect.fn("FlowManager.refreshBlueprin operation: "getvalues", type: "flow-blueprint", }); - const next = new Map(); + let next = HashMap.empty(); for (const item of configValues(response)) { const blueprint = blueprintFromConfig(item.value); if (blueprint !== undefined) { - next.set(item.key, blueprint); + next = HashMap.set(next, item.key, blueprint); } } - if (!next.has("default")) { - next.set("default", DEFAULT_BLUEPRINT); + if (!HashMap.has(next, "default")) { + next = HashMap.set(next, "default", DEFAULT_BLUEPRINT); } yield* SynchronizedRef.update(stateRef, (state) => ({ @@ -345,22 +352,22 @@ const refreshFlowsFromConfigEffect = Effect.fn("FlowManager.refreshFlowsFromConf operation: "getvalues", type: "flow", }); - const next = new Map(); + let next = HashMap.empty(); for (const item of configValues(response)) { const flow = flowFromConfig(item.key, item.value); if (flow !== undefined) { - next.set(item.key, flow); + next = HashMap.set(next, item.key, flow); } } - if (next.size === 0) { + if (HashMap.size(next) === 0) { const flowsResponse = yield* configRequestEffect(stateRef, { operation: "getvalues", type: "flows", }); for (const item of configValues(flowsResponse)) { - next.set(item.key, { + next = HashMap.set(next, item.key, { id: item.key, blueprintName: "default", description: "", @@ -377,7 +384,7 @@ const refreshFlowsFromConfigEffect = Effect.fn("FlowManager.refreshFlowsFromConf }); const handleListBlueprintsWithState = (state: FlowManagerServiceState): FlowResponse => ({ - "blueprint-names": [...state.blueprints.keys()], + "blueprint-names": sortedKeys(state.blueprints), }); const handleGetBlueprintEffect = Effect.fn("FlowManager.handleGetBlueprint")(function* ( @@ -389,7 +396,7 @@ const handleGetBlueprintEffect = Effect.fn("FlowManager.handleGetBlueprint")(fun return yield* flowManagerError("get-blueprint", "Missing blueprint-name"); } - const blueprint = (yield* SynchronizedRef.get(stateRef)).blueprints.get(name); + const blueprint = getHashMapValue((yield* SynchronizedRef.get(stateRef)).blueprints, name); if (blueprint === undefined) { return yield* flowManagerError("get-blueprint", `Blueprint not found: ${name}`); } @@ -442,20 +449,16 @@ const handleDeleteBlueprintEffect = Effect.fn("FlowManager.handleDeleteBlueprint operation: "delete", keys: ["flow-blueprint", name], }); - yield* SynchronizedRef.update(stateRef, (state) => { - const blueprints = cloneBlueprints(state.blueprints); - blueprints.delete(name); - return { - ...state, - blueprints, - }; - }); + yield* SynchronizedRef.update(stateRef, (state) => ({ + ...state, + blueprints: HashMap.remove(state.blueprints, name), + })); return {}; }); const handleListFlowsWithState = (state: FlowManagerServiceState): FlowResponse => ({ - "flow-ids": [...state.flows.keys()], + "flow-ids": sortedKeys(state.flows), }); const flowRecord = (inst: FlowInstance) => ({ @@ -473,7 +476,7 @@ const handleGetFlowEffect = Effect.fn("FlowManager.handleGetFlow")(function* ( return yield* flowManagerError("get-flow", "Missing flow-id"); } - const inst = (yield* SynchronizedRef.get(stateRef)).flows.get(id); + const inst = getHashMapValue((yield* SynchronizedRef.get(stateRef)).flows, id); if (inst === undefined) { return yield* flowManagerError("get-flow", `Flow not found: ${id}`); } @@ -496,10 +499,10 @@ const handleStartFlowEffect = Effect.fn("FlowManager.handleStartFlow")(function* } const inst = yield* SynchronizedRef.modifyEffect(stateRef, (state) => { - if (state.flows.has(id)) { + if (HashMap.has(state.flows, id)) { return Effect.fail(flowManagerError("start-flow", `Flow already exists: ${id}`)); } - if (!state.blueprints.has(blueprintName)) { + if (!HashMap.has(state.blueprints, blueprintName)) { return Effect.fail(flowManagerError("start-flow", `Blueprint not found: ${blueprintName}`)); } @@ -510,11 +513,9 @@ const handleStartFlowEffect = Effect.fn("FlowManager.handleStartFlow")(function* parameters, status: "running", }; - const flows = cloneFlows(state.flows); - flows.set(id, next); return Effect.succeed(modifyResult(next, { ...state, - flows, + flows: HashMap.set(state.flows, id, next), })); }); @@ -534,16 +535,14 @@ const handleStopFlowEffect = Effect.fn("FlowManager.handleStopFlow")(function* ( } const inst = yield* SynchronizedRef.modifyEffect(stateRef, (state) => { - const current = state.flows.get(id); + const current = getHashMapValue(state.flows, id); if (current === undefined) { return Effect.fail(flowManagerError("stop-flow", `Flow not found: ${id}`)); } - const flows = cloneFlows(state.flows); - flows.delete(id); return Effect.succeed(modifyResult(current, { ...state, - flows, + flows: HashMap.remove(state.flows, id), })); }); @@ -564,8 +563,8 @@ const pushFlowsConfigEffect = Effect.fn("FlowManager.pushFlowsConfig")( const flowsConfig: Record }> = {}; const flowRecords: Record = {}; - for (const [id, inst] of state.flows) { - const blueprint = state.blueprints.get(inst.blueprintName); + for (const [id, inst] of sortedEntries(state.flows)) { + const blueprint = getHashMapValue(state.blueprints, inst.blueprintName); if (blueprint !== undefined) { flowsConfig[id] = { topics: blueprint.topics }; flowRecords[id] = yield* encodeJson(flowRecord(inst), "encode-flow-config"); @@ -590,7 +589,7 @@ const pushFlowsConfigEffect = Effect.fn("FlowManager.pushFlowsConfig")( }), catch: (cause) => flowManagerError("put-flow-records", cause), }); - yield* Effect.log(`[FlowManager] Pushed flows config (${state.flows.size} active flows)`); + yield* Effect.log(`[FlowManager] Pushed flows config (${HashMap.size(state.flows)} active flows)`); }, (effect) => effect.pipe(