trustgraph/trustgraph-base/trustgraph/base/graph_embeddings_store_service.py
RaccoonLabs b881fb528e
feat: add type hints to all public functions in trustgraph/base (#803)
feat: add type hints to all public functions in trustgraph/base

Add type annotations to 23 modules covering:
- Metrics classes (ConsumerMetrics, ProducerMetrics, etc.)
- Spec classes (ConsumerSpec, ProducerSpec, SubscriberSpec, etc.)
- Service classes with add_args() and run() methods
- Utility functions (logging, pubsub, clients)
- AsyncProcessor methods

All 93 public functions now fully typed.

Refs #785

* refactor: deduplicate imports and move __future__ after docstrings

Addresses review feedback on PR #803:
- Remove duplicate 'from argparse import ArgumentParser' across 12 files
- Move 'from __future__ import annotations' to line 1 in all files
- Clean up excessive blank lines
2026-04-16 09:59:04 +01:00

58 lines
1.2 KiB
Python

from __future__ import annotations
from argparse import ArgumentParser
"""
Graph embeddings store base class
"""
import logging
from .. schema import GraphEmbeddings
from .. base import FlowProcessor, ConsumerSpec
from .. exceptions import TooManyRequests
# Module logger
logger = logging.getLogger(__name__)
default_ident = "graph-embeddings-write"
class GraphEmbeddingsStoreService(FlowProcessor):
def __init__(self, **params):
id = params.get("id")
super(GraphEmbeddingsStoreService, self).__init__(
**params | { "id": id }
)
self.register_specification(
ConsumerSpec(
name = "input",
schema = GraphEmbeddings,
handler = self.on_message
)
)
async def on_message(self, msg, consumer, flow):
try:
request = msg.value()
await self.store_graph_embeddings(request)
except TooManyRequests as e:
raise e
except Exception as e:
logger.error(f"Exception in graph embeddings store service: {e}", exc_info=True)
raise e
@staticmethod
def add_args(parser: ArgumentParser) -> None:
FlowProcessor.add_args(parser)