diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 1f8e1276..befb2947 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,14 +12,14 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 Text completion -provider status narrowing slice: +Current signal counts from `ts/packages` after the 2026-06-02 Base parameter +spec accessor slice: | Signal | Count | | --- | ---: | | `Effect.runPromise` | 168 | -| `Map<` | 88 | -| `WebSocket` | 72 | +| `Map<` | 82 | +| `WebSocket` | 62 | | `new Map` | 62 | | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | @@ -78,6 +78,11 @@ Notes: `model/text-completion/common.ts`. - The text completion provider status slice replaced manual status/statusCode record assertions with `effect/Predicate` narrowing. +- The base parameter spec accessor slice added Schema-backed + `ParameterSpec` values plus `flow.parameterEffect(spec)` and + `flow.parameter(spec)`. Bare string parameter lookup remains available as an + `unknown` compatibility escape, while typed parameter access now decodes + through Schema and fails with a tagged `FlowParameterDecodeError`. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -609,6 +614,39 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Base Parameter Spec Accessor Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/base/src/spec/parameter-spec.ts` now models + `ParameterSpec` with an Effect Schema codec. Legacy parameter specs + default to `S.Unknown`, preserving name-based registration while making + typed access schema-backed. + - `ts/packages/base/src/processor/flow.ts` now exposes + `flow.parameterEffect(spec)` and `flow.parameter(spec)` for inferred, + Schema-decoded parameter values. String lookup remains available as an + `unknown` compatibility escape instead of a caller-chosen generic type. + - Parameter schema failures now fail with the tagged + `FlowParameterDecodeError` rather than a normal `Error`. + - `ts/packages/flow/src/chunking/service.ts` now declares numeric chunk + parameters once and retrieves them through the typed spec-object accessor. + - `ts/packages/base/src/__tests__/flow-spec-runtime.test.ts` covers typed + parameter decoding, legacy string lookup, missing parameter errors, sync + accessor decoding, and schema mismatch errors. +- Remaining: + - Add typed spec-object accessors for producers and requestors so call sites + can stop spelling generic string lookups for those registries too. +- Verification: + - `bun run --cwd ts/packages/base test -- src/__tests__/flow-spec-runtime.test.ts` + - `bun run --cwd ts/packages/base build` + - `bun run --cwd ts/packages/flow build` + - `cd ts && bun run check` + - `bun run --cwd ts/packages/base test` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -635,10 +673,10 @@ Notes: layers. - Existing constructor shims preserve callable-plus-newable public exports; removing them needs a public API split or real class redesign. - - Typed string registries in `Flow` need schema-backed parameters and - typed-spec/key accessors. Effect `HashMap`/`MutableHashMap` can improve - lookup ergonomics with `Option`, but it does not remove the string-key - type hole by itself. + - Typed string registries in `Flow` now have Schema-backed parameter specs. + Producer and requestor typed spec-object accessors remain. Effect + `HashMap`/`MutableHashMap` can improve lookup ergonomics with `Option`, but + it does not remove the string-key type hole by itself. - Gateway/client: - `EffectRpcClient` now owns its socket/RPC layer with `ManagedRuntime`. Socket errors/JSON parsing now use tagged errors and Schema decoding. @@ -657,20 +695,18 @@ Notes: ## Ranked Findings -### P1: Base Typed Spec Accessors +### P1: Base Typed Producer And Requestor Spec Accessors - TrustGraph evidence: - `ts/packages/base/src/processor/flow.ts` - - `ts/packages/base/src/spec/parameter-spec.ts` - `ts/packages/base/src/spec/producer-spec.ts` - `ts/packages/base/src/spec/request-response-spec.ts` - Effect primitives: - - Schema-backed registries, `Context`, `Layer`, `Effect.fn`, `Option`, + - Typed spec-object registries, `Context`, `Layer`, `Effect.fn`, `Option`, `Predicate`, `HashMap`/`MutableHashMap`. - Rewrite shape: - - Add schema-backed generic parameter specs and spec-object accessors such as - `flow.parameterEffect(spec)`, then keep string accessors as compatibility - escapes. + - Parameter specs are now Schema-backed and support + `flow.parameterEffect(spec)` / `flow.parameter(spec)`. - Add typed spec-object accessors for producers and requestors so call sites stop spelling generic string lookups. - Do not add assertions to quiet Effect channel inference problems. @@ -729,7 +765,7 @@ Notes: ## Recommended PR Order -1. Base typed spec accessors. +1. Complete base typed producer/requestor spec accessors. 2. Gateway RPC callback and client streaming completion cleanup. 3. Storage/provider managed resource cleanup. 4. MCP parity/deletion decision and workbench platform polish. diff --git a/ts/packages/base/src/__tests__/flow-spec-runtime.test.ts b/ts/packages/base/src/__tests__/flow-spec-runtime.test.ts index abe3723f..ef3c64c4 100644 --- a/ts/packages/base/src/__tests__/flow-spec-runtime.test.ts +++ b/ts/packages/base/src/__tests__/flow-spec-runtime.test.ts @@ -1,5 +1,6 @@ import { describe, expect, it } from "@effect/vitest"; import { ConfigProvider, Duration, Effect, Fiber } from "effect"; +import * as S from "effect/Schema"; import * as TestClock from "effect/testing/TestClock"; import { makeConsumerSpec, @@ -266,12 +267,14 @@ describe("Effect-native flow specifications", () => { "returns typed errors for missing flow resources", Effect.fnUntraced(function* () { const backend = new RuntimeBackend(new ScriptedConsumer()); + const presentParameter = makeParameterSpec("present", S.Number); + const invalidParameter = makeParameterSpec("present", S.String); const flow = new Flow( "default", "processor", backend, { parameters: { present: 42 } }, - [makeParameterSpec("present")], + [presentParameter], ); const errors = yield* Effect.scoped( @@ -280,19 +283,27 @@ describe("Effect-native flow specifications", () => { Effect.gen(function* () { yield* flow.startEffect(); const producerError = yield* flow.producerEffect("missing-producer").pipe(Effect.flip); - const parameter = yield* flow.parameterEffect("present"); - const parameterError = yield* flow.parameterEffect("missing-parameter").pipe(Effect.flip); - return { producerError, parameter, parameterError }; + const parameter = yield* flow.parameterEffect(presentParameter); + const legacyParameter = yield* flow.parameterEffect("present"); + const parameterError = yield* flow.parameterEffect("missing-parameter").pipe(Effect.flip); + const invalidParameterError = yield* flow.parameterEffect(invalidParameter).pipe(Effect.flip); + return { producerError, parameter, legacyParameter, parameterError, invalidParameterError }; }), ), ); expect(errors.parameter).toBe(42); + expect(errors.legacyParameter).toBe(42); expect(errors.producerError._tag).toBe("FlowResourceNotFoundError"); expect(errors.producerError.resourceType).toBe("producer"); expect(errors.producerError.resourceName).toBe("missing-producer"); expect(errors.parameterError._tag).toBe("FlowResourceNotFoundError"); expect(errors.parameterError.resourceType).toBe("parameter"); + expect(errors.invalidParameterError._tag).toBe("FlowParameterDecodeError"); + expect(errors.invalidParameterError.parameterName).toBe("present"); + expect(flow.parameter(presentParameter)).toBe(42); + expect(flow.parameter("present")).toBe(42); + expect(() => flow.parameter(invalidParameter)).toThrow("failed schema decoding"); expect(() => flow.producer("missing-producer")).toThrow("not found"); }), ); diff --git a/ts/packages/base/src/errors.ts b/ts/packages/base/src/errors.ts index 5582fc8c..73ce5edb 100644 --- a/ts/packages/base/src/errors.ts +++ b/ts/packages/base/src/errors.ts @@ -140,6 +140,15 @@ export class FlowResourceNotFoundError extends S.TaggedErrorClass()( + "FlowParameterDecodeError", + { + message: S.String, + flowName: S.String, + parameterName: S.String, + }, +) {} + export type TrustGraphError = | TooManyRequestsError | LlmError @@ -155,6 +164,7 @@ export type TrustGraphError = | MessagingTimeoutError | MessagingHandlerError | FlowRuntimeError + | FlowParameterDecodeError | FlowResourceNotFoundError; export type MessagingRuntimeError = @@ -165,6 +175,7 @@ export type MessagingRuntimeError = | MessagingTimeoutError | MessagingHandlerError | FlowRuntimeError + | FlowParameterDecodeError | FlowResourceNotFoundError; export function tooManyRequestsError(message = "Rate limit exceeded"): TooManyRequestsError { @@ -291,6 +302,18 @@ export function flowResourceNotFoundError( }); } +export function flowParameterDecodeError( + flowName: string, + parameterName: string, + error: unknown, +): FlowParameterDecodeError { + return FlowParameterDecodeError.make({ + flowName, + parameterName, + message: `parameter "${parameterName}" in flow "${flowName}" failed schema decoding: ${errorMessage(error)}`, + }); +} + export function errorMessage(error: unknown): string { if (typeof error === "object" && error !== null && "message" in error) { const message = (error as { message?: unknown }).message; diff --git a/ts/packages/base/src/processor/flow.ts b/ts/packages/base/src/processor/flow.ts index 87fc7d75..946f6a13 100644 --- a/ts/packages/base/src/processor/flow.ts +++ b/ts/packages/base/src/processor/flow.ts @@ -5,10 +5,14 @@ */ import { Context, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect"; +import * as O from "effect/Option"; +import * as S from "effect/Schema"; import type { PubSubBackend } from "../backend/types.js"; import { makePubSubService } from "../backend/pubsub.js"; import { + flowParameterDecodeError, flowResourceNotFoundError, + type FlowParameterDecodeError, type FlowResourceNotFoundError, type PubSubError, } from "../errors.js"; @@ -25,6 +29,7 @@ import { makeRequestResponseFactoryService, } from "../messaging/runtime.js"; import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js"; +import type { ParameterSpec } from "../spec/parameter-spec.js"; import type { Spec, SpecRuntimeRequirements } from "../spec/types.js"; export interface FlowDefinition { @@ -57,6 +62,8 @@ export interface FlowRequestor { readonly stop: () => Promise; } +type FlowParameterError = FlowResourceNotFoundError | FlowParameterDecodeError; + export function makeFlow( name: string, processorId: string, @@ -97,6 +104,60 @@ export function makeFlow( }; }; + const getParameterEffect = (parameterName: string): Effect.Effect => { + const value = parameters.get(parameterName); + return value === undefined + ? Effect.fail(flowResourceNotFoundError(name, "parameter", parameterName)) + : Effect.succeed(value); + }; + + const getParameter = (parameterName: string): unknown => { + const value = parameters.get(parameterName); + if (value === undefined) throw flowResourceNotFoundError(name, "parameter", parameterName); + return value; + }; + + const decodeParameterEffect = ( + spec: ParameterSpec, + value: unknown, + ): Effect.Effect => + S.decodeUnknownEffect(spec.schema)(value).pipe( + Effect.mapError((error) => flowParameterDecodeError(name, spec.name, error)), + ); + + const decodeParameter = (spec: ParameterSpec, value: unknown): T => { + const decoded = S.decodeUnknownOption(spec.schema)(value); + if (O.isSome(decoded)) return decoded.value; + throw flowParameterDecodeError(name, spec.name, "Parameter value does not match schema"); + }; + + function parameterEffect( + parameterSpec: ParameterSpec, + ): Effect.Effect; + function parameterEffect( + parameterName: string, + ): Effect.Effect; + function parameterEffect( + parameter: string | ParameterSpec, + ): Effect.Effect { + if (typeof parameter === "string") { + return getParameterEffect(parameter); + } + return getParameterEffect(parameter.name).pipe( + Effect.flatMap((value) => decodeParameterEffect(parameter, value)), + ); + } + + function parameter(parameterSpec: ParameterSpec): T; + function parameter(parameterName: string): unknown; + function parameter(parameter: string | ParameterSpec): unknown { + const value = getParameter(typeof parameter === "string" ? parameter : parameter.name); + if (typeof parameter === "string") { + return value; + } + return decodeParameter(parameter, value); + } + const flow = { name, processorId, @@ -198,12 +259,7 @@ export function makeFlow( ? Effect.fail(flowResourceNotFoundError(name, "requestor", requestorName)) : Effect.succeed(rr as EffectRequestResponse); }, - parameterEffect(parameterName: string): Effect.Effect { - const v = parameters.get(parameterName); - return v === undefined - ? Effect.fail(flowResourceNotFoundError(name, "parameter", parameterName)) - : Effect.succeed(v as T); - }, + parameterEffect, producer(producerName: string): FlowProducer { const p = producers.get(producerName); if (p === undefined) throw flowResourceNotFoundError(name, "producer", producerName); @@ -234,11 +290,7 @@ export function makeFlow( stop: () => compatibilityRuntime.runPromise(rr.stop), }; }, - parameter(parameterName: string): T { - const v = parameters.get(parameterName); - if (v === undefined) throw flowResourceNotFoundError(name, "parameter", parameterName); - return v as T; - }, + parameter, }; return flow; diff --git a/ts/packages/base/src/spec/parameter-spec.ts b/ts/packages/base/src/spec/parameter-spec.ts index 2c933158..5d8ed3b5 100644 --- a/ts/packages/base/src/spec/parameter-spec.ts +++ b/ts/packages/base/src/spec/parameter-spec.ts @@ -4,13 +4,31 @@ * Python reference: trustgraph-base/trustgraph/base/parameter_spec.py */ -import { Effect } from "effect"; +import { Effect, type Context } from "effect"; +import * as S from "effect/Schema"; +import type { PubSubBackend } from "../backend/types.js"; import type { Spec } from "./types.js"; import type { Flow, FlowDefinition } from "../processor/flow.js"; -export interface ParameterSpec extends Spec {} +declare const ParameterSpecType: unique symbol; -export function makeParameterSpec(name: string): ParameterSpec { +const UnknownParameterSchema: S.Codec = S.Unknown; + +export interface ParameterSpec extends Spec { + readonly [ParameterSpecType]?: (_: T) => T; + readonly schema: S.Codec; +} + +export function makeParameterSpec(name: string): ParameterSpec; +export function makeParameterSpec( + name: string, + schema: S.Codec, +): ParameterSpec; +export function makeParameterSpec( + name: string, + schema?: S.Codec, +) { + const parameterSchema = schema ?? UnknownParameterSchema; const addEffect = (flow: Flow, definition: FlowDefinition) => Effect.sync(() => { const value = definition.parameters?.[name]; @@ -19,8 +37,14 @@ export function makeParameterSpec(name: string): ParameterSpec { return { name, + schema: parameterSchema, addEffect, - add: (flow, pubsub, definition, context) => + add: ( + flow: Flow, + pubsub: PubSubBackend, + definition: FlowDefinition, + context: Context.Context, + ) => flow.runInCompatibilityScope(addEffect(flow, definition), pubsub, context), }; } diff --git a/ts/packages/flow/src/chunking/service.ts b/ts/packages/flow/src/chunking/service.ts index d00b8407..3835b126 100644 --- a/ts/packages/flow/src/chunking/service.ts +++ b/ts/packages/flow/src/chunking/service.ts @@ -27,10 +27,13 @@ import { import { NodeRuntime } from "@effect/platform-node"; import { makeFlowProcessorProgram } from "@trustgraph/base"; import { Effect, Layer, ManagedRuntime } from "effect"; +import * as S from "effect/Schema"; import { recursiveSplit } from "./recursive-splitter.js"; const DEFAULT_CHUNK_SIZE = 2000; const DEFAULT_CHUNK_OVERLAP = 100; +const ChunkSizeParameter = makeParameterSpec("chunk-size", S.Number); +const ChunkOverlapParameter = makeParameterSpec("chunk-overlap", S.Number); const onChunkMessage = Effect.fn("ChunkingService.onMessage")(function* ( msg: TextDocument, @@ -40,10 +43,10 @@ const onChunkMessage = Effect.fn("ChunkingService.onMessage")(function* ( const requestId = properties.id; if (requestId === undefined || requestId.length === 0) return; - const chunkSize = yield* flowCtx.flow.parameterEffect("chunk-size").pipe( + const chunkSize = yield* flowCtx.flow.parameterEffect(ChunkSizeParameter).pipe( Effect.orElseSucceed(() => DEFAULT_CHUNK_SIZE), ); - const chunkOverlap = yield* flowCtx.flow.parameterEffect("chunk-overlap").pipe( + const chunkOverlap = yield* flowCtx.flow.parameterEffect(ChunkOverlapParameter).pipe( Effect.orElseSucceed(() => DEFAULT_CHUNK_OVERLAP), ); @@ -82,8 +85,8 @@ export const makeChunkingSpecs = (): ReadonlyArray< ), makeProducerSpec("chunk-output"), makeProducerSpec("chunk-triples"), - makeParameterSpec("chunk-size"), - makeParameterSpec("chunk-overlap"), + ChunkSizeParameter, + ChunkOverlapParameter, ]; export type ChunkingService = FlowProcessorRuntime;