diff --git a/trustgraph-base/trustgraph/base/cassandra_config.py b/trustgraph-base/trustgraph/base/cassandra_config.py index bacc4313..657277f6 100644 --- a/trustgraph-base/trustgraph/base/cassandra_config.py +++ b/trustgraph-base/trustgraph/base/cassandra_config.py @@ -21,7 +21,8 @@ def get_cassandra_defaults() -> dict: 'host': os.getenv('CASSANDRA_HOST', 'cassandra'), 'username': os.getenv('CASSANDRA_USERNAME'), 'password': os.getenv('CASSANDRA_PASSWORD'), - 'keyspace': os.getenv('CASSANDRA_KEYSPACE') + 'keyspace': os.getenv('CASSANDRA_KEYSPACE'), + 'replication_factor': int(os.getenv('CASSANDRA_REPLICATION_FACTOR', '1')) } @@ -85,6 +86,17 @@ def add_cassandra_args(parser: argparse.ArgumentParser) -> None: help=keyspace_help ) + replication_factor_help = f"Cassandra keyspace replication factor (default: {defaults['replication_factor']})" + if 'CASSANDRA_REPLICATION_FACTOR' in os.environ: + replication_factor_help += " [from CASSANDRA_REPLICATION_FACTOR]" + + parser.add_argument( + '--cassandra-replication-factor', + type=int, + default=defaults['replication_factor'], + help=replication_factor_help + ) + def resolve_cassandra_config( args: Optional[Any] = None, @@ -92,7 +104,7 @@ def resolve_cassandra_config( username: Optional[str] = None, password: Optional[str] = None, default_keyspace: Optional[str] = None -) -> Tuple[List[str], Optional[str], Optional[str], Optional[str]]: +) -> Tuple[List[str], Optional[str], Optional[str], Optional[str], int]: """ Resolve Cassandra configuration from various sources. @@ -100,22 +112,24 @@ def resolve_cassandra_config( Converts host string to list format for Cassandra driver. Args: - args: Optional argparse namespace with cassandra_host, cassandra_username, cassandra_password, cassandra_keyspace + args: Optional argparse namespace with cassandra_host, cassandra_username, cassandra_password, cassandra_keyspace, cassandra_replication_factor host: Optional explicit host parameter (overrides args) username: Optional explicit username parameter (overrides args) password: Optional explicit password parameter (overrides args) default_keyspace: Optional default keyspace if not specified elsewhere Returns: - tuple: (hosts_list, username, password, keyspace) + tuple: (hosts_list, username, password, keyspace, replication_factor) """ # If args provided, extract values keyspace = None + replication_factor = 1 if args is not None: host = host or getattr(args, 'cassandra_host', None) username = username or getattr(args, 'cassandra_username', None) password = password or getattr(args, 'cassandra_password', None) keyspace = getattr(args, 'cassandra_keyspace', None) + replication_factor = getattr(args, 'cassandra_replication_factor', 1) # Apply defaults if still None defaults = get_cassandra_defaults() @@ -123,6 +137,7 @@ def resolve_cassandra_config( username = username or defaults['username'] password = password or defaults['password'] keyspace = keyspace or defaults['keyspace'] or default_keyspace + replication_factor = replication_factor or defaults['replication_factor'] # Convert host string to list if isinstance(host, str): @@ -130,7 +145,7 @@ def resolve_cassandra_config( else: hosts = host - return hosts, username, password, keyspace + return hosts, username, password, keyspace, replication_factor def get_cassandra_config_from_params( diff --git a/trustgraph-flow/trustgraph/config/service/config.py b/trustgraph-flow/trustgraph/config/service/config.py index 36af6026..697a255d 100644 --- a/trustgraph-flow/trustgraph/config/service/config.py +++ b/trustgraph-flow/trustgraph/config/service/config.py @@ -11,13 +11,14 @@ logger = logging.getLogger(__name__) class Configuration: - def __init__(self, push, host, username, password, keyspace): + def __init__(self, push, host, username, password, keyspace, + replication_factor=1): # External function to respond to update self.push = push self.table_store = ConfigTableStore( - host, username, password, keyspace + host, username, password, keyspace, replication_factor ) async def inc_version(self): diff --git a/trustgraph-flow/trustgraph/cores/knowledge.py b/trustgraph-flow/trustgraph/cores/knowledge.py index ab5f78f0..e43a5a16 100644 --- a/trustgraph-flow/trustgraph/cores/knowledge.py +++ b/trustgraph-flow/trustgraph/cores/knowledge.py @@ -17,11 +17,12 @@ class KnowledgeManager: def __init__( self, cassandra_host, cassandra_username, cassandra_password, - keyspace, flow_config, + keyspace, flow_config, replication_factor=1, ): self.table_store = KnowledgeTableStore( - cassandra_host, cassandra_username, cassandra_password, keyspace + cassandra_host, cassandra_username, cassandra_password, keyspace, + replication_factor ) self.loader_queue = asyncio.Queue(maxsize=20) diff --git a/trustgraph-flow/trustgraph/iam/service/iam.py b/trustgraph-flow/trustgraph/iam/service/iam.py index c89f65b0..2b979310 100644 --- a/trustgraph-flow/trustgraph/iam/service/iam.py +++ b/trustgraph-flow/trustgraph/iam/service/iam.py @@ -245,9 +245,11 @@ def _sign_jwt(kid, private_pem, claims): class IamService: def __init__(self, host, username, password, keyspace, - bootstrap_mode, bootstrap_token=None): + bootstrap_mode, bootstrap_token=None, + replication_factor=1): self.table_store = IamTableStore( host, username, password, keyspace, + replication_factor, ) # bootstrap_mode: "token" or "bootstrap". In "token" mode the # service auto-seeds on first start using the provided diff --git a/trustgraph-flow/trustgraph/librarian/librarian.py b/trustgraph-flow/trustgraph/librarian/librarian.py index af1d69b1..56a7b59c 100644 --- a/trustgraph-flow/trustgraph/librarian/librarian.py +++ b/trustgraph-flow/trustgraph/librarian/librarian.py @@ -28,6 +28,7 @@ class Librarian: bucket_name, keyspace, load_document, object_store_use_ssl=False, object_store_region=None, min_chunk_size=1, # Default: no minimum (for Garage) + replication_factor=1, ): self.blob_store = BlobStore( @@ -36,7 +37,8 @@ class Librarian: ) self.table_store = LibraryTableStore( - cassandra_host, cassandra_username, cassandra_password, keyspace + cassandra_host, cassandra_username, cassandra_password, keyspace, + replication_factor ) self.load_document = load_document diff --git a/trustgraph-flow/trustgraph/storage/knowledge/store.py b/trustgraph-flow/trustgraph/storage/knowledge/store.py index 57e1fe48..162a4057 100644 --- a/trustgraph-flow/trustgraph/storage/knowledge/store.py +++ b/trustgraph-flow/trustgraph/storage/knowledge/store.py @@ -23,7 +23,7 @@ class Processor(FlowProcessor): id = params.get("id") # Use helper to resolve configuration - hosts, username, password, keyspace = resolve_cassandra_config( + hosts, username, password, keyspace, replication_factor = resolve_cassandra_config( host=params.get("cassandra_host"), username=params.get("cassandra_username"), password=params.get("cassandra_password"), @@ -59,6 +59,7 @@ class Processor(FlowProcessor): cassandra_username = username, cassandra_password = password, keyspace = keyspace, + replication_factor = replication_factor, ) async def on_triples(self, msg, consumer, flow): diff --git a/trustgraph-flow/trustgraph/tables/config.py b/trustgraph-flow/trustgraph/tables/config.py index 8fd00427..74ceb6f4 100644 --- a/trustgraph-flow/trustgraph/tables/config.py +++ b/trustgraph-flow/trustgraph/tables/config.py @@ -20,9 +20,11 @@ class ConfigTableStore: def __init__( self, cassandra_host, cassandra_username, cassandra_password, keyspace, + replication_factor=1, ): self.keyspace = keyspace + self.replication_factor = replication_factor logger.info("Connecting to Cassandra...") @@ -57,12 +59,11 @@ class ConfigTableStore: logger.debug("Keyspace...") - # FIXME: Replication factor should be configurable self.cassandra.execute(f""" create keyspace if not exists {self.keyspace} with replication = {{ 'class' : 'SimpleStrategy', - 'replication_factor' : 1 + 'replication_factor' : {self.replication_factor} }}; """); diff --git a/trustgraph-flow/trustgraph/tables/iam.py b/trustgraph-flow/trustgraph/tables/iam.py index f1a0734f..8bf9c8b4 100644 --- a/trustgraph-flow/trustgraph/tables/iam.py +++ b/trustgraph-flow/trustgraph/tables/iam.py @@ -28,8 +28,10 @@ class IamTableStore: self, cassandra_host, cassandra_username, cassandra_password, keyspace, + replication_factor=1, ): self.keyspace = keyspace + self.replication_factor = replication_factor logger.info("IAM: connecting to Cassandra...") @@ -57,12 +59,11 @@ class IamTableStore: self._prepare_statements() def _ensure_schema(self): - # FIXME: Replication factor should be configurable. self.cassandra.execute(f""" create keyspace if not exists {self.keyspace} with replication = {{ 'class' : 'SimpleStrategy', - 'replication_factor' : 1 + 'replication_factor' : {self.replication_factor} }}; """) self.cassandra.set_keyspace(self.keyspace) diff --git a/trustgraph-flow/trustgraph/tables/knowledge.py b/trustgraph-flow/trustgraph/tables/knowledge.py index 4d729956..5d45358d 100644 --- a/trustgraph-flow/trustgraph/tables/knowledge.py +++ b/trustgraph-flow/trustgraph/tables/knowledge.py @@ -36,9 +36,11 @@ class KnowledgeTableStore: def __init__( self, cassandra_host, cassandra_username, cassandra_password, keyspace, + replication_factor=1, ): self.keyspace = keyspace + self.replication_factor = replication_factor logger.info("Connecting to Cassandra...") @@ -73,12 +75,11 @@ class KnowledgeTableStore: logger.debug("Keyspace...") - # FIXME: Replication factor should be configurable self.cassandra.execute(f""" create keyspace if not exists {self.keyspace} with replication = {{ 'class' : 'SimpleStrategy', - 'replication_factor' : 1 + 'replication_factor' : {self.replication_factor} }}; """); diff --git a/trustgraph-flow/trustgraph/tables/library.py b/trustgraph-flow/trustgraph/tables/library.py index 86706079..a3adb57c 100644 --- a/trustgraph-flow/trustgraph/tables/library.py +++ b/trustgraph-flow/trustgraph/tables/library.py @@ -40,9 +40,11 @@ class LibraryTableStore: def __init__( self, cassandra_host, cassandra_username, cassandra_password, keyspace, + replication_factor=1, ): self.keyspace = keyspace + self.replication_factor = replication_factor logger.info("Connecting to Cassandra...") @@ -77,12 +79,11 @@ class LibraryTableStore: logger.debug("Keyspace...") - # FIXME: Replication factor should be configurable self.cassandra.execute(f""" create keyspace if not exists {self.keyspace} with replication = {{ 'class' : 'SimpleStrategy', - 'replication_factor' : 1 + 'replication_factor' : {self.replication_factor} }}; """);