iam: self-service ops, optional workspace filters, Mux service routing (#855)

Three threads, all reinforcing the contract's system-level vs.
workspace-association distinction.

WS Mux service routing
- tg-show-flows (and any workspace-level service over the WS) was
  failing with "unknown service" because the post-refactor Mux
  unconditionally looked up flow-service:<kind>.  Now branches on
  the envelope's flow field: with flow → flow-service:<kind>;
  without flow → <kind>:<op> from the inner body; with bare op
  lookup for service=iam.  Resource and parameters come from the
  matched op's own extractors — same path the HTTP endpoints take.

Optional workspace on system-level user/key ops
- list-users returns the deployment-wide list when no workspace is
  supplied, filters when one is.  get-user, update-user,
  disable-user, enable-user, delete-user, reset-password,
  create-api-key, list-api-keys, revoke-api-key all treat workspace
  as an optional integrity check rather than a required argument.
- create-user keeps workspace required — there it's the new user's
  home-workspace binding, a parameter rather than an address.
- API keys reclassified as SYSTEM-level resources.  By the same
  reasoning that makes users system-level, an API key is a
  credential record on a deployment-wide registry; the workspace it
  authenticates to is a property, not a containment.

Self-service surface
- whoami: returns the caller's own user record.  AUTHENTICATED-only;
  no users:read capability required.  Foundation for UI affordances
  that depend on the caller's permissions.
- bootstrap-status: POST /api/v1/auth/bootstrap-status, PUBLIC,
  side-effect-free.  Returns {bootstrap_available: bool} so a
  first-run UI can decide whether to render setup without consuming
  the bootstrap op.
- Gateway now injects actor=identity.handle on every authenticated
  forward to iam-svc (IamEndpoint and WS Mux iam path), overwriting
  any caller-supplied value.  Underpins whoami, audit logging, and
  future regime-side decisions that need actor identity.
- tg-whoami and tg-update-user CLIs.

Spec polish
- iam-contract.md: actor-injection rule documented; whoami /
  bootstrap-status added to operations list; permission-scope
  framing tightened (workspace scope is a property of the grant,
  not the user or role).
- iam.md: self-service section; gateway flow gains the actor-
  injection step; role section reframed so iam-svc constraints
  don't leak into contract-level prose.
- iam-protocol.md: ops table updated for whoami, bootstrap-status,
  optional-workspace pattern; bootstrap_available added to the
  IamResponse listing.
This commit is contained in:
cybermaggedon 2026-04-28 22:13:12 +01:00 committed by GitHub
parent 6302eb8c97
commit 9fc1d4527b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 555 additions and 147 deletions

View file

@ -123,14 +123,31 @@ class Mux:
# Per-service capability gating. Resolved through the
# operation registry so the WS path matches what HTTP
# callers see — same authority, same caps. Service
# kinds that aren't registered are refused.
# callers see — same authority, same caps.
#
# Lookup mirrors the HTTP routing decision in
# ``request_task``: presence of ``flow`` on the envelope
# means a flow-level data-plane service (graph-rag,
# agent, …); absence means a workspace-level service
# (config, flow management, librarian, …) whose specific
# operation is in the inner request body. ``iam`` is
# treated as workspace-level too — its operations are
# registered with bare names, no kind prefix.
from ..registry import lookup as _registry_lookup
from ..capabilities import enforce_workspace
from aiohttp import web as _web
service = data.get("service", "")
op = _registry_lookup(f"flow-service:{service}")
inner = data.get("request") or {}
inner_op = inner.get("operation", "") if isinstance(inner, dict) else ""
if data.get("flow"):
op = _registry_lookup(f"flow-service:{service}")
elif service == "iam":
op = _registry_lookup(inner_op) if inner_op else None
else:
op = _registry_lookup(f"{service}:{inner_op}") if inner_op else None
if op is None:
await self.ws.send_json({
"id": request_id,
@ -142,23 +159,36 @@ class Mux:
})
return
# Workspace + flow form the resource address for a
# flow-level service call. Resolve workspace first
# (default-fill from the caller's bound workspace),
# then ask the regime to authorise the service-level
# capability against that {workspace, flow} resource.
# Resolve workspace first (default-fill from the caller's
# bound workspace), then ask the regime to authorise the
# service-level capability against the matched
# operation's resource shape.
try:
await enforce_workspace(data, self.identity, self.auth)
inner = data.get("request")
if isinstance(inner, dict):
await enforce_workspace(inner, self.identity, self.auth)
resource = {
"workspace": data.get("workspace", ""),
"flow": data.get("flow", ""),
}
if data.get("flow"):
resource = {
"workspace": data.get("workspace", ""),
"flow": data.get("flow", ""),
}
parameters = {}
else:
# Build a minimal RequestContext so the matched
# operation's own extractors decide resource and
# parameters — same path the HTTP endpoints take.
from ..registry import RequestContext
ctx = RequestContext(
body=inner if isinstance(inner, dict) else {},
match_info={},
identity=self.identity,
)
resource = op.extract_resource(ctx)
parameters = op.extract_parameters(ctx)
await self.auth.authorise(
self.identity, op.capability, resource, {},
self.identity, op.capability, resource, parameters,
)
except _web.HTTPForbidden:
await self.ws.send_json({
@ -183,6 +213,17 @@ class Mux:
workspace = data["workspace"]
# Plumb authenticated caller's handle as ``actor`` so
# iam-svc handlers (whoami, future actor-scoped checks)
# know who is calling. Overwrite any caller-supplied
# value so it can't be spoofed over the WS.
if (
service == "iam"
and isinstance(data.get("request"), dict)
and self.identity is not None
):
data["request"]["actor"] = self.identity.handle
await self.q.put((
data["id"],
workspace,

View file

@ -36,6 +36,10 @@ class AuthEndpoints:
app.add_routes([
web.post("/api/v1/auth/login", self.login),
web.post("/api/v1/auth/bootstrap", self.bootstrap),
web.post(
"/api/v1/auth/bootstrap-status",
self.bootstrap_status,
),
web.post(
"/api/v1/auth/change-password",
self.change_password,
@ -83,6 +87,18 @@ class AuthEndpoints:
)
return web.json_response(resp)
async def bootstrap_status(self, request):
"""Public, side-effect-free. Returns ``{"bootstrap_available":
bool}`` so a UI can decide whether to render first-run setup
without invoking the consuming ``bootstrap`` op."""
await enforce(request, self.auth, PUBLIC)
resp = await self._forward({"operation": "bootstrap-status"})
if "error" in resp:
return web.json_response(
{"error": "auth failure"}, status=401,
)
return web.json_response(resp)
async def change_password(self, request):
"""Authenticated (any role). Accepts {current_password,
new_password}; user_id is taken from the authenticated

View file

@ -92,6 +92,14 @@ class IamEndpoint:
identity, op.capability, resource, parameters,
)
# Plumb the authenticated caller's handle through as ``actor``
# so iam-svc handlers (e.g. whoami, future actor-scoped
# checks) know who is making the request. The gateway is
# the only authority for this — body-supplied ``actor``
# values are overwritten so callers can't impersonate.
if identity is not None:
body["actor"] = identity.handle
async def responder(x, fin):
pass

View file

@ -271,27 +271,31 @@ register(Operation(
))
# API keys: workspace-level resource — keys live within a workspace.
# API keys: SYSTEM-level resource — like users, a key record exists
# in the deployment-wide keys registry. The workspace the key
# authenticates to is a property of the record, not a containment;
# it appears as a parameter so the regime can scope the admin's
# authority to issue / list / revoke against it.
register(Operation(
name="create-api-key",
capability="keys:admin",
resource_level=ResourceLevel.WORKSPACE,
extract_resource=_workspace_from_body,
extract_parameters=_no_parameters,
resource_level=ResourceLevel.SYSTEM,
extract_resource=_empty_resource,
extract_parameters=_workspace_param_only,
))
register(Operation(
name="list-api-keys",
capability="keys:admin",
resource_level=ResourceLevel.WORKSPACE,
extract_resource=_workspace_from_body,
extract_parameters=_no_parameters,
resource_level=ResourceLevel.SYSTEM,
extract_resource=_empty_resource,
extract_parameters=_workspace_param_only,
))
register(Operation(
name="revoke-api-key",
capability="keys:admin",
resource_level=ResourceLevel.WORKSPACE,
extract_resource=_workspace_from_body,
extract_parameters=_no_parameters,
resource_level=ResourceLevel.SYSTEM,
extract_resource=_empty_resource,
extract_parameters=_workspace_param_only,
))
@ -370,6 +374,13 @@ register(Operation(
extract_resource=_empty_resource,
extract_parameters=_no_parameters,
))
register(Operation(
name="bootstrap-status",
capability=PUBLIC,
resource_level=ResourceLevel.SYSTEM,
extract_resource=_empty_resource,
extract_parameters=_no_parameters,
))
register(Operation(
name="change-password",
capability=AUTHENTICATED,
@ -377,6 +388,13 @@ register(Operation(
extract_resource=_empty_resource,
extract_parameters=_no_parameters,
))
register(Operation(
name="whoami",
capability=AUTHENTICATED,
resource_level=ResourceLevel.SYSTEM,
extract_resource=_empty_resource,
extract_parameters=_no_parameters,
))
# ---------------------------------------------------------------------------

View file

@ -280,6 +280,10 @@ class IamService:
try:
if op == "bootstrap":
return await self.handle_bootstrap(v)
if op == "bootstrap-status":
return await self.handle_bootstrap_status(v)
if op == "whoami":
return await self.handle_whoami(v)
if op == "resolve-api-key":
return await self.handle_resolve_api_key(v)
if op == "create-user":
@ -483,6 +487,39 @@ class IamService:
bootstrap_admin_api_key=plaintext,
)
async def handle_whoami(self, v):
"""Return the caller's own user record. ``v.actor`` is the
authenticated identity's handle (the gateway populates it
from ``identity.handle``). No ``users:read`` capability
required every authenticated user can read themselves."""
if not v.actor:
return _err(
"invalid-argument",
"actor required (gateway should populate this)",
)
user_row = await self.table_store.get_user(v.actor)
if user_row is None:
return _err("not-found", "user not found")
return IamResponse(user=self._row_to_user_record(user_row))
async def handle_bootstrap_status(self, v):
"""Probe op: returns whether the deployment is currently in
the unconsumed-bootstrap state (i.e. ``bootstrap`` mode with
empty tables, where an explicit ``bootstrap`` call would
succeed). PUBLIC so a UI can decide whether to render the
first-run setup flow without invoking the side-effectful
``bootstrap`` op.
The information leaked is intentionally narrow: an empty
deployment in bootstrap mode is already inferable (no users,
no logins succeed); this just makes the answer explicit
instead of forcing callers to probe the masked-failure path."""
available = (
self.bootstrap_mode == "bootstrap"
and not await self.table_store.any_workspace_exists()
)
return IamResponse(bootstrap_available=available)
# ------------------------------------------------------------------
# Signing key helpers
# ------------------------------------------------------------------
@ -612,15 +649,22 @@ class IamService:
created=_iso(created),
)
async def _user_in_workspace(self, user_id, workspace):
async def _resolve_user(self, user_id, workspace=None):
"""Return (user_row, error_response_or_None). Loads the user
record, verifies it exists, is enabled, and belongs to
``workspace``. The workspace scope check rejects cross-
workspace admin attempts."""
record by id and (when ``workspace`` is supplied) verifies the
record's home workspace matches.
Workspace is an *optional integrity check* the user record
is system-level, identified by id alone. If the caller asserts
a workspace, we verify; if they omit it, we just return the
record. Authorisation (whether the caller is permitted to
operate on this user) is the gateway's responsibility via the
contract's ``authorise`` call before the handler is reached.
"""
user_row = await self.table_store.get_user(user_id)
if user_row is None:
return None, _err("not-found", "user not found")
if user_row[1] != workspace:
if workspace and user_row[1] != workspace:
return None, _err(
"operation-not-permitted",
"user is in a different workspace",
@ -665,15 +709,10 @@ class IamService:
# ------------------------------------------------------------------
async def handle_reset_password(self, v):
if not v.workspace:
return _err(
"invalid-argument",
"workspace required for reset-password",
)
if not v.user_id:
return _err("invalid-argument", "user_id required")
_, err = await self._user_in_workspace(v.user_id, v.workspace)
_, err = await self._resolve_user(v.user_id, v.workspace or None)
if err is not None:
return err
@ -690,13 +729,11 @@ class IamService:
# ------------------------------------------------------------------
async def handle_get_user(self, v):
if not v.workspace:
return _err("invalid-argument", "workspace required")
if not v.user_id:
return _err("invalid-argument", "user_id required")
user_row, err = await self._user_in_workspace(
v.user_id, v.workspace,
user_row, err = await self._resolve_user(
v.user_id, v.workspace or None,
)
if err is not None:
return err
@ -707,8 +744,6 @@ class IamService:
must_change_password. Username is immutable change it by
creating a new user and disabling the old one. Password
changes go through change-password / reset-password."""
if not v.workspace:
return _err("invalid-argument", "workspace required")
if not v.user_id:
return _err("invalid-argument", "user_id required")
if v.user is None:
@ -719,25 +754,17 @@ class IamService:
"password cannot be changed via update-user; "
"use change-password or reset-password",
)
if v.user.username and v.user.username != "":
# Compare to existing. Username-change not allowed.
existing, err = await self._user_in_workspace(
v.user_id, v.workspace,
existing, err = await self._resolve_user(
v.user_id, v.workspace or None,
)
if err is not None:
return err
if v.user.username and v.user.username != existing[2]:
return _err(
"invalid-argument",
"username is immutable; create a new user instead",
)
if err is not None:
return err
if v.user.username != existing[2]:
return _err(
"invalid-argument",
"username is immutable; create a new user "
"instead",
)
else:
existing, err = await self._user_in_workspace(
v.user_id, v.workspace,
)
if err is not None:
return err
# Carry forward fields the caller didn't provide.
(
@ -774,12 +801,10 @@ class IamService:
async def handle_disable_user(self, v):
"""Soft-delete: set enabled=false and revoke every API key
belonging to the user."""
if not v.workspace:
return _err("invalid-argument", "workspace required")
if not v.user_id:
return _err("invalid-argument", "user_id required")
_, err = await self._user_in_workspace(v.user_id, v.workspace)
_, err = await self._resolve_user(v.user_id, v.workspace or None)
if err is not None:
return err
@ -797,12 +822,10 @@ class IamService:
async def handle_enable_user(self, v):
"""Re-enable a previously disabled user. Does not restore
API keys those have to be re-issued by the admin."""
if not v.workspace:
return _err("invalid-argument", "workspace required")
if not v.user_id:
return _err("invalid-argument", "user_id required")
_, err = await self._user_in_workspace(v.user_id, v.workspace)
_, err = await self._resolve_user(v.user_id, v.workspace or None)
if err is not None:
return err
@ -821,29 +844,30 @@ class IamService:
cover GDPR erasure-style requirements). When audit logging
lands, the decision to delete vs. anonymise referenced audit
rows will need to be revisited."""
if not v.workspace:
return _err("invalid-argument", "workspace required")
if not v.user_id:
return _err("invalid-argument", "user_id required")
user_row, err = await self._user_in_workspace(
v.user_id, v.workspace,
user_row, err = await self._resolve_user(
v.user_id, v.workspace or None,
)
if err is not None:
return err
# user_row indices match get_user columns. Username is [2].
username = user_row[2]
record_workspace = user_row[1]
# Revoke all API keys.
key_rows = await self.table_store.list_api_keys_by_user(v.user_id)
for kr in key_rows:
await self.table_store.delete_api_key(kr[0])
# Remove username lookup.
# Remove username lookup — keyed on (workspace, username),
# so use the resolved workspace from the user record rather
# than relying on the caller-supplied filter.
if username:
await self.table_store.delete_username_lookup(
v.workspace, username,
record_workspace, username,
)
# Remove user record.
@ -1098,12 +1122,15 @@ class IamService:
# ------------------------------------------------------------------
async def handle_list_users(self, v):
if not v.workspace:
return _err(
"invalid-argument", "workspace required for list-users",
)
rows = await self.table_store.list_users_by_workspace(v.workspace)
# System-level operation: workspace, when supplied, is a
# filter on the user record's home-workspace association.
# Empty workspace returns the deployment-wide list — the
# gateway has already authorised the caller's authority to
# see that scope.
if v.workspace:
rows = await self.table_store.list_users_by_workspace(v.workspace)
else:
rows = await self.table_store.list_users()
return IamResponse(
users=[self._row_to_user_record(r) for r in rows],
)
@ -1113,24 +1140,21 @@ class IamService:
# ------------------------------------------------------------------
async def handle_create_api_key(self, v):
if not v.workspace:
return _err(
"invalid-argument", "workspace required for create-api-key",
)
if v.key is None or not v.key.user_id:
return _err("invalid-argument", "key.user_id required")
if not v.key.name:
return _err("invalid-argument", "key.name required")
# Target user must exist and belong to the caller's workspace.
user_row = await self.table_store.get_user(v.key.user_id)
if user_row is None:
return _err("not-found", "user not found")
if user_row[1] != v.workspace:
return _err(
"operation-not-permitted",
"target user is in a different workspace",
)
# API keys are system-level records with a workspace
# association (the user's home workspace). Workspace is an
# optional integrity check on the caller's request — when
# supplied it must match the target user's home workspace;
# when omitted, the user's home workspace is used.
user_row, err = await self._resolve_user(
v.key.user_id, v.workspace or None,
)
if err is not None:
return err
plaintext = _generate_api_key()
key_id = str(uuid.uuid4())
@ -1161,20 +1185,15 @@ class IamService:
# ------------------------------------------------------------------
async def handle_list_api_keys(self, v):
if not v.workspace:
return _err(
"invalid-argument",
"workspace required for list-api-keys",
)
if not v.user_id:
return _err(
"invalid-argument", "user_id required for list-api-keys",
)
# Workspace-scope check: user must live in this workspace.
user_row = await self.table_store.get_user(v.user_id)
if user_row is None or user_row[1] != v.workspace:
return _err("not-found", "user not found in workspace")
# Workspace is an optional integrity check.
_, err = await self._resolve_user(v.user_id, v.workspace or None)
if err is not None:
return err
rows = await self.table_store.list_api_keys_by_user(v.user_id)
return IamResponse(
@ -1186,11 +1205,6 @@ class IamService:
# ------------------------------------------------------------------
async def handle_revoke_api_key(self, v):
if not v.workspace:
return _err(
"invalid-argument",
"workspace required for revoke-api-key",
)
if not v.key_id:
return _err("invalid-argument", "key_id required")
@ -1199,13 +1213,15 @@ class IamService:
return _err("not-found", "api key not found")
key_hash, _id, user_id, _name, _prefix, _expires, _c, _lu = row
# Workspace-scope check via the owning user.
user_row = await self.table_store.get_user(user_id)
if user_row is None or user_row[1] != v.workspace:
return _err(
"operation-not-permitted",
"key belongs to a different workspace",
)
# Workspace is an optional integrity check via the owning user.
if v.workspace:
user_row = await self.table_store.get_user(user_id)
if user_row is None or user_row[1] != v.workspace:
return _err(
"operation-not-permitted",
"key belongs to a different workspace",
)
await self.table_store.delete_api_key(key_hash)
return IamResponse()

View file

@ -167,6 +167,11 @@ class IamTableStore:
roles, enabled, must_change_password, created
FROM iam_users WHERE workspace = ? ALLOW FILTERING
""")
self.list_users_stmt = c.prepare("""
SELECT id, workspace, username, name, email, password_hash,
roles, enabled, must_change_password, created
FROM iam_users
""")
self.put_username_lookup_stmt = c.prepare("""
INSERT INTO iam_users_by_username (workspace, username, user_id)
@ -304,6 +309,15 @@ class IamTableStore:
self.cassandra, self.list_users_by_workspace_stmt, (workspace,),
)
async def list_users(self):
"""List every user across the deployment. Used by the
system-level list-users handler when no workspace filter is
supplied; the gateway has already authorised the call against
the caller's authority."""
return await async_execute(
self.cassandra, self.list_users_stmt, (),
)
async def delete_user(self, id):
await async_execute(
self.cassandra, self.delete_user_stmt, (id,),