mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-04-25 00:36:31 +02:00
feat: implement batch unread counts for notifications to reduce API calls and improve performance
This commit is contained in:
parent
7362da52d3
commit
403097646d
18 changed files with 450 additions and 51 deletions
|
|
@ -89,6 +89,108 @@ async def _run_heartbeat_loop(notification_id: int):
|
|||
pass # Normal cancellation when task completes
|
||||
|
||||
|
||||
@celery_app.task(
    name="delete_document_background",
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=300,
    max_retries=5,
)
def delete_document_task(self, document_id: int):
    """Celery task to delete a document and its chunks in batches.

    Retries on any exception with exponential backoff capped at 300s
    (up to 5 attempts), per the task decorator above.

    Args:
        document_id: Primary key of the document to delete.
    """
    # asyncio.run() creates a fresh event loop, runs the coroutine, then
    # cancels leftover tasks, shuts down async generators, and closes the
    # loop -- a safer equivalent of the manual
    # new_event_loop()/set_event_loop()/run_until_complete()/close() sequence.
    asyncio.run(_delete_document_background(document_id))
|
||||
|
||||
|
||||
async def _delete_document_background(document_id: int) -> None:
    """Remove a document's chunks in fixed-size batches, then the document row."""
    from sqlalchemy import delete as sa_delete, select

    from app.db import Chunk, Document

    BATCH = 500

    async with get_celery_session_maker()() as session:
        # Drain chunk rows batch by batch so a very large document never
        # produces a single oversized DELETE or transaction.
        while True:
            rows = await session.execute(
                select(Chunk.id)
                .where(Chunk.document_id == document_id)
                .limit(BATCH)
            )
            batch_ids = rows.scalars().all()
            if not batch_ids:
                break
            await session.execute(sa_delete(Chunk).where(Chunk.id.in_(batch_ids)))
            await session.commit()

        # All chunks are gone -- delete the parent document row last.
        document = await session.get(Document, document_id)
        if document:
            await session.delete(document)
            await session.commit()
|
||||
|
||||
|
||||
@celery_app.task(
    name="delete_search_space_background",
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=300,
    max_retries=5,
)
def delete_search_space_task(self, search_space_id: int):
    """Celery task to delete a search space and heavy child rows in batches.

    Retries on any exception with exponential backoff capped at 300s
    (up to 5 attempts), per the task decorator above.

    Args:
        search_space_id: Primary key of the search space to delete.
    """
    # asyncio.run() creates a fresh event loop, runs the coroutine, then
    # cancels leftover tasks, shuts down async generators, and closes the
    # loop -- a safer equivalent of the manual
    # new_event_loop()/set_event_loop()/run_until_complete()/close() sequence.
    asyncio.run(_delete_search_space_background(search_space_id))
|
||||
|
||||
|
||||
async def _delete_search_space_background(search_space_id: int) -> None:
    """Delete chunks/docs in batches first, then delete the search space."""
    from sqlalchemy import delete as sa_delete, select

    from app.db import Chunk, Document, SearchSpace

    BATCH = 500

    async with get_celery_session_maker()() as session:

        async def _drain(id_query, model) -> None:
            # Repeatedly fetch up to BATCH ids matching id_query and delete
            # those rows from `model`, committing per batch, until none remain.
            while True:
                result = await session.execute(id_query.limit(BATCH))
                ids = result.scalars().all()
                if not ids:
                    break
                await session.execute(sa_delete(model).where(model.id.in_(ids)))
                await session.commit()

        # Chunks reference documents, so clear them first.
        await _drain(
            select(Chunk.id)
            .join(Document, Chunk.document_id == Document.id)
            .where(Document.search_space_id == search_space_id),
            Chunk,
        )

        # Then the documents belonging to this search space.
        await _drain(
            select(Document.id).where(Document.search_space_id == search_space_id),
            Document,
        )

        # Finally the search space row itself.
        space = await session.get(SearchSpace, search_space_id)
        if space:
            await session.delete(space)
            await session.commit()
|
||||
|
||||
|
||||
@celery_app.task(name="process_extension_document", bind=True)
|
||||
def process_extension_document_task(
|
||||
self, individual_document_dict, search_space_id: int, user_id: str
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue