From 5ed3f0e2d85e2823120ca31f4fabbf5d2c85129c Mon Sep 17 00:00:00 2001 From: elpresidank Date: Mon, 6 Apr 2026 00:11:29 -0500 Subject: [PATCH] feat: add schema foundation for document pipeline, agent, and deployment Add missing topics (librarian, knowledge, collection-management, flow), pipeline message types (TextDocument, Chunk, Triples, EntityContexts), service message types (Librarian, Knowledge, Collection, Flow CRUD), and update AgentResponse for streaming chunk format. Add RequestResponseSpec enabling flow-scoped request/response calls (needed by knowledge extraction and agent services). Add requestor registry to Flow class with proper lifecycle management. Add end_of_dialog to gateway's isComplete() check for agent streaming. Co-Authored-By: Claude Opus 4.6 (1M context) --- ts/packages/base/src/processor/flow.ts | 15 ++ ts/packages/base/src/schema/messages.ts | 168 +++++++++++++++++- ts/packages/base/src/schema/topics.ts | 16 ++ ts/packages/base/src/spec/index.ts | 1 + .../base/src/spec/request-response-spec.ts | 36 ++++ .../flow/src/gateway/dispatch/manager.ts | 1 + 6 files changed, 236 insertions(+), 1 deletion(-) create mode 100644 ts/packages/base/src/spec/request-response-spec.ts diff --git a/ts/packages/base/src/processor/flow.ts b/ts/packages/base/src/processor/flow.ts index 4c872700..230885c1 100644 --- a/ts/packages/base/src/processor/flow.ts +++ b/ts/packages/base/src/processor/flow.ts @@ -8,6 +8,7 @@ import type { PubSubBackend } from "../backend/types.js"; import type { Spec } from "../spec/types.js"; import type { Producer } from "../messaging/producer.js"; import type { Consumer } from "../messaging/consumer.js"; +import type { RequestResponse } from "../messaging/request-response.js"; export interface FlowDefinition { /** Topic overrides keyed by spec name */ @@ -19,6 +20,7 @@ export interface FlowDefinition { export class Flow { private producers = new Map>(); private consumers = new Map>(); + private requestors = new Map>(); private parameters = new Map(); constructor( @@ -49,6 +51,9 @@ export class Flow { for (const producer of this.producers.values()) { await producer.stop(); } + for (const rr of this.requestors.values()) { + await rr.stop(); + } } registerProducer(name: string, producer: Producer): void { @@ -59,6 +64,10 @@ export class Flow { this.consumers.set(name, consumer); } + registerRequestor(name: string, rr: RequestResponse): void { + this.requestors.set(name, rr); + } + setParameter(name: string, value: unknown): void { this.parameters.set(name, value); } @@ -75,6 +84,12 @@ export class Flow { return c as Consumer; } + requestor(name: string): RequestResponse { + const rr = this.requestors.get(name); + if (!rr) throw new Error(`Requestor "${name}" not found in flow "${this.name}"`); + return rr as RequestResponse; + } + parameter(name: string): T { const v = this.parameters.get(name); if (v === undefined) throw new Error(`Parameter "${name}" not found in flow "${this.name}"`); diff --git a/ts/packages/base/src/schema/messages.ts b/ts/packages/base/src/schema/messages.ts index 269caa98..45b88703 100644 --- a/ts/packages/base/src/schema/messages.ts +++ b/ts/packages/base/src/schema/messages.ts @@ -70,10 +70,18 @@ export interface AgentRequest { question: string; collection?: string; streaming?: boolean; + group?: string[]; + state?: string; } export interface AgentResponse { - answer: string; + /** Streaming chunk type */ + chunk_type?: "thought" | "observation" | "answer" | "error"; + content?: string; + end_of_message?: boolean; + end_of_dialog?: boolean; + /** Legacy non-streaming fields */ + answer?: string; error?: TgError; endOfStream?: boolean; endOfSession?: boolean; @@ -133,3 +141,161 @@ export interface PromptResponse { prompt: string; error?: TgError; } + +// ---------- Pipeline types ---------- + +export interface PipelineMetadata { + id: string; + root: string; + user: string; + collection: string; +} + +export interface TextDocument { + metadata: PipelineMetadata; + text: string; + documentId: string; +} + +export interface Chunk { + metadata: PipelineMetadata; + chunk: string; + documentId: string; +} + +export interface EntityContext { + entity: Term; + context: string; + chunkId: string; +} + +export interface EntityContexts { + metadata: PipelineMetadata; + entities: EntityContext[]; +} + +export interface Triples { + metadata: PipelineMetadata; + triples: Triple[]; +} + +// ---------- Document metadata ---------- + +export interface DocumentMetadata { + id: string; + time: number; + kind: string; + title: string; + comments: string; + user: string; + tags: string[]; + parentId?: string; + documentType: string; // "source" | "page" | "chunk" | "extracted" + metadata?: Triple[]; +} + +export interface ProcessingMetadata { + id: string; + documentId: string; + time: number; + flow: string; + user: string; + collection: string; + tags: string[]; +} + +// ---------- Librarian ---------- + +export type LibrarianOperation = + | "add-document" + | "remove-document" + | "list-documents" + | "get-document-metadata" + | "get-document-content" + | "add-child-document" + | "list-children" + | "add-processing" + | "remove-processing" + | "list-processing"; + +export interface LibrarianRequest { + operation: LibrarianOperation; + documentId?: string; + processingId?: string; + documentMetadata?: DocumentMetadata; + processingMetadata?: ProcessingMetadata; + content?: string; // base64 + user?: string; + collection?: string; +} + +export interface LibrarianResponse { + error?: TgError; + documentMetadata?: DocumentMetadata; + content?: string; // base64 + documents?: DocumentMetadata[]; + processing?: ProcessingMetadata[]; +} + +// ---------- Knowledge core ---------- + +export type KnowledgeOperation = + | "list-kg-cores" + | "get-kg-core" + | "delete-kg-core" + | "put-kg-core" + | "load-kg-core"; + +export interface KnowledgeRequest { + operation: KnowledgeOperation; + user?: string; + id?: string; + flow?: string; + collection?: string; + triples?: Triple[]; + graphEmbeddings?: { entity: Term; vectors: number[][] }[]; +} + +export interface KnowledgeResponse { + error?: TgError; + ids?: string[]; + eos?: boolean; + triples?: Triple[]; + graphEmbeddings?: { entity: Term; vectors: number[][] }[]; +} + +// ---------- Collection management ---------- + +export type CollectionOperation = + | "list-collections" + | "update-collection" + | "delete-collection"; + +export interface CollectionManagementRequest { + operation: CollectionOperation; + user?: string; + collection?: string; + name?: string; + description?: string; + tags?: string[]; +} + +export interface CollectionManagementResponse { + error?: TgError; + collections?: { user: string; collection: string; name: string; description: string; tags: string[] }[]; +} + +// ---------- Flow management ---------- + +export type FlowOperation = "list" | "get" | "start" | "stop"; + +export interface FlowRequest { + operation: FlowOperation; + id?: string; + blueprint?: string; +} + +export interface FlowResponse { + error?: TgError; + flows?: { id: string; status: string; blueprint?: string }[]; +} diff --git a/ts/packages/base/src/schema/topics.ts b/ts/packages/base/src/schema/topics.ts index ffc3f1a5..b728367d 100644 --- a/ts/packages/base/src/schema/topics.ts +++ b/ts/packages/base/src/schema/topics.ts @@ -59,4 +59,20 @@ export const topics = { // Prompt promptRequest: topic("prompt-request"), promptResponse: topic("prompt-response"), + + // Librarian (document management) + librarianRequest: topic("librarian-request"), + librarianResponse: topic("librarian-response"), + + // Knowledge core management + knowledgeRequest: topic("knowledge-request"), + knowledgeResponse: topic("knowledge-response"), + + // Collection management + collectionManagementRequest: topic("collection-management-request"), + collectionManagementResponse: topic("collection-management-response"), + + // Flow management + flowRequest: topic("flow-request"), + flowResponse: topic("flow-response"), } as const; diff --git a/ts/packages/base/src/spec/index.ts b/ts/packages/base/src/spec/index.ts index 42435af4..6c24fe39 100644 --- a/ts/packages/base/src/spec/index.ts +++ b/ts/packages/base/src/spec/index.ts @@ -2,3 +2,4 @@ export type { Spec } from "./types.js"; export { ConsumerSpec } from "./consumer-spec.js"; export { ProducerSpec } from "./producer-spec.js"; export { ParameterSpec } from "./parameter-spec.js"; +export { RequestResponseSpec } from "./request-response-spec.js"; diff --git a/ts/packages/base/src/spec/request-response-spec.ts b/ts/packages/base/src/spec/request-response-spec.ts new file mode 100644 index 00000000..be4db8fa --- /dev/null +++ b/ts/packages/base/src/spec/request-response-spec.ts @@ -0,0 +1,36 @@ +/** + * Request/response specification — declares a request/response client for a flow. + * + * Enables FlowProcessor handlers to make request/response calls to other services + * (e.g., calling the prompt service or LLM from within a knowledge extraction handler). + * + * Python reference: trustgraph-base/trustgraph/base/prompt_client_spec.py + */ + +import type { Spec } from "./types.js"; +import type { PubSubBackend } from "../backend/types.js"; +import type { Flow, FlowDefinition } from "../processor/flow.js"; +import { RequestResponse } from "../messaging/request-response.js"; + +export class RequestResponseSpec implements Spec { + constructor( + public readonly name: string, + private readonly requestTopicName: string, + private readonly responseTopicName: string, + ) {} + + async add(flow: Flow, pubsub: PubSubBackend, definition: FlowDefinition): Promise { + const requestTopic = definition.topics?.[this.requestTopicName] ?? this.requestTopicName; + const responseTopic = definition.topics?.[this.responseTopicName] ?? this.responseTopicName; + + const rr = new RequestResponse({ + pubsub, + requestTopic, + responseTopic, + subscription: `${flow.processorId}-${flow.name}-${this.name}`, + }); + await rr.start(); + + flow.registerRequestor(this.name, rr as RequestResponse); + } +} diff --git a/ts/packages/flow/src/gateway/dispatch/manager.ts b/ts/packages/flow/src/gateway/dispatch/manager.ts index 6dbeed5f..575e43f8 100644 --- a/ts/packages/flow/src/gateway/dispatch/manager.ts +++ b/ts/packages/flow/src/gateway/dispatch/manager.ts @@ -152,6 +152,7 @@ export class DispatcherManager { !!res.endOfSession || !!res.end_of_stream || !!res.end_of_session || + !!res.end_of_dialog || !!res.eos || // error responses are always final !!res.error