From 4ec7e72532a8f4f2e5b448b8c57b5d3737df02ce Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 02:45:11 -0500 Subject: [PATCH] Use managed runtimes for base processor facades --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 80 +++++++++++++++---- .../base/src/processor/async-processor.ts | 22 +++-- .../base/src/processor/flow-processor.ts | 8 +- ts/packages/base/src/processor/flow.ts | 21 ++--- ts/packages/base/src/spec/parameter-spec.ts | 3 +- 5 files changed, 93 insertions(+), 41 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index b4a4140a..272a5254 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,12 +12,12 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 Service -entrypoint runtime slice: +Current signal counts from `ts/packages` after the 2026-06-02 Base processor +compatibility runtime slice: | Signal | Count | | --- | ---: | -| `Effect.runPromise` | 185 | +| `Effect.runPromise` | 169 | | `Map<` | 88 | | `WebSocket` | 72 | | `new Map` | 62 | @@ -62,6 +62,9 @@ Notes: replacing remaining flow service `run()` program facades with `ManagedRuntime` and routing local `ts/scripts/run-*` launchers through `runMain()`/`NodeRuntime.runMain`. +- The base processor compatibility runtime slice dropped the + `Effect.runPromise` count again by moving `AsyncProcessor`, `Flow`, and + `FlowProcessor` Promise compatibility facades onto `ManagedRuntime`. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -485,6 +488,38 @@ Notes: - `cd ts && bun run build` - `cd ts && bun run test` +### 2026-06-02: Base Processor Compatibility Runtime Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/base/src/processor/async-processor.ts` now uses a + `ManagedRuntime` for Promise compatibility methods, signal-shutdown + execution, and legacy `AsyncProcessor.launch`. + - `ts/packages/base/src/processor/flow.ts` now owns a per-flow + `ManagedRuntime` for `start`, `stop`, `runInCompatibilityScope`, and + Promise resource facades. + - `ts/packages/base/src/processor/flow-processor.ts` now uses a + `ManagedRuntime` for the public `start(context)` facade instead of a local + `Effect.runPromiseWith` runner. + - `ts/packages/base/src/spec/parameter-spec.ts` now routes legacy `add` + through `flow.runInCompatibilityScope(...)`, matching the other specs. + - Subagent checks confirmed `NodeRuntime` is process-entrypoint-only here; + `@trustgraph/base` should not add an `@effect/platform-node` dependency + for these compatibility facades. +- Remaining: + - Constructor `as unknown as` shims in base processors preserve + callable-plus-newable public exports and are compatibility boundaries for + this loop. + - Typed string lookup casts in `Flow` need a real typed-spec/key redesign; + `HashMap`/`MutableHashMap` alone cannot infer `T` from a bare string. +- Verification: + - `bun run --cwd ts/packages/base build` + - `bun run --cwd ts/packages/base test` + - `cd ts && bun run check` + - `git diff --check` + - `cd ts && bun run build` + - `cd ts && bun run test` + ## Subagent Findings To Preserve - MCP/workbench: @@ -504,12 +539,17 @@ Notes: - Persistence IO should move toward `FileSystem` or `KeyValueStore` where the installed beta has the needed provider surface. - Base messaging/processors: - - Subscriber queues/maps, processor/flow Promise compatibility, and dynamic - flow state should continue moving toward `Queue`, `Deferred`, - `SynchronizedRef`, `Schedule`, and scoped layers. - - Existing constructor shims and typed registries in base processors still - use type assertions; they need a typed factory/registry redesign rather - than more assertions. + - Processor/flow Promise compatibility now uses `ManagedRuntime`; keep + `NodeRuntime` only for process `runMain()` entrypoints. + - Subscriber queues/maps and dynamic flow state should continue moving + toward `Queue`, `Deferred`, `SynchronizedRef`, `Schedule`, and scoped + layers. + - Existing constructor shims preserve callable-plus-newable public exports; + removing them needs a public API split or real class redesign. + - Typed string registries in `Flow` need schema-backed parameters and + typed-spec/key accessors. Effect `HashMap`/`MutableHashMap` can improve + lookup ergonomics with `Option`, but it does not remove the string-key + type hole by itself. - Gateway/client: - `EffectRpcClient` now owns its socket/RPC layer with `ManagedRuntime`. Socket errors/JSON parsing now use tagged errors and Schema decoding. @@ -528,18 +568,25 @@ Notes: ## Ranked Findings -### P1: Base Processor Registry And Constructor Shims +### P1: Base Flow Definition Schemas And Typed Spec Accessors - TrustGraph evidence: - - `ts/packages/base/src/processor/async-processor.ts` - `ts/packages/base/src/processor/flow.ts` - `ts/packages/base/src/processor/flow-processor.ts` + - `ts/packages/base/src/spec/parameter-spec.ts` + - `ts/packages/base/src/spec/producer-spec.ts` + - `ts/packages/base/src/spec/request-response-spec.ts` - Effect primitives: - Schema-backed registries, `Context`, `Layer`, `Effect.fn`, `Option`, - `Predicate`. + `Predicate`, `HashMap`/`MutableHashMap`. - Rewrite shape: - - Replace constructor `as unknown as` shims with typed factory exports. - - Replace resource lookup casts with schema-backed typed registry helpers. + - Replace hand-rolled `isStringRecord` / `isFlowDefinition` narrowing with + Schema decoding plus `Option`/`Match`-style branches. + - Add schema-backed generic parameter specs and spec-object accessors such as + `flow.parameterEffect(spec)`, then keep string accessors as compatibility + escapes. + - Add typed spec-object accessors for producers and requestors so call sites + stop spelling generic string lookups. - Do not add assertions to quiet Effect channel inference problems. - Tests: - `cd ts && bun run --cwd packages/base test` @@ -598,7 +645,7 @@ Notes: ## Recommended PR Order -1. Base processor registry and constructor shim redesign. +1. Base flow definition schema decoding and typed spec accessors. 2. Gateway RPC callback and client streaming completion cleanup. 3. Storage/provider managed resource cleanup. 4. MCP parity/deletion decision and workbench platform polish. @@ -623,6 +670,9 @@ Do not flag these as rewrite blockers without additional proof: - Client `newableFactory` assertions that preserve vendored callable-plus-new API facades are compatibility boundaries unless the public constructor API is intentionally redesigned. +- Base `AsyncProcessor`, `Flow`, and `FlowProcessor` callable-plus-newable + export assertions are compatibility boundaries unless the public constructor + API is intentionally redesigned. ## Acceptance For Final Loop Completion diff --git a/ts/packages/base/src/processor/async-processor.ts b/ts/packages/base/src/processor/async-processor.ts index 4b811eb6..ee934de4 100644 --- a/ts/packages/base/src/processor/async-processor.ts +++ b/ts/packages/base/src/processor/async-processor.ts @@ -8,7 +8,7 @@ import type { PubSubBackend } from "../backend/types.js"; import { makeNatsBackend } from "../backend/nats.js"; -import { Context, Effect } from "effect"; +import { Context, Effect, Layer, ManagedRuntime } from "effect"; import { processorLifecycleError, type ProcessorLifecycleError } from "../errors.js"; import { loadProcessorRuntimeConfig } from "../runtime/config.js"; @@ -74,6 +74,8 @@ interface RegisteredSignalHandler { readonly handler: () => void; } +const asyncProcessorRuntime = ManagedRuntime.make(Layer.empty); + export function makeAsyncProcessor< RunError = ProcessorLifecycleError, RunRequirements = never, @@ -94,14 +96,10 @@ export function makeAsyncProcessor< } const shutdown = () => { - void Effect.runPromise( + void asyncProcessorRuntime.runPromise( Effect.log(`[${config.id}] Shutting down...`).pipe( - Effect.flatMap(() => - Effect.tryPromise({ - try: () => processor.stop(), - catch: (error) => processorLifecycleError(config.id, "signal-shutdown", error), - }), - ), + Effect.flatMap(() => processor.stopEffect), + Effect.mapError((error) => processorLifecycleError(config.id, "signal-shutdown", error)), ), ).then(() => process.exit(0), () => process.exit(1)); }; @@ -133,8 +131,8 @@ export function makeAsyncProcessor< registerConfigHandler: (handler) => { configHandlers.push(handler); }, - start: (context) => Effect.runPromiseWith(context)(processor.startEffect), - stop: () => Effect.runPromise(processor.stopEffect), + start: (context) => asyncProcessorRuntime.runPromise(Effect.provide(processor.startEffect, context)), + stop: () => asyncProcessorRuntime.runPromise(processor.stopEffect), onShutdown: (callback) => { shutdownCallbacks.push(callback); }, @@ -178,7 +176,7 @@ export function makeAsyncProcessor< }); return stopProcessor(); }, - run: (context) => Effect.runPromiseWith(context)(processor.runEffect), + run: (context) => asyncProcessorRuntime.runPromise(Effect.provide(processor.runEffect, context)), get runEffect() { if (options.runEffect !== undefined) { return options.runEffect(processor); @@ -208,7 +206,7 @@ export const AsyncProcessor = Object.assign( id: string, ): Promise { const ProcessorCtor = this; - return Effect.runPromise( + return asyncProcessorRuntime.runPromise( Effect.gen(function* () { const config = yield* loadProcessorRuntimeConfig(id); const processor = new ProcessorCtor(config); diff --git a/ts/packages/base/src/processor/flow-processor.ts b/ts/packages/base/src/processor/flow-processor.ts index 15cadadf..3a2f9e27 100644 --- a/ts/packages/base/src/processor/flow-processor.ts +++ b/ts/packages/base/src/processor/flow-processor.ts @@ -37,7 +37,7 @@ import { } from "../messaging/runtime.js"; import { makePubSubService, PubSub } from "../backend/pubsub.js"; import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js"; -import { Duration, Effect, Exit, Scope } from "effect"; +import { Duration, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect"; import * as Predicate from "effect/Predicate"; import * as S from "effect/Schema"; @@ -346,6 +346,7 @@ export function makeFlowProcessor( const specifications: Array> = [ ...(options.specifications ?? []), ]; + const compatibilityRuntime = ManagedRuntime.make(Layer.empty); let processor: FlowProcessorRuntime; const base: AsyncProcessorRuntime< PubSubError | FlowRuntimeError | ProcessorLifecycleError, @@ -385,7 +386,7 @@ export function makeFlowProcessor( return makeStartEffect(); }, start: (context) => - Effect.runPromiseWith(context)( + compatibilityRuntime.runPromise(Effect.provide( Effect.gen(function* () { const pubsub = makePubSubService(base.pubsub); const messagingConfig = yield* loadMessagingRuntimeConfig(); @@ -401,7 +402,8 @@ export function makeFlowProcessor( ); yield* Effect.scoped(start); }), - ), + context, + )), }; return processor; diff --git a/ts/packages/base/src/processor/flow.ts b/ts/packages/base/src/processor/flow.ts index e8d5bb55..87fc7d75 100644 --- a/ts/packages/base/src/processor/flow.ts +++ b/ts/packages/base/src/processor/flow.ts @@ -4,7 +4,7 @@ * Python reference: trustgraph-base/trustgraph/base/flow.py */ -import { Context, Effect, Exit, Scope } from "effect"; +import { Context, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect"; import type { PubSubBackend } from "../backend/types.js"; import { makePubSubService } from "../backend/pubsub.js"; import { @@ -69,6 +69,7 @@ export function makeFlow( const requestors = new Map>(); const parameters = new Map(); let compatibilityScope: Scope.Closeable | null = null; + const compatibilityRuntime = ManagedRuntime.make(Layer.empty); const ensureCompatibilityScopeEffect = Effect.fn("Flow.ensureCompatibilityScope")(function* () { if (compatibilityScope !== null) { @@ -107,7 +108,7 @@ export function makeFlow( }); }, start(context: Context.Context): Promise { - return Effect.runPromise( + return compatibilityRuntime.runPromise( Effect.gen(function* () { if (compatibilityScope !== null) { yield* flow.stopEffect(); @@ -117,7 +118,7 @@ export function makeFlow( ); }, stop(): Promise { - return Effect.runPromise(flow.stopEffect()); + return compatibilityRuntime.runPromise(flow.stopEffect()); }, stopEffect(): Effect.Effect { return Effect.gen(function* () { @@ -157,7 +158,7 @@ export function makeFlow( runtimePubsub: PubSubBackend, context: Context.Context, ): Promise { - return Effect.runPromise(flow.runInCompatibilityScopeEffect(effect, runtimePubsub, context)); + return compatibilityRuntime.runPromise(flow.runInCompatibilityScopeEffect(effect, runtimePubsub, context)); }, clearResources(): void { producers.clear(); @@ -207,16 +208,16 @@ export function makeFlow( const p = producers.get(producerName); if (p === undefined) throw flowResourceNotFoundError(name, "producer", producerName); return { - send: (id, message) => Effect.runPromise((p as EffectProducer).send(id, message)), - flush: () => Effect.runPromise(p.flush), - stop: () => Effect.runPromise(p.flush.pipe(Effect.flatMap(() => p.close))), + send: (id, message) => compatibilityRuntime.runPromise((p as EffectProducer).send(id, message)), + flush: () => compatibilityRuntime.runPromise(p.flush), + stop: () => compatibilityRuntime.runPromise(p.flush.pipe(Effect.flatMap(() => p.close))), }; }, consumer(consumerName: string): FlowConsumer { const c = consumers.get(consumerName); if (c === undefined) throw flowResourceNotFoundError(name, "consumer", consumerName); return { - stop: () => Effect.runPromise(c.stop), + stop: () => compatibilityRuntime.runPromise(c.stop), }; }, requestor(requestorName: string): FlowRequestor { @@ -224,13 +225,13 @@ export function makeFlow( if (rr === undefined) throw flowResourceNotFoundError(name, "requestor", requestorName); return { request: (request, options) => - Effect.runPromise( + compatibilityRuntime.runPromise( (rr as EffectRequestResponse).request( request, toEffectRequestOptions(options), ), ), - stop: () => Effect.runPromise(rr.stop), + stop: () => compatibilityRuntime.runPromise(rr.stop), }; }, parameter(parameterName: string): T { diff --git a/ts/packages/base/src/spec/parameter-spec.ts b/ts/packages/base/src/spec/parameter-spec.ts index c09bc9b2..2c933158 100644 --- a/ts/packages/base/src/spec/parameter-spec.ts +++ b/ts/packages/base/src/spec/parameter-spec.ts @@ -20,6 +20,7 @@ export function makeParameterSpec(name: string): ParameterSpec { return { name, addEffect, - add: (flow, _pubsub, definition) => Effect.runPromise(addEffect(flow, definition)), + add: (flow, pubsub, definition, context) => + flow.runInCompatibilityScope(addEffect(flow, definition), pubsub, context), }; }