From 71517e6417ec3f658eb46b381576f9dcc8bdc580 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Mon, 18 May 2026 09:46:58 +0100 Subject: [PATCH] release/v2.4 -> master (#932) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * CLI auth migration, document embeddings core lifecycle (#913) Migrate get_kg_core and put_kg_core CLI tools to use Api/SocketClient with first-frame auth (fixes broken raw websocket path). Fix wire format field names (root/vector). Remove ~600 lines of dead raw websocket code from invoke_graph_rag.py. Add document embeddings core lifecycle to the knowledge service: list/get/put/delete/load operations across schema, translator, Cassandra table store, knowledge manager, gateway registry, REST API, socket client, and CLI (tg-get-de-core, tg-put-de-core). Fix delete_kg_core to also clean up document embeddings rows. * Remove spurious workspace parameter from SPARQL algebra evaluator (#915) Fix threading of workspace paramater: - The SPARQL algebra evaluator was threading a workspace parameter through every function and passing it to TriplesClient.query(), which doesn't accept it. Workspace isolation is handled by pub/sub topic routing — the TriplesClient is already scoped to a workspace-specific flow, same as GraphRAG. Passing workspace explicitly was both incorrect and unnecessary. Update tests: - tests/unit/test_query/test_sparql_algebra.py (new) — Tests _query_pattern, _eval_bgp, and evaluate() with various algebra nodes. Key tests assert workspace is never in tc.query() kwargs, plus correctness tests for BGP, JOIN, UNION, SLICE, DISTINCT, and edge cases. - tests/unit/test_retrieval/test_graph_rag.py — Added test_triples_query_never_passes_workspace (checks query()) and test_follow_edges_never_passes_workspace (checks query_stream()). * Make all Cassandra and Qdrant I/O async-safe with proper concurrency controls (#916) Cassandra triples services were using syncronous EntityCentricKnowledgeGraph methods from async contexts, and connection state was managed with threading.local which is wrong for asyncio coroutines sharing a single thread. Qdrant services had no async wrapping at all, blocking the event loop on every network call. Rows services had unprotected shared state mutations across concurrent coroutines. - Add async methods to EntityCentricKnowledgeGraph (async_insert, async_get_s/p/o/sp/po/os/spo/all, async_collection_exists, async_create_collection, async_delete_collection) using the existing cassandra_async.async_execute bridge - Rewrite triples write + query services: replace threading.local with asyncio.Lock + dict cache for per-workspace connections, use async ECKG methods for all data operations, keep asyncio.to_thread only for one-time blocking ECKG construction - Wrap all Qdrant calls in asyncio.to_thread across all 6 services (doc/graph/row embeddings write + query), add asyncio.Lock + set cache for collection existence checks - Add asyncio.Lock to rows write + query services to protect shared state (schemas, sessions, config caches) from concurrent mutation - Update all affected tests to match new async patterns * Fixed error only returning a page of results (#921) The root cause: async_execute only materialises the first result page (by design — it says so in its docstring). The streaming query set fetch_size=20 and expected to iterate all results, but only got the first 20 rows back. The fix uses asyncio.to_thread(lambda: list(tg.session.execute(...))) which lets the sync driver iterate all pages in a worker thread — exactly what the pre-async code did. * Optional test warning suppression (#923) * Fix test collection module errors & silence upstream Pytest warnings (#823) * chore: add virtual environment and .env directories to gitignore * test: filter upstream DeprecationWarning and UserWarning messages * fix(namespace): remove empty __init__.py files to fix PEP 420 implicit namespace routing for trustgraph sub-packages * Revert __init__.py deletions * Add .ini changes but commented out, will be useful at times --------- Co-authored-by: Salil M * fix(openai): fail fast on unrecoverable RateLimitError codes (#901) (#904) (#925) Co-authored-by: Sahil Yadav * Ensure retry exception is properly raised (#926) * fix: library API get/update document round-trip bugs (#893) (#928) Fix 5 cascading bugs in the Library API wrapper that prevented the get_documents → update_document round-trip from working: - Tolerate missing title field in document metadata (use .get()) - Use attribute access on Triple objects instead of subscript - Serialize datetime to int seconds for JSON compatibility - Handle empty server response on successful update - Send both id and document-id keys in update request Added library API tests * Fix ontology selector defaults, add bypass mode, enforce domain/range (#929) - Align similarity_threshold default to 0.3 everywhere (class signature had stale 0.7). Fix matching contradiction in tech-spec. - Add bypass_selector_below parameter (default 5) to skip vector similarity selection when ontology element count is small enough. - Enforce domain/range constraints in TripleConverter for object properties and datatype properties, with subclass hierarchy support. Properties with no declared domain/range pass through unchanged. - Add unit tests for domain/range validation, subclass acceptance, polymorphic pass-through, and selector bypass. Fixes #908, #920 * Close producers on flow stop to prevent stale non-persistent topics (#930) Flow.stop() only stopped consumers, leaving response producers connected to non-persistent Pulsar topics. After flow restart, the orphaned producers held stale broker routing state, causing response messages to never reach new consumers — manifesting as 120s timeouts on document-embeddings and similar RPC paths. Fix: Flow.stop() now explicitly stops all producers. Producer.stop() closes the underlying Pulsar producer connection rather than just setting a flag. Fixes #906 * fix(gateway): propagate --timeout flag to per-service dispatchers (#931) The api-gateway accepts a --timeout flag (default 600s) but the value was not propagated into DispatcherManager, which hard-coded timeout=120 for every per-service dispatcher (graph-rag, document-rag, text-completion, embeddings, librarian, etc.). This meant any synchronous request taking more than 120 seconds would always return a Timeout error at the 120s mark, regardless of the --timeout value set on the gateway. Changes: - Add timeout parameter to DispatcherManager.__init__ (default: 120 for backward compatibility) - Store self.timeout in DispatcherManager - Replace both hardcoded timeout=120 with self.timeout in invoke_global_service and invoke_flow_service - Pass self.timeout from Api to DispatcherManager in service.py - Document the timeout parameter in the docstring Fixes #894 --------- Co-authored-by: Salil M Co-authored-by: Sahil Yadav Co-authored-by: Mister Lobster --- docs/tech-specs/ontorag.md | 2 +- tests/unit/test_api/test_library_api.py | 296 +++++++++++++ .../test_triple_converter_validation.py | 389 ++++++++++++++++++ trustgraph-base/trustgraph/api/library.py | 27 +- trustgraph-base/trustgraph/base/flow.py | 2 + trustgraph-base/trustgraph/base/producer.py | 3 + .../trustgraph/extract/kg/ontology/extract.py | 11 +- .../extract/kg/ontology/ontology_selector.py | 52 ++- .../extract/kg/ontology/triple_converter.py | 60 ++- .../trustgraph/gateway/dispatch/manager.py | 14 +- trustgraph-flow/trustgraph/gateway/service.py | 1 + .../model/text_completion/openai/llm.py | 21 +- 12 files changed, 849 insertions(+), 29 deletions(-) create mode 100644 tests/unit/test_api/test_library_api.py create mode 100644 tests/unit/test_extract/test_ontology/test_triple_converter_validation.py diff --git a/docs/tech-specs/ontorag.md b/docs/tech-specs/ontorag.md index 86a3cd19..460e72ba 100644 --- a/docs/tech-specs/ontorag.md +++ b/docs/tech-specs/ontorag.md @@ -278,7 +278,7 @@ The system uses **FAISS (Facebook AI Similarity Search)** with IndexFlatIP for e 3. **Similarity Search**: - For each text segment embedding, search the vector store - Retrieve top-k (e.g., 10) most similar ontology elements - - Apply similarity threshold (e.g., 0.7) to filter weak matches + - Apply similarity threshold (e.g., 0.3) to filter weak matches - Aggregate results across all segments, tracking match frequencies 4. **Dependency Resolution**: diff --git a/tests/unit/test_api/test_library_api.py b/tests/unit/test_api/test_library_api.py new file mode 100644 index 00000000..086ecd63 --- /dev/null +++ b/tests/unit/test_api/test_library_api.py @@ -0,0 +1,296 @@ +""" +Tests for the Library API wrapper round-trip behavior. +Covers the get_documents → update_document path and edge cases +from issue #893. +""" + +import datetime +import pytest +from unittest.mock import MagicMock, patch + +from trustgraph.api.library import Library, to_value, from_value +from trustgraph.api.types import DocumentMetadata, Triple +from trustgraph.knowledge import Uri, Literal + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_library(response=None): + api = MagicMock() + api.workspace = "default" + api.request.return_value = response or {} + lib = Library(api) + return lib, api + + +def _wire_triple(s_iri, p_iri, o_val): + return { + "s": {"t": "i", "i": s_iri}, + "p": {"t": "i", "i": p_iri}, + "o": {"t": "l", "v": o_val}, + } + + +def _doc_wire(id="doc-1", time=1700000000, title="Test Doc", + kind="text/plain", comments="", tags=None, + metadata=None, parent_id="", document_type="source", + include_title=True): + doc = { + "id": id, + "time": time, + "kind": kind, + "comments": comments, + "metadata": metadata or [], + "tags": tags or [], + "parent-id": parent_id, + "document-type": document_type, + } + if include_title: + doc["title"] = title + return doc + + +# --------------------------------------------------------------------------- +# Bug 1: get_documents tolerates missing title +# --------------------------------------------------------------------------- + +class TestGetDocumentsMissingTitle: + + def test_missing_title_defaults_to_empty(self): + doc = _doc_wire(include_title=False) + lib, api = _make_library({"document-metadatas": [doc]}) + + result = lib.get_documents() + + assert len(result) == 1 + assert result[0].title == "" + + def test_present_title_preserved(self): + doc = _doc_wire(title="My Title") + lib, api = _make_library({"document-metadatas": [doc]}) + + result = lib.get_documents() + + assert result[0].title == "My Title" + + +# --------------------------------------------------------------------------- +# Bug 2: update_document handles Triple objects (attribute access) +# --------------------------------------------------------------------------- + +class TestUpdateDocumentTripleAccess: + + def test_triple_objects_serialized_correctly(self): + lib, api = _make_library({}) + + metadata = DocumentMetadata( + id="doc-1", + time=datetime.datetime.fromtimestamp(1700000000), + kind="text/plain", + title="Test", + comments="", + metadata=[ + Triple( + s=Uri("http://example.org/entity/alice"), + p=Uri("http://example.org/rel/knows"), + o=Literal("Bob"), + ), + ], + tags=["test"], + ) + + lib.update_document(id="doc-1", metadata=metadata) + + call_args = api.request.call_args[0][1] + triples = call_args["document-metadata"]["metadata"] + + assert len(triples) == 1 + assert triples[0]["s"]["i"] == "http://example.org/entity/alice" + assert triples[0]["p"]["i"] == "http://example.org/rel/knows" + assert triples[0]["o"]["v"] == "Bob" + + def test_empty_metadata_list(self): + lib, api = _make_library({}) + + metadata = DocumentMetadata( + id="doc-1", + time=datetime.datetime.fromtimestamp(1700000000), + kind="text/plain", + title="Test", + comments="", + metadata=[], + tags=[], + ) + + lib.update_document(id="doc-1", metadata=metadata) + + call_args = api.request.call_args[0][1] + assert call_args["document-metadata"]["metadata"] == [] + + +# --------------------------------------------------------------------------- +# Bug 3: update_document serializes datetime to int seconds +# --------------------------------------------------------------------------- + +class TestUpdateDocumentTimeSerialization: + + def test_datetime_serialized_to_int(self): + lib, api = _make_library({}) + + ts = 1700000000 + metadata = DocumentMetadata( + id="doc-1", + time=datetime.datetime.fromtimestamp(ts), + kind="text/plain", + title="Test", + comments="", + metadata=[], + tags=[], + ) + + lib.update_document(id="doc-1", metadata=metadata) + + call_args = api.request.call_args[0][1] + wire_time = call_args["document-metadata"]["time"] + + assert isinstance(wire_time, int) + assert wire_time == ts + + def test_int_time_passed_through(self): + lib, api = _make_library({}) + + metadata = DocumentMetadata( + id="doc-1", + time=1700000000, + kind="text/plain", + title="Test", + comments="", + metadata=[], + tags=[], + ) + + lib.update_document(id="doc-1", metadata=metadata) + + call_args = api.request.call_args[0][1] + assert call_args["document-metadata"]["time"] == 1700000000 + + +# --------------------------------------------------------------------------- +# Bug 4: update_document handles empty server response +# --------------------------------------------------------------------------- + +class TestUpdateDocumentEmptyResponse: + + def test_empty_response_returns_input_metadata(self): + lib, api = _make_library({}) + + metadata = DocumentMetadata( + id="doc-1", + time=datetime.datetime.fromtimestamp(1700000000), + kind="text/plain", + title="Updated Title", + comments="notes", + metadata=[], + tags=["a"], + ) + + result = lib.update_document(id="doc-1", metadata=metadata) + + assert result is metadata + + def test_full_response_parsed(self): + response_doc = _doc_wire( + id="doc-1", title="Server Title", tags=["b"], + ) + lib, api = _make_library({"document-metadata": response_doc}) + + metadata = DocumentMetadata( + id="doc-1", + time=datetime.datetime.fromtimestamp(1700000000), + kind="text/plain", + title="Client Title", + comments="", + metadata=[], + tags=["a"], + ) + + result = lib.update_document(id="doc-1", metadata=metadata) + + assert result.title == "Server Title" + assert result.tags == ["b"] + + +# --------------------------------------------------------------------------- +# Bug 5: update_document sends both id and document-id +# --------------------------------------------------------------------------- + +class TestUpdateDocumentIdKeys: + + def test_both_id_keys_sent(self): + lib, api = _make_library({}) + + metadata = DocumentMetadata( + id="doc-1", + time=datetime.datetime.fromtimestamp(1700000000), + kind="text/plain", + title="Test", + comments="", + metadata=[], + tags=[], + ) + + lib.update_document(id="doc-1", metadata=metadata) + + call_args = api.request.call_args[0][1] + doc_meta = call_args["document-metadata"] + + assert doc_meta["id"] == "doc-1" + assert doc_meta["document-id"] == "doc-1" + + +# --------------------------------------------------------------------------- +# Round-trip: get_documents → update_document +# --------------------------------------------------------------------------- + +class TestGetUpdateRoundTrip: + + def test_full_round_trip(self): + wire_doc = _doc_wire( + id="doc-42", + title="Original", + tags=["v1"], + metadata=[_wire_triple( + "http://example.org/e/1", + "http://example.org/r/type", + "report", + )], + ) + + lib, api = _make_library({"document-metadatas": [wire_doc]}) + + docs = lib.get_documents() + assert len(docs) == 1 + + doc = docs[0] + doc.title = "Updated" + doc.tags.append("v2") + + # Server returns empty on update + api.request.return_value = {} + result = lib.update_document(id=doc.id, metadata=doc) + + # Should not raise, should return the input metadata + assert result.title == "Updated" + assert "v2" in result.tags + + # Verify the wire format sent + call_args = api.request.call_args[0][1] + doc_meta = call_args["document-metadata"] + + assert doc_meta["id"] == "doc-42" + assert doc_meta["title"] == "Updated" + assert isinstance(doc_meta["time"], int) + assert len(doc_meta["metadata"]) == 1 + assert doc_meta["metadata"][0]["o"]["v"] == "report" diff --git a/tests/unit/test_extract/test_ontology/test_triple_converter_validation.py b/tests/unit/test_extract/test_ontology/test_triple_converter_validation.py new file mode 100644 index 00000000..195e8adf --- /dev/null +++ b/tests/unit/test_extract/test_ontology/test_triple_converter_validation.py @@ -0,0 +1,389 @@ +""" +Tests for TripleConverter domain/range enforcement and +OntologySelector bypass for small ontologies. + +Covers fixes for #908 (bypass_selector_below) and #920 (domain/range validation). +""" + +import pytest +from unittest.mock import Mock, AsyncMock + +from trustgraph.extract.kg.ontology.triple_converter import TripleConverter +from trustgraph.extract.kg.ontology.ontology_selector import ( + OntologySelector, + OntologySubset, +) +from trustgraph.extract.kg.ontology.ontology_loader import ( + Ontology, + OntologyClass, + OntologyProperty, +) +from trustgraph.extract.kg.ontology.simplified_parser import ( + Relationship, + Attribute, +) +from trustgraph.extract.kg.ontology.text_processor import TextSegment + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def ontology_subset(): + """Ontology subset with classes, hierarchy, and constrained properties.""" + return OntologySubset( + ontology_id="test", + classes={ + "Person": { + "uri": "http://example.org/Person", + "type": "owl:Class", + "labels": [{"value": "Person"}], + "subclass_of": None, + }, + "Employee": { + "uri": "http://example.org/Employee", + "type": "owl:Class", + "labels": [{"value": "Employee"}], + "subclass_of": "Person", + }, + "Manager": { + "uri": "http://example.org/Manager", + "type": "owl:Class", + "labels": [{"value": "Manager"}], + "subclass_of": "Employee", + }, + "Company": { + "uri": "http://example.org/Company", + "type": "owl:Class", + "labels": [{"value": "Company"}], + "subclass_of": None, + }, + "Product": { + "uri": "http://example.org/Product", + "type": "owl:Class", + "labels": [{"value": "Product"}], + "subclass_of": None, + }, + }, + object_properties={ + "worksFor": { + "uri": "http://example.org/worksFor", + "type": "owl:ObjectProperty", + "labels": [{"value": "works for"}], + "domain": "Person", + "range": "Company", + }, + "manages": { + "uri": "http://example.org/manages", + "type": "owl:ObjectProperty", + "labels": [{"value": "manages"}], + "domain": "Manager", + "range": "Employee", + }, + "relatedTo": { + "uri": "http://example.org/relatedTo", + "type": "owl:ObjectProperty", + "labels": [{"value": "related to"}], + "domain": None, + "range": None, + }, + }, + datatype_properties={ + "employeeId": { + "uri": "http://example.org/employeeId", + "type": "owl:DatatypeProperty", + "labels": [{"value": "employee ID"}], + "domain": "Employee", + }, + "description": { + "uri": "http://example.org/description", + "type": "owl:DatatypeProperty", + "labels": [{"value": "description"}], + "domain": None, + }, + }, + metadata={"name": "Test Ontology"}, + ) + + +@pytest.fixture +def converter(ontology_subset): + return TripleConverter(ontology_subset=ontology_subset, ontology_id="test") + + +# --------------------------------------------------------------------------- +# Domain/range enforcement — relationships +# --------------------------------------------------------------------------- + +class TestRelationshipDomainRange: + + def test_valid_domain_and_range(self, converter): + rel = Relationship( + subject="Alice", subject_type="Person", + relation="worksFor", + object="Acme Corp", object_type="Company", + ) + triple = converter.convert_relationship(rel) + assert triple is not None + + def test_domain_violation_rejected(self, converter): + rel = Relationship( + subject="Widget", subject_type="Product", + relation="worksFor", + object="Acme Corp", object_type="Company", + ) + assert converter.convert_relationship(rel) is None + + def test_range_violation_rejected(self, converter): + rel = Relationship( + subject="Alice", subject_type="Person", + relation="worksFor", + object="Widget", object_type="Product", + ) + assert converter.convert_relationship(rel) is None + + def test_both_domain_and_range_violated(self, converter): + rel = Relationship( + subject="Widget", subject_type="Product", + relation="worksFor", + object="Gadget", object_type="Product", + ) + assert converter.convert_relationship(rel) is None + + +# --------------------------------------------------------------------------- +# Subclass acceptance +# --------------------------------------------------------------------------- + +class TestSubclassAcceptance: + + def test_direct_subclass_matches_domain(self, converter): + """Employee is subclass of Person; worksFor domain is Person.""" + rel = Relationship( + subject="Bob", subject_type="Employee", + relation="worksFor", + object="Acme Corp", object_type="Company", + ) + assert converter.convert_relationship(rel) is not None + + def test_transitive_subclass_matches_domain(self, converter): + """Manager → Employee → Person; worksFor domain is Person.""" + rel = Relationship( + subject="Carol", subject_type="Manager", + relation="worksFor", + object="Acme Corp", object_type="Company", + ) + assert converter.convert_relationship(rel) is not None + + def test_subclass_matches_range(self, converter): + """manages range is Employee; Manager is subclass of Employee.""" + rel = Relationship( + subject="Carol", subject_type="Manager", + relation="manages", + object="Dave", object_type="Manager", + ) + assert converter.convert_relationship(rel) is not None + + def test_superclass_does_not_match_subclass_constraint(self, converter): + """manages domain is Manager; Person is NOT a subclass of Manager.""" + rel = Relationship( + subject="Alice", subject_type="Person", + relation="manages", + object="Bob", object_type="Employee", + ) + assert converter.convert_relationship(rel) is None + + +# --------------------------------------------------------------------------- +# Polymorphic properties (no domain/range) +# --------------------------------------------------------------------------- + +class TestPolymorphicProperties: + + def test_no_domain_no_range_allows_anything(self, converter): + rel = Relationship( + subject="Alice", subject_type="Person", + relation="relatedTo", + object="Acme Corp", object_type="Company", + ) + assert converter.convert_relationship(rel) is not None + + def test_polymorphic_with_unrelated_types(self, converter): + rel = Relationship( + subject="Widget", subject_type="Product", + relation="relatedTo", + object="Bob", object_type="Employee", + ) + assert converter.convert_relationship(rel) is not None + + +# --------------------------------------------------------------------------- +# Datatype property domain enforcement +# --------------------------------------------------------------------------- + +class TestAttributeDomainValidation: + + def test_valid_domain(self, converter): + attr = Attribute( + entity="Bob", entity_type="Employee", + attribute="employeeId", value="E-1234", + ) + assert converter.convert_attribute(attr) is not None + + def test_subclass_matches_domain(self, converter): + """Manager is subclass of Employee; employeeId domain is Employee.""" + attr = Attribute( + entity="Carol", entity_type="Manager", + attribute="employeeId", value="M-5678", + ) + assert converter.convert_attribute(attr) is not None + + def test_domain_violation_rejected(self, converter): + attr = Attribute( + entity="Acme Corp", entity_type="Company", + attribute="employeeId", value="E-0000", + ) + assert converter.convert_attribute(attr) is None + + def test_no_domain_allows_anything(self, converter): + attr = Attribute( + entity="Widget", entity_type="Product", + attribute="description", value="A useful widget", + ) + assert converter.convert_attribute(attr) is not None + + +# --------------------------------------------------------------------------- +# OntologySelector bypass for small ontologies (#908) +# --------------------------------------------------------------------------- + +def _make_ontology(n_classes, n_obj_props=0, n_dt_props=0): + classes = { + f"C{i}": OntologyClass(uri=f"http://example.org/C{i}") + for i in range(n_classes) + } + obj_props = { + f"op{i}": OntologyProperty( + uri=f"http://example.org/op{i}", type="owl:ObjectProperty" + ) + for i in range(n_obj_props) + } + dt_props = { + f"dp{i}": OntologyProperty( + uri=f"http://example.org/dp{i}", type="owl:DatatypeProperty" + ) + for i in range(n_dt_props) + } + return Ontology( + id="tiny", + metadata={"name": "Tiny"}, + classes=classes, + object_properties=obj_props, + datatype_properties=dt_props, + ) + + +def _make_loader(ontology): + loader = Mock() + loader.get_ontology.return_value = ontology + loader.get_all_ontologies.return_value = {"tiny": ontology} + return loader + + +class TestBypassSelectorBelow: + + async def test_bypass_returns_full_ontology(self): + """With 3 elements and bypass_selector_below=5, selector is bypassed.""" + ont = _make_ontology(2, 1, 0) + loader = _make_loader(ont) + embedder = Mock() + + selector = OntologySelector( + ontology_embedder=embedder, + ontology_loader=loader, + bypass_selector_below=5, + ) + + segments = [TextSegment(text="some text", type="sentence", position=0)] + subsets = await selector.select_ontology_subset(segments) + + assert len(subsets) == 1 + assert subsets[0].ontology_id == "tiny" + assert len(subsets[0].classes) == 2 + assert len(subsets[0].object_properties) == 1 + assert subsets[0].relevance_score == 1.0 + # Embedder should never be called + embedder.embed_text.assert_not_called() + + async def test_no_bypass_when_above_threshold(self): + """With 10 elements and bypass_selector_below=5, selector runs normally.""" + ont = _make_ontology(6, 3, 1) + loader = _make_loader(ont) + + embedder = Mock() + embedder.embed_text = AsyncMock(return_value=[0.1, 0.2]) + vector_store = Mock() + vector_store.size.return_value = 10 + vector_store.search.return_value = [] + embedder.get_vector_store.return_value = vector_store + + selector = OntologySelector( + ontology_embedder=embedder, + ontology_loader=loader, + bypass_selector_below=5, + ) + + segments = [TextSegment(text="some text", type="sentence", position=0)] + subsets = await selector.select_ontology_subset(segments) + + # Vector store was consulted (selector ran normally) + vector_store.size.assert_called_once() + + async def test_bypass_at_exact_threshold_not_triggered(self): + """With exactly 5 elements and bypass_selector_below=5, selector runs (< not <=).""" + ont = _make_ontology(3, 1, 1) # total = 5 + loader = _make_loader(ont) + + embedder = Mock() + embedder.embed_text = AsyncMock(return_value=[0.1, 0.2]) + vector_store = Mock() + vector_store.size.return_value = 5 + vector_store.search.return_value = [] + embedder.get_vector_store.return_value = vector_store + + selector = OntologySelector( + ontology_embedder=embedder, + ontology_loader=loader, + bypass_selector_below=5, + ) + + segments = [TextSegment(text="some text", type="sentence", position=0)] + subsets = await selector.select_ontology_subset(segments) + + # Should NOT bypass — 5 is not < 5 + vector_store.size.assert_called_once() + + async def test_bypass_zero_disables(self): + """bypass_selector_below=0 means bypass never triggers.""" + ont = _make_ontology(0, 0, 0) # empty ontology + loader = _make_loader(ont) + + embedder = Mock() + embedder.embed_text = AsyncMock(return_value=[0.1]) + vector_store = Mock() + vector_store.size.return_value = 0 + vector_store.search.return_value = [] + embedder.get_vector_store.return_value = vector_store + + selector = OntologySelector( + ontology_embedder=embedder, + ontology_loader=loader, + bypass_selector_below=0, + ) + + segments = [TextSegment(text="some text", type="sentence", position=0)] + subsets = await selector.select_ontology_subset(segments) + + # 0 is not < 0, so bypass doesn't trigger + vector_store.size.assert_called_once() diff --git a/trustgraph-base/trustgraph/api/library.py b/trustgraph-base/trustgraph/api/library.py index 024e933d..b3506bb7 100644 --- a/trustgraph-base/trustgraph/api/library.py +++ b/trustgraph-base/trustgraph/api/library.py @@ -365,7 +365,7 @@ class Library: id = v["id"], time = datetime.datetime.fromtimestamp(v["time"]), kind = v["kind"], - title = v["title"], + title = v.get("title", ""), comments = v.get("comments", ""), metadata = [ Triple( @@ -482,14 +482,15 @@ class Library: "workspace": self.api.workspace, "document-metadata": { "document-id": id, - "time": metadata.time, + "id": id, + "time": int(metadata.time.timestamp()) if hasattr(metadata.time, 'timestamp') else metadata.time, "title": metadata.title, "comments": metadata.comments, "metadata": [ { - "s": from_value(t["s"]), - "p": from_value(t["p"]), - "o": from_value(t["o"]), + "s": from_value(t.s), + "p": from_value(t.p), + "o": from_value(t.o), } for t in metadata.metadata ], @@ -498,14 +499,17 @@ class Library: } object = self.request(input) - doc = object["document-metadata"] + doc = object.get("document-metadata") if isinstance(object, dict) else None + + if not doc: + return metadata try: - DocumentMetadata( + return DocumentMetadata( id = doc["id"], time = datetime.datetime.fromtimestamp(doc["time"]), kind = doc["kind"], - title = doc["title"], + title = doc.get("title", ""), comments = doc.get("comments", ""), metadata = [ Triple( @@ -513,10 +517,11 @@ class Library: p = to_value(w["p"]), o = to_value(w["o"]) ) - for w in doc["metadata"] + for w in doc.get("metadata", []) ], - workspace = doc.get("workspace", ""), - tags = doc["tags"] + tags = doc.get("tags", []), + parent_id = doc.get("parent-id", ""), + document_type = doc.get("document-type", "source"), ) except Exception as e: logger.error("Failed to parse document update response", exc_info=True) diff --git a/trustgraph-base/trustgraph/base/flow.py b/trustgraph-base/trustgraph/base/flow.py index 0f42bbe2..3b928d3e 100644 --- a/trustgraph-base/trustgraph/base/flow.py +++ b/trustgraph-base/trustgraph/base/flow.py @@ -34,6 +34,8 @@ class Flow: async def stop(self): for c in self.consumer.values(): await c.stop() + for p in self.producer.values(): + await p.stop() if self.librarian: await self.librarian.stop() diff --git a/trustgraph-base/trustgraph/base/producer.py b/trustgraph-base/trustgraph/base/producer.py index 20b4b0d6..9af9d22e 100644 --- a/trustgraph-base/trustgraph/base/producer.py +++ b/trustgraph-base/trustgraph/base/producer.py @@ -34,6 +34,9 @@ class Producer: async def stop(self): self.running = False + if self.producer: + self.producer.close() + self.producer = None async def send(self, msg, properties={}): diff --git a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py index 1d45d3f9..6a43e547 100644 --- a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py @@ -121,6 +121,7 @@ class Processor(FlowProcessor): # Configuration self.top_k = params.get("top_k", 10) self.similarity_threshold = params.get("similarity_threshold", 0.3) + self.bypass_selector_below = params.get("bypass_selector_below", 5) # Per-workspace ontology version tracking self.current_ontology_versions = {} # workspace -> version @@ -187,7 +188,8 @@ class Processor(FlowProcessor): ontology_embedder=ontology_embedder, ontology_loader=loader, top_k=self.top_k, - similarity_threshold=self.similarity_threshold + similarity_threshold=self.similarity_threshold, + bypass_selector_below=self.bypass_selector_below, ) # Store flow-specific components @@ -981,6 +983,13 @@ class Processor(FlowProcessor): default=0.3, help='Similarity threshold for ontology matching (default: 0.3, range: 0.0-1.0)' ) + parser.add_argument( + '--bypass-selector-below', + type=int, + default=5, + help='Bypass ontology selector when total ontology elements ' + '(classes + properties) is below this value (default: 5)' + ) parser.add_argument( '--triples-batch-size', type=int, diff --git a/trustgraph-flow/trustgraph/extract/kg/ontology/ontology_selector.py b/trustgraph-flow/trustgraph/extract/kg/ontology/ontology_selector.py index 5111529a..5fd60a0f 100644 --- a/trustgraph-flow/trustgraph/extract/kg/ontology/ontology_selector.py +++ b/trustgraph-flow/trustgraph/extract/kg/ontology/ontology_selector.py @@ -33,19 +33,44 @@ class OntologySelector: def __init__(self, ontology_embedder: OntologyEmbedder, ontology_loader: OntologyLoader, top_k: int = 10, - similarity_threshold: float = 0.7): - """Initialize the ontology selector. - - Args: - ontology_embedder: Embedder with vector store - ontology_loader: Loader with ontology definitions - top_k: Number of top results to retrieve per segment - similarity_threshold: Minimum similarity score - """ + similarity_threshold: float = 0.3, + bypass_selector_below: int = 5): self.embedder = ontology_embedder self.loader = ontology_loader self.top_k = top_k self.similarity_threshold = similarity_threshold + self.bypass_selector_below = bypass_selector_below + + def _total_ontology_elements(self) -> int: + total = 0 + for ontology in self.loader.get_all_ontologies().values(): + total += len(ontology.classes) + total += len(ontology.object_properties) + total += len(ontology.datatype_properties) + return total + + def _build_full_subsets(self) -> List[OntologySubset]: + subsets = [] + for ont_id, ontology in self.loader.get_all_ontologies().items(): + subset = OntologySubset( + ontology_id=ont_id, + classes={ + cid: cls.__dict__ + for cid, cls in ontology.classes.items() + }, + object_properties={ + pid: prop.__dict__ + for pid, prop in ontology.object_properties.items() + }, + datatype_properties={ + pid: prop.__dict__ + for pid, prop in ontology.datatype_properties.items() + }, + metadata=ontology.metadata, + relevance_score=1.0, + ) + subsets.append(subset) + return subsets async def select_ontology_subset(self, segments: List[TextSegment]) -> List[OntologySubset]: """Select relevant ontology subsets for text segments. @@ -56,6 +81,15 @@ class OntologySelector: Returns: List of ontology subsets with relevant elements """ + total = self._total_ontology_elements() + if total < self.bypass_selector_below: + logger.info( + f"Ontology has {total} elements (below " + f"bypass_selector_below={self.bypass_selector_below}), " + f"using full ontology" + ) + return self._build_full_subsets() + # Collect all relevant elements relevant_elements = await self._find_relevant_elements(segments) diff --git a/trustgraph-flow/trustgraph/extract/kg/ontology/triple_converter.py b/trustgraph-flow/trustgraph/extract/kg/ontology/triple_converter.py index 06fff4f4..d9e6c837 100644 --- a/trustgraph-flow/trustgraph/extract/kg/ontology/triple_converter.py +++ b/trustgraph-flow/trustgraph/extract/kg/ontology/triple_converter.py @@ -6,7 +6,7 @@ with full URIs and correct is_uri flags. """ import logging -from typing import List, Optional +from typing import List, Optional, Set from .... schema import Triple, Term, IRI, LITERAL from .... rdf import RDF_TYPE, RDF_LABEL @@ -32,6 +32,25 @@ class TripleConverter: self.ontology_id = ontology_id self.entity_registry = EntityRegistry(ontology_id) + def _get_ancestor_classes(self, class_id: str) -> Set[str]: + ancestors = set() + current = class_id + while current: + cls_def = self.ontology_subset.classes.get(current) + if not cls_def: + break + parent = cls_def.get("subclass_of") if isinstance(cls_def, dict) else getattr(cls_def, "subclass_of", None) + if not parent or parent in ancestors: + break + ancestors.add(parent) + current = parent + return ancestors + + def _matches_class_constraint(self, actual_type: str, expected_type: str) -> bool: + if actual_type == expected_type: + return True + return expected_type in self._get_ancestor_classes(actual_type) + def convert_all(self, extraction: ExtractionResult) -> List[Triple]: """Convert complete extraction result to RDF triples. @@ -129,6 +148,29 @@ class TripleConverter: logger.warning(f"Unknown relationship '{relationship.relation}', skipping") return None + # Enforce domain/range constraints when declared + prop_def = self.ontology_subset.object_properties.get( + relationship.relation, {} + ) + domain = prop_def.get("domain") if isinstance(prop_def, dict) else getattr(prop_def, "domain", None) + range_ = prop_def.get("range") if isinstance(prop_def, dict) else getattr(prop_def, "range", None) + + if domain and not self._matches_class_constraint(relationship.subject_type, domain): + logger.warning( + f"Domain violation: '{relationship.relation}' expects " + f"domain '{domain}', got subject type " + f"'{relationship.subject_type}', skipping" + ) + return None + + if range_ and not self._matches_class_constraint(relationship.object_type, range_): + logger.warning( + f"Range violation: '{relationship.relation}' expects " + f"range '{range_}', got object type " + f"'{relationship.object_type}', skipping" + ) + return None + # Generate triple: subject property object return Triple( s=Term(type=IRI, iri=subject_uri), @@ -157,11 +199,25 @@ class TripleConverter: logger.warning(f"Unknown attribute '{attribute.attribute}', skipping") return None + # Enforce domain constraint when declared + prop_def = self.ontology_subset.datatype_properties.get( + attribute.attribute, {} + ) + domain = prop_def.get("domain") if isinstance(prop_def, dict) else getattr(prop_def, "domain", None) + + if domain and not self._matches_class_constraint(attribute.entity_type, domain): + logger.warning( + f"Domain violation: attribute '{attribute.attribute}' " + f"expects domain '{domain}', got entity type " + f"'{attribute.entity_type}', skipping" + ) + return None + # Generate triple: entity property "literal value" return Triple( s=Term(type=IRI, iri=entity_uri), p=Term(type=IRI, iri=property_uri), - o=Term(type=LITERAL, value=attribute.value) # Literal! + o=Term(type=LITERAL, value=attribute.value) ) def _get_class_uri(self, class_id: str) -> Optional[str]: diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py index 51161f9b..bddb009d 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py @@ -135,13 +135,19 @@ class DispatcherWrapper: class DispatcherManager: def __init__(self, backend, config_receiver, auth, - prefix="api-gateway", queue_overrides=None): + prefix="api-gateway", queue_overrides=None, timeout=120): """ ``auth`` is required. It flows into the Mux for first-frame WebSocket authentication and into downstream dispatcher construction. There is no permissive default — constructing a DispatcherManager without an authenticator would be a silent downgrade to no-auth on the socket path. + + ``timeout`` is the per-request timeout in seconds, propagated + to every dispatcher created by this manager. Must match the + gateway's ``--timeout`` flag so that long-running requests + are not prematurely cut off at the old hard-coded 120 s + ceiling. """ if auth is None: raise ValueError( @@ -149,6 +155,8 @@ class DispatcherManager: "is no no-auth mode" ) + self.timeout = timeout + self.backend = backend self.config_receiver = config_receiver self.config_receiver.add_handler(self) @@ -291,7 +299,7 @@ class DispatcherManager: dispatcher = global_dispatchers[kind]( backend = self.backend, - timeout = 120, + timeout = self.timeout, consumer = consumer_name, subscriber = consumer_name, request_queue = request_queue, @@ -448,7 +456,7 @@ class DispatcherManager: backend = self.backend, request_queue = qconfig["request"], response_queue = qconfig["response"], - timeout = 120, + timeout = self.timeout, consumer = f"{self.prefix}-{workspace}-{flow}-{kind}-request", subscriber = f"{self.prefix}-{workspace}-{flow}-{kind}-request", ) diff --git a/trustgraph-flow/trustgraph/gateway/service.py b/trustgraph-flow/trustgraph/gateway/service.py index 0f6a5070..fb51e1a2 100755 --- a/trustgraph-flow/trustgraph/gateway/service.py +++ b/trustgraph-flow/trustgraph/gateway/service.py @@ -119,6 +119,7 @@ class Api: prefix = "gateway", queue_overrides = queue_overrides, auth = self.auth, + timeout = self.timeout, ) self.endpoint_manager = EndpointManager( diff --git a/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py b/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py index 1cefcbe9..c8ab9c36 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py @@ -104,7 +104,15 @@ class Processor(LlmService): return resp - except RateLimitError: + except RateLimitError as e: + try: + body = getattr(e, 'body', {}) + if isinstance(body, dict): + code = body.get('error', {}).get('code') + if code in ('insufficient_quota', 'invalid_api_key', 'account_deactivated'): + raise RuntimeError(f"OpenAI unrecoverable error: {code} - {body['error'].get('message', '')}") + except (ValueError, KeyError, TypeError, AttributeError): + pass # Leave rate limit retries to the base handler raise TooManyRequests() @@ -188,7 +196,16 @@ class Processor(LlmService): logger.debug("Streaming complete") - except RateLimitError: + except RateLimitError as e: + try: + body = getattr(e, 'body', {}) + if isinstance(body, dict): + code = body.get('error', {}).get('code') + if code in ('insufficient_quota', 'invalid_api_key', 'account_deactivated'): + logger.warning(f"Hit unrecoverable rate limit error during streaming: {code}") + raise RuntimeError(f"OpenAI unrecoverable error: {code} - {body['error'].get('message', '')}") + except (ValueError, KeyError, TypeError, AttributeError): + pass logger.warning("Hit rate limit during streaming") raise TooManyRequests()