From 09032311894f663184896439ffcb638f6bbb2ce2 Mon Sep 17 00:00:00 2001 From: elipeter Date: Wed, 27 May 2026 08:23:48 -0500 Subject: [PATCH] refactor(dynamic): enhance event recording across brokers, improve SQL migration handling for frameworks, update runtime dependency management, and add test coverage --- src/dynamic/framework/runtime_deps.rs | 14 +++-- src/dynamic/lang/go.rs | 19 ++++++- src/dynamic/lang/java.rs | 7 +++ src/dynamic/lang/js_shared.rs | 23 +++++++- src/dynamic/lang/php.rs | 5 ++ src/dynamic/lang/python.rs | 61 ++++++++++++++++++++- src/dynamic/lang/ruby.rs | 22 ++++++++ tests/message_handler_corpus.rs | 78 +++++++++++++++++++++++++++ tests/phase21_corpus.rs | 5 ++ 9 files changed, 225 insertions(+), 9 deletions(-) diff --git a/src/dynamic/framework/runtime_deps.rs b/src/dynamic/framework/runtime_deps.rs index 3bd6c179..532c323d 100644 --- a/src/dynamic/framework/runtime_deps.rs +++ b/src/dynamic/framework/runtime_deps.rs @@ -137,10 +137,16 @@ const NODE_PRISMA: &[VersionedPackage] = &[VersionedPackage { name: "@prisma/client", version: "^5.14.0", }]; -const NODE_SEQUELIZE: &[VersionedPackage] = &[VersionedPackage { - name: "sequelize", - version: "^6.37.3", -}]; +const NODE_SEQUELIZE: &[VersionedPackage] = &[ + VersionedPackage { + name: "sequelize", + version: "^6.37.3", + }, + VersionedPackage { + name: "sqlite3", + version: "^5.1.7", + }, +]; const RUBY_RACK: &[&str] = &["rack"]; const RUBY_SINATRA: &[&str] = &["rack", "sinatra"]; diff --git a/src/dynamic/lang/go.rs b/src/dynamic/lang/go.rs index 6575b984..19803163 100644 --- a/src/dynamic/lang/go.rs +++ b/src/dynamic/lang/go.rs @@ -2162,7 +2162,9 @@ fn emit_message_handler_harness(spec: &HarnessSpec, queue: &str) -> HarnessSourc format!( r##" broker := NewNyxNatsLoopback() broker.Subscribe("{queue}", func(msg *NyxNatsMsg) {{ + nyxRecordBrokerEvent("NYX_NATS_LOG", "deliver", "{queue}", string(msg.Data)) nyxDispatch(msg) + nyxRecordBrokerEvent("NYX_NATS_LOG", "ack", "{queue}", msg.Subject) }}) fmt.Println("{publish_marker} " + "{queue}") nyxRecordBrokerPublish("NYX_NATS_LOG", "{queue}", payload) @@ -2177,7 +2179,10 @@ fn emit_message_handler_harness(spec: &HarnessSpec, queue: &str) -> HarnessSourc format!( r##" broker := NewNyxPubsubLoopback() broker.Subscribe("{queue}", func(msg *NyxPubsubMessage) {{ + nyxRecordBrokerEvent("NYX_PUBSUB_LOG", "deliver", "{queue}", string(msg.Data)) nyxDispatch(msg) + msg.Ack() + nyxRecordBrokerEvent("NYX_PUBSUB_LOG", "ack", "{queue}", msg.ID) }}) fmt.Println("{publish_marker} " + "{queue}") nyxRecordBrokerPublish("NYX_PUBSUB_LOG", "{queue}", payload) @@ -2261,7 +2266,7 @@ func nyxPayload() string {{ return "" }} -func nyxRecordBrokerPublish(envName string, destination string, payload string) {{ +func nyxRecordBrokerEvent(envName string, action string, destination string, payload string) {{ path := os.Getenv(envName) if path == "" {{ return @@ -2271,7 +2276,17 @@ func nyxRecordBrokerPublish(envName string, destination string, payload string) return }} defer f.Close() - _, _ = fmt.Fprintf(f, "%s\t%s\n", strings.ReplaceAll(destination, "\t", " "), payload) + _, _ = fmt.Fprintf( + f, + "%s\t%s\t%s\n", + strings.ReplaceAll(action, "\t", " "), + strings.ReplaceAll(destination, "\t", " "), + payload, + ) +}} + +func nyxRecordBrokerPublish(envName string, destination string, payload string) {{ + nyxRecordBrokerEvent(envName, "publish", destination, payload) }} func main() {{ diff --git a/src/dynamic/lang/java.rs b/src/dynamic/lang/java.rs index f02e00f2..20de0bfa 100644 --- a/src/dynamic/lang/java.rs +++ b/src/dynamic/lang/java.rs @@ -3834,16 +3834,20 @@ fn emit_message_handler_harness( format!( r#" NyxRabbitChannel chan = new NyxRabbitChannel(); chan.basicConsume({queue:?}, (mid, body) -> {{ + nyxRecordBrokerEvent("NYX_RABBIT_LOG", "deliver", {queue:?}, body); System.out.println("__NYX_SINK_HIT__"); + boolean success = false; try {{ java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, String.class, String.class); m.setAccessible(true); m.invoke(entryInst, mid, body); + success = true; }} catch (NoSuchMethodException nsme) {{ try {{ java.lang.reflect.Method m2 = entryInst.getClass().getDeclaredMethod({handler:?}, String.class); m2.setAccessible(true); m2.invoke(entryInst, body); + success = true; }} catch (Exception ie) {{ Throwable c = (ie instanceof java.lang.reflect.InvocationTargetException && ie.getCause() != null) ? ie.getCause() : ie; System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); @@ -3852,6 +3856,9 @@ fn emit_message_handler_harness( Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); }} + if (success) {{ + nyxRecordBrokerEvent("NYX_RABBIT_LOG", "ack", {queue:?}, mid); + }} }}); System.out.println({publish_marker:?} + " " + {queue:?}); nyxRecordBrokerPublish("NYX_RABBIT_LOG", {queue:?}, payload); diff --git a/src/dynamic/lang/js_shared.rs b/src/dynamic/lang/js_shared.rs index c2b3dffd..d789552e 100644 --- a/src/dynamic/lang/js_shared.rs +++ b/src/dynamic/lang/js_shared.rs @@ -1231,8 +1231,24 @@ function _nyxMigrationSqlRecord(sql, driver) {{ const sqliteDriver = _nyxTryExecuteSqlite(sql); __nyx_stub_sql_record(String(sql), {{ driver: driver, source: 'migration', sqlite_driver: sqliteDriver }}); }} +function _nyxTryRealSequelize() {{ + try {{ + const SequelizeLib = require('sequelize'); + const SequelizeCtor = SequelizeLib.Sequelize || SequelizeLib; + const endpoint = process.env.NYX_SQL_ENDPOINT || ':memory:'; + const sequelize = new SequelizeCtor({{ dialect: 'sqlite', storage: endpoint, logging: false }}); + return {{ + queryInterface: sequelize.getQueryInterface(), + Sequelize: SequelizeLib, + close: async function() {{ try {{ await sequelize.close(); }} catch (e) {{}} }}, + }}; + }} catch (e) {{ + return null; + }} +}} +const _realSequelize = _nyxTryRealSequelize(); // QueryInterface shim for sequelize-style up/down(queryInterface, Sequelize). -const _qi = {{ +const _qi = _realSequelize ? _realSequelize.queryInterface : {{ createTable: async function(name){{ const sql = 'CREATE TABLE ' + String(name) + ' (id INTEGER)'; _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }}, addColumn: async function(table, column){{ const sql = 'ALTER TABLE ' + String(table) + ' ADD COLUMN ' + String(column) + ' TEXT'; _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }}, dropTable: async function(name){{ const sql = 'DROP TABLE ' + String(name); _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }}, @@ -1240,6 +1256,7 @@ const _qi = {{ bulkInsert: async function(table){{ const sql = 'INSERT INTO ' + String(table) + ' VALUES (...)'; _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }}, sequelize: {{ query: async function(sql){{ _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }} }}, }}; +const _sequelizeNamespace = _realSequelize ? _realSequelize.Sequelize : {{}}; const _prisma = {{ $executeRaw: async function(s){{ if (s) _nyxMigrationSqlRecord(s, 'prisma'); return s; }}, $executeRawUnsafe: async function(s){{ if (s) {{ _nyxMigrationSqlRecord(s, 'prisma'); process.stdout.write('NYX_PRISMA_SQL: ' + s + '\n'); }} return s; }}, @@ -1254,7 +1271,7 @@ global.__nyx_prisma = _prisma; // Single-arg migrations are Prisma/raw shapes and should receive payload. try {{ if (_h.length >= 2) {{ - _result = await Promise.resolve(_h(_qi, {{}})); + _result = await Promise.resolve(_h(_qi, _sequelizeNamespace)); }} else {{ _result = await Promise.resolve(_h(payload)); }} @@ -1269,6 +1286,8 @@ global.__nyx_prisma = _prisma; if (_result != null) process.stdout.write(String(_result) + '\n'); }} catch (e) {{ process.stderr.write('NYX_EXCEPTION: ' + (e.constructor ? e.constructor.name : 'Error') + ': ' + e.message + '\n'); + }} finally {{ + if (_realSequelize && _realSequelize.close) await _realSequelize.close(); }} }})(); "#, diff --git a/src/dynamic/lang/php.rs b/src/dynamic/lang/php.rs index eaa64ba0..7297d4eb 100644 --- a/src/dynamic/lang/php.rs +++ b/src/dynamic/lang/php.rs @@ -3091,6 +3091,11 @@ if ((!$payload || $payload === '') && is_string($_b64) && $_b64 !== '') {{ if ($decoded !== false) $payload = $decoded; }} +$autoload = __DIR__ . '/vendor/autoload.php'; +if (is_file($autoload)) {{ + require_once $autoload; +}} + try {{ require_once __DIR__ . '/entry.php'; }} catch (Throwable $e) {{ diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index bb5ee776..5dd1b335 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -985,7 +985,11 @@ def _nyx_pubsub_dispatch(message): if _h is None: print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) sys.exit(78) + _nyx_record_broker_event("NYX_PUBSUB_LOG", "deliver", {queue:?}, getattr(message, "data", message)) _h(message) + if hasattr(message, "ack"): + message.ack() + _nyx_record_broker_event("NYX_PUBSUB_LOG", "ack", {queue:?}, getattr(message, "message_id", "")) _loop.subscribe({queue:?}, _nyx_pubsub_dispatch) print({publish_marker:?} + " " + {queue:?}, flush=True) _nyx_record_broker_publish("NYX_PUBSUB_LOG", {queue:?}, payload) @@ -1001,7 +1005,9 @@ def _nyx_rabbit_dispatch(ch, method, props, body): if _h is None: print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) sys.exit(78) + _nyx_record_broker_event("NYX_RABBIT_LOG", "deliver", {queue:?}, body) _h(ch, method, props, body) + _nyx_record_broker_event("NYX_RABBIT_LOG", "ack", {queue:?}, getattr(method, "delivery_tag", "")) _chan.basic_consume(queue={queue:?}, on_message_callback=_nyx_rabbit_dispatch) print({publish_marker:?} + " " + {queue:?}, flush=True) _nyx_record_broker_publish("NYX_RABBIT_LOG", {queue:?}, payload) @@ -1323,11 +1329,40 @@ class _NyxMigrationOpProxy: if self._inner is not None and self._inner is not self and hasattr(self._inner, "execute"): return self._inner.execute(sql, *args, **kwargs) return None + def __getattr__(self, name): + if self._inner is not None and self._inner is not self: + return getattr(self._inner, name) + raise AttributeError(name) + +_nyx_migration_cleanup = None + +def _nyx_real_alembic_operations(): + endpoint = os.environ.get("NYX_SQL_ENDPOINT", "") + url = "sqlite:///" + endpoint if endpoint else "sqlite:///:memory:" + try: + from sqlalchemy import create_engine + from alembic.migration import MigrationContext + from alembic.operations import Operations + engine = create_engine(url) + conn = engine.connect() + ctx = MigrationContext.configure(conn) + ops = Operations(ctx) + def _cleanup(): + try: + conn.close() + finally: + engine.dispose() + return ops, _cleanup + except Exception: + return None, None def _nyx_install_migration_sql_hooks(): + global _nyx_migration_cleanup if hasattr(_entry_mod, "op"): try: - _entry_mod.op = _NyxMigrationOpProxy(getattr(_entry_mod, "op")) + real_ops, cleanup = _nyx_real_alembic_operations() + _nyx_migration_cleanup = cleanup + _entry_mod.op = _NyxMigrationOpProxy(real_ops or getattr(_entry_mod, "op")) except Exception: pass @@ -1339,6 +1374,26 @@ def _nyx_record_migration_result(result): _nyx_migration_sql_record(sql, "django") elif isinstance(result, str): _nyx_migration_sql_record(result, "migration") + elif hasattr(result, "database_forwards"): + sql = getattr(result, "sql", None) + if sql is not None: + _nyx_migration_sql_record(sql, "django") + try: + from django.conf import settings + if not settings.configured: + endpoint = os.environ.get("NYX_SQL_ENDPOINT", ":memory:") + settings.configure( + INSTALLED_APPS=[], + DATABASES={{"default": {{"ENGINE": "django.db.backends.sqlite3", "NAME": endpoint}}}}, + SECRET_KEY="nyx-dynamic-harness", + ) + import django + django.setup() + from django.db import connection + with connection.schema_editor() as schema_editor: + result.database_forwards("nyx_dynamic", schema_editor, None, None) + except Exception: + pass try: _nyx_install_migration_sql_hooks() @@ -1360,10 +1415,14 @@ try: print(str(_result), flush=True) except Exception: pass + if _nyx_migration_cleanup is not None: + _nyx_migration_cleanup() except SystemExit as _e: sys.exit(_e.code) except Exception as _e: print(f"NYX_EXCEPTION: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) + if _nyx_migration_cleanup is not None: + _nyx_migration_cleanup() "#, version = version_repr, handler = handler, diff --git a/src/dynamic/lang/ruby.rs b/src/dynamic/lang/ruby.rs index 5c91aa68..b7511fce 100644 --- a/src/dynamic/lang/ruby.rs +++ b/src/dynamic/lang/ruby.rs @@ -718,6 +718,22 @@ end $nyx_payload = nyx_payload +begin + require 'bundler/setup' if File.exist?(File.join(__dir__, 'Gemfile')) +rescue LoadError +end + +begin + require 'active_record' + endpoint = ENV['NYX_SQL_ENDPOINT'] + if endpoint && !endpoint.empty? + ActiveRecord::Base.establish_connection(adapter: 'sqlite3', database: endpoint) + ActiveRecord::Migration.verbose = false if defined?(ActiveRecord::Migration) + end +rescue LoadError, StandardError => e + STDERR.puts('NYX_ACTIVE_RECORD_BOOTSTRAP_SKIPPED: ' + e.class.name + ': ' + e.message) if ENV['NYX_DEBUG'] +end + begin require_relative './entry' rescue LoadError, ScriptError => e @@ -940,6 +956,12 @@ if Object.const_defined?({handler:?}) cls = Object.const_get({handler:?}) begin inst = cls.new + if inst.respond_to?(:table_name=) + begin + inst.table_name = $nyx_payload + rescue StandardError + end + end if inst.respond_to?(:execute, true) original_execute = inst.method(:execute) inst.define_singleton_method(:execute) do |sql, *args, &blk| diff --git a/tests/message_handler_corpus.rs b/tests/message_handler_corpus.rs index 6971be53..de483140 100644 --- a/tests/message_handler_corpus.rs +++ b/tests/message_handler_corpus.rs @@ -79,6 +79,28 @@ fn make_spec(lang: Lang, queue: &str, handler: &str, fixture: &str) -> HarnessSp } } +fn make_spec_with_adapter( + lang: Lang, + queue: &str, + handler: &str, + fixture: &str, + adapter: &str, +) -> HarnessSpec { + let mut spec = make_spec(lang, queue, handler, fixture); + spec.framework = Some(FrameworkBinding { + adapter: adapter.to_owned(), + kind: EntryKind::MessageHandler { + queue: queue.to_owned(), + message_schema: None, + }, + route: None, + request_params: vec![], + response_writer: None, + middleware: vec![], + }); + spec +} + // ── Supported-set assertions ────────────────────────────────────────────────── #[test] @@ -205,6 +227,62 @@ fn message_handler_go_uses_nyx_handlers_registry() { assert!(h.source.contains("nyxRecordBrokerPublish")); } +#[test] +fn message_handler_remaining_brokers_emit_delivery_and_ack_events() { + let cases = [ + ( + Lang::Python, + "pubsub_python", + "projects/p/subscriptions/s", + "callback", + "pubsub-python", + "NYX_PUBSUB_LOG", + ), + ( + Lang::Python, + "rabbit_python", + "work", + "on_message", + "rabbit-python", + "NYX_RABBIT_LOG", + ), + ( + Lang::Java, + "rabbit_java", + "work", + "onMessage", + "rabbit-java", + "NYX_RABBIT_LOG", + ), + ( + Lang::Go, + "nats_go", + "events", + "OnMessage", + "nats-go", + "NYX_NATS_LOG", + ), + ]; + for (lang, fixture, queue, handler, adapter, log_env) in cases { + let spec = make_spec_with_adapter(lang, queue, handler, entry_file(fixture), adapter); + let h = lang::emit(&spec).expect("emit ok"); + assert!( + h.source.contains(log_env), + "{adapter} harness must write the broker log env var", + ); + assert!( + h.source.contains("\"deliver\"") || h.source.contains("'deliver'"), + "{adapter} harness must record delivery events: {}", + h.source + ); + assert!( + h.source.contains("\"ack\"") || h.source.contains("'ack'"), + "{adapter} harness must record ack events: {}", + h.source + ); + } +} + // ── Framework-adapter assertions ────────────────────────────────────────────── fn ts_language_for(lang: Lang) -> tree_sitter::Language { diff --git a/tests/phase21_corpus.rs b/tests/phase21_corpus.rs index e0c6094d..72b506bf 100644 --- a/tests/phase21_corpus.rs +++ b/tests/phase21_corpus.rs @@ -909,6 +909,7 @@ fn migration_python_harness_carries_sentinel_and_handler() { assert!(h.source.contains("__NYX_MIGRATION__")); assert!(h.source.contains("\"upgrade\"")); assert!(h.source.contains("__nyx_stub_sql_record")); + assert!(h.source.contains("MigrationContext.configure")); assert!(h.source.contains("NYX_SQL_ENDPOINT")); } @@ -924,6 +925,8 @@ fn migration_js_harness_carries_sentinel_and_handler() { assert!(h.source.contains("__NYX_MIGRATION__")); assert!(h.source.contains("\"up\"")); assert!(h.source.contains("__nyx_stub_sql_record")); + assert!(h.source.contains("require('sequelize')")); + assert!(h.source.contains("getQueryInterface")); assert!(h.source.contains("global.__nyx_prisma")); assert!(h.source.contains("node:sqlite")); assert!(h.source.contains("NYX_SQL_ENDPOINT")); @@ -941,6 +944,7 @@ fn migration_ruby_harness_carries_sentinel_and_handler() { assert!(h.source.contains("__NYX_MIGRATION__")); assert!(h.source.contains("AddIndex")); assert!(h.source.contains("__nyx_stub_sql_record")); + assert!(h.source.contains("ActiveRecord::Base.establish_connection")); assert!(h.source.contains("SQLite3::Database")); assert!(h.source.contains("NYX_SQL_ENDPOINT")); } @@ -957,6 +961,7 @@ fn migration_php_harness_carries_sentinel_and_handler() { assert!(h.source.contains("__NYX_MIGRATION__")); assert!(h.source.contains("AddUsers")); assert!(h.source.contains("__nyx_stub_sql_record")); + assert!(h.source.contains("vendor/autoload.php")); assert!(h.source.contains("new SQLite3")); assert!(h.source.contains("NYX_SQL_ENDPOINT")); }