From 46ae1dca828b843b45c5d725f699405f5fee26f1 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 05:50:34 -0500 Subject: [PATCH] Create NATS resources only on missing lookups --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 43 ++++++-- .../base/src/__tests__/nats-backend.test.ts | 88 +++++++++++++++ ts/packages/base/src/backend/nats.ts | 100 ++++++++++++------ 3 files changed, 190 insertions(+), 41 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index d33256c6..b8b082a7 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,8 +12,8 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 NATS typed -boundary slice: +Current signal counts from `ts/packages` after the 2026-06-02 NATS selective +404 slice: | Signal | Count | | --- | ---: | @@ -105,6 +105,11 @@ Notes: path and maps header construction plus `ack()`/`nak()` failures into tagged `PubSubError`s with `Effect.try`. The `receive(` and `JSON.stringify` count increases are from the new mocked NATS backend test, not production code. +- The NATS selective 404 slice replaced catch-all stream/consumer create + fallbacks with an internal `S.TaggedErrorClass` lookup wrapper plus + `Effect.catchIf` recovery only for NATS JetStream missing-resource errors. + Non-missing lookup failures now stay on the typed failure path without + attempting to create streams or durable consumers. - The gateway streaming callback slice added Effect-returning dispatcher streaming methods, switched the RPC stream server off nested `Effect.runPromiseWith(context)` queue offers, and replaced the client @@ -1182,6 +1187,27 @@ Notes: - `cd ts && bun run build` - `cd ts && bun run test` +### 2026-06-02: NATS Selective 404 Slice + +- Status: migrated and root-verified. +- Completed: + - Added an internal `NatsLookupError` tagged error to preserve lookup causes + without leaving `unknown` in Effect error channels. + - Stream creation now happens only when `manager.streams.info()` fails with + a NATS JetStream 404/missing-resource error. + - Durable consumer creation now happens only when `js.consumers.get()` fails + with a NATS JetStream 404/missing-resource error. + - Added mocked NATS tests proving 404 lookups create resources while + permission-style lookup failures do not. +- Verification: + - `cd ts && bun run check:tsgo` + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/nats-backend.test.ts` + - `bun run --cwd ts/packages/base build` + - `bun run --cwd ts/packages/base test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + ## Subagent Findings To Preserve - MCP/workbench: @@ -1213,10 +1239,9 @@ Notes: runtime. Remaining broker P0 work should focus on native backend/NATS runtime shape and consumer polling, not replacing `PubSubBackend` with `effect/PubSub`. - - NATS header construction and ack/nak operations now map thrown SDK - failures into tagged `PubSubError`s. Remaining NATS work is selective - 404 handling, scoped backend/layer construction, and stream/consumer state - ownership. + - NATS header construction, ack/nak operations, and lookup create-on-missing + behavior now stay typed. Remaining NATS work is scoped backend/layer + construction and stream/consumer state ownership. - Existing constructor shims preserve callable-plus-newable public exports; removing them needs a public API split or real class redesign. - Typed string registries in `Flow` now have Schema-backed parameter specs @@ -1287,9 +1312,9 @@ Notes: - Treat the producer Promise facade as a completed compatibility wrapper; avoid reopening it unless backend runtime changes require a narrower adapter. - - Keep NATS SDK boundary failures typed; future backend slices should avoid - catch-all create-on-failure behavior and move connection/stream state into - scoped Effect services. + - Keep NATS SDK boundary failures typed and avoid catch-all + create-on-failure behavior. Future backend slices should move + connection/stream state into scoped Effect services. - Tests: - Fake backend ack/nak/backoff/stop tests, NATS close finalizer tests, and config-push stream tests. diff --git a/ts/packages/base/src/__tests__/nats-backend.test.ts b/ts/packages/base/src/__tests__/nats-backend.test.ts index f32d31be..66e3fb29 100644 --- a/ts/packages/base/src/__tests__/nats-backend.test.ts +++ b/ts/packages/base/src/__tests__/nats-backend.test.ts @@ -5,6 +5,27 @@ const natsMock = vi.hoisted(() => { const encoder = new TextEncoder(); const decoder = new TextDecoder(); + class MockNatsError extends Error { + readonly code: string; + private readonly apiCode: number | undefined; + + constructor(code: string, apiCode?: number) { + super(code); + this.name = "NatsError"; + this.code = code; + this.apiCode = apiCode; + } + + jsError() { + return this.apiCode === undefined + ? null + : { + code: this.apiCode, + description: this.code, + }; + } + } + const publish = vi.fn(); const consumersGet = vi.fn(); const consumersAdd = vi.fn(); @@ -28,6 +49,7 @@ const natsMock = vi.hoisted(() => { encoder, headerAppend, headers, + NatsError: MockNatsError, nak, next, publish, @@ -39,14 +61,20 @@ const natsMock = vi.hoisted(() => { vi.mock("nats", () => ({ AckPolicy: { Explicit: "explicit" }, DeliverPolicy: { All: "all", New: "new" }, + ErrorCode: { JetStream404NoMessages: "404" }, StringCodec: () => ({ decode: (input: Uint8Array) => natsMock.decoder.decode(input), encode: (input: string) => natsMock.encoder.encode(input), }), connect: natsMock.connect, headers: natsMock.headers, + NatsError: natsMock.NatsError, })); +function makeNatsError(code: string, apiCode?: number) { + return new natsMock.NatsError(code, apiCode); +} + function resetNatsMock(): void { vi.clearAllMocks(); @@ -88,6 +116,66 @@ describe("NATS backend", () => { resetNatsMock(); }); + it("creates streams only when stream lookup returns a JetStream 404", async () => { + natsMock.streamsInfo.mockRejectedValueOnce(makeNatsError("404", 404)); + const backend = makeNatsBackend("nats://test"); + + await backend.createProducer({ topic: "tg.test.topic" }); + + expect(natsMock.streamsAdd).toHaveBeenCalledWith({ + name: "tg_test", + subjects: ["tg.test.>"], + }); + }); + + it("does not create streams for non-missing lookup failures", async () => { + natsMock.streamsInfo.mockRejectedValueOnce(makeNatsError("PERMISSIONS_VIOLATION")); + const backend = makeNatsBackend("nats://test"); + + const error = await backend.createProducer({ topic: "tg.test.topic" }).catch((caught: unknown) => caught); + + expect(error).toMatchObject({ + _tag: "PubSubError", + operation: "stream-info:tg_test", + }); + expect(natsMock.streamsAdd).not.toHaveBeenCalled(); + }); + + it("creates durable consumers only when consumer lookup returns a JetStream 404", async () => { + natsMock.consumersGet + .mockRejectedValueOnce(makeNatsError("404", 404)) + .mockResolvedValueOnce({ next: natsMock.next }); + const backend = makeNatsBackend("nats://test"); + + await backend.createConsumer({ + topic: "tg.test.topic", + subscription: "worker", + }); + + expect(natsMock.consumersAdd).toHaveBeenCalledWith("tg_test", { + ack_policy: "explicit", + deliver_policy: "new", + durable_name: "worker", + filter_subject: "tg.test.topic", + }); + }); + + it("does not create durable consumers for non-missing lookup failures", async () => { + natsMock.consumersGet.mockRejectedValueOnce(makeNatsError("PERMISSIONS_VIOLATION")); + const backend = makeNatsBackend("nats://test"); + + const error = await backend.createConsumer({ + topic: "tg.test.topic", + subscription: "worker", + }).catch((caught: unknown) => caught); + + expect(error).toMatchObject({ + _tag: "PubSubError", + operation: "init-consumer:tg.test.topic", + }); + expect(natsMock.consumersAdd).not.toHaveBeenCalled(); + }); + it("maps invalid publish headers to tagged PubSubError", async () => { natsMock.headerAppend.mockImplementation(() => { throw "invalid header"; diff --git a/ts/packages/base/src/backend/nats.ts b/ts/packages/base/src/backend/nats.ts index fab55047..857928fc 100644 --- a/ts/packages/base/src/backend/nats.ts +++ b/ts/packages/base/src/backend/nats.ts @@ -10,6 +10,7 @@ import { connect, + ErrorCode, type NatsConnection, type JetStreamClient, type JetStreamManager, @@ -17,6 +18,7 @@ import { headers, type JsMsg, type JetStreamPublishOptions, + NatsError, StringCodec, AckPolicy, DeliverPolicy, @@ -64,6 +66,18 @@ function makeNatsMessage(msg: JsMsg, decoded: T): NatsMessage { const hasJsMsg = Predicate.hasProperty("_jsMsg"); +class NatsLookupError extends S.TaggedErrorClass()( + "NatsLookupError", + { + cause: S.Unknown, + operation: S.String, + }, +) {} + +function natsLookupError(operation: string, cause: unknown): NatsLookupError { + return NatsLookupError.make({ cause, operation }); +} + function isAckableJsMsg(value: unknown): value is Pick { if (!Predicate.isObject(value)) return false; if (!Predicate.hasProperty(value, "ack")) return false; @@ -75,6 +89,22 @@ function isNatsMessage(message: Message): message is NatsMessage { return hasJsMsg(message) && isAckableJsMsg(message._jsMsg); } +function isJetStreamMissingResource(error: unknown): boolean { + if (!(error instanceof NatsError)) { + return false; + } + if (error.code === ErrorCode.JetStream404NoMessages) { + return true; + } + + const jsError = error.jsError(); + return jsError?.code === 404; +} + +function isMissingLookupError(error: NatsLookupError): boolean { + return isJetStreamMissingResource(error.cause); +} + function makeNatsProducer( js: JetStreamClient, subject: string, @@ -148,31 +178,34 @@ function makeNatsConsumer( Effect.gen(function* () { const existing = yield* Effect.tryPromise({ try: () => js.consumers.get(streamName, subscription), - catch: (error) => pubSubError(`get-consumer:${streamName}:${subscription}`, error), + catch: (error) => natsLookupError(`get-consumer:${streamName}:${subscription}`, error), }).pipe( - Effect.catch(() => - Effect.gen(function* () { - const deliverPolicy = - initialPosition === "earliest" - ? DeliverPolicy.All - : DeliverPolicy.New; + Effect.catchIf( + isMissingLookupError, + () => + Effect.gen(function* () { + const deliverPolicy = + initialPosition === "earliest" + ? DeliverPolicy.All + : DeliverPolicy.New; - yield* Effect.tryPromise({ - try: () => - jsm.consumers.add(streamName, { - durable_name: subscription, - ack_policy: AckPolicy.Explicit, - deliver_policy: deliverPolicy, - filter_subject: subject, - }), - catch: (error) => pubSubError(`add-consumer:${streamName}:${subscription}`, error), - }); + yield* Effect.tryPromise({ + try: () => + jsm.consumers.add(streamName, { + durable_name: subscription, + ack_policy: AckPolicy.Explicit, + deliver_policy: deliverPolicy, + filter_subject: subject, + }), + catch: (error) => pubSubError(`add-consumer:${streamName}:${subscription}`, error), + }); - return yield* Effect.tryPromise({ - try: () => js.consumers.get(streamName, subscription), - catch: (error) => pubSubError(`get-consumer:${streamName}:${subscription}`, error), - }); - }), + return yield* Effect.tryPromise({ + try: () => js.consumers.get(streamName, subscription), + catch: (error) => pubSubError(`get-consumer:${streamName}:${subscription}`, error), + }); + }), + (error) => Effect.fail(pubSubError(error.operation, error.cause)), ), ); consumer = existing; @@ -289,17 +322,20 @@ export function makeNatsBackend(url = "nats://localhost:4222"): PubSubBackend { yield* Effect.tryPromise({ try: () => manager.streams.info(streamName), - catch: (error) => pubSubError(`stream-info:${streamName}`, error), + catch: (error) => natsLookupError(`stream-info:${streamName}`, error), }).pipe( - Effect.catch(() => - Effect.tryPromise({ - try: () => - manager.streams.add({ - name: streamName, - subjects: [wildcardSubject], - }), - catch: (error) => pubSubError(`stream-add:${streamName}`, error), - }), + Effect.catchIf( + isMissingLookupError, + () => + Effect.tryPromise({ + try: () => + manager.streams.add({ + name: streamName, + subjects: [wildcardSubject], + }), + catch: (error) => pubSubError(`stream-add:${streamName}`, error), + }), + (error) => Effect.fail(pubSubError(error.operation, error.cause)), ), ); initializedStreams.add(streamName);