diff --git a/dev-tools/library_client.py b/dev-tools/library_client.py new file mode 100644 index 00000000..ae9d6857 --- /dev/null +++ b/dev-tools/library_client.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 + +""" +Client utility for browsing and loading documents from the TrustGraph +public document library. + +Usage: + python library_client.py list + python library_client.py search + python library_client.py load-all + python library_client.py load-doc + python library_client.py load-match +""" + +import json +import urllib.request +import sys +import os +import argparse + +from trustgraph.api import Api +from trustgraph.api.types import Uri, Literal, Triple + +BUCKET_URL = "https://storage.googleapis.com/trustgraph-library" +INDEX_URL = f"{BUCKET_URL}/index.json" + +default_url = os.getenv("TRUSTGRAPH_URL", "http://localhost:8088/") +default_user = "trustgraph" +default_token = os.getenv("TRUSTGRAPH_TOKEN", None) + + +def fetch_index(): + with urllib.request.urlopen(INDEX_URL) as resp: + return json.loads(resp.read()) + + +def fetch_document_metadata(doc_id): + url = f"{BUCKET_URL}/{doc_id}.json" + with urllib.request.urlopen(url) as resp: + return json.loads(resp.read()) + + +def fetch_document_content(doc_id): + url = f"{BUCKET_URL}/{doc_id}.epub" + with urllib.request.urlopen(url) as resp: + return resp.read() + + +def search_index(index, query): + query = query.lower() + results = [] + for doc in index: + title = doc.get("title", "").lower() + comments = doc.get("comments", "").lower() + tags = [t.lower() for t in doc.get("tags", [])] + if (query in title or query in comments or + any(query in t for t in tags)): + results.append(doc) + return results + + +def print_index(index): + if not index: + return + + # Calculate column widths + id_width = max(len(str(doc.get("id", ""))) for doc in index) + title_width = max(len(doc.get("title", "")) for doc in index) + + # Cap title width for readability + title_width = min(title_width, 60) + id_width = max(id_width, 2) + + try: + term_width = os.get_terminal_size().columns + except OSError: + term_width = 120 + + tags_width = max(term_width - id_width - title_width - 6, 20) + + header = f"{'ID':<{id_width}} {'Title':<{title_width}} {'Tags':<{tags_width}}" + print(header) + print("-" * len(header)) + + for doc in index: + eid = str(doc.get("id", "")) + title = doc.get("title", "") + if len(title) > title_width: + title = title[:title_width - 3] + "..." + tags = ", ".join(doc.get("tags", [])) + if len(tags) > tags_width: + tags = tags[:tags_width - 3] + "..." 
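+        # Emit one aligned row: ID, truncated title, truncated tags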
+ print(f"{eid:<{id_width}} {title:<{title_width}} {tags}") + + +def convert_value(v): + """Convert a JSON triple value to a Uri or Literal.""" + if v["type"] == "uri": + return Uri(v["value"]) + else: + return Literal(v["value"]) + + +def convert_metadata(metadata_json): + """Convert JSON metadata triples to Triple objects.""" + triples = [] + for t in metadata_json: + triples.append(Triple( + s=convert_value(t["s"]), + p=convert_value(t["p"]), + o=convert_value(t["o"]), + )) + return triples + + +def load_document(api, user, doc_entry): + """Fetch metadata and content for a document, then load into TrustGraph.""" + doc_id = doc_entry["id"] + title = doc_entry["title"] + + print(f" [{doc_id}] {title}") + + print(f" fetching metadata...") + doc_json = fetch_document_metadata(doc_id) + doc = doc_json[0] + + print(f" fetching content...") + content = fetch_document_content(doc_id) + + print(f" loading into TrustGraph ({len(content) // 1024}KB)...") + metadata = convert_metadata(doc["metadata"]) + + api.add_document( + id=doc["id"], + metadata=metadata, + user=user, + kind=doc["kind"], + title=doc["title"], + comments=doc["comments"], + tags=doc["tags"], + document=content, + ) + + print(f" done.") + + +def load_documents(api, user, docs): + """Load a list of documents.""" + print(f"Loading {len(docs)} document(s)...\n") + for doc in docs: + try: + load_document(api, user, doc) + except Exception as e: + print(f" FAILED: {e}", file=sys.stderr) + print() + print("Complete.") + + +def main(): + parser = argparse.ArgumentParser( + description="Browse and load documents from the TrustGraph public document library.", + ) + + parser.add_argument( + "-u", "--url", default=default_url, + help=f"TrustGraph API URL (default: {default_url})", + ) + parser.add_argument( + "-U", "--user", default=default_user, + help=f"User ID (default: {default_user})", + ) + parser.add_argument( + "-t", "--token", default=default_token, + help="Authentication token (default: $TRUSTGRAPH_TOKEN)", + ) + + sub = parser.add_subparsers(dest="command") + + sub.add_parser("list", help="List all documents") + + search_parser = sub.add_parser("search", help="Search documents") + search_parser.add_argument("query", help="Text to search for") + + sub.add_parser("load-all", help="Load all documents into TrustGraph") + + load_doc_parser = sub.add_parser("load-doc", help="Load a document by ID") + load_doc_parser.add_argument("id", help="Document ID (ebook number)") + + load_match_parser = sub.add_parser( + "load-match", help="Load all documents matching a search term", + ) + load_match_parser.add_argument("query", help="Text to search for") + + args = parser.parse_args() + + if args.command is None: + parser.print_help() + sys.exit(1) + + index = fetch_index() + + if args.command in ("list", "search"): + if args.command == "list": + print_index(index) + else: + results = search_index(index, args.query) + if results: + print_index(results) + else: + print("No matches found.", file=sys.stderr) + sys.exit(1) + return + + # Load commands need the API + api = Api(args.url, token=args.token).library() + + if args.command == "load-all": + load_documents(api, args.user, index) + + elif args.command == "load-doc": + matches = [d for d in index if str(d.get("id")) == args.id] + if not matches: + print(f"No document with ID '{args.id}' found.", file=sys.stderr) + sys.exit(1) + load_documents(api, args.user, matches) + + elif args.command == "load-match": + results = search_index(index, args.query) + if results: + load_documents(api, args.user, 
results) + else: + print("No matches found.", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/docs/tech-specs/config-push-poke.md b/docs/tech-specs/config-push-poke.md new file mode 100644 index 00000000..4273e46d --- /dev/null +++ b/docs/tech-specs/config-push-poke.md @@ -0,0 +1,282 @@ +# Config Push "Notify" Pattern Technical Specification + +## Overview + +Replace the current config push mechanism — which broadcasts the full config +blob on a `state` class queue — with a lightweight "notify" notification +containing only the version number and affected types. Processors that care +about those types fetch the full config via the existing request/response +interface. + +This solves the RabbitMQ late-subscriber problem: when a process restarts, +its fresh queue has no historical messages, so it never receives the current +config state. With the notify pattern, the push queue is only a signal — the +source of truth is the config service's request/response API, which is +always available. + +## Problem + +On Pulsar, `state` class queues are persistent topics. A new subscriber +with `InitialPosition.Earliest` reads from message 0 and receives the +last config push. On RabbitMQ, each subscriber gets a fresh per-subscriber +queue (named with a new UUID). Messages published before the queue existed +are gone. A restarting processor never gets the current config. + +## Design + +### The Notify Message + +The `ConfigPush` schema changes from carrying the full config to carrying +just a version number and the list of affected config types: + +```python +@dataclass +class ConfigPush: + version: int = 0 + types: list[str] = field(default_factory=list) +``` + +When the config service handles a `put` or `delete`, it knows which types +were affected (from the request's `values[].type` or `keys[].type`). It +includes those in the notify. On startup, the config service sends a notify +with an empty types list (meaning "everything"). + +### Subscribe-then-Fetch Startup (No Race Condition) + +The critical ordering to avoid missing an update: + +1. **Subscribe** to the config push queue. Buffer incoming notify messages. +2. **Fetch** the full config via request/response (`operation: "config"`). + This returns the config dict and a version number. +3. **Apply** the fetched config to all registered handlers. +4. **Process** buffered notifys. For any notify with `version > fetched_version`, + re-fetch and re-apply. Discard notifys with `version <= fetched_version`. +5. **Enter steady state**. Process future notifys as they arrive. + +This is safe because: +- If an update happens before the subscription, the fetch picks it up. +- If an update happens between subscribe and fetch, it's in the buffer. +- If an update happens after the fetch, it arrives on the queue normally. +- Version comparison ensures no duplicate processing. + +### Processor API + +The current API requires processors to understand the full config dict +structure. The new API should be cleaner — processors declare which config +types they care about and provide a handler that receives only the relevant +config subset. + +#### Current API + +```python +# In processor __init__: +self.register_config_handler(self.on_configure_flows) + +# Handler receives the entire config dict: +async def on_configure_flows(self, config, version): + if "active-flow" not in config: + return + if self.id in config["active-flow"]: + flow_config = json.loads(config["active-flow"][self.id]) + # ... 
+``` + +#### New API + +```python +# In processor __init__: +self.register_config_handler( + handler=self.on_configure_flows, + types=["active-flow"], +) + +# Handler receives only the relevant config subset, same signature: +async def on_configure_flows(self, config, version): + # config still contains the full dict, but handler is only called + # when "active-flow" type changes (or on startup) + if "active-flow" not in config: + return + # ... +``` + +The `types` parameter is optional. If omitted, the handler is called for +every config change (backward compatible). If specified, the handler is +only invoked when the notify's `types` list intersects with the handler's +types, or on startup (empty types list = everything). + +#### Internal Registration Structure + +```python +# In AsyncProcessor: +def register_config_handler(self, handler, types=None): + self.config_handlers.append({ + "handler": handler, + "types": set(types) if types else None, # None = all types + }) +``` + +#### Notify Processing Logic + +```python +async def on_config_notify(self, message, consumer, flow): + notify_version = message.value().version + notify_types = set(message.value().types) + + # Skip if we already have this version or newer + if notify_version <= self.config_version: + return + + # Fetch full config from config service + config, version = await self.config_client.config() + self.config_version = version + + # Determine which handlers to invoke + for entry in self.config_handlers: + handler_types = entry["types"] + if handler_types is None: + # Handler cares about everything + await entry["handler"](config, version) + elif not notify_types or notify_types & handler_types: + # notify_types empty = startup (invoke all), + # or intersection with handler's types + await entry["handler"](config, version) +``` + +### Config Service Changes + +#### Push Method + +The `push()` method changes to send only version + types: + +```python +async def push(self, types=None): + version = await self.config.get_version() + resp = ConfigPush( + version=version, + types=types or [], + ) + await self.config_push_producer.send(resp) +``` + +#### Put/Delete Handlers + +Extract affected types and pass to push: + +```python +async def handle_put(self, v): + types = list(set(k.type for k in v.values)) + for k in v.values: + await self.table_store.put_config(k.type, k.key, k.value) + await self.inc_version() + await self.push(types=types) + +async def handle_delete(self, v): + types = list(set(k.type for k in v.keys)) + for k in v.keys: + await self.table_store.delete_key(k.type, k.key) + await self.inc_version() + await self.push(types=types) +``` + +#### Queue Class Change + +The config push queue changes from `state` class to `flow` class. The push +is now a transient signal — the source of truth is the config service's +request/response API, not the queue. `flow` class is persistent (survives +broker restarts) but doesn't require last-message retention, which was the +root cause of the RabbitMQ problem. + +```python +config_push_queue = queue('config', cls='flow') # was cls='state' +``` + +#### Startup Push + +On startup, the config service sends a notify with empty types list +(signalling "everything changed"): + +```python +async def start(self): + await self.push(types=[]) # Empty = all types + await self.config_request_consumer.start() +``` + +### AsyncProcessor Changes + +The `AsyncProcessor` needs a config request/response client alongside the +push consumer. 
The startup sequence becomes: + +```python +async def start(self): + # 1. Start the push consumer (begins buffering notifys) + await self.config_sub_task.start() + + # 2. Fetch current config via request/response + config, version = await self.config_client.config() + self.config_version = version + + # 3. Apply to all handlers (startup = all handlers invoked) + for entry in self.config_handlers: + await entry["handler"](config, version) + + # 4. Buffered notifys are now processed by on_config_notify, + # which skips versions <= self.config_version +``` + +The config client needs to be created in `__init__` using the existing +request/response queue infrastructure. The `ConfigClient` from +`trustgraph.clients.config_client` already exists but uses a synchronous +blocking pattern. An async variant or integration with the processor's +pub/sub backend is needed. + +### Existing Config Handler Types + +For reference, the config types currently used by handlers: + +| Handler | Type(s) | Used By | +|---------|---------|---------| +| `on_configure_flows` | `active-flow` | All FlowProcessor subclasses | +| `on_collection_config` | `collection` | Storage services (triples, embeddings, rows) | +| `on_prompt_config` | `prompt` | Prompt template service, agent extract | +| `on_schema_config` | `schema` | Rows storage, row embeddings, NLP query, structured diag | +| `on_cost_config` | `token-costs` | Metering service | +| `on_ontology_config` | `ontology` | Ontology extraction | +| `on_librarian_config` | `librarian` | Librarian service | +| `on_mcp_config` | `mcp-tool` | MCP tool service | +| `on_knowledge_config` | `kg-core` | Cores service | + +## Implementation Order + +1. **Update ConfigPush schema** — change `config` field to `types` field. + +2. **Update config service** — modify `push()` to send version + types. + Modify `handle_put`/`handle_delete` to extract affected types. + +3. **Add async config query to AsyncProcessor** — create a + request/response client for config queries within the processor's + event loop. + +4. **Implement subscribe-then-fetch startup** — reorder + `AsyncProcessor.start()` to subscribe first, then fetch, then + process buffered notifys with version comparison. + +5. **Update register_config_handler** — add optional `types` parameter. + Update `on_config_notify` to filter by type intersection. + +6. **Update existing handlers** — add `types` parameter to all + `register_config_handler` calls across the codebase. + +7. **Backward compatibility** — handlers without `types` parameter + continue to work (invoked for all changes). + +## Risks + +- **Thundering herd**: if many processors restart simultaneously, they + all hit the config service API at once. Mitigated by the config service + already being designed for request/response load, and the number of + processors being small (tens, not thousands). + +- **Config service availability**: processors now depend on the config + service being up at startup, not just having received a push. This is + already the case in practice — without config, processors can't do + anything useful. 
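+## Appendix: Short-Lived Config Fetch Sketch
+
+For illustration, one possible shape for the async config fetch referred to
+above ("an async variant or integration with the processor's pub/sub backend
+is needed"). This is a sketch, not a final API: it assumes the existing
+`RequestResponse` request/response helper, the `ConfigRequest`/`ConfigResponse`
+schemas and the config request/response queues; consumer naming and metrics
+wiring are omitted for brevity.
+
+```python
+import uuid
+
+# Assumed imports (paths as used elsewhere in the tree):
+#   from trustgraph.base.request_response_spec import RequestResponse
+#   from trustgraph.schema import ConfigRequest, ConfigResponse
+#   from trustgraph.schema import config_request_queue, config_response_queue
+
+async def fetch_config(self):
+    """Fetch (config, version) from the config service using a
+    short-lived request/response client."""
+
+    # Fresh subscription per fetch; nothing is held open between notifys.
+    # (consumer_name and metrics arguments omitted for brevity.)
+    client = RequestResponse(
+        backend=self.pubsub_backend,
+        subscription=f"{self.id}--config--{uuid.uuid4()}",
+        request_topic=config_request_queue,
+        request_schema=ConfigRequest,
+        response_topic=config_response_queue,
+        response_schema=ConfigResponse,
+    )
+
+    try:
+        await client.start()
+        resp = await client.request(
+            ConfigRequest(operation="config"), timeout=10,
+        )
+        if resp.error:
+            raise RuntimeError(f"Config error: {resp.error.message}")
+        return resp.config, resp.version
+    finally:
+        # Always tear the client down, even on timeout or error
+        await client.stop()
+```
+
+The short-lived client keeps the steady state cheap: processors hold only the
+notify consumer open, and only touch the request/response path when a notify
+(or startup) actually requires a re-fetch.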
diff --git a/tests/unit/test_base/test_async_processor_config.py b/tests/unit/test_base/test_async_processor_config.py new file mode 100644 index 00000000..f1a83fef --- /dev/null +++ b/tests/unit/test_base/test_async_processor_config.py @@ -0,0 +1,323 @@ +""" +Tests for AsyncProcessor config notify pattern: +- register_config_handler with types filtering +- on_config_notify version comparison and type matching +- fetch_config with short-lived client +- fetch_and_apply_config retry logic +""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch, Mock +from trustgraph.schema import Term, IRI, LITERAL + + +# Patch heavy dependencies before importing AsyncProcessor +@pytest.fixture +def processor(): + """Create an AsyncProcessor with mocked dependencies.""" + with patch('trustgraph.base.async_processor.get_pubsub') as mock_pubsub, \ + patch('trustgraph.base.async_processor.Consumer') as mock_consumer, \ + patch('trustgraph.base.async_processor.ProcessorMetrics') as mock_pm, \ + patch('trustgraph.base.async_processor.ConsumerMetrics') as mock_cm: + + mock_pubsub.return_value = MagicMock() + mock_consumer.return_value = MagicMock() + mock_pm.return_value = MagicMock() + mock_cm.return_value = MagicMock() + + from trustgraph.base.async_processor import AsyncProcessor + p = AsyncProcessor( + id="test-processor", + taskgroup=AsyncMock(), + ) + return p + + +class TestRegisterConfigHandler: + + def test_register_without_types(self, processor): + handler = AsyncMock() + processor.register_config_handler(handler) + + assert len(processor.config_handlers) == 1 + assert processor.config_handlers[0]["handler"] is handler + assert processor.config_handlers[0]["types"] is None + + def test_register_with_types(self, processor): + handler = AsyncMock() + processor.register_config_handler(handler, types=["prompt"]) + + assert processor.config_handlers[0]["types"] == {"prompt"} + + def test_register_multiple_types(self, processor): + handler = AsyncMock() + processor.register_config_handler( + handler, types=["schema", "collection"] + ) + + assert processor.config_handlers[0]["types"] == { + "schema", "collection" + } + + def test_register_multiple_handlers(self, processor): + h1 = AsyncMock() + h2 = AsyncMock() + processor.register_config_handler(h1, types=["prompt"]) + processor.register_config_handler(h2, types=["schema"]) + + assert len(processor.config_handlers) == 2 + + +class TestOnConfigNotify: + + @pytest.mark.asyncio + async def test_skip_old_version(self, processor): + processor.config_version = 5 + + handler = AsyncMock() + processor.register_config_handler(handler, types=["prompt"]) + + msg = Mock() + msg.value.return_value = Mock(version=3, types=["prompt"]) + + await processor.on_config_notify(msg, None, None) + + handler.assert_not_called() + + @pytest.mark.asyncio + async def test_skip_same_version(self, processor): + processor.config_version = 5 + + handler = AsyncMock() + processor.register_config_handler(handler, types=["prompt"]) + + msg = Mock() + msg.value.return_value = Mock(version=5, types=["prompt"]) + + await processor.on_config_notify(msg, None, None) + + handler.assert_not_called() + + @pytest.mark.asyncio + async def test_skip_irrelevant_types(self, processor): + processor.config_version = 1 + + handler = AsyncMock() + processor.register_config_handler(handler, types=["prompt"]) + + msg = Mock() + msg.value.return_value = Mock(version=2, types=["schema"]) + + await processor.on_config_notify(msg, None, None) + + handler.assert_not_called() + # Version should 
still be updated + assert processor.config_version == 2 + + @pytest.mark.asyncio + async def test_fetch_on_relevant_type(self, processor): + processor.config_version = 1 + + handler = AsyncMock() + processor.register_config_handler(handler, types=["prompt"]) + + # Mock fetch_config + mock_config = {"prompt": {"key": "value"}} + with patch.object( + processor, 'fetch_config', + new_callable=AsyncMock, + return_value=(mock_config, 2) + ): + msg = Mock() + msg.value.return_value = Mock(version=2, types=["prompt"]) + + await processor.on_config_notify(msg, None, None) + + handler.assert_called_once_with(mock_config, 2) + assert processor.config_version == 2 + + @pytest.mark.asyncio + async def test_handler_without_types_always_called(self, processor): + processor.config_version = 1 + + handler = AsyncMock() + processor.register_config_handler(handler) # No types = all + + mock_config = {"anything": {}} + with patch.object( + processor, 'fetch_config', + new_callable=AsyncMock, + return_value=(mock_config, 2) + ): + msg = Mock() + msg.value.return_value = Mock(version=2, types=["whatever"]) + + await processor.on_config_notify(msg, None, None) + + handler.assert_called_once_with(mock_config, 2) + + @pytest.mark.asyncio + async def test_mixed_handlers_type_filtering(self, processor): + processor.config_version = 1 + + prompt_handler = AsyncMock() + schema_handler = AsyncMock() + all_handler = AsyncMock() + + processor.register_config_handler(prompt_handler, types=["prompt"]) + processor.register_config_handler(schema_handler, types=["schema"]) + processor.register_config_handler(all_handler) + + mock_config = {"prompt": {}} + with patch.object( + processor, 'fetch_config', + new_callable=AsyncMock, + return_value=(mock_config, 2) + ): + msg = Mock() + msg.value.return_value = Mock(version=2, types=["prompt"]) + + await processor.on_config_notify(msg, None, None) + + prompt_handler.assert_called_once() + schema_handler.assert_not_called() + all_handler.assert_called_once() + + @pytest.mark.asyncio + async def test_empty_types_invokes_all(self, processor): + """Empty types list (startup signal) should invoke all handlers.""" + processor.config_version = 1 + + h1 = AsyncMock() + h2 = AsyncMock() + processor.register_config_handler(h1, types=["prompt"]) + processor.register_config_handler(h2, types=["schema"]) + + mock_config = {} + with patch.object( + processor, 'fetch_config', + new_callable=AsyncMock, + return_value=(mock_config, 2) + ): + msg = Mock() + msg.value.return_value = Mock(version=2, types=[]) + + await processor.on_config_notify(msg, None, None) + + h1.assert_called_once() + h2.assert_called_once() + + @pytest.mark.asyncio + async def test_fetch_failure_handled(self, processor): + processor.config_version = 1 + + handler = AsyncMock() + processor.register_config_handler(handler) + + with patch.object( + processor, 'fetch_config', + new_callable=AsyncMock, + side_effect=RuntimeError("Connection failed") + ): + msg = Mock() + msg.value.return_value = Mock(version=2, types=["prompt"]) + + # Should not raise + await processor.on_config_notify(msg, None, None) + + handler.assert_not_called() + + +class TestFetchConfig: + + @pytest.mark.asyncio + async def test_fetch_returns_config_and_version(self, processor): + mock_resp = Mock() + mock_resp.error = None + mock_resp.config = {"prompt": {"key": "val"}} + mock_resp.version = 42 + + mock_client = AsyncMock() + mock_client.request.return_value = mock_resp + + with patch.object( + processor, '_create_config_client', return_value=mock_client + 
): + config, version = await processor.fetch_config() + + assert config == {"prompt": {"key": "val"}} + assert version == 42 + mock_client.stop.assert_called_once() + + @pytest.mark.asyncio + async def test_fetch_raises_on_error_response(self, processor): + mock_resp = Mock() + mock_resp.error = Mock(message="not found") + mock_resp.config = {} + mock_resp.version = 0 + + mock_client = AsyncMock() + mock_client.request.return_value = mock_resp + + with patch.object( + processor, '_create_config_client', return_value=mock_client + ): + with pytest.raises(RuntimeError, match="Config error"): + await processor.fetch_config() + + mock_client.stop.assert_called_once() + + @pytest.mark.asyncio + async def test_fetch_stops_client_on_exception(self, processor): + mock_client = AsyncMock() + mock_client.request.side_effect = TimeoutError("timeout") + + with patch.object( + processor, '_create_config_client', return_value=mock_client + ): + with pytest.raises(TimeoutError): + await processor.fetch_config() + + mock_client.stop.assert_called_once() + + +class TestFetchAndApplyConfig: + + @pytest.mark.asyncio + async def test_applies_config_to_all_handlers(self, processor): + h1 = AsyncMock() + h2 = AsyncMock() + processor.register_config_handler(h1, types=["prompt"]) + processor.register_config_handler(h2, types=["schema"]) + + mock_config = {"prompt": {}, "schema": {}} + with patch.object( + processor, 'fetch_config', + new_callable=AsyncMock, + return_value=(mock_config, 10) + ): + await processor.fetch_and_apply_config() + + # On startup, all handlers are invoked regardless of type + h1.assert_called_once_with(mock_config, 10) + h2.assert_called_once_with(mock_config, 10) + assert processor.config_version == 10 + + @pytest.mark.asyncio + async def test_retries_on_failure(self, processor): + call_count = 0 + mock_config = {"prompt": {}} + + async def mock_fetch(): + nonlocal call_count + call_count += 1 + if call_count < 3: + raise RuntimeError("not ready") + return mock_config, 5 + + with patch.object(processor, 'fetch_config', side_effect=mock_fetch), \ + patch('asyncio.sleep', new_callable=AsyncMock): + await processor.fetch_and_apply_config() + + assert call_count == 3 + assert processor.config_version == 5 diff --git a/tests/unit/test_base/test_flow_processor.py b/tests/unit/test_base/test_flow_processor.py index 70835e00..8672831a 100644 --- a/tests/unit/test_base/test_flow_processor.py +++ b/tests/unit/test_base/test_flow_processor.py @@ -35,7 +35,9 @@ class TestFlowProcessorSimple(IsolatedAsyncioTestCase): mock_async_init.assert_called_once() # Verify register_config_handler was called with the correct handler - mock_register_config.assert_called_once_with(processor.on_configure_flows) + mock_register_config.assert_called_once_with( + processor.on_configure_flows, types=["active-flow"] + ) # Verify FlowProcessor-specific initialization assert hasattr(processor, 'flows') diff --git a/tests/unit/test_gateway/test_config_receiver.py b/tests/unit/test_gateway/test_config_receiver.py index 803ff4c6..49dc48d8 100644 --- a/tests/unit/test_gateway/test_config_receiver.py +++ b/tests/unit/test_gateway/test_config_receiver.py @@ -5,7 +5,7 @@ Tests for Gateway Config Receiver import pytest import asyncio import json -from unittest.mock import Mock, patch, Mock, MagicMock +from unittest.mock import Mock, patch, MagicMock, AsyncMock import uuid from trustgraph.gateway.config.receiver import ConfigReceiver @@ -23,174 +23,237 @@ class TestConfigReceiver: def test_config_receiver_initialization(self): 
"""Test ConfigReceiver initialization""" mock_backend = Mock() - + config_receiver = ConfigReceiver(mock_backend) - + assert config_receiver.backend == mock_backend assert config_receiver.flow_handlers == [] assert config_receiver.flows == {} + assert config_receiver.config_version == 0 def test_add_handler(self): """Test adding flow handlers""" mock_backend = Mock() config_receiver = ConfigReceiver(mock_backend) - + handler1 = Mock() handler2 = Mock() - + config_receiver.add_handler(handler1) config_receiver.add_handler(handler2) - + assert len(config_receiver.flow_handlers) == 2 assert handler1 in config_receiver.flow_handlers assert handler2 in config_receiver.flow_handlers @pytest.mark.asyncio - async def test_on_config_with_new_flows(self): - """Test on_config method with new flows""" + async def test_on_config_notify_new_version(self): + """Test on_config_notify triggers fetch for newer version""" mock_backend = Mock() config_receiver = ConfigReceiver(mock_backend) - - # Track calls manually instead of using AsyncMock - start_flow_calls = [] - - async def mock_start_flow(*args): - start_flow_calls.append(args) - - config_receiver.start_flow = mock_start_flow - - # Create mock message with flows + config_receiver.config_version = 1 + + # Mock fetch_and_apply + fetch_calls = [] + async def mock_fetch(**kwargs): + fetch_calls.append(kwargs) + config_receiver.fetch_and_apply = mock_fetch + + # Create notify message with newer version mock_msg = Mock() - mock_msg.value.return_value = Mock( - version="1.0", - config={ - "flow": { - "flow1": '{"name": "test_flow_1", "steps": []}', - "flow2": '{"name": "test_flow_2", "steps": []}' - } - } - ) - - await config_receiver.on_config(mock_msg, None, None) - - # Verify flows were added - assert "flow1" in config_receiver.flows - assert "flow2" in config_receiver.flows - assert config_receiver.flows["flow1"] == {"name": "test_flow_1", "steps": []} - assert config_receiver.flows["flow2"] == {"name": "test_flow_2", "steps": []} - - # Verify start_flow was called for each new flow - assert len(start_flow_calls) == 2 - assert ("flow1", {"name": "test_flow_1", "steps": []}) in start_flow_calls - assert ("flow2", {"name": "test_flow_2", "steps": []}) in start_flow_calls + mock_msg.value.return_value = Mock(version=2, types=["flow"]) + + await config_receiver.on_config_notify(mock_msg, None, None) + + assert len(fetch_calls) == 1 @pytest.mark.asyncio - async def test_on_config_with_removed_flows(self): - """Test on_config method with removed flows""" + async def test_on_config_notify_old_version_ignored(self): + """Test on_config_notify ignores older versions""" mock_backend = Mock() config_receiver = ConfigReceiver(mock_backend) - - # Pre-populate with existing flows - config_receiver.flows = { - "flow1": {"name": "test_flow_1", "steps": []}, - "flow2": {"name": "test_flow_2", "steps": []} - } - - # Track calls manually instead of using AsyncMock - stop_flow_calls = [] - - async def mock_stop_flow(*args): - stop_flow_calls.append(args) - - config_receiver.stop_flow = mock_stop_flow - - # Create mock message with only flow1 (flow2 removed) + config_receiver.config_version = 5 + + fetch_calls = [] + async def mock_fetch(**kwargs): + fetch_calls.append(kwargs) + config_receiver.fetch_and_apply = mock_fetch + + # Create notify message with older version mock_msg = Mock() - mock_msg.value.return_value = Mock( - version="1.0", - config={ - "flow": { - "flow1": '{"name": "test_flow_1", "steps": []}' - } - } - ) - - await config_receiver.on_config(mock_msg, None, 
None) - - # Verify flow2 was removed - assert "flow1" in config_receiver.flows - assert "flow2" not in config_receiver.flows - - # Verify stop_flow was called for removed flow - assert len(stop_flow_calls) == 1 - assert stop_flow_calls[0] == ("flow2", {"name": "test_flow_2", "steps": []}) + mock_msg.value.return_value = Mock(version=3, types=["flow"]) + + await config_receiver.on_config_notify(mock_msg, None, None) + + assert len(fetch_calls) == 0 @pytest.mark.asyncio - async def test_on_config_with_no_flows(self): - """Test on_config method with no flows in config""" + async def test_on_config_notify_irrelevant_types_ignored(self): + """Test on_config_notify ignores types the gateway doesn't care about""" mock_backend = Mock() config_receiver = ConfigReceiver(mock_backend) - - # Mock the start_flow and stop_flow methods with async functions - async def mock_start_flow(*args): - pass - async def mock_stop_flow(*args): - pass - config_receiver.start_flow = mock_start_flow - config_receiver.stop_flow = mock_stop_flow - - # Create mock message without flows + config_receiver.config_version = 1 + + fetch_calls = [] + async def mock_fetch(**kwargs): + fetch_calls.append(kwargs) + config_receiver.fetch_and_apply = mock_fetch + + # Create notify message with non-flow type mock_msg = Mock() - mock_msg.value.return_value = Mock( - version="1.0", - config={} - ) - - await config_receiver.on_config(mock_msg, None, None) - - # Verify no flows were added - assert config_receiver.flows == {} - - # Since no flows were in the config, the flow methods shouldn't be called - # (We can't easily assert this with simple async functions, but the test - # passes if no exceptions are thrown) + mock_msg.value.return_value = Mock(version=2, types=["prompt"]) + + await config_receiver.on_config_notify(mock_msg, None, None) + + # Version should be updated but no fetch + assert len(fetch_calls) == 0 + assert config_receiver.config_version == 2 @pytest.mark.asyncio - async def test_on_config_exception_handling(self): - """Test on_config method handles exceptions gracefully""" + async def test_on_config_notify_flow_type_triggers_fetch(self): + """Test on_config_notify fetches for flow-related types""" mock_backend = Mock() config_receiver = ConfigReceiver(mock_backend) - - # Create mock message that will cause an exception + config_receiver.config_version = 1 + + fetch_calls = [] + async def mock_fetch(**kwargs): + fetch_calls.append(kwargs) + config_receiver.fetch_and_apply = mock_fetch + + for type_name in ["flow", "active-flow"]: + fetch_calls.clear() + config_receiver.config_version = 1 + + mock_msg = Mock() + mock_msg.value.return_value = Mock(version=2, types=[type_name]) + + await config_receiver.on_config_notify(mock_msg, None, None) + + assert len(fetch_calls) == 1, f"Expected fetch for type {type_name}" + + @pytest.mark.asyncio + async def test_on_config_notify_exception_handling(self): + """Test on_config_notify handles exceptions gracefully""" + mock_backend = Mock() + config_receiver = ConfigReceiver(mock_backend) + + # Create notify message that causes an exception mock_msg = Mock() mock_msg.value.side_effect = Exception("Test exception") - - # This should not raise an exception - await config_receiver.on_config(mock_msg, None, None) - - # Verify flows remain empty + + # Should not raise + await config_receiver.on_config_notify(mock_msg, None, None) + + @pytest.mark.asyncio + async def test_fetch_and_apply_with_new_flows(self): + """Test fetch_and_apply starts new flows""" + mock_backend = Mock() + 
config_receiver = ConfigReceiver(mock_backend) + + # Mock config_client + mock_resp = Mock() + mock_resp.error = None + mock_resp.version = 5 + mock_resp.config = { + "flow": { + "flow1": '{"name": "test_flow_1"}', + "flow2": '{"name": "test_flow_2"}' + } + } + + mock_client = AsyncMock() + mock_client.request.return_value = mock_resp + config_receiver.config_client = mock_client + + start_flow_calls = [] + async def mock_start_flow(id, flow): + start_flow_calls.append((id, flow)) + config_receiver.start_flow = mock_start_flow + + await config_receiver.fetch_and_apply() + + assert config_receiver.config_version == 5 + assert "flow1" in config_receiver.flows + assert "flow2" in config_receiver.flows + assert len(start_flow_calls) == 2 + + @pytest.mark.asyncio + async def test_fetch_and_apply_with_removed_flows(self): + """Test fetch_and_apply stops removed flows""" + mock_backend = Mock() + config_receiver = ConfigReceiver(mock_backend) + + # Pre-populate with existing flows + config_receiver.flows = { + "flow1": {"name": "test_flow_1"}, + "flow2": {"name": "test_flow_2"} + } + + # Config now only has flow1 + mock_resp = Mock() + mock_resp.error = None + mock_resp.version = 5 + mock_resp.config = { + "flow": { + "flow1": '{"name": "test_flow_1"}' + } + } + + mock_client = AsyncMock() + mock_client.request.return_value = mock_resp + config_receiver.config_client = mock_client + + stop_flow_calls = [] + async def mock_stop_flow(id, flow): + stop_flow_calls.append((id, flow)) + config_receiver.stop_flow = mock_stop_flow + + await config_receiver.fetch_and_apply() + + assert "flow1" in config_receiver.flows + assert "flow2" not in config_receiver.flows + assert len(stop_flow_calls) == 1 + assert stop_flow_calls[0][0] == "flow2" + + @pytest.mark.asyncio + async def test_fetch_and_apply_with_no_flows(self): + """Test fetch_and_apply with empty config""" + mock_backend = Mock() + config_receiver = ConfigReceiver(mock_backend) + + mock_resp = Mock() + mock_resp.error = None + mock_resp.version = 1 + mock_resp.config = {} + + mock_client = AsyncMock() + mock_client.request.return_value = mock_resp + config_receiver.config_client = mock_client + + await config_receiver.fetch_and_apply() + assert config_receiver.flows == {} + assert config_receiver.config_version == 1 @pytest.mark.asyncio async def test_start_flow_with_handlers(self): """Test start_flow method with multiple handlers""" mock_backend = Mock() config_receiver = ConfigReceiver(mock_backend) - - # Add mock handlers + handler1 = Mock() handler1.start_flow = Mock() handler2 = Mock() handler2.start_flow = Mock() - + config_receiver.add_handler(handler1) config_receiver.add_handler(handler2) - + flow_data = {"name": "test_flow", "steps": []} - + await config_receiver.start_flow("flow1", flow_data) - - # Verify all handlers were called + handler1.start_flow.assert_called_once_with("flow1", flow_data) handler2.start_flow.assert_called_once_with("flow1", flow_data) @@ -199,19 +262,17 @@ class TestConfigReceiver: """Test start_flow method handles handler exceptions""" mock_backend = Mock() config_receiver = ConfigReceiver(mock_backend) - - # Add mock handler that raises exception + handler = Mock() handler.start_flow = Mock(side_effect=Exception("Handler error")) - + config_receiver.add_handler(handler) - + flow_data = {"name": "test_flow", "steps": []} - - # This should not raise an exception + + # Should not raise await config_receiver.start_flow("flow1", flow_data) - - # Verify handler was called + 
handler.start_flow.assert_called_once_with("flow1", flow_data) @pytest.mark.asyncio @@ -219,21 +280,19 @@ class TestConfigReceiver: """Test stop_flow method with multiple handlers""" mock_backend = Mock() config_receiver = ConfigReceiver(mock_backend) - - # Add mock handlers + handler1 = Mock() handler1.stop_flow = Mock() handler2 = Mock() handler2.stop_flow = Mock() - + config_receiver.add_handler(handler1) config_receiver.add_handler(handler2) - + flow_data = {"name": "test_flow", "steps": []} - + await config_receiver.stop_flow("flow1", flow_data) - - # Verify all handlers were called + handler1.stop_flow.assert_called_once_with("flow1", flow_data) handler2.stop_flow.assert_called_once_with("flow1", flow_data) @@ -242,167 +301,77 @@ class TestConfigReceiver: """Test stop_flow method handles handler exceptions""" mock_backend = Mock() config_receiver = ConfigReceiver(mock_backend) - - # Add mock handler that raises exception + handler = Mock() handler.stop_flow = Mock(side_effect=Exception("Handler error")) - + config_receiver.add_handler(handler) - + flow_data = {"name": "test_flow", "steps": []} - - # This should not raise an exception + + # Should not raise await config_receiver.stop_flow("flow1", flow_data) - - # Verify handler was called + handler.stop_flow.assert_called_once_with("flow1", flow_data) - @pytest.mark.asyncio - async def test_config_loader_creates_consumer(self): - """Test config_loader method creates Pulsar consumer""" - mock_backend = Mock() - - config_receiver = ConfigReceiver(mock_backend) - # Temporarily restore the real config_loader for this test - config_receiver.config_loader = _real_config_loader.__get__(config_receiver) - - # Mock Consumer class - with patch('trustgraph.gateway.config.receiver.Consumer') as mock_consumer_class, \ - patch('uuid.uuid4') as mock_uuid: - - mock_uuid.return_value = "test-uuid" - mock_consumer = Mock() - async def mock_start(): - pass - mock_consumer.start = mock_start - mock_consumer_class.return_value = mock_consumer - - # Create a task that will complete quickly - async def quick_task(): - await config_receiver.config_loader() - - # Run the task with a timeout to prevent hanging - try: - await asyncio.wait_for(quick_task(), timeout=0.1) - except asyncio.TimeoutError: - # This is expected since the method runs indefinitely - pass - - # Verify Consumer was created with correct parameters - mock_consumer_class.assert_called_once() - call_args = mock_consumer_class.call_args - - assert call_args[1]['backend'] == mock_backend - assert call_args[1]['subscriber'] == "gateway-test-uuid" - assert call_args[1]['handler'] == config_receiver.on_config - assert call_args[1]['start_of_messages'] is True - @patch('asyncio.create_task') @pytest.mark.asyncio async def test_start_creates_config_loader_task(self, mock_create_task): """Test start method creates config loader task""" mock_backend = Mock() config_receiver = ConfigReceiver(mock_backend) - - # Mock create_task to avoid actually creating tasks with real coroutines + mock_task = Mock() mock_create_task.return_value = mock_task - + await config_receiver.start() - - # Verify task was created + mock_create_task.assert_called_once() - - # Verify the argument passed to create_task is a coroutine - call_args = mock_create_task.call_args[0] - assert len(call_args) == 1 # Should have one argument (the coroutine) @pytest.mark.asyncio - async def test_on_config_mixed_flow_operations(self): - """Test on_config with mixed add/remove operations""" + async def 
test_fetch_and_apply_mixed_flow_operations(self): + """Test fetch_and_apply with mixed add/remove operations""" mock_backend = Mock() config_receiver = ConfigReceiver(mock_backend) - - # Pre-populate with existing flows + + # Pre-populate config_receiver.flows = { - "flow1": {"name": "test_flow_1", "steps": []}, - "flow2": {"name": "test_flow_2", "steps": []} + "flow1": {"name": "test_flow_1"}, + "flow2": {"name": "test_flow_2"} } - - # Track calls manually instead of using Mock - start_flow_calls = [] - stop_flow_calls = [] - - async def mock_start_flow(*args): - start_flow_calls.append(args) - - async def mock_stop_flow(*args): - stop_flow_calls.append(args) - - # Directly assign to avoid patch.object detecting async methods - original_start_flow = config_receiver.start_flow - original_stop_flow = config_receiver.stop_flow + + # Config removes flow1, keeps flow2, adds flow3 + mock_resp = Mock() + mock_resp.error = None + mock_resp.version = 5 + mock_resp.config = { + "flow": { + "flow2": '{"name": "test_flow_2"}', + "flow3": '{"name": "test_flow_3"}' + } + } + + mock_client = AsyncMock() + mock_client.request.return_value = mock_resp + config_receiver.config_client = mock_client + + start_calls = [] + stop_calls = [] + + async def mock_start_flow(id, flow): + start_calls.append((id, flow)) + async def mock_stop_flow(id, flow): + stop_calls.append((id, flow)) + config_receiver.start_flow = mock_start_flow config_receiver.stop_flow = mock_stop_flow - - try: - - # Create mock message with flow1 removed and flow3 added - mock_msg = Mock() - mock_msg.value.return_value = Mock( - version="1.0", - config={ - "flow": { - "flow2": '{"name": "test_flow_2", "steps": []}', - "flow3": '{"name": "test_flow_3", "steps": []}' - } - } - ) - - await config_receiver.on_config(mock_msg, None, None) - - # Verify final state - assert "flow1" not in config_receiver.flows - assert "flow2" in config_receiver.flows - assert "flow3" in config_receiver.flows - - # Verify operations - assert len(start_flow_calls) == 1 - assert start_flow_calls[0] == ("flow3", {"name": "test_flow_3", "steps": []}) - assert len(stop_flow_calls) == 1 - assert stop_flow_calls[0] == ("flow1", {"name": "test_flow_1", "steps": []}) - - finally: - # Restore original methods - config_receiver.start_flow = original_start_flow - config_receiver.stop_flow = original_stop_flow - @pytest.mark.asyncio - async def test_on_config_invalid_json_flow_data(self): - """Test on_config handles invalid JSON in flow data""" - mock_backend = Mock() - config_receiver = ConfigReceiver(mock_backend) - - # Mock the start_flow method with an async function - async def mock_start_flow(*args): - pass - config_receiver.start_flow = mock_start_flow - - # Create mock message with invalid JSON - mock_msg = Mock() - mock_msg.value.return_value = Mock( - version="1.0", - config={ - "flow": { - "flow1": '{"invalid": json}', # Invalid JSON - "flow2": '{"name": "valid_flow", "steps": []}' # Valid JSON - } - } - ) - - # This should handle the exception gracefully - await config_receiver.on_config(mock_msg, None, None) - - # The entire operation should fail due to JSON parsing error - # So no flows should be added - assert config_receiver.flows == {} \ No newline at end of file + await config_receiver.fetch_and_apply() + + assert "flow1" not in config_receiver.flows + assert "flow2" in config_receiver.flows + assert "flow3" in config_receiver.flows + assert len(start_calls) == 1 + assert start_calls[0][0] == "flow3" + assert len(stop_calls) == 1 + assert stop_calls[0][0] == 
"flow1" diff --git a/tests/unit/test_pubsub/test_queue_naming.py b/tests/unit/test_pubsub/test_queue_naming.py index edd3dfca..8ab09e5a 100644 --- a/tests/unit/test_pubsub/test_queue_naming.py +++ b/tests/unit/test_pubsub/test_queue_naming.py @@ -153,7 +153,7 @@ class TestQueueDefinitions: def test_config_push(self): from trustgraph.schema.services.config import config_push_queue - assert config_push_queue == 'state:tg:config' + assert config_push_queue == 'flow:tg:config' def test_librarian_request(self): from trustgraph.schema.services.library import librarian_request_queue diff --git a/trustgraph-base/trustgraph/base/async_processor.py b/trustgraph-base/trustgraph/base/async_processor.py index 7f7dbdcd..d59bff59 100644 --- a/trustgraph-base/trustgraph/base/async_processor.py +++ b/trustgraph-base/trustgraph/base/async_processor.py @@ -1,7 +1,8 @@ # Base class for processors. Implements: -# - Pulsar client, subscribe and consume basic +# - Pub/sub client, subscribe and consume basic # - the async startup logic +# - Config notify handling with subscribe-then-fetch pattern # - Initialising metrics import asyncio @@ -12,12 +13,17 @@ import logging import os from prometheus_client import start_http_server, Info -from .. schema import ConfigPush, config_push_queue +from .. schema import ConfigPush, ConfigRequest, ConfigResponse +from .. schema import config_push_queue, config_request_queue +from .. schema import config_response_queue from .. log_level import LogLevel from . pubsub import get_pubsub, add_pubsub_args from . producer import Producer from . consumer import Consumer -from . metrics import ProcessorMetrics, ConsumerMetrics +from . subscriber import Subscriber +from . request_response_spec import RequestResponse +from . metrics import ProcessorMetrics, ConsumerMetrics, ProducerMetrics +from . metrics import SubscriberMetrics from . logging import add_logging_args, setup_logging default_config_queue = config_push_queue @@ -57,9 +63,13 @@ class AsyncProcessor: "config_push_queue", default_config_queue ) - # This records registered configuration handlers + # This records registered configuration handlers, each entry is: + # { "handler": async_fn, "types": set_or_none } self.config_handlers = [] + # Track the current config version for dedup + self.config_version = 0 + # Create a random ID for this subscription to the configuration # service config_subscriber_id = str(uuid.uuid4()) @@ -68,8 +78,7 @@ class AsyncProcessor: processor = self.id, flow = None, name = "config", ) - # Subscribe to config queue — exclusive so every processor - # gets its own copy of config pushes (broadcast pattern) + # Subscribe to config notify queue self.config_sub_task = Consumer( taskgroup = self.taskgroup, @@ -80,21 +89,93 @@ class AsyncProcessor: topic = self.config_push_queue, schema = ConfigPush, - handler = self.on_config_change, + handler = self.on_config_notify, metrics = config_consumer_metrics, - start_of_messages = True, + start_of_messages = False, consumer_type = 'exclusive', ) self.running = True - # This is called to start dynamic behaviour. 
An over-ride point for - # extra functionality + def _create_config_client(self): + """Create a short-lived config request/response client.""" + config_rr_id = str(uuid.uuid4()) + + config_req_metrics = ProducerMetrics( + processor = self.id, flow = None, name = "config-request", + ) + config_resp_metrics = SubscriberMetrics( + processor = self.id, flow = None, name = "config-response", + ) + + return RequestResponse( + backend = self.pubsub_backend, + subscription = f"{self.id}--config--{config_rr_id}", + consumer_name = self.id, + request_topic = config_request_queue, + request_schema = ConfigRequest, + request_metrics = config_req_metrics, + response_topic = config_response_queue, + response_schema = ConfigResponse, + response_metrics = config_resp_metrics, + ) + + async def fetch_config(self): + """Fetch full config from config service using a short-lived + request/response client. Returns (config, version) or raises.""" + client = self._create_config_client() + try: + await client.start() + resp = await client.request( + ConfigRequest(operation="config"), + timeout=10, + ) + if resp.error: + raise RuntimeError(f"Config error: {resp.error.message}") + return resp.config, resp.version + finally: + await client.stop() + + # This is called to start dynamic behaviour. + # Implements the subscribe-then-fetch pattern to avoid race conditions. async def start(self): + + # 1. Start the notify consumer (begins buffering incoming notifys) await self.config_sub_task.start() + # 2. Fetch current config via request/response + await self.fetch_and_apply_config() + + # 3. Any buffered notifys with version > fetched version will be + # processed by on_config_notify, which does the version check + + async def fetch_and_apply_config(self): + """Fetch full config from config service and apply to all handlers. + Retries until successful — config service may not be ready yet.""" + + while self.running: + + try: + config, version = await self.fetch_config() + + logger.info(f"Fetched config version {version}") + + self.config_version = version + + # Apply to all handlers (startup = invoke all) + for entry in self.config_handlers: + await entry["handler"](config, version) + + return + + except Exception as e: + logger.warning( + f"Config fetch failed: {e}, retrying in 2s..." + ) + await asyncio.sleep(2) + # This is called to stop all threads. 
An over-ride point for extra # functionality def stop(self): @@ -110,20 +191,66 @@ class AsyncProcessor: def pulsar_host(self): return self._pulsar_host # Register a new event handler for configuration change - def register_config_handler(self, handler): - self.config_handlers.append(handler) + def register_config_handler(self, handler, types=None): + self.config_handlers.append({ + "handler": handler, + "types": set(types) if types else None, + }) - # Called when a new configuration message push occurs - async def on_config_change(self, message, consumer, flow): + # Called when a config notify message arrives + async def on_config_notify(self, message, consumer, flow): - # Get configuration data and version number - config = message.value().config - version = message.value().version + notify_version = message.value().version + notify_types = set(message.value().types) - # Invoke message handlers - logger.info(f"Config change event: version={version}") - for ch in self.config_handlers: - await ch(config, version) + # Skip if we already have this version or newer + if notify_version <= self.config_version: + logger.debug( + f"Ignoring config notify v{notify_version}, " + f"already at v{self.config_version}" + ) + return + + # Check if any handler cares about the affected types + if notify_types: + any_interested = False + for entry in self.config_handlers: + handler_types = entry["types"] + if handler_types is None or notify_types & handler_types: + any_interested = True + break + + if not any_interested: + logger.debug( + f"Ignoring config notify v{notify_version}, " + f"no handlers for types {notify_types}" + ) + self.config_version = notify_version + return + + logger.info( + f"Config notify v{notify_version} types={list(notify_types)}, " + f"fetching config..." + ) + + # Fetch full config using short-lived client + try: + config, version = await self.fetch_config() + + self.config_version = version + + # Invoke handlers that care about the affected types + for entry in self.config_handlers: + handler_types = entry["types"] + if handler_types is None: + await entry["handler"](config, version) + elif not notify_types or notify_types & handler_types: + await entry["handler"](config, version) + + except Exception as e: + logger.error( + f"Failed to fetch config on notify: {e}", exc_info=True + ) # This is the 'main' body of the handler. It is a point to override # if needed. By default does nothing. 
Processors are implemented @@ -181,7 +308,7 @@ class AsyncProcessor: prog=ident, description=doc ) - + parser.add_argument( '--id', default=ident, @@ -271,4 +398,3 @@ class AsyncProcessor: default=8000, help=f'Pulsar host (default: 8000)', ) - diff --git a/trustgraph-base/trustgraph/base/flow_processor.py b/trustgraph-base/trustgraph/base/flow_processor.py index 1caeaec0..4579a8c2 100644 --- a/trustgraph-base/trustgraph/base/flow_processor.py +++ b/trustgraph-base/trustgraph/base/flow_processor.py @@ -26,7 +26,9 @@ class FlowProcessor(AsyncProcessor): super(FlowProcessor, self).__init__(**params) # Register configuration handler - self.register_config_handler(self.on_configure_flows) + self.register_config_handler( + self.on_configure_flows, types=["active-flow"] + ) # Initialise flow information state self.flows = {} diff --git a/trustgraph-base/trustgraph/schema/services/config.py b/trustgraph-base/trustgraph/schema/services/config.py index 36e55674..fb219bd9 100644 --- a/trustgraph-base/trustgraph/schema/services/config.py +++ b/trustgraph-base/trustgraph/schema/services/config.py @@ -58,11 +58,11 @@ class ConfigResponse: @dataclass class ConfigPush: version: int = 0 - config: dict[str, dict[str, str]] = field(default_factory=dict) + types: list[str] = field(default_factory=list) config_request_queue = queue('config', cls='request') config_response_queue = queue('config', cls='response') -config_push_queue = queue('config', cls='state') +config_push_queue = queue('config', cls='flow') ############################################################################ diff --git a/trustgraph-flow/trustgraph/agent/mcp_tool/service.py b/trustgraph-flow/trustgraph/agent/mcp_tool/service.py index 23789b96..0bc5d7e3 100755 --- a/trustgraph-flow/trustgraph/agent/mcp_tool/service.py +++ b/trustgraph-flow/trustgraph/agent/mcp_tool/service.py @@ -24,7 +24,7 @@ class Service(ToolService): **params ) - self.register_config_handler(self.on_mcp_config) + self.register_config_handler(self.on_mcp_config, types=["mcp-tool"]) self.mcp_services = {} diff --git a/trustgraph-flow/trustgraph/config/service/config.py b/trustgraph-flow/trustgraph/config/service/config.py index c0a5be1e..6c897f6b 100644 --- a/trustgraph-flow/trustgraph/config/service/config.py +++ b/trustgraph-flow/trustgraph/config/service/config.py @@ -148,18 +148,7 @@ class Configuration: async def handle_delete(self, v): - # for k in v.keys: - # if k.type not in self or k.key not in self[k.type]: - # return ConfigResponse( - # version = None, - # values = None, - # directory = None, - # config = None, - # error = Error( - # type = "key-error", - # message = f"Key error" - # ) - # ) + types = list(set(k.type for k in v.keys)) for k in v.keys: @@ -167,20 +156,22 @@ class Configuration: await self.inc_version() - await self.push() + await self.push(types=types) return ConfigResponse( ) async def handle_put(self, v): + types = list(set(k.type for k in v.values)) + for k in v.values: await self.table_store.put_config(k.type, k.key, k.value) await self.inc_version() - await self.push() + await self.push(types=types) return ConfigResponse( ) diff --git a/trustgraph-flow/trustgraph/config/service/flow.py b/trustgraph-flow/trustgraph/config/service/flow.py index ab02fa30..775c8b4e 100644 --- a/trustgraph-flow/trustgraph/config/service/flow.py +++ b/trustgraph-flow/trustgraph/config/service/flow.py @@ -126,12 +126,12 @@ class FlowConfig: await self.config.inc_version() - await self.config.push() + await self.config.push(types=["flow-blueprint"]) return 
FlowResponse( error = None, ) - + async def handle_delete_blueprint(self, msg): logger.debug(f"Flow config message: {msg}") @@ -140,7 +140,7 @@ class FlowConfig: await self.config.inc_version() - await self.config.push() + await self.config.push(types=["flow-blueprint"]) return FlowResponse( error = None, @@ -270,7 +270,7 @@ class FlowConfig: await self.config.inc_version() - await self.config.push() + await self.config.push(types=["active-flow", "flow"]) return FlowResponse( error = None, @@ -332,12 +332,12 @@ class FlowConfig: await self.config.inc_version() - await self.config.push() + await self.config.push(types=["active-flow", "flow"]) return FlowResponse( error = None, ) - + async def handle(self, msg): logger.debug(f"Handling flow message: {msg.operation}") diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index 42b256df..5c235bb2 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -167,25 +167,22 @@ class Processor(AsyncProcessor): async def start(self): - await self.push() + await self.push() # Startup poke: empty types = everything await self.config_request_consumer.start() await self.flow_request_consumer.start() - - async def push(self): - config = await self.config.get_config() + async def push(self, types=None): + version = await self.config.get_version() resp = ConfigPush( version = version, - config = config, + types = types or [], ) await self.config_push_producer.send(resp) - # Race condition, should make sure version & config sync - - logger.info(f"Pushed configuration version {await self.config.get_version()}") + logger.info(f"Pushed config poke version {version}, types={resp.types}") async def on_config_request(self, msg, consumer, flow): diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py index b8cc5f9e..3bc4e9b6 100755 --- a/trustgraph-flow/trustgraph/cores/service.py +++ b/trustgraph-flow/trustgraph/cores/service.py @@ -108,7 +108,7 @@ class Processor(AsyncProcessor): flow_config = self, ) - self.register_config_handler(self.on_knowledge_config) + self.register_config_handler(self.on_knowledge_config, types=["kg-core"]) self.flows = {} diff --git a/trustgraph-flow/trustgraph/embeddings/row_embeddings/embeddings.py b/trustgraph-flow/trustgraph/embeddings/row_embeddings/embeddings.py index 1365cb14..362bdec9 100644 --- a/trustgraph-flow/trustgraph/embeddings/row_embeddings/embeddings.py +++ b/trustgraph-flow/trustgraph/embeddings/row_embeddings/embeddings.py @@ -66,8 +66,8 @@ class Processor(CollectionConfigHandler, FlowProcessor): ) # Register config handlers - self.register_config_handler(self.on_schema_config) - self.register_config_handler(self.on_collection_config) + self.register_config_handler(self.on_schema_config, types=["schema"]) + self.register_config_handler(self.on_collection_config, types=["collection"]) # Schema storage: name -> RowSchema self.schemas: Dict[str, RowSchema] = {} diff --git a/trustgraph-flow/trustgraph/extract/kg/agent/extract.py b/trustgraph-flow/trustgraph/extract/kg/agent/extract.py index 5ce343c6..ce8d6aae 100644 --- a/trustgraph-flow/trustgraph/extract/kg/agent/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/agent/extract.py @@ -43,7 +43,7 @@ class Processor(FlowProcessor): self.template_id = template_id self.config_key = config_key - self.register_config_handler(self.on_prompt_config) + 
diff --git a/trustgraph-flow/trustgraph/extract/kg/agent/extract.py b/trustgraph-flow/trustgraph/extract/kg/agent/extract.py
index 5ce343c6..ce8d6aae 100644
--- a/trustgraph-flow/trustgraph/extract/kg/agent/extract.py
+++ b/trustgraph-flow/trustgraph/extract/kg/agent/extract.py
@@ -43,7 +43,7 @@ class Processor(FlowProcessor):
         self.template_id = template_id
         self.config_key = config_key

-        self.register_config_handler(self.on_prompt_config)
+        self.register_config_handler(self.on_prompt_config, types=["prompt"])

         self.register_specification(
             ConsumerSpec(
diff --git a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py
index 5078d817..29808cae 100644
--- a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py
+++ b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py
@@ -107,7 +107,7 @@ class Processor(FlowProcessor):
         )

         # Register config handler for ontology updates
-        self.register_config_handler(self.on_ontology_config)
+        self.register_config_handler(self.on_ontology_config, types=["ontology"])

         # Shared components (not flow-specific)
         self.ontology_loader = OntologyLoader()
diff --git a/trustgraph-flow/trustgraph/extract/kg/rows/processor.py b/trustgraph-flow/trustgraph/extract/kg/rows/processor.py
index 02aa7d78..8fd494b0 100644
--- a/trustgraph-flow/trustgraph/extract/kg/rows/processor.py
+++ b/trustgraph-flow/trustgraph/extract/kg/rows/processor.py
@@ -82,7 +82,7 @@ class Processor(FlowProcessor):
         )

         # Register config handler for schema updates
-        self.register_config_handler(self.on_schema_config)
+        self.register_config_handler(self.on_schema_config, types=["schema"])

         # Schema storage: name -> RowSchema
         self.schemas: Dict[str, RowSchema] = {}
diff --git a/trustgraph-flow/trustgraph/gateway/config/receiver.py b/trustgraph-flow/trustgraph/gateway/config/receiver.py
index d956c7c6..97f4e7eb 100755
--- a/trustgraph-flow/trustgraph/gateway/config/receiver.py
+++ b/trustgraph-flow/trustgraph/gateway/config/receiver.py
@@ -1,36 +1,27 @@
 """
-API gateway.  Offers HTTP services which are translated to interaction on the
-Pulsar bus.
+API gateway config receiver.  Subscribes to config push notifications and
+fetches the full config via request/response to manage the flow lifecycle.
 """

 module = "api-gateway"

-# FIXME: Subscribes to Pulsar unnecessarily, should only do it when there
-# are active listeners
-
-# FIXME: Connection errors in publishers / subscribers cause those threads
-# to fail and are not failed or retried
-
 import asyncio
-import argparse
-from aiohttp import web
-import logging
-import os
-import base64
 import uuid
-
-# Module logger
-logger = logging.getLogger(__name__)
+import logging
 import json

-from prometheus_client import start_http_server
-
-from ... schema import ConfigPush, config_push_queue
-from ... base import Consumer
+from ... schema import ConfigPush, ConfigRequest, ConfigResponse
+from ... schema import config_push_queue, config_request_queue
+from ... schema import config_response_queue
+from ... base import Consumer, Producer
+from ... base.subscriber import Subscriber
+from ... base.request_response_spec import RequestResponse
+from ... base.metrics import ProducerMetrics, SubscriberMetrics

 logger = logging.getLogger("config.receiver")
 logger.setLevel(logging.INFO)

+
 class ConfigReceiver:

     def __init__(self, backend):
@@ -41,34 +32,107 @@ class ConfigReceiver:

         self.flows = {}

+        self.config_version = 0
+
     def add_handler(self, h):
         self.flow_handlers.append(h)

-    async def on_config(self, msg, proc, flow):
+    async def on_config_notify(self, msg, proc, flow):

         try:

             v = msg.value()
+            notify_version = v.version
+            notify_types = set(v.types)

-            logger.info(f"Config version: {v.version}")
+            # Skip if we already have this version or newer
+            if notify_version <= self.config_version:
+                logger.debug(
+                    f"Ignoring config notify v{notify_version}, "
+                    f"already at v{self.config_version}"
+                )
+                return

-            flows = v.config.get("flow", {})
+            # Gateway cares about flow config
+            if notify_types and "flow" not in notify_types and "active-flow" not in notify_types:
+                logger.debug(
+                    f"Ignoring config notify v{notify_version}, "
+                    f"no flow types in {notify_types}"
+                )
+                self.config_version = notify_version
+                return

-            wanted = list(flows.keys())
-            current = list(self.flows.keys())
+            logger.info(
+                f"Config notify v{notify_version}, fetching config..."
+            )

-            for k in wanted:
-                if k not in current:
-                    self.flows[k] = json.loads(flows[k])
-                    await self.start_flow(k, self.flows[k])
-
-            for k in current:
-                if k not in wanted:
-                    await self.stop_flow(k, self.flows[k])
-                    del self.flows[k]
+            await self.fetch_and_apply()

         except Exception as e:
-            logger.error(f"Config processing exception: {e}", exc_info=True)
+            logger.error(
+                f"Config notify processing exception: {e}", exc_info=True
+            )
+
+    async def fetch_and_apply(self, retry=False):
+        """Fetch full config and apply flow changes.
+        If retry=True, keeps retrying until successful."""
+
+        while True:
+
+            try:
+                logger.info("Fetching config from config service...")
+
+                resp = await self.config_client.request(
+                    ConfigRequest(operation="config"),
+                    timeout=10,
+                )
+
+                logger.info("Config response received")
+
+                if resp.error:
+                    if retry:
+                        logger.warning(
+                            f"Config fetch error: {resp.error.message}, "
+                            f"retrying in 2s..."
+                        )
+                        await asyncio.sleep(2)
+                        continue
+                    logger.error(
+                        f"Config fetch error: {resp.error.message}"
+                    )
+                    return
+
+                self.config_version = resp.version
+                config = resp.config
+
+                flows = config.get("flow", {})
+
+                wanted = list(flows.keys())
+                current = list(self.flows.keys())
+
+                for k in wanted:
+                    if k not in current:
+                        self.flows[k] = json.loads(flows[k])
+                        await self.start_flow(k, self.flows[k])
+
+                for k in current:
+                    if k not in wanted:
+                        await self.stop_flow(k, self.flows[k])
+                        del self.flows[k]
+
+                return
+
+            except Exception as e:
+                if retry:
+                    logger.warning(
+                        f"Config fetch failed: {e}, retrying in 2s..."
+                    )
+                    await asyncio.sleep(2)
+                    continue
+                logger.error(
+                    f"Config fetch exception: {e}", exc_info=True
+                )
+                return

     async def start_flow(self, id, flow):
@@ -79,7 +143,9 @@ class ConfigReceiver:
             try:
                 await handler.start_flow(id, flow)
             except Exception as e:
-                logger.error(f"Config processing exception: {e}", exc_info=True)
+                logger.error(
+                    f"Config processing exception: {e}", exc_info=True
+                )

     async def stop_flow(self, id, flow):

@@ -90,32 +156,80 @@ class ConfigReceiver:
             try:
                 await handler.stop_flow(id, flow)
             except Exception as e:
-                logger.error(f"Config processing exception: {e}", exc_info=True)
+                logger.error(
+                    f"Config processing exception: {e}", exc_info=True
+                )

     async def config_loader(self):

-        async with asyncio.TaskGroup() as tg:
+        while True:

-            id = str(uuid.uuid4())
+            try:

-            self.config_cons = Consumer(
-                taskgroup = tg,
-                flow = None,
-                backend = self.backend,
-                subscriber = f"gateway-{id}",
-                topic = config_push_queue,
-                schema = ConfigPush,
-                handler = self.on_config,
-                start_of_messages = True,
-            )
+                async with asyncio.TaskGroup() as tg:

-            await self.config_cons.start()
+                    id = str(uuid.uuid4())

-            logger.debug("Waiting for config updates...")
+                    # Config request/response client
+                    config_req_metrics = ProducerMetrics(
+                        processor="api-gateway", flow=None,
+                        name="config-request",
+                    )
+                    config_resp_metrics = SubscriberMetrics(
+                        processor="api-gateway", flow=None,
+                        name="config-response",
+                    )

-            logger.info("Config consumer finished")
+                    self.config_client = RequestResponse(
+                        backend=self.backend,
+                        subscription=f"api-gateway--config--{id}",
+                        consumer_name="api-gateway",
+                        request_topic=config_request_queue,
+                        request_schema=ConfigRequest,
+                        request_metrics=config_req_metrics,
+                        response_topic=config_response_queue,
+                        response_schema=ConfigResponse,
+                        response_metrics=config_resp_metrics,
+                    )
+
+                    logger.info("Starting config request/response client...")
+                    await self.config_client.start()
+                    logger.info("Config request/response client started")
+
+                    # Subscribe to notify queue
+                    self.config_cons = Consumer(
+                        taskgroup=tg,
+                        flow=None,
+                        backend=self.backend,
+                        subscriber=f"gateway-{id}",
+                        topic=config_push_queue,
+                        schema=ConfigPush,
+                        handler=self.on_config_notify,
+                        start_of_messages=False,
+                    )
+
+                    logger.info("Starting config notify consumer...")
+                    await self.config_cons.start()
+                    logger.info("Config notify consumer started")
+
+                    # Fetch current config (subscribe-then-fetch pattern)
+                    # Retry until config service is available
+                    await self.fetch_and_apply(retry=True)
+
+                    logger.info(
+                        "Config loader initialised, waiting for notifications..."
+                    )
+
+                logger.warning("Config consumer exited, restarting...")
+
+            except Exception as e:
+                logger.error(
+                    f"Config loader exception: {e}, restarting in 4s...",
+                    exc_info=True
+                )
+
+            await asyncio.sleep(4)

     async def start(self):
-
-        asyncio.create_task(self.config_loader())
+        asyncio.create_task(self.config_loader())
diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py
index 4f8f5465..15cc97fa 100755
--- a/trustgraph-flow/trustgraph/librarian/service.py
+++ b/trustgraph-flow/trustgraph/librarian/service.py
@@ -246,7 +246,7 @@ class Processor(AsyncProcessor):
             taskgroup = self.taskgroup,
         )

-        self.register_config_handler(self.on_librarian_config)
+        self.register_config_handler(self.on_librarian_config, types=["librarian"])

         self.flows = {}
diff --git a/trustgraph-flow/trustgraph/metering/counter.py b/trustgraph-flow/trustgraph/metering/counter.py
index 7851232a..f120a812 100644
--- a/trustgraph-flow/trustgraph/metering/counter.py
+++ b/trustgraph-flow/trustgraph/metering/counter.py
@@ -40,7 +40,7 @@ class Processor(FlowProcessor):
             }
         )

-        self.register_config_handler(self.on_cost_config)
+        self.register_config_handler(self.on_cost_config, types=["token-costs"])

         self.register_specification(
             ConsumerSpec(
diff --git a/trustgraph-flow/trustgraph/prompt/template/service.py b/trustgraph-flow/trustgraph/prompt/template/service.py
index 5fc177d5..97298e13 100755
--- a/trustgraph-flow/trustgraph/prompt/template/service.py
+++ b/trustgraph-flow/trustgraph/prompt/template/service.py
@@ -65,7 +65,7 @@ class Processor(FlowProcessor):
             )
         )

-        self.register_config_handler(self.on_prompt_config)
+        self.register_config_handler(self.on_prompt_config, types=["prompt"])

         # Null configuration, should reload quickly
         self.manager = PromptManager()
diff --git a/trustgraph-flow/trustgraph/query/rows/cassandra/service.py b/trustgraph-flow/trustgraph/query/rows/cassandra/service.py
index 2337642f..f928a911 100644
--- a/trustgraph-flow/trustgraph/query/rows/cassandra/service.py
+++ b/trustgraph-flow/trustgraph/query/rows/cassandra/service.py
@@ -84,7 +84,7 @@ class Processor(FlowProcessor):
         )

         # Register config handler for schema updates
-        self.register_config_handler(self.on_schema_config)
+        self.register_config_handler(self.on_schema_config, types=["schema"])

         # Schema storage: name -> RowSchema
         self.schemas: Dict[str, RowSchema] = {}
diff --git a/trustgraph-flow/trustgraph/retrieval/nlp_query/service.py b/trustgraph-flow/trustgraph/retrieval/nlp_query/service.py
index 04dae978..b567cc7b 100644
--- a/trustgraph-flow/trustgraph/retrieval/nlp_query/service.py
+++ b/trustgraph-flow/trustgraph/retrieval/nlp_query/service.py
@@ -64,7 +64,7 @@ class Processor(FlowProcessor):
         )

         # Register config handler for schema updates
-        self.register_config_handler(self.on_schema_config)
+        self.register_config_handler(self.on_schema_config, types=["schema"])

         # Schema storage: name -> RowSchema
         self.schemas: Dict[str, RowSchema] = {}
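The receiver rewrite above is a subscribe-then-fetch loop: subscribe to pokes first, fetch the full config once, and on each newer poke that mentions a flow type, fetch again. Below is a minimal sketch of that pattern in isolation; the notifier-style subscription and fetch_full_config are illustrative stand-ins, not APIs from this codebase.

import asyncio

class SubscribeThenFetch:

    def __init__(self, fetch_full_config, wanted_types=("flow", "active-flow")):
        self.fetch_full_config = fetch_full_config   # async callable -> response
        self.wanted_types = set(wanted_types)
        self.version = 0

    async def start(self, notifier):
        # Subscribe before the initial fetch so no poke is lost in between.
        notifier.subscribe(self.on_notify)
        # Initial fetch, retried until the config service answers.
        while True:
            try:
                await self.apply(await self.fetch_full_config())
                return
            except Exception:
                await asyncio.sleep(2)

    async def on_notify(self, version, types):
        if version <= self.version:
            return                                   # stale or duplicate poke
        if types and not (self.wanted_types & set(types)):
            self.version = version                   # newer, but not for us
            return
        await self.apply(await self.fetch_full_config())

    async def apply(self, response):
        self.version = response.version
        # ... reconcile wanted flows against running flows here ...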
diff --git a/trustgraph-flow/trustgraph/retrieval/structured_diag/service.py b/trustgraph-flow/trustgraph/retrieval/structured_diag/service.py
index d69c8f17..b878bf61 100644
--- a/trustgraph-flow/trustgraph/retrieval/structured_diag/service.py
+++ b/trustgraph-flow/trustgraph/retrieval/structured_diag/service.py
@@ -70,7 +70,7 @@ class Processor(FlowProcessor):
         )

         # Register config handler for schema updates
-        self.register_config_handler(self.on_schema_config)
+        self.register_config_handler(self.on_schema_config, types=["schema"])

         # Schema storage: name -> RowSchema
         self.schemas: Dict[str, RowSchema] = {}
diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py
index e282f876..f5c12441 100755
--- a/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py
+++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py
@@ -31,7 +31,7 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
         self.vecstore = DocVectors(store_uri)

         # Register for config push notifications
-        self.register_config_handler(self.on_collection_config)
+        self.register_config_handler(self.on_collection_config, types=["collection"])

     async def store_document_embeddings(self, message):
diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py
index ea091d35..31a70f23 100644
--- a/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py
+++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py
@@ -58,7 +58,7 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
         self.last_index_name = None

         # Register for config push notifications
-        self.register_config_handler(self.on_collection_config)
+        self.register_config_handler(self.on_collection_config, types=["collection"])

     def create_index(self, index_name, dim):
diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py
index a87f2128..e5e7e705 100644
--- a/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py
+++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py
@@ -37,7 +37,7 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
         self.qdrant = QdrantClient(url=store_uri, api_key=api_key)

         # Register for config push notifications
-        self.register_config_handler(self.on_collection_config)
+        self.register_config_handler(self.on_collection_config, types=["collection"])

     async def store_document_embeddings(self, message):
diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py
index 0f27adf9..9346c948 100755
--- a/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py
+++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py
@@ -45,7 +45,7 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
         self.vecstore = EntityVectors(store_uri)

         # Register for config push notifications
-        self.register_config_handler(self.on_collection_config)
+        self.register_config_handler(self.on_collection_config, types=["collection"])

     async def store_graph_embeddings(self, message):
diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py
index d907e873..6a95a38d 100755
--- a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py
+++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py
@@ -72,7 +72,7 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
         self.last_index_name = None

         # Register for config push notifications
-        self.register_config_handler(self.on_collection_config)
+        self.register_config_handler(self.on_collection_config, types=["collection"])

     def create_index(self, index_name, dim):
diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py
index f887d487..9a7672f8 100755
--- a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py
+++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py
@@ -52,7 +52,7 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
         self.qdrant = QdrantClient(url=store_uri, api_key=api_key)

         # Register for config push notifications
-        self.register_config_handler(self.on_collection_config)
+        self.register_config_handler(self.on_collection_config, types=["collection"])

     async def store_graph_embeddings(self, message):
diff --git a/trustgraph-flow/trustgraph/storage/row_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/row_embeddings/qdrant/write.py
index 42e59012..a6ec4ff7 100644
--- a/trustgraph-flow/trustgraph/storage/row_embeddings/qdrant/write.py
+++ b/trustgraph-flow/trustgraph/storage/row_embeddings/qdrant/write.py
@@ -61,7 +61,7 @@ class Processor(CollectionConfigHandler, FlowProcessor):
         )

         # Register config handler for collection management
-        self.register_config_handler(self.on_collection_config)
+        self.register_config_handler(self.on_collection_config, types=["collection"])

         # Cache of created Qdrant collections
         self.created_collections: Set[str] = set()
diff --git a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py
index d15916b6..673cba4d 100755
--- a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py
+++ b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py
@@ -75,8 +75,8 @@ class Processor(CollectionConfigHandler, FlowProcessor):
         )

         # Register config handlers
-        self.register_config_handler(self.on_schema_config)
-        self.register_config_handler(self.on_collection_config)
+        self.register_config_handler(self.on_schema_config, types=["schema"])
+        self.register_config_handler(self.on_collection_config, types=["collection"])

         # Cache of known keyspaces and whether tables exist
         self.known_keyspaces: Set[str] = set()
diff --git a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py
index d31d6223..2a240f0b 100755
--- a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py
+++ b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py
@@ -144,7 +144,7 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
         self.tg = None

         # Register for config push notifications
-        self.register_config_handler(self.on_collection_config)
+        self.register_config_handler(self.on_collection_config, types=["collection"])

     async def store_triples(self, message):
diff --git a/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py b/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py
index ac8d05c4..86f9a6e3 100755
--- a/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py
+++ b/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py
@@ -57,7 +57,7 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
         self.io = FalkorDB.from_url(graph_url).select_graph(database)

         # Register for config push notifications
-        self.register_config_handler(self.on_collection_config)
+        self.register_config_handler(self.on_collection_config, types=["collection"])

     def create_node(self, uri, user, collection):
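All of the storage writers in this run of hunks now declare the config types they care about at registration time. The dispatch side lives in AsyncProcessor and is not part of this diff, so the following is only a plausible sketch of how declared types could be matched against a poke; the dispatcher class and the handler signature are assumptions, not the project's actual implementation.

class ConfigDispatcher:
    # Illustrative only: route pokes to handlers by declared type.

    def __init__(self):
        self.handlers = []                      # (handler, set of declared types)

    def register_config_handler(self, handler, types=None):
        self.handlers.append((handler, set(types or [])))

    async def dispatch(self, push):
        changed = set(push.types)
        for handler, wanted in self.handlers:
            # An untyped handler sees every poke; an empty poke (the startup
            # case, "empty types = everything") wakes every handler.
            if not wanted or not changed or wanted & changed:
                await handler(push)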
diff --git a/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py b/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py
index 7864ac80..16a7d3ed 100755
--- a/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py
+++ b/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py
@@ -66,7 +66,7 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
         self.create_indexes(session)

         # Register for config push notifications
-        self.register_config_handler(self.on_collection_config)
+        self.register_config_handler(self.on_collection_config, types=["collection"])

     def create_indexes(self, session):
diff --git a/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py b/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py
index 3db712fb..f7b2d947 100755
--- a/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py
+++ b/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py
@@ -66,7 +66,7 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
         self.create_indexes(session)

         # Register for config push notifications
-        self.register_config_handler(self.on_collection_config)
+        self.register_config_handler(self.on_collection_config, types=["collection"])

     def create_indexes(self, session):