Migrate metrics to Effect primitives

This commit is contained in:
elpresidank 2026-06-02 08:52:29 -05:00
parent 39db6d8235
commit 0fb10aca73
8 changed files with 197 additions and 56 deletions

View file

@ -0,0 +1,56 @@
import { describe, expect, it } from "@effect/vitest";
import { Effect, Metric } from "effect";
import {
formatPrometheusMetrics,
makeConsumerMetrics,
makeProducerMetrics,
} from "../metrics/index.js";
const withFreshMetrics = <A, E, R>(effect: Effect.Effect<A, E, R>) =>
effect.pipe(Effect.provideService(Metric.MetricRegistry, new Map()));
describe("Effect metrics", () => {
it.effect(
"formats producer metrics through Effect Prometheus exporter",
Effect.fnUntraced(function* () {
const output = yield* withFreshMetrics(
Effect.gen(function* () {
const metrics = makeProducerMetrics("processor-a", "flow-a", "producer-a");
yield* metrics.inc;
yield* metrics.inc;
return yield* formatPrometheusMetrics;
}),
);
expect(output).toContain("# HELP tg_producer_items_total Producer items sent");
expect(output).toContain("# TYPE tg_producer_items_total counter");
expect(output).toContain('processor="processor-a"');
expect(output).toContain('flow="flow-a"');
expect(output).toContain('name="producer-a"');
expect(output).toMatch(/tg_producer_items_total\{[^}]*\} 2/);
}),
);
it.effect(
"formats consumer metric timers and counters",
Effect.fnUntraced(function* () {
const output = yield* withFreshMetrics(
Effect.gen(function* () {
const metrics = makeConsumerMetrics("processor-a", "flow-a", "consumer-a");
yield* metrics.recordTime(1.25);
yield* metrics.process("success");
yield* metrics.process("error");
yield* metrics.rateLimit;
return yield* formatPrometheusMetrics;
}),
);
expect(output).toContain("# TYPE tg_consumer_request_duration_seconds histogram");
expect(output).toMatch(/tg_consumer_request_duration_seconds_count\{[^}]*\} 1/);
expect(output).toMatch(/tg_consumer_request_duration_seconds_sum\{[^}]*\} 1.25/);
expect(output).toMatch(/tg_consumer_processing_total\{[^}]*status="success"[^}]*\} 1/);
expect(output).toMatch(/tg_consumer_processing_total\{[^}]*status="error"[^}]*\} 1/);
expect(output).toMatch(/tg_consumer_rate_limit_total\{[^}]*\} 1/);
}),
);
});

View file

@ -41,7 +41,7 @@ import {
type MessagingTimeoutError,
type PubSubError,
} from "../errors.js";
import type { ProducerMetrics } from "../metrics/prometheus.js";
import type { ProducerMetrics } from "../metrics/index.js";
import type { FlowContext } from "./consumer.js";
import type { Flow } from "../processor/flow.js";
import type { SpecRuntimeRequirements } from "../spec/types.js";
@ -169,9 +169,7 @@ export function makeEffectProducerHandle<T>(
Effect.tap(() =>
options.metrics === undefined
? Effect.void
: Effect.sync(() => {
options.metrics?.inc();
}),
: options.metrics.inc,
),
),
),

View file

@ -1,7 +1,8 @@
export {
formatPrometheusMetrics,
makeConsumerMetrics,
makeProducerMetrics,
registry,
prometheusContentType,
type ConsumerMetrics,
type ProducerMetrics,
} from "./prometheus.js";

View file

@ -1,18 +1,38 @@
/**
* Prometheus metrics wrappers.
* Effect-native metrics and Prometheus formatting helpers.
*
* Python reference: trustgraph-base/trustgraph/base/metrics.py
*/
import { Counter, Histogram, Registry, collectDefaultMetrics } from "prom-client";
import { Effect, Metric } from "effect";
import { PrometheusMetrics } from "effect/unstable/observability";
export const registry = new Registry();
collectDefaultMetrics({ register: registry });
export const prometheusContentType = "text/plain; version=0.0.4; charset=utf-8";
const consumerRequestDuration = Metric.histogram("tg_consumer_request_duration_seconds", {
description: "Consumer request processing time",
boundaries: Metric.exponentialBoundaries({ start: 0.005, factor: 2, count: 12 }),
});
const consumerProcessing = Metric.counter("tg_consumer_processing_total", {
description: "Consumer processing outcomes",
incremental: true,
});
const consumerRateLimit = Metric.counter("tg_consumer_rate_limit_total", {
description: "Consumer rate limit events",
incremental: true,
});
const producerItems = Metric.counter("tg_producer_items_total", {
description: "Producer items sent",
incremental: true,
});
export interface ConsumerMetrics {
readonly recordTime: (seconds: number) => void;
readonly process: (status: "success" | "error") => void;
readonly rateLimit: () => void;
readonly recordTime: (seconds: number) => Effect.Effect<void>;
readonly process: (status: "success" | "error") => Effect.Effect<void>;
readonly rateLimit: Effect.Effect<void>;
}
export function makeConsumerMetrics(
@ -21,36 +41,22 @@ export function makeConsumerMetrics(
name: string,
): ConsumerMetrics {
const labels = { processor, flow, name };
const requestHistogram = new Histogram({
name: "tg_consumer_request_duration_seconds",
help: "Consumer request processing time",
labelNames: ["processor", "flow", "name"],
registers: [registry],
});
const processingCounter = new Counter({
name: "tg_consumer_processing_total",
help: "Consumer processing outcomes",
labelNames: ["processor", "flow", "name", "status"],
registers: [registry],
});
const rateLimitCounter = new Counter({
name: "tg_consumer_rate_limit_total",
help: "Consumer rate limit events",
labelNames: ["processor", "flow", "name"],
registers: [registry],
});
const requestHistogram = Metric.withAttributes(consumerRequestDuration, labels);
const rateLimitCounter = Metric.withAttributes(consumerRateLimit, labels);
return {
recordTime: (seconds) => requestHistogram.observe(labels, seconds),
process: (status) => processingCounter.inc({ ...labels, status }),
rateLimit: () => rateLimitCounter.inc(labels),
recordTime: (seconds) => Metric.update(requestHistogram, seconds),
process: (status) =>
Metric.update(
Metric.withAttributes(consumerProcessing, { ...labels, status }),
1,
),
rateLimit: Metric.update(rateLimitCounter, 1),
};
}
export interface ProducerMetrics {
readonly inc: () => void;
readonly inc: Effect.Effect<void>;
}
export function makeProducerMetrics(
@ -59,14 +65,11 @@ export function makeProducerMetrics(
name: string,
): ProducerMetrics {
const labels = { processor, flow, name };
const counter = new Counter({
name: "tg_producer_items_total",
help: "Producer items sent",
labelNames: ["processor", "flow", "name"],
registers: [registry],
});
const counter = Metric.withAttributes(producerItems, labels);
return {
inc: () => counter.inc(labels),
inc: Metric.update(counter, 1),
};
}
export const formatPrometheusMetrics = PrometheusMetrics.format();