From bae11415e1412b0e237546b4c8327e2defb6821b Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Sat, 2 May 2026 10:58:33 +0100 Subject: [PATCH] api-gateway blocks non-existent workspaces --- trustgraph-flow/trustgraph/gateway/auth.py | 6 +++ .../trustgraph/gateway/capabilities.py | 13 +++++++ .../trustgraph/gateway/config/receiver.py | 39 ++++++++++++++++++- .../trustgraph/gateway/dispatch/mux.py | 10 +++++ trustgraph-flow/trustgraph/gateway/service.py | 2 +- 5 files changed, 68 insertions(+), 2 deletions(-) diff --git a/trustgraph-flow/trustgraph/gateway/auth.py b/trustgraph-flow/trustgraph/gateway/auth.py index 6abcbe15..1309ecfc 100644 --- a/trustgraph-flow/trustgraph/gateway/auth.py +++ b/trustgraph-flow/trustgraph/gateway/auth.py @@ -141,6 +141,12 @@ class IamAuth: self._authz_cache: dict[str, tuple[bool, float]] = {} self._authz_cache_lock = asyncio.Lock() + # Known workspaces, maintained by the config receiver. + # enforce_workspace checks this set to reject requests for + # non-existent workspaces before routing to a queue that + # has no consumer. + self.known_workspaces: set[str] = set() + # ------------------------------------------------------------------ # Short-lived client helper. Mirrors the pattern used by the # bootstrap framework and AsyncProcessor: a fresh uuid suffix per diff --git a/trustgraph-flow/trustgraph/gateway/capabilities.py b/trustgraph-flow/trustgraph/gateway/capabilities.py index 72ca51c7..1c444d5f 100644 --- a/trustgraph-flow/trustgraph/gateway/capabilities.py +++ b/trustgraph-flow/trustgraph/gateway/capabilities.py @@ -67,12 +67,22 @@ async def enforce(request, auth, capability): return identity +def workspace_not_found(): + return web.HTTPNotFound( + text='{"error":"workspace not found"}', + content_type="application/json", + ) + + async def enforce_workspace(data, identity, auth, capability=None): """Default-fill the workspace on a request body and (optionally) authorise the caller for ``capability`` against that workspace. - Target workspace = ``data["workspace"]`` if supplied, else the caller's bound workspace. + - Rejects the request if the resolved workspace is not in + ``auth.known_workspaces`` (prevents routing to a queue with + no consumer). - On success, ``data["workspace"]`` is overwritten with the resolved value so downstream code sees a single canonical address. @@ -92,6 +102,9 @@ async def enforce_workspace(data, identity, auth, capability=None): target = requested or identity.workspace data["workspace"] = target + if auth.known_workspaces and target not in auth.known_workspaces: + raise workspace_not_found() + if capability is not None: await auth.authorise( identity, capability, {"workspace": target}, {}, diff --git a/trustgraph-flow/trustgraph/gateway/config/receiver.py b/trustgraph-flow/trustgraph/gateway/config/receiver.py index 5bc781a9..8c42381f 100755 --- a/trustgraph-flow/trustgraph/gateway/config/receiver.py +++ b/trustgraph-flow/trustgraph/gateway/config/receiver.py @@ -24,9 +24,10 @@ logger.setLevel(logging.INFO) class ConfigReceiver: - def __init__(self, backend): + def __init__(self, backend, auth=None): self.backend = backend + self.auth = auth self.flow_handlers = [] @@ -54,6 +55,15 @@ class ConfigReceiver: ) return + # Track workspace lifecycle + if v.workspace_changes and self.auth: + for ws in (v.workspace_changes.created or []): + self.auth.known_workspaces.add(ws) + logger.info(f"Workspace registered: {ws}") + for ws in (v.workspace_changes.deleted or []): + self.auth.known_workspaces.discard(ws) + logger.info(f"Workspace deregistered: {ws}") + # Gateway cares about flow config — check if any flow # types changed in any workspace flow_workspaces = changes.get("flow", []) @@ -195,6 +205,33 @@ class ConfigReceiver: try: await client.start() + # Discover all known workspaces + ws_resp = await client.request( + ConfigRequest( + operation="getvalues", + workspace="__workspaces__", + type="workspace", + ), + timeout=10, + ) + + if ws_resp.error: + raise RuntimeError( + f"Workspace discovery error: " + f"{ws_resp.error.message}" + ) + + discovered = { + v.key for v in ws_resp.values if v.key + } + + if self.auth: + self.auth.known_workspaces = discovered + + logger.info( + f"Known workspaces: {discovered}" + ) + # Discover workspaces that have any flow config resp = await client.request( ConfigRequest( diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py index 134c120e..02c0eed2 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py @@ -190,6 +190,16 @@ class Mux: await self.auth.authorise( self.identity, op.capability, resource, parameters, ) + except _web.HTTPNotFound: + await self.ws.send_json({ + "id": request_id, + "error": { + "message": "workspace not found", + "type": "workspace-not-found", + }, + "complete": True, + }) + return except _web.HTTPForbidden: await self.ws.send_json({ "id": request_id, diff --git a/trustgraph-flow/trustgraph/gateway/service.py b/trustgraph-flow/trustgraph/gateway/service.py index f75f3b25..0f6a5070 100755 --- a/trustgraph-flow/trustgraph/gateway/service.py +++ b/trustgraph-flow/trustgraph/gateway/service.py @@ -68,7 +68,7 @@ class Api: id=config.get("id", "api-gateway"), ) - self.config_receiver = ConfigReceiver(self.pubsub_backend) + self.config_receiver = ConfigReceiver(self.pubsub_backend, auth=self.auth) # Build queue overrides dictionary from CLI arguments queue_overrides = {}