Per-workspace queue routing for workspace-scoped services

Workspace identity is now determined by queue infrastructure instead of
message body fields, closing a privilege-escalation vector where a caller
could spoof workspace in the request payload.

- Add WorkspaceProcessor base class: discovers workspaces from config at
  startup, creates one consumer per workspace on a queue named
  <base-queue>:<workspace>, and manages consumer lifecycle on workspace
  create/delete events
- Roll out to librarian, flow-svc, and knowledge cores
- Remove workspace field from request schemas (FlowRequest,
  LibrarianRequest, KnowledgeRequest, CollectionManagementRequest) and
  from DocumentMetadata / ProcessingMetadata — table stores now accept
  workspace as an explicit parameter for Cassandra partition keys
- Strip workspace encode/decode from all message translators and gateway
  serializers
- Config service gets a dual-queue regime: a system queue for
  cross-workspace ops (getvalues-all-ws, bootstrapper writes to
  __workspaces__) and per-workspace queues for tenant-scoped ops, with
  workspace discovery from its own Cassandra store
- Gateway enforces workspace requirement for workspace dispatchers —
  config moves from system_dispatchers to workspace_dispatchers so the
  gateway can never route to the system config queue
- Add workspace lifecycle hooks to AsyncProcessor so any processor can
  react to workspace create/delete without subclassing WorkspaceProcessor
This commit is contained in:
Cyber MacGeddon 2026-05-01 13:26:49 +01:00
parent 9be257ceee
commit 115e325071
43 changed files with 1322 additions and 607 deletions

View file

@ -2,13 +2,16 @@
import logging
from trustgraph.schema import ConfigResponse
from trustgraph.schema import ConfigValue, Error
from trustgraph.schema import ConfigValue, WorkspaceChanges, Error
from ... tables.config import ConfigTableStore
# Module logger
logger = logging.getLogger(__name__)
WORKSPACES_NAMESPACE = "__workspaces__"
WORKSPACE_TYPE = "workspace"
class Configuration:
def __init__(self, push, host, username, password, keyspace):
@ -26,9 +29,7 @@ class Configuration:
async def get_version(self):
return await self.table_store.get_version()
async def handle_get(self, v):
workspace = v.workspace
async def handle_get(self, v, workspace):
values = [
ConfigValue(
@ -46,18 +47,18 @@ class Configuration:
values = values,
)
async def handle_list(self, v):
async def handle_list(self, v, workspace):
return ConfigResponse(
version = await self.get_version(),
directory = await self.table_store.get_keys(
v.workspace, v.type
workspace, v.type
),
)
async def handle_getvalues(self, v):
async def handle_getvalues(self, v, workspace):
vals = await self.table_store.get_values(v.workspace, v.type)
vals = await self.table_store.get_values(workspace, v.type)
values = map(
lambda x: ConfigValue(
@ -93,9 +94,8 @@ class Configuration:
values = values,
)
async def handle_delete(self, v):
async def handle_delete(self, v, workspace):
workspace = v.workspace
types = list(set(k.type for k in v.keys))
for k in v.keys:
@ -103,14 +103,22 @@ class Configuration:
await self.inc_version()
await self.push(changes={t: [workspace] for t in types})
workspace_changes = None
if workspace == WORKSPACES_NAMESPACE and WORKSPACE_TYPE in types:
deleted = [k.key for k in v.keys if k.type == WORKSPACE_TYPE]
if deleted:
workspace_changes = WorkspaceChanges(deleted=deleted)
await self.push(
changes={t: [workspace] for t in types},
workspace_changes=workspace_changes,
)
return ConfigResponse(
)
async def handle_put(self, v):
async def handle_put(self, v, workspace):
workspace = v.workspace
types = list(set(k.type for k in v.values))
for k in v.values:
@ -120,7 +128,16 @@ class Configuration:
await self.inc_version()
await self.push(changes={t: [workspace] for t in types})
workspace_changes = None
if workspace == WORKSPACES_NAMESPACE and WORKSPACE_TYPE in types:
created = [k.key for k in v.values if k.type == WORKSPACE_TYPE]
if created:
workspace_changes = WorkspaceChanges(created=created)
await self.push(
changes={t: [workspace] for t in types},
workspace_changes=workspace_changes,
)
return ConfigResponse(
)
@ -138,62 +155,87 @@ class Configuration:
return config
async def handle_config(self, v):
async def handle_config(self, v, workspace):
config = await self.get_config(v.workspace)
config = await self.get_config(workspace)
return ConfigResponse(
version = await self.get_version(),
config = config,
)
async def handle(self, msg):
async def handle_workspace(self, msg, workspace):
"""Handle workspace-scoped config operations.
Workspace is provided by queue infrastructure."""
logger.debug(
f"Handling config message: {msg.operation} "
f"workspace={msg.workspace}"
f"Handling workspace config message: {msg.operation} "
f"workspace={workspace}"
)
# getvalues-all-ws spans all workspaces, so no workspace
# required; everything else is workspace-scoped.
if msg.operation != "getvalues-all-ws" and not msg.workspace:
return ConfigResponse(
error=Error(
type = "bad-request",
message = "Workspace is required"
)
)
if msg.operation == "get":
resp = await self.handle_get(msg)
resp = await self.handle_get(msg, workspace)
elif msg.operation == "list":
resp = await self.handle_list(msg)
resp = await self.handle_list(msg, workspace)
elif msg.operation == "getvalues":
resp = await self.handle_getvalues(msg)
elif msg.operation == "getvalues-all-ws":
resp = await self.handle_getvalues_all_ws(msg)
resp = await self.handle_getvalues(msg, workspace)
elif msg.operation == "delete":
resp = await self.handle_delete(msg)
resp = await self.handle_delete(msg, workspace)
elif msg.operation == "put":
resp = await self.handle_put(msg)
resp = await self.handle_put(msg, workspace)
elif msg.operation == "config":
resp = await self.handle_config(msg)
resp = await self.handle_config(msg, workspace)
else:
resp = ConfigResponse(
error=Error(
type = "bad-operation",
message = "Bad operation"
)
)
return resp
async def handle_system(self, msg):
"""Handle system-level config operations.
Workspace, when needed, comes from message body."""
logger.debug(
f"Handling system config message: {msg.operation} "
f"workspace={msg.workspace}"
)
if msg.operation == "getvalues-all-ws":
resp = await self.handle_getvalues_all_ws(msg)
elif msg.operation in ("get", "list", "getvalues", "delete",
"put", "config"):
if not msg.workspace:
return ConfigResponse(
error=Error(
type = "bad-request",
message = "Workspace is required"
)
)
handler = {
"get": self.handle_get,
"list": self.handle_list,
"getvalues": self.handle_getvalues,
"delete": self.handle_delete,
"put": self.handle_put,
"config": self.handle_config,
}[msg.operation]
resp = await handler(msg, msg.workspace)
else:
resp = ConfigResponse(
error=Error(
type = "bad-operation",

View file

@ -1,20 +1,30 @@
"""
Config service. Manages system global configuration state
Config service. Manages system global configuration state.
Operates a dual-queue regime:
- System queue (config-request): handles cross-workspace operations like
getvalues-all-ws and bootstrapper put/delete on __workspaces__.
The gateway NEVER routes to this queue.
- Per-workspace queues (config-request:<workspace>): handles
workspace-scoped operations where workspace identity comes from
queue infrastructure, not message body.
"""
import logging
from functools import partial
from trustgraph.schema import Error
from trustgraph.schema import ConfigRequest, ConfigResponse, ConfigPush
from trustgraph.schema import WorkspaceChanges
from trustgraph.schema import config_request_queue, config_response_queue
from trustgraph.schema import config_push_queue
from trustgraph.base import AsyncProcessor, Consumer, Producer
from trustgraph.base.cassandra_config import add_cassandra_args, resolve_cassandra_config
from . config import Configuration
from . config import Configuration, WORKSPACES_NAMESPACE, WORKSPACE_TYPE
from ... base import ProcessorMetrics, ConsumerMetrics, ProducerMetrics
from ... base import Consumer, Producer
@ -39,6 +49,11 @@ def is_reserved_workspace(workspace):
"""
return workspace.startswith("_")
def workspace_queue(base_queue, workspace):
    """Return the per-workspace queue name derived from a base queue.

    The name is the base queue and the workspace id joined by a colon,
    e.g. ``workspace_queue("config-request", "acme")`` yields
    ``"config-request:acme"``.
    """
    return "{}:{}".format(base_queue, workspace)
default_config_request_queue = config_request_queue
default_config_response_queue = config_response_queue
default_config_push_queue = config_push_queue
@ -48,7 +63,7 @@ default_cassandra_host = "cassandra"
class Processor(AsyncProcessor):
def __init__(self, **params):
config_request_queue = params.get(
"config_request_queue", default_config_request_queue
)
@ -70,7 +85,7 @@ class Processor(AsyncProcessor):
password=cassandra_password,
default_keyspace="config"
)
# Store resolved configuration
self.cassandra_host = hosts
self.cassandra_username = username
@ -99,17 +114,17 @@ class Processor(AsyncProcessor):
processor = self.id, flow = None, name = "config-push"
)
self.config_request_topic = config_request_queue
self.config_request_queue_base = config_request_queue
self.config_request_subscriber = id
self.config_request_consumer = Consumer(
self.system_consumer = Consumer(
taskgroup = self.taskgroup,
backend = self.pubsub,
flow = None,
topic = config_request_queue,
subscriber = id,
schema = ConfigRequest,
handler = self.on_config_request,
handler = self.on_system_config_request,
metrics = config_request_metrics,
)
@ -135,20 +150,84 @@ class Processor(AsyncProcessor):
push = self.push
)
self.workspace_consumers = {}
self.register_workspace_handler(self._handle_workspace_changes)
logger.info("Config service initialized")
async def _discover_workspaces(self):
    """Subscribe to the config queue of every workspace already stored.

    Reads the keys of type ``WORKSPACE_TYPE`` under
    ``WORKSPACES_NAMESPACE`` from the config table store and adds a
    consumer for each id not yet present in ``self.workspace_consumers``.
    """
    table_store = self.config.table_store
    stored_ids = await table_store.get_keys(
        WORKSPACES_NAMESPACE, WORKSPACE_TYPE
    )

    for ws_id in stored_ids:
        if ws_id in self.workspace_consumers:
            # Already subscribed for this workspace; leave it alone.
            continue
        await self._add_workspace_consumer(ws_id)
async def _handle_workspace_changes(self, workspace_changes):
for workspace_id in workspace_changes.created:
if workspace_id not in self.workspace_consumers:
logger.info(f"Workspace created: {workspace_id}")
await self._add_workspace_consumer(workspace_id)
for workspace_id in workspace_changes.deleted:
if workspace_id in self.workspace_consumers:
logger.info(f"Workspace deleted: {workspace_id}")
await self._remove_workspace_consumer(workspace_id)
async def _add_workspace_consumer(self, workspace_id):
    """Create, start and register the request consumer for one workspace.

    The consumer listens on the workspace's own config request queue and
    its handler is bound to ``workspace_id``, so the workspace a request
    is handled for is fixed by the queue it arrived on.
    """
    topic = workspace_queue(self.config_request_queue_base, workspace_id)

    # The topic is ensured before any consumer is built on it.
    await self.pubsub.ensure_topic(topic)

    bound_handler = partial(
        self.on_workspace_config_request,
        workspace=workspace_id,
    )
    request_metrics = ConsumerMetrics(
        processor=self.id,
        flow=None,
        name=f"config-request-{workspace_id}",
    )

    ws_consumer = Consumer(
        taskgroup=self.taskgroup,
        backend=self.pubsub,
        flow=None,
        topic=topic,
        subscriber=self.id,
        schema=ConfigRequest,
        handler=bound_handler,
        metrics=request_metrics,
    )
    await ws_consumer.start()

    # Registered only after start() returns, so a consumer whose start
    # raised is never tracked.
    self.workspace_consumers[workspace_id] = ws_consumer

    logger.info(
        f"Subscribed to workspace config queue: {workspace_id}"
    )
async def _remove_workspace_consumer(self, workspace_id):
consumer = self.workspace_consumers.pop(workspace_id, None)
if consumer:
await consumer.stop()
logger.info(
f"Unsubscribed from workspace config queue: {workspace_id}"
)
async def start(self):
await self.pubsub.ensure_topic(self.config_request_topic)
await self.pubsub.ensure_topic(self.config_request_queue_base)
await self.push() # Startup poke: empty types = everything
await self.config_request_consumer.start()
await self.system_consumer.start()
await self._discover_workspaces()
async def push(self, changes=None):
async def push(self, changes=None, workspace_changes=None):
# Suppress notifications from reserved workspaces (ids starting
# with "_", e.g. "__template__"). Stored config is preserved;
# only the broadcast is filtered. Keeps services oblivious to
# template / bootstrap state.
# with "_", e.g. "__template__") for regular config changes.
# The __workspaces__ namespace is handled separately via
# workspace_changes.
if changes:
filtered = {}
for type_name, workspaces in changes.items():
@ -165,16 +244,20 @@ class Processor(AsyncProcessor):
resp = ConfigPush(
version = version,
changes = changes or {},
workspace_changes = workspace_changes,
)
await self.config_push_producer.send(resp)
logger.info(
f"Pushed config poke version {version}, "
f"changes={resp.changes}"
f"changes={resp.changes}, "
f"workspace_changes={resp.workspace_changes}"
)
async def on_config_request(self, msg, consumer, flow):
async def on_workspace_config_request(
self, msg, consumer, flow, *, workspace
):
try:
@ -183,16 +266,49 @@ class Processor(AsyncProcessor):
# Sender-produced ID
id = msg.properties()["id"]
logger.debug(f"Handling config request {id}...")
logger.debug(
f"Handling workspace config request {id} "
f"workspace={workspace}..."
)
resp = await self.config.handle(v)
resp = await self.config.handle_workspace(v, workspace)
await self.config_response_producer.send(
resp, properties={"id": id}
)
except Exception as e:
resp = ConfigResponse(
error=Error(
type = "config-error",
message = str(e),
),
)
await self.config_response_producer.send(
resp, properties={"id": id}
)
async def on_system_config_request(self, msg, consumer, flow):
try:
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
logger.debug(f"Handling system config request {id}...")
resp = await self.config.handle_system(v)
await self.config_response_producer.send(
resp, properties={"id": id}
)
except Exception as e:
resp = ConfigResponse(
error=Error(
type = "config-error",
@ -228,4 +344,3 @@ class Processor(AsyncProcessor):
def run():
Processor.launch(default_ident, __doc__)

View file

@ -28,12 +28,12 @@ class KnowledgeManager:
self.background_task = None
self.flow_config = flow_config
async def delete_kg_core(self, request, respond):
async def delete_kg_core(self, request, respond, workspace):
logger.info("Deleting knowledge core...")
await self.table_store.delete_kg_core(
request.workspace, request.id
workspace, request.id
)
await respond(
@ -46,7 +46,7 @@ class KnowledgeManager:
)
)
async def get_kg_core(self, request, respond):
async def get_kg_core(self, request, respond, workspace):
logger.info("Getting knowledge core...")
@ -61,9 +61,8 @@ class KnowledgeManager:
)
)
# Remove doc table row
await self.table_store.get_triples(
request.workspace,
workspace,
request.id,
publish_triples,
)
@ -79,9 +78,8 @@ class KnowledgeManager:
)
)
# Remove doc table row
await self.table_store.get_graph_embeddings(
request.workspace,
workspace,
request.id,
publish_ge,
)
@ -98,9 +96,9 @@ class KnowledgeManager:
)
)
async def list_kg_cores(self, request, respond):
async def list_kg_cores(self, request, respond, workspace):
ids = await self.table_store.list_kg_cores(request.workspace)
ids = await self.table_store.list_kg_cores(workspace)
await respond(
KnowledgeResponse(
@ -112,9 +110,7 @@ class KnowledgeManager:
)
)
async def put_kg_core(self, request, respond):
workspace = request.workspace
async def put_kg_core(self, request, respond, workspace):
if request.triples:
await self.table_store.add_triples(workspace, request.triples)
@ -134,20 +130,18 @@ class KnowledgeManager:
)
)
async def load_kg_core(self, request, respond):
async def load_kg_core(self, request, respond, workspace):
if self.background_task is None:
self.background_task = asyncio.create_task(
self.core_loader()
)
# Wait for it to start (yuck)
# await asyncio.sleep(0.5)
await self.loader_queue.put((request, respond))
await self.loader_queue.put((request, respond, workspace))
# Not sending a response, the loader thread can do that
async def unload_kg_core(self, request, respond):
async def unload_kg_core(self, request, respond, workspace):
await respond(
KnowledgeResponse(
@ -168,7 +162,7 @@ class KnowledgeManager:
while True:
logger.debug("Waiting for next load...")
request, respond = await self.loader_queue.get()
request, respond, workspace = await self.loader_queue.get()
logger.info(f"Loading knowledge: {request.id}")
@ -180,7 +174,6 @@ class KnowledgeManager:
if request.flow is None:
raise RuntimeError("Flow ID must be specified")
workspace = request.workspace
ws_flows = self.flow_config.flows.get(workspace, {})
if request.flow not in ws_flows:
raise RuntimeError(
@ -262,9 +255,8 @@ class KnowledgeManager:
logger.debug("Publishing triples...")
# Remove doc table row
await self.table_store.get_triples(
request.workspace,
workspace,
request.id,
publish_triples,
)
@ -277,9 +269,8 @@ class KnowledgeManager:
logger.debug("Publishing graph embeddings...")
# Remove doc table row
await self.table_store.get_graph_embeddings(
request.workspace,
workspace,
request.id,
publish_ge,
)

View file

@ -9,7 +9,7 @@ import base64
import json
import logging
from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber
from .. base import WorkspaceProcessor, Consumer, Producer, Publisher, Subscriber
from .. base import ConsumerMetrics, ProducerMetrics
from .. base.cassandra_config import add_cassandra_args, resolve_cassandra_config
@ -33,13 +33,18 @@ default_knowledge_response_queue = knowledge_response_queue
default_cassandra_host = "cassandra"
class Processor(AsyncProcessor):
def workspace_queue(base_queue, workspace):
    """Return the queue name used for one workspace of a base queue.

    Base queue and workspace id are joined with a colon, so
    ``workspace_queue("knowledge-request", "acme")`` is
    ``"knowledge-request:acme"``.
    """
    return "{}:{}".format(base_queue, workspace)
class Processor(WorkspaceProcessor):
def __init__(self, **params):
id = params.get("id")
knowledge_request_queue = params.get(
self.knowledge_request_queue_base = params.get(
"knowledge_request_queue", default_knowledge_request_queue
)
@ -51,7 +56,6 @@ class Processor(AsyncProcessor):
cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password, keyspace = resolve_cassandra_config(
host=cassandra_host,
username=cassandra_username,
@ -59,14 +63,13 @@ class Processor(AsyncProcessor):
default_keyspace="knowledge"
)
# Store resolved configuration
self.cassandra_host = hosts
self.cassandra_username = username
self.cassandra_password = password
super(Processor, self).__init__(
**params | {
"knowledge_request_queue": knowledge_request_queue,
"knowledge_request_queue": self.knowledge_request_queue_base,
"knowledge_response_queue": knowledge_response_queue,
"cassandra_host": self.cassandra_host,
"cassandra_username": self.cassandra_username,
@ -74,28 +77,10 @@ class Processor(AsyncProcessor):
}
)
knowledge_request_metrics = ConsumerMetrics(
processor = self.id, flow = None, name = "knowledge-request"
)
knowledge_response_metrics = ProducerMetrics(
processor = self.id, flow = None, name = "knowledge-response"
)
self.knowledge_request_topic = knowledge_request_queue
self.knowledge_request_subscriber = id
self.knowledge_request_consumer = Consumer(
taskgroup = self.taskgroup,
backend = self.pubsub,
flow = None,
topic = knowledge_request_queue,
subscriber = id,
schema = KnowledgeRequest,
handler = self.on_knowledge_request,
metrics = knowledge_request_metrics,
)
self.knowledge_response_producer = Producer(
backend = self.pubsub,
topic = knowledge_response_queue,
@ -115,13 +100,52 @@ class Processor(AsyncProcessor):
self.flows = {}
self.workspace_consumers = {}
logger.info("Knowledge service initialized")
async def on_workspace_created(self, workspace):
    """Start consuming knowledge requests for a newly seen workspace.

    Returns immediately when a consumer is already tracked for
    ``workspace``. Otherwise a consumer is created on the workspace's own
    request queue, with its handler bound to ``workspace``, then started
    and recorded in ``self.workspace_consumers``.
    """
    already_tracked = workspace in self.workspace_consumers
    if already_tracked:
        return

    topic = workspace_queue(self.knowledge_request_queue_base, workspace)
    await self.pubsub.ensure_topic(topic)

    # NOTE(review): relies on functools.partial being in scope at module
    # level; the import changes visible for this file do not add it —
    # confirm it is already imported.
    bound_handler = partial(self.on_knowledge_request, workspace=workspace)
    request_metrics = ConsumerMetrics(
        processor=self.id,
        flow=None,
        name=f"knowledge-request-{workspace}",
    )

    ws_consumer = Consumer(
        taskgroup=self.taskgroup,
        backend=self.pubsub,
        flow=None,
        topic=topic,
        subscriber=self.id,
        schema=KnowledgeRequest,
        handler=bound_handler,
        metrics=request_metrics,
    )
    await ws_consumer.start()

    # Tracked only after a successful start.
    self.workspace_consumers[workspace] = ws_consumer
    logger.info(f"Subscribed to workspace queue: {workspace}")
async def on_workspace_deleted(self, workspace):
    """Stop and drop the knowledge request consumer of a workspace.

    A workspace without a tracked consumer is ignored.
    """
    removed = self.workspace_consumers.pop(workspace, None)
    if not removed:
        return

    await removed.stop()
    logger.info(f"Unsubscribed from workspace queue: {workspace}")
async def start(self):
await self.pubsub.ensure_topic(self.knowledge_request_topic)
await super(Processor, self).start()
await self.knowledge_request_consumer.start()
await self.knowledge_response_producer.start()
async def on_knowledge_config(self, workspace, config, version):
@ -140,7 +164,7 @@ class Processor(AsyncProcessor):
logger.debug(f"Flows for {workspace}: {self.flows[workspace]}")
async def process_request(self, v, id):
async def process_request(self, v, id, workspace):
if v.operation is None:
raise RequestError("Null operation")
@ -163,9 +187,9 @@ class Processor(AsyncProcessor):
await self.knowledge_response_producer.send(
x, { "id": id }
)
return await impls[v.operation](v, respond)
return await impls[v.operation](v, respond, workspace)
async def on_knowledge_request(self, msg, consumer, flow):
async def on_knowledge_request(self, msg, consumer, flow, *, workspace):
v = msg.value()
@ -179,7 +203,7 @@ class Processor(AsyncProcessor):
# We don't send a response back here, the processing
# implementation sends whatever it needs to send.
await self.process_request(v, id)
await self.process_request(v, id, workspace)
return
@ -215,7 +239,7 @@ class Processor(AsyncProcessor):
@staticmethod
def add_args(parser):
AsyncProcessor.add_args(parser)
WorkspaceProcessor.add_args(parser)
parser.add_argument(
'--knowledge-request-queue',

View file

@ -118,10 +118,10 @@ class FlowConfig:
return resolved
async def handle_list_blueprints(self, msg):
async def handle_list_blueprints(self, msg, workspace):
names = list(await self.config.keys(
msg.workspace, "flow-blueprint"
workspace, "flow-blueprint"
))
return FlowResponse(
@ -129,19 +129,19 @@ class FlowConfig:
blueprint_names = names,
)
async def handle_get_blueprint(self, msg):
async def handle_get_blueprint(self, msg, workspace):
return FlowResponse(
error = None,
blueprint_definition = await self.config.get(
msg.workspace, "flow-blueprint", msg.blueprint_name
workspace, "flow-blueprint", msg.blueprint_name
),
)
async def handle_put_blueprint(self, msg):
async def handle_put_blueprint(self, msg, workspace):
await self.config.put(
msg.workspace, "flow-blueprint",
workspace, "flow-blueprint",
msg.blueprint_name, msg.blueprint_definition
)
@ -149,31 +149,31 @@ class FlowConfig:
error = None,
)
async def handle_delete_blueprint(self, msg):
async def handle_delete_blueprint(self, msg, workspace):
logger.debug(f"Flow config message: {msg}")
await self.config.delete(
msg.workspace, "flow-blueprint", msg.blueprint_name
workspace, "flow-blueprint", msg.blueprint_name
)
return FlowResponse(
error = None,
)
async def handle_list_flows(self, msg):
async def handle_list_flows(self, msg, workspace):
names = list(await self.config.keys(msg.workspace, "flow"))
names = list(await self.config.keys(workspace, "flow"))
return FlowResponse(
error = None,
flow_ids = names,
)
async def handle_get_flow(self, msg):
async def handle_get_flow(self, msg, workspace):
flow_data = await self.config.get(
msg.workspace, "flow", msg.flow_id
workspace, "flow", msg.flow_id
)
flow = json.loads(flow_data)
@ -184,9 +184,7 @@ class FlowConfig:
parameters = flow.get("parameters", {}),
)
async def handle_start_flow(self, msg):
workspace = msg.workspace
async def handle_start_flow(self, msg, workspace):
if msg.blueprint_name is None:
raise RuntimeError("No blueprint name")
@ -222,7 +220,7 @@ class FlowConfig:
logger.debug(f"Resolved parameters (with defaults): {parameters}")
# Apply parameter substitution to template replacement function.
# {workspace} is substituted from msg.workspace to isolate
# {workspace} is substituted from workspace to isolate
# queue names across workspaces.
def repl_template_with_params(tmp):
@ -548,9 +546,7 @@ class FlowConfig:
f"attempts: {topic}"
)
async def handle_stop_flow(self, msg):
workspace = msg.workspace
async def handle_stop_flow(self, msg, workspace):
if msg.flow_id is None:
raise RuntimeError("No flow ID")
@ -641,37 +637,29 @@ class FlowConfig:
error = None,
)
async def handle(self, msg):
async def handle(self, msg, workspace):
logger.debug(
f"Handling flow message: {msg.operation} "
f"workspace={msg.workspace}"
f"workspace={workspace}"
)
if not msg.workspace:
return FlowResponse(
error=Error(
type="bad-request",
message="Workspace is required",
),
)
if msg.operation == "list-blueprints":
resp = await self.handle_list_blueprints(msg)
resp = await self.handle_list_blueprints(msg, workspace)
elif msg.operation == "get-blueprint":
resp = await self.handle_get_blueprint(msg)
resp = await self.handle_get_blueprint(msg, workspace)
elif msg.operation == "put-blueprint":
resp = await self.handle_put_blueprint(msg)
resp = await self.handle_put_blueprint(msg, workspace)
elif msg.operation == "delete-blueprint":
resp = await self.handle_delete_blueprint(msg)
resp = await self.handle_delete_blueprint(msg, workspace)
elif msg.operation == "list-flows":
resp = await self.handle_list_flows(msg)
resp = await self.handle_list_flows(msg, workspace)
elif msg.operation == "get-flow":
resp = await self.handle_get_flow(msg)
resp = await self.handle_get_flow(msg, workspace)
elif msg.operation == "start-flow":
resp = await self.handle_start_flow(msg)
resp = await self.handle_start_flow(msg, workspace)
elif msg.operation == "stop-flow":
resp = await self.handle_stop_flow(msg)
resp = await self.handle_stop_flow(msg, workspace)
else:
resp = FlowResponse(

View file

@ -4,6 +4,7 @@ Flow service. Manages flow lifecycle — starting and stopping flows
by coordinating with the config service via pub/sub.
"""
from functools import partial
import logging
import uuid
@ -14,7 +15,7 @@ from trustgraph.schema import flow_request_queue, flow_response_queue
from trustgraph.schema import ConfigRequest, ConfigResponse
from trustgraph.schema import config_request_queue, config_response_queue
from trustgraph.base import AsyncProcessor, Consumer, Producer
from trustgraph.base import WorkspaceProcessor, Consumer, Producer
from trustgraph.base import ConsumerMetrics, ProducerMetrics, SubscriberMetrics
from trustgraph.base import ConfigClient
@ -29,11 +30,15 @@ default_flow_request_queue = flow_request_queue
default_flow_response_queue = flow_response_queue
class Processor(AsyncProcessor):
def workspace_queue(base_queue, workspace):
    """Compose the workspace-specific queue name for a base queue.

    The two parts are joined with a colon:
    ``workspace_queue("flow-request", "acme")`` is ``"flow-request:acme"``.
    """
    return "{}:{}".format(base_queue, workspace)
class Processor(WorkspaceProcessor):
def __init__(self, **params):
flow_request_queue = params.get(
self.flow_request_queue_base = params.get(
"flow_request_queue", default_flow_request_queue
)
flow_response_queue = params.get(
@ -49,27 +54,10 @@ class Processor(AsyncProcessor):
}
)
flow_request_metrics = ConsumerMetrics(
processor = self.id, flow = None, name = "flow-request"
)
flow_response_metrics = ProducerMetrics(
processor = self.id, flow = None, name = "flow-response"
)
self.flow_request_topic = flow_request_queue
self.flow_request_subscriber = id
self.flow_request_consumer = Consumer(
taskgroup = self.taskgroup,
backend = self.pubsub,
flow = None,
topic = flow_request_queue,
subscriber = id,
schema = FlowRequest,
handler = self.on_flow_request,
metrics = flow_request_metrics,
)
self.flow_response_producer = Producer(
backend = self.pubsub,
topic = flow_response_queue,
@ -84,13 +72,6 @@ class Processor(AsyncProcessor):
processor=self.id, flow=None, name="config-response",
)
# Unique subscription suffix per process instance. Pulsar's
# exclusive subscriptions reject a second consumer on the same
# (topic, subscription-name) — so a deterministic name here
# collides with its own ghost when the supervisor restarts the
# process before Pulsar has timed out the previous session
# (ConsumerBusy). Matches the uuid convention used elsewhere
# (gateway/config/receiver.py, AsyncProcessor._create_config_client).
config_rr_id = str(uuid.uuid4())
self.config_client = ConfigClient(
backend=self.pubsub,
@ -106,21 +87,58 @@ class Processor(AsyncProcessor):
self.flow = FlowConfig(self.config_client, self.pubsub)
self.workspace_consumers = {}
logger.info("Flow service initialized")
async def on_workspace_created(self, workspace):
    """Start consuming flow requests for a newly seen workspace.

    A workspace that already has a tracked consumer is left untouched.
    Otherwise a consumer is built on the workspace's own flow request
    queue, with its handler bound to ``workspace``, started, and stored
    in ``self.workspace_consumers``.
    """
    if workspace in self.workspace_consumers:
        return

    topic = workspace_queue(self.flow_request_queue_base, workspace)
    await self.pubsub.ensure_topic(topic)

    # Binding the workspace here means the handler takes it from the
    # queue subscription rather than from the request payload.
    bound_handler = partial(self.on_flow_request, workspace=workspace)
    request_metrics = ConsumerMetrics(
        processor=self.id,
        flow=None,
        name=f"flow-request-{workspace}",
    )

    ws_consumer = Consumer(
        taskgroup=self.taskgroup,
        backend=self.pubsub,
        flow=None,
        topic=topic,
        subscriber=self.id,
        schema=FlowRequest,
        handler=bound_handler,
        metrics=request_metrics,
    )
    await ws_consumer.start()

    # Tracked only after a successful start.
    self.workspace_consumers[workspace] = ws_consumer
    logger.info(f"Subscribed to workspace queue: {workspace}")
async def on_workspace_deleted(self, workspace):
    """Stop and drop the flow request consumer of a workspace.

    Nothing happens when no consumer is tracked for ``workspace``.
    """
    removed = self.workspace_consumers.pop(workspace, None)
    if not removed:
        return

    await removed.stop()
    logger.info(f"Unsubscribed from workspace queue: {workspace}")
async def start(self):
await self.pubsub.ensure_topic(self.flow_request_topic)
await super(Processor, self).start()
await self.config_client.start()
# Discover workspaces with existing flow config and ensure
# their topics exist before we start accepting requests.
workspaces = await self.config_client.workspaces_for_type("flow")
await self.flow.ensure_existing_flow_topics(workspaces)
await self.flow_request_consumer.start()
async def on_flow_request(self, msg, consumer, flow):
async def on_flow_request(self, msg, consumer, flow, *, workspace):
try:
@ -131,7 +149,7 @@ class Processor(AsyncProcessor):
logger.debug(f"Handling flow request {id}...")
resp = await self.flow.handle(v)
resp = await self.flow.handle(v, workspace)
await self.flow_response_producer.send(
resp, properties={"id": id}
@ -155,7 +173,7 @@ class Processor(AsyncProcessor):
@staticmethod
def add_args(parser):
AsyncProcessor.add_args(parser)
WorkspaceProcessor.add_args(parser)
parser.add_argument(
'--flow-request-queue',

View file

@ -7,6 +7,12 @@ import logging
# Module logger
logger = logging.getLogger(__name__)
from ... schema import flow_request_queue
from ... schema import librarian_request_queue
from ... schema import knowledge_request_queue
from ... schema import collection_request_queue
from ... schema import config_request_queue
from . config import ConfigRequestor
from . flow import FlowRequestor
from . iam import IamRequestor
@ -70,15 +76,28 @@ request_response_dispatchers = {
"sparql": SparqlQueryRequestor,
}
global_dispatchers = {
system_dispatchers = {
"iam": IamRequestor,
}
workspace_dispatchers = {
"config": ConfigRequestor,
"flow": FlowRequestor,
"iam": IamRequestor,
"librarian": LibrarianRequestor,
"knowledge": KnowledgeRequestor,
"collection-management": CollectionManagementRequestor,
}
# Base request queue for each workspace-scoped dispatcher kind. Used as the
# fallback when no request-queue override is configured for the kind; the
# dispatcher manager appends ":<workspace>" to the chosen base queue.
workspace_default_request_queues = {
"config": config_request_queue,
"flow": flow_request_queue,
"librarian": librarian_request_queue,
"knowledge": knowledge_request_queue,
"collection-management": collection_request_queue,
}
# Merged kind -> requestor-class table covering both system and workspace
# dispatchers; the dispatcher manager looks a kind up here to instantiate it.
global_dispatchers = {**system_dispatchers, **workspace_dispatchers}
sender_dispatchers = {
"text-load": TextLoad,
"document-load": DocumentLoad,
@ -219,11 +238,24 @@ class DispatcherManager:
async def process_global_service(self, data, responder, params):
kind = params.get("kind")
return await self.invoke_global_service(data, responder, kind)
workspace = params.get("workspace")
if not workspace and isinstance(data, dict):
workspace = data.get("workspace")
return await self.invoke_global_service(
data, responder, kind, workspace=workspace,
)
async def invoke_global_service(self, data, responder, kind):
async def invoke_global_service(self, data, responder, kind,
workspace=None):
key = (None, kind)
if kind in workspace_dispatchers:
if not workspace:
raise RuntimeError(
f"Workspace is required for {kind}"
)
key = (workspace, kind)
else:
key = (None, kind)
if key not in self.dispatchers:
async with self.dispatcher_lock:
@ -234,11 +266,21 @@ class DispatcherManager:
request_queue = self.queue_overrides[kind].get("request")
response_queue = self.queue_overrides[kind].get("response")
if kind in workspace_dispatchers and workspace:
base_queue = (
request_queue
or workspace_default_request_queues[kind]
)
request_queue = f"{base_queue}:{workspace}"
consumer_name = f"{self.prefix}-{kind}-{workspace}"
else:
consumer_name = f"{self.prefix}-{kind}-request"
dispatcher = global_dispatchers[kind](
backend = self.backend,
timeout = 120,
consumer = f"{self.prefix}-{kind}-request",
subscriber = f"{self.prefix}-{kind}-request",
consumer = consumer_name,
subscriber = consumer_name,
request_queue = request_queue,
response_queue = response_queue,
)

View file

@ -310,7 +310,7 @@ class Mux:
else:
await self.dispatcher_manager.invoke_global_service(
request, responder, svc
request, responder, svc, workspace=workspace,
)
except Exception as e:

View file

@ -116,9 +116,6 @@ def serialize_document_metadata(message):
if message.metadata:
ret["metadata"] = serialize_subgraph(message.metadata)
if message.workspace:
ret["workspace"] = message.workspace
if message.tags is not None:
ret["tags"] = message.tags
@ -140,9 +137,6 @@ def serialize_processing_metadata(message):
if message.flow:
ret["flow"] = message.flow
if message.workspace:
ret["workspace"] = message.workspace
if message.collection:
ret["collection"] = message.collection
@ -160,7 +154,6 @@ def to_document_metadata(x):
title = x.get("title", None),
comments = x.get("comments", None),
metadata = to_subgraph(x["metadata"]),
workspace = x.get("workspace", None),
tags = x.get("tags", None),
)
@ -171,7 +164,6 @@ def to_processing_metadata(x):
document_id = x.get("document-id", None),
time = x.get("time", None),
flow = x.get("flow", None),
workspace = x.get("workspace", None),
collection = x.get("collection", None),
tags = x.get("tags", None),
)

View file

@ -245,7 +245,8 @@ def _sign_jwt(kid, private_pem, claims):
class IamService:
def __init__(self, host, username, password, keyspace,
bootstrap_mode, bootstrap_token=None):
bootstrap_mode, bootstrap_token=None,
on_workspace_created=None, on_workspace_deleted=None):
self.table_store = IamTableStore(
host, username, password, keyspace,
)
@ -267,6 +268,12 @@ class IamService:
self.bootstrap_mode = bootstrap_mode
self.bootstrap_token = bootstrap_token
# Callbacks for workspace lifecycle events. Called after the
# workspace is created/deleted in IAM's own store so that the
# processor can announce it via the config service.
self._on_workspace_created = on_workspace_created
self._on_workspace_deleted = on_workspace_deleted
self._signing_key = None
self._signing_key_lock = asyncio.Lock()
@ -424,6 +431,9 @@ class IamService:
created=now,
)
if self._on_workspace_created:
await self._on_workspace_created(DEFAULT_WORKSPACE)
admin_user_id = str(uuid.uuid4())
admin_password = secrets.token_urlsafe(32)
await self.table_store.put_user(
@ -904,6 +914,10 @@ class IamService:
enabled=v.workspace_record.enabled,
created=now,
)
if self._on_workspace_created:
await self._on_workspace_created(v.workspace_record.id)
row = await self.table_store.get_workspace(v.workspace_record.id)
return IamResponse(workspace=self._row_to_workspace_record(row))
@ -982,6 +996,9 @@ class IamService:
for kr in key_rows:
await self.table_store.delete_api_key(kr[0])
if self._on_workspace_deleted:
await self._on_workspace_deleted(v.workspace_record.id)
return IamResponse()
# ------------------------------------------------------------------

View file

@ -12,9 +12,13 @@ import os
from trustgraph.schema import Error
from trustgraph.schema import IamRequest, IamResponse
from trustgraph.schema import iam_request_queue, iam_response_queue
from trustgraph.schema import ConfigRequest, ConfigResponse, ConfigValue
from trustgraph.schema import config_request_queue, config_response_queue
from trustgraph.base import AsyncProcessor, Consumer, Producer
from trustgraph.base import ConsumerMetrics, ProducerMetrics
from trustgraph.base.metrics import SubscriberMetrics
from trustgraph.base.request_response_spec import RequestResponse
from trustgraph.base.cassandra_config import (
add_cassandra_args, resolve_cassandra_config,
)
@ -147,6 +151,8 @@ class Processor(AsyncProcessor):
keyspace=keyspace,
bootstrap_mode=self.bootstrap_mode,
bootstrap_token=self.bootstrap_token,
on_workspace_created=self._announce_workspace_created,
on_workspace_deleted=self._announce_workspace_deleted,
)
logger.info(
@ -160,6 +166,87 @@ class Processor(AsyncProcessor):
await self.iam.auto_bootstrap_if_token_mode()
await self.iam_request_consumer.start()
def _create_config_client(self):
    """Build a new request/response client aimed at the config service.

    The client is only constructed here; starting and stopping it is left
    to the caller. Each call yields a client whose subscription name ends
    in a freshly generated UUID.
    """
    import uuid

    # Random suffix that makes the subscription name differ per client.
    config_rr_id = str(uuid.uuid4())

    return RequestResponse(
        backend=self.pubsub,
        subscription=f"{self.id}--config--{config_rr_id}",
        consumer_name=self.id,
        request_topic=config_request_queue,
        request_schema=ConfigRequest,
        request_metrics=ProducerMetrics(
            processor=self.id, flow=None, name="config-request",
        ),
        response_topic=config_response_queue,
        response_schema=ConfigResponse,
        response_metrics=SubscriberMetrics(
            processor=self.id, flow=None, name="config-response",
        ),
    )
async def _config_put(self, workspace, type, key, value):
    """Write one config entry through the config service.

    A one-shot client is created for the call and stopped again whatever
    the outcome.

    Args:
        workspace: Value for the request's ``workspace`` field.
        type: Config type of the entry. The name shadows the builtin but
            is kept so existing callers keep working.
        key: Key of the entry.
        value: Value to store under ``type``/``key``.

    Raises:
        RuntimeError: If the config service replies with an error.
    """
    client = self._create_config_client()
    try:
        await client.start()
        response = await client.request(
            ConfigRequest(
                operation="put",
                workspace=workspace,
                values=[ConfigValue(type=type, key=key, value=value)],
            ),
            timeout=10,
        )
    finally:
        await client.stop()

    # An error reply means the write did not happen. Surface it so the
    # caller does not report success for a rejected request.
    # getattr keeps this safe should request() ever return None.
    error = getattr(response, "error", None)
    if error:
        raise RuntimeError(f"Config put failed: {error.message}")
async def _config_delete(self, workspace, type, key):
    """Delete one config entry through the config service.

    A one-shot client is created for the call and stopped again whatever
    the outcome.

    Args:
        workspace: Value for the request's ``workspace`` field.
        type: Config type of the entry. The name shadows the builtin but
            is kept so existing callers keep working.
        key: Key of the entry to remove.

    Raises:
        RuntimeError: If the config service replies with an error.
    """
    from trustgraph.schema import ConfigKey

    client = self._create_config_client()
    try:
        await client.start()
        response = await client.request(
            ConfigRequest(
                operation="delete",
                workspace=workspace,
                keys=[ConfigKey(type=type, key=key)],
            ),
            timeout=10,
        )
    finally:
        await client.stop()

    # An error reply means the delete did not happen. Surface it so the
    # caller does not report success for a rejected request.
    # getattr keeps this safe should request() ever return None.
    error = getattr(response, "error", None)
    if error:
        raise RuntimeError(f"Config delete failed: {error.message}")
async def _announce_workspace_created(self, workspace_id):
    """Record a newly created workspace in the config service.

    Best-effort: any exception raised while announcing is logged with its
    traceback and not propagated to the caller.
    """
    try:
        await self._config_put(
            workspace="__workspaces__",
            type="workspace",
            key=workspace_id,
            value='{"enabled": true}',
        )
        logger.info(
            f"Announced workspace creation: {workspace_id}"
        )
    except Exception as e:
        logger.error(
            f"Failed to announce workspace creation "
            f"{workspace_id}: {e}",
            exc_info=True,
        )
async def _announce_workspace_deleted(self, workspace_id):
    """Remove a deleted workspace's entry from the config service.

    Best-effort: any exception raised while announcing is logged with its
    traceback and not propagated to the caller.
    """
    try:
        await self._config_delete(
            workspace="__workspaces__",
            type="workspace",
            key=workspace_id,
        )
        logger.info(
            f"Announced workspace deletion: {workspace_id}"
        )
    except Exception as e:
        logger.error(
            f"Failed to announce workspace deletion "
            f"{workspace_id}: {e}",
            exc_info=True,
        )
async def on_iam_request(self, msg, consumer, flow):
id = None

View file

@ -151,21 +151,11 @@ class CollectionManager:
logger.error(f"Error ensuring collection exists: {e}")
raise e
async def list_collections(self, request: CollectionManagementRequest) -> CollectionManagementResponse:
"""
List collections for a user from config service
Args:
request: Collection management request
Returns:
CollectionManagementResponse with list of collections
"""
async def list_collections(self, request, workspace):
try:
# Get all collections in this workspace from config service
config_request = ConfigRequest(
operation='getvalues',
workspace=request.workspace,
workspace=workspace,
type='collection'
)
@ -210,18 +200,8 @@ class CollectionManager:
logger.error(f"Error listing collections: {e}")
raise RequestError(f"Failed to list collections: {str(e)}")
async def update_collection(self, request: CollectionManagementRequest) -> CollectionManagementResponse:
"""
Update collection metadata via config service (creates if doesn't exist)
Args:
request: Collection management request
Returns:
CollectionManagementResponse with updated collection
"""
async def update_collection(self, request, workspace):
try:
# Create metadata from request
name = request.name if request.name else request.collection
description = request.description if request.description else ""
tags = list(request.tags) if request.tags else []
@ -233,10 +213,9 @@ class CollectionManager:
tags=tags
)
# Send put request to config service
config_request = ConfigRequest(
operation='put',
workspace=request.workspace,
workspace=workspace,
values=[ConfigValue(
type='collection',
key=request.collection,
@ -249,7 +228,7 @@ class CollectionManager:
if response.error:
raise RuntimeError(f"Config update failed: {response.error.message}")
logger.info(f"Collection {request.workspace}/{request.collection} updated in config service")
logger.info(f"Collection {workspace}/{request.collection} updated in config service")
# Config service will trigger config push automatically
# Storage services will receive update and create/update collections
@ -264,23 +243,13 @@ class CollectionManager:
logger.error(f"Error updating collection: {e}")
raise RequestError(f"Failed to update collection: {str(e)}")
async def delete_collection(self, request: CollectionManagementRequest) -> CollectionManagementResponse:
"""
Delete collection via config service
Args:
request: Collection management request
Returns:
CollectionManagementResponse indicating success or failure
"""
async def delete_collection(self, request, workspace):
try:
logger.info(f"Deleting collection {request.workspace}/{request.collection}")
logger.info(f"Deleting collection {workspace}/{request.collection}")
# Send delete request to config service
config_request = ConfigRequest(
operation='delete',
workspace=request.workspace,
workspace=workspace,
keys=[ConfigKey(type='collection', key=request.collection)]
)
@ -289,7 +258,7 @@ class CollectionManager:
if response.error:
raise RuntimeError(f"Config delete failed: {response.error.message}")
logger.info(f"Collection {request.workspace}/{request.collection} deleted from config service")
logger.info(f"Collection {workspace}/{request.collection} deleted from config service")
# Config service will trigger config push automatically
# Storage services will receive update and delete collections

View file

@ -42,13 +42,13 @@ class Librarian:
self.load_document = load_document
self.min_chunk_size = min_chunk_size
async def add_document(self, request):
async def add_document(self, request, workspace):
if not request.document_metadata.kind:
raise RequestError("Document kind (MIME type) is required")
if await self.table_store.document_exists(
request.document_metadata.workspace,
workspace,
request.document_metadata.id
):
raise RuntimeError("Document already exists")
@ -66,19 +66,19 @@ class Librarian:
logger.debug("Adding to table...")
await self.table_store.add_document(
request.document_metadata, object_id
workspace, request.document_metadata, object_id
)
logger.debug("Add complete")
return LibrarianResponse()
async def remove_document(self, request):
async def remove_document(self, request, workspace):
logger.debug("Removing document...")
if not await self.table_store.document_exists(
request.workspace,
workspace,
request.document_id,
):
raise RuntimeError("Document does not exist")
@ -89,17 +89,17 @@ class Librarian:
logger.debug(f"Cascade deleting child document {child.id}")
try:
child_object_id = await self.table_store.get_document_object_id(
child.workspace,
workspace,
child.id
)
await self.blob_store.remove(child_object_id)
await self.table_store.remove_document(child.workspace, child.id)
await self.table_store.remove_document(workspace, child.id)
except Exception as e:
logger.warning(f"Failed to delete child document {child.id}: {e}")
# Now remove the parent document
object_id = await self.table_store.get_document_object_id(
request.workspace,
workspace,
request.document_id
)
@ -108,7 +108,7 @@ class Librarian:
# Remove doc table row
await self.table_store.remove_document(
request.workspace,
workspace,
request.document_id
)
@ -116,30 +116,30 @@ class Librarian:
return LibrarianResponse()
async def update_document(self, request):
async def update_document(self, request, workspace):
logger.debug("Updating document...")
# You can't update the document ID, workspace or kind.
if not await self.table_store.document_exists(
request.document_metadata.workspace,
workspace,
request.document_metadata.id
):
raise RuntimeError("Document does not exist")
await self.table_store.update_document(request.document_metadata)
await self.table_store.update_document(workspace, request.document_metadata)
logger.debug("Update complete")
return LibrarianResponse()
async def get_document_metadata(self, request):
async def get_document_metadata(self, request, workspace):
logger.debug("Getting document metadata...")
doc = await self.table_store.get_document(
request.workspace,
workspace,
request.document_id
)
@ -151,12 +151,12 @@ class Librarian:
content = None,
)
async def get_document_content(self, request):
async def get_document_content(self, request, workspace):
logger.debug("Getting document content...")
object_id = await self.table_store.get_document_object_id(
request.workspace,
workspace,
request.document_id
)
@ -172,7 +172,7 @@ class Librarian:
content = base64.b64encode(content),
)
async def add_processing(self, request):
async def add_processing(self, request, workspace):
logger.debug("Adding processing metadata...")
@ -180,18 +180,18 @@ class Librarian:
raise RuntimeError("Collection parameter is required")
if await self.table_store.processing_exists(
request.processing_metadata.workspace,
workspace,
request.processing_metadata.id
):
raise RuntimeError("Processing already exists")
doc = await self.table_store.get_document(
request.processing_metadata.workspace,
workspace,
request.processing_metadata.document_id
)
object_id = await self.table_store.get_document_object_id(
request.processing_metadata.workspace,
workspace,
request.processing_metadata.document_id
)
@ -203,7 +203,7 @@ class Librarian:
logger.debug("Adding processing to table...")
await self.table_store.add_processing(request.processing_metadata)
await self.table_store.add_processing(workspace, request.processing_metadata)
logger.debug("Invoking document processing...")
@ -211,25 +211,26 @@ class Librarian:
document = doc,
processing = request.processing_metadata,
content = content,
workspace = workspace,
)
logger.debug("Add complete")
return LibrarianResponse()
async def remove_processing(self, request):
async def remove_processing(self, request, workspace):
logger.debug("Removing processing metadata...")
if not await self.table_store.processing_exists(
request.workspace,
workspace,
request.processing_id,
):
raise RuntimeError("Processing object does not exist")
# Remove doc table row
await self.table_store.remove_processing(
request.workspace,
workspace,
request.processing_id
)
@ -237,9 +238,9 @@ class Librarian:
return LibrarianResponse()
async def list_documents(self, request):
async def list_documents(self, request, workspace):
docs = await self.table_store.list_documents(request.workspace)
docs = await self.table_store.list_documents(workspace)
# Filter out child documents and answer documents by default
include_children = getattr(request, 'include_children', False)
@ -254,9 +255,9 @@ class Librarian:
document_metadatas = docs,
)
async def list_processing(self, request):
async def list_processing(self, request, workspace):
procs = await self.table_store.list_processing(request.workspace)
procs = await self.table_store.list_processing(workspace)
return LibrarianResponse(
processing_metadatas = procs,
@ -264,7 +265,7 @@ class Librarian:
# Chunked upload operations
async def begin_upload(self, request):
async def begin_upload(self, request, workspace):
"""
Initialize a chunked upload session.
@ -276,7 +277,7 @@ class Librarian:
raise RequestError("Document kind (MIME type) is required")
if await self.table_store.document_exists(
request.document_metadata.workspace,
workspace,
request.document_metadata.id
):
raise RequestError("Document already exists")
@ -312,14 +313,13 @@ class Librarian:
"kind": request.document_metadata.kind,
"title": request.document_metadata.title,
"comments": request.document_metadata.comments,
"workspace": request.document_metadata.workspace,
"tags": request.document_metadata.tags,
})
# Store session in Cassandra
await self.table_store.create_upload_session(
upload_id=upload_id,
workspace=request.document_metadata.workspace,
workspace=workspace,
document_id=request.document_metadata.id,
document_metadata=doc_meta_json,
s3_upload_id=s3_upload_id,
@ -338,7 +338,7 @@ class Librarian:
total_chunks=total_chunks,
)
async def upload_chunk(self, request):
async def upload_chunk(self, request, workspace):
"""
Upload a single chunk of a document.
@ -352,7 +352,7 @@ class Librarian:
raise RequestError("Upload session not found or expired")
# Validate ownership
if session["workspace"] != request.workspace:
if session["workspace"] != workspace:
raise RequestError("Not authorized to upload to this session")
# Validate chunk index
@ -405,7 +405,7 @@ class Librarian:
total_bytes=session["total_size"],
)
async def complete_upload(self, request):
async def complete_upload(self, request, workspace):
"""
Finalize a chunked upload and create the document.
@ -419,7 +419,7 @@ class Librarian:
raise RequestError("Upload session not found or expired")
# Validate ownership
if session["workspace"] != request.workspace:
if session["workspace"] != workspace:
raise RequestError("Not authorized to complete this upload")
# Verify all chunks received
@ -457,13 +457,13 @@ class Librarian:
kind=doc_meta_dict["kind"],
title=doc_meta_dict.get("title", ""),
comments=doc_meta_dict.get("comments", ""),
workspace=doc_meta_dict["workspace"],
tags=doc_meta_dict.get("tags", []),
metadata=[], # Triples not supported in chunked upload yet
)
# Add document to table
await self.table_store.add_document(doc_metadata, session["object_id"])
workspace = session["workspace"]
await self.table_store.add_document(workspace, doc_metadata, session["object_id"])
# Delete upload session
await self.table_store.delete_upload_session(request.upload_id)
@ -476,7 +476,7 @@ class Librarian:
object_id=str(session["object_id"]),
)
async def abort_upload(self, request):
async def abort_upload(self, request, workspace):
"""
Cancel a chunked upload and clean up resources.
"""
@ -488,7 +488,7 @@ class Librarian:
raise RequestError("Upload session not found or expired")
# Validate ownership
if session["workspace"] != request.workspace:
if session["workspace"] != workspace:
raise RequestError("Not authorized to abort this upload")
# Abort S3 multipart upload
@ -504,7 +504,7 @@ class Librarian:
return LibrarianResponse(error=None)
async def get_upload_status(self, request):
async def get_upload_status(self, request, workspace):
"""
Get the status of an in-progress upload.
"""
@ -520,7 +520,7 @@ class Librarian:
)
# Validate ownership
if session["workspace"] != request.workspace:
if session["workspace"] != workspace:
raise RequestError("Not authorized to view this upload")
chunks_received = session["chunks_received"]
@ -546,13 +546,13 @@ class Librarian:
total_bytes=session["total_size"],
)
async def list_uploads(self, request):
async def list_uploads(self, request, workspace):
"""
List all in-progress uploads for a workspace.
"""
logger.debug(f"Listing uploads for workspace {request.workspace}")
logger.debug(f"Listing uploads for workspace {workspace}")
sessions = await self.table_store.list_upload_sessions(request.workspace)
sessions = await self.table_store.list_upload_sessions(workspace)
upload_sessions = [
UploadSession(
@ -575,7 +575,7 @@ class Librarian:
# Child document operations
async def add_child_document(self, request):
async def add_child_document(self, request, workspace):
"""
Add a child document linked to a parent document.
@ -591,7 +591,7 @@ class Librarian:
# Verify parent exists
if not await self.table_store.document_exists(
request.document_metadata.workspace,
workspace,
request.document_metadata.parent_id
):
raise RequestError(
@ -599,7 +599,7 @@ class Librarian:
)
if await self.table_store.document_exists(
request.document_metadata.workspace,
workspace,
request.document_metadata.id
):
raise RequestError("Document already exists")
@ -622,7 +622,7 @@ class Librarian:
logger.debug("Adding to table...")
await self.table_store.add_document(
request.document_metadata, object_id
workspace, request.document_metadata, object_id
)
logger.debug("Add child document complete")
@ -632,7 +632,7 @@ class Librarian:
document_id=request.document_metadata.id,
)
async def list_children(self, request):
async def list_children(self, request, workspace):
"""
List all child documents for a given parent document.
"""
@ -645,7 +645,7 @@ class Librarian:
document_metadatas=children,
)
async def stream_document(self, request):
async def stream_document(self, request, workspace):
"""
Stream document content in chunks.
@ -665,7 +665,7 @@ class Librarian:
)
object_id = await self.table_store.get_document_object_id(
request.workspace,
workspace,
request.document_id
)
@ -697,4 +697,3 @@ class Librarian:
total_bytes=total_size,
is_final=is_last,
)

View file

@ -10,7 +10,7 @@ import json
import logging
from datetime import datetime
from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber
from .. base import WorkspaceProcessor, Consumer, Producer, Publisher, Subscriber
from .. base import ConsumerMetrics, ProducerMetrics
from .. base.cassandra_config import add_cassandra_args, resolve_cassandra_config
@ -46,6 +46,9 @@ default_collection_response_queue = collection_response_queue
default_config_request_queue = config_request_queue
default_config_response_queue = config_response_queue
def workspace_queue(base_queue, workspace):
    """Return the name of the per-workspace variant of a queue.

    The result is the base queue name and the workspace joined by a
    colon.
    """
    scoped_name = f"{base_queue}:{workspace}"
    return scoped_name
default_object_store_endpoint = "ceph-rgw:7480"
default_object_store_access_key = "object-user"
default_object_store_secret_key = "object-password"
@ -56,15 +59,13 @@ default_min_chunk_size = 1 # No minimum by default (for Garage)
bucket_name = "library"
class Processor(AsyncProcessor):
class Processor(WorkspaceProcessor):
def __init__(self, **params):
id = params.get("id")
# self.running = True
librarian_request_queue = params.get(
self.librarian_request_queue_base = params.get(
"librarian_request_queue", default_librarian_request_queue
)
@ -72,7 +73,7 @@ class Processor(AsyncProcessor):
"librarian_response_queue", default_librarian_response_queue
)
collection_request_queue = params.get(
self.collection_request_queue_base = params.get(
"collection_request_queue", default_collection_request_queue
)
@ -130,9 +131,9 @@ class Processor(AsyncProcessor):
super(Processor, self).__init__(
**params | {
"librarian_request_queue": librarian_request_queue,
"librarian_request_queue": self.librarian_request_queue_base,
"librarian_response_queue": librarian_response_queue,
"collection_request_queue": collection_request_queue,
"collection_request_queue": self.collection_request_queue_base,
"collection_response_queue": collection_response_queue,
"object_store_endpoint": object_store_endpoint,
"object_store_access_key": object_store_access_key,
@ -142,40 +143,14 @@ class Processor(AsyncProcessor):
}
)
librarian_request_metrics = ConsumerMetrics(
processor = self.id, flow = None, name = "librarian-request"
)
librarian_response_metrics = ProducerMetrics(
processor = self.id, flow = None, name = "librarian-response"
)
collection_request_metrics = ConsumerMetrics(
processor = self.id, flow = None, name = "collection-request"
)
collection_response_metrics = ProducerMetrics(
processor = self.id, flow = None, name = "collection-response"
)
storage_response_metrics = ConsumerMetrics(
processor = self.id, flow = None, name = "storage-response"
)
self.librarian_request_topic = librarian_request_queue
self.librarian_request_subscriber = id
self.librarian_request_consumer = Consumer(
taskgroup = self.taskgroup,
backend = self.pubsub,
flow = None,
topic = librarian_request_queue,
subscriber = id,
schema = LibrarianRequest,
handler = self.on_librarian_request,
metrics = librarian_request_metrics,
)
self.librarian_response_producer = Producer(
backend = self.pubsub,
topic = librarian_response_queue,
@ -183,20 +158,6 @@ class Processor(AsyncProcessor):
metrics = librarian_response_metrics,
)
self.collection_request_topic = collection_request_queue
self.collection_request_subscriber = id
self.collection_request_consumer = Consumer(
taskgroup = self.taskgroup,
backend = self.pubsub,
flow = None,
topic = collection_request_queue,
subscriber = id,
schema = CollectionManagementRequest,
handler = self.on_collection_request,
metrics = collection_request_metrics,
)
self.collection_response_producer = Producer(
backend = self.pubsub,
topic = collection_response_queue,
@ -259,16 +220,80 @@ class Processor(AsyncProcessor):
self.flows = {}
# Per-workspace consumers, keyed by workspace id
self.workspace_consumers = {}
logger.info("Librarian service initialized")
async def on_workspace_created(self, workspace):
    """Start consuming the librarian and collection queues of a workspace.

    Ensures both per-workspace topics exist, creates one consumer for
    each, starts them and records the pair in ``self.workspace_consumers``.
    Does nothing when the workspace already has consumers registered.

    Args:
        workspace: Workspace id. It is appended to the base queue names
            and bound into the request handlers as ``workspace``.

    Raises:
        Exception: Whatever topic creation or consumer start-up raises.
            If the collection consumer fails to start, the librarian
            consumer is stopped again before the error propagates.
    """
    if workspace in self.workspace_consumers:
        return

    lib_queue = workspace_queue(
        self.librarian_request_queue_base, workspace,
    )
    col_queue = workspace_queue(
        self.collection_request_queue_base, workspace,
    )

    await self.pubsub.ensure_topic(lib_queue)
    await self.pubsub.ensure_topic(col_queue)

    # The workspace is bound into each handler, so it comes from the queue
    # the message arrived on and never from the message body.
    lib_consumer = Consumer(
        taskgroup=self.taskgroup,
        backend=self.pubsub,
        flow=None,
        topic=lib_queue,
        subscriber=self.id,
        schema=LibrarianRequest,
        handler=partial(
            self.on_librarian_request, workspace=workspace,
        ),
        metrics=ConsumerMetrics(
            processor=self.id, flow=None,
            name=f"librarian-request-{workspace}",
        ),
    )

    col_consumer = Consumer(
        taskgroup=self.taskgroup,
        backend=self.pubsub,
        flow=None,
        topic=col_queue,
        subscriber=self.id,
        schema=CollectionManagementRequest,
        handler=partial(
            self.on_collection_request, workspace=workspace,
        ),
        metrics=ConsumerMetrics(
            processor=self.id, flow=None,
            name=f"collection-request-{workspace}",
        ),
    )

    await lib_consumer.start()
    try:
        await col_consumer.start()
    except Exception:
        # Roll back the half-finished subscription. Without this the
        # librarian consumer would keep running untracked: it could never
        # be stopped by on_workspace_deleted, and a retry would add a
        # second subscriber.
        await lib_consumer.stop()
        raise

    self.workspace_consumers[workspace] = {
        "librarian": lib_consumer,
        "collection": col_consumer,
    }

    logger.info(f"Subscribed to workspace queues: {workspace}")
# Stop and forget the per-workspace consumers of a deleted workspace.
async def on_workspace_deleted(self, workspace):
# Remove the entry up front so the workspace is no longer tracked; pop()
# yields None when nothing was registered for this workspace.
consumers = self.workspace_consumers.pop(workspace, None)
if consumers:
# Stop each stored consumer in turn.
for consumer in consumers.values():
await consumer.stop()
logger.info(f"Unsubscribed from workspace queues: {workspace}")
async def start(self):
await self.pubsub.ensure_topic(self.librarian_request_topic)
await self.pubsub.ensure_topic(self.collection_request_topic)
await super(Processor, self).start()
await self.librarian_request_consumer.start()
await self.librarian_response_producer.start()
await self.collection_request_consumer.start()
await self.collection_response_producer.start()
await self.config_request_producer.start()
await self.config_response_consumer.start()
@ -360,13 +385,12 @@ class Processor(AsyncProcessor):
finally:
await triples_pub.stop()
async def load_document(self, document, processing, content):
async def load_document(self, document, processing, content, workspace):
logger.debug("Ready for document processing...")
logger.debug(f"Document: {document}, processing: {processing}, content length: {len(content)}")
workspace = processing.workspace
ws_flows = self.flows.get(workspace, {})
if processing.flow not in ws_flows:
raise RuntimeError(
@ -429,20 +453,14 @@ class Processor(AsyncProcessor):
logger.debug("Document submitted")
async def add_processing_with_collection(self, request):
"""
Wrapper for add_processing that ensures collection exists
"""
# Ensure collection exists when processing is added
async def add_processing_with_collection(self, request, workspace):
if hasattr(request, 'processing_metadata') and request.processing_metadata:
workspace = request.processing_metadata.workspace
collection = request.processing_metadata.collection
await self.collection_manager.ensure_collection_exists(workspace, collection)
# Call the original add_processing method
return await self.librarian.add_processing(request)
return await self.librarian.add_processing(request, workspace)
async def process_request(self, v):
async def process_request(self, v, workspace):
if v.operation is None:
raise RequestError("Null operation")
@ -475,9 +493,9 @@ class Processor(AsyncProcessor):
if v.operation not in impls:
raise RequestError(f"Invalid operation: {v.operation}")
return await impls[v.operation](v)
return await impls[v.operation](v, workspace)
async def on_librarian_request(self, msg, consumer, flow):
async def on_librarian_request(self, msg, consumer, flow, *, workspace):
v = msg.value()
@ -491,14 +509,14 @@ class Processor(AsyncProcessor):
# Handle streaming operations specially
if v.operation == "stream-document":
async for resp in self.librarian.stream_document(v):
async for resp in self.librarian.stream_document(v, workspace):
await self.librarian_response_producer.send(
resp, properties={"id": id}
)
return
# Non-streaming operations
resp = await self.process_request(v)
resp = await self.process_request(v, workspace)
await self.librarian_response_producer.send(
resp, properties={"id": id}
@ -535,10 +553,7 @@ class Processor(AsyncProcessor):
logger.debug("Librarian input processing complete")
async def process_collection_request(self, v):
"""
Process collection management requests
"""
async def process_collection_request(self, v, workspace):
if v.operation is None:
raise RequestError("Null operation")
@ -553,19 +568,16 @@ class Processor(AsyncProcessor):
if v.operation not in impls:
raise RequestError(f"Invalid collection operation: {v.operation}")
return await impls[v.operation](v)
return await impls[v.operation](v, workspace)
async def on_collection_request(self, msg, consumer, flow):
"""
Handle collection management request messages
"""
async def on_collection_request(self, msg, consumer, flow, *, workspace):
v = msg.value()
id = msg.properties().get("id", "unknown")
logger.info(f"Handling collection request {id}...")
try:
resp = await self.process_collection_request(v)
resp = await self.process_collection_request(v, workspace)
await self.collection_response_producer.send(
resp, properties={"id": id}
)
@ -597,7 +609,7 @@ class Processor(AsyncProcessor):
@staticmethod
def add_args(parser):
AsyncProcessor.add_args(parser)
WorkspaceProcessor.add_args(parser)
parser.add_argument(
'--librarian-request-queue',

View file

@ -312,7 +312,7 @@ class LibraryTableStore:
return bool(rows)
async def add_document(self, document, object_id):
async def add_document(self, workspace, document, object_id):
logger.info(f"Adding document {document.id} {object_id}")
@ -332,7 +332,7 @@ class LibraryTableStore:
self.cassandra,
self.insert_document_stmt,
(
document.id, document.workspace, int(document.time * 1000),
document.id, workspace, int(document.time * 1000),
document.kind, document.title, document.comments,
metadata, document.tags, object_id,
parent_id, document_type
@ -344,7 +344,7 @@ class LibraryTableStore:
logger.debug("Add complete")
async def update_document(self, document):
async def update_document(self, workspace, document):
logger.info(f"Updating document {document.id}")
@ -362,7 +362,7 @@ class LibraryTableStore:
(
int(document.time * 1000), document.title,
document.comments, metadata, document.tags,
document.workspace, document.id
workspace, document.id
),
)
except Exception:
@ -404,7 +404,6 @@ class LibraryTableStore:
lst = [
DocumentMetadata(
id = row[0],
workspace = workspace,
time = int(time.mktime(row[1].timetuple())),
kind = row[2],
title = row[3],
@ -446,7 +445,6 @@ class LibraryTableStore:
lst = [
DocumentMetadata(
id = row[0],
workspace = row[1],
time = int(time.mktime(row[2].timetuple())),
kind = row[3],
title = row[4],
@ -487,7 +485,6 @@ class LibraryTableStore:
for row in rows:
doc = DocumentMetadata(
id = id,
workspace = workspace,
time = int(time.mktime(row[0].timetuple())),
kind = row[1],
title = row[2],
@ -540,7 +537,7 @@ class LibraryTableStore:
return bool(rows)
async def add_processing(self, processing):
async def add_processing(self, workspace, processing):
logger.info(f"Adding processing {processing.id}")
@ -551,7 +548,7 @@ class LibraryTableStore:
(
processing.id, processing.document_id,
int(processing.time * 1000), processing.flow,
processing.workspace, processing.collection,
workspace, processing.collection,
processing.tags
),
)
@ -597,7 +594,6 @@ class LibraryTableStore:
document_id = row[1],
time = int(time.mktime(row[2].timetuple())),
flow = row[3],
workspace = workspace,
collection = row[4],
tags = row[5] if row[5] else [],
)