From 3edb17e60b9249b0fd49a08f95901b2aa8e90db9 Mon Sep 17 00:00:00 2001 From: elipeter Date: Wed, 3 Jun 2026 23:26:31 -0500 Subject: [PATCH] fix linux java --- src/dynamic/lang/python.rs | 45 ++++++++++++++++++++++++++++++++------ src/dynamic/runner.rs | 18 +++++++++++++++ src/resolve/tests.rs | 24 +++++++++++++++----- 3 files changed, 75 insertions(+), 12 deletions(-) diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index 3e9d8ff2..887b3822 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -958,7 +958,7 @@ fn emit_message_handler(spec: &HarnessSpec, queue: &str) -> HarnessSource { let register_and_publish = match broker { PythonBroker::Sqs => format!( - r#"if not _nyx_try_real_sqs({queue:?}, payload, {handler:?}): + r#"if not _nyx_call_bounded(lambda: _nyx_try_real_sqs({queue:?}, payload, {handler:?}), _NYX_LIVE_BROKER_DEADLINE): _loop = NyxSqsLoopback() def _nyx_sqs_dispatch(envelope): _h = getattr(_entry_mod, {handler:?}, None) @@ -990,8 +990,8 @@ fn emit_message_handler(spec: &HarnessSpec, queue: &str) -> HarnessSource { if hasattr(message, "ack"): message.ack() _nyx_record_broker_event("NYX_PUBSUB_LOG", "ack", {queue:?}, getattr(message, "message_id", "")) -if not _nyx_try_real_pubsub({queue:?}, payload, _nyx_pubsub_dispatch): - if not _nyx_try_pubsub_http({queue:?}, payload, _nyx_pubsub_dispatch): +if not _nyx_call_bounded(lambda: _nyx_try_real_pubsub({queue:?}, payload, _nyx_pubsub_dispatch), _NYX_LIVE_BROKER_DEADLINE): + if not _nyx_call_bounded(lambda: _nyx_try_pubsub_http({queue:?}, payload, _nyx_pubsub_dispatch), _NYX_LIVE_BROKER_DEADLINE): _loop = NyxPubsubLoopback() _loop.subscribe({queue:?}, _nyx_pubsub_dispatch) print({publish_marker:?} + " " + {queue:?}, flush=True) @@ -1010,8 +1010,8 @@ if not _nyx_try_real_pubsub({queue:?}, payload, _nyx_pubsub_dispatch): _nyx_record_broker_event("NYX_RABBIT_LOG", "deliver", {queue:?}, body) _h(ch, method, props, body) _nyx_record_broker_event("NYX_RABBIT_LOG", "ack", {queue:?}, getattr(method, "delivery_tag", "")) -if not _nyx_try_real_rabbit({queue:?}, payload, _nyx_rabbit_dispatch): - if not _nyx_try_rabbit_http({queue:?}, payload, _nyx_rabbit_dispatch): +if not _nyx_call_bounded(lambda: _nyx_try_real_rabbit({queue:?}, payload, _nyx_rabbit_dispatch), _NYX_LIVE_BROKER_DEADLINE): + if not _nyx_call_bounded(lambda: _nyx_try_rabbit_http({queue:?}, payload, _nyx_rabbit_dispatch), _NYX_LIVE_BROKER_DEADLINE): _chan = NyxRabbitChannel() _chan.basic_consume(queue={queue:?}, on_message_callback=_nyx_rabbit_dispatch) print({publish_marker:?} + " " + {queue:?}, flush=True) @@ -1022,8 +1022,8 @@ if not _nyx_try_real_rabbit({queue:?}, payload, _nyx_rabbit_dispatch): publish_marker = crate::dynamic::stubs::RABBIT_PUBLISH_MARKER, ), PythonBroker::Kafka => format!( - r#"if not _nyx_try_real_kafka({queue:?}, payload, {handler:?}): - if not _nyx_try_kafka_http({queue:?}, payload, {handler:?}): + r#"if not _nyx_call_bounded(lambda: _nyx_try_real_kafka({queue:?}, payload, {handler:?}), _NYX_LIVE_BROKER_DEADLINE): + if not _nyx_call_bounded(lambda: _nyx_try_kafka_http({queue:?}, payload, {handler:?}), _NYX_LIVE_BROKER_DEADLINE): _loop = NyxKafkaLoopback() def _nyx_kafka_dispatch(message): _h = getattr(_entry_mod, {handler:?}, None) @@ -1070,6 +1070,37 @@ def _nyx_record_broker_event(env_name, action, destination, body): def _nyx_record_broker_publish(env_name, destination, body): _nyx_record_broker_event(env_name, "publish", destination, body) +# Hard wall-clock cap for a live-broker *upgrade* attempt. The harness +# prefers driving the handler through the real client library when one is +# importable (higher fidelity), but a stalled attempt — unreachable broker, +# a protocol-incompatible stub, a metadata/consumer-group fetch that never +# returns — must never consume the whole sandbox budget and starve the +# in-process loopback fallback that confirms the finding deterministically. +# A responsive broker completes in milliseconds and is unaffected; only a +# stall is bounded. Tunable via NYX_LIVE_BROKER_DEADLINE_MS for deployments +# that point at a genuinely slow real broker. +_NYX_LIVE_BROKER_DEADLINE = max( + 0.5, float(os.environ.get("NYX_LIVE_BROKER_DEADLINE_MS", "2500")) / 1000.0 +) + +def _nyx_call_bounded(fn, seconds): + import threading + _box = {{"v": False}} + def _worker(): + try: + _box["v"] = fn() + except SystemExit: + _box["v"] = False + except Exception: + _box["v"] = False + _t = threading.Thread(target=_worker, daemon=True) + _t.start() + _t.join(seconds) + # Thread still alive => the live attempt stalled past the deadline; abandon + # it (reaped as a daemon at process exit) and report failure so the caller + # falls through to the loopback. + return _box["v"] if not _t.is_alive() else False + def _nyx_kafka_bootstrap(endpoint): endpoint = (endpoint or "").strip() if endpoint.startswith(("http://", "https://")): diff --git a/src/dynamic/runner.rs b/src/dynamic/runner.rs index 5782dff0..9ef9d365 100644 --- a/src/dynamic/runner.rs +++ b/src/dynamic/runner.rs @@ -370,6 +370,24 @@ pub fn run_spec(spec: &HarnessSpec, opts: &SandboxOptions) -> Result50ms ceiling)", - elapsed.as_millis() + best.as_millis() < 50, + "build_module_graph took {}ms warm (>50ms ceiling)", + best.as_millis() ); let delta_kib = bytes_after.saturating_sub(bytes_before);