fix: skip authorise() for AUTHENTICATED/PUBLIC sentinels in WebSocket mux

The mux unconditionally called auth.authorise() for every operation,
passing capability sentinels like AUTHENTICATED ("__authenticated__")
to the IAM regime. Since no role grants "__authenticated__", the regime
denied the request — breaking whoami (and any future AUTHENTICATED-only
operation) over the WebSocket path while the HTTP endpoints worked fine.

Match the guard pattern used by iam_endpoint.py and registry_endpoint.py:
only call authorise() for real capability strings, not sentinels.
This commit is contained in:
Cyber MacGeddon 2026-06-03 09:43:54 +01:00
parent 60f861bac4
commit ac9968b1e0

View file

@ -4,6 +4,8 @@ import queue
import uuid
import logging
from ..capabilities import PUBLIC, AUTHENTICATED
# Module logger
logger = logging.getLogger(__name__)
@ -156,37 +158,41 @@ class Mux:
})
return
# Resolve workspace first (default-fill from the caller's
# bound workspace), then ask the regime to authorise the
# service-level capability against the matched
# operation's resource shape.
# Resolve workspace (default-fill from the caller's
# bound workspace). Workspace resolution applies to all
# operations regardless of capability level.
try:
await enforce_workspace(data, self.identity, self.auth)
if isinstance(inner, dict):
await enforce_workspace(inner, self.identity, self.auth)
if data.get("flow"):
resource = {
"workspace": data.get("workspace", ""),
"flow": data.get("flow", ""),
}
parameters = {}
else:
# Build a minimal RequestContext so the matched
# operation's own extractors decide resource and
# parameters — same path the HTTP endpoints take.
from ..registry import RequestContext
ctx = RequestContext(
body=inner if isinstance(inner, dict) else {},
match_info={},
identity=self.identity,
)
resource = op.extract_resource(ctx)
parameters = op.extract_parameters(ctx)
# Authorisation: capability sentinels short-circuit
# the regime call; capability strings go through
# authorise().
if op.capability not in (PUBLIC, AUTHENTICATED):
if data.get("flow"):
resource = {
"workspace": data.get("workspace", ""),
"flow": data.get("flow", ""),
}
parameters = {}
else:
# Build a minimal RequestContext so the matched
# operation's own extractors decide resource
# and parameters — same path the HTTP
# endpoints take.
from ..registry import RequestContext
ctx = RequestContext(
body=inner if isinstance(inner, dict) else {},
match_info={},
identity=self.identity,
)
resource = op.extract_resource(ctx)
parameters = op.extract_parameters(ctx)
await self.auth.authorise(
self.identity, op.capability, resource, parameters,
)
await self.auth.authorise(
self.identity, op.capability, resource, parameters,
)
except _web.HTTPNotFound:
await self.ws.send_json({
"id": request_id,