From 00a26b7debc6265e23a3ee3b6749078b307f8e8c Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 05:44:21 -0500 Subject: [PATCH] Map NATS boundary failures to tagged errors --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 41 +++++- .../base/src/__tests__/nats-backend.test.ts | 138 ++++++++++++++++++ ts/packages/base/src/backend/nats.ts | 49 ++++--- 3 files changed, 207 insertions(+), 21 deletions(-) create mode 100644 ts/packages/base/src/__tests__/nats-backend.test.ts diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 623d723f..d33256c6 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,8 +12,8 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 base producer -scoped runtime slice: +Current signal counts from `ts/packages` after the 2026-06-02 NATS typed +boundary slice: | Signal | Count | | --- | ---: | @@ -26,13 +26,13 @@ scoped runtime slice: | `new Map` | 60 | | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | -| `receive(` | 17 | +| `receive(` | 18 | | `while (` | 2 | | `new Error` | 8 | | `new Promise` | 10 | | `JSON.parse` | 4 | | `localStorage` | 9 | -| `JSON.stringify` | 7 | +| `JSON.stringify` | 8 | | `setTimeout` | 4 | | `process.env` | 3 | @@ -101,6 +101,10 @@ Notes: factory. Public `start`/`send`/`stop` remain Promise compatibility boundaries, while producer allocation, flush, and finalizer close now go through the Effect runtime path. +- The NATS typed boundary slice removed the dynamic `import("nats")` header + path and maps header construction plus `ack()`/`nak()` failures into tagged + `PubSubError`s with `Effect.try`. The `receive(` and `JSON.stringify` count + increases are from the new mocked NATS backend test, not production code. - The gateway streaming callback slice added Effect-returning dispatcher streaming methods, switched the RPC stream server off nested `Effect.runPromiseWith(context)` queue offers, and replaced the client @@ -1156,6 +1160,28 @@ Notes: - `cd ts && bun run build` - `cd ts && bun run test` +### 2026-06-02: NATS Typed Boundary Slice + +- Status: migrated and root-verified. +- Completed: + - Replaced the dynamic header import inside `makeNatsProducer` with the + static NATS `headers` export. + - Wrapped publish header construction in `Effect.try`, so invalid header + names/values fail as tagged `PubSubError` values instead of defects. + - Wrapped NATS `ack()` and `nak()` calls in `Effect.try`, preserving the + existing wrong-message guard and mapping thrown acknowledgement failures + into tagged `PubSubError`s. + - Added a mocked NATS backend test covering invalid publish headers and + thrown ack/nak failures through the public `makeNatsBackend` path. +- Verification: + - `cd ts && bun run check:tsgo` + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/nats-backend.test.ts` + - `bun run --cwd ts/packages/base build` + - `bun run --cwd ts/packages/base test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + ## Subagent Findings To Preserve - MCP/workbench: @@ -1187,6 +1213,10 @@ Notes: runtime. Remaining broker P0 work should focus on native backend/NATS runtime shape and consumer polling, not replacing `PubSubBackend` with `effect/PubSub`. + - NATS header construction and ack/nak operations now map thrown SDK + failures into tagged `PubSubError`s. Remaining NATS work is selective + 404 handling, scoped backend/layer construction, and stream/consumer state + ownership. - Existing constructor shims preserve callable-plus-newable public exports; removing them needs a public API split or real class redesign. - Typed string registries in `Flow` now have Schema-backed parameter specs @@ -1257,6 +1287,9 @@ Notes: - Treat the producer Promise facade as a completed compatibility wrapper; avoid reopening it unless backend runtime changes require a narrower adapter. + - Keep NATS SDK boundary failures typed; future backend slices should avoid + catch-all create-on-failure behavior and move connection/stream state into + scoped Effect services. - Tests: - Fake backend ack/nak/backoff/stop tests, NATS close finalizer tests, and config-push stream tests. diff --git a/ts/packages/base/src/__tests__/nats-backend.test.ts b/ts/packages/base/src/__tests__/nats-backend.test.ts new file mode 100644 index 00000000..f32d31be --- /dev/null +++ b/ts/packages/base/src/__tests__/nats-backend.test.ts @@ -0,0 +1,138 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { makeNatsBackend } from "../backend/nats.js"; + +const natsMock = vi.hoisted(() => { + const encoder = new TextEncoder(); + const decoder = new TextDecoder(); + + const publish = vi.fn(); + const consumersGet = vi.fn(); + const consumersAdd = vi.fn(); + const streamsInfo = vi.fn(); + const streamsAdd = vi.fn(); + const next = vi.fn(); + const ack = vi.fn(); + const nak = vi.fn(); + const drain = vi.fn(); + const headerAppend = vi.fn(); + const headers = vi.fn(); + const connect = vi.fn(); + + return { + ack, + connect, + consumersAdd, + consumersGet, + decoder, + drain, + encoder, + headerAppend, + headers, + nak, + next, + publish, + streamsAdd, + streamsInfo, + }; +}); + +vi.mock("nats", () => ({ + AckPolicy: { Explicit: "explicit" }, + DeliverPolicy: { All: "all", New: "new" }, + StringCodec: () => ({ + decode: (input: Uint8Array) => natsMock.decoder.decode(input), + encode: (input: string) => natsMock.encoder.encode(input), + }), + connect: natsMock.connect, + headers: natsMock.headers, +})); + +function resetNatsMock(): void { + vi.clearAllMocks(); + + natsMock.publish.mockResolvedValue({ duplicate: false, seq: 1, stream: "tg_test" }); + natsMock.consumersGet.mockResolvedValue({ next: natsMock.next }); + natsMock.consumersAdd.mockResolvedValue(undefined); + natsMock.streamsInfo.mockResolvedValue({ config: { name: "tg_test" } }); + natsMock.streamsAdd.mockResolvedValue(undefined); + natsMock.next.mockResolvedValue({ + ack: natsMock.ack, + data: natsMock.encoder.encode(JSON.stringify("payload")), + headers: undefined, + nak: natsMock.nak, + }); + natsMock.ack.mockReturnValue(undefined); + natsMock.nak.mockReturnValue(undefined); + natsMock.drain.mockResolvedValue(undefined); + natsMock.headerAppend.mockReturnValue(undefined); + natsMock.headers.mockReturnValue({ append: natsMock.headerAppend }); + natsMock.connect.mockResolvedValue({ + drain: natsMock.drain, + jetstream: () => ({ + consumers: { get: natsMock.consumersGet }, + publish: natsMock.publish, + }), + jetstreamManager: () => + Promise.resolve({ + consumers: { add: natsMock.consumersAdd }, + streams: { + add: natsMock.streamsAdd, + info: natsMock.streamsInfo, + }, + }), + }); +} + +describe("NATS backend", () => { + beforeEach(() => { + resetNatsMock(); + }); + + it("maps invalid publish headers to tagged PubSubError", async () => { + natsMock.headerAppend.mockImplementation(() => { + throw "invalid header"; + }); + const backend = makeNatsBackend("nats://test"); + const producer = await backend.createProducer({ topic: "tg.test.topic" }); + + const error = await producer.send("hello", { bad: "value" }).catch((caught: unknown) => caught); + + expect(error).toMatchObject({ + _tag: "PubSubError", + operation: "headers:tg.test.topic", + }); + expect(natsMock.publish).not.toHaveBeenCalled(); + }); + + it("maps thrown ack and nak failures to tagged PubSubError", async () => { + natsMock.ack.mockImplementation(() => { + throw "ack failed"; + }); + natsMock.nak.mockImplementation(() => { + throw "nak failed"; + }); + const backend = makeNatsBackend("nats://test"); + const consumer = await backend.createConsumer({ + topic: "tg.test.topic", + subscription: "worker", + }); + const message = await consumer.receive(1); + + expect(message).not.toBeNull(); + if (message === null) { + return; + } + + const ackError = await consumer.acknowledge(message).catch((caught: unknown) => caught); + const nakError = await consumer.negativeAcknowledge(message).catch((caught: unknown) => caught); + + expect(ackError).toMatchObject({ + _tag: "PubSubError", + operation: "acknowledge:tg.test.topic", + }); + expect(nakError).toMatchObject({ + _tag: "PubSubError", + operation: "negative-acknowledge:tg.test.topic", + }); + }); +}); diff --git a/ts/packages/base/src/backend/nats.ts b/ts/packages/base/src/backend/nats.ts index 44ccce86..fab55047 100644 --- a/ts/packages/base/src/backend/nats.ts +++ b/ts/packages/base/src/backend/nats.ts @@ -14,7 +14,9 @@ import { type JetStreamClient, type JetStreamManager, type Consumer as NatsJsConsumer, + headers, type JsMsg, + type JetStreamPublishOptions, StringCodec, AckPolicy, DeliverPolicy, @@ -78,6 +80,25 @@ function makeNatsProducer( subject: string, schema?: S.Codec, ): BackendProducer { + const makePublishOptions = ( + properties: Record | undefined, + ): Effect.Effect, ReturnType> => { + if (properties === undefined || Object.keys(properties).length === 0) { + return Effect.succeed({}); + } + + return Effect.try({ + try: () => { + const hdrs = headers(); + for (const [key, val] of Object.entries(properties)) { + hdrs.append(key, val); + } + return { headers: hdrs }; + }, + catch: (error) => pubSubError(`headers:${subject}`, error), + }); + }; + return { send: (message, properties) => Effect.runPromise( @@ -91,19 +112,7 @@ function makeNatsProducer( Effect.mapError((error) => pubSubError(`encode-json:${subject}`, error)), ); const data = sc.encode(json); - const opts: Record = {}; - - if (properties !== undefined && Object.keys(properties).length > 0) { - const { headers } = yield* Effect.tryPromise({ - try: () => import("nats"), - catch: (error) => pubSubError("import:nats-headers", error), - }); - const hdrs = headers(); - for (const [key, val] of Object.entries(properties)) { - hdrs.append(key, val); - } - opts.headers = hdrs; - } + const opts = yield* makePublishOptions(properties); yield* Effect.tryPromise({ try: () => js.publish(subject, data, opts), @@ -204,8 +213,11 @@ function makeNatsConsumer( if (!isNatsMessage(message)) { return yield* pubSubError(`acknowledge:${subject}`, "Message was not produced by NATS backend"); } - yield* Effect.sync(() => { - message._jsMsg.ack(); + yield* Effect.try({ + try: () => { + message._jsMsg.ack(); + }, + catch: (error) => pubSubError(`acknowledge:${subject}`, error), }); }), ), @@ -218,8 +230,11 @@ function makeNatsConsumer( "Message was not produced by NATS backend", ); } - yield* Effect.sync(() => { - message._jsMsg.nak(); + yield* Effect.try({ + try: () => { + message._jsMsg.nak(); + }, + catch: (error) => pubSubError(`negative-acknowledge:${subject}`, error), }); }), ),