From e89a5b5d2301ad9c56de70d5271d45b6e02e5e33 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Wed, 13 Aug 2025 16:07:58 +0100 Subject: [PATCH] Knowledge load utility CLI (#456) * Knowledge loader * More tests --- tests/unit/test_cli/__init__.py | 3 + tests/unit/test_cli/conftest.py | 48 ++ tests/unit/test_cli/test_load_knowledge.py | 479 ++++++++++++++++++ trustgraph-cli/pyproject.toml | 1 + .../trustgraph/cli/load_knowledge.py | 202 ++++++++ 5 files changed, 733 insertions(+) create mode 100644 tests/unit/test_cli/__init__.py create mode 100644 tests/unit/test_cli/conftest.py create mode 100644 tests/unit/test_cli/test_load_knowledge.py create mode 100644 trustgraph-cli/trustgraph/cli/load_knowledge.py diff --git a/tests/unit/test_cli/__init__.py b/tests/unit/test_cli/__init__.py new file mode 100644 index 00000000..cd3d007b --- /dev/null +++ b/tests/unit/test_cli/__init__.py @@ -0,0 +1,3 @@ +""" +Unit tests for CLI modules. +""" \ No newline at end of file diff --git a/tests/unit/test_cli/conftest.py b/tests/unit/test_cli/conftest.py new file mode 100644 index 00000000..b085345f --- /dev/null +++ b/tests/unit/test_cli/conftest.py @@ -0,0 +1,48 @@ +""" +Shared fixtures for CLI unit tests. 
@pytest.fixture
def sample_turtle_content():
    """Sample Turtle RDF content for testing.

    Both prefixes are bound to explicit namespace IRIs; a ``@prefix``
    directive without an IRI is not valid Turtle and fails to parse.
    """
    return """
@prefix ex: <http://example.org/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .

ex:john foaf:name "John Smith" ;
    foaf:age "30" ;
    foaf:knows ex:mary .

ex:mary foaf:name "Mary Johnson" ;
    foaf:email "mary@example.com" .
"""


@pytest.fixture
def temp_turtle_file(sample_turtle_content):
    """Yield the path of a temporary ``.ttl`` file holding the sample content.

    The file is written and closed before the path is handed to the test,
    and removed again during fixture teardown.
    """
    with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f:
        f.write(sample_turtle_content)

    try:
        yield f.name
    finally:
        # Cleanup
        Path(f.name).unlink(missing_ok=True)


@pytest.fixture
def mock_websocket():
    """Mock WebSocket connection.

    ``send``, ``recv`` and ``close`` are ``Mock`` objects whose side effect
    is a coroutine function, so calls are recorded and remain awaitable.
    """
    mock_ws = MagicMock()

    async def async_send(data):
        return None

    async def async_recv():
        return ""

    async def async_close():
        return None

    mock_ws.send = Mock(side_effect=async_send)
    mock_ws.recv = Mock(side_effect=async_recv)
    mock_ws.close = Mock(side_effect=async_close)
    return mock_ws


@pytest.fixture
def knowledge_loader():
    """Create a KnowledgeLoader instance with test parameters."""
    return KnowledgeLoader(
        files=["test.ttl"],
        flow="test-flow",
        user="test-user",
        collection="test-collection",
        document_id="test-doc-123",
        url="ws://test.example.com/",
    )
"ws://example.com/api/v1/flow/my-flow/import/triples" + + @pytest.mark.asyncio + async def test_load_triples_sends_correct_messages(self, temp_turtle_file, mock_websocket): + """Test that triple loading sends correctly formatted messages.""" + loader = KnowledgeLoader( + files=[temp_turtle_file], + flow="test-flow", + user="test-user", + collection="test-collection", + document_id="test-doc" + ) + + await loader.load_triples(temp_turtle_file, mock_websocket) + + # Verify WebSocket send was called + assert mock_websocket.send.call_count > 0 + + # Check message format for one of the calls + sent_messages = [json.loads(call.args[0]) for call in mock_websocket.send.call_args_list] + + # Verify message structure + sample_message = sent_messages[0] + assert "metadata" in sample_message + assert "triples" in sample_message + + metadata = sample_message["metadata"] + assert metadata["id"] == "test-doc" + assert metadata["user"] == "test-user" + assert metadata["collection"] == "test-collection" + assert isinstance(metadata["metadata"], list) + + triple = sample_message["triples"][0] + assert "s" in triple + assert "p" in triple + assert "o" in triple + + # Check Value structure + assert "v" in triple["s"] + assert "e" in triple["s"] + assert triple["s"]["e"] is True # Subject should be URI + + @pytest.mark.asyncio + async def test_load_entity_contexts_processes_literals_only(self, temp_turtle_file, mock_websocket): + """Test that entity contexts are created only for literals.""" + loader = KnowledgeLoader( + files=[temp_turtle_file], + flow="test-flow", + user="test-user", + collection="test-collection", + document_id="test-doc" + ) + + await loader.load_entity_contexts(temp_turtle_file, mock_websocket) + + # Get all sent messages + sent_messages = [json.loads(call.args[0]) for call in mock_websocket.send.call_args_list] + + # Verify we got entity context messages + assert len(sent_messages) > 0 + + for message in sent_messages: + assert "metadata" in message + assert 
"entities" in message + + metadata = message["metadata"] + assert metadata["id"] == "test-doc" + assert metadata["user"] == "test-user" + assert metadata["collection"] == "test-collection" + + entity_context = message["entities"][0] + assert "entity" in entity_context + assert "context" in entity_context + + entity = entity_context["entity"] + assert "v" in entity + assert "e" in entity + assert entity["e"] is True # Entity should be URI (subject) + + # Context should be a string (the literal value) + assert isinstance(entity_context["context"], str) + + @pytest.mark.asyncio + async def test_load_entity_contexts_skips_uri_objects(self, mock_websocket): + """Test that URI objects don't generate entity contexts.""" + # Create turtle with only URI objects (no literals) + turtle_content = """ +@prefix ex: . +ex:john ex:knows ex:mary . +ex:mary ex:knows ex:bob . +""" + + with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f: + f.write(turtle_content) + f.flush() + + loader = KnowledgeLoader( + files=[f.name], + flow="test-flow", + user="test-user", + collection="test-collection", + document_id="test-doc" + ) + + await loader.load_entity_contexts(f.name, mock_websocket) + + Path(f.name).unlink(missing_ok=True) + + # Should not send any messages since there are no literals + mock_websocket.send.assert_not_called() + + @pytest.mark.asyncio + @patch('trustgraph.cli.load_knowledge.connect') + async def test_run_calls_both_loaders(self, mock_connect, knowledge_loader, temp_turtle_file): + """Test that run() calls both triple and entity context loaders.""" + knowledge_loader.files = [temp_turtle_file] + + # Create a simple mock websocket + mock_ws = MagicMock() + async def mock_send(data): + pass + mock_ws.send = mock_send + + # Create async context manager mock + async def mock_aenter(self): + return mock_ws + + async def mock_aexit(self, exc_type, exc_val, exc_tb): + return None + + mock_connection = MagicMock() + mock_connection.__aenter__ = 
mock_aenter + mock_connection.__aexit__ = mock_aexit + mock_connect.return_value = mock_connection + + # Create AsyncMock objects that can track calls properly + mock_load_triples = AsyncMock(return_value=None) + mock_load_contexts = AsyncMock(return_value=None) + + with patch.object(knowledge_loader, 'load_triples', mock_load_triples), \ + patch.object(knowledge_loader, 'load_entity_contexts', mock_load_contexts): + + await knowledge_loader.run() + + # Verify both methods were called + mock_load_triples.assert_called_once_with(temp_turtle_file, mock_ws) + mock_load_contexts.assert_called_once_with(temp_turtle_file, mock_ws) + + # Verify WebSocket connections were made to both URLs + assert mock_connect.call_count == 2 + + +class TestCLIArgumentParsing: + """Test CLI argument parsing and main function.""" + + @patch('trustgraph.cli.load_knowledge.KnowledgeLoader') + @patch('trustgraph.cli.load_knowledge.asyncio.run') + def test_main_parses_args_correctly(self, mock_asyncio_run, mock_loader_class): + """Test that main() parses arguments correctly.""" + mock_loader_instance = MagicMock() + mock_loader_class.return_value = mock_loader_instance + + test_args = [ + 'tg-load-knowledge', + '-i', 'doc-123', + '-f', 'my-flow', + '-U', 'my-user', + '-C', 'my-collection', + '-u', 'ws://custom.example.com/', + 'file1.ttl', + 'file2.ttl' + ] + + with patch('sys.argv', test_args): + main() + + # Verify KnowledgeLoader was instantiated with correct args + mock_loader_class.assert_called_once_with( + document_id='doc-123', + url='ws://custom.example.com/', + flow='my-flow', + files=['file1.ttl', 'file2.ttl'], + user='my-user', + collection='my-collection' + ) + + # Verify asyncio.run was called once + mock_asyncio_run.assert_called_once() + + @patch('trustgraph.cli.load_knowledge.KnowledgeLoader') + @patch('trustgraph.cli.load_knowledge.asyncio.run') + def test_main_uses_defaults(self, mock_asyncio_run, mock_loader_class): + """Test that main() uses default values when not 
specified.""" + mock_loader_instance = MagicMock() + mock_loader_class.return_value = mock_loader_instance + + test_args = [ + 'tg-load-knowledge', + '-i', 'doc-123', + 'file1.ttl' + ] + + with patch('sys.argv', test_args): + main() + + # Verify defaults were used + call_args = mock_loader_class.call_args[1] + assert call_args['flow'] == 'default' + assert call_args['user'] == 'trustgraph' + assert call_args['collection'] == 'default' + assert call_args['url'] == 'ws://localhost:8088/' + + +class TestErrorHandling: + """Test error handling scenarios.""" + + @pytest.mark.asyncio + async def test_load_triples_handles_invalid_turtle(self, mock_websocket): + """Test handling of invalid Turtle content.""" + # Create file with invalid Turtle content + with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f: + f.write("Invalid Turtle Content {{{") + f.flush() + + loader = KnowledgeLoader( + files=[f.name], + flow="test-flow", + user="test-user", + collection="test-collection", + document_id="test-doc" + ) + + # Should raise an exception for invalid Turtle + with pytest.raises(Exception): + await loader.load_triples(f.name, mock_websocket) + + Path(f.name).unlink(missing_ok=True) + + @pytest.mark.asyncio + async def test_load_entity_contexts_handles_invalid_turtle(self, mock_websocket): + """Test handling of invalid Turtle content in entity contexts.""" + # Create file with invalid Turtle content + with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f: + f.write("Invalid Turtle Content {{{") + f.flush() + + loader = KnowledgeLoader( + files=[f.name], + flow="test-flow", + user="test-user", + collection="test-collection", + document_id="test-doc" + ) + + # Should raise an exception for invalid Turtle + with pytest.raises(Exception): + await loader.load_entity_contexts(f.name, mock_websocket) + + Path(f.name).unlink(missing_ok=True) + + @pytest.mark.asyncio + @patch('trustgraph.cli.load_knowledge.connect') + 
@patch('builtins.print') # Mock print to avoid output during tests + async def test_run_handles_connection_errors(self, mock_print, mock_connect, knowledge_loader, temp_turtle_file): + """Test handling of WebSocket connection errors.""" + knowledge_loader.files = [temp_turtle_file] + + # Mock connection failure + mock_connect.side_effect = ConnectionError("Failed to connect") + + # Should not raise exception, just print error + await knowledge_loader.run() + + @patch('trustgraph.cli.load_knowledge.KnowledgeLoader') + @patch('trustgraph.cli.load_knowledge.asyncio.run') + @patch('trustgraph.cli.load_knowledge.time.sleep') + @patch('builtins.print') # Mock print to avoid output during tests + def test_main_retries_on_exception(self, mock_print, mock_sleep, mock_asyncio_run, mock_loader_class): + """Test that main() retries on exceptions.""" + mock_loader_instance = MagicMock() + mock_loader_class.return_value = mock_loader_instance + + # First call raises exception, second succeeds + mock_asyncio_run.side_effect = [Exception("Test error"), None] + + test_args = [ + 'tg-load-knowledge', + '-i', 'doc-123', + 'file1.ttl' + ] + + with patch('sys.argv', test_args): + main() + + # Should have been called twice (first failed, second succeeded) + assert mock_asyncio_run.call_count == 2 + mock_sleep.assert_called_once_with(10) + + +class TestDataValidation: + """Test data validation and edge cases.""" + + @pytest.mark.asyncio + async def test_empty_turtle_file(self, mock_websocket): + """Test handling of empty Turtle files.""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f: + f.write("") # Empty file + f.flush() + + loader = KnowledgeLoader( + files=[f.name], + flow="test-flow", + user="test-user", + collection="test-collection", + document_id="test-doc" + ) + + await loader.load_triples(f.name, mock_websocket) + await loader.load_entity_contexts(f.name, mock_websocket) + + # Should not send any messages for empty file + 
mock_websocket.send.assert_not_called() + + Path(f.name).unlink(missing_ok=True) + + @pytest.mark.asyncio + async def test_turtle_with_mixed_literals_and_uris(self, mock_websocket): + """Test handling of Turtle with mixed literal and URI objects.""" + turtle_content = """ +@prefix ex: . +ex:john ex:name "John Smith" ; + ex:age "25" ; + ex:knows ex:mary ; + ex:city "New York" . +ex:mary ex:name "Mary Johnson" . +""" + + with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f: + f.write(turtle_content) + f.flush() + + loader = KnowledgeLoader( + files=[f.name], + flow="test-flow", + user="test-user", + collection="test-collection", + document_id="test-doc" + ) + + await loader.load_entity_contexts(f.name, mock_websocket) + + sent_messages = [json.loads(call.args[0]) for call in mock_websocket.send.call_args_list] + + # Should have 4 entity contexts (for the 4 literals: "John Smith", "25", "New York", "Mary Johnson") + # URI ex:mary should be skipped + assert len(sent_messages) == 4 + + # Verify all contexts are for literals (subjects should be URIs) + contexts = [] + for message in sent_messages: + entity_context = message["entities"][0] + assert entity_context["entity"]["e"] is True # Subject is URI + contexts.append(entity_context["context"]) + + assert "John Smith" in contexts + assert "25" in contexts + assert "New York" in contexts + assert "Mary Johnson" in contexts + + Path(f.name).unlink(missing_ok=True) \ No newline at end of file diff --git a/trustgraph-cli/pyproject.toml b/trustgraph-cli/pyproject.toml index 6d11ed3e..02b8d958 100644 --- a/trustgraph-cli/pyproject.toml +++ b/trustgraph-cli/pyproject.toml @@ -50,6 +50,7 @@ tg-load-pdf = "trustgraph.cli.load_pdf:main" tg-load-sample-documents = "trustgraph.cli.load_sample_documents:main" tg-load-text = "trustgraph.cli.load_text:main" tg-load-turtle = "trustgraph.cli.load_turtle:main" +tg-load-knowledge = "trustgraph.cli.load_knowledge:main" tg-put-flow-class = 
default_url = os.getenv("TRUSTGRAPH_URL", 'ws://localhost:8088/')
default_user = 'trustgraph'
default_collection = 'default'


class KnowledgeLoader:
    """Send the contents of Turtle files to a flow's import endpoints.

    Triples go to ``.../import/triples`` and entity contexts go to
    ``.../import/entity-contexts``; both URLs are derived from the base
    URL and the flow ID given to the constructor.
    """

    def __init__(
            self,
            files,
            flow,
            user,
            collection,
            document_id,
            url = default_url,
    ):
        """Store the load parameters and build the two endpoint URLs.

        Args:
            files: Paths of the Turtle files to load.
            flow: Flow ID used in the endpoint path.
            user: User ID placed in each message's metadata.
            collection: Collection ID placed in each message's metadata.
            document_id: Document ID placed in each message's metadata.
            url: Base URL; a trailing slash is added when missing.
        """

        if not url.endswith("/"):
            url += "/"

        self.triples_url = url + f"api/v1/flow/{flow}/import/triples"
        self.entity_contexts_url = url + f"api/v1/flow/{flow}/import/entity-contexts"

        self.files = files
        self.user = user
        self.collection = collection
        self.document_id = document_id

    def _metadata(self):
        """Return the metadata object attached to every outgoing message."""
        return {
            "id": self.document_id,
            "metadata": [],
            "user": self.user,
            "collection": self.collection,
        }

    @staticmethod
    def _parse_turtle(file):
        """Parse ``file`` as Turtle and return the resulting rdflib graph."""
        g = rdflib.Graph()
        g.parse(file, format="turtle")
        return g

    async def run(self):
        """Load every file's triples, then every file's entity contexts.

        One connection is opened per endpoint.  Any exception is printed
        rather than propagated.

        Returns:
            True when both phases completed, False when an exception was
            caught, so the caller can tell success from failure.
        """

        try:
            # Load triples first
            async with connect(self.triples_url) as ws:
                for file in self.files:
                    await self.load_triples(file, ws)

            # Then load entity contexts
            async with connect(self.entity_contexts_url) as ws:
                for file in self.files:
                    await self.load_entity_contexts(file, ws)

        except Exception as e:
            print(e, flush=True)
            return False

        return True

    async def load_triples(self, file, ws):
        """Parse ``file`` and send one triples message per statement.

        Each term is encoded as ``{"v": <string>, "e": <is-URI flag>}``.
        Subject and predicate are always flagged as URIs; the object is
        flagged as a URI only when it is an ``rdflib`` ``URIRef``.

        Raises:
            Whatever ``rdflib`` raises when the file cannot be parsed.
        """

        for s, p, o in self._parse_turtle(file):

            req = {
                "metadata": self._metadata(),
                "triples": [
                    {
                        "s": { "v": str(s), "e": True },
                        "p": { "v": str(p), "e": True },
                        "o": {
                            "v": str(o),
                            "e": isinstance(o, rdflib.term.URIRef),
                        },
                    }
                ]
            }

            await ws.send(json.dumps(req))

    async def load_entity_contexts(self, file, ws):
        """Parse ``file`` and send one entity-context message per literal.

        For every statement whose object is a literal, the subject is sent
        as the entity and the literal's string value as its context.
        Statements whose object is not a literal (URIs, blank nodes) are
        skipped.

        Raises:
            Whatever ``rdflib`` raises when the file cannot be parsed.
        """

        for s, p, o in self._parse_turtle(file):

            # Only literal objects carry text usable as context; a blank
            # node's generated identifier is not meaningful content.
            if not isinstance(o, rdflib.term.Literal):
                continue

            req = {
                "metadata": self._metadata(),
                "entities": [
                    {
                        "entity": {
                            "v": str(s),
                            "e": True
                        },
                        "context": str(o)
                    }
                ]
            }

            await ws.send(json.dumps(req))


def main():
    """Command-line entry point for ``tg-load-knowledge``.

    Parses the arguments, builds a ``KnowledgeLoader`` and runs it.  An
    exception raised while doing so is printed and the attempt is repeated
    after 10 seconds.  When the loader itself reports failure, the process
    exits with status 1 instead of announcing success.
    """

    parser = argparse.ArgumentParser(
        prog='tg-load-knowledge',
        description=__doc__,
    )

    parser.add_argument(
        '-u', '--api-url',
        default=default_url,
        help=f'API URL (default: {default_url})',
    )

    parser.add_argument(
        '-i', '--document-id',
        required=True,
        help='Document ID',
    )

    parser.add_argument(
        '-f', '--flow-id',
        default="default",
        help='Flow ID (default: default)'
    )

    parser.add_argument(
        '-U', '--user',
        default=default_user,
        help=f'User ID (default: {default_user})'
    )

    parser.add_argument(
        '-C', '--collection',
        default=default_collection,
        help=f'Collection ID (default: {default_collection})'
    )

    parser.add_argument(
        'files', nargs='+',
        help='Turtle files to load'
    )

    args = parser.parse_args()

    while True:

        try:
            loader = KnowledgeLoader(
                document_id = args.document_id,
                url = args.api_url,
                flow = args.flow_id,
                files = args.files,
                user = args.user,
                collection = args.collection,
            )

            outcome = asyncio.run(loader.run())

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

            time.sleep(10)
            continue

        # run() prints its own errors and signals them with False; only an
        # explicit False counts as failure so other return values keep the
        # previous "success" behaviour.
        if outcome is False:
            print("Load failed.", flush=True)
            raise SystemExit(1)

        print("Triples and entity contexts loaded.")
        break