Merge branch 'release/v2.4'

Cyber MacGeddon 2026-05-19 18:01:35 +01:00
commit 668b64742f
24 changed files with 1119 additions and 100 deletions

@ -0,0 +1,186 @@
---
layout: default
title: "No-Auth IAM Regime"
parent: "Tech Specs"
---
# No-Auth IAM Regime
## Overview
A minimal IAM regime that permits all access unconditionally.
Implements the same Pulsar request/response protocol as `iam-svc`
(see [iam-contract.md](iam-contract.md)) so it is a drop-in
replacement: swap `iam-svc` for `no-auth-svc` in the deployment
and the gateway, bootstrapper, and all other components continue
to work without modification.

Intended for development, testing, single-tenant self-hosted
deployments, and evaluation environments where authentication
overhead is unwanted.
## Motivation
The full IAM regime requires Cassandra tables, a bootstrap
sequence, API key management, and signing key rotation. For
many deployments this is unnecessary friction:
- Local development and CI/CD pipelines.
- Single-user or small-team self-hosted instances.
- Evaluation and demo environments.
- Deployments behind an external authentication proxy
  (e.g. OAuth2 reverse proxy, VPN-gated access).

Today, operators who want no authentication must still deploy
`iam-svc` and complete the bootstrap ceremony. A purpose-built
no-auth regime eliminates that requirement entirely.
## Design
### Deployment
Replace `iam-svc` with `no-auth-svc` in the processor group or
container configuration. No other services change. The no-auth
service listens on the standard IAM Pulsar topics:
- Request: `request:<topicspace>:iam`
- Response: `response:<topicspace>:iam`
### Dependencies
None. No database, no config entries, no signing keys, no
bootstrap sequence.
### Operation responses
The service implements the IAM contract
([iam-contract.md](iam-contract.md)) with the following
behaviour for each operation:
| Operation | Behaviour |
|---|---|
| `authenticate-anonymous` | Returns a default identity: `user_id="anonymous"`, `workspace="default"`, `roles=["admin"]`. This is the key operation that distinguishes no-auth from the full regime. |
| `resolve-api-key` | Accepts any token. Returns the same default identity as `authenticate-anonymous`. |
| `authorise` | Always allows. Returns `decision_allow=True`, `decision_ttl_seconds=3600`. |
| `authorise-many` | Always allows all checks. |
| `get-signing-key-public` | Returns an empty string. The gateway skips JWT validation when no key is available. |
| `bootstrap` | No-op. Returns empty admin user/key. |
| `bootstrap-status` | Returns `bootstrap_available=False`. |
| `whoami` | Returns a stub user record for the actor. |
| `login` | Returns an empty JWT (login is not supported under no-auth). |
| `create-user`, `list-users`, `get-user`, `update-user`, `delete-user`, `disable-user`, `enable-user` | Return empty/stub responses. User management is meaningless without auth. |
| `create-workspace`, `list-workspaces`, `get-workspace`, `update-workspace`, `disable-workspace` | Return empty/stub responses. |
| `create-api-key`, `list-api-keys`, `revoke-api-key` | Return empty/stub responses. |
| `change-password`, `reset-password` | No-op. |
| `rotate-signing-key` | No-op. |
| Unknown operation | Returns an error response (same as `iam-svc`). |
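
The always-allow behaviour of `authorise-many` can be sketched as follows. This is a minimal illustration, not the service's actual code: the function name `allow_all` is hypothetical, and it assumes the request carries its checks as a JSON array and that the response holds one `{"allow": true}` entry per check, in the same order.

```python
import json


def allow_all(authorise_checks: str) -> str:
    """Return a JSON array with one allow decision per requested check.

    Hypothetical helper; the JSON shapes here are assumptions.
    """
    # An empty or missing payload is treated as "no checks".
    checks = json.loads(authorise_checks or "[]")
    # Every check is allowed, regardless of capability or resource.
    return json.dumps([{"allow": True} for _ in checks])


checks = [
    {"capability": "a", "resource": {}, "parameters": {}},
    {"capability": "b", "resource": {}, "parameters": {}},
]
decisions = json.loads(allow_all(json.dumps(checks)))
print(decisions)
```

Returning one decision per check keeps the response positionally aligned with the request, so a caller can pair them up without extra identifiers.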
### Workspace resolution
When `resolve-api-key` is called, the returned workspace
determines which workspace the request operates against. The
no-auth service defaults to `"default"`.

A configurable `--default-workspace` flag allows operators to
change this without code changes.
### Anonymous authentication
A new `authenticate-anonymous` operation is added to the IAM
protocol. This is a small, backward-compatible addition to the
contract:

**Gateway change** (`auth.py`): when `authenticate()` receives a
request with no `Authorization` header (or an empty bearer
token), instead of immediately returning 401, it sends an
`authenticate-anonymous` request to the IAM service. If the
regime returns a valid identity, the request proceeds. If the
regime returns an error, the gateway returns 401 as before.

**`iam-svc` (full regime)**: returns `auth-failed` for
`authenticate-anonymous`. Behaviour is unchanged: unauthenticated
requests are rejected exactly as they are today.

**`no-auth-svc`**: returns the default identity (`anonymous` /
`default` workspace). No token required.

This keeps the policy decision ("is anonymous access allowed?")
in the IAM regime, not in the gateway. The gateway is a generic
enforcement point that asks and respects the answer.

**Wire format**: uses the existing `IamRequest` / `IamResponse`
schema with `operation="authenticate-anonymous"`. No new fields
are required: the response uses `resolved_user_id`,
`resolved_workspace`, and `resolved_roles`, same as
`resolve-api-key`.

Requests that do carry a bearer token follow the existing
`resolve-api-key` / JWT paths unchanged.
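
The gateway rule described above can be sketched as a small classifier. This is an illustrative sketch only: `classify_credential` and its string labels are hypothetical, and it assumes API keys carry a `tg_` prefix and JWTs are recognised by having exactly two dots. The point it shows is that only a missing or empty credential reaches the anonymous path; a malformed token is rejected outright.

```python
from typing import Optional


def classify_credential(header: Optional[str]) -> str:
    """Return which authentication path a request should take.

    Hypothetical helper; labels are for illustration only.
    """
    header = header or ""
    if not header.startswith("Bearer "):
        return "anonymous"   # no bearer credential presented
    token = header[len("Bearer "):].strip()
    if not token:
        return "anonymous"   # empty bearer token
    if token.startswith("tg_"):
        return "api-key"     # assumed API key prefix
    if token.count(".") == 2:
        return "jwt"         # assumed JWT shape: header.payload.signature
    return "reject"          # malformed token: 401, never anonymous


for value in (None, "Bearer ", "Bearer tg_abc", "Bearer a.b.c", "Bearer garbage"):
    print(repr(value), "->", classify_credential(value))
```

Keeping the malformed case separate from the anonymous case matters: otherwise a caller holding a broken or revoked token could gain anonymous access simply by sending garbage.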
## Implementation
### Service structure
The service is a standard `AsyncProcessor` that consumes IAM
requests and produces IAM responses, identical in shape to the
existing `iam-svc` processor:
```
trustgraph-flow/
  trustgraph/
    iam/
      noauth/
        __init__.py
        __main__.py
        service.py      # AsyncProcessor wiring
        handler.py      # Operation dispatch, always-allow logic
```
### Handler
The handler is a single `handle(request) -> response` function
with a dispatch table. Each operation returns a pre-built
`IamResponse` with the appropriate fields set. No database
access, no crypto, no state.
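
A dispatch-table handler of this kind might look like the sketch below. It is a simplified illustration under stated assumptions: `Request` and `Response` are hypothetical stand-ins for `IamRequest` and `IamResponse`, only three operations are shown, and the `invalid-argument` error type for unknown operations is an assumption.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class Response:
    """Hypothetical stand-in for IamResponse."""
    resolved_user_id: str = ""
    resolved_workspace: str = ""
    resolved_roles: List[str] = field(default_factory=list)
    decision_allow: bool = False
    decision_ttl_seconds: int = 0
    error: Optional[str] = None


@dataclass
class Request:
    """Hypothetical stand-in for IamRequest."""
    operation: str


def _identity() -> Response:
    # The default identity returned for every caller.
    return Response("anonymous", "default", ["admin"])


def _allow() -> Response:
    # Always-allow decision with a one-hour TTL.
    return Response(decision_allow=True, decision_ttl_seconds=3600)


# Operation name -> response builder. No database, no crypto, no state.
DISPATCH: Dict[str, Callable[[], Response]] = {
    "authenticate-anonymous": _identity,
    "resolve-api-key": _identity,
    "authorise": _allow,
}


async def handle(request: Request) -> Response:
    builder = DISPATCH.get(request.operation)
    if builder is None:
        # Assumed error type for operations outside the table.
        return Response(error="invalid-argument")
    return builder()


resp = asyncio.run(handle(Request("authorise")))
print(resp.decision_allow, resp.decision_ttl_seconds)
```

A table keeps each operation's behaviour to one line, which makes it easy to audit that nothing outside the contract is answered.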
### Configuration
| Flag | Default | Description |
|---|---|---|
| `--default-workspace` | `"default"` | Workspace returned by `authenticate-anonymous` and `resolve-api-key` |
| `--default-user-id` | `"anonymous"` | User ID returned by `authenticate-anonymous` and `resolve-api-key` |
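
For illustration, the two flags could be declared with `argparse` roughly as follows. This is a sketch, not the service's actual argument wiring: `build_parser` is a hypothetical helper, and the real service presumably registers these flags alongside its standard processor arguments.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser exposing only the two no-auth flags."""
    parser = argparse.ArgumentParser(prog="tg-no-auth-svc")
    parser.add_argument(
        "--default-workspace",
        default="default",
        help="Workspace returned for every caller (default: default)",
    )
    parser.add_argument(
        "--default-user-id",
        default="anonymous",
        help="User ID returned for every caller (default: anonymous)",
    )
    return parser


# Override one flag and leave the other at its default.
args = build_parser().parse_args(["--default-workspace", "dev-ws"])
print(args.default_workspace, args.default_user_id)
```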
### Entry point
```
tg-no-auth-svc
```
Or via processor group:
```yaml
- class: trustgraph.iam.noauth.Processor
  params:
    <<: *defaults
    id: no-auth-svc
```
## Security considerations
This regime provides **no security whatsoever**. Any caller with
network access to the API gateway has full admin access to all
workspaces.

Operators must ensure that network-level controls (firewall,
VPN, private network) provide adequate protection when deploying
this regime. The regime is explicitly not suitable for
multi-tenant or internet-facing deployments.
## Testing
- Unit: verify each operation returns the expected stub response.
- Integration: deploy `no-auth-svc` in place of `iam-svc`, confirm
the gateway starts, accepts requests with a dummy bearer token,
and routes them to the default workspace.
- E2E: run the standard e2e test suite with `no-auth-svc` to
confirm no regressions.

@ -272,23 +272,22 @@ class TestMetricsIntegration:
class TestPollTimeout:
@pytest.mark.asyncio
async def test_poll_timeout_is_100ms(self):
"""Consumer receive timeout should be 100ms, not the original 2000ms.
async def test_poll_timeout_is_2000ms(self):
"""Consumer receive timeout should be 2000ms.
A 2000ms poll timeout means every service adds up to 2s of idle
blocking between message bursts. With many sequential hops in a
query pipeline, this compounds into seconds of unnecessary latency.
100ms keeps responsiveness high without significant CPU overhead.
receive() is a blocking call that returns immediately when a
message arrives; the timeout only governs how often the loop
checks the shutdown flag during idle periods. Lower values
(e.g. 100ms) generate excessive C++ client WARN logging with
no latency benefit.
"""
consumer = _make_consumer()
# Wire up a mock Pulsar consumer that records the receive kwargs
mock_pulsar_consumer = MagicMock()
received_kwargs = {}
def capture_receive(**kwargs):
received_kwargs.update(kwargs)
# Stop after one call
consumer.running = False
raise type('Timeout', (Exception,), {})("timeout")
@ -296,7 +295,7 @@ class TestPollTimeout:
await consumer.consume_from_queue(mock_pulsar_consumer)
assert received_kwargs.get("timeout_millis") == 100
assert received_kwargs.get("timeout_millis") == 2000
# ---------------------------------------------------------------------------

@ -165,22 +165,37 @@ class TestIamAuthDispatch:
by shape of the bearer."""
@pytest.mark.asyncio
async def test_no_authorization_header_raises_401(self):
async def test_no_authorization_header_tries_anonymous(self):
auth = IamAuth(backend=Mock())
with pytest.raises(web.HTTPUnauthorized):
await auth.authenticate(make_request(None))
async def fake_with_client(op):
raise RuntimeError("auth-failed: anonymous access not permitted")
with patch.object(auth, "_with_client", side_effect=fake_with_client):
with pytest.raises(web.HTTPUnauthorized):
await auth.authenticate(make_request(None))
@pytest.mark.asyncio
async def test_non_bearer_header_raises_401(self):
async def test_non_bearer_header_tries_anonymous(self):
auth = IamAuth(backend=Mock())
with pytest.raises(web.HTTPUnauthorized):
await auth.authenticate(make_request("Basic whatever"))
async def fake_with_client(op):
raise RuntimeError("auth-failed: anonymous access not permitted")
with patch.object(auth, "_with_client", side_effect=fake_with_client):
with pytest.raises(web.HTTPUnauthorized):
await auth.authenticate(make_request("Basic whatever"))
@pytest.mark.asyncio
async def test_empty_bearer_raises_401(self):
async def test_empty_bearer_tries_anonymous(self):
auth = IamAuth(backend=Mock())
with pytest.raises(web.HTTPUnauthorized):
await auth.authenticate(make_request("Bearer "))
async def fake_with_client(op):
raise RuntimeError("auth-failed: anonymous access not permitted")
with patch.object(auth, "_with_client", side_effect=fake_with_client):
with pytest.raises(web.HTTPUnauthorized):
await auth.authenticate(make_request("Bearer "))
@pytest.mark.asyncio
async def test_unknown_format_raises_401(self):
@ -445,3 +460,121 @@ class TestAuthorise:
# Different resource → different cache key → two IAM calls.
assert calls["n"] == 2
# -- Anonymous authentication boundary ------------------------------------
class TestAnonymousAuthBoundary:
"""The gateway must only attempt anonymous auth when no credential
is presented. A malformed token must NOT fall through to the
anonymous path; that would let an attacker bypass a broken token
by simply sending garbage."""
@pytest.mark.asyncio
async def test_no_header_attempts_anonymous(self):
auth = IamAuth(backend=Mock())
async def fake_with_client(op):
return await op(Mock(
authenticate_anonymous=AsyncMock(
return_value=("anon", "default", ["reader"]),
)
))
with patch.object(auth, "_with_client", side_effect=fake_with_client):
ident = await auth.authenticate(make_request(None))
assert ident.handle == "anon"
assert ident.source == "anonymous"
@pytest.mark.asyncio
async def test_empty_bearer_attempts_anonymous(self):
auth = IamAuth(backend=Mock())
async def fake_with_client(op):
return await op(Mock(
authenticate_anonymous=AsyncMock(
return_value=("anon", "default", ["reader"]),
)
))
with patch.object(auth, "_with_client", side_effect=fake_with_client):
ident = await auth.authenticate(make_request("Bearer "))
assert ident.handle == "anon"
assert ident.source == "anonymous"
@pytest.mark.asyncio
async def test_malformed_token_does_not_fall_through_to_anonymous(self):
auth = IamAuth(backend=Mock())
called = {"anonymous": False}
original = auth._authenticate_anonymous
async def spy_anonymous():
called["anonymous"] = True
return await original()
auth._authenticate_anonymous = spy_anonymous
with pytest.raises(web.HTTPUnauthorized):
await auth.authenticate(make_request("Bearer garbage"))
assert not called["anonymous"]
@pytest.mark.asyncio
async def test_bad_api_key_does_not_fall_through_to_anonymous(self):
auth = IamAuth(backend=Mock())
called = {"anonymous": False}
async def spy_anonymous():
called["anonymous"] = True
auth._authenticate_anonymous = spy_anonymous
async def fake_with_client(op):
raise RuntimeError("auth-failed: unknown key")
with patch.object(auth, "_with_client", side_effect=fake_with_client):
with pytest.raises(web.HTTPUnauthorized):
await auth.authenticate(make_request("Bearer tg_bad"))
assert not called["anonymous"]
@pytest.mark.asyncio
async def test_bad_jwt_does_not_fall_through_to_anonymous(self):
auth = IamAuth(backend=Mock())
auth._signing_public_pem = "not-a-real-pem"
called = {"anonymous": False}
async def spy_anonymous():
called["anonymous"] = True
auth._authenticate_anonymous = spy_anonymous
with pytest.raises(web.HTTPUnauthorized):
await auth.authenticate(make_request("Bearer a.b.c"))
assert not called["anonymous"]
@pytest.mark.asyncio
async def test_anonymous_rejected_by_iam_raises_401(self):
auth = IamAuth(backend=Mock())
async def fake_with_client(op):
raise RuntimeError("auth-failed: anonymous access not permitted")
with patch.object(auth, "_with_client", side_effect=fake_with_client):
with pytest.raises(web.HTTPUnauthorized):
await auth.authenticate(make_request(None))
@pytest.mark.asyncio
async def test_anonymous_with_empty_user_id_raises_401(self):
auth = IamAuth(backend=Mock())
async def fake_with_client(op):
return await op(Mock(
authenticate_anonymous=AsyncMock(
return_value=("", "default", []),
)
))
with patch.object(auth, "_with_client", side_effect=fake_with_client):
with pytest.raises(web.HTTPUnauthorized):
await auth.authenticate(make_request(None))

@ -0,0 +1,44 @@
"""
Contract test: the full iam-svc MUST reject authenticate-anonymous.
This is a safety pin: if someone accidentally adds anonymous access
to the production IAM handler, this test catches it.
"""
import asyncio
from unittest.mock import Mock, AsyncMock
import pytest
from trustgraph.iam.service.iam import IamService
def _make_request(**kwargs):
req = Mock()
for k, v in kwargs.items():
setattr(req, k, v)
return req
class TestIamRejectsAnonymous:
@pytest.fixture
def handler(self):
svc = object.__new__(IamService)
svc.table_store = Mock(spec=[])
svc.bootstrap_mode = "token"
svc.bootstrap_token = "tok"
svc._on_workspace_created = None
svc._on_workspace_deleted = None
svc._signing_key = None
svc._signing_key_lock = asyncio.Lock()
return svc
@pytest.mark.asyncio
async def test_authenticate_anonymous_returns_auth_failed(self, handler):
resp = await handler.handle(
_make_request(operation="authenticate-anonymous")
)
assert resp.error is not None
assert resp.error.type == "auth-failed"
assert "anonymous" in resp.error.message.lower()

@ -0,0 +1,138 @@
"""
Tests for the no-auth IAM handler.
Verifies that NoAuthHandler returns the expected permissive responses
and that the always-allow authorise path returns the correct shape.
"""
import json
from unittest.mock import Mock
import pytest
from trustgraph.iam.noauth.handler import NoAuthHandler
def _make_request(**kwargs):
req = Mock()
for k, v in kwargs.items():
setattr(req, k, v)
return req
class TestAuthenticateAnonymous:
@pytest.mark.asyncio
async def test_returns_default_identity(self):
h = NoAuthHandler(
default_user_id="anon", default_workspace="ws",
)
resp = await h.handle(
_make_request(operation="authenticate-anonymous")
)
assert resp.error is None
assert resp.resolved_user_id == "anon"
assert resp.resolved_workspace == "ws"
assert "admin" in list(resp.resolved_roles)
@pytest.mark.asyncio
async def test_custom_defaults_propagate(self):
h = NoAuthHandler(
default_user_id="dev-user", default_workspace="dev-ws",
)
resp = await h.handle(
_make_request(operation="authenticate-anonymous")
)
assert resp.resolved_user_id == "dev-user"
assert resp.resolved_workspace == "dev-ws"
class TestResolveApiKey:
@pytest.mark.asyncio
async def test_any_key_resolves_to_default_identity(self):
h = NoAuthHandler()
resp = await h.handle(
_make_request(operation="resolve-api-key", api_key="tg_bogus")
)
assert resp.error is None
assert resp.resolved_user_id == "anonymous"
assert resp.resolved_workspace == "default"
class TestAuthorise:
@pytest.mark.asyncio
async def test_always_allows(self):
h = NoAuthHandler()
resp = await h.handle(
_make_request(
operation="authorise",
user_id="anyone",
capability="anything",
resource_json="{}",
parameters_json="{}",
)
)
assert resp.error is None
assert resp.decision_allow is True
assert resp.decision_ttl_seconds > 0
@pytest.mark.asyncio
async def test_authorise_many_returns_matching_count(self):
h = NoAuthHandler()
checks = [
{"capability": "a", "resource": {}, "parameters": {}},
{"capability": "b", "resource": {}, "parameters": {}},
{"capability": "c", "resource": {}, "parameters": {}},
]
resp = await h.handle(
_make_request(
operation="authorise-many",
user_id="u",
authorise_checks=json.dumps(checks),
)
)
assert resp.error is None
decisions = json.loads(resp.decisions_json)
assert len(decisions) == 3
assert all(d["allow"] is True for d in decisions)
class TestCreateWorkspaceCallback:
@pytest.mark.asyncio
async def test_create_workspace_calls_callback(self):
called_with = []
async def on_created(ws_id):
called_with.append(ws_id)
h = NoAuthHandler(on_workspace_created=on_created)
req = _make_request(operation="create-workspace")
req.workspace_record = Mock()
req.workspace_record.id = "test-ws"
resp = await h.handle(req)
assert resp.error is None
assert called_with == ["test-ws"]
@pytest.mark.asyncio
async def test_create_workspace_without_callback_still_succeeds(self):
h = NoAuthHandler()
req = _make_request(operation="create-workspace")
req.workspace_record = Mock()
req.workspace_record.id = "test-ws"
resp = await h.handle(req)
assert resp.error is None
class TestUnknownOperation:
@pytest.mark.asyncio
async def test_unknown_op_returns_error(self):
h = NoAuthHandler()
resp = await h.handle(
_make_request(operation="not-a-real-op")
)
assert resp.error is not None
assert resp.error.type == "invalid-argument"

@ -62,12 +62,6 @@ class AsyncSocketClient:
if self._connected:
return
if not self.token:
raise ProtocolException(
"AsyncSocketClient requires a token for first-frame "
"auth against /api/v1/socket"
)
ws_url = self._build_ws_url()
self._connect_cm = websockets.connect(
ws_url, ping_interval=20, ping_timeout=self.timeout
@ -79,7 +73,7 @@ class AsyncSocketClient:
# reader task so the response isn't consumed by the reader's
# id-based routing.
await self._socket.send(json.dumps({
"type": "auth", "token": self.token,
"type": "auth", "token": self.token or "",
}))
try:
raw = await asyncio.wait_for(

@ -137,12 +137,6 @@ class SocketClient:
if self._connected:
return
if not self.token:
raise ProtocolException(
"SocketClient requires a token for first-frame auth "
"against /api/v1/socket"
)
ws_url = self._build_ws_url()
self._connect_cm = websockets.connect(
ws_url, ping_interval=20, ping_timeout=self.timeout
@ -153,7 +147,7 @@ class SocketClient:
# auth-ok / auth-failed response isn't consumed by the reader
# loop's id-based routing.
await self._socket.send(json.dumps({
"type": "auth", "token": self.token,
"type": "auth", "token": self.token or "",
}))
try:
raw = await asyncio.wait_for(

@ -76,8 +76,10 @@ class Consumer:
if hasattr(self, "consumer"):
if self.consumer:
self.consumer.unsubscribe()
self.consumer.close()
try:
self.consumer.close()
except Exception:
pass
self.consumer = None
async def stop(self):
@ -157,12 +159,14 @@ class Consumer:
except Exception as e:
logger.error(f"Consumer loop exception: {e}", exc_info=True)
for c in consumers:
for i, c in enumerate(consumers):
try:
c.unsubscribe()
c.close()
except Exception:
pass
except Exception as ce:
logger.warning(
f"Consumer {i} close failed (error path): "
f"{type(ce).__name__}: {ce}"
)
for ex in executors:
ex.shutdown(wait=False)
consumers = []
@ -171,12 +175,14 @@ class Consumer:
continue
finally:
for c in consumers:
for i, c in enumerate(consumers):
try:
c.unsubscribe()
c.close()
except Exception:
pass
except Exception as ce:
logger.warning(
f"Consumer {i} close failed: "
f"{type(ce).__name__}: {ce}"
)
for ex in executors:
ex.shutdown(wait=False)
@ -188,7 +194,7 @@ class Consumer:
try:
msg = await loop.run_in_executor(
executor,
lambda: consumer.receive(timeout_millis=100),
lambda: consumer.receive(timeout_millis=2000),
)
except Exception as e:
# Handle timeout from any backend

@ -62,6 +62,22 @@ class IamClient(RequestResponse):
)
return resp.user
async def authenticate_anonymous(self, timeout=IAM_TIMEOUT):
"""Request anonymous access from the IAM regime.
Returns ``(user_id, workspace, roles)`` if the regime permits
anonymous access, or raises ``RuntimeError`` with error type
``auth-failed`` if it does not."""
resp = await self._request(
operation="authenticate-anonymous",
timeout=timeout,
)
return (
resp.resolved_user_id,
resp.resolved_workspace,
list(resp.resolved_roles),
)
async def resolve_api_key(self, api_key, timeout=IAM_TIMEOUT):
"""Resolve a plaintext API key to its identity triple.

@ -10,6 +10,7 @@ logger = logging.getLogger(__name__)
# Default connection settings from environment
DEFAULT_PULSAR_HOST = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
DEFAULT_PULSAR_API_KEY = os.getenv("PULSAR_API_KEY", None)
DEFAULT_PULSAR_ADMIN_URL = os.getenv("PULSAR_ADMIN_URL", 'http://pulsar:8080')
DEFAULT_RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", 'rabbitmq')
DEFAULT_RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", '5672'))
@ -43,6 +44,7 @@ def get_pubsub(**config: Any) -> Any:
host=config.get('pulsar_host', DEFAULT_PULSAR_HOST),
api_key=config.get('pulsar_api_key', DEFAULT_PULSAR_API_KEY),
listener=config.get('pulsar_listener'),
admin_url=config.get('pulsar_admin_url', DEFAULT_PULSAR_ADMIN_URL),
)
elif backend_type == 'rabbitmq':
from .rabbitmq_backend import RabbitMQBackend
@ -77,6 +79,7 @@ def get_pubsub(**config: Any) -> Any:
STANDALONE_PULSAR_HOST = 'pulsar://localhost:6650'
STANDALONE_PULSAR_ADMIN_URL = 'http://localhost:8080'
def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None:
@ -88,6 +91,7 @@ def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None:
that run outside containers)
"""
pulsar_host = STANDALONE_PULSAR_HOST if standalone else DEFAULT_PULSAR_HOST
pulsar_admin_url = STANDALONE_PULSAR_ADMIN_URL if standalone else DEFAULT_PULSAR_ADMIN_URL
pulsar_listener = 'localhost' if standalone else None
rabbitmq_host = 'localhost' if standalone else DEFAULT_RABBITMQ_HOST
kafka_bootstrap = 'localhost:9092' if standalone else DEFAULT_KAFKA_BOOTSTRAP
@ -105,6 +109,12 @@ def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None:
help=f'Pulsar host (default: {pulsar_host})',
)
parser.add_argument(
'--pulsar-admin-url',
default=pulsar_admin_url,
help=f'Pulsar admin REST API URL (default: {pulsar_admin_url})',
)
parser.add_argument(
'--pulsar-api-key',
default=DEFAULT_PULSAR_API_KEY,

@ -7,8 +7,12 @@ handling topic mapping, serialization, and Pulsar client management.
import pulsar
import _pulsar
import asyncio
import json
import logging
import urllib.request
import urllib.error
import urllib.parse
from typing import Any
from .backend import PubSubBackend, BackendProducer, BackendConsumer, Message
@ -117,7 +121,10 @@ class PulsarBackend:
producers and consumers.
"""
def __init__(self, host: str, api_key: str = None, listener: str = None):
def __init__(
self, host: str, api_key: str = None, listener: str = None,
admin_url: str = None,
):
"""
Initialize Pulsar backend.
@ -125,10 +132,12 @@ class PulsarBackend:
host: Pulsar broker URL (e.g., pulsar://localhost:6650)
api_key: Optional API key for authentication
listener: Optional listener name for multi-homed setups
admin_url: Pulsar admin REST API URL (e.g., http://pulsar:8080)
"""
self.host = host
self.api_key = api_key
self.listener = listener
self.admin_url = admin_url
# Create Pulsar client
client_args = {'service_url': host}
@ -139,6 +148,10 @@ class PulsarBackend:
if api_key:
client_args['authentication'] = pulsar.AuthenticationToken(api_key)
client_args['logger'] = pulsar.ConsoleLogger(
_pulsar.LoggerLevel.Error
)
self.client = pulsar.Client(**client_args)
logger.info(f"Pulsar client connected to {host}")
@ -266,24 +279,129 @@ class PulsarBackend:
return PulsarBackendConsumer(pulsar_consumer, schema)
def _admin_api_path(self, pulsar_uri: str) -> str:
"""
Convert a Pulsar topic URI to an admin REST API path.
persistent://tg/flow/triples-store:default:explain-flow
-> /admin/v2/persistent/tg/flow/triples-store%3Adefault%3Aexplain-flow
"""
scheme, rest = pulsar_uri.split('://', 1)
tenant, namespace, topic = rest.split('/', 2)
encoded_topic = urllib.parse.quote(topic, safe='')
return f"/admin/v2/{scheme}/{tenant}/{namespace}/{encoded_topic}"
def _admin_request(self, method, path):
"""
Make a synchronous admin REST API request.
Returns parsed JSON for GET, None for DELETE/PUT.
Raises urllib.error.HTTPError for non-404 errors.
404 is treated as success (idempotent deletion).
"""
url = f"{self.admin_url}{path}"
req = urllib.request.Request(url, method=method)
try:
with urllib.request.urlopen(req) as resp:
if method == 'GET':
return json.loads(resp.read().decode('utf-8'))
return None
except urllib.error.HTTPError as e:
if e.code == 404:
return None
raise
def _delete_topic_sync(self, topic: str):
"""
Delete a persistent topic and all its subscriptions.
Subscriptions must be removed first, because Pulsar rejects topic
deletion while subscriptions exist. Force-deletes each
subscription to disconnect any lingering consumers.
"""
pulsar_uri = self.map_topic(topic)
if pulsar_uri.startswith('non-persistent://'):
return
api_path = self._admin_api_path(pulsar_uri)
try:
subs = self._admin_request('GET', f"{api_path}/subscriptions")
except Exception as e:
logger.warning(f"Failed to list subscriptions for {topic}: {e}")
return
if subs:
for sub in subs:
encoded_sub = urllib.parse.quote(sub, safe='')
try:
self._admin_request(
'DELETE',
f"{api_path}/subscription/{encoded_sub}"
f"?force=true"
)
logger.info(
f"Deleted subscription {sub} from {topic}"
)
except Exception as e:
logger.warning(
f"Failed to delete subscription {sub} "
f"from {topic}: {e}"
)
try:
self._admin_request('DELETE', api_path)
logger.info(f"Deleted topic: {topic}")
except Exception as e:
logger.warning(f"Failed to delete topic {topic}: {e}")
def _topic_exists_sync(self, topic: str) -> bool:
"""Check topic existence via admin API."""
pulsar_uri = self.map_topic(topic)
if pulsar_uri.startswith('non-persistent://'):
return False
api_path = self._admin_api_path(pulsar_uri)
try:
result = self._admin_request('GET', f"{api_path}/stats")
return result is not None
except Exception:
return False
async def create_topic(self, topic: str) -> None:
"""No-op — Pulsar auto-creates topics on first use.
TODO: Use admin REST API for explicit persistent topic creation."""
"""No-op — Pulsar auto-creates topics on first use."""
pass
async def delete_topic(self, topic: str) -> None:
"""No-op — to be replaced with admin REST API calls.
TODO: Delete persistent topic via admin API."""
pass
"""
Delete a persistent topic and all its subscriptions via
the admin REST API.
Called by the flow controller during deliberate flow deletion.
Non-persistent topics are skipped. Idempotent.
"""
if not self.admin_url:
logger.warning(
f"Cannot delete topic {topic}: "
f"no admin URL configured"
)
return
await asyncio.to_thread(self._delete_topic_sync, topic)
async def topic_exists(self, topic: str) -> bool:
"""Returns True — Pulsar auto-creates on subscribe.
TODO: Use admin REST API for actual existence check."""
return True
"""Check whether a persistent topic exists via the admin API."""
if not self.admin_url:
return True
return await asyncio.to_thread(self._topic_exists_sync, topic)
async def ensure_topic(self, topic: str) -> None:
"""No-op — Pulsar auto-creates topics on first use.
TODO: Use admin REST API for explicit creation."""
"""No-op — Pulsar auto-creates topics on first use."""
pass
def close(self) -> None:
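
As a quick check of the conversion documented in `_admin_api_path`, the standalone sketch below reproduces the same logic outside the class. The free function name `admin_api_path` is hypothetical; the input and expected path are the ones given in the docstring.

```python
from urllib.parse import quote


def admin_api_path(pulsar_uri: str) -> str:
    """Convert a Pulsar topic URI to an admin REST API path."""
    scheme, rest = pulsar_uri.split("://", 1)
    tenant, namespace, topic = rest.split("/", 2)
    # safe='' percent-encodes every reserved character, including ':'.
    return f"/admin/v2/{scheme}/{tenant}/{namespace}/{quote(topic, safe='')}"


path = admin_api_path(
    "persistent://tg/flow/triples-store:default:explain-flow"
)
print(path)
```

Passing `safe=''` matters here: with the default `safe='/'`, a topic name containing a slash would be left unencoded and change the shape of the REST path.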

@ -64,6 +64,7 @@ bootstrap = "trustgraph.bootstrap.bootstrapper:run"
config-svc = "trustgraph.config.service:run"
flow-svc = "trustgraph.flow.service:run"
iam-svc = "trustgraph.iam.service:run"
no-auth-svc = "trustgraph.iam.noauth:run"
doc-embeddings-query-milvus = "trustgraph.query.doc_embeddings.milvus:run"
doc-embeddings-query-pinecone = "trustgraph.query.doc_embeddings.pinecone:run"
doc-embeddings-query-qdrant = "trustgraph.query.doc_embeddings.qdrant:run"

@ -326,6 +326,58 @@ class Processor(AsyncProcessor):
# Main loop.
# ------------------------------------------------------------------
async def _run_pre_service(self):
"""Run pre-service initialisers before opening pub/sub clients.
These bring up infrastructure that other services depend on
(e.g. Pulsar tenant/namespaces). They use out-of-band APIs
(HTTP admin), not pub/sub, so they don't need a config client.
They run without flag tracking, so they must be idempotent.
"""
pre_specs = [
s for s in self.specs
if not s.instance.wait_for_services
]
if not pre_specs:
return
for spec in pre_specs:
child_logger = logger.getChild(spec.name)
child_ctx = InitContext(
logger=child_logger,
config=None,
make_flow_client=self._make_flow_client,
make_iam_client=self._make_iam_client,
)
child_logger.info(f"Running pre-service initialiser")
try:
await spec.instance.run(child_ctx, None, spec.flag)
child_logger.info(f"Pre-service initialiser completed")
except Exception as e:
child_logger.error(
f"Pre-service initialiser failed: "
f"{type(e).__name__}: {e}",
exc_info=True,
)
raise
async def start(self):
# Run pre-service initialisers before opening any pub/sub
# connections. They bring up infrastructure (Pulsar
# namespaces, etc.) that super().start() depends on.
while self.running:
try:
await self._run_pre_service()
break
except Exception as e:
logger.info(
f"Pre-service initialisation failed "
f"({type(e).__name__}: {e}); retry in {GATE_BACKOFF}s"
)
await asyncio.sleep(GATE_BACKOFF)
await super().start()
async def run(self):
logger.info(
@ -347,29 +399,18 @@ class Processor(AsyncProcessor):
continue
try:
# Phase 1: pre-service initialisers run unconditionally.
pre_specs = [
s for s in self.specs
if not s.instance.wait_for_services
]
pre_results = {}
for spec in pre_specs:
pre_results[spec.name] = await self._run_spec(
spec, config,
)
# Phase 2: gate.
# Phase 1: gate.
gate_ok = await self._gate_ready(config)
# Phase 3: post-service initialisers, if gate passed.
post_results = {}
# Phase 2: post-service initialisers, if gate passed.
results = {}
if gate_ok:
post_specs = [
s for s in self.specs
if s.instance.wait_for_services
]
for spec in post_specs:
post_results[spec.name] = await self._run_spec(
results[spec.name] = await self._run_spec(
spec, config,
)
@ -377,8 +418,7 @@ class Processor(AsyncProcessor):
if not gate_ok:
sleep_for = GATE_BACKOFF
else:
all_results = {**pre_results, **post_results}
if any(r != "skip" for r in all_results.values()):
if any(r != "skip" for r in results.values()):
sleep_for = INIT_RETRY
else:
sleep_for = STEADY_INTERVAL

@ -112,6 +112,10 @@ class PulsarTopology(Initialiser):
def _reconcile_sync(self, logger):
if not self._tenant_exists():
clusters = self._get_clusters()
if not clusters:
raise RuntimeError(
"Pulsar cluster list is empty — broker not ready yet"
)
logger.info(
f"Creating tenant {self.tenant!r} with clusters {clusters}"
)

@ -233,10 +233,10 @@ class IamAuth:
header = request.headers.get("Authorization", "")
if not header.startswith("Bearer "):
raise _auth_failure()
return await self._authenticate_anonymous()
token = header[len("Bearer "):].strip()
if not token:
raise _auth_failure()
return await self._authenticate_anonymous()
# API keys always start with "tg_". JWTs have two dots and
# no "tg_" prefix. Discriminate cheaply.
@ -266,6 +266,26 @@ class IamAuth:
handle=sub, workspace=ws, principal_id=sub, source="jwt",
)
async def _authenticate_anonymous(self):
try:
async def _call(client):
return await client.authenticate_anonymous()
user_id, workspace, _roles = await self._with_client(_call)
except Exception as e:
logger.debug(
f"Anonymous authentication rejected: "
f"{type(e).__name__}: {e}"
)
raise _auth_failure()
if not user_id or not workspace:
raise _auth_failure()
return Identity(
handle=user_id, workspace=workspace,
principal_id=user_id, source="anonymous",
)
async def _resolve_api_key(self, plaintext):
h = hashlib.sha256(plaintext.encode("utf-8")).hexdigest()
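
The change to `IamAuth` above replaces an immediate failure with a fallback to anonymous authentication when no bearer token is supplied. A simplified, synchronous sketch of that control flow follows. `AuthFailure`, `authenticate` and the `anonymous_allowed` flag are hypothetical stand-ins: in the real gateway the decision comes from the IAM service's `authenticate-anonymous` reply, not from a local flag.

```python
class AuthFailure(Exception):
    """Stand-in for the gateway's auth failure error (hypothetical)."""


def authenticate(headers, anonymous_allowed):
    """Return a (handle, source) pair for the request headers."""
    header = headers.get("Authorization", "")
    token = ""
    if header.startswith("Bearer "):
        token = header[len("Bearer "):].strip()
    if not token:
        # No usable credential: defer to the IAM regime rather than
        # rejecting outright. The full regime refuses, no-auth accepts.
        if not anonymous_allowed:
            raise AuthFailure("auth failure")
        return ("anonymous", "anonymous")
    # API keys start with "tg_"; anything else is treated as a JWT here.
    return (token, "api-key" if token.startswith("tg_") else "jwt")
```

With the full regime the anonymous path still ends in an auth failure, so behaviour for existing deployments is unchanged.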


@ -57,16 +57,13 @@ class Mux:
(important for browsers, which treat a handshake-time 401
as terminal)."""
token = data.get("token", "")
if not token:
await self.ws.send_json({
"type": "auth-failed",
"error": "auth failure",
})
return
class _Shim:
def __init__(self, tok):
self.headers = {"Authorization": f"Bearer {tok}"}
self.headers = (
{"Authorization": f"Bearer {tok}"} if tok
else {}
)
try:
identity = await self.auth.authenticate(_Shim(token))


@ -0,0 +1 @@
from . service import *


@ -0,0 +1,4 @@
from . service import run
run()


@ -0,0 +1,131 @@
"""
No-auth IAM handler. Implements the IAM contract with every operation
returning a permissive or stub response. No database, no crypto,
no state.
"""
import json
import logging

from trustgraph.schema import IamResponse, Error, UserRecord

logger = logging.getLogger(__name__)


def _err(type, message):
    return IamResponse(error=Error(type=type, message=message))


class NoAuthHandler:

    def __init__(self, default_user_id="anonymous",
                 default_workspace="default",
                 on_workspace_created=None):
        self.default_user_id = default_user_id
        self.default_workspace = default_workspace
        self._on_workspace_created = on_workspace_created

    def _default_identity_response(self):
        return IamResponse(
            resolved_user_id=self.default_user_id,
            resolved_workspace=self.default_workspace,
            resolved_roles=["admin"],
        )

    def _default_user_record(self):
        return UserRecord(
            id=self.default_user_id,
            workspace=self.default_workspace,
            username=self.default_user_id,
            name="Anonymous User",
            roles=["admin"],
            enabled=True,
        )

    async def handle(self, v):

        op = v.operation

        try:

            if op == "authenticate-anonymous":
                return self._default_identity_response()

            if op == "resolve-api-key":
                return self._default_identity_response()

            if op == "authorise":
                return IamResponse(
                    decision_allow=True,
                    decision_ttl_seconds=3600,
                )

            if op == "authorise-many":
                checks = json.loads(v.authorise_checks or "[]")
                decisions = [
                    {"allow": True, "ttl": 3600}
                    for _ in checks
                ]
                return IamResponse(
                    decisions_json=json.dumps(decisions),
                )

            if op == "get-signing-key-public":
                return IamResponse(signing_key_public="")

            if op == "bootstrap":
                return IamResponse()

            if op == "bootstrap-status":
                return IamResponse(bootstrap_available=False)

            if op == "whoami":
                return IamResponse(user=self._default_user_record())

            if op == "login":
                return IamResponse()

            if op in (
                "create-user", "get-user", "update-user",
                "disable-user", "enable-user",
            ):
                return IamResponse(user=self._default_user_record())

            if op == "list-users":
                return IamResponse(users=[self._default_user_record()])

            if op == "delete-user":
                return IamResponse()

            if op == "create-workspace":
                if self._on_workspace_created and v.workspace_record:
                    await self._on_workspace_created(v.workspace_record.id)
                return IamResponse()

            if op in (
                "get-workspace", "update-workspace",
                "disable-workspace",
            ):
                return IamResponse()

            if op == "list-workspaces":
                return IamResponse()

            if op in ("create-api-key", "list-api-keys", "revoke-api-key"):
                return IamResponse()

            if op in ("change-password", "reset-password"):
                return IamResponse()

            if op == "rotate-signing-key":
                return IamResponse()

            return _err(
                "invalid-argument",
                f"unknown operation: {op!r}",
            )

        except Exception as e:
            logger.error(
                f"no-auth {op} failed: {type(e).__name__}: {e}",
                exc_info=True,
            )
            return _err("internal-error", str(e))
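
The `authorise-many` branch of the handler above can be exercised in isolation. The sketch below reproduces only the JSON transformation; `allow_all` is a hypothetical helper name, and the shape of each check object in the usage note is an assumption, since the handler never inspects it.

```python
import json


def allow_all(authorise_checks, ttl=3600):
    """Return one allow decision per submitted check, encoded as JSON.

    `authorise_checks` is the JSON-encoded list from the request; a
    missing or empty value is treated as an empty list."""
    checks = json.loads(authorise_checks or "[]")
    # The content of each check is ignored: every one is allowed.
    decisions = [{"allow": True, "ttl": ttl} for _ in checks]
    return json.dumps(decisions)
```

Calling `allow_all('[{"capability": "x"}, {"capability": "y"}]')` yields two allow decisions in the same order as the checks, which is what lets a caller match decisions to checks by position.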


@ -0,0 +1,182 @@
"""
No-auth IAM service. Drop-in replacement for iam-svc that permits
all access unconditionally. No database, no bootstrap, no signing keys.
"""
import logging
import uuid

from trustgraph.schema import Error
from trustgraph.schema import IamRequest, IamResponse
from trustgraph.schema import iam_request_queue, iam_response_queue
from trustgraph.schema import ConfigRequest, ConfigResponse, ConfigValue
from trustgraph.schema import config_request_queue, config_response_queue

from trustgraph.base import AsyncProcessor, Consumer, Producer
from trustgraph.base import ConsumerMetrics, ProducerMetrics
from trustgraph.base.metrics import SubscriberMetrics
from trustgraph.base.request_response_spec import RequestResponse

from . handler import NoAuthHandler

logger = logging.getLogger(__name__)

default_ident = "no-auth-svc"

default_iam_request_queue = iam_request_queue
default_iam_response_queue = iam_response_queue


class Processor(AsyncProcessor):

    def __init__(self, **params):

        iam_req_q = params.get(
            "iam_request_queue", default_iam_request_queue,
        )
        iam_resp_q = params.get(
            "iam_response_queue", default_iam_response_queue,
        )

        default_user_id = params.get("default_user_id", "anonymous")
        default_workspace = params.get("default_workspace", "default")

        super().__init__(**params)

        iam_request_metrics = ConsumerMetrics(
            processor=self.id, flow=None, name="iam-request",
        )
        iam_response_metrics = ProducerMetrics(
            processor=self.id, flow=None, name="iam-response",
        )

        self.iam_request_topic = iam_req_q

        self.iam_request_consumer = Consumer(
            taskgroup=self.taskgroup,
            backend=self.pubsub,
            flow=None,
            topic=iam_req_q,
            subscriber=self.id,
            schema=IamRequest,
            handler=self.on_iam_request,
            metrics=iam_request_metrics,
        )

        self.iam_response_producer = Producer(
            backend=self.pubsub,
            topic=iam_resp_q,
            schema=IamResponse,
            metrics=iam_response_metrics,
        )

        self.handler = NoAuthHandler(
            default_user_id=default_user_id,
            default_workspace=default_workspace,
            on_workspace_created=self._ensure_workspace_registered,
        )

        logger.info(
            f"No-auth IAM service initialised "
            f"(user={default_user_id}, workspace={default_workspace})"
        )

    async def start(self):
        await self.pubsub.ensure_topic(self.iam_request_topic)
        await self.iam_request_consumer.start()

    def _create_config_client(self):
        config_rr_id = str(uuid.uuid4())
        config_req_metrics = ProducerMetrics(
            processor=self.id, flow=None, name="config-request",
        )
        config_resp_metrics = SubscriberMetrics(
            processor=self.id, flow=None, name="config-response",
        )
        return RequestResponse(
            backend=self.pubsub,
            subscription=f"{self.id}--config--{config_rr_id}",
            consumer_name=self.id,
            request_topic=config_request_queue,
            request_schema=ConfigRequest,
            request_metrics=config_req_metrics,
            response_topic=config_response_queue,
            response_schema=ConfigResponse,
            response_metrics=config_resp_metrics,
        )

    async def _ensure_workspace_registered(self, workspace_id):
        client = self._create_config_client()
        try:
            await client.start()
            await client.request(
                ConfigRequest(
                    operation="put",
                    workspace="__workspaces__",
                    values=[ConfigValue(
                        type="workspace", key=workspace_id,
                        value='{"enabled": true}',
                    )],
                ),
                timeout=10,
            )
        finally:
            await client.stop()

        logger.info(
            f"Registered workspace in config: {workspace_id}"
        )

    async def on_iam_request(self, msg, consumer, flow):

        id = None

        try:
            v = msg.value()
            id = msg.properties()["id"]

            logger.debug(
                f"Handling IAM request {id} op={v.operation!r}"
            )

            resp = await self.handler.handle(v)

            await self.iam_response_producer.send(
                resp, properties={"id": id},
            )

        except Exception as e:
            logger.error(
                f"IAM request failed: {type(e).__name__}: {e}",
                exc_info=True,
            )
            resp = IamResponse(
                error=Error(type="internal-error", message=str(e)),
            )
            if id is not None:
                await self.iam_response_producer.send(
                    resp, properties={"id": id},
                )

    @staticmethod
    def add_args(parser):

        AsyncProcessor.add_args(parser)

        parser.add_argument(
            "--iam-request-queue",
            default=default_iam_request_queue,
            help=f"IAM request queue (default: {default_iam_request_queue})",
        )
        parser.add_argument(
            "--iam-response-queue",
            default=default_iam_response_queue,
            help=f"IAM response queue (default: {default_iam_response_queue})",
        )
        parser.add_argument(
            "--default-user-id",
            default="anonymous",
            help="User ID for all requests (default: anonymous)",
        )
        parser.add_argument(
            "--default-workspace",
            default="default",
            help="Workspace for all requests (default: default)",
        )


def run():
    Processor.launch(default_ident, __doc__)


@ -287,6 +287,9 @@ class IamService:
op = v.operation
try:
if op == "authenticate-anonymous":
return _err("auth-failed", "anonymous access not permitted")
if op == "bootstrap":
return await self.handle_bootstrap(v)
if op == "bootstrap-status":
@ -394,8 +397,8 @@ class IamService:
async def auto_bootstrap_if_token_mode(self):
"""Called from the service processor at startup. In
``token`` mode, if tables are empty, seeds the default
workspace / admin / signing key using the operator-provided
``token`` mode, if tables are empty, seeds the admin user,
API key, and signing key using the operator-provided
bootstrap token. The admin's API key plaintext is *the*
``bootstrap_token``; the operator already knows it, so nothing
needs to be returned or logged.
@ -405,7 +408,7 @@ class IamService:
if self.bootstrap_mode != "token":
return
if await self.table_store.any_workspace_exists():
if await self.table_store.any_signing_key_exists():
logger.info(
"IAM: token mode, tables already populated; skipping "
"auto-bootstrap"
@ -420,22 +423,13 @@ class IamService:
async def _seed_tables(self, api_key_plaintext):
"""Shared seeding logic used by token-mode auto-bootstrap and
bootstrap-mode handle_bootstrap. Creates the default
workspace, admin user, admin API key (using the given
plaintext), and an initial signing key. Returns the admin
bootstrap-mode handle_bootstrap. Creates the admin user,
admin API key (using the given plaintext), and an initial
signing key. The workspace is created separately by the
bootstrapper's WorkspaceInit initialiser. Returns the admin
user id."""
now = _now_dt()
await self.table_store.put_workspace(
id=DEFAULT_WORKSPACE,
name="Default",
enabled=True,
created=now,
)
if self._on_workspace_created:
await self._on_workspace_created(DEFAULT_WORKSPACE)
admin_user_id = str(uuid.uuid4())
admin_password = secrets.token_urlsafe(32)
await self.table_store.put_user(
@ -488,7 +482,7 @@ class IamService:
if self.bootstrap_mode != "bootstrap":
return _err("auth-failed", "auth failure")
if await self.table_store.any_workspace_exists():
if await self.table_store.any_signing_key_exists():
return _err("auth-failed", "auth failure")
plaintext = _generate_api_key()
@ -528,7 +522,7 @@ class IamService:
instead of forcing callers to probe the masked-failure path."""
available = (
self.bootstrap_mode == "bootstrap"
and not await self.table_store.any_workspace_exists()
and not await self.table_store.any_signing_key_exists()
)
return IamResponse(bootstrap_available=available)
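
The hunks above switch the "already bootstrapped" signal from workspace existence to signing key existence, because the default workspace is now created separately by the bootstrapper's WorkspaceInit initialiser. A minimal sketch of why that matters, using plain lists in place of the table store (both function names are hypothetical):

```python
def bootstrap_available_old(bootstrap_mode, workspaces, signing_keys):
    """Previous rule: any workspace means IAM has been seeded."""
    return bootstrap_mode == "bootstrap" and not workspaces


def bootstrap_available_new(bootstrap_mode, workspaces, signing_keys):
    """New rule: only a signing key means IAM has been seeded."""
    return bootstrap_mode == "bootstrap" and not signing_keys


# State after WorkspaceInit has run but before IAM bootstrap:
# the default workspace exists, yet no signing key has been created.
state = {"workspaces": ["default"], "signing_keys": []}
```

In that state the old rule reports bootstrap as unavailable even though no admin user or signing key exists, whereas the new rule still offers it.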


@ -202,11 +202,14 @@ ASK {{
if response and isinstance(response, dict):
query = response.get('query', '').strip()
if query.upper().startswith(('SELECT', 'ASK', 'CONSTRUCT', 'DESCRIBE')):
parts = query.split()
if parts and parts[0].upper() in (
'SELECT', 'ASK', 'CONSTRUCT', 'DESCRIBE',
):
return SPARQLQuery(
query=query,
variables=self._extract_variables(query),
query_type=query.split()[0].upper(),
query_type=parts[0].upper(),
explanation=response.get('explanation', 'Generated by LLM'),
complexity_score=self._calculate_complexity(query)
)
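
The hunk above replaces a prefix test with a comparison on the first whitespace-delimited token. The difference can be sketched as follows; `query_type` is a hypothetical helper, not a function in the repository.

```python
QUERY_TYPES = ("SELECT", "ASK", "CONSTRUCT", "DESCRIBE")


def query_type(query):
    """Return the SPARQL query form if the first token names one,
    otherwise None."""
    parts = query.strip().split()
    # Compare the whole first token, not just a prefix of the string.
    if parts and parts[0].upper() in QUERY_TYPES:
        return parts[0].upper()
    return None
```

A prefix test accepts a response such as `"ASKING for clarification"` and would then record `ASKING` as the query type. The token comparison rejects it, and also handles an empty response without indexing into an empty list.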


@ -435,3 +435,7 @@ class IamTableStore:
async def any_workspace_exists(self):
rows = await self.list_workspaces()
return bool(rows)
async def any_signing_key_exists(self):
rows = await self.list_signing_keys()
return bool(rows)