Commit graph

41 commits

Author SHA1 Message Date
cybermaggedon
89cabee1b4
release/v2.4 -> master (#844) 2026-04-22 15:19:57 +01:00
Het Patel
adea976203 feat: implement retry logic and exponential backoff for S3 operations (#829)
* feat: implement retry logic and exponential backoff for S3 operations

* test: fix librarian mocks after BlobStore async conversion
2026-04-18 12:05:37 +01:00
cybermaggedon
3505bfdd25
refactor: use one fanout exchange per topic instead of shared topic exchange (#827)
The RabbitMQ backend used a single topic exchange per topicspace
with routing keys to differentiate logical topics. This meant the
flow service had to manually create named queues for every
processor-topic pair, including producer-side topics — creating
phantom queues that accumulated unread message copies indefinitely.

Replace with one fanout exchange per logical topic. Consumers now
declare and bind their own queues on connect. The flow service
manages topic lifecycle (create/delete exchanges) rather than queue
lifecycle, and only collects unique topic identifiers instead of
per-processor (topic, subscription) pairs.

Backend API: create_queue/delete_queue/ensure_queue replaced with
create_topic/delete_topic/ensure_topic (subscription parameter
removed).
2026-04-17 18:01:35 +01:00
cybermaggedon
9f84891fcc
Flow service lifecycle management (#822)
feat: separate flow service from config service with explicit queue
lifecycle management

The flow service is now an independent service that owns the lifecycle
of flow and blueprint queues. System services own their own queues.
Consumers never create queues.

Flow service separation:
- New service at trustgraph-flow/trustgraph/flow/service/
- Uses async ConfigClient (RequestResponse pattern) to talk to config
  service
- Config service stripped of all flow handling

Queue lifecycle management:
- PubSubBackend protocol gains create_queue, delete_queue,
  queue_exists, ensure_queue — all async
- RabbitMQ: implements via pika with asyncio.to_thread internally
- Pulsar: stubs for future admin REST API implementation
- Consumer _connect() no longer creates queues (passive=True for named
  queues)
- System services call ensure_queue on startup
- Flow service creates queues on flow start, deletes on flow stop
- Flow service ensures queues for pre-existing flows on startup

Two-phase flow stop:
- Phase 1: set flow status to "stopping", delete processor config
  entries
- Phase 2: retry queue deletion, then delete flow record

Config restructure:
- active-flow config replaced with processor:{name} types
- Each processor has its own config type, each flow variant is a key
- Flow start/stop use batch put/delete — single config push per
  operation
- FlowProcessor subscribes to its own type only

Blueprint format:
- Processor entries split into topics and parameters dicts
- Flow interfaces use {"flow": "topic"} instead of bare strings
- Specs (ConsumerSpec, ProducerSpec, etc.) read from
  definition["topics"]

Tests updated
2026-04-16 17:19:39 +01:00
cybermaggedon
c20e6540ec
Subscriber resilience and RabbitMQ fixes (#765)
Subscriber resilience: recreate consumer after connection failure

- Move consumer creation from Subscriber.start() into the run() loop,
  matching the pattern used by Consumer. If the connection drops and the
  consumer is closed in the finally block, the loop now recreates it on
  the next iteration instead of spinning forever on a None consumer.

Consumer thread safety:
- Dedicated ThreadPoolExecutor per consumer so all pika operations
  (create, receive, acknowledge, negative_acknowledge) run on the
  same thread — pika BlockingConnection is not thread-safe
- Applies to both Consumer and Subscriber classes

Config handler type audit — fix four mismatched type registrations:
- librarian: was ["librarian"] (non-existent type), now ["flow",
  "active-flow"] (matches config["flow"] that the handler reads)
- cores/service: was ["kg-core"], now ["flow"] (reads
  config["flow"])
- metering/counter: was ["token-costs"], now ["token-cost"]
  (singular)
- agent/mcp_tool: was ["mcp-tool"], now ["mcp"] (reads
  config["mcp"])

Update tests
2026-04-07 14:51:14 +01:00
cybermaggedon
4acd853023
Config push notify pattern: replace stateful pub/sub with signal+ fetch (#760)
Replace the config push mechanism that broadcast the full config
blob on a 'state' class pub/sub queue with a lightweight notify
signal containing only the version number and affected config
types. Processors fetch the full config via request/response from
the config service when notified.

This eliminates the need for the pub/sub 'state' queue class and
stateful pub/sub services entirely. The config push queue moves
from 'state' to 'flow' class — a simple transient signal rather
than a retained message.  This solves the RabbitMQ
late-subscriber problem where restarting processes never received
the current config because their fresh queue had no historical
messages.

Key changes:
- ConfigPush schema: config dict replaced with types list
- Subscribe-then-fetch startup with retry: processors subscribe
  to notify queue, fetch config via request/response, then
  process buffered notifies with version comparison to avoid race
  conditions
- register_config_handler() accepts optional types parameter so
  handlers only fire when their config types change
- Short-lived config request/response clients to avoid subscriber
  contention on non-persistent response topics
- Config service passes affected types through put/delete/flow
  operations
- Gateway ConfigReceiver rewritten with same notify pattern and
  retry loop

Tests updated

New tests:
- register_config_handler: without types, with types, multiple
  types, multiple handlers
- on_config_notify: old/same version skipped, irrelevant types
  skipped (version still updated), relevant type triggers fetch,
  handler without types always called, mixed handler filtering,
  empty types invokes all, fetch failure handled gracefully
- fetch_config: returns config+version, raises on error response,
  stops client even on exception
- fetch_and_apply_config: applies to all handlers on startup,
  retries on failure
2026-04-06 16:57:27 +01:00
cybermaggedon
5c6fe90fe2
Add universal document decoder with multi-format support (#705)
Add universal document decoder with multi-format support
using 'unstructured'.

New universal decoder service powered by the unstructured
library, handling DOCX, XLSX, PPTX, HTML, Markdown, CSV, RTF,
ODT, EPUB and more through a single service. Tables are preserved
as HTML markup for better downstream extraction. Images are
stored in the librarian but excluded from the text
pipeline. Configurable section grouping strategies
(whole-document, heading, element-type, count, size) for non-page
formats. Page-based formats (PDF, PPTX, XLSX) are automatically
grouped by page.

All four decoders (PDF, Mistral OCR, Tesseract OCR, universal)
now share the "document-decoder" ident so they are
interchangeable.  PDF-only decoders fetch document metadata to
check MIME type and gracefully skip unsupported formats.

Librarian changes: removed MIME type whitelist validation so any
document format can be ingested. Simplified routing so text/plain
goes to text-load and everything else goes to document-load.
Removed dual inline/streaming data paths — documents always use
document_id for content retrieval.

New provenance entity types (tg:Section, tg:Image) and metadata
predicates (tg:elementTypes, tg:tableCount, tg:imageCount) for
richer explainability.

Universal decoder is in its own package (trustgraph-unstructured)
and container image (trustgraph-unstructured).
2026-03-23 12:56:35 +00:00
cybermaggedon
286f762369
The id field in pipeline Metadata was being overwritten at each processing (#686)
The id field in pipeline Metadata was being overwritten at each processing
stage (document → page → chunk), causing knowledge storage to create
separate cores per chunk instead of grouping by document.

Add a root field that:
- Is set by librarian to the original document ID
- Is copied unchanged through PDF decoder, chunkers, and extractors
- Is used by knowledge storage for document_id grouping (with fallback to id)

Changes:
- Add root field to Metadata schema with empty string default
- Set root=document.id in librarian when initiating document processing
- Copy root through PDF decoder, recursive chunker, and all extractors
- Update knowledge storage to use root (or id as fallback) for grouping
- Add root handling to translators and gateway serialization
- Update test mock Metadata class to include root parameter
2026-03-11 12:16:39 +00:00
cybermaggedon
aa4f5c6c00
Remove redundant metadata (#685)
The metadata field (list of triples) in the pipeline Metadata class
was redundant. Document metadata triples already flow directly from
librarian to triple-store via emit_document_provenance() - they don't
need to pass through the extraction pipeline.

Additionally, chunker and PDF decoder were overwriting metadata to []
anyway, so any metadata passed through the pipeline was being
discarded.

Changes:
- Remove metadata field from Metadata dataclass
  (schema/core/metadata.py)
- Update all Metadata instantiations to remove metadata=[]
  parameter
- Remove metadata handling from translators (document_loading,
  knowledge)
- Remove metadata consumption from extractors (ontology, agent)
- Update gateway serializers and import handlers
- Update all unit, integration, and contract tests
2026-03-11 10:51:39 +00:00
cybermaggedon
ca525b679f
Filter answers from document lists (#683) 2026-03-10 17:22:19 +00:00
cybermaggedon
3c3e11bef5
Fix/librarian broken (#674)
* Set end-of-stream cleanly - clean streaming message structures

* Add tg-get-document-content
2026-03-09 13:36:24 +00:00
cybermaggedon
df1808768d
Fix/doc streaming proto (#673)
* Librarian streaming doc download

* Document stream download endpoint
2026-03-09 12:36:10 +00:00
cybermaggedon
cd5580be59
Extract-time provenance (#661)
1. Shared Provenance Module - URI generators, namespace constants,
   triple builders, vocabulary bootstrap
2. Librarian - Emits document metadata to graph on processing
   initiation (vocabulary bootstrap + PROV-O triples)
3. PDF Extractor - Saves pages as child documents, emits parent-child
   provenance edges, forwards page IDs
4. Chunker - Saves chunks as child documents, emits provenance edges,
   forwards chunk ID + content
5. Knowledge Extractors (both definitions and relationships):
   - Link entities to chunks via SUBJECT_OF (not top-level document)
   - Removed duplicate metadata emission (now handled by librarian)
   - Get chunk_doc_id and chunk_uri from incoming Chunk message
6. Embedding Provenance:
   - EntityContext schema has chunk_id field
   - EntityEmbeddings schema has chunk_id field
   - Definitions extractor sets chunk_id when creating EntityContext
   - Graph embeddings processor passes chunk_id through to
     EntityEmbeddings

Provenance Flow:
Document → Page (PDF) → Chunk → Extracted Facts/Embeddings
    ↓           ↓          ↓              ↓
  librarian  librarian  librarian    (chunk_id reference)
  + graph    + graph    + graph

Each artifact is stored in librarian with parent-child linking, and PROV-O
edges are emitted to the knowledge graph for full traceability from any
extracted fact back to its source document.

Also, updating tests
2026-03-05 18:36:10 +00:00
cybermaggedon
d8f0a576af
Document API updates (#660)
* Doc streaming from librarian

* Fix chunk minimum confusion

* Add CLI args
2026-03-05 15:20:45 +00:00
cybermaggedon
a630e143ef
Incremental / large document loading (#659)
Tech spec

BlobStore (trustgraph-flow/trustgraph/librarian/blob_store.py):
- get_stream() - yields document content in chunks for streaming retrieval
- create_multipart_upload() - initializes S3 multipart upload, returns
  upload_id
- upload_part() - uploads a single part, returns etag
- complete_multipart_upload() - finalizes upload with part etags
- abort_multipart_upload() - cancels and cleans up

Cassandra schema (trustgraph-flow/trustgraph/tables/library.py):
- New upload_session table with 24-hour TTL
- Index on user for listing sessions
- Prepared statements for all operations
- Methods: create_upload_session(), get_upload_session(),
  update_upload_session_chunk(), delete_upload_session(),
  list_upload_sessions()

- Schema extended with UploadSession, UploadProgress, and new
  request/response fields
- Librarian methods: begin_upload, upload_chunk, complete_upload,
  abort_upload, get_upload_status, list_uploads
- Service routing for all new operations
- Python SDK with transparent chunked upload:
  - add_document() auto-switches to chunked for files > 10MB
  - Progress callback support (on_progress)
  - get_pending_uploads(), get_upload_status(), abort_upload(),
    resume_upload()

- Document table: Added parent_id and document_type columns with index
- Document schema (knowledge/document.py): Added document_id field for
  streaming retrieval
- Librarian operations:
  - add-child-document for extracted PDF pages
  - list-children to get child documents
  - stream-document for chunked content retrieval
  - Cascade delete removes children when parent is deleted
  - list-documents filters children by default
- PDF decoder (decoding/pdf/pdf_decoder.py): Updated to stream large
  documents from librarian API to temp file
- Librarian service (librarian/service.py): Sends document_id instead of
  content for large PDFs (>2MB)
- Deprecated tools (load_pdf.py, load_text.py): Added deprecation
  warnings directing users to tg-add-library-document +
  tg-start-library-processing

Remove load_pdf and load_text utils

Move chunker/librarian comms to base class

Updating tests
2026-03-04 16:57:58 +00:00
cybermaggedon
387afee7b7
Fix load-doc (#610) 2026-01-14 15:46:29 +00:00
cybermaggedon
7a5bf47959
Fix collection existence test logic (#597) 2026-01-05 16:31:26 +00:00
cybermaggedon
25563bae3c
Change MinIO integration options in librarian to be more generic - to support a Garage integration (#594)
* Tweak object store parameters to be more generic for other S3-type store integration

* Update librarian to have region & SSL params

* Update MinIO migration tech spec
2025-12-27 18:01:51 +00:00
cybermaggedon
34eb083836
Messaging fabric plugins (#592)
* Plugin architecture for messaging fabric

* Schemas use a technology neutral expression

* Schemas strictness has uncovered some incorrect schema use which is fixed
2025-12-17 21:40:43 +00:00
cybermaggedon
39f6a8b940
Fix/queue configurations (#585)
* Fix config-svc startup dupe CLI args

* Fix missing params on collection service

* Fix collection management handling
2025-12-06 14:54:47 +00:00
cybermaggedon
7d07f802a8
Basic multitenant support (#583)
* Tech spec

* Address multi-tenant queue option problems in CLI

* Modified collection service to use config

* Changed storage management to use the config service definition
2025-12-05 21:45:30 +00:00
cybermaggedon
310a2deb06
Feature/streaming llm phase 1 (#566)
* Tidy up duplicate tech specs in doc directory

* Streaming LLM text-completion service tech spec.

* text-completion and prompt interfaces

* streaming change applied to all LLMs, so far tested with VertexAI

* Skip Pinecone unit tests, upstream module issue is affecting things, tests are passing again

* Added agent streaming, not working and has broken tests
2025-11-26 09:59:10 +00:00
cybermaggedon
943a9d83b0
Fix Minio incompatible library change (#565) 2025-11-25 22:45:48 +00:00
cybermaggedon
3e23d3c3ed
Fix collection management sync prob (#544)
* Address creation/deletion sync problems

* Fix object writer management

* Get Milvus to use ANN
2025-09-30 23:04:28 +01:00
cybermaggedon
52b133fc86
Collection delete pt. 3 (#542)
* Fixing collection deletion

* Fixing collection management param error

* Always test for collections

* Add Cassandra collection table

* Updated tech spec for explicit creation/deletion

* Remove implicit collection creation

* Fix up collection tracking in all processors
2025-09-30 16:02:33 +01:00
cybermaggedon
fcd15d1833
Collection management part 2 (#522)
* Plumb collection manager into librarian

* Test end-to-end
2025-09-19 16:08:47 +01:00
cybermaggedon
13ff7d765d
Collection management (#520)
* Tech spec

* Refactored Cassanda knowledge graph for single table

* Collection management, librarian services to manage metadata and collection deletion
2025-09-18 15:57:52 +01:00
cybermaggedon
c078ca45cd
Refactor more Cassandra stuff to use the helper (#490) 2025-09-04 12:54:58 +01:00
cybermaggedon
85e669c763
Fixing more Cassandra consistency issues (#488)
* Fixing more Cassandra work

* Fix tests
2025-09-04 00:58:11 +01:00
cybermaggedon
6c681967ab
Validate librarian collection (#453) 2025-08-07 21:36:24 +01:00
cybermaggedon
dd70aade11
Implement logging strategy (#444)
* Logging strategy and convert all prints() to logging invocations
2025-07-30 23:18:38 +01:00
cybermaggedon
807c19fd22
knowledge service (#367)
* Write knowledge core elements to Cassandra

* Store service works, building management service

* kg-manager
2025-05-06 23:44:10 +01:00
cybermaggedon
54e475fa3a
Sample docs loader (#365) 2025-05-06 13:43:17 +01:00
cybermaggedon
8146f0f2ff
Librarian doc submission (#362) 2025-05-04 22:56:47 +01:00
cybermaggedon
ff28d26f4d
Feature/flow librarian (#361)
* Update librarian to new API

* Implementing new schema with document + processing objects
2025-05-04 22:26:19 +01:00
cybermaggedon
a9197d11ee
Feature/configure flows (#345)
- Keeps processing in different flows separate so that data can go to different stores / collections etc.
- Potentially supports different processing flows
- Tidies the processing API with common base-classes for e.g. LLMs, and automatic configuration of 'clients' to use the right queue names in a flow
2025-04-22 20:21:38 +01:00
Tyler Oliver
fe422b2b95
Add support for Cassandra auth with SSL check (#318)
Following recommended approach in Datastax documenation I've added the necessary TLS/SSL check

https://docs.datastax.com/en/developer/python-driver/3.17/security/index.html
2025-03-20 22:25:23 +00:00
cybermaggedon
f1559c5944
Feature/librarian (#310)
* Add fields to library schema

* Added list function, incomplete

* Librarian list operation
2025-03-11 16:52:59 +00:00
cybermaggedon
f7df2df266
Feature/librarian (#307)
* Bring QDrant up-to-date

* Tables for data from queue outputs

- Pass single Pulsar client to everything in gateway & librarian
- Pulsar listener-name support in gateway
- PDF and text load working in librarian

* Complete Cassandra schema

* Add librarian support to templates
2025-02-12 23:39:24 +00:00
cybermaggedon
f350abb415
Maint/asyncio (#305)
* Move to asyncio services, even though everything is largely sync
2025-02-11 23:24:46 +00:00
cybermaggedon
a0bf2362f6
Librarian (#304) 2025-02-11 16:01:03 +00:00