From 1a29bdef9df81f95a83019d7db4d19df8c25f500 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Thu, 4 Jun 2026 07:34:14 -0500 Subject: [PATCH] Use Effect fn for base processor helpers --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 58 ++++- .../src/__tests__/flow-spec-runtime.test.ts | 8 +- ts/packages/base/src/messaging/runtime.ts | 6 +- .../base/src/processor/flow-processor.ts | 234 ++++++++---------- ts/packages/base/src/processor/flow.ts | 133 ++++++---- ts/packages/base/src/spec/parameter-spec.ts | 24 +- ts/packages/base/src/spec/producer-spec.ts | 23 +- .../base/src/spec/request-response-spec.ts | 23 +- 8 files changed, 316 insertions(+), 193 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 1155124f..26b64c24 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -2164,6 +2164,36 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-04: Base Processor Effect.fn Helper Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/base/src/processor/flow.ts` now exposes `startEffect` and + `stopEffect` as Effect values with spans instead of lazy zero-argument + Effect methods. + - `Flow.runInCompatibilityScopeEffect` is now a named `Effect.fn` helper + instead of a method returning nested `Effect.gen`. + - The public `Flow` shape is explicit so Effect error and requirement + channels do not widen through recursive object inference. + - Producer, request/response, and parameter specs are generic over the + receiving flow requirements, so reusable specs remain mountable in flows + with additional service dependencies without type assertions. + - `ts/packages/base/src/processor/flow-processor.ts` now defines + `startFlow`, `configureFlows`, `processNextConfigPush`, the safe consume + wrapper, and the Promise `start` compatibility helper with named + `Effect.fn` functions. + - The focused scan for callable `return Effect.gen(...)` helpers in + `flow.ts` is clean. Remaining `flow-processor.ts` matches are one-shot + program values / exported program factories and should be scoped + separately from reusable helper normalization. +- Verification: + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/flow-spec-runtime.test.ts src/__tests__/flow-processor-runtime.test.ts` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -2198,6 +2228,10 @@ Notes: - Flow service startup facades now consistently use `ManagedRuntime`, and local scripts should delegate to `runMain()` instead of adding local `.catch(console.error/process.exit)` wrappers. + - Base `Flow.startEffect` / `stopEffect` are now Effect values, not + zero-argument lazy Effect methods. New runtime APIs should expose Effect + values where no arguments are needed, and reserve `Effect.fn` for reusable + helpers that take arguments. - Persistence IO should move toward `FileSystem` or `KeyValueStore` where the installed beta has the needed provider surface. - Base messaging/processors: @@ -2233,6 +2267,10 @@ Notes: - Base metrics are now Effect-native and Prometheus-formatted through `PrometheusMetrics.format`; do not reopen `prom-client` unless a future scrape requirement cannot be represented by Effect metrics. + - The scratch note's `prom-client` claim is stale: `ts/packages` has no + `prom-client` matches, and `metrics/prometheus.ts` already uses + `effect/Metric` plus `PrometheusMetrics.format`. Remaining observability + work is OTLP/span coverage and wiring existing consumer metrics helpers. - Numeric public timeout fields such as `timeoutMs` remain compatibility surfaces. Internal runtime config with `Config.number(...Ms)` is still a valid `Config.duration` / `Duration` cleanup target. @@ -2241,6 +2279,9 @@ Notes: Socket errors/JSON parsing now use tagged errors and Schema decoding. The remaining client `newableFactory` assertions are documented as public API compatibility boundaries for this loop. + - `EffectRpcClient` remains Promise-shaped at the public facade. A future + client API slice can add an Effect/Stream-native surface beside the + Promise facade, then migrate callback users incrementally. - Gateway/client `DispatchError` contracts now use `S.TaggedErrorClass`; do not reopen `S.ErrorClass` unless a new production match appears. - Gateway `DispatchStream` now uses Effect-native dispatcher streaming @@ -2288,6 +2329,9 @@ Notes: complete, and the Effect AI stream-part adapter now uses `effect/Match`. The remaining provider-layer item is parity-backed Effect AI adapter work, not a direct SDK swap. + - AI service modernization remains partially valid: Claude already uses + `@effect/ai-anthropic`, while OpenAI/Azure/OpenAI-compatible/Mistral/Ollama + still need parity-backed Effect AI adapters or provider-specific bridges. - Scratch-note follow-ups: - `Term` / compact client term serialization is complete for base schema, gateway translation, and pure term helper switches. Future work should @@ -2296,6 +2340,14 @@ Notes: - Messaging runtime `Config.duration` cleanup is complete. Internal runtime config uses `Duration.Duration`; public timeout compatibility inputs and broker receive/error payload boundaries remain numeric milliseconds. + - CLI modernization remains valid, but the live installed target is + `effect/unstable/cli` rather than an installed `@effect/cli` package. + - Chunking remains a small valid `effect/Chunk` slice: the recursive + splitter is still array/mutation based and can expose `Chunk.Chunk` + internally while preserving service behavior. + - Knowledge core internals are largely Effect-native, but public core service + facades still expose Promise methods; migrate tests to Effect-first + methods before shrinking those facades. - Qdrant graph/doc known-collection caches now use `MutableHashSet`. Short-lived local traversal sets remain no-ops. - Gateway dispatcher static service registries, streaming membership, and @@ -2322,8 +2374,10 @@ Notes: - Fresh strict signal sweep after the 2026-06-04 helper and collection slices found no production normal `Error`, raw `try`/`catch`, native `switch`, or Effect-focused type assertions under `ts/packages`. - - Remaining real helper-normalization targets from the fresh sweep are base - processor flow helpers and one workbench atom helper. + - Remaining real helper-normalization targets from the fresh sweep are one + workbench atom helper plus separately scoped inline callback/program + factories in messaging compatibility facades, gateway/librarian helpers, + and CLI command actions. - Remaining real long-lived native collection targets include base processor registries, Librarian service / collection manager state, prompt template cache, and a workbench module cache. Local traversal sets and test fakes diff --git a/ts/packages/base/src/__tests__/flow-spec-runtime.test.ts b/ts/packages/base/src/__tests__/flow-spec-runtime.test.ts index 373e1e14..7a63fa0b 100644 --- a/ts/packages/base/src/__tests__/flow-spec-runtime.test.ts +++ b/ts/packages/base/src/__tests__/flow-spec-runtime.test.ts @@ -167,7 +167,7 @@ describe("Effect-native flow specifications", () => { provideRuntime( backend, Effect.gen(function* () { - yield* flow.startEffect(); + yield* flow.startEffect; const producer = yield* flow.producerEffect(outputProducerSpec); const duplicateSpecError = yield* flow.producerEffect(duplicateOutputProducerSpec).pipe(Effect.flip); expect(duplicateSpecError._tag).toBe("FlowResourceNotFoundError"); @@ -212,7 +212,7 @@ describe("Effect-native flow specifications", () => { provideRuntime( backend, Effect.gen(function* () { - yield* flow.startEffect(); + yield* flow.startEffect; yield* Effect.yieldNow; yield* TestClock.adjust(Duration.millis(5)); }), @@ -254,7 +254,7 @@ describe("Effect-native flow specifications", () => { provideRuntime( backend, Effect.gen(function* () { - yield* flow.startEffect(); + yield* flow.startEffect; const duplicateSpecError = yield* flow.requestorEffect(duplicateRequestResponseSpec).pipe(Effect.flip); expect(duplicateSpecError._tag).toBe("FlowResourceNotFoundError"); const requestor = flow.requestor(requestResponseSpec); @@ -293,7 +293,7 @@ describe("Effect-native flow specifications", () => { provideRuntime( backend, Effect.gen(function* () { - yield* flow.startEffect(); + yield* flow.startEffect; const producerError = yield* flow.producerEffect("missing-producer").pipe(Effect.flip); const parameter = yield* flow.parameterEffect(presentParameter); const legacyParameter = yield* flow.parameterEffect("present"); diff --git a/ts/packages/base/src/messaging/runtime.ts b/ts/packages/base/src/messaging/runtime.ts index ed6f2ba3..1085c38b 100644 --- a/ts/packages/base/src/messaging/runtime.ts +++ b/ts/packages/base/src/messaging/runtime.ts @@ -625,7 +625,7 @@ export const RequestResponseFactoryLive = Layer.effect( export const runFlowRuntimeScoped = Effect.fn("FlowRuntime.run")(function* ( flow: Flow, ) { - yield* flow.startEffect().pipe( + yield* flow.startEffect.pipe( Effect.mapError((error) => flowRuntimeError(flow.name, "start", error)), ); yield* Effect.addFinalizer(() => @@ -673,8 +673,8 @@ export const runEffectRequestResponseScoped = Effect.fn("runEffectRequestRespons return yield* makeEffectRequestResponseFromPubSub(pubsub, config, options); }); -export const runFlowScoped = Effect.fn("runFlowScoped")(function* ( - flow: Flow, +export const runFlowScoped = Effect.fn("runFlowScoped")(function* ( + flow: Flow, ) { yield* runFlowRuntimeScoped(flow); }); diff --git a/ts/packages/base/src/processor/flow-processor.ts b/ts/packages/base/src/processor/flow-processor.ts index 91f9a438..82403d01 100644 --- a/ts/packages/base/src/processor/flow-processor.ts +++ b/ts/packages/base/src/processor/flow-processor.ts @@ -37,7 +37,7 @@ import { } from "../messaging/runtime.js"; import { makePubSubService, PubSub } from "../backend/pubsub.js"; import { loadMessagingRuntimeConfig } from "../runtime/index.ts"; -import { Duration, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect"; +import { Context, Duration, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; @@ -172,127 +172,110 @@ export function runFlowProcessorDefinitionScoped< ); }; - const startFlowEffect = ( + const startFlowEffect = Effect.fn("FlowProcessor.startFlow")(function* ( name: string, definition: FlowDefinition, - ): Effect.Effect< - ActiveFlow, - FlowRuntimeError, - FlowRuntime | ProducerFactory | ConsumerFactory | RequestResponseFactory | FlowRequirements - > => - Effect.gen(function* () { - const flowRuntime = yield* FlowRuntime; - const scope = yield* Scope.make(); - const flow = new Flow( - name, - options.id, - options.pubsub, - definition, - options.specifications, - ); - return yield* flowRuntime.run(flow).pipe( - Scope.provide(scope), - Effect.as({ scope } satisfies ActiveFlow), - Effect.catch((error) => - Scope.close(scope, Exit.void).pipe( - Effect.flatMap(() => Effect.fail(error)), - ), + ) { + const flowRuntime = yield* FlowRuntime; + const scope = yield* Scope.make(); + const flow = new Flow( + name, + options.id, + options.pubsub, + definition, + options.specifications, + ); + return yield* flowRuntime.run(flow).pipe( + Scope.provide(scope), + Effect.as({ scope } satisfies ActiveFlow), + Effect.catch((error) => + Scope.close(scope, Exit.void).pipe( + Effect.flatMap(() => Effect.fail(error)), ), - ); - }); + ), + ); + }); - const onConfigureFlowsEffect = ( + const onConfigureFlowsEffect = Effect.fn("FlowProcessor.configureFlows")(function* ( config: Record, _version: number, - ): Effect.Effect< - void, - FlowRuntimeError, - FlowRuntime | ProducerFactory | ConsumerFactory | RequestResponseFactory | FlowRequirements - > => - Effect.gen(function* () { - const flowDefs = config.flows; - if (flowDefs === undefined) { - yield* Effect.log(`[${options.id}] No flows in config push, skipping`); - return; - } - const decodedFlowDefs = decodeFlowDefinitions(flowDefs); - if (O.isNone(decodedFlowDefs)) { - yield* Effect.logWarning(`[${options.id}] Skipping config push: flows is not an object`); - return; - } - const flowDefinitions = decodedFlowDefs.value; + ) { + const flowDefs = config.flows; + if (flowDefs === undefined) { + yield* Effect.log(`[${options.id}] No flows in config push, skipping`); + return; + } + const decodedFlowDefs = decodeFlowDefinitions(flowDefs); + if (O.isNone(decodedFlowDefs)) { + yield* Effect.logWarning(`[${options.id}] Skipping config push: flows is not an object`); + return; + } + const flowDefinitions = decodedFlowDefs.value; - const flowsJson = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(flowDefinitions).pipe( - Effect.catch((error) => Effect.succeed(String(error))), - ); - if (lastFlowsJson.length > 0 && flowsJson === lastFlowsJson && flows.size > 0) { - yield* Effect.log(`[${options.id}] Flow definitions unchanged, skipping restart`); - return; - } - lastFlowsJson = flowsJson; + const flowsJson = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(flowDefinitions).pipe( + Effect.catch((error) => Effect.succeed(String(error))), + ); + if (lastFlowsJson.length > 0 && flowsJson === lastFlowsJson && flows.size > 0) { + yield* Effect.log(`[${options.id}] Flow definitions unchanged, skipping restart`); + return; + } + lastFlowsJson = flowsJson; - for (const [name, activeFlow] of flows) { - if (!(name in flowDefinitions)) { - yield* Effect.log(`[${options.id}] Stopping removed flow: ${name}`); - yield* closeFlowEffect(name, activeFlow); - flows.delete(name); - } + for (const [name, activeFlow] of flows) { + if (!(name in flowDefinitions)) { + yield* Effect.log(`[${options.id}] Stopping removed flow: ${name}`); + yield* closeFlowEffect(name, activeFlow); + flows.delete(name); + } + } + + for (const [name, defn] of Object.entries(flowDefinitions)) { + const existing = flows.get(name); + if (existing !== undefined) { + yield* Effect.log(`[${options.id}] Restarting flow "${name}" with updated config`); + yield* closeFlowEffect(name, existing); + flows.delete(name); } - for (const [name, defn] of Object.entries(flowDefinitions)) { - const existing = flows.get(name); - if (existing !== undefined) { - yield* Effect.log(`[${options.id}] Restarting flow "${name}" with updated config`); - yield* closeFlowEffect(name, existing); - flows.delete(name); - } + yield* Effect.log(`[${options.id}] Starting flow "${name}"`); + const activeFlow = yield* startFlowEffect(name, defn); + flows.set(name, activeFlow); + yield* Effect.log(`[${options.id}] Flow "${name}" started`); + } + }); - yield* Effect.log(`[${options.id}] Starting flow "${name}"`); - const activeFlow = yield* startFlowEffect(name, defn); - flows.set(name, activeFlow); - yield* Effect.log(`[${options.id}] Flow "${name}" started`); - } + const processNextConfigPushEffect = Effect.fn("FlowProcessor.processNextConfigPush")(function* () { + const consumer = configConsumer; + if (consumer === null) { + yield* Effect.sleep(Duration.millis(1000)); + return; + } + + const msg = yield* Effect.tryPromise({ + try: () => consumer.receive(2000), + catch: (error) => pubSubError("receive:config-push", error), }); + if (msg === null) { + return; + } - const processNextConfigPushEffect = (): Effect.Effect< - void, - never, - | FlowRuntime - | ProducerFactory - | ConsumerFactory - | RequestResponseFactory - | FlowRequirements - | ConfigHandlerRequirements - > => - Effect.gen(function* () { - const consumer = configConsumer; - if (consumer === null) { - yield* Effect.sleep(Duration.millis(1000)); - return; - } + const push = msg.value(); + yield* Effect.log(`[${options.id}] Received config push version=${push.version}`); - const msg = yield* Effect.tryPromise({ - try: () => consumer.receive(2000), - catch: (error) => pubSubError("receive:config-push", error), - }); - if (msg === null) { - return; - } + yield* onConfigureFlowsEffect(push.config, push.version); - const push = msg.value(); - yield* Effect.log(`[${options.id}] Received config push version=${push.version}`); + for (const handler of options.configHandlers ?? []) { + yield* handler(push.config, push.version); + } - yield* onConfigureFlowsEffect(push.config, push.version); + yield* Effect.tryPromise({ + try: () => consumer.acknowledge(msg), + catch: (error) => pubSubError("acknowledge:config-push", error), + }); + }); - for (const handler of options.configHandlers ?? []) { - yield* handler(push.config, push.version); - } - - yield* Effect.tryPromise({ - try: () => consumer.acknowledge(msg), - catch: (error) => pubSubError("acknowledge:config-push", error), - }); - }).pipe( + const processNextConfigPushSafelyEffect = Effect.fn("FlowProcessor.processNextConfigPushSafely")(function* () { + return yield* processNextConfigPushEffect().pipe( Effect.catch((error) => { if (!isRunning()) { return Effect.void; @@ -304,6 +287,7 @@ export function runFlowProcessorDefinitionScoped< ); }), ); + }); return Effect.gen(function* () { const pubsub = yield* PubSub; @@ -325,7 +309,7 @@ export function runFlowProcessorDefinitionScoped< yield* Effect.whileLoop({ while: isRunning, - body: processNextConfigPushEffect, + body: processNextConfigPushSafelyEffect, step: () => undefined, }); }); @@ -368,6 +352,24 @@ export function makeFlowProcessor( return options.provide?.(effect) ?? effect; }; + const startProcessorEffect = Effect.fn("FlowProcessor.start")(function* ( + context: Context.Context>, + ) { + const pubsub = makePubSubService(base.pubsub); + const messagingConfig = yield* loadMessagingRuntimeConfig(); + const start = processor.startEffect.pipe( + Effect.provideService(PubSub, pubsub), + Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsub))), + Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsub, messagingConfig))), + Effect.provideService( + RequestResponseFactory, + RequestResponseFactory.of(makeRequestResponseFactoryService(pubsub, messagingConfig)), + ), + Effect.provideService(FlowRuntime, FlowRuntime.of({ run: runFlowRuntimeScoped })), + ); + return yield* Effect.provide(Effect.scoped(start), context); + }); + processor = { ...base, specifications, @@ -377,25 +379,7 @@ export function makeFlowProcessor( get startEffect() { return makeStartEffect(); }, - start: (context) => - compatibilityRuntime.runPromise(Effect.provide( - Effect.gen(function* () { - const pubsub = makePubSubService(base.pubsub); - const messagingConfig = yield* loadMessagingRuntimeConfig(); - const start = processor.startEffect.pipe( - Effect.provideService(PubSub, pubsub), - Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsub))), - Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsub, messagingConfig))), - Effect.provideService( - RequestResponseFactory, - RequestResponseFactory.of(makeRequestResponseFactoryService(pubsub, messagingConfig)), - ), - Effect.provideService(FlowRuntime, FlowRuntime.of({ run: runFlowRuntimeScoped })), - ); - yield* Effect.scoped(start); - }), - context, - )), + start: (context) => compatibilityRuntime.runPromise(startProcessorEffect(context)), }; return processor; diff --git a/ts/packages/base/src/processor/flow.ts b/ts/packages/base/src/processor/flow.ts index b0152b67..c93a846b 100644 --- a/ts/packages/base/src/processor/flow.ts +++ b/ts/packages/base/src/processor/flow.ts @@ -4,7 +4,7 @@ * Python reference: trustgraph-base/trustgraph/base/flow.py */ -import { Context, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect"; +import { Config as EffectConfig, Context, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; import type { PubSubBackend } from "../backend/types.js"; @@ -66,13 +66,68 @@ export interface FlowRequestor { type FlowParameterError = FlowResourceNotFoundError | FlowParameterDecodeError; +export interface Flow { + readonly name: string; + readonly processorId: string; + startEffect: Effect.Effect; + start: (context: Context.Context) => Promise; + stop: () => Promise; + stopEffect: Effect.Effect; + runInCompatibilityScopeEffect: ( + effect: Effect.Effect, + runtimePubsub: PubSubBackend, + context: Context.Context, + ) => Effect.Effect; + runInCompatibilityScope: ( + effect: Effect.Effect, + runtimePubsub: PubSubBackend, + context: Context.Context, + ) => Promise; + clearResources: () => void; + registerProducer: (registerName: string, producer: EffectProducer) => void; + registerConsumer: (registerName: string, consumer: EffectConsumer) => void; + registerRequestor: ( + registerName: string, + rr: EffectRequestResponse, + ) => void; + setParameter: (parameterName: string, value: unknown) => void; + producerEffect: { + (producerSpec: ProducerSpec): Effect.Effect, FlowResourceNotFoundError>; + (producerName: string): Effect.Effect, FlowResourceNotFoundError>; + }; + consumerEffect: (consumerName: string) => Effect.Effect; + requestorEffect: { + ( + requestorSpec: RequestResponseSpec, + ): Effect.Effect, FlowResourceNotFoundError>; + (requestorName: string): Effect.Effect, FlowResourceNotFoundError>; + }; + parameterEffect: { + (parameterSpec: ParameterSpec): Effect.Effect; + (parameterName: string): Effect.Effect; + }; + producer: { + (producerSpec: ProducerSpec): FlowProducer; + (producerName: string): FlowProducer; + }; + consumer: (consumerName: string) => FlowConsumer; + requestor: { + (requestorSpec: RequestResponseSpec): FlowRequestor; + (requestorName: string): FlowRequestor; + }; + parameter: { + (parameterSpec: ParameterSpec): T; + (parameterName: string): unknown; + }; +} + export function makeFlow( name: string, processorId: string, pubsub: PubSubBackend, definition: FlowDefinition, specifications: ReadonlyArray>, -) { +): Flow { const producers = new Map>(); const consumers = new Map(); const requestors = new Map>(); @@ -275,62 +330,56 @@ export function makeFlow( return toFlowRequestor(compatibilityRuntime.runSync(requestor.requestorEffect(flow))); } - const flow = { + const flow: Flow = { name, processorId, - startEffect(): Effect.Effect { - return Effect.gen(function* () { - for (const spec of specifications) { - yield* spec.addEffect(flow, definition); - } - }); - }, + startEffect: Effect.gen(function* () { + for (const spec of specifications) { + yield* spec.addEffect(flow, definition); + } + }).pipe(Effect.withSpan("Flow.startEffect")), start(context: Context.Context): Promise { return compatibilityRuntime.runPromise( Effect.gen(function* () { if (compatibilityScope !== null) { - yield* flow.stopEffect(); + yield* flow.stopEffect; } - yield* flow.runInCompatibilityScopeEffect(flow.startEffect(), pubsub, context); + yield* flow.runInCompatibilityScopeEffect(flow.startEffect, pubsub, context); }), ); }, stop(): Promise { - return compatibilityRuntime.runPromise(flow.stopEffect()); + return compatibilityRuntime.runPromise(flow.stopEffect); }, - stopEffect(): Effect.Effect { - return Effect.gen(function* () { - const scope = compatibilityScope; - compatibilityScope = null; - if (scope !== null) { - yield* Scope.close(scope, Exit.void); - } - flow.clearResources(); - }); - }, - runInCompatibilityScopeEffect( + stopEffect: Effect.gen(function* () { + const scope = compatibilityScope; + compatibilityScope = null; + if (scope !== null) { + yield* Scope.close(scope, Exit.void); + } + flow.clearResources(); + }).pipe(Effect.withSpan("Flow.stopEffect")), + runInCompatibilityScopeEffect: Effect.fn("Flow.runInCompatibilityScopeEffect")(function* ( effect: Effect.Effect, runtimePubsub: PubSubBackend, context: Context.Context, ) { - return Effect.gen(function* () { - const scope = yield* ensureCompatibilityScopeEffect(); - const pubsubService = makePubSubService(runtimePubsub); - const messagingConfig = yield* loadMessagingRuntimeConfig(); - return yield* Effect.provide( - effect.pipe( - Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsubService))), - Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsubService, messagingConfig))), - Effect.provideService( - RequestResponseFactory, - RequestResponseFactory.of(makeRequestResponseFactoryService(pubsubService, messagingConfig)), - ), - Scope.provide(scope), + const scope = yield* ensureCompatibilityScopeEffect(); + const pubsubService = makePubSubService(runtimePubsub); + const messagingConfig = yield* loadMessagingRuntimeConfig(); + return yield* Effect.provide( + effect.pipe( + Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsubService))), + Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsubService, messagingConfig))), + Effect.provideService( + RequestResponseFactory, + RequestResponseFactory.of(makeRequestResponseFactoryService(pubsubService, messagingConfig)), ), - context, - ); - }); - }, + Scope.provide(scope), + ), + context, + ); + }), runInCompatibilityScope( effect: Effect.Effect, runtimePubsub: PubSubBackend, @@ -380,8 +429,6 @@ export function makeFlow( return flow; } -export type Flow = ReturnType>; - export const Flow = makeFlow as unknown as { new ( name: string, diff --git a/ts/packages/base/src/spec/parameter-spec.ts b/ts/packages/base/src/spec/parameter-spec.ts index 5d8ed3b5..9d2106f0 100644 --- a/ts/packages/base/src/spec/parameter-spec.ts +++ b/ts/packages/base/src/spec/parameter-spec.ts @@ -7,16 +7,28 @@ import { Effect, type Context } from "effect"; import * as S from "effect/Schema"; import type { PubSubBackend } from "../backend/types.js"; -import type { Spec } from "./types.js"; +import type { SpecRuntimeRequirements } from "./types.js"; import type { Flow, FlowDefinition } from "../processor/flow.js"; +import type { PubSubError } from "../errors.js"; declare const ParameterSpecType: unique symbol; const UnknownParameterSchema: S.Codec = S.Unknown; -export interface ParameterSpec extends Spec { +export interface ParameterSpec { readonly [ParameterSpecType]?: (_: T) => T; + readonly name: string; readonly schema: S.Codec; + readonly addEffect: ( + flow: Flow, + definition: FlowDefinition, + ) => Effect.Effect; + readonly add: ( + flow: Flow, + pubsub: PubSubBackend, + definition: FlowDefinition, + context: Context.Context, + ) => Promise; } export function makeParameterSpec(name: string): ParameterSpec; @@ -29,7 +41,7 @@ export function makeParameterSpec( schema?: S.Codec, ) { const parameterSchema = schema ?? UnknownParameterSchema; - const addEffect = (flow: Flow, definition: FlowDefinition) => + const addEffect = (flow: Flow, definition: FlowDefinition) => Effect.sync(() => { const value = definition.parameters?.[name]; flow.setParameter(name, value); @@ -39,11 +51,11 @@ export function makeParameterSpec( name, schema: parameterSchema, addEffect, - add: ( - flow: Flow, + add: ( + flow: Flow, pubsub: PubSubBackend, definition: FlowDefinition, - context: Context.Context, + context: Context.Context, ) => flow.runInCompatibilityScope(addEffect(flow, definition), pubsub, context), }; diff --git a/ts/packages/base/src/spec/producer-spec.ts b/ts/packages/base/src/spec/producer-spec.ts index f73dfcaa..218060f6 100644 --- a/ts/packages/base/src/spec/producer-spec.ts +++ b/ts/packages/base/src/spec/producer-spec.ts @@ -4,12 +4,14 @@ * Python reference: trustgraph-base/trustgraph/base/producer_spec.py */ -import { Effect } from "effect"; -import type { Spec } from "./types.js"; +import { Effect, type Context } from "effect"; +import type { SpecRuntimeRequirements } from "./types.js"; import type { Flow, FlowDefinition } from "../processor/flow.js"; +import type { PubSubBackend } from "../backend/types.js"; import { flowResourceNotFoundError, type FlowResourceNotFoundError, + type PubSubError, } from "../errors.js"; import { type EffectProducer, @@ -18,8 +20,19 @@ import { declare const ProducerSpecType: unique symbol; -export interface ProducerSpec extends Spec { +export interface ProducerSpec { readonly [ProducerSpecType]?: (_: T) => T; + readonly name: string; + readonly addEffect: ( + flow: Flow, + definition: FlowDefinition, + ) => Effect.Effect; + readonly add: ( + flow: Flow, + pubsub: PubSubBackend, + definition: FlowDefinition, + context: Context.Context, + ) => Promise; readonly producerEffect: ( flow: Flow, ) => Effect.Effect, FlowResourceNotFoundError>; @@ -55,8 +68,8 @@ export function makeProducerSpec(name: string): ProducerSpec { : Effect.succeed(producer); }; - const addEffect = Effect.fn("ProducerSpec.addEffect")(function* ( - flow: Flow, + const addEffect = Effect.fn("ProducerSpec.addEffect")(function* ( + flow: Flow, definition: FlowDefinition, ) { const topic = definition.topics?.[name] ?? name; diff --git a/ts/packages/base/src/spec/request-response-spec.ts b/ts/packages/base/src/spec/request-response-spec.ts index 00d41e8a..7ac34914 100644 --- a/ts/packages/base/src/spec/request-response-spec.ts +++ b/ts/packages/base/src/spec/request-response-spec.ts @@ -7,12 +7,14 @@ * Python reference: trustgraph-base/trustgraph/base/prompt_client_spec.py */ -import { Effect } from "effect"; -import type { Spec } from "./types.js"; +import { Effect, type Context } from "effect"; +import type { SpecRuntimeRequirements } from "./types.js"; import type { Flow, FlowDefinition } from "../processor/flow.js"; +import type { PubSubBackend } from "../backend/types.js"; import { flowResourceNotFoundError, type FlowResourceNotFoundError, + type PubSubError, } from "../errors.js"; import { type EffectRequestResponse, @@ -21,11 +23,22 @@ import { declare const RequestResponseSpecType: unique symbol; -export interface RequestResponseSpec extends Spec { +export interface RequestResponseSpec { readonly [RequestResponseSpecType]?: { readonly request: TReq; readonly response: TRes; }; + readonly name: string; + readonly addEffect: ( + flow: Flow, + definition: FlowDefinition, + ) => Effect.Effect; + readonly add: ( + flow: Flow, + pubsub: PubSubBackend, + definition: FlowDefinition, + context: Context.Context, + ) => Promise; readonly requestorEffect: ( flow: Flow, ) => Effect.Effect, FlowResourceNotFoundError>; @@ -65,8 +78,8 @@ export function makeRequestResponseSpec( : Effect.succeed(requestor); }; - const addEffect = Effect.fn("RequestResponseSpec.addEffect")(function* ( - flow: Flow, + const addEffect = Effect.fn("RequestResponseSpec.addEffect")(function* ( + flow: Flow, definition: FlowDefinition, ) { const requestTopic = definition.topics?.[requestTopicName] ?? requestTopicName;