This commit is contained in:
elpresidank 2026-04-05 22:44:45 -05:00
parent c386f68743
commit b6536eca38
100 changed files with 17680 additions and 377 deletions

View file

@ -13,10 +13,11 @@ import {
type NatsConnection,
type JetStreamClient,
type JetStreamManager,
type ConsumerMessages,
type Consumer as NatsJsConsumer,
type JsMsg,
StringCodec,
AckPolicy,
DeliverPolicy,
} from "nats";
import type {
@ -31,17 +32,22 @@ import type {
const sc = StringCodec();
class NatsMessage<T> implements Message<T> {
/** Exposed so acknowledge/negativeAcknowledge can access the raw JsMsg */
readonly _jsMsg: JsMsg;
constructor(
private readonly msg: JsMsg,
msg: JsMsg,
private readonly decoded: T,
) {}
) {
this._jsMsg = msg;
}
value(): T {
return this.decoded;
}
properties(): Record<string, string> {
const headers = this.msg.headers;
const headers = this._jsMsg.headers;
const props: Record<string, string> = {};
if (headers) {
for (const [key, values] of headers) {
@ -84,7 +90,7 @@ class NatsProducer<T> implements BackendProducer<T> {
}
class NatsConsumer<T> implements BackendConsumer<T> {
private messages: ConsumerMessages | null = null;
private consumer: NatsJsConsumer | null = null;
constructor(
private readonly js: JetStreamClient,
@ -106,43 +112,57 @@ class NatsConsumer<T> implements BackendConsumer<T> {
});
}
// Create or bind to durable consumer
const consumer = await this.js.consumers.get(streamName, this.subscription);
this.messages = await consumer.consume();
// Create or bind to durable consumer.
// Try to get an existing durable consumer first; if it doesn't exist, create it.
try {
this.consumer = await this.js.consumers.get(streamName, this.subscription);
} catch {
const deliverPolicy =
this.initialPosition === "earliest"
? DeliverPolicy.All
: DeliverPolicy.New;
await this.jsm.consumers.add(streamName, {
durable_name: this.subscription,
ack_policy: AckPolicy.Explicit,
deliver_policy: deliverPolicy,
filter_subject: this.subject,
});
this.consumer = await this.js.consumers.get(streamName, this.subscription);
}
}
async receive(timeoutMs = 2000): Promise<Message<T> | null> {
if (!this.messages) throw new Error("Consumer not initialized");
if (!this.consumer) throw new Error("Consumer not initialized");
const deadline = Date.now() + timeoutMs;
for await (const msg of this.messages) {
const decoded = JSON.parse(sc.decode(msg.data)) as T;
return new NatsMessage(msg, decoded);
}
// Pull a single message with a timeout using the pull-based API.
// consumer.next() returns a JsMsg or null when the timeout expires.
const msg = await this.consumer.next({ expires: timeoutMs });
if (!msg) return null;
if (Date.now() >= deadline) return null;
return null;
const decoded = JSON.parse(sc.decode(msg.data)) as T;
return new NatsMessage(msg, decoded);
}
async acknowledge(message: Message<T>): Promise<void> {
const natsMsg = message as NatsMessage<T>;
// Access internal JsMsg for ack — in practice we'd store the ref
// This is a simplified version; real impl tracks msg refs
void natsMsg;
natsMsg._jsMsg.ack();
}
async negativeAcknowledge(message: Message<T>): Promise<void> {
void message;
const natsMsg = message as NatsMessage<T>;
natsMsg._jsMsg.nak();
}
async unsubscribe(): Promise<void> {
// Drain and close consumer
// The pull-based consumer does not have a persistent subscription to drain.
// Clearing the reference is sufficient; the durable consumer persists server-side.
this.consumer = null;
}
async close(): Promise<void> {
if (this.messages) {
this.messages.stop();
}
this.consumer = null;
}
private streamNameFromSubject(subject: string): string {