Add typed flow spec accessors

This commit is contained in:
elpresidank 2026-06-02 03:23:23 -05:00
parent abb6f3aed0
commit 44110c5bb4
19 changed files with 457 additions and 223 deletions

View file

@ -153,12 +153,14 @@ describe("Effect-native flow specifications", () => {
"starts producer specs through Effect factories and exposes typed accessors",
Effect.fnUntraced(function* () {
const backend = new RuntimeBackend(new ScriptedConsumer<unknown>());
const outputProducerSpec = makeProducerSpec<string>("output");
const duplicateOutputProducerSpec = makeProducerSpec<string>("output");
const flow = new Flow(
"default",
"processor",
backend,
{ topics: { output: "actual-output" } },
[makeProducerSpec<string>("output")],
[outputProducerSpec],
);
yield* Effect.scoped(
@ -166,17 +168,21 @@ describe("Effect-native flow specifications", () => {
backend,
Effect.gen(function* () {
yield* flow.startEffect();
const producer = yield* flow.producerEffect<string>("output");
const producer = yield* flow.producerEffect(outputProducerSpec);
const duplicateSpecError = yield* flow.producerEffect(duplicateOutputProducerSpec).pipe(Effect.flip);
expect(duplicateSpecError._tag).toBe("FlowResourceNotFoundError");
yield* producer.send("request-1", "hello");
}),
),
);
const closedProducerError = yield* flow.producerEffect(outputProducerSpec).pipe(Effect.flip);
expect(backend.producerOptions).toEqual({ topic: "actual-output" });
expect(backend.producer.sent).toEqual([
{ message: "hello", properties: { id: "request-1" } },
]);
expect(backend.producer.closeCount).toBe(1);
expect(closedProducerError._tag).toBe("FlowResourceNotFoundError");
}),
);
@ -229,6 +235,8 @@ describe("Effect-native flow specifications", () => {
responseConsumer.push(createMessage("response", { id: properties?.id ?? "" }));
},
);
const requestResponseSpec = makeRequestResponseSpec<string, string>("rr", "request", "response");
const duplicateRequestResponseSpec = makeRequestResponseSpec<string, string>("rr", "request", "response");
const flow = new Flow(
"default",
"processor",
@ -239,7 +247,7 @@ describe("Effect-native flow specifications", () => {
response: "actual-response",
},
},
[makeRequestResponseSpec<string, string>("rr", "request", "response")],
[requestResponseSpec],
);
const response = yield* Effect.scoped(
@ -247,7 +255,9 @@ describe("Effect-native flow specifications", () => {
backend,
Effect.gen(function* () {
yield* flow.startEffect();
const requestor = flow.requestor<string, string>("rr");
const duplicateSpecError = yield* flow.requestorEffect(duplicateRequestResponseSpec).pipe(Effect.flip);
expect(duplicateSpecError._tag).toBe("FlowResourceNotFoundError");
const requestor = flow.requestor(requestResponseSpec);
const fiber = yield* Effect.promise(() =>
requestor.request("request", { timeoutMs: 250 }),
).pipe(Effect.forkChild);
@ -256,10 +266,12 @@ describe("Effect-native flow specifications", () => {
}),
),
);
const closedRequestorError = yield* flow.requestorEffect(requestResponseSpec).pipe(Effect.flip);
expect(response).toBe("response");
expect(backend.producerOptions).toEqual({ topic: "actual-request" });
expect(responseConsumer.acknowledged.length).toBe(1);
expect(closedRequestorError._tag).toBe("FlowResourceNotFoundError");
}),
);
@ -282,7 +294,7 @@ describe("Effect-native flow specifications", () => {
backend,
Effect.gen(function* () {
yield* flow.startEffect();
const producerError = yield* flow.producerEffect<string>("missing-producer").pipe(Effect.flip);
const producerError = yield* flow.producerEffect("missing-producer").pipe(Effect.flip);
const parameter = yield* flow.parameterEffect(presentParameter);
const legacyParameter = yield* flow.parameterEffect("present");
const parameterError = yield* flow.parameterEffect("missing-parameter").pipe(Effect.flip);

View file

@ -30,6 +30,8 @@ import {
} from "../messaging/runtime.js";
import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js";
import type { ParameterSpec } from "../spec/parameter-spec.js";
import type { ProducerSpec } from "../spec/producer-spec.js";
import type { RequestResponseSpec } from "../spec/request-response-spec.js";
import type { Spec, SpecRuntimeRequirements } from "../spec/types.js";
export interface FlowDefinition {
@ -131,6 +133,93 @@ export function makeFlow<Requirements = never>(
throw flowParameterDecodeError(name, spec.name, "Parameter value does not match schema");
};
const getProducerEffect = (
producerName: string,
): Effect.Effect<EffectProducer<never>, FlowResourceNotFoundError> => {
const producer = producers.get(producerName);
return producer === undefined
? Effect.fail(flowResourceNotFoundError(name, "producer", producerName))
: Effect.succeed(producer);
};
const getProducer = (producerName: string): EffectProducer<never> => {
const producer = producers.get(producerName);
if (producer === undefined) throw flowResourceNotFoundError(name, "producer", producerName);
return producer;
};
const getRequestorEffect = (
requestorName: string,
): Effect.Effect<EffectRequestResponse<never, unknown>, FlowResourceNotFoundError> => {
const requestor = requestors.get(requestorName);
return requestor === undefined
? Effect.fail(flowResourceNotFoundError(name, "requestor", requestorName))
: Effect.succeed(requestor);
};
const getRequestor = (
requestorName: string,
): EffectRequestResponse<never, unknown> => {
const requestor = requestors.get(requestorName);
if (requestor === undefined) throw flowResourceNotFoundError(name, "requestor", requestorName);
return requestor;
};
const toFlowProducer = <T>(producer: EffectProducer<T>): FlowProducer<T> => ({
send: (id, message) => compatibilityRuntime.runPromise(producer.send(id, message)),
flush: () => compatibilityRuntime.runPromise(producer.flush),
stop: () => compatibilityRuntime.runPromise(producer.flush.pipe(Effect.flatMap(() => producer.close))),
});
const toFlowRequestor = <TReq, TRes>(
requestor: EffectRequestResponse<TReq, TRes>,
): FlowRequestor<TReq, TRes> => ({
request: (request, options) =>
compatibilityRuntime.runPromise(
requestor.request(
request,
toEffectRequestOptions(options),
),
),
stop: () => compatibilityRuntime.runPromise(requestor.stop),
});
function producerEffect<T>(
producerSpec: ProducerSpec<T>,
): Effect.Effect<EffectProducer<T>, FlowResourceNotFoundError>;
function producerEffect(
producerName: string,
): Effect.Effect<EffectProducer<never>, FlowResourceNotFoundError>;
function producerEffect<T>(
producer: string | ProducerSpec<T>,
) {
if (typeof producer === "string") {
return getProducerEffect(producer);
}
if (!producers.has(producer.name)) {
return Effect.fail(flowResourceNotFoundError(name, "producer", producer.name));
}
return producer.producerEffect(flow);
}
function requestorEffect<TReq, TRes>(
requestorSpec: RequestResponseSpec<TReq, TRes>,
): Effect.Effect<EffectRequestResponse<TReq, TRes>, FlowResourceNotFoundError>;
function requestorEffect(
requestorName: string,
): Effect.Effect<EffectRequestResponse<never, unknown>, FlowResourceNotFoundError>;
function requestorEffect<TReq, TRes>(
requestor: string | RequestResponseSpec<TReq, TRes>,
) {
if (typeof requestor === "string") {
return getRequestorEffect(requestor);
}
if (!requestors.has(requestor.name)) {
return Effect.fail(flowResourceNotFoundError(name, "requestor", requestor.name));
}
return requestor.requestorEffect(flow);
}
function parameterEffect<T>(
parameterSpec: ParameterSpec<T>,
): Effect.Effect<T, FlowParameterError>;
@ -158,6 +247,34 @@ export function makeFlow<Requirements = never>(
return decodeParameter(parameter, value);
}
function producer<T>(producerSpec: ProducerSpec<T>): FlowProducer<T>;
function producer(producerName: string): FlowProducer<never>;
function producer<T>(producer: string | ProducerSpec<T>) {
if (typeof producer === "string") {
return toFlowProducer(getProducer(producer));
}
if (!producers.has(producer.name)) {
throw flowResourceNotFoundError(name, "producer", producer.name);
}
return toFlowProducer(compatibilityRuntime.runSync(producer.producerEffect(flow)));
}
function requestor<TReq, TRes>(
requestorSpec: RequestResponseSpec<TReq, TRes>,
): FlowRequestor<TReq, TRes>;
function requestor(requestorName: string): FlowRequestor<never, unknown>;
function requestor<TReq, TRes>(
requestor: string | RequestResponseSpec<TReq, TRes>,
) {
if (typeof requestor === "string") {
return toFlowRequestor(getRequestor(requestor));
}
if (!requestors.has(requestor.name)) {
throw flowResourceNotFoundError(name, "requestor", requestor.name);
}
return toFlowRequestor(compatibilityRuntime.runSync(requestor.requestorEffect(flow)));
}
const flow = {
name,
processorId,
@ -239,36 +356,16 @@ export function makeFlow<Requirements = never>(
setParameter(parameterName: string, value: unknown): void {
parameters.set(parameterName, value);
},
producerEffect<T>(producerName: string): Effect.Effect<EffectProducer<T>, FlowResourceNotFoundError> {
const p = producers.get(producerName);
return p === undefined
? Effect.fail(flowResourceNotFoundError(name, "producer", producerName))
: Effect.succeed(p as EffectProducer<T>);
},
producerEffect,
consumerEffect(consumerName: string): Effect.Effect<EffectConsumer, FlowResourceNotFoundError> {
const c = consumers.get(consumerName);
return c === undefined
? Effect.fail(flowResourceNotFoundError(name, "consumer", consumerName))
: Effect.succeed(c);
},
requestorEffect<TReq, TRes>(
requestorName: string,
): Effect.Effect<EffectRequestResponse<TReq, TRes>, FlowResourceNotFoundError> {
const rr = requestors.get(requestorName);
return rr === undefined
? Effect.fail(flowResourceNotFoundError(name, "requestor", requestorName))
: Effect.succeed(rr as EffectRequestResponse<TReq, TRes>);
},
requestorEffect,
parameterEffect,
producer<T>(producerName: string): FlowProducer<T> {
const p = producers.get(producerName);
if (p === undefined) throw flowResourceNotFoundError(name, "producer", producerName);
return {
send: (id, message) => compatibilityRuntime.runPromise((p as EffectProducer<T>).send(id, message)),
flush: () => compatibilityRuntime.runPromise(p.flush),
stop: () => compatibilityRuntime.runPromise(p.flush.pipe(Effect.flatMap(() => p.close))),
};
},
producer,
consumer(consumerName: string): FlowConsumer {
const c = consumers.get(consumerName);
if (c === undefined) throw flowResourceNotFoundError(name, "consumer", consumerName);
@ -276,20 +373,7 @@ export function makeFlow<Requirements = never>(
stop: () => compatibilityRuntime.runPromise(c.stop),
};
},
requestor<TReq, TRes>(requestorName: string): FlowRequestor<TReq, TRes> {
const rr = requestors.get(requestorName);
if (rr === undefined) throw flowResourceNotFoundError(name, "requestor", requestorName);
return {
request: (request, options) =>
compatibilityRuntime.runPromise(
(rr as EffectRequestResponse<TReq, TRes>).request(
request,
toEffectRequestOptions(options),
),
),
stop: () => compatibilityRuntime.runPromise(rr.stop),
};
},
requestor,
parameter,
};

View file

@ -31,6 +31,8 @@ export class Embeddings extends Context.Service<Embeddings, EmbeddingsServiceSha
"@trustgraph/base/services/embeddings-service/Embeddings",
) {}
const EmbeddingsResponseProducer = makeProducerSpec<EmbeddingsResponse>("embeddings-response");
const onEmbeddingsRequest = Effect.fn("EmbeddingsService.onRequest")(function* (
msg: EmbeddingsRequest,
properties: Record<string, string>,
@ -41,7 +43,7 @@ const onEmbeddingsRequest = Effect.fn("EmbeddingsService.onRequest")(function* (
return;
}
const responseProducer = yield* flowCtx.flow.producerEffect<EmbeddingsResponse>("embeddings-response");
const responseProducer = yield* flowCtx.flow.producerEffect(EmbeddingsResponseProducer);
const embeddings = yield* Embeddings;
const response = yield* embeddings.embed(msg.text, msg.model).pipe(
Effect.map((vectors) => ({ vectors }) satisfies EmbeddingsResponse),
@ -70,7 +72,7 @@ export const makeEmbeddingsSpecs = (): ReadonlyArray<Spec<Embeddings>> => [
"embeddings-request",
onEmbeddingsRequest,
),
makeProducerSpec<EmbeddingsResponse>("embeddings-response"),
EmbeddingsResponseProducer,
makeParameterSpec("model"),
];

View file

@ -124,6 +124,8 @@ const llmErrorResponse = (error: LlmServiceError): TextCompletionResponse => ({
endOfStream: true,
});
const TextCompletionResponseProducer = makeProducerSpec<TextCompletionResponse>("text-completion-response");
const sendStreamingResponse = Effect.fn("LlmService.sendStreamingResponse")(function* (
llm: LlmServiceShape,
requestId: string,
@ -158,9 +160,7 @@ const onLlmRequest = Effect.fn("LlmService.onRequest")(function* (
const requestId = properties.id;
if (requestId === undefined || requestId.length === 0) return;
const responseProducer = yield* flowCtx.flow.producerEffect<TextCompletionResponse>(
"text-completion-response",
);
const responseProducer = yield* flowCtx.flow.producerEffect(TextCompletionResponseProducer);
const llm = yield* Llm;
if (msg.streaming === true && llm.supportsStreaming()) {
@ -210,7 +210,7 @@ export const makeLlmSpecs = (): ReadonlyArray<Spec<Llm>> => [
"text-completion-request",
onLlmRequest,
),
makeProducerSpec<TextCompletionResponse>("text-completion-response"),
TextCompletionResponseProducer,
makeParameterSpec("model"),
makeParameterSpec("temperature"),
];

View file

@ -8,6 +8,11 @@ import { Effect } from "effect";
import type { Spec } from "./types.js";
import type { Flow, FlowDefinition } from "../processor/flow.js";
import {
flowResourceNotFoundError,
type FlowResourceNotFoundError,
} from "../errors.js";
import {
type EffectProducer,
ProducerFactory,
} from "../messaging/runtime.js";
@ -15,9 +20,41 @@ declare const ProducerSpecType: unique symbol;
export interface ProducerSpec<T> extends Spec {
readonly [ProducerSpecType]?: (_: T) => T;
readonly producerEffect: <Requirements = never>(
flow: Flow<Requirements>,
) => Effect.Effect<EffectProducer<T>, FlowResourceNotFoundError>;
}
export function makeProducerSpec<T>(name: string): ProducerSpec<T> {
const producers = new WeakMap<object, EffectProducer<T>>();
const registerProducer = <Requirements>(
flow: Flow<Requirements>,
producer: EffectProducer<T>,
) =>
Effect.sync(() => {
producers.set(flow, producer);
});
const unregisterProducer = <Requirements>(
flow: Flow<Requirements>,
producer: EffectProducer<T>,
) =>
Effect.sync(() => {
if (producers.get(flow) === producer) {
producers.delete(flow);
}
});
const producerEffect = <Requirements>(
flow: Flow<Requirements>,
): Effect.Effect<EffectProducer<T>, FlowResourceNotFoundError> => {
const producer = producers.get(flow);
return producer === undefined
? Effect.fail(flowResourceNotFoundError(flow.name, "producer", name))
: Effect.succeed(producer);
};
const addEffect = Effect.fn("ProducerSpec.addEffect")(function* (
flow: Flow,
definition: FlowDefinition,
@ -26,10 +63,13 @@ export function makeProducerSpec<T>(name: string): ProducerSpec<T> {
const factory = yield* ProducerFactory;
const producer = yield* factory.make<T>({ topic });
flow.registerProducer(name, producer);
yield* registerProducer(flow, producer);
yield* Effect.addFinalizer(() => unregisterProducer(flow, producer));
});
return {
name,
producerEffect,
addEffect,
add: (flow, pubsub, definition, context) =>
flow.runInCompatibilityScope(addEffect(flow, definition), pubsub, context),

View file

@ -11,6 +11,11 @@ import { Effect } from "effect";
import type { Spec } from "./types.js";
import type { Flow, FlowDefinition } from "../processor/flow.js";
import {
flowResourceNotFoundError,
type FlowResourceNotFoundError,
} from "../errors.js";
import {
type EffectRequestResponse,
RequestResponseFactory,
} from "../messaging/runtime.js";
@ -21,6 +26,9 @@ export interface RequestResponseSpec<TReq, TRes> extends Spec {
readonly request: TReq;
readonly response: TRes;
};
readonly requestorEffect: <Requirements = never>(
flow: Flow<Requirements>,
) => Effect.Effect<EffectRequestResponse<TReq, TRes>, FlowResourceNotFoundError>;
}
export function makeRequestResponseSpec<TReq, TRes>(
@ -28,6 +36,35 @@ export function makeRequestResponseSpec<TReq, TRes>(
requestTopicName: string,
responseTopicName: string,
): RequestResponseSpec<TReq, TRes> {
const requestors = new WeakMap<object, EffectRequestResponse<TReq, TRes>>();
const registerRequestor = <Requirements>(
flow: Flow<Requirements>,
requestor: EffectRequestResponse<TReq, TRes>,
) =>
Effect.sync(() => {
requestors.set(flow, requestor);
});
const unregisterRequestor = <Requirements>(
flow: Flow<Requirements>,
requestor: EffectRequestResponse<TReq, TRes>,
) =>
Effect.sync(() => {
if (requestors.get(flow) === requestor) {
requestors.delete(flow);
}
});
const requestorEffect = <Requirements>(
flow: Flow<Requirements>,
): Effect.Effect<EffectRequestResponse<TReq, TRes>, FlowResourceNotFoundError> => {
const requestor = requestors.get(flow);
return requestor === undefined
? Effect.fail(flowResourceNotFoundError(flow.name, "requestor", name))
: Effect.succeed(requestor);
};
const addEffect = Effect.fn("RequestResponseSpec.addEffect")(function* (
flow: Flow,
definition: FlowDefinition,
@ -41,10 +78,13 @@ export function makeRequestResponseSpec<TReq, TRes>(
subscription: `${flow.processorId}-${flow.name}-${name}`,
});
flow.registerRequestor(name, requestor);
yield* registerRequestor(flow, requestor);
yield* Effect.addFinalizer(() => unregisterRequestor(flow, requestor));
});
return {
name,
requestorEffect,
addEffect,
add: (flow, pubsub, definition, context) =>
flow.runInCompatibilityScope(addEffect(flow, definition), pubsub, context),