diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 84a493fb..623d723f 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,8 +12,8 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 gateway dispatcher -ownership and serialization slice: +Current signal counts from `ts/packages` after the 2026-06-02 base producer +scoped runtime slice: | Signal | Count | | --- | ---: | @@ -21,8 +21,8 @@ ownership and serialization slice: | `Effect.runPromiseWith` | 0 | | `Effect.cached` | 0 | | `Layer.succeed` | 12 | -| `Map<` | 88 | -| `WebSocket` | 74 | +| `Map<` | 82 | +| `WebSocket` | 64 | | `new Map` | 60 | | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | @@ -96,6 +96,11 @@ Notes: `PubSub` is an in-process hub and does not replace the broker-backed `PubSubBackend`/NATS boundary, but it should be preferred for future in-process broadcast/fanout needs. +- The base producer scoped runtime slice moved the legacy `makeProducer` + Promise facade onto the existing `makeEffectProducerFromPubSub` scoped + factory. Public `start`/`send`/`stop` remain Promise compatibility + boundaries, while producer allocation, flush, and finalizer close now go + through the Effect runtime path. - The gateway streaming callback slice added Effect-returning dispatcher streaming methods, switched the RPC stream server off nested `Effect.runPromiseWith(context)` queue offers, and replaced the client @@ -1128,6 +1133,29 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Base Producer Scoped Runtime Slice + +- Status: migrated and root-verified. +- Completed: + - Kept `PubSubBackend` as the broker adapter boundary; Effect native + `PubSub` remains an in-process primitive and is not a replacement for + broker-backed topics, subscriptions, acknowledgements, codecs, or backend + lifecycle. + - Reworked `makeProducer` so the legacy Promise facade allocates producers + through `makeEffectProducerFromPubSub` inside a closeable `Scope`. + - `stop()` now flushes the Effect producer and closes the scope with the + registered producer finalizer, including the flush-failure path. + - Added focused producer facade coverage for send routing, idempotent stop, + tagged not-started lifecycle errors, and close-on-flush-failure behavior. +- Verification: + - `cd ts && bun run check:tsgo` + - `bun run --cwd ts/packages/base build` + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/producer.test.ts` + - `bun run --cwd ts/packages/base test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + ## Subagent Findings To Preserve - MCP/workbench: @@ -1155,6 +1183,10 @@ Notes: - The legacy `messaging/subscriber.ts` async queue/fanout implementation is removed. Use native `effect/PubSub` for future in-process fanout, while keeping `PubSubBackend` for broker-backed messaging. + - The legacy producer facade now delegates to the scoped Effect producer + runtime. Remaining broker P0 work should focus on native backend/NATS + runtime shape and consumer polling, not replacing `PubSubBackend` with + `effect/PubSub`. - Existing constructor shims preserve callable-plus-newable public exports; removing them needs a public API split or real class redesign. - Typed string registries in `Flow` now have Schema-backed parameter specs @@ -1222,6 +1254,9 @@ Notes: acquisition and stream/schedule-based consumer loops. - Keep `PubSubBackend` as the compatibility adapter boundary; Effect native `PubSub` remains in-process only. + - Treat the producer Promise facade as a completed compatibility wrapper; + avoid reopening it unless backend runtime changes require a narrower + adapter. - Tests: - Fake backend ack/nak/backoff/stop tests, NATS close finalizer tests, and config-push stream tests. diff --git a/ts/packages/base/src/__tests__/producer.test.ts b/ts/packages/base/src/__tests__/producer.test.ts new file mode 100644 index 00000000..59a1c277 --- /dev/null +++ b/ts/packages/base/src/__tests__/producer.test.ts @@ -0,0 +1,86 @@ +import { describe, expect, it } from "vitest"; +import { + makeProducer, + type BackendConsumer, + type BackendProducer, + type CreateConsumerOptions, + type CreateProducerOptions, + type PubSubBackend, +} from "../index.js"; + +class ProducerBackend implements PubSubBackend { + readonly sent: Array<{ readonly message: unknown; readonly properties?: Record }> = []; + readonly producerTopics: Array = []; + closeCount = 0; + flushCount = 0; + failFlush = false; + + async createProducer(options: CreateProducerOptions): Promise> { + this.producerTopics.push(options.topic); + + return { + send: async (message, properties) => { + this.sent.push(properties === undefined ? { message } : { message, properties }); + }, + flush: async () => { + this.flushCount += 1; + if (this.failFlush) { + return Promise.reject("flush failed"); + } + }, + close: async () => { + this.closeCount += 1; + }, + }; + } + + createConsumer(_options: CreateConsumerOptions): Promise> { + return Promise.reject("consumer not supported"); + } + + async close(): Promise {} +} + +describe("Producer", () => { + it("routes the compatibility facade through the scoped Effect producer", async () => { + const backend = new ProducerBackend(); + const producer = makeProducer(backend, "tg.test.producer"); + + await producer.start(); + await producer.send("message-1", "hello"); + await producer.stop(); + + expect(backend.producerTopics).toEqual(["tg.test.producer"]); + expect(backend.sent).toEqual([ + { message: "hello", properties: { id: "message-1" } }, + ]); + expect(backend.flushCount).toBe(1); + expect(backend.closeCount).toBe(1); + await expect(producer.stop()).resolves.toBeUndefined(); + + const error = await producer.send("message-2", "late").catch((caught: unknown) => caught); + expect(error).toMatchObject({ + _tag: "MessagingLifecycleError", + operation: "send", + resource: "tg.test.producer", + }); + }); + + it("closes the scoped producer when flush fails during stop", async () => { + const backend = new ProducerBackend(); + const producer = makeProducer(backend, "tg.test.producer"); + + await producer.start(); + backend.failFlush = true; + + const error = await producer.stop().catch((caught: unknown) => caught); + + expect(error).toMatchObject({ + _tag: "MessagingDeliveryError", + operation: "flush", + topic: "tg.test.producer", + }); + expect(backend.flushCount).toBe(1); + expect(backend.closeCount).toBe(1); + }); +}); diff --git a/ts/packages/base/src/messaging/producer.ts b/ts/packages/base/src/messaging/producer.ts index 2253c89e..8e355924 100644 --- a/ts/packages/base/src/messaging/producer.ts +++ b/ts/packages/base/src/messaging/producer.ts @@ -6,8 +6,9 @@ import type { PubSubBackend } from "../backend/types.js"; import type { ProducerMetrics } from "../metrics/prometheus.js"; -import { Effect } from "effect"; -import { makeEffectProducerHandle, type EffectProducer } from "./runtime.js"; +import { Effect, Exit, Scope } from "effect"; +import { PubSub } from "../backend/pubsub.js"; +import { makeEffectProducerFromPubSub, type EffectProducer } from "./runtime.js"; import { messagingLifecycleError } from "../errors.js"; export interface Producer { @@ -16,46 +17,61 @@ export interface Producer { readonly stop: () => Promise; } +interface ProducerRuntime { + readonly scope: Scope.Closeable; + readonly producer: EffectProducer; +} + export function makeProducer( pubsub: PubSubBackend, topic: string, metrics?: ProducerMetrics, ): Producer { - let effectProducer: EffectProducer | null = null; + let runtime: ProducerRuntime | null = null; return { start: () => - Effect.runPromise( - Effect.gen(function* () { - const backend = yield* Effect.tryPromise({ - try: () => pubsub.createProducer({ topic }), - catch: (error) => messagingLifecycleError(topic, "create-producer", error), - }); - effectProducer = makeEffectProducerHandle(backend, { - topic, - ...(metrics === undefined ? {} : { metrics }), - }); - }), - ), - send: (id, message) => - effectProducer === null + runtime !== null + ? Promise.resolve() + : Effect.runPromise( + Effect.gen(function* () { + const scope = yield* Scope.make(); + const startProducer = Effect.gen(function* () { + const producer = yield* makeEffectProducerFromPubSub( + PubSub.fromBackend(pubsub), + { + topic, + ...(metrics === undefined ? {} : { metrics }), + }, + ).pipe( + Scope.provide(scope), + Effect.mapError((error) => messagingLifecycleError(topic, "create-producer", error)), + ); + + runtime = { scope, producer }; + }); + + yield* startProducer.pipe( + Effect.onError((cause) => Scope.close(scope, Exit.failCause(cause))), + ); + }), + ), + send: (id, message) => { + const current = runtime; + return current === null ? Effect.runPromise(Effect.fail(messagingLifecycleError(topic, "send", "Producer not started"))) - : Effect.runPromise(effectProducer.send(id, message)), - stop: () => - Effect.runPromise( - Effect.gen(function* () { - if (effectProducer !== null) { - const producer = effectProducer; - yield* producer.flush.pipe( - Effect.flatMap(() => producer.close), - Effect.ensuring( - Effect.sync(() => { - effectProducer = null; - }), - ), - ); - } - }), - ), + : Effect.runPromise(current.producer.send(id, message)); + }, + stop: () => { + const current = runtime; + runtime = null; + return current === null + ? Promise.resolve() + : Effect.runPromise( + current.producer.flush.pipe( + Effect.ensuring(Scope.close(current.scope, Exit.void)), + ), + ); + }, }; }