diff --git a/trustgraph-base/trustgraph/base/iam_client.py b/trustgraph-base/trustgraph/base/iam_client.py new file mode 100644 index 00000000..887b37bc --- /dev/null +++ b/trustgraph-base/trustgraph/base/iam_client.py @@ -0,0 +1,124 @@ + +from . request_response_spec import RequestResponse, RequestResponseSpec +from .. schema import ( + IamRequest, IamResponse, + UserInput, WorkspaceInput, ApiKeyInput, +) + +IAM_TIMEOUT = 10 + + +class IamClient(RequestResponse): + """Client for the IAM service request/response pub/sub protocol. + + Mirrors ``ConfigClient``: a thin wrapper around ``RequestResponse`` + that knows the IAM request / response schemas. Only the subset of + operations actually implemented by the server today has helper + methods here; callers that need an unimplemented operation can + build ``IamRequest`` and call ``request()`` directly. + """ + + async def _request(self, timeout=IAM_TIMEOUT, **kwargs): + resp = await self.request( + IamRequest(**kwargs), + timeout=timeout, + ) + if resp.error: + raise RuntimeError( + f"{resp.error.type}: {resp.error.message}" + ) + return resp + + async def bootstrap(self, timeout=IAM_TIMEOUT): + """Initial-run IAM self-seed. Returns a tuple of + ``(admin_user_id, admin_api_key_plaintext)``. Both are empty + strings on repeat calls — the operation is a no-op once the + IAM tables are populated.""" + resp = await self._request( + operation="bootstrap", timeout=timeout, + ) + return resp.bootstrap_admin_user_id, resp.bootstrap_admin_api_key + + async def resolve_api_key(self, api_key, timeout=IAM_TIMEOUT): + """Resolve a plaintext API key to its identity triple. + + Returns ``(user_id, workspace, roles)`` or raises + ``RuntimeError`` with error type ``auth-failed`` if the key is + unknown / expired / revoked.""" + resp = await self._request( + operation="resolve-api-key", + api_key=api_key, + timeout=timeout, + ) + return ( + resp.resolved_user_id, + resp.resolved_workspace, + list(resp.resolved_roles), + ) + + async def create_user(self, workspace, user, actor="", + timeout=IAM_TIMEOUT): + """Create a user. ``user`` is a ``UserInput``.""" + resp = await self._request( + operation="create-user", + workspace=workspace, + actor=actor, + user=user, + timeout=timeout, + ) + return resp.user + + async def list_users(self, workspace, actor="", timeout=IAM_TIMEOUT): + resp = await self._request( + operation="list-users", + workspace=workspace, + actor=actor, + timeout=timeout, + ) + return list(resp.users) + + async def create_api_key(self, workspace, key, actor="", + timeout=IAM_TIMEOUT): + """Create an API key. ``key`` is an ``ApiKeyInput``. Returns + ``(plaintext, record)`` — plaintext is returned once and the + caller is responsible for surfacing it to the operator.""" + resp = await self._request( + operation="create-api-key", + workspace=workspace, + actor=actor, + key=key, + timeout=timeout, + ) + return resp.api_key_plaintext, resp.api_key + + async def list_api_keys(self, workspace, user_id, actor="", + timeout=IAM_TIMEOUT): + resp = await self._request( + operation="list-api-keys", + workspace=workspace, + actor=actor, + user_id=user_id, + timeout=timeout, + ) + return list(resp.api_keys) + + async def revoke_api_key(self, workspace, key_id, actor="", + timeout=IAM_TIMEOUT): + await self._request( + operation="revoke-api-key", + workspace=workspace, + actor=actor, + key_id=key_id, + timeout=timeout, + ) + + +class IamClientSpec(RequestResponseSpec): + def __init__(self, request_name, response_name): + super().__init__( + request_name=request_name, + request_schema=IamRequest, + response_name=response_name, + response_schema=IamResponse, + impl=IamClient, + ) diff --git a/trustgraph-base/trustgraph/messaging/__init__.py b/trustgraph-base/trustgraph/messaging/__init__.py index 30f5061c..9fcfa6f7 100644 --- a/trustgraph-base/trustgraph/messaging/__init__.py +++ b/trustgraph-base/trustgraph/messaging/__init__.py @@ -15,6 +15,7 @@ from .translators.library import LibraryRequestTranslator, LibraryResponseTransl from .translators.document_loading import DocumentTranslator, TextDocumentTranslator from .translators.config import ConfigRequestTranslator, ConfigResponseTranslator from .translators.flow import FlowRequestTranslator, FlowResponseTranslator +from .translators.iam import IamRequestTranslator, IamResponseTranslator from .translators.prompt import PromptRequestTranslator, PromptResponseTranslator from .translators.tool import ToolRequestTranslator, ToolResponseTranslator from .translators.embeddings_query import ( @@ -85,11 +86,17 @@ TranslatorRegistry.register_service( ) TranslatorRegistry.register_service( - "flow", - FlowRequestTranslator(), + "flow", + FlowRequestTranslator(), FlowResponseTranslator() ) +TranslatorRegistry.register_service( + "iam", + IamRequestTranslator(), + IamResponseTranslator() +) + TranslatorRegistry.register_service( "prompt", PromptRequestTranslator(), diff --git a/trustgraph-base/trustgraph/messaging/translators/iam.py b/trustgraph-base/trustgraph/messaging/translators/iam.py new file mode 100644 index 00000000..4a717bba --- /dev/null +++ b/trustgraph-base/trustgraph/messaging/translators/iam.py @@ -0,0 +1,194 @@ +from typing import Dict, Any, Tuple + +from ...schema import IamRequest, IamResponse +from ...schema import ( + UserInput, UserRecord, + WorkspaceInput, WorkspaceRecord, + ApiKeyInput, ApiKeyRecord, +) +from .base import MessageTranslator + + +def _user_input_from_dict(d): + if d is None: + return None + return UserInput( + username=d.get("username", ""), + name=d.get("name", ""), + email=d.get("email", ""), + password=d.get("password", ""), + roles=list(d.get("roles", [])), + enabled=d.get("enabled", True), + must_change_password=d.get("must_change_password", False), + ) + + +def _workspace_input_from_dict(d): + if d is None: + return None + return WorkspaceInput( + id=d.get("id", ""), + name=d.get("name", ""), + enabled=d.get("enabled", True), + ) + + +def _api_key_input_from_dict(d): + if d is None: + return None + return ApiKeyInput( + user_id=d.get("user_id", ""), + name=d.get("name", ""), + expires=d.get("expires", ""), + ) + + +def _user_record_to_dict(r): + if r is None: + return None + return { + "id": r.id, + "workspace": r.workspace, + "username": r.username, + "name": r.name, + "email": r.email, + "roles": list(r.roles), + "enabled": r.enabled, + "must_change_password": r.must_change_password, + "created": r.created, + } + + +def _workspace_record_to_dict(r): + if r is None: + return None + return { + "id": r.id, + "name": r.name, + "enabled": r.enabled, + "created": r.created, + } + + +def _api_key_record_to_dict(r): + if r is None: + return None + return { + "id": r.id, + "user_id": r.user_id, + "name": r.name, + "prefix": r.prefix, + "expires": r.expires, + "created": r.created, + "last_used": r.last_used, + } + + +class IamRequestTranslator(MessageTranslator): + + def decode(self, data: Dict[str, Any]) -> IamRequest: + return IamRequest( + operation=data.get("operation", ""), + workspace=data.get("workspace", ""), + actor=data.get("actor", ""), + user_id=data.get("user_id", ""), + username=data.get("username", ""), + key_id=data.get("key_id", ""), + api_key=data.get("api_key", ""), + password=data.get("password", ""), + new_password=data.get("new_password", ""), + user=_user_input_from_dict(data.get("user")), + workspace_record=_workspace_input_from_dict( + data.get("workspace_record") + ), + key=_api_key_input_from_dict(data.get("key")), + ) + + def encode(self, obj: IamRequest) -> Dict[str, Any]: + result = {"operation": obj.operation} + for fname in ( + "workspace", "actor", "user_id", "username", "key_id", + "api_key", "password", "new_password", + ): + v = getattr(obj, fname, "") + if v: + result[fname] = v + if obj.user is not None: + result["user"] = { + "username": obj.user.username, + "name": obj.user.name, + "email": obj.user.email, + "password": obj.user.password, + "roles": list(obj.user.roles), + "enabled": obj.user.enabled, + "must_change_password": obj.user.must_change_password, + } + if obj.workspace_record is not None: + result["workspace_record"] = { + "id": obj.workspace_record.id, + "name": obj.workspace_record.name, + "enabled": obj.workspace_record.enabled, + } + if obj.key is not None: + result["key"] = { + "user_id": obj.key.user_id, + "name": obj.key.name, + "expires": obj.key.expires, + } + return result + + +class IamResponseTranslator(MessageTranslator): + + def decode(self, data: Dict[str, Any]) -> IamResponse: + raise NotImplementedError( + "IamResponse is a server-produced message; no HTTP→schema " + "path is needed" + ) + + def encode(self, obj: IamResponse) -> Dict[str, Any]: + result: Dict[str, Any] = {} + + if obj.user is not None: + result["user"] = _user_record_to_dict(obj.user) + if obj.users: + result["users"] = [_user_record_to_dict(u) for u in obj.users] + if obj.workspace is not None: + result["workspace"] = _workspace_record_to_dict(obj.workspace) + if obj.workspaces: + result["workspaces"] = [ + _workspace_record_to_dict(w) for w in obj.workspaces + ] + if obj.api_key_plaintext: + result["api_key_plaintext"] = obj.api_key_plaintext + if obj.api_key is not None: + result["api_key"] = _api_key_record_to_dict(obj.api_key) + if obj.api_keys: + result["api_keys"] = [ + _api_key_record_to_dict(k) for k in obj.api_keys + ] + if obj.jwt: + result["jwt"] = obj.jwt + if obj.jwt_expires: + result["jwt_expires"] = obj.jwt_expires + if obj.signing_key_public: + result["signing_key_public"] = obj.signing_key_public + if obj.resolved_user_id: + result["resolved_user_id"] = obj.resolved_user_id + if obj.resolved_workspace: + result["resolved_workspace"] = obj.resolved_workspace + if obj.resolved_roles: + result["resolved_roles"] = list(obj.resolved_roles) + if obj.temporary_password: + result["temporary_password"] = obj.temporary_password + if obj.bootstrap_admin_user_id: + result["bootstrap_admin_user_id"] = obj.bootstrap_admin_user_id + if obj.bootstrap_admin_api_key: + result["bootstrap_admin_api_key"] = obj.bootstrap_admin_api_key + + return result + + def encode_with_completion( + self, obj: IamResponse, + ) -> Tuple[Dict[str, Any], bool]: + return self.encode(obj), True diff --git a/trustgraph-base/trustgraph/schema/services/__init__.py b/trustgraph-base/trustgraph/schema/services/__init__.py index 550b7d12..2a214201 100644 --- a/trustgraph-base/trustgraph/schema/services/__init__.py +++ b/trustgraph-base/trustgraph/schema/services/__init__.py @@ -5,6 +5,7 @@ from .agent import * from .flow import * from .prompt import * from .config import * +from .iam import * from .library import * from .lookup import * from .nlp_query import * diff --git a/trustgraph-base/trustgraph/schema/services/iam.py b/trustgraph-base/trustgraph/schema/services/iam.py new file mode 100644 index 00000000..1e3ab1ab --- /dev/null +++ b/trustgraph-base/trustgraph/schema/services/iam.py @@ -0,0 +1,142 @@ + +from dataclasses import dataclass, field + +from ..core.topic import queue +from ..core.primitives import Error + +############################################################################ + +# IAM service — see docs/tech-specs/iam-protocol.md for the full protocol. +# +# Transport: request/response pub/sub, correlated by the `id` message +# property. Caller is the API gateway only; the IAM service trusts +# the bus per the enforcement-boundary policy (no per-request auth +# against the caller). + + +@dataclass +class UserInput: + username: str = "" + name: str = "" + email: str = "" + # Only populated on create-user; never on update-user. + password: str = "" + roles: list[str] = field(default_factory=list) + enabled: bool = True + must_change_password: bool = False + + +@dataclass +class UserRecord: + id: str = "" + workspace: str = "" + username: str = "" + name: str = "" + email: str = "" + roles: list[str] = field(default_factory=list) + enabled: bool = True + must_change_password: bool = False + created: str = "" + + +@dataclass +class WorkspaceInput: + id: str = "" + name: str = "" + enabled: bool = True + + +@dataclass +class WorkspaceRecord: + id: str = "" + name: str = "" + enabled: bool = True + created: str = "" + + +@dataclass +class ApiKeyInput: + user_id: str = "" + name: str = "" + expires: str = "" + + +@dataclass +class ApiKeyRecord: + id: str = "" + user_id: str = "" + name: str = "" + # First 4 chars of the plaintext token, for operator identification + # in list-api-keys. Never enough to reconstruct the key. + prefix: str = "" + expires: str = "" + created: str = "" + last_used: str = "" + + +@dataclass +class IamRequest: + operation: str = "" + + # Workspace scope. Required on workspace-scoped operations; + # omitted for system-level ops (workspace CRUD, signing-key + # ops, bootstrap, resolve-api-key, login). + workspace: str = "" + + # Acting user id for audit. Empty for internal-origin and for + # operations that resolve an identity (login, resolve-api-key). + actor: str = "" + + user_id: str = "" + username: str = "" + key_id: str = "" + api_key: str = "" + + password: str = "" + new_password: str = "" + + user: UserInput | None = None + workspace_record: WorkspaceInput | None = None + key: ApiKeyInput | None = None + + +@dataclass +class IamResponse: + user: UserRecord | None = None + users: list[UserRecord] = field(default_factory=list) + + workspace: WorkspaceRecord | None = None + workspaces: list[WorkspaceRecord] = field(default_factory=list) + + # create-api-key returns the plaintext once; never populated + # on any other operation. + api_key_plaintext: str = "" + api_key: ApiKeyRecord | None = None + api_keys: list[ApiKeyRecord] = field(default_factory=list) + + # login, rotate-signing-key + jwt: str = "" + jwt_expires: str = "" + + # get-signing-key-public + signing_key_public: str = "" + + # resolve-api-key + resolved_user_id: str = "" + resolved_workspace: str = "" + resolved_roles: list[str] = field(default_factory=list) + + # reset-password + temporary_password: str = "" + + # bootstrap + bootstrap_admin_user_id: str = "" + bootstrap_admin_api_key: str = "" + + error: Error | None = None + + +iam_request_queue = queue('iam', cls='request') +iam_response_queue = queue('iam', cls='response') + +############################################################################ diff --git a/trustgraph-flow/pyproject.toml b/trustgraph-flow/pyproject.toml index cc7dac63..d8c690b5 100644 --- a/trustgraph-flow/pyproject.toml +++ b/trustgraph-flow/pyproject.toml @@ -63,6 +63,7 @@ chunker-token = "trustgraph.chunking.token:run" bootstrap = "trustgraph.bootstrap.bootstrapper:run" config-svc = "trustgraph.config.service:run" flow-svc = "trustgraph.flow.service:run" +iam-svc = "trustgraph.iam.service:run" doc-embeddings-query-milvus = "trustgraph.query.doc_embeddings.milvus:run" doc-embeddings-query-pinecone = "trustgraph.query.doc_embeddings.pinecone:run" doc-embeddings-query-qdrant = "trustgraph.query.doc_embeddings.qdrant:run" diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/iam.py b/trustgraph-flow/trustgraph/gateway/dispatch/iam.py new file mode 100644 index 00000000..386233f5 --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/dispatch/iam.py @@ -0,0 +1,40 @@ + +from ... schema import IamRequest, IamResponse +from ... schema import iam_request_queue, iam_response_queue +from ... messaging import TranslatorRegistry + +from . requestor import ServiceRequestor + + +class IamRequestor(ServiceRequestor): + def __init__(self, backend, consumer, subscriber, timeout=120, + request_queue=None, response_queue=None): + + if request_queue is None: + request_queue = iam_request_queue + if response_queue is None: + response_queue = iam_response_queue + + super().__init__( + backend=backend, + consumer_name=consumer, + subscription=subscriber, + request_queue=request_queue, + response_queue=response_queue, + request_schema=IamRequest, + response_schema=IamResponse, + timeout=timeout, + ) + + self.request_translator = ( + TranslatorRegistry.get_request_translator("iam") + ) + self.response_translator = ( + TranslatorRegistry.get_response_translator("iam") + ) + + def to_request(self, body): + return self.request_translator.decode(body) + + def from_response(self, message): + return self.response_translator.encode_with_completion(message) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py index b238bb5b..95a0ab66 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py @@ -9,6 +9,7 @@ logger = logging.getLogger(__name__) from . config import ConfigRequestor from . flow import FlowRequestor +from . iam import IamRequestor from . librarian import LibrarianRequestor from . knowledge import KnowledgeRequestor from . collection_management import CollectionManagementRequestor @@ -72,6 +73,7 @@ request_response_dispatchers = { global_dispatchers = { "config": ConfigRequestor, "flow": FlowRequestor, + "iam": IamRequestor, "librarian": LibrarianRequestor, "knowledge": KnowledgeRequestor, "collection-management": CollectionManagementRequestor, diff --git a/trustgraph-flow/trustgraph/iam/__init__.py b/trustgraph-flow/trustgraph/iam/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph-flow/trustgraph/iam/service/__init__.py b/trustgraph-flow/trustgraph/iam/service/__init__.py new file mode 100644 index 00000000..c8fd7a22 --- /dev/null +++ b/trustgraph-flow/trustgraph/iam/service/__init__.py @@ -0,0 +1,4 @@ + +from . service import run + +__all__ = ["run"] diff --git a/trustgraph-flow/trustgraph/iam/service/__main__.py b/trustgraph-flow/trustgraph/iam/service/__main__.py new file mode 100644 index 00000000..a731dd63 --- /dev/null +++ b/trustgraph-flow/trustgraph/iam/service/__main__.py @@ -0,0 +1,4 @@ + +from . service import run + +run() diff --git a/trustgraph-flow/trustgraph/iam/service/iam.py b/trustgraph-flow/trustgraph/iam/service/iam.py new file mode 100644 index 00000000..45bd01f6 --- /dev/null +++ b/trustgraph-flow/trustgraph/iam/service/iam.py @@ -0,0 +1,474 @@ +""" +IAM business logic. Handles ``IamRequest`` messages and builds +``IamResponse`` messages. Does not concern itself with transport. + +See docs/tech-specs/iam-protocol.md for the wire-level contract and +docs/tech-specs/iam.md for the surrounding architecture. +""" + +import base64 +import datetime +import hashlib +import logging +import os +import secrets +import uuid + +from trustgraph.schema import ( + IamResponse, Error, + UserRecord, WorkspaceRecord, ApiKeyRecord, +) + +from ... tables.iam import IamTableStore + +logger = logging.getLogger(__name__) + + +DEFAULT_WORKSPACE = "default" +BOOTSTRAP_ADMIN_USERNAME = "admin" +BOOTSTRAP_ADMIN_NAME = "Administrator" + +PBKDF2_ITERATIONS = 600_000 +API_KEY_PREFIX = "tg_" +API_KEY_RANDOM_BYTES = 24 + + +def _now_iso(): + return datetime.datetime.now(datetime.timezone.utc).isoformat() + + +def _now_dt(): + return datetime.datetime.now(datetime.timezone.utc) + + +def _iso(dt): + if dt is None: + return "" + if isinstance(dt, str): + return dt + if dt.tzinfo is None: + dt = dt.replace(tzinfo=datetime.timezone.utc) + return dt.isoformat() + + +def _hash_password(password): + """Return an encoded PBKDF2-SHA-256 hash of ``password``. + + Format: ``pbkdf2-sha256$$$``. Stored + verbatim in the password_hash column so the algorithm and cost + can be evolved later (new rows get a new prefix; old rows are + verified with their own parameters). + """ + salt = os.urandom(16) + dk = hashlib.pbkdf2_hmac( + "sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS, + ) + return ( + f"pbkdf2-sha256${PBKDF2_ITERATIONS}" + f"${base64.b64encode(salt).decode('ascii')}" + f"${base64.b64encode(dk).decode('ascii')}" + ) + + +def _verify_password(password, encoded): + """Constant-time verify ``password`` against an encoded hash.""" + try: + algo, iters, b64_salt, b64_hash = encoded.split("$") + except ValueError: + return False + if algo != "pbkdf2-sha256": + return False + try: + iters = int(iters) + salt = base64.b64decode(b64_salt) + target = base64.b64decode(b64_hash) + except Exception: + return False + dk = hashlib.pbkdf2_hmac( + "sha256", password.encode("utf-8"), salt, iters, + ) + return secrets.compare_digest(dk, target) + + +def _generate_api_key(): + """Return a fresh API-key plaintext of the form ``tg_``.""" + return API_KEY_PREFIX + secrets.token_urlsafe(API_KEY_RANDOM_BYTES) + + +def _hash_api_key(plaintext): + """SHA-256 hex digest of an API key plaintext. Used as the + primary key in ``iam_api_keys`` so ``resolve-api-key`` is O(1).""" + return hashlib.sha256(plaintext.encode("utf-8")).hexdigest() + + +def _err(type, message): + return IamResponse(error=Error(type=type, message=message)) + + +def _parse_expires(s): + if not s: + return None + try: + return datetime.datetime.fromisoformat(s) + except Exception: + return None + + +class IamService: + + def __init__(self, host, username, password, keyspace): + self.table_store = IamTableStore( + host, username, password, keyspace, + ) + + # ------------------------------------------------------------------ + # Dispatch + # ------------------------------------------------------------------ + + async def handle(self, v): + op = v.operation + + try: + if op == "bootstrap": + return await self.handle_bootstrap(v) + if op == "resolve-api-key": + return await self.handle_resolve_api_key(v) + if op == "create-user": + return await self.handle_create_user(v) + if op == "list-users": + return await self.handle_list_users(v) + if op == "create-api-key": + return await self.handle_create_api_key(v) + if op == "list-api-keys": + return await self.handle_list_api_keys(v) + if op == "revoke-api-key": + return await self.handle_revoke_api_key(v) + + return _err( + "invalid-argument", + f"unknown or not-yet-implemented operation: {op!r}", + ) + + except Exception as e: + logger.error( + f"IAM {op} failed: {type(e).__name__}: {e}", + exc_info=True, + ) + return _err("internal-error", str(e)) + + # ------------------------------------------------------------------ + # Record conversion + # ------------------------------------------------------------------ + + def _row_to_user_record(self, row): + ( + id, workspace, username, name, email, _password_hash, + roles, enabled, must_change_password, created, + ) = row + return UserRecord( + id=id or "", + workspace=workspace or "", + username=username or "", + name=name or "", + email=email or "", + roles=sorted(roles) if roles else [], + enabled=bool(enabled), + must_change_password=bool(must_change_password), + created=_iso(created), + ) + + def _row_to_api_key_record(self, row): + ( + _key_hash, id, user_id, name, prefix, expires, + created, last_used, + ) = row + return ApiKeyRecord( + id=id or "", + user_id=user_id or "", + name=name or "", + prefix=prefix or "", + expires=_iso(expires), + created=_iso(created), + last_used=_iso(last_used), + ) + + # ------------------------------------------------------------------ + # bootstrap + # ------------------------------------------------------------------ + + async def handle_bootstrap(self, v): + """No-op if any workspace already exists. Otherwise create + the ``default`` workspace, an ``admin`` user with role + ``admin``, and an initial API key for that admin. The + plaintext API key is returned once in the response.""" + + if await self.table_store.any_workspace_exists(): + logger.info( + "IAM bootstrap: tables already populated; no-op" + ) + return IamResponse() + + now = _now_dt() + + # Workspace. + await self.table_store.put_workspace( + id=DEFAULT_WORKSPACE, + name="Default", + enabled=True, + created=now, + ) + + # Admin user. + admin_user_id = str(uuid.uuid4()) + # Password is set to a random unusable value; admin logs in + # with the API key below. Password login for this user can be + # enabled later by reset-password. + admin_password = secrets.token_urlsafe(32) + await self.table_store.put_user( + id=admin_user_id, + workspace=DEFAULT_WORKSPACE, + username=BOOTSTRAP_ADMIN_USERNAME, + name=BOOTSTRAP_ADMIN_NAME, + email="", + password_hash=_hash_password(admin_password), + roles=["admin"], + enabled=True, + must_change_password=True, + created=now, + ) + + # Admin API key. + plaintext = _generate_api_key() + key_id = str(uuid.uuid4()) + await self.table_store.put_api_key( + key_hash=_hash_api_key(plaintext), + id=key_id, + user_id=admin_user_id, + name="bootstrap", + prefix=plaintext[:len(API_KEY_PREFIX) + 4], + expires=None, + created=now, + last_used=None, + ) + + logger.info( + f"IAM bootstrap: created workspace={DEFAULT_WORKSPACE!r}, " + f"admin user_id={admin_user_id}, initial API key issued" + ) + + return IamResponse( + bootstrap_admin_user_id=admin_user_id, + bootstrap_admin_api_key=plaintext, + ) + + # ------------------------------------------------------------------ + # resolve-api-key + # ------------------------------------------------------------------ + + async def handle_resolve_api_key(self, v): + if not v.api_key: + return _err("auth-failed", "no api key") + + row = await self.table_store.get_api_key_by_hash( + _hash_api_key(v.api_key), + ) + if row is None: + return _err("auth-failed", "unknown api key") + + ( + _key_hash, _id, user_id, _name, _prefix, expires, + _created, _last_used, + ) = row + + if expires is not None: + exp_dt = expires + if isinstance(exp_dt, str): + exp_dt = datetime.datetime.fromisoformat(exp_dt) + if exp_dt.tzinfo is None: + exp_dt = exp_dt.replace(tzinfo=datetime.timezone.utc) + if exp_dt < _now_dt(): + return _err("auth-failed", "api key expired") + + user_row = await self.table_store.get_user(user_id) + if user_row is None: + return _err("auth-failed", "owning user missing") + user = self._row_to_user_record(user_row) + if not user.enabled: + return _err("auth-failed", "owning user disabled") + + # Workspace-disabled check. + ws_row = await self.table_store.get_workspace(user.workspace) + if ws_row is None or not ws_row[2]: + return _err("auth-failed", "owning workspace disabled") + + return IamResponse( + resolved_user_id=user.id, + resolved_workspace=user.workspace, + resolved_roles=list(user.roles), + ) + + # ------------------------------------------------------------------ + # create-user + # ------------------------------------------------------------------ + + async def handle_create_user(self, v): + if not v.workspace: + return _err( + "invalid-argument", "workspace required for create-user", + ) + if v.user is None: + return _err( + "invalid-argument", "user field required for create-user", + ) + if not v.user.username: + return _err("invalid-argument", "user.username required") + if not v.user.password: + return _err("invalid-argument", "user.password required") + + # Workspace must exist and be enabled. + ws = await self.table_store.get_workspace(v.workspace) + if ws is None or not ws[2]: + return _err("not-found", "workspace not found or disabled") + + # Uniqueness on username within workspace. + existing = await self.table_store.get_user_id_by_username( + v.workspace, v.user.username, + ) + if existing: + return _err("duplicate", "username already exists") + + user_id = str(uuid.uuid4()) + now = _now_dt() + + await self.table_store.put_user( + id=user_id, + workspace=v.workspace, + username=v.user.username, + name=v.user.name or v.user.username, + email=v.user.email or "", + password_hash=_hash_password(v.user.password), + roles=list(v.user.roles or []), + enabled=v.user.enabled, + must_change_password=v.user.must_change_password, + created=now, + ) + + row = await self.table_store.get_user(user_id) + return IamResponse(user=self._row_to_user_record(row)) + + # ------------------------------------------------------------------ + # list-users + # ------------------------------------------------------------------ + + async def handle_list_users(self, v): + if not v.workspace: + return _err( + "invalid-argument", "workspace required for list-users", + ) + + rows = await self.table_store.list_users_by_workspace(v.workspace) + return IamResponse( + users=[self._row_to_user_record(r) for r in rows], + ) + + # ------------------------------------------------------------------ + # create-api-key + # ------------------------------------------------------------------ + + async def handle_create_api_key(self, v): + if not v.workspace: + return _err( + "invalid-argument", "workspace required for create-api-key", + ) + if v.key is None or not v.key.user_id: + return _err("invalid-argument", "key.user_id required") + if not v.key.name: + return _err("invalid-argument", "key.name required") + + # Target user must exist and belong to the caller's workspace. + user_row = await self.table_store.get_user(v.key.user_id) + if user_row is None: + return _err("not-found", "user not found") + if user_row[1] != v.workspace: + return _err( + "operation-not-permitted", + "target user is in a different workspace", + ) + + plaintext = _generate_api_key() + key_id = str(uuid.uuid4()) + now = _now_dt() + expires_dt = _parse_expires(v.key.expires) + + await self.table_store.put_api_key( + key_hash=_hash_api_key(plaintext), + id=key_id, + user_id=v.key.user_id, + name=v.key.name, + prefix=plaintext[:len(API_KEY_PREFIX) + 4], + expires=expires_dt, + created=now, + last_used=None, + ) + + row = await self.table_store.get_api_key_by_hash( + _hash_api_key(plaintext), + ) + return IamResponse( + api_key_plaintext=plaintext, + api_key=self._row_to_api_key_record(row), + ) + + # ------------------------------------------------------------------ + # list-api-keys + # ------------------------------------------------------------------ + + async def handle_list_api_keys(self, v): + if not v.workspace: + return _err( + "invalid-argument", + "workspace required for list-api-keys", + ) + if not v.user_id: + return _err( + "invalid-argument", "user_id required for list-api-keys", + ) + + # Workspace-scope check: user must live in this workspace. + user_row = await self.table_store.get_user(v.user_id) + if user_row is None or user_row[1] != v.workspace: + return _err("not-found", "user not found in workspace") + + rows = await self.table_store.list_api_keys_by_user(v.user_id) + return IamResponse( + api_keys=[self._row_to_api_key_record(r) for r in rows], + ) + + # ------------------------------------------------------------------ + # revoke-api-key + # ------------------------------------------------------------------ + + async def handle_revoke_api_key(self, v): + if not v.workspace: + return _err( + "invalid-argument", + "workspace required for revoke-api-key", + ) + if not v.key_id: + return _err("invalid-argument", "key_id required") + + row = await self.table_store.get_api_key_by_id(v.key_id) + if row is None: + return _err("not-found", "api key not found") + + key_hash, _id, user_id, _name, _prefix, _expires, _c, _lu = row + # Workspace-scope check via the owning user. + user_row = await self.table_store.get_user(user_id) + if user_row is None or user_row[1] != v.workspace: + return _err( + "operation-not-permitted", + "key belongs to a different workspace", + ) + + await self.table_store.delete_api_key(key_hash) + return IamResponse() diff --git a/trustgraph-flow/trustgraph/iam/service/service.py b/trustgraph-flow/trustgraph/iam/service/service.py new file mode 100644 index 00000000..61bc1fd8 --- /dev/null +++ b/trustgraph-flow/trustgraph/iam/service/service.py @@ -0,0 +1,152 @@ +""" +IAM service processor. Terminates the IAM request queue and forwards +each request to the IamService business logic, then returns the +response on the IAM response queue. + +Shape mirrors trustgraph.config.service. +""" + +import logging + +from trustgraph.schema import Error +from trustgraph.schema import IamRequest, IamResponse +from trustgraph.schema import iam_request_queue, iam_response_queue + +from trustgraph.base import AsyncProcessor, Consumer, Producer +from trustgraph.base import ConsumerMetrics, ProducerMetrics +from trustgraph.base.cassandra_config import ( + add_cassandra_args, resolve_cassandra_config, +) + +from . iam import IamService + +logger = logging.getLogger(__name__) + +default_ident = "iam-svc" + +default_iam_request_queue = iam_request_queue +default_iam_response_queue = iam_response_queue + + +class Processor(AsyncProcessor): + + def __init__(self, **params): + + iam_req_q = params.get( + "iam_request_queue", default_iam_request_queue, + ) + iam_resp_q = params.get( + "iam_response_queue", default_iam_response_queue, + ) + + cassandra_host = params.get("cassandra_host") + cassandra_username = params.get("cassandra_username") + cassandra_password = params.get("cassandra_password") + + hosts, username, password, keyspace = resolve_cassandra_config( + host=cassandra_host, + username=cassandra_username, + password=cassandra_password, + default_keyspace="iam", + ) + + self.cassandra_host = hosts + self.cassandra_username = username + self.cassandra_password = password + + super().__init__( + **params | { + "iam_request_schema": IamRequest.__name__, + "iam_response_schema": IamResponse.__name__, + "cassandra_host": self.cassandra_host, + "cassandra_username": self.cassandra_username, + "cassandra_password": self.cassandra_password, + } + ) + + iam_request_metrics = ConsumerMetrics( + processor=self.id, flow=None, name="iam-request", + ) + iam_response_metrics = ProducerMetrics( + processor=self.id, flow=None, name="iam-response", + ) + + self.iam_request_topic = iam_req_q + + self.iam_request_consumer = Consumer( + taskgroup=self.taskgroup, + backend=self.pubsub, + flow=None, + topic=iam_req_q, + subscriber=self.id, + schema=IamRequest, + handler=self.on_iam_request, + metrics=iam_request_metrics, + ) + + self.iam_response_producer = Producer( + backend=self.pubsub, + topic=iam_resp_q, + schema=IamResponse, + metrics=iam_response_metrics, + ) + + self.iam = IamService( + host=self.cassandra_host, + username=self.cassandra_username, + password=self.cassandra_password, + keyspace=keyspace, + ) + + logger.info("IAM service initialised") + + async def start(self): + await self.pubsub.ensure_topic(self.iam_request_topic) + await self.iam_request_consumer.start() + + async def on_iam_request(self, msg, consumer, flow): + + id = None + try: + v = msg.value() + id = msg.properties()["id"] + logger.debug( + f"Handling IAM request {id} op={v.operation!r}" + ) + resp = await self.iam.handle(v) + await self.iam_response_producer.send( + resp, properties={"id": id}, + ) + except Exception as e: + logger.error( + f"IAM request failed: {type(e).__name__}: {e}", + exc_info=True, + ) + resp = IamResponse( + error=Error(type="internal-error", message=str(e)), + ) + if id is not None: + await self.iam_response_producer.send( + resp, properties={"id": id}, + ) + + @staticmethod + def add_args(parser): + AsyncProcessor.add_args(parser) + + parser.add_argument( + "--iam-request-queue", + default=default_iam_request_queue, + help=f"IAM request queue (default: {default_iam_request_queue})", + ) + parser.add_argument( + "--iam-response-queue", + default=default_iam_response_queue, + help=f"IAM response queue (default: {default_iam_response_queue})", + ) + + add_cassandra_args(parser) + + +def run(): + Processor.launch(default_ident, __doc__) diff --git a/trustgraph-flow/trustgraph/tables/iam.py b/trustgraph-flow/trustgraph/tables/iam.py new file mode 100644 index 00000000..5c9f578a --- /dev/null +++ b/trustgraph-flow/trustgraph/tables/iam.py @@ -0,0 +1,339 @@ +""" +IAM Cassandra table store. + +Tables: + - iam_workspaces (id primary key) + - iam_users (id primary key) + iam_users_by_username lookup table + (workspace, username) -> id + - iam_api_keys (key_hash primary key) with secondary index on user_id + - iam_signing_keys (kid primary key) — RSA keypairs for JWT signing + +See docs/tech-specs/iam-protocol.md for the wire-level context. +""" + +import logging + +from cassandra.cluster import Cluster +from cassandra.auth import PlainTextAuthProvider +from ssl import SSLContext, PROTOCOL_TLSv1_2 + +from . cassandra_async import async_execute + +logger = logging.getLogger(__name__) + + +class IamTableStore: + + def __init__( + self, + cassandra_host, cassandra_username, cassandra_password, + keyspace, + ): + self.keyspace = keyspace + + logger.info("IAM: connecting to Cassandra...") + + if isinstance(cassandra_host, str): + cassandra_host = [h.strip() for h in cassandra_host.split(",")] + + if cassandra_username and cassandra_password: + ssl_context = SSLContext(PROTOCOL_TLSv1_2) + auth_provider = PlainTextAuthProvider( + username=cassandra_username, password=cassandra_password, + ) + self.cluster = Cluster( + cassandra_host, + auth_provider=auth_provider, + ssl_context=ssl_context, + ) + else: + self.cluster = Cluster(cassandra_host) + + self.cassandra = self.cluster.connect() + + logger.info("IAM: connected.") + + self._ensure_schema() + self._prepare_statements() + + def _ensure_schema(self): + # FIXME: Replication factor should be configurable. + self.cassandra.execute(f""" + create keyspace if not exists {self.keyspace} + with replication = {{ + 'class' : 'SimpleStrategy', + 'replication_factor' : 1 + }}; + """) + self.cassandra.set_keyspace(self.keyspace) + + self.cassandra.execute(""" + CREATE TABLE IF NOT EXISTS iam_workspaces ( + id text PRIMARY KEY, + name text, + enabled boolean, + created timestamp + ); + """) + + self.cassandra.execute(""" + CREATE TABLE IF NOT EXISTS iam_users ( + id text PRIMARY KEY, + workspace text, + username text, + name text, + email text, + password_hash text, + roles set, + enabled boolean, + must_change_password boolean, + created timestamp + ); + """) + + self.cassandra.execute(""" + CREATE TABLE IF NOT EXISTS iam_users_by_username ( + workspace text, + username text, + user_id text, + PRIMARY KEY ((workspace), username) + ); + """) + + self.cassandra.execute(""" + CREATE TABLE IF NOT EXISTS iam_api_keys ( + key_hash text PRIMARY KEY, + id text, + user_id text, + name text, + prefix text, + expires timestamp, + created timestamp, + last_used timestamp + ); + """) + + self.cassandra.execute(""" + CREATE INDEX IF NOT EXISTS iam_api_keys_user_id_idx + ON iam_api_keys (user_id); + """) + + self.cassandra.execute(""" + CREATE INDEX IF NOT EXISTS iam_api_keys_id_idx + ON iam_api_keys (id); + """) + + self.cassandra.execute(""" + CREATE TABLE IF NOT EXISTS iam_signing_keys ( + kid text PRIMARY KEY, + private_pem text, + public_pem text, + created timestamp, + retired timestamp + ); + """) + + logger.info("IAM: Cassandra schema OK.") + + def _prepare_statements(self): + c = self.cassandra + + self.put_workspace_stmt = c.prepare(""" + INSERT INTO iam_workspaces (id, name, enabled, created) + VALUES (?, ?, ?, ?) + """) + self.get_workspace_stmt = c.prepare(""" + SELECT id, name, enabled, created FROM iam_workspaces + WHERE id = ? + """) + self.list_workspaces_stmt = c.prepare(""" + SELECT id, name, enabled, created FROM iam_workspaces + """) + + self.put_user_stmt = c.prepare(""" + INSERT INTO iam_users ( + id, workspace, username, name, email, password_hash, + roles, enabled, must_change_password, created + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """) + self.get_user_stmt = c.prepare(""" + SELECT id, workspace, username, name, email, password_hash, + roles, enabled, must_change_password, created + FROM iam_users WHERE id = ? + """) + self.list_users_by_workspace_stmt = c.prepare(""" + SELECT id, workspace, username, name, email, password_hash, + roles, enabled, must_change_password, created + FROM iam_users WHERE workspace = ? ALLOW FILTERING + """) + + self.put_username_lookup_stmt = c.prepare(""" + INSERT INTO iam_users_by_username (workspace, username, user_id) + VALUES (?, ?, ?) + """) + self.get_user_id_by_username_stmt = c.prepare(""" + SELECT user_id FROM iam_users_by_username + WHERE workspace = ? AND username = ? + """) + self.delete_username_lookup_stmt = c.prepare(""" + DELETE FROM iam_users_by_username + WHERE workspace = ? AND username = ? + """) + + self.put_api_key_stmt = c.prepare(""" + INSERT INTO iam_api_keys ( + key_hash, id, user_id, name, prefix, expires, + created, last_used + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """) + self.get_api_key_by_hash_stmt = c.prepare(""" + SELECT key_hash, id, user_id, name, prefix, expires, + created, last_used + FROM iam_api_keys WHERE key_hash = ? + """) + self.get_api_key_by_id_stmt = c.prepare(""" + SELECT key_hash, id, user_id, name, prefix, expires, + created, last_used + FROM iam_api_keys WHERE id = ? + """) + self.list_api_keys_by_user_stmt = c.prepare(""" + SELECT key_hash, id, user_id, name, prefix, expires, + created, last_used + FROM iam_api_keys WHERE user_id = ? + """) + self.delete_api_key_stmt = c.prepare(""" + DELETE FROM iam_api_keys WHERE key_hash = ? + """) + + self.put_signing_key_stmt = c.prepare(""" + INSERT INTO iam_signing_keys ( + kid, private_pem, public_pem, created, retired + ) + VALUES (?, ?, ?, ?, ?) + """) + self.list_signing_keys_stmt = c.prepare(""" + SELECT kid, private_pem, public_pem, created, retired + FROM iam_signing_keys + """) + + # ------------------------------------------------------------------ + # Workspaces + # ------------------------------------------------------------------ + + async def put_workspace(self, id, name, enabled, created): + await async_execute( + self.cassandra, self.put_workspace_stmt, + (id, name, enabled, created), + ) + + async def get_workspace(self, id): + rows = await async_execute( + self.cassandra, self.get_workspace_stmt, (id,), + ) + return rows[0] if rows else None + + async def list_workspaces(self): + return await async_execute( + self.cassandra, self.list_workspaces_stmt, + ) + + # ------------------------------------------------------------------ + # Users + # ------------------------------------------------------------------ + + async def put_user( + self, id, workspace, username, name, email, password_hash, + roles, enabled, must_change_password, created, + ): + await async_execute( + self.cassandra, self.put_user_stmt, + ( + id, workspace, username, name, email, password_hash, + set(roles) if roles else set(), + enabled, must_change_password, created, + ), + ) + await async_execute( + self.cassandra, self.put_username_lookup_stmt, + (workspace, username, id), + ) + + async def get_user(self, id): + rows = await async_execute( + self.cassandra, self.get_user_stmt, (id,), + ) + return rows[0] if rows else None + + async def get_user_id_by_username(self, workspace, username): + rows = await async_execute( + self.cassandra, self.get_user_id_by_username_stmt, + (workspace, username), + ) + return rows[0][0] if rows else None + + async def list_users_by_workspace(self, workspace): + return await async_execute( + self.cassandra, self.list_users_by_workspace_stmt, (workspace,), + ) + + # ------------------------------------------------------------------ + # API keys + # ------------------------------------------------------------------ + + async def put_api_key( + self, key_hash, id, user_id, name, prefix, expires, + created, last_used, + ): + await async_execute( + self.cassandra, self.put_api_key_stmt, + (key_hash, id, user_id, name, prefix, expires, + created, last_used), + ) + + async def get_api_key_by_hash(self, key_hash): + rows = await async_execute( + self.cassandra, self.get_api_key_by_hash_stmt, (key_hash,), + ) + return rows[0] if rows else None + + async def get_api_key_by_id(self, id): + rows = await async_execute( + self.cassandra, self.get_api_key_by_id_stmt, (id,), + ) + return rows[0] if rows else None + + async def list_api_keys_by_user(self, user_id): + return await async_execute( + self.cassandra, self.list_api_keys_by_user_stmt, (user_id,), + ) + + async def delete_api_key(self, key_hash): + await async_execute( + self.cassandra, self.delete_api_key_stmt, (key_hash,), + ) + + # ------------------------------------------------------------------ + # Signing keys + # ------------------------------------------------------------------ + + async def put_signing_key(self, kid, private_pem, public_pem, + created, retired): + await async_execute( + self.cassandra, self.put_signing_key_stmt, + (kid, private_pem, public_pem, created, retired), + ) + + async def list_signing_keys(self): + return await async_execute( + self.cassandra, self.list_signing_keys_stmt, + ) + + # ------------------------------------------------------------------ + # Bootstrap helpers + # ------------------------------------------------------------------ + + async def any_workspace_exists(self): + rows = await self.list_workspaces() + return bool(rows)