Fixed streaming query returning only the first page of results (#921)

The root cause: async_execute only materialises the first result
page (by design — it says so in its docstring). The streaming query
set fetch_size=20 and expected to iterate all results, but only got
the first 20 rows back.

The fix runs the synchronous driver call in a worker thread:
  asyncio.to_thread(lambda: list(tg.session.execute(...)))
This lets the sync driver iterate all pages in that worker thread —
exactly what the pre-async code did.
This commit is contained in:
cybermaggedon 2026-05-14 21:03:09 +01:00 committed by GitHub
parent a2dde9cafb
commit 846282c375
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -17,7 +17,6 @@ from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error
from .... schema import Term, Triple, IRI, LITERAL, TRIPLE, BLANK from .... schema import Term, Triple, IRI, LITERAL, TRIPLE, BLANK
from .... base import TriplesQueryService from .... base import TriplesQueryService
from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config
from .... tables.cassandra_async import async_execute
# Module logger # Module logger
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -348,7 +347,12 @@ class Processor(TriplesQueryService):
cql = f"SELECT d, s, p, o, otype, dtype, lang FROM {tg.collection_table} WHERE collection = %s" cql = f"SELECT d, s, p, o, otype, dtype, lang FROM {tg.collection_table} WHERE collection = %s"
params = [query.collection] params = [query.collection]
statement = SimpleStatement(cql, fetch_size=batch_size) statement = SimpleStatement(cql, fetch_size=batch_size)
result_set = await async_execute(tg.session, statement, params) # async_execute only materialises the first page;
# this query needs all pages, so use sync execute
# in a worker thread where page iteration can block.
result_set = await asyncio.to_thread(
lambda: list(tg.session.execute(statement, params))
)
else: else:
async for batch, is_final in self._fallback_stream(workspace, query, batch_size): async for batch, is_final in self._fallback_stream(workspace, query, batch_size):