diff --git a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py index 1fadaab3..822dba25 100755 --- a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py +++ b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py @@ -17,7 +17,6 @@ from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error from .... schema import Term, Triple, IRI, LITERAL, TRIPLE, BLANK from .... base import TriplesQueryService from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config -from .... tables.cassandra_async import async_execute # Module logger logger = logging.getLogger(__name__) @@ -348,7 +347,12 @@ class Processor(TriplesQueryService): cql = f"SELECT d, s, p, o, otype, dtype, lang FROM {tg.collection_table} WHERE collection = %s" params = [query.collection] statement = SimpleStatement(cql, fetch_size=batch_size) - result_set = await async_execute(tg.session, statement, params) + # async_execute only materialises the first page; + # this query needs all pages, so use sync execute + # in a worker thread where page iteration can block. + result_set = await asyncio.to_thread( + lambda: list(tg.session.execute(statement, params)) + ) else: async for batch, is_final in self._fallback_stream(workspace, query, batch_size):