#!/usr/bin/env python3

"""
Client utility for browsing and loading documents from the TrustGraph
public document library.

Usage:
    python library_client.py list
    python library_client.py search QUERY
    python library_client.py load-all
    python library_client.py load-doc ID
    python library_client.py load-match QUERY
"""

import json
import urllib.request
import sys
import os
import argparse

# The list/search commands only talk to the public bucket, so a missing
# TrustGraph SDK must not stop the module from importing.  The load commands
# check for it explicitly in main().
try:
    from trustgraph.api import Api
    from trustgraph.api.types import Uri, Literal, Triple
except ImportError:
    Api = Uri = Literal = Triple = None

BUCKET_URL = "https://storage.googleapis.com/trustgraph-library"
INDEX_URL = f"{BUCKET_URL}/index.json"

# Seconds to wait on any single HTTP request to the bucket; without a
# timeout a stalled connection would hang the CLI indefinitely.
FETCH_TIMEOUT = 30

default_url = os.getenv("TRUSTGRAPH_URL", "http://localhost:8088/")
default_user = "trustgraph"
default_token = os.getenv("TRUSTGRAPH_TOKEN", None)


def _fetch(url):
    """Return the raw response body for *url*, bounded by FETCH_TIMEOUT."""
    with urllib.request.urlopen(url, timeout=FETCH_TIMEOUT) as resp:
        return resp.read()


def fetch_index():
    """Download and decode the library index (INDEX_URL) as JSON."""
    return json.loads(_fetch(INDEX_URL))


def fetch_document_metadata(doc_id):
    """Download and decode ``{BUCKET_URL}/{doc_id}.json``."""
    return json.loads(_fetch(f"{BUCKET_URL}/{doc_id}.json"))


def fetch_document_content(doc_id):
    """Download ``{BUCKET_URL}/{doc_id}.epub`` and return its bytes."""
    return _fetch(f"{BUCKET_URL}/{doc_id}.epub")


def search_index(index, query):
    """Return the index entries whose title, comments or any tag contains
    *query* (case-insensitive substring match), in index order.

    Missing or null ``title`` / ``comments`` / ``tags`` fields are treated
    as empty rather than raising.
    """
    needle = query.lower()
    results = []
    for doc in index:
        title = (doc.get("title") or "").lower()
        comments = (doc.get("comments") or "").lower()
        tags = [t.lower() for t in (doc.get("tags") or [])]
        if (needle in title or needle in comments or
                any(needle in t for t in tags)):
            results.append(doc)
    return results


def print_index(index):
    """Print *index* as an ID / Title / Tags table on stdout.

    Prints nothing for an empty index.  Titles are capped at 60 columns and
    tags at the remaining terminal width (minimum 20); over-long values are
    truncated with a trailing "...".
    """
    if not index:
        return

    # Normalise once so null fields cannot break the width calculation.
    rows = [
        (
            str(doc.get("id", "")),
            doc.get("title") or "",
            ", ".join(doc.get("tags") or []),
        )
        for doc in index
    ]

    # Columns are never narrower than their own header label, otherwise the
    # rows would not line up under the header.
    id_width = max(max(len(r[0]) for r in rows), len("ID"))
    title_width = max(min(max(len(r[1]) for r in rows), 60), len("Title"))

    try:
        term_width = os.get_terminal_size().columns
    except OSError:
        # stdout is not a terminal (pipe, redirect, test capture)
        term_width = 120

    tags_width = max(term_width - id_width - title_width - 6, 20)

    header = f"{'ID':<{id_width}} {'Title':<{title_width}} {'Tags':<{tags_width}}"
    print(header)
    print("-" * len(header))

    for eid, title, tags in rows:
        if len(title) > title_width:
            title = title[:title_width - 3] + "..."
        if len(tags) > tags_width:
            tags = tags[:tags_width - 3] + "..."
        print(f"{eid:<{id_width}} {title:<{title_width}} {tags}")


def convert_value(v):
    """Convert a JSON triple value to a Uri or Literal."""
    if v["type"] == "uri":
        return Uri(v["value"])
    return Literal(v["value"])


def convert_metadata(metadata_json):
    """Convert JSON metadata triples to Triple objects."""
    return [
        Triple(
            s=convert_value(t["s"]),
            p=convert_value(t["p"]),
            o=convert_value(t["o"]),
        )
        for t in metadata_json
    ]


def load_document(api, user, doc_entry):
    """Fetch metadata and content for a document, then load into TrustGraph.

    *doc_entry* is an index entry; its ``id`` selects the files to download.
    The fields passed to ``api.add_document`` come from the first element of
    the downloaded metadata JSON.  Any failure propagates to the caller.
    """
    doc_id = doc_entry["id"]
    title = doc_entry["title"]

    print(f" [{doc_id}] {title}")

    print(f" fetching metadata...")
    doc_json = fetch_document_metadata(doc_id)
    doc = doc_json[0]

    print(f" fetching content...")
    content = fetch_document_content(doc_id)

    print(f" loading into TrustGraph ({len(content) // 1024}KB)...")
    metadata = convert_metadata(doc["metadata"])

    api.add_document(
        id=doc["id"],
        metadata=metadata,
        user=user,
        kind=doc["kind"],
        title=doc["title"],
        comments=doc["comments"],
        tags=doc["tags"],
        document=content,
    )

    print(f" done.")


def load_documents(api, user, docs):
    """Load a list of documents, continuing past individual failures.

    Returns the number of documents that failed to load so the caller can
    reflect partial failure in the process exit status.
    """
    print(f"Loading {len(docs)} document(s)...\n")
    failures = 0
    for doc in docs:
        try:
            load_document(api, user, doc)
        except Exception as e:
            # Best-effort batch: report and carry on with the next document.
            failures += 1
            print(f" FAILED: {e}", file=sys.stderr)
        print()
    print("Complete.")
    return failures


def main():
    """Parse the command line and run the selected sub-command.

    Exits with status 1 when no command is given, when a search or ID
    lookup matches nothing, when a load command is used without the
    trustgraph package installed, or when any document fails to load.
    """
    parser = argparse.ArgumentParser(
        description="Browse and load documents from the TrustGraph public document library.",
    )

    parser.add_argument(
        "-u", "--url", default=default_url,
        help=f"TrustGraph API URL (default: {default_url})",
    )
    parser.add_argument(
        "-U", "--user", default=default_user,
        help=f"User ID (default: {default_user})",
    )
    parser.add_argument(
        "-t", "--token", default=default_token,
        help="Authentication token (default: $TRUSTGRAPH_TOKEN)",
    )

    sub = parser.add_subparsers(dest="command")

    sub.add_parser("list", help="List all documents")

    search_parser = sub.add_parser("search", help="Search documents")
    search_parser.add_argument("query", help="Text to search for")

    sub.add_parser("load-all", help="Load all documents into TrustGraph")

    load_doc_parser = sub.add_parser("load-doc", help="Load a document by ID")
    load_doc_parser.add_argument("id", help="Document ID (ebook number)")

    load_match_parser = sub.add_parser(
        "load-match", help="Load all documents matching a search term",
    )
    load_match_parser.add_argument("query", help="Text to search for")

    args = parser.parse_args()

    if args.command is None:
        parser.print_help()
        sys.exit(1)

    index = fetch_index()

    if args.command in ("list", "search"):
        if args.command == "list":
            print_index(index)
        else:
            results = search_index(index, args.query)
            if results:
                print_index(results)
            else:
                print("No matches found.", file=sys.stderr)
                sys.exit(1)
        return

    # Load commands need the API
    if Api is None:
        print(
            "The trustgraph package is required for load commands.",
            file=sys.stderr,
        )
        sys.exit(1)

    api = Api(args.url, token=args.token).library()

    if args.command == "load-all":
        selected = index

    elif args.command == "load-doc":
        selected = [d for d in index if str(d.get("id")) == args.id]
        if not selected:
            print(f"No document with ID '{args.id}' found.", file=sys.stderr)
            sys.exit(1)

    else:
        # load-match: the only remaining sub-command
        selected = search_index(index, args.query)
        if not selected:
            print("No matches found.", file=sys.stderr)
            sys.exit(1)

    if load_documents(api, args.user, selected):
        sys.exit(1)


if __name__ == "__main__":
    main()
On startup, the config service sends a notify +with an empty types list (meaning "everything"). + +### Subscribe-then-Fetch Startup (No Race Condition) + +The critical ordering to avoid missing an update: + +1. **Subscribe** to the config push queue. Buffer incoming notify messages. +2. **Fetch** the full config via request/response (`operation: "config"`). + This returns the config dict and a version number. +3. **Apply** the fetched config to all registered handlers. +4. **Process** buffered notifys. For any notify with `version > fetched_version`, + re-fetch and re-apply. Discard notifys with `version <= fetched_version`. +5. **Enter steady state**. Process future notifys as they arrive. + +This is safe because: +- If an update happens before the subscription, the fetch picks it up. +- If an update happens between subscribe and fetch, it's in the buffer. +- If an update happens after the fetch, it arrives on the queue normally. +- Version comparison ensures no duplicate processing. + +### Processor API + +The current API requires processors to understand the full config dict +structure. The new API should be cleaner — processors declare which config +types they care about, and their handler is invoked only when one of those +types changes (the handler still receives the full config dict). + +#### Current API + +```python +# In processor __init__: +self.register_config_handler(self.on_configure_flows) + +# Handler receives the entire config dict: +async def on_configure_flows(self, config, version): + if "active-flow" not in config: + return + if self.id in config["active-flow"]: + flow_config = json.loads(config["active-flow"][self.id]) + # ... 
+``` + +#### New API + +```python +# In processor __init__: +self.register_config_handler( + handler=self.on_configure_flows, + types=["active-flow"], +) + +# Handler keeps the same signature and still receives the full config dict: +async def on_configure_flows(self, config, version): + # config still contains the full dict, but handler is only called + # when "active-flow" type changes (or on startup) + if "active-flow" not in config: + return + # ... +``` + +The `types` parameter is optional. If omitted, the handler is called for +every config change (backward compatible). If specified, the handler is +only invoked when the notify's `types` list intersects with the handler's +types, or on startup (empty types list = everything). + +#### Internal Registration Structure + +```python +# In AsyncProcessor: +def register_config_handler(self, handler, types=None): + self.config_handlers.append({ + "handler": handler, + "types": set(types) if types else None, # None = all types + }) +``` + +#### Notify Processing Logic + +```python +async def on_config_notify(self, message, consumer, flow): + notify_version = message.value().version + notify_types = set(message.value().types) + + # Skip if we already have this version or newer + if notify_version <= self.config_version: + return + + # Fetch full config from config service + config, version = await self.config_client.config() + self.config_version = version + + # Determine which handlers to invoke + for entry in self.config_handlers: + handler_types = entry["types"] + if handler_types is None: + # Handler cares about everything + await entry["handler"](config, version) + elif not notify_types or notify_types & handler_types: + # notify_types empty = startup (invoke all), + # or intersection with handler's types + await entry["handler"](config, version) +``` + +### Config Service Changes + +#### Push Method + +The `push()` method changes to send only version + types: + +```python +async def push(self, types=None): + version = 
await self.config.get_version() + resp = ConfigPush( + version=version, + types=types or [], + ) + await self.config_push_producer.send(resp) +``` + +#### Put/Delete Handlers + +Extract affected types and pass to push: + +```python +async def handle_put(self, v): + types = list(set(k.type for k in v.values)) + for k in v.values: + await self.table_store.put_config(k.type, k.key, k.value) + await self.inc_version() + await self.push(types=types) + +async def handle_delete(self, v): + types = list(set(k.type for k in v.keys)) + for k in v.keys: + await self.table_store.delete_key(k.type, k.key) + await self.inc_version() + await self.push(types=types) +``` + +#### Queue Class Change + +The config push queue changes from `state` class to `flow` class. The push +is now a transient signal — the source of truth is the config service's +request/response API, not the queue. `flow` class is persistent (survives +broker restarts) but doesn't require last-message retention, which was the +root cause of the RabbitMQ problem. + +```python +config_push_queue = queue('config', cls='flow') # was cls='state' +``` + +#### Startup Push + +On startup, the config service sends a notify with empty types list +(signalling "everything changed"): + +```python +async def start(self): + await self.push(types=[]) # Empty = all types + await self.config_request_consumer.start() +``` + +### AsyncProcessor Changes + +The `AsyncProcessor` needs a config request/response client alongside the +push consumer. The startup sequence becomes: + +```python +async def start(self): + # 1. Start the push consumer (begins buffering notifys) + await self.config_sub_task.start() + + # 2. Fetch current config via request/response + config, version = await self.config_client.config() + self.config_version = version + + # 3. Apply to all handlers (startup = all handlers invoked) + for entry in self.config_handlers: + await entry["handler"](config, version) + + # 4. 
Buffered notifys are now processed by on_config_notify, + # which skips versions <= self.config_version +``` + +The config client needs to be created in `__init__` using the existing +request/response queue infrastructure. The `ConfigClient` from +`trustgraph.clients.config_client` already exists but uses a synchronous +blocking pattern. An async variant or integration with the processor's +pub/sub backend is needed. + +### Existing Config Handler Types + +For reference, the config types currently used by handlers: + +| Handler | Type(s) | Used By | +|---------|---------|---------| +| `on_configure_flows` | `active-flow` | All FlowProcessor subclasses | +| `on_collection_config` | `collection` | Storage services (triples, embeddings, rows) | +| `on_prompt_config` | `prompt` | Prompt template service, agent extract | +| `on_schema_config` | `schema` | Rows storage, row embeddings, NLP query, structured diag | +| `on_cost_config` | `token-costs` | Metering service | +| `on_ontology_config` | `ontology` | Ontology extraction | +| `on_librarian_config` | `librarian` | Librarian service | +| `on_mcp_config` | `mcp-tool` | MCP tool service | +| `on_knowledge_config` | `kg-core` | Cores service | + +## Implementation Order + +1. **Update ConfigPush schema** — change `config` field to `types` field. + +2. **Update config service** — modify `push()` to send version + types. + Modify `handle_put`/`handle_delete` to extract affected types. + +3. **Add async config query to AsyncProcessor** — create a + request/response client for config queries within the processor's + event loop. + +4. **Implement subscribe-then-fetch startup** — reorder + `AsyncProcessor.start()` to subscribe first, then fetch, then + process buffered notifys with version comparison. + +5. **Update register_config_handler** — add optional `types` parameter. + Update `on_config_notify` to filter by type intersection. + +6. 
**Update existing handlers** — add `types` parameter to all + `register_config_handler` calls across the codebase. + +7. **Backward compatibility** — handlers without `types` parameter + continue to work (invoked for all changes). + +## Risks + +- **Thundering herd**: if many processors restart simultaneously, they + all hit the config service API at once. Mitigated by the config service + already being designed for request/response load, and the number of + processors being small (tens, not thousands). + +- **Config service availability**: processors now depend on the config + service being up at startup, not just having received a push. This is + already the case in practice — without config, processors can't do + anything useful. diff --git a/tests/unit/test_base/test_flow_processor.py b/tests/unit/test_base/test_flow_processor.py index 70835e00..8672831a 100644 --- a/tests/unit/test_base/test_flow_processor.py +++ b/tests/unit/test_base/test_flow_processor.py @@ -35,7 +35,9 @@ class TestFlowProcessorSimple(IsolatedAsyncioTestCase): mock_async_init.assert_called_once() # Verify register_config_handler was called with the correct handler - mock_register_config.assert_called_once_with(processor.on_configure_flows) + mock_register_config.assert_called_once_with( + processor.on_configure_flows, types=["active-flow"] + ) # Verify FlowProcessor-specific initialization assert hasattr(processor, 'flows') diff --git a/trustgraph-base/trustgraph/base/async_processor.py b/trustgraph-base/trustgraph/base/async_processor.py index 7f7dbdcd..d59bff59 100644 --- a/trustgraph-base/trustgraph/base/async_processor.py +++ b/trustgraph-base/trustgraph/base/async_processor.py @@ -1,7 +1,8 @@ # Base class for processors. 
Implements: -# - Pulsar client, subscribe and consume basic +# - Pub/sub client, subscribe and consume basic # - the async startup logic +# - Config notify handling with subscribe-then-fetch pattern # - Initialising metrics import asyncio @@ -12,12 +13,17 @@ import logging import os from prometheus_client import start_http_server, Info -from .. schema import ConfigPush, config_push_queue +from .. schema import ConfigPush, ConfigRequest, ConfigResponse +from .. schema import config_push_queue, config_request_queue +from .. schema import config_response_queue from .. log_level import LogLevel from . pubsub import get_pubsub, add_pubsub_args from . producer import Producer from . consumer import Consumer -from . metrics import ProcessorMetrics, ConsumerMetrics +from . subscriber import Subscriber +from . request_response_spec import RequestResponse +from . metrics import ProcessorMetrics, ConsumerMetrics, ProducerMetrics +from . metrics import SubscriberMetrics from . logging import add_logging_args, setup_logging default_config_queue = config_push_queue @@ -57,9 +63,13 @@ class AsyncProcessor: "config_push_queue", default_config_queue ) - # This records registered configuration handlers + # This records registered configuration handlers, each entry is: + # { "handler": async_fn, "types": set_or_none } self.config_handlers = [] + # Track the current config version for dedup + self.config_version = 0 + # Create a random ID for this subscription to the configuration # service config_subscriber_id = str(uuid.uuid4()) @@ -68,8 +78,7 @@ class AsyncProcessor: processor = self.id, flow = None, name = "config", ) - # Subscribe to config queue — exclusive so every processor - # gets its own copy of config pushes (broadcast pattern) + # Subscribe to config notify queue self.config_sub_task = Consumer( taskgroup = self.taskgroup, @@ -80,21 +89,93 @@ class AsyncProcessor: topic = self.config_push_queue, schema = ConfigPush, - handler = self.on_config_change, + handler = 
def _create_config_client(self):
    """Create a short-lived config request/response client.

    Every call generates a new UUID for the subscription name and new
    producer/subscriber metrics objects labelled with this processor's id.
    The returned client is not started; the caller starts and stops it.
    """
    rr_id = str(uuid.uuid4())

    return RequestResponse(
        backend = self.pubsub_backend,
        subscription = f"{self.id}--config--{rr_id}",
        consumer_name = self.id,
        request_topic = config_request_queue,
        request_schema = ConfigRequest,
        request_metrics = ProducerMetrics(
            processor = self.id, flow = None, name = "config-request",
        ),
        response_topic = config_response_queue,
        response_schema = ConfigResponse,
        response_metrics = SubscriberMetrics(
            processor = self.id, flow = None, name = "config-response",
        ),
    )

async def fetch_config(self):
    """Fetch the full config from the config service.

    A client from _create_config_client() is started for this one request
    and stopped in the ``finally`` clause whether or not the request
    succeeds.

    Returns:
        ``(config, version)`` taken from the response.

    Raises:
        RuntimeError: the response carried an error.  Exceptions raised by
        the client's start/request/stop calls propagate unchanged.
    """
    client = self._create_config_client()
    try:
        await client.start()
        resp = await client.request(
            ConfigRequest(operation="config"),
            timeout=10,
        )
        if resp.error:
            raise RuntimeError(f"Config error: {resp.error.message}")
        return resp.config, resp.version
    finally:
        await client.stop()

# This is called to start dynamic behaviour.
# Implements the subscribe-then-fetch pattern to avoid race conditions.
async def start(self):
    """Start the notify consumer, then fetch and apply the current config.

    The consumer is started first so that a notify published while the
    initial fetch is in flight is not missed; on_config_notify compares
    each notify's version against self.config_version.
    """

    # 1. Start the notify consumer (begins buffering incoming notifys)
    await self.config_sub_task.start()

    # 2. Fetch current config via request/response
    await self.fetch_and_apply_config()

    # 3. Any buffered notifys with version > fetched version will be
    # processed by on_config_notify, which does the version check

async def fetch_and_apply_config(self):
    """Fetch full config from config service and apply to all handlers.
    Retries until successful — config service may not be ready yet.

    Every registered handler is invoked regardless of its ``types``
    filter.  Any exception, from the fetch or from a handler, is logged as
    a warning and the whole fetch-and-apply cycle is retried after 2
    seconds for as long as self.running is true.
    """

    while self.running:

        try:
            config, version = await self.fetch_config()

            logger.info(f"Fetched config version {version}")

            self.config_version = version

            # Apply to all handlers (startup = invoke all)
            for entry in self.config_handlers:
                await entry["handler"](config, version)

            return

        except Exception as e:
            logger.warning(
                f"Config fetch failed: {e}, retrying in 2s..."
            )
            await asyncio.sleep(2)

# Register a new event handler for configuration change
def register_config_handler(self, handler, types=None):
    """Register *handler* (an async callable taking ``(config, version)``).

    *types* optionally restricts the notifys that trigger the handler to
    those naming at least one of the given config types.  ``None`` or an
    empty collection means the handler is invoked for every change.
    """
    self.config_handlers.append({
        "handler": handler,
        "types": set(types) if types else None,
    })

# Called when a config notify message arrives
async def on_config_notify(self, message, consumer, flow):
    """Handle a config notify: fetch the config and run matching handlers.

    A notify whose version is not newer than self.config_version is
    ignored.  A notify that names types none of the registered handlers
    asked for only advances self.config_version, without fetching.
    Otherwise the full config is fetched and each matching handler is
    awaited with ``(config, version)``.

    A failed fetch is logged and leaves self.config_version unchanged, so
    a later notify triggers another attempt.  A failing handler is logged
    on its own and does not prevent the remaining handlers from running.
    """

    notify = message.value()
    notify_version = notify.version
    notify_types = set(notify.types)

    def wants(entry):
        # An unfiltered handler, or an empty notify type list (meaning
        # "everything"), always matches; otherwise the sets must intersect.
        handler_types = entry["types"]
        return (
            handler_types is None
            or not notify_types
            or bool(notify_types & handler_types)
        )

    # Skip if we already have this version or newer
    if notify_version <= self.config_version:
        logger.debug(
            f"Ignoring config notify v{notify_version}, "
            f"already at v{self.config_version}"
        )
        return

    # Check if any handler cares about the affected types
    if notify_types and not any(wants(e) for e in self.config_handlers):
        logger.debug(
            f"Ignoring config notify v{notify_version}, "
            f"no handlers for types {notify_types}"
        )
        self.config_version = notify_version
        return

    logger.info(
        f"Config notify v{notify_version} types={list(notify_types)}, "
        f"fetching config..."
    )

    # Fetch full config using short-lived client.  Only the fetch is
    # guarded here so handler failures are not misreported as fetch errors.
    try:
        config, version = await self.fetch_config()
    except Exception as e:
        logger.error(
            f"Failed to fetch config on notify: {e}", exc_info=True
        )
        return

    self.config_version = version

    # Invoke handlers that care about the affected types.  Each handler is
    # isolated: the version has already advanced, so skipping the rest
    # after one failure would leave them stale until the next change.
    for entry in self.config_handlers:
        if not wants(entry):
            continue
        try:
            await entry["handler"](config, version)
        except Exception as e:
            logger.error(
                f"Config handler failed on notify v{version}: {e}",
                exc_info=True,
            )
Processors are implemented @@ -181,7 +308,7 @@ class AsyncProcessor: prog=ident, description=doc ) - + parser.add_argument( '--id', default=ident, @@ -271,4 +398,3 @@ class AsyncProcessor: default=8000, help=f'Pulsar host (default: 8000)', ) - diff --git a/trustgraph-base/trustgraph/base/flow_processor.py b/trustgraph-base/trustgraph/base/flow_processor.py index 1caeaec0..4579a8c2 100644 --- a/trustgraph-base/trustgraph/base/flow_processor.py +++ b/trustgraph-base/trustgraph/base/flow_processor.py @@ -26,7 +26,9 @@ class FlowProcessor(AsyncProcessor): super(FlowProcessor, self).__init__(**params) # Register configuration handler - self.register_config_handler(self.on_configure_flows) + self.register_config_handler( + self.on_configure_flows, types=["active-flow"] + ) # Initialise flow information state self.flows = {} diff --git a/trustgraph-base/trustgraph/schema/services/config.py b/trustgraph-base/trustgraph/schema/services/config.py index 36e55674..fb219bd9 100644 --- a/trustgraph-base/trustgraph/schema/services/config.py +++ b/trustgraph-base/trustgraph/schema/services/config.py @@ -58,11 +58,11 @@ class ConfigResponse: @dataclass class ConfigPush: version: int = 0 - config: dict[str, dict[str, str]] = field(default_factory=dict) + types: list[str] = field(default_factory=list) config_request_queue = queue('config', cls='request') config_response_queue = queue('config', cls='response') -config_push_queue = queue('config', cls='state') +config_push_queue = queue('config', cls='flow') ############################################################################ diff --git a/trustgraph-flow/trustgraph/agent/mcp_tool/service.py b/trustgraph-flow/trustgraph/agent/mcp_tool/service.py index 23789b96..0bc5d7e3 100755 --- a/trustgraph-flow/trustgraph/agent/mcp_tool/service.py +++ b/trustgraph-flow/trustgraph/agent/mcp_tool/service.py @@ -24,7 +24,7 @@ class Service(ToolService): **params ) - self.register_config_handler(self.on_mcp_config) + 
self.register_config_handler(self.on_mcp_config, types=["mcp-tool"]) self.mcp_services = {} diff --git a/trustgraph-flow/trustgraph/config/service/config.py b/trustgraph-flow/trustgraph/config/service/config.py index c0a5be1e..6c897f6b 100644 --- a/trustgraph-flow/trustgraph/config/service/config.py +++ b/trustgraph-flow/trustgraph/config/service/config.py @@ -148,18 +148,7 @@ class Configuration: async def handle_delete(self, v): - # for k in v.keys: - # if k.type not in self or k.key not in self[k.type]: - # return ConfigResponse( - # version = None, - # values = None, - # directory = None, - # config = None, - # error = Error( - # type = "key-error", - # message = f"Key error" - # ) - # ) + types = list(set(k.type for k in v.keys)) for k in v.keys: @@ -167,20 +156,22 @@ class Configuration: await self.inc_version() - await self.push() + await self.push(types=types) return ConfigResponse( ) async def handle_put(self, v): + types = list(set(k.type for k in v.values)) + for k in v.values: await self.table_store.put_config(k.type, k.key, k.value) await self.inc_version() - await self.push() + await self.push(types=types) return ConfigResponse( ) diff --git a/trustgraph-flow/trustgraph/config/service/flow.py b/trustgraph-flow/trustgraph/config/service/flow.py index ab02fa30..775c8b4e 100644 --- a/trustgraph-flow/trustgraph/config/service/flow.py +++ b/trustgraph-flow/trustgraph/config/service/flow.py @@ -126,12 +126,12 @@ class FlowConfig: await self.config.inc_version() - await self.config.push() + await self.config.push(types=["flow-blueprint"]) return FlowResponse( error = None, ) - + async def handle_delete_blueprint(self, msg): logger.debug(f"Flow config message: {msg}") @@ -140,7 +140,7 @@ class FlowConfig: await self.config.inc_version() - await self.config.push() + await self.config.push(types=["flow-blueprint"]) return FlowResponse( error = None, @@ -270,7 +270,7 @@ class FlowConfig: await self.config.inc_version() - await self.config.push() + await 
self.config.push(types=["active-flow", "flow"]) return FlowResponse( error = None, @@ -332,12 +332,12 @@ class FlowConfig: await self.config.inc_version() - await self.config.push() + await self.config.push(types=["active-flow", "flow"]) return FlowResponse( error = None, ) - + async def handle(self, msg): logger.debug(f"Handling flow message: {msg.operation}") diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index 42b256df..5c235bb2 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -167,25 +167,22 @@ class Processor(AsyncProcessor): async def start(self): - await self.push() + await self.push() # Startup poke: empty types = everything await self.config_request_consumer.start() await self.flow_request_consumer.start() - - async def push(self): - config = await self.config.get_config() + async def push(self, types=None): + version = await self.config.get_version() resp = ConfigPush( version = version, - config = config, + types = types or [], ) await self.config_push_producer.send(resp) - # Race condition, should make sure version & config sync - - logger.info(f"Pushed configuration version {await self.config.get_version()}") + logger.info(f"Pushed config poke version {version}, types={resp.types}") async def on_config_request(self, msg, consumer, flow): diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py index b8cc5f9e..3bc4e9b6 100755 --- a/trustgraph-flow/trustgraph/cores/service.py +++ b/trustgraph-flow/trustgraph/cores/service.py @@ -108,7 +108,7 @@ class Processor(AsyncProcessor): flow_config = self, ) - self.register_config_handler(self.on_knowledge_config) + self.register_config_handler(self.on_knowledge_config, types=["kg-core"]) self.flows = {} diff --git a/trustgraph-flow/trustgraph/embeddings/row_embeddings/embeddings.py 
b/trustgraph-flow/trustgraph/embeddings/row_embeddings/embeddings.py index 1365cb14..362bdec9 100644 --- a/trustgraph-flow/trustgraph/embeddings/row_embeddings/embeddings.py +++ b/trustgraph-flow/trustgraph/embeddings/row_embeddings/embeddings.py @@ -66,8 +66,8 @@ class Processor(CollectionConfigHandler, FlowProcessor): ) # Register config handlers - self.register_config_handler(self.on_schema_config) - self.register_config_handler(self.on_collection_config) + self.register_config_handler(self.on_schema_config, types=["schema"]) + self.register_config_handler(self.on_collection_config, types=["collection"]) # Schema storage: name -> RowSchema self.schemas: Dict[str, RowSchema] = {} diff --git a/trustgraph-flow/trustgraph/extract/kg/agent/extract.py b/trustgraph-flow/trustgraph/extract/kg/agent/extract.py index 5ce343c6..ce8d6aae 100644 --- a/trustgraph-flow/trustgraph/extract/kg/agent/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/agent/extract.py @@ -43,7 +43,7 @@ class Processor(FlowProcessor): self.template_id = template_id self.config_key = config_key - self.register_config_handler(self.on_prompt_config) + self.register_config_handler(self.on_prompt_config, types=["prompt"]) self.register_specification( ConsumerSpec( diff --git a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py index 5078d817..29808cae 100644 --- a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py @@ -107,7 +107,7 @@ class Processor(FlowProcessor): ) # Register config handler for ontology updates - self.register_config_handler(self.on_ontology_config) + self.register_config_handler(self.on_ontology_config, types=["ontology"]) # Shared components (not flow-specific) self.ontology_loader = OntologyLoader() diff --git a/trustgraph-flow/trustgraph/extract/kg/rows/processor.py b/trustgraph-flow/trustgraph/extract/kg/rows/processor.py index 
02aa7d78..8fd494b0 100644 --- a/trustgraph-flow/trustgraph/extract/kg/rows/processor.py +++ b/trustgraph-flow/trustgraph/extract/kg/rows/processor.py @@ -82,7 +82,7 @@ class Processor(FlowProcessor): ) # Register config handler for schema updates - self.register_config_handler(self.on_schema_config) + self.register_config_handler(self.on_schema_config, types=["schema"]) # Schema storage: name -> RowSchema self.schemas: Dict[str, RowSchema] = {} diff --git a/trustgraph-flow/trustgraph/gateway/config/receiver.py b/trustgraph-flow/trustgraph/gateway/config/receiver.py index d956c7c6..97f4e7eb 100755 --- a/trustgraph-flow/trustgraph/gateway/config/receiver.py +++ b/trustgraph-flow/trustgraph/gateway/config/receiver.py @@ -1,36 +1,27 @@ """ -API gateway. Offers HTTP services which are translated to interaction on the -Pulsar bus. +API gateway config receiver. Subscribes to config notify notifications and +fetches full config via request/response to manage flow lifecycle. """ module = "api-gateway" -# FIXME: Subscribes to Pulsar unnecessarily, should only do it when there -# are active listeners - -# FIXME: Connection errors in publishers / subscribers cause those threads -# to fail and are not failed or retried - import asyncio -import argparse -from aiohttp import web -import logging -import os -import base64 import uuid - -# Module logger -logger = logging.getLogger(__name__) +import logging import json -from prometheus_client import start_http_server - -from ... schema import ConfigPush, config_push_queue -from ... base import Consumer +from ... schema import ConfigPush, ConfigRequest, ConfigResponse +from ... schema import config_push_queue, config_request_queue +from ... schema import config_response_queue +from ... base import Consumer, Producer +from ... base.subscriber import Subscriber +from ... base.request_response_spec import RequestResponse +from ... 
base.metrics import ProducerMetrics, SubscriberMetrics logger = logging.getLogger("config.receiver") logger.setLevel(logging.INFO) + class ConfigReceiver: def __init__(self, backend): @@ -41,34 +32,107 @@ class ConfigReceiver: self.flows = {} + self.config_version = 0 + def add_handler(self, h): self.flow_handlers.append(h) - async def on_config(self, msg, proc, flow): + async def on_config_notify(self, msg, proc, flow): try: v = msg.value() + notify_version = v.version + notify_types = set(v.types) - logger.info(f"Config version: {v.version}") + # Skip if we already have this version or newer + if notify_version <= self.config_version: + logger.debug( + f"Ignoring config notify v{notify_version}, " + f"already at v{self.config_version}" + ) + return - flows = v.config.get("flow", {}) + # Gateway cares about flow config + if notify_types and "flow" not in notify_types and "active-flow" not in notify_types: + logger.debug( + f"Ignoring config notify v{notify_version}, " + f"no flow types in {notify_types}" + ) + self.config_version = notify_version + return - wanted = list(flows.keys()) - current = list(self.flows.keys()) + logger.info( + f"Config notify v{notify_version}, fetching config..." + ) - for k in wanted: - if k not in current: - self.flows[k] = json.loads(flows[k]) - await self.start_flow(k, self.flows[k]) - - for k in current: - if k not in wanted: - await self.stop_flow(k, self.flows[k]) - del self.flows[k] + await self.fetch_and_apply() except Exception as e: - logger.error(f"Config processing exception: {e}", exc_info=True) + logger.error( + f"Config notify processing exception: {e}", exc_info=True + ) + + async def fetch_and_apply(self, retry=False): + """Fetch full config and apply flow changes. 
+ If retry=True, keeps retrying until successful.""" + + while True: + + try: + logger.info("Fetching config from config service...") + + resp = await self.config_client.request( + ConfigRequest(operation="config"), + timeout=10, + ) + + logger.info(f"Config response received") + + if resp.error: + if retry: + logger.warning( + f"Config fetch error: {resp.error.message}, " + f"retrying in 2s..." + ) + await asyncio.sleep(2) + continue + logger.error( + f"Config fetch error: {resp.error.message}" + ) + return + + self.config_version = resp.version + config = resp.config + + flows = config.get("flow", {}) + + wanted = list(flows.keys()) + current = list(self.flows.keys()) + + for k in wanted: + if k not in current: + self.flows[k] = json.loads(flows[k]) + await self.start_flow(k, self.flows[k]) + + for k in current: + if k not in wanted: + await self.stop_flow(k, self.flows[k]) + del self.flows[k] + + return + + except Exception as e: + if retry: + logger.warning( + f"Config fetch failed: {e}, retrying in 2s..." 
+ ) + await asyncio.sleep(2) + continue + logger.error( + f"Config fetch exception: {e}", exc_info=True + ) + return async def start_flow(self, id, flow): @@ -79,7 +143,9 @@ class ConfigReceiver: try: await handler.start_flow(id, flow) except Exception as e: - logger.error(f"Config processing exception: {e}", exc_info=True) + logger.error( + f"Config processing exception: {e}", exc_info=True + ) async def stop_flow(self, id, flow): @@ -90,32 +156,80 @@ class ConfigReceiver: try: await handler.stop_flow(id, flow) except Exception as e: - logger.error(f"Config processing exception: {e}", exc_info=True) + logger.error( + f"Config processing exception: {e}", exc_info=True + ) async def config_loader(self): - async with asyncio.TaskGroup() as tg: + while True: - id = str(uuid.uuid4()) + try: - self.config_cons = Consumer( - taskgroup = tg, - flow = None, - backend = self.backend, - subscriber = f"gateway-{id}", - topic = config_push_queue, - schema = ConfigPush, - handler = self.on_config, - start_of_messages = True, - ) + async with asyncio.TaskGroup() as tg: - await self.config_cons.start() + id = str(uuid.uuid4()) - logger.debug("Waiting for config updates...") + # Config request/response client + config_req_metrics = ProducerMetrics( + processor="api-gateway", flow=None, + name="config-request", + ) + config_resp_metrics = SubscriberMetrics( + processor="api-gateway", flow=None, + name="config-response", + ) - logger.info("Config consumer finished") + self.config_client = RequestResponse( + backend=self.backend, + subscription=f"api-gateway--config--{id}", + consumer_name="api-gateway", + request_topic=config_request_queue, + request_schema=ConfigRequest, + request_metrics=config_req_metrics, + response_topic=config_response_queue, + response_schema=ConfigResponse, + response_metrics=config_resp_metrics, + ) + + logger.info("Starting config request/response client...") + await self.config_client.start() + logger.info("Config request/response client started") + + # 
Subscribe to notify queue + self.config_cons = Consumer( + taskgroup=tg, + flow=None, + backend=self.backend, + subscriber=f"gateway-{id}", + topic=config_push_queue, + schema=ConfigPush, + handler=self.on_config_notify, + start_of_messages=False, + ) + + logger.info("Starting config notify consumer...") + await self.config_cons.start() + logger.info("Config notify consumer started") + + # Fetch current config (subscribe-then-fetch pattern) + # Retry until config service is available + await self.fetch_and_apply(retry=True) + + logger.info( + "Config loader initialised, waiting for notifys..." + ) + + logger.warning("Config consumer exited, restarting...") + + except Exception as e: + logger.error( + f"Config loader exception: {e}, restarting in 4s...", + exc_info=True + ) + + await asyncio.sleep(4) async def start(self): - - asyncio.create_task(self.config_loader()) + asyncio.create_task(self.config_loader()) diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index 4f8f5465..15cc97fa 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -246,7 +246,7 @@ class Processor(AsyncProcessor): taskgroup = self.taskgroup, ) - self.register_config_handler(self.on_librarian_config) + self.register_config_handler(self.on_librarian_config, types=["librarian"]) self.flows = {} diff --git a/trustgraph-flow/trustgraph/metering/counter.py b/trustgraph-flow/trustgraph/metering/counter.py index 7851232a..f120a812 100644 --- a/trustgraph-flow/trustgraph/metering/counter.py +++ b/trustgraph-flow/trustgraph/metering/counter.py @@ -40,7 +40,7 @@ class Processor(FlowProcessor): } ) - self.register_config_handler(self.on_cost_config) + self.register_config_handler(self.on_cost_config, types=["token-costs"]) self.register_specification( ConsumerSpec( diff --git a/trustgraph-flow/trustgraph/prompt/template/service.py 
b/trustgraph-flow/trustgraph/prompt/template/service.py index 5fc177d5..97298e13 100755 --- a/trustgraph-flow/trustgraph/prompt/template/service.py +++ b/trustgraph-flow/trustgraph/prompt/template/service.py @@ -65,7 +65,7 @@ class Processor(FlowProcessor): ) ) - self.register_config_handler(self.on_prompt_config) + self.register_config_handler(self.on_prompt_config, types=["prompt"]) # Null configuration, should reload quickly self.manager = PromptManager() diff --git a/trustgraph-flow/trustgraph/query/rows/cassandra/service.py b/trustgraph-flow/trustgraph/query/rows/cassandra/service.py index 2337642f..f928a911 100644 --- a/trustgraph-flow/trustgraph/query/rows/cassandra/service.py +++ b/trustgraph-flow/trustgraph/query/rows/cassandra/service.py @@ -84,7 +84,7 @@ class Processor(FlowProcessor): ) # Register config handler for schema updates - self.register_config_handler(self.on_schema_config) + self.register_config_handler(self.on_schema_config, types=["schema"]) # Schema storage: name -> RowSchema self.schemas: Dict[str, RowSchema] = {} diff --git a/trustgraph-flow/trustgraph/retrieval/nlp_query/service.py b/trustgraph-flow/trustgraph/retrieval/nlp_query/service.py index 04dae978..b567cc7b 100644 --- a/trustgraph-flow/trustgraph/retrieval/nlp_query/service.py +++ b/trustgraph-flow/trustgraph/retrieval/nlp_query/service.py @@ -64,7 +64,7 @@ class Processor(FlowProcessor): ) # Register config handler for schema updates - self.register_config_handler(self.on_schema_config) + self.register_config_handler(self.on_schema_config, types=["schema"]) # Schema storage: name -> RowSchema self.schemas: Dict[str, RowSchema] = {} diff --git a/trustgraph-flow/trustgraph/retrieval/structured_diag/service.py b/trustgraph-flow/trustgraph/retrieval/structured_diag/service.py index d69c8f17..b878bf61 100644 --- a/trustgraph-flow/trustgraph/retrieval/structured_diag/service.py +++ b/trustgraph-flow/trustgraph/retrieval/structured_diag/service.py @@ -70,7 +70,7 @@ class 
Processor(FlowProcessor): ) # Register config handler for schema updates - self.register_config_handler(self.on_schema_config) + self.register_config_handler(self.on_schema_config, types=["schema"]) # Schema storage: name -> RowSchema self.schemas: Dict[str, RowSchema] = {} diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py index e282f876..f5c12441 100755 --- a/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py +++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py @@ -31,7 +31,7 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService): self.vecstore = DocVectors(store_uri) # Register for config push notifications - self.register_config_handler(self.on_collection_config) + self.register_config_handler(self.on_collection_config, types=["collection"]) async def store_document_embeddings(self, message): diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py index ea091d35..31a70f23 100644 --- a/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py +++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py @@ -58,7 +58,7 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService): self.last_index_name = None # Register for config push notifications - self.register_config_handler(self.on_collection_config) + self.register_config_handler(self.on_collection_config, types=["collection"]) def create_index(self, index_name, dim): diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py index a87f2128..e5e7e705 100644 --- a/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py +++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py @@ -37,7 +37,7 @@ class 
Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService): self.qdrant = QdrantClient(url=store_uri, api_key=api_key) # Register for config push notifications - self.register_config_handler(self.on_collection_config) + self.register_config_handler(self.on_collection_config, types=["collection"]) async def store_document_embeddings(self, message): diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py index 0f27adf9..9346c948 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py @@ -45,7 +45,7 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService): self.vecstore = EntityVectors(store_uri) # Register for config push notifications - self.register_config_handler(self.on_collection_config) + self.register_config_handler(self.on_collection_config, types=["collection"]) async def store_graph_embeddings(self, message): diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py index d907e873..6a95a38d 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py @@ -72,7 +72,7 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService): self.last_index_name = None # Register for config push notifications - self.register_config_handler(self.on_collection_config) + self.register_config_handler(self.on_collection_config, types=["collection"]) def create_index(self, index_name, dim): diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py index f887d487..9a7672f8 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py +++ 
b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py @@ -52,7 +52,7 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService): self.qdrant = QdrantClient(url=store_uri, api_key=api_key) # Register for config push notifications - self.register_config_handler(self.on_collection_config) + self.register_config_handler(self.on_collection_config, types=["collection"]) async def store_graph_embeddings(self, message): diff --git a/trustgraph-flow/trustgraph/storage/row_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/row_embeddings/qdrant/write.py index 42e59012..a6ec4ff7 100644 --- a/trustgraph-flow/trustgraph/storage/row_embeddings/qdrant/write.py +++ b/trustgraph-flow/trustgraph/storage/row_embeddings/qdrant/write.py @@ -61,7 +61,7 @@ class Processor(CollectionConfigHandler, FlowProcessor): ) # Register config handler for collection management - self.register_config_handler(self.on_collection_config) + self.register_config_handler(self.on_collection_config, types=["collection"]) # Cache of created Qdrant collections self.created_collections: Set[str] = set() diff --git a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py index d15916b6..673cba4d 100755 --- a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py @@ -75,8 +75,8 @@ class Processor(CollectionConfigHandler, FlowProcessor): ) # Register config handlers - self.register_config_handler(self.on_schema_config) - self.register_config_handler(self.on_collection_config) + self.register_config_handler(self.on_schema_config, types=["schema"]) + self.register_config_handler(self.on_collection_config, types=["collection"]) # Cache of known keyspaces and whether tables exist self.known_keyspaces: Set[str] = set() diff --git a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py 
b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py index d31d6223..2a240f0b 100755 --- a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py @@ -144,7 +144,7 @@ class Processor(CollectionConfigHandler, TriplesStoreService): self.tg = None # Register for config push notifications - self.register_config_handler(self.on_collection_config) + self.register_config_handler(self.on_collection_config, types=["collection"]) async def store_triples(self, message): diff --git a/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py b/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py index ac8d05c4..86f9a6e3 100755 --- a/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py @@ -57,7 +57,7 @@ class Processor(CollectionConfigHandler, TriplesStoreService): self.io = FalkorDB.from_url(graph_url).select_graph(database) # Register for config push notifications - self.register_config_handler(self.on_collection_config) + self.register_config_handler(self.on_collection_config, types=["collection"]) def create_node(self, uri, user, collection): diff --git a/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py b/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py index 7864ac80..16a7d3ed 100755 --- a/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py @@ -66,7 +66,7 @@ class Processor(CollectionConfigHandler, TriplesStoreService): self.create_indexes(session) # Register for config push notifications - self.register_config_handler(self.on_collection_config) + self.register_config_handler(self.on_collection_config, types=["collection"]) def create_indexes(self, session): diff --git a/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py b/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py index 
3db712fb..f7b2d947 100755 --- a/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py @@ -66,7 +66,7 @@ class Processor(CollectionConfigHandler, TriplesStoreService): self.create_indexes(session) # Register for config push notifications - self.register_config_handler(self.on_collection_config) + self.register_config_handler(self.on_collection_config, types=["collection"]) def create_indexes(self, session):