refactor(dynamic): integrate worker timeout handling, JSON response parsing with serde, and extend Pubsub emulator with streaming pull lifecycle handling

This commit is contained in:
elipeter 2026-05-28 12:16:10 -05:00
parent c3a1550315
commit 3d710c856d
5 changed files with 506 additions and 133 deletions

View file

@ -39,16 +39,20 @@
//! invokes so integration tests can swap in a wrapper.
use super::{BuildPool, PoolCompileResult};
use serde::Deserialize;
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
use std::sync::Mutex;
use std::sync::{Mutex, mpsc};
use std::thread;
use std::time::{Duration, Instant};
/// Java source compiled at first use to drive the worker.
const WORKER_SOURCE: &str = include_str!("java_worker/NyxJavacWorker.java");
const WORKER_CLASS: &str = "NyxJavacWorker";
const WORKER_FILENAME: &str = "NyxJavacWorker.java";
const WORKER_READY_TIMEOUT: Duration = Duration::from_secs(10);
const COMPILE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(60);
/// Live worker handle. Held inside a `Mutex` so concurrent
/// `compile_batch` callers serialise on the single JVM.
@ -134,9 +138,13 @@ impl JavacPool {
};
}
let mut line = String::new();
match worker.stdout.read_line(&mut line) {
Ok(0) => {
match read_line_with_timeout(
&mut worker.child,
&mut worker.stdout,
COMPILE_RESPONSE_TIMEOUT,
"read response",
) {
Ok(None) => {
*guard = None;
PoolCompileResult {
success: false,
@ -148,11 +156,11 @@ impl JavacPool {
*guard = None;
PoolCompileResult {
success: false,
stderr: format!("javac-pool: read failed: {e}"),
stderr: e,
duration: start.elapsed(),
}
}
Ok(_) => match parse_response(&line) {
Ok(Some(line)) => match parse_response(&line) {
Some((success, stderr)) => PoolCompileResult {
success,
stderr,
@ -285,15 +293,22 @@ fn spawn_worker(dir: &Path) -> Result<Worker, String> {
.ok_or_else(|| "javac-pool: missing stdout".to_owned())?;
let mut stdout = BufReader::new(stdout);
// Read the banner line with a timeout via a polling read. We
// can't use `read_line` with a deadline directly, so spawn a
// bounded waiter: if the worker doesn't announce readiness inside
// 10s we declare bootstrap failure.
let banner = read_line_with_timeout(&mut stdout, Duration::from_secs(10))?;
let banner =
match read_line_with_timeout(&mut child, &mut stdout, WORKER_READY_TIMEOUT, "read banner")?
{
Some(line) => line,
None => {
let _ = child.kill();
let stderr_tail = drain_stderr(&mut child);
return Err(format!(
"javac-pool: worker closed stdout before readiness; stderr: {stderr_tail}",
));
}
};
if !banner.contains("\"ready\":true") {
// Drain stderr for diagnostic context, then bail.
let stderr_tail = drain_stderr(&mut child);
let _ = child.kill();
let stderr_tail = drain_stderr(&mut child);
return Err(format!(
"javac-pool: worker did not announce readiness; got {banner:?}; stderr: {stderr_tail}",
));
@ -318,24 +333,31 @@ fn drain_stderr(child: &mut Child) -> String {
}
fn read_line_with_timeout(
child: &mut Child,
stdout: &mut BufReader<ChildStdout>,
timeout: Duration,
) -> Result<String, String> {
// BufReader doesn't expose async/timeout primitives. The worker's
// first line lands within < 2s on every machine we ship to, so a
// synchronous read_line is fine -- the timeout is enforced by an
// outer watchdog thread that interrupts us via stdin close on
// failure. In practice if `java` blocks indefinitely the test
// suite catches the regression.
//
// We keep the API plumbed so the deadline can be tightened later
// without churning call sites.
let _ = timeout;
let mut line = String::new();
stdout
.read_line(&mut line)
.map_err(|e| format!("javac-pool: read banner: {e}"))?;
Ok(line)
context: &str,
) -> Result<Option<String>, String> {
let (tx, rx) = mpsc::channel();
thread::scope(|scope| {
scope.spawn(move || {
let mut line = String::new();
let result = stdout.read_line(&mut line).map(|n| (n, line));
let _ = tx.send(result);
});
match rx.recv_timeout(timeout) {
Ok(Ok((0, _))) => Ok(None),
Ok(Ok((_n, line))) => Ok(Some(line)),
Ok(Err(e)) => Err(format!("javac-pool: {context} failed: {e}")),
Err(mpsc::RecvTimeoutError::Timeout) => {
let _ = child.kill();
Err(format!("javac-pool: {context} timed out after {timeout:?}"))
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
Err(format!("javac-pool: {context} reader disconnected"))
}
}
})
}
fn build_request(id: u64, workdir: &Path, args: &[String]) -> String {
@ -372,61 +394,18 @@ fn append_json_string(out: &mut String, s: &str) {
}
/// Extract `(success, stderr)` from a worker JSON response line.
///
/// The wire shape is tightly constrained -- the worker only ever emits
/// `{"id":"N","success":TRUE|FALSE,"stderr_b64":"…"}`, so we use a
/// targeted decoder rather than pulling in `serde_json` and inflating
/// the dynamic feature footprint. Anything off-shape returns `None`
/// and the caller flags the worker unhealthy.
fn parse_response(line: &str) -> Option<(bool, String)> {
let success = extract_bool_field(line, "success")?;
let b64 = extract_string_field(line, "stderr_b64").unwrap_or_default();
let stderr = decode_b64(&b64).unwrap_or_else(|| "<unable to decode stderr>".to_owned());
Some((success, stderr))
let response: JavacWorkerResponse = serde_json::from_str(line).ok()?;
let stderr =
decode_b64(&response.stderr_b64).unwrap_or_else(|| "<unable to decode stderr>".to_owned());
Some((response.success, stderr))
}
fn extract_bool_field(s: &str, name: &str) -> Option<bool> {
let needle = format!("\"{name}\":");
let i = s.find(&needle)? + needle.len();
let rest = s[i..].trim_start();
if rest.starts_with("true") {
Some(true)
} else if rest.starts_with("false") {
Some(false)
} else {
None
}
}
fn extract_string_field(s: &str, name: &str) -> Option<String> {
let needle = format!("\"{name}\":\"");
let i = s.find(&needle)? + needle.len();
let tail = &s[i..];
let mut out = String::new();
let mut chars = tail.chars();
while let Some(c) = chars.next() {
match c {
'"' => return Some(out),
'\\' => match chars.next()? {
'"' => out.push('"'),
'\\' => out.push('\\'),
'/' => out.push('/'),
'b' => out.push('\u{08}'),
'f' => out.push('\u{0c}'),
'n' => out.push('\n'),
'r' => out.push('\r'),
't' => out.push('\t'),
'u' => {
let hex: String = (&mut chars).take(4).collect();
let cp = u32::from_str_radix(&hex, 16).ok()?;
out.push(char::from_u32(cp)?);
}
_ => return None,
},
c => out.push(c),
}
}
None
#[derive(Debug, Deserialize)]
struct JavacWorkerResponse {
success: bool,
#[serde(default)]
stderr_b64: String,
}
/// Tiny RFC 4648 base64 decoder. Used only for the worker's
@ -517,6 +496,14 @@ mod tests {
assert!(parse_response("{\"id\":\"0\",\"stderr_b64\":\"\"}").is_none());
}
#[test]
fn parse_response_accepts_reordered_fields() {
let (ok, err) =
parse_response("{\"stderr_b64\":\"YQ==\",\"success\":true,\"id\":\"7\"}\n").unwrap();
assert!(ok);
assert_eq!(err, "a");
}
#[test]
fn b64_decode_roundtrip() {
for (raw, encoded) in &[
@ -529,17 +516,4 @@ mod tests {
assert_eq!(decode_b64(encoded).as_deref(), Some(*raw));
}
}
#[test]
fn extract_string_handles_escapes() {
let s = r#"{"id":"0","stderr_b64":"abc","note":"a\"b\\c"}"#;
assert_eq!(extract_string_field(s, "note").as_deref(), Some(r#"a"b\c"#));
}
#[test]
fn extract_bool_picks_first_match() {
let s = r#"{"success":false,"other":true}"#;
assert_eq!(extract_bool_field(s, "success"), Some(false));
assert_eq!(extract_bool_field(s, "other"), Some(true));
}
}

View file

@ -21,8 +21,8 @@
//! speaks enough of the binary protocol for metadata, produce, assigned
//! partition fetch/list-offsets, and basic consumer-group compatibility.
//! Pub/Sub exposes a plaintext h2/gRPC emulator for create-topic,
//! create-subscription, publish, pull, acknowledge, and the single
//! response shape used by streaming pull clients.
//! create-subscription, publish, pull, acknowledge, and the streaming
//! pull lifecycle used by the Go client.
use super::{StubEvent, StubKind, StubProvider, monotonic_ns};
use prost::Message;
@ -1567,15 +1567,35 @@ async fn pubsub_grpc_read_all(mut body: h2::RecvStream) -> Vec<u8> {
out
}
async fn pubsub_grpc_read_one_message(mut body: h2::RecvStream) -> Option<Vec<u8>> {
let mut out = Vec::new();
while out.len() < 5 || out.len() < 5 + u32::from_be_bytes(out[1..5].try_into().ok()?) as usize {
async fn pubsub_grpc_read_next_message(
body: &mut h2::RecvStream,
buffer: &mut Vec<u8>,
) -> Option<Vec<u8>> {
loop {
if let Some(payload) = pubsub_grpc_take_message(buffer) {
return Some(payload);
}
let bytes = body.data().await?.ok()?;
let len = bytes.len();
out.extend_from_slice(&bytes);
buffer.extend_from_slice(&bytes);
let _ = body.flow_control().release_capacity(len);
}
pubsub_grpc_unframe(&out)
}
fn pubsub_grpc_take_message(buffer: &mut Vec<u8>) -> Option<Vec<u8>> {
if buffer.len() < 5 {
return None;
}
if buffer[0] != 0 {
buffer.clear();
return None;
}
let len = u32::from_be_bytes(buffer[1..5].try_into().ok()?) as usize;
if buffer.len() < 5 + len {
return None;
}
let frame: Vec<u8> = buffer.drain(..5 + len).collect();
pubsub_grpc_unframe(&frame)
}
async fn handle_pubsub_unary(
@ -1633,29 +1653,130 @@ async fn handle_pubsub_unary(
}
async fn handle_pubsub_streaming_pull(
body: h2::RecvStream,
respond: h2::server::SendResponse<bytes::Bytes>,
mut body: h2::RecvStream,
mut respond: h2::server::SendResponse<bytes::Bytes>,
state: Arc<Mutex<PubsubGrpcState>>,
log_path: &Path,
) {
let Some(payload) = pubsub_grpc_read_one_message(body).await else {
let mut read_buffer = Vec::new();
let Some(payload) = pubsub_grpc_read_next_message(&mut body, &mut read_buffer).await else {
pubsub_send_grpc_status(respond, 3, "missing request").await;
return;
};
let req = PubsubStreamingPullRequest::decode(payload.as_slice()).unwrap_or_default();
if !req.ack_ids.is_empty() {
pubsub_ack(&state, log_path, &req.subscription, &req.ack_ids);
}
let max_messages = if req.max_outstanding_messages > 0 {
req.max_outstanding_messages.min(i64::from(i32::MAX)) as i32
} else {
1
let response = http::Response::builder()
.status(200)
.header("content-type", "application/grpc")
.body(())
.unwrap();
let Ok(mut send) = respond.send_response(response, false) else {
return;
};
let pull = pubsub_pull(&state, log_path, &req.subscription, max_messages);
let mut subscription = String::new();
let mut max_messages = 1_i32;
let req = PubsubStreamingPullRequest::decode(payload.as_slice()).unwrap_or_default();
pubsub_apply_streaming_pull_request(
&state,
log_path,
&req,
&mut subscription,
&mut max_messages,
);
if !pubsub_send_available_streaming_messages(
&state,
log_path,
&subscription,
max_messages,
&mut send,
) {
return;
}
loop {
tokio::select! {
payload = pubsub_grpc_read_next_message(&mut body, &mut read_buffer) => {
let Some(payload) = payload else { break };
let req = PubsubStreamingPullRequest::decode(payload.as_slice()).unwrap_or_default();
pubsub_apply_streaming_pull_request(
&state,
log_path,
&req,
&mut subscription,
&mut max_messages,
);
if !pubsub_send_available_streaming_messages(
&state,
log_path,
&subscription,
max_messages,
&mut send,
) {
return;
}
}
_ = tokio::time::sleep(Duration::from_millis(25)), if !subscription.is_empty() => {
if !pubsub_send_available_streaming_messages(
&state,
log_path,
&subscription,
max_messages,
&mut send,
) {
return;
}
}
_ = tokio::time::sleep(Duration::from_secs(5)) => {
break;
}
}
}
let mut trailers = http::HeaderMap::new();
trailers.insert("grpc-status", http::HeaderValue::from_static("0"));
let _ = send.send_trailers(trailers);
}
fn pubsub_apply_streaming_pull_request(
state: &Arc<Mutex<PubsubGrpcState>>,
log_path: &Path,
req: &PubsubStreamingPullRequest,
subscription: &mut String,
max_messages: &mut i32,
) {
if !req.subscription.is_empty() {
*subscription = req.subscription.clone();
}
if req.max_outstanding_messages > 0 {
*max_messages = req.max_outstanding_messages.min(i64::from(i32::MAX)) as i32;
}
if !req.ack_ids.is_empty() && !subscription.is_empty() {
pubsub_ack(state, log_path, subscription, &req.ack_ids);
}
}
fn pubsub_send_available_streaming_messages(
state: &Arc<Mutex<PubsubGrpcState>>,
log_path: &Path,
subscription: &str,
max_messages: i32,
send: &mut h2::SendStream<bytes::Bytes>,
) -> bool {
if subscription.is_empty() {
return true;
}
let pull = pubsub_pull(state, log_path, subscription, max_messages);
if pull.received_messages.is_empty() {
return true;
}
let response = PubsubStreamingPullResponse {
received_messages: pull.received_messages,
};
pubsub_send_grpc_message(respond, &response).await;
let mut payload = Vec::new();
if response.encode(&mut payload).is_err() {
return false;
}
send.send_data(bytes::Bytes::from(pubsub_grpc_frame(&payload)), false)
.is_ok()
}
fn pubsub_publish(
@ -3562,6 +3683,70 @@ mod tests {
assert_eq!(events[2].detail.get("payload").unwrap(), &received.ack_id);
}
#[test]
fn pubsub_grpc_streaming_pull_records_deliver_and_ack() {
let dir = TempDir::new().unwrap();
let stub = BrokerStub::start(StubKind::Pubsub, dir.path()).unwrap();
let endpoint = stub.endpoint();
if endpoint == "loopback://pubsub" {
return;
}
let port: u16 = endpoint
.trim_start_matches("pubsub://127.0.0.1:")
.parse()
.unwrap();
let topic = "projects/nyx/topics/stream-orders";
let subscription = "projects/nyx/subscriptions/stream-orders-sub";
let _: PubsubTopic = pubsub_grpc_unary(
port,
"/google.pubsub.v1.Publisher/CreateTopic",
&PubsubTopic {
name: topic.to_owned(),
},
);
let _: PubsubSubscription = pubsub_grpc_unary(
port,
"/google.pubsub.v1.Subscriber/CreateSubscription",
&PubsubSubscription {
name: subscription.to_owned(),
topic: topic.to_owned(),
ack_deadline_seconds: 10,
},
);
let _: PubsubPublishResponse = pubsub_grpc_unary(
port,
"/google.pubsub.v1.Publisher/Publish",
&PubsubPublishRequest {
topic: topic.to_owned(),
messages: vec![PubsubMessage {
data: b"NYX\tSTREAM".to_vec(),
message_id: String::new(),
ordering_key: String::new(),
}],
},
);
let pulled = pubsub_grpc_streaming_pull_once(port, subscription);
assert_eq!(pulled.received_messages.len(), 1);
let received = &pulled.received_messages[0];
assert_eq!(
received.message.as_ref().unwrap().data,
b"NYX\tSTREAM".to_vec()
);
let events = stub.drain_events();
let actions: Vec<&str> = events
.iter()
.map(|ev| ev.detail.get("action").unwrap().as_str())
.collect();
assert_eq!(actions, vec!["publish", "deliver", "ack"]);
assert_eq!(events[0].detail.get("destination").unwrap(), topic);
assert_eq!(events[1].detail.get("destination").unwrap(), subscription);
assert_eq!(events[1].detail.get("payload").unwrap(), "NYX\tSTREAM");
assert_eq!(events[2].detail.get("payload").unwrap(), &received.ack_id);
}
#[test]
fn rabbit_amqp_protocol_server_records_publish_deliver_ack() {
let dir = TempDir::new().unwrap();
@ -4083,6 +4268,87 @@ mod tests {
R::decode(response.as_slice()).unwrap()
}
fn pubsub_grpc_streaming_pull_once(
port: u16,
subscription: &str,
) -> PubsubStreamingPullResponse {
let subscription = subscription.to_owned();
tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.unwrap()
.block_on(async move {
let stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{port}"))
.await
.unwrap();
let (mut client, connection) = h2::client::handshake(stream).await.unwrap();
tokio::spawn(async move {
let _ = connection.await;
});
let request = http::Request::builder()
.method("POST")
.uri("/google.pubsub.v1.Subscriber/StreamingPull")
.header("content-type", "application/grpc")
.body(())
.unwrap();
let (response, mut send_stream) = client.send_request(request, false).unwrap();
let init = PubsubStreamingPullRequest {
subscription: subscription.clone(),
ack_ids: Vec::new(),
stream_ack_deadline_seconds: 10,
client_id: "nyx-test".to_owned(),
max_outstanding_messages: 1,
max_outstanding_bytes: 1024 * 1024,
};
let mut init_payload = Vec::new();
init.encode(&mut init_payload).unwrap();
send_stream
.send_data(bytes::Bytes::from(pubsub_grpc_frame(&init_payload)), false)
.unwrap();
let response = response.await.unwrap();
assert_eq!(response.status(), 200);
let mut body = response.into_body();
let mut response_buffer = Vec::new();
let payload = tokio::time::timeout(
Duration::from_secs(2),
pubsub_grpc_read_next_message(&mut body, &mut response_buffer),
)
.await
.expect("streaming pull response timed out")
.expect("streaming pull response closed");
let pulled = PubsubStreamingPullResponse::decode(payload.as_slice()).unwrap();
let ack = PubsubStreamingPullRequest {
subscription,
ack_ids: pulled
.received_messages
.iter()
.map(|message| message.ack_id.clone())
.collect(),
stream_ack_deadline_seconds: 10,
client_id: "nyx-test".to_owned(),
max_outstanding_messages: 1,
max_outstanding_bytes: 1024 * 1024,
};
let mut ack_payload = Vec::new();
ack.encode(&mut ack_payload).unwrap();
send_stream
.send_data(bytes::Bytes::from(pubsub_grpc_frame(&ack_payload)), true)
.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(2), async {
while let Some(chunk) = body.data().await {
if let Ok(bytes) = chunk {
let _ = body.flow_control().release_capacity(bytes.len());
}
}
})
.await;
pulled
})
}
fn http_post(port: u16, path: &str, body: &str) -> String {
let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
let req = format!(