From 5e28d3cce0231afa24930aa9e2b339e8ef5de153 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 28 Apr 2026 16:19:41 +0100 Subject: [PATCH] refactor(iam): pluggable IAM regime via authenticate/authorise contract (#853) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The gateway no longer holds any policy state — capability sets, role definitions, workspace scope rules. Per the IAM contract it asks the regime "may this identity perform this capability on this resource?" per request. That moves the OSS role-based regime entirely into iam-svc, which can be replaced (SSO, ABAC, ReBAC) without changing the gateway, the wire protocol, or backend services. Contract: - authenticate(credential) -> Identity (handle, workspace, principal_id, source). No roles, claims, or policy state surface to the gateway. - authorise(identity, capability, resource, parameters) -> (allow, ttl). Cached per-decision (regime TTL clamped above; fail-closed on regime errors). - authorise_many available as a fan-out variant. Operation registry drives every authorisation decision: - /api/v1/iam -> IamEndpoint, looks up bare op name (create-user, list-workspaces, ...). - /api/v1/{kind} -> RegistryRoutedVariableEndpoint, : (config:get, flow:list-blueprints, librarian:add-document, ...). - /api/v1/flow/{flow}/service/{kind} -> flow-service:. - /api/v1/flow/{flow}/{import,export}/{kind} -> flow-{import,export}:. - WS Mux per-frame -> flow-service:; closes a gap where authenticated users could hit any service kind. 85 operations registered across the surface. JWT carries identity only — sub + workspace. The roles claim is gone; the gateway never reads policy state from a credential. The three coarse *_KIND_CAPABILITY maps are removed. The registry is the only source of truth for the capability + resource shape of an operation. Tests migrated to the new Identity shape and to authorise()-mocked auth doubles. Specs updated: docs/tech-specs/iam-contract.md (Identity surface, caching, registry-naming conventions), iam.md (JWT shape, gateway flow, role section reframed as OSS-regime detail), iam-protocol.md (positioned as one implementation of the contract). --- docs/tech-specs/capabilities.md | 95 +++- docs/tech-specs/data-ownership-model.md | 32 +- ...nition.md => flow-blueprint-definition.md} | 0 docs/tech-specs/iam-contract.md | 366 +++++++++++++ docs/tech-specs/iam-protocol.md | 48 +- docs/tech-specs/iam.md | 323 ++++++++--- tests/unit/test_gateway/test_auth.py | 165 +++++- tests/unit/test_gateway/test_capabilities.py | 236 ++++---- .../test_gateway/test_endpoint_manager.py | 16 +- .../test_socket_graceful_shutdown.py | 17 +- trustgraph-base/trustgraph/base/iam_client.py | 44 +- .../trustgraph/schema/services/iam.py | 27 + trustgraph-flow/trustgraph/gateway/auth.py | 141 ++++- .../trustgraph/gateway/capabilities.py | 234 ++------ .../trustgraph/gateway/dispatch/mux.py | 51 +- .../gateway/endpoint/auth_endpoints.py | 2 +- .../gateway/endpoint/constant_endpoint.py | 2 +- .../gateway/endpoint/iam_endpoint.py | 106 ++++ .../trustgraph/gateway/endpoint/manager.py | 177 +++--- .../gateway/endpoint/registry_endpoint.py | 123 +++++ .../trustgraph/gateway/endpoint/socket.py | 10 +- .../gateway/endpoint/variable_endpoint.py | 2 +- .../trustgraph/gateway/registry.py | 515 ++++++++++++++++++ trustgraph-flow/trustgraph/iam/service/iam.py | 214 +++++++- 24 files changed, 2359 insertions(+), 587 deletions(-) rename docs/tech-specs/{flow-class-definition.md => flow-blueprint-definition.md} (100%) create mode 100644 docs/tech-specs/iam-contract.md create mode 100644 trustgraph-flow/trustgraph/gateway/endpoint/iam_endpoint.py create mode 100644 trustgraph-flow/trustgraph/gateway/endpoint/registry_endpoint.py create mode 100644 trustgraph-flow/trustgraph/gateway/registry.py diff --git a/docs/tech-specs/capabilities.md b/docs/tech-specs/capabilities.md index 60f5acbf..7717cbc9 100644 --- a/docs/tech-specs/capabilities.md +++ b/docs/tech-specs/capabilities.md @@ -8,22 +8,41 @@ parent: "Tech Specs" ## Overview -Authorisation in TrustGraph is **capability-based**. Every gateway -endpoint maps to exactly one *capability*; a user's roles each grant -a set of capabilities; an authenticated request is permitted when -the required capability is a member of the union of the caller's -role capability sets. +Every gateway endpoint maps to exactly one *capability* — a string +from a closed vocabulary defined in this document. When the +gateway authorises a request, it hands the IAM regime four things: +the authenticated identity, the required capability, the +operation's resource (the structured identifier of what's being +operated on), and the operation's parameters. The IAM regime +decides allow or deny; see the [IAM contract](iam-contract.md) for +the full abstraction. -This document defines the capability vocabulary — the closed list -of capability strings that the gateway recognises — and the -open-source edition's role bundles. +A capability is a **permission**, not a structural classification. +`graph:read` says "the caller may read graphs"; it does not say +where graphs live or how they are addressed. The shape of a +request — whether workspace appears in the URL, the envelope, or +the body, and whether it is a resource address component or an +operation parameter — is determined by what the operation operates +on, not by what permission it requires. Permission and structure +are orthogonal; the contract takes both. -The capability mechanism is shared between open-source and potential -3rd party enterprise capability. The open-source edition ships a -fixed three-role bundle (`reader`, `writer`, `admin`). Enterprise -capability may define additional roles by composing their own -capability bundles from the same vocabulary; no protocol, gateway, -or backend-service change is required. +This document defines: + +- The **capability vocabulary** — the closed list of capability + strings the gateway uses as input to `authorise`. All IAM + regimes share this vocabulary; that's the only schema the + gateway and the IAM regime have to agree on. +- The **open-source role bundles** — the role-and-scope table the + OSS IAM regime uses to answer `authorise` calls. Other regimes + answer the same call differently; the bundles below are an + OSS-specific implementation detail, not a contract assertion. + +A regime may evaluate `authorise` using role bundles (OSS), IdP +group memberships, attribute-based policies, relationship tuples, +or any other mechanism. The gateway is unaware of which. The +capability strings — and the resource component vocabulary the +gateway populates alongside them — are the only thing both sides +have to agree on. ## Motivation @@ -113,19 +132,50 @@ granting `llm` expresses exactly that. An administrator granting `agent` should treat it as a grant of everything the agent composes at deployment time. -### Authorisation evaluation +### Authorisation evaluation (OSS regime) + +This section describes how the OSS IAM regime answers +`authorise(identity, capability, resource, parameters)`. Other +regimes answer the same contract differently; only the inputs (the +capability vocabulary, the resource components, the parameter +shape) are shared. For a request bearing a resolved set of roles -`R = {r1, r2, ...}` against an endpoint that requires capability -`c`: +`R = {r1, r2, ...}`, a required capability `c`, a resource, and +parameters: ``` -allow if c IN union(bundle(r) for r in R) +let target_workspace = + resource.workspace (workspace-/flow-level resources) + or parameters.workspace (system-level resources whose + parameters reference a workspace) + or unset (system-level operations with no + workspace context) + +allow if some role r in R has c in its capability bundle + and (target_workspace is unset + or r's workspace_scope permits target_workspace) ``` -No hierarchy, no precedence, no role-order sensitivity. A user +The OSS regime considers workspace from whichever role it plays in +the operation: + +- For workspace-level and flow-level resources, the workspace lives + in `resource.workspace` and that is what the role's scope is + checked against. +- For system-level resources whose operation parameters reference a + workspace (e.g. `create-user with workspace association W`), + workspace lives in `parameters.workspace` and that is what the + role's scope is checked against. The resource is system-level + (`resource = {}`) but the workspace constraint still bites. +- For system-level operations with no workspace context (e.g. + `bootstrap`, `rotate-signing-key`), the workspace-scope check + collapses — only capability-bundle membership matters. + +No hierarchy, no precedence, no role-order sensitivity. A user with a single role is the common case; a user with multiple roles -gets the union of their bundles. +is allowed if any role independently grants both the capability +and the relevant workspace scope. ### Enforcement boundary @@ -214,5 +264,10 @@ ships that feature. ## References +- [IAM Contract Specification](iam-contract.md) — the abstract + gateway↔IAM regime contract; capability strings are inputs to + `authorise`. - [Identity and Access Management Specification](iam.md) +- [IAM Service Protocol Specification](iam-protocol.md) — the OSS + regime's wire-level protocol. - [Architecture Principles](architecture-principles.md) diff --git a/docs/tech-specs/data-ownership-model.md b/docs/tech-specs/data-ownership-model.md index ea94ec46..b112d195 100644 --- a/docs/tech-specs/data-ownership-model.md +++ b/docs/tech-specs/data-ownership-model.md @@ -22,8 +22,16 @@ are the boundaries around data, and who owns what? A workspace is the primary isolation boundary. It represents an organisation, team, or independent operating unit. All data belongs to -exactly one workspace. Cross-workspace access is never permitted through -the API. +exactly one workspace. + +Cross-workspace access through the API is gated by the IAM regime +(see [`iam-contract.md`](iam-contract.md)). In the OSS distribution, +the role table defined in [`capabilities.md`](capabilities.md) +permits cross-workspace operation only to the `admin` role; the +`reader` and `writer` roles are constrained to a single assigned +workspace per credential. Other regimes can model the relationship +between identity and workspace differently — the gateway makes no +assumption. A workspace owns: - Source documents @@ -279,9 +287,18 @@ A typical workflow: The current codebase uses a `user` field in message metadata and storage partition keys to identify the workspace. The `collection` field -identifies the collection within that workspace. The IAM spec describes -how the gateway maps authenticated credentials to a workspace identity -and sets these fields. +identifies the collection within that workspace. + +The gateway is the single point at which workspace gets stamped onto +outbound pub/sub messages. An incoming credential authenticates to a +workspace (the credential's binding, not a user-to-workspace lookup — +see [`iam-contract.md`](iam-contract.md) and the *Identity surface* +section of [`iam.md`](iam.md)); any caller-supplied workspace on the +request is reconciled against the authenticated identity by the IAM +regime; the resolved value is what the gateway writes into outgoing +messages and the storage layers' partition keys. Backend services +trust the workspace they receive — defense-in-depth happens at the +gateway, not at the bus. For details on how each storage backend implements this scoping, see: @@ -302,7 +319,10 @@ For details on how each storage backend implements this scoping, see: ## References -- [Identity and Access Management](iam.md) +- [IAM Contract](iam-contract.md) — gateway↔IAM regime abstraction. +- [Identity and Access Management](iam.md) — gateway-side framing. +- [Capability Vocabulary](capabilities.md) — capability strings and + the OSS role bundles that decide cross-workspace eligibility. - [Collection Management](collection-management.md) - [Entity-Centric Graph](entity-centric-graph.md) - [Neo4j User Collection Isolation](neo4j-user-collection-isolation.md) diff --git a/docs/tech-specs/flow-class-definition.md b/docs/tech-specs/flow-blueprint-definition.md similarity index 100% rename from docs/tech-specs/flow-class-definition.md rename to docs/tech-specs/flow-blueprint-definition.md diff --git a/docs/tech-specs/iam-contract.md b/docs/tech-specs/iam-contract.md new file mode 100644 index 00000000..3289add1 --- /dev/null +++ b/docs/tech-specs/iam-contract.md @@ -0,0 +1,366 @@ +--- +layout: default +title: "IAM Contract Technical Specification" +parent: "Tech Specs" +--- + +# IAM Contract Technical Specification + +## Overview + +The IAM contract is the abstraction between the API gateway and any +identity / access management regime that fronts it. The gateway +treats IAM as a black box behind two operations — *authenticate* and +*authorise* — plus a small surface of management operations. No +regime-specific concept (roles, scopes, groups, claims, policy +languages) is visible to the gateway, and no gateway-specific +concept (capability vocabulary, request anatomy) is visible to +backend services. + +The TrustGraph open-source distribution ships one IAM regime — a +role-based implementation defined in +[`iam-protocol.md`](iam-protocol.md) — that is one implementation of +this contract. Enterprise editions can replace it with a different +regime (OIDC / SSO, ABAC, ReBAC, external policy engine) without +changing the gateway, the wire protocol, or the backends. + +## Motivation + +Authorisation models vary by deployment. A small team might be +happy with three predefined roles; an enterprise might need group- +mapping from an upstream IdP, attribute-based policies, or +relationship-based access control. Hard-wiring any one of those +into the gateway forces every other regime to either compromise its +model or be re-implemented. + +A narrow contract — "authenticate this credential" and "may this +identity perform this operation on this resource" — captures what +the gateway actually needs to know without committing to a policy +shape. The IAM regime owns the policy decision; the gateway is a +generic enforcement point. + +## Operations + +### `authenticate` + +``` +authenticate(credential: bytes) → Identity | AuthFailure +``` + +Validates a credential the client presented. The gateway treats +the credential as opaque bytes — for the OSS regime today that's +either an API key plaintext or a JWT, but the gateway does not +parse them; the IAM regime decides. + +On success, returns an `Identity`. On any failure the IAM regime +returns the same opaque `AuthFailure` — never a description of which +condition failed. This is the spec's masked-error rule: an +attacker probing the endpoint cannot distinguish "no such key", +"expired", "wrong signature", "revoked", "user disabled", etc. + +### `authorise` + +``` +authorise(identity: Identity, + capability: str, + resource: Resource, + parameters: dict) + → Decision +``` + +Asks whether the identity is permitted to perform the named +capability on the named resource, given the operation's +parameters. Returns `allow` or `deny`. `identity` is whatever +`authenticate` returned for this caller; the gateway never +decomposes it. + +The four arguments separate concerns: + +- **`identity`** — who is asking. +- **`capability`** — what permission they are exercising (e.g. + `users:write`, `graph:read`). Permission, not structure. +- **`resource`** — what is being operated on, as a structured + identifier. See *The Resource model* below. +- **`parameters`** — operation-specific data that the regime may + need to consider beyond the resource identifier. Used when a + decision depends on attributes the request supplies — e.g. an + admin scoped to one workspace creating a user *with workspace + association W*: the resource is the system-level user registry, + and W is a parameter the regime checks against the admin's + scope. + +Different regimes use the four arguments differently — the OSS +regime checks role bundles against the capability and the role's +workspace scope against parameters; an SSO regime might consult an +upstream IdP's group memberships; an ABAC regime evaluates a +policy with all four as inputs. The contract is unchanged. + +### `authorise_many` + +``` +authorise_many(identity: Identity, + checks: list[(str, Resource, dict)]) + → list[Decision] +``` + +Bulk variant of `authorise`. Same semantics, one round-trip for +many decisions. Used when an operation fans out to multiple +resources (e.g. an agent that touches several workspaces) and a +single permission check isn't sufficient. + +`authorise_many` is not just a performance optimisation; it pins +the contract for fan-out operations early, before clients (or +internal callers) build patterns that assume one-permission-check- +per-request. Regimes implement it as a loop over `authorise` +unless they have a more efficient path. + +### Management operations + +Beyond the request-time `authenticate` / `authorise`, the contract +also covers identity-lifecycle and credential-lifecycle operations +that are invoked by administrative requests rather than by the +authentication path. These are regime-specific in detail (an SSO +regime that delegates user management to the IdP may not implement +most of them) but the operation set the gateway can forward is: + +- User management: `create-user`, `list-users`, `get-user`, + `update-user`, `disable-user`, `enable-user`, `delete-user` +- Credential management: `create-api-key`, `list-api-keys`, + `revoke-api-key`, `change-password`, `reset-password` +- Workspace management: `create-workspace`, `list-workspaces`, + `get-workspace`, `update-workspace`, `disable-workspace` +- Session management: `login` +- Key management: `get-signing-key-public`, `rotate-signing-key` +- Bootstrap: `bootstrap` + +A regime that does not support one of these (e.g. an SSO regime +where users are managed in the IdP) returns a defined "not +supported" error; the gateway surfaces it as a 501. + +## The `Identity` surface + +`Identity` is *mostly* opaque. The gateway holds the value as a +token to quote back when calling `authorise`, never decomposing it. +But there are a few gateway-side concerns that need a small +surface: + +| Field | Purpose | +|---|---| +| `handle` | Opaque reference passed back to `authorise`. Regime-defined; gateway treats as a string. | +| `workspace` | The workspace this credential authenticates to. Used by the gateway only as a default-fill-in for operations that omit a workspace. Never used as policy input — when authorisation needs to know which workspace the operation acts on, the operation places it in the resource address (or a parameter), and the regime decides. | +| `principal_id` | Stable identifier the gateway logs for audit (a user id, a sub claim, a service account id). Never used for authorisation — that's `authorise`'s job. | +| `source` | How the credential was presented (`api-key`, `jwt`, …). Non-policy; useful for logs and metrics only. | + +Anything else — roles, claims, group memberships, policy attributes +— stays inside the regime and is reachable only via `authorise`. + +## The `Resource` model + +A `Resource` is a structured value identifying *what is being +operated on*. Resources live at one of three levels in TrustGraph, +based on where the resource exists in the deployment: + +### Resource levels + +| Level | What lives there | Resource shape | +|---|---|---| +| **System** | The user registry, the workspace registry, the signing key, the audit log — anything that exists once per deployment. | `{}` | +| **Workspace** | A workspace's config, flow definitions, library (documents), knowledge cores, collections — things that exist *within* a workspace. | `{workspace: "..."}` | +| **Flow** | A flow's knowledge graph, agent state, LLM context, embedding state, MCP context — things that exist *within* a flow within a workspace. | `{workspace: "...", flow: "..."}` | + +Note carefully: + +- **Users are a system-level resource.** A user record exists at + the deployment level; the fact that a user has a *workspace + association* (one in OSS, possibly many in other regimes) is a + property of the user record, not a containment. Operations on + the user registry have `resource = {}`; the workspace + association appears as a *parameter*, not as a resource address + component. +- **Workspaces themselves are a system-level resource.** The + workspace registry exists at the deployment level. `create- + workspace` and `list-workspaces` are system-level operations; + the workspace identifier in their bodies is a parameter, not an + address. +- **A workspace's contents are workspace-level resources.** A + workspace's config, flows, library, etc. live within a + workspace. Their resource address is `{workspace: ...}`. +- **A flow's contents are flow-level resources.** Knowledge + graphs, agents, etc. live within a flow. Their resource + address is `{workspace: ..., flow: ...}`. + +### Component vocabulary + +| Component | Type | Meaning | Used by | +|---|---|---|---| +| `workspace` | string | Identifier of the workspace whose contents are being operated on | workspace-level and flow-level resource addresses | +| `flow` | string | Identifier of a flow within a workspace; always paired with `workspace` | flow-level resource addresses | +| `collection` | string | Reserved for finer-grained scoping within a workspace | future / enterprise | +| `document` | string | Reserved for per-document scoping | future / enterprise | + +A `Resource` is a partial mapping of these components to values. +The level of the resource (system / workspace / flow) determines +which components must be present. An empty `{}` is the +system-level resource. + +### Workspace as parameter vs. address + +Workspace plays two distinct roles in operations and shows up in +two distinct places: + +- **As a resource address component** — workspace identifies the + thing being operated on. Lives in `resource.workspace`. Example: + `config:read` reads the config *of* workspace W. +- **As an operation parameter** — workspace is data the operation + acts on or filters by, while the resource itself is system-level. + Lives in `parameters.workspace`. Example: `users:write` + creates a user *with workspace association* W; the resource is + the user registry (system), and W is a parameter. + +These are not interchangeable. The IAM regime considers each role +separately; the OSS role table, for instance, applies workspace- +scope to the address component when checking workspace-level +operations, and to a parameter when checking +"create-user-with-workspace-W". Both end up enforcing the admin's +scope, but through different code paths. + +### Extension rules + +The vocabulary is closed but extensible. Adding a new component: + +1. The component is added to the vocabulary in this spec, with a + defined name, type, and meaning. +2. Existing IAM regimes ignore unknown components (forward + compatibility — adding a new component does not break older + regimes that don't understand it). +3. Older gateways that don't populate a new component leave it + unset; regimes that need it for a decision treat "unset" as + "absent" and decide accordingly (typically: cannot grant + permission scoped to a component the gateway didn't supply). + +A regime that wants stricter behaviour (e.g. fail-closed on +unknown components rather than ignoring them) declares so as part +of its own configuration; the contract default is "ignore unknown". + +## Operation registry (gateway-side) + +Mapping a request onto `(capability, resource, parameters)` is +service-specific — it cannot be inferred from the capability +alone. The gateway maintains an **operation registry** that +declares, per operation: + +- The required capability. +- The resource level (system / workspace / flow) — determines the + shape of the resource identifier. +- How to extract the resource address components (workspace, + flow) from the request — from URL path, WebSocket envelope, or + body. +- Which body fields are operation parameters (and which of those + the IAM regime should see in the `parameters` argument). + +This registry is part of the gateway's endpoint declarations, not +part of the IAM contract. The contract specifies what arguments +`authorise` receives; how the gateway populates them is its own +concern. + +In the OSS gateway, registry keys follow these conventions: + +| Pattern | Used by | Resource level | +|---|---|---| +| bare op name (`create-user`, `list-users`, `login`, …) | `/api/v1/iam` and the auth surface | system / workspace, per op | +| `:` (`config:get`, `flow:list-blueprints`, `librarian:add-document`, …) | `/api/v1/{kind}` (workspace-scoped global services) | workspace | +| `flow-service:` (`flow-service:agent`, `flow-service:graph-rag`, …) | `/api/v1/flow/{flow}/service/{kind}` and the WS Mux | flow | +| `flow-import:` / `flow-export:` | `/api/v1/flow/{flow}/{import,export}/{kind}` streaming sockets | flow | + +Keys are an OSS-gateway implementation detail — the contract does +not constrain naming. The conventions above exist so the registry +key is uniquely derivable from the request path and (where +applicable) body without ambiguity. + +## Caching + +Both `authenticate` and `authorise` results are cached at the +gateway, on different policies: + +- **`authenticate`** — cached by a hash of the credential. The OSS + gateway uses a fixed short TTL (currently 60 s) so that revoked + API keys and disabled users stop working within the TTL window + without any push mechanism. Regimes that want a different + behaviour can return an `expires` hint with the identity; the + gateway honours the smaller of `expires` and its own ceiling. + +- **`authorise`** — cached by a hash of `(handle, capability, + resource, parameters)`. The regime returns a suggested TTL with + the decision; the gateway clamps it above by a deployment-set + ceiling (currently 60 s). Both allow and deny decisions are + cached; denies briefly, to avoid hammering the regime with + repeated rejected attempts. + +The TTL ceiling caps the revocation latency window — a role +revoked at the regime takes effect at the gateway no later than +the ceiling. Operators that need stricter revocation can lower +the ceiling. + +## Failure modes + +| Condition | Behaviour | +|---|---| +| `authenticate` returns AuthFailure | Gateway responds 401 with the masked `auth failure` body. | +| `authorise` returns deny | Gateway responds 403 with the masked `access denied` body. | +| IAM regime unreachable | Gateway responds 401 / 503 (deployment-defined). No fail-open. | +| `authorise_many` partial deny | Gateway treats the request as denied; the operation is rejected. Partial-success semantics are not part of the contract. | +| Regime returns "not supported" for a management operation | Gateway responds 501. | + +There is no fallback or "soft" decision path. An IAM regime that +is unavailable, slow, or returning errors causes requests to fail +closed. + +## Implementations + +### Open-source role-based regime + +Defined in [`iam-protocol.md`](iam-protocol.md). Implements the +contract via: + +- A pub/sub request/response service (`iam-svc`) reached only by + the gateway over the message bus. +- Credentials are API keys (opaque) or JWTs (Ed25519, locally + validated by the gateway against the regime's published public + key). +- `authorise` reduces to a role-and-workspace-scope check against + the role table defined in [`capabilities.md`](capabilities.md). +- Identity, user, and workspace records live in Cassandra. + +The OSS regime is deliberately simple — three roles, single +home-workspace per user (a regime data-model decision, not a +contract assertion), no policy language. + +### Future regimes + +The contract is shaped to admit, without code change in the +gateway: + +- **OIDC / SSO** — `authenticate` validates an OIDC ID token via + the IdP's JWKS; `Identity.handle` carries the verified subject + and group claims; `authorise` evaluates against group-to- + capability mappings configured at the regime. +- **ABAC / Policy engine** — `authorise` calls out to a policy + engine (Rego, Cedar, custom DSL) with the identity's attributes + and the resource as the policy input. +- **ReBAC (Zanzibar-style)** — `authorise` translates `(identity, + capability, resource)` into a relationship-tuple lookup against + a tuple store. +- **Hybrid** — multiple regimes composed: e.g. authenticate via + SSO, authorise via local policy. + +None of these require gateway changes. The contract surface is +the same; the regime is what differs. + +## References + +- [Identity and Access Management Specification](iam.md) — overall + design and the gateway-side framing. +- [IAM Service Protocol Specification](iam-protocol.md) — the OSS + regime's wire-level protocol. +- [Capability Vocabulary Specification](capabilities.md) — the + capability strings the gateway uses as `authorise` input. diff --git a/docs/tech-specs/iam-protocol.md b/docs/tech-specs/iam-protocol.md index 8049ebfe..603d1c06 100644 --- a/docs/tech-specs/iam-protocol.md +++ b/docs/tech-specs/iam-protocol.md @@ -8,21 +8,41 @@ parent: "Tech Specs" ## Overview -The IAM service is a backend processor, reached over the standard -request/response pub/sub pattern. It is the authority for users, -workspaces, API keys, and login credentials. The API gateway -delegates to it for authentication resolution and for all user / -workspace / key management. +This document specifies the wire protocol of the **open-source IAM +regime** — one implementation of the abstract IAM contract defined +in [`iam-contract.md`](iam-contract.md). Other regimes (OIDC / SSO, +ABAC, ReBAC, external policy engines) implement the same contract +with different transports, data models, and policy semantics; the +gateway is unaware of which regime it's wired against. -This document defines the wire protocol: the `IamRequest` and -`IamResponse` dataclasses, the operation set, the per-operation -input and output fields, the error taxonomy, and the initial HTTP -forwarding endpoint used while IAM is being integrated into the -gateway. +The OSS regime is a backend processor (`iam-svc`) reached over the +standard request/response pub/sub pattern. It owns users, +workspaces, API keys, login credentials, and JWT signing keys, all +backed by Cassandra. The API gateway is its only caller. -Architectural context — roles, capabilities, workspace scoping, -enforcement boundary — lives in [`iam.md`](iam.md) and -[`capabilities.md`](capabilities.md). +This document defines: + +- the `IamRequest` and `IamResponse` dataclasses on the bus, +- the operation set the OSS regime implements, +- per-operation input and output fields, +- the error taxonomy, +- the bootstrap modes, +- the initial HTTP forwarding endpoint used while the protocol is + being exercised. + +The mapping from this regime onto the abstract contract is direct: + +| Contract operation | OSS regime operation | +|---|---| +| `authenticate(credential)` | `resolve-api-key` (for API keys); local JWT validation against `get-signing-key-public` (for JWTs) | +| `authorise(identity, capability, resource, parameters)` | Role-table lookup against the OSS role bundles defined in [`capabilities.md`](capabilities.md), gated by workspace scope. Workspace can come from the resource address (workspace- and flow-level resources) or from a parameter (system-level resources whose parameters reference a workspace, e.g. `create-user with workspace association W`). | +| `authorise_many` | Loop over `authorise` | +| Identity / credential / workspace management | `create-user`, `create-api-key`, etc. as listed below. These are operations on system-level resources (the user / workspace / credential registries); workspace, where it appears in the body, is a parameter. | + +Architectural context — roles, capabilities, workspace as resource +scope, enforcement boundary — lives in [`iam.md`](iam.md) and +[`capabilities.md`](capabilities.md). The contract abstraction +lives in [`iam-contract.md`](iam-contract.md). ## Transport @@ -345,5 +365,7 @@ lands in the subsequent middleware work. ## References +- [IAM Contract Specification](iam-contract.md) — the abstract + gateway↔IAM regime contract this protocol implements. - [Identity and Access Management Specification](iam.md) - [Capability Vocabulary Specification](capabilities.md) diff --git a/docs/tech-specs/iam.md b/docs/tech-specs/iam.md index 50b64444..a764535e 100644 --- a/docs/tech-specs/iam.md +++ b/docs/tech-specs/iam.md @@ -199,9 +199,9 @@ The server rejects all non-auth messages until authentication succeeds. The socket remains open on auth failure, allowing the client to retry with a different token without reconnecting. The client can also send a new auth message at any time to re-authenticate — for example, to -refresh an expiring JWT or to switch workspace. The -resolved identity (user, workspace, roles) is updated on each -successful auth. +refresh an expiring JWT or to switch workspace. The resolved +identity (handle, workspace, principal_id, source) is updated on +each successful auth. #### API keys @@ -219,7 +219,7 @@ For programmatic access: CLI tools, scripts, and integrations. On each request, the gateway resolves an API key by: 1. Hashing the token. -2. Checking a local cache (hash → user/workspace/roles). +2. Checking a local cache (hash → identity). 3. On cache miss, calling the IAM service to resolve. 4. Caching the result with a short TTL (e.g. 60 seconds). @@ -233,9 +233,15 @@ For interactive access via the UI or WebSocket connections. - A user logs in with username and password. The gateway forwards the request to the IAM service, which validates the credentials and returns a signed JWT. -- The JWT carries the user ID, workspace, and roles as claims. +- The JWT carries identity-binding claims only — user id (`sub`) + and the workspace this credential authenticates to. No roles, + no policy state. Per the IAM contract, all policy decisions go + through `authorise`; the gateway never reads roles or other + regime-internal state from the credential. - The gateway validates JWTs locally using the IAM service's public - signing key — no service call needed on subsequent requests. + signing key — no service call needed for the authentication step; + authorisation calls remain per-request (cached per the contract's + caching rules). - Token expiry is enforced by standard JWT validation at the time the request (or WebSocket connection) is made. - For long-lived WebSocket connections, the JWT is validated at connect @@ -285,35 +291,82 @@ authentication uses API keys or JWTs. On first start, the bootstrap process creates a default workspace and admin user with an initial API key. -### User identity +### Identity, credentials, and workspace binding -A user belongs to exactly one workspace. The design supports extending -this to multi-workspace access in the future (see -[Extension points](#extension-points)). +The gateway never asks "which workspace does *this user* belong to?". +That question forces every IAM regime to expose a user-to-workspace +mapping, which prevents regimes where the relationship is many-to-many +or doesn't exist (e.g. SSO with IdP-driven workspace selection). +Instead, the gateway asks "which workspace does *this credential* +authenticate to?" — a question every regime can answer in its own +terms. -A user record contains: +A credential (API key, JWT, OIDC token, etc.) is **bound to a +workspace at issue time**. The IAM regime decides what binding +means: + +- **OSS regime** — each user has a home workspace; credentials + issued to that user are bound to that workspace. A 1:1 + user-to-workspace constraint is an internal data-model decision, + not a contract assertion. +- **Multi-workspace regime** (future / enterprise) — a user with + access to several workspaces gets a different credential per + workspace. Each credential authenticates to exactly one + workspace; the relationship between user and workspace is a + regime-internal detail the gateway does not see. + +When the gateway authenticates a credential, the IAM regime returns +an `Identity` whose `workspace` is the workspace this credential is +for. That value — not "the user's workspace" — is what the gateway +uses for default-fill-in and as input to the IAM `authorise` call. + +#### Identity surface + +What the gateway holds after `authenticate`: + +| Field | Purpose | +|-------|---------| +| `handle` | Opaque token quoted back when calling `authorise`. Regime-defined. | +| `workspace` | The workspace this credential authenticates to. Used as the default if a request omits workspace. | +| `principal_id` | Stable identifier for audit logging (a user id, sub claim, service account id). Never used for authorisation. | +| `source` | How the credential was presented (`api-key`, `jwt`). Logged with audit events; not policy input. | + +Anything else — roles, claims, group memberships, policy attributes +— stays inside the regime and is reachable only via `authorise`. +See [`iam-contract.md`](iam-contract.md) for the full contract. + +#### OSS user record + +The OSS regime stores the following per user. These fields are +**OSS-implementation specifics**, not part of the contract. | Field | Type | Description | |-------|------|-------------| | `id` | string | Unique user identifier (UUID) | | `name` | string | Display name | | `email` | string | Email address (optional) | -| `workspace` | string | Workspace the user belongs to | +| `workspace` | string | Home workspace; default binding for issued credentials | | `roles` | list[string] | Assigned roles (e.g. `["reader"]`) | | `enabled` | bool | Whether the user can authenticate | | `created` | datetime | Account creation timestamp | -The `workspace` field maps to the existing `user` field in `Metadata`. -This means the storage-layer isolation (Cassandra, Neo4j, Qdrant -filtering by `user` + `collection`) works without changes — the gateway -sets the `user` metadata field to the authenticated user's workspace. +The `workspace` field on a user record is the **default binding** +used when issuing credentials, not a constraint visible to the +gateway. An enterprise regime may have no user records at all +(authentication delegated to an IdP). ### Workspaces -A workspace is an isolated data boundary. Users belong to a workspace, -and all data operations are scoped to it. Workspaces map to the existing -`user` field in `Metadata` and the corresponding Cassandra keyspace, -Qdrant collection prefix, and Neo4j property filters. +A workspace is an isolated data boundary — a tenancy scope in which +users, flows, configuration, documents, and knowledge graphs live. +Workspaces map to storage-layer isolation: the `user` field in +`Metadata`, the corresponding Cassandra keyspace, the Qdrant +collection prefix, the Neo4j property filter. + +Workspace is the most prominent component of an operation's +**resource scope**: when a request says "do X to Y", workspace is +part of "Y". Listing users, creating flows, querying the graph — +all of these target a specific workspace. | Field | Type | Description | |-------|------|-------------| @@ -322,57 +375,164 @@ Qdrant collection prefix, and Neo4j property filters. | `enabled` | bool | Whether the workspace is active | | `created` | datetime | Creation timestamp | -All data operations are scoped to a workspace. The gateway determines -the effective workspace for each request as follows: +#### Default-fill-in -1. If the request includes a `workspace` parameter, validate it against - the user's assigned workspace. - - If it matches, use it. - - If it does not match, return 403. (This could be extended to - check a workspace access grant list.) -2. If no `workspace` parameter is provided, use the user's assigned - workspace. +If a request omits workspace, the gateway fills it in from the +authenticated identity's bound workspace (`identity.workspace`) +before any IAM check runs. IAM never receives an unresolved +workspace; every `authorise` call sees a concrete value. -The gateway sets the `user` field in `Metadata` to the effective -workspace ID, replacing the caller-supplied `?user=` query parameter. +#### Authorisation -This design ensures forward compatibility. Clients that pass a -workspace parameter will work unchanged if multi-workspace support is -added later. Requests for an unassigned workspace get a clear 403 -rather than silent misbehaviour. +Whether the resolved workspace is permitted to be operated on by +this caller is an **IAM decision**, not a gateway one. The gateway +calls `authorise(identity, capability, {workspace: ..., ...})` and +relays the answer. In the OSS regime, the answer comes from the +caller's role × workspace-scope — see [`capabilities.md`](capabilities.md). +In other regimes it could come from group mappings, policies, +relationship tuples, or anything else the regime models. + +### Request anatomy + +The shape of a request — where workspace appears, where flow +appears, where parameters live — follows from **the level of the +resource being operated on**, not from any single property of the +request like its URL or its required capability. + +Resources live at one of three levels (see also the resource model +in [`iam-contract.md`](iam-contract.md)): + +| Resource level | Examples | Resource address | +|---|---|---| +| **System** | The user registry, the workspace registry, the IAM signing key, the audit log | empty `{}` | +| **Workspace** | A workspace's config, flow definitions, library, knowledge cores, collections | `{workspace: ...}` | +| **Flow** | A flow's knowledge graph, agent state, LLM context, embeddings, MCP context | `{workspace: ..., flow: ...}` | + +For the gateway-to-bus mapping this dictates **where workspace +lives in the message**, but only when workspace is part of the +*resource address*. Workspace can also appear as an *operation +parameter* on system-level resources (see below). + +#### Workspace as address vs. parameter + +Two distinct roles, two distinct locations: + +- **Workspace as address component.** Workspace identifies the + thing being operated on. Used for workspace-level and flow-level + resources. Lives in the addressing layer of the message — the + URL path for HTTP, or the WebSocket envelope alongside `flow` for + flow-scoped operations sent through the Mux. +- **Workspace as operation parameter.** Workspace is data the + operation acts on, while the resource itself is system-level. + Used for operations on the user registry (`create-user with + workspace association W`), the workspace registry (`create- + workspace W`), and other system-level operations that happen to + reference a workspace. Lives in the request body or inner WS + payload alongside the operation's other parameters. + +The two roles never coexist on the same operation. Either the +operation addresses something within a workspace (workspace is in +the address) or it operates on a system-level resource with +workspace as a parameter (workspace is in the body) or workspace +is irrelevant (system-level operations like `bootstrap`, +`rotate-signing-key`, `login` itself). + +#### Where workspace lives, by request type + +| Request type | Resource level | Workspace lives in | +|---|---|---| +| Flow-scoped data plane (`agent`, `graph-rag`, `llm`, `embeddings`, `mcp`, etc.) | Flow | Envelope alongside `flow` (WS) or URL path (HTTP) — part of the address | +| Workspace-scoped control plane (`config`, `library`, `knowledge`, `collection-management`, flow lifecycle) | Workspace | Body / inner request — part of the address | +| User registry ops (`create-user`, `list-users`, `disable-user`, etc.) | System | Body — as a *parameter* (the user's workspace association or a list filter) | +| Workspace registry ops (`create-workspace`, `list-workspaces`, etc.) | System | Body — as a *parameter* (the workspace identifier in `workspace_record`) | +| Credential ops (`create-api-key`, `revoke-api-key`, `change-password`, `reset-password`) | System | Body — as a *parameter* on ops that have one; absent on `change-password` (target is the caller's identity) | +| System ops (`bootstrap`, `login`, `rotate-signing-key`, `get-signing-key-public`) | System | Not present at all | + +The classification is deliberate. Users are a global concept that +*have* a workspace; they don't *live* in one. An OSS regime has +1:1 user-to-workspace; a multi-workspace regime maps a user to many +workspaces; an SSO regime might delegate workspace membership to an +IdP entirely. The gateway treats user-registry operations as +system-level so the contract is the same across regimes — the +workspace association is a parameter the regime interprets in its +own terms. + +#### HTTP + +HTTP routes by URL path, so the address lives in the URL. +Per-operation REST shape: + +- Flow-level: `POST /api/v1/workspaces/{w}/flows/{f}/services/{kind}` + — `workspace` and `flow` are URL components. +- Workspace-level: `POST /api/v1/workspaces/{w}/config`, + `/api/v1/workspaces/{w}/library`, etc. — `workspace` is a URL + component. +- System-level: `POST /api/v1/users`, `/api/v1/workspaces`, etc. — + no workspace in URL; if the operation references one, it's a + field in the body. + +`/api/v1/iam` is itself registry-driven: the body's `operation` +field is looked up against the registry to obtain the capability, +resource shape, and parameter shape per operation, rather than +gating the whole endpoint with a single coarse capability. + +#### WebSocket Mux + +The Mux envelope is the addressing layer for flow-scoped +operations. For workspace-level and system-level operations the +envelope routes by `service` only, and the inner request payload +carries the address components or parameters as appropriate. See +[`iam-contract.md`](iam-contract.md) for the operation-registry +mechanism the Mux uses to know which fields to read. ### Roles and access control -Three roles with fixed permissions: +Roles are an OSS-regime concept and live entirely in the IAM +service. The gateway does not enumerate or check them — it asks +`authorise(identity, capability, resource, parameters)` per +request and the regime maps the caller's roles to a decision. -| Role | Data operations | Admin operations | System | -|------|----------------|-----------------|--------| -| `reader` | Query knowledge graph, embeddings, RAG | None | None | -| `writer` | All reader operations + load documents, manage collections | None | None | -| `admin` | All writer operations | Config, flows, collection management, user management | Metrics | +The OSS regime ships three roles: -Role checks happen at the gateway before dispatching to backend -services. Each endpoint declares the minimum role required: +| Role | Capabilities granted | +|------|----------------------| +| `reader` | Read capabilities on data and config (`graph:read`, `documents:read`, `rows:read`, `config:read`, `flows:read`, `knowledge:read`, `collections:read`, `keys:self`, plus the per-service caps `agent`, `llm`, `embeddings`, `mcp`). | +| `writer` | All reader capabilities, plus `graph:write`, `documents:write`, `rows:write`, `knowledge:write`, `collections:write`. | +| `admin` | All writer capabilities, plus `config:write`, `flows:write`, `users:read`, `users:write`, `users:admin`, `keys:admin`, `workspaces:admin`, `iam:admin`, `metrics:read`. | -| Endpoint pattern | Minimum role | -|-----------------|--------------| -| `GET /api/v1/socket` (queries) | `reader` | -| `POST /api/v1/librarian` | `writer` | -| `POST /api/v1/flow/*/import/*` | `writer` | -| `POST /api/v1/config` | `admin` | -| `GET /api/v1/flow/*` | `admin` | -| `GET /api/metrics` | `admin` | +Workspace scope: `reader` and `writer` are active only in the +caller's bound workspace; `admin` is active across all workspaces. -Roles are hierarchical: `admin` implies `writer`, which implies -`reader`. +The gateway gates each endpoint by *capability*, not by role. +Capabilities are declared per operation in the gateway's operation +registry; see [`iam-contract.md`](iam-contract.md) for the +registry mechanism and [`capabilities.md`](capabilities.md) for +the capability vocabulary. ### IAM service -The IAM service is a new backend service that manages all identity and -access data. It is the authority for users, workspaces, API keys, and -credentials. The gateway delegates to it. +The IAM service is a backend service that implements the +[IAM contract](iam-contract.md) — `authenticate`, `authorise`, and +the management operations the gateway forwards. It is the +authority for identity, credential validation, and access decisions. +The gateway treats it as a black box behind the contract; nothing +in the gateway is regime-specific. -#### Data model +The OSS distribution ships one IAM regime: a role-based service +backed by Cassandra, described in +[`iam-protocol.md`](iam-protocol.md). Enterprise / future regimes +can replace this implementation without changing the gateway, the +wire protocol between gateway and backends, or the capability +vocabulary — see the contract spec for the abstraction the gateway +is wired against and the implementation notes for what other +regimes look like. + +#### OSS data model + +The OSS regime stores users, workspaces, API keys, and signing +keys in Cassandra. This is an **OSS regime implementation +detail**; it is not part of the contract. Other regimes will have +different (or no) data models. ``` iam_workspaces ( @@ -456,42 +616,53 @@ surface — e.g. `"missing required field 'workspace'"` or ### Gateway changes -The current `Authenticator` class is replaced with a thin authentication -middleware that delegates to the IAM service: +The current `Authenticator` class is replaced with a thin +authentication+authorisation middleware that delegates to the IAM +service per the IAM contract. The gateway performs no role check +itself — authorisation is asked of the regime via `authorise`. For HTTP requests: 1. Extract Bearer token from the `Authorization` header. 2. If the token has JWT format (dotted structure): - Validate signature locally using the cached public key. - - Extract user ID, workspace, and roles from claims. + - Build an `Identity` from `sub` and `workspace` claims (no + other claims are consulted). 3. Otherwise, treat as an API key: - Hash the token and check the local cache. - - On cache miss, call the IAM service to resolve. - - Cache the result (user/workspace/roles) with a short TTL. + - On cache miss, call the IAM service to resolve to an + `Identity` (handle, workspace, principal_id, source). + - Cache the result with a short TTL. 4. If neither succeeds, return 401. -5. If the user or workspace is disabled, return 403. -6. Check the user's role against the endpoint's minimum role. If - insufficient, return 403. -7. Resolve the effective workspace: - - If the request includes a `workspace` parameter, validate it - against the user's assigned workspace. Return 403 on mismatch. - - If no `workspace` parameter, use the user's assigned workspace. -8. Set the `user` field in the request context to the effective - workspace ID. This propagates through `Metadata` to all downstream - services. +5. Look up the operation in the gateway's operation registry to get + `(capability, resource_level, extractors)`. Build the resource + address (system / workspace / flow level) and parameters from + the request. +6. Default-fill the workspace into the body when the operation is + workspace- or flow-level (so downstream code sees a single + canonical address); the resource address keeps its supplied + value. +7. Call `authorise(identity, capability, resource, parameters)`. + On allow, forward the request; on deny, return 403. On regime + error, fail closed (401 / 503 per deployment). +8. Cache the decision per the contract's caching rules (clamped + above by a deployment-set ceiling). For WebSocket connections: 1. Accept the connection in an unauthenticated state. 2. Wait for an auth message (`{"type": "auth", "token": "..."}`). -3. Validate the token using the same logic as steps 2-7 above. +3. Validate the token using the same logic as steps 1-3 above. 4. On success, attach the resolved identity to the connection and send `{"type": "auth-ok", ...}`. 5. On failure, send `{"type": "auth-failed", ...}` but keep the socket open. 6. Reject all non-auth messages until authentication succeeds. 7. Accept new auth messages at any time to re-authenticate. +8. For each subsequent request frame, look up + `flow-service:` in the registry and call `authorise` + against the `{workspace, flow}` resource — same authority + gateway HTTP callers see, evaluated per-frame. ### CLI changes @@ -892,6 +1063,12 @@ service, not in the config service. Reasons: ## References +- [IAM Contract Specification](iam-contract.md) — the gateway↔IAM + regime abstraction this design is wired against. +- [IAM Service Protocol Specification](iam-protocol.md) — the OSS + regime's wire-level protocol. +- [Capability Vocabulary Specification](capabilities.md) — the + capability strings the gateway uses as `authorise` input. - [Data Ownership and Information Separation](data-ownership-model.md) - [MCP Tool Bearer Token Specification](mcp-tool-bearer-token.md) - [Multi-Tenant Support Specification](multi-tenant-support.md) diff --git a/tests/unit/test_gateway/test_auth.py b/tests/unit/test_gateway/test_auth.py index ba2b9bc2..26e93fd9 100644 --- a/tests/unit/test_gateway/test_auth.py +++ b/tests/unit/test_gateway/test_auth.py @@ -87,7 +87,6 @@ class TestVerifyJwtEddsa: priv, pub = make_keypair() claims = { "sub": "user-1", "workspace": "default", - "roles": ["reader"], "iat": int(time.time()), "exp": int(time.time()) + 60, } @@ -99,7 +98,7 @@ class TestVerifyJwtEddsa: def test_expired_jwt_rejected(self): priv, pub = make_keypair() claims = { - "sub": "user-1", "workspace": "default", "roles": [], + "sub": "user-1", "workspace": "default", "iat": int(time.time()) - 3600, "exp": int(time.time()) - 1, } @@ -111,7 +110,7 @@ class TestVerifyJwtEddsa: priv_a, _ = make_keypair() _, pub_b = make_keypair() claims = { - "sub": "user-1", "workspace": "default", "roles": [], + "sub": "user-1", "workspace": "default", "iat": int(time.time()), "exp": int(time.time()) + 60, } @@ -131,7 +130,7 @@ class TestVerifyJwtEddsa: # since we expect it to bail before verifying. header = {"alg": "HS256", "typ": "JWT", "kid": "x"} payload = { - "sub": "user-1", "workspace": "default", "roles": [], + "sub": "user-1", "workspace": "default", "iat": int(time.time()), "exp": int(time.time()) + 60, } h = _b64url(json.dumps(header, separators=(",", ":")).encode()) @@ -149,11 +148,12 @@ class TestIdentity: def test_fields(self): i = Identity( - user_id="u", workspace="w", roles=["reader"], source="api-key", + handle="u", workspace="w", + principal_id="u", source="api-key", ) - assert i.user_id == "u" + assert i.handle == "u" assert i.workspace == "w" - assert i.roles == ["reader"] + assert i.principal_id == "u" assert i.source == "api-key" @@ -194,7 +194,6 @@ class TestIamAuthDispatch: priv, pub = make_keypair() claims = { "sub": "user-1", "workspace": "default", - "roles": ["writer"], "iat": int(time.time()), "exp": int(time.time()) + 60, } @@ -206,9 +205,9 @@ class TestIamAuthDispatch: ident = await auth.authenticate( make_request(f"Bearer {token}") ) - assert ident.user_id == "user-1" + assert ident.handle == "user-1" assert ident.workspace == "default" - assert ident.roles == ["writer"] + assert ident.principal_id == "user-1" assert ident.source == "jwt" @pytest.mark.asyncio @@ -217,7 +216,7 @@ class TestIamAuthDispatch: # must not validate — even ones that would otherwise pass. priv, _ = make_keypair() claims = { - "sub": "user-1", "workspace": "default", "roles": [], + "sub": "user-1", "workspace": "default", "iat": int(time.time()), "exp": int(time.time()) + 60, } token = sign_jwt(priv, claims) @@ -232,6 +231,9 @@ class TestIamAuthDispatch: async def fake_resolve(api_key): assert api_key == "tg_testkey" + # Roles are returned by the regime as a hint but the + # gateway ignores them — kept here so the resolve + # protocol shape is exercised. return ("user-xyz", "default", ["admin"]) async def fake_with_client(op): @@ -241,9 +243,9 @@ class TestIamAuthDispatch: ident = await auth.authenticate( make_request("Bearer tg_testkey") ) - assert ident.user_id == "user-xyz" + assert ident.handle == "user-xyz" assert ident.workspace == "default" - assert ident.roles == ["admin"] + assert ident.principal_id == "user-xyz" assert ident.source == "api-key" @pytest.mark.asyncio @@ -301,8 +303,8 @@ class TestApiKeyCache: a = await auth.authenticate(make_request("Bearer tg_a")) b = await auth.authenticate(make_request("Bearer tg_b")) - assert a.user_id == "u-tg_a" - assert b.user_id == "u-tg_b" + assert a.handle == "u-tg_a" + assert b.handle == "u-tg_b" assert seen == ["tg_a", "tg_b"] @pytest.mark.asyncio @@ -310,3 +312,136 @@ class TestApiKeyCache: # Not a behaviour test — just ensures we don't accidentally # set TTL to 0 (which would defeat the cache) or to a week. assert 10 <= API_KEY_CACHE_TTL <= 3600 + + +# -- IamAuth.authorise ----------------------------------------------------- + + +class TestAuthorise: + """``authorise()`` is the gateway's only authorisation entry + point under the IAM contract. It calls iam-svc, caches the + decision for the regime's TTL (clamped above), and raises 403 + on deny / 401 on regime error (fail closed).""" + + def _make_identity(self, handle="u-1", workspace="default"): + return Identity( + handle=handle, workspace=workspace, + principal_id=handle, source="api-key", + ) + + @pytest.mark.asyncio + async def test_allow_returns_no_exception(self): + auth = IamAuth(backend=Mock()) + + async def fake_with_client(op): + return await op(Mock( + authorise=AsyncMock(return_value=(True, 30)), + )) + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + await auth.authorise( + self._make_identity(), + "graph:read", + {"workspace": "default"}, + {}, + ) + + @pytest.mark.asyncio + async def test_deny_raises_403(self): + auth = IamAuth(backend=Mock()) + + async def fake_with_client(op): + return await op(Mock( + authorise=AsyncMock(return_value=(False, 30)), + )) + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + with pytest.raises(web.HTTPForbidden): + await auth.authorise( + self._make_identity(), + "users:admin", + {}, + {"workspace": "acme"}, + ) + + @pytest.mark.asyncio + async def test_regime_error_fails_closed_as_401(self): + # If iam-svc errors, the gateway must NOT silently allow. + auth = IamAuth(backend=Mock()) + + async def fake_with_client(op): + raise RuntimeError("iam-svc down") + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + with pytest.raises(web.HTTPUnauthorized): + await auth.authorise( + self._make_identity(), + "graph:read", + {"workspace": "default"}, + {}, + ) + + @pytest.mark.asyncio + async def test_allow_decision_is_cached(self): + auth = IamAuth(backend=Mock()) + calls = {"n": 0} + + async def fake_with_client(op): + calls["n"] += 1 + return await op(Mock( + authorise=AsyncMock(return_value=(True, 30)), + )) + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + ident = self._make_identity() + for _ in range(5): + await auth.authorise( + ident, "graph:read", {"workspace": "default"}, {}, + ) + + assert calls["n"] == 1 + + @pytest.mark.asyncio + async def test_deny_decision_is_cached(self): + auth = IamAuth(backend=Mock()) + calls = {"n": 0} + + async def fake_with_client(op): + calls["n"] += 1 + return await op(Mock( + authorise=AsyncMock(return_value=(False, 30)), + )) + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + ident = self._make_identity() + for _ in range(5): + with pytest.raises(web.HTTPForbidden): + await auth.authorise( + ident, "users:admin", {}, {"workspace": "acme"}, + ) + + # Denies are cached too — repeated attempts don't re-hit IAM. + assert calls["n"] == 1 + + @pytest.mark.asyncio + async def test_different_resources_cached_separately(self): + auth = IamAuth(backend=Mock()) + calls = {"n": 0} + + async def fake_with_client(op): + calls["n"] += 1 + return await op(Mock( + authorise=AsyncMock(return_value=(True, 30)), + )) + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + ident = self._make_identity() + await auth.authorise( + ident, "graph:read", {"workspace": "a"}, {}, + ) + await auth.authorise( + ident, "graph:read", {"workspace": "b"}, {}, + ) + + # Different resource → different cache key → two IAM calls. + assert calls["n"] == 2 diff --git a/tests/unit/test_gateway/test_capabilities.py b/tests/unit/test_gateway/test_capabilities.py index 063e9ea4..102e381e 100644 --- a/tests/unit/test_gateway/test_capabilities.py +++ b/tests/unit/test_gateway/test_capabilities.py @@ -1,15 +1,22 @@ """ -Tests for gateway/capabilities.py — the capability + role + workspace -model that underpins all gateway authorisation. +Tests for gateway/capabilities.py — the thin authorisation surface +under the IAM contract. + +The gateway no longer holds policy state (roles, capability sets, +workspace scopes); those live in iam-svc. These tests cover only +what the gateway shim does itself: PUBLIC / AUTHENTICATED short- +circuiting, default-fill of workspace, and forwarding of capability +checks to ``auth.authorise``. """ import pytest from aiohttp import web +from unittest.mock import AsyncMock, MagicMock from trustgraph.gateway.capabilities import ( PUBLIC, AUTHENTICATED, - KNOWN_CAPABILITIES, ROLE_DEFINITIONS, - check, enforce_workspace, access_denied, auth_failure, + enforce, enforce_workspace, + access_denied, auth_failure, ) @@ -17,109 +24,74 @@ from trustgraph.gateway.capabilities import ( class _Identity: - """Minimal stand-in for auth.Identity — the capability module - accesses ``.workspace`` and ``.roles``.""" - def __init__(self, workspace, roles): - self.user_id = "user-1" + """Stand-in for auth.Identity — under the IAM contract it has + just ``handle``, ``workspace``, ``principal_id``, ``source``.""" + + def __init__(self, handle="user-1", workspace="default"): + self.handle = handle self.workspace = workspace - self.roles = list(roles) + self.principal_id = handle + self.source = "api-key" -def reader_in(ws): - return _Identity(ws, ["reader"]) +def _allow_auth(identity=None): + """Build an Auth double that authenticates to ``identity`` and + allows every authorise() call.""" + auth = MagicMock() + auth.authenticate = AsyncMock( + return_value=identity or _Identity(), + ) + auth.authorise = AsyncMock(return_value=None) + return auth -def writer_in(ws): - return _Identity(ws, ["writer"]) +def _deny_auth(identity=None): + """Build an Auth double that authenticates but denies authorise.""" + auth = MagicMock() + auth.authenticate = AsyncMock( + return_value=identity or _Identity(), + ) + auth.authorise = AsyncMock(side_effect=access_denied()) + return auth -def admin_in(ws): - return _Identity(ws, ["admin"]) +# -- enforce() ------------------------------------------------------------- -# -- role table sanity ----------------------------------------------------- +class TestEnforce: + @pytest.mark.asyncio + async def test_public_returns_none_no_auth(self): + auth = _allow_auth() + result = await enforce(MagicMock(), auth, PUBLIC) + assert result is None + auth.authenticate.assert_not_called() + auth.authorise.assert_not_called() -class TestRoleTable: + @pytest.mark.asyncio + async def test_authenticated_skips_authorise(self): + identity = _Identity() + auth = _allow_auth(identity) + result = await enforce(MagicMock(), auth, AUTHENTICATED) + assert result is identity + auth.authenticate.assert_awaited_once() + auth.authorise.assert_not_called() - def test_oss_roles_present(self): - assert set(ROLE_DEFINITIONS.keys()) == {"reader", "writer", "admin"} + @pytest.mark.asyncio + async def test_capability_calls_authorise_system_level(self): + identity = _Identity() + auth = _allow_auth(identity) + result = await enforce(MagicMock(), auth, "graph:read") + assert result is identity + auth.authorise.assert_awaited_once_with( + identity, "graph:read", {}, {}, + ) - def test_admin_is_cross_workspace(self): - assert ROLE_DEFINITIONS["admin"]["workspace_scope"] == "*" - - def test_reader_writer_are_assigned_scope(self): - assert ROLE_DEFINITIONS["reader"]["workspace_scope"] == "assigned" - assert ROLE_DEFINITIONS["writer"]["workspace_scope"] == "assigned" - - def test_admin_superset_of_writer(self): - admin = ROLE_DEFINITIONS["admin"]["capabilities"] - writer = ROLE_DEFINITIONS["writer"]["capabilities"] - assert writer.issubset(admin) - - def test_writer_superset_of_reader(self): - writer = ROLE_DEFINITIONS["writer"]["capabilities"] - reader = ROLE_DEFINITIONS["reader"]["capabilities"] - assert reader.issubset(writer) - - def test_admin_has_users_admin(self): - assert "users:admin" in ROLE_DEFINITIONS["admin"]["capabilities"] - - def test_writer_does_not_have_users_admin(self): - assert "users:admin" not in ROLE_DEFINITIONS["writer"]["capabilities"] - - def test_every_bundled_capability_is_known(self): - for role in ROLE_DEFINITIONS.values(): - for cap in role["capabilities"]: - assert cap in KNOWN_CAPABILITIES - - -# -- check() --------------------------------------------------------------- - - -class TestCheck: - - def test_reader_has_reader_cap_in_own_workspace(self): - assert check(reader_in("default"), "graph:read", "default") - - def test_reader_does_not_have_writer_cap(self): - assert not check(reader_in("default"), "graph:write", "default") - - def test_reader_cannot_act_in_other_workspace(self): - assert not check(reader_in("default"), "graph:read", "acme") - - def test_writer_has_write_in_own_workspace(self): - assert check(writer_in("default"), "graph:write", "default") - - def test_writer_cannot_act_in_other_workspace(self): - assert not check(writer_in("default"), "graph:write", "acme") - - def test_admin_has_everything_everywhere(self): - for cap in ("graph:read", "graph:write", "config:write", - "users:admin", "metrics:read"): - assert check(admin_in("default"), cap, "acme"), ( - f"admin should have {cap} in acme" - ) - - def test_admin_has_caps_without_explicit_workspace(self): - assert check(admin_in("default"), "users:admin") - - def test_default_target_is_identity_workspace(self): - # Reader with no target workspace → should check against own - assert check(reader_in("default"), "graph:read") - - def test_unknown_capability_returns_false(self): - assert not check(admin_in("default"), "nonsense:cap", "default") - - def test_unknown_role_contributes_nothing(self): - ident = _Identity("default", ["made-up-role"]) - assert not check(ident, "graph:read", "default") - - def test_multi_role_union(self): - # If a user is both reader and admin, they inherit admin's - # cross-workspace powers. - ident = _Identity("default", ["reader", "admin"]) - assert check(ident, "users:admin", "acme") + @pytest.mark.asyncio + async def test_capability_denied_raises_forbidden(self): + auth = _deny_auth() + with pytest.raises(web.HTTPForbidden): + await enforce(MagicMock(), auth, "users:admin") # -- enforce_workspace() --------------------------------------------------- @@ -127,56 +99,54 @@ class TestCheck: class TestEnforceWorkspace: - def test_reader_in_own_workspace_allowed(self): - data = {"workspace": "default", "operation": "x"} - enforce_workspace(data, reader_in("default")) - assert data["workspace"] == "default" - - def test_reader_no_workspace_injects_assigned(self): + @pytest.mark.asyncio + async def test_default_fills_from_identity(self): data = {"operation": "x"} - enforce_workspace(data, reader_in("default")) + auth = _allow_auth() + await enforce_workspace(data, _Identity(workspace="default"), auth) assert data["workspace"] == "default" - def test_reader_mismatched_workspace_denied(self): + @pytest.mark.asyncio + async def test_caller_supplied_workspace_kept(self): data = {"workspace": "acme", "operation": "x"} - with pytest.raises(web.HTTPForbidden): - enforce_workspace(data, reader_in("default")) - - def test_admin_can_target_any_workspace(self): - data = {"workspace": "acme", "operation": "x"} - enforce_workspace(data, admin_in("default")) + auth = _allow_auth() + await enforce_workspace(data, _Identity(workspace="default"), auth) assert data["workspace"] == "acme" - def test_admin_no_workspace_defaults_to_assigned(self): - data = {"operation": "x"} - enforce_workspace(data, admin_in("default")) - assert data["workspace"] == "default" - - def test_writer_same_workspace_specified_allowed(self): + @pytest.mark.asyncio + async def test_no_capability_skips_authorise(self): data = {"workspace": "default"} - enforce_workspace(data, writer_in("default")) - assert data["workspace"] == "default" + auth = _allow_auth() + await enforce_workspace(data, _Identity(), auth) + auth.authorise.assert_not_called() - def test_non_dict_passthrough(self): - # Non-dict bodies are returned unchanged (e.g. streaming). - result = enforce_workspace("not-a-dict", reader_in("default")) - assert result == "not-a-dict" + @pytest.mark.asyncio + async def test_capability_calls_authorise_with_resource(self): + data = {"workspace": "acme"} + identity = _Identity() + auth = _allow_auth(identity) + await enforce_workspace( + data, identity, auth, capability="graph:read", + ) + auth.authorise.assert_awaited_once_with( + identity, "graph:read", {"workspace": "acme"}, {}, + ) - def test_with_capability_tightens_check(self): - # Reader lacks graph:write; workspace-only check would pass - # (scope is fine), but combined check must reject. - data = {"workspace": "default"} + @pytest.mark.asyncio + async def test_capability_denied_propagates(self): + data = {"workspace": "acme"} + auth = _deny_auth() with pytest.raises(web.HTTPForbidden): - enforce_workspace( - data, reader_in("default"), capability="graph:write", + await enforce_workspace( + data, _Identity(), auth, capability="users:admin", ) - def test_with_capability_passes_when_granted(self): - data = {"workspace": "default"} - enforce_workspace( - data, reader_in("default"), capability="graph:read", - ) - assert data["workspace"] == "default" + @pytest.mark.asyncio + async def test_non_dict_passthrough(self): + auth = _allow_auth() + result = await enforce_workspace("not-a-dict", _Identity(), auth) + assert result == "not-a-dict" + auth.authorise.assert_not_called() # -- helpers --------------------------------------------------------------- @@ -199,5 +169,3 @@ class TestSentinels: def test_public_and_authenticated_are_distinct(self): assert PUBLIC != AUTHENTICATED - assert PUBLIC not in KNOWN_CAPABILITIES - assert AUTHENTICATED not in KNOWN_CAPABILITIES diff --git a/tests/unit/test_gateway/test_endpoint_manager.py b/tests/unit/test_gateway/test_endpoint_manager.py index cf12565c..8f659b71 100644 --- a/tests/unit/test_gateway/test_endpoint_manager.py +++ b/tests/unit/test_gateway/test_endpoint_manager.py @@ -73,14 +73,16 @@ class TestEndpointManager: prometheus_url="http://test:9090" ) - # Each dispatcher factory is invoked exactly once during - # construction — one per endpoint that needs a dedicated - # wire. dispatch_auth_iam is the dedicated factory for the - # AuthEndpoints forwarder (login / bootstrap / - # change-password), distinct from dispatch_global_service - # (the generic /api/v1/{kind} route). + # Each dispatcher factory is invoked once per endpoint that + # needs a dedicated wire. dispatch_auth_iam is shared by + # two endpoints — AuthEndpoints (login / bootstrap / + # change-password) and IamEndpoint (registry-driven + # /api/v1/iam) — so it's expected to be called twice. + # Both forwarders pin the dispatcher to kind=iam and reuse + # the same factory; they're distinct from + # dispatch_global_service (the generic /api/v1/{kind} route). mock_dispatcher_manager.dispatch_global_service.assert_called_once() - mock_dispatcher_manager.dispatch_auth_iam.assert_called_once() + assert mock_dispatcher_manager.dispatch_auth_iam.call_count == 2 mock_dispatcher_manager.dispatch_socket.assert_called_once() mock_dispatcher_manager.dispatch_flow_service.assert_called_once() mock_dispatcher_manager.dispatch_flow_import.assert_called_once() diff --git a/tests/unit/test_gateway/test_socket_graceful_shutdown.py b/tests/unit/test_gateway/test_socket_graceful_shutdown.py index 23f22d30..6c3e323b 100644 --- a/tests/unit/test_gateway/test_socket_graceful_shutdown.py +++ b/tests/unit/test_gateway/test_socket_graceful_shutdown.py @@ -25,11 +25,11 @@ from trustgraph.gateway.auth import Identity TEST_CAP = "graph:write" -def _valid_identity(roles=("admin",)): +def _valid_identity(): return Identity( - user_id="test-user", + handle="test-user", workspace="default", - roles=list(roles), + principal_id="test-user", source="api-key", ) @@ -37,11 +37,12 @@ def _valid_identity(roles=("admin",)): @pytest.fixture def mock_auth(): """Mock IAM-backed authenticator. Successful by default — - ``authenticate`` returns a valid admin identity. Tests that - need the auth failure path override the ``authenticate`` - attribute locally.""" + ``authenticate`` returns a valid identity and ``authorise`` + allows everything. Tests that need the failure paths override + the relevant attribute locally.""" auth = MagicMock() auth.authenticate = AsyncMock(return_value=_valid_identity()) + auth.authorise = AsyncMock(return_value=None) return auth @@ -135,6 +136,7 @@ async def test_handle_normal_flow(): """Valid bearer → handshake accepted, dispatcher created.""" mock_auth = MagicMock() mock_auth.authenticate = AsyncMock(return_value=_valid_identity()) + mock_auth.authorise = AsyncMock(return_value=None) dispatcher_created = False async def mock_dispatcher_factory(ws, running, match_info): @@ -192,6 +194,7 @@ async def test_handle_exception_group_cleanup(): """Test exception group triggers dispatcher cleanup.""" mock_auth = MagicMock() mock_auth.authenticate = AsyncMock(return_value=_valid_identity()) + mock_auth.authorise = AsyncMock(return_value=None) mock_dispatcher = AsyncMock() mock_dispatcher.destroy = AsyncMock() @@ -262,6 +265,7 @@ async def test_handle_dispatcher_cleanup_timeout(): """Test dispatcher cleanup with timeout.""" mock_auth = MagicMock() mock_auth.authenticate = AsyncMock(return_value=_valid_identity()) + mock_auth.authorise = AsyncMock(return_value=None) # Mock dispatcher that takes long to destroy mock_dispatcher = AsyncMock() @@ -388,6 +392,7 @@ async def test_handle_websocket_already_closed(): """Test handling when websocket is already closed.""" mock_auth = MagicMock() mock_auth.authenticate = AsyncMock(return_value=_valid_identity()) + mock_auth.authorise = AsyncMock(return_value=None) mock_dispatcher = AsyncMock() mock_dispatcher.destroy = AsyncMock() diff --git a/trustgraph-base/trustgraph/base/iam_client.py b/trustgraph-base/trustgraph/base/iam_client.py index 5cfda7c8..f90694fc 100644 --- a/trustgraph-base/trustgraph/base/iam_client.py +++ b/trustgraph-base/trustgraph/base/iam_client.py @@ -1,4 +1,6 @@ +import json + from . request_response_spec import RequestResponse, RequestResponseSpec from .. schema import ( IamRequest, IamResponse, @@ -44,7 +46,13 @@ class IamClient(RequestResponse): Returns ``(user_id, workspace, roles)`` or raises ``RuntimeError`` with error type ``auth-failed`` if the key is - unknown / expired / revoked.""" + unknown / expired / revoked. + + Note: the ``roles`` value is a regime-internal hint and is + not used by the gateway directly under the IAM contract; + all authorisation decisions go through ``authorise()``. + Returned here only for backward compatibility with callers + that haven't migrated.""" resp = await self._request( operation="resolve-api-key", api_key=api_key, @@ -56,6 +64,40 @@ class IamClient(RequestResponse): list(resp.resolved_roles), ) + async def authorise(self, identity_handle, capability, + resource, parameters, timeout=IAM_TIMEOUT): + """Ask the IAM regime whether ``identity_handle`` may perform + ``capability`` on ``resource`` given ``parameters``. + + Implements the contract ``authorise(identity, capability, + resource, parameters) → (decision, ttl)``. Returns a tuple + ``(allow: bool, ttl_seconds: int)``. The TTL is the + regime's suggested cache lifetime for this decision; the + gateway honours it (clamped above by gateway-side policy).""" + resp = await self._request( + operation="authorise", + user_id=identity_handle, + capability=capability, + resource_json=json.dumps(resource or {}, sort_keys=True), + parameters_json=json.dumps(parameters or {}, sort_keys=True), + timeout=timeout, + ) + return resp.decision_allow, resp.decision_ttl_seconds + + async def authorise_many(self, identity_handle, checks, + timeout=IAM_TIMEOUT): + """Bulk authorise. ``checks`` is a list of dicts each + carrying ``capability``, ``resource``, and ``parameters``. + Returns a list of ``(allow, ttl)`` tuples in the same order.""" + resp = await self._request( + operation="authorise-many", + user_id=identity_handle, + authorise_checks=json.dumps(list(checks), sort_keys=True), + timeout=timeout, + ) + decisions = json.loads(resp.decisions_json or "[]") + return [(d.get("allow", False), d.get("ttl", 0)) for d in decisions] + async def create_user(self, workspace, user, actor="", timeout=IAM_TIMEOUT): """Create a user. ``user`` is a ``UserInput``.""" diff --git a/trustgraph-base/trustgraph/schema/services/iam.py b/trustgraph-base/trustgraph/schema/services/iam.py index 1e3ab1ab..4b5685a5 100644 --- a/trustgraph-base/trustgraph/schema/services/iam.py +++ b/trustgraph-base/trustgraph/schema/services/iam.py @@ -99,6 +99,21 @@ class IamRequest: workspace_record: WorkspaceInput | None = None key: ApiKeyInput | None = None + # ---- authorise / authorise-many inputs ---- + # Capability string from the vocabulary in capabilities.md. + capability: str = "" + # Resource identifier as JSON. See the IAM contract spec for + # the resource-component vocabulary. An empty dict denotes a + # system-level resource. + resource_json: str = "" + # Operation parameters as JSON. Decision-relevant fields the + # operation supplied that are not part of the resource address + # (e.g. workspace association on create-user). + parameters_json: str = "" + # For authorise-many: a JSON-serialised list of + # {"capability": str, "resource": dict, "parameters": dict}. + authorise_checks: str = "" + @dataclass class IamResponse: @@ -133,6 +148,18 @@ class IamResponse: bootstrap_admin_user_id: str = "" bootstrap_admin_api_key: str = "" + # ---- authorise / authorise-many outputs ---- + # authorise: the regime's allow / deny verdict. + decision_allow: bool = False + # Cache TTL the regime suggests, in seconds. Gateway respects + # this for both allow and deny decisions; bounded above by + # gateway-side policy (typically <= 60s). + decision_ttl_seconds: int = 0 + # authorise-many: a JSON-serialised list of {"allow": bool, + # "ttl": int} in the same order as the request's + # authorise_checks. + decisions_json: str = "" + error: Error | None = None diff --git a/trustgraph-flow/trustgraph/gateway/auth.py b/trustgraph-flow/trustgraph/gateway/auth.py index 95743261..6abcbe15 100644 --- a/trustgraph-flow/trustgraph/gateway/auth.py +++ b/trustgraph-flow/trustgraph/gateway/auth.py @@ -1,15 +1,16 @@ """ -IAM-backed authentication for the API gateway. +IAM-backed authentication and authorisation for the API gateway. -Replaces the legacy GATEWAY_SECRET shared-token Authenticator. The -gateway is now stateless with respect to credentials: it either -verifies a JWT locally using the active IAM signing public key, or -resolves an API key by hash with a short local cache backed by the -IAM service. +The gateway delegates both authentication ("who is this caller?") +and authorisation ("may they do this?") to the IAM regime via the +contract specified in docs/tech-specs/iam-contract.md. No regime- +specific policy (roles, scopes, claims) lives in the gateway. -Identity returned by authenticate() is the (user_id, workspace, -roles) triple the rest of the gateway — capability checks, workspace -resolver, audit logging — needs. +- Authentication: API keys are resolved by IAM; JWTs are validated + locally against the cached signing public key. +- Authorisation: every per-request decision is asked of IAM via + ``authorise(identity, capability, resource, parameters)``, with + results cached for the TTL the regime returns. """ import asyncio @@ -19,7 +20,7 @@ import json import logging import time import uuid -from dataclasses import dataclass +from dataclasses import dataclass, field from aiohttp import web @@ -37,12 +38,34 @@ logger = logging.getLogger("auth") API_KEY_CACHE_TTL = 60 # seconds +# Upper bound on cache TTL the gateway honours for an authorisation +# decision, regardless of what the regime suggested. Caps the +# revocation latency window. +AUTHZ_CACHE_TTL_MAX = 60 # seconds + @dataclass class Identity: - user_id: str + """The gateway-side surface of an authenticated caller. + + Per the IAM contract this is a small fixed shape; regime-internal + state (roles, claims, group memberships) is reachable only via + the regime's ``authorise`` operation. The gateway itself never + reads policy from this object. + """ + # Opaque handle, quoted back when calling ``authorise``. For + # the OSS regime this is the user record's id; the gateway + # treats it as a string with no semantic content. + handle: str + # The workspace this credential authenticates to. Used by the + # gateway as the default-fill-in for operations that omit a + # workspace. Never used as policy input. workspace: str - roles: list + # Stable identifier for audit logs. In OSS this is the same + # value as ``handle``; not assumed equal in the contract. + principal_id: str + # How the credential was presented. Non-policy; useful for + # logs / metrics only. source: str # "api-key" | "jwt" @@ -111,6 +134,13 @@ class IamAuth: self._key_cache = {} self._key_cache_lock = asyncio.Lock() + # Authorisation decision cache: hash(handle, capability, + # resource, parameters) -> (allow_bool, expires_ts). Holds + # both allows and denies — denies cached briefly to avoid + # hammering iam-svc with repeated rejected attempts. + self._authz_cache: dict[str, tuple[bool, float]] = {} + self._authz_cache_lock = asyncio.Lock() + # ------------------------------------------------------------------ # Short-lived client helper. Mirrors the pattern used by the # bootstrap framework and AsyncProcessor: a fresh uuid suffix per @@ -221,12 +251,13 @@ class IamAuth: sub = claims.get("sub", "") ws = claims.get("workspace", "") - roles = list(claims.get("roles", [])) if not sub or not ws: raise _auth_failure() + # JWT carries no policy state under the IAM contract; + # any roles / claims field is ignored here. return Identity( - user_id=sub, workspace=ws, roles=roles, source="jwt", + handle=sub, workspace=ws, principal_id=sub, source="jwt", ) async def _resolve_api_key(self, plaintext): @@ -245,7 +276,10 @@ class IamAuth: try: async def _call(client): return await client.resolve_api_key(plaintext) - user_id, workspace, roles = await self._with_client(_call) + # ``roles`` is returned by the OSS regime as a hint + # but is not consulted by the gateway; all policy + # decisions go through ``authorise``. + user_id, workspace, _roles = await self._with_client(_call) except Exception as e: logger.debug( f"API key resolution failed: " @@ -257,8 +291,81 @@ class IamAuth: raise _auth_failure() identity = Identity( - user_id=user_id, workspace=workspace, - roles=list(roles), source="api-key", + handle=user_id, workspace=workspace, + principal_id=user_id, source="api-key", ) self._key_cache[h] = (identity, now + API_KEY_CACHE_TTL) return identity + + # ------------------------------------------------------------------ + # Authorisation + # ------------------------------------------------------------------ + + @staticmethod + def _authz_cache_key(handle, capability, resource, parameters): + payload = json.dumps( + { + "h": handle, + "c": capability, + "r": resource or {}, + "p": parameters or {}, + }, + sort_keys=True, + separators=(",", ":"), + ) + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + async def authorise(self, identity, capability, resource, parameters): + """Ask the IAM regime whether ``identity`` may perform + ``capability`` on ``resource`` given ``parameters``. + + Caches the decision for the regime's suggested TTL, clamped + above by ``AUTHZ_CACHE_TTL_MAX``. Both allow and deny + decisions are cached (denies briefly, to avoid hammering + iam-svc with repeated rejected attempts). + + Raises ``HTTPForbidden`` (403 / "access denied") on a deny + decision. Raises ``HTTPUnauthorized`` (401 / "auth failure") + if the IAM service errors out — failing closed.""" + + key = self._authz_cache_key( + identity.handle, capability, resource, parameters, + ) + now = time.time() + + cached = self._authz_cache.get(key) + if cached and cached[1] > now: + allow, _ = cached + if not allow: + raise _access_denied() + return + + async with self._authz_cache_lock: + cached = self._authz_cache.get(key) + if cached and cached[1] > now: + allow, _ = cached + if not allow: + raise _access_denied() + return + + try: + async def _call(client): + return await client.authorise( + identity.handle, capability, + resource or {}, parameters or {}, + ) + allow, ttl = await self._with_client(_call) + except Exception as e: + logger.warning( + f"authorise failed: {type(e).__name__}: {e}; " + f"failing closed for " + f"{identity.principal_id!r} cap={capability!r}" + ) + raise _auth_failure() + + ttl = max(0, min(int(ttl or 0), AUTHZ_CACHE_TTL_MAX)) + self._authz_cache[key] = (bool(allow), now + ttl) + + if not allow: + raise _access_denied() + return diff --git a/trustgraph-flow/trustgraph/gateway/capabilities.py b/trustgraph-flow/trustgraph/gateway/capabilities.py index 15e25684..72ca51c7 100644 --- a/trustgraph-flow/trustgraph/gateway/capabilities.py +++ b/trustgraph-flow/trustgraph/gateway/capabilities.py @@ -1,36 +1,23 @@ """ -Capability vocabulary, role definitions, and authorisation helpers. +Gateway-side authorisation entry points. -See docs/tech-specs/capabilities.md for the authoritative description. -The data here is the OSS bundle table in that spec. Enterprise -editions may replace this module with their own role table; the -vocabulary (capability strings) is shared. +Under the IAM contract (see docs/tech-specs/iam-contract.md) the +gateway holds *no* policy state. Roles, capability sets, and +workspace-scope rules all live in the IAM regime (iam-svc for OSS). +This module is the thin surface the gateway uses to ask the regime +for a decision: -Role model ----------- -A role has two dimensions: +- ``PUBLIC`` / ``AUTHENTICATED`` sentinels for endpoints that don't + go through capability-based authorisation. +- :func:`enforce` — authenticate-only, then ask the regime. +- :func:`enforce_workspace` — default-fill the workspace from the + caller's bound workspace and ask the regime, with the workspace + treated as the resource address. - 1. **capability set** — which operations the role grants. - 2. **workspace scope** — which workspaces the role is active in. - -The authorisation question is: *given the caller's roles, a required -capability, and a target workspace, does any role grant the -capability AND apply to the target workspace?* - -Workspace scope values recognised here: - - - ``"assigned"`` — the role applies only to the caller's own - assigned workspace (stored on their user record). - - ``"*"`` — the role applies to every workspace. - -Enterprise editions can add richer scopes (explicit permitted-set, -patterns, etc.) without changing the wire protocol. - -Sentinels ---------- -- ``PUBLIC`` — endpoint requires no authentication. -- ``AUTHENTICATED`` — endpoint requires a valid identity, no - specific capability. +The capability strings themselves are an open vocabulary — see +docs/tech-specs/capabilities.md. The gateway does not validate them +beyond passing them through; an unknown capability simply produces a +deny verdict from the regime. """ from aiohttp import web @@ -40,125 +27,6 @@ PUBLIC = "__public__" AUTHENTICATED = "__authenticated__" -# Capability vocabulary. Mirrors the "Capability list" tables in -# capabilities.md. Kept as a set so the gateway can fail-closed on -# an endpoint that declares an unknown capability. -KNOWN_CAPABILITIES = { - # Data plane - "agent", - "graph:read", "graph:write", - "documents:read", "documents:write", - "rows:read", "rows:write", - "llm", - "embeddings", - "mcp", - # Control plane - "config:read", "config:write", - "flows:read", "flows:write", - "users:read", "users:write", "users:admin", - "keys:self", "keys:admin", - "workspaces:admin", - "iam:admin", - "metrics:read", - "collections:read", "collections:write", - "knowledge:read", "knowledge:write", -} - - -# Capability sets used below. -_READER_CAPS = { - "agent", - "graph:read", - "documents:read", - "rows:read", - "llm", - "embeddings", - "mcp", - "config:read", - "flows:read", - "collections:read", - "knowledge:read", - "keys:self", -} - -_WRITER_CAPS = _READER_CAPS | { - "graph:write", - "documents:write", - "rows:write", - "collections:write", - "knowledge:write", -} - -_ADMIN_CAPS = _WRITER_CAPS | { - "config:write", - "flows:write", - "users:read", "users:write", "users:admin", - "keys:admin", - "workspaces:admin", - "iam:admin", - "metrics:read", -} - - -# Role definitions. Each role has a capability set and a workspace -# scope. Enterprise overrides this mapping. -ROLE_DEFINITIONS = { - "reader": { - "capabilities": _READER_CAPS, - "workspace_scope": "assigned", - }, - "writer": { - "capabilities": _WRITER_CAPS, - "workspace_scope": "assigned", - }, - "admin": { - "capabilities": _ADMIN_CAPS, - "workspace_scope": "*", - }, -} - - -def _scope_permits(role_name, target_workspace, assigned_workspace): - """Does the given role apply to ``target_workspace``?""" - role = ROLE_DEFINITIONS.get(role_name) - if role is None: - return False - scope = role["workspace_scope"] - if scope == "*": - return True - if scope == "assigned": - return target_workspace == assigned_workspace - # Future scope types (lists, patterns) extend here. - return False - - -def check(identity, capability, target_workspace=None): - """Is ``identity`` permitted to invoke ``capability`` on - ``target_workspace``? - - Passes iff some role held by the caller both (a) grants - ``capability`` and (b) is active in ``target_workspace``. - - ``target_workspace`` defaults to the caller's assigned workspace, - which makes this function usable for system-level operations and - for authenticated endpoints that don't take a workspace argument - (the call collapses to "do any of my roles grant this cap?").""" - if capability not in KNOWN_CAPABILITIES: - return False - - target = target_workspace or identity.workspace - - for role_name in identity.roles: - role = ROLE_DEFINITIONS.get(role_name) - if role is None: - continue - if capability not in role["capabilities"]: - continue - if _scope_permits(role_name, target, identity.workspace): - return True - return False - - def access_denied(): return web.HTTPForbidden( text='{"error":"access denied"}', @@ -174,21 +42,19 @@ def auth_failure(): async def enforce(request, auth, capability): - """Authenticate + capability-check for endpoints that carry no - workspace dimension on the request (metrics, i18n, etc.). + """Authenticate the caller and (for non-sentinel capabilities) + ask the IAM regime whether they may invoke ``capability``. - For endpoints that carry a workspace field on the body, call - :func:`enforce_workspace` *after* parsing the body to validate - the workspace and re-check the capability in that scope. Most - endpoints do both. + The resource is system-level (``{}``) and parameters are empty — + use :func:`enforce_workspace` for workspace-scoped endpoints, or + drive authorisation through the operation registry for richer + cases. - - ``PUBLIC``: no authentication, returns ``None``. - - ``AUTHENTICATED``: any valid identity. - - capability string: identity must have it, checked against the - caller's assigned workspace (adequate for endpoints whose - capability is system-level, e.g. ``metrics:read``, or where - the real workspace-aware check happens in - :func:`enforce_workspace` after body parsing).""" + - ``PUBLIC``: returns ``None`` — no authentication. + - ``AUTHENTICATED``: returns the ``Identity`` — no authorisation. + - capability string: returns the ``Identity`` if the regime + allows; raises ``HTTPForbidden`` otherwise. + """ if capability == PUBLIC: return None @@ -197,42 +63,38 @@ async def enforce(request, auth, capability): if capability == AUTHENTICATED: return identity - if not check(identity, capability): - raise access_denied() - + await auth.authorise(identity, capability, {}, {}) return identity -def enforce_workspace(data, identity, capability=None): - """Resolve + validate the workspace on a request body. +async def enforce_workspace(data, identity, auth, capability=None): + """Default-fill the workspace on a request body and (optionally) + authorise the caller for ``capability`` against that workspace. - Target workspace = ``data["workspace"]`` if supplied, else the - caller's assigned workspace. - - At least one of the caller's roles must (a) be active in the - target workspace and, if ``capability`` is given, (b) grant - ``capability``. Otherwise 403. + caller's bound workspace. - On success, ``data["workspace"]`` is overwritten with the - resolved value — callers can rely on the outgoing message - having the gateway's chosen workspace rather than any - caller-supplied value. + resolved value so downstream code sees a single canonical + address. + - When ``capability`` is given, the regime is asked whether the + caller may invoke ``capability`` on ``{workspace: target}``. + Raises ``HTTPForbidden`` on a deny. - For ``capability=None`` the workspace scope alone is checked — - useful when the body has a workspace but the endpoint already - passed its capability check (e.g. via :func:`enforce`).""" + For ``capability=None`` no authorisation call is made — the + caller has presumably already authorised via :func:`enforce` + (handy for endpoints that authorise once then resolve workspace + on the body before forwarding). + """ if not isinstance(data, dict): return data requested = data.get("workspace", "") target = requested or identity.workspace + data["workspace"] = target - for role_name in identity.roles: - role = ROLE_DEFINITIONS.get(role_name) - if role is None: - continue - if capability is not None and capability not in role["capabilities"]: - continue - if _scope_permits(role_name, target, identity.workspace): - data["workspace"] = target - return data + if capability is not None: + await auth.authorise( + identity, capability, {"workspace": target}, {}, + ) - raise access_denied() + return data diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py index 013cd1ea..37a72f11 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py @@ -121,20 +121,45 @@ class Mux: }) return - # Workspace resolution. Role workspace scope determines - # which target workspaces are permitted. The resolved - # value is written to both the envelope and the inner - # request payload so clients don't have to repeat it - # per-message (same convenience HTTP callers get via - # enforce_workspace). + # Per-service capability gating. Resolved through the + # operation registry so the WS path matches what HTTP + # callers see — same authority, same caps. Service + # kinds that aren't registered are refused. + from ..registry import lookup as _registry_lookup from ..capabilities import enforce_workspace from aiohttp import web as _web + service = data.get("service", "") + op = _registry_lookup(f"flow-service:{service}") + if op is None: + await self.ws.send_json({ + "id": request_id, + "error": { + "message": "unknown service", + "type": "unknown-service", + }, + "complete": True, + }) + return + + # Workspace + flow form the resource address for a + # flow-level service call. Resolve workspace first + # (default-fill from the caller's bound workspace), + # then ask the regime to authorise the service-level + # capability against that {workspace, flow} resource. try: - enforce_workspace(data, self.identity) + await enforce_workspace(data, self.identity, self.auth) inner = data.get("request") if isinstance(inner, dict): - enforce_workspace(inner, self.identity) + await enforce_workspace(inner, self.identity, self.auth) + + resource = { + "workspace": data.get("workspace", ""), + "flow": data.get("flow", ""), + } + await self.auth.authorise( + self.identity, op.capability, resource, {}, + ) except _web.HTTPForbidden: await self.ws.send_json({ "id": request_id, @@ -145,6 +170,16 @@ class Mux: "complete": True, }) return + except _web.HTTPUnauthorized: + await self.ws.send_json({ + "id": request_id, + "error": { + "message": "auth failure", + "type": "auth-required", + }, + "complete": True, + }) + return workspace = data["workspace"] diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/auth_endpoints.py b/trustgraph-flow/trustgraph/gateway/endpoint/auth_endpoints.py index 6037fc4b..0b476b7b 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/auth_endpoints.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/auth_endpoints.py @@ -97,7 +97,7 @@ class AuthEndpoints: ) req = { "operation": "change-password", - "user_id": identity.user_id, + "user_id": identity.handle, "password": body.get("current_password", ""), "new_password": body.get("new_password", ""), } diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/constant_endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint/constant_endpoint.py index ee9c0447..920b02ca 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/constant_endpoint.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/constant_endpoint.py @@ -36,7 +36,7 @@ class ConstantEndpoint: data = await request.json() if identity is not None: - enforce_workspace(data, identity) + await enforce_workspace(data, identity, self.auth) async def responder(x, fin): pass diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/iam_endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint/iam_endpoint.py new file mode 100644 index 00000000..70fa33f7 --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/endpoint/iam_endpoint.py @@ -0,0 +1,106 @@ +""" +Registry-driven /api/v1/iam endpoint. + +The gateway no longer gates IAM management with a single coarse +``users:admin`` capability. Instead, each operation declares its +own capability + resource shape in the registry (``registry.py``); +this endpoint reads the body's ``operation`` field, looks up the +declaration, and asks the IAM regime to authorise the call. + +Operations not in the registry produce a 400 ``unknown operation``. +This is the gateway's primary mechanism for fail-closed gating of +the IAM surface — the registry is the source of truth. +""" + +import logging + +from aiohttp import web + +from .. capabilities import ( + PUBLIC, AUTHENTICATED, auth_failure, +) +from .. registry import lookup, RequestContext + +logger = logging.getLogger("iam-endpoint") +logger.setLevel(logging.INFO) + + +class IamEndpoint: + """POST /api/v1/iam — generic forwarder gated by the operation + registry. The IAM dispatcher (``iam_dispatcher``) forwards the + body verbatim to iam-svc once authorisation succeeds.""" + + def __init__(self, endpoint_path, auth, dispatcher): + self.path = endpoint_path + self.auth = auth + self.dispatcher = dispatcher + + async def start(self): + pass + + def add_routes(self, app): + app.add_routes([web.post(self.path, self.handle)]) + + async def handle(self, request): + try: + body = await request.json() + except Exception: + return web.json_response( + {"error": "invalid json"}, status=400, + ) + if not isinstance(body, dict): + return web.json_response( + {"error": "body must be an object"}, status=400, + ) + + op_name = body.get("operation", "") + op = lookup(op_name) + if op is None: + return web.json_response( + {"error": "unknown operation"}, status=400, + ) + + # Authentication: required for everything except PUBLIC. + identity = None + if op.capability != PUBLIC: + try: + identity = await self.auth.authenticate(request) + except web.HTTPException: + raise + + # Authorisation: capability sentinels short-circuit the + # regime call; capability strings go through authorise(). + if op.capability not in (PUBLIC, AUTHENTICATED): + ctx = RequestContext( + body=body, + match_info=dict(request.match_info), + identity=identity, + ) + try: + resource = op.extract_resource(ctx) + parameters = op.extract_parameters(ctx) + except Exception as e: + logger.warning( + f"extractor failed for {op_name!r}: " + f"{type(e).__name__}: {e}" + ) + return web.json_response( + {"error": "bad request"}, status=400, + ) + + await self.auth.authorise( + identity, op.capability, resource, parameters, + ) + + async def responder(x, fin): + pass + + try: + resp = await self.dispatcher.process(body, responder) + except web.HTTPException: + raise + except Exception as e: + logger.error(f"Exception: {e}", exc_info=True) + return web.json_response({"error": str(e)}) + + return web.json_response(resp) diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/manager.py b/trustgraph-flow/trustgraph/gateway/endpoint/manager.py index 69b11e07..ed5ef4b5 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/manager.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/manager.py @@ -9,90 +9,44 @@ from . socket import SocketEndpoint from . metrics import MetricsEndpoint from . i18n import I18nPackEndpoint from . auth_endpoints import AuthEndpoints +from . iam_endpoint import IamEndpoint +from . registry_endpoint import RegistryRoutedVariableEndpoint -from .. capabilities import PUBLIC, AUTHENTICATED +from .. capabilities import PUBLIC, AUTHENTICATED, auth_failure +from .. registry import lookup as _registry_lookup, RequestContext from .. dispatch.manager import DispatcherManager -# Capability required for each kind on the /api/v1/{kind} generic -# endpoint (global services). Coarse gating — the IAM bundle split -# of "read vs write" per admin subsystem is not applied here because -# this endpoint forwards an opaque operation in the body. Writes -# are the upper bound on what the endpoint can do, so we gate on -# the write/admin capability. -GLOBAL_KIND_CAPABILITY = { - "config": "config:write", - "flow": "flows:write", - "librarian": "documents:write", - "knowledge": "knowledge:write", - "collection-management": "collections:write", - # IAM endpoints land on /api/v1/iam and require the admin bundle. - # Login / bootstrap / change-password are served by - # AuthEndpoints, which handle their own gating (PUBLIC / - # AUTHENTICATED). - "iam": "users:admin", -} +# /api/v1/{kind} (config / flow / librarian / knowledge / +# collection-management), /api/v1/iam, and /api/v1/flow/{flow}/... +# routes are all gated per-operation by the registry, not by a +# per-kind capability map. Login / bootstrap / change-password are +# served by AuthEndpoints with their own PUBLIC / AUTHENTICATED +# sentinels. -# Capability required for each kind on the -# /api/v1/flow/{flow}/service/{kind} endpoint (per-flow data-plane). -FLOW_KIND_CAPABILITY = { - "agent": "agent", - "text-completion": "llm", - "prompt": "llm", - "mcp-tool": "mcp", - "graph-rag": "graph:read", - "document-rag": "documents:read", - "embeddings": "embeddings", - "graph-embeddings": "graph:read", - "document-embeddings": "documents:read", - "triples": "graph:read", - "rows": "rows:read", - "nlp-query": "rows:read", - "structured-query": "rows:read", - "structured-diag": "rows:read", - "row-embeddings": "rows:read", - "sparql": "graph:read", -} - - -# Capability for the streaming flow import/export endpoints, -# keyed by the "kind" URL segment. -FLOW_IMPORT_CAPABILITY = { - "triples": "graph:write", - "graph-embeddings": "graph:write", - "document-embeddings": "documents:write", - "entity-contexts": "documents:write", - "rows": "rows:write", -} - -FLOW_EXPORT_CAPABILITY = { - "triples": "graph:read", - "graph-embeddings": "graph:read", - "document-embeddings": "documents:read", - "entity-contexts": "documents:read", -} - - -from .. capabilities import enforce, enforce_workspace import logging as _mgr_logging _mgr_logger = _mgr_logging.getLogger("endpoint") class _RoutedVariableEndpoint: - """HTTP endpoint whose required capability is looked up per - request from the URL's ``kind`` parameter. Used for the two - generic dispatch paths (``/api/v1/{kind}`` and - ``/api/v1/flow/{flow}/service/{kind}``). Self-contained rather - than subclassing ``VariableEndpoint`` to avoid mutating shared - state across concurrent requests.""" + """HTTP endpoint that gates per request via the operation + registry. The URL's ``kind`` parameter combined with a fixed + ``registry_prefix`` yields the registry key — e.g. prefix + ``flow-service`` and kind ``agent`` looks up + ``flow-service:agent``. - def __init__(self, endpoint_path, auth, dispatcher, capability_map): + Used for ``/api/v1/flow/{flow}/service/{kind}`` (per-flow + data-plane services). ``/api/v1/{kind}`` (workspace-level + global services) goes through ``RegistryRoutedVariableEndpoint`` + which discriminates on body operation as well as URL kind.""" + + def __init__(self, endpoint_path, auth, dispatcher, registry_prefix): self.path = endpoint_path self.auth = auth self.dispatcher = dispatcher - self._capability_map = capability_map + self._registry_prefix = registry_prefix async def start(self): pass @@ -102,18 +56,26 @@ class _RoutedVariableEndpoint: async def handle(self, request): kind = request.match_info.get("kind", "") - cap = self._capability_map.get(kind) - if cap is None: + op = _registry_lookup(f"{self._registry_prefix}:{kind}") + if op is None: return web.json_response( {"error": "unknown kind"}, status=404, ) - identity = await enforce(request, self.auth, cap) + identity = await self.auth.authenticate(request) try: data = await request.json() - if identity is not None: - enforce_workspace(data, identity) + ctx = RequestContext( + body=data if isinstance(data, dict) else {}, + match_info=dict(request.match_info), + identity=identity, + ) + resource = op.extract_resource(ctx) + parameters = op.extract_parameters(ctx) + await self.auth.authorise( + identity, op.capability, resource, parameters, + ) async def responder(x, fin): pass @@ -131,15 +93,15 @@ class _RoutedVariableEndpoint: class _RoutedSocketEndpoint: - """WebSocket endpoint whose required capability is looked up per - request from the URL's ``kind`` parameter. Used for the flow - import/export streaming endpoints.""" + """WebSocket endpoint gated per request via the operation + registry. Like ``_RoutedVariableEndpoint`` but for the + streaming flow import / export socket paths.""" - def __init__(self, endpoint_path, auth, dispatcher, capability_map): + def __init__(self, endpoint_path, auth, dispatcher, registry_prefix): self.path = endpoint_path self.auth = auth self.dispatcher = dispatcher - self._capability_map = capability_map + self._registry_prefix = registry_prefix async def start(self): pass @@ -148,11 +110,9 @@ class _RoutedSocketEndpoint: app.add_routes([web.get(self.path, self.handle)]) async def handle(self, request): - from .. capabilities import check, auth_failure, access_denied - kind = request.match_info.get("kind", "") - cap = self._capability_map.get(kind) - if cap is None: + op = _registry_lookup(f"{self._registry_prefix}:{kind}") + if op is None: return web.json_response( {"error": "unknown kind"}, status=404, ) @@ -168,8 +128,20 @@ class _RoutedSocketEndpoint: ) except web.HTTPException as e: return e - if not check(identity, cap): - return access_denied() + + ctx = RequestContext( + body={}, + match_info=dict(request.match_info), + identity=identity, + ) + try: + resource = op.extract_resource(ctx) + parameters = op.extract_parameters(ctx) + await self.auth.authorise( + identity, op.capability, resource, parameters, + ) + except web.HTTPException as e: + return e # Delegate the websocket handling to a standalone SocketEndpoint # with the resolved capability, bypassing the per-request mutation @@ -178,7 +150,7 @@ class _RoutedSocketEndpoint: endpoint_path=self.path, auth=self.auth, dispatcher=self.dispatcher, - capability=cap, + capability=op.capability, ) return await ws_ep.handle(request) @@ -203,6 +175,18 @@ class EndpointManager: auth=auth, ), + # /api/v1/iam — registry-driven IAM management. Per + # operation gating happens inside IamEndpoint via the + # operation registry; the dispatcher forwards verbatim + # to iam-svc once authorisation has succeeded. Listed + # before the generic /api/v1/{kind} route so it wins + # the match for "iam". + IamEndpoint( + endpoint_path="/api/v1/iam", + auth=auth, + dispatcher=dispatcher_manager.dispatch_auth_iam(), + ), + I18nPackEndpoint( endpoint_path="/api/v1/i18n/packs/{lang}", auth=auth, @@ -215,12 +199,16 @@ class EndpointManager: capability="metrics:read", ), - # Global services: capability chosen per-kind. - _RoutedVariableEndpoint( + # Global services: registry-driven per-operation gating. + # Each kind+op combination has a registry entry that + # declares its capability and resource shape. Listed + # after the IAM and auth-surface routes; aiohttp's + # path matcher prefers the more-specific path so this + # variable route doesn't shadow them. + RegistryRoutedVariableEndpoint( endpoint_path="/api/v1/{kind}", auth=auth, dispatcher=dispatcher_manager.dispatch_global_service(), - capability_map=GLOBAL_KIND_CAPABILITY, ), # /api/v1/socket: WebSocket handshake accepts @@ -236,26 +224,29 @@ class EndpointManager: in_band_auth=True, ), - # Per-flow request/response services — capability per kind. + # Per-flow request/response services — gated per + # ``flow-service:`` registry entry. _RoutedVariableEndpoint( endpoint_path="/api/v1/flow/{flow}/service/{kind}", auth=auth, dispatcher=dispatcher_manager.dispatch_flow_service(), - capability_map=FLOW_KIND_CAPABILITY, + registry_prefix="flow-service", ), - # Per-flow streaming import/export — capability per kind. + # Per-flow streaming import/export — gated per + # ``flow-import:`` / ``flow-export:`` registry + # entry. _RoutedSocketEndpoint( endpoint_path="/api/v1/flow/{flow}/import/{kind}", auth=auth, dispatcher=dispatcher_manager.dispatch_flow_import(), - capability_map=FLOW_IMPORT_CAPABILITY, + registry_prefix="flow-import", ), _RoutedSocketEndpoint( endpoint_path="/api/v1/flow/{flow}/export/{kind}", auth=auth, dispatcher=dispatcher_manager.dispatch_flow_export(), - capability_map=FLOW_EXPORT_CAPABILITY, + registry_prefix="flow-export", ), StreamEndpoint( diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/registry_endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint/registry_endpoint.py new file mode 100644 index 00000000..296376fa --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/endpoint/registry_endpoint.py @@ -0,0 +1,123 @@ +""" +Registry-driven dispatch for ``/api/v1/{kind}`` global services. + +The body's ``operation`` field plus the URL's ``{kind}`` together +form the canonical operation name (``:``) that the +gateway looks up in ``registry.py``. The matched operation +declares its capability and resource shape; this endpoint asks the +IAM regime to authorise the call before forwarding the body +verbatim to the backend dispatcher. + +The dispatcher is the same ``dispatch_global_service()`` factory the +old coarse path used; only the gating layer has changed. + +Operations not present in the registry are rejected with 400 +``unknown operation`` — fail closed. +""" + +import logging + +from aiohttp import web + +from .. capabilities import ( + PUBLIC, AUTHENTICATED, auth_failure, +) +from .. registry import lookup, RequestContext + +logger = logging.getLogger("registry-endpoint") +logger.setLevel(logging.INFO) + + +class RegistryRoutedVariableEndpoint: + """POST /api/v1/{kind} — kind comes from the URL, operation comes + from the body, both are joined as the registry key.""" + + def __init__(self, endpoint_path, auth, dispatcher): + self.path = endpoint_path + self.auth = auth + self.dispatcher = dispatcher + + async def start(self): + pass + + def add_routes(self, app): + app.add_routes([web.post(self.path, self.handle)]) + + async def handle(self, request): + kind = request.match_info.get("kind", "") + if not kind: + return web.json_response( + {"error": "missing kind"}, status=404, + ) + + try: + body = await request.json() + except Exception: + return web.json_response( + {"error": "invalid json"}, status=400, + ) + if not isinstance(body, dict): + return web.json_response( + {"error": "body must be an object"}, status=400, + ) + + op_name = body.get("operation", "") + if not op_name: + return web.json_response( + {"error": "missing operation"}, status=400, + ) + + registry_key = f"{kind}:{op_name}" + op = lookup(registry_key) + if op is None: + return web.json_response( + {"error": "unknown operation"}, status=400, + ) + + identity = None + if op.capability != PUBLIC: + identity = await self.auth.authenticate(request) + + if op.capability not in (PUBLIC, AUTHENTICATED): + ctx = RequestContext( + body=body, + match_info=dict(request.match_info), + identity=identity, + ) + try: + resource = op.extract_resource(ctx) + parameters = op.extract_parameters(ctx) + except Exception as e: + logger.warning( + f"extractor failed for {registry_key!r}: " + f"{type(e).__name__}: {e}" + ) + return web.json_response( + {"error": "bad request"}, status=400, + ) + + await self.auth.authorise( + identity, op.capability, resource, parameters, + ) + + # Default-fill workspace into the body so downstream + # dispatchers see the canonical resolved value. The + # extractor has already pulled the workspace out; + # mirror it back to the body for the verbatim forward. + if "workspace" in resource: + body["workspace"] = resource["workspace"] + + async def responder(x, fin): + pass + + try: + resp = await self.dispatcher.process( + body, responder, request.match_info, + ) + except web.HTTPException: + raise + except Exception as e: + logger.error(f"Exception: {e}", exc_info=True) + return web.json_response({"error": str(e)}) + + return web.json_response(resp) diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/socket.py b/trustgraph-flow/trustgraph/gateway/endpoint/socket.py index 08629ea2..f53ad73b 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/socket.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/socket.py @@ -5,7 +5,7 @@ import logging from .. running import Running from .. capabilities import ( - PUBLIC, AUTHENTICATED, check, auth_failure, access_denied, + PUBLIC, AUTHENTICATED, auth_failure, ) logger = logging.getLogger("socket") @@ -97,8 +97,12 @@ class SocketEndpoint: except web.HTTPException as e: return e if self.capability != AUTHENTICATED: - if not check(identity, self.capability): - return access_denied() + try: + await self.auth.authorise( + identity, self.capability, {}, {}, + ) + except web.HTTPException as e: + return e # 50MB max message size ws = web.WebSocketResponse(max_msg_size=52428800) diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py index 5e0d9d21..6a336f42 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py @@ -36,7 +36,7 @@ class VariableEndpoint: data = await request.json() if identity is not None: - enforce_workspace(data, identity) + await enforce_workspace(data, identity, self.auth) async def responder(x, fin): pass diff --git a/trustgraph-flow/trustgraph/gateway/registry.py b/trustgraph-flow/trustgraph/gateway/registry.py new file mode 100644 index 00000000..32a517a9 --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/registry.py @@ -0,0 +1,515 @@ +""" +Gateway operation registry. + +Single declarative table mapping each operation the gateway +recognises to: + +- The capability the IAM regime is asked to authorise against. +- The resource level (system / workspace / flow) — determines the + shape of the resource identifier handed to ``authorise``. +- Extractors that build the resource and parameters from the + request context. + +This is a gateway-internal concept. It is not part of the IAM +contract — the contract specifies what arguments ``authorise`` +receives; the registry is how the gateway populates them. + +See docs/tech-specs/iam-contract.md for the contract and +docs/tech-specs/iam.md for the request anatomy. +""" + +from dataclasses import dataclass, field +from typing import Any, Callable + + +# Sentinels for operations that don't go through capability-based +# authorisation. Mirror the values used in capabilities.py so the +# gateway endpoint layer can recognise them uniformly. +PUBLIC = "__public__" +AUTHENTICATED = "__authenticated__" + + +class ResourceLevel: + """Where the operation's resource lives. + + ``SYSTEM`` — operation acts on a deployment-level resource + (the user registry, the workspace registry, + the signing key). resource = {}. Workspace, + if relevant, is a parameter, not an address. + + ``WORKSPACE`` — operation acts on something within a workspace + (config, library, knowledge, collections, flow + lifecycle). resource = {workspace}. + + ``FLOW`` — operation acts on something within a flow + within a workspace (graph, agent, llm, etc.). + resource = {workspace, flow}. + """ + SYSTEM = "system" + WORKSPACE = "workspace" + FLOW = "flow" + + +@dataclass +class RequestContext: + """The bundle of inputs the registry's extractors operate on. + Assembled by the gateway from the incoming request after + authentication.""" + + # Parsed JSON body (HTTP) or inner request payload (WebSocket). + body: dict = field(default_factory=dict) + + # URL path components (HTTP) or WebSocket envelope routing + # fields (id, service, workspace, flow). + match_info: dict = field(default_factory=dict) + + # Authenticated identity for default-fill-in. Always present + # by the time extractors run, except for PUBLIC operations + # where it is None. + identity: Any = None + + +@dataclass +class Operation: + """Declared operation the gateway can dispatch + authorise.""" + + # Canonical operation name (used for registry lookup, audit, + # debug logs). Mirrors the operation strings in the IAM + # service and other backends where applicable. + name: str + + # Capability required to invoke this operation. Either a + # string from the capability vocabulary in capabilities.md, or + # the PUBLIC / AUTHENTICATED sentinel for operations that + # don't go through capability-based authorisation. + capability: str + + # Where the operation's resource lives. Determines the + # shape of the resource argument passed to authorise. + resource_level: str + + # Build the resource identifier from the request context. + # Returns a dict with the appropriate components for the + # resource level: {} for SYSTEM, {workspace} for WORKSPACE, + # {workspace, flow} for FLOW. Default-fill-in of workspace + # from identity.workspace happens here when applicable. + extract_resource: Callable[[RequestContext], dict] + + # Build the parameters dict — decision-relevant fields the + # operation supplied that are not part of the resource + # address. E.g. workspace association on a system-level + # user-registry operation. + extract_parameters: Callable[[RequestContext], dict] + + +# --------------------------------------------------------------------------- +# Registry storage. +# --------------------------------------------------------------------------- + + +_REGISTRY: dict[str, Operation] = {} + + +def register(op: Operation) -> None: + if op.name in _REGISTRY: + raise RuntimeError( + f"operation {op.name!r} already registered" + ) + _REGISTRY[op.name] = op + + +def lookup(name: str) -> Operation | None: + return _REGISTRY.get(name) + + +def all_operations() -> list[Operation]: + return list(_REGISTRY.values()) + + +# --------------------------------------------------------------------------- +# Common extractor helpers. +# --------------------------------------------------------------------------- + + +def _empty_resource(_ctx: RequestContext) -> dict: + """System-level resource: empty dict.""" + return {} + + +def _workspace_from_body(ctx: RequestContext) -> dict: + """Workspace-level resource sourced from the request body's + workspace field, defaulting to the caller's bound workspace.""" + ws = (ctx.body.get("workspace") if isinstance(ctx.body, dict) else "") + if not ws and ctx.identity is not None: + ws = ctx.identity.workspace + return {"workspace": ws} + + +def _flow_from_match_info(ctx: RequestContext) -> dict: + """Flow-level resource sourced from URL path components or WS + envelope fields. Both ``workspace`` and ``flow`` are required; + no default-fill-in (the address is the operation's identity).""" + return { + "workspace": ctx.match_info.get("workspace", ""), + "flow": ctx.match_info.get("flow", ""), + } + + +def _no_parameters(_ctx: RequestContext) -> dict: + return {} + + +def _body_as_parameters(ctx: RequestContext) -> dict: + """All body fields are parameters — used when the operation's + body is small and uniformly decision-relevant (e.g. user- + registry ops where the body's user.workspace is what the + regime checks against the admin's scope).""" + return dict(ctx.body) if isinstance(ctx.body, dict) else {} + + +def _workspace_param_only(ctx: RequestContext) -> dict: + """Parameters dict carrying only the workspace association. + Used by system-level operations (e.g. user-registry ops) where + the workspace isn't part of the resource address but is the + field the regime uses to scope the admin's authority. + + Pulls the workspace from the inner ``user`` / ``workspace_record`` + body field if present (create-user, create-workspace), then from + the top-level body, then from the caller's bound workspace.""" + body = ctx.body if isinstance(ctx.body, dict) else {} + inner_user = body.get("user") if isinstance(body.get("user"), dict) else {} + inner_ws = ( + body.get("workspace_record") + if isinstance(body.get("workspace_record"), dict) else {} + ) + ws = ( + inner_user.get("workspace") + or inner_ws.get("id") + or body.get("workspace") + ) + if not ws and ctx.identity is not None: + ws = ctx.identity.workspace + return {"workspace": ws or ""} + + +# --------------------------------------------------------------------------- +# Operation registrations. +# +# The gateway looks operations up by their canonical name (the same +# string the request body / WS envelope carries in its ``operation`` +# field where applicable). Auth-surface operations (login, bootstrap, +# change-password) are not listed here — they have their own routes +# in auth_endpoints.py and use PUBLIC / AUTHENTICATED sentinels +# directly. Pure gateway↔IAM internal operations (resolve-api-key, +# authorise, authorise-many, get-signing-key-public) are likewise +# excluded; they are never invoked over the public API. +# --------------------------------------------------------------------------- + + +# IAM management operations. All routed through /api/v1/iam, body +# carries ``operation`` plus operation-specific fields. + +# User registry: SYSTEM-level resource (users are global, identified +# by handle). The admin's authority is scoped per workspace via the +# parameters {workspace} field — that's what the regime checks +# against the admin's role workspace_scope. +register(Operation( + name="create-user", + capability="users:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) +register(Operation( + name="list-users", + capability="users:read", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) +register(Operation( + name="get-user", + capability="users:read", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) +register(Operation( + name="update-user", + capability="users:write", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) +register(Operation( + name="disable-user", + capability="users:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) +register(Operation( + name="enable-user", + capability="users:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) +register(Operation( + name="delete-user", + capability="users:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) +register(Operation( + name="reset-password", + capability="users:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) + + +# API keys: workspace-level resource — keys live within a workspace. +register(Operation( + name="create-api-key", + capability="keys:admin", + resource_level=ResourceLevel.WORKSPACE, + extract_resource=_workspace_from_body, + extract_parameters=_no_parameters, +)) +register(Operation( + name="list-api-keys", + capability="keys:admin", + resource_level=ResourceLevel.WORKSPACE, + extract_resource=_workspace_from_body, + extract_parameters=_no_parameters, +)) +register(Operation( + name="revoke-api-key", + capability="keys:admin", + resource_level=ResourceLevel.WORKSPACE, + extract_resource=_workspace_from_body, + extract_parameters=_no_parameters, +)) + + +# Workspace registry: SYSTEM-level resource (workspaces are the +# top-level addressable unit). No parameters — the workspace being +# acted on is identified by the body, not used as a scope cue. +register(Operation( + name="create-workspace", + capability="workspaces:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) +register(Operation( + name="list-workspaces", + capability="workspaces:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) +register(Operation( + name="get-workspace", + capability="workspaces:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) +register(Operation( + name="update-workspace", + capability="workspaces:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) +register(Operation( + name="disable-workspace", + capability="workspaces:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) + + +# Signing key: SYSTEM-level operational op. +register(Operation( + name="rotate-signing-key", + capability="iam:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) + + +# --------------------------------------------------------------------------- +# Auth-surface entries. +# +# Listed here so the registry is the one place the gateway looks for +# operation→capability mappings — including the sentinels for paths +# that don't go through capability-based authorisation. The actual +# routing is in auth_endpoints.py; these entries let the registry- +# driven dispatcher recognise the operation if it sees it on a +# generic path. +# --------------------------------------------------------------------------- + +register(Operation( + name="login", + capability=PUBLIC, + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) +register(Operation( + name="bootstrap", + capability=PUBLIC, + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) +register(Operation( + name="change-password", + capability=AUTHENTICATED, + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) + + +# --------------------------------------------------------------------------- +# Generic kind/operation entries. +# +# Names are ``:`` so the registry key is unique +# across dispatchers. All entries below are workspace-level +# resources (workspace defaulted from the caller's bound workspace +# if absent). Read/write distinction maps to the existing +# ``:read`` / ``:write`` capability vocabulary +# defined in capabilities.md. +# --------------------------------------------------------------------------- + + +def _register_kind_op(kind: str, op: str, capability: str) -> None: + """Helper: register a workspace-level kind:op with the standard + extractors (workspace from body, no extra parameters).""" + register(Operation( + name=f"{kind}:{op}", + capability=capability, + resource_level=ResourceLevel.WORKSPACE, + extract_resource=_workspace_from_body, + extract_parameters=_no_parameters, + )) + + +# config: KV-style workspace config service. +for _op in ("get", "list", "getvalues", "getvalues-all-ws", "config"): + _register_kind_op("config", _op, "config:read") +for _op in ("put", "delete"): + _register_kind_op("config", _op, "config:write") + + +# flow: flow-blueprint and flow-lifecycle service. +for _op in ("list-blueprints", "get-blueprint", "list-flows", "get-flow"): + _register_kind_op("flow", _op, "flows:read") +for _op in ("put-blueprint", "delete-blueprint", "start-flow", "stop-flow"): + _register_kind_op("flow", _op, "flows:write") + + +# librarian: document storage and processing service. +for _op in ( + "get-document-metadata", "get-document-content", + "stream-document", "list-documents", "list-processing", + "get-upload-status", "list-uploads", +): + _register_kind_op("librarian", _op, "documents:read") +for _op in ( + "add-document", "remove-document", "update-document", + "add-processing", "remove-processing", + "begin-upload", "upload-chunk", "complete-upload", "abort-upload", +): + _register_kind_op("librarian", _op, "documents:write") + + +# knowledge: knowledge-graph core service. +for _op in ("get-kg-core", "list-kg-cores"): + _register_kind_op("knowledge", _op, "knowledge:read") +for _op in ("put-kg-core", "delete-kg-core", + "load-kg-core", "unload-kg-core"): + _register_kind_op("knowledge", _op, "knowledge:write") + + +# collection-management: workspace collection lifecycle. +_register_kind_op("collection-management", "list-collections", "collections:read") +for _op in ("update-collection", "delete-collection"): + _register_kind_op("collection-management", _op, "collections:write") + + +# --------------------------------------------------------------------------- +# Per-flow data-plane services. +# +# /api/v1/flow/{flow}/service/{kind} and the streaming +# /api/v1/flow/{flow}/{import,export}/{kind} paths. No body-level +# ``operation`` discriminator — the URL kind is the operation +# identity. Resource is FLOW level (workspace + flow). +# +# Names: ``flow-service:``, ``flow-import:``, +# ``flow-export:``. +# --------------------------------------------------------------------------- + + +def _register_flow_kind(prefix: str, kind: str, capability: str) -> None: + register(Operation( + name=f"{prefix}:{kind}", + capability=capability, + resource_level=ResourceLevel.FLOW, + extract_resource=_flow_from_match_info, + extract_parameters=_no_parameters, + )) + + +# Request/response services on /api/v1/flow/{flow}/service/{kind}. +_FLOW_SERVICES = { + "agent": "agent", + "text-completion": "llm", + "prompt": "llm", + "mcp-tool": "mcp", + "graph-rag": "graph:read", + "document-rag": "documents:read", + "embeddings": "embeddings", + "graph-embeddings": "graph:read", + "document-embeddings": "documents:read", + "triples": "graph:read", + "rows": "rows:read", + "nlp-query": "rows:read", + "structured-query": "rows:read", + "structured-diag": "rows:read", + "row-embeddings": "rows:read", + "sparql": "graph:read", +} +for _kind, _cap in _FLOW_SERVICES.items(): + _register_flow_kind("flow-service", _kind, _cap) + + +# Streaming import socket endpoints. +_FLOW_IMPORTS = { + "triples": "graph:write", + "graph-embeddings": "graph:write", + "document-embeddings": "documents:write", + "entity-contexts": "documents:write", + "rows": "rows:write", +} +for _kind, _cap in _FLOW_IMPORTS.items(): + _register_flow_kind("flow-import", _kind, _cap) + + +# Streaming export socket endpoints. +_FLOW_EXPORTS = { + "triples": "graph:read", + "graph-embeddings": "graph:read", + "document-embeddings": "documents:read", + "entity-contexts": "documents:read", +} +for _kind, _cap in _FLOW_EXPORTS.items(): + _register_flow_kind("flow-export", _kind, _cap) diff --git a/trustgraph-flow/trustgraph/iam/service/iam.py b/trustgraph-flow/trustgraph/iam/service/iam.py index 6e7c7aa5..44c7df23 100644 --- a/trustgraph-flow/trustgraph/iam/service/iam.py +++ b/trustgraph-flow/trustgraph/iam/service/iam.py @@ -40,6 +40,78 @@ API_KEY_RANDOM_BYTES = 24 JWT_ISSUER = "trustgraph-iam" JWT_TTL_SECONDS = 3600 +# Default authorisation cache TTL the regime tells the gateway to +# observe. 60s is the OSS-spec maximum revocation latency: a role +# change, workspace disable, or key revoke takes effect within at +# most this much time. +AUTHZ_CACHE_TTL_SECONDS = 60 + + +# OSS regime role table. Lives here, not in the gateway — the +# gateway is regime-agnostic and must not encode policy. +# +# Each role has a capability set and a workspace scope. The +# evaluator (handle_authorise below) checks (a) that some role +# held by the caller grants the requested capability, and (b) +# that role's workspace scope permits the target workspace. + +_READER_CAPS = { + "agent", + "graph:read", + "documents:read", + "rows:read", + "llm", + "embeddings", + "mcp", + "config:read", + "flows:read", + "collections:read", + "knowledge:read", + "keys:self", +} + +_WRITER_CAPS = _READER_CAPS | { + "graph:write", + "documents:write", + "rows:write", + "collections:write", + "knowledge:write", +} + +_ADMIN_CAPS = _WRITER_CAPS | { + "config:write", + "flows:write", + "users:read", "users:write", "users:admin", + "keys:admin", + "workspaces:admin", + "iam:admin", + "metrics:read", +} + +ROLE_DEFINITIONS = { + "reader": { + "capabilities": _READER_CAPS, + "workspace_scope": "assigned", + }, + "writer": { + "capabilities": _WRITER_CAPS, + "workspace_scope": "assigned", + }, + "admin": { + "capabilities": _ADMIN_CAPS, + "workspace_scope": "*", + }, +} + + +def _scope_permits(role_scope, target_workspace, assigned_workspace): + """Does the given role apply to ``target_workspace``?""" + if role_scope == "*": + return True + if role_scope == "assigned": + return target_workspace == assigned_workspace + return False + def _now_iso(): return datetime.datetime.now(datetime.timezone.utc).isoformat() @@ -250,6 +322,10 @@ class IamService: return await self.handle_disable_workspace(v) if op == "rotate-signing-key": return await self.handle_rotate_signing_key(v) + if op == "authorise": + return await self.handle_authorise(v) + if op == "authorise-many": + return await self.handle_authorise_many(v) return _err( "invalid-argument", @@ -478,7 +554,7 @@ class IamService: ( id, ws, _username, _name, _email, password_hash, - roles, enabled, _mcp, _created, + _roles, enabled, _mcp, _created, ) = user_row if not enabled: @@ -496,11 +572,14 @@ class IamService: now_ts = int(_now_dt().timestamp()) exp_ts = now_ts + JWT_TTL_SECONDS + # Per the IAM contract the gateway never reads policy state + # from the credential — roles stay server-side, reachable + # only via authorise(). JWT carries identity + workspace + # binding only. claims = { "iss": JWT_ISSUER, "sub": id, "workspace": ws, - "roles": sorted(roles) if roles else [], "iat": now_ts, "exp": exp_ts, } @@ -1130,3 +1209,134 @@ class IamService: await self.table_store.delete_api_key(key_hash) return IamResponse() + + # ------------------------------------------------------------------ + # authorise / authorise-many + # + # The IAM contract (see docs/tech-specs/iam-contract.md) calls + # for the regime — not the gateway — to decide whether an + # identity may perform a capability on a resource given the + # operation's parameters. These two operations are the OSS + # regime's implementation of that contract. + # + # Inputs (on IamRequest): + # user_id — the identity handle (the gateway's + # opaque reference). For OSS this is the + # user record's id. + # capability — the capability string from the + # capabilities.md vocabulary. + # resource_json — JSON dict, the resource address + # ({} for system, {workspace} for + # workspace, {workspace, flow} for flow). + # parameters_json — JSON dict, decision-relevant operation + # parameters (e.g. workspace association + # on user-registry operations). + # authorise_checks — for authorise-many, a JSON list of + # {capability, resource, parameters}. + # + # Outputs (on IamResponse): + # decision_allow — single allow / deny verdict. + # decision_ttl_seconds — gateway cache TTL for this + # decision. + # decisions_json — for authorise-many, list of + # {allow, ttl} in request order. + # ------------------------------------------------------------------ + + def _decide(self, user_row, capability, resource, parameters): + """Single authorisation decision. Returns (allow, ttl).""" + + if user_row is None: + return False, AUTHZ_CACHE_TTL_SECONDS + + # user_row layout: + # 0:id 1:workspace 2:username 3:name 4:email 5:password_hash + # 6:roles 7:enabled 8:must_change_password 9:created + if not user_row[7]: # disabled + return False, AUTHZ_CACHE_TTL_SECONDS + + # Disabled workspace check (defense in depth — credentials + # bound to a disabled workspace shouldn't be able to act). + # Cheap; one row read. + # We do this only when a target workspace is involved, to + # avoid an extra read for system-level operations that + # bypass workspace altogether. + target_workspace = ( + (resource or {}).get("workspace") + or (parameters or {}).get("workspace") + ) + + roles = user_row[6] or set() + assigned_workspace = user_row[1] + + for role_name in roles: + defn = ROLE_DEFINITIONS.get(role_name) + if defn is None: + continue + if capability not in defn["capabilities"]: + continue + if target_workspace is None or _scope_permits( + defn["workspace_scope"], + target_workspace, + assigned_workspace, + ): + return True, AUTHZ_CACHE_TTL_SECONDS + + return False, AUTHZ_CACHE_TTL_SECONDS + + async def handle_authorise(self, v): + if not v.capability: + return _err("invalid-argument", "capability required") + if not v.user_id: + return _err("invalid-argument", "user_id (handle) required") + + try: + resource = json.loads(v.resource_json or "{}") + parameters = json.loads(v.parameters_json or "{}") + except json.JSONDecodeError as e: + return _err("invalid-argument", f"bad json: {e}") + + user_row = await self.table_store.get_user(v.user_id) + allow, ttl = self._decide( + user_row, v.capability, resource, parameters, + ) + return IamResponse( + decision_allow=allow, + decision_ttl_seconds=ttl, + ) + + async def handle_authorise_many(self, v): + if not v.user_id: + return _err("invalid-argument", "user_id (handle) required") + if not v.authorise_checks: + return _err("invalid-argument", "authorise_checks required") + + try: + checks = json.loads(v.authorise_checks) + except json.JSONDecodeError as e: + return _err("invalid-argument", f"bad json: {e}") + if not isinstance(checks, list): + return _err( + "invalid-argument", + "authorise_checks must be a JSON list", + ) + + # One user lookup for the whole batch. + user_row = await self.table_store.get_user(v.user_id) + + decisions = [] + for c in checks: + if not isinstance(c, dict): + decisions.append({ + "allow": False, + "ttl": AUTHZ_CACHE_TTL_SECONDS, + }) + continue + allow, ttl = self._decide( + user_row, + c.get("capability", ""), + c.get("resource") or {}, + c.get("parameters") or {}, + ) + decisions.append({"allow": allow, "ttl": ttl}) + + return IamResponse(decisions_json=json.dumps(decisions))