Migrate request-response facade to Effect runtime

This commit is contained in:
elpresidank 2026-06-01 22:11:03 -05:00
parent 7f81c56c80
commit a0d2575273
5 changed files with 306 additions and 47 deletions

View file

@ -0,0 +1,182 @@
import { describe, expect, it } from "vitest";
import {
makeRequestResponse,
type BackendConsumer,
type BackendProducer,
type CreateConsumerOptions,
type CreateProducerOptions,
type Message,
type PubSubBackend,
} from "../index.js";
function createMessage<T>(value: T, properties: Record<string, string> = {}): Message<T> {
return {
value: () => value,
properties: () => properties,
};
}
class RecordingProducer<T> implements BackendProducer<T> {
readonly sent: Array<{ readonly message: T; readonly properties?: Record<string, string> }> = [];
closeCount = 0;
flushCount = 0;
constructor(private readonly onSend?: (message: T, properties?: Record<string, string>) => void) {}
async send(message: T, properties?: Record<string, string>): Promise<void> {
this.sent.push(properties === undefined ? { message } : { message, properties });
this.onSend?.(message, properties);
}
async flush(): Promise<void> {
this.flushCount += 1;
}
async close(): Promise<void> {
this.closeCount += 1;
}
}
class WaitingConsumer<T> implements BackendConsumer<T> {
readonly acknowledged: Array<Message<T>> = [];
readonly nacked: Array<Message<T>> = [];
closeCount = 0;
private readonly messages: Array<Message<T>> = [];
private readonly waiters: Array<(message: Message<T> | null) => void> = [];
private closed = false;
push(message: Message<T>): void {
const waiter = this.waiters.shift();
if (waiter !== undefined) {
waiter(message);
return;
}
this.messages.push(message);
}
async receive(): Promise<Message<T> | null> {
const message = this.messages.shift();
if (message !== undefined || this.closed) return message ?? null;
return await new Promise((resolve) => {
this.waiters.push(resolve);
});
}
async acknowledge(message: Message<T>): Promise<void> {
this.acknowledged.push(message);
}
async negativeAcknowledge(message: Message<T>): Promise<void> {
this.nacked.push(message);
}
async unsubscribe(): Promise<void> {}
async close(): Promise<void> {
this.closed = true;
for (const waiter of this.waiters.splice(0)) {
waiter(null);
}
this.closeCount += 1;
}
}
class RuntimeBackend implements PubSubBackend {
closeCount = 0;
producerOptions: CreateProducerOptions | null = null;
consumerOptions: CreateConsumerOptions | null = null;
readonly producer: RecordingProducer<unknown>;
constructor(
private readonly consumer: BackendConsumer<unknown>,
onSend?: (message: unknown, properties?: Record<string, string>) => void,
) {
this.producer = new RecordingProducer<unknown>(onSend);
}
async createProducer<T>(options: CreateProducerOptions): Promise<BackendProducer<T>> {
this.producerOptions = options;
return this.producer as BackendProducer<T>;
}
async createConsumer<T>(options: CreateConsumerOptions): Promise<BackendConsumer<T>> {
this.consumerOptions = options;
return this.consumer as BackendConsumer<T>;
}
async close(): Promise<void> {
this.closeCount += 1;
}
}
describe("RequestResponse compatibility facade", () => {
it("routes requests through the Effect-native request-response runtime", async () => {
const consumer = new WaitingConsumer<string>();
const backend = new RuntimeBackend(
consumer as BackendConsumer<unknown>,
(_message, properties) => {
consumer.push(createMessage("response", { id: properties?.id ?? "" }));
},
);
const requestor = makeRequestResponse<string, string>({
pubsub: backend,
requestTopic: "request-topic",
responseTopic: "response-topic",
subscription: "sub",
});
await requestor.start();
const response = await requestor.request("request", { timeoutMs: 250 });
await requestor.stop();
expect(response).toBe("response");
expect(backend.producerOptions).toEqual({ topic: "request-topic" });
expect(backend.consumerOptions).toEqual({ topic: "response-topic", subscription: "sub" });
expect(backend.producer.sent[0]?.message).toBe("request");
expect(consumer.acknowledged.length).toBe(1);
expect(backend.producer.closeCount).toBe(1);
expect(consumer.closeCount).toBe(1);
});
it("rejects with a tagged timeout error instead of a normal Error", async () => {
const consumer = new WaitingConsumer<string>();
const backend = new RuntimeBackend(consumer as BackendConsumer<unknown>);
const requestor = makeRequestResponse<string, string>({
pubsub: backend,
requestTopic: "request-topic",
responseTopic: "response-topic",
subscription: "sub",
});
await requestor.start();
const error = await requestor.request("request", { timeoutMs: 5 }).catch((caught: unknown) => caught);
await requestor.stop();
expect(error).toMatchObject({
_tag: "MessagingTimeoutError",
operation: "request-response",
timeoutMs: 5,
});
});
it("rejects with a tagged lifecycle error when requested before start", async () => {
const consumer = new WaitingConsumer<string>();
const backend = new RuntimeBackend(consumer as BackendConsumer<unknown>);
const requestor = makeRequestResponse<string, string>({
pubsub: backend,
requestTopic: "request-topic",
responseTopic: "response-topic",
subscription: "sub",
});
const error = await requestor.request("request").catch((caught: unknown) => caught);
expect(error).toMatchObject({
_tag: "MessagingLifecycleError",
operation: "request",
resource: "request-topic:response-topic",
});
});
});

View file

@ -7,10 +7,12 @@
* Python reference: trustgraph-base/trustgraph/base/request_response_spec.py
*/
import { randomUUID } from "node:crypto";
import { makeProducer, type Producer } from "./producer.js";
import { makeSubscriber, type Subscriber } from "./subscriber.js";
import { Effect, Exit, Scope } from "effect";
import type { PubSubBackend } from "../backend/types.js";
import { PubSub } from "../backend/pubsub.js";
import { messagingDeliveryError, messagingLifecycleError } from "../errors.js";
import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js";
import { makeEffectRequestResponseFromPubSub, type EffectRequestResponse } from "./runtime.js";
export interface RequestResponseOptions {
pubsub: PubSubBackend;
@ -31,24 +33,48 @@ export interface RequestResponse<TReq, TRes> {
) => Promise<TRes>;
}
interface RequestResponseRuntime<TReq, TRes> {
readonly scope: Scope.Closeable;
readonly requestor: EffectRequestResponse<TReq, TRes>;
}
export function makeRequestResponse<TReq, TRes>(
options: RequestResponseOptions,
): RequestResponse<TReq, TRes> {
const producer: Producer<TReq> = makeProducer<TReq>(options.pubsub, options.requestTopic);
const subscriber: Subscriber<TRes> = makeSubscriber<TRes>(
options.pubsub,
options.responseTopic,
options.subscription,
);
let runtime: RequestResponseRuntime<TReq, TRes> | null = null;
return {
start: async () => {
await producer.start();
await subscriber.start();
if (runtime !== null) return;
const scope = await Effect.runPromise(Scope.make());
try {
const config = await Effect.runPromise(loadMessagingRuntimeConfig());
const requestor = await Effect.runPromise(
makeEffectRequestResponseFromPubSub<TReq, TRes>(
PubSub.fromBackend(options.pubsub),
config,
{
requestTopic: options.requestTopic,
responseTopic: options.responseTopic,
subscription: options.subscription,
},
).pipe(Scope.provide(scope)),
);
runtime = { scope, requestor };
} catch (error) {
await Effect.runPromise(Scope.close(scope, Exit.fail(error))).catch(() => undefined);
throw error;
}
},
stop: async () => {
await producer.stop();
await subscriber.stop();
const current = runtime;
runtime = null;
if (current === null) return;
await Effect.runPromise(Scope.close(current.scope, Exit.void));
},
/**
* Send a request and wait for responses.
@ -60,35 +86,32 @@ export function makeRequestResponse<TReq, TRes>(
* If omitted, returns the first response.
*/
request: async (request, requestOptions) => {
const id = randomUUID();
const current = runtime;
if (current === null) {
throw messagingLifecycleError(
`${options.requestTopic}:${options.responseTopic}`,
"request",
"RequestResponse not started",
);
}
const timeoutMs = requestOptions?.timeoutMs ?? 300_000;
const recipient = requestOptions?.recipient;
const queue = subscriber.subscribe(id);
try {
await producer.send(id, request);
const deadline = Date.now() + timeoutMs;
while (true) {
const remaining = deadline - Date.now();
if (remaining <= 0) {
throw new Error(`Request timed out after ${timeoutMs}ms`);
}
const response = await queue.pop(remaining);
if (recipient !== undefined) {
const isFinal = await recipient(response);
if (isFinal) return response;
} else {
return response;
}
}
} finally {
subscriber.unsubscribe(id);
}
return await Effect.runPromise(
current.requestor.request(request, {
timeoutMs,
...(recipient === undefined
? {}
: {
recipient: (response) =>
Effect.tryPromise({
try: () => recipient(response),
catch: (error) => messagingDeliveryError(options.responseTopic, "recipient", error),
}),
}),
}),
);
},
};
}

View file

@ -424,7 +424,11 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR
config: MessagingRuntimeConfig,
options: EffectRequestResponseOptions,
) {
const producer = yield* makeEffectProducerFromPubSub<TReq>(pubsub, {
const producerOptions: CreateProducerOptions = options.requestSchema === undefined
? { topic: options.requestTopic }
: { topic: options.requestTopic, schema: options.requestSchema };
const producerBackend = yield* pubsub.createProducer<TReq>(producerOptions);
const producer = makeEffectProducerHandle<TReq>(producerBackend, {
topic: options.requestTopic,
...(options.requestSchema === undefined ? {} : { schema: options.requestSchema }),
});
@ -435,9 +439,12 @@ export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestR
};
const backend = yield* pubsub.createConsumer<TRes>(createOptions);
const subscribers = new Map<string, Queue.Queue<TRes>>();
const fiber = yield* dispatchResponseLoop(backend, options.responseTopic, subscribers, config).pipe(Effect.forkChild);
const fiber = yield* dispatchResponseLoop(backend, options.responseTopic, subscribers, config).pipe(Effect.forkScoped);
let stopped = false;
const stop = Effect.fn(`RequestResponse.stop:${options.requestTopic}`)(function* () {
if (stopped) return;
stopped = true;
yield* Fiber.interrupt(fiber);
yield* producer.close;
yield* closeConsumerBackend(backend, options.responseTopic, options.subscription);