From 09930bdb86e1f807b3f6abd7d9131d6f68b69207 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Mon, 6 Oct 2025 17:45:02 +0100 Subject: [PATCH] Fix deletion error in Cassandra object store (#546) * Iteration algorithm modified to look for tables using schema definitions rather than iterate over all tables --- .../storage/objects/cassandra/write.py | 94 +++++++++++++------ 1 file changed, 66 insertions(+), 28 deletions(-) diff --git a/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py b/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py index 053dbcb2..e9dda4d6 100644 --- a/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py @@ -521,7 +521,7 @@ class Processor(FlowProcessor): logger.info(f"Collection {collection} ready for user {user} (using keyspace {safe_keyspace})") async def delete_collection(self, user: str, collection: str): - """Delete all data for a specific collection""" + """Delete all data for a specific collection using schema information""" # Connect if not already connected self.connect_cassandra() @@ -541,40 +541,78 @@ class Processor(FlowProcessor): return self.known_keyspaces.add(safe_keyspace) - # Get all tables in the keyspace that might contain collection data - get_tables_cql = """ - SELECT table_name FROM system_schema.tables - WHERE keyspace_name = %s - """ - - tables = self.session.execute(get_tables_cql, (safe_keyspace,)) + # Iterate over schemas we manage to delete from relevant tables tables_deleted = 0 - for row in tables: - table_name = row.table_name + for schema_name, schema in self.schemas.items(): + safe_table = self.sanitize_table(schema_name) - # Check if the table has a collection column - check_column_cql = """ - SELECT column_name FROM system_schema.columns - WHERE keyspace_name = %s AND table_name = %s AND column_name = 'collection' - """ + # Check if table exists + table_key = f"{user}.{schema_name}" + if table_key not in self.known_tables.get(user, set()): + logger.debug(f"Table {safe_keyspace}.{safe_table} not in known tables, skipping") + continue - result = self.session.execute(check_column_cql, (safe_keyspace, table_name)) - if result.one(): - # Table has collection column, delete data for this collection - try: - delete_cql = f""" - DELETE FROM {safe_keyspace}.{table_name} + try: + # Get primary key fields from schema + primary_key_fields = [field for field in schema.fields if field.primary] + + if primary_key_fields: + # Schema has primary keys: need to query for partition keys first + # Build SELECT query for primary key fields + pk_field_names = [self.sanitize_name(field.name) for field in primary_key_fields] + select_cql = f""" + SELECT {', '.join(pk_field_names)} + FROM {safe_keyspace}.{safe_table} WHERE collection = %s + ALLOW FILTERING """ - self.session.execute(delete_cql, (collection,)) - tables_deleted += 1 - logger.info(f"Deleted collection {collection} from table {safe_keyspace}.{table_name}") - except Exception as e: - logger.error(f"Failed to delete from table {safe_keyspace}.{table_name}: {e}") - raise - logger.info(f"Deleted collection {collection} from {tables_deleted} tables in keyspace {safe_keyspace}") + rows = self.session.execute(select_cql, (collection,)) + + # Delete each row using full partition key + for row in rows: + where_clauses = ["collection = %s"] + values = [collection] + + for field_name in pk_field_names: + where_clauses.append(f"{field_name} = %s") + values.append(getattr(row, field_name)) + + delete_cql = f""" + DELETE FROM {safe_keyspace}.{safe_table} + WHERE {' AND '.join(where_clauses)} + """ + + self.session.execute(delete_cql, tuple(values)) + else: + # No primary keys, uses synthetic_id + # Need to query for synthetic_ids first + select_cql = f""" + SELECT synthetic_id + FROM {safe_keyspace}.{safe_table} + WHERE collection = %s + ALLOW FILTERING + """ + + rows = self.session.execute(select_cql, (collection,)) + + # Delete each row using collection and synthetic_id + for row in rows: + delete_cql = f""" + DELETE FROM {safe_keyspace}.{safe_table} + WHERE collection = %s AND synthetic_id = %s + """ + self.session.execute(delete_cql, (collection, row.synthetic_id)) + + tables_deleted += 1 + logger.info(f"Deleted collection {collection} from table {safe_keyspace}.{safe_table}") + + except Exception as e: + logger.error(f"Failed to delete from table {safe_keyspace}.{safe_table}: {e}") + raise + + logger.info(f"Deleted collection {collection} from {tables_deleted} schema-based tables in keyspace {safe_keyspace}") def close(self): """Clean up Cassandra connections"""