feat: add document pipeline — PDF decoder, Ollama LLM, storage services

Add end-to-end document processing pipeline:
- PDF decoder service (pdfjs-dist) extracts text per page from librarian docs
- Ollama native LLM service for local model inference
- FalkorDB triples store FlowProcessor consumer
- Qdrant graph embeddings store FlowProcessor consumer
- Fix spec name collisions in chunker/extractor (input→chunk-input, etc.)
- Gateway /load endpoint to trigger document processing
- Align flow manager blueprint and seed config with full pipeline topics
- Add runner scripts and test coverage for document load

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
elpresidank 2026-04-06 23:47:43 -05:00
parent 8f9de7604e
commit 8f7008822a
20 changed files with 894 additions and 37 deletions

View file

@ -30,10 +30,10 @@ export class ChunkingService extends FlowProcessor {
super(config);
this.registerSpecification(
new ConsumerSpec<TextDocument>("input", this.onMessage.bind(this)),
new ConsumerSpec<TextDocument>("chunk-input", this.onMessage.bind(this)),
);
this.registerSpecification(new ProducerSpec<Chunk>("output"));
this.registerSpecification(new ProducerSpec<Triples>("triples"));
this.registerSpecification(new ProducerSpec<Chunk>("chunk-output"));
this.registerSpecification(new ProducerSpec<Triples>("chunk-triples"));
this.registerSpecification(new ParameterSpec("chunk-size"));
this.registerSpecification(new ParameterSpec("chunk-overlap"));
@ -75,7 +75,7 @@ export class ChunkingService extends FlowProcessor {
`[ChunkingService] Split document ${msg.documentId} into ${chunks.length} chunks (size=${chunkSize}, overlap=${chunkOverlap})`,
);
const outputProducer = flowCtx.flow.producer<Chunk>("output");
const outputProducer = flowCtx.flow.producer<Chunk>("chunk-output");
for (const chunkText of chunks) {
const chunk: Chunk = {

View file

@ -0,0 +1,203 @@
/**
* PDF decoder service extracts text from PDF documents page by page.
*
* A FlowProcessor that:
* 1. Consumes Document messages (documentId + pipeline metadata)
* 2. Fetches document content from librarian via request/response
* 3. Validates it is a PDF (checks MIME type from librarian metadata)
* 4. Extracts text per page using pdfjs-dist
* 5. Saves each page as a child document in librarian
* 6. Emits TextDocument per page (to chunking pipeline)
* 7. Emits Triples per page (provenance)
*
* Python reference: trustgraph-flow/trustgraph/decoding/pdf/decoder.py
*/
import { getDocument } from "pdfjs-dist/legacy/build/pdf.mjs";
import type { TextItem } from "pdfjs-dist/types/src/display/api.js";
import {
FlowProcessor,
ConsumerSpec,
ProducerSpec,
RequestResponseSpec,
type ProcessorConfig,
type FlowContext,
type Document,
type TextDocument,
type Triples,
type Triple,
type Term,
type LibrarianRequest,
type LibrarianResponse,
} from "@trustgraph/base";
export class PdfDecoderService extends FlowProcessor {
constructor(config: ProcessorConfig) {
super(config);
this.registerSpecification(
new ConsumerSpec<Document>("decode-input", this.onMessage.bind(this)),
);
this.registerSpecification(new ProducerSpec<TextDocument>("decode-output"));
this.registerSpecification(new ProducerSpec<Triples>("decode-triples"));
this.registerSpecification(
new RequestResponseSpec<LibrarianRequest, LibrarianResponse>(
"librarian-client",
"librarian-request",
"librarian-response",
),
);
console.log("[PdfDecoder] Service initialized");
}
private async onMessage(
msg: Document,
properties: Record<string, string>,
flowCtx: FlowContext,
): Promise<void> {
const requestId = properties.id;
if (!requestId) return;
const { documentId } = msg;
const user = msg.metadata.user;
const librarian = flowCtx.flow.requestor<LibrarianRequest, LibrarianResponse>(
"librarian-client",
);
// 1. Fetch document metadata to check MIME type
const metadataResp = await librarian.request({
operation: "get-document-metadata",
documentId,
user,
});
if (metadataResp.error) {
console.error(
`[PdfDecoder] Failed to get metadata for ${documentId}:`,
metadataResp.error.message,
);
return;
}
const kind = metadataResp.documentMetadata?.kind;
if (kind !== "application/pdf") {
console.log(
`[PdfDecoder] Skipping document ${documentId}: kind=${kind} (not PDF)`,
);
return;
}
// 2. Fetch document content
const contentResp = await librarian.request({
operation: "get-document-content",
documentId,
user,
});
if (contentResp.error || !contentResp.content) {
console.error(
`[PdfDecoder] Failed to get content for ${documentId}:`,
contentResp.error?.message ?? "no content",
);
return;
}
// 3. Decode base64 content and extract text per page
const pdfBuffer = Buffer.from(contentResp.content, "base64");
const pdf = await getDocument({ data: new Uint8Array(pdfBuffer) }).promise;
console.log(
`[PdfDecoder] Document ${documentId}: ${pdf.numPages} pages`,
);
const outputProducer = flowCtx.flow.producer<TextDocument>("decode-output");
const triplesProducer = flowCtx.flow.producer<Triples>("decode-triples");
for (let i = 1; i <= pdf.numPages; i++) {
const page = await pdf.getPage(i);
const textContent = await page.getTextContent();
const pageText = textContent.items
.filter((item): item is TextItem => "str" in item)
.map((item) => item.str)
.join(" ");
if (!pageText.trim()) {
console.log(
`[PdfDecoder] Skipping empty page ${i} of document ${documentId}`,
);
continue;
}
// 4. Save as child document in librarian
const childResp = await librarian.request({
operation: "add-child-document",
documentMetadata: {
id: "",
user,
kind: "text/plain",
title: `Page ${i}`,
parentId: documentId,
documentType: "page",
time: Date.now(),
comments: "",
tags: [],
},
content: Buffer.from(pageText).toString("base64"),
});
if (childResp.error) {
console.error(
`[PdfDecoder] Failed to save page ${i} of ${documentId}:`,
childResp.error.message,
);
continue;
}
const childDocId = childResp.documentMetadata?.id ?? "";
// 5. Emit TextDocument for the chunking pipeline
await outputProducer.send(requestId, {
metadata: msg.metadata,
text: pageText,
documentId: childDocId,
});
// 6. Emit provenance triples
const triples: Triple[] = [
{
s: iriTerm(`urn:tg:page:${childDocId}`),
p: iriTerm("http://www.w3.org/ns/prov#wasDerivedFrom"),
o: iriTerm(`urn:tg:doc:${documentId}`),
},
{
s: iriTerm(`urn:tg:page:${childDocId}`),
p: iriTerm("http://www.w3.org/2000/01/rdf-schema#label"),
o: literalTerm(`Page ${i}`),
},
];
await triplesProducer.send(requestId, {
metadata: msg.metadata,
triples,
});
}
console.log(
`[PdfDecoder] Finished processing document ${documentId}`,
);
}
}
function iriTerm(iri: string): Term {
return { type: "IRI", iri };
}
function literalTerm(value: string): Term {
return { type: "LITERAL", value };
}
export async function run(): Promise<void> {
await PdfDecoderService.launch("pdf-decoder");
}

View file

@ -49,10 +49,10 @@ export class KnowledgeExtractService extends FlowProcessor {
super(config);
this.registerSpecification(
new ConsumerSpec<Chunk>("input", this.onMessage.bind(this)),
new ConsumerSpec<Chunk>("extract-input", this.onMessage.bind(this)),
);
this.registerSpecification(new ProducerSpec<Triples>("triples"));
this.registerSpecification(new ProducerSpec<EntityContexts>("entity-contexts"));
this.registerSpecification(new ProducerSpec<Triples>("extract-triples"));
this.registerSpecification(new ProducerSpec<EntityContexts>("extract-entity-contexts"));
this.registerSpecification(
new RequestResponseSpec<PromptRequest, PromptResponse>(
@ -85,8 +85,8 @@ export class KnowledgeExtractService extends FlowProcessor {
const promptClient = flowCtx.flow.requestor<PromptRequest, PromptResponse>("prompt-client");
const llmClient = flowCtx.flow.requestor<TextCompletionRequest, TextCompletionResponse>("llm-client");
const triplesProducer = flowCtx.flow.producer<Triples>("triples");
const entityContextsProducer = flowCtx.flow.producer<EntityContexts>("entity-contexts");
const triplesProducer = flowCtx.flow.producer<Triples>("extract-triples");
const entityContextsProducer = flowCtx.flow.producer<EntityContexts>("extract-entity-contexts");
const allTriples: Triple[] = [];
const allEntityContexts: EntityContext[] = [];

View file

@ -48,22 +48,43 @@ interface Blueprint {
const DEFAULT_BLUEPRINT: Blueprint = {
description: "Default processing pipeline with all services",
topics: {
"request": "tg.flow.text-completion-request",
"response": "tg.flow.text-completion-response",
"prompt-request": "tg.flow.prompt-request",
"prompt-response": "tg.flow.prompt-response",
"graph-rag-request": "tg.flow.graph-rag-request",
"graph-rag-response": "tg.flow.graph-rag-response",
"document-rag-request": "tg.flow.document-rag-request",
"document-rag-response": "tg.flow.document-rag-response",
"triples-request": "tg.flow.triples-request",
"triples-response": "tg.flow.triples-response",
// Document processing pipeline
"decode-input": "tg.flow.document",
"decode-output": "tg.flow.text-document",
"decode-triples": "tg.flow.triples",
"chunk-input": "tg.flow.text-document",
"chunk-output": "tg.flow.chunk",
"chunk-triples": "tg.flow.triples",
"extract-input": "tg.flow.chunk",
"extract-triples": "tg.flow.triples",
"extract-entity-contexts": "tg.flow.entity-contexts",
// Storage consumers
"store-triples-input": "tg.flow.triples",
"store-graph-embeddings-input": "tg.flow.entity-contexts",
// LLM text completion
"text-completion-request": "tg.flow.text-completion-request",
"text-completion-response": "tg.flow.text-completion-response",
"input": "tg.flow.chunk",
"output": "tg.flow.chunk",
"triples": "tg.flow.triples",
"entity-contexts": "tg.flow.entity-contexts",
// Prompt service
"prompt-request": "tg.flow.prompt-request",
"prompt-response": "tg.flow.prompt-response",
// Graph RAG
"graph-rag-request": "tg.flow.graph-rag-request",
"graph-rag-response": "tg.flow.graph-rag-response",
// Document RAG
"document-rag-request": "tg.flow.document-rag-request",
"document-rag-response": "tg.flow.document-rag-response",
// Triples query
"triples-request": "tg.flow.triples-request",
"triples-response": "tg.flow.triples-response",
// Agent
"agent-request": "tg.flow.agent-request",
"agent-response": "tg.flow.agent-response",
// Embeddings
"embeddings-request": "tg.flow.embeddings-request",
"embeddings-response": "tg.flow.embeddings-response",
// Librarian RPC (for PDF decoder)
"librarian-request": "tg.flow.librarian-request",
"librarian-response": "tg.flow.librarian-response",
},
};

View file

@ -235,6 +235,18 @@ export class DispatcherManager {
});
}
// ---------- Fire-and-forget publish ----------
/**
* Publish a single message to an arbitrary topic (no request/response).
* Used for injecting documents into the processing pipeline.
*/
async publishToTopic(topic: string, message: unknown): Promise<void> {
const producer = await this.pubsub.createProducer<unknown>({ topic });
await producer.send(message);
await producer.close();
}
// ---------- Static introspection ----------
static get flowServiceNames(): readonly string[] {

View file

@ -70,6 +70,48 @@ export async function createGateway(config: GatewayConfig) {
},
);
// REST endpoint: POST /api/v1/flow/:flow/load (trigger document processing)
app.post<{ Params: { flow: string } }>(
"/api/v1/flow/:flow/load",
async (request, reply) => {
const { flow } = request.params;
const body = request.body as {
documentId?: string;
user?: string;
collection?: string;
};
if (!body.documentId) {
return reply.code(400).send({
error: { type: "bad-request", message: "documentId is required" },
});
}
try {
const user = body.user ?? "default";
const collection = body.collection ?? "default";
const documentId = body.documentId;
// Publish Document message to the decode-input topic
const topic = "tg.flow.document";
const metadata = {
id: `load-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
root: documentId,
user,
collection,
};
await dispatcher.publishToTopic(topic, { metadata, documentId });
return { status: "processing", documentId, flow };
} catch (err) {
reply.code(500).send({
error: { type: "internal", message: String(err) },
});
}
},
);
// WebSocket endpoint: /api/v1/socket
// Uses Mux for queue-based request buffering and concurrency control.
app.get("/api/v1/socket", { websocket: true }, (socket, request) => {

View file

@ -62,5 +62,11 @@ export { KnowledgeExtractService } from "./extract/knowledge-extract.js";
// Knowledge core service
export { KnowledgeCoreService, type KnowledgeCoreServiceConfig } from "./cores/service.js";
// Ollama text completion
export { OllamaProcessor } from "./model/text-completion/ollama.js";
// PDF decoder
export { PdfDecoderService } from "./decoding/pdf-decoder.js";
// Flow manager service
export { FlowManagerService } from "./flow-manager/service.js";

View file

@ -0,0 +1,117 @@
/**
* Ollama text completion service.
*
* Connects to a local Ollama instance for text generation.
*
* Python reference: trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py
*/
import { Ollama } from "ollama";
import { LlmService, type ProcessorConfig, type LlmResult, type LlmChunk } from "@trustgraph/base";
export class OllamaProcessor extends LlmService {
private client: Ollama;
private readonly defaultModel: string;
constructor(config: ProcessorConfig & {
model?: string;
ollamaUrl?: string;
}) {
super(config);
this.defaultModel =
config.model ??
process.env.OLLAMA_MODEL ??
"qwen2.5:0.5b";
const host =
config.ollamaUrl ??
process.env.OLLAMA_URL ??
"http://localhost:11434";
this.client = new Ollama({ host });
console.log(
`[Ollama] LLM service initialized (host=${host}, model=${this.defaultModel})`,
);
}
async generateContent(
system: string,
prompt: string,
model?: string,
_temperature?: number,
): Promise<LlmResult> {
const modelName = model ?? this.defaultModel;
const fullPrompt = system + "\n\n" + prompt;
const resp = await this.client.generate({
model: modelName,
prompt: fullPrompt,
stream: false,
});
return {
text: resp.response,
inToken: resp.prompt_eval_count ?? 0,
outToken: resp.eval_count ?? 0,
model: modelName,
};
}
override supportsStreaming(): boolean {
return true;
}
async *generateContentStream(
system: string,
prompt: string,
model?: string,
_temperature?: number,
): AsyncGenerator<LlmChunk> {
const modelName = model ?? this.defaultModel;
const fullPrompt = system + "\n\n" + prompt;
const stream = await this.client.generate({
model: modelName,
prompt: fullPrompt,
stream: true,
});
let totalInputTokens = 0;
let totalOutputTokens = 0;
for await (const chunk of stream) {
// Token counts accumulate across chunks; keep the latest values
if (chunk.prompt_eval_count !== undefined) {
totalInputTokens = chunk.prompt_eval_count;
}
if (chunk.eval_count !== undefined) {
totalOutputTokens = chunk.eval_count;
}
if (chunk.response) {
yield {
text: chunk.response,
inToken: null,
outToken: null,
model: modelName,
isFinal: false,
};
}
}
// Final chunk with accumulated token counts
yield {
text: "",
inToken: totalInputTokens,
outToken: totalOutputTokens,
model: modelName,
isFinal: true,
};
}
}
export async function run(): Promise<void> {
await OllamaProcessor.launch("text-completion");
}

View file

@ -0,0 +1,91 @@
/**
* Graph embeddings store service vectorizes entity contexts and writes to Qdrant.
*
* A FlowProcessor that:
* 1. Consumes EntityContexts messages
* 2. Calls the embeddings service to vectorize entity context strings
* 3. Writes entity+vector pairs to Qdrant using QdrantGraphEmbeddingsStore
*
* Python reference: trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/service.py
*/
import {
FlowProcessor,
ConsumerSpec,
RequestResponseSpec,
type ProcessorConfig,
type FlowContext,
type EntityContexts,
type EmbeddingsRequest,
type EmbeddingsResponse,
} from "@trustgraph/base";
import { QdrantGraphEmbeddingsStore } from "./qdrant-graph.js";
export class GraphEmbeddingsStoreService extends FlowProcessor {
private store: QdrantGraphEmbeddingsStore;
constructor(config: ProcessorConfig) {
super(config);
this.store = new QdrantGraphEmbeddingsStore();
this.registerSpecification(
new ConsumerSpec<EntityContexts>(
"store-graph-embeddings-input",
this.onMessage.bind(this),
),
);
this.registerSpecification(
new RequestResponseSpec<EmbeddingsRequest, EmbeddingsResponse>(
"embeddings-client",
"embeddings-request",
"embeddings-response",
),
);
console.log("[GraphEmbeddingsStore] Service initialized");
}
private async onMessage(
msg: EntityContexts,
properties: Record<string, string>,
flowCtx: FlowContext,
): Promise<void> {
if (!msg.entities || msg.entities.length === 0) return;
const embeddingsClient =
flowCtx.flow.requestor<EmbeddingsRequest, EmbeddingsResponse>("embeddings-client");
const user = msg.metadata?.user ?? "default";
const collection = msg.metadata?.collection ?? "default";
// Get text contexts for vectorization
const texts = msg.entities.map((e) => e.context);
// Call embeddings service
const embResponse = await embeddingsClient.request({ text: texts });
if (embResponse.error) {
console.error(
"[GraphEmbeddingsStore] Embeddings error:",
embResponse.error.message,
);
return;
}
// Store entity+vector pairs
const entities = msg.entities.map((e, i) => ({
entity: e.entity,
vector: embResponse.vectors[i],
chunkId: e.chunkId,
}));
await this.store.store({ user, collection, entities });
console.log(
`[GraphEmbeddingsStore] Stored ${entities.length} embeddings for ${user}/${collection}`,
);
}
}
export async function run(): Promise<void> {
await GraphEmbeddingsStoreService.launch("graph-embeddings-store");
}

View file

@ -0,0 +1,54 @@
/**
* Triples store service writes RDF triples to FalkorDB via FlowProcessor.
*
* A FlowProcessor that:
* 1. Consumes Triples messages
* 2. Writes each triple to FalkorDB using FalkorDBTriplesStore
*
* Python reference: trustgraph-flow/trustgraph/storage/triples/falkordb/service.py
*/
import {
FlowProcessor,
ConsumerSpec,
type ProcessorConfig,
type FlowContext,
type Triples,
} from "@trustgraph/base";
import { FalkorDBTriplesStore } from "./falkordb.js";
export class TriplesStoreService extends FlowProcessor {
private store: FalkorDBTriplesStore;
constructor(config: ProcessorConfig) {
super(config);
this.store = new FalkorDBTriplesStore();
this.registerSpecification(
new ConsumerSpec<Triples>("store-triples-input", this.onMessage.bind(this)),
);
console.log("[TriplesStore] Service initialized");
}
private async onMessage(
msg: Triples,
properties: Record<string, string>,
flowCtx: FlowContext,
): Promise<void> {
if (!msg.triples || msg.triples.length === 0) return;
const user = msg.metadata?.user ?? "default";
const collection = msg.metadata?.collection ?? "default";
await this.store.storeTriples(msg.triples, user, collection);
console.log(
`[TriplesStore] Stored ${msg.triples.length} triples for ${user}/${collection}`,
);
}
}
export async function run(): Promise<void> {
await TriplesStoreService.launch("triples-store");
}