From 0fb10aca73ea97306321748cc900ecea63f3a66a Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 08:52:29 -0500 Subject: [PATCH] Migrate metrics to Effect primitives --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 82 +++++++++++++++++++ ts/bun.lock | 8 +- ts/packages/base/package.json | 4 +- .../base/src/__tests__/metrics-effect.test.ts | 56 +++++++++++++ ts/packages/base/src/messaging/runtime.ts | 6 +- ts/packages/base/src/metrics/index.ts | 3 +- ts/packages/base/src/metrics/prometheus.ts | 79 +++++++++--------- ts/packages/flow/src/gateway/server.ts | 15 +++- 8 files changed, 197 insertions(+), 56 deletions(-) create mode 100644 ts/packages/base/src/__tests__/metrics-effect.test.ts diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 9b03ed5f..8e6473be 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -1450,6 +1450,33 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-02: Effect Metrics Prometheus Slice + +- Status: migrated and root-verified. +- Completed: + - Replaced the `@trustgraph/base` `prom-client` metric wrappers with + Effect-native `Metric.counter`, `Metric.histogram`, and + `effect/unstable/observability` `PrometheusMetrics.format`. + - Kept the existing Prometheus metric names and gateway + `/api/v1/metrics` scrape boundary while removing the direct `prom-client` + dependency and lockfile entries. + - Changed producer metric recording from a sync callback to an Effect value + that runs inside the producer send pipeline. + - Added isolated metric-registry tests for producer and consumer Prometheus + formatting. +- Verification: + - `cd ts && bun run check:tsgo` + - `cd ts/packages/base && bunx --bun vitest run src/__tests__/metrics-effect.test.ts src/__tests__/producer.test.ts src/__tests__/messaging-runtime.test.ts` + - `cd ts/packages/base && bun run build` + - `cd ts/packages/base && bun run test` + - `cd ts/packages/flow && bun run build` + - `cd ts/packages/flow && bunx --bun vitest run src/__tests__/gateway-dispatcher.test.ts` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -1501,6 +1528,12 @@ Notes: and typed producer/requestor spec-object accessors. New service handlers should hoist spec objects and use those accessors; bare string accessors remain compatibility escapes. + - Base metrics are now Effect-native and Prometheus-formatted through + `PrometheusMetrics.format`; do not reopen `prom-client` unless a future + scrape requirement cannot be represented by Effect metrics. + - Numeric public timeout fields such as `timeoutMs` remain compatibility + surfaces. Internal runtime config with `Config.number(...Ms)` is still a + valid `Config.duration` / `Duration` cleanup target. - Gateway/client: - `EffectRpcClient` now owns its socket/RPC layer with `ManagedRuntime`. Socket errors/JSON parsing now use tagged errors and Schema decoding. @@ -1550,6 +1583,16 @@ Notes: - Shared text-completion stream iteration and the Mistral content assertion are complete. The remaining provider-layer item is parity-backed Effect AI adapter work, not a direct SDK swap. +- Scratch-note follow-ups: + - `Term` / compact client term serialization is the next strongest schema + migration: prefer `S.toTaggedUnion(...).match` or `Match` helpers over the + current native switches and unsafe serializer fallbacks. + - FlowManager and sibling service `() => Effect.gen(...)` factories remain a + broad mechanical `Effect.fn` / `Effect.fnUntraced` cleanup, best handled + after the term schema slice. + - Long-lived `Map` / `Set` state in ref-backed services can move toward + Effect collections later; static lookup tables and local pure traversal + maps/sets remain no-ops. ## Ranked Findings @@ -1619,6 +1662,42 @@ Notes: mapping, missing-token config failures, and OpenAI-compatible local-server behavior. +### No-op: Base Metrics Prometheus Wrapper + +- Status: + - Closed as a scratch-note migration target by the Effect Metrics Prometheus + slice. +- TrustGraph evidence: + - `ts/packages/base/src/metrics/prometheus.ts` + - `ts/packages/flow/src/gateway/server.ts` +- Effect primitives: + - `Metric.counter`, `Metric.histogram`, and + `effect/unstable/observability` `PrometheusMetrics.format`. +- Rule: + - Keep the gateway Fastify route as the external scrape boundary, but record + TrustGraph metrics through Effect `Metric` values. + - Use a fresh `Metric.MetricRegistry` in tests that assert exact scrape + content. + +### P1: Term And ClientTerm Tagged-Union Normalization + +- TrustGraph evidence: + - `ts/packages/base/src/schema/primitives.ts` + - `ts/packages/flow/src/gateway/dispatch/serialize.ts` + - `ts/packages/client/src/socket/trustgraph-socket.ts` +- Effect primitives: + - `S.toTaggedUnion(...).match` and `effect/Match` discriminator helpers. +- Rewrite shape: + - Add tagged-union helpers for internal `Term` and compact client terms. + - Replace serializer native switches with tagged-union matching or + `Match.discriminatorsExhaustive`. + - Remove unsafe default pass-through casts while preserving compact `g` + string compatibility. +- Tests: + - Extend base schema tests for recursive terms and add gateway serializer + coverage for all variants, nested triples, compact graph strings, and + malformed client triples. + ### P2: Canonicalize MCP Around The Effect Server - Status: @@ -1655,6 +1734,9 @@ Notes: ## Recommended PR Order 1. MCP Effect stdio parity and canonicalization. +2. Term/ClientTerm Schema tagged-union and Match normalization. +3. FlowManager/service `Effect.fn` normalization. +4. Messaging runtime `Config.duration` / `Duration` cleanup. ## No-Op Rules diff --git a/ts/bun.lock b/ts/bun.lock index 2a3136ac..43e157c1 100644 --- a/ts/bun.lock +++ b/ts/bun.lock @@ -35,8 +35,8 @@ "@effect/opentelemetry": "4.0.0-beta.75", "@effect/platform-browser": "4.0.0-beta.75", "@effect/platform-bun": "4.0.0-beta.75", + "effect": "4.0.0-beta.75", "nats": "^2.29.0", - "prom-client": "^15.1.0", }, "devDependencies": { "@effect/vitest": "4.0.0-beta.75", @@ -631,8 +631,6 @@ "bezier-js": ["bezier-js@6.1.4", "", {}, "sha512-PA0FW9ZpcHbojUCMu28z9Vg/fNkwTj5YhusSAjHHDfHDGLxJ6YUKrAN2vk1fP2MMOxVw4Oko16FMlRGVBGqLKg=="], - "bintrees": ["bintrees@1.0.2", "", {}, "sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw=="], - "body-parser": ["body-parser@2.2.2", "", { "dependencies": { "bytes": "3.1.2", "content-type": "1.0.5", "debug": "4.4.3", "http-errors": "2.0.1", "iconv-lite": "0.7.2", "on-finished": "2.4.1", "qs": "6.15.0", "raw-body": "3.0.2", "type-is": "2.0.1" } }, "sha512-oP5VkATKlNwcgvxi0vM0p/D3n2C3EReYVX+DNYs5TjZFn/oQt2j+4sVJtSMr18pdRr8wjTcBl6LoV+FUwzPmNA=="], "browserslist": ["browserslist@4.28.2", "", { "dependencies": { "baseline-browser-mapping": "2.10.15", "caniuse-lite": "1.0.30001785", "electron-to-chromium": "1.5.331", "node-releases": "2.0.37", "update-browserslist-db": "1.2.3" }, "bin": { "browserslist": "cli.js" } }, "sha512-48xSriZYYg+8qXna9kwqjIVzuQxi+KYWp2+5nCYnYKPTr0LvD89Jqk2Or5ogxz0NUMfIjhh2lIUX/LyX9B4oIg=="], @@ -1115,8 +1113,6 @@ "process-warning": ["process-warning@5.0.0", "", {}, "sha512-a39t9ApHNx2L4+HBnQKqxxHNs1r7KF+Intd8Q/g1bUh6q0WIp9voPXJ/x0j+ZL45KF1pJd9+q2jLIRMfvEshkA=="], - "prom-client": ["prom-client@15.1.3", "", { "dependencies": { "@opentelemetry/api": "1.9.1", "tdigest": "0.1.2" } }, "sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g=="], - "prop-types": ["prop-types@15.8.1", "", { "dependencies": { "loose-envify": "1.4.0", "object-assign": "4.1.1", "react-is": "16.13.1" } }, "sha512-oj87CgZICdulUohogVAR7AjlC0327U4el4L6eAvOqCeudMDVU0NThNaV+b9Df4dXgSP1gXMTnPdhfe/2qDH5cg=="], "property-information": ["property-information@7.1.0", "", {}, "sha512-TwEZ+X+yCJmYfL7TPUOcvBZ4QfoT5YenQiJuX//0th53DE6w0xxLEtfK3iyryQFddXuvkIk51EEgrJQ0WJkOmQ=="], @@ -1245,8 +1241,6 @@ "tapable": ["tapable@2.3.2", "", {}, "sha512-1MOpMXuhGzGL5TTCZFItxCc0AARf1EZFQkGqMm7ERKj8+Hgr5oLvJOVFcC+lRmR8hCe2S3jC4T5D7Vg/d7/fhA=="], - "tdigest": ["tdigest@0.1.2", "", { "dependencies": { "bintrees": "1.0.2" } }, "sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA=="], - "thread-stream": ["thread-stream@4.0.0", "", { "dependencies": { "real-require": "0.2.0" } }, "sha512-4iMVL6HAINXWf1ZKZjIPcz5wYaOdPhtO8ATvZ+Xqp3BTdaqtAwQkNmKORqcIo5YkQqGXq5cwfswDwMqqQNrpJA=="], "tinybench": ["tinybench@2.9.0", "", {}, "sha512-0+DUvqWMValLmha6lr4kD8iAMK1HzV0/aKnCtWb9v9641TnP/MFb7Pc2bxoxQjTXAErryXVgUOfv2YqNllqGeg=="], diff --git a/ts/packages/base/package.json b/ts/packages/base/package.json index 8a3fa439..61d31daf 100644 --- a/ts/packages/base/package.json +++ b/ts/packages/base/package.json @@ -28,8 +28,8 @@ "@effect/opentelemetry": "4.0.0-beta.75", "@effect/platform-browser": "4.0.0-beta.75", "@effect/platform-bun": "4.0.0-beta.75", - "nats": "^2.29.0", - "prom-client": "^15.1.0" + "effect": "4.0.0-beta.75", + "nats": "^2.29.0" }, "devDependencies": { "@effect/vitest": "4.0.0-beta.75", diff --git a/ts/packages/base/src/__tests__/metrics-effect.test.ts b/ts/packages/base/src/__tests__/metrics-effect.test.ts new file mode 100644 index 00000000..20887202 --- /dev/null +++ b/ts/packages/base/src/__tests__/metrics-effect.test.ts @@ -0,0 +1,56 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Effect, Metric } from "effect"; +import { + formatPrometheusMetrics, + makeConsumerMetrics, + makeProducerMetrics, +} from "../metrics/index.js"; + +const withFreshMetrics = (effect: Effect.Effect) => + effect.pipe(Effect.provideService(Metric.MetricRegistry, new Map())); + +describe("Effect metrics", () => { + it.effect( + "formats producer metrics through Effect Prometheus exporter", + Effect.fnUntraced(function* () { + const output = yield* withFreshMetrics( + Effect.gen(function* () { + const metrics = makeProducerMetrics("processor-a", "flow-a", "producer-a"); + yield* metrics.inc; + yield* metrics.inc; + return yield* formatPrometheusMetrics; + }), + ); + + expect(output).toContain("# HELP tg_producer_items_total Producer items sent"); + expect(output).toContain("# TYPE tg_producer_items_total counter"); + expect(output).toContain('processor="processor-a"'); + expect(output).toContain('flow="flow-a"'); + expect(output).toContain('name="producer-a"'); + expect(output).toMatch(/tg_producer_items_total\{[^}]*\} 2/); + }), + ); + + it.effect( + "formats consumer metric timers and counters", + Effect.fnUntraced(function* () { + const output = yield* withFreshMetrics( + Effect.gen(function* () { + const metrics = makeConsumerMetrics("processor-a", "flow-a", "consumer-a"); + yield* metrics.recordTime(1.25); + yield* metrics.process("success"); + yield* metrics.process("error"); + yield* metrics.rateLimit; + return yield* formatPrometheusMetrics; + }), + ); + + expect(output).toContain("# TYPE tg_consumer_request_duration_seconds histogram"); + expect(output).toMatch(/tg_consumer_request_duration_seconds_count\{[^}]*\} 1/); + expect(output).toMatch(/tg_consumer_request_duration_seconds_sum\{[^}]*\} 1.25/); + expect(output).toMatch(/tg_consumer_processing_total\{[^}]*status="success"[^}]*\} 1/); + expect(output).toMatch(/tg_consumer_processing_total\{[^}]*status="error"[^}]*\} 1/); + expect(output).toMatch(/tg_consumer_rate_limit_total\{[^}]*\} 1/); + }), + ); +}); diff --git a/ts/packages/base/src/messaging/runtime.ts b/ts/packages/base/src/messaging/runtime.ts index b1ad12f7..82dcd75f 100644 --- a/ts/packages/base/src/messaging/runtime.ts +++ b/ts/packages/base/src/messaging/runtime.ts @@ -41,7 +41,7 @@ import { type MessagingTimeoutError, type PubSubError, } from "../errors.js"; -import type { ProducerMetrics } from "../metrics/prometheus.js"; +import type { ProducerMetrics } from "../metrics/index.js"; import type { FlowContext } from "./consumer.js"; import type { Flow } from "../processor/flow.js"; import type { SpecRuntimeRequirements } from "../spec/types.js"; @@ -169,9 +169,7 @@ export function makeEffectProducerHandle( Effect.tap(() => options.metrics === undefined ? Effect.void - : Effect.sync(() => { - options.metrics?.inc(); - }), + : options.metrics.inc, ), ), ), diff --git a/ts/packages/base/src/metrics/index.ts b/ts/packages/base/src/metrics/index.ts index 27011ef5..f63788ee 100644 --- a/ts/packages/base/src/metrics/index.ts +++ b/ts/packages/base/src/metrics/index.ts @@ -1,7 +1,8 @@ export { + formatPrometheusMetrics, makeConsumerMetrics, makeProducerMetrics, - registry, + prometheusContentType, type ConsumerMetrics, type ProducerMetrics, } from "./prometheus.js"; diff --git a/ts/packages/base/src/metrics/prometheus.ts b/ts/packages/base/src/metrics/prometheus.ts index 7e00133f..9dcfd374 100644 --- a/ts/packages/base/src/metrics/prometheus.ts +++ b/ts/packages/base/src/metrics/prometheus.ts @@ -1,18 +1,38 @@ /** - * Prometheus metrics wrappers. + * Effect-native metrics and Prometheus formatting helpers. * * Python reference: trustgraph-base/trustgraph/base/metrics.py */ -import { Counter, Histogram, Registry, collectDefaultMetrics } from "prom-client"; +import { Effect, Metric } from "effect"; +import { PrometheusMetrics } from "effect/unstable/observability"; -export const registry = new Registry(); -collectDefaultMetrics({ register: registry }); +export const prometheusContentType = "text/plain; version=0.0.4; charset=utf-8"; + +const consumerRequestDuration = Metric.histogram("tg_consumer_request_duration_seconds", { + description: "Consumer request processing time", + boundaries: Metric.exponentialBoundaries({ start: 0.005, factor: 2, count: 12 }), +}); + +const consumerProcessing = Metric.counter("tg_consumer_processing_total", { + description: "Consumer processing outcomes", + incremental: true, +}); + +const consumerRateLimit = Metric.counter("tg_consumer_rate_limit_total", { + description: "Consumer rate limit events", + incremental: true, +}); + +const producerItems = Metric.counter("tg_producer_items_total", { + description: "Producer items sent", + incremental: true, +}); export interface ConsumerMetrics { - readonly recordTime: (seconds: number) => void; - readonly process: (status: "success" | "error") => void; - readonly rateLimit: () => void; + readonly recordTime: (seconds: number) => Effect.Effect; + readonly process: (status: "success" | "error") => Effect.Effect; + readonly rateLimit: Effect.Effect; } export function makeConsumerMetrics( @@ -21,36 +41,22 @@ export function makeConsumerMetrics( name: string, ): ConsumerMetrics { const labels = { processor, flow, name }; - const requestHistogram = new Histogram({ - name: "tg_consumer_request_duration_seconds", - help: "Consumer request processing time", - labelNames: ["processor", "flow", "name"], - registers: [registry], - }); - - const processingCounter = new Counter({ - name: "tg_consumer_processing_total", - help: "Consumer processing outcomes", - labelNames: ["processor", "flow", "name", "status"], - registers: [registry], - }); - - const rateLimitCounter = new Counter({ - name: "tg_consumer_rate_limit_total", - help: "Consumer rate limit events", - labelNames: ["processor", "flow", "name"], - registers: [registry], - }); + const requestHistogram = Metric.withAttributes(consumerRequestDuration, labels); + const rateLimitCounter = Metric.withAttributes(consumerRateLimit, labels); return { - recordTime: (seconds) => requestHistogram.observe(labels, seconds), - process: (status) => processingCounter.inc({ ...labels, status }), - rateLimit: () => rateLimitCounter.inc(labels), + recordTime: (seconds) => Metric.update(requestHistogram, seconds), + process: (status) => + Metric.update( + Metric.withAttributes(consumerProcessing, { ...labels, status }), + 1, + ), + rateLimit: Metric.update(rateLimitCounter, 1), }; } export interface ProducerMetrics { - readonly inc: () => void; + readonly inc: Effect.Effect; } export function makeProducerMetrics( @@ -59,14 +65,11 @@ export function makeProducerMetrics( name: string, ): ProducerMetrics { const labels = { processor, flow, name }; - const counter = new Counter({ - name: "tg_producer_items_total", - help: "Producer items sent", - labelNames: ["processor", "flow", "name"], - registers: [registry], - }); + const counter = Metric.withAttributes(producerItems, labels); return { - inc: () => counter.inc(labels), + inc: Metric.update(counter, 1), }; } + +export const formatPrometheusMetrics = PrometheusMetrics.format(); diff --git a/ts/packages/flow/src/gateway/server.ts b/ts/packages/flow/src/gateway/server.ts index 4e405250..4e775664 100644 --- a/ts/packages/flow/src/gateway/server.ts +++ b/ts/packages/flow/src/gateway/server.ts @@ -14,7 +14,14 @@ import { Cause, Clock, Config, Effect, Exit, Layer, ManagedRuntime, Random, Scop import * as O from "effect/Option"; import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization"; import * as EffectSocket from "effect/unstable/socket/Socket"; -import { messagingLifecycleError, optionalStringConfig, registry, toTgError, type PubSubBackend } from "@trustgraph/base"; +import { + formatPrometheusMetrics, + messagingLifecycleError, + optionalStringConfig, + prometheusContentType, + toTgError, + type PubSubBackend, +} from "@trustgraph/base"; import { makeDispatcherManager } from "./dispatch/manager.js"; import { makeGatewayRpcServer } from "./rpc-server.js"; @@ -227,10 +234,10 @@ export function createGateway(config: GatewayConfig) { ); }); - // Metrics endpoint — returns Prometheus metrics from prom-client + // Metrics endpoint — returns Effect metrics in Prometheus exposition format. app.get("/api/v1/metrics", (_, reply) => { - reply.header("content-type", registry.contentType); - return registry.metrics(); + reply.header("content-type", prometheusContentType); + return Effect.runPromise(formatPrometheusMetrics); }); return {