From 57d3677bd48992dcb701450fd9d26264a82233ba Mon Sep 17 00:00:00 2001 From: elipeter Date: Wed, 27 May 2026 11:01:46 -0500 Subject: [PATCH] **refactor(dynamic): add Kafka HTTP emulator with publish/poll/commit support, extend endpoint rewriting and stub event recording across Java, Python, and Rust** --- src/dynamic/lang/java.rs | 245 +++++++++++++++++++++++++++++--- src/dynamic/lang/python.rs | 78 +++++++--- src/dynamic/lang/ruby.rs | 24 ++++ src/dynamic/sandbox/mod.rs | 14 +- src/dynamic/stubs/broker.rs | 235 +++++++++++++++++++++++++++++- tests/message_handler_corpus.rs | 7 + tests/phase21_corpus.rs | 1 + 7 files changed, 564 insertions(+), 40 deletions(-) diff --git a/src/dynamic/lang/java.rs b/src/dynamic/lang/java.rs index e0e4d10c..70dd1302 100644 --- a/src/dynamic/lang/java.rs +++ b/src/dynamic/lang/java.rs @@ -3873,26 +3873,29 @@ fn emit_message_handler_harness( JavaBroker::Kafka => ( crate::dynamic::stubs::KAFKA_PUBLISH_MARKER, format!( - r#" NyxKafkaLoopback brokerRef = new NyxKafkaLoopback(); - System.out.println({publish_marker:?} + " " + {queue:?}); - nyxRecordBrokerPublish("NYX_KAFKA_LOG", {queue:?}, payload); - brokerRef.publish({queue:?}, payload); - for (NyxKafkaRecord rec : brokerRef.poll({queue:?}, 1)) {{ - nyxRecordBrokerEvent("NYX_KAFKA_LOG", "deliver", {queue:?}, rec.value); - System.out.println("__NYX_SINK_HIT__"); - boolean success = false; - try {{ - java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, String.class); - m.setAccessible(true); - m.invoke(entryInst, rec.value); - success = true; - }} catch (Exception e) {{ - Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; - System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); - }} - if (success) {{ - brokerRef.commit(rec); - nyxRecordBrokerEvent("NYX_KAFKA_LOG", "ack", {queue:?}, Long.toString(rec.offset)); + r#" if (!nyxTryRealKafkaClient({queue:?}, payload, entryInst, {handler:?}) + && !nyxTryKafkaHttp({queue:?}, payload, entryInst, {handler:?})) {{ + NyxKafkaLoopback brokerRef = new NyxKafkaLoopback(); + System.out.println({publish_marker:?} + " " + {queue:?}); + nyxRecordBrokerPublish("NYX_KAFKA_LOG", {queue:?}, payload); + brokerRef.publish({queue:?}, payload); + for (NyxKafkaRecord rec : brokerRef.poll({queue:?}, 1)) {{ + nyxRecordBrokerEvent("NYX_KAFKA_LOG", "deliver", {queue:?}, rec.value); + System.out.println("__NYX_SINK_HIT__"); + boolean success = false; + try {{ + java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, String.class); + m.setAccessible(true); + m.invoke(entryInst, rec.value); + success = true; + }} catch (Exception e) {{ + Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + if (success) {{ + brokerRef.commit(rec); + nyxRecordBrokerEvent("NYX_KAFKA_LOG", "ack", {queue:?}, Long.toString(rec.offset)); + }} }} }}"#, handler = handler, @@ -3928,6 +3931,207 @@ public class NyxHarness {{ }} }} + static boolean nyxTryKafkaHttp(String topic, String payload, Object entryInst, String handler) {{ + String endpoint = System.getenv("NYX_KAFKA_ENDPOINT"); + if (endpoint == null || !(endpoint.startsWith("http://") || endpoint.startsWith("https://"))) {{ + return false; + }} + try {{ + String base = endpoint.replaceAll("/+$", ""); + String topicPath = java.net.URLEncoder.encode(topic, java.nio.charset.StandardCharsets.UTF_8); + System.out.println({kafka_publish_marker:?} + " " + topic); + nyxHttpRequest( + "POST", + base + "/topics/" + topicPath + "/messages", + payload.getBytes(java.nio.charset.StandardCharsets.UTF_8) + ); + String recordsJson = nyxHttpRequest( + "GET", + base + "/topics/" + topicPath + "/records?max=1", + new byte[0] + ); + if (recordsJson == null || !recordsJson.contains("\"records\"") || !recordsJson.contains("\"value\"")) {{ + return false; + }} + String value = nyxJsonStringField(recordsJson, "value"); + String offset = nyxJsonNumberField(recordsJson, "offset"); + if (offset == null || offset.isEmpty()) {{ + offset = "0"; + }} + System.out.println("__NYX_SINK_HIT__"); + boolean success = false; + try {{ + java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod(handler, String.class); + m.setAccessible(true); + m.invoke(entryInst, value); + success = true; + }} catch (Exception e) {{ + Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + if (success) {{ + String body = "offset=" + java.net.URLEncoder.encode(offset, java.nio.charset.StandardCharsets.UTF_8); + nyxHttpRequest( + "POST", + base + "/topics/" + topicPath + "/commit", + body.getBytes(java.nio.charset.StandardCharsets.UTF_8) + ); + }} + return true; + }} catch (Throwable e) {{ + System.err.println("NYX_KAFKA_HTTP_FALLBACK: " + e.getClass().getName() + ": " + e.getMessage()); + return false; + }} + }} + + static boolean nyxTryRealKafkaClient(String topic, String payload, Object entryInst, String handler) {{ + Object consumer = null; + try {{ + Class mockConsumerClass = Class.forName("org.apache.kafka.clients.consumer.MockConsumer"); + Class resetClass = Class.forName("org.apache.kafka.clients.consumer.OffsetResetStrategy"); + Object earliest = java.lang.Enum.valueOf(resetClass.asSubclass(java.lang.Enum.class), "EARLIEST"); + consumer = mockConsumerClass.getConstructor(resetClass).newInstance(earliest); + + Class topicPartitionClass = Class.forName("org.apache.kafka.common.TopicPartition"); + Object partition = topicPartitionClass.getConstructor(String.class, int.class).newInstance(topic, 0); + java.util.List partitions = java.util.Collections.singletonList(partition); + mockConsumerClass.getMethod("subscribe", java.util.Collection.class) + .invoke(consumer, java.util.Collections.singletonList(topic)); + mockConsumerClass.getMethod("rebalance", java.util.Collection.class).invoke(consumer, partitions); + java.util.Map beginnings = new java.util.HashMap<>(); + beginnings.put(partition, Long.valueOf(0L)); + mockConsumerClass.getMethod("updateBeginningOffsets", java.util.Map.class).invoke(consumer, beginnings); + + Class recordClass = Class.forName("org.apache.kafka.clients.consumer.ConsumerRecord"); + Object record = null; + for (java.lang.reflect.Constructor ctor : recordClass.getConstructors()) {{ + if (ctor.getParameterCount() == 5) {{ + record = ctor.newInstance(topic, Integer.valueOf(0), Long.valueOf(0L), null, payload); + break; + }} + }} + if (record == null) {{ + return false; + }} + + System.out.println({kafka_publish_marker:?} + " " + topic); + nyxRecordBrokerPublish("NYX_KAFKA_LOG", topic, payload); + mockConsumerClass.getMethod("addRecord", recordClass).invoke(consumer, record); + Object records = mockConsumerClass.getMethod("poll", java.time.Duration.class) + .invoke(consumer, java.time.Duration.ofMillis(10)); + if (!(records instanceof Iterable)) {{ + return false; + }} + + boolean delivered = false; + for (Object rec : (Iterable) records) {{ + String value = String.valueOf(rec.getClass().getMethod("value").invoke(rec)); + long offset = ((Number) rec.getClass().getMethod("offset").invoke(rec)).longValue(); + nyxRecordBrokerEvent("NYX_KAFKA_LOG", "deliver", topic, value); + System.out.println("__NYX_SINK_HIT__"); + boolean success = false; + try {{ + java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod(handler, String.class); + m.setAccessible(true); + m.invoke(entryInst, value); + success = true; + }} catch (Exception e) {{ + Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + if (success) {{ + Class offsetClass = Class.forName("org.apache.kafka.clients.consumer.OffsetAndMetadata"); + Object metadata = offsetClass.getConstructor(long.class).newInstance(Long.valueOf(offset + 1L)); + java.util.Map commits = new java.util.HashMap<>(); + commits.put(partition, metadata); + mockConsumerClass.getMethod("commitSync", java.util.Map.class).invoke(consumer, commits); + nyxRecordBrokerEvent("NYX_KAFKA_LOG", "ack", topic, Long.toString(offset)); + }} + delivered = true; + }} + return delivered; + }} catch (ClassNotFoundException missingKafkaClient) {{ + return false; + }} catch (Throwable e) {{ + System.err.println("NYX_REAL_KAFKA_FALLBACK: " + e.getClass().getName() + ": " + e.getMessage()); + return false; + }} finally {{ + if (consumer instanceof AutoCloseable) {{ + try {{ + ((AutoCloseable) consumer).close(); + }} catch (Exception ignored) {{ + }} + }} + }} + }} + + static String nyxHttpRequest(String method, String target, byte[] body) throws Exception {{ + java.net.HttpURLConnection conn = (java.net.HttpURLConnection) java.net.URI.create(target).toURL().openConnection(); + conn.setRequestMethod(method); + conn.setConnectTimeout(1000); + conn.setReadTimeout(2000); + if (body != null && body.length > 0) {{ + conn.setDoOutput(true); + conn.setRequestProperty("Content-Type", "application/octet-stream"); + conn.setRequestProperty("Content-Length", Integer.toString(body.length)); + try (java.io.OutputStream os = conn.getOutputStream()) {{ + os.write(body); + }} + }} + java.io.InputStream is = conn.getResponseCode() >= 400 ? conn.getErrorStream() : conn.getInputStream(); + if (is == null) {{ + return ""; + }} + try (java.io.InputStream input = is) {{ + byte[] data = input.readAllBytes(); + return new String(data, java.nio.charset.StandardCharsets.UTF_8); + }} finally {{ + conn.disconnect(); + }} + }} + + static String nyxJsonStringField(String json, String field) {{ + String needle = "\"" + field + "\":\""; + int start = json.indexOf(needle); + if (start < 0) return ""; + start += needle.length(); + StringBuilder out = new StringBuilder(); + boolean escaped = false; + for (int i = start; i < json.length(); i++) {{ + char ch = json.charAt(i); + if (escaped) {{ + switch (ch) {{ + case 'n': out.append('\n'); break; + case 'r': out.append('\r'); break; + case 't': out.append('\t'); break; + case '"': out.append('"'); break; + case '\\': out.append('\\'); break; + default: out.append(ch); break; + }} + escaped = false; + }} else if (ch == '\\') {{ + escaped = true; + }} else if (ch == '"') {{ + break; + }} else {{ + out.append(ch); + }} + }} + return out.toString(); + }} + + static String nyxJsonNumberField(String json, String field) {{ + String needle = "\"" + field + "\":"; + int start = json.indexOf(needle); + if (start < 0) return ""; + start += needle.length(); + int end = start; + while (end < json.length() && Character.isDigit(json.charAt(end))) {{ + end++; + }} + return json.substring(start, end); + }} + static boolean nyxTryRealSqs(String queue, String payload, Object entryInst, String handler) {{ String endpoint = System.getenv("NYX_SQS_ENDPOINT"); if (endpoint == null || !(endpoint.startsWith("http://") || endpoint.startsWith("https://"))) {{ @@ -4052,6 +4256,7 @@ public class NyxHarness {{ "#, entry_class = entry_class, dispatch_block = dispatch_block, + kafka_publish_marker = crate::dynamic::stubs::KAFKA_PUBLISH_MARKER, sqs_publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER, ); HarnessSource { diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index 6c39d9e0..2386c7ab 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -1018,22 +1018,23 @@ _chan.basic_publish(exchange="", routing_key={queue:?}, body=payload)"#, publish_marker = crate::dynamic::stubs::RABBIT_PUBLISH_MARKER, ), PythonBroker::Kafka => format!( - r#"_loop = NyxKafkaLoopback() -def _nyx_kafka_dispatch(message): - _h = getattr(_entry_mod, {handler:?}, None) - if _h is None: - print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) - sys.exit(78) - _h(message) -_loop.subscribe({queue:?}, _nyx_kafka_dispatch) -print({publish_marker:?} + " " + {queue:?}, flush=True) -_nyx_record_broker_publish("NYX_KAFKA_LOG", {queue:?}, payload) -_loop.publish({queue:?}, payload) -for _record in _loop.poll({queue:?}, max_records=1): - _nyx_record_broker_event("NYX_KAFKA_LOG", "deliver", {queue:?}, _record.value) - _nyx_kafka_dispatch(_record.value) - _loop.commit(_record) - _nyx_record_broker_event("NYX_KAFKA_LOG", "ack", {queue:?}, str(_record.offset))"#, + r#"if not _nyx_try_kafka_http({queue:?}, payload, {handler:?}): + _loop = NyxKafkaLoopback() + def _nyx_kafka_dispatch(message): + _h = getattr(_entry_mod, {handler:?}, None) + if _h is None: + print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) + sys.exit(78) + _h(message) + _loop.subscribe({queue:?}, _nyx_kafka_dispatch) + print({publish_marker:?} + " " + {queue:?}, flush=True) + _nyx_record_broker_publish("NYX_KAFKA_LOG", {queue:?}, payload) + _loop.publish({queue:?}, payload) + for _record in _loop.poll({queue:?}, max_records=1): + _nyx_record_broker_event("NYX_KAFKA_LOG", "deliver", {queue:?}, _record.value) + _nyx_kafka_dispatch(_record.value) + _loop.commit(_record) + _nyx_record_broker_event("NYX_KAFKA_LOG", "ack", {queue:?}, str(_record.offset))"#, handler = handler, queue = queue, publish_marker = crate::dynamic::stubs::KAFKA_PUBLISH_MARKER, @@ -1064,6 +1065,50 @@ def _nyx_record_broker_event(env_name, action, destination, body): def _nyx_record_broker_publish(env_name, destination, body): _nyx_record_broker_event(env_name, "publish", destination, body) +def _nyx_try_kafka_http(topic, body, handler_name): + endpoint = os.environ.get("NYX_KAFKA_ENDPOINT", "") + if not (endpoint.startswith("http://") or endpoint.startswith("https://")): + return False + _h = getattr(_entry_mod, handler_name, None) + if _h is None: + print("NYX_HANDLER_NOT_FOUND: " + handler_name, file=sys.stderr, flush=True) + sys.exit(78) + try: + import json + import urllib.parse + import urllib.request + base = endpoint.rstrip("/") + topic_path = urllib.parse.quote(str(topic), safe="") + print({kafka_publish_marker:?} + " " + str(topic), flush=True) + _send = urllib.request.Request( + base + "/topics/" + topic_path + "/messages", + data=str(body).encode("utf-8"), + method="POST", + ) + urllib.request.urlopen(_send, timeout=2).read() + _records_raw = urllib.request.urlopen( + base + "/topics/" + topic_path + "/records?max=1", + timeout=2, + ).read() + _records = json.loads(_records_raw.decode("utf-8") or "{{}}").get("records", []) + if not _records: + return False + for _rec in _records: + _h(_rec.get("value", "")) + _offset = str(_rec.get("offset", "0")) + _commit = urllib.request.Request( + base + "/topics/" + topic_path + "/commit", + data=urllib.parse.urlencode({{"offset": _offset}}).encode("utf-8"), + method="POST", + ) + urllib.request.urlopen(_commit, timeout=2).read() + return True + except SystemExit: + raise + except Exception as _e: + print(f"NYX_KAFKA_HTTP_FALLBACK: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) + return False + def _nyx_try_real_sqs(queue, body, handler_name): endpoint = os.environ.get("NYX_SQS_ENDPOINT", "") if not (endpoint.startswith("http://") or endpoint.startswith("https://")): @@ -1131,6 +1176,7 @@ except Exception as _e: pubsub_src = pubsub_src, rabbit_src = rabbit_src, register_and_publish = indent_lines(®ister_and_publish, " "), + kafka_publish_marker = crate::dynamic::stubs::KAFKA_PUBLISH_MARKER, sqs_publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER, ); HarnessSource { diff --git a/src/dynamic/lang/ruby.rs b/src/dynamic/lang/ruby.rs index b7511fce..c91443fc 100644 --- a/src/dynamic/lang/ruby.rs +++ b/src/dynamic/lang/ruby.rs @@ -951,10 +951,34 @@ def __nyx_try_execute_migration_sqlite(value) end end +def __nyx_patch_active_record_sql_recording + return unless defined?(ActiveRecord::Base) + return unless ActiveRecord::Base.respond_to?(:connection) + conn = ActiveRecord::Base.connection + return if conn.instance_variable_defined?(:@__nyx_sql_recording_patched) + conn.instance_variable_set(:@__nyx_sql_recording_patched, true) + if conn.respond_to?(:execute) + original_execute = conn.method(:execute) + conn.define_singleton_method(:execute) do |sql, *args, &blk| + __nyx_record_migration_result(sql, 'active_record') + original_execute.call(sql, *args, &blk) + end + end +end + # ActiveRecord migrations expose `up` / `down` / `change` on a subclass. if Object.const_defined?({handler:?}) cls = Object.const_get({handler:?}) begin + if defined?(ActiveRecord::Migration) && cls.is_a?(Class) && cls < ActiveRecord::Migration + begin + __nyx_patch_active_record_sql_recording + cls.migrate(:up) + exit 0 + rescue StandardError => e + STDERR.puts("NYX_ACTIVE_RECORD_MIGRATION_FALLBACK: #{{e.class.name}}: #{{e.message}}") + end + end inst = cls.new if inst.respond_to?(:table_name=) begin diff --git a/src/dynamic/sandbox/mod.rs b/src/dynamic/sandbox/mod.rs index e6ba234c..6283e433 100644 --- a/src/dynamic/sandbox/mod.rs +++ b/src/dynamic/sandbox/mod.rs @@ -898,8 +898,10 @@ fn rewrite_extra_env_for_container( { return (k.clone(), format!("{}/{idx}", docker::STUB_MOUNT_ROOT)); } - if matches!(k.as_str(), "NYX_HTTP_ENDPOINT" | "NYX_SQS_ENDPOINT") - && let Some(rest) = v.strip_prefix("http://127.0.0.1:") + if matches!( + k.as_str(), + "NYX_HTTP_ENDPOINT" | "NYX_KAFKA_ENDPOINT" | "NYX_SQS_ENDPOINT" + ) && let Some(rest) = v.strip_prefix("http://127.0.0.1:") { return (k.clone(), format!("http://host-gateway:{rest}")); } @@ -2276,6 +2278,10 @@ mod tests { "NYX_HTTP_ENDPOINT".to_owned(), "http://127.0.0.1:12345".to_owned(), ), + ( + "NYX_KAFKA_ENDPOINT".to_owned(), + "http://127.0.0.1:22334/topics".to_owned(), + ), ( "NYX_SQS_ENDPOINT".to_owned(), "http://127.0.0.1:23456/jobs".to_owned(), @@ -2289,6 +2295,10 @@ mod tests { "NYX_HTTP_ENDPOINT".to_owned(), "http://host-gateway:12345".to_owned(), ), + ( + "NYX_KAFKA_ENDPOINT".to_owned(), + "http://host-gateway:22334/topics".to_owned(), + ), ( "NYX_SQS_ENDPOINT".to_owned(), "http://host-gateway:23456/jobs".to_owned(), diff --git a/src/dynamic/stubs/broker.rs b/src/dynamic/stubs/broker.rs index 1f2c40b1..2c980c10 100644 --- a/src/dynamic/stubs/broker.rs +++ b/src/dynamic/stubs/broker.rs @@ -30,6 +30,7 @@ pub struct BrokerStub { tempdir: Option, log_path: PathBuf, cursor: Mutex, + kafka_listener: Option, sqs_listener: Option, } @@ -42,6 +43,11 @@ impl BrokerStub { .path() .join(format!("nyx_{}_stub.events.log", kind.tag())); std::fs::File::create(&log_path)?; + let kafka_listener = if kind == StubKind::Kafka { + start_kafka_listener(log_path.clone())? + } else { + None + }; let sqs_listener = if kind == StubKind::Sqs { start_sqs_listener(log_path.clone())? } else { @@ -52,6 +58,7 @@ impl BrokerStub { tempdir: Some(tempdir), log_path, cursor: Mutex::new(0), + kafka_listener, sqs_listener, }) } @@ -107,6 +114,9 @@ impl StubProvider for BrokerStub { } fn endpoint(&self) -> String { + if let Some(listener) = &self.kafka_listener { + return format!("http://127.0.0.1:{}", listener.port); + } if let Some(listener) = &self.sqs_listener { return format!("http://127.0.0.1:{}", listener.port); } @@ -182,6 +192,10 @@ fn parse_broker_log_line(line: &str) -> (&str, &str, &str) { impl Drop for BrokerStub { fn drop(&mut self) { + if let Some(listener) = &self.kafka_listener { + listener.shutdown.store(true, Ordering::Relaxed); + let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port)); + } if let Some(listener) = &self.sqs_listener { listener.shutdown.store(true, Ordering::Relaxed); let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port)); @@ -190,6 +204,159 @@ impl Drop for BrokerStub { } } +#[derive(Debug)] +struct KafkaListener { + port: u16, + shutdown: Arc, +} + +#[derive(Debug, Clone)] +struct KafkaMessage { + offset: u64, + value: String, +} + +#[derive(Debug, Default)] +struct KafkaState { + next_offsets: BTreeMap, + topics: BTreeMap>, + inflight: BTreeMap<(String, u64), KafkaMessage>, +} + +fn start_kafka_listener(log_path: PathBuf) -> std::io::Result> { + let listener = match TcpListener::bind("127.0.0.1:0") { + Ok(listener) => listener, + Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => return Ok(None), + Err(e) => return Err(e), + }; + let port = listener.local_addr()?.port(); + let shutdown = Arc::new(AtomicBool::new(false)); + let state = Arc::new(Mutex::new(KafkaState::default())); + let shutdown_clone = Arc::clone(&shutdown); + let state_clone = Arc::clone(&state); + std::thread::spawn(move || kafka_accept_loop(listener, shutdown_clone, state_clone, log_path)); + Ok(Some(KafkaListener { port, shutdown })) +} + +fn kafka_accept_loop( + listener: TcpListener, + shutdown: Arc, + state: Arc>, + log_path: PathBuf, +) { + for stream in listener.incoming() { + if shutdown.load(Ordering::Relaxed) { + break; + } + let Ok(stream) = stream else { continue }; + let _ = stream.set_read_timeout(Some(Duration::from_secs(2))); + let _ = stream.set_write_timeout(Some(Duration::from_secs(2))); + let state = Arc::clone(&state); + let log_path = log_path.clone(); + std::thread::spawn(move || handle_kafka_connection(stream, state, &log_path)); + } +} + +fn handle_kafka_connection(mut stream: TcpStream, state: Arc>, log_path: &Path) { + let Some(req) = read_http_request(&stream) else { + return; + }; + let response = match handle_kafka_request(&req, state, log_path) { + Ok(body) => http_response_with_type(200, "OK", "application/json", &body), + Err(body) => http_response_with_type(400, "Bad Request", "application/json", &body), + }; + let _ = stream.write_all(response.as_bytes()); +} + +fn handle_kafka_request( + req: &HttpRequest, + state: Arc>, + log_path: &Path, +) -> Result { + let Some((topic, action)) = kafka_path_parts(&req.path) else { + return Err(json_error("invalid kafka stub path")); + }; + match action.as_str() { + "messages" => { + let mut guard = state.lock().map_err(|_| json_error("internal error"))?; + let offset = guard.next_offsets.entry(topic.clone()).or_insert(0); + let message = KafkaMessage { + offset: *offset, + value: req.body.clone(), + }; + *offset += 1; + guard + .topics + .entry(topic.clone()) + .or_default() + .push_back(message.clone()); + let _ = append_broker_event(log_path, "publish", &topic, &message.value); + Ok(serde_json::json!({ + "topic": topic, + "offset": message.offset + }) + .to_string()) + } + "records" => { + let params = parse_form(&req.query); + let max_records = params + .get("max") + .and_then(|v| v.parse::().ok()) + .unwrap_or(1) + .clamp(1, 100); + let mut guard = state.lock().map_err(|_| json_error("internal error"))?; + let mut records = Vec::new(); + for _ in 0..max_records { + let Some(message) = guard.topics.entry(topic.clone()).or_default().pop_front() + else { + break; + }; + let _ = append_broker_event(log_path, "deliver", &topic, &message.value); + guard + .inflight + .insert((topic.clone(), message.offset), message.clone()); + records.push(serde_json::json!({ + "topic": topic, + "offset": message.offset, + "value": message.value + })); + } + Ok(serde_json::json!({ "records": records }).to_string()) + } + "commit" => { + let params = parse_form(&req.body); + let offset = params + .get("offset") + .and_then(|v| v.parse::().ok()) + .unwrap_or(0); + if let Ok(mut guard) = state.lock() + && guard.inflight.remove(&(topic.clone(), offset)).is_some() + { + let _ = append_broker_event(log_path, "ack", &topic, &offset.to_string()); + } + Ok(serde_json::json!({ "committed": true }).to_string()) + } + _ => Err(json_error("invalid kafka stub action")), + } +} + +fn kafka_path_parts(path: &str) -> Option<(String, String)> { + let mut parts = path.trim_matches('/').split('/'); + if parts.next()? != "topics" { + return None; + } + let topic = parts.next().map(percent_decode)?; + let action = parts.next()?.to_owned(); + if topic.is_empty() || parts.next().is_some() { + return None; + } + Some((topic, action)) +} + +fn json_error(message: &str) -> String { + serde_json::json!({ "error": message }).to_string() +} + #[derive(Debug)] struct SqsListener { port: u16, @@ -427,8 +594,12 @@ fn handle_sqs_request( } fn http_response(status: u16, reason: &str, body: &str) -> String { + http_response_with_type(status, reason, "text/xml", body) +} + +fn http_response_with_type(status: u16, reason: &str, content_type: &str, body: &str) -> String { format!( - "HTTP/1.1 {status} {reason}\r\ncontent-type: text/xml\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{body}", + "HTTP/1.1 {status} {reason}\r\ncontent-type: {content_type}\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{body}", body.len() ) } @@ -544,7 +715,11 @@ mod tests { let dir = TempDir::new().unwrap(); let stub = BrokerStub::start(StubKind::Kafka, dir.path()).unwrap(); assert!(stub.log_path().exists()); - assert_eq!(stub.endpoint(), "loopback://kafka"); + let endpoint = stub.endpoint(); + assert!( + endpoint == "loopback://kafka" || endpoint.starts_with("http://127.0.0.1:"), + "Kafka endpoint should be loopback fallback or HTTP emulator, got {endpoint}" + ); assert_eq!( stub.recording_endpoint().unwrap().0, StubKind::Kafka.broker_log_env_var().unwrap() @@ -580,6 +755,52 @@ mod tests { ); } + #[test] + fn kafka_broker_exposes_http_emulator() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Kafka, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == "loopback://kafka" { + return; + } + assert!( + endpoint.starts_with("http://127.0.0.1:"), + "Kafka endpoint should be a host-side HTTP emulator, got {endpoint}" + ); + } + + #[test] + fn kafka_http_emulator_records_publish_deliver_ack() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Kafka, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == "loopback://kafka" { + return; + } + let port: u16 = endpoint + .trim_start_matches("http://127.0.0.1:") + .parse() + .unwrap(); + let send = http_post(port, "/topics/orders/messages", "NYX\tPAYLOAD"); + assert!(send.contains(r#""offset":0"#), "{send}"); + + let receive = http_get(port, "/topics/orders/records?max=1"); + assert!(receive.contains(r#""value":"NYX\tPAYLOAD""#), "{receive}"); + + let commit = http_post(port, "/topics/orders/commit", "offset=0"); + assert!(commit.contains(r#""committed":true"#), "{commit}"); + + let events = stub.drain_events(); + let actions: Vec<&str> = events + .iter() + .map(|ev| ev.detail.get("action").unwrap().as_str()) + .collect(); + assert_eq!(actions, vec!["publish", "deliver", "ack"]); + assert_eq!(events[0].detail.get("destination").unwrap(), "orders"); + assert_eq!(events[1].detail.get("payload").unwrap(), "NYX\tPAYLOAD"); + assert_eq!(events[2].detail.get("payload").unwrap(), "0"); + } + #[test] fn sqs_query_emulator_records_publish_deliver_ack() { let dir = TempDir::new().unwrap(); @@ -669,6 +890,16 @@ mod tests { out } + fn http_get(port: u16, path: &str) -> String { + let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap(); + let req = + format!("GET {path} HTTP/1.1\r\nhost: 127.0.0.1:{port}\r\nconnection: close\r\n\r\n"); + s.write_all(req.as_bytes()).unwrap(); + let mut out = String::new(); + s.read_to_string(&mut out).unwrap(); + out + } + fn form_escape(input: &str) -> String { let mut out = String::new(); for b in input.bytes() { diff --git a/tests/message_handler_corpus.rs b/tests/message_handler_corpus.rs index 49712fe8..92102d57 100644 --- a/tests/message_handler_corpus.rs +++ b/tests/message_handler_corpus.rs @@ -176,6 +176,8 @@ fn message_handler_python_dispatch_subscribes_to_loopback() { entry_file("kafka_python"), ); let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("_nyx_try_kafka_http")); + assert!(h.source.contains("NYX_KAFKA_ENDPOINT")); assert!(h.source.contains("NyxKafkaLoopback")); assert!(h.source.contains("subscribe")); assert!(h.source.contains("poll")); @@ -192,6 +194,11 @@ fn message_handler_python_dispatch_subscribes_to_loopback() { fn message_handler_java_emits_reflective_dispatch() { let spec = make_spec(Lang::Java, "orders", "onMessage", entry_file("kafka_java")); let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("nyxTryRealKafkaClient")); + assert!(h.source.contains("MockConsumer")); + assert!(h.source.contains("commitSync")); + assert!(h.source.contains("nyxTryKafkaHttp")); + assert!(h.source.contains("NYX_KAFKA_ENDPOINT")); assert!(h.source.contains("NyxKafkaLoopback")); assert!(h.source.contains("Class.forName")); assert!(h.source.contains("getDeclaredMethod")); diff --git a/tests/phase21_corpus.rs b/tests/phase21_corpus.rs index 72b506bf..9b471bf5 100644 --- a/tests/phase21_corpus.rs +++ b/tests/phase21_corpus.rs @@ -945,6 +945,7 @@ fn migration_ruby_harness_carries_sentinel_and_handler() { assert!(h.source.contains("AddIndex")); assert!(h.source.contains("__nyx_stub_sql_record")); assert!(h.source.contains("ActiveRecord::Base.establish_connection")); + assert!(h.source.contains("cls.migrate(:up)")); assert!(h.source.contains("SQLite3::Database")); assert!(h.source.contains("NYX_SQL_ENDPOINT")); }