diff --git a/docs/tech-specs/iam-protocol.md b/docs/tech-specs/iam-protocol.md index 8638e7e9..18c5e0b2 100644 --- a/docs/tech-specs/iam-protocol.md +++ b/docs/tech-specs/iam-protocol.md @@ -248,46 +248,6 @@ Passwords, API-key plaintext, and signing-key private material are never returned in any response other than the explicit one-time responses above (`reset-password`, `create-api-key`, `bootstrap`). -## Bootstrap modes - -`iam-svc` requires a bootstrap mode to be chosen at startup. There is -no default — an unset or invalid mode causes the service to refuse -to start. The purpose is to force the operator to make an explicit -security decision rather than rely on an implicit "safe" fallback. - -| Mode | Startup behaviour | `bootstrap` operation | Suitability | -|---|---|---|---| -| `token` | On first start with empty tables, auto-seeds the `default` workspace, admin user, admin API key (using the operator-provided `--bootstrap-token`), and an initial signing key. No-op on subsequent starts. | Refused — returns `auth-failed` / `"auth failure"` regardless of caller. | Production, any public-exposure deployment. | -| `bootstrap` | No startup seeding. Tables remain empty until the `bootstrap` operation is invoked over the pub/sub bus (typically via `tg-bootstrap-iam`). | Live while tables are empty. Generates and returns the admin API key once. Refused (`auth-failed`) once tables are populated. | Dev / compose up / CI. **Not safe under public exposure** — any caller reaching the gateway's `/api/v1/iam` forwarder before the operator can cause a token to be issued to them. Operators choosing this mode accept that risk. | - -### Error masking - -In both modes, any refused invocation of the `bootstrap` operation -returns the same error (`auth-failed` / `"auth failure"`). A caller -cannot distinguish: - -- "service is in token mode" -- "service is in bootstrap mode but already bootstrapped" -- "operation forbidden" - -This matches the general IAM error-policy stance (see `iam.md`) and -prevents externally enumerating IAM's state. - -### Bootstrap-token lifecycle - -The bootstrap token — whether operator-supplied (`token` mode) or -service-generated (`bootstrap` mode) — is a one-time credential. It -is stored as admin's single API key, tagged `name="bootstrap"`. The -operator's first admin action after bootstrap should be: - -1. Create a durable admin user and API key (or issue a durable API - key to the bootstrap admin). -2. Revoke the bootstrap key via `revoke-api-key`. -3. Remove the bootstrap token from any deployment configuration. - -The `name="bootstrap"` marker makes bootstrap keys easy to detect in -tooling (e.g. a `tg-list-api-keys` filter). - ## HTTP forwarding (initial integration) For the initial gateway integration — before the IAM service is diff --git a/trustgraph-cli/pyproject.toml b/trustgraph-cli/pyproject.toml index 8d88991d..d316ae4f 100644 --- a/trustgraph-cli/pyproject.toml +++ b/trustgraph-cli/pyproject.toml @@ -40,7 +40,6 @@ tg-get-flow-blueprint = "trustgraph.cli.get_flow_blueprint:main" tg-get-kg-core = "trustgraph.cli.get_kg_core:main" tg-get-document-content = "trustgraph.cli.get_document_content:main" tg-graph-to-turtle = "trustgraph.cli.graph_to_turtle:main" -tg-bootstrap-iam = "trustgraph.cli.bootstrap_iam:main" tg-invoke-agent = "trustgraph.cli.invoke_agent:main" tg-invoke-document-rag = "trustgraph.cli.invoke_document_rag:main" tg-invoke-graph-rag = "trustgraph.cli.invoke_graph_rag:main" diff --git a/trustgraph-cli/trustgraph/cli/bootstrap_iam.py b/trustgraph-cli/trustgraph/cli/bootstrap_iam.py deleted file mode 100644 index 99a789e2..00000000 --- a/trustgraph-cli/trustgraph/cli/bootstrap_iam.py +++ /dev/null @@ -1,94 +0,0 @@ -""" -Bootstraps the IAM service. Only works when iam-svc is running in -bootstrap mode with empty tables. Prints the initial admin API key -to stdout. - -This is a one-time, trust-sensitive operation. The resulting token -is shown once and never again — capture it on use. Rotate and -revoke it as soon as a real admin API key has been issued. -""" - -import argparse -import json -import os -import sys - -import requests - -default_url = os.getenv("TRUSTGRAPH_URL", "http://localhost:8088/") - - -def bootstrap(url): - - # Unauthenticated public endpoint — IAM refuses the bootstrap - # operation unless the service is running in bootstrap mode with - # empty tables, so the safety gate lives on the server side. - endpoint = url.rstrip("/") + "/api/v1/auth/bootstrap" - - headers = {"Content-Type": "application/json"} - - resp = requests.post( - endpoint, - headers=headers, - data=json.dumps({}), - ) - - if resp.status_code != 200: - raise RuntimeError( - f"HTTP {resp.status_code}: {resp.text}" - ) - - body = resp.json() - - if "error" in body: - raise RuntimeError( - f"IAM {body['error'].get('type', 'error')}: " - f"{body['error'].get('message', '')}" - ) - - api_key = body.get("bootstrap_admin_api_key") - user_id = body.get("bootstrap_admin_user_id") - - if not api_key: - raise RuntimeError( - "IAM response did not contain a bootstrap token — the " - "service may already be bootstrapped, or may be running " - "in token mode." - ) - - return user_id, api_key - - -def main(): - - parser = argparse.ArgumentParser( - prog="tg-bootstrap-iam", - description=__doc__, - ) - - parser.add_argument( - "-u", "--api-url", - default=default_url, - help=f"API URL (default: {default_url})", - ) - - args = parser.parse_args() - - try: - user_id, api_key = bootstrap(args.api_url) - except Exception as e: - print("Exception:", e, file=sys.stderr, flush=True) - sys.exit(1) - - # Stdout gets machine-readable output (the key). Any operator - # context goes to stderr. - print(f"Admin user id: {user_id}", file=sys.stderr) - print( - "Admin API key (shown once, capture now):", - file=sys.stderr, - ) - print(api_key) - - -if __name__ == "__main__": - main() diff --git a/trustgraph-flow/trustgraph/gateway/auth.py b/trustgraph-flow/trustgraph/gateway/auth.py index 95743261..a693ca32 100644 --- a/trustgraph-flow/trustgraph/gateway/auth.py +++ b/trustgraph-flow/trustgraph/gateway/auth.py @@ -1,264 +1,22 @@ -""" -IAM-backed authentication for the API gateway. -Replaces the legacy GATEWAY_SECRET shared-token Authenticator. The -gateway is now stateless with respect to credentials: it either -verifies a JWT locally using the active IAM signing public key, or -resolves an API key by hash with a short local cache backed by the -IAM service. +class Authenticator: -Identity returned by authenticate() is the (user_id, workspace, -roles) triple the rest of the gateway — capability checks, workspace -resolver, audit logging — needs. -""" + def __init__(self, token=None, allow_all=False): -import asyncio -import base64 -import hashlib -import json -import logging -import time -import uuid -from dataclasses import dataclass + if not allow_all and token is None: + raise RuntimeError("Need a token") -from aiohttp import web + if not allow_all and token == "": + raise RuntimeError("Need a token") -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import ed25519 + self.token = token + self.allow_all = allow_all -from ..base.iam_client import IamClient -from ..base.metrics import ProducerMetrics, SubscriberMetrics -from ..schema import ( - IamRequest, IamResponse, - iam_request_queue, iam_response_queue, -) + def permitted(self, token, roles): -logger = logging.getLogger("auth") + if self.allow_all: return True -API_KEY_CACHE_TTL = 60 # seconds + if self.token != token: return False + return True -@dataclass -class Identity: - user_id: str - workspace: str - roles: list - source: str # "api-key" | "jwt" - - -def _auth_failure(): - return web.HTTPUnauthorized( - text='{"error":"auth failure"}', - content_type="application/json", - ) - - -def _access_denied(): - return web.HTTPForbidden( - text='{"error":"access denied"}', - content_type="application/json", - ) - - -def _b64url_decode(s): - pad = "=" * (-len(s) % 4) - return base64.urlsafe_b64decode(s + pad) - - -def _verify_jwt_eddsa(token, public_pem): - """Verify an Ed25519 JWT and return its claims. Raises on any - validation failure. Refuses non-EdDSA algorithms.""" - parts = token.split(".") - if len(parts) != 3: - raise ValueError("malformed JWT") - h_b64, p_b64, s_b64 = parts - signing_input = f"{h_b64}.{p_b64}".encode("ascii") - header = json.loads(_b64url_decode(h_b64)) - if header.get("alg") != "EdDSA": - raise ValueError(f"unsupported alg: {header.get('alg')!r}") - - key = serialization.load_pem_public_key(public_pem.encode("ascii")) - if not isinstance(key, ed25519.Ed25519PublicKey): - raise ValueError("public key is not Ed25519") - - signature = _b64url_decode(s_b64) - key.verify(signature, signing_input) # raises InvalidSignature - - claims = json.loads(_b64url_decode(p_b64)) - exp = claims.get("exp") - if exp is None or exp < time.time(): - raise ValueError("expired") - return claims - - -class IamAuth: - """Resolves bearer credentials via the IAM service. - - Used by every gateway endpoint that needs authentication. Fetches - the IAM signing public key at startup (cached in memory). API - keys are resolved via the IAM service with a local hash→identity - cache (short TTL so revoked keys stop working within the TTL - window without any push mechanism).""" - - def __init__(self, backend, id="api-gateway"): - self.backend = backend - self.id = id - - # Populated at start() via IAM. - self._signing_public_pem = None - - # API-key cache: plaintext_sha256_hex -> (Identity, expires_ts) - self._key_cache = {} - self._key_cache_lock = asyncio.Lock() - - # ------------------------------------------------------------------ - # Short-lived client helper. Mirrors the pattern used by the - # bootstrap framework and AsyncProcessor: a fresh uuid suffix per - # invocation so Pulsar exclusive subscriptions don't collide with - # ghosts from prior calls. - # ------------------------------------------------------------------ - - def _make_client(self): - rr_id = str(uuid.uuid4()) - return IamClient( - backend=self.backend, - subscription=f"{self.id}--iam--{rr_id}", - consumer_name=self.id, - request_topic=iam_request_queue, - request_schema=IamRequest, - request_metrics=ProducerMetrics( - processor=self.id, flow=None, name="iam-request", - ), - response_topic=iam_response_queue, - response_schema=IamResponse, - response_metrics=SubscriberMetrics( - processor=self.id, flow=None, name="iam-response", - ), - ) - - async def _with_client(self, op): - """Open a short-lived IamClient, run ``op(client)``, close.""" - client = self._make_client() - await client.start() - try: - return await op(client) - finally: - try: - await client.stop() - except Exception: - pass - - # ------------------------------------------------------------------ - # Lifecycle - # ------------------------------------------------------------------ - - async def start(self, max_retries=30, retry_delay=2.0): - """Fetch the signing public key from IAM. Retries on - failure — the gateway may be starting before IAM is ready.""" - - async def _fetch(client): - return await client.get_signing_key_public() - - for attempt in range(max_retries): - try: - pem = await self._with_client(_fetch) - if pem: - self._signing_public_pem = pem - logger.info( - "IamAuth: fetched IAM signing public key " - f"({len(pem)} bytes)" - ) - return - except Exception as e: - logger.info( - f"IamAuth: waiting for IAM signing key " - f"({type(e).__name__}: {e}); " - f"retry {attempt + 1}/{max_retries}" - ) - await asyncio.sleep(retry_delay) - - # Don't prevent startup forever. A later authenticate() call - # will try again via the JWT path. - logger.warning( - "IamAuth: could not fetch IAM signing key at startup; " - "JWT validation will fail until it's available" - ) - - # ------------------------------------------------------------------ - # Authentication - # ------------------------------------------------------------------ - - async def authenticate(self, request): - """Extract and validate the Bearer credential from an HTTP - request. Returns an ``Identity``. Raises HTTPUnauthorized - (401 / "auth failure") on any failure mode — the caller - cannot distinguish missing / malformed / invalid / expired / - revoked credentials.""" - - header = request.headers.get("Authorization", "") - if not header.startswith("Bearer "): - raise _auth_failure() - token = header[len("Bearer "):].strip() - if not token: - raise _auth_failure() - - # API keys always start with "tg_". JWTs have two dots and - # no "tg_" prefix. Discriminate cheaply. - if token.startswith("tg_"): - return await self._resolve_api_key(token) - if token.count(".") == 2: - return self._verify_jwt(token) - raise _auth_failure() - - def _verify_jwt(self, token): - if not self._signing_public_pem: - raise _auth_failure() - try: - claims = _verify_jwt_eddsa(token, self._signing_public_pem) - except Exception as e: - logger.debug(f"JWT validation failed: {type(e).__name__}: {e}") - raise _auth_failure() - - sub = claims.get("sub", "") - ws = claims.get("workspace", "") - roles = list(claims.get("roles", [])) - if not sub or not ws: - raise _auth_failure() - - return Identity( - user_id=sub, workspace=ws, roles=roles, source="jwt", - ) - - async def _resolve_api_key(self, plaintext): - h = hashlib.sha256(plaintext.encode("utf-8")).hexdigest() - - cached = self._key_cache.get(h) - now = time.time() - if cached and cached[1] > now: - return cached[0] - - async with self._key_cache_lock: - cached = self._key_cache.get(h) - if cached and cached[1] > now: - return cached[0] - - try: - async def _call(client): - return await client.resolve_api_key(plaintext) - user_id, workspace, roles = await self._with_client(_call) - except Exception as e: - logger.debug( - f"API key resolution failed: " - f"{type(e).__name__}: {e}" - ) - raise _auth_failure() - - if not user_id or not workspace: - raise _auth_failure() - - identity = Identity( - user_id=user_id, workspace=workspace, - roles=list(roles), source="api-key", - ) - self._key_cache[h] = (identity, now + API_KEY_CACHE_TTL) - return identity diff --git a/trustgraph-flow/trustgraph/gateway/capabilities.py b/trustgraph-flow/trustgraph/gateway/capabilities.py deleted file mode 100644 index 5413a4b1..00000000 --- a/trustgraph-flow/trustgraph/gateway/capabilities.py +++ /dev/null @@ -1,163 +0,0 @@ -""" -Capability vocabulary and OSS role bundles. - -See docs/tech-specs/capabilities.md for the authoritative description. -The mapping below is the data form of the OSS bundle table in that -spec. Enterprise editions may replace this module with their own -role table; the vocabulary (capability strings) is shared. - -The module also exposes: - -- ``PUBLIC`` — a sentinel indicating an endpoint requires no - authentication (login, bootstrap). -- ``AUTHENTICATED`` — a sentinel indicating an endpoint requires a - valid identity but no specific capability (e.g. change-password). -- ``check(roles, capability)`` — the union-of-bundles membership test. -""" - -from aiohttp import web - - -PUBLIC = "__public__" -AUTHENTICATED = "__authenticated__" - - -# Capability vocabulary. Mirrors the "Capability list" tables in -# capabilities.md. Kept as a set of valid strings so the gateway can -# fail-closed on an endpoint that declares an unknown capability. -KNOWN_CAPABILITIES = { - # Data plane - "agent", - "graph:read", "graph:write", - "documents:read", "documents:write", - "rows:read", "rows:write", - "llm", - "embeddings", - "mcp", - # Control plane - "config:read", "config:write", - "flows:read", "flows:write", - "users:read", "users:write", "users:admin", - "keys:self", "keys:admin", - "workspaces:admin", - "iam:admin", - "metrics:read", - "collections:read", "collections:write", - "knowledge:read", "knowledge:write", -} - - -# OSS role → capability set. Enterprise overrides this mapping. -_READER_CAPS = { - "agent", - "graph:read", - "documents:read", - "rows:read", - "llm", - "embeddings", - "mcp", - "config:read", - "flows:read", - "collections:read", - "knowledge:read", - "keys:self", -} - -_WRITER_CAPS = _READER_CAPS | { - "graph:write", - "documents:write", - "rows:write", - "collections:write", - "knowledge:write", -} - -_ADMIN_CAPS = _WRITER_CAPS | { - "config:write", - "flows:write", - "users:read", "users:write", "users:admin", - "keys:admin", - "workspaces:admin", - "iam:admin", - "metrics:read", -} - -ROLE_CAPABILITIES = { - "reader": _READER_CAPS, - "writer": _WRITER_CAPS, - "admin": _ADMIN_CAPS, -} - - -def check(roles, capability): - """Return True if any of ``roles`` grants ``capability``. - - Unknown roles contribute zero capabilities (deterministic fail- - closed behaviour per the spec).""" - if capability not in KNOWN_CAPABILITIES: - # Endpoint misconfiguration. Fail closed. - return False - for r in roles: - if capability in ROLE_CAPABILITIES.get(r, ()): - return True - return False - - -def access_denied(): - return web.HTTPForbidden( - text='{"error":"access denied"}', - content_type="application/json", - ) - - -def auth_failure(): - return web.HTTPUnauthorized( - text='{"error":"auth failure"}', - content_type="application/json", - ) - - -async def enforce(request, auth, capability): - """Authenticate + capability-check in one step. Returns an - ``Identity`` (or ``None`` for ``PUBLIC`` endpoints) or raises - the appropriate HTTPException. - - Usage in an endpoint handler: - - identity = await enforce(request, self.auth, self.capability) - - - ``PUBLIC``: no authentication attempted, returns ``None``. - - ``AUTHENTICATED``: any valid identity is accepted. - - any capability string: identity must carry a role granting it. - """ - if capability == PUBLIC: - return None - - identity = await auth.authenticate(request) - - if capability == AUTHENTICATED: - return identity - - if not check(identity.roles, capability): - raise access_denied() - - return identity - - -def enforce_workspace(data, identity): - """Validate + inject the workspace field on a request body. - - OSS behaviour: - - If ``data["workspace"]`` is present and differs from the - caller's assigned workspace → 403. - - Otherwise, set ``data["workspace"]`` to the caller's assigned - workspace. - - Enterprise editions will plug in a different resolver that - checks a permitted-set instead of a single value; the wire - protocol is unchanged.""" - requested = data.get("workspace", "") if isinstance(data, dict) else "" - if requested and requested != identity.workspace: - raise access_denied() - if isinstance(data, dict): - data["workspace"] = identity.workspace - return data diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py index de64f25f..95a0ab66 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py @@ -108,18 +108,12 @@ class DispatcherWrapper: class DispatcherManager: def __init__(self, backend, config_receiver, prefix="api-gateway", - queue_overrides=None, auth=None): + queue_overrides=None): self.backend = backend self.config_receiver = config_receiver self.config_receiver.add_handler(self) self.prefix = prefix - # Gateway IamAuth — used by the socket mux for first-frame - # auth. ``None`` keeps the legacy "caller-supplied - # workspace" behaviour for anything that instantiates Mux - # directly without auth. - self.auth = auth - # Store queue overrides for global services # Format: {"config": {"request": "...", "response": "..."}, ...} self.queue_overrides = queue_overrides or {} @@ -171,15 +165,6 @@ class DispatcherManager: def dispatch_global_service(self): return DispatcherWrapper(self.process_global_service) - def dispatch_auth_iam(self): - """Pre-configured IAM dispatcher for the gateway's auth - endpoints (login, bootstrap, change-password). Pins the - kind to ``iam`` so these handlers don't have to supply URL - params the global dispatcher would expect.""" - async def _process(data, responder): - return await self.invoke_global_service(data, responder, "iam") - return DispatcherWrapper(_process) - def dispatch_core_export(self): return DispatcherWrapper(self.process_core_export) @@ -331,10 +316,7 @@ class DispatcherManager: async def process_socket(self, ws, running, params): - # The mux self-authenticates via the first-frame protocol; - # pass the gateway's IamAuth so it can validate tokens - # without reaching back into the endpoint layer. - dispatcher = Mux(self, ws, running, auth=self.auth) + dispatcher = Mux(self, ws, running) return dispatcher diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py index b9bb45bc..3d610dca 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py @@ -16,26 +16,11 @@ MAX_QUEUE_SIZE = 10 class Mux: - def __init__(self, dispatcher_manager, ws, running, auth=None): - """ - ``auth`` — an ``IamAuth`` when the enclosing endpoint is - configured for in-band first-frame auth. ``None`` for the - legacy ``?token=`` path (kept for the flow import/export - streaming endpoints). - """ + def __init__(self, dispatcher_manager, ws, running): self.dispatcher_manager = dispatcher_manager self.ws = ws self.running = running - self.auth = auth - - # Authenticated identity, populated by the first-frame auth - # protocol. ``None`` means the socket is not yet - # authenticated; any non-auth frame is refused. If - # ``auth`` is ``None`` (legacy path) the mux acts as if - # already authenticated and uses client-supplied workspace - # values (pre-existing behaviour). - self.identity = None self.q = asyncio.Queue(maxsize=MAX_QUEUE_SIZE) @@ -46,41 +31,6 @@ class Mux: if self.ws: await self.ws.close() - async def _handle_auth_frame(self, data): - """Process a ``{"type": "auth", "token": "..."}`` frame. - On success, updates ``self.identity`` and returns an - ``auth-ok`` response frame. On failure, returns the masked - auth-failure frame. Never raises — auth failures keep the - socket open so the client can retry without reconnecting - (important for browsers, which treat a handshake-time 401 - as terminal).""" - token = data.get("token", "") - if not token or self.auth is None: - await self.ws.send_json({ - "type": "auth-failed", - "error": "auth failure", - }) - return - - class _Shim: - def __init__(self, tok): - self.headers = {"Authorization": f"Bearer {tok}"} - - try: - identity = await self.auth.authenticate(_Shim(token)) - except Exception: - await self.ws.send_json({ - "type": "auth-failed", - "error": "auth failure", - }) - return - - self.identity = identity - await self.ws.send_json({ - "type": "auth-ok", - "workspace": identity.workspace, - }) - async def receive(self, msg): request_id = None @@ -88,18 +38,6 @@ class Mux: try: data = msg.json() - - # In-band auth protocol: the client sends - # ``{"type": "auth", "token": "..."}`` as its first frame - # (and any time it wants to re-auth: JWT refresh, token - # rotation, workspace switch in a future multi-workspace - # enterprise). The protocol coexists with legacy - # non-auth sockets (self.auth is None) — on those, every - # frame is a request and workspace is caller-supplied. - if isinstance(data, dict) and data.get("type") == "auth": - await self._handle_auth_frame(data) - return - request_id = data.get("id") if "request" not in data: @@ -108,42 +46,9 @@ class Mux: if "id" not in data: raise RuntimeError("Bad message") - # First-frame auth gating: if the enclosing endpoint is - # configured for in-band auth, reject all non-auth frames - # until an auth-ok has been issued. - if self.auth is not None and self.identity is None: - await self.ws.send_json({ - "id": request_id, - "error": { - "message": "auth failure", - "type": "auth-required", - }, - "complete": True, - }) - return - - # Workspace resolution. Authenticated sockets override - # the client-supplied workspace with the resolved value - # from the identity; mismatch is an access-denied error. - if self.identity is not None: - requested_ws = data.get("workspace", "") - if requested_ws and requested_ws != self.identity.workspace: - await self.ws.send_json({ - "id": request_id, - "error": { - "message": "access denied", - "type": "access-denied", - }, - "complete": True, - }) - return - workspace = self.identity.workspace - else: - workspace = data.get("workspace", "default") - await self.q.put(( data["id"], - workspace, + data.get("workspace", "default"), data.get("flow"), data["service"], data["request"] diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/auth_endpoints.py b/trustgraph-flow/trustgraph/gateway/endpoint/auth_endpoints.py deleted file mode 100644 index 6037fc4b..00000000 --- a/trustgraph-flow/trustgraph/gateway/endpoint/auth_endpoints.py +++ /dev/null @@ -1,115 +0,0 @@ -""" -Gateway auth endpoints. - -Three dedicated paths: - POST /api/v1/auth/login — unauthenticated; username/password → JWT - POST /api/v1/auth/bootstrap — unauthenticated; IAM bootstrap op - POST /api/v1/auth/change-password — authenticated; any role - -These are the only IAM-surface operations that can be reached from -outside. Everything else routes through ``/api/v1/iam`` gated by -``users:admin``. -""" - -import logging - -from aiohttp import web - -from .. capabilities import enforce, PUBLIC, AUTHENTICATED - -logger = logging.getLogger("auth-endpoints") -logger.setLevel(logging.INFO) - - -class AuthEndpoints: - """Groups the three auth-surface handlers. Each forwards to the - IAM service via the existing ``IamRequestor`` dispatcher.""" - - def __init__(self, iam_dispatcher, auth): - self.iam = iam_dispatcher - self.auth = auth - - async def start(self): - pass - - def add_routes(self, app): - app.add_routes([ - web.post("/api/v1/auth/login", self.login), - web.post("/api/v1/auth/bootstrap", self.bootstrap), - web.post( - "/api/v1/auth/change-password", - self.change_password, - ), - ]) - - async def _forward(self, body): - async def responder(x, fin): - pass - return await self.iam.process(body, responder) - - async def login(self, request): - """Public. Accepts {username, password, workspace?}. Returns - {jwt, jwt_expires} on success; IAM's masked auth failure on - anything else.""" - await enforce(request, self.auth, PUBLIC) - try: - body = await request.json() - except Exception: - return web.json_response( - {"error": "invalid json"}, status=400, - ) - req = { - "operation": "login", - "username": body.get("username", ""), - "password": body.get("password", ""), - "workspace": body.get("workspace", ""), - } - resp = await self._forward(req) - if "error" in resp: - return web.json_response( - {"error": "auth failure"}, status=401, - ) - return web.json_response(resp) - - async def bootstrap(self, request): - """Public. Valid only when IAM is running in bootstrap mode - with empty tables. In every other case the IAM service - returns a masked auth-failure.""" - await enforce(request, self.auth, PUBLIC) - resp = await self._forward({"operation": "bootstrap"}) - if "error" in resp: - return web.json_response( - {"error": "auth failure"}, status=401, - ) - return web.json_response(resp) - - async def change_password(self, request): - """Authenticated (any role). Accepts {current_password, - new_password}; user_id is taken from the authenticated - identity — the caller cannot change someone else's password - this way (reset-password is the admin path).""" - identity = await enforce(request, self.auth, AUTHENTICATED) - try: - body = await request.json() - except Exception: - return web.json_response( - {"error": "invalid json"}, status=400, - ) - req = { - "operation": "change-password", - "user_id": identity.user_id, - "password": body.get("current_password", ""), - "new_password": body.get("new_password", ""), - } - resp = await self._forward(req) - if "error" in resp: - err_type = resp.get("error", {}).get("type", "") - if err_type == "auth-failed": - return web.json_response( - {"error": "auth failure"}, status=401, - ) - return web.json_response( - {"error": resp.get("error", {}).get("message", "error")}, - status=400, - ) - return web.json_response(resp) diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/constant_endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint/constant_endpoint.py index ee9c0447..58ba1738 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/constant_endpoint.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/constant_endpoint.py @@ -1,27 +1,28 @@ -import logging - +import asyncio from aiohttp import web - -from .. capabilities import enforce, enforce_workspace +import uuid +import logging logger = logging.getLogger("endpoint") logger.setLevel(logging.INFO) - class ConstantEndpoint: - def __init__(self, endpoint_path, auth, dispatcher, capability): + def __init__(self, endpoint_path, auth, dispatcher): self.path = endpoint_path + self.auth = auth - self.capability = capability + self.operation = "service" + self.dispatcher = dispatcher async def start(self): pass def add_routes(self, app): + app.add_routes([ web.post(self.path, self.handle), ]) @@ -30,13 +31,21 @@ class ConstantEndpoint: logger.debug(f"Processing request: {request.path}") - identity = await enforce(request, self.auth, self.capability) + try: + ht = request.headers["Authorization"] + tokens = ht.split(" ", 2) + if tokens[0] != "Bearer": + return web.HTTPUnauthorized() + token = tokens[1] + except: + token = "" + + if not self.auth.permitted(token, self.operation): + return web.HTTPUnauthorized() try: - data = await request.json() - if identity is not None: - enforce_workspace(data, identity) + data = await request.json() async def responder(x, fin): pass @@ -45,8 +54,10 @@ class ConstantEndpoint: return web.json_response(resp) - except web.HTTPException: - raise except Exception as e: - logger.error(f"Exception: {e}", exc_info=True) - return web.json_response({"error": str(e)}) + logging.error(f"Exception: {e}") + + return web.json_response( + { "error": str(e) } + ) + diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/i18n.py b/trustgraph-flow/trustgraph/gateway/endpoint/i18n.py index f28f293d..b949a499 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/i18n.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/i18n.py @@ -4,18 +4,16 @@ from aiohttp import web from trustgraph.i18n import get_language_pack -from .. capabilities import enforce - logger = logging.getLogger("endpoint") logger.setLevel(logging.INFO) class I18nPackEndpoint: - def __init__(self, endpoint_path: str, auth, capability): + def __init__(self, endpoint_path: str, auth): self.path = endpoint_path self.auth = auth - self.capability = capability + self.operation = "service" async def start(self): pass @@ -28,13 +26,26 @@ class I18nPackEndpoint: async def handle(self, request): logger.debug(f"Processing i18n pack request: {request.path}") - await enforce(request, self.auth, self.capability) + token = "" + try: + ht = request.headers["Authorization"] + tokens = ht.split(" ", 2) + if tokens[0] != "Bearer": + return web.HTTPUnauthorized() + token = tokens[1] + except Exception: + token = "" + + if not self.auth.permitted(token, self.operation): + return web.HTTPUnauthorized() lang = request.match_info.get("lang") or "en" - # Path-traversal defense — critical, do not remove. + # This is a path traversal defense, and is a critical sec defense. + # Do not remove! if "/" in lang or ".." in lang: return web.HTTPBadRequest(reason="Invalid language code") pack = get_language_pack(lang) + return web.json_response(pack) diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/manager.py b/trustgraph-flow/trustgraph/gateway/endpoint/manager.py index 5bdaf367..fb8b0b76 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/manager.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/manager.py @@ -8,284 +8,72 @@ from . variable_endpoint import VariableEndpoint from . socket import SocketEndpoint from . metrics import MetricsEndpoint from . i18n import I18nPackEndpoint -from . auth_endpoints import AuthEndpoints - -from .. capabilities import PUBLIC, AUTHENTICATED from .. dispatch.manager import DispatcherManager - -# Capability required for each kind on the /api/v1/{kind} generic -# endpoint (global services). Coarse gating — the IAM bundle split -# of "read vs write" per admin subsystem is not applied here because -# this endpoint forwards an opaque operation in the body. Writes -# are the upper bound on what the endpoint can do, so we gate on -# the write/admin capability. -GLOBAL_KIND_CAPABILITY = { - "config": "config:write", - "flow": "flows:write", - "librarian": "documents:write", - "knowledge": "knowledge:write", - "collection-management": "collections:write", - # IAM endpoints land on /api/v1/iam and require the admin bundle. - # Login / bootstrap / change-password are served by - # AuthEndpoints, which handle their own gating (PUBLIC / - # AUTHENTICATED). - "iam": "users:admin", -} - - -# Capability required for each kind on the -# /api/v1/flow/{flow}/service/{kind} endpoint (per-flow data-plane). -FLOW_KIND_CAPABILITY = { - "agent": "agent", - "text-completion": "llm", - "prompt": "llm", - "mcp-tool": "mcp", - "graph-rag": "graph:read", - "document-rag": "documents:read", - "embeddings": "embeddings", - "graph-embeddings": "graph:read", - "document-embeddings": "documents:read", - "triples": "graph:read", - "rows": "rows:read", - "nlp-query": "rows:read", - "structured-query": "rows:read", - "structured-diag": "rows:read", - "row-embeddings": "rows:read", - "sparql": "graph:read", -} - - -# Capability for the streaming flow import/export endpoints, -# keyed by the "kind" URL segment. -FLOW_IMPORT_CAPABILITY = { - "triples": "graph:write", - "graph-embeddings": "graph:write", - "document-embeddings": "documents:write", - "entity-contexts": "documents:write", - "rows": "rows:write", -} - -FLOW_EXPORT_CAPABILITY = { - "triples": "graph:read", - "graph-embeddings": "graph:read", - "document-embeddings": "documents:read", - "entity-contexts": "documents:read", -} - - -from .. capabilities import enforce, enforce_workspace -import logging as _mgr_logging -_mgr_logger = _mgr_logging.getLogger("endpoint") - - -class _RoutedVariableEndpoint: - """HTTP endpoint whose required capability is looked up per - request from the URL's ``kind`` parameter. Used for the two - generic dispatch paths (``/api/v1/{kind}`` and - ``/api/v1/flow/{flow}/service/{kind}``). Self-contained rather - than subclassing ``VariableEndpoint`` to avoid mutating shared - state across concurrent requests.""" - - def __init__(self, endpoint_path, auth, dispatcher, capability_map): - self.path = endpoint_path - self.auth = auth - self.dispatcher = dispatcher - self._capability_map = capability_map - - async def start(self): - pass - - def add_routes(self, app): - app.add_routes([web.post(self.path, self.handle)]) - - async def handle(self, request): - kind = request.match_info.get("kind", "") - cap = self._capability_map.get(kind) - if cap is None: - return web.json_response( - {"error": "unknown kind"}, status=404, - ) - - identity = await enforce(request, self.auth, cap) - - try: - data = await request.json() - if identity is not None: - enforce_workspace(data, identity) - - async def responder(x, fin): - pass - - resp = await self.dispatcher.process( - data, responder, request.match_info, - ) - return web.json_response(resp) - - except web.HTTPException: - raise - except Exception as e: - _mgr_logger.error(f"Exception: {e}", exc_info=True) - return web.json_response({"error": str(e)}) - - -class _RoutedSocketEndpoint: - """WebSocket endpoint whose required capability is looked up per - request from the URL's ``kind`` parameter. Used for the flow - import/export streaming endpoints.""" - - def __init__(self, endpoint_path, auth, dispatcher, capability_map): - self.path = endpoint_path - self.auth = auth - self.dispatcher = dispatcher - self._capability_map = capability_map - - async def start(self): - pass - - def add_routes(self, app): - app.add_routes([web.get(self.path, self.handle)]) - - async def handle(self, request): - from .. capabilities import check, auth_failure, access_denied - - kind = request.match_info.get("kind", "") - cap = self._capability_map.get(kind) - if cap is None: - return web.json_response( - {"error": "unknown kind"}, status=404, - ) - - token = request.query.get("token", "") - if not token: - return auth_failure() - - from . socket import _QueryTokenRequest - try: - identity = await self.auth.authenticate( - _QueryTokenRequest(token) - ) - except web.HTTPException as e: - return e - if not check(identity.roles, cap): - return access_denied() - - # Delegate the websocket handling to a standalone SocketEndpoint - # with the resolved capability, bypassing the per-request mutation - # concern by instantiating fresh state. - ws_ep = SocketEndpoint( - endpoint_path=self.path, - auth=self.auth, - dispatcher=self.dispatcher, - capability=cap, - ) - return await ws_ep.handle(request) - - class EndpointManager: def __init__( - self, dispatcher_manager, auth, prometheus_url, timeout=600, + self, dispatcher_manager, auth, prometheus_url, timeout=600 ): self.dispatcher_manager = dispatcher_manager self.timeout = timeout - # IAM forwarder (needed by AuthEndpoints). The same dispatcher - # the global /api/v1/iam path uses. No workspace enforcement on - # auth endpoints since login / bootstrap / change-password are - # pre-identity. - self._iam_dispatcher = dispatcher_manager.dispatch_global_service() + self.services = { + } self.endpoints = [ - - # Auth surface — public / authenticated-any. Must come - # before the generic /api/v1/{kind} routes to win the - # match for /api/v1/auth/* paths. aiohttp routes in - # registration order, so we prepend here. - AuthEndpoints( - iam_dispatcher=dispatcher_manager.dispatch_auth_iam(), - auth=auth, - ), - I18nPackEndpoint( - endpoint_path="/api/v1/i18n/packs/{lang}", - auth=auth, - capability=PUBLIC, + endpoint_path = "/api/v1/i18n/packs/{lang}", + auth = auth, ), MetricsEndpoint( - endpoint_path="/api/metrics", - prometheus_url=prometheus_url, - auth=auth, - capability="metrics:read", + endpoint_path = "/api/metrics", + prometheus_url = prometheus_url, + auth = auth, ), - - # Global services: capability chosen per-kind. - _RoutedVariableEndpoint( - endpoint_path="/api/v1/{kind}", - auth=auth, - dispatcher=dispatcher_manager.dispatch_global_service(), - capability_map=GLOBAL_KIND_CAPABILITY, + VariableEndpoint( + endpoint_path = "/api/v1/{kind}", auth = auth, + dispatcher = dispatcher_manager.dispatch_global_service(), ), - - # /api/v1/socket: WebSocket handshake accepts - # unconditionally; the Mux dispatcher runs the - # first-frame auth protocol. Handshake-time 401s break - # browser reconnection, so authentication is always - # in-band for this endpoint. SocketEndpoint( - endpoint_path="/api/v1/socket", - auth=auth, - dispatcher=dispatcher_manager.dispatch_socket(), - capability=AUTHENTICATED, # informational only; bypassed - in_band_auth=True, + endpoint_path = "/api/v1/socket", + auth = auth, + dispatcher = dispatcher_manager.dispatch_socket() ), - - # Per-flow request/response services — capability per kind. - _RoutedVariableEndpoint( - endpoint_path="/api/v1/flow/{flow}/service/{kind}", - auth=auth, - dispatcher=dispatcher_manager.dispatch_flow_service(), - capability_map=FLOW_KIND_CAPABILITY, + VariableEndpoint( + endpoint_path = "/api/v1/flow/{flow}/service/{kind}", + auth = auth, + dispatcher = dispatcher_manager.dispatch_flow_service(), ), - - # Per-flow streaming import/export — capability per kind. - _RoutedSocketEndpoint( - endpoint_path="/api/v1/flow/{flow}/import/{kind}", - auth=auth, - dispatcher=dispatcher_manager.dispatch_flow_import(), - capability_map=FLOW_IMPORT_CAPABILITY, + SocketEndpoint( + endpoint_path = "/api/v1/flow/{flow}/import/{kind}", + auth = auth, + dispatcher = dispatcher_manager.dispatch_flow_import() ), - _RoutedSocketEndpoint( - endpoint_path="/api/v1/flow/{flow}/export/{kind}", - auth=auth, - dispatcher=dispatcher_manager.dispatch_flow_export(), - capability_map=FLOW_EXPORT_CAPABILITY, - ), - - StreamEndpoint( - endpoint_path="/api/v1/import-core", - auth=auth, - method="POST", - dispatcher=dispatcher_manager.dispatch_core_import(), - # Cross-subject import — require the admin bundle via a - # single representative capability. - capability="users:admin", + SocketEndpoint( + endpoint_path = "/api/v1/flow/{flow}/export/{kind}", + auth = auth, + dispatcher = dispatcher_manager.dispatch_flow_export() ), StreamEndpoint( - endpoint_path="/api/v1/export-core", - auth=auth, - method="GET", - dispatcher=dispatcher_manager.dispatch_core_export(), - capability="users:admin", + endpoint_path = "/api/v1/import-core", + auth = auth, + method = "POST", + dispatcher = dispatcher_manager.dispatch_core_import(), ), StreamEndpoint( - endpoint_path="/api/v1/document-stream", - auth=auth, - method="GET", - dispatcher=dispatcher_manager.dispatch_document_stream(), - capability="documents:read", + endpoint_path = "/api/v1/export-core", + auth = auth, + method = "GET", + dispatcher = dispatcher_manager.dispatch_core_export(), + ), + StreamEndpoint( + endpoint_path = "/api/v1/document-stream", + auth = auth, + method = "GET", + dispatcher = dispatcher_manager.dispatch_document_stream(), ), ] @@ -296,3 +84,4 @@ class EndpointManager: async def start(self): for ep in self.endpoints: await ep.start() + diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/metrics.py b/trustgraph-flow/trustgraph/gateway/endpoint/metrics.py index 6832d1e3..903a199c 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/metrics.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/metrics.py @@ -10,19 +10,17 @@ import asyncio import uuid import logging -from .. capabilities import enforce - logger = logging.getLogger("endpoint") logger.setLevel(logging.INFO) class MetricsEndpoint: - def __init__(self, prometheus_url, endpoint_path, auth, capability): + def __init__(self, prometheus_url, endpoint_path, auth): self.prometheus_url = prometheus_url self.path = endpoint_path self.auth = auth - self.capability = capability + self.operation = "service" async def start(self): pass @@ -37,7 +35,17 @@ class MetricsEndpoint: logger.debug(f"Processing metrics request: {request.path}") - await enforce(request, self.auth, self.capability) + try: + ht = request.headers["Authorization"] + tokens = ht.split(" ", 2) + if tokens[0] != "Bearer": + return web.HTTPUnauthorized() + token = tokens[1] + except: + token = "" + + if not self.auth.permitted(token, self.operation): + return web.HTTPUnauthorized() path = request.match_info["path"] url = ( diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/socket.py b/trustgraph-flow/trustgraph/gateway/endpoint/socket.py index d0e86567..9065761c 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/socket.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/socket.py @@ -4,9 +4,6 @@ from aiohttp import web, WSMsgType import logging from .. running import Running -from .. capabilities import ( - PUBLIC, AUTHENTICATED, check, auth_failure, access_denied, -) logger = logging.getLogger("socket") logger.setLevel(logging.INFO) @@ -14,25 +11,12 @@ logger.setLevel(logging.INFO) class SocketEndpoint: def __init__( - self, endpoint_path, auth, dispatcher, capability, - in_band_auth=False, + self, endpoint_path, auth, dispatcher, ): - """ - ``in_band_auth=True`` skips the handshake-time auth check. - The WebSocket handshake always succeeds; the dispatcher is - expected to gate itself via the first-frame auth protocol - (see ``Mux``). - - This avoids the browser problem where a 401 on the handshake - is treated as permanent and prevents reconnection, and lets - long-lived sockets refresh their credential mid-session by - sending a new auth frame. - """ self.path = endpoint_path self.auth = auth - self.capability = capability - self.in_band_auth = in_band_auth + self.operation = "socket" self.dispatcher = dispatcher @@ -77,29 +61,15 @@ class SocketEndpoint: raise async def handle(self, request): - """Enhanced handler with better cleanup. - - Auth: WebSocket clients pass the bearer token on the - ``?token=...`` query string; we wrap it into a synthetic - Authorization header before delegating to the standard auth - path so the IAM-backed flow (JWT / API key) applies uniformly. - The first-frame auth protocol described in the IAM spec is - a future upgrade.""" - - if not self.in_band_auth and self.capability != PUBLIC: - token = request.query.get("token", "") - if not token: - return auth_failure() - try: - identity = await self.auth.authenticate( - _QueryTokenRequest(token) - ) - except web.HTTPException as e: - return e - if self.capability != AUTHENTICATED: - if not check(identity.roles, self.capability): - return access_denied() + """Enhanced handler with better cleanup""" + try: + token = request.query['token'] + except: + token = "" + if not self.auth.permitted(token, self.operation): + return web.HTTPUnauthorized() + # 50MB max message size ws = web.WebSocketResponse(max_msg_size=52428800) @@ -180,11 +150,3 @@ class SocketEndpoint: web.get(self.path, self.handle), ]) - -class _QueryTokenRequest: - """Minimal shim that exposes headers["Authorization"] to - IamAuth.authenticate(), derived from a query-string token.""" - - def __init__(self, token): - self.headers = {"Authorization": f"Bearer {token}"} - diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/stream_endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint/stream_endpoint.py index 7b0c4692..38d8846f 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/stream_endpoint.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/stream_endpoint.py @@ -1,64 +1,82 @@ -import logging - +import asyncio from aiohttp import web - -from .. capabilities import enforce +import logging logger = logging.getLogger("endpoint") logger.setLevel(logging.INFO) - class StreamEndpoint: - def __init__( - self, endpoint_path, auth, dispatcher, capability, method="POST", - ): + def __init__(self, endpoint_path, auth, dispatcher, method="POST"): + self.path = endpoint_path + self.auth = auth - self.capability = capability + self.operation = "service" self.method = method + self.dispatcher = dispatcher async def start(self): pass def add_routes(self, app): + if self.method == "POST": - app.add_routes([web.post(self.path, self.handle)]) + app.add_routes([ + web.post(self.path, self.handle), + ]) elif self.method == "GET": - app.add_routes([web.get(self.path, self.handle)]) + app.add_routes([ + web.get(self.path, self.handle), + ]) else: - raise RuntimeError("Bad method " + self.method) + raise RuntimeError("Bad method" + self.method) async def handle(self, request): logger.debug(f"Processing request: {request.path}") - await enforce(request, self.auth, self.capability) + try: + ht = request.headers["Authorization"] + tokens = ht.split(" ", 2) + if tokens[0] != "Bearer": + return web.HTTPUnauthorized() + token = tokens[1] + except: + token = "" + + if not self.auth.permitted(token, self.operation): + return web.HTTPUnauthorized() try: + data = request.content async def error(err): - return web.HTTPInternalServerError(text=err) + return web.HTTPInternalServerError(text = err) async def ok( - status=200, reason="OK", - type="application/octet-stream", + status=200, reason="OK", type="application/octet-stream" ): response = web.StreamResponse( - status=status, reason=reason, - headers={"Content-Type": type}, + status = status, reason = reason, + headers = {"Content-Type": type} ) await response.prepare(request) return response - resp = await self.dispatcher.process(data, error, ok, request) + resp = await self.dispatcher.process( + data, error, ok, request + ) + return resp - except web.HTTPException: - raise except Exception as e: - logger.error(f"Exception: {e}", exc_info=True) - return web.json_response({"error": str(e)}) + logging.error(f"Exception: {e}") + + return web.json_response( + { "error": str(e) } + ) + diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py index 5e0d9d21..608de71b 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py @@ -1,27 +1,27 @@ -import logging - +import asyncio from aiohttp import web - -from .. capabilities import enforce, enforce_workspace +import logging logger = logging.getLogger("endpoint") logger.setLevel(logging.INFO) - class VariableEndpoint: - def __init__(self, endpoint_path, auth, dispatcher, capability): + def __init__(self, endpoint_path, auth, dispatcher): self.path = endpoint_path + self.auth = auth - self.capability = capability + self.operation = "service" + self.dispatcher = dispatcher async def start(self): pass def add_routes(self, app): + app.add_routes([ web.post(self.path, self.handle), ]) @@ -30,25 +30,35 @@ class VariableEndpoint: logger.debug(f"Processing request: {request.path}") - identity = await enforce(request, self.auth, self.capability) + try: + ht = request.headers["Authorization"] + tokens = ht.split(" ", 2) + if tokens[0] != "Bearer": + return web.HTTPUnauthorized() + token = tokens[1] + except: + token = "" + + if not self.auth.permitted(token, self.operation): + return web.HTTPUnauthorized() try: - data = await request.json() - if identity is not None: - enforce_workspace(data, identity) + data = await request.json() async def responder(x, fin): pass resp = await self.dispatcher.process( - data, responder, request.match_info, + data, responder, request.match_info ) return web.json_response(resp) - except web.HTTPException: - raise except Exception as e: - logger.error(f"Exception: {e}", exc_info=True) - return web.json_response({"error": str(e)}) + logging.error(f"Exception: {e}") + + return web.json_response( + { "error": str(e) } + ) + diff --git a/trustgraph-flow/trustgraph/gateway/service.py b/trustgraph-flow/trustgraph/gateway/service.py index f75f3b25..4e465bf7 100755 --- a/trustgraph-flow/trustgraph/gateway/service.py +++ b/trustgraph-flow/trustgraph/gateway/service.py @@ -12,7 +12,7 @@ import os from trustgraph.base.logging import setup_logging, add_logging_args from trustgraph.base.pubsub import get_pubsub, add_pubsub_args -from . auth import IamAuth +from . auth import Authenticator from . config.receiver import ConfigReceiver from . dispatch.manager import DispatcherManager @@ -35,6 +35,7 @@ default_prometheus_url = os.getenv("PROMETHEUS_URL", "http://prometheus:9090") default_pulsar_api_key = os.getenv("PULSAR_API_KEY", None) default_timeout = 600 default_port = 8088 +default_api_token = os.getenv("GATEWAY_SECRET", "") class Api: @@ -59,14 +60,13 @@ class Api: if not self.prometheus_url.endswith("/"): self.prometheus_url += "/" - # IAM-backed authentication. The legacy GATEWAY_SECRET - # shared-token path has been removed — there is no - # "open for everyone" fallback. The gateway cannot - # authenticate any request until IAM is reachable. - self.auth = IamAuth( - backend=self.pubsub_backend, - id=config.get("id", "api-gateway"), - ) + api_token = config.get("api_token", default_api_token) + + # Token not set, or token equal empty string means no auth + if api_token: + self.auth = Authenticator(token=api_token) + else: + self.auth = Authenticator(allow_all=True) self.config_receiver = ConfigReceiver(self.pubsub_backend) @@ -118,7 +118,6 @@ class Api: config_receiver = self.config_receiver, prefix = "gateway", queue_overrides = queue_overrides, - auth = self.auth, ) self.endpoint_manager = EndpointManager( @@ -133,18 +132,12 @@ class Api: ] async def app_factory(self): - + self.app = web.Application( middlewares=[], client_max_size=256 * 1024 * 1024 ) - # Fetch IAM signing public key before accepting traffic. - # Blocks for a bounded retry window; the gateway starts even - # if IAM is still unreachable (JWT validation will 401 until - # the key is available). - await self.auth.start() - await self.config_receiver.start() for ep in self.endpoints: @@ -196,6 +189,12 @@ def run(): help=f'API request timeout in seconds (default: {default_timeout})', ) + parser.add_argument( + '--api-token', + default=default_api_token, + help=f'Secret API token (default: no auth)', + ) + add_logging_args(parser) parser.add_argument( diff --git a/trustgraph-flow/trustgraph/iam/service/iam.py b/trustgraph-flow/trustgraph/iam/service/iam.py index 7c7aaffd..2fde4a28 100644 --- a/trustgraph-flow/trustgraph/iam/service/iam.py +++ b/trustgraph-flow/trustgraph/iam/service/iam.py @@ -172,29 +172,13 @@ def _sign_jwt(kid, private_pem, claims): class IamService: - def __init__(self, host, username, password, keyspace, - bootstrap_mode, bootstrap_token=None): + def __init__(self, host, username, password, keyspace): self.table_store = IamTableStore( host, username, password, keyspace, ) - # bootstrap_mode: "token" or "bootstrap". In "token" mode the - # service auto-seeds on first start using the provided - # bootstrap_token and the ``bootstrap`` operation is refused - # thereafter (indistinguishable from an already-bootstrapped - # deployment per the error policy). In "bootstrap" mode the - # ``bootstrap`` operation is live until tables are populated. - if bootstrap_mode not in ("token", "bootstrap"): - raise ValueError( - f"bootstrap_mode must be 'token' or 'bootstrap', " - f"got {bootstrap_mode!r}" - ) - if bootstrap_mode == "token" and not bootstrap_token: - raise ValueError( - "bootstrap_mode='token' requires bootstrap_token" - ) - self.bootstrap_mode = bootstrap_mode - self.bootstrap_token = bootstrap_token - + # Active signing key cache: (kid, private_pem, public_pem) or + # None. Loaded lazily on first use; refreshed whenever a key + # is created. self._signing_key = None self._signing_key_lock = asyncio.Lock() @@ -299,40 +283,21 @@ class IamService: # bootstrap # ------------------------------------------------------------------ - async def auto_bootstrap_if_token_mode(self): - """Called from the service processor at startup. In - ``token`` mode, if tables are empty, seeds the default - workspace / admin / signing key using the operator-provided - bootstrap token. The admin's API key plaintext is *the* - ``bootstrap_token`` — the operator already knows it, nothing - needs to be returned or logged. - - In ``bootstrap`` mode this is a no-op; seeding happens on - explicit ``bootstrap`` operation invocation.""" - if self.bootstrap_mode != "token": - return + async def handle_bootstrap(self, v): + """No-op if any workspace already exists. Otherwise create + the ``default`` workspace, an ``admin`` user with role + ``admin``, and an initial API key for that admin. The + plaintext API key is returned once in the response.""" if await self.table_store.any_workspace_exists(): logger.info( - "IAM: token mode, tables already populated; skipping " - "auto-bootstrap" + "IAM bootstrap: tables already populated; no-op" ) - return + return IamResponse() - logger.info("IAM: token mode, empty tables; auto-bootstrapping") - await self._seed_tables(self.bootstrap_token) - logger.info( - "IAM: auto-bootstrap complete using operator-provided token" - ) - - async def _seed_tables(self, api_key_plaintext): - """Shared seeding logic used by token-mode auto-bootstrap and - bootstrap-mode handle_bootstrap. Creates the default - workspace, admin user, admin API key (using the given - plaintext), and an initial signing key. Returns the admin - user id.""" now = _now_dt() + # Workspace. await self.table_store.put_workspace( id=DEFAULT_WORKSPACE, name="Default", @@ -340,7 +305,11 @@ class IamService: created=now, ) + # Admin user. admin_user_id = str(uuid.uuid4()) + # Password is set to a random unusable value; admin logs in + # with the API key below. Password login for this user can be + # enabled later by reset-password. admin_password = secrets.token_urlsafe(32) await self.table_store.put_user( id=admin_user_id, @@ -355,18 +324,21 @@ class IamService: created=now, ) + # Admin API key. + plaintext = _generate_api_key() key_id = str(uuid.uuid4()) await self.table_store.put_api_key( - key_hash=_hash_api_key(api_key_plaintext), + key_hash=_hash_api_key(plaintext), id=key_id, user_id=admin_user_id, name="bootstrap", - prefix=api_key_plaintext[:len(API_KEY_PREFIX) + 4], + prefix=plaintext[:len(API_KEY_PREFIX) + 4], expires=None, created=now, last_used=None, ) + # Initial JWT signing key. kid, private_pem, public_pem = _generate_signing_keypair() await self.table_store.put_signing_key( kid=kid, @@ -375,28 +347,15 @@ class IamService: created=now, retired=None, ) + # Populate cache so login calls in this process don't go + # back to Cassandra on first use. self._signing_key = (kid, private_pem, public_pem) logger.info( - f"IAM seeded: workspace={DEFAULT_WORKSPACE!r}, " - f"admin user_id={admin_user_id}, signing key kid={kid}" + f"IAM bootstrap: created workspace={DEFAULT_WORKSPACE!r}, " + f"admin user_id={admin_user_id}, initial API key issued, " + f"signing key kid={kid}" ) - return admin_user_id - - async def handle_bootstrap(self, v): - """Explicit bootstrap op. Only available in ``bootstrap`` - mode and only when tables are empty. Every other case is - masked to a generic auth failure — the caller cannot - distinguish 'not in bootstrap mode' from 'already - bootstrapped' from 'operation forbidden'.""" - if self.bootstrap_mode != "bootstrap": - return _err("auth-failed", "auth failure") - - if await self.table_store.any_workspace_exists(): - return _err("auth-failed", "auth failure") - - plaintext = _generate_api_key() - admin_user_id = await self._seed_tables(plaintext) return IamResponse( bootstrap_admin_user_id=admin_user_id, diff --git a/trustgraph-flow/trustgraph/iam/service/service.py b/trustgraph-flow/trustgraph/iam/service/service.py index 8ea31cf0..61bc1fd8 100644 --- a/trustgraph-flow/trustgraph/iam/service/service.py +++ b/trustgraph-flow/trustgraph/iam/service/service.py @@ -39,32 +39,6 @@ class Processor(AsyncProcessor): "iam_response_queue", default_iam_response_queue, ) - bootstrap_mode = params.get("bootstrap_mode") - bootstrap_token = params.get("bootstrap_token") - - if bootstrap_mode not in ("token", "bootstrap"): - raise RuntimeError( - "iam-svc: --bootstrap-mode is required. Set to 'token' " - "(with --bootstrap-token) for production, or 'bootstrap' " - "to enable the explicit bootstrap operation over the " - "pub/sub bus (dev / quick-start only, not safe under " - "public exposure). Refusing to start." - ) - if bootstrap_mode == "token" and not bootstrap_token: - raise RuntimeError( - "iam-svc: --bootstrap-mode=token requires " - "--bootstrap-token. Refusing to start." - ) - if bootstrap_mode == "bootstrap" and bootstrap_token: - raise RuntimeError( - "iam-svc: --bootstrap-token is not accepted when " - "--bootstrap-mode=bootstrap. Ambiguous intent. " - "Refusing to start." - ) - - self.bootstrap_mode = bootstrap_mode - self.bootstrap_token = bootstrap_token - cassandra_host = params.get("cassandra_host") cassandra_username = params.get("cassandra_username") cassandra_password = params.get("cassandra_password") @@ -122,19 +96,12 @@ class Processor(AsyncProcessor): username=self.cassandra_username, password=self.cassandra_password, keyspace=keyspace, - bootstrap_mode=self.bootstrap_mode, - bootstrap_token=self.bootstrap_token, ) - logger.info( - f"IAM service initialised (bootstrap-mode={self.bootstrap_mode})" - ) + logger.info("IAM service initialised") async def start(self): await self.pubsub.ensure_topic(self.iam_request_topic) - # Token-mode auto-bootstrap runs before we accept requests so - # the first inbound call always sees a populated table. - await self.iam.auto_bootstrap_if_token_mode() await self.iam_request_consumer.start() async def on_iam_request(self, msg, consumer, flow): @@ -177,31 +144,6 @@ class Processor(AsyncProcessor): default=default_iam_response_queue, help=f"IAM response queue (default: {default_iam_response_queue})", ) - parser.add_argument( - "--bootstrap-mode", - default=None, - choices=["token", "bootstrap"], - help=( - "IAM bootstrap mode (required). " - "'token' = operator supplies the initial admin API " - "key via --bootstrap-token; auto-seeds on first start, " - "bootstrap operation refused. " - "'bootstrap' = bootstrap operation is live over the " - "bus until tables are populated; a token is generated " - "and returned by tg-bootstrap-iam. Unsafe to run " - "'bootstrap' mode with public exposure." - ), - ) - parser.add_argument( - "--bootstrap-token", - default=None, - help=( - "Initial admin API key plaintext, required when " - "--bootstrap-mode=token. Treat as a one-time " - "credential: the operator should rotate to a new key " - "and revoke this one after first use." - ), - ) add_cassandra_args(parser)