feat: add query/retrieval FlowProcessor services and missing runner scripts

Wire up the query and retrieval side of the pipeline so the agent can
answer questions from stored knowledge:

- Triples query service (FalkorDB) — all SPO pattern queries via NATS
- Graph embeddings query service (Qdrant) — entity vector similarity
- Document embeddings query service (Qdrant) — chunk vector similarity
- Graph RAG service — full concept→entity→traverse→score→synthesize pipeline
- Document RAG service — embed→find chunks→synthesize pipeline
- Runner scripts for chunker, extractor, embeddings (missing from Phase 5)
- Add DocumentEmbeddingsRequest/Response schema types
- Add RAG prompt templates (extract-concepts, edge-scoring, synthesize)
- Add graph/doc embeddings query topics to seed config + flow manager
- Add all pipeline/query/retrieval services to docker-compose
- 8 new runner scripts, 8 new pnpm script aliases

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
elpresidank 2026-04-07 01:05:54 -05:00
parent 8f7008822a
commit c545213224
19 changed files with 763 additions and 1 deletions

View file

@ -82,6 +82,12 @@ const DEFAULT_BLUEPRINT: Blueprint = {
// Embeddings
"embeddings-request": "tg.flow.embeddings-request",
"embeddings-response": "tg.flow.embeddings-response",
// Graph embeddings query
"graph-embeddings-request": "tg.flow.graph-embeddings-request",
"graph-embeddings-response": "tg.flow.graph-embeddings-response",
// Document embeddings query
"document-embeddings-request": "tg.flow.document-embeddings-request",
"document-embeddings-response": "tg.flow.document-embeddings-response",
// Librarian RPC (for PDF decoder)
"librarian-request": "tg.flow.librarian-request",
"librarian-response": "tg.flow.librarian-response",

View file

@ -68,5 +68,14 @@ export { OllamaProcessor } from "./model/text-completion/ollama.js";
// PDF decoder
export { PdfDecoderService } from "./decoding/pdf-decoder.js";
// Query services (FlowProcessor wrappers)
export { TriplesQueryService } from "./query/triples/falkordb-service.js";
export { GraphEmbeddingsQueryService } from "./query/embeddings/qdrant-graph-service.js";
export { DocEmbeddingsQueryService } from "./query/embeddings/qdrant-doc-service.js";
// Retrieval services (FlowProcessor wrappers)
export { GraphRagService } from "./retrieval/graph-rag-service.js";
export { DocumentRagService } from "./retrieval/document-rag-service.js";
// Flow manager service
export { FlowManagerService } from "./flow-manager/service.js";

View file

@ -0,0 +1,81 @@
/**
* Document embeddings query service finds similar document chunks in Qdrant.
*
* Wraps QdrantDocEmbeddingsQuery as a NATS consumer so Document RAG can look up
* chunks by vector similarity over the message bus.
*
* Python reference: trustgraph-flow/trustgraph/query/doc_embeddings/qdrant/service.py
*/
import {
FlowProcessor,
ConsumerSpec,
ProducerSpec,
type ProcessorConfig,
type FlowContext,
type DocumentEmbeddingsRequest,
type DocumentEmbeddingsResponse,
} from "@trustgraph/base";
import { QdrantDocEmbeddingsQuery } from "./qdrant-doc.js";
export class DocEmbeddingsQueryService extends FlowProcessor {
private query: QdrantDocEmbeddingsQuery;
constructor(config: ProcessorConfig) {
super(config);
this.query = new QdrantDocEmbeddingsQuery();
this.registerSpecification(
new ConsumerSpec<DocumentEmbeddingsRequest>(
"document-embeddings-request",
this.onMessage.bind(this),
),
);
this.registerSpecification(
new ProducerSpec<DocumentEmbeddingsResponse>("document-embeddings-response"),
);
console.log("[DocEmbeddingsQuery] Service initialized");
}
private async onMessage(
msg: DocumentEmbeddingsRequest,
properties: Record<string, string>,
flowCtx: FlowContext,
): Promise<void> {
const requestId = properties.id;
if (!requestId) return;
const producer = flowCtx.flow.producer<DocumentEmbeddingsResponse>("document-embeddings-response");
const collection = msg.collection ?? "default";
try {
const allChunks: DocumentEmbeddingsResponse["chunks"] = [];
for (const vector of msg.vectors ?? []) {
const matches = await this.query.query({
vector,
user: "default",
collection,
limit: msg.limit ?? 10,
});
for (const match of matches) {
allChunks.push({ chunkId: match.chunkId, score: match.score });
}
}
await producer.send(requestId, { chunks: allChunks });
} catch (err) {
console.error("[DocEmbeddingsQuery] Query failed:", err);
await producer.send(requestId, {
chunks: [],
error: { type: "query-error", message: String(err) },
});
}
}
}
export async function run(): Promise<void> {
await DocEmbeddingsQueryService.launch("doc-embeddings-query");
}

View file

@ -0,0 +1,83 @@
/**
* Graph embeddings query service finds similar entities in Qdrant via FlowProcessor.
*
* Wraps QdrantGraphEmbeddingsQuery as a NATS consumer so Graph RAG can look up
* entities by vector similarity over the message bus.
*
* Python reference: trustgraph-flow/trustgraph/query/graph_embeddings/qdrant/service.py
*/
import {
FlowProcessor,
ConsumerSpec,
ProducerSpec,
type ProcessorConfig,
type FlowContext,
type GraphEmbeddingsRequest,
type GraphEmbeddingsResponse,
} from "@trustgraph/base";
import { QdrantGraphEmbeddingsQuery } from "./qdrant-graph.js";
export class GraphEmbeddingsQueryService extends FlowProcessor {
private query: QdrantGraphEmbeddingsQuery;
constructor(config: ProcessorConfig) {
super(config);
this.query = new QdrantGraphEmbeddingsQuery();
this.registerSpecification(
new ConsumerSpec<GraphEmbeddingsRequest>(
"graph-embeddings-request",
this.onMessage.bind(this),
),
);
this.registerSpecification(
new ProducerSpec<GraphEmbeddingsResponse>("graph-embeddings-response"),
);
console.log("[GraphEmbeddingsQuery] Service initialized");
}
private async onMessage(
msg: GraphEmbeddingsRequest,
properties: Record<string, string>,
flowCtx: FlowContext,
): Promise<void> {
const requestId = properties.id;
if (!requestId) return;
const producer = flowCtx.flow.producer<GraphEmbeddingsResponse>("graph-embeddings-response");
const user = msg.collection ?? "default";
const collection = msg.collection ?? "default";
try {
// Query for each vector and aggregate results
const allEntities: GraphEmbeddingsResponse["entities"] = [];
for (const vector of msg.vectors ?? []) {
const matches = await this.query.query({
vector,
user,
collection,
limit: msg.limit ?? 50,
});
for (const match of matches) {
allEntities.push(match.entity);
}
}
await producer.send(requestId, { entities: allEntities });
} catch (err) {
console.error("[GraphEmbeddingsQuery] Query failed:", err);
await producer.send(requestId, {
entities: [],
error: { type: "query-error", message: String(err) },
});
}
}
}
export async function run(): Promise<void> {
await GraphEmbeddingsQueryService.launch("graph-embeddings-query");
}

View file

@ -0,0 +1,67 @@
/**
* Triples query service queries RDF triples from FalkorDB via FlowProcessor.
*
* Wraps FalkorDBTriplesQuery as a NATS consumer so the agent and Graph RAG
* can query the knowledge graph over the message bus.
*
* Python reference: trustgraph-flow/trustgraph/query/triples/falkordb/service.py
*/
import {
FlowProcessor,
ConsumerSpec,
ProducerSpec,
type ProcessorConfig,
type FlowContext,
type TriplesQueryRequest,
type TriplesQueryResponse,
} from "@trustgraph/base";
import { FalkorDBTriplesQuery } from "./falkordb.js";
export class TriplesQueryService extends FlowProcessor {
private query: FalkorDBTriplesQuery;
constructor(config: ProcessorConfig) {
super(config);
this.query = new FalkorDBTriplesQuery();
this.registerSpecification(
new ConsumerSpec<TriplesQueryRequest>("triples-request", this.onMessage.bind(this)),
);
this.registerSpecification(new ProducerSpec<TriplesQueryResponse>("triples-response"));
console.log("[TriplesQuery] Service initialized");
}
private async onMessage(
msg: TriplesQueryRequest,
properties: Record<string, string>,
flowCtx: FlowContext,
): Promise<void> {
const requestId = properties.id;
if (!requestId) return;
const producer = flowCtx.flow.producer<TriplesQueryResponse>("triples-response");
try {
const triples = await this.query.queryTriples(
msg.s,
msg.p,
msg.o,
msg.limit ?? 100,
);
await producer.send(requestId, { triples });
} catch (err) {
console.error("[TriplesQuery] Query failed:", err);
await producer.send(requestId, {
triples: [],
error: { type: "query-error", message: String(err) },
});
}
}
}
export async function run(): Promise<void> {
await TriplesQueryService.launch("triples-query");
}

View file

@ -0,0 +1,112 @@
/**
* Document RAG service FlowProcessor wrapper around the DocumentRag class.
*
* Consumes DocumentRagRequest messages, runs the document retrieval pipeline
* (embed query find similar chunks synthesize answer), emits DocumentRagResponse.
*
* Each request gets its own DocumentRag instance for security isolation.
*
* Python reference: trustgraph-flow/trustgraph/retrieval/document_rag/
*/
import {
FlowProcessor,
ConsumerSpec,
ProducerSpec,
RequestResponseSpec,
type ProcessorConfig,
type FlowContext,
type DocumentRagRequest,
type DocumentRagResponse,
type TextCompletionRequest,
type TextCompletionResponse,
type EmbeddingsRequest,
type EmbeddingsResponse,
type DocumentEmbeddingsRequest,
type DocumentEmbeddingsResponse,
type PromptRequest,
type PromptResponse,
} from "@trustgraph/base";
import { DocumentRag } from "./document-rag.js";
export class DocumentRagService extends FlowProcessor {
constructor(config: ProcessorConfig) {
super(config);
// Consumer: document RAG requests
this.registerSpecification(
new ConsumerSpec<DocumentRagRequest>("document-rag-request", this.onRequest.bind(this)),
);
// Producer: document RAG responses
this.registerSpecification(new ProducerSpec<DocumentRagResponse>("document-rag-response"));
// Request-response clients
this.registerSpecification(
new RequestResponseSpec<TextCompletionRequest, TextCompletionResponse>(
"llm",
"text-completion-request",
"text-completion-response",
),
);
this.registerSpecification(
new RequestResponseSpec<EmbeddingsRequest, EmbeddingsResponse>(
"embeddings",
"embeddings-request",
"embeddings-response",
),
);
this.registerSpecification(
new RequestResponseSpec<DocumentEmbeddingsRequest, DocumentEmbeddingsResponse>(
"doc-embeddings",
"document-embeddings-request",
"document-embeddings-response",
),
);
this.registerSpecification(
new RequestResponseSpec<PromptRequest, PromptResponse>(
"prompt",
"prompt-request",
"prompt-response",
),
);
console.log("[DocumentRag] Service initialized");
}
private async onRequest(
msg: DocumentRagRequest,
properties: Record<string, string>,
flowCtx: FlowContext,
): Promise<void> {
const requestId = properties.id;
if (!requestId) return;
const producer = flowCtx.flow.producer<DocumentRagResponse>("document-rag-response");
try {
const documentRag = new DocumentRag({
llm: flowCtx.flow.requestor<TextCompletionRequest, TextCompletionResponse>("llm"),
embeddings: flowCtx.flow.requestor<EmbeddingsRequest, EmbeddingsResponse>("embeddings"),
docEmbeddings: flowCtx.flow.requestor<DocumentEmbeddingsRequest, DocumentEmbeddingsResponse>("doc-embeddings"),
prompt: flowCtx.flow.requestor<PromptRequest, PromptResponse>("prompt"),
});
const response = await documentRag.query(msg.query, {
collection: msg.collection,
});
await producer.send(requestId, { response });
} catch (err) {
console.error("[DocumentRag] Query failed:", err);
await producer.send(requestId, {
response: "",
error: { type: "rag-error", message: String(err) },
});
}
}
}
export async function run(): Promise<void> {
await DocumentRagService.launch("document-rag");
}

View file

@ -0,0 +1,133 @@
/**
* Graph RAG service FlowProcessor wrapper around the GraphRag class.
*
* Consumes GraphRagRequest messages from the agent/gateway, runs the full
* Graph RAG pipeline (concept extraction entity lookup graph traversal
* edge scoring answer synthesis), and emits GraphRagResponse.
*
* Each request gets its own GraphRag instance to prevent data leakage
* across requests (security requirement from the Python implementation).
*
* Python reference: trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py
*/
import {
FlowProcessor,
ConsumerSpec,
ProducerSpec,
RequestResponseSpec,
type ProcessorConfig,
type FlowContext,
type GraphRagRequest,
type GraphRagResponse,
type TextCompletionRequest,
type TextCompletionResponse,
type EmbeddingsRequest,
type EmbeddingsResponse,
type GraphEmbeddingsRequest,
type GraphEmbeddingsResponse,
type TriplesQueryRequest,
type TriplesQueryResponse,
type PromptRequest,
type PromptResponse,
} from "@trustgraph/base";
import { GraphRag } from "./graph-rag.js";
export class GraphRagService extends FlowProcessor {
constructor(config: ProcessorConfig) {
super(config);
// Consumer: graph RAG requests
this.registerSpecification(
new ConsumerSpec<GraphRagRequest>("graph-rag-request", this.onRequest.bind(this)),
);
// Producer: graph RAG responses
this.registerSpecification(new ProducerSpec<GraphRagResponse>("graph-rag-response"));
// Request-response clients for the pipeline
this.registerSpecification(
new RequestResponseSpec<TextCompletionRequest, TextCompletionResponse>(
"llm",
"text-completion-request",
"text-completion-response",
),
);
this.registerSpecification(
new RequestResponseSpec<EmbeddingsRequest, EmbeddingsResponse>(
"embeddings",
"embeddings-request",
"embeddings-response",
),
);
this.registerSpecification(
new RequestResponseSpec<GraphEmbeddingsRequest, GraphEmbeddingsResponse>(
"graph-embeddings",
"graph-embeddings-request",
"graph-embeddings-response",
),
);
this.registerSpecification(
new RequestResponseSpec<TriplesQueryRequest, TriplesQueryResponse>(
"triples",
"triples-request",
"triples-response",
),
);
this.registerSpecification(
new RequestResponseSpec<PromptRequest, PromptResponse>(
"prompt",
"prompt-request",
"prompt-response",
),
);
console.log("[GraphRag] Service initialized");
}
private async onRequest(
msg: GraphRagRequest,
properties: Record<string, string>,
flowCtx: FlowContext,
): Promise<void> {
const requestId = properties.id;
if (!requestId) return;
const producer = flowCtx.flow.producer<GraphRagResponse>("graph-rag-response");
try {
// Create a per-request GraphRag instance with flow clients
const graphRag = new GraphRag(
{
llm: flowCtx.flow.requestor<TextCompletionRequest, TextCompletionResponse>("llm"),
embeddings: flowCtx.flow.requestor<EmbeddingsRequest, EmbeddingsResponse>("embeddings"),
graphEmbeddings: flowCtx.flow.requestor<GraphEmbeddingsRequest, GraphEmbeddingsResponse>("graph-embeddings"),
triples: flowCtx.flow.requestor<TriplesQueryRequest, TriplesQueryResponse>("triples"),
prompt: flowCtx.flow.requestor<PromptRequest, PromptResponse>("prompt"),
},
{
entityLimit: msg.entityLimit,
tripleLimit: msg.tripleLimit,
maxSubgraphSize: msg.maxSubgraphSize,
maxPathLength: msg.maxPathLength,
},
);
const response = await graphRag.query(msg.query, {
collection: msg.collection,
});
await producer.send(requestId, { response });
} catch (err) {
console.error("[GraphRag] Query failed:", err);
await producer.send(requestId, {
response: "",
error: { type: "rag-error", message: String(err) },
});
}
}
}
export async function run(): Promise<void> {
await GraphRagService.launch("graph-rag");
}