[pitboss/grind] deferred session-0014 (20260516T052512Z-20f8)

This commit is contained in:
pitboss 2026-05-16 08:30:39 -05:00
parent a2cc5f7700
commit 6a169f51b8
23 changed files with 737 additions and 29 deletions

View file

@ -10,19 +10,41 @@
//!
//! Endpoint: `http://127.0.0.1:{port}`.
//!
//! # Side-channel recording
//!
//! In addition to the on-the-wire listener, [`HttpStub`] publishes a
//! companion log path under the [`HTTP_STUB_LOG_ENV_VAR`] env var
//! (`NYX_HTTP_LOG`). A per-language shim helper
//! (`__nyx_stub_http_record`) appends one record per attempted outbound
//! HTTP call to that file, in the same hash-prefixed detail-then-query
//! format the SQL stub uses. The host merges those records into
//! [`StubProvider::drain_events`] alongside the on-the-wire captures, so
//! a harness whose outbound call never reaches the listener (DNS-mocked,
//! network-isolated sandbox, pre-flight check) still produces an
//! event the oracle can match.
//!
//! # Drop
//!
//! Signals the accept thread to shut down and connects to itself to
//! wake the blocking `accept()`. The thread joins on its next loop
//! iteration; the listener socket is released by the OS.
//! iteration; the listener socket is released by the OS. The
//! recording log lives under the workdir-rooted tempdir which is
//! cleaned up by the verifier's tempdir handle.
use super::{monotonic_ns, StubEvent, StubKind, StubProvider};
use std::collections::BTreeMap;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tempfile::TempDir;
/// Companion env var that publishes [`HttpStub::log_path`] so a
/// language-side shim can append outbound HTTP attempts the host will
/// pick up on [`HttpStub::drain_events`].
pub const HTTP_STUB_LOG_ENV_VAR: &str = "NYX_HTTP_LOG";
/// Localhost HTTP request recorder.
#[derive(Debug)]
@ -30,11 +52,22 @@ pub struct HttpStub {
port: u16,
events: Arc<Mutex<Vec<StubEvent>>>,
shutdown: Arc<AtomicBool>,
/// Tempdir holding the side-channel recording log. Drop releases
/// the file along with the directory.
tempdir: Option<TempDir>,
/// Path to the side-channel recording log.
log_path: PathBuf,
/// Read cursor on the log file so `drain_events` only surfaces
/// records appended since the last drain.
log_cursor: Mutex<u64>,
}
impl HttpStub {
/// Bind to a random loopback port and start the accept thread.
pub fn start() -> std::io::Result<Self> {
/// Bind to a random loopback port, start the accept thread, and
/// prepare a side-channel recording log under `workdir`. Falls
/// back to the process-wide temp directory when `workdir` is not
/// writable.
pub fn start(workdir: &Path) -> std::io::Result<Self> {
let listener = TcpListener::bind("127.0.0.1:0")?;
listener.set_nonblocking(false)?;
let port = listener.local_addr()?.port();
@ -46,7 +79,18 @@ impl HttpStub {
let shutdown_clone = Arc::clone(&shutdown);
std::thread::spawn(move || accept_loop(listener, events_clone, shutdown_clone));
Ok(Self { port, events, shutdown })
let tempdir = TempDir::new_in(workdir).or_else(|_| TempDir::new())?;
let log_path = tempdir.path().join("nyx_http_stub.requests.log");
std::fs::File::create(&log_path)?;
Ok(Self {
port,
events,
shutdown,
tempdir: Some(tempdir),
log_path,
log_cursor: Mutex::new(0),
})
}
/// Port the listener is bound to. Useful for tests that need to
@ -55,6 +99,13 @@ impl HttpStub {
self.port
}
/// Absolute path of the side-channel recording log. The
/// `__nyx_stub_http_record` shim helpers append outbound HTTP
/// attempts here; the stub reads new records on drain.
pub fn log_path(&self) -> &Path {
&self.log_path
}
/// Host-side helper to record a request as if it arrived on the
/// wire. The Phase 10 integration test uses this to bypass the
/// `connect → write → parse` path so the test runs without a real
@ -65,6 +116,60 @@ impl HttpStub {
g.push(ev);
}
}
/// Drain the side-channel log file, returning every record
/// appended since the previous call. Format mirrors the SQL stub
/// log: `# key: value` lines stitch onto the next non-comment line
/// (which becomes the event summary).
fn drain_log_file(&self) -> Vec<StubEvent> {
let mut cursor = match self.log_cursor.lock() {
Ok(g) => g,
Err(_) => return Vec::new(),
};
let file = match std::fs::File::open(&self.log_path) {
Ok(f) => f,
Err(_) => return Vec::new(),
};
use std::io::Seek;
let mut reader = BufReader::new(file);
if reader.seek(std::io::SeekFrom::Start(*cursor)).is_err() {
return Vec::new();
}
let mut events = Vec::new();
let mut pending_detail = BTreeMap::<String, String>::new();
let mut bytes_read: u64 = 0;
let mut buf = String::new();
loop {
buf.clear();
let n = match reader.read_line(&mut buf) {
Ok(0) => break,
Ok(n) => n,
Err(_) => break,
};
bytes_read += n as u64;
let line = buf.trim_end_matches(['\r', '\n']).to_owned();
if line.is_empty() {
continue;
}
if let Some(rest) = line.strip_prefix("# ") {
if let Some((k, v)) = rest.split_once(':') {
pending_detail.insert(k.trim().to_owned(), v.trim().to_owned());
}
continue;
}
let mut ev = StubEvent {
kind: StubKind::Http,
captured_at_ns: monotonic_ns(),
summary: line,
detail: BTreeMap::new(),
};
ev.detail.append(&mut pending_detail);
events.push(ev);
}
*cursor += bytes_read;
events
}
}
impl StubProvider for HttpStub {
@ -76,11 +181,17 @@ impl StubProvider for HttpStub {
format!("http://127.0.0.1:{}", self.port)
}
fn recording_endpoint(&self) -> Option<(&'static str, String)> {
Some((HTTP_STUB_LOG_ENV_VAR, self.log_path.to_string_lossy().into_owned()))
}
fn drain_events(&self) -> Vec<StubEvent> {
match self.events.lock() {
let mut out = match self.events.lock() {
Ok(mut g) => std::mem::take(&mut *g),
Err(_) => Vec::new(),
}
};
out.extend(self.drain_log_file());
out
}
}
@ -89,6 +200,8 @@ impl Drop for HttpStub {
self.shutdown.store(true, Ordering::Relaxed);
// Wake the blocking accept by connecting once.
let _ = TcpStream::connect(format!("127.0.0.1:{}", self.port));
// TempDir's own Drop deletes the side-channel log + dir.
self.tempdir.take();
}
}
@ -197,6 +310,7 @@ fn handle_connection(mut stream: TcpStream, max_bytes: usize) -> Option<StubEven
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn send_request(port: u16, request: &[u8]) -> Vec<u8> {
let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
@ -207,9 +321,15 @@ mod tests {
out
}
fn start_stub() -> (TempDir, HttpStub) {
let dir = TempDir::new().unwrap();
let stub = HttpStub::start(dir.path()).unwrap();
(dir, stub)
}
#[test]
fn endpoint_uses_loopback_with_assigned_port() {
let stub = HttpStub::start().unwrap();
let (_dir, stub) = start_stub();
let ep = stub.endpoint();
assert!(ep.starts_with("http://127.0.0.1:"));
assert!(ep.ends_with(&stub.port().to_string()));
@ -217,7 +337,7 @@ mod tests {
#[test]
fn captures_request_line_via_real_socket() {
let stub = HttpStub::start().unwrap();
let (_dir, stub) = start_stub();
let reply = send_request(
stub.port(),
b"GET /api/users HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n",
@ -236,7 +356,7 @@ mod tests {
#[test]
fn captures_post_body() {
let stub = HttpStub::start().unwrap();
let (_dir, stub) = start_stub();
let body = b"username=admin&password=hunter2";
let req = format!(
"POST /login HTTP/1.1\r\nHost: 127.0.0.1\r\nContent-Length: {}\r\n\r\n",
@ -256,7 +376,7 @@ mod tests {
#[test]
fn drain_resets_event_buffer() {
let stub = HttpStub::start().unwrap();
let (_dir, stub) = start_stub();
stub.record("GET /first HTTP/1.1");
assert_eq!(stub.drain_events().len(), 1);
assert!(stub.drain_events().is_empty(), "second drain must be empty");
@ -265,7 +385,7 @@ mod tests {
#[test]
fn drop_releases_port_for_rebind() {
let port = {
let stub = HttpStub::start().unwrap();
let (_dir, stub) = start_stub();
stub.port()
};
// After drop, the OS releases the port. The accept thread may
@ -276,4 +396,75 @@ mod tests {
// We don't assert success here — the OS may hold the port in
// TIME_WAIT — but Drop must not panic or deadlock.
}
#[test]
fn recording_endpoint_publishes_log_path_under_nyx_http_log() {
let (_dir, stub) = start_stub();
let pair = stub
.recording_endpoint()
.expect("HttpStub must publish a recording endpoint");
assert_eq!(pair.0, HTTP_STUB_LOG_ENV_VAR);
assert_eq!(pair.0, "NYX_HTTP_LOG");
assert_eq!(pair.1, stub.log_path().to_string_lossy());
assert!(
stub.log_path().exists(),
"side-channel log file must be created on start",
);
}
#[test]
fn drain_events_merges_log_file_records_with_in_memory_events() {
let (_dir, stub) = start_stub();
// Simulate the on-the-wire path.
stub.record("GET /listener-hit HTTP/1.1");
// Simulate the shim path: append a detail-then-summary record
// mirroring the SQL stub log format.
let mut f = std::fs::OpenOptions::new()
.append(true)
.open(stub.log_path())
.unwrap();
f.write_all(b"# method: POST\n# url: http://example.com/login\nPOST http://example.com/login\n")
.unwrap();
drop(f);
let events = stub.drain_events();
assert_eq!(events.len(), 2, "both sources must surface, got {events:?}");
let summaries: Vec<_> = events.iter().map(|e| e.summary.as_str()).collect();
assert!(summaries.contains(&"GET /listener-hit HTTP/1.1"));
assert!(summaries.contains(&"POST http://example.com/login"));
let shim_event = events
.iter()
.find(|e| e.summary.starts_with("POST http://example.com"))
.unwrap();
assert_eq!(
shim_event.detail.get("method").map(String::as_str),
Some("POST"),
);
assert_eq!(
shim_event.detail.get("url").map(String::as_str),
Some("http://example.com/login"),
);
}
#[test]
fn drain_log_file_returns_only_new_entries() {
let (_dir, stub) = start_stub();
let mut f = std::fs::OpenOptions::new()
.append(true)
.open(stub.log_path())
.unwrap();
f.write_all(b"GET /one\n").unwrap();
drop(f);
assert_eq!(stub.drain_events().len(), 1);
let mut f = std::fs::OpenOptions::new()
.append(true)
.open(stub.log_path())
.unwrap();
f.write_all(b"GET /two\n").unwrap();
drop(f);
let second = stub.drain_events();
assert_eq!(second.len(), 1, "drain must return only the new record");
assert_eq!(second[0].summary, "GET /two");
}
}

View file

@ -241,7 +241,7 @@ impl StubHarness {
seen.push(k);
let stub: Arc<dyn StubProvider> = match k {
StubKind::Sql => Arc::new(SqlStub::start(workdir)?),
StubKind::Http => Arc::new(HttpStub::start()?),
StubKind::Http => Arc::new(HttpStub::start(workdir)?),
StubKind::Redis => Arc::new(RedisStub::start()?),
StubKind::Filesystem => Arc::new(FilesystemStub::start(workdir)?),
};