Mirror of https://github.com/trustgraph-ai/trustgraph.git, synced 2026-05-06 22:02:37 +02:00
Per-workspace queue routing for workspace-scoped services (#862)
Workspace identity is now determined by queue infrastructure instead of message body fields, closing a privilege-escalation vector where a caller could spoof workspace in the request payload.

- Add WorkspaceProcessor base class: discovers workspaces from config at startup, creates per-workspace consumers (queue:workspace), and manages consumer lifecycle on workspace create/delete events
- Roll out to librarian, flow-svc, knowledge cores, and config-svc
- Config service gets a dual-queue regime: a system queue for cross-workspace ops (getvalues-all-ws, bootstrapper writes to __workspaces__) and per-workspace queues for tenant-scoped ops, with workspace discovery from its own Cassandra store
- Remove workspace field from request schemas (FlowRequest, LibrarianRequest, KnowledgeRequest, CollectionManagementRequest) and from DocumentMetadata / ProcessingMetadata — table stores now accept workspace as an explicit parameter
- Strip workspace encode/decode from all message translators and gateway serializers
- Gateway enforces workspace existence: reject requests targeting non-existent workspaces instead of routing to queues with no consumer
- Config service provisions new workspaces from __template__ on creation
- Add workspace lifecycle hooks to AsyncProcessor so any processor can react to workspace create/delete without subclassing WorkspaceProcessor
This commit is contained in:
parent 9be257ceee
commit 9f2bfbce0c

53 changed files with 1565 additions and 677 deletions
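Editorial illustration (not part of the commit): a minimal sketch of the routing idea described above, using the diff's base:workspace queue-name convention. The consumer-factory shape here is an assumption.

from functools import partial

def workspace_queue(base_queue: str, workspace: str) -> str:
    # "flow-request" + "acme" -> "flow-request:acme"
    return f"{base_queue}:{workspace}"

class ExampleWorkspaceService:
    # One consumer per known workspace. The handler learns the workspace
    # from the queue binding, so a spoofed workspace field in the message
    # body is ignored -- the field no longer exists in the request schemas.
    async def on_workspace_created(self, workspace):
        consumer = self.make_consumer(          # hypothetical factory
            topic=workspace_queue(self.request_queue_base, workspace),
            handler=partial(self.on_request, workspace=workspace),
        )
        await consumer.start()
        self.workspace_consumers[workspace] = consumer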
@@ -21,7 +21,7 @@ class InitContext:
     logger: logging.Logger
     config: Any  # ConfigClient
-    flow: Any  # RequestResponse client for flow-svc
+    make_flow_client: Any  # callable(workspace) -> RequestResponse


 class Initialiser:
@@ -178,13 +178,13 @@ class Processor(AsyncProcessor):
             ),
         )

-    def _make_flow_client(self):
+    def _make_flow_client(self, workspace):
         rr_id = str(uuid.uuid4())
         return RequestResponse(
             backend=self.pubsub_backend,
             subscription=f"{self.id}--flow--{rr_id}",
             consumer_name=self.id,
-            request_topic=flow_request_queue,
+            request_topic=f"{flow_request_queue}:{workspace}",
             request_schema=FlowRequest,
             request_metrics=ProducerMetrics(
                 processor=self.id, flow=None, name="flow-request",
@@ -198,14 +198,8 @@ class Processor(AsyncProcessor):

     async def _open_clients(self):
         config = self._make_config_client()
-        flow = self._make_flow_client()
         await config.start()
-        try:
-            await flow.start()
-        except Exception:
-            await self._safe_stop(config)
-            raise
-        return config, flow
+        return config

     async def _safe_stop(self, client):
         try:
@@ -217,7 +211,14 @@ class Processor(AsyncProcessor):
     # Service gate.
     # ------------------------------------------------------------------

-    async def _gate_ready(self, config, flow):
+    def _gate_workspace(self):
+        for spec in self.specs:
+            ws = getattr(spec.instance, "workspace", None)
+            if ws and not ws.startswith("_"):
+                return ws
+        return None
+
+    async def _gate_ready(self, config):
         try:
             await config.keys(SYSTEM_WORKSPACE, INIT_STATE_TYPE)
         except Exception as e:
@@ -226,11 +227,16 @@ class Processor(AsyncProcessor):
             )
             return False

+        workspace = self._gate_workspace()
+        if workspace is None:
+            return True
+
+        flow = self._make_flow_client(workspace)
         try:
+            await flow.start()
             resp = await flow.request(
                 FlowRequest(
                     operation="list-blueprints",
-                    workspace=SYSTEM_WORKSPACE,
                 ),
                 timeout=5,
             )
@@ -245,6 +251,8 @@ class Processor(AsyncProcessor):
                 f"Gate: flow-svc not ready ({type(e).__name__}: {e})"
             )
             return False
+        finally:
+            await self._safe_stop(flow)

         return True

@@ -271,7 +279,7 @@ class Processor(AsyncProcessor):
     # Per-spec execution.
     # ------------------------------------------------------------------

-    async def _run_spec(self, spec, config, flow):
+    async def _run_spec(self, spec, config):
         """Run a single initialiser spec.

         Returns one of:
@@ -298,7 +306,7 @@ class Processor(AsyncProcessor):
         child_ctx = InitContext(
             logger=child_logger,
             config=config,
-            flow=flow,
+            make_flow_client=self._make_flow_client,
         )

         child_logger.info(
@@ -340,7 +348,7 @@ class Processor(AsyncProcessor):
         sleep_for = STEADY_INTERVAL

         try:
-            config, flow = await self._open_clients()
+            config = await self._open_clients()
         except Exception as e:
             logger.info(
                 f"Failed to open clients "
@@ -358,11 +366,11 @@ class Processor(AsyncProcessor):
         pre_results = {}
         for spec in pre_specs:
             pre_results[spec.name] = await self._run_spec(
-                spec, config, flow,
+                spec, config,
             )

         # Phase 2: gate.
-        gate_ok = await self._gate_ready(config, flow)
+        gate_ok = await self._gate_ready(config)

         # Phase 3: post-service initialisers, if gate passed.
         post_results = {}
@@ -373,7 +381,7 @@ class Processor(AsyncProcessor):
         ]
         for spec in post_specs:
             post_results[spec.name] = await self._run_spec(
-                spec, config, flow,
+                spec, config,
             )

         # Cadence selection.
@@ -388,7 +396,6 @@ class Processor(AsyncProcessor):

         finally:
             await self._safe_stop(config)
-            await self._safe_stop(flow)

         await asyncio.sleep(sleep_for)

@@ -49,53 +49,67 @@ class DefaultFlowStart(Initialiser):

     async def run(self, ctx, old_flag, new_flag):

-        # Check whether the flow already exists. Belt-and-braces
-        # beyond the flag gate: if an operator stops and restarts the
-        # bootstrapper after the flow is already running, we don't
-        # want to blindly try to start it again.
-        list_resp = await ctx.flow.request(
-            FlowRequest(
-                operation="list-flows",
-                workspace=self.workspace,
-            ),
-            timeout=10,
-        )
-        if list_resp.error:
-            raise RuntimeError(
-                f"list-flows failed: "
-                f"{list_resp.error.type}: {list_resp.error.message}"
-            )
-
-        if self.flow_id in (list_resp.flow_ids or []):
-            ctx.logger.info(
-                f"Flow {self.flow_id!r} already running in workspace "
-                f"{self.workspace!r}; nothing to do"
-            )
-            return
-
-        ctx.logger.info(
-            f"Starting flow {self.flow_id!r} "
-            f"(blueprint={self.blueprint!r}) "
-            f"in workspace {self.workspace!r}"
-        )
-
-        resp = await ctx.flow.request(
-            FlowRequest(
-                operation="start-flow",
-                workspace=self.workspace,
-                flow_id=self.flow_id,
-                blueprint_name=self.blueprint,
-                description=self.description,
-                parameters=self.parameters,
-            ),
-            timeout=30,
-        )
-        if resp.error:
-            raise RuntimeError(
-                f"start-flow failed: "
-                f"{resp.error.type}: {resp.error.message}"
-            )
-
-        ctx.logger.info(
-            f"Flow {self.flow_id!r} started"
-        )
+        workspaces = await ctx.config.keys(
+            "__workspaces__", "workspace",
+        )
+        if self.workspace not in workspaces:
+            raise RuntimeError(
+                f"Workspace {self.workspace!r} does not exist yet"
+            )
+
+        flow = ctx.make_flow_client(self.workspace)
+        await flow.start()
+
+        try:
+
+            # Check whether the flow already exists. Belt-and-braces
+            # beyond the flag gate: if an operator stops and restarts the
+            # bootstrapper after the flow is already running, we don't
+            # want to blindly try to start it again.
+            list_resp = await flow.request(
+                FlowRequest(
+                    operation="list-flows",
+                ),
+                timeout=10,
+            )
+            if list_resp.error:
+                raise RuntimeError(
+                    f"list-flows failed: "
+                    f"{list_resp.error.type}: {list_resp.error.message}"
+                )
+
+            if self.flow_id in (list_resp.flow_ids or []):
+                ctx.logger.info(
+                    f"Flow {self.flow_id!r} already running in workspace "
+                    f"{self.workspace!r}; nothing to do"
+                )
+                return
+
+            ctx.logger.info(
+                f"Starting flow {self.flow_id!r} "
+                f"(blueprint={self.blueprint!r}) "
+                f"in workspace {self.workspace!r}"
+            )
+
+            resp = await flow.request(
+                FlowRequest(
+                    operation="start-flow",
+                    flow_id=self.flow_id,
+                    blueprint_name=self.blueprint,
+                    description=self.description,
+                    parameters=self.parameters,
+                ),
+                timeout=30,
+            )
+            if resp.error:
+                raise RuntimeError(
+                    f"start-flow failed: "
+                    f"{resp.error.type}: {resp.error.message}"
+                )
+
+            ctx.logger.info(
+                f"Flow {self.flow_id!r} started"
+            )
+
+        finally:
+            await flow.stop()

@@ -2,13 +2,17 @@
 import logging

 from trustgraph.schema import ConfigResponse
-from trustgraph.schema import ConfigValue, Error
+from trustgraph.schema import ConfigValue, WorkspaceChanges, Error

 from ... tables.config import ConfigTableStore

 # Module logger
 logger = logging.getLogger(__name__)

+WORKSPACES_NAMESPACE = "__workspaces__"
+WORKSPACE_TYPE = "workspace"
+TEMPLATE_WORKSPACE = "__template__"
+
 class Configuration:

     def __init__(self, push, host, username, password, keyspace):
@@ -26,9 +30,7 @@ class Configuration:
     async def get_version(self):
         return await self.table_store.get_version()

-    async def handle_get(self, v):
-
-        workspace = v.workspace
+    async def handle_get(self, v, workspace):

         values = [
             ConfigValue(
@@ -46,18 +48,18 @@ class Configuration:
             values = values,
         )

-    async def handle_list(self, v):
+    async def handle_list(self, v, workspace):

         return ConfigResponse(
             version = await self.get_version(),
             directory = await self.table_store.get_keys(
-                v.workspace, v.type
+                workspace, v.type
             ),
         )

-    async def handle_getvalues(self, v):
+    async def handle_getvalues(self, v, workspace):

-        vals = await self.table_store.get_values(v.workspace, v.type)
+        vals = await self.table_store.get_values(workspace, v.type)

         values = map(
             lambda x: ConfigValue(
@@ -93,9 +95,8 @@ class Configuration:
             values = values,
         )

-    async def handle_delete(self, v):
+    async def handle_delete(self, v, workspace):

-        workspace = v.workspace
         types = list(set(k.type for k in v.keys))

         for k in v.keys:
@@ -103,14 +104,22 @@ class Configuration:

         await self.inc_version()

-        await self.push(changes={t: [workspace] for t in types})
+        workspace_changes = None
+        if workspace == WORKSPACES_NAMESPACE and WORKSPACE_TYPE in types:
+            deleted = [k.key for k in v.keys if k.type == WORKSPACE_TYPE]
+            if deleted:
+                workspace_changes = WorkspaceChanges(deleted=deleted)
+
+        await self.push(
+            changes={t: [workspace] for t in types},
+            workspace_changes=workspace_changes,
+        )

         return ConfigResponse(
         )

-    async def handle_put(self, v):
+    async def handle_put(self, v, workspace):

-        workspace = v.workspace
         types = list(set(k.type for k in v.values))

         for k in v.values:
@@ -120,11 +129,49 @@ class Configuration:

         await self.inc_version()

-        await self.push(changes={t: [workspace] for t in types})
+        workspace_changes = None
+        if workspace == WORKSPACES_NAMESPACE and WORKSPACE_TYPE in types:
+            created = [k.key for k in v.values if k.type == WORKSPACE_TYPE]
+            if created:
+                workspace_changes = WorkspaceChanges(created=created)
+
+        await self.push(
+            changes={t: [workspace] for t in types},
+            workspace_changes=workspace_changes,
+        )

         return ConfigResponse(
         )

+    async def provision_from_template(self, workspace):
+        """Copy all config from __template__ into a new workspace,
+        skipping keys that already exist (upsert-missing)."""
+
+        template = await self.get_config(TEMPLATE_WORKSPACE)
+
+        if not template:
+            logger.info(
+                f"No template config to provision for {workspace}"
+            )
+            return 0
+
+        existing_types = await self.get_config(workspace)
+
+        written = 0
+        for type_name, entries in template.items():
+            existing_keys = set(existing_types.get(type_name, {}).keys())
+            for key, value in entries.items():
+                if key not in existing_keys:
+                    await self.table_store.put_config(
+                        workspace, type_name, key, value
+                    )
+                    written += 1
+
+        if written > 0:
+            await self.inc_version()
+
+        return written
+
     async def get_config(self, workspace):

         table = await self.table_store.get_all_for_workspace(workspace)
@@ -138,62 +185,87 @@ class Configuration:

         return config

-    async def handle_config(self, v):
+    async def handle_config(self, v, workspace):

-        config = await self.get_config(v.workspace)
+        config = await self.get_config(workspace)

         return ConfigResponse(
             version = await self.get_version(),
             config = config,
         )

-    async def handle(self, msg):
+    async def handle_workspace(self, msg, workspace):
+        """Handle workspace-scoped config operations.
+        Workspace is provided by queue infrastructure."""

         logger.debug(
-            f"Handling config message: {msg.operation} "
-            f"workspace={msg.workspace}"
+            f"Handling workspace config message: {msg.operation} "
+            f"workspace={workspace}"
         )

-        # getvalues-all-ws spans all workspaces, so no workspace
-        # required; everything else is workspace-scoped.
-        if msg.operation != "getvalues-all-ws" and not msg.workspace:
-            return ConfigResponse(
-                error=Error(
-                    type = "bad-request",
-                    message = "Workspace is required"
-                )
-            )
-
         if msg.operation == "get":

-            resp = await self.handle_get(msg)
+            resp = await self.handle_get(msg, workspace)

         elif msg.operation == "list":

-            resp = await self.handle_list(msg)
+            resp = await self.handle_list(msg, workspace)

         elif msg.operation == "getvalues":

-            resp = await self.handle_getvalues(msg)
-
-        elif msg.operation == "getvalues-all-ws":
-
-            resp = await self.handle_getvalues_all_ws(msg)
+            resp = await self.handle_getvalues(msg, workspace)

         elif msg.operation == "delete":

-            resp = await self.handle_delete(msg)
+            resp = await self.handle_delete(msg, workspace)

         elif msg.operation == "put":

-            resp = await self.handle_put(msg)
+            resp = await self.handle_put(msg, workspace)

         elif msg.operation == "config":

-            resp = await self.handle_config(msg)
+            resp = await self.handle_config(msg, workspace)

         else:
             resp = ConfigResponse(
                 error=Error(
                     type = "bad-operation",
                     message = "Bad operation"
                 )
             )

         return resp

+    async def handle_system(self, msg):
+        """Handle system-level config operations.
+        Workspace, when needed, comes from message body."""
+
+        logger.debug(
+            f"Handling system config message: {msg.operation} "
+            f"workspace={msg.workspace}"
+        )
+
+        if msg.operation == "getvalues-all-ws":
+            resp = await self.handle_getvalues_all_ws(msg)
+
+        elif msg.operation in ("get", "list", "getvalues", "delete",
+                               "put", "config"):
+
+            if not msg.workspace:
+                return ConfigResponse(
+                    error=Error(
+                        type = "bad-request",
+                        message = "Workspace is required"
+                    )
+                )
+
+            handler = {
+                "get": self.handle_get,
+                "list": self.handle_list,
+                "getvalues": self.handle_getvalues,
+                "delete": self.handle_delete,
+                "put": self.handle_put,
+                "config": self.handle_config,
+            }[msg.operation]
+
+            resp = await handler(msg, msg.workspace)
+
+        else:
+
+            resp = ConfigResponse(
+                error=Error(
+                    type = "bad-operation",

@@ -1,20 +1,30 @@
 """
-Config service. Manages system global configuration state
+Config service. Manages system global configuration state.
+
+Operates a dual-queue regime:
+- System queue (config-request): handles cross-workspace operations like
+  getvalues-all-ws and bootstrapper put/delete on __workspaces__.
+  The gateway NEVER routes to this queue.
+- Per-workspace queues (config-request:<workspace>): handles
+  workspace-scoped operations where workspace identity comes from
+  queue infrastructure, not message body.
 """

 import logging
+from functools import partial

 from trustgraph.schema import Error
 from trustgraph.schema import ConfigRequest, ConfigResponse, ConfigPush
+from trustgraph.schema import WorkspaceChanges
 from trustgraph.schema import config_request_queue, config_response_queue
 from trustgraph.schema import config_push_queue

 from trustgraph.base import AsyncProcessor, Consumer, Producer
 from trustgraph.base.cassandra_config import add_cassandra_args, resolve_cassandra_config

-from . config import Configuration
+from . config import Configuration, WORKSPACES_NAMESPACE, WORKSPACE_TYPE

 from ... base import ProcessorMetrics, ConsumerMetrics, ProducerMetrics
 from ... base import Consumer, Producer

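A sketch of how the two queue classes in the docstring above are addressed (editorial illustration, not part of the diff; the send_request helper is hypothetical, and the queue-name convention matches workspace_queue() below):

# Tenant-scoped operation: workspace identity comes from the queue name,
# so no workspace field travels in the message body.
await send_request(
    topic=f"config-request:{workspace}",    # per-workspace queue
    msg=ConfigRequest(operation="getvalues", type="flow"),
)

# Cross-workspace operation: only trusted internal callers (never the
# gateway) address the base system queue.
await send_request(
    topic="config-request",                 # system queue
    msg=ConfigRequest(operation="getvalues-all-ws", type="flow"),
)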
@@ -39,6 +49,11 @@ def is_reserved_workspace(workspace):
     """
     return workspace.startswith("_")

+
+def workspace_queue(base_queue, workspace):
+    return f"{base_queue}:{workspace}"
+
+
 default_config_request_queue = config_request_queue
 default_config_response_queue = config_response_queue
 default_config_push_queue = config_push_queue
@@ -48,7 +63,7 @@ default_cassandra_host = "cassandra"
 class Processor(AsyncProcessor):

     def __init__(self, **params):

         config_request_queue = params.get(
             "config_request_queue", default_config_request_queue
         )
@@ -70,7 +85,7 @@ class Processor(AsyncProcessor):
             password=cassandra_password,
             default_keyspace="config"
         )

         # Store resolved configuration
         self.cassandra_host = hosts
         self.cassandra_username = username
@@ -99,17 +114,17 @@ class Processor(AsyncProcessor):
             processor = self.id, flow = None, name = "config-push"
         )

-        self.config_request_topic = config_request_queue
+        self.config_request_queue_base = config_request_queue
         self.config_request_subscriber = id

-        self.config_request_consumer = Consumer(
+        self.system_consumer = Consumer(
             taskgroup = self.taskgroup,
             backend = self.pubsub,
             flow = None,
             topic = config_request_queue,
             subscriber = id,
             schema = ConfigRequest,
-            handler = self.on_config_request,
+            handler = self.on_system_config_request,
             metrics = config_request_metrics,
         )

@@ -135,20 +150,120 @@ class Processor(AsyncProcessor):
             push = self.push
         )

+        self.workspace_consumers = {}
+
+        self.register_workspace_handler(self._handle_workspace_changes)
+
         logger.info("Config service initialized")

+    async def _discover_workspaces(self):
+        logger.info("Discovering workspaces from Cassandra...")
+        try:
+            workspaces = await self.config.table_store.get_keys(
+                WORKSPACES_NAMESPACE, WORKSPACE_TYPE
+            )
+            logger.info(f"Discovered workspaces: {workspaces}")
+        except Exception as e:
+            logger.error(
+                f"Workspace discovery failed: {e}", exc_info=True
+            )
+            return
+
+        for workspace_id in workspaces:
+            if workspace_id not in self.workspace_consumers:
+                await self._add_workspace_consumer(workspace_id)
+
+    async def _handle_workspace_changes(self, workspace_changes):
+        for workspace_id in workspace_changes.created:
+            if workspace_id not in self.workspace_consumers:
+                logger.info(f"Workspace created: {workspace_id}")
+                await self._add_workspace_consumer(workspace_id)
+                await self._provision_workspace(workspace_id)
+
+        for workspace_id in workspace_changes.deleted:
+            if workspace_id in self.workspace_consumers:
+                logger.info(f"Workspace deleted: {workspace_id}")
+                await self._remove_workspace_consumer(workspace_id)
+
+    async def _provision_workspace(self, workspace_id):
+        try:
+            written = await self.config.provision_from_template(
+                workspace_id
+            )
+            if written > 0:
+                logger.info(
+                    f"Provisioned workspace {workspace_id} with "
+                    f"{written} entries from template"
+                )
+                # Notify other services about the new config
+                types = {}
+                template = await self.config.get_config(workspace_id)
+                for t in template:
+                    types[t] = [workspace_id]
+                await self.push(changes=types)
+        except Exception as e:
+            logger.error(
+                f"Failed to provision workspace {workspace_id}: {e}",
+                exc_info=True,
+            )
+
+    async def _add_workspace_consumer(self, workspace_id):
+        queue = workspace_queue(
+            self.config_request_queue_base, workspace_id,
+        )
+
+        await self.pubsub.ensure_topic(queue)
+
+        consumer = Consumer(
+            taskgroup=self.taskgroup,
+            backend=self.pubsub,
+            flow=None,
+            topic=queue,
+            subscriber=self.id,
+            schema=ConfigRequest,
+            handler=partial(
+                self.on_workspace_config_request,
+                workspace=workspace_id,
+            ),
+            metrics=ConsumerMetrics(
+                processor=self.id, flow=None,
+                name=f"config-request-{workspace_id}",
+            ),
+        )
+
+        await consumer.start()
+        self.workspace_consumers[workspace_id] = consumer
+
+        logger.info(
+            f"Subscribed to workspace config queue: {workspace_id}"
+        )
+
+    async def _remove_workspace_consumer(self, workspace_id):
+        consumer = self.workspace_consumers.pop(workspace_id, None)
+        if consumer:
+            await consumer.stop()
+            logger.info(
+                f"Unsubscribed from workspace config queue: {workspace_id}"
+            )
+
     async def start(self):

-        await self.pubsub.ensure_topic(self.config_request_topic)
+        await self.pubsub.ensure_topic(self.config_request_queue_base)
         await self.push()  # Startup poke: empty types = everything
-        await self.config_request_consumer.start()
+        await self.system_consumer.start()
+
+        # Start the config push subscriber so we receive our own
+        # workspace change notifications.
+        await self.config_sub_task.start()
+
+        await self._discover_workspaces()

-    async def push(self, changes=None):
+    async def push(self, changes=None, workspace_changes=None):

         # Suppress notifications from reserved workspaces (ids starting
-        # with "_", e.g. "__template__"). Stored config is preserved;
-        # only the broadcast is filtered. Keeps services oblivious to
-        # template / bootstrap state.
+        # with "_", e.g. "__template__") for regular config changes.
+        # The __workspaces__ namespace is handled separately via
+        # workspace_changes.
         if changes:
             filtered = {}
             for type_name, workspaces in changes.items():
@@ -165,16 +280,20 @@ class Processor(AsyncProcessor):
         resp = ConfigPush(
             version = version,
             changes = changes or {},
+            workspace_changes = workspace_changes,
         )

         await self.config_push_producer.send(resp)

         logger.info(
             f"Pushed config poke version {version}, "
-            f"changes={resp.changes}"
+            f"changes={resp.changes}, "
+            f"workspace_changes={resp.workspace_changes}"
         )

-    async def on_config_request(self, msg, consumer, flow):
+    async def on_workspace_config_request(
+        self, msg, consumer, flow, *, workspace
+    ):

         try:

@@ -183,16 +302,49 @@ class Processor(AsyncProcessor):
             # Sender-produced ID
             id = msg.properties()["id"]

-            logger.debug(f"Handling config request {id}...")
+            logger.debug(
+                f"Handling workspace config request {id} "
+                f"workspace={workspace}..."
+            )

-            resp = await self.config.handle(v)
+            resp = await self.config.handle_workspace(v, workspace)

             await self.config_response_producer.send(
                 resp, properties={"id": id}
             )

         except Exception as e:

             resp = ConfigResponse(
                 error=Error(
                     type = "config-error",
+                    message = str(e),
+                ),
+            )
+
+            await self.config_response_producer.send(
+                resp, properties={"id": id}
+            )
+
+    async def on_system_config_request(self, msg, consumer, flow):
+
+        try:
+
+            v = msg.value()
+
+            # Sender-produced ID
+            id = msg.properties()["id"]
+
+            logger.debug(f"Handling system config request {id}...")
+
+            resp = await self.config.handle_system(v)
+
+            await self.config_response_producer.send(
+                resp, properties={"id": id}
+            )
+
+        except Exception as e:
+
+            resp = ConfigResponse(
+                error=Error(
+                    type = "config-error",
@@ -228,4 +380,3 @@ class Processor(AsyncProcessor):
 def run():

     Processor.launch(default_ident, __doc__)
-

@@ -28,12 +28,12 @@ class KnowledgeManager:
         self.background_task = None
         self.flow_config = flow_config

-    async def delete_kg_core(self, request, respond):
+    async def delete_kg_core(self, request, respond, workspace):

         logger.info("Deleting knowledge core...")

         await self.table_store.delete_kg_core(
-            request.workspace, request.id
+            workspace, request.id
         )

         await respond(
@@ -46,7 +46,7 @@ class KnowledgeManager:
             )
         )

-    async def get_kg_core(self, request, respond):
+    async def get_kg_core(self, request, respond, workspace):

         logger.info("Getting knowledge core...")

@@ -61,9 +61,8 @@ class KnowledgeManager:
             )
         )

-        # Remove doc table row
         await self.table_store.get_triples(
-            request.workspace,
+            workspace,
             request.id,
             publish_triples,
         )
@@ -79,9 +78,8 @@ class KnowledgeManager:
             )
         )

-        # Remove doc table row
         await self.table_store.get_graph_embeddings(
-            request.workspace,
+            workspace,
             request.id,
             publish_ge,
         )
@@ -98,9 +96,9 @@ class KnowledgeManager:
             )
         )

-    async def list_kg_cores(self, request, respond):
+    async def list_kg_cores(self, request, respond, workspace):

-        ids = await self.table_store.list_kg_cores(request.workspace)
+        ids = await self.table_store.list_kg_cores(workspace)

         await respond(
             KnowledgeResponse(
@@ -112,9 +110,7 @@ class KnowledgeManager:
             )
         )

-    async def put_kg_core(self, request, respond):
-
-        workspace = request.workspace
+    async def put_kg_core(self, request, respond, workspace):

         if request.triples:
             await self.table_store.add_triples(workspace, request.triples)
@@ -134,20 +130,18 @@ class KnowledgeManager:
             )
         )

-    async def load_kg_core(self, request, respond):
+    async def load_kg_core(self, request, respond, workspace):

         if self.background_task is None:
             self.background_task = asyncio.create_task(
                 self.core_loader()
             )
             # Wait for it to start (yuck)
             # await asyncio.sleep(0.5)

-        await self.loader_queue.put((request, respond))
+        await self.loader_queue.put((request, respond, workspace))

         # Not sending a response, the loader thread can do that

-    async def unload_kg_core(self, request, respond):
+    async def unload_kg_core(self, request, respond, workspace):

         await respond(
             KnowledgeResponse(
@@ -168,7 +162,7 @@ class KnowledgeManager:
         while True:

             logger.debug("Waiting for next load...")
-            request, respond = await self.loader_queue.get()
+            request, respond, workspace = await self.loader_queue.get()

             logger.info(f"Loading knowledge: {request.id}")

@@ -180,7 +174,6 @@ class KnowledgeManager:
             if request.flow is None:
                 raise RuntimeError("Flow ID must be specified")

-            workspace = request.workspace
             ws_flows = self.flow_config.flows.get(workspace, {})
             if request.flow not in ws_flows:
                 raise RuntimeError(
@@ -262,9 +255,8 @@ class KnowledgeManager:

             logger.debug("Publishing triples...")

-            # Remove doc table row
             await self.table_store.get_triples(
-                request.workspace,
+                workspace,
                 request.id,
                 publish_triples,
             )
@@ -277,9 +269,8 @@ class KnowledgeManager:

             logger.debug("Publishing graph embeddings...")

-            # Remove doc table row
             await self.table_store.get_graph_embeddings(
-                request.workspace,
+                workspace,
                 request.id,
                 publish_ge,
             )

@@ -9,7 +9,7 @@ import base64
 import json
 import logging

-from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber
+from .. base import WorkspaceProcessor, Consumer, Producer, Publisher, Subscriber
 from .. base import ConsumerMetrics, ProducerMetrics
 from .. base.cassandra_config import add_cassandra_args, resolve_cassandra_config

@@ -33,13 +33,18 @@ default_knowledge_response_queue = knowledge_response_queue

 default_cassandra_host = "cassandra"

-class Processor(AsyncProcessor):
+def workspace_queue(base_queue, workspace):
+    return f"{base_queue}:{workspace}"
+
+
+class Processor(WorkspaceProcessor):

     def __init__(self, **params):

         id = params.get("id")

-        knowledge_request_queue = params.get(
+        self.knowledge_request_queue_base = params.get(
             "knowledge_request_queue", default_knowledge_request_queue
         )

@@ -51,7 +56,6 @@ class Processor(AsyncProcessor):
         cassandra_username = params.get("cassandra_username")
         cassandra_password = params.get("cassandra_password")

-        # Resolve configuration with environment variable fallback
         hosts, username, password, keyspace = resolve_cassandra_config(
             host=cassandra_host,
             username=cassandra_username,
@@ -59,14 +63,13 @@ class Processor(AsyncProcessor):
             default_keyspace="knowledge"
         )

-        # Store resolved configuration
         self.cassandra_host = hosts
         self.cassandra_username = username
         self.cassandra_password = password

         super(Processor, self).__init__(
             **params | {
-                "knowledge_request_queue": knowledge_request_queue,
+                "knowledge_request_queue": self.knowledge_request_queue_base,
                 "knowledge_response_queue": knowledge_response_queue,
                 "cassandra_host": self.cassandra_host,
                 "cassandra_username": self.cassandra_username,
@@ -74,28 +77,10 @@ class Processor(AsyncProcessor):
             }
         )

-        knowledge_request_metrics = ConsumerMetrics(
-            processor = self.id, flow = None, name = "knowledge-request"
-        )
-
         knowledge_response_metrics = ProducerMetrics(
             processor = self.id, flow = None, name = "knowledge-response"
         )

-        self.knowledge_request_topic = knowledge_request_queue
-        self.knowledge_request_subscriber = id
-
-        self.knowledge_request_consumer = Consumer(
-            taskgroup = self.taskgroup,
-            backend = self.pubsub,
-            flow = None,
-            topic = knowledge_request_queue,
-            subscriber = id,
-            schema = KnowledgeRequest,
-            handler = self.on_knowledge_request,
-            metrics = knowledge_request_metrics,
-        )
-
         self.knowledge_response_producer = Producer(
             backend = self.pubsub,
             topic = knowledge_response_queue,
@@ -115,13 +100,52 @@ class Processor(AsyncProcessor):

         self.flows = {}

+        self.workspace_consumers = {}
+
         logger.info("Knowledge service initialized")

+    async def on_workspace_created(self, workspace):
+
+        if workspace in self.workspace_consumers:
+            return
+
+        queue = workspace_queue(
+            self.knowledge_request_queue_base, workspace,
+        )
+
+        await self.pubsub.ensure_topic(queue)
+
+        consumer = Consumer(
+            taskgroup=self.taskgroup,
+            backend=self.pubsub,
+            flow=None,
+            topic=queue,
+            subscriber=self.id,
+            schema=KnowledgeRequest,
+            handler=partial(
+                self.on_knowledge_request, workspace=workspace,
+            ),
+            metrics=ConsumerMetrics(
+                processor=self.id, flow=None,
+                name=f"knowledge-request-{workspace}",
+            ),
+        )
+
+        await consumer.start()
+        self.workspace_consumers[workspace] = consumer
+
+        logger.info(f"Subscribed to workspace queue: {workspace}")
+
+    async def on_workspace_deleted(self, workspace):
+
+        consumer = self.workspace_consumers.pop(workspace, None)
+        if consumer:
+            await consumer.stop()
+            logger.info(f"Unsubscribed from workspace queue: {workspace}")
+
     async def start(self):

-        await self.pubsub.ensure_topic(self.knowledge_request_topic)
         await super(Processor, self).start()
-        await self.knowledge_request_consumer.start()
         await self.knowledge_response_producer.start()

     async def on_knowledge_config(self, workspace, config, version):

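The WorkspaceProcessor base class itself does not appear in the hunks shown. A plausible minimal shape, inferred from the hooks the subclass overrides above and the commit message (startup discovery plus create/delete events); the discover_workspaces helper is an assumption:

# Sketch only -- inferred from subclass usage, not the actual
# trustgraph.base source.
class WorkspaceProcessor(AsyncProcessor):

    async def start(self):
        await super().start()
        # React to workspace create/delete events via the lifecycle
        # hooks AsyncProcessor now exposes...
        self.register_workspace_handler(self._on_workspace_changes)
        # ...and bind consumers for workspaces that already exist.
        for ws in await self.discover_workspaces():   # hypothetical helper
            await self.on_workspace_created(ws)

    async def _on_workspace_changes(self, changes):
        for ws in changes.created:
            await self.on_workspace_created(ws)
        for ws in changes.deleted:
            await self.on_workspace_deleted(ws)

    # Subclasses override these to bind/unbind per-workspace consumers.
    async def on_workspace_created(self, workspace): ...
    async def on_workspace_deleted(self, workspace): ...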
@@ -140,7 +164,7 @@ class Processor(AsyncProcessor):

         logger.debug(f"Flows for {workspace}: {self.flows[workspace]}")

-    async def process_request(self, v, id):
+    async def process_request(self, v, id, workspace):

         if v.operation is None:
             raise RequestError("Null operation")
@@ -163,9 +187,9 @@ class Processor(AsyncProcessor):
             await self.knowledge_response_producer.send(
                 x, { "id": id }
             )
-        return await impls[v.operation](v, respond)
+        return await impls[v.operation](v, respond, workspace)

-    async def on_knowledge_request(self, msg, consumer, flow):
+    async def on_knowledge_request(self, msg, consumer, flow, *, workspace):

         v = msg.value()

@@ -179,7 +203,7 @@ class Processor(AsyncProcessor):

             # We don't send a response back here, the processing
             # implementation sends whatever it needs to send.
-            await self.process_request(v, id)
+            await self.process_request(v, id, workspace)

             return

@@ -215,7 +239,7 @@ class Processor(AsyncProcessor):
     @staticmethod
     def add_args(parser):

-        AsyncProcessor.add_args(parser)
+        WorkspaceProcessor.add_args(parser)

         parser.add_argument(
             '--knowledge-request-queue',

@@ -118,10 +118,10 @@ class FlowConfig:

         return resolved

-    async def handle_list_blueprints(self, msg):
+    async def handle_list_blueprints(self, msg, workspace):

         names = list(await self.config.keys(
-            msg.workspace, "flow-blueprint"
+            workspace, "flow-blueprint"
         ))

         return FlowResponse(
@@ -129,19 +129,19 @@ class FlowConfig:
             blueprint_names = names,
         )

-    async def handle_get_blueprint(self, msg):
+    async def handle_get_blueprint(self, msg, workspace):

         return FlowResponse(
             error = None,
             blueprint_definition = await self.config.get(
-                msg.workspace, "flow-blueprint", msg.blueprint_name
+                workspace, "flow-blueprint", msg.blueprint_name
             ),
         )

-    async def handle_put_blueprint(self, msg):
+    async def handle_put_blueprint(self, msg, workspace):

         await self.config.put(
-            msg.workspace, "flow-blueprint",
+            workspace, "flow-blueprint",
             msg.blueprint_name, msg.blueprint_definition
         )

@@ -149,31 +149,31 @@ class FlowConfig:
             error = None,
         )

-    async def handle_delete_blueprint(self, msg):
+    async def handle_delete_blueprint(self, msg, workspace):

         logger.debug(f"Flow config message: {msg}")

         await self.config.delete(
-            msg.workspace, "flow-blueprint", msg.blueprint_name
+            workspace, "flow-blueprint", msg.blueprint_name
         )

         return FlowResponse(
             error = None,
         )

-    async def handle_list_flows(self, msg):
+    async def handle_list_flows(self, msg, workspace):

-        names = list(await self.config.keys(msg.workspace, "flow"))
+        names = list(await self.config.keys(workspace, "flow"))

         return FlowResponse(
             error = None,
             flow_ids = names,
         )

-    async def handle_get_flow(self, msg):
+    async def handle_get_flow(self, msg, workspace):

         flow_data = await self.config.get(
-            msg.workspace, "flow", msg.flow_id
+            workspace, "flow", msg.flow_id
         )
         flow = json.loads(flow_data)

@@ -184,9 +184,7 @@ class FlowConfig:
             parameters = flow.get("parameters", {}),
         )

-    async def handle_start_flow(self, msg):
-
-        workspace = msg.workspace
+    async def handle_start_flow(self, msg, workspace):

         if msg.blueprint_name is None:
             raise RuntimeError("No blueprint name")
@@ -222,7 +220,7 @@ class FlowConfig:
         logger.debug(f"Resolved parameters (with defaults): {parameters}")

         # Apply parameter substitution to template replacement function.
-        # {workspace} is substituted from msg.workspace to isolate
+        # {workspace} is substituted from workspace to isolate
         # queue names across workspaces.
         def repl_template_with_params(tmp):

@@ -548,9 +546,7 @@ class FlowConfig:
                 f"attempts: {topic}"
             )

-    async def handle_stop_flow(self, msg):
-
-        workspace = msg.workspace
+    async def handle_stop_flow(self, msg, workspace):

         if msg.flow_id is None:
             raise RuntimeError("No flow ID")
@@ -641,37 +637,29 @@ class FlowConfig:
             error = None,
         )

-    async def handle(self, msg):
+    async def handle(self, msg, workspace):

         logger.debug(
             f"Handling flow message: {msg.operation} "
-            f"workspace={msg.workspace}"
+            f"workspace={workspace}"
         )

-        if not msg.workspace:
-            return FlowResponse(
-                error=Error(
-                    type="bad-request",
-                    message="Workspace is required",
-                ),
-            )
-
         if msg.operation == "list-blueprints":
-            resp = await self.handle_list_blueprints(msg)
+            resp = await self.handle_list_blueprints(msg, workspace)
         elif msg.operation == "get-blueprint":
-            resp = await self.handle_get_blueprint(msg)
+            resp = await self.handle_get_blueprint(msg, workspace)
         elif msg.operation == "put-blueprint":
-            resp = await self.handle_put_blueprint(msg)
+            resp = await self.handle_put_blueprint(msg, workspace)
         elif msg.operation == "delete-blueprint":
-            resp = await self.handle_delete_blueprint(msg)
+            resp = await self.handle_delete_blueprint(msg, workspace)
         elif msg.operation == "list-flows":
-            resp = await self.handle_list_flows(msg)
+            resp = await self.handle_list_flows(msg, workspace)
         elif msg.operation == "get-flow":
-            resp = await self.handle_get_flow(msg)
+            resp = await self.handle_get_flow(msg, workspace)
         elif msg.operation == "start-flow":
-            resp = await self.handle_start_flow(msg)
+            resp = await self.handle_start_flow(msg, workspace)
         elif msg.operation == "stop-flow":
-            resp = await self.handle_stop_flow(msg)
+            resp = await self.handle_stop_flow(msg, workspace)
         else:

             resp = FlowResponse(

@@ -4,6 +4,7 @@ Flow service. Manages flow lifecycle — starting and stopping flows
 by coordinating with the config service via pub/sub.
 """

+from functools import partial
 import logging
 import uuid

@@ -14,7 +15,7 @@ from trustgraph.schema import flow_request_queue, flow_response_queue
 from trustgraph.schema import ConfigRequest, ConfigResponse
 from trustgraph.schema import config_request_queue, config_response_queue

-from trustgraph.base import AsyncProcessor, Consumer, Producer
+from trustgraph.base import WorkspaceProcessor, Consumer, Producer
 from trustgraph.base import ConsumerMetrics, ProducerMetrics, SubscriberMetrics
 from trustgraph.base import ConfigClient

@@ -29,11 +30,15 @@ default_flow_request_queue = flow_request_queue
 default_flow_response_queue = flow_response_queue


-class Processor(AsyncProcessor):
+def workspace_queue(base_queue, workspace):
+    return f"{base_queue}:{workspace}"
+
+
+class Processor(WorkspaceProcessor):

     def __init__(self, **params):

-        flow_request_queue = params.get(
+        self.flow_request_queue_base = params.get(
             "flow_request_queue", default_flow_request_queue
         )
         flow_response_queue = params.get(
@@ -49,27 +54,10 @@ class Processor(AsyncProcessor):
             }
         )

-        flow_request_metrics = ConsumerMetrics(
-            processor = self.id, flow = None, name = "flow-request"
-        )
         flow_response_metrics = ProducerMetrics(
             processor = self.id, flow = None, name = "flow-response"
         )

-        self.flow_request_topic = flow_request_queue
-        self.flow_request_subscriber = id
-
-        self.flow_request_consumer = Consumer(
-            taskgroup = self.taskgroup,
-            backend = self.pubsub,
-            flow = None,
-            topic = flow_request_queue,
-            subscriber = id,
-            schema = FlowRequest,
-            handler = self.on_flow_request,
-            metrics = flow_request_metrics,
-        )
-
         self.flow_response_producer = Producer(
             backend = self.pubsub,
             topic = flow_response_queue,
@@ -84,13 +72,6 @@ class Processor(AsyncProcessor):
             processor=self.id, flow=None, name="config-response",
         )

-        # Unique subscription suffix per process instance. Pulsar's
-        # exclusive subscriptions reject a second consumer on the same
-        # (topic, subscription-name) — so a deterministic name here
-        # collides with its own ghost when the supervisor restarts the
-        # process before Pulsar has timed out the previous session
-        # (ConsumerBusy). Matches the uuid convention used elsewhere
-        # (gateway/config/receiver.py, AsyncProcessor._create_config_client).
         config_rr_id = str(uuid.uuid4())
         self.config_client = ConfigClient(
             backend=self.pubsub,
@@ -106,21 +87,58 @@ class Processor(AsyncProcessor):

         self.flow = FlowConfig(self.config_client, self.pubsub)

+        self.workspace_consumers = {}
+
         logger.info("Flow service initialized")

+    async def on_workspace_created(self, workspace):
+
+        if workspace in self.workspace_consumers:
+            return
+
+        queue = workspace_queue(
+            self.flow_request_queue_base, workspace,
+        )
+
+        await self.pubsub.ensure_topic(queue)
+
+        consumer = Consumer(
+            taskgroup=self.taskgroup,
+            backend=self.pubsub,
+            flow=None,
+            topic=queue,
+            subscriber=self.id,
+            schema=FlowRequest,
+            handler=partial(
+                self.on_flow_request, workspace=workspace,
+            ),
+            metrics=ConsumerMetrics(
+                processor=self.id, flow=None,
+                name=f"flow-request-{workspace}",
+            ),
+        )
+
+        await consumer.start()
+        self.workspace_consumers[workspace] = consumer
+
+        logger.info(f"Subscribed to workspace queue: {workspace}")
+
+    async def on_workspace_deleted(self, workspace):
+
+        consumer = self.workspace_consumers.pop(workspace, None)
+        if consumer:
+            await consumer.stop()
+            logger.info(f"Unsubscribed from workspace queue: {workspace}")
+
     async def start(self):

-        await self.pubsub.ensure_topic(self.flow_request_topic)
         await super(Processor, self).start()
         await self.config_client.start()

+        # Discover workspaces with existing flow config and ensure
+        # their topics exist before we start accepting requests.
+        workspaces = await self.config_client.workspaces_for_type("flow")
+        await self.flow.ensure_existing_flow_topics(workspaces)

-        await self.flow_request_consumer.start()
-
-    async def on_flow_request(self, msg, consumer, flow):
+    async def on_flow_request(self, msg, consumer, flow, *, workspace):

         try:

@@ -131,7 +149,7 @@ class Processor(AsyncProcessor):

             logger.debug(f"Handling flow request {id}...")

-            resp = await self.flow.handle(v)
+            resp = await self.flow.handle(v, workspace)

             await self.flow_response_producer.send(
                 resp, properties={"id": id}
@@ -155,7 +173,7 @@ class Processor(AsyncProcessor):
     @staticmethod
     def add_args(parser):

-        AsyncProcessor.add_args(parser)
+        WorkspaceProcessor.add_args(parser)

         parser.add_argument(
             '--flow-request-queue',

@@ -141,6 +141,12 @@ class IamAuth:
         self._authz_cache: dict[str, tuple[bool, float]] = {}
         self._authz_cache_lock = asyncio.Lock()

+        # Known workspaces, maintained by the config receiver.
+        # enforce_workspace checks this set to reject requests for
+        # non-existent workspaces before routing to a queue that
+        # has no consumer.
+        self.known_workspaces: set[str] = set()
+
     # ------------------------------------------------------------------
     # Short-lived client helper. Mirrors the pattern used by the
     # bootstrap framework and AsyncProcessor: a fresh uuid suffix per

@@ -67,12 +67,22 @@ async def enforce(request, auth, capability):
     return identity


+def workspace_not_found():
+    return web.HTTPNotFound(
+        text='{"error":"workspace not found"}',
+        content_type="application/json",
+    )
+
+
 async def enforce_workspace(data, identity, auth, capability=None):
     """Default-fill the workspace on a request body and (optionally)
     authorise the caller for ``capability`` against that workspace.

     - Target workspace = ``data["workspace"]`` if supplied, else the
       caller's bound workspace.
+    - Rejects the request if the resolved workspace is not in
+      ``auth.known_workspaces`` (prevents routing to a queue with
+      no consumer).
     - On success, ``data["workspace"]`` is overwritten with the
       resolved value so downstream code sees a single canonical
       address.
@@ -92,6 +102,9 @@ async def enforce_workspace(data, identity, auth, capability=None):
     target = requested or identity.workspace
     data["workspace"] = target

+    if target not in auth.known_workspaces:
+        raise workspace_not_found()
+
     if capability is not None:
         await auth.authorise(
             identity, capability, {"workspace": target}, {},

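Taken together with enforce(), the intended call order in a gateway handler is roughly the following sketch (editorial illustration; the handler shape and the "librarian" capability name are assumptions, not taken from this diff):

async def handle(request, auth):
    identity = await enforce(request, auth, "librarian")
    data = await request.json()

    # Default-fills data["workspace"], raises a 404 via
    # workspace_not_found() for unknown workspaces (so nothing is ever
    # routed to a queue with no consumer), then authorises the caller
    # against the resolved workspace.
    await enforce_workspace(data, identity, auth, capability="librarian")

    # Safe to dispatch to f"{base_queue}:{data['workspace']}".
    ...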
@@ -24,9 +24,10 @@ logger.setLevel(logging.INFO)

 class ConfigReceiver:

-    def __init__(self, backend):
+    def __init__(self, backend, auth=None):

         self.backend = backend
+        self.auth = auth

         self.flow_handlers = []

@@ -54,6 +55,15 @@ class ConfigReceiver:
             )
             return

+        # Track workspace lifecycle
+        if v.workspace_changes and self.auth:
+            for ws in (v.workspace_changes.created or []):
+                self.auth.known_workspaces.add(ws)
+                logger.info(f"Workspace registered: {ws}")
+            for ws in (v.workspace_changes.deleted or []):
+                self.auth.known_workspaces.discard(ws)
+                logger.info(f"Workspace deregistered: {ws}")
+
         # Gateway cares about flow config — check if any flow
         # types changed in any workspace
         flow_workspaces = changes.get("flow", [])
@@ -195,6 +205,33 @@ class ConfigReceiver:
         try:
             await client.start()

+            # Discover all known workspaces
+            ws_resp = await client.request(
+                ConfigRequest(
+                    operation="getvalues",
+                    workspace="__workspaces__",
+                    type="workspace",
+                ),
+                timeout=10,
+            )
+
+            if ws_resp.error:
+                raise RuntimeError(
+                    f"Workspace discovery error: "
+                    f"{ws_resp.error.message}"
+                )
+
+            discovered = {
+                v.key for v in ws_resp.values if v.key
+            }
+
+            if self.auth:
+                self.auth.known_workspaces = discovered
+
+            logger.info(
+                f"Known workspaces: {discovered}"
+            )
+
             # Discover workspaces that have any flow config
             resp = await client.request(
                 ConfigRequest(

@@ -7,6 +7,12 @@ import logging
 # Module logger
 logger = logging.getLogger(__name__)

+from ... schema import flow_request_queue
+from ... schema import librarian_request_queue
+from ... schema import knowledge_request_queue
+from ... schema import collection_request_queue
+from ... schema import config_request_queue
+
 from . config import ConfigRequestor
 from . flow import FlowRequestor
 from . iam import IamRequestor
@@ -70,15 +76,28 @@ request_response_dispatchers = {
     "sparql": SparqlQueryRequestor,
 }

-global_dispatchers = {
+system_dispatchers = {
+    "iam": IamRequestor,
+}
+
+workspace_dispatchers = {
     "config": ConfigRequestor,
     "flow": FlowRequestor,
-    "iam": IamRequestor,
     "librarian": LibrarianRequestor,
     "knowledge": KnowledgeRequestor,
     "collection-management": CollectionManagementRequestor,
 }

+workspace_default_request_queues = {
+    "config": config_request_queue,
+    "flow": flow_request_queue,
+    "librarian": librarian_request_queue,
+    "knowledge": knowledge_request_queue,
+    "collection-management": collection_request_queue,
+}
+
+global_dispatchers = {**system_dispatchers, **workspace_dispatchers}
+
 sender_dispatchers = {
     "text-load": TextLoad,
     "document-load": DocumentLoad,
@@ -219,11 +238,24 @@ class DispatcherManager:
     async def process_global_service(self, data, responder, params):

         kind = params.get("kind")
-        return await self.invoke_global_service(data, responder, kind)
+        workspace = params.get("workspace")
+        if not workspace and isinstance(data, dict):
+            workspace = data.get("workspace")
+        return await self.invoke_global_service(
+            data, responder, kind, workspace=workspace,
+        )

-    async def invoke_global_service(self, data, responder, kind):
+    async def invoke_global_service(self, data, responder, kind,
+                                    workspace=None):

-        key = (None, kind)
+        if kind in workspace_dispatchers:
+            if not workspace:
+                raise RuntimeError(
+                    f"Workspace is required for {kind}"
+                )
+            key = (workspace, kind)
+        else:
+            key = (None, kind)

         if key not in self.dispatchers:
             async with self.dispatcher_lock:
@@ -234,11 +266,21 @@ class DispatcherManager:
                 request_queue = self.queue_overrides[kind].get("request")
                 response_queue = self.queue_overrides[kind].get("response")

+            if kind in workspace_dispatchers and workspace:
+                base_queue = (
+                    request_queue
+                    or workspace_default_request_queues[kind]
+                )
+                request_queue = f"{base_queue}:{workspace}"
+                consumer_name = f"{self.prefix}-{kind}-{workspace}"
+            else:
+                consumer_name = f"{self.prefix}-{kind}-request"
+
             dispatcher = global_dispatchers[kind](
                 backend = self.backend,
                 timeout = 120,
-                consumer = f"{self.prefix}-{kind}-request",
-                subscriber = f"{self.prefix}-{kind}-request",
+                consumer = consumer_name,
+                subscriber = consumer_name,
                 request_queue = request_queue,
                 response_queue = response_queue,
             )

@@ -190,6 +190,16 @@ class Mux:
             await self.auth.authorise(
                 self.identity, op.capability, resource, parameters,
             )
+        except _web.HTTPNotFound:
+            await self.ws.send_json({
+                "id": request_id,
+                "error": {
+                    "message": "workspace not found",
+                    "type": "workspace-not-found",
+                },
+                "complete": True,
+            })
+            return
         except _web.HTTPForbidden:
             await self.ws.send_json({
                 "id": request_id,
@@ -310,7 +320,7 @@ class Mux:
             else:

                 await self.dispatcher_manager.invoke_global_service(
-                    request, responder, svc
+                    request, responder, svc, workspace=workspace,
                 )

         except Exception as e:

@@ -116,9 +116,6 @@ def serialize_document_metadata(message):
     if message.metadata:
         ret["metadata"] = serialize_subgraph(message.metadata)

-    if message.workspace:
-        ret["workspace"] = message.workspace
-
     if message.tags is not None:
         ret["tags"] = message.tags

@@ -140,9 +137,6 @@ def serialize_processing_metadata(message):
     if message.flow:
         ret["flow"] = message.flow

-    if message.workspace:
-        ret["workspace"] = message.workspace
-
     if message.collection:
         ret["collection"] = message.collection

@@ -160,7 +154,6 @@ def to_document_metadata(x):
         title = x.get("title", None),
         comments = x.get("comments", None),
         metadata = to_subgraph(x["metadata"]),
-        workspace = x.get("workspace", None),
         tags = x.get("tags", None),
     )

@@ -171,7 +164,6 @@ def to_processing_metadata(x):
         document_id = x.get("document-id", None),
         time = x.get("time", None),
         flow = x.get("flow", None),
-        workspace = x.get("workspace", None),
         collection = x.get("collection", None),
         tags = x.get("tags", None),
     )

@@ -12,8 +12,8 @@ from . auth_endpoints import AuthEndpoints
 from . iam_endpoint import IamEndpoint
 from . registry_endpoint import RegistryRoutedVariableEndpoint

-from .. capabilities import PUBLIC, AUTHENTICATED, auth_failure
-from .. registry import lookup as _registry_lookup, RequestContext
+from .. capabilities import PUBLIC, AUTHENTICATED, auth_failure, workspace_not_found
+from .. registry import lookup as _registry_lookup, RequestContext, ResourceLevel

 from .. dispatch.manager import DispatcherManager

@@ -77,6 +77,10 @@ class _RoutedVariableEndpoint:
             identity, op.capability, resource, parameters,
         )

+        ws = resource.get("workspace", "")
+        if ws and ws not in self.auth.known_workspaces:
+            raise workspace_not_found()
+
         async def responder(x, fin):
             pass

@@ -140,6 +144,11 @@ class _RoutedSocketEndpoint:
             await self.auth.authorise(
                 identity, op.capability, resource, parameters,
             )
+
+            ws = resource.get("workspace", "")
+            if ws and ws not in self.auth.known_workspaces:
+                raise workspace_not_found()
+
         except web.HTTPException as e:
             return e

@@ -20,9 +20,9 @@ import logging
 from aiohttp import web
 
 from .. capabilities import (
-    PUBLIC, AUTHENTICATED, auth_failure,
+    PUBLIC, AUTHENTICATED, auth_failure, workspace_not_found,
 )
-from .. registry import lookup, RequestContext
+from .. registry import lookup, RequestContext, ResourceLevel
 
 logger = logging.getLogger("registry-endpoint")
 logger.setLevel(logging.INFO)

@@ -107,6 +107,15 @@ class RegistryRoutedVariableEndpoint:
         if "workspace" in resource:
             body["workspace"] = resource["workspace"]
 
+        if (
+            op.resource_level in (
+                ResourceLevel.WORKSPACE, ResourceLevel.FLOW,
+            )
+            and resource.get("workspace")
+            not in self.auth.known_workspaces
+        ):
+            raise workspace_not_found()
+
         async def responder(x, fin):
             pass
 
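All three endpoint classes apply the same guard: requests targeting a workspace the gateway has not seen are rejected before anything is dispatched to a queue, so a message can never sit on a queue with no consumer. A condensed sketch of the check, assuming known_workspaces is a set the auth component keeps current from config updates:

    # Hedged sketch of the gateway-side guard, not the exact class code.
    class WorkspaceGuard:
        def __init__(self, known_workspaces: set[str]):
            self.known_workspaces = known_workspaces

        def check(self, resource: dict) -> None:
            ws = resource.get("workspace", "")
            if ws and ws not in self.known_workspaces:
                # Mirrors raise workspace_not_found() in the hunks above.
                raise LookupError(f"workspace not found: {ws}")

    guard = WorkspaceGuard({"default", "acme"})
    guard.check({"workspace": "acme"})        # passes
    try:
        guard.check({"workspace": "ghost"})   # rejected before queue dispatch
    except LookupError:
        pass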
@@ -68,7 +68,7 @@ class Api:
             id=config.get("id", "api-gateway"),
         )
 
-        self.config_receiver = ConfigReceiver(self.pubsub_backend)
+        self.config_receiver = ConfigReceiver(self.pubsub_backend, auth=self.auth)
 
         # Build queue overrides dictionary from CLI arguments
         queue_overrides = {}
@@ -245,7 +245,8 @@ def _sign_jwt(kid, private_pem, claims):
 class IamService:
 
     def __init__(self, host, username, password, keyspace,
-                 bootstrap_mode, bootstrap_token=None):
+                 bootstrap_mode, bootstrap_token=None,
+                 on_workspace_created=None, on_workspace_deleted=None):
         self.table_store = IamTableStore(
             host, username, password, keyspace,
         )

@@ -267,6 +268,12 @@ class IamService:
         self.bootstrap_mode = bootstrap_mode
         self.bootstrap_token = bootstrap_token
 
+        # Callbacks for workspace lifecycle events. Called after the
+        # workspace is created/deleted in IAM's own store so that the
+        # processor can announce it via the config service.
+        self._on_workspace_created = on_workspace_created
+        self._on_workspace_deleted = on_workspace_deleted
+
         self._signing_key = None
         self._signing_key_lock = asyncio.Lock()
 

@@ -424,6 +431,9 @@ class IamService:
             created=now,
         )
 
+        if self._on_workspace_created:
+            await self._on_workspace_created(DEFAULT_WORKSPACE)
+
         admin_user_id = str(uuid.uuid4())
         admin_password = secrets.token_urlsafe(32)
         await self.table_store.put_user(

@@ -904,6 +914,10 @@ class IamService:
             enabled=v.workspace_record.enabled,
             created=now,
         )
 
+        if self._on_workspace_created:
+            await self._on_workspace_created(v.workspace_record.id)
+
         row = await self.table_store.get_workspace(v.workspace_record.id)
         return IamResponse(workspace=self._row_to_workspace_record(row))
 

@@ -982,6 +996,9 @@ class IamService:
         for kr in key_rows:
             await self.table_store.delete_api_key(kr[0])
 
+        if self._on_workspace_deleted:
+            await self._on_workspace_deleted(v.workspace_record.id)
+
         return IamResponse()
 
     # ------------------------------------------------------------------
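The callbacks are plain awaitables handed in by the owning processor and only fire after IAM's own store has been updated, so announcers never see a workspace that does not exist. A minimal sketch of the contract, with stand-in names and the store write elided:

    import asyncio

    # Sketch of the lifecycle-callback contract IamService now exposes.
    class LifecycleEmitter:
        def __init__(self, on_created=None, on_deleted=None):
            self._on_created = on_created
            self._on_deleted = on_deleted

        async def create_workspace(self, workspace_id: str):
            # ... write the workspace row to the IAM table store first ...
            if self._on_created:
                await self._on_created(workspace_id)

    async def main():
        async def announce(ws):
            print(f"announce created: {ws}")
        await LifecycleEmitter(on_created=announce).create_workspace("acme")

    asyncio.run(main())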
@@ -12,9 +12,13 @@ import os
 from trustgraph.schema import Error
 from trustgraph.schema import IamRequest, IamResponse
 from trustgraph.schema import iam_request_queue, iam_response_queue
+from trustgraph.schema import ConfigRequest, ConfigResponse, ConfigValue
+from trustgraph.schema import config_request_queue, config_response_queue
 
 from trustgraph.base import AsyncProcessor, Consumer, Producer
 from trustgraph.base import ConsumerMetrics, ProducerMetrics
+from trustgraph.base.metrics import SubscriberMetrics
+from trustgraph.base.request_response_spec import RequestResponse
 from trustgraph.base.cassandra_config import (
     add_cassandra_args, resolve_cassandra_config,
 )

@@ -147,6 +151,8 @@ class Processor(AsyncProcessor):
             keyspace=keyspace,
             bootstrap_mode=self.bootstrap_mode,
             bootstrap_token=self.bootstrap_token,
+            on_workspace_created=self._announce_workspace_created,
+            on_workspace_deleted=self._announce_workspace_deleted,
         )
 
         logger.info(

@@ -160,6 +166,87 @@ class Processor(AsyncProcessor):
         await self.iam.auto_bootstrap_if_token_mode()
         await self.iam_request_consumer.start()
 
+    def _create_config_client(self):
+        import uuid
+        config_rr_id = str(uuid.uuid4())
+        config_req_metrics = ProducerMetrics(
+            processor=self.id, flow=None, name="config-request",
+        )
+        config_resp_metrics = SubscriberMetrics(
+            processor=self.id, flow=None, name="config-response",
+        )
+        return RequestResponse(
+            backend=self.pubsub,
+            subscription=f"{self.id}--config--{config_rr_id}",
+            consumer_name=self.id,
+            request_topic=config_request_queue,
+            request_schema=ConfigRequest,
+            request_metrics=config_req_metrics,
+            response_topic=config_response_queue,
+            response_schema=ConfigResponse,
+            response_metrics=config_resp_metrics,
+        )
+
+    async def _config_put(self, workspace, type, key, value):
+        client = self._create_config_client()
+        try:
+            await client.start()
+            await client.request(
+                ConfigRequest(
+                    operation="put",
+                    workspace=workspace,
+                    values=[ConfigValue(type=type, key=key, value=value)],
+                ),
+                timeout=10,
+            )
+        finally:
+            await client.stop()
+
+    async def _config_delete(self, workspace, type, key):
+        from trustgraph.schema import ConfigKey
+        client = self._create_config_client()
+        try:
+            await client.start()
+            await client.request(
+                ConfigRequest(
+                    operation="delete",
+                    workspace=workspace,
+                    keys=[ConfigKey(type=type, key=key)],
+                ),
+                timeout=10,
+            )
+        finally:
+            await client.stop()
+
+    async def _announce_workspace_created(self, workspace_id):
+        try:
+            await self._config_put(
+                "__workspaces__", "workspace", workspace_id,
+                '{"enabled": true}',
+            )
+            logger.info(
+                f"Announced workspace creation: {workspace_id}"
+            )
+        except Exception as e:
+            logger.error(
+                f"Failed to announce workspace creation "
+                f"{workspace_id}: {e}", exc_info=True,
+            )
+
+    async def _announce_workspace_deleted(self, workspace_id):
+        try:
+            await self._config_delete(
+                "__workspaces__", "workspace", workspace_id,
+            )
+            logger.info(
+                f"Announced workspace deletion: {workspace_id}"
+            )
+        except Exception as e:
+            logger.error(
+                f"Failed to announce workspace deletion "
+                f"{workspace_id}: {e}", exc_info=True,
+            )
+
     async def on_iam_request(self, msg, consumer, flow):
 
         id = None
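The announce path is fire-and-forget from IAM's perspective: a short-lived request/response client is opened per announcement to write into the __workspaces__ registry, and a failure is logged without failing the IAM operation itself. A toy sketch of that pattern, with a stand-in client in place of the real RequestResponse:

    import asyncio, logging

    logger = logging.getLogger("iam-sketch")

    class StubConfigClient:
        # Stand-in for the per-announcement RequestResponse client.
        async def start(self): ...
        async def request(self, payload, timeout=10): ...
        async def stop(self): ...

    async def announce(workspace_id: str):
        client = StubConfigClient()
        try:
            await client.start()
            # Registry write: __workspaces__/workspace/<id> -> {"enabled": true}
            await client.request({"op": "put", "key": workspace_id}, timeout=10)
        except Exception:
            logger.error("announce failed for %s", workspace_id, exc_info=True)
        finally:
            await client.stop()

    asyncio.run(announce("acme"))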
@@ -151,21 +151,11 @@ class CollectionManager:
             logger.error(f"Error ensuring collection exists: {e}")
             raise e
 
-    async def list_collections(self, request: CollectionManagementRequest) -> CollectionManagementResponse:
-        """
-        List collections for a user from config service
-
-        Args:
-            request: Collection management request
-
-        Returns:
-            CollectionManagementResponse with list of collections
-        """
+    async def list_collections(self, request, workspace):
         try:
             # Get all collections in this workspace from config service
             config_request = ConfigRequest(
                 operation='getvalues',
-                workspace=request.workspace,
+                workspace=workspace,
                 type='collection'
             )
 

@@ -210,18 +200,8 @@ class CollectionManager:
             logger.error(f"Error listing collections: {e}")
             raise RequestError(f"Failed to list collections: {str(e)}")
 
-    async def update_collection(self, request: CollectionManagementRequest) -> CollectionManagementResponse:
-        """
-        Update collection metadata via config service (creates if doesn't exist)
-
-        Args:
-            request: Collection management request
-
-        Returns:
-            CollectionManagementResponse with updated collection
-        """
+    async def update_collection(self, request, workspace):
         try:
             # Create metadata from request
             name = request.name if request.name else request.collection
             description = request.description if request.description else ""
             tags = list(request.tags) if request.tags else []

@@ -233,10 +213,9 @@ class CollectionManager:
                 tags=tags
             )
 
             # Send put request to config service
             config_request = ConfigRequest(
                 operation='put',
-                workspace=request.workspace,
+                workspace=workspace,
                 values=[ConfigValue(
                     type='collection',
                     key=request.collection,

@@ -249,7 +228,7 @@ class CollectionManager:
             if response.error:
                 raise RuntimeError(f"Config update failed: {response.error.message}")
 
-            logger.info(f"Collection {request.workspace}/{request.collection} updated in config service")
+            logger.info(f"Collection {workspace}/{request.collection} updated in config service")
 
             # Config service will trigger config push automatically
             # Storage services will receive update and create/update collections

@@ -264,23 +243,13 @@ class CollectionManager:
             logger.error(f"Error updating collection: {e}")
             raise RequestError(f"Failed to update collection: {str(e)}")
 
-    async def delete_collection(self, request: CollectionManagementRequest) -> CollectionManagementResponse:
-        """
-        Delete collection via config service
-
-        Args:
-            request: Collection management request
-
-        Returns:
-            CollectionManagementResponse indicating success or failure
-        """
+    async def delete_collection(self, request, workspace):
         try:
-            logger.info(f"Deleting collection {request.workspace}/{request.collection}")
+            logger.info(f"Deleting collection {workspace}/{request.collection}")
 
             # Send delete request to config service
             config_request = ConfigRequest(
                 operation='delete',
-                workspace=request.workspace,
+                workspace=workspace,
                 keys=[ConfigKey(type='collection', key=request.collection)]
             )
 

@@ -289,7 +258,7 @@ class CollectionManager:
             if response.error:
                 raise RuntimeError(f"Config delete failed: {response.error.message}")
 
-            logger.info(f"Collection {request.workspace}/{request.collection} deleted from config service")
+            logger.info(f"Collection {workspace}/{request.collection} deleted from config service")
 
             # Config service will trigger config push automatically
             # Storage services will receive update and delete collections
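Every CollectionManager method now trusts a workspace argument supplied by the consumer binding rather than anything in the request body. A compressed sketch of that dispatch shape, with stand-in types:

    # Sketch: workspace comes from the consumer that received the message,
    # never from the request payload. Names are illustrative.
    import asyncio
    from dataclasses import dataclass

    @dataclass
    class CollectionRequest:
        operation: str
        collection: str

    class Manager:
        async def list_collections(self, request, workspace):
            return f"collections in {workspace}"

        async def dispatch(self, request: CollectionRequest, workspace: str):
            impls = {"list-collections": self.list_collections}
            return await impls[request.operation](request, workspace)

    print(asyncio.run(Manager().dispatch(CollectionRequest("list-collections", "c1"), "acme")))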
@@ -42,13 +42,13 @@ class Librarian:
         self.load_document = load_document
         self.min_chunk_size = min_chunk_size
 
-    async def add_document(self, request):
+    async def add_document(self, request, workspace):
 
         if not request.document_metadata.kind:
             raise RequestError("Document kind (MIME type) is required")
 
         if await self.table_store.document_exists(
-            request.document_metadata.workspace,
+            workspace,
             request.document_metadata.id
         ):
             raise RuntimeError("Document already exists")

@@ -66,19 +66,19 @@ class Librarian:
         logger.debug("Adding to table...")
 
         await self.table_store.add_document(
-            request.document_metadata, object_id
+            workspace, request.document_metadata, object_id
         )
 
         logger.debug("Add complete")
 
         return LibrarianResponse()
 
-    async def remove_document(self, request):
+    async def remove_document(self, request, workspace):
 
         logger.debug("Removing document...")
 
         if not await self.table_store.document_exists(
-            request.workspace,
+            workspace,
             request.document_id,
         ):
             raise RuntimeError("Document does not exist")

@@ -89,17 +89,17 @@ class Librarian:
                 logger.debug(f"Cascade deleting child document {child.id}")
                 try:
                     child_object_id = await self.table_store.get_document_object_id(
-                        child.workspace,
+                        workspace,
                         child.id
                     )
                     await self.blob_store.remove(child_object_id)
-                    await self.table_store.remove_document(child.workspace, child.id)
+                    await self.table_store.remove_document(workspace, child.id)
                 except Exception as e:
                     logger.warning(f"Failed to delete child document {child.id}: {e}")
 
         # Now remove the parent document
         object_id = await self.table_store.get_document_object_id(
-            request.workspace,
+            workspace,
             request.document_id
         )
 

@@ -108,7 +108,7 @@ class Librarian:
 
         # Remove doc table row
         await self.table_store.remove_document(
-            request.workspace,
+            workspace,
             request.document_id
         )
 

@@ -116,30 +116,30 @@ class Librarian:
 
         return LibrarianResponse()
 
-    async def update_document(self, request):
+    async def update_document(self, request, workspace):
 
         logger.debug("Updating document...")
 
         # You can't update the document ID, workspace or kind.
 
         if not await self.table_store.document_exists(
-            request.document_metadata.workspace,
+            workspace,
             request.document_metadata.id
         ):
             raise RuntimeError("Document does not exist")
 
-        await self.table_store.update_document(request.document_metadata)
+        await self.table_store.update_document(workspace, request.document_metadata)
 
         logger.debug("Update complete")
 
         return LibrarianResponse()
 
-    async def get_document_metadata(self, request):
+    async def get_document_metadata(self, request, workspace):
 
         logger.debug("Getting document metadata...")
 
         doc = await self.table_store.get_document(
-            request.workspace,
+            workspace,
             request.document_id
         )
 

@@ -151,12 +151,12 @@ class Librarian:
             content = None,
         )
 
-    async def get_document_content(self, request):
+    async def get_document_content(self, request, workspace):
 
         logger.debug("Getting document content...")
 
         object_id = await self.table_store.get_document_object_id(
-            request.workspace,
+            workspace,
             request.document_id
         )
 

@@ -172,7 +172,7 @@ class Librarian:
             content = base64.b64encode(content),
         )
 
-    async def add_processing(self, request):
+    async def add_processing(self, request, workspace):
 
         logger.debug("Adding processing metadata...")
 

@@ -180,18 +180,18 @@ class Librarian:
             raise RuntimeError("Collection parameter is required")
 
         if await self.table_store.processing_exists(
-            request.processing_metadata.workspace,
+            workspace,
             request.processing_metadata.id
         ):
             raise RuntimeError("Processing already exists")
 
         doc = await self.table_store.get_document(
-            request.processing_metadata.workspace,
+            workspace,
             request.processing_metadata.document_id
         )
 
         object_id = await self.table_store.get_document_object_id(
-            request.processing_metadata.workspace,
+            workspace,
             request.processing_metadata.document_id
         )
 

@@ -203,7 +203,7 @@ class Librarian:
 
         logger.debug("Adding processing to table...")
 
-        await self.table_store.add_processing(request.processing_metadata)
+        await self.table_store.add_processing(workspace, request.processing_metadata)
 
         logger.debug("Invoking document processing...")
 

@@ -211,25 +211,26 @@ class Librarian:
             document = doc,
             processing = request.processing_metadata,
             content = content,
+            workspace = workspace,
         )
 
         logger.debug("Add complete")
 
         return LibrarianResponse()
 
-    async def remove_processing(self, request):
+    async def remove_processing(self, request, workspace):
 
         logger.debug("Removing processing metadata...")
 
         if not await self.table_store.processing_exists(
-            request.workspace,
+            workspace,
             request.processing_id,
         ):
             raise RuntimeError("Processing object does not exist")
 
         # Remove doc table row
         await self.table_store.remove_processing(
-            request.workspace,
+            workspace,
             request.processing_id
         )
 

@@ -237,9 +238,9 @@ class Librarian:
 
         return LibrarianResponse()
 
-    async def list_documents(self, request):
+    async def list_documents(self, request, workspace):
 
-        docs = await self.table_store.list_documents(request.workspace)
+        docs = await self.table_store.list_documents(workspace)
 
         # Filter out child documents and answer documents by default
         include_children = getattr(request, 'include_children', False)

@@ -254,9 +255,9 @@ class Librarian:
             document_metadatas = docs,
         )
 
-    async def list_processing(self, request):
+    async def list_processing(self, request, workspace):
 
-        procs = await self.table_store.list_processing(request.workspace)
+        procs = await self.table_store.list_processing(workspace)
 
         return LibrarianResponse(
             processing_metadatas = procs,

@@ -264,7 +265,7 @@ class Librarian:
 
     # Chunked upload operations
 
-    async def begin_upload(self, request):
+    async def begin_upload(self, request, workspace):
         """
         Initialize a chunked upload session.
 

@@ -276,7 +277,7 @@ class Librarian:
             raise RequestError("Document kind (MIME type) is required")
 
         if await self.table_store.document_exists(
-            request.document_metadata.workspace,
+            workspace,
             request.document_metadata.id
         ):
             raise RequestError("Document already exists")

@@ -312,14 +313,13 @@ class Librarian:
             "kind": request.document_metadata.kind,
             "title": request.document_metadata.title,
             "comments": request.document_metadata.comments,
-            "workspace": request.document_metadata.workspace,
             "tags": request.document_metadata.tags,
         })
 
         # Store session in Cassandra
         await self.table_store.create_upload_session(
             upload_id=upload_id,
-            workspace=request.document_metadata.workspace,
+            workspace=workspace,
             document_id=request.document_metadata.id,
             document_metadata=doc_meta_json,
             s3_upload_id=s3_upload_id,

@@ -338,7 +338,7 @@ class Librarian:
             total_chunks=total_chunks,
         )
 
-    async def upload_chunk(self, request):
+    async def upload_chunk(self, request, workspace):
         """
         Upload a single chunk of a document.
 

@@ -352,7 +352,7 @@ class Librarian:
             raise RequestError("Upload session not found or expired")
 
         # Validate ownership
-        if session["workspace"] != request.workspace:
+        if session["workspace"] != workspace:
             raise RequestError("Not authorized to upload to this session")
 
         # Validate chunk index

@@ -405,7 +405,7 @@ class Librarian:
             total_bytes=session["total_size"],
         )
 
-    async def complete_upload(self, request):
+    async def complete_upload(self, request, workspace):
         """
         Finalize a chunked upload and create the document.
 

@@ -419,7 +419,7 @@ class Librarian:
             raise RequestError("Upload session not found or expired")
 
         # Validate ownership
-        if session["workspace"] != request.workspace:
+        if session["workspace"] != workspace:
             raise RequestError("Not authorized to complete this upload")
 
         # Verify all chunks received

@@ -457,13 +457,13 @@ class Librarian:
             kind=doc_meta_dict["kind"],
             title=doc_meta_dict.get("title", ""),
             comments=doc_meta_dict.get("comments", ""),
-            workspace=doc_meta_dict["workspace"],
             tags=doc_meta_dict.get("tags", []),
             metadata=[],  # Triples not supported in chunked upload yet
         )
 
         # Add document to table
-        await self.table_store.add_document(doc_metadata, session["object_id"])
+        workspace = session["workspace"]
+        await self.table_store.add_document(workspace, doc_metadata, session["object_id"])
 
         # Delete upload session
         await self.table_store.delete_upload_session(request.upload_id)

@@ -476,7 +476,7 @@ class Librarian:
             object_id=str(session["object_id"]),
         )
 
-    async def abort_upload(self, request):
+    async def abort_upload(self, request, workspace):
         """
         Cancel a chunked upload and clean up resources.
         """

@@ -488,7 +488,7 @@ class Librarian:
             raise RequestError("Upload session not found or expired")
 
         # Validate ownership
-        if session["workspace"] != request.workspace:
+        if session["workspace"] != workspace:
             raise RequestError("Not authorized to abort this upload")
 
         # Abort S3 multipart upload

@@ -504,7 +504,7 @@ class Librarian:
 
         return LibrarianResponse(error=None)
 
-    async def get_upload_status(self, request):
+    async def get_upload_status(self, request, workspace):
         """
         Get the status of an in-progress upload.
         """

@@ -520,7 +520,7 @@ class Librarian:
             )
 
         # Validate ownership
-        if session["workspace"] != request.workspace:
+        if session["workspace"] != workspace:
             raise RequestError("Not authorized to view this upload")
 
         chunks_received = session["chunks_received"]

@@ -546,13 +546,13 @@ class Librarian:
             total_bytes=session["total_size"],
         )
 
-    async def list_uploads(self, request):
+    async def list_uploads(self, request, workspace):
         """
         List all in-progress uploads for a workspace.
         """
-        logger.debug(f"Listing uploads for workspace {request.workspace}")
+        logger.debug(f"Listing uploads for workspace {workspace}")
 
-        sessions = await self.table_store.list_upload_sessions(request.workspace)
+        sessions = await self.table_store.list_upload_sessions(workspace)
 
         upload_sessions = [
             UploadSession(

@@ -575,7 +575,7 @@ class Librarian:
 
     # Child document operations
 
-    async def add_child_document(self, request):
+    async def add_child_document(self, request, workspace):
         """
         Add a child document linked to a parent document.
 

@@ -591,7 +591,7 @@ class Librarian:
 
         # Verify parent exists
         if not await self.table_store.document_exists(
-            request.document_metadata.workspace,
+            workspace,
             request.document_metadata.parent_id
         ):
             raise RequestError(

@@ -599,7 +599,7 @@ class Librarian:
             )
 
         if await self.table_store.document_exists(
-            request.document_metadata.workspace,
+            workspace,
             request.document_metadata.id
         ):
             raise RequestError("Document already exists")

@@ -622,7 +622,7 @@ class Librarian:
         logger.debug("Adding to table...")
 
         await self.table_store.add_document(
-            request.document_metadata, object_id
+            workspace, request.document_metadata, object_id
         )
 
         logger.debug("Add child document complete")

@@ -632,7 +632,7 @@ class Librarian:
             document_id=request.document_metadata.id,
         )
 
-    async def list_children(self, request):
+    async def list_children(self, request, workspace):
         """
         List all child documents for a given parent document.
         """

@@ -645,7 +645,7 @@ class Librarian:
             document_metadatas=children,
         )
 
-    async def stream_document(self, request):
+    async def stream_document(self, request, workspace):
         """
         Stream document content in chunks.
 

@@ -665,7 +665,7 @@ class Librarian:
         )
 
         object_id = await self.table_store.get_document_object_id(
-            request.workspace,
+            workspace,
             request.document_id
         )
 

@@ -697,4 +697,3 @@ class Librarian:
             total_bytes=total_size,
             is_final=is_last,
         )
-
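One subtlety in the chunked-upload hunks: the session row pins the workspace at begin_upload time, and every later call compares the caller's queue-derived workspace against it, so a session can only be continued from the workspace that opened it. A minimal sketch of that invariant:

    # Sketch of the upload-session ownership check used throughout the
    # chunked-upload methods above; the session dict is illustrative.
    sessions = {}

    def begin_upload(upload_id: str, workspace: str):
        sessions[upload_id] = {"workspace": workspace, "chunks": []}

    def upload_chunk(upload_id: str, workspace: str, chunk: bytes):
        session = sessions.get(upload_id)
        if session is None:
            raise ValueError("Upload session not found or expired")
        if session["workspace"] != workspace:
            # The caller's workspace is derived from the queue it used,
            # so it cannot be spoofed via the message body.
            raise PermissionError("Not authorized to upload to this session")
        session["chunks"].append(chunk)

    begin_upload("u1", "acme")
    upload_chunk("u1", "acme", b"data")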
@@ -10,7 +10,7 @@ import json
 import logging
 from datetime import datetime
 
-from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber
+from .. base import WorkspaceProcessor, Consumer, Producer, Publisher, Subscriber
 from .. base import ConsumerMetrics, ProducerMetrics
 from .. base.cassandra_config import add_cassandra_args, resolve_cassandra_config
 

@@ -46,6 +46,9 @@ default_collection_response_queue = collection_response_queue
 default_config_request_queue = config_request_queue
 default_config_response_queue = config_response_queue
 
+def workspace_queue(base_queue, workspace):
+    return f"{base_queue}:{workspace}"
+
 default_object_store_endpoint = "ceph-rgw:7480"
 default_object_store_access_key = "object-user"
 default_object_store_secret_key = "object-password"

@@ -56,15 +59,13 @@ default_min_chunk_size = 1 # No minimum by default (for Garage)
 
 bucket_name = "library"
 
-class Processor(AsyncProcessor):
+class Processor(WorkspaceProcessor):
 
     def __init__(self, **params):
 
         id = params.get("id")
 
-        # self.running = True
-
-        librarian_request_queue = params.get(
+        self.librarian_request_queue_base = params.get(
             "librarian_request_queue", default_librarian_request_queue
         )
 

@@ -72,7 +73,7 @@ class Processor(AsyncProcessor):
             "librarian_response_queue", default_librarian_response_queue
         )
 
-        collection_request_queue = params.get(
+        self.collection_request_queue_base = params.get(
             "collection_request_queue", default_collection_request_queue
         )
 

@@ -130,9 +131,9 @@ class Processor(AsyncProcessor):
 
         super(Processor, self).__init__(
             **params | {
-                "librarian_request_queue": librarian_request_queue,
+                "librarian_request_queue": self.librarian_request_queue_base,
                 "librarian_response_queue": librarian_response_queue,
-                "collection_request_queue": collection_request_queue,
+                "collection_request_queue": self.collection_request_queue_base,
                 "collection_response_queue": collection_response_queue,
                 "object_store_endpoint": object_store_endpoint,
                 "object_store_access_key": object_store_access_key,

@@ -142,40 +143,14 @@ class Processor(AsyncProcessor):
             }
         )
 
-        librarian_request_metrics = ConsumerMetrics(
-            processor = self.id, flow = None, name = "librarian-request"
-        )
-
         librarian_response_metrics = ProducerMetrics(
             processor = self.id, flow = None, name = "librarian-response"
         )
 
-        collection_request_metrics = ConsumerMetrics(
-            processor = self.id, flow = None, name = "collection-request"
-        )
-
         collection_response_metrics = ProducerMetrics(
             processor = self.id, flow = None, name = "collection-response"
         )
 
         storage_response_metrics = ConsumerMetrics(
             processor = self.id, flow = None, name = "storage-response"
         )
 
-        self.librarian_request_topic = librarian_request_queue
-        self.librarian_request_subscriber = id
-
-        self.librarian_request_consumer = Consumer(
-            taskgroup = self.taskgroup,
-            backend = self.pubsub,
-            flow = None,
-            topic = librarian_request_queue,
-            subscriber = id,
-            schema = LibrarianRequest,
-            handler = self.on_librarian_request,
-            metrics = librarian_request_metrics,
-        )
-
         self.librarian_response_producer = Producer(
             backend = self.pubsub,
             topic = librarian_response_queue,

@@ -183,20 +158,6 @@ class Processor(AsyncProcessor):
             metrics = librarian_response_metrics,
         )
 
-        self.collection_request_topic = collection_request_queue
-        self.collection_request_subscriber = id
-
-        self.collection_request_consumer = Consumer(
-            taskgroup = self.taskgroup,
-            backend = self.pubsub,
-            flow = None,
-            topic = collection_request_queue,
-            subscriber = id,
-            schema = CollectionManagementRequest,
-            handler = self.on_collection_request,
-            metrics = collection_request_metrics,
-        )
-
         self.collection_response_producer = Producer(
             backend = self.pubsub,
             topic = collection_response_queue,

@@ -259,16 +220,80 @@ class Processor(AsyncProcessor):
 
         self.flows = {}
 
+        # Per-workspace consumers, keyed by workspace id
+        self.workspace_consumers = {}
+
         logger.info("Librarian service initialized")
 
+    async def on_workspace_created(self, workspace):
+
+        if workspace in self.workspace_consumers:
+            return
+
+        lib_queue = workspace_queue(
+            self.librarian_request_queue_base, workspace,
+        )
+        col_queue = workspace_queue(
+            self.collection_request_queue_base, workspace,
+        )
+
+        await self.pubsub.ensure_topic(lib_queue)
+        await self.pubsub.ensure_topic(col_queue)
+
+        lib_consumer = Consumer(
+            taskgroup=self.taskgroup,
+            backend=self.pubsub,
+            flow=None,
+            topic=lib_queue,
+            subscriber=self.id,
+            schema=LibrarianRequest,
+            handler=partial(
+                self.on_librarian_request, workspace=workspace,
+            ),
+            metrics=ConsumerMetrics(
+                processor=self.id, flow=None,
+                name=f"librarian-request-{workspace}",
+            ),
+        )
+
+        col_consumer = Consumer(
+            taskgroup=self.taskgroup,
+            backend=self.pubsub,
+            flow=None,
+            topic=col_queue,
+            subscriber=self.id,
+            schema=CollectionManagementRequest,
+            handler=partial(
+                self.on_collection_request, workspace=workspace,
+            ),
+            metrics=ConsumerMetrics(
+                processor=self.id, flow=None,
+                name=f"collection-request-{workspace}",
+            ),
+        )
+
+        await lib_consumer.start()
+        await col_consumer.start()
+
+        self.workspace_consumers[workspace] = {
+            "librarian": lib_consumer,
+            "collection": col_consumer,
+        }
+
+        logger.info(f"Subscribed to workspace queues: {workspace}")
+
+    async def on_workspace_deleted(self, workspace):
+
+        consumers = self.workspace_consumers.pop(workspace, None)
+        if consumers:
+            for consumer in consumers.values():
+                await consumer.stop()
+        logger.info(f"Unsubscribed from workspace queues: {workspace}")
+
     async def start(self):
 
-        await self.pubsub.ensure_topic(self.librarian_request_topic)
-        await self.pubsub.ensure_topic(self.collection_request_topic)
         await super(Processor, self).start()
-        await self.librarian_request_consumer.start()
         await self.librarian_response_producer.start()
-        await self.collection_request_consumer.start()
         await self.collection_response_producer.start()
         await self.config_request_producer.start()
         await self.config_response_consumer.start()

@@ -360,13 +385,12 @@ class Processor(AsyncProcessor):
         finally:
             await triples_pub.stop()
 
-    async def load_document(self, document, processing, content):
+    async def load_document(self, document, processing, content, workspace):
 
         logger.debug("Ready for document processing...")
 
         logger.debug(f"Document: {document}, processing: {processing}, content length: {len(content)}")
 
-        workspace = processing.workspace
         ws_flows = self.flows.get(workspace, {})
         if processing.flow not in ws_flows:
             raise RuntimeError(

@@ -429,20 +453,14 @@ class Processor(AsyncProcessor):
 
         logger.debug("Document submitted")
 
-    async def add_processing_with_collection(self, request):
-        """
-        Wrapper for add_processing that ensures collection exists
-        """
-        # Ensure collection exists when processing is added
+    async def add_processing_with_collection(self, request, workspace):
         if hasattr(request, 'processing_metadata') and request.processing_metadata:
-            workspace = request.processing_metadata.workspace
             collection = request.processing_metadata.collection
             await self.collection_manager.ensure_collection_exists(workspace, collection)
 
-        # Call the original add_processing method
-        return await self.librarian.add_processing(request)
+        return await self.librarian.add_processing(request, workspace)
 
-    async def process_request(self, v):
+    async def process_request(self, v, workspace):
 
         if v.operation is None:
             raise RequestError("Null operation")

@@ -475,9 +493,9 @@ class Processor(AsyncProcessor):
         if v.operation not in impls:
             raise RequestError(f"Invalid operation: {v.operation}")
 
-        return await impls[v.operation](v)
+        return await impls[v.operation](v, workspace)
 
-    async def on_librarian_request(self, msg, consumer, flow):
+    async def on_librarian_request(self, msg, consumer, flow, *, workspace):
 
         v = msg.value()
 

@@ -491,14 +509,14 @@ class Processor(AsyncProcessor):
 
         # Handle streaming operations specially
         if v.operation == "stream-document":
-            async for resp in self.librarian.stream_document(v):
+            async for resp in self.librarian.stream_document(v, workspace):
                 await self.librarian_response_producer.send(
                     resp, properties={"id": id}
                 )
             return
 
         # Non-streaming operations
-        resp = await self.process_request(v)
+        resp = await self.process_request(v, workspace)
 
         await self.librarian_response_producer.send(
             resp, properties={"id": id}

@@ -535,10 +553,7 @@ class Processor(AsyncProcessor):
 
         logger.debug("Librarian input processing complete")
 
-    async def process_collection_request(self, v):
-        """
-        Process collection management requests
-        """
+    async def process_collection_request(self, v, workspace):
         if v.operation is None:
             raise RequestError("Null operation")
 

@@ -553,19 +568,16 @@ class Processor(AsyncProcessor):
         if v.operation not in impls:
             raise RequestError(f"Invalid collection operation: {v.operation}")
 
-        return await impls[v.operation](v)
+        return await impls[v.operation](v, workspace)
 
-    async def on_collection_request(self, msg, consumer, flow):
-        """
-        Handle collection management request messages
-        """
+    async def on_collection_request(self, msg, consumer, flow, *, workspace):
         v = msg.value()
         id = msg.properties().get("id", "unknown")
 
         logger.info(f"Handling collection request {id}...")
 
         try:
-            resp = await self.process_collection_request(v)
+            resp = await self.process_collection_request(v, workspace)
             await self.collection_response_producer.send(
                 resp, properties={"id": id}
             )

@@ -597,7 +609,7 @@ class Processor(AsyncProcessor):
     @staticmethod
     def add_args(parser):
 
         AsyncProcessor.add_args(parser)
-        AsyncProcessor.add_args(parser)
+        WorkspaceProcessor.add_args(parser)
 
         parser.add_argument(
             '--librarian-request-queue',
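WorkspaceProcessor itself is not shown in this part of the diff; based on the on_workspace_created/on_workspace_deleted overrides above, a plausible (entirely hypothetical) shape for the base class's event handling is roughly:

    # Hypothetical sketch of the WorkspaceProcessor contract implied by the
    # overrides above; the real base class lives elsewhere in the change set.
    class WorkspaceProcessorSketch:
        def __init__(self):
            self.known = set()

        async def handle_workspace_event(self, workspace: str, created: bool):
            if created and workspace not in self.known:
                self.known.add(workspace)
                await self.on_workspace_created(workspace)
            elif not created and workspace in self.known:
                self.known.discard(workspace)
                await self.on_workspace_deleted(workspace)

        async def on_workspace_created(self, workspace: str):
            raise NotImplementedError

        async def on_workspace_deleted(self, workspace: str):
            raise NotImplementedError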
@@ -312,7 +312,7 @@ class LibraryTableStore:
 
         return bool(rows)
 
-    async def add_document(self, document, object_id):
+    async def add_document(self, workspace, document, object_id):
 
         logger.info(f"Adding document {document.id} {object_id}")
 

@@ -332,7 +332,7 @@ class LibraryTableStore:
             self.cassandra,
             self.insert_document_stmt,
             (
-                document.id, document.workspace, int(document.time * 1000),
+                document.id, workspace, int(document.time * 1000),
                 document.kind, document.title, document.comments,
                 metadata, document.tags, object_id,
                 parent_id, document_type

@@ -344,7 +344,7 @@ class LibraryTableStore:
 
         logger.debug("Add complete")
 
-    async def update_document(self, document):
+    async def update_document(self, workspace, document):
 
         logger.info(f"Updating document {document.id}")
 

@@ -362,7 +362,7 @@ class LibraryTableStore:
                 (
                     int(document.time * 1000), document.title,
                     document.comments, metadata, document.tags,
-                    document.workspace, document.id
+                    workspace, document.id
                 ),
             )
         except Exception:

@@ -404,7 +404,6 @@ class LibraryTableStore:
         lst = [
             DocumentMetadata(
                 id = row[0],
-                workspace = workspace,
                 time = int(time.mktime(row[1].timetuple())),
                 kind = row[2],
                 title = row[3],

@@ -446,7 +445,6 @@ class LibraryTableStore:
         lst = [
             DocumentMetadata(
                 id = row[0],
-                workspace = row[1],
                 time = int(time.mktime(row[2].timetuple())),
                 kind = row[3],
                 title = row[4],

@@ -487,7 +485,6 @@ class LibraryTableStore:
         for row in rows:
             doc = DocumentMetadata(
                 id = id,
-                workspace = workspace,
                 time = int(time.mktime(row[0].timetuple())),
                 kind = row[1],
                 title = row[2],

@@ -540,7 +537,7 @@ class LibraryTableStore:
 
         return bool(rows)
 
-    async def add_processing(self, processing):
+    async def add_processing(self, workspace, processing):
 
         logger.info(f"Adding processing {processing.id}")
 

@@ -551,7 +548,7 @@ class LibraryTableStore:
             (
                 processing.id, processing.document_id,
                 int(processing.time * 1000), processing.flow,
-                processing.workspace, processing.collection,
+                workspace, processing.collection,
                 processing.tags
             ),
         )

@@ -597,7 +594,6 @@ class LibraryTableStore:
                 document_id = row[1],
                 time = int(time.mktime(row[2].timetuple())),
                 flow = row[3],
-                workspace = workspace,
                 collection = row[4],
                 tags = row[5] if row[5] else [],
             )
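With workspace as an explicit parameter, the table store binds it directly into the statement rather than reading it off the metadata object, and the returned DocumentMetadata rows no longer carry a workspace field at all. A minimal sketch of the write path under that scheme; the CQL text and column names here are illustrative, not the real prepared statements:

    # Illustrative CQL and bind order for the explicit-workspace write path.
    INSERT_DOCUMENT_CQL = """
    INSERT INTO document (id, workspace, time, kind, title, comments,
                          metadata, tags, object_id, parent_id, document_type)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    def bind_params(workspace, document, object_id, metadata, parent_id, document_type):
        # workspace arrives as an argument; document no longer carries it.
        return (
            document.id, workspace, int(document.time * 1000),
            document.kind, document.title, document.comments,
            metadata, document.tags, object_id,
            parent_id, document_type,
        )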