refactor(dynamic): prioritize real clients over HTTP fallbacks for Rabbit, Kafka, and Pubsub across Java, Python, Go; integrate native SDK handling and extend test coverage

This commit is contained in:
elipeter 2026-05-27 12:49:41 -05:00
parent 8eeb9590b4
commit d5c51c5d8a
4 changed files with 662 additions and 34 deletions

View file

@ -990,12 +990,13 @@ fn emit_message_handler(spec: &HarnessSpec, queue: &str) -> HarnessSource {
if hasattr(message, "ack"):
message.ack()
_nyx_record_broker_event("NYX_PUBSUB_LOG", "ack", {queue:?}, getattr(message, "message_id", ""))
if not _nyx_try_pubsub_http({queue:?}, payload, _nyx_pubsub_dispatch):
_loop = NyxPubsubLoopback()
_loop.subscribe({queue:?}, _nyx_pubsub_dispatch)
print({publish_marker:?} + " " + {queue:?}, flush=True)
_nyx_record_broker_publish("NYX_PUBSUB_LOG", {queue:?}, payload)
_loop.publish({queue:?}, payload)"#,
if not _nyx_try_real_pubsub({queue:?}, payload, _nyx_pubsub_dispatch):
if not _nyx_try_pubsub_http({queue:?}, payload, _nyx_pubsub_dispatch):
_loop = NyxPubsubLoopback()
_loop.subscribe({queue:?}, _nyx_pubsub_dispatch)
print({publish_marker:?} + " " + {queue:?}, flush=True)
_nyx_record_broker_publish("NYX_PUBSUB_LOG", {queue:?}, payload)
_loop.publish({queue:?}, payload)"#,
handler = handler,
queue = queue,
publish_marker = crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER,
@ -1009,34 +1010,36 @@ if not _nyx_try_pubsub_http({queue:?}, payload, _nyx_pubsub_dispatch):
_nyx_record_broker_event("NYX_RABBIT_LOG", "deliver", {queue:?}, body)
_h(ch, method, props, body)
_nyx_record_broker_event("NYX_RABBIT_LOG", "ack", {queue:?}, getattr(method, "delivery_tag", ""))
if not _nyx_try_rabbit_http({queue:?}, payload, _nyx_rabbit_dispatch):
_chan = NyxRabbitChannel()
_chan.basic_consume(queue={queue:?}, on_message_callback=_nyx_rabbit_dispatch)
print({publish_marker:?} + " " + {queue:?}, flush=True)
_nyx_record_broker_publish("NYX_RABBIT_LOG", {queue:?}, payload)
_chan.basic_publish(exchange="", routing_key={queue:?}, body=payload)"#,
if not _nyx_try_real_rabbit({queue:?}, payload, _nyx_rabbit_dispatch):
if not _nyx_try_rabbit_http({queue:?}, payload, _nyx_rabbit_dispatch):
_chan = NyxRabbitChannel()
_chan.basic_consume(queue={queue:?}, on_message_callback=_nyx_rabbit_dispatch)
print({publish_marker:?} + " " + {queue:?}, flush=True)
_nyx_record_broker_publish("NYX_RABBIT_LOG", {queue:?}, payload)
_chan.basic_publish(exchange="", routing_key={queue:?}, body=payload)"#,
handler = handler,
queue = queue,
publish_marker = crate::dynamic::stubs::RABBIT_PUBLISH_MARKER,
),
PythonBroker::Kafka => format!(
r#"if not _nyx_try_kafka_http({queue:?}, payload, {handler:?}):
_loop = NyxKafkaLoopback()
def _nyx_kafka_dispatch(message):
_h = getattr(_entry_mod, {handler:?}, None)
if _h is None:
print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True)
sys.exit(78)
_h(message)
_loop.subscribe({queue:?}, _nyx_kafka_dispatch)
print({publish_marker:?} + " " + {queue:?}, flush=True)
_nyx_record_broker_publish("NYX_KAFKA_LOG", {queue:?}, payload)
_loop.publish({queue:?}, payload)
for _record in _loop.poll({queue:?}, max_records=1):
_nyx_record_broker_event("NYX_KAFKA_LOG", "deliver", {queue:?}, _record.value)
_nyx_kafka_dispatch(_record.value)
_loop.commit(_record)
_nyx_record_broker_event("NYX_KAFKA_LOG", "ack", {queue:?}, str(_record.offset))"#,
r#"if not _nyx_try_real_kafka({queue:?}, payload, {handler:?}):
if not _nyx_try_kafka_http({queue:?}, payload, {handler:?}):
_loop = NyxKafkaLoopback()
def _nyx_kafka_dispatch(message):
_h = getattr(_entry_mod, {handler:?}, None)
if _h is None:
print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True)
sys.exit(78)
_h(message)
_loop.subscribe({queue:?}, _nyx_kafka_dispatch)
print({publish_marker:?} + " " + {queue:?}, flush=True)
_nyx_record_broker_publish("NYX_KAFKA_LOG", {queue:?}, payload)
_loop.publish({queue:?}, payload)
for _record in _loop.poll({queue:?}, max_records=1):
_nyx_record_broker_event("NYX_KAFKA_LOG", "deliver", {queue:?}, _record.value)
_nyx_kafka_dispatch(_record.value)
_loop.commit(_record)
_nyx_record_broker_event("NYX_KAFKA_LOG", "ack", {queue:?}, str(_record.offset))"#,
handler = handler,
queue = queue,
publish_marker = crate::dynamic::stubs::KAFKA_PUBLISH_MARKER,
@ -1067,6 +1070,77 @@ def _nyx_record_broker_event(env_name, action, destination, body):
def _nyx_record_broker_publish(env_name, destination, body):
_nyx_record_broker_event(env_name, "publish", destination, body)
def _nyx_kafka_bootstrap(endpoint):
endpoint = (endpoint or "").strip()
if endpoint.startswith(("http://", "https://")):
return ""
for prefix in ("kafka://", "plaintext://"):
if endpoint.startswith(prefix):
endpoint = endpoint[len(prefix):]
break
return endpoint.strip("/")
def _nyx_try_real_kafka(topic, body, handler_name):
bootstrap = _nyx_kafka_bootstrap(os.environ.get("NYX_KAFKA_ENDPOINT", ""))
if not bootstrap:
return False
try:
from kafka import KafkaConsumer, KafkaProducer
except Exception:
return False
_h = getattr(_entry_mod, handler_name, None)
if _h is None:
print("NYX_HANDLER_NOT_FOUND: " + handler_name, file=sys.stderr, flush=True)
sys.exit(78)
_producer = None
_consumer = None
try:
_producer = KafkaProducer(
bootstrap_servers=[bootstrap],
value_serializer=lambda v: v if isinstance(v, (bytes, bytearray)) else str(v).encode("utf-8", "replace"),
request_timeout_ms=1000,
api_version_auto_timeout_ms=1000,
max_block_ms=1000,
retries=0,
)
_consumer = KafkaConsumer(
str(topic),
bootstrap_servers=[bootstrap],
group_id="nyx-" + str(os.getpid()),
auto_offset_reset="earliest",
enable_auto_commit=False,
consumer_timeout_ms=2000,
value_deserializer=lambda v: v.decode("utf-8", "replace"),
request_timeout_ms=1000,
api_version_auto_timeout_ms=1000,
)
print({kafka_publish_marker:?} + " " + str(topic), flush=True)
_nyx_record_broker_publish("NYX_KAFKA_LOG", topic, body)
_producer.send(str(topic), body).get(timeout=2)
_producer.flush(timeout=2)
for _record in _consumer:
_nyx_record_broker_event("NYX_KAFKA_LOG", "deliver", topic, _record.value)
_h(_record.value)
try:
_consumer.commit()
except Exception:
pass
_nyx_record_broker_event("NYX_KAFKA_LOG", "ack", topic, str(getattr(_record, "offset", "")))
return True
return False
except SystemExit:
raise
except Exception as _e:
print(f"NYX_REAL_KAFKA_FALLBACK: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True)
return False
finally:
for _client in (_consumer, _producer):
try:
if _client is not None:
_client.close()
except Exception:
pass
def _nyx_try_kafka_http(topic, body, handler_name):
endpoint = os.environ.get("NYX_KAFKA_ENDPOINT", "")
if not (endpoint.startswith("http://") or endpoint.startswith("https://")):
@ -1158,6 +1232,85 @@ def _nyx_broker_http_ack(env_name, root, destination, ack_id):
except Exception:
pass
def _nyx_pubsub_emulator_host(endpoint):
host = os.environ.get("PUBSUB_EMULATOR_HOST", "")
if host:
return host
endpoint = (endpoint or "").strip()
for prefix in ("grpc://", "pubsub://"):
if endpoint.startswith(prefix):
return endpoint[len(prefix):].strip("/")
return ""
def _nyx_pubsub_name(raw, fallback):
tail = str(raw or "").rstrip("/").split("/")[-1]
out = "".join(ch if (ch.isalnum() or ch in "-_") else "-" for ch in tail)
return (out or fallback)[:200]
def _nyx_try_real_pubsub(subscription, body, dispatcher):
endpoint = os.environ.get("NYX_PUBSUB_ENDPOINT", "")
emulator_host = _nyx_pubsub_emulator_host(endpoint)
if not emulator_host:
return False
try:
from google.cloud import pubsub_v1
except Exception:
return False
old_emulator = os.environ.get("PUBSUB_EMULATOR_HOST")
if not old_emulator:
os.environ["PUBSUB_EMULATOR_HOST"] = emulator_host
_publisher = None
_subscriber = None
try:
project = os.environ.get("NYX_PUBSUB_PROJECT", "nyx")
sub_id = _nyx_pubsub_name(subscription, "nyx-sub")
topic_id = _nyx_pubsub_name(subscription + "-topic", "nyx-topic")
if "/topics/" in str(subscription):
topic_id = sub_id
sub_id = _nyx_pubsub_name(topic_id + "-sub", "nyx-sub")
_publisher = pubsub_v1.PublisherClient()
_subscriber = pubsub_v1.SubscriberClient()
topic_path = _publisher.topic_path(project, topic_id)
subscription_path = _subscriber.subscription_path(project, sub_id)
try:
_publisher.create_topic(request={{"name": topic_path}})
except Exception:
pass
try:
_subscriber.create_subscription(request={{"name": subscription_path, "topic": topic_path}})
except Exception:
pass
print({pubsub_publish_marker:?} + " " + str(subscription), flush=True)
_future = _publisher.publish(topic_path, str(body).encode("utf-8", "replace"))
_future.result(timeout=2)
_response = _subscriber.pull(
request={{"subscription": subscription_path, "max_messages": 1}},
timeout=2,
)
if not getattr(_response, "received_messages", None):
return False
for _received in _response.received_messages:
_message = _received.message
dispatcher(NyxPubsubMessage(getattr(_message, "message_id", "nyx-real"), _message.data))
_subscriber.acknowledge(
request={{"subscription": subscription_path, "ack_ids": [_received.ack_id]}}
)
return True
except SystemExit:
raise
except Exception as _e:
print(f"NYX_REAL_PUBSUB_FALLBACK: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True)
return False
finally:
if not old_emulator:
os.environ.pop("PUBSUB_EMULATOR_HOST", None)
for _client in (_subscriber, _publisher):
try:
if _client is not None and hasattr(_client, "transport"):
_client.transport.close()
except Exception:
pass
def _nyx_try_pubsub_http(topic, body, dispatcher):
messages = _nyx_broker_http_roundtrip(
"NYX_PUBSUB_ENDPOINT",
@ -1180,6 +1333,48 @@ def _nyx_try_pubsub_http(topic, body, dispatcher):
)
return True
def _nyx_rabbit_amqp_url(endpoint):
endpoint = (endpoint or "").strip()
if endpoint.startswith(("amqp://", "amqps://")):
return endpoint
return ""
def _nyx_try_real_rabbit(queue, body, dispatcher):
amqp_url = _nyx_rabbit_amqp_url(os.environ.get("NYX_RABBIT_ENDPOINT", ""))
if not amqp_url:
return False
try:
import pika
except Exception:
return False
_conn = None
try:
_conn = pika.BlockingConnection(pika.URLParameters(amqp_url))
_chan = _conn.channel()
_chan.queue_declare(queue=str(queue), durable=False, exclusive=False, auto_delete=True)
print({rabbit_publish_marker:?} + " " + str(queue), flush=True)
_nyx_record_broker_publish("NYX_RABBIT_LOG", queue, body)
_body = body if isinstance(body, (bytes, bytearray)) else str(body).encode("utf-8", "replace")
_chan.basic_publish(exchange="", routing_key=str(queue), body=_body)
_method, _props, _payload = _chan.basic_get(queue=str(queue), auto_ack=False)
if _method is None:
return False
dispatcher(_chan, _method, _props, _payload)
if getattr(_method, "delivery_tag", None):
_chan.basic_ack(_method.delivery_tag)
return True
except SystemExit:
raise
except Exception as _e:
print(f"NYX_REAL_RABBIT_FALLBACK: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True)
return False
finally:
try:
if _conn is not None and _conn.is_open:
_conn.close()
except Exception:
pass
def _nyx_try_rabbit_http(queue, body, dispatcher):
messages = _nyx_broker_http_roundtrip(
"NYX_RABBIT_ENDPOINT",