def from_response_with_completion(self, obj: LibrarianResponse) -> Tuple[Dict[str, Any], bool]:
    """
    Translate a LibrarianResponse and report whether it completes the request.

    Returns a tuple ``(response_dict, is_final)``.  The completion flag is
    taken directly from the response's ``is_final`` protocol field, which
    the schema defaults to True for single request/response operations and
    which stream-document sets to False for intermediate chunks.
    """
    translated = self.from_pulsar(obj)
    return translated, obj.is_final
"""
Gets document content from the library by document ID.
"""

import argparse
import os
import sys
from trustgraph.api import Api

default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_token = os.getenv("TRUSTGRAPH_TOKEN", None)
default_user = "trustgraph"

def get_content(url, user, document_id, output_file, token=None):
    """
    Fetch a document's raw content from the library and write it out.

    :param url: TrustGraph API base URL
    :param user: user ID owning the document
    :param document_id: document ID (IRI) to retrieve
    :param output_file: path to write the content to; None writes to stdout
    :param token: optional authentication token
    """

    api = Api(url, token=token).library()

    content = api.get_document_content(user=user, id=document_id)

    if output_file:
        with open(output_file, 'wb') as f:
            f.write(content)
        print(f"Written {len(content)} bytes to {output_file}")
    else:
        # Write to stdout.  Try to decode as text; if the content is
        # binary, report that on stderr and emit the raw bytes instead.
        try:
            text = content.decode('utf-8')
            print(text)
        except UnicodeDecodeError:
            print(f"Binary content: {len(content)} bytes", file=sys.stderr)
            sys.stdout.buffer.write(content)

def main():

    parser = argparse.ArgumentParser(
        prog='tg-get-document-content',
        description=__doc__,
    )

    parser.add_argument(
        '-u', '--api-url',
        default=default_url,
        help=f'API URL (default: {default_url})',
    )

    parser.add_argument(
        '-t', '--token',
        default=default_token,
        help='Authentication token (default: $TRUSTGRAPH_TOKEN)',
    )

    parser.add_argument(
        '-U', '--user',
        default=default_user,
        help=f'User ID (default: {default_user})'
    )

    parser.add_argument(
        '-o', '--output',
        default=None,
        help='Output file (default: stdout)'
    )

    parser.add_argument(
        'document_id',
        help='Document ID (IRI) to retrieve',
    )

    args = parser.parse_args()

    try:

        get_content(
            url=args.api_url,
            user=args.user,
            document_id=args.document_id,
            output_file=args.output,
            token=args.token,
        )

    except Exception as e:

        # Report failures on stderr and exit non-zero so shell scripts
        # and pipelines can detect the error.  (Previously the message
        # went to stdout and the process still exited 0 on failure.)
        print("Exception:", e, file=sys.stderr, flush=True)
        sys.exit(1)

if __name__ == "__main__":
    main()
b/trustgraph-flow/trustgraph/librarian/librarian.py @@ -656,9 +656,8 @@ class Librarian: This is an async generator that yields document content in smaller chunks, allowing memory-efficient processing of large documents. Each yielded - response includes chunk information and an end_of_stream flag. - - The final chunk will have end_of_stream=True. + response includes chunk_index and total_chunks for tracking progress. + Completion is determined by chunk_index reaching total_chunks - 1. """ logger.debug(f"Streaming document {request.document_id}") @@ -688,11 +687,10 @@ class Librarian: # Fetch only the requested range chunk_content = await self.blob_store.get_range(object_id, offset, length) - is_last_chunk = (chunk_index == total_chunks - 1) + is_last = (chunk_index == total_chunks - 1) - logger.debug(f"Streaming chunk {chunk_index}/{total_chunks}, " - f"bytes {offset}-{offset + length} of {total_size}, " - f"end_of_stream={is_last_chunk}") + logger.debug(f"Streaming chunk {chunk_index + 1}/{total_chunks}, " + f"bytes {offset}-{offset + length} of {total_size}") yield LibrarianResponse( error=None, @@ -702,6 +700,6 @@ class Librarian: total_chunks=total_chunks, bytes_received=offset + length, total_bytes=total_size, - end_of_stream=is_last_chunk, + is_final=is_last, ) diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index 9f24bbc7..009d23bd 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -527,7 +527,6 @@ class Processor(AsyncProcessor): type = "request-error", message = str(e), ), - end_of_stream = True, ) await self.librarian_response_producer.send( @@ -541,7 +540,6 @@ class Processor(AsyncProcessor): type = "unexpected-error", message = str(e), ), - end_of_stream = True, ) await self.librarian_response_producer.send(