Refactor more Cassandra stuff to use the helper (#490)

This commit is contained in:
cybermaggedon 2025-09-04 12:54:58 +01:00 committed by GitHub
parent 27d657c58d
commit c078ca45cd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 63 additions and 69 deletions

View file

@ -15,6 +15,7 @@ from trustgraph.schema import FlowRequest, FlowResponse
from trustgraph.schema import flow_request_queue, flow_response_queue from trustgraph.schema import flow_request_queue, flow_response_queue
from trustgraph.base import AsyncProcessor, Consumer, Producer from trustgraph.base import AsyncProcessor, Consumer, Producer
from trustgraph.base.cassandra_config import add_cassandra_args, resolve_cassandra_config
from . config import Configuration from . config import Configuration
from . flow import FlowConfig from . flow import FlowConfig
@ -60,9 +61,21 @@ class Processor(AsyncProcessor):
"flow_response_queue", default_flow_response_queue "flow_response_queue", default_flow_response_queue
) )
cassandra_host = params.get("cassandra_host", default_cassandra_host) cassandra_host = params.get("cassandra_host")
cassandra_username = params.get("cassandra_username") cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password") cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(
host=cassandra_host,
username=cassandra_username,
password=cassandra_password
)
# Store resolved configuration
self.cassandra_host = hosts
self.cassandra_username = username
self.cassandra_password = password
id = params.get("id") id = params.get("id")
@ -76,8 +89,9 @@ class Processor(AsyncProcessor):
"config_push_schema": ConfigPush.__name__, "config_push_schema": ConfigPush.__name__,
"flow_request_schema": FlowRequest.__name__, "flow_request_schema": FlowRequest.__name__,
"flow_response_schema": FlowResponse.__name__, "flow_response_schema": FlowResponse.__name__,
"cassandra_host": cassandra_host, "cassandra_host": self.cassandra_host,
"cassandra_username": cassandra_username, "cassandra_username": self.cassandra_username,
"cassandra_password": self.cassandra_password,
} }
) )
@ -142,9 +156,9 @@ class Processor(AsyncProcessor):
) )
self.config = Configuration( self.config = Configuration(
host = cassandra_host.split(","), host = self.cassandra_host,
username = cassandra_username, username = self.cassandra_username,
password = cassandra_password, password = self.cassandra_password,
keyspace = keyspace, keyspace = keyspace,
push = self.push push = self.push
) )
@ -276,23 +290,7 @@ class Processor(AsyncProcessor):
help=f'Flow response queue {default_flow_response_queue}', help=f'Flow response queue {default_flow_response_queue}',
) )
parser.add_argument( add_cassandra_args(parser)
'--cassandra-host',
default="cassandra",
help=f'Graph host (default: cassandra)'
)
parser.add_argument(
'--cassandra-user',
default=None,
help=f'Cassandra user'
)
parser.add_argument(
'--cassandra-password',
default=None,
help=f'Cassandra password'
)
def run(): def run():

View file

@ -11,6 +11,7 @@ import logging
from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber
from .. base import ConsumerMetrics, ProducerMetrics from .. base import ConsumerMetrics, ProducerMetrics
from .. base.cassandra_config import add_cassandra_args, resolve_cassandra_config
from .. schema import KnowledgeRequest, KnowledgeResponse, Error from .. schema import KnowledgeRequest, KnowledgeResponse, Error
from .. schema import knowledge_request_queue, knowledge_response_queue from .. schema import knowledge_request_queue, knowledge_response_queue
@ -49,16 +50,29 @@ class Processor(AsyncProcessor):
"knowledge_response_queue", default_knowledge_response_queue "knowledge_response_queue", default_knowledge_response_queue
) )
cassandra_host = params.get("cassandra_host", default_cassandra_host) cassandra_host = params.get("cassandra_host")
cassandra_username = params.get("cassandra_username") cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password") cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(
host=cassandra_host,
username=cassandra_username,
password=cassandra_password
)
# Store resolved configuration
self.cassandra_host = hosts
self.cassandra_username = username
self.cassandra_password = password
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
"knowledge_request_queue": knowledge_request_queue, "knowledge_request_queue": knowledge_request_queue,
"knowledge_response_queue": knowledge_response_queue, "knowledge_response_queue": knowledge_response_queue,
"cassandra_host": cassandra_host, "cassandra_host": self.cassandra_host,
"cassandra_user": cassandra_user, "cassandra_username": self.cassandra_username,
"cassandra_password": self.cassandra_password,
} }
) )
@ -89,9 +103,9 @@ class Processor(AsyncProcessor):
) )
self.knowledge = KnowledgeManager( self.knowledge = KnowledgeManager(
cassandra_host = cassandra_host.split(","), cassandra_host = self.cassandra_host,
cassandra_username = cassandra_username, cassandra_username = self.cassandra_username,
cassandra_password = cassandra_password, cassandra_password = self.cassandra_password,
keyspace = keyspace, keyspace = keyspace,
flow_config = self, flow_config = self,
) )
@ -210,23 +224,7 @@ class Processor(AsyncProcessor):
help=f'Config response queue {default_knowledge_response_queue}', help=f'Config response queue {default_knowledge_response_queue}',
) )
parser.add_argument( add_cassandra_args(parser)
'--cassandra-host',
default="cassandra",
help=f'Graph host (default: cassandra)'
)
parser.add_argument(
'--cassandra-user',
default=None,
help=f'Cassandra user'
)
parser.add_argument(
'--cassandra-password',
default=None,
help=f'Cassandra password'
)
def run(): def run():

View file

@ -11,6 +11,7 @@ import logging
from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber
from .. base import ConsumerMetrics, ProducerMetrics from .. base import ConsumerMetrics, ProducerMetrics
from .. base.cassandra_config import add_cassandra_args, resolve_cassandra_config
from .. schema import LibrarianRequest, LibrarianResponse, Error from .. schema import LibrarianRequest, LibrarianResponse, Error
from .. schema import librarian_request_queue, librarian_response_queue from .. schema import librarian_request_queue, librarian_response_queue
@ -66,9 +67,21 @@ class Processor(AsyncProcessor):
default_minio_secret_key default_minio_secret_key
) )
cassandra_host = params.get("cassandra_host", default_cassandra_host) cassandra_host = params.get("cassandra_host")
cassandra_username = params.get("cassandra_username") cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password") cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(
host=cassandra_host,
username=cassandra_username,
password=cassandra_password
)
# Store resolved configuration
self.cassandra_host = hosts
self.cassandra_username = username
self.cassandra_password = password
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
@ -76,8 +89,9 @@ class Processor(AsyncProcessor):
"librarian_response_queue": librarian_response_queue, "librarian_response_queue": librarian_response_queue,
"minio_host": minio_host, "minio_host": minio_host,
"minio_access_key": minio_access_key, "minio_access_key": minio_access_key,
"cassandra_host": cassandra_host, "cassandra_host": self.cassandra_host,
"cassandra_username": cassandra_username, "cassandra_username": self.cassandra_username,
"cassandra_password": self.cassandra_password,
} }
) )
@ -108,9 +122,9 @@ class Processor(AsyncProcessor):
) )
self.librarian = Librarian( self.librarian = Librarian(
cassandra_host = cassandra_host.split(","), cassandra_host = self.cassandra_host,
cassandra_username = cassandra_username, cassandra_username = self.cassandra_username,
cassandra_password = cassandra_password, cassandra_password = self.cassandra_password,
minio_host = minio_host, minio_host = minio_host,
minio_access_key = minio_access_key, minio_access_key = minio_access_key,
minio_secret_key = minio_secret_key, minio_secret_key = minio_secret_key,
@ -319,23 +333,7 @@ class Processor(AsyncProcessor):
f'(default: {default_minio_access_key})', f'(default: {default_minio_access_key})',
) )
parser.add_argument( add_cassandra_args(parser)
'--cassandra-host',
default="cassandra",
help=f'Graph host (default: cassandra)'
)
parser.add_argument(
'--cassandra-user',
default=None,
help=f'Cassandra user'
)
parser.add_argument(
'--cassandra-password',
default=None,
help=f'Cassandra password'
)
def run(): def run():