From 1a0e2d204b25ae41d2ba1909d0f45b04a2a9f74f Mon Sep 17 00:00:00 2001 From: elipeter Date: Wed, 27 May 2026 14:11:31 -0500 Subject: [PATCH] refactor(dynamic): extend Kafka protocol emulator with binary protocol support, Pubsub gRPC emulator, and enhance listener and endpoint handling --- Cargo.lock | 53 ++ Cargo.toml | 8 +- src/dynamic/lang/java.rs | 16 +- src/dynamic/lang/python.rs | 22 +- src/dynamic/stubs/broker.rs | 1535 +++++++++++++++++++++++++++++-- tests/message_handler_corpus.rs | 2 +- 6 files changed, 1530 insertions(+), 106 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 91a67215..e3c2346e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -637,6 +637,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "foldhash" version = "0.1.5" @@ -741,6 +747,25 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "h2" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "171fefbc92fe4a4de27e0698d6a5b392d6a0e333506bc49133760b3bcf948733" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.7.1" @@ -1142,6 +1167,7 @@ dependencies = [ "axum", "bitflags", "blake3", + "bytes", "bytesize", "chrono", "clap", @@ -1151,6 +1177,8 @@ dependencies = [ "dashmap", "directories", "glob", + "h2", + "http", "ignore", "indicatif", "num_cpus", @@ -1159,6 +1187,7 @@ dependencies = [ "petgraph", "phf", "predicates", + "prost", "r2d2", "r2d2_sqlite", "rayon", @@ -1413,6 +1442,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "quote" version = "1.0.45" @@ -1925,6 +1977,7 @@ version = "1.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe" dependencies = [ + "bytes", "libc", "mio", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index 8dbdf5b9..61fcdb43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ smt-system-z3 = ["dep:z3"] docgen = [] # Dynamic verification layer: builds harnesses from findings, runs them in a # sandbox, reports back whether the sink fires. -dynamic = ["dep:tempfile"] +dynamic = ["dep:bytes", "dep:h2", "dep:http", "dep:prost", "dep:tempfile", "dep:tokio"] # Phase 19 (Track E.3): the `nyx-image-builder` helper binary that builds # and pins per-toolchain Docker images. Gated so it does not bloat the # default `nyx` build with extra TOML-write logic CI-only operators need. @@ -141,7 +141,11 @@ smallvec = { version = "1.15.1", features = ["serde"] } rustc-hash = "2.1.2" uuid = { version = "1.23.1", features = ["v4"] } axum = { version = "0.8.9", optional = true } -tokio = { version = "1.52.3", features = ["rt-multi-thread", "macros", "signal", "sync"], optional = true } +bytes = { version = "1.11.0", optional = true } +h2 = { version = "0.4.14", optional = true } +http = { version = "1.3.1", optional = true } +prost = { version = "0.14.3", optional = true } +tokio = { version = "1.52.3", features = ["rt-multi-thread", "macros", "signal", "sync", "net", "io-util"], optional = true } tokio-stream = { version = "0.1.18", features = ["sync"], optional = true } tower-http = { version = "0.6.10", features = ["cors", "compression-gzip", "trace", "set-header", "limit"], optional = true } z3 = { version = "0.20.0", optional = true} diff --git a/src/dynamic/lang/java.rs b/src/dynamic/lang/java.rs index 8236b9b7..0e94131c 100644 --- a/src/dynamic/lang/java.rs +++ b/src/dynamic/lang/java.rs @@ -4192,7 +4192,6 @@ public class NyxHarness {{ java.util.Properties consumerProps = new java.util.Properties(); consumerProps.put("bootstrap.servers", bootstrap); - consumerProps.put("group.id", "nyx-" + Long.toString(System.nanoTime())); consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put("auto.offset.reset", "earliest"); @@ -4209,8 +4208,18 @@ public class NyxHarness {{ .invoke(future, Long.valueOf(2L), java.util.concurrent.TimeUnit.SECONDS); producerClass.getMethod("flush").invoke(producer); - consumerClass.getMethod("subscribe", java.util.Collection.class) - .invoke(consumer, java.util.Collections.singletonList(topic)); + Class topicPartitionClass = Class.forName("org.apache.kafka.common.TopicPartition"); + Object partition = topicPartitionClass.getConstructor(String.class, int.class) + .newInstance(topic, Integer.valueOf(0)); + java.util.List partitions = java.util.Collections.singletonList(partition); + consumerClass.getMethod("assign", java.util.Collection.class).invoke(consumer, partitions); + try {{ + consumerClass.getMethod("seekToBeginning", java.util.Collection.class) + .invoke(consumer, partitions); + }} catch (Throwable seekError) {{ + consumerClass.getMethod("seek", topicPartitionClass, long.class) + .invoke(consumer, partition, Long.valueOf(0L)); + }} Object records = consumerClass.getMethod("poll", java.time.Duration.class) .invoke(consumer, java.time.Duration.ofSeconds(2)); if (!(records instanceof Iterable)) {{ @@ -4233,7 +4242,6 @@ public class NyxHarness {{ System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); }} if (success) {{ - consumerClass.getMethod("commitSync").invoke(consumer); nyxRecordBrokerEvent("NYX_KAFKA_LOG", "ack", topic, Long.toString(offset)); }} delivered = true; diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index 4483d7af..81898b74 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -1085,7 +1085,7 @@ def _nyx_try_real_kafka(topic, body, handler_name): if not bootstrap: return False try: - from kafka import KafkaConsumer, KafkaProducer + from kafka import KafkaConsumer, KafkaProducer, TopicPartition except Exception: return False _h = getattr(_entry_mod, handler_name, None) @@ -1104,9 +1104,8 @@ def _nyx_try_real_kafka(topic, body, handler_name): retries=0, ) _consumer = KafkaConsumer( - str(topic), bootstrap_servers=[bootstrap], - group_id="nyx-" + str(os.getpid()), + group_id=None, auto_offset_reset="earliest", enable_auto_commit=False, consumer_timeout_ms=2000, @@ -1118,6 +1117,23 @@ def _nyx_try_real_kafka(topic, body, handler_name): _nyx_record_broker_publish("NYX_KAFKA_LOG", topic, body) _producer.send(str(topic), body).get(timeout=2) _producer.flush(timeout=2) + _tp = TopicPartition(str(topic), 0) + _consumer.assign([_tp]) + try: + _consumer.seek_to_beginning(_tp) + except Exception: + _consumer.seek(_tp, 0) + _records = _consumer.poll(timeout_ms=2000, max_records=1) + for _partition_records in _records.values(): + for _record in _partition_records: + _nyx_record_broker_event("NYX_KAFKA_LOG", "deliver", topic, _record.value) + _h(_record.value) + try: + _consumer.commit() + except Exception: + pass + _nyx_record_broker_event("NYX_KAFKA_LOG", "ack", topic, str(getattr(_record, "offset", ""))) + return True for _record in _consumer: _nyx_record_broker_event("NYX_KAFKA_LOG", "deliver", topic, _record.value) _h(_record.value) diff --git a/src/dynamic/stubs/broker.rs b/src/dynamic/stubs/broker.rs index 495ab120..d6ef6dd2 100644 --- a/src/dynamic/stubs/broker.rs +++ b/src/dynamic/stubs/broker.rs @@ -16,9 +16,17 @@ //! and heartbeats. It does not emulate broker policies such as TLS, //! federation, DLX, permissions, or exchange-type routing beyond direct //! queue bindings. +//! +//! Kafka and Pub/Sub follow the same bounded-provider model. Kafka +//! speaks enough of the binary protocol for metadata, produce, assigned +//! partition fetch/list-offsets, and basic consumer-group compatibility. +//! Pub/Sub exposes a plaintext h2/gRPC emulator for create-topic, +//! create-subscription, publish, pull, acknowledge, and the single +//! response shape used by streaming pull clients. use super::{StubEvent, StubKind, StubProvider, monotonic_ns}; -use std::collections::{BTreeMap, VecDeque}; +use prost::Message; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::fs::OpenOptions; use std::io::{BufRead, BufReader, Read, Write}; use std::net::{TcpListener, TcpStream}; @@ -40,6 +48,7 @@ pub struct BrokerStub { cursor: Mutex, kafka_listener: Option, sqs_listener: Option, + pubsub_grpc_listener: Option, http_listener: Option, rabbit_amqp_listener: Option, nats_listener: Option, @@ -64,6 +73,11 @@ impl BrokerStub { } else { None }; + let pubsub_grpc_listener = if kind == StubKind::Pubsub { + start_pubsub_grpc_listener(log_path.clone())? + } else { + None + }; let http_listener = if matches!(kind, StubKind::Pubsub | StubKind::Rabbit) { start_http_broker_listener(kind, log_path.clone())? } else { @@ -86,6 +100,7 @@ impl BrokerStub { cursor: Mutex::new(0), kafka_listener, sqs_listener, + pubsub_grpc_listener, http_listener, rabbit_amqp_listener, nats_listener, @@ -144,7 +159,7 @@ impl StubProvider for BrokerStub { fn endpoint(&self) -> String { if let Some(listener) = &self.kafka_listener { - return format!("http://127.0.0.1:{}", listener.port); + return format!("kafka://127.0.0.1:{}", listener.port); } if let Some(listener) = &self.sqs_listener { return format!("http://127.0.0.1:{}", listener.port); @@ -152,6 +167,9 @@ impl StubProvider for BrokerStub { if let Some(listener) = &self.rabbit_amqp_listener { return format!("amqp://127.0.0.1:{}/%2f", listener.port); } + if let Some(listener) = &self.pubsub_grpc_listener { + return format!("pubsub://127.0.0.1:{}", listener.port); + } if let Some(listener) = &self.http_listener { return format!("http://127.0.0.1:{}", listener.port); } @@ -238,6 +256,10 @@ impl Drop for BrokerStub { listener.shutdown.store(true, Ordering::Relaxed); let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port)); } + if let Some(listener) = &self.pubsub_grpc_listener { + listener.shutdown.store(true, Ordering::Relaxed); + let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port)); + } if let Some(listener) = &self.http_listener { listener.shutdown.store(true, Ordering::Relaxed); let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port)); @@ -308,14 +330,20 @@ fn kafka_accept_loop( } fn handle_kafka_connection(mut stream: TcpStream, state: Arc>, log_path: &Path) { - let Some(req) = read_http_request(&stream) else { + let mut prefix = [0_u8; 4]; + let n = stream.peek(&mut prefix).unwrap_or(0); + if n > 0 && matches!(prefix[0], b'G' | b'P' | b'D' | b'H') { + let Some(req) = read_http_request(&stream) else { + return; + }; + let response = match handle_kafka_request(&req, state, log_path) { + Ok(body) => http_response_with_type(200, "OK", "application/json", &body), + Err(body) => http_response_with_type(400, "Bad Request", "application/json", &body), + }; + let _ = stream.write_all(response.as_bytes()); return; - }; - let response = match handle_kafka_request(&req, state, log_path) { - Ok(body) => http_response_with_type(200, "OK", "application/json", &body), - Err(body) => http_response_with_type(400, "Bad Request", "application/json", &body), - }; - let _ = stream.write_all(response.as_bytes()); + } + handle_kafka_binary_connection(stream, state, log_path); } fn handle_kafka_request( @@ -403,6 +431,724 @@ fn kafka_path_parts(path: &str) -> Option<(String, String)> { Some((topic, action)) } +const KAFKA_API_PRODUCE: i16 = 0; +const KAFKA_API_FETCH: i16 = 1; +const KAFKA_API_LIST_OFFSETS: i16 = 2; +const KAFKA_API_METADATA: i16 = 3; +const KAFKA_API_OFFSET_COMMIT: i16 = 8; +const KAFKA_API_OFFSET_FETCH: i16 = 9; +const KAFKA_API_FIND_COORDINATOR: i16 = 10; +const KAFKA_API_JOIN_GROUP: i16 = 11; +const KAFKA_API_HEARTBEAT: i16 = 12; +const KAFKA_API_LEAVE_GROUP: i16 = 13; +const KAFKA_API_SYNC_GROUP: i16 = 14; +const KAFKA_API_API_VERSIONS: i16 = 18; + +fn handle_kafka_binary_connection( + mut stream: TcpStream, + state: Arc>, + log_path: &Path, +) { + loop { + let mut len_buf = [0_u8; 4]; + if stream.read_exact(&mut len_buf).is_err() { + break; + } + let len = i32::from_be_bytes(len_buf); + if len <= 0 || len > 1024 * 1024 { + break; + } + let mut body = vec![0_u8; len as usize]; + if stream.read_exact(&mut body).is_err() { + break; + } + let Some(request) = KafkaRequest::parse(&body) else { + break; + }; + let response = kafka_binary_response(&request, Arc::clone(&state), log_path); + let mut framed = Vec::with_capacity(4 + response.len()); + kafka_push_i32(&mut framed, response.len() as i32); + framed.extend_from_slice(&response); + if stream.write_all(&framed).is_err() { + break; + } + } +} + +#[derive(Debug)] +struct KafkaRequest<'a> { + api_key: i16, + api_version: i16, + correlation_id: i32, + body: &'a [u8], +} + +impl<'a> KafkaRequest<'a> { + fn parse(input: &'a [u8]) -> Option { + let mut reader = KafkaReader::new(input); + let api_key = reader.i16()?; + let api_version = reader.i16()?; + let correlation_id = reader.i32()?; + let _client_id = reader.nullable_string()?; + if kafka_api_uses_flexible_header(api_key, api_version) { + reader.tagged_fields()?; + } + Some(Self { + api_key, + api_version, + correlation_id, + body: &input[reader.pos..], + }) + } +} + +fn kafka_api_uses_flexible_header(api_key: i16, version: i16) -> bool { + matches!(api_key, KAFKA_API_API_VERSIONS if version >= 3) +} + +fn kafka_binary_response( + req: &KafkaRequest<'_>, + state: Arc>, + log_path: &Path, +) -> Vec { + let mut out = Vec::new(); + kafka_push_i32(&mut out, req.correlation_id); + if kafka_api_uses_flexible_header(req.api_key, req.api_version) { + kafka_push_unsigned_varint(&mut out, 0); + } + let body = match req.api_key { + KAFKA_API_API_VERSIONS => kafka_api_versions_response(req.api_version), + KAFKA_API_METADATA => kafka_metadata_response(req.api_version, req.body, &state), + KAFKA_API_PRODUCE => kafka_produce_response(req.api_version, req.body, &state, log_path), + KAFKA_API_FETCH => kafka_fetch_response(req.api_version, req.body, &state, log_path), + KAFKA_API_LIST_OFFSETS => kafka_list_offsets_response(req.api_version, req.body, &state), + KAFKA_API_FIND_COORDINATOR => kafka_find_coordinator_response(req.api_version), + KAFKA_API_OFFSET_COMMIT => kafka_offset_commit_response(req.api_version, req.body), + KAFKA_API_OFFSET_FETCH => kafka_offset_fetch_response(req.api_version, req.body), + KAFKA_API_JOIN_GROUP => kafka_join_group_response(req.api_version), + KAFKA_API_SYNC_GROUP => kafka_sync_group_response(req.api_version), + KAFKA_API_HEARTBEAT => kafka_errorless_group_response(req.api_version), + KAFKA_API_LEAVE_GROUP => kafka_errorless_group_response(req.api_version), + _ => kafka_error_response(35), + }; + out.extend_from_slice(&body); + out +} + +fn kafka_api_versions_response(version: i16) -> Vec { + let apis = [ + (KAFKA_API_PRODUCE, 0, 2), + (KAFKA_API_FETCH, 0, 2), + (KAFKA_API_LIST_OFFSETS, 0, 1), + (KAFKA_API_METADATA, 0, 1), + (KAFKA_API_OFFSET_COMMIT, 0, 2), + (KAFKA_API_OFFSET_FETCH, 0, 1), + (KAFKA_API_FIND_COORDINATOR, 0, 1), + (KAFKA_API_JOIN_GROUP, 0, 1), + (KAFKA_API_HEARTBEAT, 0, 0), + (KAFKA_API_LEAVE_GROUP, 0, 0), + (KAFKA_API_SYNC_GROUP, 0, 0), + (KAFKA_API_API_VERSIONS, 0, 3), + ]; + let mut out = Vec::new(); + kafka_push_i16(&mut out, 0); + if version >= 3 { + kafka_push_compact_array_len(&mut out, apis.len()); + for (api, min, max) in apis { + kafka_push_i16(&mut out, api); + kafka_push_i16(&mut out, min); + kafka_push_i16(&mut out, max); + kafka_push_unsigned_varint(&mut out, 0); + } + kafka_push_i32(&mut out, 0); + kafka_push_unsigned_varint(&mut out, 0); + } else { + kafka_push_i32(&mut out, apis.len() as i32); + for (api, min, max) in apis { + kafka_push_i16(&mut out, api); + kafka_push_i16(&mut out, min); + kafka_push_i16(&mut out, max); + } + if version >= 1 { + kafka_push_i32(&mut out, 0); + } + } + out +} + +fn kafka_metadata_response(version: i16, body: &[u8], state: &Arc>) -> Vec { + let mut topics = kafka_metadata_topics(body); + if topics.is_empty() + && let Ok(guard) = state.lock() + { + topics.extend(guard.topics.keys().cloned()); + } + if topics.is_empty() { + topics.push("nyx".to_owned()); + } + topics.sort(); + topics.dedup(); + + let mut out = Vec::new(); + kafka_push_i32(&mut out, 1); + kafka_push_i32(&mut out, 0); + kafka_push_string(&mut out, "127.0.0.1"); + kafka_push_i32(&mut out, 0); + if version >= 1 { + kafka_push_nullable_string(&mut out, None); + kafka_push_i32(&mut out, 0); + } + kafka_push_i32(&mut out, topics.len() as i32); + for topic in topics { + kafka_push_i16(&mut out, 0); + kafka_push_string(&mut out, &topic); + if version >= 1 { + out.push(0); + } + kafka_push_i32(&mut out, 1); + kafka_push_i16(&mut out, 0); + kafka_push_i32(&mut out, 0); + kafka_push_i32(&mut out, 0); + kafka_push_i32(&mut out, 1); + kafka_push_i32(&mut out, 0); + kafka_push_i32(&mut out, 1); + kafka_push_i32(&mut out, 0); + } + out +} + +fn kafka_metadata_topics(body: &[u8]) -> Vec { + let mut reader = KafkaReader::new(body); + let Some(len) = reader.array_len() else { + return Vec::new(); + }; + if len < 0 { + return Vec::new(); + } + let mut topics = Vec::new(); + for _ in 0..len.min(256) { + let Some(topic) = reader.string() else { + break; + }; + topics.push(topic); + } + topics +} + +fn kafka_produce_response( + version: i16, + body: &[u8], + state: &Arc>, + log_path: &Path, +) -> Vec { + let produced = kafka_parse_produce_request(version, body); + let mut response_topics = BTreeMap::>::new(); + if let Ok(mut guard) = state.lock() { + for (topic, partition, value) in produced { + let offset = guard.next_offsets.entry(topic.clone()).or_insert(0); + let message = KafkaMessage { + offset: *offset, + value, + }; + *offset += 1; + guard + .topics + .entry(topic.clone()) + .or_default() + .push_back(message.clone()); + let _ = append_broker_event(log_path, "publish", &topic, &message.value); + response_topics + .entry(topic) + .or_default() + .push((partition, message.offset as i64)); + } + } + if response_topics.is_empty() { + response_topics.insert("nyx".to_owned(), vec![(0, 0)]); + } + + let mut out = Vec::new(); + kafka_push_i32(&mut out, response_topics.len() as i32); + for (topic, partitions) in response_topics { + kafka_push_string(&mut out, &topic); + kafka_push_i32(&mut out, partitions.len() as i32); + for (partition, offset) in partitions { + kafka_push_i32(&mut out, partition); + kafka_push_i16(&mut out, 0); + kafka_push_i64(&mut out, offset); + if version >= 2 { + kafka_push_i64(&mut out, -1); + } + } + } + if version >= 1 { + kafka_push_i32(&mut out, 0); + } + out +} + +fn kafka_parse_produce_request(version: i16, body: &[u8]) -> Vec<(String, i32, String)> { + let mut reader = KafkaReader::new(body); + if version >= 3 { + let _ = reader.nullable_string(); + } + let _acks = reader.i16(); + let _timeout = reader.i32(); + let Some(topic_len) = reader.array_len() else { + return Vec::new(); + }; + let mut out = Vec::new(); + for _ in 0..topic_len.max(0).min(256) { + let Some(topic) = reader.string() else { + break; + }; + let Some(partition_len) = reader.array_len() else { + break; + }; + for _ in 0..partition_len.max(0).min(256) { + let Some(partition) = reader.i32() else { + break; + }; + let Some(record_set) = reader.bytes() else { + break; + }; + for value in kafka_message_set_values(record_set) { + out.push((topic.clone(), partition, value)); + } + } + } + out +} + +fn kafka_fetch_response( + version: i16, + body: &[u8], + state: &Arc>, + log_path: &Path, +) -> Vec { + let requested = kafka_parse_fetch_request(version, body); + let mut out = Vec::new(); + if version >= 1 { + kafka_push_i32(&mut out, 0); + } + kafka_push_i32(&mut out, requested.len() as i32); + let guard = state.lock().ok(); + for (topic, partitions) in requested { + kafka_push_string(&mut out, &topic); + kafka_push_i32(&mut out, partitions.len() as i32); + for (partition, fetch_offset) in partitions { + let messages: Vec = guard + .as_ref() + .and_then(|g| g.topics.get(&topic)) + .map(|queue| { + queue + .iter() + .filter(|m| m.offset >= fetch_offset as u64) + .take(32) + .cloned() + .collect() + }) + .unwrap_or_default(); + let high_watermark = guard + .as_ref() + .and_then(|g| g.next_offsets.get(&topic).copied()) + .unwrap_or(0) as i64; + let mut message_set = Vec::new(); + for message in messages { + kafka_push_message_set_entry( + &mut message_set, + message.offset, + message.value.as_bytes(), + ); + let _ = append_broker_event(log_path, "deliver", &topic, &message.value); + } + kafka_push_i32(&mut out, partition); + kafka_push_i16(&mut out, 0); + kafka_push_i64(&mut out, high_watermark); + kafka_push_bytes(&mut out, &message_set); + } + } + out +} + +fn kafka_parse_fetch_request(version: i16, body: &[u8]) -> BTreeMap> { + let mut reader = KafkaReader::new(body); + let _replica_id = reader.i32(); + let _max_wait_ms = reader.i32(); + let _min_bytes = reader.i32(); + if version >= 3 { + let _max_bytes = reader.i32(); + } + let mut out = BTreeMap::new(); + let Some(topic_len) = reader.array_len() else { + return out; + }; + for _ in 0..topic_len.max(0).min(256) { + let Some(topic) = reader.string() else { + break; + }; + let Some(partition_len) = reader.array_len() else { + break; + }; + let mut partitions = Vec::new(); + for _ in 0..partition_len.max(0).min(256) { + let Some(partition) = reader.i32() else { + break; + }; + let Some(fetch_offset) = reader.i64() else { + break; + }; + let _max_bytes = reader.i32(); + partitions.push((partition, fetch_offset)); + } + out.insert(topic, partitions); + } + out +} + +fn kafka_list_offsets_response( + _version: i16, + body: &[u8], + state: &Arc>, +) -> Vec { + let requested = kafka_parse_list_offsets_request(body); + let mut out = Vec::new(); + kafka_push_i32(&mut out, requested.len() as i32); + let guard = state.lock().ok(); + for (topic, partitions) in requested { + kafka_push_string(&mut out, &topic); + kafka_push_i32(&mut out, partitions.len() as i32); + for (partition, timestamp) in partitions { + let end_offset = guard + .as_ref() + .and_then(|g| g.next_offsets.get(&topic).copied()) + .unwrap_or(0) as i64; + let offset = if timestamp == -1 { end_offset } else { 0 }; + kafka_push_i32(&mut out, partition); + kafka_push_i16(&mut out, 0); + kafka_push_i64(&mut out, timestamp); + kafka_push_i64(&mut out, offset); + } + } + out +} + +fn kafka_parse_list_offsets_request(body: &[u8]) -> BTreeMap> { + let mut reader = KafkaReader::new(body); + let _replica_id = reader.i32(); + let mut out = BTreeMap::new(); + let Some(topic_len) = reader.array_len() else { + return out; + }; + for _ in 0..topic_len.max(0).min(256) { + let Some(topic) = reader.string() else { + break; + }; + let Some(partition_len) = reader.array_len() else { + break; + }; + let mut partitions = Vec::new(); + for _ in 0..partition_len.max(0).min(256) { + let Some(partition) = reader.i32() else { + break; + }; + let Some(timestamp) = reader.i64() else { + break; + }; + partitions.push((partition, timestamp)); + } + out.insert(topic, partitions); + } + out +} + +fn kafka_find_coordinator_response(version: i16) -> Vec { + let mut out = Vec::new(); + if version >= 1 { + kafka_push_i32(&mut out, 0); + } + kafka_push_i16(&mut out, 0); + kafka_push_i32(&mut out, 0); + kafka_push_string(&mut out, "127.0.0.1"); + kafka_push_i32(&mut out, 0); + out +} + +fn kafka_offset_commit_response(_version: i16, body: &[u8]) -> Vec { + let mut reader = KafkaReader::new(body); + let _group = reader.string(); + let mut out = Vec::new(); + let topic_len = reader.array_len().unwrap_or(0).max(0); + kafka_push_i32(&mut out, topic_len); + for _ in 0..topic_len.min(256) { + let topic = reader.string().unwrap_or_else(|| "nyx".to_owned()); + kafka_push_string(&mut out, &topic); + let partition_len = reader.array_len().unwrap_or(0).max(0); + kafka_push_i32(&mut out, partition_len); + for _ in 0..partition_len.min(256) { + let partition = reader.i32().unwrap_or(0); + let _offset = reader.i64(); + let _metadata = reader.string(); + kafka_push_i32(&mut out, partition); + kafka_push_i16(&mut out, 0); + } + } + out +} + +fn kafka_offset_fetch_response(_version: i16, body: &[u8]) -> Vec { + let mut reader = KafkaReader::new(body); + let _group = reader.string(); + let topic_len = reader.array_len().unwrap_or(0).max(0); + let mut out = Vec::new(); + kafka_push_i32(&mut out, topic_len); + for _ in 0..topic_len.min(256) { + let topic = reader.string().unwrap_or_else(|| "nyx".to_owned()); + kafka_push_string(&mut out, &topic); + let partition_len = reader.array_len().unwrap_or(0).max(0); + kafka_push_i32(&mut out, partition_len); + for _ in 0..partition_len.min(256) { + let partition = reader.i32().unwrap_or(0); + kafka_push_i32(&mut out, partition); + kafka_push_i64(&mut out, -1); + kafka_push_string(&mut out, ""); + kafka_push_i16(&mut out, 0); + } + } + out +} + +fn kafka_join_group_response(_version: i16) -> Vec { + let mut out = Vec::new(); + kafka_push_i16(&mut out, 0); + kafka_push_i32(&mut out, 1); + kafka_push_string(&mut out, "range"); + kafka_push_string(&mut out, "nyx-member"); + kafka_push_string(&mut out, "nyx-member"); + kafka_push_i32(&mut out, 0); + out +} + +fn kafka_sync_group_response(_version: i16) -> Vec { + let mut out = Vec::new(); + kafka_push_i16(&mut out, 0); + kafka_push_bytes(&mut out, &[]); + out +} + +fn kafka_errorless_group_response(_version: i16) -> Vec { + let mut out = Vec::new(); + kafka_push_i16(&mut out, 0); + out +} + +fn kafka_error_response(error_code: i16) -> Vec { + let mut out = Vec::new(); + kafka_push_i16(&mut out, error_code); + out +} + +fn kafka_message_set_values(mut input: &[u8]) -> Vec { + let mut out = Vec::new(); + while input.len() >= 12 { + let mut reader = KafkaReader::new(input); + let _offset = reader.i64(); + let Some(size) = reader.i32() else { + break; + }; + if size < 0 || reader.pos + size as usize > input.len() { + break; + } + let message = &input[reader.pos..reader.pos + size as usize]; + if let Some(value) = kafka_message_value(message) { + out.push(value); + } + input = &input[reader.pos + size as usize..]; + } + out +} + +fn kafka_message_value(message: &[u8]) -> Option { + let mut reader = KafkaReader::new(message); + let _crc = reader.i32()?; + let magic = reader.u8()?; + let _attributes = reader.u8()?; + if magic == 1 { + let _timestamp = reader.i64()?; + } + let _key = reader.bytes()?; + let value = reader.bytes()?; + Some(String::from_utf8_lossy(value).into_owned()) +} + +fn kafka_push_message_set_entry(out: &mut Vec, offset: u64, value: &[u8]) { + let mut message = Vec::new(); + message.extend_from_slice(&[0, 0, 0, 0]); + message.push(1); + message.push(0); + kafka_push_i64(&mut message, 0); + kafka_push_i32(&mut message, -1); + kafka_push_i32(&mut message, value.len() as i32); + message.extend_from_slice(value); + let crc = crc32_ieee(&message[4..]); + message[0..4].copy_from_slice(&crc.to_be_bytes()); + kafka_push_i64(out, offset as i64); + kafka_push_i32(out, message.len() as i32); + out.extend_from_slice(&message); +} + +fn crc32_ieee(bytes: &[u8]) -> u32 { + let mut crc = 0xffff_ffff_u32; + for byte in bytes { + crc ^= u32::from(*byte); + for _ in 0..8 { + let mask = 0_u32.wrapping_sub(crc & 1); + crc = (crc >> 1) ^ (0xedb8_8320 & mask); + } + } + !crc +} + +#[derive(Debug)] +struct KafkaReader<'a> { + input: &'a [u8], + pos: usize, +} + +impl<'a> KafkaReader<'a> { + fn new(input: &'a [u8]) -> Self { + Self { input, pos: 0 } + } + + fn take(&mut self, len: usize) -> Option<&'a [u8]> { + let end = self.pos.checked_add(len)?; + let bytes = self.input.get(self.pos..end)?; + self.pos = end; + Some(bytes) + } + + fn u8(&mut self) -> Option { + Some(*self.take(1)?.first()?) + } + + fn i16(&mut self) -> Option { + Some(i16::from_be_bytes(self.take(2)?.try_into().ok()?)) + } + + fn i32(&mut self) -> Option { + Some(i32::from_be_bytes(self.take(4)?.try_into().ok()?)) + } + + fn i64(&mut self) -> Option { + Some(i64::from_be_bytes(self.take(8)?.try_into().ok()?)) + } + + fn array_len(&mut self) -> Option { + self.i32() + } + + fn string(&mut self) -> Option { + let len = self.i16()?; + if len < 0 { + return None; + } + Some(String::from_utf8_lossy(self.take(len as usize)?).into_owned()) + } + + fn nullable_string(&mut self) -> Option> { + let len = self.i16()?; + if len < 0 { + return Some(None); + } + Some(Some( + String::from_utf8_lossy(self.take(len as usize)?).into_owned(), + )) + } + + fn bytes(&mut self) -> Option<&'a [u8]> { + let len = self.i32()?; + if len < 0 { + return Some(&[]); + } + self.take(len as usize) + } + + fn unsigned_varint(&mut self) -> Option { + let mut value = 0_u32; + let mut shift = 0; + loop { + let byte = self.u8()?; + value |= u32::from(byte & 0x7f) << shift; + if byte & 0x80 == 0 { + return Some(value); + } + shift += 7; + if shift > 28 { + return None; + } + } + } + + fn tagged_fields(&mut self) -> Option<()> { + let fields = self.unsigned_varint()?; + for _ in 0..fields.min(1024) { + let _tag = self.unsigned_varint()?; + let len = self.unsigned_varint()? as usize; + let _ = self.take(len)?; + } + Some(()) + } +} + +fn kafka_push_i16(out: &mut Vec, value: i16) { + out.extend_from_slice(&value.to_be_bytes()); +} + +fn kafka_push_i32(out: &mut Vec, value: i32) { + out.extend_from_slice(&value.to_be_bytes()); +} + +fn kafka_push_i64(out: &mut Vec, value: i64) { + out.extend_from_slice(&value.to_be_bytes()); +} + +fn kafka_push_string(out: &mut Vec, value: &str) { + let bytes = value.as_bytes(); + kafka_push_i16(out, bytes.len().min(i16::MAX as usize) as i16); + out.extend_from_slice(&bytes[..bytes.len().min(i16::MAX as usize)]); +} + +fn kafka_push_nullable_string(out: &mut Vec, value: Option<&str>) { + if let Some(value) = value { + kafka_push_string(out, value); + } else { + kafka_push_i16(out, -1); + } +} + +fn kafka_push_bytes(out: &mut Vec, value: &[u8]) { + kafka_push_i32(out, value.len() as i32); + out.extend_from_slice(value); +} + +fn kafka_push_unsigned_varint(out: &mut Vec, mut value: u32) { + loop { + let mut byte = (value & 0x7f) as u8; + value >>= 7; + if value != 0 { + byte |= 0x80; + } + out.push(byte); + if value == 0 { + break; + } + } +} + +fn kafka_push_compact_array_len(out: &mut Vec, len: usize) { + kafka_push_unsigned_varint(out, len.saturating_add(1) as u32); +} + fn json_error(message: &str) -> String { serde_json::json!({ "error": message }).to_string() } @@ -710,6 +1456,475 @@ fn http_broker_message_json( } } +#[derive(Debug)] +struct PubsubGrpcListener { + port: u16, + shutdown: Arc, +} + +#[derive(Debug, Clone)] +struct PubsubGrpcQueuedMessage { + ack_id: String, + message_id: String, + data: Vec, +} + +#[derive(Debug, Default)] +struct PubsubGrpcState { + topics: BTreeSet, + subscriptions: BTreeMap, + queues: BTreeMap>, + inflight: BTreeMap, + next_id: u64, +} + +fn start_pubsub_grpc_listener(log_path: PathBuf) -> std::io::Result> { + let listener = match TcpListener::bind("127.0.0.1:0") { + Ok(listener) => listener, + Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => return Ok(None), + Err(e) => return Err(e), + }; + let port = listener.local_addr()?.port(); + let shutdown = Arc::new(AtomicBool::new(false)); + let state = Arc::new(Mutex::new(PubsubGrpcState::default())); + let shutdown_clone = Arc::clone(&shutdown); + let state_clone = Arc::clone(&state); + std::thread::spawn(move || { + pubsub_grpc_accept_loop(listener, shutdown_clone, state_clone, log_path) + }); + Ok(Some(PubsubGrpcListener { port, shutdown })) +} + +fn pubsub_grpc_accept_loop( + listener: TcpListener, + shutdown: Arc, + state: Arc>, + log_path: PathBuf, +) { + for stream in listener.incoming() { + if shutdown.load(Ordering::Relaxed) { + break; + } + let Ok(stream) = stream else { continue }; + let _ = stream.set_read_timeout(Some(Duration::from_secs(5))); + let _ = stream.set_write_timeout(Some(Duration::from_secs(5))); + let state = Arc::clone(&state); + let log_path = log_path.clone(); + std::thread::spawn(move || { + if stream.set_nonblocking(true).is_err() { + return; + } + let Ok(runtime) = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + else { + return; + }; + runtime.block_on(async move { + let Ok(stream) = tokio::net::TcpStream::from_std(stream) else { + return; + }; + handle_pubsub_grpc_connection(stream, state, log_path).await; + }); + }); + } +} + +async fn handle_pubsub_grpc_connection( + stream: tokio::net::TcpStream, + state: Arc>, + log_path: PathBuf, +) { + let Ok(mut connection) = h2::server::handshake(stream).await else { + return; + }; + while let Some(request) = connection.accept().await { + let Ok((request, respond)) = request else { + break; + }; + let path = request.uri().path().to_owned(); + let body = request.into_body(); + if path.ends_with("/StreamingPull") { + handle_pubsub_streaming_pull(body, respond, Arc::clone(&state), &log_path).await; + } else { + let body = pubsub_grpc_read_all(body).await; + handle_pubsub_unary(&path, &body, respond, Arc::clone(&state), &log_path).await; + } + } +} + +async fn pubsub_grpc_read_all(mut body: h2::RecvStream) -> Vec { + let mut out = Vec::new(); + while let Some(chunk) = body.data().await { + let Ok(bytes) = chunk else { + break; + }; + let len = bytes.len(); + out.extend_from_slice(&bytes); + let _ = body.flow_control().release_capacity(len); + } + out +} + +async fn pubsub_grpc_read_one_message(mut body: h2::RecvStream) -> Option> { + let mut out = Vec::new(); + while out.len() < 5 || out.len() < 5 + u32::from_be_bytes(out[1..5].try_into().ok()?) as usize { + let bytes = body.data().await?.ok()?; + let len = bytes.len(); + out.extend_from_slice(&bytes); + let _ = body.flow_control().release_capacity(len); + } + pubsub_grpc_unframe(&out) +} + +async fn handle_pubsub_unary( + path: &str, + framed_body: &[u8], + respond: h2::server::SendResponse, + state: Arc>, + log_path: &Path, +) { + let payload = pubsub_grpc_unframe(framed_body).unwrap_or_default(); + match path { + "/google.pubsub.v1.Publisher/CreateTopic" => { + let topic = PubsubTopic::decode(payload.as_slice()).unwrap_or_default(); + if let Ok(mut guard) = state.lock() { + guard.topics.insert(topic.name.clone()); + } + pubsub_send_grpc_message(respond, &topic).await; + } + "/google.pubsub.v1.Publisher/GetTopic" => { + let req = PubsubGetTopicRequest::decode(payload.as_slice()).unwrap_or_default(); + let topic = PubsubTopic { name: req.topic }; + if let Ok(mut guard) = state.lock() { + guard.topics.insert(topic.name.clone()); + } + pubsub_send_grpc_message(respond, &topic).await; + } + "/google.pubsub.v1.Publisher/Publish" => { + let req = PubsubPublishRequest::decode(payload.as_slice()).unwrap_or_default(); + let response = pubsub_publish(&state, log_path, req); + pubsub_send_grpc_message(respond, &response).await; + } + "/google.pubsub.v1.Subscriber/CreateSubscription" => { + let sub = PubsubSubscription::decode(payload.as_slice()).unwrap_or_default(); + if let Ok(mut guard) = state.lock() { + guard.topics.insert(sub.topic.clone()); + guard + .subscriptions + .insert(sub.name.clone(), sub.topic.clone()); + guard.queues.entry(sub.name.clone()).or_default(); + } + pubsub_send_grpc_message(respond, &sub).await; + } + "/google.pubsub.v1.Subscriber/Pull" => { + let req = PubsubPullRequest::decode(payload.as_slice()).unwrap_or_default(); + let response = pubsub_pull(&state, log_path, &req.subscription, req.max_messages); + pubsub_send_grpc_message(respond, &response).await; + } + "/google.pubsub.v1.Subscriber/Acknowledge" => { + let req = PubsubAcknowledgeRequest::decode(payload.as_slice()).unwrap_or_default(); + pubsub_ack(&state, log_path, &req.subscription, &req.ack_ids); + pubsub_send_grpc_message(respond, &PubsubEmpty::default()).await; + } + _ => pubsub_send_grpc_status(respond, 12, "unimplemented").await, + } +} + +async fn handle_pubsub_streaming_pull( + body: h2::RecvStream, + respond: h2::server::SendResponse, + state: Arc>, + log_path: &Path, +) { + let Some(payload) = pubsub_grpc_read_one_message(body).await else { + pubsub_send_grpc_status(respond, 3, "missing request").await; + return; + }; + let req = PubsubStreamingPullRequest::decode(payload.as_slice()).unwrap_or_default(); + if !req.ack_ids.is_empty() { + pubsub_ack(&state, log_path, &req.subscription, &req.ack_ids); + } + let max_messages = if req.max_outstanding_messages > 0 { + req.max_outstanding_messages.min(i64::from(i32::MAX)) as i32 + } else { + 1 + }; + let pull = pubsub_pull(&state, log_path, &req.subscription, max_messages); + let response = PubsubStreamingPullResponse { + received_messages: pull.received_messages, + }; + pubsub_send_grpc_message(respond, &response).await; +} + +fn pubsub_publish( + state: &Arc>, + log_path: &Path, + req: PubsubPublishRequest, +) -> PubsubPublishResponse { + let mut ids = Vec::new(); + let Ok(mut guard) = state.lock() else { + return PubsubPublishResponse { message_ids: ids }; + }; + guard.topics.insert(req.topic.clone()); + let subscriptions: Vec = guard + .subscriptions + .iter() + .filter_map(|(sub, topic)| (topic == &req.topic).then_some(sub.clone())) + .collect(); + for message in req.messages { + guard.next_id += 1; + let id = format!("nyx-{:08}", guard.next_id); + let ack_id = format!("ack-{}", id); + ids.push(id.clone()); + let payload = String::from_utf8_lossy(&message.data).into_owned(); + let _ = append_broker_event(log_path, "publish", &req.topic, &payload); + let queued = PubsubGrpcQueuedMessage { + ack_id, + message_id: id, + data: message.data, + }; + for sub in &subscriptions { + guard + .queues + .entry(sub.clone()) + .or_default() + .push_back(queued.clone()); + } + } + PubsubPublishResponse { message_ids: ids } +} + +fn pubsub_pull( + state: &Arc>, + log_path: &Path, + subscription: &str, + max_messages: i32, +) -> PubsubPullResponse { + let mut received = Vec::new(); + let Ok(mut guard) = state.lock() else { + return PubsubPullResponse { + received_messages: received, + }; + }; + let max_messages = max_messages.clamp(1, 100) as usize; + for _ in 0..max_messages { + let Some(message) = guard + .queues + .entry(subscription.to_owned()) + .or_default() + .pop_front() + else { + break; + }; + let payload = String::from_utf8_lossy(&message.data).into_owned(); + let _ = append_broker_event(log_path, "deliver", subscription, &payload); + guard.inflight.insert( + message.ack_id.clone(), + (subscription.to_owned(), message.clone()), + ); + received.push(PubsubReceivedMessage { + ack_id: message.ack_id.clone(), + message: Some(PubsubMessage { + data: message.data, + message_id: message.message_id, + ordering_key: String::new(), + }), + delivery_attempt: 1, + }); + } + PubsubPullResponse { + received_messages: received, + } +} + +fn pubsub_ack( + state: &Arc>, + log_path: &Path, + subscription: &str, + ack_ids: &[String], +) { + if let Ok(mut guard) = state.lock() { + for ack_id in ack_ids { + if guard.inflight.remove(ack_id).is_some() { + let _ = append_broker_event(log_path, "ack", subscription, ack_id); + } + } + } +} + +async fn pubsub_send_grpc_message( + respond: h2::server::SendResponse, + message: &M, +) { + let mut payload = Vec::new(); + if message.encode(&mut payload).is_err() { + pubsub_send_grpc_status(respond, 13, "encode failed").await; + return; + } + pubsub_send_grpc_payload(respond, pubsub_grpc_frame(&payload)).await; +} + +async fn pubsub_send_grpc_payload( + mut respond: h2::server::SendResponse, + framed_payload: Vec, +) { + let response = http::Response::builder() + .status(200) + .header("content-type", "application/grpc") + .body(()) + .unwrap(); + let Ok(mut send) = respond.send_response(response, false) else { + return; + }; + if send + .send_data(bytes::Bytes::from(framed_payload), false) + .is_err() + { + return; + } + let mut trailers = http::HeaderMap::new(); + trailers.insert("grpc-status", http::HeaderValue::from_static("0")); + let _ = send.send_trailers(trailers); +} + +async fn pubsub_send_grpc_status( + mut respond: h2::server::SendResponse, + code: u16, + message: &str, +) { + let response = http::Response::builder() + .status(200) + .header("content-type", "application/grpc") + .header("grpc-status", code.to_string()) + .header("grpc-message", message) + .body(()) + .unwrap(); + let _ = respond.send_response(response, true); +} + +fn pubsub_grpc_frame(payload: &[u8]) -> Vec { + let mut out = Vec::with_capacity(5 + payload.len()); + out.push(0); + out.extend_from_slice(&(payload.len() as u32).to_be_bytes()); + out.extend_from_slice(payload); + out +} + +fn pubsub_grpc_unframe(input: &[u8]) -> Option> { + if input.len() < 5 || input[0] != 0 { + return None; + } + let len = u32::from_be_bytes(input[1..5].try_into().ok()?) as usize; + Some(input.get(5..5 + len)?.to_vec()) +} + +#[derive(Clone, PartialEq, prost::Message)] +struct PubsubEmpty {} + +#[derive(Clone, PartialEq, prost::Message)] +struct PubsubTopic { + #[prost(string, tag = "1")] + name: String, +} + +#[derive(Clone, PartialEq, prost::Message)] +struct PubsubGetTopicRequest { + #[prost(string, tag = "1")] + topic: String, +} + +#[derive(Clone, PartialEq, prost::Message)] +struct PubsubMessage { + #[prost(bytes = "vec", tag = "1")] + data: Vec, + #[prost(string, tag = "3")] + message_id: String, + #[prost(string, tag = "5")] + ordering_key: String, +} + +#[derive(Clone, PartialEq, prost::Message)] +struct PubsubPublishRequest { + #[prost(string, tag = "1")] + topic: String, + #[prost(message, repeated, tag = "2")] + messages: Vec, +} + +#[derive(Clone, PartialEq, prost::Message)] +struct PubsubPublishResponse { + #[prost(string, repeated, tag = "1")] + message_ids: Vec, +} + +#[derive(Clone, PartialEq, prost::Message)] +struct PubsubSubscription { + #[prost(string, tag = "1")] + name: String, + #[prost(string, tag = "2")] + topic: String, + #[prost(int32, tag = "5")] + ack_deadline_seconds: i32, +} + +#[derive(Clone, PartialEq, prost::Message)] +struct PubsubPullRequest { + #[prost(string, tag = "1")] + subscription: String, + #[prost(bool, tag = "2")] + return_immediately: bool, + #[prost(int32, tag = "3")] + max_messages: i32, +} + +#[derive(Clone, PartialEq, prost::Message)] +struct PubsubReceivedMessage { + #[prost(string, tag = "1")] + ack_id: String, + #[prost(message, optional, tag = "2")] + message: Option, + #[prost(int32, tag = "3")] + delivery_attempt: i32, +} + +#[derive(Clone, PartialEq, prost::Message)] +struct PubsubPullResponse { + #[prost(message, repeated, tag = "1")] + received_messages: Vec, +} + +#[derive(Clone, PartialEq, prost::Message)] +struct PubsubAcknowledgeRequest { + #[prost(string, tag = "1")] + subscription: String, + #[prost(string, repeated, tag = "2")] + ack_ids: Vec, +} + +#[derive(Clone, PartialEq, prost::Message)] +struct PubsubStreamingPullRequest { + #[prost(string, tag = "1")] + subscription: String, + #[prost(string, repeated, tag = "2")] + ack_ids: Vec, + #[prost(int32, tag = "3")] + stream_ack_deadline_seconds: i32, + #[prost(string, tag = "5")] + client_id: String, + #[prost(int64, tag = "7")] + max_outstanding_messages: i64, + #[prost(int64, tag = "8")] + max_outstanding_bytes: i64, +} + +#[derive(Clone, PartialEq, prost::Message)] +struct PubsubStreamingPullResponse { + #[prost(message, repeated, tag = "1")] + received_messages: Vec, +} + #[derive(Debug)] struct RabbitAmqpListener { port: u16, @@ -2052,8 +3267,8 @@ mod tests { assert!(stub.log_path().exists()); let endpoint = stub.endpoint(); assert!( - endpoint == "loopback://kafka" || endpoint.starts_with("http://127.0.0.1:"), - "Kafka endpoint should be loopback fallback or HTTP emulator, got {endpoint}" + endpoint == "loopback://kafka" || endpoint.starts_with("kafka://127.0.0.1:"), + "Kafka endpoint should be loopback fallback or Kafka protocol endpoint, got {endpoint}" ); assert_eq!( stub.recording_endpoint().unwrap().0, @@ -2091,7 +3306,7 @@ mod tests { } #[test] - fn kafka_broker_exposes_http_emulator() { + fn kafka_broker_exposes_protocol_endpoint() { let dir = TempDir::new().unwrap(); let stub = BrokerStub::start(StubKind::Kafka, dir.path()).unwrap(); let endpoint = stub.endpoint(); @@ -2099,13 +3314,13 @@ mod tests { return; } assert!( - endpoint.starts_with("http://127.0.0.1:"), - "Kafka endpoint should be a host-side HTTP emulator, got {endpoint}" + endpoint.starts_with("kafka://127.0.0.1:"), + "Kafka endpoint should be a protocol-compatible endpoint, got {endpoint}" ); } #[test] - fn pubsub_broker_exposes_http_emulator() { + fn pubsub_broker_exposes_grpc_endpoint() { for kind in [StubKind::Pubsub] { let dir = TempDir::new().unwrap(); let stub = BrokerStub::start(kind, dir.path()).unwrap(); @@ -2114,8 +3329,8 @@ mod tests { continue; } assert!( - endpoint.starts_with("http://127.0.0.1:"), - "{kind:?} endpoint should be a host-side HTTP emulator, got {endpoint}" + endpoint.starts_with("pubsub://127.0.0.1:"), + "{kind:?} endpoint should be a protocol-compatible gRPC endpoint, got {endpoint}" ); } } @@ -2149,7 +3364,7 @@ mod tests { } #[test] - fn kafka_http_emulator_records_publish_deliver_ack() { + fn kafka_protocol_server_records_publish_deliver() { let dir = TempDir::new().unwrap(); let stub = BrokerStub::start(StubKind::Kafka, dir.path()).unwrap(); let endpoint = stub.endpoint(); @@ -2157,27 +3372,57 @@ mod tests { return; } let port: u16 = endpoint - .trim_start_matches("http://127.0.0.1:") + .trim_start_matches("kafka://127.0.0.1:") .parse() .unwrap(); - let send = http_post(port, "/topics/orders/messages", "NYX\tPAYLOAD"); - assert!(send.contains(r#""offset":0"#), "{send}"); + let api_versions = kafka_roundtrip(port, kafka_test_request(18, 3, 1, &[])); + let mut api_reader = KafkaReader::new(&api_versions); + assert_eq!(api_reader.i32(), Some(1)); + assert!(api_versions.len() > 8, "{api_versions:?}"); - let receive = http_get(port, "/topics/orders/records?max=1"); - assert!(receive.contains(r#""value":"NYX\tPAYLOAD""#), "{receive}"); + let mut metadata_body = Vec::new(); + kafka_push_i32(&mut metadata_body, 1); + kafka_push_string(&mut metadata_body, "orders"); + let metadata = kafka_roundtrip(port, kafka_test_request(3, 1, 2, &metadata_body)); + let mut metadata_reader = KafkaReader::new(&metadata); + assert_eq!(metadata_reader.i32(), Some(2)); + assert!(metadata.windows("orders".len()).any(|w| w == b"orders")); - let commit = http_post(port, "/topics/orders/commit", "offset=0"); - assert!(commit.contains(r#""committed":true"#), "{commit}"); + let mut message_set = Vec::new(); + kafka_push_message_set_entry(&mut message_set, 0, b"NYX\tPAYLOAD"); + let mut produce_body = Vec::new(); + kafka_push_i16(&mut produce_body, 1); + kafka_push_i32(&mut produce_body, 1000); + kafka_push_i32(&mut produce_body, 1); + kafka_push_string(&mut produce_body, "orders"); + kafka_push_i32(&mut produce_body, 1); + kafka_push_i32(&mut produce_body, 0); + kafka_push_bytes(&mut produce_body, &message_set); + let produce = kafka_roundtrip(port, kafka_test_request(0, 2, 3, &produce_body)); + assert_eq!(&produce[..4], &3_i32.to_be_bytes()); + + let mut fetch_body = Vec::new(); + kafka_push_i32(&mut fetch_body, -1); + kafka_push_i32(&mut fetch_body, 100); + kafka_push_i32(&mut fetch_body, 1); + kafka_push_i32(&mut fetch_body, 1); + kafka_push_string(&mut fetch_body, "orders"); + kafka_push_i32(&mut fetch_body, 1); + kafka_push_i32(&mut fetch_body, 0); + kafka_push_i64(&mut fetch_body, 0); + kafka_push_i32(&mut fetch_body, 1024 * 1024); + let fetch = kafka_roundtrip(port, kafka_test_request(1, 2, 4, &fetch_body)); + let fetched_values = kafka_test_fetch_values(&fetch); + assert_eq!(fetched_values, vec!["NYX\tPAYLOAD".to_owned()]); let events = stub.drain_events(); let actions: Vec<&str> = events .iter() .map(|ev| ev.detail.get("action").unwrap().as_str()) .collect(); - assert_eq!(actions, vec!["publish", "deliver", "ack"]); + assert_eq!(actions, vec!["publish", "deliver"]); assert_eq!(events[0].detail.get("destination").unwrap(), "orders"); assert_eq!(events[1].detail.get("payload").unwrap(), "NYX\tPAYLOAD"); - assert_eq!(events[2].detail.get("payload").unwrap(), "0"); } #[test] @@ -2233,62 +3478,88 @@ mod tests { } #[test] - fn pubsub_http_broker_emulator_records_publish_deliver_ack() { - let cases = [(StubKind::Pubsub, "topics", "projects/p/topics/orders")]; - for (kind, root, destination) in cases { - let dir = TempDir::new().unwrap(); - let stub = BrokerStub::start(kind, dir.path()).unwrap(); - let endpoint = stub.endpoint(); - if endpoint == format!("loopback://{}", kind.tag()) { - continue; - } - let port: u16 = endpoint - .trim_start_matches("http://127.0.0.1:") - .parse() - .unwrap(); - let escaped_destination = form_escape(destination); - let send = http_post( - port, - &format!("/{root}/{escaped_destination}/messages"), - "NYX\tPAYLOAD", - ); - assert!(send.contains(r#""id":"nyx-00000001""#), "{send}"); - - let receive = http_get( - port, - &format!("/{root}/{escaped_destination}/messages?max=1"), - ); - let parsed: serde_json::Value = serde_json::from_str(response_body(&receive)).unwrap(); - let message = parsed["messages"][0].as_object().unwrap(); - let payload = message - .get("data") - .or_else(|| message.get("body")) - .and_then(|v| v.as_str()) - .unwrap(); - assert_eq!(payload, "NYX\tPAYLOAD"); - let ack_id = message - .get("ack_id") - .or_else(|| message.get("delivery_tag")) - .and_then(|v| v.as_str()) - .unwrap(); - - let ack = http_post( - port, - &format!("/{root}/{escaped_destination}/ack"), - &format!("ack_id={}", form_escape(ack_id)), - ); - assert!(ack.contains(r#""acked":true"#), "{ack}"); - - let events = stub.drain_events(); - let actions: Vec<&str> = events - .iter() - .map(|ev| ev.detail.get("action").unwrap().as_str()) - .collect(); - assert_eq!(actions, vec!["publish", "deliver", "ack"], "{kind:?}"); - assert_eq!(events[0].detail.get("destination").unwrap(), destination); - assert_eq!(events[1].detail.get("payload").unwrap(), "NYX\tPAYLOAD"); - assert_eq!(events[2].detail.get("payload").unwrap(), ack_id); + fn pubsub_grpc_emulator_records_publish_deliver_ack() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Pubsub, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == "loopback://pubsub" { + return; } + let port: u16 = endpoint + .trim_start_matches("pubsub://127.0.0.1:") + .parse() + .unwrap(); + let topic = "projects/nyx/topics/orders"; + let subscription = "projects/nyx/subscriptions/orders-sub"; + + let created_topic: PubsubTopic = pubsub_grpc_unary( + port, + "/google.pubsub.v1.Publisher/CreateTopic", + &PubsubTopic { + name: topic.to_owned(), + }, + ); + assert_eq!(created_topic.name, topic); + + let created_subscription: PubsubSubscription = pubsub_grpc_unary( + port, + "/google.pubsub.v1.Subscriber/CreateSubscription", + &PubsubSubscription { + name: subscription.to_owned(), + topic: topic.to_owned(), + ack_deadline_seconds: 10, + }, + ); + assert_eq!(created_subscription.name, subscription); + + let published: PubsubPublishResponse = pubsub_grpc_unary( + port, + "/google.pubsub.v1.Publisher/Publish", + &PubsubPublishRequest { + topic: topic.to_owned(), + messages: vec![PubsubMessage { + data: b"NYX\tPAYLOAD".to_vec(), + message_id: String::new(), + ordering_key: String::new(), + }], + }, + ); + assert_eq!(published.message_ids, vec!["nyx-00000001"]); + + let pulled: PubsubPullResponse = pubsub_grpc_unary( + port, + "/google.pubsub.v1.Subscriber/Pull", + &PubsubPullRequest { + subscription: subscription.to_owned(), + return_immediately: true, + max_messages: 1, + }, + ); + assert_eq!(pulled.received_messages.len(), 1); + let received = &pulled.received_messages[0]; + assert_eq!( + received.message.as_ref().unwrap().data, + b"NYX\tPAYLOAD".to_vec() + ); + + let _empty: PubsubEmpty = pubsub_grpc_unary( + port, + "/google.pubsub.v1.Subscriber/Acknowledge", + &PubsubAcknowledgeRequest { + subscription: subscription.to_owned(), + ack_ids: vec![received.ack_id.clone()], + }, + ); + + let events = stub.drain_events(); + let actions: Vec<&str> = events + .iter() + .map(|ev| ev.detail.get("action").unwrap().as_str()) + .collect(); + assert_eq!(actions, vec!["publish", "deliver", "ack"]); + assert_eq!(events[0].detail.get("destination").unwrap(), topic); + assert_eq!(events[1].detail.get("payload").unwrap(), "NYX\tPAYLOAD"); + assert_eq!(events[2].detail.get("payload").unwrap(), &received.ack_id); } #[test] @@ -2726,6 +3997,92 @@ mod tests { assert_eq!(events[0].detail.get("payload").unwrap(), "legacy payload"); } + fn kafka_test_request(api_key: i16, version: i16, correlation_id: i32, body: &[u8]) -> Vec { + let mut request = Vec::new(); + kafka_push_i16(&mut request, api_key); + kafka_push_i16(&mut request, version); + kafka_push_i32(&mut request, correlation_id); + kafka_push_nullable_string(&mut request, Some("nyx-test")); + if kafka_api_uses_flexible_header(api_key, version) { + kafka_push_unsigned_varint(&mut request, 0); + } + request.extend_from_slice(body); + let mut framed = Vec::new(); + kafka_push_i32(&mut framed, request.len() as i32); + framed.extend_from_slice(&request); + framed + } + + fn kafka_roundtrip(port: u16, request: Vec) -> Vec { + let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap(); + s.write_all(&request).unwrap(); + let mut len_buf = [0_u8; 4]; + s.read_exact(&mut len_buf).unwrap(); + let len = i32::from_be_bytes(len_buf) as usize; + let mut response = vec![0_u8; len]; + s.read_exact(&mut response).unwrap(); + response + } + + fn kafka_test_fetch_values(response: &[u8]) -> Vec { + let mut reader = KafkaReader::new(response); + let _correlation_id = reader.i32().unwrap(); + let _throttle_ms = reader.i32().unwrap(); + let topic_len = reader.array_len().unwrap(); + let mut values = Vec::new(); + for _ in 0..topic_len { + let _topic = reader.string().unwrap(); + let partition_len = reader.array_len().unwrap(); + for _ in 0..partition_len { + let _partition = reader.i32().unwrap(); + assert_eq!(reader.i16().unwrap(), 0); + let _high_watermark = reader.i64().unwrap(); + let message_set = reader.bytes().unwrap(); + values.extend(kafka_message_set_values(message_set)); + } + } + values + } + + fn pubsub_grpc_unary(port: u16, path: &str, message: &M) -> R + where + M: prost::Message, + R: prost::Message + Default, + { + let mut payload = Vec::new(); + message.encode(&mut payload).unwrap(); + let framed = pubsub_grpc_frame(&payload); + let response = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap() + .block_on(async move { + let stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{port}")) + .await + .unwrap(); + let (mut client, connection) = h2::client::handshake(stream).await.unwrap(); + tokio::spawn(async move { + let _ = connection.await; + }); + let request = http::Request::builder() + .method("POST") + .uri(path) + .header("content-type", "application/grpc") + .body(()) + .unwrap(); + let (response, mut send_stream) = client.send_request(request, false).unwrap(); + send_stream + .send_data(bytes::Bytes::from(framed), true) + .unwrap(); + let response = response.await.unwrap(); + assert_eq!(response.status(), 200); + let framed_response = pubsub_grpc_read_all(response.into_body()).await; + pubsub_grpc_unframe(&framed_response).unwrap_or_default() + }); + R::decode(response.as_slice()).unwrap() + } + fn http_post(port: u16, path: &str, body: &str) -> String { let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap(); let req = format!( @@ -2738,20 +4095,6 @@ mod tests { out } - fn http_get(port: u16, path: &str) -> String { - let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap(); - let req = - format!("GET {path} HTTP/1.1\r\nhost: 127.0.0.1:{port}\r\nconnection: close\r\n\r\n"); - s.write_all(req.as_bytes()).unwrap(); - let mut out = String::new(); - s.read_to_string(&mut out).unwrap(); - out - } - - fn response_body(response: &str) -> &str { - response.split("\r\n\r\n").nth(1).unwrap_or("") - } - fn read_until(reader: &mut BufReader, needle: &str) -> String { let mut out = String::new(); while !out.contains(needle) { diff --git a/tests/message_handler_corpus.rs b/tests/message_handler_corpus.rs index bb38478d..bc02742c 100644 --- a/tests/message_handler_corpus.rs +++ b/tests/message_handler_corpus.rs @@ -577,7 +577,7 @@ fn message_handler_remaining_brokers_emit_delivery_and_ack_events() { } #[test] -fn message_handler_remaining_brokers_try_http_emulators_before_loopback() { +fn message_handler_remaining_brokers_keep_http_fallbacks_after_real_clients() { let cases = [ ( Lang::Python,