Compare commits

...

3 commits

Author SHA1 Message Date
Cyber MacGeddon
843e68cded Websocket auth protocol 2026-04-23 20:11:41 +01:00
Cyber MacGeddon
d5dabad001 IAM integrated with gateway 2026-04-23 19:45:28 +01:00
Cyber MacGeddon
8348b7728b IAM secure bootstrap options 2026-04-23 19:23:18 +01:00
18 changed files with 1292 additions and 233 deletions

View file

@ -248,6 +248,46 @@ Passwords, API-key plaintext, and signing-key private material are
never returned in any response other than the explicit one-time
responses above (`reset-password`, `create-api-key`, `bootstrap`).
## Bootstrap modes
`iam-svc` requires a bootstrap mode to be chosen at startup. There is
no default — an unset or invalid mode causes the service to refuse
to start. The purpose is to force the operator to make an explicit
security decision rather than rely on an implicit "safe" fallback.
| Mode | Startup behaviour | `bootstrap` operation | Suitability |
|---|---|---|---|
| `token` | On first start with empty tables, auto-seeds the `default` workspace, admin user, admin API key (using the operator-provided `--bootstrap-token`), and an initial signing key. No-op on subsequent starts. | Refused — returns `auth-failed` / `"auth failure"` regardless of caller. | Production, any public-exposure deployment. |
| `bootstrap` | No startup seeding. Tables remain empty until the `bootstrap` operation is invoked over the pub/sub bus (typically via `tg-bootstrap-iam`). | Live while tables are empty. Generates and returns the admin API key once. Refused (`auth-failed`) once tables are populated. | Dev / compose up / CI. **Not safe under public exposure** — any caller who reaches the gateway's `/api/v1/iam` forwarder before the operator does can have the admin token issued to them instead. Operators choosing this mode accept that risk. |
### Error masking
In both modes, any refused invocation of the `bootstrap` operation
returns the same error (`auth-failed` / `"auth failure"`). A caller
cannot distinguish:
- "service is in token mode"
- "service is in bootstrap mode but already bootstrapped"
- "operation forbidden"
This matches the general IAM error-policy stance (see `iam.md`) and
prevents externally enumerating IAM's state.
### Bootstrap-token lifecycle
The bootstrap token — whether operator-supplied (`token` mode) or
service-generated (`bootstrap` mode) — is a one-time credential. It
is stored as admin's single API key, tagged `name="bootstrap"`. The
operator's first admin action after bootstrap should be:
1. Create a durable admin user and API key (or issue a durable API
key to the bootstrap admin).
2. Revoke the bootstrap key via `revoke-api-key`.
3. Remove the bootstrap token from any deployment configuration.
The `name="bootstrap"` marker makes bootstrap keys easy to detect in
tooling (e.g. a `tg-list-api-keys` filter).
## HTTP forwarding (initial integration)
For the initial gateway integration — before the IAM service is

View file

@ -40,6 +40,7 @@ tg-get-flow-blueprint = "trustgraph.cli.get_flow_blueprint:main"
tg-get-kg-core = "trustgraph.cli.get_kg_core:main" tg-get-kg-core = "trustgraph.cli.get_kg_core:main"
tg-get-document-content = "trustgraph.cli.get_document_content:main" tg-get-document-content = "trustgraph.cli.get_document_content:main"
tg-graph-to-turtle = "trustgraph.cli.graph_to_turtle:main" tg-graph-to-turtle = "trustgraph.cli.graph_to_turtle:main"
tg-bootstrap-iam = "trustgraph.cli.bootstrap_iam:main"
tg-invoke-agent = "trustgraph.cli.invoke_agent:main" tg-invoke-agent = "trustgraph.cli.invoke_agent:main"
tg-invoke-document-rag = "trustgraph.cli.invoke_document_rag:main" tg-invoke-document-rag = "trustgraph.cli.invoke_document_rag:main"
tg-invoke-graph-rag = "trustgraph.cli.invoke_graph_rag:main" tg-invoke-graph-rag = "trustgraph.cli.invoke_graph_rag:main"

View file

@ -0,0 +1,94 @@
"""
Bootstraps the IAM service. Only works when iam-svc is running in
bootstrap mode with empty tables. Prints the initial admin API key
to stdout.
This is a one-time, trust-sensitive operation. The resulting token
is shown once and never again — capture it on use. Rotate and
revoke it as soon as a real admin API key has been issued.
"""
import argparse
import json
import os
import sys
import requests
default_url = os.getenv("TRUSTGRAPH_URL", "http://localhost:8088/")
def bootstrap(url, timeout=30):
    """Invoke the IAM bootstrap operation through the gateway.

    POSTs an empty JSON object to ``<url>/api/v1/auth/bootstrap`` and
    extracts the one-time admin credentials from the reply.

    Args:
        url: Base URL of the API gateway; a trailing slash is tolerated.
        timeout: Seconds to wait for the gateway before giving up.
            Without a timeout ``requests`` waits indefinitely, which
            would hang the CLI if the gateway accepts the connection
            but never answers.

    Returns:
        A ``(user_id, api_key)`` tuple taken from the
        ``bootstrap_admin_user_id`` and ``bootstrap_admin_api_key``
        fields of the response.

    Raises:
        RuntimeError: on a non-200 status, a body that is not a JSON
            object, an ``error`` entry in the body, or a missing API key.
        requests.RequestException: on transport failures and timeouts.
    """

    # Unauthenticated public endpoint — IAM refuses the bootstrap
    # operation unless the service is running in bootstrap mode with
    # empty tables, so the safety gate lives on the server side.
    endpoint = url.rstrip("/") + "/api/v1/auth/bootstrap"

    headers = {"Content-Type": "application/json"}

    resp = requests.post(
        endpoint,
        headers=headers,
        data=json.dumps({}),
        timeout=timeout,
    )

    if resp.status_code != 200:
        raise RuntimeError(
            f"HTTP {resp.status_code}: {resp.text}"
        )

    # A proxy or misrouted request can answer 200 with a non-JSON body;
    # report that as a normal failure rather than a decoder traceback.
    try:
        body = resp.json()
    except ValueError as e:
        raise RuntimeError(f"Invalid JSON in response: {e}") from e

    if not isinstance(body, dict):
        raise RuntimeError(
            f"Unexpected response body: {resp.text}"
        )

    if "error" in body:
        error = body["error"]
        # The error may be a structured {type, message} object or a
        # plain string; handle both instead of assuming a dict.
        if isinstance(error, dict):
            raise RuntimeError(
                f"IAM {error.get('type', 'error')}: "
                f"{error.get('message', '')}"
            )
        raise RuntimeError(f"IAM error: {error}")

    api_key = body.get("bootstrap_admin_api_key")
    user_id = body.get("bootstrap_admin_user_id")

    if not api_key:
        raise RuntimeError(
            "IAM response did not contain a bootstrap token — the "
            "service may already be bootstrapped, or may be running "
            "in token mode."
        )

    return user_id, api_key
def main():
    """Command-line entry point for ``tg-bootstrap-iam``.

    Parses ``-u/--api-url``, runs the bootstrap call and prints the
    admin API key on stdout. Everything meant for the human operator
    (user id, hints, errors) goes to stderr so stdout stays
    machine-readable. Exits with status 1 if bootstrapping fails.
    """

    arg_parser = argparse.ArgumentParser(
        prog="tg-bootstrap-iam",
        description=__doc__,
    )
    arg_parser.add_argument(
        "-u", "--api-url",
        default=default_url,
        help=f"API URL (default: {default_url})",
    )
    options = arg_parser.parse_args()

    try:
        user_id, api_key = bootstrap(options.api_url)
    except Exception as err:
        print("Exception:", err, file=sys.stderr, flush=True)
        sys.exit(1)
    else:
        # Operator context on stderr, the key alone on stdout.
        print(f"Admin user id: {user_id}", file=sys.stderr)
        print("Admin API key (shown once, capture now):", file=sys.stderr)
        print(api_key)
if __name__ == "__main__":
main()

View file

@ -1,22 +1,264 @@
"""
IAM-backed authentication for the API gateway.
class Authenticator: Replaces the legacy GATEWAY_SECRET shared-token Authenticator. The
gateway is now stateless with respect to credentials: it either
verifies a JWT locally using the active IAM signing public key, or
resolves an API key by hash with a short local cache backed by the
IAM service.
def __init__(self, token=None, allow_all=False): Identity returned by authenticate() is the (user_id, workspace,
roles) triple the rest of the gateway capability checks, workspace
resolver, audit logging needs.
"""
if not allow_all and token is None: import asyncio
raise RuntimeError("Need a token") import base64
import hashlib
import json
import logging
import time
import uuid
from dataclasses import dataclass
if not allow_all and token == "": from aiohttp import web
raise RuntimeError("Need a token")
self.token = token from cryptography.hazmat.primitives import serialization
self.allow_all = allow_all from cryptography.hazmat.primitives.asymmetric import ed25519
def permitted(self, token, roles): from ..base.iam_client import IamClient
from ..base.metrics import ProducerMetrics, SubscriberMetrics
from ..schema import (
IamRequest, IamResponse,
iam_request_queue, iam_response_queue,
)
if self.allow_all: return True logger = logging.getLogger("auth")
if self.token != token: return False API_KEY_CACHE_TTL = 60 # seconds
return True
@dataclass
class Identity:
    """Authenticated caller as resolved by the gateway.

    Produced either from verified JWT claims or from an API key
    resolved through the IAM service.
    """

    # User identifier: the JWT "sub" claim, or the user id returned by
    # IAM for an API key.
    user_id: str
    # Workspace the caller is assigned to.
    workspace: str
    # Role names carried by the caller.
    roles: list
    # Which credential type produced this identity.
    source: str  # "api-key" | "jwt"
def _auth_failure():
    """Build the masked 401 exception used for every authentication
    failure, carrying a fixed JSON body."""
    payload = '{"error":"auth failure"}'
    return web.HTTPUnauthorized(text=payload, content_type="application/json")
def _access_denied():
    """Build the 403 exception carrying a fixed JSON
    ``access denied`` body."""
    payload = '{"error":"access denied"}'
    return web.HTTPForbidden(text=payload, content_type="application/json")
def _b64url_decode(s):
pad = "=" * (-len(s) % 4)
return base64.urlsafe_b64decode(s + pad)
def _verify_jwt_eddsa(token, public_pem):
"""Verify an Ed25519 JWT and return its claims. Raises on any
validation failure. Refuses non-EdDSA algorithms."""
parts = token.split(".")
if len(parts) != 3:
raise ValueError("malformed JWT")
h_b64, p_b64, s_b64 = parts
signing_input = f"{h_b64}.{p_b64}".encode("ascii")
header = json.loads(_b64url_decode(h_b64))
if header.get("alg") != "EdDSA":
raise ValueError(f"unsupported alg: {header.get('alg')!r}")
key = serialization.load_pem_public_key(public_pem.encode("ascii"))
if not isinstance(key, ed25519.Ed25519PublicKey):
raise ValueError("public key is not Ed25519")
signature = _b64url_decode(s_b64)
key.verify(signature, signing_input) # raises InvalidSignature
claims = json.loads(_b64url_decode(p_b64))
exp = claims.get("exp")
if exp is None or exp < time.time():
raise ValueError("expired")
return claims
class IamAuth:
    """Resolves bearer credentials via the IAM service.

    Used by every gateway endpoint that needs authentication. Fetches
    the IAM signing public key at startup (cached in memory). API
    keys are resolved via the IAM service with a local hash -> identity
    cache (short TTL so revoked keys stop working within the TTL
    window without any push mechanism)."""

    def __init__(self, backend, id="api-gateway"):
        """Store the pub/sub ``backend`` and the processor ``id`` used
        to name subscriptions and metrics. No I/O happens here; call
        ``start()`` to fetch the signing key."""
        self.backend = backend
        self.id = id

        # Populated at start() via IAM.
        self._signing_public_pem = None

        # API-key cache: plaintext_sha256_hex -> (Identity, expires_ts)
        self._key_cache = {}
        self._key_cache_lock = asyncio.Lock()

    # ------------------------------------------------------------------
    # Short-lived client helper. Mirrors the pattern used by the
    # bootstrap framework and AsyncProcessor: a fresh uuid suffix per
    # invocation so Pulsar exclusive subscriptions don't collide with
    # ghosts from prior calls.
    # ------------------------------------------------------------------

    def _make_client(self):
        """Build a new ``IamClient`` with a unique subscription name."""
        rr_id = str(uuid.uuid4())
        return IamClient(
            backend=self.backend,
            subscription=f"{self.id}--iam--{rr_id}",
            consumer_name=self.id,
            request_topic=iam_request_queue,
            request_schema=IamRequest,
            request_metrics=ProducerMetrics(
                processor=self.id, flow=None, name="iam-request",
            ),
            response_topic=iam_response_queue,
            response_schema=IamResponse,
            response_metrics=SubscriberMetrics(
                processor=self.id, flow=None, name="iam-response",
            ),
        )

    async def _with_client(self, op):
        """Open a short-lived IamClient, run ``op(client)``, close.

        Returns whatever ``op`` returns and propagates whatever it
        raises. Failure to stop the client is logged and otherwise
        ignored so it never masks the outcome of ``op``."""
        client = self._make_client()
        await client.start()
        try:
            return await op(client)
        finally:
            try:
                await client.stop()
            except Exception as e:
                # Best-effort teardown: keep it non-fatal, but leave a
                # trace instead of discarding the error silently.
                logger.debug(
                    f"IamAuth: client stop failed: "
                    f"{type(e).__name__}: {e}"
                )

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def start(self, max_retries=30, retry_delay=2.0):
        """Fetch the signing public key from IAM. Retries on
        failure — the gateway may be starting before IAM is ready.

        Makes up to ``max_retries`` attempts, sleeping ``retry_delay``
        seconds between them. Returns normally even when no key could
        be fetched, so gateway startup is never blocked forever."""

        async def _fetch(client):
            return await client.get_signing_key_public()

        for attempt in range(max_retries):

            try:
                pem = await self._with_client(_fetch)
            except Exception as e:
                logger.info(
                    f"IamAuth: waiting for IAM signing key "
                    f"({type(e).__name__}: {e}); "
                    f"retry {attempt + 1}/{max_retries}"
                )
            else:
                if pem:
                    self._signing_public_pem = pem
                    logger.info(
                        "IamAuth: fetched IAM signing public key "
                        f"({len(pem)} bytes)"
                    )
                    return
                # An empty answer is retried like an error, but say so.
                logger.info(
                    f"IamAuth: IAM returned no signing key; "
                    f"retry {attempt + 1}/{max_retries}"
                )

            # Sleeping after the final attempt would only delay startup.
            if attempt + 1 < max_retries:
                await asyncio.sleep(retry_delay)

        # Don't prevent startup forever. Nothing in this class fetches
        # the key again afterwards: _verify_jwt() rejects every JWT
        # while the key is unset, so start() has to be re-run to
        # recover.
        logger.warning(
            "IamAuth: could not fetch IAM signing key at startup; "
            "JWT validation will fail until it's available"
        )

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    async def authenticate(self, request):
        """Extract and validate the Bearer credential from an HTTP
        request. Returns an ``Identity``. Raises HTTPUnauthorized
        (401 / "auth failure") on any failure mode — the caller
        cannot distinguish missing / malformed / invalid / expired /
        revoked credentials."""

        header = request.headers.get("Authorization", "")
        if not header.startswith("Bearer "):
            raise _auth_failure()

        token = header[len("Bearer "):].strip()
        if not token:
            raise _auth_failure()

        # API keys always start with "tg_". JWTs have two dots and
        # no "tg_" prefix. Discriminate cheaply.
        if token.startswith("tg_"):
            return await self._resolve_api_key(token)

        if token.count(".") == 2:
            return self._verify_jwt(token)

        raise _auth_failure()

    def _verify_jwt(self, token):
        """Validate ``token`` against the cached signing key and map
        its ``sub`` / ``workspace`` / ``roles`` claims to an
        ``Identity``. Any failure becomes the masked 401."""

        if not self._signing_public_pem:
            raise _auth_failure()

        try:
            claims = _verify_jwt_eddsa(token, self._signing_public_pem)
        except Exception as e:
            logger.debug(f"JWT validation failed: {type(e).__name__}: {e}")
            raise _auth_failure()

        sub = claims.get("sub", "")
        ws = claims.get("workspace", "")
        roles = list(claims.get("roles", []))

        if not sub or not ws:
            raise _auth_failure()

        return Identity(
            user_id=sub, workspace=ws, roles=roles, source="jwt",
        )

    async def _resolve_api_key(self, plaintext):
        """Resolve an API key to an ``Identity``, using the local cache
        when a fresh entry exists and the IAM service otherwise.
        Successful resolutions are cached for ``API_KEY_CACHE_TTL``
        seconds; failures are not cached and become the masked 401."""

        h = hashlib.sha256(plaintext.encode("utf-8")).hexdigest()

        # Fast path without the lock.
        cached = self._key_cache.get(h)
        if cached and cached[1] > time.time():
            return cached[0]

        async with self._key_cache_lock:

            # Another task may have filled the entry while we waited.
            # Read the clock again: waiting for the lock can take as
            # long as other callers' IAM round-trips, so a timestamp
            # taken before the wait could accept an expired entry.
            cached = self._key_cache.get(h)
            if cached and cached[1] > time.time():
                return cached[0]

            try:
                async def _call(client):
                    return await client.resolve_api_key(plaintext)
                user_id, workspace, roles = await self._with_client(_call)
            except Exception as e:
                logger.debug(
                    f"API key resolution failed: "
                    f"{type(e).__name__}: {e}"
                )
                raise _auth_failure()

            if not user_id or not workspace:
                raise _auth_failure()

            identity = Identity(
                user_id=user_id, workspace=workspace,
                roles=list(roles), source="api-key",
            )

            # Start the TTL when the answer arrived, not when the
            # request entered this method.
            self._key_cache[h] = (
                identity, time.time() + API_KEY_CACHE_TTL,
            )
            return identity

View file

@ -0,0 +1,163 @@
"""
Capability vocabulary and OSS role bundles.
See docs/tech-specs/capabilities.md for the authoritative description.
The mapping below is the data form of the OSS bundle table in that
spec. Enterprise editions may replace this module with their own
role table; the vocabulary (capability strings) is shared.
The module also exposes:
- ``PUBLIC`` — a sentinel indicating an endpoint requires no
authentication (login, bootstrap).
- ``AUTHENTICATED`` — a sentinel indicating an endpoint requires a
valid identity but no specific capability (e.g. change-password).
- ``check(roles, capability)`` — the union-of-bundles membership test.
"""
from aiohttp import web
# Sentinels used in place of a capability string: PUBLIC endpoints skip
# authentication, AUTHENTICATED ones accept any valid identity.
PUBLIC = "__public__"
AUTHENTICATED = "__authenticated__"

# Capability vocabulary. Mirrors the "Capability list" tables in
# capabilities.md. Kept as a set of valid strings so the gateway can
# fail-closed on an endpoint that declares an unknown capability.
KNOWN_CAPABILITIES = {
    # Data plane
    "agent",
    "llm",
    "embeddings",
    "mcp",
    "graph:read",
    "graph:write",
    "documents:read",
    "documents:write",
    "rows:read",
    "rows:write",
    # Control plane
    "config:read",
    "config:write",
    "flows:read",
    "flows:write",
    "collections:read",
    "collections:write",
    "knowledge:read",
    "knowledge:write",
    "users:read",
    "users:write",
    "users:admin",
    "keys:self",
    "keys:admin",
    "workspaces:admin",
    "iam:admin",
    "metrics:read",
}
# OSS role → capability set. Enterprise overrides this mapping.
_READER_CAPS = {
"agent",
"graph:read",
"documents:read",
"rows:read",
"llm",
"embeddings",
"mcp",
"config:read",
"flows:read",
"collections:read",
"knowledge:read",
"keys:self",
}
_WRITER_CAPS = _READER_CAPS | {
"graph:write",
"documents:write",
"rows:write",
"collections:write",
"knowledge:write",
}
_ADMIN_CAPS = _WRITER_CAPS | {
"config:write",
"flows:write",
"users:read", "users:write", "users:admin",
"keys:admin",
"workspaces:admin",
"iam:admin",
"metrics:read",
}
ROLE_CAPABILITIES = {
"reader": _READER_CAPS,
"writer": _WRITER_CAPS,
"admin": _ADMIN_CAPS,
}
def check(roles, capability):
    """Tell whether at least one role in ``roles`` grants ``capability``.

    A capability outside ``KNOWN_CAPABILITIES`` is treated as an
    endpoint misconfiguration and always denied. Roles missing from
    ``ROLE_CAPABILITIES`` grant nothing, so the result is
    deterministic and fail-closed."""
    # Endpoint misconfiguration. Fail closed.
    if capability not in KNOWN_CAPABILITIES:
        return False
    return any(
        capability in ROLE_CAPABILITIES.get(role, ())
        for role in roles
    )
def access_denied():
    """Build the 403 exception with the fixed JSON
    ``access denied`` body."""
    payload = '{"error":"access denied"}'
    return web.HTTPForbidden(text=payload, content_type="application/json")
def auth_failure():
    """Build the 401 exception with the fixed JSON
    ``auth failure`` body."""
    payload = '{"error":"auth failure"}'
    return web.HTTPUnauthorized(text=payload, content_type="application/json")
async def enforce(request, auth, capability):
    """Run authentication and the capability check for one request.

    Typical call from an endpoint handler:

        identity = await enforce(request, self.auth, self.capability)

    Outcomes by ``capability``:

    - ``PUBLIC``: nothing is checked and ``None`` is returned.
    - ``AUTHENTICATED``: the caller must authenticate; any resulting
      identity is returned.
    - any other string: the authenticated identity must hold a role
      granting it, otherwise the access-denied exception is raised.

    Exceptions raised by ``auth.authenticate`` propagate unchanged.
    """
    if capability == PUBLIC:
        return None

    identity = await auth.authenticate(request)

    needs_capability = capability != AUTHENTICATED
    if needs_capability and not check(identity.roles, capability):
        raise access_denied()

    return identity
def enforce_workspace(data, identity):
    """Pin the ``workspace`` field of a request body to the caller's
    own workspace.

    OSS rules:

    - a non-empty ``data["workspace"]`` that differs from
      ``identity.workspace`` raises the access-denied exception;
    - in every other case ``data["workspace"]`` is overwritten with
      ``identity.workspace``.

    Bodies that are not dicts are returned untouched. ``data`` is
    modified in place and also returned.

    Enterprise editions will plug in a different resolver that
    checks a permitted-set instead of a single value; the wire
    protocol is unchanged."""
    if not isinstance(data, dict):
        return data

    requested = data.get("workspace", "")
    if requested and requested != identity.workspace:
        raise access_denied()

    data["workspace"] = identity.workspace
    return data

View file

@ -108,12 +108,18 @@ class DispatcherWrapper:
class DispatcherManager: class DispatcherManager:
def __init__(self, backend, config_receiver, prefix="api-gateway", def __init__(self, backend, config_receiver, prefix="api-gateway",
queue_overrides=None): queue_overrides=None, auth=None):
self.backend = backend self.backend = backend
self.config_receiver = config_receiver self.config_receiver = config_receiver
self.config_receiver.add_handler(self) self.config_receiver.add_handler(self)
self.prefix = prefix self.prefix = prefix
# Gateway IamAuth — used by the socket mux for first-frame
# auth. ``None`` keeps the legacy "caller-supplied
# workspace" behaviour for anything that instantiates Mux
# directly without auth.
self.auth = auth
# Store queue overrides for global services # Store queue overrides for global services
# Format: {"config": {"request": "...", "response": "..."}, ...} # Format: {"config": {"request": "...", "response": "..."}, ...}
self.queue_overrides = queue_overrides or {} self.queue_overrides = queue_overrides or {}
@ -165,6 +171,15 @@ class DispatcherManager:
def dispatch_global_service(self): def dispatch_global_service(self):
return DispatcherWrapper(self.process_global_service) return DispatcherWrapper(self.process_global_service)
def dispatch_auth_iam(self):
    """Return a dispatcher bound to the ``iam`` global service.

    Intended for the gateway's auth endpoints (login, bootstrap,
    change-password): the kind is fixed to ``iam`` here, so those
    handlers need not pass the URL params the global dispatcher
    would expect."""

    async def _iam_only(data, responder):
        result = await self.invoke_global_service(data, responder, "iam")
        return result

    return DispatcherWrapper(_iam_only)
def dispatch_core_export(self): def dispatch_core_export(self):
return DispatcherWrapper(self.process_core_export) return DispatcherWrapper(self.process_core_export)
@ -316,7 +331,10 @@ class DispatcherManager:
async def process_socket(self, ws, running, params): async def process_socket(self, ws, running, params):
dispatcher = Mux(self, ws, running) # The mux self-authenticates via the first-frame protocol;
# pass the gateway's IamAuth so it can validate tokens
# without reaching back into the endpoint layer.
dispatcher = Mux(self, ws, running, auth=self.auth)
return dispatcher return dispatcher

View file

@ -16,11 +16,26 @@ MAX_QUEUE_SIZE = 10
class Mux: class Mux:
def __init__(self, dispatcher_manager, ws, running): def __init__(self, dispatcher_manager, ws, running, auth=None):
"""
``auth`` an ``IamAuth`` when the enclosing endpoint is
configured for in-band first-frame auth. ``None`` for the
legacy ``?token=`` path (kept for the flow import/export
streaming endpoints).
"""
self.dispatcher_manager = dispatcher_manager self.dispatcher_manager = dispatcher_manager
self.ws = ws self.ws = ws
self.running = running self.running = running
self.auth = auth
# Authenticated identity, populated by the first-frame auth
# protocol. ``None`` means the socket is not yet
# authenticated; any non-auth frame is refused. If
# ``auth`` is ``None`` (legacy path) the mux acts as if
# already authenticated and uses client-supplied workspace
# values (pre-existing behaviour).
self.identity = None
self.q = asyncio.Queue(maxsize=MAX_QUEUE_SIZE) self.q = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
@ -31,6 +46,41 @@ class Mux:
if self.ws: if self.ws:
await self.ws.close() await self.ws.close()
async def _handle_auth_frame(self, data):
"""Process a ``{"type": "auth", "token": "..."}`` frame.
On success, updates ``self.identity`` and returns an
``auth-ok`` response frame. On failure, returns the masked
auth-failure frame. Never raises auth failures keep the
socket open so the client can retry without reconnecting
(important for browsers, which treat a handshake-time 401
as terminal)."""
token = data.get("token", "")
if not token or self.auth is None:
await self.ws.send_json({
"type": "auth-failed",
"error": "auth failure",
})
return
class _Shim:
def __init__(self, tok):
self.headers = {"Authorization": f"Bearer {tok}"}
try:
identity = await self.auth.authenticate(_Shim(token))
except Exception:
await self.ws.send_json({
"type": "auth-failed",
"error": "auth failure",
})
return
self.identity = identity
await self.ws.send_json({
"type": "auth-ok",
"workspace": identity.workspace,
})
async def receive(self, msg): async def receive(self, msg):
request_id = None request_id = None
@ -38,6 +88,18 @@ class Mux:
try: try:
data = msg.json() data = msg.json()
# In-band auth protocol: the client sends
# ``{"type": "auth", "token": "..."}`` as its first frame
# (and any time it wants to re-auth: JWT refresh, token
# rotation, workspace switch in a future multi-workspace
# enterprise). The protocol coexists with legacy
# non-auth sockets (self.auth is None) — on those, every
# frame is a request and workspace is caller-supplied.
if isinstance(data, dict) and data.get("type") == "auth":
await self._handle_auth_frame(data)
return
request_id = data.get("id") request_id = data.get("id")
if "request" not in data: if "request" not in data:
@ -46,9 +108,42 @@ class Mux:
if "id" not in data: if "id" not in data:
raise RuntimeError("Bad message") raise RuntimeError("Bad message")
# First-frame auth gating: if the enclosing endpoint is
# configured for in-band auth, reject all non-auth frames
# until an auth-ok has been issued.
if self.auth is not None and self.identity is None:
await self.ws.send_json({
"id": request_id,
"error": {
"message": "auth failure",
"type": "auth-required",
},
"complete": True,
})
return
# Workspace resolution. Authenticated sockets override
# the client-supplied workspace with the resolved value
# from the identity; mismatch is an access-denied error.
if self.identity is not None:
requested_ws = data.get("workspace", "")
if requested_ws and requested_ws != self.identity.workspace:
await self.ws.send_json({
"id": request_id,
"error": {
"message": "access denied",
"type": "access-denied",
},
"complete": True,
})
return
workspace = self.identity.workspace
else:
workspace = data.get("workspace", "default")
await self.q.put(( await self.q.put((
data["id"], data["id"],
data.get("workspace", "default"), workspace,
data.get("flow"), data.get("flow"),
data["service"], data["service"],
data["request"] data["request"]

View file

@ -0,0 +1,115 @@
"""
Gateway auth endpoints.
Three dedicated paths:
POST /api/v1/auth/login — unauthenticated; username/password → JWT
POST /api/v1/auth/bootstrap — unauthenticated; IAM bootstrap op
POST /api/v1/auth/change-password — authenticated; any role
These are the only IAM-surface operations that can be reached from
outside. Everything else routes through ``/api/v1/iam`` gated by
``users:admin``.
"""
import logging
from aiohttp import web
from .. capabilities import enforce, PUBLIC, AUTHENTICATED
logger = logging.getLogger("auth-endpoints")
logger.setLevel(logging.INFO)
class AuthEndpoints:
    """Groups the three auth-surface handlers. Each forwards to the
    IAM service via the existing ``IamRequestor`` dispatcher."""

    def __init__(self, iam_dispatcher, auth):
        """``iam_dispatcher`` must expose ``process(body, responder)``;
        ``auth`` is the authenticator handed to ``enforce``."""
        self.iam = iam_dispatcher
        self.auth = auth

    async def start(self):
        """Nothing to start; present so the object can be started
        alongside the other endpoints."""
        pass

    def add_routes(self, app):
        """Register the three POST routes on the aiohttp ``app``."""
        app.add_routes([
            web.post("/api/v1/auth/login", self.login),
            web.post("/api/v1/auth/bootstrap", self.bootstrap),
            web.post(
                "/api/v1/auth/change-password",
                self.change_password,
            ),
        ])

    async def _forward(self, body):
        """Send ``body`` to IAM and return the dispatcher's result."""
        # The dispatcher requires a responder callback; these handlers
        # only use the returned value, so the callback does nothing.
        async def responder(x, fin):
            pass
        return await self.iam.process(body, responder)

    @staticmethod
    async def _read_json_object(request):
        """Return the request body as a dict, or ``None`` when it is
        not valid JSON or is valid JSON but not an object."""
        try:
            body = await request.json()
        except Exception:
            return None
        # "[]", "null", "3" are valid JSON but would break body.get().
        return body if isinstance(body, dict) else None

    @staticmethod
    def _invalid_json():
        """400 response used for unusable request bodies."""
        return web.json_response(
            {"error": "invalid json"}, status=400,
        )

    @staticmethod
    def _auth_failed():
        """Masked 401 response shared by all three handlers."""
        return web.json_response(
            {"error": "auth failure"}, status=401,
        )

    async def login(self, request):
        """Public. Accepts {username, password, workspace?}. Returns
        {jwt, jwt_expires} on success; IAM's masked auth failure on
        anything else. A body that is not a JSON object yields 400."""
        await enforce(request, self.auth, PUBLIC)

        body = await self._read_json_object(request)
        if body is None:
            return self._invalid_json()

        req = {
            "operation": "login",
            "username": body.get("username", ""),
            "password": body.get("password", ""),
            "workspace": body.get("workspace", ""),
        }
        resp = await self._forward(req)
        if "error" in resp:
            return self._auth_failed()
        return web.json_response(resp)

    async def bootstrap(self, request):
        """Public. Valid only when IAM is running in bootstrap mode
        with empty tables. In every other case the IAM service
        returns a masked auth-failure."""
        await enforce(request, self.auth, PUBLIC)

        resp = await self._forward({"operation": "bootstrap"})
        if "error" in resp:
            return self._auth_failed()
        return web.json_response(resp)

    async def change_password(self, request):
        """Authenticated (any role). Accepts {current_password,
        new_password}; user_id is taken from the authenticated
        identity — the caller cannot change someone else's password
        this way (reset-password is the admin path). A body that is
        not a JSON object yields 400."""
        identity = await enforce(request, self.auth, AUTHENTICATED)

        body = await self._read_json_object(request)
        if body is None:
            return self._invalid_json()

        req = {
            "operation": "change-password",
            "user_id": identity.user_id,
            "password": body.get("current_password", ""),
            "new_password": body.get("new_password", ""),
        }
        resp = await self._forward(req)
        if "error" in resp:
            error = resp["error"]
            # Tolerate an error given as a bare string rather than a
            # {type, message} object.
            if not isinstance(error, dict):
                error = {"message": str(error)}
            if error.get("type", "") == "auth-failed":
                return self._auth_failed()
            return web.json_response(
                {"error": error.get("message", "error")},
                status=400,
            )
        return web.json_response(resp)

View file

@ -1,28 +1,27 @@
import asyncio
from aiohttp import web
import uuid
import logging import logging
from aiohttp import web
from .. capabilities import enforce, enforce_workspace
logger = logging.getLogger("endpoint") logger = logging.getLogger("endpoint")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
class ConstantEndpoint: class ConstantEndpoint:
def __init__(self, endpoint_path, auth, dispatcher): def __init__(self, endpoint_path, auth, dispatcher, capability):
self.path = endpoint_path self.path = endpoint_path
self.auth = auth self.auth = auth
self.operation = "service" self.capability = capability
self.dispatcher = dispatcher self.dispatcher = dispatcher
async def start(self): async def start(self):
pass pass
def add_routes(self, app): def add_routes(self, app):
app.add_routes([ app.add_routes([
web.post(self.path, self.handle), web.post(self.path, self.handle),
]) ])
@ -31,22 +30,14 @@ class ConstantEndpoint:
logger.debug(f"Processing request: {request.path}") logger.debug(f"Processing request: {request.path}")
try: identity = await enforce(request, self.auth, self.capability)
ht = request.headers["Authorization"]
tokens = ht.split(" ", 2)
if tokens[0] != "Bearer":
return web.HTTPUnauthorized()
token = tokens[1]
except:
token = ""
if not self.auth.permitted(token, self.operation):
return web.HTTPUnauthorized()
try: try:
data = await request.json() data = await request.json()
if identity is not None:
enforce_workspace(data, identity)
async def responder(x, fin): async def responder(x, fin):
pass pass
@ -54,10 +45,8 @@ class ConstantEndpoint:
return web.json_response(resp) return web.json_response(resp)
except web.HTTPException:
raise
except Exception as e: except Exception as e:
logging.error(f"Exception: {e}") logger.error(f"Exception: {e}", exc_info=True)
return web.json_response({"error": str(e)})
return web.json_response(
{ "error": str(e) }
)

View file

@ -4,16 +4,18 @@ from aiohttp import web
from trustgraph.i18n import get_language_pack from trustgraph.i18n import get_language_pack
from .. capabilities import enforce
logger = logging.getLogger("endpoint") logger = logging.getLogger("endpoint")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
class I18nPackEndpoint: class I18nPackEndpoint:
def __init__(self, endpoint_path: str, auth): def __init__(self, endpoint_path: str, auth, capability):
self.path = endpoint_path self.path = endpoint_path
self.auth = auth self.auth = auth
self.operation = "service" self.capability = capability
async def start(self): async def start(self):
pass pass
@ -26,26 +28,13 @@ class I18nPackEndpoint:
async def handle(self, request): async def handle(self, request):
logger.debug(f"Processing i18n pack request: {request.path}") logger.debug(f"Processing i18n pack request: {request.path}")
token = "" await enforce(request, self.auth, self.capability)
try:
ht = request.headers["Authorization"]
tokens = ht.split(" ", 2)
if tokens[0] != "Bearer":
return web.HTTPUnauthorized()
token = tokens[1]
except Exception:
token = ""
if not self.auth.permitted(token, self.operation):
return web.HTTPUnauthorized()
lang = request.match_info.get("lang") or "en" lang = request.match_info.get("lang") or "en"
# This is a path traversal defense, and is a critical sec defense. # Path-traversal defense — critical, do not remove.
# Do not remove!
if "/" in lang or ".." in lang: if "/" in lang or ".." in lang:
return web.HTTPBadRequest(reason="Invalid language code") return web.HTTPBadRequest(reason="Invalid language code")
pack = get_language_pack(lang) pack = get_language_pack(lang)
return web.json_response(pack) return web.json_response(pack)

View file

@ -8,72 +8,284 @@ from . variable_endpoint import VariableEndpoint
from . socket import SocketEndpoint from . socket import SocketEndpoint
from . metrics import MetricsEndpoint from . metrics import MetricsEndpoint
from . i18n import I18nPackEndpoint from . i18n import I18nPackEndpoint
from . auth_endpoints import AuthEndpoints
from .. capabilities import PUBLIC, AUTHENTICATED
from .. dispatch.manager import DispatcherManager from .. dispatch.manager import DispatcherManager
# Capability gate for the generic global-service endpoint
# /api/v1/{kind}, keyed by the ``kind`` URL segment.
#
# The gating is deliberately coarse: the request body carries an opaque
# operation, so the per-subsystem "read vs write" split used by the IAM
# bundles cannot be applied at this layer. Each kind is therefore gated
# on its write/admin capability, which is the upper bound of what the
# endpoint can do.
GLOBAL_KIND_CAPABILITY = {
    "config": "config:write",
    "flow": "flows:write",
    "librarian": "documents:write",
    "knowledge": "knowledge:write",
    "collection-management": "collections:write",
    # /api/v1/iam requires the admin bundle. Login, bootstrap and
    # change-password are not gated through this map: AuthEndpoints
    # serves them and applies its own PUBLIC / AUTHENTICATED gating.
    "iam": "users:admin",
}
# Capability gate for the per-flow data-plane endpoint
# /api/v1/flow/{flow}/service/{kind}, keyed by the ``kind`` URL segment.
# A kind that is absent from this map has no capability to look up.
FLOW_KIND_CAPABILITY = {
    "agent": "agent",
    "text-completion": "llm",
    "prompt": "llm",
    "mcp-tool": "mcp",
    "graph-rag": "graph:read",
    "document-rag": "documents:read",
    "embeddings": "embeddings",
    "graph-embeddings": "graph:read",
    "document-embeddings": "documents:read",
    "triples": "graph:read",
    "rows": "rows:read",
    "nlp-query": "rows:read",
    "structured-query": "rows:read",
    "structured-diag": "rows:read",
    "row-embeddings": "rows:read",
    "sparql": "graph:read",
}
# Capability gates for the streaming per-flow import and export
# endpoints, each keyed by the ``kind`` URL segment. Imports are gated
# on the write capability of the target store, exports on the matching
# read capability.
FLOW_IMPORT_CAPABILITY = {
    "triples": "graph:write",
    "graph-embeddings": "graph:write",
    "document-embeddings": "documents:write",
    "entity-contexts": "documents:write",
    "rows": "rows:write",
}

FLOW_EXPORT_CAPABILITY = {
    "triples": "graph:read",
    "graph-embeddings": "graph:read",
    "document-embeddings": "documents:read",
    "entity-contexts": "documents:read",
}
from .. capabilities import enforce, enforce_workspace
import logging as _mgr_logging
_mgr_logger = _mgr_logging.getLogger("endpoint")
class _RoutedVariableEndpoint:
    """POST endpoint that picks its required capability per request.

    The capability is looked up from the ``kind`` URL parameter in the
    map given at construction. Serves the two generic dispatch paths,
    ``/api/v1/{kind}`` and ``/api/v1/flow/{flow}/service/{kind}``.

    This is a standalone class rather than a ``VariableEndpoint``
    subclass so that no per-request capability is ever written to
    shared instance state while requests run concurrently.
    """

    def __init__(self, endpoint_path, auth, dispatcher, capability_map):
        # capability_map: kind (URL segment) -> capability string.
        self._capability_map = capability_map
        self.dispatcher = dispatcher
        self.auth = auth
        self.path = endpoint_path

    async def start(self):
        """No start-up work is needed for this endpoint."""
        pass

    def add_routes(self, app):
        """Register ``handle`` as the POST handler for ``self.path``."""
        route = web.post(self.path, self.handle)
        app.add_routes([route])

    async def handle(self, request):
        """Gate the request on its kind's capability, then dispatch it.

        An unmapped kind gets a 404 JSON error before any capability
        check runs. ``web.HTTPException`` raised while processing is
        re-raised untouched; any other exception is logged and returned
        as a JSON ``{"error": ...}`` body.
        """
        kind = request.match_info.get("kind", "")
        required = self._capability_map.get(kind)
        if required is None:
            return web.json_response({"error": "unknown kind"}, status=404)

        identity = await enforce(request, self.auth, required)

        # The dispatcher takes a responder callback; this path only
        # uses the value returned by process(), so it is a no-op.
        async def responder(x, fin):
            pass

        try:
            payload = await request.json()

            # Workspace enforcement only applies when enforce()
            # produced an identity.
            if identity is not None:
                enforce_workspace(payload, identity)

            result = await self.dispatcher.process(
                payload, responder, request.match_info,
            )
            return web.json_response(result)

        except web.HTTPException:
            raise

        except Exception as exc:
            _mgr_logger.error(f"Exception: {exc}", exc_info=True)
            return web.json_response({"error": str(exc)})
class _RoutedSocketEndpoint:
    """WebSocket endpoint whose required capability is looked up per
    request from the URL's ``kind`` parameter. Used for the flow
    import/export streaming endpoints."""

    def __init__(self, endpoint_path, auth, dispatcher, capability_map):
        self.path = endpoint_path
        self.auth = auth
        self.dispatcher = dispatcher
        # kind (URL segment) -> capability string.
        self._capability_map = capability_map

    async def start(self):
        """No start-up work is needed for this endpoint."""
        pass

    def add_routes(self, app):
        """Register ``handle`` as the GET handler for ``self.path``."""
        app.add_routes([web.get(self.path, self.handle)])

    async def handle(self, request):
        """Authorise the request for its kind, then hand it to a
        ``SocketEndpoint``.

        Returns a 404 JSON error for an unmapped kind, the
        ``auth_failure()`` response when no ``token`` query parameter
        is supplied, and the ``access_denied()`` response when the
        caller's roles do not satisfy the capability.

        Raises:
            web.HTTPException: propagated from ``auth.authenticate()``
                when the token is rejected.
        """
        from .. capabilities import check, auth_failure, access_denied
        from . socket import _QueryTokenRequest

        kind = request.match_info.get("kind", "")
        cap = self._capability_map.get(kind)
        if cap is None:
            return web.json_response(
                {"error": "unknown kind"}, status=404,
            )

        token = request.query.get("token", "")
        if not token:
            return auth_failure()

        # authenticate() reports rejection by raising web.HTTPException.
        # Let it propagate: aiohttp turns a raised HTTPException into
        # the response, whereas returning the exception object from a
        # handler is deprecated.
        identity = await self.auth.authenticate(
            _QueryTokenRequest(token)
        )

        if not check(identity.roles, cap):
            return access_denied()

        # Delegate the websocket handling to a standalone SocketEndpoint
        # with the resolved capability, bypassing the per-request mutation
        # concern by instantiating fresh state.
        # NOTE(review): the delegate is built with the default
        # in_band_auth, so its handle() appears to repeat the same
        # token / authenticate / check sequence -- confirm whether the
        # pre-check above is intentional defence in depth or a
        # redundant IAM round trip.
        ws_ep = SocketEndpoint(
            endpoint_path=self.path,
            auth=self.auth,
            dispatcher=self.dispatcher,
            capability=cap,
        )
        return await ws_ep.handle(request)
class EndpointManager: class EndpointManager:
def __init__( def __init__(
self, dispatcher_manager, auth, prometheus_url, timeout=600 self, dispatcher_manager, auth, prometheus_url, timeout=600,
): ):
self.dispatcher_manager = dispatcher_manager self.dispatcher_manager = dispatcher_manager
self.timeout = timeout self.timeout = timeout
self.services = { # IAM forwarder (needed by AuthEndpoints). The same dispatcher
} # the global /api/v1/iam path uses. No workspace enforcement on
# auth endpoints since login / bootstrap / change-password are
# pre-identity.
self._iam_dispatcher = dispatcher_manager.dispatch_global_service()
self.endpoints = [ self.endpoints = [
# Auth surface — public / authenticated-any. Must come
# before the generic /api/v1/{kind} routes to win the
# match for /api/v1/auth/* paths. aiohttp routes in
# registration order, so we prepend here.
AuthEndpoints(
iam_dispatcher=dispatcher_manager.dispatch_auth_iam(),
auth=auth,
),
I18nPackEndpoint( I18nPackEndpoint(
endpoint_path = "/api/v1/i18n/packs/{lang}", endpoint_path="/api/v1/i18n/packs/{lang}",
auth = auth, auth=auth,
capability=PUBLIC,
), ),
MetricsEndpoint( MetricsEndpoint(
endpoint_path = "/api/metrics", endpoint_path="/api/metrics",
prometheus_url = prometheus_url, prometheus_url=prometheus_url,
auth = auth, auth=auth,
capability="metrics:read",
), ),
VariableEndpoint(
endpoint_path = "/api/v1/{kind}", auth = auth, # Global services: capability chosen per-kind.
dispatcher = dispatcher_manager.dispatch_global_service(), _RoutedVariableEndpoint(
endpoint_path="/api/v1/{kind}",
auth=auth,
dispatcher=dispatcher_manager.dispatch_global_service(),
capability_map=GLOBAL_KIND_CAPABILITY,
), ),
# /api/v1/socket: WebSocket handshake accepts
# unconditionally; the Mux dispatcher runs the
# first-frame auth protocol. Handshake-time 401s break
# browser reconnection, so authentication is always
# in-band for this endpoint.
SocketEndpoint( SocketEndpoint(
endpoint_path = "/api/v1/socket", endpoint_path="/api/v1/socket",
auth = auth, auth=auth,
dispatcher = dispatcher_manager.dispatch_socket() dispatcher=dispatcher_manager.dispatch_socket(),
capability=AUTHENTICATED, # informational only; bypassed
in_band_auth=True,
), ),
VariableEndpoint(
endpoint_path = "/api/v1/flow/{flow}/service/{kind}", # Per-flow request/response services — capability per kind.
auth = auth, _RoutedVariableEndpoint(
dispatcher = dispatcher_manager.dispatch_flow_service(), endpoint_path="/api/v1/flow/{flow}/service/{kind}",
auth=auth,
dispatcher=dispatcher_manager.dispatch_flow_service(),
capability_map=FLOW_KIND_CAPABILITY,
), ),
SocketEndpoint(
endpoint_path = "/api/v1/flow/{flow}/import/{kind}", # Per-flow streaming import/export — capability per kind.
auth = auth, _RoutedSocketEndpoint(
dispatcher = dispatcher_manager.dispatch_flow_import() endpoint_path="/api/v1/flow/{flow}/import/{kind}",
auth=auth,
dispatcher=dispatcher_manager.dispatch_flow_import(),
capability_map=FLOW_IMPORT_CAPABILITY,
), ),
SocketEndpoint( _RoutedSocketEndpoint(
endpoint_path = "/api/v1/flow/{flow}/export/{kind}", endpoint_path="/api/v1/flow/{flow}/export/{kind}",
auth = auth, auth=auth,
dispatcher = dispatcher_manager.dispatch_flow_export() dispatcher=dispatcher_manager.dispatch_flow_export(),
capability_map=FLOW_EXPORT_CAPABILITY,
),
StreamEndpoint(
endpoint_path="/api/v1/import-core",
auth=auth,
method="POST",
dispatcher=dispatcher_manager.dispatch_core_import(),
# Cross-subject import — require the admin bundle via a
# single representative capability.
capability="users:admin",
), ),
StreamEndpoint( StreamEndpoint(
endpoint_path = "/api/v1/import-core", endpoint_path="/api/v1/export-core",
auth = auth, auth=auth,
method = "POST", method="GET",
dispatcher = dispatcher_manager.dispatch_core_import(), dispatcher=dispatcher_manager.dispatch_core_export(),
capability="users:admin",
), ),
StreamEndpoint( StreamEndpoint(
endpoint_path = "/api/v1/export-core", endpoint_path="/api/v1/document-stream",
auth = auth, auth=auth,
method = "GET", method="GET",
dispatcher = dispatcher_manager.dispatch_core_export(), dispatcher=dispatcher_manager.dispatch_document_stream(),
), capability="documents:read",
StreamEndpoint(
endpoint_path = "/api/v1/document-stream",
auth = auth,
method = "GET",
dispatcher = dispatcher_manager.dispatch_document_stream(),
), ),
] ]
@ -84,4 +296,3 @@ class EndpointManager:
async def start(self): async def start(self):
for ep in self.endpoints: for ep in self.endpoints:
await ep.start() await ep.start()

View file

@ -10,17 +10,19 @@ import asyncio
import uuid import uuid
import logging import logging
from .. capabilities import enforce
logger = logging.getLogger("endpoint") logger = logging.getLogger("endpoint")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
class MetricsEndpoint: class MetricsEndpoint:
def __init__(self, prometheus_url, endpoint_path, auth): def __init__(self, prometheus_url, endpoint_path, auth, capability):
self.prometheus_url = prometheus_url self.prometheus_url = prometheus_url
self.path = endpoint_path self.path = endpoint_path
self.auth = auth self.auth = auth
self.operation = "service" self.capability = capability
async def start(self): async def start(self):
pass pass
@ -35,17 +37,7 @@ class MetricsEndpoint:
logger.debug(f"Processing metrics request: {request.path}") logger.debug(f"Processing metrics request: {request.path}")
try: await enforce(request, self.auth, self.capability)
ht = request.headers["Authorization"]
tokens = ht.split(" ", 2)
if tokens[0] != "Bearer":
return web.HTTPUnauthorized()
token = tokens[1]
except:
token = ""
if not self.auth.permitted(token, self.operation):
return web.HTTPUnauthorized()
path = request.match_info["path"] path = request.match_info["path"]
url = ( url = (

View file

@ -4,6 +4,9 @@ from aiohttp import web, WSMsgType
import logging import logging
from .. running import Running from .. running import Running
from .. capabilities import (
PUBLIC, AUTHENTICATED, check, auth_failure, access_denied,
)
logger = logging.getLogger("socket") logger = logging.getLogger("socket")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
@ -11,12 +14,25 @@ logger.setLevel(logging.INFO)
class SocketEndpoint: class SocketEndpoint:
def __init__( def __init__(
self, endpoint_path, auth, dispatcher, self, endpoint_path, auth, dispatcher, capability,
in_band_auth=False,
): ):
"""
``in_band_auth=True`` skips the handshake-time auth check.
The WebSocket handshake always succeeds; the dispatcher is
expected to gate itself via the first-frame auth protocol
(see ``Mux``).
This avoids the browser problem where a 401 on the handshake
is treated as permanent and prevents reconnection, and lets
long-lived sockets refresh their credential mid-session by
sending a new auth frame.
"""
self.path = endpoint_path self.path = endpoint_path
self.auth = auth self.auth = auth
self.operation = "socket" self.capability = capability
self.in_band_auth = in_band_auth
self.dispatcher = dispatcher self.dispatcher = dispatcher
@ -61,14 +77,28 @@ class SocketEndpoint:
raise raise
async def handle(self, request): async def handle(self, request):
"""Enhanced handler with better cleanup""" """Enhanced handler with better cleanup.
try:
token = request.query['token']
except:
token = ""
if not self.auth.permitted(token, self.operation): Auth: WebSocket clients pass the bearer token on the
return web.HTTPUnauthorized() ``?token=...`` query string; we wrap it into a synthetic
Authorization header before delegating to the standard auth
path so the IAM-backed flow (JWT / API key) applies uniformly.
The first-frame auth protocol described in the IAM spec is
a future upgrade."""
if not self.in_band_auth and self.capability != PUBLIC:
token = request.query.get("token", "")
if not token:
return auth_failure()
try:
identity = await self.auth.authenticate(
_QueryTokenRequest(token)
)
except web.HTTPException as e:
return e
if self.capability != AUTHENTICATED:
if not check(identity.roles, self.capability):
return access_denied()
# 50MB max message size # 50MB max message size
ws = web.WebSocketResponse(max_msg_size=52428800) ws = web.WebSocketResponse(max_msg_size=52428800)
@ -150,3 +180,11 @@ class SocketEndpoint:
web.get(self.path, self.handle), web.get(self.path, self.handle),
]) ])
class _QueryTokenRequest:
    """Request stand-in that carries only an ``Authorization`` header.

    Wraps a query-string token as ``Bearer <token>`` so it can be
    passed to ``IamAuth.authenticate()``, which looks up
    ``headers["Authorization"]``.
    """

    def __init__(self, token):
        bearer = "Bearer {}".format(token)
        self.headers = {"Authorization": bearer}

View file

@ -1,82 +1,64 @@
import asyncio
from aiohttp import web
import logging import logging
from aiohttp import web
from .. capabilities import enforce
logger = logging.getLogger("endpoint") logger = logging.getLogger("endpoint")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
class StreamEndpoint: class StreamEndpoint:
def __init__(self, endpoint_path, auth, dispatcher, method="POST"): def __init__(
self, endpoint_path, auth, dispatcher, capability, method="POST",
):
self.path = endpoint_path self.path = endpoint_path
self.auth = auth self.auth = auth
self.operation = "service" self.capability = capability
self.method = method self.method = method
self.dispatcher = dispatcher self.dispatcher = dispatcher
async def start(self): async def start(self):
pass pass
def add_routes(self, app): def add_routes(self, app):
if self.method == "POST": if self.method == "POST":
app.add_routes([ app.add_routes([web.post(self.path, self.handle)])
web.post(self.path, self.handle),
])
elif self.method == "GET": elif self.method == "GET":
app.add_routes([ app.add_routes([web.get(self.path, self.handle)])
web.get(self.path, self.handle),
])
else: else:
raise RuntimeError("Bad method" + self.method) raise RuntimeError("Bad method " + self.method)
async def handle(self, request): async def handle(self, request):
logger.debug(f"Processing request: {request.path}") logger.debug(f"Processing request: {request.path}")
try: await enforce(request, self.auth, self.capability)
ht = request.headers["Authorization"]
tokens = ht.split(" ", 2)
if tokens[0] != "Bearer":
return web.HTTPUnauthorized()
token = tokens[1]
except:
token = ""
if not self.auth.permitted(token, self.operation):
return web.HTTPUnauthorized()
try: try:
data = request.content data = request.content
async def error(err): async def error(err):
return web.HTTPInternalServerError(text = err) return web.HTTPInternalServerError(text=err)
async def ok( async def ok(
status=200, reason="OK", type="application/octet-stream" status=200, reason="OK",
type="application/octet-stream",
): ):
response = web.StreamResponse( response = web.StreamResponse(
status = status, reason = reason, status=status, reason=reason,
headers = {"Content-Type": type} headers={"Content-Type": type},
) )
await response.prepare(request) await response.prepare(request)
return response return response
resp = await self.dispatcher.process( resp = await self.dispatcher.process(data, error, ok, request)
data, error, ok, request
)
return resp return resp
except web.HTTPException:
raise
except Exception as e: except Exception as e:
logging.error(f"Exception: {e}") logger.error(f"Exception: {e}", exc_info=True)
return web.json_response({"error": str(e)})
return web.json_response(
{ "error": str(e) }
)

View file

@ -1,27 +1,27 @@
import asyncio
from aiohttp import web
import logging import logging
from aiohttp import web
from .. capabilities import enforce, enforce_workspace
logger = logging.getLogger("endpoint") logger = logging.getLogger("endpoint")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
class VariableEndpoint: class VariableEndpoint:
def __init__(self, endpoint_path, auth, dispatcher): def __init__(self, endpoint_path, auth, dispatcher, capability):
self.path = endpoint_path self.path = endpoint_path
self.auth = auth self.auth = auth
self.operation = "service" self.capability = capability
self.dispatcher = dispatcher self.dispatcher = dispatcher
async def start(self): async def start(self):
pass pass
def add_routes(self, app): def add_routes(self, app):
app.add_routes([ app.add_routes([
web.post(self.path, self.handle), web.post(self.path, self.handle),
]) ])
@ -30,35 +30,25 @@ class VariableEndpoint:
logger.debug(f"Processing request: {request.path}") logger.debug(f"Processing request: {request.path}")
try: identity = await enforce(request, self.auth, self.capability)
ht = request.headers["Authorization"]
tokens = ht.split(" ", 2)
if tokens[0] != "Bearer":
return web.HTTPUnauthorized()
token = tokens[1]
except:
token = ""
if not self.auth.permitted(token, self.operation):
return web.HTTPUnauthorized()
try: try:
data = await request.json() data = await request.json()
if identity is not None:
enforce_workspace(data, identity)
async def responder(x, fin): async def responder(x, fin):
pass pass
resp = await self.dispatcher.process( resp = await self.dispatcher.process(
data, responder, request.match_info data, responder, request.match_info,
) )
return web.json_response(resp) return web.json_response(resp)
except web.HTTPException:
raise
except Exception as e: except Exception as e:
logging.error(f"Exception: {e}") logger.error(f"Exception: {e}", exc_info=True)
return web.json_response({"error": str(e)})
return web.json_response(
{ "error": str(e) }
)

View file

@ -12,7 +12,7 @@ import os
from trustgraph.base.logging import setup_logging, add_logging_args from trustgraph.base.logging import setup_logging, add_logging_args
from trustgraph.base.pubsub import get_pubsub, add_pubsub_args from trustgraph.base.pubsub import get_pubsub, add_pubsub_args
from . auth import Authenticator from . auth import IamAuth
from . config.receiver import ConfigReceiver from . config.receiver import ConfigReceiver
from . dispatch.manager import DispatcherManager from . dispatch.manager import DispatcherManager
@ -35,7 +35,6 @@ default_prometheus_url = os.getenv("PROMETHEUS_URL", "http://prometheus:9090")
default_pulsar_api_key = os.getenv("PULSAR_API_KEY", None) default_pulsar_api_key = os.getenv("PULSAR_API_KEY", None)
default_timeout = 600 default_timeout = 600
default_port = 8088 default_port = 8088
default_api_token = os.getenv("GATEWAY_SECRET", "")
class Api: class Api:
@ -60,13 +59,14 @@ class Api:
if not self.prometheus_url.endswith("/"): if not self.prometheus_url.endswith("/"):
self.prometheus_url += "/" self.prometheus_url += "/"
api_token = config.get("api_token", default_api_token) # IAM-backed authentication. The legacy GATEWAY_SECRET
# shared-token path has been removed — there is no
# Token not set, or token equal empty string means no auth # "open for everyone" fallback. The gateway cannot
if api_token: # authenticate any request until IAM is reachable.
self.auth = Authenticator(token=api_token) self.auth = IamAuth(
else: backend=self.pubsub_backend,
self.auth = Authenticator(allow_all=True) id=config.get("id", "api-gateway"),
)
self.config_receiver = ConfigReceiver(self.pubsub_backend) self.config_receiver = ConfigReceiver(self.pubsub_backend)
@ -118,6 +118,7 @@ class Api:
config_receiver = self.config_receiver, config_receiver = self.config_receiver,
prefix = "gateway", prefix = "gateway",
queue_overrides = queue_overrides, queue_overrides = queue_overrides,
auth = self.auth,
) )
self.endpoint_manager = EndpointManager( self.endpoint_manager = EndpointManager(
@ -138,6 +139,12 @@ class Api:
client_max_size=256 * 1024 * 1024 client_max_size=256 * 1024 * 1024
) )
# Fetch IAM signing public key before accepting traffic.
# Blocks for a bounded retry window; the gateway starts even
# if IAM is still unreachable (JWT validation will 401 until
# the key is available).
await self.auth.start()
await self.config_receiver.start() await self.config_receiver.start()
for ep in self.endpoints: for ep in self.endpoints:
@ -189,12 +196,6 @@ def run():
help=f'API request timeout in seconds (default: {default_timeout})', help=f'API request timeout in seconds (default: {default_timeout})',
) )
parser.add_argument(
'--api-token',
default=default_api_token,
help=f'Secret API token (default: no auth)',
)
add_logging_args(parser) add_logging_args(parser)
parser.add_argument( parser.add_argument(

View file

@ -172,13 +172,29 @@ def _sign_jwt(kid, private_pem, claims):
class IamService: class IamService:
def __init__(self, host, username, password, keyspace): def __init__(self, host, username, password, keyspace,
bootstrap_mode, bootstrap_token=None):
self.table_store = IamTableStore( self.table_store = IamTableStore(
host, username, password, keyspace, host, username, password, keyspace,
) )
# Active signing key cache: (kid, private_pem, public_pem) or # bootstrap_mode: "token" or "bootstrap". In "token" mode the
# None. Loaded lazily on first use; refreshed whenever a key # service auto-seeds on first start using the provided
# is created. # bootstrap_token and the ``bootstrap`` operation is refused
# thereafter (indistinguishable from an already-bootstrapped
# deployment per the error policy). In "bootstrap" mode the
# ``bootstrap`` operation is live until tables are populated.
if bootstrap_mode not in ("token", "bootstrap"):
raise ValueError(
f"bootstrap_mode must be 'token' or 'bootstrap', "
f"got {bootstrap_mode!r}"
)
if bootstrap_mode == "token" and not bootstrap_token:
raise ValueError(
"bootstrap_mode='token' requires bootstrap_token"
)
self.bootstrap_mode = bootstrap_mode
self.bootstrap_token = bootstrap_token
self._signing_key = None self._signing_key = None
self._signing_key_lock = asyncio.Lock() self._signing_key_lock = asyncio.Lock()
@ -283,21 +299,40 @@ class IamService:
# bootstrap # bootstrap
# ------------------------------------------------------------------ # ------------------------------------------------------------------
async def handle_bootstrap(self, v): async def auto_bootstrap_if_token_mode(self):
"""No-op if any workspace already exists. Otherwise create """Called from the service processor at startup. In
the ``default`` workspace, an ``admin`` user with role ``token`` mode, if tables are empty, seeds the default
``admin``, and an initial API key for that admin. The workspace / admin / signing key using the operator-provided
plaintext API key is returned once in the response.""" bootstrap token. The admin's API key plaintext is *the*
``bootstrap_token`` the operator already knows it, nothing
needs to be returned or logged.
In ``bootstrap`` mode this is a no-op; seeding happens on
explicit ``bootstrap`` operation invocation."""
if self.bootstrap_mode != "token":
return
if await self.table_store.any_workspace_exists(): if await self.table_store.any_workspace_exists():
logger.info( logger.info(
"IAM bootstrap: tables already populated; no-op" "IAM: token mode, tables already populated; skipping "
"auto-bootstrap"
) )
return IamResponse() return
logger.info("IAM: token mode, empty tables; auto-bootstrapping")
await self._seed_tables(self.bootstrap_token)
logger.info(
"IAM: auto-bootstrap complete using operator-provided token"
)
async def _seed_tables(self, api_key_plaintext):
"""Shared seeding logic used by token-mode auto-bootstrap and
bootstrap-mode handle_bootstrap. Creates the default
workspace, admin user, admin API key (using the given
plaintext), and an initial signing key. Returns the admin
user id."""
now = _now_dt() now = _now_dt()
# Workspace.
await self.table_store.put_workspace( await self.table_store.put_workspace(
id=DEFAULT_WORKSPACE, id=DEFAULT_WORKSPACE,
name="Default", name="Default",
@ -305,11 +340,7 @@ class IamService:
created=now, created=now,
) )
# Admin user.
admin_user_id = str(uuid.uuid4()) admin_user_id = str(uuid.uuid4())
# Password is set to a random unusable value; admin logs in
# with the API key below. Password login for this user can be
# enabled later by reset-password.
admin_password = secrets.token_urlsafe(32) admin_password = secrets.token_urlsafe(32)
await self.table_store.put_user( await self.table_store.put_user(
id=admin_user_id, id=admin_user_id,
@ -324,21 +355,18 @@ class IamService:
created=now, created=now,
) )
# Admin API key.
plaintext = _generate_api_key()
key_id = str(uuid.uuid4()) key_id = str(uuid.uuid4())
await self.table_store.put_api_key( await self.table_store.put_api_key(
key_hash=_hash_api_key(plaintext), key_hash=_hash_api_key(api_key_plaintext),
id=key_id, id=key_id,
user_id=admin_user_id, user_id=admin_user_id,
name="bootstrap", name="bootstrap",
prefix=plaintext[:len(API_KEY_PREFIX) + 4], prefix=api_key_plaintext[:len(API_KEY_PREFIX) + 4],
expires=None, expires=None,
created=now, created=now,
last_used=None, last_used=None,
) )
# Initial JWT signing key.
kid, private_pem, public_pem = _generate_signing_keypair() kid, private_pem, public_pem = _generate_signing_keypair()
await self.table_store.put_signing_key( await self.table_store.put_signing_key(
kid=kid, kid=kid,
@ -347,15 +375,28 @@ class IamService:
created=now, created=now,
retired=None, retired=None,
) )
# Populate cache so login calls in this process don't go
# back to Cassandra on first use.
self._signing_key = (kid, private_pem, public_pem) self._signing_key = (kid, private_pem, public_pem)
logger.info( logger.info(
f"IAM bootstrap: created workspace={DEFAULT_WORKSPACE!r}, " f"IAM seeded: workspace={DEFAULT_WORKSPACE!r}, "
f"admin user_id={admin_user_id}, initial API key issued, " f"admin user_id={admin_user_id}, signing key kid={kid}"
f"signing key kid={kid}"
) )
return admin_user_id
async def handle_bootstrap(self, v):
"""Explicit bootstrap op. Only available in ``bootstrap``
mode and only when tables are empty. Every other case is
masked to a generic auth failure — the caller cannot
distinguish 'not in bootstrap mode' from 'already
bootstrapped' from 'operation forbidden'."""
if self.bootstrap_mode != "bootstrap":
return _err("auth-failed", "auth failure")
if await self.table_store.any_workspace_exists():
return _err("auth-failed", "auth failure")
plaintext = _generate_api_key()
admin_user_id = await self._seed_tables(plaintext)
return IamResponse( return IamResponse(
bootstrap_admin_user_id=admin_user_id, bootstrap_admin_user_id=admin_user_id,

View file

@ -39,6 +39,32 @@ class Processor(AsyncProcessor):
"iam_response_queue", default_iam_response_queue, "iam_response_queue", default_iam_response_queue,
) )
bootstrap_mode = params.get("bootstrap_mode")
bootstrap_token = params.get("bootstrap_token")
if bootstrap_mode not in ("token", "bootstrap"):
raise RuntimeError(
"iam-svc: --bootstrap-mode is required. Set to 'token' "
"(with --bootstrap-token) for production, or 'bootstrap' "
"to enable the explicit bootstrap operation over the "
"pub/sub bus (dev / quick-start only, not safe under "
"public exposure). Refusing to start."
)
if bootstrap_mode == "token" and not bootstrap_token:
raise RuntimeError(
"iam-svc: --bootstrap-mode=token requires "
"--bootstrap-token. Refusing to start."
)
if bootstrap_mode == "bootstrap" and bootstrap_token:
raise RuntimeError(
"iam-svc: --bootstrap-token is not accepted when "
"--bootstrap-mode=bootstrap. Ambiguous intent. "
"Refusing to start."
)
self.bootstrap_mode = bootstrap_mode
self.bootstrap_token = bootstrap_token
cassandra_host = params.get("cassandra_host") cassandra_host = params.get("cassandra_host")
cassandra_username = params.get("cassandra_username") cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password") cassandra_password = params.get("cassandra_password")
@ -96,12 +122,19 @@ class Processor(AsyncProcessor):
username=self.cassandra_username, username=self.cassandra_username,
password=self.cassandra_password, password=self.cassandra_password,
keyspace=keyspace, keyspace=keyspace,
bootstrap_mode=self.bootstrap_mode,
bootstrap_token=self.bootstrap_token,
) )
logger.info("IAM service initialised") logger.info(
f"IAM service initialised (bootstrap-mode={self.bootstrap_mode})"
)
async def start(self): async def start(self):
await self.pubsub.ensure_topic(self.iam_request_topic) await self.pubsub.ensure_topic(self.iam_request_topic)
# Token-mode auto-bootstrap runs before we accept requests so
# the first inbound call always sees a populated table.
await self.iam.auto_bootstrap_if_token_mode()
await self.iam_request_consumer.start() await self.iam_request_consumer.start()
async def on_iam_request(self, msg, consumer, flow): async def on_iam_request(self, msg, consumer, flow):
@ -144,6 +177,31 @@ class Processor(AsyncProcessor):
default=default_iam_response_queue, default=default_iam_response_queue,
help=f"IAM response queue (default: {default_iam_response_queue})", help=f"IAM response queue (default: {default_iam_response_queue})",
) )
parser.add_argument(
"--bootstrap-mode",
default=None,
choices=["token", "bootstrap"],
help=(
"IAM bootstrap mode (required). "
"'token' = operator supplies the initial admin API "
"key via --bootstrap-token; auto-seeds on first start, "
"bootstrap operation refused. "
"'bootstrap' = bootstrap operation is live over the "
"bus until tables are populated; a token is generated "
"and returned by tg-bootstrap-iam. Unsafe to run "
"'bootstrap' mode with public exposure."
),
)
parser.add_argument(
"--bootstrap-token",
default=None,
help=(
"Initial admin API key plaintext, required when "
"--bootstrap-mode=token. Treat as a one-time "
"credential: the operator should rotate to a new key "
"and revoke this one after first use."
),
)
add_cassandra_args(parser) add_cassandra_args(parser)