From 3d710c856dbacb8f0bf780eb881b6669d7509dfd Mon Sep 17 00:00:00 2001 From: elipeter Date: Thu, 28 May 2026 12:16:10 -0500 Subject: [PATCH] refactor(dynamic): integrate worker timeout handling, JSON response parsing with serde, and extend Pubsub emulator with streaming pull lifecycle handling --- scripts/m7_ship_gate.sh | 94 ++++-- src/dynamic/build_pool/java.rs | 160 ++++----- src/dynamic/stubs/broker.rs | 306 ++++++++++++++++-- tests/eval_corpus/report.py | 38 +++ tests/eval_corpus/test_tabulate_regression.py | 41 +++ 5 files changed, 506 insertions(+), 133 deletions(-) diff --git a/scripts/m7_ship_gate.sh b/scripts/m7_ship_gate.sh index ea0248b5..132a4ec3 100755 --- a/scripts/m7_ship_gate.sh +++ b/scripts/m7_ship_gate.sh @@ -71,8 +71,8 @@ gate_1_static_corpus() { return 0 fi cargo run --release --quiet -- scan \ - --path "${REPO_ROOT}/tests/benchmark/corpus" \ - --format json > /tmp/m7_gate1.json + --format json \ + "${REPO_ROOT}/tests/benchmark/corpus" > /tmp/m7_gate1.json echo " PASS: static scan completed" } @@ -120,10 +120,11 @@ gate_3_verify_ratio() { # $2 = 0 for static-only, 1 for --verify time_scan() { local path="$1" verify="$2" - local args=("--path" "${path}" "--format" "json") + local args=("--format" "json") if [[ "${verify}" == "1" ]]; then args+=("--verify") fi + args+=("${path}") local start end start="$(python3 -c 'import time;print(time.monotonic())')" cargo run --release --quiet --features dynamic -- scan "${args[@]}" > /dev/null @@ -164,31 +165,84 @@ gate_6_owasp_scale() { return 0 fi - local report="/tmp/m7_gate6_report.json" - local start end wallclock - start="$(python3 -c 'import time;print(time.monotonic())')" - cargo run --release --quiet --features dynamic -- scan \ - --path "${corpus}" \ + local scan_report="/tmp/m7_gate6_scan.json" + local results_report="/tmp/m7_gate6_results.json" + local wallclock_report="/tmp/m7_gate6_wallclock.txt" + local gate_home="${TMPDIR:-/tmp}/nyx_m7_gate6_home" + local gate_build_pool="${TMPDIR:-/tmp}/nyx_m7_gate6_build_pool" + local wallclock + + cargo build --release --quiet --features dynamic + mkdir -p "${gate_home}" "${gate_build_pool}" + rm -f "${scan_report}" "${results_report}" "${wallclock_report}" + + set +e + HOME="${gate_home}" \ + NYX_BUILD_POOL_DIR="${gate_build_pool}" \ + python3 - "${GATE6_WALLCLOCK_BUDGET}" "${scan_report}" "${wallclock_report}" \ + "${REPO_ROOT}/target/release/nyx" scan \ --verify \ - --format json > "${report}" - end="$(python3 -c 'import time;print(time.monotonic())')" - wallclock="$(awk -v a="${start}" -v b="${end}" 'BEGIN { printf "%.1f", b - a }')" + --index off \ + --format json \ + --quiet \ + "${corpus}" <<'PY' +import subprocess +import sys +import time + +budget = float(sys.argv[1]) +scan_report = sys.argv[2] +wallclock_report = sys.argv[3] +cmd = sys.argv[4:] +start = time.monotonic() +rc = 0 +try: + with open(scan_report, "wb") as out: + completed = subprocess.run(cmd, stdout=out, timeout=budget) + rc = completed.returncode +except subprocess.TimeoutExpired: + rc = 124 +finally: + elapsed = time.monotonic() - start + with open(wallclock_report, "w") as f: + f.write(f"{elapsed:.1f}\n") +sys.exit(rc) +PY + local nyx_exit=$? + set -e + wallclock="$(cat "${wallclock_report}" 2>/dev/null || printf "%s" "${GATE6_WALLCLOCK_BUDGET}")" echo " OWASP verify wall-clock: ${wallclock}s (budget ${GATE6_WALLCLOCK_BUDGET}s)" + if [[ ${nyx_exit} -eq 124 ]]; then + echo " FAIL: nyx scan exceeded wall-clock budget" + return 1 + fi + if [[ ${nyx_exit} -ne 0 && ${nyx_exit} -ne 1 ]]; then + echo " FAIL: nyx scan exited ${nyx_exit}" + return 1 + fi + if [[ ! -s "${scan_report}" ]]; then + echo " FAIL: nyx scan produced no JSON report" + return 1 + fi + awk -v w="${wallclock}" -v b="${GATE6_WALLCLOCK_BUDGET}" \ 'BEGIN { if (w+0 > b+0) exit 1 }' \ || { echo " FAIL: wall-clock exceeds budget"; return 1; } - if [[ -x "${REPO_ROOT}/tests/eval_corpus/report.py" ]]; then - # Per-cap confirmed-rate report; the helper exits non-zero if - # any cap falls below the target. - NYX_CONFIRMED_RATE_TARGET="${GATE6_CONFIRMED_RATE_TARGET}" \ - python3 "${REPO_ROOT}/tests/eval_corpus/report.py" "${report}" \ - || { echo " FAIL: confirmed-rate below ${GATE6_CONFIRMED_RATE_TARGET}"; return 1; } - else - echo " NOTE: tests/eval_corpus/report.py not present; skipping per-cap check" - fi + echo "[]" > "${results_report}" + python3 "${REPO_ROOT}/tests/eval_corpus/tabulate.py" \ + --label owasp \ + --scan "${scan_report}" \ + --ground-truth "${REPO_ROOT}/tests/eval_corpus/ground_truth/owasp_benchmark_v1.2.json" \ + --append "${results_report}" \ + || { echo " FAIL: OWASP result tabulation failed"; return 1; } + + python3 "${REPO_ROOT}/tests/eval_corpus/report.py" \ + --results "${results_report}" \ + --min-confirmed-rate "${GATE6_CONFIRMED_RATE_TARGET}" \ + || { echo " FAIL: confirmed-rate below ${GATE6_CONFIRMED_RATE_TARGET}"; return 1; } echo " PASS" } diff --git a/src/dynamic/build_pool/java.rs b/src/dynamic/build_pool/java.rs index d4565977..baba5e4c 100644 --- a/src/dynamic/build_pool/java.rs +++ b/src/dynamic/build_pool/java.rs @@ -39,16 +39,20 @@ //! invokes so integration tests can swap in a wrapper. use super::{BuildPool, PoolCompileResult}; +use serde::Deserialize; use std::io::{BufRead, BufReader, Write}; use std::path::{Path, PathBuf}; use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio}; -use std::sync::Mutex; +use std::sync::{Mutex, mpsc}; +use std::thread; use std::time::{Duration, Instant}; /// Java source compiled at first use to drive the worker. const WORKER_SOURCE: &str = include_str!("java_worker/NyxJavacWorker.java"); const WORKER_CLASS: &str = "NyxJavacWorker"; const WORKER_FILENAME: &str = "NyxJavacWorker.java"; +const WORKER_READY_TIMEOUT: Duration = Duration::from_secs(10); +const COMPILE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(60); /// Live worker handle. Held inside a `Mutex` so concurrent /// `compile_batch` callers serialise on the single JVM. @@ -134,9 +138,13 @@ impl JavacPool { }; } - let mut line = String::new(); - match worker.stdout.read_line(&mut line) { - Ok(0) => { + match read_line_with_timeout( + &mut worker.child, + &mut worker.stdout, + COMPILE_RESPONSE_TIMEOUT, + "read response", + ) { + Ok(None) => { *guard = None; PoolCompileResult { success: false, @@ -148,11 +156,11 @@ impl JavacPool { *guard = None; PoolCompileResult { success: false, - stderr: format!("javac-pool: read failed: {e}"), + stderr: e, duration: start.elapsed(), } } - Ok(_) => match parse_response(&line) { + Ok(Some(line)) => match parse_response(&line) { Some((success, stderr)) => PoolCompileResult { success, stderr, @@ -285,15 +293,22 @@ fn spawn_worker(dir: &Path) -> Result { .ok_or_else(|| "javac-pool: missing stdout".to_owned())?; let mut stdout = BufReader::new(stdout); - // Read the banner line with a timeout via a polling read. We - // can't use `read_line` with a deadline directly, so spawn a - // bounded waiter: if the worker doesn't announce readiness inside - // 10s we declare bootstrap failure. - let banner = read_line_with_timeout(&mut stdout, Duration::from_secs(10))?; + let banner = + match read_line_with_timeout(&mut child, &mut stdout, WORKER_READY_TIMEOUT, "read banner")? + { + Some(line) => line, + None => { + let _ = child.kill(); + let stderr_tail = drain_stderr(&mut child); + return Err(format!( + "javac-pool: worker closed stdout before readiness; stderr: {stderr_tail}", + )); + } + }; if !banner.contains("\"ready\":true") { // Drain stderr for diagnostic context, then bail. - let stderr_tail = drain_stderr(&mut child); let _ = child.kill(); + let stderr_tail = drain_stderr(&mut child); return Err(format!( "javac-pool: worker did not announce readiness; got {banner:?}; stderr: {stderr_tail}", )); @@ -318,24 +333,31 @@ fn drain_stderr(child: &mut Child) -> String { } fn read_line_with_timeout( + child: &mut Child, stdout: &mut BufReader, timeout: Duration, -) -> Result { - // BufReader doesn't expose async/timeout primitives. The worker's - // first line lands within < 2s on every machine we ship to, so a - // synchronous read_line is fine -- the timeout is enforced by an - // outer watchdog thread that interrupts us via stdin close on - // failure. In practice if `java` blocks indefinitely the test - // suite catches the regression. - // - // We keep the API plumbed so the deadline can be tightened later - // without churning call sites. - let _ = timeout; - let mut line = String::new(); - stdout - .read_line(&mut line) - .map_err(|e| format!("javac-pool: read banner: {e}"))?; - Ok(line) + context: &str, +) -> Result, String> { + let (tx, rx) = mpsc::channel(); + thread::scope(|scope| { + scope.spawn(move || { + let mut line = String::new(); + let result = stdout.read_line(&mut line).map(|n| (n, line)); + let _ = tx.send(result); + }); + match rx.recv_timeout(timeout) { + Ok(Ok((0, _))) => Ok(None), + Ok(Ok((_n, line))) => Ok(Some(line)), + Ok(Err(e)) => Err(format!("javac-pool: {context} failed: {e}")), + Err(mpsc::RecvTimeoutError::Timeout) => { + let _ = child.kill(); + Err(format!("javac-pool: {context} timed out after {timeout:?}")) + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + Err(format!("javac-pool: {context} reader disconnected")) + } + } + }) } fn build_request(id: u64, workdir: &Path, args: &[String]) -> String { @@ -372,61 +394,18 @@ fn append_json_string(out: &mut String, s: &str) { } /// Extract `(success, stderr)` from a worker JSON response line. -/// -/// The wire shape is tightly constrained -- the worker only ever emits -/// `{"id":"N","success":TRUE|FALSE,"stderr_b64":"…"}`, so we use a -/// targeted decoder rather than pulling in `serde_json` and inflating -/// the dynamic feature footprint. Anything off-shape returns `None` -/// and the caller flags the worker unhealthy. fn parse_response(line: &str) -> Option<(bool, String)> { - let success = extract_bool_field(line, "success")?; - let b64 = extract_string_field(line, "stderr_b64").unwrap_or_default(); - let stderr = decode_b64(&b64).unwrap_or_else(|| "".to_owned()); - Some((success, stderr)) + let response: JavacWorkerResponse = serde_json::from_str(line).ok()?; + let stderr = + decode_b64(&response.stderr_b64).unwrap_or_else(|| "".to_owned()); + Some((response.success, stderr)) } -fn extract_bool_field(s: &str, name: &str) -> Option { - let needle = format!("\"{name}\":"); - let i = s.find(&needle)? + needle.len(); - let rest = s[i..].trim_start(); - if rest.starts_with("true") { - Some(true) - } else if rest.starts_with("false") { - Some(false) - } else { - None - } -} - -fn extract_string_field(s: &str, name: &str) -> Option { - let needle = format!("\"{name}\":\""); - let i = s.find(&needle)? + needle.len(); - let tail = &s[i..]; - let mut out = String::new(); - let mut chars = tail.chars(); - while let Some(c) = chars.next() { - match c { - '"' => return Some(out), - '\\' => match chars.next()? { - '"' => out.push('"'), - '\\' => out.push('\\'), - '/' => out.push('/'), - 'b' => out.push('\u{08}'), - 'f' => out.push('\u{0c}'), - 'n' => out.push('\n'), - 'r' => out.push('\r'), - 't' => out.push('\t'), - 'u' => { - let hex: String = (&mut chars).take(4).collect(); - let cp = u32::from_str_radix(&hex, 16).ok()?; - out.push(char::from_u32(cp)?); - } - _ => return None, - }, - c => out.push(c), - } - } - None +#[derive(Debug, Deserialize)] +struct JavacWorkerResponse { + success: bool, + #[serde(default)] + stderr_b64: String, } /// Tiny RFC 4648 base64 decoder. Used only for the worker's @@ -517,6 +496,14 @@ mod tests { assert!(parse_response("{\"id\":\"0\",\"stderr_b64\":\"\"}").is_none()); } + #[test] + fn parse_response_accepts_reordered_fields() { + let (ok, err) = + parse_response("{\"stderr_b64\":\"YQ==\",\"success\":true,\"id\":\"7\"}\n").unwrap(); + assert!(ok); + assert_eq!(err, "a"); + } + #[test] fn b64_decode_roundtrip() { for (raw, encoded) in &[ @@ -529,17 +516,4 @@ mod tests { assert_eq!(decode_b64(encoded).as_deref(), Some(*raw)); } } - - #[test] - fn extract_string_handles_escapes() { - let s = r#"{"id":"0","stderr_b64":"abc","note":"a\"b\\c"}"#; - assert_eq!(extract_string_field(s, "note").as_deref(), Some(r#"a"b\c"#)); - } - - #[test] - fn extract_bool_picks_first_match() { - let s = r#"{"success":false,"other":true}"#; - assert_eq!(extract_bool_field(s, "success"), Some(false)); - assert_eq!(extract_bool_field(s, "other"), Some(true)); - } } diff --git a/src/dynamic/stubs/broker.rs b/src/dynamic/stubs/broker.rs index bf9c6539..eba0b783 100644 --- a/src/dynamic/stubs/broker.rs +++ b/src/dynamic/stubs/broker.rs @@ -21,8 +21,8 @@ //! speaks enough of the binary protocol for metadata, produce, assigned //! partition fetch/list-offsets, and basic consumer-group compatibility. //! Pub/Sub exposes a plaintext h2/gRPC emulator for create-topic, -//! create-subscription, publish, pull, acknowledge, and the single -//! response shape used by streaming pull clients. +//! create-subscription, publish, pull, acknowledge, and the streaming +//! pull lifecycle used by the Go client. use super::{StubEvent, StubKind, StubProvider, monotonic_ns}; use prost::Message; @@ -1567,15 +1567,35 @@ async fn pubsub_grpc_read_all(mut body: h2::RecvStream) -> Vec { out } -async fn pubsub_grpc_read_one_message(mut body: h2::RecvStream) -> Option> { - let mut out = Vec::new(); - while out.len() < 5 || out.len() < 5 + u32::from_be_bytes(out[1..5].try_into().ok()?) as usize { +async fn pubsub_grpc_read_next_message( + body: &mut h2::RecvStream, + buffer: &mut Vec, +) -> Option> { + loop { + if let Some(payload) = pubsub_grpc_take_message(buffer) { + return Some(payload); + } let bytes = body.data().await?.ok()?; let len = bytes.len(); - out.extend_from_slice(&bytes); + buffer.extend_from_slice(&bytes); let _ = body.flow_control().release_capacity(len); } - pubsub_grpc_unframe(&out) +} + +fn pubsub_grpc_take_message(buffer: &mut Vec) -> Option> { + if buffer.len() < 5 { + return None; + } + if buffer[0] != 0 { + buffer.clear(); + return None; + } + let len = u32::from_be_bytes(buffer[1..5].try_into().ok()?) as usize; + if buffer.len() < 5 + len { + return None; + } + let frame: Vec = buffer.drain(..5 + len).collect(); + pubsub_grpc_unframe(&frame) } async fn handle_pubsub_unary( @@ -1633,29 +1653,130 @@ async fn handle_pubsub_unary( } async fn handle_pubsub_streaming_pull( - body: h2::RecvStream, - respond: h2::server::SendResponse, + mut body: h2::RecvStream, + mut respond: h2::server::SendResponse, state: Arc>, log_path: &Path, ) { - let Some(payload) = pubsub_grpc_read_one_message(body).await else { + let mut read_buffer = Vec::new(); + let Some(payload) = pubsub_grpc_read_next_message(&mut body, &mut read_buffer).await else { pubsub_send_grpc_status(respond, 3, "missing request").await; return; }; - let req = PubsubStreamingPullRequest::decode(payload.as_slice()).unwrap_or_default(); - if !req.ack_ids.is_empty() { - pubsub_ack(&state, log_path, &req.subscription, &req.ack_ids); - } - let max_messages = if req.max_outstanding_messages > 0 { - req.max_outstanding_messages.min(i64::from(i32::MAX)) as i32 - } else { - 1 + let response = http::Response::builder() + .status(200) + .header("content-type", "application/grpc") + .body(()) + .unwrap(); + let Ok(mut send) = respond.send_response(response, false) else { + return; }; - let pull = pubsub_pull(&state, log_path, &req.subscription, max_messages); + + let mut subscription = String::new(); + let mut max_messages = 1_i32; + let req = PubsubStreamingPullRequest::decode(payload.as_slice()).unwrap_or_default(); + pubsub_apply_streaming_pull_request( + &state, + log_path, + &req, + &mut subscription, + &mut max_messages, + ); + if !pubsub_send_available_streaming_messages( + &state, + log_path, + &subscription, + max_messages, + &mut send, + ) { + return; + } + + loop { + tokio::select! { + payload = pubsub_grpc_read_next_message(&mut body, &mut read_buffer) => { + let Some(payload) = payload else { break }; + let req = PubsubStreamingPullRequest::decode(payload.as_slice()).unwrap_or_default(); + pubsub_apply_streaming_pull_request( + &state, + log_path, + &req, + &mut subscription, + &mut max_messages, + ); + if !pubsub_send_available_streaming_messages( + &state, + log_path, + &subscription, + max_messages, + &mut send, + ) { + return; + } + } + _ = tokio::time::sleep(Duration::from_millis(25)), if !subscription.is_empty() => { + if !pubsub_send_available_streaming_messages( + &state, + log_path, + &subscription, + max_messages, + &mut send, + ) { + return; + } + } + _ = tokio::time::sleep(Duration::from_secs(5)) => { + break; + } + } + } + + let mut trailers = http::HeaderMap::new(); + trailers.insert("grpc-status", http::HeaderValue::from_static("0")); + let _ = send.send_trailers(trailers); +} + +fn pubsub_apply_streaming_pull_request( + state: &Arc>, + log_path: &Path, + req: &PubsubStreamingPullRequest, + subscription: &mut String, + max_messages: &mut i32, +) { + if !req.subscription.is_empty() { + *subscription = req.subscription.clone(); + } + if req.max_outstanding_messages > 0 { + *max_messages = req.max_outstanding_messages.min(i64::from(i32::MAX)) as i32; + } + if !req.ack_ids.is_empty() && !subscription.is_empty() { + pubsub_ack(state, log_path, subscription, &req.ack_ids); + } +} + +fn pubsub_send_available_streaming_messages( + state: &Arc>, + log_path: &Path, + subscription: &str, + max_messages: i32, + send: &mut h2::SendStream, +) -> bool { + if subscription.is_empty() { + return true; + } + let pull = pubsub_pull(state, log_path, subscription, max_messages); + if pull.received_messages.is_empty() { + return true; + } let response = PubsubStreamingPullResponse { received_messages: pull.received_messages, }; - pubsub_send_grpc_message(respond, &response).await; + let mut payload = Vec::new(); + if response.encode(&mut payload).is_err() { + return false; + } + send.send_data(bytes::Bytes::from(pubsub_grpc_frame(&payload)), false) + .is_ok() } fn pubsub_publish( @@ -3562,6 +3683,70 @@ mod tests { assert_eq!(events[2].detail.get("payload").unwrap(), &received.ack_id); } + #[test] + fn pubsub_grpc_streaming_pull_records_deliver_and_ack() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Pubsub, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == "loopback://pubsub" { + return; + } + let port: u16 = endpoint + .trim_start_matches("pubsub://127.0.0.1:") + .parse() + .unwrap(); + let topic = "projects/nyx/topics/stream-orders"; + let subscription = "projects/nyx/subscriptions/stream-orders-sub"; + + let _: PubsubTopic = pubsub_grpc_unary( + port, + "/google.pubsub.v1.Publisher/CreateTopic", + &PubsubTopic { + name: topic.to_owned(), + }, + ); + let _: PubsubSubscription = pubsub_grpc_unary( + port, + "/google.pubsub.v1.Subscriber/CreateSubscription", + &PubsubSubscription { + name: subscription.to_owned(), + topic: topic.to_owned(), + ack_deadline_seconds: 10, + }, + ); + let _: PubsubPublishResponse = pubsub_grpc_unary( + port, + "/google.pubsub.v1.Publisher/Publish", + &PubsubPublishRequest { + topic: topic.to_owned(), + messages: vec![PubsubMessage { + data: b"NYX\tSTREAM".to_vec(), + message_id: String::new(), + ordering_key: String::new(), + }], + }, + ); + + let pulled = pubsub_grpc_streaming_pull_once(port, subscription); + assert_eq!(pulled.received_messages.len(), 1); + let received = &pulled.received_messages[0]; + assert_eq!( + received.message.as_ref().unwrap().data, + b"NYX\tSTREAM".to_vec() + ); + + let events = stub.drain_events(); + let actions: Vec<&str> = events + .iter() + .map(|ev| ev.detail.get("action").unwrap().as_str()) + .collect(); + assert_eq!(actions, vec!["publish", "deliver", "ack"]); + assert_eq!(events[0].detail.get("destination").unwrap(), topic); + assert_eq!(events[1].detail.get("destination").unwrap(), subscription); + assert_eq!(events[1].detail.get("payload").unwrap(), "NYX\tSTREAM"); + assert_eq!(events[2].detail.get("payload").unwrap(), &received.ack_id); + } + #[test] fn rabbit_amqp_protocol_server_records_publish_deliver_ack() { let dir = TempDir::new().unwrap(); @@ -4083,6 +4268,87 @@ mod tests { R::decode(response.as_slice()).unwrap() } + fn pubsub_grpc_streaming_pull_once( + port: u16, + subscription: &str, + ) -> PubsubStreamingPullResponse { + let subscription = subscription.to_owned(); + tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap() + .block_on(async move { + let stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{port}")) + .await + .unwrap(); + let (mut client, connection) = h2::client::handshake(stream).await.unwrap(); + tokio::spawn(async move { + let _ = connection.await; + }); + let request = http::Request::builder() + .method("POST") + .uri("/google.pubsub.v1.Subscriber/StreamingPull") + .header("content-type", "application/grpc") + .body(()) + .unwrap(); + let (response, mut send_stream) = client.send_request(request, false).unwrap(); + let init = PubsubStreamingPullRequest { + subscription: subscription.clone(), + ack_ids: Vec::new(), + stream_ack_deadline_seconds: 10, + client_id: "nyx-test".to_owned(), + max_outstanding_messages: 1, + max_outstanding_bytes: 1024 * 1024, + }; + let mut init_payload = Vec::new(); + init.encode(&mut init_payload).unwrap(); + send_stream + .send_data(bytes::Bytes::from(pubsub_grpc_frame(&init_payload)), false) + .unwrap(); + + let response = response.await.unwrap(); + assert_eq!(response.status(), 200); + let mut body = response.into_body(); + let mut response_buffer = Vec::new(); + let payload = tokio::time::timeout( + Duration::from_secs(2), + pubsub_grpc_read_next_message(&mut body, &mut response_buffer), + ) + .await + .expect("streaming pull response timed out") + .expect("streaming pull response closed"); + let pulled = PubsubStreamingPullResponse::decode(payload.as_slice()).unwrap(); + + let ack = PubsubStreamingPullRequest { + subscription, + ack_ids: pulled + .received_messages + .iter() + .map(|message| message.ack_id.clone()) + .collect(), + stream_ack_deadline_seconds: 10, + client_id: "nyx-test".to_owned(), + max_outstanding_messages: 1, + max_outstanding_bytes: 1024 * 1024, + }; + let mut ack_payload = Vec::new(); + ack.encode(&mut ack_payload).unwrap(); + send_stream + .send_data(bytes::Bytes::from(pubsub_grpc_frame(&ack_payload)), true) + .unwrap(); + let _ = tokio::time::timeout(Duration::from_secs(2), async { + while let Some(chunk) = body.data().await { + if let Ok(bytes) = chunk { + let _ = body.flow_control().release_capacity(bytes.len()); + } + } + }) + .await; + pulled + }) + } + fn http_post(port: u16, path: &str, body: &str) -> String { let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap(); let req = format!( diff --git a/tests/eval_corpus/report.py b/tests/eval_corpus/report.py index b940c83f..d674ed50 100644 --- a/tests/eval_corpus/report.py +++ b/tests/eval_corpus/report.py @@ -113,6 +113,15 @@ def main() -> int: default="", help="path to a previous results.json; fail on monotonic-improvement regression", ) + p.add_argument( + "--min-confirmed-rate", + type=float, + default=None, + help=( + "minimum Confirmed / total rate per cap; exits 2 when any cap " + "with findings falls below the threshold" + ), + ) args = p.parse_args() with open(args.results) as f: @@ -229,6 +238,35 @@ def main() -> int: else: print(" All gate thresholds met.") + # ── Optional confirmed-rate floor ──────────────────────────────────── + if args.min_confirmed_rate is not None: + print( + f"\n=== Confirmed-rate floor ({args.min_confirmed_rate*100:.1f}%) ===" + ) + cap_totals: dict[str, dict] = defaultdict(lambda: {"confirmed": 0, "total": 0}) + for (cap, _lang), v in agg.items(): + cap_totals[cap]["confirmed"] += v.get("confirmed", 0) + cap_totals[cap]["total"] += v.get("total", 0) + confirmed_fails: list[str] = [] + for cap, v in sorted(cap_totals.items()): + if v["total"] <= 0: + continue + rate = v["confirmed"] / v["total"] + line = ( + f" {cap:<20} {v['confirmed']:>5}/{v['total']:<5} " + f"{rate*100:>6.1f}%" + ) + if rate < args.min_confirmed_rate: + confirmed_fails.append(f"{line} FAIL") + else: + print(f"{line} OK") + if confirmed_fails: + for line in confirmed_fails: + print(line) + gate_failed = True + else: + print(" All confirmed-rate floors met.") + # ── Phase 29: monotonic-improvement diff ───────────────────────────── if args.diff: prev = load_previous_agg(args.diff) diff --git a/tests/eval_corpus/test_tabulate_regression.py b/tests/eval_corpus/test_tabulate_regression.py index 53d5541d..0f0f86e3 100644 --- a/tests/eval_corpus/test_tabulate_regression.py +++ b/tests/eval_corpus/test_tabulate_regression.py @@ -25,6 +25,7 @@ from pathlib import Path REPO = Path(__file__).resolve().parents[2] TABULATE = REPO / "tests/eval_corpus/tabulate.py" +REPORT = REPO / "tests/eval_corpus/report.py" BUDGET = REPO / "tests/eval_corpus/budget.toml" @@ -33,6 +34,11 @@ def run_tabulate(*args: str) -> subprocess.CompletedProcess: return subprocess.run(cmd, capture_output=True, text=True) +def run_report(*args: str) -> subprocess.CompletedProcess: + cmd = [sys.executable, str(REPORT), *args] + return subprocess.run(cmd, capture_output=True, text=True) + + def write_json(path: Path, data: object) -> None: path.write_text(json.dumps(data, indent=2)) @@ -307,6 +313,40 @@ def test_budget_malformed_exits_3(tmp: Path) -> None: ) +def test_report_confirmed_rate_floor(tmp: Path) -> None: + results = tmp / "results.json" + write_json( + results, + [ + { + "label": "owasp", + "total_findings": 5, + "cells": [ + { + "cap": "sqli", + "lang": "java", + "tp": 0, + "fp": 0, + "fn": 0, + "unsupported": 0, + "confirmed": 2, + "wrong_confirmed": 0, + "stable_replays": 0, + "total": 5, + } + ], + } + ], + ) + proc = run_report("--results", str(results), "--min-confirmed-rate", "0.40") + assert proc.returncode == 0, proc.stdout + proc.stderr + assert "All confirmed-rate floors met" in proc.stdout, proc.stdout + + proc = run_report("--results", str(results), "--min-confirmed-rate", "0.50") + assert proc.returncode == 2, proc.stdout + proc.stderr + assert "FAIL" in proc.stdout and "sqli" in proc.stdout, proc.stdout + + def main() -> int: with tempfile.TemporaryDirectory() as td: tmp = Path(td) @@ -318,6 +358,7 @@ def main() -> int: test_manual_triage_stamps_wrong_confirmed, test_manual_triage_ignores_vuln_true_entries, test_budget_malformed_exits_3, + test_report_confirmed_rate_floor, ): sub = tmp / fn.__name__ sub.mkdir()