From f44a7054287f54076053813052c8a08ec1ab31bf Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Mon, 30 Mar 2026 16:08:46 +0100 Subject: [PATCH] Add tg-monitor-prompts CLI tool for prompt queue monitoring (#737) Subscribes to prompt request/response Pulsar queues, correlates messages by ID, and logs a summary with template name, truncated terms, and elapsed time. Streaming responses are accumulated and shown at completion. Supports prompt and prompt-rag queue types. --- trustgraph-cli/pyproject.toml | 1 + .../trustgraph/cli/monitor_prompts.py | 344 ++++++++++++++++++ 2 files changed, 345 insertions(+) create mode 100644 trustgraph-cli/trustgraph/cli/monitor_prompts.py diff --git a/trustgraph-cli/pyproject.toml b/trustgraph-cli/pyproject.toml index fb3402c9..ab9dfb2f 100644 --- a/trustgraph-cli/pyproject.toml +++ b/trustgraph-cli/pyproject.toml @@ -35,6 +35,7 @@ tg-delete-kg-core = "trustgraph.cli.delete_kg_core:main" tg-delete-tool = "trustgraph.cli.delete_tool:main" tg-dump-msgpack = "trustgraph.cli.dump_msgpack:main" tg-dump-queues = "trustgraph.cli.dump_queues:main" +tg-monitor-prompts = "trustgraph.cli.monitor_prompts:main" tg-get-flow-blueprint = "trustgraph.cli.get_flow_blueprint:main" tg-get-kg-core = "trustgraph.cli.get_kg_core:main" tg-get-document-content = "trustgraph.cli.get_document_content:main" diff --git a/trustgraph-cli/trustgraph/cli/monitor_prompts.py b/trustgraph-cli/trustgraph/cli/monitor_prompts.py new file mode 100644 index 00000000..c412b643 --- /dev/null +++ b/trustgraph-cli/trustgraph/cli/monitor_prompts.py @@ -0,0 +1,344 @@ +""" +Monitor prompt request/response queues and log activity with timing. + +Subscribes to prompt request and response Pulsar queues, correlates +them by message ID, and logs a summary of each request/response with +elapsed time. Streaming responses are accumulated and shown once at +completion. + +Examples: + tg-monitor-prompts + tg-monitor-prompts --flow default --max-lines 5 + tg-monitor-prompts --queue-type prompt-rag +""" + +import json +import asyncio +import sys +import argparse +from datetime import datetime +from collections import OrderedDict + +import pulsar +from pulsar.schema import BytesSchema + + +default_flow = "default" +default_queue_type = "prompt" +default_max_lines = 3 +default_max_width = 80 + + +def truncate_text(text, max_lines, max_width): + """Truncate text to max_lines lines, each at most max_width chars.""" + if not text: + return "(empty)" + + lines = text.splitlines() + result = [] + for line in lines[:max_lines]: + if len(line) > max_width: + result.append(line[:max_width - 3] + "...") + else: + result.append(line) + + if len(lines) > max_lines: + result.append(f" ... ({len(lines) - max_lines} more lines)") + + return "\n".join(result) + + +def summarise_value(value, max_width): + """Summarise a term value — show type and size for large values.""" + # Try to parse JSON + try: + parsed = json.loads(value) + except (json.JSONDecodeError, TypeError): + parsed = value + + if isinstance(parsed, list): + return f"[{len(parsed)} items]" + elif isinstance(parsed, dict): + return f"{{{len(parsed)} keys}}" + elif isinstance(parsed, str): + if len(parsed) > max_width: + return parsed[:max_width - 3] + "..." + return parsed + else: + s = str(parsed) + if len(s) > max_width: + return s[:max_width - 3] + "..." + return s + + +def format_terms(terms, max_lines, max_width): + """Format prompt terms for display — concise summary.""" + if not terms: + return "" + + parts = [] + for key, value in terms.items(): + summary = summarise_value(value, max_width - len(key) - 4) + parts.append(f" {key}: {summary}") + + return "\n".join(parts) + + +def parse_raw_message(msg): + """Parse a raw Pulsar message into (correlation_id, body_dict).""" + try: + props = msg.properties() + corr_id = props.get("id", "") + except Exception: + corr_id = "" + + try: + value = msg.value() + if isinstance(value, bytes): + value = value.decode("utf-8") + body = json.loads(value) if isinstance(value, str) else {} + except Exception: + body = {} + + return corr_id, body + + +def receive_with_timeout(consumer, timeout_ms=500): + """Receive a message with timeout, returning None on timeout.""" + try: + return consumer.receive(timeout_millis=timeout_ms) + except Exception: + return None + + +async def monitor(flow, queue_type, max_lines, max_width, + pulsar_host, listener_name): + + request_queue = f"non-persistent://tg/request/{queue_type}:{flow}" + response_queue = f"non-persistent://tg/response/{queue_type}:{flow}" + + print(f"Monitoring prompt queues:") + print(f" Request: {request_queue}") + print(f" Response: {response_queue}") + print(f"Press Ctrl+C to stop\n") + + client = pulsar.Client( + pulsar_host, + listener_name=listener_name, + ) + + req_consumer = client.subscribe( + request_queue, + subscription_name="prompt-monitor-req", + consumer_type=pulsar.ConsumerType.Shared, + schema=BytesSchema(), + initial_position=pulsar.InitialPosition.Latest, + ) + + resp_consumer = client.subscribe( + response_queue, + subscription_name="prompt-monitor-resp", + consumer_type=pulsar.ConsumerType.Shared, + schema=BytesSchema(), + initial_position=pulsar.InitialPosition.Latest, + ) + + # Track in-flight requests: corr_id -> (timestamp, template_id) + in_flight = OrderedDict() + + # Accumulate streaming responses: corr_id -> list of text chunks + streaming_chunks = {} + + print("Listening...\n") + + try: + while True: + got_message = False + + # Poll request queue + msg = receive_with_timeout(req_consumer, 100) + if msg: + got_message = True + timestamp = datetime.now() + corr_id, body = parse_raw_message(msg) + time_str = timestamp.strftime("%H:%M:%S.%f")[:-3] + + template_id = body.get("id", "(unknown)") + terms = body.get("terms", {}) + streaming = body.get("streaming", False) + + in_flight[corr_id] = (timestamp, template_id) + + # Limit size + while len(in_flight) > 1000: + in_flight.popitem(last=False) + + stream_flag = " [streaming]" if streaming else "" + id_display = corr_id[:8] if corr_id else "--------" + print(f"[{time_str}] REQ {id_display} " + f"template={template_id}{stream_flag}") + + if terms: + print(format_terms(terms, max_lines, max_width)) + + req_consumer.acknowledge(msg) + + # Poll response queue + msg = receive_with_timeout(resp_consumer, 100) + if msg: + got_message = True + timestamp = datetime.now() + corr_id, body = parse_raw_message(msg) + time_str = timestamp.strftime("%H:%M:%S.%f")[:-3] + id_display = corr_id[:8] if corr_id else "--------" + + error = body.get("error") + text = body.get("text", "") + obj = body.get("object", "") + eos = body.get("end_of_stream", False) + + if error: + # Error — show immediately + elapsed_str = "" + if corr_id in in_flight: + req_timestamp, _ = in_flight.pop(corr_id) + elapsed = (timestamp - req_timestamp).total_seconds() + elapsed_str = f" ({elapsed:.3f}s)" + streaming_chunks.pop(corr_id, None) + + err_msg = error + if isinstance(error, dict): + err_msg = error.get("message", str(error)) + print(f"[{time_str}] ERR {id_display} " + f"{err_msg}{elapsed_str}") + + elif eos: + # End of stream — show accumulated text + timing + elapsed_str = "" + if corr_id in in_flight: + req_timestamp, _ = in_flight.pop(corr_id) + elapsed = (timestamp - req_timestamp).total_seconds() + elapsed_str = f" ({elapsed:.3f}s)" + + accumulated = streaming_chunks.pop(corr_id, []) + if text: + accumulated.append(text) + + full_text = "".join(accumulated) + if full_text: + truncated = truncate_text( + full_text, max_lines, max_width + ) + print(f"[{time_str}] RESP {id_display}" + f"{elapsed_str}") + print(f" {truncated}") + else: + print(f"[{time_str}] RESP {id_display}" + f"{elapsed_str} (empty)") + + elif text or obj: + # Streaming chunk or non-streaming response + if corr_id in streaming_chunks or ( + corr_id in in_flight + ): + # Accumulate streaming chunk + if corr_id not in streaming_chunks: + streaming_chunks[corr_id] = [] + streaming_chunks[corr_id].append(text or obj) + else: + # Non-streaming single response + elapsed_str = "" + if corr_id in in_flight: + req_timestamp, _ = in_flight.pop(corr_id) + elapsed = ( + timestamp - req_timestamp + ).total_seconds() + elapsed_str = f" ({elapsed:.3f}s)" + + content = text or obj + label = "" if text else " (object)" + truncated = truncate_text( + content, max_lines, max_width + ) + print(f"[{time_str}] RESP {id_display}" + f"{label}{elapsed_str}") + print(f" {truncated}") + + resp_consumer.acknowledge(msg) + + if not got_message: + await asyncio.sleep(0.05) + + except KeyboardInterrupt: + print("\nStopping...") + finally: + req_consumer.close() + resp_consumer.close() + client.close() + + +def main(): + parser = argparse.ArgumentParser( + prog="tg-monitor-prompts", + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + parser.add_argument( + "-f", "--flow", + default=default_flow, + help=f"Flow ID (default: {default_flow})", + ) + + parser.add_argument( + "-q", "--queue-type", + default=default_queue_type, + help=f"Queue type: prompt or prompt-rag (default: {default_queue_type})", + ) + + parser.add_argument( + "-l", "--max-lines", + type=int, + default=default_max_lines, + help=f"Max lines of text per term/response (default: {default_max_lines})", + ) + + parser.add_argument( + "-w", "--max-width", + type=int, + default=default_max_width, + help=f"Max width per line (default: {default_max_width})", + ) + + parser.add_argument( + "--pulsar-host", + default="pulsar://localhost:6650", + help="Pulsar host URL (default: pulsar://localhost:6650)", + ) + + parser.add_argument( + "--listener-name", + default="localhost", + help="Pulsar listener name (default: localhost)", + ) + + args = parser.parse_args() + + try: + asyncio.run(monitor( + flow=args.flow, + queue_type=args.queue_type, + max_lines=args.max_lines, + max_width=args.max_width, + pulsar_host=args.pulsar_host, + listener_name=args.listener_name, + )) + except KeyboardInterrupt: + pass + except Exception as e: + print(f"Fatal error: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main()