Use managed runtimes for base processor facades

This commit is contained in:
elpresidank 2026-06-02 02:45:11 -05:00
parent 74955d6041
commit 4ec7e72532
5 changed files with 93 additions and 41 deletions

View file

@ -12,12 +12,12 @@ Verified source roots:
- Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4`
- Installed Effect beta used by this workspace: `ts/node_modules/effect` - Installed Effect beta used by this workspace: `ts/node_modules/effect`
Current signal counts from `ts/packages` after the 2026-06-02 Service Current signal counts from `ts/packages` after the 2026-06-02 Base processor
entrypoint runtime slice: compatibility runtime slice:
| Signal | Count | | Signal | Count |
| --- | ---: | | --- | ---: |
| `Effect.runPromise` | 185 | | `Effect.runPromise` | 169 |
| `Map<` | 88 | | `Map<` | 88 |
| `WebSocket` | 72 | | `WebSocket` | 72 |
| `new Map` | 62 | | `new Map` | 62 |
@ -62,6 +62,9 @@ Notes:
replacing remaining flow service `run()` program facades with replacing remaining flow service `run()` program facades with
`ManagedRuntime` and routing local `ts/scripts/run-*` launchers through `ManagedRuntime` and routing local `ts/scripts/run-*` launchers through
`runMain()`/`NodeRuntime.runMain`. `runMain()`/`NodeRuntime.runMain`.
- The base processor compatibility runtime slice dropped the
`Effect.runPromise` count again by moving `AsyncProcessor`, `Flow`, and
`FlowProcessor` Promise compatibility facades onto `ManagedRuntime`.
- `Record<string, any>` and `throwLibrarianServiceError` are now clean in - `Record<string, any>` and `throwLibrarianServiceError` are now clean in
`ts/packages`. `ts/packages`.
@ -485,6 +488,38 @@ Notes:
- `cd ts && bun run build` - `cd ts && bun run build`
- `cd ts && bun run test` - `cd ts && bun run test`
### 2026-06-02: Base Processor Compatibility Runtime Slice
- Status: migrated and root-verified.
- Completed:
- `ts/packages/base/src/processor/async-processor.ts` now uses a
`ManagedRuntime` for Promise compatibility methods, signal-shutdown
execution, and legacy `AsyncProcessor.launch`.
- `ts/packages/base/src/processor/flow.ts` now owns a per-flow
`ManagedRuntime` for `start`, `stop`, `runInCompatibilityScope`, and
Promise resource facades.
- `ts/packages/base/src/processor/flow-processor.ts` now uses a
`ManagedRuntime` for the public `start(context)` facade instead of a local
`Effect.runPromiseWith` runner.
- `ts/packages/base/src/spec/parameter-spec.ts` now routes legacy `add`
through `flow.runInCompatibilityScope(...)`, matching the other specs.
- Subagent checks confirmed `NodeRuntime` is process-entrypoint-only here;
`@trustgraph/base` should not add an `@effect/platform-node` dependency
for these compatibility facades.
- Remaining:
- Constructor `as unknown as` shims in base processors preserve
callable-plus-newable public exports and are compatibility boundaries for
this loop.
- Typed string lookup casts in `Flow` need a real typed-spec/key redesign;
`HashMap`/`MutableHashMap` alone cannot infer `T` from a bare string.
- Verification:
- `bun run --cwd ts/packages/base build`
- `bun run --cwd ts/packages/base test`
- `cd ts && bun run check`
- `git diff --check`
- `cd ts && bun run build`
- `cd ts && bun run test`
## Subagent Findings To Preserve ## Subagent Findings To Preserve
- MCP/workbench: - MCP/workbench:
@ -504,12 +539,17 @@ Notes:
- Persistence IO should move toward `FileSystem` or `KeyValueStore` where - Persistence IO should move toward `FileSystem` or `KeyValueStore` where
the installed beta has the needed provider surface. the installed beta has the needed provider surface.
- Base messaging/processors: - Base messaging/processors:
- Subscriber queues/maps, processor/flow Promise compatibility, and dynamic - Processor/flow Promise compatibility now uses `ManagedRuntime`; keep
flow state should continue moving toward `Queue`, `Deferred`, `NodeRuntime` only for process `runMain()` entrypoints.
`SynchronizedRef`, `Schedule`, and scoped layers. - Subscriber queues/maps and dynamic flow state should continue moving
- Existing constructor shims and typed registries in base processors still toward `Queue`, `Deferred`, `SynchronizedRef`, `Schedule`, and scoped
use type assertions; they need a typed factory/registry redesign rather layers.
than more assertions. - Existing constructor shims preserve callable-plus-newable public exports;
removing them needs a public API split or real class redesign.
- Typed string registries in `Flow` need schema-backed parameters and
typed-spec/key accessors. Effect `HashMap`/`MutableHashMap` can improve
lookup ergonomics with `Option`, but it does not remove the string-key
type hole by itself.
- Gateway/client: - Gateway/client:
- `EffectRpcClient` now owns its socket/RPC layer with `ManagedRuntime`. - `EffectRpcClient` now owns its socket/RPC layer with `ManagedRuntime`.
Socket errors/JSON parsing now use tagged errors and Schema decoding. Socket errors/JSON parsing now use tagged errors and Schema decoding.
@ -528,18 +568,25 @@ Notes:
## Ranked Findings ## Ranked Findings
### P1: Base Processor Registry And Constructor Shims ### P1: Base Flow Definition Schemas And Typed Spec Accessors
- TrustGraph evidence: - TrustGraph evidence:
- `ts/packages/base/src/processor/async-processor.ts`
- `ts/packages/base/src/processor/flow.ts` - `ts/packages/base/src/processor/flow.ts`
- `ts/packages/base/src/processor/flow-processor.ts` - `ts/packages/base/src/processor/flow-processor.ts`
- `ts/packages/base/src/spec/parameter-spec.ts`
- `ts/packages/base/src/spec/producer-spec.ts`
- `ts/packages/base/src/spec/request-response-spec.ts`
- Effect primitives: - Effect primitives:
- Schema-backed registries, `Context`, `Layer`, `Effect.fn`, `Option`, - Schema-backed registries, `Context`, `Layer`, `Effect.fn`, `Option`,
`Predicate`. `Predicate`, `HashMap`/`MutableHashMap`.
- Rewrite shape: - Rewrite shape:
- Replace constructor `as unknown as` shims with typed factory exports. - Replace hand-rolled `isStringRecord` / `isFlowDefinition` narrowing with
- Replace resource lookup casts with schema-backed typed registry helpers. Schema decoding plus `Option`/`Match`-style branches.
- Add schema-backed generic parameter specs and spec-object accessors such as
`flow.parameterEffect(spec)`, then keep string accessors as compatibility
escapes.
- Add typed spec-object accessors for producers and requestors so call sites
stop spelling generic string lookups.
- Do not add assertions to quiet Effect channel inference problems. - Do not add assertions to quiet Effect channel inference problems.
- Tests: - Tests:
- `cd ts && bun run --cwd packages/base test` - `cd ts && bun run --cwd packages/base test`
@ -598,7 +645,7 @@ Notes:
## Recommended PR Order ## Recommended PR Order
1. Base processor registry and constructor shim redesign. 1. Base flow definition schema decoding and typed spec accessors.
2. Gateway RPC callback and client streaming completion cleanup. 2. Gateway RPC callback and client streaming completion cleanup.
3. Storage/provider managed resource cleanup. 3. Storage/provider managed resource cleanup.
4. MCP parity/deletion decision and workbench platform polish. 4. MCP parity/deletion decision and workbench platform polish.
@ -623,6 +670,9 @@ Do not flag these as rewrite blockers without additional proof:
- Client `newableFactory` assertions that preserve vendored callable-plus-new - Client `newableFactory` assertions that preserve vendored callable-plus-new
API facades are compatibility boundaries unless the public constructor API is API facades are compatibility boundaries unless the public constructor API is
intentionally redesigned. intentionally redesigned.
- Base `AsyncProcessor`, `Flow`, and `FlowProcessor` callable-plus-newable
export assertions are compatibility boundaries unless the public constructor
API is intentionally redesigned.
## Acceptance For Final Loop Completion ## Acceptance For Final Loop Completion

View file

@ -8,7 +8,7 @@
import type { PubSubBackend } from "../backend/types.js"; import type { PubSubBackend } from "../backend/types.js";
import { makeNatsBackend } from "../backend/nats.js"; import { makeNatsBackend } from "../backend/nats.js";
import { Context, Effect } from "effect"; import { Context, Effect, Layer, ManagedRuntime } from "effect";
import { processorLifecycleError, type ProcessorLifecycleError } from "../errors.js"; import { processorLifecycleError, type ProcessorLifecycleError } from "../errors.js";
import { loadProcessorRuntimeConfig } from "../runtime/config.js"; import { loadProcessorRuntimeConfig } from "../runtime/config.js";
@ -74,6 +74,8 @@ interface RegisteredSignalHandler {
readonly handler: () => void; readonly handler: () => void;
} }
const asyncProcessorRuntime = ManagedRuntime.make(Layer.empty);
export function makeAsyncProcessor< export function makeAsyncProcessor<
RunError = ProcessorLifecycleError, RunError = ProcessorLifecycleError,
RunRequirements = never, RunRequirements = never,
@ -94,14 +96,10 @@ export function makeAsyncProcessor<
} }
const shutdown = () => { const shutdown = () => {
void Effect.runPromise( void asyncProcessorRuntime.runPromise(
Effect.log(`[${config.id}] Shutting down...`).pipe( Effect.log(`[${config.id}] Shutting down...`).pipe(
Effect.flatMap(() => Effect.flatMap(() => processor.stopEffect),
Effect.tryPromise({ Effect.mapError((error) => processorLifecycleError(config.id, "signal-shutdown", error)),
try: () => processor.stop(),
catch: (error) => processorLifecycleError(config.id, "signal-shutdown", error),
}),
),
), ),
).then(() => process.exit(0), () => process.exit(1)); ).then(() => process.exit(0), () => process.exit(1));
}; };
@ -133,8 +131,8 @@ export function makeAsyncProcessor<
registerConfigHandler: (handler) => { registerConfigHandler: (handler) => {
configHandlers.push(handler); configHandlers.push(handler);
}, },
start: (context) => Effect.runPromiseWith(context)(processor.startEffect), start: (context) => asyncProcessorRuntime.runPromise(Effect.provide(processor.startEffect, context)),
stop: () => Effect.runPromise(processor.stopEffect), stop: () => asyncProcessorRuntime.runPromise(processor.stopEffect),
onShutdown: (callback) => { onShutdown: (callback) => {
shutdownCallbacks.push(callback); shutdownCallbacks.push(callback);
}, },
@ -178,7 +176,7 @@ export function makeAsyncProcessor<
}); });
return stopProcessor(); return stopProcessor();
}, },
run: (context) => Effect.runPromiseWith(context)(processor.runEffect), run: (context) => asyncProcessorRuntime.runPromise(Effect.provide(processor.runEffect, context)),
get runEffect() { get runEffect() {
if (options.runEffect !== undefined) { if (options.runEffect !== undefined) {
return options.runEffect(processor); return options.runEffect(processor);
@ -208,7 +206,7 @@ export const AsyncProcessor = Object.assign(
id: string, id: string,
): Promise<void> { ): Promise<void> {
const ProcessorCtor = this; const ProcessorCtor = this;
return Effect.runPromise( return asyncProcessorRuntime.runPromise(
Effect.gen(function* () { Effect.gen(function* () {
const config = yield* loadProcessorRuntimeConfig(id); const config = yield* loadProcessorRuntimeConfig(id);
const processor = new ProcessorCtor(config); const processor = new ProcessorCtor(config);

View file

@ -37,7 +37,7 @@ import {
} from "../messaging/runtime.js"; } from "../messaging/runtime.js";
import { makePubSubService, PubSub } from "../backend/pubsub.js"; import { makePubSubService, PubSub } from "../backend/pubsub.js";
import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js"; import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js";
import { Duration, Effect, Exit, Scope } from "effect"; import { Duration, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect";
import * as Predicate from "effect/Predicate"; import * as Predicate from "effect/Predicate";
import * as S from "effect/Schema"; import * as S from "effect/Schema";
@ -346,6 +346,7 @@ export function makeFlowProcessor<FlowRequirements = never>(
const specifications: Array<Spec<FlowRequirements>> = [ const specifications: Array<Spec<FlowRequirements>> = [
...(options.specifications ?? []), ...(options.specifications ?? []),
]; ];
const compatibilityRuntime = ManagedRuntime.make(Layer.empty);
let processor: FlowProcessorRuntime<FlowRequirements>; let processor: FlowProcessorRuntime<FlowRequirements>;
const base: AsyncProcessorRuntime< const base: AsyncProcessorRuntime<
PubSubError | FlowRuntimeError | ProcessorLifecycleError, PubSubError | FlowRuntimeError | ProcessorLifecycleError,
@ -385,7 +386,7 @@ export function makeFlowProcessor<FlowRequirements = never>(
return makeStartEffect(); return makeStartEffect();
}, },
start: (context) => start: (context) =>
Effect.runPromiseWith(context)( compatibilityRuntime.runPromise(Effect.provide(
Effect.gen(function* () { Effect.gen(function* () {
const pubsub = makePubSubService(base.pubsub); const pubsub = makePubSubService(base.pubsub);
const messagingConfig = yield* loadMessagingRuntimeConfig(); const messagingConfig = yield* loadMessagingRuntimeConfig();
@ -401,7 +402,8 @@ export function makeFlowProcessor<FlowRequirements = never>(
); );
yield* Effect.scoped(start); yield* Effect.scoped(start);
}), }),
), context,
)),
}; };
return processor; return processor;

View file

@ -4,7 +4,7 @@
* Python reference: trustgraph-base/trustgraph/base/flow.py * Python reference: trustgraph-base/trustgraph/base/flow.py
*/ */
import { Context, Effect, Exit, Scope } from "effect"; import { Context, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect";
import type { PubSubBackend } from "../backend/types.js"; import type { PubSubBackend } from "../backend/types.js";
import { makePubSubService } from "../backend/pubsub.js"; import { makePubSubService } from "../backend/pubsub.js";
import { import {
@ -69,6 +69,7 @@ export function makeFlow<Requirements = never>(
const requestors = new Map<string, EffectRequestResponse<never, unknown>>(); const requestors = new Map<string, EffectRequestResponse<never, unknown>>();
const parameters = new Map<string, unknown>(); const parameters = new Map<string, unknown>();
let compatibilityScope: Scope.Closeable | null = null; let compatibilityScope: Scope.Closeable | null = null;
const compatibilityRuntime = ManagedRuntime.make(Layer.empty);
const ensureCompatibilityScopeEffect = Effect.fn("Flow.ensureCompatibilityScope")(function* () { const ensureCompatibilityScopeEffect = Effect.fn("Flow.ensureCompatibilityScope")(function* () {
if (compatibilityScope !== null) { if (compatibilityScope !== null) {
@ -107,7 +108,7 @@ export function makeFlow<Requirements = never>(
}); });
}, },
start(context: Context.Context<Requirements>): Promise<void> { start(context: Context.Context<Requirements>): Promise<void> {
return Effect.runPromise( return compatibilityRuntime.runPromise(
Effect.gen(function* () { Effect.gen(function* () {
if (compatibilityScope !== null) { if (compatibilityScope !== null) {
yield* flow.stopEffect(); yield* flow.stopEffect();
@ -117,7 +118,7 @@ export function makeFlow<Requirements = never>(
); );
}, },
stop(): Promise<void> { stop(): Promise<void> {
return Effect.runPromise(flow.stopEffect()); return compatibilityRuntime.runPromise(flow.stopEffect());
}, },
stopEffect(): Effect.Effect<void> { stopEffect(): Effect.Effect<void> {
return Effect.gen(function* () { return Effect.gen(function* () {
@ -157,7 +158,7 @@ export function makeFlow<Requirements = never>(
runtimePubsub: PubSubBackend, runtimePubsub: PubSubBackend,
context: Context.Context<Requirements>, context: Context.Context<Requirements>,
): Promise<A> { ): Promise<A> {
return Effect.runPromise(flow.runInCompatibilityScopeEffect(effect, runtimePubsub, context)); return compatibilityRuntime.runPromise(flow.runInCompatibilityScopeEffect(effect, runtimePubsub, context));
}, },
clearResources(): void { clearResources(): void {
producers.clear(); producers.clear();
@ -207,16 +208,16 @@ export function makeFlow<Requirements = never>(
const p = producers.get(producerName); const p = producers.get(producerName);
if (p === undefined) throw flowResourceNotFoundError(name, "producer", producerName); if (p === undefined) throw flowResourceNotFoundError(name, "producer", producerName);
return { return {
send: (id, message) => Effect.runPromise((p as EffectProducer<T>).send(id, message)), send: (id, message) => compatibilityRuntime.runPromise((p as EffectProducer<T>).send(id, message)),
flush: () => Effect.runPromise(p.flush), flush: () => compatibilityRuntime.runPromise(p.flush),
stop: () => Effect.runPromise(p.flush.pipe(Effect.flatMap(() => p.close))), stop: () => compatibilityRuntime.runPromise(p.flush.pipe(Effect.flatMap(() => p.close))),
}; };
}, },
consumer(consumerName: string): FlowConsumer { consumer(consumerName: string): FlowConsumer {
const c = consumers.get(consumerName); const c = consumers.get(consumerName);
if (c === undefined) throw flowResourceNotFoundError(name, "consumer", consumerName); if (c === undefined) throw flowResourceNotFoundError(name, "consumer", consumerName);
return { return {
stop: () => Effect.runPromise(c.stop), stop: () => compatibilityRuntime.runPromise(c.stop),
}; };
}, },
requestor<TReq, TRes>(requestorName: string): FlowRequestor<TReq, TRes> { requestor<TReq, TRes>(requestorName: string): FlowRequestor<TReq, TRes> {
@ -224,13 +225,13 @@ export function makeFlow<Requirements = never>(
if (rr === undefined) throw flowResourceNotFoundError(name, "requestor", requestorName); if (rr === undefined) throw flowResourceNotFoundError(name, "requestor", requestorName);
return { return {
request: (request, options) => request: (request, options) =>
Effect.runPromise( compatibilityRuntime.runPromise(
(rr as EffectRequestResponse<TReq, TRes>).request( (rr as EffectRequestResponse<TReq, TRes>).request(
request, request,
toEffectRequestOptions(options), toEffectRequestOptions(options),
), ),
), ),
stop: () => Effect.runPromise(rr.stop), stop: () => compatibilityRuntime.runPromise(rr.stop),
}; };
}, },
parameter<T>(parameterName: string): T { parameter<T>(parameterName: string): T {

View file

@ -20,6 +20,7 @@ export function makeParameterSpec(name: string): ParameterSpec {
return { return {
name, name,
addEffect, addEffect,
add: (flow, _pubsub, definition) => Effect.runPromise(addEffect(flow, definition)), add: (flow, pubsub, definition, context) =>
flow.runInCompatibilityScope(addEffect(flow, definition), pubsub, context),
}; };
} }