diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py index a2a613777..898ab9735 100644 --- a/surfsense_backend/app/celery_app.py +++ b/surfsense_backend/app/celery_app.py @@ -76,8 +76,8 @@ celery_app.conf.update( enable_utc=True, # Task execution settings task_track_started=True, - task_time_limit=3600, # 1 hour hard limit - task_soft_time_limit=3000, # 50 minutes soft limit + task_time_limit=28800, # 8 hour hard limit + task_soft_time_limit=28200, # 7 hours 50 minutes soft limit # Result backend settings result_expires=86400, # Results expire after 24 hours result_extended=True, diff --git a/surfsense_backend/app/connectors/notion_history.py b/surfsense_backend/app/connectors/notion_history.py index 1f29a5968..54e5d12ca 100644 --- a/surfsense_backend/app/connectors/notion_history.py +++ b/surfsense_backend/app/connectors/notion_history.py @@ -58,10 +58,23 @@ class NotionHistoryConnector: "timestamp": "last_edited_time", } - # First, get a list of all pages the integration has access to - search_results = await self.notion.search(**search_params) + # Paginate through all pages the integration has access to + pages = [] + has_more = True + cursor = None + + while has_more: + if cursor: + search_params["start_cursor"] = cursor + + search_results = await self.notion.search(**search_params) + + pages.extend(search_results["results"]) + has_more = search_results.get("has_more", False) + + if has_more: + cursor = search_results.get("next_cursor") - pages = search_results["results"] all_page_data = [] for page in pages: diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index b670391eb..f8c3c7869 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -389,6 +389,11 @@ async def index_airtable_records( logger.info( f"Successfully indexed new Airtable record {summary_content}" ) + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info(f"Committing batch: {documents_indexed} Airtable records processed so far") + await session.commit() except Exception as e: logger.error( @@ -408,7 +413,8 @@ async def index_airtable_records( session, connector, update_last_indexed ) - # Commit all changes + # Final commit for any remaining documents not yet committed in batches + logger.info(f"Final commit: Total {documents_indexed} Airtable records processed") await session.commit() logger.info( "Successfully committed all Airtable document changes to database" diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py index 4c057946b..7a2b23ff6 100644 --- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py @@ -353,6 +353,11 @@ async def index_clickup_tasks( session.add(document) documents_indexed += 1 logger.info(f"Successfully indexed new task {task_name}") + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info(f"Committing batch: {documents_indexed} ClickUp tasks processed so far") + await session.commit() except Exception as e: logger.error( @@ -366,6 +371,8 @@ async def index_clickup_tasks( if total_processed > 0: await update_connector_last_indexed(session, connector, update_last_indexed) + # Final commit for any remaining documents not yet committed in batches + logger.info(f"Final commit: Total {documents_indexed} ClickUp tasks processed") await session.commit() await task_logger.log_task_success( diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index afdbdd177..943880c7f 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -367,6 +367,11 @@ async def index_confluence_pages( session.add(document) documents_indexed += 1 logger.info(f"Successfully indexed new page {page_title}") + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info(f"Committing batch: {documents_indexed} Confluence pages processed so far") + await session.commit() except Exception as e: logger.error( @@ -384,7 +389,8 @@ async def index_confluence_pages( if update_last_indexed: await update_connector_last_indexed(session, connector, update_last_indexed) - # Commit all changes + # Final commit for any remaining documents not yet committed in batches + logger.info(f"Final commit: Total {documents_indexed} Confluence pages processed") await session.commit() logger.info( "Successfully committed all Confluence document changes to database" diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index b08a36132..3d4a441fd 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -461,6 +461,11 @@ async def index_discord_messages( logger.info( f"Successfully indexed new channel {guild_name}#{channel_name} with {len(formatted_messages)} messages" ) + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info(f"Committing batch: {documents_indexed} Discord channels processed so far") + await session.commit() except Exception as e: logger.error( @@ -476,6 +481,8 @@ async def index_discord_messages( if documents_indexed > 0: await update_connector_last_indexed(session, connector, update_last_indexed) + # Final commit for any remaining documents not yet committed in batches + logger.info(f"Final commit: Total {documents_indexed} Discord channels processed") await session.commit() # Prepare result message diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index 8cd8ca299..89908d126 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -380,6 +380,11 @@ async def index_github_repos( ) session.add(document) documents_processed += 1 + + # Batch commit every 10 documents + if documents_processed % 10 == 0: + logger.info(f"Committing batch: {documents_processed} GitHub files processed so far") + await session.commit() except Exception as repo_err: logger.error( @@ -387,7 +392,8 @@ async def index_github_repos( ) errors.append(f"Failed processing {repo_full_name}: {repo_err}") - # Commit all changes at the end + # Final commit for any remaining documents not yet committed in batches + logger.info(f"Final commit: Total {documents_processed} GitHub files processed") await session.commit() logger.info( f"Finished GitHub indexing for connector {connector_id}. Processed {documents_processed} files." diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index b7d8e0b59..588b965f7 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -406,6 +406,11 @@ async def index_google_calendar_events( session.add(document) documents_indexed += 1 logger.info(f"Successfully indexed new event {event_summary}") + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info(f"Committing batch: {documents_indexed} Google Calendar events processed so far") + await session.commit() except Exception as e: logger.error( @@ -422,6 +427,8 @@ async def index_google_calendar_events( if total_processed > 0: await update_connector_last_indexed(session, connector, update_last_indexed) + # Final commit for any remaining documents not yet committed in batches + logger.info(f"Final commit: Total {documents_indexed} Google Calendar events processed") await session.commit() await task_logger.log_task_success( diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 7e821dc9f..d05d13848 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -323,6 +323,11 @@ async def index_google_gmail_messages( session.add(document) documents_indexed += 1 logger.info(f"Successfully indexed new email {summary_content}") + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info(f"Committing batch: {documents_indexed} Gmail messages processed so far") + await session.commit() except Exception as e: logger.error( @@ -338,7 +343,8 @@ async def index_google_gmail_messages( if total_processed > 0: await update_connector_last_indexed(session, connector, update_last_indexed) - # Commit all changes + # Final commit for any remaining documents not yet committed in batches + logger.info(f"Final commit: Total {documents_indexed} Gmail messages processed") await session.commit() logger.info( "Successfully committed all Google gmail document changes to database" diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index 36e09c81e..0f138fc8b 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -351,6 +351,11 @@ async def index_jira_issues( logger.info( f"Successfully indexed new issue {issue_identifier} - {issue_title}" ) + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info(f"Committing batch: {documents_indexed} Jira issues processed so far") + await session.commit() except Exception as e: logger.error( @@ -368,7 +373,8 @@ async def index_jira_issues( if update_last_indexed: await update_connector_last_indexed(session, connector, update_last_indexed) - # Commit all changes + # Final commit for any remaining documents not yet committed in batches + logger.info(f"Final commit: Total {documents_indexed} Jira issues processed") await session.commit() logger.info("Successfully committed all JIRA document changes to database") diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index 33d5835ee..4a96bd943 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -370,6 +370,11 @@ async def index_linear_issues( logger.info( f"Successfully indexed new issue {issue_identifier} - {issue_title}" ) + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info(f"Committing batch: {documents_indexed} Linear issues processed so far") + await session.commit() except Exception as e: logger.error( @@ -387,7 +392,8 @@ async def index_linear_issues( if update_last_indexed: await update_connector_last_indexed(session, connector, update_last_indexed) - # Commit all changes + # Final commit for any remaining documents not yet committed in batches + logger.info(f"Final commit: Total {documents_indexed} Linear issues processed") await session.commit() logger.info("Successfully committed all Linear document changes to database") diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index 15588afaa..0e17d209b 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -437,6 +437,11 @@ async def index_luma_events( session.add(document) documents_indexed += 1 logger.info(f"Successfully indexed new event {event_name}") + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info(f"Committing batch: {documents_indexed} Luma events processed so far") + await session.commit() except Exception as e: logger.error( @@ -453,6 +458,8 @@ async def index_luma_events( if total_processed > 0: await update_connector_last_indexed(session, connector, update_last_indexed) + # Final commit for any remaining documents not yet committed in batches + logger.info(f"Final commit: Total {documents_indexed} Luma events processed") await session.commit() await task_logger.log_task_success( diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 699d2fddf..d3ebdb649 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -356,6 +356,12 @@ async def index_notion_pages( documents_indexed += 1 logger.info(f"Successfully updated Notion page: {page_title}") + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info(f"Committing batch: {documents_indexed} documents processed so far") + await session.commit() + continue # Document doesn't exist - create new one @@ -405,6 +411,11 @@ async def index_notion_pages( session.add(document) documents_indexed += 1 logger.info(f"Successfully indexed new Notion page: {page_title}") + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info(f"Committing batch: {documents_indexed} documents processed so far") + await session.commit() except Exception as e: logger.error( @@ -423,7 +434,8 @@ async def index_notion_pages( if total_processed > 0: await update_connector_last_indexed(session, connector, update_last_indexed) - # Commit all changes + # Final commit for any remaining documents not yet committed in batches + logger.info(f"Final commit: Total {documents_indexed} documents processed") await session.commit() # Prepare result message diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index dd9edcc8d..0631b3fc4 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -353,6 +353,12 @@ async def index_slack_messages( session.add(document) documents_indexed += 1 + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info(f"Committing batch: {documents_indexed} Slack channels processed so far") + await session.commit() + logger.info( f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages" ) @@ -376,7 +382,8 @@ async def index_slack_messages( if total_processed > 0: await update_connector_last_indexed(session, connector, update_last_indexed) - # Commit all changes + # Final commit for any remaining documents not yet committed in batches + logger.info(f"Final commit: Total {documents_indexed} Slack channels processed") await session.commit() # Prepare result message diff --git a/surfsense_web/components/chat/ChatInputGroup.tsx b/surfsense_web/components/chat/ChatInputGroup.tsx index dc7e6a4f4..6309b9df0 100644 --- a/surfsense_web/components/chat/ChatInputGroup.tsx +++ b/surfsense_web/components/chat/ChatInputGroup.tsx @@ -26,6 +26,7 @@ import { getConnectorIcon } from "@/contracts/enums/connectorIcons"; import { useDocumentTypes } from "@/hooks/use-document-types"; import type { Document } from "@/hooks/use-documents"; import { useLLMConfigs, useLLMPreferences } from "@/hooks/use-llm-configs"; +import { useSearchSourceConnectors } from "@/hooks/use-search-source-connectors"; const DocumentSelector = React.memo( ({ @@ -119,14 +120,31 @@ const ConnectorSelector = React.memo( true ); + // Fetch live search connectors (non-indexable) + const { + connectors: searchConnectors, + isLoading: connectorsLoading, + isLoaded: connectorsLoaded, + fetchConnectors, + } = useSearchSourceConnectors(true, Number(search_space_id)); + + // Filter for non-indexable connectors (live search) + const liveSearchConnectors = React.useMemo( + () => searchConnectors.filter((connector) => !connector.is_indexable), + [searchConnectors] + ); + const handleOpenChange = useCallback( (open: boolean) => { setIsOpen(open); if (open && !isLoaded) { fetchDocumentTypes(Number(search_space_id)); } + if (open && !connectorsLoaded) { + fetchConnectors(Number(search_space_id)); + } }, - [fetchDocumentTypes, isLoaded, search_space_id] + [fetchDocumentTypes, isLoaded, fetchConnectors, connectorsLoaded, search_space_id] ); const handleConnectorToggle = useCallback( @@ -141,14 +159,18 @@ const ConnectorSelector = React.memo( ); const handleSelectAll = useCallback(() => { - onSelectionChange?.(documentTypes.map((dt) => dt.type)); - }, [documentTypes, onSelectionChange]); + const allTypes = [ + ...documentTypes.map((dt) => dt.type), + ...liveSearchConnectors.map((c) => c.connector_type), + ]; + onSelectionChange?.(allTypes); + }, [documentTypes, liveSearchConnectors, onSelectionChange]); const handleClearAll = useCallback(() => { onSelectionChange?.([]); }, [onSelectionChange]); - // Get display name for document type + // Get display name for connector type const getDisplayName = (type: string) => { return type .split("_") @@ -158,6 +180,13 @@ const ConnectorSelector = React.memo( // Get selected document types with their counts const selectedDocTypes = documentTypes.filter((dt) => selectedConnectors.includes(dt.type)); + const selectedLiveConnectors = liveSearchConnectors.filter((c) => + selectedConnectors.includes(c.connector_type) + ); + + // Total selected count + const totalSelectedCount = selectedDocTypes.length + selectedLiveConnectors.length; + const totalAvailableCount = documentTypes.length + liveSearchConnectors.length; return ( @@ -168,10 +197,10 @@ const ConnectorSelector = React.memo( className="relative h-9 gap-2 px-3 border-dashed hover:border-solid hover:bg-accent/50 transition-all" >
- {selectedDocTypes.length > 0 ? ( + {totalSelectedCount > 0 ? ( <>
- {selectedDocTypes.slice(0, 3).map((docType) => ( + {selectedDocTypes.slice(0, 2).map((docType) => (
))} + {selectedLiveConnectors.slice(0, 3 - selectedDocTypes.slice(0, 2).length).map((connector) => ( +
+ {getConnectorIcon(connector.connector_type, "h-3 w-3")} +
+ ))}
- {selectedDocTypes.length} {selectedDocTypes.length === 1 ? "source" : "sources"} + {totalSelectedCount} {totalSelectedCount === 1 ? "source" : "sources"} ) : ( @@ -194,99 +231,169 @@ const ConnectorSelector = React.memo( - -
+ +
- Select Document Types + Select Sources - Choose which document types to include in your search + Choose indexed document types and live search connectors to include in your search
- {/* Document type selection grid */} -
- {isLoading ? ( -
-
+ {(isLoading || connectorsLoading) ? ( +
+
+
+ ) : totalAvailableCount === 0 ? ( +
+
+
- ) : documentTypes.length === 0 ? ( -
-
- -
-

No documents found

-

- Add documents to this search space to enable filtering by type -

-
- ) : ( - documentTypes.map((docType) => { - const isSelected = selectedConnectors.includes(docType.type); +

No sources found

+

+ Add documents or configure search connectors for this search space +

+
+ ) : ( + <> + {/* Live Search Connectors Section */} + {liveSearchConnectors.length > 0 && ( +
+
+ +

Live Search Connectors

+ + Real-time + +
+
+ {liveSearchConnectors.map((connector) => { + const isSelected = selectedConnectors.includes(connector.connector_type); - return ( -
-

- {docType.count} {docType.count === 1 ? "document" : "documents"} -

-
- - ); - }) - )} -
+
+
+

+ {connector.name} +

+ {isSelected && ( +
+ +
+ )} +
+

+ {getDisplayName(connector.connector_type)} +

+
+ + ); + })} +
+
+ )} - {documentTypes.length > 0 && ( - - - - + {/* Document Types Section */} + {documentTypes.length > 0 && ( +
+
+ +

Indexed Document Types

+ + Stored + +
+
+ {documentTypes.map((docType) => { + const isSelected = selectedConnectors.includes(docType.type); + + return ( + + ); + })} +
+
+ )} + )}
+ + {totalAvailableCount > 0 && ( + + + + + )}
);