release/v2.4 -> master (#932)

* CLI auth migration, document embeddings core lifecycle (#913)

Migrate get_kg_core and put_kg_core CLI tools to use Api/SocketClient
with first-frame auth (fixes broken raw websocket path). Fix wire
format field names (root/vector). Remove ~600 lines of dead raw
websocket code from invoke_graph_rag.py.

Add document embeddings core lifecycle to the knowledge service:
list/get/put/delete/load operations across schema, translator,
Cassandra table store, knowledge manager, gateway registry, REST API,
socket client, and CLI (tg-get-de-core, tg-put-de-core).

Fix delete_kg_core to also clean up document embeddings rows.

* Remove spurious workspace parameter from SPARQL algebra evaluator (#915)

Fix threading of workspace paramater:
- The SPARQL algebra evaluator was threading a workspace parameter
  through every function and passing it to TriplesClient.query(),
  which doesn't accept it. Workspace isolation is handled by pub/sub
  topic routing — the TriplesClient is already scoped to a
  workspace-specific flow, same as GraphRAG. Passing workspace
  explicitly was both incorrect and unnecessary.

Update tests:
- tests/unit/test_query/test_sparql_algebra.py (new) — Tests
  _query_pattern, _eval_bgp, and evaluate() with various algebra
  nodes. Key tests assert workspace is never in tc.query() kwargs,
  plus correctness tests for BGP, JOIN, UNION, SLICE, DISTINCT, and
  edge cases.
- tests/unit/test_retrieval/test_graph_rag.py — Added
  test_triples_query_never_passes_workspace (checks query()) and
  test_follow_edges_never_passes_workspace (checks query_stream()).

* Make all Cassandra and Qdrant I/O async-safe with proper concurrency controls (#916)

Cassandra triples services were using syncronous EntityCentricKnowledgeGraph
methods from async contexts, and connection state was managed with
threading.local which is wrong for asyncio coroutines sharing a single
thread. Qdrant services had no async wrapping at all, blocking the event
loop on every network call. Rows services had unprotected shared state
mutations across concurrent coroutines.

- Add async methods to EntityCentricKnowledgeGraph (async_insert,
  async_get_s/p/o/sp/po/os/spo/all, async_collection_exists,
  async_create_collection, async_delete_collection) using the existing
  cassandra_async.async_execute bridge
- Rewrite triples write + query services: replace threading.local with
  asyncio.Lock + dict cache for per-workspace connections, use async
  ECKG methods for all data operations, keep asyncio.to_thread only for
  one-time blocking ECKG construction
- Wrap all Qdrant calls in asyncio.to_thread across all 6 services
  (doc/graph/row embeddings write + query), add asyncio.Lock + set cache
  for collection existence checks
- Add asyncio.Lock to rows write + query services to protect shared
  state (schemas, sessions, config caches) from concurrent mutation
- Update all affected tests to match new async patterns

* Fixed error only returning a page of results (#921)

The root cause: async_execute only materialises the first result
page (by design — it says so in its docstring). The streaming query
set fetch_size=20 and expected to iterate all results, but only got
the first 20 rows back.

The fix uses
  asyncio.to_thread(lambda: list(tg.session.execute(...)))
which lets the sync driver iterate
all pages in a worker thread — exactly what the pre-async code did.

* Optional test warning suppression (#923)

* Fix test collection module errors & silence upstream Pytest warnings (#823)

* chore: add virtual environment and .env directories to gitignore

* test: filter upstream DeprecationWarning and UserWarning messages

* fix(namespace): remove empty __init__.py files to fix PEP 420 implicit namespace routing for trustgraph sub-packages

* Revert __init__.py deletions

* Add .ini changes but commented out, will be useful at times

---------

Co-authored-by: Salil M <d2kyt@protonmail.com>

* fix(openai): fail fast on unrecoverable RateLimitError codes (#901) (#904) (#925)

Co-authored-by: Sahil Yadav <sahilyadav.sy2004@gmail.com>

* Ensure retry exception is properly raised (#926)

* fix: library API get/update document round-trip bugs (#893) (#928)

Fix 5 cascading bugs in the Library API wrapper that prevented
the get_documents → update_document round-trip from working:

- Tolerate missing title field in document metadata (use .get())
- Use attribute access on Triple objects instead of subscript
- Serialize datetime to int seconds for JSON compatibility
- Handle empty server response on successful update
- Send both id and document-id keys in update request

Added library API tests

* Fix ontology selector defaults, add bypass mode, enforce domain/range (#929)

- Align similarity_threshold default to 0.3 everywhere (class signature
  had stale 0.7). Fix matching contradiction in tech-spec.
- Add bypass_selector_below parameter (default 5) to skip vector
  similarity selection when ontology element count is small enough.
- Enforce domain/range constraints in TripleConverter for object
  properties and datatype properties, with subclass hierarchy support.
  Properties with no declared domain/range pass through unchanged.
- Add unit tests for domain/range validation, subclass acceptance,
  polymorphic pass-through, and selector bypass.

Fixes #908, #920

* Close producers on flow stop to prevent stale non-persistent topics (#930)

Flow.stop() only stopped consumers, leaving response producers
connected to non-persistent Pulsar topics. After flow restart, the
orphaned producers held stale broker routing state, causing response
messages to never reach new consumers — manifesting as 120s timeouts
on document-embeddings and similar RPC paths.

Fix: Flow.stop() now explicitly stops all producers. Producer.stop()
closes the underlying Pulsar producer connection rather than just
setting a flag.

Fixes #906

* fix(gateway): propagate --timeout flag to per-service dispatchers (#931)

The api-gateway accepts a --timeout flag (default 600s) but the value
was not propagated into DispatcherManager, which hard-coded
timeout=120 for every per-service dispatcher (graph-rag, document-rag,
text-completion, embeddings, librarian, etc.).

This meant any synchronous request taking more than 120 seconds would
always return a Timeout error at the 120s mark, regardless of the
--timeout value set on the gateway.

Changes:
- Add timeout parameter to DispatcherManager.__init__ (default: 120
  for backward compatibility)
- Store self.timeout in DispatcherManager
- Replace both hardcoded timeout=120 with self.timeout in
  invoke_global_service and invoke_flow_service
- Pass self.timeout from Api to DispatcherManager in service.py
- Document the timeout parameter in the docstring

Fixes #894

---------

Co-authored-by: Salil M <d2kyt@protonmail.com>
Co-authored-by: Sahil Yadav <sahilyadav.sy2004@gmail.com>
Co-authored-by: Mister Lobster <jlaportebot@gmail.com>
This commit is contained in:
cybermaggedon 2026-05-18 09:46:58 +01:00 committed by GitHub
parent 142dd0231c
commit 71517e6417
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 849 additions and 29 deletions

View file

@ -121,6 +121,7 @@ class Processor(FlowProcessor):
# Configuration
self.top_k = params.get("top_k", 10)
self.similarity_threshold = params.get("similarity_threshold", 0.3)
self.bypass_selector_below = params.get("bypass_selector_below", 5)
# Per-workspace ontology version tracking
self.current_ontology_versions = {} # workspace -> version
@ -187,7 +188,8 @@ class Processor(FlowProcessor):
ontology_embedder=ontology_embedder,
ontology_loader=loader,
top_k=self.top_k,
similarity_threshold=self.similarity_threshold
similarity_threshold=self.similarity_threshold,
bypass_selector_below=self.bypass_selector_below,
)
# Store flow-specific components
@ -981,6 +983,13 @@ class Processor(FlowProcessor):
default=0.3,
help='Similarity threshold for ontology matching (default: 0.3, range: 0.0-1.0)'
)
parser.add_argument(
'--bypass-selector-below',
type=int,
default=5,
help='Bypass ontology selector when total ontology elements '
'(classes + properties) is below this value (default: 5)'
)
parser.add_argument(
'--triples-batch-size',
type=int,

View file

@ -33,19 +33,44 @@ class OntologySelector:
def __init__(self, ontology_embedder: OntologyEmbedder,
ontology_loader: OntologyLoader,
top_k: int = 10,
similarity_threshold: float = 0.7):
"""Initialize the ontology selector.
Args:
ontology_embedder: Embedder with vector store
ontology_loader: Loader with ontology definitions
top_k: Number of top results to retrieve per segment
similarity_threshold: Minimum similarity score
"""
similarity_threshold: float = 0.3,
bypass_selector_below: int = 5):
self.embedder = ontology_embedder
self.loader = ontology_loader
self.top_k = top_k
self.similarity_threshold = similarity_threshold
self.bypass_selector_below = bypass_selector_below
def _total_ontology_elements(self) -> int:
total = 0
for ontology in self.loader.get_all_ontologies().values():
total += len(ontology.classes)
total += len(ontology.object_properties)
total += len(ontology.datatype_properties)
return total
def _build_full_subsets(self) -> List[OntologySubset]:
subsets = []
for ont_id, ontology in self.loader.get_all_ontologies().items():
subset = OntologySubset(
ontology_id=ont_id,
classes={
cid: cls.__dict__
for cid, cls in ontology.classes.items()
},
object_properties={
pid: prop.__dict__
for pid, prop in ontology.object_properties.items()
},
datatype_properties={
pid: prop.__dict__
for pid, prop in ontology.datatype_properties.items()
},
metadata=ontology.metadata,
relevance_score=1.0,
)
subsets.append(subset)
return subsets
async def select_ontology_subset(self, segments: List[TextSegment]) -> List[OntologySubset]:
"""Select relevant ontology subsets for text segments.
@ -56,6 +81,15 @@ class OntologySelector:
Returns:
List of ontology subsets with relevant elements
"""
total = self._total_ontology_elements()
if total < self.bypass_selector_below:
logger.info(
f"Ontology has {total} elements (below "
f"bypass_selector_below={self.bypass_selector_below}), "
f"using full ontology"
)
return self._build_full_subsets()
# Collect all relevant elements
relevant_elements = await self._find_relevant_elements(segments)

View file

@ -6,7 +6,7 @@ with full URIs and correct is_uri flags.
"""
import logging
from typing import List, Optional
from typing import List, Optional, Set
from .... schema import Triple, Term, IRI, LITERAL
from .... rdf import RDF_TYPE, RDF_LABEL
@ -32,6 +32,25 @@ class TripleConverter:
self.ontology_id = ontology_id
self.entity_registry = EntityRegistry(ontology_id)
def _get_ancestor_classes(self, class_id: str) -> Set[str]:
ancestors = set()
current = class_id
while current:
cls_def = self.ontology_subset.classes.get(current)
if not cls_def:
break
parent = cls_def.get("subclass_of") if isinstance(cls_def, dict) else getattr(cls_def, "subclass_of", None)
if not parent or parent in ancestors:
break
ancestors.add(parent)
current = parent
return ancestors
def _matches_class_constraint(self, actual_type: str, expected_type: str) -> bool:
if actual_type == expected_type:
return True
return expected_type in self._get_ancestor_classes(actual_type)
def convert_all(self, extraction: ExtractionResult) -> List[Triple]:
"""Convert complete extraction result to RDF triples.
@ -129,6 +148,29 @@ class TripleConverter:
logger.warning(f"Unknown relationship '{relationship.relation}', skipping")
return None
# Enforce domain/range constraints when declared
prop_def = self.ontology_subset.object_properties.get(
relationship.relation, {}
)
domain = prop_def.get("domain") if isinstance(prop_def, dict) else getattr(prop_def, "domain", None)
range_ = prop_def.get("range") if isinstance(prop_def, dict) else getattr(prop_def, "range", None)
if domain and not self._matches_class_constraint(relationship.subject_type, domain):
logger.warning(
f"Domain violation: '{relationship.relation}' expects "
f"domain '{domain}', got subject type "
f"'{relationship.subject_type}', skipping"
)
return None
if range_ and not self._matches_class_constraint(relationship.object_type, range_):
logger.warning(
f"Range violation: '{relationship.relation}' expects "
f"range '{range_}', got object type "
f"'{relationship.object_type}', skipping"
)
return None
# Generate triple: subject property object
return Triple(
s=Term(type=IRI, iri=subject_uri),
@ -157,11 +199,25 @@ class TripleConverter:
logger.warning(f"Unknown attribute '{attribute.attribute}', skipping")
return None
# Enforce domain constraint when declared
prop_def = self.ontology_subset.datatype_properties.get(
attribute.attribute, {}
)
domain = prop_def.get("domain") if isinstance(prop_def, dict) else getattr(prop_def, "domain", None)
if domain and not self._matches_class_constraint(attribute.entity_type, domain):
logger.warning(
f"Domain violation: attribute '{attribute.attribute}' "
f"expects domain '{domain}', got entity type "
f"'{attribute.entity_type}', skipping"
)
return None
# Generate triple: entity property "literal value"
return Triple(
s=Term(type=IRI, iri=entity_uri),
p=Term(type=IRI, iri=property_uri),
o=Term(type=LITERAL, value=attribute.value) # Literal!
o=Term(type=LITERAL, value=attribute.value)
)
def _get_class_uri(self, class_id: str) -> Optional[str]:

View file

@ -135,13 +135,19 @@ class DispatcherWrapper:
class DispatcherManager:
def __init__(self, backend, config_receiver, auth,
prefix="api-gateway", queue_overrides=None):
prefix="api-gateway", queue_overrides=None, timeout=120):
"""
``auth`` is required. It flows into the Mux for first-frame
WebSocket authentication and into downstream dispatcher
construction. There is no permissive default constructing
a DispatcherManager without an authenticator would be a
silent downgrade to no-auth on the socket path.
``timeout`` is the per-request timeout in seconds, propagated
to every dispatcher created by this manager. Must match the
gateway's ``--timeout`` flag so that long-running requests
are not prematurely cut off at the old hard-coded 120 s
ceiling.
"""
if auth is None:
raise ValueError(
@ -149,6 +155,8 @@ class DispatcherManager:
"is no no-auth mode"
)
self.timeout = timeout
self.backend = backend
self.config_receiver = config_receiver
self.config_receiver.add_handler(self)
@ -291,7 +299,7 @@ class DispatcherManager:
dispatcher = global_dispatchers[kind](
backend = self.backend,
timeout = 120,
timeout = self.timeout,
consumer = consumer_name,
subscriber = consumer_name,
request_queue = request_queue,
@ -448,7 +456,7 @@ class DispatcherManager:
backend = self.backend,
request_queue = qconfig["request"],
response_queue = qconfig["response"],
timeout = 120,
timeout = self.timeout,
consumer = f"{self.prefix}-{workspace}-{flow}-{kind}-request",
subscriber = f"{self.prefix}-{workspace}-{flow}-{kind}-request",
)

View file

@ -119,6 +119,7 @@ class Api:
prefix = "gateway",
queue_overrides = queue_overrides,
auth = self.auth,
timeout = self.timeout,
)
self.endpoint_manager = EndpointManager(

View file

@ -104,7 +104,15 @@ class Processor(LlmService):
return resp
except RateLimitError:
except RateLimitError as e:
try:
body = getattr(e, 'body', {})
if isinstance(body, dict):
code = body.get('error', {}).get('code')
if code in ('insufficient_quota', 'invalid_api_key', 'account_deactivated'):
raise RuntimeError(f"OpenAI unrecoverable error: {code} - {body['error'].get('message', '')}")
except (ValueError, KeyError, TypeError, AttributeError):
pass
# Leave rate limit retries to the base handler
raise TooManyRequests()
@ -188,7 +196,16 @@ class Processor(LlmService):
logger.debug("Streaming complete")
except RateLimitError:
except RateLimitError as e:
try:
body = getattr(e, 'body', {})
if isinstance(body, dict):
code = body.get('error', {}).get('code')
if code in ('insufficient_quota', 'invalid_api_key', 'account_deactivated'):
logger.warning(f"Hit unrecoverable rate limit error during streaming: {code}")
raise RuntimeError(f"OpenAI unrecoverable error: {code} - {body['error'].get('message', '')}")
except (ValueError, KeyError, TypeError, AttributeError):
pass
logger.warning("Hit rate limit during streaming")
raise TooManyRequests()