fix: critical timestamp parsing and audit fixes

- Fix timestamp conversion: String(epochMs) → new Date(epochMs).toISOString()
  in use-messages-sync, use-comments-sync, use-documents, use-inbox.
  Without this, date comparisons (isEdited, cutoff filters) would fail.
- Fix updated_at: undefined → null in use-inbox to match InboxItem type
- Fix ZeroProvider: skip Zero connection for unauthenticated users
- Clean 30+ stale "Electric SQL" comments in backend Python code
This commit is contained in:
CREDO23 2026-03-23 19:49:28 +02:00
parent f04ab89418
commit cf21eaacfc
33 changed files with 62 additions and 57 deletions

View file

@ -664,7 +664,7 @@ async def index_composio_gmail(
on_heartbeat_callback=on_heartbeat_callback,
)
# CRITICAL: Always update timestamp so Electric SQL syncs
# CRITICAL: Always update timestamp so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit to ensure all documents are persisted

View file

@ -255,7 +255,7 @@ async def index_composio_google_calendar(
await task_logger.log_task_success(
log_entry, success_msg, {"events_count": 0}
)
# CRITICAL: Update timestamp even when no events found so Electric SQL syncs and UI shows indexed status
# CRITICAL: Update timestamp even when no events found so Zero syncs and UI shows indexed status
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
return (
@ -503,7 +503,7 @@ async def index_composio_google_calendar(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -775,7 +775,7 @@ async def index_composio_google_drive(
flag_modified(connector, "config")
logger.info(f"Saved indexing settings hash for connector {connector_id}")
# CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status
# CRITICAL: Always update timestamp so Zero syncs and UI shows indexed status
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit

View file

@ -712,7 +712,7 @@ class ChatComment(BaseModel, TimestampMixin):
nullable=False,
index=True,
)
# Denormalized thread_id for efficient Electric SQL subscriptions (one per thread)
# Denormalized thread_id for efficient Zero subscriptions (one per thread)
thread_id = Column(
Integer,
ForeignKey("new_chat_threads.id", ondelete="CASCADE"),
@ -782,7 +782,7 @@ class ChatCommentMention(BaseModel, TimestampMixin):
class ChatSessionState(BaseModel):
"""
Tracks real-time session state for shared chat collaboration.
One record per thread, synced via Electric SQL.
One record per thread, synced via Zero.
"""
__tablename__ = "chat_session_state"

View file

@ -80,7 +80,7 @@ router.include_router(model_list_router) # Dynamic LLM model catalogue from Ope
router.include_router(logs_router)
router.include_router(circleback_webhook_router) # Circleback meeting webhooks
router.include_router(surfsense_docs_router) # Surfsense documentation for citations
router.include_router(notifications_router) # Notifications with Electric SQL sync
router.include_router(notifications_router) # Notifications with Zero sync
router.include_router(composio_router) # Composio OAuth and toolkit management
router.include_router(public_chat_router) # Public chat sharing and cloning
router.include_router(incentive_tasks_router) # Incentive tasks for earning free pages

View file

@ -128,7 +128,7 @@ async def create_documents_file_upload(
Upload files as documents with real-time status tracking.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately via ElectricSQL)
- Phase 1: Create all documents with 'pending' status (visible in UI immediately via Zero)
- Phase 2: Celery processes each file: pending → processing → ready/failed
Requires DOCUMENTS_CREATE permission.

View file

@ -1,7 +1,7 @@
"""
Notifications API routes.
These endpoints allow marking notifications as read and fetching older notifications.
Electric SQL automatically syncs the changes to all connected clients for recent items.
Zero automatically syncs the changes to all connected clients for recent items.
For older items (beyond the sync window), use the list endpoint.
"""
@ -267,7 +267,7 @@ async def get_unread_count(
This allows the frontend to calculate:
- older_unread = total_unread - recent_unread (static until reconciliation)
- Display count = older_unread + live_recent_count (from Electric SQL)
- Display count = older_unread + live_recent_count (from Zero)
"""
# Calculate cutoff date for sync window
cutoff_date = datetime.now(UTC) - timedelta(days=SYNC_WINDOW_DAYS)
@ -344,7 +344,7 @@ async def list_notifications(
List notifications for the current user with pagination.
This endpoint is used as a fallback for older notifications that are
outside the Electric SQL sync window (2 weeks).
outside the Zero sync window (2 weeks).
Use `before_date` to paginate through older notifications efficiently.
"""
@ -487,7 +487,7 @@ async def mark_notification_as_read(
"""
Mark a single notification as read.
Electric SQL will automatically sync this change to all connected clients.
Zero will automatically sync this change to all connected clients.
"""
# Verify the notification belongs to the user
result = await session.execute(
@ -528,7 +528,7 @@ async def mark_all_notifications_as_read(
"""
Mark all notifications as read for the current user.
Electric SQL will automatically sync these changes to all connected clients.
Zero will automatically sync these changes to all connected clients.
"""
# Update all unread notifications for the user
result = await session.execute(

View file

@ -1433,7 +1433,7 @@ async def _run_indexing_with_notifications(
)
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
) # Commit to ensure Zero syncs the notification update
elif documents_processed > 0:
# Update notification to storing stage
if notification:
@ -1460,7 +1460,7 @@ async def _run_indexing_with_notifications(
)
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
) # Commit to ensure Zero syncs the notification update
else:
# No new documents processed - check if this is an error or just no changes
if error_or_warning:
@ -1486,7 +1486,7 @@ async def _run_indexing_with_notifications(
if is_duplicate_warning or is_empty_result or is_info_warning:
# These are success cases - sync worked, just found nothing new
logger.info(f"Indexing completed successfully: {error_or_warning}")
# Still update timestamp so ElectricSQL syncs and clears "Syncing" UI
# Still update timestamp so Zero syncs and clears "Syncing" UI
if update_timestamp_func:
await update_timestamp_func(session, connector_id)
await session.commit() # Commit timestamp update
@ -1509,7 +1509,7 @@ async def _run_indexing_with_notifications(
)
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
) # Commit to ensure Zero syncs the notification update
else:
# Actual failure
logger.error(f"Indexing failed: {error_or_warning}")
@ -1525,13 +1525,13 @@ async def _run_indexing_with_notifications(
)
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
) # Commit to ensure Zero syncs the notification update
else:
# Success - just no new documents to index (all skipped/unchanged)
logger.info(
"Indexing completed: No new documents to process (all up to date)"
)
# Still update timestamp so ElectricSQL syncs and clears "Syncing" UI
# Still update timestamp so Zero syncs and clears "Syncing" UI
if update_timestamp_func:
await update_timestamp_func(session, connector_id)
await session.commit() # Commit timestamp update
@ -1547,7 +1547,7 @@ async def _run_indexing_with_notifications(
)
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
) # Commit to ensure Zero syncs the notification update
except SoftTimeLimitExceeded:
# Celery soft time limit was reached - task is about to be killed
# Gracefully save progress and mark as interrupted
@ -2650,7 +2650,7 @@ async def run_composio_indexing(
Run Composio connector indexing with real-time notifications.
This wraps the Composio indexer with the notification system so that
Electric SQL can sync indexing progress to the frontend in real-time.
Zero can sync indexing progress to the frontend in real-time.
Args:
session: Database session

View file

@ -456,7 +456,7 @@ async def create_comment(
thread = message.thread
comment = ChatComment(
message_id=message_id,
thread_id=thread.id, # Denormalized for efficient Electric subscriptions
thread_id=thread.id, # Denormalized for efficient per-thread sync
author_id=user.id,
content=content,
)
@ -569,7 +569,7 @@ async def create_reply(
thread = parent_comment.message.thread
reply = ChatComment(
message_id=parent_comment.message_id,
thread_id=thread.id, # Denormalized for efficient Electric subscriptions
thread_id=thread.id, # Denormalized for efficient per-thread sync
parent_id=comment_id,
author_id=user.id,
content=content,

View file

@ -1,4 +1,4 @@
"""Service for creating and managing notifications with Electric SQL sync."""
"""Service for creating and managing notifications with Zero sync."""
import logging
from datetime import UTC, datetime
@ -1045,7 +1045,7 @@ class PageLimitNotificationHandler(BaseNotificationHandler):
class NotificationService:
"""Service for creating and managing notifications that sync via Electric SQL."""
"""Service for creating and managing notifications that sync via Zero."""
# Handler instances
connector_indexing = ConnectorIndexingNotificationHandler()
@ -1065,7 +1065,7 @@ class NotificationService:
notification_metadata: dict[str, Any] | None = None,
) -> Notification:
"""
Create a notification - Electric SQL will automatically sync it to frontend.
Create a notification - Zero will automatically sync it to frontend.
Args:
session: Database session

View file

@ -887,7 +887,7 @@ async def _process_file_with_document(
)
try:
# Set status to PROCESSING (shows spinner in UI via ElectricSQL)
# Set status to PROCESSING (shows spinner in UI via Zero)
document.status = DocumentStatus.processing()
await session.commit()
logger.info(
@ -951,7 +951,7 @@ async def _process_file_with_document(
):
page_limit_error = e.__cause__
# Mark document as failed (shows error in UI via ElectricSQL)
# Mark document as failed (shows error in UI via Zero)
error_message = str(e)[:500]
document.status = DocumentStatus.failed(error_message)
document.updated_at = get_current_timestamp()

View file

@ -139,7 +139,7 @@ async def index_airtable_records(
await task_logger.log_task_success(
log_entry, success_msg, {"bases_count": 0}
)
# CRITICAL: Update timestamp even when no bases found so Electric SQL syncs
# CRITICAL: Update timestamp even when no bases found so Zero syncs
await update_connector_last_indexed(
session, connector, update_last_indexed
)
@ -460,7 +460,7 @@ async def index_airtable_records(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
total_processed = documents_indexed

View file

@ -462,7 +462,7 @@ async def index_bookstack_pages(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -470,7 +470,7 @@ async def index_clickup_tasks(
total_processed = documents_indexed
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -442,7 +442,7 @@ async def index_confluence_pages(
documents_failed += 1
continue # Skip this page and continue with others
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -718,7 +718,7 @@ async def index_discord_messages(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -413,7 +413,7 @@ async def index_elasticsearch_documents(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
if update_last_indexed:
connector.last_indexed_at = (

View file

@ -451,7 +451,7 @@ async def index_github_repos(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit

View file

@ -554,7 +554,7 @@ async def index_google_calendar_events(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -477,7 +477,7 @@ async def index_google_gmail_messages(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -422,7 +422,7 @@ async def index_jira_issues(
documents_failed += 1
continue # Skip this issue and continue with others
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -463,7 +463,7 @@ async def index_linear_issues(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -520,7 +520,7 @@ async def index_luma_events(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -252,7 +252,7 @@ async def index_notion_pages(
{"pages_found": 0},
)
logger.info("No Notion pages found to index")
# CRITICAL: Update timestamp even when no pages found so Electric SQL syncs
# CRITICAL: Update timestamp even when no pages found so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
await notion_client.close()
@ -506,7 +506,7 @@ async def index_notion_pages(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
total_processed = documents_indexed

View file

@ -599,7 +599,7 @@ async def index_obsidian_vault(
failed_count += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -256,7 +256,7 @@ async def index_slack_messages(
f"No Slack channels found for connector {connector_id}",
{"channels_found": 0},
)
# CRITICAL: Update timestamp even when no channels found so Electric SQL syncs
# CRITICAL: Update timestamp even when no channels found so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
return 0, None # Return None (not error) when no channels found
@ -593,7 +593,7 @@ async def index_slack_messages(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -249,7 +249,7 @@ async def index_teams_messages(
f"No Teams found for connector {connector_id}",
{"teams_found": 0},
)
# CRITICAL: Update timestamp even when no teams found so Electric SQL syncs
# CRITICAL: Update timestamp even when no teams found so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
return 0, None # Return None (not error) when no items found
@ -635,7 +635,7 @@ async def index_teams_messages(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -444,7 +444,7 @@ async def index_crawled_urls(
total_processed = documents_indexed + documents_updated
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -10,8 +10,13 @@ const cacheURL = process.env.NEXT_PUBLIC_ZERO_CACHE_URL || "http://localhost:484
export function ZeroProvider({ children }: { children: React.ReactNode }) {
const { data: user } = useAtomValue(currentUserAtom);
const userID = user?.id ? String(user.id) : "";
const context = user?.id ? { userId: String(user.id) } : undefined;
if (!user?.id) {
return <>{children}</>;
}
const userID = String(user.id);
const context = { userId: userID };
return (
<ZeroReactProvider {...{ userID, context, cacheURL, schema, queries }}>

View file

@ -203,8 +203,8 @@ export function useCommentsSync(threadId: number | null) {
parent_id: c.parentId ?? null,
author_id: c.authorId ?? null,
content: c.content,
created_at: String(c.createdAt),
updated_at: String(c.updatedAt),
created_at: new Date(c.createdAt).toISOString(),
updated_at: new Date(c.updatedAt).toISOString(),
}));
updateReactQueryCache(rows);

View file

@ -242,7 +242,7 @@ export function useDocuments(
created_by_email: doc.createdById
? (emailCacheRef.current.get(doc.createdById) ?? null)
: null,
created_at: String(doc.createdAt),
created_at: new Date(doc.createdAt).toISOString(),
status: (doc.status as unknown as DocumentStatusType) ?? { state: "ready" },
}));

View file

@ -154,8 +154,8 @@ export function useInbox(
message: item.message,
read: item.read,
metadata: item.metadata as unknown as Record<string, unknown>,
created_at: String(item.createdAt),
updated_at: item.updatedAt ? String(item.updatedAt) : undefined,
created_at: new Date(item.createdAt).toISOString(),
updated_at: item.updatedAt ? new Date(item.updatedAt).toISOString() : null,
} as InboxItem));
let updated = prev.map((existing) => {

View file

@ -30,7 +30,7 @@ export function useMessagesSync(
role: msg.role,
content: msg.content,
author_id: msg.authorId ?? null,
created_at: String(msg.createdAt),
created_at: new Date(msg.createdAt).toISOString(),
}));
onMessagesUpdateRef.current(mapped);