diff --git a/ts/package.json b/ts/package.json index efdb93af..de081e85 100644 --- a/ts/package.json +++ b/ts/package.json @@ -17,7 +17,11 @@ "agent": "tsx scripts/run-agent.ts", "librarian": "tsx scripts/run-librarian.ts", "knowledge": "tsx scripts/run-knowledge.ts", - "flow-manager": "tsx scripts/run-flow-manager.ts" + "flow-manager": "tsx scripts/run-flow-manager.ts", + "llm:ollama": "tsx scripts/run-ollama.ts", + "pdf-decoder": "tsx scripts/run-pdf-decoder.ts", + "triples-store": "tsx scripts/run-triples-store.ts", + "graph-embeddings-store": "tsx scripts/run-graph-embeddings-store.ts" }, "devDependencies": { "nats": "^2.29.0", diff --git a/ts/packages/base/src/schema/messages.ts b/ts/packages/base/src/schema/messages.ts index bbcaa504..dbcef0c5 100644 --- a/ts/packages/base/src/schema/messages.ts +++ b/ts/packages/base/src/schema/messages.ts @@ -151,6 +151,12 @@ export interface PipelineMetadata { collection: string; } +/** Document message — triggers the decode pipeline for a librarian document. 
*/ +export interface Document { + metadata: PipelineMetadata; + documentId: string; +} + export interface TextDocument { metadata: PipelineMetadata; text: string; diff --git a/ts/packages/flow/package.json b/ts/packages/flow/package.json index fcdeb62c..dd11bd94 100644 --- a/ts/packages/flow/package.json +++ b/ts/packages/flow/package.json @@ -11,13 +11,15 @@ "test": "vitest run" }, "dependencies": { - "@trustgraph/base": "workspace:*", - "openai": "^4.85.0", "@anthropic-ai/sdk": "^0.39.0", + "@fastify/websocket": "^11.0.0", "@qdrant/js-client-rest": "^1.13.0", + "@trustgraph/base": "workspace:*", "falkordb": "^5.0.0", "fastify": "^5.2.0", - "@fastify/websocket": "^11.0.0" + "ollama": "^0.6.3", + "openai": "^4.85.0", + "pdfjs-dist": "^5.6.205" }, "devDependencies": { "typescript": "^5.8.0", diff --git a/ts/packages/flow/src/chunking/service.ts b/ts/packages/flow/src/chunking/service.ts index 46315504..03b8551e 100644 --- a/ts/packages/flow/src/chunking/service.ts +++ b/ts/packages/flow/src/chunking/service.ts @@ -30,10 +30,10 @@ export class ChunkingService extends FlowProcessor { super(config); this.registerSpecification( - new ConsumerSpec("input", this.onMessage.bind(this)), + new ConsumerSpec("chunk-input", this.onMessage.bind(this)), ); - this.registerSpecification(new ProducerSpec("output")); - this.registerSpecification(new ProducerSpec("triples")); + this.registerSpecification(new ProducerSpec("chunk-output")); + this.registerSpecification(new ProducerSpec("chunk-triples")); this.registerSpecification(new ParameterSpec("chunk-size")); this.registerSpecification(new ParameterSpec("chunk-overlap")); @@ -75,7 +75,7 @@ export class ChunkingService extends FlowProcessor { `[ChunkingService] Split document ${msg.documentId} into ${chunks.length} chunks (size=${chunkSize}, overlap=${chunkOverlap})`, ); - const outputProducer = flowCtx.flow.producer("output"); + const outputProducer = flowCtx.flow.producer("chunk-output"); for (const chunkText of chunks) { const 
chunk: Chunk = { diff --git a/ts/packages/flow/src/decoding/pdf-decoder.ts b/ts/packages/flow/src/decoding/pdf-decoder.ts new file mode 100644 index 00000000..958b40bf --- /dev/null +++ b/ts/packages/flow/src/decoding/pdf-decoder.ts @@ -0,0 +1,203 @@ +/** + * PDF decoder service — extracts text from PDF documents page by page. + * + * A FlowProcessor that: + * 1. Consumes Document messages (documentId + pipeline metadata) + * 2. Fetches document content from librarian via request/response + * 3. Validates it is a PDF (checks MIME type from librarian metadata) + * 4. Extracts text per page using pdfjs-dist + * 5. Saves each page as a child document in librarian + * 6. Emits TextDocument per page (to chunking pipeline) + * 7. Emits Triples per page (provenance) + * + * Python reference: trustgraph-flow/trustgraph/decoding/pdf/decoder.py + */ + +import { getDocument } from "pdfjs-dist/legacy/build/pdf.mjs"; +import type { TextItem } from "pdfjs-dist/types/src/display/api.js"; +import { + FlowProcessor, + ConsumerSpec, + ProducerSpec, + RequestResponseSpec, + type ProcessorConfig, + type FlowContext, + type Document, + type TextDocument, + type Triples, + type Triple, + type Term, + type LibrarianRequest, + type LibrarianResponse, +} from "@trustgraph/base"; + +export class PdfDecoderService extends FlowProcessor { + constructor(config: ProcessorConfig) { + super(config); + + this.registerSpecification( + new ConsumerSpec("decode-input", this.onMessage.bind(this)), + ); + this.registerSpecification(new ProducerSpec("decode-output")); + this.registerSpecification(new ProducerSpec("decode-triples")); + this.registerSpecification( + new RequestResponseSpec( + "librarian-client", + "librarian-request", + "librarian-response", + ), + ); + + console.log("[PdfDecoder] Service initialized"); + } + + private async onMessage( + msg: Document, + properties: Record, + flowCtx: FlowContext, + ): Promise { + const requestId = properties.id; + if (!requestId) return; + + const { 
documentId } = msg; + const user = msg.metadata.user; + + const librarian = flowCtx.flow.requestor( + "librarian-client", + ); + + // 1. Fetch document metadata to check MIME type + const metadataResp = await librarian.request({ + operation: "get-document-metadata", + documentId, + user, + }); + + if (metadataResp.error) { + console.error( + `[PdfDecoder] Failed to get metadata for ${documentId}:`, + metadataResp.error.message, + ); + return; + } + + const kind = metadataResp.documentMetadata?.kind; + if (kind !== "application/pdf") { + console.log( + `[PdfDecoder] Skipping document ${documentId}: kind=${kind} (not PDF)`, + ); + return; + } + + // 2. Fetch document content + const contentResp = await librarian.request({ + operation: "get-document-content", + documentId, + user, + }); + + if (contentResp.error || !contentResp.content) { + console.error( + `[PdfDecoder] Failed to get content for ${documentId}:`, + contentResp.error?.message ?? "no content", + ); + return; + } + + // 3. Decode base64 content and extract text per page + const pdfBuffer = Buffer.from(contentResp.content, "base64"); + const pdf = await getDocument({ data: new Uint8Array(pdfBuffer) }).promise; + + console.log( + `[PdfDecoder] Document ${documentId}: ${pdf.numPages} pages`, + ); + + const outputProducer = flowCtx.flow.producer("decode-output"); + const triplesProducer = flowCtx.flow.producer("decode-triples"); + + for (let i = 1; i <= pdf.numPages; i++) { + const page = await pdf.getPage(i); + const textContent = await page.getTextContent(); + const pageText = textContent.items + .filter((item): item is TextItem => "str" in item) + .map((item) => item.str) + .join(" "); + + if (!pageText.trim()) { + console.log( + `[PdfDecoder] Skipping empty page ${i} of document ${documentId}`, + ); + continue; + } + + // 4. 
Save as child document in librarian + const childResp = await librarian.request({ + operation: "add-child-document", + documentMetadata: { + id: "", + user, + kind: "text/plain", + title: `Page ${i}`, + parentId: documentId, + documentType: "page", + time: Date.now(), + comments: "", + tags: [], + }, + content: Buffer.from(pageText).toString("base64"), + }); + + if (childResp.error) { + console.error( + `[PdfDecoder] Failed to save page ${i} of ${documentId}:`, + childResp.error.message, + ); + continue; + } + + const childDocId = childResp.documentMetadata?.id ?? ""; + + // 5. Emit TextDocument for the chunking pipeline + await outputProducer.send(requestId, { + metadata: msg.metadata, + text: pageText, + documentId: childDocId, + }); + + // 6. Emit provenance triples + const triples: Triple[] = [ + { + s: iriTerm(`urn:tg:page:${childDocId}`), + p: iriTerm("http://www.w3.org/ns/prov#wasDerivedFrom"), + o: iriTerm(`urn:tg:doc:${documentId}`), + }, + { + s: iriTerm(`urn:tg:page:${childDocId}`), + p: iriTerm("http://www.w3.org/2000/01/rdf-schema#label"), + o: literalTerm(`Page ${i}`), + }, + ]; + + await triplesProducer.send(requestId, { + metadata: msg.metadata, + triples, + }); + } + + console.log( + `[PdfDecoder] Finished processing document ${documentId}`, + ); + } +} + +function iriTerm(iri: string): Term { + return { type: "IRI", iri }; +} + +function literalTerm(value: string): Term { + return { type: "LITERAL", value }; +} + +export async function run(): Promise { + await PdfDecoderService.launch("pdf-decoder"); +} diff --git a/ts/packages/flow/src/extract/knowledge-extract.ts b/ts/packages/flow/src/extract/knowledge-extract.ts index 7eec6643..65c1ecf2 100644 --- a/ts/packages/flow/src/extract/knowledge-extract.ts +++ b/ts/packages/flow/src/extract/knowledge-extract.ts @@ -49,10 +49,10 @@ export class KnowledgeExtractService extends FlowProcessor { super(config); this.registerSpecification( - new ConsumerSpec("input", this.onMessage.bind(this)), + new 
ConsumerSpec("extract-input", this.onMessage.bind(this)), ); - this.registerSpecification(new ProducerSpec("triples")); - this.registerSpecification(new ProducerSpec("entity-contexts")); + this.registerSpecification(new ProducerSpec("extract-triples")); + this.registerSpecification(new ProducerSpec("extract-entity-contexts")); this.registerSpecification( new RequestResponseSpec( @@ -85,8 +85,8 @@ export class KnowledgeExtractService extends FlowProcessor { const promptClient = flowCtx.flow.requestor("prompt-client"); const llmClient = flowCtx.flow.requestor("llm-client"); - const triplesProducer = flowCtx.flow.producer("triples"); - const entityContextsProducer = flowCtx.flow.producer("entity-contexts"); + const triplesProducer = flowCtx.flow.producer("extract-triples"); + const entityContextsProducer = flowCtx.flow.producer("extract-entity-contexts"); const allTriples: Triple[] = []; const allEntityContexts: EntityContext[] = []; diff --git a/ts/packages/flow/src/flow-manager/service.ts b/ts/packages/flow/src/flow-manager/service.ts index 93da0783..7354ffe2 100644 --- a/ts/packages/flow/src/flow-manager/service.ts +++ b/ts/packages/flow/src/flow-manager/service.ts @@ -48,22 +48,43 @@ interface Blueprint { const DEFAULT_BLUEPRINT: Blueprint = { description: "Default processing pipeline with all services", topics: { - "request": "tg.flow.text-completion-request", - "response": "tg.flow.text-completion-response", - "prompt-request": "tg.flow.prompt-request", - "prompt-response": "tg.flow.prompt-response", - "graph-rag-request": "tg.flow.graph-rag-request", - "graph-rag-response": "tg.flow.graph-rag-response", - "document-rag-request": "tg.flow.document-rag-request", - "document-rag-response": "tg.flow.document-rag-response", - "triples-request": "tg.flow.triples-request", - "triples-response": "tg.flow.triples-response", + // Document processing pipeline + "decode-input": "tg.flow.document", + "decode-output": "tg.flow.text-document", + "decode-triples": 
"tg.flow.triples", + "chunk-input": "tg.flow.text-document", + "chunk-output": "tg.flow.chunk", + "chunk-triples": "tg.flow.triples", + "extract-input": "tg.flow.chunk", + "extract-triples": "tg.flow.triples", + "extract-entity-contexts": "tg.flow.entity-contexts", + // Storage consumers + "store-triples-input": "tg.flow.triples", + "store-graph-embeddings-input": "tg.flow.entity-contexts", + // LLM text completion "text-completion-request": "tg.flow.text-completion-request", "text-completion-response": "tg.flow.text-completion-response", - "input": "tg.flow.chunk", - "output": "tg.flow.chunk", - "triples": "tg.flow.triples", - "entity-contexts": "tg.flow.entity-contexts", + // Prompt service + "prompt-request": "tg.flow.prompt-request", + "prompt-response": "tg.flow.prompt-response", + // Graph RAG + "graph-rag-request": "tg.flow.graph-rag-request", + "graph-rag-response": "tg.flow.graph-rag-response", + // Document RAG + "document-rag-request": "tg.flow.document-rag-request", + "document-rag-response": "tg.flow.document-rag-response", + // Triples query + "triples-request": "tg.flow.triples-request", + "triples-response": "tg.flow.triples-response", + // Agent + "agent-request": "tg.flow.agent-request", + "agent-response": "tg.flow.agent-response", + // Embeddings + "embeddings-request": "tg.flow.embeddings-request", + "embeddings-response": "tg.flow.embeddings-response", + // Librarian RPC (for PDF decoder) + "librarian-request": "tg.flow.librarian-request", + "librarian-response": "tg.flow.librarian-response", }, }; diff --git a/ts/packages/flow/src/gateway/dispatch/manager.ts b/ts/packages/flow/src/gateway/dispatch/manager.ts index 575e43f8..35d00b22 100644 --- a/ts/packages/flow/src/gateway/dispatch/manager.ts +++ b/ts/packages/flow/src/gateway/dispatch/manager.ts @@ -235,6 +235,18 @@ export class DispatcherManager { }); } + // ---------- Fire-and-forget publish ---------- + + /** + * Publish a single message to an arbitrary topic (no request/response). 
+ * Used for injecting documents into the processing pipeline. + */ + async publishToTopic(topic: string, message: unknown): Promise { + const producer = await this.pubsub.createProducer({ topic }); + await producer.send(message); + await producer.close(); + } + // ---------- Static introspection ---------- static get flowServiceNames(): readonly string[] { diff --git a/ts/packages/flow/src/gateway/server.ts b/ts/packages/flow/src/gateway/server.ts index 96a8dff3..1b02a5a5 100644 --- a/ts/packages/flow/src/gateway/server.ts +++ b/ts/packages/flow/src/gateway/server.ts @@ -70,6 +70,48 @@ export async function createGateway(config: GatewayConfig) { }, ); + // REST endpoint: POST /api/v1/flow/:flow/load (trigger document processing) + app.post<{ Params: { flow: string } }>( + "/api/v1/flow/:flow/load", + async (request, reply) => { + const { flow } = request.params; + const body = request.body as { + documentId?: string; + user?: string; + collection?: string; + }; + + if (!body.documentId) { + return reply.code(400).send({ + error: { type: "bad-request", message: "documentId is required" }, + }); + } + + try { + const user = body.user ?? "default"; + const collection = body.collection ?? "default"; + const documentId = body.documentId; + + // Publish Document message to the decode-input topic + const topic = "tg.flow.document"; + const metadata = { + id: `load-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, + root: documentId, + user, + collection, + }; + + await dispatcher.publishToTopic(topic, { metadata, documentId }); + + return { status: "processing", documentId, flow }; + } catch (err) { + reply.code(500).send({ + error: { type: "internal", message: String(err) }, + }); + } + }, + ); + // WebSocket endpoint: /api/v1/socket // Uses Mux for queue-based request buffering and concurrency control. 
app.get("/api/v1/socket", { websocket: true }, (socket, request) => { diff --git a/ts/packages/flow/src/index.ts b/ts/packages/flow/src/index.ts index 068382ed..585ddab8 100644 --- a/ts/packages/flow/src/index.ts +++ b/ts/packages/flow/src/index.ts @@ -62,5 +62,11 @@ export { KnowledgeExtractService } from "./extract/knowledge-extract.js"; // Knowledge core service export { KnowledgeCoreService, type KnowledgeCoreServiceConfig } from "./cores/service.js"; +// Ollama text completion +export { OllamaProcessor } from "./model/text-completion/ollama.js"; + +// PDF decoder +export { PdfDecoderService } from "./decoding/pdf-decoder.js"; + // Flow manager service export { FlowManagerService } from "./flow-manager/service.js"; diff --git a/ts/packages/flow/src/model/text-completion/ollama.ts b/ts/packages/flow/src/model/text-completion/ollama.ts new file mode 100644 index 00000000..55935664 --- /dev/null +++ b/ts/packages/flow/src/model/text-completion/ollama.ts @@ -0,0 +1,117 @@ +/** + * Ollama text completion service. + * + * Connects to a local Ollama instance for text generation. + * + * Python reference: trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py + */ + +import { Ollama } from "ollama"; +import { LlmService, type ProcessorConfig, type LlmResult, type LlmChunk } from "@trustgraph/base"; + +export class OllamaProcessor extends LlmService { + private client: Ollama; + private readonly defaultModel: string; + + constructor(config: ProcessorConfig & { + model?: string; + ollamaUrl?: string; + }) { + super(config); + + this.defaultModel = + config.model ?? + process.env.OLLAMA_MODEL ?? + "qwen2.5:0.5b"; + + const host = + config.ollamaUrl ?? + process.env.OLLAMA_URL ?? 
+ "http://localhost:11434"; + + this.client = new Ollama({ host }); + + console.log( + `[Ollama] LLM service initialized (host=${host}, model=${this.defaultModel})`, + ); + } + + async generateContent( + system: string, + prompt: string, + model?: string, + _temperature?: number, + ): Promise { + const modelName = model ?? this.defaultModel; + const fullPrompt = system + "\n\n" + prompt; + + const resp = await this.client.generate({ + model: modelName, + prompt: fullPrompt, + stream: false, + }); + + return { + text: resp.response, + inToken: resp.prompt_eval_count ?? 0, + outToken: resp.eval_count ?? 0, + model: modelName, + }; + } + + override supportsStreaming(): boolean { + return true; + } + + async *generateContentStream( + system: string, + prompt: string, + model?: string, + _temperature?: number, + ): AsyncGenerator { + const modelName = model ?? this.defaultModel; + const fullPrompt = system + "\n\n" + prompt; + + const stream = await this.client.generate({ + model: modelName, + prompt: fullPrompt, + stream: true, + }); + + let totalInputTokens = 0; + let totalOutputTokens = 0; + + for await (const chunk of stream) { + // Token counts accumulate across chunks; keep the latest values + if (chunk.prompt_eval_count !== undefined) { + totalInputTokens = chunk.prompt_eval_count; + } + if (chunk.eval_count !== undefined) { + totalOutputTokens = chunk.eval_count; + } + + if (chunk.response) { + yield { + text: chunk.response, + inToken: null, + outToken: null, + model: modelName, + isFinal: false, + }; + } + } + + // Final chunk with accumulated token counts + yield { + text: "", + inToken: totalInputTokens, + outToken: totalOutputTokens, + model: modelName, + isFinal: true, + }; + } +} + +export async function run(): Promise { + await OllamaProcessor.launch("text-completion"); +} diff --git a/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts b/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts new file mode 100644 index 
00000000..1f335f61 --- /dev/null +++ b/ts/packages/flow/src/storage/embeddings/graph-embeddings-service.ts @@ -0,0 +1,91 @@ +/** + * Graph embeddings store service — vectorizes entity contexts and writes to Qdrant. + * + * A FlowProcessor that: + * 1. Consumes EntityContexts messages + * 2. Calls the embeddings service to vectorize entity context strings + * 3. Writes entity+vector pairs to Qdrant using QdrantGraphEmbeddingsStore + * + * Python reference: trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/service.py + */ + +import { + FlowProcessor, + ConsumerSpec, + RequestResponseSpec, + type ProcessorConfig, + type FlowContext, + type EntityContexts, + type EmbeddingsRequest, + type EmbeddingsResponse, +} from "@trustgraph/base"; +import { QdrantGraphEmbeddingsStore } from "./qdrant-graph.js"; + +export class GraphEmbeddingsStoreService extends FlowProcessor { + private store: QdrantGraphEmbeddingsStore; + + constructor(config: ProcessorConfig) { + super(config); + this.store = new QdrantGraphEmbeddingsStore(); + + this.registerSpecification( + new ConsumerSpec( + "store-graph-embeddings-input", + this.onMessage.bind(this), + ), + ); + this.registerSpecification( + new RequestResponseSpec( + "embeddings-client", + "embeddings-request", + "embeddings-response", + ), + ); + + console.log("[GraphEmbeddingsStore] Service initialized"); + } + + private async onMessage( + msg: EntityContexts, + properties: Record, + flowCtx: FlowContext, + ): Promise { + if (!msg.entities || msg.entities.length === 0) return; + + const embeddingsClient = + flowCtx.flow.requestor("embeddings-client"); + + const user = msg.metadata?.user ?? "default"; + const collection = msg.metadata?.collection ?? 
"default"; + + // Get text contexts for vectorization + const texts = msg.entities.map((e) => e.context); + + // Call embeddings service + const embResponse = await embeddingsClient.request({ text: texts }); + if (embResponse.error) { + console.error( + "[GraphEmbeddingsStore] Embeddings error:", + embResponse.error.message, + ); + return; + } + + // Store entity+vector pairs + const entities = msg.entities.map((e, i) => ({ + entity: e.entity, + vector: embResponse.vectors[i], + chunkId: e.chunkId, + })); + + await this.store.store({ user, collection, entities }); + + console.log( + `[GraphEmbeddingsStore] Stored ${entities.length} embeddings for ${user}/${collection}`, + ); + } +} + +export async function run(): Promise { + await GraphEmbeddingsStoreService.launch("graph-embeddings-store"); +} diff --git a/ts/packages/flow/src/storage/triples/falkordb-service.ts b/ts/packages/flow/src/storage/triples/falkordb-service.ts new file mode 100644 index 00000000..a203396e --- /dev/null +++ b/ts/packages/flow/src/storage/triples/falkordb-service.ts @@ -0,0 +1,54 @@ +/** + * Triples store service — writes RDF triples to FalkorDB via FlowProcessor. + * + * A FlowProcessor that: + * 1. Consumes Triples messages + * 2. 
Writes each triple to FalkorDB using FalkorDBTriplesStore + * + * Python reference: trustgraph-flow/trustgraph/storage/triples/falkordb/service.py + */ + +import { + FlowProcessor, + ConsumerSpec, + type ProcessorConfig, + type FlowContext, + type Triples, +} from "@trustgraph/base"; +import { FalkorDBTriplesStore } from "./falkordb.js"; + +export class TriplesStoreService extends FlowProcessor { + private store: FalkorDBTriplesStore; + + constructor(config: ProcessorConfig) { + super(config); + this.store = new FalkorDBTriplesStore(); + + this.registerSpecification( + new ConsumerSpec("store-triples-input", this.onMessage.bind(this)), + ); + + console.log("[TriplesStore] Service initialized"); + } + + private async onMessage( + msg: Triples, + properties: Record, + flowCtx: FlowContext, + ): Promise { + if (!msg.triples || msg.triples.length === 0) return; + + const user = msg.metadata?.user ?? "default"; + const collection = msg.metadata?.collection ?? "default"; + + await this.store.storeTriples(msg.triples, user, collection); + + console.log( + `[TriplesStore] Stored ${msg.triples.length} triples for ${user}/${collection}`, + ); + } +} + +export async function run(): Promise { + await TriplesStoreService.launch("triples-store"); +} diff --git a/ts/pnpm-lock.yaml b/ts/pnpm-lock.yaml index 934a9314..5d3894ec 100644 --- a/ts/pnpm-lock.yaml +++ b/ts/pnpm-lock.yaml @@ -107,9 +107,15 @@ importers: fastify: specifier: ^5.2.0 version: 5.8.4 + ollama: + specifier: ^0.6.3 + version: 0.6.3 openai: specifier: ^4.85.0 version: 4.104.0(ws@8.20.0)(zod@3.25.76) + pdfjs-dist: + specifier: ^5.6.205 + version: 5.6.205 devDependencies: typescript: specifier: ^5.8.0 @@ -668,6 +674,76 @@ packages: '@cfworker/json-schema': optional: true + '@napi-rs/canvas-android-arm64@0.1.97': + resolution: {integrity: sha512-V1c/WVw+NzH8vk7ZK/O8/nyBSCQimU8sfMsB/9qeSvdkGKNU7+mxy/bIF0gTgeBFmHpj30S4E9WHMSrxXGQuVQ==} + engines: {node: '>= 10'} + cpu: [arm64] + os: [android] + + 
'@napi-rs/canvas-darwin-arm64@0.1.97': + resolution: {integrity: sha512-ok+SCEF4YejcxuJ9Rm+WWunHHpf2HmiPxfz6z1a/NFQECGXtsY7A4B8XocK1LmT1D7P174MzwPF9Wy3AUAwEPw==} + engines: {node: '>= 10'} + cpu: [arm64] + os: [darwin] + + '@napi-rs/canvas-darwin-x64@0.1.97': + resolution: {integrity: sha512-PUP6e6/UGlclUvAQNnuXCcnkpdUou6VYZfQOQxExLp86epOylmiwLkqXIvpFmjoTEDmPmXrI+coL/9EFU1gKPA==} + engines: {node: '>= 10'} + cpu: [x64] + os: [darwin] + + '@napi-rs/canvas-linux-arm-gnueabihf@0.1.97': + resolution: {integrity: sha512-XyXH2L/cic8eTNtbrXCcvqHtMX/nEOxN18+7rMrAM2XtLYC/EB5s0wnO1FsLMWmK+04ZSLN9FBGipo7kpIkcOw==} + engines: {node: '>= 10'} + cpu: [arm] + os: [linux] + + '@napi-rs/canvas-linux-arm64-gnu@0.1.97': + resolution: {integrity: sha512-Kuq/M3djq0K8ktgz6nPlK7Ne5d4uWeDxPpyKWOjWDK2RIOhHVtLtyLiJw2fuldw7Vn4mhw05EZXCEr4Q76rs9w==} + engines: {node: '>= 10'} + cpu: [arm64] + os: [linux] + + '@napi-rs/canvas-linux-arm64-musl@0.1.97': + resolution: {integrity: sha512-kKmSkQVnWeqg7qdsiXvYxKhAFuHz3tkBjW/zyQv5YKUPhotpaVhpBGv5LqCngzyuRV85SXoe+OFj+Tv0a0QXkQ==} + engines: {node: '>= 10'} + cpu: [arm64] + os: [linux] + + '@napi-rs/canvas-linux-riscv64-gnu@0.1.97': + resolution: {integrity: sha512-Jc7I3A51jnEOIAXeLsN/M/+Z28LUeakcsXs07FLq9prXc0eYOtVwsDEv913Gr+06IRo34gJJVgT0TXvmz+N2VA==} + engines: {node: '>= 10'} + cpu: [riscv64] + os: [linux] + + '@napi-rs/canvas-linux-x64-gnu@0.1.97': + resolution: {integrity: sha512-iDUBe7AilfuBSRbSa8/IGX38Mf+iCSBqoVKLSQ5XaY2JLOaqz1TVyPFEyIck7wT6mRQhQt5sN6ogfjIDfi74tg==} + engines: {node: '>= 10'} + cpu: [x64] + os: [linux] + + '@napi-rs/canvas-linux-x64-musl@0.1.97': + resolution: {integrity: sha512-AKLFd/v0Z5fvgqBDqhvqtAdx+fHMJ5t9JcUNKq4FIZ5WH+iegGm8HPdj00NFlCSnm83Fp3Ln8I2f7uq1aIiWaA==} + engines: {node: '>= 10'} + cpu: [x64] + os: [linux] + + '@napi-rs/canvas-win32-arm64-msvc@0.1.97': + resolution: {integrity: sha512-u883Yr6A6fO7Vpsy9YE4FVCIxzzo5sO+7pIUjjoDLjS3vQaNMkVzx5bdIpEL+ob+gU88WDK4VcxYMZ6nmnoX9A==} + engines: {node: '>= 10'} + cpu: [arm64] 
+ os: [win32] + + '@napi-rs/canvas-win32-x64-msvc@0.1.97': + resolution: {integrity: sha512-sWtD2EE3fV0IzN+iiQUqr/Q1SwqWhs2O1FKItFlxtdDkikpEj5g7DKQpY3x55H/MAOnL8iomnlk3mcEeGiUMoQ==} + engines: {node: '>= 10'} + cpu: [x64] + os: [win32] + + '@napi-rs/canvas@0.1.97': + resolution: {integrity: sha512-8cFniXvrIEnVwuNSRCW9wirRZbHvrD3JVujdS2P5n5xiJZNZMOZcfOvJ1pb66c7jXMKHHglJEDVJGbm8XWFcXQ==} + engines: {node: '>= 10'} + '@opentelemetry/api@1.9.1': resolution: {integrity: sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q==} engines: {node: '>=8.0.0'} @@ -1912,6 +1988,9 @@ packages: encoding: optional: true + node-readable-to-web-readable-stream@0.4.2: + resolution: {integrity: sha512-/cMZNI34v//jUTrI+UIo4ieHAB5EZRY/+7OmXZgBxaWBMcW2tGdceIw06RFxWxrKZ5Jp3sI2i5TsRo+CBhtVLQ==} + node-releases@2.0.37: resolution: {integrity: sha512-1h5gKZCF+pO/o3Iqt5Jp7wc9rH3eJJ0+nh/CIoiRwjRxde/hAHyLPXYN4V3CqKAbiZPSeJFSWHmJsbkicta0Eg==} @@ -1923,6 +2002,9 @@ packages: resolution: {integrity: sha512-W67iLl4J2EXEGTbfeHCffrjDfitvLANg0UlX3wFUUSTx92KXRFegMHUVgSqE+wvhAbi4WqjGg9czysTV2Epbew==} engines: {node: '>= 0.4'} + ollama@0.6.3: + resolution: {integrity: sha512-KEWEhIqE5wtfzEIZbDCLH51VFZ6Z3ZSa6sIOg/E/tBV8S51flyqBOXi+bRxlOYKDf8i327zG9eSTb8IJxvm3Zg==} + on-exit-leak-free@2.1.2: resolution: {integrity: sha512-0eJJY6hXLGf1udHwfNftBqH+g73EU4B504nZeKpz1sYRKafAghwxEJunB2O7rDZkL4PGfsMVnTXZ2EjibbqcsA==} engines: {node: '>=14.0.0'} @@ -1967,6 +2049,10 @@ packages: resolution: {integrity: sha512-//nshmD55c46FuFw26xV/xFAaB5HF9Xdap7HJBBnrKdAd6/GxDBaNA1870O79+9ueg61cZLSVc+OaFlfmObYVQ==} engines: {node: '>= 14.16'} + pdfjs-dist@5.6.205: + resolution: {integrity: sha512-tlUj+2IDa7G1SbvBNN74UHRLJybZDWYom+k6p5KIZl7huBvsA4APi6mKL+zCxd3tLjN5hOOEE9Tv7VdzO88pfg==} + engines: {node: '>=20.19.0 || >=22.13.0 || >=24'} + picocolors@1.1.1: resolution: {integrity: sha512-xceH2snhtb5M9liqDsmEw56le376mTZkEX/jEb/RxNFyegNul7eNslCXP9FDj/Lcu0X8KEyMceP2ntpaHrDEVA==} @@ -2441,6 +2527,9 
@@ packages: webidl-conversions@3.0.1: resolution: {integrity: sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==} + whatwg-fetch@3.6.20: + resolution: {integrity: sha512-EqhiFU6daOA8kpjOWTL0olhVOF3i7OrFzSYiGsEMB8GcXS+RrzauAERX65xMeNWVqxA6HXH2m69Z9LaKKdisfg==} + whatwg-mimetype@3.0.0: resolution: {integrity: sha512-nt+N2dzIutVRxARx1nghPKGv1xHikU7HKdfafKkLNLindmPU/ch3U31NOCGGA/dmPcmb1VlofO0vnKAcsm0o/Q==} engines: {node: '>=12'} @@ -2877,6 +2966,54 @@ snapshots: transitivePeerDependencies: - supports-color + '@napi-rs/canvas-android-arm64@0.1.97': + optional: true + + '@napi-rs/canvas-darwin-arm64@0.1.97': + optional: true + + '@napi-rs/canvas-darwin-x64@0.1.97': + optional: true + + '@napi-rs/canvas-linux-arm-gnueabihf@0.1.97': + optional: true + + '@napi-rs/canvas-linux-arm64-gnu@0.1.97': + optional: true + + '@napi-rs/canvas-linux-arm64-musl@0.1.97': + optional: true + + '@napi-rs/canvas-linux-riscv64-gnu@0.1.97': + optional: true + + '@napi-rs/canvas-linux-x64-gnu@0.1.97': + optional: true + + '@napi-rs/canvas-linux-x64-musl@0.1.97': + optional: true + + '@napi-rs/canvas-win32-arm64-msvc@0.1.97': + optional: true + + '@napi-rs/canvas-win32-x64-msvc@0.1.97': + optional: true + + '@napi-rs/canvas@0.1.97': + optionalDependencies: + '@napi-rs/canvas-android-arm64': 0.1.97 + '@napi-rs/canvas-darwin-arm64': 0.1.97 + '@napi-rs/canvas-darwin-x64': 0.1.97 + '@napi-rs/canvas-linux-arm-gnueabihf': 0.1.97 + '@napi-rs/canvas-linux-arm64-gnu': 0.1.97 + '@napi-rs/canvas-linux-arm64-musl': 0.1.97 + '@napi-rs/canvas-linux-riscv64-gnu': 0.1.97 + '@napi-rs/canvas-linux-x64-gnu': 0.1.97 + '@napi-rs/canvas-linux-x64-musl': 0.1.97 + '@napi-rs/canvas-win32-arm64-msvc': 0.1.97 + '@napi-rs/canvas-win32-x64-msvc': 0.1.97 + optional: true + '@opentelemetry/api@1.9.1': {} '@pinojs/redact@0.4.0': {} @@ -4232,12 +4369,19 @@ snapshots: dependencies: whatwg-url: 5.0.0 + node-readable-to-web-readable-stream@0.4.2: + optional: true + 
node-releases@2.0.37: {} object-assign@4.1.1: {} object-inspect@1.13.4: {} + ollama@0.6.3: + dependencies: + whatwg-fetch: 3.6.20 + on-exit-leak-free@2.1.2: {} on-finished@2.4.1: @@ -4283,6 +4427,11 @@ snapshots: pathval@2.0.1: {} + pdfjs-dist@5.6.205: + optionalDependencies: + '@napi-rs/canvas': 0.1.97 + node-readable-to-web-readable-stream: 0.4.2 + picocolors@1.1.1: {} picomatch@4.0.4: {} @@ -4894,6 +5043,8 @@ snapshots: webidl-conversions@3.0.1: {} + whatwg-fetch@3.6.20: {} + whatwg-mimetype@3.0.0: {} whatwg-url@5.0.0: diff --git a/ts/scripts/run-graph-embeddings-store.ts b/ts/scripts/run-graph-embeddings-store.ts new file mode 100644 index 00000000..0f7d66ec --- /dev/null +++ b/ts/scripts/run-graph-embeddings-store.ts @@ -0,0 +1,6 @@ +import { run } from "../packages/flow/src/storage/embeddings/graph-embeddings-service.js"; + +run().catch((err) => { + console.error("Graph embeddings store service failed:", err); + process.exit(1); +}); diff --git a/ts/scripts/run-ollama.ts b/ts/scripts/run-ollama.ts new file mode 100644 index 00000000..c8ee0b95 --- /dev/null +++ b/ts/scripts/run-ollama.ts @@ -0,0 +1,16 @@ +/** + * Start the Ollama text-completion service. + * + * Usage: pnpm tsx scripts/run-ollama.ts + * + * Env: + * NATS_URL (default: nats://localhost:4222) + * OLLAMA_URL (default: http://localhost:11434) + * OLLAMA_MODEL (default: qwen2.5:0.5b) + */ +import { run } from "../packages/flow/src/model/text-completion/ollama.js"; + +run().catch((err) => { + console.error("Ollama LLM service failed:", err); + process.exit(1); +}); diff --git a/ts/scripts/run-pdf-decoder.ts b/ts/scripts/run-pdf-decoder.ts new file mode 100644 index 00000000..11a7d6e5 --- /dev/null +++ b/ts/scripts/run-pdf-decoder.ts @@ -0,0 +1,14 @@ +/** + * Start the PDF decoder service. 
+ * + * Usage: pnpm tsx scripts/run-pdf-decoder.ts + * + * Env: + * NATS_URL (default: nats://localhost:4222) + */ +import { run } from "../packages/flow/src/decoding/pdf-decoder.js"; + +run().catch((err) => { + console.error("PDF decoder service failed:", err); + process.exit(1); +}); diff --git a/ts/scripts/run-triples-store.ts b/ts/scripts/run-triples-store.ts new file mode 100644 index 00000000..5c685514 --- /dev/null +++ b/ts/scripts/run-triples-store.ts @@ -0,0 +1,6 @@ +import { run } from "../packages/flow/src/storage/triples/falkordb-service.js"; + +run().catch((err) => { + console.error("Triples store service failed:", err); + process.exit(1); +}); diff --git a/ts/scripts/seed-config.ts b/ts/scripts/seed-config.ts index 075bdc91..08bd8fe5 100644 --- a/ts/scripts/seed-config.ts +++ b/ts/scripts/seed-config.ts @@ -95,6 +95,19 @@ async function main(): Promise { await pushConfig(["flows"], { default: { topics: { + // Document processing pipeline + "decode-input": "tg.flow.document", + "decode-output": "tg.flow.text-document", + "decode-triples": "tg.flow.triples", + "chunk-input": "tg.flow.text-document", + "chunk-output": "tg.flow.chunk", + "chunk-triples": "tg.flow.triples", + "extract-input": "tg.flow.chunk", + "extract-triples": "tg.flow.triples", + "extract-entity-contexts": "tg.flow.entity-contexts", + // Storage consumers + "store-triples-input": "tg.flow.triples", + "store-graph-embeddings-input": "tg.flow.entity-contexts", // LLM text completion "text-completion-request": "tg.flow.text-completion-request", "text-completion-response": "tg.flow.text-completion-response", @@ -107,17 +120,18 @@ async function main(): Promise { // Document RAG "document-rag-request": "tg.flow.document-rag-request", "document-rag-response": "tg.flow.document-rag-response", - // Triples + // Triples query "triples-request": "tg.flow.triples-request", "triples-response": "tg.flow.triples-response", // Agent "agent-request": "tg.flow.agent-request", "agent-response": 
"tg.flow.agent-response", - // Chunking pipeline - "input": "tg.flow.chunk", - "output": "tg.flow.chunk", - "triples": "tg.flow.triples", - "entity-contexts": "tg.flow.entity-contexts", + // Embeddings + "embeddings-request": "tg.flow.embeddings-request", + "embeddings-response": "tg.flow.embeddings-response", + // Librarian RPC (for PDF decoder) + "librarian-request": "tg.flow.librarian-request", + "librarian-response": "tg.flow.librarian-response", }, }, }); diff --git a/ts/scripts/test-pipeline.ts b/ts/scripts/test-pipeline.ts index bfc23bad..4ae364a8 100644 --- a/ts/scripts/test-pipeline.ts +++ b/ts/scripts/test-pipeline.ts @@ -134,22 +134,43 @@ async function testPushFlowConfig(): Promise { values: { default: { topics: { + // Document processing pipeline + "decode-input": "tg.flow.document", + "decode-output": "tg.flow.text-document", + "decode-triples": "tg.flow.triples", + "chunk-input": "tg.flow.text-document", + "chunk-output": "tg.flow.chunk", + "chunk-triples": "tg.flow.triples", + "extract-input": "tg.flow.chunk", + "extract-triples": "tg.flow.triples", + "extract-entity-contexts": "tg.flow.entity-contexts", + // Storage consumers + "store-triples-input": "tg.flow.triples", + "store-graph-embeddings-input": "tg.flow.entity-contexts", + // LLM text completion "text-completion-request": "tg.flow.text-completion-request", "text-completion-response": "tg.flow.text-completion-response", + // Prompt service "prompt-request": "tg.flow.prompt-request", "prompt-response": "tg.flow.prompt-response", + // Graph RAG "graph-rag-request": "tg.flow.graph-rag-request", "graph-rag-response": "tg.flow.graph-rag-response", + // Document RAG "document-rag-request": "tg.flow.document-rag-request", "document-rag-response": "tg.flow.document-rag-response", + // Triples query "triples-request": "tg.flow.triples-request", "triples-response": "tg.flow.triples-response", + // Agent "agent-request": "tg.flow.agent-request", "agent-response": "tg.flow.agent-response", - "input": 
"tg.flow.chunk",
-            "output": "tg.flow.chunk",
-            "triples": "tg.flow.triples",
-            "entity-contexts": "tg.flow.entity-contexts",
+            // Embeddings
+            "embeddings-request": "tg.flow.embeddings-request",
+            "embeddings-response": "tg.flow.embeddings-response",
+            // Librarian RPC (for PDF decoder)
+            "librarian-request": "tg.flow.librarian-request",
+            "librarian-response": "tg.flow.librarian-response",
           },
         },
       },
@@ -373,6 +394,85 @@ async function testLibrarianDelete(): Promise {
   }
 }
 
+// ─── Document Load Test ──────────────────────────────────────────────
+
+/**
+ * Upload a throwaway document through the librarian, ask the gateway to load
+ * it into the "default" flow, and report whether the gateway acknowledged the
+ * request with status "processing".
+ *
+ * The uploaded document is removed again whether or not the trigger succeeds.
+ *
+ * @returns true when the load endpoint answered with status "processing";
+ *   false on any other answer or on any thrown error.
+ */
+async function testDocumentLoad(): Promise<boolean> {
+  try {
+    // First upload a test document via librarian.
+    // NOTE(review): the payload is plain text but is declared as
+    // "application/pdf"; presumably only the trigger is under test and the
+    // decoder is not expected to parse it — confirm.
+    const content = Buffer.from("Test document for pipeline processing.").toString("base64");
+    const addRes = (await post("/api/v1/librarian", {
+      operation: "add-document",
+      user: "test-user",
+      collection: "test-collection",
+      content,
+      documentMetadata: {
+        id: "",
+        time: Date.now(),
+        kind: "application/pdf",
+        title: "Test Pipeline Document",
+        comments: "",
+        user: "test-user",
+        tags: ["test"],
+        documentType: "source",
+      },
+    })) as Record<string, unknown>;
+
+    const meta = addRes.documentMetadata as Record<string, unknown> | undefined;
+    const docId = meta?.id;
+    if (typeof docId !== "string" || docId === "") {
+      fail("Document load", "failed to upload test document");
+      return false;
+    }
+
+    let data: Record<string, unknown>;
+    try {
+      // Trigger document processing via the load endpoint
+      const res = await fetch(`${GATEWAY_URL}/api/v1/flow/default/load`, {
+        method: "POST",
+        headers: { "Content-Type": "application/json" },
+        body: JSON.stringify({
+          documentId: docId,
+          user: "test-user",
+          collection: "test-collection",
+        }),
+      });
+      data = (await res.json()) as Record<string, unknown>;
+      log("document-load", data);
+    } finally {
+      // Clean up the test document on every path, so a failed or rejected
+      // trigger does not leave it behind in the librarian.
+      await post("/api/v1/librarian", {
+        operation: "remove-document",
+        documentId: docId,
+        user: "test-user",
+      });
+    }
+
+    if (data.status === "processing") {
+      pass(`Document load triggered for ${docId.slice(0, 8)}...`);
+      return true;
+    }
+    fail("Document load", "unexpected response");
+    return false;
+  } catch (err) {
+    fail("Document load", err);
+    return false;
+  }
+}
+
 // ─── Agent Test ───────────────────────────────────────────────────────
 
 async function testAgentQuery(): Promise {
@@ -444,6 +544,14 @@ async function main(): Promise {
   // Flow config push
   await run("Push Flow Config", testPushFlowConfig);
 
+  // Document pipeline load test (requires librarian + gateway)
+  if (process.env.SKIP_PIPELINE !== "1" && process.env.SKIP_LIBRARIAN !== "1") {
+    console.log("\n (Testing document load — set SKIP_PIPELINE=1 to skip)");
+    await run("Document Load", testDocumentLoad);
+  } else {
+    console.log("\n (Skipping document pipeline load test)");
+  }
+
   // LLM test (only if a running LLM service is available)
   if (process.env.SKIP_LLM !== "1") {
     console.log("\n (Testing text-completion — set SKIP_LLM=1 to skip)");