trustgraph/tests/unit/test_gateway/test_capabilities.py
cybermaggedon 5e28d3cce0
refactor(iam): pluggable IAM regime via authenticate/authorise contract (#853)
The gateway no longer holds any policy state — capability sets, role
definitions, workspace scope rules.  Per the IAM contract it asks the
regime "may this identity perform this capability on this resource?"
per request.  That moves the OSS role-based regime entirely into
iam-svc, which can be replaced (SSO, ABAC, ReBAC) without changing
the gateway, the wire protocol, or backend services.

Contract:
- authenticate(credential) -> Identity (handle, workspace,
  principal_id, source).  No roles, claims, or policy state surface
  to the gateway.
- authorise(identity, capability, resource, parameters) -> (allow,
  ttl).  Cached per-decision (regime TTL clamped above; fail-closed
  on regime errors).
- authorise_many available as a fan-out variant.

Operation registry drives every authorisation decision:
- /api/v1/iam -> IamEndpoint, looks up bare op name (create-user,
  list-workspaces, ...).
- /api/v1/{kind} -> RegistryRoutedVariableEndpoint, <kind>:<op>
  (config:get, flow:list-blueprints, librarian:add-document, ...).
- /api/v1/flow/{flow}/service/{kind} -> flow-service:<kind>.
- /api/v1/flow/{flow}/{import,export}/{kind} ->
  flow-{import,export}:<kind>.
- WS Mux per-frame -> flow-service:<kind>; closes a gap where
  authenticated users could hit any service kind.
85 operations registered across the surface.

JWT carries identity only — sub + workspace.  The roles claim is gone;
the gateway never reads policy state from a credential.

The three coarse *_KIND_CAPABILITY maps are removed.  The registry is
the only source of truth for the capability + resource shape of an
operation.  Tests migrated to the new Identity shape and to
authorise()-mocked auth doubles.

Specs updated: docs/tech-specs/iam-contract.md (Identity surface,
caching, registry-naming conventions), iam.md (JWT shape, gateway
flow, role section reframed as OSS-regime detail), iam-protocol.md
(positioned as one implementation of the contract).
2026-04-28 16:19:41 +01:00

171 lines
5.3 KiB
Python

"""
Tests for gateway/capabilities.py — the thin authorisation surface
under the IAM contract.
The gateway no longer holds policy state (roles, capability sets,
workspace scopes); those live in iam-svc. These tests cover only
what the gateway shim does itself: PUBLIC / AUTHENTICATED short-
circuiting, default-fill of workspace, and forwarding of capability
checks to ``auth.authorise``.
"""
import pytest
from aiohttp import web
from unittest.mock import AsyncMock, MagicMock
from trustgraph.gateway.capabilities import (
PUBLIC, AUTHENTICATED,
enforce, enforce_workspace,
access_denied, auth_failure,
)
# -- test fixtures ---------------------------------------------------------
class _Identity:
"""Stand-in for auth.Identity — under the IAM contract it has
just ``handle``, ``workspace``, ``principal_id``, ``source``."""
def __init__(self, handle="user-1", workspace="default"):
self.handle = handle
self.workspace = workspace
self.principal_id = handle
self.source = "api-key"
def _allow_auth(identity=None):
"""Build an Auth double that authenticates to ``identity`` and
allows every authorise() call."""
auth = MagicMock()
auth.authenticate = AsyncMock(
return_value=identity or _Identity(),
)
auth.authorise = AsyncMock(return_value=None)
return auth
def _deny_auth(identity=None):
"""Build an Auth double that authenticates but denies authorise."""
auth = MagicMock()
auth.authenticate = AsyncMock(
return_value=identity or _Identity(),
)
auth.authorise = AsyncMock(side_effect=access_denied())
return auth
# -- enforce() -------------------------------------------------------------
class TestEnforce:
@pytest.mark.asyncio
async def test_public_returns_none_no_auth(self):
auth = _allow_auth()
result = await enforce(MagicMock(), auth, PUBLIC)
assert result is None
auth.authenticate.assert_not_called()
auth.authorise.assert_not_called()
@pytest.mark.asyncio
async def test_authenticated_skips_authorise(self):
identity = _Identity()
auth = _allow_auth(identity)
result = await enforce(MagicMock(), auth, AUTHENTICATED)
assert result is identity
auth.authenticate.assert_awaited_once()
auth.authorise.assert_not_called()
@pytest.mark.asyncio
async def test_capability_calls_authorise_system_level(self):
identity = _Identity()
auth = _allow_auth(identity)
result = await enforce(MagicMock(), auth, "graph:read")
assert result is identity
auth.authorise.assert_awaited_once_with(
identity, "graph:read", {}, {},
)
@pytest.mark.asyncio
async def test_capability_denied_raises_forbidden(self):
auth = _deny_auth()
with pytest.raises(web.HTTPForbidden):
await enforce(MagicMock(), auth, "users:admin")
# -- enforce_workspace() ---------------------------------------------------
class TestEnforceWorkspace:
@pytest.mark.asyncio
async def test_default_fills_from_identity(self):
data = {"operation": "x"}
auth = _allow_auth()
await enforce_workspace(data, _Identity(workspace="default"), auth)
assert data["workspace"] == "default"
@pytest.mark.asyncio
async def test_caller_supplied_workspace_kept(self):
data = {"workspace": "acme", "operation": "x"}
auth = _allow_auth()
await enforce_workspace(data, _Identity(workspace="default"), auth)
assert data["workspace"] == "acme"
@pytest.mark.asyncio
async def test_no_capability_skips_authorise(self):
data = {"workspace": "default"}
auth = _allow_auth()
await enforce_workspace(data, _Identity(), auth)
auth.authorise.assert_not_called()
@pytest.mark.asyncio
async def test_capability_calls_authorise_with_resource(self):
data = {"workspace": "acme"}
identity = _Identity()
auth = _allow_auth(identity)
await enforce_workspace(
data, identity, auth, capability="graph:read",
)
auth.authorise.assert_awaited_once_with(
identity, "graph:read", {"workspace": "acme"}, {},
)
@pytest.mark.asyncio
async def test_capability_denied_propagates(self):
data = {"workspace": "acme"}
auth = _deny_auth()
with pytest.raises(web.HTTPForbidden):
await enforce_workspace(
data, _Identity(), auth, capability="users:admin",
)
@pytest.mark.asyncio
async def test_non_dict_passthrough(self):
auth = _allow_auth()
result = await enforce_workspace("not-a-dict", _Identity(), auth)
assert result == "not-a-dict"
auth.authorise.assert_not_called()
# -- helpers ---------------------------------------------------------------
class TestResponseHelpers:
def test_auth_failure_is_401(self):
exc = auth_failure()
assert exc.status == 401
assert "auth failure" in exc.text
def test_access_denied_is_403(self):
exc = access_denied()
assert exc.status == 403
assert "access denied" in exc.text
class TestSentinels:
def test_public_and_authenticated_are_distinct(self):
assert PUBLIC != AUTHENTICATED