diff --git a/docs/tech-specs/cassandra-consolidation.md b/docs/tech-specs/cassandra-consolidation.md new file mode 100644 index 00000000..c22dca4d --- /dev/null +++ b/docs/tech-specs/cassandra-consolidation.md @@ -0,0 +1,331 @@ +# Tech Spec: Cassandra Configuration Consolidation + +**Status:** Draft +**Author:** Assistant +**Date:** 2024-09-03 + +## Overview + +This specification addresses the inconsistent naming and configuration patterns for Cassandra connection parameters across the TrustGraph codebase. Currently, two different parameter naming schemes exist (`cassandra_*` vs `graph_*`), leading to confusion and maintenance complexity. + +## Problem Statement + +The codebase currently uses two distinct sets of Cassandra configuration parameters: + +1. **Knowledge/Config/Library modules** use: + - `cassandra_host` (list of hosts) + - `cassandra_user` + - `cassandra_password` + +2. **Graph/Storage modules** use: + - `graph_host` (single host, sometimes converted to list) + - `graph_username` + - `graph_password` + +3. **Inconsistent command-line exposure**: + - Some processors (e.g., `kg-store`) don't expose Cassandra settings as command-line arguments + - Other processors expose them with different names and formats + - Help text doesn't reflect environment variable defaults + +Both parameter sets connect to the same Cassandra cluster but with different naming conventions, causing: +- Configuration confusion for users +- Increased maintenance burden +- Inconsistent documentation +- Potential for misconfiguration +- Inability to override settings via command-line in some processors + +## Proposed Solution + +### 1. Standardize Parameter Names + +All modules will use consistent `cassandra_*` parameter names: +- `cassandra_host` - List of hosts (internally stored as list) +- `cassandra_username` - Username for authentication +- `cassandra_password` - Password for authentication + +### 2. Command-Line Arguments + +All processors MUST expose Cassandra configuration via command-line arguments: +- `--cassandra-host` - Comma-separated list of hosts +- `--cassandra-username` - Username for authentication +- `--cassandra-password` - Password for authentication + +### 3. Environment Variable Fallback + +If command-line parameters are not explicitly provided, the system will check environment variables: +- `CASSANDRA_HOST` - Comma-separated list of hosts +- `CASSANDRA_USERNAME` - Username for authentication +- `CASSANDRA_PASSWORD` - Password for authentication + +### 4. Default Values + +If neither command-line parameters nor environment variables are specified: +- `cassandra_host` defaults to `["cassandra"]` +- `cassandra_username` defaults to `None` (no authentication) +- `cassandra_password` defaults to `None` (no authentication) + +### 5. Help Text Requirements + +The `--help` output must: +- Show environment variable values as defaults when set +- Never display password values (show `****` or `` instead) +- Clearly indicate the resolution order in help text + +Example help output: +``` +--cassandra-host HOST + Cassandra host list, comma-separated (default: prod-cluster-1,prod-cluster-2) + [from CASSANDRA_HOST environment variable] + +--cassandra-username USERNAME + Cassandra username (default: cassandra_user) + [from CASSANDRA_USERNAME environment variable] + +--cassandra-password PASSWORD + Cassandra password (default: ) +``` + +## Implementation Details + +### Parameter Resolution Order + +For each Cassandra parameter, the resolution order will be: +1. Command-line argument value +2. Environment variable (`CASSANDRA_*`) +3. Default value + +### Host Parameter Handling + +The `cassandra_host` parameter: +- Command-line accepts comma-separated string: `--cassandra-host "host1,host2,host3"` +- Environment variable accepts comma-separated string: `CASSANDRA_HOST="host1,host2,host3"` +- Internally always stored as list: `["host1", "host2", "host3"]` +- Single host: `"localhost"` → converted to `["localhost"]` +- Already a list: `["host1", "host2"]` → used as-is + +### Authentication Logic + +Authentication will be used when both `cassandra_username` and `cassandra_password` are provided: +```python +if cassandra_username and cassandra_password: + # Use SSL context and PlainTextAuthProvider +else: + # Connect without authentication +``` + +## Files to Modify + +### Modules using `graph_*` parameters (to be changed): +- `trustgraph-flow/trustgraph/storage/triples/cassandra/write.py` +- `trustgraph-flow/trustgraph/storage/objects/cassandra/write.py` +- `trustgraph-flow/trustgraph/storage/rows/cassandra/write.py` +- `trustgraph-flow/trustgraph/query/triples/cassandra/service.py` + +### Modules using `cassandra_*` parameters (to be updated with env fallback): +- `trustgraph-flow/trustgraph/tables/config.py` +- `trustgraph-flow/trustgraph/tables/knowledge.py` +- `trustgraph-flow/trustgraph/tables/library.py` +- `trustgraph-flow/trustgraph/storage/knowledge/store.py` +- `trustgraph-flow/trustgraph/cores/knowledge.py` +- `trustgraph-flow/trustgraph/librarian/librarian.py` +- `trustgraph-flow/trustgraph/librarian/service.py` +- `trustgraph-flow/trustgraph/config/service/service.py` +- `trustgraph-flow/trustgraph/cores/service.py` + +### Test Files to Update: +- `tests/unit/test_cores/test_knowledge_manager.py` +- `tests/unit/test_storage/test_triples_cassandra_storage.py` +- `tests/unit/test_query/test_triples_cassandra_query.py` +- `tests/integration/test_objects_cassandra_integration.py` + +## Implementation Strategy + +### Phase 1: Create Common Configuration Helper +Create utility functions to standardize Cassandra configuration across all processors: + +```python +import os +import argparse + +def get_cassandra_defaults(): + """Get default values from environment variables or fallback.""" + return { + 'host': os.getenv('CASSANDRA_HOST', 'cassandra'), + 'username': os.getenv('CASSANDRA_USERNAME'), + 'password': os.getenv('CASSANDRA_PASSWORD') + } + +def add_cassandra_args(parser: argparse.ArgumentParser): + """ + Add standardized Cassandra arguments to an argument parser. + Shows environment variable values in help text. + """ + defaults = get_cassandra_defaults() + + # Format help text with env var indication + host_help = f"Cassandra host list, comma-separated (default: {defaults['host']})" + if 'CASSANDRA_HOST' in os.environ: + host_help += " [from CASSANDRA_HOST]" + + username_help = f"Cassandra username" + if defaults['username']: + username_help += f" (default: {defaults['username']})" + if 'CASSANDRA_USERNAME' in os.environ: + username_help += " [from CASSANDRA_USERNAME]" + + password_help = "Cassandra password" + if defaults['password']: + password_help += " (default: )" + if 'CASSANDRA_PASSWORD' in os.environ: + password_help += " [from CASSANDRA_PASSWORD]" + + parser.add_argument( + '--cassandra-host', + default=defaults['host'], + help=host_help + ) + + parser.add_argument( + '--cassandra-username', + default=defaults['username'], + help=username_help + ) + + parser.add_argument( + '--cassandra-password', + default=defaults['password'], + help=password_help + ) + +def resolve_cassandra_config(args) -> tuple[list[str], str|None, str|None]: + """ + Convert argparse args to Cassandra configuration. + + Returns: + tuple: (hosts_list, username, password) + """ + # Convert host string to list + if isinstance(args.cassandra_host, str): + hosts = [h.strip() for h in args.cassandra_host.split(',')] + else: + hosts = args.cassandra_host + + return hosts, args.cassandra_username, args.cassandra_password +``` + +### Phase 2: Update Modules Using `graph_*` Parameters +1. Change parameter names from `graph_*` to `cassandra_*` +2. Replace custom `add_args()` methods with standardized `add_cassandra_args()` +3. Use the common configuration helper functions +4. Update documentation strings + +Example transformation: +```python +# OLD CODE +@staticmethod +def add_args(parser): + parser.add_argument( + '-g', '--graph-host', + default="localhost", + help=f'Graph host (default: localhost)' + ) + parser.add_argument( + '--graph-username', + default=None, + help=f'Cassandra username' + ) + +# NEW CODE +@staticmethod +def add_args(parser): + FlowProcessor.add_args(parser) + add_cassandra_args(parser) # Use standard helper +``` + +### Phase 3: Update Modules Using `cassandra_*` Parameters +1. Add command-line argument support where missing (e.g., `kg-store`) +2. Replace existing argument definitions with `add_cassandra_args()` +3. Use `resolve_cassandra_config()` for consistent resolution +4. Ensure consistent host list handling + +### Phase 4: Update Tests and Documentation +1. Update all test files to use new parameter names +2. Update CLI documentation +3. Update API documentation +4. Add environment variable documentation + +## Backward Compatibility + +To maintain backward compatibility during transition: + +1. **Deprecation warnings** for `graph_*` parameters +2. **Parameter aliasing** - accept both old and new names initially +3. **Phased rollout** over multiple releases +4. **Documentation updates** with migration guide + +Example backward compatibility code: +```python +def __init__(self, **params): + # Handle deprecated graph_* parameters + if 'graph_host' in params: + warnings.warn("graph_host is deprecated, use cassandra_host", DeprecationWarning) + params.setdefault('cassandra_host', params.pop('graph_host')) + + if 'graph_username' in params: + warnings.warn("graph_username is deprecated, use cassandra_username", DeprecationWarning) + params.setdefault('cassandra_username', params.pop('graph_username')) + + # ... continue with standard resolution +``` + +## Testing Strategy + +1. **Unit tests** for configuration resolution logic +2. **Integration tests** with various configuration combinations +3. **Environment variable tests** +4. **Backward compatibility tests** with deprecated parameters +5. **Docker compose tests** with environment variables + +## Documentation Updates + +1. Update all CLI command documentation +2. Update API documentation +3. Create migration guide +4. Update Docker compose examples +5. Update configuration reference documentation + +## Risks and Mitigation + +| Risk | Impact | Mitigation | +|------|--------|------------| +| Breaking changes for users | High | Implement backward compatibility period | +| Configuration confusion during transition | Medium | Clear documentation and deprecation warnings | +| Test failures | Medium | Comprehensive test updates | +| Docker deployment issues | High | Update all Docker compose examples | + +## Success Criteria + +- [ ] All modules use consistent `cassandra_*` parameter names +- [ ] All processors expose Cassandra settings via command-line arguments +- [ ] Command-line help text shows environment variable defaults +- [ ] Password values are never displayed in help text +- [ ] Environment variable fallback works correctly +- [ ] `cassandra_host` is consistently handled as a list internally +- [ ] Backward compatibility maintained for at least 2 releases +- [ ] All tests pass with new configuration system +- [ ] Documentation fully updated +- [ ] Docker compose examples work with environment variables + +## Timeline + +- **Week 1:** Implement common configuration helper and update `graph_*` modules +- **Week 2:** Add environment variable support to existing `cassandra_*` modules +- **Week 3:** Update tests and documentation +- **Week 4:** Integration testing and bug fixes + +## Future Considerations + +- Consider extending this pattern to other database configurations (e.g., Elasticsearch) +- Implement configuration validation and better error messages +- Add support for Cassandra connection pooling configuration +- Consider adding configuration file support (.env files) \ No newline at end of file diff --git a/tests/integration/test_cassandra_config_end_to_end.py b/tests/integration/test_cassandra_config_end_to_end.py new file mode 100644 index 00000000..17a59706 --- /dev/null +++ b/tests/integration/test_cassandra_config_end_to_end.py @@ -0,0 +1,453 @@ +""" +End-to-end integration tests for Cassandra configuration. + +Tests complete configuration flow from environment variables +through processors to Cassandra connections. +""" + +import os +import pytest +from unittest.mock import Mock, patch, MagicMock, call +from argparse import ArgumentParser + +# Import processors that use Cassandra configuration +from trustgraph.storage.triples.cassandra.write import Processor as TriplesWriter +from trustgraph.storage.objects.cassandra.write import Processor as ObjectsWriter +from trustgraph.query.triples.cassandra.service import Processor as TriplesQuery +from trustgraph.storage.knowledge.store import Processor as KgStore + + +class TestEndToEndConfigurationFlow: + """Test complete configuration flow from environment to processors.""" + + @pytest.mark.asyncio + @patch('trustgraph.direct.cassandra.Cluster') + async def test_triples_writer_env_to_connection(self, mock_cluster): + """Test complete flow from environment variables to TrustGraph connection.""" + env_vars = { + 'CASSANDRA_HOST': 'integration-host1,integration-host2,integration-host3', + 'CASSANDRA_USERNAME': 'integration-user', + 'CASSANDRA_PASSWORD': 'integration-pass' + } + + mock_cluster_instance = MagicMock() + mock_session = MagicMock() + mock_cluster_instance.connect.return_value = mock_session + mock_cluster.return_value = mock_cluster_instance + + with patch.dict(os.environ, env_vars, clear=True): + processor = TriplesWriter(taskgroup=MagicMock()) + + # Create a mock message to trigger TrustGraph creation + mock_message = MagicMock() + mock_message.metadata.user = 'test_user' + mock_message.metadata.collection = 'test_collection' + mock_message.triples = [] + + # This should create TrustGraph with environment config + await processor.store_triples(mock_message) + + # Verify Cluster was created with correct hosts + mock_cluster.assert_called_once() + call_args = mock_cluster.call_args + assert call_args.args[0] == ['integration-host1', 'integration-host2', 'integration-host3'] + assert 'auth_provider' in call_args.kwargs # Should have auth since credentials provided + + @patch('trustgraph.storage.objects.cassandra.write.Cluster') + @patch('trustgraph.storage.objects.cassandra.write.PlainTextAuthProvider') + def test_objects_writer_env_to_cluster_connection(self, mock_auth_provider, mock_cluster): + """Test complete flow from environment variables to Cassandra Cluster connection.""" + env_vars = { + 'CASSANDRA_HOST': 'obj-host1,obj-host2', + 'CASSANDRA_USERNAME': 'obj-user', + 'CASSANDRA_PASSWORD': 'obj-pass' + } + + mock_auth_instance = MagicMock() + mock_auth_provider.return_value = mock_auth_instance + mock_cluster_instance = MagicMock() + mock_session = MagicMock() + mock_cluster_instance.connect.return_value = mock_session + mock_cluster.return_value = mock_cluster_instance + + with patch.dict(os.environ, env_vars, clear=True): + processor = ObjectsWriter(taskgroup=MagicMock()) + + # Trigger Cassandra connection + processor.connect_cassandra() + + # Verify auth provider was created with env vars + mock_auth_provider.assert_called_once_with( + username='obj-user', + password='obj-pass' + ) + + # Verify cluster was created with hosts from env and auth + mock_cluster.assert_called_once() + call_args = mock_cluster.call_args + assert call_args.kwargs['contact_points'] == ['obj-host1', 'obj-host2'] + assert call_args.kwargs['auth_provider'] == mock_auth_instance + + @pytest.mark.asyncio + @patch('trustgraph.storage.knowledge.store.KnowledgeTableStore') + async def test_kg_store_env_to_table_store(self, mock_table_store): + """Test complete flow from environment variables to KnowledgeTableStore.""" + env_vars = { + 'CASSANDRA_HOST': 'kg-host1,kg-host2,kg-host3,kg-host4', + 'CASSANDRA_USERNAME': 'kg-user', + 'CASSANDRA_PASSWORD': 'kg-pass' + } + + mock_store_instance = MagicMock() + mock_table_store.return_value = mock_store_instance + + with patch.dict(os.environ, env_vars, clear=True): + processor = KgStore(taskgroup=MagicMock()) + + # Verify KnowledgeTableStore was created with env config + mock_table_store.assert_called_once_with( + cassandra_host=['kg-host1', 'kg-host2', 'kg-host3', 'kg-host4'], + cassandra_user='kg-user', + cassandra_password='kg-pass', + keyspace='knowledge' + ) + + +class TestConfigurationPriorityEndToEnd: + """Test configuration priority chains end-to-end.""" + + @pytest.mark.asyncio + @patch('trustgraph.direct.cassandra.Cluster') + async def test_cli_override_env_end_to_end(self, mock_cluster): + """Test that CLI parameters override environment variables end-to-end.""" + env_vars = { + 'CASSANDRA_HOST': 'env-host', + 'CASSANDRA_USERNAME': 'env-user', + 'CASSANDRA_PASSWORD': 'env-pass' + } + + mock_cluster_instance = MagicMock() + mock_session = MagicMock() + mock_cluster_instance.connect.return_value = mock_session + mock_cluster.return_value = mock_cluster_instance + + with patch.dict(os.environ, env_vars, clear=True): + # CLI parameters should override environment + processor = TriplesWriter( + taskgroup=MagicMock(), + cassandra_host='cli-host1,cli-host2', + cassandra_username='cli-user', + cassandra_password='cli-pass' + ) + + # Trigger TrustGraph creation + mock_message = MagicMock() + mock_message.metadata.user = 'test_user' + mock_message.metadata.collection = 'test_collection' + mock_message.triples = [] + + await processor.store_triples(mock_message) + + # Should use CLI parameters, not environment + mock_cluster.assert_called_once() + call_args = mock_cluster.call_args + assert call_args.args[0] == ['cli-host1', 'cli-host2'] # From CLI + assert 'auth_provider' in call_args.kwargs # Should have auth since credentials provided + + @pytest.mark.asyncio + @patch('trustgraph.storage.knowledge.store.KnowledgeTableStore') + async def test_partial_cli_with_env_fallback_end_to_end(self, mock_table_store): + """Test partial CLI parameters with environment fallback end-to-end.""" + env_vars = { + 'CASSANDRA_HOST': 'fallback-host1,fallback-host2', + 'CASSANDRA_USERNAME': 'fallback-user', + 'CASSANDRA_PASSWORD': 'fallback-pass' + } + + mock_store_instance = MagicMock() + mock_table_store.return_value = mock_store_instance + + with patch.dict(os.environ, env_vars, clear=True): + # Only provide host via parameter, rest should fall back to env + processor = KgStore( + taskgroup=MagicMock(), + cassandra_host='partial-host' + # username and password not provided - should use env + ) + + # Verify mixed configuration + mock_table_store.assert_called_once_with( + cassandra_host=['partial-host'], # From parameter + cassandra_user='fallback-user', # From environment + cassandra_password='fallback-pass', # From environment + keyspace='knowledge' + ) + + @pytest.mark.asyncio + @patch('trustgraph.direct.cassandra.Cluster') + async def test_no_config_defaults_end_to_end(self, mock_cluster): + """Test that defaults are used when no configuration provided end-to-end.""" + mock_cluster_instance = MagicMock() + mock_session = MagicMock() + mock_cluster_instance.connect.return_value = mock_session + mock_cluster.return_value = mock_cluster_instance + + with patch.dict(os.environ, {}, clear=True): + processor = TriplesQuery(taskgroup=MagicMock()) + + # Mock query to trigger TrustGraph creation + mock_query = MagicMock() + mock_query.user = 'default_user' + mock_query.collection = 'default_collection' + mock_query.s = None + mock_query.p = None + mock_query.o = None + mock_query.limit = 100 + + # Mock the get_all method to return empty list + mock_tg_instance = MagicMock() + mock_tg_instance.get_all.return_value = [] + processor.tg = mock_tg_instance + + await processor.query_triples(mock_query) + + # Should use defaults + mock_cluster.assert_called_once() + call_args = mock_cluster.call_args + assert call_args.args[0] == ['cassandra'] # Default host + assert 'auth_provider' not in call_args.kwargs # No auth with default config + + +class TestBackwardCompatibilityEndToEnd: + """Test backward compatibility with old parameter names end-to-end.""" + + @pytest.mark.asyncio + @patch('trustgraph.direct.cassandra.Cluster') + async def test_old_graph_params_still_work_end_to_end(self, mock_cluster): + """Test that old graph_* parameters still work end-to-end.""" + mock_cluster_instance = MagicMock() + mock_session = MagicMock() + mock_cluster_instance.connect.return_value = mock_session + mock_cluster.return_value = mock_cluster_instance + + # Use old parameter names + processor = TriplesWriter( + taskgroup=MagicMock(), + graph_host='legacy-host', + graph_username='legacy-user', + graph_password='legacy-pass' + ) + + # Trigger TrustGraph creation + mock_message = MagicMock() + mock_message.metadata.user = 'legacy_user' + mock_message.metadata.collection = 'legacy_collection' + mock_message.triples = [] + + await processor.store_triples(mock_message) + + # Should work with legacy parameters + mock_cluster.assert_called_once() + call_args = mock_cluster.call_args + assert call_args.args[0] == ['legacy-host'] + assert 'auth_provider' in call_args.kwargs # Should have auth since credentials provided + + @patch('trustgraph.storage.knowledge.store.KnowledgeTableStore') + def test_old_cassandra_user_param_still_works_end_to_end(self, mock_table_store): + """Test that old cassandra_user parameter still works end-to-end.""" + mock_store_instance = MagicMock() + mock_table_store.return_value = mock_store_instance + + # Use old cassandra_user parameter + processor = KgStore( + taskgroup=MagicMock(), + cassandra_host='legacy-kg-host', + cassandra_user='legacy-kg-user', # Old parameter name + cassandra_password='legacy-kg-pass' + ) + + # Should work with old parameter name + mock_table_store.assert_called_once_with( + cassandra_host=['legacy-kg-host'], + cassandra_user='legacy-kg-user', + cassandra_password='legacy-kg-pass', + keyspace='knowledge' + ) + + @pytest.mark.asyncio + @patch('trustgraph.direct.cassandra.Cluster') + async def test_new_params_override_old_params_end_to_end(self, mock_cluster): + """Test that new parameters override old ones when both are present end-to-end.""" + mock_cluster_instance = MagicMock() + mock_session = MagicMock() + mock_cluster_instance.connect.return_value = mock_session + mock_cluster.return_value = mock_cluster_instance + + # Provide both old and new parameters + processor = TriplesWriter( + taskgroup=MagicMock(), + cassandra_host='new-host', + graph_host='old-host', # Should be ignored + cassandra_username='new-user', + graph_username='old-user', # Should be ignored + cassandra_password='new-pass', + graph_password='old-pass' # Should be ignored + ) + + # Trigger TrustGraph creation + mock_message = MagicMock() + mock_message.metadata.user = 'precedence_user' + mock_message.metadata.collection = 'precedence_collection' + mock_message.triples = [] + + await processor.store_triples(mock_message) + + # Should use new parameters, not old ones + mock_cluster.assert_called_once() + call_args = mock_cluster.call_args + assert call_args.args[0] == ['new-host'] # New parameter wins + assert 'auth_provider' in call_args.kwargs # Should have auth since credentials provided + + +class TestMultipleHostsHandling: + """Test multiple Cassandra hosts handling end-to-end.""" + + @patch('trustgraph.storage.objects.cassandra.write.Cluster') + def test_multiple_hosts_passed_to_cluster(self, mock_cluster): + """Test that multiple hosts are correctly passed to Cassandra cluster.""" + env_vars = { + 'CASSANDRA_HOST': 'host1,host2,host3,host4,host5' + } + + mock_cluster_instance = MagicMock() + mock_session = MagicMock() + mock_cluster_instance.connect.return_value = mock_session + mock_cluster.return_value = mock_cluster_instance + + with patch.dict(os.environ, env_vars, clear=True): + processor = ObjectsWriter(taskgroup=MagicMock()) + processor.connect_cassandra() + + # Verify all hosts were passed to Cluster + mock_cluster.assert_called_once() + call_args = mock_cluster.call_args + assert call_args.kwargs['contact_points'] == ['host1', 'host2', 'host3', 'host4', 'host5'] + + @pytest.mark.asyncio + @patch('trustgraph.direct.cassandra.Cluster') + async def test_single_host_converted_to_list(self, mock_cluster): + """Test that single host is converted to list for TrustGraph.""" + mock_cluster_instance = MagicMock() + mock_session = MagicMock() + mock_cluster_instance.connect.return_value = mock_session + mock_cluster.return_value = mock_cluster_instance + + processor = TriplesWriter(taskgroup=MagicMock(), cassandra_host='single-host') + + # Trigger TrustGraph creation + mock_message = MagicMock() + mock_message.metadata.user = 'single_user' + mock_message.metadata.collection = 'single_collection' + mock_message.triples = [] + + await processor.store_triples(mock_message) + + # Single host should be converted to list + mock_cluster.assert_called_once() + call_args = mock_cluster.call_args + assert call_args.args[0] == ['single-host'] # Converted to list + assert 'auth_provider' not in call_args.kwargs # No auth since no credentials provided + + def test_whitespace_handling_in_host_list(self): + """Test that whitespace in host lists is handled correctly.""" + from trustgraph.base.cassandra_config import resolve_cassandra_config + + # Test various whitespace scenarios + hosts1, _, _ = resolve_cassandra_config(host='host1, host2 , host3') + assert hosts1 == ['host1', 'host2', 'host3'] + + hosts2, _, _ = resolve_cassandra_config(host='host1,host2,host3,') + assert hosts2 == ['host1', 'host2', 'host3'] + + hosts3, _, _ = resolve_cassandra_config(host=' host1 , host2 ') + assert hosts3 == ['host1', 'host2'] + + +class TestAuthenticationFlow: + """Test authentication configuration flow end-to-end.""" + + @patch('trustgraph.storage.objects.cassandra.write.Cluster') + @patch('trustgraph.storage.objects.cassandra.write.PlainTextAuthProvider') + def test_authentication_enabled_when_both_credentials_provided(self, mock_auth_provider, mock_cluster): + """Test that authentication is enabled when both username and password are provided.""" + env_vars = { + 'CASSANDRA_HOST': 'auth-host', + 'CASSANDRA_USERNAME': 'auth-user', + 'CASSANDRA_PASSWORD': 'auth-secret' + } + + mock_auth_instance = MagicMock() + mock_auth_provider.return_value = mock_auth_instance + mock_cluster_instance = MagicMock() + mock_cluster.return_value = mock_cluster_instance + + with patch.dict(os.environ, env_vars, clear=True): + processor = ObjectsWriter(taskgroup=MagicMock()) + processor.connect_cassandra() + + # Auth provider should be created + mock_auth_provider.assert_called_once_with( + username='auth-user', + password='auth-secret' + ) + + # Cluster should be created with auth provider + call_args = mock_cluster.call_args + assert 'auth_provider' in call_args.kwargs + assert call_args.kwargs['auth_provider'] == mock_auth_instance + + @patch('trustgraph.storage.objects.cassandra.write.Cluster') + @patch('trustgraph.storage.objects.cassandra.write.PlainTextAuthProvider') + def test_no_authentication_when_credentials_missing(self, mock_auth_provider, mock_cluster): + """Test that authentication is not used when credentials are missing.""" + env_vars = { + 'CASSANDRA_HOST': 'no-auth-host' + # No username/password + } + + mock_cluster_instance = MagicMock() + mock_cluster.return_value = mock_cluster_instance + + with patch.dict(os.environ, env_vars, clear=True): + processor = ObjectsWriter(taskgroup=MagicMock()) + processor.connect_cassandra() + + # Auth provider should not be created + mock_auth_provider.assert_not_called() + + # Cluster should be created without auth provider + call_args = mock_cluster.call_args + assert 'auth_provider' not in call_args.kwargs + + @patch('trustgraph.storage.objects.cassandra.write.Cluster') + @patch('trustgraph.storage.objects.cassandra.write.PlainTextAuthProvider') + def test_no_authentication_when_only_username_provided(self, mock_auth_provider, mock_cluster): + """Test that authentication is not used when only username is provided.""" + processor = ObjectsWriter( + taskgroup=MagicMock(), + cassandra_host='partial-auth-host', + cassandra_username='partial-user' + # No password + ) + + mock_cluster_instance = MagicMock() + mock_cluster.return_value = mock_cluster_instance + + processor.connect_cassandra() + + # Auth provider should not be created (needs both username AND password) + mock_auth_provider.assert_not_called() + + # Cluster should be created without auth provider + call_args = mock_cluster.call_args + assert 'auth_provider' not in call_args.kwargs \ No newline at end of file diff --git a/tests/unit/test_base/test_cassandra_config.py b/tests/unit/test_base/test_cassandra_config.py new file mode 100644 index 00000000..a3579462 --- /dev/null +++ b/tests/unit/test_base/test_cassandra_config.py @@ -0,0 +1,411 @@ +""" +Unit tests for Cassandra configuration helper module. + +Tests configuration resolution, environment variable handling, +command-line argument parsing, and backward compatibility. +""" + +import argparse +import os +import pytest +from unittest.mock import patch + +from trustgraph.base.cassandra_config import ( + get_cassandra_defaults, + add_cassandra_args, + resolve_cassandra_config, + get_cassandra_config_from_params +) + + +class TestGetCassandraDefaults: + """Test the get_cassandra_defaults function.""" + + def test_defaults_with_no_env_vars(self): + """Test defaults when no environment variables are set.""" + with patch.dict(os.environ, {}, clear=True): + defaults = get_cassandra_defaults() + + assert defaults['host'] == 'cassandra' + assert defaults['username'] is None + assert defaults['password'] is None + + def test_defaults_with_env_vars(self): + """Test defaults when environment variables are set.""" + env_vars = { + 'CASSANDRA_HOST': 'env-host1,env-host2', + 'CASSANDRA_USERNAME': 'env-user', + 'CASSANDRA_PASSWORD': 'env-pass' + } + + with patch.dict(os.environ, env_vars, clear=True): + defaults = get_cassandra_defaults() + + assert defaults['host'] == 'env-host1,env-host2' + assert defaults['username'] == 'env-user' + assert defaults['password'] == 'env-pass' + + def test_partial_env_vars(self): + """Test defaults when only some environment variables are set.""" + env_vars = { + 'CASSANDRA_HOST': 'partial-host', + 'CASSANDRA_USERNAME': 'partial-user' + # CASSANDRA_PASSWORD not set + } + + with patch.dict(os.environ, env_vars, clear=True): + defaults = get_cassandra_defaults() + + assert defaults['host'] == 'partial-host' + assert defaults['username'] == 'partial-user' + assert defaults['password'] is None + + +class TestAddCassandraArgs: + """Test the add_cassandra_args function.""" + + def test_basic_args_added(self): + """Test that all three arguments are added to parser.""" + parser = argparse.ArgumentParser() + add_cassandra_args(parser) + + # Parse empty args to check defaults + args = parser.parse_args([]) + + assert hasattr(args, 'cassandra_host') + assert hasattr(args, 'cassandra_username') + assert hasattr(args, 'cassandra_password') + + def test_help_text_no_env_vars(self): + """Test help text when no environment variables are set.""" + with patch.dict(os.environ, {}, clear=True): + parser = argparse.ArgumentParser() + add_cassandra_args(parser) + + help_text = parser.format_help() + + assert 'Cassandra host list, comma-separated (default:' in help_text + assert 'cassandra)' in help_text + assert 'Cassandra username' in help_text + assert 'Cassandra password' in help_text + assert '[from CASSANDRA_HOST]' not in help_text + + def test_help_text_with_env_vars(self): + """Test help text when environment variables are set.""" + env_vars = { + 'CASSANDRA_HOST': 'help-host1,help-host2', + 'CASSANDRA_USERNAME': 'help-user', + 'CASSANDRA_PASSWORD': 'help-pass' + } + + with patch.dict(os.environ, env_vars, clear=True): + parser = argparse.ArgumentParser() + add_cassandra_args(parser) + + help_text = parser.format_help() + + # Help text may have line breaks - argparse breaks long lines + # So check for the components that should be there + assert 'help-' in help_text and 'host1' in help_text + assert 'help-host2' in help_text + # Check key components (may be split across lines by argparse) + assert '[from CASSANDRA_HOST]' in help_text + assert '(default: help-user)' in help_text + assert '[from' in help_text and 'CASSANDRA_USERNAME]' in help_text + assert '(default: )' in help_text # Password hidden + assert '[from' in help_text and 'CASSANDRA_PASSWORD]' in help_text + assert 'help-pass' not in help_text # Password value not shown + + def test_command_line_override(self): + """Test that command-line arguments override environment variables.""" + env_vars = { + 'CASSANDRA_HOST': 'env-host', + 'CASSANDRA_USERNAME': 'env-user', + 'CASSANDRA_PASSWORD': 'env-pass' + } + + with patch.dict(os.environ, env_vars, clear=True): + parser = argparse.ArgumentParser() + add_cassandra_args(parser) + + args = parser.parse_args([ + '--cassandra-host', 'cli-host', + '--cassandra-username', 'cli-user', + '--cassandra-password', 'cli-pass' + ]) + + assert args.cassandra_host == 'cli-host' + assert args.cassandra_username == 'cli-user' + assert args.cassandra_password == 'cli-pass' + + +class TestResolveCassandraConfig: + """Test the resolve_cassandra_config function.""" + + def test_default_configuration(self): + """Test resolution with no parameters or environment variables.""" + with patch.dict(os.environ, {}, clear=True): + hosts, username, password = resolve_cassandra_config() + + assert hosts == ['cassandra'] + assert username is None + assert password is None + + def test_environment_variable_resolution(self): + """Test resolution from environment variables.""" + env_vars = { + 'CASSANDRA_HOST': 'env1,env2,env3', + 'CASSANDRA_USERNAME': 'env-user', + 'CASSANDRA_PASSWORD': 'env-pass' + } + + with patch.dict(os.environ, env_vars, clear=True): + hosts, username, password = resolve_cassandra_config() + + assert hosts == ['env1', 'env2', 'env3'] + assert username == 'env-user' + assert password == 'env-pass' + + def test_explicit_parameter_override(self): + """Test that explicit parameters override environment variables.""" + env_vars = { + 'CASSANDRA_HOST': 'env-host', + 'CASSANDRA_USERNAME': 'env-user', + 'CASSANDRA_PASSWORD': 'env-pass' + } + + with patch.dict(os.environ, env_vars, clear=True): + hosts, username, password = resolve_cassandra_config( + host='explicit-host', + username='explicit-user', + password='explicit-pass' + ) + + assert hosts == ['explicit-host'] + assert username == 'explicit-user' + assert password == 'explicit-pass' + + def test_host_list_parsing(self): + """Test different host list formats.""" + # Single host + hosts, _, _ = resolve_cassandra_config(host='single-host') + assert hosts == ['single-host'] + + # Multiple hosts with spaces + hosts, _, _ = resolve_cassandra_config(host='host1, host2 ,host3') + assert hosts == ['host1', 'host2', 'host3'] + + # Empty elements filtered out + hosts, _, _ = resolve_cassandra_config(host='host1,,host2,') + assert hosts == ['host1', 'host2'] + + # Already a list + hosts, _, _ = resolve_cassandra_config(host=['list-host1', 'list-host2']) + assert hosts == ['list-host1', 'list-host2'] + + def test_args_object_resolution(self): + """Test resolution from argparse args object.""" + # Mock args object + class MockArgs: + cassandra_host = 'args-host1,args-host2' + cassandra_username = 'args-user' + cassandra_password = 'args-pass' + + args = MockArgs() + hosts, username, password = resolve_cassandra_config(args) + + assert hosts == ['args-host1', 'args-host2'] + assert username == 'args-user' + assert password == 'args-pass' + + def test_partial_args_with_env_fallback(self): + """Test args object with missing attributes falls back to environment.""" + env_vars = { + 'CASSANDRA_HOST': 'env-host', + 'CASSANDRA_USERNAME': 'env-user', + 'CASSANDRA_PASSWORD': 'env-pass' + } + + # Args object with only some attributes + class PartialArgs: + cassandra_host = 'args-host' + # Missing cassandra_username and cassandra_password + + with patch.dict(os.environ, env_vars, clear=True): + args = PartialArgs() + hosts, username, password = resolve_cassandra_config(args) + + assert hosts == ['args-host'] # From args + assert username == 'env-user' # From env + assert password == 'env-pass' # From env + + +class TestGetCassandraConfigFromParams: + """Test the get_cassandra_config_from_params function.""" + + def test_new_parameter_names(self): + """Test with new cassandra_* parameter names.""" + params = { + 'cassandra_host': 'new-host1,new-host2', + 'cassandra_username': 'new-user', + 'cassandra_password': 'new-pass' + } + + hosts, username, password = get_cassandra_config_from_params(params) + + assert hosts == ['new-host1', 'new-host2'] + assert username == 'new-user' + assert password == 'new-pass' + + def test_backward_compatibility_graph_params(self): + """Test backward compatibility with old graph_* parameter names.""" + params = { + 'graph_host': 'old-host', + 'graph_username': 'old-user', + 'graph_password': 'old-pass' + } + + hosts, username, password = get_cassandra_config_from_params(params) + + assert hosts == ['old-host'] + assert username == 'old-user' + assert password == 'old-pass' + + def test_old_cassandra_user_compatibility(self): + """Test backward compatibility with cassandra_user (vs cassandra_username).""" + params = { + 'cassandra_host': 'compat-host', + 'cassandra_user': 'compat-user', # Old name + 'cassandra_password': 'compat-pass' + } + + hosts, username, password = get_cassandra_config_from_params(params) + + assert hosts == ['compat-host'] + assert username == 'compat-user' + assert password == 'compat-pass' + + def test_parameter_precedence(self): + """Test that new parameter names take precedence over old ones.""" + params = { + 'cassandra_host': 'new-host', + 'graph_host': 'old-host', + 'cassandra_username': 'new-user', + 'graph_username': 'old-user', + 'cassandra_user': 'older-user', + 'cassandra_password': 'new-pass', + 'graph_password': 'old-pass' + } + + hosts, username, password = get_cassandra_config_from_params(params) + + assert hosts == ['new-host'] # New takes precedence + assert username == 'new-user' # New takes precedence + assert password == 'new-pass' # New takes precedence + + def test_empty_params_with_env_fallback(self): + """Test that empty params falls back to environment variables.""" + env_vars = { + 'CASSANDRA_HOST': 'fallback-host1,fallback-host2', + 'CASSANDRA_USERNAME': 'fallback-user', + 'CASSANDRA_PASSWORD': 'fallback-pass' + } + + with patch.dict(os.environ, env_vars, clear=True): + params = {} + hosts, username, password = get_cassandra_config_from_params(params) + + assert hosts == ['fallback-host1', 'fallback-host2'] + assert username == 'fallback-user' + assert password == 'fallback-pass' + + +class TestConfigurationPriority: + """Test the overall configuration priority: CLI > env vars > defaults.""" + + def test_full_priority_chain(self): + """Test complete priority chain with all sources present.""" + env_vars = { + 'CASSANDRA_HOST': 'env-host', + 'CASSANDRA_USERNAME': 'env-user', + 'CASSANDRA_PASSWORD': 'env-pass' + } + + with patch.dict(os.environ, env_vars, clear=True): + # CLI args should override everything + hosts, username, password = resolve_cassandra_config( + host='cli-host', + username='cli-user', + password='cli-pass' + ) + + assert hosts == ['cli-host'] + assert username == 'cli-user' + assert password == 'cli-pass' + + def test_partial_cli_with_env_fallback(self): + """Test partial CLI args with environment variable fallback.""" + env_vars = { + 'CASSANDRA_HOST': 'env-host', + 'CASSANDRA_USERNAME': 'env-user', + 'CASSANDRA_PASSWORD': 'env-pass' + } + + with patch.dict(os.environ, env_vars, clear=True): + # Only provide host via CLI + hosts, username, password = resolve_cassandra_config( + host='cli-host' + # username and password not provided + ) + + assert hosts == ['cli-host'] # From CLI + assert username == 'env-user' # From env + assert password == 'env-pass' # From env + + def test_no_config_defaults(self): + """Test that defaults are used when no configuration is provided.""" + with patch.dict(os.environ, {}, clear=True): + hosts, username, password = resolve_cassandra_config() + + assert hosts == ['cassandra'] # Default + assert username is None # Default + assert password is None # Default + + +class TestEdgeCases: + """Test edge cases and error conditions.""" + + def test_empty_host_string(self): + """Test handling of empty host string falls back to default.""" + hosts, _, _ = resolve_cassandra_config(host='') + assert hosts == ['cassandra'] # Falls back to default + + def test_whitespace_only_host(self): + """Test handling of whitespace-only host string.""" + hosts, _, _ = resolve_cassandra_config(host=' ') + assert hosts == [] # Empty after stripping whitespace + + def test_none_values_preserved(self): + """Test that None values are preserved correctly.""" + hosts, username, password = resolve_cassandra_config( + host=None, + username=None, + password=None + ) + + # Should fall back to defaults + assert hosts == ['cassandra'] + assert username is None + assert password is None + + def test_mixed_none_and_values(self): + """Test mixing None and actual values.""" + hosts, username, password = resolve_cassandra_config( + host='mixed-host', + username=None, + password='mixed-pass' + ) + + assert hosts == ['mixed-host'] + assert username is None # Stays None + assert password == 'mixed-pass' \ No newline at end of file diff --git a/tests/unit/test_query/test_triples_cassandra_query.py b/tests/unit/test_query/test_triples_cassandra_query.py index 653e1f6a..efa557b5 100644 --- a/tests/unit/test_query/test_triples_cassandra_query.py +++ b/tests/unit/test_query/test_triples_cassandra_query.py @@ -122,7 +122,7 @@ class TestCassandraQueryProcessor: processor = Processor(taskgroup=taskgroup_mock) - assert processor.graph_host == ['localhost'] + assert processor.graph_host == ['cassandra'] # Updated default assert processor.username is None assert processor.password is None assert processor.table is None @@ -325,12 +325,12 @@ class TestCassandraQueryProcessor: # Verify our specific arguments were added args = parser.parse_args([]) - assert hasattr(args, 'graph_host') - assert args.graph_host == 'localhost' - assert hasattr(args, 'graph_username') - assert args.graph_username is None - assert hasattr(args, 'graph_password') - assert args.graph_password is None + assert hasattr(args, 'cassandra_host') + assert args.cassandra_host == 'cassandra' # Updated to new parameter name and default + assert hasattr(args, 'cassandra_username') + assert args.cassandra_username is None + assert hasattr(args, 'cassandra_password') + assert args.cassandra_password is None def test_add_args_with_custom_values(self): """Test add_args with custom command line values""" @@ -341,16 +341,16 @@ class TestCassandraQueryProcessor: with patch('trustgraph.query.triples.cassandra.service.TriplesQueryService.add_args'): Processor.add_args(parser) - # Test parsing with custom values + # Test parsing with custom values (new cassandra_* arguments) args = parser.parse_args([ - '--graph-host', 'query.cassandra.com', - '--graph-username', 'queryuser', - '--graph-password', 'querypass' + '--cassandra-host', 'query.cassandra.com', + '--cassandra-username', 'queryuser', + '--cassandra-password', 'querypass' ]) - assert args.graph_host == 'query.cassandra.com' - assert args.graph_username == 'queryuser' - assert args.graph_password == 'querypass' + assert args.cassandra_host == 'query.cassandra.com' + assert args.cassandra_username == 'queryuser' + assert args.cassandra_password == 'querypass' def test_add_args_short_form(self): """Test add_args with short form arguments""" @@ -361,10 +361,10 @@ class TestCassandraQueryProcessor: with patch('trustgraph.query.triples.cassandra.service.TriplesQueryService.add_args'): Processor.add_args(parser) - # Test parsing with short form - args = parser.parse_args(['-g', 'short.query.com']) + # Test parsing with cassandra arguments (no short form) + args = parser.parse_args(['--cassandra-host', 'short.query.com']) - assert args.graph_host == 'short.query.com' + assert args.cassandra_host == 'short.query.com' @patch('trustgraph.query.triples.cassandra.service.Processor.launch') def test_run_function(self, mock_launch): @@ -404,7 +404,7 @@ class TestCassandraQueryProcessor: # Verify TrustGraph was created with authentication mock_trustgraph.assert_called_once_with( - hosts=['localhost'], + hosts=['cassandra'], # Updated default keyspace='test_user', table='test_collection', username='authuser', diff --git a/tests/unit/test_storage/test_cassandra_config_integration.py b/tests/unit/test_storage/test_cassandra_config_integration.py new file mode 100644 index 00000000..42235ccb --- /dev/null +++ b/tests/unit/test_storage/test_cassandra_config_integration.py @@ -0,0 +1,428 @@ +""" +Integration tests for Cassandra configuration in processors. + +Tests that processors correctly use the configuration helper +and handle environment variables, CLI args, and backward compatibility. +""" + +import os +import pytest +from unittest.mock import Mock, patch, MagicMock + +from trustgraph.storage.triples.cassandra.write import Processor as TriplesWriter +from trustgraph.storage.objects.cassandra.write import Processor as ObjectsWriter +from trustgraph.query.triples.cassandra.service import Processor as TriplesQuery +from trustgraph.storage.knowledge.store import Processor as KgStore + + +class TestTriplesWriterConfiguration: + """Test Cassandra configuration in triples writer processor.""" + + @patch('trustgraph.direct.cassandra.TrustGraph') + def test_environment_variable_configuration(self, mock_trust_graph): + """Test processor picks up configuration from environment variables.""" + env_vars = { + 'CASSANDRA_HOST': 'env-host1,env-host2', + 'CASSANDRA_USERNAME': 'env-user', + 'CASSANDRA_PASSWORD': 'env-pass' + } + + with patch.dict(os.environ, env_vars, clear=True): + processor = TriplesWriter(taskgroup=MagicMock()) + + assert processor.graph_host == ['env-host1', 'env-host2'] + assert processor.username == 'env-user' + assert processor.password == 'env-pass' + + @patch('trustgraph.direct.cassandra.TrustGraph') + def test_parameter_override_environment(self, mock_trust_graph): + """Test explicit parameters override environment variables.""" + env_vars = { + 'CASSANDRA_HOST': 'env-host', + 'CASSANDRA_USERNAME': 'env-user', + 'CASSANDRA_PASSWORD': 'env-pass' + } + + with patch.dict(os.environ, env_vars, clear=True): + processor = TriplesWriter( + taskgroup=MagicMock(), + cassandra_host='param-host1,param-host2', + cassandra_username='param-user', + cassandra_password='param-pass' + ) + + assert processor.graph_host == ['param-host1', 'param-host2'] + assert processor.username == 'param-user' + assert processor.password == 'param-pass' + + @patch('trustgraph.direct.cassandra.TrustGraph') + def test_backward_compatibility_graph_params(self, mock_trust_graph): + """Test backward compatibility with old graph_* parameter names.""" + processor = TriplesWriter( + taskgroup=MagicMock(), + graph_host='compat-host', + graph_username='compat-user', + graph_password='compat-pass' + ) + + assert processor.graph_host == ['compat-host'] + assert processor.username == 'compat-user' + assert processor.password == 'compat-pass' + + @patch('trustgraph.direct.cassandra.TrustGraph') + def test_default_configuration(self, mock_trust_graph): + """Test default configuration when no params or env vars provided.""" + with patch.dict(os.environ, {}, clear=True): + processor = TriplesWriter(taskgroup=MagicMock()) + + assert processor.graph_host == ['cassandra'] + assert processor.username is None + assert processor.password is None + + +class TestObjectsWriterConfiguration: + """Test Cassandra configuration in objects writer processor.""" + + @patch('trustgraph.storage.objects.cassandra.write.Cluster') + def test_environment_variable_configuration(self, mock_cluster): + """Test processor picks up configuration from environment variables.""" + env_vars = { + 'CASSANDRA_HOST': 'obj-env-host1,obj-env-host2', + 'CASSANDRA_USERNAME': 'obj-env-user', + 'CASSANDRA_PASSWORD': 'obj-env-pass' + } + + mock_cluster_instance = MagicMock() + mock_cluster.return_value = mock_cluster_instance + + with patch.dict(os.environ, env_vars, clear=True): + processor = ObjectsWriter(taskgroup=MagicMock()) + + assert processor.graph_host == ['obj-env-host1', 'obj-env-host2'] + assert processor.graph_username == 'obj-env-user' + assert processor.graph_password == 'obj-env-pass' + + @patch('trustgraph.storage.objects.cassandra.write.Cluster') + def test_cassandra_connection_with_hosts_list(self, mock_cluster): + """Test that Cassandra connection uses hosts list correctly.""" + env_vars = { + 'CASSANDRA_HOST': 'conn-host1,conn-host2,conn-host3', + 'CASSANDRA_USERNAME': 'conn-user', + 'CASSANDRA_PASSWORD': 'conn-pass' + } + + mock_cluster_instance = MagicMock() + mock_session = MagicMock() + mock_cluster_instance.connect.return_value = mock_session + mock_cluster.return_value = mock_cluster_instance + + with patch.dict(os.environ, env_vars, clear=True): + processor = ObjectsWriter(taskgroup=MagicMock()) + processor.connect_cassandra() + + # Verify cluster was called with hosts list + mock_cluster.assert_called_once() + call_args = mock_cluster.call_args + + # Check that contact_points was passed the hosts list + assert 'contact_points' in call_args.kwargs + assert call_args.kwargs['contact_points'] == ['conn-host1', 'conn-host2', 'conn-host3'] + + @patch('trustgraph.storage.objects.cassandra.write.Cluster') + @patch('trustgraph.storage.objects.cassandra.write.PlainTextAuthProvider') + def test_authentication_configuration(self, mock_auth_provider, mock_cluster): + """Test authentication is configured when credentials are provided.""" + env_vars = { + 'CASSANDRA_HOST': 'auth-host', + 'CASSANDRA_USERNAME': 'auth-user', + 'CASSANDRA_PASSWORD': 'auth-pass' + } + + mock_auth_instance = MagicMock() + mock_auth_provider.return_value = mock_auth_instance + mock_cluster_instance = MagicMock() + mock_cluster.return_value = mock_cluster_instance + + with patch.dict(os.environ, env_vars, clear=True): + processor = ObjectsWriter(taskgroup=MagicMock()) + processor.connect_cassandra() + + # Verify auth provider was created with correct credentials + mock_auth_provider.assert_called_once_with( + username='auth-user', + password='auth-pass' + ) + + # Verify cluster was configured with auth provider + call_args = mock_cluster.call_args + assert 'auth_provider' in call_args.kwargs + assert call_args.kwargs['auth_provider'] == mock_auth_instance + + +class TestTriplesQueryConfiguration: + """Test Cassandra configuration in triples query processor.""" + + @patch('trustgraph.direct.cassandra.TrustGraph') + def test_environment_variable_configuration(self, mock_trust_graph): + """Test processor picks up configuration from environment variables.""" + env_vars = { + 'CASSANDRA_HOST': 'query-env-host1,query-env-host2', + 'CASSANDRA_USERNAME': 'query-env-user', + 'CASSANDRA_PASSWORD': 'query-env-pass' + } + + with patch.dict(os.environ, env_vars, clear=True): + processor = TriplesQuery(taskgroup=MagicMock()) + + assert processor.graph_host == ['query-env-host1', 'query-env-host2'] + assert processor.username == 'query-env-user' + assert processor.password == 'query-env-pass' + + @patch('trustgraph.direct.cassandra.TrustGraph') + def test_mixed_old_new_parameters(self, mock_trust_graph): + """Test mixing old and new parameter names (new should win).""" + processor = TriplesQuery( + taskgroup=MagicMock(), + cassandra_host='new-host', + graph_host='old-host', + cassandra_username='new-user', + graph_username='old-user' + ) + + # New parameters should take precedence + assert processor.graph_host == ['new-host'] + assert processor.username == 'new-user' + + +class TestKgStoreConfiguration: + """Test Cassandra configuration in knowledge store processor.""" + + @patch('trustgraph.storage.knowledge.store.KnowledgeTableStore') + def test_environment_variable_configuration(self, mock_table_store): + """Test kg-store picks up configuration from environment variables.""" + env_vars = { + 'CASSANDRA_HOST': 'kg-env-host1,kg-env-host2,kg-env-host3', + 'CASSANDRA_USERNAME': 'kg-env-user', + 'CASSANDRA_PASSWORD': 'kg-env-pass' + } + + mock_store_instance = MagicMock() + mock_table_store.return_value = mock_store_instance + + with patch.dict(os.environ, env_vars, clear=True): + processor = KgStore(taskgroup=MagicMock()) + + # Verify KnowledgeTableStore was called with resolved config + mock_table_store.assert_called_once_with( + cassandra_host=['kg-env-host1', 'kg-env-host2', 'kg-env-host3'], + cassandra_user='kg-env-user', + cassandra_password='kg-env-pass', + keyspace='knowledge' + ) + + @patch('trustgraph.storage.knowledge.store.KnowledgeTableStore') + def test_explicit_parameters(self, mock_table_store): + """Test kg-store with explicit parameters.""" + mock_store_instance = MagicMock() + mock_table_store.return_value = mock_store_instance + + processor = KgStore( + taskgroup=MagicMock(), + cassandra_host='explicit-host', + cassandra_username='explicit-user', + cassandra_password='explicit-pass' + ) + + # Verify KnowledgeTableStore was called with explicit config + mock_table_store.assert_called_once_with( + cassandra_host=['explicit-host'], + cassandra_user='explicit-user', + cassandra_password='explicit-pass', + keyspace='knowledge' + ) + + @patch('trustgraph.storage.knowledge.store.KnowledgeTableStore') + def test_backward_compatibility_cassandra_user(self, mock_table_store): + """Test backward compatibility with cassandra_user parameter.""" + mock_store_instance = MagicMock() + mock_table_store.return_value = mock_store_instance + + processor = KgStore( + taskgroup=MagicMock(), + cassandra_host='compat-host', + cassandra_user='compat-user', # Old parameter name + cassandra_password='compat-pass' + ) + + # Should still work with old parameter name + mock_table_store.assert_called_once_with( + cassandra_host=['compat-host'], + cassandra_user='compat-user', + cassandra_password='compat-pass', + keyspace='knowledge' + ) + + @patch('trustgraph.storage.knowledge.store.KnowledgeTableStore') + def test_default_configuration(self, mock_table_store): + """Test kg-store default configuration.""" + mock_store_instance = MagicMock() + mock_table_store.return_value = mock_store_instance + + with patch.dict(os.environ, {}, clear=True): + processor = KgStore(taskgroup=MagicMock()) + + # Should use defaults + mock_table_store.assert_called_once_with( + cassandra_host=['cassandra'], + cassandra_user=None, + cassandra_password=None, + keyspace='knowledge' + ) + + +class TestCommandLineArgumentHandling: + """Test command-line argument parsing in processors.""" + + def test_triples_writer_add_args(self): + """Test that triples writer adds standard Cassandra arguments.""" + import argparse + from trustgraph.storage.triples.cassandra.write import Processor as TriplesWriter + + parser = argparse.ArgumentParser() + TriplesWriter.add_args(parser) + + # Parse empty args to check that arguments exist + args = parser.parse_args([]) + + assert hasattr(args, 'cassandra_host') + assert hasattr(args, 'cassandra_username') + assert hasattr(args, 'cassandra_password') + + def test_objects_writer_add_args(self): + """Test that objects writer adds standard Cassandra arguments.""" + import argparse + from trustgraph.storage.objects.cassandra.write import Processor as ObjectsWriter + + parser = argparse.ArgumentParser() + ObjectsWriter.add_args(parser) + + # Parse empty args to check that arguments exist + args = parser.parse_args([]) + + assert hasattr(args, 'cassandra_host') + assert hasattr(args, 'cassandra_username') + assert hasattr(args, 'cassandra_password') + assert hasattr(args, 'config_type') # Objects writer specific arg + + def test_triples_query_add_args(self): + """Test that triples query adds standard Cassandra arguments.""" + import argparse + from trustgraph.query.triples.cassandra.service import Processor as TriplesQuery + + parser = argparse.ArgumentParser() + TriplesQuery.add_args(parser) + + # Parse empty args to check that arguments exist + args = parser.parse_args([]) + + assert hasattr(args, 'cassandra_host') + assert hasattr(args, 'cassandra_username') + assert hasattr(args, 'cassandra_password') + + def test_kg_store_add_args(self): + """Test that kg-store now adds Cassandra arguments (previously missing).""" + import argparse + from trustgraph.storage.knowledge.store import Processor as KgStore + + parser = argparse.ArgumentParser() + KgStore.add_args(parser) + + # Parse empty args to check that arguments exist + args = parser.parse_args([]) + + assert hasattr(args, 'cassandra_host') + assert hasattr(args, 'cassandra_username') + assert hasattr(args, 'cassandra_password') + + def test_help_text_with_environment_variables(self): + """Test that help text shows environment variable values.""" + import argparse + from trustgraph.storage.triples.cassandra.write import Processor as TriplesWriter + + env_vars = { + 'CASSANDRA_HOST': 'help-host1,help-host2', + 'CASSANDRA_USERNAME': 'help-user', + 'CASSANDRA_PASSWORD': 'help-pass' + } + + with patch.dict(os.environ, env_vars, clear=True): + parser = argparse.ArgumentParser() + TriplesWriter.add_args(parser) + + help_text = parser.format_help() + + # Should show environment variable values (except password) + # Help text may have line breaks - argparse breaks long lines + # So check for the components that should be there + assert 'help-' in help_text and 'host1' in help_text + assert 'help-host2' in help_text + assert 'help-user' in help_text + assert '' in help_text # Password should be hidden + assert 'help-pass' not in help_text # Password value not shown + assert '[from CASSANDRA_HOST]' in help_text + # Check key components (may be split across lines by argparse) + assert '[from' in help_text and 'CASSANDRA_USERNAME]' in help_text + assert '[from' in help_text and 'CASSANDRA_PASSWORD]' in help_text + + +class TestConfigurationPriorityIntegration: + """Test complete configuration priority chain in processors.""" + + @patch('trustgraph.direct.cassandra.TrustGraph') + def test_complete_priority_chain(self, mock_trust_graph): + """Test CLI params > env vars > defaults priority in actual processor.""" + env_vars = { + 'CASSANDRA_HOST': 'env-host', + 'CASSANDRA_USERNAME': 'env-user', + 'CASSANDRA_PASSWORD': 'env-pass' + } + + with patch.dict(os.environ, env_vars, clear=True): + # Explicit parameters should override environment + processor = TriplesWriter( + taskgroup=MagicMock(), + cassandra_host='cli-host1,cli-host2', + cassandra_username='cli-user' + # Password not provided - should fall back to env + ) + + assert processor.graph_host == ['cli-host1', 'cli-host2'] # From CLI + assert processor.username == 'cli-user' # From CLI + assert processor.password == 'env-pass' # From env + + @patch('trustgraph.storage.knowledge.store.KnowledgeTableStore') + def test_kg_store_priority_chain(self, mock_table_store): + """Test configuration priority chain in kg-store processor.""" + mock_store_instance = MagicMock() + mock_table_store.return_value = mock_store_instance + + env_vars = { + 'CASSANDRA_HOST': 'env-host1,env-host2', + 'CASSANDRA_USERNAME': 'env-user', + 'CASSANDRA_PASSWORD': 'env-pass' + } + + with patch.dict(os.environ, env_vars, clear=True): + processor = KgStore( + taskgroup=MagicMock(), + cassandra_host='param-host' + # username and password not provided - should use env + ) + + # Verify correct priority resolution + mock_table_store.assert_called_once_with( + cassandra_host=['param-host'], # From parameter + cassandra_user='env-user', # From environment + cassandra_password='env-pass', # From environment + keyspace='knowledge' + ) \ No newline at end of file diff --git a/tests/unit/test_storage/test_triples_cassandra_storage.py b/tests/unit/test_storage/test_triples_cassandra_storage.py index 9fbeb187..9ff03d1f 100644 --- a/tests/unit/test_storage/test_triples_cassandra_storage.py +++ b/tests/unit/test_storage/test_triples_cassandra_storage.py @@ -16,23 +16,25 @@ class TestCassandraStorageProcessor: """Test processor initialization with default parameters""" taskgroup_mock = MagicMock() - processor = Processor(taskgroup=taskgroup_mock) + # Patch environment to ensure clean defaults + with patch.dict('os.environ', {}, clear=True): + processor = Processor(taskgroup=taskgroup_mock) - assert processor.graph_host == ['localhost'] + assert processor.graph_host == ['cassandra'] # Updated default assert processor.username is None assert processor.password is None assert processor.table is None def test_processor_initialization_with_custom_params(self): - """Test processor initialization with custom parameters""" + """Test processor initialization with custom parameters (new cassandra_* names)""" taskgroup_mock = MagicMock() processor = Processor( taskgroup=taskgroup_mock, id='custom-storage', - graph_host='cassandra.example.com', - graph_username='testuser', - graph_password='testpass' + cassandra_host='cassandra.example.com', + cassandra_username='testuser', + cassandra_password='testpass' ) assert processor.graph_host == ['cassandra.example.com'] @@ -46,11 +48,41 @@ class TestCassandraStorageProcessor: processor = Processor( taskgroup=taskgroup_mock, - graph_username='testuser' + cassandra_username='testuser' ) assert processor.username == 'testuser' assert processor.password is None + + def test_processor_initialization_backward_compatibility(self): + """Test processor initialization with old graph_* parameters (backward compatibility)""" + taskgroup_mock = MagicMock() + + processor = Processor( + taskgroup=taskgroup_mock, + graph_host='old-host', + graph_username='old-user', + graph_password='old-pass' + ) + + assert processor.graph_host == ['old-host'] + assert processor.username == 'old-user' + assert processor.password == 'old-pass' + + def test_processor_parameter_precedence(self): + """Test that new cassandra_* parameters take precedence over old graph_* parameters""" + taskgroup_mock = MagicMock() + + processor = Processor( + taskgroup=taskgroup_mock, + cassandra_host='new-host', + graph_host='old-host', # Should be ignored + cassandra_username='new-user', + graph_username='old-user' # Should be ignored + ) + + assert processor.graph_host == ['new-host'] # New parameter wins + assert processor.username == 'new-user' # New parameter wins @pytest.mark.asyncio @patch('trustgraph.storage.triples.cassandra.write.TrustGraph') @@ -62,8 +94,8 @@ class TestCassandraStorageProcessor: processor = Processor( taskgroup=taskgroup_mock, - graph_username='testuser', - graph_password='testpass' + cassandra_username='testuser', + cassandra_password='testpass' ) # Create mock message @@ -76,7 +108,7 @@ class TestCassandraStorageProcessor: # Verify TrustGraph was called with auth parameters mock_trustgraph.assert_called_once_with( - hosts=['localhost'], + hosts=['cassandra'], # Updated default keyspace='user1', table='collection1', username='testuser', @@ -104,7 +136,7 @@ class TestCassandraStorageProcessor: # Verify TrustGraph was called without auth parameters mock_trustgraph.assert_called_once_with( - hosts=['localhost'], + hosts=['cassandra'], # Updated default keyspace='user2', table='collection2' ) @@ -225,16 +257,16 @@ class TestCassandraStorageProcessor: # Verify parent add_args was called mock_parent_add_args.assert_called_once_with(parser) - # Verify our specific arguments were added + # Verify our specific arguments were added (now using cassandra_* names) # Parse empty args to check defaults args = parser.parse_args([]) - assert hasattr(args, 'graph_host') - assert args.graph_host == 'localhost' - assert hasattr(args, 'graph_username') - assert args.graph_username is None - assert hasattr(args, 'graph_password') - assert args.graph_password is None + assert hasattr(args, 'cassandra_host') + assert args.cassandra_host == 'cassandra' # Updated default + assert hasattr(args, 'cassandra_username') + assert args.cassandra_username is None + assert hasattr(args, 'cassandra_password') + assert args.cassandra_password is None def test_add_args_with_custom_values(self): """Test add_args with custom command line values""" @@ -246,31 +278,44 @@ class TestCassandraStorageProcessor: with patch('trustgraph.storage.triples.cassandra.write.TriplesStoreService.add_args'): Processor.add_args(parser) - # Test parsing with custom values + # Test parsing with custom values (new cassandra_* arguments) args = parser.parse_args([ - '--graph-host', 'cassandra.example.com', - '--graph-username', 'testuser', - '--graph-password', 'testpass' + '--cassandra-host', 'cassandra.example.com', + '--cassandra-username', 'testuser', + '--cassandra-password', 'testpass' ]) - assert args.graph_host == 'cassandra.example.com' - assert args.graph_username == 'testuser' - assert args.graph_password == 'testpass' + assert args.cassandra_host == 'cassandra.example.com' + assert args.cassandra_username == 'testuser' + assert args.cassandra_password == 'testpass' - def test_add_args_short_form(self): - """Test add_args with short form arguments""" + def test_add_args_with_env_vars(self): + """Test add_args shows environment variables in help text""" from argparse import ArgumentParser from unittest.mock import patch + import os parser = ArgumentParser() + # Set environment variables + env_vars = { + 'CASSANDRA_HOST': 'env-host1,env-host2', + 'CASSANDRA_USERNAME': 'env-user', + 'CASSANDRA_PASSWORD': 'env-pass' + } + with patch('trustgraph.storage.triples.cassandra.write.TriplesStoreService.add_args'): - Processor.add_args(parser) - - # Test parsing with short form - args = parser.parse_args(['-g', 'short.example.com']) - - assert args.graph_host == 'short.example.com' + with patch.dict(os.environ, env_vars, clear=True): + Processor.add_args(parser) + + # Check that help text includes environment variable info + help_text = parser.format_help() + # Argparse may break lines, so check for components + assert 'env-' in help_text and 'host1' in help_text + assert 'env-host2' in help_text + assert 'env-user' in help_text + assert '' in help_text # Password should be hidden + assert 'env-pass' not in help_text # Password value not shown @patch('trustgraph.storage.triples.cassandra.write.Processor.launch') def test_run_function(self, mock_launch): diff --git a/trustgraph-base/trustgraph/base/cassandra_config.py b/trustgraph-base/trustgraph/base/cassandra_config.py new file mode 100644 index 00000000..34875571 --- /dev/null +++ b/trustgraph-base/trustgraph/base/cassandra_config.py @@ -0,0 +1,144 @@ +""" +Cassandra configuration utilities for standardized parameter handling. + +Provides consistent Cassandra configuration across all TrustGraph processors, +including command-line arguments, environment variables, and defaults. +""" + +import os +import argparse +from typing import Optional, Tuple, List, Any + + +def get_cassandra_defaults() -> dict: + """ + Get default Cassandra configuration values from environment variables or fallback defaults. + + Returns: + dict: Dictionary with 'host', 'username', and 'password' keys + """ + return { + 'host': os.getenv('CASSANDRA_HOST', 'cassandra'), + 'username': os.getenv('CASSANDRA_USERNAME'), + 'password': os.getenv('CASSANDRA_PASSWORD') + } + + +def add_cassandra_args(parser: argparse.ArgumentParser) -> None: + """ + Add standardized Cassandra configuration arguments to an argument parser. + + Shows environment variable values in help text when they are set. + Password values are never displayed for security. + + Args: + parser: ArgumentParser instance to add arguments to + """ + defaults = get_cassandra_defaults() + + # Format help text with environment variable indication + host_help = f"Cassandra host list, comma-separated (default: {defaults['host']})" + if 'CASSANDRA_HOST' in os.environ: + host_help += " [from CASSANDRA_HOST]" + + username_help = "Cassandra username" + if defaults['username']: + username_help += f" (default: {defaults['username']})" + if 'CASSANDRA_USERNAME' in os.environ: + username_help += " [from CASSANDRA_USERNAME]" + + password_help = "Cassandra password" + if defaults['password']: + # Never show actual password value + password_help += " (default: )" + if 'CASSANDRA_PASSWORD' in os.environ: + password_help += " [from CASSANDRA_PASSWORD]" + + parser.add_argument( + '--cassandra-host', + default=defaults['host'], + help=host_help + ) + + parser.add_argument( + '--cassandra-username', + default=defaults['username'], + help=username_help + ) + + parser.add_argument( + '--cassandra-password', + default=defaults['password'], + help=password_help + ) + + +def resolve_cassandra_config( + args: Optional[Any] = None, + host: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None +) -> Tuple[List[str], Optional[str], Optional[str]]: + """ + Resolve Cassandra configuration from various sources. + + Can accept either argparse args object or explicit parameters. + Converts host string to list format for Cassandra driver. + + Args: + args: Optional argparse namespace with cassandra_host, cassandra_username, cassandra_password + host: Optional explicit host parameter (overrides args) + username: Optional explicit username parameter (overrides args) + password: Optional explicit password parameter (overrides args) + + Returns: + tuple: (hosts_list, username, password) + """ + # If args provided, extract values + if args is not None: + host = host or getattr(args, 'cassandra_host', None) + username = username or getattr(args, 'cassandra_username', None) + password = password or getattr(args, 'cassandra_password', None) + + # Apply defaults if still None + defaults = get_cassandra_defaults() + host = host or defaults['host'] + username = username or defaults['username'] + password = password or defaults['password'] + + # Convert host string to list + if isinstance(host, str): + hosts = [h.strip() for h in host.split(',') if h.strip()] + else: + hosts = host + + return hosts, username, password + + +def get_cassandra_config_from_params(params: dict) -> Tuple[List[str], Optional[str], Optional[str]]: + """ + Extract and resolve Cassandra configuration from a parameters dictionary. + + Handles both old graph_* and new cassandra_* parameter names for backward compatibility. + + Args: + params: Dictionary of parameters that may contain Cassandra configuration + + Returns: + tuple: (hosts_list, username, password) + """ + # Check for new parameter names first + host = params.get('cassandra_host') + username = params.get('cassandra_username') + password = params.get('cassandra_password') + + # Fall back to old graph_* names for backward compatibility + if not host: + host = params.get('graph_host') + if not username: + username = params.get('graph_username', params.get('cassandra_user')) + if not password: + password = params.get('graph_password') + + # Use resolve function to handle defaults and list conversion + return resolve_cassandra_config(host=host, username=username, password=password) \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py index c53743e8..b38532d0 100755 --- a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py +++ b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py @@ -10,32 +10,40 @@ from .... direct.cassandra import TrustGraph from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error from .... schema import Value, Triple from .... base import TriplesQueryService +from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config # Module logger logger = logging.getLogger(__name__) default_ident = "triples-query" -default_graph_host='localhost' class Processor(TriplesQueryService): def __init__(self, **params): - graph_host = params.get("graph_host", default_graph_host) - graph_username = params.get("graph_username", None) - graph_password = params.get("graph_password", None) + # Use new parameter names, fall back to old for compatibility + cassandra_host = params.get("cassandra_host", params.get("graph_host")) + cassandra_username = params.get("cassandra_username", params.get("graph_username")) + cassandra_password = params.get("cassandra_password", params.get("graph_password")) + + # Resolve configuration with environment variable fallback + hosts, username, password = resolve_cassandra_config( + host=cassandra_host, + username=cassandra_username, + password=cassandra_password + ) super(Processor, self).__init__( **params | { - "graph_host": graph_host, - "graph_username": graph_username, + "cassandra_host": ','.join(hosts), + "cassandra_username": username, } ) - self.graph_host = [graph_host] - self.username = graph_username - self.password = graph_password + self.graph_host = hosts + self.username = username + self.password = password self.table = None def create_value(self, ent): @@ -147,24 +155,7 @@ class Processor(TriplesQueryService): def add_args(parser): TriplesQueryService.add_args(parser) - - parser.add_argument( - '-g', '--graph-host', - default="localhost", - help=f'Graph host (default: localhost)' - ) - - parser.add_argument( - '--graph-username', - default=None, - help=f'Cassandra username' - ) - - parser.add_argument( - '--graph-password', - default=None, - help=f'Cassandra password' - ) + add_cassandra_args(parser) def run(): diff --git a/trustgraph-flow/trustgraph/storage/knowledge/store.py b/trustgraph-flow/trustgraph/storage/knowledge/store.py index 62e915be..ceb59ccf 100644 --- a/trustgraph-flow/trustgraph/storage/knowledge/store.py +++ b/trustgraph-flow/trustgraph/storage/knowledge/store.py @@ -8,12 +8,12 @@ import urllib.parse from ... schema import Triples, GraphEmbeddings from ... base import FlowProcessor, ConsumerSpec +from ... base.cassandra_config import add_cassandra_args, resolve_cassandra_config from ... tables.knowledge import KnowledgeTableStore default_ident = "kg-store" -default_cassandra_host = "cassandra" keyspace = "knowledge" class Processor(FlowProcessor): @@ -22,15 +22,18 @@ class Processor(FlowProcessor): id = params.get("id") - cassandra_host = params.get("cassandra_host", default_cassandra_host) - cassandra_user = params.get("cassandra_user") - cassandra_password = params.get("cassandra_password") + # Use helper to resolve configuration + hosts, username, password = resolve_cassandra_config( + host=params.get("cassandra_host"), + username=params.get("cassandra_user", params.get("cassandra_username")), + password=params.get("cassandra_password") + ) super(Processor, self).__init__( **params | { "id": id, - "cassandra_host": cassandra_host, - "cassandra_user": cassandra_user, + "cassandra_host": ','.join(hosts), + "cassandra_username": username, } ) @@ -51,9 +54,9 @@ class Processor(FlowProcessor): ) self.table_store = KnowledgeTableStore( - cassandra_host = cassandra_host.split(","), - cassandra_user = cassandra_user, - cassandra_password = cassandra_password, + cassandra_host = hosts, + cassandra_user = username, + cassandra_password = password, keyspace = keyspace, ) @@ -71,6 +74,7 @@ class Processor(FlowProcessor): def add_args(parser): FlowProcessor.add_args(parser) + add_cassandra_args(parser) def run(): diff --git a/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py b/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py index b4d5dd3c..0d7fafc6 100644 --- a/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py @@ -14,12 +14,12 @@ from cassandra import ConsistencyLevel from .... schema import ExtractedObject from .... schema import RowSchema, Field from .... base import FlowProcessor, ConsumerSpec +from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config # Module logger logger = logging.getLogger(__name__) default_ident = "objects-write" -default_graph_host = 'localhost' class Processor(FlowProcessor): @@ -27,10 +27,22 @@ class Processor(FlowProcessor): id = params.get("id", default_ident) - # Cassandra connection parameters - self.graph_host = params.get("graph_host", default_graph_host) - self.graph_username = params.get("graph_username", None) - self.graph_password = params.get("graph_password", None) + # Use new parameter names, fall back to old for compatibility + cassandra_host = params.get("cassandra_host", params.get("graph_host")) + cassandra_username = params.get("cassandra_username", params.get("graph_username")) + cassandra_password = params.get("cassandra_password", params.get("graph_password")) + + # Resolve configuration with environment variable fallback + hosts, username, password = resolve_cassandra_config( + host=cassandra_host, + username=cassandra_username, + password=cassandra_password + ) + + # Store resolved configuration + self.graph_host = hosts # Store as list + self.graph_username = username + self.graph_password = password # Config key for schemas self.config_key = params.get("config_type", "schema") @@ -76,11 +88,11 @@ class Processor(FlowProcessor): password=self.graph_password ) self.cluster = Cluster( - contact_points=[self.graph_host], + contact_points=self.graph_host, auth_provider=auth_provider ) else: - self.cluster = Cluster(contact_points=[self.graph_host]) + self.cluster = Cluster(contact_points=self.graph_host) self.session = self.cluster.connect() logger.info(f"Connected to Cassandra cluster at {self.graph_host}") @@ -381,24 +393,7 @@ class Processor(FlowProcessor): """Add command-line arguments""" FlowProcessor.add_args(parser) - - parser.add_argument( - '-g', '--graph-host', - default=default_graph_host, - help=f'Cassandra host (default: {default_graph_host})' - ) - - parser.add_argument( - '--graph-username', - default=None, - help='Cassandra username' - ) - - parser.add_argument( - '--graph-password', - default=None, - help='Cassandra password' - ) + add_cassandra_args(parser) parser.add_argument( '--config-type', diff --git a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py index e8948668..343a8dfd 100755 --- a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py @@ -3,6 +3,8 @@ Graph writer. Input is graph edge. Writes edges to Cassandra graph. """ +raise RuntimeError("This code is no longer in use") + import pulsar import base64 import os @@ -14,9 +16,9 @@ from cassandra.auth import PlainTextAuthProvider from ssl import SSLContext, PROTOCOL_TLSv1_2 from .... schema import Rows -from .... schema import rows_store_queue from .... log_level import LogLevel from .... base import Consumer +from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config # Module logger logger = logging.getLogger(__name__) @@ -24,9 +26,8 @@ logger = logging.getLogger(__name__) module = "rows-write" ssl_context = SSLContext(PROTOCOL_TLSv1_2) -default_input_queue = rows_store_queue +default_input_queue = "rows-store" # Default queue name default_subscriber = module -default_graph_host='localhost' class Processor(Consumer): @@ -34,26 +35,35 @@ class Processor(Consumer): input_queue = params.get("input_queue", default_input_queue) subscriber = params.get("subscriber", default_subscriber) - graph_host = params.get("graph_host", default_graph_host) - graph_username = params.get("graph_username", None) - graph_password = params.get("graph_password", None) + + # Use new parameter names, fall back to old for compatibility + cassandra_host = params.get("cassandra_host", params.get("graph_host")) + cassandra_username = params.get("cassandra_username", params.get("graph_username")) + cassandra_password = params.get("cassandra_password", params.get("graph_password")) + + # Resolve configuration with environment variable fallback + hosts, username, password = resolve_cassandra_config( + host=cassandra_host, + username=cassandra_username, + password=cassandra_password + ) super(Processor, self).__init__( **params | { "input_queue": input_queue, "subscriber": subscriber, "input_schema": Rows, - "graph_host": graph_host, - "graph_username": graph_username, - "graph_password": graph_password, + "cassandra_host": ','.join(hosts), + "cassandra_username": username, + "cassandra_password": password, } ) - if graph_username and graph_password: - auth_provider = PlainTextAuthProvider(username=graph_username, password=graph_password) - self.cluster = Cluster(graph_host.split(","), auth_provider=auth_provider, ssl_context=ssl_context) + if username and password: + auth_provider = PlainTextAuthProvider(username=username, password=password) + self.cluster = Cluster(hosts, auth_provider=auth_provider, ssl_context=ssl_context) else: - self.cluster = Cluster(graph_host.split(",")) + self.cluster = Cluster(hosts) self.session = self.cluster.connect() self.tables = set() @@ -128,24 +138,7 @@ class Processor(Consumer): Consumer.add_args( parser, default_input_queue, default_subscriber, ) - - parser.add_argument( - '-g', '--graph-host', - default="localhost", - help=f'Graph host (default: localhost)' - ) - - parser.add_argument( - '--graph-username', - default=None, - help=f'Cassandra username' - ) - - parser.add_argument( - '--graph-password', - default=None, - help=f'Cassandra password' - ) + add_cassandra_args(parser) def run(): diff --git a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py index ac790bcc..b83d7cd6 100755 --- a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py @@ -12,13 +12,13 @@ import logging from .... direct.cassandra import TrustGraph from .... base import TriplesStoreService +from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config # Module logger logger = logging.getLogger(__name__) default_ident = "triples-write" -default_graph_host='localhost' class Processor(TriplesStoreService): @@ -26,20 +26,28 @@ class Processor(TriplesStoreService): id = params.get("id", default_ident) - graph_host = params.get("graph_host", default_graph_host) - graph_username = params.get("graph_username", None) - graph_password = params.get("graph_password", None) + # Use new parameter names, fall back to old for compatibility + cassandra_host = params.get("cassandra_host", params.get("graph_host")) + cassandra_username = params.get("cassandra_username", params.get("graph_username")) + cassandra_password = params.get("cassandra_password", params.get("graph_password")) + + # Resolve configuration with environment variable fallback + hosts, username, password = resolve_cassandra_config( + host=cassandra_host, + username=cassandra_username, + password=cassandra_password + ) super(Processor, self).__init__( **params | { - "graph_host": graph_host, - "graph_username": graph_username + "cassandra_host": ','.join(hosts), + "cassandra_username": username } ) - self.graph_host = [graph_host] - self.username = graph_username - self.password = graph_password + self.graph_host = hosts + self.username = username + self.password = password self.table = None async def store_triples(self, message): @@ -82,24 +90,7 @@ class Processor(TriplesStoreService): def add_args(parser): TriplesStoreService.add_args(parser) - - parser.add_argument( - '-g', '--graph-host', - default="localhost", - help=f'Graph host (default: localhost)' - ) - - parser.add_argument( - '--graph-username', - default=None, - help=f'Cassandra username' - ) - - parser.add_argument( - '--graph-password', - default=None, - help=f'Cassandra password' - ) + add_cassandra_args(parser) def run(): diff --git a/trustgraph-flow/trustgraph/tables/config.py b/trustgraph-flow/trustgraph/tables/config.py index c0c0a84a..346ee569 100644 --- a/trustgraph-flow/trustgraph/tables/config.py +++ b/trustgraph-flow/trustgraph/tables/config.py @@ -24,6 +24,10 @@ class ConfigTableStore: logger.info("Connecting to Cassandra...") + # Ensure cassandra_host is a list + if isinstance(cassandra_host, str): + cassandra_host = [h.strip() for h in cassandra_host.split(',')] + if cassandra_user and cassandra_password: ssl_context = SSLContext(PROTOCOL_TLSv1_2) auth_provider = PlainTextAuthProvider( diff --git a/trustgraph-flow/trustgraph/tables/knowledge.py b/trustgraph-flow/trustgraph/tables/knowledge.py index dc83dbf2..92f577ae 100644 --- a/trustgraph-flow/trustgraph/tables/knowledge.py +++ b/trustgraph-flow/trustgraph/tables/knowledge.py @@ -24,6 +24,10 @@ class KnowledgeTableStore: logger.info("Connecting to Cassandra...") + # Ensure cassandra_host is a list + if isinstance(cassandra_host, str): + cassandra_host = [h.strip() for h in cassandra_host.split(',')] + if cassandra_user and cassandra_password: ssl_context = SSLContext(PROTOCOL_TLSv1_2) auth_provider = PlainTextAuthProvider( diff --git a/trustgraph-flow/trustgraph/tables/library.py b/trustgraph-flow/trustgraph/tables/library.py index b186d063..9f3695e1 100644 --- a/trustgraph-flow/trustgraph/tables/library.py +++ b/trustgraph-flow/trustgraph/tables/library.py @@ -28,6 +28,10 @@ class LibraryTableStore: logger.info("Connecting to Cassandra...") + # Ensure cassandra_host is a list + if isinstance(cassandra_host, str): + cassandra_host = [h.strip() for h in cassandra_host.split(',')] + if cassandra_user and cassandra_password: ssl_context = SSLContext(PROTOCOL_TLSv1_2) auth_provider = PlainTextAuthProvider(