diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 8bff92ac..de913e83 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -1546,6 +1546,34 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-02: Messaging Runtime Duration Config Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/base/src/runtime/messaging-config.ts` now stores internal + runtime timing fields as `Duration.Duration` instead of number-shaped + millisecond fields. + - `loadMessagingRuntimeConfig()` reads `TG_*` timing env vars with + `Config.duration` while preserving legacy bare-number millisecond env + values through a `Config.number(...).map(Duration.millis)` fallback. + - `ts/packages/base/src/messaging/runtime.ts` now uses `Duration` values + directly for sleeps, retry schedules, and timeout options. It converts back + to milliseconds only at the broker `receive(timeoutMs)` boundary and for + existing timeout error payloads. + - Public compatibility options such as `receiveTimeoutMs`, + `rateLimitRetryMs`, `rateLimitTimeoutMs`, and request `timeoutMs` remain + numeric millisecond inputs. + - Tests now cover both legacy numeric env values and native Effect duration + strings. +- Verification: + - `cd ts && bun run check:tsgo` + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/schema-effect.test.ts src/__tests__/messaging-runtime.test.ts src/__tests__/consumer.test.ts` + - `cd ts/packages/base && bun run build` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -1658,10 +1686,10 @@ Notes: gateway translation, and pure term helper switches. Future work should only reopen this if client socket schema drift appears or a hidden consumer needs a different named-graph shape. - - Messaging runtime `Config.duration` is the next strongest scratch target: - internal runtime config can use `Duration.Duration` while public - `timeoutMs` compatibility surfaces stay numeric. - - Qdrant graph/doc known-collection caches are a good small + - Messaging runtime `Config.duration` cleanup is complete. Internal runtime + config uses `Duration.Duration`; public timeout compatibility inputs and + broker receive/error payload boundaries remain numeric milliseconds. + - Qdrant graph/doc known-collection caches are the next good small `MutableHashSet` candidate; short-lived local traversal sets remain no-ops. - FlowManager and sibling service `() => Effect.gen(...)` factories remain a @@ -1776,7 +1804,7 @@ Notes: triples, compact graph strings, malformed known-tag payloads, and malformed client triple failures. -### P1: Messaging Runtime Duration Config Cleanup +### Complete: Messaging Runtime Duration Config Cleanup - TrustGraph evidence: - `ts/packages/base/src/runtime/messaging-config.ts` @@ -1794,8 +1822,9 @@ Notes: processor, and client boundaries unless their public API is deliberately changed. - Tests: - - Extend base runtime config tests for env duration parsing and verify - messaging retry/timeout behavior still uses the same effective durations. + - Base runtime config tests cover legacy millisecond env values and Effect + duration string env values. + - Messaging runtime and consumer tests preserve retry and timeout behavior. ### P2: Qdrant Known-Collection MutableHashSet Cleanup @@ -1848,10 +1877,10 @@ Notes: ## Recommended PR Order -1. Messaging runtime `Config.duration` / `Duration` cleanup. -2. Qdrant known-collection `MutableHashSet` cleanup. -3. MCP protocol parity tests and legacy stdio flip/removal decision. -4. FlowManager/service `Effect.fn` normalization. +1. Qdrant known-collection `MutableHashSet` cleanup. +2. MCP protocol parity tests and legacy stdio flip/removal decision. +3. FlowManager/service `Effect.fn` normalization. +4. Flow/client RPC stream and remaining service operation `Match` follow-ups. ## No-Op Rules diff --git a/ts/packages/base/src/__tests__/messaging-runtime.test.ts b/ts/packages/base/src/__tests__/messaging-runtime.test.ts index d9f4c3b8..ca85b0b1 100644 --- a/ts/packages/base/src/__tests__/messaging-runtime.test.ts +++ b/ts/packages/base/src/__tests__/messaging-runtime.test.ts @@ -338,7 +338,7 @@ describe("Effect-native messaging runtime", () => { PubSub.fromBackend(backend), { ...defaultMessagingRuntimeConfig, - consumerReceiveTimeoutMs: 1, + consumerReceiveTimeout: Duration.millis(1), }, { requestTopic: "tg.test.request", @@ -378,7 +378,7 @@ describe("Effect-native messaging runtime", () => { PubSub.fromBackend(backend), { ...defaultMessagingRuntimeConfig, - consumerReceiveTimeoutMs: 1, + consumerReceiveTimeout: Duration.millis(1), }, { requestTopic: "tg.test.request", @@ -417,7 +417,7 @@ describe("Effect-native messaging runtime", () => { PubSub.fromBackend(backend), { ...defaultMessagingRuntimeConfig, - consumerReceiveTimeoutMs: 1, + consumerReceiveTimeout: Duration.millis(1), }, { requestTopic: "tg.test.request", @@ -452,7 +452,7 @@ describe("Effect-native messaging runtime", () => { PubSub.fromBackend(backend), { ...defaultMessagingRuntimeConfig, - consumerReceiveTimeoutMs: 1, + consumerReceiveTimeout: Duration.millis(1), }, { requestTopic: "tg.test.request", diff --git a/ts/packages/base/src/__tests__/schema-effect.test.ts b/ts/packages/base/src/__tests__/schema-effect.test.ts index ceabcedd..b487f85d 100644 --- a/ts/packages/base/src/__tests__/schema-effect.test.ts +++ b/ts/packages/base/src/__tests__/schema-effect.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from "@effect/vitest"; -import { ConfigProvider, Effect } from "effect"; +import { ConfigProvider, Duration, Effect } from "effect"; import * as S from "effect/Schema"; import { ConfigRequest, @@ -7,6 +7,7 @@ import { Term, TextCompletionRequest, Triple, + loadMessagingRuntimeConfig, loadProcessorRuntimeConfig, } from "../index.js"; @@ -107,4 +108,56 @@ describe("Effect runtime config", () => { }); }), ); + + it.effect( + "loads messaging durations from legacy millisecond env values", + Effect.fnUntraced(function* () { + const provider = ConfigProvider.fromEnv({ + env: { + TG_CONSUMER_RECEIVE_TIMEOUT_MS: "5", + TG_CONSUMER_ERROR_BACKOFF_MS: "10", + TG_RATE_LIMIT_RETRY_MS: "15", + TG_RATE_LIMIT_TIMEOUT_MS: "20", + TG_REQUEST_TIMEOUT_MS: "25", + }, + }); + + const config = yield* Effect.provide( + loadMessagingRuntimeConfig(), + ConfigProvider.layer(provider), + ); + + expect(Duration.toMillis(config.consumerReceiveTimeout)).toBe(5); + expect(Duration.toMillis(config.consumerErrorBackoff)).toBe(10); + expect(Duration.toMillis(config.rateLimitRetry)).toBe(15); + expect(Duration.toMillis(config.rateLimitTimeout)).toBe(20); + expect(Duration.toMillis(config.requestTimeout)).toBe(25); + }), + ); + + it.effect( + "loads messaging durations from Effect duration env values", + Effect.fnUntraced(function* () { + const provider = ConfigProvider.fromEnv({ + env: { + TG_CONSUMER_RECEIVE_TIMEOUT_MS: "2 seconds", + TG_CONSUMER_ERROR_BACKOFF_MS: "3 seconds", + TG_RATE_LIMIT_RETRY_MS: "4 seconds", + TG_RATE_LIMIT_TIMEOUT_MS: "5 seconds", + TG_REQUEST_TIMEOUT_MS: "6 seconds", + }, + }); + + const config = yield* Effect.provide( + loadMessagingRuntimeConfig(), + ConfigProvider.layer(provider), + ); + + expect(Duration.toMillis(config.consumerReceiveTimeout)).toBe(2_000); + expect(Duration.toMillis(config.consumerErrorBackoff)).toBe(3_000); + expect(Duration.toMillis(config.rateLimitRetry)).toBe(4_000); + expect(Duration.toMillis(config.rateLimitTimeout)).toBe(5_000); + expect(Duration.toMillis(config.requestTimeout)).toBe(6_000); + }), + ); }); diff --git a/ts/packages/base/src/messaging/runtime.ts b/ts/packages/base/src/messaging/runtime.ts index 82dcd75f..b802f852 100644 --- a/ts/packages/base/src/messaging/runtime.ts +++ b/ts/packages/base/src/messaging/runtime.ts @@ -52,6 +52,12 @@ import { const isTooManyRequestsError = S.is(TooManyRequestsError); +const durationFromMsOption = ( + value: number | undefined, + fallback: Duration.Duration, +): Duration.Duration => + value === undefined ? fallback : Duration.millis(value); + export type EffectMessageHandler = ( message: T, properties: Record, @@ -254,8 +260,10 @@ const handleMessageWithRetry = Effect.fn("handleMessageWithRetry")(function* , config: MessagingRuntimeConfig, ) { - const rateLimitRetryMs = options.rateLimitRetryMs ?? config.rateLimitRetryMs; - const rateLimitTimeoutMs = options.rateLimitTimeoutMs ?? config.rateLimitTimeoutMs; + const rateLimitRetry = durationFromMsOption(options.rateLimitRetryMs, config.rateLimitRetry); + const rateLimitTimeout = durationFromMsOption(options.rateLimitTimeoutMs, config.rateLimitTimeout); + const rateLimitRetryMs = Duration.toMillis(rateLimitRetry); + const rateLimitTimeoutMs = Duration.toMillis(rateLimitTimeout); const runHandler = (): Effect.Effect => options.handler(message.value(), message.properties(), flow).pipe( Effect.mapError((error): TooManyRequestsError | MessagingHandlerError => @@ -276,11 +284,11 @@ const handleMessageWithRetry = Effect.fn("handleMessageWithRetry")(function* Effect.fail(messagingTimeoutError("rate-limit", rateLimitTimeoutMs)), }), Effect.mapError((error) => @@ -325,14 +333,18 @@ const consumerLoop = ( options: EffectConsumerOptions, flow: FlowContext, config: MessagingRuntimeConfig, -): Effect.Effect => - Effect.whileLoop({ +): Effect.Effect => { + const receiveTimeout = durationFromMsOption(options.receiveTimeoutMs, config.consumerReceiveTimeout); + const receiveTimeoutMs = Duration.toMillis(receiveTimeout); + const errorBackoff = durationFromMsOption(options.errorBackoffMs, config.consumerErrorBackoff); + + return Effect.whileLoop({ while: () => true, body: () => - receiveMessage(backend, options.topic, options.receiveTimeoutMs ?? config.consumerReceiveTimeoutMs).pipe( + receiveMessage(backend, options.topic, receiveTimeoutMs).pipe( Effect.flatMap((message) => message === null - ? Effect.sleep(Duration.millis(options.receiveTimeoutMs ?? config.consumerReceiveTimeoutMs)) + ? Effect.sleep(receiveTimeout) : processConsumerMessage(backend, options, flow, message, config), ), Effect.catch((error) => @@ -342,13 +354,14 @@ const consumerLoop = ( subscription: options.subscription, }).pipe( Effect.flatMap(() => - Effect.sleep(Duration.millis(options.errorBackoffMs ?? config.consumerErrorBackoffMs)), + Effect.sleep(errorBackoff), ), ), ), ), step: () => undefined, }); +}; export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPubSub")(function* ( pubsub: PubSubService, @@ -364,15 +377,10 @@ export const makeEffectConsumerFromPubSub = Effect.fn("makeEffectConsumerFromPub }; const concurrency = Math.max(1, options.concurrency ?? 1); const workerIndexes = Array.from({ length: concurrency }, (_value, index) => index); - const workerConfig = { - ...config, - rateLimitRetryMs: options.rateLimitRetryMs ?? config.rateLimitRetryMs, - rateLimitTimeoutMs: options.rateLimitTimeoutMs ?? config.rateLimitTimeoutMs, - }; const workers = yield* Effect.forEach(workerIndexes, () => Effect.gen(function* () { const backend = yield* pubsub.createConsumer(createOptions); - const fiber = yield* consumerLoop(backend, options, flow, workerConfig).pipe(Effect.forkScoped); + const fiber = yield* consumerLoop(backend, options, flow, config).pipe(Effect.forkScoped); return { backend, fiber }; }), ); @@ -417,10 +425,10 @@ const dispatchResponseLoop = ( Effect.whileLoop({ while: () => true, body: () => - receiveMessage(backend, responseTopic, config.consumerReceiveTimeoutMs).pipe( + receiveMessage(backend, responseTopic, Duration.toMillis(config.consumerReceiveTimeout)).pipe( Effect.flatMap((message) => { if (message === null) { - return Effect.sleep(Duration.millis(config.consumerReceiveTimeoutMs)); + return Effect.sleep(config.consumerReceiveTimeout); } const id = message.properties().id; @@ -438,7 +446,7 @@ const dispatchResponseLoop = ( Effect.logError("[RequestResponse] Response dispatch failed", { error: error.message, topic: responseTopic, - }).pipe(Effect.flatMap(() => Effect.sleep(Duration.millis(config.consumerErrorBackoffMs)))), + }).pipe(Effect.flatMap(() => Effect.sleep(config.consumerErrorBackoff))), ), ), step: () => undefined, @@ -532,7 +540,8 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR requestOptions?: EffectRequestOptions, ) => { const id = randomUUID(); - const timeoutMs = requestOptions?.timeoutMs ?? config.requestTimeoutMs; + const timeout = durationFromMsOption(requestOptions?.timeoutMs, config.requestTimeout); + const timeoutMs = Duration.toMillis(timeout); return Effect.scoped( Effect.gen(function* () { @@ -540,7 +549,7 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR yield* producer.send(id, request); const result = yield* waitForResponse(subscription, id, requestOptions).pipe( Effect.raceFirst(Deferred.await(stoppedSignal)), - Effect.timeoutOption(Duration.millis(timeoutMs)), + Effect.timeoutOption(timeout), ); return yield* O.match(result, { onNone: () => Effect.fail(messagingTimeoutError("request-response", timeoutMs)), diff --git a/ts/packages/base/src/runtime/messaging-config.ts b/ts/packages/base/src/runtime/messaging-config.ts index bb45334d..b5ef1bc8 100644 --- a/ts/packages/base/src/runtime/messaging-config.ts +++ b/ts/packages/base/src/runtime/messaging-config.ts @@ -2,46 +2,57 @@ * Effect Config contracts for messaging runtime behavior. */ -import { Config, Effect } from "effect"; +import { Config, Duration, Effect } from "effect"; export interface MessagingRuntimeConfig { - readonly consumerReceiveTimeoutMs: number; - readonly consumerErrorBackoffMs: number; - readonly rateLimitRetryMs: number; - readonly rateLimitTimeoutMs: number; - readonly requestTimeoutMs: number; + readonly consumerReceiveTimeout: Duration.Duration; + readonly consumerErrorBackoff: Duration.Duration; + readonly rateLimitRetry: Duration.Duration; + readonly rateLimitTimeout: Duration.Duration; + readonly requestTimeout: Duration.Duration; } export const defaultMessagingRuntimeConfig: MessagingRuntimeConfig = { - consumerReceiveTimeoutMs: 2_000, - consumerErrorBackoffMs: 1_000, - rateLimitRetryMs: 10_000, - rateLimitTimeoutMs: 7_200_000, - requestTimeoutMs: 300_000, + consumerReceiveTimeout: Duration.millis(2_000), + consumerErrorBackoff: Duration.millis(1_000), + rateLimitRetry: Duration.millis(10_000), + rateLimitTimeout: Duration.millis(7_200_000), + requestTimeout: Duration.millis(300_000), }; +const durationConfig = (name: string, defaultValue: Duration.Duration) => + Config.duration(name).pipe( + Config.orElse(() => Config.number(name).pipe(Config.map(Duration.millis))), + Config.withDefault(defaultValue), + ); + export const loadMessagingRuntimeConfig = Effect.fn("loadMessagingRuntimeConfig")(function* () { - const consumerReceiveTimeoutMs = yield* Config.number("TG_CONSUMER_RECEIVE_TIMEOUT_MS").pipe( - Config.withDefault(defaultMessagingRuntimeConfig.consumerReceiveTimeoutMs), + const consumerReceiveTimeout = yield* durationConfig( + "TG_CONSUMER_RECEIVE_TIMEOUT_MS", + defaultMessagingRuntimeConfig.consumerReceiveTimeout, ); - const consumerErrorBackoffMs = yield* Config.number("TG_CONSUMER_ERROR_BACKOFF_MS").pipe( - Config.withDefault(defaultMessagingRuntimeConfig.consumerErrorBackoffMs), + const consumerErrorBackoff = yield* durationConfig( + "TG_CONSUMER_ERROR_BACKOFF_MS", + defaultMessagingRuntimeConfig.consumerErrorBackoff, ); - const rateLimitRetryMs = yield* Config.number("TG_RATE_LIMIT_RETRY_MS").pipe( - Config.withDefault(defaultMessagingRuntimeConfig.rateLimitRetryMs), + const rateLimitRetry = yield* durationConfig( + "TG_RATE_LIMIT_RETRY_MS", + defaultMessagingRuntimeConfig.rateLimitRetry, ); - const rateLimitTimeoutMs = yield* Config.number("TG_RATE_LIMIT_TIMEOUT_MS").pipe( - Config.withDefault(defaultMessagingRuntimeConfig.rateLimitTimeoutMs), + const rateLimitTimeout = yield* durationConfig( + "TG_RATE_LIMIT_TIMEOUT_MS", + defaultMessagingRuntimeConfig.rateLimitTimeout, ); - const requestTimeoutMs = yield* Config.number("TG_REQUEST_TIMEOUT_MS").pipe( - Config.withDefault(defaultMessagingRuntimeConfig.requestTimeoutMs), + const requestTimeout = yield* durationConfig( + "TG_REQUEST_TIMEOUT_MS", + defaultMessagingRuntimeConfig.requestTimeout, ); return { - consumerReceiveTimeoutMs, - consumerErrorBackoffMs, - rateLimitRetryMs, - rateLimitTimeoutMs, - requestTimeoutMs, + consumerReceiveTimeout, + consumerErrorBackoff, + rateLimitRetry, + rateLimitTimeout, + requestTimeout, } satisfies MessagingRuntimeConfig; });