diff --git a/tests/unit/test_cli/__init__.py b/tests/unit/test_cli/__init__.py
new file mode 100644
index 00000000..cd3d007b
--- /dev/null
+++ b/tests/unit/test_cli/__init__.py
@@ -0,0 +1,3 @@
+"""
+Unit tests for CLI modules.
+"""
\ No newline at end of file
diff --git a/tests/unit/test_cli/conftest.py b/tests/unit/test_cli/conftest.py
new file mode 100644
index 00000000..b085345f
--- /dev/null
+++ b/tests/unit/test_cli/conftest.py
@@ -0,0 +1,48 @@
+"""
+Shared fixtures for CLI unit tests.
+"""
+
+import pytest
+from unittest.mock import AsyncMock, MagicMock
+
+
+@pytest.fixture
+def mock_websocket_connection():
+ """Mock WebSocket connection for CLI tools."""
+ mock_ws = MagicMock()
+
+ # Create simple async functions that don't leave coroutines hanging
+ async def mock_send(data):
+ return None
+
+ async def mock_recv():
+ return ""
+
+ async def mock_close():
+ return None
+
+ mock_ws.send = mock_send
+ mock_ws.recv = mock_recv
+ mock_ws.close = mock_close
+ return mock_ws
+
+
+@pytest.fixture
+def mock_pulsar_client():
+ """Mock Pulsar client for CLI tools that use messaging."""
+ mock_client = MagicMock()
+ mock_client.create_consumer = MagicMock()
+ mock_client.create_producer = MagicMock()
+ mock_client.close = MagicMock()
+ return mock_client
+
+
+@pytest.fixture
+def sample_metadata():
+ """Sample metadata structure used across CLI tools."""
+ return {
+ "id": "test-doc-123",
+ "metadata": [],
+ "user": "test-user",
+ "collection": "test-collection"
+ }
\ No newline at end of file
diff --git a/tests/unit/test_cli/test_load_knowledge.py b/tests/unit/test_cli/test_load_knowledge.py
new file mode 100644
index 00000000..c7070200
--- /dev/null
+++ b/tests/unit/test_cli/test_load_knowledge.py
@@ -0,0 +1,479 @@
+"""
+Unit tests for the load_knowledge CLI module.
+
+Tests the business logic of loading triples and entity contexts from Turtle files
+while mocking WebSocket connections and external dependencies.
+"""
+
+import pytest
+import json
+import tempfile
+import asyncio
+from unittest.mock import AsyncMock, Mock, patch, mock_open, MagicMock
+from pathlib import Path
+
+from trustgraph.cli.load_knowledge import KnowledgeLoader, main
+
+
+@pytest.fixture
+def sample_turtle_content():
+ """Sample Turtle RDF content for testing."""
+ return """
+@prefix ex: .
+@prefix foaf: .
+
+ex:john foaf:name "John Smith" ;
+ foaf:age "30" ;
+ foaf:knows ex:mary .
+
+ex:mary foaf:name "Mary Johnson" ;
+ foaf:email "mary@example.com" .
+"""
+
+
+@pytest.fixture
+def temp_turtle_file(sample_turtle_content):
+ """Create a temporary Turtle file for testing."""
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f:
+ f.write(sample_turtle_content)
+ f.flush()
+ yield f.name
+
+ # Cleanup
+ Path(f.name).unlink(missing_ok=True)
+
+
+@pytest.fixture
+def mock_websocket():
+ """Mock WebSocket connection."""
+ mock_ws = MagicMock()
+
+ async def async_send(data):
+ return None
+
+ async def async_recv():
+ return ""
+
+ async def async_close():
+ return None
+
+ mock_ws.send = Mock(side_effect=async_send)
+ mock_ws.recv = Mock(side_effect=async_recv)
+ mock_ws.close = Mock(side_effect=async_close)
+ return mock_ws
+
+
+@pytest.fixture
+def knowledge_loader():
+ """Create a KnowledgeLoader instance with test parameters."""
+ return KnowledgeLoader(
+ files=["test.ttl"],
+ flow="test-flow",
+ user="test-user",
+ collection="test-collection",
+ document_id="test-doc-123",
+ url="ws://test.example.com/"
+ )
+
+
+class TestKnowledgeLoader:
+ """Test the KnowledgeLoader class business logic."""
+
+ def test_init_constructs_urls_correctly(self):
+ """Test that URLs are constructed properly."""
+ loader = KnowledgeLoader(
+ files=["test.ttl"],
+ flow="my-flow",
+ user="user1",
+ collection="col1",
+ document_id="doc1",
+ url="ws://example.com/"
+ )
+
+ assert loader.triples_url == "ws://example.com/api/v1/flow/my-flow/import/triples"
+ assert loader.entity_contexts_url == "ws://example.com/api/v1/flow/my-flow/import/entity-contexts"
+ assert loader.user == "user1"
+ assert loader.collection == "col1"
+ assert loader.document_id == "doc1"
+
+ def test_init_adds_trailing_slash(self):
+ """Test that trailing slash is added to URL if missing."""
+ loader = KnowledgeLoader(
+ files=["test.ttl"],
+ flow="my-flow",
+ user="user1",
+ collection="col1",
+ document_id="doc1",
+ url="ws://example.com" # No trailing slash
+ )
+
+ assert loader.triples_url == "ws://example.com/api/v1/flow/my-flow/import/triples"
+
+ @pytest.mark.asyncio
+ async def test_load_triples_sends_correct_messages(self, temp_turtle_file, mock_websocket):
+ """Test that triple loading sends correctly formatted messages."""
+ loader = KnowledgeLoader(
+ files=[temp_turtle_file],
+ flow="test-flow",
+ user="test-user",
+ collection="test-collection",
+ document_id="test-doc"
+ )
+
+ await loader.load_triples(temp_turtle_file, mock_websocket)
+
+ # Verify WebSocket send was called
+ assert mock_websocket.send.call_count > 0
+
+ # Check message format for one of the calls
+ sent_messages = [json.loads(call.args[0]) for call in mock_websocket.send.call_args_list]
+
+ # Verify message structure
+ sample_message = sent_messages[0]
+ assert "metadata" in sample_message
+ assert "triples" in sample_message
+
+ metadata = sample_message["metadata"]
+ assert metadata["id"] == "test-doc"
+ assert metadata["user"] == "test-user"
+ assert metadata["collection"] == "test-collection"
+ assert isinstance(metadata["metadata"], list)
+
+ triple = sample_message["triples"][0]
+ assert "s" in triple
+ assert "p" in triple
+ assert "o" in triple
+
+ # Check Value structure
+ assert "v" in triple["s"]
+ assert "e" in triple["s"]
+ assert triple["s"]["e"] is True # Subject should be URI
+
+ @pytest.mark.asyncio
+ async def test_load_entity_contexts_processes_literals_only(self, temp_turtle_file, mock_websocket):
+ """Test that entity contexts are created only for literals."""
+ loader = KnowledgeLoader(
+ files=[temp_turtle_file],
+ flow="test-flow",
+ user="test-user",
+ collection="test-collection",
+ document_id="test-doc"
+ )
+
+ await loader.load_entity_contexts(temp_turtle_file, mock_websocket)
+
+ # Get all sent messages
+ sent_messages = [json.loads(call.args[0]) for call in mock_websocket.send.call_args_list]
+
+ # Verify we got entity context messages
+ assert len(sent_messages) > 0
+
+ for message in sent_messages:
+ assert "metadata" in message
+ assert "entities" in message
+
+ metadata = message["metadata"]
+ assert metadata["id"] == "test-doc"
+ assert metadata["user"] == "test-user"
+ assert metadata["collection"] == "test-collection"
+
+ entity_context = message["entities"][0]
+ assert "entity" in entity_context
+ assert "context" in entity_context
+
+ entity = entity_context["entity"]
+ assert "v" in entity
+ assert "e" in entity
+ assert entity["e"] is True # Entity should be URI (subject)
+
+ # Context should be a string (the literal value)
+ assert isinstance(entity_context["context"], str)
+
+ @pytest.mark.asyncio
+ async def test_load_entity_contexts_skips_uri_objects(self, mock_websocket):
+ """Test that URI objects don't generate entity contexts."""
+ # Create turtle with only URI objects (no literals)
+ turtle_content = """
+@prefix ex: .
+ex:john ex:knows ex:mary .
+ex:mary ex:knows ex:bob .
+"""
+
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f:
+ f.write(turtle_content)
+ f.flush()
+
+ loader = KnowledgeLoader(
+ files=[f.name],
+ flow="test-flow",
+ user="test-user",
+ collection="test-collection",
+ document_id="test-doc"
+ )
+
+ await loader.load_entity_contexts(f.name, mock_websocket)
+
+ Path(f.name).unlink(missing_ok=True)
+
+ # Should not send any messages since there are no literals
+ mock_websocket.send.assert_not_called()
+
+ @pytest.mark.asyncio
+ @patch('trustgraph.cli.load_knowledge.connect')
+ async def test_run_calls_both_loaders(self, mock_connect, knowledge_loader, temp_turtle_file):
+ """Test that run() calls both triple and entity context loaders."""
+ knowledge_loader.files = [temp_turtle_file]
+
+ # Create a simple mock websocket
+ mock_ws = MagicMock()
+ async def mock_send(data):
+ pass
+ mock_ws.send = mock_send
+
+ # Create async context manager mock
+ async def mock_aenter(self):
+ return mock_ws
+
+ async def mock_aexit(self, exc_type, exc_val, exc_tb):
+ return None
+
+ mock_connection = MagicMock()
+ mock_connection.__aenter__ = mock_aenter
+ mock_connection.__aexit__ = mock_aexit
+ mock_connect.return_value = mock_connection
+
+ # Create AsyncMock objects that can track calls properly
+ mock_load_triples = AsyncMock(return_value=None)
+ mock_load_contexts = AsyncMock(return_value=None)
+
+ with patch.object(knowledge_loader, 'load_triples', mock_load_triples), \
+ patch.object(knowledge_loader, 'load_entity_contexts', mock_load_contexts):
+
+ await knowledge_loader.run()
+
+ # Verify both methods were called
+ mock_load_triples.assert_called_once_with(temp_turtle_file, mock_ws)
+ mock_load_contexts.assert_called_once_with(temp_turtle_file, mock_ws)
+
+ # Verify WebSocket connections were made to both URLs
+ assert mock_connect.call_count == 2
+
+
+class TestCLIArgumentParsing:
+ """Test CLI argument parsing and main function."""
+
+ @patch('trustgraph.cli.load_knowledge.KnowledgeLoader')
+ @patch('trustgraph.cli.load_knowledge.asyncio.run')
+ def test_main_parses_args_correctly(self, mock_asyncio_run, mock_loader_class):
+ """Test that main() parses arguments correctly."""
+ mock_loader_instance = MagicMock()
+ mock_loader_class.return_value = mock_loader_instance
+
+ test_args = [
+ 'tg-load-knowledge',
+ '-i', 'doc-123',
+ '-f', 'my-flow',
+ '-U', 'my-user',
+ '-C', 'my-collection',
+ '-u', 'ws://custom.example.com/',
+ 'file1.ttl',
+ 'file2.ttl'
+ ]
+
+ with patch('sys.argv', test_args):
+ main()
+
+ # Verify KnowledgeLoader was instantiated with correct args
+ mock_loader_class.assert_called_once_with(
+ document_id='doc-123',
+ url='ws://custom.example.com/',
+ flow='my-flow',
+ files=['file1.ttl', 'file2.ttl'],
+ user='my-user',
+ collection='my-collection'
+ )
+
+ # Verify asyncio.run was called once
+ mock_asyncio_run.assert_called_once()
+
+ @patch('trustgraph.cli.load_knowledge.KnowledgeLoader')
+ @patch('trustgraph.cli.load_knowledge.asyncio.run')
+ def test_main_uses_defaults(self, mock_asyncio_run, mock_loader_class):
+ """Test that main() uses default values when not specified."""
+ mock_loader_instance = MagicMock()
+ mock_loader_class.return_value = mock_loader_instance
+
+ test_args = [
+ 'tg-load-knowledge',
+ '-i', 'doc-123',
+ 'file1.ttl'
+ ]
+
+ with patch('sys.argv', test_args):
+ main()
+
+ # Verify defaults were used
+ call_args = mock_loader_class.call_args[1]
+ assert call_args['flow'] == 'default'
+ assert call_args['user'] == 'trustgraph'
+ assert call_args['collection'] == 'default'
+ assert call_args['url'] == 'ws://localhost:8088/'
+
+
+class TestErrorHandling:
+ """Test error handling scenarios."""
+
+ @pytest.mark.asyncio
+ async def test_load_triples_handles_invalid_turtle(self, mock_websocket):
+ """Test handling of invalid Turtle content."""
+ # Create file with invalid Turtle content
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f:
+ f.write("Invalid Turtle Content {{{")
+ f.flush()
+
+ loader = KnowledgeLoader(
+ files=[f.name],
+ flow="test-flow",
+ user="test-user",
+ collection="test-collection",
+ document_id="test-doc"
+ )
+
+ # Should raise an exception for invalid Turtle
+ with pytest.raises(Exception):
+ await loader.load_triples(f.name, mock_websocket)
+
+ Path(f.name).unlink(missing_ok=True)
+
+ @pytest.mark.asyncio
+ async def test_load_entity_contexts_handles_invalid_turtle(self, mock_websocket):
+ """Test handling of invalid Turtle content in entity contexts."""
+ # Create file with invalid Turtle content
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f:
+ f.write("Invalid Turtle Content {{{")
+ f.flush()
+
+ loader = KnowledgeLoader(
+ files=[f.name],
+ flow="test-flow",
+ user="test-user",
+ collection="test-collection",
+ document_id="test-doc"
+ )
+
+ # Should raise an exception for invalid Turtle
+ with pytest.raises(Exception):
+ await loader.load_entity_contexts(f.name, mock_websocket)
+
+ Path(f.name).unlink(missing_ok=True)
+
+ @pytest.mark.asyncio
+ @patch('trustgraph.cli.load_knowledge.connect')
+ @patch('builtins.print') # Mock print to avoid output during tests
+ async def test_run_handles_connection_errors(self, mock_print, mock_connect, knowledge_loader, temp_turtle_file):
+ """Test handling of WebSocket connection errors."""
+ knowledge_loader.files = [temp_turtle_file]
+
+ # Mock connection failure
+ mock_connect.side_effect = ConnectionError("Failed to connect")
+
+ # Should not raise exception, just print error
+ await knowledge_loader.run()
+
+ @patch('trustgraph.cli.load_knowledge.KnowledgeLoader')
+ @patch('trustgraph.cli.load_knowledge.asyncio.run')
+ @patch('trustgraph.cli.load_knowledge.time.sleep')
+ @patch('builtins.print') # Mock print to avoid output during tests
+ def test_main_retries_on_exception(self, mock_print, mock_sleep, mock_asyncio_run, mock_loader_class):
+ """Test that main() retries on exceptions."""
+ mock_loader_instance = MagicMock()
+ mock_loader_class.return_value = mock_loader_instance
+
+ # First call raises exception, second succeeds
+ mock_asyncio_run.side_effect = [Exception("Test error"), None]
+
+ test_args = [
+ 'tg-load-knowledge',
+ '-i', 'doc-123',
+ 'file1.ttl'
+ ]
+
+ with patch('sys.argv', test_args):
+ main()
+
+ # Should have been called twice (first failed, second succeeded)
+ assert mock_asyncio_run.call_count == 2
+ mock_sleep.assert_called_once_with(10)
+
+
+class TestDataValidation:
+ """Test data validation and edge cases."""
+
+ @pytest.mark.asyncio
+ async def test_empty_turtle_file(self, mock_websocket):
+ """Test handling of empty Turtle files."""
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f:
+ f.write("") # Empty file
+ f.flush()
+
+ loader = KnowledgeLoader(
+ files=[f.name],
+ flow="test-flow",
+ user="test-user",
+ collection="test-collection",
+ document_id="test-doc"
+ )
+
+ await loader.load_triples(f.name, mock_websocket)
+ await loader.load_entity_contexts(f.name, mock_websocket)
+
+ # Should not send any messages for empty file
+ mock_websocket.send.assert_not_called()
+
+ Path(f.name).unlink(missing_ok=True)
+
+ @pytest.mark.asyncio
+ async def test_turtle_with_mixed_literals_and_uris(self, mock_websocket):
+ """Test handling of Turtle with mixed literal and URI objects."""
+ turtle_content = """
+@prefix ex: .
+ex:john ex:name "John Smith" ;
+ ex:age "25" ;
+ ex:knows ex:mary ;
+ ex:city "New York" .
+ex:mary ex:name "Mary Johnson" .
+"""
+
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.ttl', delete=False) as f:
+ f.write(turtle_content)
+ f.flush()
+
+ loader = KnowledgeLoader(
+ files=[f.name],
+ flow="test-flow",
+ user="test-user",
+ collection="test-collection",
+ document_id="test-doc"
+ )
+
+ await loader.load_entity_contexts(f.name, mock_websocket)
+
+ sent_messages = [json.loads(call.args[0]) for call in mock_websocket.send.call_args_list]
+
+ # Should have 4 entity contexts (for the 4 literals: "John Smith", "25", "New York", "Mary Johnson")
+ # URI ex:mary should be skipped
+ assert len(sent_messages) == 4
+
+ # Verify all contexts are for literals (subjects should be URIs)
+ contexts = []
+ for message in sent_messages:
+ entity_context = message["entities"][0]
+ assert entity_context["entity"]["e"] is True # Subject is URI
+ contexts.append(entity_context["context"])
+
+ assert "John Smith" in contexts
+ assert "25" in contexts
+ assert "New York" in contexts
+ assert "Mary Johnson" in contexts
+
+ Path(f.name).unlink(missing_ok=True)
\ No newline at end of file
diff --git a/trustgraph-cli/pyproject.toml b/trustgraph-cli/pyproject.toml
index 6d11ed3e..02b8d958 100644
--- a/trustgraph-cli/pyproject.toml
+++ b/trustgraph-cli/pyproject.toml
@@ -50,6 +50,7 @@ tg-load-pdf = "trustgraph.cli.load_pdf:main"
tg-load-sample-documents = "trustgraph.cli.load_sample_documents:main"
tg-load-text = "trustgraph.cli.load_text:main"
tg-load-turtle = "trustgraph.cli.load_turtle:main"
+tg-load-knowledge = "trustgraph.cli.load_knowledge:main"
tg-put-flow-class = "trustgraph.cli.put_flow_class:main"
tg-put-kg-core = "trustgraph.cli.put_kg_core:main"
tg-remove-library-document = "trustgraph.cli.remove_library_document:main"
diff --git a/trustgraph-cli/trustgraph/cli/load_knowledge.py b/trustgraph-cli/trustgraph/cli/load_knowledge.py
new file mode 100644
index 00000000..58081fa1
--- /dev/null
+++ b/trustgraph-cli/trustgraph/cli/load_knowledge.py
@@ -0,0 +1,202 @@
+"""
+Loads triples and entity contexts into the knowledge graph.
+"""
+
+import asyncio
+import argparse
+import os
+import time
+import rdflib
+import json
+from websockets.asyncio.client import connect
+from typing import List, Dict, Any
+
+from trustgraph.log_level import LogLevel
+
+default_url = os.getenv("TRUSTGRAPH_URL", 'ws://localhost:8088/')
+default_user = 'trustgraph'
+default_collection = 'default'
+
+class KnowledgeLoader:
+
+ def __init__(
+ self,
+ files,
+ flow,
+ user,
+ collection,
+ document_id,
+ url = default_url,
+ ):
+
+ if not url.endswith("/"):
+ url += "/"
+
+ self.triples_url = url + f"api/v1/flow/{flow}/import/triples"
+ self.entity_contexts_url = url + f"api/v1/flow/{flow}/import/entity-contexts"
+
+ self.files = files
+ self.user = user
+ self.collection = collection
+ self.document_id = document_id
+
+ async def run(self):
+
+ try:
+ # Load triples first
+ async with connect(self.triples_url) as ws:
+ for file in self.files:
+ await self.load_triples(file, ws)
+
+ # Then load entity contexts
+ async with connect(self.entity_contexts_url) as ws:
+ for file in self.files:
+ await self.load_entity_contexts(file, ws)
+
+ except Exception as e:
+ print(e, flush=True)
+
+ async def load_triples(self, file, ws):
+
+ g = rdflib.Graph()
+ g.parse(file, format="turtle")
+
+ def Value(value, is_uri):
+ return { "v": value, "e": is_uri }
+
+ for e in g:
+ s = Value(value=str(e[0]), is_uri=True)
+ p = Value(value=str(e[1]), is_uri=True)
+ if type(e[2]) == rdflib.term.URIRef:
+ o = Value(value=str(e[2]), is_uri=True)
+ else:
+ o = Value(value=str(e[2]), is_uri=False)
+
+ req = {
+ "metadata": {
+ "id": self.document_id,
+ "metadata": [],
+ "user": self.user,
+ "collection": self.collection
+ },
+ "triples": [
+ {
+ "s": s,
+ "p": p,
+ "o": o,
+ }
+ ]
+ }
+
+ await ws.send(json.dumps(req))
+
+ async def load_entity_contexts(self, file, ws):
+ """
+ Load entity contexts by extracting entities from the RDF graph
+ and generating contextual descriptions based on their relationships.
+ """
+
+ g = rdflib.Graph()
+ g.parse(file, format="turtle")
+
+ for s, p, o in g:
+ # If object is a URI, do nothing
+ if isinstance(o, rdflib.term.URIRef):
+ continue
+
+ # If object is a literal, create entity context for subject with literal as context
+ s_str = str(s)
+ o_str = str(o)
+
+ req = {
+ "metadata": {
+ "id": self.document_id,
+ "metadata": [],
+ "user": self.user,
+ "collection": self.collection
+ },
+ "entities": [
+ {
+ "entity": {
+ "v": s_str,
+ "e": True
+ },
+ "context": o_str
+ }
+ ]
+ }
+
+ await ws.send(json.dumps(req))
+
+
+def main():
+
+ parser = argparse.ArgumentParser(
+ prog='tg-load-knowledge',
+ description=__doc__,
+ )
+
+ parser.add_argument(
+ '-u', '--api-url',
+ default=default_url,
+ help=f'API URL (default: {default_url})',
+ )
+
+ parser.add_argument(
+ '-i', '--document-id',
+ required=True,
+ help=f'Document ID)',
+ )
+
+ parser.add_argument(
+ '-f', '--flow-id',
+ default="default",
+ help=f'Flow ID (default: default)'
+ )
+
+ parser.add_argument(
+ '-U', '--user',
+ default=default_user,
+ help=f'User ID (default: {default_user})'
+ )
+
+ parser.add_argument(
+ '-C', '--collection',
+ default=default_collection,
+ help=f'Collection ID (default: {default_collection})'
+ )
+
+
+ parser.add_argument(
+ 'files', nargs='+',
+ help=f'Turtle files to load'
+ )
+
+ args = parser.parse_args()
+
+ while True:
+
+ try:
+ loader = KnowledgeLoader(
+ document_id = args.document_id,
+ url = args.api_url,
+ flow = args.flow_id,
+ files = args.files,
+ user = args.user,
+ collection = args.collection,
+ )
+
+ asyncio.run(loader.run())
+
+ print("Triples and entity contexts loaded.")
+ break
+
+ except Exception as e:
+
+ print("Exception:", e, flush=True)
+ print("Will retry...", flush=True)
+
+ time.sleep(10)
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file