mirror of https://github.com/MODSetter/SurfSense.git
synced 2026-05-17 18:35:19 +02:00

chore: ran both frontend and backend linting

parent 99bd2df463
commit 5bd6bd3d67

21 changed files with 861 additions and 739 deletions
@@ -6,6 +6,7 @@ Revises: 61
 Note: Electric SQL replication setup (REPLICA IDENTITY FULL and publication)
+is handled in app/db.py setup_electric_replication() which runs on app startup.
 """

 from collections.abc import Sequence

 from alembic import op

@@ -711,15 +711,22 @@ class Notification(BaseModel, TimestampMixin):
     __tablename__ = "notifications"

     user_id = Column(
-        UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), nullable=False, index=True
+        UUID(as_uuid=True),
+        ForeignKey("user.id", ondelete="CASCADE"),
+        nullable=False,
+        index=True,
     )
     search_space_id = Column(
         Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=True
     )
-    type = Column(String(50), nullable=False)  # 'connector_indexing', 'document_processing', etc.
+    type = Column(
+        String(50), nullable=False
+    )  # 'connector_indexing', 'document_processing', etc.
     title = Column(String(200), nullable=False)
     message = Column(Text, nullable=False)
-    read = Column(Boolean, nullable=False, default=False, server_default=text("false"), index=True)
+    read = Column(
+        Boolean, nullable=False, default=False, server_default=text("false"), index=True
+    )
     notification_metadata = Column("metadata", JSONB, nullable=True, default={})

     user = relationship("User", back_populates="notifications")
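A note on the model above: the Python attribute is notification_metadata, but the underlying Postgres column is named "metadata". The indirection is required because `metadata` is a reserved attribute on SQLAlchemy declarative classes. A minimal, repo-independent sketch of the pattern:

    from sqlalchemy import Column, Integer
    from sqlalchemy.dialects.postgresql import JSONB
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class Example(Base):
        __tablename__ = "examples"
        id = Column(Integer, primary_key=True)
        # The first positional argument names the actual DB column ("metadata");
        # the attribute itself must use a different name, since Example.metadata
        # is reserved by SQLAlchemy's Declarative API.
        example_metadata = Column("metadata", JSONB, nullable=True, default={})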
@@ -990,7 +997,9 @@ async def setup_electric_replication():
         # Set REPLICA IDENTITY FULL (required by Electric SQL for replication)
         # This logs full row data for UPDATE/DELETE operations in the WAL
         await conn.execute(text("ALTER TABLE notifications REPLICA IDENTITY FULL;"))
-        await conn.execute(text("ALTER TABLE search_source_connectors REPLICA IDENTITY FULL;"))
+        await conn.execute(
+            text("ALTER TABLE search_source_connectors REPLICA IDENTITY FULL;")
+        )
         await conn.execute(text("ALTER TABLE documents REPLICA IDENTITY FULL;"))

         # Add tables to Electric SQL publication for replication
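The hunk above touches the startup hook the migration note referred to: Electric SQL consumes Postgres logical replication, and REPLICA IDENTITY FULL makes UPDATE/DELETE WAL records carry complete row images. A hedged sketch of what such a hook does, assuming an async SQLAlchemy engine (the publication step is summarized from the comment above; the publication name here is an assumption, not taken from this diff):

    from sqlalchemy import text

    async def setup_electric_replication_sketch(engine) -> None:
        tables = ("notifications", "search_source_connectors", "documents")
        async with engine.begin() as conn:
            for table in tables:
                # Log full row images so Electric SQL can replicate UPDATE/DELETE
                await conn.execute(text(f"ALTER TABLE {table} REPLICA IDENTITY FULL;"))
                # Hypothetical publication name; the real one is not shown in this diff
                await conn.execute(
                    text(f"ALTER PUBLICATION electric_publication ADD TABLE {table};")
                )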
@@ -986,21 +986,25 @@ async def _run_indexing_with_notifications(
     try:
         # Get connector info for notification
         connector_result = await session.execute(
-            select(SearchSourceConnector).where(SearchSourceConnector.id == connector_id)
+            select(SearchSourceConnector).where(
+                SearchSourceConnector.id == connector_id
+            )
         )
         connector = connector_result.scalar_one_or_none()

         if connector:
             # Create notification when indexing starts
-            notification = await NotificationService.connector_indexing.notify_indexing_started(
-                session=session,
-                user_id=UUID(user_id),
-                connector_id=connector_id,
-                connector_name=connector.name,
-                connector_type=connector.connector_type.value,
-                search_space_id=search_space_id,
-                start_date=start_date,
-                end_date=end_date,
+            notification = (
+                await NotificationService.connector_indexing.notify_indexing_started(
+                    session=session,
+                    user_id=UUID(user_id),
+                    connector_id=connector_id,
+                    connector_name=connector.name,
+                    connector_type=connector.connector_type.value,
+                    search_space_id=search_space_id,
+                    start_date=start_date,
+                    end_date=end_date,
+                )
             )

             # Update notification to fetching stage
@@ -1640,6 +1644,7 @@ async def run_google_gmail_indexing(
         start_date: Start date for indexing
         end_date: End date for indexing
     """
+
     # Create a wrapper function that calls index_google_gmail_messages with max_messages
     async def gmail_indexing_wrapper(
         session: AsyncSession,
@@ -1701,7 +1706,9 @@ async def run_google_drive_indexing(

     # Get connector info for notification
     connector_result = await session.execute(
-        select(SearchSourceConnector).where(SearchSourceConnector.id == connector_id)
+        select(SearchSourceConnector).where(
+            SearchSourceConnector.id == connector_id
+        )
     )
     connector = connector_result.scalar_one_or_none()

@@ -1813,7 +1820,7 @@ async def run_google_drive_indexing(
             f"Critical error in run_google_drive_indexing for connector {connector_id}: {e}",
             exc_info=True,
         )
-
+
         # Update notification on exception
         if notification:
             try:

@@ -99,7 +99,9 @@ class BaseNotificationHandler:
            flag_modified(notification, "notification_metadata")
            await session.commit()
            await session.refresh(notification)
-           logger.info(f"Updated notification {notification.id} for operation {operation_id}")
+           logger.info(
+               f"Updated notification {notification.id} for operation {operation_id}"
+           )
            return notification

        # Create new notification
@@ -119,7 +121,9 @@ class BaseNotificationHandler:
        session.add(notification)
        await session.commit()
        await session.refresh(notification)
-       logger.info(f"Created notification {notification.id} for operation {operation_id}")
+       logger.info(
+           f"Created notification {notification.id} for operation {operation_id}"
+       )
        return notification

    async def update_notification(
@@ -153,9 +157,9 @@ class BaseNotificationHandler:
        if status is not None:
            notification.notification_metadata["status"] = status
            if status in ("completed", "failed"):
-               notification.notification_metadata["completed_at"] = (
-                   datetime.now(UTC).isoformat()
-               )
+               notification.notification_metadata["completed_at"] = datetime.now(
+                   UTC
+               ).isoformat()
        # Mark JSONB column as modified so SQLAlchemy detects the change
        flag_modified(notification, "notification_metadata")

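Worth calling out in the hunk above: the handler mutates a JSONB dict in place and then calls flag_modified(). SQLAlchemy does not track in-place mutations of a plain dict-valued column, so without that call the UPDATE would be silently skipped at flush time. A self-contained sketch of the pattern (session and notification stand in for the real objects):

    from datetime import UTC, datetime

    from sqlalchemy.orm.attributes import flag_modified

    async def mark_completed(session, notification) -> None:
        # In-place mutation alone leaves the attribute "clean" in the unit of work
        notification.notification_metadata["status"] = "completed"
        notification.notification_metadata["completed_at"] = datetime.now(UTC).isoformat()
        # Force the JSONB column into the next flush
        flag_modified(notification, "notification_metadata")
        await session.commit()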
@@ -180,7 +184,10 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
        super().__init__("connector_indexing")

    def _generate_operation_id(
-       self, connector_id: int, start_date: str | None = None, end_date: str | None = None
+       self,
+       connector_id: int,
+       start_date: str | None = None,
+       end_date: str | None = None,
    ) -> str:
        """
        Generate a unique operation ID for a connector indexing operation.
@@ -298,7 +305,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
            "processing": "Preparing for search",
            "storing": "Almost done",
        }
-
+
        # Use stage-based message if stage provided, otherwise fallback
        if stage or stage_message:
            progress_msg = stage_message or stage_messages.get(stage, "Processing")
@@ -341,7 +348,9 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
        Returns:
            Updated notification
        """
-       connector_name = notification.notification_metadata.get("connector_name", "Connector")
+       connector_name = notification.notification_metadata.get(
+           "connector_name", "Connector"
+       )

        if error_message:
            title = f"Failed: {connector_name}"
@@ -414,7 +423,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
            "indexed_count": 0,
            "sync_stage": "connecting",
        }
-
+
        if folder_names:
            metadata["folder_names"] = folder_names
        if file_names:
@@ -454,6 +463,7 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler):
        timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f")
        # Create a short hash of filename to ensure uniqueness
        import hashlib
+
        filename_hash = hashlib.md5(filename.encode()).hexdigest()[:8]
        return f"doc_{document_type}_{search_space_id}_{timestamp}_{filename_hash}"

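The pieces in this hunk assemble into roughly the following generator; this is a consolidated sketch of the visible lines, with the surrounding method turned into a plain function:

    import hashlib
    from datetime import UTC, datetime

    def generate_operation_id(document_type: str, filename: str, search_space_id: int) -> str:
        # Microsecond timestamp plus a short md5 prefix of the filename keeps
        # concurrent uploads of identically named files from colliding.
        timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f")
        filename_hash = hashlib.md5(filename.encode()).hexdigest()[:8]
        return f"doc_{document_type}_{search_space_id}_{timestamp}_{filename_hash}"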
@@ -480,7 +490,9 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler):
        Returns:
            Notification: The created notification
        """
-       operation_id = self._generate_operation_id(document_type, document_name, search_space_id)
+       operation_id = self._generate_operation_id(
+           document_type, document_name, search_space_id
+       )
        title = f"Processing: {document_name}"
        message = "Waiting in queue"

@@ -489,7 +501,7 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler):
            "document_name": document_name,
            "processing_stage": "queued",
        }
-
+
        if file_size is not None:
            metadata["file_size"] = file_size

@@ -531,7 +543,7 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler):
            "embedding": "Preparing for search",
            "storing": "Finalizing",
        }
-
+
        message = stage_message or stage_messages.get(stage, "Processing")

        metadata_updates = {"processing_stage": stage}
@@ -568,7 +580,9 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler):
        Returns:
            Updated notification
        """
-       document_name = notification.notification_metadata.get("document_name", "Document")
+       document_name = notification.notification_metadata.get(
+           "document_name", "Document"
+       )

        if error_message:
            title = f"Failed: {document_name}"
@@ -583,7 +597,7 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler):
            "processing_stage": "completed" if not error_message else "failed",
            "error_message": error_message,
        }
-
+
        if document_id is not None:
            metadata_updates["document_id"] = document_id
        # Store chunks_count in metadata for debugging, but don't show to user
@@ -645,4 +659,3 @@ class NotificationService:
        await session.refresh(notification)
        logger.info(f"Created notification {notification.id} for user {user_id}")
        return notification
-
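Taken together, the handlers above expose a started, progress, completed lifecycle that the task code in the following files drives. A condensed usage sketch based on the call sites visible in this diff (the document name and id are placeholders, and imports from the app's own modules are omitted):

    from uuid import UUID

    async def process_with_notifications(session, user_id: str, search_space_id: int) -> None:
        notification = await NotificationService.document_processing.notify_processing_started(
            session=session,
            user_id=UUID(user_id),
            document_type="FILE",
            document_name="example.pdf",  # placeholder
            search_space_id=search_space_id,
        )
        await NotificationService.document_processing.notify_processing_progress(
            session, notification, stage="parsing", stage_message="Extracting content"
        )
        await NotificationService.document_processing.notify_processing_completed(
            session=session,
            notification=notification,
            document_id=123,  # placeholder
            chunks_count=None,
        )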
@@ -92,12 +92,14 @@ async def _process_extension_document(
            page_title += "..."

        # Create notification for document processing
-        notification = await NotificationService.document_processing.notify_processing_started(
-            session=session,
-            user_id=UUID(user_id),
-            document_type="EXTENSION",
-            document_name=page_title,
-            search_space_id=search_space_id,
+        notification = (
+            await NotificationService.document_processing.notify_processing_started(
+                session=session,
+                user_id=UUID(user_id),
+                document_type="EXTENSION",
+                document_name=page_title,
+                search_space_id=search_space_id,
+            )
        )

        log_entry = await task_logger.log_task_start(
@@ -115,7 +117,10 @@ async def _process_extension_document(
        try:
            # Update notification: parsing stage
            await NotificationService.document_processing.notify_processing_progress(
-                session, notification, stage="parsing", stage_message="Reading page content"
+                session,
+                notification,
+                stage="parsing",
+                stage_message="Reading page content",
            )

            result = await add_extension_received_document(
@@ -130,11 +135,13 @@ async def _process_extension_document(
                )

                # Update notification on success
-                await NotificationService.document_processing.notify_processing_completed(
-                    session=session,
-                    notification=notification,
-                    document_id=result.id,
-                    chunks_count=None,
+                await (
+                    NotificationService.document_processing.notify_processing_completed(
+                        session=session,
+                        notification=notification,
+                        document_id=result.id,
+                        chunks_count=None,
+                    )
                )
            else:
                await task_logger.log_task_success(
@@ -144,10 +151,12 @@ async def _process_extension_document(
                )

                # Update notification for duplicate
-                await NotificationService.document_processing.notify_processing_completed(
-                    session=session,
-                    notification=notification,
-                    error_message="Page already saved (duplicate)",
+                await (
+                    NotificationService.document_processing.notify_processing_completed(
+                        session=session,
+                        notification=notification,
+                        error_message="Page already saved (duplicate)",
+                    )
                )
        except Exception as e:
            await task_logger.log_task_failure(
@@ -198,12 +207,14 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
        video_name = url.split("v=")[-1][:11] if "v=" in url else url

        # Create notification for document processing
-        notification = await NotificationService.document_processing.notify_processing_started(
-            session=session,
-            user_id=UUID(user_id),
-            document_type="YOUTUBE_VIDEO",
-            document_name=f"YouTube: {video_name}",
-            search_space_id=search_space_id,
+        notification = (
+            await NotificationService.document_processing.notify_processing_started(
+                session=session,
+                user_id=UUID(user_id),
+                document_type="YOUTUBE_VIDEO",
+                document_name=f"YouTube: {video_name}",
+                search_space_id=search_space_id,
+            )
        )

        log_entry = await task_logger.log_task_start(
@@ -216,7 +227,10 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
        try:
            # Update notification: parsing (fetching transcript)
            await NotificationService.document_processing.notify_processing_progress(
-                session, notification, stage="parsing", stage_message="Fetching video transcript"
+                session,
+                notification,
+                stage="parsing",
+                stage_message="Fetching video transcript",
            )

            result = await add_youtube_video_document(
@@ -235,11 +249,13 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
                )

                # Update notification on success
-                await NotificationService.document_processing.notify_processing_completed(
-                    session=session,
-                    notification=notification,
-                    document_id=result.id,
-                    chunks_count=None,
+                await (
+                    NotificationService.document_processing.notify_processing_completed(
+                        session=session,
+                        notification=notification,
+                        document_id=result.id,
+                        chunks_count=None,
+                    )
                )
            else:
                await task_logger.log_task_success(
@@ -249,10 +265,12 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
                )

                # Update notification for duplicate
-                await NotificationService.document_processing.notify_processing_completed(
-                    session=session,
-                    notification=notification,
-                    error_message="Video already exists (duplicate)",
+                await (
+                    NotificationService.document_processing.notify_processing_completed(
+                        session=session,
+                        notification=notification,
+                        error_message="Video already exists (duplicate)",
+                    )
                )
        except Exception as e:
            await task_logger.log_task_failure(
@@ -317,13 +335,15 @@ async def _process_file_upload(
        file_size = None

        # Create notification for document processing
-        notification = await NotificationService.document_processing.notify_processing_started(
-            session=session,
-            user_id=UUID(user_id),
-            document_type="FILE",
-            document_name=filename,
-            search_space_id=search_space_id,
-            file_size=file_size,
+        notification = (
+            await NotificationService.document_processing.notify_processing_started(
+                session=session,
+                user_id=UUID(user_id),
+                document_type="FILE",
+                document_name=filename,
+                search_space_id=search_space_id,
+                file_size=file_size,
+            )
        )

        log_entry = await task_logger.log_task_start(
@@ -352,18 +372,22 @@ async def _process_file_upload(

            # Update notification on success
            if result:
-                await NotificationService.document_processing.notify_processing_completed(
-                    session=session,
-                    notification=notification,
-                    document_id=result.id,
-                    chunks_count=None,
+                await (
+                    NotificationService.document_processing.notify_processing_completed(
+                        session=session,
+                        notification=notification,
+                        document_id=result.id,
+                        chunks_count=None,
+                    )
                )
            else:
                # Duplicate detected
-                await NotificationService.document_processing.notify_processing_completed(
-                    session=session,
-                    notification=notification,
-                    error_message="Document already exists (duplicate)",
+                await (
+                    NotificationService.document_processing.notify_processing_completed(
+                        session=session,
+                        notification=notification,
+                        error_message="Document already exists (duplicate)",
+                    )
                )

        except Exception as e:
@@ -456,12 +480,14 @@ async def _process_circleback_meeting(
        # Create notification if user_id is available
        notification = None
        if user_id:
-            notification = await NotificationService.document_processing.notify_processing_started(
-                session=session,
-                user_id=UUID(user_id),
-                document_type="CIRCLEBACK",
-                document_name=f"Meeting: {meeting_name[:40]}",
-                search_space_id=search_space_id,
+            notification = (
+                await NotificationService.document_processing.notify_processing_started(
+                    session=session,
+                    user_id=UUID(user_id),
+                    document_type="CIRCLEBACK",
+                    document_name=f"Meeting: {meeting_name[:40]}",
+                    search_space_id=search_space_id,
+                )
            )

        log_entry = await task_logger.log_task_start(
@@ -479,8 +505,13 @@ async def _process_circleback_meeting(
        try:
            # Update notification: parsing stage
            if notification:
-                await NotificationService.document_processing.notify_processing_progress(
-                    session, notification, stage="parsing", stage_message="Reading meeting notes"
+                await (
+                    NotificationService.document_processing.notify_processing_progress(
+                        session,
+                        notification,
+                        stage="parsing",
+                        stage_message="Reading meeting notes",
+                    )
                )

            result = await add_circleback_meeting_document(
@@ -535,10 +566,12 @@ async def _process_circleback_meeting(

        # Update notification on failure
        if notification:
-            await NotificationService.document_processing.notify_processing_completed(
-                session=session,
-                notification=notification,
-                error_message=str(e)[:100],
+            await (
+                NotificationService.document_processing.notify_processing_completed(
+                    session=session,
+                    notification=notification,
+                    error_message=str(e)[:100],
+                )
            )

        logger.error(f"Error processing Circleback meeting: {e!s}")

@@ -476,15 +476,21 @@ async def process_file_in_background(
    log_entry: Log,
    connector: dict
    | None = None,  # Optional: {"type": "GOOGLE_DRIVE_FILE", "metadata": {...}}
-   notification: Notification | None = None,  # Optional notification for progress updates
+   notification: Notification
+   | None = None,  # Optional notification for progress updates
 ) -> Document | None:
    try:
        # Check if the file is a markdown or text file
        if filename.lower().endswith((".md", ".markdown", ".txt")):
            # Update notification: parsing stage
            if notification:
-                await NotificationService.document_processing.notify_processing_progress(
-                    session, notification, stage="parsing", stage_message="Reading file"
+                await (
+                    NotificationService.document_processing.notify_processing_progress(
+                        session,
+                        notification,
+                        stage="parsing",
+                        stage_message="Reading file",
+                    )
                )

            await task_logger.log_task_progress(
@@ -508,8 +514,10 @@ async def process_file_in_background(

            # Update notification: chunking stage
            if notification:
-                await NotificationService.document_processing.notify_processing_progress(
-                    session, notification, stage="chunking"
+                await (
+                    NotificationService.document_processing.notify_processing_progress(
+                        session, notification, stage="chunking"
+                    )
                )

            await task_logger.log_task_progress(
@@ -554,8 +562,13 @@ async def process_file_in_background(
        ):
            # Update notification: parsing stage (transcription)
            if notification:
-                await NotificationService.document_processing.notify_processing_progress(
-                    session, notification, stage="parsing", stage_message="Transcribing audio"
+                await (
+                    NotificationService.document_processing.notify_processing_progress(
+                        session,
+                        notification,
+                        stage="parsing",
+                        stage_message="Transcribing audio",
+                    )
                )

            await task_logger.log_task_progress(
@@ -643,8 +656,10 @@ async def process_file_in_background(

            # Update notification: chunking stage
            if notification:
-                await NotificationService.document_processing.notify_processing_progress(
-                    session, notification, stage="chunking"
+                await (
+                    NotificationService.document_processing.notify_processing_progress(
+                        session, notification, stage="chunking"
+                    )
                )

            # Clean up the temp file
@@ -749,7 +764,10 @@ async def process_file_in_background(
            # Update notification: parsing stage
            if notification:
                await NotificationService.document_processing.notify_processing_progress(
-                    session, notification, stage="parsing", stage_message="Extracting content"
+                    session,
+                    notification,
+                    stage="parsing",
+                    stage_message="Extracting content",
                )

            await task_logger.log_task_progress(
@@ -859,7 +877,10 @@ async def process_file_in_background(
            # Update notification: parsing stage
            if notification:
                await NotificationService.document_processing.notify_processing_progress(
-                    session, notification, stage="parsing", stage_message="Extracting content"
+                    session,
+                    notification,
+                    stage="parsing",
+                    stage_message="Extracting content",
                )

            await task_logger.log_task_progress(
@@ -904,7 +925,10 @@ async def process_file_in_background(
            # Update notification: chunking stage
            if notification:
                await NotificationService.document_processing.notify_processing_progress(
-                    session, notification, stage="chunking", chunks_count=len(markdown_documents)
+                    session,
+                    notification,
+                    stage="chunking",
+                    chunks_count=len(markdown_documents),
                )

            await task_logger.log_task_progress(
@@ -1018,7 +1042,10 @@ async def process_file_in_background(
            # Update notification: parsing stage
            if notification:
                await NotificationService.document_processing.notify_processing_progress(
-                    session, notification, stage="parsing", stage_message="Extracting content"
+                    session,
+                    notification,
+                    stage="parsing",
+                    stage_message="Extracting content",
                )

            await task_logger.log_task_progress(

@@ -27,7 +27,7 @@ import { YouTubeCrawlerView } from "./connector-popup/views/youtube-crawler-view
 export const ConnectorIndicator: FC = () => {
 	const searchSpaceId = useAtomValue(activeSearchSpaceIdAtom);
 	const searchParams = useSearchParams();
-
+
 	// Fetch document type counts using Electric SQL + PGlite for real-time updates
 	const { documentTypeCounts, loading: documentTypesLoading } = useDocumentsElectric(searchSpaceId);
-
+
@@ -96,9 +96,10 @@ export const ConnectorIndicator: FC = () => {
 	} = useConnectorsElectric(searchSpaceId);

 	// Fallback to API if Electric fails or is not available
-	const connectors = connectorsFromElectric.length > 0 || !connectorsError
-		? connectorsFromElectric
-		: allConnectors || [];
+	const connectors =
+		connectorsFromElectric.length > 0 || !connectorsError
+			? connectorsFromElectric
+			: allConnectors || [];

 	// Manual refresh function that works with both Electric and API
 	const refreshConnectors = async () => {

@@ -5,17 +5,17 @@ import type { SearchSourceConnector } from "@/contracts/types/connector.types";

 /**
  * Hook to track which connectors are currently indexing using local state.
- *
+ *
  * This provides a better UX than polling by:
  * 1. Setting indexing state immediately when user triggers indexing (optimistic)
  * 2. Clearing indexing state when Electric SQL detects last_indexed_at changed
- *
+ *
  * The actual `last_indexed_at` value comes from Electric SQL/PGlite, not local state.
  */
 export function useIndexingConnectors(connectors: SearchSourceConnector[]) {
 	// Set of connector IDs that are currently indexing
 	const [indexingConnectorIds, setIndexingConnectorIds] = useState<Set<number>>(new Set());
-
+
 	// Track previous last_indexed_at values to detect changes
 	const previousLastIndexedAtRef = useRef<Map<number, string | null>>(new Map());
-
+
@@ -79,4 +79,3 @@ export function useIndexingConnectors(connectors: SearchSourceConnector[]) {
 		isIndexing,
 	};
 }
-

@@ -63,7 +63,6 @@ export const ActiveConnectorsTab: FC<ActiveConnectorsTabProps> = ({
 		return `${m.replace(/\.0$/, "")}M docs`;
 	};

-
 	// Document types that should be shown as standalone cards (not from connectors)
 	const standaloneDocumentTypes = ["EXTENSION", "FILE", "NOTE", "YOUTUBE_VIDEO", "CRAWLED_URL"];

@@ -49,7 +49,6 @@ export const AllConnectorsTab: FC<AllConnectorsTabProps> = ({
 	onManage,
 	onViewAccountsList,
 }) => {
-
 	// Filter connectors based on search
 	const filteredOAuth = OAUTH_CONNECTORS.filter(
 		(c) =>

@@ -119,10 +119,7 @@ const DocumentUploadPopupContent: FC<{
 			<div className="flex-1 min-h-0 relative overflow-hidden">
 				<div className="h-full overflow-y-auto">
 					<div className="px-6 sm:px-12 pb-5 sm:pb-16">
-						<DocumentUploadTab
-							searchSpaceId={searchSpaceId}
-							onSuccess={handleSuccess}
-						/>
+						<DocumentUploadTab searchSpaceId={searchSpaceId} onSuccess={handleSuccess} />
 					</div>
 				</div>
 				{/* Bottom fade shadow */}

@@ -27,11 +27,7 @@ export const TooltipIconButton = forwardRef<HTMLButtonElement, TooltipIconButton
 					<span className="aui-sr-only sr-only">{tooltip}</span>
 				</Button>
 			</TooltipTrigger>
-			<TooltipContent
-				side={side}
-			>
-				{tooltip}
-			</TooltipContent>
+			<TooltipContent side={side}>{tooltip}</TooltipContent>
 		</Tooltip>
 	);
 }

@@ -13,7 +13,8 @@ import { cn } from "@/lib/utils";
 export function NotificationButton() {
 	const { data: user } = useAtomValue(currentUserAtom);
 	const userId = user?.id ? String(user.id) : null;
-	const { notifications, unreadCount, loading, markAsRead, markAllAsRead } = useNotifications(userId);
+	const { notifications, unreadCount, loading, markAsRead, markAllAsRead } =
+		useNotifications(userId);

 	return (
 		<Popover>
@@ -50,4 +51,3 @@ export function NotificationButton() {
 		</Popover>
 	);
 }
-

@@ -41,7 +41,7 @@ export function NotificationPopup({

 	const getStatusIcon = (notification: Notification) => {
 		const status = notification.metadata?.status as string | undefined;
-
+
 		switch (status) {
 			case "in_progress":
 				return <Loader2 className="h-4 w-4 text-blue-500 animate-spin" />;
@@ -62,12 +62,7 @@ export function NotificationPopup({
 				<h3 className="font-semibold text-sm">Notifications</h3>
 			</div>
 			{unreadCount > 0 && (
-				<Button
-					variant="ghost"
-					size="sm"
-					onClick={handleMarkAllAsRead}
-					className="h-7 text-xs"
-				>
+				<Button variant="ghost" size="sm" onClick={handleMarkAllAsRead} className="h-7 text-xs">
 					<CheckCheck className="h-3.5 w-3.5 mr-0" />
 					Mark all read
 				</Button>
@@ -98,9 +93,7 @@ export function NotificationPopup({
 					)}
 				>
 					<div className="flex items-start gap-3">
-						<div className="flex-shrink-0 mt-0.5">
-							{getStatusIcon(notification)}
-						</div>
+						<div className="flex-shrink-0 mt-0.5">{getStatusIcon(notification)}</div>
 						<div className="flex-1 min-w-0">
 							<div className="flex items-start justify-between gap-2 mb-1">
 								<p

@@ -1,54 +1,54 @@
-"use client"
+"use client";

-import { useEffect, useState } from 'react'
-import { initElectric, isElectricInitialized } from '@/lib/electric/client'
+import { useEffect, useState } from "react";
+import { initElectric, isElectricInitialized } from "@/lib/electric/client";

 interface ElectricProviderProps {
-	children: React.ReactNode
+	children: React.ReactNode;
 }

 /**
  * ElectricProvider initializes the Electric SQL client with PGlite
- *
+ *
  * This provider ensures Electric is initialized before rendering children,
  * but doesn't block if initialization fails (app can still work without real-time sync)
  */
 export function ElectricProvider({ children }: ElectricProviderProps) {
-	const [initialized, setInitialized] = useState(false)
-	const [error, setError] = useState<Error | null>(null)
+	const [initialized, setInitialized] = useState(false);
+	const [error, setError] = useState<Error | null>(null);

 	useEffect(() => {
 		// Skip if already initialized
 		if (isElectricInitialized()) {
-			setInitialized(true)
-			return
+			setInitialized(true);
+			return;
 		}

-		let mounted = true
+		let mounted = true;

 		async function init() {
 			try {
-				await initElectric()
+				await initElectric();
 				if (mounted) {
-					setInitialized(true)
-					setError(null)
+					setInitialized(true);
+					setError(null);
 				}
 			} catch (err) {
-				console.error('Failed to initialize Electric SQL:', err)
+				console.error("Failed to initialize Electric SQL:", err);
 				if (mounted) {
-					setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL'))
+					setError(err instanceof Error ? err : new Error("Failed to initialize Electric SQL"));
 					// Don't block rendering if Electric SQL fails - app can still work
-					setInitialized(true)
+					setInitialized(true);
 				}
 			}
 		}

-		init()
+		init();

 		return () => {
-			mounted = false
-		}
-	}, [])
+			mounted = false;
+		};
+	}, []);

 	// Show loading state only briefly, then render children
 	// Electric SQL will sync in the background
@@ -57,13 +57,13 @@ export function ElectricProvider({ children }: ElectricProviderProps) {
 			<div className="flex items-center justify-center min-h-screen">
 				<div className="text-muted-foreground">Initializing...</div>
 			</div>
-		)
+		);
 	}

 	// If there's an error, still render children but log the error
 	if (error) {
-		console.warn('Electric SQL initialization failed, notifications may not sync:', error.message)
+		console.warn("Electric SQL initialization failed, notifications may not sync:", error.message);
 	}

-	return <>{children}</>
+	return <>{children}</>;
 }

@@ -5,19 +5,12 @@ import { documentTypeEnum } from "./document.types";
 /**
  * Notification type enum - matches backend notification types
  */
-export const notificationTypeEnum = z.enum([
-	"connector_indexing",
-	"document_processing",
-]);
+export const notificationTypeEnum = z.enum(["connector_indexing", "document_processing"]);

 /**
  * Notification status enum - used in metadata
  */
-export const notificationStatusEnum = z.enum([
-	"in_progress",
-	"completed",
-	"failed",
-]);
+export const notificationStatusEnum = z.enum(["in_progress", "completed", "failed"]);

 /**
  * Document processing stage enum
@@ -125,4 +118,3 @@ export type NotificationMetadata = z.infer<typeof notificationMetadata>;
 export type Notification = z.infer<typeof notification>;
 export type ConnectorIndexingNotification = z.infer<typeof connectorIndexingNotification>;
 export type DocumentProcessingNotification = z.infer<typeof documentProcessingNotification>;
-

@@ -1,16 +1,21 @@
-"use client"
+"use client";

-import { useEffect, useState, useCallback, useRef } from 'react'
-import { initElectric, isElectricInitialized, type ElectricClient, type SyncHandle } from '@/lib/electric/client'
-import type { SearchSourceConnector } from '@/contracts/types/connector.types'
+import { useEffect, useState, useCallback, useRef } from "react";
+import {
+	initElectric,
+	isElectricInitialized,
+	type ElectricClient,
+	type SyncHandle,
+} from "@/lib/electric/client";
+import type { SearchSourceConnector } from "@/contracts/types/connector.types";

 export function useConnectorsElectric(searchSpaceId: number | string | null) {
-	const [electric, setElectric] = useState<ElectricClient | null>(null)
-	const [connectors, setConnectors] = useState<SearchSourceConnector[]>([])
-	const [loading, setLoading] = useState(true)
-	const [error, setError] = useState<Error | null>(null)
-	const syncHandleRef = useRef<SyncHandle | null>(null)
-	const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null)
+	const [electric, setElectric] = useState<ElectricClient | null>(null);
+	const [connectors, setConnectors] = useState<SearchSourceConnector[]>([]);
+	const [loading, setLoading] = useState(true);
+	const [error, setError] = useState<Error | null>(null);
+	const syncHandleRef = useRef<SyncHandle | null>(null);
+	const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null);

 	// Transform connector data from Electric SQL/PGlite to match expected format
 	// Converts Date objects to ISO strings as expected by Zod schema
@@ -18,184 +23,193 @@ export function useConnectorsElectric(searchSpaceId: number | string | null) {
 		return {
 			...connector,
 			last_indexed_at: connector.last_indexed_at
-				? typeof connector.last_indexed_at === 'string'
+				? typeof connector.last_indexed_at === "string"
 					? connector.last_indexed_at
 					: new Date(connector.last_indexed_at).toISOString()
 				: null,
 			next_scheduled_at: connector.next_scheduled_at
-				? typeof connector.next_scheduled_at === 'string'
+				? typeof connector.next_scheduled_at === "string"
 					? connector.next_scheduled_at
 					: new Date(connector.next_scheduled_at).toISOString()
 				: null,
 			created_at: connector.created_at
-				? typeof connector.created_at === 'string'
+				? typeof connector.created_at === "string"
 					? connector.created_at
 					: new Date(connector.created_at).toISOString()
 				: new Date().toISOString(), // fallback
-		}
+		};
 	}

 	// Initialize Electric SQL and start syncing with real-time updates
 	useEffect(() => {
 		if (!searchSpaceId) {
-			setLoading(false)
-			setConnectors([])
-			return
+			setLoading(false);
+			setConnectors([]);
+			return;
 		}

-		let mounted = true
+		let mounted = true;

 		async function init() {
 			try {
-				const electricClient = await initElectric()
-				if (!mounted) return
+				const electricClient = await initElectric();
+				if (!mounted) return;

-				setElectric(electricClient)
+				setElectric(electricClient);

 				// Start syncing connectors for this search space via Electric SQL
-				console.log('Starting Electric SQL sync for connectors, search_space_id:', searchSpaceId)
-
+				console.log("Starting Electric SQL sync for connectors, search_space_id:", searchSpaceId);
+
 				// Use numeric format for WHERE clause (PGlite sync plugin expects this format)
 				const handle = await electricClient.syncShape({
-					table: 'search_source_connectors',
+					table: "search_source_connectors",
 					where: `search_space_id = ${searchSpaceId}`,
-					primaryKey: ['id'],
-				})
-
-				console.log('Electric SQL sync started for connectors:', {
+					primaryKey: ["id"],
+				});
+
+				console.log("Electric SQL sync started for connectors:", {
 					isUpToDate: handle.isUpToDate,
 					hasStream: !!handle.stream,
 					hasInitialSyncPromise: !!handle.initialSyncPromise,
-				})
-
+				});
+
 				// Optimized: Check if already up-to-date before waiting
 				if (handle.isUpToDate) {
-					console.log('Connectors sync already up-to-date, skipping wait')
+					console.log("Connectors sync already up-to-date, skipping wait");
 				} else if (handle.initialSyncPromise) {
 					// Only wait if not already up-to-date
-					console.log('Waiting for initial connectors sync to complete...')
+					console.log("Waiting for initial connectors sync to complete...");
 					try {
 						// Use Promise.race with a shorter timeout to avoid long waits
 						await Promise.race([
 							handle.initialSyncPromise,
-							new Promise(resolve => setTimeout(resolve, 2000)), // Max 2s wait
-						])
-						console.log('Initial connectors sync promise resolved or timed out, checking status:', {
+							new Promise((resolve) => setTimeout(resolve, 2000)), // Max 2s wait
+						]);
+						console.log("Initial connectors sync promise resolved or timed out, checking status:", {
 							isUpToDate: handle.isUpToDate,
-						})
+						});
 					} catch (syncErr) {
-						console.error('Initial connectors sync failed:', syncErr)
+						console.error("Initial connectors sync failed:", syncErr);
 					}
 				}
-
+
 				// Check status after waiting
-				console.log('Connectors sync status after waiting:', {
+				console.log("Connectors sync status after waiting:", {
 					isUpToDate: handle.isUpToDate,
 					hasStream: !!handle.stream,
-				})
+				});

 				if (!mounted) {
-					handle.unsubscribe()
-					return
+					handle.unsubscribe();
+					return;
 				}

-				syncHandleRef.current = handle
-				setLoading(false)
-				setError(null)
+				syncHandleRef.current = handle;
+				setLoading(false);
+				setError(null);

 				// Fetch connectors after sync is complete (we already waited above)
-				await fetchConnectors(electricClient.db)
+				await fetchConnectors(electricClient.db);

 				// Set up real-time updates using PGlite live queries
 				// Electric SQL syncs data to PGlite in real-time via HTTP streaming
 				// PGlite live queries detect when the synced data changes and trigger callbacks
 				try {
 					// eslint-disable-next-line @typescript-eslint/no-explicit-any
-					const db = electricClient.db as any
-
+					const db = electricClient.db as any;
+
 					// Use PGlite's live query API for real-time updates
 					// CORRECT API: await db.live.query() then use .subscribe()
-					if (db.live?.query && typeof db.live.query === 'function') {
+					if (db.live?.query && typeof db.live.query === "function") {
 						// IMPORTANT: db.live.query() returns a Promise - must await it!
 						const liveQuery = await db.live.query(
 							`SELECT * FROM search_source_connectors WHERE search_space_id = $1 ORDER BY created_at DESC`,
 							[searchSpaceId]
-						)
-
+						);
+
 						if (!mounted) {
-							liveQuery.unsubscribe?.()
-							return
+							liveQuery.unsubscribe?.();
+							return;
 						}

 						// Set initial results immediately from the resolved query
 						if (liveQuery.initialResults?.rows) {
-							console.log('📋 Initial live query results for connectors:', liveQuery.initialResults.rows.length)
-							setConnectors(liveQuery.initialResults.rows.map(transformConnector))
+							console.log(
+								"📋 Initial live query results for connectors:",
+								liveQuery.initialResults.rows.length
+							);
+							setConnectors(liveQuery.initialResults.rows.map(transformConnector));
 						} else if (liveQuery.rows) {
 							// Some versions have rows directly on the result
-							console.log('📋 Initial live query results for connectors (direct):', liveQuery.rows.length)
-							setConnectors(liveQuery.rows.map(transformConnector))
+							console.log(
+								"📋 Initial live query results for connectors (direct):",
+								liveQuery.rows.length
+							);
+							setConnectors(liveQuery.rows.map(transformConnector));
 						}

 						// Subscribe to changes - this is the correct API!
 						// The callback fires automatically when Electric SQL syncs new data to PGlite
-						if (typeof liveQuery.subscribe === 'function') {
+						if (typeof liveQuery.subscribe === "function") {
 							liveQuery.subscribe((result: { rows: any[] }) => {
 								if (mounted && result.rows) {
-									console.log('🔄 Connectors updated via live query:', result.rows.length)
-									setConnectors(result.rows.map(transformConnector))
+									console.log("🔄 Connectors updated via live query:", result.rows.length);
+									setConnectors(result.rows.map(transformConnector));
 								}
-							})
-
+							});
+
 							// Store unsubscribe function for cleanup
-							liveQueryRef.current = liveQuery
+							liveQueryRef.current = liveQuery;
 						}
 					} else {
-						console.warn('PGlite live query API not available, falling back to polling')
+						console.warn("PGlite live query API not available, falling back to polling");
 					}
 				} catch (liveQueryErr) {
-					console.error('Failed to set up live query for connectors:', liveQueryErr)
+					console.error("Failed to set up live query for connectors:", liveQueryErr);
 					// Don't fail completely - we still have the initial fetch
 				}
 			} catch (err) {
-				console.error('Failed to initialize Electric SQL for connectors:', err)
+				console.error("Failed to initialize Electric SQL for connectors:", err);
 				if (mounted) {
-					setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL for connectors'))
-					setLoading(false)
+					setError(
+						err instanceof Error
+							? err
+							: new Error("Failed to initialize Electric SQL for connectors")
+					);
+					setLoading(false);
 				}
 			}
 		}

-		init()
+		init();

 		return () => {
-			mounted = false
-			syncHandleRef.current?.unsubscribe?.()
-			liveQueryRef.current?.unsubscribe?.()
-			syncHandleRef.current = null
-			liveQueryRef.current = null
-		}
-	}, [searchSpaceId])
+			mounted = false;
+			syncHandleRef.current?.unsubscribe?.();
+			liveQueryRef.current?.unsubscribe?.();
+			syncHandleRef.current = null;
+			liveQueryRef.current = null;
+		};
+	}, [searchSpaceId]);

 	async function fetchConnectors(db: any) {
 		try {
 			const result = await db.query(
 				`SELECT * FROM search_source_connectors WHERE search_space_id = $1 ORDER BY created_at DESC`,
 				[searchSpaceId]
-			)
-			console.log('📋 Fetched connectors from PGlite:', result.rows?.length || 0)
-			setConnectors((result.rows || []).map(transformConnector))
+			);
+			console.log("📋 Fetched connectors from PGlite:", result.rows?.length || 0);
+			setConnectors((result.rows || []).map(transformConnector));
 		} catch (err) {
-			console.error('Failed to fetch connectors from PGlite:', err)
+			console.error("Failed to fetch connectors from PGlite:", err);
 		}
 	}

 	// Manual refresh function (optional, for fallback)
 	const refreshConnectors = useCallback(async () => {
-		if (!electric) return
-		await fetchConnectors(electric.db)
-	}, [electric])
+		if (!electric) return;
+		await fetchConnectors(electric.db);
+	}, [electric]);

-	return { connectors, loading, error, refreshConnectors }
+	return { connectors, loading, error, refreshConnectors };
 }

@@ -1,190 +1,201 @@
-"use client"
+"use client";

-import { useEffect, useState, useRef, useMemo } from 'react'
-import { initElectric, type ElectricClient, type SyncHandle } from '@/lib/electric/client'
+import { useEffect, useState, useRef, useMemo } from "react";
+import { initElectric, type ElectricClient, type SyncHandle } from "@/lib/electric/client";

 interface Document {
-	id: number
-	search_space_id: number
-	document_type: string
-	created_at: string
+	id: number;
+	search_space_id: number;
+	document_type: string;
+	created_at: string;
 }

 export function useDocumentsElectric(searchSpaceId: number | string | null) {
-	const [electric, setElectric] = useState<ElectricClient | null>(null)
-	const [documents, setDocuments] = useState<Document[]>([])
-	const [loading, setLoading] = useState(true)
-	const [error, setError] = useState<Error | null>(null)
-	const syncHandleRef = useRef<SyncHandle | null>(null)
-	const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null)
+	const [electric, setElectric] = useState<ElectricClient | null>(null);
+	const [documents, setDocuments] = useState<Document[]>([]);
+	const [loading, setLoading] = useState(true);
+	const [error, setError] = useState<Error | null>(null);
+	const syncHandleRef = useRef<SyncHandle | null>(null);
+	const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null);

 	// Calculate document type counts from synced documents
 	const documentTypeCounts = useMemo(() => {
-		if (!documents.length) return {}
-
-		const counts: Record<string, number> = {}
+		if (!documents.length) return {};
+
+		const counts: Record<string, number> = {};
 		for (const doc of documents) {
-			counts[doc.document_type] = (counts[doc.document_type] || 0) + 1
+			counts[doc.document_type] = (counts[doc.document_type] || 0) + 1;
 		}
-		return counts
-	}, [documents])
+		return counts;
+	}, [documents]);

 	// Initialize Electric SQL and start syncing with real-time updates
 	useEffect(() => {
 		if (!searchSpaceId) {
-			setLoading(false)
-			setDocuments([])
-			return
+			setLoading(false);
+			setDocuments([]);
+			return;
 		}

-		let mounted = true
+		let mounted = true;

 		async function init() {
 			try {
-				const electricClient = await initElectric()
-				if (!mounted) return
+				const electricClient = await initElectric();
+				if (!mounted) return;

-				setElectric(electricClient)
+				setElectric(electricClient);

 				// Start syncing documents for this search space via Electric SQL
 				// Only sync id, document_type, search_space_id columns for efficiency
-				console.log('Starting Electric SQL sync for documents, search_space_id:', searchSpaceId)
-
+				console.log("Starting Electric SQL sync for documents, search_space_id:", searchSpaceId);
+
 				const handle = await electricClient.syncShape({
-					table: 'documents',
+					table: "documents",
 					where: `search_space_id = ${searchSpaceId}`,
-					columns: ['id', 'document_type', 'search_space_id', 'created_at'],
-					primaryKey: ['id'],
-				})
-
-				console.log('Electric SQL sync started for documents:', {
+					columns: ["id", "document_type", "search_space_id", "created_at"],
+					primaryKey: ["id"],
+				});
+
+				console.log("Electric SQL sync started for documents:", {
 					isUpToDate: handle.isUpToDate,
 					hasStream: !!handle.stream,
 					hasInitialSyncPromise: !!handle.initialSyncPromise,
-				})
-
+				});
+
 				// Optimized: Check if already up-to-date before waiting
 				if (handle.isUpToDate) {
-					console.log('Documents sync already up-to-date, skipping wait')
+					console.log("Documents sync already up-to-date, skipping wait");
 				} else if (handle.initialSyncPromise) {
 					// Only wait if not already up-to-date
-					console.log('Waiting for initial documents sync to complete...')
+					console.log("Waiting for initial documents sync to complete...");
 					try {
 						// Use Promise.race with a shorter timeout to avoid long waits
 						await Promise.race([
 							handle.initialSyncPromise,
-							new Promise(resolve => setTimeout(resolve, 2000)), // Max 2s wait
-						])
-						console.log('Initial documents sync promise resolved or timed out, checking status:', {
+							new Promise((resolve) => setTimeout(resolve, 2000)), // Max 2s wait
+						]);
+						console.log("Initial documents sync promise resolved or timed out, checking status:", {
 							isUpToDate: handle.isUpToDate,
-						})
+						});
 					} catch (syncErr) {
-						console.error('Initial documents sync failed:', syncErr)
+						console.error("Initial documents sync failed:", syncErr);
 					}
 				}
-
+
 				// Check status after waiting
-				console.log('Documents sync status after waiting:', {
+				console.log("Documents sync status after waiting:", {
 					isUpToDate: handle.isUpToDate,
 					hasStream: !!handle.stream,
-				})
+				});

 				if (!mounted) {
-					handle.unsubscribe()
-					return
+					handle.unsubscribe();
+					return;
 				}

-				syncHandleRef.current = handle
-				setLoading(false)
-				setError(null)
+				syncHandleRef.current = handle;
+				setLoading(false);
+				setError(null);

 				// Fetch documents after sync is complete (we already waited above)
-				await fetchDocuments(electricClient.db)
+				await fetchDocuments(electricClient.db);

 				// Set up real-time updates using PGlite live queries
 				// Electric SQL syncs data to PGlite in real-time via HTTP streaming
 				// PGlite live queries detect when the synced data changes and trigger callbacks
 				try {
 					// eslint-disable-next-line @typescript-eslint/no-explicit-any
-					const db = electricClient.db as any
-
+					const db = electricClient.db as any;
+
 					// Use PGlite's live query API for real-time updates
 					// CORRECT API: await db.live.query() then use .subscribe()
-					if (db.live?.query && typeof db.live.query === 'function') {
+					if (db.live?.query && typeof db.live.query === "function") {
 						// IMPORTANT: db.live.query() returns a Promise - must await it!
 						const liveQuery = await db.live.query(
 							`SELECT id, document_type, search_space_id, created_at FROM documents WHERE search_space_id = $1 ORDER BY created_at DESC`,
 							[searchSpaceId]
-						)
-
+						);
+
 						if (!mounted) {
-							liveQuery.unsubscribe?.()
-							return
+							liveQuery.unsubscribe?.();
+							return;
 						}

 						// Set initial results immediately from the resolved query
 						if (liveQuery.initialResults?.rows) {
-							console.log('📋 Initial live query results for documents:', liveQuery.initialResults.rows.length)
-							setDocuments(liveQuery.initialResults.rows)
+							console.log(
+								"📋 Initial live query results for documents:",
+								liveQuery.initialResults.rows.length
+							);
+							setDocuments(liveQuery.initialResults.rows);
 						} else if (liveQuery.rows) {
 							// Some versions have rows directly on the result
-							console.log('📋 Initial live query results for documents (direct):', liveQuery.rows.length)
-							setDocuments(liveQuery.rows)
+							console.log(
+								"📋 Initial live query results for documents (direct):",
+								liveQuery.rows.length
+							);
+							setDocuments(liveQuery.rows);
 						}

 						// Subscribe to changes - this is the correct API!
 						// The callback fires automatically when Electric SQL syncs new data to PGlite
-						if (typeof liveQuery.subscribe === 'function') {
+						if (typeof liveQuery.subscribe === "function") {
 							liveQuery.subscribe((result: { rows: Document[] }) => {
 								if (mounted && result.rows) {
-									console.log('🔄 Documents updated via live query:', result.rows.length)
-									setDocuments(result.rows)
+									console.log("🔄 Documents updated via live query:", result.rows.length);
+									setDocuments(result.rows);
 								}
-							})
-
+							});
+
 							// Store unsubscribe function for cleanup
-							liveQueryRef.current = liveQuery
+							liveQueryRef.current = liveQuery;
 						}
 					} else {
-						console.warn('PGlite live query API not available for documents, falling back to polling')
+						console.warn(
+							"PGlite live query API not available for documents, falling back to polling"
+						);
 					}
 				} catch (liveQueryErr) {
-					console.error('Failed to set up live query for documents:', liveQueryErr)
+					console.error("Failed to set up live query for documents:", liveQueryErr);
 					// Don't fail completely - we still have the initial fetch
 				}
 			} catch (err) {
-				console.error('Failed to initialize Electric SQL for documents:', err)
+				console.error("Failed to initialize Electric SQL for documents:", err);
 				if (mounted) {
-					setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL for documents'))
-					setLoading(false)
+					setError(
+						err instanceof Error
+							? err
+							: new Error("Failed to initialize Electric SQL for documents")
+					);
+					setLoading(false);
 				}
 			}
 		}

-		init()
+		init();

 		return () => {
-			mounted = false
-			syncHandleRef.current?.unsubscribe?.()
-			liveQueryRef.current?.unsubscribe?.()
-			syncHandleRef.current = null
-			liveQueryRef.current = null
-		}
-	}, [searchSpaceId])
+			mounted = false;
+			syncHandleRef.current?.unsubscribe?.();
+			liveQueryRef.current?.unsubscribe?.();
+			syncHandleRef.current = null;
+			liveQueryRef.current = null;
+		};
+	}, [searchSpaceId]);

 	async function fetchDocuments(db: any) {
 		try {
 			const result = await db.query(
 				`SELECT id, document_type, search_space_id, created_at FROM documents WHERE search_space_id = $1 ORDER BY created_at DESC`,
 				[searchSpaceId]
-			)
-			console.log('📋 Fetched documents from PGlite:', result.rows?.length || 0)
-			setDocuments(result.rows || [])
+			);
+			console.log("📋 Fetched documents from PGlite:", result.rows?.length || 0);
+			setDocuments(result.rows || []);
 		} catch (err) {
-			console.error('Failed to fetch documents from PGlite:', err)
+			console.error("Failed to fetch documents from PGlite:", err);
 		}
 	}

-	return { documentTypeCounts, loading, error }
+	return { documentTypeCounts, loading, error };
 }

@ -1,211 +1,226 @@
|
|||
"use client"
|
||||
"use client";
|
||||
|
||||
import { useEffect, useState, useCallback, useRef } from 'react'
|
||||
import { initElectric, isElectricInitialized, type ElectricClient, type SyncHandle } from '@/lib/electric/client'
|
||||
import type { Notification } from '@/contracts/types/notification.types'
|
||||
import { useEffect, useState, useCallback, useRef } from "react";
|
||||
import {
|
||||
initElectric,
|
||||
isElectricInitialized,
|
||||
type ElectricClient,
|
||||
type SyncHandle,
|
||||
} from "@/lib/electric/client";
|
||||
import type { Notification } from "@/contracts/types/notification.types";
|
||||
|
||||
export type { Notification } from '@/contracts/types/notification.types'
|
||||
export type { Notification } from "@/contracts/types/notification.types";
|
||||
|
||||
export function useNotifications(userId: string | null) {
|
||||
const [electric, setElectric] = useState<ElectricClient | null>(null)
|
||||
const [notifications, setNotifications] = useState<Notification[]>([])
|
||||
const [loading, setLoading] = useState(true)
|
||||
const [error, setError] = useState<Error | null>(null)
|
||||
const syncHandleRef = useRef<SyncHandle | null>(null)
|
||||
const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null)
|
||||
const [electric, setElectric] = useState<ElectricClient | null>(null);
|
||||
const [notifications, setNotifications] = useState<Notification[]>([]);
|
||||
const [loading, setLoading] = useState(true);
|
||||
const [error, setError] = useState<Error | null>(null);
|
||||
const syncHandleRef = useRef<SyncHandle | null>(null);
|
||||
const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null);
|
||||
// Use ref instead of state to track initialization - prevents cleanup from running when set
|
||||
const initializedRef = useRef(false)
|
||||
const initializedRef = useRef(false);
|
||||
|
||||
// Initialize Electric SQL and start syncing with real-time updates
|
||||
useEffect(() => {
|
||||
// Use ref to prevent re-initialization without triggering cleanup
|
||||
if (!userId || initializedRef.current) return
|
||||
initializedRef.current = true
|
||||
if (!userId || initializedRef.current) return;
|
||||
initializedRef.current = true;
|
||||
|
||||
let mounted = true
|
||||
let mounted = true;
|
||||
|
||||
async function init() {
|
||||
try {
|
||||
const electricClient = await initElectric()
|
||||
if (!mounted) return
|
||||
const electricClient = await initElectric();
|
||||
if (!mounted) return;
|
||||
|
||||
setElectric(electricClient)
|
||||
setElectric(electricClient);
|
||||
|
||||
// Start syncing notifications for this user via Electric SQL
|
||||
// Note: user_id is stored as TEXT in PGlite (UUID from backend is converted)
|
||||
console.log('Starting Electric SQL sync for user:', userId)
|
||||
|
||||
console.log("Starting Electric SQL sync for user:", userId);
|
||||
|
||||
// Use string format for WHERE clause (PGlite sync plugin expects this format)
|
||||
// The user_id is a UUID string, so we need to quote it properly
|
||||
const handle = await electricClient.syncShape({
|
||||
table: 'notifications',
|
||||
table: "notifications",
|
||||
where: `user_id = '${userId}'`,
|
||||
primaryKey: ['id'],
|
||||
})
|
||||
|
||||
console.log('Electric SQL sync started:', {
|
||||
primaryKey: ["id"],
|
||||
});
|
||||
|
||||
console.log("Electric SQL sync started:", {
|
||||
isUpToDate: handle.isUpToDate,
|
||||
hasStream: !!handle.stream,
|
||||
hasInitialSyncPromise: !!handle.initialSyncPromise,
|
||||
})
|
||||
|
||||
});
|
||||
|
||||
// Optimized: Check if already up-to-date before waiting
|
||||
if (handle.isUpToDate) {
|
||||
console.log('Sync already up-to-date, skipping wait')
|
||||
console.log("Sync already up-to-date, skipping wait");
|
||||
} else if (handle.initialSyncPromise) {
|
||||
// Only wait if not already up-to-date
|
||||
console.log('Waiting for initial sync to complete...')
|
||||
console.log("Waiting for initial sync to complete...");
|
||||
try {
|
||||
// Use Promise.race with a shorter timeout to avoid long waits
|
||||
await Promise.race([
|
||||
handle.initialSyncPromise,
|
||||
new Promise(resolve => setTimeout(resolve, 2000)), // Max 2s wait
|
||||
])
|
||||
console.log('Initial sync promise resolved or timed out, checking status:', {
|
||||
new Promise((resolve) => setTimeout(resolve, 2000)), // Max 2s wait
|
||||
]);
|
||||
console.log("Initial sync promise resolved or timed out, checking status:", {
|
||||
isUpToDate: handle.isUpToDate,
|
||||
})
|
||||
});
|
||||
} catch (syncErr) {
|
||||
console.error('Initial sync failed:', syncErr)
|
||||
console.error("Initial sync failed:", syncErr);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Check status after waiting
|
||||
console.log('Sync status after waiting:', {
|
||||
console.log("Sync status after waiting:", {
|
||||
isUpToDate: handle.isUpToDate,
|
||||
hasStream: !!handle.stream,
|
||||
})
|
||||
});
|
||||
|
||||
if (!mounted) {
|
||||
handle.unsubscribe()
|
||||
return
|
||||
handle.unsubscribe();
|
||||
return;
|
||||
}
|
||||
|
||||
syncHandleRef.current = handle
|
||||
setLoading(false)
|
||||
setError(null)
|
||||
syncHandleRef.current = handle;
|
||||
setLoading(false);
|
||||
setError(null);
|
||||
|
||||
// Fetch notifications after sync is complete (we already waited above)
|
||||
await fetchNotifications(electricClient.db)
|
||||
await fetchNotifications(electricClient.db);
|
||||
|
||||
// Set up real-time updates using PGlite live queries
|
||||
// Electric SQL syncs data to PGlite in real-time via HTTP streaming
|
||||
// PGlite live queries detect when the synced data changes and trigger callbacks
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const db = electricClient.db as any
|
||||
|
||||
const db = electricClient.db as any;
|
||||
|
||||
// Use PGlite's live query API for real-time updates
|
||||
// CORRECT API: await db.live.query() then use .subscribe()
|
||||
if (db.live?.query && typeof db.live.query === 'function') {
|
||||
if (db.live?.query && typeof db.live.query === "function") {
|
||||
// IMPORTANT: db.live.query() returns a Promise - must await it!
|
||||
const liveQuery = await db.live.query(
|
||||
`SELECT * FROM notifications WHERE user_id = $1 ORDER BY created_at DESC`,
|
||||
[userId]
|
||||
)
|
||||
|
||||
);
|
||||
|
||||
if (!mounted) {
|
||||
liveQuery.unsubscribe?.()
|
||||
return
|
||||
liveQuery.unsubscribe?.();
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// Set initial results immediately from the resolved query
|
||||
if (liveQuery.initialResults?.rows) {
|
||||
console.log('📋 Initial live query results:', liveQuery.initialResults.rows.length)
|
||||
setNotifications(liveQuery.initialResults.rows)
|
||||
console.log("📋 Initial live query results:", liveQuery.initialResults.rows.length);
|
||||
setNotifications(liveQuery.initialResults.rows);
|
||||
} else if (liveQuery.rows) {
|
||||
// Some versions have rows directly on the result
|
||||
console.log('📋 Initial live query results (direct):', liveQuery.rows.length)
|
||||
setNotifications(liveQuery.rows)
|
||||
console.log("📋 Initial live query results (direct):", liveQuery.rows.length);
|
||||
setNotifications(liveQuery.rows);
|
||||
}
|
||||
|
||||
|
||||
// Subscribe to changes - this is the correct API!
|
||||
// The callback fires automatically when Electric SQL syncs new data to PGlite
|
||||
if (typeof liveQuery.subscribe === 'function') {
|
||||
if (typeof liveQuery.subscribe === "function") {
|
||||
liveQuery.subscribe((result: { rows: Notification[] }) => {
|
||||
console.log('🔔 Live query update received:', result.rows?.length || 0, 'notifications')
|
||||
console.log(
|
||||
"🔔 Live query update received:",
|
||||
result.rows?.length || 0,
|
||||
"notifications"
|
||||
);
|
||||
if (mounted && result.rows) {
|
||||
setNotifications(result.rows)
|
||||
setNotifications(result.rows);
|
||||
}
|
||||
})
|
||||
console.log('✅ Real-time notifications enabled via PGlite live queries')
|
||||
});
|
||||
console.log("✅ Real-time notifications enabled via PGlite live queries");
|
||||
} else {
|
||||
console.warn('⚠️ Live query subscribe method not available')
|
||||
console.warn("⚠️ Live query subscribe method not available");
|
||||
}
|
||||
|
||||
|
||||
// Store for cleanup
|
||||
if (typeof liveQuery.unsubscribe === 'function') {
|
||||
liveQueryRef.current = liveQuery
|
||||
if (typeof liveQuery.unsubscribe === "function") {
|
||||
liveQueryRef.current = liveQuery;
|
||||
}
|
||||
} else {
|
||||
console.error('❌ PGlite live queries not available - db.live.query is not a function')
|
||||
console.log('db.live:', db.live)
|
||||
console.error("❌ PGlite live queries not available - db.live.query is not a function");
|
||||
console.log("db.live:", db.live);
|
||||
}
|
||||
} catch (liveErr) {
|
||||
console.error('❌ Failed to set up real-time updates:', liveErr)
|
||||
console.error("❌ Failed to set up real-time updates:", liveErr);
|
||||
}
|
||||
} catch (err) {
|
||||
if (!mounted) return
|
||||
console.error('Failed to initialize Electric SQL:', err)
|
||||
setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL'))
|
||||
if (!mounted) return;
|
||||
console.error("Failed to initialize Electric SQL:", err);
|
||||
setError(err instanceof Error ? err : new Error("Failed to initialize Electric SQL"));
|
||||
// Still mark as loaded so the UI doesn't block
|
||||
setLoading(false)
|
||||
setLoading(false);
|
||||
}
|
||||
}
|
||||
|
||||
async function fetchNotifications(db: InstanceType<typeof import('@electric-sql/pglite').PGlite>) {
|
||||
async function fetchNotifications(
|
||||
db: InstanceType<typeof import("@electric-sql/pglite").PGlite>
|
||||
) {
|
||||
try {
|
||||
// Debug: Check all notifications first
|
||||
const allNotifications = await db.query<Notification>(
|
||||
`SELECT * FROM notifications ORDER BY created_at DESC`
|
||||
)
|
||||
console.log('All notifications in PGlite:', allNotifications.rows?.length || 0, allNotifications.rows)
|
||||
|
||||
);
|
||||
console.log(
|
||||
"All notifications in PGlite:",
|
||||
allNotifications.rows?.length || 0,
|
||||
allNotifications.rows
|
||||
);
|
||||
|
||||
// Use PGlite's query method (not exec for SELECT queries)
|
||||
const result = await db.query<Notification>(
|
||||
`SELECT * FROM notifications
|
||||
WHERE user_id = $1
|
||||
ORDER BY created_at DESC`,
|
||||
[userId]
|
||||
)
|
||||
console.log(`Notifications for user ${userId}:`, result.rows?.length || 0, result.rows)
|
||||
|
||||
);
|
||||
console.log(`Notifications for user ${userId}:`, result.rows?.length || 0, result.rows);
|
||||
|
||||
if (mounted) {
|
||||
// PGlite query returns { rows: [] } format
|
||||
setNotifications(result.rows || [])
|
||||
setNotifications(result.rows || []);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Failed to fetch notifications:', err)
|
||||
console.error("Failed to fetch notifications:", err);
|
||||
// Log more details for debugging
|
||||
console.error('Error details:', err)
|
||||
console.error("Error details:", err);
|
||||
}
|
||||
}
|
||||
|
||||
init()
|
||||
init();
|
||||
|
||||
return () => {
|
||||
mounted = false
|
||||
mounted = false;
|
||||
// Reset initialization state so we can reinitialize with a new userId
|
||||
initializedRef.current = false
|
||||
setLoading(true)
|
||||
initializedRef.current = false;
|
||||
setLoading(true);
|
||||
if (syncHandleRef.current) {
|
||||
syncHandleRef.current.unsubscribe()
|
||||
syncHandleRef.current = null
|
||||
syncHandleRef.current.unsubscribe();
|
||||
syncHandleRef.current = null;
|
||||
}
|
||||
if (liveQueryRef.current) {
|
||||
liveQueryRef.current.unsubscribe()
|
||||
liveQueryRef.current = null
|
||||
liveQueryRef.current.unsubscribe();
|
||||
liveQueryRef.current = null;
|
||||
}
|
||||
}
|
||||
// Only depend on userId - using ref for initialization tracking to prevent cleanup issues
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [userId])
|
||||
};
|
||||
// Only depend on userId - using ref for initialization tracking to prevent cleanup issues
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [userId]);
|
||||
|
||||
// Mark notification as read (local only - needs backend sync)
|
||||
const markAsRead = useCallback(
|
||||
async (notificationId: number) => {
|
||||
if (!electric || !isElectricInitialized()) {
|
||||
console.warn('Electric SQL not initialized')
|
||||
return false
|
||||
console.warn("Electric SQL not initialized");
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
|
|
@ -213,46 +228,46 @@ export function useNotifications(userId: string | null) {
|
|||
await electric.db.query(
|
||||
`UPDATE notifications SET read = true, updated_at = NOW() WHERE id = $1`,
|
||||
[notificationId]
|
||||
)
|
||||
);
|
||||
|
||||
// Update local state
|
||||
setNotifications(prev =>
|
||||
prev.map(n => n.id === notificationId ? { ...n, read: true } : n)
|
||||
)
|
||||
setNotifications((prev) =>
|
||||
prev.map((n) => (n.id === notificationId ? { ...n, read: true } : n))
|
||||
);
|
||||
|
||||
// TODO: Also send to backend to persist the change
|
||||
// This could be done via a REST API call
|
||||
|
||||
return true
|
||||
return true;
|
||||
} catch (err) {
|
||||
console.error('Failed to mark notification as read:', err)
|
||||
return false
|
||||
console.error("Failed to mark notification as read:", err);
|
||||
return false;
|
||||
}
|
||||
},
|
||||
[electric]
|
||||
)
|
||||
);
|
||||
|
||||
// Mark all notifications as read
|
||||
const markAllAsRead = useCallback(async () => {
|
||||
if (!electric || !isElectricInitialized()) {
|
||||
console.warn('Electric SQL not initialized')
|
||||
return false
|
||||
console.warn("Electric SQL not initialized");
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
const unread = notifications.filter(n => !n.read)
|
||||
const unread = notifications.filter((n) => !n.read);
|
||||
for (const notification of unread) {
|
||||
await markAsRead(notification.id)
|
||||
await markAsRead(notification.id);
|
||||
}
|
||||
return true
|
||||
return true;
|
||||
} catch (err) {
|
||||
console.error('Failed to mark all notifications as read:', err)
|
||||
return false
|
||||
console.error("Failed to mark all notifications as read:", err);
|
||||
return false;
|
||||
}
|
||||
}, [electric, notifications, markAsRead])
|
||||
}, [electric, notifications, markAsRead]);
|
||||
|
||||
// Get unread count
|
||||
const unreadCount = notifications.filter(n => !n.read).length
|
||||
const unreadCount = notifications.filter((n) => !n.read).length;
|
||||
|
||||
return {
|
||||
notifications,
|
||||
|
|
@ -261,5 +276,5 @@ export function useNotifications(userId: string | null) {
markAllAsRead,
loading,
error,
}
};
}
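A sketch of how this hook might be consumed; the component and the import path are illustrative, not part of this diff — only the notifications, markAllAsRead, loading, and error fields visible in the return above are assumed:

"use client";

// Hypothetical import path - adjust to wherever the hook lives in the app.
import { useNotifications } from "@/hooks/use-notifications";

export function NotificationBadge({ userId }: { userId: string | null }) {
  const { notifications, markAllAsRead, loading, error } = useNotifications(userId);
  const unread = notifications.filter((n) => !n.read).length;

  if (loading) return <span>Loading notifications…</span>;
  if (error) return <span>Notifications unavailable</span>;

  return (
    <button type="button" onClick={() => void markAllAsRead()}>
      {unread} unread of {notifications.length}
    </button>
  );
}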

@ -5,17 +5,16 @@
export async function getElectricAuthToken(): Promise<string> {
// For insecure mode (development), return empty string
if (process.env.NEXT_PUBLIC_ELECTRIC_AUTH_MODE === 'insecure') {
return ''
if (process.env.NEXT_PUBLIC_ELECTRIC_AUTH_MODE === "insecure") {
return "";
}

// In production, get token from your auth system
// This should match your backend auth token
if (typeof window !== 'undefined') {
const token = localStorage.getItem('surfsense_bearer_token')
return token || ''
if (typeof window !== "undefined") {
const token = localStorage.getItem("surfsense_bearer_token");
return token || "";
}

return ''
return "";
}
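This helper pairs naturally with the markAsRead TODO earlier in this diff: once a token is available, the local UPDATE could be mirrored to the backend. A sketch under stated assumptions — the endpoint path and payload are hypothetical; only the getElectricAuthToken helper and the bearer-token convention come from this diff:

// Hypothetical endpoint: this diff does not define a notifications REST API.
async function persistReadState(notificationId: number): Promise<boolean> {
  const token = await getElectricAuthToken();
  const response = await fetch(`/api/v1/notifications/${notificationId}`, {
    method: "PATCH",
    headers: {
      "Content-Type": "application/json",
      // In insecure dev mode the token is empty, so the header is omitted.
      ...(token ? { Authorization: `Bearer ${token}` } : {}),
    },
    body: JSON.stringify({ read: true }),
  });
  return response.ok;
}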

@ -1,53 +1,53 @@
/**
* Electric SQL client setup for ElectricSQL 1.x with PGlite
*
*
* This uses the new ElectricSQL 1.x architecture:
* - PGlite: In-browser PostgreSQL database (local storage)
* - @electric-sql/pglite-sync: Sync plugin to sync Electric shapes into PGlite
* - @electric-sql/client: HTTP client for subscribing to shapes
*/

import { PGlite } from '@electric-sql/pglite'
import { electricSync } from '@electric-sql/pglite-sync'
import { live } from '@electric-sql/pglite/live'
import { PGlite } from "@electric-sql/pglite";
import { electricSync } from "@electric-sql/pglite-sync";
import { live } from "@electric-sql/pglite/live";

// Types
export interface ElectricClient {
db: PGlite
syncShape: (options: SyncShapeOptions) => Promise<SyncHandle>
db: PGlite;
syncShape: (options: SyncShapeOptions) => Promise<SyncHandle>;
}

export interface SyncShapeOptions {
table: string
where?: string
columns?: string[]
primaryKey?: string[]
table: string;
where?: string;
columns?: string[];
primaryKey?: string[];
}

export interface SyncHandle {
unsubscribe: () => void
readonly isUpToDate: boolean
unsubscribe: () => void;
readonly isUpToDate: boolean;
// The stream property contains the ShapeStreamInterface from pglite-sync
stream?: unknown
stream?: unknown;
// Promise that resolves when initial sync is complete
initialSyncPromise?: Promise<void>
initialSyncPromise?: Promise<void>;
}

// Singleton instance
let electricClient: ElectricClient | null = null
let isInitializing = false
let initPromise: Promise<ElectricClient> | null = null
let electricClient: ElectricClient | null = null;
let isInitializing = false;
let initPromise: Promise<ElectricClient> | null = null;

// Version for sync state - increment this to force fresh sync when Electric config changes
// Incremented to v4 to fix sync completion issues
const SYNC_VERSION = 4
const SYNC_VERSION = 4;

// Get Electric URL from environment
function getElectricUrl(): string {
if (typeof window !== 'undefined') {
return process.env.NEXT_PUBLIC_ELECTRIC_URL || 'http://localhost:5133'
if (typeof window !== "undefined") {
return process.env.NEXT_PUBLIC_ELECTRIC_URL || "http://localhost:5133";
}
return 'http://localhost:5133'
return "http://localhost:5133";
}

/**

@ -55,14 +55,14 @@ function getElectricUrl(): string {
*/
export async function initElectric(): Promise<ElectricClient> {
if (electricClient) {
return electricClient
return electricClient;
}

if (isInitializing && initPromise) {
return initPromise
return initPromise;
}

isInitializing = true
isInitializing = true;
initPromise = (async () => {
try {
// Create PGlite instance with Electric sync plugin and live queries

@ -75,7 +75,7 @@ export async function initElectric(): Promise<ElectricClient> {
electric: electricSync({ debug: true }),
live, // Enable live queries for real-time updates
},
})
});

// Create the notifications table schema in PGlite
// This matches the backend schema

@ -95,7 +95,7 @@ export async function initElectric(): Promise<ElectricClient> {

CREATE INDEX IF NOT EXISTS idx_notifications_user_id ON notifications(user_id);
CREATE INDEX IF NOT EXISTS idx_notifications_read ON notifications(read);
`)
`);

// Create the search_source_connectors table schema in PGlite
// This matches the backend schema

@ -118,7 +118,7 @@ export async function initElectric(): Promise<ElectricClient> {
CREATE INDEX IF NOT EXISTS idx_connectors_search_space_id ON search_source_connectors(search_space_id);
CREATE INDEX IF NOT EXISTS idx_connectors_type ON search_source_connectors(connector_type);
CREATE INDEX IF NOT EXISTS idx_connectors_user_id ON search_source_connectors(user_id);
`)
`);

// Create the documents table schema in PGlite
// Only sync minimal fields needed for type counts: id, document_type, search_space_id

@ -133,292 +133,309 @@ export async function initElectric(): Promise<ElectricClient> {
CREATE INDEX IF NOT EXISTS idx_documents_search_space_id ON documents(search_space_id);
CREATE INDEX IF NOT EXISTS idx_documents_type ON documents(document_type);
CREATE INDEX IF NOT EXISTS idx_documents_search_space_type ON documents(search_space_id, document_type);
`)
`);

const electricUrl = getElectricUrl()
const electricUrl = getElectricUrl();

// Create the client wrapper
electricClient = {
db,
syncShape: async (options: SyncShapeOptions): Promise<SyncHandle> => {
const { table, where, columns, primaryKey = ['id'] } = options
const { table, where, columns, primaryKey = ["id"] } = options;

// Build params for the shape request
// Electric SQL expects params as URL query parameters
const params: Record<string, string> = { table }

// Validate and fix WHERE clause to ensure string literals are properly quoted
let validatedWhere = where
if (where) {
// Check if where uses positional parameters
if (where.includes('$1')) {
// Extract the value from the where clause if it's embedded
// For now, we'll use the where clause as-is and let Electric handle it
params.where = where
validatedWhere = where
} else {
// Validate that string literals are properly quoted
// Count single quotes - should be even (pairs) for properly quoted strings
const singleQuoteCount = (where.match(/'/g) || []).length

if (singleQuoteCount % 2 !== 0) {
// Odd number of quotes means unterminated string literal
console.warn('Where clause has unmatched quotes, fixing:', where)
// Add closing quote at the end
validatedWhere = `${where}'`
params.where = validatedWhere
// Build params for the shape request
// Electric SQL expects params as URL query parameters
const params: Record<string, string> = { table };

// Validate and fix WHERE clause to ensure string literals are properly quoted
let validatedWhere = where;
if (where) {
// Check if where uses positional parameters
if (where.includes("$1")) {
// Extract the value from the where clause if it's embedded
// For now, we'll use the where clause as-is and let Electric handle it
params.where = where;
validatedWhere = where;
} else {
// Use the where clause directly (already formatted)
params.where = where
validatedWhere = where
// Validate that string literals are properly quoted
// Count single quotes - should be even (pairs) for properly quoted strings
const singleQuoteCount = (where.match(/'/g) || []).length;

if (singleQuoteCount % 2 !== 0) {
// Odd number of quotes means unterminated string literal
console.warn("Where clause has unmatched quotes, fixing:", where);
// Add closing quote at the end
validatedWhere = `${where}'`;
params.where = validatedWhere;
} else {
// Use the where clause directly (already formatted)
params.where = where;
validatedWhere = where;
}
}
}
}

if (columns) params.columns = columns.join(',')

console.log('Syncing shape with params:', params)
console.log('Electric URL:', `${electricUrl}/v1/shape`)
console.log('Where clause:', where, 'Validated:', validatedWhere)
if (columns) params.columns = columns.join(",");

console.log("Syncing shape with params:", params);
console.log("Electric URL:", `${electricUrl}/v1/shape`);
console.log("Where clause:", where, "Validated:", validatedWhere);

try {
// Debug: Test Electric SQL connection directly first
// Use validatedWhere to ensure proper URL encoding
const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ''}`
console.log('Testing Electric SQL directly:', testUrl)
const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ""}`;
console.log("Testing Electric SQL directly:", testUrl);
try {
const testResponse = await fetch(testUrl)
const testResponse = await fetch(testUrl);
const testHeaders = {
handle: testResponse.headers.get('electric-handle'),
offset: testResponse.headers.get('electric-offset'),
upToDate: testResponse.headers.get('electric-up-to-date'),
}
console.log('Direct Electric SQL response headers:', testHeaders)
const testData = await testResponse.json()
console.log('Direct Electric SQL data count:', Array.isArray(testData) ? testData.length : 'not array', testData)
handle: testResponse.headers.get("electric-handle"),
offset: testResponse.headers.get("electric-offset"),
upToDate: testResponse.headers.get("electric-up-to-date"),
};
console.log("Direct Electric SQL response headers:", testHeaders);
const testData = await testResponse.json();
console.log(
"Direct Electric SQL data count:",
Array.isArray(testData) ? testData.length : "not array",
testData
);
} catch (testErr) {
console.error('Direct Electric SQL test failed:', testErr)
console.error("Direct Electric SQL test failed:", testErr);
}

// Use PGlite's electric sync plugin to sync the shape
// According to Electric SQL docs, the shape config uses params for table, where, columns
// Note: mapColumns is OPTIONAL per pglite-sync types.ts

// Create a promise that resolves when initial sync is complete
// Using recommended approach: check isUpToDate immediately, watch stream, shorter timeout
// IMPORTANT: We don't unsubscribe from the stream - it must stay active for real-time updates
let resolveInitialSync: () => void
let rejectInitialSync: (error: Error) => void
let syncResolved = false

const initialSyncPromise = new Promise<void>((resolve, reject) => {
resolveInitialSync = () => {
if (!syncResolved) {
syncResolved = true
// DON'T unsubscribe from stream - it needs to stay active for real-time updates
resolve()
}
}
rejectInitialSync = (error: Error) => {
if (!syncResolved) {
syncResolved = true
// DON'T unsubscribe from stream even on error - let Electric handle it
reject(error)
}
}

// Shorter timeout (5 seconds) as fallback
const timeoutId = setTimeout(() => {
if (!syncResolved) {
console.warn(`⚠️ Sync timeout for ${table} - checking isUpToDate one more time...`)
// Check isUpToDate one more time before resolving
// This will be checked after shape is created
setTimeout(() => {
if (!syncResolved) {
console.warn(`⚠️ Sync timeout for ${table} - resolving anyway after 5s`)
resolveInitialSync()
}
}, 100)
}
}, 5000)

// Store timeout ID for cleanup if needed
// Note: timeout will be cleared if sync completes early
})

const shapeConfig = {
shape: {
url: `${electricUrl}/v1/shape`,
params: {
table,
...(validatedWhere ? { where: validatedWhere } : {}),
...(columns ? { columns: columns.join(',') } : {}),
// According to Electric SQL docs, the shape config uses params for table, where, columns
// Note: mapColumns is OPTIONAL per pglite-sync types.ts

// Create a promise that resolves when initial sync is complete
// Using recommended approach: check isUpToDate immediately, watch stream, shorter timeout
// IMPORTANT: We don't unsubscribe from the stream - it must stay active for real-time updates
let resolveInitialSync: () => void;
let rejectInitialSync: (error: Error) => void;
let syncResolved = false;

const initialSyncPromise = new Promise<void>((resolve, reject) => {
resolveInitialSync = () => {
if (!syncResolved) {
syncResolved = true;
// DON'T unsubscribe from stream - it needs to stay active for real-time updates
resolve();
}
};
rejectInitialSync = (error: Error) => {
if (!syncResolved) {
syncResolved = true;
// DON'T unsubscribe from stream even on error - let Electric handle it
reject(error);
}
};

// Shorter timeout (5 seconds) as fallback
const timeoutId = setTimeout(() => {
if (!syncResolved) {
console.warn(
`⚠️ Sync timeout for ${table} - checking isUpToDate one more time...`
);
// Check isUpToDate one more time before resolving
// This will be checked after shape is created
setTimeout(() => {
if (!syncResolved) {
console.warn(`⚠️ Sync timeout for ${table} - resolving anyway after 5s`);
resolveInitialSync();
}
}, 100);
}
}, 5000);

// Store timeout ID for cleanup if needed
// Note: timeout will be cleared if sync completes early
});

const shapeConfig = {
shape: {
url: `${electricUrl}/v1/shape`,
params: {
table,
...(validatedWhere ? { where: validatedWhere } : {}),
...(columns ? { columns: columns.join(",") } : {}),
},
},
},
table,
primaryKey,
shapeKey: `v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, '_') || 'all'}`, // Versioned key to force fresh sync when needed
onInitialSync: () => {
console.log(`✅ Initial sync complete for ${table} - data should now be in PGlite`)
resolveInitialSync()
},
onError: (error: Error) => {
console.error(`❌ Shape sync error for ${table}:`, error)
console.error('Error details:', JSON.stringify(error, Object.getOwnPropertyNames(error)))
rejectInitialSync(error)
},
}

console.log('syncShapeToTable config:', JSON.stringify(shapeConfig, null, 2))

// Type assertion to PGlite with electric extension
const pgWithElectric = db as PGlite & { electric: { syncShapeToTable: (config: typeof shapeConfig) => Promise<{ unsubscribe: () => void; isUpToDate: boolean; stream: unknown }> } }
const shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig)
table,
primaryKey,
shapeKey: `v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, "_") || "all"}`, // Versioned key to force fresh sync when needed
onInitialSync: () => {
console.log(`✅ Initial sync complete for ${table} - data should now be in PGlite`);
resolveInitialSync();
},
onError: (error: Error) => {
console.error(`❌ Shape sync error for ${table}:`, error);
console.error(
"Error details:",
JSON.stringify(error, Object.getOwnPropertyNames(error))
);
rejectInitialSync(error);
},
};

if (!shape) {
throw new Error('syncShapeToTable returned undefined')
}
console.log("syncShapeToTable config:", JSON.stringify(shapeConfig, null, 2));

// Log the actual shape result structure
console.log('Shape sync result (initial):', {
hasUnsubscribe: typeof shape?.unsubscribe === 'function',
isUpToDate: shape?.isUpToDate,
hasStream: !!shape?.stream,
streamType: typeof shape?.stream,
})

// Recommended Approach Step 1: Check isUpToDate immediately
if (shape.isUpToDate) {
console.log(`✅ Sync already up-to-date for ${table} (resuming from previous state)`)
resolveInitialSync()
} else {
// Recommended Approach Step 2: Subscribe to stream and watch for "up-to-date" message
if (shape?.stream) {
const stream = shape.stream as any
console.log('Shape stream details:', {
shapeHandle: stream?.shapeHandle,
lastOffset: stream?.lastOffset,
isUpToDate: stream?.isUpToDate,
error: stream?.error,
hasSubscribe: typeof stream?.subscribe === 'function',
hasUnsubscribe: typeof stream?.unsubscribe === 'function',
})

// Subscribe to the stream to watch for "up-to-date" control message
// NOTE: We keep this subscription active - don't unsubscribe!
// The stream is what Electric SQL uses for real-time updates
if (typeof stream?.subscribe === 'function') {
console.log('Subscribing to shape stream to watch for up-to-date message...')
// Subscribe but don't store unsubscribe - we want it to stay active
stream.subscribe((messages: unknown[]) => {
// Continue receiving updates even after sync is resolved
if (!syncResolved) {
console.log('🔵 Shape stream received messages:', messages?.length || 0)
}

// Check if any message indicates sync is complete
if (messages && messages.length > 0) {
for (const message of messages) {
const msg = message as any
// Check for "up-to-date" control message
if (msg?.headers?.control === 'up-to-date' ||
msg?.headers?.electric_up_to_date === 'true' ||
(typeof msg === 'object' && 'up-to-date' in msg)) {
if (!syncResolved) {
console.log(`✅ Received up-to-date message for ${table}`)
resolveInitialSync()
// Type assertion to PGlite with electric extension
const pgWithElectric = db as PGlite & {
electric: {
syncShapeToTable: (
config: typeof shapeConfig
) => Promise<{ unsubscribe: () => void; isUpToDate: boolean; stream: unknown }>;
};
};
const shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig);

if (!shape) {
throw new Error("syncShapeToTable returned undefined");
}

// Log the actual shape result structure
console.log("Shape sync result (initial):", {
hasUnsubscribe: typeof shape?.unsubscribe === "function",
isUpToDate: shape?.isUpToDate,
hasStream: !!shape?.stream,
streamType: typeof shape?.stream,
});

// Recommended Approach Step 1: Check isUpToDate immediately
if (shape.isUpToDate) {
console.log(`✅ Sync already up-to-date for ${table} (resuming from previous state)`);
resolveInitialSync();
} else {
// Recommended Approach Step 2: Subscribe to stream and watch for "up-to-date" message
if (shape?.stream) {
const stream = shape.stream as any;
console.log("Shape stream details:", {
shapeHandle: stream?.shapeHandle,
lastOffset: stream?.lastOffset,
isUpToDate: stream?.isUpToDate,
error: stream?.error,
hasSubscribe: typeof stream?.subscribe === "function",
hasUnsubscribe: typeof stream?.unsubscribe === "function",
});

// Subscribe to the stream to watch for "up-to-date" control message
// NOTE: We keep this subscription active - don't unsubscribe!
// The stream is what Electric SQL uses for real-time updates
if (typeof stream?.subscribe === "function") {
console.log("Subscribing to shape stream to watch for up-to-date message...");
// Subscribe but don't store unsubscribe - we want it to stay active
stream.subscribe((messages: unknown[]) => {
// Continue receiving updates even after sync is resolved
if (!syncResolved) {
console.log("🔵 Shape stream received messages:", messages?.length || 0);
}

// Check if any message indicates sync is complete
if (messages && messages.length > 0) {
for (const message of messages) {
const msg = message as any;
// Check for "up-to-date" control message
if (
msg?.headers?.control === "up-to-date" ||
msg?.headers?.electric_up_to_date === "true" ||
(typeof msg === "object" && "up-to-date" in msg)
) {
if (!syncResolved) {
console.log(`✅ Received up-to-date message for ${table}`);
resolveInitialSync();
}
// Continue listening for real-time updates - don't return!
}
// Continue listening for real-time updates - don't return!
}
if (!syncResolved && messages.length > 0) {
console.log("First message:", JSON.stringify(messages[0], null, 2));
}
}
if (!syncResolved && messages.length > 0) {
console.log('First message:', JSON.stringify(messages[0], null, 2))
}
}

// Also check stream's isUpToDate property after receiving messages
if (!syncResolved && stream?.isUpToDate) {
console.log(`✅ Stream isUpToDate is true for ${table}`)
resolveInitialSync()
}
})

// Also check stream's isUpToDate property immediately
if (stream?.isUpToDate) {
console.log(`✅ Stream isUpToDate is true immediately for ${table}`)
resolveInitialSync()
}
}

// Also poll isUpToDate periodically as a backup (every 200ms)
const pollInterval = setInterval(() => {
if (syncResolved) {
clearInterval(pollInterval)
return
}

if (shape.isUpToDate || stream?.isUpToDate) {
console.log(`✅ Sync completed (detected via polling) for ${table}`)
clearInterval(pollInterval)
resolveInitialSync()
}
}, 200)

// Clean up polling when promise resolves
initialSyncPromise.finally(() => {
clearInterval(pollInterval)
})
} else {
console.warn(`⚠️ No stream available for ${table}, relying on callback and timeout`)
}
}

// Return the shape handle - isUpToDate is a getter that reflects current state
return {
unsubscribe: () => {
console.log('unsubscribing')
if (shape && typeof shape.unsubscribe === 'function') {
shape.unsubscribe()
// Also check stream's isUpToDate property after receiving messages
if (!syncResolved && stream?.isUpToDate) {
console.log(`✅ Stream isUpToDate is true for ${table}`);
resolveInitialSync();
}
});

// Also check stream's isUpToDate property immediately
if (stream?.isUpToDate) {
console.log(`✅ Stream isUpToDate is true immediately for ${table}`);
resolveInitialSync();
}
}

// Also poll isUpToDate periodically as a backup (every 200ms)
const pollInterval = setInterval(() => {
if (syncResolved) {
clearInterval(pollInterval);
return;
}

if (shape.isUpToDate || stream?.isUpToDate) {
console.log(`✅ Sync completed (detected via polling) for ${table}`);
clearInterval(pollInterval);
resolveInitialSync();
}
}, 200);

// Clean up polling when promise resolves
initialSyncPromise.finally(() => {
clearInterval(pollInterval);
});
} else {
console.warn(`⚠️ No stream available for ${table}, relying on callback and timeout`);
}
},
// Use getter to always return current state
get isUpToDate() {
return shape?.isUpToDate ?? false
},
stream: shape?.stream,
initialSyncPromise, // Expose promise so callers can wait for sync
}
}

// Return the shape handle - isUpToDate is a getter that reflects current state
return {
unsubscribe: () => {
console.log("unsubscribing");
if (shape && typeof shape.unsubscribe === "function") {
shape.unsubscribe();
}
},
// Use getter to always return current state
get isUpToDate() {
return shape?.isUpToDate ?? false;
},
stream: shape?.stream,
initialSyncPromise, // Expose promise so callers can wait for sync
};
} catch (error) {
console.error('Failed to sync shape:', error)
console.error("Failed to sync shape:", error);
// Check if Electric SQL server is reachable
try {
const response = await fetch(`${electricUrl}/v1/shape?table=${table}&offset=-1`, {
method: 'GET',
})
console.log('Electric SQL server response:', response.status, response.statusText)
method: "GET",
});
console.log("Electric SQL server response:", response.status, response.statusText);
if (!response.ok) {
console.error('Electric SQL server error:', await response.text())
console.error("Electric SQL server error:", await response.text());
}
} catch (fetchError) {
console.error('Cannot reach Electric SQL server:', fetchError)
console.error('Make sure Electric SQL is running at:', electricUrl)
console.error("Cannot reach Electric SQL server:", fetchError);
console.error("Make sure Electric SQL is running at:", electricUrl);
}
throw error
throw error;
}
},
}
};

console.log('Electric SQL initialized successfully with PGlite')
return electricClient
console.log("Electric SQL initialized successfully with PGlite");
return electricClient;
} catch (error) {
console.error('Failed to initialize Electric SQL:', error)
throw error
console.error("Failed to initialize Electric SQL:", error);
throw error;
} finally {
isInitializing = false
isInitializing = false;
}
})()
})();

return initPromise
return initPromise;
}

/**

@ -426,21 +443,21 @@ export async function initElectric(): Promise<ElectricClient> {
*/
export function getElectric(): ElectricClient {
if (!electricClient) {
throw new Error('Electric not initialized. Call initElectric() first.')
throw new Error("Electric not initialized. Call initElectric() first.");
}
return electricClient
return electricClient;
}

/**
* Check if Electric is initialized
*/
export function isElectricInitialized(): boolean {
return electricClient !== null
return electricClient !== null;
}

/**
* Get the PGlite database instance
*/
export function getDb(): PGlite | null {
return electricClient?.db ?? null
return electricClient?.db ?? null;
}
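Putting this module's exports together, an end-to-end sketch of the intended call pattern — initialize once, sync a shape, wait briefly for the first copy, then query PGlite. Everything here uses only names visible in this diff; the 2-second cap mirrors the Promise.race in the notifications hook:

import { getDb, initElectric, isElectricInitialized } from "@/lib/electric/client";

async function syncNotificationsFor(userId: string) {
  const electric = await initElectric(); // idempotent: returns the singleton

  const handle = await electric.syncShape({
    table: "notifications",
    where: `user_id = '${userId}'`,
    primaryKey: ["id"],
  });

  // Wait for the first full copy, but never block longer than 2s.
  if (!handle.isUpToDate && handle.initialSyncPromise) {
    await Promise.race([
      handle.initialSyncPromise,
      new Promise((resolve) => setTimeout(resolve, 2000)),
    ]);
  }

  const db = getDb();
  const result = await db?.query(`SELECT count(*) AS n FROM notifications`);
  console.log("synced notifications:", result?.rows?.[0], isElectricInitialized());

  // Drop the shape subscription when this consumer goes away.
  handle.unsubscribe();
}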