Feature/flow librarian (#361)

* Update librarian to new API

* Implementing new schema with document + processing objects
This commit is contained in:
cybermaggedon 2025-05-04 22:26:19 +01:00 committed by GitHub
parent 6bf485788a
commit ff28d26f4d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 1323 additions and 428 deletions

View file

@ -5,41 +5,27 @@ Librarian service, manages documents in collections
from functools import partial
import asyncio
import threading
import queue
import base64
import json
from pulsar.schema import JsonSchema
from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber
from .. base import ConsumerMetrics, ProducerMetrics
from .. schema import LibrarianRequest, LibrarianResponse, Error
from .. schema import librarian_request_queue, librarian_response_queue
from .. schema import GraphEmbeddings
from .. schema import graph_embeddings_store_queue
from .. schema import Triples
from .. schema import triples_store_queue
from .. schema import DocumentEmbeddings
from .. schema import document_embeddings_store_queue
from .. schema import Document, Metadata
from .. schema import document_ingest_queue
from .. schema import TextDocument, Metadata
from .. schema import text_ingest_queue
from .. base import Publisher
from .. base import Subscriber
from .. log_level import LogLevel
from .. base import ConsumerProducer
from .. exceptions import RequestError
from . librarian import Librarian
module = "librarian"
default_ident = "librarian"
default_librarian_request_queue = librarian_request_queue
default_librarian_response_queue = librarian_response_queue
default_input_queue = librarian_request_queue
default_output_queue = librarian_response_queue
default_subscriber = module
default_minio_host = "minio:9000"
default_minio_access_key = "minioadmin"
default_minio_secret_key = "minioadmin"
@ -50,15 +36,21 @@ bucket_name = "library"
# FIXME: How to ensure this doesn't conflict with other usage?
keyspace = "librarian"
class Processor(ConsumerProducer):
class Processor(AsyncProcessor):
def __init__(self, **params):
self.running = True
id = params.get("id")
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
# self.running = True
librarian_request_queue = params.get(
"librarian_request_queue", default_librarian_request_queue
)
librarian_response_queue = params.get(
"librarian_response_queue", default_librarian_response_queue
)
minio_host = params.get("minio_host", default_minio_host)
minio_access_key = params.get(
@ -74,19 +66,10 @@ class Processor(ConsumerProducer):
cassandra_user = params.get("cassandra_user")
cassandra_password = params.get("cassandra_password")
triples_queue = params.get("triples_queue")
graph_embeddings_queue = params.get("graph_embeddings_queue")
document_embeddings_queue = params.get("document_embeddings_queue")
document_load_queue = params.get("document_load_queue")
text_load_queue = params.get("text_load_queue")
super(Processor, self).__init__(
**params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": LibrarianRequest,
"output_schema": LibrarianResponse,
"librarian_request_queue": librarian_request_queue,
"librarian_response_queue": librarian_response_queue,
"minio_host": minio_host,
"minio_access_key": minio_access_key,
"cassandra_host": cassandra_host,
@ -94,38 +77,30 @@ class Processor(ConsumerProducer):
}
)
self.document_load = Publisher(
self.client, document_load_queue, JsonSchema(Document),
librarian_request_metrics = ConsumerMetrics(
processor = self.id, flow = None, name = "librarian-request"
)
self.text_load = Publisher(
self.client, text_load_queue, JsonSchema(TextDocument),
librarian_response_metrics = ProducerMetrics(
processor = self.id, flow = None, name = "librarian-response"
)
self.triples_brk = Subscriber(
self.client, triples_store_queue,
"librarian", "librarian",
schema=JsonSchema(Triples),
)
self.graph_embeddings_brk = Subscriber(
self.client, graph_embeddings_store_queue,
"librarian", "librarian",
schema=JsonSchema(GraphEmbeddings),
)
self.document_embeddings_brk = Subscriber(
self.client, document_embeddings_store_queue,
"librarian", "librarian",
schema=JsonSchema(DocumentEmbeddings),
self.librarian_request_consumer = Consumer(
taskgroup = self.taskgroup,
client = self.pulsar_client,
flow = None,
topic = librarian_request_queue,
subscriber = id,
schema = LibrarianRequest,
handler = self.on_librarian_request,
metrics = librarian_request_metrics,
)
self.triples_reader = threading.Thread(
target=self.receive_triples
)
self.graph_embeddings_reader = threading.Thread(
target=self.receive_graph_embeddings
)
self.document_embeddings_reader = threading.Thread(
target=self.receive_document_embeddings
self.librarian_response_producer = Producer(
client = self.pulsar_client,
topic = librarian_response_queue,
schema = LibrarianResponse,
metrics = librarian_response_metrics,
)
self.librarian = Librarian(
@ -141,87 +116,34 @@ class Processor(ConsumerProducer):
load_text = self.load_text,
)
self.register_config_handler(self.on_librarian_config)
self.flows = {}
print("Initialised.", flush=True)
async def start(self):
self.document_load.start()
self.text_load.start()
self.triples_brk.start()
self.graph_embeddings_brk.start()
self.document_embeddings_brk.start()
await super(Processor, self).start()
await self.librarian_request_consumer.start()
await self.librarian_response_producer.start()
self.triples_sub = self.triples_brk.subscribe_all("x")
self.graph_embeddings_sub = self.graph_embeddings_brk.subscribe_all("x")
self.document_embeddings_sub = self.document_embeddings_brk.subscribe_all("x")
async def on_librarian_config(self, config, version):
self.triples_reader.start()
self.graph_embeddings_reader.start()
self.document_embeddings_reader.start()
print("config version", version)
if "flows" in config:
self.flows = {
k: json.loads(v)
for k, v in config["flows"].items()
}
print(self.flows)
def __del__(self):
self.running = False
if hasattr(self, "document_load"):
self.document_load.stop()
self.document_load.join()
if hasattr(self, "text_load"):
self.text_load.stop()
self.text_load.join()
if hasattr(self, "triples_sub"):
self.triples_sub.unsubscribe_all("x")
if hasattr(self, "graph_embeddings_sub"):
self.graph_embeddings_sub.unsubscribe_all("x")
if hasattr(self, "document_embeddings_sub"):
self.document_embeddings_sub.unsubscribe_all("x")
if hasattr(self, "triples_brk"):
self.triples_brk.stop()
self.triples_brk.join()
if hasattr(self, "graph_embeddings_brk"):
self.graph_embeddings_brk.stop()
self.graph_embeddings_brk.join()
if hasattr(self, "document_embeddings_brk"):
self.document_embeddings_brk.stop()
self.document_embeddings_brk.join()
def receive_triples(self):
while self.running:
try:
msg = self.triples_sub.get(timeout=1)
except queue.Empty:
continue
self.librarian.handle_triples(msg)
def receive_graph_embeddings(self):
while self.running:
try:
msg = self.graph_embeddings_sub.get(timeout=1)
except queue.Empty:
continue
self.librarian.handle_graph_embeddings(msg)
def receive_document_embeddings(self):
while self.running:
try:
msg = self.document_embeddings_sub.get(timeout=1)
except queue.Empty:
continue
self.librarian.handle_document_embeddings(msg)
pass
async def load_document(self, document):
@ -235,6 +157,8 @@ class Processor(ConsumerProducer):
data = document.document
)
self.document_load.send(None, doc)
async def load_text(self, document):
@ -254,41 +178,31 @@ class Processor(ConsumerProducer):
self.text_load.send(None, doc)
def parse_request(self, v):
async def process_request(self, v):
if v.operation is None:
raise RequestError("Null operation")
print("op", v.operation)
print("requets", v.operation)
if v.operation == "add":
if (
v.document and v.document.id and v.document.metadata and
v.document.document and v.document.kind
):
return partial(
self.librarian.add,
document = v.document,
)
else:
raise RequestError("Invalid call")
impls = {
"add-document": self.librarian.add_document,
"remove-document": self.librarian.remove_document,
"update-document": self.librarian.update_document,
"get-document-metadata": self.librarian.get_document_metadata,
"get-document-content": self.librarian.get_document_content,
"add-processing": self.librarian.add_processing,
"remove-processing": self.librarian.remove_processing,
"list-documents": self.librarian.list_documents,
"list-processing": self.librarian.list_processing,
}
if v.operation == "list":
print("list", v)
print(v.user)
if v.user:
return partial(
self.librarian.list,
user = v.user,
collection = v.collection,
)
else:
print("BROK")
raise RequestError("Invalid call")
if v.operation not in impls:
raise RequestError(f"Invalid operation: {v.operation}")
raise RequestError("Invalid operation: " + v.operation)
return await impls[v.operation](v)
async def handle(self, msg):
async def on_librarian_request(self, msg, consumer, flow):
v = msg.value()
@ -299,20 +213,15 @@ class Processor(ConsumerProducer):
print(f"Handling input {id}...", flush=True)
try:
func = self.parse_request(v)
except RequestError as e:
resp = LibrarianResponse(
error = Error(
type = "request-error",
message = str(e),
)
resp = await self.process_request(v)
await self.librarian_response_producer.send(
resp, properties={"id": id}
)
await self.send(resp, properties={"id": id})
return
try:
resp = await func()
print("->", resp)
except RequestError as e:
resp = LibrarianResponse(
error = Error(
@ -320,31 +229,43 @@ class Processor(ConsumerProducer):
message = str(e),
)
)
await self.send(resp, properties={"id": id})
await self.librarian_response_producer.send(
resp, properties={"id": id}
)
return
except Exception as e:
print("Exception:", e, flush=True)
resp = LibrarianResponse(
error = Error(
type = "processing-error",
message = "Unhandled error: " + str(e),
type = "unexpected-error",
message = str(e),
)
)
await self.send(resp, properties={"id": id})
await self.librarian_response_producer.send(
resp, properties={"id": id}
)
return
print("Send response..!.", flush=True)
await self.send(resp, properties={"id": id})
print("Done.", flush=True)
@staticmethod
def add_args(parser):
ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
AsyncProcessor.add_args(parser)
parser.add_argument(
'--librarian-request-queue',
default=default_librarian_request_queue,
help=f'Config request queue (default: {default_librarian_request_queue})'
)
parser.add_argument(
'--librarian-response-queue',
default=default_librarian_response_queue,
help=f'Config response queue {default_librarian_response_queue}',
)
parser.add_argument(
@ -385,40 +306,7 @@ class Processor(ConsumerProducer):
help=f'Cassandra password'
)
parser.add_argument(
'--triples-queue',
default=triples_store_queue,
help=f'Triples queue (default: {triples_store_queue})'
)
parser.add_argument(
'--graph-embeddings-queue',
default=graph_embeddings_store_queue,
help=f'Graph embeddings queue (default: {triples_store_queue})'
)
parser.add_argument(
'--document-embeddings-queue',
default=document_embeddings_store_queue,
help='Document embeddings queue '
f'(default: {document_embeddings_store_queue})'
)
parser.add_argument(
'--document-load-queue',
default=document_ingest_queue,
help='Document load queue '
f'(default: {document_ingest_queue})'
)
parser.add_argument(
'--text-load-queue',
default=text_ingest_queue,
help='Text ingest queue '
f'(default: {text_ingest_queue})'
)
def run():
Processor.launch(module, __doc__)
Processor.launch(default_ident, __doc__)