diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index cbb6ebb8..9bd8feb6 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -1963,6 +1963,26 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-04: NATS MutableHashSet Stream Cache Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/base/src/backend/nats.ts` now tracks initialized stream names + in `MutableHashSet` instead of a native `Set`. + - `ensureStream` uses `MutableHashSet.has` and `MutableHashSet.add` while + preserving the NATS broker adapter boundary and the selective 404 + create-on-missing behavior. + - `ts/packages/base/src/__tests__/nats-backend.test.ts` now proves repeated + producer/consumer creation for subjects in the same stream only performs + one stream lookup. +- Verification: + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/nats-backend.test.ts` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -2007,6 +2027,9 @@ Notes: - NATS header construction, ack/nak operations, and lookup create-on-missing behavior now stay typed. `PubSubBackend` remains an intentional broker adapter contract rather than a direct `effect/PubSub` replacement target. + - NATS initialized stream cache now uses Effect `MutableHashSet`; keep the + rest of the NATS SDK connection/JetStream manager state as an external + broker adapter boundary unless moving the whole backend lifecycle. - Consumer rate-limit retry timeout behavior is now wired in both legacy and Effect-native consumer paths. Effect-native consumer concurrency now owns one backend consumer per worker, and request-response pending shutdown now diff --git a/ts/packages/base/src/__tests__/nats-backend.test.ts b/ts/packages/base/src/__tests__/nats-backend.test.ts index 66e3fb29..75c0a007 100644 --- a/ts/packages/base/src/__tests__/nats-backend.test.ts +++ b/ts/packages/base/src/__tests__/nats-backend.test.ts @@ -128,6 +128,19 @@ describe("NATS backend", () => { }); }); + it("caches initialized streams through the Effect mutable set", async () => { + const backend = makeNatsBackend("nats://test"); + + await backend.createProducer({ topic: "tg.test.topic" }); + await backend.createConsumer({ + topic: "tg.test.other", + subscription: "worker", + }); + + expect(natsMock.streamsInfo).toHaveBeenCalledTimes(1); + expect(natsMock.streamsInfo).toHaveBeenCalledWith("tg_test"); + }); + it("does not create streams for non-missing lookup failures", async () => { natsMock.streamsInfo.mockRejectedValueOnce(makeNatsError("PERMISSIONS_VIOLATION")); const backend = makeNatsBackend("nats://test"); diff --git a/ts/packages/base/src/backend/nats.ts b/ts/packages/base/src/backend/nats.ts index ffc8dee6..efebd177 100644 --- a/ts/packages/base/src/backend/nats.ts +++ b/ts/packages/base/src/backend/nats.ts @@ -24,6 +24,7 @@ import { DeliverPolicy, } from "nats"; import { Effect } from "effect"; +import * as MutableHashSet from "effect/MutableHashSet"; import * as P from "effect/Predicate"; import * as S from "effect/Schema"; @@ -288,7 +289,7 @@ export function makeNatsBackend(url = "nats://localhost:4222"): PubSubBackend { let connection: NatsConnection | null = null; let js: JetStreamClient | null = null; let jsm: JetStreamManager | null = null; - const initializedStreams = new Set(); + const initializedStreams = MutableHashSet.empty(); const ensureConnected = Effect.fn("NatsBackend.ensureConnected")(function* () { if (connection === null) { @@ -313,7 +314,7 @@ export function makeNatsBackend(url = "nats://localhost:4222"): PubSubBackend { const parts = subject.split("."); const streamName = parts.slice(0, 2).join("_"); - if (initializedStreams.has(streamName)) return streamName; + if (MutableHashSet.has(initializedStreams, streamName)) return streamName; const wildcardSubject = `${parts.slice(0, 2).join(".")}.>`; @@ -338,7 +339,7 @@ export function makeNatsBackend(url = "nats://localhost:4222"): PubSubBackend { (error) => Effect.fail(pubSubError(error.operation, error.cause)), ), ); - initializedStreams.add(streamName); + MutableHashSet.add(initializedStreams, streamName); return streamName; });