[pitboss] phase 20: Track M.2 — MessageHandler end-to-end (Kafka / SQS / Pub-Sub / NATS / RabbitMQ)

This commit is contained in:
pitboss 2026-05-20 16:03:40 -05:00
parent fedc507e6a
commit bd0135e423
45 changed files with 3227 additions and 25 deletions

View file

@ -56,6 +56,7 @@ const SUPPORTED: &[EntryKindTag] = &[
EntryKindTag::HttpRoute,
EntryKindTag::CliSubcommand,
EntryKindTag::ClassMethod,
EntryKindTag::MessageHandler,
];
impl LangEmitter for GoEmitter {
@ -583,6 +584,14 @@ pub fn emit(spec: &HarnessSpec) -> Result<HarnessSource, UnsupportedReason> {
return Ok(emit_class_method_harness(class, method));
}
// Phase 20 (Track M.2): MessageHandler short-circuit. Picks the
// broker loopback (Pub/Sub or NATS) by inspecting the spec's
// framework adapter id and dispatches the payload synchronously to
// the named handler function in the entry package.
if let crate::evidence::EntryKind::MessageHandler { queue, .. } = &spec.entry_kind {
return Ok(emit_message_handler_harness(spec, queue));
}
let entry_source = read_entry_source(&spec.entry_file);
let shape = GoShape::detect(spec, &entry_source);
let main_go = generate_main_go(spec, shape);
@ -1129,6 +1138,155 @@ func main() {{
}
}
/// Phase 20 (Track M.2) — message-handler harness for Go.
///
/// The entry package is expected to declare a top-level handler
/// function named `spec.entry_name` taking either a `*entry.NyxPubsubMessage`
/// / `*entry.NyxNatsMsg` envelope or a `string` payload. The harness
/// mounts the broker loopback declared by [`broker_pubsub`] /
/// [`broker_nats`], subscribes the handler reflectively, and publishes
/// the payload. Broker pick is derived from
/// `spec.framework.adapter`: `pubsub-go` → Pub/Sub, `nats-go` → NATS,
/// default → Pub/Sub.
fn emit_message_handler_harness(spec: &HarnessSpec, queue: &str) -> HarnessSource {
let shim = probe_shim();
let go_mod = generate_go_mod();
let handler = &spec.entry_name;
let broker = go_broker_for_adapter(spec);
let (broker_src, publish_marker, dispatch) = match broker {
GoBroker::Nats => (
crate::dynamic::stubs::nats_source(crate::symbol::Lang::Go),
crate::dynamic::stubs::NATS_PUBLISH_MARKER,
format!(
r##" broker := NewNyxNatsLoopback()
broker.Subscribe("{queue}", func(msg *NyxNatsMsg) {{
nyxDispatch(msg)
}})
fmt.Println("{publish_marker} " + "{queue}")
broker.Publish("{queue}", payload)"##,
queue = queue,
publish_marker = crate::dynamic::stubs::NATS_PUBLISH_MARKER,
),
),
GoBroker::Pubsub => (
crate::dynamic::stubs::pubsub_source(crate::symbol::Lang::Go),
crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER,
format!(
r##" broker := NewNyxPubsubLoopback()
broker.Subscribe("{queue}", func(msg *NyxPubsubMessage) {{
nyxDispatch(msg)
}})
fmt.Println("{publish_marker} " + "{queue}")
broker.Publish("{queue}", payload)"##,
queue = queue,
publish_marker = crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER,
),
),
};
// The handler is looked up reflectively through a per-package
// `NyxHandlers` registry the entry file publishes (mirrors the
// Phase 19 `NyxReceivers` contract). A fallback path probes a few
// common exported names so a fixture without the registry still
// wires up.
let dispatch_inner = format!(
r##"func nyxDispatch(msg interface{{}}) {{
defer func() {{
if r := recover(); r != nil {{
fmt.Fprintf(os.Stderr, "NYX_EXCEPTION: panic: %v\n", r)
}}
}}()
fmt.Println("__NYX_SINK_HIT__")
cb, ok := entry.NyxHandlers["{handler}"]
if !ok {{
fmt.Fprintln(os.Stderr, "NYX_HANDLER_NOT_FOUND: " + "{handler}")
os.Exit(78)
}}
v := reflect.ValueOf(cb)
args := make([]reflect.Value, v.Type().NumIn())
for i := 0; i < v.Type().NumIn(); i++ {{
want := v.Type().In(i)
got := reflect.ValueOf(msg)
if got.Type().AssignableTo(want) {{
args[i] = got
}} else if want.Kind() == reflect.String {{
args[i] = reflect.ValueOf(os.Getenv("NYX_PAYLOAD"))
}} else {{
args[i] = reflect.Zero(want)
}}
}}
v.Call(args)
}}
"##,
handler = handler,
);
let source = format!(
r##"// Nyx dynamic harness — message handler (Phase 20 / Track M.2).
package main
import (
"fmt"
"os"
"reflect"
"nyx-harness/entry"
)
{shim}
{broker_src}
{dispatch_inner}
func nyxPayload() string {{
if v := os.Getenv("NYX_PAYLOAD"); v != "" {{
return v
}}
return ""
}}
func main() {{
__nyx_install_crash_guard("{handler}")
payload := nyxPayload()
{dispatch}
}}
"##,
broker_src = broker_src,
dispatch_inner = dispatch_inner,
dispatch = dispatch,
handler = handler,
);
let _ = publish_marker;
HarnessSource {
source,
filename: "main.go".to_owned(),
command: vec!["./nyx_harness".to_owned()],
extra_files: vec![("go.mod".to_owned(), go_mod)],
entry_subpath: Some("entry/entry.go".to_owned()),
}
}
#[derive(Debug, Clone, Copy)]
enum GoBroker {
Pubsub,
Nats,
}
fn go_broker_for_adapter(spec: &HarnessSpec) -> GoBroker {
let adapter = spec
.framework
.as_ref()
.map(|b| b.adapter.as_str())
.unwrap_or("");
match adapter {
"nats-go" => GoBroker::Nats,
_ => GoBroker::Pubsub,
}
}
/// Minimal `gin` stub package used by [`GoShape::GinHandler`] fixtures
/// so the toolchain can compile without a real gin dependency.
/// Exposes just enough surface (Context.Query, Context.JSON,

View file

@ -55,6 +55,7 @@ const SUPPORTED: &[EntryKindTag] = &[
EntryKindTag::HttpRoute,
EntryKindTag::CliSubcommand,
EntryKindTag::ClassMethod,
EntryKindTag::MessageHandler,
];
impl LangEmitter for JavaEmitter {
@ -601,6 +602,15 @@ pub fn emit(spec: &HarnessSpec) -> Result<HarnessSource, UnsupportedReason> {
return Ok(emit_class_method_harness(spec, class, method, &entry_class));
}
// Phase 20 (Track M.2): MessageHandler short-circuit. Mounts the
// in-process broker loopback declared by `broker_{kafka,sqs,rabbit}`
// and dispatches the payload synchronously to the named handler.
if let crate::evidence::EntryKind::MessageHandler { queue, .. } = &spec.entry_kind {
let entry_source = read_entry_source(&spec.entry_file);
let entry_class = derive_entry_class(&entry_source);
return Ok(emit_message_handler_harness(spec, queue, &entry_class));
}
let entry_source = read_entry_source(&spec.entry_file);
let shape = JavaShape::detect(spec, &entry_source);
let entry_class = derive_entry_class(&entry_source);
@ -1937,6 +1947,182 @@ public class NyxHarness {{
}
}
/// Phase 20 (Track M.2) — message-handler harness for Java.
///
/// Locates `entry_class` (the fixture's public class) reflectively,
/// instantiates it via its no-arg ctor (or via the stubbed-dependency
/// fallback path used by [`emit_class_method_harness`]), mounts the
/// broker loopback selected by `spec.framework.adapter`
/// (`kafka-java` → `NyxKafkaLoopback`, `sqs-java` → `NyxSqsLoopback`,
/// `rabbit-java` → `NyxRabbitChannel`; default → Kafka), subscribes the
/// handler method named by `spec.entry_name`, and publishes the payload
/// onto `queue`.
fn emit_message_handler_harness(
spec: &HarnessSpec,
queue: &str,
entry_class: &str,
) -> HarnessSource {
let probe = probe_shim();
let handler = &spec.entry_name;
let broker = java_broker_for_adapter(spec);
let kafka_src = crate::dynamic::stubs::kafka_source(crate::symbol::Lang::Java);
let sqs_src = crate::dynamic::stubs::sqs_source(crate::symbol::Lang::Java);
let rabbit_src = crate::dynamic::stubs::rabbit_source(crate::symbol::Lang::Java);
let (publish_marker, dispatch_block) = match broker {
JavaBroker::Sqs => (
crate::dynamic::stubs::SQS_PUBLISH_MARKER,
format!(
r#" NyxSqsLoopback brokerRef = new NyxSqsLoopback();
brokerRef.subscribe({queue:?}, env -> {{
System.out.println("__NYX_SINK_HIT__");
try {{
java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, java.util.Map.class);
m.setAccessible(true);
m.invoke(entryInst, env);
}} catch (Exception e) {{
Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e;
System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage());
}}
}});
System.out.println({publish_marker:?} + " " + {queue:?});
brokerRef.publish({queue:?}, payload);"#,
handler = handler,
queue = queue,
publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER,
),
),
JavaBroker::Rabbit => (
crate::dynamic::stubs::RABBIT_PUBLISH_MARKER,
format!(
r#" NyxRabbitChannel chan = new NyxRabbitChannel();
chan.basicConsume({queue:?}, (mid, body) -> {{
System.out.println("__NYX_SINK_HIT__");
try {{
java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, String.class, String.class);
m.setAccessible(true);
m.invoke(entryInst, mid, body);
}} catch (NoSuchMethodException nsme) {{
try {{
java.lang.reflect.Method m2 = entryInst.getClass().getDeclaredMethod({handler:?}, String.class);
m2.setAccessible(true);
m2.invoke(entryInst, body);
}} catch (Exception ie) {{
Throwable c = (ie instanceof java.lang.reflect.InvocationTargetException && ie.getCause() != null) ? ie.getCause() : ie;
System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage());
}}
}} catch (Exception e) {{
Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e;
System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage());
}}
}});
System.out.println({publish_marker:?} + " " + {queue:?});
chan.basicPublish("", {queue:?}, payload);"#,
handler = handler,
queue = queue,
publish_marker = crate::dynamic::stubs::RABBIT_PUBLISH_MARKER,
),
),
JavaBroker::Kafka => (
crate::dynamic::stubs::KAFKA_PUBLISH_MARKER,
format!(
r#" NyxKafkaLoopback brokerRef = new NyxKafkaLoopback();
brokerRef.subscribe({queue:?}, body -> {{
System.out.println("__NYX_SINK_HIT__");
try {{
java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, String.class);
m.setAccessible(true);
m.invoke(entryInst, body);
}} catch (Exception e) {{
Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e;
System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage());
}}
}});
System.out.println({publish_marker:?} + " " + {queue:?});
brokerRef.publish({queue:?}, payload);"#,
handler = handler,
queue = queue,
publish_marker = crate::dynamic::stubs::KAFKA_PUBLISH_MARKER,
),
),
};
let _ = publish_marker;
let source = format!(
r#"// Nyx dynamic harness — message handler (Phase 20 / Track M.2).
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
public class NyxHarness {{
{probe}
{kafka_src}
{sqs_src}
{rabbit_src}
public static void main(String[] args) {{
String payload = nyxPayload();
try {{
Class<?> entryCls = Class.forName({entry_class:?});
Constructor<?> ctor = entryCls.getDeclaredConstructor();
ctor.setAccessible(true);
final Object entryInst = ctor.newInstance();
{dispatch_block}
}} catch (Throwable e) {{
System.err.println("NYX_EXCEPTION: " + e.getClass().getName() + ": " + e.getMessage());
}}
}}
static String nyxPayload() {{
String v = System.getenv("NYX_PAYLOAD");
if (v != null && !v.isEmpty()) return v;
String b64 = System.getenv("NYX_PAYLOAD_B64");
if (b64 != null && !b64.isEmpty()) {{
byte[] decoded = java.util.Base64.getDecoder().decode(b64);
return new String(decoded, java.nio.charset.StandardCharsets.UTF_8);
}}
return "";
}}
}}
"#,
entry_class = entry_class,
dispatch_block = dispatch_block,
);
HarnessSource {
source,
filename: "NyxHarness.java".to_owned(),
command: vec![
"java".to_owned(),
"-cp".to_owned(),
".".to_owned(),
"NyxHarness".to_owned(),
],
extra_files: vec![],
entry_subpath: Some(format!("{entry_class}.java")),
}
}
#[derive(Debug, Clone, Copy)]
enum JavaBroker {
Kafka,
Sqs,
Rabbit,
}
fn java_broker_for_adapter(spec: &HarnessSpec) -> JavaBroker {
let adapter = spec
.framework
.as_ref()
.map(|b| b.adapter.as_str())
.unwrap_or("");
match adapter {
"sqs-java" => JavaBroker::Sqs,
"rabbit-java" => JavaBroker::Rabbit,
_ => JavaBroker::Kafka,
}
}
/// Reflective JUnit-shape invocation. Reads the payload from
/// `NYX_PAYLOAD` (no method argument) — JUnit tests typically capture
/// inputs through fields or `System.getenv`.

View file

@ -575,6 +575,14 @@ pub fn emit(spec: &HarnessSpec, is_typescript: bool) -> Result<HarnessSource, Un
return Ok(emit_class_method(spec, class, method, is_typescript));
}
// Phase 20 (Track M.2): MessageHandler short-circuit. Mounts the
// in-process SQS loopback (the only broker Node has a dedicated
// adapter for in this phase) and dispatches the payload to the
// named handler synchronously.
if let crate::evidence::EntryKind::MessageHandler { queue, .. } = &spec.entry_kind {
return Ok(emit_message_handler(spec, queue, is_typescript));
}
let entry_source = read_entry_source(&spec.entry_file);
let shape = JsShape::detect(spec, &entry_source);
let entry_subpath = entry_subpath_for_shape(shape, is_typescript);
@ -694,6 +702,84 @@ if (typeof _m !== 'function') {{
}
}
/// Phase 20 (Track M.2) — message-handler harness for Node.js / TypeScript.
///
/// Imports the entry module, locates the handler function named by
/// `spec.entry_name`, mounts the `NyxSqsLoopback` in-process loopback,
/// and publishes the payload onto `queue` so the handler fires
/// synchronously. SQS is the only broker Node has a dedicated Phase
/// 20 adapter for (`sqs-node`); the dispatch defaults to it.
fn emit_message_handler(
spec: &HarnessSpec,
queue: &str,
is_typescript: bool,
) -> HarnessSource {
let probe = probe_shim();
let entry_subpath = if is_typescript { "entry.ts" } else { "entry.js" };
let entry_require_path = entry_require_path(entry_subpath);
let handler = &spec.entry_name;
let sqs_src = crate::dynamic::stubs::sqs_source(crate::symbol::Lang::JavaScript);
let publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER;
let body = format!(
r#"'use strict';
// Nyx dynamic harness — message handler (Phase 20 / Track M.2).
{probe}
{sqs_src}
const payload = (process.env.NYX_PAYLOAD && process.env.NYX_PAYLOAD.length > 0)
? process.env.NYX_PAYLOAD
: (process.env.NYX_PAYLOAD_B64
? Buffer.from(process.env.NYX_PAYLOAD_B64, 'base64').toString('utf8')
: '');
let _entry;
try {{
_entry = require('./{entry_require_path}');
}} catch (e) {{
process.stderr.write('NYX_IMPORT_ERROR: ' + e.message + '\n');
process.exit(77);
}}
const _handler = _entry[{handler:?}]
|| (_entry.default && _entry.default[{handler:?}])
|| (typeof _entry.default === 'function' && _entry.default.name === {handler:?} ? _entry.default : null);
if (typeof _handler !== 'function') {{
process.stderr.write('NYX_HANDLER_NOT_FOUND: ' + {handler:?} + '\n');
process.exit(78);
}}
const _broker = new NyxSqsLoopback();
_broker.subscribe({queue:?}, async (envelope) => {{
try {{
// Sink-reachability sentinel — runner's `vuln_fired && sink_hit`
// gate requires this byte sequence on stdout / stderr.
process.stdout.write('__NYX_SINK_HIT__\n');
await Promise.resolve(_handler(envelope));
}} catch (e) {{
process.stderr.write('NYX_EXCEPTION: ' + (e.constructor ? e.constructor.name : 'Error') + ': ' + e.message + '\n');
}}
}});
(async () => {{
process.stdout.write({publish_marker:?} + ' ' + {queue:?} + '\n');
_broker.publish({queue:?}, payload);
}})();
"#,
handler = handler,
queue = queue,
publish_marker = publish_marker,
);
HarnessSource {
source: body,
filename: "harness.js".to_owned(),
command: vec!["node".to_owned(), "harness.js".to_owned()],
extra_files: Vec::new(),
entry_subpath: Some(entry_subpath.to_owned()),
}
}
/// Phase 04 — Track J.2 SSTI harness for Node (Handlebars).
///
/// Reads `NYX_PAYLOAD`, simulates Handlebars's `{{helper a b}}`
@ -1748,6 +1834,7 @@ pub const SUPPORTED: &[EntryKindTag] = &[
EntryKindTag::CliSubcommand,
EntryKindTag::LibraryApi,
EntryKindTag::ClassMethod,
EntryKindTag::MessageHandler,
];
#[cfg(test)]

View file

@ -394,17 +394,16 @@ mod tests {
assert_eq!(EntryKind::Unknown.tag(), T::Unknown);
}
/// Phase 18 (Track M.0) baseline — the Phase 18 variants not yet
/// wired by a follow-up phase still route through the
/// supported-set gate so the verifier produces a structured
/// `Inconclusive(EntryKindUnsupported)` rather than degrading
/// silently. Phase 19 lands `ClassMethod`, so it is excluded
/// from the still-unsupported set.
/// Phase 18 (Track M.0) baseline — the variants not yet wired by a
/// follow-up phase still route through the supported-set gate so the
/// verifier produces a structured `Inconclusive(EntryKindUnsupported)`
/// rather than degrading silently. Phase 19 lands `ClassMethod`;
/// Phase 20 lands `MessageHandler` on five langs (Python, Java,
/// JavaScript, TypeScript, Go); the rest stay unsupported.
#[test]
fn entry_kind_phase_20_21_variants_are_unsupported_everywhere() {
fn entry_kind_phase_21_variants_are_unsupported_everywhere() {
use crate::evidence::EntryKindTag as T;
let still_unsupported = [
T::MessageHandler,
T::ScheduledJob,
T::GraphQLResolver,
T::WebSocket,
@ -427,7 +426,7 @@ mod tests {
for tag in still_unsupported {
assert!(
!supported.contains(&tag),
"{lang:?} prematurely advertised {tag:?} — Phase 20 / 21 has not landed the per-lang adapters for this variant"
"{lang:?} prematurely advertised {tag:?} — Phase 21 has not landed the per-lang adapters for this variant"
);
let hint = entry_kind_hint(lang, tag);
assert!(
@ -438,6 +437,44 @@ mod tests {
}
}
/// Phase 20 (Track M.2) — `MessageHandler` is supported on the five
/// langs the brief lists (Python, Java, JavaScript, TypeScript, Go)
/// and remains unsupported on the rest (Ruby, PHP, Rust, C, Cpp).
/// The verifier should produce a structured
/// `Inconclusive(EntryKindUnsupported)` for the unsupported set.
#[test]
fn entry_kind_message_handler_supported_in_phase_20_langs() {
use crate::evidence::EntryKindTag as T;
let supported_langs = [
Lang::Python,
Lang::Java,
Lang::JavaScript,
Lang::TypeScript,
Lang::Go,
];
let unsupported_langs = [
Lang::Php,
Lang::Ruby,
Lang::Rust,
Lang::C,
Lang::Cpp,
];
for lang in supported_langs {
let supported = entry_kinds_supported(lang);
assert!(
supported.contains(&T::MessageHandler),
"{lang:?} must advertise MessageHandler after Phase 20; got {supported:?}",
);
}
for lang in unsupported_langs {
let supported = entry_kinds_supported(lang);
assert!(
!supported.contains(&T::MessageHandler),
"{lang:?} must not yet advertise MessageHandler — Phase 20 only covers 5 langs",
);
}
}
/// Phase 19 (Track M.1) — every lang emitter now advertises
/// `ClassMethod` so the verifier dispatches structurally instead
/// of degrading to `Inconclusive(EntryKindUnsupported)`.

View file

@ -46,6 +46,7 @@ const SUPPORTED: &[EntryKindTag] = &[
EntryKindTag::HttpRoute,
EntryKindTag::CliSubcommand,
EntryKindTag::ClassMethod,
EntryKindTag::MessageHandler,
];
impl LangEmitter for PythonEmitter {
@ -691,6 +692,18 @@ pub fn emit(spec: &HarnessSpec) -> Result<HarnessSource, UnsupportedReason> {
return Ok(emit_class_method(spec, class, method));
}
// Phase 20 (Track M.2): MessageHandler short-circuit. The harness
// publishes the payload through one of the in-process broker
// loopbacks (`NyxKafkaLoopback`, `NyxSqsLoopback`,
// `NyxPubsubLoopback`, `NyxRabbitChannel`) which routes synchronously
// to the registered handler. Broker selection is picked by
// `spec.framework.adapter`; an unknown / missing adapter falls back
// to the Kafka loopback (kept stable so test fixtures with no
// framework binding still drive the message-handler dispatch).
if let crate::evidence::EntryKind::MessageHandler { queue, .. } = &spec.entry_kind {
return Ok(emit_message_handler(spec, queue));
}
let entry_source = read_entry_source(&spec.entry_file);
let shape = PythonShape::detect(spec, &entry_source);
let body = generate_for_shape(spec, shape);
@ -805,6 +818,160 @@ except Exception as _e:
}
}
/// Phase 20 (Track M.2) — message-handler harness for Python.
///
/// Imports the entry module, locates the handler function named by
/// `spec.entry_name`, registers it against the requested broker
/// loopback (`NyxKafkaLoopback` / `NyxSqsLoopback` / `NyxPubsubLoopback`
/// / `NyxRabbitChannel`), then publishes the payload onto `queue`. The
/// loopback dispatches synchronously so the handler under test fires
/// the sink before `main` returns.
///
/// Broker pick: derived from the spec's framework adapter id when
/// present (`kafka-python`, `sqs-python`, `pubsub-python`,
/// `rabbit-python`); otherwise defaults to Kafka, which keeps the
/// dispatch deterministic for fixtures with no framework binding.
fn emit_message_handler(spec: &HarnessSpec, queue: &str) -> HarnessSource {
let preamble = harness_preamble(spec);
let postamble = harness_postamble();
let handler = &spec.entry_name;
let broker = python_broker_for_adapter(spec);
let kafka_src = crate::dynamic::stubs::kafka_source(crate::symbol::Lang::Python);
let sqs_src = crate::dynamic::stubs::sqs_source(crate::symbol::Lang::Python);
let pubsub_src = crate::dynamic::stubs::pubsub_source(crate::symbol::Lang::Python);
let rabbit_src = crate::dynamic::stubs::rabbit_source(crate::symbol::Lang::Python);
let register_and_publish = match broker {
PythonBroker::Sqs => format!(
r#"_loop = NyxSqsLoopback()
def _nyx_sqs_dispatch(envelope):
_h = getattr(_entry_mod, {handler:?}, None)
if _h is None:
print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True)
sys.exit(78)
_h(envelope)
_loop.subscribe({queue:?}, _nyx_sqs_dispatch)
print({publish_marker:?} + " " + {queue:?}, flush=True)
_loop.publish({queue:?}, payload)"#,
handler = handler,
queue = queue,
publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER,
),
PythonBroker::Pubsub => format!(
r#"_loop = NyxPubsubLoopback()
def _nyx_pubsub_dispatch(message):
_h = getattr(_entry_mod, {handler:?}, None)
if _h is None:
print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True)
sys.exit(78)
_h(message)
_loop.subscribe({queue:?}, _nyx_pubsub_dispatch)
print({publish_marker:?} + " " + {queue:?}, flush=True)
_loop.publish({queue:?}, payload)"#,
handler = handler,
queue = queue,
publish_marker = crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER,
),
PythonBroker::Rabbit => format!(
r#"_chan = NyxRabbitChannel()
def _nyx_rabbit_dispatch(ch, method, props, body):
_h = getattr(_entry_mod, {handler:?}, None)
if _h is None:
print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True)
sys.exit(78)
_h(ch, method, props, body)
_chan.basic_consume(queue={queue:?}, on_message_callback=_nyx_rabbit_dispatch)
print({publish_marker:?} + " " + {queue:?}, flush=True)
_chan.basic_publish(exchange="", routing_key={queue:?}, body=payload)"#,
handler = handler,
queue = queue,
publish_marker = crate::dynamic::stubs::RABBIT_PUBLISH_MARKER,
),
PythonBroker::Kafka => format!(
r#"_loop = NyxKafkaLoopback()
def _nyx_kafka_dispatch(message):
_h = getattr(_entry_mod, {handler:?}, None)
if _h is None:
print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True)
sys.exit(78)
_h(message)
_loop.subscribe({queue:?}, _nyx_kafka_dispatch)
print({publish_marker:?} + " " + {queue:?}, flush=True)
_loop.publish({queue:?}, payload)"#,
handler = handler,
queue = queue,
publish_marker = crate::dynamic::stubs::KAFKA_PUBLISH_MARKER,
),
};
let body = format!(
r#"# Shape: message handler — Phase 20 / Track M.2.
{kafka_src}
{sqs_src}
{pubsub_src}
{rabbit_src}
try:
{register_and_publish}
except SystemExit as _e:
sys.exit(_e.code)
except Exception as _e:
print(f"NYX_EXCEPTION: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True)
"#,
kafka_src = kafka_src,
sqs_src = sqs_src,
pubsub_src = pubsub_src,
rabbit_src = rabbit_src,
register_and_publish = indent_lines(&register_and_publish, " "),
);
HarnessSource {
source: format!("{preamble}\n{body}\n{postamble}"),
filename: "harness.py".to_owned(),
command: vec!["python3".to_owned(), "harness.py".to_owned()],
extra_files: vec![],
entry_subpath: None,
}
}
#[derive(Debug, Clone, Copy)]
enum PythonBroker {
Kafka,
Sqs,
Pubsub,
Rabbit,
}
fn python_broker_for_adapter(spec: &HarnessSpec) -> PythonBroker {
let adapter = spec
.framework
.as_ref()
.map(|b| b.adapter.as_str())
.unwrap_or("");
match adapter {
"sqs-python" => PythonBroker::Sqs,
"pubsub-python" => PythonBroker::Pubsub,
"rabbit-python" => PythonBroker::Rabbit,
_ => PythonBroker::Kafka,
}
}
fn indent_lines(src: &str, prefix: &str) -> String {
let mut out = String::with_capacity(src.len() + 16);
let mut first = true;
for line in src.lines() {
if !first {
out.push('\n');
}
first = false;
if !line.is_empty() {
out.push_str(prefix);
}
out.push_str(line);
}
out
}
/// Phase 03 — Track J.1 deserialize harness for Python.
///
/// Reads the payload (`NYX_GADGET_CLASS:<class>`), constructs a