mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-04-28 18:36:23 +02:00
Merge upstream/dev
This commit is contained in:
commit
440762fb07
92 changed files with 3227 additions and 2502 deletions
|
|
@ -81,7 +81,8 @@ def create_create_onedrive_file_tool(
|
|||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type == SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connectors = result.scalars().all()
|
||||
|
|
@ -95,12 +96,14 @@ def create_create_onedrive_file_tool(
|
|||
accounts = []
|
||||
for c in connectors:
|
||||
cfg = c.config or {}
|
||||
accounts.append({
|
||||
"id": c.id,
|
||||
"name": c.name,
|
||||
"user_email": cfg.get("user_email"),
|
||||
"auth_expired": cfg.get("auth_expired", False),
|
||||
})
|
||||
accounts.append(
|
||||
{
|
||||
"id": c.id,
|
||||
"name": c.name,
|
||||
"user_email": cfg.get("user_email"),
|
||||
"auth_expired": cfg.get("auth_expired", False),
|
||||
}
|
||||
)
|
||||
|
||||
if all(a.get("auth_expired") for a in accounts):
|
||||
return {
|
||||
|
|
@ -119,16 +122,22 @@ def create_create_onedrive_file_tool(
|
|||
client = OneDriveClient(session=db_session, connector_id=cid)
|
||||
items, err = await client.list_children("root")
|
||||
if err:
|
||||
logger.warning("Failed to list folders for connector %s: %s", cid, err)
|
||||
logger.warning(
|
||||
"Failed to list folders for connector %s: %s", cid, err
|
||||
)
|
||||
parent_folders[cid] = []
|
||||
else:
|
||||
parent_folders[cid] = [
|
||||
{"folder_id": item["id"], "name": item["name"]}
|
||||
for item in items
|
||||
if item.get("folder") is not None and item.get("id") and item.get("name")
|
||||
if item.get("folder") is not None
|
||||
and item.get("id")
|
||||
and item.get("name")
|
||||
]
|
||||
except Exception:
|
||||
logger.warning("Error fetching folders for connector %s", cid, exc_info=True)
|
||||
logger.warning(
|
||||
"Error fetching folders for connector %s", cid, exc_info=True
|
||||
)
|
||||
parent_folders[cid] = []
|
||||
|
||||
context: dict[str, Any] = {
|
||||
|
|
@ -152,8 +161,12 @@ def create_create_onedrive_file_tool(
|
|||
}
|
||||
)
|
||||
|
||||
decisions_raw = approval.get("decisions", []) if isinstance(approval, dict) else []
|
||||
decisions = decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
|
||||
decisions_raw = (
|
||||
approval.get("decisions", []) if isinstance(approval, dict) else []
|
||||
)
|
||||
decisions = (
|
||||
decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
|
||||
)
|
||||
decisions = [d for d in decisions if isinstance(d, dict)]
|
||||
if not decisions:
|
||||
return {"status": "error", "message": "No approval decision received"}
|
||||
|
|
@ -192,7 +205,8 @@ def create_create_onedrive_file_tool(
|
|||
SearchSourceConnector.id == final_connector_id,
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type == SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
|
|
@ -200,7 +214,10 @@ def create_create_onedrive_file_tool(
|
|||
connector = connectors[0]
|
||||
|
||||
if not connector:
|
||||
return {"status": "error", "message": "Selected OneDrive connector is invalid."}
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Selected OneDrive connector is invalid.",
|
||||
}
|
||||
|
||||
docx_bytes = _markdown_to_docx(final_content or "")
|
||||
|
||||
|
|
@ -212,7 +229,9 @@ def create_create_onedrive_file_tool(
|
|||
mime_type=DOCX_MIME,
|
||||
)
|
||||
|
||||
logger.info(f"OneDrive file created: id={created.get('id')}, name={created.get('name')}")
|
||||
logger.info(
|
||||
f"OneDrive file created: id={created.get('id')}, name={created.get('name')}"
|
||||
)
|
||||
|
||||
kb_message_suffix = ""
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -52,10 +52,15 @@ def create_delete_onedrive_file_tool(
|
|||
- If status is "not_found", relay the exact message to the user and ask them
|
||||
to verify the file name or check if it has been indexed.
|
||||
"""
|
||||
logger.info(f"delete_onedrive_file called: file_name='{file_name}', delete_from_kb={delete_from_kb}")
|
||||
logger.info(
|
||||
f"delete_onedrive_file called: file_name='{file_name}', delete_from_kb={delete_from_kb}"
|
||||
)
|
||||
|
||||
if db_session is None or search_space_id is None or user_id is None:
|
||||
return {"status": "error", "message": "OneDrive tool not properly configured."}
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "OneDrive tool not properly configured.",
|
||||
}
|
||||
|
||||
try:
|
||||
doc_result = await db_session.execute(
|
||||
|
|
@ -89,8 +94,12 @@ def create_delete_onedrive_file_tool(
|
|||
Document.search_space_id == search_space_id,
|
||||
Document.document_type == DocumentType.ONEDRIVE_FILE,
|
||||
func.lower(
|
||||
cast(Document.document_metadata["onedrive_file_name"], String)
|
||||
) == func.lower(file_name),
|
||||
cast(
|
||||
Document.document_metadata["onedrive_file_name"],
|
||||
String,
|
||||
)
|
||||
)
|
||||
== func.lower(file_name),
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
)
|
||||
)
|
||||
|
|
@ -110,14 +119,20 @@ def create_delete_onedrive_file_tool(
|
|||
}
|
||||
|
||||
if not document.connector_id:
|
||||
return {"status": "error", "message": "Document has no associated connector."}
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Document has no associated connector.",
|
||||
}
|
||||
|
||||
meta = document.document_metadata or {}
|
||||
file_id = meta.get("onedrive_file_id")
|
||||
document_id = document.id
|
||||
|
||||
if not file_id:
|
||||
return {"status": "error", "message": "File ID is missing. Please re-index the file."}
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "File ID is missing. Please re-index the file.",
|
||||
}
|
||||
|
||||
conn_result = await db_session.execute(
|
||||
select(SearchSourceConnector).filter(
|
||||
|
|
@ -125,13 +140,17 @@ def create_delete_onedrive_file_tool(
|
|||
SearchSourceConnector.id == document.connector_id,
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type == SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
)
|
||||
)
|
||||
)
|
||||
connector = conn_result.scalars().first()
|
||||
if not connector:
|
||||
return {"status": "error", "message": "OneDrive connector not found or access denied."}
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "OneDrive connector not found or access denied.",
|
||||
}
|
||||
|
||||
cfg = connector.config or {}
|
||||
if cfg.get("auth_expired"):
|
||||
|
|
@ -170,8 +189,12 @@ def create_delete_onedrive_file_tool(
|
|||
}
|
||||
)
|
||||
|
||||
decisions_raw = approval.get("decisions", []) if isinstance(approval, dict) else []
|
||||
decisions = decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
|
||||
decisions_raw = (
|
||||
approval.get("decisions", []) if isinstance(approval, dict) else []
|
||||
)
|
||||
decisions = (
|
||||
decisions_raw if isinstance(decisions_raw, list) else [decisions_raw]
|
||||
)
|
||||
decisions = [d for d in decisions if isinstance(d, dict)]
|
||||
if not decisions:
|
||||
return {"status": "error", "message": "No approval decision received"}
|
||||
|
|
@ -206,7 +229,8 @@ def create_delete_onedrive_file_tool(
|
|||
SearchSourceConnector.id == final_connector_id,
|
||||
SearchSourceConnector.search_space_id == search_space_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.connector_type == SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
|
@ -224,10 +248,14 @@ def create_delete_onedrive_file_tool(
|
|||
f"Deleting OneDrive file: file_id='{final_file_id}', connector={actual_connector_id}"
|
||||
)
|
||||
|
||||
client = OneDriveClient(session=db_session, connector_id=actual_connector_id)
|
||||
client = OneDriveClient(
|
||||
session=db_session, connector_id=actual_connector_id
|
||||
)
|
||||
await client.trash_file(final_file_id)
|
||||
|
||||
logger.info(f"OneDrive file deleted (moved to recycle bin): file_id={final_file_id}")
|
||||
logger.info(
|
||||
f"OneDrive file deleted (moved to recycle bin): file_id={final_file_id}"
|
||||
)
|
||||
|
||||
trash_result: dict[str, Any] = {
|
||||
"status": "success",
|
||||
|
|
@ -272,6 +300,9 @@ def create_delete_onedrive_file_tool(
|
|||
if isinstance(e, GraphInterrupt):
|
||||
raise
|
||||
logger.error(f"Error deleting OneDrive file: {e}", exc_info=True)
|
||||
return {"status": "error", "message": "Something went wrong while trashing the file. Please try again."}
|
||||
return {
|
||||
"status": "error",
|
||||
"message": "Something went wrong while trashing the file. Please try again.",
|
||||
}
|
||||
|
||||
return delete_onedrive_file
|
||||
|
|
|
|||
|
|
@ -39,7 +39,9 @@ class OneDriveClient:
|
|||
|
||||
cfg = connector.config or {}
|
||||
is_encrypted = cfg.get("_token_encrypted", False)
|
||||
token_encryption = TokenEncryption(config.SECRET_KEY) if config.SECRET_KEY else None
|
||||
token_encryption = (
|
||||
TokenEncryption(config.SECRET_KEY) if config.SECRET_KEY else None
|
||||
)
|
||||
|
||||
access_token = cfg.get("access_token", "")
|
||||
refresh_token = cfg.get("refresh_token")
|
||||
|
|
@ -206,18 +208,20 @@ class OneDriveClient:
|
|||
async def download_file_to_disk(self, item_id: str, dest_path: str) -> str | None:
|
||||
"""Stream file content to disk. Returns error message on failure."""
|
||||
token = await self._get_valid_token()
|
||||
async with httpx.AsyncClient(follow_redirects=True) as client:
|
||||
async with client.stream(
|
||||
async with (
|
||||
httpx.AsyncClient(follow_redirects=True) as client,
|
||||
client.stream(
|
||||
"GET",
|
||||
f"{GRAPH_API_BASE}/me/drive/items/{item_id}/content",
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
timeout=120.0,
|
||||
) as resp:
|
||||
if resp.status_code != 200:
|
||||
return f"Download failed: {resp.status_code}"
|
||||
with open(dest_path, "wb") as f:
|
||||
async for chunk in resp.aiter_bytes(chunk_size=5 * 1024 * 1024):
|
||||
f.write(chunk)
|
||||
) as resp,
|
||||
):
|
||||
if resp.status_code != 200:
|
||||
return f"Download failed: {resp.status_code}"
|
||||
with open(dest_path, "wb") as f:
|
||||
async for chunk in resp.aiter_bytes(chunk_size=5 * 1024 * 1024):
|
||||
f.write(chunk)
|
||||
return None
|
||||
|
||||
async def create_file(
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ extension-based, not provider-specific.
|
|||
"""
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
|
|
@ -60,7 +61,9 @@ async def download_and_extract_content(
|
|||
|
||||
temp_file_path = None
|
||||
try:
|
||||
extension = Path(file_name).suffix or get_extension_from_mime(mime_type) or ".bin"
|
||||
extension = (
|
||||
Path(file_name).suffix or get_extension_from_mime(mime_type) or ".bin"
|
||||
)
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp:
|
||||
temp_file_path = tmp.name
|
||||
|
||||
|
|
@ -76,10 +79,8 @@ async def download_and_extract_content(
|
|||
return None, metadata, str(e)
|
||||
finally:
|
||||
if temp_file_path and os.path.exists(temp_file_path):
|
||||
try:
|
||||
with contextlib.suppress(Exception):
|
||||
os.unlink(temp_file_path)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
async def _parse_file_to_markdown(file_path: str, filename: str) -> str:
|
||||
|
|
@ -94,9 +95,10 @@ async def _parse_file_to_markdown(file_path: str, filename: str) -> str:
|
|||
return f.read()
|
||||
|
||||
if lower.endswith((".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")):
|
||||
from app.config import config as app_config
|
||||
from litellm import atranscription
|
||||
|
||||
from app.config import config as app_config
|
||||
|
||||
stt_service_type = (
|
||||
"local"
|
||||
if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
|
||||
|
|
@ -106,9 +108,13 @@ async def _parse_file_to_markdown(file_path: str, filename: str) -> str:
|
|||
from app.services.stt_service import stt_service
|
||||
|
||||
t0 = time.monotonic()
|
||||
logger.info(f"[local-stt] START file={filename} thread={threading.current_thread().name}")
|
||||
logger.info(
|
||||
f"[local-stt] START file={filename} thread={threading.current_thread().name}"
|
||||
)
|
||||
result = await asyncio.to_thread(stt_service.transcribe_file, file_path)
|
||||
logger.info(f"[local-stt] END file={filename} elapsed={time.monotonic() - t0:.2f}s")
|
||||
logger.info(
|
||||
f"[local-stt] END file={filename} elapsed={time.monotonic() - t0:.2f}s"
|
||||
)
|
||||
text = result.get("text", "")
|
||||
else:
|
||||
with open(file_path, "rb") as audio_file:
|
||||
|
|
@ -150,7 +156,9 @@ async def _parse_file_to_markdown(file_path: str, filename: str) -> str:
|
|||
parse_with_llamacloud_retry,
|
||||
)
|
||||
|
||||
result = await parse_with_llamacloud_retry(file_path=file_path, estimated_pages=50)
|
||||
result = await parse_with_llamacloud_retry(
|
||||
file_path=file_path, estimated_pages=50
|
||||
)
|
||||
markdown_documents = await result.aget_markdown_documents(split_by_page=False)
|
||||
if not markdown_documents:
|
||||
raise RuntimeError(f"LlamaCloud returned no documents for {filename}")
|
||||
|
|
@ -161,9 +169,13 @@ async def _parse_file_to_markdown(file_path: str, filename: str) -> str:
|
|||
|
||||
converter = DocumentConverter()
|
||||
t0 = time.monotonic()
|
||||
logger.info(f"[docling] START file={filename} thread={threading.current_thread().name}")
|
||||
logger.info(
|
||||
f"[docling] START file={filename} thread={threading.current_thread().name}"
|
||||
)
|
||||
result = await asyncio.to_thread(converter.convert, file_path)
|
||||
logger.info(f"[docling] END file={filename} elapsed={time.monotonic() - t0:.2f}s")
|
||||
logger.info(
|
||||
f"[docling] END file={filename} elapsed={time.monotonic() - t0:.2f}s"
|
||||
)
|
||||
return result.document.export_to_markdown()
|
||||
|
||||
raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}")
|
||||
|
|
|
|||
|
|
@ -27,7 +27,10 @@ async def list_folder_contents(
|
|||
if item["isFolder"]:
|
||||
item.setdefault("mimeType", "application/vnd.ms-folder")
|
||||
else:
|
||||
item.setdefault("mimeType", item.get("file", {}).get("mimeType", "application/octet-stream"))
|
||||
item.setdefault(
|
||||
"mimeType",
|
||||
item.get("file", {}).get("mimeType", "application/octet-stream"),
|
||||
)
|
||||
|
||||
items.sort(key=lambda x: (not x["isFolder"], x.get("name", "").lower()))
|
||||
|
||||
|
|
@ -63,7 +66,9 @@ async def get_files_in_folder(
|
|||
client, item["id"], include_subfolders=True
|
||||
)
|
||||
if sub_error:
|
||||
logger.warning(f"Error recursing into folder {item.get('name')}: {sub_error}")
|
||||
logger.warning(
|
||||
f"Error recursing into folder {item.get('name')}: {sub_error}"
|
||||
)
|
||||
continue
|
||||
files.extend(sub_files)
|
||||
elif not should_skip_file(item):
|
||||
|
|
|
|||
|
|
@ -33,9 +33,10 @@ from .new_llm_config_routes import router as new_llm_config_router
|
|||
from .notes_routes import router as notes_router
|
||||
from .notifications_routes import router as notifications_router
|
||||
from .notion_add_connector_route import router as notion_add_connector_router
|
||||
from .onedrive_add_connector_route import router as onedrive_add_connector_router
|
||||
from .podcasts_routes import router as podcasts_router
|
||||
from .public_chat_routes import router as public_chat_router
|
||||
from .prompts_routes import router as prompts_router
|
||||
from .public_chat_routes import router as public_chat_router
|
||||
from .rbac_routes import router as rbac_router
|
||||
from .reports_routes import router as reports_router
|
||||
from .sandbox_routes import router as sandbox_router
|
||||
|
|
@ -44,7 +45,6 @@ from .search_spaces_routes import router as search_spaces_router
|
|||
from .slack_add_connector_route import router as slack_add_connector_router
|
||||
from .surfsense_docs_routes import router as surfsense_docs_router
|
||||
from .teams_add_connector_route import router as teams_add_connector_router
|
||||
from .onedrive_add_connector_route import router as onedrive_add_connector_router
|
||||
from .video_presentations_routes import router as video_presentations_router
|
||||
from .youtube_routes import router as youtube_router
|
||||
|
||||
|
|
|
|||
|
|
@ -79,9 +79,13 @@ async def connect_onedrive(space_id: int, user: User = Depends(current_active_us
|
|||
if not space_id:
|
||||
raise HTTPException(status_code=400, detail="space_id is required")
|
||||
if not config.MICROSOFT_CLIENT_ID:
|
||||
raise HTTPException(status_code=500, detail="Microsoft OneDrive OAuth not configured.")
|
||||
raise HTTPException(
|
||||
status_code=500, detail="Microsoft OneDrive OAuth not configured."
|
||||
)
|
||||
if not config.SECRET_KEY:
|
||||
raise HTTPException(status_code=500, detail="SECRET_KEY not configured for OAuth security.")
|
||||
raise HTTPException(
|
||||
status_code=500, detail="SECRET_KEY not configured for OAuth security."
|
||||
)
|
||||
|
||||
state_manager = get_state_manager()
|
||||
state_encoded = state_manager.generate_secure_state(space_id, user.id)
|
||||
|
|
@ -96,14 +100,18 @@ async def connect_onedrive(space_id: int, user: User = Depends(current_active_us
|
|||
}
|
||||
auth_url = f"{AUTHORIZATION_URL}?{urlencode(auth_params)}"
|
||||
|
||||
logger.info("Generated OneDrive OAuth URL for user %s, space %s", user.id, space_id)
|
||||
logger.info(
|
||||
"Generated OneDrive OAuth URL for user %s, space %s", user.id, space_id
|
||||
)
|
||||
return {"auth_url": auth_url}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error("Failed to initiate OneDrive OAuth: %s", str(e), exc_info=True)
|
||||
raise HTTPException(status_code=500, detail=f"Failed to initiate OneDrive OAuth: {e!s}") from e
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to initiate OneDrive OAuth: {e!s}"
|
||||
) from e
|
||||
|
||||
|
||||
@router.get("/auth/onedrive/connector/reauth")
|
||||
|
|
@ -121,15 +129,20 @@ async def reauth_onedrive(
|
|||
SearchSourceConnector.id == connector_id,
|
||||
SearchSourceConnector.user_id == user.id,
|
||||
SearchSourceConnector.search_space_id == space_id,
|
||||
SearchSourceConnector.connector_type == SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
raise HTTPException(status_code=404, detail="OneDrive connector not found or access denied")
|
||||
raise HTTPException(
|
||||
status_code=404, detail="OneDrive connector not found or access denied"
|
||||
)
|
||||
|
||||
if not config.SECRET_KEY:
|
||||
raise HTTPException(status_code=500, detail="SECRET_KEY not configured for OAuth security.")
|
||||
raise HTTPException(
|
||||
status_code=500, detail="SECRET_KEY not configured for OAuth security."
|
||||
)
|
||||
|
||||
state_manager = get_state_manager()
|
||||
extra: dict = {"connector_id": connector_id}
|
||||
|
|
@ -148,14 +161,20 @@ async def reauth_onedrive(
|
|||
}
|
||||
auth_url = f"{AUTHORIZATION_URL}?{urlencode(auth_params)}"
|
||||
|
||||
logger.info("Initiating OneDrive re-auth for user %s, connector %s", user.id, connector_id)
|
||||
logger.info(
|
||||
"Initiating OneDrive re-auth for user %s, connector %s",
|
||||
user.id,
|
||||
connector_id,
|
||||
)
|
||||
return {"auth_url": auth_url}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error("Failed to initiate OneDrive re-auth: %s", str(e), exc_info=True)
|
||||
raise HTTPException(status_code=500, detail=f"Failed to initiate OneDrive re-auth: {e!s}") from e
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to initiate OneDrive re-auth: {e!s}"
|
||||
) from e
|
||||
|
||||
|
||||
@router.get("/auth/onedrive/connector/callback")
|
||||
|
|
@ -182,10 +201,14 @@ async def onedrive_callback(
|
|||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?error=onedrive_oauth_denied"
|
||||
)
|
||||
return RedirectResponse(url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=onedrive_oauth_denied")
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=onedrive_oauth_denied"
|
||||
)
|
||||
|
||||
if not code or not state:
|
||||
raise HTTPException(status_code=400, detail="Missing required OAuth parameters")
|
||||
raise HTTPException(
|
||||
status_code=400, detail="Missing required OAuth parameters"
|
||||
)
|
||||
|
||||
state_manager = get_state_manager()
|
||||
try:
|
||||
|
|
@ -194,7 +217,9 @@ async def onedrive_callback(
|
|||
user_id = UUID(data["user_id"])
|
||||
except (HTTPException, ValueError, KeyError) as e:
|
||||
logger.error("Invalid OAuth state: %s", str(e))
|
||||
return RedirectResponse(url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=invalid_state")
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=invalid_state"
|
||||
)
|
||||
|
||||
reauth_connector_id = data.get("connector_id")
|
||||
reauth_return_url = data.get("return_url")
|
||||
|
|
@ -222,20 +247,26 @@ async def onedrive_callback(
|
|||
error_detail = error_json.get("error_description", error_detail)
|
||||
except Exception:
|
||||
pass
|
||||
raise HTTPException(status_code=400, detail=f"Token exchange failed: {error_detail}")
|
||||
raise HTTPException(
|
||||
status_code=400, detail=f"Token exchange failed: {error_detail}"
|
||||
)
|
||||
|
||||
token_json = token_response.json()
|
||||
access_token = token_json.get("access_token")
|
||||
refresh_token = token_json.get("refresh_token")
|
||||
|
||||
if not access_token:
|
||||
raise HTTPException(status_code=400, detail="No access token received from Microsoft")
|
||||
raise HTTPException(
|
||||
status_code=400, detail="No access token received from Microsoft"
|
||||
)
|
||||
|
||||
token_encryption = get_token_encryption()
|
||||
|
||||
expires_at = None
|
||||
if token_json.get("expires_in"):
|
||||
expires_at = datetime.now(UTC) + timedelta(seconds=int(token_json["expires_in"]))
|
||||
expires_at = datetime.now(UTC) + timedelta(
|
||||
seconds=int(token_json["expires_in"])
|
||||
)
|
||||
|
||||
user_info: dict = {}
|
||||
try:
|
||||
|
|
@ -248,7 +279,8 @@ async def onedrive_callback(
|
|||
if user_response.status_code == 200:
|
||||
user_data = user_response.json()
|
||||
user_info = {
|
||||
"user_email": user_data.get("mail") or user_data.get("userPrincipalName"),
|
||||
"user_email": user_data.get("mail")
|
||||
or user_data.get("userPrincipalName"),
|
||||
"user_name": user_data.get("displayName"),
|
||||
}
|
||||
except Exception as e:
|
||||
|
|
@ -256,7 +288,9 @@ async def onedrive_callback(
|
|||
|
||||
connector_config = {
|
||||
"access_token": token_encryption.encrypt_token(access_token),
|
||||
"refresh_token": token_encryption.encrypt_token(refresh_token) if refresh_token else None,
|
||||
"refresh_token": token_encryption.encrypt_token(refresh_token)
|
||||
if refresh_token
|
||||
else None,
|
||||
"token_type": token_json.get("token_type", "Bearer"),
|
||||
"expires_in": token_json.get("expires_in"),
|
||||
"expires_at": expires_at.isoformat() if expires_at else None,
|
||||
|
|
@ -273,22 +307,36 @@ async def onedrive_callback(
|
|||
SearchSourceConnector.id == reauth_connector_id,
|
||||
SearchSourceConnector.user_id == user_id,
|
||||
SearchSourceConnector.search_space_id == space_id,
|
||||
SearchSourceConnector.connector_type == SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
)
|
||||
)
|
||||
db_connector = result.scalars().first()
|
||||
if not db_connector:
|
||||
raise HTTPException(status_code=404, detail="Connector not found or access denied during re-auth")
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail="Connector not found or access denied during re-auth",
|
||||
)
|
||||
|
||||
existing_delta_link = db_connector.config.get("delta_link")
|
||||
db_connector.config = {**connector_config, "delta_link": existing_delta_link, "auth_expired": False}
|
||||
db_connector.config = {
|
||||
**connector_config,
|
||||
"delta_link": existing_delta_link,
|
||||
"auth_expired": False,
|
||||
}
|
||||
flag_modified(db_connector, "config")
|
||||
await session.commit()
|
||||
await session.refresh(db_connector)
|
||||
|
||||
logger.info("Re-authenticated OneDrive connector %s for user %s", db_connector.id, user_id)
|
||||
logger.info(
|
||||
"Re-authenticated OneDrive connector %s for user %s",
|
||||
db_connector.id,
|
||||
user_id,
|
||||
)
|
||||
if reauth_return_url and reauth_return_url.startswith("/"):
|
||||
return RedirectResponse(url=f"{config.NEXT_FRONTEND_URL}{reauth_return_url}")
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}{reauth_return_url}"
|
||||
)
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?success=true&connector=ONEDRIVE_CONNECTOR&connectorId={db_connector.id}"
|
||||
)
|
||||
|
|
@ -298,16 +346,26 @@ async def onedrive_callback(
|
|||
SearchSourceConnectorType.ONEDRIVE_CONNECTOR, connector_config
|
||||
)
|
||||
is_duplicate = await check_duplicate_connector(
|
||||
session, SearchSourceConnectorType.ONEDRIVE_CONNECTOR, space_id, user_id, connector_identifier,
|
||||
session,
|
||||
SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
space_id,
|
||||
user_id,
|
||||
connector_identifier,
|
||||
)
|
||||
if is_duplicate:
|
||||
logger.warning("Duplicate OneDrive connector for user %s, space %s", user_id, space_id)
|
||||
logger.warning(
|
||||
"Duplicate OneDrive connector for user %s, space %s", user_id, space_id
|
||||
)
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?error=duplicate_account&connector=ONEDRIVE_CONNECTOR"
|
||||
)
|
||||
|
||||
connector_name = await generate_unique_connector_name(
|
||||
session, SearchSourceConnectorType.ONEDRIVE_CONNECTOR, space_id, user_id, connector_identifier,
|
||||
session,
|
||||
SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
space_id,
|
||||
user_id,
|
||||
connector_identifier,
|
||||
)
|
||||
|
||||
new_connector = SearchSourceConnector(
|
||||
|
|
@ -323,20 +381,30 @@ async def onedrive_callback(
|
|||
session.add(new_connector)
|
||||
await session.commit()
|
||||
await session.refresh(new_connector)
|
||||
logger.info("Successfully created OneDrive connector %s for user %s", new_connector.id, user_id)
|
||||
logger.info(
|
||||
"Successfully created OneDrive connector %s for user %s",
|
||||
new_connector.id,
|
||||
user_id,
|
||||
)
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?success=true&connector=ONEDRIVE_CONNECTOR&connectorId={new_connector.id}"
|
||||
)
|
||||
except IntegrityError as e:
|
||||
await session.rollback()
|
||||
logger.error("Database integrity error creating OneDrive connector: %s", str(e))
|
||||
return RedirectResponse(url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=connector_creation_failed")
|
||||
logger.error(
|
||||
"Database integrity error creating OneDrive connector: %s", str(e)
|
||||
)
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=connector_creation_failed"
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except (IntegrityError, ValueError) as e:
|
||||
logger.error("OneDrive OAuth callback error: %s", str(e), exc_info=True)
|
||||
return RedirectResponse(url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=onedrive_auth_error")
|
||||
return RedirectResponse(
|
||||
url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=onedrive_auth_error"
|
||||
)
|
||||
|
||||
|
||||
@router.get("/connectors/{connector_id}/onedrive/folders")
|
||||
|
|
@ -353,28 +421,44 @@ async def list_onedrive_folders(
|
|||
select(SearchSourceConnector).filter(
|
||||
SearchSourceConnector.id == connector_id,
|
||||
SearchSourceConnector.user_id == user.id,
|
||||
SearchSourceConnector.connector_type == SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
SearchSourceConnector.connector_type
|
||||
== SearchSourceConnectorType.ONEDRIVE_CONNECTOR,
|
||||
)
|
||||
)
|
||||
connector = result.scalars().first()
|
||||
if not connector:
|
||||
raise HTTPException(status_code=404, detail="OneDrive connector not found or access denied")
|
||||
raise HTTPException(
|
||||
status_code=404, detail="OneDrive connector not found or access denied"
|
||||
)
|
||||
|
||||
onedrive_client = OneDriveClient(session, connector_id)
|
||||
items, error = await list_folder_contents(onedrive_client, parent_id=parent_id)
|
||||
|
||||
if error:
|
||||
error_lower = error.lower()
|
||||
if "401" in error or "authentication expired" in error_lower or "invalid_grant" in error_lower:
|
||||
if (
|
||||
"401" in error
|
||||
or "authentication expired" in error_lower
|
||||
or "invalid_grant" in error_lower
|
||||
):
|
||||
try:
|
||||
if connector and not connector.config.get("auth_expired"):
|
||||
connector.config = {**connector.config, "auth_expired": True}
|
||||
flag_modified(connector, "config")
|
||||
await session.commit()
|
||||
except Exception:
|
||||
logger.warning("Failed to persist auth_expired for connector %s", connector_id, exc_info=True)
|
||||
raise HTTPException(status_code=400, detail="OneDrive authentication expired. Please re-authenticate.")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to list folder contents: {error}")
|
||||
logger.warning(
|
||||
"Failed to persist auth_expired for connector %s",
|
||||
connector_id,
|
||||
exc_info=True,
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="OneDrive authentication expired. Please re-authenticate.",
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to list folder contents: {error}"
|
||||
)
|
||||
|
||||
return {"items": items}
|
||||
|
||||
|
|
@ -391,8 +475,13 @@ async def list_onedrive_folders(
|
|||
await session.commit()
|
||||
except Exception:
|
||||
pass
|
||||
raise HTTPException(status_code=400, detail="OneDrive authentication expired. Please re-authenticate.") from e
|
||||
raise HTTPException(status_code=500, detail=f"Failed to list OneDrive contents: {e!s}") from e
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="OneDrive authentication expired. Please re-authenticate.",
|
||||
) from e
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to list OneDrive contents: {e!s}"
|
||||
) from e
|
||||
|
||||
|
||||
async def refresh_onedrive_token(
|
||||
|
|
@ -410,10 +499,15 @@ async def refresh_onedrive_token(
|
|||
refresh_token = token_encryption.decrypt_token(refresh_token)
|
||||
except Exception as e:
|
||||
logger.error("Failed to decrypt refresh token: %s", str(e))
|
||||
raise HTTPException(status_code=500, detail="Failed to decrypt stored refresh token") from e
|
||||
raise HTTPException(
|
||||
status_code=500, detail="Failed to decrypt stored refresh token"
|
||||
) from e
|
||||
|
||||
if not refresh_token:
|
||||
raise HTTPException(status_code=400, detail=f"No refresh token available for connector {connector.id}")
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"No refresh token available for connector {connector.id}",
|
||||
)
|
||||
|
||||
refresh_data = {
|
||||
"client_id": config.MICROSOFT_CLIENT_ID,
|
||||
|
|
@ -425,8 +519,10 @@ async def refresh_onedrive_token(
|
|||
|
||||
async with httpx.AsyncClient() as client:
|
||||
token_response = await client.post(
|
||||
TOKEN_URL, data=refresh_data,
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"}, timeout=30.0,
|
||||
TOKEN_URL,
|
||||
data=refresh_data,
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
timeout=30.0,
|
||||
)
|
||||
|
||||
if token_response.status_code != 200:
|
||||
|
|
@ -439,16 +535,27 @@ async def refresh_onedrive_token(
|
|||
except Exception:
|
||||
pass
|
||||
error_lower = (error_detail + error_code).lower()
|
||||
if "invalid_grant" in error_lower or "expired" in error_lower or "revoked" in error_lower:
|
||||
raise HTTPException(status_code=401, detail="OneDrive authentication failed. Please re-authenticate.")
|
||||
raise HTTPException(status_code=400, detail=f"Token refresh failed: {error_detail}")
|
||||
if (
|
||||
"invalid_grant" in error_lower
|
||||
or "expired" in error_lower
|
||||
or "revoked" in error_lower
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail="OneDrive authentication failed. Please re-authenticate.",
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=400, detail=f"Token refresh failed: {error_detail}"
|
||||
)
|
||||
|
||||
token_json = token_response.json()
|
||||
access_token = token_json.get("access_token")
|
||||
new_refresh_token = token_json.get("refresh_token")
|
||||
|
||||
if not access_token:
|
||||
raise HTTPException(status_code=400, detail="No access token received from Microsoft refresh")
|
||||
raise HTTPException(
|
||||
status_code=400, detail="No access token received from Microsoft refresh"
|
||||
)
|
||||
|
||||
expires_at = None
|
||||
expires_in = token_json.get("expires_in")
|
||||
|
|
|
|||
|
|
@ -2567,8 +2567,12 @@ async def run_onedrive_indexing(
|
|||
search_space_id=search_space_id,
|
||||
folder_count=len(items_dict.get("folders", [])),
|
||||
file_count=len(items_dict.get("files", [])),
|
||||
folder_names=[f.get("name", "Unknown") for f in items_dict.get("folders", [])],
|
||||
file_names=[f.get("name", "Unknown") for f in items_dict.get("files", [])],
|
||||
folder_names=[
|
||||
f.get("name", "Unknown") for f in items_dict.get("folders", [])
|
||||
],
|
||||
file_names=[
|
||||
f.get("name", "Unknown") for f in items_dict.get("files", [])
|
||||
],
|
||||
)
|
||||
|
||||
if notification:
|
||||
|
|
@ -2593,7 +2597,9 @@ async def run_onedrive_indexing(
|
|||
)
|
||||
if _is_auth_error(error_message):
|
||||
await _persist_auth_expired(session, connector_id)
|
||||
error_message = "OneDrive authentication expired. Please re-authenticate."
|
||||
error_message = (
|
||||
"OneDrive authentication expired. Please re-authenticate."
|
||||
)
|
||||
else:
|
||||
if notification:
|
||||
await session.refresh(notification)
|
||||
|
|
|
|||
|
|
@ -56,9 +56,7 @@ class OneDriveKBSyncService:
|
|||
|
||||
indexable_content = (content or "").strip()
|
||||
if not indexable_content:
|
||||
indexable_content = (
|
||||
f"OneDrive file: {file_name} (type: {mime_type})"
|
||||
)
|
||||
indexable_content = f"OneDrive file: {file_name} (type: {mime_type})"
|
||||
|
||||
content_hash = generate_content_hash(indexable_content, search_space_id)
|
||||
|
||||
|
|
@ -95,9 +93,7 @@ class OneDriveKBSyncService:
|
|||
)
|
||||
else:
|
||||
logger.warning("No LLM configured — using fallback summary")
|
||||
summary_content = (
|
||||
f"OneDrive File: {file_name}\n\n{indexable_content}"
|
||||
)
|
||||
summary_content = f"OneDrive File: {file_name}\n\n{indexable_content}"
|
||||
summary_embedding = embed_text(summary_content)
|
||||
|
||||
chunks = await create_document_chunks(indexable_content)
|
||||
|
|
|
|||
|
|
@ -1075,6 +1075,37 @@ async def _stream_agent_events(
|
|||
"thread_id": thread_id_str,
|
||||
},
|
||||
)
|
||||
elif tool_name == "web_search":
|
||||
xml = (
|
||||
tool_output.get("result", str(tool_output))
|
||||
if isinstance(tool_output, dict)
|
||||
else str(tool_output)
|
||||
)
|
||||
citations: dict[str, dict[str, str]] = {}
|
||||
for m in re.finditer(
|
||||
r"<title><!\[CDATA\[(.*?)\]\]></title>\s*<url><!\[CDATA\[(.*?)\]\]></url>",
|
||||
xml,
|
||||
):
|
||||
title, url = m.group(1).strip(), m.group(2).strip()
|
||||
if url.startswith("http") and url not in citations:
|
||||
citations[url] = {"title": title}
|
||||
for m in re.finditer(
|
||||
r"<chunk\s+id='([^']*)'><!\[CDATA\[([\s\S]*?)\]\]></chunk>",
|
||||
xml,
|
||||
):
|
||||
chunk_url, content = m.group(1).strip(), m.group(2).strip()
|
||||
if (
|
||||
chunk_url.startswith("http")
|
||||
and chunk_url in citations
|
||||
and content
|
||||
):
|
||||
citations[chunk_url]["snippet"] = (
|
||||
content[:200] + "…" if len(content) > 200 else content
|
||||
)
|
||||
yield streaming_service.format_tool_output_available(
|
||||
tool_call_id,
|
||||
{"status": "completed", "citations": citations},
|
||||
)
|
||||
else:
|
||||
yield streaming_service.format_tool_output_available(
|
||||
tool_call_id,
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ logger = logging.getLogger(__name__)
|
|||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _should_skip_file(
|
||||
session: AsyncSession,
|
||||
file: dict,
|
||||
|
|
@ -186,9 +187,13 @@ async def _download_files_parallel(
|
|||
logger.warning(f"Download/ETL failed for {file_name}: {reason}")
|
||||
return None
|
||||
doc = _build_connector_doc(
|
||||
file, markdown, od_metadata,
|
||||
connector_id=connector_id, search_space_id=search_space_id,
|
||||
user_id=user_id, enable_summary=enable_summary,
|
||||
file,
|
||||
markdown,
|
||||
od_metadata,
|
||||
connector_id=connector_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
enable_summary=enable_summary,
|
||||
)
|
||||
async with hb_lock:
|
||||
completed_count += 1
|
||||
|
|
@ -204,9 +209,7 @@ async def _download_files_parallel(
|
|||
|
||||
failed = 0
|
||||
for outcome in outcomes:
|
||||
if isinstance(outcome, Exception):
|
||||
failed += 1
|
||||
elif outcome is None:
|
||||
if isinstance(outcome, Exception) or outcome is None:
|
||||
failed += 1
|
||||
else:
|
||||
results.append(outcome)
|
||||
|
|
@ -227,9 +230,12 @@ async def _download_and_index(
|
|||
) -> tuple[int, int]:
|
||||
"""Parallel download then parallel indexing. Returns (batch_indexed, total_failed)."""
|
||||
connector_docs, download_failed = await _download_files_parallel(
|
||||
onedrive_client, files,
|
||||
connector_id=connector_id, search_space_id=search_space_id,
|
||||
user_id=user_id, enable_summary=enable_summary,
|
||||
onedrive_client,
|
||||
files,
|
||||
connector_id=connector_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
enable_summary=enable_summary,
|
||||
on_heartbeat=on_heartbeat,
|
||||
)
|
||||
|
||||
|
|
@ -242,7 +248,9 @@ async def _download_and_index(
|
|||
return await get_user_long_context_llm(s, user_id, search_space_id)
|
||||
|
||||
_, batch_indexed, batch_failed = await pipeline.index_batch_parallel(
|
||||
connector_docs, _get_llm, max_concurrency=3,
|
||||
connector_docs,
|
||||
_get_llm,
|
||||
max_concurrency=3,
|
||||
on_heartbeat=on_heartbeat,
|
||||
)
|
||||
|
||||
|
|
@ -305,10 +313,14 @@ async def _index_selected_files(
|
|||
|
||||
files_to_download.append(file)
|
||||
|
||||
batch_indexed, failed = await _download_and_index(
|
||||
onedrive_client, session, files_to_download,
|
||||
connector_id=connector_id, search_space_id=search_space_id,
|
||||
user_id=user_id, enable_summary=enable_summary,
|
||||
batch_indexed, _failed = await _download_and_index(
|
||||
onedrive_client,
|
||||
session,
|
||||
files_to_download,
|
||||
connector_id=connector_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
enable_summary=enable_summary,
|
||||
on_heartbeat=on_heartbeat,
|
||||
)
|
||||
|
||||
|
|
@ -319,6 +331,7 @@ async def _index_selected_files(
|
|||
# Scan strategies
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _index_full_scan(
|
||||
onedrive_client: OneDriveClient,
|
||||
session: AsyncSession,
|
||||
|
|
@ -338,7 +351,11 @@ async def _index_full_scan(
|
|||
await task_logger.log_task_progress(
|
||||
log_entry,
|
||||
f"Starting full scan of folder: {folder_name}",
|
||||
{"stage": "full_scan", "folder_id": folder_id, "include_subfolders": include_subfolders},
|
||||
{
|
||||
"stage": "full_scan",
|
||||
"folder_id": folder_id,
|
||||
"include_subfolders": include_subfolders,
|
||||
},
|
||||
)
|
||||
|
||||
renamed_count = 0
|
||||
|
|
@ -346,12 +363,16 @@ async def _index_full_scan(
|
|||
files_to_download: list[dict] = []
|
||||
|
||||
all_files, error = await get_files_in_folder(
|
||||
onedrive_client, folder_id, include_subfolders=include_subfolders,
|
||||
onedrive_client,
|
||||
folder_id,
|
||||
include_subfolders=include_subfolders,
|
||||
)
|
||||
if error:
|
||||
err_lower = error.lower()
|
||||
if "401" in error or "authentication expired" in err_lower:
|
||||
raise Exception(f"OneDrive authentication failed. Please re-authenticate. (Error: {error})")
|
||||
raise Exception(
|
||||
f"OneDrive authentication failed. Please re-authenticate. (Error: {error})"
|
||||
)
|
||||
raise Exception(f"Failed to list OneDrive files: {error}")
|
||||
|
||||
for file in all_files[:max_files]:
|
||||
|
|
@ -365,14 +386,20 @@ async def _index_full_scan(
|
|||
files_to_download.append(file)
|
||||
|
||||
batch_indexed, failed = await _download_and_index(
|
||||
onedrive_client, session, files_to_download,
|
||||
connector_id=connector_id, search_space_id=search_space_id,
|
||||
user_id=user_id, enable_summary=enable_summary,
|
||||
onedrive_client,
|
||||
session,
|
||||
files_to_download,
|
||||
connector_id=connector_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
enable_summary=enable_summary,
|
||||
on_heartbeat=on_heartbeat_callback,
|
||||
)
|
||||
|
||||
indexed = renamed_count + batch_indexed
|
||||
logger.info(f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed")
|
||||
logger.info(
|
||||
f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
|
||||
)
|
||||
return indexed, skipped
|
||||
|
||||
|
||||
|
|
@ -392,7 +419,8 @@ async def _index_with_delta_sync(
|
|||
) -> tuple[int, int, str | None]:
|
||||
"""Delta sync using OneDrive change tracking. Returns (indexed, skipped, new_delta_link)."""
|
||||
await task_logger.log_task_progress(
|
||||
log_entry, "Starting delta sync",
|
||||
log_entry,
|
||||
"Starting delta sync",
|
||||
{"stage": "delta_sync"},
|
||||
)
|
||||
|
||||
|
|
@ -402,7 +430,9 @@ async def _index_with_delta_sync(
|
|||
if error:
|
||||
err_lower = error.lower()
|
||||
if "401" in error or "authentication expired" in err_lower:
|
||||
raise Exception(f"OneDrive authentication failed. Please re-authenticate. (Error: {error})")
|
||||
raise Exception(
|
||||
f"OneDrive authentication failed. Please re-authenticate. (Error: {error})"
|
||||
)
|
||||
raise Exception(f"Failed to fetch OneDrive changes: {error}")
|
||||
|
||||
if not changes:
|
||||
|
|
@ -444,14 +474,20 @@ async def _index_with_delta_sync(
|
|||
files_to_download.append(change)
|
||||
|
||||
batch_indexed, failed = await _download_and_index(
|
||||
onedrive_client, session, files_to_download,
|
||||
connector_id=connector_id, search_space_id=search_space_id,
|
||||
user_id=user_id, enable_summary=enable_summary,
|
||||
onedrive_client,
|
||||
session,
|
||||
files_to_download,
|
||||
connector_id=connector_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
enable_summary=enable_summary,
|
||||
on_heartbeat=on_heartbeat_callback,
|
||||
)
|
||||
|
||||
indexed = renamed_count + batch_indexed
|
||||
logger.info(f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed")
|
||||
logger.info(
|
||||
f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed"
|
||||
)
|
||||
return indexed, skipped, new_delta_link
|
||||
|
||||
|
||||
|
|
@ -459,6 +495,7 @@ async def _index_with_delta_sync(
|
|||
# Public entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def index_onedrive_files(
|
||||
session: AsyncSession,
|
||||
connector_id: int,
|
||||
|
|
@ -489,13 +526,20 @@ async def index_onedrive_files(
|
|||
)
|
||||
if not connector:
|
||||
error_msg = f"OneDrive connector with ID {connector_id} not found"
|
||||
await task_logger.log_task_failure(log_entry, error_msg, None, {"error_type": "ConnectorNotFound"})
|
||||
await task_logger.log_task_failure(
|
||||
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
|
||||
)
|
||||
return 0, 0, error_msg
|
||||
|
||||
token_encrypted = connector.config.get("_token_encrypted", False)
|
||||
if token_encrypted and not config.SECRET_KEY:
|
||||
error_msg = "SECRET_KEY not configured but credentials are encrypted"
|
||||
await task_logger.log_task_failure(log_entry, error_msg, "Missing SECRET_KEY", {"error_type": "MissingSecretKey"})
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
error_msg,
|
||||
"Missing SECRET_KEY",
|
||||
{"error_type": "MissingSecretKey"},
|
||||
)
|
||||
return 0, 0, error_msg
|
||||
|
||||
connector_enable_summary = getattr(connector, "enable_summary", True)
|
||||
|
|
@ -513,10 +557,14 @@ async def index_onedrive_files(
|
|||
selected_files = items_dict.get("files", [])
|
||||
if selected_files:
|
||||
file_tuples = [(f["id"], f.get("name")) for f in selected_files]
|
||||
indexed, skipped, errors = await _index_selected_files(
|
||||
onedrive_client, session, file_tuples,
|
||||
connector_id=connector_id, search_space_id=search_space_id,
|
||||
user_id=user_id, enable_summary=connector_enable_summary,
|
||||
indexed, skipped, _errors = await _index_selected_files(
|
||||
onedrive_client,
|
||||
session,
|
||||
file_tuples,
|
||||
connector_id=connector_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
enable_summary=connector_enable_summary,
|
||||
)
|
||||
total_indexed += indexed
|
||||
total_skipped += skipped
|
||||
|
|
@ -534,8 +582,16 @@ async def index_onedrive_files(
|
|||
if can_use_delta:
|
||||
logger.info(f"Using delta sync for folder {folder_name}")
|
||||
indexed, skipped, new_delta_link = await _index_with_delta_sync(
|
||||
onedrive_client, session, connector_id, search_space_id, user_id,
|
||||
folder_id, delta_link, task_logger, log_entry, max_files,
|
||||
onedrive_client,
|
||||
session,
|
||||
connector_id,
|
||||
search_space_id,
|
||||
user_id,
|
||||
folder_id,
|
||||
delta_link,
|
||||
task_logger,
|
||||
log_entry,
|
||||
max_files,
|
||||
enable_summary=connector_enable_summary,
|
||||
)
|
||||
total_indexed += indexed
|
||||
|
|
@ -550,18 +606,36 @@ async def index_onedrive_files(
|
|||
|
||||
# Reconciliation full scan
|
||||
ri, rs = await _index_full_scan(
|
||||
onedrive_client, session, connector_id, search_space_id, user_id,
|
||||
folder_id, folder_name, task_logger, log_entry, max_files,
|
||||
include_subfolders, enable_summary=connector_enable_summary,
|
||||
onedrive_client,
|
||||
session,
|
||||
connector_id,
|
||||
search_space_id,
|
||||
user_id,
|
||||
folder_id,
|
||||
folder_name,
|
||||
task_logger,
|
||||
log_entry,
|
||||
max_files,
|
||||
include_subfolders,
|
||||
enable_summary=connector_enable_summary,
|
||||
)
|
||||
total_indexed += ri
|
||||
total_skipped += rs
|
||||
else:
|
||||
logger.info(f"Using full scan for folder {folder_name}")
|
||||
indexed, skipped = await _index_full_scan(
|
||||
onedrive_client, session, connector_id, search_space_id, user_id,
|
||||
folder_id, folder_name, task_logger, log_entry, max_files,
|
||||
include_subfolders, enable_summary=connector_enable_summary,
|
||||
onedrive_client,
|
||||
session,
|
||||
connector_id,
|
||||
search_space_id,
|
||||
user_id,
|
||||
folder_id,
|
||||
folder_name,
|
||||
task_logger,
|
||||
log_entry,
|
||||
max_files,
|
||||
include_subfolders,
|
||||
enable_summary=connector_enable_summary,
|
||||
)
|
||||
total_indexed += indexed
|
||||
total_skipped += skipped
|
||||
|
|
@ -585,22 +659,28 @@ async def index_onedrive_files(
|
|||
f"Successfully completed OneDrive indexing for connector {connector_id}",
|
||||
{"files_processed": total_indexed, "files_skipped": total_skipped},
|
||||
)
|
||||
logger.info(f"OneDrive indexing completed: {total_indexed} indexed, {total_skipped} skipped")
|
||||
logger.info(
|
||||
f"OneDrive indexing completed: {total_indexed} indexed, {total_skipped} skipped"
|
||||
)
|
||||
return total_indexed, total_skipped, None
|
||||
|
||||
except SQLAlchemyError as db_error:
|
||||
await session.rollback()
|
||||
await task_logger.log_task_failure(
|
||||
log_entry, f"Database error during OneDrive indexing for connector {connector_id}",
|
||||
str(db_error), {"error_type": "SQLAlchemyError"},
|
||||
log_entry,
|
||||
f"Database error during OneDrive indexing for connector {connector_id}",
|
||||
str(db_error),
|
||||
{"error_type": "SQLAlchemyError"},
|
||||
)
|
||||
logger.error(f"Database error: {db_error!s}", exc_info=True)
|
||||
return 0, 0, f"Database error: {db_error!s}"
|
||||
except Exception as e:
|
||||
await session.rollback()
|
||||
await task_logger.log_task_failure(
|
||||
log_entry, f"Failed to index OneDrive files for connector {connector_id}",
|
||||
str(e), {"error_type": type(e).__name__},
|
||||
log_entry,
|
||||
f"Failed to index OneDrive files for connector {connector_id}",
|
||||
str(e),
|
||||
{"error_type": type(e).__name__},
|
||||
)
|
||||
logger.error(f"Failed to index OneDrive files: {e!s}", exc_info=True)
|
||||
return 0, 0, f"Failed to index OneDrive files: {e!s}"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue