2026-04-05 21:09:33 -05:00
|
|
|
/**
 * Runtime flow instance — created by FlowProcessor for each configured flow.
 *
 * Python reference: trustgraph-base/trustgraph/base/flow.py
 */
|
|
|
|
|
|
2026-06-02 02:45:11 -05:00
|
|
|
import { Context, Effect, Exit, Layer, ManagedRuntime, Scope } from "effect";
|
2026-04-05 21:09:33 -05:00
|
|
|
import type { PubSubBackend } from "../backend/types.js";
|
2026-05-12 08:06:58 -05:00
|
|
|
import { makePubSubService } from "../backend/pubsub.js";
|
|
|
|
|
import {
|
|
|
|
|
flowResourceNotFoundError,
|
|
|
|
|
type FlowResourceNotFoundError,
|
|
|
|
|
type PubSubError,
|
|
|
|
|
} from "../errors.js";
|
|
|
|
|
import {
|
|
|
|
|
ConsumerFactory,
|
|
|
|
|
ProducerFactory,
|
|
|
|
|
RequestResponseFactory,
|
|
|
|
|
type EffectConsumer,
|
|
|
|
|
type EffectProducer,
|
|
|
|
|
type EffectRequestOptions,
|
|
|
|
|
type EffectRequestResponse,
|
|
|
|
|
makeConsumerFactoryService,
|
|
|
|
|
makeProducerFactoryService,
|
|
|
|
|
makeRequestResponseFactoryService,
|
|
|
|
|
} from "../messaging/runtime.js";
|
|
|
|
|
import { loadMessagingRuntimeConfig } from "../runtime/messaging-config.js";
|
|
|
|
|
import type { Spec, SpecRuntimeRequirements } from "../spec/types.js";
|
2026-04-05 21:09:33 -05:00
|
|
|
|
|
|
|
|
/**
 * Per-flow configuration handed to every spec when the flow starts.
 *
 * Both maps are optional and are keyed by spec name.
 */
export interface FlowDefinition {
  /** Topic overrides keyed by spec name. */
  topics?: Record<string, string>;
  /** Parameter values keyed by spec name. */
  parameters?: Record<string, unknown>;
}
|
|
|
|
|
|
2026-05-12 08:06:58 -05:00
|
|
|
/**
 * Promise-based producer handle.
 *
 * @typeParam T - Type of the messages accepted by `send`.
 */
export interface FlowProducer<T> {
  /** Sends `message` under the given `id`. */
  readonly send: (id: string, message: T) => Promise<void>;
  /** Flushes the producer. */
  readonly flush: () => Promise<void>;
  /** Stops the producer. */
  readonly stop: () => Promise<void>;
}
|
|
|
|
|
|
|
|
|
|
/** Promise-based consumer handle; the only exposed operation is stopping it. */
export interface FlowConsumer {
  /** Stops the consumer. */
  readonly stop: () => Promise<void>;
}
|
|
|
|
|
|
|
|
|
|
/**
 * Optional settings for a single request issued through a `FlowRequestor`.
 *
 * @typeParam TRes - Type of the responses the request receives.
 */
export interface FlowRequestOptions<TRes> {
  /** Timeout in milliseconds, forwarded unchanged to the underlying request. */
  readonly timeoutMs?: number;
  /**
   * Asynchronous callback invoked with a response; resolves to a boolean.
   * NOTE(review): the meaning of the boolean is defined by the messaging
   * runtime, not in this file — confirm there before relying on it.
   */
  readonly recipient?: (response: TRes) => Promise<boolean>;
}
|
|
|
|
|
|
|
|
|
|
/**
 * Promise-based request/response handle.
 *
 * @typeParam TReq - Type of the request payload.
 * @typeParam TRes - Type of the response payload.
 */
export interface FlowRequestor<TReq, TRes> {
  /** Issues `request` and resolves with the response. */
  readonly request: (
    request: TReq,
    options?: FlowRequestOptions<TRes>,
  ) => Promise<TRes>;
  /** Stops the requestor. */
  readonly stop: () => Promise<void>;
}
|
|
|
|
|
|
2026-06-01 20:26:47 -05:00
|
|
|
/**
 * Builds the runtime object for one configured flow.
 *
 * The returned object keeps four name-keyed registries (producers, consumers,
 * requestors, parameters) that specs populate through the `register*` /
 * `setParameter` methods, and exposes every operation twice: as an Effect
 * (`*Effect` methods) and as a Promise-based wrapper that runs the Effect on
 * a private `ManagedRuntime`.
 *
 * @param name - Flow name; also used in "resource not found" errors.
 * @param processorId - Identifier of the owning processor, exposed as-is.
 * @param pubsub - Backend used by `start()` to build the messaging factories.
 * @param definition - Flow definition passed to every spec's `addEffect`.
 * @param specifications - Specs applied, in order, when the flow starts.
 * @returns The flow object (see the `Flow` type alias).
 */
export function makeFlow<Requirements = never>(
  name: string,
  processorId: string,
  pubsub: PubSubBackend,
  definition: FlowDefinition,
  specifications: ReadonlyArray<Spec<Requirements>>,
) {
  // Registries filled by specs during start and emptied by clearResources().
  const producers = new Map<string, EffectProducer<never>>();
  const consumers = new Map<string, EffectConsumer>();
  const requestors = new Map<string, EffectRequestResponse<never, unknown>>();
  const parameters = new Map<string, unknown>();

  // Scope shared by everything run through runInCompatibilityScopeEffect;
  // null while the flow is stopped.
  let compatibilityScope: Scope.Closeable | null = null;
  // Runtime backing the Promise-based wrappers.
  const compatibilityRuntime = ManagedRuntime.make(Layer.empty);

  /** Returns the current compatibility scope, creating it on first use. */
  const ensureCompatibilityScopeEffect = Effect.fn("Flow.ensureCompatibilityScope")(function* () {
    if (compatibilityScope !== null) {
      return compatibilityScope;
    }
    const scope = yield* Scope.make();
    compatibilityScope = scope;
    return scope;
  });

  /**
   * Converts Promise-based request options into their Effect-based form.
   * Absent fields are omitted from the result rather than set to `undefined`.
   */
  const toEffectRequestOptions = <TRes>(
    options: FlowRequestOptions<TRes> | undefined,
  ): EffectRequestOptions<TRes> | undefined => {
    if (options === undefined) {
      return undefined;
    }
    const recipient = options.recipient;
    return {
      ...(options.timeoutMs === undefined ? {} : { timeoutMs: options.timeoutMs }),
      ...(recipient === undefined
        ? {}
        : {
            recipient: (response: TRes) => Effect.promise(() => recipient(response)),
          }),
    };
  };

  const flow = {
    name,
    processorId,

    /** Applies every spec to this flow, in the order given to `makeFlow`. */
    startEffect(): Effect.Effect<void, PubSubError, SpecRuntimeRequirements | Requirements> {
      return Effect.gen(function* () {
        for (const spec of specifications) {
          yield* spec.addEffect(flow, definition);
        }
      });
    },

    /**
     * Promise-based start. If a compatibility scope is already open the flow
     * is stopped first, then `startEffect()` runs inside a fresh scope.
     *
     * If start-up fails, the flow is stopped again before the returned
     * promise rejects, so a partially started flow does not keep its scope
     * open or its registries populated.
     */
    start(context: Context.Context<Requirements>): Promise<void> {
      return compatibilityRuntime.runPromise(
        Effect.gen(function* () {
          if (compatibilityScope !== null) {
            yield* flow.stopEffect();
          }
          yield* flow
            .runInCompatibilityScopeEffect(flow.startEffect(), pubsub, context)
            .pipe(Effect.onError(() => flow.stopEffect()));
        }),
      );
    },

    /** Promise-based wrapper around `stopEffect()`. */
    stop(): Promise<void> {
      return compatibilityRuntime.runPromise(flow.stopEffect());
    },

    /**
     * Closes the compatibility scope (if one is open) and clears all
     * registries. The scope reference is reset before closing, and the
     * registries are cleared even when closing the scope does not complete
     * normally.
     */
    stopEffect(): Effect.Effect<void> {
      return Effect.gen(function* () {
        const scope = compatibilityScope;
        compatibilityScope = null;
        if (scope !== null) {
          yield* Scope.close(scope, Exit.void);
        }
      }).pipe(Effect.ensuring(Effect.sync(() => flow.clearResources())));
    },

    /**
     * Runs `effect` with the producer, consumer and request/response
     * factories built from `runtimePubsub`, the shared compatibility scope
     * (created on demand), and the caller-supplied `context`.
     */
    runInCompatibilityScopeEffect<A, E>(
      effect: Effect.Effect<A, E, SpecRuntimeRequirements | Requirements>,
      runtimePubsub: PubSubBackend,
      context: Context.Context<Requirements>,
    ) {
      return Effect.gen(function* () {
        const scope = yield* ensureCompatibilityScopeEffect();
        const pubsubService = makePubSubService(runtimePubsub);
        const messagingConfig = yield* loadMessagingRuntimeConfig();
        return yield* Effect.provide(
          effect.pipe(
            Effect.provideService(ProducerFactory, ProducerFactory.of(makeProducerFactoryService(pubsubService))),
            Effect.provideService(ConsumerFactory, ConsumerFactory.of(makeConsumerFactoryService(pubsubService, messagingConfig))),
            Effect.provideService(
              RequestResponseFactory,
              RequestResponseFactory.of(makeRequestResponseFactoryService(pubsubService, messagingConfig)),
            ),
            Scope.provide(scope),
          ),
          context,
        );
      });
    },

    /** Promise-based wrapper around `runInCompatibilityScopeEffect()`. */
    runInCompatibilityScope<A, E>(
      effect: Effect.Effect<A, E, SpecRuntimeRequirements | Requirements>,
      runtimePubsub: PubSubBackend,
      context: Context.Context<Requirements>,
    ): Promise<A> {
      return compatibilityRuntime.runPromise(flow.runInCompatibilityScopeEffect(effect, runtimePubsub, context));
    },

    /** Empties all four registries; does not stop or close anything itself. */
    clearResources(): void {
      producers.clear();
      consumers.clear();
      requestors.clear();
      parameters.clear();
    },

    /** Stores `producer` under `registerName`, replacing any previous entry. */
    registerProducer<T>(registerName: string, producer: EffectProducer<T>): void {
      producers.set(registerName, producer);
    },

    /** Stores `consumer` under `registerName`, replacing any previous entry. */
    registerConsumer(registerName: string, consumer: EffectConsumer): void {
      consumers.set(registerName, consumer);
    },

    /** Stores `rr` under `registerName`, replacing any previous entry. */
    registerRequestor<TReq, TRes>(registerName: string, rr: EffectRequestResponse<TReq, TRes>): void {
      requestors.set(registerName, rr);
    },

    /** Stores `value` under `parameterName`, replacing any previous entry. */
    setParameter(parameterName: string, value: unknown): void {
      parameters.set(parameterName, value);
    },

    /**
     * Looks up a registered producer, failing with a not-found error when
     * absent. The message type `T` is asserted by the caller, not checked.
     */
    producerEffect<T>(producerName: string): Effect.Effect<EffectProducer<T>, FlowResourceNotFoundError> {
      const p = producers.get(producerName);
      return p === undefined
        ? Effect.fail(flowResourceNotFoundError(name, "producer", producerName))
        : Effect.succeed(p as EffectProducer<T>);
    },

    /** Looks up a registered consumer, failing with a not-found error when absent. */
    consumerEffect(consumerName: string): Effect.Effect<EffectConsumer, FlowResourceNotFoundError> {
      const c = consumers.get(consumerName);
      return c === undefined
        ? Effect.fail(flowResourceNotFoundError(name, "consumer", consumerName))
        : Effect.succeed(c);
    },

    /**
     * Looks up a registered requestor, failing with a not-found error when
     * absent. `TReq` / `TRes` are asserted by the caller, not checked.
     */
    requestorEffect<TReq, TRes>(
      requestorName: string,
    ): Effect.Effect<EffectRequestResponse<TReq, TRes>, FlowResourceNotFoundError> {
      const rr = requestors.get(requestorName);
      return rr === undefined
        ? Effect.fail(flowResourceNotFoundError(name, "requestor", requestorName))
        : Effect.succeed(rr as EffectRequestResponse<TReq, TRes>);
    },

    /**
     * Looks up a parameter value, failing with a not-found error when absent.
     * A parameter stored as `undefined` is reported as not found as well.
     */
    parameterEffect<T>(parameterName: string): Effect.Effect<T, FlowResourceNotFoundError> {
      const v = parameters.get(parameterName);
      return v === undefined
        ? Effect.fail(flowResourceNotFoundError(name, "parameter", parameterName))
        : Effect.succeed(v as T);
    },

    /**
     * Promise-based handle for a registered producer.
     *
     * @throws The value built by `flowResourceNotFoundError` when no producer
     *   is registered under `producerName`.
     */
    producer<T>(producerName: string): FlowProducer<T> {
      const p = producers.get(producerName);
      if (p === undefined) throw flowResourceNotFoundError(name, "producer", producerName);
      return {
        send: (id, message) => compatibilityRuntime.runPromise((p as EffectProducer<T>).send(id, message)),
        flush: () => compatibilityRuntime.runPromise(p.flush),
        // Flush first, then close.
        // NOTE(review): if the flush fails, close is never run — confirm this is intended.
        stop: () => compatibilityRuntime.runPromise(p.flush.pipe(Effect.flatMap(() => p.close))),
      };
    },

    /**
     * Promise-based handle for a registered consumer.
     *
     * @throws The value built by `flowResourceNotFoundError` when no consumer
     *   is registered under `consumerName`.
     */
    consumer(consumerName: string): FlowConsumer {
      const c = consumers.get(consumerName);
      if (c === undefined) throw flowResourceNotFoundError(name, "consumer", consumerName);
      return {
        stop: () => compatibilityRuntime.runPromise(c.stop),
      };
    },

    /**
     * Promise-based handle for a registered requestor.
     *
     * @throws The value built by `flowResourceNotFoundError` when no requestor
     *   is registered under `requestorName`.
     */
    requestor<TReq, TRes>(requestorName: string): FlowRequestor<TReq, TRes> {
      const rr = requestors.get(requestorName);
      if (rr === undefined) throw flowResourceNotFoundError(name, "requestor", requestorName);
      return {
        request: (request, options) =>
          compatibilityRuntime.runPromise(
            (rr as EffectRequestResponse<TReq, TRes>).request(
              request,
              toEffectRequestOptions(options),
            ),
          ),
        stop: () => compatibilityRuntime.runPromise(rr.stop),
      };
    },

    /**
     * Synchronous parameter lookup.
     *
     * @throws The value built by `flowResourceNotFoundError` when the
     *   parameter is missing or was stored as `undefined`.
     */
    parameter<T>(parameterName: string): T {
      const v = parameters.get(parameterName);
      if (v === undefined) throw flowResourceNotFoundError(name, "parameter", parameterName);
      return v as T;
    },
  };

  return flow;
}
|
2026-06-01 20:26:47 -05:00
|
|
|
|
|
|
|
|
/** Type of the flow object returned by `makeFlow` for the given `Requirements`. */
export type Flow<Requirements = never> = ReturnType<typeof makeFlow<Requirements>>;
|
|
|
|
|
|
|
|
|
|
/**
 * `makeFlow` exported under the name `Flow`, typed with both a construct
 * signature and a call signature so that `new Flow(...)` and `Flow(...)`
 * type-check. The cast only changes the static type; the runtime value is
 * `makeFlow` itself, and both signatures take the same parameters as
 * `makeFlow`.
 */
export const Flow = makeFlow as unknown as {
  new <Requirements = never>(
    name: string,
    processorId: string,
    pubsub: PubSubBackend,
    definition: FlowDefinition,
    specifications: ReadonlyArray<Spec<Requirements>>,
  ): Flow<Requirements>;
  <Requirements = never>(
    name: string,
    processorId: string,
    pubsub: PubSubBackend,
    definition: FlowDefinition,
    specifications: ReadonlyArray<Spec<Requirements>>,
  ): Flow<Requirements>;
};
|