mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-25 00:16:23 +02:00
Fix RabbitMQ request/response race and chunker Flow API drift (#779)
* Fix Metadata/EntityEmbeddings schema migration tail and add regression tests (#776)
The Metadata dataclass dropped its `metadata: list[Triple]` field
and EntityEmbeddings/ChunkEmbeddings settled on a singular
`vector: list[float]` field, but several call sites kept passing
`Metadata(metadata=...)` and `EntityEmbeddings(vectors=...)`. The
bugs were latent until a websocket client first hit
`/api/v1/flow/default/import/entity-contexts`, at which point the
dispatcher TypeError'd on construction.
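The failure mode is easy to reproduce in miniature. The sketch below is a hedged stand-in (the real TrustGraph schema classes carry more fields than this); it only shows why a call site left on the old plural spelling blows up at construction time:

```python
from dataclasses import dataclass

@dataclass
class EntityEmbeddings:
    # post-migration: singular field name, as the commit describes
    vector: list

# A call site still using the pre-migration plural kwarg fails at
# construction, exactly like the websocket dispatcher did:
try:
    EntityEmbeddings(vectors=[0.1, 0.2])
except TypeError as e:
    print(e)  # TypeError mentioning the unexpected 'vectors' keyword

# The fixed call sites construct with the singular field:
emb = EntityEmbeddings(vector=[0.1, 0.2])
print(emb.vector)
```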
Production fixes (5 call sites on the same migration tail):
* trustgraph-flow gateway dispatchers entity_contexts_import.py
and graph_embeddings_import.py — drop the stale
Metadata(metadata=...) kwarg; switch graph_embeddings_import
to the singular `vector` wire key.
* trustgraph-base messaging translators knowledge.py and
document_loading.py — fix decode side to read the singular
`"vector"` key, matching what their own encode sides have
always written.
* trustgraph-flow tables/knowledge.py — fix Cassandra row
deserialiser to construct EntityEmbeddings(vector=...)
instead of vectors=.
* trustgraph-flow gateway core_import/core_export — switch the
kg-core msgpack wire format to the singular `"v"`/`"vector"`
key and drop the dead `m["m"]` envelope field that referenced
the removed Metadata.metadata triples list (it was a
guaranteed KeyError on the export side).
Defense-in-depth regression coverage (32 new tests across 7 files):
* tests/contract/test_schema_field_contracts.py — pin the field
set of Metadata, EntityEmbeddings, ChunkEmbeddings,
EntityContext so any future schema rename fails CI loudly
with a clear diff.
* tests/unit/test_translators/test_knowledge_translator_roundtrip.py
and test_document_embeddings_translator_roundtrip.py —

encode→decode round-trip the affected translators end to end,
locking in the singular `"vector"` wire key.
* tests/unit/test_gateway/test_entity_contexts_import_dispatcher.py
and test_graph_embeddings_import_dispatcher.py — exercise the
websocket dispatchers' receive() path with realistic
payloads, the direct regression test for the original
production crash.
* tests/unit/test_gateway/test_core_import_export_roundtrip.py
— pack/unpack the kg-core msgpack format through the real
dispatcher classes (with KnowledgeRequestor mocked),
including a full export→import round-trip.
* tests/unit/test_tables/test_knowledge_table_store.py —
exercise the Cassandra row → schema conversion via __new__ to
bypass the live cluster connection.
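The field-contract idea in the first bullet can be sketched with `dataclasses.fields`; the class below is an illustrative stand-in, not the real TrustGraph schema, and the expected set is only what this commit's description implies:

```python
from dataclasses import dataclass, fields

@dataclass
class EntityEmbeddings:
    # stand-in for the real schema class
    entity: str
    vector: list

def test_entity_embeddings_field_contract():
    # Pin the field set so any future rename fails CI with a clear diff.
    actual = {f.name for f in fields(EntityEmbeddings)}
    expected = {"entity", "vector"}
    assert actual == expected, f"schema drift: {actual ^ expected}"

test_entity_embeddings_field_contract()
```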
Also fixes an unrelated leaked-coroutine RuntimeWarning in
test_gateway/test_service.py::test_run_method_calls_web_run_app: the
mocked aiohttp.web.run_app now closes the coroutine that Api.run() hands
it, mirroring what the real run_app would do, instead of leaving it for
the GC to complain about.
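A minimal sketch of that pattern, with hypothetical names (`app_runner`, `mocked_run_app`) standing in for what the test actually mocks:

```python
import asyncio
import inspect

async def app_runner():
    # stand-in for the coroutine Api.run() hands to aiohttp.web.run_app
    await asyncio.sleep(0)

def mocked_run_app(coro):
    # A mock that simply discards its argument leaves an un-awaited
    # coroutine behind, and the GC emits a "coroutine ... was never
    # awaited" RuntimeWarning. Closing it consumes the coroutine, as
    # the real run_app effectively would.
    coro.close()

coro = app_runner()
mocked_run_app(coro)
print(inspect.getcoroutinestate(coro))  # CORO_CLOSED
```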
* Fix RabbitMQ request/response race and chunker Flow API drift
Two unrelated regressions surfaced after the v2.2 queue class
refactor. Bundled here because both are small and both block
production.
1. Request/response race against ephemeral RabbitMQ response
queues
Commit feeb92b3 switched response/notify queues to per-subscriber
auto-delete exclusive queues. That fixed orphaned-queue
accumulation but introduced a setup race: Subscriber.start()
created the run() task and returned immediately, while the
underlying RabbitMQ consumer only declared and bound its queue
lazily on the first receive() call. RequestResponse.request()
therefore published the request before any queue was bound to the
matching routing key, and the broker dropped the reply. Symptoms:
"Failed to fetch config on notify" / "Request timeout exception"
repeating roughly every 10s in api-gateway, document-embeddings
and any other service exercising the config notify path.
Fix:
* Add ensure_connected() to the BackendConsumer protocol;
implement it on RabbitMQBackendConsumer (calls _connect
synchronously, declaring and binding the queue) and as a
no-op on PulsarBackendConsumer (Pulsar's client.subscribe is
already synchronous at construction).
* Convert Subscriber's readiness signal from a non-existent
Event to an asyncio.Future created in start(). run() calls
consumer.ensure_connected() immediately after
create_consumer() and sets _ready.set_result(None) on first
successful bind. start() awaits the future via asyncio.wait
so it returns only once the consumer is fully bound. Any
reply published after start() returns is therefore guaranteed
to land in a bound queue.
* First-attempt connection failures call
_ready.set_exception(e) and exit run() so start() unblocks
with the error rather than hanging forever — the existing
higher-level retry pattern in fetch_and_apply_config takes
over from there. Runtime failures after a successful start
still go through the existing retry-with-backoff path.
* Update the two existing graceful-shutdown tests that
monkey-patch Subscriber.run with a custom coroutine to honor
the new contract by signalling _ready themselves.
* Add tests/unit/test_base/test_subscriber_readiness.py with
five regression tests pinning the readiness contract:
ensure_connected must be called before start() returns;
start() must block while ensure_connected runs
(race-condition guard with a threading.Event gate);
first-attempt create_consumer and ensure_connected failures
must propagate to start() instead of hanging;
ensure_connected must run before any receive() call.
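The barrier described above can be sketched in miniature. This is a simplified stand-in, not the real Subscriber: it drops metrics, draining and retry logic, and a single `connect` callable stands in for create_consumer() plus ensure_connected():

```python
import asyncio

class MiniSubscriber:
    """Sketch of the readiness barrier: start() only returns once the
    consumer is bound, or re-raises the first-attempt failure."""

    def __init__(self, connect):
        self.connect = connect
        self._ready = None
        self.task = None

    async def start(self):
        self._ready = asyncio.get_event_loop().create_future()
        self.task = asyncio.create_task(self.run())
        # Block until run() binds the consumer or fails trying.
        await asyncio.wait({self.task, self._ready},
                           return_when=asyncio.FIRST_COMPLETED)
        if self._ready.done():
            self._ready.result()  # re-raises a first-attempt failure
            return
        raise RuntimeError("run() exited before signalling readiness")

    async def run(self):
        try:
            await self.connect()          # declare + bind the queue
            self._ready.set_result(None)  # unblock start()
        except Exception as e:
            self._ready.set_exception(e)  # propagate instead of hanging
            return
        await asyncio.sleep(0)            # receive loop would go here

async def main():
    bound = []
    async def good_connect():
        bound.append(True)
    sub = MiniSubscriber(good_connect)
    await sub.start()
    assert bound == [True]  # consumer bound before start() returned

asyncio.run(main())
```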
2. Chunker Flow parameter lookup using the wrong attribute
trustgraph-base/trustgraph/base/chunking_service.py was reading
flow.parameters.get("chunk-size") and chunk-overlap, but the Flow
class has no `parameters` attribute — parameter lookup is exposed
through Flow.__call__ (flow("chunk-size") returns the resolved
value or None). The exception was caught and logged as a
WARNING, so chunking continued with the default sizes and any
configured chunk-size / chunk-overlap was silently ignored:
chunker - WARNING - Could not parse chunk-size parameter:
'Flow' object has no attribute 'parameters'
The chunker tests didn't catch this because they constructed
mock_flow = MagicMock() and configured
mock_flow.parameters.get.side_effect = ..., which is the same
phantom attribute MagicMock auto-creates on demand. Tests and
production agreed on the wrong API.
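The phantom-attribute trap is easy to demonstrate; the `Flow` class below is a stripped-down stand-in for the real one, keeping only the `__call__` lookup surface:

```python
from unittest.mock import MagicMock

# MagicMock manufactures any attribute chain on demand, so the test's
# mock happily grows a `parameters.get` that the real Flow never had.
mock_flow = MagicMock()
mock_flow.parameters.get.side_effect = lambda p: {"chunk-size": 2000}.get(p)
assert mock_flow.parameters.get("chunk-size") == 2000  # test passes...

class Flow:
    # stand-in: the real Flow exposes parameter lookup via __call__ only
    def __call__(self, key):
        return {"chunk-size": 2000}.get(key)

# ...while production hits AttributeError on the same access:
try:
    Flow().parameters.get("chunk-size")
except AttributeError as e:
    print(e)

assert Flow()("chunk-size") == 2000  # the correct lookup
```

Passing `spec=Flow` (or using `create_autospec`) to the mock would have made the phantom attribute raise in tests too.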
Fix: switch chunking_service.py to flow("chunk-size") /
flow("chunk-overlap"). Update both chunker test files to mock the
__call__ side_effect instead of the phantom parameters.get,
merging parameter values into the existing flow() lookup the
on_message tests already used for producer resolution.
This commit is contained in:
parent c23e28aa66
commit ffe310af7c

9 changed files with 329 additions and 24 deletions
@@ -236,6 +236,10 @@ async def test_subscriber_graceful_shutdown():
         with patch.object(subscriber, 'run') as mock_run:
             # Mock run that simulates graceful shutdown
             async def mock_run_graceful():
+                # Honor the readiness contract: real run() signals _ready
+                # after binding the consumer, so start() can unblock. Mocks
+                # of run() must do the same or start() hangs forever.
+                subscriber._ready.set_result(None)
                 # Process messages while running, then drain
                 while subscriber.running or subscriber.draining:
                     if subscriber.draining:
@@ -337,6 +341,8 @@ async def test_subscriber_pending_acks_cleanup():
         with patch.object(subscriber, 'run') as mock_run:
             # Mock run that simulates cleanup of pending acks
             async def mock_run_cleanup():
+                # Honor the readiness contract — see test_subscriber_graceful_shutdown.
+                subscriber._ready.set_result(None)
                 while subscriber.running or subscriber.draining:
                     await asyncio.sleep(0.05)
                     if subscriber.draining:
tests/unit/test_base/test_subscriber_readiness.py (new file, 189 lines)

"""
Regression tests for Subscriber.start() readiness barrier.

Background: prior to the eager-connect fix, Subscriber.start() created
the run() task and returned immediately. The underlying backend consumer
was lazily connected on its first receive() call, which left a setup
race for request/response clients using ephemeral per-subscriber response
queues (RabbitMQ auto-delete exclusive queues): the request would be
published before the response queue was bound, and the broker would
silently drop the reply. fetch_config(), document-embeddings, and
api-gateway all hit this with "Failed to fetch config on notify" /
"Request timeout exception" symptoms.

These tests pin the readiness contract:

    await subscriber.start()
    # at this point, consumer.ensure_connected() MUST have run

so that any future change which removes the eager bind, or moves it
back to lazy initialisation, fails CI loudly.
"""

import asyncio

import pytest
from unittest.mock import MagicMock

from trustgraph.base.subscriber import Subscriber


def _make_backend(ensure_connected_side_effect=None,
                  receive_side_effect=None):
    """Build a fake backend whose consumer records ensure_connected /
    receive calls. ensure_connected_side_effect lets a test inject a
    delay or exception."""
    backend = MagicMock()
    consumer = MagicMock()

    consumer.ensure_connected = MagicMock(
        side_effect=ensure_connected_side_effect,
    )

    # By default receive raises a timeout-style exception that the
    # subscriber loop is supposed to swallow as a "no message yet" — this
    # keeps the subscriber idling cleanly while the test inspects state.
    if receive_side_effect is None:
        receive_side_effect = TimeoutError("No message received within timeout")
    consumer.receive = MagicMock(side_effect=receive_side_effect)

    consumer.acknowledge = MagicMock()
    consumer.negative_acknowledge = MagicMock()
    consumer.pause_message_listener = MagicMock()
    consumer.unsubscribe = MagicMock()
    consumer.close = MagicMock()

    backend.create_consumer.return_value = consumer
    return backend, consumer


def _make_subscriber(backend):
    return Subscriber(
        backend=backend,
        topic="response:tg:config",
        subscription="test-sub",
        consumer_name="test-consumer",
        schema=dict,
        max_size=10,
        drain_timeout=1.0,
        backpressure_strategy="block",
    )


class TestSubscriberReadiness:

    @pytest.mark.asyncio
    async def test_start_calls_ensure_connected_before_returning(self):
        """The barrier: ensure_connected must have been invoked at least
        once by the time start() returns."""
        backend, consumer = _make_backend()
        subscriber = _make_subscriber(backend)

        await subscriber.start()

        try:
            consumer.ensure_connected.assert_called_once()
        finally:
            await subscriber.stop()

    @pytest.mark.asyncio
    async def test_start_blocks_until_ensure_connected_completes(self):
        """If ensure_connected is slow, start() must wait for it. This is
        the actual race-condition guard — it would have failed against
        the buggy version where start() returned before run() had even
        scheduled the consumer creation."""
        connect_started = asyncio.Event()
        release_connect = asyncio.Event()

        # ensure_connected runs in the executor thread, so we need a
        # threading-safe gate. Use a simple busy-wait on a flag set by
        # the asyncio side via call_soon_threadsafe — but the simpler
        # path is to give it a sleep and observe ordering.
        import threading
        gate = threading.Event()

        def slow_connect():
            connect_started.set()  # safe: only mutates the Event flag
            gate.wait(timeout=2.0)

        backend, consumer = _make_backend(
            ensure_connected_side_effect=slow_connect,
        )
        subscriber = _make_subscriber(backend)

        start_task = asyncio.create_task(subscriber.start())

        # Wait until ensure_connected has begun executing.
        await asyncio.wait_for(connect_started.wait(), timeout=2.0)

        # ensure_connected is in flight — start() must NOT have returned.
        assert not start_task.done(), (
            "start() returned before ensure_connected() completed — "
            "the readiness barrier is broken and the request/response "
            "race condition is back."
        )

        # Release the gate; start() should now complete promptly.
        gate.set()
        await asyncio.wait_for(start_task, timeout=2.0)

        consumer.ensure_connected.assert_called_once()

        await subscriber.stop()

    @pytest.mark.asyncio
    async def test_start_propagates_consumer_creation_failure(self):
        """If create_consumer() raises, start() must surface the error
        rather than hang on the readiness future. The old code path
        retried indefinitely inside run() and never let start() unblock."""
        backend = MagicMock()
        backend.create_consumer.side_effect = RuntimeError("broker down")

        subscriber = _make_subscriber(backend)

        with pytest.raises(RuntimeError, match="broker down"):
            await asyncio.wait_for(subscriber.start(), timeout=2.0)

    @pytest.mark.asyncio
    async def test_start_propagates_ensure_connected_failure(self):
        """Same contract for an ensure_connected() that raises (e.g. the
        broker is up but the queue declare/bind fails)."""
        backend, consumer = _make_backend(
            ensure_connected_side_effect=RuntimeError("queue declare failed"),
        )
        subscriber = _make_subscriber(backend)

        with pytest.raises(RuntimeError, match="queue declare failed"):
            await asyncio.wait_for(subscriber.start(), timeout=2.0)

    @pytest.mark.asyncio
    async def test_ensure_connected_runs_before_subscriber_running_log(self):
        """Subtle ordering: ensure_connected MUST happen before the
        receive loop, so that any reply is captured. We assert this by
        checking ensure_connected was called before any receive call."""
        call_order = []

        def record_ensure():
            call_order.append("ensure_connected")

        def record_receive(*args, **kwargs):
            call_order.append("receive")
            raise TimeoutError("No message received within timeout")

        backend, consumer = _make_backend(
            ensure_connected_side_effect=record_ensure,
            receive_side_effect=record_receive,
        )
        subscriber = _make_subscriber(backend)

        await subscriber.start()

        # Give the receive loop a tick to run at least once.
        await asyncio.sleep(0.05)

        await subscriber.stop()

        # ensure_connected must come first; receive may not have happened
        # yet on a fast machine, but if it did, it must come after.
        assert call_order, "neither ensure_connected nor receive was called"
        assert call_order[0] == "ensure_connected"
@@ -70,11 +70,12 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase):
         # Mock message and flow
         mock_message = MagicMock()
         mock_consumer = MagicMock()
+        # Flow exposes parameter lookup via __call__: flow("chunk-size")
         mock_flow = MagicMock()
-        mock_flow.parameters.get.side_effect = lambda param: {
+        mock_flow.side_effect = lambda key: {
             "chunk-size": 2000,    # Override chunk size
             "chunk-overlap": None  # Use default chunk overlap
-        }.get(param)
+        }.get(key)

         # Act
         chunk_size, chunk_overlap = await processor.chunk_document(

@@ -105,10 +106,10 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase):
         mock_message = MagicMock()
         mock_consumer = MagicMock()
         mock_flow = MagicMock()
-        mock_flow.parameters.get.side_effect = lambda param: {
+        mock_flow.side_effect = lambda key: {
             "chunk-size": None,   # Use default chunk size
             "chunk-overlap": 200  # Override chunk overlap
-        }.get(param)
+        }.get(key)

         # Act
         chunk_size, chunk_overlap = await processor.chunk_document(

@@ -139,10 +140,10 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase):
         mock_message = MagicMock()
         mock_consumer = MagicMock()
         mock_flow = MagicMock()
-        mock_flow.parameters.get.side_effect = lambda param: {
+        mock_flow.side_effect = lambda key: {
             "chunk-size": 1500,   # Override chunk size
             "chunk-overlap": 150  # Override chunk overlap
-        }.get(param)
+        }.get(key)

         # Act
         chunk_size, chunk_overlap = await processor.chunk_document(

@@ -195,15 +196,15 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase):
         mock_consumer = MagicMock()
         mock_producer = AsyncMock()
         mock_triples_producer = AsyncMock()
+        # Flow.__call__ resolves parameters and producers/consumers from the
+        # same dict — merge both kinds here.
         mock_flow = MagicMock()
-        mock_flow.parameters.get.side_effect = lambda param: {
+        mock_flow.side_effect = lambda key: {
             "chunk-size": 1500,
             "chunk-overlap": 150,
-        }.get(param)
-        mock_flow.side_effect = lambda name: {
             "output": mock_producer,
             "triples": mock_triples_producer,
-        }.get(name)
+        }.get(key)

         # Act
         await processor.on_message(mock_message, mock_consumer, mock_flow)

@@ -241,7 +242,7 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase):
         mock_message = MagicMock()
         mock_consumer = MagicMock()
         mock_flow = MagicMock()
-        mock_flow.parameters.get.return_value = None  # No overrides
+        mock_flow.side_effect = lambda key: None  # No overrides

         # Act
         chunk_size, chunk_overlap = await processor.chunk_document(
@@ -70,11 +70,12 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase):
         # Mock message and flow
         mock_message = MagicMock()
         mock_consumer = MagicMock()
+        # Flow exposes parameter lookup via __call__: flow("chunk-size")
         mock_flow = MagicMock()
-        mock_flow.parameters.get.side_effect = lambda param: {
+        mock_flow.side_effect = lambda key: {
             "chunk-size": 400,     # Override chunk size
             "chunk-overlap": None  # Use default chunk overlap
-        }.get(param)
+        }.get(key)

         # Act
         chunk_size, chunk_overlap = await processor.chunk_document(

@@ -105,10 +106,10 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase):
         mock_message = MagicMock()
         mock_consumer = MagicMock()
         mock_flow = MagicMock()
-        mock_flow.parameters.get.side_effect = lambda param: {
+        mock_flow.side_effect = lambda key: {
             "chunk-size": None,  # Use default chunk size
             "chunk-overlap": 25  # Override chunk overlap
-        }.get(param)
+        }.get(key)

         # Act
         chunk_size, chunk_overlap = await processor.chunk_document(

@@ -139,10 +140,10 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase):
         mock_message = MagicMock()
         mock_consumer = MagicMock()
         mock_flow = MagicMock()
-        mock_flow.parameters.get.side_effect = lambda param: {
+        mock_flow.side_effect = lambda key: {
             "chunk-size": 350,   # Override chunk size
             "chunk-overlap": 30  # Override chunk overlap
-        }.get(param)
+        }.get(key)

         # Act
         chunk_size, chunk_overlap = await processor.chunk_document(

@@ -195,15 +196,15 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase):
         mock_consumer = MagicMock()
         mock_producer = AsyncMock()
         mock_triples_producer = AsyncMock()
+        # Flow.__call__ resolves parameters and producers/consumers from the
+        # same dict — merge both kinds here.
         mock_flow = MagicMock()
-        mock_flow.parameters.get.side_effect = lambda param: {
+        mock_flow.side_effect = lambda key: {
             "chunk-size": 400,
             "chunk-overlap": 40,
-        }.get(param)
-        mock_flow.side_effect = lambda name: {
             "output": mock_producer,
             "triples": mock_triples_producer,
-        }.get(name)
+        }.get(key)

         # Act
         await processor.on_message(mock_message, mock_consumer, mock_flow)

@@ -245,7 +246,7 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase):
         mock_message = MagicMock()
         mock_consumer = MagicMock()
         mock_flow = MagicMock()
-        mock_flow.parameters.get.return_value = None  # No overrides
+        mock_flow.side_effect = lambda key: None  # No overrides

         # Act
         chunk_size, chunk_overlap = await processor.chunk_document(
@@ -58,6 +58,18 @@ class BackendProducer(Protocol):
 class BackendConsumer(Protocol):
     """Protocol for backend-specific consumer."""

+    def ensure_connected(self) -> None:
+        """
+        Eagerly establish the underlying connection and bind the queue.
+
+        Backends that lazily connect on first receive() must implement this
+        so that callers can guarantee the consumer is fully bound — and
+        therefore able to receive responses — before any related request is
+        published. Backends that connect at construction time may make this
+        a no-op.
+        """
+        ...
+
     def receive(self, timeout_millis: int = 2000) -> Message:
         """
         Receive a message from the topic.
@@ -88,14 +88,14 @@ class ChunkingService(FlowProcessor):
         chunk_overlap = default_chunk_overlap

         try:
-            cs = flow.parameters.get("chunk-size")
+            cs = flow("chunk-size")
             if cs is not None:
                 chunk_size = int(cs)
         except Exception as e:
             logger.warning(f"Could not parse chunk-size parameter: {e}")

         try:
-            co = flow.parameters.get("chunk-overlap")
+            co = flow("chunk-overlap")
             if co is not None:
                 chunk_overlap = int(co)
         except Exception as e:
@@ -72,6 +72,16 @@ class PulsarBackendConsumer:
         self._consumer = pulsar_consumer
         self._schema_cls = schema_cls

+    def ensure_connected(self) -> None:
+        """No-op for Pulsar.
+
+        PulsarBackend.create_consumer() calls client.subscribe() which is
+        synchronous and returns a fully-subscribed consumer, so the
+        consumer is already ready by the time this object is constructed.
+        Defined for parity with the BackendConsumer protocol used by
+        Subscriber.start()'s readiness barrier."""
+        pass
+
     def receive(self, timeout_millis: int = 2000) -> Message:
         """Receive a message. Raises TimeoutError if no message available."""
         try:
@@ -214,6 +214,18 @@ class RabbitMQBackendConsumer:
             and self._channel.is_open
         )

+    def ensure_connected(self) -> None:
+        """Eagerly declare and bind the queue.
+
+        Without this, the queue is only declared lazily on the first
+        receive() call. For request/response with ephemeral per-subscriber
+        response queues that is a race: a request published before the
+        response queue is bound will have its reply silently dropped by
+        the broker. Subscriber.start() calls this so callers get a hard
+        readiness barrier."""
+        if not self._is_alive():
+            self._connect()
+
     def receive(self, timeout_millis: int = 2000) -> Message:
         """Receive a message. Raises TimeoutError if none available."""
         if not self._is_alive():
@@ -41,14 +41,55 @@ class Subscriber:
         self.consumer = None
         self.executor = None

+        # Readiness barrier — completed by run() once the underlying
+        # backend consumer is fully connected and bound. start() awaits
+        # this so callers know any subsequently published request will
+        # have a queue ready to receive its response. Without this,
+        # ephemeral per-subscriber response queues (RabbitMQ auto-delete
+        # exclusive queues) would race the request and lose the reply.
+        # A Future is used (rather than an Event) so that a first-attempt
+        # connection failure can be propagated to start() as an exception.
+        self._ready = None  # created in start() so we have a running loop
+
     def __del__(self):

         self.running = False

     async def start(self):

+        self._ready = asyncio.get_event_loop().create_future()
         self.task = asyncio.create_task(self.run())

+        # Block until run() signals readiness OR exits. The future
+        # carries the outcome of the first connect attempt: a value on
+        # success, an exception on first-attempt failure. If run() exits
+        # without ever signalling (e.g. cancelled, or a code path bug),
+        # we surface that as a clear RuntimeError rather than hanging
+        # forever waiting on the future.
+        ready_wait = asyncio.ensure_future(
+            asyncio.shield(self._ready)
+        )
+        try:
+            await asyncio.wait(
+                {self.task, ready_wait},
+                return_when=asyncio.FIRST_COMPLETED,
+            )
+        finally:
+            ready_wait.cancel()
+
+        if self._ready.done():
+            # Re-raise first-attempt connect failure if any.
+            self._ready.result()
+            return
+
+        # run() exited before _ready was settled. Propagate its exception
+        # if it had one, otherwise raise a generic readiness error.
+        if self.task.done() and self.task.exception() is not None:
+            raise self.task.exception()
+        raise RuntimeError(
+            "Subscriber.run() exited before signalling readiness"
+        )
+
     async def stop(self):
         """Initiate graceful shutdown with draining"""
         self.running = False

@@ -66,6 +107,7 @@ class Subscriber:

     async def run(self):
         """Enhanced run method with integrated draining logic"""
+        first_attempt = True
         while self.running or self.draining:

             if self.metrics:

@@ -87,10 +129,27 @@ class Subscriber:
                     ),
                 )

+                # Eagerly bind the queue. For backends that connect
+                # lazily on first receive (RabbitMQ), this is what
+                # closes the request/response setup race — without
+                # it the response queue is not bound until later and
+                # any reply published in the meantime is dropped.
+                await loop.run_in_executor(
+                    self.executor,
+                    lambda: self.consumer.ensure_connected(),
+                )
+
                 if self.metrics:
                     self.metrics.state("running")

                 logger.info("Subscriber running...")

+                # Signal start() that the consumer is ready. This must
+                # happen AFTER ensure_connected() above so callers can
+                # safely publish requests immediately after start() returns.
+                if first_attempt and not self._ready.done():
+                    self._ready.set_result(None)
+                first_attempt = False
+
                 drain_end_time = None

                 while self.running or self.draining:

@@ -162,6 +221,16 @@ class Subscriber:
             except Exception as e:
                 logger.error(f"Subscriber exception: {e}", exc_info=True)

+                # First-attempt connection failure: propagate to start()
+                # so the caller can decide what to do (retry, give up).
+                # Subsequent failures use the existing retry-with-backoff
+                # path so a long-lived subscriber survives broker blips.
+                if first_attempt and not self._ready.done():
+                    self._ready.set_exception(e)
+                    first_attempt = False
+                    # Falls through into finally for cleanup, then the
+                    # outer return below ends run() so start() unblocks.
+
             finally:
                 # Negative acknowledge any pending messages
                 for msg in self.pending_acks.values():

@@ -193,6 +262,11 @@ class Subscriber:
             if not self.running and not self.draining:
                 return

+            # If start() has already returned with an exception there is
+            # nothing more to do — exit run() rather than busy-retry.
+            if self._ready.done() and self._ready.exception() is not None:
+                return
+
             # Sleep before retry
             await asyncio.sleep(1)