Use MutableHashMap for prompt templates

This commit is contained in:
elpresidank 2026-06-04 07:46:43 -05:00
parent 7d77a5c1de
commit 338232efa8
3 changed files with 247 additions and 8 deletions

View file

@ -0,0 +1,219 @@
import { describe, expect, it } from "@effect/vitest";
import { ConfigProvider, Effect, Fiber } from "effect";
import * as S from "effect/Schema";
import {
MessagingRuntimeLive,
PubSub,
runProcessorScoped,
topics,
type BackendConsumer,
type BackendProducer,
type CreateConsumerOptions,
type CreateProducerOptions,
type Message,
type PromptRequest,
type PromptResponse,
type PubSubBackend,
} from "@trustgraph/base";
import { PromptTemplateService } from "../prompt/template.js";
class WaitForTimeout extends S.TaggedErrorClass<WaitForTimeout>()(
"WaitForTimeout",
{ label: S.String },
) {}
const isWaitForTimeout = S.is(WaitForTimeout);
function createMessage<T>(value: T, properties: Record<string, string> = {}): Message<T> {
return {
value: () => value,
properties: () => properties,
};
}
const waitFor = (condition: () => boolean, label: string) =>
Effect.tryPromise({
try: () =>
new Promise<void>((resolve, reject) => {
const deadline = Date.now() + 1000;
const check = () => {
if (condition()) {
resolve();
return;
}
if (Date.now() > deadline) {
reject(WaitForTimeout.make({ label }));
return;
}
setTimeout(check, 5);
};
check();
}),
catch: (error) => isWaitForTimeout(error) ? error : WaitForTimeout.make({ label }),
});
class RecordingProducer<T> implements BackendProducer<T> {
readonly sent: Array<{ readonly message: T; readonly properties?: Record<string, string> }> = [];
async send(message: T, properties?: Record<string, string>): Promise<void> {
this.sent.push(properties === undefined ? { message } : { message, properties });
}
async flush(): Promise<void> {}
async close(): Promise<void> {}
}
class PushConsumer<T> implements BackendConsumer<T> {
readonly acknowledged: Array<Message<T>> = [];
private readonly messages: Array<Message<T>> = [];
private readonly waiters: Array<(message: Message<T> | null) => void> = [];
private closed = false;
push(message: Message<T>): void {
const waiter = this.waiters.shift();
if (waiter !== undefined) {
waiter(message);
return;
}
this.messages.push(message);
}
async receive(): Promise<Message<T> | null> {
const message = this.messages.shift();
if (message !== undefined || this.closed) {
return message ?? null;
}
return await new Promise((resolve) => {
this.waiters.push(resolve);
});
}
async acknowledge(message: Message<T>): Promise<void> {
this.acknowledged.push(message);
}
async negativeAcknowledge(): Promise<void> {}
async unsubscribe(): Promise<void> {}
async close(): Promise<void> {
this.closed = true;
for (const waiter of this.waiters.splice(0)) {
waiter(null);
}
}
}
class PromptBackend implements PubSubBackend {
readonly configConsumer = new PushConsumer<{ readonly version: number; readonly config: Record<string, unknown> }>();
readonly consumersByTopic = new Map<string, PushConsumer<unknown>>();
readonly producersByTopic = new Map<string, RecordingProducer<unknown>>();
async createProducer<T>(options: CreateProducerOptions): Promise<BackendProducer<T>> {
const producer = new RecordingProducer<unknown>();
this.producersByTopic.set(options.topic, producer);
return producer as BackendProducer<T>;
}
async createConsumer<T>(options: CreateConsumerOptions): Promise<BackendConsumer<T>> {
if (options.topic === topics.configPush) {
return this.configConsumer as unknown as BackendConsumer<T>;
}
const consumer = new PushConsumer<unknown>();
this.consumersByTopic.set(options.topic, consumer);
return consumer as BackendConsumer<T>;
}
async close(): Promise<void> {}
pushPromptConfig(): void {
this.configConsumer.push(createMessage({
version: 1,
config: {
flows: {
default: {
topics: {
"prompt-request": "prompt-request-topic",
"prompt-response": "prompt-response-topic",
},
},
},
prompt: {
greeting: {
system: "System for {name}",
prompt: "Hello {name} from {place}",
},
},
},
}));
}
}
const fastMessagingConfig = ConfigProvider.layer(
ConfigProvider.fromEnv({
TG_CONSUMER_RECEIVE_TIMEOUT_MS: "1",
TG_CONSUMER_ERROR_BACKOFF_MS: "1",
TG_RATE_LIMIT_RETRY_MS: "1",
TG_REQUEST_TIMEOUT_MS: "250",
}),
);
describe("PromptTemplateService", () => {
it.effect(
"renders prompt templates loaded from config through MutableHashMap state",
Effect.fnUntraced(function* () {
const backend = new PromptBackend();
yield* Effect.scoped(
Effect.gen(function* () {
const fiber = yield* runProcessorScoped(
{
id: "prompt",
pubsubUrl: "nats://unused:4222",
metricsPort: 8000,
manageProcessSignals: true,
},
(config) => new PromptTemplateService(config),
).pipe(
Effect.provide(MessagingRuntimeLive),
Effect.provide(PubSub.layer(backend)),
Effect.provide(fastMessagingConfig),
Effect.forkChild,
);
backend.pushPromptConfig();
yield* waitFor(() => backend.consumersByTopic.has("prompt-request-topic"), "prompt consumer");
yield* waitFor(() => backend.producersByTopic.has("prompt-response-topic"), "prompt producer");
yield* waitFor(() => backend.configConsumer.acknowledged.length === 1, "config ack");
const inputConsumer = backend.consumersByTopic.get("prompt-request-topic") as PushConsumer<PromptRequest>;
const outputProducer = backend.producersByTopic.get("prompt-response-topic") as RecordingProducer<PromptResponse>;
inputConsumer.push(createMessage({
name: "greeting",
variables: {
name: "Ada",
place: "TrustGraph",
},
}, { id: "request-1" }));
yield* waitFor(() => outputProducer.sent.length === 1, "prompt response");
expect(inputConsumer.acknowledged.length).toBe(1);
expect(outputProducer.sent).toEqual([
{
message: {
system: "System for Ada",
prompt: "Hello Ada from TrustGraph",
},
properties: { id: "request-1" },
},
]);
yield* Fiber.interrupt(fiber);
}),
);
}),
);
});

View file

@ -41,6 +41,8 @@ import {
import { NodeRuntime } from "@effect/platform-node";
import { makeFlowProcessorProgram } from "@trustgraph/base";
import { Effect, Layer, ManagedRuntime } from "effect";
import * as MutableHashMap from "effect/MutableHashMap";
import * as O from "effect/Option";
import * as S from "effect/Schema";
export interface PromptTemplate {
@ -67,7 +69,7 @@ interface PromptTemplateRuntime {
const programRuntimes = new WeakMap<PromptTemplateConfig, PromptTemplateRuntime>();
const makePromptTemplateRuntime = (config: PromptTemplateConfig): PromptTemplateRuntime => {
const templates = new Map<string, PromptTemplate>();
const templates = MutableHashMap.empty<string, PromptTemplate>();
const configKey = config.configKey ?? "prompt";
const PromptResponseProducer = makeProducerSpec<PromptResponse>("prompt-response");
@ -93,17 +95,17 @@ const makePromptTemplateRuntime = (config: PromptTemplateConfig): PromptTemplate
);
if (decoded === null) return;
templates.clear();
MutableHashMap.clear(templates);
for (const [name, template] of Object.entries(decoded)) {
templates.set(name, {
MutableHashMap.set(templates, name, {
system: template.system ?? "",
prompt: template.prompt ?? "",
});
}
yield* Effect.log(
`[PromptTemplate] Loaded ${templates.size} template(s): ${[...templates.keys()].join(", ")}`,
`[PromptTemplate] Loaded ${MutableHashMap.size(templates)} template(s): ${Array.from(MutableHashMap.keys(templates)).join(", ")}`,
);
});
@ -116,7 +118,7 @@ const makePromptTemplateRuntime = (config: PromptTemplateConfig): PromptTemplate
if (requestId === undefined || requestId.length === 0) return;
const responseProducer = yield* flowCtx.flow.producerEffect(PromptResponseProducer);
const template = templates.get(msg.name);
const template = O.getOrUndefined(MutableHashMap.get(templates, msg.name));
if (template === undefined) {
yield* responseProducer.send(requestId, {
system: "",