mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-08 20:25:19 +02:00
Merge pull request #842 from AnishSarkar22/refactor/upload-document-adapter-class
refactor: Migrate document reindexing to `UploadDocumentAdapter` with unified indexing pipeline
This commit is contained in:
commit
7b6f832483
5 changed files with 324 additions and 103 deletions
|
|
@ -1,47 +1,83 @@
|
|||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.db import DocumentStatus, DocumentType
|
||||
from app.db import Document, DocumentStatus, DocumentType
|
||||
from app.indexing_pipeline.connector_document import ConnectorDocument
|
||||
from app.indexing_pipeline.document_hashing import compute_content_hash
|
||||
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
|
||||
|
||||
|
||||
async def index_uploaded_file(
|
||||
markdown_content: str,
|
||||
filename: str,
|
||||
etl_service: str,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
session: AsyncSession,
|
||||
llm,
|
||||
should_summarize: bool = False,
|
||||
) -> None:
|
||||
connector_doc = ConnectorDocument(
|
||||
title=filename,
|
||||
source_markdown=markdown_content,
|
||||
unique_id=filename,
|
||||
document_type=DocumentType.FILE,
|
||||
search_space_id=search_space_id,
|
||||
created_by_id=user_id,
|
||||
connector_id=None,
|
||||
should_summarize=should_summarize,
|
||||
should_use_code_chunker=False,
|
||||
fallback_summary=markdown_content[:4000],
|
||||
metadata={
|
||||
"FILE_NAME": filename,
|
||||
"ETL_SERVICE": etl_service,
|
||||
},
|
||||
)
|
||||
class UploadDocumentAdapter:
|
||||
def __init__(self, session: AsyncSession) -> None:
|
||||
self._session = session
|
||||
self._service = IndexingPipelineService(session)
|
||||
|
||||
service = IndexingPipelineService(session)
|
||||
documents = await service.prepare_for_indexing([connector_doc])
|
||||
async def index(
|
||||
self,
|
||||
markdown_content: str,
|
||||
filename: str,
|
||||
etl_service: str,
|
||||
search_space_id: int,
|
||||
user_id: str,
|
||||
llm,
|
||||
should_summarize: bool = False,
|
||||
) -> None:
|
||||
connector_doc = ConnectorDocument(
|
||||
title=filename,
|
||||
source_markdown=markdown_content,
|
||||
unique_id=filename,
|
||||
document_type=DocumentType.FILE,
|
||||
search_space_id=search_space_id,
|
||||
created_by_id=user_id,
|
||||
connector_id=None,
|
||||
should_summarize=should_summarize,
|
||||
should_use_code_chunker=False,
|
||||
fallback_summary=markdown_content[:4000],
|
||||
metadata={
|
||||
"FILE_NAME": filename,
|
||||
"ETL_SERVICE": etl_service,
|
||||
},
|
||||
)
|
||||
|
||||
if not documents:
|
||||
raise RuntimeError("prepare_for_indexing returned no documents")
|
||||
documents = await self._service.prepare_for_indexing([connector_doc])
|
||||
|
||||
indexed = await service.index(documents[0], connector_doc, llm)
|
||||
if not documents:
|
||||
raise RuntimeError("prepare_for_indexing returned no documents")
|
||||
|
||||
if not DocumentStatus.is_state(indexed.status, DocumentStatus.READY):
|
||||
raise RuntimeError(indexed.status.get("reason", "Indexing failed"))
|
||||
indexed = await self._service.index(documents[0], connector_doc, llm)
|
||||
|
||||
indexed.content_needs_reindexing = False
|
||||
await session.commit()
|
||||
if not DocumentStatus.is_state(indexed.status, DocumentStatus.READY):
|
||||
raise RuntimeError(indexed.status.get("reason", "Indexing failed"))
|
||||
|
||||
indexed.content_needs_reindexing = False
|
||||
await self._session.commit()
|
||||
|
||||
async def reindex(self, document: Document, llm) -> None:
|
||||
"""Re-index an existing document after its source_markdown has been updated."""
|
||||
if not document.source_markdown:
|
||||
raise RuntimeError("Document has no source_markdown to reindex")
|
||||
|
||||
metadata = document.document_metadata or {}
|
||||
|
||||
connector_doc = ConnectorDocument(
|
||||
title=document.title,
|
||||
source_markdown=document.source_markdown,
|
||||
unique_id=document.title,
|
||||
document_type=document.document_type,
|
||||
search_space_id=document.search_space_id,
|
||||
created_by_id=str(document.created_by_id),
|
||||
connector_id=document.connector_id,
|
||||
should_summarize=True,
|
||||
should_use_code_chunker=False,
|
||||
fallback_summary=document.source_markdown[:4000],
|
||||
metadata=metadata,
|
||||
)
|
||||
|
||||
document.content_hash = compute_content_hash(connector_doc)
|
||||
|
||||
indexed = await self._service.index(document, connector_doc, llm)
|
||||
|
||||
if not DocumentStatus.is_state(indexed.status, DocumentStatus.READY):
|
||||
raise RuntimeError(indexed.status.get("reason", "Reindexing failed"))
|
||||
|
||||
indexed.content_needs_reindexing = False
|
||||
await self._session.commit()
|
||||
|
|
|
|||
|
|
@ -2,19 +2,16 @@
|
|||
|
||||
import logging
|
||||
|
||||
from sqlalchemy import delete, select
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from app.celery_app import celery_app
|
||||
from app.db import Document
|
||||
from app.indexing_pipeline.adapters.file_upload_adapter import UploadDocumentAdapter
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.services.task_logging_service import TaskLoggingService
|
||||
from app.tasks.celery_tasks import get_celery_session_maker
|
||||
from app.utils.document_converters import (
|
||||
create_document_chunks,
|
||||
generate_document_summary,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -42,7 +39,6 @@ def reindex_document_task(self, document_id: int, user_id: str):
|
|||
async def _reindex_document(document_id: int, user_id: str):
|
||||
"""Async function to reindex a document."""
|
||||
async with get_celery_session_maker()() as session:
|
||||
# First, get the document to get search_space_id for logging
|
||||
result = await session.execute(
|
||||
select(Document)
|
||||
.options(selectinload(Document.chunks))
|
||||
|
|
@ -54,10 +50,8 @@ async def _reindex_document(document_id: int, user_id: str):
|
|||
logger.error(f"Document {document_id} not found")
|
||||
return
|
||||
|
||||
# Initialize task logger
|
||||
task_logger = TaskLoggingService(session, document.search_space_id)
|
||||
|
||||
# Log task start
|
||||
log_entry = await task_logger.log_task_start(
|
||||
task_name="document_reindex",
|
||||
source="editor",
|
||||
|
|
@ -71,10 +65,7 @@ async def _reindex_document(document_id: int, user_id: str):
|
|||
)
|
||||
|
||||
try:
|
||||
# Read markdown directly from source_markdown
|
||||
markdown_content = document.source_markdown
|
||||
|
||||
if not markdown_content:
|
||||
if not document.source_markdown:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
f"Document {document_id} has no source_markdown to reindex",
|
||||
|
|
@ -85,51 +76,17 @@ async def _reindex_document(document_id: int, user_id: str):
|
|||
|
||||
logger.info(f"Reindexing document {document_id} ({document.title})")
|
||||
|
||||
# 1. Delete old chunks explicitly
|
||||
from app.db import Chunk
|
||||
|
||||
await session.execute(delete(Chunk).where(Chunk.document_id == document_id))
|
||||
await session.flush() # Ensure old chunks are deleted
|
||||
|
||||
# 2. Create new chunks from source_markdown
|
||||
new_chunks = await create_document_chunks(markdown_content)
|
||||
|
||||
# 3. Add new chunks to session
|
||||
for chunk in new_chunks:
|
||||
chunk.document_id = document_id
|
||||
session.add(chunk)
|
||||
|
||||
logger.info(f"Created {len(new_chunks)} chunks for document {document_id}")
|
||||
|
||||
# 4. Regenerate summary
|
||||
user_llm = await get_user_long_context_llm(
|
||||
session, user_id, document.search_space_id
|
||||
)
|
||||
|
||||
document_metadata = {
|
||||
"title": document.title,
|
||||
"document_type": document.document_type.value,
|
||||
}
|
||||
adapter = UploadDocumentAdapter(session)
|
||||
await adapter.reindex(document=document, llm=user_llm)
|
||||
|
||||
summary_content, summary_embedding = await generate_document_summary(
|
||||
markdown_content, user_llm, document_metadata
|
||||
)
|
||||
|
||||
# 5. Update document
|
||||
document.content = summary_content
|
||||
document.embedding = summary_embedding
|
||||
document.content_needs_reindexing = False
|
||||
|
||||
await session.commit()
|
||||
|
||||
# Log success
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"Successfully reindexed document: {document.title}",
|
||||
{
|
||||
"chunks_created": len(new_chunks),
|
||||
"document_id": document_id,
|
||||
},
|
||||
{"document_id": document_id},
|
||||
)
|
||||
|
||||
logger.info(f"Successfully reindexed document {document_id}")
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
|||
|
||||
from app.config import config as app_config
|
||||
from app.db import Document, DocumentStatus, DocumentType, Log, Notification
|
||||
from app.indexing_pipeline.adapters.file_upload_adapter import index_uploaded_file
|
||||
from app.indexing_pipeline.adapters.file_upload_adapter import UploadDocumentAdapter
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.services.notification_service import NotificationService
|
||||
from app.services.task_logging_service import TaskLoggingService
|
||||
|
|
@ -1871,13 +1871,13 @@ async def process_file_in_background_with_document(
|
|||
|
||||
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
|
||||
|
||||
await index_uploaded_file(
|
||||
adapter = UploadDocumentAdapter(session)
|
||||
await adapter.index(
|
||||
markdown_content=markdown_content,
|
||||
filename=filename,
|
||||
etl_service=etl_service,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
session=session,
|
||||
llm=user_llm,
|
||||
should_summarize=should_summarize,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import pytest
|
|||
from sqlalchemy import select
|
||||
|
||||
from app.db import Chunk, Document, DocumentStatus
|
||||
from app.indexing_pipeline.adapters.file_upload_adapter import index_uploaded_file
|
||||
from app.indexing_pipeline.adapters.file_upload_adapter import UploadDocumentAdapter
|
||||
|
||||
pytestmark = pytest.mark.integration
|
||||
|
||||
|
|
@ -12,13 +12,13 @@ pytestmark = pytest.mark.integration
|
|||
)
|
||||
async def test_sets_status_ready(db_session, db_search_space, db_user, mocker):
|
||||
"""Document status is READY after successful indexing."""
|
||||
await index_uploaded_file(
|
||||
adapter = UploadDocumentAdapter(db_session)
|
||||
await adapter.index(
|
||||
markdown_content="## Hello\n\nSome content.",
|
||||
filename="test.pdf",
|
||||
etl_service="UNSTRUCTURED",
|
||||
search_space_id=db_search_space.id,
|
||||
user_id=str(db_user.id),
|
||||
session=db_session,
|
||||
llm=mocker.Mock(),
|
||||
)
|
||||
|
||||
|
|
@ -35,14 +35,15 @@ async def test_sets_status_ready(db_session, db_search_space, db_user, mocker):
|
|||
)
|
||||
async def test_content_is_summary(db_session, db_search_space, db_user, mocker):
|
||||
"""Document content is set to the LLM-generated summary."""
|
||||
await index_uploaded_file(
|
||||
adapter = UploadDocumentAdapter(db_session)
|
||||
await adapter.index(
|
||||
markdown_content="## Hello\n\nSome content.",
|
||||
filename="test.pdf",
|
||||
etl_service="UNSTRUCTURED",
|
||||
search_space_id=db_search_space.id,
|
||||
user_id=str(db_user.id),
|
||||
session=db_session,
|
||||
llm=mocker.Mock(),
|
||||
should_summarize=True,
|
||||
)
|
||||
|
||||
result = await db_session.execute(
|
||||
|
|
@ -58,13 +59,13 @@ async def test_content_is_summary(db_session, db_search_space, db_user, mocker):
|
|||
)
|
||||
async def test_chunks_written_to_db(db_session, db_search_space, db_user, mocker):
|
||||
"""Chunks derived from the source markdown are persisted in the DB."""
|
||||
await index_uploaded_file(
|
||||
adapter = UploadDocumentAdapter(db_session)
|
||||
await adapter.index(
|
||||
markdown_content="## Hello\n\nSome content.",
|
||||
filename="test.pdf",
|
||||
etl_service="UNSTRUCTURED",
|
||||
search_space_id=db_search_space.id,
|
||||
user_id=str(db_user.id),
|
||||
session=db_session,
|
||||
llm=mocker.Mock(),
|
||||
)
|
||||
|
||||
|
|
@ -87,13 +88,239 @@ async def test_chunks_written_to_db(db_session, db_search_space, db_user, mocker
|
|||
)
|
||||
async def test_raises_on_indexing_failure(db_session, db_search_space, db_user, mocker):
|
||||
"""RuntimeError is raised when the indexing step fails so the caller can fire a failure notification."""
|
||||
with pytest.raises(RuntimeError):
|
||||
await index_uploaded_file(
|
||||
adapter = UploadDocumentAdapter(db_session)
|
||||
with pytest.raises(RuntimeError, match=r"Embedding failed|Indexing failed"):
|
||||
await adapter.index(
|
||||
markdown_content="## Hello\n\nSome content.",
|
||||
filename="test.pdf",
|
||||
etl_service="UNSTRUCTURED",
|
||||
search_space_id=db_search_space.id,
|
||||
user_id=str(db_user.id),
|
||||
session=db_session,
|
||||
llm=mocker.Mock(),
|
||||
should_summarize=True,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# reindex() tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_summarize", "patched_embed_text", "patched_chunk_text"
|
||||
)
|
||||
async def test_reindex_updates_content(db_session, db_search_space, db_user, mocker):
|
||||
"""Document content is updated to the new summary after reindexing."""
|
||||
adapter = UploadDocumentAdapter(db_session)
|
||||
await adapter.index(
|
||||
markdown_content="## Original\n\nOriginal content.",
|
||||
filename="test.pdf",
|
||||
etl_service="UNSTRUCTURED",
|
||||
search_space_id=db_search_space.id,
|
||||
user_id=str(db_user.id),
|
||||
llm=mocker.Mock(),
|
||||
)
|
||||
|
||||
result = await db_session.execute(
|
||||
select(Document).filter(Document.search_space_id == db_search_space.id)
|
||||
)
|
||||
document = result.scalars().first()
|
||||
|
||||
document.source_markdown = "## Edited\n\nNew content after user edit."
|
||||
await db_session.flush()
|
||||
|
||||
await adapter.reindex(document=document, llm=mocker.Mock())
|
||||
|
||||
await db_session.refresh(document)
|
||||
assert document.content == "Mocked summary."
|
||||
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_summarize", "patched_embed_text", "patched_chunk_text"
|
||||
)
|
||||
async def test_reindex_updates_content_hash(
|
||||
db_session, db_search_space, db_user, mocker
|
||||
):
|
||||
"""Content hash is recomputed after reindexing with new source markdown."""
|
||||
adapter = UploadDocumentAdapter(db_session)
|
||||
await adapter.index(
|
||||
markdown_content="## Original\n\nOriginal content.",
|
||||
filename="test.pdf",
|
||||
etl_service="UNSTRUCTURED",
|
||||
search_space_id=db_search_space.id,
|
||||
user_id=str(db_user.id),
|
||||
llm=mocker.Mock(),
|
||||
)
|
||||
|
||||
result = await db_session.execute(
|
||||
select(Document).filter(Document.search_space_id == db_search_space.id)
|
||||
)
|
||||
document = result.scalars().first()
|
||||
original_hash = document.content_hash
|
||||
|
||||
document.source_markdown = "## Edited\n\nNew content after user edit."
|
||||
await db_session.flush()
|
||||
|
||||
await adapter.reindex(document=document, llm=mocker.Mock())
|
||||
|
||||
await db_session.refresh(document)
|
||||
assert document.content_hash != original_hash
|
||||
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_summarize", "patched_embed_text", "patched_chunk_text"
|
||||
)
|
||||
async def test_reindex_sets_status_ready(db_session, db_search_space, db_user, mocker):
|
||||
"""Document status is READY after successful reindexing."""
|
||||
adapter = UploadDocumentAdapter(db_session)
|
||||
await adapter.index(
|
||||
markdown_content="## Original\n\nOriginal content.",
|
||||
filename="test.pdf",
|
||||
etl_service="UNSTRUCTURED",
|
||||
search_space_id=db_search_space.id,
|
||||
user_id=str(db_user.id),
|
||||
llm=mocker.Mock(),
|
||||
)
|
||||
|
||||
result = await db_session.execute(
|
||||
select(Document).filter(Document.search_space_id == db_search_space.id)
|
||||
)
|
||||
document = result.scalars().first()
|
||||
|
||||
document.source_markdown = "## Edited\n\nNew content after user edit."
|
||||
await db_session.flush()
|
||||
|
||||
await adapter.reindex(document=document, llm=mocker.Mock())
|
||||
|
||||
await db_session.refresh(document)
|
||||
assert DocumentStatus.is_state(document.status, DocumentStatus.READY)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text")
|
||||
async def test_reindex_replaces_chunks(db_session, db_search_space, db_user, mocker):
|
||||
"""Reindexing replaces old chunks with new content rather than appending."""
|
||||
mocker.patch(
|
||||
"app.indexing_pipeline.indexing_pipeline_service.chunk_text",
|
||||
side_effect=[["Original chunk."], ["Updated chunk."]],
|
||||
)
|
||||
|
||||
adapter = UploadDocumentAdapter(db_session)
|
||||
await adapter.index(
|
||||
markdown_content="## Original\n\nOriginal content.",
|
||||
filename="test.pdf",
|
||||
etl_service="UNSTRUCTURED",
|
||||
search_space_id=db_search_space.id,
|
||||
user_id=str(db_user.id),
|
||||
llm=mocker.Mock(),
|
||||
)
|
||||
|
||||
result = await db_session.execute(
|
||||
select(Document).filter(Document.search_space_id == db_search_space.id)
|
||||
)
|
||||
document = result.scalars().first()
|
||||
document_id = document.id
|
||||
|
||||
document.source_markdown = "## Edited\n\nNew content after user edit."
|
||||
await db_session.flush()
|
||||
|
||||
await adapter.reindex(document=document, llm=mocker.Mock())
|
||||
|
||||
chunks_result = await db_session.execute(
|
||||
select(Chunk).filter(Chunk.document_id == document_id)
|
||||
)
|
||||
chunks = chunks_result.scalars().all()
|
||||
|
||||
assert len(chunks) == 1
|
||||
assert chunks[0].content == "Updated chunk."
|
||||
|
||||
|
||||
@pytest.mark.usefixtures(
|
||||
"patched_summarize", "patched_embed_text", "patched_chunk_text"
|
||||
)
|
||||
async def test_reindex_clears_reindexing_flag(
|
||||
db_session, db_search_space, db_user, mocker
|
||||
):
|
||||
"""After successful reindex, content_needs_reindexing is False."""
|
||||
adapter = UploadDocumentAdapter(db_session)
|
||||
await adapter.index(
|
||||
markdown_content="## Original\n\nOriginal content.",
|
||||
filename="test.pdf",
|
||||
etl_service="UNSTRUCTURED",
|
||||
search_space_id=db_search_space.id,
|
||||
user_id=str(db_user.id),
|
||||
llm=mocker.Mock(),
|
||||
)
|
||||
|
||||
result = await db_session.execute(
|
||||
select(Document).filter(Document.search_space_id == db_search_space.id)
|
||||
)
|
||||
document = result.scalars().first()
|
||||
|
||||
document.source_markdown = "## Edited\n\nNew content after user edit."
|
||||
document.content_needs_reindexing = True
|
||||
await db_session.flush()
|
||||
|
||||
await adapter.reindex(document=document, llm=mocker.Mock())
|
||||
|
||||
await db_session.refresh(document)
|
||||
assert document.content_needs_reindexing is False
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("patched_embed_text", "patched_chunk_text")
|
||||
async def test_reindex_raises_on_failure(db_session, db_search_space, db_user, mocker):
|
||||
"""RuntimeError is raised when reindexing fails so the caller can handle it."""
|
||||
mocker.patch(
|
||||
"app.indexing_pipeline.indexing_pipeline_service.summarize_document",
|
||||
return_value="Mocked summary.",
|
||||
)
|
||||
|
||||
adapter = UploadDocumentAdapter(db_session)
|
||||
await adapter.index(
|
||||
markdown_content="## Original\n\nOriginal content.",
|
||||
filename="test.pdf",
|
||||
etl_service="UNSTRUCTURED",
|
||||
search_space_id=db_search_space.id,
|
||||
user_id=str(db_user.id),
|
||||
llm=mocker.Mock(),
|
||||
)
|
||||
|
||||
result = await db_session.execute(
|
||||
select(Document).filter(Document.search_space_id == db_search_space.id)
|
||||
)
|
||||
document = result.scalars().first()
|
||||
|
||||
document.source_markdown = "## Edited\n\nNew content after user edit."
|
||||
await db_session.flush()
|
||||
|
||||
mocker.patch(
|
||||
"app.indexing_pipeline.indexing_pipeline_service.summarize_document",
|
||||
side_effect=RuntimeError("LLM unavailable"),
|
||||
)
|
||||
|
||||
with pytest.raises(RuntimeError, match=r"Embedding failed|Reindexing failed"):
|
||||
await adapter.reindex(document=document, llm=mocker.Mock())
|
||||
|
||||
|
||||
async def test_reindex_raises_on_empty_source_markdown(
|
||||
db_session, db_search_space, db_user, mocker
|
||||
):
|
||||
"""Reindexing a document with no source_markdown raises immediately."""
|
||||
from app.db import DocumentType
|
||||
|
||||
document = Document(
|
||||
title="empty.pdf",
|
||||
document_type=DocumentType.FILE,
|
||||
content="placeholder",
|
||||
content_hash="abc123",
|
||||
unique_identifier_hash="def456",
|
||||
source_markdown="",
|
||||
search_space_id=db_search_space.id,
|
||||
created_by_id=str(db_user.id),
|
||||
)
|
||||
db_session.add(document)
|
||||
await db_session.flush()
|
||||
|
||||
adapter = UploadDocumentAdapter(db_session)
|
||||
|
||||
with pytest.raises(RuntimeError, match="no source_markdown"):
|
||||
await adapter.reindex(document=document, llm=mocker.Mock())
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import { useQuery, useQueryClient } from "@tanstack/react-query";
|
||||
import { useEffect, useRef } from "react";
|
||||
import type { GetCommentsResponse } from "@/contracts/types/chat-comments.types";
|
||||
import { chatCommentsApiService } from "@/lib/apis/chat-comments-api.service";
|
||||
import { cacheKeys } from "@/lib/query-client/cache-keys";
|
||||
|
||||
|
|
@ -22,20 +23,20 @@ let _batchTargetIds = new Set<number>();
|
|||
let _batchReady: Promise<void> | null = null;
|
||||
let _resolveBatchReady: (() => void) | null = null;
|
||||
|
||||
function resetBatchGate() {
|
||||
function resetBatchGate(resolveImmediately = false) {
|
||||
_batchReady = new Promise<void>((r) => {
|
||||
_resolveBatchReady = r;
|
||||
if (resolveImmediately) r();
|
||||
});
|
||||
}
|
||||
|
||||
// Open the initial gate immediately (no batch pending yet)
|
||||
resetBatchGate();
|
||||
_resolveBatchReady?.();
|
||||
resetBatchGate(true);
|
||||
|
||||
export function useComments({ messageId, enabled = true }: UseCommentsOptions) {
|
||||
const queryClient = useQueryClient();
|
||||
|
||||
return useQuery({
|
||||
return useQuery<GetCommentsResponse>({
|
||||
queryKey: cacheKeys.comments.byMessage(messageId),
|
||||
queryFn: async () => {
|
||||
// Wait for the batch gate so the useEffect in useBatchCommentsPreload
|
||||
|
|
@ -46,7 +47,7 @@ export function useComments({ messageId, enabled = true }: UseCommentsOptions) {
|
|||
|
||||
if (_batchInflight && _batchTargetIds.has(messageId)) {
|
||||
await _batchInflight;
|
||||
const cached = queryClient.getQueryData(cacheKeys.comments.byMessage(messageId));
|
||||
const cached = queryClient.getQueryData<GetCommentsResponse>(cacheKeys.comments.byMessage(messageId));
|
||||
if (cached) return cached;
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue