mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-07-01 09:29:38 +02:00
Use MutableHashSet for NATS stream cache
This commit is contained in:
parent
6ba4cf3b32
commit
451c6dbc58
3 changed files with 40 additions and 3 deletions
|
|
@ -1963,6 +1963,26 @@ Notes:
|
||||||
- `cd ts && bun run lint`
|
- `cd ts && bun run lint`
|
||||||
- `git diff --check`
|
- `git diff --check`
|
||||||
|
|
||||||
|
### 2026-06-04: NATS MutableHashSet Stream Cache Slice
|
||||||
|
|
||||||
|
- Status: migrated and package-verified.
|
||||||
|
- Completed:
|
||||||
|
- `ts/packages/base/src/backend/nats.ts` now tracks initialized stream names
|
||||||
|
in `MutableHashSet<string>` instead of a native `Set`.
|
||||||
|
- `ensureStream` uses `MutableHashSet.has` and `MutableHashSet.add` while
|
||||||
|
preserving the NATS broker adapter boundary and the selective 404
|
||||||
|
create-on-missing behavior.
|
||||||
|
- `ts/packages/base/src/__tests__/nats-backend.test.ts` now proves repeated
|
||||||
|
producer/consumer creation for subjects in the same stream only performs
|
||||||
|
one stream lookup.
|
||||||
|
- Verification:
|
||||||
|
- `cd ts/packages/base && bunx --bun vitest run src/__tests__/nats-backend.test.ts`
|
||||||
|
- `cd ts && bun run check:tsgo`
|
||||||
|
- `cd ts && bun run build`
|
||||||
|
- `cd ts && bun run test`
|
||||||
|
- `cd ts && bun run lint`
|
||||||
|
- `git diff --check`
|
||||||
|
|
||||||
## Subagent Findings To Preserve
|
## Subagent Findings To Preserve
|
||||||
|
|
||||||
- MCP/workbench:
|
- MCP/workbench:
|
||||||
|
|
@ -2007,6 +2027,9 @@ Notes:
|
||||||
- NATS header construction, ack/nak operations, and lookup create-on-missing
|
- NATS header construction, ack/nak operations, and lookup create-on-missing
|
||||||
behavior now stay typed. `PubSubBackend` remains an intentional broker
|
behavior now stay typed. `PubSubBackend` remains an intentional broker
|
||||||
adapter contract rather than a direct `effect/PubSub` replacement target.
|
adapter contract rather than a direct `effect/PubSub` replacement target.
|
||||||
|
- NATS initialized stream cache now uses Effect `MutableHashSet`; keep the
|
||||||
|
rest of the NATS SDK connection/JetStream manager state as an external
|
||||||
|
broker adapter boundary unless moving the whole backend lifecycle.
|
||||||
- Consumer rate-limit retry timeout behavior is now wired in both legacy and
|
- Consumer rate-limit retry timeout behavior is now wired in both legacy and
|
||||||
Effect-native consumer paths. Effect-native consumer concurrency now owns
|
Effect-native consumer paths. Effect-native consumer concurrency now owns
|
||||||
one backend consumer per worker, and request-response pending shutdown now
|
one backend consumer per worker, and request-response pending shutdown now
|
||||||
|
|
|
||||||
|
|
@ -128,6 +128,19 @@ describe("NATS backend", () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("caches initialized streams through the Effect mutable set", async () => {
|
||||||
|
const backend = makeNatsBackend("nats://test");
|
||||||
|
|
||||||
|
await backend.createProducer<string>({ topic: "tg.test.topic" });
|
||||||
|
await backend.createConsumer<string>({
|
||||||
|
topic: "tg.test.other",
|
||||||
|
subscription: "worker",
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(natsMock.streamsInfo).toHaveBeenCalledTimes(1);
|
||||||
|
expect(natsMock.streamsInfo).toHaveBeenCalledWith("tg_test");
|
||||||
|
});
|
||||||
|
|
||||||
it("does not create streams for non-missing lookup failures", async () => {
|
it("does not create streams for non-missing lookup failures", async () => {
|
||||||
natsMock.streamsInfo.mockRejectedValueOnce(makeNatsError("PERMISSIONS_VIOLATION"));
|
natsMock.streamsInfo.mockRejectedValueOnce(makeNatsError("PERMISSIONS_VIOLATION"));
|
||||||
const backend = makeNatsBackend("nats://test");
|
const backend = makeNatsBackend("nats://test");
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import {
|
||||||
DeliverPolicy,
|
DeliverPolicy,
|
||||||
} from "nats";
|
} from "nats";
|
||||||
import { Effect } from "effect";
|
import { Effect } from "effect";
|
||||||
|
import * as MutableHashSet from "effect/MutableHashSet";
|
||||||
import * as P from "effect/Predicate";
|
import * as P from "effect/Predicate";
|
||||||
import * as S from "effect/Schema";
|
import * as S from "effect/Schema";
|
||||||
|
|
||||||
|
|
@ -288,7 +289,7 @@ export function makeNatsBackend(url = "nats://localhost:4222"): PubSubBackend {
|
||||||
let connection: NatsConnection | null = null;
|
let connection: NatsConnection | null = null;
|
||||||
let js: JetStreamClient | null = null;
|
let js: JetStreamClient | null = null;
|
||||||
let jsm: JetStreamManager | null = null;
|
let jsm: JetStreamManager | null = null;
|
||||||
const initializedStreams = new Set<string>();
|
const initializedStreams = MutableHashSet.empty<string>();
|
||||||
|
|
||||||
const ensureConnected = Effect.fn("NatsBackend.ensureConnected")(function* () {
|
const ensureConnected = Effect.fn("NatsBackend.ensureConnected")(function* () {
|
||||||
if (connection === null) {
|
if (connection === null) {
|
||||||
|
|
@ -313,7 +314,7 @@ export function makeNatsBackend(url = "nats://localhost:4222"): PubSubBackend {
|
||||||
const parts = subject.split(".");
|
const parts = subject.split(".");
|
||||||
const streamName = parts.slice(0, 2).join("_");
|
const streamName = parts.slice(0, 2).join("_");
|
||||||
|
|
||||||
if (initializedStreams.has(streamName)) return streamName;
|
if (MutableHashSet.has(initializedStreams, streamName)) return streamName;
|
||||||
|
|
||||||
const wildcardSubject = `${parts.slice(0, 2).join(".")}.>`;
|
const wildcardSubject = `${parts.slice(0, 2).join(".")}.>`;
|
||||||
|
|
||||||
|
|
@ -338,7 +339,7 @@ export function makeNatsBackend(url = "nats://localhost:4222"): PubSubBackend {
|
||||||
(error) => Effect.fail(pubSubError(error.operation, error.cause)),
|
(error) => Effect.fail(pubSubError(error.operation, error.cause)),
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
initializedStreams.add(streamName);
|
MutableHashSet.add(initializedStreams, streamName);
|
||||||
return streamName;
|
return streamName;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue