api-gateway blocks non-existent workspaces

This commit is contained in:
Cyber MacGeddon 2026-05-02 10:58:33 +01:00
parent 226af71a2f
commit bae11415e1
5 changed files with 68 additions and 2 deletions

View file

@ -141,6 +141,12 @@ class IamAuth:
self._authz_cache: dict[str, tuple[bool, float]] = {}
self._authz_cache_lock = asyncio.Lock()
# Known workspaces, maintained by the config receiver.
# enforce_workspace checks this set to reject requests for
# non-existent workspaces before routing to a queue that
# has no consumer.
self.known_workspaces: set[str] = set()
# ------------------------------------------------------------------
# Short-lived client helper. Mirrors the pattern used by the
# bootstrap framework and AsyncProcessor: a fresh uuid suffix per

View file

@ -67,12 +67,22 @@ async def enforce(request, auth, capability):
return identity
def workspace_not_found():
return web.HTTPNotFound(
text='{"error":"workspace not found"}',
content_type="application/json",
)
async def enforce_workspace(data, identity, auth, capability=None):
"""Default-fill the workspace on a request body and (optionally)
authorise the caller for ``capability`` against that workspace.
- Target workspace = ``data["workspace"]`` if supplied, else the
caller's bound workspace.
- Rejects the request if the resolved workspace is not in
``auth.known_workspaces`` (prevents routing to a queue with
no consumer).
- On success, ``data["workspace"]`` is overwritten with the
resolved value so downstream code sees a single canonical
address.
@ -92,6 +102,9 @@ async def enforce_workspace(data, identity, auth, capability=None):
target = requested or identity.workspace
data["workspace"] = target
if auth.known_workspaces and target not in auth.known_workspaces:
raise workspace_not_found()
if capability is not None:
await auth.authorise(
identity, capability, {"workspace": target}, {},

View file

@ -24,9 +24,10 @@ logger.setLevel(logging.INFO)
class ConfigReceiver:
def __init__(self, backend):
def __init__(self, backend, auth=None):
self.backend = backend
self.auth = auth
self.flow_handlers = []
@ -54,6 +55,15 @@ class ConfigReceiver:
)
return
# Track workspace lifecycle
if v.workspace_changes and self.auth:
for ws in (v.workspace_changes.created or []):
self.auth.known_workspaces.add(ws)
logger.info(f"Workspace registered: {ws}")
for ws in (v.workspace_changes.deleted or []):
self.auth.known_workspaces.discard(ws)
logger.info(f"Workspace deregistered: {ws}")
# Gateway cares about flow config — check if any flow
# types changed in any workspace
flow_workspaces = changes.get("flow", [])
@ -195,6 +205,33 @@ class ConfigReceiver:
try:
await client.start()
# Discover all known workspaces
ws_resp = await client.request(
ConfigRequest(
operation="getvalues",
workspace="__workspaces__",
type="workspace",
),
timeout=10,
)
if ws_resp.error:
raise RuntimeError(
f"Workspace discovery error: "
f"{ws_resp.error.message}"
)
discovered = {
v.key for v in ws_resp.values if v.key
}
if self.auth:
self.auth.known_workspaces = discovered
logger.info(
f"Known workspaces: {discovered}"
)
# Discover workspaces that have any flow config
resp = await client.request(
ConfigRequest(

View file

@ -190,6 +190,16 @@ class Mux:
await self.auth.authorise(
self.identity, op.capability, resource, parameters,
)
except _web.HTTPNotFound:
await self.ws.send_json({
"id": request_id,
"error": {
"message": "workspace not found",
"type": "workspace-not-found",
},
"complete": True,
})
return
except _web.HTTPForbidden:
await self.ws.send_json({
"id": request_id,

View file

@ -68,7 +68,7 @@ class Api:
id=config.get("id", "api-gateway"),
)
self.config_receiver = ConfigReceiver(self.pubsub_backend)
self.config_receiver = ConfigReceiver(self.pubsub_backend, auth=self.auth)
# Build queue overrides dictionary from CLI arguments
queue_overrides = {}