diff --git a/src/dynamic/lang/java.rs b/src/dynamic/lang/java.rs index 2bc0474b..f02e00f2 100644 --- a/src/dynamic/lang/java.rs +++ b/src/dynamic/lang/java.rs @@ -3804,20 +3804,26 @@ fn emit_message_handler_harness( crate::dynamic::stubs::SQS_PUBLISH_MARKER, format!( r#" NyxSqsLoopback brokerRef = new NyxSqsLoopback(); - brokerRef.subscribe({queue:?}, env -> {{ + System.out.println({publish_marker:?} + " " + {queue:?}); + nyxRecordBrokerPublish("NYX_SQS_LOG", {queue:?}, payload); + brokerRef.publish({queue:?}, payload); + for (java.util.Map env : brokerRef.receiveMessage({queue:?}, 1)) {{ + nyxRecordBrokerEvent("NYX_SQS_LOG", "deliver", {queue:?}, env.getOrDefault("Body", "")); System.out.println("__NYX_SINK_HIT__"); + boolean success = false; try {{ java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, java.util.Map.class); m.setAccessible(true); m.invoke(entryInst, env); + success = true; }} catch (Exception e) {{ Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); }} - }}); - System.out.println({publish_marker:?} + " " + {queue:?}); - nyxRecordBrokerPublish("NYX_SQS_LOG", {queue:?}, payload); - brokerRef.publish({queue:?}, payload);"#, + if (success && brokerRef.deleteMessage({queue:?}, env.getOrDefault("ReceiptHandle", ""))) {{ + nyxRecordBrokerEvent("NYX_SQS_LOG", "ack", {queue:?}, env.getOrDefault("ReceiptHandle", "")); + }} + }}"#, handler = handler, queue = queue, publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER, @@ -3859,20 +3865,27 @@ fn emit_message_handler_harness( crate::dynamic::stubs::KAFKA_PUBLISH_MARKER, format!( r#" NyxKafkaLoopback brokerRef = new NyxKafkaLoopback(); - brokerRef.subscribe({queue:?}, body -> {{ + System.out.println({publish_marker:?} + " " + {queue:?}); + nyxRecordBrokerPublish("NYX_KAFKA_LOG", {queue:?}, payload); + brokerRef.publish({queue:?}, payload); + for (NyxKafkaRecord rec : brokerRef.poll({queue:?}, 1)) {{ + nyxRecordBrokerEvent("NYX_KAFKA_LOG", "deliver", {queue:?}, rec.value); System.out.println("__NYX_SINK_HIT__"); + boolean success = false; try {{ java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, String.class); m.setAccessible(true); - m.invoke(entryInst, body); + m.invoke(entryInst, rec.value); + success = true; }} catch (Exception e) {{ Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); }} - }}); - System.out.println({publish_marker:?} + " " + {queue:?}); - nyxRecordBrokerPublish("NYX_KAFKA_LOG", {queue:?}, payload); - brokerRef.publish({queue:?}, payload);"#, + if (success) {{ + brokerRef.commit(rec); + nyxRecordBrokerEvent("NYX_KAFKA_LOG", "ack", {queue:?}, Long.toString(rec.offset)); + }} + }}"#, handler = handler, queue = queue, publish_marker = crate::dynamic::stubs::KAFKA_PUBLISH_MARKER, @@ -3917,10 +3930,10 @@ public class NyxHarness {{ return ""; }} - static void nyxRecordBrokerPublish(String envName, String destination, String payload) {{ + static void nyxRecordBrokerEvent(String envName, String action, String destination, String payload) {{ String path = System.getenv(envName); if (path == null || path.isEmpty()) return; - String line = destination.replace('\t', ' ') + "\t" + payload + "\n"; + String line = action.replace('\t', ' ') + "\t" + destination.replace('\t', ' ') + "\t" + payload + "\n"; try {{ java.nio.file.Files.write( java.nio.file.Paths.get(path), @@ -3931,6 +3944,10 @@ public class NyxHarness {{ }} catch (Exception ignored) {{ }} }} + + static void nyxRecordBrokerPublish(String envName, String destination, String payload) {{ + nyxRecordBrokerEvent(envName, "publish", destination, payload); + }} }} "#, entry_class = entry_class, diff --git a/src/dynamic/lang/js_shared.rs b/src/dynamic/lang/js_shared.rs index 22242706..4353b5d7 100644 --- a/src/dynamic/lang/js_shared.rs +++ b/src/dynamic/lang/js_shared.rs @@ -928,32 +928,49 @@ if (typeof _handler !== 'function') {{ }} const _broker = new NyxSqsLoopback(); -function _nyxRecordBrokerPublish(envName, destination, body) {{ +function _nyxRecordBrokerEvent(envName, action, destination, body) {{ const path = process.env[envName] || ''; if (!path) return; try {{ require('fs').appendFileSync( path, - String(destination).replace(/\t/g, ' ') + '\t' + String(body) + '\n', + String(action).replace(/\t/g, ' ') + '\t' + + String(destination).replace(/\t/g, ' ') + '\t' + + String(body) + '\n', 'utf8' ); }} catch (_) {{}} }} -_broker.subscribe({queue:?}, async (envelope) => {{ +function _nyxRecordBrokerPublish(envName, destination, body) {{ + _nyxRecordBrokerEvent(envName, 'publish', destination, body); +}} + +async function _nyxDispatchEnvelope(envelope) {{ try {{ // Sink-reachability sentinel — runner's `vuln_fired && sink_hit` // gate requires this byte sequence on stdout / stderr. process.stdout.write('__NYX_SINK_HIT__\n'); await Promise.resolve(_handler(envelope)); + return true; }} catch (e) {{ process.stderr.write('NYX_EXCEPTION: ' + (e.constructor ? e.constructor.name : 'Error') + ': ' + e.message + '\n'); + return false; }} -}}); +}} (async () => {{ process.stdout.write({publish_marker:?} + ' ' + {queue:?} + '\n'); _nyxRecordBrokerPublish('NYX_SQS_LOG', {queue:?}, payload); _broker.publish({queue:?}, payload); + for (const envelope of _broker.receiveMessage({queue:?}, 1)) {{ + _nyxRecordBrokerEvent('NYX_SQS_LOG', 'deliver', {queue:?}, envelope.Body || ''); + const ok = await _nyxDispatchEnvelope(envelope); + if (ok && _broker.deleteMessage({queue:?}, envelope.ReceiptHandle || '')) {{ + _nyxRecordBrokerEvent('NYX_SQS_LOG', 'ack', {queue:?}, envelope.ReceiptHandle || ''); + }} else {{ + _broker.replayInflight(); + }} + }} }})(); "#, handler = handler, @@ -1187,21 +1204,30 @@ if (_h == null) {{ process.stderr.write('NYX_HANDLER_NOT_FOUND: ' + {handler:?} + '\n'); process.exit(78); }} -// Synthetic queryInterface for sequelize-style up/down(queryInterface, Sequelize). +function _nyxLooksLikeSql(sql) {{ + const upper = String(sql).toUpperCase(); + return ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP'].some((k) => upper.includes(k)); +}} +function _nyxMigrationSqlRecord(sql, driver) {{ + if (!_nyxLooksLikeSql(sql)) return; + __nyx_stub_sql_record(String(sql), {{ driver: driver, source: 'migration' }}); +}} +// QueryInterface shim for sequelize-style up/down(queryInterface, Sequelize). const _qi = {{ - createTable: async function(){{}}, - addColumn: async function(){{}}, - dropTable: async function(){{}}, - removeColumn: async function(){{}}, - bulkInsert: async function(){{}}, - sequelize: {{ query: async function(){{}} }}, + createTable: async function(name){{ const sql = 'CREATE TABLE ' + String(name) + ' (id INTEGER)'; _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }}, + addColumn: async function(table, column){{ const sql = 'ALTER TABLE ' + String(table) + ' ADD COLUMN ' + String(column) + ' TEXT'; _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }}, + dropTable: async function(name){{ const sql = 'DROP TABLE ' + String(name); _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }}, + removeColumn: async function(table, column){{ const sql = 'ALTER TABLE ' + String(table) + ' DROP COLUMN ' + String(column); _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }}, + bulkInsert: async function(table){{ const sql = 'INSERT INTO ' + String(table) + ' VALUES (...)'; _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }}, + sequelize: {{ query: async function(sql){{ _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }} }}, }}; const _prisma = {{ - $executeRaw: async function(){{}}, - $executeRawUnsafe: async function(s){{ if (s) process.stdout.write('NYX_PRISMA_SQL: ' + s + '\n'); }}, - $queryRaw: async function(){{}}, - $queryRawUnsafe: async function(){{}}, + $executeRaw: async function(s){{ if (s) _nyxMigrationSqlRecord(s, 'prisma'); return s; }}, + $executeRawUnsafe: async function(s){{ if (s) {{ _nyxMigrationSqlRecord(s, 'prisma'); process.stdout.write('NYX_PRISMA_SQL: ' + s + '\n'); }} return s; }}, + $queryRaw: async function(s){{ if (s) _nyxMigrationSqlRecord(s, 'prisma'); return s; }}, + $queryRawUnsafe: async function(s){{ if (s) _nyxMigrationSqlRecord(s, 'prisma'); return s; }}, }}; +global.__nyx_prisma = _prisma; (async () => {{ try {{ let _result; @@ -1216,6 +1242,7 @@ const _prisma = {{ _result = await Promise.resolve(_h()); }} }} + if (typeof _result === 'string') _nyxMigrationSqlRecord(_result, 'migration'); if (_result != null) process.stdout.write(String(_result) + '\n'); }} catch (e) {{ process.stderr.write('NYX_EXCEPTION: ' + (e.constructor ? e.constructor.name : 'Error') + ': ' + e.message + '\n'); diff --git a/src/dynamic/lang/php.rs b/src/dynamic/lang/php.rs index d9758125..d5e0c7ac 100644 --- a/src/dynamic/lang/php.rs +++ b/src/dynamic/lang/php.rs @@ -3164,11 +3164,25 @@ fn emit_migration_harness(spec: &HarnessSpec, version: Option<&str>) -> HarnessS r#"{preamble} echo "__NYX_MIGRATION__: " . {version:?} . "\n"; +function __nyx_migration_sqlish($value): bool {{ + $upper = strtoupper((string)$value); + foreach (['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP'] as $kw) {{ + if (strpos($upper, $kw) !== false) return true; + }} + return false; +}} + +function __nyx_record_migration_result($value, string $driver): void {{ + if ($value === null || !__nyx_migration_sqlish($value)) return; + __nyx_stub_sql_record((string)$value, ['driver' => $driver, 'source' => 'migration']); +}} + if (class_exists({handler:?})) {{ $inst = new {handler}(); if (method_exists($inst, 'up')) {{ try {{ $result = $inst->up(); + __nyx_record_migration_result($result, 'laravel'); if ($result !== null) echo (string)$result . "\n"; }} catch (Throwable $e) {{ fwrite(STDERR, 'NYX_EXCEPTION: ' . get_class($e) . ': ' . $e->getMessage() . "\n"); @@ -3180,6 +3194,7 @@ if (class_exists({handler:?})) {{ }} elseif (function_exists({handler:?})) {{ try {{ $result = call_user_func({handler:?}); + __nyx_record_migration_result($result, 'php'); if ($result !== null) echo (string)$result . "\n"; }} catch (Throwable $e) {{ fwrite(STDERR, 'NYX_EXCEPTION: ' . get_class($e) . ': ' . $e->getMessage() . "\n"); diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index 0780eb46..bb5ee776 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -968,7 +968,12 @@ def _nyx_sqs_dispatch(envelope): _loop.subscribe({queue:?}, _nyx_sqs_dispatch) print({publish_marker:?} + " " + {queue:?}, flush=True) _nyx_record_broker_publish("NYX_SQS_LOG", {queue:?}, payload) -_loop.publish({queue:?}, payload)"#, +_loop.publish({queue:?}, payload) +for _env in _loop.receive_message({queue:?}, max_number=1): + _nyx_record_broker_event("NYX_SQS_LOG", "deliver", {queue:?}, _env.get("Body", "")) + _nyx_sqs_dispatch(_env) + if _loop.delete_message({queue:?}, _env.get("ReceiptHandle", "")): + _nyx_record_broker_event("NYX_SQS_LOG", "ack", {queue:?}, _env.get("ReceiptHandle", ""))"#, handler = handler, queue = queue, publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER, @@ -1016,7 +1021,12 @@ def _nyx_kafka_dispatch(message): _loop.subscribe({queue:?}, _nyx_kafka_dispatch) print({publish_marker:?} + " " + {queue:?}, flush=True) _nyx_record_broker_publish("NYX_KAFKA_LOG", {queue:?}, payload) -_loop.publish({queue:?}, payload)"#, +_loop.publish({queue:?}, payload) +for _record in _loop.poll({queue:?}, max_records=1): + _nyx_record_broker_event("NYX_KAFKA_LOG", "deliver", {queue:?}, _record.value) + _nyx_kafka_dispatch(_record.value) + _loop.commit(_record) + _nyx_record_broker_event("NYX_KAFKA_LOG", "ack", {queue:?}, str(_record.offset))"#, handler = handler, queue = queue, publish_marker = crate::dynamic::stubs::KAFKA_PUBLISH_MARKER, @@ -1030,16 +1040,23 @@ _loop.publish({queue:?}, payload)"#, {pubsub_src} {rabbit_src} -def _nyx_record_broker_publish(env_name, destination, body): +def _nyx_record_broker_event(env_name, action, destination, body): path = os.environ.get(env_name, "") if not path: return try: with open(path, "a", encoding="utf-8") as f: - f.write(str(destination).replace("\t", " ") + "\t" + str(body) + "\n") + f.write( + str(action).replace("\t", " ") + "\t" + + str(destination).replace("\t", " ") + "\t" + + str(body) + "\n" + ) except Exception: pass +def _nyx_record_broker_publish(env_name, destination, body): + _nyx_record_broker_event(env_name, "publish", destination, body) + try: {register_and_publish} except SystemExit as _e: @@ -1278,7 +1295,53 @@ _h = getattr(_entry_mod, {handler:?}, None) if _h is None: print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) sys.exit(78) + +def _nyx_migration_sql_record(sql, driver): + text = str(sql) + upper = text.upper() + if not any(k in upper for k in ("SELECT", "INSERT", "UPDATE", "DELETE", "CREATE", "ALTER", "DROP")): + return + __nyx_stub_sql_record(text, driver=driver, source="migration") + endpoint = os.environ.get("NYX_SQL_ENDPOINT", "") + if endpoint: + try: + import sqlite3 + conn = sqlite3.connect(endpoint) + try: + conn.execute(text) + conn.commit() + finally: + conn.close() + except Exception: + pass + +class _NyxMigrationOpProxy: + def __init__(self, inner=None): + self._inner = inner + def execute(self, sql, *args, **kwargs): + _nyx_migration_sql_record(sql, "alembic") + if self._inner is not None and self._inner is not self and hasattr(self._inner, "execute"): + return self._inner.execute(sql, *args, **kwargs) + return None + +def _nyx_install_migration_sql_hooks(): + if hasattr(_entry_mod, "op"): + try: + _entry_mod.op = _NyxMigrationOpProxy(getattr(_entry_mod, "op")) + except Exception: + pass + +def _nyx_record_migration_result(result): + if result is None: + return + sql = getattr(result, "sql", None) + if sql is not None: + _nyx_migration_sql_record(sql, "django") + elif isinstance(result, str): + _nyx_migration_sql_record(result, "migration") + try: + _nyx_install_migration_sql_hooks() # Migrations conventionally take no arguments; pass payload if the # function declares positional params (best-effort introspection). import inspect @@ -1291,6 +1354,7 @@ try: _result = _h(payload) else: _result = _h() + _nyx_record_migration_result(_result) if _result is not None: try: print(str(_result), flush=True) diff --git a/src/dynamic/lang/ruby.rs b/src/dynamic/lang/ruby.rs index 696c2206..f19866f1 100644 --- a/src/dynamic/lang/ruby.rs +++ b/src/dynamic/lang/ruby.rs @@ -899,15 +899,34 @@ fn emit_migration_harness(spec: &HarnessSpec, version: Option<&str>) -> HarnessS r#"{preamble} puts "__NYX_MIGRATION__: " + {ver:?} +def __nyx_migration_sqlish?(value) + text = value.to_s.upcase + ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP'].any? {{ |k| text.include?(k) }} +end + +def __nyx_record_migration_result(value, driver) + return if value.nil? + return unless __nyx_migration_sqlish?(value) + __nyx_stub_sql_record(value, driver: driver, source: 'migration') +end + # ActiveRecord migrations expose `up` / `down` / `change` on a subclass. if Object.const_defined?({handler:?}) cls = Object.const_get({handler:?}) begin inst = cls.new + if inst.respond_to?(:execute, true) + original_execute = inst.method(:execute) + inst.define_singleton_method(:execute) do |sql, *args, &blk| + __nyx_record_migration_result(sql, 'active_record') + original_execute.call(sql, *args, &blk) + end + end %i[up change down].each do |m| if inst.respond_to?(m) begin result = inst.send(m) + __nyx_record_migration_result(result, 'active_record') print(result.to_s) if result rescue StandardError => e STDERR.puts("NYX_EXCEPTION: #{{e.class.name}}: #{{e.message}}") @@ -923,6 +942,7 @@ end if respond_to?({handler:?}.to_sym, true) begin result = send({handler:?}.to_sym) + __nyx_record_migration_result(result, 'ruby') print(result.to_s) if result rescue StandardError => e STDERR.puts("NYX_EXCEPTION: #{{e.class.name}}: #{{e.message}}") diff --git a/src/dynamic/spec.rs b/src/dynamic/spec.rs index 621961d8..ffc73820 100644 --- a/src/dynamic/spec.rs +++ b/src/dynamic/spec.rs @@ -1358,6 +1358,12 @@ fn stamp_framework_binding(spec: &mut HarnessSpec, binding: FrameworkBinding) { spec.stubs_required.push(kind); hash_material_changed = true; } + if matches!(binding.kind.tag(), crate::evidence::EntryKindTag::Migration) + && !spec.stubs_required.contains(&StubKind::Sql) + { + spec.stubs_required.push(StubKind::Sql); + hash_material_changed = true; + } spec.framework = Some(binding); if hash_material_changed { spec.spec_hash = compute_spec_hash(spec); @@ -2479,6 +2485,52 @@ mod tests { assert_ne!(pre_hash, spec.spec_hash); } + #[test] + fn spec_attach_framework_binding_stamps_migration_and_sets_sql_stub() { + let mut spec = HarnessSpec { + finding_id: "phase21migration0001".into(), + entry_file: "db/migrate/001.py".into(), + entry_name: "upgrade".into(), + entry_kind: EntryKind::Function, + lang: Lang::Python, + toolchain_id: "phase21".into(), + payload_slot: PayloadSlot::Param(0), + expected_cap: crate::labels::Cap::CODE_EXEC, + constraint_hints: vec![], + sink_file: "db/migrate/001.py".into(), + sink_line: 1, + spec_hash: "phase21migration0001".into(), + derivation: SpecDerivationStrategy::FromFlowSteps, + stubs_required: vec![], + framework: None, + java_toolchain: JavaToolchain::default(), + }; + let pre_hash = spec.spec_hash.clone(); + + let binding = FrameworkBinding { + adapter: "migration-django".to_owned(), + kind: EntryKind::Migration { + version: Some("001".to_owned()), + }, + route: None, + request_params: vec![], + response_writer: None, + middleware: vec![], + }; + stamp_framework_binding(&mut spec, binding); + + assert_eq!( + spec.entry_kind.tag(), + crate::evidence::EntryKindTag::Migration + ); + assert_eq!( + spec.stubs_required, + vec![crate::dynamic::stubs::StubKind::Sql], + "Migration specs must request the SQL runtime provider" + ); + assert_ne!(pre_hash, spec.spec_hash); + } + /// Companion guard: when the binding carries a legacy unit /// variant (`Function` / `HttpRoute`), the stamping branch keeps /// `spec.entry_kind` and `spec.spec_hash` unchanged. diff --git a/src/dynamic/stubs/broker.rs b/src/dynamic/stubs/broker.rs index 5ac9b6bc..64cb6ae9 100644 --- a/src/dynamic/stubs/broker.rs +++ b/src/dynamic/stubs/broker.rs @@ -54,13 +54,37 @@ impl BrokerStub { /// Java, Python, Node, Go, PHP, Ruby, and Rust harnesses can append /// it without a JSON dependency: /// - /// `topicpayload` + /// `actiontopicpayload` + /// + /// Older harnesses wrote `topicpayload`; `drain_events` + /// still accepts that form and treats it as a `publish` event. pub fn record_publish(&self, destination: &str, payload: &str) -> std::io::Result<()> { + self.record_event("publish", destination, payload) + } + + /// Record a broker delivery observation. + pub fn record_delivery(&self, destination: &str, payload: &str) -> std::io::Result<()> { + self.record_event("deliver", destination, payload) + } + + /// Record an ack/commit/delete observation. The `payload` field + /// carries the broker-specific ack token when one exists. + pub fn record_ack(&self, destination: &str, payload: &str) -> std::io::Result<()> { + self.record_event("ack", destination, payload) + } + + fn record_event(&self, action: &str, destination: &str, payload: &str) -> std::io::Result<()> { let mut f = OpenOptions::new() .append(true) .create(true) .open(&self.log_path)?; - writeln!(f, "{}\t{}", destination.replace('\t', " "), payload)?; + writeln!( + f, + "{}\t{}\t{}", + action.replace('\t', " "), + destination.replace('\t', " "), + payload + )?; Ok(()) } } @@ -111,12 +135,13 @@ impl StubProvider for BrokerStub { if line.is_empty() { continue; } - let (destination, payload) = line.split_once('\t').unwrap_or((line, "")); + let (action, destination, payload) = parse_broker_log_line(line); let event = StubEvent { kind: self.kind, captured_at_ns: monotonic_ns(), - summary: format!("publish {destination}"), + summary: format!("{action} {destination}"), detail: std::collections::BTreeMap::from([ + ("action".to_owned(), action.to_owned()), ("destination".to_owned(), destination.to_owned()), ("payload".to_owned(), payload.to_owned()), ]), @@ -128,6 +153,18 @@ impl StubProvider for BrokerStub { } } +fn parse_broker_log_line(line: &str) -> (&str, &str, &str) { + let Some((first, rest)) = line.split_once('\t') else { + return ("publish", line, ""); + }; + if matches!(first, "publish" | "deliver" | "ack" | "nack" | "retry") { + let (destination, payload) = rest.split_once('\t').unwrap_or((rest, "")); + (first, destination, payload) + } else { + ("publish", first, rest) + } +} + impl Drop for BrokerStub { fn drop(&mut self) { self.tempdir.take(); @@ -160,8 +197,34 @@ mod tests { assert_eq!(events.len(), 1); assert_eq!(events[0].kind, StubKind::Sqs); assert_eq!(events[0].summary, "publish queue-a"); + assert_eq!(events[0].detail.get("action").unwrap(), "publish"); assert_eq!(events[0].detail.get("destination").unwrap(), "queue-a"); assert_eq!(events[0].detail.get("payload").unwrap(), "NYX_PWN_CMDI"); assert!(stub.drain_events().is_empty(), "drain cursor must advance"); } + + #[test] + fn broker_drain_understands_delivery_and_ack_events() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Kafka, dir.path()).unwrap(); + stub.record_delivery("orders", "payload-1").unwrap(); + stub.record_ack("orders", "offset-1").unwrap(); + let events = stub.drain_events(); + assert_eq!(events.len(), 2); + assert_eq!(events[0].summary, "deliver orders"); + assert_eq!(events[1].summary, "ack orders"); + assert_eq!(events[1].detail.get("payload").unwrap(), "offset-1"); + } + + #[test] + fn broker_drain_preserves_legacy_two_field_publish_lines() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Rabbit, dir.path()).unwrap(); + std::fs::write(stub.log_path(), "work\tlegacy payload\n").unwrap(); + let events = stub.drain_events(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].summary, "publish work"); + assert_eq!(events[0].detail.get("action").unwrap(), "publish"); + assert_eq!(events[0].detail.get("payload").unwrap(), "legacy payload"); + } } diff --git a/src/dynamic/stubs/broker_kafka.rs b/src/dynamic/stubs/broker_kafka.rs index f4bc0c22..1a517412 100644 --- a/src/dynamic/stubs/broker_kafka.rs +++ b/src/dynamic/stubs/broker_kafka.rs @@ -3,19 +3,17 @@ //! The Phase 20 acceptance gate runs every per-lang `MessageHandler` harness //! inside an in-process loopback broker — no real Kafka cluster, no //! external network — so the per-lang harness can publish the spec's -//! payload onto a topic and observe the handler under test receive it -//! synchronously. Each `broker_kafka` source snippet declares a tiny -//! `NyxKafkaLoopback` type whose `publish(topic, payload)` immediately -//! routes the bytes through the subscriber callback the harness has -//! registered. No threads, no sockets, no async runtime: a single -//! synchronous in-process dispatch keeps Phase 10's 500 ms boot budget -//! intact when `stubs_required` is empty. +//! payload onto a topic, poll the topic, dispatch the record, and commit +//! the offset. No threads, no sockets, no async runtime: a single +//! synchronous publish/poll/commit cycle keeps Phase 10's 500 ms boot +//! budget intact when `stubs_required` is empty while still exercising +//! the consumer-loop shape real Kafka handlers depend on. //! //! The snippet shape mirrors [`crate::dynamic::stubs::mocks::mock_source`] — //! per-language inline source returned as a `&'static str` so the -//! generated harness can splice it verbatim into its own source. The +//! generated harness can splice it verbatim into its own source. The //! per-language harness emitter is responsible for instantiating the -//! loopback and invoking the registered handler with the payload. +//! loopback, publishing, polling, and committing records. use crate::symbol::Lang; @@ -28,33 +26,84 @@ pub const KAFKA_PUBLISH_MARKER: &str = "__NYX_BROKER_PUBLISH__:kafka"; /// Returns `""` when the language has no harness-level Kafka adapter /// (everything outside Java / Python today). The snippet does *not* /// emit a publish marker by itself; the per-lang harness emitter calls -/// `publish(topic, payload)` and prints the marker once. +/// `publish(topic, payload)`, polls, and prints the marker once. pub fn kafka_source(lang: Lang) -> &'static str { match lang { Lang::Python => { r#" class NyxKafkaLoopback: - """In-process Kafka loopback — no socket, no thread, no broker.""" + """In-process Kafka loopback with publish/poll/commit semantics.""" def __init__(self): self._subs = {} + self._topics = {} + self._offsets = {} + self._committed = {} def subscribe(self, topic, cb): self._subs.setdefault(topic, []).append(cb) + def _next_offset(self, topic): + off = self._offsets.get(topic, 0) + self._offsets[topic] = off + 1 + return off def publish(self, topic, payload): - for cb in self._subs.get(topic, []): - cb(payload) + rec = NyxKafkaRecord(topic, payload, self._next_offset(topic)) + self._topics.setdefault(topic, []).append(rec) + return rec + def poll(self, topic, max_records=1, timeout_ms=0): + _ = timeout_ms + return list(self._topics.get(topic, [])[:max_records]) + def commit(self, record): + self._committed[record.topic] = max(self._committed.get(record.topic, -1), record.offset) + self._topics[record.topic] = [ + r for r in self._topics.get(record.topic, []) if r.offset > record.offset + ] + +class NyxKafkaRecord: + def __init__(self, topic, value, offset): + self.topic = topic + self.value = value + self.offset = offset + self.key = None + def __str__(self): + return str(self.value) "# } Lang::Java => { r#" + static class NyxKafkaRecord { + public final String topic; + public final String value; + public final long offset; + NyxKafkaRecord(String topic, String value, long offset) { + this.topic = topic; + this.value = value; + this.offset = offset; + } + public String toString() { return value; } + } + static class NyxKafkaLoopback { private final java.util.Map>> subs = new java.util.HashMap<>(); + private final java.util.Map> topics = new java.util.HashMap<>(); + private final java.util.Map offsets = new java.util.HashMap<>(); + private final java.util.Map committed = new java.util.HashMap<>(); public void subscribe(String topic, java.util.function.Consumer cb) { subs.computeIfAbsent(topic, k -> new java.util.ArrayList<>()).add(cb); } - public void publish(String topic, String payload) { - for (java.util.function.Consumer cb : subs.getOrDefault(topic, java.util.Collections.emptyList())) { - cb.accept(payload); - } + public NyxKafkaRecord publish(String topic, String payload) { + long off = offsets.getOrDefault(topic, 0L); + offsets.put(topic, off + 1L); + NyxKafkaRecord rec = new NyxKafkaRecord(topic, payload, off); + topics.computeIfAbsent(topic, k -> new java.util.ArrayList<>()).add(rec); + return rec; + } + public java.util.List poll(String topic, int maxRecords) { + java.util.List q = topics.getOrDefault(topic, java.util.Collections.emptyList()); + return new java.util.ArrayList<>(q.subList(0, Math.min(maxRecords, q.size()))); + } + public void commit(NyxKafkaRecord rec) { + committed.put(rec.topic, Math.max(committed.getOrDefault(rec.topic, -1L), rec.offset)); + java.util.List q = topics.getOrDefault(rec.topic, new java.util.ArrayList<>()); + q.removeIf(r -> r.offset <= rec.offset); } } "# @@ -76,16 +125,20 @@ mod tests { fn python_snippet_declares_loopback_class() { let src = kafka_source(Lang::Python); assert!(src.contains("class NyxKafkaLoopback")); + assert!(src.contains("class NyxKafkaRecord")); assert!(src.contains("def publish")); - assert!(src.contains("def subscribe")); + assert!(src.contains("def poll")); + assert!(src.contains("def commit")); } #[test] fn java_snippet_declares_static_inner_class() { let src = kafka_source(Lang::Java); + assert!(src.contains("static class NyxKafkaRecord")); assert!(src.contains("static class NyxKafkaLoopback")); - assert!(src.contains("public void publish")); - assert!(src.contains("public void subscribe")); + assert!(src.contains("public NyxKafkaRecord publish")); + assert!(src.contains("public java.util.List poll")); + assert!(src.contains("public void commit")); } #[test] diff --git a/src/dynamic/stubs/broker_sqs.rs b/src/dynamic/stubs/broker_sqs.rs index 4d19ae2b..686c0f5d 100644 --- a/src/dynamic/stubs/broker_sqs.rs +++ b/src/dynamic/stubs/broker_sqs.rs @@ -3,8 +3,9 @@ //! Mirrors [`crate::dynamic::stubs::broker_kafka`] but mints SQS-shaped //! envelopes (`MessageId`, `ReceiptHandle`, `Body`) the way `boto3.sqs` / //! `software.amazon.awssdk.services.sqs` / the AWS Node SDK present -//! them. The loopback never speaks the AWS protocol — it just calls -//! the registered handler synchronously with a single-message envelope. +//! them. The loopback never speaks the AWS protocol, but it does model +//! the shape the harness cares about: send, receive, receipt-handle +//! delete, and bounded redelivery for messages that are not acked. use crate::symbol::Lang; @@ -19,10 +20,12 @@ pub fn sqs_source(lang: Lang) -> &'static str { Lang::Python => { r#" class NyxSqsLoopback: - """In-process SQS loopback — boto3-shaped envelopes.""" + """In-process SQS loopback with receive/delete semantics.""" def __init__(self): self._subs = {} self._mid = 0 + self._queues = {} + self._inflight = {} def subscribe(self, queue, cb): self._subs.setdefault(queue, []).append(cb) def publish(self, queue, payload): @@ -31,28 +34,66 @@ class NyxSqsLoopback: 'MessageId': f'nyx-{self._mid:08d}', 'ReceiptHandle': f'rh-nyx-{self._mid:08d}', 'Body': payload, + 'Attributes': {'ApproximateReceiveCount': '0'}, } - for cb in self._subs.get(queue, []): - cb(envelope) + self._queues.setdefault(queue, []).append(envelope) + return envelope + def receive_message(self, queue, max_number=1, visibility_timeout=0): + _ = visibility_timeout + out = [] + pending = self._queues.setdefault(queue, []) + while pending and len(out) < max_number: + msg = pending.pop(0) + count = int(msg.get('Attributes', {}).get('ApproximateReceiveCount', '0')) + 1 + msg.setdefault('Attributes', {})['ApproximateReceiveCount'] = str(count) + self._inflight[msg['ReceiptHandle']] = (queue, msg) + out.append(msg) + return out + def delete_message(self, queue, receipt_handle): + _ = queue + return self._inflight.pop(receipt_handle, None) is not None + def replay_inflight(self, max_receive_count=3): + for receipt, (queue, msg) in list(self._inflight.items()): + count = int(msg.get('Attributes', {}).get('ApproximateReceiveCount', '0')) + if count < max_receive_count: + self._queues.setdefault(queue, []).append(msg) + self._inflight.pop(receipt, None) "# } Lang::Java => { r#" static class NyxSqsLoopback { private final java.util.Map>>> subs = new java.util.HashMap<>(); + private final java.util.Map>> queues = new java.util.HashMap<>(); + private final java.util.Map> inflight = new java.util.HashMap<>(); private int mid = 0; public void subscribe(String queue, java.util.function.Consumer> cb) { subs.computeIfAbsent(queue, k -> new java.util.ArrayList<>()).add(cb); } - public void publish(String queue, String payload) { + public java.util.Map publish(String queue, String payload) { mid += 1; java.util.Map envelope = new java.util.HashMap<>(); envelope.put("MessageId", "nyx-" + mid); envelope.put("ReceiptHandle", "rh-nyx-" + mid); envelope.put("Body", payload); - for (java.util.function.Consumer> cb : subs.getOrDefault(queue, java.util.Collections.emptyList())) { - cb.accept(envelope); + envelope.put("ApproximateReceiveCount", "0"); + queues.computeIfAbsent(queue, k -> new java.util.ArrayList<>()).add(envelope); + return envelope; + } + public java.util.List> receiveMessage(String queue, int maxMessages) { + java.util.List> pending = queues.computeIfAbsent(queue, k -> new java.util.ArrayList<>()); + java.util.List> out = new java.util.ArrayList<>(); + while (!pending.isEmpty() && out.size() < maxMessages) { + java.util.Map msg = pending.remove(0); + int count = Integer.parseInt(msg.getOrDefault("ApproximateReceiveCount", "0")) + 1; + msg.put("ApproximateReceiveCount", Integer.toString(count)); + inflight.put(msg.get("ReceiptHandle"), msg); + out.add(msg); } + return out; + } + public boolean deleteMessage(String queue, String receiptHandle) { + return inflight.remove(receiptHandle) != null; } } "# @@ -60,7 +101,7 @@ class NyxSqsLoopback: Lang::JavaScript | Lang::TypeScript => { r#" class NyxSqsLoopback { - constructor() { this._subs = new Map(); this._mid = 0; } + constructor() { this._subs = new Map(); this._mid = 0; this._queues = new Map(); this._inflight = new Map(); } subscribe(queue, cb) { if (!this._subs.has(queue)) this._subs.set(queue, []); this._subs.get(queue).push(cb); @@ -71,8 +112,38 @@ class NyxSqsLoopback { MessageId: 'nyx-' + this._mid, ReceiptHandle: 'rh-nyx-' + this._mid, Body: payload, + Attributes: { ApproximateReceiveCount: '0' }, }; - for (const cb of (this._subs.get(queue) || [])) cb(envelope); + if (!this._queues.has(queue)) this._queues.set(queue, []); + this._queues.get(queue).push(envelope); + return envelope; + } + receiveMessage(queue, maxMessages = 1, visibilityTimeout = 0) { + void visibilityTimeout; + const pending = this._queues.get(queue) || []; + const out = []; + while (pending.length > 0 && out.length < maxMessages) { + const msg = pending.shift(); + const count = Number((msg.Attributes && msg.Attributes.ApproximateReceiveCount) || '0') + 1; + msg.Attributes = Object.assign({}, msg.Attributes || {}, { ApproximateReceiveCount: String(count) }); + this._inflight.set(msg.ReceiptHandle, { queue, msg }); + out.push(msg); + } + return out; + } + deleteMessage(queue, receiptHandle) { + void queue; + return this._inflight.delete(receiptHandle); + } + replayInflight(maxReceiveCount = 3) { + for (const [receipt, item] of Array.from(this._inflight.entries())) { + const count = Number((item.msg.Attributes && item.msg.Attributes.ApproximateReceiveCount) || '0'); + if (count < maxReceiveCount) { + if (!this._queues.has(item.queue)) this._queues.set(item.queue, []); + this._queues.get(item.queue).push(item.msg); + } + this._inflight.delete(receipt); + } } } "# @@ -97,6 +168,8 @@ mod tests { assert!(src.contains("MessageId")); assert!(src.contains("ReceiptHandle")); assert!(src.contains("Body")); + assert!(src.contains("receive_message")); + assert!(src.contains("delete_message")); } #[test] @@ -105,6 +178,8 @@ mod tests { assert!(src.contains("static class NyxSqsLoopback")); assert!(src.contains("MessageId")); assert!(src.contains("Body")); + assert!(src.contains("receiveMessage")); + assert!(src.contains("deleteMessage")); } #[test] @@ -113,6 +188,8 @@ mod tests { assert!(src.contains("class NyxSqsLoopback")); assert!(src.contains("subscribe(queue")); assert!(src.contains("publish(queue")); + assert!(src.contains("receiveMessage(queue")); + assert!(src.contains("deleteMessage(queue")); let ts = sqs_source(Lang::TypeScript); assert_eq!(ts, src); } diff --git a/tests/message_handler_corpus.rs b/tests/message_handler_corpus.rs index 33c7251d..6971be53 100644 --- a/tests/message_handler_corpus.rs +++ b/tests/message_handler_corpus.rs @@ -156,6 +156,10 @@ fn message_handler_python_dispatch_subscribes_to_loopback() { let h = lang::emit(&spec).expect("emit ok"); assert!(h.source.contains("NyxKafkaLoopback")); assert!(h.source.contains("subscribe")); + assert!(h.source.contains("poll")); + assert!(h.source.contains("commit")); + assert!(h.source.contains("\"deliver\"")); + assert!(h.source.contains("\"ack\"")); assert!(h.source.contains("__NYX_BROKER_PUBLISH__")); assert!(h.source.contains("NYX_KAFKA_LOG")); assert!(h.source.contains("_nyx_record_broker_publish")); @@ -169,6 +173,10 @@ fn message_handler_java_emits_reflective_dispatch() { assert!(h.source.contains("NyxKafkaLoopback")); assert!(h.source.contains("Class.forName")); assert!(h.source.contains("getDeclaredMethod")); + assert!(h.source.contains("brokerRef.poll")); + assert!(h.source.contains("brokerRef.commit")); + assert!(h.source.contains("\"deliver\"")); + assert!(h.source.contains("\"ack\"")); assert!(h.source.contains("NYX_KAFKA_LOG")); assert!(h.source.contains("nyxRecordBrokerPublish")); } @@ -178,7 +186,10 @@ fn message_handler_node_uses_sqs_loopback() { let spec = make_spec(Lang::JavaScript, "jobs", "handler", entry_file("sqs_node")); let h = lang::emit(&spec).expect("emit ok"); assert!(h.source.contains("NyxSqsLoopback")); - assert!(h.source.contains("subscribe")); + assert!(h.source.contains("receiveMessage")); + assert!(h.source.contains("deleteMessage")); + assert!(h.source.contains("'deliver'")); + assert!(h.source.contains("'ack'")); assert!(h.source.contains("__NYX_BROKER_PUBLISH__:sqs")); assert!(h.source.contains("NYX_SQS_LOG")); assert!(h.source.contains("_nyxRecordBrokerPublish")); @@ -550,11 +561,13 @@ mod e2e_phase_20 { use nyx_scanner::dynamic::spec::{ EntryKind, HarnessSpec, PayloadSlot, SpecDerivationStrategy, default_toolchain_id, }; + use nyx_scanner::dynamic::stubs::{StubHarness, StubKind}; use nyx_scanner::evidence::DifferentialVerdict; use nyx_scanner::labels::Cap; use nyx_scanner::symbol::Lang; use std::path::PathBuf; use std::process::Command; + use std::sync::Arc; use tempfile::TempDir; fn command_available(bin: &str) -> bool { @@ -592,6 +605,17 @@ mod e2e_phase_20 { } } + fn broker_stub_for_adapter(adapter: &str) -> StubKind { + match adapter.split_once('-').map(|(broker, _)| broker) { + Some("kafka") => StubKind::Kafka, + Some("sqs") => StubKind::Sqs, + Some("pubsub") => StubKind::Pubsub, + Some("rabbit") => StubKind::Rabbit, + Some("nats") => StubKind::Nats, + _ => panic!("adapter {adapter} is not a broker adapter"), + } + } + fn build_spec( lang: Lang, fixture_dir: &str, @@ -624,6 +648,7 @@ mod e2e_phase_20 { } let adapter = adapter_for(fixture_dir); + let stub_kind = broker_stub_for_adapter(adapter); let framework = Some(nyx_scanner::dynamic::framework::FrameworkBinding { adapter: adapter.to_owned(), kind: EntryKind::MessageHandler { @@ -653,7 +678,7 @@ mod e2e_phase_20 { sink_line: 1, spec_hash: spec_hash.clone(), derivation: SpecDerivationStrategy::FromFlowSteps, - stubs_required: vec![], + stubs_required: vec![stub_kind], framework, java_toolchain: nyx_scanner::dynamic::spec::JavaToolchain::default(), }; @@ -675,8 +700,19 @@ mod e2e_phase_20 { } let _guard = FIXTURE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let (spec, _tmp) = build_spec(lang, fixture_dir, fixture_file, handler, queue); + let stub_workdir = TempDir::new().expect("create broker stub tempdir"); + let stub_harness = Arc::new( + StubHarness::start(&spec.stubs_required, stub_workdir.path()) + .expect("start broker stub harness"), + ); + let mut extra_env = Vec::new(); + for (name, value) in stub_harness.endpoints() { + extra_env.push((name.to_owned(), value)); + } let opts = SandboxOptions { backend: nyx_scanner::dynamic::sandbox::SandboxBackend::Process, + extra_env, + stub_harness: Some(stub_harness), ..SandboxOptions::default() }; match run_spec(&spec, &opts) { diff --git a/tests/phase21_corpus.rs b/tests/phase21_corpus.rs index 2c010692..f853098e 100644 --- a/tests/phase21_corpus.rs +++ b/tests/phase21_corpus.rs @@ -28,12 +28,14 @@ use nyx_scanner::dynamic::sandbox::{SandboxBackend, SandboxOptions}; use nyx_scanner::dynamic::spec::{ EntryKind, EntryKindTag, HarnessSpec, PayloadSlot, SpecDerivationStrategy, default_toolchain_id, }; +use nyx_scanner::dynamic::stubs::{StubHarness, StubKind}; use nyx_scanner::evidence::DifferentialVerdict; use nyx_scanner::evidence::EntryKind as EvEntryKind; use nyx_scanner::labels::Cap; use nyx_scanner::summary::ssa_summary::SsaFuncSummary; use nyx_scanner::summary::{CalleeSite, FuncSummary}; use nyx_scanner::symbol::Lang; +use std::sync::Arc; use tempfile::TempDir; fn make_spec(lang: Lang, kind: EvEntryKind, entry_name: &str, entry_file: &str) -> HarnessSpec { @@ -906,6 +908,8 @@ fn migration_python_harness_carries_sentinel_and_handler() { let h = lang::emit(&spec).expect("emit ok"); assert!(h.source.contains("__NYX_MIGRATION__")); assert!(h.source.contains("\"upgrade\"")); + assert!(h.source.contains("__nyx_stub_sql_record")); + assert!(h.source.contains("NYX_SQL_ENDPOINT")); } #[test] @@ -919,6 +923,8 @@ fn migration_js_harness_carries_sentinel_and_handler() { let h = lang::emit(&spec).expect("emit ok"); assert!(h.source.contains("__NYX_MIGRATION__")); assert!(h.source.contains("\"up\"")); + assert!(h.source.contains("__nyx_stub_sql_record")); + assert!(h.source.contains("global.__nyx_prisma")); } #[test] @@ -932,6 +938,7 @@ fn migration_ruby_harness_carries_sentinel_and_handler() { let h = lang::emit(&spec).expect("emit ok"); assert!(h.source.contains("__NYX_MIGRATION__")); assert!(h.source.contains("AddIndex")); + assert!(h.source.contains("__nyx_stub_sql_record")); } #[test] @@ -945,6 +952,7 @@ fn migration_php_harness_carries_sentinel_and_handler() { let h = lang::emit(&spec).expect("emit ok"); assert!(h.source.contains("__NYX_MIGRATION__")); assert!(h.source.contains("AddUsers")); + assert!(h.source.contains("__nyx_stub_sql_record")); } #[test] @@ -1378,7 +1386,7 @@ fn build_runspec_case(case: RunSpecCase, file_name: &str) -> (HarnessSpec, TempD sink_line: 1, spec_hash, derivation: SpecDerivationStrategy::FromFlowSteps, - stubs_required: vec![], + stubs_required: StubKind::for_cap(case.cap), framework: None, java_toolchain: nyx_scanner::dynamic::spec::JavaToolchain::default(), }; @@ -1391,11 +1399,23 @@ fn run_phase21_case(case: RunSpecCase, file_name: &str) -> Option { eprintln!("SKIP {} {file_name}: missing toolchain {bin}", case.name); return None; } - let (spec, _tmp) = build_runspec_case(case, file_name); - let opts = SandboxOptions { + let (spec, tmp) = build_runspec_case(case, file_name); + let mut opts = SandboxOptions { backend: SandboxBackend::Process, ..SandboxOptions::default() }; + let stub_harness = if spec.stubs_required.is_empty() { + None + } else { + let h = Arc::new( + StubHarness::start(&spec.stubs_required, tmp.path()).expect("start phase21 stubs"), + ); + for (name, value) in h.endpoints() { + opts.extra_env.push((name.to_owned(), value)); + } + Some(h) + }; + opts.stub_harness = stub_harness; match run_spec(&spec, &opts) { Ok(outcome) => Some(outcome), Err(RunError::BuildFailed { stderr, attempts }) => {