From d5c51c5d8a41e359d763bb5a29e1fcb1bbb61000 Mon Sep 17 00:00:00 2001 From: elipeter Date: Wed, 27 May 2026 12:49:41 -0500 Subject: [PATCH] refactor(dynamic): prioritize real clients over HTTP fallbacks for Rabbit, Kafka, and Pubsub across Java, Python, Go; integrate native SDK handling and extend test coverage --- src/dynamic/lang/go.rs | 126 +++++++++++++++- src/dynamic/lang/java.rs | 211 +++++++++++++++++++++++++- src/dynamic/lang/python.rs | 253 ++++++++++++++++++++++++++++---- tests/message_handler_corpus.rs | 106 +++++++++++++ 4 files changed, 662 insertions(+), 34 deletions(-) diff --git a/src/dynamic/lang/go.rs b/src/dynamic/lang/go.rs index 76fba69a..535e0ebe 100644 --- a/src/dynamic/lang/go.rs +++ b/src/dynamic/lang/go.rs @@ -2245,10 +2245,130 @@ func nyxTryRealNats(subject string, payload string, dispatcher func(interface{}) GoBroker::Pubsub => ( crate::dynamic::stubs::pubsub_source(crate::symbol::Lang::Go), crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER, - "", - "", + "\t\"context\"\n\tpubsubapi \"cloud.google.com/go/pubsub\"\n", + r##" +func nyxPubsubEmulatorHost(endpoint string) string { + if host := os.Getenv("PUBSUB_EMULATOR_HOST"); host != "" { + return host + } + endpoint = strings.TrimSpace(endpoint) + for _, prefix := range []string{"grpc://", "pubsub://"} { + if strings.HasPrefix(endpoint, prefix) { + return strings.Trim(strings.TrimPrefix(endpoint, prefix), "/") + } + } + return "" +} + +func nyxPubsubID(raw string, fallback string) string { + tail := strings.TrimRight(raw, "/") + if idx := strings.LastIndex(tail, "/"); idx >= 0 { + tail = tail[idx+1:] + } + var b strings.Builder + for _, ch := range tail { + switch { + case ch >= 'a' && ch <= 'z': + b.WriteRune(ch) + case ch >= 'A' && ch <= 'Z': + b.WriteRune(ch) + case ch >= '0' && ch <= '9': + b.WriteRune(ch) + case ch == '-' || ch == '_': + b.WriteRune(ch) + default: + b.WriteByte('-') + } + if b.Len() >= 200 { + break + } + } + if b.Len() == 0 { + return fallback + } + return b.String() +} + +func nyxTryRealPubsub(subscription string, payload string, dispatcher func(interface{}), marker string) bool { + emulatorHost := nyxPubsubEmulatorHost(os.Getenv("NYX_PUBSUB_ENDPOINT")) + if emulatorHost == "" { + return false + } + oldEmulator, hadOldEmulator := os.LookupEnv("PUBSUB_EMULATOR_HOST") + if !hadOldEmulator { + _ = os.Setenv("PUBSUB_EMULATOR_HOST", emulatorHost) + defer os.Unsetenv("PUBSUB_EMULATOR_HOST") + } else { + _ = oldEmulator + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + client, err := pubsubapi.NewClient(ctx, "nyx") + if err != nil { + fmt.Fprintf(os.Stderr, "NYX_REAL_PUBSUB_FALLBACK: %v\n", err) + return false + } + defer client.Close() + + subID := nyxPubsubID(subscription, "nyx-sub") + topicID := nyxPubsubID(subscription+"-topic", "nyx-topic") + if strings.Contains(subscription, "/topics/") { + topicID = subID + subID = nyxPubsubID(topicID+"-sub", "nyx-sub") + } + + topic, err := client.CreateTopic(ctx, topicID) + if err != nil { + topic = client.Topic(topicID) + } + if ok, err := topic.Exists(ctx); err == nil && !ok { + fmt.Fprintln(os.Stderr, "NYX_REAL_PUBSUB_FALLBACK: topic missing") + return false + } + + sub, err := client.CreateSubscription(ctx, subID, pubsubapi.SubscriptionConfig{Topic: topic}) + if err != nil { + sub = client.Subscription(subID) + } + fmt.Println(marker + " " + subscription) + nyxRecordBrokerPublish("NYX_PUBSUB_LOG", subscription, payload) + result := topic.Publish(ctx, &pubsubapi.Message{Data: []byte(payload)}) + if _, err := result.Get(ctx); err != nil { + fmt.Fprintf(os.Stderr, "NYX_REAL_PUBSUB_FALLBACK: %v\n", err) + return false + } + + delivered := make(chan bool, 1) + receiveCtx, receiveCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer receiveCancel() + err = sub.Receive(receiveCtx, func(ctx context.Context, msg *pubsubapi.Message) { + pubsubMsg := &NyxPubsubMessage{ID: msg.ID, Data: msg.Data} + nyxRecordBrokerEvent("NYX_PUBSUB_LOG", "deliver", subscription, string(msg.Data)) + dispatcher(pubsubMsg) + msg.Ack() + nyxRecordBrokerEvent("NYX_PUBSUB_LOG", "ack", subscription, msg.ID) + select { + case delivered <- true: + default: + } + receiveCancel() + }) + select { + case ok := <-delivered: + return ok + default: + } + if err != nil { + fmt.Fprintf(os.Stderr, "NYX_REAL_PUBSUB_FALLBACK: %v\n", err) + } + return false +} +"##, format!( - r##" if msg, ok := nyxFetchHttpBroker("NYX_PUBSUB_ENDPOINT", "topics", "{queue}", payload, "{publish_marker}"); ok {{ + r##" if nyxTryRealPubsub("{queue}", payload, nyxDispatch, "{publish_marker}") {{ + return + }} else if msg, ok := nyxFetchHttpBroker("NYX_PUBSUB_ENDPOINT", "topics", "{queue}", payload, "{publish_marker}"); ok {{ data := msg["data"] pubsubMsg := &NyxPubsubMessage{{ID: msg["id"], Data: []byte(data)}} if pubsubMsg.ID == "" {{ diff --git a/src/dynamic/lang/java.rs b/src/dynamic/lang/java.rs index 025ff145..8236b9b7 100644 --- a/src/dynamic/lang/java.rs +++ b/src/dynamic/lang/java.rs @@ -3834,7 +3834,8 @@ fn emit_message_handler_harness( JavaBroker::Rabbit => ( crate::dynamic::stubs::RABBIT_PUBLISH_MARKER, format!( - r#" if (!nyxTryRabbitHttp({queue:?}, payload, entryInst, {handler:?})) {{ + r#" if (!nyxTryRealRabbitClient({queue:?}, payload, entryInst, {handler:?}) + && !nyxTryRabbitHttp({queue:?}, payload, entryInst, {handler:?})) {{ NyxRabbitChannel chan = new NyxRabbitChannel(); chan.basicConsume({queue:?}, (mid, body) -> {{ nyxRecordBrokerEvent("NYX_RABBIT_LOG", "deliver", {queue:?}, body); @@ -3875,7 +3876,8 @@ fn emit_message_handler_harness( JavaBroker::Kafka => ( crate::dynamic::stubs::KAFKA_PUBLISH_MARKER, format!( - r#" if (!nyxTryRealKafkaClient({queue:?}, payload, entryInst, {handler:?}) + r#" if (!nyxTryLiveKafkaClient({queue:?}, payload, entryInst, {handler:?}) + && !nyxTryRealKafkaClient({queue:?}, payload, entryInst, {handler:?}) && !nyxTryKafkaHttp({queue:?}, payload, entryInst, {handler:?})) {{ NyxKafkaLoopback brokerRef = new NyxKafkaLoopback(); System.out.println({publish_marker:?} + " " + {queue:?}); @@ -3986,6 +3988,102 @@ public class NyxHarness {{ }} }} + static boolean nyxTryRealRabbitClient(String queue, String payload, Object entryInst, String handler) {{ + String endpoint = System.getenv("NYX_RABBIT_ENDPOINT"); + if (endpoint == null || !(endpoint.startsWith("amqp://") || endpoint.startsWith("amqps://"))) {{ + return false; + }} + Object connection = null; + Object channel = null; + try {{ + Class factoryClass = Class.forName("com.rabbitmq.client.ConnectionFactory"); + Object factory = factoryClass.getConstructor().newInstance(); + factoryClass.getMethod("setUri", String.class).invoke(factory, endpoint); + connection = factoryClass.getMethod("newConnection").invoke(factory); + channel = connection.getClass().getMethod("createChannel").invoke(connection); + Class channelClass = Class.forName("com.rabbitmq.client.Channel"); + channelClass.getMethod( + "queueDeclare", + String.class, + boolean.class, + boolean.class, + boolean.class, + java.util.Map.class + ).invoke(channel, queue, false, false, true, null); + + Class propsClass = Class.forName("com.rabbitmq.client.AMQP$BasicProperties"); + System.out.println({rabbit_publish_marker:?} + " " + queue); + nyxRecordBrokerPublish("NYX_RABBIT_LOG", queue, payload); + channelClass.getMethod( + "basicPublish", + String.class, + String.class, + propsClass, + byte[].class + ).invoke( + channel, + "", + queue, + null, + payload.getBytes(java.nio.charset.StandardCharsets.UTF_8) + ); + + Object response = channelClass.getMethod("basicGet", String.class, boolean.class) + .invoke(channel, queue, false); + if (response == null) {{ + return false; + }} + byte[] rawBody = (byte[]) response.getClass().getMethod("getBody").invoke(response); + String body = new String(rawBody, java.nio.charset.StandardCharsets.UTF_8); + Object envelope = response.getClass().getMethod("getEnvelope").invoke(response); + long deliveryTag = ((Number) envelope.getClass().getMethod("getDeliveryTag").invoke(envelope)).longValue(); + String tag = Long.toString(deliveryTag); + nyxRecordBrokerEvent("NYX_RABBIT_LOG", "deliver", queue, body); + System.out.println("__NYX_SINK_HIT__"); + boolean success = false; + try {{ + java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod(handler, String.class, String.class); + m.setAccessible(true); + m.invoke(entryInst, tag, body); + success = true; + }} catch (NoSuchMethodException nsme) {{ + try {{ + java.lang.reflect.Method m2 = entryInst.getClass().getDeclaredMethod(handler, String.class); + m2.setAccessible(true); + m2.invoke(entryInst, body); + success = true; + }} catch (Exception ie) {{ + Throwable c = (ie instanceof java.lang.reflect.InvocationTargetException && ie.getCause() != null) ? ie.getCause() : ie; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + }} catch (Exception e) {{ + Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + if (success) {{ + channelClass.getMethod("basicAck", long.class, boolean.class) + .invoke(channel, deliveryTag, false); + nyxRecordBrokerEvent("NYX_RABBIT_LOG", "ack", queue, tag); + }} + return true; + }} catch (ClassNotFoundException missingRabbitClient) {{ + return false; + }} catch (Throwable e) {{ + System.err.println("NYX_REAL_RABBIT_FALLBACK: " + e.getClass().getName() + ": " + e.getMessage()); + return false; + }} finally {{ + for (Object closeable : new Object[] {{ channel, connection }}) {{ + if (closeable == null) {{ + continue; + }} + try {{ + closeable.getClass().getMethod("close").invoke(closeable); + }} catch (Exception ignored) {{ + }} + }} + }} + }} + static boolean nyxTryRabbitHttp(String queue, String payload, Object entryInst, String handler) {{ String endpoint = System.getenv("NYX_RABBIT_ENDPOINT"); if (endpoint == null || !(endpoint.startsWith("http://") || endpoint.startsWith("https://"))) {{ @@ -4051,6 +4149,115 @@ public class NyxHarness {{ }} }} + static String nyxKafkaBootstrap(String endpoint) {{ + if (endpoint == null) {{ + return ""; + }} + endpoint = endpoint.trim(); + if (endpoint.startsWith("http://") || endpoint.startsWith("https://")) {{ + return ""; + }} + if (endpoint.startsWith("kafka://")) {{ + endpoint = endpoint.substring("kafka://".length()); + }} else if (endpoint.startsWith("plaintext://")) {{ + endpoint = endpoint.substring("plaintext://".length()); + }} + while (endpoint.endsWith("/")) {{ + endpoint = endpoint.substring(0, endpoint.length() - 1); + }} + return endpoint; + }} + + static boolean nyxTryLiveKafkaClient(String topic, String payload, Object entryInst, String handler) {{ + String bootstrap = nyxKafkaBootstrap(System.getenv("NYX_KAFKA_ENDPOINT")); + if (bootstrap.isEmpty()) {{ + return false; + }} + Object producer = null; + Object consumer = null; + try {{ + Class producerClass = Class.forName("org.apache.kafka.clients.producer.KafkaProducer"); + Class producerRecordClass = Class.forName("org.apache.kafka.clients.producer.ProducerRecord"); + Class consumerClass = Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer"); + + java.util.Properties producerProps = new java.util.Properties(); + producerProps.put("bootstrap.servers", bootstrap); + producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.put("acks", "all"); + producerProps.put("max.block.ms", "1000"); + producerProps.put("request.timeout.ms", "1000"); + producerProps.put("retries", "0"); + producer = producerClass.getConstructor(java.util.Properties.class).newInstance(producerProps); + + java.util.Properties consumerProps = new java.util.Properties(); + consumerProps.put("bootstrap.servers", bootstrap); + consumerProps.put("group.id", "nyx-" + Long.toString(System.nanoTime())); + consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + consumerProps.put("auto.offset.reset", "earliest"); + consumerProps.put("enable.auto.commit", "false"); + consumerProps.put("request.timeout.ms", "1000"); + consumer = consumerClass.getConstructor(java.util.Properties.class).newInstance(consumerProps); + + Object record = producerRecordClass.getConstructor(String.class, Object.class) + .newInstance(topic, payload); + System.out.println({kafka_publish_marker:?} + " " + topic); + nyxRecordBrokerPublish("NYX_KAFKA_LOG", topic, payload); + Object future = producerClass.getMethod("send", producerRecordClass).invoke(producer, record); + future.getClass().getMethod("get", long.class, java.util.concurrent.TimeUnit.class) + .invoke(future, Long.valueOf(2L), java.util.concurrent.TimeUnit.SECONDS); + producerClass.getMethod("flush").invoke(producer); + + consumerClass.getMethod("subscribe", java.util.Collection.class) + .invoke(consumer, java.util.Collections.singletonList(topic)); + Object records = consumerClass.getMethod("poll", java.time.Duration.class) + .invoke(consumer, java.time.Duration.ofSeconds(2)); + if (!(records instanceof Iterable)) {{ + return false; + }} + boolean delivered = false; + for (Object rec : (Iterable) records) {{ + String value = String.valueOf(rec.getClass().getMethod("value").invoke(rec)); + long offset = ((Number) rec.getClass().getMethod("offset").invoke(rec)).longValue(); + nyxRecordBrokerEvent("NYX_KAFKA_LOG", "deliver", topic, value); + System.out.println("__NYX_SINK_HIT__"); + boolean success = false; + try {{ + java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod(handler, String.class); + m.setAccessible(true); + m.invoke(entryInst, value); + success = true; + }} catch (Exception e) {{ + Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + if (success) {{ + consumerClass.getMethod("commitSync").invoke(consumer); + nyxRecordBrokerEvent("NYX_KAFKA_LOG", "ack", topic, Long.toString(offset)); + }} + delivered = true; + break; + }} + return delivered; + }} catch (ClassNotFoundException missingKafkaClient) {{ + return false; + }} catch (Throwable e) {{ + System.err.println("NYX_LIVE_KAFKA_FALLBACK: " + e.getClass().getName() + ": " + e.getMessage()); + return false; + }} finally {{ + for (Object closeable : new Object[] {{ consumer, producer }}) {{ + if (closeable == null) {{ + continue; + }} + try {{ + closeable.getClass().getMethod("close").invoke(closeable); + }} catch (Exception ignored) {{ + }} + }} + }} + }} + static boolean nyxTryRealKafkaClient(String topic, String payload, Object entryInst, String handler) {{ Object consumer = null; try {{ diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index acd578eb..4483d7af 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -990,12 +990,13 @@ fn emit_message_handler(spec: &HarnessSpec, queue: &str) -> HarnessSource { if hasattr(message, "ack"): message.ack() _nyx_record_broker_event("NYX_PUBSUB_LOG", "ack", {queue:?}, getattr(message, "message_id", "")) -if not _nyx_try_pubsub_http({queue:?}, payload, _nyx_pubsub_dispatch): - _loop = NyxPubsubLoopback() - _loop.subscribe({queue:?}, _nyx_pubsub_dispatch) - print({publish_marker:?} + " " + {queue:?}, flush=True) - _nyx_record_broker_publish("NYX_PUBSUB_LOG", {queue:?}, payload) - _loop.publish({queue:?}, payload)"#, +if not _nyx_try_real_pubsub({queue:?}, payload, _nyx_pubsub_dispatch): + if not _nyx_try_pubsub_http({queue:?}, payload, _nyx_pubsub_dispatch): + _loop = NyxPubsubLoopback() + _loop.subscribe({queue:?}, _nyx_pubsub_dispatch) + print({publish_marker:?} + " " + {queue:?}, flush=True) + _nyx_record_broker_publish("NYX_PUBSUB_LOG", {queue:?}, payload) + _loop.publish({queue:?}, payload)"#, handler = handler, queue = queue, publish_marker = crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER, @@ -1009,34 +1010,36 @@ if not _nyx_try_pubsub_http({queue:?}, payload, _nyx_pubsub_dispatch): _nyx_record_broker_event("NYX_RABBIT_LOG", "deliver", {queue:?}, body) _h(ch, method, props, body) _nyx_record_broker_event("NYX_RABBIT_LOG", "ack", {queue:?}, getattr(method, "delivery_tag", "")) -if not _nyx_try_rabbit_http({queue:?}, payload, _nyx_rabbit_dispatch): - _chan = NyxRabbitChannel() - _chan.basic_consume(queue={queue:?}, on_message_callback=_nyx_rabbit_dispatch) - print({publish_marker:?} + " " + {queue:?}, flush=True) - _nyx_record_broker_publish("NYX_RABBIT_LOG", {queue:?}, payload) - _chan.basic_publish(exchange="", routing_key={queue:?}, body=payload)"#, +if not _nyx_try_real_rabbit({queue:?}, payload, _nyx_rabbit_dispatch): + if not _nyx_try_rabbit_http({queue:?}, payload, _nyx_rabbit_dispatch): + _chan = NyxRabbitChannel() + _chan.basic_consume(queue={queue:?}, on_message_callback=_nyx_rabbit_dispatch) + print({publish_marker:?} + " " + {queue:?}, flush=True) + _nyx_record_broker_publish("NYX_RABBIT_LOG", {queue:?}, payload) + _chan.basic_publish(exchange="", routing_key={queue:?}, body=payload)"#, handler = handler, queue = queue, publish_marker = crate::dynamic::stubs::RABBIT_PUBLISH_MARKER, ), PythonBroker::Kafka => format!( - r#"if not _nyx_try_kafka_http({queue:?}, payload, {handler:?}): - _loop = NyxKafkaLoopback() - def _nyx_kafka_dispatch(message): - _h = getattr(_entry_mod, {handler:?}, None) - if _h is None: - print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) - sys.exit(78) - _h(message) - _loop.subscribe({queue:?}, _nyx_kafka_dispatch) - print({publish_marker:?} + " " + {queue:?}, flush=True) - _nyx_record_broker_publish("NYX_KAFKA_LOG", {queue:?}, payload) - _loop.publish({queue:?}, payload) - for _record in _loop.poll({queue:?}, max_records=1): - _nyx_record_broker_event("NYX_KAFKA_LOG", "deliver", {queue:?}, _record.value) - _nyx_kafka_dispatch(_record.value) - _loop.commit(_record) - _nyx_record_broker_event("NYX_KAFKA_LOG", "ack", {queue:?}, str(_record.offset))"#, + r#"if not _nyx_try_real_kafka({queue:?}, payload, {handler:?}): + if not _nyx_try_kafka_http({queue:?}, payload, {handler:?}): + _loop = NyxKafkaLoopback() + def _nyx_kafka_dispatch(message): + _h = getattr(_entry_mod, {handler:?}, None) + if _h is None: + print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) + sys.exit(78) + _h(message) + _loop.subscribe({queue:?}, _nyx_kafka_dispatch) + print({publish_marker:?} + " " + {queue:?}, flush=True) + _nyx_record_broker_publish("NYX_KAFKA_LOG", {queue:?}, payload) + _loop.publish({queue:?}, payload) + for _record in _loop.poll({queue:?}, max_records=1): + _nyx_record_broker_event("NYX_KAFKA_LOG", "deliver", {queue:?}, _record.value) + _nyx_kafka_dispatch(_record.value) + _loop.commit(_record) + _nyx_record_broker_event("NYX_KAFKA_LOG", "ack", {queue:?}, str(_record.offset))"#, handler = handler, queue = queue, publish_marker = crate::dynamic::stubs::KAFKA_PUBLISH_MARKER, @@ -1067,6 +1070,77 @@ def _nyx_record_broker_event(env_name, action, destination, body): def _nyx_record_broker_publish(env_name, destination, body): _nyx_record_broker_event(env_name, "publish", destination, body) +def _nyx_kafka_bootstrap(endpoint): + endpoint = (endpoint or "").strip() + if endpoint.startswith(("http://", "https://")): + return "" + for prefix in ("kafka://", "plaintext://"): + if endpoint.startswith(prefix): + endpoint = endpoint[len(prefix):] + break + return endpoint.strip("/") + +def _nyx_try_real_kafka(topic, body, handler_name): + bootstrap = _nyx_kafka_bootstrap(os.environ.get("NYX_KAFKA_ENDPOINT", "")) + if not bootstrap: + return False + try: + from kafka import KafkaConsumer, KafkaProducer + except Exception: + return False + _h = getattr(_entry_mod, handler_name, None) + if _h is None: + print("NYX_HANDLER_NOT_FOUND: " + handler_name, file=sys.stderr, flush=True) + sys.exit(78) + _producer = None + _consumer = None + try: + _producer = KafkaProducer( + bootstrap_servers=[bootstrap], + value_serializer=lambda v: v if isinstance(v, (bytes, bytearray)) else str(v).encode("utf-8", "replace"), + request_timeout_ms=1000, + api_version_auto_timeout_ms=1000, + max_block_ms=1000, + retries=0, + ) + _consumer = KafkaConsumer( + str(topic), + bootstrap_servers=[bootstrap], + group_id="nyx-" + str(os.getpid()), + auto_offset_reset="earliest", + enable_auto_commit=False, + consumer_timeout_ms=2000, + value_deserializer=lambda v: v.decode("utf-8", "replace"), + request_timeout_ms=1000, + api_version_auto_timeout_ms=1000, + ) + print({kafka_publish_marker:?} + " " + str(topic), flush=True) + _nyx_record_broker_publish("NYX_KAFKA_LOG", topic, body) + _producer.send(str(topic), body).get(timeout=2) + _producer.flush(timeout=2) + for _record in _consumer: + _nyx_record_broker_event("NYX_KAFKA_LOG", "deliver", topic, _record.value) + _h(_record.value) + try: + _consumer.commit() + except Exception: + pass + _nyx_record_broker_event("NYX_KAFKA_LOG", "ack", topic, str(getattr(_record, "offset", ""))) + return True + return False + except SystemExit: + raise + except Exception as _e: + print(f"NYX_REAL_KAFKA_FALLBACK: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) + return False + finally: + for _client in (_consumer, _producer): + try: + if _client is not None: + _client.close() + except Exception: + pass + def _nyx_try_kafka_http(topic, body, handler_name): endpoint = os.environ.get("NYX_KAFKA_ENDPOINT", "") if not (endpoint.startswith("http://") or endpoint.startswith("https://")): @@ -1158,6 +1232,85 @@ def _nyx_broker_http_ack(env_name, root, destination, ack_id): except Exception: pass +def _nyx_pubsub_emulator_host(endpoint): + host = os.environ.get("PUBSUB_EMULATOR_HOST", "") + if host: + return host + endpoint = (endpoint or "").strip() + for prefix in ("grpc://", "pubsub://"): + if endpoint.startswith(prefix): + return endpoint[len(prefix):].strip("/") + return "" + +def _nyx_pubsub_name(raw, fallback): + tail = str(raw or "").rstrip("/").split("/")[-1] + out = "".join(ch if (ch.isalnum() or ch in "-_") else "-" for ch in tail) + return (out or fallback)[:200] + +def _nyx_try_real_pubsub(subscription, body, dispatcher): + endpoint = os.environ.get("NYX_PUBSUB_ENDPOINT", "") + emulator_host = _nyx_pubsub_emulator_host(endpoint) + if not emulator_host: + return False + try: + from google.cloud import pubsub_v1 + except Exception: + return False + old_emulator = os.environ.get("PUBSUB_EMULATOR_HOST") + if not old_emulator: + os.environ["PUBSUB_EMULATOR_HOST"] = emulator_host + _publisher = None + _subscriber = None + try: + project = os.environ.get("NYX_PUBSUB_PROJECT", "nyx") + sub_id = _nyx_pubsub_name(subscription, "nyx-sub") + topic_id = _nyx_pubsub_name(subscription + "-topic", "nyx-topic") + if "/topics/" in str(subscription): + topic_id = sub_id + sub_id = _nyx_pubsub_name(topic_id + "-sub", "nyx-sub") + _publisher = pubsub_v1.PublisherClient() + _subscriber = pubsub_v1.SubscriberClient() + topic_path = _publisher.topic_path(project, topic_id) + subscription_path = _subscriber.subscription_path(project, sub_id) + try: + _publisher.create_topic(request={{"name": topic_path}}) + except Exception: + pass + try: + _subscriber.create_subscription(request={{"name": subscription_path, "topic": topic_path}}) + except Exception: + pass + print({pubsub_publish_marker:?} + " " + str(subscription), flush=True) + _future = _publisher.publish(topic_path, str(body).encode("utf-8", "replace")) + _future.result(timeout=2) + _response = _subscriber.pull( + request={{"subscription": subscription_path, "max_messages": 1}}, + timeout=2, + ) + if not getattr(_response, "received_messages", None): + return False + for _received in _response.received_messages: + _message = _received.message + dispatcher(NyxPubsubMessage(getattr(_message, "message_id", "nyx-real"), _message.data)) + _subscriber.acknowledge( + request={{"subscription": subscription_path, "ack_ids": [_received.ack_id]}} + ) + return True + except SystemExit: + raise + except Exception as _e: + print(f"NYX_REAL_PUBSUB_FALLBACK: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) + return False + finally: + if not old_emulator: + os.environ.pop("PUBSUB_EMULATOR_HOST", None) + for _client in (_subscriber, _publisher): + try: + if _client is not None and hasattr(_client, "transport"): + _client.transport.close() + except Exception: + pass + def _nyx_try_pubsub_http(topic, body, dispatcher): messages = _nyx_broker_http_roundtrip( "NYX_PUBSUB_ENDPOINT", @@ -1180,6 +1333,48 @@ def _nyx_try_pubsub_http(topic, body, dispatcher): ) return True +def _nyx_rabbit_amqp_url(endpoint): + endpoint = (endpoint or "").strip() + if endpoint.startswith(("amqp://", "amqps://")): + return endpoint + return "" + +def _nyx_try_real_rabbit(queue, body, dispatcher): + amqp_url = _nyx_rabbit_amqp_url(os.environ.get("NYX_RABBIT_ENDPOINT", "")) + if not amqp_url: + return False + try: + import pika + except Exception: + return False + _conn = None + try: + _conn = pika.BlockingConnection(pika.URLParameters(amqp_url)) + _chan = _conn.channel() + _chan.queue_declare(queue=str(queue), durable=False, exclusive=False, auto_delete=True) + print({rabbit_publish_marker:?} + " " + str(queue), flush=True) + _nyx_record_broker_publish("NYX_RABBIT_LOG", queue, body) + _body = body if isinstance(body, (bytes, bytearray)) else str(body).encode("utf-8", "replace") + _chan.basic_publish(exchange="", routing_key=str(queue), body=_body) + _method, _props, _payload = _chan.basic_get(queue=str(queue), auto_ack=False) + if _method is None: + return False + dispatcher(_chan, _method, _props, _payload) + if getattr(_method, "delivery_tag", None): + _chan.basic_ack(_method.delivery_tag) + return True + except SystemExit: + raise + except Exception as _e: + print(f"NYX_REAL_RABBIT_FALLBACK: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) + return False + finally: + try: + if _conn is not None and _conn.is_open: + _conn.close() + except Exception: + pass + def _nyx_try_rabbit_http(queue, body, dispatcher): messages = _nyx_broker_http_roundtrip( "NYX_RABBIT_ENDPOINT", diff --git a/tests/message_handler_corpus.rs b/tests/message_handler_corpus.rs index f47a21df..3c5f4dfc 100644 --- a/tests/message_handler_corpus.rs +++ b/tests/message_handler_corpus.rs @@ -176,6 +176,9 @@ fn message_handler_python_dispatch_subscribes_to_loopback() { entry_file("kafka_python"), ); let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("_nyx_try_real_kafka")); + assert!(h.source.contains("KafkaConsumer")); + assert!(h.source.contains("KafkaProducer")); assert!(h.source.contains("_nyx_try_kafka_http")); assert!(h.source.contains("NYX_KAFKA_ENDPOINT")); assert!(h.source.contains("NyxKafkaLoopback")); @@ -188,12 +191,21 @@ fn message_handler_python_dispatch_subscribes_to_loopback() { assert!(h.source.contains("NYX_KAFKA_LOG")); assert!(h.source.contains("_nyx_record_broker_publish")); assert!(h.source.contains("payload")); + assert!( + h.source.find("_nyx_try_real_kafka").unwrap() + < h.source.find("_nyx_try_kafka_http").unwrap(), + "kafka-python should try the real kafka-python client before HTTP fallback" + ); } #[test] fn message_handler_java_emits_reflective_dispatch() { let spec = make_spec(Lang::Java, "orders", "onMessage", entry_file("kafka_java")); let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("nyxTryLiveKafkaClient")); + assert!(h.source.contains("KafkaProducer")); + assert!(h.source.contains("KafkaConsumer")); + assert!(h.source.contains("ProducerRecord")); assert!(h.source.contains("nyxTryRealKafkaClient")); assert!(h.source.contains("MockConsumer")); assert!(h.source.contains("commitSync")); @@ -208,6 +220,11 @@ fn message_handler_java_emits_reflective_dispatch() { assert!(h.source.contains("\"ack\"")); assert!(h.source.contains("NYX_KAFKA_LOG")); assert!(h.source.contains("nyxRecordBrokerPublish")); + assert!( + h.source.find("nyxTryLiveKafkaClient").unwrap() + < h.source.find("nyxTryRealKafkaClient").unwrap(), + "kafka-java should try a live Kafka client before MockConsumer" + ); } #[test] @@ -269,6 +286,95 @@ fn message_handler_java_sqs_tries_real_aws_sdk_client_first() { assert!(h.source.contains("NyxSqsLoopback")); } +#[test] +fn message_handler_python_pubsub_tries_real_client_before_fallbacks() { + let spec = make_spec_with_adapter( + Lang::Python, + "projects/p/subscriptions/s", + "callback", + entry_file("pubsub_python"), + "pubsub-python", + ); + let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("_nyx_try_real_pubsub")); + assert!(h.source.contains("google.cloud")); + assert!(h.source.contains("PublisherClient")); + assert!(h.source.contains("SubscriberClient")); + assert!(h.source.contains("_nyx_try_pubsub_http")); + assert!( + h.source.find("_nyx_try_real_pubsub").unwrap() + < h.source.find("_nyx_try_pubsub_http").unwrap(), + "pubsub-python should try google-cloud-pubsub before HTTP fallback" + ); +} + +#[test] +fn message_handler_python_rabbit_tries_real_client_before_fallbacks() { + let spec = make_spec_with_adapter( + Lang::Python, + "work", + "on_message", + entry_file("rabbit_python"), + "rabbit-python", + ); + let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("_nyx_try_real_rabbit")); + assert!(h.source.contains("import pika")); + assert!(h.source.contains("BlockingConnection")); + assert!(h.source.contains("basic_get")); + assert!(h.source.contains("_nyx_try_rabbit_http")); + assert!( + h.source.find("_nyx_try_real_rabbit").unwrap() + < h.source.find("_nyx_try_rabbit_http").unwrap(), + "rabbit-python should try pika before HTTP fallback" + ); +} + +#[test] +fn message_handler_java_rabbit_tries_real_client_before_fallbacks() { + let spec = make_spec_with_adapter( + Lang::Java, + "work", + "onMessage", + entry_file("rabbit_java"), + "rabbit-java", + ); + let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("nyxTryRealRabbitClient")); + assert!(h.source.contains("com.rabbitmq.client.ConnectionFactory")); + assert!(h.source.contains("basicPublish")); + assert!(h.source.contains("basicGet")); + assert!(h.source.contains("basicAck")); + assert!(h.source.contains("nyxTryRabbitHttp")); + assert!(h.command.iter().any(|arg| arg == ".:lib/*")); + assert!( + h.source.find("nyxTryRealRabbitClient").unwrap() + < h.source.find("nyxTryRabbitHttp").unwrap(), + "rabbit-java should try the RabbitMQ Java client before HTTP fallback" + ); +} + +#[test] +fn message_handler_go_pubsub_tries_real_client_before_fallbacks() { + let spec = make_spec_with_adapter( + Lang::Go, + "my-sub", + "OnMessage", + entry_file("pubsub_go"), + "pubsub-go", + ); + let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("nyxTryRealPubsub")); + assert!(h.source.contains("cloud.google.com/go/pubsub")); + assert!(h.source.contains("pubsubapi.NewClient")); + assert!(h.source.contains("CreateSubscription")); + assert!(h.source.contains("nyxFetchHttpBroker")); + assert!( + h.source.find("nyxTryRealPubsub").unwrap() < h.source.find("nyxFetchHttpBroker").unwrap(), + "pubsub-go should try the real Pub/Sub client before HTTP fallback" + ); +} + #[test] fn message_handler_go_uses_nyx_handlers_registry() { let spec = make_spec(Lang::Go, "my-sub", "OnMessage", entry_file("pubsub_go"));