trustgraph/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py

228 lines
7 KiB
Python
Raw Normal View History

2024-07-10 23:20:06 +01:00
"""
Simple decoder, accepts PDF documents on input, outputs pages from the
PDF document as text as separate output objects.
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
Supports both inline document data and fetching from librarian via Pulsar
for large documents.
2024-07-10 23:20:06 +01:00
"""
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
import os
2024-07-10 23:20:06 +01:00
import tempfile
import base64
import logging
from ... schema import Document, TextDocument, Metadata
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
from ... schema import librarian_request_queue, librarian_response_queue
from ... schema import Triples
RabbitMQ pub/sub backend with topic exchange architecture (#752) Adds a RabbitMQ backend as an alternative to Pulsar, selectable via PUBSUB_BACKEND=rabbitmq. Both backends implement the same PubSubBackend protocol — no application code changes needed to switch. RabbitMQ topology: - Single topic exchange per topicspace (e.g. 'tg') - Routing key derived from queue class and topic name - Shared consumers: named queue bound to exchange (competing, round-robin) - Exclusive consumers: anonymous auto-delete queue (broadcast, each gets every message). Used by Subscriber and config push consumer. - Thread-local producer connections (pika is not thread-safe) - Push-based consumption via basic_consume with process_data_events for heartbeat processing Consumer model changes: - Consumer class creates one backend consumer per concurrent task (required for pika thread safety, harmless for Pulsar) - Consumer class accepts consumer_type parameter - Subscriber passes consumer_type='exclusive' for broadcast semantics - Config push consumer uses consumer_type='exclusive' so every processor instance receives config updates - handle_one_from_queue receives consumer as parameter for correct per-connection ack/nack LibrarianClient: - New shared client class replacing duplicated librarian request-response code across 6+ services (chunking, decoders, RAG, etc.) - Uses stream-document instead of get-document-content for fetching document content in 1MB chunks (avoids broker message size limits) - Standalone object (self.librarian = LibrarianClient(...)) not a mixin - get-document-content marked deprecated in schema and OpenAPI spec Serialisation: - Extracted dataclass_to_dict/dict_to_dataclass to shared serialization.py (used by both Pulsar and RabbitMQ backends) Librarian queues: - Changed from flow class (persistent) back to request/response class now that stream-document eliminates large single messages - API upload chunk size reduced from 5MB to 3MB to stay under broker limits after base64 encoding Factory and CLI: - get_pubsub() handles 'rabbitmq' backend with RabbitMQ connection params - add_pubsub_args() includes RabbitMQ options (host, port, credentials) - add_pubsub_args(standalone=True) defaults to localhost for CLI tools - init_trustgraph skips Pulsar admin setup for non-Pulsar backends - tg-dump-queues and tg-monitor-prompts use backend abstraction - BaseClient and ConfigClient accept generic pubsub config
2026-04-02 12:47:16 +01:00
from ... base import FlowProcessor, ConsumerSpec, ProducerSpec, LibrarianClient
PyPDFLoader = None
from ... provenance import (
document_uri, page_uri as make_page_uri, derived_entity_triples,
Terminology Rename, and named-graphs for explainability (#682) Terminology Rename, and named-graphs for explainability data Changed terminology: - session -> question - retrieval -> exploration - selection -> focus - answer -> synthesis - uris.py: Renamed query_session_uri → question_uri, retrieval_uri → exploration_uri, selection_uri → focus_uri, answer_uri → synthesis_uri - triples.py: Renamed corresponding triple generation functions with updated labels ("GraphRAG question", "Exploration", "Focus", "Synthesis") - namespaces.py: Added named graph constants GRAPH_DEFAULT, GRAPH_SOURCE, GRAPH_RETRIEVAL - init.py: Updated exports - graph_rag.py: Updated to use new terminology - invoke_graph_rag.py: Updated CLI to display new stage names (Question, Exploration, Focus, Synthesis) Query-Time Explainability → Named Graph - triples.py: Added set_graph() helper function to set named graph on triples - graph_rag.py: All explainability triples now use GRAPH_RETRIEVAL named graph - rag.py: Explainability triples stored in user's collection (not separate collection) with named graph Extraction Provenance → Named Graph - relationships/extract.py: Provenance triples use GRAPH_SOURCE named graph - definitions/extract.py: Provenance triples use GRAPH_SOURCE named graph - chunker.py: Provenance triples use GRAPH_SOURCE named graph - pdf_decoder.py: Provenance triples use GRAPH_SOURCE named graph CLI Updates - show_graph.py: Added -g/--graph option to filter by named graph and --show-graph to display graph column Also: - Fix knowledge core schemas
2026-03-10 14:35:21 +00:00
set_graph, GRAPH_SOURCE,
)
# Component identification for provenance
COMPONENT_NAME = "pdf-decoder"
COMPONENT_VERSION = "1.0.0"
# Module logger
logger = logging.getLogger(__name__)
default_ident = "document-decoder"
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
default_librarian_request_queue = librarian_request_queue
default_librarian_response_queue = librarian_response_queue
class Processor(FlowProcessor):
2024-07-10 23:20:06 +01:00
def __init__(self, **params):
id = params.get("id", default_ident)
2024-07-10 23:20:06 +01:00
super(Processor, self).__init__(
**params | {
"id": id,
}
2024-07-10 23:20:06 +01:00
)
self.register_specification(
ConsumerSpec(
name = "input",
schema = Document,
handler = self.on_message,
)
)
self.register_specification(
ProducerSpec(
name = "output",
schema = TextDocument,
)
)
self.register_specification(
ProducerSpec(
name = "triples",
schema = Triples,
)
)
RabbitMQ pub/sub backend with topic exchange architecture (#752) Adds a RabbitMQ backend as an alternative to Pulsar, selectable via PUBSUB_BACKEND=rabbitmq. Both backends implement the same PubSubBackend protocol — no application code changes needed to switch. RabbitMQ topology: - Single topic exchange per topicspace (e.g. 'tg') - Routing key derived from queue class and topic name - Shared consumers: named queue bound to exchange (competing, round-robin) - Exclusive consumers: anonymous auto-delete queue (broadcast, each gets every message). Used by Subscriber and config push consumer. - Thread-local producer connections (pika is not thread-safe) - Push-based consumption via basic_consume with process_data_events for heartbeat processing Consumer model changes: - Consumer class creates one backend consumer per concurrent task (required for pika thread safety, harmless for Pulsar) - Consumer class accepts consumer_type parameter - Subscriber passes consumer_type='exclusive' for broadcast semantics - Config push consumer uses consumer_type='exclusive' so every processor instance receives config updates - handle_one_from_queue receives consumer as parameter for correct per-connection ack/nack LibrarianClient: - New shared client class replacing duplicated librarian request-response code across 6+ services (chunking, decoders, RAG, etc.) - Uses stream-document instead of get-document-content for fetching document content in 1MB chunks (avoids broker message size limits) - Standalone object (self.librarian = LibrarianClient(...)) not a mixin - get-document-content marked deprecated in schema and OpenAPI spec Serialisation: - Extracted dataclass_to_dict/dict_to_dataclass to shared serialization.py (used by both Pulsar and RabbitMQ backends) Librarian queues: - Changed from flow class (persistent) back to request/response class now that stream-document eliminates large single messages - API upload chunk size reduced from 5MB to 3MB to stay under broker limits after base64 encoding Factory and CLI: - get_pubsub() handles 'rabbitmq' backend with RabbitMQ connection params - add_pubsub_args() includes RabbitMQ options (host, port, credentials) - add_pubsub_args(standalone=True) defaults to localhost for CLI tools - init_trustgraph skips Pulsar admin setup for non-Pulsar backends - tg-dump-queues and tg-monitor-prompts use backend abstraction - BaseClient and ConfigClient accept generic pubsub config
2026-04-02 12:47:16 +01:00
# Librarian client
self.librarian = LibrarianClient(
id=id, backend=self.pubsub, taskgroup=self.taskgroup,
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
)
logger.info("PDF decoder initialized")
2024-07-15 17:17:04 +01:00
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
async def start(self):
await super(Processor, self).start()
RabbitMQ pub/sub backend with topic exchange architecture (#752) Adds a RabbitMQ backend as an alternative to Pulsar, selectable via PUBSUB_BACKEND=rabbitmq. Both backends implement the same PubSubBackend protocol — no application code changes needed to switch. RabbitMQ topology: - Single topic exchange per topicspace (e.g. 'tg') - Routing key derived from queue class and topic name - Shared consumers: named queue bound to exchange (competing, round-robin) - Exclusive consumers: anonymous auto-delete queue (broadcast, each gets every message). Used by Subscriber and config push consumer. - Thread-local producer connections (pika is not thread-safe) - Push-based consumption via basic_consume with process_data_events for heartbeat processing Consumer model changes: - Consumer class creates one backend consumer per concurrent task (required for pika thread safety, harmless for Pulsar) - Consumer class accepts consumer_type parameter - Subscriber passes consumer_type='exclusive' for broadcast semantics - Config push consumer uses consumer_type='exclusive' so every processor instance receives config updates - handle_one_from_queue receives consumer as parameter for correct per-connection ack/nack LibrarianClient: - New shared client class replacing duplicated librarian request-response code across 6+ services (chunking, decoders, RAG, etc.) - Uses stream-document instead of get-document-content for fetching document content in 1MB chunks (avoids broker message size limits) - Standalone object (self.librarian = LibrarianClient(...)) not a mixin - get-document-content marked deprecated in schema and OpenAPI spec Serialisation: - Extracted dataclass_to_dict/dict_to_dataclass to shared serialization.py (used by both Pulsar and RabbitMQ backends) Librarian queues: - Changed from flow class (persistent) back to request/response class now that stream-document eliminates large single messages - API upload chunk size reduced from 5MB to 3MB to stay under broker limits after base64 encoding Factory and CLI: - get_pubsub() handles 'rabbitmq' backend with RabbitMQ connection params - add_pubsub_args() includes RabbitMQ options (host, port, credentials) - add_pubsub_args(standalone=True) defaults to localhost for CLI tools - init_trustgraph skips Pulsar admin setup for non-Pulsar backends - tg-dump-queues and tg-monitor-prompts use backend abstraction - BaseClient and ConfigClient accept generic pubsub config
2026-04-02 12:47:16 +01:00
await self.librarian.start()
async def on_message(self, msg, consumer, flow):
2024-07-10 23:20:06 +01:00
logger.debug("PDF message received")
2024-07-15 17:17:04 +01:00
v = msg.value()
2024-07-10 23:20:06 +01:00
logger.info(f"Decoding PDF {v.metadata.id}...")
2024-07-10 23:20:06 +01:00
# Check MIME type if fetching from librarian
if v.document_id:
RabbitMQ pub/sub backend with topic exchange architecture (#752) Adds a RabbitMQ backend as an alternative to Pulsar, selectable via PUBSUB_BACKEND=rabbitmq. Both backends implement the same PubSubBackend protocol — no application code changes needed to switch. RabbitMQ topology: - Single topic exchange per topicspace (e.g. 'tg') - Routing key derived from queue class and topic name - Shared consumers: named queue bound to exchange (competing, round-robin) - Exclusive consumers: anonymous auto-delete queue (broadcast, each gets every message). Used by Subscriber and config push consumer. - Thread-local producer connections (pika is not thread-safe) - Push-based consumption via basic_consume with process_data_events for heartbeat processing Consumer model changes: - Consumer class creates one backend consumer per concurrent task (required for pika thread safety, harmless for Pulsar) - Consumer class accepts consumer_type parameter - Subscriber passes consumer_type='exclusive' for broadcast semantics - Config push consumer uses consumer_type='exclusive' so every processor instance receives config updates - handle_one_from_queue receives consumer as parameter for correct per-connection ack/nack LibrarianClient: - New shared client class replacing duplicated librarian request-response code across 6+ services (chunking, decoders, RAG, etc.) - Uses stream-document instead of get-document-content for fetching document content in 1MB chunks (avoids broker message size limits) - Standalone object (self.librarian = LibrarianClient(...)) not a mixin - get-document-content marked deprecated in schema and OpenAPI spec Serialisation: - Extracted dataclass_to_dict/dict_to_dataclass to shared serialization.py (used by both Pulsar and RabbitMQ backends) Librarian queues: - Changed from flow class (persistent) back to request/response class now that stream-document eliminates large single messages - API upload chunk size reduced from 5MB to 3MB to stay under broker limits after base64 encoding Factory and CLI: - get_pubsub() handles 'rabbitmq' backend with RabbitMQ connection params - add_pubsub_args() includes RabbitMQ options (host, port, credentials) - add_pubsub_args(standalone=True) defaults to localhost for CLI tools - init_trustgraph skips Pulsar admin setup for non-Pulsar backends - tg-dump-queues and tg-monitor-prompts use backend abstraction - BaseClient and ConfigClient accept generic pubsub config
2026-04-02 12:47:16 +01:00
doc_meta = await self.librarian.fetch_document_metadata(
document_id=v.document_id,
feat: workspace-based multi-tenancy, replacing user as tenancy axis (#840) Introduces `workspace` as the isolation boundary for config, flows, library, and knowledge data. Removes `user` as a schema-level field throughout the code, API specs, and tests; workspace provides the same separation more cleanly at the trusted flow.workspace layer rather than through client-supplied message fields. Design ------ - IAM tech spec (docs/tech-specs/iam.md) documents current state, proposed auth/access model, and migration direction. - Data ownership model (docs/tech-specs/data-ownership-model.md) captures the workspace/collection/flow hierarchy. Schema + messaging ------------------ - Drop `user` field from AgentRequest/Step, GraphRagQuery, DocumentRagQuery, Triples/Graph/Document/Row EmbeddingsRequest, Sparql/Rows/Structured QueryRequest, ToolServiceRequest. - Keep collection/workspace routing via flow.workspace at the service layer. - Translators updated to not serialise/deserialise user. API specs --------- - OpenAPI schemas and path examples cleaned of user fields. - Websocket async-api messages updated. - Removed the unused parameters/User.yaml. Services + base --------------- - Librarian, collection manager, knowledge, config: all operations scoped by workspace. Config client API takes workspace as first positional arg. - `flow.workspace` set at flow start time by the infrastructure; no longer pass-through from clients. - Tool service drops user-personalisation passthrough. CLI + SDK --------- - tg-init-workspace and workspace-aware import/export. - All tg-* commands drop user args; accept --workspace. - Python API/SDK (flow, socket_client, async_*, explainability, library) drop user kwargs from every method signature. MCP server ---------- - All tool endpoints drop user parameters; socket_manager no longer keyed per user. Flow service ------------ - Closure-based topic cleanup on flow stop: only delete topics whose blueprint template was parameterised AND no remaining live flow (across all workspaces) still resolves to that topic. Three scopes fall out naturally from template analysis: * {id} -> per-flow, deleted on stop * {blueprint} -> per-blueprint, kept while any flow of the same blueprint exists * {workspace} -> per-workspace, kept while any flow in the workspace exists * literal -> global, never deleted (e.g. tg.request.librarian) Fixes a bug where stopping a flow silently destroyed the global librarian exchange, wedging all library operations until manual restart. RabbitMQ backend ---------------- - heartbeat=60, blocked_connection_timeout=300. Catches silently dead connections (broker restart, orphaned channels, network partitions) within ~2 heartbeat windows, so the consumer reconnects and re-binds its queue rather than sitting forever on a zombie connection. Tests ----- - Full test refresh: unit, integration, contract, provenance. - Dropped user-field assertions and constructor kwargs across ~100 test files. - Renamed user-collection isolation tests to workspace-collection.
2026-04-21 23:23:01 +01:00
workspace=flow.workspace,
)
if doc_meta and doc_meta.kind and doc_meta.kind != "application/pdf":
logger.error(
f"Unsupported MIME type: {doc_meta.kind}. "
f"PDF decoder only handles application/pdf. "
f"Ignoring document {v.metadata.id}."
)
return
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
with tempfile.NamedTemporaryFile(delete_on_close=False, suffix='.pdf') as fp:
temp_path = fp.name
# Check if we should fetch from librarian or use inline data
if v.document_id:
# Fetch from librarian via Pulsar
logger.info(f"Fetching document {v.document_id} from librarian...")
fp.close()
2024-07-15 17:17:04 +01:00
RabbitMQ pub/sub backend with topic exchange architecture (#752) Adds a RabbitMQ backend as an alternative to Pulsar, selectable via PUBSUB_BACKEND=rabbitmq. Both backends implement the same PubSubBackend protocol — no application code changes needed to switch. RabbitMQ topology: - Single topic exchange per topicspace (e.g. 'tg') - Routing key derived from queue class and topic name - Shared consumers: named queue bound to exchange (competing, round-robin) - Exclusive consumers: anonymous auto-delete queue (broadcast, each gets every message). Used by Subscriber and config push consumer. - Thread-local producer connections (pika is not thread-safe) - Push-based consumption via basic_consume with process_data_events for heartbeat processing Consumer model changes: - Consumer class creates one backend consumer per concurrent task (required for pika thread safety, harmless for Pulsar) - Consumer class accepts consumer_type parameter - Subscriber passes consumer_type='exclusive' for broadcast semantics - Config push consumer uses consumer_type='exclusive' so every processor instance receives config updates - handle_one_from_queue receives consumer as parameter for correct per-connection ack/nack LibrarianClient: - New shared client class replacing duplicated librarian request-response code across 6+ services (chunking, decoders, RAG, etc.) - Uses stream-document instead of get-document-content for fetching document content in 1MB chunks (avoids broker message size limits) - Standalone object (self.librarian = LibrarianClient(...)) not a mixin - get-document-content marked deprecated in schema and OpenAPI spec Serialisation: - Extracted dataclass_to_dict/dict_to_dataclass to shared serialization.py (used by both Pulsar and RabbitMQ backends) Librarian queues: - Changed from flow class (persistent) back to request/response class now that stream-document eliminates large single messages - API upload chunk size reduced from 5MB to 3MB to stay under broker limits after base64 encoding Factory and CLI: - get_pubsub() handles 'rabbitmq' backend with RabbitMQ connection params - add_pubsub_args() includes RabbitMQ options (host, port, credentials) - add_pubsub_args(standalone=True) defaults to localhost for CLI tools - init_trustgraph skips Pulsar admin setup for non-Pulsar backends - tg-dump-queues and tg-monitor-prompts use backend abstraction - BaseClient and ConfigClient accept generic pubsub config
2026-04-02 12:47:16 +01:00
content = await self.librarian.fetch_document_content(
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
document_id=v.document_id,
feat: workspace-based multi-tenancy, replacing user as tenancy axis (#840) Introduces `workspace` as the isolation boundary for config, flows, library, and knowledge data. Removes `user` as a schema-level field throughout the code, API specs, and tests; workspace provides the same separation more cleanly at the trusted flow.workspace layer rather than through client-supplied message fields. Design ------ - IAM tech spec (docs/tech-specs/iam.md) documents current state, proposed auth/access model, and migration direction. - Data ownership model (docs/tech-specs/data-ownership-model.md) captures the workspace/collection/flow hierarchy. Schema + messaging ------------------ - Drop `user` field from AgentRequest/Step, GraphRagQuery, DocumentRagQuery, Triples/Graph/Document/Row EmbeddingsRequest, Sparql/Rows/Structured QueryRequest, ToolServiceRequest. - Keep collection/workspace routing via flow.workspace at the service layer. - Translators updated to not serialise/deserialise user. API specs --------- - OpenAPI schemas and path examples cleaned of user fields. - Websocket async-api messages updated. - Removed the unused parameters/User.yaml. Services + base --------------- - Librarian, collection manager, knowledge, config: all operations scoped by workspace. Config client API takes workspace as first positional arg. - `flow.workspace` set at flow start time by the infrastructure; no longer pass-through from clients. - Tool service drops user-personalisation passthrough. CLI + SDK --------- - tg-init-workspace and workspace-aware import/export. - All tg-* commands drop user args; accept --workspace. - Python API/SDK (flow, socket_client, async_*, explainability, library) drop user kwargs from every method signature. MCP server ---------- - All tool endpoints drop user parameters; socket_manager no longer keyed per user. Flow service ------------ - Closure-based topic cleanup on flow stop: only delete topics whose blueprint template was parameterised AND no remaining live flow (across all workspaces) still resolves to that topic. Three scopes fall out naturally from template analysis: * {id} -> per-flow, deleted on stop * {blueprint} -> per-blueprint, kept while any flow of the same blueprint exists * {workspace} -> per-workspace, kept while any flow in the workspace exists * literal -> global, never deleted (e.g. tg.request.librarian) Fixes a bug where stopping a flow silently destroyed the global librarian exchange, wedging all library operations until manual restart. RabbitMQ backend ---------------- - heartbeat=60, blocked_connection_timeout=300. Catches silently dead connections (broker restart, orphaned channels, network partitions) within ~2 heartbeat windows, so the consumer reconnects and re-binds its queue rather than sitting forever on a zombie connection. Tests ----- - Full test refresh: unit, integration, contract, provenance. - Dropped user-field assertions and constructor kwargs across ~100 test files. - Renamed user-collection isolation tests to workspace-collection.
2026-04-21 23:23:01 +01:00
workspace=flow.workspace,
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
)
2024-07-10 23:20:06 +01:00
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
# Content is base64 encoded
if isinstance(content, str):
content = content.encode('utf-8')
decoded_content = base64.b64decode(content)
2024-07-10 23:20:06 +01:00
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
with open(temp_path, 'wb') as f:
f.write(decoded_content)
2024-07-10 23:20:06 +01:00
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
logger.info(f"Fetched {len(decoded_content)} bytes from librarian")
else:
# Use inline data (backward compatibility)
fp.write(base64.b64decode(v.data))
fp.close()
2024-07-10 23:20:06 +01:00
global PyPDFLoader
if PyPDFLoader is None:
from langchain_community.document_loaders import (
PyPDFLoader as _cls,
)
PyPDFLoader = _cls
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
loader = PyPDFLoader(temp_path)
pages = loader.load()
# Get the source document ID
source_doc_id = v.document_id or v.metadata.id
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
for ix, page in enumerate(pages):
page_num = ix + 1 # 1-indexed page numbers
logger.debug(f"Processing page {page_num}")
2024-07-10 23:20:06 +01:00
# Generate unique page ID
pg_uri = make_page_uri()
page_doc_id = pg_uri
page_content = page.page_content.encode("utf-8")
# Save page as child document in librarian
RabbitMQ pub/sub backend with topic exchange architecture (#752) Adds a RabbitMQ backend as an alternative to Pulsar, selectable via PUBSUB_BACKEND=rabbitmq. Both backends implement the same PubSubBackend protocol — no application code changes needed to switch. RabbitMQ topology: - Single topic exchange per topicspace (e.g. 'tg') - Routing key derived from queue class and topic name - Shared consumers: named queue bound to exchange (competing, round-robin) - Exclusive consumers: anonymous auto-delete queue (broadcast, each gets every message). Used by Subscriber and config push consumer. - Thread-local producer connections (pika is not thread-safe) - Push-based consumption via basic_consume with process_data_events for heartbeat processing Consumer model changes: - Consumer class creates one backend consumer per concurrent task (required for pika thread safety, harmless for Pulsar) - Consumer class accepts consumer_type parameter - Subscriber passes consumer_type='exclusive' for broadcast semantics - Config push consumer uses consumer_type='exclusive' so every processor instance receives config updates - handle_one_from_queue receives consumer as parameter for correct per-connection ack/nack LibrarianClient: - New shared client class replacing duplicated librarian request-response code across 6+ services (chunking, decoders, RAG, etc.) - Uses stream-document instead of get-document-content for fetching document content in 1MB chunks (avoids broker message size limits) - Standalone object (self.librarian = LibrarianClient(...)) not a mixin - get-document-content marked deprecated in schema and OpenAPI spec Serialisation: - Extracted dataclass_to_dict/dict_to_dataclass to shared serialization.py (used by both Pulsar and RabbitMQ backends) Librarian queues: - Changed from flow class (persistent) back to request/response class now that stream-document eliminates large single messages - API upload chunk size reduced from 5MB to 3MB to stay under broker limits after base64 encoding Factory and CLI: - get_pubsub() handles 'rabbitmq' backend with RabbitMQ connection params - add_pubsub_args() includes RabbitMQ options (host, port, credentials) - add_pubsub_args(standalone=True) defaults to localhost for CLI tools - init_trustgraph skips Pulsar admin setup for non-Pulsar backends - tg-dump-queues and tg-monitor-prompts use backend abstraction - BaseClient and ConfigClient accept generic pubsub config
2026-04-02 12:47:16 +01:00
await self.librarian.save_child_document(
doc_id=page_doc_id,
parent_id=source_doc_id,
feat: workspace-based multi-tenancy, replacing user as tenancy axis (#840) Introduces `workspace` as the isolation boundary for config, flows, library, and knowledge data. Removes `user` as a schema-level field throughout the code, API specs, and tests; workspace provides the same separation more cleanly at the trusted flow.workspace layer rather than through client-supplied message fields. Design ------ - IAM tech spec (docs/tech-specs/iam.md) documents current state, proposed auth/access model, and migration direction. - Data ownership model (docs/tech-specs/data-ownership-model.md) captures the workspace/collection/flow hierarchy. Schema + messaging ------------------ - Drop `user` field from AgentRequest/Step, GraphRagQuery, DocumentRagQuery, Triples/Graph/Document/Row EmbeddingsRequest, Sparql/Rows/Structured QueryRequest, ToolServiceRequest. - Keep collection/workspace routing via flow.workspace at the service layer. - Translators updated to not serialise/deserialise user. API specs --------- - OpenAPI schemas and path examples cleaned of user fields. - Websocket async-api messages updated. - Removed the unused parameters/User.yaml. Services + base --------------- - Librarian, collection manager, knowledge, config: all operations scoped by workspace. Config client API takes workspace as first positional arg. - `flow.workspace` set at flow start time by the infrastructure; no longer pass-through from clients. - Tool service drops user-personalisation passthrough. CLI + SDK --------- - tg-init-workspace and workspace-aware import/export. - All tg-* commands drop user args; accept --workspace. - Python API/SDK (flow, socket_client, async_*, explainability, library) drop user kwargs from every method signature. MCP server ---------- - All tool endpoints drop user parameters; socket_manager no longer keyed per user. Flow service ------------ - Closure-based topic cleanup on flow stop: only delete topics whose blueprint template was parameterised AND no remaining live flow (across all workspaces) still resolves to that topic. Three scopes fall out naturally from template analysis: * {id} -> per-flow, deleted on stop * {blueprint} -> per-blueprint, kept while any flow of the same blueprint exists * {workspace} -> per-workspace, kept while any flow in the workspace exists * literal -> global, never deleted (e.g. tg.request.librarian) Fixes a bug where stopping a flow silently destroyed the global librarian exchange, wedging all library operations until manual restart. RabbitMQ backend ---------------- - heartbeat=60, blocked_connection_timeout=300. Catches silently dead connections (broker restart, orphaned channels, network partitions) within ~2 heartbeat windows, so the consumer reconnects and re-binds its queue rather than sitting forever on a zombie connection. Tests ----- - Full test refresh: unit, integration, contract, provenance. - Dropped user-field assertions and constructor kwargs across ~100 test files. - Renamed user-collection isolation tests to workspace-collection.
2026-04-21 23:23:01 +01:00
workspace=flow.workspace,
content=page_content,
document_type="page",
title=f"Page {page_num}",
)
Terminology Rename, and named-graphs for explainability (#682) Terminology Rename, and named-graphs for explainability data Changed terminology: - session -> question - retrieval -> exploration - selection -> focus - answer -> synthesis - uris.py: Renamed query_session_uri → question_uri, retrieval_uri → exploration_uri, selection_uri → focus_uri, answer_uri → synthesis_uri - triples.py: Renamed corresponding triple generation functions with updated labels ("GraphRAG question", "Exploration", "Focus", "Synthesis") - namespaces.py: Added named graph constants GRAPH_DEFAULT, GRAPH_SOURCE, GRAPH_RETRIEVAL - init.py: Updated exports - graph_rag.py: Updated to use new terminology - invoke_graph_rag.py: Updated CLI to display new stage names (Question, Exploration, Focus, Synthesis) Query-Time Explainability → Named Graph - triples.py: Added set_graph() helper function to set named graph on triples - graph_rag.py: All explainability triples now use GRAPH_RETRIEVAL named graph - rag.py: Explainability triples stored in user's collection (not separate collection) with named graph Extraction Provenance → Named Graph - relationships/extract.py: Provenance triples use GRAPH_SOURCE named graph - definitions/extract.py: Provenance triples use GRAPH_SOURCE named graph - chunker.py: Provenance triples use GRAPH_SOURCE named graph - pdf_decoder.py: Provenance triples use GRAPH_SOURCE named graph CLI Updates - show_graph.py: Added -g/--graph option to filter by named graph and --show-graph to display graph column Also: - Fix knowledge core schemas
2026-03-10 14:35:21 +00:00
# Emit provenance triples (stored in source graph for separation from core knowledge)
doc_uri = document_uri(source_doc_id)
prov_triples = derived_entity_triples(
entity_uri=pg_uri,
parent_uri=doc_uri,
component_name=COMPONENT_NAME,
component_version=COMPONENT_VERSION,
label=f"Page {page_num}",
page_number=page_num,
)
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
await flow("triples").send(Triples(
metadata=Metadata(
id=pg_uri,
root=v.metadata.root,
collection=v.metadata.collection,
),
Terminology Rename, and named-graphs for explainability (#682) Terminology Rename, and named-graphs for explainability data Changed terminology: - session -> question - retrieval -> exploration - selection -> focus - answer -> synthesis - uris.py: Renamed query_session_uri → question_uri, retrieval_uri → exploration_uri, selection_uri → focus_uri, answer_uri → synthesis_uri - triples.py: Renamed corresponding triple generation functions with updated labels ("GraphRAG question", "Exploration", "Focus", "Synthesis") - namespaces.py: Added named graph constants GRAPH_DEFAULT, GRAPH_SOURCE, GRAPH_RETRIEVAL - init.py: Updated exports - graph_rag.py: Updated to use new terminology - invoke_graph_rag.py: Updated CLI to display new stage names (Question, Exploration, Focus, Synthesis) Query-Time Explainability → Named Graph - triples.py: Added set_graph() helper function to set named graph on triples - graph_rag.py: All explainability triples now use GRAPH_RETRIEVAL named graph - rag.py: Explainability triples stored in user's collection (not separate collection) with named graph Extraction Provenance → Named Graph - relationships/extract.py: Provenance triples use GRAPH_SOURCE named graph - definitions/extract.py: Provenance triples use GRAPH_SOURCE named graph - chunker.py: Provenance triples use GRAPH_SOURCE named graph - pdf_decoder.py: Provenance triples use GRAPH_SOURCE named graph CLI Updates - show_graph.py: Added -g/--graph option to filter by named graph and --show-graph to display graph column Also: - Fix knowledge core schemas
2026-03-10 14:35:21 +00:00
triples=set_graph(prov_triples, GRAPH_SOURCE),
))
# Forward page document ID to chunker
# Chunker will fetch content from librarian
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
r = TextDocument(
metadata=Metadata(
id=pg_uri,
root=v.metadata.root,
collection=v.metadata.collection,
),
document_id=page_doc_id,
text=b"", # Empty, chunker will fetch from librarian
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
)
await flow("output").send(r)
# Clean up temp file
try:
os.unlink(temp_path)
except OSError:
pass
2024-07-10 23:20:06 +01:00
logger.debug("PDF decoding complete")
2024-07-10 23:20:06 +01:00
@staticmethod
def add_args(parser):
FlowProcessor.add_args(parser)
2024-07-10 23:20:06 +01:00
Incremental / large document loading (#659) Tech spec BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py): - get_stream() - yields document content in chunks for streaming retrieval - create_multipart_upload() - initializes S3 multipart upload, returns upload_id - upload_part() - uploads a single part, returns etag - complete_multipart_upload() - finalizes upload with part etags - abort_multipart_upload() - cancels and cleans up Cassandra schema (trustgraph-flow/trustgraph/tables/library.py): - New upload_session table with 24-hour TTL - Index on user for listing sessions - Prepared statements for all operations - Methods: create_upload_session(), get_upload_session(), update_upload_session_chunk(), delete_upload_session(), list_upload_sessions() - Schema extended with UploadSession, UploadProgress, and new request/response fields - Librarian methods: begin_upload, upload_chunk, complete_upload, abort_upload, get_upload_status, list_uploads - Service routing for all new operations - Python SDK with transparent chunked upload: - add_document() auto-switches to chunked for files > 10MB - Progress callback support (on_progress) - get_pending_uploads(), get_upload_status(), abort_upload(), resume_upload() - Document table: Added parent_id and document_type columns with index - Document schema (knowledge/document.py): Added document_id field for streaming retrieval - Librarian operations: - add-child-document for extracted PDF pages - list-children to get child documents - stream-document for chunked content retrieval - Cascade delete removes children when parent is deleted - list-documents filters children by default - PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large documents from librarian API to temp file - Librarian service (librarian/service.py): Sends document_id instead of content for large PDFs (>2MB) - Deprecated tools (load_pdf.py, load_text.py): Added deprecation warnings directing users to tg-add-library-document + tg-start-library-processing Remove load_pdf and load_text utils Move chunker/librarian comms to base class Updating tests
2026-03-04 16:57:58 +00:00
parser.add_argument(
'--librarian-request-queue',
default=default_librarian_request_queue,
help=f'Librarian request queue (default: {default_librarian_request_queue})',
)
parser.add_argument(
'--librarian-response-queue',
default=default_librarian_response_queue,
help=f'Librarian response queue (default: {default_librarian_response_queue})',
)
2024-07-10 23:20:06 +01:00
def run():
Processor.launch(default_ident, __doc__)