diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md
index 9b03ed5f..8e6473be 100644
--- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md
+++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md
@@ -1450,6 +1450,33 @@ Notes:
- `cd ts && bun run lint`
- `git diff --check`
+### 2026-06-02: Effect Metrics Prometheus Slice
+
+- Status: migrated and root-verified.
+- Completed:
+ - Replaced the `@trustgraph/base` `prom-client` metric wrappers with
+ Effect-native `Metric.counter`, `Metric.histogram`, and
+ `effect/unstable/observability` `PrometheusMetrics.format`.
+ - Kept the existing Prometheus metric names and gateway
+ `/api/v1/metrics` scrape boundary while removing the direct `prom-client`
+ dependency and lockfile entries.
+ - Changed producer metric recording from a sync callback to an Effect value
+ that runs inside the producer send pipeline.
+ - Added isolated metric-registry tests for producer and consumer Prometheus
+ formatting.
+- Verification:
+ - `cd ts && bun run check:tsgo`
+ - `cd ts/packages/base && bunx --bun vitest run src/__tests__/metrics-effect.test.ts src/__tests__/producer.test.ts src/__tests__/messaging-runtime.test.ts`
+ - `cd ts/packages/base && bun run build`
+ - `cd ts/packages/base && bun run test`
+ - `cd ts/packages/flow && bun run build`
+ - `cd ts/packages/flow && bunx --bun vitest run src/__tests__/gateway-dispatcher.test.ts`
+ - `cd ts && bun run check`
+ - `cd ts && bun run build`
+ - `cd ts && bun run test`
+ - `cd ts && bun run lint`
+ - `git diff --check`
+
## Subagent Findings To Preserve
- MCP/workbench:
@@ -1501,6 +1528,12 @@ Notes:
and typed producer/requestor spec-object accessors. New service handlers
should hoist spec objects and use those accessors; bare string accessors
remain compatibility escapes.
+ - Base metrics are now Effect-native and Prometheus-formatted through
+ `PrometheusMetrics.format`; do not reopen `prom-client` unless a future
+ scrape requirement cannot be represented by Effect metrics.
+ - Numeric public timeout fields such as `timeoutMs` remain compatibility
+ surfaces. Internal runtime config with `Config.number(...Ms)` is still a
+ valid `Config.duration` / `Duration` cleanup target.
- Gateway/client:
- `EffectRpcClient` now owns its socket/RPC layer with `ManagedRuntime`.
Socket errors/JSON parsing now use tagged errors and Schema decoding.
@@ -1550,6 +1583,16 @@ Notes:
- Shared text-completion stream iteration and the Mistral content assertion are
complete. The remaining provider-layer item is parity-backed Effect AI
adapter work, not a direct SDK swap.
+- Scratch-note follow-ups:
+ - `Term` / compact client term serialization is the next strongest schema
+ migration: prefer `S.toTaggedUnion(...).match` or `Match` helpers over the
+ current native switches and unsafe serializer fallbacks.
+ - FlowManager and sibling service `() => Effect.gen(...)` factories remain a
+ broad mechanical `Effect.fn` / `Effect.fnUntraced` cleanup, best handled
+ after the term schema slice.
+ - Long-lived `Map` / `Set` state in ref-backed services can move toward
+ Effect collections later; static lookup tables and local pure traversal
+ maps/sets remain no-ops.
## Ranked Findings
@@ -1619,6 +1662,42 @@ Notes:
mapping, missing-token config failures, and OpenAI-compatible local-server
behavior.
+### No-op: Base Metrics Prometheus Wrapper
+
+- Status:
+ - Closed as a scratch-note migration target by the Effect Metrics Prometheus
+ slice.
+- TrustGraph evidence:
+ - `ts/packages/base/src/metrics/prometheus.ts`
+ - `ts/packages/flow/src/gateway/server.ts`
+- Effect primitives:
+ - `Metric.counter`, `Metric.histogram`, and
+ `effect/unstable/observability` `PrometheusMetrics.format`.
+- Rule:
+ - Keep the gateway Fastify route as the external scrape boundary, but record
+ TrustGraph metrics through Effect `Metric` values.
+ - Use a fresh `Metric.MetricRegistry` in tests that assert exact scrape
+ content.
+
+### P1: Term And ClientTerm Tagged-Union Normalization
+
+- TrustGraph evidence:
+ - `ts/packages/base/src/schema/primitives.ts`
+ - `ts/packages/flow/src/gateway/dispatch/serialize.ts`
+ - `ts/packages/client/src/socket/trustgraph-socket.ts`
+- Effect primitives:
+ - `S.toTaggedUnion(...).match` and `effect/Match` discriminator helpers.
+- Rewrite shape:
+ - Add tagged-union helpers for internal `Term` and compact client terms.
+ - Replace serializer native switches with tagged-union matching or
+ `Match.discriminatorsExhaustive`.
+ - Remove unsafe default pass-through casts while preserving compact `g`
+ string compatibility.
+- Tests:
+ - Extend base schema tests for recursive terms and add gateway serializer
+ coverage for all variants, nested triples, compact graph strings, and
+ malformed client triples.
+
### P2: Canonicalize MCP Around The Effect Server
- Status:
@@ -1655,6 +1734,9 @@ Notes:
## Recommended PR Order
1. MCP Effect stdio parity and canonicalization.
+2. Term/ClientTerm Schema tagged-union and Match normalization.
+3. FlowManager/service `Effect.fn` normalization.
+4. Messaging runtime `Config.duration` / `Duration` cleanup.
## No-Op Rules
diff --git a/ts/bun.lock b/ts/bun.lock
index 2a3136ac..43e157c1 100644
--- a/ts/bun.lock
+++ b/ts/bun.lock
@@ -35,8 +35,8 @@
"@effect/opentelemetry": "4.0.0-beta.75",
"@effect/platform-browser": "4.0.0-beta.75",
"@effect/platform-bun": "4.0.0-beta.75",
+ "effect": "4.0.0-beta.75",
"nats": "^2.29.0",
- "prom-client": "^15.1.0",
},
"devDependencies": {
"@effect/vitest": "4.0.0-beta.75",
@@ -631,8 +631,6 @@
"bezier-js": ["bezier-js@6.1.4", "", {}, "sha512-PA0FW9ZpcHbojUCMu28z9Vg/fNkwTj5YhusSAjHHDfHDGLxJ6YUKrAN2vk1fP2MMOxVw4Oko16FMlRGVBGqLKg=="],
- "bintrees": ["bintrees@1.0.2", "", {}, "sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw=="],
-
"body-parser": ["body-parser@2.2.2", "", { "dependencies": { "bytes": "3.1.2", "content-type": "1.0.5", "debug": "4.4.3", "http-errors": "2.0.1", "iconv-lite": "0.7.2", "on-finished": "2.4.1", "qs": "6.15.0", "raw-body": "3.0.2", "type-is": "2.0.1" } }, "sha512-oP5VkATKlNwcgvxi0vM0p/D3n2C3EReYVX+DNYs5TjZFn/oQt2j+4sVJtSMr18pdRr8wjTcBl6LoV+FUwzPmNA=="],
"browserslist": ["browserslist@4.28.2", "", { "dependencies": { "baseline-browser-mapping": "2.10.15", "caniuse-lite": "1.0.30001785", "electron-to-chromium": "1.5.331", "node-releases": "2.0.37", "update-browserslist-db": "1.2.3" }, "bin": { "browserslist": "cli.js" } }, "sha512-48xSriZYYg+8qXna9kwqjIVzuQxi+KYWp2+5nCYnYKPTr0LvD89Jqk2Or5ogxz0NUMfIjhh2lIUX/LyX9B4oIg=="],
@@ -1115,8 +1113,6 @@
"process-warning": ["process-warning@5.0.0", "", {}, "sha512-a39t9ApHNx2L4+HBnQKqxxHNs1r7KF+Intd8Q/g1bUh6q0WIp9voPXJ/x0j+ZL45KF1pJd9+q2jLIRMfvEshkA=="],
- "prom-client": ["prom-client@15.1.3", "", { "dependencies": { "@opentelemetry/api": "1.9.1", "tdigest": "0.1.2" } }, "sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g=="],
-
"prop-types": ["prop-types@15.8.1", "", { "dependencies": { "loose-envify": "1.4.0", "object-assign": "4.1.1", "react-is": "16.13.1" } }, "sha512-oj87CgZICdulUohogVAR7AjlC0327U4el4L6eAvOqCeudMDVU0NThNaV+b9Df4dXgSP1gXMTnPdhfe/2qDH5cg=="],
"property-information": ["property-information@7.1.0", "", {}, "sha512-TwEZ+X+yCJmYfL7TPUOcvBZ4QfoT5YenQiJuX//0th53DE6w0xxLEtfK3iyryQFddXuvkIk51EEgrJQ0WJkOmQ=="],
@@ -1245,8 +1241,6 @@
"tapable": ["tapable@2.3.2", "", {}, "sha512-1MOpMXuhGzGL5TTCZFItxCc0AARf1EZFQkGqMm7ERKj8+Hgr5oLvJOVFcC+lRmR8hCe2S3jC4T5D7Vg/d7/fhA=="],
- "tdigest": ["tdigest@0.1.2", "", { "dependencies": { "bintrees": "1.0.2" } }, "sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA=="],
-
"thread-stream": ["thread-stream@4.0.0", "", { "dependencies": { "real-require": "0.2.0" } }, "sha512-4iMVL6HAINXWf1ZKZjIPcz5wYaOdPhtO8ATvZ+Xqp3BTdaqtAwQkNmKORqcIo5YkQqGXq5cwfswDwMqqQNrpJA=="],
"tinybench": ["tinybench@2.9.0", "", {}, "sha512-0+DUvqWMValLmha6lr4kD8iAMK1HzV0/aKnCtWb9v9641TnP/MFb7Pc2bxoxQjTXAErryXVgUOfv2YqNllqGeg=="],
diff --git a/ts/packages/base/package.json b/ts/packages/base/package.json
index 8a3fa439..61d31daf 100644
--- a/ts/packages/base/package.json
+++ b/ts/packages/base/package.json
@@ -28,8 +28,8 @@
"@effect/opentelemetry": "4.0.0-beta.75",
"@effect/platform-browser": "4.0.0-beta.75",
"@effect/platform-bun": "4.0.0-beta.75",
- "nats": "^2.29.0",
- "prom-client": "^15.1.0"
+ "effect": "4.0.0-beta.75",
+ "nats": "^2.29.0"
},
"devDependencies": {
"@effect/vitest": "4.0.0-beta.75",
diff --git a/ts/packages/base/src/__tests__/metrics-effect.test.ts b/ts/packages/base/src/__tests__/metrics-effect.test.ts
new file mode 100644
index 00000000..20887202
--- /dev/null
+++ b/ts/packages/base/src/__tests__/metrics-effect.test.ts
@@ -0,0 +1,56 @@
+import { describe, expect, it } from "@effect/vitest";
+import { Effect, Metric } from "effect";
+import {
+ formatPrometheusMetrics,
+ makeConsumerMetrics,
+ makeProducerMetrics,
+} from "../metrics/index.js";
+
+const withFreshMetrics = (effect: Effect.Effect) =>
+ effect.pipe(Effect.provideService(Metric.MetricRegistry, new Map()));
+
+describe("Effect metrics", () => {
+ it.effect(
+ "formats producer metrics through Effect Prometheus exporter",
+ Effect.fnUntraced(function* () {
+ const output = yield* withFreshMetrics(
+ Effect.gen(function* () {
+ const metrics = makeProducerMetrics("processor-a", "flow-a", "producer-a");
+ yield* metrics.inc;
+ yield* metrics.inc;
+ return yield* formatPrometheusMetrics;
+ }),
+ );
+
+ expect(output).toContain("# HELP tg_producer_items_total Producer items sent");
+ expect(output).toContain("# TYPE tg_producer_items_total counter");
+ expect(output).toContain('processor="processor-a"');
+ expect(output).toContain('flow="flow-a"');
+ expect(output).toContain('name="producer-a"');
+ expect(output).toMatch(/tg_producer_items_total\{[^}]*\} 2/);
+ }),
+ );
+
+ it.effect(
+ "formats consumer metric timers and counters",
+ Effect.fnUntraced(function* () {
+ const output = yield* withFreshMetrics(
+ Effect.gen(function* () {
+ const metrics = makeConsumerMetrics("processor-a", "flow-a", "consumer-a");
+ yield* metrics.recordTime(1.25);
+ yield* metrics.process("success");
+ yield* metrics.process("error");
+ yield* metrics.rateLimit;
+ return yield* formatPrometheusMetrics;
+ }),
+ );
+
+ expect(output).toContain("# TYPE tg_consumer_request_duration_seconds histogram");
+ expect(output).toMatch(/tg_consumer_request_duration_seconds_count\{[^}]*\} 1/);
+ expect(output).toMatch(/tg_consumer_request_duration_seconds_sum\{[^}]*\} 1.25/);
+ expect(output).toMatch(/tg_consumer_processing_total\{[^}]*status="success"[^}]*\} 1/);
+ expect(output).toMatch(/tg_consumer_processing_total\{[^}]*status="error"[^}]*\} 1/);
+ expect(output).toMatch(/tg_consumer_rate_limit_total\{[^}]*\} 1/);
+ }),
+ );
+});
diff --git a/ts/packages/base/src/messaging/runtime.ts b/ts/packages/base/src/messaging/runtime.ts
index b1ad12f7..82dcd75f 100644
--- a/ts/packages/base/src/messaging/runtime.ts
+++ b/ts/packages/base/src/messaging/runtime.ts
@@ -41,7 +41,7 @@ import {
type MessagingTimeoutError,
type PubSubError,
} from "../errors.js";
-import type { ProducerMetrics } from "../metrics/prometheus.js";
+import type { ProducerMetrics } from "../metrics/index.js";
import type { FlowContext } from "./consumer.js";
import type { Flow } from "../processor/flow.js";
import type { SpecRuntimeRequirements } from "../spec/types.js";
@@ -169,9 +169,7 @@ export function makeEffectProducerHandle(
Effect.tap(() =>
options.metrics === undefined
? Effect.void
- : Effect.sync(() => {
- options.metrics?.inc();
- }),
+ : options.metrics.inc,
),
),
),
diff --git a/ts/packages/base/src/metrics/index.ts b/ts/packages/base/src/metrics/index.ts
index 27011ef5..f63788ee 100644
--- a/ts/packages/base/src/metrics/index.ts
+++ b/ts/packages/base/src/metrics/index.ts
@@ -1,7 +1,8 @@
export {
+ formatPrometheusMetrics,
makeConsumerMetrics,
makeProducerMetrics,
- registry,
+ prometheusContentType,
type ConsumerMetrics,
type ProducerMetrics,
} from "./prometheus.js";
diff --git a/ts/packages/base/src/metrics/prometheus.ts b/ts/packages/base/src/metrics/prometheus.ts
index 7e00133f..9dcfd374 100644
--- a/ts/packages/base/src/metrics/prometheus.ts
+++ b/ts/packages/base/src/metrics/prometheus.ts
@@ -1,18 +1,38 @@
/**
- * Prometheus metrics wrappers.
+ * Effect-native metrics and Prometheus formatting helpers.
*
* Python reference: trustgraph-base/trustgraph/base/metrics.py
*/
-import { Counter, Histogram, Registry, collectDefaultMetrics } from "prom-client";
+import { Effect, Metric } from "effect";
+import { PrometheusMetrics } from "effect/unstable/observability";
-export const registry = new Registry();
-collectDefaultMetrics({ register: registry });
+export const prometheusContentType = "text/plain; version=0.0.4; charset=utf-8";
+
+const consumerRequestDuration = Metric.histogram("tg_consumer_request_duration_seconds", {
+ description: "Consumer request processing time",
+ boundaries: Metric.exponentialBoundaries({ start: 0.005, factor: 2, count: 12 }),
+});
+
+const consumerProcessing = Metric.counter("tg_consumer_processing_total", {
+ description: "Consumer processing outcomes",
+ incremental: true,
+});
+
+const consumerRateLimit = Metric.counter("tg_consumer_rate_limit_total", {
+ description: "Consumer rate limit events",
+ incremental: true,
+});
+
+const producerItems = Metric.counter("tg_producer_items_total", {
+ description: "Producer items sent",
+ incremental: true,
+});
export interface ConsumerMetrics {
- readonly recordTime: (seconds: number) => void;
- readonly process: (status: "success" | "error") => void;
- readonly rateLimit: () => void;
+ readonly recordTime: (seconds: number) => Effect.Effect;
+ readonly process: (status: "success" | "error") => Effect.Effect;
+ readonly rateLimit: Effect.Effect;
}
export function makeConsumerMetrics(
@@ -21,36 +41,22 @@ export function makeConsumerMetrics(
name: string,
): ConsumerMetrics {
const labels = { processor, flow, name };
- const requestHistogram = new Histogram({
- name: "tg_consumer_request_duration_seconds",
- help: "Consumer request processing time",
- labelNames: ["processor", "flow", "name"],
- registers: [registry],
- });
-
- const processingCounter = new Counter({
- name: "tg_consumer_processing_total",
- help: "Consumer processing outcomes",
- labelNames: ["processor", "flow", "name", "status"],
- registers: [registry],
- });
-
- const rateLimitCounter = new Counter({
- name: "tg_consumer_rate_limit_total",
- help: "Consumer rate limit events",
- labelNames: ["processor", "flow", "name"],
- registers: [registry],
- });
+ const requestHistogram = Metric.withAttributes(consumerRequestDuration, labels);
+ const rateLimitCounter = Metric.withAttributes(consumerRateLimit, labels);
return {
- recordTime: (seconds) => requestHistogram.observe(labels, seconds),
- process: (status) => processingCounter.inc({ ...labels, status }),
- rateLimit: () => rateLimitCounter.inc(labels),
+ recordTime: (seconds) => Metric.update(requestHistogram, seconds),
+ process: (status) =>
+ Metric.update(
+ Metric.withAttributes(consumerProcessing, { ...labels, status }),
+ 1,
+ ),
+ rateLimit: Metric.update(rateLimitCounter, 1),
};
}
export interface ProducerMetrics {
- readonly inc: () => void;
+ readonly inc: Effect.Effect;
}
export function makeProducerMetrics(
@@ -59,14 +65,11 @@ export function makeProducerMetrics(
name: string,
): ProducerMetrics {
const labels = { processor, flow, name };
- const counter = new Counter({
- name: "tg_producer_items_total",
- help: "Producer items sent",
- labelNames: ["processor", "flow", "name"],
- registers: [registry],
- });
+ const counter = Metric.withAttributes(producerItems, labels);
return {
- inc: () => counter.inc(labels),
+ inc: Metric.update(counter, 1),
};
}
+
+export const formatPrometheusMetrics = PrometheusMetrics.format();
diff --git a/ts/packages/flow/src/gateway/server.ts b/ts/packages/flow/src/gateway/server.ts
index 4e405250..4e775664 100644
--- a/ts/packages/flow/src/gateway/server.ts
+++ b/ts/packages/flow/src/gateway/server.ts
@@ -14,7 +14,14 @@ import { Cause, Clock, Config, Effect, Exit, Layer, ManagedRuntime, Random, Scop
import * as O from "effect/Option";
import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization";
import * as EffectSocket from "effect/unstable/socket/Socket";
-import { messagingLifecycleError, optionalStringConfig, registry, toTgError, type PubSubBackend } from "@trustgraph/base";
+import {
+ formatPrometheusMetrics,
+ messagingLifecycleError,
+ optionalStringConfig,
+ prometheusContentType,
+ toTgError,
+ type PubSubBackend,
+} from "@trustgraph/base";
import { makeDispatcherManager } from "./dispatch/manager.js";
import { makeGatewayRpcServer } from "./rpc-server.js";
@@ -227,10 +234,10 @@ export function createGateway(config: GatewayConfig) {
);
});
- // Metrics endpoint — returns Prometheus metrics from prom-client
+ // Metrics endpoint — returns Effect metrics in Prometheus exposition format.
app.get("/api/v1/metrics", (_, reply) => {
- reply.header("content-type", registry.contentType);
- return registry.metrics();
+ reply.header("content-type", prometheusContentType);
+ return Effect.runPromise(formatPrometheusMetrics);
});
return {