Fixing more Cassandra consistency issues (#488)

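* Fixing more Cassandra work: standardise on the cassandra_host / cassandra_username / cassandra_password names everywhere, replacing the legacy cassandra_user and graph_host / graph_username / graph_password names and removing their backward-compatibility fallbacks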

* Fix tests
This commit is contained in:
cybermaggedon 2025-09-04 00:58:11 +01:00 committed by GitHub
parent ccaec88a72
commit 85e669c763
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 196 additions and 208 deletions

View file

@ -45,13 +45,13 @@ class Configuration:
# FIXME: Some version vs config race conditions
def __init__(self, push, host, user, password, keyspace):
def __init__(self, push, host, username, password, keyspace):
# External function to respond to update
self.push = push
self.table_store = ConfigTableStore(
host, user, password, keyspace
host, username, password, keyspace
)
async def inc_version(self):

View file

@ -61,7 +61,7 @@ class Processor(AsyncProcessor):
)
cassandra_host = params.get("cassandra_host", default_cassandra_host)
cassandra_user = params.get("cassandra_user")
cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password")
id = params.get("id")
@ -77,7 +77,7 @@ class Processor(AsyncProcessor):
"flow_request_schema": FlowRequest.__name__,
"flow_response_schema": FlowResponse.__name__,
"cassandra_host": cassandra_host,
"cassandra_user": cassandra_user,
"cassandra_username": cassandra_username,
}
)
@ -143,7 +143,7 @@ class Processor(AsyncProcessor):
self.config = Configuration(
host = cassandra_host.split(","),
user = cassandra_user,
username = cassandra_username,
password = cassandra_password,
keyspace = keyspace,
push = self.push

View file

@ -16,12 +16,12 @@ logger = logging.getLogger(__name__)
class KnowledgeManager:
def __init__(
self, cassandra_host, cassandra_user, cassandra_password,
self, cassandra_host, cassandra_username, cassandra_password,
keyspace, flow_config,
):
self.table_store = KnowledgeTableStore(
cassandra_host, cassandra_user, cassandra_password, keyspace
cassandra_host, cassandra_username, cassandra_password, keyspace
)
self.loader_queue = asyncio.Queue(maxsize=20)

View file

@ -50,7 +50,7 @@ class Processor(AsyncProcessor):
)
cassandra_host = params.get("cassandra_host", default_cassandra_host)
cassandra_user = params.get("cassandra_user")
cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password")
super(Processor, self).__init__(
@ -90,7 +90,7 @@ class Processor(AsyncProcessor):
self.knowledge = KnowledgeManager(
cassandra_host = cassandra_host.split(","),
cassandra_user = cassandra_user,
cassandra_username = cassandra_username,
cassandra_password = cassandra_password,
keyspace = keyspace,
flow_config = self,

View file

@ -16,7 +16,7 @@ class Librarian:
def __init__(
self,
cassandra_host, cassandra_user, cassandra_password,
cassandra_host, cassandra_username, cassandra_password,
minio_host, minio_access_key, minio_secret_key,
bucket_name, keyspace, load_document,
):
@ -26,7 +26,7 @@ class Librarian:
)
self.table_store = LibraryTableStore(
cassandra_host, cassandra_user, cassandra_password, keyspace
cassandra_host, cassandra_username, cassandra_password, keyspace
)
self.load_document = load_document

View file

@ -67,7 +67,7 @@ class Processor(AsyncProcessor):
)
cassandra_host = params.get("cassandra_host", default_cassandra_host)
cassandra_user = params.get("cassandra_user")
cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password")
super(Processor, self).__init__(
@ -77,7 +77,7 @@ class Processor(AsyncProcessor):
"minio_host": minio_host,
"minio_access_key": minio_access_key,
"cassandra_host": cassandra_host,
"cassandra_user": cassandra_user,
"cassandra_username": cassandra_username,
}
)
@ -109,7 +109,7 @@ class Processor(AsyncProcessor):
self.librarian = Librarian(
cassandra_host = cassandra_host.split(","),
cassandra_user = cassandra_user,
cassandra_username = cassandra_username,
cassandra_password = cassandra_password,
minio_host = minio_host,
minio_access_key = minio_access_key,

View file

@ -21,12 +21,12 @@ from strawberry.tools import create_type
from .... schema import ObjectsQueryRequest, ObjectsQueryResponse, GraphQLError
from .... schema import Error, RowSchema, Field as SchemaField
from .... base import FlowProcessor, ConsumerSpec, ProducerSpec
from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config
# Module logger
logger = logging.getLogger(__name__)
default_ident = "objects-query"
default_graph_host = 'localhost'
# GraphQL filter input types
@strawberry.input
@ -68,10 +68,22 @@ class Processor(FlowProcessor):
id = params.get("id", default_ident)
# Cassandra connection parameters
self.graph_host = params.get("graph_host", default_graph_host)
self.graph_username = params.get("graph_username", None)
self.graph_password = params.get("graph_password", None)
# Get Cassandra parameters
cassandra_host = params.get("cassandra_host")
cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(
host=cassandra_host,
username=cassandra_username,
password=cassandra_password
)
# Store resolved configuration with proper names
self.cassandra_host = hosts # Store as list
self.cassandra_username = username
self.cassandra_password = password
# Config key for schemas
self.config_key = params.get("config_type", "schema")
@ -124,20 +136,20 @@ class Processor(FlowProcessor):
return
try:
if self.graph_username and self.graph_password:
if self.cassandra_username and self.cassandra_password:
auth_provider = PlainTextAuthProvider(
username=self.graph_username,
password=self.graph_password
username=self.cassandra_username,
password=self.cassandra_password
)
self.cluster = Cluster(
contact_points=[self.graph_host],
contact_points=self.cassandra_host,
auth_provider=auth_provider
)
else:
self.cluster = Cluster(contact_points=[self.graph_host])
self.cluster = Cluster(contact_points=self.cassandra_host)
self.session = self.cluster.connect()
logger.info(f"Connected to Cassandra cluster at {self.graph_host}")
logger.info(f"Connected to Cassandra cluster at {self.cassandra_host}")
except Exception as e:
logger.error(f"Failed to connect to Cassandra: {e}", exc_info=True)
@ -712,24 +724,7 @@ class Processor(FlowProcessor):
"""Add command-line arguments"""
FlowProcessor.add_args(parser)
parser.add_argument(
'-g', '--graph-host',
default=default_graph_host,
help=f'Cassandra host (default: {default_graph_host})'
)
parser.add_argument(
'--graph-username',
default=None,
help='Cassandra username'
)
parser.add_argument(
'--graph-password',
default=None,
help='Cassandra password'
)
add_cassandra_args(parser)
parser.add_argument(
'--config-type',

View file

@ -22,10 +22,10 @@ class Processor(TriplesQueryService):
def __init__(self, **params):
# Use new parameter names, fall back to old for compatibility
cassandra_host = params.get("cassandra_host", params.get("graph_host"))
cassandra_username = params.get("cassandra_username", params.get("graph_username"))
cassandra_password = params.get("cassandra_password", params.get("graph_password"))
# Get Cassandra parameters
cassandra_host = params.get("cassandra_host")
cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(
@ -41,9 +41,9 @@ class Processor(TriplesQueryService):
}
)
self.graph_host = hosts
self.username = username
self.password = password
self.cassandra_host = hosts
self.cassandra_username = username
self.cassandra_password = password
self.table = None
def create_value(self, ent):
@ -59,15 +59,15 @@ class Processor(TriplesQueryService):
table = (query.user, query.collection)
if table != self.table:
if self.username and self.password:
if self.cassandra_username and self.cassandra_password:
self.tg = TrustGraph(
hosts=self.graph_host,
hosts=self.cassandra_host,
keyspace=query.user, table=query.collection,
username=self.username, password=self.password
username=self.cassandra_username, password=self.cassandra_password
)
else:
self.tg = TrustGraph(
hosts=self.graph_host,
hosts=self.cassandra_host,
keyspace=query.user, table=query.collection,
)
self.table = table

View file

@ -25,7 +25,7 @@ class Processor(FlowProcessor):
# Use helper to resolve configuration
hosts, username, password = resolve_cassandra_config(
host=params.get("cassandra_host"),
username=params.get("cassandra_user", params.get("cassandra_username")),
username=params.get("cassandra_username"),
password=params.get("cassandra_password")
)
@ -55,7 +55,7 @@ class Processor(FlowProcessor):
self.table_store = KnowledgeTableStore(
cassandra_host = hosts,
cassandra_user = username,
cassandra_username = username,
cassandra_password = password,
keyspace = keyspace,
)

View file

@ -27,10 +27,10 @@ class Processor(FlowProcessor):
id = params.get("id", default_ident)
# Use new parameter names, fall back to old for compatibility
cassandra_host = params.get("cassandra_host", params.get("graph_host"))
cassandra_username = params.get("cassandra_username", params.get("graph_username"))
cassandra_password = params.get("cassandra_password", params.get("graph_password"))
# Get Cassandra parameters
cassandra_host = params.get("cassandra_host")
cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(
@ -39,10 +39,10 @@ class Processor(FlowProcessor):
password=cassandra_password
)
# Store resolved configuration
self.graph_host = hosts # Store as list
self.graph_username = username
self.graph_password = password
# Store resolved configuration with proper names
self.cassandra_host = hosts # Store as list
self.cassandra_username = username
self.cassandra_password = password
# Config key for schemas
self.config_key = params.get("config_type", "schema")
@ -82,20 +82,20 @@ class Processor(FlowProcessor):
return
try:
if self.graph_username and self.graph_password:
if self.cassandra_username and self.cassandra_password:
auth_provider = PlainTextAuthProvider(
username=self.graph_username,
password=self.graph_password
username=self.cassandra_username,
password=self.cassandra_password
)
self.cluster = Cluster(
contact_points=self.graph_host,
contact_points=self.cassandra_host,
auth_provider=auth_provider
)
else:
self.cluster = Cluster(contact_points=self.graph_host)
self.cluster = Cluster(contact_points=self.cassandra_host)
self.session = self.cluster.connect()
logger.info(f"Connected to Cassandra cluster at {self.graph_host}")
logger.info(f"Connected to Cassandra cluster at {self.cassandra_host}")
except Exception as e:
logger.error(f"Failed to connect to Cassandra: {e}", exc_info=True)

View file

@ -36,10 +36,10 @@ class Processor(Consumer):
input_queue = params.get("input_queue", default_input_queue)
subscriber = params.get("subscriber", default_subscriber)
# Use new parameter names, fall back to old for compatibility
cassandra_host = params.get("cassandra_host", params.get("graph_host"))
cassandra_username = params.get("cassandra_username", params.get("graph_username"))
cassandra_password = params.get("cassandra_password", params.get("graph_password"))
# Get Cassandra parameters
cassandra_host = params.get("cassandra_host")
cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(

View file

@ -26,10 +26,10 @@ class Processor(TriplesStoreService):
id = params.get("id", default_ident)
# Use new parameter names, fall back to old for compatibility
cassandra_host = params.get("cassandra_host", params.get("graph_host"))
cassandra_username = params.get("cassandra_username", params.get("graph_username"))
cassandra_password = params.get("cassandra_password", params.get("graph_password"))
# Get Cassandra parameters
cassandra_host = params.get("cassandra_host")
cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(
@ -45,9 +45,9 @@ class Processor(TriplesStoreService):
}
)
self.graph_host = hosts
self.username = username
self.password = password
self.cassandra_host = hosts
self.cassandra_username = username
self.cassandra_password = password
self.table = None
async def store_triples(self, message):
@ -59,16 +59,16 @@ class Processor(TriplesStoreService):
self.tg = None
try:
if self.username and self.password:
if self.cassandra_username and self.cassandra_password:
self.tg = TrustGraph(
hosts=self.graph_host,
hosts=self.cassandra_host,
keyspace=message.metadata.user,
table=message.metadata.collection,
username=self.username, password=self.password
username=self.cassandra_username, password=self.cassandra_password
)
else:
self.tg = TrustGraph(
hosts=self.graph_host,
hosts=self.cassandra_host,
keyspace=message.metadata.user,
table=message.metadata.collection,
)

View file

@ -17,7 +17,7 @@ class ConfigTableStore:
def __init__(
self,
cassandra_host, cassandra_user, cassandra_password, keyspace,
cassandra_host, cassandra_username, cassandra_password, keyspace,
):
self.keyspace = keyspace
@ -28,10 +28,10 @@ class ConfigTableStore:
if isinstance(cassandra_host, str):
cassandra_host = [h.strip() for h in cassandra_host.split(',')]
if cassandra_user and cassandra_password:
if cassandra_username and cassandra_password:
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
auth_provider = PlainTextAuthProvider(
username=cassandra_user, password=cassandra_password
username=cassandra_username, password=cassandra_password
)
self.cluster = Cluster(
cassandra_host,

View file

@ -17,7 +17,7 @@ class KnowledgeTableStore:
def __init__(
self,
cassandra_host, cassandra_user, cassandra_password, keyspace,
cassandra_host, cassandra_username, cassandra_password, keyspace,
):
self.keyspace = keyspace
@ -28,10 +28,10 @@ class KnowledgeTableStore:
if isinstance(cassandra_host, str):
cassandra_host = [h.strip() for h in cassandra_host.split(',')]
if cassandra_user and cassandra_password:
if cassandra_username and cassandra_password:
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
auth_provider = PlainTextAuthProvider(
username=cassandra_user, password=cassandra_password
username=cassandra_username, password=cassandra_password
)
self.cluster = Cluster(
cassandra_host,

View file

@ -21,7 +21,7 @@ class LibraryTableStore:
def __init__(
self,
cassandra_host, cassandra_user, cassandra_password, keyspace,
cassandra_host, cassandra_username, cassandra_password, keyspace,
):
self.keyspace = keyspace
@ -32,10 +32,10 @@ class LibraryTableStore:
if isinstance(cassandra_host, str):
cassandra_host = [h.strip() for h in cassandra_host.split(',')]
if cassandra_user and cassandra_password:
if cassandra_username and cassandra_password:
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
auth_provider = PlainTextAuthProvider(
username=cassandra_user, password=cassandra_password
username=cassandra_username, password=cassandra_password
)
self.cluster = Cluster(
cassandra_host,