trustgraph/trustgraph-cli/trustgraph/cli/monitor_prompts.py

329 lines
10 KiB
Python
Raw Normal View History

"""
Monitor prompt request/response queues and log activity with timing.
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
Subscribes to prompt request and response queues, correlates
them by message ID, and logs a summary of each request/response with
elapsed time. Streaming responses are accumulated and shown once at
completion.
Examples:
tg-monitor-prompts
tg-monitor-prompts --flow default --max-lines 5
tg-monitor-prompts --queue-type prompt-rag
"""
import json
import asyncio
import sys
import argparse
from datetime import datetime
from collections import OrderedDict
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
from trustgraph.base.pubsub import get_pubsub, add_pubsub_args
default_flow = "default"
default_queue_type = "prompt-rag"
default_max_lines = 3
default_max_width = 80
def truncate_text(text, max_lines, max_width):
"""Truncate text to max_lines lines, each at most max_width chars."""
if not text:
return "(empty)"
lines = text.splitlines()
result = []
for line in lines[:max_lines]:
if len(line) > max_width:
result.append(line[:max_width - 3] + "...")
else:
result.append(line)
if len(lines) > max_lines:
result.append(f" ... ({len(lines) - max_lines} more lines)")
return "\n".join(result)
def summarise_value(value, max_width):
"""Summarise a term value — show type and size for large values."""
# Try to parse JSON
try:
parsed = json.loads(value)
except (json.JSONDecodeError, TypeError):
parsed = value
if isinstance(parsed, list):
return f"[{len(parsed)} items]"
elif isinstance(parsed, dict):
return f"{{{len(parsed)} keys}}"
elif isinstance(parsed, str):
if len(parsed) > max_width:
return parsed[:max_width - 3] + "..."
return parsed
else:
s = str(parsed)
if len(s) > max_width:
return s[:max_width - 3] + "..."
return s
def format_terms(terms, max_lines, max_width):
"""Format prompt terms for display — concise summary."""
if not terms:
return ""
parts = []
for key, value in terms.items():
summary = summarise_value(value, max_width - len(key) - 4)
parts.append(f" {key}: {summary}")
return "\n".join(parts)
def parse_raw_message(msg):
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
"""Parse a raw message into (correlation_id, body_dict)."""
try:
props = msg.properties()
corr_id = props.get("id", "")
except Exception:
corr_id = ""
try:
value = msg.value()
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
if isinstance(value, dict):
body = value
elif isinstance(value, bytes):
body = json.loads(value.decode("utf-8"))
elif isinstance(value, str):
body = json.loads(value)
else:
body = {}
except Exception:
body = {}
return corr_id, body
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
async def monitor(flow, queue_type, max_lines, max_width, **config):
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
request_queue = f"request:tg:{queue_type}:{flow}"
response_queue = f"response:tg:{queue_type}:{flow}"
print(f"Monitoring prompt queues:")
print(f" Request: {request_queue}")
print(f" Response: {response_queue}")
print(f"Press Ctrl+C to stop\n")
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
backend = get_pubsub(**config)
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
req_consumer = backend.create_consumer(
topic=request_queue,
subscription="prompt-monitor-req",
schema=None,
initial_position='latest',
)
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
resp_consumer = backend.create_consumer(
topic=response_queue,
subscription="prompt-monitor-resp",
schema=None,
initial_position='latest',
)
# Track in-flight requests: corr_id -> (timestamp, template_id)
in_flight = OrderedDict()
# Accumulate streaming responses: corr_id -> list of text chunks
streaming_chunks = {}
print("Listening...\n")
try:
while True:
got_message = False
# Poll request queue
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
try:
msg = req_consumer.receive(timeout_millis=100)
got_message = True
timestamp = datetime.now()
corr_id, body = parse_raw_message(msg)
time_str = timestamp.strftime("%H:%M:%S.%f")[:-3]
template_id = body.get("id", "(unknown)")
terms = body.get("terms", {})
streaming = body.get("streaming", False)
in_flight[corr_id] = (timestamp, template_id)
# Limit size
while len(in_flight) > 1000:
in_flight.popitem(last=False)
stream_flag = " [streaming]" if streaming else ""
id_display = corr_id[:8] if corr_id else "--------"
print(f"[{time_str}] REQ {id_display} "
f"template={template_id}{stream_flag}")
if terms:
print(format_terms(terms, max_lines, max_width))
req_consumer.acknowledge(msg)
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
except TimeoutError:
pass
# Poll response queue
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
try:
msg = resp_consumer.receive(timeout_millis=100)
got_message = True
timestamp = datetime.now()
corr_id, body = parse_raw_message(msg)
time_str = timestamp.strftime("%H:%M:%S.%f")[:-3]
id_display = corr_id[:8] if corr_id else "--------"
error = body.get("error")
text = body.get("text", "")
obj = body.get("object", "")
eos = body.get("end_of_stream", False)
if error:
# Error — show immediately
elapsed_str = ""
if corr_id in in_flight:
req_timestamp, _ = in_flight.pop(corr_id)
elapsed = (timestamp - req_timestamp).total_seconds()
elapsed_str = f" ({elapsed:.3f}s)"
streaming_chunks.pop(corr_id, None)
err_msg = error
if isinstance(error, dict):
err_msg = error.get("message", str(error))
print(f"[{time_str}] ERR {id_display} "
f"{err_msg}{elapsed_str}")
elif eos:
# End of stream — show accumulated text + timing
elapsed_str = ""
if corr_id in in_flight:
req_timestamp, _ = in_flight.pop(corr_id)
elapsed = (timestamp - req_timestamp).total_seconds()
elapsed_str = f" ({elapsed:.3f}s)"
accumulated = streaming_chunks.pop(corr_id, [])
if text:
accumulated.append(text)
full_text = "".join(accumulated)
if full_text:
truncated = truncate_text(
full_text, max_lines, max_width
)
print(f"[{time_str}] RESP {id_display}"
f"{elapsed_str}")
print(f" {truncated}")
else:
print(f"[{time_str}] RESP {id_display}"
f"{elapsed_str} (empty)")
elif text or obj:
# Streaming chunk or non-streaming response
if corr_id in streaming_chunks or (
corr_id in in_flight
):
# Accumulate streaming chunk
if corr_id not in streaming_chunks:
streaming_chunks[corr_id] = []
streaming_chunks[corr_id].append(text or obj)
else:
# Non-streaming single response
elapsed_str = ""
if corr_id in in_flight:
req_timestamp, _ = in_flight.pop(corr_id)
elapsed = (
timestamp - req_timestamp
).total_seconds()
elapsed_str = f" ({elapsed:.3f}s)"
content = text or obj
label = "" if text else " (object)"
truncated = truncate_text(
content, max_lines, max_width
)
print(f"[{time_str}] RESP {id_display}"
f"{label}{elapsed_str}")
print(f" {truncated}")
resp_consumer.acknowledge(msg)
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
except TimeoutError:
pass
if not got_message:
await asyncio.sleep(0.05)
except KeyboardInterrupt:
print("\nStopping...")
finally:
req_consumer.close()
resp_consumer.close()
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
backend.close()
def main():
parser = argparse.ArgumentParser(
prog="tg-monitor-prompts",
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"-f", "--flow",
default=default_flow,
help=f"Flow ID (default: {default_flow})",
)
parser.add_argument(
"-q", "--queue-type",
default=default_queue_type,
help=f"Queue type: prompt or prompt-rag (default: {default_queue_type})",
)
parser.add_argument(
"-l", "--max-lines",
type=int,
default=default_max_lines,
help=f"Max lines of text per term/response (default: {default_max_lines})",
)
parser.add_argument(
"-w", "--max-width",
type=int,
default=default_max_width,
help=f"Max width per line (default: {default_max_width})",
)
Pub/sub abstraction: decouple from Pulsar (#751) Remove Pulsar-specific concepts from application code so that the pub/sub backend is swappable via configuration. Rename translators: - to_pulsar/from_pulsar → decode/encode across all translator classes, dispatch handlers, and tests (55+ files) - from_response_with_completion → encode_with_completion - Remove pulsar.schema.Record from translator base class Queue naming (CLASS:TOPICSPACE:TOPIC): - Replace topic() helper with queue() using new format: flow:tg:name, request:tg:name, response:tg:name, state:tg:name - Queue class implies persistence/TTL (no QoS in names) - Update Pulsar backend map_topic() to parse new format - Librarian queues use flow class (persistent, for chunking) - Config push uses state class (persistent, last-value) - Remove 15 dead topic imports from schema files - Update init_trustgraph.py namespace: config → state Confine Pulsar to pulsar_backend.py: - Delete legacy PulsarClient class from pubsub.py - Move add_args to add_pubsub_args() with standalone flag for CLI tools (defaults to localhost) - PulsarBackendConsumer.receive() catches _pulsar.Timeout, raises standard TimeoutError - Remove Pulsar imports from: async_processor, flow_processor, log_level, all 11 client files, 4 storage writers, gateway service, gateway config receiver - Remove log_level/LoggerLevel from client API - Rewrite tg-monitor-prompts to use backend abstraction - Update tg-dump-queues to use add_pubsub_args Also: pubsub-abstraction.md tech spec covering problem statement, design goals, as-is requirements, candidate broker assessment, approach, and implementation order.
2026-04-01 20:16:53 +01:00
add_pubsub_args(parser, standalone=True)
args = parser.parse_args()
try:
asyncio.run(monitor(
flow=args.flow,
queue_type=args.queue_type,
max_lines=args.max_lines,
max_width=args.max_width,
RabbitMQ pub/sub backend with topic exchange architecture (#752) Adds a RabbitMQ backend as an alternative to Pulsar, selectable via PUBSUB_BACKEND=rabbitmq. Both backends implement the same PubSubBackend protocol — no application code changes needed to switch. RabbitMQ topology: - Single topic exchange per topicspace (e.g. 'tg') - Routing key derived from queue class and topic name - Shared consumers: named queue bound to exchange (competing, round-robin) - Exclusive consumers: anonymous auto-delete queue (broadcast, each gets every message). Used by Subscriber and config push consumer. - Thread-local producer connections (pika is not thread-safe) - Push-based consumption via basic_consume with process_data_events for heartbeat processing Consumer model changes: - Consumer class creates one backend consumer per concurrent task (required for pika thread safety, harmless for Pulsar) - Consumer class accepts consumer_type parameter - Subscriber passes consumer_type='exclusive' for broadcast semantics - Config push consumer uses consumer_type='exclusive' so every processor instance receives config updates - handle_one_from_queue receives consumer as parameter for correct per-connection ack/nack LibrarianClient: - New shared client class replacing duplicated librarian request-response code across 6+ services (chunking, decoders, RAG, etc.) - Uses stream-document instead of get-document-content for fetching document content in 1MB chunks (avoids broker message size limits) - Standalone object (self.librarian = LibrarianClient(...)) not a mixin - get-document-content marked deprecated in schema and OpenAPI spec Serialisation: - Extracted dataclass_to_dict/dict_to_dataclass to shared serialization.py (used by both Pulsar and RabbitMQ backends) Librarian queues: - Changed from flow class (persistent) back to request/response class now that stream-document eliminates large single messages - API upload chunk size reduced from 5MB to 3MB to stay under broker limits after base64 encoding Factory and CLI: - get_pubsub() handles 'rabbitmq' backend with RabbitMQ connection params - add_pubsub_args() includes RabbitMQ options (host, port, credentials) - add_pubsub_args(standalone=True) defaults to localhost for CLI tools - init_trustgraph skips Pulsar admin setup for non-Pulsar backends - tg-dump-queues and tg-monitor-prompts use backend abstraction - BaseClient and ConfigClient accept generic pubsub config
2026-04-02 12:47:16 +01:00
**{k: v for k, v in vars(args).items()
if k not in ('flow', 'queue_type', 'max_lines', 'max_width')},
))
except KeyboardInterrupt:
pass
except Exception as e:
print(f"Fatal error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()