Revert "Feature/configure flows (#345)"

This reverts commit a9197d11ee.
Cyber MacGeddon 2025-04-25 19:02:08 +01:00
parent 3adb3cf59c
commit 1822ca395f
125 changed files with 2628 additions and 3751 deletions

View file

@ -1,31 +1,8 @@
from . pubsub import PulsarClient
from . async_processor import AsyncProcessor
from . base_processor import BaseProcessor
from . consumer import Consumer
from . producer import Producer
from . consumer_producer import ConsumerProducer
from . publisher import Publisher
from . subscriber import Subscriber
from . metrics import ProcessorMetrics, ConsumerMetrics, ProducerMetrics
from . flow_processor import FlowProcessor
from . consumer_spec import ConsumerSpec
from . setting_spec import SettingSpec
from . producer_spec import ProducerSpec
from . subscriber_spec import SubscriberSpec
from . request_response_spec import RequestResponseSpec
from . llm_service import LlmService, LlmResult
from . embeddings_service import EmbeddingsService
from . embeddings_client import EmbeddingsClientSpec
from . text_completion_client import TextCompletionClientSpec
from . prompt_client import PromptClientSpec
from . triples_store_service import TriplesStoreService
from . graph_embeddings_store_service import GraphEmbeddingsStoreService
from . document_embeddings_store_service import DocumentEmbeddingsStoreService
from . triples_query_service import TriplesQueryService
from . graph_embeddings_query_service import GraphEmbeddingsQueryService
from . document_embeddings_query_service import DocumentEmbeddingsQueryService
from . graph_embeddings_client import GraphEmbeddingsClientSpec
from . triples_client import TriplesClientSpec
from . document_embeddings_client import DocumentEmbeddingsClientSpec
from . agent_service import AgentService
from . graph_rag_client import GraphRagClientSpec

View file

@ -1,39 +0,0 @@
from . request_response_spec import RequestResponse, RequestResponseSpec
from .. schema import AgentRequest, AgentResponse
from .. knowledge import Uri, Literal
class AgentClient(RequestResponse):
async def request(self, recipient, question, plan=None, state=None,
history=[], timeout=300):
        # Delegate to the base-class request() rather than recursing
        # into this override
        resp = await super().request(
AgentRequest(
question = question,
plan = plan,
state = state,
history = history,
),
recipient=recipient,
timeout=timeout,
)
print(resp, flush=True)
if resp.error:
raise RuntimeError(resp.error.message)
return resp
class AgentClientSpec(RequestResponseSpec):
    def __init__(
            self, request_name, response_name,
    ):
        super(AgentClientSpec, self).__init__(
            request_name = request_name,
            request_schema = AgentRequest,
            response_name = response_name,
            response_schema = AgentResponse,
            impl = AgentClient,
        )

View file

@ -1,100 +0,0 @@
"""
Agent manager service base class
"""
import time
from prometheus_client import Histogram
from .. schema import AgentRequest, AgentResponse, Error
from .. exceptions import TooManyRequests
from .. base import FlowProcessor, ConsumerSpec, ProducerSpec
default_ident = "agent-manager"
class AgentService(FlowProcessor):
def __init__(self, **params):
id = params.get("id")
super(AgentService, self).__init__(**params | { "id": id })
self.register_specification(
ConsumerSpec(
name = "request",
schema = AgentRequest,
handler = self.on_request
)
)
self.register_specification(
ProducerSpec(
name = "next",
schema = AgentRequest
)
)
self.register_specification(
ProducerSpec(
name = "response",
schema = AgentResponse
)
)
async def on_request(self, msg, consumer, flow):
try:
request = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
async def respond(resp):
await flow("response").send(
resp,
properties={"id": id}
)
async def next(resp):
await flow("next").send(
resp,
properties={"id": id}
)
await self.agent_request(
request = request, respond = respond, next = next,
flow = flow
)
except TooManyRequests as e:
raise e
except Exception as e:
# Apart from rate limits, treat all exceptions as unrecoverable
print(f"on_request Exception: {e}")
print("Send error response...", flush=True)
await flow.producer["response"].send(
AgentResponse(
error=Error(
type = "agent-error",
message = str(e),
),
thought = None,
observation = None,
answer = None,
),
properties={"id": id}
)
@staticmethod
def add_args(parser):
FlowProcessor.add_args(parser)
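
The on_request handler above delegates the actual agent logic to an agent_request() coroutine supplied by a derived class, passing it the respond and next callbacks. A rough sketch of how a concrete agent might plug in (the EchoAgent class, its ident and the absolute import paths are assumptions, not part of this commit):

from trustgraph.schema import AgentResponse   # assumed import path
from trustgraph.base import AgentService      # assumed import path

class EchoAgent(AgentService):

    # Called from on_request() with the request plus respond/next callbacks
    async def agent_request(self, request, respond, next, flow):
        # A real agent would run its reasoning loop here, possibly calling
        # next() to requeue an intermediate step; this one just echoes
        await respond(
            AgentResponse(
                error = None,
                thought = None,
                observation = None,
                answer = request.question,
            )
        )

def run():
    EchoAgent.launch("echo-agent", __doc__)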

View file

@ -1,254 +0,0 @@
# Base class for processors. Implements:
# - Pulsar client, subscribe and consume basics
# - the async startup logic
# - Initialising metrics
import asyncio
import argparse
import _pulsar
import time
import uuid
from prometheus_client import start_http_server, Info
from .. schema import ConfigPush, config_push_queue
from .. log_level import LogLevel
from .. exceptions import TooManyRequests
from . pubsub import PulsarClient
from . producer import Producer
from . consumer import Consumer
from . metrics import ProcessorMetrics
default_config_queue = config_push_queue
# Async processor
class AsyncProcessor:
def __init__(self, **params):
# Store the identity
self.id = params.get("id")
# Register a pulsar client
self.pulsar_client = PulsarClient(**params)
# Initialise metrics, records the parameters
ProcessorMetrics(id=self.id).info({
k: str(params[k])
for k in params
if k != "id"
})
        # The processor runs all activity in a taskgroup; it's mandatory
        # that this is provided
self.taskgroup = params.get("taskgroup")
if self.taskgroup is None:
raise RuntimeError("Essential taskgroup missing")
# Get the configuration topic
self.config_push_queue = params.get(
"config_push_queue", default_config_queue
)
# This records registered configuration handlers
self.config_handlers = []
# Create a random ID for this subscription to the configuration
# service
config_subscriber_id = str(uuid.uuid4())
# Subscribe to config queue
self.config_sub_task = Consumer(
taskgroup = self.taskgroup,
client = self.client,
subscriber = config_subscriber_id,
flow = None,
topic = self.config_push_queue,
schema = ConfigPush,
handler = self.on_config_change,
# This causes new subscriptions to view the entire history of
# configuration
start_of_messages = True
)
self.running = True
# This is called to start dynamic behaviour. An over-ride point for
# extra functionality
async def start(self):
await self.config_sub_task.start()
# This is called to stop all threads. An over-ride point for extra
# functionality
def stop(self):
self.client.close()
self.running = False
# Returns the pulsar host
@property
def pulsar_host(self): return self.client.pulsar_host
# Returns the pulsar client
@property
def client(self): return self.pulsar_client.client
# Register a new event handler for configuration change
def register_config_handler(self, handler):
self.config_handlers.append(handler)
# Called when a new configuration message push occurs
async def on_config_change(self, message, consumer):
# Get configuration data and version number
config = message.value().config
version = message.value().version
# Acknowledge the message
consumer.acknowledge(message)
# Invoke message handlers
print("Config change event", config, version, flush=True)
for ch in self.config_handlers:
await ch(config, version)
# This is the 'main' body of the handler. It is a point to override
# if needed. By default does nothing. Processors are implemented
# by adding consumer/producer functionality so maybe nothing is needed
# in the run() body
async def run(self):
while self.running:
await asyncio.sleep(2)
# Startup fabric. This runs in 'async' mode, creates a taskgroup and
# runs the producer.
@classmethod
async def launch_async(cls, args):
try:
            # Create a taskgroup. When an exception goes unhandled, it
            # cancels all tasks in the taskgroup, so the exception needs
            # to be caught in the right place
async with asyncio.TaskGroup() as tg:
                # Create a processor instance, passing the taskgroup
                # as a parameter. The processor identity (ident) is used as:
                # - the subscriber name
                # - an identifier for flow configuration
p = cls(**args | { "taskgroup": tg })
# Start the processor
await p.start()
# Run the processor
task = tg.create_task(p.run())
# The taskgroup causes everything to wait until
# all threads have stopped
# This is here to output a debug message, shouldn't be needed.
except Exception as e:
print("Exception, closing taskgroup", flush=True)
raise e
# Startup fabric. launch calls launch_async in async mode.
@classmethod
def launch(cls, ident, doc):
# Start assembling CLI arguments
parser = argparse.ArgumentParser(
prog=ident,
description=doc
)
parser.add_argument(
'--id',
default=ident,
help=f'Configuration identity (default: {ident})',
)
# Invoke the class-specific add_args, which manages adding all the
# command-line arguments
cls.add_args(parser)
# Parse arguments
args = parser.parse_args()
args = vars(args)
# Debug
print(args, flush=True)
# Start the Prometheus metrics service if needed
if args["metrics"]:
start_http_server(args["metrics_port"])
# Loop forever, exception handler
while True:
print("Starting...", flush=True)
try:
# Launch the processor in an asyncio handler
asyncio.run(cls.launch_async(
args
))
except KeyboardInterrupt:
print("Keyboard interrupt.", flush=True)
return
except _pulsar.Interrupted:
print("Pulsar Interrupted.", flush=True)
return
# Exceptions from a taskgroup come in as an exception group
except ExceptionGroup as e:
print("Exception group:", flush=True)
for se in e.exceptions:
print(" Type:", type(se), flush=True)
print(f" Exception: {se}", flush=True)
except Exception as e:
print("Type:", type(e), flush=True)
print("Exception:", e, flush=True)
# Retry occurs here
print("Will retry...", flush=True)
time.sleep(4)
print("Retrying...", flush=True)
# The command-line arguments are built using a stack of add_args
# invocations
@staticmethod
def add_args(parser):
PulsarClient.add_args(parser)
parser.add_argument(
'--config-push-queue',
default=default_config_queue,
            help=f'Config push queue (default: {default_config_queue})',
)
parser.add_argument(
'--metrics',
action=argparse.BooleanOptionalAction,
default=True,
help=f'Metrics enabled (default: true)',
)
parser.add_argument(
'-P', '--metrics-port',
type=int,
default=8000,
            help='Metrics port (default: 8000)',
)
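
The launch fabric above wires together argument parsing, metrics and the asyncio taskgroup, so a minimal subclass only has to provide a run() loop and, optionally, a config handler. A rough sketch (the class name and ident are invented, and trustgraph.base is the assumed import path):

import asyncio
from trustgraph.base import AsyncProcessor   # assumed import path

class ExampleProcessor(AsyncProcessor):

    def __init__(self, **params):
        super(ExampleProcessor, self).__init__(**params)
        # Invoked from on_config_change() with (config, version)
        self.register_config_handler(self.on_config)

    async def on_config(self, config, version):
        print("Saw config version", version, flush=True)

    async def run(self):
        # Replace the default idle loop with real work as needed
        while self.running:
            await asyncio.sleep(2)

def run():
    ExampleProcessor.launch("example-processor", __doc__)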

View file

@ -0,0 +1,210 @@
import asyncio
import os
import argparse
import pulsar
from pulsar.schema import JsonSchema
import _pulsar
import time
import uuid
from prometheus_client import start_http_server, Info
from .. schema import ConfigPush, config_push_queue
from .. log_level import LogLevel
default_config_queue = config_push_queue
config_subscriber_id = str(uuid.uuid4())
class BaseProcessor:
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_pulsar_api_key = os.getenv("PULSAR_API_KEY", None)
def __init__(self, **params):
self.client = None
if not hasattr(__class__, "params_metric"):
__class__.params_metric = Info(
'params', 'Parameters configuration'
)
# FIXME: Maybe outputs information it should not
__class__.params_metric.info({
k: str(params[k])
for k in params
})
pulsar_host = params.get("pulsar_host", self.default_pulsar_host)
pulsar_listener = params.get("pulsar_listener", None)
pulsar_api_key = params.get("pulsar_api_key", None)
log_level = params.get("log_level", LogLevel.INFO)
self.config_push_queue = params.get(
"config_push_queue",
default_config_queue
)
self.pulsar_host = pulsar_host
self.pulsar_api_key = pulsar_api_key
if pulsar_api_key:
auth = pulsar.AuthenticationToken(pulsar_api_key)
self.client = pulsar.Client(
pulsar_host,
authentication=auth,
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
)
else:
self.client = pulsar.Client(
pulsar_host,
listener_name=pulsar_listener,
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
)
self.pulsar_listener = pulsar_listener
self.config_subscriber = self.client.subscribe(
self.config_push_queue, config_subscriber_id,
consumer_type=pulsar.ConsumerType.Shared,
initial_position=pulsar.InitialPosition.Earliest,
schema=JsonSchema(ConfigPush),
)
def __del__(self):
if hasattr(self, "client"):
if self.client:
self.client.close()
@staticmethod
def add_args(parser):
parser.add_argument(
'-p', '--pulsar-host',
default=__class__.default_pulsar_host,
help=f'Pulsar host (default: {__class__.default_pulsar_host})',
)
parser.add_argument(
'--pulsar-api-key',
default=__class__.default_pulsar_api_key,
help=f'Pulsar API key',
)
parser.add_argument(
'--config-push-queue',
default=default_config_queue,
            help=f'Config push queue (default: {default_config_queue})',
)
parser.add_argument(
'--pulsar-listener',
help=f'Pulsar listener (default: none)',
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
            help='Log level (default: info)'
)
parser.add_argument(
'--metrics',
action=argparse.BooleanOptionalAction,
default=True,
help=f'Metrics enabled (default: true)',
)
parser.add_argument(
'-P', '--metrics-port',
type=int,
default=8000,
            help='Metrics port (default: 8000)',
)
async def start(self):
pass
async def run_config_queue(self):
if self.module == "config.service":
print("I am config-svc, not looking at config queue", flush=True)
return
print("Config thread running", flush=True)
while True:
try:
msg = await asyncio.to_thread(
self.config_subscriber.receive, timeout_millis=2000
)
except pulsar.Timeout:
continue
v = msg.value()
print("Got config version", v.version, flush=True)
await self.on_config(v.version, v.config)
async def on_config(self, version, config):
pass
async def run(self):
raise RuntimeError("Something should have implemented the run method")
@classmethod
async def launch_async(cls, args, prog):
p = cls(**args)
p.module = prog
await p.start()
task1 = asyncio.create_task(p.run_config_queue())
task2 = asyncio.create_task(p.run())
await asyncio.gather(task1, task2)
@classmethod
def launch(cls, prog, doc):
parser = argparse.ArgumentParser(
prog=prog,
description=doc
)
cls.add_args(parser)
args = parser.parse_args()
args = vars(args)
print(args)
if args["metrics"]:
start_http_server(args["metrics_port"])
while True:
try:
asyncio.run(cls.launch_async(args, prog))
except KeyboardInterrupt:
print("Keyboard interrupt.")
return
except _pulsar.Interrupted:
print("Pulsar Interrupted.")
return
except Exception as e:
print(type(e))
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(4)

View file

@ -1,136 +1,93 @@
import asyncio
from pulsar.schema import JsonSchema
import pulsar
import _pulsar
import asyncio
from prometheus_client import Histogram, Info, Counter, Enum
import time
from . base_processor import BaseProcessor
from .. exceptions import TooManyRequests
class Consumer:
default_rate_limit_retry = 10
default_rate_limit_timeout = 7200
def __init__(
self, taskgroup, flow, client, topic, subscriber, schema,
handler,
metrics = None,
start_of_messages=False,
rate_limit_retry_time = 10, rate_limit_timeout = 7200,
reconnect_time = 5,
):
class Consumer(BaseProcessor):
self.taskgroup = taskgroup
self.flow = flow
self.client = client
self.topic = topic
self.subscriber = subscriber
self.schema = schema
self.handler = handler
def __init__(self, **params):
self.rate_limit_retry_time = rate_limit_retry_time
self.rate_limit_timeout = rate_limit_timeout
if not hasattr(__class__, "state_metric"):
__class__.state_metric = Enum(
'processor_state', 'Processor state',
states=['starting', 'running', 'stopped']
)
__class__.state_metric.state('starting')
self.reconnect_time = 5
__class__.state_metric.state('starting')
self.start_of_messages = start_of_messages
super(Consumer, self).__init__(**params)
self.running = True
self.task = None
self.input_queue = params.get("input_queue")
self.subscriber = params.get("subscriber")
self.input_schema = params.get("input_schema")
self.metrics = metrics
self.rate_limit_retry = params.get(
"rate_limit_retry", default_rate_limit_retry
)
self.rate_limit_timeout = params.get(
"rate_limit_timeout", default_rate_limit_timeout
)
self.consumer = None
if self.input_schema == None:
raise RuntimeError("input_schema must be specified")
def __del__(self):
self.running = False
if not hasattr(__class__, "request_metric"):
__class__.request_metric = Histogram(
'request_latency', 'Request latency (seconds)'
)
if hasattr(self, "consumer"):
if self.consumer:
self.consumer.close()
if not hasattr(__class__, "pubsub_metric"):
__class__.pubsub_metric = Info(
'pubsub', 'Pub/sub configuration'
)
async def stop(self):
if not hasattr(__class__, "processing_metric"):
__class__.processing_metric = Counter(
'processing_count', 'Processing count', ["status"]
)
self.running = False
await self.task
if not hasattr(__class__, "rate_limit_metric"):
__class__.rate_limit_metric = Counter(
'rate_limit_count', 'Rate limit event count',
)
async def start(self):
__class__.pubsub_metric.info({
"input_queue": self.input_queue,
"subscriber": self.subscriber,
"input_schema": self.input_schema.__name__,
"rate_limit_retry": str(self.rate_limit_retry),
"rate_limit_timeout": str(self.rate_limit_timeout),
})
self.running = True
self.consumer = self.client.subscribe(
self.input_queue, self.subscriber,
consumer_type=pulsar.ConsumerType.Shared,
schema=JsonSchema(self.input_schema),
)
# Puts it in the stopped state, the run thread should set running
if self.metrics:
self.metrics.state("stopped")
self.task = self.taskgroup.create_task(self.run())
print("Initialised consumer.", flush=True)
async def run(self):
while self.running:
__class__.state_metric.state('running')
if self.metrics:
self.metrics.state("stopped")
while True:
try:
print(self.topic, "subscribing...", flush=True)
if self.start_of_messages:
pos = pulsar.InitialPosition.Earliest
else:
pos = pulsar.InitialPosition.Latest
self.consumer = await asyncio.to_thread(
self.client.subscribe,
topic = self.topic,
subscription_name = self.subscriber,
schema = JsonSchema(self.schema),
initial_position = pos,
consumer_type = pulsar.ConsumerType.Shared,
)
except Exception as e:
print("consumer subs Exception:", e, flush=True)
await asyncio.sleep(self.reconnect_time)
continue
print(self.topic, "subscribed", flush=True)
if self.metrics:
self.metrics.state("running")
try:
await self.consume()
if self.metrics:
self.metrics.state("stopped")
except Exception as e:
print("consumer loop exception:", e, flush=True)
self.consumer.close()
self.consumer = None
await asyncio.sleep(self.reconnect_time)
continue
async def consume(self):
while self.running:
try:
msg = await asyncio.to_thread(
self.consumer.receive,
timeout_millis=2000
)
except _pulsar.Timeout:
continue
except Exception as e:
raise e
msg = await asyncio.to_thread(self.consumer.receive)
expiry = time.time() + self.rate_limit_timeout
# This loop is for retry on rate-limit / resource limits
while self.running:
while True:
if time.time() > expiry:
@ -140,31 +97,20 @@ class Consumer:
# be retried
self.consumer.negative_acknowledge(msg)
if self.metrics:
self.metrics.process("error")
__class__.processing_metric.labels(status="error").inc()
# Break out of retry loop, processes next message
break
try:
print("Handle...", flush=True)
if self.metrics:
with self.metrics.record_time():
await self.handler(msg, self, self.flow)
else:
await self.handler(msg, self.consumer)
print("Handled.", flush=True)
with __class__.request_metric.time():
await self.handle(msg)
# Acknowledge successful processing of the message
self.consumer.acknowledge(msg)
if self.metrics:
self.metrics.process("success")
__class__.processing_metric.labels(status="success").inc()
# Break out of retry loop
break
@ -173,25 +119,55 @@ class Consumer:
print("TooManyRequests: will retry...", flush=True)
if self.metrics:
self.metrics.rate_limit()
__class__.rate_limit_metric.inc()
# Sleep
await asyncio.sleep(self.rate_limit_retry_time)
time.sleep(self.rate_limit_retry)
                # Continue the retry loop; this just causes a reprocessing
continue
except Exception as e:
print("consume exception:", e, flush=True)
print("Exception:", e, flush=True)
# Message failed to be processed, this causes it to
# be retried
self.consumer.negative_acknowledge(msg)
if self.metrics:
self.metrics.process("error")
__class__.processing_metric.labels(status="error").inc()
# Break out of retry loop, processes next message
break
@staticmethod
def add_args(parser, default_input_queue, default_subscriber):
BaseProcessor.add_args(parser)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'--rate-limit-retry',
type=int,
default=default_rate_limit_retry,
help=f'Rate limit retry (default: {default_rate_limit_retry})'
)
parser.add_argument(
'--rate-limit-timeout',
type=int,
default=default_rate_limit_timeout,
help=f'Rate limit timeout (default: {default_rate_limit_timeout})'
)

View file

@ -0,0 +1,62 @@
from pulsar.schema import JsonSchema
import pulsar
from prometheus_client import Histogram, Info, Counter, Enum
import time
from . consumer import Consumer
from .. exceptions import TooManyRequests
class ConsumerProducer(Consumer):
def __init__(self, **params):
super(ConsumerProducer, self).__init__(**params)
self.output_queue = params.get("output_queue")
self.output_schema = params.get("output_schema")
if not hasattr(__class__, "output_metric"):
__class__.output_metric = Counter(
'output_count', 'Output items created'
)
__class__.pubsub_metric.info({
"input_queue": self.input_queue,
"output_queue": self.output_queue,
"subscriber": self.subscriber,
"input_schema": self.input_schema.__name__,
"output_schema": self.output_schema.__name__,
"rate_limit_retry": str(self.rate_limit_retry),
"rate_limit_timeout": str(self.rate_limit_timeout),
})
if self.output_schema == None:
raise RuntimeError("output_schema must be specified")
self.producer = self.client.create_producer(
topic=self.output_queue,
schema=JsonSchema(self.output_schema),
chunking_enabled=True,
)
print("Initialised consumer/producer.")
async def send(self, msg, properties={}):
self.producer.send(msg, properties)
__class__.output_metric.inc()
@staticmethod
def add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
):
Consumer.add_args(parser, default_input_queue, default_subscriber)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)

View file

@ -1,36 +0,0 @@
from . metrics import ConsumerMetrics
from . consumer import Consumer
from . spec import Spec
class ConsumerSpec(Spec):
def __init__(self, name, schema, handler):
self.name = name
self.schema = schema
self.handler = handler
def add(self, flow, processor, definition):
consumer_metrics = ConsumerMetrics(
flow.id, f"{flow.name}-{self.name}"
)
consumer = Consumer(
taskgroup = processor.taskgroup,
flow = flow,
client = processor.client,
topic = definition[self.name],
subscriber = processor.id + "--" + self.name,
schema = self.schema,
handler = self.handler,
metrics = consumer_metrics,
)
# Consumer handle gets access to producers and other
# metadata
consumer.id = flow.id
consumer.name = self.name
consumer.flow = flow
flow.consumer[self.name] = consumer

View file

@ -1,38 +0,0 @@
from . request_response_spec import RequestResponse, RequestResponseSpec
from .. schema import DocumentEmbeddingsRequest, DocumentEmbeddingsResponse
from .. knowledge import Uri, Literal
class DocumentEmbeddingsClient(RequestResponse):
async def query(self, vectors, limit=20, user="trustgraph",
collection="default", timeout=30):
resp = await self.request(
DocumentEmbeddingsRequest(
vectors = vectors,
limit = limit,
user = user,
collection = collection
),
timeout=timeout
)
print(resp, flush=True)
if resp.error:
raise RuntimeError(resp.error.message)
return resp.documents
class DocumentEmbeddingsClientSpec(RequestResponseSpec):
def __init__(
self, request_name, response_name,
):
super(DocumentEmbeddingsClientSpec, self).__init__(
request_name = request_name,
request_schema = DocumentEmbeddingsRequest,
response_name = response_name,
response_schema = DocumentEmbeddingsResponse,
impl = DocumentEmbeddingsClient,
)
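
In the flow-based design, this client is not constructed directly; a FlowProcessor registers the spec and then reaches the client through the flow lookup. A rough sketch, assuming the import paths below, with all names and topics invented:

from trustgraph.base import FlowProcessor, ConsumerSpec
from trustgraph.base import DocumentEmbeddingsClientSpec
from trustgraph.schema import Chunk

class ExampleRetriever(FlowProcessor):

    def __init__(self, **params):
        super(ExampleRetriever, self).__init__(**params)
        self.register_specification(
            ConsumerSpec(
                name = "input", schema = Chunk, handler = self.on_message,
            )
        )
        self.register_specification(
            DocumentEmbeddingsClientSpec(
                request_name = "document-embeddings-request",
                response_name = "document-embeddings-response",
            )
        )

    async def on_message(self, msg, consumer, flow):
        # The client registered above is reachable through the flow lookup
        docs = await flow("document-embeddings-request").query(
            vectors = [[0.1, 0.2, 0.3]],   # placeholder embedding
            limit = 10,
        )
        print("Got", len(docs), "documents", flush=True)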

View file

@ -1,84 +0,0 @@
"""
Document embeddings query service. Input is vectors. Output is a list of
matching documents.
"""
from .. schema import DocumentEmbeddingsRequest, DocumentEmbeddingsResponse
from .. schema import Error, Value
from . flow_processor import FlowProcessor
from . consumer_spec import ConsumerSpec
from . producer_spec import ProducerSpec
default_ident = "ge-query"
class DocumentEmbeddingsQueryService(FlowProcessor):
def __init__(self, **params):
id = params.get("id")
super(DocumentEmbeddingsQueryService, self).__init__(
**params | { "id": id }
)
self.register_specification(
ConsumerSpec(
name = "request",
schema = DocumentEmbeddingsRequest,
handler = self.on_message
)
)
self.register_specification(
ProducerSpec(
name = "response",
schema = DocumentEmbeddingsResponse,
)
)
async def on_message(self, msg, consumer, flow):
try:
request = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling input {id}...", flush=True)
docs = await self.query_document_embeddings(request)
print("Send response...", flush=True)
r = DocumentEmbeddingsResponse(documents=docs, error=None)
await flow("response").send(r, properties={"id": id})
print("Done.", flush=True)
except Exception as e:
print(f"Exception: {e}")
print("Send error response...", flush=True)
r = DocumentEmbeddingsResponse(
error=Error(
type = "document-embeddings-query-error",
message = str(e),
),
                documents=None,
)
await flow("response").send(r, properties={"id": id})
@staticmethod
def add_args(parser):
FlowProcessor.add_args(parser)
def run():
    DocumentEmbeddingsQueryService.launch(default_ident, __doc__)

View file

@ -1,50 +0,0 @@
"""
Document embeddings store base class
"""
from .. schema import DocumentEmbeddings
from .. base import FlowProcessor, ConsumerSpec
from .. exceptions import TooManyRequests
default_ident = "document-embeddings-write"
class DocumentEmbeddingsStoreService(FlowProcessor):
def __init__(self, **params):
id = params.get("id")
super(DocumentEmbeddingsStoreService, self).__init__(
**params | { "id": id }
)
self.register_specification(
ConsumerSpec(
name = "input",
schema = DocumentEmbeddings,
handler = self.on_message
)
)
async def on_message(self, msg, consumer, flow):
try:
request = msg.value()
await self.store_document_embeddings(request)
except TooManyRequests as e:
raise e
except Exception as e:
print(f"Exception: {e}")
raise e
@staticmethod
def add_args(parser):
FlowProcessor.add_args(parser)

View file

@ -1,31 +0,0 @@
from . request_response_spec import RequestResponse, RequestResponseSpec
from .. schema import EmbeddingsRequest, EmbeddingsResponse
class EmbeddingsClient(RequestResponse):
async def embed(self, text, timeout=30):
resp = await self.request(
EmbeddingsRequest(
text = text
),
timeout=timeout
)
if resp.error:
raise RuntimeError(resp.error.message)
return resp.vectors
class EmbeddingsClientSpec(RequestResponseSpec):
def __init__(
self, request_name, response_name,
):
super(EmbeddingsClientSpec, self).__init__(
request_name = request_name,
request_schema = EmbeddingsRequest,
response_name = response_name,
response_schema = EmbeddingsResponse,
impl = EmbeddingsClient,
)

View file

@ -1,90 +0,0 @@
"""
Embeddings resolution base class
"""
import time
from prometheus_client import Histogram
from .. schema import EmbeddingsRequest, EmbeddingsResponse, Error
from .. exceptions import TooManyRequests
from .. base import FlowProcessor, ConsumerSpec, ProducerSpec
default_ident = "embeddings"
class EmbeddingsService(FlowProcessor):
def __init__(self, **params):
id = params.get("id")
super(EmbeddingsService, self).__init__(**params | { "id": id })
self.register_specification(
ConsumerSpec(
name = "request",
schema = EmbeddingsRequest,
handler = self.on_request
)
)
self.register_specification(
ProducerSpec(
name = "response",
schema = EmbeddingsResponse
)
)
async def on_request(self, msg, consumer, flow):
try:
request = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print("Handling request", id, "...", flush=True)
vectors = await self.on_embeddings(request.text)
await flow("response").send(
EmbeddingsResponse(
error = None,
vectors = vectors,
),
properties={"id": id}
)
print("Handled.", flush=True)
except TooManyRequests as e:
raise e
except Exception as e:
# Apart from rate limits, treat all exceptions as unrecoverable
print(f"Exception: {e}", flush=True)
print("Send error response...", flush=True)
await flow.producer["response"].send(
EmbeddingsResponse(
error=Error(
type = "embeddings-error",
message = str(e),
),
vectors=None,
),
properties={"id": id}
)
@staticmethod
def add_args(parser):
FlowProcessor.add_args(parser)

View file

@ -1,32 +0,0 @@
import asyncio
class Flow:
def __init__(self, id, flow, processor, defn):
self.id = id
self.name = flow
self.producer = {}
# Consumers and publishers. Is this a bit untidy?
self.consumer = {}
self.setting = {}
for spec in processor.specifications:
spec.add(self, processor, defn)
async def start(self):
for c in self.consumer.values():
await c.start()
async def stop(self):
for c in self.consumer.values():
await c.stop()
def __call__(self, key):
if key in self.producer: return self.producer[key]
if key in self.consumer: return self.consumer[key]
if key in self.setting: return self.setting[key].value
return None

View file

@ -1,115 +0,0 @@
# Base class for a processor whose flows in & out are managed by
# configuration. This covers probably all processor types, except for the
# configuration service, which can't manage itself.
import json
from pulsar.schema import JsonSchema
from .. schema import Error
from .. schema import config_request_queue, config_response_queue
from .. schema import config_push_queue
from .. log_level import LogLevel
from . async_processor import AsyncProcessor
from . flow import Flow
# Parent class for configurable processors, configured with flows by
# the config service
class FlowProcessor(AsyncProcessor):
def __init__(self, **params):
# Initialise base class
super(FlowProcessor, self).__init__(**params)
# Register configuration handler
self.register_config_handler(self.on_configure_flows)
# Initialise flow information state
self.flows = {}
        # These can be overridden by a derived class:
# Array of specifications: ConsumerSpec, ProducerSpec, SettingSpec
self.specifications = []
print("Service initialised.")
    # Register a specification (consumer, producer or setting)
def register_specification(self, spec):
self.specifications.append(spec)
# Start processing for a new flow
async def start_flow(self, flow, defn):
self.flows[flow] = Flow(self.id, flow, self, defn)
await self.flows[flow].start()
print("Started flow: ", flow)
    # Stop processing for an existing flow
async def stop_flow(self, flow):
if flow in self.flows:
await self.flows[flow].stop()
del self.flows[flow]
print("Stopped flow: ", flow, flush=True)
# Event handler - called for a configuration change
async def on_configure_flows(self, config, version):
print("Got config version", version, flush=True)
# Skip over invalid data
if "flows-active" not in config: return
# Check there's configuration information for me
if self.id in config["flows-active"]:
# Get my flow config
flow_config = json.loads(config["flows-active"][self.id])
else:
print("No configuration settings for me.", flush=True)
flow_config = {}
# Get list of flows which should be running and are currently
# running
wanted_flows = flow_config.keys()
current_flows = self.flows.keys()
        # Start all the flows which aren't currently running
for flow in wanted_flows:
if flow not in current_flows:
await self.start_flow(flow, flow_config[flow])
        # Stop all the running flows which are no longer wanted
for flow in current_flows:
if flow not in wanted_flows:
await self.stop_flow(flow)
print("Handled config update")
# Start threads, just call parent
async def start(self):
await super(FlowProcessor, self).start()
@staticmethod
def add_args(parser):
AsyncProcessor.add_args(parser)
# parser.add_argument(
# '--rate-limit-retry',
# type=int,
# default=default_rate_limit_retry,
# help=f'Rate limit retry (default: {default_rate_limit_retry})'
# )
# parser.add_argument(
# '--rate-limit-timeout',
# type=int,
# default=default_rate_limit_timeout,
# help=f'Rate limit timeout (default: {default_rate_limit_timeout})'
# )
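
For reference, on_configure_flows above expects the flows-active entry for this processor to be a JSON-encoded mapping of flow name to definition, where each definition maps the names used in the registered specs (ConsumerSpec/ProducerSpec/SettingSpec) to topics or values. A rough sketch of that shape; every id, flow name and topic below is invented:

import json

config = {
    "flows-active": {
        # Keyed by processor id; each value is a JSON-encoded string
        "embeddings": json.dumps({
            # Flow name -> definition; each spec name maps to a topic
            # (or, for a SettingSpec, a plain value)
            "default": {
                "request": "non-persistent://tg/request/embeddings-default",
                "response": "non-persistent://tg/response/embeddings-default",
            }
        })
    }
}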

View file

@ -1,45 +0,0 @@
from . request_response_spec import RequestResponse, RequestResponseSpec
from .. schema import GraphEmbeddingsRequest, GraphEmbeddingsResponse
from .. knowledge import Uri, Literal
def to_value(x):
if x.is_uri: return Uri(x.value)
return Literal(x.value)
class GraphEmbeddingsClient(RequestResponse):
async def query(self, vectors, limit=20, user="trustgraph",
collection="default", timeout=30):
resp = await self.request(
GraphEmbeddingsRequest(
vectors = vectors,
limit = limit,
user = user,
collection = collection
),
timeout=timeout
)
print(resp, flush=True)
if resp.error:
raise RuntimeError(resp.error.message)
return [
to_value(v)
for v in resp.entities
]
class GraphEmbeddingsClientSpec(RequestResponseSpec):
def __init__(
self, request_name, response_name,
):
super(GraphEmbeddingsClientSpec, self).__init__(
request_name = request_name,
request_schema = GraphEmbeddingsRequest,
response_name = response_name,
response_schema = GraphEmbeddingsResponse,
impl = GraphEmbeddingsClient,
)

View file

@ -1,84 +0,0 @@
"""
Graph embeddings query service. Input is vectors. Output is a list of
matching entities.
"""
from .. schema import GraphEmbeddingsRequest, GraphEmbeddingsResponse
from .. schema import Error, Value
from . flow_processor import FlowProcessor
from . consumer_spec import ConsumerSpec
from . producer_spec import ProducerSpec
default_ident = "ge-query"
class GraphEmbeddingsQueryService(FlowProcessor):
def __init__(self, **params):
id = params.get("id")
super(GraphEmbeddingsQueryService, self).__init__(
**params | { "id": id }
)
self.register_specification(
ConsumerSpec(
name = "request",
schema = GraphEmbeddingsRequest,
handler = self.on_message
)
)
self.register_specification(
ProducerSpec(
name = "response",
schema = GraphEmbeddingsResponse,
)
)
async def on_message(self, msg, consumer, flow):
try:
request = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling input {id}...", flush=True)
entities = await self.query_graph_embeddings(request)
print("Send response...", flush=True)
r = GraphEmbeddingsResponse(entities=entities, error=None)
await flow("response").send(r, properties={"id": id})
print("Done.", flush=True)
except Exception as e:
print(f"Exception: {e}")
print("Send error response...", flush=True)
r = GraphEmbeddingsResponse(
error=Error(
type = "graph-embeddings-query-error",
message = str(e),
),
                entities=None,
)
await flow("response").send(r, properties={"id": id})
@staticmethod
def add_args(parser):
FlowProcessor.add_args(parser)
def run():
    GraphEmbeddingsQueryService.launch(default_ident, __doc__)

View file

@ -1,50 +0,0 @@
"""
Graph embeddings store base class
"""
from .. schema import GraphEmbeddings
from .. base import FlowProcessor, ConsumerSpec
from .. exceptions import TooManyRequests
default_ident = "graph-embeddings-write"
class GraphEmbeddingsStoreService(FlowProcessor):
def __init__(self, **params):
id = params.get("id")
super(GraphEmbeddingsStoreService, self).__init__(
**params | { "id": id }
)
self.register_specification(
ConsumerSpec(
name = "input",
schema = GraphEmbeddings,
handler = self.on_message
)
)
async def on_message(self, msg, consumer, flow):
try:
request = msg.value()
await self.store_graph_embeddings(request)
except TooManyRequests as e:
raise e
except Exception as e:
print(f"Exception: {e}")
raise e
@staticmethod
def add_args(parser):
FlowProcessor.add_args(parser)

View file

@ -1,33 +0,0 @@
from . request_response_spec import RequestResponse, RequestResponseSpec
from .. schema import GraphRagQuery, GraphRagResponse
class GraphRagClient(RequestResponse):
async def rag(self, query, user="trustgraph", collection="default",
timeout=600):
resp = await self.request(
GraphRagQuery(
query = query,
user = user,
collection = collection,
),
timeout=timeout
)
if resp.error:
raise RuntimeError(resp.error.message)
return resp.response
class GraphRagClientSpec(RequestResponseSpec):
def __init__(
self, request_name, response_name,
):
super(GraphRagClientSpec, self).__init__(
request_name = request_name,
request_schema = GraphRagQuery,
response_name = response_name,
response_schema = GraphRagResponse,
impl = GraphRagClient,
)

View file

@ -1,114 +0,0 @@
"""
LLM text completion base class
"""
import time
from prometheus_client import Histogram
from .. schema import TextCompletionRequest, TextCompletionResponse, Error
from .. exceptions import TooManyRequests
from .. base import FlowProcessor, ConsumerSpec, ProducerSpec
default_ident = "text-completion"
class LlmResult:
__slots__ = ["text", "in_token", "out_token", "model"]
class LlmService(FlowProcessor):
def __init__(self, **params):
id = params.get("id")
super(LlmService, self).__init__(**params | { "id": id })
self.register_specification(
ConsumerSpec(
name = "request",
schema = TextCompletionRequest,
handler = self.on_request
)
)
self.register_specification(
ProducerSpec(
name = "response",
schema = TextCompletionResponse
)
)
if not hasattr(__class__, "text_completion_metric"):
__class__.text_completion_metric = Histogram(
'text_completion_duration',
'Text completion duration (seconds)',
["id", "flow"],
buckets=[
0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0,
17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0,
30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0,
120.0
]
)
async def on_request(self, msg, consumer, flow):
try:
request = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
with __class__.text_completion_metric.labels(
id=self.id,
flow=f"{flow.name}-{consumer.name}",
).time():
response = await self.generate_content(
request.system, request.prompt
)
await flow("response").send(
TextCompletionResponse(
error=None,
response=response.text,
in_token=response.in_token,
out_token=response.out_token,
model=response.model
),
properties={"id": id}
)
except TooManyRequests as e:
raise e
except Exception as e:
# Apart from rate limits, treat all exceptions as unrecoverable
print(f"Exception: {e}")
print("Send error response...", flush=True)
await flow.producer["response"].send(
TextCompletionResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
in_token=None,
out_token=None,
model=None,
),
properties={"id": id}
)
@staticmethod
def add_args(parser):
FlowProcessor.add_args(parser)
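
The generate_content() call above is expected to return something shaped like LlmResult (text, in_token, out_token, model). A rough sketch of a derived service, assuming it sits alongside LlmService above; the echo behaviour and class name are invented:

class EchoLlm(LlmService):

    # Called from on_request() with the system prompt and the user prompt
    async def generate_content(self, system, prompt):
        r = LlmResult()
        r.text = f"echo: {prompt}"   # a real service would call an LLM here
        r.in_token = 0
        r.out_token = 0
        r.model = "echo"
        return r

def run():
    EchoLlm.launch(default_ident, __doc__)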

View file

@ -1,82 +0,0 @@
from prometheus_client import start_http_server, Info, Enum, Histogram
from prometheus_client import Counter
class ConsumerMetrics:
def __init__(self, id, flow=None):
self.id = id
self.flow = flow
if not hasattr(__class__, "state_metric"):
__class__.state_metric = Enum(
'consumer_state', 'Consumer state',
["id", "flow"],
states=['stopped', 'running']
)
if not hasattr(__class__, "request_metric"):
__class__.request_metric = Histogram(
'request_latency', 'Request latency (seconds)',
["id", "flow"],
)
if not hasattr(__class__, "processing_metric"):
__class__.processing_metric = Counter(
'processing_count', 'Processing count',
["id", "flow", "status"]
)
if not hasattr(__class__, "rate_limit_metric"):
__class__.rate_limit_metric = Counter(
'rate_limit_count', 'Rate limit event count',
["id", "flow"]
)
def process(self, status):
__class__.processing_metric.labels(
id=self.id, flow=self.flow, status=status
).inc()
def rate_limit(self):
__class__.rate_limit_metric.labels(
id=self.id, flow=self.flow
).inc()
def state(self, state):
__class__.state_metric.labels(
id=self.id, flow=self.flow
).state(state)
def record_time(self):
return __class__.request_metric.labels(
id=self.id, flow=self.flow
).time()
class ProducerMetrics:
def __init__(self, id, flow=None):
self.id = id
self.flow = flow
if not hasattr(__class__, "output_metric"):
__class__.output_metric = Counter(
'output_count', 'Output items created',
["id", "flow"]
)
def inc(self):
__class__.output_metric.labels(id=self.id, flow=self.flow).inc()
class ProcessorMetrics:
def __init__(self, id):
self.id = id
if not hasattr(__class__, "processor_metric"):
__class__.processor_metric = Info(
'processor', 'Processor configuration',
["id"]
)
def info(self, info):
__class__.processor_metric.labels(id=self.id).info(info)
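
A rough sketch of how a consumer is expected to drive these metrics (the processor id, flow name and handler are invented; the absolute import paths are assumptions):

from trustgraph.base import ConsumerMetrics          # assumed import path
from trustgraph.exceptions import TooManyRequests    # assumed import path

def handle_message():
    pass   # placeholder for real message handling

metrics = ConsumerMetrics("my-processor", "my-flow-request")
metrics.state("running")
try:
    with metrics.record_time():       # observes request latency
        handle_message()
    metrics.process("success")
except TooManyRequests:
    metrics.rate_limit()
except Exception:
    metrics.process("error")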

View file

@ -1,69 +1,56 @@
from pulsar.schema import JsonSchema
import asyncio
from prometheus_client import Info, Counter
class Producer:
from . base_processor import BaseProcessor
def __init__(self, client, topic, schema, metrics=None):
self.client = client
self.topic = topic
self.schema = schema
class Producer(BaseProcessor):
self.metrics = metrics
def __init__(self, **params):
self.running = True
self.producer = None
output_queue = params.get("output_queue")
output_schema = params.get("output_schema")
def __del__(self):
if not hasattr(__class__, "output_metric"):
__class__.output_metric = Counter(
'output_count', 'Output items created'
)
self.running = False
if not hasattr(__class__, "pubsub_metric"):
__class__.pubsub_metric = Info(
'pubsub', 'Pub/sub configuration'
)
if hasattr(self, "producer"):
if self.producer:
self.producer.close()
__class__.pubsub_metric.info({
"output_queue": output_queue,
"output_schema": output_schema.__name__,
})
async def start(self):
self.running = True
super(Producer, self).__init__(**params)
async def stop(self):
self.running = False
if output_schema == None:
raise RuntimeError("output_schema must be specified")
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(output_schema),
chunking_enabled=True,
)
async def send(self, msg, properties={}):
self.producer.send(msg, properties)
__class__.output_metric.inc()
if not self.running: return
@staticmethod
def add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
):
while self.running and self.producer is None:
try:
print("Connect publisher to", self.topic, "...", flush=True)
self.producer = self.client.create_producer(
topic = self.topic,
schema = JsonSchema(self.schema)
)
print("Connected to", self.topic, flush=True)
except Exception as e:
print("Exception:", e, flush=True)
await asyncio.sleep(2)
if not self.running: break
while self.running:
try:
await asyncio.to_thread(
self.producer.send,
msg, properties
)
if self.metrics:
self.metrics.inc()
# Delivery success, break out of loop
break
except Exception as e:
print("Exception:", e, flush=True)
self.producer.close()
self.producer = None
BaseProcessor.add_args(parser)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)

View file

@ -1,25 +0,0 @@
from . producer import Producer
from . metrics import ProducerMetrics
from . spec import Spec
class ProducerSpec(Spec):
def __init__(self, name, schema):
self.name = name
self.schema = schema
def add(self, flow, processor, definition):
producer_metrics = ProducerMetrics(
flow.id, f"{flow.name}-{self.name}"
)
producer = Producer(
client = processor.client,
topic = definition[self.name],
schema = self.schema,
metrics = producer_metrics,
)
flow.producer[self.name] = producer

View file

@ -1,93 +0,0 @@
import json
from . request_response_spec import RequestResponse, RequestResponseSpec
from .. schema import PromptRequest, PromptResponse
class PromptClient(RequestResponse):
async def prompt(self, id, variables, timeout=600):
resp = await self.request(
PromptRequest(
id = id,
terms = {
k: json.dumps(v)
for k, v in variables.items()
}
),
timeout=timeout
)
if resp.error:
raise RuntimeError(resp.error.message)
if resp.text: return resp.text
return json.loads(resp.object)
async def extract_definitions(self, text, timeout=600):
return await self.prompt(
id = "extract-definitions",
variables = { "text": text },
timeout = timeout,
)
async def extract_relationships(self, text, timeout=600):
return await self.prompt(
id = "extract-relationships",
variables = { "text": text },
timeout = timeout,
)
async def kg_prompt(self, query, kg, timeout=600):
return await self.prompt(
id = "kg-prompt",
variables = {
"query": query,
"knowledge": [
{ "s": v[0], "p": v[1], "o": v[2] }
for v in kg
]
},
timeout = timeout,
)
async def document_prompt(self, query, documents, timeout=600):
return await self.prompt(
id = "document-prompt",
variables = {
"query": query,
"documents": documents,
},
timeout = timeout,
)
async def agent_react(self, variables, timeout=600):
return await self.prompt(
id = "agent-react",
variables = variables,
timeout = timeout,
)
async def question(self, question, timeout=600):
return await self.prompt(
id = "question",
variables = {
"question": question,
},
timeout = timeout,
)
class PromptClientSpec(RequestResponseSpec):
def __init__(
self, request_name, response_name,
):
super(PromptClientSpec, self).__init__(
request_name = request_name,
request_schema = PromptRequest,
response_name = response_name,
response_schema = PromptResponse,
impl = PromptClient,
)
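
The helpers above just wrap the generic prompt() call with well-known prompt ids. A rough usage sketch, assuming prompt_client is the client object a flow lookup returned (as wired by PromptClientSpec) and that prompt templates for those ids exist:

async def analyse_chunk(prompt_client, text):
    # Extract definitions and relationships from a chunk of text, then
    # answer a question over some already-retrieved triples
    definitions = await prompt_client.extract_definitions(text)
    relationships = await prompt_client.extract_relationships(text)
    answer = await prompt_client.kg_prompt(
        query = "What is this text about?",
        kg = [("subject", "predicate", "object")],   # placeholder triples
    )
    return definitions, relationships, answer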

View file

@ -1,52 +1,47 @@
from pulsar.schema import JsonSchema
import asyncio
import queue
import time
import pulsar
import threading
class Publisher:
def __init__(self, client, topic, schema=None, max_size=10,
def __init__(self, pulsar_client, topic, schema=None, max_size=10,
chunking_enabled=True):
self.client = client
self.client = pulsar_client
self.topic = topic
self.schema = schema
self.q = asyncio.Queue(maxsize=max_size)
self.q = queue.Queue(maxsize=max_size)
self.chunking_enabled = chunking_enabled
self.running = True
async def start(self):
self.task = asyncio.create_task(self.run())
def start(self):
self.task = threading.Thread(target=self.run)
self.task.start()
async def stop(self):
def stop(self):
self.running = False
async def join(self):
await self.stop()
await self.task
def join(self):
self.stop()
self.task.join()
async def run(self):
def run(self):
while self.running:
try:
producer = self.client.create_producer(
topic=self.topic,
schema=JsonSchema(self.schema),
schema=self.schema,
chunking_enabled=self.chunking_enabled,
)
while self.running:
try:
id, item = await asyncio.wait_for(
self.q.get(),
timeout=0.5
)
except asyncio.TimeoutError:
continue
except asyncio.QueueEmpty:
id, item = self.q.get(timeout=0.5)
except queue.Empty:
continue
if id:
@ -60,6 +55,7 @@ class Publisher:
            # If the handler drops out, sleep then retry
time.sleep(2)
async def send(self, id, item):
await self.q.put((id, item))
def send(self, id, msg):
self.q.put((id, msg))

View file

@ -1,80 +0,0 @@
import os
import pulsar
import uuid
from pulsar.schema import JsonSchema
from .. log_level import LogLevel
class PulsarClient:
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_pulsar_api_key = os.getenv("PULSAR_API_KEY", None)
def __init__(self, **params):
self.client = None
pulsar_host = params.get("pulsar_host", self.default_pulsar_host)
pulsar_listener = params.get("pulsar_listener", None)
pulsar_api_key = params.get(
"pulsar_api_key",
self.default_pulsar_api_key
)
log_level = params.get("log_level", LogLevel.INFO)
self.pulsar_host = pulsar_host
self.pulsar_api_key = pulsar_api_key
if pulsar_api_key:
auth = pulsar.AuthenticationToken(pulsar_api_key)
self.client = pulsar.Client(
pulsar_host,
authentication=auth,
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
)
else:
self.client = pulsar.Client(
pulsar_host,
listener_name=pulsar_listener,
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
)
self.pulsar_listener = pulsar_listener
def close(self):
self.client.close()
def __del__(self):
if hasattr(self, "client"):
if self.client:
self.client.close()
@staticmethod
def add_args(parser):
parser.add_argument(
'-p', '--pulsar-host',
default=__class__.default_pulsar_host,
help=f'Pulsar host (default: {__class__.default_pulsar_host})',
)
parser.add_argument(
'--pulsar-api-key',
default=__class__.default_pulsar_api_key,
help=f'Pulsar API key',
)
parser.add_argument(
'--pulsar-listener',
help=f'Pulsar listener (default: none)',
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
            help='Log level (default: info)'
)

View file

@ -1,136 +0,0 @@
import uuid
import asyncio
from . subscriber import Subscriber
from . producer import Producer
from . spec import Spec
from . metrics import ConsumerMetrics, ProducerMetrics
class RequestResponse(Subscriber):
def __init__(
self, client, subscription, consumer_name,
request_topic, request_schema,
request_metrics,
response_topic, response_schema,
response_metrics,
):
super(RequestResponse, self).__init__(
client = client,
subscription = subscription,
consumer_name = consumer_name,
topic = response_topic,
schema = response_schema,
)
self.producer = Producer(
client = client,
topic = request_topic,
schema = request_schema,
metrics = request_metrics,
)
async def start(self):
await self.producer.start()
await super(RequestResponse, self).start()
async def stop(self):
await self.producer.stop()
await super(RequestResponse, self).stop()
async def request(self, req, timeout=300, recipient=None):
id = str(uuid.uuid4())
print("Request", id, "...", flush=True)
q = await self.subscribe(id)
try:
await self.producer.send(
req,
properties={"id": id}
)
except Exception as e:
print("Exception:", e)
raise e
try:
while True:
resp = await asyncio.wait_for(
q.get(),
timeout=timeout
)
print("Got response.", flush=True)
if recipient is None:
# If no recipient handler, just return the first
# response we get
return resp
else:
                # Recipient handler gets to decide when we're done by
                # returning a boolean
fin = await recipient(resp)
# If done, return the last result otherwise loop round for
# next response
if fin:
return resp
else:
continue
except Exception as e:
print("Exception:", e)
raise e
finally:
await self.unsubscribe(id)
# This deals with the request/response case. The caller needs to
# use another service in request/response mode. Uses two topics:
# - we send on the request topic as a producer
# - we receive on the response topic as a subscriber
class RequestResponseSpec(Spec):
def __init__(
self, request_name, request_schema, response_name,
response_schema, impl=RequestResponse
):
self.request_name = request_name
self.request_schema = request_schema
self.response_name = response_name
self.response_schema = response_schema
self.impl = impl
def add(self, flow, processor, definition):
producer_metrics = ProducerMetrics(
flow.id, f"{flow.name}-{self.response_name}"
)
rr = self.impl(
client = processor.client,
subscription = flow.id,
consumer_name = flow.id,
request_topic = definition[self.request_name],
request_schema = self.request_schema,
request_metrics = producer_metrics,
response_topic = definition[self.response_name],
response_schema = self.response_schema,
response_metrics = None,
)
flow.consumer[self.request_name] = rr
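
The recipient argument to request() above lets a caller consume a stream of responses and decide when the exchange is finished. A rough sketch using the agent schema, where rr is assumed to be an already-started RequestResponse wired to the agent topics and the import path is an assumption:

from trustgraph.schema import AgentRequest   # assumed import path

async def ask_agent(rr, question):

    thoughts = []

    async def recipient(resp):
        # Called for every response; return True when the exchange is done
        if resp.thought:
            thoughts.append(resp.thought)
        return bool(resp.answer)

    final = await rr.request(
        AgentRequest(question = question),
        timeout = 300,
        recipient = recipient,
    )
    return thoughts, final.answer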

View file

@ -1,19 +0,0 @@
from . spec import Spec
class Setting:
def __init__(self, value):
self.value = value
    async def start(self):
        pass
    async def stop(self):
        pass
class SettingSpec(Spec):
def __init__(self, name):
self.name = name
def add(self, flow, processor, definition):
        flow.setting[self.name] = Setting(definition[self.name])

View file

@ -1,4 +0,0 @@
class Spec:
pass

View file

@ -1,14 +1,14 @@
from pulsar.schema import JsonSchema
import asyncio
import _pulsar
import queue
import pulsar
import threading
import time
class Subscriber:
def __init__(self, client, topic, subscription, consumer_name,
def __init__(self, pulsar_client, topic, subscription, consumer_name,
schema=None, max_size=100):
self.client = client
self.client = pulsar_client
self.topic = topic
self.subscription = subscription
self.consumer_name = consumer_name
@ -16,50 +16,35 @@ class Subscriber:
self.q = {}
self.full = {}
self.max_size = max_size
self.lock = asyncio.Lock()
self.lock = threading.Lock()
self.running = True
async def __del__(self):
def start(self):
self.task = threading.Thread(target=self.run)
self.task.start()
def stop(self):
self.running = False
async def start(self):
self.task = asyncio.create_task(self.run())
def join(self):
self.task.join()
async def stop(self):
self.running = False
async def join(self):
await self.stop()
await self.task
async def run(self):
def run(self):
while self.running:
try:
consumer = self.client.subscribe(
topic = self.topic,
subscription_name = self.subscription,
consumer_name = self.consumer_name,
schema = JsonSchema(self.schema),
topic=self.topic,
subscription_name=self.subscription,
consumer_name=self.consumer_name,
schema=self.schema,
)
print("Subscriber running...", flush=True)
while self.running:
try:
msg = await asyncio.to_thread(
consumer.receive,
timeout_millis=2000
)
except _pulsar.Timeout:
continue
except Exception as e:
print("Exception:", e, flush=True)
print(type(e))
raise e
msg = consumer.receive()
# Acknowledge successful reception of the message
consumer.acknowledge(msg)
@ -71,68 +56,57 @@ class Subscriber:
value = msg.value()
async with self.lock:
# FIXME: Hard-coded timeouts
with self.lock:
if id in self.q:
try:
# FIXME: Timeout means data goes missing
await asyncio.wait_for(
self.q[id].put(value),
timeout=2
)
except Exception as e:
print("Q Put:", e, flush=True)
self.q[id].put(value, timeout=0.5)
except:
pass
for q in self.full.values():
try:
# FIXME: Timeout means data goes missing
await asyncio.wait_for(
q.put(value),
timeout=2
)
except Exception as e:
print("Q Put:", e, flush=True)
q.put(value, timeout=0.5)
except:
pass
except Exception as e:
print("Subscriber exception:", e, flush=True)
consumer.close()
print("Exception:", e, flush=True)
            # If the handler drops out, sleep then retry
time.sleep(2)
async def subscribe(self, id):
def subscribe(self, id):
async with self.lock:
with self.lock:
q = asyncio.Queue(maxsize=self.max_size)
q = queue.Queue(maxsize=self.max_size)
self.q[id] = q
return q
async def unsubscribe(self, id):
def unsubscribe(self, id):
async with self.lock:
with self.lock:
if id in self.q:
# self.q[id].shutdown(immediate=True)
del self.q[id]
async def subscribe_all(self, id):
def subscribe_all(self, id):
async with self.lock:
with self.lock:
q = asyncio.Queue(maxsize=self.max_size)
q = queue.Queue(maxsize=self.max_size)
self.full[id] = q
return q
async def unsubscribe_all(self, id):
def unsubscribe_all(self, id):
async with self.lock:
with self.lock:
if id in self.full:
# self.full[id].shutdown(immediate=True)

View file

@ -1,30 +0,0 @@
from . metrics import ConsumerMetrics
from . subscriber import Subscriber
from . spec import Spec
class SubscriberSpec(Spec):
def __init__(self, name, schema):
self.name = name
self.schema = schema
def add(self, flow, processor, definition):
# FIXME: Metrics not used
subscriber_metrics = ConsumerMetrics(
flow.id, f"{flow.name}-{self.name}"
)
subscriber = Subscriber(
client = processor.client,
topic = definition[self.name],
subscription = flow.id,
consumer_name = flow.id,
schema = self.schema,
)
# Put it in the consumer map, does that work?
# It means it gets start/stop call.
flow.consumer[self.name] = subscriber

View file

@ -1,30 +0,0 @@
from . request_response_spec import RequestResponse, RequestResponseSpec
from .. schema import TextCompletionRequest, TextCompletionResponse
class TextCompletionClient(RequestResponse):
async def text_completion(self, system, prompt, timeout=600):
resp = await self.request(
TextCompletionRequest(
system = system, prompt = prompt
),
timeout=timeout
)
if resp.error:
raise RuntimeError(resp.error.message)
return resp.response
class TextCompletionClientSpec(RequestResponseSpec):
def __init__(
self, request_name, response_name,
):
super(TextCompletionClientSpec, self).__init__(
request_name = request_name,
request_schema = TextCompletionRequest,
response_name = response_name,
response_schema = TextCompletionResponse,
impl = TextCompletionClient,
)

View file

@ -1,61 +0,0 @@
from . request_response_spec import RequestResponse, RequestResponseSpec
from .. schema import TriplesQueryRequest, TriplesQueryResponse, Value
from .. knowledge import Uri, Literal
class Triple:
def __init__(self, s, p, o):
self.s = s
self.p = p
self.o = o
def to_value(x):
if x.is_uri: return Uri(x.value)
return Literal(x.value)
def from_value(x):
if x is None: return None
if isinstance(x, Uri):
return Value(value=str(x), is_uri=True)
else:
return Value(value=str(x), is_uri=False)
class TriplesClient(RequestResponse):
async def query(self, s=None, p=None, o=None, limit=20,
user="trustgraph", collection="default",
timeout=30):
resp = await self.request(
TriplesQueryRequest(
s = from_value(s),
p = from_value(p),
o = from_value(o),
limit = limit,
user = user,
collection = collection,
),
timeout=timeout
)
if resp.error:
raise RuntimeError(resp.error.message)
triples = [
Triple(to_value(v.s), to_value(v.p), to_value(v.o))
for v in resp.triples
]
return triples
class TriplesClientSpec(RequestResponseSpec):
def __init__(
self, request_name, response_name,
):
super(TriplesClientSpec, self).__init__(
request_name = request_name,
request_schema = TriplesQueryRequest,
response_name = response_name,
response_schema = TriplesQueryResponse,
impl = TriplesClient,
)
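
A rough usage sketch of the query helper above, assuming triples_client is the client object a flow lookup returned (as wired by TriplesClientSpec) and that trustgraph.knowledge is the right import path; the node URI is illustrative only:

from trustgraph.knowledge import Uri   # assumed import path

async def outgoing_edges(triples_client, node):
    # All (predicate, object) pairs for a subject node, as Uri/Literal values
    triples = await triples_client.query(
        s = Uri(node), p = None, o = None, limit = 50,
    )
    return [(t.p, t.o) for t in triples]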

View file

@ -1,82 +0,0 @@
"""
Triples query service. Input is a (s, p, o) triple, some values may be
null. Output is a list of triples.
"""
from .. schema import TriplesQueryRequest, TriplesQueryResponse, Error
from .. schema import Value, Triple
from . flow_processor import FlowProcessor
from . consumer_spec import ConsumerSpec
from . producer_spec import ProducerSpec
default_ident = "triples-query"
class TriplesQueryService(FlowProcessor):
def __init__(self, **params):
id = params.get("id")
super(TriplesQueryService, self).__init__(**params | { "id": id })
self.register_specification(
ConsumerSpec(
name = "request",
schema = TriplesQueryRequest,
handler = self.on_message
)
)
self.register_specification(
ProducerSpec(
name = "response",
schema = TriplesQueryResponse,
)
)
async def on_message(self, msg, consumer, flow):
try:
request = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling input {id}...", flush=True)
triples = await self.query_triples(request)
print("Send response...", flush=True)
r = TriplesQueryResponse(triples=triples, error=None)
await flow("response").send(r, properties={"id": id})
print("Done.", flush=True)
except Exception as e:
print(f"Exception: {e}")
print("Send error response...", flush=True)
r = TriplesQueryResponse(
error = Error(
type = "triples-query-error",
message = str(e),
),
triples = None,
)
await flow("response").send(r, properties={"id": id})
@staticmethod
def add_args(parser):
FlowProcessor.add_args(parser)
def run():
    TriplesQueryService.launch(default_ident, __doc__)

View file

@ -1,47 +0,0 @@
"""
Triples store base class
"""
from .. schema import Triples
from .. base import FlowProcessor, ConsumerSpec
from .. exceptions import TooManyRequests
default_ident = "triples-write"
class TriplesStoreService(FlowProcessor):
def __init__(self, **params):
id = params.get("id")
super(TriplesStoreService, self).__init__(**params | { "id": id })
self.register_specification(
ConsumerSpec(
name = "input",
schema = Triples,
handler = self.on_message
)
)
async def on_message(self, msg, consumer, flow):
try:
request = msg.value()
await self.store_triples(request)
except TooManyRequests as e:
raise e
except Exception as e:
print(f"Exception: {e}")
raise e
@staticmethod
def add_args(parser):
FlowProcessor.add_args(parser)

View file

@ -26,5 +26,12 @@ class AgentResponse(Record):
thought = String()
observation = String()
agent_request_queue = topic(
'agent', kind='non-persistent', namespace='request'
)
agent_response_queue = topic(
'agent', kind='non-persistent', namespace='response'
)
############################################################################

View file

@ -2,7 +2,7 @@
from pulsar.schema import Record, Bytes, String, Boolean, Array, Map, Integer
from . topic import topic
from . types import Error
from . types import Error, RowSchema
############################################################################

View file

@ -11,6 +11,8 @@ class Document(Record):
metadata = Metadata()
data = Bytes()
document_ingest_queue = topic('document-load')
############################################################################
# Text documents / text from PDF
@ -19,6 +21,8 @@ class TextDocument(Record):
metadata = Metadata()
text = Bytes()
text_ingest_queue = topic('text-document-load')
############################################################################
# Chunks of text
@ -27,6 +31,8 @@ class Chunk(Record):
metadata = Metadata()
chunk = Bytes()
chunk_ingest_queue = topic('chunk-load')
############################################################################
# Document embeddings are embeddings associated with a chunk
@ -40,6 +46,8 @@ class DocumentEmbeddings(Record):
metadata = Metadata()
chunks = Array(ChunkEmbeddings())
document_embeddings_store_queue = topic('document-embeddings-store')
############################################################################
# Doc embeddings query
@ -54,3 +62,10 @@ class DocumentEmbeddingsResponse(Record):
error = Error()
documents = Array(Bytes())
document_embeddings_request_queue = topic(
'doc-embeddings', kind='non-persistent', namespace='request'
)
document_embeddings_response_queue = topic(
'doc-embeddings', kind='non-persistent', namespace='response',
)

View file

@ -1,66 +0,0 @@
from pulsar.schema import Record, Bytes, String, Boolean, Array, Map, Integer
from . topic import topic
from . types import Error
############################################################################
# Flow service:
# list_classes() -> (classname[])
# get_class(classname) -> (class)
# put_class(class) -> (class)
# delete_class(classname) -> ()
#
# list_flows() -> (flowid[])
# get_flow(flowid) -> (flow)
# start_flow(flowid, classname) -> ()
# stop_flow(flowid) -> ()
# Flow service requests and responses
class FlowRequest(Record):
operation = String() # list_classes, get_class, put_class, delete_class
# list_flows, get_flow, start_flow, stop_flow
# get_class, put_class, delete_class, start_flow
class_name = String()
    # put_class ("class" is a Python reserved word, so the definition
    # is carried in class_definition)
    class_definition = String()
# start_flow
description = String()
# get_flow, start_flow, stop_flow
flow_id = String()
class FlowResponse(Record):
# list_classes
class_names = Array(String())
# list_flows
flow_ids = Array(String())
    # get_class ("class" is a Python reserved word, so the definition
    # is carried in class_definition)
    class_definition = String()
# get_flow
flow = String()
# get_flow
description = String()
# Everything
error = Error()
flow_request_queue = topic(
'flow', kind='non-persistent', namespace='request'
)
flow_response_queue = topic(
'flow', kind='non-persistent', namespace='response'
)
############################################################################
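
A rough sketch of building a start_flow request against this schema; the operation string follows the comment above (the running config service may spell it differently), and the flow id, class name and description are invented:

req = FlowRequest(
    operation = "start_flow",
    flow_id = "my-flow",
    class_name = "document-rag",
    description = "Example flow started from a sketch",
)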