From 48710a05183ca830f4ab1f17eb15b6dfee0fd4bf Mon Sep 17 00:00:00 2001 From: elpresidank Date: Thu, 4 Jun 2026 06:16:03 -0500 Subject: [PATCH] Use Ref for request response stop state --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 25 ++++++++++++++++++- .../src/__tests__/messaging-runtime.test.ts | 2 ++ ts/packages/base/src/messaging/runtime.ts | 7 +++--- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 07cb2344..9bb89507 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -1875,6 +1875,27 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-04: Request-Response Stop Ref Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/base/src/messaging/runtime.ts` now tracks + `makeEffectRequestResponseFromPubSub` stop idempotence with `Ref` instead + of local mutable state. + - The request-response runtime now matches the consumer runtime stop pattern: + `Ref.getAndSet` gates shutdown, signal failure, PubSub shutdown, fiber + interruption, producer close, and backend consumer close. + - `ts/packages/base/src/__tests__/messaging-runtime.test.ts` now asserts that + explicit request-response stop plus scoped finalization closes the producer + and response consumer exactly once. +- Verification: + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/messaging-runtime.test.ts src/__tests__/request-response.test.ts` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -2052,6 +2073,8 @@ Notes: handles. - Treat request-response pending shutdown semantics as complete; do not flag `waitForResponse` timeout behavior for stopped runtimes. + - Treat request-response stop idempotence as complete; stop state now uses an + Effect `Ref` and explicit stop plus scoped finalizer closes resources once. - Treat request-response in-process fanout as complete: response routing now uses native `effect/PubSub` subscriptions instead of a hand-managed subscriber map. @@ -2237,7 +2260,7 @@ Do not flag these as rewrite blockers without additional proof: - Request-response pending shutdown semantics are complete in `makeEffectRequestResponseFromPubSub`: pending calls race response waiting against a `Deferred` stop signal and fail with tagged - `MessagingLifecycleError`. + `MessagingLifecycleError`; stop idempotence is owned by an Effect `Ref`. - Legacy `makeConsumer` facade blocking-loop ownership is complete: `start()` now initializes scoped Effect consumers and returns after startup, while `stop()` closes the native consumer scope. diff --git a/ts/packages/base/src/__tests__/messaging-runtime.test.ts b/ts/packages/base/src/__tests__/messaging-runtime.test.ts index ca85b0b1..59d308ec 100644 --- a/ts/packages/base/src/__tests__/messaging-runtime.test.ts +++ b/ts/packages/base/src/__tests__/messaging-runtime.test.ts @@ -472,6 +472,8 @@ describe("Effect-native messaging runtime", () => { operation: "stop", resource: "tg.test.request:tg.test.response", }); + expect(backend.producer.closeCount).toBe(1); + expect(responseConsumer.closeCount).toBe(1); }), ); diff --git a/ts/packages/base/src/messaging/runtime.ts b/ts/packages/base/src/messaging/runtime.ts index b802f852..ed6f2ba3 100644 --- a/ts/packages/base/src/messaging/runtime.ts +++ b/ts/packages/base/src/messaging/runtime.ts @@ -509,11 +509,12 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR const responses = yield* EffectPubSub.unbounded>(); const stoppedSignal = yield* Deferred.make(); const fiber = yield* dispatchResponseLoop(backend, options.responseTopic, responses, config).pipe(Effect.forkScoped); - let stopped = false; + const stopped = yield* Ref.make(false); const stop = Effect.fn(`RequestResponse.stop:${options.requestTopic}`)(function* () { - if (stopped) return; - stopped = true; + const alreadyStopped = yield* Ref.getAndSet(stopped, true); + if (alreadyStopped) return; + yield* Deferred.fail( stoppedSignal, messagingLifecycleError(`${options.requestTopic}:${options.responseTopic}`, "stop", "RequestResponse stopped"),