Merge remote-tracking branch 'upstream/dev' into impr/thinking-steps

This commit is contained in:
Anish Sarkar 2026-03-25 01:50:10 +05:30
commit 778cfac6fa
96 changed files with 4065 additions and 3274 deletions

View file

@ -17,10 +17,6 @@ REDIS_APP_URL=redis://localhost:6379/0
# Only uncomment if running the backend outside Docker (e.g. uvicorn on host).
# SEARXNG_DEFAULT_HOST=http://localhost:8888
#Electric(for migrations only)
ELECTRIC_DB_USER=electric
ELECTRIC_DB_PASSWORD=electric_password
# Periodic task interval
# # Run every minute (default)
# SCHEDULE_CHECKER_INTERVAL=1m

View file

@ -25,13 +25,6 @@ database_url = os.getenv("DATABASE_URL")
if database_url:
config.set_main_option("sqlalchemy.url", database_url)
# Electric SQL user credentials - centralized configuration for migrations
# These are used by migrations that set up Electric SQL replication
config.set_main_option("electric_db_user", os.getenv("ELECTRIC_DB_USER", "electric"))
config.set_main_option(
"electric_db_password", os.getenv("ELECTRIC_DB_PASSWORD", "electric_password")
)
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:

View file

@ -30,21 +30,25 @@ def upgrade() -> None:
"ix_notifications_user_read_type_created",
"notifications",
["user_id", "read", "type", "created_at"],
if_not_exists=True,
)
op.create_index(
"ix_notifications_user_space_created",
"notifications",
["user_id", "search_space_id", "created_at"],
if_not_exists=True,
)
op.create_index(
"ix_notifications_type",
"notifications",
["type"],
if_not_exists=True,
)
op.create_index(
"ix_notifications_search_space_id",
"notifications",
["search_space_id"],
if_not_exists=True,
)

View file

@ -35,52 +35,60 @@ def upgrade() -> None:
END $$;
""")
op.create_table(
"video_presentations",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("title", sa.String(length=500), nullable=False),
sa.Column("slides", JSONB(), nullable=True),
sa.Column("scene_codes", JSONB(), nullable=True),
sa.Column(
"status",
video_presentation_status_enum,
server_default="ready",
nullable=False,
),
sa.Column("search_space_id", sa.Integer(), nullable=False),
sa.Column("thread_id", sa.Integer(), nullable=True),
sa.Column(
"created_at",
sa.TIMESTAMP(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["search_space_id"],
["searchspaces.id"],
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["thread_id"],
["new_chat_threads.id"],
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id"),
conn = op.get_bind()
result = conn.execute(
sa.text("SELECT 1 FROM information_schema.tables WHERE table_name = 'video_presentations'")
)
if not result.fetchone():
op.create_table(
"video_presentations",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("title", sa.String(length=500), nullable=False),
sa.Column("slides", JSONB(), nullable=True),
sa.Column("scene_codes", JSONB(), nullable=True),
sa.Column(
"status",
video_presentation_status_enum,
server_default="ready",
nullable=False,
),
sa.Column("search_space_id", sa.Integer(), nullable=False),
sa.Column("thread_id", sa.Integer(), nullable=True),
sa.Column(
"created_at",
sa.TIMESTAMP(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["search_space_id"],
["searchspaces.id"],
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["thread_id"],
["new_chat_threads.id"],
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
"ix_video_presentations_status",
"video_presentations",
["status"],
if_not_exists=True,
)
op.create_index(
"ix_video_presentations_thread_id",
"video_presentations",
["thread_id"],
if_not_exists=True,
)
op.create_index(
"ix_video_presentations_created_at",
"video_presentations",
["created_at"],
if_not_exists=True,
)

View file

@ -0,0 +1,104 @@
"""Clean up Electric SQL artifacts (user, publication, replication slots)
Revision ID: 108
Revises: 107
Removes leftover Electric SQL infrastructure that is no longer needed after
the migration to Rocicorp Zero. Fully idempotent safe on databases that
never had Electric SQL set up (fresh installs).
Cleaned up:
- Replication slots containing 'electric' (prevents unbounded WAL growth)
- The 'electric_publication_default' publication
- Default privileges, grants, and the 'electric' database user
"""
from collections.abc import Sequence
from alembic import op
revision: str = "108"
down_revision: str | None = "107"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.execute(
"""
DO $$
DECLARE
slot RECORD;
BEGIN
-- 1. Drop inactive Electric replication slots (prevents WAL growth)
FOR slot IN
SELECT slot_name FROM pg_replication_slots
WHERE slot_name LIKE '%electric%' AND active = false
LOOP
BEGIN
PERFORM pg_drop_replication_slot(slot.slot_name);
EXCEPTION WHEN OTHERS THEN
RAISE WARNING 'Could not drop replication slot %: %', slot.slot_name, SQLERRM;
END;
END LOOP;
-- Warn about active Electric slots that cannot be safely dropped
FOR slot IN
SELECT slot_name FROM pg_replication_slots
WHERE slot_name LIKE '%electric%' AND active = true
LOOP
RAISE WARNING 'Active Electric replication slot "%" was not dropped — drop it manually to stop WAL growth', slot.slot_name;
END LOOP;
-- 2. Drop the Electric publication
BEGIN
IF EXISTS (SELECT 1 FROM pg_publication WHERE pubname = 'electric_publication_default') THEN
DROP PUBLICATION electric_publication_default;
END IF;
EXCEPTION WHEN OTHERS THEN
RAISE WARNING 'Could not drop publication electric_publication_default: %', SQLERRM;
END;
-- 3. Revoke privileges and drop the Electric user
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'electric') THEN
BEGIN
ALTER DEFAULT PRIVILEGES IN SCHEMA public
REVOKE SELECT ON TABLES FROM electric;
ALTER DEFAULT PRIVILEGES IN SCHEMA public
REVOKE SELECT ON SEQUENCES FROM electric;
EXCEPTION WHEN OTHERS THEN
RAISE WARNING 'Could not revoke default privileges from electric: %', SQLERRM;
END;
BEGIN
REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA public FROM electric;
REVOKE ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public FROM electric;
REVOKE USAGE ON SCHEMA public FROM electric;
EXCEPTION WHEN OTHERS THEN
RAISE WARNING 'Could not revoke schema privileges from electric: %', SQLERRM;
END;
BEGIN
EXECUTE format(
'REVOKE CONNECT ON DATABASE %I FROM electric',
current_database()
);
EXCEPTION WHEN OTHERS THEN
RAISE WARNING 'Could not revoke CONNECT from electric: %', SQLERRM;
END;
BEGIN
REASSIGN OWNED BY electric TO CURRENT_USER;
DROP ROLE electric;
EXCEPTION WHEN OTHERS THEN
RAISE WARNING 'Could not drop role electric: %', SQLERRM;
END;
END IF;
END
$$;
"""
)
def downgrade() -> None:
pass

View file

@ -722,7 +722,7 @@ class ChatComment(BaseModel, TimestampMixin):
nullable=False,
index=True,
)
# Denormalized thread_id for efficient Electric SQL subscriptions (one per thread)
# Denormalized thread_id for efficient Zero subscriptions (one per thread)
thread_id = Column(
Integer,
ForeignKey("new_chat_threads.id", ondelete="CASCADE"),
@ -792,7 +792,7 @@ class ChatCommentMention(BaseModel, TimestampMixin):
class ChatSessionState(BaseModel):
"""
Tracks real-time session state for shared chat collaboration.
One record per thread, synced via Electric SQL.
One record per thread, synced via Zero.
"""
__tablename__ = "chat_session_state"

View file

@ -80,7 +80,7 @@ router.include_router(model_list_router) # Dynamic LLM model catalogue from Ope
router.include_router(logs_router)
router.include_router(circleback_webhook_router) # Circleback meeting webhooks
router.include_router(surfsense_docs_router) # Surfsense documentation for citations
router.include_router(notifications_router) # Notifications with Electric SQL sync
router.include_router(notifications_router) # Notifications with Zero sync
router.include_router(composio_router) # Composio OAuth and toolkit management
router.include_router(public_chat_router) # Public chat sharing and cloning
router.include_router(incentive_tasks_router) # Incentive tasks for earning free pages

View file

@ -128,7 +128,7 @@ async def create_documents_file_upload(
Upload files as documents with real-time status tracking.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately via ElectricSQL)
- Phase 1: Create all documents with 'pending' status (visible in UI immediately via Zero)
- Phase 2: Celery processes each file: pending processing ready/failed
Requires DOCUMENTS_CREATE permission.

View file

@ -1,7 +1,7 @@
"""
Notifications API routes.
These endpoints allow marking notifications as read and fetching older notifications.
Electric SQL automatically syncs the changes to all connected clients for recent items.
Zero automatically syncs the changes to all connected clients for recent items.
For older items (beyond the sync window), use the list endpoint.
"""
@ -267,7 +267,7 @@ async def get_unread_count(
This allows the frontend to calculate:
- older_unread = total_unread - recent_unread (static until reconciliation)
- Display count = older_unread + live_recent_count (from Electric SQL)
- Display count = older_unread + live_recent_count (from Zero)
"""
# Calculate cutoff date for sync window
cutoff_date = datetime.now(UTC) - timedelta(days=SYNC_WINDOW_DAYS)
@ -344,7 +344,7 @@ async def list_notifications(
List notifications for the current user with pagination.
This endpoint is used as a fallback for older notifications that are
outside the Electric SQL sync window (2 weeks).
outside the Zero sync window (2 weeks).
Use `before_date` to paginate through older notifications efficiently.
"""
@ -487,7 +487,7 @@ async def mark_notification_as_read(
"""
Mark a single notification as read.
Electric SQL will automatically sync this change to all connected clients.
Zero will automatically sync this change to all connected clients.
"""
# Verify the notification belongs to the user
result = await session.execute(
@ -528,7 +528,7 @@ async def mark_all_notifications_as_read(
"""
Mark all notifications as read for the current user.
Electric SQL will automatically sync these changes to all connected clients.
Zero will automatically sync these changes to all connected clients.
"""
# Update all unread notifications for the user
result = await session.execute(

View file

@ -1543,7 +1543,7 @@ async def _run_indexing_with_notifications(
)
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
) # Commit to ensure Zero syncs the notification update
elif documents_processed > 0:
# Update notification to storing stage
if notification:
@ -1570,7 +1570,7 @@ async def _run_indexing_with_notifications(
)
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
) # Commit to ensure Zero syncs the notification update
else:
# No new documents processed - check if this is an error or just no changes
if error_or_warning:
@ -1596,7 +1596,7 @@ async def _run_indexing_with_notifications(
if is_duplicate_warning or is_empty_result or is_info_warning:
# These are success cases - sync worked, just found nothing new
logger.info(f"Indexing completed successfully: {error_or_warning}")
# Still update timestamp so ElectricSQL syncs and clears "Syncing" UI
# Still update timestamp so Zero syncs and clears "Syncing" UI
if update_timestamp_func:
await update_timestamp_func(session, connector_id)
await session.commit() # Commit timestamp update
@ -1619,7 +1619,7 @@ async def _run_indexing_with_notifications(
)
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
) # Commit to ensure Zero syncs the notification update
else:
# Actual failure
logger.error(f"Indexing failed: {error_or_warning}")
@ -1637,13 +1637,13 @@ async def _run_indexing_with_notifications(
)
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
) # Commit to ensure Zero syncs the notification update
else:
# Success - just no new documents to index (all skipped/unchanged)
logger.info(
"Indexing completed: No new documents to process (all up to date)"
)
# Still update timestamp so ElectricSQL syncs and clears "Syncing" UI
# Still update timestamp so Zero syncs and clears "Syncing" UI
if update_timestamp_func:
await update_timestamp_func(session, connector_id)
await session.commit() # Commit timestamp update
@ -1659,7 +1659,7 @@ async def _run_indexing_with_notifications(
)
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
) # Commit to ensure Zero syncs the notification update
except SoftTimeLimitExceeded:
# Celery soft time limit was reached - task is about to be killed
# Gracefully save progress and mark as interrupted
@ -2776,7 +2776,7 @@ async def run_composio_indexing(
Run Composio connector indexing with real-time notifications.
This wraps the Composio indexer with the notification system so that
Electric SQL can sync indexing progress to the frontend in real-time.
Zero can sync indexing progress to the frontend in real-time.
Args:
session: Database session

View file

@ -456,7 +456,7 @@ async def create_comment(
thread = message.thread
comment = ChatComment(
message_id=message_id,
thread_id=thread.id, # Denormalized for efficient Electric subscriptions
thread_id=thread.id, # Denormalized for efficient per-thread sync
author_id=user.id,
content=content,
)
@ -569,7 +569,7 @@ async def create_reply(
thread = parent_comment.message.thread
reply = ChatComment(
message_id=parent_comment.message_id,
thread_id=thread.id, # Denormalized for efficient Electric subscriptions
thread_id=thread.id, # Denormalized for efficient per-thread sync
parent_id=comment_id,
author_id=user.id,
content=content,

View file

@ -1,4 +1,4 @@
"""Service for creating and managing notifications with Electric SQL sync."""
"""Service for creating and managing notifications with Zero sync."""
import logging
from datetime import UTC, datetime
@ -1045,7 +1045,7 @@ class PageLimitNotificationHandler(BaseNotificationHandler):
class NotificationService:
"""Service for creating and managing notifications that sync via Electric SQL."""
"""Service for creating and managing notifications that sync via Zero."""
# Handler instances
connector_indexing = ConnectorIndexingNotificationHandler()
@ -1065,7 +1065,7 @@ class NotificationService:
notification_metadata: dict[str, Any] | None = None,
) -> Notification:
"""
Create a notification - Electric SQL will automatically sync it to frontend.
Create a notification - Zero will automatically sync it to frontend.
Args:
session: Database session

View file

@ -887,7 +887,7 @@ async def _process_file_with_document(
)
try:
# Set status to PROCESSING (shows spinner in UI via ElectricSQL)
# Set status to PROCESSING (shows spinner in UI via Zero)
document.status = DocumentStatus.processing()
await session.commit()
logger.info(
@ -951,7 +951,7 @@ async def _process_file_with_document(
):
page_limit_error = e.__cause__
# Mark document as failed (shows error in UI via ElectricSQL)
# Mark document as failed (shows error in UI via Zero)
error_message = str(e)[:500]
document.status = DocumentStatus.failed(error_message)
document.updated_at = get_current_timestamp()

View file

@ -139,7 +139,7 @@ async def index_airtable_records(
await task_logger.log_task_success(
log_entry, success_msg, {"bases_count": 0}
)
# CRITICAL: Update timestamp even when no bases found so Electric SQL syncs
# CRITICAL: Update timestamp even when no bases found so Zero syncs
await update_connector_last_indexed(
session, connector, update_last_indexed
)
@ -460,7 +460,7 @@ async def index_airtable_records(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
total_processed = documents_indexed

View file

@ -462,7 +462,7 @@ async def index_bookstack_pages(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -470,7 +470,7 @@ async def index_clickup_tasks(
total_processed = documents_indexed
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -442,7 +442,7 @@ async def index_confluence_pages(
documents_failed += 1
continue # Skip this page and continue with others
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -718,7 +718,7 @@ async def index_discord_messages(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -413,7 +413,7 @@ async def index_elasticsearch_documents(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
if update_last_indexed:
connector.last_indexed_at = (

View file

@ -451,7 +451,7 @@ async def index_github_repos(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit

View file

@ -599,7 +599,7 @@ async def index_google_calendar_events(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -519,7 +519,7 @@ async def index_google_gmail_messages(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -422,7 +422,7 @@ async def index_jira_issues(
documents_failed += 1
continue # Skip this issue and continue with others
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -463,7 +463,7 @@ async def index_linear_issues(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -520,7 +520,7 @@ async def index_luma_events(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)

View file

@ -252,7 +252,7 @@ async def index_notion_pages(
{"pages_found": 0},
)
logger.info("No Notion pages found to index")
# CRITICAL: Update timestamp even when no pages found so Electric SQL syncs
# CRITICAL: Update timestamp even when no pages found so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
await notion_client.close()
@ -506,7 +506,7 @@ async def index_notion_pages(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
total_processed = documents_indexed

View file

@ -599,7 +599,7 @@ async def index_obsidian_vault(
failed_count += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -256,7 +256,7 @@ async def index_slack_messages(
f"No Slack channels found for connector {connector_id}",
{"channels_found": 0},
)
# CRITICAL: Update timestamp even when no channels found so Electric SQL syncs
# CRITICAL: Update timestamp even when no channels found so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
return 0, None # Return None (not error) when no channels found
@ -593,7 +593,7 @@ async def index_slack_messages(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -249,7 +249,7 @@ async def index_teams_messages(
f"No Teams found for connector {connector_id}",
{"teams_found": 0},
)
# CRITICAL: Update timestamp even when no teams found so Electric SQL syncs
# CRITICAL: Update timestamp even when no teams found so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
return 0, None # Return None (not error) when no items found
@ -635,7 +635,7 @@ async def index_teams_messages(
documents_failed += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -444,7 +444,7 @@ async def index_crawled_urls(
total_processed = documents_indexed + documents_updated
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Zero syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches

View file

@ -10,7 +10,7 @@ document upload pipeline. It includes various markdown formatting elements.
- Document upload and processing
- Automatic chunking of content
- Embedding generation for semantic search
- Real-time status tracking via ElectricSQL
- Real-time status tracking via Zero
## Technical Architecture