Create NATS resources only on missing lookups

This commit is contained in:
elpresidank 2026-06-02 05:50:34 -05:00
parent 00a26b7deb
commit 46ae1dca82
3 changed files with 190 additions and 41 deletions

View file

@ -10,6 +10,7 @@
import {
connect,
ErrorCode,
type NatsConnection,
type JetStreamClient,
type JetStreamManager,
@ -17,6 +18,7 @@ import {
headers,
type JsMsg,
type JetStreamPublishOptions,
NatsError,
StringCodec,
AckPolicy,
DeliverPolicy,
@ -64,6 +66,18 @@ function makeNatsMessage<T>(msg: JsMsg, decoded: T): NatsMessage<T> {
const hasJsMsg = Predicate.hasProperty("_jsMsg");
class NatsLookupError extends S.TaggedErrorClass<NatsLookupError>()(
"NatsLookupError",
{
cause: S.Unknown,
operation: S.String,
},
) {}
function natsLookupError(operation: string, cause: unknown): NatsLookupError {
return NatsLookupError.make({ cause, operation });
}
function isAckableJsMsg(value: unknown): value is Pick<JsMsg, "ack" | "nak"> {
if (!Predicate.isObject(value)) return false;
if (!Predicate.hasProperty(value, "ack")) return false;
@ -75,6 +89,22 @@ function isNatsMessage<T>(message: Message<T>): message is NatsMessage<T> {
return hasJsMsg(message) && isAckableJsMsg(message._jsMsg);
}
function isJetStreamMissingResource(error: unknown): boolean {
if (!(error instanceof NatsError)) {
return false;
}
if (error.code === ErrorCode.JetStream404NoMessages) {
return true;
}
const jsError = error.jsError();
return jsError?.code === 404;
}
function isMissingLookupError(error: NatsLookupError): boolean {
return isJetStreamMissingResource(error.cause);
}
function makeNatsProducer<T>(
js: JetStreamClient,
subject: string,
@ -148,31 +178,34 @@ function makeNatsConsumer<T>(
Effect.gen(function* () {
const existing = yield* Effect.tryPromise({
try: () => js.consumers.get(streamName, subscription),
catch: (error) => pubSubError(`get-consumer:${streamName}:${subscription}`, error),
catch: (error) => natsLookupError(`get-consumer:${streamName}:${subscription}`, error),
}).pipe(
Effect.catch(() =>
Effect.gen(function* () {
const deliverPolicy =
initialPosition === "earliest"
? DeliverPolicy.All
: DeliverPolicy.New;
Effect.catchIf(
isMissingLookupError,
() =>
Effect.gen(function* () {
const deliverPolicy =
initialPosition === "earliest"
? DeliverPolicy.All
: DeliverPolicy.New;
yield* Effect.tryPromise({
try: () =>
jsm.consumers.add(streamName, {
durable_name: subscription,
ack_policy: AckPolicy.Explicit,
deliver_policy: deliverPolicy,
filter_subject: subject,
}),
catch: (error) => pubSubError(`add-consumer:${streamName}:${subscription}`, error),
});
yield* Effect.tryPromise({
try: () =>
jsm.consumers.add(streamName, {
durable_name: subscription,
ack_policy: AckPolicy.Explicit,
deliver_policy: deliverPolicy,
filter_subject: subject,
}),
catch: (error) => pubSubError(`add-consumer:${streamName}:${subscription}`, error),
});
return yield* Effect.tryPromise({
try: () => js.consumers.get(streamName, subscription),
catch: (error) => pubSubError(`get-consumer:${streamName}:${subscription}`, error),
});
}),
return yield* Effect.tryPromise({
try: () => js.consumers.get(streamName, subscription),
catch: (error) => pubSubError(`get-consumer:${streamName}:${subscription}`, error),
});
}),
(error) => Effect.fail(pubSubError(error.operation, error.cause)),
),
);
consumer = existing;
@ -289,17 +322,20 @@ export function makeNatsBackend(url = "nats://localhost:4222"): PubSubBackend {
yield* Effect.tryPromise({
try: () => manager.streams.info(streamName),
catch: (error) => pubSubError(`stream-info:${streamName}`, error),
catch: (error) => natsLookupError(`stream-info:${streamName}`, error),
}).pipe(
Effect.catch(() =>
Effect.tryPromise({
try: () =>
manager.streams.add({
name: streamName,
subjects: [wildcardSubject],
}),
catch: (error) => pubSubError(`stream-add:${streamName}`, error),
}),
Effect.catchIf(
isMissingLookupError,
() =>
Effect.tryPromise({
try: () =>
manager.streams.add({
name: streamName,
subjects: [wildcardSubject],
}),
catch: (error) => pubSubError(`stream-add:${streamName}`, error),
}),
(error) => Effect.fail(pubSubError(error.operation, error.cause)),
),
);
initializedStreams.add(streamName);