Use MutableHashMap for base flow registries

This commit is contained in:
elpresidank 2026-06-04 07:53:49 -05:00
parent fba0f97723
commit 4ffa84dbe7
3 changed files with 54 additions and 34 deletions

View file

@ -38,6 +38,7 @@ import {
import { makePubSubService, PubSub } from "../backend/pubsub.js";
import { loadMessagingRuntimeConfig } from "../runtime/index.ts";
import { Context, Duration, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect";
import * as MutableHashMap from "effect/MutableHashMap";
import * as O from "effect/Option";
import * as S from "effect/Schema";
@ -136,7 +137,7 @@ export function runFlowProcessorDefinitionScoped<
| FlowRequirements
| ConfigHandlerRequirements
> {
const flows = new Map<string, ActiveFlow>();
const flows = MutableHashMap.empty<string, ActiveFlow>();
let configConsumer: BackendConsumer<ConfigPush> | null = null;
let lastFlowsJson = "";
const isRunning = options.isRunning ?? (() => true);
@ -147,11 +148,11 @@ export function runFlowProcessorDefinitionScoped<
);
const closeAllFlowsEffect = Effect.gen(function* () {
const activeFlows = Array.from(flows.entries());
const activeFlows = Array.from(flows);
for (const [name, activeFlow] of activeFlows) {
yield* closeFlowEffect(name, activeFlow);
}
flows.clear();
MutableHashMap.clear(flows);
});
const closeConfigConsumerEffect = (): Effect.Effect<void> => {
@ -215,7 +216,7 @@ export function runFlowProcessorDefinitionScoped<
const flowsJson = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(flowDefinitions).pipe(
Effect.catch((error) => Effect.succeed(String(error))),
);
if (lastFlowsJson.length > 0 && flowsJson === lastFlowsJson && flows.size > 0) {
if (lastFlowsJson.length > 0 && flowsJson === lastFlowsJson && MutableHashMap.size(flows) > 0) {
yield* Effect.log(`[${options.id}] Flow definitions unchanged, skipping restart`);
return;
}
@ -225,21 +226,21 @@ export function runFlowProcessorDefinitionScoped<
if (!(name in flowDefinitions)) {
yield* Effect.log(`[${options.id}] Stopping removed flow: ${name}`);
yield* closeFlowEffect(name, activeFlow);
flows.delete(name);
MutableHashMap.remove(flows, name);
}
}
for (const [name, defn] of Object.entries(flowDefinitions)) {
const existing = flows.get(name);
const existing = O.getOrUndefined(MutableHashMap.get(flows, name));
if (existing !== undefined) {
yield* Effect.log(`[${options.id}] Restarting flow "${name}" with updated config`);
yield* closeFlowEffect(name, existing);
flows.delete(name);
MutableHashMap.remove(flows, name);
}
yield* Effect.log(`[${options.id}] Starting flow "${name}"`);
const activeFlow = yield* startFlowEffect(name, defn);
flows.set(name, activeFlow);
MutableHashMap.set(flows, name, activeFlow);
yield* Effect.log(`[${options.id}] Flow "${name}" started`);
}
});

View file

@ -5,6 +5,7 @@
*/
import { Config as EffectConfig, Context, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect";
import * as MutableHashMap from "effect/MutableHashMap";
import * as O from "effect/Option";
import * as S from "effect/Schema";
import type { PubSubBackend } from "../backend/types.js";
@ -128,10 +129,10 @@ export function makeFlow<Requirements = never>(
definition: FlowDefinition,
specifications: ReadonlyArray<Spec<Requirements>>,
): Flow<Requirements> {
const producers = new Map<string, EffectProducer<never>>();
const consumers = new Map<string, EffectConsumer>();
const requestors = new Map<string, EffectRequestResponse<never, unknown>>();
const parameters = new Map<string, unknown>();
const producers = MutableHashMap.empty<string, EffectProducer<never>>();
const consumers = MutableHashMap.empty<string, EffectConsumer>();
const requestors = MutableHashMap.empty<string, EffectRequestResponse<never, unknown>>();
const parameters = MutableHashMap.empty<string, unknown>();
let compatibilityScope: Scope.Closeable | null = null;
const compatibilityRuntime = ManagedRuntime.make(Layer.empty);
@ -162,14 +163,14 @@ export function makeFlow<Requirements = never>(
};
const getParameterEffect = (parameterName: string): Effect.Effect<unknown, FlowResourceNotFoundError> => {
const value = parameters.get(parameterName);
const value = O.getOrUndefined(MutableHashMap.get(parameters, parameterName));
return value === undefined
? Effect.fail(flowResourceNotFoundError(name, "parameter", parameterName))
: Effect.succeed(value);
};
const getParameter = (parameterName: string): unknown => {
const value = parameters.get(parameterName);
const value = O.getOrUndefined(MutableHashMap.get(parameters, parameterName));
if (value === undefined) throw flowResourceNotFoundError(name, "parameter", parameterName);
return value;
};
@ -191,14 +192,14 @@ export function makeFlow<Requirements = never>(
const getProducerEffect = (
producerName: string,
): Effect.Effect<EffectProducer<never>, FlowResourceNotFoundError> => {
const producer = producers.get(producerName);
const producer = O.getOrUndefined(MutableHashMap.get(producers, producerName));
return producer === undefined
? Effect.fail(flowResourceNotFoundError(name, "producer", producerName))
: Effect.succeed(producer);
};
const getProducer = (producerName: string): EffectProducer<never> => {
const producer = producers.get(producerName);
const producer = O.getOrUndefined(MutableHashMap.get(producers, producerName));
if (producer === undefined) throw flowResourceNotFoundError(name, "producer", producerName);
return producer;
};
@ -206,7 +207,7 @@ export function makeFlow<Requirements = never>(
const getRequestorEffect = (
requestorName: string,
): Effect.Effect<EffectRequestResponse<never, unknown>, FlowResourceNotFoundError> => {
const requestor = requestors.get(requestorName);
const requestor = O.getOrUndefined(MutableHashMap.get(requestors, requestorName));
return requestor === undefined
? Effect.fail(flowResourceNotFoundError(name, "requestor", requestorName))
: Effect.succeed(requestor);
@ -215,7 +216,7 @@ export function makeFlow<Requirements = never>(
const getRequestor = (
requestorName: string,
): EffectRequestResponse<never, unknown> => {
const requestor = requestors.get(requestorName);
const requestor = O.getOrUndefined(MutableHashMap.get(requestors, requestorName));
if (requestor === undefined) throw flowResourceNotFoundError(name, "requestor", requestorName);
return requestor;
};
@ -251,7 +252,7 @@ export function makeFlow<Requirements = never>(
if (typeof producer === "string") {
return getProducerEffect(producer);
}
if (!producers.has(producer.name)) {
if (!MutableHashMap.has(producers, producer.name)) {
return Effect.fail(flowResourceNotFoundError(name, "producer", producer.name));
}
return producer.producerEffect(flow);
@ -269,7 +270,7 @@ export function makeFlow<Requirements = never>(
if (typeof requestor === "string") {
return getRequestorEffect(requestor);
}
if (!requestors.has(requestor.name)) {
if (!MutableHashMap.has(requestors, requestor.name)) {
return Effect.fail(flowResourceNotFoundError(name, "requestor", requestor.name));
}
return requestor.requestorEffect(flow);
@ -308,7 +309,7 @@ export function makeFlow<Requirements = never>(
if (typeof producer === "string") {
return toFlowProducer(getProducer(producer));
}
if (!producers.has(producer.name)) {
if (!MutableHashMap.has(producers, producer.name)) {
throw flowResourceNotFoundError(name, "producer", producer.name);
}
return toFlowProducer(compatibilityRuntime.runSync(producer.producerEffect(flow)));
@ -324,7 +325,7 @@ export function makeFlow<Requirements = never>(
if (typeof requestor === "string") {
return toFlowRequestor(getRequestor(requestor));
}
if (!requestors.has(requestor.name)) {
if (!MutableHashMap.has(requestors, requestor.name)) {
throw flowResourceNotFoundError(name, "requestor", requestor.name);
}
return toFlowRequestor(compatibilityRuntime.runSync(requestor.requestorEffect(flow)));
@ -388,26 +389,26 @@ export function makeFlow<Requirements = never>(
return compatibilityRuntime.runPromise(flow.runInCompatibilityScopeEffect(effect, runtimePubsub, context));
},
clearResources(): void {
producers.clear();
consumers.clear();
requestors.clear();
parameters.clear();
MutableHashMap.clear(producers);
MutableHashMap.clear(consumers);
MutableHashMap.clear(requestors);
MutableHashMap.clear(parameters);
},
registerProducer<T>(registerName: string, producer: EffectProducer<T>): void {
producers.set(registerName, producer);
MutableHashMap.set(producers, registerName, producer);
},
registerConsumer(registerName: string, consumer: EffectConsumer): void {
consumers.set(registerName, consumer);
MutableHashMap.set(consumers, registerName, consumer);
},
registerRequestor<TReq, TRes>(registerName: string, rr: EffectRequestResponse<TReq, TRes>): void {
requestors.set(registerName, rr);
MutableHashMap.set(requestors, registerName, rr);
},
setParameter(parameterName: string, value: unknown): void {
parameters.set(parameterName, value);
MutableHashMap.set(parameters, parameterName, value);
},
producerEffect,
consumerEffect(consumerName: string): Effect.Effect<EffectConsumer, FlowResourceNotFoundError> {
const c = consumers.get(consumerName);
const c = O.getOrUndefined(MutableHashMap.get(consumers, consumerName));
return c === undefined
? Effect.fail(flowResourceNotFoundError(name, "consumer", consumerName))
: Effect.succeed(c);
@ -416,7 +417,7 @@ export function makeFlow<Requirements = never>(
parameterEffect,
producer,
consumer(consumerName: string): FlowConsumer {
const c = consumers.get(consumerName);
const c = O.getOrUndefined(MutableHashMap.get(consumers, consumerName));
if (c === undefined) throw flowResourceNotFoundError(name, "consumer", consumerName);
return {
stop: () => compatibilityRuntime.runPromise(c.stop),