diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 19429f61..164beaab 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -2254,6 +2254,24 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-04: Prompt Template MutableHashMap Cache Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/flow/src/prompt/template.ts` now stores loaded prompt + templates in `MutableHashMap` instead of a native `Map`. + - Config reload clears and repopulates the Effect collection, request lookup + narrows through `Option`, and logging uses `MutableHashMap.size` / `keys`. + - New focused coverage verifies a config-loaded prompt template renders + through the service request/response flow. +- Verification: + - `cd ts/packages/flow && bunx --bun vitest run src/__tests__/prompt-template.test.ts` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -2440,9 +2458,9 @@ Notes: The workbench random id helper is complete; the remaining workbench `Effect.gen` match is a local one-shot command effect value. - Remaining real long-lived native collection targets include base processor - registries, Librarian service state, prompt template cache, and a workbench - module cache. The standalone Librarian collection manager is complete. - Local traversal sets and test fakes remain no-op boundaries. + registries, Librarian service state, and a workbench module cache. The + standalone Librarian collection manager and prompt template cache are + complete. Local traversal sets and test fakes remain no-op boundaries. ## Ranked Findings diff --git a/ts/packages/flow/src/__tests__/prompt-template.test.ts b/ts/packages/flow/src/__tests__/prompt-template.test.ts new file mode 100644 index 00000000..53f68dc9 --- /dev/null +++ b/ts/packages/flow/src/__tests__/prompt-template.test.ts @@ -0,0 +1,219 @@ +import { describe, expect, it } from "@effect/vitest"; +import { ConfigProvider, Effect, Fiber } from "effect"; +import * as S from "effect/Schema"; +import { + MessagingRuntimeLive, + PubSub, + runProcessorScoped, + topics, + type BackendConsumer, + type BackendProducer, + type CreateConsumerOptions, + type CreateProducerOptions, + type Message, + type PromptRequest, + type PromptResponse, + type PubSubBackend, +} from "@trustgraph/base"; +import { PromptTemplateService } from "../prompt/template.js"; + +class WaitForTimeout extends S.TaggedErrorClass()( + "WaitForTimeout", + { label: S.String }, +) {} + +const isWaitForTimeout = S.is(WaitForTimeout); + +function createMessage(value: T, properties: Record = {}): Message { + return { + value: () => value, + properties: () => properties, + }; +} + +const waitFor = (condition: () => boolean, label: string) => + Effect.tryPromise({ + try: () => + new Promise((resolve, reject) => { + const deadline = Date.now() + 1000; + const check = () => { + if (condition()) { + resolve(); + return; + } + if (Date.now() > deadline) { + reject(WaitForTimeout.make({ label })); + return; + } + setTimeout(check, 5); + }; + check(); + }), + catch: (error) => isWaitForTimeout(error) ? error : WaitForTimeout.make({ label }), + }); + +class RecordingProducer implements BackendProducer { + readonly sent: Array<{ readonly message: T; readonly properties?: Record }> = []; + + async send(message: T, properties?: Record): Promise { + this.sent.push(properties === undefined ? { message } : { message, properties }); + } + + async flush(): Promise {} + + async close(): Promise {} +} + +class PushConsumer implements BackendConsumer { + readonly acknowledged: Array> = []; + private readonly messages: Array> = []; + private readonly waiters: Array<(message: Message | null) => void> = []; + private closed = false; + + push(message: Message): void { + const waiter = this.waiters.shift(); + if (waiter !== undefined) { + waiter(message); + return; + } + this.messages.push(message); + } + + async receive(): Promise | null> { + const message = this.messages.shift(); + if (message !== undefined || this.closed) { + return message ?? null; + } + return await new Promise((resolve) => { + this.waiters.push(resolve); + }); + } + + async acknowledge(message: Message): Promise { + this.acknowledged.push(message); + } + + async negativeAcknowledge(): Promise {} + + async unsubscribe(): Promise {} + + async close(): Promise { + this.closed = true; + for (const waiter of this.waiters.splice(0)) { + waiter(null); + } + } +} + +class PromptBackend implements PubSubBackend { + readonly configConsumer = new PushConsumer<{ readonly version: number; readonly config: Record }>(); + readonly consumersByTopic = new Map>(); + readonly producersByTopic = new Map>(); + + async createProducer(options: CreateProducerOptions): Promise> { + const producer = new RecordingProducer(); + this.producersByTopic.set(options.topic, producer); + return producer as BackendProducer; + } + + async createConsumer(options: CreateConsumerOptions): Promise> { + if (options.topic === topics.configPush) { + return this.configConsumer as unknown as BackendConsumer; + } + const consumer = new PushConsumer(); + this.consumersByTopic.set(options.topic, consumer); + return consumer as BackendConsumer; + } + + async close(): Promise {} + + pushPromptConfig(): void { + this.configConsumer.push(createMessage({ + version: 1, + config: { + flows: { + default: { + topics: { + "prompt-request": "prompt-request-topic", + "prompt-response": "prompt-response-topic", + }, + }, + }, + prompt: { + greeting: { + system: "System for {name}", + prompt: "Hello {name} from {place}", + }, + }, + }, + })); + } +} + +const fastMessagingConfig = ConfigProvider.layer( + ConfigProvider.fromEnv({ + TG_CONSUMER_RECEIVE_TIMEOUT_MS: "1", + TG_CONSUMER_ERROR_BACKOFF_MS: "1", + TG_RATE_LIMIT_RETRY_MS: "1", + TG_REQUEST_TIMEOUT_MS: "250", + }), +); + +describe("PromptTemplateService", () => { + it.effect( + "renders prompt templates loaded from config through MutableHashMap state", + Effect.fnUntraced(function* () { + const backend = new PromptBackend(); + + yield* Effect.scoped( + Effect.gen(function* () { + const fiber = yield* runProcessorScoped( + { + id: "prompt", + pubsubUrl: "nats://unused:4222", + metricsPort: 8000, + manageProcessSignals: true, + }, + (config) => new PromptTemplateService(config), + ).pipe( + Effect.provide(MessagingRuntimeLive), + Effect.provide(PubSub.layer(backend)), + Effect.provide(fastMessagingConfig), + Effect.forkChild, + ); + + backend.pushPromptConfig(); + yield* waitFor(() => backend.consumersByTopic.has("prompt-request-topic"), "prompt consumer"); + yield* waitFor(() => backend.producersByTopic.has("prompt-response-topic"), "prompt producer"); + yield* waitFor(() => backend.configConsumer.acknowledged.length === 1, "config ack"); + + const inputConsumer = backend.consumersByTopic.get("prompt-request-topic") as PushConsumer; + const outputProducer = backend.producersByTopic.get("prompt-response-topic") as RecordingProducer; + + inputConsumer.push(createMessage({ + name: "greeting", + variables: { + name: "Ada", + place: "TrustGraph", + }, + }, { id: "request-1" })); + + yield* waitFor(() => outputProducer.sent.length === 1, "prompt response"); + + expect(inputConsumer.acknowledged.length).toBe(1); + expect(outputProducer.sent).toEqual([ + { + message: { + system: "System for Ada", + prompt: "Hello Ada from TrustGraph", + }, + properties: { id: "request-1" }, + }, + ]); + + yield* Fiber.interrupt(fiber); + }), + ); + }), + ); +}); diff --git a/ts/packages/flow/src/prompt/template.ts b/ts/packages/flow/src/prompt/template.ts index abbd764d..a82001a5 100644 --- a/ts/packages/flow/src/prompt/template.ts +++ b/ts/packages/flow/src/prompt/template.ts @@ -41,6 +41,8 @@ import { import { NodeRuntime } from "@effect/platform-node"; import { makeFlowProcessorProgram } from "@trustgraph/base"; import { Effect, Layer, ManagedRuntime } from "effect"; +import * as MutableHashMap from "effect/MutableHashMap"; +import * as O from "effect/Option"; import * as S from "effect/Schema"; export interface PromptTemplate { @@ -67,7 +69,7 @@ interface PromptTemplateRuntime { const programRuntimes = new WeakMap(); const makePromptTemplateRuntime = (config: PromptTemplateConfig): PromptTemplateRuntime => { - const templates = new Map(); + const templates = MutableHashMap.empty(); const configKey = config.configKey ?? "prompt"; const PromptResponseProducer = makeProducerSpec("prompt-response"); @@ -93,17 +95,17 @@ const makePromptTemplateRuntime = (config: PromptTemplateConfig): PromptTemplate ); if (decoded === null) return; - templates.clear(); + MutableHashMap.clear(templates); for (const [name, template] of Object.entries(decoded)) { - templates.set(name, { + MutableHashMap.set(templates, name, { system: template.system ?? "", prompt: template.prompt ?? "", }); } yield* Effect.log( - `[PromptTemplate] Loaded ${templates.size} template(s): ${[...templates.keys()].join(", ")}`, + `[PromptTemplate] Loaded ${MutableHashMap.size(templates)} template(s): ${Array.from(MutableHashMap.keys(templates)).join(", ")}`, ); }); @@ -116,7 +118,7 @@ const makePromptTemplateRuntime = (config: PromptTemplateConfig): PromptTemplate if (requestId === undefined || requestId.length === 0) return; const responseProducer = yield* flowCtx.flow.producerEffect(PromptResponseProducer); - const template = templates.get(msg.name); + const template = O.getOrUndefined(MutableHashMap.get(templates, msg.name)); if (template === undefined) { yield* responseProducer.send(requestId, { system: "",