diff --git a/docs/tech-specs/extraction-time-provenance.md b/docs/tech-specs/extraction-time-provenance.md index 2f5761dd..72c1c971 100644 --- a/docs/tech-specs/extraction-time-provenance.md +++ b/docs/tech-specs/extraction-time-provenance.md @@ -1,46 +1,616 @@ # Extraction-Time Provenance: Source Layer -## Status - -Notes - Not yet started - ## Overview This document captures notes on extraction-time provenance for future specification work. Extraction-time provenance records the "source layer" - where data came from originally, how it was extracted and transformed. This is separate from query-time provenance (see `query-time-provenance.md`) which records agent reasoning. -## Current State +## Problem Statement -Source metadata is already partially stored in the knowledge graph (~40% solved): -- Documents have source URLs, timestamps -- Some extraction metadata exists +### Current Implementation -## Scope +Provenance currently works as follows: +- Document metadata is stored as RDF triples in the knowledge graph +- A document ID ties metadata to the document, so the document appears as a node in the graph +- When edges (relationships/facts) are extracted from documents, a `subjectOf` relationship links the extracted edge back to the source document -Extraction-time provenance should capture: +### Problems with Current Approach -### Source Layer (Origin) -- URL / file path -- Retrieval timestamp -- Funding sources -- Authorship / authority -- Document metadata (title, date, version) +1. **Repetitive metadata loading:** Document metadata is bundled and loaded repeatedly with every batch of triples extracted from that document. This is wasteful and redundant - the same metadata travels as cargo with every extraction output. -### Transformation Layer (Extraction) -- Extraction tool used (e.g., PDF parser, table extractor) -- Extraction method / version -- Confidence scores -- Raw-to-structured mapping -- Parent-child relationships (PDF → table → row → fact) +2. 
**Shallow provenance:** The current `subjectOf` relationship only links facts directly to the top-level document. There is no visibility into the transformation chain - which page the fact came from, which chunk, what extraction method was used. -## Key Questions for Future Spec +### Desired State -1. What metadata is already captured today? -2. What gaps exist? -3. How to structure the extraction DAG? -4. How does query-time provenance link to extraction-time nodes? -5. Storage format - RDF triples? Separate schema? +1. **Load metadata once:** Document metadata should be loaded once and attached to the top-level document node, not repeated with every triple batch. + +2. **Rich provenance DAG:** Capture the full transformation chain from source document through all intermediate artifacts down to extracted facts. For example, a PDF document transformation: + + ``` + PDF file (source document with metadata) + → Page 1 (decoded text) + → Chunk 1 + → Extracted edge/fact (via subjectOf) + → Extracted edge/fact + → Chunk 2 + → Extracted edge/fact + → Page 2 + → Chunk 3 + → ... + ``` + +3. **Unified storage:** The provenance DAG is stored in the same knowledge graph as the extracted knowledge. This allows provenance to be queried the same way as knowledge - following edges back up the chain from any fact to its exact source location. + +4. **Stable IDs:** Each intermediate artifact (page, chunk) has a stable ID as a node in the graph. + +5. **Parent-child linking:** Derived documents are linked to their parents all the way up to the top-level source document using consistent relationship types. + +6. **Precise fact attribution:** The `subjectOf` relationship on extracted edges points to the immediate parent (chunk), not the top-level document. Full provenance is recovered by traversing up the DAG. + +## Use Cases + +### UC1: Source Attribution in GraphRAG Responses + +**Scenario:** A user runs a GraphRAG query and receives a response from the agent. + +**Flow:** +1. 
User submits a query to the GraphRAG agent +2. Agent retrieves relevant facts from the knowledge graph to formulate a response +3. Per the query-time provenance spec, the agent reports which facts contributed to the response +4. Each fact links to its source chunk via the provenance DAG +5. Chunks link to pages, pages link to source documents + +**UX Outcome:** The interface displays the LLM response alongside source attribution. The user can: +- See which facts supported the response +- Drill down from facts → chunks → pages → documents +- Peruse the original source documents to verify claims +- Understand exactly where in a document (which page, which section) a fact originated + +**Value:** Users can verify AI-generated responses against primary sources, building trust and enabling fact-checking. + +### UC2: Debugging Extraction Quality + +A fact looks wrong. Trace back through chunk → page → document to see the original text. Was it a bad extraction, or was the source itself wrong? + +### UC3: Incremental Re-extraction + +Source document gets updated. Which chunks/facts were derived from it? Invalidate and regenerate just those, rather than re-processing everything. + +### UC4: Data Deletion / Right to be Forgotten + +A source document must be removed (GDPR, legal, etc.). Traverse the DAG to find and remove all derived facts. + +### UC5: Conflict Resolution + +Two facts contradict each other. Trace both back to their sources to understand why and decide which to trust (more authoritative source, more recent, etc.). + +### UC6: Source Authority Weighting + +Some sources are more authoritative than others. Facts can be weighted or filtered based on the authority/quality of their origin documents. + +### UC7: Extraction Pipeline Comparison + +Compare outputs from different extraction methods/versions. Which extractor produced better facts from the same source? 
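
Most of these use cases (UC1 drill-down, UC2 debugging, UC4 deletion, UC5 conflict resolution) reduce to the same primitive: walking `prov:wasDerivedFrom` edges upward from a fact until the root document. A minimal sketch of that traversal over in-memory triples - the triple data and the `provenance_chain` helper are illustrative only, not the TrustGraph API:

```python
# Sketch: follow prov:wasDerivedFrom edges from an extracted fact
# back to its source document. Hypothetical IDs match the examples
# used later in this spec (stmt/chunk/page/doc).

PROV_DERIVED = "prov:wasDerivedFrom"

# Illustrative provenance edges as (subject, predicate, object) triples
triples = [
    ("stmt:001", PROV_DERIVED, "chunk:123-1-1"),
    ("chunk:123-1-1", PROV_DERIVED, "page:123-1"),
    ("page:123-1", PROV_DERIVED, "doc:123"),
]

def provenance_chain(node, triples):
    """Return the path from a node up to its root source document.

    Assumes each artifact has a single parent (true for the
    page/chunk hierarchy described in this spec).
    """
    parent = {s: o for s, p, o in triples if p == PROV_DERIVED}
    chain = [node]
    while node in parent:
        node = parent[node]
        chain.append(node)
    return chain

print(provenance_chain("stmt:001", triples))
# -> ['stmt:001', 'chunk:123-1-1', 'page:123-1', 'doc:123']
```

Inverting the same parent index (document → descendants) gives the downward traversal needed for UC3 (invalidate derived chunks/facts) and UC4 (cascade deletion).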
+ +## Integration Points + +### Librarian + +The librarian component already provides document storage with unique document IDs. The provenance system integrates with this existing infrastructure. + +#### Existing Capabilities (already implemented) + +**Parent-Child Document Linking:** +- `parent_id` field in `DocumentMetadata` - links child to parent document +- `document_type` field - values: `"source"` (original) or `"extracted"` (derived) +- `add-child-document` API - creates child document with automatic `document_type = "extracted"` +- `list-children` API - retrieves all children of a parent document +- Cascade deletion - removing a parent automatically deletes all child documents + +**Document Identification:** +- Document IDs are client-specified (not auto-generated) +- Documents keyed by composite `(user, document_id)` in Cassandra +- Object IDs (UUIDs) generated internally for blob storage + +**Metadata Support:** +- `metadata: list[Triple]` field - RDF triples for structured metadata +- `title`, `comments`, `tags` - basic document metadata +- `time` - timestamp, `kind` - MIME type + +**Storage Architecture:** +- Metadata stored in Cassandra (`librarian` keyspace, `document` table) +- Content stored in MinIO/S3 blob storage (`library` bucket) +- Smart content delivery: documents < 2MB embedded, larger documents streamed + +#### Key Files + +- `trustgraph-flow/trustgraph/librarian/librarian.py` - Core librarian operations +- `trustgraph-flow/trustgraph/librarian/service.py` - Service processor, document loading +- `trustgraph-flow/trustgraph/tables/library.py` - Cassandra table store +- `trustgraph-base/trustgraph/schema/services/library.py` - Schema definitions + +#### Gaps to Address + +The librarian has the building blocks but currently: +1. Parent-child linking is one level deep - no multi-level DAG traversal helpers +2. No standard relationship type vocabulary (e.g., `derivedFrom`, `extractedFrom`) +3. 
Provenance metadata (extraction method, confidence, chunk position) not standardized +4. No query API to traverse the full provenance chain from a fact back to source + +## End-to-End Flow Design + +Each processor in the pipeline follows a consistent pattern: +- Receive document ID from upstream +- Fetch content from librarian +- Produce child artifacts +- For each child: save to librarian, emit edge to graph, forward ID downstream + +### Processing Flows + +There are two flows depending on document type: + +#### PDF Document Flow + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ Librarian (initiate processing) │ +│ 1. Emit root document metadata to knowledge graph (once) │ +│ 2. Send root document ID to PDF extractor │ +└─────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────┐ +│ PDF Extractor (per page) │ +│ 1. Fetch PDF content from librarian using document ID │ +│ 2. Extract pages as text │ +│ 3. For each page: │ +│ a. Save page as child document in librarian (parent = root doc) │ +│ b. Emit parent-child edge to knowledge graph │ +│ c. Send page document ID to chunker │ +└─────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────┐ +│ Chunker (per chunk) │ +│ 1. Fetch page content from librarian using document ID │ +│ 2. Split text into chunks │ +│ 3. For each chunk: │ +│ a. Save chunk as child document in librarian (parent = page) │ +│ b. Emit parent-child edge to knowledge graph │ +│ c. Send chunk document ID + chunk content to next processor │ +└─────────────────────────────────────────────────────────────────────────┘ + │ + ▼ + ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ + Post-chunker optimization: messages carry both + chunk ID (for provenance) and content (to avoid + librarian round-trip). 
Chunks are small (2-4KB). + ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────┐ +│ Knowledge Extractor (per chunk) │ +│ 1. Receive chunk ID + content directly (no librarian fetch needed) │ +│ 2. Extract facts/triples and embeddings from chunk content │ +│ 3. For each triple: │ +│ a. Emit triple to knowledge graph │ +│ b. Emit reified edge linking triple → chunk ID (edge pointing │ +│ to edge - first use of reification support) │ +│ 4. For each embedding: │ +│ a. Emit embedding with its entity ID │ +│ b. Link entity ID → chunk ID in knowledge graph │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +#### Text Document Flow + +Text documents skip the PDF extractor and go directly to the chunker: + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ Librarian (initiate processing) │ +│ 1. Emit root document metadata to knowledge graph (once) │ +│ 2. Send root document ID directly to chunker (skip PDF extractor) │ +└─────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────┐ +│ Chunker (per chunk) │ +│ 1. Fetch text content from librarian using document ID │ +│ 2. Split text into chunks │ +│ 3. For each chunk: │ +│ a. Save chunk as child document in librarian (parent = root doc) │ +│ b. Emit parent-child edge to knowledge graph │ +│ c. 
Send chunk document ID + chunk content to next processor │ +└─────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────┐ +│ Knowledge Extractor │ +│ (same as PDF flow) │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +The resulting DAG is one level shorter: + +``` +PDF: Document → Pages → Chunks → Triples/Embeddings +Text: Document → Chunks → Triples/Embeddings +``` + +The design accommodates both because the chunker treats its input generically - it uses whatever document ID it receives as the parent, regardless of whether that's a source document or a page. + +### Metadata Schema (PROV-O) + +Provenance metadata uses the W3C PROV-O ontology. This provides a standard vocabulary and enables future signing/authentication of extraction outputs. + +#### PROV-O Core Concepts + +| PROV-O Type | TrustGraph Usage | +|-------------|------------------| +| `prov:Entity` | Document, Page, Chunk, Triple, Embedding | +| `prov:Activity` | Instances of extraction operations | +| `prov:Agent` | TG components (PDF extractor, chunker, etc.) with versions | + +#### PROV-O Relationships + +| Predicate | Meaning | Example | +|-----------|---------|---------| +| `prov:wasDerivedFrom` | Entity derived from another entity | Page wasDerivedFrom Document | +| `prov:wasGeneratedBy` | Entity generated by an activity | Page wasGeneratedBy PDFExtractionActivity | +| `prov:used` | Activity used an entity as input | PDFExtractionActivity used Document | +| `prov:wasAssociatedWith` | Activity performed by an agent | PDFExtractionActivity wasAssociatedWith tg:PDFExtractor | + +#### Metadata at Each Level + +**Source Document (emitted by Librarian):** +``` +doc:123 a prov:Entity . +doc:123 dc:title "Research Paper" . +doc:123 dc:source . +doc:123 dc:date "2024-01-15" . +doc:123 dc:creator "Author Name" . +doc:123 tg:pageCount 42 . 
+doc:123 tg:mimeType "application/pdf" . +``` + +**Page (emitted by PDF Extractor):** +``` +page:123-1 a prov:Entity . +page:123-1 prov:wasDerivedFrom doc:123 . +page:123-1 prov:wasGeneratedBy activity:pdf-extract-456 . +page:123-1 tg:pageNumber 1 . + +activity:pdf-extract-456 a prov:Activity . +activity:pdf-extract-456 prov:used doc:123 . +activity:pdf-extract-456 prov:wasAssociatedWith tg:PDFExtractor . +activity:pdf-extract-456 tg:componentVersion "1.2.3" . +activity:pdf-extract-456 prov:startedAtTime "2024-01-15T10:30:00Z" . +``` + +**Chunk (emitted by Chunker):** +``` +chunk:123-1-1 a prov:Entity . +chunk:123-1-1 prov:wasDerivedFrom page:123-1 . +chunk:123-1-1 prov:wasGeneratedBy activity:chunk-789 . +chunk:123-1-1 tg:chunkIndex 1 . +chunk:123-1-1 tg:charOffset 0 . +chunk:123-1-1 tg:charLength 2048 . + +activity:chunk-789 a prov:Activity . +activity:chunk-789 prov:used page:123-1 . +activity:chunk-789 prov:wasAssociatedWith tg:Chunker . +activity:chunk-789 tg:componentVersion "1.0.0" . +activity:chunk-789 tg:chunkSize 2048 . +activity:chunk-789 tg:chunkOverlap 200 . +``` + +**Triple (emitted by Knowledge Extractor):** +``` +# The extracted triple (edge) +entity:JohnSmith rel:worksAt entity:AcmeCorp . + +# Statement object pointing at the edge (RDF 1.2 reification) +stmt:001 tg:reifies <> . +stmt:001 prov:wasDerivedFrom chunk:123-1-1 . +stmt:001 prov:wasGeneratedBy activity:extract-999 . + +activity:extract-999 a prov:Activity . +activity:extract-999 prov:used chunk:123-1-1 . +activity:extract-999 prov:wasAssociatedWith tg:KnowledgeExtractor . +activity:extract-999 tg:componentVersion "2.1.0" . +activity:extract-999 tg:llmModel "claude-3" . +activity:extract-999 tg:ontology . +``` + +**Embedding (stored in vector store, not triple store):** + +Embeddings are stored in the vector store with metadata, not as RDF triples. 
Each embedding record contains: + +| Field | Description | Example | +|-------|-------------|---------| +| vector | The embedding vector | [0.123, -0.456, ...] | +| entity | Node URI the embedding represents | `entity:JohnSmith` | +| chunk_id | Source chunk (provenance) | `chunk:123-1-1` | +| model | Embedding model used | `text-embedding-ada-002` | +| component_version | TG embedder version | `1.0.0` | + +The `entity` field links the embedding to the knowledge graph (node URI). The `chunk_id` field provides provenance back to the source chunk, enabling traversal up the DAG to the original document. + +#### TrustGraph Namespace Extensions + +Custom predicates under the `tg:` namespace for extraction-specific metadata: + +| Predicate | Domain | Description | +|-----------|--------|-------------| +| `tg:reifies` | Statement | Points at the triple this statement object represents | +| `tg:pageCount` | Document | Total number of pages in source document | +| `tg:mimeType` | Document | MIME type of source document | +| `tg:pageNumber` | Page | Page number in source document | +| `tg:chunkIndex` | Chunk | Index of chunk within parent | +| `tg:charOffset` | Chunk | Character offset in parent text | +| `tg:charLength` | Chunk | Length of chunk in characters | +| `tg:chunkSize` | Activity | Configured chunk size | +| `tg:chunkOverlap` | Activity | Configured overlap between chunks | +| `tg:componentVersion` | Activity | Version of TG component | +| `tg:llmModel` | Activity | LLM used for extraction | +| `tg:ontology` | Activity | Ontology URI used to guide extraction | +| `tg:embeddingModel` | Activity | Model used for embeddings | +| `tg:sourceText` | Statement | Exact text from which a triple was extracted | +| `tg:sourceCharOffset` | Statement | Character offset within chunk where source text starts | +| `tg:sourceCharLength` | Statement | Length of source text in characters | + +#### Vocabulary Bootstrap (Per Collection) + +The knowledge graph is ontology-neutral and 
initialises empty. When writing PROV-O provenance data to a collection for the first time, the vocabulary must be bootstrapped with RDF labels for all classes and predicates. This ensures human-readable display in queries and UI. + +**PROV-O Classes:** +``` +prov:Entity rdfs:label "Entity" . +prov:Activity rdfs:label "Activity" . +prov:Agent rdfs:label "Agent" . +``` + +**PROV-O Predicates:** +``` +prov:wasDerivedFrom rdfs:label "was derived from" . +prov:wasGeneratedBy rdfs:label "was generated by" . +prov:used rdfs:label "used" . +prov:wasAssociatedWith rdfs:label "was associated with" . +prov:startedAtTime rdfs:label "started at" . +``` + +**TrustGraph Predicates:** +``` +tg:reifies rdfs:label "reifies" . +tg:pageCount rdfs:label "page count" . +tg:mimeType rdfs:label "MIME type" . +tg:pageNumber rdfs:label "page number" . +tg:chunkIndex rdfs:label "chunk index" . +tg:charOffset rdfs:label "character offset" . +tg:charLength rdfs:label "character length" . +tg:chunkSize rdfs:label "chunk size" . +tg:chunkOverlap rdfs:label "chunk overlap" . +tg:componentVersion rdfs:label "component version" . +tg:llmModel rdfs:label "LLM model" . +tg:ontology rdfs:label "ontology" . +tg:embeddingModel rdfs:label "embedding model" . +tg:sourceText rdfs:label "source text" . +tg:sourceCharOffset rdfs:label "source character offset" . +tg:sourceCharLength rdfs:label "source character length" . +``` + +**Implementation note:** This vocabulary bootstrap should be idempotent - safe to run multiple times without creating duplicates. Could be triggered on first document processing in a collection, or as a separate collection initialisation step. + +#### Sub-Chunk Provenance (Aspirational) + +For finer-grained provenance, it would be valuable to record exactly where within a chunk a triple was extracted from. 
This enables: + +- Highlighting the exact source text in the UI +- Verifying extraction accuracy against source +- Debugging extraction quality at the sentence level + +**Example with position tracking:** +``` +# The extracted triple +entity:JohnSmith rel:worksAt entity:AcmeCorp . + +# Statement with sub-chunk provenance +stmt:001 tg:reifies <> . +stmt:001 prov:wasDerivedFrom chunk:123-1-1 . +stmt:001 tg:sourceText "John Smith has worked at Acme Corp since 2019" . +stmt:001 tg:sourceCharOffset 1547 . +stmt:001 tg:sourceCharLength 46 . +``` + +**Example with text range (alternative):** +``` +stmt:001 tg:reifies <> . +stmt:001 prov:wasDerivedFrom chunk:123-1-1 . +stmt:001 tg:sourceRange "1547-1593" . +stmt:001 tg:sourceText "John Smith has worked at Acme Corp since 2019" . +``` + +**Implementation considerations:** + +- LLM-based extraction may not naturally provide character positions +- Could prompt the LLM to return the source sentence/phrase alongside extracted triples +- Alternatively, post-process to fuzzy-match extracted entities back to source text +- Trade-off between extraction complexity and provenance granularity +- May be easier to achieve with structured extraction methods than free-form LLM extraction + +This is marked as aspirational - the basic chunk-level provenance should be implemented first, with sub-chunk tracking as a future enhancement if feasible. + +### Dual Storage Model + +The provenance DAG is built progressively as documents flow through the pipeline: + +| Store | What's Stored | Purpose | +|-------|---------------|---------| +| Librarian | Document content + parent-child links | Content retrieval, cascade deletion | +| Knowledge Graph | Parent-child edges + metadata | Provenance queries, fact attribution | + +Both stores maintain the same DAG structure. The librarian holds content; the graph holds relationships and enables traversal queries. + +### Key Design Principles + +1. 
**Document ID as the unit of flow** - Processors pass IDs, not content. Content is fetched from librarian when needed. + +2. **Emit once at source** - Metadata is written to the graph once when processing begins, not repeated downstream. + +3. **Consistent processor pattern** - Every processor follows the same receive/fetch/produce/save/emit/forward pattern. + +4. **Progressive DAG construction** - Each processor adds its level to the DAG. The full provenance chain is built incrementally. + +5. **Post-chunker optimization** - After chunking, messages carry both ID and content. Chunks are small (2-4KB), so including content avoids unnecessary librarian round-trips while preserving provenance via the ID. + +## Implementation Tasks + +### Librarian Changes + +#### Current State + +- Initiates document processing by sending document ID to first processor +- No connection to triple store - metadata is bundled with extraction outputs +- `add-child-document` creates one-level parent-child links +- `list-children` returns immediate children only + +#### Required Changes + +**1. New interface: Triple store connection** + +Librarian needs to emit document metadata edges directly to the knowledge graph when initiating processing. +- Add triple store client/publisher to librarian service +- On processing initiation: emit root document metadata as graph edges (once) + +**2. Document type vocabulary** + +Standardize `document_type` values for child documents: +- `source` - original uploaded document +- `page` - page extracted from source (PDF, etc.) 
+- `chunk` - text chunk derived from page or source + +#### Interface Changes Summary + +| Interface | Change | +|-----------|--------| +| Triple store | New outbound connection - emit document metadata edges | +| Processing initiation | Emit metadata to graph before forwarding document ID | + +### PDF Extractor Changes + +#### Current State + +- Receives document content (or streams large documents) +- Extracts text from PDF pages +- Forwards page content to chunker +- No interaction with librarian or triple store + +#### Required Changes + +**1. New interface: Librarian client** + +PDF extractor needs to save each page as a child document in librarian. +- Add librarian client to PDF extractor service +- For each page: call `add-child-document` with parent = root document ID + +**2. New interface: Triple store connection** + +PDF extractor needs to emit parent-child edges to knowledge graph. +- Add triple store client/publisher +- For each page: emit edge linking page document to parent document + +**3. Change output format** + +Instead of forwarding page content directly, forward page document ID. +- Chunker will fetch content from librarian using the ID + +#### Interface Changes Summary + +| Interface | Change | +|-----------|--------| +| Librarian | New outbound - save child documents | +| Triple store | New outbound - emit parent-child edges | +| Output message | Change from content to document ID | + +### Chunker Changes + +#### Current State + +- Receives page/text content +- Splits into chunks +- Forwards chunk content to downstream processors +- No interaction with librarian or triple store + +#### Required Changes + +**1. Change input handling** + +Receive document ID instead of content, fetch from librarian. +- Add librarian client to chunker service +- Fetch page content using document ID + +**2. New interface: Librarian client (write)** + +Save each chunk as a child document in librarian. 
+- For each chunk: call `add-child-document` with parent = page document ID + +**3. New interface: Triple store connection** + +Emit parent-child edges to knowledge graph. +- Add triple store client/publisher +- For each chunk: emit edge linking chunk document to page document + +**4. Change output format** + +Forward both chunk document ID and chunk content (post-chunker optimization). +- Downstream processors receive ID for provenance + content to work with + +#### Interface Changes Summary + +| Interface | Change | +|-----------|--------| +| Input message | Change from content to document ID | +| Librarian | New outbound (read + write) - fetch content, save child documents | +| Triple store | New outbound - emit parent-child edges | +| Output message | Change from content-only to ID + content | + +### Knowledge Extractor Changes + +#### Current State + +- Receives chunk content +- Extracts triples and embeddings +- Emits to triple store and embedding store +- `subjectOf` relationship points to top-level document (not chunk) + +#### Required Changes + +**1. Change input handling** + +Receive chunk document ID alongside content. +- Use chunk ID for provenance linking (content already included per optimization) + +**2. Update triple provenance** + +Link extracted triples to chunk (not top-level document). +- Use reification to create edge pointing to edge +- `subjectOf` relationship: triple → chunk document ID +- First use of existing reification support + +**3. Update embedding provenance** + +Link embedding entity IDs to chunk. 
+- Emit edge: embedding entity ID → chunk document ID + +#### Interface Changes Summary + +| Interface | Change | +|-----------|--------| +| Input message | Expect chunk ID + content (not content only) | +| Triple store | Use reification for triple → chunk provenance | +| Embedding provenance | Link entity ID → chunk ID | ## References diff --git a/tests/unit/test_chunking/test_recursive_chunker.py b/tests/unit/test_chunking/test_recursive_chunker.py index ef4aec69..41286eae 100644 --- a/tests/unit/test_chunking/test_recursive_chunker.py +++ b/tests/unit/test_chunking/test_recursive_chunker.py @@ -176,6 +176,9 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase): processor = Processor(**config) + # Mock save_child_document to avoid waiting for librarian response + processor.save_child_document = AsyncMock(return_value="mock-doc-id") + # Mock message with TextDocument mock_message = MagicMock() mock_text_doc = MagicMock() @@ -192,11 +195,13 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase): # Mock consumer and flow with parameter overrides mock_consumer = MagicMock() mock_producer = AsyncMock() + mock_triples_producer = AsyncMock() mock_flow = MagicMock() mock_flow.side_effect = lambda param: { "chunk-size": 1500, "chunk-overlap": 150, - "output": mock_producer + "output": mock_producer, + "triples": mock_triples_producer, }.get(param) # Act diff --git a/tests/unit/test_decoding/test_pdf_decoder.py b/tests/unit/test_decoding/test_pdf_decoder.py index 1e6021d1..a3ca3514 100644 --- a/tests/unit/test_decoding/test_pdf_decoder.py +++ b/tests/unit/test_decoding/test_pdf_decoder.py @@ -69,9 +69,13 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase): mock_msg = MagicMock() mock_msg.value.return_value = mock_document - # Mock flow + # Mock flow - separate mocks for output and triples mock_output_flow = AsyncMock() - mock_flow = MagicMock(return_value=mock_output_flow) + mock_triples_flow = AsyncMock() + mock_flow = MagicMock(side_effect=lambda name: 
{ + "output": mock_output_flow, + "triples": mock_triples_flow, + }.get(name)) config = { 'id': 'test-pdf-decoder', @@ -80,10 +84,15 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase): processor = Processor(**config) + # Mock save_child_document to avoid waiting for librarian response + processor.save_child_document = AsyncMock(return_value="mock-doc-id") + await processor.on_message(mock_msg, None, mock_flow) # Verify output was sent for each page assert mock_output_flow.send.call_count == 2 + # Verify triples were sent for each page (provenance) + assert mock_triples_flow.send.call_count == 2 @patch('trustgraph.base.chunking_service.Consumer') @patch('trustgraph.base.chunking_service.Producer') @@ -140,8 +149,13 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase): mock_msg = MagicMock() mock_msg.value.return_value = mock_document + # Mock flow - separate mocks for output and triples mock_output_flow = AsyncMock() - mock_flow = MagicMock(return_value=mock_output_flow) + mock_triples_flow = AsyncMock() + mock_flow = MagicMock(side_effect=lambda name: { + "output": mock_output_flow, + "triples": mock_triples_flow, + }.get(name)) config = { 'id': 'test-pdf-decoder', @@ -150,11 +164,16 @@ class TestPdfDecoderProcessor(IsolatedAsyncioTestCase): processor = Processor(**config) + # Mock save_child_document to avoid waiting for librarian response + processor.save_child_document = AsyncMock(return_value="mock-doc-id") + await processor.on_message(mock_msg, None, mock_flow) mock_output_flow.send.assert_called_once() call_args = mock_output_flow.send.call_args[0][0] - assert call_args.text == "Page with unicode: 你好世界 🌍".encode('utf-8') + # PDF decoder now forwards document_id, chunker fetches content from librarian + assert call_args.document_id == "test-doc/p1" + assert call_args.text == b"" # Content stored in librarian, not inline @patch('trustgraph.base.flow_processor.FlowProcessor.add_args') def test_add_args(self, mock_parent_add_args): diff --git 
a/trustgraph-base/trustgraph/base/chunking_service.py b/trustgraph-base/trustgraph/base/chunking_service.py index 890ed3f5..6ba73e08 100644 --- a/trustgraph-base/trustgraph/base/chunking_service.py +++ b/trustgraph-base/trustgraph/base/chunking_service.py @@ -15,7 +15,7 @@ from .consumer import Consumer from .producer import Producer from .metrics import ConsumerMetrics, ProducerMetrics -from ..schema import LibrarianRequest, LibrarianResponse +from ..schema import LibrarianRequest, LibrarianResponse, DocumentMetadata from ..schema import librarian_request_queue, librarian_response_queue # Module logger @@ -135,6 +135,67 @@ class ChunkingService(FlowProcessor): self.pending_requests.pop(request_id, None) raise RuntimeError(f"Timeout fetching document {document_id}") + async def save_child_document(self, doc_id, parent_id, user, content, + document_type="chunk", title=None, timeout=120): + """ + Save a child document (chunk) to the librarian. + + Args: + doc_id: ID for the new child document + parent_id: ID of the parent document + user: User ID + content: Document content (bytes or str) + document_type: Type of document ("chunk", etc.) 
+ title: Optional title + timeout: Request timeout in seconds + + Returns: + The document ID on success + """ + request_id = str(uuid.uuid4()) + + if isinstance(content, str): + content = content.encode("utf-8") + + doc_metadata = DocumentMetadata( + id=doc_id, + user=user, + kind="text/plain", + title=title or doc_id, + parent_id=parent_id, + document_type=document_type, + ) + + request = LibrarianRequest( + operation="add-child-document", + document_metadata=doc_metadata, + content=base64.b64encode(content).decode("utf-8"), + ) + + # Create future for response + future = asyncio.get_event_loop().create_future() + self.pending_requests[request_id] = future + + try: + # Send request + await self.librarian_request_producer.send( + request, properties={"id": request_id} + ) + + # Wait for response + response = await asyncio.wait_for(future, timeout=timeout) + + if response.error: + raise RuntimeError( + f"Librarian error saving chunk: {response.error.type}: {response.error.message}" + ) + + return doc_id + + except asyncio.TimeoutError: + self.pending_requests.pop(request_id, None) + raise RuntimeError(f"Timeout saving chunk {doc_id}") + async def get_document_text(self, doc): """ Get text content from a TextDocument, fetching from librarian if needed. diff --git a/trustgraph-base/trustgraph/provenance/__init__.py b/trustgraph-base/trustgraph/provenance/__init__.py new file mode 100644 index 00000000..3e80dad6 --- /dev/null +++ b/trustgraph-base/trustgraph/provenance/__init__.py @@ -0,0 +1,110 @@ +""" +Provenance module for extraction-time provenance support. 
+
+Provides helpers for:
+- URI generation for documents, pages, chunks, activities, statements
+- PROV-O triple building for provenance metadata
+- Vocabulary bootstrap for per-collection initialization
+
+Usage example:
+
+    from trustgraph.provenance import (
+        document_uri, page_uri, chunk_uri_from_page,
+        document_triples, derived_entity_triples,
+        get_vocabulary_triples,
+    )
+
+    # Generate URIs (note: don't shadow the imported function names)
+    doc_uri = document_uri("my-doc-123")
+    pg_uri = page_uri("my-doc-123", page_number=1)
+
+    # Build provenance triples
+    triples = document_triples(
+        doc_uri,
+        title="My Document",
+        mime_type="application/pdf",
+        page_count=10,
+    )
+
+    # Get vocabulary bootstrap triples (once per collection)
+    vocab_triples = get_vocabulary_triples()
+"""
+
+# URI generation
+from . uris import (
+    TRUSTGRAPH_BASE,
+    document_uri,
+    page_uri,
+    chunk_uri_from_page,
+    chunk_uri_from_doc,
+    activity_uri,
+    statement_uri,
+    agent_uri,
+)
+
+# Namespace constants
+from . namespaces import (
+    # PROV-O
+    PROV, PROV_ENTITY, PROV_ACTIVITY, PROV_AGENT,
+    PROV_WAS_DERIVED_FROM, PROV_WAS_GENERATED_BY,
+    PROV_USED, PROV_WAS_ASSOCIATED_WITH, PROV_STARTED_AT_TIME,
+    # Dublin Core
+    DC, DC_TITLE, DC_SOURCE, DC_DATE, DC_CREATOR,
+    # RDF/RDFS
+    RDF, RDF_TYPE, RDFS, RDFS_LABEL,
+    # TrustGraph
+    TG, TG_REIFIES, TG_PAGE_COUNT, TG_MIME_TYPE, TG_PAGE_NUMBER,
+    TG_CHUNK_INDEX, TG_CHAR_OFFSET, TG_CHAR_LENGTH,
+    TG_CHUNK_SIZE, TG_CHUNK_OVERLAP, TG_COMPONENT_VERSION,
+    TG_LLM_MODEL, TG_ONTOLOGY, TG_EMBEDDING_MODEL,
+    TG_SOURCE_TEXT, TG_SOURCE_CHAR_OFFSET, TG_SOURCE_CHAR_LENGTH,
+)
+
+# Triple builders
+from . triples import (
+    document_triples,
+    derived_entity_triples,
+    triple_provenance_triples,
+)
+
+# Vocabulary bootstrap
+from . vocabulary import (
+    get_vocabulary_triples,
+    PROV_CLASS_LABELS,
+    PROV_PREDICATE_LABELS,
+    DC_PREDICATE_LABELS,
+    TG_PREDICATE_LABELS,
+)
+
+__all__ = [
+    # URIs
+    "TRUSTGRAPH_BASE",
+    "document_uri",
+    "page_uri",
+    "chunk_uri_from_page",
+    "chunk_uri_from_doc",
+    "activity_uri",
+    "statement_uri",
+    "agent_uri",
+    # Namespaces
+    "PROV", "PROV_ENTITY", "PROV_ACTIVITY", "PROV_AGENT",
+    "PROV_WAS_DERIVED_FROM", "PROV_WAS_GENERATED_BY",
+    "PROV_USED", "PROV_WAS_ASSOCIATED_WITH", "PROV_STARTED_AT_TIME",
+    "DC", "DC_TITLE", "DC_SOURCE", "DC_DATE", "DC_CREATOR",
+    "RDF", "RDF_TYPE", "RDFS", "RDFS_LABEL",
+    "TG", "TG_REIFIES", "TG_PAGE_COUNT", "TG_MIME_TYPE", "TG_PAGE_NUMBER",
+    "TG_CHUNK_INDEX", "TG_CHAR_OFFSET", "TG_CHAR_LENGTH",
+    "TG_CHUNK_SIZE", "TG_CHUNK_OVERLAP", "TG_COMPONENT_VERSION",
+    "TG_LLM_MODEL", "TG_ONTOLOGY", "TG_EMBEDDING_MODEL",
+    "TG_SOURCE_TEXT", "TG_SOURCE_CHAR_OFFSET", "TG_SOURCE_CHAR_LENGTH",
+    # Triple builders
+    "document_triples",
+    "derived_entity_triples",
+    "triple_provenance_triples",
+    # Vocabulary
+    "get_vocabulary_triples",
+    "PROV_CLASS_LABELS",
+    "PROV_PREDICATE_LABELS",
+    "DC_PREDICATE_LABELS",
+    "TG_PREDICATE_LABELS",
+]
diff --git a/trustgraph-base/trustgraph/provenance/namespaces.py b/trustgraph-base/trustgraph/provenance/namespaces.py
new file mode 100644
index 00000000..9801f34c
--- /dev/null
+++ b/trustgraph-base/trustgraph/provenance/namespaces.py
@@ -0,0 +1,48 @@
+"""
+RDF namespace constants for provenance.
+
+Includes PROV-O, Dublin Core, and TrustGraph namespace URIs.
+""" + +# PROV-O namespace (W3C Provenance Ontology) +PROV = "http://www.w3.org/ns/prov#" +PROV_ENTITY = PROV + "Entity" +PROV_ACTIVITY = PROV + "Activity" +PROV_AGENT = PROV + "Agent" +PROV_WAS_DERIVED_FROM = PROV + "wasDerivedFrom" +PROV_WAS_GENERATED_BY = PROV + "wasGeneratedBy" +PROV_USED = PROV + "used" +PROV_WAS_ASSOCIATED_WITH = PROV + "wasAssociatedWith" +PROV_STARTED_AT_TIME = PROV + "startedAtTime" + +# Dublin Core namespace +DC = "http://purl.org/dc/elements/1.1/" +DC_TITLE = DC + "title" +DC_SOURCE = DC + "source" +DC_DATE = DC + "date" +DC_CREATOR = DC + "creator" + +# RDF/RDFS namespace (also in rdf.py, but included here for completeness) +RDF = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" +RDF_TYPE = RDF + "type" +RDFS = "http://www.w3.org/2000/01/rdf-schema#" +RDFS_LABEL = RDFS + "label" + +# TrustGraph namespace for custom predicates +TG = "https://trustgraph.ai/ns/" +TG_REIFIES = TG + "reifies" +TG_PAGE_COUNT = TG + "pageCount" +TG_MIME_TYPE = TG + "mimeType" +TG_PAGE_NUMBER = TG + "pageNumber" +TG_CHUNK_INDEX = TG + "chunkIndex" +TG_CHAR_OFFSET = TG + "charOffset" +TG_CHAR_LENGTH = TG + "charLength" +TG_CHUNK_SIZE = TG + "chunkSize" +TG_CHUNK_OVERLAP = TG + "chunkOverlap" +TG_COMPONENT_VERSION = TG + "componentVersion" +TG_LLM_MODEL = TG + "llmModel" +TG_ONTOLOGY = TG + "ontology" +TG_EMBEDDING_MODEL = TG + "embeddingModel" +TG_SOURCE_TEXT = TG + "sourceText" +TG_SOURCE_CHAR_OFFSET = TG + "sourceCharOffset" +TG_SOURCE_CHAR_LENGTH = TG + "sourceCharLength" diff --git a/trustgraph-base/trustgraph/provenance/triples.py b/trustgraph-base/trustgraph/provenance/triples.py new file mode 100644 index 00000000..5e4dc774 --- /dev/null +++ b/trustgraph-base/trustgraph/provenance/triples.py @@ -0,0 +1,251 @@ +""" +Helper functions to build PROV-O triples for extraction-time provenance. +""" + +from datetime import datetime +from typing import List, Optional + +from .. schema import Triple, Term, IRI, LITERAL + +from . 
namespaces import ( + RDF_TYPE, RDFS_LABEL, + PROV_ENTITY, PROV_ACTIVITY, PROV_AGENT, + PROV_WAS_DERIVED_FROM, PROV_WAS_GENERATED_BY, + PROV_USED, PROV_WAS_ASSOCIATED_WITH, PROV_STARTED_AT_TIME, + DC_TITLE, DC_SOURCE, DC_DATE, DC_CREATOR, + TG_PAGE_COUNT, TG_MIME_TYPE, TG_PAGE_NUMBER, + TG_CHUNK_INDEX, TG_CHAR_OFFSET, TG_CHAR_LENGTH, + TG_CHUNK_SIZE, TG_CHUNK_OVERLAP, TG_COMPONENT_VERSION, + TG_LLM_MODEL, TG_ONTOLOGY, TG_REIFIES, +) + +from . uris import activity_uri, agent_uri + + +def _iri(uri: str) -> Term: + """Create an IRI term.""" + return Term(type=IRI, iri=uri) + + +def _literal(value) -> Term: + """Create a literal term.""" + return Term(type=LITERAL, value=str(value)) + + +def _triple(s: str, p: str, o_term: Term) -> Triple: + """Create a triple with IRI subject and predicate.""" + return Triple(s=_iri(s), p=_iri(p), o=o_term) + + +def document_triples( + doc_uri: str, + title: Optional[str] = None, + source: Optional[str] = None, + date: Optional[str] = None, + creator: Optional[str] = None, + page_count: Optional[int] = None, + mime_type: Optional[str] = None, +) -> List[Triple]: + """ + Build triples for a source document entity. 
+ + Args: + doc_uri: The document URI (from uris.document_uri) + title: Document title + source: Source URL/path + date: Document date + creator: Author/creator + page_count: Number of pages (for PDFs) + mime_type: MIME type + + Returns: + List of Triple objects + """ + triples = [ + _triple(doc_uri, RDF_TYPE, _iri(PROV_ENTITY)), + ] + + if title: + triples.append(_triple(doc_uri, DC_TITLE, _literal(title))) + triples.append(_triple(doc_uri, RDFS_LABEL, _literal(title))) + + if source: + triples.append(_triple(doc_uri, DC_SOURCE, _iri(source))) + + if date: + triples.append(_triple(doc_uri, DC_DATE, _literal(date))) + + if creator: + triples.append(_triple(doc_uri, DC_CREATOR, _literal(creator))) + + if page_count is not None: + triples.append(_triple(doc_uri, TG_PAGE_COUNT, _literal(page_count))) + + if mime_type: + triples.append(_triple(doc_uri, TG_MIME_TYPE, _literal(mime_type))) + + return triples + + +def derived_entity_triples( + entity_uri: str, + parent_uri: str, + component_name: str, + component_version: str, + label: Optional[str] = None, + page_number: Optional[int] = None, + chunk_index: Optional[int] = None, + char_offset: Optional[int] = None, + char_length: Optional[int] = None, + chunk_size: Optional[int] = None, + chunk_overlap: Optional[int] = None, + timestamp: Optional[str] = None, +) -> List[Triple]: + """ + Build triples for a derived entity (page or chunk) with full PROV-O provenance. 
+ + Creates: + - Entity declaration + - wasDerivedFrom relationship to parent + - Activity for the extraction + - Agent for the component + + Args: + entity_uri: URI of the derived entity (page or chunk) + parent_uri: URI of the parent entity + component_name: Name of TG component (e.g., "pdf-extractor", "chunker") + component_version: Version of the component + label: Human-readable label + page_number: Page number (for pages) + chunk_index: Chunk index (for chunks) + char_offset: Character offset in parent (for chunks) + char_length: Character length (for chunks) + chunk_size: Configured chunk size (for chunking activity) + chunk_overlap: Configured chunk overlap (for chunking activity) + timestamp: ISO timestamp (defaults to now) + + Returns: + List of Triple objects + """ + if timestamp is None: + timestamp = datetime.utcnow().isoformat() + "Z" + + act_uri = activity_uri() + agt_uri = agent_uri(component_name) + + triples = [ + # Entity declaration + _triple(entity_uri, RDF_TYPE, _iri(PROV_ENTITY)), + + # Derivation from parent + _triple(entity_uri, PROV_WAS_DERIVED_FROM, _iri(parent_uri)), + + # Generation by activity + _triple(entity_uri, PROV_WAS_GENERATED_BY, _iri(act_uri)), + + # Activity declaration + _triple(act_uri, RDF_TYPE, _iri(PROV_ACTIVITY)), + _triple(act_uri, PROV_USED, _iri(parent_uri)), + _triple(act_uri, PROV_WAS_ASSOCIATED_WITH, _iri(agt_uri)), + _triple(act_uri, PROV_STARTED_AT_TIME, _literal(timestamp)), + _triple(act_uri, TG_COMPONENT_VERSION, _literal(component_version)), + + # Agent declaration + _triple(agt_uri, RDF_TYPE, _iri(PROV_AGENT)), + _triple(agt_uri, RDFS_LABEL, _literal(component_name)), + ] + + if label: + triples.append(_triple(entity_uri, RDFS_LABEL, _literal(label))) + + if page_number is not None: + triples.append(_triple(entity_uri, TG_PAGE_NUMBER, _literal(page_number))) + + if chunk_index is not None: + triples.append(_triple(entity_uri, TG_CHUNK_INDEX, _literal(chunk_index))) + + if char_offset is not None: + 
triples.append(_triple(entity_uri, TG_CHAR_OFFSET, _literal(char_offset))) + + if char_length is not None: + triples.append(_triple(entity_uri, TG_CHAR_LENGTH, _literal(char_length))) + + if chunk_size is not None: + triples.append(_triple(act_uri, TG_CHUNK_SIZE, _literal(chunk_size))) + + if chunk_overlap is not None: + triples.append(_triple(act_uri, TG_CHUNK_OVERLAP, _literal(chunk_overlap))) + + return triples + + +def triple_provenance_triples( + stmt_uri: str, + subject_uri: str, + predicate_uri: str, + object_term: Term, + chunk_uri: str, + component_name: str, + component_version: str, + llm_model: Optional[str] = None, + ontology_uri: Optional[str] = None, + timestamp: Optional[str] = None, +) -> List[Triple]: + """ + Build provenance triples for an extracted knowledge triple using reification. + + Creates: + - Statement object that reifies the triple + - wasDerivedFrom link to source chunk + - Activity and agent metadata + + Args: + stmt_uri: URI for the reified statement + subject_uri: Subject of the extracted triple + predicate_uri: Predicate of the extracted triple + object_term: Object of the extracted triple (Term) + chunk_uri: URI of source chunk + component_name: Name of extractor component + component_version: Version of the component + llm_model: LLM model used for extraction + ontology_uri: Ontology URI used for extraction + timestamp: ISO timestamp + + Returns: + List of Triple objects for the provenance (not the triple itself) + """ + if timestamp is None: + timestamp = datetime.utcnow().isoformat() + "Z" + + act_uri = activity_uri() + agt_uri = agent_uri(component_name) + + # Note: The actual reification (tg:reifies pointing at the edge) requires + # RDF 1.2 triple term support. This builds the surrounding provenance. + # The actual reification link must be handled by the knowledge extractor + # using the graph store's reification API. 
+ + triples = [ + # Statement provenance + _triple(stmt_uri, PROV_WAS_DERIVED_FROM, _iri(chunk_uri)), + _triple(stmt_uri, PROV_WAS_GENERATED_BY, _iri(act_uri)), + + # Activity + _triple(act_uri, RDF_TYPE, _iri(PROV_ACTIVITY)), + _triple(act_uri, PROV_USED, _iri(chunk_uri)), + _triple(act_uri, PROV_WAS_ASSOCIATED_WITH, _iri(agt_uri)), + _triple(act_uri, PROV_STARTED_AT_TIME, _literal(timestamp)), + _triple(act_uri, TG_COMPONENT_VERSION, _literal(component_version)), + + # Agent + _triple(agt_uri, RDF_TYPE, _iri(PROV_AGENT)), + _triple(agt_uri, RDFS_LABEL, _literal(component_name)), + ] + + if llm_model: + triples.append(_triple(act_uri, TG_LLM_MODEL, _literal(llm_model))) + + if ontology_uri: + triples.append(_triple(act_uri, TG_ONTOLOGY, _iri(ontology_uri))) + + return triples diff --git a/trustgraph-base/trustgraph/provenance/uris.py b/trustgraph-base/trustgraph/provenance/uris.py new file mode 100644 index 00000000..ff5570e4 --- /dev/null +++ b/trustgraph-base/trustgraph/provenance/uris.py @@ -0,0 +1,61 @@ +""" +URI generation for provenance entities. 
+ +URI patterns: +- Document: https://trustgraph.ai/doc/{doc_id} +- Page: https://trustgraph.ai/page/{doc_id}/p{page_number} +- Chunk: https://trustgraph.ai/chunk/{doc_id}/p{page}/c{chunk} (from page) + https://trustgraph.ai/chunk/{doc_id}/c{chunk} (from text doc) +- Activity: https://trustgraph.ai/activity/{uuid} +- Statement: https://trustgraph.ai/stmt/{uuid} +""" + +import uuid +import urllib.parse + +# Base URI prefix +TRUSTGRAPH_BASE = "https://trustgraph.ai" + + +def _encode_id(id_str: str) -> str: + """URL-encode an ID component for safe inclusion in URIs.""" + return urllib.parse.quote(str(id_str), safe='') + + +def document_uri(doc_id: str) -> str: + """Generate URI for a source document.""" + return f"{TRUSTGRAPH_BASE}/doc/{_encode_id(doc_id)}" + + +def page_uri(doc_id: str, page_number: int) -> str: + """Generate URI for a page extracted from a document.""" + return f"{TRUSTGRAPH_BASE}/page/{_encode_id(doc_id)}/p{page_number}" + + +def chunk_uri_from_page(doc_id: str, page_number: int, chunk_index: int) -> str: + """Generate URI for a chunk extracted from a page.""" + return f"{TRUSTGRAPH_BASE}/chunk/{_encode_id(doc_id)}/p{page_number}/c{chunk_index}" + + +def chunk_uri_from_doc(doc_id: str, chunk_index: int) -> str: + """Generate URI for a chunk extracted directly from a text document.""" + return f"{TRUSTGRAPH_BASE}/chunk/{_encode_id(doc_id)}/c{chunk_index}" + + +def activity_uri(activity_id: str = None) -> str: + """Generate URI for a PROV-O activity. Auto-generates UUID if not provided.""" + if activity_id is None: + activity_id = str(uuid.uuid4()) + return f"{TRUSTGRAPH_BASE}/activity/{_encode_id(activity_id)}" + + +def statement_uri(stmt_id: str = None) -> str: + """Generate URI for a reified statement. 
Auto-generates UUID if not provided.""" + if stmt_id is None: + stmt_id = str(uuid.uuid4()) + return f"{TRUSTGRAPH_BASE}/stmt/{_encode_id(stmt_id)}" + + +def agent_uri(component_name: str) -> str: + """Generate URI for a TrustGraph component agent.""" + return f"{TRUSTGRAPH_BASE}/agent/{_encode_id(component_name)}" diff --git a/trustgraph-base/trustgraph/provenance/vocabulary.py b/trustgraph-base/trustgraph/provenance/vocabulary.py new file mode 100644 index 00000000..0124e0cd --- /dev/null +++ b/trustgraph-base/trustgraph/provenance/vocabulary.py @@ -0,0 +1,101 @@ +""" +Vocabulary bootstrap for provenance. + +The knowledge graph is ontology-neutral and initializes empty. When writing +PROV-O provenance data to a collection for the first time, the vocabulary +must be bootstrapped with RDF labels for all classes and predicates. +""" + +from typing import List + +from .. schema import Triple, Term, IRI, LITERAL + +from . namespaces import ( + RDFS_LABEL, + PROV_ENTITY, PROV_ACTIVITY, PROV_AGENT, + PROV_WAS_DERIVED_FROM, PROV_WAS_GENERATED_BY, + PROV_USED, PROV_WAS_ASSOCIATED_WITH, PROV_STARTED_AT_TIME, + DC_TITLE, DC_SOURCE, DC_DATE, DC_CREATOR, + TG_REIFIES, TG_PAGE_COUNT, TG_MIME_TYPE, TG_PAGE_NUMBER, + TG_CHUNK_INDEX, TG_CHAR_OFFSET, TG_CHAR_LENGTH, + TG_CHUNK_SIZE, TG_CHUNK_OVERLAP, TG_COMPONENT_VERSION, + TG_LLM_MODEL, TG_ONTOLOGY, TG_EMBEDDING_MODEL, + TG_SOURCE_TEXT, TG_SOURCE_CHAR_OFFSET, TG_SOURCE_CHAR_LENGTH, +) + + +def _label_triple(uri: str, label: str) -> Triple: + """Create a label triple for a URI.""" + return Triple( + s=Term(type=IRI, iri=uri), + p=Term(type=IRI, iri=RDFS_LABEL), + o=Term(type=LITERAL, value=label), + ) + + +# PROV-O class labels +PROV_CLASS_LABELS = [ + _label_triple(PROV_ENTITY, "Entity"), + _label_triple(PROV_ACTIVITY, "Activity"), + _label_triple(PROV_AGENT, "Agent"), +] + +# PROV-O predicate labels +PROV_PREDICATE_LABELS = [ + _label_triple(PROV_WAS_DERIVED_FROM, "was derived from"), + _label_triple(PROV_WAS_GENERATED_BY, "was 
generated by"), + _label_triple(PROV_USED, "used"), + _label_triple(PROV_WAS_ASSOCIATED_WITH, "was associated with"), + _label_triple(PROV_STARTED_AT_TIME, "started at"), +] + +# Dublin Core predicate labels +DC_PREDICATE_LABELS = [ + _label_triple(DC_TITLE, "title"), + _label_triple(DC_SOURCE, "source"), + _label_triple(DC_DATE, "date"), + _label_triple(DC_CREATOR, "creator"), +] + +# TrustGraph predicate labels +TG_PREDICATE_LABELS = [ + _label_triple(TG_REIFIES, "reifies"), + _label_triple(TG_PAGE_COUNT, "page count"), + _label_triple(TG_MIME_TYPE, "MIME type"), + _label_triple(TG_PAGE_NUMBER, "page number"), + _label_triple(TG_CHUNK_INDEX, "chunk index"), + _label_triple(TG_CHAR_OFFSET, "character offset"), + _label_triple(TG_CHAR_LENGTH, "character length"), + _label_triple(TG_CHUNK_SIZE, "chunk size"), + _label_triple(TG_CHUNK_OVERLAP, "chunk overlap"), + _label_triple(TG_COMPONENT_VERSION, "component version"), + _label_triple(TG_LLM_MODEL, "LLM model"), + _label_triple(TG_ONTOLOGY, "ontology"), + _label_triple(TG_EMBEDDING_MODEL, "embedding model"), + _label_triple(TG_SOURCE_TEXT, "source text"), + _label_triple(TG_SOURCE_CHAR_OFFSET, "source character offset"), + _label_triple(TG_SOURCE_CHAR_LENGTH, "source character length"), +] + + +def get_vocabulary_triples() -> List[Triple]: + """ + Get all vocabulary bootstrap triples. + + Returns a list of triples that define labels for all PROV-O classes, + PROV-O predicates, Dublin Core predicates, and TrustGraph predicates + used in extraction-time provenance. + + This should be emitted to the knowledge graph once per collection + before any provenance data is written. The operation is idempotent - + re-emitting the same triples is harmless. 
+ + Returns: + List of Triple objects defining vocabulary labels + """ + return ( + PROV_CLASS_LABELS + + PROV_PREDICATE_LABELS + + DC_PREDICATE_LABELS + + TG_PREDICATE_LABELS + ) diff --git a/trustgraph-base/trustgraph/schema/knowledge/document.py b/trustgraph-base/trustgraph/schema/knowledge/document.py index 5d18b265..c75a1227 100644 --- a/trustgraph-base/trustgraph/schema/knowledge/document.py +++ b/trustgraph-base/trustgraph/schema/knowledge/document.py @@ -34,5 +34,9 @@ class TextDocument: class Chunk: metadata: Metadata | None = None chunk: bytes = b"" + # For provenance: document_id of this chunk in librarian + # Post-chunker optimization: both document_id AND chunk content are included + # so downstream processors have the ID for provenance and content to work with + document_id: str = "" ############################################################################ diff --git a/trustgraph-base/trustgraph/schema/knowledge/embeddings.py b/trustgraph-base/trustgraph/schema/knowledge/embeddings.py index 93559056..b39bf6ea 100644 --- a/trustgraph-base/trustgraph/schema/knowledge/embeddings.py +++ b/trustgraph-base/trustgraph/schema/knowledge/embeddings.py @@ -12,6 +12,8 @@ from ..core.topic import topic class EntityEmbeddings: entity: Term | None = None vectors: list[list[float]] = field(default_factory=list) + # Provenance: which chunk this embedding was derived from + chunk_id: str = "" # This is a 'batching' mechanism for the above data @dataclass diff --git a/trustgraph-base/trustgraph/schema/knowledge/graph.py b/trustgraph-base/trustgraph/schema/knowledge/graph.py index 4ee8d2c0..b4a05084 100644 --- a/trustgraph-base/trustgraph/schema/knowledge/graph.py +++ b/trustgraph-base/trustgraph/schema/knowledge/graph.py @@ -12,6 +12,8 @@ from ..core.topic import topic class EntityContext: entity: Term | None = None context: str = "" + # Provenance: which chunk this entity context was derived from + chunk_id: str = "" # This is a 'batching' mechanism for the above 
data @dataclass diff --git a/trustgraph-base/trustgraph/schema/services/library.py b/trustgraph-base/trustgraph/schema/services/library.py index 4025977a..6dcdee1a 100644 --- a/trustgraph-base/trustgraph/schema/services/library.py +++ b/trustgraph-base/trustgraph/schema/services/library.py @@ -91,7 +91,12 @@ class DocumentMetadata: tags: list[str] = field(default_factory=list) # Child document support parent_id: str = "" # Empty for top-level docs, set for children - document_type: str = "source" # "source" or "extracted" + # Document type vocabulary: + # "source" - original uploaded document + # "page" - page extracted from source (e.g., PDF page) + # "chunk" - text chunk derived from page or source + # "extracted" - legacy value, kept for backwards compatibility + document_type: str = "source" @dataclass class ProcessingMetadata: diff --git a/trustgraph-flow/trustgraph/chunking/recursive/chunker.py b/trustgraph-flow/trustgraph/chunking/recursive/chunker.py index 529e1ff1..cb607067 100755 --- a/trustgraph-flow/trustgraph/chunking/recursive/chunker.py +++ b/trustgraph-flow/trustgraph/chunking/recursive/chunker.py @@ -8,9 +8,18 @@ import logging from langchain_text_splitters import RecursiveCharacterTextSplitter from prometheus_client import Histogram -from ... schema import TextDocument, Chunk +from ... schema import TextDocument, Chunk, Metadata, Triples from ... base import ChunkingService, ConsumerSpec, ProducerSpec +from ... 
provenance import ( + page_uri, chunk_uri_from_page, chunk_uri_from_doc, + derived_entity_triples, document_uri, +) + +# Component identification for provenance +COMPONENT_NAME = "chunker" +COMPONENT_VERSION = "1.0.0" + # Module logger logger = logging.getLogger(__name__) @@ -63,6 +72,13 @@ class Processor(ChunkingService): ) ) + self.register_specification( + ProducerSpec( + name = "triples", + schema = Triples, + ) + ) + logger.info("Recursive chunker initialized") async def on_message(self, msg, consumer, flow): @@ -96,21 +112,99 @@ class Processor(ChunkingService): texts = text_splitter.create_documents([text]) + # Get parent document ID for provenance linking + parent_doc_id = v.document_id or v.metadata.id + + # Determine if parent is a page (from PDF) or source document (text) + # Check if parent_doc_id contains "/p" which indicates a page + is_from_page = "/p" in parent_doc_id + + # Extract the root document ID for chunk URI generation + if is_from_page: + # Parent is a page like "doc123/p3", extract page number + parts = parent_doc_id.rsplit("/p", 1) + root_doc_id = parts[0] + page_num = int(parts[1]) if len(parts) > 1 else 1 + else: + root_doc_id = parent_doc_id + page_num = None + + # Track character offset for provenance + char_offset = 0 + for ix, chunk in enumerate(texts): + chunk_index = ix + 1 # 1-indexed logger.debug(f"Created chunk of size {len(chunk.page_content)}") + # Generate chunk document ID + if is_from_page: + chunk_doc_id = f"{root_doc_id}/p{page_num}/c{chunk_index}" + chunk_uri = chunk_uri_from_page(root_doc_id, page_num, chunk_index) + parent_uri = page_uri(root_doc_id, page_num) + else: + chunk_doc_id = f"{root_doc_id}/c{chunk_index}" + chunk_uri = chunk_uri_from_doc(root_doc_id, chunk_index) + parent_uri = document_uri(root_doc_id) + + chunk_content = chunk.page_content.encode("utf-8") + chunk_length = len(chunk.page_content) + + # Save chunk to librarian as child document + await self.save_child_document( + doc_id=chunk_doc_id, + 
parent_id=parent_doc_id, + user=v.metadata.user, + content=chunk_content, + document_type="chunk", + title=f"Chunk {chunk_index}", + ) + + # Emit provenance triples + prov_triples = derived_entity_triples( + entity_uri=chunk_uri, + parent_uri=parent_uri, + component_name=COMPONENT_NAME, + component_version=COMPONENT_VERSION, + label=f"Chunk {chunk_index}", + chunk_index=chunk_index, + char_offset=char_offset, + char_length=chunk_length, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + ) + + await flow("triples").send(Triples( + metadata=Metadata( + id=chunk_uri, + metadata=[], + user=v.metadata.user, + collection=v.metadata.collection, + ), + triples=prov_triples, + )) + + # Forward chunk ID + content (post-chunker optimization) r = Chunk( - metadata=v.metadata, - chunk=chunk.page_content.encode("utf-8"), + metadata=Metadata( + id=chunk_uri, + metadata=[], + user=v.metadata.user, + collection=v.metadata.collection, + ), + chunk=chunk_content, + document_id=chunk_doc_id, ) __class__.chunk_metric.labels( id=consumer.id, flow=consumer.flow - ).observe(len(chunk.page_content)) + ).observe(chunk_length) await flow("output").send(r) + # Update character offset (approximate, doesn't account for overlap) + char_offset += chunk_length - chunk_overlap + logger.debug("Document chunking complete") @staticmethod diff --git a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py index 67cf4200..1248c7a0 100755 --- a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py +++ b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py @@ -16,11 +16,20 @@ import uuid from langchain_community.document_loaders import PyPDFLoader from ... schema import Document, TextDocument, Metadata -from ... schema import LibrarianRequest, LibrarianResponse +from ... schema import LibrarianRequest, LibrarianResponse, DocumentMetadata from ... schema import librarian_request_queue, librarian_response_queue +from ... 
schema import Triples from ... base import FlowProcessor, ConsumerSpec, ProducerSpec from ... base import Consumer, Producer, ConsumerMetrics, ProducerMetrics +from ... provenance import ( + document_uri, page_uri, derived_entity_triples, +) + +# Component identification for provenance +COMPONENT_NAME = "pdf-decoder" +COMPONENT_VERSION = "1.0.0" + # Module logger logger = logging.getLogger(__name__) @@ -57,6 +66,13 @@ class Processor(FlowProcessor): ) ) + self.register_specification( + ProducerSpec( + name = "triples", + schema = Triples, + ) + ) + # Librarian client for fetching document content librarian_request_q = params.get( "librarian_request_queue", default_librarian_request_queue @@ -148,6 +164,66 @@ class Processor(FlowProcessor): self.pending_requests.pop(request_id, None) raise RuntimeError(f"Timeout fetching document {document_id}") + async def save_child_document(self, doc_id, parent_id, user, content, + document_type="page", title=None, timeout=120): + """ + Save a child document to the librarian. + + Args: + doc_id: ID for the new child document + parent_id: ID of the parent document + user: User ID + content: Document content (bytes) + document_type: Type of document ("page", "chunk", etc.) 
+ title: Optional title + timeout: Request timeout in seconds + + Returns: + The document ID on success + """ + import base64 + + request_id = str(uuid.uuid4()) + + doc_metadata = DocumentMetadata( + id=doc_id, + user=user, + kind="text/plain", + title=title or doc_id, + parent_id=parent_id, + document_type=document_type, + ) + + request = LibrarianRequest( + operation="add-child-document", + document_metadata=doc_metadata, + content=base64.b64encode(content).decode("utf-8"), + ) + + # Create future for response + future = asyncio.get_event_loop().create_future() + self.pending_requests[request_id] = future + + try: + # Send request + await self.librarian_request_producer.send( + request, properties={"id": request_id} + ) + + # Wait for response + response = await asyncio.wait_for(future, timeout=timeout) + + if response.error: + raise RuntimeError( + f"Librarian error saving child document: {response.error.type}: {response.error.message}" + ) + + return doc_id + + except asyncio.TimeoutError: + self.pending_requests.pop(request_id, None) + raise RuntimeError(f"Timeout saving child document {doc_id}") + async def on_message(self, msg, consumer, flow): logger.debug("PDF message received") @@ -187,13 +263,62 @@ class Processor(FlowProcessor): loader = PyPDFLoader(temp_path) pages = loader.load() + # Get the source document ID + source_doc_id = v.document_id or v.metadata.id + for ix, page in enumerate(pages): + page_num = ix + 1 # 1-indexed page numbers - logger.debug(f"Processing page {ix}") + logger.debug(f"Processing page {page_num}") + # Generate page document ID + page_doc_id = f"{source_doc_id}/p{page_num}" + page_content = page.page_content.encode("utf-8") + + # Save page as child document in librarian + await self.save_child_document( + doc_id=page_doc_id, + parent_id=source_doc_id, + user=v.metadata.user, + content=page_content, + document_type="page", + title=f"Page {page_num}", + ) + + # Emit provenance triples + doc_uri = document_uri(source_doc_id) + 
pg_uri = page_uri(source_doc_id, page_num) + + prov_triples = derived_entity_triples( + entity_uri=pg_uri, + parent_uri=doc_uri, + component_name=COMPONENT_NAME, + component_version=COMPONENT_VERSION, + label=f"Page {page_num}", + page_number=page_num, + ) + + await flow("triples").send(Triples( + metadata=Metadata( + id=pg_uri, + metadata=[], + user=v.metadata.user, + collection=v.metadata.collection, + ), + triples=prov_triples, + )) + + # Forward page document ID to chunker + # Chunker will fetch content from librarian r = TextDocument( - metadata=v.metadata, - text=page.page_content.encode("utf-8"), + metadata=Metadata( + id=pg_uri, + metadata=[], + user=v.metadata.user, + collection=v.metadata.collection, + ), + document_id=page_doc_id, + text=b"", # Empty, chunker will fetch from librarian ) await flow("output").send(r) diff --git a/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py b/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py index 1b63774d..fec528bd 100755 --- a/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py +++ b/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py @@ -71,7 +71,8 @@ class Processor(FlowProcessor): entities.append( EntityEmbeddings( entity=entity.entity, - vectors=vectors + vectors=vectors, + chunk_id=entity.chunk_id, # Provenance: source chunk ) ) diff --git a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py index 72275a8c..578f519d 100755 --- a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py @@ -128,10 +128,12 @@ class Processor(FlowProcessor): triples = [] entities = [] - # FIXME: Putting metadata into triples store is duplicated in - # relationships extractor too - for t in v.metadata.metadata: - triples.append(t) + # Get chunk document ID for provenance linking + chunk_doc_id = v.document_id if 
v.document_id else v.metadata.id + chunk_uri = v.metadata.id # The URI form for the chunk + + # Note: Document metadata is now emitted once by librarian at processing + # initiation, so we don't need to duplicate it here. for defn in defs: @@ -159,22 +161,27 @@ class Processor(FlowProcessor): s=s_value, p=DEFINITION_VALUE, o=o_value )) + # Link entity to chunk (not top-level document) triples.append(Triple( s=s_value, p=SUBJECT_OF_VALUE, - o=Term(type=IRI, iri=v.metadata.id) + o=Term(type=IRI, iri=chunk_uri) )) # Output entity name as context for direct name matching + # Include chunk_id for embedding provenance entities.append(EntityContext( entity=s_value, context=s, + chunk_id=chunk_doc_id, )) # Output definition as context for semantic matching + # Include chunk_id for embedding provenance entities.append(EntityContext( entity=s_value, context=defn["definition"], + chunk_id=chunk_doc_id, )) # Send triples in batches diff --git a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py index 7ab51555..1a719500 100755 --- a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py @@ -109,10 +109,12 @@ class Processor(FlowProcessor): triples = [] - # FIXME: Putting metadata into triples store is duplicated in - # relationships extractor too - for t in v.metadata.metadata: - triples.append(t) + # Get chunk document ID for provenance linking + chunk_doc_id = v.document_id if v.document_id else v.metadata.id + chunk_uri = v.metadata.id # The URI form for the chunk + + # Note: Document metadata is now emitted once by librarian at processing + # initiation, so we don't need to duplicate it here. 
 
         for rel in rels:
@@ -168,19 +170,19 @@ class Processor(FlowProcessor):
                     o=Term(type=LITERAL, value=str(o))
                 ))
 
-            # 'Subject of' for s
+            # Link entity to chunk (not top-level document)
             triples.append(Triple(
                 s=s_value, p=SUBJECT_OF_VALUE,
-                o=Term(type=IRI, iri=v.metadata.id)
+                o=Term(type=IRI, iri=chunk_uri)
             ))
 
             if rel["object-entity"]:
-                # 'Subject of' for o
+                # Link object entity to chunk
                 triples.append(Triple(
                     s=o_value, p=SUBJECT_OF_VALUE,
-                    o=Term(type=IRI, iri=v.metadata.id)
+                    o=Term(type=IRI, iri=chunk_uri)
                 ))
 
         # Send triples in batches
diff --git a/trustgraph-flow/trustgraph/librarian/librarian.py b/trustgraph-flow/trustgraph/librarian/librarian.py
index 82a47084..2497e4a2 100644
--- a/trustgraph-flow/trustgraph/librarian/librarian.py
+++ b/trustgraph-flow/trustgraph/librarian/librarian.py
@@ -609,8 +609,10 @@ class Librarian:
         ):
             raise RequestError("Document already exists")
 
-        # Ensure document_type is set to "extracted"
-        request.document_metadata.document_type = "extracted"
+        # Set document_type if not specified by caller
+        # Valid types: "page", "chunk", or "extracted" (legacy)
+        if not request.document_metadata.document_type or request.document_metadata.document_type == "source":
+            request.document_metadata.document_type = "extracted"
 
         # Create object ID for blob
         object_id = uuid.uuid4()
diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py
index e5bd2455..9f92d9fb 100755
--- a/trustgraph-flow/trustgraph/librarian/service.py
+++ b/trustgraph-flow/trustgraph/librarian/service.py
@@ -23,9 +23,14 @@ from .. schema import config_request_queue, config_response_queue
 from .. schema import Document, Metadata
 from .. schema import TextDocument, Metadata
+from .. schema import Triples
 
 from .. exceptions import RequestError
 
+from .. provenance import (
+    document_uri, document_triples, get_vocabulary_triples,
+)
+
 from . librarian import Librarian
 from . collection_manager import CollectionManager
@@ -281,6 +286,67 @@ class Processor(AsyncProcessor):
     # Threshold for sending document_id instead of inline content (2MB)
     STREAMING_THRESHOLD = 2 * 1024 * 1024
 
+    async def emit_document_provenance(self, document, processing, triples_queue):
+        """
+        Emit document provenance metadata to the knowledge graph.
+
+        This emits:
+        1. Vocabulary bootstrap triples (idempotent, safe to re-emit)
+        2. Document metadata as PROV-O triples
+        """
+        logger.debug(f"Emitting document provenance for {document.id}")
+
+        # Build document URI and provenance triples
+        doc_uri = document_uri(document.id)
+
+        # Get page count for PDFs (if available from document metadata)
+        page_count = None
+        if document.kind == "application/pdf":
+            # Page count might be in document metadata triples
+            # For now, we don't have it at this point - it gets determined
+            # during extraction
+            pass
+
+        # Build document metadata triples
+        prov_triples = document_triples(
+            doc_uri=doc_uri,
+            title=document.title if document.title else None,
+            mime_type=document.kind,
+        )
+
+        # Include any existing metadata triples from the document
+        if document.metadata:
+            prov_triples.extend(document.metadata)
+
+        # Get vocabulary bootstrap triples (idempotent)
+        vocab_triples = get_vocabulary_triples()
+
+        # Combine all triples
+        all_triples = vocab_triples + prov_triples
+
+        # Create publisher and emit
+        triples_pub = Publisher(
+            self.pubsub, triples_queue, schema=Triples
+        )
+
+        try:
+            await triples_pub.start()
+
+            triples_msg = Triples(
+                metadata=Metadata(
+                    id=doc_uri,
+                    metadata=[],
+                    user=processing.user,
+                    collection=processing.collection,
+                ),
+                triples=all_triples,
+            )
+
+            await triples_pub.send(None, triples_msg)
+            logger.debug(f"Emitted {len(all_triples)} provenance triples for {document.id}")
+
+        finally:
+            await triples_pub.stop()
+
     async def load_document(self, document, processing, content):
 
         logger.debug("Ready for document processing...")
@@ -301,6 +367,12 @@ class Processor(AsyncProcessor):
         q = flow["interfaces"][kind]
 
+        # Emit document provenance to knowledge graph
+        if "triples-store" in flow["interfaces"]:
+            await self.emit_document_provenance(
+                document, processing, flow["interfaces"]["triples-store"]
+            )
+
         if kind == "text-load":
             # For large text documents, send document_id for streaming retrieval
             if len(content) >= self.STREAMING_THRESHOLD:
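
The `derived_entity_triples` helper called in the page-decoder hunk above is not shown in this diff. The sketch below illustrates what such a helper might emit and how full provenance can be recovered by walking parent links up the DAG, as the spec describes. This is a standalone illustration, not the codebase's implementation: the `Triple` dataclass and all vocabulary IRIs (`DERIVED_FROM`, `SUBJECT_OF`, etc.) are assumed stand-ins for the real schema classes and the constants defined in the `provenance` module.

```python
from dataclasses import dataclass

# Stand-in for the codebase's Triple/Term schema classes (assumption).
@dataclass(frozen=True)
class Triple:
    s: str
    p: str
    o: str

# Hypothetical vocabulary IRIs; the real provenance module defines its own.
DERIVED_FROM = "http://www.w3.org/ns/prov#wasDerivedFrom"
LABEL = "http://www.w3.org/2000/01/rdf-schema#label"
COMPONENT = "https://trustgraph.ai/prov/component"
COMPONENT_VERSION = "https://trustgraph.ai/prov/componentVersion"
PAGE_NUMBER = "https://trustgraph.ai/prov/pageNumber"
SUBJECT_OF = "https://trustgraph.ai/prov/subjectOf"


def derived_entity_triples(entity_uri, parent_uri, component_name,
                           component_version, label, page_number=None):
    """Link a derived artifact (page, chunk) to its parent in the DAG,
    recording which component produced it."""
    triples = [
        Triple(entity_uri, DERIVED_FROM, parent_uri),
        Triple(entity_uri, LABEL, label),
        Triple(entity_uri, COMPONENT, component_name),
        Triple(entity_uri, COMPONENT_VERSION, component_version),
    ]
    if page_number is not None:
        triples.append(Triple(entity_uri, PAGE_NUMBER, str(page_number)))
    return triples


def provenance_chain(start_uri, triples):
    """Recover full provenance for a fact by walking subjectOf /
    wasDerivedFrom edges up to the top-level source document."""
    index = {(t.s, t.p): t.o for t in triples}
    chain, current = [start_uri], start_uri
    while True:
        parent = (index.get((current, SUBJECT_OF))
                  or index.get((current, DERIVED_FROM)))
        if parent is None:
            return chain  # reached the top-level document
        chain.append(parent)
        current = parent
```

With this shape, a fact linked to a chunk via `subjectOf` resolves to `fact → chunk → page → document`, matching the traversal the desired-state section calls for.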