mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-07-03 23:11:00 +02:00
Compare commits
6 commits
2f64ffc99d
...
645b6a66fd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
645b6a66fd | ||
|
|
ef8bb3aed4 | ||
|
|
95e4839da7 | ||
|
|
706e62b7c2 | ||
|
|
22096e07e2 | ||
|
|
fdb52a6bfc |
43 changed files with 270 additions and 138 deletions
|
|
@ -18,6 +18,20 @@ from trustgraph.schema import ExtractedObject, Metadata, RowSchema, Field
|
|||
class TestRowsCassandraIntegration:
|
||||
"""Integration tests for Cassandra row storage with unified table"""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_async_execute(self):
|
||||
"""Route async_execute through session.execute so the mock's
|
||||
side_effect handles all CQL (DDL and DML) uniformly and every
|
||||
call lands in mock_session.execute.call_args_list."""
|
||||
async def _fake(session, query, params=None):
|
||||
session.execute(query, params)
|
||||
return []
|
||||
with patch(
|
||||
'trustgraph.storage.rows.cassandra.write.async_execute',
|
||||
new=_fake,
|
||||
):
|
||||
yield
|
||||
|
||||
@pytest.fixture
|
||||
def mock_cassandra_session(self):
|
||||
"""Mock Cassandra session for integration tests"""
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
[pytest]
|
||||
testpaths = tests
|
||||
python_paths = .
|
||||
python_files = test_*.py
|
||||
python_classes = Test*
|
||||
python_functions = test_*
|
||||
|
|
@ -8,7 +7,7 @@ addopts =
|
|||
-v
|
||||
--tb=short
|
||||
--strict-markers
|
||||
--disable-warnings
|
||||
# --disable-warnings
|
||||
# --cov-fail-under=80
|
||||
asyncio_mode = auto
|
||||
markers =
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ tool usage patterns.
|
|||
import pytest
|
||||
from unittest.mock import Mock, AsyncMock
|
||||
import asyncio
|
||||
import inspect
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
|
|
@ -133,7 +134,7 @@ class TestToolCoordinationLogic:
|
|||
resolved_params[key] = value
|
||||
|
||||
# Execute tool
|
||||
if asyncio.iscoroutinefunction(tool_function):
|
||||
if inspect.iscoroutinefunction(tool_function):
|
||||
result = await tool_function(**resolved_params)
|
||||
else:
|
||||
result = tool_function(**resolved_params)
|
||||
|
|
@ -227,7 +228,7 @@ class TestToolCoordinationLogic:
|
|||
# Simulate async execution with delay
|
||||
await asyncio.sleep(0.001) # Small delay to simulate work
|
||||
|
||||
if asyncio.iscoroutinefunction(tool_function):
|
||||
if inspect.iscoroutinefunction(tool_function):
|
||||
result = await tool_function(**parameters)
|
||||
else:
|
||||
result = tool_function(**parameters)
|
||||
|
|
@ -337,7 +338,7 @@ class TestToolCoordinationLogic:
|
|||
if attempt > 0:
|
||||
await asyncio.sleep(0.001 * (self.backoff_factor ** attempt))
|
||||
|
||||
if asyncio.iscoroutinefunction(tool_function):
|
||||
if inspect.iscoroutinefunction(tool_function):
|
||||
result = await tool_function(**parameters)
|
||||
else:
|
||||
result = tool_function(**parameters)
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ def test_setup_logging_without_loki_configures_console(monkeypatch):
|
|||
kwargs = basic_config.call_args.kwargs
|
||||
assert kwargs["level"] == logging.DEBUG
|
||||
assert kwargs["force"] is True
|
||||
assert "processor-1" in kwargs["format"]
|
||||
assert "%(processor_id)s" in kwargs["format"]
|
||||
assert len(kwargs["handlers"]) == 1
|
||||
logger.info.assert_called_once_with("Logging configured with level: debug")
|
||||
|
||||
|
|
@ -60,11 +60,14 @@ def test_setup_logging_with_loki_enables_queue_listener(monkeypatch):
|
|||
queue_listener = MagicMock()
|
||||
loki_handler = MagicMock()
|
||||
|
||||
noisy_logger = MagicMock()
|
||||
logger_map = {
|
||||
None: root_logger,
|
||||
"trustgraph.base.logging": module_logger,
|
||||
"urllib3": urllib3_logger,
|
||||
"urllib3.connectionpool": connectionpool_logger,
|
||||
"pika": noisy_logger,
|
||||
"cassandra": noisy_logger,
|
||||
}
|
||||
|
||||
monkeypatch.setattr(logging, "basicConfig", basic_config)
|
||||
|
|
|
|||
|
|
@ -330,7 +330,8 @@ class TestUnifiedTableQueries:
|
|||
"""Test queries against the unified rows table"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_query_with_index_match(self):
|
||||
@patch('trustgraph.query.rows.cassandra.service.async_execute', new_callable=AsyncMock)
|
||||
async def test_query_with_index_match(self, mock_async_execute):
|
||||
"""Test query execution with matching index"""
|
||||
processor = MagicMock()
|
||||
processor.session = MagicMock()
|
||||
|
|
@ -340,10 +341,10 @@ class TestUnifiedTableQueries:
|
|||
processor.find_matching_index = Processor.find_matching_index.__get__(processor, Processor)
|
||||
processor.query_cassandra = Processor.query_cassandra.__get__(processor, Processor)
|
||||
|
||||
# Mock session execute to return test data
|
||||
# Mock async_execute to return test data
|
||||
mock_row = MagicMock()
|
||||
mock_row.data = {"id": "123", "name": "Test Product", "category": "electronics"}
|
||||
processor.session.execute.return_value = [mock_row]
|
||||
mock_async_execute.return_value = [mock_row]
|
||||
|
||||
schema = RowSchema(
|
||||
name="products",
|
||||
|
|
@ -366,12 +367,12 @@ class TestUnifiedTableQueries:
|
|||
|
||||
# Verify Cassandra was connected and queried
|
||||
processor.connect_cassandra.assert_called_once()
|
||||
processor.session.execute.assert_called_once()
|
||||
mock_async_execute.assert_called_once()
|
||||
|
||||
# Verify query structure - should query unified rows table
|
||||
call_args = processor.session.execute.call_args
|
||||
query = call_args[0][0]
|
||||
params = call_args[0][1]
|
||||
call_args = mock_async_execute.call_args
|
||||
query = call_args[0][1]
|
||||
params = call_args[0][2]
|
||||
|
||||
assert "SELECT data, source FROM test_user.rows" in query
|
||||
assert "collection = %s" in query
|
||||
|
|
@ -390,7 +391,8 @@ class TestUnifiedTableQueries:
|
|||
assert results[0]["category"] == "electronics"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_query_without_index_match(self):
|
||||
@patch('trustgraph.query.rows.cassandra.service.async_execute', new_callable=AsyncMock)
|
||||
async def test_query_without_index_match(self, mock_async_execute):
|
||||
"""Test query execution without matching index (scan mode)"""
|
||||
processor = MagicMock()
|
||||
processor.session = MagicMock()
|
||||
|
|
@ -401,12 +403,12 @@ class TestUnifiedTableQueries:
|
|||
processor._matches_filters = Processor._matches_filters.__get__(processor, Processor)
|
||||
processor.query_cassandra = Processor.query_cassandra.__get__(processor, Processor)
|
||||
|
||||
# Mock session execute to return test data
|
||||
# Mock async_execute to return test data
|
||||
mock_row1 = MagicMock()
|
||||
mock_row1.data = {"id": "1", "name": "Product A", "price": "100"}
|
||||
mock_row2 = MagicMock()
|
||||
mock_row2.data = {"id": "2", "name": "Product B", "price": "200"}
|
||||
processor.session.execute.return_value = [mock_row1, mock_row2]
|
||||
mock_async_execute.return_value = [mock_row1, mock_row2]
|
||||
|
||||
schema = RowSchema(
|
||||
name="products",
|
||||
|
|
@ -428,8 +430,8 @@ class TestUnifiedTableQueries:
|
|||
)
|
||||
|
||||
# Query should use ALLOW FILTERING for scan
|
||||
call_args = processor.session.execute.call_args
|
||||
query = call_args[0][0]
|
||||
call_args = mock_async_execute.call_args
|
||||
query = call_args[0][1]
|
||||
|
||||
assert "ALLOW FILTERING" in query
|
||||
|
||||
|
|
|
|||
|
|
@ -72,7 +72,6 @@ def processor(mock_pulsar_client, sample_schemas):
|
|||
return proc
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
class TestNLPQueryProcessor:
|
||||
"""Test NLP Query service processor"""
|
||||
|
||||
|
|
|
|||
|
|
@ -36,7 +36,6 @@ def processor(mock_pulsar_client):
|
|||
return proc
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
class TestStructuredQueryProcessor:
|
||||
"""Test Structured Query service processor"""
|
||||
|
||||
|
|
|
|||
|
|
@ -160,7 +160,8 @@ class TestRowsCassandraStorageLogic:
|
|||
assert id_field.primary is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_object_processing_stores_data_map(self):
|
||||
@patch('trustgraph.storage.rows.cassandra.write.async_execute', new_callable=AsyncMock)
|
||||
async def test_object_processing_stores_data_map(self, mock_async_execute):
|
||||
"""Test that row processing stores data as map<text, text>"""
|
||||
processor = MagicMock()
|
||||
processor.schemas = {
|
||||
|
|
@ -184,6 +185,8 @@ class TestRowsCassandraStorageLogic:
|
|||
processor.collection_exists = MagicMock(return_value=True)
|
||||
processor.on_object = Processor.on_object.__get__(processor, Processor)
|
||||
|
||||
mock_async_execute.return_value = []
|
||||
|
||||
# Create test object
|
||||
test_obj = ExtractedObject(
|
||||
metadata=Metadata(
|
||||
|
|
@ -205,10 +208,10 @@ class TestRowsCassandraStorageLogic:
|
|||
await processor.on_object(msg, None, None)
|
||||
|
||||
# Verify insert was executed
|
||||
processor.session.execute.assert_called()
|
||||
insert_call = processor.session.execute.call_args
|
||||
insert_cql = insert_call[0][0]
|
||||
values = insert_call[0][1]
|
||||
mock_async_execute.assert_called()
|
||||
insert_call = mock_async_execute.call_args
|
||||
insert_cql = insert_call[0][1]
|
||||
values = insert_call[0][2]
|
||||
|
||||
# Verify using unified rows table
|
||||
assert "INSERT INTO test_user.rows" in insert_cql
|
||||
|
|
@ -222,7 +225,8 @@ class TestRowsCassandraStorageLogic:
|
|||
assert values[5] == "" # source
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_object_processing_multiple_indexes(self):
|
||||
@patch('trustgraph.storage.rows.cassandra.write.async_execute', new_callable=AsyncMock)
|
||||
async def test_object_processing_multiple_indexes(self, mock_async_execute):
|
||||
"""Test that row is written once per indexed field"""
|
||||
processor = MagicMock()
|
||||
processor.schemas = {
|
||||
|
|
@ -246,6 +250,8 @@ class TestRowsCassandraStorageLogic:
|
|||
processor.collection_exists = MagicMock(return_value=True)
|
||||
processor.on_object = Processor.on_object.__get__(processor, Processor)
|
||||
|
||||
mock_async_execute.return_value = []
|
||||
|
||||
test_obj = ExtractedObject(
|
||||
metadata=Metadata(
|
||||
id="test-001",
|
||||
|
|
@ -264,12 +270,12 @@ class TestRowsCassandraStorageLogic:
|
|||
await processor.on_object(msg, None, None)
|
||||
|
||||
# Should have 3 inserts (one per indexed field: id, category, status)
|
||||
assert processor.session.execute.call_count == 3
|
||||
assert mock_async_execute.call_count == 3
|
||||
|
||||
# Check that different index_names were used
|
||||
index_names_used = set()
|
||||
for call in processor.session.execute.call_args_list:
|
||||
values = call[0][1]
|
||||
for call in mock_async_execute.call_args_list:
|
||||
values = call[0][2]
|
||||
index_names_used.add(values[2]) # index_name is 3rd value
|
||||
|
||||
assert index_names_used == {"id", "category", "status"}
|
||||
|
|
@ -279,7 +285,8 @@ class TestRowsCassandraStorageBatchLogic:
|
|||
"""Test batch processing logic for unified table implementation"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_batch_object_processing(self):
|
||||
@patch('trustgraph.storage.rows.cassandra.write.async_execute', new_callable=AsyncMock)
|
||||
async def test_batch_object_processing(self, mock_async_execute):
|
||||
"""Test processing of batch ExtractedObjects"""
|
||||
processor = MagicMock()
|
||||
processor.schemas = {
|
||||
|
|
@ -302,6 +309,8 @@ class TestRowsCassandraStorageBatchLogic:
|
|||
processor.collection_exists = MagicMock(return_value=True)
|
||||
processor.on_object = Processor.on_object.__get__(processor, Processor)
|
||||
|
||||
mock_async_execute.return_value = []
|
||||
|
||||
# Create batch object with multiple values
|
||||
batch_obj = ExtractedObject(
|
||||
metadata=Metadata(
|
||||
|
|
@ -325,12 +334,12 @@ class TestRowsCassandraStorageBatchLogic:
|
|||
await processor.on_object(msg, None, None)
|
||||
|
||||
# Should have 3 inserts (one per row, one index per row since only primary key)
|
||||
assert processor.session.execute.call_count == 3
|
||||
assert mock_async_execute.call_count == 3
|
||||
|
||||
# Check each insert has different id
|
||||
ids_inserted = set()
|
||||
for call in processor.session.execute.call_args_list:
|
||||
values = call[0][1]
|
||||
for call in mock_async_execute.call_args_list:
|
||||
values = call[0][2]
|
||||
ids_inserted.add(tuple(values[3])) # index_value is 4th value
|
||||
|
||||
assert ids_inserted == {("001",), ("002",), ("003",)}
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ with hand-built fake rows.
|
|||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock
|
||||
from unittest.mock import Mock, AsyncMock, patch
|
||||
|
||||
from trustgraph.tables.knowledge import KnowledgeTableStore
|
||||
from trustgraph.schema import (
|
||||
|
|
@ -35,7 +35,10 @@ def _make_store():
|
|||
class TestGetGraphEmbeddings:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_row_converts_to_entity_embeddings_with_singular_vector(self):
|
||||
@patch('trustgraph.tables.knowledge.async_execute', new_callable=AsyncMock)
|
||||
async def test_row_converts_to_entity_embeddings_with_singular_vector(
|
||||
self, mock_async_execute
|
||||
):
|
||||
"""
|
||||
Cassandra rows return entities as a list of [entity_tuple, vector]
|
||||
pairs in row[3]. The deserializer must construct EntityEmbeddings
|
||||
|
|
@ -56,8 +59,8 @@ class TestGetGraphEmbeddings:
|
|||
|
||||
store = _make_store()
|
||||
store.cassandra = Mock()
|
||||
store.cassandra.execute = Mock(return_value=[fake_row])
|
||||
store.get_graph_embeddings_stmt = Mock()
|
||||
mock_async_execute.return_value = [fake_row]
|
||||
|
||||
received = []
|
||||
|
||||
|
|
@ -72,7 +75,8 @@ class TestGetGraphEmbeddings:
|
|||
)
|
||||
|
||||
# Assert
|
||||
store.cassandra.execute.assert_called_once_with(
|
||||
mock_async_execute.assert_called_once_with(
|
||||
store.cassandra,
|
||||
store.get_graph_embeddings_stmt,
|
||||
("alice", "doc-1"),
|
||||
)
|
||||
|
|
@ -102,15 +106,16 @@ class TestGetGraphEmbeddings:
|
|||
assert ge.entities[2].entity.value == "a literal entity"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_empty_entities_blob_yields_empty_list(self):
|
||||
@patch('trustgraph.tables.knowledge.async_execute', new_callable=AsyncMock)
|
||||
async def test_empty_entities_blob_yields_empty_list(self, mock_async_execute):
|
||||
"""row[3] being None / empty must produce a GraphEmbeddings with
|
||||
no entities, not raise."""
|
||||
fake_row = (None, None, None, None)
|
||||
|
||||
store = _make_store()
|
||||
store.cassandra = Mock()
|
||||
store.cassandra.execute = Mock(return_value=[fake_row])
|
||||
store.get_graph_embeddings_stmt = Mock()
|
||||
mock_async_execute.return_value = [fake_row]
|
||||
|
||||
received = []
|
||||
|
||||
|
|
@ -123,7 +128,8 @@ class TestGetGraphEmbeddings:
|
|||
assert received[0].entities == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_multiple_rows_each_emit_one_message(self):
|
||||
@patch('trustgraph.tables.knowledge.async_execute', new_callable=AsyncMock)
|
||||
async def test_multiple_rows_each_emit_one_message(self, mock_async_execute):
|
||||
fake_rows = [
|
||||
(None, None, None, [
|
||||
(("http://example.org/a", True), [1.0]),
|
||||
|
|
@ -135,8 +141,8 @@ class TestGetGraphEmbeddings:
|
|||
|
||||
store = _make_store()
|
||||
store.cassandra = Mock()
|
||||
store.cassandra.execute = Mock(return_value=fake_rows)
|
||||
store.get_graph_embeddings_stmt = Mock()
|
||||
mock_async_execute.return_value = fake_rows
|
||||
|
||||
received = []
|
||||
|
||||
|
|
@ -157,7 +163,8 @@ class TestGetTriples:
|
|||
the same Metadata construction. Cover it for parity."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_row_converts_to_triples(self):
|
||||
@patch('trustgraph.tables.knowledge.async_execute', new_callable=AsyncMock)
|
||||
async def test_row_converts_to_triples(self, mock_async_execute):
|
||||
# row[3] is a list of (s_val, s_uri, p_val, p_uri, o_val, o_uri)
|
||||
fake_row = (
|
||||
None, None, None,
|
||||
|
|
@ -172,8 +179,8 @@ class TestGetTriples:
|
|||
|
||||
store = _make_store()
|
||||
store.cassandra = Mock()
|
||||
store.cassandra.execute = Mock(return_value=[fake_row])
|
||||
store.get_triples_stmt = Mock()
|
||||
mock_async_execute.return_value = [fake_row]
|
||||
|
||||
received = []
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,11 @@
|
|||
|
||||
"""
|
||||
Agent manager service completion base class
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import time
|
||||
import logging
|
||||
from prometheus_client import Histogram
|
||||
|
|
@ -97,7 +100,7 @@ class AgentService(FlowProcessor):
|
|||
)
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
FlowProcessor.add_args(parser)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,7 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from argparse import ArgumentParser
|
||||
from typing import Any, Callable
|
||||
|
||||
# Base class for processors. Implements:
|
||||
# - Pub/sub client, subscribe and consume basic
|
||||
|
|
@ -178,20 +182,20 @@ class AsyncProcessor:
|
|||
|
||||
# This is called to stop all threads. An over-ride point for extra
|
||||
# functionality
|
||||
def stop(self):
|
||||
def stop(self) -> None:
|
||||
self.pubsub_backend.close()
|
||||
self.running = False
|
||||
|
||||
# Returns the pub/sub backend (new interface)
|
||||
@property
|
||||
def pubsub(self): return self.pubsub_backend
|
||||
def pubsub(self) -> Any: return self.pubsub_backend
|
||||
|
||||
# Returns the pulsar host (backward compatibility)
|
||||
@property
|
||||
def pulsar_host(self): return self._pulsar_host
|
||||
def pulsar_host(self) -> str: return self._pulsar_host
|
||||
|
||||
# Register a new event handler for configuration change
|
||||
def register_config_handler(self, handler, types=None):
|
||||
def register_config_handler(self, handler: Callable[..., Any], types: list[type] | None = None) -> None:
|
||||
self.config_handlers.append({
|
||||
"handler": handler,
|
||||
"types": set(types) if types else None,
|
||||
|
|
@ -295,13 +299,13 @@ class AsyncProcessor:
|
|||
raise e
|
||||
|
||||
@classmethod
|
||||
def setup_logging(cls, args):
|
||||
def setup_logging(cls, args: dict[str, Any]) -> None:
|
||||
"""Configure logging for the entire application"""
|
||||
setup_logging(args)
|
||||
|
||||
# Startup fabric. launch calls launch_async in async mode.
|
||||
@classmethod
|
||||
def launch(cls, ident, doc):
|
||||
def launch(cls, ident: str, doc: str) -> None:
|
||||
|
||||
# Start assembling CLI arguments
|
||||
parser = argparse.ArgumentParser(
|
||||
|
|
@ -374,7 +378,7 @@ class AsyncProcessor:
|
|||
# The command-line arguments are built using a stack of add_args
|
||||
# invocations
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
add_pubsub_args(parser)
|
||||
add_logging_args(parser)
|
||||
|
|
|
|||
|
|
@ -1,16 +1,19 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from . metrics import ConsumerMetrics
|
||||
from . consumer import Consumer
|
||||
from . spec import Spec
|
||||
|
||||
class ConsumerSpec(Spec):
|
||||
def __init__(self, name, schema, handler, concurrency = 1):
|
||||
def __init__(self, name: str, schema: Any, handler: Any, concurrency: int = 1) -> None:
|
||||
self.name = name
|
||||
self.schema = schema
|
||||
self.handler = handler
|
||||
self.concurrency = concurrency
|
||||
|
||||
def add(self, flow, processor, definition):
|
||||
def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None:
|
||||
|
||||
consumer_metrics = ConsumerMetrics(
|
||||
processor = flow.id, flow = flow.name, name = self.name,
|
||||
|
|
|
|||
|
|
@ -1,9 +1,12 @@
|
|||
|
||||
"""
|
||||
Document embeddings query service. Input is vectors. Output is list of
|
||||
embeddings.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import logging
|
||||
|
||||
from .. schema import DocumentEmbeddingsRequest, DocumentEmbeddingsResponse
|
||||
|
|
@ -82,7 +85,7 @@ class DocumentEmbeddingsQueryService(FlowProcessor):
|
|||
await flow("response").send(r, properties={"id": id})
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
FlowProcessor.add_args(parser)
|
||||
|
||||
|
|
@ -93,7 +96,7 @@ class DocumentEmbeddingsQueryService(FlowProcessor):
|
|||
help=f'Number of concurrent requests (default: {default_concurrency})'
|
||||
)
|
||||
|
||||
def run():
|
||||
def run() -> None:
|
||||
|
||||
Processor.launch(default_ident, __doc__)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,11 @@
|
|||
|
||||
"""
|
||||
Document embeddings store base class
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import logging
|
||||
|
||||
from .. schema import DocumentEmbeddings
|
||||
|
|
@ -49,7 +52,7 @@ class DocumentEmbeddingsStoreService(FlowProcessor):
|
|||
raise e
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
FlowProcessor.add_args(parser)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
|
||||
"""
|
||||
Base class for dynamically pluggable tool services.
|
||||
|
||||
|
|
@ -11,6 +10,10 @@ Uses direct Pulsar topics (no flow configuration required):
|
|||
- Response: non-persistent://tg/response/{topic}
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import json
|
||||
import logging
|
||||
import asyncio
|
||||
|
|
@ -173,7 +176,7 @@ class DynamicToolService(AsyncProcessor):
|
|||
raise NotImplementedError("Subclasses must implement invoke()")
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
AsyncProcessor.add_args(parser)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,11 @@
|
|||
|
||||
"""
|
||||
Embeddings resolution base class
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import time
|
||||
import logging
|
||||
from prometheus_client import Histogram
|
||||
|
|
@ -100,7 +103,7 @@ class EmbeddingsService(FlowProcessor):
|
|||
)
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
parser.add_argument(
|
||||
'-c', '--concurrency',
|
||||
|
|
@ -112,4 +115,3 @@ class EmbeddingsService(FlowProcessor):
|
|||
FlowProcessor.add_args(parser)
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,13 @@
|
|||
import asyncio
|
||||
|
||||
class Flow:
|
||||
"""
|
||||
Runtime representation of a deployed flow process.
|
||||
|
||||
This class maintains internal processor states and orchestrates
|
||||
lifecycles (start, stop) for inputs (consumers) and parameters
|
||||
that drive data flowing across linked nodes.
|
||||
"""
|
||||
def __init__(self, id, flow, processor, defn):
|
||||
|
||||
self.id = id
|
||||
|
|
|
|||
|
|
@ -1,3 +1,7 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
from argparse import ArgumentParser
|
||||
|
||||
# Base class for processor with management of flows in & out which are managed
|
||||
# by configuration. This is probably all processor types, except for the
|
||||
|
|
@ -41,7 +45,7 @@ class FlowProcessor(AsyncProcessor):
|
|||
logger.info("Service initialised.")
|
||||
|
||||
# Register a configuration variable
|
||||
def register_specification(self, spec):
|
||||
def register_specification(self, spec: Any) -> None:
|
||||
self.specifications.append(spec)
|
||||
|
||||
# Start processing for a new flow
|
||||
|
|
@ -99,7 +103,7 @@ class FlowProcessor(AsyncProcessor):
|
|||
await super(FlowProcessor, self).start()
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
AsyncProcessor.add_args(parser)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,6 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import logging
|
||||
|
||||
|
|
@ -9,7 +12,7 @@ from .. knowledge import Uri, Literal
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def to_value(x):
|
||||
def to_value(x: Any) -> Any:
|
||||
"""Convert schema Term to Uri or Literal."""
|
||||
if x.type == IRI:
|
||||
return Uri(x.iri)
|
||||
|
|
|
|||
|
|
@ -1,9 +1,12 @@
|
|||
|
||||
"""
|
||||
Graph embeddings query service. Input is vectors. Output is list of
|
||||
embeddings.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import logging
|
||||
|
||||
from .. schema import GraphEmbeddingsRequest, GraphEmbeddingsResponse
|
||||
|
|
@ -82,7 +85,7 @@ class GraphEmbeddingsQueryService(FlowProcessor):
|
|||
await flow("response").send(r, properties={"id": id})
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
FlowProcessor.add_args(parser)
|
||||
|
||||
|
|
@ -93,7 +96,7 @@ class GraphEmbeddingsQueryService(FlowProcessor):
|
|||
help=f'Number of concurrent requests (default: {default_concurrency})'
|
||||
)
|
||||
|
||||
def run():
|
||||
def run() -> None:
|
||||
|
||||
Processor.launch(default_ident, __doc__)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,11 @@
|
|||
|
||||
"""
|
||||
Graph embeddings store base class
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import logging
|
||||
|
||||
from .. schema import GraphEmbeddings
|
||||
|
|
@ -49,7 +52,7 @@ class GraphEmbeddingsStoreService(FlowProcessor):
|
|||
raise e
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
FlowProcessor.add_args(parser)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,11 @@
|
|||
|
||||
"""
|
||||
LLM text completion base class
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import time
|
||||
import logging
|
||||
from prometheus_client import Histogram, Info
|
||||
|
|
@ -42,6 +45,12 @@ class LlmChunk:
|
|||
__slots__ = ["text", "in_token", "out_token", "model", "is_final"]
|
||||
|
||||
class LlmService(FlowProcessor):
|
||||
"""
|
||||
Extensible service processing requests to Large Language Models (LLMs).
|
||||
|
||||
This class handles the core logic of dispatching text completion or chat requests
|
||||
to integrated underlying LLM providers (e.g. OpenAI, vertex ai).
|
||||
"""
|
||||
|
||||
def __init__(self, **params):
|
||||
|
||||
|
|
@ -199,7 +208,7 @@ class LlmService(FlowProcessor):
|
|||
properties={"id": id}
|
||||
)
|
||||
|
||||
def supports_streaming(self):
|
||||
def supports_streaming(self) -> bool:
|
||||
"""
|
||||
Override in subclass to indicate streaming support.
|
||||
Returns False by default.
|
||||
|
|
@ -215,7 +224,7 @@ class LlmService(FlowProcessor):
|
|||
raise NotImplementedError("Streaming not implemented for this provider")
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
parser.add_argument(
|
||||
'-c', '--concurrency',
|
||||
|
|
|
|||
|
|
@ -11,7 +11,9 @@ Supports dual output to console and Loki for centralized log aggregation.
|
|||
import contextvars
|
||||
import logging
|
||||
import logging.handlers
|
||||
from argparse import ArgumentParser
|
||||
from queue import Queue
|
||||
from typing import Any
|
||||
import os
|
||||
|
||||
|
||||
|
|
@ -53,7 +55,7 @@ class _ProcessorIdFilter(logging.Filter):
|
|||
return True
|
||||
|
||||
|
||||
def add_logging_args(parser):
|
||||
def add_logging_args(parser: ArgumentParser) -> None:
|
||||
"""
|
||||
Add standard logging arguments to an argument parser.
|
||||
|
||||
|
|
@ -100,7 +102,7 @@ def add_logging_args(parser):
|
|||
)
|
||||
|
||||
|
||||
def setup_logging(args):
|
||||
def setup_logging(args: dict[str, Any]) -> None:
|
||||
"""
|
||||
Configure logging from parsed command-line arguments.
|
||||
|
||||
|
|
|
|||
|
|
@ -1,10 +1,19 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from prometheus_client import start_http_server, Info, Enum, Histogram
|
||||
from prometheus_client import Counter
|
||||
|
||||
class ConsumerMetrics:
|
||||
"""
|
||||
Metrics tracking and reporting for flow consumers.
|
||||
|
||||
This class manages prometheus metrics specifically related to consumers
|
||||
within the flow, including state, requests, processing time, and queues.
|
||||
"""
|
||||
|
||||
def __init__(self, processor, flow, name):
|
||||
def __init__(self, processor: str, flow: str, name: str) -> None:
|
||||
|
||||
self.processor = processor
|
||||
self.flow = flow
|
||||
|
|
@ -35,30 +44,30 @@ class ConsumerMetrics:
|
|||
["processor", "flow", "name"],
|
||||
)
|
||||
|
||||
def process(self, status):
|
||||
def process(self, status: str) -> None:
|
||||
__class__.processing_metric.labels(
|
||||
processor = self.processor, flow = self.flow, name = self.name,
|
||||
status=status
|
||||
).inc()
|
||||
|
||||
def rate_limit(self):
|
||||
def rate_limit(self) -> None:
|
||||
__class__.rate_limit_metric.labels(
|
||||
processor = self.processor, flow = self.flow, name = self.name,
|
||||
).inc()
|
||||
|
||||
def state(self, state):
|
||||
def state(self, state: str) -> None:
|
||||
__class__.state_metric.labels(
|
||||
processor = self.processor, flow = self.flow, name = self.name,
|
||||
).state(state)
|
||||
|
||||
def record_time(self):
|
||||
def record_time(self) -> Any:
|
||||
return __class__.request_metric.labels(
|
||||
processor = self.processor, flow = self.flow, name = self.name,
|
||||
).time()
|
||||
|
||||
class ProducerMetrics:
|
||||
|
||||
def __init__(self, processor, flow, name):
|
||||
def __init__(self, processor: str, flow: str, name: str) -> None:
|
||||
|
||||
self.processor = processor
|
||||
self.flow = flow
|
||||
|
|
@ -70,13 +79,13 @@ class ProducerMetrics:
|
|||
["processor", "flow", "name"],
|
||||
)
|
||||
|
||||
def inc(self):
|
||||
def inc(self) -> None:
|
||||
__class__.producer_metric.labels(
|
||||
processor = self.processor, flow = self.flow, name = self.name
|
||||
).inc()
|
||||
|
||||
class ProcessorMetrics:
|
||||
def __init__(self, processor):
|
||||
def __init__(self, processor: str) -> None:
|
||||
|
||||
self.processor = processor
|
||||
|
||||
|
|
@ -86,14 +95,14 @@ class ProcessorMetrics:
|
|||
["processor"]
|
||||
)
|
||||
|
||||
def info(self, info):
|
||||
def info(self, info: dict[str, str]) -> None:
|
||||
__class__.processor_metric.labels(
|
||||
processor = self.processor
|
||||
).info(info)
|
||||
|
||||
class SubscriberMetrics:
|
||||
|
||||
def __init__(self, processor, flow, name):
|
||||
def __init__(self, processor: str, flow: str, name: str) -> None:
|
||||
|
||||
self.processor = processor
|
||||
self.flow = flow
|
||||
|
|
@ -118,19 +127,18 @@ class SubscriberMetrics:
|
|||
["processor", "flow", "name"],
|
||||
)
|
||||
|
||||
def received(self):
|
||||
def received(self) -> None:
|
||||
__class__.received_metric.labels(
|
||||
processor = self.processor, flow = self.flow, name = self.name,
|
||||
).inc()
|
||||
|
||||
def state(self, state):
|
||||
def state(self, state: str) -> None:
|
||||
|
||||
__class__.state_metric.labels(
|
||||
processor = self.processor, flow = self.flow, name = self.name,
|
||||
).state(state)
|
||||
|
||||
def dropped(self, state):
|
||||
def dropped(self, state: str) -> None:
|
||||
__class__.dropped_metric.labels(
|
||||
processor = self.processor, flow = self.flow, name = self.name,
|
||||
).inc()
|
||||
|
||||
|
|
|
|||
|
|
@ -1,21 +1,23 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from . spec import Spec
|
||||
|
||||
class Parameter:
|
||||
def __init__(self, value):
|
||||
def __init__(self, value: Any) -> None:
|
||||
self.value = value
|
||||
async def start(self):
|
||||
async def start(self) -> None:
|
||||
pass
|
||||
async def stop(self):
|
||||
async def stop(self) -> None:
|
||||
pass
|
||||
|
||||
class ParameterSpec(Spec):
|
||||
def __init__(self, name):
|
||||
def __init__(self, name: str) -> None:
|
||||
self.name = name
|
||||
|
||||
def add(self, flow, processor, definition):
|
||||
def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None:
|
||||
|
||||
value = definition.get(self.name, None)
|
||||
|
||||
flow.parameter[self.name] = Parameter(value)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,14 +1,17 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from . producer import Producer
|
||||
from . metrics import ProducerMetrics
|
||||
from . spec import Spec
|
||||
|
||||
class ProducerSpec(Spec):
|
||||
def __init__(self, name, schema):
|
||||
def __init__(self, name: str, schema: Any) -> None:
|
||||
self.name = name
|
||||
self.schema = schema
|
||||
|
||||
def add(self, flow, processor, definition):
|
||||
def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None:
|
||||
|
||||
producer_metrics = ProducerMetrics(
|
||||
processor = flow.id, flow = flow.name, name = self.name
|
||||
|
|
@ -22,4 +25,3 @@ class ProducerSpec(Spec):
|
|||
)
|
||||
|
||||
flow.producer[self.name] = producer
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
|
||||
import json
|
||||
import asyncio
|
||||
import inspect
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, Any
|
||||
|
||||
|
|
@ -80,7 +81,7 @@ class PromptClient(RequestResponse):
|
|||
|
||||
if resp.text is not None:
|
||||
if chunk_callback:
|
||||
if asyncio.iscoroutinefunction(chunk_callback):
|
||||
if inspect.iscoroutinefunction(chunk_callback):
|
||||
await chunk_callback(resp.text, end_stream)
|
||||
else:
|
||||
chunk_callback(resp.text, end_stream)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,9 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import logging
|
||||
from argparse import ArgumentParser
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -15,7 +18,7 @@ DEFAULT_RABBITMQ_PASSWORD = os.getenv("RABBITMQ_PASSWORD", 'guest')
|
|||
DEFAULT_RABBITMQ_VHOST = os.getenv("RABBITMQ_VHOST", '/')
|
||||
|
||||
|
||||
def get_pubsub(**config):
|
||||
def get_pubsub(**config: Any) -> Any:
|
||||
"""
|
||||
Factory function to create a pub/sub backend based on configuration.
|
||||
|
||||
|
|
@ -51,7 +54,7 @@ def get_pubsub(**config):
|
|||
STANDALONE_PULSAR_HOST = 'pulsar://localhost:6650'
|
||||
|
||||
|
||||
def add_pubsub_args(parser, standalone=False):
|
||||
def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None:
|
||||
"""Add pub/sub CLI arguments to an argument parser.
|
||||
|
||||
Args:
|
||||
|
|
|
|||
|
|
@ -1,7 +1,9 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from . subscriber import Subscriber
|
||||
from . producer import Producer
|
||||
|
|
@ -115,7 +117,7 @@ class RequestResponseSpec(Spec):
|
|||
self.response_schema = response_schema
|
||||
self.impl = impl
|
||||
|
||||
def add(self, flow, processor, definition):
|
||||
def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None:
|
||||
|
||||
request_metrics = ProducerMetrics(
|
||||
processor = flow.id, flow = flow.name, name = self.request_name
|
||||
|
|
|
|||
|
|
@ -1,3 +1,6 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from . metrics import SubscriberMetrics
|
||||
from . subscriber import Subscriber
|
||||
|
|
@ -5,11 +8,11 @@ from . spec import Spec
|
|||
|
||||
class SubscriberSpec(Spec):
|
||||
|
||||
def __init__(self, name, schema):
|
||||
def __init__(self, name: str, schema: Any) -> None:
|
||||
self.name = name
|
||||
self.schema = schema
|
||||
|
||||
def add(self, flow, processor, definition):
|
||||
def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None:
|
||||
|
||||
subscriber_metrics = SubscriberMetrics(
|
||||
processor = flow.id, flow = flow.name, name = self.name
|
||||
|
|
@ -27,4 +30,3 @@ class SubscriberSpec(Spec):
|
|||
# Put it in the consumer map, does that work?
|
||||
# It means it gets start/stop call.
|
||||
flow.consumer[self.name] = subscriber
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,13 @@ from . request_response_spec import RequestResponse, RequestResponseSpec
|
|||
from .. schema import ToolRequest, ToolResponse
|
||||
|
||||
class ToolClient(RequestResponse):
|
||||
"""
|
||||
Client for invoking tools over the flow messaging fabric.
|
||||
|
||||
This class provides an interface to abstract away the messaging mechanics
|
||||
and provides a direct awaitable mechanism for invoking tools and
|
||||
getting their responses.
|
||||
"""
|
||||
|
||||
async def invoke(self, name, parameters={}, timeout=600):
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,11 @@
|
|||
|
||||
"""
|
||||
Tool invocation base class
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import json
|
||||
import logging
|
||||
from prometheus_client import Counter
|
||||
|
|
@ -112,7 +115,7 @@ class ToolService(FlowProcessor):
|
|||
)
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
parser.add_argument(
|
||||
'-c', '--concurrency',
|
||||
|
|
|
|||
|
|
@ -1,3 +1,6 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from . request_response_spec import RequestResponse, RequestResponseSpec
|
||||
from .. schema import TriplesQueryRequest, TriplesQueryResponse, Term, IRI, LITERAL, TRIPLE
|
||||
|
|
@ -11,7 +14,7 @@ class Triple:
|
|||
self.o = o
|
||||
|
||||
|
||||
def to_value(x):
|
||||
def to_value(x: Any) -> Any:
|
||||
"""Convert schema Term to Uri or Literal."""
|
||||
if x.type == IRI:
|
||||
return Uri(x.iri)
|
||||
|
|
@ -21,7 +24,7 @@ def to_value(x):
|
|||
return Literal(x.value or x.iri)
|
||||
|
||||
|
||||
def from_value(x):
|
||||
def from_value(x: Any) -> Any:
|
||||
"""Convert Uri, Literal, string, or Term to schema Term."""
|
||||
if x is None:
|
||||
return None
|
||||
|
|
|
|||
|
|
@ -1,9 +1,12 @@
|
|||
|
||||
"""
|
||||
Triples query service. Input is a (s, p, o) triple, some values may be
|
||||
null. Output is a list of triples.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import logging
|
||||
|
||||
from .. schema import TriplesQueryRequest, TriplesQueryResponse, Error
|
||||
|
|
@ -108,7 +111,7 @@ class TriplesQueryService(FlowProcessor):
|
|||
yield [], True
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
FlowProcessor.add_args(parser)
|
||||
|
||||
|
|
@ -119,7 +122,7 @@ class TriplesQueryService(FlowProcessor):
|
|||
help=f'Number of concurrent requests (default: {default_concurrency})'
|
||||
)
|
||||
|
||||
def run():
|
||||
def run() -> None:
|
||||
|
||||
Processor.launch(default_ident, __doc__)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,11 @@
|
|||
|
||||
"""
|
||||
Triples store base class
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import logging
|
||||
|
||||
from .. schema import Triples
|
||||
|
|
@ -15,6 +18,12 @@ logger = logging.getLogger(__name__)
|
|||
default_ident = "triples-write"
|
||||
|
||||
class TriplesStoreService(FlowProcessor):
|
||||
"""
|
||||
Component for maintaining the triples store.
|
||||
|
||||
This service acts as a processor in the flow that receives knowledge triples
|
||||
and writes them persistently into an overarching graph database or equivalent backend.
|
||||
"""
|
||||
|
||||
def __init__(self, **params):
|
||||
|
||||
|
|
@ -47,7 +56,7 @@ class TriplesStoreService(FlowProcessor):
|
|||
raise e
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
FlowProcessor.add_args(parser)
|
||||
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ Agent provenance tracks the reasoning trace of agent sessions:
|
|||
"""
|
||||
|
||||
import json
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
from typing import List, Optional, Dict, Any
|
||||
|
||||
from .. schema import Triple, Term, IRI, LITERAL
|
||||
|
|
@ -87,7 +87,7 @@ def agent_session_triples(
|
|||
List of Triple objects
|
||||
"""
|
||||
if timestamp is None:
|
||||
timestamp = datetime.utcnow().isoformat() + "Z"
|
||||
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
|
||||
triples = [
|
||||
_triple(session_uri, RDF_TYPE, _iri(PROV_ENTITY)),
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
Helper functions to build PROV-O triples for extraction-time provenance.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
from typing import List, Optional
|
||||
|
||||
from .. schema import Triple, Term, IRI, LITERAL, TRIPLE
|
||||
|
|
@ -192,7 +192,7 @@ def derived_entity_triples(
|
|||
List of Triple objects
|
||||
"""
|
||||
if timestamp is None:
|
||||
timestamp = datetime.utcnow().isoformat() + "Z"
|
||||
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
|
||||
act_uri = activity_uri()
|
||||
agt_uri = agent_uri(component_name)
|
||||
|
|
@ -309,7 +309,7 @@ def subgraph_provenance_triples(
|
|||
List of Triple objects for the provenance
|
||||
"""
|
||||
if timestamp is None:
|
||||
timestamp = datetime.utcnow().isoformat() + "Z"
|
||||
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
|
||||
act_uri = activity_uri()
|
||||
agt_uri = agent_uri(component_name)
|
||||
|
|
@ -386,7 +386,7 @@ def question_triples(
|
|||
List of Triple objects
|
||||
"""
|
||||
if timestamp is None:
|
||||
timestamp = datetime.utcnow().isoformat() + "Z"
|
||||
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
|
||||
triples = [
|
||||
_triple(question_uri, RDF_TYPE, _iri(PROV_ENTITY)),
|
||||
|
|
@ -640,7 +640,7 @@ def docrag_question_triples(
|
|||
List of Triple objects
|
||||
"""
|
||||
if timestamp is None:
|
||||
timestamp = datetime.utcnow().isoformat() + "Z"
|
||||
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
|
||||
triples = [
|
||||
_triple(question_uri, RDF_TYPE, _iri(PROV_ENTITY)),
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ librarian integration.
|
|||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from ... schema import AgentRequest, AgentResponse, AgentStep, Error
|
||||
from ... schema import Triples, Metadata
|
||||
|
|
@ -253,7 +253,7 @@ class PatternBase:
|
|||
collection, respond, streaming,
|
||||
parent_uri=None):
|
||||
"""Emit provenance triples for a new session."""
|
||||
timestamp = datetime.utcnow().isoformat() + "Z"
|
||||
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
triples = set_graph(
|
||||
agent_session_triples(
|
||||
session_uri, question, timestamp,
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import sys
|
|||
import functools
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
|
||||
# Module logger
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -452,7 +452,7 @@ class Processor(AgentService):
|
|||
|
||||
# On first iteration, emit session triples
|
||||
if iteration_num == 1:
|
||||
timestamp = datetime.utcnow().isoformat() + "Z"
|
||||
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
triples = set_graph(
|
||||
agent_session_triples(session_uri, request.question, timestamp),
|
||||
GRAPH_RETRIEVAL
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ Provides comprehensive error handling, retry logic, and graceful degradation.
|
|||
import logging
|
||||
import time
|
||||
import asyncio
|
||||
import inspect
|
||||
from typing import Dict, Any, List, Optional, Callable, Union, Type
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
|
|
@ -244,7 +245,7 @@ class ErrorRecoveryStrategy:
|
|||
await asyncio.sleep(delay)
|
||||
|
||||
try:
|
||||
if asyncio.iscoroutinefunction(operation):
|
||||
if inspect.iscoroutinefunction(operation):
|
||||
return await operation(*args, **kwargs)
|
||||
else:
|
||||
return operation(*args, **kwargs)
|
||||
|
|
@ -260,7 +261,7 @@ class ErrorRecoveryStrategy:
|
|||
if fallback_func:
|
||||
logger.info(f"Executing fallback for {context.category.value}")
|
||||
try:
|
||||
if asyncio.iscoroutinefunction(fallback_func):
|
||||
if inspect.iscoroutinefunction(fallback_func):
|
||||
return await fallback_func(context, *args, **kwargs)
|
||||
else:
|
||||
return fallback_func(context, *args, **kwargs)
|
||||
|
|
@ -420,7 +421,7 @@ def with_error_handling(category: ErrorCategory,
|
|||
@wraps(func)
|
||||
async def async_wrapper(*args, **kwargs):
|
||||
try:
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
if inspect.iscoroutinefunction(func):
|
||||
return await func(*args, **kwargs)
|
||||
else:
|
||||
return func(*args, **kwargs)
|
||||
|
|
@ -469,7 +470,7 @@ def with_error_handling(category: ErrorCategory,
|
|||
cause=e
|
||||
)
|
||||
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
if inspect.iscoroutinefunction(func):
|
||||
return async_wrapper
|
||||
else:
|
||||
return sync_wrapper
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ Provides comprehensive monitoring of system performance, query patterns, and res
|
|||
import logging
|
||||
import time
|
||||
import asyncio
|
||||
import inspect
|
||||
import threading
|
||||
from typing import Dict, Any, List, Optional, Callable
|
||||
from dataclasses import dataclass, field
|
||||
|
|
@ -579,7 +580,7 @@ def monitor_performance(component: str,
|
|||
|
||||
async def async_wrapper(*args, **kwargs):
|
||||
if not monitor or not monitor.monitoring_enabled:
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
if inspect.iscoroutinefunction(func):
|
||||
return await func(*args, **kwargs)
|
||||
else:
|
||||
return func(*args, **kwargs)
|
||||
|
|
@ -591,7 +592,7 @@ def monitor_performance(component: str,
|
|||
|
||||
success = True
|
||||
try:
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
if inspect.iscoroutinefunction(func):
|
||||
result = await func(*args, **kwargs)
|
||||
else:
|
||||
result = func(*args, **kwargs)
|
||||
|
|
@ -603,7 +604,7 @@ def monitor_performance(component: str,
|
|||
duration = monitor.metrics_collector.stop_timer(timer)
|
||||
monitor.record_request(component, operation, duration, success)
|
||||
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
if inspect.iscoroutinefunction(func):
|
||||
return async_wrapper
|
||||
else:
|
||||
return wrapper
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
|
||||
# Provenance imports
|
||||
from trustgraph.provenance import (
|
||||
|
|
@ -199,7 +199,7 @@ class DocumentRag:
|
|||
exp_uri = docrag_exploration_uri(session_id)
|
||||
syn_uri = docrag_synthesis_uri(session_id)
|
||||
|
||||
timestamp = datetime.utcnow().isoformat() + "Z"
|
||||
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
|
||||
# Emit question explainability immediately
|
||||
if explain_callback:
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import math
|
|||
import time
|
||||
import uuid
|
||||
from collections import OrderedDict
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from ... schema import Term, Triple as SchemaTriple, IRI, LITERAL, TRIPLE
|
||||
from ... knowledge import Uri, Literal
|
||||
|
|
@ -643,7 +643,7 @@ class GraphRag:
|
|||
foc_uri = make_focus_uri(session_id)
|
||||
syn_uri = make_synthesis_uri(session_id)
|
||||
|
||||
timestamp = datetime.utcnow().isoformat() + "Z"
|
||||
timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
|
||||
# Emit question explainability immediately
|
||||
if explain_callback:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue