diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index a23a33b9..84ed2a6a 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -15,6 +15,7 @@ from trustgraph.schema import FlowRequest, FlowResponse from trustgraph.schema import flow_request_queue, flow_response_queue from trustgraph.base import AsyncProcessor, Consumer, Producer +from trustgraph.base.cassandra_config import add_cassandra_args, resolve_cassandra_config from . config import Configuration from . flow import FlowConfig @@ -60,9 +61,21 @@ class Processor(AsyncProcessor): "flow_response_queue", default_flow_response_queue ) - cassandra_host = params.get("cassandra_host", default_cassandra_host) + cassandra_host = params.get("cassandra_host") cassandra_username = params.get("cassandra_username") cassandra_password = params.get("cassandra_password") + + # Resolve configuration with environment variable fallback + hosts, username, password = resolve_cassandra_config( + host=cassandra_host, + username=cassandra_username, + password=cassandra_password + ) + + # Store resolved configuration + self.cassandra_host = hosts + self.cassandra_username = username + self.cassandra_password = password id = params.get("id") @@ -76,8 +89,9 @@ class Processor(AsyncProcessor): "config_push_schema": ConfigPush.__name__, "flow_request_schema": FlowRequest.__name__, "flow_response_schema": FlowResponse.__name__, - "cassandra_host": cassandra_host, - "cassandra_username": cassandra_username, + "cassandra_host": self.cassandra_host, + "cassandra_username": self.cassandra_username, + "cassandra_password": self.cassandra_password, } ) @@ -142,9 +156,9 @@ class Processor(AsyncProcessor): ) self.config = Configuration( - host = cassandra_host.split(","), - username = cassandra_username, - password = cassandra_password, + host = self.cassandra_host, + username = self.cassandra_username, + password = self.cassandra_password, keyspace = keyspace, push = self.push ) @@ -276,23 +290,7 @@ class Processor(AsyncProcessor): help=f'Flow response queue {default_flow_response_queue}', ) - parser.add_argument( - '--cassandra-host', - default="cassandra", - help=f'Graph host (default: cassandra)' - ) - - parser.add_argument( - '--cassandra-user', - default=None, - help=f'Cassandra user' - ) - - parser.add_argument( - '--cassandra-password', - default=None, - help=f'Cassandra password' - ) + add_cassandra_args(parser) def run(): diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py index 00b8a7d0..9cb0e1d0 100755 --- a/trustgraph-flow/trustgraph/cores/service.py +++ b/trustgraph-flow/trustgraph/cores/service.py @@ -11,6 +11,7 @@ import logging from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber from .. base import ConsumerMetrics, ProducerMetrics +from .. base.cassandra_config import add_cassandra_args, resolve_cassandra_config from .. schema import KnowledgeRequest, KnowledgeResponse, Error from .. schema import knowledge_request_queue, knowledge_response_queue @@ -49,16 +50,29 @@ class Processor(AsyncProcessor): "knowledge_response_queue", default_knowledge_response_queue ) - cassandra_host = params.get("cassandra_host", default_cassandra_host) + cassandra_host = params.get("cassandra_host") cassandra_username = params.get("cassandra_username") cassandra_password = params.get("cassandra_password") + + # Resolve configuration with environment variable fallback + hosts, username, password = resolve_cassandra_config( + host=cassandra_host, + username=cassandra_username, + password=cassandra_password + ) + + # Store resolved configuration + self.cassandra_host = hosts + self.cassandra_username = username + self.cassandra_password = password super(Processor, self).__init__( **params | { "knowledge_request_queue": knowledge_request_queue, "knowledge_response_queue": knowledge_response_queue, - "cassandra_host": cassandra_host, - "cassandra_user": cassandra_user, + "cassandra_host": self.cassandra_host, + "cassandra_username": self.cassandra_username, + "cassandra_password": self.cassandra_password, } ) @@ -89,9 +103,9 @@ class Processor(AsyncProcessor): ) self.knowledge = KnowledgeManager( - cassandra_host = cassandra_host.split(","), - cassandra_username = cassandra_username, - cassandra_password = cassandra_password, + cassandra_host = self.cassandra_host, + cassandra_username = self.cassandra_username, + cassandra_password = self.cassandra_password, keyspace = keyspace, flow_config = self, ) @@ -210,23 +224,7 @@ class Processor(AsyncProcessor): help=f'Config response queue {default_knowledge_response_queue}', ) - parser.add_argument( - '--cassandra-host', - default="cassandra", - help=f'Graph host (default: cassandra)' - ) - - parser.add_argument( - '--cassandra-user', - default=None, - help=f'Cassandra user' - ) - - parser.add_argument( - '--cassandra-password', - default=None, - help=f'Cassandra password' - ) + add_cassandra_args(parser) def run(): diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index 5ab228f9..d1e2ae01 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -11,6 +11,7 @@ import logging from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber from .. base import ConsumerMetrics, ProducerMetrics +from .. base.cassandra_config import add_cassandra_args, resolve_cassandra_config from .. schema import LibrarianRequest, LibrarianResponse, Error from .. schema import librarian_request_queue, librarian_response_queue @@ -66,9 +67,21 @@ class Processor(AsyncProcessor): default_minio_secret_key ) - cassandra_host = params.get("cassandra_host", default_cassandra_host) + cassandra_host = params.get("cassandra_host") cassandra_username = params.get("cassandra_username") cassandra_password = params.get("cassandra_password") + + # Resolve configuration with environment variable fallback + hosts, username, password = resolve_cassandra_config( + host=cassandra_host, + username=cassandra_username, + password=cassandra_password + ) + + # Store resolved configuration + self.cassandra_host = hosts + self.cassandra_username = username + self.cassandra_password = password super(Processor, self).__init__( **params | { @@ -76,8 +89,9 @@ class Processor(AsyncProcessor): "librarian_response_queue": librarian_response_queue, "minio_host": minio_host, "minio_access_key": minio_access_key, - "cassandra_host": cassandra_host, - "cassandra_username": cassandra_username, + "cassandra_host": self.cassandra_host, + "cassandra_username": self.cassandra_username, + "cassandra_password": self.cassandra_password, } ) @@ -108,9 +122,9 @@ class Processor(AsyncProcessor): ) self.librarian = Librarian( - cassandra_host = cassandra_host.split(","), - cassandra_username = cassandra_username, - cassandra_password = cassandra_password, + cassandra_host = self.cassandra_host, + cassandra_username = self.cassandra_username, + cassandra_password = self.cassandra_password, minio_host = minio_host, minio_access_key = minio_access_key, minio_secret_key = minio_secret_key, @@ -319,23 +333,7 @@ class Processor(AsyncProcessor): f'(default: {default_minio_access_key})', ) - parser.add_argument( - '--cassandra-host', - default="cassandra", - help=f'Graph host (default: cassandra)' - ) - - parser.add_argument( - '--cassandra-user', - default=None, - help=f'Cassandra user' - ) - - parser.add_argument( - '--cassandra-password', - default=None, - help=f'Cassandra password' - ) + add_cassandra_args(parser) def run():