diff --git a/docs/tech-specs/capabilities.md b/docs/tech-specs/capabilities.md index 60f5acbf..7717cbc9 100644 --- a/docs/tech-specs/capabilities.md +++ b/docs/tech-specs/capabilities.md @@ -8,22 +8,41 @@ parent: "Tech Specs" ## Overview -Authorisation in TrustGraph is **capability-based**. Every gateway -endpoint maps to exactly one *capability*; a user's roles each grant -a set of capabilities; an authenticated request is permitted when -the required capability is a member of the union of the caller's -role capability sets. +Every gateway endpoint maps to exactly one *capability* — a string +from a closed vocabulary defined in this document. When the +gateway authorises a request, it hands the IAM regime four things: +the authenticated identity, the required capability, the +operation's resource (the structured identifier of what's being +operated on), and the operation's parameters. The IAM regime +decides allow or deny; see the [IAM contract](iam-contract.md) for +the full abstraction. -This document defines the capability vocabulary — the closed list -of capability strings that the gateway recognises — and the -open-source edition's role bundles. +A capability is a **permission**, not a structural classification. +`graph:read` says "the caller may read graphs"; it does not say +where graphs live or how they are addressed. The shape of a +request — whether workspace appears in the URL, the envelope, or +the body, and whether it is a resource address component or an +operation parameter — is determined by what the operation operates +on, not by what permission it requires. Permission and structure +are orthogonal; the contract takes both. -The capability mechanism is shared between open-source and potential -3rd party enterprise capability. The open-source edition ships a -fixed three-role bundle (`reader`, `writer`, `admin`). Enterprise -capability may define additional roles by composing their own -capability bundles from the same vocabulary; no protocol, gateway, -or backend-service change is required. +This document defines: + +- The **capability vocabulary** — the closed list of capability + strings the gateway uses as input to `authorise`. All IAM + regimes share this vocabulary; that's the only schema the + gateway and the IAM regime have to agree on. +- The **open-source role bundles** — the role-and-scope table the + OSS IAM regime uses to answer `authorise` calls. Other regimes + answer the same call differently; the bundles below are an + OSS-specific implementation detail, not a contract assertion. + +A regime may evaluate `authorise` using role bundles (OSS), IdP +group memberships, attribute-based policies, relationship tuples, +or any other mechanism. The gateway is unaware of which. The +capability strings — and the resource component vocabulary the +gateway populates alongside them — are the only thing both sides +have to agree on. ## Motivation @@ -113,19 +132,50 @@ granting `llm` expresses exactly that. An administrator granting `agent` should treat it as a grant of everything the agent composes at deployment time. -### Authorisation evaluation +### Authorisation evaluation (OSS regime) + +This section describes how the OSS IAM regime answers +`authorise(identity, capability, resource, parameters)`. Other +regimes answer the same contract differently; only the inputs (the +capability vocabulary, the resource components, the parameter +shape) are shared. For a request bearing a resolved set of roles -`R = {r1, r2, ...}` against an endpoint that requires capability -`c`: +`R = {r1, r2, ...}`, a required capability `c`, a resource, and +parameters: ``` -allow if c IN union(bundle(r) for r in R) +let target_workspace = + resource.workspace (workspace-/flow-level resources) + or parameters.workspace (system-level resources whose + parameters reference a workspace) + or unset (system-level operations with no + workspace context) + +allow if some role r in R has c in its capability bundle + and (target_workspace is unset + or r's workspace_scope permits target_workspace) ``` -No hierarchy, no precedence, no role-order sensitivity. A user +The OSS regime considers workspace from whichever role it plays in +the operation: + +- For workspace-level and flow-level resources, the workspace lives + in `resource.workspace` and that is what the role's scope is + checked against. +- For system-level resources whose operation parameters reference a + workspace (e.g. `create-user with workspace association W`), + workspace lives in `parameters.workspace` and that is what the + role's scope is checked against. The resource is system-level + (`resource = {}`) but the workspace constraint still bites. +- For system-level operations with no workspace context (e.g. + `bootstrap`, `rotate-signing-key`), the workspace-scope check + collapses — only capability-bundle membership matters. + +No hierarchy, no precedence, no role-order sensitivity. A user with a single role is the common case; a user with multiple roles -gets the union of their bundles. +is allowed if any role independently grants both the capability +and the relevant workspace scope. ### Enforcement boundary @@ -214,5 +264,10 @@ ships that feature. ## References +- [IAM Contract Specification](iam-contract.md) — the abstract + gateway↔IAM regime contract; capability strings are inputs to + `authorise`. - [Identity and Access Management Specification](iam.md) +- [IAM Service Protocol Specification](iam-protocol.md) — the OSS + regime's wire-level protocol. - [Architecture Principles](architecture-principles.md) diff --git a/docs/tech-specs/data-ownership-model.md b/docs/tech-specs/data-ownership-model.md index ea94ec46..b112d195 100644 --- a/docs/tech-specs/data-ownership-model.md +++ b/docs/tech-specs/data-ownership-model.md @@ -22,8 +22,16 @@ are the boundaries around data, and who owns what? A workspace is the primary isolation boundary. It represents an organisation, team, or independent operating unit. All data belongs to -exactly one workspace. Cross-workspace access is never permitted through -the API. +exactly one workspace. + +Cross-workspace access through the API is gated by the IAM regime +(see [`iam-contract.md`](iam-contract.md)). In the OSS distribution, +the role table defined in [`capabilities.md`](capabilities.md) +permits cross-workspace operation only to the `admin` role; the +`reader` and `writer` roles are constrained to a single assigned +workspace per credential. Other regimes can model the relationship +between identity and workspace differently — the gateway makes no +assumption. A workspace owns: - Source documents @@ -279,9 +287,18 @@ A typical workflow: The current codebase uses a `user` field in message metadata and storage partition keys to identify the workspace. The `collection` field -identifies the collection within that workspace. The IAM spec describes -how the gateway maps authenticated credentials to a workspace identity -and sets these fields. +identifies the collection within that workspace. + +The gateway is the single point at which workspace gets stamped onto +outbound pub/sub messages. An incoming credential authenticates to a +workspace (the credential's binding, not a user-to-workspace lookup — +see [`iam-contract.md`](iam-contract.md) and the *Identity surface* +section of [`iam.md`](iam.md)); any caller-supplied workspace on the +request is reconciled against the authenticated identity by the IAM +regime; the resolved value is what the gateway writes into outgoing +messages and the storage layers' partition keys. Backend services +trust the workspace they receive — defense-in-depth happens at the +gateway, not at the bus. For details on how each storage backend implements this scoping, see: @@ -302,7 +319,10 @@ For details on how each storage backend implements this scoping, see: ## References -- [Identity and Access Management](iam.md) +- [IAM Contract](iam-contract.md) — gateway↔IAM regime abstraction. +- [Identity and Access Management](iam.md) — gateway-side framing. +- [Capability Vocabulary](capabilities.md) — capability strings and + the OSS role bundles that decide cross-workspace eligibility. - [Collection Management](collection-management.md) - [Entity-Centric Graph](entity-centric-graph.md) - [Neo4j User Collection Isolation](neo4j-user-collection-isolation.md) diff --git a/docs/tech-specs/flow-class-definition.md b/docs/tech-specs/flow-blueprint-definition.md similarity index 100% rename from docs/tech-specs/flow-class-definition.md rename to docs/tech-specs/flow-blueprint-definition.md diff --git a/docs/tech-specs/iam-contract.md b/docs/tech-specs/iam-contract.md new file mode 100644 index 00000000..3289add1 --- /dev/null +++ b/docs/tech-specs/iam-contract.md @@ -0,0 +1,366 @@ +--- +layout: default +title: "IAM Contract Technical Specification" +parent: "Tech Specs" +--- + +# IAM Contract Technical Specification + +## Overview + +The IAM contract is the abstraction between the API gateway and any +identity / access management regime that fronts it. The gateway +treats IAM as a black box behind two operations — *authenticate* and +*authorise* — plus a small surface of management operations. No +regime-specific concept (roles, scopes, groups, claims, policy +languages) is visible to the gateway, and no gateway-specific +concept (capability vocabulary, request anatomy) is visible to +backend services. + +The TrustGraph open-source distribution ships one IAM regime — a +role-based implementation defined in +[`iam-protocol.md`](iam-protocol.md) — that is one implementation of +this contract. Enterprise editions can replace it with a different +regime (OIDC / SSO, ABAC, ReBAC, external policy engine) without +changing the gateway, the wire protocol, or the backends. + +## Motivation + +Authorisation models vary by deployment. A small team might be +happy with three predefined roles; an enterprise might need group- +mapping from an upstream IdP, attribute-based policies, or +relationship-based access control. Hard-wiring any one of those +into the gateway forces every other regime to either compromise its +model or be re-implemented. + +A narrow contract — "authenticate this credential" and "may this +identity perform this operation on this resource" — captures what +the gateway actually needs to know without committing to a policy +shape. The IAM regime owns the policy decision; the gateway is a +generic enforcement point. + +## Operations + +### `authenticate` + +``` +authenticate(credential: bytes) → Identity | AuthFailure +``` + +Validates a credential the client presented. The gateway treats +the credential as opaque bytes — for the OSS regime today that's +either an API key plaintext or a JWT, but the gateway does not +parse them; the IAM regime decides. + +On success, returns an `Identity`. On any failure the IAM regime +returns the same opaque `AuthFailure` — never a description of which +condition failed. This is the spec's masked-error rule: an +attacker probing the endpoint cannot distinguish "no such key", +"expired", "wrong signature", "revoked", "user disabled", etc. + +### `authorise` + +``` +authorise(identity: Identity, + capability: str, + resource: Resource, + parameters: dict) + → Decision +``` + +Asks whether the identity is permitted to perform the named +capability on the named resource, given the operation's +parameters. Returns `allow` or `deny`. `identity` is whatever +`authenticate` returned for this caller; the gateway never +decomposes it. + +The four arguments separate concerns: + +- **`identity`** — who is asking. +- **`capability`** — what permission they are exercising (e.g. + `users:write`, `graph:read`). Permission, not structure. +- **`resource`** — what is being operated on, as a structured + identifier. See *The Resource model* below. +- **`parameters`** — operation-specific data that the regime may + need to consider beyond the resource identifier. Used when a + decision depends on attributes the request supplies — e.g. an + admin scoped to one workspace creating a user *with workspace + association W*: the resource is the system-level user registry, + and W is a parameter the regime checks against the admin's + scope. + +Different regimes use the four arguments differently — the OSS +regime checks role bundles against the capability and the role's +workspace scope against parameters; an SSO regime might consult an +upstream IdP's group memberships; an ABAC regime evaluates a +policy with all four as inputs. The contract is unchanged. + +### `authorise_many` + +``` +authorise_many(identity: Identity, + checks: list[(str, Resource, dict)]) + → list[Decision] +``` + +Bulk variant of `authorise`. Same semantics, one round-trip for +many decisions. Used when an operation fans out to multiple +resources (e.g. an agent that touches several workspaces) and a +single permission check isn't sufficient. + +`authorise_many` is not just a performance optimisation; it pins +the contract for fan-out operations early, before clients (or +internal callers) build patterns that assume one-permission-check- +per-request. Regimes implement it as a loop over `authorise` +unless they have a more efficient path. + +### Management operations + +Beyond the request-time `authenticate` / `authorise`, the contract +also covers identity-lifecycle and credential-lifecycle operations +that are invoked by administrative requests rather than by the +authentication path. These are regime-specific in detail (an SSO +regime that delegates user management to the IdP may not implement +most of them) but the operation set the gateway can forward is: + +- User management: `create-user`, `list-users`, `get-user`, + `update-user`, `disable-user`, `enable-user`, `delete-user` +- Credential management: `create-api-key`, `list-api-keys`, + `revoke-api-key`, `change-password`, `reset-password` +- Workspace management: `create-workspace`, `list-workspaces`, + `get-workspace`, `update-workspace`, `disable-workspace` +- Session management: `login` +- Key management: `get-signing-key-public`, `rotate-signing-key` +- Bootstrap: `bootstrap` + +A regime that does not support one of these (e.g. an SSO regime +where users are managed in the IdP) returns a defined "not +supported" error; the gateway surfaces it as a 501. + +## The `Identity` surface + +`Identity` is *mostly* opaque. The gateway holds the value as a +token to quote back when calling `authorise`, never decomposing it. +But there are a few gateway-side concerns that need a small +surface: + +| Field | Purpose | +|---|---| +| `handle` | Opaque reference passed back to `authorise`. Regime-defined; gateway treats as a string. | +| `workspace` | The workspace this credential authenticates to. Used by the gateway only as a default-fill-in for operations that omit a workspace. Never used as policy input — when authorisation needs to know which workspace the operation acts on, the operation places it in the resource address (or a parameter), and the regime decides. | +| `principal_id` | Stable identifier the gateway logs for audit (a user id, a sub claim, a service account id). Never used for authorisation — that's `authorise`'s job. | +| `source` | How the credential was presented (`api-key`, `jwt`, …). Non-policy; useful for logs and metrics only. | + +Anything else — roles, claims, group memberships, policy attributes +— stays inside the regime and is reachable only via `authorise`. + +## The `Resource` model + +A `Resource` is a structured value identifying *what is being +operated on*. Resources live at one of three levels in TrustGraph, +based on where the resource exists in the deployment: + +### Resource levels + +| Level | What lives there | Resource shape | +|---|---|---| +| **System** | The user registry, the workspace registry, the signing key, the audit log — anything that exists once per deployment. | `{}` | +| **Workspace** | A workspace's config, flow definitions, library (documents), knowledge cores, collections — things that exist *within* a workspace. | `{workspace: "..."}` | +| **Flow** | A flow's knowledge graph, agent state, LLM context, embedding state, MCP context — things that exist *within* a flow within a workspace. | `{workspace: "...", flow: "..."}` | + +Note carefully: + +- **Users are a system-level resource.** A user record exists at + the deployment level; the fact that a user has a *workspace + association* (one in OSS, possibly many in other regimes) is a + property of the user record, not a containment. Operations on + the user registry have `resource = {}`; the workspace + association appears as a *parameter*, not as a resource address + component. +- **Workspaces themselves are a system-level resource.** The + workspace registry exists at the deployment level. `create- + workspace` and `list-workspaces` are system-level operations; + the workspace identifier in their bodies is a parameter, not an + address. +- **A workspace's contents are workspace-level resources.** A + workspace's config, flows, library, etc. live within a + workspace. Their resource address is `{workspace: ...}`. +- **A flow's contents are flow-level resources.** Knowledge + graphs, agents, etc. live within a flow. Their resource + address is `{workspace: ..., flow: ...}`. + +### Component vocabulary + +| Component | Type | Meaning | Used by | +|---|---|---|---| +| `workspace` | string | Identifier of the workspace whose contents are being operated on | workspace-level and flow-level resource addresses | +| `flow` | string | Identifier of a flow within a workspace; always paired with `workspace` | flow-level resource addresses | +| `collection` | string | Reserved for finer-grained scoping within a workspace | future / enterprise | +| `document` | string | Reserved for per-document scoping | future / enterprise | + +A `Resource` is a partial mapping of these components to values. +The level of the resource (system / workspace / flow) determines +which components must be present. An empty `{}` is the +system-level resource. + +### Workspace as parameter vs. address + +Workspace plays two distinct roles in operations and shows up in +two distinct places: + +- **As a resource address component** — workspace identifies the + thing being operated on. Lives in `resource.workspace`. Example: + `config:read` reads the config *of* workspace W. +- **As an operation parameter** — workspace is data the operation + acts on or filters by, while the resource itself is system-level. + Lives in `parameters.workspace`. Example: `users:write` + creates a user *with workspace association* W; the resource is + the user registry (system), and W is a parameter. + +These are not interchangeable. The IAM regime considers each role +separately; the OSS role table, for instance, applies workspace- +scope to the address component when checking workspace-level +operations, and to a parameter when checking +"create-user-with-workspace-W". Both end up enforcing the admin's +scope, but through different code paths. + +### Extension rules + +The vocabulary is closed but extensible. Adding a new component: + +1. The component is added to the vocabulary in this spec, with a + defined name, type, and meaning. +2. Existing IAM regimes ignore unknown components (forward + compatibility — adding a new component does not break older + regimes that don't understand it). +3. Older gateways that don't populate a new component leave it + unset; regimes that need it for a decision treat "unset" as + "absent" and decide accordingly (typically: cannot grant + permission scoped to a component the gateway didn't supply). + +A regime that wants stricter behaviour (e.g. fail-closed on +unknown components rather than ignoring them) declares so as part +of its own configuration; the contract default is "ignore unknown". + +## Operation registry (gateway-side) + +Mapping a request onto `(capability, resource, parameters)` is +service-specific — it cannot be inferred from the capability +alone. The gateway maintains an **operation registry** that +declares, per operation: + +- The required capability. +- The resource level (system / workspace / flow) — determines the + shape of the resource identifier. +- How to extract the resource address components (workspace, + flow) from the request — from URL path, WebSocket envelope, or + body. +- Which body fields are operation parameters (and which of those + the IAM regime should see in the `parameters` argument). + +This registry is part of the gateway's endpoint declarations, not +part of the IAM contract. The contract specifies what arguments +`authorise` receives; how the gateway populates them is its own +concern. + +In the OSS gateway, registry keys follow these conventions: + +| Pattern | Used by | Resource level | +|---|---|---| +| bare op name (`create-user`, `list-users`, `login`, …) | `/api/v1/iam` and the auth surface | system / workspace, per op | +| `:` (`config:get`, `flow:list-blueprints`, `librarian:add-document`, …) | `/api/v1/{kind}` (workspace-scoped global services) | workspace | +| `flow-service:` (`flow-service:agent`, `flow-service:graph-rag`, …) | `/api/v1/flow/{flow}/service/{kind}` and the WS Mux | flow | +| `flow-import:` / `flow-export:` | `/api/v1/flow/{flow}/{import,export}/{kind}` streaming sockets | flow | + +Keys are an OSS-gateway implementation detail — the contract does +not constrain naming. The conventions above exist so the registry +key is uniquely derivable from the request path and (where +applicable) body without ambiguity. + +## Caching + +Both `authenticate` and `authorise` results are cached at the +gateway, on different policies: + +- **`authenticate`** — cached by a hash of the credential. The OSS + gateway uses a fixed short TTL (currently 60 s) so that revoked + API keys and disabled users stop working within the TTL window + without any push mechanism. Regimes that want a different + behaviour can return an `expires` hint with the identity; the + gateway honours the smaller of `expires` and its own ceiling. + +- **`authorise`** — cached by a hash of `(handle, capability, + resource, parameters)`. The regime returns a suggested TTL with + the decision; the gateway clamps it above by a deployment-set + ceiling (currently 60 s). Both allow and deny decisions are + cached; denies briefly, to avoid hammering the regime with + repeated rejected attempts. + +The TTL ceiling caps the revocation latency window — a role +revoked at the regime takes effect at the gateway no later than +the ceiling. Operators that need stricter revocation can lower +the ceiling. + +## Failure modes + +| Condition | Behaviour | +|---|---| +| `authenticate` returns AuthFailure | Gateway responds 401 with the masked `auth failure` body. | +| `authorise` returns deny | Gateway responds 403 with the masked `access denied` body. | +| IAM regime unreachable | Gateway responds 401 / 503 (deployment-defined). No fail-open. | +| `authorise_many` partial deny | Gateway treats the request as denied; the operation is rejected. Partial-success semantics are not part of the contract. | +| Regime returns "not supported" for a management operation | Gateway responds 501. | + +There is no fallback or "soft" decision path. An IAM regime that +is unavailable, slow, or returning errors causes requests to fail +closed. + +## Implementations + +### Open-source role-based regime + +Defined in [`iam-protocol.md`](iam-protocol.md). Implements the +contract via: + +- A pub/sub request/response service (`iam-svc`) reached only by + the gateway over the message bus. +- Credentials are API keys (opaque) or JWTs (Ed25519, locally + validated by the gateway against the regime's published public + key). +- `authorise` reduces to a role-and-workspace-scope check against + the role table defined in [`capabilities.md`](capabilities.md). +- Identity, user, and workspace records live in Cassandra. + +The OSS regime is deliberately simple — three roles, single +home-workspace per user (a regime data-model decision, not a +contract assertion), no policy language. + +### Future regimes + +The contract is shaped to admit, without code change in the +gateway: + +- **OIDC / SSO** — `authenticate` validates an OIDC ID token via + the IdP's JWKS; `Identity.handle` carries the verified subject + and group claims; `authorise` evaluates against group-to- + capability mappings configured at the regime. +- **ABAC / Policy engine** — `authorise` calls out to a policy + engine (Rego, Cedar, custom DSL) with the identity's attributes + and the resource as the policy input. +- **ReBAC (Zanzibar-style)** — `authorise` translates `(identity, + capability, resource)` into a relationship-tuple lookup against + a tuple store. +- **Hybrid** — multiple regimes composed: e.g. authenticate via + SSO, authorise via local policy. + +None of these require gateway changes. The contract surface is +the same; the regime is what differs. + +## References + +- [Identity and Access Management Specification](iam.md) — overall + design and the gateway-side framing. +- [IAM Service Protocol Specification](iam-protocol.md) — the OSS + regime's wire-level protocol. +- [Capability Vocabulary Specification](capabilities.md) — the + capability strings the gateway uses as `authorise` input. diff --git a/docs/tech-specs/iam-protocol.md b/docs/tech-specs/iam-protocol.md index 8049ebfe..603d1c06 100644 --- a/docs/tech-specs/iam-protocol.md +++ b/docs/tech-specs/iam-protocol.md @@ -8,21 +8,41 @@ parent: "Tech Specs" ## Overview -The IAM service is a backend processor, reached over the standard -request/response pub/sub pattern. It is the authority for users, -workspaces, API keys, and login credentials. The API gateway -delegates to it for authentication resolution and for all user / -workspace / key management. +This document specifies the wire protocol of the **open-source IAM +regime** — one implementation of the abstract IAM contract defined +in [`iam-contract.md`](iam-contract.md). Other regimes (OIDC / SSO, +ABAC, ReBAC, external policy engines) implement the same contract +with different transports, data models, and policy semantics; the +gateway is unaware of which regime it's wired against. -This document defines the wire protocol: the `IamRequest` and -`IamResponse` dataclasses, the operation set, the per-operation -input and output fields, the error taxonomy, and the initial HTTP -forwarding endpoint used while IAM is being integrated into the -gateway. +The OSS regime is a backend processor (`iam-svc`) reached over the +standard request/response pub/sub pattern. It owns users, +workspaces, API keys, login credentials, and JWT signing keys, all +backed by Cassandra. The API gateway is its only caller. -Architectural context — roles, capabilities, workspace scoping, -enforcement boundary — lives in [`iam.md`](iam.md) and -[`capabilities.md`](capabilities.md). +This document defines: + +- the `IamRequest` and `IamResponse` dataclasses on the bus, +- the operation set the OSS regime implements, +- per-operation input and output fields, +- the error taxonomy, +- the bootstrap modes, +- the initial HTTP forwarding endpoint used while the protocol is + being exercised. + +The mapping from this regime onto the abstract contract is direct: + +| Contract operation | OSS regime operation | +|---|---| +| `authenticate(credential)` | `resolve-api-key` (for API keys); local JWT validation against `get-signing-key-public` (for JWTs) | +| `authorise(identity, capability, resource, parameters)` | Role-table lookup against the OSS role bundles defined in [`capabilities.md`](capabilities.md), gated by workspace scope. Workspace can come from the resource address (workspace- and flow-level resources) or from a parameter (system-level resources whose parameters reference a workspace, e.g. `create-user with workspace association W`). | +| `authorise_many` | Loop over `authorise` | +| Identity / credential / workspace management | `create-user`, `create-api-key`, etc. as listed below. These are operations on system-level resources (the user / workspace / credential registries); workspace, where it appears in the body, is a parameter. | + +Architectural context — roles, capabilities, workspace as resource +scope, enforcement boundary — lives in [`iam.md`](iam.md) and +[`capabilities.md`](capabilities.md). The contract abstraction +lives in [`iam-contract.md`](iam-contract.md). ## Transport @@ -345,5 +365,7 @@ lands in the subsequent middleware work. ## References +- [IAM Contract Specification](iam-contract.md) — the abstract + gateway↔IAM regime contract this protocol implements. - [Identity and Access Management Specification](iam.md) - [Capability Vocabulary Specification](capabilities.md) diff --git a/docs/tech-specs/iam.md b/docs/tech-specs/iam.md index 50b64444..a764535e 100644 --- a/docs/tech-specs/iam.md +++ b/docs/tech-specs/iam.md @@ -199,9 +199,9 @@ The server rejects all non-auth messages until authentication succeeds. The socket remains open on auth failure, allowing the client to retry with a different token without reconnecting. The client can also send a new auth message at any time to re-authenticate — for example, to -refresh an expiring JWT or to switch workspace. The -resolved identity (user, workspace, roles) is updated on each -successful auth. +refresh an expiring JWT or to switch workspace. The resolved +identity (handle, workspace, principal_id, source) is updated on +each successful auth. #### API keys @@ -219,7 +219,7 @@ For programmatic access: CLI tools, scripts, and integrations. On each request, the gateway resolves an API key by: 1. Hashing the token. -2. Checking a local cache (hash → user/workspace/roles). +2. Checking a local cache (hash → identity). 3. On cache miss, calling the IAM service to resolve. 4. Caching the result with a short TTL (e.g. 60 seconds). @@ -233,9 +233,15 @@ For interactive access via the UI or WebSocket connections. - A user logs in with username and password. The gateway forwards the request to the IAM service, which validates the credentials and returns a signed JWT. -- The JWT carries the user ID, workspace, and roles as claims. +- The JWT carries identity-binding claims only — user id (`sub`) + and the workspace this credential authenticates to. No roles, + no policy state. Per the IAM contract, all policy decisions go + through `authorise`; the gateway never reads roles or other + regime-internal state from the credential. - The gateway validates JWTs locally using the IAM service's public - signing key — no service call needed on subsequent requests. + signing key — no service call needed for the authentication step; + authorisation calls remain per-request (cached per the contract's + caching rules). - Token expiry is enforced by standard JWT validation at the time the request (or WebSocket connection) is made. - For long-lived WebSocket connections, the JWT is validated at connect @@ -285,35 +291,82 @@ authentication uses API keys or JWTs. On first start, the bootstrap process creates a default workspace and admin user with an initial API key. -### User identity +### Identity, credentials, and workspace binding -A user belongs to exactly one workspace. The design supports extending -this to multi-workspace access in the future (see -[Extension points](#extension-points)). +The gateway never asks "which workspace does *this user* belong to?". +That question forces every IAM regime to expose a user-to-workspace +mapping, which prevents regimes where the relationship is many-to-many +or doesn't exist (e.g. SSO with IdP-driven workspace selection). +Instead, the gateway asks "which workspace does *this credential* +authenticate to?" — a question every regime can answer in its own +terms. -A user record contains: +A credential (API key, JWT, OIDC token, etc.) is **bound to a +workspace at issue time**. The IAM regime decides what binding +means: + +- **OSS regime** — each user has a home workspace; credentials + issued to that user are bound to that workspace. A 1:1 + user-to-workspace constraint is an internal data-model decision, + not a contract assertion. +- **Multi-workspace regime** (future / enterprise) — a user with + access to several workspaces gets a different credential per + workspace. Each credential authenticates to exactly one + workspace; the relationship between user and workspace is a + regime-internal detail the gateway does not see. + +When the gateway authenticates a credential, the IAM regime returns +an `Identity` whose `workspace` is the workspace this credential is +for. That value — not "the user's workspace" — is what the gateway +uses for default-fill-in and as input to the IAM `authorise` call. + +#### Identity surface + +What the gateway holds after `authenticate`: + +| Field | Purpose | +|-------|---------| +| `handle` | Opaque token quoted back when calling `authorise`. Regime-defined. | +| `workspace` | The workspace this credential authenticates to. Used as the default if a request omits workspace. | +| `principal_id` | Stable identifier for audit logging (a user id, sub claim, service account id). Never used for authorisation. | +| `source` | How the credential was presented (`api-key`, `jwt`). Logged with audit events; not policy input. | + +Anything else — roles, claims, group memberships, policy attributes +— stays inside the regime and is reachable only via `authorise`. +See [`iam-contract.md`](iam-contract.md) for the full contract. + +#### OSS user record + +The OSS regime stores the following per user. These fields are +**OSS-implementation specifics**, not part of the contract. | Field | Type | Description | |-------|------|-------------| | `id` | string | Unique user identifier (UUID) | | `name` | string | Display name | | `email` | string | Email address (optional) | -| `workspace` | string | Workspace the user belongs to | +| `workspace` | string | Home workspace; default binding for issued credentials | | `roles` | list[string] | Assigned roles (e.g. `["reader"]`) | | `enabled` | bool | Whether the user can authenticate | | `created` | datetime | Account creation timestamp | -The `workspace` field maps to the existing `user` field in `Metadata`. -This means the storage-layer isolation (Cassandra, Neo4j, Qdrant -filtering by `user` + `collection`) works without changes — the gateway -sets the `user` metadata field to the authenticated user's workspace. +The `workspace` field on a user record is the **default binding** +used when issuing credentials, not a constraint visible to the +gateway. An enterprise regime may have no user records at all +(authentication delegated to an IdP). ### Workspaces -A workspace is an isolated data boundary. Users belong to a workspace, -and all data operations are scoped to it. Workspaces map to the existing -`user` field in `Metadata` and the corresponding Cassandra keyspace, -Qdrant collection prefix, and Neo4j property filters. +A workspace is an isolated data boundary — a tenancy scope in which +users, flows, configuration, documents, and knowledge graphs live. +Workspaces map to storage-layer isolation: the `user` field in +`Metadata`, the corresponding Cassandra keyspace, the Qdrant +collection prefix, the Neo4j property filter. + +Workspace is the most prominent component of an operation's +**resource scope**: when a request says "do X to Y", workspace is +part of "Y". Listing users, creating flows, querying the graph — +all of these target a specific workspace. | Field | Type | Description | |-------|------|-------------| @@ -322,57 +375,164 @@ Qdrant collection prefix, and Neo4j property filters. | `enabled` | bool | Whether the workspace is active | | `created` | datetime | Creation timestamp | -All data operations are scoped to a workspace. The gateway determines -the effective workspace for each request as follows: +#### Default-fill-in -1. If the request includes a `workspace` parameter, validate it against - the user's assigned workspace. - - If it matches, use it. - - If it does not match, return 403. (This could be extended to - check a workspace access grant list.) -2. If no `workspace` parameter is provided, use the user's assigned - workspace. +If a request omits workspace, the gateway fills it in from the +authenticated identity's bound workspace (`identity.workspace`) +before any IAM check runs. IAM never receives an unresolved +workspace; every `authorise` call sees a concrete value. -The gateway sets the `user` field in `Metadata` to the effective -workspace ID, replacing the caller-supplied `?user=` query parameter. +#### Authorisation -This design ensures forward compatibility. Clients that pass a -workspace parameter will work unchanged if multi-workspace support is -added later. Requests for an unassigned workspace get a clear 403 -rather than silent misbehaviour. +Whether the resolved workspace is permitted to be operated on by +this caller is an **IAM decision**, not a gateway one. The gateway +calls `authorise(identity, capability, {workspace: ..., ...})` and +relays the answer. In the OSS regime, the answer comes from the +caller's role × workspace-scope — see [`capabilities.md`](capabilities.md). +In other regimes it could come from group mappings, policies, +relationship tuples, or anything else the regime models. + +### Request anatomy + +The shape of a request — where workspace appears, where flow +appears, where parameters live — follows from **the level of the +resource being operated on**, not from any single property of the +request like its URL or its required capability. + +Resources live at one of three levels (see also the resource model +in [`iam-contract.md`](iam-contract.md)): + +| Resource level | Examples | Resource address | +|---|---|---| +| **System** | The user registry, the workspace registry, the IAM signing key, the audit log | empty `{}` | +| **Workspace** | A workspace's config, flow definitions, library, knowledge cores, collections | `{workspace: ...}` | +| **Flow** | A flow's knowledge graph, agent state, LLM context, embeddings, MCP context | `{workspace: ..., flow: ...}` | + +For the gateway-to-bus mapping this dictates **where workspace +lives in the message**, but only when workspace is part of the +*resource address*. Workspace can also appear as an *operation +parameter* on system-level resources (see below). + +#### Workspace as address vs. parameter + +Two distinct roles, two distinct locations: + +- **Workspace as address component.** Workspace identifies the + thing being operated on. Used for workspace-level and flow-level + resources. Lives in the addressing layer of the message — the + URL path for HTTP, or the WebSocket envelope alongside `flow` for + flow-scoped operations sent through the Mux. +- **Workspace as operation parameter.** Workspace is data the + operation acts on, while the resource itself is system-level. + Used for operations on the user registry (`create-user with + workspace association W`), the workspace registry (`create- + workspace W`), and other system-level operations that happen to + reference a workspace. Lives in the request body or inner WS + payload alongside the operation's other parameters. + +The two roles never coexist on the same operation. Either the +operation addresses something within a workspace (workspace is in +the address) or it operates on a system-level resource with +workspace as a parameter (workspace is in the body) or workspace +is irrelevant (system-level operations like `bootstrap`, +`rotate-signing-key`, `login` itself). + +#### Where workspace lives, by request type + +| Request type | Resource level | Workspace lives in | +|---|---|---| +| Flow-scoped data plane (`agent`, `graph-rag`, `llm`, `embeddings`, `mcp`, etc.) | Flow | Envelope alongside `flow` (WS) or URL path (HTTP) — part of the address | +| Workspace-scoped control plane (`config`, `library`, `knowledge`, `collection-management`, flow lifecycle) | Workspace | Body / inner request — part of the address | +| User registry ops (`create-user`, `list-users`, `disable-user`, etc.) | System | Body — as a *parameter* (the user's workspace association or a list filter) | +| Workspace registry ops (`create-workspace`, `list-workspaces`, etc.) | System | Body — as a *parameter* (the workspace identifier in `workspace_record`) | +| Credential ops (`create-api-key`, `revoke-api-key`, `change-password`, `reset-password`) | System | Body — as a *parameter* on ops that have one; absent on `change-password` (target is the caller's identity) | +| System ops (`bootstrap`, `login`, `rotate-signing-key`, `get-signing-key-public`) | System | Not present at all | + +The classification is deliberate. Users are a global concept that +*have* a workspace; they don't *live* in one. An OSS regime has +1:1 user-to-workspace; a multi-workspace regime maps a user to many +workspaces; an SSO regime might delegate workspace membership to an +IdP entirely. The gateway treats user-registry operations as +system-level so the contract is the same across regimes — the +workspace association is a parameter the regime interprets in its +own terms. + +#### HTTP + +HTTP routes by URL path, so the address lives in the URL. +Per-operation REST shape: + +- Flow-level: `POST /api/v1/workspaces/{w}/flows/{f}/services/{kind}` + — `workspace` and `flow` are URL components. +- Workspace-level: `POST /api/v1/workspaces/{w}/config`, + `/api/v1/workspaces/{w}/library`, etc. — `workspace` is a URL + component. +- System-level: `POST /api/v1/users`, `/api/v1/workspaces`, etc. — + no workspace in URL; if the operation references one, it's a + field in the body. + +`/api/v1/iam` is itself registry-driven: the body's `operation` +field is looked up against the registry to obtain the capability, +resource shape, and parameter shape per operation, rather than +gating the whole endpoint with a single coarse capability. + +#### WebSocket Mux + +The Mux envelope is the addressing layer for flow-scoped +operations. For workspace-level and system-level operations the +envelope routes by `service` only, and the inner request payload +carries the address components or parameters as appropriate. See +[`iam-contract.md`](iam-contract.md) for the operation-registry +mechanism the Mux uses to know which fields to read. ### Roles and access control -Three roles with fixed permissions: +Roles are an OSS-regime concept and live entirely in the IAM +service. The gateway does not enumerate or check them — it asks +`authorise(identity, capability, resource, parameters)` per +request and the regime maps the caller's roles to a decision. -| Role | Data operations | Admin operations | System | -|------|----------------|-----------------|--------| -| `reader` | Query knowledge graph, embeddings, RAG | None | None | -| `writer` | All reader operations + load documents, manage collections | None | None | -| `admin` | All writer operations | Config, flows, collection management, user management | Metrics | +The OSS regime ships three roles: -Role checks happen at the gateway before dispatching to backend -services. Each endpoint declares the minimum role required: +| Role | Capabilities granted | +|------|----------------------| +| `reader` | Read capabilities on data and config (`graph:read`, `documents:read`, `rows:read`, `config:read`, `flows:read`, `knowledge:read`, `collections:read`, `keys:self`, plus the per-service caps `agent`, `llm`, `embeddings`, `mcp`). | +| `writer` | All reader capabilities, plus `graph:write`, `documents:write`, `rows:write`, `knowledge:write`, `collections:write`. | +| `admin` | All writer capabilities, plus `config:write`, `flows:write`, `users:read`, `users:write`, `users:admin`, `keys:admin`, `workspaces:admin`, `iam:admin`, `metrics:read`. | -| Endpoint pattern | Minimum role | -|-----------------|--------------| -| `GET /api/v1/socket` (queries) | `reader` | -| `POST /api/v1/librarian` | `writer` | -| `POST /api/v1/flow/*/import/*` | `writer` | -| `POST /api/v1/config` | `admin` | -| `GET /api/v1/flow/*` | `admin` | -| `GET /api/metrics` | `admin` | +Workspace scope: `reader` and `writer` are active only in the +caller's bound workspace; `admin` is active across all workspaces. -Roles are hierarchical: `admin` implies `writer`, which implies -`reader`. +The gateway gates each endpoint by *capability*, not by role. +Capabilities are declared per operation in the gateway's operation +registry; see [`iam-contract.md`](iam-contract.md) for the +registry mechanism and [`capabilities.md`](capabilities.md) for +the capability vocabulary. ### IAM service -The IAM service is a new backend service that manages all identity and -access data. It is the authority for users, workspaces, API keys, and -credentials. The gateway delegates to it. +The IAM service is a backend service that implements the +[IAM contract](iam-contract.md) — `authenticate`, `authorise`, and +the management operations the gateway forwards. It is the +authority for identity, credential validation, and access decisions. +The gateway treats it as a black box behind the contract; nothing +in the gateway is regime-specific. -#### Data model +The OSS distribution ships one IAM regime: a role-based service +backed by Cassandra, described in +[`iam-protocol.md`](iam-protocol.md). Enterprise / future regimes +can replace this implementation without changing the gateway, the +wire protocol between gateway and backends, or the capability +vocabulary — see the contract spec for the abstraction the gateway +is wired against and the implementation notes for what other +regimes look like. + +#### OSS data model + +The OSS regime stores users, workspaces, API keys, and signing +keys in Cassandra. This is an **OSS regime implementation +detail**; it is not part of the contract. Other regimes will have +different (or no) data models. ``` iam_workspaces ( @@ -456,42 +616,53 @@ surface — e.g. `"missing required field 'workspace'"` or ### Gateway changes -The current `Authenticator` class is replaced with a thin authentication -middleware that delegates to the IAM service: +The current `Authenticator` class is replaced with a thin +authentication+authorisation middleware that delegates to the IAM +service per the IAM contract. The gateway performs no role check +itself — authorisation is asked of the regime via `authorise`. For HTTP requests: 1. Extract Bearer token from the `Authorization` header. 2. If the token has JWT format (dotted structure): - Validate signature locally using the cached public key. - - Extract user ID, workspace, and roles from claims. + - Build an `Identity` from `sub` and `workspace` claims (no + other claims are consulted). 3. Otherwise, treat as an API key: - Hash the token and check the local cache. - - On cache miss, call the IAM service to resolve. - - Cache the result (user/workspace/roles) with a short TTL. + - On cache miss, call the IAM service to resolve to an + `Identity` (handle, workspace, principal_id, source). + - Cache the result with a short TTL. 4. If neither succeeds, return 401. -5. If the user or workspace is disabled, return 403. -6. Check the user's role against the endpoint's minimum role. If - insufficient, return 403. -7. Resolve the effective workspace: - - If the request includes a `workspace` parameter, validate it - against the user's assigned workspace. Return 403 on mismatch. - - If no `workspace` parameter, use the user's assigned workspace. -8. Set the `user` field in the request context to the effective - workspace ID. This propagates through `Metadata` to all downstream - services. +5. Look up the operation in the gateway's operation registry to get + `(capability, resource_level, extractors)`. Build the resource + address (system / workspace / flow level) and parameters from + the request. +6. Default-fill the workspace into the body when the operation is + workspace- or flow-level (so downstream code sees a single + canonical address); the resource address keeps its supplied + value. +7. Call `authorise(identity, capability, resource, parameters)`. + On allow, forward the request; on deny, return 403. On regime + error, fail closed (401 / 503 per deployment). +8. Cache the decision per the contract's caching rules (clamped + above by a deployment-set ceiling). For WebSocket connections: 1. Accept the connection in an unauthenticated state. 2. Wait for an auth message (`{"type": "auth", "token": "..."}`). -3. Validate the token using the same logic as steps 2-7 above. +3. Validate the token using the same logic as steps 1-3 above. 4. On success, attach the resolved identity to the connection and send `{"type": "auth-ok", ...}`. 5. On failure, send `{"type": "auth-failed", ...}` but keep the socket open. 6. Reject all non-auth messages until authentication succeeds. 7. Accept new auth messages at any time to re-authenticate. +8. For each subsequent request frame, look up + `flow-service:` in the registry and call `authorise` + against the `{workspace, flow}` resource — same authority + gateway HTTP callers see, evaluated per-frame. ### CLI changes @@ -892,6 +1063,12 @@ service, not in the config service. Reasons: ## References +- [IAM Contract Specification](iam-contract.md) — the gateway↔IAM + regime abstraction this design is wired against. +- [IAM Service Protocol Specification](iam-protocol.md) — the OSS + regime's wire-level protocol. +- [Capability Vocabulary Specification](capabilities.md) — the + capability strings the gateway uses as `authorise` input. - [Data Ownership and Information Separation](data-ownership-model.md) - [MCP Tool Bearer Token Specification](mcp-tool-bearer-token.md) - [Multi-Tenant Support Specification](multi-tenant-support.md) diff --git a/tests/unit/test_gateway/test_auth.py b/tests/unit/test_gateway/test_auth.py index ba2b9bc2..26e93fd9 100644 --- a/tests/unit/test_gateway/test_auth.py +++ b/tests/unit/test_gateway/test_auth.py @@ -87,7 +87,6 @@ class TestVerifyJwtEddsa: priv, pub = make_keypair() claims = { "sub": "user-1", "workspace": "default", - "roles": ["reader"], "iat": int(time.time()), "exp": int(time.time()) + 60, } @@ -99,7 +98,7 @@ class TestVerifyJwtEddsa: def test_expired_jwt_rejected(self): priv, pub = make_keypair() claims = { - "sub": "user-1", "workspace": "default", "roles": [], + "sub": "user-1", "workspace": "default", "iat": int(time.time()) - 3600, "exp": int(time.time()) - 1, } @@ -111,7 +110,7 @@ class TestVerifyJwtEddsa: priv_a, _ = make_keypair() _, pub_b = make_keypair() claims = { - "sub": "user-1", "workspace": "default", "roles": [], + "sub": "user-1", "workspace": "default", "iat": int(time.time()), "exp": int(time.time()) + 60, } @@ -131,7 +130,7 @@ class TestVerifyJwtEddsa: # since we expect it to bail before verifying. header = {"alg": "HS256", "typ": "JWT", "kid": "x"} payload = { - "sub": "user-1", "workspace": "default", "roles": [], + "sub": "user-1", "workspace": "default", "iat": int(time.time()), "exp": int(time.time()) + 60, } h = _b64url(json.dumps(header, separators=(",", ":")).encode()) @@ -149,11 +148,12 @@ class TestIdentity: def test_fields(self): i = Identity( - user_id="u", workspace="w", roles=["reader"], source="api-key", + handle="u", workspace="w", + principal_id="u", source="api-key", ) - assert i.user_id == "u" + assert i.handle == "u" assert i.workspace == "w" - assert i.roles == ["reader"] + assert i.principal_id == "u" assert i.source == "api-key" @@ -194,7 +194,6 @@ class TestIamAuthDispatch: priv, pub = make_keypair() claims = { "sub": "user-1", "workspace": "default", - "roles": ["writer"], "iat": int(time.time()), "exp": int(time.time()) + 60, } @@ -206,9 +205,9 @@ class TestIamAuthDispatch: ident = await auth.authenticate( make_request(f"Bearer {token}") ) - assert ident.user_id == "user-1" + assert ident.handle == "user-1" assert ident.workspace == "default" - assert ident.roles == ["writer"] + assert ident.principal_id == "user-1" assert ident.source == "jwt" @pytest.mark.asyncio @@ -217,7 +216,7 @@ class TestIamAuthDispatch: # must not validate — even ones that would otherwise pass. priv, _ = make_keypair() claims = { - "sub": "user-1", "workspace": "default", "roles": [], + "sub": "user-1", "workspace": "default", "iat": int(time.time()), "exp": int(time.time()) + 60, } token = sign_jwt(priv, claims) @@ -232,6 +231,9 @@ class TestIamAuthDispatch: async def fake_resolve(api_key): assert api_key == "tg_testkey" + # Roles are returned by the regime as a hint but the + # gateway ignores them — kept here so the resolve + # protocol shape is exercised. return ("user-xyz", "default", ["admin"]) async def fake_with_client(op): @@ -241,9 +243,9 @@ class TestIamAuthDispatch: ident = await auth.authenticate( make_request("Bearer tg_testkey") ) - assert ident.user_id == "user-xyz" + assert ident.handle == "user-xyz" assert ident.workspace == "default" - assert ident.roles == ["admin"] + assert ident.principal_id == "user-xyz" assert ident.source == "api-key" @pytest.mark.asyncio @@ -301,8 +303,8 @@ class TestApiKeyCache: a = await auth.authenticate(make_request("Bearer tg_a")) b = await auth.authenticate(make_request("Bearer tg_b")) - assert a.user_id == "u-tg_a" - assert b.user_id == "u-tg_b" + assert a.handle == "u-tg_a" + assert b.handle == "u-tg_b" assert seen == ["tg_a", "tg_b"] @pytest.mark.asyncio @@ -310,3 +312,136 @@ class TestApiKeyCache: # Not a behaviour test — just ensures we don't accidentally # set TTL to 0 (which would defeat the cache) or to a week. assert 10 <= API_KEY_CACHE_TTL <= 3600 + + +# -- IamAuth.authorise ----------------------------------------------------- + + +class TestAuthorise: + """``authorise()`` is the gateway's only authorisation entry + point under the IAM contract. It calls iam-svc, caches the + decision for the regime's TTL (clamped above), and raises 403 + on deny / 401 on regime error (fail closed).""" + + def _make_identity(self, handle="u-1", workspace="default"): + return Identity( + handle=handle, workspace=workspace, + principal_id=handle, source="api-key", + ) + + @pytest.mark.asyncio + async def test_allow_returns_no_exception(self): + auth = IamAuth(backend=Mock()) + + async def fake_with_client(op): + return await op(Mock( + authorise=AsyncMock(return_value=(True, 30)), + )) + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + await auth.authorise( + self._make_identity(), + "graph:read", + {"workspace": "default"}, + {}, + ) + + @pytest.mark.asyncio + async def test_deny_raises_403(self): + auth = IamAuth(backend=Mock()) + + async def fake_with_client(op): + return await op(Mock( + authorise=AsyncMock(return_value=(False, 30)), + )) + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + with pytest.raises(web.HTTPForbidden): + await auth.authorise( + self._make_identity(), + "users:admin", + {}, + {"workspace": "acme"}, + ) + + @pytest.mark.asyncio + async def test_regime_error_fails_closed_as_401(self): + # If iam-svc errors, the gateway must NOT silently allow. + auth = IamAuth(backend=Mock()) + + async def fake_with_client(op): + raise RuntimeError("iam-svc down") + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + with pytest.raises(web.HTTPUnauthorized): + await auth.authorise( + self._make_identity(), + "graph:read", + {"workspace": "default"}, + {}, + ) + + @pytest.mark.asyncio + async def test_allow_decision_is_cached(self): + auth = IamAuth(backend=Mock()) + calls = {"n": 0} + + async def fake_with_client(op): + calls["n"] += 1 + return await op(Mock( + authorise=AsyncMock(return_value=(True, 30)), + )) + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + ident = self._make_identity() + for _ in range(5): + await auth.authorise( + ident, "graph:read", {"workspace": "default"}, {}, + ) + + assert calls["n"] == 1 + + @pytest.mark.asyncio + async def test_deny_decision_is_cached(self): + auth = IamAuth(backend=Mock()) + calls = {"n": 0} + + async def fake_with_client(op): + calls["n"] += 1 + return await op(Mock( + authorise=AsyncMock(return_value=(False, 30)), + )) + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + ident = self._make_identity() + for _ in range(5): + with pytest.raises(web.HTTPForbidden): + await auth.authorise( + ident, "users:admin", {}, {"workspace": "acme"}, + ) + + # Denies are cached too — repeated attempts don't re-hit IAM. + assert calls["n"] == 1 + + @pytest.mark.asyncio + async def test_different_resources_cached_separately(self): + auth = IamAuth(backend=Mock()) + calls = {"n": 0} + + async def fake_with_client(op): + calls["n"] += 1 + return await op(Mock( + authorise=AsyncMock(return_value=(True, 30)), + )) + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + ident = self._make_identity() + await auth.authorise( + ident, "graph:read", {"workspace": "a"}, {}, + ) + await auth.authorise( + ident, "graph:read", {"workspace": "b"}, {}, + ) + + # Different resource → different cache key → two IAM calls. + assert calls["n"] == 2 diff --git a/tests/unit/test_gateway/test_capabilities.py b/tests/unit/test_gateway/test_capabilities.py index 063e9ea4..102e381e 100644 --- a/tests/unit/test_gateway/test_capabilities.py +++ b/tests/unit/test_gateway/test_capabilities.py @@ -1,15 +1,22 @@ """ -Tests for gateway/capabilities.py — the capability + role + workspace -model that underpins all gateway authorisation. +Tests for gateway/capabilities.py — the thin authorisation surface +under the IAM contract. + +The gateway no longer holds policy state (roles, capability sets, +workspace scopes); those live in iam-svc. These tests cover only +what the gateway shim does itself: PUBLIC / AUTHENTICATED short- +circuiting, default-fill of workspace, and forwarding of capability +checks to ``auth.authorise``. """ import pytest from aiohttp import web +from unittest.mock import AsyncMock, MagicMock from trustgraph.gateway.capabilities import ( PUBLIC, AUTHENTICATED, - KNOWN_CAPABILITIES, ROLE_DEFINITIONS, - check, enforce_workspace, access_denied, auth_failure, + enforce, enforce_workspace, + access_denied, auth_failure, ) @@ -17,109 +24,74 @@ from trustgraph.gateway.capabilities import ( class _Identity: - """Minimal stand-in for auth.Identity — the capability module - accesses ``.workspace`` and ``.roles``.""" - def __init__(self, workspace, roles): - self.user_id = "user-1" + """Stand-in for auth.Identity — under the IAM contract it has + just ``handle``, ``workspace``, ``principal_id``, ``source``.""" + + def __init__(self, handle="user-1", workspace="default"): + self.handle = handle self.workspace = workspace - self.roles = list(roles) + self.principal_id = handle + self.source = "api-key" -def reader_in(ws): - return _Identity(ws, ["reader"]) +def _allow_auth(identity=None): + """Build an Auth double that authenticates to ``identity`` and + allows every authorise() call.""" + auth = MagicMock() + auth.authenticate = AsyncMock( + return_value=identity or _Identity(), + ) + auth.authorise = AsyncMock(return_value=None) + return auth -def writer_in(ws): - return _Identity(ws, ["writer"]) +def _deny_auth(identity=None): + """Build an Auth double that authenticates but denies authorise.""" + auth = MagicMock() + auth.authenticate = AsyncMock( + return_value=identity or _Identity(), + ) + auth.authorise = AsyncMock(side_effect=access_denied()) + return auth -def admin_in(ws): - return _Identity(ws, ["admin"]) +# -- enforce() ------------------------------------------------------------- -# -- role table sanity ----------------------------------------------------- +class TestEnforce: + @pytest.mark.asyncio + async def test_public_returns_none_no_auth(self): + auth = _allow_auth() + result = await enforce(MagicMock(), auth, PUBLIC) + assert result is None + auth.authenticate.assert_not_called() + auth.authorise.assert_not_called() -class TestRoleTable: + @pytest.mark.asyncio + async def test_authenticated_skips_authorise(self): + identity = _Identity() + auth = _allow_auth(identity) + result = await enforce(MagicMock(), auth, AUTHENTICATED) + assert result is identity + auth.authenticate.assert_awaited_once() + auth.authorise.assert_not_called() - def test_oss_roles_present(self): - assert set(ROLE_DEFINITIONS.keys()) == {"reader", "writer", "admin"} + @pytest.mark.asyncio + async def test_capability_calls_authorise_system_level(self): + identity = _Identity() + auth = _allow_auth(identity) + result = await enforce(MagicMock(), auth, "graph:read") + assert result is identity + auth.authorise.assert_awaited_once_with( + identity, "graph:read", {}, {}, + ) - def test_admin_is_cross_workspace(self): - assert ROLE_DEFINITIONS["admin"]["workspace_scope"] == "*" - - def test_reader_writer_are_assigned_scope(self): - assert ROLE_DEFINITIONS["reader"]["workspace_scope"] == "assigned" - assert ROLE_DEFINITIONS["writer"]["workspace_scope"] == "assigned" - - def test_admin_superset_of_writer(self): - admin = ROLE_DEFINITIONS["admin"]["capabilities"] - writer = ROLE_DEFINITIONS["writer"]["capabilities"] - assert writer.issubset(admin) - - def test_writer_superset_of_reader(self): - writer = ROLE_DEFINITIONS["writer"]["capabilities"] - reader = ROLE_DEFINITIONS["reader"]["capabilities"] - assert reader.issubset(writer) - - def test_admin_has_users_admin(self): - assert "users:admin" in ROLE_DEFINITIONS["admin"]["capabilities"] - - def test_writer_does_not_have_users_admin(self): - assert "users:admin" not in ROLE_DEFINITIONS["writer"]["capabilities"] - - def test_every_bundled_capability_is_known(self): - for role in ROLE_DEFINITIONS.values(): - for cap in role["capabilities"]: - assert cap in KNOWN_CAPABILITIES - - -# -- check() --------------------------------------------------------------- - - -class TestCheck: - - def test_reader_has_reader_cap_in_own_workspace(self): - assert check(reader_in("default"), "graph:read", "default") - - def test_reader_does_not_have_writer_cap(self): - assert not check(reader_in("default"), "graph:write", "default") - - def test_reader_cannot_act_in_other_workspace(self): - assert not check(reader_in("default"), "graph:read", "acme") - - def test_writer_has_write_in_own_workspace(self): - assert check(writer_in("default"), "graph:write", "default") - - def test_writer_cannot_act_in_other_workspace(self): - assert not check(writer_in("default"), "graph:write", "acme") - - def test_admin_has_everything_everywhere(self): - for cap in ("graph:read", "graph:write", "config:write", - "users:admin", "metrics:read"): - assert check(admin_in("default"), cap, "acme"), ( - f"admin should have {cap} in acme" - ) - - def test_admin_has_caps_without_explicit_workspace(self): - assert check(admin_in("default"), "users:admin") - - def test_default_target_is_identity_workspace(self): - # Reader with no target workspace → should check against own - assert check(reader_in("default"), "graph:read") - - def test_unknown_capability_returns_false(self): - assert not check(admin_in("default"), "nonsense:cap", "default") - - def test_unknown_role_contributes_nothing(self): - ident = _Identity("default", ["made-up-role"]) - assert not check(ident, "graph:read", "default") - - def test_multi_role_union(self): - # If a user is both reader and admin, they inherit admin's - # cross-workspace powers. - ident = _Identity("default", ["reader", "admin"]) - assert check(ident, "users:admin", "acme") + @pytest.mark.asyncio + async def test_capability_denied_raises_forbidden(self): + auth = _deny_auth() + with pytest.raises(web.HTTPForbidden): + await enforce(MagicMock(), auth, "users:admin") # -- enforce_workspace() --------------------------------------------------- @@ -127,56 +99,54 @@ class TestCheck: class TestEnforceWorkspace: - def test_reader_in_own_workspace_allowed(self): - data = {"workspace": "default", "operation": "x"} - enforce_workspace(data, reader_in("default")) - assert data["workspace"] == "default" - - def test_reader_no_workspace_injects_assigned(self): + @pytest.mark.asyncio + async def test_default_fills_from_identity(self): data = {"operation": "x"} - enforce_workspace(data, reader_in("default")) + auth = _allow_auth() + await enforce_workspace(data, _Identity(workspace="default"), auth) assert data["workspace"] == "default" - def test_reader_mismatched_workspace_denied(self): + @pytest.mark.asyncio + async def test_caller_supplied_workspace_kept(self): data = {"workspace": "acme", "operation": "x"} - with pytest.raises(web.HTTPForbidden): - enforce_workspace(data, reader_in("default")) - - def test_admin_can_target_any_workspace(self): - data = {"workspace": "acme", "operation": "x"} - enforce_workspace(data, admin_in("default")) + auth = _allow_auth() + await enforce_workspace(data, _Identity(workspace="default"), auth) assert data["workspace"] == "acme" - def test_admin_no_workspace_defaults_to_assigned(self): - data = {"operation": "x"} - enforce_workspace(data, admin_in("default")) - assert data["workspace"] == "default" - - def test_writer_same_workspace_specified_allowed(self): + @pytest.mark.asyncio + async def test_no_capability_skips_authorise(self): data = {"workspace": "default"} - enforce_workspace(data, writer_in("default")) - assert data["workspace"] == "default" + auth = _allow_auth() + await enforce_workspace(data, _Identity(), auth) + auth.authorise.assert_not_called() - def test_non_dict_passthrough(self): - # Non-dict bodies are returned unchanged (e.g. streaming). - result = enforce_workspace("not-a-dict", reader_in("default")) - assert result == "not-a-dict" + @pytest.mark.asyncio + async def test_capability_calls_authorise_with_resource(self): + data = {"workspace": "acme"} + identity = _Identity() + auth = _allow_auth(identity) + await enforce_workspace( + data, identity, auth, capability="graph:read", + ) + auth.authorise.assert_awaited_once_with( + identity, "graph:read", {"workspace": "acme"}, {}, + ) - def test_with_capability_tightens_check(self): - # Reader lacks graph:write; workspace-only check would pass - # (scope is fine), but combined check must reject. - data = {"workspace": "default"} + @pytest.mark.asyncio + async def test_capability_denied_propagates(self): + data = {"workspace": "acme"} + auth = _deny_auth() with pytest.raises(web.HTTPForbidden): - enforce_workspace( - data, reader_in("default"), capability="graph:write", + await enforce_workspace( + data, _Identity(), auth, capability="users:admin", ) - def test_with_capability_passes_when_granted(self): - data = {"workspace": "default"} - enforce_workspace( - data, reader_in("default"), capability="graph:read", - ) - assert data["workspace"] == "default" + @pytest.mark.asyncio + async def test_non_dict_passthrough(self): + auth = _allow_auth() + result = await enforce_workspace("not-a-dict", _Identity(), auth) + assert result == "not-a-dict" + auth.authorise.assert_not_called() # -- helpers --------------------------------------------------------------- @@ -199,5 +169,3 @@ class TestSentinels: def test_public_and_authenticated_are_distinct(self): assert PUBLIC != AUTHENTICATED - assert PUBLIC not in KNOWN_CAPABILITIES - assert AUTHENTICATED not in KNOWN_CAPABILITIES diff --git a/tests/unit/test_gateway/test_endpoint_manager.py b/tests/unit/test_gateway/test_endpoint_manager.py index cf12565c..8f659b71 100644 --- a/tests/unit/test_gateway/test_endpoint_manager.py +++ b/tests/unit/test_gateway/test_endpoint_manager.py @@ -73,14 +73,16 @@ class TestEndpointManager: prometheus_url="http://test:9090" ) - # Each dispatcher factory is invoked exactly once during - # construction — one per endpoint that needs a dedicated - # wire. dispatch_auth_iam is the dedicated factory for the - # AuthEndpoints forwarder (login / bootstrap / - # change-password), distinct from dispatch_global_service - # (the generic /api/v1/{kind} route). + # Each dispatcher factory is invoked once per endpoint that + # needs a dedicated wire. dispatch_auth_iam is shared by + # two endpoints — AuthEndpoints (login / bootstrap / + # change-password) and IamEndpoint (registry-driven + # /api/v1/iam) — so it's expected to be called twice. + # Both forwarders pin the dispatcher to kind=iam and reuse + # the same factory; they're distinct from + # dispatch_global_service (the generic /api/v1/{kind} route). mock_dispatcher_manager.dispatch_global_service.assert_called_once() - mock_dispatcher_manager.dispatch_auth_iam.assert_called_once() + assert mock_dispatcher_manager.dispatch_auth_iam.call_count == 2 mock_dispatcher_manager.dispatch_socket.assert_called_once() mock_dispatcher_manager.dispatch_flow_service.assert_called_once() mock_dispatcher_manager.dispatch_flow_import.assert_called_once() diff --git a/tests/unit/test_gateway/test_socket_graceful_shutdown.py b/tests/unit/test_gateway/test_socket_graceful_shutdown.py index 23f22d30..6c3e323b 100644 --- a/tests/unit/test_gateway/test_socket_graceful_shutdown.py +++ b/tests/unit/test_gateway/test_socket_graceful_shutdown.py @@ -25,11 +25,11 @@ from trustgraph.gateway.auth import Identity TEST_CAP = "graph:write" -def _valid_identity(roles=("admin",)): +def _valid_identity(): return Identity( - user_id="test-user", + handle="test-user", workspace="default", - roles=list(roles), + principal_id="test-user", source="api-key", ) @@ -37,11 +37,12 @@ def _valid_identity(roles=("admin",)): @pytest.fixture def mock_auth(): """Mock IAM-backed authenticator. Successful by default — - ``authenticate`` returns a valid admin identity. Tests that - need the auth failure path override the ``authenticate`` - attribute locally.""" + ``authenticate`` returns a valid identity and ``authorise`` + allows everything. Tests that need the failure paths override + the relevant attribute locally.""" auth = MagicMock() auth.authenticate = AsyncMock(return_value=_valid_identity()) + auth.authorise = AsyncMock(return_value=None) return auth @@ -135,6 +136,7 @@ async def test_handle_normal_flow(): """Valid bearer → handshake accepted, dispatcher created.""" mock_auth = MagicMock() mock_auth.authenticate = AsyncMock(return_value=_valid_identity()) + mock_auth.authorise = AsyncMock(return_value=None) dispatcher_created = False async def mock_dispatcher_factory(ws, running, match_info): @@ -192,6 +194,7 @@ async def test_handle_exception_group_cleanup(): """Test exception group triggers dispatcher cleanup.""" mock_auth = MagicMock() mock_auth.authenticate = AsyncMock(return_value=_valid_identity()) + mock_auth.authorise = AsyncMock(return_value=None) mock_dispatcher = AsyncMock() mock_dispatcher.destroy = AsyncMock() @@ -262,6 +265,7 @@ async def test_handle_dispatcher_cleanup_timeout(): """Test dispatcher cleanup with timeout.""" mock_auth = MagicMock() mock_auth.authenticate = AsyncMock(return_value=_valid_identity()) + mock_auth.authorise = AsyncMock(return_value=None) # Mock dispatcher that takes long to destroy mock_dispatcher = AsyncMock() @@ -388,6 +392,7 @@ async def test_handle_websocket_already_closed(): """Test handling when websocket is already closed.""" mock_auth = MagicMock() mock_auth.authenticate = AsyncMock(return_value=_valid_identity()) + mock_auth.authorise = AsyncMock(return_value=None) mock_dispatcher = AsyncMock() mock_dispatcher.destroy = AsyncMock() diff --git a/trustgraph-base/trustgraph/base/iam_client.py b/trustgraph-base/trustgraph/base/iam_client.py index 5cfda7c8..f90694fc 100644 --- a/trustgraph-base/trustgraph/base/iam_client.py +++ b/trustgraph-base/trustgraph/base/iam_client.py @@ -1,4 +1,6 @@ +import json + from . request_response_spec import RequestResponse, RequestResponseSpec from .. schema import ( IamRequest, IamResponse, @@ -44,7 +46,13 @@ class IamClient(RequestResponse): Returns ``(user_id, workspace, roles)`` or raises ``RuntimeError`` with error type ``auth-failed`` if the key is - unknown / expired / revoked.""" + unknown / expired / revoked. + + Note: the ``roles`` value is a regime-internal hint and is + not used by the gateway directly under the IAM contract; + all authorisation decisions go through ``authorise()``. + Returned here only for backward compatibility with callers + that haven't migrated.""" resp = await self._request( operation="resolve-api-key", api_key=api_key, @@ -56,6 +64,40 @@ class IamClient(RequestResponse): list(resp.resolved_roles), ) + async def authorise(self, identity_handle, capability, + resource, parameters, timeout=IAM_TIMEOUT): + """Ask the IAM regime whether ``identity_handle`` may perform + ``capability`` on ``resource`` given ``parameters``. + + Implements the contract ``authorise(identity, capability, + resource, parameters) → (decision, ttl)``. Returns a tuple + ``(allow: bool, ttl_seconds: int)``. The TTL is the + regime's suggested cache lifetime for this decision; the + gateway honours it (clamped above by gateway-side policy).""" + resp = await self._request( + operation="authorise", + user_id=identity_handle, + capability=capability, + resource_json=json.dumps(resource or {}, sort_keys=True), + parameters_json=json.dumps(parameters or {}, sort_keys=True), + timeout=timeout, + ) + return resp.decision_allow, resp.decision_ttl_seconds + + async def authorise_many(self, identity_handle, checks, + timeout=IAM_TIMEOUT): + """Bulk authorise. ``checks`` is a list of dicts each + carrying ``capability``, ``resource``, and ``parameters``. + Returns a list of ``(allow, ttl)`` tuples in the same order.""" + resp = await self._request( + operation="authorise-many", + user_id=identity_handle, + authorise_checks=json.dumps(list(checks), sort_keys=True), + timeout=timeout, + ) + decisions = json.loads(resp.decisions_json or "[]") + return [(d.get("allow", False), d.get("ttl", 0)) for d in decisions] + async def create_user(self, workspace, user, actor="", timeout=IAM_TIMEOUT): """Create a user. ``user`` is a ``UserInput``.""" diff --git a/trustgraph-base/trustgraph/schema/services/iam.py b/trustgraph-base/trustgraph/schema/services/iam.py index 1e3ab1ab..4b5685a5 100644 --- a/trustgraph-base/trustgraph/schema/services/iam.py +++ b/trustgraph-base/trustgraph/schema/services/iam.py @@ -99,6 +99,21 @@ class IamRequest: workspace_record: WorkspaceInput | None = None key: ApiKeyInput | None = None + # ---- authorise / authorise-many inputs ---- + # Capability string from the vocabulary in capabilities.md. + capability: str = "" + # Resource identifier as JSON. See the IAM contract spec for + # the resource-component vocabulary. An empty dict denotes a + # system-level resource. + resource_json: str = "" + # Operation parameters as JSON. Decision-relevant fields the + # operation supplied that are not part of the resource address + # (e.g. workspace association on create-user). + parameters_json: str = "" + # For authorise-many: a JSON-serialised list of + # {"capability": str, "resource": dict, "parameters": dict}. + authorise_checks: str = "" + @dataclass class IamResponse: @@ -133,6 +148,18 @@ class IamResponse: bootstrap_admin_user_id: str = "" bootstrap_admin_api_key: str = "" + # ---- authorise / authorise-many outputs ---- + # authorise: the regime's allow / deny verdict. + decision_allow: bool = False + # Cache TTL the regime suggests, in seconds. Gateway respects + # this for both allow and deny decisions; bounded above by + # gateway-side policy (typically <= 60s). + decision_ttl_seconds: int = 0 + # authorise-many: a JSON-serialised list of {"allow": bool, + # "ttl": int} in the same order as the request's + # authorise_checks. + decisions_json: str = "" + error: Error | None = None diff --git a/trustgraph-flow/trustgraph/gateway/auth.py b/trustgraph-flow/trustgraph/gateway/auth.py index 95743261..6abcbe15 100644 --- a/trustgraph-flow/trustgraph/gateway/auth.py +++ b/trustgraph-flow/trustgraph/gateway/auth.py @@ -1,15 +1,16 @@ """ -IAM-backed authentication for the API gateway. +IAM-backed authentication and authorisation for the API gateway. -Replaces the legacy GATEWAY_SECRET shared-token Authenticator. The -gateway is now stateless with respect to credentials: it either -verifies a JWT locally using the active IAM signing public key, or -resolves an API key by hash with a short local cache backed by the -IAM service. +The gateway delegates both authentication ("who is this caller?") +and authorisation ("may they do this?") to the IAM regime via the +contract specified in docs/tech-specs/iam-contract.md. No regime- +specific policy (roles, scopes, claims) lives in the gateway. -Identity returned by authenticate() is the (user_id, workspace, -roles) triple the rest of the gateway — capability checks, workspace -resolver, audit logging — needs. +- Authentication: API keys are resolved by IAM; JWTs are validated + locally against the cached signing public key. +- Authorisation: every per-request decision is asked of IAM via + ``authorise(identity, capability, resource, parameters)``, with + results cached for the TTL the regime returns. """ import asyncio @@ -19,7 +20,7 @@ import json import logging import time import uuid -from dataclasses import dataclass +from dataclasses import dataclass, field from aiohttp import web @@ -37,12 +38,34 @@ logger = logging.getLogger("auth") API_KEY_CACHE_TTL = 60 # seconds +# Upper bound on cache TTL the gateway honours for an authorisation +# decision, regardless of what the regime suggested. Caps the +# revocation latency window. +AUTHZ_CACHE_TTL_MAX = 60 # seconds + @dataclass class Identity: - user_id: str + """The gateway-side surface of an authenticated caller. + + Per the IAM contract this is a small fixed shape; regime-internal + state (roles, claims, group memberships) is reachable only via + the regime's ``authorise`` operation. The gateway itself never + reads policy from this object. + """ + # Opaque handle, quoted back when calling ``authorise``. For + # the OSS regime this is the user record's id; the gateway + # treats it as a string with no semantic content. + handle: str + # The workspace this credential authenticates to. Used by the + # gateway as the default-fill-in for operations that omit a + # workspace. Never used as policy input. workspace: str - roles: list + # Stable identifier for audit logs. In OSS this is the same + # value as ``handle``; not assumed equal in the contract. + principal_id: str + # How the credential was presented. Non-policy; useful for + # logs / metrics only. source: str # "api-key" | "jwt" @@ -111,6 +134,13 @@ class IamAuth: self._key_cache = {} self._key_cache_lock = asyncio.Lock() + # Authorisation decision cache: hash(handle, capability, + # resource, parameters) -> (allow_bool, expires_ts). Holds + # both allows and denies — denies cached briefly to avoid + # hammering iam-svc with repeated rejected attempts. + self._authz_cache: dict[str, tuple[bool, float]] = {} + self._authz_cache_lock = asyncio.Lock() + # ------------------------------------------------------------------ # Short-lived client helper. Mirrors the pattern used by the # bootstrap framework and AsyncProcessor: a fresh uuid suffix per @@ -221,12 +251,13 @@ class IamAuth: sub = claims.get("sub", "") ws = claims.get("workspace", "") - roles = list(claims.get("roles", [])) if not sub or not ws: raise _auth_failure() + # JWT carries no policy state under the IAM contract; + # any roles / claims field is ignored here. return Identity( - user_id=sub, workspace=ws, roles=roles, source="jwt", + handle=sub, workspace=ws, principal_id=sub, source="jwt", ) async def _resolve_api_key(self, plaintext): @@ -245,7 +276,10 @@ class IamAuth: try: async def _call(client): return await client.resolve_api_key(plaintext) - user_id, workspace, roles = await self._with_client(_call) + # ``roles`` is returned by the OSS regime as a hint + # but is not consulted by the gateway; all policy + # decisions go through ``authorise``. + user_id, workspace, _roles = await self._with_client(_call) except Exception as e: logger.debug( f"API key resolution failed: " @@ -257,8 +291,81 @@ class IamAuth: raise _auth_failure() identity = Identity( - user_id=user_id, workspace=workspace, - roles=list(roles), source="api-key", + handle=user_id, workspace=workspace, + principal_id=user_id, source="api-key", ) self._key_cache[h] = (identity, now + API_KEY_CACHE_TTL) return identity + + # ------------------------------------------------------------------ + # Authorisation + # ------------------------------------------------------------------ + + @staticmethod + def _authz_cache_key(handle, capability, resource, parameters): + payload = json.dumps( + { + "h": handle, + "c": capability, + "r": resource or {}, + "p": parameters or {}, + }, + sort_keys=True, + separators=(",", ":"), + ) + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + async def authorise(self, identity, capability, resource, parameters): + """Ask the IAM regime whether ``identity`` may perform + ``capability`` on ``resource`` given ``parameters``. + + Caches the decision for the regime's suggested TTL, clamped + above by ``AUTHZ_CACHE_TTL_MAX``. Both allow and deny + decisions are cached (denies briefly, to avoid hammering + iam-svc with repeated rejected attempts). + + Raises ``HTTPForbidden`` (403 / "access denied") on a deny + decision. Raises ``HTTPUnauthorized`` (401 / "auth failure") + if the IAM service errors out — failing closed.""" + + key = self._authz_cache_key( + identity.handle, capability, resource, parameters, + ) + now = time.time() + + cached = self._authz_cache.get(key) + if cached and cached[1] > now: + allow, _ = cached + if not allow: + raise _access_denied() + return + + async with self._authz_cache_lock: + cached = self._authz_cache.get(key) + if cached and cached[1] > now: + allow, _ = cached + if not allow: + raise _access_denied() + return + + try: + async def _call(client): + return await client.authorise( + identity.handle, capability, + resource or {}, parameters or {}, + ) + allow, ttl = await self._with_client(_call) + except Exception as e: + logger.warning( + f"authorise failed: {type(e).__name__}: {e}; " + f"failing closed for " + f"{identity.principal_id!r} cap={capability!r}" + ) + raise _auth_failure() + + ttl = max(0, min(int(ttl or 0), AUTHZ_CACHE_TTL_MAX)) + self._authz_cache[key] = (bool(allow), now + ttl) + + if not allow: + raise _access_denied() + return diff --git a/trustgraph-flow/trustgraph/gateway/capabilities.py b/trustgraph-flow/trustgraph/gateway/capabilities.py index 15e25684..72ca51c7 100644 --- a/trustgraph-flow/trustgraph/gateway/capabilities.py +++ b/trustgraph-flow/trustgraph/gateway/capabilities.py @@ -1,36 +1,23 @@ """ -Capability vocabulary, role definitions, and authorisation helpers. +Gateway-side authorisation entry points. -See docs/tech-specs/capabilities.md for the authoritative description. -The data here is the OSS bundle table in that spec. Enterprise -editions may replace this module with their own role table; the -vocabulary (capability strings) is shared. +Under the IAM contract (see docs/tech-specs/iam-contract.md) the +gateway holds *no* policy state. Roles, capability sets, and +workspace-scope rules all live in the IAM regime (iam-svc for OSS). +This module is the thin surface the gateway uses to ask the regime +for a decision: -Role model ----------- -A role has two dimensions: +- ``PUBLIC`` / ``AUTHENTICATED`` sentinels for endpoints that don't + go through capability-based authorisation. +- :func:`enforce` — authenticate-only, then ask the regime. +- :func:`enforce_workspace` — default-fill the workspace from the + caller's bound workspace and ask the regime, with the workspace + treated as the resource address. - 1. **capability set** — which operations the role grants. - 2. **workspace scope** — which workspaces the role is active in. - -The authorisation question is: *given the caller's roles, a required -capability, and a target workspace, does any role grant the -capability AND apply to the target workspace?* - -Workspace scope values recognised here: - - - ``"assigned"`` — the role applies only to the caller's own - assigned workspace (stored on their user record). - - ``"*"`` — the role applies to every workspace. - -Enterprise editions can add richer scopes (explicit permitted-set, -patterns, etc.) without changing the wire protocol. - -Sentinels ---------- -- ``PUBLIC`` — endpoint requires no authentication. -- ``AUTHENTICATED`` — endpoint requires a valid identity, no - specific capability. +The capability strings themselves are an open vocabulary — see +docs/tech-specs/capabilities.md. The gateway does not validate them +beyond passing them through; an unknown capability simply produces a +deny verdict from the regime. """ from aiohttp import web @@ -40,125 +27,6 @@ PUBLIC = "__public__" AUTHENTICATED = "__authenticated__" -# Capability vocabulary. Mirrors the "Capability list" tables in -# capabilities.md. Kept as a set so the gateway can fail-closed on -# an endpoint that declares an unknown capability. -KNOWN_CAPABILITIES = { - # Data plane - "agent", - "graph:read", "graph:write", - "documents:read", "documents:write", - "rows:read", "rows:write", - "llm", - "embeddings", - "mcp", - # Control plane - "config:read", "config:write", - "flows:read", "flows:write", - "users:read", "users:write", "users:admin", - "keys:self", "keys:admin", - "workspaces:admin", - "iam:admin", - "metrics:read", - "collections:read", "collections:write", - "knowledge:read", "knowledge:write", -} - - -# Capability sets used below. -_READER_CAPS = { - "agent", - "graph:read", - "documents:read", - "rows:read", - "llm", - "embeddings", - "mcp", - "config:read", - "flows:read", - "collections:read", - "knowledge:read", - "keys:self", -} - -_WRITER_CAPS = _READER_CAPS | { - "graph:write", - "documents:write", - "rows:write", - "collections:write", - "knowledge:write", -} - -_ADMIN_CAPS = _WRITER_CAPS | { - "config:write", - "flows:write", - "users:read", "users:write", "users:admin", - "keys:admin", - "workspaces:admin", - "iam:admin", - "metrics:read", -} - - -# Role definitions. Each role has a capability set and a workspace -# scope. Enterprise overrides this mapping. -ROLE_DEFINITIONS = { - "reader": { - "capabilities": _READER_CAPS, - "workspace_scope": "assigned", - }, - "writer": { - "capabilities": _WRITER_CAPS, - "workspace_scope": "assigned", - }, - "admin": { - "capabilities": _ADMIN_CAPS, - "workspace_scope": "*", - }, -} - - -def _scope_permits(role_name, target_workspace, assigned_workspace): - """Does the given role apply to ``target_workspace``?""" - role = ROLE_DEFINITIONS.get(role_name) - if role is None: - return False - scope = role["workspace_scope"] - if scope == "*": - return True - if scope == "assigned": - return target_workspace == assigned_workspace - # Future scope types (lists, patterns) extend here. - return False - - -def check(identity, capability, target_workspace=None): - """Is ``identity`` permitted to invoke ``capability`` on - ``target_workspace``? - - Passes iff some role held by the caller both (a) grants - ``capability`` and (b) is active in ``target_workspace``. - - ``target_workspace`` defaults to the caller's assigned workspace, - which makes this function usable for system-level operations and - for authenticated endpoints that don't take a workspace argument - (the call collapses to "do any of my roles grant this cap?").""" - if capability not in KNOWN_CAPABILITIES: - return False - - target = target_workspace or identity.workspace - - for role_name in identity.roles: - role = ROLE_DEFINITIONS.get(role_name) - if role is None: - continue - if capability not in role["capabilities"]: - continue - if _scope_permits(role_name, target, identity.workspace): - return True - return False - - def access_denied(): return web.HTTPForbidden( text='{"error":"access denied"}', @@ -174,21 +42,19 @@ def auth_failure(): async def enforce(request, auth, capability): - """Authenticate + capability-check for endpoints that carry no - workspace dimension on the request (metrics, i18n, etc.). + """Authenticate the caller and (for non-sentinel capabilities) + ask the IAM regime whether they may invoke ``capability``. - For endpoints that carry a workspace field on the body, call - :func:`enforce_workspace` *after* parsing the body to validate - the workspace and re-check the capability in that scope. Most - endpoints do both. + The resource is system-level (``{}``) and parameters are empty — + use :func:`enforce_workspace` for workspace-scoped endpoints, or + drive authorisation through the operation registry for richer + cases. - - ``PUBLIC``: no authentication, returns ``None``. - - ``AUTHENTICATED``: any valid identity. - - capability string: identity must have it, checked against the - caller's assigned workspace (adequate for endpoints whose - capability is system-level, e.g. ``metrics:read``, or where - the real workspace-aware check happens in - :func:`enforce_workspace` after body parsing).""" + - ``PUBLIC``: returns ``None`` — no authentication. + - ``AUTHENTICATED``: returns the ``Identity`` — no authorisation. + - capability string: returns the ``Identity`` if the regime + allows; raises ``HTTPForbidden`` otherwise. + """ if capability == PUBLIC: return None @@ -197,42 +63,38 @@ async def enforce(request, auth, capability): if capability == AUTHENTICATED: return identity - if not check(identity, capability): - raise access_denied() - + await auth.authorise(identity, capability, {}, {}) return identity -def enforce_workspace(data, identity, capability=None): - """Resolve + validate the workspace on a request body. +async def enforce_workspace(data, identity, auth, capability=None): + """Default-fill the workspace on a request body and (optionally) + authorise the caller for ``capability`` against that workspace. - Target workspace = ``data["workspace"]`` if supplied, else the - caller's assigned workspace. - - At least one of the caller's roles must (a) be active in the - target workspace and, if ``capability`` is given, (b) grant - ``capability``. Otherwise 403. + caller's bound workspace. - On success, ``data["workspace"]`` is overwritten with the - resolved value — callers can rely on the outgoing message - having the gateway's chosen workspace rather than any - caller-supplied value. + resolved value so downstream code sees a single canonical + address. + - When ``capability`` is given, the regime is asked whether the + caller may invoke ``capability`` on ``{workspace: target}``. + Raises ``HTTPForbidden`` on a deny. - For ``capability=None`` the workspace scope alone is checked — - useful when the body has a workspace but the endpoint already - passed its capability check (e.g. via :func:`enforce`).""" + For ``capability=None`` no authorisation call is made — the + caller has presumably already authorised via :func:`enforce` + (handy for endpoints that authorise once then resolve workspace + on the body before forwarding). + """ if not isinstance(data, dict): return data requested = data.get("workspace", "") target = requested or identity.workspace + data["workspace"] = target - for role_name in identity.roles: - role = ROLE_DEFINITIONS.get(role_name) - if role is None: - continue - if capability is not None and capability not in role["capabilities"]: - continue - if _scope_permits(role_name, target, identity.workspace): - data["workspace"] = target - return data + if capability is not None: + await auth.authorise( + identity, capability, {"workspace": target}, {}, + ) - raise access_denied() + return data diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py index 013cd1ea..37a72f11 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py @@ -121,20 +121,45 @@ class Mux: }) return - # Workspace resolution. Role workspace scope determines - # which target workspaces are permitted. The resolved - # value is written to both the envelope and the inner - # request payload so clients don't have to repeat it - # per-message (same convenience HTTP callers get via - # enforce_workspace). + # Per-service capability gating. Resolved through the + # operation registry so the WS path matches what HTTP + # callers see — same authority, same caps. Service + # kinds that aren't registered are refused. + from ..registry import lookup as _registry_lookup from ..capabilities import enforce_workspace from aiohttp import web as _web + service = data.get("service", "") + op = _registry_lookup(f"flow-service:{service}") + if op is None: + await self.ws.send_json({ + "id": request_id, + "error": { + "message": "unknown service", + "type": "unknown-service", + }, + "complete": True, + }) + return + + # Workspace + flow form the resource address for a + # flow-level service call. Resolve workspace first + # (default-fill from the caller's bound workspace), + # then ask the regime to authorise the service-level + # capability against that {workspace, flow} resource. try: - enforce_workspace(data, self.identity) + await enforce_workspace(data, self.identity, self.auth) inner = data.get("request") if isinstance(inner, dict): - enforce_workspace(inner, self.identity) + await enforce_workspace(inner, self.identity, self.auth) + + resource = { + "workspace": data.get("workspace", ""), + "flow": data.get("flow", ""), + } + await self.auth.authorise( + self.identity, op.capability, resource, {}, + ) except _web.HTTPForbidden: await self.ws.send_json({ "id": request_id, @@ -145,6 +170,16 @@ class Mux: "complete": True, }) return + except _web.HTTPUnauthorized: + await self.ws.send_json({ + "id": request_id, + "error": { + "message": "auth failure", + "type": "auth-required", + }, + "complete": True, + }) + return workspace = data["workspace"] diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/auth_endpoints.py b/trustgraph-flow/trustgraph/gateway/endpoint/auth_endpoints.py index 6037fc4b..0b476b7b 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/auth_endpoints.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/auth_endpoints.py @@ -97,7 +97,7 @@ class AuthEndpoints: ) req = { "operation": "change-password", - "user_id": identity.user_id, + "user_id": identity.handle, "password": body.get("current_password", ""), "new_password": body.get("new_password", ""), } diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/constant_endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint/constant_endpoint.py index ee9c0447..920b02ca 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/constant_endpoint.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/constant_endpoint.py @@ -36,7 +36,7 @@ class ConstantEndpoint: data = await request.json() if identity is not None: - enforce_workspace(data, identity) + await enforce_workspace(data, identity, self.auth) async def responder(x, fin): pass diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/iam_endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint/iam_endpoint.py new file mode 100644 index 00000000..70fa33f7 --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/endpoint/iam_endpoint.py @@ -0,0 +1,106 @@ +""" +Registry-driven /api/v1/iam endpoint. + +The gateway no longer gates IAM management with a single coarse +``users:admin`` capability. Instead, each operation declares its +own capability + resource shape in the registry (``registry.py``); +this endpoint reads the body's ``operation`` field, looks up the +declaration, and asks the IAM regime to authorise the call. + +Operations not in the registry produce a 400 ``unknown operation``. +This is the gateway's primary mechanism for fail-closed gating of +the IAM surface — the registry is the source of truth. +""" + +import logging + +from aiohttp import web + +from .. capabilities import ( + PUBLIC, AUTHENTICATED, auth_failure, +) +from .. registry import lookup, RequestContext + +logger = logging.getLogger("iam-endpoint") +logger.setLevel(logging.INFO) + + +class IamEndpoint: + """POST /api/v1/iam — generic forwarder gated by the operation + registry. The IAM dispatcher (``iam_dispatcher``) forwards the + body verbatim to iam-svc once authorisation succeeds.""" + + def __init__(self, endpoint_path, auth, dispatcher): + self.path = endpoint_path + self.auth = auth + self.dispatcher = dispatcher + + async def start(self): + pass + + def add_routes(self, app): + app.add_routes([web.post(self.path, self.handle)]) + + async def handle(self, request): + try: + body = await request.json() + except Exception: + return web.json_response( + {"error": "invalid json"}, status=400, + ) + if not isinstance(body, dict): + return web.json_response( + {"error": "body must be an object"}, status=400, + ) + + op_name = body.get("operation", "") + op = lookup(op_name) + if op is None: + return web.json_response( + {"error": "unknown operation"}, status=400, + ) + + # Authentication: required for everything except PUBLIC. + identity = None + if op.capability != PUBLIC: + try: + identity = await self.auth.authenticate(request) + except web.HTTPException: + raise + + # Authorisation: capability sentinels short-circuit the + # regime call; capability strings go through authorise(). + if op.capability not in (PUBLIC, AUTHENTICATED): + ctx = RequestContext( + body=body, + match_info=dict(request.match_info), + identity=identity, + ) + try: + resource = op.extract_resource(ctx) + parameters = op.extract_parameters(ctx) + except Exception as e: + logger.warning( + f"extractor failed for {op_name!r}: " + f"{type(e).__name__}: {e}" + ) + return web.json_response( + {"error": "bad request"}, status=400, + ) + + await self.auth.authorise( + identity, op.capability, resource, parameters, + ) + + async def responder(x, fin): + pass + + try: + resp = await self.dispatcher.process(body, responder) + except web.HTTPException: + raise + except Exception as e: + logger.error(f"Exception: {e}", exc_info=True) + return web.json_response({"error": str(e)}) + + return web.json_response(resp) diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/manager.py b/trustgraph-flow/trustgraph/gateway/endpoint/manager.py index 69b11e07..ed5ef4b5 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/manager.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/manager.py @@ -9,90 +9,44 @@ from . socket import SocketEndpoint from . metrics import MetricsEndpoint from . i18n import I18nPackEndpoint from . auth_endpoints import AuthEndpoints +from . iam_endpoint import IamEndpoint +from . registry_endpoint import RegistryRoutedVariableEndpoint -from .. capabilities import PUBLIC, AUTHENTICATED +from .. capabilities import PUBLIC, AUTHENTICATED, auth_failure +from .. registry import lookup as _registry_lookup, RequestContext from .. dispatch.manager import DispatcherManager -# Capability required for each kind on the /api/v1/{kind} generic -# endpoint (global services). Coarse gating — the IAM bundle split -# of "read vs write" per admin subsystem is not applied here because -# this endpoint forwards an opaque operation in the body. Writes -# are the upper bound on what the endpoint can do, so we gate on -# the write/admin capability. -GLOBAL_KIND_CAPABILITY = { - "config": "config:write", - "flow": "flows:write", - "librarian": "documents:write", - "knowledge": "knowledge:write", - "collection-management": "collections:write", - # IAM endpoints land on /api/v1/iam and require the admin bundle. - # Login / bootstrap / change-password are served by - # AuthEndpoints, which handle their own gating (PUBLIC / - # AUTHENTICATED). - "iam": "users:admin", -} +# /api/v1/{kind} (config / flow / librarian / knowledge / +# collection-management), /api/v1/iam, and /api/v1/flow/{flow}/... +# routes are all gated per-operation by the registry, not by a +# per-kind capability map. Login / bootstrap / change-password are +# served by AuthEndpoints with their own PUBLIC / AUTHENTICATED +# sentinels. -# Capability required for each kind on the -# /api/v1/flow/{flow}/service/{kind} endpoint (per-flow data-plane). -FLOW_KIND_CAPABILITY = { - "agent": "agent", - "text-completion": "llm", - "prompt": "llm", - "mcp-tool": "mcp", - "graph-rag": "graph:read", - "document-rag": "documents:read", - "embeddings": "embeddings", - "graph-embeddings": "graph:read", - "document-embeddings": "documents:read", - "triples": "graph:read", - "rows": "rows:read", - "nlp-query": "rows:read", - "structured-query": "rows:read", - "structured-diag": "rows:read", - "row-embeddings": "rows:read", - "sparql": "graph:read", -} - - -# Capability for the streaming flow import/export endpoints, -# keyed by the "kind" URL segment. -FLOW_IMPORT_CAPABILITY = { - "triples": "graph:write", - "graph-embeddings": "graph:write", - "document-embeddings": "documents:write", - "entity-contexts": "documents:write", - "rows": "rows:write", -} - -FLOW_EXPORT_CAPABILITY = { - "triples": "graph:read", - "graph-embeddings": "graph:read", - "document-embeddings": "documents:read", - "entity-contexts": "documents:read", -} - - -from .. capabilities import enforce, enforce_workspace import logging as _mgr_logging _mgr_logger = _mgr_logging.getLogger("endpoint") class _RoutedVariableEndpoint: - """HTTP endpoint whose required capability is looked up per - request from the URL's ``kind`` parameter. Used for the two - generic dispatch paths (``/api/v1/{kind}`` and - ``/api/v1/flow/{flow}/service/{kind}``). Self-contained rather - than subclassing ``VariableEndpoint`` to avoid mutating shared - state across concurrent requests.""" + """HTTP endpoint that gates per request via the operation + registry. The URL's ``kind`` parameter combined with a fixed + ``registry_prefix`` yields the registry key — e.g. prefix + ``flow-service`` and kind ``agent`` looks up + ``flow-service:agent``. - def __init__(self, endpoint_path, auth, dispatcher, capability_map): + Used for ``/api/v1/flow/{flow}/service/{kind}`` (per-flow + data-plane services). ``/api/v1/{kind}`` (workspace-level + global services) goes through ``RegistryRoutedVariableEndpoint`` + which discriminates on body operation as well as URL kind.""" + + def __init__(self, endpoint_path, auth, dispatcher, registry_prefix): self.path = endpoint_path self.auth = auth self.dispatcher = dispatcher - self._capability_map = capability_map + self._registry_prefix = registry_prefix async def start(self): pass @@ -102,18 +56,26 @@ class _RoutedVariableEndpoint: async def handle(self, request): kind = request.match_info.get("kind", "") - cap = self._capability_map.get(kind) - if cap is None: + op = _registry_lookup(f"{self._registry_prefix}:{kind}") + if op is None: return web.json_response( {"error": "unknown kind"}, status=404, ) - identity = await enforce(request, self.auth, cap) + identity = await self.auth.authenticate(request) try: data = await request.json() - if identity is not None: - enforce_workspace(data, identity) + ctx = RequestContext( + body=data if isinstance(data, dict) else {}, + match_info=dict(request.match_info), + identity=identity, + ) + resource = op.extract_resource(ctx) + parameters = op.extract_parameters(ctx) + await self.auth.authorise( + identity, op.capability, resource, parameters, + ) async def responder(x, fin): pass @@ -131,15 +93,15 @@ class _RoutedVariableEndpoint: class _RoutedSocketEndpoint: - """WebSocket endpoint whose required capability is looked up per - request from the URL's ``kind`` parameter. Used for the flow - import/export streaming endpoints.""" + """WebSocket endpoint gated per request via the operation + registry. Like ``_RoutedVariableEndpoint`` but for the + streaming flow import / export socket paths.""" - def __init__(self, endpoint_path, auth, dispatcher, capability_map): + def __init__(self, endpoint_path, auth, dispatcher, registry_prefix): self.path = endpoint_path self.auth = auth self.dispatcher = dispatcher - self._capability_map = capability_map + self._registry_prefix = registry_prefix async def start(self): pass @@ -148,11 +110,9 @@ class _RoutedSocketEndpoint: app.add_routes([web.get(self.path, self.handle)]) async def handle(self, request): - from .. capabilities import check, auth_failure, access_denied - kind = request.match_info.get("kind", "") - cap = self._capability_map.get(kind) - if cap is None: + op = _registry_lookup(f"{self._registry_prefix}:{kind}") + if op is None: return web.json_response( {"error": "unknown kind"}, status=404, ) @@ -168,8 +128,20 @@ class _RoutedSocketEndpoint: ) except web.HTTPException as e: return e - if not check(identity, cap): - return access_denied() + + ctx = RequestContext( + body={}, + match_info=dict(request.match_info), + identity=identity, + ) + try: + resource = op.extract_resource(ctx) + parameters = op.extract_parameters(ctx) + await self.auth.authorise( + identity, op.capability, resource, parameters, + ) + except web.HTTPException as e: + return e # Delegate the websocket handling to a standalone SocketEndpoint # with the resolved capability, bypassing the per-request mutation @@ -178,7 +150,7 @@ class _RoutedSocketEndpoint: endpoint_path=self.path, auth=self.auth, dispatcher=self.dispatcher, - capability=cap, + capability=op.capability, ) return await ws_ep.handle(request) @@ -203,6 +175,18 @@ class EndpointManager: auth=auth, ), + # /api/v1/iam — registry-driven IAM management. Per + # operation gating happens inside IamEndpoint via the + # operation registry; the dispatcher forwards verbatim + # to iam-svc once authorisation has succeeded. Listed + # before the generic /api/v1/{kind} route so it wins + # the match for "iam". + IamEndpoint( + endpoint_path="/api/v1/iam", + auth=auth, + dispatcher=dispatcher_manager.dispatch_auth_iam(), + ), + I18nPackEndpoint( endpoint_path="/api/v1/i18n/packs/{lang}", auth=auth, @@ -215,12 +199,16 @@ class EndpointManager: capability="metrics:read", ), - # Global services: capability chosen per-kind. - _RoutedVariableEndpoint( + # Global services: registry-driven per-operation gating. + # Each kind+op combination has a registry entry that + # declares its capability and resource shape. Listed + # after the IAM and auth-surface routes; aiohttp's + # path matcher prefers the more-specific path so this + # variable route doesn't shadow them. + RegistryRoutedVariableEndpoint( endpoint_path="/api/v1/{kind}", auth=auth, dispatcher=dispatcher_manager.dispatch_global_service(), - capability_map=GLOBAL_KIND_CAPABILITY, ), # /api/v1/socket: WebSocket handshake accepts @@ -236,26 +224,29 @@ class EndpointManager: in_band_auth=True, ), - # Per-flow request/response services — capability per kind. + # Per-flow request/response services — gated per + # ``flow-service:`` registry entry. _RoutedVariableEndpoint( endpoint_path="/api/v1/flow/{flow}/service/{kind}", auth=auth, dispatcher=dispatcher_manager.dispatch_flow_service(), - capability_map=FLOW_KIND_CAPABILITY, + registry_prefix="flow-service", ), - # Per-flow streaming import/export — capability per kind. + # Per-flow streaming import/export — gated per + # ``flow-import:`` / ``flow-export:`` registry + # entry. _RoutedSocketEndpoint( endpoint_path="/api/v1/flow/{flow}/import/{kind}", auth=auth, dispatcher=dispatcher_manager.dispatch_flow_import(), - capability_map=FLOW_IMPORT_CAPABILITY, + registry_prefix="flow-import", ), _RoutedSocketEndpoint( endpoint_path="/api/v1/flow/{flow}/export/{kind}", auth=auth, dispatcher=dispatcher_manager.dispatch_flow_export(), - capability_map=FLOW_EXPORT_CAPABILITY, + registry_prefix="flow-export", ), StreamEndpoint( diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/registry_endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint/registry_endpoint.py new file mode 100644 index 00000000..296376fa --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/endpoint/registry_endpoint.py @@ -0,0 +1,123 @@ +""" +Registry-driven dispatch for ``/api/v1/{kind}`` global services. + +The body's ``operation`` field plus the URL's ``{kind}`` together +form the canonical operation name (``:``) that the +gateway looks up in ``registry.py``. The matched operation +declares its capability and resource shape; this endpoint asks the +IAM regime to authorise the call before forwarding the body +verbatim to the backend dispatcher. + +The dispatcher is the same ``dispatch_global_service()`` factory the +old coarse path used; only the gating layer has changed. + +Operations not present in the registry are rejected with 400 +``unknown operation`` — fail closed. +""" + +import logging + +from aiohttp import web + +from .. capabilities import ( + PUBLIC, AUTHENTICATED, auth_failure, +) +from .. registry import lookup, RequestContext + +logger = logging.getLogger("registry-endpoint") +logger.setLevel(logging.INFO) + + +class RegistryRoutedVariableEndpoint: + """POST /api/v1/{kind} — kind comes from the URL, operation comes + from the body, both are joined as the registry key.""" + + def __init__(self, endpoint_path, auth, dispatcher): + self.path = endpoint_path + self.auth = auth + self.dispatcher = dispatcher + + async def start(self): + pass + + def add_routes(self, app): + app.add_routes([web.post(self.path, self.handle)]) + + async def handle(self, request): + kind = request.match_info.get("kind", "") + if not kind: + return web.json_response( + {"error": "missing kind"}, status=404, + ) + + try: + body = await request.json() + except Exception: + return web.json_response( + {"error": "invalid json"}, status=400, + ) + if not isinstance(body, dict): + return web.json_response( + {"error": "body must be an object"}, status=400, + ) + + op_name = body.get("operation", "") + if not op_name: + return web.json_response( + {"error": "missing operation"}, status=400, + ) + + registry_key = f"{kind}:{op_name}" + op = lookup(registry_key) + if op is None: + return web.json_response( + {"error": "unknown operation"}, status=400, + ) + + identity = None + if op.capability != PUBLIC: + identity = await self.auth.authenticate(request) + + if op.capability not in (PUBLIC, AUTHENTICATED): + ctx = RequestContext( + body=body, + match_info=dict(request.match_info), + identity=identity, + ) + try: + resource = op.extract_resource(ctx) + parameters = op.extract_parameters(ctx) + except Exception as e: + logger.warning( + f"extractor failed for {registry_key!r}: " + f"{type(e).__name__}: {e}" + ) + return web.json_response( + {"error": "bad request"}, status=400, + ) + + await self.auth.authorise( + identity, op.capability, resource, parameters, + ) + + # Default-fill workspace into the body so downstream + # dispatchers see the canonical resolved value. The + # extractor has already pulled the workspace out; + # mirror it back to the body for the verbatim forward. + if "workspace" in resource: + body["workspace"] = resource["workspace"] + + async def responder(x, fin): + pass + + try: + resp = await self.dispatcher.process( + body, responder, request.match_info, + ) + except web.HTTPException: + raise + except Exception as e: + logger.error(f"Exception: {e}", exc_info=True) + return web.json_response({"error": str(e)}) + + return web.json_response(resp) diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/socket.py b/trustgraph-flow/trustgraph/gateway/endpoint/socket.py index 08629ea2..f53ad73b 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/socket.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/socket.py @@ -5,7 +5,7 @@ import logging from .. running import Running from .. capabilities import ( - PUBLIC, AUTHENTICATED, check, auth_failure, access_denied, + PUBLIC, AUTHENTICATED, auth_failure, ) logger = logging.getLogger("socket") @@ -97,8 +97,12 @@ class SocketEndpoint: except web.HTTPException as e: return e if self.capability != AUTHENTICATED: - if not check(identity, self.capability): - return access_denied() + try: + await self.auth.authorise( + identity, self.capability, {}, {}, + ) + except web.HTTPException as e: + return e # 50MB max message size ws = web.WebSocketResponse(max_msg_size=52428800) diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py index 5e0d9d21..6a336f42 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py @@ -36,7 +36,7 @@ class VariableEndpoint: data = await request.json() if identity is not None: - enforce_workspace(data, identity) + await enforce_workspace(data, identity, self.auth) async def responder(x, fin): pass diff --git a/trustgraph-flow/trustgraph/gateway/registry.py b/trustgraph-flow/trustgraph/gateway/registry.py new file mode 100644 index 00000000..32a517a9 --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/registry.py @@ -0,0 +1,515 @@ +""" +Gateway operation registry. + +Single declarative table mapping each operation the gateway +recognises to: + +- The capability the IAM regime is asked to authorise against. +- The resource level (system / workspace / flow) — determines the + shape of the resource identifier handed to ``authorise``. +- Extractors that build the resource and parameters from the + request context. + +This is a gateway-internal concept. It is not part of the IAM +contract — the contract specifies what arguments ``authorise`` +receives; the registry is how the gateway populates them. + +See docs/tech-specs/iam-contract.md for the contract and +docs/tech-specs/iam.md for the request anatomy. +""" + +from dataclasses import dataclass, field +from typing import Any, Callable + + +# Sentinels for operations that don't go through capability-based +# authorisation. Mirror the values used in capabilities.py so the +# gateway endpoint layer can recognise them uniformly. +PUBLIC = "__public__" +AUTHENTICATED = "__authenticated__" + + +class ResourceLevel: + """Where the operation's resource lives. + + ``SYSTEM`` — operation acts on a deployment-level resource + (the user registry, the workspace registry, + the signing key). resource = {}. Workspace, + if relevant, is a parameter, not an address. + + ``WORKSPACE`` — operation acts on something within a workspace + (config, library, knowledge, collections, flow + lifecycle). resource = {workspace}. + + ``FLOW`` — operation acts on something within a flow + within a workspace (graph, agent, llm, etc.). + resource = {workspace, flow}. + """ + SYSTEM = "system" + WORKSPACE = "workspace" + FLOW = "flow" + + +@dataclass +class RequestContext: + """The bundle of inputs the registry's extractors operate on. + Assembled by the gateway from the incoming request after + authentication.""" + + # Parsed JSON body (HTTP) or inner request payload (WebSocket). + body: dict = field(default_factory=dict) + + # URL path components (HTTP) or WebSocket envelope routing + # fields (id, service, workspace, flow). + match_info: dict = field(default_factory=dict) + + # Authenticated identity for default-fill-in. Always present + # by the time extractors run, except for PUBLIC operations + # where it is None. + identity: Any = None + + +@dataclass +class Operation: + """Declared operation the gateway can dispatch + authorise.""" + + # Canonical operation name (used for registry lookup, audit, + # debug logs). Mirrors the operation strings in the IAM + # service and other backends where applicable. + name: str + + # Capability required to invoke this operation. Either a + # string from the capability vocabulary in capabilities.md, or + # the PUBLIC / AUTHENTICATED sentinel for operations that + # don't go through capability-based authorisation. + capability: str + + # Where the operation's resource lives. Determines the + # shape of the resource argument passed to authorise. + resource_level: str + + # Build the resource identifier from the request context. + # Returns a dict with the appropriate components for the + # resource level: {} for SYSTEM, {workspace} for WORKSPACE, + # {workspace, flow} for FLOW. Default-fill-in of workspace + # from identity.workspace happens here when applicable. + extract_resource: Callable[[RequestContext], dict] + + # Build the parameters dict — decision-relevant fields the + # operation supplied that are not part of the resource + # address. E.g. workspace association on a system-level + # user-registry operation. + extract_parameters: Callable[[RequestContext], dict] + + +# --------------------------------------------------------------------------- +# Registry storage. +# --------------------------------------------------------------------------- + + +_REGISTRY: dict[str, Operation] = {} + + +def register(op: Operation) -> None: + if op.name in _REGISTRY: + raise RuntimeError( + f"operation {op.name!r} already registered" + ) + _REGISTRY[op.name] = op + + +def lookup(name: str) -> Operation | None: + return _REGISTRY.get(name) + + +def all_operations() -> list[Operation]: + return list(_REGISTRY.values()) + + +# --------------------------------------------------------------------------- +# Common extractor helpers. +# --------------------------------------------------------------------------- + + +def _empty_resource(_ctx: RequestContext) -> dict: + """System-level resource: empty dict.""" + return {} + + +def _workspace_from_body(ctx: RequestContext) -> dict: + """Workspace-level resource sourced from the request body's + workspace field, defaulting to the caller's bound workspace.""" + ws = (ctx.body.get("workspace") if isinstance(ctx.body, dict) else "") + if not ws and ctx.identity is not None: + ws = ctx.identity.workspace + return {"workspace": ws} + + +def _flow_from_match_info(ctx: RequestContext) -> dict: + """Flow-level resource sourced from URL path components or WS + envelope fields. Both ``workspace`` and ``flow`` are required; + no default-fill-in (the address is the operation's identity).""" + return { + "workspace": ctx.match_info.get("workspace", ""), + "flow": ctx.match_info.get("flow", ""), + } + + +def _no_parameters(_ctx: RequestContext) -> dict: + return {} + + +def _body_as_parameters(ctx: RequestContext) -> dict: + """All body fields are parameters — used when the operation's + body is small and uniformly decision-relevant (e.g. user- + registry ops where the body's user.workspace is what the + regime checks against the admin's scope).""" + return dict(ctx.body) if isinstance(ctx.body, dict) else {} + + +def _workspace_param_only(ctx: RequestContext) -> dict: + """Parameters dict carrying only the workspace association. + Used by system-level operations (e.g. user-registry ops) where + the workspace isn't part of the resource address but is the + field the regime uses to scope the admin's authority. + + Pulls the workspace from the inner ``user`` / ``workspace_record`` + body field if present (create-user, create-workspace), then from + the top-level body, then from the caller's bound workspace.""" + body = ctx.body if isinstance(ctx.body, dict) else {} + inner_user = body.get("user") if isinstance(body.get("user"), dict) else {} + inner_ws = ( + body.get("workspace_record") + if isinstance(body.get("workspace_record"), dict) else {} + ) + ws = ( + inner_user.get("workspace") + or inner_ws.get("id") + or body.get("workspace") + ) + if not ws and ctx.identity is not None: + ws = ctx.identity.workspace + return {"workspace": ws or ""} + + +# --------------------------------------------------------------------------- +# Operation registrations. +# +# The gateway looks operations up by their canonical name (the same +# string the request body / WS envelope carries in its ``operation`` +# field where applicable). Auth-surface operations (login, bootstrap, +# change-password) are not listed here — they have their own routes +# in auth_endpoints.py and use PUBLIC / AUTHENTICATED sentinels +# directly. Pure gateway↔IAM internal operations (resolve-api-key, +# authorise, authorise-many, get-signing-key-public) are likewise +# excluded; they are never invoked over the public API. +# --------------------------------------------------------------------------- + + +# IAM management operations. All routed through /api/v1/iam, body +# carries ``operation`` plus operation-specific fields. + +# User registry: SYSTEM-level resource (users are global, identified +# by handle). The admin's authority is scoped per workspace via the +# parameters {workspace} field — that's what the regime checks +# against the admin's role workspace_scope. +register(Operation( + name="create-user", + capability="users:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) +register(Operation( + name="list-users", + capability="users:read", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) +register(Operation( + name="get-user", + capability="users:read", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) +register(Operation( + name="update-user", + capability="users:write", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) +register(Operation( + name="disable-user", + capability="users:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) +register(Operation( + name="enable-user", + capability="users:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) +register(Operation( + name="delete-user", + capability="users:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) +register(Operation( + name="reset-password", + capability="users:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_workspace_param_only, +)) + + +# API keys: workspace-level resource — keys live within a workspace. +register(Operation( + name="create-api-key", + capability="keys:admin", + resource_level=ResourceLevel.WORKSPACE, + extract_resource=_workspace_from_body, + extract_parameters=_no_parameters, +)) +register(Operation( + name="list-api-keys", + capability="keys:admin", + resource_level=ResourceLevel.WORKSPACE, + extract_resource=_workspace_from_body, + extract_parameters=_no_parameters, +)) +register(Operation( + name="revoke-api-key", + capability="keys:admin", + resource_level=ResourceLevel.WORKSPACE, + extract_resource=_workspace_from_body, + extract_parameters=_no_parameters, +)) + + +# Workspace registry: SYSTEM-level resource (workspaces are the +# top-level addressable unit). No parameters — the workspace being +# acted on is identified by the body, not used as a scope cue. +register(Operation( + name="create-workspace", + capability="workspaces:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) +register(Operation( + name="list-workspaces", + capability="workspaces:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) +register(Operation( + name="get-workspace", + capability="workspaces:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) +register(Operation( + name="update-workspace", + capability="workspaces:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) +register(Operation( + name="disable-workspace", + capability="workspaces:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) + + +# Signing key: SYSTEM-level operational op. +register(Operation( + name="rotate-signing-key", + capability="iam:admin", + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) + + +# --------------------------------------------------------------------------- +# Auth-surface entries. +# +# Listed here so the registry is the one place the gateway looks for +# operation→capability mappings — including the sentinels for paths +# that don't go through capability-based authorisation. The actual +# routing is in auth_endpoints.py; these entries let the registry- +# driven dispatcher recognise the operation if it sees it on a +# generic path. +# --------------------------------------------------------------------------- + +register(Operation( + name="login", + capability=PUBLIC, + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) +register(Operation( + name="bootstrap", + capability=PUBLIC, + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) +register(Operation( + name="change-password", + capability=AUTHENTICATED, + resource_level=ResourceLevel.SYSTEM, + extract_resource=_empty_resource, + extract_parameters=_no_parameters, +)) + + +# --------------------------------------------------------------------------- +# Generic kind/operation entries. +# +# Names are ``:`` so the registry key is unique +# across dispatchers. All entries below are workspace-level +# resources (workspace defaulted from the caller's bound workspace +# if absent). Read/write distinction maps to the existing +# ``:read`` / ``:write`` capability vocabulary +# defined in capabilities.md. +# --------------------------------------------------------------------------- + + +def _register_kind_op(kind: str, op: str, capability: str) -> None: + """Helper: register a workspace-level kind:op with the standard + extractors (workspace from body, no extra parameters).""" + register(Operation( + name=f"{kind}:{op}", + capability=capability, + resource_level=ResourceLevel.WORKSPACE, + extract_resource=_workspace_from_body, + extract_parameters=_no_parameters, + )) + + +# config: KV-style workspace config service. +for _op in ("get", "list", "getvalues", "getvalues-all-ws", "config"): + _register_kind_op("config", _op, "config:read") +for _op in ("put", "delete"): + _register_kind_op("config", _op, "config:write") + + +# flow: flow-blueprint and flow-lifecycle service. +for _op in ("list-blueprints", "get-blueprint", "list-flows", "get-flow"): + _register_kind_op("flow", _op, "flows:read") +for _op in ("put-blueprint", "delete-blueprint", "start-flow", "stop-flow"): + _register_kind_op("flow", _op, "flows:write") + + +# librarian: document storage and processing service. +for _op in ( + "get-document-metadata", "get-document-content", + "stream-document", "list-documents", "list-processing", + "get-upload-status", "list-uploads", +): + _register_kind_op("librarian", _op, "documents:read") +for _op in ( + "add-document", "remove-document", "update-document", + "add-processing", "remove-processing", + "begin-upload", "upload-chunk", "complete-upload", "abort-upload", +): + _register_kind_op("librarian", _op, "documents:write") + + +# knowledge: knowledge-graph core service. +for _op in ("get-kg-core", "list-kg-cores"): + _register_kind_op("knowledge", _op, "knowledge:read") +for _op in ("put-kg-core", "delete-kg-core", + "load-kg-core", "unload-kg-core"): + _register_kind_op("knowledge", _op, "knowledge:write") + + +# collection-management: workspace collection lifecycle. +_register_kind_op("collection-management", "list-collections", "collections:read") +for _op in ("update-collection", "delete-collection"): + _register_kind_op("collection-management", _op, "collections:write") + + +# --------------------------------------------------------------------------- +# Per-flow data-plane services. +# +# /api/v1/flow/{flow}/service/{kind} and the streaming +# /api/v1/flow/{flow}/{import,export}/{kind} paths. No body-level +# ``operation`` discriminator — the URL kind is the operation +# identity. Resource is FLOW level (workspace + flow). +# +# Names: ``flow-service:``, ``flow-import:``, +# ``flow-export:``. +# --------------------------------------------------------------------------- + + +def _register_flow_kind(prefix: str, kind: str, capability: str) -> None: + register(Operation( + name=f"{prefix}:{kind}", + capability=capability, + resource_level=ResourceLevel.FLOW, + extract_resource=_flow_from_match_info, + extract_parameters=_no_parameters, + )) + + +# Request/response services on /api/v1/flow/{flow}/service/{kind}. +_FLOW_SERVICES = { + "agent": "agent", + "text-completion": "llm", + "prompt": "llm", + "mcp-tool": "mcp", + "graph-rag": "graph:read", + "document-rag": "documents:read", + "embeddings": "embeddings", + "graph-embeddings": "graph:read", + "document-embeddings": "documents:read", + "triples": "graph:read", + "rows": "rows:read", + "nlp-query": "rows:read", + "structured-query": "rows:read", + "structured-diag": "rows:read", + "row-embeddings": "rows:read", + "sparql": "graph:read", +} +for _kind, _cap in _FLOW_SERVICES.items(): + _register_flow_kind("flow-service", _kind, _cap) + + +# Streaming import socket endpoints. +_FLOW_IMPORTS = { + "triples": "graph:write", + "graph-embeddings": "graph:write", + "document-embeddings": "documents:write", + "entity-contexts": "documents:write", + "rows": "rows:write", +} +for _kind, _cap in _FLOW_IMPORTS.items(): + _register_flow_kind("flow-import", _kind, _cap) + + +# Streaming export socket endpoints. +_FLOW_EXPORTS = { + "triples": "graph:read", + "graph-embeddings": "graph:read", + "document-embeddings": "documents:read", + "entity-contexts": "documents:read", +} +for _kind, _cap in _FLOW_EXPORTS.items(): + _register_flow_kind("flow-export", _kind, _cap) diff --git a/trustgraph-flow/trustgraph/iam/service/iam.py b/trustgraph-flow/trustgraph/iam/service/iam.py index 6e7c7aa5..44c7df23 100644 --- a/trustgraph-flow/trustgraph/iam/service/iam.py +++ b/trustgraph-flow/trustgraph/iam/service/iam.py @@ -40,6 +40,78 @@ API_KEY_RANDOM_BYTES = 24 JWT_ISSUER = "trustgraph-iam" JWT_TTL_SECONDS = 3600 +# Default authorisation cache TTL the regime tells the gateway to +# observe. 60s is the OSS-spec maximum revocation latency: a role +# change, workspace disable, or key revoke takes effect within at +# most this much time. +AUTHZ_CACHE_TTL_SECONDS = 60 + + +# OSS regime role table. Lives here, not in the gateway — the +# gateway is regime-agnostic and must not encode policy. +# +# Each role has a capability set and a workspace scope. The +# evaluator (handle_authorise below) checks (a) that some role +# held by the caller grants the requested capability, and (b) +# that role's workspace scope permits the target workspace. + +_READER_CAPS = { + "agent", + "graph:read", + "documents:read", + "rows:read", + "llm", + "embeddings", + "mcp", + "config:read", + "flows:read", + "collections:read", + "knowledge:read", + "keys:self", +} + +_WRITER_CAPS = _READER_CAPS | { + "graph:write", + "documents:write", + "rows:write", + "collections:write", + "knowledge:write", +} + +_ADMIN_CAPS = _WRITER_CAPS | { + "config:write", + "flows:write", + "users:read", "users:write", "users:admin", + "keys:admin", + "workspaces:admin", + "iam:admin", + "metrics:read", +} + +ROLE_DEFINITIONS = { + "reader": { + "capabilities": _READER_CAPS, + "workspace_scope": "assigned", + }, + "writer": { + "capabilities": _WRITER_CAPS, + "workspace_scope": "assigned", + }, + "admin": { + "capabilities": _ADMIN_CAPS, + "workspace_scope": "*", + }, +} + + +def _scope_permits(role_scope, target_workspace, assigned_workspace): + """Does the given role apply to ``target_workspace``?""" + if role_scope == "*": + return True + if role_scope == "assigned": + return target_workspace == assigned_workspace + return False + def _now_iso(): return datetime.datetime.now(datetime.timezone.utc).isoformat() @@ -250,6 +322,10 @@ class IamService: return await self.handle_disable_workspace(v) if op == "rotate-signing-key": return await self.handle_rotate_signing_key(v) + if op == "authorise": + return await self.handle_authorise(v) + if op == "authorise-many": + return await self.handle_authorise_many(v) return _err( "invalid-argument", @@ -478,7 +554,7 @@ class IamService: ( id, ws, _username, _name, _email, password_hash, - roles, enabled, _mcp, _created, + _roles, enabled, _mcp, _created, ) = user_row if not enabled: @@ -496,11 +572,14 @@ class IamService: now_ts = int(_now_dt().timestamp()) exp_ts = now_ts + JWT_TTL_SECONDS + # Per the IAM contract the gateway never reads policy state + # from the credential — roles stay server-side, reachable + # only via authorise(). JWT carries identity + workspace + # binding only. claims = { "iss": JWT_ISSUER, "sub": id, "workspace": ws, - "roles": sorted(roles) if roles else [], "iat": now_ts, "exp": exp_ts, } @@ -1130,3 +1209,134 @@ class IamService: await self.table_store.delete_api_key(key_hash) return IamResponse() + + # ------------------------------------------------------------------ + # authorise / authorise-many + # + # The IAM contract (see docs/tech-specs/iam-contract.md) calls + # for the regime — not the gateway — to decide whether an + # identity may perform a capability on a resource given the + # operation's parameters. These two operations are the OSS + # regime's implementation of that contract. + # + # Inputs (on IamRequest): + # user_id — the identity handle (the gateway's + # opaque reference). For OSS this is the + # user record's id. + # capability — the capability string from the + # capabilities.md vocabulary. + # resource_json — JSON dict, the resource address + # ({} for system, {workspace} for + # workspace, {workspace, flow} for flow). + # parameters_json — JSON dict, decision-relevant operation + # parameters (e.g. workspace association + # on user-registry operations). + # authorise_checks — for authorise-many, a JSON list of + # {capability, resource, parameters}. + # + # Outputs (on IamResponse): + # decision_allow — single allow / deny verdict. + # decision_ttl_seconds — gateway cache TTL for this + # decision. + # decisions_json — for authorise-many, list of + # {allow, ttl} in request order. + # ------------------------------------------------------------------ + + def _decide(self, user_row, capability, resource, parameters): + """Single authorisation decision. Returns (allow, ttl).""" + + if user_row is None: + return False, AUTHZ_CACHE_TTL_SECONDS + + # user_row layout: + # 0:id 1:workspace 2:username 3:name 4:email 5:password_hash + # 6:roles 7:enabled 8:must_change_password 9:created + if not user_row[7]: # disabled + return False, AUTHZ_CACHE_TTL_SECONDS + + # Disabled workspace check (defense in depth — credentials + # bound to a disabled workspace shouldn't be able to act). + # Cheap; one row read. + # We do this only when a target workspace is involved, to + # avoid an extra read for system-level operations that + # bypass workspace altogether. + target_workspace = ( + (resource or {}).get("workspace") + or (parameters or {}).get("workspace") + ) + + roles = user_row[6] or set() + assigned_workspace = user_row[1] + + for role_name in roles: + defn = ROLE_DEFINITIONS.get(role_name) + if defn is None: + continue + if capability not in defn["capabilities"]: + continue + if target_workspace is None or _scope_permits( + defn["workspace_scope"], + target_workspace, + assigned_workspace, + ): + return True, AUTHZ_CACHE_TTL_SECONDS + + return False, AUTHZ_CACHE_TTL_SECONDS + + async def handle_authorise(self, v): + if not v.capability: + return _err("invalid-argument", "capability required") + if not v.user_id: + return _err("invalid-argument", "user_id (handle) required") + + try: + resource = json.loads(v.resource_json or "{}") + parameters = json.loads(v.parameters_json or "{}") + except json.JSONDecodeError as e: + return _err("invalid-argument", f"bad json: {e}") + + user_row = await self.table_store.get_user(v.user_id) + allow, ttl = self._decide( + user_row, v.capability, resource, parameters, + ) + return IamResponse( + decision_allow=allow, + decision_ttl_seconds=ttl, + ) + + async def handle_authorise_many(self, v): + if not v.user_id: + return _err("invalid-argument", "user_id (handle) required") + if not v.authorise_checks: + return _err("invalid-argument", "authorise_checks required") + + try: + checks = json.loads(v.authorise_checks) + except json.JSONDecodeError as e: + return _err("invalid-argument", f"bad json: {e}") + if not isinstance(checks, list): + return _err( + "invalid-argument", + "authorise_checks must be a JSON list", + ) + + # One user lookup for the whole batch. + user_row = await self.table_store.get_user(v.user_id) + + decisions = [] + for c in checks: + if not isinstance(c, dict): + decisions.append({ + "allow": False, + "ttl": AUTHZ_CACHE_TTL_SECONDS, + }) + continue + allow, ttl = self._decide( + user_row, + c.get("capability", ""), + c.get("resource") or {}, + c.get("parameters") or {}, + ) + decisions.append({"allow": allow, "ttl": ttl}) + + return IamResponse(decisions_json=json.dumps(decisions))