trustgraph/ts/packages/base/src/spec/request-response-spec.ts

93 lines
3 KiB
TypeScript
Raw Normal View History

/**
* Request/response specification declares a request/response client for a flow.
*
* Enables FlowProcessor handlers to make request/response calls to other services
* (e.g., calling the prompt service or LLM from within a knowledge extraction handler).
*
* Python reference: trustgraph-base/trustgraph/base/prompt_client_spec.py
*/
2026-05-12 08:06:58 -05:00
import { Effect } from "effect";
import type { Spec } from "./types.js";
import type { Flow, FlowDefinition } from "../processor/flow.js";
2026-05-12 08:06:58 -05:00
import {
2026-06-02 03:23:23 -05:00
flowResourceNotFoundError,
type FlowResourceNotFoundError,
} from "../errors.js";
import {
type EffectRequestResponse,
2026-05-12 08:06:58 -05:00
RequestResponseFactory,
} from "../messaging/runtime.js";
2026-06-01 20:26:47 -05:00
/**
 * Type-level brand key for {@link RequestResponseSpec}.
 *
 * Declared with `declare const`, so no runtime value is emitted; it exists
 * only so the request/response type parameters appear in the interface's
 * structure.
 */
declare const RequestResponseSpecType: unique symbol;

/**
 * A {@link Spec} that declares a request/response client for a flow.
 *
 * @typeParam TReq - Request payload type sent by the client.
 * @typeParam TRes - Response payload type received by the client.
 */
export interface RequestResponseSpec<TReq, TRes> extends Spec {
  /**
   * Phantom marker carrying `TReq` / `TRes`. Optional, so implementations
   * are not required to populate it.
   */
  readonly [RequestResponseSpecType]?: {
    readonly request: TReq;
    readonly response: TRes;
  };

  /**
   * Resolves the request/response client associated with `flow` for this spec.
   *
   * @param flow - The flow whose client should be returned.
   * @returns An effect that succeeds with the client, or fails with
   *   `FlowResourceNotFoundError` when none is available for `flow`.
   */
  readonly requestorEffect: <Requirements = never>(
    flow: Flow<Requirements>,
  ) => Effect.Effect<EffectRequestResponse<TReq, TRes>, FlowResourceNotFoundError>;
}
2026-06-01 20:26:47 -05:00
/**
 * Builds a {@link RequestResponseSpec} that, when added to a flow, creates a
 * request/response client via `RequestResponseFactory` and makes it available
 * both through `flow.registerRequestor` and through `requestorEffect`.
 *
 * @param name - Name under which the client is registered on the flow; also
 *   used as the last segment of the subscription name.
 * @param requestTopicName - Logical request topic name. Resolved through
 *   `definition.topics` when present, otherwise used verbatim.
 * @param responseTopicName - Logical response topic name, resolved the same way.
 * @returns The spec object (`name`, `requestorEffect`, `addEffect`, `add`).
 */
export function makeRequestResponseSpec<TReq, TRes>(
  name: string,
  requestTopicName: string,
  responseTopicName: string,
): RequestResponseSpec<TReq, TRes> {
  type Requestor = EffectRequestResponse<TReq, TRes>;
  type RequestorLookup = Effect.Effect<Requestor, FlowResourceNotFoundError>;

  // Per-flow clients created by this spec. Keys are held weakly, so an entry
  // does not by itself keep a flow object alive.
  const requestors = new WeakMap<object, Requestor>();

  /** Records `requestor` as the current client for `flow`. */
  const registerRequestor = <Requirements>(
    flow: Flow<Requirements>,
    requestor: Requestor,
  ) =>
    Effect.sync(() => {
      requestors.set(flow, requestor);
    });

  /**
   * Removes the entry for `flow`, but only if it still points at `requestor`,
   * so a finalizer for an older client cannot evict a newer registration.
   */
  const unregisterRequestor = <Requirements>(
    flow: Flow<Requirements>,
    requestor: Requestor,
  ) =>
    Effect.sync(() => {
      if (requestors.get(flow) === requestor) {
        requestors.delete(flow);
      }
    });

  /**
   * Looks up the client registered for `flow`.
   *
   * The lookup is suspended so it happens when the effect is run, not when it
   * is constructed; an effect value built before registration (or kept after
   * the finalizer ran) therefore reflects the current state.
   */
  const requestorEffect = <Requirements>(flow: Flow<Requirements>): RequestorLookup =>
    Effect.suspend((): RequestorLookup => {
      const requestor = requestors.get(flow);
      return requestor === undefined
        ? Effect.fail(flowResourceNotFoundError(flow.name, "requestor", name))
        : Effect.succeed(requestor);
    });

  /**
   * Creates the client for `flow` and registers it. Requires
   * `RequestResponseFactory` and a scope (a finalizer is installed).
   */
  const addEffect = Effect.fn("RequestResponseSpec.addEffect")(function* (
    flow: Flow,
    definition: FlowDefinition,
  ) {
    // Fall back to the logical name when the definition has no mapping for it.
    const requestTopic = definition.topics?.[requestTopicName] ?? requestTopicName;
    const responseTopic = definition.topics?.[responseTopicName] ?? responseTopicName;

    const factory = yield* RequestResponseFactory;
    const requestor = yield* factory.make<TReq, TRes>({
      requestTopic,
      responseTopic,
      subscription: `${flow.processorId}-${flow.name}-${name}`,
    });

    flow.registerRequestor(name, requestor);

    // Register and install the matching cleanup as one step, so an
    // interruption cannot land between the two and leave a stale entry.
    yield* Effect.acquireRelease(registerRequestor(flow, requestor), () =>
      unregisterRequestor(flow, requestor),
    );
  });

  return {
    name,
    requestorEffect,
    addEffect,
    add: (flow, pubsub, definition, context) =>
      flow.runInCompatibilityScope(addEffect(flow, definition), pubsub, context),
  };
}