diff --git a/trustgraph-base/trustgraph/base/agent_service.py b/trustgraph-base/trustgraph/base/agent_service.py index cbb15183..b28ee04b 100644 --- a/trustgraph-base/trustgraph/base/agent_service.py +++ b/trustgraph-base/trustgraph/base/agent_service.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from argparse import ArgumentParser """ Agent manager service completion base class @@ -97,7 +100,7 @@ class AgentService(FlowProcessor): ) @staticmethod - def add_args(parser): + def add_args(parser: ArgumentParser) -> None: FlowProcessor.add_args(parser) diff --git a/trustgraph-base/trustgraph/base/async_processor.py b/trustgraph-base/trustgraph/base/async_processor.py index 4f04df16..9b9328cb 100644 --- a/trustgraph-base/trustgraph/base/async_processor.py +++ b/trustgraph-base/trustgraph/base/async_processor.py @@ -1,3 +1,7 @@ +from __future__ import annotations + +from argparse import ArgumentParser +from typing import Any, Callable # Base class for processors. Implements: # - Pub/sub client, subscribe and consume basic @@ -178,20 +182,20 @@ class AsyncProcessor: # This is called to stop all threads. An over-ride point for extra # functionality - def stop(self): + def stop(self) -> None: self.pubsub_backend.close() self.running = False # Returns the pub/sub backend (new interface) @property - def pubsub(self): return self.pubsub_backend + def pubsub(self) -> Any: return self.pubsub_backend # Returns the pulsar host (backward compatibility) @property - def pulsar_host(self): return self._pulsar_host + def pulsar_host(self) -> str: return self._pulsar_host # Register a new event handler for configuration change - def register_config_handler(self, handler, types=None): + def register_config_handler(self, handler: Callable[..., Any], types: list[type] | None = None) -> None: self.config_handlers.append({ "handler": handler, "types": set(types) if types else None, @@ -295,13 +299,13 @@ class AsyncProcessor: raise e @classmethod - def setup_logging(cls, args): + def setup_logging(cls, args: dict[str, Any]) -> None: """Configure logging for the entire application""" setup_logging(args) # Startup fabric. launch calls launch_async in async mode. @classmethod - def launch(cls, ident, doc): + def launch(cls, ident: str, doc: str) -> None: # Start assembling CLI arguments parser = argparse.ArgumentParser( @@ -374,7 +378,7 @@ class AsyncProcessor: # The command-line arguments are built using a stack of add_args # invocations @staticmethod - def add_args(parser): + def add_args(parser: ArgumentParser) -> None: add_pubsub_args(parser) add_logging_args(parser) diff --git a/trustgraph-base/trustgraph/base/consumer_spec.py b/trustgraph-base/trustgraph/base/consumer_spec.py index 0ef4672b..ae218b44 100644 --- a/trustgraph-base/trustgraph/base/consumer_spec.py +++ b/trustgraph-base/trustgraph/base/consumer_spec.py @@ -1,16 +1,19 @@ +from __future__ import annotations + +from typing import Any from . metrics import ConsumerMetrics from . consumer import Consumer from . spec import Spec class ConsumerSpec(Spec): - def __init__(self, name, schema, handler, concurrency = 1): + def __init__(self, name: str, schema: Any, handler: Any, concurrency: int = 1) -> None: self.name = name self.schema = schema self.handler = handler self.concurrency = concurrency - def add(self, flow, processor, definition): + def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None: consumer_metrics = ConsumerMetrics( processor = flow.id, flow = flow.name, name = self.name, diff --git a/trustgraph-base/trustgraph/base/document_embeddings_query_service.py b/trustgraph-base/trustgraph/base/document_embeddings_query_service.py index c7aef104..86fb19fc 100644 --- a/trustgraph-base/trustgraph/base/document_embeddings_query_service.py +++ b/trustgraph-base/trustgraph/base/document_embeddings_query_service.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from argparse import ArgumentParser """ Document embeddings query service. Input is vectors. Output is list of @@ -82,7 +85,7 @@ class DocumentEmbeddingsQueryService(FlowProcessor): await flow("response").send(r, properties={"id": id}) @staticmethod - def add_args(parser): + def add_args(parser: ArgumentParser) -> None: FlowProcessor.add_args(parser) @@ -93,7 +96,7 @@ class DocumentEmbeddingsQueryService(FlowProcessor): help=f'Number of concurrent requests (default: {default_concurrency})' ) -def run(): +def run() -> None: Processor.launch(default_ident, __doc__) diff --git a/trustgraph-base/trustgraph/base/document_embeddings_store_service.py b/trustgraph-base/trustgraph/base/document_embeddings_store_service.py index 1d33ee94..0d081ce1 100644 --- a/trustgraph-base/trustgraph/base/document_embeddings_store_service.py +++ b/trustgraph-base/trustgraph/base/document_embeddings_store_service.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from argparse import ArgumentParser """ Document embeddings store base class @@ -49,7 +52,7 @@ class DocumentEmbeddingsStoreService(FlowProcessor): raise e @staticmethod - def add_args(parser): + def add_args(parser: ArgumentParser) -> None: FlowProcessor.add_args(parser) diff --git a/trustgraph-base/trustgraph/base/dynamic_tool_service.py b/trustgraph-base/trustgraph/base/dynamic_tool_service.py index f3fda6dd..7c8e7692 100644 --- a/trustgraph-base/trustgraph/base/dynamic_tool_service.py +++ b/trustgraph-base/trustgraph/base/dynamic_tool_service.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from argparse import ArgumentParser """ Base class for dynamically pluggable tool services. @@ -173,7 +176,7 @@ class DynamicToolService(AsyncProcessor): raise NotImplementedError("Subclasses must implement invoke()") @staticmethod - def add_args(parser): + def add_args(parser: ArgumentParser) -> None: AsyncProcessor.add_args(parser) diff --git a/trustgraph-base/trustgraph/base/embeddings_service.py b/trustgraph-base/trustgraph/base/embeddings_service.py index 7ae63521..0f8f7f82 100644 --- a/trustgraph-base/trustgraph/base/embeddings_service.py +++ b/trustgraph-base/trustgraph/base/embeddings_service.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from argparse import ArgumentParser """ Embeddings resolution base class @@ -100,7 +103,7 @@ class EmbeddingsService(FlowProcessor): ) @staticmethod - def add_args(parser): + def add_args(parser: ArgumentParser) -> None: parser.add_argument( '-c', '--concurrency', @@ -112,4 +115,3 @@ class EmbeddingsService(FlowProcessor): FlowProcessor.add_args(parser) - diff --git a/trustgraph-base/trustgraph/base/flow_processor.py b/trustgraph-base/trustgraph/base/flow_processor.py index 4579a8c2..edf974b3 100644 --- a/trustgraph-base/trustgraph/base/flow_processor.py +++ b/trustgraph-base/trustgraph/base/flow_processor.py @@ -1,3 +1,7 @@ +from __future__ import annotations + +from typing import Any +from argparse import ArgumentParser # Base class for processor with management of flows in & out which are managed # by configuration. This is probably all processor types, except for the @@ -41,7 +45,7 @@ class FlowProcessor(AsyncProcessor): logger.info("Service initialised.") # Register a configuration variable - def register_specification(self, spec): + def register_specification(self, spec: Any) -> None: self.specifications.append(spec) # Start processing for a new flow @@ -99,7 +103,7 @@ class FlowProcessor(AsyncProcessor): await super(FlowProcessor, self).start() @staticmethod - def add_args(parser): + def add_args(parser: ArgumentParser) -> None: AsyncProcessor.add_args(parser) diff --git a/trustgraph-base/trustgraph/base/graph_embeddings_client.py b/trustgraph-base/trustgraph/base/graph_embeddings_client.py index fec82378..fe717bf1 100644 --- a/trustgraph-base/trustgraph/base/graph_embeddings_client.py +++ b/trustgraph-base/trustgraph/base/graph_embeddings_client.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from typing import Any import logging @@ -9,7 +12,7 @@ from .. knowledge import Uri, Literal logger = logging.getLogger(__name__) -def to_value(x): +def to_value(x: Any) -> Any: """Convert schema Term to Uri or Literal.""" if x.type == IRI: return Uri(x.iri) diff --git a/trustgraph-base/trustgraph/base/graph_embeddings_query_service.py b/trustgraph-base/trustgraph/base/graph_embeddings_query_service.py index cbbef4f2..cc96a398 100644 --- a/trustgraph-base/trustgraph/base/graph_embeddings_query_service.py +++ b/trustgraph-base/trustgraph/base/graph_embeddings_query_service.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from argparse import ArgumentParser """ Graph embeddings query service. Input is vectors. Output is list of @@ -82,7 +85,7 @@ class GraphEmbeddingsQueryService(FlowProcessor): await flow("response").send(r, properties={"id": id}) @staticmethod - def add_args(parser): + def add_args(parser: ArgumentParser) -> None: FlowProcessor.add_args(parser) @@ -93,7 +96,7 @@ class GraphEmbeddingsQueryService(FlowProcessor): help=f'Number of concurrent requests (default: {default_concurrency})' ) -def run(): +def run() -> None: Processor.launch(default_ident, __doc__) diff --git a/trustgraph-base/trustgraph/base/graph_embeddings_store_service.py b/trustgraph-base/trustgraph/base/graph_embeddings_store_service.py index 6d3fdf72..e133cf1e 100644 --- a/trustgraph-base/trustgraph/base/graph_embeddings_store_service.py +++ b/trustgraph-base/trustgraph/base/graph_embeddings_store_service.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from argparse import ArgumentParser """ Graph embeddings store base class @@ -49,7 +52,7 @@ class GraphEmbeddingsStoreService(FlowProcessor): raise e @staticmethod - def add_args(parser): + def add_args(parser: ArgumentParser) -> None: FlowProcessor.add_args(parser) diff --git a/trustgraph-base/trustgraph/base/llm_service.py b/trustgraph-base/trustgraph/base/llm_service.py index 4077c74b..9b96edf6 100644 --- a/trustgraph-base/trustgraph/base/llm_service.py +++ b/trustgraph-base/trustgraph/base/llm_service.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from argparse import ArgumentParser """ LLM text completion base class @@ -205,7 +208,7 @@ class LlmService(FlowProcessor): properties={"id": id} ) - def supports_streaming(self): + def supports_streaming(self) -> bool: """ Override in subclass to indicate streaming support. Returns False by default. @@ -221,7 +224,7 @@ class LlmService(FlowProcessor): raise NotImplementedError("Streaming not implemented for this provider") @staticmethod - def add_args(parser): + def add_args(parser: ArgumentParser) -> None: parser.add_argument( '-c', '--concurrency', diff --git a/trustgraph-base/trustgraph/base/logging.py b/trustgraph-base/trustgraph/base/logging.py index 93cd8fa5..9bf599b1 100644 --- a/trustgraph-base/trustgraph/base/logging.py +++ b/trustgraph-base/trustgraph/base/logging.py @@ -11,7 +11,9 @@ Supports dual output to console and Loki for centralized log aggregation. import contextvars import logging import logging.handlers +from argparse import ArgumentParser from queue import Queue +from typing import Any import os @@ -53,7 +55,7 @@ class _ProcessorIdFilter(logging.Filter): return True -def add_logging_args(parser): +def add_logging_args(parser: ArgumentParser) -> None: """ Add standard logging arguments to an argument parser. @@ -100,7 +102,7 @@ def add_logging_args(parser): ) -def setup_logging(args): +def setup_logging(args: dict[str, Any]) -> None: """ Configure logging from parsed command-line arguments. diff --git a/trustgraph-base/trustgraph/base/metrics.py b/trustgraph-base/trustgraph/base/metrics.py index 618db62d..f9cf3441 100644 --- a/trustgraph-base/trustgraph/base/metrics.py +++ b/trustgraph-base/trustgraph/base/metrics.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from typing import Any from prometheus_client import start_http_server, Info, Enum, Histogram from prometheus_client import Counter @@ -10,7 +13,7 @@ class ConsumerMetrics: within the flow, including state, requests, processing time, and queues. """ - def __init__(self, processor, flow, name): + def __init__(self, processor: str, flow: str, name: str) -> None: self.processor = processor self.flow = flow @@ -41,30 +44,30 @@ class ConsumerMetrics: ["processor", "flow", "name"], ) - def process(self, status): + def process(self, status: str) -> None: __class__.processing_metric.labels( processor = self.processor, flow = self.flow, name = self.name, status=status ).inc() - def rate_limit(self): + def rate_limit(self) -> None: __class__.rate_limit_metric.labels( processor = self.processor, flow = self.flow, name = self.name, ).inc() - def state(self, state): + def state(self, state: str) -> None: __class__.state_metric.labels( processor = self.processor, flow = self.flow, name = self.name, ).state(state) - def record_time(self): + def record_time(self) -> Any: return __class__.request_metric.labels( processor = self.processor, flow = self.flow, name = self.name, ).time() class ProducerMetrics: - def __init__(self, processor, flow, name): + def __init__(self, processor: str, flow: str, name: str) -> None: self.processor = processor self.flow = flow @@ -76,13 +79,13 @@ class ProducerMetrics: ["processor", "flow", "name"], ) - def inc(self): + def inc(self) -> None: __class__.producer_metric.labels( processor = self.processor, flow = self.flow, name = self.name ).inc() class ProcessorMetrics: - def __init__(self, processor): + def __init__(self, processor: str) -> None: self.processor = processor @@ -92,14 +95,14 @@ class ProcessorMetrics: ["processor"] ) - def info(self, info): + def info(self, info: dict[str, str]) -> None: __class__.processor_metric.labels( processor = self.processor ).info(info) class SubscriberMetrics: - def __init__(self, processor, flow, name): + def __init__(self, processor: str, flow: str, name: str) -> None: self.processor = processor self.flow = flow @@ -124,19 +127,18 @@ class SubscriberMetrics: ["processor", "flow", "name"], ) - def received(self): + def received(self) -> None: __class__.received_metric.labels( processor = self.processor, flow = self.flow, name = self.name, ).inc() - def state(self, state): + def state(self, state: str) -> None: __class__.state_metric.labels( processor = self.processor, flow = self.flow, name = self.name, ).state(state) - def dropped(self, state): + def dropped(self, state: str) -> None: __class__.dropped_metric.labels( processor = self.processor, flow = self.flow, name = self.name, ).inc() - diff --git a/trustgraph-base/trustgraph/base/parameter_spec.py b/trustgraph-base/trustgraph/base/parameter_spec.py index cbaf14e8..4c6dfb21 100644 --- a/trustgraph-base/trustgraph/base/parameter_spec.py +++ b/trustgraph-base/trustgraph/base/parameter_spec.py @@ -1,21 +1,23 @@ +from __future__ import annotations + +from typing import Any from . spec import Spec class Parameter: - def __init__(self, value): + def __init__(self, value: Any) -> None: self.value = value - async def start(self): + async def start(self) -> None: pass - async def stop(self): + async def stop(self) -> None: pass class ParameterSpec(Spec): - def __init__(self, name): + def __init__(self, name: str) -> None: self.name = name - def add(self, flow, processor, definition): + def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None: value = definition.get(self.name, None) flow.parameter[self.name] = Parameter(value) - diff --git a/trustgraph-base/trustgraph/base/producer_spec.py b/trustgraph-base/trustgraph/base/producer_spec.py index cf46b958..7e77ef35 100644 --- a/trustgraph-base/trustgraph/base/producer_spec.py +++ b/trustgraph-base/trustgraph/base/producer_spec.py @@ -1,14 +1,17 @@ +from __future__ import annotations + +from typing import Any from . producer import Producer from . metrics import ProducerMetrics from . spec import Spec class ProducerSpec(Spec): - def __init__(self, name, schema): + def __init__(self, name: str, schema: Any) -> None: self.name = name self.schema = schema - def add(self, flow, processor, definition): + def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None: producer_metrics = ProducerMetrics( processor = flow.id, flow = flow.name, name = self.name @@ -22,4 +25,3 @@ class ProducerSpec(Spec): ) flow.producer[self.name] = producer - diff --git a/trustgraph-base/trustgraph/base/pubsub.py b/trustgraph-base/trustgraph/base/pubsub.py index 8fe532d8..a7ae3719 100644 --- a/trustgraph-base/trustgraph/base/pubsub.py +++ b/trustgraph-base/trustgraph/base/pubsub.py @@ -1,6 +1,9 @@ +from __future__ import annotations import os import logging +from argparse import ArgumentParser +from typing import Any logger = logging.getLogger(__name__) @@ -15,7 +18,7 @@ DEFAULT_RABBITMQ_PASSWORD = os.getenv("RABBITMQ_PASSWORD", 'guest') DEFAULT_RABBITMQ_VHOST = os.getenv("RABBITMQ_VHOST", '/') -def get_pubsub(**config): +def get_pubsub(**config: Any) -> Any: """ Factory function to create a pub/sub backend based on configuration. @@ -51,7 +54,7 @@ def get_pubsub(**config): STANDALONE_PULSAR_HOST = 'pulsar://localhost:6650' -def add_pubsub_args(parser, standalone=False): +def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None: """Add pub/sub CLI arguments to an argument parser. Args: diff --git a/trustgraph-base/trustgraph/base/request_response_spec.py b/trustgraph-base/trustgraph/base/request_response_spec.py index e4c80c74..d19aae10 100644 --- a/trustgraph-base/trustgraph/base/request_response_spec.py +++ b/trustgraph-base/trustgraph/base/request_response_spec.py @@ -1,7 +1,9 @@ +from __future__ import annotations import uuid import asyncio import logging +from typing import Any from . subscriber import Subscriber from . producer import Producer @@ -115,7 +117,7 @@ class RequestResponseSpec(Spec): self.response_schema = response_schema self.impl = impl - def add(self, flow, processor, definition): + def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None: request_metrics = ProducerMetrics( processor = flow.id, flow = flow.name, name = self.request_name diff --git a/trustgraph-base/trustgraph/base/subscriber_spec.py b/trustgraph-base/trustgraph/base/subscriber_spec.py index b408366c..39b852e5 100644 --- a/trustgraph-base/trustgraph/base/subscriber_spec.py +++ b/trustgraph-base/trustgraph/base/subscriber_spec.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from typing import Any from . metrics import SubscriberMetrics from . subscriber import Subscriber @@ -5,11 +8,11 @@ from . spec import Spec class SubscriberSpec(Spec): - def __init__(self, name, schema): + def __init__(self, name: str, schema: Any) -> None: self.name = name self.schema = schema - def add(self, flow, processor, definition): + def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None: subscriber_metrics = SubscriberMetrics( processor = flow.id, flow = flow.name, name = self.name @@ -27,4 +30,3 @@ class SubscriberSpec(Spec): # Put it in the consumer map, does that work? # It means it gets start/stop call. flow.consumer[self.name] = subscriber - diff --git a/trustgraph-base/trustgraph/base/tool_service.py b/trustgraph-base/trustgraph/base/tool_service.py index f6924d52..27ffbfac 100644 --- a/trustgraph-base/trustgraph/base/tool_service.py +++ b/trustgraph-base/trustgraph/base/tool_service.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from argparse import ArgumentParser """ Tool invocation base class @@ -112,7 +115,7 @@ class ToolService(FlowProcessor): ) @staticmethod - def add_args(parser): + def add_args(parser: ArgumentParser) -> None: parser.add_argument( '-c', '--concurrency', diff --git a/trustgraph-base/trustgraph/base/triples_client.py b/trustgraph-base/trustgraph/base/triples_client.py index e661f46d..a81a5cd0 100644 --- a/trustgraph-base/trustgraph/base/triples_client.py +++ b/trustgraph-base/trustgraph/base/triples_client.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from typing import Any from . request_response_spec import RequestResponse, RequestResponseSpec from .. schema import TriplesQueryRequest, TriplesQueryResponse, Term, IRI, LITERAL, TRIPLE @@ -11,7 +14,7 @@ class Triple: self.o = o -def to_value(x): +def to_value(x: Any) -> Any: """Convert schema Term to Uri or Literal.""" if x.type == IRI: return Uri(x.iri) @@ -21,7 +24,7 @@ def to_value(x): return Literal(x.value or x.iri) -def from_value(x): +def from_value(x: Any) -> Any: """Convert Uri, Literal, string, or Term to schema Term.""" if x is None: return None diff --git a/trustgraph-base/trustgraph/base/triples_query_service.py b/trustgraph-base/trustgraph/base/triples_query_service.py index 09f36652..b72f37a3 100644 --- a/trustgraph-base/trustgraph/base/triples_query_service.py +++ b/trustgraph-base/trustgraph/base/triples_query_service.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from argparse import ArgumentParser """ Triples query service. Input is a (s, p, o) triple, some values may be @@ -108,7 +111,7 @@ class TriplesQueryService(FlowProcessor): yield [], True @staticmethod - def add_args(parser): + def add_args(parser: ArgumentParser) -> None: FlowProcessor.add_args(parser) @@ -119,7 +122,7 @@ class TriplesQueryService(FlowProcessor): help=f'Number of concurrent requests (default: {default_concurrency})' ) -def run(): +def run() -> None: Processor.launch(default_ident, __doc__) diff --git a/trustgraph-base/trustgraph/base/triples_store_service.py b/trustgraph-base/trustgraph/base/triples_store_service.py index 79036858..f09377bf 100644 --- a/trustgraph-base/trustgraph/base/triples_store_service.py +++ b/trustgraph-base/trustgraph/base/triples_store_service.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from argparse import ArgumentParser """ Triples store base class @@ -53,7 +56,7 @@ class TriplesStoreService(FlowProcessor): raise e @staticmethod - def add_args(parser): + def add_args(parser: ArgumentParser) -> None: FlowProcessor.add_args(parser)