diff --git a/ts/entrypoints/flow-manager.mjs b/ts/entrypoints/flow-manager.mjs new file mode 100644 index 00000000..91f3f308 --- /dev/null +++ b/ts/entrypoints/flow-manager.mjs @@ -0,0 +1,6 @@ +import("../packages/flow/dist/flow-manager/service.js") + .then((m) => m.run()) + .catch((err) => { + console.error(err); + process.exit(1); + }); diff --git a/ts/package.json b/ts/package.json index 9d9cc53d..97f1dbef 100644 --- a/ts/package.json +++ b/ts/package.json @@ -12,9 +12,11 @@ "llm:claude": "tsx scripts/run-llm-claude.ts", "llm:openai": "tsx scripts/run-llm-openai.ts", "test:pipeline": "tsx scripts/test-pipeline.ts", + "seed": "tsx scripts/seed-config.ts", "agent": "tsx scripts/run-agent.ts", "librarian": "tsx scripts/run-librarian.ts", - "knowledge": "tsx scripts/run-knowledge.ts" + "knowledge": "tsx scripts/run-knowledge.ts", + "flow-manager": "tsx scripts/run-flow-manager.ts" }, "devDependencies": { "tsx": "^4.21.0", diff --git a/ts/packages/base/src/schema/messages.ts b/ts/packages/base/src/schema/messages.ts index 45b88703..bbcaa504 100644 --- a/ts/packages/base/src/schema/messages.ts +++ b/ts/packages/base/src/schema/messages.ts @@ -287,15 +287,14 @@ export interface CollectionManagementResponse { // ---------- Flow management ---------- -export type FlowOperation = "list" | "get" | "start" | "stop"; - +// Flow request/response use kebab-case wire format to match the client. +// Access fields via bracket notation: request["flow-id"] export interface FlowRequest { - operation: FlowOperation; - id?: string; - blueprint?: string; + operation: string; + [key: string]: unknown; } export interface FlowResponse { error?: TgError; - flows?: { id: string; status: string; blueprint?: string }[]; + [key: string]: unknown; } diff --git a/ts/packages/flow/src/flow-manager/service.ts b/ts/packages/flow/src/flow-manager/service.ts new file mode 100644 index 00000000..93da0783 --- /dev/null +++ b/ts/packages/flow/src/flow-manager/service.ts @@ -0,0 +1,378 @@ +/** + * Flow manager service -- manages flow lifecycle (start/stop/list) and blueprints. + * + * An AsyncProcessor (NOT FlowProcessor) that: + * 1. Listens on flow-request topic + * 2. Handles operations: list-flows, get-flow, start-flow, stop-flow, + * list-blueprints, get-blueprint, delete-blueprint + * 3. Stores flows and blueprints in-memory + * 4. On start/stop: pushes updated flow config to the config service + * + * Wire format uses kebab-case field names to match the client. + * Access fields via bracket notation: request["flow-id"], response["flow-ids"]. + * + * Python reference: trustgraph-flow/trustgraph/flow/service.py + */ + +import { + AsyncProcessor, + type ProcessorConfig, + topics, + RequestResponse, + type ConfigRequest, + type ConfigResponse, +} from "@trustgraph/base"; +import type { + BackendProducer, + BackendConsumer, + Message, +} from "@trustgraph/base"; + +// ---------- Internal state types ---------- + +interface FlowInstance { + id: string; + blueprintName: string; + description: string; + parameters: Record; + status: "running" | "stopped"; +} + +interface Blueprint { + description: string; + topics: Record; +} + +// ---------- Default blueprint ---------- + +const DEFAULT_BLUEPRINT: Blueprint = { + description: "Default processing pipeline with all services", + topics: { + "request": "tg.flow.text-completion-request", + "response": "tg.flow.text-completion-response", + "prompt-request": "tg.flow.prompt-request", + "prompt-response": "tg.flow.prompt-response", + "graph-rag-request": "tg.flow.graph-rag-request", + "graph-rag-response": "tg.flow.graph-rag-response", + "document-rag-request": "tg.flow.document-rag-request", + "document-rag-response": "tg.flow.document-rag-response", + "triples-request": "tg.flow.triples-request", + "triples-response": "tg.flow.triples-response", + "text-completion-request": "tg.flow.text-completion-request", + "text-completion-response": "tg.flow.text-completion-response", + "input": "tg.flow.chunk", + "output": "tg.flow.chunk", + "triples": "tg.flow.triples", + "entity-contexts": "tg.flow.entity-contexts", + }, +}; + +// ---------- Service ---------- + +export class FlowManagerService extends AsyncProcessor { + private flows = new Map(); + private blueprints = new Map(); + + private consumer: BackendConsumer> | null = null; + private responseProducer: BackendProducer> | null = null; + private configClient: RequestResponse | null = null; + + constructor(config: ProcessorConfig) { + super(config); + this.blueprints.set("default", DEFAULT_BLUEPRINT); + } + + protected override async run(): Promise { + // Create config client for pushing flow configs to the config service + this.configClient = new RequestResponse({ + pubsub: this.pubsub, + requestTopic: topics.configRequest, + responseTopic: topics.configResponse, + subscription: `${this.config.id}-config-client`, + }); + await this.configClient.start(); + + // Create producer for flow-response topic + this.responseProducer = await this.pubsub.createProducer>({ + topic: topics.flowResponse, + }); + + // Create consumer for flow-request topic + this.consumer = await this.pubsub.createConsumer>({ + topic: topics.flowRequest, + subscription: `${this.config.id}-flow-request`, + }); + + console.log(`[FlowManager] Listening on ${topics.flowRequest}`); + + // Main consume loop (same pattern as ConfigService) + while (this.running) { + try { + const msg = await this.consumer.receive(2000); + if (!msg) continue; + + await this.handleMessage(msg); + await this.consumer.acknowledge(msg); + } catch (err) { + if (!this.running) break; + console.error("[FlowManager] Error in consume loop:", err); + await sleep(1000); + } + } + } + + private async handleMessage( + msg: Message>, + ): Promise { + const request = msg.value(); + const props = msg.properties(); + const requestId = props.id; + + if (!requestId) { + console.warn("[FlowManager] Received request without id, ignoring"); + return; + } + + try { + const response = await this.handleOperation(request); + await this.responseProducer!.send(response, { id: requestId }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + await this.responseProducer!.send( + { + error: { type: "flow-error", message }, + }, + { id: requestId }, + ); + } + } + + private async handleOperation( + request: Record, + ): Promise> { + const op = request.operation as string; + + switch (op) { + case "list-blueprints": + return this.handleListBlueprints(); + + case "get-blueprint": + return this.handleGetBlueprint(request); + + case "delete-blueprint": + return this.handleDeleteBlueprint(request); + + case "list-flows": + return this.handleListFlows(); + + case "get-flow": + return this.handleGetFlow(request); + + case "start-flow": + return await this.handleStartFlow(request); + + case "stop-flow": + return await this.handleStopFlow(request); + + default: + throw new Error(`Unknown flow operation: ${op}`); + } + } + + // ---------- Blueprint operations ---------- + + private handleListBlueprints(): Record { + return { + "blueprint-names": [...this.blueprints.keys()], + }; + } + + private handleGetBlueprint( + request: Record, + ): Record { + const name = request["blueprint-name"] as string | undefined; + if (!name) { + throw new Error("Missing blueprint-name"); + } + + const blueprint = this.blueprints.get(name); + if (!blueprint) { + throw new Error(`Blueprint not found: ${name}`); + } + + return { + "blueprint-definition": JSON.stringify(blueprint), + }; + } + + private handleDeleteBlueprint( + request: Record, + ): Record { + const name = request["blueprint-name"] as string | undefined; + if (!name) { + throw new Error("Missing blueprint-name"); + } + + if (name === "default") { + throw new Error("Cannot delete the default blueprint"); + } + + const existed = this.blueprints.delete(name); + if (!existed) { + throw new Error(`Blueprint not found: ${name}`); + } + + return {}; + } + + // ---------- Flow operations ---------- + + private handleListFlows(): Record { + return { + "flow-ids": [...this.flows.keys()], + }; + } + + private handleGetFlow( + request: Record, + ): Record { + const id = request["flow-id"] as string | undefined; + if (!id) { + throw new Error("Missing flow-id"); + } + + const inst = this.flows.get(id); + if (!inst) { + throw new Error(`Flow not found: ${id}`); + } + + return { + flow: JSON.stringify({ + "blueprint-name": inst.blueprintName, + description: inst.description, + parameters: inst.parameters, + }), + }; + } + + private async handleStartFlow( + request: Record, + ): Promise> { + const id = request["flow-id"] as string | undefined; + const blueprintName = (request["blueprint-name"] as string) ?? "default"; + const description = (request["description"] as string) ?? ""; + const parameters = (request["parameters"] as Record) ?? {}; + + if (!id) { + throw new Error("Missing flow-id"); + } + + if (this.flows.has(id)) { + throw new Error(`Flow already exists: ${id}`); + } + + const blueprint = this.blueprints.get(blueprintName); + if (!blueprint) { + throw new Error(`Blueprint not found: ${blueprintName}`); + } + + // Create the flow instance + const inst: FlowInstance = { + id, + blueprintName, + description, + parameters, + status: "running", + }; + this.flows.set(id, inst); + + console.log( + `[FlowManager] Started flow "${id}" with blueprint "${blueprintName}"`, + ); + + // Push updated flows config to the config service + await this.pushFlowsConfig(); + + return {}; + } + + private async handleStopFlow( + request: Record, + ): Promise> { + const id = request["flow-id"] as string | undefined; + if (!id) { + throw new Error("Missing flow-id"); + } + + const inst = this.flows.get(id); + if (!inst) { + throw new Error(`Flow not found: ${id}`); + } + + this.flows.delete(id); + + console.log(`[FlowManager] Stopped flow "${id}"`); + + // Push updated flows config (without the removed flow) + await this.pushFlowsConfig(); + + return {}; + } + + // ---------- Config push ---------- + + /** + * Build the flows config object from all running flows and push it + * to the config service via a PUT operation. + */ + private async pushFlowsConfig(): Promise { + if (!this.configClient) return; + + const flowsConfig: Record }> = {}; + for (const [id, inst] of this.flows) { + const blueprint = this.blueprints.get(inst.blueprintName); + if (blueprint) { + flowsConfig[id] = { topics: blueprint.topics }; + } + } + + try { + await this.configClient.request({ + operation: "put", + keys: ["flows"], + values: flowsConfig, + }); + console.log( + `[FlowManager] Pushed flows config (${this.flows.size} active flows)`, + ); + } catch (err) { + console.error("[FlowManager] Failed to push flows config:", err); + } + } + + // ---------- Lifecycle ---------- + + override async stop(): Promise { + if (this.consumer) { + await this.consumer.close(); + this.consumer = null; + } + if (this.responseProducer) { + await this.responseProducer.close(); + this.responseProducer = null; + } + if (this.configClient) { + await this.configClient.stop(); + this.configClient = null; + } + await super.stop(); + } +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +export async function run(): Promise { + await FlowManagerService.launch("flow-manager"); +} diff --git a/ts/packages/flow/src/index.ts b/ts/packages/flow/src/index.ts index 31ee1f80..068382ed 100644 --- a/ts/packages/flow/src/index.ts +++ b/ts/packages/flow/src/index.ts @@ -61,3 +61,6 @@ export { KnowledgeExtractService } from "./extract/knowledge-extract.js"; // Knowledge core service export { KnowledgeCoreService, type KnowledgeCoreServiceConfig } from "./cores/service.js"; + +// Flow manager service +export { FlowManagerService } from "./flow-manager/service.js"; diff --git a/ts/scripts/run-flow-manager.ts b/ts/scripts/run-flow-manager.ts new file mode 100644 index 00000000..31a20350 --- /dev/null +++ b/ts/scripts/run-flow-manager.ts @@ -0,0 +1,14 @@ +/** + * Start the flow manager service. + * + * Usage: pnpm tsx scripts/run-flow-manager.ts + * + * Env: + * NATS_URL (default: nats://localhost:4222) + */ +import { run } from "../packages/flow/src/flow-manager/service.js"; + +run().catch((err) => { + console.error("Flow manager failed:", err); + process.exit(1); +}); diff --git a/ts/scripts/seed-config.ts b/ts/scripts/seed-config.ts new file mode 100644 index 00000000..22e49e1c --- /dev/null +++ b/ts/scripts/seed-config.ts @@ -0,0 +1,130 @@ +/** + * Seed configuration — pushes prompt templates and flow definitions + * needed for the full processing pipeline. + * + * Usage: pnpm seed + * Requires: gateway + config service running + */ + +const GATEWAY_URL = process.env.GATEWAY_URL ?? "http://localhost:8088"; + +async function pushConfig(keys: string[], values: Record): Promise { + const res = await fetch(`${GATEWAY_URL}/api/v1/config`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ operation: "put", keys, values }), + }); + const data = await res.json(); + if (data.error) throw new Error(`Config push failed: ${data.error.message}`); + console.log(` Pushed config [${keys.join("/")}] → version ${data.version}`); +} + +async function main(): Promise { + console.log("Seeding TrustGraph configuration...\n"); + + // 1. Prompt templates + console.log("── Prompt Templates ──"); + await pushConfig(["prompt"], { + "extract-relationships": { + system: "You are a helpful assistant that extracts structured knowledge from text.", + prompt: [ + "Study the following text and derive entity relationships.", + "For each relationship, derive the subject, predicate and object.", + "", + "Output as a JSON array of objects with keys:", + "- subject: the subject of the relationship", + "- predicate: the predicate", + "- object: the object of the relationship", + "", + "Here is the text:", + "{text}", + "", + "Requirements:", + "- Respond only with a valid JSON array.", + "- Do not include explanations or markdown formatting.", + "- Example: [{\"subject\": \"Earth\", \"predicate\": \"orbits\", \"object\": \"Sun\"}]", + ].join("\n"), + }, + "extract-definitions": { + system: "You are a helpful assistant that extracts entity definitions from text.", + prompt: [ + "Study the following text and derive definitions for any discovered entities.", + "Do not provide definitions for entities whose definitions are incomplete or unknown.", + "", + "Output as a JSON array of objects with keys:", + "- entity: the name of the entity", + "- definition: English text which defines the entity", + "", + "Here is the text:", + "{text}", + "", + "Requirements:", + "- Respond only with a valid JSON array.", + "- Do not include explanations or markdown formatting.", + "- Do not include null or unknown definitions.", + "- Example: [{\"entity\": \"photosynthesis\", \"definition\": \"The process by which plants convert sunlight into energy\"}]", + ].join("\n"), + }, + "document-prompt": { + system: "You are a helpful assistant. Use only the provided context to answer questions.", + prompt: [ + "Use the following context to answer the question.", + "Do not speculate if the answer is not found in the context.", + "", + "Context:", + "{documents}", + "", + "Question: {query}", + ].join("\n"), + }, + "kg-prompt": { + system: "You are a helpful assistant that answers questions using knowledge graph data.", + prompt: [ + "Use the following knowledge graph information to answer the question.", + "", + "Knowledge:", + "{knowledge}", + "", + "Question: {query}", + ].join("\n"), + }, + }); + + // 2. Flow definitions (default flow with all topic mappings) + console.log("\n── Flow Definitions ──"); + await pushConfig(["flows"], { + default: { + topics: { + // LLM text completion + "request": "tg.flow.text-completion-request", + "response": "tg.flow.text-completion-response", + "text-completion-request": "tg.flow.text-completion-request", + "text-completion-response": "tg.flow.text-completion-response", + // Prompt service + "prompt-request": "tg.flow.prompt-request", + "prompt-response": "tg.flow.prompt-response", + // Graph RAG + "graph-rag-request": "tg.flow.graph-rag-request", + "graph-rag-response": "tg.flow.graph-rag-response", + // Document RAG + "document-rag-request": "tg.flow.document-rag-request", + "document-rag-response": "tg.flow.document-rag-response", + // Triples + "triples-request": "tg.flow.triples-request", + "triples-response": "tg.flow.triples-response", + // Chunking pipeline + "input": "tg.flow.chunk", + "output": "tg.flow.chunk", + "triples": "tg.flow.triples", + "entity-contexts": "tg.flow.entity-contexts", + }, + }, + }); + + console.log("\nConfiguration seeded successfully."); +} + +main().catch((err) => { + console.error("Seed failed:", err); + process.exit(1); +}); diff --git a/ts/scripts/test-pipeline.ts b/ts/scripts/test-pipeline.ts index 185f9640..1fa4d75c 100644 --- a/ts/scripts/test-pipeline.ts +++ b/ts/scripts/test-pipeline.ts @@ -234,6 +234,162 @@ async function testWebSocket(): Promise { } } +// ─── Librarian Tests ────────────────────────────────────────────────── + +let testDocId = ""; + +async function testLibrarianAdd(): Promise { + try { + const content = Buffer.from("Hello from TrustGraph TypeScript!").toString("base64"); + const res = await post("/api/v1/librarian", { + operation: "add-document", + user: "test-user", + collection: "test-collection", + content, + documentMetadata: { + id: "", + time: Date.now(), + kind: "text/plain", + title: "Test Document", + comments: "", + user: "test-user", + tags: ["test"], + documentType: "source", + }, + }); + log("librarian/add", res); + const r = res as Record; + const meta = r.documentMetadata as Record | undefined; + if (meta?.id && typeof meta.id === "string") { + testDocId = meta.id; + pass(`Librarian add-document returned id: ${testDocId.slice(0, 8)}...`); + return true; + } + if (r.error) { + fail("Librarian add-document", r.error); + return false; + } + fail("Librarian add-document", "no documentMetadata.id in response"); + return false; + } catch (err) { + fail("Librarian add-document", err); + return false; + } +} + +async function testLibrarianList(): Promise { + try { + const res = await post("/api/v1/librarian", { + operation: "list-documents", + user: "test-user", + }); + log("librarian/list", res); + const r = res as Record; + const docs = r.documents as unknown[] | undefined; + if (docs && docs.length > 0) { + pass(`Librarian list-documents returned ${docs.length} document(s)`); + return true; + } + fail("Librarian list-documents", "empty or missing documents array"); + return false; + } catch (err) { + fail("Librarian list-documents", err); + return false; + } +} + +async function testLibrarianGetContent(): Promise { + if (!testDocId) { + fail("Librarian get-content", "no document ID from add test"); + return false; + } + try { + const res = await post("/api/v1/librarian", { + operation: "get-document-content", + documentId: testDocId, + user: "test-user", + }); + log("librarian/get-content", res); + const r = res as Record; + if (r.content && typeof r.content === "string") { + const decoded = Buffer.from(r.content, "base64").toString("utf-8"); + if (decoded === "Hello from TrustGraph TypeScript!") { + pass("Librarian get-content round-trips correctly"); + return true; + } + fail("Librarian get-content", `decoded: "${decoded}"`); + return false; + } + fail("Librarian get-content", "no content in response"); + return false; + } catch (err) { + fail("Librarian get-content", err); + return false; + } +} + +async function testLibrarianDelete(): Promise { + if (!testDocId) { + fail("Librarian delete", "no document ID from add test"); + return false; + } + try { + const res = await post("/api/v1/librarian", { + operation: "remove-document", + documentId: testDocId, + user: "test-user", + }); + log("librarian/delete", res); + + // Verify it's gone + const listRes = await post("/api/v1/librarian", { + operation: "list-documents", + user: "test-user", + }) as Record; + const docs = listRes.documents as unknown[] | undefined; + if (!docs || docs.length === 0) { + pass("Librarian remove-document deleted successfully"); + return true; + } + fail("Librarian remove-document", "document still present after delete"); + return false; + } catch (err) { + fail("Librarian delete", err); + return false; + } +} + +// ─── Agent Test ─────────────────────────────────────────────────────── + +async function testAgentQuery(): Promise { + try { + console.log("\n Sending agent request (may take a few seconds)..."); + const model = process.env.LLM_MODEL ?? "qwen2.5:0.5b"; + const res = await post("/api/v1/flow/default/service/agent", { + question: "What is the capital of France?", + model, + }); + log("agent", res); + const r = res as Record; + // Agent sends streaming chunks — gateway returns the first/final response + if (r.chunk_type || r.answer || r.content) { + pass("Agent returned a response"); + return true; + } + if (r.error) { + // Agent may error if no graph data — that's OK, proves routing works + const err = r.error as Record; + pass(`Agent responded with error (routing works): ${err.message ?? err.type}`); + return true; + } + fail("Agent", "unexpected response format"); + return false; + } catch (err) { + fail("Agent", err); + return false; + } +} + // ─── Main ───────────────────────────────────────────────────────────── async function main(): Promise { @@ -282,6 +438,25 @@ async function main(): Promise { console.log("\n (SKIP_LLM=1 — skipping LLM test)"); } + // Librarian tests (only if librarian service is running) + if (process.env.SKIP_LIBRARIAN !== "1") { + console.log("\n (Testing librarian — set SKIP_LIBRARIAN=1 to skip)"); + await run("Librarian Add", testLibrarianAdd); + await run("Librarian List", testLibrarianList); + await run("Librarian Get Content", testLibrarianGetContent); + await run("Librarian Delete", testLibrarianDelete); + } else { + console.log("\n (SKIP_LIBRARIAN=1 — skipping librarian tests)"); + } + + // Agent test (only if agent + LLM services are running) + if (process.env.SKIP_AGENT !== "1" && process.env.SKIP_LLM !== "1") { + console.log("\n (Testing agent — set SKIP_AGENT=1 to skip)"); + await run("Agent Query", testAgentQuery); + } else { + console.log("\n (Skipping agent test)"); + } + console.log("\n══════════════════════════════════════════════════"); console.log(` Results: ${passed} passed, ${failed} failed`); console.log("══════════════════════════════════════════════════\n");