diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 6fd07d87..54070d46 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,8 +12,8 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 consumer -concurrency ownership slice: +Current signal counts from `ts/packages` after the 2026-06-02 +request-response stop signal slice: | Signal | Count | | --- | ---: | @@ -21,8 +21,8 @@ concurrency ownership slice: | `Effect.runPromiseWith` | 0 | | `Effect.cached` | 0 | | `Layer.succeed` | 12 | -| `Map<` | 82 | -| `WebSocket` | 64 | +| `Map<` | 88 | +| `WebSocket` | 74 | | `new Map` | 60 | | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | @@ -121,6 +121,10 @@ Notes: of sharing a single `BackendConsumer.receive()` handle. `stop` is now idempotent through `Ref`, so explicit stop and scoped finalizers do not close workers twice. +- The request-response stop signal slice added a `Deferred` shutdown signal to + `makeEffectRequestResponseFromPubSub`. Pending requests now race response + waiting against runtime stop and fail promptly with a tagged + `MessagingLifecycleError` instead of waiting for timeout. - The gateway streaming callback slice added Effect-returning dispatcher streaming methods, switched the RPC stream server off nested `Effect.runPromiseWith(context)` queue offers, and replaced the client @@ -1263,6 +1267,32 @@ Notes: - `cd ts && bun run build` - `cd ts && bun run test` +### 2026-06-02: Request-Response Stop Signal Slice + +- Status: migrated and root-verified. +- Completed: + - `makeEffectRequestResponseFromPubSub` now owns a `Deferred` stop signal for + the lifetime of the scoped request-response runtime. + - `request()` races response waiting against that stop signal before applying + the request timeout, so pending calls fail promptly when the runtime stops. + - `stop()` fails the stop signal with a tagged `MessagingLifecycleError` + before interrupting the dispatch loop and closing the producer/consumer + resources. + - Flow PDF decoder and graph embeddings service error unions now include + `MessagingLifecycleError` because requestor failures can surface shutdown. + - Added Effect-native runtime coverage proving a pending request fails with + the tagged lifecycle error when the request-response runtime stops. +- Verification: + - `cd ts && bun run check:tsgo` + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/messaging-runtime.test.ts` + - `bun run --cwd ts/packages/base build` + - `bun run --cwd ts/packages/base test` + - `bun run --cwd ts/packages/flow build` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + ## Subagent Findings To Preserve - MCP/workbench: @@ -1299,9 +1329,10 @@ Notes: construction and stream/consumer state ownership. - Consumer rate-limit retry timeout behavior is now wired in both legacy and Effect-native consumer paths. Effect-native consumer concurrency now owns - one backend consumer per worker. Remaining consumer runtime work should - focus on request/response pending shutdown semantics and the legacy - consumer facade's blocking compatibility shape. + one backend consumer per worker, and request-response pending shutdown now + fails through a tagged lifecycle error. Remaining consumer runtime work + should focus on the legacy consumer facade's blocking compatibility shape + and scoped backend/layer construction. - Existing constructor shims preserve callable-plus-newable public exports; removing them needs a public API split or real class redesign. - Typed string registries in `Flow` now have Schema-backed parameter specs @@ -1376,10 +1407,13 @@ Notes: create-on-failure behavior. Future backend slices should move connection/stream state into scoped Effect services. - Treat rate-limit retry timeout semantics as complete; next consumer slices - should focus on shutdown, not retry policy. + should focus on blocking compatibility and backend/layer ownership, not + retry policy. - Treat Effect-native per-worker consumer ownership as complete; do not flag `makeEffectConsumerFromPubSub` concurrency for shared backend receive handles. + - Treat request-response pending shutdown semantics as complete; do not flag + `waitForResponse` timeout behavior for stopped runtimes. - Tests: - Fake backend ack/nak/backoff/stop tests, NATS close finalizer tests, and config-push stream tests. @@ -1472,6 +1506,10 @@ Do not flag these as rewrite blockers without additional proof: rechecked against `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4/packages/effect/src/PubSub.ts`, whose exported API is local publish/subscribe over Effect queues. +- Request-response pending shutdown semantics are complete in + `makeEffectRequestResponseFromPubSub`: pending calls race response waiting + against a `Deferred` stop signal and fail with tagged + `MessagingLifecycleError`. - `ts/packages/flow/src/gateway/rpc-protocol.ts` is a Fastify socket compatibility bridge. Do not flag its internal connection maps/sets as a standalone replacement target until the gateway is ready to move onto Effect diff --git a/ts/packages/base/src/__tests__/messaging-runtime.test.ts b/ts/packages/base/src/__tests__/messaging-runtime.test.ts index c280bee4..f65ab0c1 100644 --- a/ts/packages/base/src/__tests__/messaging-runtime.test.ts +++ b/ts/packages/base/src/__tests__/messaging-runtime.test.ts @@ -408,8 +408,8 @@ describe("Effect-native messaging runtime", () => { it.effect( "fails request-response calls with a typed timeout", Effect.fnUntraced(function* () { - const responseConsumer = new ScriptedConsumer(); - const backend = new RuntimeBackend(responseConsumer as BackendConsumer); + const responseConsumer = new ScriptedConsumer(); + const backend = new RuntimeBackend(responseConsumer); const error = yield* Effect.scoped( Effect.gen(function* () { @@ -440,6 +440,41 @@ describe("Effect-native messaging runtime", () => { }), ); + it.effect( + "fails pending request-response calls when the runtime stops", + Effect.fnUntraced(function* () { + const responseConsumer = new ScriptedConsumer(); + const backend = new RuntimeBackend(responseConsumer as BackendConsumer); + + const error = yield* Effect.scoped( + Effect.gen(function* () { + const requestor = yield* makeEffectRequestResponseFromPubSub( + PubSub.fromBackend(backend), + { + ...defaultMessagingRuntimeConfig, + consumerReceiveTimeoutMs: 1, + }, + { + requestTopic: "tg.test.request", + responseTopic: "tg.test.response", + subscription: "sub", + }, + ); + const fiber = yield* requestor.request("request", { timeoutMs: 1_000 }).pipe(Effect.forkChild); + yield* TestClock.adjust(Duration.millis(5)); + yield* requestor.stop; + return yield* Fiber.join(fiber).pipe(Effect.flip); + }), + ); + + expect(error).toMatchObject({ + _tag: "MessagingLifecycleError", + operation: "stop", + resource: "tg.test.request:tg.test.response", + }); + }), + ); + it.effect( "owns Flow lifecycle through a scoped Effect boundary", Effect.fnUntraced(function* () { diff --git a/ts/packages/base/src/messaging/runtime.ts b/ts/packages/base/src/messaging/runtime.ts index 8a514fba..f1e5e10f 100644 --- a/ts/packages/base/src/messaging/runtime.ts +++ b/ts/packages/base/src/messaging/runtime.ts @@ -3,7 +3,7 @@ */ import { randomUUID } from "node:crypto"; -import { Context, Duration, Effect, Fiber, Layer, Queue, Ref, Result, Schedule, Scope, Stream } from "effect"; +import { Context, Deferred, Duration, Effect, Fiber, Layer, Queue, Ref, Result, Schedule, Scope, Stream } from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; import type { @@ -92,7 +92,7 @@ export interface EffectRequestResponse { readonly request: ( request: TReq, options?: EffectRequestOptions, - ) => Effect.Effect; + ) => Effect.Effect; readonly stop: Effect.Effect; } @@ -476,12 +476,17 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR }; const backend = yield* pubsub.createConsumer(createOptions); const subscribers = new Map>(); + const stoppedSignal = yield* Deferred.make(); const fiber = yield* dispatchResponseLoop(backend, options.responseTopic, subscribers, config).pipe(Effect.forkScoped); let stopped = false; const stop = Effect.fn(`RequestResponse.stop:${options.requestTopic}`)(function* () { if (stopped) return; stopped = true; + yield* Deferred.fail( + stoppedSignal, + messagingLifecycleError(`${options.requestTopic}:${options.responseTopic}`, "stop", "RequestResponse stopped"), + ).pipe(Effect.ignore); yield* Fiber.interrupt(fiber); yield* producer.close; yield* closeConsumerBackend(backend, options.responseTopic, options.subscription); @@ -517,6 +522,7 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR Effect.gen(function* () { yield* producer.send(id, request); const result = yield* waitForResponse(queue, requestOptions).pipe( + Effect.raceFirst(Deferred.await(stoppedSignal)), Effect.timeoutOption(Duration.millis(timeoutMs)), ); return yield* O.match(result, { diff --git a/ts/packages/flow/src/decoding/pdf-decoder.ts b/ts/packages/flow/src/decoding/pdf-decoder.ts index e118fcf1..97c57b44 100644 --- a/ts/packages/flow/src/decoding/pdf-decoder.ts +++ b/ts/packages/flow/src/decoding/pdf-decoder.ts @@ -32,6 +32,7 @@ import { type LibrarianRequest, type LibrarianResponse, type MessagingDeliveryError, + type MessagingLifecycleError, type MessagingTimeoutError, type Spec, errorMessage, @@ -54,6 +55,7 @@ export class PdfDecoderError extends S.TaggedErrorClass()( type PdfDecoderHandlerError = | FlowResourceNotFoundError | MessagingDeliveryError + | MessagingLifecycleError | MessagingTimeoutError | PdfDecoderError; diff --git a/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts b/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts index 00f3cb01..b5986099 100644 --- a/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts +++ b/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts @@ -20,6 +20,7 @@ import { type FlowContext, type FlowResourceNotFoundError, type MessagingDeliveryError, + type MessagingLifecycleError, type MessagingTimeoutError, type EntityContexts, type EmbeddingsRequest, @@ -41,6 +42,7 @@ type GraphEmbeddingsStoreRequirements = QdrantGraphEmbeddingsStoreService; type GraphEmbeddingsStoreError = | FlowResourceNotFoundError | MessagingDeliveryError + | MessagingLifecycleError | MessagingTimeoutError | QdrantGraphEmbeddingsStoreError;