diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 272a5254..a6e9ddcc 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,8 +12,8 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 Base processor -compatibility runtime slice: +Current signal counts from `ts/packages` after the 2026-06-02 Base flow +definition schema slice: | Signal | Count | | --- | ---: | @@ -65,6 +65,9 @@ Notes: - The base processor compatibility runtime slice dropped the `Effect.runPromise` count again by moving `AsyncProcessor`, `Flow`, and `FlowProcessor` Promise compatibility facades onto `ManagedRuntime`. +- The base flow definition schema slice removed hand-rolled + `Predicate`/object narrowing from `flow-processor.ts`; signal counts are + unchanged because this was a validation-quality migration. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -520,6 +523,26 @@ Notes: - `cd ts && bun run build` - `cd ts && bun run test` +### 2026-06-02: Base Flow Definition Schema Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/base/src/processor/flow-processor.ts` now validates + `config.flows` with Effect Schema instead of local + `Predicate`/object/string-record guards. + - Invalid flow definition payloads still log/skip and preserve the existing + config-handler and acknowledgement behavior. + - `ts/packages/base/src/__tests__/flow-processor-runtime.test.ts` now covers + an invalid nested flow definition that is acknowledged without starting + resources. +- Verification: + - `bun run --cwd ts/packages/base test -- src/__tests__/flow-processor-runtime.test.ts` + - `bun run --cwd ts/packages/base build` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -568,11 +591,10 @@ Notes: ## Ranked Findings -### P1: Base Flow Definition Schemas And Typed Spec Accessors +### P1: Base Typed Spec Accessors - TrustGraph evidence: - `ts/packages/base/src/processor/flow.ts` - - `ts/packages/base/src/processor/flow-processor.ts` - `ts/packages/base/src/spec/parameter-spec.ts` - `ts/packages/base/src/spec/producer-spec.ts` - `ts/packages/base/src/spec/request-response-spec.ts` @@ -580,8 +602,6 @@ Notes: - Schema-backed registries, `Context`, `Layer`, `Effect.fn`, `Option`, `Predicate`, `HashMap`/`MutableHashMap`. - Rewrite shape: - - Replace hand-rolled `isStringRecord` / `isFlowDefinition` narrowing with - Schema decoding plus `Option`/`Match`-style branches. - Add schema-backed generic parameter specs and spec-object accessors such as `flow.parameterEffect(spec)`, then keep string accessors as compatibility escapes. @@ -645,7 +665,7 @@ Notes: ## Recommended PR Order -1. Base flow definition schema decoding and typed spec accessors. +1. Base typed spec accessors. 2. Gateway RPC callback and client streaming completion cleanup. 3. Storage/provider managed resource cleanup. 4. MCP parity/deletion decision and workbench platform polish. diff --git a/ts/packages/base/src/__tests__/flow-processor-runtime.test.ts b/ts/packages/base/src/__tests__/flow-processor-runtime.test.ts index 6c4d2f39..f11ad659 100644 --- a/ts/packages/base/src/__tests__/flow-processor-runtime.test.ts +++ b/ts/packages/base/src/__tests__/flow-processor-runtime.test.ts @@ -136,6 +136,10 @@ class FlowProcessorBackend implements PubSubBackend { } pushConfig(version: number, flows: Record): void { + this.pushFlowConfig(version, flows); + } + + pushFlowConfig(version: number, flows: unknown): void { this.configConsumer.push(createMessage({ version, config: { flows } })); } } @@ -254,4 +258,44 @@ describe("Effect-native FlowProcessor runtime", () => { expect(backend.closeCount).toBe(1); }), ); + + it.effect( + "schema-decodes flow definitions before starting resources", + Effect.fnUntraced(function* () { + const backend = new FlowProcessorBackend(); + const events: Array = []; + + yield* Effect.scoped( + Effect.gen(function* () { + const fiber = yield* runFlowProcessorDefinitionScoped({ + id: "schema-flow-processor-test", + pubsub: backend, + specifications: [makeProducerSpec("output")], + configHandlers: [ + (_config, version) => Effect.sync(() => { + events.push(`handler:${version}`); + }), + ], + }).pipe( + Effect.provide(MessagingRuntimeLive), + Effect.provide(PubSub.layer(backend)), + Effect.provide(fastMessagingConfig), + Effect.forkChild, + ); + + yield* waitFor(() => backend.consumerOptions.length === 1, "schema config subscription"); + + backend.pushFlowConfig(1, { default: { topics: { output: 42 } } }); + yield* waitFor(() => backend.configConsumer.acknowledged.length === 1, "schema config ack"); + + yield* Fiber.interrupt(fiber); + }), + ); + + expect(backend.producers).toHaveLength(0); + expect(events).toEqual(["handler:1"]); + expect(backend.configConsumer.closeCount).toBeGreaterThanOrEqual(1); + expect(backend.closeCount).toBe(1); + }), + ); }); diff --git a/ts/packages/base/src/processor/flow-processor.ts b/ts/packages/base/src/processor/flow-processor.ts index 3a2f9e27..3f156e6b 100644 --- a/ts/packages/base/src/processor/flow-processor.ts +++ b/ts/packages/base/src/processor/flow-processor.ts @@ -38,7 +38,7 @@ import { import { makePubSubService, PubSub } from "../backend/pubsub.js"; import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js"; import { Duration, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect"; -import * as Predicate from "effect/Predicate"; +import * as O from "effect/Option"; import * as S from "effect/Schema"; interface ConfigPush { @@ -105,19 +105,14 @@ const ConfigPushSchema = S.Struct({ config: S.Record(S.String, S.Unknown), }); -const isStringRecord = (value: unknown): value is Record => - Predicate.isObject(value) && !Array.isArray(value); +const FlowDefinitionSchema = S.Struct({ + topics: S.optionalKey(S.Record(S.String, S.String)), + parameters: S.optionalKey(S.Record(S.String, S.Unknown)), +}); -const isTopicsRecord = (value: unknown): value is Record => - isStringRecord(value) && Object.values(value).every((item) => typeof item === "string"); +const FlowDefinitionsSchema = S.Record(S.String, FlowDefinitionSchema); -const isFlowDefinition = (value: unknown): value is FlowDefinition => { - if (!isStringRecord(value)) return false; - const topics = value.topics; - const parameters = value.parameters; - return (topics === undefined || isTopicsRecord(topics)) && - (parameters === undefined || isStringRecord(parameters)); -}; +const decodeFlowDefinitions = S.decodeUnknownOption(FlowDefinitionsSchema); export function runFlowProcessorDefinitionScoped< FlowRequirements = never, @@ -220,12 +215,14 @@ export function runFlowProcessorDefinitionScoped< yield* Effect.log(`[${options.id}] No flows in config push, skipping`); return; } - if (!isStringRecord(flowDefs)) { + const decodedFlowDefs = decodeFlowDefinitions(flowDefs); + if (O.isNone(decodedFlowDefs)) { yield* Effect.logWarning(`[${options.id}] Skipping config push: flows is not an object`); return; } + const flowDefinitions = decodedFlowDefs.value; - const flowsJson = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(flowDefs).pipe( + const flowsJson = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(flowDefinitions).pipe( Effect.catch((error) => Effect.succeed(String(error))), ); if (lastFlowsJson.length > 0 && flowsJson === lastFlowsJson && flows.size > 0) { @@ -235,19 +232,14 @@ export function runFlowProcessorDefinitionScoped< lastFlowsJson = flowsJson; for (const [name, activeFlow] of flows) { - if (!(name in flowDefs)) { + if (!(name in flowDefinitions)) { yield* Effect.log(`[${options.id}] Stopping removed flow: ${name}`); yield* closeFlowEffect(name, activeFlow); flows.delete(name); } } - for (const [name, defn] of Object.entries(flowDefs)) { - if (!isFlowDefinition(defn)) { - yield* Effect.logWarning(`[${options.id}] Skipping flow "${name}": definition is not an object`); - continue; - } - + for (const [name, defn] of Object.entries(flowDefinitions)) { const existing = flows.get(name); if (existing !== undefined) { yield* Effect.log(`[${options.id}] Restarting flow "${name}" with updated config`);