Fix/librarian broken (#674)

* Set end-of-stream cleanly - clean streaming message structures

* Add tg-get-document-content
This commit is contained in:
cybermaggedon 2026-03-09 13:36:24 +00:00 committed by GitHub
parent df1808768d
commit 3c3e11bef5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 99 additions and 15 deletions

View file

@@ -174,6 +174,4 @@ class LibraryResponseTranslator(MessageTranslator):
def from_response_with_completion(self, obj: LibrarianResponse) -> Tuple[Dict[str, Any], bool]: def from_response_with_completion(self, obj: LibrarianResponse) -> Tuple[Dict[str, Any], bool]:
"""Returns (response_dict, is_final)""" """Returns (response_dict, is_final)"""
# For streaming responses, check end_of_stream to determine if this is the final message return self.from_pulsar(obj), obj.is_final
is_final = getattr(obj, 'end_of_stream', True)
return self.from_pulsar(obj), is_final

View file

@@ -212,8 +212,10 @@ class LibrarianResponse:
# list-uploads response # list-uploads response
upload_sessions: list[UploadSession] = field(default_factory=list) upload_sessions: list[UploadSession] = field(default_factory=list)
# stream-document response - indicates final chunk in stream # Protocol flag: True if this is the final response for a request.
end_of_stream: bool = False # Default True since most operations are single request/response.
# Only stream-document sets False for intermediate chunks.
is_final: bool = True
# FIXME: Is this right? Using persistence on librarian so that # FIXME: Is this right? Using persistence on librarian so that
# message chunking works # message chunking works

View file

@ -37,6 +37,7 @@ tg-dump-msgpack = "trustgraph.cli.dump_msgpack:main"
tg-dump-queues = "trustgraph.cli.dump_queues:main" tg-dump-queues = "trustgraph.cli.dump_queues:main"
tg-get-flow-blueprint = "trustgraph.cli.get_flow_blueprint:main" tg-get-flow-blueprint = "trustgraph.cli.get_flow_blueprint:main"
tg-get-kg-core = "trustgraph.cli.get_kg_core:main" tg-get-kg-core = "trustgraph.cli.get_kg_core:main"
tg-get-document-content = "trustgraph.cli.get_document_content:main"
tg-graph-to-turtle = "trustgraph.cli.graph_to_turtle:main" tg-graph-to-turtle = "trustgraph.cli.graph_to_turtle:main"
tg-init-trustgraph = "trustgraph.cli.init_trustgraph:main" tg-init-trustgraph = "trustgraph.cli.init_trustgraph:main"
tg-invoke-agent = "trustgraph.cli.invoke_agent:main" tg-invoke-agent = "trustgraph.cli.invoke_agent:main"

View file

@@ -0,0 +1,87 @@
"""
Gets document content from the library by document ID.
"""
import argparse
import os
import sys
from trustgraph.api import Api
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_token = os.getenv("TRUSTGRAPH_TOKEN", None)
default_user = "trustgraph"
def get_content(url, user, document_id, output_file, token=None):
    """
    Fetch a document's raw content from the librarian and emit it.

    :param url: TrustGraph API base URL.
    :param user: User ID owning the document.
    :param document_id: Document ID (IRI) to retrieve.
    :param output_file: Path to write to; falsy means write to stdout.
    :param token: Optional authentication token.
    """
    api = Api(url, token=token).library()
    content = api.get_document_content(user=user, id=document_id)
    write_content(content, output_file)

def write_content(content, output_file):
    """
    Write document bytes to *output_file*, or to stdout when no file given.

    UTF-8-decodable content is written to stdout verbatim — no trailing
    newline is appended, so piped output stays byte-identical to the stored
    document (matching the file and binary paths, which already write
    exactly).  Non-text content goes to the raw stdout buffer, with a size
    note on stderr.

    :param content: Document content as bytes.
    :param output_file: Destination path, or None/empty for stdout.
    """
    if output_file:
        with open(output_file, 'wb') as f:
            f.write(content)
        print(f"Written {len(content)} bytes to {output_file}")
    else:
        try:
            text = content.decode('utf-8')
            # sys.stdout.write, not print(): don't append a newline that
            # was never part of the document.
            sys.stdout.write(text)
        except UnicodeDecodeError:
            print(f"Binary content: {len(content)} bytes", file=sys.stderr)
            sys.stdout.buffer.write(content)
def main():
    """
    Command-line entry point: parse arguments and retrieve the document.

    On failure, the error is printed to stderr (stdout may be carrying
    document content) and the process exits with status 1 so shell
    scripts can detect the failure.
    """
    parser = argparse.ArgumentParser(
        prog='tg-get-document-content',
        description=__doc__,
    )
    parser.add_argument(
        '-u', '--api-url',
        default=default_url,
        help=f'API URL (default: {default_url})',
    )
    parser.add_argument(
        '-t', '--token',
        default=default_token,
        help='Authentication token (default: $TRUSTGRAPH_TOKEN)',
    )
    parser.add_argument(
        '-U', '--user',
        default=default_user,
        help=f'User ID (default: {default_user})'
    )
    parser.add_argument(
        '-o', '--output',
        default=None,
        help='Output file (default: stdout)'
    )
    parser.add_argument(
        'document_id',
        help='Document ID (IRI) to retrieve',
    )
    args = parser.parse_args()
    try:
        get_content(
            url=args.api_url,
            user=args.user,
            document_id=args.document_id,
            output_file=args.output,
            token=args.token,
        )
    except Exception as e:
        # Report to stderr, not stdout, and signal failure via the exit
        # status instead of silently returning 0.
        print("Exception:", e, file=sys.stderr, flush=True)
        sys.exit(1)

if __name__ == "__main__":
    main()

View file

@@ -656,9 +656,8 @@ class Librarian:
This is an async generator that yields document content in smaller chunks, This is an async generator that yields document content in smaller chunks,
allowing memory-efficient processing of large documents. Each yielded allowing memory-efficient processing of large documents. Each yielded
response includes chunk information and an end_of_stream flag. response includes chunk_index and total_chunks for tracking progress.
Completion is determined by chunk_index reaching total_chunks - 1.
The final chunk will have end_of_stream=True.
""" """
logger.debug(f"Streaming document {request.document_id}") logger.debug(f"Streaming document {request.document_id}")
@@ -688,11 +687,10 @@
# Fetch only the requested range # Fetch only the requested range
chunk_content = await self.blob_store.get_range(object_id, offset, length) chunk_content = await self.blob_store.get_range(object_id, offset, length)
is_last_chunk = (chunk_index == total_chunks - 1) is_last = (chunk_index == total_chunks - 1)
logger.debug(f"Streaming chunk {chunk_index}/{total_chunks}, " logger.debug(f"Streaming chunk {chunk_index + 1}/{total_chunks}, "
f"bytes {offset}-{offset + length} of {total_size}, " f"bytes {offset}-{offset + length} of {total_size}")
f"end_of_stream={is_last_chunk}")
yield LibrarianResponse( yield LibrarianResponse(
error=None, error=None,
@@ -702,6 +700,6 @@
total_chunks=total_chunks, total_chunks=total_chunks,
bytes_received=offset + length, bytes_received=offset + length,
total_bytes=total_size, total_bytes=total_size,
end_of_stream=is_last_chunk, is_final=is_last,
) )

View file

@@ -527,7 +527,6 @@ class Processor(AsyncProcessor):
type = "request-error", type = "request-error",
message = str(e), message = str(e),
), ),
end_of_stream = True,
) )
await self.librarian_response_producer.send( await self.librarian_response_producer.send(
@@ -541,7 +540,6 @@ class Processor(AsyncProcessor):
type = "unexpected-error", type = "unexpected-error",
message = str(e), message = str(e),
), ),
end_of_stream = True,
) )
await self.librarian_response_producer.send( await self.librarian_response_producer.send(