diff --git a/docs/tech-specs/no-auth-regime.md b/docs/tech-specs/no-auth-regime.md new file mode 100644 index 00000000..ae8b427f --- /dev/null +++ b/docs/tech-specs/no-auth-regime.md @@ -0,0 +1,186 @@ +--- +layout: default +title: "No-Auth IAM Regime" +parent: "Tech Specs" +--- + +# No-Auth IAM Regime + +## Overview + +A minimal IAM regime that permits all access unconditionally. +Implements the same Pulsar request/response protocol as `iam-svc` +(see [iam-contract.md](iam-contract.md)) so it is a drop-in +replacement: swap `iam-svc` for `no-auth-svc` in the deployment +and the gateway, bootstrapper, and all other components continue +to work without modification. + +Intended for development, testing, single-tenant self-hosted +deployments, and evaluation environments where authentication +overhead is unwanted. + +## Motivation + +The full IAM regime requires Cassandra tables, a bootstrap +sequence, API key management, and signing key rotation. For +many deployments this is unnecessary friction: + +- Local development and CI/CD pipelines. +- Single-user or small-team self-hosted instances. +- Evaluation and demo environments. +- Deployments behind an external authentication proxy + (e.g. OAuth2 reverse proxy, VPN-gated access). + +Today operators who want no auth must still deploy `iam-svc` and +complete the bootstrap ceremony. A purpose-built no-auth regime +eliminates that requirement entirely. + +## Design + +### Deployment + +Replace `iam-svc` with `no-auth-svc` in the processor group or +container configuration. No other services change. The no-auth +service listens on the standard IAM Pulsar topics: + +- Request: `request::iam` +- Response: `response::iam` + +### Dependencies + +None. No database, no config entries, no signing keys, no +bootstrap sequence. + +### Operation responses + +The service implements the IAM contract +([iam-contract.md](iam-contract.md)) with the following +behaviour for each operation: + +| Operation | Behaviour | +|---|---| +| `authenticate-anonymous` | Returns a default identity: `user_id="anonymous"`, `workspace="default"`, `roles=["admin"]`. This is the key operation that distinguishes no-auth from the full regime. | +| `resolve-api-key` | Accepts any token. Returns the same default identity as `authenticate-anonymous`. | +| `authorise` | Always allows. Returns `decision_allow=True`, `decision_ttl_seconds=3600`. | +| `authorise-many` | Always allows all checks. | +| `get-signing-key-public` | Returns an empty string. The gateway skips JWT validation when no key is available. | +| `bootstrap` | No-op. Returns empty admin user/key. | +| `bootstrap-status` | Returns `bootstrap_available=False`. | +| `whoami` | Returns a stub user record for the actor. | +| `login` | Returns empty JWT (not supported under no-auth). | +| `create-user`, `list-users`, `get-user`, `update-user`, `delete-user`, `disable-user`, `enable-user` | Return empty/stub responses. User management is meaningless without auth. | +| `create-workspace`, `list-workspaces`, `get-workspace`, `update-workspace`, `disable-workspace` | Return empty/stub responses. | +| `create-api-key`, `list-api-keys`, `revoke-api-key` | Return empty/stub responses. | +| `change-password`, `reset-password` | No-op. | +| `rotate-signing-key` | No-op. | +| Unknown operation | Returns an error response (same as `iam-svc`). | + +### Workspace resolution + +When `resolve-api-key` is called, the returned workspace +determines which workspace the request operates against. The +no-auth service defaults to `"default"`. + +A configurable `--default-workspace` flag allows operators to +change this without code changes. + +### Anonymous authentication + +A new `authenticate-anonymous` operation is added to the IAM +protocol. This is a small, backward-compatible addition to the +contract: + +**Gateway change** (`auth.py`): when `authenticate()` receives a +request with no `Authorization` header (or an empty bearer +token), instead of immediately returning 401, it sends an +`authenticate-anonymous` request to the IAM service. If the +regime returns a valid identity, the request proceeds. If the +regime returns an error, the gateway returns 401 as before. + +**`iam-svc` (full regime)**: returns `auth-failed` for +`authenticate-anonymous`. Behaviour is unchanged — unauthenticated +requests are rejected exactly as they are today. + +**`no-auth-svc`**: returns the default identity (`anonymous` / +`default` workspace). No token required. + +This keeps the policy decision ("is anonymous access allowed?") +in the IAM regime, not in the gateway. The gateway is a generic +enforcement point that asks and respects the answer. + +**Wire format**: uses the existing `IamRequest` / `IamResponse` +schema with `operation="authenticate-anonymous"`. No new fields +required — the response uses `resolved_user_id`, +`resolved_workspace`, and `resolved_roles`, same as +`resolve-api-key`. + +Requests that do carry a bearer token follow the existing +`resolve-api-key` / JWT paths unchanged. + +## Implementation + +### Service structure + +The service is a standard `AsyncProcessor` that consumes IAM +requests and produces IAM responses, identical in shape to the +existing `iam-svc` processor: + +``` +trustgraph-flow/ + trustgraph/ + iam/ + noauth/ + __init__.py + __main__.py + service.py # AsyncProcessor wiring + handler.py # Operation dispatch, always-allow logic +``` + +### Handler + +The handler is a single `handle(request) -> response` function +with a dispatch table. Each operation returns a pre-built +`IamResponse` with the appropriate fields set. No database +access, no crypto, no state. + +### Configuration + +| Flag | Default | Description | +|---|---|---| +| `--default-workspace` | `"default"` | Workspace returned by `resolve-api-key` | +| `--default-user-id` | `"anonymous"` | User ID returned by `resolve-api-key` | + +### Entry point + +``` +tg-no-auth-svc +``` + +Or via processor group: + +```yaml +- class: trustgraph.iam.noauth.Processor + params: + <<: *defaults + id: no-auth-svc +``` + +## Security considerations + +This regime provides **no security whatsoever**. Any caller with +network access to the API gateway has full admin access to all +workspaces. + +Operators must ensure that network-level controls (firewall, +VPN, private network) provide adequate protection when deploying +this regime. The regime is explicitly not suitable for multi- +tenant or internet-facing deployments. + +## Testing + +- Unit: verify each operation returns the expected stub response. +- Integration: deploy `no-auth-svc` in place of `iam-svc`, confirm + the gateway starts, accepts requests with a dummy bearer token, + and routes them to the default workspace. +- E2E: run the standard e2e test suite with `no-auth-svc` to + confirm no regressions. diff --git a/tests/unit/test_concurrency/test_consumer_concurrency.py b/tests/unit/test_concurrency/test_consumer_concurrency.py index 59c7f2b5..44d82182 100644 --- a/tests/unit/test_concurrency/test_consumer_concurrency.py +++ b/tests/unit/test_concurrency/test_consumer_concurrency.py @@ -272,23 +272,22 @@ class TestMetricsIntegration: class TestPollTimeout: @pytest.mark.asyncio - async def test_poll_timeout_is_100ms(self): - """Consumer receive timeout should be 100ms, not the original 2000ms. + async def test_poll_timeout_is_2000ms(self): + """Consumer receive timeout should be 2000ms. - A 2000ms poll timeout means every service adds up to 2s of idle - blocking between message bursts. With many sequential hops in a - query pipeline, this compounds into seconds of unnecessary latency. - 100ms keeps responsiveness high without significant CPU overhead. + receive() is a blocking call that returns immediately when a + message arrives — the timeout only governs how often the loop + checks the shutdown flag during idle periods. Lower values + (e.g. 100ms) generate excessive C++ client WARN logging with + no latency benefit. """ consumer = _make_consumer() - # Wire up a mock Pulsar consumer that records the receive kwargs mock_pulsar_consumer = MagicMock() received_kwargs = {} def capture_receive(**kwargs): received_kwargs.update(kwargs) - # Stop after one call consumer.running = False raise type('Timeout', (Exception,), {})("timeout") @@ -296,7 +295,7 @@ class TestPollTimeout: await consumer.consume_from_queue(mock_pulsar_consumer) - assert received_kwargs.get("timeout_millis") == 100 + assert received_kwargs.get("timeout_millis") == 2000 # --------------------------------------------------------------------------- diff --git a/tests/unit/test_gateway/test_auth.py b/tests/unit/test_gateway/test_auth.py index 26e93fd9..8ffcafa1 100644 --- a/tests/unit/test_gateway/test_auth.py +++ b/tests/unit/test_gateway/test_auth.py @@ -165,22 +165,37 @@ class TestIamAuthDispatch: by shape of the bearer.""" @pytest.mark.asyncio - async def test_no_authorization_header_raises_401(self): + async def test_no_authorization_header_tries_anonymous(self): auth = IamAuth(backend=Mock()) - with pytest.raises(web.HTTPUnauthorized): - await auth.authenticate(make_request(None)) + + async def fake_with_client(op): + raise RuntimeError("auth-failed: anonymous access not permitted") + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + with pytest.raises(web.HTTPUnauthorized): + await auth.authenticate(make_request(None)) @pytest.mark.asyncio - async def test_non_bearer_header_raises_401(self): + async def test_non_bearer_header_tries_anonymous(self): auth = IamAuth(backend=Mock()) - with pytest.raises(web.HTTPUnauthorized): - await auth.authenticate(make_request("Basic whatever")) + + async def fake_with_client(op): + raise RuntimeError("auth-failed: anonymous access not permitted") + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + with pytest.raises(web.HTTPUnauthorized): + await auth.authenticate(make_request("Basic whatever")) @pytest.mark.asyncio - async def test_empty_bearer_raises_401(self): + async def test_empty_bearer_tries_anonymous(self): auth = IamAuth(backend=Mock()) - with pytest.raises(web.HTTPUnauthorized): - await auth.authenticate(make_request("Bearer ")) + + async def fake_with_client(op): + raise RuntimeError("auth-failed: anonymous access not permitted") + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + with pytest.raises(web.HTTPUnauthorized): + await auth.authenticate(make_request("Bearer ")) @pytest.mark.asyncio async def test_unknown_format_raises_401(self): @@ -445,3 +460,121 @@ class TestAuthorise: # Different resource → different cache key → two IAM calls. assert calls["n"] == 2 + + +# -- Anonymous authentication boundary ------------------------------------ + + +class TestAnonymousAuthBoundary: + """The gateway must only attempt anonymous auth when no credential + is presented. A malformed token must NOT fall through to the + anonymous path — that would let an attacker bypass a broken token + by simply sending garbage.""" + + @pytest.mark.asyncio + async def test_no_header_attempts_anonymous(self): + auth = IamAuth(backend=Mock()) + + async def fake_with_client(op): + return await op(Mock( + authenticate_anonymous=AsyncMock( + return_value=("anon", "default", ["reader"]), + ) + )) + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + ident = await auth.authenticate(make_request(None)) + assert ident.handle == "anon" + assert ident.source == "anonymous" + + @pytest.mark.asyncio + async def test_empty_bearer_attempts_anonymous(self): + auth = IamAuth(backend=Mock()) + + async def fake_with_client(op): + return await op(Mock( + authenticate_anonymous=AsyncMock( + return_value=("anon", "default", ["reader"]), + ) + )) + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + ident = await auth.authenticate(make_request("Bearer ")) + assert ident.handle == "anon" + assert ident.source == "anonymous" + + @pytest.mark.asyncio + async def test_malformed_token_does_not_fall_through_to_anonymous(self): + auth = IamAuth(backend=Mock()) + called = {"anonymous": False} + + original = auth._authenticate_anonymous + + async def spy_anonymous(): + called["anonymous"] = True + return await original() + + auth._authenticate_anonymous = spy_anonymous + + with pytest.raises(web.HTTPUnauthorized): + await auth.authenticate(make_request("Bearer garbage")) + assert not called["anonymous"] + + @pytest.mark.asyncio + async def test_bad_api_key_does_not_fall_through_to_anonymous(self): + auth = IamAuth(backend=Mock()) + called = {"anonymous": False} + + async def spy_anonymous(): + called["anonymous"] = True + + auth._authenticate_anonymous = spy_anonymous + + async def fake_with_client(op): + raise RuntimeError("auth-failed: unknown key") + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + with pytest.raises(web.HTTPUnauthorized): + await auth.authenticate(make_request("Bearer tg_bad")) + assert not called["anonymous"] + + @pytest.mark.asyncio + async def test_bad_jwt_does_not_fall_through_to_anonymous(self): + auth = IamAuth(backend=Mock()) + auth._signing_public_pem = "not-a-real-pem" + called = {"anonymous": False} + + async def spy_anonymous(): + called["anonymous"] = True + + auth._authenticate_anonymous = spy_anonymous + + with pytest.raises(web.HTTPUnauthorized): + await auth.authenticate(make_request("Bearer a.b.c")) + assert not called["anonymous"] + + @pytest.mark.asyncio + async def test_anonymous_rejected_by_iam_raises_401(self): + auth = IamAuth(backend=Mock()) + + async def fake_with_client(op): + raise RuntimeError("auth-failed: anonymous access not permitted") + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + with pytest.raises(web.HTTPUnauthorized): + await auth.authenticate(make_request(None)) + + @pytest.mark.asyncio + async def test_anonymous_with_empty_user_id_raises_401(self): + auth = IamAuth(backend=Mock()) + + async def fake_with_client(op): + return await op(Mock( + authenticate_anonymous=AsyncMock( + return_value=("", "default", []), + ) + )) + + with patch.object(auth, "_with_client", side_effect=fake_with_client): + with pytest.raises(web.HTTPUnauthorized): + await auth.authenticate(make_request(None)) diff --git a/tests/unit/test_iam/__init__.py b/tests/unit/test_iam/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/test_iam/test_iam_rejects_anonymous.py b/tests/unit/test_iam/test_iam_rejects_anonymous.py new file mode 100644 index 00000000..492b570a --- /dev/null +++ b/tests/unit/test_iam/test_iam_rejects_anonymous.py @@ -0,0 +1,44 @@ +""" +Contract test: the full iam-svc MUST reject authenticate-anonymous. + +This is a safety pin — if someone accidentally adds anonymous access +to the production IAM handler, this test catches it. +""" + +import asyncio +from unittest.mock import Mock, AsyncMock + +import pytest + +from trustgraph.iam.service.iam import IamService + + +def _make_request(**kwargs): + req = Mock() + for k, v in kwargs.items(): + setattr(req, k, v) + return req + + +class TestIamRejectsAnonymous: + + @pytest.fixture + def handler(self): + svc = object.__new__(IamService) + svc.table_store = Mock(spec=[]) + svc.bootstrap_mode = "token" + svc.bootstrap_token = "tok" + svc._on_workspace_created = None + svc._on_workspace_deleted = None + svc._signing_key = None + svc._signing_key_lock = asyncio.Lock() + return svc + + @pytest.mark.asyncio + async def test_authenticate_anonymous_returns_auth_failed(self, handler): + resp = await handler.handle( + _make_request(operation="authenticate-anonymous") + ) + assert resp.error is not None + assert resp.error.type == "auth-failed" + assert "anonymous" in resp.error.message.lower() diff --git a/tests/unit/test_iam/test_noauth_handler.py b/tests/unit/test_iam/test_noauth_handler.py new file mode 100644 index 00000000..38461b62 --- /dev/null +++ b/tests/unit/test_iam/test_noauth_handler.py @@ -0,0 +1,138 @@ +""" +Tests for the no-auth IAM handler. + +Verifies that NoAuthHandler returns the expected permissive responses +and that the always-allow authorise path returns the correct shape. +""" + +import json +from unittest.mock import Mock + +import pytest + +from trustgraph.iam.noauth.handler import NoAuthHandler + + +def _make_request(**kwargs): + req = Mock() + for k, v in kwargs.items(): + setattr(req, k, v) + return req + + +class TestAuthenticateAnonymous: + + @pytest.mark.asyncio + async def test_returns_default_identity(self): + h = NoAuthHandler( + default_user_id="anon", default_workspace="ws", + ) + resp = await h.handle( + _make_request(operation="authenticate-anonymous") + ) + assert resp.error is None + assert resp.resolved_user_id == "anon" + assert resp.resolved_workspace == "ws" + assert "admin" in list(resp.resolved_roles) + + @pytest.mark.asyncio + async def test_custom_defaults_propagate(self): + h = NoAuthHandler( + default_user_id="dev-user", default_workspace="dev-ws", + ) + resp = await h.handle( + _make_request(operation="authenticate-anonymous") + ) + assert resp.resolved_user_id == "dev-user" + assert resp.resolved_workspace == "dev-ws" + + +class TestResolveApiKey: + + @pytest.mark.asyncio + async def test_any_key_resolves_to_default_identity(self): + h = NoAuthHandler() + resp = await h.handle( + _make_request(operation="resolve-api-key", api_key="tg_bogus") + ) + assert resp.error is None + assert resp.resolved_user_id == "anonymous" + assert resp.resolved_workspace == "default" + + +class TestAuthorise: + + @pytest.mark.asyncio + async def test_always_allows(self): + h = NoAuthHandler() + resp = await h.handle( + _make_request( + operation="authorise", + user_id="anyone", + capability="anything", + resource_json="{}", + parameters_json="{}", + ) + ) + assert resp.error is None + assert resp.decision_allow is True + assert resp.decision_ttl_seconds > 0 + + @pytest.mark.asyncio + async def test_authorise_many_returns_matching_count(self): + h = NoAuthHandler() + checks = [ + {"capability": "a", "resource": {}, "parameters": {}}, + {"capability": "b", "resource": {}, "parameters": {}}, + {"capability": "c", "resource": {}, "parameters": {}}, + ] + resp = await h.handle( + _make_request( + operation="authorise-many", + user_id="u", + authorise_checks=json.dumps(checks), + ) + ) + assert resp.error is None + decisions = json.loads(resp.decisions_json) + assert len(decisions) == 3 + assert all(d["allow"] is True for d in decisions) + + +class TestCreateWorkspaceCallback: + + @pytest.mark.asyncio + async def test_create_workspace_calls_callback(self): + called_with = [] + + async def on_created(ws_id): + called_with.append(ws_id) + + h = NoAuthHandler(on_workspace_created=on_created) + req = _make_request(operation="create-workspace") + req.workspace_record = Mock() + req.workspace_record.id = "test-ws" + resp = await h.handle(req) + assert resp.error is None + assert called_with == ["test-ws"] + + @pytest.mark.asyncio + async def test_create_workspace_without_callback_still_succeeds(self): + h = NoAuthHandler() + req = _make_request(operation="create-workspace") + req.workspace_record = Mock() + req.workspace_record.id = "test-ws" + resp = await h.handle(req) + assert resp.error is None + + +class TestUnknownOperation: + + @pytest.mark.asyncio + async def test_unknown_op_returns_error(self): + h = NoAuthHandler() + resp = await h.handle( + _make_request(operation="not-a-real-op") + ) + assert resp.error is not None + assert resp.error.type == "invalid-argument" diff --git a/trustgraph-base/trustgraph/api/async_socket_client.py b/trustgraph-base/trustgraph/api/async_socket_client.py index ca9146b9..d18bee34 100644 --- a/trustgraph-base/trustgraph/api/async_socket_client.py +++ b/trustgraph-base/trustgraph/api/async_socket_client.py @@ -62,12 +62,6 @@ class AsyncSocketClient: if self._connected: return - if not self.token: - raise ProtocolException( - "AsyncSocketClient requires a token for first-frame " - "auth against /api/v1/socket" - ) - ws_url = self._build_ws_url() self._connect_cm = websockets.connect( ws_url, ping_interval=20, ping_timeout=self.timeout @@ -79,7 +73,7 @@ class AsyncSocketClient: # reader task so the response isn't consumed by the reader's # id-based routing. await self._socket.send(json.dumps({ - "type": "auth", "token": self.token, + "type": "auth", "token": self.token or "", })) try: raw = await asyncio.wait_for( diff --git a/trustgraph-base/trustgraph/api/socket_client.py b/trustgraph-base/trustgraph/api/socket_client.py index 75a7be9a..9874c8af 100644 --- a/trustgraph-base/trustgraph/api/socket_client.py +++ b/trustgraph-base/trustgraph/api/socket_client.py @@ -137,12 +137,6 @@ class SocketClient: if self._connected: return - if not self.token: - raise ProtocolException( - "SocketClient requires a token for first-frame auth " - "against /api/v1/socket" - ) - ws_url = self._build_ws_url() self._connect_cm = websockets.connect( ws_url, ping_interval=20, ping_timeout=self.timeout @@ -153,7 +147,7 @@ class SocketClient: # auth-ok / auth-failed response isn't consumed by the reader # loop's id-based routing. await self._socket.send(json.dumps({ - "type": "auth", "token": self.token, + "type": "auth", "token": self.token or "", })) try: raw = await asyncio.wait_for( diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index 5c59c515..b9f2ee0b 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -76,8 +76,10 @@ class Consumer: if hasattr(self, "consumer"): if self.consumer: - self.consumer.unsubscribe() - self.consumer.close() + try: + self.consumer.close() + except Exception: + pass self.consumer = None async def stop(self): @@ -157,12 +159,14 @@ class Consumer: except Exception as e: logger.error(f"Consumer loop exception: {e}", exc_info=True) - for c in consumers: + for i, c in enumerate(consumers): try: - c.unsubscribe() c.close() - except Exception: - pass + except Exception as ce: + logger.warning( + f"Consumer {i} close failed (error path): " + f"{type(ce).__name__}: {ce}" + ) for ex in executors: ex.shutdown(wait=False) consumers = [] @@ -171,12 +175,14 @@ class Consumer: continue finally: - for c in consumers: + for i, c in enumerate(consumers): try: - c.unsubscribe() c.close() - except Exception: - pass + except Exception as ce: + logger.warning( + f"Consumer {i} close failed: " + f"{type(ce).__name__}: {ce}" + ) for ex in executors: ex.shutdown(wait=False) @@ -188,7 +194,7 @@ class Consumer: try: msg = await loop.run_in_executor( executor, - lambda: consumer.receive(timeout_millis=100), + lambda: consumer.receive(timeout_millis=2000), ) except Exception as e: # Handle timeout from any backend diff --git a/trustgraph-base/trustgraph/base/iam_client.py b/trustgraph-base/trustgraph/base/iam_client.py index 4be59de1..e0457d19 100644 --- a/trustgraph-base/trustgraph/base/iam_client.py +++ b/trustgraph-base/trustgraph/base/iam_client.py @@ -62,6 +62,22 @@ class IamClient(RequestResponse): ) return resp.user + async def authenticate_anonymous(self, timeout=IAM_TIMEOUT): + """Request anonymous access from the IAM regime. + + Returns ``(user_id, workspace, roles)`` if the regime permits + anonymous access, or raises ``RuntimeError`` with error type + ``auth-failed`` if it does not.""" + resp = await self._request( + operation="authenticate-anonymous", + timeout=timeout, + ) + return ( + resp.resolved_user_id, + resp.resolved_workspace, + list(resp.resolved_roles), + ) + async def resolve_api_key(self, api_key, timeout=IAM_TIMEOUT): """Resolve a plaintext API key to its identity triple. diff --git a/trustgraph-base/trustgraph/base/pubsub.py b/trustgraph-base/trustgraph/base/pubsub.py index fb4765c1..4ae8d2d0 100644 --- a/trustgraph-base/trustgraph/base/pubsub.py +++ b/trustgraph-base/trustgraph/base/pubsub.py @@ -10,6 +10,7 @@ logger = logging.getLogger(__name__) # Default connection settings from environment DEFAULT_PULSAR_HOST = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') DEFAULT_PULSAR_API_KEY = os.getenv("PULSAR_API_KEY", None) +DEFAULT_PULSAR_ADMIN_URL = os.getenv("PULSAR_ADMIN_URL", 'http://pulsar:8080') DEFAULT_RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", 'rabbitmq') DEFAULT_RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", '5672')) @@ -43,6 +44,7 @@ def get_pubsub(**config: Any) -> Any: host=config.get('pulsar_host', DEFAULT_PULSAR_HOST), api_key=config.get('pulsar_api_key', DEFAULT_PULSAR_API_KEY), listener=config.get('pulsar_listener'), + admin_url=config.get('pulsar_admin_url', DEFAULT_PULSAR_ADMIN_URL), ) elif backend_type == 'rabbitmq': from .rabbitmq_backend import RabbitMQBackend @@ -77,6 +79,7 @@ def get_pubsub(**config: Any) -> Any: STANDALONE_PULSAR_HOST = 'pulsar://localhost:6650' +STANDALONE_PULSAR_ADMIN_URL = 'http://localhost:8080' def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None: @@ -88,6 +91,7 @@ def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None: that run outside containers) """ pulsar_host = STANDALONE_PULSAR_HOST if standalone else DEFAULT_PULSAR_HOST + pulsar_admin_url = STANDALONE_PULSAR_ADMIN_URL if standalone else DEFAULT_PULSAR_ADMIN_URL pulsar_listener = 'localhost' if standalone else None rabbitmq_host = 'localhost' if standalone else DEFAULT_RABBITMQ_HOST kafka_bootstrap = 'localhost:9092' if standalone else DEFAULT_KAFKA_BOOTSTRAP @@ -105,6 +109,12 @@ def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None: help=f'Pulsar host (default: {pulsar_host})', ) + parser.add_argument( + '--pulsar-admin-url', + default=pulsar_admin_url, + help=f'Pulsar admin REST API URL (default: {pulsar_admin_url})', + ) + parser.add_argument( '--pulsar-api-key', default=DEFAULT_PULSAR_API_KEY, diff --git a/trustgraph-base/trustgraph/base/pulsar_backend.py b/trustgraph-base/trustgraph/base/pulsar_backend.py index e27d16af..e85dfbef 100644 --- a/trustgraph-base/trustgraph/base/pulsar_backend.py +++ b/trustgraph-base/trustgraph/base/pulsar_backend.py @@ -7,8 +7,12 @@ handling topic mapping, serialization, and Pulsar client management. import pulsar import _pulsar +import asyncio import json import logging +import urllib.request +import urllib.error +import urllib.parse from typing import Any from .backend import PubSubBackend, BackendProducer, BackendConsumer, Message @@ -117,7 +121,10 @@ class PulsarBackend: producers and consumers. """ - def __init__(self, host: str, api_key: str = None, listener: str = None): + def __init__( + self, host: str, api_key: str = None, listener: str = None, + admin_url: str = None, + ): """ Initialize Pulsar backend. @@ -125,10 +132,12 @@ class PulsarBackend: host: Pulsar broker URL (e.g., pulsar://localhost:6650) api_key: Optional API key for authentication listener: Optional listener name for multi-homed setups + admin_url: Pulsar admin REST API URL (e.g., http://pulsar:8080) """ self.host = host self.api_key = api_key self.listener = listener + self.admin_url = admin_url # Create Pulsar client client_args = {'service_url': host} @@ -139,6 +148,10 @@ class PulsarBackend: if api_key: client_args['authentication'] = pulsar.AuthenticationToken(api_key) + client_args['logger'] = pulsar.ConsoleLogger( + _pulsar.LoggerLevel.Error + ) + self.client = pulsar.Client(**client_args) logger.info(f"Pulsar client connected to {host}") @@ -266,24 +279,129 @@ class PulsarBackend: return PulsarBackendConsumer(pulsar_consumer, schema) + def _admin_api_path(self, pulsar_uri: str) -> str: + """ + Convert a Pulsar topic URI to an admin REST API path. + + persistent://tg/flow/triples-store:default:explain-flow + -> /admin/v2/persistent/tg/flow/triples-store%3Adefault%3Aexplain-flow + """ + scheme, rest = pulsar_uri.split('://', 1) + tenant, namespace, topic = rest.split('/', 2) + encoded_topic = urllib.parse.quote(topic, safe='') + return f"/admin/v2/{scheme}/{tenant}/{namespace}/{encoded_topic}" + + def _admin_request(self, method, path): + """ + Make a synchronous admin REST API request. + + Returns parsed JSON for GET, None for DELETE/PUT. + Raises urllib.error.HTTPError for non-404 errors. + 404 is treated as success (idempotent deletion). + """ + url = f"{self.admin_url}{path}" + req = urllib.request.Request(url, method=method) + + try: + with urllib.request.urlopen(req) as resp: + if method == 'GET': + return json.loads(resp.read().decode('utf-8')) + return None + except urllib.error.HTTPError as e: + if e.code == 404: + return None + raise + + def _delete_topic_sync(self, topic: str): + """ + Delete a persistent topic and all its subscriptions. + + Subscriptions must be removed first — Pulsar rejects topic + deletion while subscriptions exist. Force-deletes each + subscription to disconnect any lingering consumers. + """ + pulsar_uri = self.map_topic(topic) + + if pulsar_uri.startswith('non-persistent://'): + return + + api_path = self._admin_api_path(pulsar_uri) + + try: + subs = self._admin_request('GET', f"{api_path}/subscriptions") + except Exception as e: + logger.warning(f"Failed to list subscriptions for {topic}: {e}") + return + + if subs: + for sub in subs: + encoded_sub = urllib.parse.quote(sub, safe='') + try: + self._admin_request( + 'DELETE', + f"{api_path}/subscription/{encoded_sub}" + f"?force=true" + ) + logger.info( + f"Deleted subscription {sub} from {topic}" + ) + except Exception as e: + logger.warning( + f"Failed to delete subscription {sub} " + f"from {topic}: {e}" + ) + + try: + self._admin_request('DELETE', api_path) + logger.info(f"Deleted topic: {topic}") + except Exception as e: + logger.warning(f"Failed to delete topic {topic}: {e}") + + def _topic_exists_sync(self, topic: str) -> bool: + """Check topic existence via admin API.""" + pulsar_uri = self.map_topic(topic) + + if pulsar_uri.startswith('non-persistent://'): + return False + + api_path = self._admin_api_path(pulsar_uri) + + try: + result = self._admin_request('GET', f"{api_path}/stats") + return result is not None + except Exception: + return False + async def create_topic(self, topic: str) -> None: - """No-op — Pulsar auto-creates topics on first use. - TODO: Use admin REST API for explicit persistent topic creation.""" + """No-op — Pulsar auto-creates topics on first use.""" pass async def delete_topic(self, topic: str) -> None: - """No-op — to be replaced with admin REST API calls. - TODO: Delete persistent topic via admin API.""" - pass + """ + Delete a persistent topic and all its subscriptions via + the admin REST API. + + Called by the flow controller during deliberate flow deletion. + Non-persistent topics are skipped. Idempotent. + """ + if not self.admin_url: + logger.warning( + f"Cannot delete topic {topic}: " + f"no admin URL configured" + ) + return + + await asyncio.to_thread(self._delete_topic_sync, topic) async def topic_exists(self, topic: str) -> bool: - """Returns True — Pulsar auto-creates on subscribe. - TODO: Use admin REST API for actual existence check.""" - return True + """Check whether a persistent topic exists via the admin API.""" + if not self.admin_url: + return True + + return await asyncio.to_thread(self._topic_exists_sync, topic) async def ensure_topic(self, topic: str) -> None: - """No-op — Pulsar auto-creates topics on first use. - TODO: Use admin REST API for explicit creation.""" + """No-op — Pulsar auto-creates topics on first use.""" pass def close(self) -> None: diff --git a/trustgraph-flow/pyproject.toml b/trustgraph-flow/pyproject.toml index d8c690b5..8488a0a7 100644 --- a/trustgraph-flow/pyproject.toml +++ b/trustgraph-flow/pyproject.toml @@ -64,6 +64,7 @@ bootstrap = "trustgraph.bootstrap.bootstrapper:run" config-svc = "trustgraph.config.service:run" flow-svc = "trustgraph.flow.service:run" iam-svc = "trustgraph.iam.service:run" +no-auth-svc = "trustgraph.iam.noauth:run" doc-embeddings-query-milvus = "trustgraph.query.doc_embeddings.milvus:run" doc-embeddings-query-pinecone = "trustgraph.query.doc_embeddings.pinecone:run" doc-embeddings-query-qdrant = "trustgraph.query.doc_embeddings.qdrant:run" diff --git a/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py b/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py index 3c658fe3..81b7e98d 100644 --- a/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py +++ b/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py @@ -326,6 +326,58 @@ class Processor(AsyncProcessor): # Main loop. # ------------------------------------------------------------------ + async def _run_pre_service(self): + """Run pre-service initialisers before opening pub/sub clients. + + These bring up infrastructure that other services depend on + (e.g. Pulsar tenant/namespaces). They use out-of-band APIs + (HTTP admin), not pub/sub, so they don't need a config client. + They run without flag tracking — they must be idempotent. + """ + pre_specs = [ + s for s in self.specs + if not s.instance.wait_for_services + ] + if not pre_specs: + return + + for spec in pre_specs: + child_logger = logger.getChild(spec.name) + child_ctx = InitContext( + logger=child_logger, + config=None, + make_flow_client=self._make_flow_client, + make_iam_client=self._make_iam_client, + ) + child_logger.info(f"Running pre-service initialiser") + try: + await spec.instance.run(child_ctx, None, spec.flag) + child_logger.info(f"Pre-service initialiser completed") + except Exception as e: + child_logger.error( + f"Pre-service initialiser failed: " + f"{type(e).__name__}: {e}", + exc_info=True, + ) + raise + + async def start(self): + # Run pre-service initialisers before opening any pub/sub + # connections. They bring up infrastructure (Pulsar + # namespaces, etc.) that super().start() depends on. + while self.running: + try: + await self._run_pre_service() + break + except Exception as e: + logger.info( + f"Pre-service initialisation failed " + f"({type(e).__name__}: {e}); retry in {GATE_BACKOFF}s" + ) + await asyncio.sleep(GATE_BACKOFF) + + await super().start() + async def run(self): logger.info( @@ -347,29 +399,18 @@ class Processor(AsyncProcessor): continue try: - # Phase 1: pre-service initialisers run unconditionally. - pre_specs = [ - s for s in self.specs - if not s.instance.wait_for_services - ] - pre_results = {} - for spec in pre_specs: - pre_results[spec.name] = await self._run_spec( - spec, config, - ) - - # Phase 2: gate. + # Phase 1: gate. gate_ok = await self._gate_ready(config) - # Phase 3: post-service initialisers, if gate passed. - post_results = {} + # Phase 2: post-service initialisers, if gate passed. + results = {} if gate_ok: post_specs = [ s for s in self.specs if s.instance.wait_for_services ] for spec in post_specs: - post_results[spec.name] = await self._run_spec( + results[spec.name] = await self._run_spec( spec, config, ) @@ -377,8 +418,7 @@ class Processor(AsyncProcessor): if not gate_ok: sleep_for = GATE_BACKOFF else: - all_results = {**pre_results, **post_results} - if any(r != "skip" for r in all_results.values()): + if any(r != "skip" for r in results.values()): sleep_for = INIT_RETRY else: sleep_for = STEADY_INTERVAL diff --git a/trustgraph-flow/trustgraph/bootstrap/initialisers/pulsar_topology.py b/trustgraph-flow/trustgraph/bootstrap/initialisers/pulsar_topology.py index 843fe056..1e4805de 100644 --- a/trustgraph-flow/trustgraph/bootstrap/initialisers/pulsar_topology.py +++ b/trustgraph-flow/trustgraph/bootstrap/initialisers/pulsar_topology.py @@ -112,6 +112,10 @@ class PulsarTopology(Initialiser): def _reconcile_sync(self, logger): if not self._tenant_exists(): clusters = self._get_clusters() + if not clusters: + raise RuntimeError( + "Pulsar cluster list is empty — broker not ready yet" + ) logger.info( f"Creating tenant {self.tenant!r} with clusters {clusters}" ) diff --git a/trustgraph-flow/trustgraph/gateway/auth.py b/trustgraph-flow/trustgraph/gateway/auth.py index 1309ecfc..273fcb5a 100644 --- a/trustgraph-flow/trustgraph/gateway/auth.py +++ b/trustgraph-flow/trustgraph/gateway/auth.py @@ -233,10 +233,10 @@ class IamAuth: header = request.headers.get("Authorization", "") if not header.startswith("Bearer "): - raise _auth_failure() + return await self._authenticate_anonymous() token = header[len("Bearer "):].strip() if not token: - raise _auth_failure() + return await self._authenticate_anonymous() # API keys always start with "tg_". JWTs have two dots and # no "tg_" prefix. Discriminate cheaply. @@ -266,6 +266,26 @@ class IamAuth: handle=sub, workspace=ws, principal_id=sub, source="jwt", ) + async def _authenticate_anonymous(self): + try: + async def _call(client): + return await client.authenticate_anonymous() + user_id, workspace, _roles = await self._with_client(_call) + except Exception as e: + logger.debug( + f"Anonymous authentication rejected: " + f"{type(e).__name__}: {e}" + ) + raise _auth_failure() + + if not user_id or not workspace: + raise _auth_failure() + + return Identity( + handle=user_id, workspace=workspace, + principal_id=user_id, source="anonymous", + ) + async def _resolve_api_key(self, plaintext): h = hashlib.sha256(plaintext.encode("utf-8")).hexdigest() diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py index 02c0eed2..bdbd18d8 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py @@ -57,16 +57,13 @@ class Mux: (important for browsers, which treat a handshake-time 401 as terminal).""" token = data.get("token", "") - if not token: - await self.ws.send_json({ - "type": "auth-failed", - "error": "auth failure", - }) - return class _Shim: def __init__(self, tok): - self.headers = {"Authorization": f"Bearer {tok}"} + self.headers = ( + {"Authorization": f"Bearer {tok}"} if tok + else {} + ) try: identity = await self.auth.authenticate(_Shim(token)) diff --git a/trustgraph-flow/trustgraph/iam/noauth/__init__.py b/trustgraph-flow/trustgraph/iam/noauth/__init__.py new file mode 100644 index 00000000..98f4d9da --- /dev/null +++ b/trustgraph-flow/trustgraph/iam/noauth/__init__.py @@ -0,0 +1 @@ +from . service import * diff --git a/trustgraph-flow/trustgraph/iam/noauth/__main__.py b/trustgraph-flow/trustgraph/iam/noauth/__main__.py new file mode 100644 index 00000000..a731dd63 --- /dev/null +++ b/trustgraph-flow/trustgraph/iam/noauth/__main__.py @@ -0,0 +1,4 @@ + +from . service import run + +run() diff --git a/trustgraph-flow/trustgraph/iam/noauth/handler.py b/trustgraph-flow/trustgraph/iam/noauth/handler.py new file mode 100644 index 00000000..d457697e --- /dev/null +++ b/trustgraph-flow/trustgraph/iam/noauth/handler.py @@ -0,0 +1,131 @@ +""" +No-auth IAM handler. Implements the IAM contract with every operation +returning a permissive or stub response. No database, no crypto, +no state. +""" + +import json +import logging + +from trustgraph.schema import IamResponse, Error, UserRecord + +logger = logging.getLogger(__name__) + + +def _err(type, message): + return IamResponse(error=Error(type=type, message=message)) + + +class NoAuthHandler: + + def __init__(self, default_user_id="anonymous", + default_workspace="default", + on_workspace_created=None): + self.default_user_id = default_user_id + self.default_workspace = default_workspace + self._on_workspace_created = on_workspace_created + + def _default_identity_response(self): + return IamResponse( + resolved_user_id=self.default_user_id, + resolved_workspace=self.default_workspace, + resolved_roles=["admin"], + ) + + def _default_user_record(self): + return UserRecord( + id=self.default_user_id, + workspace=self.default_workspace, + username=self.default_user_id, + name="Anonymous User", + roles=["admin"], + enabled=True, + ) + + async def handle(self, v): + op = v.operation + + try: + if op == "authenticate-anonymous": + return self._default_identity_response() + + if op == "resolve-api-key": + return self._default_identity_response() + + if op == "authorise": + return IamResponse( + decision_allow=True, + decision_ttl_seconds=3600, + ) + + if op == "authorise-many": + checks = json.loads(v.authorise_checks or "[]") + decisions = [ + {"allow": True, "ttl": 3600} + for _ in checks + ] + return IamResponse( + decisions_json=json.dumps(decisions), + ) + + if op == "get-signing-key-public": + return IamResponse(signing_key_public="") + + if op == "bootstrap": + return IamResponse() + + if op == "bootstrap-status": + return IamResponse(bootstrap_available=False) + + if op == "whoami": + return IamResponse(user=self._default_user_record()) + + if op == "login": + return IamResponse() + + if op in ( + "create-user", "get-user", "update-user", + "disable-user", "enable-user", + ): + return IamResponse(user=self._default_user_record()) + + if op == "list-users": + return IamResponse(users=[self._default_user_record()]) + + if op == "delete-user": + return IamResponse() + + if op == "create-workspace": + if self._on_workspace_created and v.workspace_record: + await self._on_workspace_created(v.workspace_record.id) + return IamResponse() + + if op in ( + "get-workspace", "update-workspace", + "disable-workspace", + ): + return IamResponse() + + if op == "list-workspaces": + return IamResponse() + + if op in ("create-api-key", "list-api-keys", "revoke-api-key"): + return IamResponse() + + if op in ("change-password", "reset-password"): + return IamResponse() + + if op == "rotate-signing-key": + return IamResponse() + + return _err( + "invalid-argument", + f"unknown operation: {op!r}", + ) + + except Exception as e: + logger.error( + f"no-auth {op} failed: {type(e).__name__}: {e}", + exc_info=True, + ) + return _err("internal-error", str(e)) diff --git a/trustgraph-flow/trustgraph/iam/noauth/service.py b/trustgraph-flow/trustgraph/iam/noauth/service.py new file mode 100644 index 00000000..76d13a3c --- /dev/null +++ b/trustgraph-flow/trustgraph/iam/noauth/service.py @@ -0,0 +1,182 @@ +""" +No-auth IAM service. Drop-in replacement for iam-svc that permits +all access unconditionally. No database, no bootstrap, no signing keys. +""" + +import logging +import uuid + +from trustgraph.schema import Error +from trustgraph.schema import IamRequest, IamResponse +from trustgraph.schema import iam_request_queue, iam_response_queue +from trustgraph.schema import ConfigRequest, ConfigResponse, ConfigValue +from trustgraph.schema import config_request_queue, config_response_queue + +from trustgraph.base import AsyncProcessor, Consumer, Producer +from trustgraph.base import ConsumerMetrics, ProducerMetrics +from trustgraph.base.metrics import SubscriberMetrics +from trustgraph.base.request_response_spec import RequestResponse + +from . handler import NoAuthHandler + +logger = logging.getLogger(__name__) + +default_ident = "no-auth-svc" + +default_iam_request_queue = iam_request_queue +default_iam_response_queue = iam_response_queue + + +class Processor(AsyncProcessor): + + def __init__(self, **params): + + iam_req_q = params.get( + "iam_request_queue", default_iam_request_queue, + ) + iam_resp_q = params.get( + "iam_response_queue", default_iam_response_queue, + ) + + default_user_id = params.get("default_user_id", "anonymous") + default_workspace = params.get("default_workspace", "default") + + super().__init__(**params) + + iam_request_metrics = ConsumerMetrics( + processor=self.id, flow=None, name="iam-request", + ) + iam_response_metrics = ProducerMetrics( + processor=self.id, flow=None, name="iam-response", + ) + + self.iam_request_topic = iam_req_q + + self.iam_request_consumer = Consumer( + taskgroup=self.taskgroup, + backend=self.pubsub, + flow=None, + topic=iam_req_q, + subscriber=self.id, + schema=IamRequest, + handler=self.on_iam_request, + metrics=iam_request_metrics, + ) + + self.iam_response_producer = Producer( + backend=self.pubsub, + topic=iam_resp_q, + schema=IamResponse, + metrics=iam_response_metrics, + ) + + self.handler = NoAuthHandler( + default_user_id=default_user_id, + default_workspace=default_workspace, + on_workspace_created=self._ensure_workspace_registered, + ) + + logger.info( + f"No-auth IAM service initialised " + f"(user={default_user_id}, workspace={default_workspace})" + ) + + async def start(self): + await self.pubsub.ensure_topic(self.iam_request_topic) + await self.iam_request_consumer.start() + + def _create_config_client(self): + config_rr_id = str(uuid.uuid4()) + config_req_metrics = ProducerMetrics( + processor=self.id, flow=None, name="config-request", + ) + config_resp_metrics = SubscriberMetrics( + processor=self.id, flow=None, name="config-response", + ) + return RequestResponse( + backend=self.pubsub, + subscription=f"{self.id}--config--{config_rr_id}", + consumer_name=self.id, + request_topic=config_request_queue, + request_schema=ConfigRequest, + request_metrics=config_req_metrics, + response_topic=config_response_queue, + response_schema=ConfigResponse, + response_metrics=config_resp_metrics, + ) + + async def _ensure_workspace_registered(self, workspace_id): + client = self._create_config_client() + try: + await client.start() + await client.request( + ConfigRequest( + operation="put", + workspace="__workspaces__", + values=[ConfigValue( + type="workspace", key=workspace_id, + value='{"enabled": true}', + )], + ), + timeout=10, + ) + finally: + await client.stop() + logger.info( + f"Registered workspace in config: {workspace_id}" + ) + + async def on_iam_request(self, msg, consumer, flow): + + id = None + try: + v = msg.value() + id = msg.properties()["id"] + logger.debug( + f"Handling IAM request {id} op={v.operation!r}" + ) + resp = await self.handler.handle(v) + await self.iam_response_producer.send( + resp, properties={"id": id}, + ) + except Exception as e: + logger.error( + f"IAM request failed: {type(e).__name__}: {e}", + exc_info=True, + ) + resp = IamResponse( + error=Error(type="internal-error", message=str(e)), + ) + if id is not None: + await self.iam_response_producer.send( + resp, properties={"id": id}, + ) + + @staticmethod + def add_args(parser): + AsyncProcessor.add_args(parser) + + parser.add_argument( + "--iam-request-queue", + default=default_iam_request_queue, + help=f"IAM request queue (default: {default_iam_request_queue})", + ) + parser.add_argument( + "--iam-response-queue", + default=default_iam_response_queue, + help=f"IAM response queue (default: {default_iam_response_queue})", + ) + parser.add_argument( + "--default-user-id", + default="anonymous", + help="User ID for all requests (default: anonymous)", + ) + parser.add_argument( + "--default-workspace", + default="default", + help="Workspace for all requests (default: default)", + ) + + +def run(): + Processor.launch(default_ident, __doc__) diff --git a/trustgraph-flow/trustgraph/iam/service/iam.py b/trustgraph-flow/trustgraph/iam/service/iam.py index 755a1c5d..0335012e 100644 --- a/trustgraph-flow/trustgraph/iam/service/iam.py +++ b/trustgraph-flow/trustgraph/iam/service/iam.py @@ -287,6 +287,9 @@ class IamService: op = v.operation try: + if op == "authenticate-anonymous": + return _err("auth-failed", "anonymous access not permitted") + if op == "bootstrap": return await self.handle_bootstrap(v) if op == "bootstrap-status": @@ -394,8 +397,8 @@ class IamService: async def auto_bootstrap_if_token_mode(self): """Called from the service processor at startup. In - ``token`` mode, if tables are empty, seeds the default - workspace / admin / signing key using the operator-provided + ``token`` mode, if tables are empty, seeds the admin user, + API key, and signing key using the operator-provided bootstrap token. The admin's API key plaintext is *the* ``bootstrap_token`` — the operator already knows it, nothing needs to be returned or logged. @@ -405,7 +408,7 @@ class IamService: if self.bootstrap_mode != "token": return - if await self.table_store.any_workspace_exists(): + if await self.table_store.any_signing_key_exists(): logger.info( "IAM: token mode, tables already populated; skipping " "auto-bootstrap" @@ -420,22 +423,13 @@ class IamService: async def _seed_tables(self, api_key_plaintext): """Shared seeding logic used by token-mode auto-bootstrap and - bootstrap-mode handle_bootstrap. Creates the default - workspace, admin user, admin API key (using the given - plaintext), and an initial signing key. Returns the admin + bootstrap-mode handle_bootstrap. Creates the admin user, + admin API key (using the given plaintext), and an initial + signing key. The workspace is created separately by the + bootstrapper's WorkspaceInit initialiser. Returns the admin user id.""" now = _now_dt() - await self.table_store.put_workspace( - id=DEFAULT_WORKSPACE, - name="Default", - enabled=True, - created=now, - ) - - if self._on_workspace_created: - await self._on_workspace_created(DEFAULT_WORKSPACE) - admin_user_id = str(uuid.uuid4()) admin_password = secrets.token_urlsafe(32) await self.table_store.put_user( @@ -488,7 +482,7 @@ class IamService: if self.bootstrap_mode != "bootstrap": return _err("auth-failed", "auth failure") - if await self.table_store.any_workspace_exists(): + if await self.table_store.any_signing_key_exists(): return _err("auth-failed", "auth failure") plaintext = _generate_api_key() @@ -528,7 +522,7 @@ class IamService: instead of forcing callers to probe the masked-failure path.""" available = ( self.bootstrap_mode == "bootstrap" - and not await self.table_store.any_workspace_exists() + and not await self.table_store.any_signing_key_exists() ) return IamResponse(bootstrap_available=available) diff --git a/trustgraph-flow/trustgraph/query/ontology/sparql_generator.py b/trustgraph-flow/trustgraph/query/ontology/sparql_generator.py index 44c7e0a1..97fc5f4d 100644 --- a/trustgraph-flow/trustgraph/query/ontology/sparql_generator.py +++ b/trustgraph-flow/trustgraph/query/ontology/sparql_generator.py @@ -202,11 +202,14 @@ ASK {{ if response and isinstance(response, dict): query = response.get('query', '').strip() - if query.upper().startswith(('SELECT', 'ASK', 'CONSTRUCT', 'DESCRIBE')): + parts = query.split() + if parts and parts[0].upper() in ( + 'SELECT', 'ASK', 'CONSTRUCT', 'DESCRIBE', + ): return SPARQLQuery( query=query, variables=self._extract_variables(query), - query_type=query.split()[0].upper(), + query_type=parts[0].upper(), explanation=response.get('explanation', 'Generated by LLM'), complexity_score=self._calculate_complexity(query) ) diff --git a/trustgraph-flow/trustgraph/tables/iam.py b/trustgraph-flow/trustgraph/tables/iam.py index 8bf9c8b4..d7bf5e3d 100644 --- a/trustgraph-flow/trustgraph/tables/iam.py +++ b/trustgraph-flow/trustgraph/tables/iam.py @@ -435,3 +435,7 @@ class IamTableStore: async def any_workspace_exists(self): rows = await self.list_workspaces() return bool(rows) + + async def any_signing_key_exists(self): + rows = await self.list_signing_keys() + return bool(rows)