feat(observability): add chunk reconcile metric and kill-switch flag

Add surfsense.indexing.reconcile.chunks, a counter of the chunks reused,
embedded, and deleted by each incremental re-index. Add CHUNK_RECONCILE_ENABLED
(default: true) as a kill switch: setting it to false falls back to delete-all +
full re-embed if the diff path ever misbehaves.
This commit is contained in:
CREDO23 2026-06-12 18:52:57 +02:00
parent 8d413ea5c2
commit fd495e1b2f
3 changed files with 35 additions and 0 deletions

View file

@ -342,6 +342,11 @@ EMBEDDING_CACHE_ENABLED=false
# Rows deleted per eviction pass.
# EMBEDDING_CACHE_EVICTION_BATCH=500
# Incremental re-indexing: on document edits, keep chunks whose text is
# unchanged (reusing their embeddings) and embed only new/changed ones.
# Kill switch: set to false to fall back to delete-all + full re-embed.
# Only "true" (case-insensitive) enables it; any other value disables it.
# CHUNK_RECONCILE_ENABLED=true
# Daytona Sandbox (isolated code execution)
# DAYTONA_SANDBOX_ENABLED=FALSE
# DAYTONA_API_KEY=your-daytona-api-key

View file

@ -979,6 +979,13 @@ class Config:
os.getenv("EMBEDDING_CACHE_EVICTION_BATCH", "500")
)
# Incremental re-indexing: on document edits, keep chunk rows whose text is
# unchanged (reusing their embeddings) and embed only new/changed chunks.
# Kill switch -- disabling falls back to delete-all + full re-embed.
# Enabled when the variable is unset. Only the value "true" (case-insensitive,
# surrounding whitespace ignored) enables it; any other value, including "1"
# or "yes", disables it.
CHUNK_RECONCILE_ENABLED = (
os.getenv("CHUNK_RECONCILE_ENABLED", "true").strip().lower() == "true"
)
# Proxy provider selection. Maps to a ProxyProvider implementation registered
# in app/utils/proxy/registry.py. Add new vendors there and switch via this var.
# Defaults to "anonymous_proxies" when PROXY_PROVIDER is unset.
PROXY_PROVIDER = os.getenv("PROXY_PROVIDER", "anonymous_proxies")

View file

@ -321,6 +321,17 @@ def _embedding_cache_evictions():
)
@lru_cache(maxsize=1)
def _chunk_reconcile_chunks():
    """Return the counter of chunks handled by incremental re-indexing.

    The result is memoized by ``lru_cache``, so the instrument is created on
    the first call and the same object is returned afterwards.
    """
    meter = _get_meter()
    counter_description = (
        "Chunks handled by incremental re-indexing, by outcome "
        "(reused/embedded/deleted)."
    )
    return meter.create_counter(
        "surfsense.indexing.reconcile.chunks",
        description=counter_description,
    )
@lru_cache(maxsize=1)
def _celery_heartbeat_refreshes():
return _get_meter().create_counter(
@ -746,6 +757,17 @@ def record_embedding_cache_eviction(count: int, *, phase: str) -> None:
_add(_embedding_cache_evictions(), count, {"phase": phase})
def record_chunk_reconcile(*, reused: int, embedded: int, deleted: int) -> None:
    """Record the chunk counts of one incremental re-index, split by outcome.

    Each positive count is added to the reconcile counter with an ``outcome``
    attribute of ``reused``, ``embedded`` or ``deleted``. Counts that are not
    greater than zero are skipped.
    """
    tallies = {"reused": reused, "embedded": embedded, "deleted": deleted}
    for label, amount in tallies.items():
        if amount > 0:
            # The counter accessor is only called when there is something to add.
            _add(_chunk_reconcile_chunks(), amount, {"outcome": label})
def record_celery_heartbeat_refresh(*, heartbeat_type: str) -> None:
    """Record one Celery heartbeat refresh under the given ``heartbeat.type``."""
    refresh_counter = _celery_heartbeat_refreshes()
    _add(refresh_counter, 1, {"heartbeat.type": heartbeat_type})
@ -939,6 +961,7 @@ __all__ = [
"record_celery_queue_latency",
"record_chat_request_duration",
"record_chat_request_outcome",
"record_chunk_reconcile",
"record_compaction_run",
"record_connector_sync_duration",
"record_connector_sync_outcome",