Fixed, all tests pass

This commit is contained in:
Cyber MacGeddon 2025-12-04 21:07:32 +00:00
parent 2a77c4eb6f
commit 3f15929247

View file

@ -2,17 +2,16 @@
Unit tests for the load_knowledge CLI module. Unit tests for the load_knowledge CLI module.
Tests the business logic of loading triples and entity contexts from Turtle files Tests the business logic of loading triples and entity contexts from Turtle files
while mocking WebSocket connections and external dependencies. using the BulkClient API.
""" """
import pytest import pytest
import json
import tempfile import tempfile
import asyncio from unittest.mock import Mock, patch, MagicMock, call
from unittest.mock import AsyncMock, Mock, patch, mock_open, MagicMock
from pathlib import Path from pathlib import Path
from trustgraph.cli.load_knowledge import KnowledgeLoader, main from trustgraph.cli.load_knowledge import KnowledgeLoader, main
from trustgraph.api import Triple
@pytest.fixture @pytest.fixture
@ -43,26 +42,6 @@ def temp_turtle_file(sample_turtle_content):
Path(f.name).unlink(missing_ok=True) Path(f.name).unlink(missing_ok=True)
@pytest.fixture
def mock_websocket():
"""Mock WebSocket connection."""
mock_ws = MagicMock()
async def async_send(data):
return None
async def async_recv():
return ""
async def async_close():
return None
mock_ws.send = Mock(side_effect=async_send)
mock_ws.recv = Mock(side_effect=async_recv)
mock_ws.close = Mock(side_effect=async_close)
return mock_ws
@pytest.fixture @pytest.fixture
def knowledge_loader(): def knowledge_loader():
"""Create a KnowledgeLoader instance with test parameters.""" """Create a KnowledgeLoader instance with test parameters."""
@ -72,125 +51,66 @@ def knowledge_loader():
user="test-user", user="test-user",
collection="test-collection", collection="test-collection",
document_id="test-doc-123", document_id="test-doc-123",
url="ws://test.example.com/" url="http://test.example.com/",
token=None
) )
class TestKnowledgeLoader: class TestKnowledgeLoader:
"""Test the KnowledgeLoader class business logic.""" """Test the KnowledgeLoader class business logic."""
def test_init_constructs_urls_correctly(self): def test_init_stores_parameters_correctly(self):
"""Test that URLs are constructed properly.""" """Test that initialization stores parameters correctly."""
loader = KnowledgeLoader( loader = KnowledgeLoader(
files=["test.ttl"], files=["file1.ttl", "file2.ttl"],
flow="my-flow", flow="my-flow",
user="user1", user="user1",
collection="col1", collection="col1",
document_id="doc1", document_id="doc1",
url="ws://example.com/" url="http://example.com/",
token="test-token"
) )
assert loader.triples_url == "ws://example.com/api/v1/flow/my-flow/import/triples" assert loader.files == ["file1.ttl", "file2.ttl"]
assert loader.entity_contexts_url == "ws://example.com/api/v1/flow/my-flow/import/entity-contexts" assert loader.flow == "my-flow"
assert loader.user == "user1" assert loader.user == "user1"
assert loader.collection == "col1" assert loader.collection == "col1"
assert loader.document_id == "doc1" assert loader.document_id == "doc1"
assert loader.url == "http://example.com/"
assert loader.token == "test-token"
def test_init_adds_trailing_slash(self): def test_load_triples_from_file_yields_triples(self, temp_turtle_file, knowledge_loader):
"""Test that trailing slash is added to URL if missing.""" """Test that load_triples_from_file yields Triple objects."""
loader = KnowledgeLoader( triples = list(knowledge_loader.load_triples_from_file(temp_turtle_file))
files=["test.ttl"],
flow="my-flow",
user="user1",
collection="col1",
document_id="doc1",
url="ws://example.com" # No trailing slash
)
assert loader.triples_url == "ws://example.com/api/v1/flow/my-flow/import/triples" # Should have triples for all statements in the file
assert len(triples) > 0
@pytest.mark.asyncio # Verify they are Triple objects
async def test_load_triples_sends_correct_messages(self, temp_turtle_file, mock_websocket): for triple in triples:
"""Test that triple loading sends correctly formatted messages.""" assert isinstance(triple, Triple)
loader = KnowledgeLoader( assert hasattr(triple, 's')
files=[temp_turtle_file], assert hasattr(triple, 'p')
flow="test-flow", assert hasattr(triple, 'o')
user="test-user", assert isinstance(triple.s, str)
collection="test-collection", assert isinstance(triple.p, str)
document_id="test-doc" assert isinstance(triple.o, str)
)
await loader.load_triples(temp_turtle_file, mock_websocket) def test_load_entity_contexts_from_file_yields_literals_only(self, temp_turtle_file, knowledge_loader):
# Verify WebSocket send was called
assert mock_websocket.send.call_count > 0
# Check message format for one of the calls
sent_messages = [json.loads(call.args[0]) for call in mock_websocket.send.call_args_list]
# Verify message structure
sample_message = sent_messages[0]
assert "metadata" in sample_message
assert "triples" in sample_message
metadata = sample_message["metadata"]
assert metadata["id"] == "test-doc"
assert metadata["user"] == "test-user"
assert metadata["collection"] == "test-collection"
assert isinstance(metadata["metadata"], list)
triple = sample_message["triples"][0]
assert "s" in triple
assert "p" in triple
assert "o" in triple
# Check Value structure
assert "v" in triple["s"]
assert "e" in triple["s"]
assert triple["s"]["e"] is True # Subject should be URI
@pytest.mark.asyncio
async def test_load_entity_contexts_processes_literals_only(self, temp_turtle_file, mock_websocket):
"""Test that entity contexts are created only for literals.""" """Test that entity contexts are created only for literals."""
loader = KnowledgeLoader( contexts = list(knowledge_loader.load_entity_contexts_from_file(temp_turtle_file))
files=[temp_turtle_file],
flow="test-flow",
user="test-user",
collection="test-collection",
document_id="test-doc"
)
await loader.load_entity_contexts(temp_turtle_file, mock_websocket) # Should have contexts for literal objects (foaf:name, foaf:age, foaf:email)
assert len(contexts) > 0
# Get all sent messages # Verify format: (entity, context) tuples
sent_messages = [json.loads(call.args[0]) for call in mock_websocket.send.call_args_list] for entity, context in contexts:
assert isinstance(entity, str)
assert isinstance(context, str)
# Entity should be a URI (subject)
assert entity.startswith("http://")
# Verify we got entity context messages def test_load_entity_contexts_skips_uri_objects(self):
assert len(sent_messages) > 0
for message in sent_messages:
assert "metadata" in message
assert "entities" in message
metadata = message["metadata"]
assert metadata["id"] == "test-doc"
assert metadata["user"] == "test-user"
assert metadata["collection"] == "test-collection"
entity_context = message["entities"][0]
assert "entity" in entity_context
assert "context" in entity_context
entity = entity_context["entity"]
assert "v" in entity
assert "e" in entity
assert entity["e"] is True # Entity should be URI (subject)
# Context should be a string (the literal value)
assert isinstance(entity_context["context"], str)
@pytest.mark.asyncio
async def test_load_entity_contexts_skips_uri_objects(self, mock_websocket):
"""Test that URI objects don't generate entity contexts.""" """Test that URI objects don't generate entity contexts."""
# Create turtle with only URI objects (no literals) # Create turtle with only URI objects (no literals)
turtle_content = """ turtle_content = """
@ -208,63 +128,68 @@ ex:mary ex:knows ex:bob .
flow="test-flow", flow="test-flow",
user="test-user", user="test-user",
collection="test-collection", collection="test-collection",
document_id="test-doc" document_id="test-doc",
url="http://test.example.com/"
) )
await loader.load_entity_contexts(f.name, mock_websocket) contexts = list(loader.load_entity_contexts_from_file(f.name))
Path(f.name).unlink(missing_ok=True) Path(f.name).unlink(missing_ok=True)
# Should not send any messages since there are no literals # Should have no contexts since there are no literals
mock_websocket.send.assert_not_called() assert len(contexts) == 0
@pytest.mark.asyncio @patch('trustgraph.cli.load_knowledge.Api')
@patch('trustgraph.cli.load_knowledge.connect') def test_run_calls_bulk_api(self, mock_api_class, temp_turtle_file):
async def test_run_calls_both_loaders(self, mock_connect, knowledge_loader, temp_turtle_file): """Test that run() uses BulkClient API."""
"""Test that run() calls both triple and entity context loaders.""" # Setup mocks
knowledge_loader.files = [temp_turtle_file] mock_api = MagicMock()
mock_bulk = MagicMock()
mock_api_class.return_value = mock_api
mock_api.bulk.return_value = mock_bulk
# Create a simple mock websocket loader = KnowledgeLoader(
mock_ws = MagicMock() files=[temp_turtle_file],
async def mock_send(data): flow="test-flow",
pass user="test-user",
mock_ws.send = mock_send collection="test-collection",
document_id="test-doc",
url="http://test.example.com/",
token="test-token"
)
# Create async context manager mock loader.run()
async def mock_aenter(self):
return mock_ws
async def mock_aexit(self, exc_type, exc_val, exc_tb): # Verify Api was created with correct parameters
return None mock_api_class.assert_called_once_with(
url="http://test.example.com/",
token="test-token"
)
mock_connection = MagicMock() # Verify bulk client was obtained
mock_connection.__aenter__ = mock_aenter mock_api.bulk.assert_called_once()
mock_connection.__aexit__ = mock_aexit
mock_connect.return_value = mock_connection
# Create AsyncMock objects that can track calls properly # Verify import_triples was called
mock_load_triples = AsyncMock(return_value=None) assert mock_bulk.import_triples.call_count == 1
mock_load_contexts = AsyncMock(return_value=None) call_args = mock_bulk.import_triples.call_args
assert call_args[1]['flow'] == "test-flow"
assert call_args[1]['metadata']['id'] == "test-doc"
assert call_args[1]['metadata']['user'] == "test-user"
assert call_args[1]['metadata']['collection'] == "test-collection"
with patch.object(knowledge_loader, 'load_triples', mock_load_triples), \ # Verify import_entity_contexts was called
patch.object(knowledge_loader, 'load_entity_contexts', mock_load_contexts): assert mock_bulk.import_entity_contexts.call_count == 1
call_args = mock_bulk.import_entity_contexts.call_args
await knowledge_loader.run() assert call_args[1]['flow'] == "test-flow"
assert call_args[1]['metadata']['id'] == "test-doc"
# Verify both methods were called
mock_load_triples.assert_called_once_with(temp_turtle_file, mock_ws)
mock_load_contexts.assert_called_once_with(temp_turtle_file, mock_ws)
# Verify WebSocket connections were made to both URLs
assert mock_connect.call_count == 2
class TestCLIArgumentParsing: class TestCLIArgumentParsing:
"""Test CLI argument parsing and main function.""" """Test CLI argument parsing and main function."""
@patch('trustgraph.cli.load_knowledge.KnowledgeLoader') @patch('trustgraph.cli.load_knowledge.KnowledgeLoader')
@patch('trustgraph.cli.load_knowledge.asyncio.run') @patch('trustgraph.cli.load_knowledge.time.sleep')
def test_main_parses_args_correctly(self, mock_asyncio_run, mock_loader_class): def test_main_parses_args_correctly(self, mock_sleep, mock_loader_class):
"""Test that main() parses arguments correctly.""" """Test that main() parses arguments correctly."""
mock_loader_instance = MagicMock() mock_loader_instance = MagicMock()
mock_loader_class.return_value = mock_loader_instance mock_loader_class.return_value = mock_loader_instance
@ -275,7 +200,8 @@ class TestCLIArgumentParsing:
'-f', 'my-flow', '-f', 'my-flow',
'-U', 'my-user', '-U', 'my-user',
'-C', 'my-collection', '-C', 'my-collection',
'-u', 'ws://custom.example.com/', '-u', 'http://custom.example.com/',
'-t', 'my-token',
'file1.ttl', 'file1.ttl',
'file2.ttl' 'file2.ttl'
] ]
@ -286,19 +212,20 @@ class TestCLIArgumentParsing:
# Verify KnowledgeLoader was instantiated with correct args # Verify KnowledgeLoader was instantiated with correct args
mock_loader_class.assert_called_once_with( mock_loader_class.assert_called_once_with(
document_id='doc-123', document_id='doc-123',
url='ws://custom.example.com/', url='http://custom.example.com/',
token='my-token',
flow='my-flow', flow='my-flow',
files=['file1.ttl', 'file2.ttl'], files=['file1.ttl', 'file2.ttl'],
user='my-user', user='my-user',
collection='my-collection' collection='my-collection'
) )
# Verify asyncio.run was called once # Verify run was called
mock_asyncio_run.assert_called_once() mock_loader_instance.run.assert_called_once()
@patch('trustgraph.cli.load_knowledge.KnowledgeLoader') @patch('trustgraph.cli.load_knowledge.KnowledgeLoader')
@patch('trustgraph.cli.load_knowledge.asyncio.run') @patch('trustgraph.cli.load_knowledge.time.sleep')
def test_main_uses_defaults(self, mock_asyncio_run, mock_loader_class): def test_main_uses_defaults(self, mock_sleep, mock_loader_class):
"""Test that main() uses default values when not specified.""" """Test that main() uses default values when not specified."""
mock_loader_instance = MagicMock() mock_loader_instance = MagicMock()
mock_loader_class.return_value = mock_loader_instance mock_loader_class.return_value = mock_loader_instance
@ -317,80 +244,69 @@ class TestCLIArgumentParsing:
assert call_args['flow'] == 'default' assert call_args['flow'] == 'default'
assert call_args['user'] == 'trustgraph' assert call_args['user'] == 'trustgraph'
assert call_args['collection'] == 'default' assert call_args['collection'] == 'default'
assert call_args['url'] == 'ws://localhost:8088/' assert call_args['url'] == 'http://localhost:8088/'
assert call_args['token'] is None
class TestErrorHandling: class TestErrorHandling:
"""Test error handling scenarios.""" """Test error handling scenarios."""
@pytest.mark.asyncio def test_load_triples_handles_invalid_turtle(self, knowledge_loader):
async def test_load_triples_handles_invalid_turtle(self, mock_websocket):
"""Test handling of invalid Turtle content.""" """Test handling of invalid Turtle content."""
# Create file with invalid Turtle content # Create file with invalid Turtle content
with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f: with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f:
f.write("Invalid Turtle Content {{{") f.write("Invalid Turtle Content {{{")
f.flush() f.flush()
loader = KnowledgeLoader(
files=[f.name],
flow="test-flow",
user="test-user",
collection="test-collection",
document_id="test-doc"
)
# Should raise an exception for invalid Turtle # Should raise an exception for invalid Turtle
with pytest.raises(Exception): with pytest.raises(Exception):
await loader.load_triples(f.name, mock_websocket) list(knowledge_loader.load_triples_from_file(f.name))
Path(f.name).unlink(missing_ok=True) Path(f.name).unlink(missing_ok=True)
@pytest.mark.asyncio def test_load_entity_contexts_handles_invalid_turtle(self, knowledge_loader):
async def test_load_entity_contexts_handles_invalid_turtle(self, mock_websocket):
"""Test handling of invalid Turtle content in entity contexts.""" """Test handling of invalid Turtle content in entity contexts."""
# Create file with invalid Turtle content # Create file with invalid Turtle content
with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f: with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f:
f.write("Invalid Turtle Content {{{") f.write("Invalid Turtle Content {{{")
f.flush() f.flush()
loader = KnowledgeLoader(
files=[f.name],
flow="test-flow",
user="test-user",
collection="test-collection",
document_id="test-doc"
)
# Should raise an exception for invalid Turtle # Should raise an exception for invalid Turtle
with pytest.raises(Exception): with pytest.raises(Exception):
await loader.load_entity_contexts(f.name, mock_websocket) list(knowledge_loader.load_entity_contexts_from_file(f.name))
Path(f.name).unlink(missing_ok=True) Path(f.name).unlink(missing_ok=True)
@pytest.mark.asyncio @patch('trustgraph.cli.load_knowledge.Api')
@patch('trustgraph.cli.load_knowledge.connect')
@patch('builtins.print') # Mock print to avoid output during tests @patch('builtins.print') # Mock print to avoid output during tests
async def test_run_handles_connection_errors(self, mock_print, mock_connect, knowledge_loader, temp_turtle_file): def test_run_handles_api_errors(self, mock_print, mock_api_class, temp_turtle_file):
"""Test handling of WebSocket connection errors.""" """Test handling of API errors."""
knowledge_loader.files = [temp_turtle_file] # Mock API to raise an error
mock_api_class.side_effect = Exception("API connection failed")
# Mock connection failure loader = KnowledgeLoader(
mock_connect.side_effect = ConnectionError("Failed to connect") files=[temp_turtle_file],
flow="test-flow",
user="test-user",
collection="test-collection",
document_id="test-doc",
url="http://test.example.com/"
)
# Should not raise exception, just print error # Should raise the exception
await knowledge_loader.run() with pytest.raises(Exception, match="API connection failed"):
loader.run()
@patch('trustgraph.cli.load_knowledge.KnowledgeLoader') @patch('trustgraph.cli.load_knowledge.KnowledgeLoader')
@patch('trustgraph.cli.load_knowledge.asyncio.run')
@patch('trustgraph.cli.load_knowledge.time.sleep') @patch('trustgraph.cli.load_knowledge.time.sleep')
@patch('builtins.print') # Mock print to avoid output during tests @patch('builtins.print') # Mock print to avoid output during tests
def test_main_retries_on_exception(self, mock_print, mock_sleep, mock_asyncio_run, mock_loader_class): def test_main_retries_on_exception(self, mock_print, mock_sleep, mock_loader_class):
"""Test that main() retries on exceptions.""" """Test that main() retries on exceptions."""
mock_loader_instance = MagicMock() mock_loader_instance = MagicMock()
mock_loader_class.return_value = mock_loader_instance mock_loader_class.return_value = mock_loader_instance
# First call raises exception, second succeeds # First call raises exception, second succeeds
mock_asyncio_run.side_effect = [Exception("Test error"), None] mock_loader_instance.run.side_effect = [Exception("Test error"), None]
test_args = [ test_args = [
'tg-load-knowledge', 'tg-load-knowledge',
@ -402,38 +318,29 @@ class TestErrorHandling:
main() main()
# Should have been called twice (first failed, second succeeded) # Should have been called twice (first failed, second succeeded)
assert mock_asyncio_run.call_count == 2 assert mock_loader_instance.run.call_count == 2
mock_sleep.assert_called_once_with(10) mock_sleep.assert_called_once_with(10)
class TestDataValidation: class TestDataValidation:
"""Test data validation and edge cases.""" """Test data validation and edge cases."""
@pytest.mark.asyncio def test_empty_turtle_file(self, knowledge_loader):
async def test_empty_turtle_file(self, mock_websocket):
"""Test handling of empty Turtle files.""" """Test handling of empty Turtle files."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f: with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f:
f.write("") # Empty file f.write("") # Empty file
f.flush() f.flush()
loader = KnowledgeLoader( triples = list(knowledge_loader.load_triples_from_file(f.name))
files=[f.name], contexts = list(knowledge_loader.load_entity_contexts_from_file(f.name))
flow="test-flow",
user="test-user",
collection="test-collection",
document_id="test-doc"
)
await loader.load_triples(f.name, mock_websocket) # Should return empty lists for empty file
await loader.load_entity_contexts(f.name, mock_websocket) assert len(triples) == 0
assert len(contexts) == 0
# Should not send any messages for empty file
mock_websocket.send.assert_not_called()
Path(f.name).unlink(missing_ok=True) Path(f.name).unlink(missing_ok=True)
@pytest.mark.asyncio def test_turtle_with_mixed_literals_and_uris(self, knowledge_loader):
async def test_turtle_with_mixed_literals_and_uris(self, mock_websocket):
"""Test handling of Turtle with mixed literal and URI objects.""" """Test handling of Turtle with mixed literal and URI objects."""
turtle_content = """ turtle_content = """
@prefix ex: <http://example.org/> . @prefix ex: <http://example.org/> .
@ -448,32 +355,18 @@ ex:mary ex:name "Mary Johnson" .
f.write(turtle_content) f.write(turtle_content)
f.flush() f.flush()
loader = KnowledgeLoader( contexts = list(knowledge_loader.load_entity_contexts_from_file(f.name))
files=[f.name],
flow="test-flow",
user="test-user",
collection="test-collection",
document_id="test-doc"
)
await loader.load_entity_contexts(f.name, mock_websocket)
sent_messages = [json.loads(call.args[0]) for call in mock_websocket.send.call_args_list]
# Should have 4 entity contexts (for the 4 literals: "John Smith", "25", "New York", "Mary Johnson") # Should have 4 entity contexts (for the 4 literals: "John Smith", "25", "New York", "Mary Johnson")
# URI ex:mary should be skipped # URI ex:mary should be skipped
assert len(sent_messages) == 4 assert len(contexts) == 4
# Verify all contexts are for literals (subjects should be URIs) # Verify all contexts are for literals (subjects should be URIs)
contexts = [] context_values = [context for entity, context in contexts]
for message in sent_messages:
entity_context = message["entities"][0]
assert entity_context["entity"]["e"] is True # Subject is URI
contexts.append(entity_context["context"])
assert "John Smith" in contexts assert "John Smith" in context_values
assert "25" in contexts assert "25" in context_values
assert "New York" in contexts assert "New York" in context_values
assert "Mary Johnson" in contexts assert "Mary Johnson" in context_values
Path(f.name).unlink(missing_ok=True) Path(f.name).unlink(missing_ok=True)