refactor(dynamic): enhance event recording across brokers, improve SQL migration handling for frameworks, update runtime dependency management, and add test coverage

This commit is contained in:
elipeter 2026-05-27 08:23:48 -05:00
parent 9bf085ee48
commit 0903231189
9 changed files with 225 additions and 9 deletions

View file

@ -137,10 +137,16 @@ const NODE_PRISMA: &[VersionedPackage] = &[VersionedPackage {
name: "@prisma/client",
version: "^5.14.0",
}];
const NODE_SEQUELIZE: &[VersionedPackage] = &[VersionedPackage {
name: "sequelize",
version: "^6.37.3",
}];
const NODE_SEQUELIZE: &[VersionedPackage] = &[
VersionedPackage {
name: "sequelize",
version: "^6.37.3",
},
VersionedPackage {
name: "sqlite3",
version: "^5.1.7",
},
];
const RUBY_RACK: &[&str] = &["rack"];
const RUBY_SINATRA: &[&str] = &["rack", "sinatra"];

View file

@ -2162,7 +2162,9 @@ fn emit_message_handler_harness(spec: &HarnessSpec, queue: &str) -> HarnessSourc
format!(
r##" broker := NewNyxNatsLoopback()
broker.Subscribe("{queue}", func(msg *NyxNatsMsg) {{
nyxRecordBrokerEvent("NYX_NATS_LOG", "deliver", "{queue}", string(msg.Data))
nyxDispatch(msg)
nyxRecordBrokerEvent("NYX_NATS_LOG", "ack", "{queue}", msg.Subject)
}})
fmt.Println("{publish_marker} " + "{queue}")
nyxRecordBrokerPublish("NYX_NATS_LOG", "{queue}", payload)
@ -2177,7 +2179,10 @@ fn emit_message_handler_harness(spec: &HarnessSpec, queue: &str) -> HarnessSourc
format!(
r##" broker := NewNyxPubsubLoopback()
broker.Subscribe("{queue}", func(msg *NyxPubsubMessage) {{
nyxRecordBrokerEvent("NYX_PUBSUB_LOG", "deliver", "{queue}", string(msg.Data))
nyxDispatch(msg)
msg.Ack()
nyxRecordBrokerEvent("NYX_PUBSUB_LOG", "ack", "{queue}", msg.ID)
}})
fmt.Println("{publish_marker} " + "{queue}")
nyxRecordBrokerPublish("NYX_PUBSUB_LOG", "{queue}", payload)
@ -2261,7 +2266,7 @@ func nyxPayload() string {{
return ""
}}
func nyxRecordBrokerPublish(envName string, destination string, payload string) {{
func nyxRecordBrokerEvent(envName string, action string, destination string, payload string) {{
path := os.Getenv(envName)
if path == "" {{
return
@ -2271,7 +2276,17 @@ func nyxRecordBrokerPublish(envName string, destination string, payload string)
return
}}
defer f.Close()
_, _ = fmt.Fprintf(f, "%s\t%s\n", strings.ReplaceAll(destination, "\t", " "), payload)
_, _ = fmt.Fprintf(
f,
"%s\t%s\t%s\n",
strings.ReplaceAll(action, "\t", " "),
strings.ReplaceAll(destination, "\t", " "),
payload,
)
}}
func nyxRecordBrokerPublish(envName string, destination string, payload string) {{
nyxRecordBrokerEvent(envName, "publish", destination, payload)
}}
func main() {{

View file

@ -3834,16 +3834,20 @@ fn emit_message_handler_harness(
format!(
r#" NyxRabbitChannel chan = new NyxRabbitChannel();
chan.basicConsume({queue:?}, (mid, body) -> {{
nyxRecordBrokerEvent("NYX_RABBIT_LOG", "deliver", {queue:?}, body);
System.out.println("__NYX_SINK_HIT__");
boolean success = false;
try {{
java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, String.class, String.class);
m.setAccessible(true);
m.invoke(entryInst, mid, body);
success = true;
}} catch (NoSuchMethodException nsme) {{
try {{
java.lang.reflect.Method m2 = entryInst.getClass().getDeclaredMethod({handler:?}, String.class);
m2.setAccessible(true);
m2.invoke(entryInst, body);
success = true;
}} catch (Exception ie) {{
Throwable c = (ie instanceof java.lang.reflect.InvocationTargetException && ie.getCause() != null) ? ie.getCause() : ie;
System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage());
@ -3852,6 +3856,9 @@ fn emit_message_handler_harness(
Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e;
System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage());
}}
if (success) {{
nyxRecordBrokerEvent("NYX_RABBIT_LOG", "ack", {queue:?}, mid);
}}
}});
System.out.println({publish_marker:?} + " " + {queue:?});
nyxRecordBrokerPublish("NYX_RABBIT_LOG", {queue:?}, payload);

View file

@ -1231,8 +1231,24 @@ function _nyxMigrationSqlRecord(sql, driver) {{
const sqliteDriver = _nyxTryExecuteSqlite(sql);
__nyx_stub_sql_record(String(sql), {{ driver: driver, source: 'migration', sqlite_driver: sqliteDriver }});
}}
function _nyxTryRealSequelize() {{
try {{
const SequelizeLib = require('sequelize');
const SequelizeCtor = SequelizeLib.Sequelize || SequelizeLib;
const endpoint = process.env.NYX_SQL_ENDPOINT || ':memory:';
const sequelize = new SequelizeCtor({{ dialect: 'sqlite', storage: endpoint, logging: false }});
return {{
queryInterface: sequelize.getQueryInterface(),
Sequelize: SequelizeLib,
close: async function() {{ try {{ await sequelize.close(); }} catch (e) {{}} }},
}};
}} catch (e) {{
return null;
}}
}}
const _realSequelize = _nyxTryRealSequelize();
// QueryInterface shim for sequelize-style up/down(queryInterface, Sequelize).
const _qi = {{
const _qi = _realSequelize ? _realSequelize.queryInterface : {{
createTable: async function(name){{ const sql = 'CREATE TABLE ' + String(name) + ' (id INTEGER)'; _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }},
addColumn: async function(table, column){{ const sql = 'ALTER TABLE ' + String(table) + ' ADD COLUMN ' + String(column) + ' TEXT'; _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }},
dropTable: async function(name){{ const sql = 'DROP TABLE ' + String(name); _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }},
@ -1240,6 +1256,7 @@ const _qi = {{
bulkInsert: async function(table){{ const sql = 'INSERT INTO ' + String(table) + ' VALUES (...)'; _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }},
sequelize: {{ query: async function(sql){{ _nyxMigrationSqlRecord(sql, 'sequelize'); return sql; }} }},
}};
const _sequelizeNamespace = _realSequelize ? _realSequelize.Sequelize : {{}};
const _prisma = {{
$executeRaw: async function(s){{ if (s) _nyxMigrationSqlRecord(s, 'prisma'); return s; }},
$executeRawUnsafe: async function(s){{ if (s) {{ _nyxMigrationSqlRecord(s, 'prisma'); process.stdout.write('NYX_PRISMA_SQL: ' + s + '\n'); }} return s; }},
@ -1254,7 +1271,7 @@ global.__nyx_prisma = _prisma;
// Single-arg migrations are Prisma/raw shapes and should receive payload.
try {{
if (_h.length >= 2) {{
_result = await Promise.resolve(_h(_qi, {{}}));
_result = await Promise.resolve(_h(_qi, _sequelizeNamespace));
}} else {{
_result = await Promise.resolve(_h(payload));
}}
@ -1269,6 +1286,8 @@ global.__nyx_prisma = _prisma;
if (_result != null) process.stdout.write(String(_result) + '\n');
}} catch (e) {{
process.stderr.write('NYX_EXCEPTION: ' + (e.constructor ? e.constructor.name : 'Error') + ': ' + e.message + '\n');
}} finally {{
if (_realSequelize && _realSequelize.close) await _realSequelize.close();
}}
}})();
"#,

View file

@ -3091,6 +3091,11 @@ if ((!$payload || $payload === '') && is_string($_b64) && $_b64 !== '') {{
if ($decoded !== false) $payload = $decoded;
}}
$autoload = __DIR__ . '/vendor/autoload.php';
if (is_file($autoload)) {{
require_once $autoload;
}}
try {{
require_once __DIR__ . '/entry.php';
}} catch (Throwable $e) {{

View file

@ -985,7 +985,11 @@ def _nyx_pubsub_dispatch(message):
if _h is None:
print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True)
sys.exit(78)
_nyx_record_broker_event("NYX_PUBSUB_LOG", "deliver", {queue:?}, getattr(message, "data", message))
_h(message)
if hasattr(message, "ack"):
message.ack()
_nyx_record_broker_event("NYX_PUBSUB_LOG", "ack", {queue:?}, getattr(message, "message_id", ""))
_loop.subscribe({queue:?}, _nyx_pubsub_dispatch)
print({publish_marker:?} + " " + {queue:?}, flush=True)
_nyx_record_broker_publish("NYX_PUBSUB_LOG", {queue:?}, payload)
@ -1001,7 +1005,9 @@ def _nyx_rabbit_dispatch(ch, method, props, body):
if _h is None:
print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True)
sys.exit(78)
_nyx_record_broker_event("NYX_RABBIT_LOG", "deliver", {queue:?}, body)
_h(ch, method, props, body)
_nyx_record_broker_event("NYX_RABBIT_LOG", "ack", {queue:?}, getattr(method, "delivery_tag", ""))
_chan.basic_consume(queue={queue:?}, on_message_callback=_nyx_rabbit_dispatch)
print({publish_marker:?} + " " + {queue:?}, flush=True)
_nyx_record_broker_publish("NYX_RABBIT_LOG", {queue:?}, payload)
@ -1323,11 +1329,40 @@ class _NyxMigrationOpProxy:
if self._inner is not None and self._inner is not self and hasattr(self._inner, "execute"):
return self._inner.execute(sql, *args, **kwargs)
return None
def __getattr__(self, name):
if self._inner is not None and self._inner is not self:
return getattr(self._inner, name)
raise AttributeError(name)
_nyx_migration_cleanup = None
def _nyx_real_alembic_operations():
endpoint = os.environ.get("NYX_SQL_ENDPOINT", "")
url = "sqlite:///" + endpoint if endpoint else "sqlite:///:memory:"
try:
from sqlalchemy import create_engine
from alembic.migration import MigrationContext
from alembic.operations import Operations
engine = create_engine(url)
conn = engine.connect()
ctx = MigrationContext.configure(conn)
ops = Operations(ctx)
def _cleanup():
try:
conn.close()
finally:
engine.dispose()
return ops, _cleanup
except Exception:
return None, None
def _nyx_install_migration_sql_hooks():
global _nyx_migration_cleanup
if hasattr(_entry_mod, "op"):
try:
_entry_mod.op = _NyxMigrationOpProxy(getattr(_entry_mod, "op"))
real_ops, cleanup = _nyx_real_alembic_operations()
_nyx_migration_cleanup = cleanup
_entry_mod.op = _NyxMigrationOpProxy(real_ops or getattr(_entry_mod, "op"))
except Exception:
pass
@ -1339,6 +1374,26 @@ def _nyx_record_migration_result(result):
_nyx_migration_sql_record(sql, "django")
elif isinstance(result, str):
_nyx_migration_sql_record(result, "migration")
elif hasattr(result, "database_forwards"):
sql = getattr(result, "sql", None)
if sql is not None:
_nyx_migration_sql_record(sql, "django")
try:
from django.conf import settings
if not settings.configured:
endpoint = os.environ.get("NYX_SQL_ENDPOINT", ":memory:")
settings.configure(
INSTALLED_APPS=[],
DATABASES={{"default": {{"ENGINE": "django.db.backends.sqlite3", "NAME": endpoint}}}},
SECRET_KEY="nyx-dynamic-harness",
)
import django
django.setup()
from django.db import connection
with connection.schema_editor() as schema_editor:
result.database_forwards("nyx_dynamic", schema_editor, None, None)
except Exception:
pass
try:
_nyx_install_migration_sql_hooks()
@ -1360,10 +1415,14 @@ try:
print(str(_result), flush=True)
except Exception:
pass
if _nyx_migration_cleanup is not None:
_nyx_migration_cleanup()
except SystemExit as _e:
sys.exit(_e.code)
except Exception as _e:
print(f"NYX_EXCEPTION: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True)
if _nyx_migration_cleanup is not None:
_nyx_migration_cleanup()
"#,
version = version_repr,
handler = handler,

View file

@ -718,6 +718,22 @@ end
$nyx_payload = nyx_payload
begin
require 'bundler/setup' if File.exist?(File.join(__dir__, 'Gemfile'))
rescue LoadError
end
begin
require 'active_record'
endpoint = ENV['NYX_SQL_ENDPOINT']
if endpoint && !endpoint.empty?
ActiveRecord::Base.establish_connection(adapter: 'sqlite3', database: endpoint)
ActiveRecord::Migration.verbose = false if defined?(ActiveRecord::Migration)
end
rescue LoadError, StandardError => e
STDERR.puts('NYX_ACTIVE_RECORD_BOOTSTRAP_SKIPPED: ' + e.class.name + ': ' + e.message) if ENV['NYX_DEBUG']
end
begin
require_relative './entry'
rescue LoadError, ScriptError => e
@ -940,6 +956,12 @@ if Object.const_defined?({handler:?})
cls = Object.const_get({handler:?})
begin
inst = cls.new
if inst.respond_to?(:table_name=)
begin
inst.table_name = $nyx_payload
rescue StandardError
end
end
if inst.respond_to?(:execute, true)
original_execute = inst.method(:execute)
inst.define_singleton_method(:execute) do |sql, *args, &blk|

View file

@ -79,6 +79,28 @@ fn make_spec(lang: Lang, queue: &str, handler: &str, fixture: &str) -> HarnessSp
}
}
fn make_spec_with_adapter(
lang: Lang,
queue: &str,
handler: &str,
fixture: &str,
adapter: &str,
) -> HarnessSpec {
let mut spec = make_spec(lang, queue, handler, fixture);
spec.framework = Some(FrameworkBinding {
adapter: adapter.to_owned(),
kind: EntryKind::MessageHandler {
queue: queue.to_owned(),
message_schema: None,
},
route: None,
request_params: vec![],
response_writer: None,
middleware: vec![],
});
spec
}
// ── Supported-set assertions ──────────────────────────────────────────────────
#[test]
@ -205,6 +227,62 @@ fn message_handler_go_uses_nyx_handlers_registry() {
assert!(h.source.contains("nyxRecordBrokerPublish"));
}
#[test]
fn message_handler_remaining_brokers_emit_delivery_and_ack_events() {
let cases = [
(
Lang::Python,
"pubsub_python",
"projects/p/subscriptions/s",
"callback",
"pubsub-python",
"NYX_PUBSUB_LOG",
),
(
Lang::Python,
"rabbit_python",
"work",
"on_message",
"rabbit-python",
"NYX_RABBIT_LOG",
),
(
Lang::Java,
"rabbit_java",
"work",
"onMessage",
"rabbit-java",
"NYX_RABBIT_LOG",
),
(
Lang::Go,
"nats_go",
"events",
"OnMessage",
"nats-go",
"NYX_NATS_LOG",
),
];
for (lang, fixture, queue, handler, adapter, log_env) in cases {
let spec = make_spec_with_adapter(lang, queue, handler, entry_file(fixture), adapter);
let h = lang::emit(&spec).expect("emit ok");
assert!(
h.source.contains(log_env),
"{adapter} harness must write the broker log env var",
);
assert!(
h.source.contains("\"deliver\"") || h.source.contains("'deliver'"),
"{adapter} harness must record delivery events: {}",
h.source
);
assert!(
h.source.contains("\"ack\"") || h.source.contains("'ack'"),
"{adapter} harness must record ack events: {}",
h.source
);
}
}
// ── Framework-adapter assertions ──────────────────────────────────────────────
fn ts_language_for(lang: Lang) -> tree_sitter::Language {

View file

@ -909,6 +909,7 @@ fn migration_python_harness_carries_sentinel_and_handler() {
assert!(h.source.contains("__NYX_MIGRATION__"));
assert!(h.source.contains("\"upgrade\""));
assert!(h.source.contains("__nyx_stub_sql_record"));
assert!(h.source.contains("MigrationContext.configure"));
assert!(h.source.contains("NYX_SQL_ENDPOINT"));
}
@ -924,6 +925,8 @@ fn migration_js_harness_carries_sentinel_and_handler() {
assert!(h.source.contains("__NYX_MIGRATION__"));
assert!(h.source.contains("\"up\""));
assert!(h.source.contains("__nyx_stub_sql_record"));
assert!(h.source.contains("require('sequelize')"));
assert!(h.source.contains("getQueryInterface"));
assert!(h.source.contains("global.__nyx_prisma"));
assert!(h.source.contains("node:sqlite"));
assert!(h.source.contains("NYX_SQL_ENDPOINT"));
@ -941,6 +944,7 @@ fn migration_ruby_harness_carries_sentinel_and_handler() {
assert!(h.source.contains("__NYX_MIGRATION__"));
assert!(h.source.contains("AddIndex"));
assert!(h.source.contains("__nyx_stub_sql_record"));
assert!(h.source.contains("ActiveRecord::Base.establish_connection"));
assert!(h.source.contains("SQLite3::Database"));
assert!(h.source.contains("NYX_SQL_ENDPOINT"));
}
@ -957,6 +961,7 @@ fn migration_php_harness_carries_sentinel_and_handler() {
assert!(h.source.contains("__NYX_MIGRATION__"));
assert!(h.source.contains("AddUsers"));
assert!(h.source.contains("__nyx_stub_sql_record"));
assert!(h.source.contains("vendor/autoload.php"));
assert!(h.source.contains("new SQLite3"));
assert!(h.source.contains("NYX_SQL_ENDPOINT"));
}