From 4ffa84dbe7ed46ad3d579178f1380d51d521d453 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Thu, 4 Jun 2026 07:53:49 -0500 Subject: [PATCH] Use MutableHashMap for base flow registries --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 22 ++++++++- .../base/src/processor/flow-processor.ts | 17 ++++--- ts/packages/base/src/processor/flow.ts | 49 ++++++++++--------- 3 files changed, 54 insertions(+), 34 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 92a17380..b1c8302f 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -2290,6 +2290,24 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-04: Base Processor Registry MutableHashMap Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/base/src/processor/flow.ts` now stores producers, consumers, + requestors, and parameters in `MutableHashMap` instead of native `Map`. + - `ts/packages/base/src/processor/flow-processor.ts` now tracks active flow + scopes in `MutableHashMap`, including `Option`-based lookups and + `MutableHashMap.clear` / `remove` / `size` / `set` for lifecycle changes. +- Verification: + - `cd ts && bun run --cwd packages/base build` + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/flow-processor-runtime.test.ts src/__tests__/flow-spec-runtime.test.ts` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -2475,8 +2493,8 @@ Notes: compatibility facades, gateway/librarian helpers, and CLI command actions. The workbench random id helper is complete; the remaining workbench `Effect.gen` match is a local one-shot command effect value. - - Remaining real long-lived native collection targets include base processor - registries and Librarian service state. The standalone Librarian collection + - Remaining real long-lived native collection target is Librarian service + state. Base processor registries, the standalone Librarian collection manager, prompt template cache, and workbench explain triples module cache are complete. Local traversal sets and test fakes remain no-op boundaries. diff --git a/ts/packages/base/src/processor/flow-processor.ts b/ts/packages/base/src/processor/flow-processor.ts index 82403d01..4428bc87 100644 --- a/ts/packages/base/src/processor/flow-processor.ts +++ b/ts/packages/base/src/processor/flow-processor.ts @@ -38,6 +38,7 @@ import { import { makePubSubService, PubSub } from "../backend/pubsub.js"; import { loadMessagingRuntimeConfig } from "../runtime/index.ts"; import { Context, Duration, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect"; +import * as MutableHashMap from "effect/MutableHashMap"; import * as O from "effect/Option"; import * as S from "effect/Schema"; @@ -136,7 +137,7 @@ export function runFlowProcessorDefinitionScoped< | FlowRequirements | ConfigHandlerRequirements > { - const flows = new Map(); + const flows = MutableHashMap.empty(); let configConsumer: BackendConsumer | null = null; let lastFlowsJson = ""; const isRunning = options.isRunning ?? (() => true); @@ -147,11 +148,11 @@ export function runFlowProcessorDefinitionScoped< ); const closeAllFlowsEffect = Effect.gen(function* () { - const activeFlows = Array.from(flows.entries()); + const activeFlows = Array.from(flows); for (const [name, activeFlow] of activeFlows) { yield* closeFlowEffect(name, activeFlow); } - flows.clear(); + MutableHashMap.clear(flows); }); const closeConfigConsumerEffect = (): Effect.Effect => { @@ -215,7 +216,7 @@ export function runFlowProcessorDefinitionScoped< const flowsJson = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(flowDefinitions).pipe( Effect.catch((error) => Effect.succeed(String(error))), ); - if (lastFlowsJson.length > 0 && flowsJson === lastFlowsJson && flows.size > 0) { + if (lastFlowsJson.length > 0 && flowsJson === lastFlowsJson && MutableHashMap.size(flows) > 0) { yield* Effect.log(`[${options.id}] Flow definitions unchanged, skipping restart`); return; } @@ -225,21 +226,21 @@ export function runFlowProcessorDefinitionScoped< if (!(name in flowDefinitions)) { yield* Effect.log(`[${options.id}] Stopping removed flow: ${name}`); yield* closeFlowEffect(name, activeFlow); - flows.delete(name); + MutableHashMap.remove(flows, name); } } for (const [name, defn] of Object.entries(flowDefinitions)) { - const existing = flows.get(name); + const existing = O.getOrUndefined(MutableHashMap.get(flows, name)); if (existing !== undefined) { yield* Effect.log(`[${options.id}] Restarting flow "${name}" with updated config`); yield* closeFlowEffect(name, existing); - flows.delete(name); + MutableHashMap.remove(flows, name); } yield* Effect.log(`[${options.id}] Starting flow "${name}"`); const activeFlow = yield* startFlowEffect(name, defn); - flows.set(name, activeFlow); + MutableHashMap.set(flows, name, activeFlow); yield* Effect.log(`[${options.id}] Flow "${name}" started`); } }); diff --git a/ts/packages/base/src/processor/flow.ts b/ts/packages/base/src/processor/flow.ts index c93a846b..4bcbc4aa 100644 --- a/ts/packages/base/src/processor/flow.ts +++ b/ts/packages/base/src/processor/flow.ts @@ -5,6 +5,7 @@ */ import { Config as EffectConfig, Context, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect"; +import * as MutableHashMap from "effect/MutableHashMap"; import * as O from "effect/Option"; import * as S from "effect/Schema"; import type { PubSubBackend } from "../backend/types.js"; @@ -128,10 +129,10 @@ export function makeFlow( definition: FlowDefinition, specifications: ReadonlyArray>, ): Flow { - const producers = new Map>(); - const consumers = new Map(); - const requestors = new Map>(); - const parameters = new Map(); + const producers = MutableHashMap.empty>(); + const consumers = MutableHashMap.empty(); + const requestors = MutableHashMap.empty>(); + const parameters = MutableHashMap.empty(); let compatibilityScope: Scope.Closeable | null = null; const compatibilityRuntime = ManagedRuntime.make(Layer.empty); @@ -162,14 +163,14 @@ export function makeFlow( }; const getParameterEffect = (parameterName: string): Effect.Effect => { - const value = parameters.get(parameterName); + const value = O.getOrUndefined(MutableHashMap.get(parameters, parameterName)); return value === undefined ? Effect.fail(flowResourceNotFoundError(name, "parameter", parameterName)) : Effect.succeed(value); }; const getParameter = (parameterName: string): unknown => { - const value = parameters.get(parameterName); + const value = O.getOrUndefined(MutableHashMap.get(parameters, parameterName)); if (value === undefined) throw flowResourceNotFoundError(name, "parameter", parameterName); return value; }; @@ -191,14 +192,14 @@ export function makeFlow( const getProducerEffect = ( producerName: string, ): Effect.Effect, FlowResourceNotFoundError> => { - const producer = producers.get(producerName); + const producer = O.getOrUndefined(MutableHashMap.get(producers, producerName)); return producer === undefined ? Effect.fail(flowResourceNotFoundError(name, "producer", producerName)) : Effect.succeed(producer); }; const getProducer = (producerName: string): EffectProducer => { - const producer = producers.get(producerName); + const producer = O.getOrUndefined(MutableHashMap.get(producers, producerName)); if (producer === undefined) throw flowResourceNotFoundError(name, "producer", producerName); return producer; }; @@ -206,7 +207,7 @@ export function makeFlow( const getRequestorEffect = ( requestorName: string, ): Effect.Effect, FlowResourceNotFoundError> => { - const requestor = requestors.get(requestorName); + const requestor = O.getOrUndefined(MutableHashMap.get(requestors, requestorName)); return requestor === undefined ? Effect.fail(flowResourceNotFoundError(name, "requestor", requestorName)) : Effect.succeed(requestor); @@ -215,7 +216,7 @@ export function makeFlow( const getRequestor = ( requestorName: string, ): EffectRequestResponse => { - const requestor = requestors.get(requestorName); + const requestor = O.getOrUndefined(MutableHashMap.get(requestors, requestorName)); if (requestor === undefined) throw flowResourceNotFoundError(name, "requestor", requestorName); return requestor; }; @@ -251,7 +252,7 @@ export function makeFlow( if (typeof producer === "string") { return getProducerEffect(producer); } - if (!producers.has(producer.name)) { + if (!MutableHashMap.has(producers, producer.name)) { return Effect.fail(flowResourceNotFoundError(name, "producer", producer.name)); } return producer.producerEffect(flow); @@ -269,7 +270,7 @@ export function makeFlow( if (typeof requestor === "string") { return getRequestorEffect(requestor); } - if (!requestors.has(requestor.name)) { + if (!MutableHashMap.has(requestors, requestor.name)) { return Effect.fail(flowResourceNotFoundError(name, "requestor", requestor.name)); } return requestor.requestorEffect(flow); @@ -308,7 +309,7 @@ export function makeFlow( if (typeof producer === "string") { return toFlowProducer(getProducer(producer)); } - if (!producers.has(producer.name)) { + if (!MutableHashMap.has(producers, producer.name)) { throw flowResourceNotFoundError(name, "producer", producer.name); } return toFlowProducer(compatibilityRuntime.runSync(producer.producerEffect(flow))); @@ -324,7 +325,7 @@ export function makeFlow( if (typeof requestor === "string") { return toFlowRequestor(getRequestor(requestor)); } - if (!requestors.has(requestor.name)) { + if (!MutableHashMap.has(requestors, requestor.name)) { throw flowResourceNotFoundError(name, "requestor", requestor.name); } return toFlowRequestor(compatibilityRuntime.runSync(requestor.requestorEffect(flow))); @@ -388,26 +389,26 @@ export function makeFlow( return compatibilityRuntime.runPromise(flow.runInCompatibilityScopeEffect(effect, runtimePubsub, context)); }, clearResources(): void { - producers.clear(); - consumers.clear(); - requestors.clear(); - parameters.clear(); + MutableHashMap.clear(producers); + MutableHashMap.clear(consumers); + MutableHashMap.clear(requestors); + MutableHashMap.clear(parameters); }, registerProducer(registerName: string, producer: EffectProducer): void { - producers.set(registerName, producer); + MutableHashMap.set(producers, registerName, producer); }, registerConsumer(registerName: string, consumer: EffectConsumer): void { - consumers.set(registerName, consumer); + MutableHashMap.set(consumers, registerName, consumer); }, registerRequestor(registerName: string, rr: EffectRequestResponse): void { - requestors.set(registerName, rr); + MutableHashMap.set(requestors, registerName, rr); }, setParameter(parameterName: string, value: unknown): void { - parameters.set(parameterName, value); + MutableHashMap.set(parameters, parameterName, value); }, producerEffect, consumerEffect(consumerName: string): Effect.Effect { - const c = consumers.get(consumerName); + const c = O.getOrUndefined(MutableHashMap.get(consumers, consumerName)); return c === undefined ? Effect.fail(flowResourceNotFoundError(name, "consumer", consumerName)) : Effect.succeed(c); @@ -416,7 +417,7 @@ export function makeFlow( parameterEffect, producer, consumer(consumerName: string): FlowConsumer { - const c = consumers.get(consumerName); + const c = O.getOrUndefined(MutableHashMap.get(consumers, consumerName)); if (c === undefined) throw flowResourceNotFoundError(name, "consumer", consumerName); return { stop: () => compatibilityRuntime.runPromise(c.stop),