diff --git a/src/dynamic/build_sandbox.rs b/src/dynamic/build_sandbox.rs index 80589d35..c509b933 100644 --- a/src/dynamic/build_sandbox.rs +++ b/src/dynamic/build_sandbox.rs @@ -338,8 +338,9 @@ fn build_cache_path( toolchain_id: &str, ) -> Result { // Respect test override. - let base = if let Ok(p) = std::env::var("NYX_BUILD_CACHE") { - PathBuf::from(p) + let override_base = std::env::var("NYX_BUILD_CACHE").ok().map(PathBuf::from); + let base = if let Some(p) = override_base.clone() { + p } else { let dirs = ProjectDirs::from("", "", "nyx").ok_or_else(|| { BuildError::Io(std::io::Error::new( @@ -352,13 +353,29 @@ fn build_cache_path( let name = format!("{lockfile_hash}-{language}-{toolchain_id}"); let path = base.join(&name); - std::fs::create_dir_all(&path)?; + match create_build_cache_dir(&path) { + Ok(()) => Ok(path), + Err(e) if override_base.is_none() && e.kind() == std::io::ErrorKind::PermissionDenied => { + let fallback = std::env::temp_dir() + .join("nyx") + .join("dynamic") + .join("build-cache") + .join(&name); + create_build_cache_dir(&fallback)?; + Ok(fallback) + } + Err(e) => Err(BuildError::Io(e)), + } +} + +fn create_build_cache_dir(path: &Path) -> std::io::Result<()> { + std::fs::create_dir_all(path)?; #[cfg(unix)] { use std::os::unix::fs::PermissionsExt; - let _ = std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o700)); + let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o700)); } - Ok(path) + Ok(()) } // ── Ruby build sandbox ─────────────────────────────────────────────────────── diff --git a/src/dynamic/lang/go.rs b/src/dynamic/lang/go.rs index 19803163..60897f60 100644 --- a/src/dynamic/lang/go.rs +++ b/src/dynamic/lang/go.rs @@ -2160,15 +2160,31 @@ fn emit_message_handler_harness(spec: &HarnessSpec, queue: &str) -> HarnessSourc crate::dynamic::stubs::nats_source(crate::symbol::Lang::Go), crate::dynamic::stubs::NATS_PUBLISH_MARKER, format!( - r##" broker := NewNyxNatsLoopback() - broker.Subscribe("{queue}", func(msg *NyxNatsMsg) {{ - nyxRecordBrokerEvent("NYX_NATS_LOG", "deliver", "{queue}", string(msg.Data)) - nyxDispatch(msg) - nyxRecordBrokerEvent("NYX_NATS_LOG", "ack", "{queue}", msg.Subject) - }}) - fmt.Println("{publish_marker} " + "{queue}") - nyxRecordBrokerPublish("NYX_NATS_LOG", "{queue}", payload) - broker.Publish("{queue}", payload)"##, + r##" if msg, ok := nyxFetchHttpBroker("NYX_NATS_ENDPOINT", "subjects", "{queue}", payload, "{publish_marker}"); ok {{ + data := msg["data"] + natsMsg := &NyxNatsMsg{{Subject: msg["subject"], Data: []byte(data), Reply: msg["reply"]}} + if natsMsg.Subject == "" {{ + natsMsg.Subject = "{queue}" + }} + nyxRecordBrokerEvent("NYX_NATS_LOG", "deliver", "{queue}", data) + nyxDispatch(natsMsg) + ackID := msg["ack_id"] + if ackID == "" {{ + ackID = natsMsg.Subject + }} + nyxAckHttpBroker("NYX_NATS_ENDPOINT", "subjects", "{queue}", ackID) + nyxRecordBrokerEvent("NYX_NATS_LOG", "ack", "{queue}", ackID) + }} else {{ + broker := NewNyxNatsLoopback() + broker.Subscribe("{queue}", func(msg *NyxNatsMsg) {{ + nyxRecordBrokerEvent("NYX_NATS_LOG", "deliver", "{queue}", string(msg.Data)) + nyxDispatch(msg) + nyxRecordBrokerEvent("NYX_NATS_LOG", "ack", "{queue}", msg.Subject) + }}) + fmt.Println("{publish_marker} " + "{queue}") + nyxRecordBrokerPublish("NYX_NATS_LOG", "{queue}", payload) + broker.Publish("{queue}", payload) + }}"##, queue = queue, publish_marker = crate::dynamic::stubs::NATS_PUBLISH_MARKER, ), @@ -2177,16 +2193,33 @@ fn emit_message_handler_harness(spec: &HarnessSpec, queue: &str) -> HarnessSourc crate::dynamic::stubs::pubsub_source(crate::symbol::Lang::Go), crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER, format!( - r##" broker := NewNyxPubsubLoopback() - broker.Subscribe("{queue}", func(msg *NyxPubsubMessage) {{ - nyxRecordBrokerEvent("NYX_PUBSUB_LOG", "deliver", "{queue}", string(msg.Data)) - nyxDispatch(msg) - msg.Ack() - nyxRecordBrokerEvent("NYX_PUBSUB_LOG", "ack", "{queue}", msg.ID) - }}) - fmt.Println("{publish_marker} " + "{queue}") - nyxRecordBrokerPublish("NYX_PUBSUB_LOG", "{queue}", payload) - broker.Publish("{queue}", payload)"##, + r##" if msg, ok := nyxFetchHttpBroker("NYX_PUBSUB_ENDPOINT", "topics", "{queue}", payload, "{publish_marker}"); ok {{ + data := msg["data"] + pubsubMsg := &NyxPubsubMessage{{ID: msg["id"], Data: []byte(data)}} + if pubsubMsg.ID == "" {{ + pubsubMsg.ID = msg["ack_id"] + }} + nyxRecordBrokerEvent("NYX_PUBSUB_LOG", "deliver", "{queue}", data) + nyxDispatch(pubsubMsg) + pubsubMsg.Ack() + ackID := msg["ack_id"] + if ackID == "" {{ + ackID = pubsubMsg.ID + }} + nyxAckHttpBroker("NYX_PUBSUB_ENDPOINT", "topics", "{queue}", ackID) + nyxRecordBrokerEvent("NYX_PUBSUB_LOG", "ack", "{queue}", ackID) + }} else {{ + broker := NewNyxPubsubLoopback() + broker.Subscribe("{queue}", func(msg *NyxPubsubMessage) {{ + nyxRecordBrokerEvent("NYX_PUBSUB_LOG", "deliver", "{queue}", string(msg.Data)) + nyxDispatch(msg) + msg.Ack() + nyxRecordBrokerEvent("NYX_PUBSUB_LOG", "ack", "{queue}", msg.ID) + }}) + fmt.Println("{publish_marker} " + "{queue}") + nyxRecordBrokerPublish("NYX_PUBSUB_LOG", "{queue}", payload) + broker.Publish("{queue}", payload) + }}"##, queue = queue, publish_marker = crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER, ), @@ -2238,6 +2271,9 @@ import ( "encoding/base64" "encoding/json" "fmt" + "io" + "net/http" + "net/url" "os" "os/signal" "reflect" @@ -2289,6 +2325,77 @@ func nyxRecordBrokerPublish(envName string, destination string, payload string) nyxRecordBrokerEvent(envName, "publish", destination, payload) }} +func nyxFetchHttpBroker(envName string, root string, destination string, payload string, marker string) (map[string]string, bool) {{ + endpoint := os.Getenv(envName) + if !(strings.HasPrefix(endpoint, "http://") || strings.HasPrefix(endpoint, "https://")) {{ + return nil, false + }} + client := http.Client{{Timeout: 2 * time.Second}} + base := strings.TrimRight(endpoint, "/") + escaped := url.PathEscape(destination) + fmt.Println(marker + " " + destination) + postReq, err := http.NewRequest( + "POST", + base+"/"+root+"/"+escaped+"/messages", + strings.NewReader(payload), + ) + if err != nil {{ + return nil, false + }} + postResp, err := client.Do(postReq) + if err != nil {{ + fmt.Fprintf(os.Stderr, "NYX_BROKER_HTTP_FALLBACK: %v\n", err) + return nil, false + }} + _, _ = io.Copy(io.Discard, postResp.Body) + _ = postResp.Body.Close() + if postResp.StatusCode >= 400 {{ + return nil, false + }} + getResp, err := client.Get(base + "/" + root + "/" + escaped + "/messages?max=1") + if err != nil {{ + fmt.Fprintf(os.Stderr, "NYX_BROKER_HTTP_FALLBACK: %v\n", err) + return nil, false + }} + defer getResp.Body.Close() + if getResp.StatusCode >= 400 {{ + return nil, false + }} + raw, err := io.ReadAll(getResp.Body) + if err != nil {{ + return nil, false + }} + var envelope struct {{ + Messages []map[string]string `json:"messages"` + }} + if err := json.Unmarshal(raw, &envelope); err != nil || len(envelope.Messages) == 0 {{ + return nil, false + }} + return envelope.Messages[0], true +}} + +func nyxAckHttpBroker(envName string, root string, destination string, ackID string) {{ + endpoint := os.Getenv(envName) + if !(strings.HasPrefix(endpoint, "http://") || strings.HasPrefix(endpoint, "https://")) {{ + return + }} + client := http.Client{{Timeout: 2 * time.Second}} + base := strings.TrimRight(endpoint, "/") + escaped := url.PathEscape(destination) + values := url.Values{{}} + values.Set("ack_id", ackID) + resp, err := client.Post( + base+"/"+root+"/"+escaped+"/ack", + "application/x-www-form-urlencoded", + strings.NewReader(values.Encode()), + ) + if err != nil {{ + return + }} + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() +}} + func main() {{ __nyx_install_crash_guard("{handler}") payload := nyxPayload() diff --git a/src/dynamic/lang/java.rs b/src/dynamic/lang/java.rs index 70dd1302..f7cd2ac6 100644 --- a/src/dynamic/lang/java.rs +++ b/src/dynamic/lang/java.rs @@ -3834,37 +3834,39 @@ fn emit_message_handler_harness( JavaBroker::Rabbit => ( crate::dynamic::stubs::RABBIT_PUBLISH_MARKER, format!( - r#" NyxRabbitChannel chan = new NyxRabbitChannel(); - chan.basicConsume({queue:?}, (mid, body) -> {{ - nyxRecordBrokerEvent("NYX_RABBIT_LOG", "deliver", {queue:?}, body); - System.out.println("__NYX_SINK_HIT__"); - boolean success = false; - try {{ - java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, String.class, String.class); - m.setAccessible(true); - m.invoke(entryInst, mid, body); - success = true; - }} catch (NoSuchMethodException nsme) {{ + r#" if (!nyxTryRabbitHttp({queue:?}, payload, entryInst, {handler:?})) {{ + NyxRabbitChannel chan = new NyxRabbitChannel(); + chan.basicConsume({queue:?}, (mid, body) -> {{ + nyxRecordBrokerEvent("NYX_RABBIT_LOG", "deliver", {queue:?}, body); + System.out.println("__NYX_SINK_HIT__"); + boolean success = false; try {{ - java.lang.reflect.Method m2 = entryInst.getClass().getDeclaredMethod({handler:?}, String.class); - m2.setAccessible(true); - m2.invoke(entryInst, body); + java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, String.class, String.class); + m.setAccessible(true); + m.invoke(entryInst, mid, body); success = true; - }} catch (Exception ie) {{ - Throwable c = (ie instanceof java.lang.reflect.InvocationTargetException && ie.getCause() != null) ? ie.getCause() : ie; + }} catch (NoSuchMethodException nsme) {{ + try {{ + java.lang.reflect.Method m2 = entryInst.getClass().getDeclaredMethod({handler:?}, String.class); + m2.setAccessible(true); + m2.invoke(entryInst, body); + success = true; + }} catch (Exception ie) {{ + Throwable c = (ie instanceof java.lang.reflect.InvocationTargetException && ie.getCause() != null) ? ie.getCause() : ie; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + }} catch (Exception e) {{ + Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); }} - }} catch (Exception e) {{ - Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; - System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); - }} - if (success) {{ - nyxRecordBrokerEvent("NYX_RABBIT_LOG", "ack", {queue:?}, mid); - }} - }}); - System.out.println({publish_marker:?} + " " + {queue:?}); - nyxRecordBrokerPublish("NYX_RABBIT_LOG", {queue:?}, payload); - chan.basicPublish("", {queue:?}, payload);"#, + if (success) {{ + nyxRecordBrokerEvent("NYX_RABBIT_LOG", "ack", {queue:?}, mid); + }} + }}); + System.out.println({publish_marker:?} + " " + {queue:?}); + nyxRecordBrokerPublish("NYX_RABBIT_LOG", {queue:?}, payload); + chan.basicPublish("", {queue:?}, payload); + }}"#, handler = handler, queue = queue, publish_marker = crate::dynamic::stubs::RABBIT_PUBLISH_MARKER, @@ -3984,6 +3986,71 @@ public class NyxHarness {{ }} }} + static boolean nyxTryRabbitHttp(String queue, String payload, Object entryInst, String handler) {{ + String endpoint = System.getenv("NYX_RABBIT_ENDPOINT"); + if (endpoint == null || !(endpoint.startsWith("http://") || endpoint.startsWith("https://"))) {{ + return false; + }} + try {{ + String base = endpoint.replaceAll("/+$", ""); + String queuePath = java.net.URLEncoder.encode(queue, java.nio.charset.StandardCharsets.UTF_8); + System.out.println({rabbit_publish_marker:?} + " " + queue); + nyxHttpRequest( + "POST", + base + "/queues/" + queuePath + "/messages", + payload.getBytes(java.nio.charset.StandardCharsets.UTF_8) + ); + String messagesJson = nyxHttpRequest( + "GET", + base + "/queues/" + queuePath + "/messages?max=1", + new byte[0] + ); + if (messagesJson == null || !messagesJson.contains("\"messages\"") || !messagesJson.contains("\"body\"")) {{ + return false; + }} + String body = nyxJsonStringField(messagesJson, "body"); + String tag = nyxJsonStringField(messagesJson, "delivery_tag"); + if (tag == null || tag.isEmpty()) {{ + tag = nyxJsonStringField(messagesJson, "ack_id"); + }} + nyxRecordBrokerEvent("NYX_RABBIT_LOG", "deliver", queue, body); + System.out.println("__NYX_SINK_HIT__"); + boolean success = false; + try {{ + java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod(handler, String.class, String.class); + m.setAccessible(true); + m.invoke(entryInst, tag, body); + success = true; + }} catch (NoSuchMethodException nsme) {{ + try {{ + java.lang.reflect.Method m2 = entryInst.getClass().getDeclaredMethod(handler, String.class); + m2.setAccessible(true); + m2.invoke(entryInst, body); + success = true; + }} catch (Exception ie) {{ + Throwable c = (ie instanceof java.lang.reflect.InvocationTargetException && ie.getCause() != null) ? ie.getCause() : ie; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + }} catch (Exception e) {{ + Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + if (success) {{ + String ackBody = "ack_id=" + java.net.URLEncoder.encode(tag == null ? "" : tag, java.nio.charset.StandardCharsets.UTF_8); + nyxHttpRequest( + "POST", + base + "/queues/" + queuePath + "/ack", + ackBody.getBytes(java.nio.charset.StandardCharsets.UTF_8) + ); + nyxRecordBrokerEvent("NYX_RABBIT_LOG", "ack", queue, tag == null ? "" : tag); + }} + return true; + }} catch (Throwable e) {{ + System.err.println("NYX_RABBIT_HTTP_FALLBACK: " + e.getClass().getName() + ": " + e.getMessage()); + return false; + }} + }} + static boolean nyxTryRealKafkaClient(String topic, String payload, Object entryInst, String handler) {{ Object consumer = null; try {{ @@ -4257,6 +4324,7 @@ public class NyxHarness {{ entry_class = entry_class, dispatch_block = dispatch_block, kafka_publish_marker = crate::dynamic::stubs::KAFKA_PUBLISH_MARKER, + rabbit_publish_marker = crate::dynamic::stubs::RABBIT_PUBLISH_MARKER, sqs_publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER, ); HarnessSource { diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index 2386c7ab..71e2fe0f 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -980,8 +980,7 @@ fn emit_message_handler(spec: &HarnessSpec, queue: &str) -> HarnessSource { publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER, ), PythonBroker::Pubsub => format!( - r#"_loop = NyxPubsubLoopback() -def _nyx_pubsub_dispatch(message): + r#"def _nyx_pubsub_dispatch(message): _h = getattr(_entry_mod, {handler:?}, None) if _h is None: print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) @@ -991,17 +990,18 @@ def _nyx_pubsub_dispatch(message): if hasattr(message, "ack"): message.ack() _nyx_record_broker_event("NYX_PUBSUB_LOG", "ack", {queue:?}, getattr(message, "message_id", "")) -_loop.subscribe({queue:?}, _nyx_pubsub_dispatch) -print({publish_marker:?} + " " + {queue:?}, flush=True) -_nyx_record_broker_publish("NYX_PUBSUB_LOG", {queue:?}, payload) -_loop.publish({queue:?}, payload)"#, +if not _nyx_try_pubsub_http({queue:?}, payload, _nyx_pubsub_dispatch): + _loop = NyxPubsubLoopback() + _loop.subscribe({queue:?}, _nyx_pubsub_dispatch) + print({publish_marker:?} + " " + {queue:?}, flush=True) + _nyx_record_broker_publish("NYX_PUBSUB_LOG", {queue:?}, payload) + _loop.publish({queue:?}, payload)"#, handler = handler, queue = queue, publish_marker = crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER, ), PythonBroker::Rabbit => format!( - r#"_chan = NyxRabbitChannel() -def _nyx_rabbit_dispatch(ch, method, props, body): + r#"def _nyx_rabbit_dispatch(ch, method, props, body): _h = getattr(_entry_mod, {handler:?}, None) if _h is None: print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) @@ -1009,10 +1009,12 @@ def _nyx_rabbit_dispatch(ch, method, props, body): _nyx_record_broker_event("NYX_RABBIT_LOG", "deliver", {queue:?}, body) _h(ch, method, props, body) _nyx_record_broker_event("NYX_RABBIT_LOG", "ack", {queue:?}, getattr(method, "delivery_tag", "")) -_chan.basic_consume(queue={queue:?}, on_message_callback=_nyx_rabbit_dispatch) -print({publish_marker:?} + " " + {queue:?}, flush=True) -_nyx_record_broker_publish("NYX_RABBIT_LOG", {queue:?}, payload) -_chan.basic_publish(exchange="", routing_key={queue:?}, body=payload)"#, +if not _nyx_try_rabbit_http({queue:?}, payload, _nyx_rabbit_dispatch): + _chan = NyxRabbitChannel() + _chan.basic_consume(queue={queue:?}, on_message_callback=_nyx_rabbit_dispatch) + print({publish_marker:?} + " " + {queue:?}, flush=True) + _nyx_record_broker_publish("NYX_RABBIT_LOG", {queue:?}, payload) + _chan.basic_publish(exchange="", routing_key={queue:?}, body=payload)"#, handler = handler, queue = queue, publish_marker = crate::dynamic::stubs::RABBIT_PUBLISH_MARKER, @@ -1109,6 +1111,96 @@ def _nyx_try_kafka_http(topic, body, handler_name): print(f"NYX_KAFKA_HTTP_FALLBACK: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) return False +def _nyx_broker_http_roundtrip(env_name, root, destination, body, marker): + endpoint = os.environ.get(env_name, "") + if not (endpoint.startswith("http://") or endpoint.startswith("https://")): + return None + try: + import json + import urllib.parse + import urllib.request + base = endpoint.rstrip("/") + dest_path = urllib.parse.quote(str(destination), safe="") + print(marker + " " + str(destination), flush=True) + _send = urllib.request.Request( + base + "/" + root + "/" + dest_path + "/messages", + data=str(body).encode("utf-8"), + method="POST", + ) + urllib.request.urlopen(_send, timeout=2).read() + _raw = urllib.request.urlopen( + base + "/" + root + "/" + dest_path + "/messages?max=1", + timeout=2, + ).read() + return json.loads(_raw.decode("utf-8") or "{{}}").get("messages", []) + except SystemExit: + raise + except Exception as _e: + print(f"NYX_BROKER_HTTP_FALLBACK: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) + return None + +def _nyx_broker_http_ack(env_name, root, destination, ack_id): + endpoint = os.environ.get(env_name, "") + if not (endpoint.startswith("http://") or endpoint.startswith("https://")): + return + try: + import urllib.parse + import urllib.request + base = endpoint.rstrip("/") + dest_path = urllib.parse.quote(str(destination), safe="") + body = urllib.parse.urlencode({{"ack_id": str(ack_id)}}).encode("utf-8") + _ack = urllib.request.Request( + base + "/" + root + "/" + dest_path + "/ack", + data=body, + method="POST", + ) + urllib.request.urlopen(_ack, timeout=2).read() + except Exception: + pass + +def _nyx_try_pubsub_http(topic, body, dispatcher): + messages = _nyx_broker_http_roundtrip( + "NYX_PUBSUB_ENDPOINT", + "topics", + topic, + body, + {pubsub_publish_marker:?}, + ) + if not messages: + return False + for _msg in messages: + _data = _msg.get("data", "") + _mid = _msg.get("id", "") or _msg.get("ack_id", "") + dispatcher(NyxPubsubMessage(_mid or "nyx-http", _data)) + _nyx_broker_http_ack( + "NYX_PUBSUB_ENDPOINT", + "topics", + topic, + _msg.get("ack_id", _mid), + ) + return True + +def _nyx_try_rabbit_http(queue, body, dispatcher): + messages = _nyx_broker_http_roundtrip( + "NYX_RABBIT_ENDPOINT", + "queues", + queue, + body, + {rabbit_publish_marker:?}, + ) + if not messages: + return False + _chan = NyxRabbitChannel() + for _msg in messages: + _tag = _msg.get("delivery_tag", "") or _msg.get("ack_id", "") + _body = _msg.get("body", "") + _method = NyxRabbitMethod(_tag or "nyx-http", queue) + _props = NyxRabbitProperties(_tag or "nyx-http") + _body_bytes = _body if isinstance(_body, (bytes, bytearray)) else str(_body).encode("utf-8", "replace") + dispatcher(_chan, _method, _props, _body_bytes) + _nyx_broker_http_ack("NYX_RABBIT_ENDPOINT", "queues", queue, _tag) + return True + def _nyx_try_real_sqs(queue, body, handler_name): endpoint = os.environ.get("NYX_SQS_ENDPOINT", "") if not (endpoint.startswith("http://") or endpoint.startswith("https://")): @@ -1178,6 +1270,8 @@ except Exception as _e: register_and_publish = indent_lines(®ister_and_publish, " "), kafka_publish_marker = crate::dynamic::stubs::KAFKA_PUBLISH_MARKER, sqs_publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER, + pubsub_publish_marker = crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER, + rabbit_publish_marker = crate::dynamic::stubs::RABBIT_PUBLISH_MARKER, ); HarnessSource { source: format!("{preamble}\n{body}\n{postamble}"), diff --git a/src/dynamic/sandbox/mod.rs b/src/dynamic/sandbox/mod.rs index 6283e433..d5836dde 100644 --- a/src/dynamic/sandbox/mod.rs +++ b/src/dynamic/sandbox/mod.rs @@ -900,7 +900,12 @@ fn rewrite_extra_env_for_container( } if matches!( k.as_str(), - "NYX_HTTP_ENDPOINT" | "NYX_KAFKA_ENDPOINT" | "NYX_SQS_ENDPOINT" + "NYX_HTTP_ENDPOINT" + | "NYX_KAFKA_ENDPOINT" + | "NYX_SQS_ENDPOINT" + | "NYX_PUBSUB_ENDPOINT" + | "NYX_RABBIT_ENDPOINT" + | "NYX_NATS_ENDPOINT" ) && let Some(rest) = v.strip_prefix("http://127.0.0.1:") { return (k.clone(), format!("http://host-gateway:{rest}")); @@ -2286,6 +2291,18 @@ mod tests { "NYX_SQS_ENDPOINT".to_owned(), "http://127.0.0.1:23456/jobs".to_owned(), ), + ( + "NYX_PUBSUB_ENDPOINT".to_owned(), + "http://127.0.0.1:34567/topics".to_owned(), + ), + ( + "NYX_RABBIT_ENDPOINT".to_owned(), + "http://127.0.0.1:45678/queues".to_owned(), + ), + ( + "NYX_NATS_ENDPOINT".to_owned(), + "http://127.0.0.1:56789/subjects".to_owned(), + ), ]; let out = rewrite_extra_env_for_container(&extra, &[]); assert_eq!( @@ -2303,6 +2320,18 @@ mod tests { "NYX_SQS_ENDPOINT".to_owned(), "http://host-gateway:23456/jobs".to_owned(), ), + ( + "NYX_PUBSUB_ENDPOINT".to_owned(), + "http://host-gateway:34567/topics".to_owned(), + ), + ( + "NYX_RABBIT_ENDPOINT".to_owned(), + "http://host-gateway:45678/queues".to_owned(), + ), + ( + "NYX_NATS_ENDPOINT".to_owned(), + "http://host-gateway:56789/subjects".to_owned(), + ), ] ); } diff --git a/src/dynamic/stubs/broker.rs b/src/dynamic/stubs/broker.rs index 2c980c10..aa52631e 100644 --- a/src/dynamic/stubs/broker.rs +++ b/src/dynamic/stubs/broker.rs @@ -32,6 +32,7 @@ pub struct BrokerStub { cursor: Mutex, kafka_listener: Option, sqs_listener: Option, + http_listener: Option, } impl BrokerStub { @@ -53,6 +54,12 @@ impl BrokerStub { } else { None }; + let http_listener = if matches!(kind, StubKind::Pubsub | StubKind::Rabbit | StubKind::Nats) + { + start_http_broker_listener(kind, log_path.clone())? + } else { + None + }; Ok(Self { kind, tempdir: Some(tempdir), @@ -60,6 +67,7 @@ impl BrokerStub { cursor: Mutex::new(0), kafka_listener, sqs_listener, + http_listener, }) } @@ -120,6 +128,9 @@ impl StubProvider for BrokerStub { if let Some(listener) = &self.sqs_listener { return format!("http://127.0.0.1:{}", listener.port); } + if let Some(listener) = &self.http_listener { + return format!("http://127.0.0.1:{}", listener.port); + } format!("loopback://{}", self.kind.tag()) } @@ -200,6 +211,10 @@ impl Drop for BrokerStub { listener.shutdown.store(true, Ordering::Relaxed); let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port)); } + if let Some(listener) = &self.http_listener { + listener.shutdown.store(true, Ordering::Relaxed); + let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port)); + } self.tempdir.take(); } } @@ -425,6 +440,7 @@ fn handle_sqs_connection(mut stream: TcpStream, state: Arc>, log #[derive(Debug)] struct HttpRequest { + method: String, path: String, query: String, body: String, @@ -437,7 +453,7 @@ fn read_http_request(stream: &TcpStream) -> Option { return None; } let mut parts = request_line.split_whitespace(); - let _method = parts.next()?; + let method = parts.next()?.to_owned(); let target = parts.next()?.to_owned(); let (path, query) = split_target(&target); @@ -463,12 +479,202 @@ fn read_http_request(stream: &TcpStream) -> Option { reader.read_exact(&mut body).ok()?; } Some(HttpRequest { + method, path, query, body: String::from_utf8_lossy(&body).into_owned(), }) } +#[derive(Debug)] +struct HttpBrokerListener { + port: u16, + shutdown: Arc, +} + +#[derive(Debug, Clone)] +struct HttpBrokerMessage { + id: String, + payload: String, +} + +#[derive(Debug, Default)] +struct HttpBrokerState { + next_id: u64, + streams: BTreeMap>, + inflight: BTreeMap, +} + +fn start_http_broker_listener( + kind: StubKind, + log_path: PathBuf, +) -> std::io::Result> { + let listener = match TcpListener::bind("127.0.0.1:0") { + Ok(listener) => listener, + Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => return Ok(None), + Err(e) => return Err(e), + }; + let port = listener.local_addr()?.port(); + let shutdown = Arc::new(AtomicBool::new(false)); + let state = Arc::new(Mutex::new(HttpBrokerState::default())); + let shutdown_clone = Arc::clone(&shutdown); + let state_clone = Arc::clone(&state); + std::thread::spawn(move || { + http_broker_accept_loop(listener, shutdown_clone, kind, state_clone, log_path) + }); + Ok(Some(HttpBrokerListener { port, shutdown })) +} + +fn http_broker_accept_loop( + listener: TcpListener, + shutdown: Arc, + kind: StubKind, + state: Arc>, + log_path: PathBuf, +) { + for stream in listener.incoming() { + if shutdown.load(Ordering::Relaxed) { + break; + } + let Ok(stream) = stream else { continue }; + let _ = stream.set_read_timeout(Some(Duration::from_secs(2))); + let _ = stream.set_write_timeout(Some(Duration::from_secs(2))); + let state = Arc::clone(&state); + let log_path = log_path.clone(); + std::thread::spawn(move || handle_http_broker_connection(stream, kind, state, &log_path)); + } +} + +fn handle_http_broker_connection( + mut stream: TcpStream, + kind: StubKind, + state: Arc>, + log_path: &Path, +) { + let Some(req) = read_http_request(&stream) else { + return; + }; + let response = match handle_http_broker_request(kind, &req, state, log_path) { + Ok(body) => http_response_with_type(200, "OK", "application/json", &body), + Err(body) => http_response_with_type(400, "Bad Request", "application/json", &body), + }; + let _ = stream.write_all(response.as_bytes()); +} + +fn handle_http_broker_request( + kind: StubKind, + req: &HttpRequest, + state: Arc>, + log_path: &Path, +) -> Result { + let Some((destination, action)) = http_broker_path_parts(kind, &req.path) else { + return Err(json_error("invalid broker stub path")); + }; + match action.as_str() { + "messages" if req.method.eq_ignore_ascii_case("GET") => { + let params = parse_form(&req.query); + let max_messages = params + .get("max") + .and_then(|v| v.parse::().ok()) + .unwrap_or(1) + .clamp(1, 100); + let mut guard = state.lock().map_err(|_| json_error("internal error"))?; + let mut messages = Vec::new(); + for _ in 0..max_messages { + let Some(message) = guard + .streams + .entry(destination.clone()) + .or_default() + .pop_front() + else { + break; + }; + let _ = append_broker_event(log_path, "deliver", &destination, &message.payload); + guard + .inflight + .insert(message.id.clone(), (destination.clone(), message.clone())); + messages.push(http_broker_message_json(kind, &destination, &message)); + } + Ok(serde_json::json!({ "messages": messages }).to_string()) + } + "messages" => { + let mut guard = state.lock().map_err(|_| json_error("internal error"))?; + guard.next_id += 1; + let id = format!("nyx-{:08}", guard.next_id); + let message = HttpBrokerMessage { + id: id.clone(), + payload: req.body.clone(), + }; + guard + .streams + .entry(destination.clone()) + .or_default() + .push_back(message); + let _ = append_broker_event(log_path, "publish", &destination, &req.body); + Ok(serde_json::json!({ "id": id }).to_string()) + } + "ack" => { + let params = parse_form(&req.body); + let ack_id = params + .get("ack_id") + .or_else(|| params.get("id")) + .cloned() + .unwrap_or_default(); + if let Ok(mut guard) = state.lock() + && (ack_id.is_empty() || guard.inflight.remove(&ack_id).is_some()) + { + let _ = append_broker_event(log_path, "ack", &destination, &ack_id); + } + Ok(serde_json::json!({ "acked": true }).to_string()) + } + _ => Err(json_error("invalid broker stub action")), + } +} + +fn http_broker_path_parts(kind: StubKind, path: &str) -> Option<(String, String)> { + let expected_root = match kind { + StubKind::Pubsub => "topics", + StubKind::Rabbit => "queues", + StubKind::Nats => "subjects", + _ => return None, + }; + let mut parts = path.trim_matches('/').split('/'); + if parts.next()? != expected_root { + return None; + } + let destination = parts.next().map(percent_decode)?; + let action = parts.next()?.to_owned(); + if destination.is_empty() || parts.next().is_some() { + return None; + } + Some((destination, action)) +} + +fn http_broker_message_json( + kind: StubKind, + destination: &str, + message: &HttpBrokerMessage, +) -> serde_json::Value { + match kind { + StubKind::Pubsub => serde_json::json!({ + "id": &message.id, + "ack_id": &message.id, + "data": &message.payload + }), + StubKind::Rabbit => serde_json::json!({ + "delivery_tag": &message.id, + "body": &message.payload + }), + StubKind::Nats => serde_json::json!({ + "subject": destination, + "ack_id": &message.id, + "data": &message.payload, + "reply": "" + }), + _ => serde_json::json!({}), + } +} + fn split_target(target: &str) -> (String, String) { let (path, query) = target.split_once('?').unwrap_or((target, "")); (path.to_owned(), query.to_owned()) @@ -769,6 +975,22 @@ mod tests { ); } + #[test] + fn remaining_brokers_expose_http_emulators() { + for kind in [StubKind::Pubsub, StubKind::Rabbit, StubKind::Nats] { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(kind, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == format!("loopback://{}", kind.tag()) { + continue; + } + assert!( + endpoint.starts_with("http://127.0.0.1:"), + "{kind:?} endpoint should be a host-side HTTP emulator, got {endpoint}" + ); + } + } + #[test] fn kafka_http_emulator_records_publish_deliver_ack() { let dir = TempDir::new().unwrap(); @@ -853,6 +1075,69 @@ mod tests { assert_eq!(events[2].detail.get("payload").unwrap(), &receipt); } + #[test] + fn remaining_http_broker_emulators_record_publish_deliver_ack() { + let cases = [ + (StubKind::Pubsub, "topics", "projects/p/topics/orders"), + (StubKind::Rabbit, "queues", "work"), + (StubKind::Nats, "subjects", "events"), + ]; + for (kind, root, destination) in cases { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(kind, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == format!("loopback://{}", kind.tag()) { + continue; + } + let port: u16 = endpoint + .trim_start_matches("http://127.0.0.1:") + .parse() + .unwrap(); + let escaped_destination = form_escape(destination); + let send = http_post( + port, + &format!("/{root}/{escaped_destination}/messages"), + "NYX\tPAYLOAD", + ); + assert!(send.contains(r#""id":"nyx-00000001""#), "{send}"); + + let receive = http_get( + port, + &format!("/{root}/{escaped_destination}/messages?max=1"), + ); + let parsed: serde_json::Value = serde_json::from_str(response_body(&receive)).unwrap(); + let message = parsed["messages"][0].as_object().unwrap(); + let payload = message + .get("data") + .or_else(|| message.get("body")) + .and_then(|v| v.as_str()) + .unwrap(); + assert_eq!(payload, "NYX\tPAYLOAD"); + let ack_id = message + .get("ack_id") + .or_else(|| message.get("delivery_tag")) + .and_then(|v| v.as_str()) + .unwrap(); + + let ack = http_post( + port, + &format!("/{root}/{escaped_destination}/ack"), + &format!("ack_id={}", form_escape(ack_id)), + ); + assert!(ack.contains(r#""acked":true"#), "{ack}"); + + let events = stub.drain_events(); + let actions: Vec<&str> = events + .iter() + .map(|ev| ev.detail.get("action").unwrap().as_str()) + .collect(); + assert_eq!(actions, vec!["publish", "deliver", "ack"], "{kind:?}"); + assert_eq!(events[0].detail.get("destination").unwrap(), destination); + assert_eq!(events[1].detail.get("payload").unwrap(), "NYX\tPAYLOAD"); + assert_eq!(events[2].detail.get("payload").unwrap(), ack_id); + } + } + #[test] fn broker_drain_understands_delivery_and_ack_events() { let dir = TempDir::new().unwrap(); @@ -900,6 +1185,10 @@ mod tests { out } + fn response_body(response: &str) -> &str { + response.split("\r\n\r\n").nth(1).unwrap_or("") + } + fn form_escape(input: &str) -> String { let mut out = String::new(); for b in input.bytes() { diff --git a/tests/message_handler_corpus.rs b/tests/message_handler_corpus.rs index 92102d57..d63a9d14 100644 --- a/tests/message_handler_corpus.rs +++ b/tests/message_handler_corpus.rs @@ -322,6 +322,11 @@ fn message_handler_remaining_brokers_emit_delivery_and_ack_events() { h.source.contains(log_env), "{adapter} harness must write the broker log env var", ); + let endpoint_env = log_env.replace("_LOG", "_ENDPOINT"); + assert!( + h.source.contains(&endpoint_env), + "{adapter} harness must try the host-side broker endpoint {endpoint_env}", + ); assert!( h.source.contains("\"deliver\"") || h.source.contains("'deliver'"), "{adapter} harness must record delivery events: {}", @@ -335,6 +340,61 @@ fn message_handler_remaining_brokers_emit_delivery_and_ack_events() { } } +#[test] +fn message_handler_remaining_brokers_try_http_emulators_before_loopback() { + let cases = [ + ( + Lang::Python, + "pubsub_python", + "projects/p/subscriptions/s", + "callback", + "pubsub-python", + "_nyx_try_pubsub_http", + ), + ( + Lang::Python, + "rabbit_python", + "work", + "on_message", + "rabbit-python", + "_nyx_try_rabbit_http", + ), + ( + Lang::Java, + "rabbit_java", + "work", + "onMessage", + "rabbit-java", + "nyxTryRabbitHttp", + ), + ( + Lang::Go, + "pubsub_go", + "my-sub", + "OnMessage", + "pubsub-go", + "nyxFetchHttpBroker", + ), + ( + Lang::Go, + "nats_go", + "events", + "OnMessage", + "nats-go", + "nyxFetchHttpBroker", + ), + ]; + for (lang, fixture, queue, handler, adapter, helper) in cases { + let spec = make_spec_with_adapter(lang, queue, handler, entry_file(fixture), adapter); + let h = lang::emit(&spec).expect("emit ok"); + assert!( + h.source.contains(helper), + "{adapter} harness should call {helper}: {}", + h.source + ); + } +} + // ── Framework-adapter assertions ────────────────────────────────────────────── fn ts_language_for(lang: Lang) -> tree_sitter::Language {