Use Effect fn for base processor helpers

This commit is contained in:
elpresidank 2026-06-04 07:34:14 -05:00
parent 4032d15d96
commit 1a29bdef9d
8 changed files with 316 additions and 193 deletions

View file

@ -167,7 +167,7 @@ describe("Effect-native flow specifications", () => {
provideRuntime(
backend,
Effect.gen(function* () {
yield* flow.startEffect();
yield* flow.startEffect;
const producer = yield* flow.producerEffect(outputProducerSpec);
const duplicateSpecError = yield* flow.producerEffect(duplicateOutputProducerSpec).pipe(Effect.flip);
expect(duplicateSpecError._tag).toBe("FlowResourceNotFoundError");
@ -212,7 +212,7 @@ describe("Effect-native flow specifications", () => {
provideRuntime(
backend,
Effect.gen(function* () {
yield* flow.startEffect();
yield* flow.startEffect;
yield* Effect.yieldNow;
yield* TestClock.adjust(Duration.millis(5));
}),
@ -254,7 +254,7 @@ describe("Effect-native flow specifications", () => {
provideRuntime(
backend,
Effect.gen(function* () {
yield* flow.startEffect();
yield* flow.startEffect;
const duplicateSpecError = yield* flow.requestorEffect(duplicateRequestResponseSpec).pipe(Effect.flip);
expect(duplicateSpecError._tag).toBe("FlowResourceNotFoundError");
const requestor = flow.requestor(requestResponseSpec);
@ -293,7 +293,7 @@ describe("Effect-native flow specifications", () => {
provideRuntime(
backend,
Effect.gen(function* () {
yield* flow.startEffect();
yield* flow.startEffect;
const producerError = yield* flow.producerEffect("missing-producer").pipe(Effect.flip);
const parameter = yield* flow.parameterEffect(presentParameter);
const legacyParameter = yield* flow.parameterEffect("present");

View file

@ -625,7 +625,7 @@ export const RequestResponseFactoryLive = Layer.effect(
export const runFlowRuntimeScoped = Effect.fn("FlowRuntime.run")(function* <Requirements = never>(
flow: Flow<Requirements>,
) {
yield* flow.startEffect().pipe(
yield* flow.startEffect.pipe(
Effect.mapError((error) => flowRuntimeError(flow.name, "start", error)),
);
yield* Effect.addFinalizer(() =>
@ -673,8 +673,8 @@ export const runEffectRequestResponseScoped = Effect.fn("runEffectRequestRespons
return yield* makeEffectRequestResponseFromPubSub<TReq, TRes>(pubsub, config, options);
});
export const runFlowScoped = Effect.fn("runFlowScoped")(function* (
flow: Flow,
export const runFlowScoped = Effect.fn("runFlowScoped")(function* <Requirements = never>(
flow: Flow<Requirements>,
) {
yield* runFlowRuntimeScoped(flow);
});

View file

@ -37,7 +37,7 @@ import {
} from "../messaging/runtime.js";
import { makePubSubService, PubSub } from "../backend/pubsub.js";
import { loadMessagingRuntimeConfig } from "../runtime/index.ts";
import { Duration, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect";
import { Context, Duration, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect";
import * as O from "effect/Option";
import * as S from "effect/Schema";
@ -172,127 +172,110 @@ export function runFlowProcessorDefinitionScoped<
);
};
const startFlowEffect = (
const startFlowEffect = Effect.fn("FlowProcessor.startFlow")(function* (
name: string,
definition: FlowDefinition,
): Effect.Effect<
ActiveFlow,
FlowRuntimeError,
FlowRuntime | ProducerFactory | ConsumerFactory | RequestResponseFactory | FlowRequirements
> =>
Effect.gen(function* () {
const flowRuntime = yield* FlowRuntime;
const scope = yield* Scope.make();
const flow = new Flow<FlowRequirements>(
name,
options.id,
options.pubsub,
definition,
options.specifications,
);
return yield* flowRuntime.run(flow).pipe(
Scope.provide(scope),
Effect.as({ scope } satisfies ActiveFlow),
Effect.catch((error) =>
Scope.close(scope, Exit.void).pipe(
Effect.flatMap(() => Effect.fail(error)),
),
) {
const flowRuntime = yield* FlowRuntime;
const scope = yield* Scope.make();
const flow = new Flow<FlowRequirements>(
name,
options.id,
options.pubsub,
definition,
options.specifications,
);
return yield* flowRuntime.run(flow).pipe(
Scope.provide(scope),
Effect.as({ scope } satisfies ActiveFlow),
Effect.catch((error) =>
Scope.close(scope, Exit.void).pipe(
Effect.flatMap(() => Effect.fail(error)),
),
);
});
),
);
});
const onConfigureFlowsEffect = (
const onConfigureFlowsEffect = Effect.fn("FlowProcessor.configureFlows")(function* (
config: Record<string, unknown>,
_version: number,
): Effect.Effect<
void,
FlowRuntimeError,
FlowRuntime | ProducerFactory | ConsumerFactory | RequestResponseFactory | FlowRequirements
> =>
Effect.gen(function* () {
const flowDefs = config.flows;
if (flowDefs === undefined) {
yield* Effect.log(`[${options.id}] No flows in config push, skipping`);
return;
}
const decodedFlowDefs = decodeFlowDefinitions(flowDefs);
if (O.isNone(decodedFlowDefs)) {
yield* Effect.logWarning(`[${options.id}] Skipping config push: flows is not an object`);
return;
}
const flowDefinitions = decodedFlowDefs.value;
) {
const flowDefs = config.flows;
if (flowDefs === undefined) {
yield* Effect.log(`[${options.id}] No flows in config push, skipping`);
return;
}
const decodedFlowDefs = decodeFlowDefinitions(flowDefs);
if (O.isNone(decodedFlowDefs)) {
yield* Effect.logWarning(`[${options.id}] Skipping config push: flows is not an object`);
return;
}
const flowDefinitions = decodedFlowDefs.value;
const flowsJson = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(flowDefinitions).pipe(
Effect.catch((error) => Effect.succeed(String(error))),
);
if (lastFlowsJson.length > 0 && flowsJson === lastFlowsJson && flows.size > 0) {
yield* Effect.log(`[${options.id}] Flow definitions unchanged, skipping restart`);
return;
}
lastFlowsJson = flowsJson;
const flowsJson = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(flowDefinitions).pipe(
Effect.catch((error) => Effect.succeed(String(error))),
);
if (lastFlowsJson.length > 0 && flowsJson === lastFlowsJson && flows.size > 0) {
yield* Effect.log(`[${options.id}] Flow definitions unchanged, skipping restart`);
return;
}
lastFlowsJson = flowsJson;
for (const [name, activeFlow] of flows) {
if (!(name in flowDefinitions)) {
yield* Effect.log(`[${options.id}] Stopping removed flow: ${name}`);
yield* closeFlowEffect(name, activeFlow);
flows.delete(name);
}
for (const [name, activeFlow] of flows) {
if (!(name in flowDefinitions)) {
yield* Effect.log(`[${options.id}] Stopping removed flow: ${name}`);
yield* closeFlowEffect(name, activeFlow);
flows.delete(name);
}
}
for (const [name, defn] of Object.entries(flowDefinitions)) {
const existing = flows.get(name);
if (existing !== undefined) {
yield* Effect.log(`[${options.id}] Restarting flow "${name}" with updated config`);
yield* closeFlowEffect(name, existing);
flows.delete(name);
}
for (const [name, defn] of Object.entries(flowDefinitions)) {
const existing = flows.get(name);
if (existing !== undefined) {
yield* Effect.log(`[${options.id}] Restarting flow "${name}" with updated config`);
yield* closeFlowEffect(name, existing);
flows.delete(name);
}
yield* Effect.log(`[${options.id}] Starting flow "${name}"`);
const activeFlow = yield* startFlowEffect(name, defn);
flows.set(name, activeFlow);
yield* Effect.log(`[${options.id}] Flow "${name}" started`);
}
});
yield* Effect.log(`[${options.id}] Starting flow "${name}"`);
const activeFlow = yield* startFlowEffect(name, defn);
flows.set(name, activeFlow);
yield* Effect.log(`[${options.id}] Flow "${name}" started`);
}
const processNextConfigPushEffect = Effect.fn("FlowProcessor.processNextConfigPush")(function* () {
const consumer = configConsumer;
if (consumer === null) {
yield* Effect.sleep(Duration.millis(1000));
return;
}
const msg = yield* Effect.tryPromise({
try: () => consumer.receive(2000),
catch: (error) => pubSubError("receive:config-push", error),
});
if (msg === null) {
return;
}
const processNextConfigPushEffect = (): Effect.Effect<
void,
never,
| FlowRuntime
| ProducerFactory
| ConsumerFactory
| RequestResponseFactory
| FlowRequirements
| ConfigHandlerRequirements
> =>
Effect.gen(function* () {
const consumer = configConsumer;
if (consumer === null) {
yield* Effect.sleep(Duration.millis(1000));
return;
}
const push = msg.value();
yield* Effect.log(`[${options.id}] Received config push version=${push.version}`);
const msg = yield* Effect.tryPromise({
try: () => consumer.receive(2000),
catch: (error) => pubSubError("receive:config-push", error),
});
if (msg === null) {
return;
}
yield* onConfigureFlowsEffect(push.config, push.version);
const push = msg.value();
yield* Effect.log(`[${options.id}] Received config push version=${push.version}`);
for (const handler of options.configHandlers ?? []) {
yield* handler(push.config, push.version);
}
yield* onConfigureFlowsEffect(push.config, push.version);
yield* Effect.tryPromise({
try: () => consumer.acknowledge(msg),
catch: (error) => pubSubError("acknowledge:config-push", error),
});
});
for (const handler of options.configHandlers ?? []) {
yield* handler(push.config, push.version);
}
yield* Effect.tryPromise({
try: () => consumer.acknowledge(msg),
catch: (error) => pubSubError("acknowledge:config-push", error),
});
}).pipe(
const processNextConfigPushSafelyEffect = Effect.fn("FlowProcessor.processNextConfigPushSafely")(function* () {
return yield* processNextConfigPushEffect().pipe(
Effect.catch((error) => {
if (!isRunning()) {
return Effect.void;
@ -304,6 +287,7 @@ export function runFlowProcessorDefinitionScoped<
);
}),
);
});
return Effect.gen(function* () {
const pubsub = yield* PubSub;
@ -325,7 +309,7 @@ export function runFlowProcessorDefinitionScoped<
yield* Effect.whileLoop({
while: isRunning,
body: processNextConfigPushEffect,
body: processNextConfigPushSafelyEffect,
step: () => undefined,
});
});
@ -368,6 +352,24 @@ export function makeFlowProcessor<FlowRequirements = never>(
return options.provide?.(effect) ?? effect;
};
const startProcessorEffect = Effect.fn("FlowProcessor.start")(function* (
context: Context.Context<FlowProcessorRuntimeRequirements<FlowRequirements>>,
) {
const pubsub = makePubSubService(base.pubsub);
const messagingConfig = yield* loadMessagingRuntimeConfig();
const start = processor.startEffect.pipe(
Effect.provideService(PubSub, pubsub),
Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsub))),
Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsub, messagingConfig))),
Effect.provideService(
RequestResponseFactory,
RequestResponseFactory.of(makeRequestResponseFactoryService(pubsub, messagingConfig)),
),
Effect.provideService(FlowRuntime, FlowRuntime.of({ run: runFlowRuntimeScoped })),
);
return yield* Effect.provide(Effect.scoped(start), context);
});
processor = {
...base,
specifications,
@ -377,25 +379,7 @@ export function makeFlowProcessor<FlowRequirements = never>(
get startEffect() {
return makeStartEffect();
},
start: (context) =>
compatibilityRuntime.runPromise(Effect.provide(
Effect.gen(function* () {
const pubsub = makePubSubService(base.pubsub);
const messagingConfig = yield* loadMessagingRuntimeConfig();
const start = processor.startEffect.pipe(
Effect.provideService(PubSub, pubsub),
Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsub))),
Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsub, messagingConfig))),
Effect.provideService(
RequestResponseFactory,
RequestResponseFactory.of(makeRequestResponseFactoryService(pubsub, messagingConfig)),
),
Effect.provideService(FlowRuntime, FlowRuntime.of({ run: runFlowRuntimeScoped })),
);
yield* Effect.scoped(start);
}),
context,
)),
start: (context) => compatibilityRuntime.runPromise(startProcessorEffect(context)),
};
return processor;

View file

@ -4,7 +4,7 @@
* Python reference: trustgraph-base/trustgraph/base/flow.py
*/
import { Context, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect";
import { Config as EffectConfig, Context, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect";
import * as O from "effect/Option";
import * as S from "effect/Schema";
import type { PubSubBackend } from "../backend/types.js";
@ -66,13 +66,68 @@ export interface FlowRequestor<TReq, TRes> {
type FlowParameterError = FlowResourceNotFoundError | FlowParameterDecodeError;
export interface Flow<Requirements = never> {
readonly name: string;
readonly processorId: string;
startEffect: Effect.Effect<void, PubSubError, SpecRuntimeRequirements | Requirements>;
start: (context: Context.Context<Requirements>) => Promise<void>;
stop: () => Promise<void>;
stopEffect: Effect.Effect<void>;
runInCompatibilityScopeEffect: <A, E>(
effect: Effect.Effect<A, E, SpecRuntimeRequirements | Requirements>,
runtimePubsub: PubSubBackend,
context: Context.Context<Requirements>,
) => Effect.Effect<A, E | EffectConfig.ConfigError>;
runInCompatibilityScope: <A, E>(
effect: Effect.Effect<A, E, SpecRuntimeRequirements | Requirements>,
runtimePubsub: PubSubBackend,
context: Context.Context<Requirements>,
) => Promise<A>;
clearResources: () => void;
registerProducer: <T>(registerName: string, producer: EffectProducer<T>) => void;
registerConsumer: (registerName: string, consumer: EffectConsumer) => void;
registerRequestor: <TReq, TRes>(
registerName: string,
rr: EffectRequestResponse<TReq, TRes>,
) => void;
setParameter: (parameterName: string, value: unknown) => void;
producerEffect: {
<T>(producerSpec: ProducerSpec<T>): Effect.Effect<EffectProducer<T>, FlowResourceNotFoundError>;
(producerName: string): Effect.Effect<EffectProducer<never>, FlowResourceNotFoundError>;
};
consumerEffect: (consumerName: string) => Effect.Effect<EffectConsumer, FlowResourceNotFoundError>;
requestorEffect: {
<TReq, TRes>(
requestorSpec: RequestResponseSpec<TReq, TRes>,
): Effect.Effect<EffectRequestResponse<TReq, TRes>, FlowResourceNotFoundError>;
(requestorName: string): Effect.Effect<EffectRequestResponse<never, unknown>, FlowResourceNotFoundError>;
};
parameterEffect: {
<T>(parameterSpec: ParameterSpec<T>): Effect.Effect<T, FlowParameterError>;
(parameterName: string): Effect.Effect<unknown, FlowResourceNotFoundError>;
};
producer: {
<T>(producerSpec: ProducerSpec<T>): FlowProducer<T>;
(producerName: string): FlowProducer<never>;
};
consumer: (consumerName: string) => FlowConsumer;
requestor: {
<TReq, TRes>(requestorSpec: RequestResponseSpec<TReq, TRes>): FlowRequestor<TReq, TRes>;
(requestorName: string): FlowRequestor<never, unknown>;
};
parameter: {
<T>(parameterSpec: ParameterSpec<T>): T;
(parameterName: string): unknown;
};
}
export function makeFlow<Requirements = never>(
name: string,
processorId: string,
pubsub: PubSubBackend,
definition: FlowDefinition,
specifications: ReadonlyArray<Spec<Requirements>>,
) {
): Flow<Requirements> {
const producers = new Map<string, EffectProducer<never>>();
const consumers = new Map<string, EffectConsumer>();
const requestors = new Map<string, EffectRequestResponse<never, unknown>>();
@ -275,62 +330,56 @@ export function makeFlow<Requirements = never>(
return toFlowRequestor(compatibilityRuntime.runSync(requestor.requestorEffect(flow)));
}
const flow = {
const flow: Flow<Requirements> = {
name,
processorId,
startEffect(): Effect.Effect<void, PubSubError, SpecRuntimeRequirements | Requirements> {
return Effect.gen(function* () {
for (const spec of specifications) {
yield* spec.addEffect(flow, definition);
}
});
},
startEffect: Effect.gen(function* () {
for (const spec of specifications) {
yield* spec.addEffect(flow, definition);
}
}).pipe(Effect.withSpan("Flow.startEffect")),
start(context: Context.Context<Requirements>): Promise<void> {
return compatibilityRuntime.runPromise(
Effect.gen(function* () {
if (compatibilityScope !== null) {
yield* flow.stopEffect();
yield* flow.stopEffect;
}
yield* flow.runInCompatibilityScopeEffect(flow.startEffect(), pubsub, context);
yield* flow.runInCompatibilityScopeEffect(flow.startEffect, pubsub, context);
}),
);
},
stop(): Promise<void> {
return compatibilityRuntime.runPromise(flow.stopEffect());
return compatibilityRuntime.runPromise(flow.stopEffect);
},
stopEffect(): Effect.Effect<void> {
return Effect.gen(function* () {
const scope = compatibilityScope;
compatibilityScope = null;
if (scope !== null) {
yield* Scope.close(scope, Exit.void);
}
flow.clearResources();
});
},
runInCompatibilityScopeEffect<A, E>(
stopEffect: Effect.gen(function* () {
const scope = compatibilityScope;
compatibilityScope = null;
if (scope !== null) {
yield* Scope.close(scope, Exit.void);
}
flow.clearResources();
}).pipe(Effect.withSpan("Flow.stopEffect")),
runInCompatibilityScopeEffect: Effect.fn("Flow.runInCompatibilityScopeEffect")(function* <A, E>(
effect: Effect.Effect<A, E, SpecRuntimeRequirements | Requirements>,
runtimePubsub: PubSubBackend,
context: Context.Context<Requirements>,
) {
return Effect.gen(function* () {
const scope = yield* ensureCompatibilityScopeEffect();
const pubsubService = makePubSubService(runtimePubsub);
const messagingConfig = yield* loadMessagingRuntimeConfig();
return yield* Effect.provide(
effect.pipe(
Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsubService))),
Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsubService, messagingConfig))),
Effect.provideService(
RequestResponseFactory,
RequestResponseFactory.of(makeRequestResponseFactoryService(pubsubService, messagingConfig)),
),
Scope.provide(scope),
const scope = yield* ensureCompatibilityScopeEffect();
const pubsubService = makePubSubService(runtimePubsub);
const messagingConfig = yield* loadMessagingRuntimeConfig();
return yield* Effect.provide(
effect.pipe(
Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsubService))),
Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsubService, messagingConfig))),
Effect.provideService(
RequestResponseFactory,
RequestResponseFactory.of(makeRequestResponseFactoryService(pubsubService, messagingConfig)),
),
context,
);
});
},
Scope.provide(scope),
),
context,
);
}),
runInCompatibilityScope<A, E>(
effect: Effect.Effect<A, E, SpecRuntimeRequirements | Requirements>,
runtimePubsub: PubSubBackend,
@ -380,8 +429,6 @@ export function makeFlow<Requirements = never>(
return flow;
}
export type Flow<Requirements = never> = ReturnType<typeof makeFlow<Requirements>>;
export const Flow = makeFlow as unknown as {
new <Requirements = never>(
name: string,

View file

@ -7,16 +7,28 @@
import { Effect, type Context } from "effect";
import * as S from "effect/Schema";
import type { PubSubBackend } from "../backend/types.js";
import type { Spec } from "./types.js";
import type { SpecRuntimeRequirements } from "./types.js";
import type { Flow, FlowDefinition } from "../processor/flow.js";
import type { PubSubError } from "../errors.js";
declare const ParameterSpecType: unique symbol;
const UnknownParameterSchema: S.Codec<unknown, unknown> = S.Unknown;
export interface ParameterSpec<T = unknown> extends Spec {
export interface ParameterSpec<T = unknown> {
readonly [ParameterSpecType]?: (_: T) => T;
readonly name: string;
readonly schema: S.Codec<T, unknown>;
readonly addEffect: <Requirements = never>(
flow: Flow<Requirements>,
definition: FlowDefinition,
) => Effect.Effect<void, PubSubError, SpecRuntimeRequirements | Requirements>;
readonly add: <Requirements = never>(
flow: Flow<Requirements>,
pubsub: PubSubBackend,
definition: FlowDefinition,
context: Context.Context<Requirements>,
) => Promise<void>;
}
export function makeParameterSpec(name: string): ParameterSpec<unknown>;
@ -29,7 +41,7 @@ export function makeParameterSpec<T>(
schema?: S.Codec<T, unknown>,
) {
const parameterSchema = schema ?? UnknownParameterSchema;
const addEffect = (flow: Flow, definition: FlowDefinition) =>
const addEffect = <Requirements = never>(flow: Flow<Requirements>, definition: FlowDefinition) =>
Effect.sync(() => {
const value = definition.parameters?.[name];
flow.setParameter(name, value);
@ -39,11 +51,11 @@ export function makeParameterSpec<T>(
name,
schema: parameterSchema,
addEffect,
add: (
flow: Flow,
add: <Requirements = never>(
flow: Flow<Requirements>,
pubsub: PubSubBackend,
definition: FlowDefinition,
context: Context.Context<never>,
context: Context.Context<Requirements>,
) =>
flow.runInCompatibilityScope(addEffect(flow, definition), pubsub, context),
};

View file

@ -4,12 +4,14 @@
* Python reference: trustgraph-base/trustgraph/base/producer_spec.py
*/
import { Effect } from "effect";
import type { Spec } from "./types.js";
import { Effect, type Context } from "effect";
import type { SpecRuntimeRequirements } from "./types.js";
import type { Flow, FlowDefinition } from "../processor/flow.js";
import type { PubSubBackend } from "../backend/types.js";
import {
flowResourceNotFoundError,
type FlowResourceNotFoundError,
type PubSubError,
} from "../errors.js";
import {
type EffectProducer,
@ -18,8 +20,19 @@ import {
declare const ProducerSpecType: unique symbol;
export interface ProducerSpec<T> extends Spec {
export interface ProducerSpec<T> {
readonly [ProducerSpecType]?: (_: T) => T;
readonly name: string;
readonly addEffect: <Requirements = never>(
flow: Flow<Requirements>,
definition: FlowDefinition,
) => Effect.Effect<void, PubSubError, SpecRuntimeRequirements | Requirements>;
readonly add: <Requirements = never>(
flow: Flow<Requirements>,
pubsub: PubSubBackend,
definition: FlowDefinition,
context: Context.Context<Requirements>,
) => Promise<void>;
readonly producerEffect: <Requirements = never>(
flow: Flow<Requirements>,
) => Effect.Effect<EffectProducer<T>, FlowResourceNotFoundError>;
@ -55,8 +68,8 @@ export function makeProducerSpec<T>(name: string): ProducerSpec<T> {
: Effect.succeed(producer);
};
const addEffect = Effect.fn("ProducerSpec.addEffect")(function* (
flow: Flow,
const addEffect = Effect.fn("ProducerSpec.addEffect")(function* <Requirements = never>(
flow: Flow<Requirements>,
definition: FlowDefinition,
) {
const topic = definition.topics?.[name] ?? name;

View file

@ -7,12 +7,14 @@
* Python reference: trustgraph-base/trustgraph/base/prompt_client_spec.py
*/
import { Effect } from "effect";
import type { Spec } from "./types.js";
import { Effect, type Context } from "effect";
import type { SpecRuntimeRequirements } from "./types.js";
import type { Flow, FlowDefinition } from "../processor/flow.js";
import type { PubSubBackend } from "../backend/types.js";
import {
flowResourceNotFoundError,
type FlowResourceNotFoundError,
type PubSubError,
} from "../errors.js";
import {
type EffectRequestResponse,
@ -21,11 +23,22 @@ import {
declare const RequestResponseSpecType: unique symbol;
export interface RequestResponseSpec<TReq, TRes> extends Spec {
export interface RequestResponseSpec<TReq, TRes> {
readonly [RequestResponseSpecType]?: {
readonly request: TReq;
readonly response: TRes;
};
readonly name: string;
readonly addEffect: <Requirements = never>(
flow: Flow<Requirements>,
definition: FlowDefinition,
) => Effect.Effect<void, PubSubError, SpecRuntimeRequirements | Requirements>;
readonly add: <Requirements = never>(
flow: Flow<Requirements>,
pubsub: PubSubBackend,
definition: FlowDefinition,
context: Context.Context<Requirements>,
) => Promise<void>;
readonly requestorEffect: <Requirements = never>(
flow: Flow<Requirements>,
) => Effect.Effect<EffectRequestResponse<TReq, TRes>, FlowResourceNotFoundError>;
@ -65,8 +78,8 @@ export function makeRequestResponseSpec<TReq, TRes>(
: Effect.succeed(requestor);
};
const addEffect = Effect.fn("RequestResponseSpec.addEffect")(function* (
flow: Flow,
const addEffect = Effect.fn("RequestResponseSpec.addEffect")(function* <Requirements = never>(
flow: Flow<Requirements>,
definition: FlowDefinition,
) {
const requestTopic = definition.topics?.[requestTopicName] ?? requestTopicName;