diff --git a/trustgraph-base/trustgraph/messaging/translators/library.py b/trustgraph-base/trustgraph/messaging/translators/library.py
index 333a803a..8d7cc930 100644
--- a/trustgraph-base/trustgraph/messaging/translators/library.py
+++ b/trustgraph-base/trustgraph/messaging/translators/library.py
@@ -174,4 +174,6 @@ class LibraryResponseTranslator(MessageTranslator):
 
     def from_response_with_completion(self, obj: LibrarianResponse) -> Tuple[Dict[str, Any], bool]:
         """Returns (response_dict, is_final)"""
-        return self.from_pulsar(obj), True
+        # For streaming responses, check end_of_stream to determine if this is the final message
+        is_final = getattr(obj, 'end_of_stream', True)
+        return self.from_pulsar(obj), is_final
diff --git a/trustgraph-base/trustgraph/schema/services/library.py b/trustgraph-base/trustgraph/schema/services/library.py
index 6dcdee1a..37a3f6ad 100644
--- a/trustgraph-base/trustgraph/schema/services/library.py
+++ b/trustgraph-base/trustgraph/schema/services/library.py
@@ -212,6 +212,9 @@ class LibrarianResponse:
 
     # list-uploads response
     upload_sessions: list[UploadSession] = field(default_factory=list)
 
+    # stream-document response - indicates final chunk in stream
+    end_of_stream: bool = False
+
 # FIXME: Is this right? Using persistence on librarian so that
 # message chunking works
diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/document_stream.py b/trustgraph-flow/trustgraph/gateway/dispatch/document_stream.py
new file mode 100644
index 00000000..e70bf6de
--- /dev/null
+++ b/trustgraph-flow/trustgraph/gateway/dispatch/document_stream.py
@@ -0,0 +1,65 @@
+
+import asyncio
+import uuid
+import logging
+from . librarian import LibrarianRequestor
+
+# Module logger
+logger = logging.getLogger(__name__)
+
+class DocumentStreamExport:
+
+    def __init__(self, backend):
+        self.backend = backend
+
+    async def process(self, data, error, ok, request):
+
+        user = request.query.get("user")
+        document_id = request.query.get("document-id")
+        chunk_size = int(request.query.get("chunk-size", 1024 * 1024))
+
+        if not user or not document_id:
+            return await error("Missing required parameters: user, document-id")
+
+        response = await ok()
+
+        lr = LibrarianRequestor(
+            backend=self.backend,
+            consumer="api-gateway-doc-stream-" + str(uuid.uuid4()),
+            subscriber="api-gateway-doc-stream-" + str(uuid.uuid4()),
+        )
+
+        try:
+
+            await lr.start()
+
+            async def responder(resp, fin):
+                if "content" in resp:
+                    content = resp["content"]
+                    # Chunk content arrives base64-encoded from the librarian;
+                    # decode it here and stream raw bytes to the client.
+                    import base64
+                    chunk_data = base64.b64decode(content)
+                    await response.write(chunk_data)
+
+            await lr.process(
+                {
+                    "operation": "stream-document",
+                    "user": user,
+                    "document-id": document_id,
+                    "chunk-size": chunk_size,
+                },
+                responder
+            )
+
+        except Exception as e:
+
+            logger.error(f"Document stream exception: {e}", exc_info=True)
+
+        finally:
+
+            await lr.stop()
+
+        await response.write_eof()
+
+        return response
diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py
index 35edad76..d068ecef 100644
--- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py
+++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py
@@ -45,6 +45,7 @@
 from . rows_import import RowsImport
 from . core_export import CoreExport
 from . core_import import CoreImport
+from . document_stream import DocumentStreamExport
 from . mux import Mux
 
 
@@ -135,6 +136,14 @@ class DispatcherManager:
     def dispatch_core_import(self):
         return DispatcherWrapper(self.process_core_import)
 
+    def dispatch_document_stream(self):
+        return DispatcherWrapper(self.process_document_stream)
+
+    async def process_document_stream(self, data, error, ok, request):
+
+        ds = DocumentStreamExport(self.backend)
+        return await ds.process(data, error, ok, request)
+
     async def process_core_import(self, data, error, ok, request):
 
         ci = CoreImport(self.backend)
diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/manager.py b/trustgraph-flow/trustgraph/gateway/endpoint/manager.py
index f9616d9a..ff92dca2 100644
--- a/trustgraph-flow/trustgraph/gateway/endpoint/manager.py
+++ b/trustgraph-flow/trustgraph/gateway/endpoint/manager.py
@@ -64,6 +64,12 @@ class EndpointManager:
                 method = "GET",
                 dispatcher = dispatcher_manager.dispatch_core_export(),
             ),
+            StreamEndpoint(
+                endpoint_path = "/api/v1/document-stream",
+                auth = auth,
+                method = "GET",
+                dispatcher = dispatcher_manager.dispatch_document_stream(),
+            ),
         ]
 
     def add_routes(self, app):
diff --git a/trustgraph-flow/trustgraph/librarian/librarian.py b/trustgraph-flow/trustgraph/librarian/librarian.py
index 2497e4a2..aecc1289 100644
--- a/trustgraph-flow/trustgraph/librarian/librarian.py
+++ b/trustgraph-flow/trustgraph/librarian/librarian.py
@@ -654,14 +654,13 @@ class Librarian:
         """
         Stream document content in chunks.
 
-        This operation returns document content in smaller chunks, allowing
-        memory-efficient processing of large documents. The response includes
-        chunk information for reassembly.
+        This is an async generator that yields document content in smaller chunks,
+        allowing memory-efficient processing of large documents. Each yielded
+        response includes chunk information and an end_of_stream flag.
 
-        Note: This operation returns a single chunk at a time. Clients should
-        call repeatedly with increasing chunk_index until all chunks are received.
+        The final chunk will have end_of_stream=True.
         """
-        logger.debug(f"Streaming document {request.document_id}, chunk {request.chunk_index}")
+        logger.debug(f"Streaming document {request.document_id}")
 
         DEFAULT_CHUNK_SIZE = 1024 * 1024  # 1MB default
 
@@ -680,29 +679,29 @@
         total_size = await self.blob_store.get_size(object_id)
         total_chunks = math.ceil(total_size / chunk_size)
 
-        if request.chunk_index >= total_chunks:
-            raise RequestError(
-                f"Invalid chunk index {request.chunk_index}, "
-                f"document has {total_chunks} chunks"
+        # Stream all chunks
+        for chunk_index in range(total_chunks):
+            # Calculate byte range
+            offset = chunk_index * chunk_size
+            length = min(chunk_size, total_size - offset)
+
+            # Fetch only the requested range
+            chunk_content = await self.blob_store.get_range(object_id, offset, length)
+
+            is_last_chunk = (chunk_index == total_chunks - 1)
+
+            logger.debug(f"Streaming chunk {chunk_index}/{total_chunks}, "
+                        f"bytes {offset}-{offset + length} of {total_size}, "
+                        f"end_of_stream={is_last_chunk}")
+
+            yield LibrarianResponse(
+                error=None,
+                content=base64.b64encode(chunk_content),
+                chunk_index=chunk_index,
+                chunks_received=chunk_index + 1,
+                total_chunks=total_chunks,
+                bytes_received=offset + length,
+                total_bytes=total_size,
+                end_of_stream=is_last_chunk,
             )
-        # Calculate byte range
-        offset = request.chunk_index * chunk_size
-        length = min(chunk_size, total_size - offset)
-
-        # Fetch only the requested range
-        chunk_content = await self.blob_store.get_range(object_id, offset, length)
-
-        logger.debug(f"Returning chunk {request.chunk_index}/{total_chunks}, "
-                    f"bytes {offset}-{offset + length} of {total_size}")
-
-        return LibrarianResponse(
-            error=None,
-            content=base64.b64encode(chunk_content),
-            chunk_index=request.chunk_index,
-            chunks_received=1,  # Using as "current chunk" indicator
-            total_chunks=total_chunks,
-            bytes_received=offset + length,
-            total_bytes=total_size,
-        )
-
 
diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py
index 9f92d9fb..9f24bbc7 100755
--- a/trustgraph-flow/trustgraph/librarian/service.py
+++ b/trustgraph-flow/trustgraph/librarian/service.py
@@ -504,6 +504,15 @@
 
         try:
 
+            # Handle streaming operations specially
+            if v.operation == "stream-document":
+                async for resp in self.librarian.stream_document(v):
+                    await self.librarian_response_producer.send(
+                        resp, properties={"id": id}
+                    )
+                return
+
+            # Non-streaming operations
             resp = await self.process_request(v)
 
             await self.librarian_response_producer.send(
@@ -517,7 +526,8 @@
             error = Error(
                 type = "request-error",
                 message = str(e),
-            )
+            ),
+            end_of_stream = True,
         )
 
         await self.librarian_response_producer.send(
@@ -530,7 +540,8 @@
             error = Error(
                 type = "unexpected-error",
                 message = str(e),
-            )
+            ),
+            end_of_stream = True,
        )
 
         await self.librarian_response_producer.send(