diff --git a/docs/tech-specs/large-document-loading.md b/docs/tech-specs/large-document-loading.md new file mode 100644 index 00000000..497d7bce --- /dev/null +++ b/docs/tech-specs/large-document-loading.md @@ -0,0 +1,984 @@ +# Large Document Loading Technical Specification + +## Overview + +This specification addresses scalability and user experience issues when loading +large documents into TrustGraph. The current architecture treats document upload +as a single atomic operation, causing memory pressure at multiple points in the +pipeline and providing no feedback or recovery options to users. + +This implementation targets the following use cases: + +1. **Large PDF Processing**: Upload and process multi-hundred-megabyte PDF files + without exhausting memory +2. **Resumable Uploads**: Allow interrupted uploads to continue from where they + left off rather than restarting +3. **Progress Feedback**: Provide users with real-time visibility into upload + and processing progress +4. **Memory-Efficient Processing**: Process documents in a streaming fashion + without holding entire files in memory + +## Goals + +- **Incremental Upload**: Support chunked document upload via REST and WebSocket +- **Resumable Transfers**: Enable recovery from interrupted uploads +- **Progress Visibility**: Provide upload/processing progress feedback to clients +- **Memory Efficiency**: Eliminate full-document buffering throughout the pipeline +- **Backward Compatibility**: Existing small-document workflows continue unchanged +- **Streaming Processing**: PDF decoding and text chunking operate on streams + +## Background + +### Current Architecture + +Document submission flows through the following path: + +1. **Client** submits document via REST (`POST /api/v1/librarian`) or WebSocket +2. **API Gateway** receives complete request with base64-encoded document content +3. **LibrarianRequestor** translates request to Pulsar message +4. **Librarian Service** receives message, decodes document into memory +5. **BlobStore** uploads document to Garage/S3 +6. **Cassandra** stores metadata with object reference +7. For processing: document retrieved from S3, decoded, chunked—all in memory + +Key files: +- REST/WebSocket entry: `trustgraph-flow/trustgraph/gateway/service.py` +- Librarian core: `trustgraph-flow/trustgraph/librarian/librarian.py` +- Blob storage: `trustgraph-flow/trustgraph/librarian/blob_store.py` +- Cassandra tables: `trustgraph-flow/trustgraph/tables/library.py` +- API schema: `trustgraph-base/trustgraph/schema/services/library.py` + +### Current Limitations + +The current design has several compounding memory and UX issues: + +1. **Atomic Upload Operation**: The entire document must be transmitted in a + single request. Large documents require long-running requests with no + progress indication and no retry mechanism if the connection fails. + +2. **API Design**: Both REST and WebSocket APIs expect the complete document + in a single message. The schema (`LibrarianRequest`) has a single `content` + field containing the entire base64-encoded document. + +3. **Librarian Memory**: The librarian service decodes the entire document + into memory before uploading to S3. For a 500MB PDF, this means holding + 500MB+ in process memory. + +4. **PDF Decoder Memory**: When processing begins, the PDF decoder loads the + entire PDF into memory to extract text. PyPDF and similar libraries + typically require full document access. + +5. **Chunker Memory**: The text chunker receives the complete extracted text + and holds it in memory while producing chunks. + +**Memory Impact Example** (500MB PDF): +- Gateway: ~700MB (base64 encoding overhead) +- Librarian: ~500MB (decoded bytes) +- PDF Decoder: ~500MB + extraction buffers +- Chunker: extracted text (variable, potentially 100MB+) + +Total peak memory can exceed 2GB for a single large document. + +## Technical Design + +### Design Principles + +1. **API Facade**: All client interaction goes through the librarian API. Clients + have no direct access to or knowledge of the underlying S3/Garage storage. + +2. **S3 Multipart Upload**: Use standard S3 multipart upload under the hood. + This is widely supported across S3-compatible systems (AWS S3, MinIO, Garage, + Ceph, DigitalOcean Spaces, Backblaze B2, etc.) ensuring portability. + +3. **Atomic Completion**: S3 multipart uploads are inherently atomic - uploaded + parts are invisible until `CompleteMultipartUpload` is called. No temporary + files or rename operations needed. + +4. **Trackable State**: Upload sessions tracked in Cassandra, providing + visibility into incomplete uploads and enabling resume capability. + +### Chunked Upload Flow + +``` +Client Librarian API S3/Garage + │ │ │ + │── begin-upload ───────────►│ │ + │ (metadata, size) │── CreateMultipartUpload ────►│ + │ │◄── s3_upload_id ─────────────│ + │◄── upload_id ──────────────│ (store session in │ + │ │ Cassandra) │ + │ │ │ + │── upload-chunk ───────────►│ │ + │ (upload_id, index, data) │── UploadPart ───────────────►│ + │ │◄── etag ─────────────────────│ + │◄── ack + progress ─────────│ (store etag in session) │ + │ ⋮ │ ⋮ │ + │ (repeat for all chunks) │ │ + │ │ │ + │── complete-upload ────────►│ │ + │ (upload_id) │── CompleteMultipartUpload ──►│ + │ │ (parts coalesced by S3) │ + │ │── store doc metadata ───────►│ Cassandra + │◄── document_id ────────────│ (delete session) │ +``` + +The client never interacts with S3 directly. The librarian translates between +our chunked upload API and S3 multipart operations internally. + +### Librarian API Operations + +#### `begin-upload` + +Initialize a chunked upload session. + +Request: +```json +{ + "operation": "begin-upload", + "document-metadata": { + "id": "doc-123", + "kind": "application/pdf", + "title": "Large Document", + "user": "user-id", + "tags": ["tag1", "tag2"] + }, + "total-size": 524288000, + "chunk-size": 5242880 +} +``` + +Response: +```json +{ + "upload-id": "upload-abc-123", + "chunk-size": 5242880, + "total-chunks": 100 +} +``` + +The librarian: +1. Generates a unique `upload_id` and `object_id` (UUID for blob storage) +2. Calls S3 `CreateMultipartUpload`, receives `s3_upload_id` +3. Creates session record in Cassandra +4. Returns `upload_id` to client + +#### `upload-chunk` + +Upload a single chunk. + +Request: +```json +{ + "operation": "upload-chunk", + "upload-id": "upload-abc-123", + "chunk-index": 0, + "content": "" +} +``` + +Response: +```json +{ + "upload-id": "upload-abc-123", + "chunk-index": 0, + "chunks-received": 1, + "total-chunks": 100, + "bytes-received": 5242880, + "total-bytes": 524288000 +} +``` + +The librarian: +1. Looks up session by `upload_id` +2. Validates ownership (user must match session creator) +3. Calls S3 `UploadPart` with chunk data, receives `etag` +4. Updates session record with chunk index and etag +5. Returns progress to client + +Failed chunks can be retried - just send the same `chunk-index` again. + +#### `complete-upload` + +Finalize the upload and create the document. + +Request: +```json +{ + "operation": "complete-upload", + "upload-id": "upload-abc-123" +} +``` + +Response: +```json +{ + "document-id": "doc-123", + "object-id": "550e8400-e29b-41d4-a716-446655440000" +} +``` + +The librarian: +1. Looks up session, verifies all chunks received +2. Calls S3 `CompleteMultipartUpload` with part etags (S3 coalesces parts + internally - zero memory cost to librarian) +3. Creates document record in Cassandra with metadata and object reference +4. Deletes upload session record +5. Returns document ID to client + +#### `abort-upload` + +Cancel an in-progress upload. + +Request: +```json +{ + "operation": "abort-upload", + "upload-id": "upload-abc-123" +} +``` + +The librarian: +1. Calls S3 `AbortMultipartUpload` to clean up parts +2. Deletes session record from Cassandra + +#### `get-upload-status` + +Query status of an upload (for resume capability). + +Request: +```json +{ + "operation": "get-upload-status", + "upload-id": "upload-abc-123" +} +``` + +Response: +```json +{ + "upload-id": "upload-abc-123", + "state": "in-progress", + "chunks-received": [0, 1, 2, 5, 6], + "missing-chunks": [3, 4, 7, 8], + "total-chunks": 100, + "bytes-received": 36700160, + "total-bytes": 524288000 +} +``` + +#### `list-uploads` + +List incomplete uploads for a user. + +Request: +```json +{ + "operation": "list-uploads" +} +``` + +Response: +```json +{ + "uploads": [ + { + "upload-id": "upload-abc-123", + "document-metadata": { "title": "Large Document", ... }, + "progress": { "chunks-received": 43, "total-chunks": 100 }, + "created-at": "2024-01-15T10:30:00Z" + } + ] +} +``` + +### Upload Session Storage + +Track in-progress uploads in Cassandra: + +```sql +CREATE TABLE upload_session ( + upload_id text PRIMARY KEY, + user text, + document_id text, + document_metadata text, -- JSON: title, kind, tags, comments, etc. + s3_upload_id text, -- internal, for S3 operations + object_id uuid, -- target blob ID + total_size bigint, + chunk_size int, + total_chunks int, + chunks_received map, -- chunk_index → etag + created_at timestamp, + updated_at timestamp +) WITH default_time_to_live = 86400; -- 24 hour TTL + +CREATE INDEX upload_session_user ON upload_session (user); +``` + +**TTL Behavior:** +- Sessions expire after 24 hours if not completed +- When Cassandra TTL expires, the session record is deleted +- Orphaned S3 parts are cleaned up by S3 lifecycle policy (configure on bucket) + +### Failure Handling and Atomicity + +**Chunk upload failure:** +- Client retries the failed chunk (same `upload_id` and `chunk-index`) +- S3 `UploadPart` is idempotent for the same part number +- Session tracks which chunks succeeded + +**Client disconnect mid-upload:** +- Session remains in Cassandra with received chunks recorded +- Client can call `get-upload-status` to see what's missing +- Resume by uploading only missing chunks, then `complete-upload` + +**Complete-upload failure:** +- S3 `CompleteMultipartUpload` is atomic - either succeeds fully or fails +- On failure, parts remain and client can retry `complete-upload` +- No partial document is ever visible + +**Session expiry:** +- Cassandra TTL deletes session record after 24 hours +- S3 bucket lifecycle policy cleans up incomplete multipart uploads +- No manual cleanup required + +### S3 Multipart Atomicity + +S3 multipart uploads provide built-in atomicity: + +1. **Parts are invisible**: Uploaded parts cannot be accessed as objects. + They exist only as parts of an incomplete multipart upload. + +2. **Atomic completion**: `CompleteMultipartUpload` either succeeds (object + appears atomically) or fails (no object created). No partial state. + +3. **No rename needed**: The final object key is specified at + `CreateMultipartUpload` time. Parts are coalesced directly to that key. + +4. **Server-side coalesce**: S3 combines parts internally. The librarian + never reads parts back - zero memory overhead regardless of document size. + +### BlobStore Extensions + +**File:** `trustgraph-flow/trustgraph/librarian/blob_store.py` + +Add multipart upload methods: + +```python +class BlobStore: + # Existing methods... + + def create_multipart_upload(self, object_id: UUID, kind: str) -> str: + """Initialize multipart upload, return s3_upload_id.""" + # minio client: create_multipart_upload() + + def upload_part( + self, object_id: UUID, s3_upload_id: str, + part_number: int, data: bytes + ) -> str: + """Upload a single part, return etag.""" + # minio client: upload_part() + # Note: S3 part numbers are 1-indexed + + def complete_multipart_upload( + self, object_id: UUID, s3_upload_id: str, + parts: List[Tuple[int, str]] # [(part_number, etag), ...] + ) -> None: + """Finalize multipart upload.""" + # minio client: complete_multipart_upload() + + def abort_multipart_upload( + self, object_id: UUID, s3_upload_id: str + ) -> None: + """Cancel multipart upload, clean up parts.""" + # minio client: abort_multipart_upload() +``` + +### Chunk Size Considerations + +- **S3 minimum**: 5MB per part (except last part) +- **S3 maximum**: 10,000 parts per upload +- **Practical default**: 5MB chunks + - 500MB document = 100 chunks + - 5GB document = 1,000 chunks +- **Progress granularity**: Smaller chunks = finer progress updates +- **Network efficiency**: Larger chunks = fewer round trips + +Chunk size could be client-configurable within bounds (5MB - 100MB). + +### Document Processing: Streaming Retrieval + +The upload flow addresses getting documents into storage efficiently. The +processing flow addresses extracting and chunking documents without loading +them entirely into memory. + +#### Design Principle: Identifier, Not Content + +Currently, when processing is triggered, document content flows through Pulsar +messages. This loads entire documents into memory. Instead: + +- Pulsar messages carry only the **document identifier** +- Processors fetch document content directly from librarian +- Fetching happens as a **stream to temporary file** +- Document-specific parsing (PDF, text, etc.) works with files, not memory buffers + +This keeps the librarian document-structure-agnostic. PDF parsing, text +extraction, and other format-specific logic stays in the respective decoders. + +#### Processing Flow + +``` +Pulsar PDF Decoder Librarian S3 + │ │ │ │ + │── doc-id ───────────►│ │ │ + │ (processing msg) │ │ │ + │ │ │ │ + │ │── stream-document ──────►│ │ + │ │ (doc-id) │── GetObject ────►│ + │ │ │ │ + │ │◄── chunk ────────────────│◄── stream ───────│ + │ │ (write to temp file) │ │ + │ │◄── chunk ────────────────│◄── stream ───────│ + │ │ (append to temp file) │ │ + │ │ ⋮ │ ⋮ │ + │ │◄── EOF ──────────────────│ │ + │ │ │ │ + │ │ ┌──────────────────────────┐ │ + │ │ │ temp file on disk │ │ + │ │ │ (memory stays bounded) │ │ + │ │ └────────────┬─────────────┘ │ + │ │ │ │ + │ │ PDF library opens file │ + │ │ extract page 1 text ──► chunker │ + │ │ extract page 2 text ──► chunker │ + │ │ ⋮ │ + │ │ close file │ + │ │ delete temp file │ +``` + +#### Librarian Stream API + +Add a streaming document retrieval operation: + +**`stream-document`** + +Request: +```json +{ + "operation": "stream-document", + "document-id": "doc-123" +} +``` + +Response: Streamed binary chunks (not a single response). + +For REST API, this returns a streaming response with `Transfer-Encoding: chunked`. + +For internal service-to-service calls (processor to librarian), this could be: +- Direct S3 streaming via presigned URL (if internal network allows) +- Chunked responses over the service protocol +- A dedicated streaming endpoint + +The key requirement: data flows in chunks, never fully buffered in librarian. + +#### PDF Decoder Changes + +**Current implementation** (memory-intensive): + +```python +def decode_pdf(document_content: bytes) -> str: + reader = PdfReader(BytesIO(document_content)) # full doc in memory + text = "" + for page in reader.pages: + text += page.extract_text() # accumulating + return text # full text in memory +``` + +**New implementation** (temp file, incremental): + +```python +def decode_pdf_streaming(doc_id: str, librarian_client) -> Iterator[str]: + """Yield extracted text page by page.""" + + with tempfile.NamedTemporaryFile(delete=True, suffix='.pdf') as tmp: + # Stream document to temp file + for chunk in librarian_client.stream_document(doc_id): + tmp.write(chunk) + tmp.flush() + + # Open PDF from file (not memory) + reader = PdfReader(tmp.name) + + # Yield pages incrementally + for page in reader.pages: + yield page.extract_text() + + # tmp file auto-deleted on context exit +``` + +Memory profile: +- Temp file on disk: size of PDF (disk is cheap) +- In memory: one page's text at a time +- Peak memory: bounded, independent of document size + +#### Text Document Decoder Changes + +For plain text documents, even simpler - no temp file needed: + +```python +def decode_text_streaming(doc_id: str, librarian_client) -> Iterator[str]: + """Yield text in chunks as it streams from storage.""" + + buffer = "" + for chunk in librarian_client.stream_document(doc_id): + buffer += chunk.decode('utf-8') + + # Yield complete lines/paragraphs as they arrive + while '\n\n' in buffer: + paragraph, buffer = buffer.split('\n\n', 1) + yield paragraph + '\n\n' + + # Yield remaining buffer + if buffer: + yield buffer +``` + +Text documents can stream directly without temp file since they're +linearly structured. + +#### Streaming Chunker Integration + +The chunker receives an iterator of text (pages or paragraphs) and produces +chunks incrementally: + +```python +class StreamingChunker: + def __init__(self, chunk_size: int, overlap: int): + self.chunk_size = chunk_size + self.overlap = overlap + + def process(self, text_stream: Iterator[str]) -> Iterator[str]: + """Yield chunks as text arrives.""" + buffer = "" + + for text_segment in text_stream: + buffer += text_segment + + while len(buffer) >= self.chunk_size: + chunk = buffer[:self.chunk_size] + yield chunk + # Keep overlap for context continuity + buffer = buffer[self.chunk_size - self.overlap:] + + # Yield remaining buffer as final chunk + if buffer.strip(): + yield buffer +``` + +#### End-to-End Processing Pipeline + +```python +async def process_document(doc_id: str, librarian_client, embedder): + """Process document with bounded memory.""" + + # Get document metadata to determine type + metadata = await librarian_client.get_document_metadata(doc_id) + + # Select decoder based on document type + if metadata.kind == 'application/pdf': + text_stream = decode_pdf_streaming(doc_id, librarian_client) + elif metadata.kind == 'text/plain': + text_stream = decode_text_streaming(doc_id, librarian_client) + else: + raise UnsupportedDocumentType(metadata.kind) + + # Chunk incrementally + chunker = StreamingChunker(chunk_size=1000, overlap=100) + + # Process each chunk as it's produced + for chunk in chunker.process(text_stream): + # Generate embeddings, store in vector DB, etc. + embedding = await embedder.embed(chunk) + await store_chunk(doc_id, chunk, embedding) +``` + +At no point is the full document or full extracted text held in memory. + +#### Temp File Considerations + +**Location**: Use system temp directory (`/tmp` or equivalent). For +containerized deployments, ensure temp directory has sufficient space +and is on fast storage (not network-mounted if possible). + +**Cleanup**: Use context managers (`with tempfile...`) to ensure cleanup +even on exceptions. + +**Concurrent processing**: Each processing job gets its own temp file. +No conflicts between parallel document processing. + +**Disk space**: Temp files are short-lived (duration of processing). For +a 500MB PDF, need 500MB temp space during processing. Size limit could +be enforced at upload time if disk space is constrained. + +### Unified Processing Interface: Child Documents + +PDF extraction and text document processing need to feed into the same +downstream pipeline (chunker → embeddings → storage). To achieve this with +a consistent "fetch by ID" interface, extracted text blobs are stored back +to librarian as child documents. + +#### Processing Flow with Child Documents + +``` +PDF Document Text Document + │ │ + ▼ │ +pdf-extractor │ + │ │ + │ (stream PDF from librarian) │ + │ (extract page 1 text) │ + │ (store as child doc → librarian) │ + │ (extract page 2 text) │ + │ (store as child doc → librarian) │ + │ ⋮ │ + ▼ ▼ +[child-doc-id, child-doc-id, ...] [doc-id] + │ │ + └─────────────────────┬───────────────────────────────┘ + ▼ + chunker + │ + │ (receives document ID) + │ (streams content from librarian) + │ (chunks incrementally) + ▼ + [chunks → embedding → storage] +``` + +The chunker has one uniform interface: +- Receive a document ID (via Pulsar) +- Stream content from librarian +- Chunk it + +It doesn't know or care whether the ID refers to: +- A user-uploaded text document +- An extracted text blob from a PDF page +- Any future document type + +#### Child Document Metadata + +Extend the document schema to track parent/child relationships: + +```sql +-- Add columns to document table +ALTER TABLE document ADD parent_id text; +ALTER TABLE document ADD document_type text; + +-- Index for finding children of a parent +CREATE INDEX document_parent ON document (parent_id); +``` + +**Document types:** + +| `document_type` | Description | +|-----------------|-------------| +| `source` | User-uploaded document (PDF, text, etc.) | +| `extracted` | Derived from a source document (e.g., PDF page text) | + +**Metadata fields:** + +| Field | Source Document | Extracted Child | +|-------|-----------------|-----------------| +| `id` | user-provided or generated | generated (e.g., `{parent-id}-page-{n}`) | +| `parent_id` | `NULL` | parent document ID | +| `document_type` | `source` | `extracted` | +| `kind` | `application/pdf`, etc. | `text/plain` | +| `title` | user-provided | generated (e.g., "Page 3 of Report.pdf") | +| `user` | authenticated user | same as parent | + +#### Librarian API for Child Documents + +**Creating child documents** (internal, used by pdf-extractor): + +```json +{ + "operation": "add-child-document", + "parent-id": "doc-123", + "document-metadata": { + "id": "doc-123-page-1", + "kind": "text/plain", + "title": "Page 1" + }, + "content": "" +} +``` + +For small extracted text (typical page text is < 100KB), single-operation +upload is acceptable. For very large text extractions, chunked upload +could be used. + +**Listing child documents** (for debugging/admin): + +```json +{ + "operation": "list-children", + "parent-id": "doc-123" +} +``` + +Response: +```json +{ + "children": [ + { "id": "doc-123-page-1", "title": "Page 1", "kind": "text/plain" }, + { "id": "doc-123-page-2", "title": "Page 2", "kind": "text/plain" }, + ... + ] +} +``` + +#### User-Facing Behavior + +**`list-documents` default behavior:** + +```sql +SELECT * FROM document WHERE user = ? AND parent_id IS NULL; +``` + +Only top-level (source) documents appear in the user's document list. +Child documents are filtered out by default. + +**Optional include-children flag** (for admin/debugging): + +```json +{ + "operation": "list-documents", + "include-children": true +} +``` + +#### Cascade Delete + +When a parent document is deleted, all children must be deleted: + +```python +def delete_document(doc_id: str): + # Find all children + children = query("SELECT id, object_id FROM document WHERE parent_id = ?", doc_id) + + # Delete child blobs from S3 + for child in children: + blob_store.delete(child.object_id) + + # Delete child metadata from Cassandra + execute("DELETE FROM document WHERE parent_id = ?", doc_id) + + # Delete parent blob and metadata + parent = get_document(doc_id) + blob_store.delete(parent.object_id) + execute("DELETE FROM document WHERE id = ? AND user = ?", doc_id, user) +``` + +#### Storage Considerations + +Extracted text blobs do duplicate content: +- Original PDF stored in Garage +- Extracted text per page also stored in Garage + +This tradeoff enables: +- **Uniform chunker interface**: Chunker always fetches by ID +- **Resume/retry**: Can restart at chunker stage without re-extracting PDF +- **Debugging**: Extracted text is inspectable +- **Separation of concerns**: PDF extractor and chunker are independent services + +For a 500MB PDF with 200 pages averaging 5KB text per page: +- PDF storage: 500MB +- Extracted text storage: ~1MB total +- Overhead: negligible + +#### PDF Extractor Output + +The pdf-extractor, after processing a document: + +1. Streams PDF from librarian to temp file +2. Extracts text page by page +3. For each page, stores extracted text as child document via librarian +4. Sends child document IDs to chunker queue + +```python +async def extract_pdf(doc_id: str, librarian_client, output_queue): + """Extract PDF pages and store as child documents.""" + + with tempfile.NamedTemporaryFile(delete=True, suffix='.pdf') as tmp: + # Stream PDF to temp file + for chunk in librarian_client.stream_document(doc_id): + tmp.write(chunk) + tmp.flush() + + # Extract pages + reader = PdfReader(tmp.name) + for page_num, page in enumerate(reader.pages, start=1): + text = page.extract_text() + + # Store as child document + child_id = f"{doc_id}-page-{page_num}" + await librarian_client.add_child_document( + parent_id=doc_id, + document_id=child_id, + kind="text/plain", + title=f"Page {page_num}", + content=text.encode('utf-8') + ) + + # Send to chunker queue + await output_queue.send(child_id) +``` + +The chunker receives these child IDs and processes them identically to +how it would process a user-uploaded text document. + +### Client Updates + +#### Python SDK + +The Python SDK (`trustgraph-base/trustgraph/api/library.py`) should handle +chunked uploads transparently. The public interface remains unchanged: + +```python +# Existing interface - no change for users +library.add_document( + id="doc-123", + title="Large Report", + kind="application/pdf", + content=large_pdf_bytes, # Can be hundreds of MB + tags=["reports"] +) +``` + +Internally, the SDK detects document size and switches strategy: + +```python +class Library: + CHUNKED_UPLOAD_THRESHOLD = 2 * 1024 * 1024 # 2MB + + def add_document(self, id, title, kind, content, tags=None, ...): + if len(content) < self.CHUNKED_UPLOAD_THRESHOLD: + # Small document: single operation (existing behavior) + return self._add_document_single(id, title, kind, content, tags) + else: + # Large document: chunked upload + return self._add_document_chunked(id, title, kind, content, tags) + + def _add_document_chunked(self, id, title, kind, content, tags): + # 1. begin-upload + session = self._begin_upload( + document_metadata={...}, + total_size=len(content), + chunk_size=5 * 1024 * 1024 + ) + + # 2. upload-chunk for each chunk + for i, chunk in enumerate(self._chunk_bytes(content, session.chunk_size)): + self._upload_chunk(session.upload_id, i, chunk) + + # 3. complete-upload + return self._complete_upload(session.upload_id) +``` + +**Progress callbacks** (optional enhancement): + +```python +def add_document(self, ..., on_progress=None): + """ + on_progress: Optional callback(bytes_sent, total_bytes) + """ +``` + +This allows UIs to display upload progress without changing the basic API. + +#### CLI Tools + +**`tg-add-library-document`** continues to work unchanged: + +```bash +# Works transparently for any size - SDK handles chunking internally +tg-add-library-document --file large-report.pdf --title "Large Report" +``` + +Optional progress display could be added: + +```bash +tg-add-library-document --file large-report.pdf --title "Large Report" --progress +# Output: +# Uploading: 45% (225MB / 500MB) +``` + +**Legacy tools removed:** + +- `tg-load-pdf` - deprecated, use `tg-add-library-document` +- `tg-load-text` - deprecated, use `tg-add-library-document` + +**Admin/debug commands** (optional, low priority): + +```bash +# List incomplete uploads (admin troubleshooting) +tg-add-library-document --list-pending + +# Resume specific upload (recovery scenario) +tg-add-library-document --resume upload-abc-123 --file large-report.pdf +``` + +These could be flags on the existing command rather than separate tools. + +#### API Specification Updates + +The OpenAPI spec (`specs/api/paths/librarian.yaml`) needs updates for: + +**New operations:** + +- `begin-upload` - Initialize chunked upload session +- `upload-chunk` - Upload individual chunk +- `complete-upload` - Finalize upload +- `abort-upload` - Cancel upload +- `get-upload-status` - Query upload progress +- `list-uploads` - List incomplete uploads for user +- `stream-document` - Streaming document retrieval +- `add-child-document` - Store extracted text (internal) +- `list-children` - List child documents (admin) + +**Modified operations:** + +- `list-documents` - Add `include-children` parameter + +**New schemas:** + +- `ChunkedUploadBeginRequest` +- `ChunkedUploadBeginResponse` +- `ChunkedUploadChunkRequest` +- `ChunkedUploadChunkResponse` +- `UploadSession` +- `UploadProgress` + +**WebSocket spec updates** (`specs/websocket/`): + +Mirror the REST operations for WebSocket clients, enabling real-time +progress updates during upload. + +#### UX Considerations + +The API spec updates enable frontend improvements: + +**Upload progress UI:** +- Progress bar showing chunks uploaded +- Estimated time remaining +- Pause/resume capability + +**Error recovery:** +- "Resume upload" option for interrupted uploads +- List of pending uploads on reconnect + +**Large file handling:** +- Client-side file size detection +- Automatic chunked upload for large files +- Clear feedback during long uploads + +These UX improvements require frontend work guided by the updated API spec. diff --git a/tests/unit/test_chunking/test_recursive_chunker.py b/tests/unit/test_chunking/test_recursive_chunker.py index 8f91d95f..ef4aec69 100644 --- a/tests/unit/test_chunking/test_recursive_chunker.py +++ b/tests/unit/test_chunking/test_recursive_chunker.py @@ -17,13 +17,17 @@ class MockAsyncProcessor: self.config_handlers = [] self.id = params.get('id', 'test-service') self.specifications = [] + self.pubsub = MagicMock() + self.taskgroup = params.get('taskgroup', MagicMock()) class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase): """Test Recursive chunker functionality""" + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) - def test_processor_initialization_basic(self): + def test_processor_initialization_basic(self, mock_producer, mock_consumer): """Test basic processor initialization""" # Arrange config = { @@ -47,8 +51,10 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase): if hasattr(spec, 'name') and spec.name in ['chunk-size', 'chunk-overlap']] assert len(param_specs) == 2 + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) - async def test_chunk_document_with_chunk_size_override(self): + async def test_chunk_document_with_chunk_size_override(self, mock_producer, mock_consumer): """Test chunk_document with chunk-size parameter override""" # Arrange config = { @@ -79,8 +85,10 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase): assert chunk_size == 2000 # Should use overridden value assert chunk_overlap == 100 # Should use default value + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) - async def test_chunk_document_with_chunk_overlap_override(self): + async def test_chunk_document_with_chunk_overlap_override(self, mock_producer, mock_consumer): """Test chunk_document with chunk-overlap parameter override""" # Arrange config = { @@ -111,8 +119,10 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase): assert chunk_size == 1000 # Should use default value assert chunk_overlap == 200 # Should use overridden value + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) - async def test_chunk_document_with_both_parameters_override(self): + async def test_chunk_document_with_both_parameters_override(self, mock_producer, mock_consumer): """Test chunk_document with both chunk-size and chunk-overlap overrides""" # Arrange config = { @@ -143,9 +153,11 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase): assert chunk_size == 1500 # Should use overridden value assert chunk_overlap == 150 # Should use overridden value + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') @patch('trustgraph.chunking.recursive.chunker.RecursiveCharacterTextSplitter') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) - async def test_on_message_uses_flow_parameters(self, mock_splitter_class): + async def test_on_message_uses_flow_parameters(self, mock_splitter_class, mock_producer, mock_consumer): """Test that on_message method uses parameters from flow""" # Arrange mock_splitter = MagicMock() @@ -174,6 +186,7 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase): collection="test-collection" ) mock_text_doc.text = b"This is test document content" + mock_text_doc.document_id = "" # No librarian fetch needed mock_message.value.return_value = mock_text_doc # Mock consumer and flow with parameter overrides @@ -202,8 +215,10 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase): sent_chunk = mock_producer.send.call_args[0][0] assert isinstance(sent_chunk, Chunk) + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) - async def test_chunk_document_with_no_overrides(self): + async def test_chunk_document_with_no_overrides(self, mock_producer, mock_consumer): """Test chunk_document when no parameters are overridden (flow returns None)""" # Arrange config = { diff --git a/tests/unit/test_chunking/test_token_chunker.py b/tests/unit/test_chunking/test_token_chunker.py index 600df930..9030a17f 100644 --- a/tests/unit/test_chunking/test_token_chunker.py +++ b/tests/unit/test_chunking/test_token_chunker.py @@ -17,13 +17,17 @@ class MockAsyncProcessor: self.config_handlers = [] self.id = params.get('id', 'test-service') self.specifications = [] + self.pubsub = MagicMock() + self.taskgroup = params.get('taskgroup', MagicMock()) class TestTokenChunkerSimple(IsolatedAsyncioTestCase): """Test Token chunker functionality""" + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) - def test_processor_initialization_basic(self): + def test_processor_initialization_basic(self, mock_producer, mock_consumer): """Test basic processor initialization""" # Arrange config = { @@ -47,8 +51,10 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase): if hasattr(spec, 'name') and spec.name in ['chunk-size', 'chunk-overlap']] assert len(param_specs) == 2 + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) - async def test_chunk_document_with_chunk_size_override(self): + async def test_chunk_document_with_chunk_size_override(self, mock_producer, mock_consumer): """Test chunk_document with chunk-size parameter override""" # Arrange config = { @@ -79,8 +85,10 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase): assert chunk_size == 400 # Should use overridden value assert chunk_overlap == 15 # Should use default value + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) - async def test_chunk_document_with_chunk_overlap_override(self): + async def test_chunk_document_with_chunk_overlap_override(self, mock_producer, mock_consumer): """Test chunk_document with chunk-overlap parameter override""" # Arrange config = { @@ -111,8 +119,10 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase): assert chunk_size == 250 # Should use default value assert chunk_overlap == 25 # Should use overridden value + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) - async def test_chunk_document_with_both_parameters_override(self): + async def test_chunk_document_with_both_parameters_override(self, mock_producer, mock_consumer): """Test chunk_document with both chunk-size and chunk-overlap overrides""" # Arrange config = { @@ -143,9 +153,11 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase): assert chunk_size == 350 # Should use overridden value assert chunk_overlap == 30 # Should use overridden value + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') @patch('trustgraph.chunking.token.chunker.TokenTextSplitter') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) - async def test_on_message_uses_flow_parameters(self, mock_splitter_class): + async def test_on_message_uses_flow_parameters(self, mock_splitter_class, mock_producer, mock_consumer): """Test that on_message method uses parameters from flow""" # Arrange mock_splitter = MagicMock() @@ -174,6 +186,7 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase): collection="test-collection" ) mock_text_doc.text = b"This is test document content for token chunking" + mock_text_doc.document_id = "" # No librarian fetch needed mock_message.value.return_value = mock_text_doc # Mock consumer and flow with parameter overrides @@ -206,8 +219,10 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase): sent_chunk = mock_producer.send.call_args[0][0] assert isinstance(sent_chunk, Chunk) + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) - async def test_chunk_document_with_no_overrides(self): + async def test_chunk_document_with_no_overrides(self, mock_producer, mock_consumer): """Test chunk_document when no parameters are overridden (flow returns None)""" # Arrange config = { @@ -235,8 +250,10 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase): assert chunk_size == 250 # Should use default value assert chunk_overlap == 15 # Should use default value + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) - def test_token_chunker_uses_different_defaults(self): + def test_token_chunker_uses_different_defaults(self, mock_producer, mock_consumer): """Test that token chunker has different defaults than recursive chunker""" # Arrange & Act config = { diff --git a/tests/unit/test_decoding/test_pdf_decoder.py b/tests/unit/test_decoding/test_pdf_decoder.py index b40accdf..1e6021d1 100644 --- a/tests/unit/test_decoding/test_pdf_decoder.py +++ b/tests/unit/test_decoding/test_pdf_decoder.py @@ -12,218 +12,165 @@ from trustgraph.decoding.pdf.pdf_decoder import Processor from trustgraph.schema import Document, TextDocument, Metadata +class MockAsyncProcessor: + def __init__(self, **params): + self.config_handlers = [] + self.id = params.get('id', 'test-service') + self.specifications = [] + self.pubsub = MagicMock() + self.taskgroup = params.get('taskgroup', MagicMock()) + + class TestPdfDecoderProcessor(IsolatedAsyncioTestCase): """Test PDF decoder processor functionality""" - @patch('trustgraph.base.flow_processor.FlowProcessor.__init__') - async def test_processor_initialization(self, mock_flow_init): + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') + @patch('trustgraph.decoding.pdf.pdf_decoder.Consumer') + @patch('trustgraph.decoding.pdf.pdf_decoder.Producer') + @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) + async def test_processor_initialization(self, mock_producer, mock_consumer, mock_cs_producer, mock_cs_consumer): """Test PDF decoder processor initialization""" - # Arrange - mock_flow_init.return_value = None - config = { 'id': 'test-pdf-decoder', 'taskgroup': AsyncMock() } - # Act - with patch.object(Processor, 'register_specification') as mock_register: - processor = Processor(**config) + processor = Processor(**config) - # Assert - mock_flow_init.assert_called_once() - # Verify register_specification was called twice (consumer and producer) - assert mock_register.call_count == 2 - # Check consumer spec - consumer_call = mock_register.call_args_list[0] - consumer_spec = consumer_call[0][0] - assert consumer_spec.name == "input" - assert consumer_spec.schema == Document - assert consumer_spec.handler == processor.on_message - - # Check producer spec - producer_call = mock_register.call_args_list[1] - producer_spec = producer_call[0][0] - assert producer_spec.name == "output" - assert producer_spec.schema == TextDocument + consumer_specs = [s for s in processor.specifications if hasattr(s, 'handler')] + assert len(consumer_specs) >= 1 + assert consumer_specs[0].name == "input" + assert consumer_specs[0].schema == Document + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') + @patch('trustgraph.decoding.pdf.pdf_decoder.Consumer') + @patch('trustgraph.decoding.pdf.pdf_decoder.Producer') @patch('trustgraph.decoding.pdf.pdf_decoder.PyPDFLoader') - @patch('trustgraph.base.flow_processor.FlowProcessor.__init__') - async def test_on_message_success(self, mock_flow_init, mock_pdf_loader_class): + @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) + async def test_on_message_success(self, mock_pdf_loader_class, mock_producer, mock_consumer, mock_cs_producer, mock_cs_consumer): """Test successful PDF processing""" - # Arrange - mock_flow_init.return_value = None - # Mock PDF content pdf_content = b"fake pdf content" pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') - + # Mock PyPDFLoader mock_loader = MagicMock() mock_page1 = MagicMock(page_content="Page 1 content") mock_page2 = MagicMock(page_content="Page 2 content") mock_loader.load.return_value = [mock_page1, mock_page2] mock_pdf_loader_class.return_value = mock_loader - + # Mock message mock_metadata = Metadata(id="test-doc") mock_document = Document(metadata=mock_metadata, data=pdf_base64) mock_msg = MagicMock() mock_msg.value.return_value = mock_document - - # Mock flow - needs to be a callable that returns an object with send method + + # Mock flow mock_output_flow = AsyncMock() mock_flow = MagicMock(return_value=mock_output_flow) - + config = { 'id': 'test-pdf-decoder', 'taskgroup': AsyncMock() } - with patch.object(Processor, 'register_specification'): - processor = Processor(**config) + processor = Processor(**config) - # Act await processor.on_message(mock_msg, None, mock_flow) - # Assert - # Verify PyPDFLoader was called - mock_pdf_loader_class.assert_called_once() - mock_loader.load.assert_called_once() - # Verify output was sent for each page assert mock_output_flow.send.call_count == 2 - - # Check first page output - first_call = mock_output_flow.send.call_args_list[0] - first_output = first_call[0][0] - assert isinstance(first_output, TextDocument) - assert first_output.metadata == mock_metadata - assert first_output.text == b"Page 1 content" - - # Check second page output - second_call = mock_output_flow.send.call_args_list[1] - second_output = second_call[0][0] - assert isinstance(second_output, TextDocument) - assert second_output.metadata == mock_metadata - assert second_output.text == b"Page 2 content" + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') + @patch('trustgraph.decoding.pdf.pdf_decoder.Consumer') + @patch('trustgraph.decoding.pdf.pdf_decoder.Producer') @patch('trustgraph.decoding.pdf.pdf_decoder.PyPDFLoader') - @patch('trustgraph.base.flow_processor.FlowProcessor.__init__') - async def test_on_message_empty_pdf(self, mock_flow_init, mock_pdf_loader_class): + @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) + async def test_on_message_empty_pdf(self, mock_pdf_loader_class, mock_producer, mock_consumer, mock_cs_producer, mock_cs_consumer): """Test handling of empty PDF""" - # Arrange - mock_flow_init.return_value = None - - # Mock PDF content pdf_content = b"fake pdf content" pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') - - # Mock PyPDFLoader with no pages + mock_loader = MagicMock() mock_loader.load.return_value = [] mock_pdf_loader_class.return_value = mock_loader - - # Mock message + mock_metadata = Metadata(id="test-doc") mock_document = Document(metadata=mock_metadata, data=pdf_base64) mock_msg = MagicMock() mock_msg.value.return_value = mock_document - - # Mock flow - needs to be a callable that returns an object with send method + mock_output_flow = AsyncMock() mock_flow = MagicMock(return_value=mock_output_flow) - + config = { 'id': 'test-pdf-decoder', 'taskgroup': AsyncMock() } - with patch.object(Processor, 'register_specification'): - processor = Processor(**config) + processor = Processor(**config) - # Act await processor.on_message(mock_msg, None, mock_flow) - # Assert - # Verify PyPDFLoader was called - mock_pdf_loader_class.assert_called_once() - mock_loader.load.assert_called_once() - - # Verify no output was sent mock_output_flow.send.assert_not_called() + @patch('trustgraph.base.chunking_service.Consumer') + @patch('trustgraph.base.chunking_service.Producer') + @patch('trustgraph.decoding.pdf.pdf_decoder.Consumer') + @patch('trustgraph.decoding.pdf.pdf_decoder.Producer') @patch('trustgraph.decoding.pdf.pdf_decoder.PyPDFLoader') - @patch('trustgraph.base.flow_processor.FlowProcessor.__init__') - async def test_on_message_unicode_content(self, mock_flow_init, mock_pdf_loader_class): + @patch('trustgraph.base.async_processor.AsyncProcessor', MockAsyncProcessor) + async def test_on_message_unicode_content(self, mock_pdf_loader_class, mock_producer, mock_consumer, mock_cs_producer, mock_cs_consumer): """Test handling of unicode content in PDF""" - # Arrange - mock_flow_init.return_value = None - - # Mock PDF content pdf_content = b"fake pdf content" pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') - - # Mock PyPDFLoader with unicode content + mock_loader = MagicMock() mock_page = MagicMock(page_content="Page with unicode: 你好世界 🌍") mock_loader.load.return_value = [mock_page] mock_pdf_loader_class.return_value = mock_loader - - # Mock message + mock_metadata = Metadata(id="test-doc") mock_document = Document(metadata=mock_metadata, data=pdf_base64) mock_msg = MagicMock() mock_msg.value.return_value = mock_document - - # Mock flow - needs to be a callable that returns an object with send method + mock_output_flow = AsyncMock() mock_flow = MagicMock(return_value=mock_output_flow) - + config = { 'id': 'test-pdf-decoder', 'taskgroup': AsyncMock() } - with patch.object(Processor, 'register_specification'): - processor = Processor(**config) + processor = Processor(**config) - # Act await processor.on_message(mock_msg, None, mock_flow) - # Assert - # Verify output was sent mock_output_flow.send.assert_called_once() - - # Check output call_args = mock_output_flow.send.call_args[0][0] - assert isinstance(call_args, TextDocument) assert call_args.text == "Page with unicode: 你好世界 🌍".encode('utf-8') @patch('trustgraph.base.flow_processor.FlowProcessor.add_args') def test_add_args(self, mock_parent_add_args): """Test add_args calls parent method""" - # Arrange mock_parser = MagicMock() - - # Act Processor.add_args(mock_parser) - - # Assert mock_parent_add_args.assert_called_once_with(mock_parser) @patch('trustgraph.decoding.pdf.pdf_decoder.Processor.launch') def test_run(self, mock_launch): """Test run function""" - # Act from trustgraph.decoding.pdf.pdf_decoder import run run() - - # Assert - mock_launch.assert_called_once_with("pdf-decoder", - "\nSimple decoder, accepts PDF documents on input, outputs pages from the\nPDF document as text as separate output objects.\n") + mock_launch.assert_called_once_with("pdf-decoder", + "\nSimple decoder, accepts PDF documents on input, outputs pages from the\nPDF document as text as separate output objects.\n\nSupports both inline document data and fetching from librarian via Pulsar\nfor large documents.\n") if __name__ == '__main__': - pytest.main([__file__]) \ No newline at end of file + pytest.main([__file__]) diff --git a/trustgraph-base/trustgraph/api/library.py b/trustgraph-base/trustgraph/api/library.py index e50dc0aa..b61348fc 100644 --- a/trustgraph-base/trustgraph/api/library.py +++ b/trustgraph-base/trustgraph/api/library.py @@ -6,6 +6,7 @@ including document storage, metadata management, and processing workflow coordin """ import datetime +import math import time import base64 import logging @@ -17,6 +18,13 @@ from . exceptions import * logger = logging.getLogger(__name__) +# Threshold for switching to chunked upload (2MB) +# Lower threshold provides progress feedback and resumability on slower connections +CHUNKED_UPLOAD_THRESHOLD = 2 * 1024 * 1024 + +# Default chunk size (5MB - S3 multipart minimum) +DEFAULT_CHUNK_SIZE = 5 * 1024 * 1024 + def to_value(x): """Convert wire format to Uri or Literal.""" @@ -67,13 +75,14 @@ class Library: def add_document( self, document, id, metadata, user, title, comments, - kind="text/plain", tags=[], + kind="text/plain", tags=[], on_progress=None, ): """ Add a document to the library. Stores a document with associated metadata in the library for - retrieval and processing. + retrieval and processing. For large documents (> 10MB), automatically + uses chunked upload for better reliability and progress tracking. Args: document: Document content as bytes @@ -84,6 +93,7 @@ class Library: comments: Document description or comments kind: MIME type of the document (default: "text/plain") tags: List of tags for categorization (default: []) + on_progress: Optional callback(bytes_sent, total_bytes) for progress updates Returns: dict: Response from the add operation @@ -107,6 +117,22 @@ class Library: kind="application/pdf", tags=["research", "physics"] ) + + # Add a large document with progress tracking + def progress(sent, total): + print(f"Uploaded {sent}/{total} bytes ({100*sent//total}%)") + + with open("large_document.pdf", "rb") as f: + library.add_document( + document=f.read(), + id="large-doc-001", + metadata=[], + user="trustgraph", + title="Large Document", + comments="A very large document", + kind="application/pdf", + on_progress=progress + ) ``` """ @@ -124,6 +150,21 @@ class Library: if not title: title = "" if not comments: comments = "" + # Check if we should use chunked upload + if len(document) >= CHUNKED_UPLOAD_THRESHOLD: + return self._add_document_chunked( + document=document, + id=id, + metadata=metadata, + user=user, + title=title, + comments=comments, + kind=kind, + tags=tags, + on_progress=on_progress, + ) + + # Small document: use single operation (existing behavior) triples = [] def emit(t): @@ -167,14 +208,111 @@ class Library: return self.request(input) - def get_documents(self, user): + def _add_document_chunked( + self, document, id, metadata, user, title, comments, + kind, tags, on_progress=None, + ): + """ + Add a large document using chunked upload. + + Internal method that handles multipart upload for large documents. + """ + total_size = len(document) + chunk_size = DEFAULT_CHUNK_SIZE + + logger.info(f"Starting chunked upload for document {id} ({total_size} bytes)") + + # Begin upload session + begin_request = { + "operation": "begin-upload", + "document-metadata": { + "id": id, + "time": int(time.time()), + "kind": kind, + "title": title, + "comments": comments, + "user": user, + "tags": tags, + }, + "total-size": total_size, + "chunk-size": chunk_size, + } + + begin_response = self.request(begin_request) + + upload_id = begin_response.get("upload-id") + if not upload_id: + raise RuntimeError("Failed to begin upload: no upload_id returned") + + actual_chunk_size = begin_response.get("chunk-size", chunk_size) + total_chunks = begin_response.get("total-chunks", math.ceil(total_size / actual_chunk_size)) + + logger.info(f"Upload session {upload_id} created, {total_chunks} chunks") + + try: + # Upload chunks + bytes_sent = 0 + for chunk_index in range(total_chunks): + start = chunk_index * actual_chunk_size + end = min(start + actual_chunk_size, total_size) + chunk_data = document[start:end] + + chunk_request = { + "operation": "upload-chunk", + "upload-id": upload_id, + "chunk-index": chunk_index, + "content": base64.b64encode(chunk_data).decode("utf-8"), + "user": user, + } + + chunk_response = self.request(chunk_request) + + bytes_sent = end + + # Call progress callback if provided + if on_progress: + on_progress(bytes_sent, total_size) + + logger.debug(f"Chunk {chunk_index + 1}/{total_chunks} uploaded") + + # Complete upload + complete_request = { + "operation": "complete-upload", + "upload-id": upload_id, + "user": user, + } + + complete_response = self.request(complete_request) + + logger.info(f"Chunked upload completed for document {id}") + + return complete_response + + except Exception as e: + # Try to abort on failure + logger.error(f"Chunked upload failed: {e}") + try: + abort_request = { + "operation": "abort-upload", + "upload-id": upload_id, + "user": user, + } + self.request(abort_request) + logger.info(f"Aborted failed upload {upload_id}") + except Exception as abort_error: + logger.warning(f"Failed to abort upload: {abort_error}") + raise + + def get_documents(self, user, include_children=False): """ List all documents for a user. Retrieves metadata for all documents owned by the specified user. + By default, only returns top-level documents (not child/extracted documents). Args: user: User identifier + include_children: If True, also include child documents (default: False) Returns: list[DocumentMetadata]: List of document metadata objects @@ -185,18 +323,24 @@ class Library: Example: ```python library = api.library() + + # Get only top-level documents docs = library.get_documents(user="trustgraph") for doc in docs: print(f"{doc.id}: {doc.title} ({doc.kind})") print(f" Uploaded: {doc.time}") print(f" Tags: {', '.join(doc.tags)}") + + # Get all documents including extracted pages + all_docs = library.get_documents(user="trustgraph", include_children=True) ``` """ input = { "operation": "list-documents", "user": user, + "include-children": include_children, } object = self.request(input) @@ -218,7 +362,9 @@ class Library: for w in v["metadata"] ], user = v["user"], - tags = v["tags"] + tags = v["tags"], + parent_id = v.get("parent-id", ""), + document_type = v.get("document-type", "source"), ) for v in object["document-metadatas"] ] @@ -261,7 +407,7 @@ class Library: doc = object["document-metadata"] try: - DocumentMetadata( + return DocumentMetadata( id = doc["id"], time = datetime.datetime.fromtimestamp(doc["time"]), kind = doc["kind"], @@ -276,7 +422,9 @@ class Library: for w in doc["metadata"] ], user = doc["user"], - tags = doc["tags"] + tags = doc["tags"], + parent_id = doc.get("parent-id", ""), + document_type = doc.get("document-type", "source"), ) except Exception as e: logger.error("Failed to parse document response", exc_info=True) @@ -535,3 +683,447 @@ class Library: logger.error("Failed to parse processing list response", exc_info=True) raise ProtocolException(f"Response not formatted correctly") + # Chunked upload management methods + + def get_pending_uploads(self, user): + """ + List all pending (in-progress) uploads for a user. + + Retrieves information about chunked uploads that have been started + but not yet completed. + + Args: + user: User identifier + + Returns: + list[dict]: List of pending upload information + + Example: + ```python + library = api.library() + pending = library.get_pending_uploads(user="trustgraph") + + for upload in pending: + print(f"Upload {upload['upload_id']}:") + print(f" Document: {upload['document_id']}") + print(f" Progress: {upload['chunks_received']}/{upload['total_chunks']}") + ``` + """ + input = { + "operation": "list-uploads", + "user": user, + } + + response = self.request(input) + + return response.get("upload-sessions", []) + + def get_upload_status(self, upload_id, user): + """ + Get the status of a specific upload. + + Retrieves detailed status information about a chunked upload, + including which chunks have been received and which are missing. + + Args: + upload_id: Upload session identifier + user: User identifier + + Returns: + dict: Upload status information including: + - upload_id: The upload session ID + - state: "in-progress", "completed", or "expired" + - chunks_received: Number of chunks received + - total_chunks: Total number of chunks expected + - received_chunks: List of received chunk indices + - missing_chunks: List of missing chunk indices + - bytes_received: Total bytes received + - total_bytes: Total expected bytes + + Example: + ```python + library = api.library() + status = library.get_upload_status( + upload_id="abc-123", + user="trustgraph" + ) + + if status['state'] == 'in-progress': + print(f"Missing chunks: {status['missing_chunks']}") + ``` + """ + input = { + "operation": "get-upload-status", + "upload-id": upload_id, + "user": user, + } + + return self.request(input) + + def abort_upload(self, upload_id, user): + """ + Abort an in-progress upload. + + Cancels a chunked upload and cleans up any uploaded chunks. + + Args: + upload_id: Upload session identifier + user: User identifier + + Returns: + dict: Empty response on success + + Example: + ```python + library = api.library() + library.abort_upload(upload_id="abc-123", user="trustgraph") + ``` + """ + input = { + "operation": "abort-upload", + "upload-id": upload_id, + "user": user, + } + + return self.request(input) + + def resume_upload(self, upload_id, document, user, on_progress=None): + """ + Resume an interrupted upload. + + Continues a chunked upload that was previously interrupted, + uploading only the missing chunks. + + Args: + upload_id: Upload session identifier to resume + document: Complete document content as bytes + user: User identifier + on_progress: Optional callback(bytes_sent, total_bytes) for progress updates + + Returns: + dict: Response from completing the upload + + Example: + ```python + library = api.library() + + # Check what's missing + status = library.get_upload_status( + upload_id="abc-123", + user="trustgraph" + ) + + if status['state'] == 'in-progress': + # Resume with the same document + with open("large_document.pdf", "rb") as f: + library.resume_upload( + upload_id="abc-123", + document=f.read(), + user="trustgraph" + ) + ``` + """ + # Get current status + status = self.get_upload_status(upload_id, user) + + if status.get("upload-state") == "expired": + raise RuntimeError("Upload session has expired, please start a new upload") + + if status.get("upload-state") == "completed": + return {"message": "Upload already completed"} + + missing_chunks = status.get("missing-chunks", []) + total_chunks = status.get("total-chunks", 0) + total_bytes = status.get("total-bytes", len(document)) + chunk_size = total_bytes // total_chunks if total_chunks > 0 else DEFAULT_CHUNK_SIZE + + logger.info(f"Resuming upload {upload_id}, {len(missing_chunks)} chunks remaining") + + # Upload missing chunks + for chunk_index in missing_chunks: + start = chunk_index * chunk_size + end = min(start + chunk_size, len(document)) + chunk_data = document[start:end] + + chunk_request = { + "operation": "upload-chunk", + "upload-id": upload_id, + "chunk-index": chunk_index, + "content": base64.b64encode(chunk_data).decode("utf-8"), + "user": user, + } + + self.request(chunk_request) + + if on_progress: + # Estimate progress including previously uploaded chunks + uploaded = total_chunks - len(missing_chunks) + missing_chunks.index(chunk_index) + 1 + bytes_sent = min(uploaded * chunk_size, total_bytes) + on_progress(bytes_sent, total_bytes) + + logger.debug(f"Resumed chunk {chunk_index}") + + # Complete upload + complete_request = { + "operation": "complete-upload", + "upload-id": upload_id, + "user": user, + } + + return self.request(complete_request) + + # Child document methods + + def add_child_document( + self, document, id, parent_id, user, title, comments, + kind="text/plain", tags=[], metadata=None, + ): + """ + Add a child document linked to a parent document. + + Child documents are typically extracted content (e.g., pages from a PDF). + They are automatically marked with document_type="extracted" and linked + to their parent via parent_id. + + Args: + document: Document content as bytes + id: Document identifier (auto-generated if None) + parent_id: Parent document identifier (required) + user: User/owner identifier + title: Document title + comments: Document description or comments + kind: MIME type of the document (default: "text/plain") + tags: List of tags for categorization (default: []) + metadata: Optional metadata as list of Triple objects + + Returns: + dict: Response from the add operation + + Raises: + RuntimeError: If parent_id is not provided + + Example: + ```python + library = api.library() + + # Add extracted page from a PDF + library.add_child_document( + document=page_text.encode('utf-8'), + id="doc-123-page-1", + parent_id="doc-123", + user="trustgraph", + title="Page 1 of Research Paper", + comments="First page extracted from PDF", + kind="text/plain", + tags=["extracted", "page"] + ) + ``` + """ + if not parent_id: + raise RuntimeError("parent_id is required for child documents") + + if id is None: + id = hash(document) + + if not title: + title = "" + if not comments: + comments = "" + + triples = [] + if metadata: + if isinstance(metadata, list): + triples = [ + { + "s": from_value(t.s), + "p": from_value(t.p), + "o": from_value(t.o), + } + for t in metadata + ] + + input = { + "operation": "add-child-document", + "document-metadata": { + "id": id, + "time": int(time.time()), + "kind": kind, + "title": title, + "comments": comments, + "metadata": triples, + "user": user, + "tags": tags, + "parent-id": parent_id, + "document-type": "extracted", + }, + "content": base64.b64encode(document).decode("utf-8"), + } + + return self.request(input) + + def list_children(self, document_id, user): + """ + List all child documents for a given parent document. + + Args: + document_id: Parent document identifier + user: User identifier + + Returns: + list[DocumentMetadata]: List of child document metadata objects + + Example: + ```python + library = api.library() + children = library.list_children( + document_id="doc-123", + user="trustgraph" + ) + + for child in children: + print(f"{child.id}: {child.title}") + ``` + """ + input = { + "operation": "list-children", + "document-id": document_id, + "user": user, + } + + response = self.request(input) + + try: + return [ + DocumentMetadata( + id=v["id"], + time=datetime.datetime.fromtimestamp(v["time"]), + kind=v["kind"], + title=v["title"], + comments=v.get("comments", ""), + metadata=[ + Triple( + s=to_value(w["s"]), + p=to_value(w["p"]), + o=to_value(w["o"]) + ) + for w in v.get("metadata", []) + ], + user=v["user"], + tags=v.get("tags", []), + parent_id=v.get("parent-id", ""), + document_type=v.get("document-type", "source"), + ) + for v in response.get("document-metadatas", []) + ] + except Exception as e: + logger.error("Failed to parse children response", exc_info=True) + raise ProtocolException("Response not formatted correctly") + + def get_document_content(self, user, id): + """ + Get the content of a document. + + Retrieves the full content of a document as bytes. + + Args: + user: User identifier + id: Document identifier + + Returns: + bytes: Document content + + Example: + ```python + library = api.library() + content = library.get_document_content( + user="trustgraph", + id="doc-123" + ) + + # Write to file + with open("output.pdf", "wb") as f: + f.write(content) + ``` + """ + input = { + "operation": "get-document-content", + "user": user, + "document-id": id, + } + + response = self.request(input) + content_b64 = response.get("content", "") + + return base64.b64decode(content_b64) + + def stream_document_to_file(self, user, id, file_path, chunk_size=1024*1024, on_progress=None): + """ + Stream document content to a file. + + Downloads document content in chunks and writes directly to a file, + enabling memory-efficient handling of large documents. + + Args: + user: User identifier + id: Document identifier + file_path: Path to write the document content + chunk_size: Size of each chunk to download (default 1MB) + on_progress: Optional callback(bytes_received, total_bytes) for progress updates + + Returns: + int: Total bytes written + + Example: + ```python + library = api.library() + + def progress(received, total): + print(f"Downloaded {received}/{total} bytes") + + library.stream_document_to_file( + user="trustgraph", + id="large-doc-123", + file_path="/tmp/document.pdf", + on_progress=progress + ) + ``` + """ + chunk_index = 0 + total_bytes_written = 0 + total_bytes = None + + with open(file_path, "wb") as f: + while True: + input = { + "operation": "stream-document", + "user": user, + "document-id": id, + "chunk-index": chunk_index, + "chunk-size": chunk_size, + } + + response = self.request(input) + + content_b64 = response.get("content", "") + chunk_data = base64.b64decode(content_b64) + + if not chunk_data: + break + + f.write(chunk_data) + total_bytes_written += len(chunk_data) + + total_chunks = response.get("total-chunks", 1) + total_bytes = response.get("total-bytes", total_bytes_written) + + if on_progress: + on_progress(total_bytes_written, total_bytes) + + # Check if we've received all chunks + if chunk_index >= total_chunks - 1: + break + + chunk_index += 1 + + return total_bytes_written + diff --git a/trustgraph-base/trustgraph/api/types.py b/trustgraph-base/trustgraph/api/types.py index 3b4e476e..47aa5ae0 100644 --- a/trustgraph-base/trustgraph/api/types.py +++ b/trustgraph-base/trustgraph/api/types.py @@ -64,6 +64,8 @@ class DocumentMetadata: metadata: List of RDF triples providing structured metadata user: User/owner identifier tags: List of tags for categorization + parent_id: Parent document ID for child documents (empty for top-level docs) + document_type: "source" for uploaded documents, "extracted" for derived content """ id : str time : datetime.datetime @@ -73,6 +75,8 @@ class DocumentMetadata: metadata : List[Triple] user : str tags : List[str] + parent_id : str = "" + document_type : str = "source" @dataclasses.dataclass class ProcessingMetadata: diff --git a/trustgraph-base/trustgraph/base/chunking_service.py b/trustgraph-base/trustgraph/base/chunking_service.py index 2e18a933..890ed3f5 100644 --- a/trustgraph-base/trustgraph/base/chunking_service.py +++ b/trustgraph-base/trustgraph/base/chunking_service.py @@ -1,20 +1,37 @@ """ Base chunking service that provides parameter specification functionality -for chunk-size and chunk-overlap parameters +for chunk-size and chunk-overlap parameters, and librarian client for +fetching large document content. """ +import asyncio +import base64 import logging +import uuid + from .flow_processor import FlowProcessor from .parameter_spec import ParameterSpec +from .consumer import Consumer +from .producer import Producer +from .metrics import ConsumerMetrics, ProducerMetrics + +from ..schema import LibrarianRequest, LibrarianResponse +from ..schema import librarian_request_queue, librarian_response_queue # Module logger logger = logging.getLogger(__name__) +default_librarian_request_queue = librarian_request_queue +default_librarian_response_queue = librarian_response_queue + + class ChunkingService(FlowProcessor): """Base service for chunking processors with parameter specification support""" def __init__(self, **params): + id = params.get("id", "chunker") + # Call parent constructor super(ChunkingService, self).__init__(**params) @@ -27,8 +44,122 @@ class ChunkingService(FlowProcessor): ParameterSpec(name="chunk-overlap") ) + # Librarian client for fetching document content + librarian_request_q = params.get( + "librarian_request_queue", default_librarian_request_queue + ) + librarian_response_q = params.get( + "librarian_response_queue", default_librarian_response_queue + ) + + librarian_request_metrics = ProducerMetrics( + processor=id, flow=None, name="librarian-request" + ) + + self.librarian_request_producer = Producer( + backend=self.pubsub, + topic=librarian_request_q, + schema=LibrarianRequest, + metrics=librarian_request_metrics, + ) + + librarian_response_metrics = ConsumerMetrics( + processor=id, flow=None, name="librarian-response" + ) + + self.librarian_response_consumer = Consumer( + taskgroup=self.taskgroup, + backend=self.pubsub, + flow=None, + topic=librarian_response_q, + subscriber=f"{id}-librarian", + schema=LibrarianResponse, + handler=self.on_librarian_response, + metrics=librarian_response_metrics, + ) + + # Pending librarian requests: request_id -> asyncio.Future + self.pending_requests = {} + logger.debug("ChunkingService initialized with parameter specifications") + async def start(self): + await super(ChunkingService, self).start() + await self.librarian_request_producer.start() + await self.librarian_response_consumer.start() + + async def on_librarian_response(self, msg, consumer, flow): + """Handle responses from the librarian service.""" + response = msg.value() + request_id = msg.properties().get("id") + + if request_id and request_id in self.pending_requests: + future = self.pending_requests.pop(request_id) + future.set_result(response) + else: + logger.warning(f"Received unexpected librarian response: {request_id}") + + async def fetch_document_content(self, document_id, user, timeout=120): + """ + Fetch document content from librarian via Pulsar. + """ + request_id = str(uuid.uuid4()) + + request = LibrarianRequest( + operation="get-document-content", + document_id=document_id, + user=user, + ) + + # Create future for response + future = asyncio.get_event_loop().create_future() + self.pending_requests[request_id] = future + + try: + # Send request + await self.librarian_request_producer.send( + request, properties={"id": request_id} + ) + + # Wait for response + response = await asyncio.wait_for(future, timeout=timeout) + + if response.error: + raise RuntimeError( + f"Librarian error: {response.error.type}: {response.error.message}" + ) + + return response.content + + except asyncio.TimeoutError: + self.pending_requests.pop(request_id, None) + raise RuntimeError(f"Timeout fetching document {document_id}") + + async def get_document_text(self, doc): + """ + Get text content from a TextDocument, fetching from librarian if needed. + + Args: + doc: TextDocument with either inline text or document_id + + Returns: + str: The document text content + """ + if doc.document_id and not doc.text: + logger.info(f"Fetching document {doc.document_id} from librarian...") + content = await self.fetch_document_content( + document_id=doc.document_id, + user=doc.metadata.user, + ) + # Content is base64 encoded + if isinstance(content, str): + content = content.encode('utf-8') + text = base64.b64decode(content).decode("utf-8") + logger.info(f"Fetched {len(text)} characters from librarian") + return text + else: + return doc.text.decode("utf-8") + async def chunk_document(self, msg, consumer, flow, default_chunk_size, default_chunk_overlap): """ Extract chunk parameters from flow and return effective values @@ -59,4 +190,16 @@ class ChunkingService(FlowProcessor): @staticmethod def add_args(parser): """Add chunking service arguments to parser""" - FlowProcessor.add_args(parser) \ No newline at end of file + FlowProcessor.add_args(parser) + + parser.add_argument( + '--librarian-request-queue', + default=default_librarian_request_queue, + help=f'Librarian request queue (default: {default_librarian_request_queue})', + ) + + parser.add_argument( + '--librarian-response-queue', + default=default_librarian_response_queue, + help=f'Librarian response queue (default: {default_librarian_response_queue})', + ) \ No newline at end of file diff --git a/trustgraph-base/trustgraph/messaging/translators/library.py b/trustgraph-base/trustgraph/messaging/translators/library.py index fc355dda..333a803a 100644 --- a/trustgraph-base/trustgraph/messaging/translators/library.py +++ b/trustgraph-base/trustgraph/messaging/translators/library.py @@ -44,14 +44,21 @@ class LibraryRequestTranslator(MessageTranslator): return LibrarianRequest( operation=data.get("operation"), - document_id=data.get("document-id"), - processing_id=data.get("processing-id"), + document_id=data.get("document-id", ""), + processing_id=data.get("processing-id", ""), document_metadata=doc_metadata, processing_metadata=proc_metadata, content=content, - user=data.get("user"), - collection=data.get("collection"), - criteria=criteria + user=data.get("user", ""), + collection=data.get("collection", ""), + criteria=criteria, + # Chunked upload fields + total_size=data.get("total-size", 0), + chunk_size=data.get("chunk-size", 0), + upload_id=data.get("upload-id", ""), + chunk_index=data.get("chunk-index", 0), + # List documents filtering + include_children=data.get("include-children", False), ) def from_pulsar(self, obj: LibrarianRequest) -> Dict[str, Any]: @@ -98,25 +105,71 @@ class LibraryResponseTranslator(MessageTranslator): def from_pulsar(self, obj: LibrarianResponse) -> Dict[str, Any]: result = {} - + + if obj.error: + result["error"] = { + "type": obj.error.type, + "message": obj.error.message, + } + if obj.document_metadata: result["document-metadata"] = self.doc_metadata_translator.from_pulsar(obj.document_metadata) - + if obj.content: result["content"] = obj.content.decode("utf-8") if isinstance(obj.content, bytes) else obj.content - + if obj.document_metadatas is not None: result["document-metadatas"] = [ self.doc_metadata_translator.from_pulsar(dm) for dm in obj.document_metadatas ] - + if obj.processing_metadatas is not None: result["processing-metadatas"] = [ self.proc_metadata_translator.from_pulsar(pm) for pm in obj.processing_metadatas ] - + + # Chunked upload response fields + if obj.upload_id: + result["upload-id"] = obj.upload_id + if obj.chunk_size: + result["chunk-size"] = obj.chunk_size + if obj.total_chunks: + result["total-chunks"] = obj.total_chunks + if obj.chunk_index: + result["chunk-index"] = obj.chunk_index + if obj.chunks_received: + result["chunks-received"] = obj.chunks_received + if obj.bytes_received: + result["bytes-received"] = obj.bytes_received + if obj.total_bytes: + result["total-bytes"] = obj.total_bytes + if obj.document_id: + result["document-id"] = obj.document_id + if obj.object_id: + result["object-id"] = obj.object_id + if obj.upload_state: + result["upload-state"] = obj.upload_state + if obj.received_chunks: + result["received-chunks"] = obj.received_chunks + if obj.missing_chunks: + result["missing-chunks"] = obj.missing_chunks + if obj.upload_sessions: + result["upload-sessions"] = [ + { + "upload-id": s.upload_id, + "document-id": s.document_id, + "document-metadata-json": s.document_metadata_json, + "total-size": s.total_size, + "chunk-size": s.chunk_size, + "total-chunks": s.total_chunks, + "chunks-received": s.chunks_received, + "created-at": s.created_at, + } + for s in obj.upload_sessions + ] + return result def from_response_with_completion(self, obj: LibrarianResponse) -> Tuple[Dict[str, Any], bool]: diff --git a/trustgraph-base/trustgraph/messaging/translators/metadata.py b/trustgraph-base/trustgraph/messaging/translators/metadata.py index 006b222c..46a28d0a 100644 --- a/trustgraph-base/trustgraph/messaging/translators/metadata.py +++ b/trustgraph-base/trustgraph/messaging/translators/metadata.py @@ -20,12 +20,14 @@ class DocumentMetadataTranslator(Translator): comments=data.get("comments"), metadata=self.subgraph_translator.to_pulsar(metadata) if metadata is not None else [], user=data.get("user"), - tags=data.get("tags") + tags=data.get("tags"), + parent_id=data.get("parent-id", ""), + document_type=data.get("document-type", "source"), ) def from_pulsar(self, obj: DocumentMetadata) -> Dict[str, Any]: result = {} - + if obj.id: result["id"] = obj.id if obj.time: @@ -42,7 +44,11 @@ class DocumentMetadataTranslator(Translator): result["user"] = obj.user if obj.tags is not None: result["tags"] = obj.tags - + if obj.parent_id: + result["parent-id"] = obj.parent_id + if obj.document_type: + result["document-type"] = obj.document_type + return result diff --git a/trustgraph-base/trustgraph/schema/knowledge/document.py b/trustgraph-base/trustgraph/schema/knowledge/document.py index d8ce97b4..5d18b265 100644 --- a/trustgraph-base/trustgraph/schema/knowledge/document.py +++ b/trustgraph-base/trustgraph/schema/knowledge/document.py @@ -10,6 +10,9 @@ from ..core.topic import topic class Document: metadata: Metadata | None = None data: bytes = b"" + # For large document streaming: if document_id is set, the receiver should + # fetch content from librarian instead of using inline data + document_id: str = "" ############################################################################ @@ -19,6 +22,9 @@ class Document: class TextDocument: metadata: Metadata | None = None text: bytes = b"" + # For large document streaming: if document_id is set, the receiver should + # fetch content from librarian instead of using inline text + document_id: str = "" ############################################################################ diff --git a/trustgraph-base/trustgraph/schema/services/library.py b/trustgraph-base/trustgraph/schema/services/library.py index 391d49e1..4025977a 100644 --- a/trustgraph-base/trustgraph/schema/services/library.py +++ b/trustgraph-base/trustgraph/schema/services/library.py @@ -49,6 +49,36 @@ from ..core.metadata import Metadata # <- (processing_metadata[]) # <- (error) +# begin-upload +# -> (document_metadata, total_size, chunk_size) +# <- (upload_id, chunk_size, total_chunks) +# <- (error) + +# upload-chunk +# -> (upload_id, chunk_index, content) +# <- (upload_id, chunk_index, chunks_received, total_chunks, bytes_received, total_bytes) +# <- (error) + +# complete-upload +# -> (upload_id) +# <- (document_id, object_id) +# <- (error) + +# abort-upload +# -> (upload_id) +# <- () +# <- (error) + +# get-upload-status +# -> (upload_id) +# <- (upload_id, state, chunks_received, missing_chunks, total_chunks, bytes_received, total_bytes) +# <- (error) + +# list-uploads +# -> (user) +# <- (uploads[]) +# <- (error) + @dataclass class DocumentMetadata: id: str = "" @@ -59,6 +89,9 @@ class DocumentMetadata: metadata: list[Triple] = field(default_factory=list) user: str = "" tags: list[str] = field(default_factory=list) + # Child document support + parent_id: str = "" # Empty for top-level docs, set for children + document_type: str = "source" # "source" or "extracted" @dataclass class ProcessingMetadata: @@ -76,11 +109,33 @@ class Criteria: value: str = "" operator: str = "" +@dataclass +class UploadProgress: + """Progress information for chunked uploads.""" + upload_id: str = "" + chunks_received: int = 0 + total_chunks: int = 0 + bytes_received: int = 0 + total_bytes: int = 0 + +@dataclass +class UploadSession: + """Information about an in-progress upload.""" + upload_id: str = "" + document_id: str = "" + document_metadata_json: str = "" # JSON-encoded DocumentMetadata + total_size: int = 0 + chunk_size: int = 0 + total_chunks: int = 0 + chunks_received: int = 0 + created_at: str = "" + @dataclass class LibrarianRequest: # add-document, remove-document, update-document, get-document-metadata, # get-document-content, add-processing, remove-processing, list-documents, - # list-processing + # list-processing, begin-upload, upload-chunk, complete-upload, abort-upload, + # get-upload-status, list-uploads operation: str = "" # add-document, remove-document, update-document, get-document-metadata, @@ -90,16 +145,16 @@ class LibrarianRequest: # add-processing, remove-processing processing_id: str = "" - # add-document, update-document + # add-document, update-document, begin-upload document_metadata: DocumentMetadata | None = None # add-processing processing_metadata: ProcessingMetadata | None = None - # add-document + # add-document, upload-chunk content: bytes = b"" - # list-documents, list-processing + # list-documents, list-processing, list-uploads user: str = "" # list-documents?, list-processing? @@ -108,6 +163,19 @@ class LibrarianRequest: # criteria: list[Criteria] = field(default_factory=list) + # begin-upload + total_size: int = 0 + chunk_size: int = 0 + + # upload-chunk, complete-upload, abort-upload, get-upload-status + upload_id: str = "" + + # upload-chunk, stream-document + chunk_index: int = 0 + + # list-documents - whether to include child documents (default False) + include_children: bool = False + @dataclass class LibrarianResponse: error: Error | None = None @@ -116,6 +184,29 @@ class LibrarianResponse: document_metadatas: list[DocumentMetadata] = field(default_factory=list) processing_metadatas: list[ProcessingMetadata] = field(default_factory=list) + # begin-upload response + upload_id: str = "" + chunk_size: int = 0 + total_chunks: int = 0 + + # upload-chunk response + chunk_index: int = 0 + chunks_received: int = 0 + bytes_received: int = 0 + total_bytes: int = 0 + + # complete-upload response + document_id: str = "" + object_id: str = "" + + # get-upload-status response + upload_state: str = "" # "in-progress", "completed", "expired" + received_chunks: list[int] = field(default_factory=list) + missing_chunks: list[int] = field(default_factory=list) + + # list-uploads response + upload_sessions: list[UploadSession] = field(default_factory=list) + # FIXME: Is this right? Using persistence on librarian so that # message chunking works diff --git a/trustgraph-cli/pyproject.toml b/trustgraph-cli/pyproject.toml index 6a8b4803..530e448e 100644 --- a/trustgraph-cli/pyproject.toml +++ b/trustgraph-cli/pyproject.toml @@ -54,9 +54,7 @@ tg-invoke-prompt = "trustgraph.cli.invoke_prompt:main" tg-invoke-structured-query = "trustgraph.cli.invoke_structured_query:main" tg-load-doc-embeds = "trustgraph.cli.load_doc_embeds:main" tg-load-kg-core = "trustgraph.cli.load_kg_core:main" -tg-load-pdf = "trustgraph.cli.load_pdf:main" tg-load-sample-documents = "trustgraph.cli.load_sample_documents:main" -tg-load-text = "trustgraph.cli.load_text:main" tg-load-turtle = "trustgraph.cli.load_turtle:main" tg-load-knowledge = "trustgraph.cli.load_knowledge:main" tg-load-structured-data = "trustgraph.cli.load_structured_data:main" diff --git a/trustgraph-cli/trustgraph/cli/load_pdf.py b/trustgraph-cli/trustgraph/cli/load_pdf.py deleted file mode 100644 index d305cb4b..00000000 --- a/trustgraph-cli/trustgraph/cli/load_pdf.py +++ /dev/null @@ -1,200 +0,0 @@ -""" -Loads a PDF document into TrustGraph processing by directing to -the pdf-decoder queue. -Consider using tg-add-library-document to load -a document, followed by tg-start-library-processing to initiate processing. -""" - -import hashlib -import argparse -import os -import time -import uuid - -from trustgraph.api import Api -from trustgraph.knowledge import hash, to_uri -from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG -from trustgraph.knowledge import Organization, PublicationEvent -from trustgraph.knowledge import DigitalDocument - -default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') -default_user = 'trustgraph' -default_collection = 'default' - -class Loader: - - def __init__( - self, - url, - flow_id, - user, - collection, - metadata, - ): - - self.api = Api(url).flow().id(flow_id) - - self.user = user - self.collection = collection - self.metadata = metadata - - def load(self, files): - - for file in files: - self.load_file(file) - - def load_file(self, file): - - try: - - path = file - data = open(path, "rb").read() - - # Create a SHA256 hash from the data - id = hash(data) - - id = to_uri(PREF_DOC, id) - - self.metadata.id = id - - self.api.load_document( - document=data, id=id, metadata=self.metadata, - user=self.user, - collection=self.collection, - ) - - print(f"{file}: Loaded successfully.") - - except Exception as e: - print(f"{file}: Failed: {str(e)}", flush=True) - raise e - -def main(): - - parser = argparse.ArgumentParser( - prog='tg-load-pdf', - description=__doc__, - ) - - parser.add_argument( - '-u', '--url', - default=default_url, - help=f'API URL (default: {default_url})', - ) - - parser.add_argument( - '-f', '--flow-id', - default="default", - help=f'Flow ID (default: default)' - ) - - parser.add_argument( - '-U', '--user', - default=default_user, - help=f'User ID (default: {default_user})' - ) - - parser.add_argument( - '-C', '--collection', - default=default_collection, - help=f'Collection ID (default: {default_collection})' - ) - - parser.add_argument( - '--name', help=f'Document name' - ) - - parser.add_argument( - '--description', help=f'Document description' - ) - - parser.add_argument( - '--copyright-notice', help=f'Copyright notice' - ) - - parser.add_argument( - '--copyright-holder', help=f'Copyright holder' - ) - - parser.add_argument( - '--copyright-year', help=f'Copyright year' - ) - - parser.add_argument( - '--license', help=f'Copyright license' - ) - - parser.add_argument( - '--publication-organization', help=f'Publication organization' - ) - - parser.add_argument( - '--publication-description', help=f'Publication description' - ) - - parser.add_argument( - '--publication-date', help=f'Publication date' - ) - - parser.add_argument( - '--document-url', help=f'Document URL' - ) - - parser.add_argument( - '--keyword', nargs='+', help=f'Keyword' - ) - - parser.add_argument( - '--identifier', '--id', help=f'Document ID' - ) - - parser.add_argument( - 'files', nargs='+', - help=f'File to load' - ) - - args = parser.parse_args() - - try: - - document = DigitalDocument( - id, - name=args.name, - description=args.description, - copyright_notice=args.copyright_notice, - copyright_holder=args.copyright_holder, - copyright_year=args.copyright_year, - license=args.license, - url=args.document_url, - keywords=args.keyword, - ) - - if args.publication_organization: - org = Organization( - id=to_uri(PREF_ORG, hash(args.publication_organization)), - name=args.publication_organization, - ) - document.publication = PublicationEvent( - id = to_uri(PREF_PUBEV, str(uuid.uuid4())), - organization=org, - description=args.publication_description, - start_date=args.publication_date, - end_date=args.publication_date, - ) - - p = Loader( - url=args.url, - flow_id = args.flow_id, - user=args.user, - collection=args.collection, - metadata=document, - ) - - p.load(args.files) - - except Exception as e: - - print("Exception:", e, flush=True) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/trustgraph-cli/trustgraph/cli/load_text.py b/trustgraph-cli/trustgraph/cli/load_text.py deleted file mode 100644 index 594d1c04..00000000 --- a/trustgraph-cli/trustgraph/cli/load_text.py +++ /dev/null @@ -1,205 +0,0 @@ -""" -Loads a text document into TrustGraph processing by directing to a text -loader queue. -Consider using tg-add-library-document to load -a document, followed by tg-start-library-processing to initiate processing. -""" - -import pulsar -from pulsar.schema import JsonSchema -import hashlib -import argparse -import os -import time -import uuid - -from trustgraph.api import Api -from trustgraph.knowledge import hash, to_uri -from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG -from trustgraph.knowledge import Organization, PublicationEvent -from trustgraph.knowledge import DigitalDocument - -default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') -default_user = 'trustgraph' -default_collection = 'default' - -class Loader: - - def __init__( - self, - url, - flow_id, - user, - collection, - metadata, - ): - - self.api = Api(url).flow().id(flow_id) - - self.user = user - self.collection = collection - self.metadata = metadata - - def load(self, files): - - for file in files: - self.load_file(file) - - def load_file(self, file): - - try: - - path = file - data = open(path, "rb").read() - - # Create a SHA256 hash from the data - id = hash(data) - - id = to_uri(PREF_DOC, id) - - self.metadata.id = id - - self.api.load_text( - text=data, id=id, metadata=self.metadata, - user=self.user, - collection=self.collection, - ) - - print(f"{file}: Loaded successfully.") - - except Exception as e: - print(f"{file}: Failed: {str(e)}", flush=True) - raise e - -def main(): - - parser = argparse.ArgumentParser( - prog='tg-load-text', - description=__doc__, - ) - - parser.add_argument( - '-u', '--url', - default=default_url, - help=f'API URL (default: {default_url})', - ) - - parser.add_argument( - '-f', '--flow-id', - default="default", - help=f'Flow ID (default: default)' - ) - - parser.add_argument( - '-U', '--user', - default=default_user, - help=f'User ID (default: {default_user})' - ) - - parser.add_argument( - '-C', '--collection', - default=default_collection, - help=f'Collection ID (default: {default_collection})' - ) - - parser.add_argument( - '--name', help=f'Document name' - ) - - parser.add_argument( - '--description', help=f'Document description' - ) - - parser.add_argument( - '--copyright-notice', help=f'Copyright notice' - ) - - parser.add_argument( - '--copyright-holder', help=f'Copyright holder' - ) - - parser.add_argument( - '--copyright-year', help=f'Copyright year' - ) - - parser.add_argument( - '--license', help=f'Copyright license' - ) - - parser.add_argument( - '--publication-organization', help=f'Publication organization' - ) - - parser.add_argument( - '--publication-description', help=f'Publication description' - ) - - parser.add_argument( - '--publication-date', help=f'Publication date' - ) - - parser.add_argument( - '--document-url', help=f'Document URL' - ) - - parser.add_argument( - '--keyword', nargs='+', help=f'Keyword' - ) - - parser.add_argument( - '--identifier', '--id', help=f'Document ID' - ) - - parser.add_argument( - 'files', nargs='+', - help=f'File to load' - ) - - args = parser.parse_args() - - - try: - - document = DigitalDocument( - id, - name=args.name, - description=args.description, - copyright_notice=args.copyright_notice, - copyright_holder=args.copyright_holder, - copyright_year=args.copyright_year, - license=args.license, - url=args.document_url, - keywords=args.keyword, - ) - - if args.publication_organization: - org = Organization( - id=to_uri(PREF_ORG, hash(args.publication_organization)), - name=args.publication_organization, - ) - document.publication = PublicationEvent( - id = to_uri(PREF_PUBEV, str(uuid.uuid4())), - organization=org, - description=args.publication_description, - start_date=args.publication_date, - end_date=args.publication_date, - ) - - p = Loader( - url = args.url, - flow_id = args.flow_id, - user = args.user, - collection = args.collection, - metadata = document, - ) - - p.load(args.files) - - print("All done.") - - except Exception as e: - - print("Exception:", e, flush=True) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/chunking/recursive/chunker.py b/trustgraph-flow/trustgraph/chunking/recursive/chunker.py index bc6d9cb9..529e1ff1 100755 --- a/trustgraph-flow/trustgraph/chunking/recursive/chunker.py +++ b/trustgraph-flow/trustgraph/chunking/recursive/chunker.py @@ -16,6 +16,7 @@ logger = logging.getLogger(__name__) default_ident = "chunker" + class Processor(ChunkingService): def __init__(self, **params): @@ -23,7 +24,7 @@ class Processor(ChunkingService): id = params.get("id", default_ident) chunk_size = params.get("chunk_size", 2000) chunk_overlap = params.get("chunk_overlap", 100) - + super(Processor, self).__init__( **params | { "id": id } ) @@ -69,6 +70,9 @@ class Processor(ChunkingService): v = msg.value() logger.info(f"Chunking document {v.metadata.id}...") + # Get text content (fetches from librarian if needed) + text = await self.get_document_text(v) + # Extract chunk parameters from flow (allows runtime override) chunk_size, chunk_overlap = await self.chunk_document( msg, consumer, flow, @@ -90,9 +94,7 @@ class Processor(ChunkingService): is_separator_regex=False, ) - texts = text_splitter.create_documents( - [v.text.decode("utf-8")] - ) + texts = text_splitter.create_documents([text]) for ix, chunk in enumerate(texts): @@ -133,4 +135,3 @@ class Processor(ChunkingService): def run(): Processor.launch(default_ident, __doc__) - diff --git a/trustgraph-flow/trustgraph/chunking/token/chunker.py b/trustgraph-flow/trustgraph/chunking/token/chunker.py index 876cab07..cfb068f0 100755 --- a/trustgraph-flow/trustgraph/chunking/token/chunker.py +++ b/trustgraph-flow/trustgraph/chunking/token/chunker.py @@ -16,6 +16,7 @@ logger = logging.getLogger(__name__) default_ident = "chunker" + class Processor(ChunkingService): def __init__(self, **params): @@ -68,6 +69,9 @@ class Processor(ChunkingService): v = msg.value() logger.info(f"Chunking document {v.metadata.id}...") + # Get text content (fetches from librarian if needed) + text = await self.get_document_text(v) + # Extract chunk parameters from flow (allows runtime override) chunk_size, chunk_overlap = await self.chunk_document( msg, consumer, flow, @@ -88,9 +92,7 @@ class Processor(ChunkingService): chunk_overlap=chunk_overlap, ) - texts = text_splitter.create_documents( - [v.text.decode("utf-8")] - ) + texts = text_splitter.create_documents([text]) for ix, chunk in enumerate(texts): diff --git a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py index bb641a26..67cf4200 100755 --- a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py +++ b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py @@ -2,21 +2,34 @@ """ Simple decoder, accepts PDF documents on input, outputs pages from the PDF document as text as separate output objects. + +Supports both inline document data and fetching from librarian via Pulsar +for large documents. """ +import asyncio +import os import tempfile import base64 import logging +import uuid from langchain_community.document_loaders import PyPDFLoader from ... schema import Document, TextDocument, Metadata +from ... schema import LibrarianRequest, LibrarianResponse +from ... schema import librarian_request_queue, librarian_response_queue from ... base import FlowProcessor, ConsumerSpec, ProducerSpec +from ... base import Consumer, Producer, ConsumerMetrics, ProducerMetrics # Module logger logger = logging.getLogger(__name__) default_ident = "pdf-decoder" +default_librarian_request_queue = librarian_request_queue +default_librarian_response_queue = librarian_response_queue + + class Processor(FlowProcessor): def __init__(self, **params): @@ -44,8 +57,97 @@ class Processor(FlowProcessor): ) ) + # Librarian client for fetching document content + librarian_request_q = params.get( + "librarian_request_queue", default_librarian_request_queue + ) + librarian_response_q = params.get( + "librarian_response_queue", default_librarian_response_queue + ) + + librarian_request_metrics = ProducerMetrics( + processor = id, flow = None, name = "librarian-request" + ) + + self.librarian_request_producer = Producer( + backend = self.pubsub, + topic = librarian_request_q, + schema = LibrarianRequest, + metrics = librarian_request_metrics, + ) + + librarian_response_metrics = ConsumerMetrics( + processor = id, flow = None, name = "librarian-response" + ) + + self.librarian_response_consumer = Consumer( + taskgroup = self.taskgroup, + backend = self.pubsub, + flow = None, + topic = librarian_response_q, + subscriber = f"{id}-librarian", + schema = LibrarianResponse, + handler = self.on_librarian_response, + metrics = librarian_response_metrics, + ) + + # Pending librarian requests: request_id -> asyncio.Future + self.pending_requests = {} + logger.info("PDF decoder initialized") + async def start(self): + await super(Processor, self).start() + await self.librarian_request_producer.start() + await self.librarian_response_consumer.start() + + async def on_librarian_response(self, msg, consumer, flow): + """Handle responses from the librarian service.""" + response = msg.value() + request_id = msg.properties().get("id") + + if request_id and request_id in self.pending_requests: + future = self.pending_requests.pop(request_id) + future.set_result(response) + else: + logger.warning(f"Received unexpected librarian response: {request_id}") + + async def fetch_document_content(self, document_id, user, timeout=120): + """ + Fetch document content from librarian via Pulsar. + """ + request_id = str(uuid.uuid4()) + + request = LibrarianRequest( + operation="get-document-content", + document_id=document_id, + user=user, + ) + + # Create future for response + future = asyncio.get_event_loop().create_future() + self.pending_requests[request_id] = future + + try: + # Send request + await self.librarian_request_producer.send( + request, properties={"id": request_id} + ) + + # Wait for response + response = await asyncio.wait_for(future, timeout=timeout) + + if response.error: + raise RuntimeError( + f"Librarian error: {response.error.type}: {response.error.message}" + ) + + return response.content + + except asyncio.TimeoutError: + self.pending_requests.pop(request_id, None) + raise RuntimeError(f"Timeout fetching document {document_id}") + async def on_message(self, msg, consumer, flow): logger.debug("PDF message received") @@ -54,26 +156,53 @@ class Processor(FlowProcessor): logger.info(f"Decoding PDF {v.metadata.id}...") - with tempfile.NamedTemporaryFile(delete_on_close=False) as fp: + with tempfile.NamedTemporaryFile(delete_on_close=False, suffix='.pdf') as fp: + temp_path = fp.name - fp.write(base64.b64decode(v.data)) - fp.close() + # Check if we should fetch from librarian or use inline data + if v.document_id: + # Fetch from librarian via Pulsar + logger.info(f"Fetching document {v.document_id} from librarian...") + fp.close() - with open(fp.name, mode='rb') as f: + content = await self.fetch_document_content( + document_id=v.document_id, + user=v.metadata.user, + ) - loader = PyPDFLoader(fp.name) - pages = loader.load() + # Content is base64 encoded + if isinstance(content, str): + content = content.encode('utf-8') + decoded_content = base64.b64decode(content) - for ix, page in enumerate(pages): + with open(temp_path, 'wb') as f: + f.write(decoded_content) - logger.debug(f"Processing page {ix}") + logger.info(f"Fetched {len(decoded_content)} bytes from librarian") + else: + # Use inline data (backward compatibility) + fp.write(base64.b64decode(v.data)) + fp.close() - r = TextDocument( - metadata=v.metadata, - text=page.page_content.encode("utf-8"), - ) + loader = PyPDFLoader(temp_path) + pages = loader.load() - await flow("output").send(r) + for ix, page in enumerate(pages): + + logger.debug(f"Processing page {ix}") + + r = TextDocument( + metadata=v.metadata, + text=page.page_content.encode("utf-8"), + ) + + await flow("output").send(r) + + # Clean up temp file + try: + os.unlink(temp_path) + except OSError: + pass logger.debug("PDF decoding complete") @@ -81,7 +210,18 @@ class Processor(FlowProcessor): def add_args(parser): FlowProcessor.add_args(parser) + parser.add_argument( + '--librarian-request-queue', + default=default_librarian_request_queue, + help=f'Librarian request queue (default: {default_librarian_request_queue})', + ) + + parser.add_argument( + '--librarian-response-queue', + default=default_librarian_response_queue, + help=f'Librarian response queue (default: {default_librarian_response_queue})', + ) + def run(): Processor.launch(default_ident, __doc__) - diff --git a/trustgraph-flow/trustgraph/librarian/blob_store.py b/trustgraph-flow/trustgraph/librarian/blob_store.py index 436e2718..9b138c40 100644 --- a/trustgraph-flow/trustgraph/librarian/blob_store.py +++ b/trustgraph-flow/trustgraph/librarian/blob_store.py @@ -3,9 +3,12 @@ from .. knowledge import hash from .. exceptions import RequestError from minio import Minio +from minio.datatypes import Part import time import io import logging +from typing import Iterator, List, Tuple +from uuid import UUID # Module logger logger = logging.getLogger(__name__) @@ -78,3 +81,141 @@ class BlobStore: return resp.read() + def get_stream(self, object_id, chunk_size: int = 1024 * 1024) -> Iterator[bytes]: + """ + Stream document content in chunks. + + Yields chunks of the document, allowing processing without loading + the entire document into memory. + + Args: + object_id: The UUID of the document object + chunk_size: Size of each chunk in bytes (default 1MB) + + Yields: + Chunks of document content as bytes + """ + resp = self.client.get_object( + bucket_name=self.bucket_name, + object_name="doc/" + str(object_id), + ) + + try: + while True: + chunk = resp.read(chunk_size) + if not chunk: + break + yield chunk + finally: + resp.close() + resp.release_conn() + + logger.debug("Stream complete") + + def create_multipart_upload(self, object_id: UUID, kind: str) -> str: + """ + Initialize a multipart upload. + + Args: + object_id: The UUID for the new object + kind: MIME type of the document + + Returns: + The S3 upload_id for this multipart upload session + """ + object_name = "doc/" + str(object_id) + + # Use minio's internal method to create multipart upload + upload_id = self.client._create_multipart_upload( + bucket_name=self.bucket_name, + object_name=object_name, + headers={"Content-Type": kind}, + ) + + logger.info(f"Created multipart upload {upload_id} for {object_id}") + return upload_id + + def upload_part( + self, + object_id: UUID, + upload_id: str, + part_number: int, + data: bytes + ) -> str: + """ + Upload a single part of a multipart upload. + + Args: + object_id: The UUID of the object being uploaded + upload_id: The S3 upload_id from create_multipart_upload + part_number: Part number (1-indexed, as per S3 spec) + data: The chunk data to upload + + Returns: + The ETag for this part (needed for complete_multipart_upload) + """ + object_name = "doc/" + str(object_id) + + etag = self.client._upload_part( + bucket_name=self.bucket_name, + object_name=object_name, + data=data, + headers={"Content-Length": str(len(data))}, + upload_id=upload_id, + part_number=part_number, + ) + + logger.debug(f"Uploaded part {part_number} for {object_id}, etag={etag}") + return etag + + def complete_multipart_upload( + self, + object_id: UUID, + upload_id: str, + parts: List[Tuple[int, str]] + ) -> None: + """ + Complete a multipart upload, assembling all parts into the final object. + + S3 coalesces the parts server-side - no data transfer through this client. + + Args: + object_id: The UUID of the object + upload_id: The S3 upload_id from create_multipart_upload + parts: List of (part_number, etag) tuples in order + """ + object_name = "doc/" + str(object_id) + + # Convert to Part objects as expected by minio + part_objects = [ + Part(part_number, etag) + for part_number, etag in parts + ] + + self.client._complete_multipart_upload( + bucket_name=self.bucket_name, + object_name=object_name, + upload_id=upload_id, + parts=part_objects, + ) + + logger.info(f"Completed multipart upload for {object_id}") + + def abort_multipart_upload(self, object_id: UUID, upload_id: str) -> None: + """ + Abort a multipart upload, cleaning up any uploaded parts. + + Args: + object_id: The UUID of the object + upload_id: The S3 upload_id from create_multipart_upload + """ + object_name = "doc/" + str(object_id) + + self.client._abort_multipart_upload( + bucket_name=self.bucket_name, + object_name=object_name, + upload_id=upload_id, + ) + + logger.info(f"Aborted multipart upload {upload_id} for {object_id}") + diff --git a/trustgraph-flow/trustgraph/librarian/librarian.py b/trustgraph-flow/trustgraph/librarian/librarian.py index 8835cc73..a0bea9e6 100644 --- a/trustgraph-flow/trustgraph/librarian/librarian.py +++ b/trustgraph-flow/trustgraph/librarian/librarian.py @@ -1,17 +1,24 @@ from .. schema import LibrarianRequest, LibrarianResponse, Error, Triple +from .. schema import UploadSession from .. knowledge import hash from .. exceptions import RequestError from .. tables.library import LibraryTableStore from . blob_store import BlobStore import base64 +import json import logging +import math +import time import uuid # Module logger logger = logging.getLogger(__name__) +# Default chunk size for multipart uploads (5MB - S3 minimum) +DEFAULT_CHUNK_SIZE = 5 * 1024 * 1024 + class Librarian: def __init__( @@ -66,13 +73,7 @@ class Librarian: logger.debug("Add complete") - return LibrarianResponse( - error = None, - document_metadata = None, - content = None, - document_metadatas = None, - processing_metadatas = None, - ) + return LibrarianResponse() async def remove_document(self, request): @@ -84,6 +85,21 @@ class Librarian: ): raise RuntimeError("Document does not exist") + # First, cascade delete all child documents + children = await self.table_store.list_children(request.document_id) + for child in children: + logger.debug(f"Cascade deleting child document {child.id}") + try: + child_object_id = await self.table_store.get_document_object_id( + child.user, + child.id + ) + await self.blob_store.remove(child_object_id) + await self.table_store.remove_document(child.user, child.id) + except Exception as e: + logger.warning(f"Failed to delete child document {child.id}: {e}") + + # Now remove the parent document object_id = await self.table_store.get_document_object_id( request.user, request.document_id @@ -100,13 +116,7 @@ class Librarian: logger.debug("Remove complete") - return LibrarianResponse( - error = None, - document_metadata = None, - content = None, - document_metadatas = None, - processing_metadatas = None, - ) + return LibrarianResponse() async def update_document(self, request): @@ -124,13 +134,7 @@ class Librarian: logger.debug("Update complete") - return LibrarianResponse( - error = None, - document_metadata = None, - content = None, - document_metadatas = None, - processing_metadatas = None, - ) + return LibrarianResponse() async def get_document_metadata(self, request): @@ -147,8 +151,6 @@ class Librarian: error = None, document_metadata = doc, content = None, - document_metadatas = None, - processing_metadatas = None, ) async def get_document_content(self, request): @@ -170,8 +172,6 @@ class Librarian: error = None, document_metadata = None, content = base64.b64encode(content), - document_metadatas = None, - processing_metadatas = None, ) async def add_processing(self, request): @@ -217,13 +217,7 @@ class Librarian: logger.debug("Add complete") - return LibrarianResponse( - error = None, - document_metadata = None, - content = None, - document_metadatas = None, - processing_metadatas = None, - ) + return LibrarianResponse() async def remove_processing(self, request): @@ -243,24 +237,22 @@ class Librarian: logger.debug("Remove complete") - return LibrarianResponse( - error = None, - document_metadata = None, - content = None, - document_metadatas = None, - processing_metadatas = None, - ) + return LibrarianResponse() async def list_documents(self, request): docs = await self.table_store.list_documents(request.user) + # Filter out child documents by default unless include_children is True + include_children = getattr(request, 'include_children', False) + if not include_children: + docs = [ + doc for doc in docs + if not doc.parent_id # Only include top-level documents + ] + return LibrarianResponse( - error = None, - document_metadata = None, - content = None, document_metadatas = docs, - processing_metadatas = None, ) async def list_processing(self, request): @@ -268,10 +260,438 @@ class Librarian: procs = await self.table_store.list_processing(request.user) return LibrarianResponse( - error = None, - document_metadata = None, - content = None, - document_metadatas = None, processing_metadatas = procs, ) + # Chunked upload operations + + async def begin_upload(self, request): + """ + Initialize a chunked upload session. + + Creates an S3 multipart upload and stores session state in Cassandra. + """ + logger.info(f"Beginning chunked upload for document {request.document_metadata.id}") + + if request.document_metadata.kind not in ("text/plain", "application/pdf"): + raise RequestError( + "Invalid document kind: " + request.document_metadata.kind + ) + + if await self.table_store.document_exists( + request.document_metadata.user, + request.document_metadata.id + ): + raise RequestError("Document already exists") + + # Validate sizes + total_size = request.total_size + if total_size <= 0: + raise RequestError("total_size must be positive") + + # Use provided chunk size or default (minimum 5MB for S3) + chunk_size = request.chunk_size if request.chunk_size > 0 else DEFAULT_CHUNK_SIZE + if chunk_size < DEFAULT_CHUNK_SIZE: + chunk_size = DEFAULT_CHUNK_SIZE + + # Calculate total chunks + total_chunks = math.ceil(total_size / chunk_size) + + # Generate IDs + upload_id = str(uuid.uuid4()) + object_id = uuid.uuid4() + + # Create S3 multipart upload + s3_upload_id = self.blob_store.create_multipart_upload( + object_id, request.document_metadata.kind + ) + + # Serialize document metadata for storage + doc_meta_json = json.dumps({ + "id": request.document_metadata.id, + "time": request.document_metadata.time, + "kind": request.document_metadata.kind, + "title": request.document_metadata.title, + "comments": request.document_metadata.comments, + "user": request.document_metadata.user, + "tags": request.document_metadata.tags, + }) + + # Store session in Cassandra + await self.table_store.create_upload_session( + upload_id=upload_id, + user=request.document_metadata.user, + document_id=request.document_metadata.id, + document_metadata=doc_meta_json, + s3_upload_id=s3_upload_id, + object_id=object_id, + total_size=total_size, + chunk_size=chunk_size, + total_chunks=total_chunks, + ) + + logger.info(f"Created upload session {upload_id} with {total_chunks} chunks") + + return LibrarianResponse( + error=None, + upload_id=upload_id, + chunk_size=chunk_size, + total_chunks=total_chunks, + ) + + async def upload_chunk(self, request): + """ + Upload a single chunk of a document. + + Forwards the chunk to S3 and updates session state. + """ + logger.debug(f"Uploading chunk {request.chunk_index} for upload {request.upload_id}") + + # Get session + session = await self.table_store.get_upload_session(request.upload_id) + if session is None: + raise RequestError("Upload session not found or expired") + + # Validate ownership + if session["user"] != request.user: + raise RequestError("Not authorized to upload to this session") + + # Validate chunk index + if request.chunk_index < 0 or request.chunk_index >= session["total_chunks"]: + raise RequestError( + f"Invalid chunk index {request.chunk_index}, " + f"must be 0-{session['total_chunks']-1}" + ) + + # Decode content + content = base64.b64decode(request.content) + + # Upload to S3 (part numbers are 1-indexed in S3) + part_number = request.chunk_index + 1 + etag = self.blob_store.upload_part( + object_id=session["object_id"], + upload_id=session["s3_upload_id"], + part_number=part_number, + data=content, + ) + + # Update session with chunk info + await self.table_store.update_upload_session_chunk( + upload_id=request.upload_id, + chunk_index=request.chunk_index, + etag=etag, + ) + + # Calculate progress + chunks_received = session["chunks_received"] + # Add this chunk if not already present + if request.chunk_index not in chunks_received: + chunks_received[request.chunk_index] = etag + + num_chunks_received = len(chunks_received) + 1 # +1 for this chunk + bytes_received = num_chunks_received * session["chunk_size"] + # Adjust for last chunk potentially being smaller + if bytes_received > session["total_size"]: + bytes_received = session["total_size"] + + logger.debug(f"Chunk {request.chunk_index} uploaded, {num_chunks_received}/{session['total_chunks']} complete") + + return LibrarianResponse( + error=None, + upload_id=request.upload_id, + chunk_index=request.chunk_index, + chunks_received=num_chunks_received, + total_chunks=session["total_chunks"], + bytes_received=bytes_received, + total_bytes=session["total_size"], + ) + + async def complete_upload(self, request): + """ + Finalize a chunked upload and create the document. + + Completes the S3 multipart upload and creates the document metadata. + """ + logger.info(f"Completing upload {request.upload_id}") + + # Get session + session = await self.table_store.get_upload_session(request.upload_id) + if session is None: + raise RequestError("Upload session not found or expired") + + # Validate ownership + if session["user"] != request.user: + raise RequestError("Not authorized to complete this upload") + + # Verify all chunks received + chunks_received = session["chunks_received"] + if len(chunks_received) != session["total_chunks"]: + missing = [ + i for i in range(session["total_chunks"]) + if i not in chunks_received + ] + raise RequestError( + f"Missing chunks: {missing[:10]}{'...' if len(missing) > 10 else ''}" + ) + + # Build parts list for S3 (sorted by part number) + parts = [ + (chunk_index + 1, etag) # S3 part numbers are 1-indexed + for chunk_index, etag in sorted(chunks_received.items()) + ] + + # Complete S3 multipart upload + self.blob_store.complete_multipart_upload( + object_id=session["object_id"], + upload_id=session["s3_upload_id"], + parts=parts, + ) + + # Parse document metadata from session + doc_meta_dict = json.loads(session["document_metadata"]) + + # Create DocumentMetadata object + from .. schema import DocumentMetadata + doc_metadata = DocumentMetadata( + id=doc_meta_dict["id"], + time=doc_meta_dict.get("time", int(time.time())), + kind=doc_meta_dict["kind"], + title=doc_meta_dict.get("title", ""), + comments=doc_meta_dict.get("comments", ""), + user=doc_meta_dict["user"], + tags=doc_meta_dict.get("tags", []), + metadata=[], # Triples not supported in chunked upload yet + ) + + # Add document to table + await self.table_store.add_document(doc_metadata, session["object_id"]) + + # Delete upload session + await self.table_store.delete_upload_session(request.upload_id) + + logger.info(f"Upload {request.upload_id} completed, document {doc_metadata.id} created") + + return LibrarianResponse( + error=None, + document_id=doc_metadata.id, + object_id=str(session["object_id"]), + ) + + async def abort_upload(self, request): + """ + Cancel a chunked upload and clean up resources. + """ + logger.info(f"Aborting upload {request.upload_id}") + + # Get session + session = await self.table_store.get_upload_session(request.upload_id) + if session is None: + raise RequestError("Upload session not found or expired") + + # Validate ownership + if session["user"] != request.user: + raise RequestError("Not authorized to abort this upload") + + # Abort S3 multipart upload + self.blob_store.abort_multipart_upload( + object_id=session["object_id"], + upload_id=session["s3_upload_id"], + ) + + # Delete session from Cassandra + await self.table_store.delete_upload_session(request.upload_id) + + logger.info(f"Upload {request.upload_id} aborted") + + return LibrarianResponse(error=None) + + async def get_upload_status(self, request): + """ + Get the status of an in-progress upload. + """ + logger.debug(f"Getting status for upload {request.upload_id}") + + # Get session + session = await self.table_store.get_upload_session(request.upload_id) + if session is None: + return LibrarianResponse( + error=None, + upload_id=request.upload_id, + upload_state="expired", + ) + + # Validate ownership + if session["user"] != request.user: + raise RequestError("Not authorized to view this upload") + + chunks_received = session["chunks_received"] + received_list = sorted(chunks_received.keys()) + missing_list = [ + i for i in range(session["total_chunks"]) + if i not in chunks_received + ] + + bytes_received = len(chunks_received) * session["chunk_size"] + if bytes_received > session["total_size"]: + bytes_received = session["total_size"] + + return LibrarianResponse( + error=None, + upload_id=request.upload_id, + upload_state="in-progress", + received_chunks=received_list, + missing_chunks=missing_list, + chunks_received=len(chunks_received), + total_chunks=session["total_chunks"], + bytes_received=bytes_received, + total_bytes=session["total_size"], + ) + + async def list_uploads(self, request): + """ + List all in-progress uploads for a user. + """ + logger.debug(f"Listing uploads for user {request.user}") + + sessions = await self.table_store.list_upload_sessions(request.user) + + upload_sessions = [ + UploadSession( + upload_id=s["upload_id"], + document_id=s["document_id"], + document_metadata_json=s.get("document_metadata", ""), + total_size=s["total_size"], + chunk_size=s["chunk_size"], + total_chunks=s["total_chunks"], + chunks_received=s["chunks_received"], + created_at=str(s.get("created_at", "")), + ) + for s in sessions + ] + + return LibrarianResponse( + error=None, + upload_sessions=upload_sessions, + ) + + # Child document operations + + async def add_child_document(self, request): + """ + Add a child document linked to a parent document. + + Child documents are typically extracted content (e.g., pages from a PDF). + They have a parent_id pointing to the source document and document_type + set to "extracted". + """ + logger.info(f"Adding child document {request.document_metadata.id} " + f"for parent {request.document_metadata.parent_id}") + + if not request.document_metadata.parent_id: + raise RequestError("parent_id is required for child documents") + + # Verify parent exists + if not await self.table_store.document_exists( + request.document_metadata.user, + request.document_metadata.parent_id + ): + raise RequestError( + f"Parent document {request.document_metadata.parent_id} does not exist" + ) + + if await self.table_store.document_exists( + request.document_metadata.user, + request.document_metadata.id + ): + raise RequestError("Document already exists") + + # Ensure document_type is set to "extracted" + request.document_metadata.document_type = "extracted" + + # Create object ID for blob + object_id = uuid.uuid4() + + logger.debug("Adding blob...") + + await self.blob_store.add( + object_id, base64.b64decode(request.content), + request.document_metadata.kind + ) + + logger.debug("Adding to table...") + + await self.table_store.add_document( + request.document_metadata, object_id + ) + + logger.debug("Add child document complete") + + return LibrarianResponse( + error=None, + document_id=request.document_metadata.id, + ) + + async def list_children(self, request): + """ + List all child documents for a given parent document. + """ + logger.debug(f"Listing children for parent {request.document_id}") + + children = await self.table_store.list_children(request.document_id) + + return LibrarianResponse( + error=None, + document_metadatas=children, + ) + + async def stream_document(self, request): + """ + Stream document content in chunks. + + This operation returns document content in smaller chunks, allowing + memory-efficient processing of large documents. The response includes + chunk information for reassembly. + + Note: This operation returns a single chunk at a time. Clients should + call repeatedly with increasing chunk_index until all chunks are received. + """ + logger.debug(f"Streaming document {request.document_id}, chunk {request.chunk_index}") + + object_id = await self.table_store.get_document_object_id( + request.user, + request.document_id + ) + + # Default chunk size of 1MB + chunk_size = request.chunk_size if request.chunk_size > 0 else 1024 * 1024 + + # Get the full content and slice out the requested chunk + # Note: This is a simple implementation. For true streaming, we'd need + # range requests on the object storage. + content = await self.blob_store.get(object_id) + total_size = len(content) + total_chunks = math.ceil(total_size / chunk_size) + + if request.chunk_index >= total_chunks: + raise RequestError( + f"Invalid chunk index {request.chunk_index}, " + f"document has {total_chunks} chunks" + ) + + start = request.chunk_index * chunk_size + end = min(start + chunk_size, total_size) + chunk_content = content[start:end] + + logger.debug(f"Returning chunk {request.chunk_index}/{total_chunks}, " + f"bytes {start}-{end} of {total_size}") + + return LibrarianResponse( + error=None, + content=base64.b64encode(chunk_content), + chunk_index=request.chunk_index, + chunks_received=1, # Using as "current chunk" indicator + total_chunks=total_chunks, + bytes_received=end, + total_bytes=total_size, + ) + diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index 7c1e428c..d374adc2 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -271,6 +271,9 @@ class Processor(AsyncProcessor): pass + # Threshold for sending document_id instead of inline content (2MB) + STREAMING_THRESHOLD = 2 * 1024 * 1024 + async def load_document(self, document, processing, content): logger.debug("Ready for document processing...") @@ -292,26 +295,57 @@ class Processor(AsyncProcessor): q = flow["interfaces"][kind] if kind == "text-load": - doc = TextDocument( - metadata = Metadata( - id = document.id, - metadata = document.metadata, - user = processing.user, - collection = processing.collection - ), - text = content, - ) + # For large text documents, send document_id for streaming retrieval + if len(content) >= self.STREAMING_THRESHOLD: + logger.info(f"Text document {document.id} is large ({len(content)} bytes), " + f"sending document_id for streaming retrieval") + doc = TextDocument( + metadata = Metadata( + id = document.id, + metadata = document.metadata, + user = processing.user, + collection = processing.collection + ), + document_id = document.id, + text = b"", # Empty, receiver will fetch via librarian + ) + else: + doc = TextDocument( + metadata = Metadata( + id = document.id, + metadata = document.metadata, + user = processing.user, + collection = processing.collection + ), + text = content, + ) schema = TextDocument else: - doc = Document( - metadata = Metadata( - id = document.id, - metadata = document.metadata, - user = processing.user, - collection = processing.collection - ), - data = base64.b64encode(content).decode("utf-8") - ) + # For large PDF documents, send document_id for streaming retrieval + # instead of embedding the entire content in the message + if len(content) >= self.STREAMING_THRESHOLD: + logger.info(f"Document {document.id} is large ({len(content)} bytes), " + f"sending document_id for streaming retrieval") + doc = Document( + metadata = Metadata( + id = document.id, + metadata = document.metadata, + user = processing.user, + collection = processing.collection + ), + document_id = document.id, + data = b"", # Empty data, receiver will fetch via API + ) + else: + doc = Document( + metadata = Metadata( + id = document.id, + metadata = document.metadata, + user = processing.user, + collection = processing.collection + ), + data = base64.b64encode(content).decode("utf-8") + ) schema = Document logger.debug(f"Submitting to queue {q}...") @@ -361,6 +395,17 @@ class Processor(AsyncProcessor): "remove-processing": self.librarian.remove_processing, "list-documents": self.librarian.list_documents, "list-processing": self.librarian.list_processing, + # Chunked upload operations + "begin-upload": self.librarian.begin_upload, + "upload-chunk": self.librarian.upload_chunk, + "complete-upload": self.librarian.complete_upload, + "abort-upload": self.librarian.abort_upload, + "get-upload-status": self.librarian.get_upload_status, + "list-uploads": self.librarian.list_uploads, + # Child document and streaming operations + "add-child-document": self.librarian.add_child_document, + "list-children": self.librarian.list_children, + "stream-document": self.librarian.stream_document, } if v.operation not in impls: diff --git a/trustgraph-flow/trustgraph/tables/library.py b/trustgraph-flow/trustgraph/tables/library.py index 8bbe2bad..11dd9022 100644 --- a/trustgraph-flow/trustgraph/tables/library.py +++ b/trustgraph-flow/trustgraph/tables/library.py @@ -112,6 +112,34 @@ class LibraryTableStore: ON document (object_id) """); + # Add parent_id and document_type columns for child document support + logger.debug("document table parent_id column...") + + try: + self.cassandra.execute(""" + ALTER TABLE document ADD parent_id text + """); + except Exception as e: + # Column may already exist + if "already exists" not in str(e).lower() and "Invalid column name" not in str(e): + logger.debug(f"parent_id column may already exist: {e}") + + try: + self.cassandra.execute(""" + ALTER TABLE document ADD document_type text + """); + except Exception as e: + # Column may already exist + if "already exists" not in str(e).lower() and "Invalid column name" not in str(e): + logger.debug(f"document_type column may already exist: {e}") + + logger.debug("document parent index...") + + self.cassandra.execute(""" + CREATE INDEX IF NOT EXISTS document_parent + ON document (parent_id) + """); + logger.debug("processing table...") self.cassandra.execute(""" @@ -127,6 +155,32 @@ class LibraryTableStore: ); """); + logger.debug("upload_session table...") + + self.cassandra.execute(""" + CREATE TABLE IF NOT EXISTS upload_session ( + upload_id text PRIMARY KEY, + user text, + document_id text, + document_metadata text, + s3_upload_id text, + object_id uuid, + total_size bigint, + chunk_size int, + total_chunks int, + chunks_received map, + created_at timestamp, + updated_at timestamp + ) WITH default_time_to_live = 86400; + """); + + logger.debug("upload_session user index...") + + self.cassandra.execute(""" + CREATE INDEX IF NOT EXISTS upload_session_user + ON upload_session (user) + """); + logger.info("Cassandra schema OK.") def prepare_statements(self): @@ -136,9 +190,10 @@ class LibraryTableStore: ( id, user, time, kind, title, comments, - metadata, tags, object_id + metadata, tags, object_id, + parent_id, document_type ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """) self.update_document_stmt = self.cassandra.prepare(""" @@ -149,7 +204,8 @@ class LibraryTableStore: """) self.get_document_stmt = self.cassandra.prepare(""" - SELECT time, kind, title, comments, metadata, tags, object_id + SELECT time, kind, title, comments, metadata, tags, object_id, + parent_id, document_type FROM document WHERE user = ? AND id = ? """) @@ -168,14 +224,16 @@ class LibraryTableStore: self.list_document_stmt = self.cassandra.prepare(""" SELECT - id, time, kind, title, comments, metadata, tags, object_id + id, time, kind, title, comments, metadata, tags, object_id, + parent_id, document_type FROM document WHERE user = ? """) self.list_document_by_tag_stmt = self.cassandra.prepare(""" SELECT - id, time, kind, title, comments, metadata, tags, object_id + id, time, kind, title, comments, metadata, tags, object_id, + parent_id, document_type FROM document WHERE user = ? AND tags CONTAINS ? ALLOW FILTERING @@ -210,6 +268,57 @@ class LibraryTableStore: WHERE user = ? """) + # Upload session prepared statements + self.insert_upload_session_stmt = self.cassandra.prepare(""" + INSERT INTO upload_session + ( + upload_id, user, document_id, document_metadata, + s3_upload_id, object_id, total_size, chunk_size, + total_chunks, chunks_received, created_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """) + + self.get_upload_session_stmt = self.cassandra.prepare(""" + SELECT + upload_id, user, document_id, document_metadata, + s3_upload_id, object_id, total_size, chunk_size, + total_chunks, chunks_received, created_at, updated_at + FROM upload_session + WHERE upload_id = ? + """) + + self.update_upload_session_chunk_stmt = self.cassandra.prepare(""" + UPDATE upload_session + SET chunks_received = chunks_received + ?, + updated_at = ? + WHERE upload_id = ? + """) + + self.delete_upload_session_stmt = self.cassandra.prepare(""" + DELETE FROM upload_session + WHERE upload_id = ? + """) + + self.list_upload_sessions_stmt = self.cassandra.prepare(""" + SELECT + upload_id, document_id, document_metadata, + total_size, chunk_size, total_chunks, + chunks_received, created_at, updated_at + FROM upload_session + WHERE user = ? + """) + + # Child document queries + self.list_children_stmt = self.cassandra.prepare(""" + SELECT + id, user, time, kind, title, comments, metadata, tags, + object_id, parent_id, document_type + FROM document + WHERE parent_id = ? + ALLOW FILTERING + """) + async def document_exists(self, user, id): resp = self.cassandra.execute( @@ -236,6 +345,10 @@ class LibraryTableStore: for v in document.metadata ] + # Get parent_id and document_type from document, defaulting if not set + parent_id = getattr(document, 'parent_id', '') or '' + document_type = getattr(document, 'document_type', 'source') or 'source' + while True: try: @@ -245,7 +358,8 @@ class LibraryTableStore: ( document.id, document.user, int(document.time * 1000), document.kind, document.title, document.comments, - metadata, document.tags, object_id + metadata, document.tags, object_id, + parent_id, document_type ) ) @@ -349,9 +463,58 @@ class LibraryTableStore: p=tuple_to_term(m[2], m[3]), o=tuple_to_term(m[4], m[5]) ) - for m in row[5] + for m in (row[5] or []) ], tags = row[6] if row[6] else [], + parent_id = row[8] if row[8] else "", + document_type = row[9] if row[9] else "source", + ) + for row in resp + ] + + logger.debug("Done") + + return lst + + async def list_children(self, parent_id): + """List all child documents for a given parent document ID.""" + + logger.debug(f"List children for parent {parent_id}") + + while True: + + try: + + resp = self.cassandra.execute( + self.list_children_stmt, + (parent_id,) + ) + + break + + except Exception as e: + logger.error("Exception occurred", exc_info=True) + raise e + + lst = [ + DocumentMetadata( + id = row[0], + user = row[1], + time = int(time.mktime(row[2].timetuple())), + kind = row[3], + title = row[4], + comments = row[5], + metadata = [ + Triple( + s=tuple_to_term(m[0], m[1]), + p=tuple_to_term(m[2], m[3]), + o=tuple_to_term(m[4], m[5]) + ) + for m in (row[6] or []) + ], + tags = row[7] if row[7] else [], + parent_id = row[9] if row[9] else "", + document_type = row[10] if row[10] else "source", ) for row in resp ] @@ -394,9 +557,11 @@ class LibraryTableStore: p=tuple_to_term(m[2], m[3]), o=tuple_to_term(m[4], m[5]) ) - for m in row[4] + for m in (row[4] or []) ], tags = row[5] if row[5] else [], + parent_id = row[7] if row[7] else "", + document_type = row[8] if row[8] else "source", ) logger.debug("Done") @@ -532,3 +697,152 @@ class LibraryTableStore: logger.debug("Done") return lst + + # Upload session methods + + async def create_upload_session( + self, + upload_id, + user, + document_id, + document_metadata, + s3_upload_id, + object_id, + total_size, + chunk_size, + total_chunks, + ): + """Create a new upload session for chunked upload.""" + + logger.info(f"Creating upload session {upload_id}") + + now = int(time.time() * 1000) + + while True: + try: + self.cassandra.execute( + self.insert_upload_session_stmt, + ( + upload_id, user, document_id, document_metadata, + s3_upload_id, object_id, total_size, chunk_size, + total_chunks, {}, now, now + ) + ) + break + except Exception as e: + logger.error("Exception occurred", exc_info=True) + raise e + + logger.debug("Upload session created") + + async def get_upload_session(self, upload_id): + """Get an upload session by ID.""" + + logger.debug(f"Get upload session {upload_id}") + + while True: + try: + resp = self.cassandra.execute( + self.get_upload_session_stmt, + (upload_id,) + ) + break + except Exception as e: + logger.error("Exception occurred", exc_info=True) + raise e + + for row in resp: + session = { + "upload_id": row[0], + "user": row[1], + "document_id": row[2], + "document_metadata": row[3], + "s3_upload_id": row[4], + "object_id": row[5], + "total_size": row[6], + "chunk_size": row[7], + "total_chunks": row[8], + "chunks_received": row[9] if row[9] else {}, + "created_at": row[10], + "updated_at": row[11], + } + logger.debug("Done") + return session + + return None + + async def update_upload_session_chunk(self, upload_id, chunk_index, etag): + """Record a successfully uploaded chunk.""" + + logger.debug(f"Update upload session {upload_id} chunk {chunk_index}") + + now = int(time.time() * 1000) + + while True: + try: + self.cassandra.execute( + self.update_upload_session_chunk_stmt, + ( + {chunk_index: etag}, + now, + upload_id + ) + ) + break + except Exception as e: + logger.error("Exception occurred", exc_info=True) + raise e + + logger.debug("Chunk recorded") + + async def delete_upload_session(self, upload_id): + """Delete an upload session.""" + + logger.info(f"Deleting upload session {upload_id}") + + while True: + try: + self.cassandra.execute( + self.delete_upload_session_stmt, + (upload_id,) + ) + break + except Exception as e: + logger.error("Exception occurred", exc_info=True) + raise e + + logger.debug("Upload session deleted") + + async def list_upload_sessions(self, user): + """List all upload sessions for a user.""" + + logger.debug(f"List upload sessions for {user}") + + while True: + try: + resp = self.cassandra.execute( + self.list_upload_sessions_stmt, + (user,) + ) + break + except Exception as e: + logger.error("Exception occurred", exc_info=True) + raise e + + sessions = [] + for row in resp: + chunks_received = row[6] if row[6] else {} + sessions.append({ + "upload_id": row[0], + "document_id": row[1], + "document_metadata": row[2], + "total_size": row[3], + "chunk_size": row[4], + "total_chunks": row[5], + "chunks_received": len(chunks_received), + "created_at": row[7], + "updated_at": row[8], + }) + + logger.debug("Done") + return sessions