From fd8d5b2c429dad2c18bcffeb8ca87ae2022b275f Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Fri, 8 May 2026 19:48:12 +0100 Subject: [PATCH] Recent fixes -> release/v2.4 (#891) * Fix publisher resource leak in librarian submit_document (#883) Wrap pub.start()/pub.send() in try/finally to guarantee pub.stop() is called on error. Remove unnecessary asyncio.sleep(1) kludge. * Make Cassandra replication factor configurable (issue #787) (#887) Add CASSANDRA_REPLICATION_FACTOR environment variable and --cassandra-replication-factor CLI argument to cassandra_config.py. Update all four table store constructors (ConfigTableStore, KnowledgeTableStore, LibraryTableStore, IamTableStore) to accept an optional replication_factor parameter and use it in keyspace creation CQL queries. Thread the replication factor through all service constructors: Configuration, KnowledgeManager, Librarian, IamService, and knowledge store Processor. * Update tests --------- Co-authored-by: gittihub-jpg --- .../test_cassandra_config_end_to_end.py | 15 ++++--- tests/unit/test_base/test_cassandra_config.py | 42 +++++++++---------- .../test_cassandra_config_integration.py | 15 ++++--- .../trustgraph/base/cassandra_config.py | 31 ++++++++++---- .../trustgraph/config/service/config.py | 5 ++- .../trustgraph/config/service/service.py | 3 +- trustgraph-flow/trustgraph/cores/knowledge.py | 5 ++- trustgraph-flow/trustgraph/cores/service.py | 3 +- trustgraph-flow/trustgraph/iam/service/iam.py | 4 +- .../trustgraph/iam/service/service.py | 3 +- .../trustgraph/librarian/librarian.py | 4 +- .../trustgraph/librarian/service.py | 16 ++++--- .../query/rows/cassandra/service.py | 2 +- .../query/triples/cassandra/service.py | 2 +- .../trustgraph/storage/knowledge/store.py | 3 +- .../storage/rows/cassandra/write.py | 2 +- .../storage/triples/cassandra/write.py | 2 +- trustgraph-flow/trustgraph/tables/config.py | 5 ++- trustgraph-flow/trustgraph/tables/iam.py | 5 ++- .../trustgraph/tables/knowledge.py | 5 ++- trustgraph-flow/trustgraph/tables/library.py | 5 ++- 21 files changed, 105 insertions(+), 72 deletions(-) diff --git a/tests/integration/test_cassandra_config_end_to_end.py b/tests/integration/test_cassandra_config_end_to_end.py index 1e4276fe..514a5dbf 100644 --- a/tests/integration/test_cassandra_config_end_to_end.py +++ b/tests/integration/test_cassandra_config_end_to_end.py @@ -110,7 +110,8 @@ class TestEndToEndConfigurationFlow: cassandra_host=['kg-host1', 'kg-host2', 'kg-host3', 'kg-host4'], cassandra_username='kg-user', cassandra_password='kg-pass', - keyspace='knowledge' + keyspace='knowledge', + replication_factor=1, ) @@ -182,7 +183,8 @@ class TestConfigurationPriorityEndToEnd: cassandra_host=['partial-host'], # From parameter cassandra_username='fallback-user', # From environment cassandra_password='fallback-pass', # From environment - keyspace='knowledge' + keyspace='knowledge', + replication_factor=1, ) @pytest.mark.asyncio @@ -273,7 +275,8 @@ class TestNoBackwardCompatibilityEndToEnd: cassandra_host=['legacy-kg-host'], cassandra_username=None, # Should be None since cassandra_user is not recognized cassandra_password='legacy-kg-pass', - keyspace='knowledge' + keyspace='knowledge', + replication_factor=1, ) @pytest.mark.asyncio @@ -367,13 +370,13 @@ class TestMultipleHostsHandling: from trustgraph.base.cassandra_config import resolve_cassandra_config # Test various whitespace scenarios - hosts1, _, _, _ = resolve_cassandra_config(host='host1, host2 , host3') + hosts1, _, _, _, _ = resolve_cassandra_config(host='host1, host2 , host3') assert hosts1 == ['host1', 'host2', 'host3'] - hosts2, _, _, _ = resolve_cassandra_config(host='host1,host2,host3,') + hosts2, _, _, _, _ = resolve_cassandra_config(host='host1,host2,host3,') assert hosts2 == ['host1', 'host2', 'host3'] - hosts3, _, _, _ = resolve_cassandra_config(host=' host1 , host2 ') + hosts3, _, _, _, _ = resolve_cassandra_config(host=' host1 , host2 ') assert hosts3 == ['host1', 'host2'] diff --git a/tests/unit/test_base/test_cassandra_config.py b/tests/unit/test_base/test_cassandra_config.py index 5703c7e1..a291434d 100644 --- a/tests/unit/test_base/test_cassandra_config.py +++ b/tests/unit/test_base/test_cassandra_config.py @@ -145,7 +145,7 @@ class TestResolveCassandraConfig: def test_default_configuration(self): """Test resolution with no parameters or environment variables.""" with patch.dict(os.environ, {}, clear=True): - hosts, username, password, keyspace = resolve_cassandra_config() + hosts, username, password, keyspace, _ = resolve_cassandra_config() assert hosts == ['cassandra'] assert username is None @@ -160,7 +160,7 @@ class TestResolveCassandraConfig: } with patch.dict(os.environ, env_vars, clear=True): - hosts, username, password, keyspace = resolve_cassandra_config() + hosts, username, password, keyspace, _ = resolve_cassandra_config() assert hosts == ['env1', 'env2', 'env3'] assert username == 'env-user' @@ -175,7 +175,7 @@ class TestResolveCassandraConfig: } with patch.dict(os.environ, env_vars, clear=True): - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, _ = resolve_cassandra_config( host='explicit-host', username='explicit-user', password='explicit-pass' @@ -188,19 +188,19 @@ class TestResolveCassandraConfig: def test_host_list_parsing(self): """Test different host list formats.""" # Single host - hosts, _, _, _ = resolve_cassandra_config(host='single-host') + hosts, _, _, _, _ = resolve_cassandra_config(host='single-host') assert hosts == ['single-host'] # Multiple hosts with spaces - hosts, _, _, _ = resolve_cassandra_config(host='host1, host2 ,host3') + hosts, _, _, _, _ = resolve_cassandra_config(host='host1, host2 ,host3') assert hosts == ['host1', 'host2', 'host3'] # Empty elements filtered out - hosts, _, _, _ = resolve_cassandra_config(host='host1,,host2,') + hosts, _, _, _, _ = resolve_cassandra_config(host='host1,,host2,') assert hosts == ['host1', 'host2'] # Already a list - hosts, _, _, _ = resolve_cassandra_config(host=['list-host1', 'list-host2']) + hosts, _, _, _, _ = resolve_cassandra_config(host=['list-host1', 'list-host2']) assert hosts == ['list-host1', 'list-host2'] def test_args_object_resolution(self): @@ -212,7 +212,7 @@ class TestResolveCassandraConfig: cassandra_password = 'args-pass' args = MockArgs() - hosts, username, password, keyspace = resolve_cassandra_config(args) + hosts, username, password, keyspace, _ = resolve_cassandra_config(args) assert hosts == ['args-host1', 'args-host2'] assert username == 'args-user' @@ -233,7 +233,7 @@ class TestResolveCassandraConfig: with patch.dict(os.environ, env_vars, clear=True): args = PartialArgs() - hosts, username, password, keyspace = resolve_cassandra_config(args) + hosts, username, password, keyspace, _ = resolve_cassandra_config(args) assert hosts == ['args-host'] # From args assert username == 'env-user' # From env @@ -251,7 +251,7 @@ class TestGetCassandraConfigFromParams: 'cassandra_password': 'new-pass' } - hosts, username, password, keyspace = get_cassandra_config_from_params(params) + hosts, username, password, keyspace, _ = get_cassandra_config_from_params(params) assert hosts == ['new-host1', 'new-host2'] assert username == 'new-user' @@ -265,7 +265,7 @@ class TestGetCassandraConfigFromParams: 'graph_password': 'old-pass' } - hosts, username, password, keyspace = get_cassandra_config_from_params(params) + hosts, username, password, keyspace, _ = get_cassandra_config_from_params(params) # Should use defaults since graph_* params are not recognized assert hosts == ['cassandra'] # Default @@ -280,7 +280,7 @@ class TestGetCassandraConfigFromParams: 'cassandra_password': 'compat-pass' } - hosts, username, password, keyspace = get_cassandra_config_from_params(params) + hosts, username, password, keyspace, _ = get_cassandra_config_from_params(params) assert hosts == ['compat-host'] assert username is None # cassandra_user is not recognized @@ -298,7 +298,7 @@ class TestGetCassandraConfigFromParams: 'graph_password': 'old-pass' } - hosts, username, password, keyspace = get_cassandra_config_from_params(params) + hosts, username, password, keyspace, _ = get_cassandra_config_from_params(params) assert hosts == ['new-host'] # Only cassandra_* params work assert username == 'new-user' # Only cassandra_* params work @@ -314,7 +314,7 @@ class TestGetCassandraConfigFromParams: with patch.dict(os.environ, env_vars, clear=True): params = {} - hosts, username, password, keyspace = get_cassandra_config_from_params(params) + hosts, username, password, keyspace, _ = get_cassandra_config_from_params(params) assert hosts == ['fallback-host1', 'fallback-host2'] assert username == 'fallback-user' @@ -334,7 +334,7 @@ class TestConfigurationPriority: with patch.dict(os.environ, env_vars, clear=True): # CLI args should override everything - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, _ = resolve_cassandra_config( host='cli-host', username='cli-user', password='cli-pass' @@ -354,7 +354,7 @@ class TestConfigurationPriority: with patch.dict(os.environ, env_vars, clear=True): # Only provide host via CLI - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, _ = resolve_cassandra_config( host='cli-host' # username and password not provided ) @@ -366,7 +366,7 @@ class TestConfigurationPriority: def test_no_config_defaults(self): """Test that defaults are used when no configuration is provided.""" with patch.dict(os.environ, {}, clear=True): - hosts, username, password, keyspace = resolve_cassandra_config() + hosts, username, password, keyspace, _ = resolve_cassandra_config() assert hosts == ['cassandra'] # Default assert username is None # Default @@ -378,17 +378,17 @@ class TestEdgeCases: def test_empty_host_string(self): """Test handling of empty host string falls back to default.""" - hosts, _, _, _ = resolve_cassandra_config(host='') + hosts, _, _, _, _ = resolve_cassandra_config(host='') assert hosts == ['cassandra'] # Falls back to default def test_whitespace_only_host(self): """Test handling of whitespace-only host string.""" - hosts, _, _, _ = resolve_cassandra_config(host=' ') + hosts, _, _, _, _ = resolve_cassandra_config(host=' ') assert hosts == [] # Empty after stripping whitespace def test_none_values_preserved(self): """Test that None values are preserved correctly.""" - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, _ = resolve_cassandra_config( host=None, username=None, password=None @@ -401,7 +401,7 @@ class TestEdgeCases: def test_mixed_none_and_values(self): """Test mixing None and actual values.""" - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, _ = resolve_cassandra_config( host='mixed-host', username=None, password='mixed-pass' diff --git a/tests/unit/test_storage/test_cassandra_config_integration.py b/tests/unit/test_storage/test_cassandra_config_integration.py index 0956f4e7..87a366fc 100644 --- a/tests/unit/test_storage/test_cassandra_config_integration.py +++ b/tests/unit/test_storage/test_cassandra_config_integration.py @@ -218,7 +218,8 @@ class TestKgStoreConfiguration: cassandra_host=['kg-env-host1', 'kg-env-host2', 'kg-env-host3'], cassandra_username='kg-env-user', cassandra_password='kg-env-pass', - keyspace='knowledge' + keyspace='knowledge', + replication_factor=1, ) @patch('trustgraph.storage.knowledge.store.KnowledgeTableStore') @@ -239,7 +240,8 @@ class TestKgStoreConfiguration: cassandra_host=['explicit-host'], cassandra_username='explicit-user', cassandra_password='explicit-pass', - keyspace='knowledge' + keyspace='knowledge', + replication_factor=1, ) @patch('trustgraph.storage.knowledge.store.KnowledgeTableStore') @@ -260,7 +262,8 @@ class TestKgStoreConfiguration: cassandra_host=['compat-host'], cassandra_username=None, # Should be None since cassandra_user is ignored cassandra_password='compat-pass', - keyspace='knowledge' + keyspace='knowledge', + replication_factor=1, ) @patch('trustgraph.storage.knowledge.store.KnowledgeTableStore') @@ -277,7 +280,8 @@ class TestKgStoreConfiguration: cassandra_host=['cassandra'], cassandra_username=None, cassandra_password=None, - keyspace='knowledge' + keyspace='knowledge', + replication_factor=1, ) @@ -425,5 +429,6 @@ class TestConfigurationPriorityIntegration: cassandra_host=['param-host'], # From parameter cassandra_username='env-user', # From environment cassandra_password='env-pass', # From environment - keyspace='knowledge' + keyspace='knowledge', + replication_factor=1, ) \ No newline at end of file diff --git a/trustgraph-base/trustgraph/base/cassandra_config.py b/trustgraph-base/trustgraph/base/cassandra_config.py index bacc4313..78505c68 100644 --- a/trustgraph-base/trustgraph/base/cassandra_config.py +++ b/trustgraph-base/trustgraph/base/cassandra_config.py @@ -21,7 +21,8 @@ def get_cassandra_defaults() -> dict: 'host': os.getenv('CASSANDRA_HOST', 'cassandra'), 'username': os.getenv('CASSANDRA_USERNAME'), 'password': os.getenv('CASSANDRA_PASSWORD'), - 'keyspace': os.getenv('CASSANDRA_KEYSPACE') + 'keyspace': os.getenv('CASSANDRA_KEYSPACE'), + 'replication_factor': int(os.getenv('CASSANDRA_REPLICATION_FACTOR', '1')) } @@ -85,6 +86,17 @@ def add_cassandra_args(parser: argparse.ArgumentParser) -> None: help=keyspace_help ) + replication_factor_help = f"Cassandra keyspace replication factor (default: {defaults['replication_factor']})" + if 'CASSANDRA_REPLICATION_FACTOR' in os.environ: + replication_factor_help += " [from CASSANDRA_REPLICATION_FACTOR]" + + parser.add_argument( + '--cassandra-replication-factor', + type=int, + default=defaults['replication_factor'], + help=replication_factor_help + ) + def resolve_cassandra_config( args: Optional[Any] = None, @@ -92,7 +104,7 @@ def resolve_cassandra_config( username: Optional[str] = None, password: Optional[str] = None, default_keyspace: Optional[str] = None -) -> Tuple[List[str], Optional[str], Optional[str], Optional[str]]: +) -> Tuple[List[str], Optional[str], Optional[str], Optional[str], int]: """ Resolve Cassandra configuration from various sources. @@ -100,22 +112,24 @@ def resolve_cassandra_config( Converts host string to list format for Cassandra driver. Args: - args: Optional argparse namespace with cassandra_host, cassandra_username, cassandra_password, cassandra_keyspace + args: Optional argparse namespace with cassandra_host, cassandra_username, cassandra_password, cassandra_keyspace, cassandra_replication_factor host: Optional explicit host parameter (overrides args) username: Optional explicit username parameter (overrides args) password: Optional explicit password parameter (overrides args) default_keyspace: Optional default keyspace if not specified elsewhere Returns: - tuple: (hosts_list, username, password, keyspace) + tuple: (hosts_list, username, password, keyspace, replication_factor) """ # If args provided, extract values keyspace = None + replication_factor = 1 if args is not None: host = host or getattr(args, 'cassandra_host', None) username = username or getattr(args, 'cassandra_username', None) password = password or getattr(args, 'cassandra_password', None) keyspace = getattr(args, 'cassandra_keyspace', None) + replication_factor = getattr(args, 'cassandra_replication_factor', 1) # Apply defaults if still None defaults = get_cassandra_defaults() @@ -123,6 +137,7 @@ def resolve_cassandra_config( username = username or defaults['username'] password = password or defaults['password'] keyspace = keyspace or defaults['keyspace'] or default_keyspace + replication_factor = replication_factor or defaults['replication_factor'] # Convert host string to list if isinstance(host, str): @@ -130,13 +145,13 @@ def resolve_cassandra_config( else: hosts = host - return hosts, username, password, keyspace + return hosts, username, password, keyspace, replication_factor def get_cassandra_config_from_params( params: dict, default_keyspace: Optional[str] = None -) -> Tuple[List[str], Optional[str], Optional[str], Optional[str]]: +) -> Tuple[List[str], Optional[str], Optional[str], Optional[str], int]: """ Extract and resolve Cassandra configuration from a parameters dictionary. @@ -145,14 +160,12 @@ def get_cassandra_config_from_params( default_keyspace: Optional default keyspace if not specified in params Returns: - tuple: (hosts_list, username, password, keyspace) + tuple: (hosts_list, username, password, keyspace, replication_factor) """ - # Get Cassandra parameters host = params.get('cassandra_host') username = params.get('cassandra_username') password = params.get('cassandra_password') - # Use resolve function to handle defaults and list conversion return resolve_cassandra_config( host=host, username=username, diff --git a/trustgraph-flow/trustgraph/config/service/config.py b/trustgraph-flow/trustgraph/config/service/config.py index ced4cbe7..a9314772 100644 --- a/trustgraph-flow/trustgraph/config/service/config.py +++ b/trustgraph-flow/trustgraph/config/service/config.py @@ -15,13 +15,14 @@ TEMPLATE_WORKSPACE = "__template__" class Configuration: - def __init__(self, push, host, username, password, keyspace): + def __init__(self, push, host, username, password, keyspace, + replication_factor=1): # External function to respond to update self.push = push self.table_store = ConfigTableStore( - host, username, password, keyspace + host, username, password, keyspace, replication_factor ) async def inc_version(self): diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index fd911352..c5fac198 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -79,7 +79,7 @@ class Processor(AsyncProcessor): cassandra_password = params.get("cassandra_password") # Resolve configuration with environment variable fallback - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, replication_factor = resolve_cassandra_config( host=cassandra_host, username=cassandra_username, password=cassandra_password, @@ -147,6 +147,7 @@ class Processor(AsyncProcessor): username = self.cassandra_username, password = self.cassandra_password, keyspace = keyspace, + replication_factor = replication_factor, push = self.push ) diff --git a/trustgraph-flow/trustgraph/cores/knowledge.py b/trustgraph-flow/trustgraph/cores/knowledge.py index c3ecfe96..09c6137d 100644 --- a/trustgraph-flow/trustgraph/cores/knowledge.py +++ b/trustgraph-flow/trustgraph/cores/knowledge.py @@ -17,11 +17,12 @@ class KnowledgeManager: def __init__( self, cassandra_host, cassandra_username, cassandra_password, - keyspace, flow_config, + keyspace, flow_config, replication_factor=1, ): self.table_store = KnowledgeTableStore( - cassandra_host, cassandra_username, cassandra_password, keyspace + cassandra_host, cassandra_username, cassandra_password, keyspace, + replication_factor ) self.loader_queue = asyncio.Queue(maxsize=20) diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py index f3472b58..c84b536c 100755 --- a/trustgraph-flow/trustgraph/cores/service.py +++ b/trustgraph-flow/trustgraph/cores/service.py @@ -56,7 +56,7 @@ class Processor(WorkspaceProcessor): cassandra_username = params.get("cassandra_username") cassandra_password = params.get("cassandra_password") - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, replication_factor = resolve_cassandra_config( host=cassandra_host, username=cassandra_username, password=cassandra_password, @@ -83,6 +83,7 @@ class Processor(WorkspaceProcessor): cassandra_password = self.cassandra_password, keyspace = keyspace, flow_config = self, + replication_factor = replication_factor, ) self.register_config_handler(self.on_knowledge_config, types=["flow"]) diff --git a/trustgraph-flow/trustgraph/iam/service/iam.py b/trustgraph-flow/trustgraph/iam/service/iam.py index b9febb0f..755a1c5d 100644 --- a/trustgraph-flow/trustgraph/iam/service/iam.py +++ b/trustgraph-flow/trustgraph/iam/service/iam.py @@ -246,9 +246,11 @@ class IamService: def __init__(self, host, username, password, keyspace, bootstrap_mode, bootstrap_token=None, - on_workspace_created=None, on_workspace_deleted=None): + on_workspace_created=None, on_workspace_deleted=None, + replication_factor=1): self.table_store = IamTableStore( host, username, password, keyspace, + replication_factor, ) # bootstrap_mode: "token" or "bootstrap". In "token" mode the # service auto-seeds on first start using the provided diff --git a/trustgraph-flow/trustgraph/iam/service/service.py b/trustgraph-flow/trustgraph/iam/service/service.py index 9e1fe707..8ce22757 100644 --- a/trustgraph-flow/trustgraph/iam/service/service.py +++ b/trustgraph-flow/trustgraph/iam/service/service.py @@ -96,7 +96,7 @@ class Processor(AsyncProcessor): cassandra_username = params.get("cassandra_username") cassandra_password = params.get("cassandra_password") - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, replication_factor = resolve_cassandra_config( host=cassandra_host, username=cassandra_username, password=cassandra_password, @@ -149,6 +149,7 @@ class Processor(AsyncProcessor): username=self.cassandra_username, password=self.cassandra_password, keyspace=keyspace, + replication_factor=replication_factor, bootstrap_mode=self.bootstrap_mode, bootstrap_token=self.bootstrap_token, on_workspace_created=self._ensure_workspace_registered, diff --git a/trustgraph-flow/trustgraph/librarian/librarian.py b/trustgraph-flow/trustgraph/librarian/librarian.py index 653c573f..1c4d010e 100644 --- a/trustgraph-flow/trustgraph/librarian/librarian.py +++ b/trustgraph-flow/trustgraph/librarian/librarian.py @@ -28,6 +28,7 @@ class Librarian: bucket_name, keyspace, load_document, object_store_use_ssl=False, object_store_region=None, min_chunk_size=1, # Default: no minimum (for Garage) + replication_factor=1, ): self.blob_store = BlobStore( @@ -36,7 +37,8 @@ class Librarian: ) self.table_store = LibraryTableStore( - cassandra_host, cassandra_username, cassandra_password, keyspace + cassandra_host, cassandra_username, cassandra_password, keyspace, + replication_factor ) self.load_document = load_document diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index 2e73e6f8..cc5efdae 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -117,7 +117,7 @@ class Processor(WorkspaceProcessor): cassandra_password = params.get("cassandra_password") # Resolve configuration with environment variable fallback - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, replication_factor = resolve_cassandra_config( host=cassandra_host, username=cassandra_username, password=cassandra_password, @@ -179,6 +179,7 @@ class Processor(WorkspaceProcessor): object_store_secret_key = object_store_secret_key, bucket_name = bucket_name, keyspace = keyspace, + replication_factor = replication_factor, load_document = self.load_document, object_store_use_ssl = object_store_use_ssl, object_store_region = object_store_region, @@ -450,14 +451,11 @@ class Processor(WorkspaceProcessor): self.pubsub, q, schema=schema ) - await pub.start() - - # FIXME: Time wait kludge? - await asyncio.sleep(1) - - await pub.send(None, doc) - - await pub.stop() + try: + await pub.start() + await pub.send(None, doc) + finally: + await pub.stop() logger.debug("Document submitted") diff --git a/trustgraph-flow/trustgraph/query/rows/cassandra/service.py b/trustgraph-flow/trustgraph/query/rows/cassandra/service.py index cabdf617..73cfcd83 100644 --- a/trustgraph-flow/trustgraph/query/rows/cassandra/service.py +++ b/trustgraph-flow/trustgraph/query/rows/cassandra/service.py @@ -47,7 +47,7 @@ class Processor(FlowProcessor): cassandra_password = params.get("cassandra_password") # Resolve configuration with environment variable fallback - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, _ = resolve_cassandra_config( host=cassandra_host, username=cassandra_username, password=cassandra_password diff --git a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py index efce5968..a9bdbbac 100755 --- a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py +++ b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py @@ -160,7 +160,7 @@ class Processor(TriplesQueryService): cassandra_password = params.get("cassandra_password") # Resolve configuration with environment variable fallback - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, _ = resolve_cassandra_config( host=cassandra_host, username=cassandra_username, password=cassandra_password diff --git a/trustgraph-flow/trustgraph/storage/knowledge/store.py b/trustgraph-flow/trustgraph/storage/knowledge/store.py index 57e1fe48..162a4057 100644 --- a/trustgraph-flow/trustgraph/storage/knowledge/store.py +++ b/trustgraph-flow/trustgraph/storage/knowledge/store.py @@ -23,7 +23,7 @@ class Processor(FlowProcessor): id = params.get("id") # Use helper to resolve configuration - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, replication_factor = resolve_cassandra_config( host=params.get("cassandra_host"), username=params.get("cassandra_username"), password=params.get("cassandra_password"), @@ -59,6 +59,7 @@ class Processor(FlowProcessor): cassandra_username = username, cassandra_password = password, keyspace = keyspace, + replication_factor = replication_factor, ) async def on_triples(self, msg, consumer, flow): diff --git a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py index acfe00d2..a5dad748 100755 --- a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py @@ -47,7 +47,7 @@ class Processor(CollectionConfigHandler, FlowProcessor): cassandra_password = params.get("cassandra_password") # Resolve configuration with environment variable fallback - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, _ = resolve_cassandra_config( host=cassandra_host, username=cassandra_username, password=cassandra_password diff --git a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py index 05331d09..0774153b 100755 --- a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py @@ -125,7 +125,7 @@ class Processor(CollectionConfigHandler, TriplesStoreService): cassandra_password = params.get("cassandra_password") # Resolve configuration with environment variable fallback - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, _ = resolve_cassandra_config( host=cassandra_host, username=cassandra_username, password=cassandra_password diff --git a/trustgraph-flow/trustgraph/tables/config.py b/trustgraph-flow/trustgraph/tables/config.py index 8fd00427..74ceb6f4 100644 --- a/trustgraph-flow/trustgraph/tables/config.py +++ b/trustgraph-flow/trustgraph/tables/config.py @@ -20,9 +20,11 @@ class ConfigTableStore: def __init__( self, cassandra_host, cassandra_username, cassandra_password, keyspace, + replication_factor=1, ): self.keyspace = keyspace + self.replication_factor = replication_factor logger.info("Connecting to Cassandra...") @@ -57,12 +59,11 @@ class ConfigTableStore: logger.debug("Keyspace...") - # FIXME: Replication factor should be configurable self.cassandra.execute(f""" create keyspace if not exists {self.keyspace} with replication = {{ 'class' : 'SimpleStrategy', - 'replication_factor' : 1 + 'replication_factor' : {self.replication_factor} }}; """); diff --git a/trustgraph-flow/trustgraph/tables/iam.py b/trustgraph-flow/trustgraph/tables/iam.py index f1a0734f..8bf9c8b4 100644 --- a/trustgraph-flow/trustgraph/tables/iam.py +++ b/trustgraph-flow/trustgraph/tables/iam.py @@ -28,8 +28,10 @@ class IamTableStore: self, cassandra_host, cassandra_username, cassandra_password, keyspace, + replication_factor=1, ): self.keyspace = keyspace + self.replication_factor = replication_factor logger.info("IAM: connecting to Cassandra...") @@ -57,12 +59,11 @@ class IamTableStore: self._prepare_statements() def _ensure_schema(self): - # FIXME: Replication factor should be configurable. self.cassandra.execute(f""" create keyspace if not exists {self.keyspace} with replication = {{ 'class' : 'SimpleStrategy', - 'replication_factor' : 1 + 'replication_factor' : {self.replication_factor} }}; """) self.cassandra.set_keyspace(self.keyspace) diff --git a/trustgraph-flow/trustgraph/tables/knowledge.py b/trustgraph-flow/trustgraph/tables/knowledge.py index 4d729956..5d45358d 100644 --- a/trustgraph-flow/trustgraph/tables/knowledge.py +++ b/trustgraph-flow/trustgraph/tables/knowledge.py @@ -36,9 +36,11 @@ class KnowledgeTableStore: def __init__( self, cassandra_host, cassandra_username, cassandra_password, keyspace, + replication_factor=1, ): self.keyspace = keyspace + self.replication_factor = replication_factor logger.info("Connecting to Cassandra...") @@ -73,12 +75,11 @@ class KnowledgeTableStore: logger.debug("Keyspace...") - # FIXME: Replication factor should be configurable self.cassandra.execute(f""" create keyspace if not exists {self.keyspace} with replication = {{ 'class' : 'SimpleStrategy', - 'replication_factor' : 1 + 'replication_factor' : {self.replication_factor} }}; """); diff --git a/trustgraph-flow/trustgraph/tables/library.py b/trustgraph-flow/trustgraph/tables/library.py index 6dd5d3e4..58486f0e 100644 --- a/trustgraph-flow/trustgraph/tables/library.py +++ b/trustgraph-flow/trustgraph/tables/library.py @@ -40,9 +40,11 @@ class LibraryTableStore: def __init__( self, cassandra_host, cassandra_username, cassandra_password, keyspace, + replication_factor=1, ): self.keyspace = keyspace + self.replication_factor = replication_factor logger.info("Connecting to Cassandra...") @@ -77,12 +79,11 @@ class LibraryTableStore: logger.debug("Keyspace...") - # FIXME: Replication factor should be configurable self.cassandra.execute(f""" create keyspace if not exists {self.keyspace} with replication = {{ 'class' : 'SimpleStrategy', - 'replication_factor' : 1 + 'replication_factor' : {self.replication_factor} }}; """);