trustgraph/tests/unit/test_query/test_triples_neo4j_query.py
cybermaggedon 89be656990
Release/v1.2 (#457)
* Bump setup.py versions for 1.1

* PoC MCP server (#419)

* Very initial MCP server PoC for TrustGraph

* Put service on port 8000

* Add MCP container and packages to buildout

* Update docs for API/CLI changes in 1.0 (#421)

* Update some API basics for the 0.23/1.0 API change

* Add MCP container push (#425)

* Add command args to the MCP server (#426)

* Host and port parameters

* Added websocket arg

* More docs

* MCP client support (#427)

- MCP client service
- Tool request/response schema
- API gateway support for mcp-tool
- Message translation for tool request & response
- Make mcp-tool using configuration service for information
  about where the MCP services are.

* Feature/react call mcp (#428)

Key Features

  - MCP Tool Integration: Added core MCP tool support with ToolClientSpec and ToolClient classes
  - API Enhancement: New mcp_tool method for flow-specific tool invocation
  - CLI Tooling: New tg-invoke-mcp-tool command for testing MCP integration
  - React Agent Enhancement: Fixed and improved multi-tool invocation capabilities
  - Tool Management: Enhanced CLI for tool configuration and management

Changes

  - Added MCP tool invocation to API with flow-specific integration
  - Implemented ToolClientSpec and ToolClient for tool call handling
  - Updated agent-manager-react to invoke MCP tools with configurable types
  - Enhanced CLI with new commands and improved help text
  - Added comprehensive documentation for new CLI commands
  - Improved tool configuration management

Testing

  - Added tg-invoke-mcp-tool CLI command for isolated MCP integration testing
  - Enhanced agent capability to invoke multiple tools simultaneously

* Test suite executed from CI pipeline (#433)

* Test strategy & test cases

* Unit tests

* Integration tests

* Extending test coverage (#434)

* Contract tests

* Testing embeedings

* Agent unit tests

* Knowledge pipeline tests

* Turn on contract tests

* Increase storage test coverage (#435)

* Fixing storage and adding tests

* PR pipeline only runs quick tests

* Empty configuration is returned as empty list, previously was not in response (#436)

* Update config util to take files as well as command-line text (#437)

* Updated CLI invocation and config model for tools and mcp (#438)

* Updated CLI invocation and config model for tools and mcp

* CLI anomalies

* Tweaked the MCP tool implementation for new model

* Update agent implementation to match the new model

* Fix agent tools, now all tested

* Fixed integration tests

* Fix MCP delete tool params

* Update Python deps to 1.2

* Update to enable knowledge extraction using the agent framework (#439)

* Implement KG extraction agent (kg-extract-agent)

* Using ReAct framework (agent-manager-react)
 
* ReAct manager had an issue when emitting JSON, which conflicts which ReAct manager's own JSON messages, so refactored ReAct manager to use traditional ReAct messages, non-JSON structure.
 
* Minor refactor to take the prompt template client out of prompt-template so it can be more readily used by other modules. kg-extract-agent uses this framework.

* Migrate from setup.py to pyproject.toml (#440)

* Converted setup.py to pyproject.toml

* Modern package infrastructure as recommended by py docs

* Install missing build deps (#441)

* Install missing build deps (#442)

* Implement logging strategy (#444)

* Logging strategy and convert all prints() to logging invocations

* Fix/startup failure (#445)

* Fix loggin startup problems

* Fix logging startup problems (#446)

* Fix logging startup problems (#447)

* Fixed Mistral OCR to use current API (#448)

* Fixed Mistral OCR to use current API

* Added PDF decoder tests

* Fix Mistral OCR ident to be standard pdf-decoder (#450)

* Fix Mistral OCR ident to be standard pdf-decoder

* Correct test

* Schema structure refactor (#451)

* Write schema refactor spec

* Implemented schema refactor spec

* Structure data mvp (#452)

* Structured data tech spec

* Architecture principles

* New schemas

* Updated schemas and specs

* Object extractor

* Add .coveragerc

* New tests

* Cassandra object storage

* Trying to object extraction working, issues exist

* Validate librarian collection (#453)

* Fix token chunker, broken API invocation (#454)

* Fix token chunker, broken API invocation (#455)

* Knowledge load utility CLI (#456)

* Knowledge loader

* More tests
2025-08-18 20:56:09 +01:00

338 lines
No EOL
13 KiB
Python

"""
Tests for Neo4j triples query service
"""
import pytest
from unittest.mock import MagicMock, patch
from trustgraph.query.triples.neo4j.service import Processor
from trustgraph.schema import Value, TriplesQueryRequest
class TestNeo4jQueryProcessor:
"""Test cases for Neo4j query processor"""
@pytest.fixture
def processor(self):
"""Create a processor instance for testing"""
with patch('trustgraph.query.triples.neo4j.service.GraphDatabase'):
return Processor(
taskgroup=MagicMock(),
id='test-neo4j-query',
graph_host='bolt://localhost:7687'
)
def test_create_value_with_http_uri(self, processor):
"""Test create_value with HTTP URI"""
result = processor.create_value("http://example.com/resource")
assert isinstance(result, Value)
assert result.value == "http://example.com/resource"
assert result.is_uri is True
def test_create_value_with_https_uri(self, processor):
"""Test create_value with HTTPS URI"""
result = processor.create_value("https://example.com/resource")
assert isinstance(result, Value)
assert result.value == "https://example.com/resource"
assert result.is_uri is True
def test_create_value_with_literal(self, processor):
"""Test create_value with literal value"""
result = processor.create_value("just a literal string")
assert isinstance(result, Value)
assert result.value == "just a literal string"
assert result.is_uri is False
def test_create_value_with_empty_string(self, processor):
"""Test create_value with empty string"""
result = processor.create_value("")
assert isinstance(result, Value)
assert result.value == ""
assert result.is_uri is False
def test_create_value_with_partial_uri(self, processor):
"""Test create_value with string that looks like URI but isn't complete"""
result = processor.create_value("http")
assert isinstance(result, Value)
assert result.value == "http"
assert result.is_uri is False
def test_create_value_with_ftp_uri(self, processor):
"""Test create_value with FTP URI (should not be detected as URI)"""
result = processor.create_value("ftp://example.com/file")
assert isinstance(result, Value)
assert result.value == "ftp://example.com/file"
assert result.is_uri is False
@patch('trustgraph.query.triples.neo4j.service.GraphDatabase')
def test_processor_initialization_with_defaults(self, mock_graph_db):
"""Test processor initialization with default parameters"""
taskgroup_mock = MagicMock()
mock_driver = MagicMock()
mock_graph_db.driver.return_value = mock_driver
processor = Processor(taskgroup=taskgroup_mock)
assert processor.db == 'neo4j'
mock_graph_db.driver.assert_called_once_with(
'bolt://neo4j:7687',
auth=('neo4j', 'password')
)
@patch('trustgraph.query.triples.neo4j.service.GraphDatabase')
def test_processor_initialization_with_custom_params(self, mock_graph_db):
"""Test processor initialization with custom parameters"""
taskgroup_mock = MagicMock()
mock_driver = MagicMock()
mock_graph_db.driver.return_value = mock_driver
processor = Processor(
taskgroup=taskgroup_mock,
graph_host='bolt://custom:7687',
username='queryuser',
password='querypass',
database='customdb'
)
assert processor.db == 'customdb'
mock_graph_db.driver.assert_called_once_with(
'bolt://custom:7687',
auth=('queryuser', 'querypass')
)
@patch('trustgraph.query.triples.neo4j.service.GraphDatabase')
@pytest.mark.asyncio
async def test_query_triples_spo_query(self, mock_graph_db):
"""Test SPO query (all values specified)"""
taskgroup_mock = MagicMock()
mock_driver = MagicMock()
mock_graph_db.driver.return_value = mock_driver
# Mock query results - both queries return one record each
mock_records = [MagicMock()]
mock_driver.execute_query.return_value = (mock_records, None, None)
processor = Processor(taskgroup=taskgroup_mock)
# Create query request
query = TriplesQueryRequest(
user='test_user',
collection='test_collection',
s=Value(value="http://example.com/subject", is_uri=True),
p=Value(value="http://example.com/predicate", is_uri=True),
o=Value(value="literal object", is_uri=False),
limit=100
)
result = await processor.query_triples(query)
# Verify both literal and URI queries were executed
assert mock_driver.execute_query.call_count == 2
# Verify result contains the queried triple (appears twice - once from each query)
assert len(result) == 2
assert result[0].s.value == "http://example.com/subject"
assert result[0].p.value == "http://example.com/predicate"
assert result[0].o.value == "literal object"
@patch('trustgraph.query.triples.neo4j.service.GraphDatabase')
@pytest.mark.asyncio
async def test_query_triples_sp_query(self, mock_graph_db):
"""Test SP query (subject and predicate specified)"""
taskgroup_mock = MagicMock()
mock_driver = MagicMock()
mock_graph_db.driver.return_value = mock_driver
# Mock query results with different objects
mock_record1 = MagicMock()
mock_record1.data.return_value = {"dest": "literal result"}
mock_record2 = MagicMock()
mock_record2.data.return_value = {"dest": "http://example.com/uri_result"}
mock_driver.execute_query.side_effect = [
([mock_record1], None, None), # Literal query
([mock_record2], None, None) # URI query
]
processor = Processor(taskgroup=taskgroup_mock)
# Create query request
query = TriplesQueryRequest(
user='test_user',
collection='test_collection',
s=Value(value="http://example.com/subject", is_uri=True),
p=Value(value="http://example.com/predicate", is_uri=True),
o=None,
limit=100
)
result = await processor.query_triples(query)
# Verify both literal and URI queries were executed
assert mock_driver.execute_query.call_count == 2
# Verify results contain different objects
assert len(result) == 2
assert result[0].s.value == "http://example.com/subject"
assert result[0].p.value == "http://example.com/predicate"
assert result[0].o.value == "literal result"
assert result[1].s.value == "http://example.com/subject"
assert result[1].p.value == "http://example.com/predicate"
assert result[1].o.value == "http://example.com/uri_result"
@patch('trustgraph.query.triples.neo4j.service.GraphDatabase')
@pytest.mark.asyncio
async def test_query_triples_wildcard_query(self, mock_graph_db):
"""Test wildcard query (no constraints)"""
taskgroup_mock = MagicMock()
mock_driver = MagicMock()
mock_graph_db.driver.return_value = mock_driver
# Mock query results
mock_record1 = MagicMock()
mock_record1.data.return_value = {"src": "http://example.com/s1", "rel": "http://example.com/p1", "dest": "literal1"}
mock_record2 = MagicMock()
mock_record2.data.return_value = {"src": "http://example.com/s2", "rel": "http://example.com/p2", "dest": "http://example.com/o2"}
mock_driver.execute_query.side_effect = [
([mock_record1], None, None), # Literal query
([mock_record2], None, None) # URI query
]
processor = Processor(taskgroup=taskgroup_mock)
# Create query request
query = TriplesQueryRequest(
user='test_user',
collection='test_collection',
s=None,
p=None,
o=None,
limit=100
)
result = await processor.query_triples(query)
# Verify both literal and URI queries were executed
assert mock_driver.execute_query.call_count == 2
# Verify results contain different triples
assert len(result) == 2
assert result[0].s.value == "http://example.com/s1"
assert result[0].p.value == "http://example.com/p1"
assert result[0].o.value == "literal1"
assert result[1].s.value == "http://example.com/s2"
assert result[1].p.value == "http://example.com/p2"
assert result[1].o.value == "http://example.com/o2"
@patch('trustgraph.query.triples.neo4j.service.GraphDatabase')
@pytest.mark.asyncio
async def test_query_triples_exception_handling(self, mock_graph_db):
"""Test exception handling during query processing"""
taskgroup_mock = MagicMock()
mock_driver = MagicMock()
mock_graph_db.driver.return_value = mock_driver
# Mock execute_query to raise exception
mock_driver.execute_query.side_effect = Exception("Database connection failed")
processor = Processor(taskgroup=taskgroup_mock)
# Create query request
query = TriplesQueryRequest(
user='test_user',
collection='test_collection',
s=Value(value="http://example.com/subject", is_uri=True),
p=None,
o=None,
limit=100
)
# Should raise the exception
with pytest.raises(Exception, match="Database connection failed"):
await processor.query_triples(query)
def test_add_args_method(self):
"""Test that add_args properly configures argument parser"""
from argparse import ArgumentParser
from unittest.mock import patch
parser = ArgumentParser()
# Mock the parent class add_args method
with patch('trustgraph.query.triples.neo4j.service.TriplesQueryService.add_args') as mock_parent_add_args:
Processor.add_args(parser)
# Verify parent add_args was called
mock_parent_add_args.assert_called_once()
# Verify our specific arguments were added
# Parse empty args to check defaults
args = parser.parse_args([])
assert hasattr(args, 'graph_host')
assert args.graph_host == 'bolt://neo4j:7687'
assert hasattr(args, 'username')
assert args.username == 'neo4j'
assert hasattr(args, 'password')
assert args.password == 'password'
assert hasattr(args, 'database')
assert args.database == 'neo4j'
def test_add_args_with_custom_values(self):
"""Test add_args with custom command line values"""
from argparse import ArgumentParser
from unittest.mock import patch
parser = ArgumentParser()
with patch('trustgraph.query.triples.neo4j.service.TriplesQueryService.add_args'):
Processor.add_args(parser)
# Test parsing with custom values
args = parser.parse_args([
'--graph-host', 'bolt://custom:7687',
'--username', 'queryuser',
'--password', 'querypass',
'--database', 'querydb'
])
assert args.graph_host == 'bolt://custom:7687'
assert args.username == 'queryuser'
assert args.password == 'querypass'
assert args.database == 'querydb'
def test_add_args_short_form(self):
"""Test add_args with short form arguments"""
from argparse import ArgumentParser
from unittest.mock import patch
parser = ArgumentParser()
with patch('trustgraph.query.triples.neo4j.service.TriplesQueryService.add_args'):
Processor.add_args(parser)
# Test parsing with short form
args = parser.parse_args(['-g', 'bolt://short:7687'])
assert args.graph_host == 'bolt://short:7687'
@patch('trustgraph.query.triples.neo4j.service.Processor.launch')
def test_run_function(self, mock_launch):
"""Test the run function calls Processor.launch with correct parameters"""
from trustgraph.query.triples.neo4j.service import run, default_ident
run()
mock_launch.assert_called_once_with(
default_ident,
"\nTriples query service for neo4j.\nInput is a (s, p, o) triple, some values may be null. Output is a list of\ntriples.\n"
)