From f09ef4de454c2ea996e97647c2ad2d3d2a80dde1 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Mon, 6 Apr 2026 00:19:37 -0500 Subject: [PATCH] feat: add document pipeline, ReAct agent, and knowledge core services Document Pipeline (Team A): - LibrarianService: document storage with filesystem backend, metadata persistence, child document hierarchy, collection management - ChunkingService: recursive character text splitter with configurable chunk size/overlap, FlowProcessor pattern - KnowledgeExtractService: combined relationship + definition extraction using prompt service and LLM, emits RDF triples and entity contexts - KnowledgeCoreService: knowledge core CRUD with streaming export and flow-based loading ReAct Agent (Team B): - StreamingReActParser: state machine for parsing LLM output into Thought/Action/ActionInput/FinalAnswer sections - Three MVP tools: KnowledgeQuery (GraphRAG), DocumentQuery (DocRAG), TriplesQuery with RequestResponse clients - AgentService FlowProcessor with ReAct loop, tool execution, and streaming chunk responses (thought/observation/answer) Co-Authored-By: Claude Opus 4.6 (1M context) --- ts/package.json | 10 +- ts/packages/flow/src/agent/react/index.ts | 19 + ts/packages/flow/src/agent/react/parser.ts | 130 +++++ ts/packages/flow/src/agent/react/prompt.ts | 50 ++ ts/packages/flow/src/agent/react/service.ts | 306 +++++++++++ ts/packages/flow/src/agent/react/tools.ts | 199 +++++++ ts/packages/flow/src/agent/react/types.ts | 33 ++ .../flow/src/chunking/recursive-splitter.ts | 106 ++++ ts/packages/flow/src/chunking/service.ts | 94 ++++ ts/packages/flow/src/cores/service.ts | 293 ++++++++++ .../flow/src/extract/knowledge-extract.ts | 269 ++++++++++ .../flow/src/gateway/dispatch/serialize.ts | 2 + ts/packages/flow/src/index.ts | 17 + .../flow/src/librarian/collection-manager.ts | 73 +++ ts/packages/flow/src/librarian/service.ts | 502 ++++++++++++++++++ ts/scripts/run-agent.ts | 14 + ts/scripts/run-knowledge.ts | 15 + 
ts/scripts/run-librarian.ts | 15 + 18 files changed, 2145 insertions(+), 2 deletions(-) create mode 100644 ts/packages/flow/src/agent/react/index.ts create mode 100644 ts/packages/flow/src/agent/react/parser.ts create mode 100644 ts/packages/flow/src/agent/react/prompt.ts create mode 100644 ts/packages/flow/src/agent/react/service.ts create mode 100644 ts/packages/flow/src/agent/react/tools.ts create mode 100644 ts/packages/flow/src/agent/react/types.ts create mode 100644 ts/packages/flow/src/chunking/recursive-splitter.ts create mode 100644 ts/packages/flow/src/chunking/service.ts create mode 100644 ts/packages/flow/src/cores/service.ts create mode 100644 ts/packages/flow/src/extract/knowledge-extract.ts create mode 100644 ts/packages/flow/src/librarian/collection-manager.ts create mode 100644 ts/packages/flow/src/librarian/service.ts create mode 100644 ts/scripts/run-agent.ts create mode 100644 ts/scripts/run-knowledge.ts create mode 100644 ts/scripts/run-librarian.ts diff --git a/ts/package.json b/ts/package.json index 99c02db2..9d9cc53d 100644 --- a/ts/package.json +++ b/ts/package.json @@ -11,12 +11,18 @@ "config-svc": "tsx scripts/run-config.ts", "llm:claude": "tsx scripts/run-llm-claude.ts", "llm:openai": "tsx scripts/run-llm-openai.ts", - "test:pipeline": "tsx scripts/test-pipeline.ts" + "test:pipeline": "tsx scripts/test-pipeline.ts", + "agent": "tsx scripts/run-agent.ts", + "librarian": "tsx scripts/run-librarian.ts", + "knowledge": "tsx scripts/run-knowledge.ts" }, "devDependencies": { "tsx": "^4.21.0", "turbo": "^2.5.0", "typescript": "^5.8.0" }, - "packageManager": "pnpm@9.15.0" + "packageManager": "pnpm@9.15.0", + "workspaces": [ + "packages/*" + ] } diff --git a/ts/packages/flow/src/agent/react/index.ts b/ts/packages/flow/src/agent/react/index.ts new file mode 100644 index 00000000..90eab368 --- /dev/null +++ b/ts/packages/flow/src/agent/react/index.ts @@ -0,0 +1,19 @@ +// ReAct agent -- barrel exports + +export { AgentService } from "./service.js"; 
/**
 * Section markers recognised at the start of a left-trimmed line, paired with
 * the parser state each one switches to.
 */
const MARKERS: ReadonlyArray<{ prefix: string; state: ReActState }> = [
  { prefix: "Thought:", state: "thought" },
  { prefix: "Action Input:", state: "action_input" },
  { prefix: "Action:", state: "action" },
  { prefix: "Final Answer:", state: "final_answer" },
];

/**
 * Streaming ReAct parser -- a line-oriented state machine over LLM output.
 *
 * Text is fed in arbitrary chunks. The parser buffers until a newline arrives,
 * so a marker split across two chunks is still seen as one line. Each complete
 * line is either a marker line ("Thought:", "Action:", "Action Input:",
 * "Final Answer:"), which switches the current section and emits the text after
 * the marker, or a continuation line, which is emitted to the callback of the
 * current section. Blank lines are dropped.
 *
 * Callbacks are invoked once per non-empty line, not once per section.
 */
export class StreamingReActParser {
  private state: ReActState = "initial";
  private buffer = "";

  /**
   * @param onThought      receives each line of thought text
   * @param onAction       receives each line of the action (tool name) section
   * @param onActionInput  receives each line of the action input section
   * @param onFinalAnswer  receives each line of the final answer
   */
  constructor(
    private onThought: (text: string) => void,
    private onAction: (name: string) => void,
    private onActionInput: (input: string) => void,
    private onFinalAnswer: (text: string) => void,
  ) {}

  /**
   * Feed a chunk of LLM output. Every newline-terminated line now available is
   * processed; an unterminated tail stays buffered until more text or flush().
   */
  feed(text: string): void {
    this.buffer += text;
    this.drainCompleteLines();
  }

  /**
   * Signal end of output and process whatever is still buffered.
   *
   * The unterminated tail goes through the same marker detection as any other
   * line. LLM output usually ends without a trailing newline, so a last line
   * such as "Final Answer: 42" must still switch to the final-answer section
   * rather than being emitted verbatim into the previous section.
   */
  flush(): void {
    this.drainCompleteLines();
    const tail = this.buffer;
    this.buffer = "";
    this.processLine(tail);
  }

  /** Consume and process every newline-terminated line in the buffer. */
  private drainCompleteLines(): void {
    let newlineIdx = this.buffer.indexOf("\n");
    while (newlineIdx !== -1) {
      const line = this.buffer.slice(0, newlineIdx);
      this.buffer = this.buffer.slice(newlineIdx + 1);
      this.processLine(line);
      newlineIdx = this.buffer.indexOf("\n");
    }
  }

  /** Classify one line as a marker line or a continuation, and emit its content. */
  private processLine(line: string): void {
    // Tolerate CRLF input: drop the CR left behind by splitting on "\n".
    const withoutCr = line.endsWith("\r") ? line.slice(0, -1) : line;
    const trimmed = withoutCr.trimStart();

    const marker = MARKERS.find((m) => trimmed.startsWith(m.prefix));
    if (marker) {
      this.state = marker.state;
      this.emitContent(trimmed.slice(marker.prefix.length).trim());
      return;
    }

    // No marker: continuation content for the current section.
    this.emitContent(trimmed);
  }

  /** Route non-empty content to the callback for the current section. */
  private emitContent(content: string): void {
    if (content.length === 0) return;

    switch (this.state) {
      case "thought":
        this.onThought(content);
        break;
      case "action":
        this.onAction(content);
        break;
      case "action_input":
        this.onActionInput(content);
        break;
      case "final_answer":
        this.onFinalAnswer(content);
        break;
      case "initial":
        // Content before any marker is treated as the start of a thought.
        this.state = "thought";
        this.onThought(content);
        break;
      case "complete":
        break;
    }
  }
}
Wait for the tool result.`; + + return { system, prompt: question }; +} diff --git a/ts/packages/flow/src/agent/react/service.ts b/ts/packages/flow/src/agent/react/service.ts new file mode 100644 index 00000000..36a821f1 --- /dev/null +++ b/ts/packages/flow/src/agent/react/service.ts @@ -0,0 +1,306 @@ +/** + * ReAct agent service -- a FlowProcessor that implements a streaming ReAct + * (Reasoning + Acting) agent with tool execution. + * + * The agent: + * 1. Receives an AgentRequest (a user question) + * 2. Builds a ReAct prompt with available tools + * 3. Iteratively calls the LLM, parses Thought/Action/Action Input/Final Answer + * 4. Executes tools and feeds observations back to the LLM + * 5. Sends streaming AgentResponse chunks (thought, observation, answer, error) + * + * Python reference: trustgraph-flow/trustgraph/agent/react/service.py + */ + +import { + FlowProcessor, + ConsumerSpec, + ProducerSpec, + RequestResponseSpec, + type ProcessorConfig, + type FlowContext, + type AgentRequest, + type AgentResponse, + type TextCompletionRequest, + type TextCompletionResponse, + type GraphRagRequest, + type GraphRagResponse, + type DocumentRagRequest, + type DocumentRagResponse, + type TriplesQueryRequest, + type TriplesQueryResponse, +} from "@trustgraph/base"; + +import { + createKnowledgeQueryTool, + createDocumentQueryTool, + createTriplesQueryTool, +} from "./tools.js"; +import { buildReActPrompt } from "./prompt.js"; +import type { AgentTool } from "./types.js"; + +const MAX_ITERATIONS = 10; + +export class AgentService extends FlowProcessor { + constructor(config: ProcessorConfig) { + super(config); + + // Consumer: agent requests + this.registerSpecification( + new ConsumerSpec("request", this.onRequest.bind(this)), + ); + + // Producer: agent responses (streaming chunks) + this.registerSpecification(new ProducerSpec("response")); + + // Request-response clients for tool execution + this.registerSpecification( + new RequestResponseSpec( + "llm", + 
"text-completion-request", + "text-completion-response", + ), + ); + this.registerSpecification( + new RequestResponseSpec( + "graph-rag", + "graph-rag-request", + "graph-rag-response", + ), + ); + this.registerSpecification( + new RequestResponseSpec( + "doc-rag", + "document-rag-request", + "document-rag-response", + ), + ); + this.registerSpecification( + new RequestResponseSpec( + "triples", + "triples-request", + "triples-response", + ), + ); + + console.log("[AgentService] Service initialized"); + } + + private async onRequest( + msg: AgentRequest, + properties: Record, + flowCtx: FlowContext, + ): Promise { + const requestId = properties.id; + if (!requestId) return; + + const responseProducer = flowCtx.flow.producer("response"); + + try { + // Build tools from flow requestors + const tools: AgentTool[] = [ + createKnowledgeQueryTool( + flowCtx.flow.requestor("graph-rag"), + msg.collection, + ), + createDocumentQueryTool( + flowCtx.flow.requestor("doc-rag"), + msg.collection, + ), + createTriplesQueryTool( + flowCtx.flow.requestor("triples"), + msg.collection, + ), + ]; + + // Build the ReAct prompt + const { system, prompt: initialPrompt } = buildReActPrompt( + tools, + msg.question, + ); + + const llmClient = flowCtx.flow.requestor< + TextCompletionRequest, + TextCompletionResponse + >("llm"); + + // Conversation accumulates the full exchange for multi-turn reasoning + let conversation = initialPrompt; + + for (let iteration = 0; iteration < MAX_ITERATIONS; iteration++) { + console.log( + `[AgentService] Iteration ${iteration + 1}/${MAX_ITERATIONS} for request ${requestId}`, + ); + + // Call LLM (non-streaming for MVP) + const llmResponse = await llmClient.request({ + system, + prompt: conversation, + }); + + if (llmResponse.error) { + await responseProducer.send(requestId, { + chunk_type: "error", + content: `LLM error: ${llmResponse.error.message}`, + end_of_dialog: true, + }); + return; + } + + const text = llmResponse.response; + + // Parse the LLM 
response with simple line-based parsing + const parsed = parseReActResponse(text); + + // Send thought chunk + if (parsed.thought) { + await responseProducer.send(requestId, { + chunk_type: "thought", + content: parsed.thought, + end_of_message: true, + }); + } + + // If we got a final answer, send it and return + if (parsed.finalAnswer) { + await responseProducer.send(requestId, { + chunk_type: "answer", + content: parsed.finalAnswer, + end_of_message: true, + end_of_dialog: true, + }); + return; + } + + // Execute tool if action was specified + if (parsed.action && parsed.actionInput) { + const tool = tools.find((t) => t.name === parsed.action); + let observation: string; + + if (tool) { + try { + observation = await tool.execute(parsed.actionInput); + } catch (err) { + observation = `Error executing tool: ${err instanceof Error ? err.message : String(err)}`; + } + } else { + observation = `Unknown tool: ${parsed.action}. Available tools: ${tools.map((t) => t.name).join(", ")}`; + } + + // Send observation chunk + await responseProducer.send(requestId, { + chunk_type: "observation", + content: observation, + end_of_message: true, + }); + + // Append the full exchange to conversation for the next iteration + conversation += `\n${text}\nObservation: ${observation}\n`; + } else if (!parsed.finalAnswer) { + // LLM didn't produce a valid action or final answer -- nudge it + conversation += `\n${text}\nObservation: You must either use a tool (Action + Action Input) or provide a Final Answer.\n`; + } + } + + // Max iterations reached without a final answer + await responseProducer.send(requestId, { + chunk_type: "error", + content: + "Maximum reasoning iterations reached without a final answer. 
/**
 * Line-based parser for one complete ReAct LLM response.
 *
 * Extracts the Thought, Action, Action Input and Final Answer sections. A line
 * that starts (after leading whitespace) with a marker opens that section;
 * other non-blank lines continue the section that is currently open.
 *
 * - "Final Answer:" captures the rest of the text, including later lines, and
 *   ends parsing.
 * - "Observation:" ends parsing. Observations are injected by the agent loop,
 *   never by the model, so anything the model wrote from that point on
 *   (fabricated observations, follow-up thoughts, a premature final answer) is
 *   ignored and the requested tool call is executed instead.
 *
 * @param text the full text of a single LLM completion
 * @returns the trimmed sections; a section that was not present is ""
 */
function parseReActResponse(text: string): {
  thought: string;
  action: string;
  actionInput: string;
  finalAnswer: string;
} {
  let thought = "";
  let action = "";
  let actionInput = "";
  let finalAnswer = "";

  const lines = text.split("\n");
  let currentSection: "thought" | "action" | "action_input" | null = null;

  for (const [index, line] of lines.entries()) {
    const trimmed = line.trimStart();

    if (trimmed.startsWith("Observation:")) {
      // The model is running ahead of the tool result: stop here.
      break;
    }

    if (trimmed.startsWith("Final Answer:")) {
      // Everything from the marker to the end of the text is the answer.
      const firstLine = trimmed.slice("Final Answer:".length).trim();
      const rest = lines.slice(index + 1).join("\n").trim();
      finalAnswer = rest ? `${firstLine}\n${rest}` : firstLine;
      break;
    }

    if (trimmed.startsWith("Thought:")) {
      currentSection = "thought";
      const content = trimmed.slice("Thought:".length).trim();
      if (content) {
        thought += (thought ? "\n" : "") + content;
      }
      continue;
    }

    if (trimmed.startsWith("Action Input:")) {
      currentSection = "action_input";
      const content = trimmed.slice("Action Input:".length).trim();
      if (content) {
        actionInput += content;
      }
      continue;
    }

    if (trimmed.startsWith("Action:")) {
      currentSection = "action";
      const content = trimmed.slice("Action:".length).trim();
      if (content) {
        action = content;
      }
      continue;
    }

    if (trimmed.length === 0 || currentSection === null) {
      // Blank line, or text before the first marker: nothing to attach it to.
      continue;
    }

    // Continuation line for the open section.
    switch (currentSection) {
      case "thought":
        thought += "\n" + trimmed;
        break;
      case "action":
        // A tool name should be one line; tolerate a wrapped one.
        action += " " + trimmed;
        break;
      case "action_input":
        actionInput += "\n" + trimmed;
        break;
    }
  }

  return {
    thought: thought.trim(),
    action: action.trim(),
    actionInput: actionInput.trim(),
    finalAnswer: finalAnswer.trim(),
  };
}
/**
 * Render a Term as a short human-readable string for tool observations:
 * the IRI, the literal value, `_:id` for a blank node, or `(s p o)` for a
 * quoted triple.
 */
function termToString(term: Term): string {
  switch (term.type) {
    case "IRI":
      return term.iri;
    case "LITERAL":
      return term.value;
    case "BLANK":
      return `_:${term.id}`;
    case "TRIPLE":
      return `(${termToString(term.triple.s)} ${termToString(term.triple.p)} ${termToString(term.triple.o)})`;
  }
}

/**
 * Extract the question from a tool input.
 *
 * Accepts a JSON object with a `question` field, a JSON string, or plain text.
 * Anything else (including JSON without a `question` field) is passed through
 * unchanged.
 */
function parseQuestion(input: string): string {
  let parsed: unknown;
  try {
    parsed = JSON.parse(input);
  } catch {
    // Not valid JSON -- treat the input as a plain-text question.
    return input;
  }

  if (typeof parsed === "string") {
    return parsed;
  }
  if (typeof parsed === "object" && parsed !== null && "question" in parsed) {
    return String(parsed.question);
  }
  return input;
}

/**
 * Build the KnowledgeQuery tool: sends the question to the graph-RAG client
 * and returns its response text, or an "Error: ..." string if the response
 * carries an error.
 *
 * @param client     request/response client for graph-RAG
 * @param collection optional collection forwarded with every request
 */
export function createKnowledgeQueryTool(
  client: RequestResponse<GraphRagRequest, GraphRagResponse>,
  collection?: string,
): AgentTool {
  return {
    name: "KnowledgeQuery",
    description:
      "Query the knowledge graph for information about entities and their relationships.",
    args: [
      {
        name: "question",
        type: "string",
        description: "The question to ask the knowledge graph",
      },
    ],
    async execute(input: string): Promise<string> {
      const question = parseQuestion(input);
      const res = await client.request({ query: question, collection });
      if (res.error) return `Error: ${res.error.message}`;
      return res.response;
    },
  };
}

/**
 * Build the DocumentQuery tool: sends the question to the document-RAG client
 * and returns its response text, or an "Error: ..." string if the response
 * carries an error.
 *
 * @param client     request/response client for document-RAG
 * @param collection optional collection forwarded with every request
 */
export function createDocumentQueryTool(
  client: RequestResponse<DocumentRagRequest, DocumentRagResponse>,
  collection?: string,
): AgentTool {
  return {
    name: "DocumentQuery",
    description:
      "Search the document library for relevant information using semantic search.",
    args: [
      {
        name: "question",
        type: "string",
        description: "The question to search documents for",
      },
    ],
    async execute(input: string): Promise<string> {
      const question = parseQuestion(input);
      const res = await client.request({ query: question, collection });
      if (res.error) return `Error: ${res.error.message}`;
      return res.response;
    },
  };
}

/**
 * Parse the input of the triples tool into an s/p/o pattern.
 *
 * - A JSON object may carry `subject`/`predicate`/`object` (or `s`/`p`/`o`)
 *   and a numeric `limit`. A string value becomes a LITERAL term; an object
 *   that has a `type` field is passed through as a Term; anything else leaves
 *   that position unconstrained.
 * - A JSON string, or input that is not JSON at all, is a subject search for
 *   that text.
 * - Any other JSON value (number, boolean, null, array) is a subject search
 *   for the raw input text. It is never turned into an unconstrained query.
 */
function parseTriplesInput(input: string): {
  s?: Term;
  p?: Term;
  o?: Term;
  limit?: number;
} {
  const subjectSearch = (value: string): { s: Term } => {
    const s: Term = { type: "LITERAL", value };
    return { s };
  };

  let parsed: unknown;
  try {
    parsed = JSON.parse(input);
  } catch {
    // Not valid JSON -- treat as a subject search.
    return subjectSearch(input);
  }

  if (typeof parsed === "string") {
    return subjectSearch(parsed);
  }
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    return subjectSearch(input);
  }

  const fields = parsed as Record<string, unknown>;

  const toTerm = (val: unknown): Term | undefined => {
    if (typeof val === "string") {
      return { type: "LITERAL", value: val };
    }
    if (typeof val === "object" && val !== null && "type" in val) {
      // Structured term supplied by the caller; its shape is not validated here.
      return val as Term;
    }
    return undefined;
  };

  return {
    s: toTerm(fields.subject ?? fields.s),
    p: toTerm(fields.predicate ?? fields.p),
    o: toTerm(fields.object ?? fields.o),
    limit: typeof fields.limit === "number" ? fields.limit : undefined,
  };
}

/**
 * Build the TriplesQuery tool: queries the triples client with the parsed
 * s/p/o pattern (limit defaults to 20) and returns one
 * `(s) -[p]-> (o)` line per triple, a fixed "no triples" message when the
 * result is empty, or an "Error: ..." string if the response carries an error.
 *
 * @param client     request/response client for triples queries
 * @param collection optional collection forwarded with every request
 */
export function createTriplesQueryTool(
  client: RequestResponse<TriplesQueryRequest, TriplesQueryResponse>,
  collection?: string,
): AgentTool {
  return {
    name: "TriplesQuery",
    description:
      "Query for specific triples (subject-predicate-object relationships) in the knowledge graph. " +
      "Provide subject, predicate, and/or object to filter results.",
    args: [
      {
        name: "subject",
        type: "string",
        description: "The subject entity to search for (optional)",
      },
      {
        name: "predicate",
        type: "string",
        description: "The predicate/relationship to search for (optional)",
      },
      {
        name: "object",
        type: "string",
        description: "The object entity to search for (optional)",
      },
    ],
    async execute(input: string): Promise<string> {
      const { s, p, o, limit } = parseTriplesInput(input);
      const res = await client.request({
        s,
        p,
        o,
        collection,
        limit: limit ?? 20,
      });

      if (res.error) return `Error: ${res.error.message}`;

      if (!res.triples || res.triples.length === 0) {
        return "No triples found matching the query.";
      }

      const lines = res.triples.map(
        (t) =>
          `(${termToString(t.s)}) -[${termToString(t.p)}]-> (${termToString(t.o)})`,
      );
      return lines.join("\n");
    },
  };
}
/** Separators tried in order, coarsest first; "" means split into characters. */
const DEFAULT_SEPARATORS = ["\n\n", "\n", " ", ""];

/**
 * Recursive character text splitter.
 *
 * 1. Pick the first separator of "\n\n", "\n", " ", "" that occurs in the text.
 * 2. Split on it and greedily merge neighbouring pieces while the merged text
 *    stays within `chunkSize`.
 * 3. Re-split any merged piece that is still too long with the next separator.
 * 4. Finally, prepend to every chunk after the first the trailing
 *    `chunkOverlap` characters of the chunk before it.
 *
 * Overlap is applied exactly once, over the final flat list of chunks, so a
 * chunk produced by a nested split does not receive the same tail twice.
 *
 * Notes on the result:
 * - the separator at a chunk boundary is dropped;
 * - because overlap is prepended after sizing, a chunk may be up to
 *   `chunkOverlap` characters longer than `chunkSize`;
 * - whitespace-only chunks are discarded.
 *
 * @param text         text to split
 * @param chunkSize    target maximum chunk length before overlap is added
 * @param chunkOverlap number of trailing characters of the previous chunk to
 *                     repeat at the start of the next; values <= 0 disable it
 * @returns the chunks in document order (empty for blank input)
 */
export function recursiveSplit(
  text: string,
  chunkSize: number,
  chunkOverlap: number,
): string[] {
  const chunks = splitRecursive(text, chunkSize, DEFAULT_SEPARATORS);
  return applyOverlap(chunks, chunkOverlap);
}

/** Split `text` into chunks of at most `chunkSize` where possible, without overlap. */
function splitRecursive(
  text: string,
  chunkSize: number,
  separators: string[],
): string[] {
  if (text.length <= chunkSize) {
    return text.trim().length > 0 ? [text] : [];
  }

  // First separator that actually occurs in the text ("" always qualifies).
  const sepIndex = separators.findIndex(
    (sep) => sep === "" || text.includes(sep),
  );
  const separator = sepIndex === -1 ? "" : (separators[sepIndex] ?? "");
  const finerSeparators = sepIndex === -1 ? [] : separators.slice(sepIndex + 1);

  const pieces = separator === "" ? [...text] : text.split(separator);
  const merged = mergePieces(pieces, separator, chunkSize);

  const results: string[] = [];
  for (const chunk of merged) {
    if (chunk.length > chunkSize && finerSeparators.length > 0) {
      // A single piece was longer than chunkSize: retry with a finer separator.
      results.push(...splitRecursive(chunk, chunkSize, finerSeparators));
    } else if (chunk.trim().length > 0) {
      results.push(chunk);
    }
  }
  return results;
}

/**
 * Greedily join consecutive pieces with `separator` while the joined text
 * stays within `chunkSize`. A single piece longer than `chunkSize` is kept as
 * its own (oversized) chunk.
 */
function mergePieces(
  pieces: string[],
  separator: string,
  chunkSize: number,
): string[] {
  const chunks: string[] = [];
  let current = "";

  for (const piece of pieces) {
    const candidate = current.length > 0 ? current + separator + piece : piece;

    if (candidate.length > chunkSize && current.length > 0) {
      chunks.push(current);
      current = piece;
    } else {
      current = candidate;
    }
  }

  if (current.length > 0) {
    chunks.push(current);
  }

  return chunks;
}

/**
 * Prefix every chunk after the first with the last `overlapSize` characters of
 * the chunk that precedes it in the input list (before any prefixing).
 */
function applyOverlap(chunks: string[], overlapSize: number): string[] {
  if (overlapSize <= 0 || chunks.length <= 1) return chunks;

  return chunks.map((chunk, i) => {
    if (i === 0) return chunk;
    const prev = chunks[i - 1] ?? "";
    return prev.slice(Math.max(0, prev.length - overlapSize)) + chunk;
  });
}
Emits Chunk messages for each resulting chunk + * + * Python reference: trustgraph-flow/trustgraph/chunking/recursive_splitter/service.py + */ + +import { + FlowProcessor, + ConsumerSpec, + ProducerSpec, + ParameterSpec, + type ProcessorConfig, + type FlowContext, + type TextDocument, + type Chunk, + type Triples, +} from "@trustgraph/base"; +import { recursiveSplit } from "./recursive-splitter.js"; + +const DEFAULT_CHUNK_SIZE = 2000; +const DEFAULT_CHUNK_OVERLAP = 100; + +export class ChunkingService extends FlowProcessor { + constructor(config: ProcessorConfig) { + super(config); + + this.registerSpecification( + new ConsumerSpec("input", this.onMessage.bind(this)), + ); + this.registerSpecification(new ProducerSpec("output")); + this.registerSpecification(new ProducerSpec("triples")); + this.registerSpecification(new ParameterSpec("chunk-size")); + this.registerSpecification(new ParameterSpec("chunk-overlap")); + + console.log("[ChunkingService] Service initialized"); + } + + private async onMessage( + msg: TextDocument, + properties: Record, + flowCtx: FlowContext, + ): Promise { + const requestId = properties.id; + if (!requestId) return; + + let chunkSize: number; + let chunkOverlap: number; + + try { + chunkSize = flowCtx.flow.parameter("chunk-size"); + } catch { + chunkSize = DEFAULT_CHUNK_SIZE; + } + + try { + chunkOverlap = flowCtx.flow.parameter("chunk-overlap"); + } catch { + chunkOverlap = DEFAULT_CHUNK_OVERLAP; + } + + const text = msg.text; + if (!text || text.trim().length === 0) { + console.warn(`[ChunkingService] Empty text received for document ${msg.documentId}`); + return; + } + + const chunks = recursiveSplit(text, chunkSize, chunkOverlap); + + console.log( + `[ChunkingService] Split document ${msg.documentId} into ${chunks.length} chunks (size=${chunkSize}, overlap=${chunkOverlap})`, + ); + + const outputProducer = flowCtx.flow.producer("output"); + + for (const chunkText of chunks) { + const chunk: Chunk = { + metadata: msg.metadata, + 
chunk: chunkText, + documentId: msg.documentId, + }; + + await outputProducer.send(requestId, chunk); + } + } +} + +export async function run(): Promise { + await ChunkingService.launch("chunking"); +} diff --git a/ts/packages/flow/src/cores/service.ts b/ts/packages/flow/src/cores/service.ts new file mode 100644 index 00000000..f0320b8d --- /dev/null +++ b/ts/packages/flow/src/cores/service.ts @@ -0,0 +1,293 @@ +/** + * Knowledge core service — manages stored knowledge graph cores (triples + embeddings). + * + * An AsyncProcessor (NOT FlowProcessor) that: + * 1. Listens on knowledge-request topic + * 2. Handles CRUD operations for knowledge graph cores + * 3. Each core stores triples and graph embeddings keyed by user:id + * 4. Persists state to JSON + * + * Python reference: trustgraph-flow/trustgraph/knowledge/service/service.py + */ + +import { readFile, writeFile, mkdir } from "node:fs/promises"; +import { dirname, join } from "node:path"; +import { + AsyncProcessor, + type ProcessorConfig, + topics, + type KnowledgeRequest, + type KnowledgeResponse, + type Triple, + type Term, +} from "@trustgraph/base"; +import type { BackendProducer, BackendConsumer, Message } from "@trustgraph/base"; + +export interface KnowledgeCoreServiceConfig extends ProcessorConfig { + dataDir?: string; +} + +interface KnowledgeCore { + triples: Triple[]; + graphEmbeddings: { entity: Term; vectors: number[][] }[]; +} + +export class KnowledgeCoreService extends AsyncProcessor { + /** Keyed by `${user}:${id}` */ + private cores = new Map(); + private readonly persistPath: string; + + private consumer: BackendConsumer | null = null; + private responseProducer: BackendProducer | null = null; + + constructor(config: KnowledgeCoreServiceConfig) { + super(config); + const dataDir = config.dataDir ?? process.env.KNOWLEDGE_DATA_DIR ?? 
"./data/knowledge"; + this.persistPath = join(dataDir, "knowledge-state.json"); + } + + private coreKey(user: string, id: string): string { + return `${user}:${id}`; + } + + protected override async run(): Promise { + // Load persisted state + await this.loadFromDisk(); + + // Create producer + this.responseProducer = await this.pubsub.createProducer({ + topic: topics.knowledgeResponse, + }); + + // Create consumer + this.consumer = await this.pubsub.createConsumer({ + topic: topics.knowledgeRequest, + subscription: `${this.config.id}-knowledge-request`, + }); + + console.log(`[KnowledgeCoreService] Listening on ${topics.knowledgeRequest}`); + + // Main consume loop + while (this.running) { + try { + const msg = await this.consumer.receive(2000); + if (!msg) continue; + + await this.handleMessage(msg); + await this.consumer.acknowledge(msg); + } catch (err) { + if (!this.running) break; + console.error("[KnowledgeCoreService] Error in consume loop:", err); + await sleep(1000); + } + } + } + + private async handleMessage(msg: Message): Promise { + const request = msg.value(); + const props = msg.properties(); + const requestId = props.id; + + if (!requestId) { + console.warn("[KnowledgeCoreService] Received request without id, ignoring"); + return; + } + + try { + await this.handleOperation(request, requestId); + } catch (err) { + const message = err instanceof Error ? 
err.message : String(err); + await this.responseProducer!.send( + { error: { type: "knowledge-error", message } }, + { id: requestId }, + ); + } + } + + private async handleOperation(request: KnowledgeRequest, requestId: string): Promise { + switch (request.operation) { + case "list-kg-cores": + return this.listKgCores(request, requestId); + case "get-kg-core": + return this.getKgCore(request, requestId); + case "delete-kg-core": + return this.deleteKgCore(request, requestId); + case "put-kg-core": + return this.putKgCore(request, requestId); + case "load-kg-core": + return this.loadKgCore(request, requestId); + default: + throw new Error(`Unknown knowledge operation: ${request.operation as string}`); + } + } + + private async listKgCores(request: KnowledgeRequest, requestId: string): Promise { + const user = request.user ?? ""; + const prefix = user ? `${user}:` : ""; + + const ids: string[] = []; + for (const key of this.cores.keys()) { + if (!prefix || key.startsWith(prefix)) { + // Extract the ID portion after the user prefix + const id = key.slice(prefix.length); + ids.push(id); + } + } + + await this.responseProducer!.send({ ids }, { id: requestId }); + } + + private async getKgCore(request: KnowledgeRequest, requestId: string): Promise { + const user = request.user ?? ""; + const coreId = request.id ?? 
""; + const key = this.coreKey(user, coreId); + + const core = this.cores.get(key); + if (!core) { + throw new Error(`Knowledge core not found: ${key}`); + } + + // Send triples and embeddings in batches + const BATCH_SIZE = 100; + + // Send triples in batches + for (let i = 0; i < core.triples.length; i += BATCH_SIZE) { + const batch = core.triples.slice(i, i + BATCH_SIZE); + const isLast = i + BATCH_SIZE >= core.triples.length && core.graphEmbeddings.length === 0; + + await this.responseProducer!.send( + { triples: batch, eos: isLast }, + { id: requestId }, + ); + } + + // Send graph embeddings in batches + for (let i = 0; i < core.graphEmbeddings.length; i += BATCH_SIZE) { + const batch = core.graphEmbeddings.slice(i, i + BATCH_SIZE); + const isLast = i + BATCH_SIZE >= core.graphEmbeddings.length; + + await this.responseProducer!.send( + { graphEmbeddings: batch, eos: isLast }, + { id: requestId }, + ); + } + + // If core was empty, send a final eos + if (core.triples.length === 0 && core.graphEmbeddings.length === 0) { + await this.responseProducer!.send({ eos: true }, { id: requestId }); + } + } + + private async deleteKgCore(request: KnowledgeRequest, requestId: string): Promise { + const user = request.user ?? ""; + const coreId = request.id ?? ""; + const key = this.coreKey(user, coreId); + + this.cores.delete(key); + await this.persist(); + + console.log(`[KnowledgeCoreService] Deleted core: ${key}`); + await this.responseProducer!.send({}, { id: requestId }); + } + + private async putKgCore(request: KnowledgeRequest, requestId: string): Promise { + const user = request.user ?? ""; + const coreId = request.id ?? 
""; + const key = this.coreKey(user, coreId); + + let core = this.cores.get(key); + if (!core) { + core = { triples: [], graphEmbeddings: [] }; + this.cores.set(key, core); + } + + // Append triples if provided + if (request.triples && request.triples.length > 0) { + core.triples.push(...request.triples); + } + + // Append graph embeddings if provided + if (request.graphEmbeddings && request.graphEmbeddings.length > 0) { + core.graphEmbeddings.push(...request.graphEmbeddings); + } + + await this.persist(); + + console.log( + `[KnowledgeCoreService] Updated core ${key}: triples=${core.triples.length}, embeddings=${core.graphEmbeddings.length}`, + ); + await this.responseProducer!.send({}, { id: requestId }); + } + + private async loadKgCore(request: KnowledgeRequest, requestId: string): Promise { + const user = request.user ?? ""; + const coreId = request.id ?? ""; + const key = this.coreKey(user, coreId); + + const core = this.cores.get(key); + if (!core) { + throw new Error(`Knowledge core not found: ${key}`); + } + + // MVP: just acknowledge. Full implementation would publish triples + // to flow storage topics via the flow config. 
+ console.log( + `[KnowledgeCoreService] Load requested for core ${key} (triples=${core.triples.length}, embeddings=${core.graphEmbeddings.length}) — returning success`, + ); + await this.responseProducer!.send({}, { id: requestId }); + } + + // ---------- Persistence ---------- + + private async persist(): Promise { + try { + // Serialize Map to object + const data: Record = {}; + for (const [key, core] of this.cores) { + data[key] = core; + } + + const json = JSON.stringify(data, null, 2); + await mkdir(dirname(this.persistPath), { recursive: true }); + await writeFile(this.persistPath, json, "utf-8"); + } catch (err) { + console.error("[KnowledgeCoreService] Failed to persist state:", err); + } + } + + private async loadFromDisk(): Promise { + try { + const raw = await readFile(this.persistPath, "utf-8"); + const parsed = JSON.parse(raw) as Record; + + this.cores.clear(); + for (const [key, core] of Object.entries(parsed)) { + this.cores.set(key, core); + } + + console.log(`[KnowledgeCoreService] Loaded persisted state (cores=${this.cores.size})`); + } catch { + console.log("[KnowledgeCoreService] No persisted state found, starting fresh"); + } + } + + override async stop(): Promise { + if (this.consumer) { + await this.consumer.close(); + this.consumer = null; + } + if (this.responseProducer) { + await this.responseProducer.close(); + this.responseProducer = null; + } + await super.stop(); + } +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +export async function run(): Promise { + await KnowledgeCoreService.launch("knowledge-svc"); +} diff --git a/ts/packages/flow/src/extract/knowledge-extract.ts b/ts/packages/flow/src/extract/knowledge-extract.ts new file mode 100644 index 00000000..7eec6643 --- /dev/null +++ b/ts/packages/flow/src/extract/knowledge-extract.ts @@ -0,0 +1,269 @@ +/** + * Knowledge extraction service — extracts relationships and definitions from text chunks. 
+ * + * A FlowProcessor that: + * 1. Consumes Chunk messages + * 2. Uses prompt service + LLM to extract relationships and definitions + * 3. Converts extractions into RDF triples and entity contexts + * 4. Emits Triples and EntityContexts messages + * + * Python reference: trustgraph-flow/trustgraph/extract/knowledge/service.py + */ + +import { + FlowProcessor, + ConsumerSpec, + ProducerSpec, + RequestResponseSpec, + type ProcessorConfig, + type FlowContext, + type Chunk, + type Triples, + type EntityContexts, + type EntityContext, + type PromptRequest, + type PromptResponse, + type TextCompletionRequest, + type TextCompletionResponse, + type Triple, + type Term, +} from "@trustgraph/base"; + +// Well-known RDF/SKOS IRIs +const RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"; +const SKOS_DEFINITION = "http://www.w3.org/2004/02/skos/core#definition"; + +interface ExtractedRelationship { + subject: string; + predicate: string; + object: string; +} + +interface ExtractedDefinition { + entity: string; + definition: string; +} + +export class KnowledgeExtractService extends FlowProcessor { + constructor(config: ProcessorConfig) { + super(config); + + this.registerSpecification( + new ConsumerSpec("input", this.onMessage.bind(this)), + ); + this.registerSpecification(new ProducerSpec("triples")); + this.registerSpecification(new ProducerSpec("entity-contexts")); + + this.registerSpecification( + new RequestResponseSpec( + "prompt-client", + "prompt-request", + "prompt-response", + ), + ); + this.registerSpecification( + new RequestResponseSpec( + "llm-client", + "text-completion-request", + "text-completion-response", + ), + ); + + console.log("[KnowledgeExtract] Service initialized"); + } + + private async onMessage( + msg: Chunk, + properties: Record, + flowCtx: FlowContext, + ): Promise { + const requestId = properties.id; + if (!requestId) return; + + const text = msg.chunk; + if (!text || text.trim().length === 0) return; + + const promptClient = 
flowCtx.flow.requestor("prompt-client"); + const llmClient = flowCtx.flow.requestor("llm-client"); + const triplesProducer = flowCtx.flow.producer("triples"); + const entityContextsProducer = flowCtx.flow.producer("entity-contexts"); + + const allTriples: Triple[] = []; + const allEntityContexts: EntityContext[] = []; + + // --- Extract relationships --- + try { + const relPrompt = await promptClient.request({ + name: "extract-relationships", + variables: { text }, + }); + + if (!relPrompt.error) { + const relCompletion = await llmClient.request({ + system: relPrompt.system, + prompt: relPrompt.prompt, + }); + + if (!relCompletion.error && relCompletion.response) { + const relationships = parseJsonResponse(relCompletion.response); + + if (relationships) { + for (const rel of relationships) { + if (!rel.subject || !rel.predicate || !rel.object) continue; + + const subjectIri = toEntityIri(rel.subject); + const predicateIri = toEntityIri(rel.predicate); + const objectIri = toEntityIri(rel.object); + + // Main relationship triple + allTriples.push({ s: subjectIri, p: predicateIri, o: objectIri }); + + // rdfs:label triples for each entity + allTriples.push({ + s: subjectIri, + p: iriTerm(RDFS_LABEL), + o: literalTerm(rel.subject), + }); + allTriples.push({ + s: predicateIri, + p: iriTerm(RDFS_LABEL), + o: literalTerm(rel.predicate), + }); + allTriples.push({ + s: objectIri, + p: iriTerm(RDFS_LABEL), + o: literalTerm(rel.object), + }); + + // Entity contexts for subject and object + allEntityContexts.push({ + entity: subjectIri, + context: text, + chunkId: msg.documentId, + }); + allEntityContexts.push({ + entity: objectIri, + context: text, + chunkId: msg.documentId, + }); + } + + console.log(`[KnowledgeExtract] Extracted ${relationships.length} relationships`); + } + } + } + } catch (err) { + console.error("[KnowledgeExtract] Relationship extraction failed:", err); + } + + // --- Extract definitions --- + try { + const defPrompt = await promptClient.request({ + name: 
"extract-definitions", + variables: { text }, + }); + + if (!defPrompt.error) { + const defCompletion = await llmClient.request({ + system: defPrompt.system, + prompt: defPrompt.prompt, + }); + + if (!defCompletion.error && defCompletion.response) { + const definitions = parseJsonResponse(defCompletion.response); + + if (definitions) { + for (const def of definitions) { + if (!def.entity || !def.definition) continue; + + const entityIri = toEntityIri(def.entity); + + // Definition triple + allTriples.push({ + s: entityIri, + p: iriTerm(SKOS_DEFINITION), + o: literalTerm(def.definition), + }); + + // Label triple + allTriples.push({ + s: entityIri, + p: iriTerm(RDFS_LABEL), + o: literalTerm(def.entity), + }); + + // Entity context + allEntityContexts.push({ + entity: entityIri, + context: text, + chunkId: msg.documentId, + }); + } + + console.log(`[KnowledgeExtract] Extracted ${definitions.length} definitions`); + } + } + } + } catch (err) { + console.error("[KnowledgeExtract] Definition extraction failed:", err); + } + + // --- Emit results --- + if (allTriples.length > 0) { + await triplesProducer.send(requestId, { + metadata: msg.metadata, + triples: allTriples, + }); + } + + if (allEntityContexts.length > 0) { + await entityContextsProducer.send(requestId, { + metadata: msg.metadata, + entities: allEntityContexts, + }); + } + } +} + +// ---------- Helpers ---------- + +function toEntityIri(name: string): Term { + const slug = encodeURIComponent(name.toLowerCase().replace(/\s+/g, "-")); + return { + type: "IRI", + iri: `http://trustgraph.ai/e/${slug}`, + }; +} + +function iriTerm(iri: string): Term { + return { type: "IRI", iri }; +} + +function literalTerm(value: string): Term { + return { type: "LITERAL", value }; +} + +/** + * Parse JSON from LLM output, handling markdown code fences and malformed output. + */ +function parseJsonResponse(raw: string): T | null { + try { + // Strip markdown code fences + let cleaned = raw.trim(); + + // Remove ```json ... 
``` or ``` ... ``` + const fenceMatch = cleaned.match(/^```(?:json)?\s*\n?([\s\S]*?)\n?```$/); + if (fenceMatch) { + cleaned = fenceMatch[1].trim(); + } + + return JSON.parse(cleaned) as T; + } catch { + console.warn("[KnowledgeExtract] Failed to parse JSON from LLM response:", raw.slice(0, 200)); + return null; + } +} + +export async function run(): Promise { + await KnowledgeExtractService.launch("knowledge-extract"); +} diff --git a/ts/packages/flow/src/gateway/dispatch/serialize.ts b/ts/packages/flow/src/gateway/dispatch/serialize.ts index fee42e0f..7558f27d 100644 --- a/ts/packages/flow/src/gateway/dispatch/serialize.ts +++ b/ts/packages/flow/src/gateway/dispatch/serialize.ts @@ -229,6 +229,7 @@ function deepInternalToClient(value: unknown): unknown { const TERM_BEARING_REQUEST_SERVICES = new Set([ "triples", "knowledge", + "librarian", ]); /** @@ -238,6 +239,7 @@ const TERM_BEARING_RESPONSE_SERVICES = new Set([ "triples", "graph-embeddings", "knowledge", + "librarian", ]); // ---------- Top-level request / response translators ---------- diff --git a/ts/packages/flow/src/index.ts b/ts/packages/flow/src/index.ts index 34757efa..31ee1f80 100644 --- a/ts/packages/flow/src/index.ts +++ b/ts/packages/flow/src/index.ts @@ -44,3 +44,20 @@ export { PromptTemplateService, type PromptTemplate, type PromptTemplateConfig } // Config service export { ConfigService, type ConfigServiceConfig } from "./config/service.js"; + +// ReAct agent +export { AgentService } from "./agent/react/index.js"; + +// Librarian service +export { LibrarianService, type LibrarianServiceConfig } from "./librarian/service.js"; +export { CollectionManager, type CollectionEntry } from "./librarian/collection-manager.js"; + +// Chunking service +export { recursiveSplit } from "./chunking/recursive-splitter.js"; +export { ChunkingService } from "./chunking/service.js"; + +// Knowledge extraction service +export { KnowledgeExtractService } from "./extract/knowledge-extract.js"; + +// Knowledge core 
service
+export { KnowledgeCoreService, type KnowledgeCoreServiceConfig } from "./cores/service.js";
diff --git a/ts/packages/flow/src/librarian/collection-manager.ts b/ts/packages/flow/src/librarian/collection-manager.ts
new file mode 100644
index 00000000..bf7a49bc
--- /dev/null
+++ b/ts/packages/flow/src/librarian/collection-manager.ts
@@ -0,0 +1,73 @@
+/**
+ * Collection manager — in-memory CRUD for document collections.
+ *
+ * Used by LibrarianService to manage collections per-user.
+ * MVP: purely in-memory, no persistence (state is persisted
+ * via the parent LibrarianService JSON snapshot).
+ */
+
+export interface CollectionEntry {
+  user: string;
+  collection: string;
+  name: string;
+  description: string;
+  tags: string[];
+}
+
+export class CollectionManager {
+  /** keyed by `${user}:${collection}` */
+  private collections = new Map<string, CollectionEntry>();
+
+  private key(user: string, collection: string): string {
+    return `${user}:${collection}`;
+  }
+
+  listCollections(user: string): CollectionEntry[] {
+    const result: CollectionEntry[] = [];
+    for (const entry of this.collections.values()) {
+      if (entry.user === user) {
+        result.push(entry);
+      }
+    }
+    return result;
+  }
+
+  getCollection(user: string, collection: string): CollectionEntry | undefined {
+    return this.collections.get(this.key(user, collection));
+  }
+
+  updateCollection(
+    user: string,
+    collection: string,
+    name: string,
+    description: string,
+    tags: string[],
+  ): CollectionEntry {
+    const entry: CollectionEntry = { user, collection, name, description, tags };
+    this.collections.set(this.key(user, collection), entry);
+    return entry;
+  }
+
+  deleteCollection(user: string, collection: string): boolean {
+    return this.collections.delete(this.key(user, collection));
+  }
+
+  ensureCollectionExists(user: string, collection: string): CollectionEntry {
+    const existing = this.getCollection(user, collection);
+    if (existing) return existing;
+    return this.updateCollection(user, collection, collection, "", []);
+  }
+
+  /** Serialize to a plain array for JSON persistence. */
+  toJSON(): CollectionEntry[] {
+    return [...this.collections.values()];
+  }
+
+  /** Restore from a serialized array. */
+  loadFromJSON(entries: CollectionEntry[]): void {
+    this.collections.clear();
+    for (const entry of entries) {
+      this.collections.set(this.key(entry.user, entry.collection), entry);
+    }
+  }
+}
diff --git a/ts/packages/flow/src/librarian/service.ts b/ts/packages/flow/src/librarian/service.ts
new file mode 100644
index 00000000..93f52c0c
--- /dev/null
+++ b/ts/packages/flow/src/librarian/service.ts
@@ -0,0 +1,529 @@
+/**
+ * Librarian service — manages document storage, metadata, and processing records.
+ *
+ * An AsyncProcessor (NOT FlowProcessor) that:
+ * 1. Listens on librarian-request and collection-management-request topics
+ * 2. Handles CRUD operations for documents, child documents, processing records
+ * 3. Handles collection management (list, update, delete)
+ * 4. Stores document files on disk, metadata in-memory (persisted to JSON)
+ *
+ * Python reference: trustgraph-flow/trustgraph/librarian/service/service.py
+ */
+
+import { randomUUID } from "node:crypto";
+import { readFile, writeFile, mkdir, unlink, rename } from "node:fs/promises";
+import { dirname, join, resolve } from "node:path";
+import {
+  AsyncProcessor,
+  type ProcessorConfig,
+  topics,
+  type LibrarianRequest,
+  type LibrarianResponse,
+  type CollectionManagementRequest,
+  type CollectionManagementResponse,
+  type DocumentMetadata,
+  type ProcessingMetadata,
+} from "@trustgraph/base";
+import type { BackendProducer, BackendConsumer, Message } from "@trustgraph/base";
+import { CollectionManager } from "./collection-manager.js";
+
+export interface LibrarianServiceConfig extends ProcessorConfig {
+  dataDir?: string;
+}
+
+export class LibrarianService extends AsyncProcessor {
+  private documents = new Map<string, DocumentMetadata>();
+  private processing = new Map<string, ProcessingMetadata>();
+  private collectionManager = new CollectionManager();
+  private readonly dataDir: string;
+  private readonly persistPath: string;
+
+  // Librarian topic consumers/producers
+  private libConsumer: BackendConsumer<LibrarianRequest> | null = null;
+  private libProducer: BackendProducer<LibrarianResponse> | null = null;
+
+  // Collection management topic consumers/producers
+  private colConsumer: BackendConsumer<CollectionManagementRequest> | null = null;
+  private colProducer: BackendProducer<CollectionManagementResponse> | null = null;
+
+  constructor(config: LibrarianServiceConfig) {
+    super(config);
+    this.dataDir = config.dataDir ?? process.env.LIBRARIAN_DATA_DIR ?? "./data/librarian";
+    this.persistPath = join(this.dataDir, "librarian-state.json");
+  }
+
+  protected override async run(): Promise<void> {
+    // Ensure directories exist
+    await mkdir(join(this.dataDir, "docs"), { recursive: true });
+
+    // Load persisted state
+    await this.loadFromDisk();
+
+    // Create producers
+    this.libProducer = await this.pubsub.createProducer<LibrarianResponse>({
+      topic: topics.librarianResponse,
+    });
+    this.colProducer = await this.pubsub.createProducer<CollectionManagementResponse>({
+      topic: topics.collectionManagementResponse,
+    });
+
+    // Create consumers
+    this.libConsumer = await this.pubsub.createConsumer<LibrarianRequest>({
+      topic: topics.librarianRequest,
+      subscription: `${this.config.id}-librarian-request`,
+    });
+    this.colConsumer = await this.pubsub.createConsumer<CollectionManagementRequest>({
+      topic: topics.collectionManagementRequest,
+      subscription: `${this.config.id}-collection-management-request`,
+    });
+
+    console.log(`[LibrarianService] Listening on ${topics.librarianRequest} and ${topics.collectionManagementRequest}`);
+
+    // Main consume loop — poll both consumers
+    while (this.running) {
+      try {
+        // Poll librarian requests
+        const libMsg = await this.libConsumer.receive(500);
+        if (libMsg) {
+          await this.handleLibrarianMessage(libMsg);
+          await this.libConsumer.acknowledge(libMsg);
+        }
+
+        // Poll collection management requests
+        const colMsg = await this.colConsumer.receive(500);
+        if (colMsg) {
+          await this.handleCollectionMessage(colMsg);
+          await this.colConsumer.acknowledge(colMsg);
+        }
+      } catch (err) {
+        if (!this.running) break;
+        console.error("[LibrarianService] Error in consume loop:", err);
+        await sleep(1000);
+      }
+    }
+  }
+
+  // ---------- Librarian message handling ----------
+
+  private async handleLibrarianMessage(msg: Message<LibrarianRequest>): Promise<void> {
+    const request = msg.value();
+    const props = msg.properties();
+    const requestId = props.id;
+
+    if (!requestId) {
+      console.warn("[LibrarianService] Received request without id, ignoring");
+      return;
+    }
+
+    try {
+      const response = await this.handleLibrarianOperation(request);
+      await this.libProducer!.send(response, { id: requestId });
+    } catch (err) {
+      const message = err instanceof Error ? err.message : String(err);
+      await this.libProducer!.send(
+        { error: { type: "librarian-error", message } },
+        { id: requestId },
+      );
+    }
+  }
+
+  private async handleLibrarianOperation(request: LibrarianRequest): Promise<LibrarianResponse> {
+    switch (request.operation) {
+      case "add-document":
+        return this.addDocument(request);
+      case "remove-document":
+        return this.removeDocument(request);
+      case "list-documents":
+        return this.listDocuments(request);
+      case "get-document-metadata":
+        return this.getDocumentMetadata(request);
+      case "get-document-content":
+        return this.getDocumentContent(request);
+      case "add-child-document":
+        return this.addChildDocument(request);
+      case "list-children":
+        return this.listChildren(request);
+      case "add-processing":
+        return this.addProcessing(request);
+      case "remove-processing":
+        return this.removeProcessing(request);
+      case "list-processing":
+        return this.listProcessing(request);
+      default:
+        throw new Error(`Unknown librarian operation: ${request.operation as string}`);
+    }
+  }
+
+  /**
+   * Path of a document's content file. Document ids arrive in requests, so an id
+   * that would resolve outside the docs directory is rejected.
+   */
+  private contentPath(id: string): string {
+    const docsDir = resolve(this.dataDir, "docs");
+    const filePath = resolve(docsDir, `${id}.bin`);
+    if (dirname(filePath) !== docsDir) {
+      throw new Error(`Invalid document id: ${id}`);
+    }
+    return filePath;
+  }
+
+  /** Deletes a document's content file; a document stored without content has none. */
+  private async removeContent(id: string): Promise<void> {
+    try {
+      await unlink(this.contentPath(id));
+    } catch (err) {
+      if (!isMissingFile(err)) {
+        console.warn(`[LibrarianService] Could not remove content of document ${id}:`, err);
+      }
+    }
+  }
+
+  /**
+   * Gives the metadata a fresh id and timestamp, writes the optional base64
+   * content to disk, then registers and persists the document. The content is
+   * written first so that a failed write leaves no metadata without its file.
+   */
+  private async storeDocument(
+    meta: NonNullable<LibrarianRequest["documentMetadata"]>,
+    content: LibrarianRequest["content"],
+  ): Promise<DocumentMetadata> {
+    const id = randomUUID();
+    const doc: DocumentMetadata = { ...meta, id, time: Date.now() };
+
+    // Store file content if provided
+    if (content) {
+      await writeFile(this.contentPath(id), Buffer.from(content, "base64"));
+    }
+
+    this.documents.set(id, doc);
+    await this.persist();
+    return doc;
+  }
+
+  private async addDocument(request: LibrarianRequest): Promise<LibrarianResponse> {
+    const meta = request.documentMetadata;
+    if (!meta) throw new Error("add-document requires documentMetadata");
+
+    const doc = await this.storeDocument(meta, request.content);
+    console.log(`[LibrarianService] Added document ${doc.id}: ${doc.title}`);
+
+    return { documentMetadata: doc };
+  }
+
+  /**
+   * Removes a document, every descendant reachable through parentId links, their
+   * content files, and the processing records that reference any of them.
+   */
+  private async removeDocument(request: LibrarianRequest): Promise<LibrarianResponse> {
+    const id = request.documentId;
+    if (!id) throw new Error("remove-document requires documentId");
+
+    // A Set iterates over entries added during the loop, so this walks the whole
+    // subtree and still terminates if the stored parentId links form a cycle.
+    const doomed = new Set([id]);
+    for (const current of doomed) {
+      for (const [childId, doc] of this.documents) {
+        if (doc.parentId === current) doomed.add(childId);
+      }
+    }
+
+    for (const docId of doomed) {
+      this.documents.delete(docId);
+      await this.removeContent(docId);
+    }
+
+    // Remove associated processing records
+    let removedProcessing = 0;
+    for (const [procId, proc] of this.processing) {
+      if (proc.documentId != null && doomed.has(proc.documentId)) {
+        this.processing.delete(procId);
+        removedProcessing += 1;
+      }
+    }
+
+    await this.persist();
+    console.log(`[LibrarianService] Removed document ${id} (cascade: ${doomed.size - 1} children, ${removedProcessing} processing)`);
+
+    return {};
+  }
+
+  private listDocuments(request: LibrarianRequest): LibrarianResponse {
+    const user = request.user ?? "";
+    const docs: DocumentMetadata[] = [];
+
+    for (const doc of this.documents.values()) {
+      // Filter by user
+      if (user && doc.user !== user) continue;
+      // Only top-level documents are listed; children are served by list-children
+      if (doc.parentId) continue;
+      docs.push(doc);
+    }
+
+    return { documents: docs };
+  }
+
+  private getDocumentMetadata(request: LibrarianRequest): LibrarianResponse {
+    const id = request.documentId;
+    if (!id) throw new Error("get-document-metadata requires documentId");
+
+    const doc = this.documents.get(id);
+    if (!doc) throw new Error(`Document not found: ${id}`);
+
+    return { documentMetadata: doc };
+  }
+
+  private async getDocumentContent(request: LibrarianRequest): Promise<LibrarianResponse> {
+    const id = request.documentId;
+    if (!id) throw new Error("get-document-content requires documentId");
+
+    const doc = this.documents.get(id);
+    if (!doc) throw new Error(`Document not found: ${id}`);
+
+    try {
+      const buf = await readFile(this.contentPath(id));
+      const content = buf.toString("base64");
+      return { documentMetadata: doc, content };
+    } catch {
+      throw new Error(`Document content not found on disk: ${id}`);
+    }
+  }
+
+  private async addChildDocument(request: LibrarianRequest): Promise<LibrarianResponse> {
+    const meta = request.documentMetadata;
+    if (!meta) throw new Error("add-child-document requires documentMetadata");
+    if (!meta.parentId) throw new Error("add-child-document requires parentId in metadata");
+
+    // Verify parent exists
+    if (!this.documents.has(meta.parentId)) {
+      throw new Error(`Parent document not found: ${meta.parentId}`);
+    }
+
+    const doc = await this.storeDocument(meta, request.content);
+    console.log(`[LibrarianService] Added child document ${doc.id} (parent: ${meta.parentId})`);
+
+    return { documentMetadata: doc };
+  }
+
+  private listChildren(request: LibrarianRequest): LibrarianResponse {
+    const parentId = request.documentId;
+    if (!parentId) throw new Error("list-children requires documentId");
+
+    const children: DocumentMetadata[] = [];
+    for (const doc of this.documents.values()) {
+      if (doc.parentId === parentId) {
+        children.push(doc);
+      }
+    }
+
+    return { documents: children };
+  }
+
+  private async addProcessing(request: LibrarianRequest): Promise<LibrarianResponse> {
+    const proc = request.processingMetadata;
+    if (!proc) throw new Error("add-processing requires processingMetadata");
+
+    const id = randomUUID();
+    const now = Date.now();
+
+    const record: ProcessingMetadata = {
+      ...proc,
+      id,
+      time: now,
+    };
+
+    this.processing.set(id, record);
+    await this.persist();
+
+    console.log(`[LibrarianService] Added processing ${id} for document ${proc.documentId}`);
+    return { processing: [record] };
+  }
+
+  private async removeProcessing(request: LibrarianRequest): Promise<LibrarianResponse> {
+    const id = request.processingId;
+    if (!id) throw new Error("remove-processing requires processingId");
+
+    this.processing.delete(id);
+    await this.persist();
+
+    return {};
+  }
+
+  private listProcessing(request: LibrarianRequest): LibrarianResponse {
+    const documentId = request.documentId;
+    const records: ProcessingMetadata[] = [];
+
+    for (const proc of this.processing.values()) {
+      if (documentId && proc.documentId !== documentId) continue;
+      records.push(proc);
+    }
+
+    return { processing: records };
+  }
+
+  // ---------- Collection management ----------
+
+  private async handleCollectionMessage(msg: Message<CollectionManagementRequest>): Promise<void> {
+    const request = msg.value();
+    const props = msg.properties();
+    const requestId = props.id;
+
+    if (!requestId) {
+      console.warn("[LibrarianService] Received collection request without id, ignoring");
+      return;
+    }
+
+    try {
+      const response = await this.handleCollectionOperation(request);
+      await this.colProducer!.send(response, { id: requestId });
+    } catch (err) {
+      const message = err instanceof Error ? err.message : String(err);
+      await this.colProducer!.send(
+        { error: { type: "collection-error", message } },
+        { id: requestId },
+      );
+    }
+  }
+
+  /**
+   * Runs one collection operation. Mutations await persist() before replying so
+   * that two snapshot writes never run concurrently against the same file.
+   */
+  private async handleCollectionOperation(
+    request: CollectionManagementRequest,
+  ): Promise<CollectionManagementResponse> {
+    switch (request.operation) {
+      case "list-collections": {
+        const user = request.user ?? "";
+        const collections = this.collectionManager.listCollections(user);
+        return { collections };
+      }
+
+      case "update-collection": {
+        const user = request.user ?? "";
+        const collection = request.collection ?? "";
+        const name = request.name ?? collection;
+        const description = request.description ?? "";
+        const tags = request.tags ?? [];
+
+        this.collectionManager.updateCollection(user, collection, name, description, tags);
+        await this.persist();
+
+        const collections = this.collectionManager.listCollections(user);
+        return { collections };
+      }
+
+      case "delete-collection": {
+        const user = request.user ?? "";
+        const collection = request.collection ?? "";
+
+        this.collectionManager.deleteCollection(user, collection);
+        await this.persist();
+
+        return {};
+      }
+
+      default:
+        throw new Error(`Unknown collection operation: ${request.operation as string}`);
+    }
+  }
+
+  // ---------- Persistence ----------
+
+  /**
+   * Writes documents, processing records and collections to the JSON snapshot.
+   * Best effort: a failure is logged and the in-memory state stays authoritative.
+   */
+  private async persist(): Promise<void> {
+    try {
+      const data = {
+        documents: Object.fromEntries(this.documents),
+        processing: Object.fromEntries(this.processing),
+        collections: this.collectionManager.toJSON(),
+      };
+
+      const json = JSON.stringify(data, null, 2);
+      await mkdir(dirname(this.persistPath), { recursive: true });
+
+      // Write a sibling temp file and rename it over the snapshot so that a
+      // crash mid-write cannot leave a truncated state file behind.
+      const tmpPath = `${this.persistPath}.tmp`;
+      await writeFile(tmpPath, json, "utf-8");
+      await rename(tmpPath, this.persistPath);
+    } catch (err) {
+      console.error("[LibrarianService] Failed to persist state:", err);
+    }
+  }
+
+  /**
+   * Restores state from the JSON snapshot. A missing file means a fresh start;
+   * an unreadable or malformed file is moved aside so that the next persist()
+   * does not silently overwrite it.
+   */
+  private async loadFromDisk(): Promise<void> {
+    try {
+      const raw = await readFile(this.persistPath, "utf-8");
+      const parsed = JSON.parse(raw) as {
+        documents?: Record<string, DocumentMetadata>;
+        processing?: Record<string, ProcessingMetadata>;
+        collections?: Array<{ user: string; collection: string; name: string; description: string; tags: string[] }>;
+      };
+
+      const documents = new Map(Object.entries(parsed.documents ?? {}));
+      const processing = new Map(Object.entries(parsed.processing ?? {}));
+      if (parsed.collections) {
+        this.collectionManager.loadFromJSON(parsed.collections);
+      }
+      this.documents = documents;
+      this.processing = processing;
+
+      console.log(
+        `[LibrarianService] Loaded persisted state (documents=${this.documents.size}, processing=${this.processing.size})`,
+      );
+    } catch (err) {
+      if (isMissingFile(err)) {
+        console.log("[LibrarianService] No persisted state found, starting fresh");
+        return;
+      }
+
+      console.error("[LibrarianService] Persisted state is unreadable, starting fresh:", err);
+      try {
+        await rename(this.persistPath, `${this.persistPath}.corrupt-${Date.now()}`);
+      } catch {
+        // Best effort: the snapshot may be gone or not movable; nothing more to do here.
+      }
+    }
+  }
+
+  override async stop(): Promise<void> {
+    if (this.libConsumer) {
+      await this.libConsumer.close();
+      this.libConsumer = null;
+    }
+    if (this.libProducer) {
+      await this.libProducer.close();
+      this.libProducer = null;
+    }
+    if (this.colConsumer) {
+      await this.colConsumer.close();
+      this.colConsumer = null;
+    }
+    if (this.colProducer) {
+      await this.colProducer.close();
+      this.colProducer = null;
+    }
+    await super.stop();
+  }
+}
+
+/** True when err is a Node.js file-system error reporting a missing path. */
+function isMissingFile(err: unknown): boolean {
+  return err instanceof Error && "code" in err && err.code === "ENOENT";
+}
+
+function sleep(ms: number): Promise<void> {
+  return new Promise((done) => setTimeout(done, ms));
+}
+
+export async function run(): Promise<void> {
+  await LibrarianService.launch("librarian-svc");
+}
diff --git a/ts/scripts/run-agent.ts b/ts/scripts/run-agent.ts
new file mode 100644
index 00000000..54d5b6a0
--- /dev/null
+++ b/ts/scripts/run-agent.ts
@@ -0,0 +1,14 @@
+/**
+ * Start the ReAct agent service.
+ *
+ * Usage: pnpm tsx scripts/run-agent.ts
+ *
+ * Env:
+ *   NATS_URL (default: nats://localhost:4222)
+ */
+import { run } from "../packages/flow/src/agent/react/service.js";
+
+run().catch((err) => {
+  console.error("Agent service failed:", err);
+  process.exit(1);
+});
diff --git a/ts/scripts/run-knowledge.ts b/ts/scripts/run-knowledge.ts
new file mode 100644
index 00000000..f96b6128
--- /dev/null
+++ b/ts/scripts/run-knowledge.ts
@@ -0,0 +1,15 @@
+/**
+ * Start the knowledge core service.
+ *
+ * Usage: pnpm tsx scripts/run-knowledge.ts
+ *
+ * Env:
+ *   NATS_URL (default: nats://localhost:4222)
+ *   KNOWLEDGE_DATA_DIR (optional, e.g., ./data/knowledge)
+ */
+import { run } from "../packages/flow/src/cores/service.js";
+
+run().catch((err) => {
+  console.error("Knowledge core service failed:", err);
+  process.exit(1);
+});
diff --git a/ts/scripts/run-librarian.ts b/ts/scripts/run-librarian.ts
new file mode 100644
index 00000000..82c5f95e
--- /dev/null
+++ b/ts/scripts/run-librarian.ts
@@ -0,0 +1,15 @@
+/**
+ * Start the librarian service.
+ *
+ * Usage: pnpm tsx scripts/run-librarian.ts
+ *
+ * Env:
+ *   NATS_URL (default: nats://localhost:4222)
+ *   LIBRARIAN_DATA_DIR (optional, e.g., ./data/librarian)
+ */
+import { run } from "../packages/flow/src/librarian/service.js";
+
+run().catch((err) => {
+  console.error("Librarian service failed:", err);
+  process.exit(1);
+});