trustgraph/trustgraph-flow/trustgraph/cores/service.py
cybermaggedon fd8d5b2c42
Recent fixes -> release/v2.4 (#891)
* Fix publisher resource leak in librarian submit_document (#883)

Wrap pub.start()/pub.send() in try/finally to guarantee pub.stop() is
called on error. Remove unnecessary asyncio.sleep(1) kludge.

* Make Cassandra replication factor configurable (issue #787) (#887)

Add CASSANDRA_REPLICATION_FACTOR environment variable and
--cassandra-replication-factor CLI argument to cassandra_config.py.

Update all four table store constructors (ConfigTableStore,
KnowledgeTableStore, LibraryTableStore, IamTableStore) to accept
an optional replication_factor parameter and use it in keyspace
creation CQL queries.

Thread the replication factor through all service constructors:
Configuration, KnowledgeManager, Librarian, IamService, and
knowledge store Processor.

* Update tests

---------

Co-authored-by: gittihub-jpg <rico@springer-mail.net>
2026-05-08 19:48:12 +01:00

272 lines
7.8 KiB
Python
Executable file

"""
Knowledge core service, manages cores and exports them
"""
from functools import partial
import asyncio
import base64
import json
import logging
from .. base import WorkspaceProcessor, Consumer, Producer, Publisher, Subscriber
from .. base import ConsumerMetrics, ProducerMetrics
from .. base.cassandra_config import add_cassandra_args, resolve_cassandra_config
from .. schema import KnowledgeRequest, KnowledgeResponse, Error
from .. schema import knowledge_request_queue, knowledge_response_queue
from .. schema import Document, Metadata
from .. schema import TextDocument, Metadata
from .. exceptions import RequestError
from . knowledge import KnowledgeManager
# Module logger
logger = logging.getLogger(__name__)
# Default service identity used when launching the processor.
default_ident = "knowledge"
# Base queue names; per-workspace queues are derived from these
# (see workspace_queue below).
default_knowledge_request_queue = knowledge_request_queue
default_knowledge_response_queue = knowledge_response_queue
# Default Cassandra contact hostname.  NOTE(review): appears unused in
# this module — resolve_cassandra_config supplies its own defaults;
# confirm before removing.
default_cassandra_host = "cassandra"
def workspace_queue(base_queue, workspace):
    """Return the per-workspace queue name for the given base queue."""
    return "{}:{}".format(base_queue, workspace)
class Processor(WorkspaceProcessor):
    """
    Knowledge core service processor.

    Manages per-workspace knowledge request/response queues and
    dispatches knowledge-core operations (list/get/put/delete/load/
    unload) to a Cassandra-backed KnowledgeManager.
    """

    def __init__(self, **params):
        """
        Initialise the processor.

        Recognised params (all optional):
            knowledge_request_queue, knowledge_response_queue:
                base queue names; per-workspace queues are derived
                from these via workspace_queue().
            cassandra_host, cassandra_username, cassandra_password:
                connection settings, merged with environment/default
                values by resolve_cassandra_config().
        All params are also forwarded to WorkspaceProcessor.
        """
        self.knowledge_request_queue_base = params.get(
            "knowledge_request_queue", default_knowledge_request_queue
        )
        self.knowledge_response_queue_base = params.get(
            "knowledge_response_queue", default_knowledge_response_queue
        )

        # Resolve Cassandra connection details from explicit params,
        # falling back to environment / built-in defaults.
        hosts, username, password, keyspace, replication_factor = (
            resolve_cassandra_config(
                host=params.get("cassandra_host"),
                username=params.get("cassandra_username"),
                password=params.get("cassandra_password"),
                default_keyspace="knowledge",
            )
        )
        self.cassandra_host = hosts
        self.cassandra_username = username
        self.cassandra_password = password

        # Pass the resolved values down so the base class sees the same
        # configuration we use locally.
        super(Processor, self).__init__(
            **params | {
                "knowledge_request_queue": self.knowledge_request_queue_base,
                "knowledge_response_queue": self.knowledge_response_queue_base,
                "cassandra_host": self.cassandra_host,
                "cassandra_username": self.cassandra_username,
                "cassandra_password": self.cassandra_password,
            }
        )

        self.knowledge = KnowledgeManager(
            cassandra_host = self.cassandra_host,
            cassandra_username = self.cassandra_username,
            cassandra_password = self.cassandra_password,
            keyspace = keyspace,
            flow_config = self,
            replication_factor = replication_factor,
        )

        self.register_config_handler(self.on_knowledge_config, types=["flow"])

        # Per-workspace flow configuration, keyed by workspace name.
        self.flows = {}
        # Per-workspace {"consumer": ..., "response": ...} client pairs,
        # keyed by workspace name.
        self.workspace_consumers = {}

        logger.info("Knowledge service initialized")

    async def on_workspace_created(self, workspace):
        """
        Subscribe to a newly created workspace: ensure its request and
        response topics exist, then start a consumer/producer pair.
        Idempotent — an already-subscribed workspace is ignored.
        """
        if workspace in self.workspace_consumers:
            return

        req_queue = workspace_queue(
            self.knowledge_request_queue_base, workspace,
        )
        resp_queue = workspace_queue(
            self.knowledge_response_queue_base, workspace,
        )

        await self.pubsub.ensure_topic(req_queue)
        await self.pubsub.ensure_topic(resp_queue)

        response_producer = Producer(
            backend=self.pubsub,
            topic=resp_queue,
            schema=KnowledgeResponse,
            metrics=ProducerMetrics(
                processor=self.id, flow=None,
                name=f"knowledge-response-{workspace}",
            ),
        )

        consumer = Consumer(
            taskgroup=self.taskgroup,
            backend=self.pubsub,
            flow=None,
            topic=req_queue,
            subscriber=self.id,
            schema=KnowledgeRequest,
            handler=partial(
                self.on_knowledge_request, workspace=workspace,
            ),
            metrics=ConsumerMetrics(
                processor=self.id, flow=None,
                name=f"knowledge-request-{workspace}",
            ),
        )

        await response_producer.start()
        try:
            await consumer.start()
        except Exception:
            # Don't leak a started producer if the consumer fails to
            # start.
            await response_producer.stop()
            raise

        self.workspace_consumers[workspace] = {
            "consumer": consumer,
            "response": response_producer,
        }

        logger.info(f"Subscribed to workspace queue: {workspace}")

    async def on_workspace_deleted(self, workspace):
        """
        Unsubscribe from a deleted workspace, stopping its consumer and
        response producer.  A no-op for unknown workspaces.
        """
        clients = self.workspace_consumers.pop(workspace, None)
        if clients:
            for client in clients.values():
                await client.stop()
            logger.info(f"Unsubscribed from workspace queue: {workspace}")

    async def on_knowledge_config(self, workspace, config, version):
        """
        Config handler for "flow" type configuration: parse and cache
        the flow definitions for the workspace.
        """
        logger.info(
            f"Configuration version: {version} workspace: {workspace}"
        )
        if "flow" in config:
            self.flows[workspace] = {
                k: json.loads(v)
                for k, v in config["flow"].items()
            }
        else:
            self.flows[workspace] = {}
        logger.debug(f"Flows for {workspace}: {self.flows[workspace]}")

    async def process_request(self, v, id, workspace, producer):
        """
        Dispatch a knowledge request to the matching KnowledgeManager
        operation.  The operation implementation is responsible for
        sending its own responses via the supplied producer.

        Raises RequestError for a null or unrecognised operation.
        """
        if v.operation is None:
            raise RequestError("Null operation")

        logger.debug(f"Knowledge request: {v.operation}")

        impls = {
            "list-kg-cores": self.knowledge.list_kg_cores,
            "get-kg-core": self.knowledge.get_kg_core,
            "delete-kg-core": self.knowledge.delete_kg_core,
            "put-kg-core": self.knowledge.put_kg_core,
            "load-kg-core": self.knowledge.load_kg_core,
            "unload-kg-core": self.knowledge.unload_kg_core,
        }

        if v.operation not in impls:
            raise RequestError(f"Invalid operation: {v.operation}")

        async def respond(x):
            # Echo the sender-produced request ID on every response.
            await producer.send(
                x, properties={ "id": id }
            )

        return await impls[v.operation](v, respond, workspace)

    async def _send_error(self, producer, id, type, message):
        """Send an error response of the given type back to the requester."""
        resp = KnowledgeResponse(
            error = Error(
                type = type,
                message = message,
            )
        )
        await producer.send(
            resp, properties={"id": id}
        )

    async def on_knowledge_request(self, msg, consumer, flow, *, workspace):
        """
        Consumer handler for workspace knowledge requests.  Successful
        operations send their own responses from within the
        implementation; errors are reported back as error responses.
        """
        v = msg.value()

        # Sender-produced ID, echoed back on all responses
        id = msg.properties()["id"]

        logger.info(f"Handling knowledge input {id}...")

        producer = self.workspace_consumers[workspace]["response"]

        try:
            await self.process_request(v, id, workspace, producer)
        except RequestError as e:
            await self._send_error(producer, id, "request-error", str(e))
        except Exception as e:
            await self._send_error(producer, id, "unexpected-error", str(e))
        else:
            logger.debug("Knowledge input processing complete")

    @staticmethod
    def add_args(parser):
        """Register CLI arguments for this processor."""
        WorkspaceProcessor.add_args(parser)

        parser.add_argument(
            '--knowledge-request-queue',
            default=default_knowledge_request_queue,
            help=f'Knowledge request queue (default: {default_knowledge_request_queue})'
        )

        parser.add_argument(
            '--knowledge-response-queue',
            default=default_knowledge_response_queue,
            help=f'Knowledge response queue (default: {default_knowledge_response_queue})',
        )

        add_cassandra_args(parser)
def run():
    """Entry point: launch the knowledge core service processor."""
    Processor.launch(default_ident, __doc__)