From 44abf56d6c88def3e11806bdd5036665cb18ce2f Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Thu, 4 Jun 2026 10:25:06 +0200 Subject: [PATCH] fix: restore connector mentions, free-tier fallback, orphan-row guard in new chat flow --- .../chat/streaming/flows/new_chat/auto_pin.py | 9 +++- .../streaming/flows/new_chat/input_state.py | 52 +++++++++++++++++-- .../streaming/flows/new_chat/orchestrator.py | 20 ++++--- .../flows/new_chat/runtime_context.py | 7 +++ .../test_parallel_refactor_parity.py | 27 ++++++++++ 5 files changed, 104 insertions(+), 11 deletions(-) diff --git a/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/auto_pin.py b/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/auto_pin.py index af496cee7..dbb8ee2e4 100644 --- a/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/auto_pin.py +++ b/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/auto_pin.py @@ -50,8 +50,14 @@ async def resolve_initial_auto_pin( selected_llm_config_id: int, requires_image_input: bool, requested_llm_config_id: int, + force_repin_free: bool = False, ) -> AutoPinResult: - """Run the resolver and classify any ``ValueError`` for the SSE error path.""" + """Run the resolver and classify any ``ValueError`` for the SSE error path. + + ``force_repin_free`` forces a fresh re-pin to a free-tier config (used on + the premium-quota-exhausted fallback so an out-of-quota user isn't repinned + onto another paid model). 
+ """ try: pinned = await resolve_or_get_pinned_llm_config_id( session, @@ -60,6 +66,7 @@ async def resolve_initial_auto_pin( user_id=user_id, selected_llm_config_id=selected_llm_config_id, requires_image_input=requires_image_input, + force_repin_free=force_repin_free, ) ot.add_event( "model.pin.resolved", diff --git a/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/input_state.py b/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/input_state.py index 0c6704bd1..b5187190d 100644 --- a/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/input_state.py +++ b/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/input_state.py @@ -9,9 +9,9 @@ Pipeline: can resolve ``report_id`` for versioning without spelunking history. 3. **@-mention resolve** (cloud mode) — substitute ``@title`` tokens in the query with canonical ``\`/documents/...\``` paths the LLM expects. - 4. **Context block render** — XML-wrap recent reports, prepend to the - rewritten query, optionally prefix with display name for SEARCH_SPACE - visibility. + 4. **Context block render** — XML-wrap @-mentioned connectors and recent + reports, prepend to the rewritten query, optionally prefix with display + name for SEARCH_SPACE visibility. 5. **HumanMessage** — multimodal content if images are attached. 
Returns the assembled ``input_state`` dict plus side-channel data the @@ -62,6 +62,7 @@ async def build_new_chat_input_state( user_image_data_urls: list[str] | None, mentioned_document_ids: list[int] | None, mentioned_folder_ids: list[int] | None, + mentioned_connectors: list[dict[str, Any]] | None, mentioned_documents: list[dict[str, Any]] | None, needs_history_bootstrap: bool, thread_visibility: ChatVisibility, @@ -111,6 +112,7 @@ async def build_new_chat_input_state( final_query = _render_query_with_context( agent_user_query=agent_user_query, recent_reports=recent_reports, + mentioned_connectors=mentioned_connectors, ) if thread_visibility == ChatVisibility.SEARCH_SPACE and current_user_display_name: @@ -193,14 +195,56 @@ async def _resolve_mentions_for_query( return agent_user_query, accepted_folder_ids +def _render_connector_block(mentioned_connectors: list[dict[str, Any]]) -> str | None: + """Render the XML block listing @-mentioned connectors, or ``None`` when empty. + + Malformed entries (non-dict, or missing id/type) are skipped. + """ + connector_lines: list[str] = [] + for connector in mentioned_connectors: + if not isinstance(connector, dict): + continue + connector_id = connector.get("id") + connector_type = connector.get("connector_type") or connector.get( + "document_type" + ) + account_name = connector.get("account_name") or connector.get("title") + if connector_id is None or connector_type is None: + continue + connector_lines.append( + f' - connector_id={connector_id}, connector_type="{connector_type}", ' + f'account_name="{account_name or ""}"' + ) + if not connector_lines: + return None + return ( + "\n" + "The user selected these exact connector accounts with @. " + "These entries are selection metadata, not retrieved connector content. 
" + "When a connector-backed tool needs an account, use the matching " + "connector_id from this list if the tool supports connector_id:\n" + + "\n".join(connector_lines) + + "\n" + ) + + def _render_query_with_context( *, agent_user_query: str, recent_reports: list[Report], + mentioned_connectors: list[dict[str, Any]] | None, ) -> str: - """Prepend recent-reports XML block to the user query.""" + """Prepend the mentioned-connectors block, then the recent-reports block. + + Order is load-bearing for legacy parity. + """ context_parts: list[str] = [] + if mentioned_connectors: + connector_block = _render_connector_block(mentioned_connectors) + if connector_block: + context_parts.append(connector_block) + if recent_reports: report_lines: list[str] = [] for r in recent_reports: diff --git a/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/orchestrator.py b/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/orchestrator.py index 1892320d3..f1cdfa186 100644 --- a/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/orchestrator.py +++ b/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/orchestrator.py @@ -124,6 +124,8 @@ async def stream_new_chat( llm_config_id: int = -1, mentioned_document_ids: list[int] | None = None, mentioned_folder_ids: list[int] | None = None, + mentioned_connector_ids: list[int] | None = None, + mentioned_connectors: list[dict[str, Any]] | None = None, mentioned_documents: list[dict[str, Any]] | None = None, checkpoint_id: str | None = None, needs_history_bootstrap: bool = False, @@ -272,6 +274,7 @@ async def stream_new_chat( selected_llm_config_id=0, requires_image_input=requires_image_input, requested_llm_config_id=requested_llm_config_id, + force_repin_free=True, ) if pin_fallback.error is not None: message, error_code, error_kind = pin_fallback.error @@ -367,12 +370,6 @@ async def stream_new_chat( mentioned_documents=mentioned_documents, background_tasks=_background_tasks, ) - persist_asst_task = spawn_persist_assistant_shell_task( - 
chat_id=chat_id, - user_id=user_id, - turn_id=stream_result.turn_id, - background_tasks=_background_tasks, - ) _t0 = time.perf_counter() connector_service, firecrawl_api_key = await setup_connector_and_firecrawl( @@ -435,6 +432,7 @@ async def stream_new_chat( user_image_data_urls=user_image_data_urls, mentioned_document_ids=mentioned_document_ids, mentioned_folder_ids=mentioned_folder_ids, + mentioned_connectors=mentioned_connectors, mentioned_documents=mentioned_documents, needs_history_bootstrap=needs_history_bootstrap, thread_visibility=visibility, @@ -523,6 +521,14 @@ async def stream_new_chat( {"message_id": user_message_id, "turn_id": stream_result.turn_id}, ) + # Spawned only after the user row is confirmed, so a user-persist + # failure can't orphan an assistant shell on the same turn. + persist_asst_task = spawn_persist_assistant_shell_task( + chat_id=chat_id, + user_id=user_id, + turn_id=stream_result.turn_id, + background_tasks=_background_tasks, + ) assistant_message_id = await await_persist_task( persist_asst_task, chat_id=chat_id, @@ -588,6 +594,8 @@ async def stream_new_chat( mentioned_document_ids=mentioned_document_ids, accepted_folder_ids=accepted_folder_ids, mentioned_folder_ids=mentioned_folder_ids, + mentioned_connector_ids=mentioned_connector_ids, + mentioned_connectors=mentioned_connectors, request_id=request_id, turn_id=stream_result.turn_id, ) diff --git a/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/runtime_context.py b/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/runtime_context.py index cf1e8c3fb..2bbb0b769 100644 --- a/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/runtime_context.py +++ b/surfsense_backend/app/tasks/chat/streaming/flows/new_chat/runtime_context.py @@ -17,6 +17,8 @@ def build_new_chat_runtime_context( mentioned_document_ids: list[int] | None, accepted_folder_ids: list[int], mentioned_folder_ids: list[int] | None, + mentioned_connector_ids: list[int] | None, + mentioned_connectors: 
list[dict[str, object]] | None, request_id: str | None, turn_id: str, ) -> SurfSenseContextSchema: @@ -26,11 +28,16 @@ def build_new_chat_runtime_context( ``mentioned_folder_ids`` from the request: the resolver drops chips that pointed at deleted folders or folders the caller can't see, so middlewares only get authorized ids. + + Connector mentions are set on the schema for legacy parity even though no + middleware reads them yet. """ return SurfSenseContextSchema( search_space_id=search_space_id, mentioned_document_ids=list(mentioned_document_ids or []), mentioned_folder_ids=list(accepted_folder_ids or mentioned_folder_ids or []), + mentioned_connector_ids=list(mentioned_connector_ids or []), + mentioned_connectors=list(mentioned_connectors or []), request_id=request_id, turn_id=turn_id, ) diff --git a/surfsense_backend/tests/unit/tasks/chat/streaming/test_parallel_refactor_parity.py b/surfsense_backend/tests/unit/tasks/chat/streaming/test_parallel_refactor_parity.py index e014bb911..655f34fa6 100644 --- a/surfsense_backend/tests/unit/tasks/chat/streaming/test_parallel_refactor_parity.py +++ b/surfsense_backend/tests/unit/tasks/chat/streaming/test_parallel_refactor_parity.py @@ -241,11 +241,14 @@ def test_image_capability_blocks_known_text_only_models() -> None: def test_new_chat_runtime_context_prefers_accepted_folder_ids() -> None: + """Post-resolve accepted folder ids win over the raw requested ids.""" ctx = build_new_chat_runtime_context( search_space_id=7, mentioned_document_ids=[1, 2], accepted_folder_ids=[10], mentioned_folder_ids=[20, 30], + mentioned_connector_ids=None, + mentioned_connectors=None, request_id="req", turn_id="t1", ) @@ -258,17 +261,41 @@ def test_new_chat_runtime_context_prefers_accepted_folder_ids() -> None: def test_new_chat_runtime_context_falls_back_to_mentioned_folder_ids() -> None: + """With no accepted ids, the raw requested folder ids flow through.""" ctx = build_new_chat_runtime_context( search_space_id=7, 
mentioned_document_ids=None, accepted_folder_ids=[], mentioned_folder_ids=[20, 30], + mentioned_connector_ids=None, + mentioned_connectors=None, request_id=None, turn_id="t2", ) assert list(ctx.mentioned_folder_ids) == [20, 30] +def test_new_chat_runtime_context_propagates_connector_mentions() -> None: + """@-selected connector ids/accounts ride onto the runtime context schema. + + Parity with the legacy ``stream_new_chat`` runtime context, which set both + ``mentioned_connector_ids`` and ``mentioned_connectors`` on the schema. + """ + connectors = [{"id": 5, "connector_type": "SLACK_CONNECTOR", "title": "acme"}] + ctx = build_new_chat_runtime_context( + search_space_id=7, + mentioned_document_ids=None, + accepted_folder_ids=[], + mentioned_folder_ids=None, + mentioned_connector_ids=[5], + mentioned_connectors=connectors, + request_id=None, + turn_id="t3", + ) + assert list(ctx.mentioned_connector_ids) == [5] + assert list(ctx.mentioned_connectors) == connectors + + def test_resume_chat_runtime_context_empty_mention_lists() -> None: ctx = build_resume_chat_runtime_context( search_space_id=42, request_id="req-r", turn_id="t-r"