refactor(dynamic): introduce publish/poll/commit cycle for Kafka, expand SQS loopback with receive/delete support, enhance event recording, and unify migration SQL handling across frameworks

This commit is contained in:
elipeter 2026-05-26 15:39:18 -05:00
parent ed96f94bb5
commit 6ee2bdda36
11 changed files with 515 additions and 71 deletions

View file

@ -54,13 +54,37 @@ impl BrokerStub {
/// Java, Python, Node, Go, PHP, Ruby, and Rust harnesses can append
/// it without a JSON dependency:
///
/// `topic<TAB>payload`
/// `action<TAB>topic<TAB>payload`
///
/// Older harnesses wrote `topic<TAB>payload`; `drain_events`
/// still accepts that form and treats it as a `publish` event.
pub fn record_publish(&self, destination: &str, payload: &str) -> std::io::Result<()> {
self.record_event("publish", destination, payload)
}
/// Record a broker delivery observation.
pub fn record_delivery(&self, destination: &str, payload: &str) -> std::io::Result<()> {
self.record_event("deliver", destination, payload)
}
/// Record an ack/commit/delete observation. The `payload` field
/// carries the broker-specific ack token when one exists.
pub fn record_ack(&self, destination: &str, payload: &str) -> std::io::Result<()> {
self.record_event("ack", destination, payload)
}
fn record_event(&self, action: &str, destination: &str, payload: &str) -> std::io::Result<()> {
let mut f = OpenOptions::new()
.append(true)
.create(true)
.open(&self.log_path)?;
writeln!(f, "{}\t{}", destination.replace('\t', " "), payload)?;
writeln!(
f,
"{}\t{}\t{}",
action.replace('\t', " "),
destination.replace('\t', " "),
payload
)?;
Ok(())
}
}
@ -111,12 +135,13 @@ impl StubProvider for BrokerStub {
if line.is_empty() {
continue;
}
let (destination, payload) = line.split_once('\t').unwrap_or((line, ""));
let (action, destination, payload) = parse_broker_log_line(line);
let event = StubEvent {
kind: self.kind,
captured_at_ns: monotonic_ns(),
summary: format!("publish {destination}"),
summary: format!("{action} {destination}"),
detail: std::collections::BTreeMap::from([
("action".to_owned(), action.to_owned()),
("destination".to_owned(), destination.to_owned()),
("payload".to_owned(), payload.to_owned()),
]),
@ -128,6 +153,18 @@ impl StubProvider for BrokerStub {
}
}
fn parse_broker_log_line(line: &str) -> (&str, &str, &str) {
let Some((first, rest)) = line.split_once('\t') else {
return ("publish", line, "");
};
if matches!(first, "publish" | "deliver" | "ack" | "nack" | "retry") {
let (destination, payload) = rest.split_once('\t').unwrap_or((rest, ""));
(first, destination, payload)
} else {
("publish", first, rest)
}
}
impl Drop for BrokerStub {
fn drop(&mut self) {
self.tempdir.take();
@ -160,8 +197,34 @@ mod tests {
assert_eq!(events.len(), 1);
assert_eq!(events[0].kind, StubKind::Sqs);
assert_eq!(events[0].summary, "publish queue-a");
assert_eq!(events[0].detail.get("action").unwrap(), "publish");
assert_eq!(events[0].detail.get("destination").unwrap(), "queue-a");
assert_eq!(events[0].detail.get("payload").unwrap(), "NYX_PWN_CMDI");
assert!(stub.drain_events().is_empty(), "drain cursor must advance");
}
#[test]
fn broker_drain_understands_delivery_and_ack_events() {
let dir = TempDir::new().unwrap();
let stub = BrokerStub::start(StubKind::Kafka, dir.path()).unwrap();
stub.record_delivery("orders", "payload-1").unwrap();
stub.record_ack("orders", "offset-1").unwrap();
let events = stub.drain_events();
assert_eq!(events.len(), 2);
assert_eq!(events[0].summary, "deliver orders");
assert_eq!(events[1].summary, "ack orders");
assert_eq!(events[1].detail.get("payload").unwrap(), "offset-1");
}
#[test]
fn broker_drain_preserves_legacy_two_field_publish_lines() {
let dir = TempDir::new().unwrap();
let stub = BrokerStub::start(StubKind::Rabbit, dir.path()).unwrap();
std::fs::write(stub.log_path(), "work\tlegacy payload\n").unwrap();
let events = stub.drain_events();
assert_eq!(events.len(), 1);
assert_eq!(events[0].summary, "publish work");
assert_eq!(events[0].detail.get("action").unwrap(), "publish");
assert_eq!(events[0].detail.get("payload").unwrap(), "legacy payload");
}
}

View file

@ -3,19 +3,17 @@
//! The Phase 20 acceptance gate runs every per-lang `MessageHandler` harness
//! inside an in-process loopback broker — no real Kafka cluster, no
//! external network — so the per-lang harness can publish the spec's
//! payload onto a topic and observe the handler under test receive it
//! synchronously. Each `broker_kafka` source snippet declares a tiny
//! `NyxKafkaLoopback` type whose `publish(topic, payload)` immediately
//! routes the bytes through the subscriber callback the harness has
//! registered. No threads, no sockets, no async runtime: a single
//! synchronous in-process dispatch keeps Phase 10's 500 ms boot budget
//! intact when `stubs_required` is empty.
//! payload onto a topic, poll the topic, dispatch the record, and commit
//! the offset. No threads, no sockets, no async runtime: a single
//! synchronous publish/poll/commit cycle keeps Phase 10's 500 ms boot
//! budget intact when `stubs_required` is empty while still exercising
//! the consumer-loop shape real Kafka handlers depend on.
//!
//! The snippet shape mirrors [`crate::dynamic::stubs::mocks::mock_source`] —
//! per-language inline source returned as a `&'static str` so the
//! generated harness can splice it verbatim into its own source. The
//! generated harness can splice it verbatim into its own source. The
//! per-language harness emitter is responsible for instantiating the
//! loopback and invoking the registered handler with the payload.
//! loopback, publishing, polling, and committing records.
use crate::symbol::Lang;
@ -28,33 +26,84 @@ pub const KAFKA_PUBLISH_MARKER: &str = "__NYX_BROKER_PUBLISH__:kafka";
/// Returns `""` when the language has no harness-level Kafka adapter
/// (everything outside Java / Python today). The snippet does *not*
/// emit a publish marker by itself; the per-lang harness emitter calls
/// `publish(topic, payload)` and prints the marker once.
/// `publish(topic, payload)`, polls, and prints the marker once.
pub fn kafka_source(lang: Lang) -> &'static str {
match lang {
Lang::Python => {
r#"
class NyxKafkaLoopback:
"""In-process Kafka loopback — no socket, no thread, no broker."""
"""In-process Kafka loopback with publish/poll/commit semantics."""
def __init__(self):
self._subs = {}
self._topics = {}
self._offsets = {}
self._committed = {}
def subscribe(self, topic, cb):
self._subs.setdefault(topic, []).append(cb)
def _next_offset(self, topic):
off = self._offsets.get(topic, 0)
self._offsets[topic] = off + 1
return off
def publish(self, topic, payload):
for cb in self._subs.get(topic, []):
cb(payload)
rec = NyxKafkaRecord(topic, payload, self._next_offset(topic))
self._topics.setdefault(topic, []).append(rec)
return rec
def poll(self, topic, max_records=1, timeout_ms=0):
_ = timeout_ms
return list(self._topics.get(topic, [])[:max_records])
def commit(self, record):
self._committed[record.topic] = max(self._committed.get(record.topic, -1), record.offset)
self._topics[record.topic] = [
r for r in self._topics.get(record.topic, []) if r.offset > record.offset
]
class NyxKafkaRecord:
def __init__(self, topic, value, offset):
self.topic = topic
self.value = value
self.offset = offset
self.key = None
def __str__(self):
return str(self.value)
"#
}
Lang::Java => {
r#"
static class NyxKafkaRecord {
public final String topic;
public final String value;
public final long offset;
NyxKafkaRecord(String topic, String value, long offset) {
this.topic = topic;
this.value = value;
this.offset = offset;
}
public String toString() { return value; }
}
static class NyxKafkaLoopback {
private final java.util.Map<String, java.util.List<java.util.function.Consumer<String>>> subs = new java.util.HashMap<>();
private final java.util.Map<String, java.util.List<NyxKafkaRecord>> topics = new java.util.HashMap<>();
private final java.util.Map<String, Long> offsets = new java.util.HashMap<>();
private final java.util.Map<String, Long> committed = new java.util.HashMap<>();
public void subscribe(String topic, java.util.function.Consumer<String> cb) {
subs.computeIfAbsent(topic, k -> new java.util.ArrayList<>()).add(cb);
}
public void publish(String topic, String payload) {
for (java.util.function.Consumer<String> cb : subs.getOrDefault(topic, java.util.Collections.emptyList())) {
cb.accept(payload);
}
public NyxKafkaRecord publish(String topic, String payload) {
long off = offsets.getOrDefault(topic, 0L);
offsets.put(topic, off + 1L);
NyxKafkaRecord rec = new NyxKafkaRecord(topic, payload, off);
topics.computeIfAbsent(topic, k -> new java.util.ArrayList<>()).add(rec);
return rec;
}
public java.util.List<NyxKafkaRecord> poll(String topic, int maxRecords) {
java.util.List<NyxKafkaRecord> q = topics.getOrDefault(topic, java.util.Collections.emptyList());
return new java.util.ArrayList<>(q.subList(0, Math.min(maxRecords, q.size())));
}
public void commit(NyxKafkaRecord rec) {
committed.put(rec.topic, Math.max(committed.getOrDefault(rec.topic, -1L), rec.offset));
java.util.List<NyxKafkaRecord> q = topics.getOrDefault(rec.topic, new java.util.ArrayList<>());
q.removeIf(r -> r.offset <= rec.offset);
}
}
"#
@ -76,16 +125,20 @@ mod tests {
fn python_snippet_declares_loopback_class() {
let src = kafka_source(Lang::Python);
assert!(src.contains("class NyxKafkaLoopback"));
assert!(src.contains("class NyxKafkaRecord"));
assert!(src.contains("def publish"));
assert!(src.contains("def subscribe"));
assert!(src.contains("def poll"));
assert!(src.contains("def commit"));
}
#[test]
fn java_snippet_declares_static_inner_class() {
let src = kafka_source(Lang::Java);
assert!(src.contains("static class NyxKafkaRecord"));
assert!(src.contains("static class NyxKafkaLoopback"));
assert!(src.contains("public void publish"));
assert!(src.contains("public void subscribe"));
assert!(src.contains("public NyxKafkaRecord publish"));
assert!(src.contains("public java.util.List<NyxKafkaRecord> poll"));
assert!(src.contains("public void commit"));
}
#[test]

View file

@ -3,8 +3,9 @@
//! Mirrors [`crate::dynamic::stubs::broker_kafka`] but mints SQS-shaped
//! envelopes (`MessageId`, `ReceiptHandle`, `Body`) the way `boto3.sqs` /
//! `software.amazon.awssdk.services.sqs` / the AWS Node SDK present
//! them. The loopback never speaks the AWS protocol — it just calls
//! the registered handler synchronously with a single-message envelope.
//! them. The loopback never speaks the AWS protocol, but it does model
//! the shape the harness cares about: send, receive, receipt-handle
//! delete, and bounded redelivery for messages that are not acked.
use crate::symbol::Lang;
@ -19,10 +20,12 @@ pub fn sqs_source(lang: Lang) -> &'static str {
Lang::Python => {
r#"
class NyxSqsLoopback:
"""In-process SQS loopback — boto3-shaped envelopes."""
"""In-process SQS loopback with receive/delete semantics."""
def __init__(self):
self._subs = {}
self._mid = 0
self._queues = {}
self._inflight = {}
def subscribe(self, queue, cb):
self._subs.setdefault(queue, []).append(cb)
def publish(self, queue, payload):
@ -31,28 +34,66 @@ class NyxSqsLoopback:
'MessageId': f'nyx-{self._mid:08d}',
'ReceiptHandle': f'rh-nyx-{self._mid:08d}',
'Body': payload,
'Attributes': {'ApproximateReceiveCount': '0'},
}
for cb in self._subs.get(queue, []):
cb(envelope)
self._queues.setdefault(queue, []).append(envelope)
return envelope
def receive_message(self, queue, max_number=1, visibility_timeout=0):
_ = visibility_timeout
out = []
pending = self._queues.setdefault(queue, [])
while pending and len(out) < max_number:
msg = pending.pop(0)
count = int(msg.get('Attributes', {}).get('ApproximateReceiveCount', '0')) + 1
msg.setdefault('Attributes', {})['ApproximateReceiveCount'] = str(count)
self._inflight[msg['ReceiptHandle']] = (queue, msg)
out.append(msg)
return out
def delete_message(self, queue, receipt_handle):
_ = queue
return self._inflight.pop(receipt_handle, None) is not None
def replay_inflight(self, max_receive_count=3):
for receipt, (queue, msg) in list(self._inflight.items()):
count = int(msg.get('Attributes', {}).get('ApproximateReceiveCount', '0'))
if count < max_receive_count:
self._queues.setdefault(queue, []).append(msg)
self._inflight.pop(receipt, None)
"#
}
Lang::Java => {
r#"
static class NyxSqsLoopback {
private final java.util.Map<String, java.util.List<java.util.function.Consumer<java.util.Map<String, String>>>> subs = new java.util.HashMap<>();
private final java.util.Map<String, java.util.List<java.util.Map<String, String>>> queues = new java.util.HashMap<>();
private final java.util.Map<String, java.util.Map<String, String>> inflight = new java.util.HashMap<>();
private int mid = 0;
public void subscribe(String queue, java.util.function.Consumer<java.util.Map<String, String>> cb) {
subs.computeIfAbsent(queue, k -> new java.util.ArrayList<>()).add(cb);
}
public void publish(String queue, String payload) {
public java.util.Map<String, String> publish(String queue, String payload) {
mid += 1;
java.util.Map<String, String> envelope = new java.util.HashMap<>();
envelope.put("MessageId", "nyx-" + mid);
envelope.put("ReceiptHandle", "rh-nyx-" + mid);
envelope.put("Body", payload);
for (java.util.function.Consumer<java.util.Map<String, String>> cb : subs.getOrDefault(queue, java.util.Collections.emptyList())) {
cb.accept(envelope);
envelope.put("ApproximateReceiveCount", "0");
queues.computeIfAbsent(queue, k -> new java.util.ArrayList<>()).add(envelope);
return envelope;
}
public java.util.List<java.util.Map<String, String>> receiveMessage(String queue, int maxMessages) {
java.util.List<java.util.Map<String, String>> pending = queues.computeIfAbsent(queue, k -> new java.util.ArrayList<>());
java.util.List<java.util.Map<String, String>> out = new java.util.ArrayList<>();
while (!pending.isEmpty() && out.size() < maxMessages) {
java.util.Map<String, String> msg = pending.remove(0);
int count = Integer.parseInt(msg.getOrDefault("ApproximateReceiveCount", "0")) + 1;
msg.put("ApproximateReceiveCount", Integer.toString(count));
inflight.put(msg.get("ReceiptHandle"), msg);
out.add(msg);
}
return out;
}
public boolean deleteMessage(String queue, String receiptHandle) {
return inflight.remove(receiptHandle) != null;
}
}
"#
@ -60,7 +101,7 @@ class NyxSqsLoopback:
Lang::JavaScript | Lang::TypeScript => {
r#"
class NyxSqsLoopback {
constructor() { this._subs = new Map(); this._mid = 0; }
constructor() { this._subs = new Map(); this._mid = 0; this._queues = new Map(); this._inflight = new Map(); }
subscribe(queue, cb) {
if (!this._subs.has(queue)) this._subs.set(queue, []);
this._subs.get(queue).push(cb);
@ -71,8 +112,38 @@ class NyxSqsLoopback {
MessageId: 'nyx-' + this._mid,
ReceiptHandle: 'rh-nyx-' + this._mid,
Body: payload,
Attributes: { ApproximateReceiveCount: '0' },
};
for (const cb of (this._subs.get(queue) || [])) cb(envelope);
if (!this._queues.has(queue)) this._queues.set(queue, []);
this._queues.get(queue).push(envelope);
return envelope;
}
receiveMessage(queue, maxMessages = 1, visibilityTimeout = 0) {
void visibilityTimeout;
const pending = this._queues.get(queue) || [];
const out = [];
while (pending.length > 0 && out.length < maxMessages) {
const msg = pending.shift();
const count = Number((msg.Attributes && msg.Attributes.ApproximateReceiveCount) || '0') + 1;
msg.Attributes = Object.assign({}, msg.Attributes || {}, { ApproximateReceiveCount: String(count) });
this._inflight.set(msg.ReceiptHandle, { queue, msg });
out.push(msg);
}
return out;
}
deleteMessage(queue, receiptHandle) {
void queue;
return this._inflight.delete(receiptHandle);
}
replayInflight(maxReceiveCount = 3) {
for (const [receipt, item] of Array.from(this._inflight.entries())) {
const count = Number((item.msg.Attributes && item.msg.Attributes.ApproximateReceiveCount) || '0');
if (count < maxReceiveCount) {
if (!this._queues.has(item.queue)) this._queues.set(item.queue, []);
this._queues.get(item.queue).push(item.msg);
}
this._inflight.delete(receipt);
}
}
}
"#
@ -97,6 +168,8 @@ mod tests {
assert!(src.contains("MessageId"));
assert!(src.contains("ReceiptHandle"));
assert!(src.contains("Body"));
assert!(src.contains("receive_message"));
assert!(src.contains("delete_message"));
}
#[test]
@ -105,6 +178,8 @@ mod tests {
assert!(src.contains("static class NyxSqsLoopback"));
assert!(src.contains("MessageId"));
assert!(src.contains("Body"));
assert!(src.contains("receiveMessage"));
assert!(src.contains("deleteMessage"));
}
#[test]
@ -113,6 +188,8 @@ mod tests {
assert!(src.contains("class NyxSqsLoopback"));
assert!(src.contains("subscribe(queue"));
assert!(src.contains("publish(queue"));
assert!(src.contains("receiveMessage(queue"));
assert!(src.contains("deleteMessage(queue"));
let ts = sqs_source(Lang::TypeScript);
assert_eq!(ts, src);
}