trustgraph/trustgraph-base/trustgraph/clients/llm_client.py
cybermaggedon 4fb0b4d8e8
Pub/sub abstraction: decouple from Pulsar (#751)
Remove Pulsar-specific concepts from application code so that
the pub/sub backend is swappable via configuration.

Rename translators:
- to_pulsar/from_pulsar → decode/encode across all translator
  classes, dispatch handlers, and tests (55+ files)
- from_response_with_completion → encode_with_completion
- Remove pulsar.schema.Record from translator base class

Queue naming (CLASS:TOPICSPACE:TOPIC):
- Replace topic() helper with queue() using new format:
  flow:tg:name, request:tg:name, response:tg:name, state:tg:name
- Queue class implies persistence/TTL (no QoS in names)
- Update Pulsar backend map_topic() to parse new format
- Librarian queues use flow class (persistent, for chunking)
- Config push uses state class (persistent, last-value)
- Remove 15 dead topic imports from schema files
- Update init_trustgraph.py namespace: config → state

Confine Pulsar to pulsar_backend.py:
- Delete legacy PulsarClient class from pubsub.py
- Move add_args to add_pubsub_args() with standalone flag
  for CLI tools (defaults to localhost)
- PulsarBackendConsumer.receive() catches _pulsar.Timeout,
  raises standard TimeoutError
- Remove Pulsar imports from: async_processor, flow_processor,
  log_level, all 11 client files, 4 storage writers, gateway
  service, gateway config receiver
- Remove log_level/LoggerLevel from client API
- Rewrite tg-monitor-prompts to use backend abstraction
- Update tg-dump-queues to use add_pubsub_args

Also: pubsub-abstraction.md tech spec covering problem statement,
design goals, as-is requirements, candidate broker assessment,
approach, and implementation order.
2026-04-01 20:16:53 +01:00

99 lines
3.1 KiB
Python

from .. schema import TextCompletionRequest, TextCompletionResponse
from .. schema import text_completion_request_queue
from .. schema import text_completion_response_queue
from . base import BaseClient
from .. exceptions import LlmError
# Ugly
class LlmClient(BaseClient):
def __init__(
self,
subscriber=None,
input_queue=None,
output_queue=None,
pulsar_host="pulsar://pulsar:6650",
pulsar_api_key=None,
):
if input_queue is None: input_queue = text_completion_request_queue
if output_queue is None: output_queue = text_completion_response_queue
super(LlmClient, self).__init__(
subscriber=subscriber,
input_queue=input_queue,
output_queue=output_queue,
pulsar_host=pulsar_host,
pulsar_api_key=pulsar_api_key,
input_schema=TextCompletionRequest,
output_schema=TextCompletionResponse,
)
def request(self, system, prompt, timeout=300, streaming=False):
"""
Non-streaming request (backward compatible).
Returns complete response string.
"""
if streaming:
raise ValueError("Use request_stream() for streaming requests")
return self.call(
system=system, prompt=prompt, streaming=False, timeout=timeout
).response
def request_stream(self, system, prompt, timeout=300):
"""
Streaming request generator.
Yields response chunks as they arrive.
Usage:
for chunk in client.request_stream(system, prompt):
print(chunk.response, end='', flush=True)
"""
import time
import uuid
id = str(uuid.uuid4())
request = TextCompletionRequest(
system=system, prompt=prompt, streaming=True
)
end_time = time.time() + timeout
self.producer.send(request, properties={"id": id})
# Collect responses until end_of_stream
while time.time() < end_time:
try:
msg = self.consumer.receive(timeout_millis=2500)
except Exception:
continue
mid = msg.properties()["id"]
if mid == id:
value = msg.value()
# Handle errors
if value.error:
self.consumer.acknowledge(msg)
if value.error.type == "llm-error":
raise LlmError(value.error.message)
else:
raise RuntimeError(
f"{value.error.type}: {value.error.message}"
)
self.consumer.acknowledge(msg)
yield value
# Check if this is the final chunk
if getattr(value, 'end_of_stream', True):
break
else:
# Ignore messages with wrong ID
self.consumer.acknowledge(msg)
if time.time() >= end_time:
raise TimeoutError("Timed out waiting for response")