mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-04-28 18:36:23 +02:00
Merge remote-tracking branch 'upstream/dev' into feature/prompt-library
This commit is contained in:
commit
1aeb5ba645
66 changed files with 4561 additions and 139 deletions
|
|
@ -84,6 +84,7 @@ _CONNECTOR_TYPE_TO_SEARCHABLE: dict[str, str] = {
|
|||
"BOOKSTACK_CONNECTOR": "BOOKSTACK_CONNECTOR",
|
||||
"CIRCLEBACK_CONNECTOR": "CIRCLEBACK", # Connector type differs from document type
|
||||
"OBSIDIAN_CONNECTOR": "OBSIDIAN_CONNECTOR",
|
||||
"DROPBOX_CONNECTOR": "DROPBOX_FILE", # Connector type differs from document type
|
||||
"ONEDRIVE_CONNECTOR": "ONEDRIVE_FILE", # Connector type differs from document type
|
||||
# Composio connectors (unified to native document types).
|
||||
# Reverse of NATIVE_TO_LEGACY_DOCTYPE in app.db.
|
||||
|
|
@ -317,6 +318,12 @@ async def create_surfsense_deep_agent(
|
|||
]
|
||||
modified_disabled_tools.extend(google_drive_tools)
|
||||
|
||||
has_dropbox_connector = (
|
||||
available_connectors is not None and "DROPBOX_FILE" in available_connectors
|
||||
)
|
||||
if not has_dropbox_connector:
|
||||
modified_disabled_tools.extend(["create_dropbox_file", "delete_dropbox_file"])
|
||||
|
||||
has_onedrive_connector = (
|
||||
available_connectors is not None and "ONEDRIVE_FILE" in available_connectors
|
||||
)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,11 @@
|
|||
from app.agents.new_chat.tools.dropbox.create_file import (
|
||||
create_create_dropbox_file_tool,
|
||||
)
|
||||
from app.agents.new_chat.tools.dropbox.trash_file import (
|
||||
create_delete_dropbox_file_tool,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"create_create_dropbox_file_tool",
|
||||
"create_delete_dropbox_file_tool",
|
||||
]
|
||||
|
|
@ -0,0 +1,304 @@
|
|||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any, Literal
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from langgraph.types import interrupt
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.connectors.dropbox.client import DropboxClient
|
||||
from app.db import SearchSourceConnector, SearchSourceConnectorType
|
||||
|
||||
# Module-level logger for the Dropbox create-file tool.
logger = logging.getLogger(__name__)

# MIME type for .docx uploads.
# NOTE(review): not referenced in the visible code of this module — confirm a
# caller uses it before removing.
DOCX_MIME = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"

# Human-readable labels keyed by the internal file-type value.
# NOTE(review): appears unused here (the UI payload uses _SUPPORTED_TYPES) —
# verify before cleanup.
_FILE_TYPE_LABELS = {
    "paper": "Dropbox Paper (.paper)",
    "docx": "Word Document (.docx)",
}

# Options surfaced to the approval UI via the interrupt payload's
# "supported_types" context key.
_SUPPORTED_TYPES = [
    {"value": "paper", "label": "Dropbox Paper (.paper)"},
    {"value": "docx", "label": "Word Document (.docx)"},
]
|
||||
|
||||
|
||||
def _ensure_extension(name: str, file_type: str) -> str:
|
||||
"""Strip any existing extension and append the correct one."""
|
||||
stem = Path(name).stem
|
||||
ext = ".paper" if file_type == "paper" else ".docx"
|
||||
return f"{stem}{ext}"
|
||||
|
||||
|
||||
def _markdown_to_docx(markdown_text: str) -> bytes:
    """Render GitHub-flavored markdown into DOCX bytes using pypandoc.

    pandoc only writes DOCX to a file, so the conversion round-trips through
    a temporary file that is always removed, even on failure.
    """
    # Imported lazily so the module loads even when pypandoc is absent.
    import pypandoc

    handle, out_path = tempfile.mkstemp(suffix=".docx")
    os.close(handle)
    try:
        pypandoc.convert_text(
            markdown_text,
            "docx",
            format="gfm",
            extra_args=["--standalone"],
            outputfile=out_path,
        )
        with open(out_path, "rb") as docx_file:
            return docx_file.read()
    finally:
        os.unlink(out_path)
|
||||
|
||||
|
||||
def create_create_dropbox_file_tool(
    db_session: AsyncSession | None = None,
    search_space_id: int | None = None,
    user_id: str | None = None,
):
    """Factory that binds DB/session context into a `create_dropbox_file` tool.

    All three arguments must be provided for the tool to function; missing
    values produce an error result at call time rather than at construction.
    """

    @tool
    async def create_dropbox_file(
        name: str,
        file_type: Literal["paper", "docx"] = "paper",
        content: str | None = None,
    ) -> dict[str, Any]:
        """Create a new document in Dropbox.

        Use this tool when the user explicitly asks to create a new document
        in Dropbox. The user MUST specify a topic before you call this tool.

        Args:
            name: The document title (without extension).
            file_type: Either "paper" (Dropbox Paper, default) or "docx" (Word document).
            content: Optional initial content as markdown.

        Returns:
            Dictionary with status, file_id, name, web_url, and message.
        """
        logger.info(
            f"create_dropbox_file called: name='{name}', file_type='{file_type}'"
        )

        # Guard against a factory constructed without its dependencies.
        if db_session is None or search_space_id is None or user_id is None:
            return {
                "status": "error",
                "message": "Dropbox tool not properly configured.",
            }

        try:
            # Find every Dropbox connector this user owns in the search space.
            result = await db_session.execute(
                select(SearchSourceConnector).filter(
                    SearchSourceConnector.search_space_id == search_space_id,
                    SearchSourceConnector.user_id == user_id,
                    SearchSourceConnector.connector_type
                    == SearchSourceConnectorType.DROPBOX_CONNECTOR,
                )
            )
            connectors = result.scalars().all()

            if not connectors:
                return {
                    "status": "error",
                    "message": "No Dropbox connector found. Please connect Dropbox in your workspace settings.",
                }

            # Summarize each connector account for the approval UI.
            accounts = []
            for c in connectors:
                cfg = c.config or {}
                accounts.append(
                    {
                        "id": c.id,
                        "name": c.name,
                        "user_email": cfg.get("user_email"),
                        "auth_expired": cfg.get("auth_expired", False),
                    }
                )

            # If every account needs re-auth there is nothing usable.
            if all(a.get("auth_expired") for a in accounts):
                return {
                    "status": "auth_error",
                    "message": "All connected Dropbox accounts need re-authentication.",
                    "connector_type": "dropbox",
                }

            # Best-effort: collect top-level folders per account so the user
            # can pick a destination. Failures degrade to an empty list.
            parent_folders: dict[int, list[dict[str, str]]] = {}
            for acc in accounts:
                cid = acc["id"]
                if acc.get("auth_expired"):
                    parent_folders[cid] = []
                    continue
                try:
                    client = DropboxClient(session=db_session, connector_id=cid)
                    items, err = await client.list_folder("")
                    if err:
                        logger.warning(
                            "Failed to list folders for connector %s: %s", cid, err
                        )
                        parent_folders[cid] = []
                    else:
                        parent_folders[cid] = [
                            {
                                "folder_path": item.get("path_lower", ""),
                                "name": item["name"],
                            }
                            for item in items
                            if item.get(".tag") == "folder"
                            and item.get("name")
                        ]
                except Exception:
                    logger.warning(
                        "Error fetching folders for connector %s", cid, exc_info=True
                    )
                    parent_folders[cid] = []

            context: dict[str, Any] = {
                "accounts": accounts,
                "parent_folders": parent_folders,
                "supported_types": _SUPPORTED_TYPES,
            }

            # Pause the graph for human approval; connector_id and
            # parent_folder_path are chosen by the user in the UI.
            approval = interrupt(
                {
                    "type": "dropbox_file_creation",
                    "action": {
                        "tool": "create_dropbox_file",
                        "params": {
                            "name": name,
                            "file_type": file_type,
                            "content": content,
                            "connector_id": None,
                            "parent_folder_path": None,
                        },
                    },
                    "context": context,
                }
            )

            # The resume payload shape varies; normalize to a list of dicts.
            decisions_raw = (
                approval.get("decisions", []) if isinstance(approval, dict) else []
            )
            decisions = (
                decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
            )
            decisions = [d for d in decisions if isinstance(d, dict)]
            if not decisions:
                return {"status": "error", "message": "No approval decision received"}

            decision = decisions[0]
            decision_type = decision.get("type") or decision.get("decision_type")

            if decision_type == "reject":
                return {
                    "status": "rejected",
                    "message": "User declined. The file was not created.",
                }

            # Prefer user-edited parameters over the model's originals.
            final_params: dict[str, Any] = {}
            edited_action = decision.get("edited_action")
            if isinstance(edited_action, dict):
                edited_args = edited_action.get("args")
                if isinstance(edited_args, dict):
                    final_params = edited_args
            elif isinstance(decision.get("args"), dict):
                final_params = decision["args"]

            final_name = final_params.get("name", name)
            final_file_type = final_params.get("file_type", file_type)
            final_content = final_params.get("content", content)
            final_connector_id = final_params.get("connector_id")
            final_parent_folder_path = final_params.get("parent_folder_path")

            if not final_name or not final_name.strip():
                return {"status": "error", "message": "File name cannot be empty."}

            final_name = _ensure_extension(final_name, final_file_type)

            # Re-validate any user-chosen connector against ownership/type;
            # otherwise default to the first connector found above.
            if final_connector_id is not None:
                result = await db_session.execute(
                    select(SearchSourceConnector).filter(
                        SearchSourceConnector.id == final_connector_id,
                        SearchSourceConnector.search_space_id == search_space_id,
                        SearchSourceConnector.user_id == user_id,
                        SearchSourceConnector.connector_type
                        == SearchSourceConnectorType.DROPBOX_CONNECTOR,
                    )
                )
                connector = result.scalars().first()
            else:
                connector = connectors[0]

            if not connector:
                return {
                    "status": "error",
                    "message": "Selected Dropbox connector is invalid.",
                }

            client = DropboxClient(session=db_session, connector_id=connector.id)

            # Dropbox paths are absolute; root files get a leading slash.
            parent_path = final_parent_folder_path or ""
            file_path = f"{parent_path}/{final_name}" if parent_path else f"/{final_name}"

            if final_file_type == "paper":
                created = await client.create_paper_doc(
                    file_path, final_content or ""
                )
                file_id = created.get("file_id", "")
                web_url = created.get("url", "")
            else:
                # Word documents are rendered locally then uploaded as bytes.
                docx_bytes = _markdown_to_docx(final_content or "")
                created = await client.upload_file(
                    file_path, docx_bytes, mode="add", autorename=True
                )
                file_id = created.get("id", "")
                web_url = ""

            logger.info(
                f"Dropbox file created: id={file_id}, name={final_name}"
            )

            # Best-effort knowledge-base sync; failure only changes messaging.
            kb_message_suffix = ""
            try:
                from app.services.dropbox import DropboxKBSyncService

                kb_service = DropboxKBSyncService(db_session)
                kb_result = await kb_service.sync_after_create(
                    file_id=file_id,
                    file_name=final_name,
                    file_path=file_path,
                    web_url=web_url,
                    content=final_content,
                    connector_id=connector.id,
                    search_space_id=search_space_id,
                    user_id=user_id,
                )
                if kb_result["status"] == "success":
                    kb_message_suffix = " Your knowledge base has also been updated."
                else:
                    kb_message_suffix = " This file will be added to your knowledge base in the next scheduled sync."
            except Exception as kb_err:
                logger.warning(f"KB sync after create failed: {kb_err}")
                kb_message_suffix = " This file will be added to your knowledge base in the next scheduled sync."

            return {
                "status": "success",
                "file_id": file_id,
                "name": final_name,
                "web_url": web_url,
                "message": f"Successfully created '{final_name}' in Dropbox.{kb_message_suffix}",
            }

        except Exception as e:
            # interrupt() raises GraphInterrupt to pause the graph — it must
            # propagate, not be swallowed as an error result.
            from langgraph.errors import GraphInterrupt

            if isinstance(e, GraphInterrupt):
                raise
            logger.error(f"Error creating Dropbox file: {e}", exc_info=True)
            return {
                "status": "error",
                "message": "Something went wrong while creating the file. Please try again.",
            }

    return create_dropbox_file
|
||||
|
|
@ -0,0 +1,306 @@
|
|||
import logging
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import tool
|
||||
from langgraph.types import interrupt
|
||||
from sqlalchemy import String, and_, cast, func
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.connectors.dropbox.client import DropboxClient
|
||||
from app.db import (
|
||||
Document,
|
||||
DocumentType,
|
||||
SearchSourceConnector,
|
||||
SearchSourceConnectorType,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_delete_dropbox_file_tool(
    db_session: AsyncSession | None = None,
    search_space_id: int | None = None,
    user_id: str | None = None,
):
    """Factory that binds DB/session context into a `delete_dropbox_file` tool."""

    @tool
    async def delete_dropbox_file(
        file_name: str,
        delete_from_kb: bool = False,
    ) -> dict[str, Any]:
        """Delete a file from Dropbox.

        Use this tool when the user explicitly asks to delete, remove, or trash
        a file in Dropbox.

        Args:
            file_name: The exact name of the file to delete.
            delete_from_kb: Whether to also remove the file from the knowledge base.
                Default is False.

        Returns:
            Dictionary with:
            - status: "success", "rejected", "not_found", or "error"
            - file_id: Dropbox file ID (if success)
            - deleted_from_kb: whether the document was removed from the knowledge base
            - message: Result message

        IMPORTANT:
        - If status is "rejected", the user explicitly declined. Respond with a brief
          acknowledgment and do NOT retry or suggest alternatives.
        - If status is "not_found", relay the exact message to the user and ask them
          to verify the file name or check if it has been indexed.
        """
        logger.info(
            f"delete_dropbox_file called: file_name='{file_name}', delete_from_kb={delete_from_kb}"
        )

        # Guard against a factory constructed without its dependencies.
        if db_session is None or search_space_id is None or user_id is None:
            return {
                "status": "error",
                "message": "Dropbox tool not properly configured.",
            }

        try:
            # First lookup: case-insensitive match on the indexed document title,
            # scoped to this user's connectors and search space.
            doc_result = await db_session.execute(
                select(Document)
                .join(
                    SearchSourceConnector,
                    Document.connector_id == SearchSourceConnector.id,
                )
                .filter(
                    and_(
                        Document.search_space_id == search_space_id,
                        Document.document_type == DocumentType.DROPBOX_FILE,
                        func.lower(Document.title) == func.lower(file_name),
                        SearchSourceConnector.user_id == user_id,
                    )
                )
                .order_by(Document.updated_at.desc().nullslast())
                .limit(1)
            )
            document = doc_result.scalars().first()

            # Fallback lookup: match the original Dropbox file name stored in
            # document metadata (the title may have been normalized at index time).
            if not document:
                doc_result = await db_session.execute(
                    select(Document)
                    .join(
                        SearchSourceConnector,
                        Document.connector_id == SearchSourceConnector.id,
                    )
                    .filter(
                        and_(
                            Document.search_space_id == search_space_id,
                            Document.document_type == DocumentType.DROPBOX_FILE,
                            func.lower(
                                cast(
                                    Document.document_metadata["dropbox_file_name"],
                                    String,
                                )
                            )
                            == func.lower(file_name),
                            SearchSourceConnector.user_id == user_id,
                        )
                    )
                    .order_by(Document.updated_at.desc().nullslast())
                    .limit(1)
                )
                document = doc_result.scalars().first()

            if not document:
                return {
                    "status": "not_found",
                    "message": (
                        f"File '{file_name}' not found in your indexed Dropbox files. "
                        "This could mean: (1) the file doesn't exist, (2) it hasn't been indexed yet, "
                        "or (3) the file name is different."
                    ),
                }

            if not document.connector_id:
                return {
                    "status": "error",
                    "message": "Document has no associated connector.",
                }

            meta = document.document_metadata or {}
            file_path = meta.get("dropbox_path")
            file_id = meta.get("dropbox_file_id")
            document_id = document.id

            # The Dropbox API deletes by path, so a missing path is fatal here.
            if not file_path:
                return {
                    "status": "error",
                    "message": "File path is missing. Please re-index the file.",
                }

            # Confirm the document's connector still belongs to this user and
            # is a Dropbox connector (access-control re-check).
            conn_result = await db_session.execute(
                select(SearchSourceConnector).filter(
                    and_(
                        SearchSourceConnector.id == document.connector_id,
                        SearchSourceConnector.search_space_id == search_space_id,
                        SearchSourceConnector.user_id == user_id,
                        SearchSourceConnector.connector_type
                        == SearchSourceConnectorType.DROPBOX_CONNECTOR,
                    )
                )
            )
            connector = conn_result.scalars().first()
            if not connector:
                return {
                    "status": "error",
                    "message": "Dropbox connector not found or access denied.",
                }

            cfg = connector.config or {}
            if cfg.get("auth_expired"):
                return {
                    "status": "auth_error",
                    "message": "Dropbox account needs re-authentication. Please re-authenticate in your connector settings.",
                    "connector_type": "dropbox",
                }

            context = {
                "file": {
                    "file_id": file_id,
                    "file_path": file_path,
                    "name": file_name,
                    "document_id": document_id,
                },
                "account": {
                    "id": connector.id,
                    "name": connector.name,
                    "user_email": cfg.get("user_email"),
                },
            }

            # Pause the graph for human approval of the destructive action.
            approval = interrupt(
                {
                    "type": "dropbox_file_trash",
                    "action": {
                        "tool": "delete_dropbox_file",
                        "params": {
                            "file_path": file_path,
                            "connector_id": connector.id,
                            "delete_from_kb": delete_from_kb,
                        },
                    },
                    "context": context,
                }
            )

            # The resume payload shape varies; normalize to a list of dicts.
            decisions_raw = (
                approval.get("decisions", []) if isinstance(approval, dict) else []
            )
            decisions = (
                decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
            )
            decisions = [d for d in decisions if isinstance(d, dict)]
            if not decisions:
                return {"status": "error", "message": "No approval decision received"}

            decision = decisions[0]
            decision_type = decision.get("type") or decision.get("decision_type")
            logger.info(f"User decision: {decision_type}")

            if decision_type == "reject":
                return {
                    "status": "rejected",
                    "message": "User declined. The file was not deleted. Do not ask again or suggest alternatives.",
                }

            # Prefer user-edited parameters over the originals.
            final_params: dict[str, Any] = {}
            edited_action = decision.get("edited_action")
            if isinstance(edited_action, dict):
                edited_args = edited_action.get("args")
                if isinstance(edited_args, dict):
                    final_params = edited_args
            elif isinstance(decision.get("args"), dict):
                final_params = decision["args"]

            final_file_path = final_params.get("file_path", file_path)
            final_connector_id = final_params.get("connector_id", connector.id)
            final_delete_from_kb = final_params.get("delete_from_kb", delete_from_kb)

            # If the user switched connectors, re-validate ownership and type.
            if final_connector_id != connector.id:
                result = await db_session.execute(
                    select(SearchSourceConnector).filter(
                        and_(
                            SearchSourceConnector.id == final_connector_id,
                            SearchSourceConnector.search_space_id == search_space_id,
                            SearchSourceConnector.user_id == user_id,
                            SearchSourceConnector.connector_type
                            == SearchSourceConnectorType.DROPBOX_CONNECTOR,
                        )
                    )
                )
                validated_connector = result.scalars().first()
                if not validated_connector:
                    return {
                        "status": "error",
                        "message": "Selected Dropbox connector is invalid or has been disconnected.",
                    }
                actual_connector_id = validated_connector.id
            else:
                actual_connector_id = connector.id

            logger.info(
                f"Deleting Dropbox file: path='{final_file_path}', connector={actual_connector_id}"
            )

            client = DropboxClient(
                session=db_session, connector_id=actual_connector_id
            )
            await client.delete_file(final_file_path)

            logger.info(f"Dropbox file deleted: path={final_file_path}")

            trash_result: dict[str, Any] = {
                "status": "success",
                "file_id": file_id,
                "message": f"Successfully deleted '{file_name}' from Dropbox.",
            }

            # Optionally remove the indexed document; a KB failure downgrades
            # to a warning on the (already successful) trash result.
            deleted_from_kb = False
            if final_delete_from_kb and document_id:
                try:
                    doc_result = await db_session.execute(
                        select(Document).filter(Document.id == document_id)
                    )
                    doc = doc_result.scalars().first()
                    if doc:
                        await db_session.delete(doc)
                        await db_session.commit()
                        deleted_from_kb = True
                        logger.info(
                            f"Deleted document {document_id} from knowledge base"
                        )
                    else:
                        logger.warning(f"Document {document_id} not found in KB")
                except Exception as e:
                    logger.error(f"Failed to delete document from KB: {e}")
                    await db_session.rollback()
                    trash_result["warning"] = (
                        f"File deleted, but failed to remove from knowledge base: {e!s}"
                    )

            trash_result["deleted_from_kb"] = deleted_from_kb
            if deleted_from_kb:
                trash_result["message"] = (
                    f"{trash_result.get('message', '')} (also removed from knowledge base)"
                )

            return trash_result

        except Exception as e:
            # interrupt() raises GraphInterrupt to pause the graph — it must
            # propagate, not be swallowed as an error result.
            from langgraph.errors import GraphInterrupt

            if isinstance(e, GraphInterrupt):
                raise
            logger.error(f"Error deleting Dropbox file: {e}", exc_info=True)
            return {
                "status": "error",
                "message": "Something went wrong while deleting the file. Please try again.",
            }

    return delete_dropbox_file
|
||||
|
|
@ -202,6 +202,7 @@ _ALL_CONNECTORS: list[str] = [
|
|||
"CIRCLEBACK",
|
||||
"OBSIDIAN_CONNECTOR",
|
||||
"ONEDRIVE_FILE",
|
||||
"DROPBOX_FILE",
|
||||
]
|
||||
|
||||
# Human-readable descriptions for each connector type
|
||||
|
|
@ -232,6 +233,7 @@ CONNECTOR_DESCRIPTIONS: dict[str, str] = {
|
|||
"CIRCLEBACK": "Circleback meeting notes, transcripts, and action items",
|
||||
"OBSIDIAN_CONNECTOR": "Obsidian vault notes and markdown files (personal notes)",
|
||||
"ONEDRIVE_FILE": "Microsoft OneDrive files and documents (personal cloud storage)",
|
||||
"DROPBOX_FILE": "Dropbox files and documents (cloud storage)",
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -360,6 +362,7 @@ _INTERNAL_METADATA_KEYS: frozenset[str] = frozenset(
|
|||
"calendar_id",
|
||||
"google_drive_file_id",
|
||||
"onedrive_file_id",
|
||||
"dropbox_file_id",
|
||||
"page_id",
|
||||
"issue_id",
|
||||
"connector_id",
|
||||
|
|
|
|||
|
|
@ -50,6 +50,10 @@ from .confluence import (
|
|||
create_delete_confluence_page_tool,
|
||||
create_update_confluence_page_tool,
|
||||
)
|
||||
from .dropbox import (
|
||||
create_create_dropbox_file_tool,
|
||||
create_delete_dropbox_file_tool,
|
||||
)
|
||||
from .generate_image import create_generate_image_tool
|
||||
from .gmail import (
|
||||
create_create_gmail_draft_tool,
|
||||
|
|
@ -340,6 +344,30 @@ BUILTIN_TOOLS: list[ToolDefinition] = [
|
|||
requires=["db_session", "search_space_id", "user_id"],
|
||||
),
|
||||
# =========================================================================
|
||||
# DROPBOX TOOLS - create and trash files
|
||||
# Auto-disabled when no Dropbox connector is configured (see chat_deepagent.py)
|
||||
# =========================================================================
|
||||
ToolDefinition(
|
||||
name="create_dropbox_file",
|
||||
description="Create a new file in Dropbox",
|
||||
factory=lambda deps: create_create_dropbox_file_tool(
|
||||
db_session=deps["db_session"],
|
||||
search_space_id=deps["search_space_id"],
|
||||
user_id=deps["user_id"],
|
||||
),
|
||||
requires=["db_session", "search_space_id", "user_id"],
|
||||
),
|
||||
ToolDefinition(
|
||||
name="delete_dropbox_file",
|
||||
description="Delete a file from Dropbox",
|
||||
factory=lambda deps: create_delete_dropbox_file_tool(
|
||||
db_session=deps["db_session"],
|
||||
search_space_id=deps["search_space_id"],
|
||||
user_id=deps["user_id"],
|
||||
),
|
||||
requires=["db_session", "search_space_id", "user_id"],
|
||||
),
|
||||
# =========================================================================
|
||||
# ONEDRIVE TOOLS - create and trash files
|
||||
# Auto-disabled when no OneDrive connector is configured (see chat_deepagent.py)
|
||||
# =========================================================================
|
||||
|
|
|
|||
|
|
@ -292,6 +292,11 @@ class Config:
|
|||
CLICKUP_CLIENT_SECRET = os.getenv("CLICKUP_CLIENT_SECRET")
|
||||
CLICKUP_REDIRECT_URI = os.getenv("CLICKUP_REDIRECT_URI")
|
||||
|
||||
# Dropbox OAuth
|
||||
DROPBOX_APP_KEY = os.getenv("DROPBOX_APP_KEY")
|
||||
DROPBOX_APP_SECRET = os.getenv("DROPBOX_APP_SECRET")
|
||||
DROPBOX_REDIRECT_URI = os.getenv("DROPBOX_REDIRECT_URI")
|
||||
|
||||
# Composio Configuration (for managed OAuth integrations)
|
||||
# Get your API key from https://app.composio.dev
|
||||
COMPOSIO_API_KEY = os.getenv("COMPOSIO_API_KEY")
|
||||
|
|
|
|||
13
surfsense_backend/app/connectors/dropbox/__init__.py
Normal file
13
surfsense_backend/app/connectors/dropbox/__init__.py
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
"""Dropbox Connector Module."""
|
||||
|
||||
from .client import DropboxClient
|
||||
from .content_extractor import download_and_extract_content
|
||||
from .folder_manager import get_file_by_path, get_files_in_folder, list_folder_contents
|
||||
|
||||
__all__ = [
|
||||
"DropboxClient",
|
||||
"download_and_extract_content",
|
||||
"get_file_by_path",
|
||||
"get_files_in_folder",
|
||||
"list_folder_contents",
|
||||
]
|
||||
335
surfsense_backend/app/connectors/dropbox/client.py
Normal file
335
surfsense_backend/app/connectors/dropbox/client.py
Normal file
|
|
@ -0,0 +1,335 @@
|
|||
"""Dropbox API client using Dropbox HTTP API v2."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
from sqlalchemy.orm.attributes import flag_modified
|
||||
|
||||
from app.config import config
|
||||
from app.db import SearchSourceConnector
|
||||
from app.utils.oauth_security import TokenEncryption
|
||||
|
||||
# Module-level logger for the Dropbox client.
logger = logging.getLogger(__name__)

# Dropbox API v2 endpoints: RPC calls go to api.dropboxapi.com, raw
# upload/download traffic to content.dropboxapi.com.
API_BASE = "https://api.dropboxapi.com"
CONTENT_BASE = "https://content.dropboxapi.com"
# OAuth2 token endpoint used for refresh-token exchanges.
TOKEN_URL = "https://api.dropboxapi.com/oauth2/token"
|
||||
|
||||
|
||||
class DropboxClient:
|
||||
"""Client for Dropbox via the HTTP API v2."""
|
||||
|
||||
def __init__(self, session: AsyncSession, connector_id: int):
|
||||
self._session = session
|
||||
self._connector_id = connector_id
|
||||
|
||||
    async def _get_valid_token(self) -> str:
        """Return a usable Dropbox access token for this connector.

        Loads the connector's config, decrypts stored tokens when flagged as
        encrypted, and returns the access token if it has not expired. On
        expiry it exchanges the refresh token for new credentials and persists
        them back to the connector config.

        Raises:
            ValueError: if the connector row is missing, or the token is
                expired with no refresh token available.
        """
        result = await self._session.execute(
            select(SearchSourceConnector).filter(
                SearchSourceConnector.id == self._connector_id
            )
        )
        connector = result.scalars().first()
        if not connector:
            raise ValueError(f"Connector {self._connector_id} not found")

        cfg = connector.config or {}
        # Tokens may be stored encrypted; decryption needs both the flag in
        # config and a configured SECRET_KEY.
        is_encrypted = cfg.get("_token_encrypted", False)
        token_encryption = (
            TokenEncryption(config.SECRET_KEY) if config.SECRET_KEY else None
        )

        access_token = cfg.get("access_token", "")
        refresh_token = cfg.get("refresh_token")

        if is_encrypted and token_encryption:
            if access_token:
                access_token = token_encryption.decrypt_token(access_token)
            if refresh_token:
                refresh_token = token_encryption.decrypt_token(refresh_token)

        # A naive (tz-less) stored expiry is treated as UTC before comparing.
        expires_at_str = cfg.get("expires_at")
        is_expired = False
        if expires_at_str:
            expires_at = datetime.fromisoformat(expires_at_str)
            if expires_at.tzinfo is None:
                expires_at = expires_at.replace(tzinfo=UTC)
            is_expired = expires_at <= datetime.now(UTC)

        if not is_expired and access_token:
            return access_token

        # Cannot refresh: flag the connector so the UI can prompt re-auth,
        # then fail loudly.
        if not refresh_token:
            cfg["auth_expired"] = True
            connector.config = cfg
            flag_modified(connector, "config")
            await self._session.commit()
            raise ValueError("Dropbox token expired and no refresh token available")

        token_data = await self._refresh_token(refresh_token)

        new_access = token_data["access_token"]
        expires_in = token_data.get("expires_in")

        new_expires_at = None
        if expires_in:
            new_expires_at = datetime.now(UTC) + timedelta(seconds=int(expires_in))

        # Re-encrypt before persisting when encryption is available.
        if token_encryption:
            cfg["access_token"] = token_encryption.encrypt_token(new_access)
        else:
            cfg["access_token"] = new_access

        cfg["expires_at"] = new_expires_at.isoformat() if new_expires_at else None
        cfg["expires_in"] = expires_in
        cfg["_token_encrypted"] = bool(token_encryption)
        # A successful refresh clears any stale re-auth flag.
        cfg.pop("auth_expired", None)

        connector.config = cfg
        # In-place JSON-column mutation isn't tracked by SQLAlchemy; mark dirty.
        flag_modified(connector, "config")
        await self._session.commit()

        return new_access
|
||||
|
||||
async def _refresh_token(self, refresh_token: str) -> dict:
|
||||
data = {
|
||||
"client_id": config.DROPBOX_APP_KEY,
|
||||
"client_secret": config.DROPBOX_APP_SECRET,
|
||||
"grant_type": "refresh_token",
|
||||
"refresh_token": refresh_token,
|
||||
}
|
||||
async with httpx.AsyncClient() as client:
|
||||
resp = await client.post(
|
||||
TOKEN_URL,
|
||||
data=data,
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
timeout=30.0,
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
error_detail = resp.text
|
||||
try:
|
||||
error_json = resp.json()
|
||||
error_detail = error_json.get("error_description", error_detail)
|
||||
except Exception:
|
||||
pass
|
||||
raise ValueError(f"Dropbox token refresh failed: {error_detail}")
|
||||
return resp.json()
|
||||
|
||||
    async def _request(
        self, path: str, json_body: dict | None = None, **kwargs
    ) -> httpx.Response:
        """Make an authenticated RPC request to the Dropbox API.

        Args:
            path: API path appended to API_BASE (e.g. "/2/files/list_folder").
            json_body: JSON payload for the POST request.
            **kwargs: Extra httpx arguments; a "headers" entry is merged over
                (and may override) the defaults below.

        Returns:
            The raw httpx response for the caller to inspect.

        Raises:
            ValueError: on a 401, after flagging the connector as needing
                re-authentication.
        """
        token = await self._get_valid_token()
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        # Caller-supplied headers take precedence over the defaults.
        if "headers" in kwargs:
            headers.update(kwargs.pop("headers"))

        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{API_BASE}{path}",
                headers=headers,
                json=json_body,
                timeout=60.0,
                **kwargs,
            )

        # 401 means the token was rejected server-side (revoked/expired):
        # persist the auth_expired flag so the UI can prompt re-auth.
        if resp.status_code == 401:
            result = await self._session.execute(
                select(SearchSourceConnector).filter(
                    SearchSourceConnector.id == self._connector_id
                )
            )
            connector = result.scalars().first()
            if connector:
                cfg = connector.config or {}
                cfg["auth_expired"] = True
                connector.config = cfg
                flag_modified(connector, "config")
                await self._session.commit()
            raise ValueError("Dropbox authentication expired (401)")

        return resp
|
||||
|
||||
    async def _content_request(
        self, path: str, api_arg: dict, content: bytes | None = None, **kwargs
    ) -> httpx.Response:
        """Make an authenticated content-upload/download request.

        Content endpoints live on CONTENT_BASE and take their JSON parameters
        in the Dropbox-API-Arg header rather than the request body; the body
        carries the raw bytes (empty for downloads).

        Args:
            path: API path appended to CONTENT_BASE (e.g. "/2/files/download").
            api_arg: Parameters serialized into the Dropbox-API-Arg header.
            content: Raw upload bytes; None sends an empty body.
            **kwargs: Extra httpx arguments; a "headers" entry is merged over
                (and may override) the defaults below.

        Raises:
            ValueError: on a 401, after flagging the connector as needing
                re-authentication.
        """
        token = await self._get_valid_token()
        headers = {
            "Authorization": f"Bearer {token}",
            "Dropbox-API-Arg": json.dumps(api_arg),
            "Content-Type": "application/octet-stream",
        }
        # Caller-supplied headers take precedence over the defaults.
        if "headers" in kwargs:
            headers.update(kwargs.pop("headers"))

        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{CONTENT_BASE}{path}",
                headers=headers,
                content=content or b"",
                timeout=120.0,
                **kwargs,
            )

        # Same 401 handling as _request: persist the re-auth flag, then raise.
        if resp.status_code == 401:
            result = await self._session.execute(
                select(SearchSourceConnector).filter(
                    SearchSourceConnector.id == self._connector_id
                )
            )
            connector = result.scalars().first()
            if connector:
                cfg = connector.config or {}
                cfg["auth_expired"] = True
                connector.config = cfg
                flag_modified(connector, "config")
                await self._session.commit()
            raise ValueError("Dropbox authentication expired (401)")

        return resp
|
||||
|
||||
async def list_folder(
|
||||
self, path: str = ""
|
||||
) -> tuple[list[dict[str, Any]], str | None]:
|
||||
"""List all items in a folder. Handles pagination via cursor."""
|
||||
all_items: list[dict[str, Any]] = []
|
||||
|
||||
resp = await self._request(
|
||||
"/2/files/list_folder",
|
||||
{"path": path, "recursive": False, "include_non_downloadable_files": True},
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
return [], f"Failed to list folder: {resp.status_code} - {resp.text}"
|
||||
|
||||
data = resp.json()
|
||||
all_items.extend(data.get("entries", []))
|
||||
|
||||
while data.get("has_more"):
|
||||
cursor = data["cursor"]
|
||||
resp = await self._request(
|
||||
"/2/files/list_folder/continue", {"cursor": cursor}
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
return all_items, f"Pagination failed: {resp.status_code}"
|
||||
data = resp.json()
|
||||
all_items.extend(data.get("entries", []))
|
||||
|
||||
return all_items, None
|
||||
|
||||
async def get_metadata(
|
||||
self, path: str
|
||||
) -> tuple[dict[str, Any] | None, str | None]:
|
||||
resp = await self._request("/2/files/get_metadata", {"path": path})
|
||||
if resp.status_code != 200:
|
||||
return None, f"Failed to get metadata: {resp.status_code} - {resp.text}"
|
||||
return resp.json(), None
|
||||
|
||||
async def download_file(self, path: str) -> tuple[bytes | None, str | None]:
|
||||
resp = await self._content_request(
|
||||
"/2/files/download", {"path": path}
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
return None, f"Download failed: {resp.status_code}"
|
||||
return resp.content, None
|
||||
|
||||
    async def download_file_to_disk(self, path: str, dest_path: str) -> str | None:
        """Stream file content to disk. Returns error message on failure.

        Unlike ``download_file`` this never buffers the whole payload in
        memory, so it is suitable for large files.

        Args:
            path: Dropbox path of the file to download.
            dest_path: Local filesystem path to write to (overwritten).

        Returns:
            None on success, otherwise a human-readable error string.
        """
        token = await self._get_valid_token()
        # Bypasses _content_request so the response body can be streamed
        # rather than read eagerly. NOTE(review): this path does not mark
        # the connector auth_expired on 401 — confirm that is intended.
        headers = {
            "Authorization": f"Bearer {token}",
            "Dropbox-API-Arg": json.dumps({"path": path}),
        }
        async with (
            httpx.AsyncClient() as client,
            client.stream(
                "POST",
                f"{CONTENT_BASE}/2/files/download",
                headers=headers,
                timeout=120.0,
            ) as resp,
        ):
            if resp.status_code != 200:
                return f"Download failed: {resp.status_code}"
            with open(dest_path, "wb") as f:
                # 5 MiB chunks: bounded memory regardless of file size.
                async for chunk in resp.aiter_bytes(chunk_size=5 * 1024 * 1024):
                    f.write(chunk)
        return None
||||
|
||||
async def export_file(
|
||||
self,
|
||||
path: str,
|
||||
export_format: str | None = None,
|
||||
) -> tuple[bytes | None, str | None]:
|
||||
"""Export a non-downloadable file (e.g. .paper) via /2/files/export.
|
||||
|
||||
Uses the recommended new API for Paper-as-files.
|
||||
Returns (content_bytes, error_message).
|
||||
"""
|
||||
api_arg: dict[str, str] = {"path": path}
|
||||
if export_format:
|
||||
api_arg["export_format"] = export_format
|
||||
resp = await self._content_request("/2/files/export", api_arg)
|
||||
if resp.status_code != 200:
|
||||
return None, f"Export failed: {resp.status_code} - {resp.text}"
|
||||
return resp.content, None
|
||||
|
||||
async def upload_file(
|
||||
self,
|
||||
path: str,
|
||||
content: bytes,
|
||||
mode: str = "add",
|
||||
autorename: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
"""Upload a file to Dropbox (up to 150MB)."""
|
||||
api_arg = {"path": path, "mode": mode, "autorename": autorename}
|
||||
resp = await self._content_request("/2/files/upload", api_arg, content)
|
||||
if resp.status_code != 200:
|
||||
raise ValueError(f"Upload failed: {resp.status_code} - {resp.text}")
|
||||
return resp.json()
|
||||
|
||||
    async def create_paper_doc(
        self, path: str, markdown_content: str
    ) -> dict[str, Any]:
        """Create a Dropbox Paper document from markdown.

        Args:
            path: Destination Dropbox path for the new Paper doc.
            markdown_content: Document body in markdown, sent UTF-8 encoded
                as the raw request payload.

        Returns:
            Parsed JSON response from ``/2/files/paper/create``.

        Raises:
            ValueError: If Dropbox returns a non-200 status.
        """
        token = await self._get_valid_token()
        # Paper "create" endpoint: path/format travel in the Dropbox-API-Arg
        # header while the doc body is the octet-stream payload.
        api_arg = {"import_format": "markdown", "path": path}
        headers = {
            "Authorization": f"Bearer {token}",
            "Dropbox-API-Arg": json.dumps(api_arg),
            "Content-Type": "application/octet-stream",
        }
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{API_BASE}/2/files/paper/create",
                headers=headers,
                content=markdown_content.encode("utf-8"),
                timeout=60.0,
            )
        # NOTE(review): unlike _request/_content_request this call does not
        # flag the connector auth_expired on 401 — confirm that is intended.
        if resp.status_code != 200:
            raise ValueError(
                f"Paper doc creation failed: {resp.status_code} - {resp.text}"
            )
        return resp.json()
||||
|
||||
async def delete_file(self, path: str) -> dict[str, Any]:
|
||||
"""Delete a file or folder."""
|
||||
resp = await self._request("/2/files/delete_v2", {"path": path})
|
||||
if resp.status_code != 200:
|
||||
raise ValueError(f"Delete failed: {resp.status_code} - {resp.text}")
|
||||
return resp.json()
|
||||
|
||||
async def get_current_account(self) -> tuple[dict[str, Any] | None, str | None]:
|
||||
"""Get current user's account info."""
|
||||
resp = await self._request("/2/users/get_current_account", None)
|
||||
if resp.status_code != 200:
|
||||
return None, f"Failed to get account: {resp.status_code}"
|
||||
return resp.json(), None
|
||||
102
surfsense_backend/app/connectors/dropbox/content_extractor.py
Normal file
102
surfsense_backend/app/connectors/dropbox/content_extractor.py
Normal file
|
|
@ -0,0 +1,102 @@
|
|||
"""Content extraction for Dropbox files.
|
||||
|
||||
Reuses the same ETL parsing logic as OneDrive/Google Drive since file parsing
|
||||
is extension-based, not provider-specific.
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from .client import DropboxClient
|
||||
from .file_types import get_extension_from_name, is_paper_file, should_skip_file
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def _export_paper_content(
    client: DropboxClient,
    file: dict[str, Any],
    metadata: dict[str, Any],
) -> tuple[str | None, dict[str, Any], str | None]:
    """Export a Dropbox Paper doc as markdown via ``/2/files/export``."""
    doc_path = file.get("path_lower", "")
    doc_name = file.get("name", "Unknown")

    logger.info(f"Exporting Paper doc as markdown: {doc_name}")

    raw_bytes, export_error = await client.export_file(
        doc_path, export_format="markdown"
    )
    if export_error:
        return None, metadata, export_error
    if not raw_bytes:
        return None, metadata, "Export returned empty content"

    # Record how the document was obtained alongside the extracted text.
    metadata["exported_as"] = "markdown"
    metadata["original_type"] = "paper"
    return raw_bytes.decode("utf-8", errors="replace"), metadata, None
||||
|
||||
|
||||
async def download_and_extract_content(
    client: DropboxClient,
    file: dict[str, Any],
) -> tuple[str | None, dict[str, Any], str | None]:
    """Download a Dropbox file and extract its content as markdown.

    Paper docs are exported directly (they cannot be downloaded); regular
    files are streamed to a temp file and parsed with the shared ETL logic.

    Returns (markdown_content, dropbox_metadata, error_message).
    """
    file_path_lower = file.get("path_lower", "")
    file_name = file.get("name", "Unknown")
    file_id = file.get("id", "")

    # Folders and non-downloadable, non-Paper entries are not indexed.
    if should_skip_file(file):
        return None, {}, "Skipping non-indexable item"

    logger.info(f"Downloading file for content extraction: {file_name}")

    # Base metadata recorded for every successfully processed file.
    metadata: dict[str, Any] = {
        "dropbox_file_id": file_id,
        "dropbox_file_name": file_name,
        "dropbox_path": file_path_lower,
        "source_connector": "dropbox",
    }

    # Optional fields: only present on file (not folder) entries.
    if "server_modified" in file:
        metadata["modified_time"] = file["server_modified"]
    if "client_modified" in file:
        metadata["created_time"] = file["client_modified"]
    if "size" in file:
        metadata["file_size"] = file["size"]
    if "content_hash" in file:
        metadata["content_hash"] = file["content_hash"]

    # Paper docs go through the export endpoint instead of download.
    if is_paper_file(file):
        return await _export_paper_content(client, file, metadata)

    temp_file_path = None
    try:
        # Preserve the extension so the parser can pick the right handler.
        extension = get_extension_from_name(file_name) or ".bin"
        # delete=False: the file must outlive this `with` so the download
        # and parser can use it; cleanup happens in the `finally` below.
        with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp:
            temp_file_path = tmp.name

        error = await client.download_file_to_disk(file_path_lower, temp_file_path)
        if error:
            return None, metadata, error

        # Deferred import: parsing is extension-based, so the OneDrive ETL
        # helper is reused; importing here avoids a module-load-time cycle.
        from app.connectors.onedrive.content_extractor import _parse_file_to_markdown

        markdown = await _parse_file_to_markdown(temp_file_path, file_name)
        return markdown, metadata, None

    except Exception as e:
        logger.warning(f"Failed to extract content from {file_name}: {e!s}")
        return None, metadata, str(e)
    finally:
        # Best-effort temp-file cleanup on every exit path.
        if temp_file_path and os.path.exists(temp_file_path):
            with contextlib.suppress(Exception):
                os.unlink(temp_file_path)
||||
58
surfsense_backend/app/connectors/dropbox/file_types.py
Normal file
58
surfsense_backend/app/connectors/dropbox/file_types.py
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
"""File type handlers for Dropbox."""
|
||||
|
||||
PAPER_EXTENSION = ".paper"
|
||||
|
||||
SKIP_EXTENSIONS: frozenset[str] = frozenset()
|
||||
|
||||
MIME_TO_EXTENSION: dict[str, str] = {
|
||||
"application/pdf": ".pdf",
|
||||
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
|
||||
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
|
||||
"application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
|
||||
"application/vnd.ms-excel": ".xls",
|
||||
"application/msword": ".doc",
|
||||
"application/vnd.ms-powerpoint": ".ppt",
|
||||
"text/plain": ".txt",
|
||||
"text/csv": ".csv",
|
||||
"text/html": ".html",
|
||||
"text/markdown": ".md",
|
||||
"application/json": ".json",
|
||||
"application/xml": ".xml",
|
||||
"image/png": ".png",
|
||||
"image/jpeg": ".jpg",
|
||||
}
|
||||
|
||||
|
||||
def get_extension_from_name(name: str) -> str:
    """Return the trailing extension of *name* (dot included), or "".

    A leading dot (hidden files such as ``.env``) does not count as an
    extension separator.
    """
    idx = name.rfind(".")
    return name[idx:] if idx > 0 else ""
||||
|
||||
|
||||
def is_folder(item: dict) -> bool:
    """True when the Dropbox entry's ``.tag`` discriminator says folder."""
    tag = item.get(".tag")
    return tag == "folder"
||||
|
||||
|
||||
def is_paper_file(item: dict) -> bool:
    """Detect Dropbox Paper docs (exported via /2/files/export, not /2/files/download)."""
    extension = get_extension_from_name(item.get("name", ""))
    return extension.lower() == PAPER_EXTENSION
||||
|
||||
|
||||
def should_skip_file(item: dict) -> bool:
    """Skip folders and truly non-indexable files.

    Paper docs are non-downloadable but exportable, so they are NOT skipped.
    """
    if is_folder(item):
        return True
    if is_paper_file(item):
        return False
    # Entries Dropbox marks non-downloadable (and that aren't Paper) are skipped.
    if not item.get("is_downloadable", True):
        return True
    extension = get_extension_from_name(item.get("name", "")).lower()
    return extension in SKIP_EXTENSIONS
|
||||
92
surfsense_backend/app/connectors/dropbox/folder_manager.py
Normal file
92
surfsense_backend/app/connectors/dropbox/folder_manager.py
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
"""Folder management for Dropbox."""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from .client import DropboxClient
|
||||
from .file_types import is_folder, should_skip_file
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def list_folder_contents(
    client: DropboxClient,
    path: str = "",
) -> tuple[list[dict[str, Any]], str | None]:
    """List folders and files in a Dropbox folder.

    Returns (items list with folders first, error message).
    """
    try:
        entries, error = await client.list_folder(path)
        if error:
            return [], error

        # Annotate each entry so callers don't need to inspect ".tag".
        for entry in entries:
            entry["isFolder"] = is_folder(entry)

        # Folders first, then case-insensitive name order within each group.
        entries.sort(key=lambda e: (not e["isFolder"], e.get("name", "").lower()))

        n_folders = sum(1 for e in entries if e["isFolder"])
        n_files = len(entries) - n_folders
        logger.info(
            f"Listed {len(entries)} items ({n_folders} folders, {n_files} files) "
            + (f"in folder {path}" if path else "in root")
        )
        return entries, None

    except Exception as e:
        logger.error(f"Error listing folder contents: {e!s}", exc_info=True)
        return [], f"Error listing folder contents: {e!s}"
||||
|
||||
|
||||
async def get_files_in_folder(
    client: DropboxClient,
    path: str,
    include_subfolders: bool = True,
) -> tuple[list[dict[str, Any]], str | None]:
    """Get all indexable files in a folder, optionally recursing into subfolders.

    Recursion is best-effort: a subfolder that fails to list is logged and
    skipped, so a partial tree still yields results. Only the top-level
    listing failure is returned as an error.

    Returns (files, error_message).
    """
    try:
        items, error = await client.list_folder(path)
        if error:
            return [], error

        files: list[dict[str, Any]] = []
        for item in items:
            if is_folder(item):
                if include_subfolders:
                    # Recurse; subfolder failures are logged, not propagated.
                    sub_files, sub_error = await get_files_in_folder(
                        client, item.get("path_lower", ""), include_subfolders=True
                    )
                    if sub_error:
                        logger.warning(
                            f"Error recursing into folder {item.get('name')}: {sub_error}"
                        )
                        continue
                    files.extend(sub_files)
            elif not should_skip_file(item):
                files.append(item)

        return files, None

    except Exception as e:
        logger.error(f"Error getting files in folder: {e!s}", exc_info=True)
        return [], f"Error getting files in folder: {e!s}"
||||
|
||||
|
||||
async def get_file_by_path(
    client: DropboxClient,
    path: str,
) -> tuple[dict[str, Any] | None, str | None]:
    """Get file metadata by path."""
    try:
        entry, error = await client.get_metadata(path)
        if error:
            return None, error
        if entry:
            return entry, None
        return None, f"File not found: {path}"

    except Exception as e:
        logger.error(f"Error getting file by path: {e!s}", exc_info=True)
        return None, f"Error getting file by path: {e!s}"
||||
|
|
@ -59,6 +59,7 @@ class DocumentType(StrEnum):
|
|||
CIRCLEBACK = "CIRCLEBACK"
|
||||
OBSIDIAN_CONNECTOR = "OBSIDIAN_CONNECTOR"
|
||||
NOTE = "NOTE"
|
||||
DROPBOX_FILE = "DROPBOX_FILE"
|
||||
COMPOSIO_GOOGLE_DRIVE_CONNECTOR = "COMPOSIO_GOOGLE_DRIVE_CONNECTOR"
|
||||
COMPOSIO_GMAIL_CONNECTOR = "COMPOSIO_GMAIL_CONNECTOR"
|
||||
COMPOSIO_GOOGLE_CALENDAR_CONNECTOR = "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR"
|
||||
|
|
@ -103,6 +104,7 @@ class SearchSourceConnectorType(StrEnum):
|
|||
"OBSIDIAN_CONNECTOR" # Self-hosted only - Local Obsidian vault indexing
|
||||
)
|
||||
MCP_CONNECTOR = "MCP_CONNECTOR" # Model Context Protocol - User-defined API tools
|
||||
DROPBOX_CONNECTOR = "DROPBOX_CONNECTOR"
|
||||
COMPOSIO_GOOGLE_DRIVE_CONNECTOR = "COMPOSIO_GOOGLE_DRIVE_CONNECTOR"
|
||||
COMPOSIO_GMAIL_CONNECTOR = "COMPOSIO_GMAIL_CONNECTOR"
|
||||
COMPOSIO_GOOGLE_CALENDAR_CONNECTOR = "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR"
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ from .clickup_add_connector_route import router as clickup_add_connector_router
|
|||
from .composio_routes import router as composio_router
|
||||
from .confluence_add_connector_route import router as confluence_add_connector_router
|
||||
from .discord_add_connector_route import router as discord_add_connector_router
|
||||
from .dropbox_add_connector_route import router as dropbox_add_connector_router
|
||||
from .documents_routes import router as documents_router
|
||||
from .editor_routes import router as editor_router
|
||||
from .folders_routes import router as folders_router
|
||||
|
|
@ -80,6 +81,7 @@ router.include_router(discord_add_connector_router)
|
|||
router.include_router(jira_add_connector_router)
|
||||
router.include_router(confluence_add_connector_router)
|
||||
router.include_router(clickup_add_connector_router)
|
||||
router.include_router(dropbox_add_connector_router)
|
||||
router.include_router(new_llm_config_router) # LLM configs with prompt configuration
|
||||
router.include_router(model_list_router) # Dynamic LLM model catalogue from OpenRouter
|
||||
router.include_router(logs_router)
|
||||
|
|
|
|||
569
surfsense_backend/app/routes/dropbox_add_connector_route.py
Normal file
569
surfsense_backend/app/routes/dropbox_add_connector_route.py
Normal file
|
|
@ -0,0 +1,569 @@
|
|||
"""
|
||||
Dropbox Connector OAuth Routes.
|
||||
|
||||
Endpoints:
|
||||
- GET /auth/dropbox/connector/add - Initiate OAuth
|
||||
- GET /auth/dropbox/connector/callback - Handle OAuth callback
|
||||
- GET /auth/dropbox/connector/reauth - Re-authenticate existing connector
|
||||
- GET /connectors/{connector_id}/dropbox/folders - List folder contents
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from urllib.parse import urlencode
|
||||
from uuid import UUID
|
||||
|
||||
import httpx
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from fastapi.responses import RedirectResponse
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
from sqlalchemy.orm.attributes import flag_modified
|
||||
|
||||
from app.config import config
|
||||
from app.connectors.dropbox import DropboxClient, list_folder_contents
|
||||
from app.db import (
|
||||
SearchSourceConnector,
|
||||
SearchSourceConnectorType,
|
||||
User,
|
||||
get_async_session,
|
||||
)
|
||||
from app.users import current_active_user
|
||||
from app.utils.connector_naming import (
|
||||
check_duplicate_connector,
|
||||
extract_identifier_from_credentials,
|
||||
generate_unique_connector_name,
|
||||
)
|
||||
from app.utils.oauth_security import OAuthStateManager, TokenEncryption
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter()
|
||||
|
||||
# Dropbox OAuth2 endpoints.
AUTHORIZATION_URL = "https://www.dropbox.com/oauth2/authorize"
TOKEN_URL = "https://api.dropboxapi.com/oauth2/token"

# Lazily initialized module-level singletons; built on first use by
# get_state_manager() / get_token_encryption() below.
_state_manager = None
_token_encryption = None
|
||||
|
||||
def get_state_manager() -> OAuthStateManager:
    """Return the lazily-built, module-cached OAuthStateManager singleton."""
    global _state_manager
    if _state_manager is not None:
        return _state_manager
    if not config.SECRET_KEY:
        raise ValueError("SECRET_KEY must be set for OAuth security")
    _state_manager = OAuthStateManager(config.SECRET_KEY)
    return _state_manager
||||
|
||||
|
||||
def get_token_encryption() -> TokenEncryption:
    """Return the lazily-built, module-cached TokenEncryption singleton."""
    global _token_encryption
    if _token_encryption is not None:
        return _token_encryption
    if not config.SECRET_KEY:
        raise ValueError("SECRET_KEY must be set for token encryption")
    _token_encryption = TokenEncryption(config.SECRET_KEY)
    return _token_encryption
||||
|
||||
|
||||
@router.get("/auth/dropbox/connector/add")
|
||||
async def connect_dropbox(space_id: int, user: User = Depends(current_active_user)):
|
||||
"""Initiate Dropbox OAuth flow."""
|
||||
try:
|
||||
if not space_id:
|
||||
raise HTTPException(status_code=400, detail="space_id is required")
|
||||
if not config.DROPBOX_APP_KEY:
|
||||
raise HTTPException(
|
||||
status_code=500, detail="Dropbox OAuth not configured."
|
||||
)
|
||||
if not config.SECRET_KEY:
|
||||
raise HTTPException(
|
||||
status_code=500, detail="SECRET_KEY not configured for OAuth security."
|
||||
)
|
||||
|
||||
state_manager = get_state_manager()
|
||||
state_encoded = state_manager.generate_secure_state(space_id, user.id)
|
||||
|
||||
auth_params = {
|
||||
"client_id": config.DROPBOX_APP_KEY,
|
||||
"response_type": "code",
|
||||
"redirect_uri": config.DROPBOX_REDIRECT_URI,
|
||||
"state": state_encoded,
|
||||
"token_access_type": "offline",
|
||||
}
|
||||
auth_url = f"{AUTHORIZATION_URL}?{urlencode(auth_params)}"
|
||||
|
||||
logger.info(
|
||||
"Generated Dropbox OAuth URL for user %s, space %s", user.id, space_id
|
||||
)
|
||||
return {"auth_url": auth_url}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error("Failed to initiate Dropbox OAuth: %s", str(e), exc_info=True)
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to initiate Dropbox OAuth: {e!s}"
|
||||
) from e
|
||||
|
||||
|
||||
@router.get("/auth/dropbox/connector/reauth")
|
||||
async def reauth_dropbox(
|
||||
space_id: int,
|
||||
connector_id: int,
|
||||
return_url: str | None = None,
|
||||
user: User = Depends(current_active_user),
|
||||
session: AsyncSession = Depends(get_async_session),
|
||||
):
|
||||
"""Re-authenticate an existing Dropbox connector."""
|
||||
try:
|
||||
result = await session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.id == connector_id,
|
||||
SearchSourceConnector.user_id == user.id,
|
||||
SearchSourceConnector.search_space_id == space_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.DROPBOX_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
raise HTTPException(
|
||||
status_code=404, detail="Dropbox connector not found or access denied"
|
||||
)
|
||||
|
||||
if not config.SECRET_KEY:
|
||||
raise HTTPException(
|
||||
status_code=500, detail="SECRET_KEY not configured for OAuth security."
|
||||
)
|
||||
|
||||
state_manager = get_state_manager()
|
||||
extra: dict = {"connector_id": connector_id}
|
||||
if return_url and return_url.startswith("/"):
|
||||
extra["return_url"] = return_url
|
||||
state_encoded = state_manager.generate_secure_state(space_id, user.id, **extra)
|
||||
|
||||
auth_params = {
|
||||
"client_id": config.DROPBOX_APP_KEY,
|
||||
"response_type": "code",
|
||||
"redirect_uri": config.DROPBOX_REDIRECT_URI,
|
||||
"state": state_encoded,
|
||||
"token_access_type": "offline",
|
||||
"force_reapprove": "true",
|
||||
}
|
||||
auth_url = f"{AUTHORIZATION_URL}?{urlencode(auth_params)}"
|
||||
|
||||
logger.info(
|
||||
"Initiating Dropbox re-auth for user %s, connector %s",
|
||||
user.id,
|
||||
connector_id,
|
||||
)
|
||||
return {"auth_url": auth_url}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error("Failed to initiate Dropbox re-auth: %s", str(e), exc_info=True)
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to initiate Dropbox re-auth: {e!s}"
|
||||
) from e
|
||||
|
||||
|
||||
@router.get("/auth/dropbox/connector/callback")
|
||||
async def dropbox_callback(
|
||||
code: str | None = None,
|
||||
error: str | None = None,
|
||||
error_description: str | None = None,
|
||||
state: str | None = None,
|
||||
session: AsyncSession = Depends(get_async_session),
|
||||
):
|
||||
"""Handle Dropbox OAuth callback."""
|
||||
try:
|
||||
if error:
|
||||
error_msg = error_description or error
|
||||
logger.warning("Dropbox OAuth error: %s", error_msg)
|
||||
space_id = None
|
||||
if state:
|
||||
try:
|
||||
data = get_state_manager().validate_state(state)
|
||||
space_id = data.get("space_id")
|
||||
except Exception:
|
||||
pass
|
||||
if space_id:
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?error=dropbox_oauth_denied"
|
||||
)
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=dropbox_oauth_denied"
|
||||
)
|
||||
|
||||
if not code or not state:
|
||||
raise HTTPException(
|
||||
status_code=400, detail="Missing required OAuth parameters"
|
||||
)
|
||||
|
||||
state_manager = get_state_manager()
|
||||
try:
|
||||
data = state_manager.validate_state(state)
|
||||
space_id = data["space_id"]
|
||||
user_id = UUID(data["user_id"])
|
||||
except (HTTPException, ValueError, KeyError) as e:
|
||||
logger.error("Invalid OAuth state: %s", str(e))
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=invalid_state"
|
||||
)
|
||||
|
||||
reauth_connector_id = data.get("connector_id")
|
||||
reauth_return_url = data.get("return_url")
|
||||
|
||||
token_data = {
|
||||
"client_id": config.DROPBOX_APP_KEY,
|
||||
"client_secret": config.DROPBOX_APP_SECRET,
|
||||
"code": code,
|
||||
"redirect_uri": config.DROPBOX_REDIRECT_URI,
|
||||
"grant_type": "authorization_code",
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
token_response = await client.post(
|
||||
TOKEN_URL,
|
||||
data=token_data,
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
timeout=30.0,
|
||||
)
|
||||
|
||||
if token_response.status_code != 200:
|
||||
error_detail = token_response.text
|
||||
try:
|
||||
error_json = token_response.json()
|
||||
error_detail = error_json.get("error_description", error_detail)
|
||||
except Exception:
|
||||
pass
|
||||
raise HTTPException(
|
||||
status_code=400, detail=f"Token exchange failed: {error_detail}"
|
||||
)
|
||||
|
||||
token_json = token_response.json()
|
||||
access_token = token_json.get("access_token")
|
||||
refresh_token = token_json.get("refresh_token")
|
||||
|
||||
if not access_token:
|
||||
raise HTTPException(
|
||||
status_code=400, detail="No access token received from Dropbox"
|
||||
)
|
||||
|
||||
token_encryption = get_token_encryption()
|
||||
|
||||
expires_at = None
|
||||
if token_json.get("expires_in"):
|
||||
expires_at = datetime.now(UTC) + timedelta(
|
||||
seconds=int(token_json["expires_in"])
|
||||
)
|
||||
|
||||
user_info: dict = {}
|
||||
try:
|
||||
async with httpx.AsyncClient() as client:
|
||||
user_response = await client.post(
|
||||
"https://api.dropboxapi.com/2/users/get_current_account",
|
||||
headers={
|
||||
"Authorization": f"Bearer {access_token}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
content=b"null",
|
||||
timeout=30.0,
|
||||
)
|
||||
if user_response.status_code == 200:
|
||||
user_data = user_response.json()
|
||||
user_info = {
|
||||
"user_email": user_data.get("email"),
|
||||
"user_name": user_data.get("name", {}).get("display_name"),
|
||||
"account_id": user_data.get("account_id"),
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning("Failed to fetch user info from Dropbox: %s", str(e))
|
||||
|
||||
connector_config = {
|
||||
"access_token": token_encryption.encrypt_token(access_token),
|
||||
"refresh_token": token_encryption.encrypt_token(refresh_token)
|
||||
if refresh_token
|
||||
else None,
|
||||
"token_type": token_json.get("token_type", "bearer"),
|
||||
"expires_in": token_json.get("expires_in"),
|
||||
"expires_at": expires_at.isoformat() if expires_at else None,
|
||||
"user_email": user_info.get("user_email"),
|
||||
"user_name": user_info.get("user_name"),
|
||||
"account_id": user_info.get("account_id"),
|
||||
"_token_encrypted": True,
|
||||
}
|
||||
|
||||
if reauth_connector_id:
|
||||
result = await session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.id == reauth_connector_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.search_space_id == space_id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.DROPBOX_CONNECTOR,
|
||||
)
|
||||
)
|
||||
db_connector = result.scalars().first()
|
||||
if not db_connector:
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail="Connector not found or access denied during re-auth",
|
||||
)
|
||||
|
||||
existing_cursor = db_connector.config.get("cursor")
|
||||
db_connector.config = {
|
||||
**connector_config,
|
||||
"cursor": existing_cursor,
|
||||
"auth_expired": False,
|
||||
}
|
||||
flag_modified(db_connector, "config")
|
||||
await session.commit()
|
||||
await session.refresh(db_connector)
|
||||
|
||||
logger.info(
|
||||
"Re-authenticated Dropbox connector %s for user %s",
|
||||
db_connector.id,
|
||||
user_id,
|
||||
)
|
||||
if reauth_return_url and reauth_return_url.startswith("/"):
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}{reauth_return_url}"
|
||||
)
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?success=true&connector=DROPBOX_CONNECTOR&connectorId={db_connector.id}"
|
||||
)
|
||||
|
||||
connector_identifier = extract_identifier_from_credentials(
|
||||
SearchSourceConnectorType.DROPBOX_CONNECTOR, connector_config
|
||||
)
|
||||
is_duplicate = await check_duplicate_connector(
|
||||
session,
|
||||
SearchSourceConnectorType.DROPBOX_CONNECTOR,
|
||||
space_id,
|
||||
user_id,
|
||||
connector_identifier,
|
||||
)
|
||||
if is_duplicate:
|
||||
logger.warning(
|
||||
"Duplicate Dropbox connector for user %s, space %s", user_id, space_id
|
||||
)
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?error=duplicate_account&connector=DROPBOX_CONNECTOR"
|
||||
)
|
||||
|
||||
connector_name = await generate_unique_connector_name(
|
||||
session,
|
||||
SearchSourceConnectorType.DROPBOX_CONNECTOR,
|
||||
space_id,
|
||||
user_id,
|
||||
connector_identifier,
|
||||
)
|
||||
|
||||
new_connector = SearchSourceConnector(
|
||||
name=connector_name,
|
||||
connector_type=SearchSourceConnectorType.DROPBOX_CONNECTOR,
|
||||
is_indexable=True,
|
||||
config=connector_config,
|
||||
search_space_id=space_id,
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
try:
|
||||
session.add(new_connector)
|
||||
await session.commit()
|
||||
await session.refresh(new_connector)
|
||||
logger.info(
|
||||
"Successfully created Dropbox connector %s for user %s",
|
||||
new_connector.id,
|
||||
user_id,
|
||||
)
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?success=true&connector=DROPBOX_CONNECTOR&connectorId={new_connector.id}"
|
||||
)
|
||||
except IntegrityError as e:
|
||||
await session.rollback()
|
||||
logger.error(
|
||||
"Database integrity error creating Dropbox connector: %s", str(e)
|
||||
)
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=connector_creation_failed"
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except (IntegrityError, ValueError) as e:
|
||||
logger.error("Dropbox OAuth callback error: %s", str(e), exc_info=True)
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=dropbox_auth_error"
|
||||
)
|
||||
|
||||
|
||||
@router.get("/connectors/{connector_id}/dropbox/folders")
|
||||
async def list_dropbox_folders(
|
||||
connector_id: int,
|
||||
parent_path: str = "",
|
||||
session: AsyncSession = Depends(get_async_session),
|
||||
user: User = Depends(current_active_user),
|
||||
):
|
||||
"""List folders and files in user's Dropbox."""
|
||||
connector = None
|
||||
try:
|
||||
result = await session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.id == connector_id,
|
||||
SearchSourceConnector.user_id == user.id,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.DROPBOX_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
raise HTTPException(
|
||||
status_code=404, detail="Dropbox connector not found or access denied"
|
||||
)
|
||||
|
||||
dropbox_client = DropboxClient(session, connector_id)
|
||||
items, error = await list_folder_contents(dropbox_client, path=parent_path)
|
||||
|
||||
if error:
|
||||
error_lower = error.lower()
|
||||
if (
|
||||
"401" in error
|
||||
or "authentication expired" in error_lower
|
||||
or "expired_access_token" in error_lower
|
||||
):
|
||||
try:
|
||||
if connector and not connector.config.get("auth_expired"):
|
||||
connector.config = {**connector.config, "auth_expired": True}
|
||||
flag_modified(connector, "config")
|
||||
await session.commit()
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to persist auth_expired for connector %s",
|
||||
connector_id,
|
||||
exc_info=True,
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Dropbox authentication expired. Please re-authenticate.",
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to list folder contents: {error}"
|
||||
)
|
||||
|
||||
return {"items": items}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error("Error listing Dropbox contents: %s", str(e), exc_info=True)
|
||||
error_lower = str(e).lower()
|
||||
if "401" in str(e) or "authentication expired" in error_lower:
|
||||
try:
|
||||
if connector and not connector.config.get("auth_expired"):
|
||||
connector.config = {**connector.config, "auth_expired": True}
|
||||
flag_modified(connector, "config")
|
||||
await session.commit()
|
||||
except Exception:
|
||||
pass
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Dropbox authentication expired. Please re-authenticate.",
|
||||
) from e
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to list Dropbox contents: {e!s}"
|
||||
) from e
|
||||
|
||||
|
||||
async def refresh_dropbox_token(
    session: AsyncSession, connector: SearchSourceConnector
) -> SearchSourceConnector:
    """Refresh Dropbox OAuth tokens.

    Decrypts the stored refresh token (when flagged as encrypted), exchanges
    it for a new access token at Dropbox's token endpoint, and persists the
    re-encrypted access token plus expiry metadata on the connector config.

    Raises:
        HTTPException: 500 when decryption fails, 400 when no refresh token
            is stored or the refresh request is rejected, 401 when Dropbox
            reports the grant as invalid/expired/revoked.
    """
    logger.info("Refreshing Dropbox OAuth tokens for connector %s", connector.id)

    token_encryption = get_token_encryption()
    # Older connector configs may hold plaintext tokens; only decrypt when
    # the config explicitly marks them as encrypted.
    is_encrypted = connector.config.get("_token_encrypted", False)
    refresh_token = connector.config.get("refresh_token")

    if is_encrypted and refresh_token:
        try:
            refresh_token = token_encryption.decrypt_token(refresh_token)
        except Exception as e:
            logger.error("Failed to decrypt refresh token: %s", str(e))
            raise HTTPException(
                status_code=500, detail="Failed to decrypt stored refresh token"
            ) from e

    if not refresh_token:
        raise HTTPException(
            status_code=400,
            detail=f"No refresh token available for connector {connector.id}",
        )

    refresh_data = {
        "client_id": config.DROPBOX_APP_KEY,
        "client_secret": config.DROPBOX_APP_SECRET,
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
    }

    async with httpx.AsyncClient() as client:
        token_response = await client.post(
            TOKEN_URL,
            data=refresh_data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=30.0,
        )

    if token_response.status_code != 200:
        error_detail = token_response.text
        error_code = ""
        try:
            error_json = token_response.json()
            error_detail = error_json.get("error_description", error_detail)
            error_code = error_json.get("error", "")
        except Exception:
            # Non-JSON error body; fall back to the raw response text.
            pass
        error_lower = (error_detail + error_code).lower()
        # A dead grant (invalid_grant / expired / revoked) means the user
        # must re-authenticate — surface that distinctly as 401.
        if (
            "invalid_grant" in error_lower
            or "expired" in error_lower
            or "revoked" in error_lower
        ):
            raise HTTPException(
                status_code=401,
                detail="Dropbox authentication failed. Please re-authenticate.",
            )
        raise HTTPException(
            status_code=400, detail=f"Token refresh failed: {error_detail}"
        )

    token_json = token_response.json()
    access_token = token_json.get("access_token")

    if not access_token:
        raise HTTPException(
            status_code=400, detail="No access token received from Dropbox refresh"
        )

    expires_at = None
    expires_in = token_json.get("expires_in")
    if expires_in:
        expires_at = datetime.now(UTC) + timedelta(seconds=int(expires_in))

    # Persist the new (encrypted) access token and clear any stale
    # auth_expired marker so the UI stops prompting for re-auth.
    cfg = dict(connector.config)
    cfg["access_token"] = token_encryption.encrypt_token(access_token)
    cfg["expires_in"] = expires_in
    cfg["expires_at"] = expires_at.isoformat() if expires_at else None
    cfg["_token_encrypted"] = True
    cfg.pop("auth_expired", None)

    connector.config = cfg
    # JSON-column mutation is not tracked automatically by SQLAlchemy;
    # mark the attribute dirty so the commit actually writes it.
    flag_modified(connector, "config")
    await session.commit()
    await session.refresh(connector)

    logger.info("Successfully refreshed Dropbox tokens for connector %s", connector.id)
    return connector
|
||||
|
|
@ -1046,6 +1046,53 @@ async def index_connector_content(
|
|||
)
|
||||
response_message = "OneDrive indexing started in the background."
|
||||
|
||||
elif connector.connector_type == SearchSourceConnectorType.DROPBOX_CONNECTOR:
|
||||
from app.tasks.celery_tasks.connector_tasks import (
|
||||
index_dropbox_files_task,
|
||||
)
|
||||
|
||||
if drive_items and drive_items.has_items():
|
||||
logger.info(
|
||||
f"Triggering Dropbox indexing for connector {connector_id} into search space {search_space_id}, "
|
||||
f"folders: {len(drive_items.folders)}, files: {len(drive_items.files)}"
|
||||
)
|
||||
items_dict = drive_items.model_dump()
|
||||
else:
|
||||
config = connector.config or {}
|
||||
selected_folders = config.get("selected_folders", [])
|
||||
selected_files = config.get("selected_files", [])
|
||||
if not selected_folders and not selected_files:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Dropbox indexing requires folders or files to be configured. "
|
||||
"Please select folders/files to index.",
|
||||
)
|
||||
indexing_options = config.get(
|
||||
"indexing_options",
|
||||
{
|
||||
"max_files_per_folder": 100,
|
||||
"incremental_sync": True,
|
||||
"include_subfolders": True,
|
||||
},
|
||||
)
|
||||
items_dict = {
|
||||
"folders": selected_folders,
|
||||
"files": selected_files,
|
||||
"indexing_options": indexing_options,
|
||||
}
|
||||
logger.info(
|
||||
f"Triggering Dropbox indexing for connector {connector_id} into search space {search_space_id} "
|
||||
f"using existing config"
|
||||
)
|
||||
|
||||
index_dropbox_files_task.delay(
|
||||
connector_id,
|
||||
search_space_id,
|
||||
str(user.id),
|
||||
items_dict,
|
||||
)
|
||||
response_message = "Dropbox indexing started in the background."
|
||||
|
||||
elif connector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR:
|
||||
from app.tasks.celery_tasks.connector_tasks import (
|
||||
index_discord_messages_task,
|
||||
|
|
@ -2644,6 +2691,114 @@ async def run_onedrive_indexing(
|
|||
logger.error(f"Failed to update notification: {notif_error!s}")
|
||||
|
||||
|
||||
async def run_dropbox_indexing(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    items_dict: dict,
):
    """Runs the Dropbox indexing task for folders and files with notifications.

    Loads the connector, emits a "started" notification, delegates the actual
    work to ``index_dropbox_files``, then reports progress and completion (or
    failure) through the NotificationService. ``items_dict`` carries the
    selected ``folders`` / ``files`` plus indexing options.
    """
    from uuid import UUID

    notification = None
    try:
        from app.tasks.connector_indexers.dropbox_indexer import index_dropbox_files

        connector_result = await session.execute(
            select(SearchSourceConnector).where(
                SearchSourceConnector.id == connector_id
            )
        )
        connector = connector_result.scalar_one_or_none()

        if connector:
            # NOTE(review): reuses the Google Drive "indexing started" notifier
            # for Dropbox — the payload shape matches, but confirm there is no
            # Dropbox-specific notifier intended here.
            notification = await NotificationService.connector_indexing.notify_google_drive_indexing_started(
                session=session,
                user_id=UUID(user_id),
                connector_id=connector_id,
                connector_name=connector.name,
                connector_type=connector.connector_type.value,
                search_space_id=search_space_id,
                folder_count=len(items_dict.get("folders", [])),
                file_count=len(items_dict.get("files", [])),
                folder_names=[
                    f.get("name", "Unknown") for f in items_dict.get("folders", [])
                ],
                file_names=[
                    f.get("name", "Unknown") for f in items_dict.get("files", [])
                ],
            )

        if notification:
            await NotificationService.connector_indexing.notify_indexing_progress(
                session=session,
                notification=notification,
                indexed_count=0,
                stage="fetching",
            )

        total_indexed, total_skipped, error_message = await index_dropbox_files(
            session,
            connector_id,
            search_space_id,
            user_id,
            items_dict,
        )

        if error_message:
            logger.error(
                f"Dropbox indexing completed with errors for connector {connector_id}: {error_message}"
            )
            # Auth failures are persisted on the connector so the UI can
            # prompt the user to re-authenticate.
            if _is_auth_error(error_message):
                await _persist_auth_expired(session, connector_id)
                error_message = (
                    "Dropbox authentication expired. Please re-authenticate."
                )
        else:
            if notification:
                # Refresh first: the indexer may have committed in between.
                await session.refresh(notification)
                await NotificationService.connector_indexing.notify_indexing_progress(
                    session=session,
                    notification=notification,
                    indexed_count=total_indexed,
                    stage="storing",
                )

            logger.info(
                f"Dropbox indexing successful for connector {connector_id}. Indexed {total_indexed} documents."
            )
            await _update_connector_timestamp_by_id(session, connector_id)
            await session.commit()

        if notification:
            await session.refresh(notification)
            await NotificationService.connector_indexing.notify_indexing_completed(
                session=session,
                notification=notification,
                indexed_count=total_indexed,
                error_message=error_message,
                skipped_count=total_skipped,
            )

    except Exception as e:
        logger.error(
            f"Critical error in run_dropbox_indexing for connector {connector_id}: {e}",
            exc_info=True,
        )
        # Best effort: close out the notification so it is not left dangling
        # in a "running" state; never mask the original failure.
        if notification:
            try:
                await session.refresh(notification)
                await NotificationService.connector_indexing.notify_indexing_completed(
                    session=session,
                    notification=notification,
                    indexed_count=0,
                    error_message=str(e),
                )
            except Exception as notif_error:
                logger.error(f"Failed to update notification: {notif_error!s}")
|
||||
|
||||
|
||||
# Add new helper functions for luma indexing
|
||||
async def run_luma_indexing_with_new_session(
|
||||
connector_id: int,
|
||||
|
|
|
|||
5
surfsense_backend/app/services/dropbox/__init__.py
Normal file
5
surfsense_backend/app/services/dropbox/__init__.py
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
from app.services.dropbox.kb_sync_service import DropboxKBSyncService
|
||||
|
||||
__all__ = [
|
||||
"DropboxKBSyncService",
|
||||
]
|
||||
159
surfsense_backend/app/services/dropbox/kb_sync_service.py
Normal file
159
surfsense_backend/app/services/dropbox/kb_sync_service.py
Normal file
|
|
@ -0,0 +1,159 @@
|
|||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.db import Document, DocumentType
|
||||
from app.indexing_pipeline.document_hashing import compute_identifier_hash
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.utils.document_converters import (
|
||||
create_document_chunks,
|
||||
embed_text,
|
||||
generate_content_hash,
|
||||
generate_document_summary,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DropboxKBSyncService:
    """Immediately mirrors a newly created Dropbox file into the knowledge base.

    Used right after a file is created (e.g. via agent tools) so the document
    becomes searchable without waiting for the periodic connector indexer.
    """

    def __init__(self, db_session: AsyncSession):
        # Async DB session used for dedup checks and document persistence.
        self.db_session = db_session

    async def sync_after_create(
        self,
        file_id: str,
        file_name: str,
        file_path: str,
        web_url: str | None,
        content: str | None,
        connector_id: int,
        search_space_id: int,
        user_id: str,
    ) -> dict:
        """Create a Document (summary, embedding, chunks) for a Dropbox file.

        Returns ``{"status": "success"}`` on success or when the document
        already exists; otherwise ``{"status": "error", "message": ...}``.
        Rolls back the session on failure.
        """
        from app.tasks.connector_indexers.base import (
            check_document_by_unique_identifier,
            check_duplicate_document_by_hash,
            get_current_timestamp,
            safe_set_chunks,
        )

        try:
            # Identity is (document type, Dropbox file id, search space).
            unique_hash = compute_identifier_hash(
                DocumentType.DROPBOX_FILE.value, file_id, search_space_id
            )

            existing = await check_document_by_unique_identifier(
                self.db_session, unique_hash
            )
            if existing:
                logger.info(
                    "Document for Dropbox file %s already exists (doc_id=%s), skipping",
                    file_id,
                    existing.id,
                )
                return {"status": "success"}

            indexable_content = (content or "").strip()
            if not indexable_content:
                # Still index a stub so the file is discoverable by name.
                indexable_content = f"Dropbox file: {file_name}"

            content_hash = generate_content_hash(indexable_content, search_space_id)

            with self.db_session.no_autoflush:
                dup = await check_duplicate_document_by_hash(
                    self.db_session, content_hash
                )
                if dup:
                    logger.info(
                        "Content-hash collision for Dropbox file %s — identical content "
                        "exists in doc %s. Using unique_identifier_hash as content_hash.",
                        file_id,
                        dup.id,
                    )
                    # Fall back to the identifier hash so the unique
                    # content_hash constraint is not violated by a duplicate.
                    content_hash = unique_hash

            user_llm = await get_user_long_context_llm(
                self.db_session,
                user_id,
                search_space_id,
                disable_streaming=True,
            )

            doc_metadata_for_summary = {
                "file_name": file_name,
                "document_type": "Dropbox File",
                "connector_type": "Dropbox",
            }

            if user_llm:
                summary_content, summary_embedding = await generate_document_summary(
                    indexable_content, user_llm, doc_metadata_for_summary
                )
            else:
                logger.warning("No LLM configured — using fallback summary")
                summary_content = f"Dropbox File: {file_name}\n\n{indexable_content}"
                summary_embedding = embed_text(summary_content)

            chunks = await create_document_chunks(indexable_content)
            # NOTE(review): naive local-time timestamp — confirm whether UTC
            # is expected for "indexed_at" as elsewhere in the codebase.
            now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            document = Document(
                title=file_name,
                document_type=DocumentType.DROPBOX_FILE,
                document_metadata={
                    "dropbox_file_id": file_id,
                    "dropbox_file_name": file_name,
                    "dropbox_path": file_path,
                    "web_url": web_url,
                    "source_connector": "dropbox",
                    "indexed_at": now_str,
                    "connector_id": connector_id,
                },
                content=summary_content,
                content_hash=content_hash,
                unique_identifier_hash=unique_hash,
                embedding=summary_embedding,
                search_space_id=search_space_id,
                connector_id=connector_id,
                source_markdown=content,
                updated_at=get_current_timestamp(),
                created_by_id=user_id,
            )

            self.db_session.add(document)
            # Flush first so the document gets an id before chunks attach.
            await self.db_session.flush()
            await safe_set_chunks(self.db_session, document, chunks)
            await self.db_session.commit()

            logger.info(
                "KB sync after create succeeded: doc_id=%s, file=%s, chunks=%d",
                document.id,
                file_name,
                len(chunks),
            )
            return {"status": "success"}

        except Exception as e:
            error_str = str(e).lower()
            # Races with the periodic indexer can trip unique constraints;
            # treat those as benign and let the indexer win.
            if (
                "duplicate key value violates unique constraint" in error_str
                or "uniqueviolationerror" in error_str
            ):
                logger.warning(
                    "Duplicate constraint hit during KB sync for file %s. "
                    "Rolling back — periodic indexer will handle it. Error: %s",
                    file_id,
                    e,
                )
                await self.db_session.rollback()
                return {"status": "error", "message": "Duplicate document detected"}

            logger.error(
                "KB sync after create failed for file %s: %s",
                file_id,
                e,
                exc_info=True,
            )
            await self.db_session.rollback()
            return {"status": "error", "message": str(e)}
|
||||
|
|
@ -574,6 +574,54 @@ async def _index_onedrive_files(
|
|||
)
|
||||
|
||||
|
||||
@celery_app.task(name="index_dropbox_files", bind=True)
def index_dropbox_files_task(
    self,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    items_dict: dict,
):
    """Celery task to index Dropbox folders and files.

    Bridges the synchronous Celery worker into the async indexing coroutine.
    ``items_dict`` carries the selected folders/files plus indexing options.
    """
    import asyncio

    # asyncio.run creates a fresh event loop for this invocation and, unlike
    # a manual new_event_loop / run_until_complete / close sequence, also
    # cancels leftover tasks and shuts down async generators on exit — so
    # cleanup is guaranteed even when indexing raises.
    asyncio.run(
        _index_dropbox_files(
            connector_id,
            search_space_id,
            user_id,
            items_dict,
        )
    )
|
||||
|
||||
|
||||
async def _index_dropbox_files(
    connector_id: int,
    search_space_id: int,
    user_id: str,
    items_dict: dict,
):
    """Run Dropbox indexing inside a dedicated Celery-scoped DB session."""
    # Imported lazily to avoid a circular import with the routes module.
    from app.routes.search_source_connectors_routes import (
        run_dropbox_indexing,
    )

    session_maker = get_celery_session_maker()
    async with session_maker() as session:
        await run_dropbox_indexing(
            session, connector_id, search_space_id, user_id, items_dict
        )
|
||||
|
||||
|
||||
@celery_app.task(name="index_discord_messages", bind=True)
|
||||
def index_discord_messages_task(
|
||||
self,
|
||||
|
|
|
|||
|
|
@ -1025,6 +1025,8 @@ async def _stream_agent_events(
|
|||
"delete_google_drive_file",
|
||||
"create_onedrive_file",
|
||||
"delete_onedrive_file",
|
||||
"create_dropbox_file",
|
||||
"delete_dropbox_file",
|
||||
"create_gmail_draft",
|
||||
"update_gmail_draft",
|
||||
"send_gmail_email",
|
||||
|
|
|
|||
|
|
@ -0,0 +1,525 @@
|
|||
"""Dropbox indexer using the shared IndexingPipelineService.
|
||||
|
||||
File-level pre-filter (_should_skip_file) handles content_hash and
|
||||
server_modified checks. download_and_extract_content() returns
|
||||
markdown which is fed into ConnectorDocument -> pipeline.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from collections.abc import Awaitable, Callable
|
||||
|
||||
from sqlalchemy import String, cast, select
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm.attributes import flag_modified
|
||||
|
||||
from app.config import config
|
||||
from app.connectors.dropbox import (
|
||||
DropboxClient,
|
||||
download_and_extract_content,
|
||||
get_file_by_path,
|
||||
get_files_in_folder,
|
||||
)
|
||||
from app.connectors.dropbox.file_types import should_skip_file as skip_item
|
||||
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
|
||||
from app.indexing_pipeline.connector_document import ConnectorDocument
|
||||
from app.indexing_pipeline.document_hashing import compute_identifier_hash
|
||||
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.services.task_logging_service import TaskLoggingService
|
||||
from app.tasks.connector_indexers.base import (
|
||||
check_document_by_unique_identifier,
|
||||
get_connector_by_id,
|
||||
update_connector_last_indexed,
|
||||
)
|
||||
|
||||
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
|
||||
HEARTBEAT_INTERVAL_SECONDS = 30
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def _should_skip_file(
    session: AsyncSession,
    file: dict,
    search_space_id: int,
) -> tuple[bool, str | None]:
    """Pre-filter: detect unchanged / rename-only files.

    Returns ``(skip, reason)``. Skips folders/non-downloadable entries,
    files without an id, unchanged files, and rename-only changes (which
    are applied in place here, committing the session).
    """
    file_id = file.get("id", "")
    file_name = file.get("name", "Unknown")

    if skip_item(file):
        return True, "folder/non-downloadable"
    if not file_id:
        return True, "missing file_id"

    # Primary lookup: deterministic hash of (type, file id, search space).
    primary_hash = compute_identifier_hash(
        DocumentType.DROPBOX_FILE.value, file_id, search_space_id
    )
    existing = await check_document_by_unique_identifier(session, primary_hash)

    if not existing:
        # Legacy fallback: older documents lack the identifier hash and are
        # found via metadata, then migrated to the new hash in place.
        result = await session.execute(
            select(Document).where(
                Document.search_space_id == search_space_id,
                Document.document_type == DocumentType.DROPBOX_FILE,
                cast(Document.document_metadata["dropbox_file_id"], String) == file_id,
            )
        )
        existing = result.scalar_one_or_none()
        if existing:
            existing.unique_identifier_hash = primary_hash
            logger.debug(f"Found Dropbox doc by metadata for file_id: {file_id}")

    if not existing:
        return False, None

    # Change detection: prefer Dropbox's content_hash; fall back to the
    # server_modified timestamp when hashes are unavailable.
    incoming_content_hash = file.get("content_hash")
    meta = existing.document_metadata or {}
    stored_content_hash = meta.get("content_hash")

    incoming_mtime = file.get("server_modified")
    stored_mtime = meta.get("modified_time")

    content_unchanged = False
    if incoming_content_hash and stored_content_hash:
        content_unchanged = incoming_content_hash == stored_content_hash
    elif incoming_content_hash and not stored_content_hash:
        # No stored hash to compare against — re-index so one gets recorded.
        return False, None
    elif not incoming_content_hash and incoming_mtime and stored_mtime:
        content_unchanged = incoming_mtime == stored_mtime
    elif not incoming_content_hash:
        return False, None

    if not content_unchanged:
        return False, None

    # Content unchanged: if only the name differs, update title/metadata in
    # place and skip the download entirely.
    old_name = meta.get("dropbox_file_name")
    if old_name and old_name != file_name:
        existing.title = file_name
        if not existing.document_metadata:
            existing.document_metadata = {}
        existing.document_metadata["dropbox_file_name"] = file_name
        if incoming_mtime:
            existing.document_metadata["modified_time"] = incoming_mtime
        # JSON-column mutation must be flagged for SQLAlchemy to persist it.
        flag_modified(existing, "document_metadata")
        await session.commit()
        logger.info(f"Rename-only update: '{old_name}' -> '{file_name}'")
        return True, f"File renamed: '{old_name}' -> '{file_name}'"

    if not DocumentStatus.is_state(existing.status, DocumentStatus.READY):
        return True, "skipped (previously failed)"
    return True, "unchanged"
|
||||
|
||||
|
||||
def _build_connector_doc(
    file: dict,
    markdown: str,
    dropbox_metadata: dict,
    *,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    enable_summary: bool,
) -> ConnectorDocument:
    """Wrap one downloaded Dropbox file as a ConnectorDocument for the pipeline."""
    name = file.get("name", "Unknown")

    doc_metadata = dict(dropbox_metadata)
    doc_metadata.update(
        connector_id=connector_id,
        document_type="Dropbox File",
        connector_type="Dropbox",
    )

    return ConnectorDocument(
        title=name,
        source_markdown=markdown,
        unique_id=file.get("id", ""),
        document_type=DocumentType.DROPBOX_FILE,
        search_space_id=search_space_id,
        connector_id=connector_id,
        created_by_id=user_id,
        should_summarize=enable_summary,
        # Used verbatim when LLM summarization is disabled or unavailable;
        # capped so oversized files don't bloat the fallback summary.
        fallback_summary=f"File: {name}\n\n{markdown[:4000]}",
        metadata=doc_metadata,
    )
|
||||
|
||||
|
||||
async def _download_files_parallel(
    dropbox_client: DropboxClient,
    files: list[dict],
    *,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    enable_summary: bool,
    max_concurrency: int = 3,
    on_heartbeat: HeartbeatCallbackType | None = None,
) -> tuple[list[ConnectorDocument], int]:
    """Download and ETL files in parallel. Returns (docs, failed_count).

    Concurrency is bounded by a semaphore. ``on_heartbeat``, when given, is
    invoked with the completed count at most once every
    HEARTBEAT_INTERVAL_SECONDS. Per-file failures (errors, empty content,
    raised exceptions) are counted rather than propagated.
    """
    results: list[ConnectorDocument] = []
    sem = asyncio.Semaphore(max_concurrency)
    last_heartbeat = time.time()
    completed_count = 0
    # Serializes heartbeat bookkeeping across concurrent downloads.
    hb_lock = asyncio.Lock()

    async def _download_one(file: dict) -> ConnectorDocument | None:
        nonlocal last_heartbeat, completed_count
        async with sem:
            markdown, db_metadata, error = await download_and_extract_content(
                dropbox_client, file
            )
            if error or not markdown:
                file_name = file.get("name", "Unknown")
                reason = error or "empty content"
                logger.warning(f"Download/ETL failed for {file_name}: {reason}")
                return None
            doc = _build_connector_doc(
                file,
                markdown,
                db_metadata,
                connector_id=connector_id,
                search_space_id=search_space_id,
                user_id=user_id,
                enable_summary=enable_summary,
            )
            async with hb_lock:
                completed_count += 1
                if on_heartbeat:
                    now = time.time()
                    # Throttle heartbeats so slow callbacks don't dominate.
                    if now - last_heartbeat >= HEARTBEAT_INTERVAL_SECONDS:
                        await on_heartbeat(completed_count)
                        last_heartbeat = now
            return doc

    tasks = [_download_one(f) for f in files]
    # return_exceptions=True: one failed download must not cancel the rest.
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)

    failed = 0
    for outcome in outcomes:
        if isinstance(outcome, Exception) or outcome is None:
            failed += 1
        else:
            results.append(outcome)

    return results, failed
|
||||
|
||||
|
||||
async def _download_and_index(
    dropbox_client: DropboxClient,
    session: AsyncSession,
    files: list[dict],
    *,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    enable_summary: bool,
    on_heartbeat: HeartbeatCallbackType | None = None,
) -> tuple[int, int]:
    """Download files in parallel, then feed them through the indexing pipeline.

    Returns ``(indexed_count, total_failed)`` where the failure count covers
    both the download phase and the indexing phase.
    """
    docs, download_failures = await _download_files_parallel(
        dropbox_client,
        files,
        connector_id=connector_id,
        search_space_id=search_space_id,
        user_id=user_id,
        enable_summary=enable_summary,
        on_heartbeat=on_heartbeat,
    )

    indexed = 0
    index_failures = 0
    if docs:
        pipeline = IndexingPipelineService(session)

        async def _get_llm(s):
            # Resolve the user's long-context LLM lazily for the pipeline.
            return await get_user_long_context_llm(s, user_id, search_space_id)

        _, indexed, index_failures = await pipeline.index_batch_parallel(
            docs,
            _get_llm,
            max_concurrency=3,
            on_heartbeat=on_heartbeat,
        )

    return indexed, download_failures + index_failures
|
||||
|
||||
|
||||
async def _index_full_scan(
    dropbox_client: DropboxClient,
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    folder_path: str,
    folder_name: str,
    task_logger: TaskLoggingService,
    log_entry: object,
    max_files: int,
    include_subfolders: bool = True,
    incremental_sync: bool = True,
    on_heartbeat_callback: HeartbeatCallbackType | None = None,
    enable_summary: bool = True,
) -> tuple[int, int]:
    """Full scan indexing of a folder.

    Lists up to ``max_files`` files under ``folder_path`` (optionally
    recursing into subfolders), applies the incremental-sync pre-filter,
    then downloads and indexes the remainder. Returns ``(indexed, skipped)``;
    rename-only updates count toward ``indexed``.

    Raises:
        Exception: when the folder listing fails (auth failures get a
            distinct re-authenticate message).
    """
    await task_logger.log_task_progress(
        log_entry,
        f"Starting full scan of folder: {folder_name}",
        {
            "stage": "full_scan",
            "folder_path": folder_path,
            "include_subfolders": include_subfolders,
            "incremental_sync": incremental_sync,
        },
    )

    renamed_count = 0
    skipped = 0
    files_to_download: list[dict] = []

    all_files, error = await get_files_in_folder(
        dropbox_client,
        folder_path,
        include_subfolders=include_subfolders,
    )
    if error:
        err_lower = error.lower()
        # Surface auth failures distinctly so callers can flag re-auth.
        if "401" in error or "authentication expired" in err_lower:
            raise Exception(
                f"Dropbox authentication failed. Please re-authenticate. (Error: {error})"
            )
        raise Exception(f"Failed to list Dropbox files: {error}")

    for file in all_files[:max_files]:
        if incremental_sync:
            skip, msg = await _should_skip_file(session, file, search_space_id)
            if skip:
                # Rename-only changes are applied inside the pre-filter and
                # counted as indexed work, not as skips.
                if msg and "renamed" in msg.lower():
                    renamed_count += 1
                else:
                    skipped += 1
                continue
        elif skip_item(file):
            skipped += 1
            continue
        files_to_download.append(file)

    batch_indexed, failed = await _download_and_index(
        dropbox_client,
        session,
        files_to_download,
        connector_id=connector_id,
        search_space_id=search_space_id,
        user_id=user_id,
        enable_summary=enable_summary,
        on_heartbeat=on_heartbeat_callback,
    )

    indexed = renamed_count + batch_indexed
    logger.info(
        f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
    )
    return indexed, skipped
|
||||
|
||||
|
||||
async def _index_selected_files(
    dropbox_client: DropboxClient,
    session: AsyncSession,
    file_paths: list[tuple[str, str | None]],
    *,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    enable_summary: bool,
    incremental_sync: bool = True,
    on_heartbeat: HeartbeatCallbackType | None = None,
) -> tuple[int, int, list[str]]:
    """Index user-selected files using the parallel pipeline.

    ``file_paths`` holds ``(path, display_name)`` tuples. Returns
    ``(indexed, skipped, errors)`` where ``errors`` are per-file lookup
    failures; download/index failures are dropped here (the counts surface
    via task logging upstream). Rename-only updates count as indexed.
    """
    files_to_download: list[dict] = []
    errors: list[str] = []
    renamed_count = 0
    skipped = 0

    for file_path, file_name in file_paths:
        file, error = await get_file_by_path(dropbox_client, file_path)
        if error or not file:
            # Prefer the human-readable name for error reporting.
            display = file_name or file_path
            errors.append(f"File '{display}': {error or 'File not found'}")
            continue

        if incremental_sync:
            skip, msg = await _should_skip_file(session, file, search_space_id)
            if skip:
                # Rename-only updates already happened inside the pre-filter.
                if msg and "renamed" in msg.lower():
                    renamed_count += 1
                else:
                    skipped += 1
                continue
        elif skip_item(file):
            skipped += 1
            continue

        files_to_download.append(file)

    batch_indexed, _failed = await _download_and_index(
        dropbox_client,
        session,
        files_to_download,
        connector_id=connector_id,
        search_space_id=search_space_id,
        user_id=user_id,
        enable_summary=enable_summary,
        on_heartbeat=on_heartbeat,
    )

    return renamed_count + batch_indexed, skipped, errors
|
||||
|
||||
|
||||
async def index_dropbox_files(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    items_dict: dict,
) -> tuple[int, int, str | None]:
    """Index Dropbox files for a specific connector.

    Returns ``(total_indexed, total_skipped, error_message_or_None)``.

    items_dict format:
        {
            "folders": [{"path": "...", "name": "..."}, ...],
            "files": [{"path": "...", "name": "..."}, ...],
            "indexing_options": {
                "max_files": 500,             # per-folder file cap; the route
                "max_files_per_folder": 100,  # passes it under this key
                "incremental_sync": true,
                "include_subfolders": true,
            }
        }
    """
    task_logger = TaskLoggingService(session, search_space_id)
    log_entry = await task_logger.log_task_start(
        task_name="dropbox_files_indexing",
        source="connector_indexing_task",
        message=f"Starting Dropbox indexing for connector {connector_id}",
        metadata={"connector_id": connector_id, "user_id": str(user_id)},
    )

    try:
        connector = await get_connector_by_id(
            session, connector_id, SearchSourceConnectorType.DROPBOX_CONNECTOR
        )
        if not connector:
            error_msg = f"Dropbox connector with ID {connector_id} not found"
            await task_logger.log_task_failure(
                log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
            )
            return 0, 0, error_msg

        # Encrypted credentials cannot be used without the SECRET_KEY.
        token_encrypted = connector.config.get("_token_encrypted", False)
        if token_encrypted and not config.SECRET_KEY:
            error_msg = "SECRET_KEY not configured but credentials are encrypted"
            await task_logger.log_task_failure(
                log_entry,
                error_msg,
                "Missing SECRET_KEY",
                {"error_type": "MissingSecretKey"},
            )
            return 0, 0, error_msg

        connector_enable_summary = getattr(connector, "enable_summary", True)
        dropbox_client = DropboxClient(session, connector_id)

        indexing_options = items_dict.get("indexing_options", {})
        # BUGFIX: the indexing route stores this option under
        # "max_files_per_folder", while older payloads used "max_files".
        # Honor both (preferring "max_files") so the configured cap is not
        # silently ignored in favor of the 500 default.
        max_files = indexing_options.get(
            "max_files", indexing_options.get("max_files_per_folder", 500)
        )
        incremental_sync = indexing_options.get("incremental_sync", True)
        include_subfolders = indexing_options.get("include_subfolders", True)

        total_indexed = 0
        total_skipped = 0

        # Phase 1: explicitly selected files.
        selected_files = items_dict.get("files", [])
        if selected_files:
            file_tuples = [
                (f.get("path", f.get("path_lower", f.get("id", ""))), f.get("name"))
                for f in selected_files
            ]
            indexed, skipped, file_errors = await _index_selected_files(
                dropbox_client,
                session,
                file_tuples,
                connector_id=connector_id,
                search_space_id=search_space_id,
                user_id=user_id,
                enable_summary=connector_enable_summary,
                incremental_sync=incremental_sync,
            )
            total_indexed += indexed
            total_skipped += skipped
            if file_errors:
                logger.warning(
                    f"File indexing errors for connector {connector_id}: {file_errors}"
                )

        # Phase 2: selected folders (full scan each).
        folders = items_dict.get("folders", [])
        for folder in folders:
            folder_path = folder.get("path", folder.get("path_lower", folder.get("id", "")))
            folder_name = folder.get("name", "Root")

            logger.info(f"Using full scan for folder {folder_name}")
            indexed, skipped = await _index_full_scan(
                dropbox_client,
                session,
                connector_id,
                search_space_id,
                user_id,
                folder_path,
                folder_name,
                task_logger,
                log_entry,
                max_files,
                include_subfolders,
                incremental_sync=incremental_sync,
                enable_summary=connector_enable_summary,
            )
            total_indexed += indexed
            total_skipped += skipped

        # Record last-indexed even for an all-skipped folder scan.
        if total_indexed > 0 or folders:
            await update_connector_last_indexed(session, connector, True)

        await session.commit()

        await task_logger.log_task_success(
            log_entry,
            f"Successfully completed Dropbox indexing for connector {connector_id}",
            {"files_processed": total_indexed, "files_skipped": total_skipped},
        )
        logger.info(
            f"Dropbox indexing completed: {total_indexed} indexed, {total_skipped} skipped"
        )
        return total_indexed, total_skipped, None

    except SQLAlchemyError as db_error:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Database error during Dropbox indexing for connector {connector_id}",
            str(db_error),
            {"error_type": "SQLAlchemyError"},
        )
        logger.error(f"Database error: {db_error!s}", exc_info=True)
        return 0, 0, f"Database error: {db_error!s}"
    except Exception as e:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Failed to index Dropbox files for connector {connector_id}",
            str(e),
            {"error_type": type(e).__name__},
        )
        logger.error(f"Failed to index Dropbox files: {e!s}", exc_info=True)
        return 0, 0, f"Failed to index Dropbox files: {e!s}"
|
||||
|
|
@ -22,6 +22,7 @@ BASE_NAME_FOR_TYPE = {
|
|||
SearchSourceConnectorType.SLACK_CONNECTOR: "Slack",
|
||||
SearchSourceConnectorType.TEAMS_CONNECTOR: "Microsoft Teams",
|
||||
SearchSourceConnectorType.ONEDRIVE_CONNECTOR: "OneDrive",
|
||||
SearchSourceConnectorType.DROPBOX_CONNECTOR: "Dropbox",
|
||||
SearchSourceConnectorType.NOTION_CONNECTOR: "Notion",
|
||||
SearchSourceConnectorType.LINEAR_CONNECTOR: "Linear",
|
||||
SearchSourceConnectorType.JIRA_CONNECTOR: "Jira",
|
||||
|
|
@ -65,6 +66,9 @@ def extract_identifier_from_credentials(
|
|||
if connector_type == SearchSourceConnectorType.ONEDRIVE_CONNECTOR:
|
||||
return credentials.get("user_email")
|
||||
|
||||
if connector_type == SearchSourceConnectorType.DROPBOX_CONNECTOR:
|
||||
return credentials.get("user_email")
|
||||
|
||||
if connector_type == SearchSourceConnectorType.NOTION_CONNECTOR:
|
||||
return credentials.get("workspace_name")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue