Compare commits

...

6 commits

Author SHA1 Message Date
Lennard Geißler
645b6a66fd
fix: replace deprecated asyncio.iscoroutinefunction with inspect.iscoroutinefunction (#819)
asyncio.iscoroutinefunction is deprecated since Python 3.14 and slated for
removal in 3.16. The inspect equivalent has an identical signature and return
semantics. Replaces 8 call sites across 3 modules to silence DeprecationWarnings
reported in #818.
2026-04-16 10:57:39 +01:00
Trevin Chow
ef8bb3aed4
fix: replace deprecated datetime.utcnow() with timezone-aware datetime.now(timezone.utc) (#816)
Python 3.14 deprecates datetime.utcnow(). Replace all 9 occurrences with
datetime.now(timezone.utc) and normalize the output to preserve the existing
ISO-8601 "Z"-suffixed format so downstream parsers are unaffected.

Fixes #814
2026-04-16 10:16:11 +01:00
cybermaggedon
95e4839da7
Fix docstring breakage in type hint change (#817) 2026-04-16 10:10:58 +01:00
RaccoonLabs
706e62b7c2 feat: add type hints to all public functions in trustgraph/base (#803)
feat: add type hints to all public functions in trustgraph/base

Add type annotations to 23 modules covering:
- Metrics classes (ConsumerMetrics, ProducerMetrics, etc.)
- Spec classes (ConsumerSpec, ProducerSpec, SubscriberSpec, etc.)
- Service classes with add_args() and run() methods
- Utility functions (logging, pubsub, clients)
- AsyncProcessor methods

All 93 public functions now fully typed.

Refs #785

* refactor: deduplicate imports and move __future__ after docstrings

Addresses review feedback on PR #803:
- Remove duplicate 'from argparse import ArgumentParser' across 12 files
- Move 'from __future__ import annotations' to line 1 in all files
- Clean up excessive blank lines
2026-04-16 10:08:19 +01:00
cybermaggedon
22096e07e2
Fix tests broken by the recent RabbitMQ/Cassandra async fixes (#815)
- Fix invalid key in config causing rogue warning
- Fix asyncio test tags
2026-04-16 10:00:18 +01:00
Alex Jenkins
fdb52a6bfc Add docstrings to public classes (#812)
Add class-level docstrings to five public classes in trustgraph-base:
Flow, LlmService, ConsumerMetrics, ToolClient, and TriplesStoreService.
Each docstring summarises the class's role in the system to aid
discoverability for new contributors.

Signed-off-by: Jenkins, Kenneth Alexander <kjenkins60@gatech.edu>
2026-04-16 09:07:08 +01:00
43 changed files with 270 additions and 138 deletions

View file

@ -18,6 +18,20 @@ from trustgraph.schema import ExtractedObject, Metadata, RowSchema, Field
class TestRowsCassandraIntegration:
"""Integration tests for Cassandra row storage with unified table"""
@pytest.fixture(autouse=True)
def patch_async_execute(self):
"""Route async_execute through session.execute so the mock's
side_effect handles all CQL (DDL and DML) uniformly and every
call lands in mock_session.execute.call_args_list."""
async def _fake(session, query, params=None):
session.execute(query, params)
return []
with patch(
'trustgraph.storage.rows.cassandra.write.async_execute',
new=_fake,
):
yield
@pytest.fixture
def mock_cassandra_session(self):
"""Mock Cassandra session for integration tests"""

View file

@ -1,6 +1,5 @@
[pytest]
testpaths = tests
python_paths = .
python_files = test_*.py
python_classes = Test*
python_functions = test_*
@ -8,7 +7,7 @@ addopts =
-v
--tb=short
--strict-markers
--disable-warnings
# --disable-warnings
# --cov-fail-under=80
asyncio_mode = auto
markers =

View file

@ -9,6 +9,7 @@ tool usage patterns.
import pytest
from unittest.mock import Mock, AsyncMock
import asyncio
import inspect
from collections import defaultdict
@ -133,7 +134,7 @@ class TestToolCoordinationLogic:
resolved_params[key] = value
# Execute tool
if asyncio.iscoroutinefunction(tool_function):
if inspect.iscoroutinefunction(tool_function):
result = await tool_function(**resolved_params)
else:
result = tool_function(**resolved_params)
@ -227,7 +228,7 @@ class TestToolCoordinationLogic:
# Simulate async execution with delay
await asyncio.sleep(0.001) # Small delay to simulate work
if asyncio.iscoroutinefunction(tool_function):
if inspect.iscoroutinefunction(tool_function):
result = await tool_function(**parameters)
else:
result = tool_function(**parameters)
@ -337,7 +338,7 @@ class TestToolCoordinationLogic:
if attempt > 0:
await asyncio.sleep(0.001 * (self.backoff_factor ** attempt))
if asyncio.iscoroutinefunction(tool_function):
if inspect.iscoroutinefunction(tool_function):
result = await tool_function(**parameters)
else:
result = tool_function(**parameters)

View file

@ -45,7 +45,7 @@ def test_setup_logging_without_loki_configures_console(monkeypatch):
kwargs = basic_config.call_args.kwargs
assert kwargs["level"] == logging.DEBUG
assert kwargs["force"] is True
assert "processor-1" in kwargs["format"]
assert "%(processor_id)s" in kwargs["format"]
assert len(kwargs["handlers"]) == 1
logger.info.assert_called_once_with("Logging configured with level: debug")
@ -60,11 +60,14 @@ def test_setup_logging_with_loki_enables_queue_listener(monkeypatch):
queue_listener = MagicMock()
loki_handler = MagicMock()
noisy_logger = MagicMock()
logger_map = {
None: root_logger,
"trustgraph.base.logging": module_logger,
"urllib3": urllib3_logger,
"urllib3.connectionpool": connectionpool_logger,
"pika": noisy_logger,
"cassandra": noisy_logger,
}
monkeypatch.setattr(logging, "basicConfig", basic_config)

View file

@ -330,7 +330,8 @@ class TestUnifiedTableQueries:
"""Test queries against the unified rows table"""
@pytest.mark.asyncio
async def test_query_with_index_match(self):
@patch('trustgraph.query.rows.cassandra.service.async_execute', new_callable=AsyncMock)
async def test_query_with_index_match(self, mock_async_execute):
"""Test query execution with matching index"""
processor = MagicMock()
processor.session = MagicMock()
@ -340,10 +341,10 @@ class TestUnifiedTableQueries:
processor.find_matching_index = Processor.find_matching_index.__get__(processor, Processor)
processor.query_cassandra = Processor.query_cassandra.__get__(processor, Processor)
# Mock session execute to return test data
# Mock async_execute to return test data
mock_row = MagicMock()
mock_row.data = {"id": "123", "name": "Test Product", "category": "electronics"}
processor.session.execute.return_value = [mock_row]
mock_async_execute.return_value = [mock_row]
schema = RowSchema(
name="products",
@ -366,12 +367,12 @@ class TestUnifiedTableQueries:
# Verify Cassandra was connected and queried
processor.connect_cassandra.assert_called_once()
processor.session.execute.assert_called_once()
mock_async_execute.assert_called_once()
# Verify query structure - should query unified rows table
call_args = processor.session.execute.call_args
query = call_args[0][0]
params = call_args[0][1]
call_args = mock_async_execute.call_args
query = call_args[0][1]
params = call_args[0][2]
assert "SELECT data, source FROM test_user.rows" in query
assert "collection = %s" in query
@ -390,7 +391,8 @@ class TestUnifiedTableQueries:
assert results[0]["category"] == "electronics"
@pytest.mark.asyncio
async def test_query_without_index_match(self):
@patch('trustgraph.query.rows.cassandra.service.async_execute', new_callable=AsyncMock)
async def test_query_without_index_match(self, mock_async_execute):
"""Test query execution without matching index (scan mode)"""
processor = MagicMock()
processor.session = MagicMock()
@ -401,12 +403,12 @@ class TestUnifiedTableQueries:
processor._matches_filters = Processor._matches_filters.__get__(processor, Processor)
processor.query_cassandra = Processor.query_cassandra.__get__(processor, Processor)
# Mock session execute to return test data
# Mock async_execute to return test data
mock_row1 = MagicMock()
mock_row1.data = {"id": "1", "name": "Product A", "price": "100"}
mock_row2 = MagicMock()
mock_row2.data = {"id": "2", "name": "Product B", "price": "200"}
processor.session.execute.return_value = [mock_row1, mock_row2]
mock_async_execute.return_value = [mock_row1, mock_row2]
schema = RowSchema(
name="products",
@ -428,8 +430,8 @@ class TestUnifiedTableQueries:
)
# Query should use ALLOW FILTERING for scan
call_args = processor.session.execute.call_args
query = call_args[0][0]
call_args = mock_async_execute.call_args
query = call_args[0][1]
assert "ALLOW FILTERING" in query

View file

@ -72,7 +72,6 @@ def processor(mock_pulsar_client, sample_schemas):
return proc
@pytest.mark.asyncio
class TestNLPQueryProcessor:
"""Test NLP Query service processor"""

View file

@ -36,7 +36,6 @@ def processor(mock_pulsar_client):
return proc
@pytest.mark.asyncio
class TestStructuredQueryProcessor:
"""Test Structured Query service processor"""

View file

@ -160,7 +160,8 @@ class TestRowsCassandraStorageLogic:
assert id_field.primary is True
@pytest.mark.asyncio
async def test_object_processing_stores_data_map(self):
@patch('trustgraph.storage.rows.cassandra.write.async_execute', new_callable=AsyncMock)
async def test_object_processing_stores_data_map(self, mock_async_execute):
"""Test that row processing stores data as map<text, text>"""
processor = MagicMock()
processor.schemas = {
@ -184,6 +185,8 @@ class TestRowsCassandraStorageLogic:
processor.collection_exists = MagicMock(return_value=True)
processor.on_object = Processor.on_object.__get__(processor, Processor)
mock_async_execute.return_value = []
# Create test object
test_obj = ExtractedObject(
metadata=Metadata(
@ -205,10 +208,10 @@ class TestRowsCassandraStorageLogic:
await processor.on_object(msg, None, None)
# Verify insert was executed
processor.session.execute.assert_called()
insert_call = processor.session.execute.call_args
insert_cql = insert_call[0][0]
values = insert_call[0][1]
mock_async_execute.assert_called()
insert_call = mock_async_execute.call_args
insert_cql = insert_call[0][1]
values = insert_call[0][2]
# Verify using unified rows table
assert "INSERT INTO test_user.rows" in insert_cql
@ -222,7 +225,8 @@ class TestRowsCassandraStorageLogic:
assert values[5] == "" # source
@pytest.mark.asyncio
async def test_object_processing_multiple_indexes(self):
@patch('trustgraph.storage.rows.cassandra.write.async_execute', new_callable=AsyncMock)
async def test_object_processing_multiple_indexes(self, mock_async_execute):
"""Test that row is written once per indexed field"""
processor = MagicMock()
processor.schemas = {
@ -246,6 +250,8 @@ class TestRowsCassandraStorageLogic:
processor.collection_exists = MagicMock(return_value=True)
processor.on_object = Processor.on_object.__get__(processor, Processor)
mock_async_execute.return_value = []
test_obj = ExtractedObject(
metadata=Metadata(
id="test-001",
@ -264,12 +270,12 @@ class TestRowsCassandraStorageLogic:
await processor.on_object(msg, None, None)
# Should have 3 inserts (one per indexed field: id, category, status)
assert processor.session.execute.call_count == 3
assert mock_async_execute.call_count == 3
# Check that different index_names were used
index_names_used = set()
for call in processor.session.execute.call_args_list:
values = call[0][1]
for call in mock_async_execute.call_args_list:
values = call[0][2]
index_names_used.add(values[2]) # index_name is 3rd value
assert index_names_used == {"id", "category", "status"}
@ -279,7 +285,8 @@ class TestRowsCassandraStorageBatchLogic:
"""Test batch processing logic for unified table implementation"""
@pytest.mark.asyncio
async def test_batch_object_processing(self):
@patch('trustgraph.storage.rows.cassandra.write.async_execute', new_callable=AsyncMock)
async def test_batch_object_processing(self, mock_async_execute):
"""Test processing of batch ExtractedObjects"""
processor = MagicMock()
processor.schemas = {
@ -302,6 +309,8 @@ class TestRowsCassandraStorageBatchLogic:
processor.collection_exists = MagicMock(return_value=True)
processor.on_object = Processor.on_object.__get__(processor, Processor)
mock_async_execute.return_value = []
# Create batch object with multiple values
batch_obj = ExtractedObject(
metadata=Metadata(
@ -325,12 +334,12 @@ class TestRowsCassandraStorageBatchLogic:
await processor.on_object(msg, None, None)
# Should have 3 inserts (one per row, one index per row since only primary key)
assert processor.session.execute.call_count == 3
assert mock_async_execute.call_count == 3
# Check each insert has different id
ids_inserted = set()
for call in processor.session.execute.call_args_list:
values = call[0][1]
for call in mock_async_execute.call_args_list:
values = call[0][2]
ids_inserted.add(tuple(values[3])) # index_value is 4th value
assert ids_inserted == {("001",), ("002",), ("003",)}

View file

@ -9,7 +9,7 @@ with hand-built fake rows.
"""
import pytest
from unittest.mock import Mock
from unittest.mock import Mock, AsyncMock, patch
from trustgraph.tables.knowledge import KnowledgeTableStore
from trustgraph.schema import (
@ -35,7 +35,10 @@ def _make_store():
class TestGetGraphEmbeddings:
@pytest.mark.asyncio
async def test_row_converts_to_entity_embeddings_with_singular_vector(self):
@patch('trustgraph.tables.knowledge.async_execute', new_callable=AsyncMock)
async def test_row_converts_to_entity_embeddings_with_singular_vector(
self, mock_async_execute
):
"""
Cassandra rows return entities as a list of [entity_tuple, vector]
pairs in row[3]. The deserializer must construct EntityEmbeddings
@ -56,8 +59,8 @@ class TestGetGraphEmbeddings:
store = _make_store()
store.cassandra = Mock()
store.cassandra.execute = Mock(return_value=[fake_row])
store.get_graph_embeddings_stmt = Mock()
mock_async_execute.return_value = [fake_row]
received = []
@ -72,7 +75,8 @@ class TestGetGraphEmbeddings:
)
# Assert
store.cassandra.execute.assert_called_once_with(
mock_async_execute.assert_called_once_with(
store.cassandra,
store.get_graph_embeddings_stmt,
("alice", "doc-1"),
)
@ -102,15 +106,16 @@ class TestGetGraphEmbeddings:
assert ge.entities[2].entity.value == "a literal entity"
@pytest.mark.asyncio
async def test_empty_entities_blob_yields_empty_list(self):
@patch('trustgraph.tables.knowledge.async_execute', new_callable=AsyncMock)
async def test_empty_entities_blob_yields_empty_list(self, mock_async_execute):
"""row[3] being None / empty must produce a GraphEmbeddings with
no entities, not raise."""
fake_row = (None, None, None, None)
store = _make_store()
store.cassandra = Mock()
store.cassandra.execute = Mock(return_value=[fake_row])
store.get_graph_embeddings_stmt = Mock()
mock_async_execute.return_value = [fake_row]
received = []
@ -123,7 +128,8 @@ class TestGetGraphEmbeddings:
assert received[0].entities == []
@pytest.mark.asyncio
async def test_multiple_rows_each_emit_one_message(self):
@patch('trustgraph.tables.knowledge.async_execute', new_callable=AsyncMock)
async def test_multiple_rows_each_emit_one_message(self, mock_async_execute):
fake_rows = [
(None, None, None, [
(("http://example.org/a", True), [1.0]),
@ -135,8 +141,8 @@ class TestGetGraphEmbeddings:
store = _make_store()
store.cassandra = Mock()
store.cassandra.execute = Mock(return_value=fake_rows)
store.get_graph_embeddings_stmt = Mock()
mock_async_execute.return_value = fake_rows
received = []
@ -157,7 +163,8 @@ class TestGetTriples:
the same Metadata construction. Cover it for parity."""
@pytest.mark.asyncio
async def test_row_converts_to_triples(self):
@patch('trustgraph.tables.knowledge.async_execute', new_callable=AsyncMock)
async def test_row_converts_to_triples(self, mock_async_execute):
# row[3] is a list of (s_val, s_uri, p_val, p_uri, o_val, o_uri)
fake_row = (
None, None, None,
@ -172,8 +179,8 @@ class TestGetTriples:
store = _make_store()
store.cassandra = Mock()
store.cassandra.execute = Mock(return_value=[fake_row])
store.get_triples_stmt = Mock()
mock_async_execute.return_value = [fake_row]
received = []

View file

@ -1,8 +1,11 @@
"""
Agent manager service completion base class
"""
from __future__ import annotations
from argparse import ArgumentParser
import time
import logging
from prometheus_client import Histogram
@ -97,7 +100,7 @@ class AgentService(FlowProcessor):
)
@staticmethod
def add_args(parser):
def add_args(parser: ArgumentParser) -> None:
FlowProcessor.add_args(parser)

View file

@ -1,3 +1,7 @@
from __future__ import annotations
from argparse import ArgumentParser
from typing import Any, Callable
# Base class for processors. Implements:
# - Pub/sub client, subscribe and consume basic
@ -178,20 +182,20 @@ class AsyncProcessor:
# This is called to stop all threads. An over-ride point for extra
# functionality
def stop(self):
def stop(self) -> None:
self.pubsub_backend.close()
self.running = False
# Returns the pub/sub backend (new interface)
@property
def pubsub(self): return self.pubsub_backend
def pubsub(self) -> Any: return self.pubsub_backend
# Returns the pulsar host (backward compatibility)
@property
def pulsar_host(self): return self._pulsar_host
def pulsar_host(self) -> str: return self._pulsar_host
# Register a new event handler for configuration change
def register_config_handler(self, handler, types=None):
def register_config_handler(self, handler: Callable[..., Any], types: list[type] | None = None) -> None:
self.config_handlers.append({
"handler": handler,
"types": set(types) if types else None,
@ -295,13 +299,13 @@ class AsyncProcessor:
raise e
@classmethod
def setup_logging(cls, args):
def setup_logging(cls, args: dict[str, Any]) -> None:
"""Configure logging for the entire application"""
setup_logging(args)
# Startup fabric. launch calls launch_async in async mode.
@classmethod
def launch(cls, ident, doc):
def launch(cls, ident: str, doc: str) -> None:
# Start assembling CLI arguments
parser = argparse.ArgumentParser(
@ -374,7 +378,7 @@ class AsyncProcessor:
# The command-line arguments are built using a stack of add_args
# invocations
@staticmethod
def add_args(parser):
def add_args(parser: ArgumentParser) -> None:
add_pubsub_args(parser)
add_logging_args(parser)

View file

@ -1,16 +1,19 @@
from __future__ import annotations
from typing import Any
from . metrics import ConsumerMetrics
from . consumer import Consumer
from . spec import Spec
class ConsumerSpec(Spec):
def __init__(self, name, schema, handler, concurrency = 1):
def __init__(self, name: str, schema: Any, handler: Any, concurrency: int = 1) -> None:
self.name = name
self.schema = schema
self.handler = handler
self.concurrency = concurrency
def add(self, flow, processor, definition):
def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None:
consumer_metrics = ConsumerMetrics(
processor = flow.id, flow = flow.name, name = self.name,

View file

@ -1,9 +1,12 @@
"""
Document embeddings query service. Input is vectors. Output is list of
embeddings.
"""
from __future__ import annotations
from argparse import ArgumentParser
import logging
from .. schema import DocumentEmbeddingsRequest, DocumentEmbeddingsResponse
@ -82,7 +85,7 @@ class DocumentEmbeddingsQueryService(FlowProcessor):
await flow("response").send(r, properties={"id": id})
@staticmethod
def add_args(parser):
def add_args(parser: ArgumentParser) -> None:
FlowProcessor.add_args(parser)
@ -93,7 +96,7 @@ class DocumentEmbeddingsQueryService(FlowProcessor):
help=f'Number of concurrent requests (default: {default_concurrency})'
)
def run():
def run() -> None:
Processor.launch(default_ident, __doc__)

View file

@ -1,8 +1,11 @@
"""
Document embeddings store base class
"""
from __future__ import annotations
from argparse import ArgumentParser
import logging
from .. schema import DocumentEmbeddings
@ -49,7 +52,7 @@ class DocumentEmbeddingsStoreService(FlowProcessor):
raise e
@staticmethod
def add_args(parser):
def add_args(parser: ArgumentParser) -> None:
FlowProcessor.add_args(parser)

View file

@ -1,4 +1,3 @@
"""
Base class for dynamically pluggable tool services.
@ -11,6 +10,10 @@ Uses direct Pulsar topics (no flow configuration required):
- Response: non-persistent://tg/response/{topic}
"""
from __future__ import annotations
from argparse import ArgumentParser
import json
import logging
import asyncio
@ -173,7 +176,7 @@ class DynamicToolService(AsyncProcessor):
raise NotImplementedError("Subclasses must implement invoke()")
@staticmethod
def add_args(parser):
def add_args(parser: ArgumentParser) -> None:
AsyncProcessor.add_args(parser)

View file

@ -1,8 +1,11 @@
"""
Embeddings resolution base class
"""
from __future__ import annotations
from argparse import ArgumentParser
import time
import logging
from prometheus_client import Histogram
@ -100,7 +103,7 @@ class EmbeddingsService(FlowProcessor):
)
@staticmethod
def add_args(parser):
def add_args(parser: ArgumentParser) -> None:
parser.add_argument(
'-c', '--concurrency',
@ -112,4 +115,3 @@ class EmbeddingsService(FlowProcessor):
FlowProcessor.add_args(parser)

View file

@ -2,6 +2,13 @@
import asyncio
class Flow:
"""
Runtime representation of a deployed flow process.
This class maintains internal processor states and orchestrates
lifecycles (start, stop) for inputs (consumers) and parameters
that drive data flowing across linked nodes.
"""
def __init__(self, id, flow, processor, defn):
self.id = id

View file

@ -1,3 +1,7 @@
from __future__ import annotations
from typing import Any
from argparse import ArgumentParser
# Base class for processor with management of flows in & out which are managed
# by configuration. This is probably all processor types, except for the
@ -41,7 +45,7 @@ class FlowProcessor(AsyncProcessor):
logger.info("Service initialised.")
# Register a configuration variable
def register_specification(self, spec):
def register_specification(self, spec: Any) -> None:
self.specifications.append(spec)
# Start processing for a new flow
@ -99,7 +103,7 @@ class FlowProcessor(AsyncProcessor):
await super(FlowProcessor, self).start()
@staticmethod
def add_args(parser):
def add_args(parser: ArgumentParser) -> None:
AsyncProcessor.add_args(parser)

View file

@ -1,3 +1,6 @@
from __future__ import annotations
from typing import Any
import logging
@ -9,7 +12,7 @@ from .. knowledge import Uri, Literal
logger = logging.getLogger(__name__)
def to_value(x):
def to_value(x: Any) -> Any:
"""Convert schema Term to Uri or Literal."""
if x.type == IRI:
return Uri(x.iri)

View file

@ -1,9 +1,12 @@
"""
Graph embeddings query service. Input is vectors. Output is list of
embeddings.
"""
from __future__ import annotations
from argparse import ArgumentParser
import logging
from .. schema import GraphEmbeddingsRequest, GraphEmbeddingsResponse
@ -82,7 +85,7 @@ class GraphEmbeddingsQueryService(FlowProcessor):
await flow("response").send(r, properties={"id": id})
@staticmethod
def add_args(parser):
def add_args(parser: ArgumentParser) -> None:
FlowProcessor.add_args(parser)
@ -93,7 +96,7 @@ class GraphEmbeddingsQueryService(FlowProcessor):
help=f'Number of concurrent requests (default: {default_concurrency})'
)
def run():
def run() -> None:
Processor.launch(default_ident, __doc__)

View file

@ -1,8 +1,11 @@
"""
Graph embeddings store base class
"""
from __future__ import annotations
from argparse import ArgumentParser
import logging
from .. schema import GraphEmbeddings
@ -49,7 +52,7 @@ class GraphEmbeddingsStoreService(FlowProcessor):
raise e
@staticmethod
def add_args(parser):
def add_args(parser: ArgumentParser) -> None:
FlowProcessor.add_args(parser)

View file

@ -1,8 +1,11 @@
"""
LLM text completion base class
"""
from __future__ import annotations
from argparse import ArgumentParser
import time
import logging
from prometheus_client import Histogram, Info
@ -42,6 +45,12 @@ class LlmChunk:
__slots__ = ["text", "in_token", "out_token", "model", "is_final"]
class LlmService(FlowProcessor):
"""
Extensible service processing requests to Large Language Models (LLMs).
This class handles the core logic of dispatching text completion or chat requests
to integrated underlying LLM providers (e.g. OpenAI, vertex ai).
"""
def __init__(self, **params):
@ -199,7 +208,7 @@ class LlmService(FlowProcessor):
properties={"id": id}
)
def supports_streaming(self):
def supports_streaming(self) -> bool:
"""
Override in subclass to indicate streaming support.
Returns False by default.
@ -215,7 +224,7 @@ class LlmService(FlowProcessor):
raise NotImplementedError("Streaming not implemented for this provider")
@staticmethod
def add_args(parser):
def add_args(parser: ArgumentParser) -> None:
parser.add_argument(
'-c', '--concurrency',

View file

@ -11,7 +11,9 @@ Supports dual output to console and Loki for centralized log aggregation.
import contextvars
import logging
import logging.handlers
from argparse import ArgumentParser
from queue import Queue
from typing import Any
import os
@ -53,7 +55,7 @@ class _ProcessorIdFilter(logging.Filter):
return True
def add_logging_args(parser):
def add_logging_args(parser: ArgumentParser) -> None:
"""
Add standard logging arguments to an argument parser.
@ -100,7 +102,7 @@ def add_logging_args(parser):
)
def setup_logging(args):
def setup_logging(args: dict[str, Any]) -> None:
"""
Configure logging from parsed command-line arguments.

View file

@ -1,10 +1,19 @@
from __future__ import annotations
from typing import Any
from prometheus_client import start_http_server, Info, Enum, Histogram
from prometheus_client import Counter
class ConsumerMetrics:
"""
Metrics tracking and reporting for flow consumers.
This class manages prometheus metrics specifically related to consumers
within the flow, including state, requests, processing time, and queues.
"""
def __init__(self, processor, flow, name):
def __init__(self, processor: str, flow: str, name: str) -> None:
self.processor = processor
self.flow = flow
@ -35,30 +44,30 @@ class ConsumerMetrics:
["processor", "flow", "name"],
)
def process(self, status):
def process(self, status: str) -> None:
__class__.processing_metric.labels(
processor = self.processor, flow = self.flow, name = self.name,
status=status
).inc()
def rate_limit(self):
def rate_limit(self) -> None:
__class__.rate_limit_metric.labels(
processor = self.processor, flow = self.flow, name = self.name,
).inc()
def state(self, state):
def state(self, state: str) -> None:
__class__.state_metric.labels(
processor = self.processor, flow = self.flow, name = self.name,
).state(state)
def record_time(self):
def record_time(self) -> Any:
return __class__.request_metric.labels(
processor = self.processor, flow = self.flow, name = self.name,
).time()
class ProducerMetrics:
def __init__(self, processor, flow, name):
def __init__(self, processor: str, flow: str, name: str) -> None:
self.processor = processor
self.flow = flow
@ -70,13 +79,13 @@ class ProducerMetrics:
["processor", "flow", "name"],
)
def inc(self):
def inc(self) -> None:
__class__.producer_metric.labels(
processor = self.processor, flow = self.flow, name = self.name
).inc()
class ProcessorMetrics:
def __init__(self, processor):
def __init__(self, processor: str) -> None:
self.processor = processor
@ -86,14 +95,14 @@ class ProcessorMetrics:
["processor"]
)
def info(self, info):
def info(self, info: dict[str, str]) -> None:
__class__.processor_metric.labels(
processor = self.processor
).info(info)
class SubscriberMetrics:
def __init__(self, processor, flow, name):
def __init__(self, processor: str, flow: str, name: str) -> None:
self.processor = processor
self.flow = flow
@ -118,19 +127,18 @@ class SubscriberMetrics:
["processor", "flow", "name"],
)
def received(self):
def received(self) -> None:
__class__.received_metric.labels(
processor = self.processor, flow = self.flow, name = self.name,
).inc()
def state(self, state):
def state(self, state: str) -> None:
__class__.state_metric.labels(
processor = self.processor, flow = self.flow, name = self.name,
).state(state)
def dropped(self, state):
def dropped(self, state: str) -> None:
__class__.dropped_metric.labels(
processor = self.processor, flow = self.flow, name = self.name,
).inc()

View file

@ -1,21 +1,23 @@
from __future__ import annotations
from typing import Any
from . spec import Spec
class Parameter:
def __init__(self, value):
def __init__(self, value: Any) -> None:
self.value = value
async def start(self):
async def start(self) -> None:
pass
async def stop(self):
async def stop(self) -> None:
pass
class ParameterSpec(Spec):
def __init__(self, name):
def __init__(self, name: str) -> None:
self.name = name
def add(self, flow, processor, definition):
def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None:
value = definition.get(self.name, None)
flow.parameter[self.name] = Parameter(value)

View file

@ -1,14 +1,17 @@
from __future__ import annotations
from typing import Any
from . producer import Producer
from . metrics import ProducerMetrics
from . spec import Spec
class ProducerSpec(Spec):
def __init__(self, name, schema):
def __init__(self, name: str, schema: Any) -> None:
self.name = name
self.schema = schema
def add(self, flow, processor, definition):
def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None:
producer_metrics = ProducerMetrics(
processor = flow.id, flow = flow.name, name = self.name
@ -22,4 +25,3 @@ class ProducerSpec(Spec):
)
flow.producer[self.name] = producer

View file

@ -1,6 +1,7 @@
import json
import asyncio
import inspect
from dataclasses import dataclass
from typing import Optional, Any
@ -80,7 +81,7 @@ class PromptClient(RequestResponse):
if resp.text is not None:
if chunk_callback:
if asyncio.iscoroutinefunction(chunk_callback):
if inspect.iscoroutinefunction(chunk_callback):
await chunk_callback(resp.text, end_stream)
else:
chunk_callback(resp.text, end_stream)

View file

@ -1,6 +1,9 @@
from __future__ import annotations
import os
import logging
from argparse import ArgumentParser
from typing import Any
logger = logging.getLogger(__name__)
@ -15,7 +18,7 @@ DEFAULT_RABBITMQ_PASSWORD = os.getenv("RABBITMQ_PASSWORD", 'guest')
DEFAULT_RABBITMQ_VHOST = os.getenv("RABBITMQ_VHOST", '/')
def get_pubsub(**config):
def get_pubsub(**config: Any) -> Any:
"""
Factory function to create a pub/sub backend based on configuration.
@ -51,7 +54,7 @@ def get_pubsub(**config):
STANDALONE_PULSAR_HOST = 'pulsar://localhost:6650'
def add_pubsub_args(parser, standalone=False):
def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None:
"""Add pub/sub CLI arguments to an argument parser.
Args:

View file

@ -1,7 +1,9 @@
from __future__ import annotations
import uuid
import asyncio
import logging
from typing import Any
from . subscriber import Subscriber
from . producer import Producer
@ -115,7 +117,7 @@ class RequestResponseSpec(Spec):
self.response_schema = response_schema
self.impl = impl
def add(self, flow, processor, definition):
def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None:
request_metrics = ProducerMetrics(
processor = flow.id, flow = flow.name, name = self.request_name

View file

@ -1,3 +1,6 @@
from __future__ import annotations
from typing import Any
from . metrics import SubscriberMetrics
from . subscriber import Subscriber
@ -5,11 +8,11 @@ from . spec import Spec
class SubscriberSpec(Spec):
def __init__(self, name, schema):
def __init__(self, name: str, schema: Any) -> None:
self.name = name
self.schema = schema
def add(self, flow, processor, definition):
def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None:
subscriber_metrics = SubscriberMetrics(
processor = flow.id, flow = flow.name, name = self.name
@ -27,4 +30,3 @@ class SubscriberSpec(Spec):
# Put it in the consumer map, does that work?
# It means it gets start/stop call.
flow.consumer[self.name] = subscriber

View file

@ -5,6 +5,13 @@ from . request_response_spec import RequestResponse, RequestResponseSpec
from .. schema import ToolRequest, ToolResponse
class ToolClient(RequestResponse):
"""
Client for invoking tools over the flow messaging fabric.
This class provides an interface to abstract away the messaging mechanics
and provides a direct awaitable mechanism for invoking tools and
getting their responses.
"""
async def invoke(self, name, parameters={}, timeout=600):

View file

@ -1,8 +1,11 @@
"""
Tool invocation base class
"""
from __future__ import annotations
from argparse import ArgumentParser
import json
import logging
from prometheus_client import Counter
@ -112,7 +115,7 @@ class ToolService(FlowProcessor):
)
@staticmethod
def add_args(parser):
def add_args(parser: ArgumentParser) -> None:
parser.add_argument(
'-c', '--concurrency',

View file

@ -1,3 +1,6 @@
from __future__ import annotations
from typing import Any
from . request_response_spec import RequestResponse, RequestResponseSpec
from .. schema import TriplesQueryRequest, TriplesQueryResponse, Term, IRI, LITERAL, TRIPLE
@ -11,7 +14,7 @@ class Triple:
self.o = o
def to_value(x):
def to_value(x: Any) -> Any:
"""Convert schema Term to Uri or Literal."""
if x.type == IRI:
return Uri(x.iri)
@ -21,7 +24,7 @@ def to_value(x):
return Literal(x.value or x.iri)
def from_value(x):
def from_value(x: Any) -> Any:
"""Convert Uri, Literal, string, or Term to schema Term."""
if x is None:
return None

View file

@ -1,9 +1,12 @@
"""
Triples query service. Input is a (s, p, o) triple, some values may be
null. Output is a list of triples.
"""
from __future__ import annotations
from argparse import ArgumentParser
import logging
from .. schema import TriplesQueryRequest, TriplesQueryResponse, Error
@ -108,7 +111,7 @@ class TriplesQueryService(FlowProcessor):
yield [], True
@staticmethod
def add_args(parser):
def add_args(parser: ArgumentParser) -> None:
FlowProcessor.add_args(parser)
@ -119,7 +122,7 @@ class TriplesQueryService(FlowProcessor):
help=f'Number of concurrent requests (default: {default_concurrency})'
)
def run():
def run() -> None:
Processor.launch(default_ident, __doc__)

View file

@ -1,8 +1,11 @@
"""
Triples store base class
"""
from __future__ import annotations
from argparse import ArgumentParser
import logging
from .. schema import Triples
@ -15,6 +18,12 @@ logger = logging.getLogger(__name__)
default_ident = "triples-write"
class TriplesStoreService(FlowProcessor):
"""
Component for maintaining the triples store.
This service acts as a processor in the flow that receives knowledge triples
and writes them persistently into an overarching graph database or equivalent backend.
"""
def __init__(self, **params):
@ -47,7 +56,7 @@ class TriplesStoreService(FlowProcessor):
raise e
@staticmethod
def add_args(parser):
def add_args(parser: ArgumentParser) -> None:
FlowProcessor.add_args(parser)

View file

@ -13,7 +13,7 @@ Agent provenance tracks the reasoning trace of agent sessions:
"""
import json
from datetime import datetime
from datetime import datetime, timezone
from typing import List, Optional, Dict, Any
from .. schema import Triple, Term, IRI, LITERAL
@ -87,7 +87,7 @@ def agent_session_triples(
List of Triple objects
"""
if timestamp is None:
timestamp = datetime.utcnow().isoformat() + "Z"
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
triples = [
_triple(session_uri, RDF_TYPE, _iri(PROV_ENTITY)),

View file

@ -2,7 +2,7 @@
Helper functions to build PROV-O triples for extraction-time provenance.
"""
from datetime import datetime
from datetime import datetime, timezone
from typing import List, Optional
from .. schema import Triple, Term, IRI, LITERAL, TRIPLE
@ -192,7 +192,7 @@ def derived_entity_triples(
List of Triple objects
"""
if timestamp is None:
timestamp = datetime.utcnow().isoformat() + "Z"
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
act_uri = activity_uri()
agt_uri = agent_uri(component_name)
@ -309,7 +309,7 @@ def subgraph_provenance_triples(
List of Triple objects for the provenance
"""
if timestamp is None:
timestamp = datetime.utcnow().isoformat() + "Z"
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
act_uri = activity_uri()
agt_uri = agent_uri(component_name)
@ -386,7 +386,7 @@ def question_triples(
List of Triple objects
"""
if timestamp is None:
timestamp = datetime.utcnow().isoformat() + "Z"
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
triples = [
_triple(question_uri, RDF_TYPE, _iri(PROV_ENTITY)),
@ -640,7 +640,7 @@ def docrag_question_triples(
List of Triple objects
"""
if timestamp is None:
timestamp = datetime.utcnow().isoformat() + "Z"
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
triples = [
_triple(question_uri, RDF_TYPE, _iri(PROV_ENTITY)),

View file

@ -9,7 +9,7 @@ librarian integration.
import json
import logging
import uuid
from datetime import datetime
from datetime import datetime, timezone
from ... schema import AgentRequest, AgentResponse, AgentStep, Error
from ... schema import Triples, Metadata
@ -253,7 +253,7 @@ class PatternBase:
collection, respond, streaming,
parent_uri=None):
"""Emit provenance triples for a new session."""
timestamp = datetime.utcnow().isoformat() + "Z"
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
triples = set_graph(
agent_session_triples(
session_uri, question, timestamp,

View file

@ -10,7 +10,7 @@ import sys
import functools
import logging
import uuid
from datetime import datetime
from datetime import datetime, timezone
# Module logger
logger = logging.getLogger(__name__)
@ -452,7 +452,7 @@ class Processor(AgentService):
# On first iteration, emit session triples
if iteration_num == 1:
timestamp = datetime.utcnow().isoformat() + "Z"
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
triples = set_graph(
agent_session_triples(session_uri, request.question, timestamp),
GRAPH_RETRIEVAL

View file

@ -6,6 +6,7 @@ Provides comprehensive error handling, retry logic, and graceful degradation.
import logging
import time
import asyncio
import inspect
from typing import Dict, Any, List, Optional, Callable, Union, Type
from dataclasses import dataclass
from enum import Enum
@ -244,7 +245,7 @@ class ErrorRecoveryStrategy:
await asyncio.sleep(delay)
try:
if asyncio.iscoroutinefunction(operation):
if inspect.iscoroutinefunction(operation):
return await operation(*args, **kwargs)
else:
return operation(*args, **kwargs)
@ -260,7 +261,7 @@ class ErrorRecoveryStrategy:
if fallback_func:
logger.info(f"Executing fallback for {context.category.value}")
try:
if asyncio.iscoroutinefunction(fallback_func):
if inspect.iscoroutinefunction(fallback_func):
return await fallback_func(context, *args, **kwargs)
else:
return fallback_func(context, *args, **kwargs)
@ -420,7 +421,7 @@ def with_error_handling(category: ErrorCategory,
@wraps(func)
async def async_wrapper(*args, **kwargs):
try:
if asyncio.iscoroutinefunction(func):
if inspect.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
return func(*args, **kwargs)
@ -469,7 +470,7 @@ def with_error_handling(category: ErrorCategory,
cause=e
)
if asyncio.iscoroutinefunction(func):
if inspect.iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper

View file

@ -6,6 +6,7 @@ Provides comprehensive monitoring of system performance, query patterns, and res
import logging
import time
import asyncio
import inspect
import threading
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
@ -579,7 +580,7 @@ def monitor_performance(component: str,
async def async_wrapper(*args, **kwargs):
if not monitor or not monitor.monitoring_enabled:
if asyncio.iscoroutinefunction(func):
if inspect.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
return func(*args, **kwargs)
@ -591,7 +592,7 @@ def monitor_performance(component: str,
success = True
try:
if asyncio.iscoroutinefunction(func):
if inspect.iscoroutinefunction(func):
result = await func(*args, **kwargs)
else:
result = func(*args, **kwargs)
@ -603,7 +604,7 @@ def monitor_performance(component: str,
duration = monitor.metrics_collector.stop_timer(timer)
monitor.record_request(component, operation, duration, success)
if asyncio.iscoroutinefunction(func):
if inspect.iscoroutinefunction(func):
return async_wrapper
else:
return wrapper

View file

@ -2,7 +2,7 @@
import asyncio
import logging
import uuid
from datetime import datetime
from datetime import datetime, timezone
# Provenance imports
from trustgraph.provenance import (
@ -199,7 +199,7 @@ class DocumentRag:
exp_uri = docrag_exploration_uri(session_id)
syn_uri = docrag_synthesis_uri(session_id)
timestamp = datetime.utcnow().isoformat() + "Z"
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
# Emit question explainability immediately
if explain_callback:

View file

@ -7,7 +7,7 @@ import math
import time
import uuid
from collections import OrderedDict
from datetime import datetime
from datetime import datetime, timezone
from ... schema import Term, Triple as SchemaTriple, IRI, LITERAL, TRIPLE
from ... knowledge import Uri, Literal
@ -643,7 +643,7 @@ class GraphRag:
foc_uri = make_focus_uri(session_id)
syn_uri = make_synthesis_uri(session_id)
timestamp = datetime.utcnow().isoformat() + "Z"
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
# Emit question explainability immediately
if explain_callback: