trustgraph/trustgraph-flow/trustgraph/cores/service.py
cybermaggedon 31b7ade44d
Feature/knowledge load (#372)
* Switch off retry in Cassandra until we can differentiate retryable errors

* Fix config getvalues

* Loading knowledge cores works
2025-05-08 00:41:45 +01:00

230 lines
6.4 KiB
Python
Executable file

"""
Knowledge core service, manages cores and exports them
"""
from functools import partial
import asyncio
import base64
import json
from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber
from .. base import ConsumerMetrics, ProducerMetrics
from .. schema import KnowledgeRequest, KnowledgeResponse, Error
from .. schema import knowledge_request_queue, knowledge_response_queue
from .. schema import Document, Metadata
from .. schema import TextDocument, Metadata
from .. exceptions import RequestError
from . knowledge import KnowledgeManager
# Identity under which this processor is launched by run().
default_ident = "knowledge"

# Default request/response queues, taken straight from the schema package.
default_knowledge_request_queue, default_knowledge_response_queue = (
    knowledge_request_queue,
    knowledge_response_queue,
)

# Default Cassandra host; Processor accepts a comma-separated list of hosts.
default_cassandra_host = "cassandra"

# Cassandra keyspace handed to the KnowledgeManager.
# FIXME: How to ensure this doesn't conflict with other usage?
keyspace = "knowledge"
class Processor(AsyncProcessor):
    """
    Knowledge core service processor.

    Consumes KnowledgeRequest messages from the knowledge request queue,
    dispatches each one by its ``operation`` field to a KnowledgeManager
    (configured with Cassandra connection settings), and publishes
    KnowledgeResponse messages on the knowledge response queue. Responses
    carry the request's ``id`` message property so callers can correlate
    them.
    """

    def __init__(self, **params):

        # Processor identity; also used as the request-consumer subscriber.
        ident = params.get("id")

        knowledge_request_queue = params.get(
            "knowledge_request_queue", default_knowledge_request_queue
        )
        knowledge_response_queue = params.get(
            "knowledge_response_queue", default_knowledge_response_queue
        )

        cassandra_host = params.get("cassandra_host", default_cassandra_host)
        cassandra_user = params.get("cassandra_user")
        cassandra_password = params.get("cassandra_password")

        # NOTE(review): cassandra_password is not forwarded to the base class
        # alongside the other settings -- presumably to keep it out of
        # anything the base class records; confirm before changing.
        super(Processor, self).__init__(
            **params | {
                "knowledge_request_queue": knowledge_request_queue,
                "knowledge_response_queue": knowledge_response_queue,
                "cassandra_host": cassandra_host,
                "cassandra_user": cassandra_user,
            }
        )

        # Initialise flow state before anything that may consult it: the
        # KnowledgeManager below receives this object as flow_config, and
        # the config handler replaces this mapping when config arrives.
        self.flows = {}

        knowledge_request_metrics = ConsumerMetrics(
            processor = self.id, flow = None, name = "knowledge-request"
        )

        knowledge_response_metrics = ProducerMetrics(
            processor = self.id, flow = None, name = "knowledge-response"
        )

        self.knowledge_request_consumer = Consumer(
            taskgroup = self.taskgroup,
            client = self.pulsar_client,
            flow = None,
            topic = knowledge_request_queue,
            subscriber = ident,
            schema = KnowledgeRequest,
            handler = self.on_knowledge_request,
            metrics = knowledge_request_metrics,
        )

        self.knowledge_response_producer = Producer(
            client = self.pulsar_client,
            topic = knowledge_response_queue,
            schema = KnowledgeResponse,
            metrics = knowledge_response_metrics,
        )

        # Accept "host1, host2" as well as "host1,host2": strip whitespace
        # around each entry and ignore empty entries.
        cassandra_hosts = [
            host.strip()
            for host in cassandra_host.split(",")
            if host.strip()
        ]

        self.knowledge = KnowledgeManager(
            cassandra_host = cassandra_hosts,
            cassandra_user = cassandra_user,
            cassandra_password = cassandra_password,
            keyspace = keyspace,
            flow_config = self,
        )

        self.register_config_handler(self.on_knowledge_config)

        print("Initialised.", flush=True)

    async def start(self):
        """Start the base processor, then the request consumer and response producer."""

        await super(Processor, self).start()
        await self.knowledge_request_consumer.start()
        await self.knowledge_response_producer.start()

    async def on_knowledge_config(self, config, version):
        """
        Config handler: refresh ``self.flows`` from pushed configuration.

        When ``config`` has a "flows" section, ``self.flows`` is replaced by
        that section with every value JSON-decoded. Otherwise the previous
        ``self.flows`` is kept unchanged.
        """

        print("config version", version)

        if "flows" in config:
            self.flows = {
                k: json.loads(v)
                for k, v in config["flows"].items()
            }

        print(self.flows)

    async def process_request(self, v, id):
        """
        Dispatch a KnowledgeRequest to the matching KnowledgeManager method.

        Args:
            v: the KnowledgeRequest; ``v.operation`` selects the handler.
            id: sender-produced request id, attached to every response.

        The selected implementation is called as ``impl(v, respond)``, where
        ``respond`` is a coroutine that publishes one KnowledgeResponse tagged
        with ``id``; the implementation sends as many responses as it needs.

        Raises:
            RequestError: if the operation is missing or not recognised.
        """

        if v.operation is None:
            raise RequestError("Null operation")

        print("request", v.operation)

        impls = {
            "list-kg-cores": self.knowledge.list_kg_cores,
            "get-kg-core": self.knowledge.get_kg_core,
            "delete-kg-core": self.knowledge.delete_kg_core,
            "put-kg-core": self.knowledge.put_kg_core,
            "load-kg-core": self.knowledge.load_kg_core,
            "unload-kg-core": self.knowledge.unload_kg_core,
        }

        if v.operation not in impls:
            raise RequestError(f"Invalid operation: {v.operation}")

        async def respond(x):
            # Same keyword form as the error path in on_knowledge_request.
            await self.knowledge_response_producer.send(
                x, properties = {"id": id}
            )

        return await impls[v.operation](v, respond)

    async def _send_error(self, request_id, error_type, exc):
        """Publish a KnowledgeResponse carrying only an Error of ``error_type``."""

        resp = KnowledgeResponse(
            error = Error(
                type = error_type,
                message = str(exc),
            )
        )

        await self.knowledge_response_producer.send(
            resp, properties = {"id": request_id}
        )

    async def on_knowledge_request(self, msg, consumer, flow):
        """
        Consumer handler for a single knowledge request message.

        On success no response is sent here; the operation implementation
        publishes its own responses. A RequestError is reported to the sender
        as a "request-error". Any other exception is logged and reported as an
        "unexpected-error". In every case the error response carries the
        request's ``id`` property.
        """

        v = msg.value()

        # Sender-produced ID, echoed back on every response.
        request_id = msg.properties()["id"]

        print(f"Handling input {request_id}...", flush=True)

        try:
            await self.process_request(v, request_id)
        except RequestError as e:
            await self._send_error(request_id, "request-error", e)
        except Exception as e:
            # Previously swallowed silently; log it so failures are visible.
            print(f"Exception handling {request_id}: {e}", flush=True)
            await self._send_error(request_id, "unexpected-error", e)

        # Previously unreachable: every branch returned before this line.
        print("Done.", flush=True)

    @staticmethod
    def add_args(parser):
        """Register the command-line options for this processor on ``parser``."""

        AsyncProcessor.add_args(parser)

        parser.add_argument(
            '--knowledge-request-queue',
            default=default_knowledge_request_queue,
            help=f'Knowledge request queue (default: {default_knowledge_request_queue})'
        )

        parser.add_argument(
            '--knowledge-response-queue',
            default=default_knowledge_response_queue,
            help=f'Knowledge response queue (default: {default_knowledge_response_queue})',
        )

        parser.add_argument(
            '--cassandra-host',
            default=default_cassandra_host,
            help=f'Cassandra host(s), comma-separated (default: {default_cassandra_host})'
        )

        parser.add_argument(
            '--cassandra-user',
            default=None,
            help='Cassandra user'
        )

        parser.add_argument(
            '--cassandra-password',
            default=None,
            help='Cassandra password'
        )
def run():
    """
    Command-line entry point for the knowledge core service.

    Hands the default processor ident and the module docstring to
    Processor.launch.
    """
    ident, description = default_ident, __doc__
    Processor.launch(ident, description)