feat: fixed vision/image provider-specific errors and fixed podcast/video streaming
Some checks are pending
Build and Push Docker Images / tag_release (push) Waiting to run
Build and Push Docker Images / build (./surfsense_backend, ./surfsense_backend/Dockerfile, backend, surfsense-backend, ubuntu-24.04-arm, linux/arm64, arm64) (push) Blocked by required conditions
Build and Push Docker Images / build (./surfsense_backend, ./surfsense_backend/Dockerfile, backend, surfsense-backend, ubuntu-latest, linux/amd64, amd64) (push) Blocked by required conditions
Build and Push Docker Images / build (./surfsense_web, ./surfsense_web/Dockerfile, web, surfsense-web, ubuntu-24.04-arm, linux/arm64, arm64) (push) Blocked by required conditions
Build and Push Docker Images / build (./surfsense_web, ./surfsense_web/Dockerfile, web, surfsense-web, ubuntu-latest, linux/amd64, amd64) (push) Blocked by required conditions
Build and Push Docker Images / create_manifest (backend, surfsense-backend) (push) Blocked by required conditions
Build and Push Docker Images / create_manifest (web, surfsense-web) (push) Blocked by required conditions

This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-05-02 19:18:53 -07:00
parent ae9d36d77f
commit 47b2994ec7
54 changed files with 4469 additions and 563 deletions

View file

@ -0,0 +1,318 @@
"""Regression tests for ``run_async_celery_task``.
These tests pin down the production bug observed on 2026-05-02 where
the video-presentation Celery task hung at ``[billable_call] finalize``
because the shared ``app.db.engine`` had pooled asyncpg connections
bound to a *previous* task's now-closed event loop. Reusing such a
connection on a fresh loop crashes inside ``pool_pre_ping`` with::
AttributeError: 'NoneType' object has no attribute 'send'
(the proactor is None because the loop is gone) and can hang forever
inside the asyncpg ``Connection._cancel`` cleanup coroutine.
The fix is ``run_async_celery_task``: a small helper that runs every
async celery task body inside a fresh event loop and disposes the
shared engine pool both before (defends against a previous task that
crashed) and after (releases connections we opened on this loop).
Tests here exercise the helper with a stub engine that records
``dispose()`` calls and panics if a coroutine produced by one loop is
awaited on another — mirroring the real asyncpg behaviour.
"""
from __future__ import annotations
import asyncio
import gc
import sys
from collections.abc import Iterator
from contextlib import contextmanager
from unittest.mock import patch
import pytest
pytestmark = pytest.mark.unit
# ---------------------------------------------------------------------------
# Stub engine that emulates the asyncpg-on-stale-loop crash
# ---------------------------------------------------------------------------
class _StaleLoopEngine:
"""Tiny stand-in for ``app.db.engine`` that tracks dispose() calls.
``dispose()`` is async (matches ``AsyncEngine.dispose``) and records
the running event loop id so tests can assert it ran on *each*
fresh loop.
"""
def __init__(self) -> None:
self.dispose_loop_ids: list[int] = []
async def dispose(self) -> None:
loop = asyncio.get_running_loop()
self.dispose_loop_ids.append(id(loop))
@contextmanager
def _patch_shared_engine(stub: _StaleLoopEngine) -> Iterator[None]:
    """Patch ``from app.db import engine as shared_engine`` lookup.

    The helper imports lazily inside the function body, so we have to
    patch the attribute on the already-loaded ``app.db`` module.

    On exit the original engine is restored; if ``app.db`` had no
    ``engine`` attribute to begin with, the stub is deleted so it cannot
    leak into other tests.
    """
    import app.db as app_db

    # Sentinel rather than ``None`` so a legitimately-None engine attribute
    # would be restored instead of deleted.
    _missing = object()
    original = getattr(app_db, "engine", _missing)
    app_db.engine = stub  # type: ignore[attr-defined]
    try:
        yield
    finally:
        if original is _missing:
            # BUG FIX: the previous teardown left the stub in place and
            # probed ``app_db.engine`` expecting AttributeError — but the
            # attribute still existed (it was the stub), so the probe
            # failed and the stub leaked into subsequent tests. Deleting
            # the attribute restores the pristine module state.
            del app_db.engine  # type: ignore[attr-defined]
        else:
            app_db.engine = original  # type: ignore[attr-defined]
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_runner_returns_value_and_disposes_engine_around_call() -> None:
    """Happy path: the coroutine's return value is passed through, and the
    shared engine is disposed once before and once after the task body.
    """
    from app.tasks.celery_tasks import run_async_celery_task

    engine_stub = _StaleLoopEngine()

    async def _task_body() -> str:
        # The defensive pre-body dispose must already have happened by now.
        assert len(engine_stub.dispose_loop_ids) == 1
        return "ok"

    with _patch_shared_engine(engine_stub):
        returned = run_async_celery_task(_task_body)

    assert returned == "ok"
    # Two disposes total: the defensive one up front, the cleanup one in finally.
    assert len(engine_stub.dispose_loop_ids) == 2
    # Both happened on the single fresh loop the task body ran on.
    assert engine_stub.dispose_loop_ids[0] == engine_stub.dispose_loop_ids[1]
def test_runner_creates_fresh_loop_per_invocation() -> None:
    """Each call must spin its own loop. Without this guarantee a
    previous task's loop would be reused and the asyncpg-stale-loop
    crash would never be avoided.
    """
    import app.tasks.celery_tasks as celery_tasks_pkg

    stub = _StaleLoopEngine()
    # Filled in by the instrumented new_event_loop / close hooks below.
    new_loop_calls = 0
    closed_loops: list[bool] = []
    real_new_event_loop = asyncio.new_event_loop

    def _counting_new_loop() -> asyncio.AbstractEventLoop:
        # Delegates to the real factory but counts every creation.
        nonlocal new_loop_calls
        new_loop_calls += 1
        loop = real_new_event_loop()
        # Hook close() so we can verify each loop was closed properly
        # before the next one was created.
        original_close = loop.close

        def _tracked_close() -> None:
            closed_loops.append(True)
            original_close()

        loop.close = _tracked_close  # type: ignore[method-assign]
        return loop

    async def _body() -> None:
        # Loop is alive and current at body execution time.
        running = asyncio.get_running_loop()
        assert not running.is_closed()

    with (
        _patch_shared_engine(stub),
        patch.object(asyncio, "new_event_loop", _counting_new_loop),
    ):
        for _ in range(3):
            celery_tasks_pkg.run_async_celery_task(_body)

    # Three invocations -> three fresh loops, each closed before the next.
    assert new_loop_calls == 3
    assert closed_loops == [True, True, True]
    # Each invocation disposed twice (before + after).
    assert len(stub.dispose_loop_ids) == 6
def test_runner_disposes_engine_even_when_body_raises() -> None:
    """The failure path must still clean up — leaking stale connections
    into the next task is exactly what caused the original hang.
    """
    from app.tasks.celery_tasks import run_async_celery_task

    class _BoomError(RuntimeError):
        pass

    engine_stub = _StaleLoopEngine()

    async def _exploding_body() -> None:
        raise _BoomError("kaboom")

    with _patch_shared_engine(engine_stub):
        with pytest.raises(_BoomError):
            run_async_celery_task(_exploding_body)

    # Pre-body dispose plus the finally-block dispose both ran.
    assert len(engine_stub.dispose_loop_ids) == 2
def test_runner_swallows_dispose_errors() -> None:
    """A flaky ``engine.dispose()`` must never take down a celery task.

    Production scenario: the first (pre-body) dispose can hit a
    partially-initialised engine. The helper logs and moves on; the task
    body still runs and its result is still returned.
    """
    from app.tasks.celery_tasks import run_async_celery_task

    class _AngryEngine:
        def __init__(self) -> None:
            self.calls = 0

        async def dispose(self) -> None:
            self.calls += 1
            raise RuntimeError("dispose() blew up")

    angry = _AngryEngine()

    async def _body() -> int:
        return 42

    with _patch_shared_engine(angry):
        result = run_async_celery_task(_body)

    assert result == 42
    # Both the pre-body and post-body dispose attempts were made.
    assert angry.calls == 2
def test_runner_propagates_value_from_async_body() -> None:
    """Sanity: any pickleable celery return value passes straight through."""
    from app.tasks.celery_tasks import run_async_celery_task

    engine_stub = _StaleLoopEngine()

    async def _payload_body() -> dict[str, object]:
        return {"status": "ready", "video_presentation_id": 19}

    with _patch_shared_engine(engine_stub):
        returned = run_async_celery_task(_payload_body)

    assert returned == {"status": "ready", "video_presentation_id": 19}
def test_video_presentation_task_uses_runner_helper() -> None:
    """Defence-in-depth: the celery task module must route through
    ``run_async_celery_task``. If a future refactor re-inlines a manual
    ``loop = asyncio.new_event_loop(); ... loop.close()`` block, the
    original hang will return.
    """
    import inspect

    from app.tasks.celery_tasks import video_presentation_tasks

    # The task body must not manage its own event loop — centralising
    # that in the helper is the whole point.
    module_source = inspect.getsource(video_presentation_tasks)

    assert "run_async_celery_task" in module_source, (
        "video_presentation_tasks.py must use run_async_celery_task; "
        "manual asyncio.new_event_loop() in a celery task hangs on the "
        "shared SQLAlchemy pool when reused across tasks."
    )
    assert "asyncio.new_event_loop" not in module_source, (
        "video_presentation_tasks.py contains a raw asyncio.new_event_loop "
        "call — route every async task through run_async_celery_task to "
        "avoid the stale-pool hang."
    )
def test_podcast_task_uses_runner_helper() -> None:
    """Symmetric assertion for the podcast task — same root cause, same
    fix, same regression risk.
    """
    import inspect

    from app.tasks.celery_tasks import podcast_tasks

    src = inspect.getsource(podcast_tasks)
    # Consistency fix: the video-presentation twin of this test explains
    # WHY a raw event loop is forbidden; bare asserts here gave a future
    # maintainer no hint. Mirror the explanatory failure messages.
    assert "run_async_celery_task" in src, (
        "podcast_tasks.py must use run_async_celery_task; manual "
        "asyncio.new_event_loop() in a celery task hangs on the shared "
        "SQLAlchemy pool when reused across tasks."
    )
    assert "asyncio.new_event_loop" not in src, (
        "podcast_tasks.py contains a raw asyncio.new_event_loop call — "
        "route every async task through run_async_celery_task to avoid "
        "the stale-pool hang."
    )
def test_runner_runs_shutdown_asyncgens_before_close() -> None:
    """If the task body created any async generators that didn't get
    fully iterated, we must still call ``loop.shutdown_asyncgens()``
    before closing — otherwise we leak event-loop bound resources
    that re-emerge as ``RuntimeError: Event loop is closed`` later.
    """
    from app.tasks.celery_tasks import run_async_celery_task

    stub = _StaleLoopEngine()

    async def _agen():
        # Two-step generator; _body below abandons it after one item.
        try:
            yield 1
            yield 2
        finally:
            pass

    async def _body() -> None:
        # Iterate the agen partially, then leave it dangling — exactly
        # the situation shutdown_asyncgens() is designed to clean up.
        async for v in _agen():
            if v == 1:
                break

    with _patch_shared_engine(stub):
        run_async_celery_task(_body)
    # By the time the helper returns, garbage collection + shutdown_asyncgens
    # should have ensured no live async-gen references remain. We don't
    # assert agen.closed directly (it depends on GC ordering); the real
    # contract is "no warnings, no event-loop-closed errors". A successful
    # second invocation proves the loop was cleaned up properly.
    with _patch_shared_engine(stub):
        run_async_celery_task(_body)
    # Force a GC pass to surface any 'coroutine was never awaited'
    # warnings that would indicate the cleanup is broken.
    gc.collect()
def test_runner_uses_proactor_loop_on_windows() -> None:
    """On Windows the celery worker preselects a Proactor policy so
    subprocess (ffmpeg) calls work. The helper must not silently fall
    back to a Selector loop and re-break video/podcast generation.
    """
    if not sys.platform.startswith("win"):
        pytest.skip("Windows-specific event-loop policy assertion")

    from app.tasks.celery_tasks import run_async_celery_task

    stub = _StaleLoopEngine()
    observed: list[str] = []

    async def _body() -> None:
        observed.append(type(asyncio.get_running_loop()).__name__)

    # BUG FIX: set_event_loop_policy mutates process-global state; the
    # original test never restored it, leaking the Proactor policy into
    # every later test in the worker. Save and restore it.
    previous_policy = asyncio.get_event_loop_policy()
    # Mirror the policy set at the top of every Windows celery task.
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    try:
        with _patch_shared_engine(stub):
            run_async_celery_task(_body)
    finally:
        asyncio.set_event_loop_policy(previous_policy)

    assert observed == ["ProactorEventLoop"]

View file

@ -113,6 +113,19 @@ async def _denying_billable_call(**kwargs):
yield SimpleNamespace() # pragma: no cover — for grammar only
@contextlib.asynccontextmanager
async def _settlement_failing_billable_call(**kwargs):
    """``billable_call`` double whose settlement (post-``yield``) phase fails.

    The context body runs normally; on clean exit the manager raises
    ``BillingSettlementError``, emulating a finalize/commit failure after
    the billable work itself succeeded.
    """
    from app.services.billable_calls import BillingSettlementError

    _CALL_LOG.append(kwargs)
    yield SimpleNamespace()
    # Reached only when the body exits cleanly — i.e. at settlement time.
    settlement_error = BillingSettlementError(
        usage_type=kwargs.get("usage_type", "?"),
        user_id=kwargs["user_id"],
        cause=RuntimeError("finalize failed"),
    )
    raise settlement_error
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@ -187,8 +200,15 @@ async def test_billable_call_invoked_with_correct_kwargs_for_free_config(monkeyp
call["quota_reserve_micros_override"]
== app_config.QUOTA_DEFAULT_PODCAST_RESERVE_MICROS
)
assert call["thread_id"] == 99
assert call["call_details"] == {"podcast_id": 7, "title": "Test Podcast"}
# Background artifact audit rows intentionally omit the TokenUsage.thread_id
# FK to avoid coupling Celery audit commits to an active chat transaction.
assert "thread_id" not in call
assert call["call_details"] == {
"podcast_id": 7,
"title": "Test Podcast",
"thread_id": 99,
}
assert callable(call["billable_session_factory"])
@pytest.mark.asyncio
@ -279,6 +299,49 @@ async def test_quota_insufficient_marks_podcast_failed_and_skips_graph(monkeypat
assert graph_invoked == [] # Graph never ran on denied reservation.
@pytest.mark.asyncio
async def test_billing_settlement_failure_marks_podcast_failed(monkeypatch):
    """A ``BillingSettlementError`` raised at finalize time must flip the
    podcast to FAILED and report ``billing_settlement_failed``."""
    from app.db import PodcastStatus
    from app.tasks.celery_tasks import podcast_tasks

    podcast = _make_podcast(podcast_id=10)
    fake_session = _FakeSession(podcast)
    monkeypatch.setattr(
        podcast_tasks,
        "get_celery_session_maker",
        lambda: _FakeSessionMaker(fake_session),
    )

    async def _fake_resolver(sess, search_space_id, *, thread_id=None):
        # Billing resolution succeeds for an arbitrary user/tier/model.
        return uuid4(), "premium", "gpt-5.4"

    monkeypatch.setattr(
        podcast_tasks, "_resolve_agent_billing_for_search_space", _fake_resolver
    )
    # Billing context manager that blows up during settlement.
    monkeypatch.setattr(
        podcast_tasks, "billable_call", _settlement_failing_billable_call
    )

    async def _fake_graph_invoke(state, config):
        # The graph itself succeeds — the failure comes purely from billing.
        return {"podcast_transcript": [], "final_podcast_file_path": "x.wav"}

    monkeypatch.setattr(podcast_tasks.podcaster_graph, "ainvoke", _fake_graph_invoke)

    outcome = await podcast_tasks._generate_content_podcast(
        podcast_id=10,
        source_content="hi",
        search_space_id=555,
        user_prompt=None,
    )

    assert outcome == {
        "status": "failed",
        "podcast_id": 10,
        "reason": "billing_settlement_failed",
    }
    assert podcast.status == PodcastStatus.FAILED
@pytest.mark.asyncio
async def test_resolver_failure_marks_podcast_failed(monkeypatch):
"""If the resolver raises (e.g. search-space deleted), the task fails

View file

@ -0,0 +1,119 @@
"""Predicate-level test for the chat streaming safety net.
The safety net in ``stream_new_chat`` rejects an image turn early with
a friendly ``MODEL_DOES_NOT_SUPPORT_IMAGE_INPUT`` SSE error when the
selected model is *known* to be text-only. The earlier round of this
work used a strict opt-in flag (``supports_image_input`` defaulting to
False on every YAML entry) which blocked vision-capable Azure GPT-5.x
deployments — this is the regression we're fixing.
The new predicate is :func:`is_known_text_only_chat_model`, which
returns True only when LiteLLM's authoritative model map *explicitly*
sets ``supports_vision=False``. Anything else (vision True, missing
key, exception) returns False so the request flows through to the
provider.
We exercise the predicate directly here rather than driving the full
``stream_new_chat`` generator — covering the gate in isolation keeps
the test focused on the regression, while the generator's wider behavior
is exercised by the integration suite.
"""
from __future__ import annotations
import pytest
from app.services.provider_capabilities import is_known_text_only_chat_model
pytestmark = pytest.mark.unit
def test_safety_net_does_not_fire_for_azure_gpt_4o():
    """Regression: ``azure/gpt-4o`` (and the GPT-5.x variants) is
    vision-capable per LiteLLM's model map. The previous round's
    blanket-False default blocked it; the new predicate must NOT mark
    it text-only."""
    verdict = is_known_text_only_chat_model(
        provider="AZURE_OPENAI",
        model_name="my-azure-deployment",
        base_model="gpt-4o",
    )
    assert verdict is False
def test_safety_net_does_not_fire_for_unknown_model():
    """Default-pass on unknown — the safety net only blocks definitive
    text-only confirmations. A freshly added third-party model that
    LiteLLM doesn't know about must flow through to the provider."""
    verdict = is_known_text_only_chat_model(
        provider="CUSTOM",
        custom_provider="brand_new_proxy",
        model_name="brand-new-model-x9",
    )
    assert verdict is False
def test_safety_net_does_not_fire_when_lookup_raises(monkeypatch):
    """A transient ``litellm.get_model_info`` exception must not block the
    request: the helper swallows the error and treats the model as
    unknown, which maps to False."""
    import app.services.provider_capabilities as pc

    def _raise(**_kwargs):
        raise RuntimeError("intentional test failure")

    monkeypatch.setattr(pc.litellm, "get_model_info", _raise)

    verdict = is_known_text_only_chat_model(
        provider="OPENAI",
        model_name="gpt-4o",
    )
    assert verdict is False
def test_safety_net_fires_only_on_explicit_false(monkeypatch):
    """Stub LiteLLM to assert the only path that returns True is the
    explicit ``supports_vision=False`` case. Anything else (True,
    None, missing key) returns False from the predicate."""
    import app.services.provider_capabilities as pc

    # (model-info payload returned by the stub, model name, expected verdict)
    scenarios = [
        ({"supports_vision": False, "max_input_tokens": 8192}, "text-only-stub", True),
        ({"supports_vision": True}, "vision-stub", False),
        ({"max_input_tokens": 8192}, "missing-key-stub", False),
    ]
    for payload, model_name, expected in scenarios:
        # Bind the payload as a default to avoid late-binding closure bugs.
        monkeypatch.setattr(
            pc.litellm, "get_model_info", lambda _p=payload, **_kwargs: _p
        )
        assert (
            is_known_text_only_chat_model(
                provider="OPENAI",
                model_name=model_name,
            )
            is expected
        )

View file

@ -105,6 +105,19 @@ async def _denying_billable_call(**kwargs):
yield SimpleNamespace() # pragma: no cover
@contextlib.asynccontextmanager
async def _settlement_failing_billable_call(**kwargs):
    """``billable_call`` stand-in whose settlement phase fails.

    The context body is entered normally; the post-``yield`` code runs
    only on clean exit and raises ``BillingSettlementError``, emulating
    a finalize/commit failure after the work itself succeeded.
    """
    from app.services.billable_calls import BillingSettlementError

    _CALL_LOG.append(kwargs)
    yield SimpleNamespace()
    # Only reached when the body exits cleanly — i.e. at settlement time.
    raise BillingSettlementError(
        usage_type=kwargs.get("usage_type", "?"),
        user_id=kwargs["user_id"],
        cause=RuntimeError("finalize failed"),
    )
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@ -176,11 +189,15 @@ async def test_billable_call_invoked_with_correct_kwargs_for_free_config(monkeyp
call["quota_reserve_micros_override"]
== app_config.QUOTA_DEFAULT_VIDEO_PRESENTATION_RESERVE_MICROS
)
assert call["thread_id"] == 99
# Background artifact audit rows intentionally omit the TokenUsage.thread_id
# FK to avoid coupling Celery audit commits to an active chat transaction.
assert "thread_id" not in call
assert call["call_details"] == {
"video_presentation_id": 11,
"title": "Test Presentation",
"thread_id": 99,
}
assert callable(call["billable_session_factory"])
@pytest.mark.asyncio
@ -280,6 +297,57 @@ async def test_quota_insufficient_marks_video_failed_and_skips_graph(monkeypatch
assert graph_invoked == []
@pytest.mark.asyncio
async def test_billing_settlement_failure_marks_video_failed(monkeypatch):
    """A ``BillingSettlementError`` during settlement must mark the video
    presentation FAILED and surface ``billing_settlement_failed``."""
    from app.db import VideoPresentationStatus
    from app.tasks.celery_tasks import video_presentation_tasks

    video = _make_video(video_id=14)
    session = _FakeSession(video)
    monkeypatch.setattr(
        video_presentation_tasks,
        "get_celery_session_maker",
        lambda: _FakeSessionMaker(session),
    )

    async def _fake_resolver(sess, search_space_id, *, thread_id=None):
        # Billing resolution succeeds for an arbitrary user/tier/model.
        return uuid4(), "premium", "gpt-5.4"

    monkeypatch.setattr(
        video_presentation_tasks,
        "_resolve_agent_billing_for_search_space",
        _fake_resolver,
    )
    # billable_call double that raises at settlement (after the body).
    monkeypatch.setattr(
        video_presentation_tasks,
        "billable_call",
        _settlement_failing_billable_call,
    )

    async def _fake_graph_invoke(state, config):
        # The graph itself succeeds; only billing settlement fails.
        return {"slides": [], "slide_audio_results": [], "slide_scene_codes": []}

    monkeypatch.setattr(
        video_presentation_tasks.video_presentation_graph,
        "ainvoke",
        _fake_graph_invoke,
    )

    result = await video_presentation_tasks._generate_video_presentation(
        video_presentation_id=14,
        source_content="content",
        search_space_id=777,
        user_prompt=None,
    )

    assert result == {
        "status": "failed",
        "video_presentation_id": 14,
        "reason": "billing_settlement_failed",
    }
    assert video.status == VideoPresentationStatus.FAILED
@pytest.mark.asyncio
async def test_resolver_failure_marks_video_failed(monkeypatch):
from app.db import VideoPresentationStatus