diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index d056b7fc..1133a78b 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -157,9 +157,17 @@ class Processor(AsyncProcessor): logger.info("Config service initialized") async def _discover_workspaces(self): - workspaces = await self.config.table_store.get_keys( - WORKSPACES_NAMESPACE, WORKSPACE_TYPE - ) + logger.info("Discovering workspaces from Cassandra...") + try: + workspaces = await self.config.table_store.get_keys( + WORKSPACES_NAMESPACE, WORKSPACE_TYPE + ) + logger.info(f"Discovered workspaces: {workspaces}") + except Exception as e: + logger.error( + f"Workspace discovery failed: {e}", exc_info=True + ) + return for workspace_id in workspaces: if workspace_id not in self.workspace_consumers: @@ -220,6 +228,11 @@ class Processor(AsyncProcessor): await self.pubsub.ensure_topic(self.config_request_queue_base) await self.push() # Startup poke: empty types = everything await self.system_consumer.start() + + # Start the config push subscriber so we receive our own + # workspace change notifications. + await self.config_sub_task.start() + await self._discover_workspaces() async def push(self, changes=None, workspace_changes=None):