Merge remote-tracking branch 'upstream/dev' into fix/docker-dev

This commit is contained in:
Anish Sarkar 2026-03-10 14:23:57 +05:30
commit 4cca366e11
18 changed files with 450 additions and 51 deletions

View file

@ -0,0 +1,55 @@
"""104_add_notification_composite_indexes
Revision ID: 104
Revises: 103
Create Date: 2026-03-10
Add composite indexes on the notifications table to speed up the
most common query patterns:
- Unread count by user/category: (user_id, read, type, created_at)
- Notification list by user/space: (user_id, search_space_id, created_at)
- Single-column index on type (for category filtering)
- Single-column index on search_space_id (for space filtering)
"""
from __future__ import annotations
from collections.abc import Sequence
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "104"
down_revision: str | None = "103"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.create_index(
"ix_notifications_user_read_type_created",
"notifications",
["user_id", "read", "type", "created_at"],
)
op.create_index(
"ix_notifications_user_space_created",
"notifications",
["user_id", "search_space_id", "created_at"],
)
op.create_index(
"ix_notifications_type",
"notifications",
["type"],
)
op.create_index(
"ix_notifications_search_space_id",
"notifications",
["search_space_id"],
)
def downgrade() -> None:
op.drop_index("ix_notifications_search_space_id", table_name="notifications")
op.drop_index("ix_notifications_type", table_name="notifications")
op.drop_index("ix_notifications_user_space_created", table_name="notifications")
op.drop_index("ix_notifications_user_read_type_created", table_name="notifications")

View file

@ -647,6 +647,14 @@ async def search_knowledge_base_async(
top_k,
)
# --- Fast-path: no connectors left after filtering ---
if not connectors:
perf.info(
"[kb_search] TOTAL in %.3fs — no connectors to search, returning empty",
time.perf_counter() - t0,
)
return "No documents found in the knowledge base. The search space has no indexed content yet."
# --- Fast-path: degenerate queries (*, **, empty, etc.) ---
# Semantic embedding of '*' is noise and plainto_tsquery('english', '*')
# yields an empty tsquery, so both retrieval signals are useless.

View file

@ -15,6 +15,7 @@ from sqlalchemy import (
Column,
Enum as SQLAlchemyEnum,
ForeignKey,
Index,
Integer,
String,
Text,
@ -1423,6 +1424,24 @@ class Log(BaseModel, TimestampMixin):
class Notification(BaseModel, TimestampMixin):
__tablename__ = "notifications"
__table_args__ = (
# Composite index for unread-count queries that filter by
# (user_id, read, type) and order by created_at.
Index(
"ix_notifications_user_read_type_created",
"user_id",
"read",
"type",
"created_at",
),
# Covers the common list query: user_id + search_space_id + created_at DESC
Index(
"ix_notifications_user_space_created",
"user_id",
"search_space_id",
"created_at",
),
)
user_id = Column(
UUID(as_uuid=True),
@ -1431,10 +1450,13 @@ class Notification(BaseModel, TimestampMixin):
index=True,
)
search_space_id = Column(
Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=True
Integer,
ForeignKey("searchspaces.id", ondelete="CASCADE"),
nullable=True,
index=True,
)
type = Column(
String(50), nullable=False
String(50), nullable=False, index=True
) # 'connector_indexing', 'document_processing', etc.
title = Column(String(200), nullable=False)
message = Column(Text, nullable=False)

View file

@ -209,8 +209,12 @@ class ChucksHybridSearchRetriever:
tsvector = func.to_tsvector("english", Chunk.content)
tsquery = func.plainto_tsquery("english", query_text)
# Base conditions for chunk filtering - search space is required
base_conditions = [Document.search_space_id == search_space_id]
# Base conditions for chunk filtering - search space is required.
# Exclude documents in "deleting" state (background deletion in progress).
base_conditions = [
Document.search_space_id == search_space_id,
func.coalesce(Document.status["state"].astext, "ready") != "deleting",
]
# Add document type filter if provided
if document_type is not None:

View file

@ -190,8 +190,12 @@ class DocumentHybridSearchRetriever:
tsvector = func.to_tsvector("english", Document.content)
tsquery = func.plainto_tsquery("english", query_text)
# Base conditions for document filtering - search space is required
base_conditions = [Document.search_space_id == search_space_id]
# Base conditions for document filtering - search space is required.
# Exclude documents in "deleting" state (background deletion in progress).
base_conditions = [
Document.search_space_id == search_space_id,
func.coalesce(Document.status["state"].astext, "ready") != "deleting",
]
# Add document type filter if provided
if document_type is not None:

View file

@ -1056,6 +1056,9 @@ async def delete_document(
Delete a document.
Requires DOCUMENTS_DELETE permission for the search space.
Documents in "processing" state cannot be deleted.
Heavy cascade deletion runs asynchronously via Celery so the API
response is fast and the deletion remains durable across API restarts.
"""
try:
result = await session.execute(
@ -1068,13 +1071,17 @@ async def delete_document(
status_code=404, detail=f"Document with id {document_id} not found"
)
# Check if document is pending or currently being processed
doc_state = document.status.get("state") if document.status else None
if doc_state in ("pending", "processing"):
raise HTTPException(
status_code=409, # Conflict
status_code=409,
detail="Cannot delete document while it is pending or being processed. Please wait for processing to complete.",
)
if doc_state == "deleting":
raise HTTPException(
status_code=409,
detail="Document is already being deleted.",
)
# Check permission for the search space
await check_permission(
@ -1085,8 +1092,25 @@ async def delete_document(
"You don't have permission to delete documents in this search space",
)
await session.delete(document)
# Mark the document as "deleting" so it's excluded from searches,
# then commit immediately so the user gets a fast response.
document.status = {"state": "deleting"}
await session.commit()
# Dispatch durable background deletion via Celery.
# If queue dispatch fails, revert status to avoid a stuck "deleting" document.
try:
from app.tasks.celery_tasks.document_tasks import delete_document_task
delete_document_task.delay(document_id)
except Exception as dispatch_error:
document.status = {"state": "ready"}
await session.commit()
raise HTTPException(
status_code=503,
detail="Failed to queue background deletion. Please try again.",
) from dispatch_error
return {"message": "Document deleted successfully"}
except HTTPException:
raise

View file

@ -35,7 +35,6 @@ from app.db import (
shielded_async_session,
)
from app.schemas.new_chat import (
NewChatMessageAppend,
NewChatMessageRead,
NewChatRequest,
NewChatThreadCreate,
@ -891,8 +890,16 @@ async def append_message(
status_code=400, detail="Missing required field: content"
)
# Create message object manually
message = NewChatMessageAppend(role=role, content=content)
# Validate role early (before any DB work)
role_str = role.lower() if isinstance(role, str) else role
try:
message_role = NewChatMessageRole(role_str)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid role: {role}. Must be 'user', 'assistant', or 'system'.",
) from None
# Get thread
result = await session.execute(
select(NewChatThread).filter(NewChatThread.id == thread_id)
@ -913,23 +920,11 @@ async def append_message(
# Check thread-level access based on visibility
await check_thread_access(session, thread, user)
# Convert string role to enum
role_str = (
message.role.lower() if isinstance(message.role, str) else message.role
)
try:
message_role = NewChatMessageRole(role_str)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid role: {message.role}. Must be 'user', 'assistant', or 'system'.",
) from None
# Create message
db_message = NewChatMessage(
thread_id=thread_id,
role=message_role,
content=message.content,
content=content,
author_id=user.id,
)
session.add(db_message)
@ -937,11 +932,12 @@ async def append_message(
# Update thread's updated_at timestamp
thread.updated_at = datetime.now(UTC)
# Note: Title generation now happens in stream_new_chat.py after the first response
# using LLM to generate a descriptive title (with truncation as fallback)
# flush assigns the PK/defaults without a round-trip SELECT
await session.flush()
await session.commit()
await session.refresh(db_message)
# Return the in-memory object (already has id from flush) instead of
# doing an extra refresh() SELECT.
return db_message
except HTTPException:

View file

@ -10,7 +10,7 @@ from typing import Literal
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel
from sqlalchemy import desc, func, literal, literal_column, select, update
from sqlalchemy import case, desc, func, literal, literal_column, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Notification, User, get_async_session
@ -108,6 +108,73 @@ class UnreadCountResponse(BaseModel):
recent_unread: int # Within SYNC_WINDOW_DAYS
class CategoryUnreadCount(BaseModel):
total_unread: int
recent_unread: int
class BatchUnreadCountResponse(BaseModel):
"""Batched unread counts for all categories in a single response."""
comments: CategoryUnreadCount
status: CategoryUnreadCount
@router.get("/unread-counts-batch", response_model=BatchUnreadCountResponse)
async def get_unread_counts_batch(
search_space_id: int | None = Query(None, description="Filter by search space ID"),
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> BatchUnreadCountResponse:
"""
Get unread counts for all notification categories in a single DB query.
Replaces multiple separate calls to /unread-count with different category
filters, reducing round-trips from 2+ to 1.
"""
cutoff_date = datetime.now(UTC) - timedelta(days=SYNC_WINDOW_DAYS)
base_filter = [
Notification.user_id == user.id,
Notification.read == False, # noqa: E712
]
if search_space_id is not None:
base_filter.append(
(Notification.search_space_id == search_space_id)
| (Notification.search_space_id.is_(None))
)
is_comments = Notification.type.in_(CATEGORY_TYPES["comments"])
is_status = Notification.type.in_(CATEGORY_TYPES["status"])
is_recent = Notification.created_at > cutoff_date
query = select(
func.count(case((is_comments, Notification.id))).label("comments_total"),
func.count(case((is_comments & is_recent, Notification.id))).label(
"comments_recent"
),
func.count(case((is_status, Notification.id))).label("status_total"),
func.count(case((is_status & is_recent, Notification.id))).label(
"status_recent"
),
).where(*base_filter)
result = await session.execute(query)
row = result.one()
return BatchUnreadCountResponse(
comments=CategoryUnreadCount(
total_unread=row.comments_total,
recent_unread=row.comments_recent,
),
status=CategoryUnreadCount(
total_unread=row.status_total,
recent_unread=row.status_recent,
),
)
@router.get("/source-types", response_model=SourceTypesResponse)
async def get_notification_source_types(
search_space_id: int | None = Query(None, description="Filter by search space ID"),

View file

@ -274,6 +274,9 @@ async def delete_search_space(
"""
Delete a search space.
Requires SETTINGS_DELETE permission (only owners have this by default).
Heavy cascade deletion (documents, chunks, threads, etc.) is dispatched
to Celery so the response is immediate and durable across API restarts.
"""
try:
# Check permission - only those with SETTINGS_DELETE can delete
@ -293,8 +296,34 @@ async def delete_search_space(
if not db_search_space:
raise HTTPException(status_code=404, detail="Search space not found")
await session.delete(db_search_space)
if (db_search_space.name or "").startswith("[DELETING] "):
raise HTTPException(
status_code=409,
detail="Search space is already being deleted.",
)
# Soft-delete marker (length-safe for String(100)) so users see pending state.
prefix = "[DELETING] "
max_len = 100
available = max_len - len(prefix)
base_name = db_search_space.name or ""
db_search_space.name = f"{prefix}{base_name[:available]}"
await session.commit()
# Dispatch durable background deletion via Celery.
# If queue dispatch fails, revert name to avoid stuck "[DELETING]" state.
try:
from app.tasks.celery_tasks.document_tasks import delete_search_space_task
delete_search_space_task.delay(search_space_id)
except Exception as dispatch_error:
db_search_space.name = base_name
await session.commit()
raise HTTPException(
status_code=503,
detail="Failed to queue background deletion. Please try again.",
) from dispatch_error
return {"message": "Search space deleted successfully"}
except HTTPException:
raise

View file

@ -306,6 +306,9 @@ class ConnectorService:
document_type,
)
if not chunk_results and not doc_results:
return []
# Helper to extract document_id from our doc-grouped result
def _doc_id(item: dict[str, Any]) -> int | None:
doc = item.get("document", {})

View file

@ -89,6 +89,108 @@ async def _run_heartbeat_loop(notification_id: int):
pass # Normal cancellation when task completes
@celery_app.task(
name="delete_document_background",
bind=True,
autoretry_for=(Exception,),
retry_backoff=True,
retry_backoff_max=300,
max_retries=5,
)
def delete_document_task(self, document_id: int):
"""Celery task to delete a document and its chunks in batches."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(_delete_document_background(document_id))
finally:
loop.close()
async def _delete_document_background(document_id: int) -> None:
"""Delete chunks in batches first, then remove the document row."""
from sqlalchemy import delete as sa_delete, select
from app.db import Chunk, Document
async with get_celery_session_maker()() as session:
batch_size = 500
while True:
chunk_ids_result = await session.execute(
select(Chunk.id)
.where(Chunk.document_id == document_id)
.limit(batch_size)
)
chunk_ids = chunk_ids_result.scalars().all()
if not chunk_ids:
break
await session.execute(sa_delete(Chunk).where(Chunk.id.in_(chunk_ids)))
await session.commit()
doc = await session.get(Document, document_id)
if doc:
await session.delete(doc)
await session.commit()
@celery_app.task(
name="delete_search_space_background",
bind=True,
autoretry_for=(Exception,),
retry_backoff=True,
retry_backoff_max=300,
max_retries=5,
)
def delete_search_space_task(self, search_space_id: int):
"""Celery task to delete a search space and heavy child rows in batches."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(_delete_search_space_background(search_space_id))
finally:
loop.close()
async def _delete_search_space_background(search_space_id: int) -> None:
"""Delete chunks/docs in batches first, then delete the search space."""
from sqlalchemy import delete as sa_delete, select
from app.db import Chunk, Document, SearchSpace
async with get_celery_session_maker()() as session:
batch_size = 500
while True:
chunk_ids_result = await session.execute(
select(Chunk.id)
.join(Document, Chunk.document_id == Document.id)
.where(Document.search_space_id == search_space_id)
.limit(batch_size)
)
chunk_ids = chunk_ids_result.scalars().all()
if not chunk_ids:
break
await session.execute(sa_delete(Chunk).where(Chunk.id.in_(chunk_ids)))
await session.commit()
while True:
doc_ids_result = await session.execute(
select(Document.id)
.where(Document.search_space_id == search_space_id)
.limit(batch_size)
)
doc_ids = doc_ids_result.scalars().all()
if not doc_ids:
break
await session.execute(sa_delete(Document).where(Document.id.in_(doc_ids)))
await session.commit()
space = await session.get(SearchSpace, search_space_id)
if space:
await session.delete(space)
await session.commit()
@celery_app.task(name="process_extension_document", bind=True)
def process_extension_document_task(
self, individual_document_dict, search_space_id: int, user_id: str

View file

@ -0,0 +1,8 @@
import { atom } from "jotai";
import type { InboxItem } from "@/contracts/types/inbox.types";
/**
* Shared atom for status inbox items populated by LayoutDataProvider.
* Avoids duplicate useInbox("status") calls in child components like ConnectorPopup.
*/
export const statusInboxItemsAtom = atom<InboxItem[]>([]);

View file

@ -19,8 +19,8 @@ import { Dialog, DialogContent, DialogTitle } from "@/components/ui/dialog";
import { Spinner } from "@/components/ui/spinner";
import { Tabs, TabsContent } from "@/components/ui/tabs";
import type { SearchSourceConnector } from "@/contracts/types/connector.types";
import { statusInboxItemsAtom } from "@/atoms/inbox/status-inbox.atom";
import { useConnectorsElectric } from "@/hooks/use-connectors-electric";
import { useInbox } from "@/hooks/use-inbox";
import { cn } from "@/lib/utils";
import { ConnectorDialogHeader } from "./connector-popup/components/connector-dialog-header";
import { ConnectorConnectView } from "./connector-popup/connector-configs/views/connector-connect-view";
@ -75,12 +75,9 @@ export const ConnectorIndicator = forwardRef<ConnectorIndicatorHandle, Connector
const { data: documentTypeCounts, isFetching: documentTypesLoading } =
useAtomValue(documentTypeCountsAtom);
// Fetch status notifications to detect indexing failures
const { inboxItems: statusInboxItems = [] } = useInbox(
currentUser?.id ?? null,
searchSpaceId ? Number(searchSpaceId) : null,
"status"
);
// Read status inbox items from shared atom (populated by LayoutDataProvider)
// instead of creating a duplicate useInbox("status") hook.
const statusInboxItems = useAtomValue(statusInboxItemsAtom);
const inboxItems = useMemo(
() => statusInboxItems.filter((item) => item.type === "connector_indexing"),
[statusInboxItems]

View file

@ -10,6 +10,7 @@ import { Fragment, useCallback, useEffect, useMemo, useRef, useState } from "rea
import { toast } from "sonner";
import { currentThreadAtom, resetCurrentThreadAtom } from "@/atoms/chat/current-thread.atom";
import { documentsSidebarOpenAtom } from "@/atoms/documents/ui.atoms";
import { statusInboxItemsAtom } from "@/atoms/inbox/status-inbox.atom";
import { deleteSearchSpaceMutationAtom } from "@/atoms/search-spaces/search-space-mutation.atoms";
import { searchSpacesAtom } from "@/atoms/search-spaces/search-space-query.atoms";
import { currentUserAtom } from "@/atoms/user/user-query.atoms";
@ -37,6 +38,7 @@ import { isPageLimitExceededMetadata } from "@/contracts/types/inbox.types";
import { useAnnouncements } from "@/hooks/use-announcements";
import { useDocumentsProcessing } from "@/hooks/use-documents-processing";
import { useInbox } from "@/hooks/use-inbox";
import { notificationsApiService } from "@/lib/apis/notifications-api.service";
import { searchSpacesApiService } from "@/lib/apis/search-spaces-api.service";
import { logout } from "@/lib/auth-utils";
import { deleteThread, fetchThreads, updateThread } from "@/lib/chat/thread-persistence";
@ -144,11 +146,39 @@ export function LayoutDataProvider({ searchSpaceId, children }: LayoutDataProvid
const userId = user?.id ? String(user.id) : null;
const numericSpaceId = Number(searchSpaceId) || null;
const commentsInbox = useInbox(userId, numericSpaceId, "comments");
const statusInbox = useInbox(userId, numericSpaceId, "status");
// Batch-fetch unread counts for all categories in a single request
// instead of 2 separate /unread-count calls.
const { data: batchUnread, isLoading: isBatchUnreadLoading } = useQuery({
queryKey: cacheKeys.notifications.batchUnreadCounts(numericSpaceId),
queryFn: () => notificationsApiService.getBatchUnreadCounts(numericSpaceId ?? undefined),
enabled: !!userId && !!numericSpaceId,
staleTime: 30_000,
});
const commentsInbox = useInbox(
userId,
numericSpaceId,
"comments",
batchUnread?.comments,
!isBatchUnreadLoading
);
const statusInbox = useInbox(
userId,
numericSpaceId,
"status",
batchUnread?.status,
!isBatchUnreadLoading
);
const totalUnreadCount = commentsInbox.unreadCount + statusInbox.unreadCount;
// Sync status inbox items to a shared atom so child components
// (e.g. ConnectorPopup) can read them without creating duplicate useInbox hooks.
const setStatusInboxItems = useSetAtom(statusInboxItemsAtom);
useEffect(() => {
setStatusInboxItems(statusInbox.inboxItems);
}, [statusInbox.inboxItems, setStatusInboxItems]);
// Document processing status — drives sidebar status indicator (spinner / check / error)
const documentsProcessingStatus = useDocumentsProcessing(numericSpaceId);

View file

@ -284,6 +284,20 @@ export const getSourceTypesResponse = z.object({
sources: z.array(sourceTypeItem),
});
/**
* Batched unread counts for all categories in a single response.
* Replaces 2 separate /unread-count calls (comments + status).
*/
export const categoryUnreadCount = z.object({
total_unread: z.number(),
recent_unread: z.number(),
});
export const getBatchUnreadCountResponse = z.object({
comments: categoryUnreadCount,
status: categoryUnreadCount,
});
// =============================================================================
// Type Guards for Metadata
// =============================================================================
@ -412,3 +426,4 @@ export type GetUnreadCountRequest = z.infer<typeof getUnreadCountRequest>;
export type GetUnreadCountResponse = z.infer<typeof getUnreadCountResponse>;
export type SourceTypeItem = z.infer<typeof sourceTypeItem>;
export type GetSourceTypesResponse = z.infer<typeof getSourceTypesResponse>;
export type GetBatchUnreadCountResponse = z.infer<typeof getBatchUnreadCountResponse>;

View file

@ -57,7 +57,9 @@ function getSyncCutoffDate(): string {
export function useInbox(
userId: string | null,
searchSpaceId: number | null,
category: NotificationCategory
category: NotificationCategory,
prefetchedUnread?: { total_unread: number; recent_unread: number } | null,
prefetchedUnreadReady = true,
) {
const electricClient = useElectricClient();
@ -77,9 +79,12 @@ export function useInbox(
const olderUnreadOffsetRef = useRef<number | null>(null);
const apiUnreadTotalRef = useRef(0);
// EFFECT 1: Fetch first page + unread count from API with category filter
// EFFECT 1: Fetch first page + unread count from API with category filter.
// When prefetchedUnreadReady=false, we wait for the batch query to settle
// before deciding whether we need an individual unread-count fallback call.
useEffect(() => {
if (!userId || !searchSpaceId) return;
if (!prefetchedUnreadReady) return;
let cancelled = false;
@ -94,15 +99,22 @@ export function useInbox(
const fetchInitialData = async () => {
try {
const notificationsPromise = notificationsApiService.getNotifications({
queryParams: {
search_space_id: searchSpaceId,
category,
limit: INITIAL_PAGE_SIZE,
},
});
// Use prefetched counts when available, otherwise fetch individually.
const unreadPromise = prefetchedUnread
? Promise.resolve(prefetchedUnread)
: notificationsApiService.getUnreadCount(searchSpaceId, undefined, category);
const [notificationsResponse, unreadResponse] = await Promise.all([
notificationsApiService.getNotifications({
queryParams: {
search_space_id: searchSpaceId,
category,
limit: INITIAL_PAGE_SIZE,
},
}),
notificationsApiService.getUnreadCount(searchSpaceId, undefined, category),
notificationsPromise,
unreadPromise,
]);
if (cancelled) return;
@ -127,7 +139,7 @@ export function useInbox(
return () => {
cancelled = true;
};
}, [userId, searchSpaceId, category]);
}, [userId, searchSpaceId, category, prefetchedUnread, prefetchedUnreadReady]);
// EFFECT 2: Electric sync (shared shape) + per-instance type-filtered live queries
useEffect(() => {

View file

@ -1,8 +1,10 @@
import {
type GetBatchUnreadCountResponse,
type GetNotificationsRequest,
type GetNotificationsResponse,
type GetSourceTypesResponse,
type GetUnreadCountResponse,
getBatchUnreadCountResponse,
getNotificationsRequest,
getNotificationsResponse,
getSourceTypesResponse,
@ -149,6 +151,25 @@ class NotificationsApiService {
getUnreadCountResponse
);
};
/**
* Get unread counts for all categories in a single request.
* Replaces 2 separate getUnreadCount calls (comments + status).
*/
getBatchUnreadCounts = async (
searchSpaceId?: number
): Promise<GetBatchUnreadCountResponse> => {
const params = new URLSearchParams();
if (searchSpaceId !== undefined) {
params.append("search_space_id", String(searchSpaceId));
}
const queryString = params.toString();
return baseApiService.get(
`/api/v1/notifications/unread-counts-batch${queryString ? `?${queryString}` : ""}`,
getBatchUnreadCountResponse
);
};
}
export const notificationsApiService = new NotificationsApiService();

View file

@ -98,5 +98,7 @@ export const cacheKeys = {
["notifications", "search", searchSpaceId, search, tab] as const,
sourceTypes: (searchSpaceId: number | null) =>
["notifications", "source-types", searchSpaceId] as const,
batchUnreadCounts: (searchSpaceId: number | null) =>
["notifications", "unread-counts-batch", searchSpaceId] as const,
},
};