From 846282c37521fd75fe1849658710c37d33aa43b1 Mon Sep 17 00:00:00 2001
From: cybermaggedon
Date: Thu, 14 May 2026 21:03:09 +0100
Subject: [PATCH] Fixed error only returning a page of results (#921)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The root cause: async_execute only materialises the first result page
(by design — it says so in its docstring). The streaming query set
fetch_size=20 and expected to iterate all results, but only got the
first 20 rows back.

The fix uses asyncio.to_thread(lambda: list(tg.session.execute(...)))
which lets the sync driver iterate all pages in a worker thread —
exactly what the pre-async code did.
---
 .../trustgraph/query/triples/cassandra/service.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py
index 1fadaab3..822dba25 100755
--- a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py
+++ b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py
@@ -17,7 +17,6 @@
 from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error
 from .... schema import Term, Triple, IRI, LITERAL, TRIPLE, BLANK
 from .... base import TriplesQueryService
 from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config
-from .... tables.cassandra_async import async_execute

 # Module logger
 logger = logging.getLogger(__name__)
@@ -348,7 +347,12 @@ class Processor(TriplesQueryService):
             cql = f"SELECT d, s, p, o, otype, dtype, lang FROM {tg.collection_table} WHERE collection = %s"
             params = [query.collection]
             statement = SimpleStatement(cql, fetch_size=batch_size)
-            result_set = await async_execute(tg.session, statement, params)
+            # async_execute only materialises the first page;
+            # this query needs all pages, so use sync execute
+            # in a worker thread where page iteration can block.
+            result_set = await asyncio.to_thread(
+                lambda: list(tg.session.execute(statement, params))
+            )
         else:
             async for batch, is_final in self._fallback_stream(workspace, query, batch_size):