mirror of
https://github.com/elicpeter/nyx.git
synced 2026-06-18 20:15:14 +02:00
fix linux java
This commit is contained in:
parent
8974b91bfc
commit
3edb17e60b
3 changed files with 75 additions and 12 deletions
|
|
@ -958,7 +958,7 @@ fn emit_message_handler(spec: &HarnessSpec, queue: &str) -> HarnessSource {
|
|||
|
||||
let register_and_publish = match broker {
|
||||
PythonBroker::Sqs => format!(
|
||||
r#"if not _nyx_try_real_sqs({queue:?}, payload, {handler:?}):
|
||||
r#"if not _nyx_call_bounded(lambda: _nyx_try_real_sqs({queue:?}, payload, {handler:?}), _NYX_LIVE_BROKER_DEADLINE):
|
||||
_loop = NyxSqsLoopback()
|
||||
def _nyx_sqs_dispatch(envelope):
|
||||
_h = getattr(_entry_mod, {handler:?}, None)
|
||||
|
|
@ -990,8 +990,8 @@ fn emit_message_handler(spec: &HarnessSpec, queue: &str) -> HarnessSource {
|
|||
if hasattr(message, "ack"):
|
||||
message.ack()
|
||||
_nyx_record_broker_event("NYX_PUBSUB_LOG", "ack", {queue:?}, getattr(message, "message_id", ""))
|
||||
if not _nyx_try_real_pubsub({queue:?}, payload, _nyx_pubsub_dispatch):
|
||||
if not _nyx_try_pubsub_http({queue:?}, payload, _nyx_pubsub_dispatch):
|
||||
if not _nyx_call_bounded(lambda: _nyx_try_real_pubsub({queue:?}, payload, _nyx_pubsub_dispatch), _NYX_LIVE_BROKER_DEADLINE):
|
||||
if not _nyx_call_bounded(lambda: _nyx_try_pubsub_http({queue:?}, payload, _nyx_pubsub_dispatch), _NYX_LIVE_BROKER_DEADLINE):
|
||||
_loop = NyxPubsubLoopback()
|
||||
_loop.subscribe({queue:?}, _nyx_pubsub_dispatch)
|
||||
print({publish_marker:?} + " " + {queue:?}, flush=True)
|
||||
|
|
@ -1010,8 +1010,8 @@ if not _nyx_try_real_pubsub({queue:?}, payload, _nyx_pubsub_dispatch):
|
|||
_nyx_record_broker_event("NYX_RABBIT_LOG", "deliver", {queue:?}, body)
|
||||
_h(ch, method, props, body)
|
||||
_nyx_record_broker_event("NYX_RABBIT_LOG", "ack", {queue:?}, getattr(method, "delivery_tag", ""))
|
||||
if not _nyx_try_real_rabbit({queue:?}, payload, _nyx_rabbit_dispatch):
|
||||
if not _nyx_try_rabbit_http({queue:?}, payload, _nyx_rabbit_dispatch):
|
||||
if not _nyx_call_bounded(lambda: _nyx_try_real_rabbit({queue:?}, payload, _nyx_rabbit_dispatch), _NYX_LIVE_BROKER_DEADLINE):
|
||||
if not _nyx_call_bounded(lambda: _nyx_try_rabbit_http({queue:?}, payload, _nyx_rabbit_dispatch), _NYX_LIVE_BROKER_DEADLINE):
|
||||
_chan = NyxRabbitChannel()
|
||||
_chan.basic_consume(queue={queue:?}, on_message_callback=_nyx_rabbit_dispatch)
|
||||
print({publish_marker:?} + " " + {queue:?}, flush=True)
|
||||
|
|
@ -1022,8 +1022,8 @@ if not _nyx_try_real_rabbit({queue:?}, payload, _nyx_rabbit_dispatch):
|
|||
publish_marker = crate::dynamic::stubs::RABBIT_PUBLISH_MARKER,
|
||||
),
|
||||
PythonBroker::Kafka => format!(
|
||||
r#"if not _nyx_try_real_kafka({queue:?}, payload, {handler:?}):
|
||||
if not _nyx_try_kafka_http({queue:?}, payload, {handler:?}):
|
||||
r#"if not _nyx_call_bounded(lambda: _nyx_try_real_kafka({queue:?}, payload, {handler:?}), _NYX_LIVE_BROKER_DEADLINE):
|
||||
if not _nyx_call_bounded(lambda: _nyx_try_kafka_http({queue:?}, payload, {handler:?}), _NYX_LIVE_BROKER_DEADLINE):
|
||||
_loop = NyxKafkaLoopback()
|
||||
def _nyx_kafka_dispatch(message):
|
||||
_h = getattr(_entry_mod, {handler:?}, None)
|
||||
|
|
@ -1070,6 +1070,37 @@ def _nyx_record_broker_event(env_name, action, destination, body):
|
|||
def _nyx_record_broker_publish(env_name, destination, body):
|
||||
_nyx_record_broker_event(env_name, "publish", destination, body)
|
||||
|
||||
# Hard wall-clock cap for a live-broker *upgrade* attempt. The harness
|
||||
# prefers driving the handler through the real client library when one is
|
||||
# importable (higher fidelity), but a stalled attempt — unreachable broker,
|
||||
# a protocol-incompatible stub, a metadata/consumer-group fetch that never
|
||||
# returns — must never consume the whole sandbox budget and starve the
|
||||
# in-process loopback fallback that confirms the finding deterministically.
|
||||
# A responsive broker completes in milliseconds and is unaffected; only a
|
||||
# stall is bounded. Tunable via NYX_LIVE_BROKER_DEADLINE_MS for deployments
|
||||
# that point at a genuinely slow real broker.
|
||||
_NYX_LIVE_BROKER_DEADLINE = max(
|
||||
0.5, float(os.environ.get("NYX_LIVE_BROKER_DEADLINE_MS", "2500")) / 1000.0
|
||||
)
|
||||
|
||||
def _nyx_call_bounded(fn, seconds):
|
||||
import threading
|
||||
_box = {{"v": False}}
|
||||
def _worker():
|
||||
try:
|
||||
_box["v"] = fn()
|
||||
except SystemExit:
|
||||
_box["v"] = False
|
||||
except Exception:
|
||||
_box["v"] = False
|
||||
_t = threading.Thread(target=_worker, daemon=True)
|
||||
_t.start()
|
||||
_t.join(seconds)
|
||||
# Thread still alive => the live attempt stalled past the deadline; abandon
|
||||
# it (reaped as a daemon at process exit) and report failure so the caller
|
||||
# falls through to the loopback.
|
||||
return _box["v"] if not _t.is_alive() else False
|
||||
|
||||
def _nyx_kafka_bootstrap(endpoint):
|
||||
endpoint = (endpoint or "").strip()
|
||||
if endpoint.startswith(("http://", "https://")):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue