diff --git a/ts/package.json b/ts/package.json index 97f1dbef..efdb93af 100644 --- a/ts/package.json +++ b/ts/package.json @@ -13,12 +13,14 @@ "llm:openai": "tsx scripts/run-llm-openai.ts", "test:pipeline": "tsx scripts/test-pipeline.ts", "seed": "tsx scripts/seed-config.ts", + "prompt": "tsx scripts/run-prompt.ts", "agent": "tsx scripts/run-agent.ts", "librarian": "tsx scripts/run-librarian.ts", "knowledge": "tsx scripts/run-knowledge.ts", "flow-manager": "tsx scripts/run-flow-manager.ts" }, "devDependencies": { + "nats": "^2.29.0", "tsx": "^4.21.0", "turbo": "^2.5.0", "typescript": "^5.8.0" diff --git a/ts/packages/base/src/services/llm-service.ts b/ts/packages/base/src/services/llm-service.ts index ea3770a6..a58ff739 100644 --- a/ts/packages/base/src/services/llm-service.ts +++ b/ts/packages/base/src/services/llm-service.ts @@ -4,102 +4,120 @@ * Python reference: trustgraph-base/trustgraph/base/llm_service.py */ -import { FlowProcessor } from "../processor/flow-processor.js"; -import { ConsumerSpec } from "../spec/consumer-spec.js"; -import { ProducerSpec } from "../spec/producer-spec.js"; -import { ParameterSpec } from "../spec/parameter-spec.js"; -import type { ProcessorConfig } from "../processor/async-processor.js"; -import type { FlowContext } from "../messaging/consumer.js"; +import {FlowProcessor} from "../processor/index.js"; +import { + ConsumerSpec, ProducerSpec, + ParameterSpec +} from "../spec/index.js"; +import type {ProcessorConfig} from "../processor/index.js"; +import type {FlowContext} from "../messaging/consumer.js"; import type { - TextCompletionRequest, - TextCompletionResponse, + TextCompletionRequest, + TextCompletionResponse, } from "../schema/messages.js"; -import type { LlmResult, LlmChunk } from "../schema/primitives.js"; +import type {LlmResult, LlmChunk} from "../schema/index.js"; export abstract class LlmService extends FlowProcessor { - constructor(config: ProcessorConfig) { - super(config); + protected constructor(config: ProcessorConfig) { + super(config); - this.registerSpecification( - new ConsumerSpec( - "request", - this.onRequest.bind(this), - ), - ); - this.registerSpecification(new ProducerSpec("response")); - this.registerSpecification(new ParameterSpec("model")); - this.registerSpecification(new ParameterSpec("temperature")); - } + this.registerSpecification( + new ConsumerSpec( + "text-completion-request", + this.onRequest.bind(this), + ), + ); + this.registerSpecification(new ProducerSpec("text-completion-response")); + this.registerSpecification(new ParameterSpec("model")); + this.registerSpecification(new ParameterSpec("temperature")); + } - private async onRequest( - msg: TextCompletionRequest, - properties: Record, - flowCtx: FlowContext, - ): Promise { - const requestId = properties.id; - if (!requestId) return; + private async onRequest( + msg: TextCompletionRequest, + properties: Record, + flowCtx: FlowContext, + ): Promise { + const requestId = properties.id; + if (!requestId) return; - const responseProducer = flowCtx.flow.producer("response"); + const responseProducer = flowCtx.flow.producer("text-completion-response"); - try { - if (msg.streaming && this.supportsStreaming()) { - for await (const chunk of this.generateContentStream( - msg.system, - msg.prompt, - msg.model, - msg.temperature, - )) { - await responseProducer.send(requestId, { - response: chunk.text, - model: chunk.model, - inToken: chunk.inToken ?? undefined, - outToken: chunk.outToken ?? undefined, - endOfStream: chunk.isFinal, - }); - } - } else { - const result = await this.generateContent( - msg.system, - msg.prompt, - msg.model, - msg.temperature, - ); + try { + if (msg.streaming && this.supportsStreaming()) { + for await (const chunk of this.generateContentStream( + msg.system, + msg.prompt, + msg.model, + msg.temperature, + )) { + await responseProducer.send( + requestId, + { + response: chunk.text, + model: chunk.model, + inToken: chunk.inToken ?? undefined, + outToken: chunk.outToken ?? undefined, + endOfStream: chunk.isFinal, + } + ); + } + } else { + const result = await this.generateContent( + msg.system, + msg.prompt, + msg.model, + msg.temperature, + ); - await responseProducer.send(requestId, { - response: result.text, - model: result.model, - inToken: result.inToken, - outToken: result.outToken, - endOfStream: true, - }); - } - } catch (err) { - console.error(`[LlmService] Error processing request:`, err); + await responseProducer.send( + requestId, + { + response: result.text, + model: result.model, + inToken: result.inToken, + outToken: result.outToken, + endOfStream: true, + } + ); + } + } catch (err) { + console.error( + `[LlmService] Error processing request:`, + err + ); - const message = err instanceof Error ? err.message : String(err); - await responseProducer.send(requestId, { - response: "", - error: { type: "llm-error", message }, - endOfStream: true, - }); - } - } + const message = err instanceof Error + ? err.message + : String(err); + await responseProducer.send( + requestId, + { + response: "", + error: { + type: "llm-error", + message + }, + endOfStream: true, + } + ); + } + } - abstract generateContent( - system: string, - prompt: string, - model?: string, - temperature?: number, - ): Promise; + abstract generateContent( + system: string, + prompt: string, + model?: string, + temperature?: number, + ): Promise; - abstract generateContentStream( - system: string, - prompt: string, - model?: string, - temperature?: number, - ): AsyncGenerator; + abstract generateContentStream( + system: string, + prompt: string, + model?: string, + temperature?: number, + ): AsyncGenerator; - supportsStreaming(): boolean { - return false; - } + supportsStreaming(): boolean { + return false; + } } diff --git a/ts/packages/flow/src/agent/react/service.ts b/ts/packages/flow/src/agent/react/service.ts index 36a821f1..a007446b 100644 --- a/ts/packages/flow/src/agent/react/service.ts +++ b/ts/packages/flow/src/agent/react/service.ts @@ -47,11 +47,11 @@ export class AgentService extends FlowProcessor { // Consumer: agent requests this.registerSpecification( - new ConsumerSpec("request", this.onRequest.bind(this)), + new ConsumerSpec("agent-request", this.onRequest.bind(this)), ); // Producer: agent responses (streaming chunks) - this.registerSpecification(new ProducerSpec("response")); + this.registerSpecification(new ProducerSpec("agent-response")); // Request-response clients for tool execution this.registerSpecification( @@ -94,7 +94,7 @@ export class AgentService extends FlowProcessor { const requestId = properties.id; if (!requestId) return; - const responseProducer = flowCtx.flow.producer("response"); + const responseProducer = flowCtx.flow.producer("agent-response"); try { // Build tools from flow requestors diff --git a/ts/packages/flow/src/librarian/service.ts b/ts/packages/flow/src/librarian/service.ts index 93f52c0c..9cb72a8b 100644 --- a/ts/packages/flow/src/librarian/service.ts +++ b/ts/packages/flow/src/librarian/service.ts @@ -83,14 +83,14 @@ export class LibrarianService extends AsyncProcessor { while (this.running) { try { // Poll librarian requests - const libMsg = await this.libConsumer.receive(500); + const libMsg = await this.libConsumer.receive(2000); if (libMsg) { await this.handleLibrarianMessage(libMsg); await this.libConsumer.acknowledge(libMsg); } // Poll collection management requests - const colMsg = await this.colConsumer.receive(500); + const colMsg = await this.colConsumer.receive(2000); if (colMsg) { await this.handleCollectionMessage(colMsg); await this.colConsumer.acknowledge(colMsg); diff --git a/ts/packages/flow/src/prompt/template.ts b/ts/packages/flow/src/prompt/template.ts index d517e11c..23cc5a3a 100644 --- a/ts/packages/flow/src/prompt/template.ts +++ b/ts/packages/flow/src/prompt/template.ts @@ -54,11 +54,11 @@ export class PromptTemplateService extends FlowProcessor { this.registerSpecification( new ConsumerSpec( - "request", + "prompt-request", this.onRequest.bind(this), ), ); - this.registerSpecification(new ProducerSpec("response")); + this.registerSpecification(new ProducerSpec("prompt-response")); this.registerConfigHandler(this.onPromptConfig.bind(this)); @@ -106,7 +106,7 @@ export class PromptTemplateService extends FlowProcessor { const requestId = properties.id; if (!requestId) return; - const responseProducer = flowCtx.flow.producer("response"); + const responseProducer = flowCtx.flow.producer("prompt-response"); try { const template = this.templates.get(msg.name); diff --git a/ts/scripts/run-prompt.ts b/ts/scripts/run-prompt.ts new file mode 100644 index 00000000..5ffb8320 --- /dev/null +++ b/ts/scripts/run-prompt.ts @@ -0,0 +1,14 @@ +/** + * Start the prompt template service. + * + * Usage: pnpm tsx scripts/run-prompt.ts + * + * Env: + * NATS_URL (default: nats://localhost:4222) + */ +import { run } from "../packages/flow/src/prompt/template.js"; + +run().catch((err) => { + console.error("Prompt service failed:", err); + process.exit(1); +}); diff --git a/ts/scripts/seed-config.ts b/ts/scripts/seed-config.ts index 22e49e1c..075bdc91 100644 --- a/ts/scripts/seed-config.ts +++ b/ts/scripts/seed-config.ts @@ -96,8 +96,6 @@ async function main(): Promise { default: { topics: { // LLM text completion - "request": "tg.flow.text-completion-request", - "response": "tg.flow.text-completion-response", "text-completion-request": "tg.flow.text-completion-request", "text-completion-response": "tg.flow.text-completion-response", // Prompt service @@ -112,6 +110,9 @@ async function main(): Promise { // Triples "triples-request": "tg.flow.triples-request", "triples-response": "tg.flow.triples-response", + // Agent + "agent-request": "tg.flow.agent-request", + "agent-response": "tg.flow.agent-response", // Chunking pipeline "input": "tg.flow.chunk", "output": "tg.flow.chunk", diff --git a/ts/scripts/test-pipeline.ts b/ts/scripts/test-pipeline.ts index 1fa4d75c..bfc23bad 100644 --- a/ts/scripts/test-pipeline.ts +++ b/ts/scripts/test-pipeline.ts @@ -127,15 +127,29 @@ async function testConfigDelete(): Promise { async function testPushFlowConfig(): Promise { try { - // Push a flow definition that LLM services will pick up + // Push a full flow definition with all service topic mappings const res = await post("/api/v1/config", { operation: "put", keys: ["flows"], values: { default: { topics: { - request: "tg.flow.text-completion-request", - response: "tg.flow.text-completion-response", + "text-completion-request": "tg.flow.text-completion-request", + "text-completion-response": "tg.flow.text-completion-response", + "prompt-request": "tg.flow.prompt-request", + "prompt-response": "tg.flow.prompt-response", + "graph-rag-request": "tg.flow.graph-rag-request", + "graph-rag-response": "tg.flow.graph-rag-response", + "document-rag-request": "tg.flow.document-rag-request", + "document-rag-response": "tg.flow.document-rag-response", + "triples-request": "tg.flow.triples-request", + "triples-response": "tg.flow.triples-response", + "agent-request": "tg.flow.agent-request", + "agent-response": "tg.flow.agent-response", + "input": "tg.flow.chunk", + "output": "tg.flow.chunk", + "triples": "tg.flow.triples", + "entity-contexts": "tg.flow.entity-contexts", }, }, },