refactor(dynamic): extend Kafka protocol emulator with binary protocol support, Pubsub gRPC emulator, and enhance listener and endpoint handling

This commit is contained in:
elipeter 2026-05-27 14:11:31 -05:00
parent 030b054843
commit 1a0e2d204b
6 changed files with 1530 additions and 106 deletions

View file

@ -4192,7 +4192,6 @@ public class NyxHarness {{
java.util.Properties consumerProps = new java.util.Properties();
consumerProps.put("bootstrap.servers", bootstrap);
consumerProps.put("group.id", "nyx-" + Long.toString(System.nanoTime()));
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("auto.offset.reset", "earliest");
@ -4209,8 +4208,18 @@ public class NyxHarness {{
.invoke(future, Long.valueOf(2L), java.util.concurrent.TimeUnit.SECONDS);
producerClass.getMethod("flush").invoke(producer);
consumerClass.getMethod("subscribe", java.util.Collection.class)
.invoke(consumer, java.util.Collections.singletonList(topic));
Class<?> topicPartitionClass = Class.forName("org.apache.kafka.common.TopicPartition");
Object partition = topicPartitionClass.getConstructor(String.class, int.class)
.newInstance(topic, Integer.valueOf(0));
java.util.List<Object> partitions = java.util.Collections.singletonList(partition);
consumerClass.getMethod("assign", java.util.Collection.class).invoke(consumer, partitions);
try {{
consumerClass.getMethod("seekToBeginning", java.util.Collection.class)
.invoke(consumer, partitions);
}} catch (Throwable seekError) {{
consumerClass.getMethod("seek", topicPartitionClass, long.class)
.invoke(consumer, partition, Long.valueOf(0L));
}}
Object records = consumerClass.getMethod("poll", java.time.Duration.class)
.invoke(consumer, java.time.Duration.ofSeconds(2));
if (!(records instanceof Iterable)) {{
@ -4233,7 +4242,6 @@ public class NyxHarness {{
System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage());
}}
if (success) {{
consumerClass.getMethod("commitSync").invoke(consumer);
nyxRecordBrokerEvent("NYX_KAFKA_LOG", "ack", topic, Long.toString(offset));
}}
delivered = true;

View file

@ -1085,7 +1085,7 @@ def _nyx_try_real_kafka(topic, body, handler_name):
if not bootstrap:
return False
try:
from kafka import KafkaConsumer, KafkaProducer
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
except Exception:
return False
_h = getattr(_entry_mod, handler_name, None)
@ -1104,9 +1104,8 @@ def _nyx_try_real_kafka(topic, body, handler_name):
retries=0,
)
_consumer = KafkaConsumer(
str(topic),
bootstrap_servers=[bootstrap],
group_id="nyx-" + str(os.getpid()),
group_id=None,
auto_offset_reset="earliest",
enable_auto_commit=False,
consumer_timeout_ms=2000,
@ -1118,6 +1117,23 @@ def _nyx_try_real_kafka(topic, body, handler_name):
_nyx_record_broker_publish("NYX_KAFKA_LOG", topic, body)
_producer.send(str(topic), body).get(timeout=2)
_producer.flush(timeout=2)
_tp = TopicPartition(str(topic), 0)
_consumer.assign([_tp])
try:
_consumer.seek_to_beginning(_tp)
except Exception:
_consumer.seek(_tp, 0)
_records = _consumer.poll(timeout_ms=2000, max_records=1)
for _partition_records in _records.values():
for _record in _partition_records:
_nyx_record_broker_event("NYX_KAFKA_LOG", "deliver", topic, _record.value)
_h(_record.value)
try:
_consumer.commit()
except Exception:
pass
_nyx_record_broker_event("NYX_KAFKA_LOG", "ack", topic, str(getattr(_record, "offset", "")))
return True
for _record in _consumer:
_nyx_record_broker_event("NYX_KAFKA_LOG", "deliver", topic, _record.value)
_h(_record.value)

File diff suppressed because it is too large Load diff