mirror of
https://github.com/elicpeter/nyx.git
synced 2026-06-09 19:45:13 +02:00
[pitboss] phase 27: Track H.1 + H.2 — Telemetry schema versioning + sampling
This commit is contained in:
parent
ea722dc9ca
commit
3ed3a9e518
8 changed files with 799 additions and 41 deletions
|
|
@ -91,6 +91,79 @@ If scan time is unacceptable for a given workflow (e.g. IDE integration, quick
|
|||
pre-commit check), use `--no-verify` for that workflow and rely on the full scan
|
||||
in CI.
|
||||
|
||||
## Event schema
|
||||
|
||||
The dynamic layer writes one JSON record per verdict to
|
||||
`~/.cache/nyx/dynamic/events.jsonl`. Every record begins with a fixed envelope
|
||||
so older readers fail loudly instead of silently mixing incompatible shapes:
|
||||
|
||||
```json
|
||||
{
|
||||
"schema_version": 1,
|
||||
"nyx_version": "0.7.0",
|
||||
"corpus_version": "4",
|
||||
"kind": "verdict",
|
||||
"ts": "2026-05-15T18:42:09Z",
|
||||
"finding_id": "a3b1...",
|
||||
"spec_hash": "9f4e...",
|
||||
"lang": "python",
|
||||
"cap": "SQL_QUERY",
|
||||
"status": "Confirmed",
|
||||
"toolchain_id": "python-3.11",
|
||||
"toolchain_match": "exact",
|
||||
"duration_ms": 312,
|
||||
"build_attempts": 1
|
||||
}
|
||||
```
|
||||
|
||||
| Field | Type | Meaning |
|
||||
| --- | --- | --- |
|
||||
| `schema_version` | integer | Bumped on any breaking change. Readers reject mismatches. |
|
||||
| `nyx_version` | string | `CARGO_PKG_VERSION` of the writing binary. |
|
||||
| `corpus_version` | string | Payload-corpus version the verdict was scored against. |
|
||||
| `kind` | string | `"verdict"` (per-finding) or `"rank_delta"` (rank-score shift). |
|
||||
| `ts` | RFC-3339 string | Wall-clock at write time. |
|
||||
| `finding_id` | string | Stable finding identifier. |
|
||||
| `spec_hash` | string | Hash of the `HarnessSpec` that drove the run. |
|
||||
| `lang` | string | Language slug; `"unknown"` when spec derivation failed. |
|
||||
| `cap` | string | Sink capability (e.g. `SQL_QUERY`, `CODE_EXEC`). |
|
||||
| `status` | string | `Confirmed`, `NotConfirmed`, `Inconclusive`, or `Unsupported`. |
|
||||
| `inconclusive_reason` | string | Present iff `status == Inconclusive`. |
|
||||
|
||||
A `rank_delta` record carries the envelope plus `finding_id`, `status`, and a
|
||||
signed `delta` applied to the rank score.
|
||||
|
||||
### Schema-version mismatch
|
||||
|
||||
`scripts/m7_ship_gate.sh` Gate 2 walks every line of the log, requires
|
||||
`schema_version == EXPECTED_SCHEMA_VERSION`, and exits 3 if any record fails
|
||||
the check. Programmatic readers use
|
||||
`crate::dynamic::telemetry::read_events(path)`, which surfaces the same
|
||||
condition as `TelemetryReadError::SchemaMismatch { expected, found, .. }`.
|
||||
|
||||
When schema bumps land, the canonical migration is to roll the log over (move
|
||||
or delete `events.jsonl`) so new and old records never coexist in a file. The
|
||||
gate refuses to skip silently on mismatch.
|
||||
|
||||
### Sampling
|
||||
|
||||
`[telemetry]` in `nyx.toml` controls the on-disk sampling policy:
|
||||
|
||||
```toml
|
||||
[telemetry]
|
||||
keep_all_confirmed = true # default: retain every Confirmed verdict
|
||||
keep_all_inconclusive = true # default: retain every Inconclusive verdict
|
||||
sample_rate_other = 1.0 # 0.0–1.0 for NotConfirmed / Unsupported
|
||||
```
|
||||
|
||||
`sample_rate_other < 1.0` downsamples NotConfirmed and Unsupported verdicts
|
||||
deterministically — the decision is seeded by the finding's `spec_hash`, so a
|
||||
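given finding makes the same keep-or-drop call across reruns. With the default
|
||||
`keep_all_confirmed` / `keep_all_inconclusive` settings, Confirmed and
|
||||
Inconclusive verdicts ignore the rate and are always retained (they gate the
|
||||
false-Confirmed budget and drive the spec-derivation roadmap); setting either
|
||||
flag to `false` makes that status subject to `sample_rate_other` as well.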
|
||||
|
||||
`NYX_NO_TELEMETRY=1` disables every write regardless of the policy.
|
||||
|
||||
## Opting in to feedback
|
||||
|
||||
False positives (nyx says `Confirmed` but you disagree) can be recorded:
|
||||
|
|
|
|||
|
|
@ -74,6 +74,14 @@ else
|
|||
fi
|
||||
|
||||
# ── Gate 2: False-Confirmed rate ─────────────────────────────────────────────
|
||||
#
|
||||
# Phase 27 (Track H.1): the telemetry log is schema-versioned. Gate 2 checks
|
||||
# every record's `schema_version` field against `EXPECTED_SCHEMA_VERSION` and
|
||||
# fails loudly with exit 3 when a mismatch is found — silently treating a
|
||||
# v0 (pre-Phase-27) log as "no data" would mask incompatible releases mixing
|
||||
# their records.
|
||||
EXPECTED_SCHEMA_VERSION=1
|
||||
|
||||
if skip false-confirmed; then
|
||||
info "Gate 2 (false-confirmed): SKIPPED"
|
||||
else
|
||||
|
|
@ -82,20 +90,35 @@ else
|
|||
if [[ ! -f "$EVENTS" ]]; then
|
||||
info "Gate 2: telemetry log not found at $EVENTS; skipping (no data)"
|
||||
else
|
||||
python3 - <<'PYEOF' "$EVENTS"
|
||||
set +e
|
||||
python3 - "$EVENTS" "$EXPECTED_SCHEMA_VERSION" <<'PYEOF'
|
||||
import json, sys, collections
|
||||
path = sys.argv[1]
|
||||
expected_schema = int(sys.argv[2])
|
||||
cap_counts = collections.defaultdict(lambda: {"confirmed": 0, "wrong": 0})
|
||||
with open(path) as f:
|
||||
for line in f:
|
||||
try:
|
||||
ev = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
for line_no, raw in enumerate(f, start=1):
|
||||
if not raw.strip():
|
||||
continue
|
||||
if ev.get("kind") == "feedback" and ev.get("wrong"):
|
||||
try:
|
||||
ev = json.loads(raw)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"FAIL malformed JSON at {path} line {line_no}: {e}")
|
||||
sys.exit(3)
|
||||
if "schema_version" not in ev:
|
||||
print(f"FAIL missing schema_version at {path} line {line_no}")
|
||||
sys.exit(3)
|
||||
if ev["schema_version"] != expected_schema:
|
||||
print(
|
||||
f"FAIL schema mismatch at {path} line {line_no}: "
|
||||
f"expected {expected_schema}, found {ev['schema_version']}"
|
||||
)
|
||||
sys.exit(3)
|
||||
kind = ev.get("kind", "")
|
||||
if kind == "feedback" and ev.get("wrong"):
|
||||
cap = ev.get("cap", "unknown")
|
||||
cap_counts[cap]["wrong"] += 1
|
||||
elif ev.get("kind") == "verdict" and ev.get("status") == "Confirmed":
|
||||
elif kind == "verdict" and ev.get("status") == "Confirmed":
|
||||
cap = ev.get("cap", "unknown")
|
||||
cap_counts[cap]["confirmed"] += 1
|
||||
|
||||
|
|
@ -115,8 +138,11 @@ for cap, counts in sorted(cap_counts.items()):
|
|||
sys.exit(2 if failed else 0)
|
||||
PYEOF
|
||||
RC=$?
|
||||
set -e
|
||||
if [[ $RC -eq 0 ]]; then
|
||||
pass "Gate 2: false-Confirmed rate within threshold"
|
||||
elif [[ $RC -eq 3 ]]; then
|
||||
die "Gate 2: telemetry schema mismatch (expected v$EXPECTED_SCHEMA_VERSION) — refusing to silently skip"
|
||||
else
|
||||
die "Gate 2: false-Confirmed rate exceeds 2% for one or more caps"
|
||||
fi
|
||||
|
|
|
|||
|
|
@ -3,9 +3,27 @@
|
|||
//! Writes one JSON line per verdict to `~/.cache/nyx/dynamic/events.jsonl`.
|
||||
//! `NYX_NO_TELEMETRY=1` silently disables all writes (§21.4).
|
||||
//!
|
||||
//! Schema (§21.1 minimal fields):
|
||||
//! # Schema (Phase 27)
|
||||
//!
|
||||
//! Every record starts with three envelope fields so the on-disk format can
|
||||
//! evolve across releases without silently mixing incompatible records:
|
||||
//!
|
||||
//! - `schema_version`: integer, bumped on any breaking shape change.
|
||||
//! - `nyx_version`: the Cargo package version that wrote the record.
|
||||
//! - `corpus_version`: the payload-corpus version active at write time.
|
||||
//!
|
||||
//! Followed by a `kind` discriminator (`"verdict"` or `"rank_delta"`). All
|
||||
//! readers (`read_events`, the M7 ship gate) require `schema_version ==
|
||||
//! [`SCHEMA_VERSION`]; mismatched records produce
|
||||
//! [`TelemetryReadError::SchemaMismatch`] instead of being silently parsed
|
||||
//! as if they matched.
|
||||
//!
|
||||
//! ```json
|
||||
//! {
|
||||
//! "schema_version": 1,
|
||||
//! "nyx_version": "0.7.0",
|
||||
//! "corpus_version": "4",
|
||||
//! "kind": "verdict",
|
||||
//! "ts": "<RFC-3339>",
|
||||
//! "finding_id": "...",
|
||||
//! "spec_hash": "...",
|
||||
|
|
@ -24,18 +42,37 @@ use crate::dynamic::spec::HarnessSpec;
|
|||
use crate::evidence::{InconclusiveReason, VerifyStatus};
|
||||
use directories::ProjectDirs;
|
||||
use std::fs::{self, OpenOptions};
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::io::{BufRead, BufReader, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Duration;
|
||||
|
||||
/// On-disk telemetry schema version. Bump on any breaking shape change to
|
||||
/// the JSON record. Readers reject any record whose `schema_version` does
|
||||
/// not match this constant.
|
||||
pub const SCHEMA_VERSION: u32 = 1;
|
||||
|
||||
/// Cargo package version of the Nyx build that wrote the record.
|
||||
pub const NYX_VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
|
||||
/// Corpus-version label written into every record. Kept as a `&'static str`
|
||||
/// so it can sit on a `Serialize`-derived struct alongside the other envelope
|
||||
/// fields without an allocation. Mirrors
|
||||
/// [`crate::dynamic::corpus::CORPUS_VERSION`]; the
|
||||
/// `tests::corpus_version_const_matches_corpus_module` test guards drift.
|
||||
pub const CORPUS_VERSION: &str = "4";
|
||||
|
||||
/// One telemetry event per verdict.
|
||||
///
|
||||
/// `lang` is `"unknown"` for findings whose language could not be resolved
|
||||
/// (e.g. spec derivation failed before `HarnessSpec::lang` was set). Counting
|
||||
/// these is the `lang_unknown_count` Phase 02 acceptance asks for:
|
||||
/// `grep '"lang":"unknown"' events.jsonl | wc -l`.
|
||||
#[derive(Debug, serde::Serialize)]
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub struct TelemetryEvent {
|
||||
pub schema_version: u32,
|
||||
pub nyx_version: &'static str,
|
||||
pub corpus_version: &'static str,
|
||||
pub kind: &'static str,
|
||||
pub ts: String,
|
||||
pub finding_id: String,
|
||||
pub spec_hash: String,
|
||||
|
|
@ -46,13 +83,13 @@ pub struct TelemetryEvent {
|
|||
pub toolchain_match: String,
|
||||
pub duration_ms: u64,
|
||||
pub build_attempts: u32,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub inconclusive_reason: Option<String>,
|
||||
/// Path of the finding's source file, populated for spec-derivation
|
||||
/// failures so downstream consumers can map `lang="unknown"` events back
|
||||
/// to a file. Skipped on successful verdicts (the spec already carries
|
||||
/// `entry_file`).
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub path: Option<String>,
|
||||
}
|
||||
|
||||
|
|
@ -66,6 +103,10 @@ impl TelemetryEvent {
|
|||
build_attempts: u32,
|
||||
) -> Self {
|
||||
Self {
|
||||
schema_version: SCHEMA_VERSION,
|
||||
nyx_version: NYX_VERSION,
|
||||
corpus_version: CORPUS_VERSION,
|
||||
kind: "verdict",
|
||||
ts: chrono::Utc::now().to_rfc3339(),
|
||||
finding_id: spec.finding_id.clone(),
|
||||
spec_hash: spec.spec_hash.clone(),
|
||||
|
|
@ -108,6 +149,10 @@ impl TelemetryEvent {
|
|||
.map(|e| format!("{:?}", e.sink_caps))
|
||||
.unwrap_or_else(|| "0".to_owned());
|
||||
Self {
|
||||
schema_version: SCHEMA_VERSION,
|
||||
nyx_version: NYX_VERSION,
|
||||
corpus_version: CORPUS_VERSION,
|
||||
kind: "verdict",
|
||||
ts: chrono::Utc::now().to_rfc3339(),
|
||||
finding_id: format!("{:016x}", diag.stable_hash),
|
||||
spec_hash: String::new(),
|
||||
|
|
@ -143,6 +188,10 @@ impl TelemetryEvent {
|
|||
.map(|l| l.as_str().to_owned())
|
||||
.unwrap_or_else(|| "unknown".to_owned());
|
||||
Self {
|
||||
schema_version: SCHEMA_VERSION,
|
||||
nyx_version: NYX_VERSION,
|
||||
corpus_version: CORPUS_VERSION,
|
||||
kind: "verdict",
|
||||
ts: chrono::Utc::now().to_rfc3339(),
|
||||
finding_id: String::new(),
|
||||
spec_hash: String::new(),
|
||||
|
|
@ -159,17 +208,112 @@ impl TelemetryEvent {
|
|||
}
|
||||
}
|
||||
|
||||
/// Policy deciding which telemetry events reach the on-disk log
/// (Phase 27, Track H.2).
///
/// Confirmed and Inconclusive verdicts are calibration-critical: the
/// false-Confirmed rate gates the M7 ship decision, and Inconclusive reasons
/// drive the spec-derivation roadmap. While the corresponding `keep_all_*`
/// flag is set (the default) those verdicts are always written. Every other
/// verdict, and a Confirmed / Inconclusive verdict whose flag has been cleared,
/// is kept with probability `sample_rate_other` so that log growth stays
/// bounded on high-volume scans.
///
/// The keep-or-drop decision is derived from the event's `spec_hash`, so the
/// same finding gets the same answer on every rerun and two scans of one
/// project produce comparable logs.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SamplingPolicy {
    /// When `true`, Confirmed verdicts bypass sampling. Default `true`.
    pub keep_all_confirmed: bool,
    /// When `true`, Inconclusive verdicts bypass sampling. Default `true`.
    pub keep_all_inconclusive: bool,
    /// Probability of keeping a verdict that is not pinned by the flags above
    /// (NotConfirmed, Unsupported, ...). `0.0` drops all of them, `1.0` keeps
    /// all of them. Default `1.0`.
    pub sample_rate_other: f32,
}
|
||||
|
||||
impl Default for SamplingPolicy {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
keep_all_confirmed: true,
|
||||
keep_all_inconclusive: true,
|
||||
sample_rate_other: 1.0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SamplingPolicy {
|
||||
/// Keep every record regardless of status. Equivalent to the pre-Phase-27
|
||||
/// behaviour and the right default for unit tests.
|
||||
pub fn keep_all() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Build the runtime policy from `[telemetry]` in `nyx.toml`.
|
||||
pub fn from_config(cfg: &crate::utils::config::TelemetryConfig) -> Self {
|
||||
Self {
|
||||
keep_all_confirmed: cfg.keep_all_confirmed,
|
||||
keep_all_inconclusive: cfg.keep_all_inconclusive,
|
||||
sample_rate_other: cfg.sample_rate_other,
|
||||
}
|
||||
}
|
||||
|
||||
/// Decide whether an event with the given status / spec_hash should be
|
||||
/// written. Deterministic for a fixed `(self, status, spec_hash)`.
|
||||
pub fn should_sample(&self, status: VerifyStatus, spec_hash: &str) -> bool {
|
||||
if matches!(status, VerifyStatus::Confirmed) && self.keep_all_confirmed {
|
||||
return true;
|
||||
}
|
||||
if matches!(status, VerifyStatus::Inconclusive) && self.keep_all_inconclusive {
|
||||
return true;
|
||||
}
|
||||
// Clamp the configured rate into [0, 1] and short-circuit the extremes
|
||||
// so we never hash a record we already know the answer for.
|
||||
let rate = self.sample_rate_other.clamp(0.0, 1.0);
|
||||
if rate >= 1.0 {
|
||||
return true;
|
||||
}
|
||||
if rate <= 0.0 {
|
||||
return false;
|
||||
}
|
||||
// Hash the spec_hash with a fixed key so the bucket is stable across
|
||||
// releases. blake3 is already in the dep tree; the first 8 bytes
|
||||
// give a uniform u64.
|
||||
let h = blake3::hash(spec_hash.as_bytes());
|
||||
let bytes: [u8; 8] = h.as_bytes()[..8].try_into().unwrap();
|
||||
let bucket = (u64::from_le_bytes(bytes) % 1_000_000) as f32 / 1_000_000.0;
|
||||
bucket < rate
|
||||
}
|
||||
}
|
||||
|
||||
/// Write a telemetry event to the events log.
|
||||
///
|
||||
/// Silently no-ops when:
|
||||
/// - `NYX_NO_TELEMETRY=1`
|
||||
/// - The log directory cannot be created
|
||||
/// - The write fails (telemetry must never affect verdict)
|
||||
///
|
||||
/// Applies the default-`keep_all` sampling policy — every event is written.
|
||||
/// Call sites that want sampling go through [`emit_with_policy`] instead.
|
||||
pub fn emit(event: &TelemetryEvent) {
|
||||
emit_with_policy(event, &SamplingPolicy::keep_all());
|
||||
}
|
||||
|
||||
/// Like [`emit`] but consults `policy` before writing.
|
||||
///
|
||||
/// Drops the record when `policy.should_sample(...)` returns `false`. The
|
||||
/// decision is keyed on `event.spec_hash`, so the same finding produces the
|
||||
/// same keep-or-drop call across reruns.
|
||||
pub fn emit_with_policy(event: &TelemetryEvent, policy: &SamplingPolicy) {
|
||||
if std::env::var("NYX_NO_TELEMETRY").as_deref() == Ok("1") {
|
||||
return;
|
||||
}
|
||||
|
||||
// Map the &str status back into the VerifyStatus enum for the policy
|
||||
// check. Falls through to "keep" on any unrecognised string so we never
|
||||
// accidentally drop a record because of a future status variant.
|
||||
let status = parse_status(&event.status).unwrap_or(VerifyStatus::Confirmed);
|
||||
if !policy.should_sample(status, &event.spec_hash) {
|
||||
return;
|
||||
}
|
||||
|
||||
let Some(path) = events_log_path() else {
|
||||
return;
|
||||
};
|
||||
|
|
@ -195,6 +339,16 @@ pub fn emit(event: &TelemetryEvent) {
|
|||
})();
|
||||
}
|
||||
|
||||
fn parse_status(s: &str) -> Option<VerifyStatus> {
|
||||
match s {
|
||||
"Confirmed" => Some(VerifyStatus::Confirmed),
|
||||
"NotConfirmed" => Some(VerifyStatus::NotConfirmed),
|
||||
"Inconclusive" => Some(VerifyStatus::Inconclusive),
|
||||
"Unsupported" => Some(VerifyStatus::Unsupported),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn events_log_path() -> Option<std::path::PathBuf> {
|
||||
// Respect explicit override for testing.
|
||||
if let Ok(p) = std::env::var("NYX_TELEMETRY_PATH") {
|
||||
|
|
@ -209,6 +363,94 @@ pub fn log_path() -> Option<std::path::PathBuf> {
|
|||
events_log_path()
|
||||
}
|
||||
|
||||
// ── Reading events back (Phase 27) ───────────────────────────────────────────
|
||||
|
||||
/// Structured error returned by [`read_events`].
|
||||
///
|
||||
/// Surfaced to the M7 ship gate so Gate 2 can fail loudly on schema-mismatch
|
||||
/// rather than silently treating mismatched records as "no data".
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum TelemetryReadError {
|
||||
#[error("io error reading {path}: {source}")]
|
||||
Io {
|
||||
path: PathBuf,
|
||||
#[source]
|
||||
source: std::io::Error,
|
||||
},
|
||||
#[error(
|
||||
"schema mismatch in {path} line {line}: expected schema_version={expected}, found {found}"
|
||||
)]
|
||||
SchemaMismatch {
|
||||
path: PathBuf,
|
||||
line: usize,
|
||||
expected: u32,
|
||||
found: u32,
|
||||
},
|
||||
#[error("missing schema_version in {path} line {line}")]
|
||||
MissingSchemaVersion { path: PathBuf, line: usize },
|
||||
#[error("malformed JSON in {path} line {line}: {source}")]
|
||||
Json {
|
||||
path: PathBuf,
|
||||
line: usize,
|
||||
#[source]
|
||||
source: serde_json::Error,
|
||||
},
|
||||
}
|
||||
|
||||
/// Read every event record from the JSONL log at `path`.
|
||||
///
|
||||
/// Returns each line as a `serde_json::Value` so callers can dispatch on the
|
||||
/// `kind` discriminator themselves. Rejects any record whose `schema_version`
|
||||
/// does not match [`SCHEMA_VERSION`] (this is the explicit failure mode the
|
||||
/// M7 ship gate Gate 2 consumes — a v0 record from an older release must not
|
||||
/// silently parse as if the schema had never changed).
|
||||
///
|
||||
/// Blank lines are skipped. Any malformed JSON or missing `schema_version`
|
||||
/// fails the whole read; partial recovery is not the contract here because
|
||||
/// the ship gate already treats "log missing or unreadable" as "no data,
|
||||
/// skip Gate 2 with a notice."
|
||||
pub fn read_events(path: &Path) -> Result<Vec<serde_json::Value>, TelemetryReadError> {
|
||||
let file = std::fs::File::open(path).map_err(|e| TelemetryReadError::Io {
|
||||
path: path.to_path_buf(),
|
||||
source: e,
|
||||
})?;
|
||||
let reader = BufReader::new(file);
|
||||
let mut out = Vec::new();
|
||||
for (idx, line) in reader.lines().enumerate() {
|
||||
let line_no = idx + 1;
|
||||
let line = line.map_err(|e| TelemetryReadError::Io {
|
||||
path: path.to_path_buf(),
|
||||
source: e,
|
||||
})?;
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let value: serde_json::Value =
|
||||
serde_json::from_str(&line).map_err(|e| TelemetryReadError::Json {
|
||||
path: path.to_path_buf(),
|
||||
line: line_no,
|
||||
source: e,
|
||||
})?;
|
||||
let found = value
|
||||
.get("schema_version")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or_else(|| TelemetryReadError::MissingSchemaVersion {
|
||||
path: path.to_path_buf(),
|
||||
line: line_no,
|
||||
})?;
|
||||
if found != SCHEMA_VERSION as u64 {
|
||||
return Err(TelemetryReadError::SchemaMismatch {
|
||||
path: path.to_path_buf(),
|
||||
line: line_no,
|
||||
expected: SCHEMA_VERSION,
|
||||
found: found as u32,
|
||||
});
|
||||
}
|
||||
out.push(value);
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
// ── Rank delta telemetry ──────────────────────────────────────────────────────
|
||||
|
||||
/// One telemetry event per ranked finding that carries a dynamic verdict delta.
|
||||
|
|
@ -216,11 +458,14 @@ pub fn log_path() -> Option<std::path::PathBuf> {
|
|||
/// Emitted by `rank::rank_diags` for every diag whose dynamic verdict shifts
|
||||
/// its rank score (delta != 0). Used by the M7 calibration pipeline to tune
|
||||
/// the N/M boost/penalty constants from real-world verdict distributions.
|
||||
#[derive(Debug, serde::Serialize)]
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub struct RankDeltaEvent {
|
||||
pub ts: String,
|
||||
pub schema_version: u32,
|
||||
pub nyx_version: &'static str,
|
||||
pub corpus_version: &'static str,
|
||||
/// Always `"rank_delta"` — distinguishes from verdict events in the log.
|
||||
pub event_type: &'static str,
|
||||
pub kind: &'static str,
|
||||
pub ts: String,
|
||||
pub finding_id: String,
|
||||
/// `"Confirmed"`, `"NotConfirmed"`, etc.
|
||||
pub status: String,
|
||||
|
|
@ -228,6 +473,21 @@ pub struct RankDeltaEvent {
|
|||
pub delta: f64,
|
||||
}
|
||||
|
||||
impl RankDeltaEvent {
|
||||
pub fn new(finding_id: String, status: String, delta: f64) -> Self {
|
||||
Self {
|
||||
schema_version: SCHEMA_VERSION,
|
||||
nyx_version: NYX_VERSION,
|
||||
corpus_version: CORPUS_VERSION,
|
||||
kind: "rank_delta",
|
||||
ts: chrono::Utc::now().to_rfc3339(),
|
||||
finding_id,
|
||||
status,
|
||||
delta,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Write a rank-delta telemetry event to the events log.
|
||||
///
|
||||
/// Silently no-ops under the same conditions as [`emit`]:
|
||||
|
|
@ -306,6 +566,10 @@ mod tests {
|
|||
let content = std::fs::read_to_string(&log).unwrap();
|
||||
assert!(!content.is_empty());
|
||||
let v: serde_json::Value = serde_json::from_str(content.trim()).unwrap();
|
||||
assert_eq!(v["schema_version"], SCHEMA_VERSION);
|
||||
assert_eq!(v["nyx_version"], NYX_VERSION);
|
||||
assert_eq!(v["corpus_version"], CORPUS_VERSION);
|
||||
assert_eq!(v["kind"], "verdict");
|
||||
assert_eq!(v["status"], "Confirmed");
|
||||
assert_eq!(v["toolchain_match"], "exact");
|
||||
|
||||
|
|
@ -328,6 +592,8 @@ mod tests {
|
|||
assert_eq!(event.path.as_deref(), Some("/tmp/some_script_no_ext"));
|
||||
assert!(event.spec_hash.is_empty());
|
||||
assert_eq!(event.status, "Unsupported");
|
||||
assert_eq!(event.schema_version, SCHEMA_VERSION);
|
||||
assert_eq!(event.kind, "verdict");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -347,8 +613,7 @@ mod tests {
|
|||
tried: vec![SpecDerivationStrategy::FromFlowSteps],
|
||||
hint: "kotlin source".to_owned(),
|
||||
};
|
||||
let event =
|
||||
TelemetryEvent::no_spec(&diag, VerifyStatus::Inconclusive, Some(reason));
|
||||
let event = TelemetryEvent::no_spec(&diag, VerifyStatus::Inconclusive, Some(reason));
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
assert!(json.contains("\"lang\":\"java\""));
|
||||
assert!(json.contains("SpecDerivationFailed"));
|
||||
|
|
@ -381,4 +646,186 @@ mod tests {
|
|||
std::env::remove_var("NYX_TELEMETRY_PATH");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
fn corpus_version_const_matches_corpus_module() {
    // The telemetry label is a hand-maintained copy of the corpus module's
    // version; compare it with that value rendered as a string.
    let from_corpus = crate::dynamic::corpus::CORPUS_VERSION.to_string();
    assert_eq!(CORPUS_VERSION, from_corpus);
}
|
||||
|
||||
#[test]
fn read_events_rejects_schema_zero() {
    // A record stamped with schema 0 must surface as a structured mismatch.
    let tmp = TempDir::new().unwrap();
    let log_path = tmp.path().join("events.jsonl");
    let v0_record = "{\"schema_version\":0,\"kind\":\"verdict\",\"status\":\"Confirmed\"}\n";
    std::fs::write(&log_path, v0_record).unwrap();

    match read_events(&log_path).expect_err("schema 0 must be rejected") {
        TelemetryReadError::SchemaMismatch { expected, found, .. } => {
            assert_eq!((expected, found), (SCHEMA_VERSION, 0));
        }
        other => panic!("unexpected error: {other:?}"),
    }
}
|
||||
|
||||
#[test]
fn read_events_accepts_current_schema() {
    // One freshly serialised event followed by a blank line: the reader must
    // return exactly that event and ignore the blank line.
    let tmp = TempDir::new().unwrap();
    let log_path = tmp.path().join("events.jsonl");

    let spec = make_spec();
    let record = TelemetryEvent::new(
        &spec,
        VerifyStatus::Confirmed,
        None,
        "exact",
        Duration::from_millis(1),
        1,
    );
    let mut contents = serde_json::to_string(&record).unwrap();
    contents.push_str("\n\n");
    std::fs::write(&log_path, contents).unwrap();

    let parsed = read_events(&log_path).unwrap();
    assert_eq!(parsed.len(), 1);
    assert_eq!(parsed[0]["kind"], "verdict");
}
|
||||
|
||||
#[test]
fn read_events_rejects_missing_schema() {
    // A record with no `schema_version` field at all.
    let tmp = TempDir::new().unwrap();
    let log_path = tmp.path().join("events.jsonl");
    std::fs::write(&log_path, "{\"kind\":\"verdict\"}\n").unwrap();

    let err = read_events(&log_path).unwrap_err();
    if !matches!(err, TelemetryReadError::MissingSchemaVersion { .. }) {
        panic!("expected MissingSchemaVersion, got {err:?}");
    }
}
|
||||
|
||||
#[test]
fn read_events_rejects_malformed_json() {
    // A line that is not JSON must produce the `Json` variant.
    let tmp = TempDir::new().unwrap();
    let log_path = tmp.path().join("events.jsonl");
    std::fs::write(&log_path, "{not json\n").unwrap();

    let err = read_events(&log_path).unwrap_err();
    if !matches!(err, TelemetryReadError::Json { .. }) {
        panic!("expected Json, got {err:?}");
    }
}
|
||||
|
||||
#[test]
fn sampling_policy_keeps_confirmed_and_inconclusive() {
    // Rate 0.0 drops everything that is not pinned by a keep-all flag.
    let drop_others = SamplingPolicy {
        sample_rate_other: 0.0,
        keep_all_confirmed: true,
        keep_all_inconclusive: true,
    };

    let confirmed = drop_others.should_sample(VerifyStatus::Confirmed, "any");
    let inconclusive = drop_others.should_sample(VerifyStatus::Inconclusive, "any");
    let not_confirmed = drop_others.should_sample(VerifyStatus::NotConfirmed, "any");
    let unsupported = drop_others.should_sample(VerifyStatus::Unsupported, "any");

    assert!(confirmed);
    assert!(inconclusive);
    assert!(!not_confirmed);
    assert!(!unsupported);
}
|
||||
|
||||
#[test]
fn sampling_policy_is_deterministic_per_spec_hash() {
    // The same (policy, status, hash) triple must always give the same answer.
    let half = SamplingPolicy {
        sample_rate_other: 0.5,
        keep_all_confirmed: true,
        keep_all_inconclusive: true,
    };
    let decide = || half.should_sample(VerifyStatus::NotConfirmed, "deadbeef");

    let first = decide();
    for _ in 0..100 {
        assert_eq!(first, decide());
    }
}
|
||||
|
||||
#[test]
fn sampling_policy_rate_one_keeps_everything() {
    // With both keep-all flags off, rate 1.0 alone must retain every record,
    // including one with an empty hash.
    let keep_by_rate = SamplingPolicy {
        sample_rate_other: 1.0,
        keep_all_confirmed: false,
        keep_all_inconclusive: false,
    };
    let hashes = ["a", "b", "c", "deadbeef", ""];
    for hash in hashes.iter() {
        assert!(keep_by_rate.should_sample(VerifyStatus::NotConfirmed, hash));
    }
}
|
||||
|
||||
#[test]
fn sampling_policy_rate_zero_drops_everything_else() {
    // Rate 0.0 must drop every NotConfirmed / Unsupported record, whatever
    // the hash.
    let drop_others = SamplingPolicy {
        sample_rate_other: 0.0,
        keep_all_confirmed: true,
        keep_all_inconclusive: true,
    };
    let hashes = ["a", "b", "c", "deadbeef"];
    for hash in hashes.iter() {
        let not_confirmed = drop_others.should_sample(VerifyStatus::NotConfirmed, hash);
        assert!(!not_confirmed);
        let unsupported = drop_others.should_sample(VerifyStatus::Unsupported, hash);
        assert!(!unsupported);
    }
}
|
||||
|
||||
#[test]
fn sampling_policy_rate_half_buckets_roughly_evenly() {
    let half = SamplingPolicy {
        sample_rate_other: 0.5,
        keep_all_confirmed: true,
        keep_all_inconclusive: true,
    };

    let mut kept = 0usize;
    for i in 0..1000 {
        let key = format!("hash-{i:06x}");
        if half.should_sample(VerifyStatus::NotConfirmed, &key) {
            kept += 1;
        }
    }

    // Accept anything strictly between 350 and 650 out of 1000: narrow enough
    // to catch an "always keep" or "always drop" regression, wide enough not
    // to flake.
    assert!(
        (351..650).contains(&kept),
        "expected ~500/1000 kept at rate 0.5, got {kept}"
    );
}
|
||||
|
||||
#[test]
fn emit_with_policy_drops_when_unsampled() {
    // Point the telemetry log at a scratch file through the path override.
    let tmp = TempDir::new().unwrap();
    let log_path = tmp.path().join("events.jsonl");
    unsafe { std::env::set_var("NYX_TELEMETRY_PATH", log_path.to_str().unwrap()) };

    // A NotConfirmed event under a rate-0.0 policy must be dropped.
    let spec = {
        let mut s = make_spec();
        s.spec_hash = "drop-me".into();
        s
    };
    let event = TelemetryEvent::new(
        &spec,
        VerifyStatus::NotConfirmed,
        None,
        "exact",
        Duration::from_millis(1),
        1,
    );
    let drop_others = SamplingPolicy {
        sample_rate_other: 0.0,
        keep_all_confirmed: true,
        keep_all_inconclusive: true,
    };
    emit_with_policy(&event, &drop_others);

    assert!(
        !log_path.exists(),
        "event must not be written when policy drops"
    );

    unsafe { std::env::remove_var("NYX_TELEMETRY_PATH") };
}
|
||||
|
||||
#[test]
fn rank_delta_carries_envelope_fields() {
    // The constructor must stamp the shared envelope onto rank-delta records.
    let event = RankDeltaEvent::new("abc".into(), "Confirmed".into(), 2.5);
    assert_eq!(event.schema_version, SCHEMA_VERSION);
    assert_eq!(event.nyx_version, NYX_VERSION);
    assert_eq!(event.corpus_version, CORPUS_VERSION);
    assert_eq!(event.kind, "rank_delta");

    // The serialised record must lead with `schema_version`. The expected
    // prefix is built from the constant (with the trailing comma, so "1"
    // cannot match "10") rather than a hard-coded "1", which would fail
    // spuriously on the next schema bump.
    let json = serde_json::to_string(&event).unwrap();
    let expected_prefix = format!("{{\"schema_version\":{SCHEMA_VERSION},");
    assert!(
        json.starts_with(&expected_prefix),
        "record must start with {expected_prefix}, got {json}"
    );
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ use crate::dynamic::runner::{run_spec, RunError};
|
|||
use crate::dynamic::sandbox::{toolchain_id_with_digest, SandboxOptions};
|
||||
use crate::dynamic::spec::{HarnessSpec, SPEC_FORMAT_VERSION};
|
||||
use crate::dynamic::stubs::StubHarness;
|
||||
use crate::dynamic::telemetry::{self, TelemetryEvent};
|
||||
use crate::dynamic::telemetry::{self, SamplingPolicy, TelemetryEvent};
|
||||
use crate::dynamic::toolchain;
|
||||
use crate::evidence::{InconclusiveReason, SpecDerivationStrategy, UnsupportedReason};
|
||||
use crate::summary::GlobalSummaries;
|
||||
|
|
@ -62,6 +62,10 @@ pub struct VerifyOptions {
|
|||
/// [`crate::evidence::InconclusiveReason::BackendInsufficient`]
|
||||
/// rather than running against an unhardened host.
|
||||
pub refuse_filesystem_confirm: bool,
|
||||
/// Phase 27 (Track H.2): sampling policy applied to every telemetry
|
||||
/// event emitted from the verify pipeline. Default `keep_all` so unit
|
||||
/// tests and embedded callers do not silently lose records.
|
||||
pub telemetry_policy: SamplingPolicy,
|
||||
}
|
||||
|
||||
impl VerifyOptions {
|
||||
|
|
@ -116,6 +120,7 @@ impl VerifyOptions {
|
|||
summaries: None,
|
||||
callgraph: None,
|
||||
refuse_filesystem_confirm,
|
||||
telemetry_policy: SamplingPolicy::from_config(&config.telemetry),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -242,6 +247,7 @@ fn entry_kind_unsupported_verdict(
|
|||
spec_entry_path: &str,
|
||||
lang: crate::symbol::Lang,
|
||||
attempted: crate::dynamic::spec::EntryKind,
|
||||
policy: &SamplingPolicy,
|
||||
) -> VerifyResult {
|
||||
let supported = crate::dynamic::lang::entry_kinds_supported(lang).to_vec();
|
||||
let hint = crate::dynamic::lang::entry_kind_hint(lang, attempted);
|
||||
|
|
@ -263,7 +269,7 @@ fn entry_kind_unsupported_verdict(
|
|||
Some(inconclusive_reason.clone()),
|
||||
),
|
||||
};
|
||||
telemetry::emit(&event);
|
||||
telemetry::emit_with_policy(&event, policy);
|
||||
VerifyResult {
|
||||
finding_id,
|
||||
status: VerifyStatus::Inconclusive,
|
||||
|
|
@ -290,6 +296,7 @@ fn spec_derivation_failed_verdict(
|
|||
finding_id: String,
|
||||
diag: &Diag,
|
||||
reason: UnsupportedReason,
|
||||
policy: &SamplingPolicy,
|
||||
) -> VerifyResult {
|
||||
if matches!(reason, UnsupportedReason::SpecDerivationFailed) && should_be_inconclusive(diag) {
|
||||
let strategies: Vec<SpecDerivationStrategy> =
|
||||
|
|
@ -304,7 +311,7 @@ fn spec_derivation_failed_verdict(
|
|||
VerifyStatus::Inconclusive,
|
||||
Some(inconclusive_reason.clone()),
|
||||
);
|
||||
telemetry::emit(&event);
|
||||
telemetry::emit_with_policy(&event, policy);
|
||||
return VerifyResult {
|
||||
finding_id,
|
||||
status: VerifyStatus::Inconclusive,
|
||||
|
|
@ -319,7 +326,7 @@ fn spec_derivation_failed_verdict(
|
|||
}
|
||||
|
||||
let event = TelemetryEvent::no_spec(diag, VerifyStatus::Unsupported, None);
|
||||
telemetry::emit(&event);
|
||||
telemetry::emit_with_policy(&event, policy);
|
||||
|
||||
VerifyResult {
|
||||
finding_id,
|
||||
|
|
@ -388,7 +395,12 @@ pub fn verify_finding(diag: &Diag, opts: &VerifyOptions) -> VerifyResult {
|
|||
) {
|
||||
Ok(s) => s,
|
||||
Err(reason) => {
|
||||
return spec_derivation_failed_verdict(finding_id, diag, reason);
|
||||
return spec_derivation_failed_verdict(
|
||||
finding_id,
|
||||
diag,
|
||||
reason,
|
||||
&opts.telemetry_policy,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -404,6 +416,7 @@ pub fn verify_finding(diag: &Diag, opts: &VerifyOptions) -> VerifyResult {
|
|||
&spec.entry_file,
|
||||
spec.lang,
|
||||
spec.entry_kind,
|
||||
&opts.telemetry_policy,
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -574,7 +587,7 @@ pub fn verify_finding(diag: &Diag, opts: &VerifyOptions) -> VerifyResult {
|
|||
elapsed,
|
||||
build_attempts,
|
||||
);
|
||||
telemetry::emit(&event);
|
||||
telemetry::emit_with_policy(&event, &opts.telemetry_policy);
|
||||
|
||||
verdict
|
||||
}
|
||||
|
|
@ -809,6 +822,7 @@ fn build_verdict(
|
|||
&spec.entry_file,
|
||||
spec.lang,
|
||||
spec.entry_kind,
|
||||
&opts.telemetry_policy,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -222,13 +222,11 @@ pub fn rank_diags(diags: &mut [Diag]) {
|
|||
.and_then(|ev| ev.dynamic_verdict.as_ref())
|
||||
.map(|dv| format!("{:?}", dv.status))
|
||||
.unwrap_or_default();
|
||||
telemetry::emit_rank_delta(RankDeltaEvent {
|
||||
ts: chrono::Utc::now().to_rfc3339(),
|
||||
event_type: "rank_delta",
|
||||
finding_id: d.finding_id.clone(),
|
||||
telemetry::emit_rank_delta(RankDeltaEvent::new(
|
||||
d.finding_id.clone(),
|
||||
status,
|
||||
delta,
|
||||
});
|
||||
));
|
||||
}
|
||||
}
|
||||
diags.sort_by(|a, b| {
|
||||
|
|
|
|||
|
|
@ -758,6 +758,30 @@ impl Default for ServerConfig {
|
|||
}
|
||||
}
|
||||
|
||||
/// Phase 27 — `[telemetry]` section. Controls the on-disk event log
|
||||
/// sampling policy. Confirmed and Inconclusive verdicts are calibration
|
||||
/// critical and are retained by default; other verdict statuses can be
|
||||
/// downsampled via `sample_rate_other` to bound log growth on high-volume
|
||||
/// scans. Decisions are seeded by `spec_hash` for determinism — see
|
||||
/// [`crate::dynamic::telemetry::SamplingPolicy`].
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
|
||||
#[serde(default)]
|
||||
pub struct TelemetryConfig {
|
||||
pub keep_all_confirmed: bool,
|
||||
pub keep_all_inconclusive: bool,
|
||||
pub sample_rate_other: f32,
|
||||
}
|
||||
|
||||
impl Default for TelemetryConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
keep_all_confirmed: true,
|
||||
keep_all_inconclusive: true,
|
||||
sample_rate_other: 1.0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Configuration for scan run persistence and history.
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(default)]
|
||||
|
|
@ -880,6 +904,10 @@ pub struct Config {
|
|||
pub detectors: crate::utils::detector_options::DetectorOptions,
|
||||
pub server: ServerConfig,
|
||||
pub runs: RunsConfig,
|
||||
/// Phase 27 — `[telemetry]` section. Sampling policy for the dynamic
|
||||
/// event log.
|
||||
#[serde(default)]
|
||||
pub telemetry: TelemetryConfig,
|
||||
pub profiles: HashMap<String, ScanProfile>,
|
||||
/// Detected frameworks for the current project, set by the scan pipeline,
|
||||
/// not persisted to config files.
|
||||
|
|
@ -1186,6 +1214,9 @@ pub(crate) fn merge_configs(mut default: Config, user: Config) -> Config {
|
|||
// --- RunsConfig ---
|
||||
default.runs = user.runs;
|
||||
|
||||
// --- TelemetryConfig ---
|
||||
default.telemetry = user.telemetry;
|
||||
|
||||
// --- Profiles (user profile with same name fully replaces) ---
|
||||
for (name, profile) in user.profiles {
|
||||
default.profiles.insert(name, profile);
|
||||
|
|
|
|||
|
|
@ -102,12 +102,7 @@ mod parity_tests {
|
|||
timeout: Duration::from_secs(10),
|
||||
..SandboxOptions::default()
|
||||
},
|
||||
project_root: None,
|
||||
db_path: None,
|
||||
verify_all_confidence: false,
|
||||
summaries: None,
|
||||
callgraph: None,
|
||||
refuse_filesystem_confirm: false,
|
||||
..VerifyOptions::default()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -118,12 +113,7 @@ mod parity_tests {
|
|||
timeout: Duration::from_secs(30),
|
||||
..SandboxOptions::default()
|
||||
},
|
||||
project_root: None,
|
||||
db_path: None,
|
||||
verify_all_confidence: false,
|
||||
summaries: None,
|
||||
callgraph: None,
|
||||
refuse_filesystem_confirm: false,
|
||||
..VerifyOptions::default()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
179
tests/telemetry_schema.rs
Normal file
179
tests/telemetry_schema.rs
Normal file
|
|
@ -0,0 +1,179 @@
|
|||
//! Phase 27 — Track H.1 integration test.
|
||||
//!
|
||||
//! Locks in the on-disk telemetry schema contract that `scripts/m7_ship_gate.sh`
|
||||
//! Gate 2 relies on:
|
||||
//!
|
||||
//! - Records produced today carry the `schema_version`, `nyx_version`, and
|
||||
//! `corpus_version` envelope fields, plus a `kind` discriminator.
|
||||
//! - `read_events(path)` accepts the current schema.
|
||||
//! - A hand-crafted record with `schema_version: 0` is rejected by
|
||||
//! `read_events` with a typed [`TelemetryReadError::SchemaMismatch`] (this
|
||||
//! is the explicit Phase 27 acceptance bullet).
|
||||
//! - The sampling policy retains Confirmed and Inconclusive verdicts even at
|
||||
//! `sample_rate_other = 0.0`.
|
||||
|
||||
#![cfg(feature = "dynamic")]
|
||||
|
||||
use nyx_scanner::dynamic::telemetry::{
|
||||
self, RankDeltaEvent, SamplingPolicy, TelemetryEvent, TelemetryReadError, CORPUS_VERSION,
|
||||
NYX_VERSION, SCHEMA_VERSION,
|
||||
};
|
||||
use nyx_scanner::dynamic::spec::{EntryKind, HarnessSpec, PayloadSlot, SpecDerivationStrategy};
|
||||
use nyx_scanner::evidence::VerifyStatus;
|
||||
use nyx_scanner::labels::Cap;
|
||||
use nyx_scanner::symbol::Lang;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn make_spec(hash: &str) -> HarnessSpec {
|
||||
HarnessSpec {
|
||||
finding_id: "0000000000000001".into(),
|
||||
entry_file: "handler.py".into(),
|
||||
entry_name: "handle".into(),
|
||||
entry_kind: EntryKind::Function,
|
||||
lang: Lang::Python,
|
||||
toolchain_id: "python-3.11".into(),
|
||||
payload_slot: PayloadSlot::Param(0),
|
||||
expected_cap: Cap::SQL_QUERY,
|
||||
constraint_hints: vec![],
|
||||
sink_file: "handler.py".into(),
|
||||
sink_line: 5,
|
||||
spec_hash: hash.into(),
|
||||
derivation: SpecDerivationStrategy::FromFlowSteps,
|
||||
stubs_required: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
fn current_record_carries_envelope_fields() {
    // A verdict record must serialize with the full envelope.
    let spec = make_spec("abcd1234");
    let verdict = TelemetryEvent::new(
        &spec,
        VerifyStatus::Confirmed,
        None,
        "exact",
        Duration::from_millis(7),
        1,
    );
    let verdict_json = serde_json::to_value(&verdict).unwrap();
    assert_eq!(verdict_json["schema_version"], SCHEMA_VERSION);
    assert_eq!(verdict_json["nyx_version"], NYX_VERSION);
    assert_eq!(verdict_json["corpus_version"], CORPUS_VERSION);
    assert_eq!(verdict_json["kind"], "verdict");

    // A rank-delta record shares the schema version but carries its own
    // `kind` discriminator.
    let rank_delta = RankDeltaEvent::new("a".into(), "Confirmed".into(), 2.0);
    let rank_json = serde_json::to_value(&rank_delta).unwrap();
    assert_eq!(rank_json["schema_version"], SCHEMA_VERSION);
    assert_eq!(rank_json["kind"], "rank_delta");
}
|
||||
|
||||
#[test]
fn read_events_accepts_current_schema() {
    let scratch = TempDir::new().unwrap();
    let log_path = scratch.path().join("events.jsonl");

    // Three current-schema records, one JSON document per line.
    let jsonl: String = (0..3)
        .map(|i| {
            let spec = make_spec(&format!("hash{i}"));
            let event = TelemetryEvent::new(
                &spec,
                VerifyStatus::Confirmed,
                None,
                "exact",
                Duration::from_millis(1),
                1,
            );
            let mut line = serde_json::to_string(&event).unwrap();
            line.push('\n');
            line
        })
        .collect();
    std::fs::write(&log_path, jsonl).unwrap();

    // The reader must hand back every record, each on the current schema.
    let parsed = telemetry::read_events(&log_path).unwrap();
    assert_eq!(parsed.len(), 3);
    for record in &parsed {
        assert_eq!(record["schema_version"], SCHEMA_VERSION);
    }
}
|
||||
|
||||
#[test]
fn read_events_rejects_schema_zero_record() {
    let scratch = TempDir::new().unwrap();
    let log_path = scratch.path().join("events.jsonl");

    // Hand-crafted v0 record: the exact case the Phase 27 acceptance pins.
    let legacy_record = "{\"schema_version\":0,\"kind\":\"verdict\",\"status\":\"Confirmed\"}\n";
    std::fs::write(&log_path, legacy_record).unwrap();

    let err = telemetry::read_events(&log_path).expect_err("schema 0 must be rejected");

    // Only the typed mismatch variant is acceptable; anything else fails.
    let TelemetryReadError::SchemaMismatch {
        expected, found, ..
    } = &err
    else {
        panic!("expected SchemaMismatch, got {err:?}");
    };
    assert_eq!(*expected, SCHEMA_VERSION);
    assert_eq!(*found, 0);
}
|
||||
|
||||
#[test]
fn read_events_rejects_mixed_schema_record_inside_valid_log() {
    let scratch = TempDir::new().unwrap();
    let log_path = scratch.path().join("events.jsonl");

    // Line 1: a valid current-schema record.
    let valid_event = TelemetryEvent::new(
        &make_spec("good"),
        VerifyStatus::Confirmed,
        None,
        "exact",
        Duration::from_millis(1),
        1,
    );
    let good = serde_json::to_string(&valid_event).unwrap();
    // Line 2: a stale schema-0 record hiding behind the valid one.
    let bad = "{\"schema_version\":0,\"kind\":\"verdict\"}";
    std::fs::write(&log_path, format!("{good}\n{bad}\n")).unwrap();

    // The error must identify the offending line, not just the file.
    let err = telemetry::read_events(&log_path).unwrap_err();
    let TelemetryReadError::SchemaMismatch { line, found, .. } = &err else {
        panic!("expected SchemaMismatch on line 2, got {err:?}");
    };
    assert_eq!(*line, 2);
    assert_eq!(*found, 0);
}
|
||||
|
||||
#[test]
fn sampling_policy_retains_confirmed_and_inconclusive() {
    // With the "other" rate at zero, only the two protected statuses may
    // survive sampling.
    let policy = SamplingPolicy {
        keep_all_confirmed: true,
        keep_all_inconclusive: true,
        sample_rate_other: 0.0,
    };

    for hash in ["a", "b", "spec-1234", "deadbeef"] {
        let expectations = [
            (VerifyStatus::Confirmed, true),
            (VerifyStatus::Inconclusive, true),
            (VerifyStatus::NotConfirmed, false),
            (VerifyStatus::Unsupported, false),
        ];
        for (status, should_keep) in expectations {
            assert_eq!(policy.should_sample(status, hash), should_keep);
        }
    }
}
|
||||
|
||||
#[test]
fn sampling_policy_is_deterministic_across_runs() {
    // No status is protected here, so every decision goes through the
    // rate-based path.
    let policy = SamplingPolicy {
        keep_all_confirmed: false,
        keep_all_inconclusive: false,
        sample_rate_other: 0.5,
    };

    // First pass: record the decision for 50 distinct spec hashes.
    let first_pass: Vec<(String, bool)> = (0..50)
        .map(|i| {
            let hash = format!("spec-{i:08x}");
            let kept = policy.should_sample(VerifyStatus::NotConfirmed, &hash);
            (hash, kept)
        })
        .collect();

    // Second pass: each hash must produce the same decision again.
    for (hash, expected) in &first_pass {
        let again = policy.should_sample(VerifyStatus::NotConfirmed, hash);
        assert_eq!(
            *expected, again,
            "sampling decision flipped for spec_hash={hash}"
        );
    }
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue