diff --git a/src/dynamic/lang/go.rs b/src/dynamic/lang/go.rs index 60897f60..76fba69a 100644 --- a/src/dynamic/lang/go.rs +++ b/src/dynamic/lang/go.rs @@ -2155,12 +2155,65 @@ fn emit_message_handler_harness(spec: &HarnessSpec, queue: &str) -> HarnessSourc let handler = &spec.entry_name; let broker = go_broker_for_adapter(spec); - let (broker_src, publish_marker, dispatch) = match broker { + let (broker_src, publish_marker, broker_imports, broker_helpers, dispatch) = match broker { GoBroker::Nats => ( crate::dynamic::stubs::nats_source(crate::symbol::Lang::Go), crate::dynamic::stubs::NATS_PUBLISH_MARKER, + "\tnats \"github.com/nats-io/nats.go\"\n", + r##" +func nyxTryRealNats(subject string, payload string, dispatcher func(interface{}), marker string) bool { + endpoint := os.Getenv("NYX_NATS_ENDPOINT") + if !(strings.HasPrefix(endpoint, "nats://") || strings.HasPrefix(endpoint, "tls://")) { + return false + } + nc, err := nats.Connect(endpoint, nats.Name("nyx-harness"), nats.Timeout(2*time.Second)) + if err != nil { + fmt.Fprintf(os.Stderr, "NYX_NATS_CLIENT_FALLBACK: %v\n", err) + return false + } + defer nc.Close() + done := make(chan struct{}, 1) + sub, err := nc.Subscribe(subject, func(msg *nats.Msg) { + natsMsg := &NyxNatsMsg{Subject: msg.Subject, Data: msg.Data, Reply: msg.Reply} + nyxRecordBrokerEvent("NYX_NATS_LOG", "deliver", subject, string(msg.Data)) + dispatcher(natsMsg) + nyxRecordBrokerEvent("NYX_NATS_LOG", "ack", subject, msg.Subject) + select { + case done <- struct{}{}: + default: + } + }) + if err != nil { + fmt.Fprintf(os.Stderr, "NYX_NATS_CLIENT_FALLBACK: %v\n", err) + return false + } + defer sub.Unsubscribe() + if err := nc.FlushTimeout(2 * time.Second); err != nil { + fmt.Fprintf(os.Stderr, "NYX_NATS_CLIENT_FALLBACK: %v\n", err) + return false + } + fmt.Println(marker + " " + subject) + if err := nc.Publish(subject, []byte(payload)); err != nil { + fmt.Fprintf(os.Stderr, "NYX_NATS_CLIENT_FALLBACK: %v\n", err) + return false + } + if err := nc.FlushTimeout(2 * time.Second); err != nil { + fmt.Fprintf(os.Stderr, "NYX_NATS_CLIENT_FALLBACK: %v\n", err) + return false + } + select { + case <-done: + return true + case <-time.After(2 * time.Second): + fmt.Fprintln(os.Stderr, "NYX_NATS_CLIENT_FALLBACK: timeout waiting for delivery") + return false + } +} +"##, format!( - r##" if msg, ok := nyxFetchHttpBroker("NYX_NATS_ENDPOINT", "subjects", "{queue}", payload, "{publish_marker}"); ok {{ + r##" if nyxTryRealNats("{queue}", payload, nyxDispatch, "{publish_marker}") {{ + return + }} else if msg, ok := nyxFetchHttpBroker("NYX_NATS_ENDPOINT", "subjects", "{queue}", payload, "{publish_marker}"); ok {{ data := msg["data"] natsMsg := &NyxNatsMsg{{Subject: msg["subject"], Data: []byte(data), Reply: msg["reply"]}} if natsMsg.Subject == "" {{ @@ -2192,6 +2245,8 @@ fn emit_message_handler_harness(spec: &HarnessSpec, queue: &str) -> HarnessSourc GoBroker::Pubsub => ( crate::dynamic::stubs::pubsub_source(crate::symbol::Lang::Go), crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER, + "", + "", format!( r##" if msg, ok := nyxFetchHttpBroker("NYX_PUBSUB_ENDPOINT", "topics", "{queue}", payload, "{publish_marker}"); ok {{ data := msg["data"] @@ -2281,7 +2336,7 @@ import ( "syscall" "time" - "nyx-harness/entry" +{broker_imports} "nyx-harness/entry" ) {shim} @@ -2290,6 +2345,8 @@ import ( {dispatch_inner} +{broker_helpers} + func nyxPayload() string {{ if v := os.Getenv("NYX_PAYLOAD"); v != "" {{ return v @@ -2403,6 +2460,8 @@ func main() {{ }} "##, broker_src = broker_src, + broker_imports = broker_imports, + broker_helpers = broker_helpers, dispatch_inner = dispatch_inner, dispatch = dispatch, handler = handler, diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index 71e2fe0f..33b2ab14 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -1495,7 +1495,8 @@ fn emit_migration(spec: &HarnessSpec, version: Option<&str>) -> HarnessSource { r#"# Shape: migration — Phase 21 / Track M.3. print("__NYX_MIGRATION__: " + {version:?}, flush=True) _h = getattr(_entry_mod, {handler:?}, None) -if _h is None: +_migration_cls = getattr(_entry_mod, "Migration", None) +if _h is None and _migration_cls is None: print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) sys.exit(78) @@ -1504,6 +1505,7 @@ def _nyx_migration_sql_record(sql, driver): upper = text.upper() if not any(k in upper for k in ("SELECT", "INSERT", "UPDATE", "DELETE", "CREATE", "ALTER", "DROP")): return + print("NYX_MIGRATION_SQL: " + text, flush=True) __nyx_stub_sql_record(text, driver=driver, source="migration") endpoint = os.environ.get("NYX_SQL_ENDPOINT", "") if endpoint: @@ -1521,11 +1523,38 @@ def _nyx_migration_sql_record(sql, driver): class _NyxMigrationOpProxy: def __init__(self, inner=None): self._inner = inner + def _call_inner(self, name, *args, **kwargs): + if self._inner is not None and self._inner is not self and hasattr(self._inner, name): + return getattr(self._inner, name)(*args, **kwargs) + return None def execute(self, sql, *args, **kwargs): _nyx_migration_sql_record(sql, "alembic") - if self._inner is not None and self._inner is not self and hasattr(self._inner, "execute"): - return self._inner.execute(sql, *args, **kwargs) - return None + return self._call_inner("execute", sql, *args, **kwargs) + def create_table(self, name, *args, **kwargs): + _nyx_migration_sql_record("CREATE TABLE " + str(name) + " (id INTEGER)", "alembic") + return self._call_inner("create_table", name, *args, **kwargs) + def drop_table(self, name, *args, **kwargs): + _nyx_migration_sql_record("DROP TABLE " + str(name), "alembic") + return self._call_inner("drop_table", name, *args, **kwargs) + def add_column(self, table_name, column, *args, **kwargs): + col_name = getattr(column, "name", column) + _nyx_migration_sql_record( + "ALTER TABLE " + str(table_name) + " ADD COLUMN " + str(col_name) + " TEXT", + "alembic", + ) + return self._call_inner("add_column", table_name, column, *args, **kwargs) + def drop_column(self, table_name, column_name, *args, **kwargs): + _nyx_migration_sql_record( + "ALTER TABLE " + str(table_name) + " DROP COLUMN " + str(column_name), + "alembic", + ) + return self._call_inner("drop_column", table_name, column_name, *args, **kwargs) + def alter_column(self, table_name, column_name, *args, **kwargs): + _nyx_migration_sql_record( + "ALTER TABLE " + str(table_name) + " ALTER COLUMN " + str(column_name), + "alembic", + ) + return self._call_inner("alter_column", table_name, column_name, *args, **kwargs) def __getattr__(self, name): if self._inner is not None and self._inner is not self: return getattr(self._inner, name) @@ -1572,9 +1601,7 @@ def _nyx_record_migration_result(result): elif isinstance(result, str): _nyx_migration_sql_record(result, "migration") elif hasattr(result, "database_forwards"): - sql = getattr(result, "sql", None) - if sql is not None: - _nyx_migration_sql_record(sql, "django") + _nyx_record_django_operation_shape(result) try: from django.conf import settings if not settings.configured: @@ -1592,21 +1619,59 @@ def _nyx_record_migration_result(result): except Exception: pass +def _nyx_record_django_operation_shape(op): + sql = getattr(op, "sql", None) + if sql is not None: + _nyx_migration_sql_record(sql, "django") + return + name = op.__class__.__name__ + if name == "CreateModel": + model = getattr(op, "name", "nyx_model") + _nyx_migration_sql_record("CREATE TABLE " + str(model) + " (id INTEGER)", "django") + elif name == "DeleteModel": + model = getattr(op, "name", "nyx_model") + _nyx_migration_sql_record("DROP TABLE " + str(model), "django") + elif name in ("AddField", "RemoveField", "AlterField"): + model = getattr(op, "model_name", "nyx_model") + field = getattr(op, "name", "nyx_field") + verb = "ADD COLUMN" if name == "AddField" else ("DROP COLUMN" if name == "RemoveField" else "ALTER COLUMN") + _nyx_migration_sql_record("ALTER TABLE " + str(model) + " " + verb + " " + str(field), "django") + +def _nyx_run_django_migration_operations(cls): + if cls is None: + return False + operations = getattr(cls, "operations", None) + if operations is None: + try: + operations = cls().operations + except Exception: + operations = None + if not operations: + return False + for op in list(operations): + _nyx_record_migration_result(op) + return True + try: _nyx_install_migration_sql_hooks() - # Migrations conventionally take no arguments; pass payload if the - # function declares positional params (best-effort introspection). - import inspect - sig = None - try: - sig = inspect.signature(_h) - except (TypeError, ValueError): - sig = None - if sig is not None and len(sig.parameters) >= 1: - _result = _h(payload) + _result = None + if _h is _migration_cls or ({handler:?} == "Migration" and _migration_cls is not None): + _nyx_run_django_migration_operations(_migration_cls) else: - _result = _h() - _nyx_record_migration_result(_result) + # Migrations conventionally take no arguments; pass payload if the + # function declares positional params (best-effort introspection). + import inspect + sig = None + try: + sig = inspect.signature(_h) + except (TypeError, ValueError): + sig = None + if sig is not None and len(sig.parameters) >= 1: + _result = _h(payload) + else: + _result = _h() + _nyx_record_migration_result(_result) + _nyx_run_django_migration_operations(_migration_cls) if _result is not None: try: print(str(_result), flush=True) diff --git a/src/dynamic/sandbox/mod.rs b/src/dynamic/sandbox/mod.rs index d5836dde..00d39555 100644 --- a/src/dynamic/sandbox/mod.rs +++ b/src/dynamic/sandbox/mod.rs @@ -910,6 +910,11 @@ fn rewrite_extra_env_for_container( { return (k.clone(), format!("http://host-gateway:{rest}")); } + if k == "NYX_NATS_ENDPOINT" + && let Some(rest) = v.strip_prefix("nats://127.0.0.1:") + { + return (k.clone(), format!("nats://host-gateway:{rest}")); + } (k.clone(), v.clone()) }) .collect() @@ -2301,7 +2306,7 @@ mod tests { ), ( "NYX_NATS_ENDPOINT".to_owned(), - "http://127.0.0.1:56789/subjects".to_owned(), + "nats://127.0.0.1:56789".to_owned(), ), ]; let out = rewrite_extra_env_for_container(&extra, &[]); @@ -2330,7 +2335,7 @@ mod tests { ), ( "NYX_NATS_ENDPOINT".to_owned(), - "http://host-gateway:56789/subjects".to_owned(), + "nats://host-gateway:56789".to_owned(), ), ] ); diff --git a/src/dynamic/stubs/broker.rs b/src/dynamic/stubs/broker.rs index aa52631e..b4cd4768 100644 --- a/src/dynamic/stubs/broker.rs +++ b/src/dynamic/stubs/broker.rs @@ -33,6 +33,7 @@ pub struct BrokerStub { kafka_listener: Option, sqs_listener: Option, http_listener: Option, + nats_listener: Option, } impl BrokerStub { @@ -54,12 +55,16 @@ impl BrokerStub { } else { None }; - let http_listener = if matches!(kind, StubKind::Pubsub | StubKind::Rabbit | StubKind::Nats) - { + let http_listener = if matches!(kind, StubKind::Pubsub | StubKind::Rabbit) { start_http_broker_listener(kind, log_path.clone())? } else { None }; + let nats_listener = if kind == StubKind::Nats { + start_nats_listener(log_path.clone())? + } else { + None + }; Ok(Self { kind, tempdir: Some(tempdir), @@ -68,6 +73,7 @@ impl BrokerStub { kafka_listener, sqs_listener, http_listener, + nats_listener, }) } @@ -131,6 +137,9 @@ impl StubProvider for BrokerStub { if let Some(listener) = &self.http_listener { return format!("http://127.0.0.1:{}", listener.port); } + if let Some(listener) = &self.nats_listener { + return format!("nats://127.0.0.1:{}", listener.port); + } format!("loopback://{}", self.kind.tag()) } @@ -215,6 +224,10 @@ impl Drop for BrokerStub { listener.shutdown.store(true, Ordering::Relaxed); let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port)); } + if let Some(listener) = &self.nats_listener { + listener.shutdown.store(true, Ordering::Relaxed); + let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port)); + } self.tempdir.take(); } } @@ -675,6 +688,219 @@ fn http_broker_message_json( } } +#[derive(Debug)] +struct NatsListener { + port: u16, + shutdown: Arc, +} + +#[derive(Debug, Clone)] +struct NatsSubscriber { + sid: String, + writer: Arc>, +} + +#[derive(Debug, Default)] +struct NatsState { + subscribers: BTreeMap>, +} + +fn start_nats_listener(log_path: PathBuf) -> std::io::Result> { + let listener = match TcpListener::bind("127.0.0.1:0") { + Ok(listener) => listener, + Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => return Ok(None), + Err(e) => return Err(e), + }; + let port = listener.local_addr()?.port(); + let shutdown = Arc::new(AtomicBool::new(false)); + let state = Arc::new(Mutex::new(NatsState::default())); + let shutdown_clone = Arc::clone(&shutdown); + let state_clone = Arc::clone(&state); + std::thread::spawn(move || { + nats_accept_loop(listener, shutdown_clone, state_clone, log_path, port) + }); + Ok(Some(NatsListener { port, shutdown })) +} + +fn nats_accept_loop( + listener: TcpListener, + shutdown: Arc, + state: Arc>, + log_path: PathBuf, + port: u16, +) { + for stream in listener.incoming() { + if shutdown.load(Ordering::Relaxed) { + break; + } + let Ok(stream) = stream else { continue }; + let _ = stream.set_read_timeout(Some(Duration::from_secs(5))); + let _ = stream.set_write_timeout(Some(Duration::from_secs(5))); + let state = Arc::clone(&state); + let log_path = log_path.clone(); + std::thread::spawn(move || handle_nats_connection(stream, state, &log_path, port)); + } +} + +fn handle_nats_connection( + mut stream: TcpStream, + state: Arc>, + log_path: &Path, + port: u16, +) { + let info = format!( + concat!( + "INFO {{", + r#""server_id":"nyx","#, + r#""server_name":"nyx-broker-stub","#, + r#""version":"0.0.0","#, + r#""proto":1,"#, + r#""go":"rust","#, + r#""host":"127.0.0.1","#, + r#""port":{port},"#, + r#""headers":false,"#, + r#""auth_required":false,"#, + r#""tls_required":false,"#, + r#""max_payload":1048576"#, + "}}\r\n" + ), + port = port + ); + if stream.write_all(info.as_bytes()).is_err() { + return; + } + let writer = match stream.try_clone() { + Ok(stream) => Arc::new(Mutex::new(stream)), + Err(_) => return, + }; + let mut reader = BufReader::new(stream); + let mut owned_sids = Vec::new(); + let mut line = String::new(); + loop { + line.clear(); + let Ok(n) = reader.read_line(&mut line) else { + break; + }; + if n == 0 { + break; + } + let trimmed = line.trim_end_matches(['\r', '\n']); + if trimmed.is_empty() { + continue; + } + let mut parts = trimmed.split_whitespace(); + let Some(command) = parts.next() else { + continue; + }; + match command.to_ascii_uppercase().as_str() { + "CONNECT" => { + let _ = nats_write(&writer, b"+OK\r\n"); + } + "PING" => { + let _ = nats_write(&writer, b"PONG\r\n"); + } + "PONG" | "+OK" => {} + "SUB" => { + let Some(subject) = parts.next() else { + let _ = nats_write(&writer, b"-ERR 'missing subject'\r\n"); + continue; + }; + let fields: Vec<&str> = parts.collect(); + let Some(sid) = fields.last() else { + let _ = nats_write(&writer, b"-ERR 'missing sid'\r\n"); + continue; + }; + if let Ok(mut guard) = state.lock() { + guard + .subscribers + .entry(subject.to_owned()) + .or_default() + .push(NatsSubscriber { + sid: (*sid).to_owned(), + writer: Arc::clone(&writer), + }); + owned_sids.push((*sid).to_owned()); + } + } + "UNSUB" => { + if let Some(sid) = parts.next() { + nats_remove_subscription(&state, sid); + } + } + "PUB" => { + let Some(subject) = parts.next() else { + let _ = nats_write(&writer, b"-ERR 'missing subject'\r\n"); + continue; + }; + let fields: Vec<&str> = parts.collect(); + let Some(size_str) = fields.last() else { + let _ = nats_write(&writer, b"-ERR 'missing size'\r\n"); + continue; + }; + let Ok(size) = size_str.parse::() else { + let _ = nats_write(&writer, b"-ERR 'bad size'\r\n"); + continue; + }; + if size > 1024 * 1024 { + let _ = nats_write(&writer, b"-ERR 'payload too large'\r\n"); + break; + } + let mut payload = vec![0_u8; size]; + if reader.read_exact(&mut payload).is_err() { + break; + } + let mut crlf = [0_u8; 2]; + if reader.read_exact(&mut crlf).is_err() { + break; + } + let payload_text = String::from_utf8_lossy(&payload).into_owned(); + let _ = append_broker_event(log_path, "publish", subject, &payload_text); + nats_deliver(&state, log_path, subject, &payload); + } + _ => { + let _ = nats_write(&writer, b"-ERR 'unknown command'\r\n"); + } + } + } + for sid in owned_sids { + nats_remove_subscription(&state, &sid); + } +} + +fn nats_write(writer: &Arc>, bytes: &[u8]) -> std::io::Result<()> { + let mut guard = writer + .lock() + .map_err(|_| std::io::Error::other("nats writer poisoned"))?; + guard.write_all(bytes) +} + +fn nats_deliver(state: &Arc>, log_path: &Path, subject: &str, payload: &[u8]) { + let subscribers = state + .lock() + .ok() + .and_then(|guard| guard.subscribers.get(subject).cloned()) + .unwrap_or_default(); + let payload_text = String::from_utf8_lossy(payload).into_owned(); + for subscriber in subscribers { + let header = format!("MSG {subject} {} {}\r\n", subscriber.sid, payload.len()); + if nats_write(&subscriber.writer, header.as_bytes()) + .and_then(|_| nats_write(&subscriber.writer, payload)) + .and_then(|_| nats_write(&subscriber.writer, b"\r\n")) + .is_ok() + { + let _ = append_broker_event(log_path, "deliver", subject, &payload_text); + } + } +} + +fn nats_remove_subscription(state: &Arc>, sid: &str) { + if let Ok(mut guard) = state.lock() { + for subscribers in guard.subscribers.values_mut() { + subscribers.retain(|subscriber| subscriber.sid != sid); + } + } +} + fn split_target(target: &str) -> (String, String) { let (path, query) = target.split_once('?').unwrap_or((target, "")); (path.to_owned(), query.to_owned()) @@ -977,7 +1203,7 @@ mod tests { #[test] fn remaining_brokers_expose_http_emulators() { - for kind in [StubKind::Pubsub, StubKind::Rabbit, StubKind::Nats] { + for kind in [StubKind::Pubsub, StubKind::Rabbit] { let dir = TempDir::new().unwrap(); let stub = BrokerStub::start(kind, dir.path()).unwrap(); let endpoint = stub.endpoint(); @@ -991,6 +1217,20 @@ mod tests { } } + #[test] + fn nats_broker_exposes_protocol_endpoint() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Nats, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == "loopback://nats" { + return; + } + assert!( + endpoint.starts_with("nats://127.0.0.1:"), + "NATS endpoint should be a protocol-compatible endpoint, got {endpoint}" + ); + } + #[test] fn kafka_http_emulator_records_publish_deliver_ack() { let dir = TempDir::new().unwrap(); @@ -1080,7 +1320,6 @@ mod tests { let cases = [ (StubKind::Pubsub, "topics", "projects/p/topics/orders"), (StubKind::Rabbit, "queues", "work"), - (StubKind::Nats, "subjects", "events"), ]; for (kind, root, destination) in cases { let dir = TempDir::new().unwrap(); @@ -1138,6 +1377,50 @@ mod tests { } } + #[test] + fn nats_protocol_server_records_publish_deliver() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Nats, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == "loopback://nats" { + return; + } + let port: u16 = endpoint + .trim_start_matches("nats://127.0.0.1:") + .parse() + .unwrap(); + let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap(); + let mut reader = BufReader::new(s.try_clone().unwrap()); + let mut line = String::new(); + reader.read_line(&mut line).unwrap(); + assert!(line.starts_with("INFO "), "{line}"); + + s.write_all(b"CONNECT {\"verbose\":false}\r\nPING\r\n") + .unwrap(); + let handshake = read_until(&mut reader, "PONG\r\n"); + assert!(handshake.contains("PONG"), "{handshake}"); + + s.write_all(b"SUB events 1\r\nPING\r\n").unwrap(); + let flush = read_until(&mut reader, "PONG\r\n"); + assert!(flush.contains("PONG"), "{flush}"); + + s.write_all(b"PUB events 11\r\nhello world\r\n").unwrap(); + let delivery = read_until(&mut reader, "hello world\r\n"); + assert!( + delivery.contains("MSG events 1 11\r\nhello world\r\n"), + "{delivery:?}" + ); + + let events = stub.drain_events(); + let actions: Vec<&str> = events + .iter() + .map(|ev| ev.detail.get("action").unwrap().as_str()) + .collect(); + assert_eq!(actions, vec!["publish", "deliver"]); + assert_eq!(events[0].detail.get("destination").unwrap(), "events"); + assert_eq!(events[1].detail.get("payload").unwrap(), "hello world"); + } + #[test] fn broker_drain_understands_delivery_and_ack_events() { let dir = TempDir::new().unwrap(); @@ -1189,6 +1472,29 @@ mod tests { response.split("\r\n\r\n").nth(1).unwrap_or("") } + fn read_until(reader: &mut BufReader, needle: &str) -> String { + let mut out = String::new(); + while !out.contains(needle) { + let mut line = String::new(); + let n = reader.read_line(&mut line).unwrap(); + if n == 0 { + break; + } + out.push_str(&line); + if line.starts_with("MSG ") { + let size = line + .split_whitespace() + .last() + .and_then(|s| s.parse::().ok()) + .unwrap(); + let mut payload = vec![0_u8; size + 2]; + reader.read_exact(&mut payload).unwrap(); + out.push_str(&String::from_utf8_lossy(&payload)); + } + } + out + } + fn form_escape(input: &str) -> String { let mut out = String::new(); for b in input.bytes() { diff --git a/tests/dynamic_fixtures/migration/django_ops/vuln.py b/tests/dynamic_fixtures/migration/django_ops/vuln.py new file mode 100644 index 00000000..34b1e584 --- /dev/null +++ b/tests/dynamic_fixtures/migration/django_ops/vuln.py @@ -0,0 +1,19 @@ +"""Phase 21 — Django Migration.operations runtime fixture.""" +_NYX_ADAPTER_MARKER = "from django.db import migrations" + +import os + + +class _RunSQL: + def __init__(self, sql): + self.sql = sql + + +class Migration: + operations = [ + _RunSQL( + "CREATE INDEX idx_" + + (os.environ.get("NYX_PAYLOAD") or "users") + + " ON users(name)" + ) + ] diff --git a/tests/message_handler_corpus.rs b/tests/message_handler_corpus.rs index d63a9d14..f47a21df 100644 --- a/tests/message_handler_corpus.rs +++ b/tests/message_handler_corpus.rs @@ -395,6 +395,27 @@ fn message_handler_remaining_brokers_try_http_emulators_before_loopback() { } } +#[test] +fn message_handler_nats_go_tries_real_client_before_fallbacks() { + let spec = make_spec_with_adapter( + Lang::Go, + "events", + "OnMessage", + entry_file("nats_go"), + "nats-go", + ); + let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("nyxTryRealNats")); + assert!(h.source.contains("github.com/nats-io/nats.go")); + assert!(h.source.contains("nats.Connect")); + assert!(h.source.contains("nc.Subscribe")); + assert!(h.source.contains("nc.Publish")); + assert!( + h.source.find("nyxTryRealNats").unwrap() < h.source.find("nyxFetchHttpBroker").unwrap(), + "nats-go should try the real protocol client before the HTTP fallback" + ); +} + // ── Framework-adapter assertions ────────────────────────────────────────────── fn ts_language_for(lang: Lang) -> tree_sitter::Language { diff --git a/tests/phase21_corpus.rs b/tests/phase21_corpus.rs index 9b471bf5..365b9914 100644 --- a/tests/phase21_corpus.rs +++ b/tests/phase21_corpus.rs @@ -911,6 +911,9 @@ fn migration_python_harness_carries_sentinel_and_handler() { assert!(h.source.contains("__nyx_stub_sql_record")); assert!(h.source.contains("MigrationContext.configure")); assert!(h.source.contains("NYX_SQL_ENDPOINT")); + assert!(h.source.contains("def create_table")); + assert!(h.source.contains("def add_column")); + assert!(h.source.contains("_nyx_run_django_migration_operations")); } #[test] @@ -1582,6 +1585,27 @@ fn phase_21_vuln_fixtures_confirm_via_run_spec() { } } +#[test] +fn migration_django_operations_class_confirms_via_run_spec() { + let case = RunSpecCase { + name: "migration-django-operations", + lang: Lang::Python, + kind: migration_kind, + entry_name: "Migration", + fixture_dir: "tests/dynamic_fixtures/migration/django_ops", + vuln_file: "vuln.py", + benign_file: "vuln.py", + cap: Cap::SQL_QUERY, + }; + let Some(outcome) = run_phase21_case(case, case.vuln_file) else { + return; + }; + assert!( + outcome.triggered_by.is_some(), + "Django Migration.operations fixture must Confirm via run_spec; got {outcome:?}", + ); +} + #[test] fn phase_21_benign_fixtures_do_not_confirm_via_run_spec() { for case in RUNSPEC_CASES {