diff --git a/docs/dynamic.md b/docs/dynamic.md index 64aa68b6..aa2e7300 100644 --- a/docs/dynamic.md +++ b/docs/dynamic.md @@ -91,6 +91,79 @@ If scan time is unacceptable for a given workflow (e.g. IDE integration, quick pre-commit check), use `--no-verify` for that workflow and rely on the full scan in CI. +## Event schema + +The dynamic layer writes one JSON record per verdict to +`~/.cache/nyx/dynamic/events.jsonl`. Every record begins with a fixed envelope +so older readers fail loudly instead of silently mixing incompatible shapes: + +```json +{ + "schema_version": 1, + "nyx_version": "0.7.0", + "corpus_version": "4", + "kind": "verdict", + "ts": "2026-05-15T18:42:09Z", + "finding_id": "a3b1...", + "spec_hash": "9f4e...", + "lang": "python", + "cap": "SQL_QUERY", + "status": "Confirmed", + "toolchain_id": "python-3.11", + "toolchain_match": "exact", + "duration_ms": 312, + "build_attempts": 1 +} +``` + +| Field | Type | Meaning | +| --- | --- | --- | +| `schema_version` | integer | Bumped on any breaking change. Readers reject mismatches. | +| `nyx_version` | string | `CARGO_PKG_VERSION` of the writing binary. | +| `corpus_version` | string | Payload-corpus version the verdict was scored against. | +| `kind` | string | `"verdict"` (per-finding) or `"rank_delta"` (rank-score shift). | +| `ts` | RFC-3339 string | Wall-clock at write time. | +| `finding_id` | string | Stable finding identifier. | +| `spec_hash` | string | Hash of the `HarnessSpec` that drove the run. | +| `lang` | string | Language slug; `"unknown"` when spec derivation failed. | +| `cap` | string | Sink capability (e.g. `SQL_QUERY`, `CODE_EXEC`). | +| `status` | string | `Confirmed`, `NotConfirmed`, `Inconclusive`, or `Unsupported`. | +| `inconclusive_reason` | string | Present iff `status == Inconclusive`. | + +A `rank_delta` record carries the envelope plus `finding_id`, `status`, and a +signed `delta` applied to the rank score. 
+ +### Schema-version mismatch + +`scripts/m7_ship_gate.sh` Gate 2 walks every line of the log, requires +`schema_version == EXPECTED_SCHEMA_VERSION`, and exits 3 if any record fails +the check. Programmatic readers use +`crate::dynamic::telemetry::read_events(path)`, which surfaces the same +condition as `TelemetryReadError::SchemaMismatch { expected, found, .. }`. + +When schema bumps land, the canonical migration is to roll the log over (move +or delete `events.jsonl`) so new and old records never coexist in a file. The +gate refuses to skip silently on mismatch. + +### Sampling + +`[telemetry]` in `nyx.toml` controls the on-disk sampling policy: + +```toml +[telemetry] +keep_all_confirmed = true # default: retain every Confirmed verdict +keep_all_inconclusive = true # default: retain every Inconclusive verdict +sample_rate_other = 1.0 # 0.0–1.0 for NotConfirmed / Unsupported +``` + +`sample_rate_other < 1.0` downsamples NotConfirmed and Unsupported verdicts +deterministically — the decision is seeded by the finding's `spec_hash`, so a +given finding makes the same keep-or-drop call across reruns. Confirmed and +Inconclusive verdicts ignore the rate and are always retained (they gate the +false-Confirmed budget and drive the spec-derivation roadmap). + +`NYX_NO_TELEMETRY=1` disables every write regardless of the policy. + ## Opting in to feedback False positives (nyx says `Confirmed` but you disagree) can be recorded: diff --git a/scripts/m7_ship_gate.sh b/scripts/m7_ship_gate.sh index fb718045..82644da6 100755 --- a/scripts/m7_ship_gate.sh +++ b/scripts/m7_ship_gate.sh @@ -74,6 +74,14 @@ else fi # ── Gate 2: False-Confirmed rate ───────────────────────────────────────────── +# +# Phase 27 (Track H.1): the telemetry log is schema-versioned. 
Gate 2 reads +# `EXPECTED_SCHEMA_VERSION` against every record's `schema_version` field and +# fails loudly with exit 3 when a mismatch is found — silently treating a +# v0 (pre-Phase-27) log as "no data" would mask incompatible releases mixing +# their records. +EXPECTED_SCHEMA_VERSION=1 + if skip false-confirmed; then info "Gate 2 (false-confirmed): SKIPPED" else @@ -82,20 +90,35 @@ else if [[ ! -f "$EVENTS" ]]; then info "Gate 2: telemetry log not found at $EVENTS; skipping (no data)" else - python3 - <<'PYEOF' "$EVENTS" + set +e + python3 - "$EVENTS" "$EXPECTED_SCHEMA_VERSION" <<'PYEOF' import json, sys, collections path = sys.argv[1] +expected_schema = int(sys.argv[2]) cap_counts = collections.defaultdict(lambda: {"confirmed": 0, "wrong": 0}) with open(path) as f: - for line in f: - try: - ev = json.loads(line) - except json.JSONDecodeError: + for line_no, raw in enumerate(f, start=1): + if not raw.strip(): continue - if ev.get("kind") == "feedback" and ev.get("wrong"): + try: + ev = json.loads(raw) + except json.JSONDecodeError as e: + print(f"FAIL malformed JSON at {path} line {line_no}: {e}") + sys.exit(3) + if "schema_version" not in ev: + print(f"FAIL missing schema_version at {path} line {line_no}") + sys.exit(3) + if ev["schema_version"] != expected_schema: + print( + f"FAIL schema mismatch at {path} line {line_no}: " + f"expected {expected_schema}, found {ev['schema_version']}" + ) + sys.exit(3) + kind = ev.get("kind", "") + if kind == "feedback" and ev.get("wrong"): cap = ev.get("cap", "unknown") cap_counts[cap]["wrong"] += 1 - elif ev.get("kind") == "verdict" and ev.get("status") == "Confirmed": + elif kind == "verdict" and ev.get("status") == "Confirmed": cap = ev.get("cap", "unknown") cap_counts[cap]["confirmed"] += 1 @@ -115,8 +138,11 @@ for cap, counts in sorted(cap_counts.items()): sys.exit(2 if failed else 0) PYEOF RC=$? 
+ set -e if [[ $RC -eq 0 ]]; then pass "Gate 2: false-Confirmed rate within threshold" + elif [[ $RC -eq 3 ]]; then + die "Gate 2: telemetry schema mismatch (expected v$EXPECTED_SCHEMA_VERSION) — refusing to silently skip" else die "Gate 2: false-Confirmed rate exceeds 2% for one or more caps" fi diff --git a/src/dynamic/telemetry.rs b/src/dynamic/telemetry.rs index 665a0313..6934a976 100644 --- a/src/dynamic/telemetry.rs +++ b/src/dynamic/telemetry.rs @@ -3,9 +3,27 @@ //! Writes one JSON line per verdict to `~/.cache/nyx/dynamic/events.jsonl`. //! `NYX_NO_TELEMETRY=1` silently disables all writes (§21.4). //! -//! Schema (§21.1 minimal fields): +//! # Schema (Phase 27) +//! +//! Every record starts with three envelope fields so the on-disk format can +//! evolve across releases without silently mixing incompatible records: +//! +//! - `schema_version`: integer, bumped on any breaking shape change. +//! - `nyx_version`: the Cargo package version that wrote the record. +//! - `corpus_version`: the payload-corpus version active at write time. +//! +//! Followed by a `kind` discriminator (`"verdict"` or `"rank_delta"`). All +//! readers (`read_events`, the M7 ship gate) require `schema_version` == +//! [`SCHEMA_VERSION`]; mismatched records produce +//! [`TelemetryReadError::SchemaMismatch`] instead of being silently parsed +//! as if they matched. +//! //! ```json //! { +//! "schema_version": 1, +//! "nyx_version": "0.7.0", +//! "corpus_version": "4", +//! "kind": "verdict", //! "ts": "", //! "finding_id": "...", //! "spec_hash": "...", @@ -24,18 +42,37 @@ use crate::dynamic::spec::HarnessSpec; use crate::evidence::{InconclusiveReason, VerifyStatus}; use directories::ProjectDirs; use std::fs::{self, OpenOptions}; -use std::io::Write; -use std::path::Path; +use std::io::{BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; use std::time::Duration; +/// On-disk telemetry schema version. Bump on any breaking shape change to +/// the JSON record.
Readers reject any record whose `schema_version` does +/// not match this constant. +pub const SCHEMA_VERSION: u32 = 1; + +/// Cargo package version of the Nyx build that wrote the record. +pub const NYX_VERSION: &str = env!("CARGO_PKG_VERSION"); + +/// Corpus-version label written into every record. Kept as a `&'static str` +/// so it can sit on a `Serialize`-derived struct alongside the other envelope +/// fields without an allocation. Mirrors +/// [`crate::dynamic::corpus::CORPUS_VERSION`]; the +/// [`corpus_version_const_matches_corpus_module`] test guards drift. +pub const CORPUS_VERSION: &str = "4"; + /// One telemetry event per verdict. /// /// `lang` is `"unknown"` for findings whose language could not be resolved /// (e.g. spec derivation failed before `HarnessSpec::lang` was set). Counting /// these is the `lang_unknown_count` Phase 02 acceptance asks for: /// `grep '"lang":"unknown"' events.jsonl | wc -l`. -#[derive(Debug, serde::Serialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct TelemetryEvent { + pub schema_version: u32, + pub nyx_version: &'static str, + pub corpus_version: &'static str, + pub kind: &'static str, pub ts: String, pub finding_id: String, pub spec_hash: String, @@ -46,13 +83,13 @@ pub struct TelemetryEvent { pub toolchain_match: String, pub duration_ms: u64, pub build_attempts: u32, - #[serde(skip_serializing_if = "Option::is_none")] + #[serde(skip_serializing_if = "Option::is_none", default)] pub inconclusive_reason: Option, /// Path of the finding's source file, populated for spec-derivation /// failures so downstream consumers can map `lang="unknown"` events back /// to a file. Skipped on successful verdicts (the spec already carries /// `entry_file`). 
- #[serde(skip_serializing_if = "Option::is_none")] + #[serde(skip_serializing_if = "Option::is_none", default)] pub path: Option, } @@ -66,6 +103,10 @@ impl TelemetryEvent { build_attempts: u32, ) -> Self { Self { + schema_version: SCHEMA_VERSION, + nyx_version: NYX_VERSION, + corpus_version: CORPUS_VERSION, + kind: "verdict", ts: chrono::Utc::now().to_rfc3339(), finding_id: spec.finding_id.clone(), spec_hash: spec.spec_hash.clone(), @@ -108,6 +149,10 @@ impl TelemetryEvent { .map(|e| format!("{:?}", e.sink_caps)) .unwrap_or_else(|| "0".to_owned()); Self { + schema_version: SCHEMA_VERSION, + nyx_version: NYX_VERSION, + corpus_version: CORPUS_VERSION, + kind: "verdict", ts: chrono::Utc::now().to_rfc3339(), finding_id: format!("{:016x}", diag.stable_hash), spec_hash: String::new(), @@ -143,6 +188,10 @@ impl TelemetryEvent { .map(|l| l.as_str().to_owned()) .unwrap_or_else(|| "unknown".to_owned()); Self { + schema_version: SCHEMA_VERSION, + nyx_version: NYX_VERSION, + corpus_version: CORPUS_VERSION, + kind: "verdict", ts: chrono::Utc::now().to_rfc3339(), finding_id: String::new(), spec_hash: String::new(), @@ -159,17 +208,112 @@ impl TelemetryEvent { } } +/// Sampling decision for telemetry writes (Phase 27, Track H.2). +/// +/// Confirmed and Inconclusive verdicts are calibration-critical (false-Confirmed +/// rate gates M7 ship; Inconclusive reasons drive the spec-derivation roadmap) +/// and are always retained. Other verdict statuses can be downsampled to bound +/// log growth on high-volume scans. +/// +/// The decision is seeded by `spec_hash` so the *same* finding makes the *same* +/// keep-or-drop call across reruns — without this, two scans of the same project +/// would produce non-comparable event logs. +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct SamplingPolicy { + /// Always keep Confirmed verdicts. Default `true`. + pub keep_all_confirmed: bool, + /// Always keep Inconclusive verdicts. Default `true`. 
+ pub keep_all_inconclusive: bool, + /// Probability of keeping any other verdict (NotConfirmed, Unsupported). + /// `0.0` drops all non-retained; `1.0` keeps all. Default `1.0`. + pub sample_rate_other: f32, +} + +impl Default for SamplingPolicy { + fn default() -> Self { + Self { + keep_all_confirmed: true, + keep_all_inconclusive: true, + sample_rate_other: 1.0, + } + } +} + +impl SamplingPolicy { + /// Keep every record regardless of status. Equivalent to the pre-Phase-27 + /// behaviour and the right default for unit tests. + pub fn keep_all() -> Self { + Self::default() + } + + /// Build the runtime policy from `[telemetry]` in `nyx.toml`. + pub fn from_config(cfg: &crate::utils::config::TelemetryConfig) -> Self { + Self { + keep_all_confirmed: cfg.keep_all_confirmed, + keep_all_inconclusive: cfg.keep_all_inconclusive, + sample_rate_other: cfg.sample_rate_other, + } + } + + /// Decide whether an event with the given status / spec_hash should be + /// written. Deterministic for a fixed `(self, status, spec_hash)`. + pub fn should_sample(&self, status: VerifyStatus, spec_hash: &str) -> bool { + if matches!(status, VerifyStatus::Confirmed) && self.keep_all_confirmed { + return true; + } + if matches!(status, VerifyStatus::Inconclusive) && self.keep_all_inconclusive { + return true; + } + // Clamp the configured rate into [0, 1] and short-circuit the extremes + // so we never hash a record we already know the answer for. + let rate = self.sample_rate_other.clamp(0.0, 1.0); + if rate >= 1.0 { + return true; + } + if rate <= 0.0 { + return false; + } + // Hash the spec_hash with unkeyed blake3 (already in the dep tree) so the + // bucket is stable across releases; the first 8 bytes give a uniform u64. + // NOTE(review): events with an empty spec_hash (`no_spec`) share one bucket. + let h = blake3::hash(spec_hash.as_bytes()); + let bytes: [u8; 8] = h.as_bytes()[..8].try_into().unwrap(); + let bucket = (u64::from_le_bytes(bytes) % 1_000_000) as f32 / 1_000_000.0; + bucket < rate + } +} + /// Write a telemetry event to the events log.
/// /// Silently no-ops when: /// - `NYX_NO_TELEMETRY=1` /// - The log directory cannot be created /// - The write fails (telemetry must never affect verdict) +/// +/// Applies the default-`keep_all` sampling policy — every event is written. +/// Call sites that want sampling go through [`emit_with_policy`] instead. pub fn emit(event: &TelemetryEvent) { + emit_with_policy(event, &SamplingPolicy::keep_all()); +} + +/// Like [`emit`] but consults `policy` before writing. +/// +/// Drops the record when `policy.should_sample(...)` returns `false`. The +/// decision is keyed on `event.spec_hash`, so the same finding produces the +/// same keep-or-drop call across reruns. +pub fn emit_with_policy(event: &TelemetryEvent, policy: &SamplingPolicy) { if std::env::var("NYX_NO_TELEMETRY").as_deref() == Ok("1") { return; } + // Map the &str status back into the VerifyStatus enum for the policy + // check. Unrecognised strings are treated as Confirmed, so a future status + // variant is kept whenever `keep_all_confirmed` is set (the default). + let status = parse_status(&event.status).unwrap_or(VerifyStatus::Confirmed); + if !policy.should_sample(status, &event.spec_hash) { + return; + } + let Some(path) = events_log_path() else { return; }; @@ -195,6 +339,16 @@ pub fn emit(event: &TelemetryEvent) { })(); } +fn parse_status(s: &str) -> Option { + match s { + "Confirmed" => Some(VerifyStatus::Confirmed), + "NotConfirmed" => Some(VerifyStatus::NotConfirmed), + "Inconclusive" => Some(VerifyStatus::Inconclusive), + "Unsupported" => Some(VerifyStatus::Unsupported), + _ => None, + } +} + fn events_log_path() -> Option { // Respect explicit override for testing. if let Ok(p) = std::env::var("NYX_TELEMETRY_PATH") { @@ -209,6 +363,94 @@ pub fn log_path() -> Option { events_log_path() } +// ── Reading events back (Phase 27) ─────────────────────────────────────────── + +/// Structured error returned by [`read_events`].
+/// +/// Surfaced to the M7 ship gate so Gate 2 can fail loudly on schema-mismatch +/// rather than silently treating mismatched records as "no data". +#[derive(Debug, thiserror::Error)] +pub enum TelemetryReadError { + #[error("io error reading {path}: {source}")] + Io { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error( + "schema mismatch in {path} line {line}: expected schema_version={expected}, found {found}" + )] + SchemaMismatch { + path: PathBuf, + line: usize, + expected: u32, + found: u32, + }, + #[error("missing schema_version in {path} line {line}")] + MissingSchemaVersion { path: PathBuf, line: usize }, + #[error("malformed JSON in {path} line {line}: {source}")] + Json { + path: PathBuf, + line: usize, + #[source] + source: serde_json::Error, + }, +} + +/// Read every event record from the JSONL log at `path`. +/// +/// Returns each line as a `serde_json::Value` so callers can dispatch on the +/// `kind` discriminator themselves. Rejects any record whose `schema_version` +/// does not match [`SCHEMA_VERSION`] (this is the explicit failure mode the +/// M7 ship gate Gate 2 consumes — a v0 record from an older release must not +/// silently parse as if the schema had never changed). +/// +/// Blank lines are skipped. Any malformed JSON or missing `schema_version` +/// fails the whole read; partial recovery is not the contract here because +/// the ship gate skips Gate 2 ("no data") only when the log file is missing — +/// a log that exists but cannot be parsed fails the gate.
+pub fn read_events(path: &Path) -> Result, TelemetryReadError> { + let file = std::fs::File::open(path).map_err(|e| TelemetryReadError::Io { + path: path.to_path_buf(), + source: e, + })?; + let reader = BufReader::new(file); + let mut out = Vec::new(); + for (idx, line) in reader.lines().enumerate() { + let line_no = idx + 1; + let line = line.map_err(|e| TelemetryReadError::Io { + path: path.to_path_buf(), + source: e, + })?; + if line.trim().is_empty() { + continue; + } + let value: serde_json::Value = + serde_json::from_str(&line).map_err(|e| TelemetryReadError::Json { + path: path.to_path_buf(), + line: line_no, + source: e, + })?; + let found = value + .get("schema_version") + .and_then(|v| v.as_u64()) + .ok_or_else(|| TelemetryReadError::MissingSchemaVersion { + path: path.to_path_buf(), + line: line_no, + })?; + if found != SCHEMA_VERSION as u64 { + return Err(TelemetryReadError::SchemaMismatch { + path: path.to_path_buf(), + line: line_no, + expected: SCHEMA_VERSION, + found: found as u32, + }); + } + out.push(value); + } + Ok(out) +} + // ── Rank delta telemetry ────────────────────────────────────────────────────── /// One telemetry event per ranked finding that carries a dynamic verdict delta. @@ -216,11 +458,14 @@ pub fn log_path() -> Option { /// Emitted by `rank::rank_diags` for every diag whose dynamic verdict shifts /// its rank score (delta != 0). Used by the M7 calibration pipeline to tune /// the N/M boost/penalty constants from real-world verdict distributions. -#[derive(Debug, serde::Serialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct RankDeltaEvent { - pub ts: String, + pub schema_version: u32, + pub nyx_version: &'static str, + pub corpus_version: &'static str, /// Always `"rank_delta"` — distinguishes from verdict events in the log. - pub event_type: &'static str, + pub kind: &'static str, + pub ts: String, pub finding_id: String, /// `"Confirmed"`, `"NotConfirmed"`, etc. 
pub status: String, @@ -228,6 +473,21 @@ pub struct RankDeltaEvent { pub delta: f64, } +impl RankDeltaEvent { + pub fn new(finding_id: String, status: String, delta: f64) -> Self { + Self { + schema_version: SCHEMA_VERSION, + nyx_version: NYX_VERSION, + corpus_version: CORPUS_VERSION, + kind: "rank_delta", + ts: chrono::Utc::now().to_rfc3339(), + finding_id, + status, + delta, + } + } +} + /// Write a rank-delta telemetry event to the events log. /// /// Silently no-ops under the same conditions as [`emit`]: @@ -306,6 +566,10 @@ mod tests { let content = std::fs::read_to_string(&log).unwrap(); assert!(!content.is_empty()); let v: serde_json::Value = serde_json::from_str(content.trim()).unwrap(); + assert_eq!(v["schema_version"], SCHEMA_VERSION); + assert_eq!(v["nyx_version"], NYX_VERSION); + assert_eq!(v["corpus_version"], CORPUS_VERSION); + assert_eq!(v["kind"], "verdict"); assert_eq!(v["status"], "Confirmed"); assert_eq!(v["toolchain_match"], "exact"); @@ -328,6 +592,8 @@ mod tests { assert_eq!(event.path.as_deref(), Some("/tmp/some_script_no_ext")); assert!(event.spec_hash.is_empty()); assert_eq!(event.status, "Unsupported"); + assert_eq!(event.schema_version, SCHEMA_VERSION); + assert_eq!(event.kind, "verdict"); } #[test] @@ -347,8 +613,7 @@ mod tests { tried: vec![SpecDerivationStrategy::FromFlowSteps], hint: "kotlin source".to_owned(), }; - let event = - TelemetryEvent::no_spec(&diag, VerifyStatus::Inconclusive, Some(reason)); + let event = TelemetryEvent::no_spec(&diag, VerifyStatus::Inconclusive, Some(reason)); let json = serde_json::to_string(&event).unwrap(); assert!(json.contains("\"lang\":\"java\"")); assert!(json.contains("SpecDerivationFailed")); @@ -381,4 +646,186 @@ mod tests { std::env::remove_var("NYX_TELEMETRY_PATH"); } } + + #[test] + fn corpus_version_const_matches_corpus_module() { + assert_eq!( + CORPUS_VERSION, + crate::dynamic::corpus::CORPUS_VERSION.to_string() + ); + } + + #[test] + fn read_events_rejects_schema_zero() { + let dir = 
TempDir::new().unwrap(); + let log = dir.path().join("events.jsonl"); + std::fs::write( + &log, + "{\"schema_version\":0,\"kind\":\"verdict\",\"status\":\"Confirmed\"}\n", + ) + .unwrap(); + let err = read_events(&log).expect_err("schema 0 must be rejected"); + match err { + TelemetryReadError::SchemaMismatch { expected, found, .. } => { + assert_eq!(expected, SCHEMA_VERSION); + assert_eq!(found, 0); + } + other => panic!("unexpected error: {other:?}"), + } + } + + #[test] + fn read_events_accepts_current_schema() { + let dir = TempDir::new().unwrap(); + let log = dir.path().join("events.jsonl"); + let event = TelemetryEvent::new( + &make_spec(), + VerifyStatus::Confirmed, + None, + "exact", + Duration::from_millis(1), + 1, + ); + let line = serde_json::to_string(&event).unwrap(); + std::fs::write(&log, format!("{line}\n\n")).unwrap(); + let events = read_events(&log).unwrap(); + assert_eq!(events.len(), 1); + assert_eq!(events[0]["kind"], "verdict"); + } + + #[test] + fn read_events_rejects_missing_schema() { + let dir = TempDir::new().unwrap(); + let log = dir.path().join("events.jsonl"); + std::fs::write(&log, "{\"kind\":\"verdict\"}\n").unwrap(); + match read_events(&log).unwrap_err() { + TelemetryReadError::MissingSchemaVersion { .. } => {} + other => panic!("expected MissingSchemaVersion, got {other:?}"), + } + } + + #[test] + fn read_events_rejects_malformed_json() { + let dir = TempDir::new().unwrap(); + let log = dir.path().join("events.jsonl"); + std::fs::write(&log, "{not json\n").unwrap(); + match read_events(&log).unwrap_err() { + TelemetryReadError::Json { .. 
} => {} + other => panic!("expected Json, got {other:?}"), + } + } + + #[test] + fn sampling_policy_keeps_confirmed_and_inconclusive() { + let policy = SamplingPolicy { + keep_all_confirmed: true, + keep_all_inconclusive: true, + sample_rate_other: 0.0, + }; + assert!(policy.should_sample(VerifyStatus::Confirmed, "any")); + assert!(policy.should_sample(VerifyStatus::Inconclusive, "any")); + assert!(!policy.should_sample(VerifyStatus::NotConfirmed, "any")); + assert!(!policy.should_sample(VerifyStatus::Unsupported, "any")); + } + + #[test] + fn sampling_policy_is_deterministic_per_spec_hash() { + let policy = SamplingPolicy { + keep_all_confirmed: true, + keep_all_inconclusive: true, + sample_rate_other: 0.5, + }; + let first = policy.should_sample(VerifyStatus::NotConfirmed, "deadbeef"); + for _ in 0..100 { + assert_eq!( + first, + policy.should_sample(VerifyStatus::NotConfirmed, "deadbeef") + ); + } + } + + #[test] + fn sampling_policy_rate_one_keeps_everything() { + let policy = SamplingPolicy { + keep_all_confirmed: false, + keep_all_inconclusive: false, + sample_rate_other: 1.0, + }; + for hash in &["a", "b", "c", "deadbeef", ""] { + assert!(policy.should_sample(VerifyStatus::NotConfirmed, hash)); + } + } + + #[test] + fn sampling_policy_rate_zero_drops_everything_else() { + let policy = SamplingPolicy { + keep_all_confirmed: true, + keep_all_inconclusive: true, + sample_rate_other: 0.0, + }; + for hash in &["a", "b", "c", "deadbeef"] { + assert!(!policy.should_sample(VerifyStatus::NotConfirmed, hash)); + assert!(!policy.should_sample(VerifyStatus::Unsupported, hash)); + } + } + + #[test] + fn sampling_policy_rate_half_buckets_roughly_evenly() { + let policy = SamplingPolicy { + keep_all_confirmed: true, + keep_all_inconclusive: true, + sample_rate_other: 0.5, + }; + let kept = (0..1000) + .filter(|i| { + let h = format!("hash-{i:06x}"); + policy.should_sample(VerifyStatus::NotConfirmed, &h) + }) + .count(); + // Loose envelope around 500/1000. 
Tight enough to catch a "always + // keep" or "always drop" regression, wide enough to avoid flakes. + assert!( + kept > 350 && kept < 650, + "expected ~500/1000 kept at rate 0.5, got {kept}" + ); + } + + #[test] + fn emit_with_policy_drops_when_unsampled() { + let dir = TempDir::new().unwrap(); + let log = dir.path().join("events.jsonl"); + unsafe { std::env::set_var("NYX_TELEMETRY_PATH", log.to_str().unwrap()) }; + + let mut spec = make_spec(); + spec.spec_hash = "drop-me".into(); + let event = TelemetryEvent::new( + &spec, + VerifyStatus::NotConfirmed, + None, + "exact", + Duration::from_millis(1), + 1, + ); + let policy = SamplingPolicy { + keep_all_confirmed: true, + keep_all_inconclusive: true, + sample_rate_other: 0.0, + }; + emit_with_policy(&event, &policy); + + assert!(!log.exists(), "event must not be written when policy drops"); + + unsafe { std::env::remove_var("NYX_TELEMETRY_PATH") }; + } + + #[test] + fn rank_delta_carries_envelope_fields() { + let event = RankDeltaEvent::new("abc".into(), "Confirmed".into(), 2.5); + assert_eq!(event.schema_version, SCHEMA_VERSION); + assert_eq!(event.nyx_version, NYX_VERSION); + assert_eq!(event.corpus_version, CORPUS_VERSION); + assert_eq!(event.kind, "rank_delta"); + let json = serde_json::to_string(&event).unwrap(); + assert!(json.starts_with("{\"schema_version\":1")); + } } diff --git a/src/dynamic/verify.rs b/src/dynamic/verify.rs index a62c1ca0..4a64d589 100644 --- a/src/dynamic/verify.rs +++ b/src/dynamic/verify.rs @@ -12,7 +12,7 @@ use crate::dynamic::runner::{run_spec, RunError}; use crate::dynamic::sandbox::{toolchain_id_with_digest, SandboxOptions}; use crate::dynamic::spec::{HarnessSpec, SPEC_FORMAT_VERSION}; use crate::dynamic::stubs::StubHarness; -use crate::dynamic::telemetry::{self, TelemetryEvent}; +use crate::dynamic::telemetry::{self, SamplingPolicy, TelemetryEvent}; use crate::dynamic::toolchain; use crate::evidence::{InconclusiveReason, SpecDerivationStrategy, UnsupportedReason}; use 
crate::summary::GlobalSummaries; @@ -62,6 +62,10 @@ pub struct VerifyOptions { /// [`crate::evidence::InconclusiveReason::BackendInsufficient`] /// rather than running against an unhardened host. pub refuse_filesystem_confirm: bool, + /// Phase 27 (Track H.2): sampling policy applied to every telemetry + /// event emitted from the verify pipeline. Default `keep_all` so unit + /// tests and embedded callers do not silently lose records. + pub telemetry_policy: SamplingPolicy, } impl VerifyOptions { @@ -116,6 +120,7 @@ impl VerifyOptions { summaries: None, callgraph: None, refuse_filesystem_confirm, + telemetry_policy: SamplingPolicy::from_config(&config.telemetry), } } } @@ -242,6 +247,7 @@ fn entry_kind_unsupported_verdict( spec_entry_path: &str, lang: crate::symbol::Lang, attempted: crate::dynamic::spec::EntryKind, + policy: &SamplingPolicy, ) -> VerifyResult { let supported = crate::dynamic::lang::entry_kinds_supported(lang).to_vec(); let hint = crate::dynamic::lang::entry_kind_hint(lang, attempted); @@ -263,7 +269,7 @@ fn entry_kind_unsupported_verdict( Some(inconclusive_reason.clone()), ), }; - telemetry::emit(&event); + telemetry::emit_with_policy(&event, policy); VerifyResult { finding_id, status: VerifyStatus::Inconclusive, @@ -290,6 +296,7 @@ fn spec_derivation_failed_verdict( finding_id: String, diag: &Diag, reason: UnsupportedReason, + policy: &SamplingPolicy, ) -> VerifyResult { if matches!(reason, UnsupportedReason::SpecDerivationFailed) && should_be_inconclusive(diag) { let strategies: Vec = @@ -304,7 +311,7 @@ fn spec_derivation_failed_verdict( VerifyStatus::Inconclusive, Some(inconclusive_reason.clone()), ); - telemetry::emit(&event); + telemetry::emit_with_policy(&event, policy); return VerifyResult { finding_id, status: VerifyStatus::Inconclusive, @@ -319,7 +326,7 @@ fn spec_derivation_failed_verdict( } let event = TelemetryEvent::no_spec(diag, VerifyStatus::Unsupported, None); - telemetry::emit(&event); + telemetry::emit_with_policy(&event, 
policy); VerifyResult { finding_id, @@ -388,7 +395,12 @@ pub fn verify_finding(diag: &Diag, opts: &VerifyOptions) -> VerifyResult { ) { Ok(s) => s, Err(reason) => { - return spec_derivation_failed_verdict(finding_id, diag, reason); + return spec_derivation_failed_verdict( + finding_id, + diag, + reason, + &opts.telemetry_policy, + ); } }; @@ -404,6 +416,7 @@ pub fn verify_finding(diag: &Diag, opts: &VerifyOptions) -> VerifyResult { &spec.entry_file, spec.lang, spec.entry_kind, + &opts.telemetry_policy, ); } @@ -574,7 +587,7 @@ pub fn verify_finding(diag: &Diag, opts: &VerifyOptions) -> VerifyResult { elapsed, build_attempts, ); - telemetry::emit(&event); + telemetry::emit_with_policy(&event, &opts.telemetry_policy); verdict } @@ -809,6 +822,7 @@ fn build_verdict( &spec.entry_file, spec.lang, spec.entry_kind, + &opts.telemetry_policy, ); } } diff --git a/src/rank.rs b/src/rank.rs index 37ddccb6..66235f51 100644 --- a/src/rank.rs +++ b/src/rank.rs @@ -222,13 +222,11 @@ pub fn rank_diags(diags: &mut [Diag]) { .and_then(|ev| ev.dynamic_verdict.as_ref()) .map(|dv| format!("{:?}", dv.status)) .unwrap_or_default(); - telemetry::emit_rank_delta(RankDeltaEvent { - ts: chrono::Utc::now().to_rfc3339(), - event_type: "rank_delta", - finding_id: d.finding_id.clone(), + telemetry::emit_rank_delta(RankDeltaEvent::new( + d.finding_id.clone(), status, delta, - }); + )); } } diags.sort_by(|a, b| { diff --git a/src/utils/config.rs b/src/utils/config.rs index 42bea9dc..e88f19a1 100644 --- a/src/utils/config.rs +++ b/src/utils/config.rs @@ -758,6 +758,30 @@ impl Default for ServerConfig { } } +/// Phase 27 — `[telemetry]` section. Controls the on-disk event log +/// sampling policy. Confirmed and Inconclusive verdicts are calibration +/// critical and are retained by default; other verdict statuses can be +/// downsampled via `sample_rate_other` to bound log growth on high-volume +/// scans. 
Decisions are seeded by `spec_hash` for determinism — see +/// [`crate::dynamic::telemetry::SamplingPolicy`]. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +#[serde(default)] +pub struct TelemetryConfig { + pub keep_all_confirmed: bool, + pub keep_all_inconclusive: bool, + pub sample_rate_other: f32, +} + +impl Default for TelemetryConfig { + fn default() -> Self { + Self { + keep_all_confirmed: true, + keep_all_inconclusive: true, + sample_rate_other: 1.0, + } + } +} + /// Configuration for scan run persistence and history. #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(default)] @@ -880,6 +904,10 @@ pub struct Config { pub detectors: crate::utils::detector_options::DetectorOptions, pub server: ServerConfig, pub runs: RunsConfig, + /// Phase 27 — `[telemetry]` section. Sampling policy for the dynamic + /// event log. + #[serde(default)] + pub telemetry: TelemetryConfig, pub profiles: HashMap, /// Detected frameworks for the current project, set by the scan pipeline, /// not persisted to config files. 
@@ -1186,6 +1214,9 @@ pub(crate) fn merge_configs(mut default: Config, user: Config) -> Config { // --- RunsConfig --- default.runs = user.runs; + // --- TelemetryConfig --- + default.telemetry = user.telemetry; + // --- Profiles (user profile with same name fully replaces) --- for (name, profile) in user.profiles { default.profiles.insert(name, profile); diff --git a/tests/dynamic_parity.rs b/tests/dynamic_parity.rs index 7dc62cd7..7bd8db2c 100644 --- a/tests/dynamic_parity.rs +++ b/tests/dynamic_parity.rs @@ -102,12 +102,7 @@ mod parity_tests { timeout: Duration::from_secs(10), ..SandboxOptions::default() }, - project_root: None, - db_path: None, - verify_all_confidence: false, - summaries: None, - callgraph: None, - refuse_filesystem_confirm: false, + ..VerifyOptions::default() } } @@ -118,12 +113,7 @@ mod parity_tests { timeout: Duration::from_secs(30), ..SandboxOptions::default() }, - project_root: None, - db_path: None, - verify_all_confidence: false, - summaries: None, - callgraph: None, - refuse_filesystem_confirm: false, + ..VerifyOptions::default() } } diff --git a/tests/telemetry_schema.rs b/tests/telemetry_schema.rs new file mode 100644 index 00000000..4b0fd027 --- /dev/null +++ b/tests/telemetry_schema.rs @@ -0,0 +1,179 @@ +//! Phase 27 — Track H.1 integration test. +//! +//! Locks in the on-disk telemetry schema contract that `scripts/m7_ship_gate.sh` +//! Gate 2 relies on: +//! +//! - Records produced today carry the `schema_version`, `nyx_version`, and +//! `corpus_version` envelope fields, plus a `kind` discriminator. +//! - `read_events(path)` accepts the current schema. +//! - A hand-crafted record with `schema_version: 0` is rejected by +//! `read_events` with a typed [`TelemetryReadError::SchemaMismatch`] (this +//! is the explicit Phase 27 acceptance bullet). +//! - The sampling policy retains Confirmed and Inconclusive verdicts even at +//! `sample_rate_other = 0.0`. 
+ +#![cfg(feature = "dynamic")] + +use nyx_scanner::dynamic::telemetry::{ + self, RankDeltaEvent, SamplingPolicy, TelemetryEvent, TelemetryReadError, CORPUS_VERSION, + NYX_VERSION, SCHEMA_VERSION, +}; +use nyx_scanner::dynamic::spec::{EntryKind, HarnessSpec, PayloadSlot, SpecDerivationStrategy}; +use nyx_scanner::evidence::VerifyStatus; +use nyx_scanner::labels::Cap; +use nyx_scanner::symbol::Lang; +use std::time::Duration; +use tempfile::TempDir; + +fn make_spec(hash: &str) -> HarnessSpec { + HarnessSpec { + finding_id: "0000000000000001".into(), + entry_file: "handler.py".into(), + entry_name: "handle".into(), + entry_kind: EntryKind::Function, + lang: Lang::Python, + toolchain_id: "python-3.11".into(), + payload_slot: PayloadSlot::Param(0), + expected_cap: Cap::SQL_QUERY, + constraint_hints: vec![], + sink_file: "handler.py".into(), + sink_line: 5, + spec_hash: hash.into(), + derivation: SpecDerivationStrategy::FromFlowSteps, + stubs_required: vec![], + } +} + +#[test] +fn current_record_carries_envelope_fields() { + let event = TelemetryEvent::new( + &make_spec("abcd1234"), + VerifyStatus::Confirmed, + None, + "exact", + Duration::from_millis(7), + 1, + ); + let v: serde_json::Value = serde_json::to_value(&event).unwrap(); + assert_eq!(v["schema_version"], SCHEMA_VERSION); + assert_eq!(v["nyx_version"], NYX_VERSION); + assert_eq!(v["corpus_version"], CORPUS_VERSION); + assert_eq!(v["kind"], "verdict"); + + let rank = RankDeltaEvent::new("a".into(), "Confirmed".into(), 2.0); + let v: serde_json::Value = serde_json::to_value(&rank).unwrap(); + assert_eq!(v["schema_version"], SCHEMA_VERSION); + assert_eq!(v["kind"], "rank_delta"); +} + +#[test] +fn read_events_accepts_current_schema() { + let dir = TempDir::new().unwrap(); + let log = dir.path().join("events.jsonl"); + let mut content = String::new(); + for i in 0..3 { + let event = TelemetryEvent::new( + &make_spec(&format!("hash{i}")), + VerifyStatus::Confirmed, + None, + "exact", + Duration::from_millis(1), + 
1, + ); + content.push_str(&serde_json::to_string(&event).unwrap()); + content.push('\n'); + } + std::fs::write(&log, content).unwrap(); + + let records = telemetry::read_events(&log).unwrap(); + assert_eq!(records.len(), 3); + for r in &records { + assert_eq!(r["schema_version"], SCHEMA_VERSION); + } +} + +#[test] +fn read_events_rejects_schema_zero_record() { + let dir = TempDir::new().unwrap(); + let log = dir.path().join("events.jsonl"); + // Hand-crafted v0 record — exactly the case the Phase 27 acceptance pins. + std::fs::write( + &log, + "{\"schema_version\":0,\"kind\":\"verdict\",\"status\":\"Confirmed\"}\n", + ) + .unwrap(); + + let err = telemetry::read_events(&log).expect_err("schema 0 must be rejected"); + match err { + TelemetryReadError::SchemaMismatch { + expected, found, .. + } => { + assert_eq!(expected, SCHEMA_VERSION); + assert_eq!(found, 0); + } + other => panic!("expected SchemaMismatch, got {other:?}"), + } +} + +#[test] +fn read_events_rejects_mixed_schema_record_inside_valid_log() { + let dir = TempDir::new().unwrap(); + let log = dir.path().join("events.jsonl"); + let good = serde_json::to_string(&TelemetryEvent::new( + &make_spec("good"), + VerifyStatus::Confirmed, + None, + "exact", + Duration::from_millis(1), + 1, + )) + .unwrap(); + let bad = "{\"schema_version\":0,\"kind\":\"verdict\"}"; + std::fs::write(&log, format!("{good}\n{bad}\n")).unwrap(); + + match telemetry::read_events(&log).unwrap_err() { + TelemetryReadError::SchemaMismatch { line, found, .. 
} => { + assert_eq!(line, 2); + assert_eq!(found, 0); + } + other => panic!("expected SchemaMismatch on line 2, got {other:?}"), + } +} + +#[test] +fn sampling_policy_retains_confirmed_and_inconclusive() { + let strict = SamplingPolicy { + keep_all_confirmed: true, + keep_all_inconclusive: true, + sample_rate_other: 0.0, + }; + for hash in ["a", "b", "spec-1234", "deadbeef"] { + assert!(strict.should_sample(VerifyStatus::Confirmed, hash)); + assert!(strict.should_sample(VerifyStatus::Inconclusive, hash)); + assert!(!strict.should_sample(VerifyStatus::NotConfirmed, hash)); + assert!(!strict.should_sample(VerifyStatus::Unsupported, hash)); + } +} + +#[test] +fn sampling_policy_is_deterministic_across_runs() { + let policy = SamplingPolicy { + keep_all_confirmed: false, + keep_all_inconclusive: false, + sample_rate_other: 0.5, + }; + let mut snapshot: Vec<(String, bool)> = Vec::new(); + for i in 0..50 { + let hash = format!("spec-{i:08x}"); + let kept = policy.should_sample(VerifyStatus::NotConfirmed, &hash); + snapshot.push((hash, kept)); + } + // Re-evaluate; every decision must match the first pass. + for (hash, expected) in &snapshot { + assert_eq!( + *expected, + policy.should_sample(VerifyStatus::NotConfirmed, hash), + "sampling decision flipped for spec_hash={hash}" + ); + } +}