diff --git a/trustgraph-flow/trustgraph/direct/cassandra.py b/trustgraph-flow/trustgraph/direct/cassandra.py index 568411a9..d06b270f 100644 --- a/trustgraph-flow/trustgraph/direct/cassandra.py +++ b/trustgraph-flow/trustgraph/direct/cassandra.py @@ -6,7 +6,7 @@ class TrustGraph: def __init__( self, hosts=None, - keyspace="trustgraph", table="default", + keyspace="trustgraph", table="default", username=None, password=None ): if hosts is None: @@ -14,8 +14,13 @@ class TrustGraph: self.keyspace = keyspace self.table = table + self.username = username - self.cluster = Cluster(hosts) + if username and password: + auth_provider = PlainTextAuthProvider(username=username, password=password) + self.cluster = Cluster(hosts, auth_provider=auth_provider) + else: + self.cluster = Cluster(hosts) self.session = self.cluster.connect() self.init() diff --git a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py index 4245784d..22fbf84d 100755 --- a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py +++ b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py @@ -26,6 +26,8 @@ class Processor(ConsumerProducer): output_queue = params.get("output_queue", default_output_queue) subscriber = params.get("subscriber", default_subscriber) graph_host = params.get("graph_host", default_graph_host) + graph_username = params.get("graph_username", None) + graph_password = params.get("graph_password", None) super(Processor, self).__init__( **params | { @@ -35,10 +37,14 @@ class Processor(ConsumerProducer): "input_schema": TriplesQueryRequest, "output_schema": TriplesQueryResponse, "graph_host": graph_host, + "graph_username": graph_username, + "graph_password": graph_password, } ) self.graph_host = [graph_host] + self.username = graph_username + self.password = graph_password self.table = None def create_value(self, ent): @@ -56,10 +62,17 @@ class Processor(ConsumerProducer): table = (v.user, v.collection) if table != self.table: - self.tg = TrustGraph( - hosts=self.graph_host, - keyspace=v.user, table=v.collection, - ) + if self.username and self.password: + self.tg = TrustGraph( + hosts=self.graph_host, + keyspace=v.user, table=v.collection, + username=self.username, password=self.password + ) + else: + self.tg = TrustGraph( + hosts=self.graph_host, + keyspace=v.user, table=v.collection, + ) self.table = table # Sender-produced ID @@ -176,6 +189,19 @@ class Processor(ConsumerProducer): default="localhost", help=f'Graph host (default: localhost)' ) + + parser.add_argument( + '--graph-username', + default=None, + help=f'Cassandra username' + ) + + parser.add_argument( + '--graph-password', + default=None, + help=f'Cassandra password' + ) + def run(): diff --git a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py index d44864fe..fc8f6686 100755 --- a/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/rows/cassandra/write.py @@ -29,6 +29,8 @@ class Processor(Consumer): input_queue = params.get("input_queue", default_input_queue) subscriber = params.get("subscriber", default_subscriber) graph_host = params.get("graph_host", default_graph_host) + graph_username = params.get("graph_username", None) + graph_password = params.get("graph_password", None) super(Processor, self).__init__( **params | { @@ -36,10 +38,16 @@ class Processor(Consumer): "subscriber": subscriber, "input_schema": Rows, "graph_host": graph_host, + "graph_username": graph_username, + "graph_password": graph_password, } ) - - self.cluster = Cluster(graph_host.split(",")) + + if graph_username and graph_password: + auth_provider = PlainTextAuthProvider(username=graph_username, password=graph_password) + self.cluster = Cluster(graph_host.split(","), auth_provider=auth_provider) + else: + self.cluster = Cluster(graph_host.split(",")) self.session = self.cluster.connect() self.tables = set() @@ -120,6 +128,18 @@ class Processor(Consumer): default="localhost", help=f'Graph host (default: localhost)' ) + + parser.add_argument( + '--graph-username', + default=None, + help=f'Cassandra username' + ) + + parser.add_argument( + '--graph-password', + default=None, + help=f'Cassandra password' + ) def run(): diff --git a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py index e7078e08..d940d0ec 100755 --- a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py @@ -28,6 +28,8 @@ class Processor(Consumer): input_queue = params.get("input_queue", default_input_queue) subscriber = params.get("subscriber", default_subscriber) graph_host = params.get("graph_host", default_graph_host) + graph_username = params.get("graph_username", None) + graph_password = params.get("graph_password", None) super(Processor, self).__init__( **params | { @@ -35,10 +37,14 @@ class Processor(Consumer): "subscriber": subscriber, "input_schema": Triples, "graph_host": graph_host, + "graph_username": graph_username, + "graph_password": graph_password, } ) - + self.graph_host = [graph_host] + self.username = graph_username + self.password = graph_password self.table = None def handle(self, msg): @@ -52,10 +58,17 @@ class Processor(Consumer): self.tg = None try: - self.tg = TrustGraph( - hosts=self.graph_host, - keyspace=v.metadata.user, table=v.metadata.collection, - ) + if self.username and self.password: + self.tg = TrustGraph( + hosts=self.graph_host, + keyspace=v.metadata.user, table=v.metadata.collection, + username=self.username, password=self.password + ) + else: + self.tg = TrustGraph( + hosts=self.graph_host, + keyspace=v.metadata.user, table=v.metadata.collection, + ) except Exception as e: print("Exception", e, flush=True) time.sleep(1) @@ -82,6 +95,18 @@ class Processor(Consumer): default="localhost", help=f'Graph host (default: localhost)' ) + + parser.add_argument( + '--graph-username', + default=None, + help=f'Cassandra username' + ) + + parser.add_argument( + '--graph-password', + default=None, + help=f'Cassandra password' + ) def run():