mirror of
https://github.com/elicpeter/nyx.git
synced 2026-06-27 20:29:39 +02:00
**refactor(dynamic): add Kafka HTTP emulator with publish/poll/commit support, extend endpoint rewriting and stub event recording across Java, Python, and Rust**
This commit is contained in:
parent
433036aead
commit
57d3677bd4
7 changed files with 564 additions and 40 deletions
|
|
@ -30,6 +30,7 @@ pub struct BrokerStub {
|
|||
tempdir: Option<TempDir>,
|
||||
log_path: PathBuf,
|
||||
cursor: Mutex<u64>,
|
||||
kafka_listener: Option<KafkaListener>,
|
||||
sqs_listener: Option<SqsListener>,
|
||||
}
|
||||
|
||||
|
|
@ -42,6 +43,11 @@ impl BrokerStub {
|
|||
.path()
|
||||
.join(format!("nyx_{}_stub.events.log", kind.tag()));
|
||||
std::fs::File::create(&log_path)?;
|
||||
let kafka_listener = if kind == StubKind::Kafka {
|
||||
start_kafka_listener(log_path.clone())?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let sqs_listener = if kind == StubKind::Sqs {
|
||||
start_sqs_listener(log_path.clone())?
|
||||
} else {
|
||||
|
|
@ -52,6 +58,7 @@ impl BrokerStub {
|
|||
tempdir: Some(tempdir),
|
||||
log_path,
|
||||
cursor: Mutex::new(0),
|
||||
kafka_listener,
|
||||
sqs_listener,
|
||||
})
|
||||
}
|
||||
|
|
@ -107,6 +114,9 @@ impl StubProvider for BrokerStub {
|
|||
}
|
||||
|
||||
fn endpoint(&self) -> String {
|
||||
if let Some(listener) = &self.kafka_listener {
|
||||
return format!("http://127.0.0.1:{}", listener.port);
|
||||
}
|
||||
if let Some(listener) = &self.sqs_listener {
|
||||
return format!("http://127.0.0.1:{}", listener.port);
|
||||
}
|
||||
|
|
@ -182,6 +192,10 @@ fn parse_broker_log_line(line: &str) -> (&str, &str, &str) {
|
|||
|
||||
impl Drop for BrokerStub {
|
||||
fn drop(&mut self) {
|
||||
if let Some(listener) = &self.kafka_listener {
|
||||
listener.shutdown.store(true, Ordering::Relaxed);
|
||||
let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port));
|
||||
}
|
||||
if let Some(listener) = &self.sqs_listener {
|
||||
listener.shutdown.store(true, Ordering::Relaxed);
|
||||
let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port));
|
||||
|
|
@ -190,6 +204,159 @@ impl Drop for BrokerStub {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct KafkaListener {
|
||||
port: u16,
|
||||
shutdown: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct KafkaMessage {
|
||||
offset: u64,
|
||||
value: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct KafkaState {
|
||||
next_offsets: BTreeMap<String, u64>,
|
||||
topics: BTreeMap<String, VecDeque<KafkaMessage>>,
|
||||
inflight: BTreeMap<(String, u64), KafkaMessage>,
|
||||
}
|
||||
|
||||
fn start_kafka_listener(log_path: PathBuf) -> std::io::Result<Option<KafkaListener>> {
|
||||
let listener = match TcpListener::bind("127.0.0.1:0") {
|
||||
Ok(listener) => listener,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => return Ok(None),
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
let port = listener.local_addr()?.port();
|
||||
let shutdown = Arc::new(AtomicBool::new(false));
|
||||
let state = Arc::new(Mutex::new(KafkaState::default()));
|
||||
let shutdown_clone = Arc::clone(&shutdown);
|
||||
let state_clone = Arc::clone(&state);
|
||||
std::thread::spawn(move || kafka_accept_loop(listener, shutdown_clone, state_clone, log_path));
|
||||
Ok(Some(KafkaListener { port, shutdown }))
|
||||
}
|
||||
|
||||
fn kafka_accept_loop(
|
||||
listener: TcpListener,
|
||||
shutdown: Arc<AtomicBool>,
|
||||
state: Arc<Mutex<KafkaState>>,
|
||||
log_path: PathBuf,
|
||||
) {
|
||||
for stream in listener.incoming() {
|
||||
if shutdown.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
let Ok(stream) = stream else { continue };
|
||||
let _ = stream.set_read_timeout(Some(Duration::from_secs(2)));
|
||||
let _ = stream.set_write_timeout(Some(Duration::from_secs(2)));
|
||||
let state = Arc::clone(&state);
|
||||
let log_path = log_path.clone();
|
||||
std::thread::spawn(move || handle_kafka_connection(stream, state, &log_path));
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_kafka_connection(mut stream: TcpStream, state: Arc<Mutex<KafkaState>>, log_path: &Path) {
|
||||
let Some(req) = read_http_request(&stream) else {
|
||||
return;
|
||||
};
|
||||
let response = match handle_kafka_request(&req, state, log_path) {
|
||||
Ok(body) => http_response_with_type(200, "OK", "application/json", &body),
|
||||
Err(body) => http_response_with_type(400, "Bad Request", "application/json", &body),
|
||||
};
|
||||
let _ = stream.write_all(response.as_bytes());
|
||||
}
|
||||
|
||||
fn handle_kafka_request(
|
||||
req: &HttpRequest,
|
||||
state: Arc<Mutex<KafkaState>>,
|
||||
log_path: &Path,
|
||||
) -> Result<String, String> {
|
||||
let Some((topic, action)) = kafka_path_parts(&req.path) else {
|
||||
return Err(json_error("invalid kafka stub path"));
|
||||
};
|
||||
match action.as_str() {
|
||||
"messages" => {
|
||||
let mut guard = state.lock().map_err(|_| json_error("internal error"))?;
|
||||
let offset = guard.next_offsets.entry(topic.clone()).or_insert(0);
|
||||
let message = KafkaMessage {
|
||||
offset: *offset,
|
||||
value: req.body.clone(),
|
||||
};
|
||||
*offset += 1;
|
||||
guard
|
||||
.topics
|
||||
.entry(topic.clone())
|
||||
.or_default()
|
||||
.push_back(message.clone());
|
||||
let _ = append_broker_event(log_path, "publish", &topic, &message.value);
|
||||
Ok(serde_json::json!({
|
||||
"topic": topic,
|
||||
"offset": message.offset
|
||||
})
|
||||
.to_string())
|
||||
}
|
||||
"records" => {
|
||||
let params = parse_form(&req.query);
|
||||
let max_records = params
|
||||
.get("max")
|
||||
.and_then(|v| v.parse::<usize>().ok())
|
||||
.unwrap_or(1)
|
||||
.clamp(1, 100);
|
||||
let mut guard = state.lock().map_err(|_| json_error("internal error"))?;
|
||||
let mut records = Vec::new();
|
||||
for _ in 0..max_records {
|
||||
let Some(message) = guard.topics.entry(topic.clone()).or_default().pop_front()
|
||||
else {
|
||||
break;
|
||||
};
|
||||
let _ = append_broker_event(log_path, "deliver", &topic, &message.value);
|
||||
guard
|
||||
.inflight
|
||||
.insert((topic.clone(), message.offset), message.clone());
|
||||
records.push(serde_json::json!({
|
||||
"topic": topic,
|
||||
"offset": message.offset,
|
||||
"value": message.value
|
||||
}));
|
||||
}
|
||||
Ok(serde_json::json!({ "records": records }).to_string())
|
||||
}
|
||||
"commit" => {
|
||||
let params = parse_form(&req.body);
|
||||
let offset = params
|
||||
.get("offset")
|
||||
.and_then(|v| v.parse::<u64>().ok())
|
||||
.unwrap_or(0);
|
||||
if let Ok(mut guard) = state.lock()
|
||||
&& guard.inflight.remove(&(topic.clone(), offset)).is_some()
|
||||
{
|
||||
let _ = append_broker_event(log_path, "ack", &topic, &offset.to_string());
|
||||
}
|
||||
Ok(serde_json::json!({ "committed": true }).to_string())
|
||||
}
|
||||
_ => Err(json_error("invalid kafka stub action")),
|
||||
}
|
||||
}
|
||||
|
||||
fn kafka_path_parts(path: &str) -> Option<(String, String)> {
|
||||
let mut parts = path.trim_matches('/').split('/');
|
||||
if parts.next()? != "topics" {
|
||||
return None;
|
||||
}
|
||||
let topic = parts.next().map(percent_decode)?;
|
||||
let action = parts.next()?.to_owned();
|
||||
if topic.is_empty() || parts.next().is_some() {
|
||||
return None;
|
||||
}
|
||||
Some((topic, action))
|
||||
}
|
||||
|
||||
fn json_error(message: &str) -> String {
|
||||
serde_json::json!({ "error": message }).to_string()
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SqsListener {
|
||||
port: u16,
|
||||
|
|
@ -427,8 +594,12 @@ fn handle_sqs_request(
|
|||
}
|
||||
|
||||
fn http_response(status: u16, reason: &str, body: &str) -> String {
|
||||
http_response_with_type(status, reason, "text/xml", body)
|
||||
}
|
||||
|
||||
fn http_response_with_type(status: u16, reason: &str, content_type: &str, body: &str) -> String {
|
||||
format!(
|
||||
"HTTP/1.1 {status} {reason}\r\ncontent-type: text/xml\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{body}",
|
||||
"HTTP/1.1 {status} {reason}\r\ncontent-type: {content_type}\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{body}",
|
||||
body.len()
|
||||
)
|
||||
}
|
||||
|
|
@ -544,7 +715,11 @@ mod tests {
|
|||
let dir = TempDir::new().unwrap();
|
||||
let stub = BrokerStub::start(StubKind::Kafka, dir.path()).unwrap();
|
||||
assert!(stub.log_path().exists());
|
||||
assert_eq!(stub.endpoint(), "loopback://kafka");
|
||||
let endpoint = stub.endpoint();
|
||||
assert!(
|
||||
endpoint == "loopback://kafka" || endpoint.starts_with("http://127.0.0.1:"),
|
||||
"Kafka endpoint should be loopback fallback or HTTP emulator, got {endpoint}"
|
||||
);
|
||||
assert_eq!(
|
||||
stub.recording_endpoint().unwrap().0,
|
||||
StubKind::Kafka.broker_log_env_var().unwrap()
|
||||
|
|
@ -580,6 +755,52 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn kafka_broker_exposes_http_emulator() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
let stub = BrokerStub::start(StubKind::Kafka, dir.path()).unwrap();
|
||||
let endpoint = stub.endpoint();
|
||||
if endpoint == "loopback://kafka" {
|
||||
return;
|
||||
}
|
||||
assert!(
|
||||
endpoint.starts_with("http://127.0.0.1:"),
|
||||
"Kafka endpoint should be a host-side HTTP emulator, got {endpoint}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn kafka_http_emulator_records_publish_deliver_ack() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
let stub = BrokerStub::start(StubKind::Kafka, dir.path()).unwrap();
|
||||
let endpoint = stub.endpoint();
|
||||
if endpoint == "loopback://kafka" {
|
||||
return;
|
||||
}
|
||||
let port: u16 = endpoint
|
||||
.trim_start_matches("http://127.0.0.1:")
|
||||
.parse()
|
||||
.unwrap();
|
||||
let send = http_post(port, "/topics/orders/messages", "NYX\tPAYLOAD");
|
||||
assert!(send.contains(r#""offset":0"#), "{send}");
|
||||
|
||||
let receive = http_get(port, "/topics/orders/records?max=1");
|
||||
assert!(receive.contains(r#""value":"NYX\tPAYLOAD""#), "{receive}");
|
||||
|
||||
let commit = http_post(port, "/topics/orders/commit", "offset=0");
|
||||
assert!(commit.contains(r#""committed":true"#), "{commit}");
|
||||
|
||||
let events = stub.drain_events();
|
||||
let actions: Vec<&str> = events
|
||||
.iter()
|
||||
.map(|ev| ev.detail.get("action").unwrap().as_str())
|
||||
.collect();
|
||||
assert_eq!(actions, vec!["publish", "deliver", "ack"]);
|
||||
assert_eq!(events[0].detail.get("destination").unwrap(), "orders");
|
||||
assert_eq!(events[1].detail.get("payload").unwrap(), "NYX\tPAYLOAD");
|
||||
assert_eq!(events[2].detail.get("payload").unwrap(), "0");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sqs_query_emulator_records_publish_deliver_ack() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
|
|
@ -669,6 +890,16 @@ mod tests {
|
|||
out
|
||||
}
|
||||
|
||||
fn http_get(port: u16, path: &str) -> String {
|
||||
let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
|
||||
let req =
|
||||
format!("GET {path} HTTP/1.1\r\nhost: 127.0.0.1:{port}\r\nconnection: close\r\n\r\n");
|
||||
s.write_all(req.as_bytes()).unwrap();
|
||||
let mut out = String::new();
|
||||
s.read_to_string(&mut out).unwrap();
|
||||
out
|
||||
}
|
||||
|
||||
fn form_escape(input: &str) -> String {
|
||||
let mut out = String::new();
|
||||
for b in input.bytes() {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue