Decode flow definitions with schema

This commit is contained in:
elpresidank 2026-06-02 02:49:42 -05:00
parent 4ec7e72532
commit 3070ce2b47
3 changed files with 84 additions and 28 deletions

View file

@ -136,6 +136,10 @@ class FlowProcessorBackend implements PubSubBackend {
}
pushConfig(version: number, flows: Record<string, unknown>): void {
this.pushFlowConfig(version, flows);
}
pushFlowConfig(version: number, flows: unknown): void {
this.configConsumer.push(createMessage({ version, config: { flows } }));
}
}
@ -254,4 +258,44 @@ describe("Effect-native FlowProcessor runtime", () => {
expect(backend.closeCount).toBe(1);
}),
);
it.effect(
"schema-decodes flow definitions before starting resources",
Effect.fnUntraced(function* () {
const backend = new FlowProcessorBackend();
const events: Array<string> = [];
yield* Effect.scoped(
Effect.gen(function* () {
const fiber = yield* runFlowProcessorDefinitionScoped({
id: "schema-flow-processor-test",
pubsub: backend,
specifications: [makeProducerSpec<string>("output")],
configHandlers: [
(_config, version) => Effect.sync(() => {
events.push(`handler:${version}`);
}),
],
}).pipe(
Effect.provide(MessagingRuntimeLive),
Effect.provide(PubSub.layer(backend)),
Effect.provide(fastMessagingConfig),
Effect.forkChild,
);
yield* waitFor(() => backend.consumerOptions.length === 1, "schema config subscription");
backend.pushFlowConfig(1, { default: { topics: { output: 42 } } });
yield* waitFor(() => backend.configConsumer.acknowledged.length === 1, "schema config ack");
yield* Fiber.interrupt(fiber);
}),
);
expect(backend.producers).toHaveLength(0);
expect(events).toEqual(["handler:1"]);
expect(backend.configConsumer.closeCount).toBeGreaterThanOrEqual(1);
expect(backend.closeCount).toBe(1);
}),
);
});

View file

@ -38,7 +38,7 @@ import {
import { makePubSubService, PubSub } from "../backend/pubsub.js";
import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js";
import { Duration, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect";
import * as Predicate from "effect/Predicate";
import * as O from "effect/Option";
import * as S from "effect/Schema";
interface ConfigPush {
@ -105,19 +105,14 @@ const ConfigPushSchema = S.Struct({
config: S.Record(S.String, S.Unknown),
});
const isStringRecord = (value: unknown): value is Record<string, unknown> =>
Predicate.isObject(value) && !Array.isArray(value);
const FlowDefinitionSchema = S.Struct({
topics: S.optionalKey(S.Record(S.String, S.String)),
parameters: S.optionalKey(S.Record(S.String, S.Unknown)),
});
const isTopicsRecord = (value: unknown): value is Record<string, string> =>
isStringRecord(value) && Object.values(value).every((item) => typeof item === "string");
const FlowDefinitionsSchema = S.Record(S.String, FlowDefinitionSchema);
const isFlowDefinition = (value: unknown): value is FlowDefinition => {
if (!isStringRecord(value)) return false;
const topics = value.topics;
const parameters = value.parameters;
return (topics === undefined || isTopicsRecord(topics)) &&
(parameters === undefined || isStringRecord(parameters));
};
const decodeFlowDefinitions = S.decodeUnknownOption(FlowDefinitionsSchema);
export function runFlowProcessorDefinitionScoped<
FlowRequirements = never,
@ -220,12 +215,14 @@ export function runFlowProcessorDefinitionScoped<
yield* Effect.log(`[${options.id}] No flows in config push, skipping`);
return;
}
if (!isStringRecord(flowDefs)) {
const decodedFlowDefs = decodeFlowDefinitions(flowDefs);
if (O.isNone(decodedFlowDefs)) {
yield* Effect.logWarning(`[${options.id}] Skipping config push: flows is not an object`);
return;
}
const flowDefinitions = decodedFlowDefs.value;
const flowsJson = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(flowDefs).pipe(
const flowsJson = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(flowDefinitions).pipe(
Effect.catch((error) => Effect.succeed(String(error))),
);
if (lastFlowsJson.length > 0 && flowsJson === lastFlowsJson && flows.size > 0) {
@ -235,19 +232,14 @@ export function runFlowProcessorDefinitionScoped<
lastFlowsJson = flowsJson;
for (const [name, activeFlow] of flows) {
if (!(name in flowDefs)) {
if (!(name in flowDefinitions)) {
yield* Effect.log(`[${options.id}] Stopping removed flow: ${name}`);
yield* closeFlowEffect(name, activeFlow);
flows.delete(name);
}
}
for (const [name, defn] of Object.entries(flowDefs)) {
if (!isFlowDefinition(defn)) {
yield* Effect.logWarning(`[${options.id}] Skipping flow "${name}": definition is not an object`);
continue;
}
for (const [name, defn] of Object.entries(flowDefinitions)) {
const existing = flows.get(name);
if (existing !== undefined) {
yield* Effect.log(`[${options.id}] Restarting flow "${name}" with updated config`);