Use Ref for request response stop state

This commit is contained in:
elpresidank 2026-06-04 06:16:03 -05:00
parent e46fc64275
commit 48710a0518
3 changed files with 30 additions and 4 deletions

View file

@ -1875,6 +1875,27 @@ Notes:
- `cd ts && bun run lint` - `cd ts && bun run lint`
- `git diff --check` - `git diff --check`
### 2026-06-04: Request-Response Stop Ref Slice
- Status: migrated and package-verified.
- Completed:
- `ts/packages/base/src/messaging/runtime.ts` now tracks
`makeEffectRequestResponseFromPubSub` stop idempotence with `Ref` instead
of local mutable state.
- The request-response runtime now matches the consumer runtime stop pattern:
`Ref.getAndSet` gates shutdown, signal failure, PubSub shutdown, fiber
interruption, producer close, and backend consumer close.
- `ts/packages/base/src/__tests__/messaging-runtime.test.ts` now asserts that
explicit request-response stop plus scoped finalization closes the producer
and response consumer exactly once.
- Verification:
- `cd ts/packages/base && bunx --bun vitest run src/__tests__/messaging-runtime.test.ts src/__tests__/request-response.test.ts`
- `cd ts && bun run check:tsgo`
- `cd ts && bun run build`
- `cd ts && bun run test`
- `cd ts && bun run lint`
- `git diff --check`
## Subagent Findings To Preserve ## Subagent Findings To Preserve
- MCP/workbench: - MCP/workbench:
@ -2052,6 +2073,8 @@ Notes:
handles. handles.
- Treat request-response pending shutdown semantics as complete; do not flag - Treat request-response pending shutdown semantics as complete; do not flag
`waitForResponse` timeout behavior for stopped runtimes. `waitForResponse` timeout behavior for stopped runtimes.
- Treat request-response stop idempotence as complete; stop state now uses an
Effect `Ref` and explicit stop plus scoped finalizer closes resources once.
- Treat request-response in-process fanout as complete: response routing now - Treat request-response in-process fanout as complete: response routing now
uses native `effect/PubSub` subscriptions instead of a hand-managed uses native `effect/PubSub` subscriptions instead of a hand-managed
subscriber map. subscriber map.
@ -2237,7 +2260,7 @@ Do not flag these as rewrite blockers without additional proof:
- Request-response pending shutdown semantics are complete in - Request-response pending shutdown semantics are complete in
`makeEffectRequestResponseFromPubSub`: pending calls race response waiting `makeEffectRequestResponseFromPubSub`: pending calls race response waiting
against a `Deferred` stop signal and fail with tagged against a `Deferred` stop signal and fail with tagged
`MessagingLifecycleError`. `MessagingLifecycleError`; stop idempotence is owned by an Effect `Ref`.
- Legacy `makeConsumer` facade blocking-loop ownership is complete: - Legacy `makeConsumer` facade blocking-loop ownership is complete:
`start()` now initializes scoped Effect consumers and returns after startup, `start()` now initializes scoped Effect consumers and returns after startup,
while `stop()` closes the native consumer scope. while `stop()` closes the native consumer scope.

View file

@ -472,6 +472,8 @@ describe("Effect-native messaging runtime", () => {
operation: "stop", operation: "stop",
resource: "tg.test.request:tg.test.response", resource: "tg.test.request:tg.test.response",
}); });
expect(backend.producer.closeCount).toBe(1);
expect(responseConsumer.closeCount).toBe(1);
}), }),
); );

View file

@ -509,11 +509,12 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR
const responses = yield* EffectPubSub.unbounded<ResponseEnvelope<TRes>>(); const responses = yield* EffectPubSub.unbounded<ResponseEnvelope<TRes>>();
const stoppedSignal = yield* Deferred.make<never, MessagingLifecycleError>(); const stoppedSignal = yield* Deferred.make<never, MessagingLifecycleError>();
const fiber = yield* dispatchResponseLoop(backend, options.responseTopic, responses, config).pipe(Effect.forkScoped); const fiber = yield* dispatchResponseLoop(backend, options.responseTopic, responses, config).pipe(Effect.forkScoped);
let stopped = false; const stopped = yield* Ref.make(false);
const stop = Effect.fn(`RequestResponse.stop:${options.requestTopic}`)(function* () { const stop = Effect.fn(`RequestResponse.stop:${options.requestTopic}`)(function* () {
if (stopped) return; const alreadyStopped = yield* Ref.getAndSet(stopped, true);
stopped = true; if (alreadyStopped) return;
yield* Deferred.fail( yield* Deferred.fail(
stoppedSignal, stoppedSignal,
messagingLifecycleError(`${options.requestTopic}:${options.responseTopic}`, "stop", "RequestResponse stopped"), messagingLifecycleError(`${options.requestTopic}:${options.responseTopic}`, "stop", "RequestResponse stopped"),