SurfSense/surfsense_web/lib/folder-sync-upload.ts
DESKTOP-RTLN3BA\$punk 656e061f84
Some checks are pending
Build and Push Docker Images / tag_release (push) Waiting to run
Build and Push Docker Images / build (./surfsense_backend, ./surfsense_backend/Dockerfile, backend, surfsense-backend, ubuntu-24.04-arm, linux/arm64, arm64) (push) Blocked by required conditions
Build and Push Docker Images / build (./surfsense_backend, ./surfsense_backend/Dockerfile, backend, surfsense-backend, ubuntu-latest, linux/amd64, amd64) (push) Blocked by required conditions
Build and Push Docker Images / build (./surfsense_web, ./surfsense_web/Dockerfile, web, surfsense-web, ubuntu-24.04-arm, linux/arm64, arm64) (push) Blocked by required conditions
Build and Push Docker Images / build (./surfsense_web, ./surfsense_web/Dockerfile, web, surfsense-web, ubuntu-latest, linux/amd64, amd64) (push) Blocked by required conditions
Build and Push Docker Images / create_manifest (backend, surfsense-backend) (push) Blocked by required conditions
Build and Push Docker Images / create_manifest (web, surfsense-web) (push) Blocked by required conditions
feat: add processing mode support for document uploads and ETL pipeline, improved error handling ux
- Introduced a `ProcessingMode` enum to differentiate between basic and premium processing modes.
- Updated `EtlRequest` to include a `processing_mode` field, defaulting to basic.
- Enhanced ETL pipeline services to utilize the selected processing mode for Azure Document Intelligence and LlamaCloud parsing.
- Modified various routes and services to handle processing mode, affecting document upload and indexing tasks.
- Improved error handling and logging to include processing mode details.
- Added tests to validate processing mode functionality and its impact on ETL operations.
2026-04-14 21:26:00 -07:00

244 lines
6.5 KiB
TypeScript

import { documentsApiService } from "@/lib/apis/documents-api.service";
// Upper bound on the combined byte size of one upload batch; files at or
// above this size are sent in a batch of their own (see buildBatches).
const MAX_BATCH_SIZE_BYTES = 20 * 1024 * 1024; // 20 MB
// Maximum number of files per upload batch.
const MAX_BATCH_FILES = 10;
// Number of batch uploads allowed in flight at once.
const UPLOAD_CONCURRENCY = 3;
/** Progress snapshot reported via `FolderSyncParams.onProgress` while a folder sync runs. */
export interface FolderSyncProgress {
/** Current phase of the sync pipeline. */
phase: "listing" | "checking" | "uploading" | "finalizing" | "done";
/** Number of files uploaded so far (0 before the "uploading" phase). */
uploaded: number;
/**
 * Denominator for the phase: 0 while listing, the full listed file count
 * while checking, then the count of files that actually need upload.
 */
total: number;
}
/** Inputs for {@link uploadFolderScan}. */
export interface FolderSyncParams {
/** Local filesystem path of the folder to scan (passed to the Electron file lister). */
folderPath: string;
/** Folder name sent to the backend alongside uploads and sync calls. */
folderName: string;
/** Backend search space the documents belong to. */
searchSpaceId: number;
/** Patterns forwarded to the Electron lister to exclude paths from the scan. */
excludePatterns: string[];
/** File extensions forwarded to the Electron lister — presumably a whitelist; verify against listFolderFiles. */
fileExtensions: string[];
/** Whether the backend should generate summaries for uploaded documents. */
enableSummary: boolean;
/** Document processing tier forwarded to the upload endpoint (defaults handled server-side). */
processingMode?: "basic" | "premium";
/** Existing backend root folder id; when absent, the first upload batch creates one. */
rootFolderId?: number | null;
/** Callback invoked at each phase transition and after each uploaded batch. */
onProgress?: (progress: FolderSyncProgress) => void;
/** Aborts the sync; checked between phases and passed through to upload requests. */
signal?: AbortSignal;
}
/**
 * Partition file entries into upload batches, keeping each batch below
 * MAX_BATCH_SIZE_BYTES in combined size and at most MAX_BATCH_FILES long.
 * A file at or above the size limit always travels in a batch of its own.
 * Input order is preserved across batches.
 */
function buildBatches(entries: FolderFileEntry[]): FolderFileEntry[][] {
	const result: FolderFileEntry[][] = [];
	let pending: FolderFileEntry[] = [];
	let pendingBytes = 0;

	// Close out the in-progress batch, if it has anything in it.
	const flush = () => {
		if (pending.length > 0) {
			result.push(pending);
			pending = [];
			pendingBytes = 0;
		}
	};

	for (const file of entries) {
		// Oversized file: seal whatever is pending, then emit it alone.
		if (file.size >= MAX_BATCH_SIZE_BYTES) {
			flush();
			result.push([file]);
			continue;
		}
		const wouldOverflow =
			pending.length >= MAX_BATCH_FILES || pendingBytes + file.size > MAX_BATCH_SIZE_BYTES;
		if (wouldOverflow) {
			flush();
		}
		pending.push(file);
		pendingBytes += file.size;
	}
	flush();
	return result;
}
/**
 * Upload pre-built file batches using a small worker pool: up to
 * UPLOAD_CONCURRENCY workers pull batch indices from a shared counter.
 *
 * Each batch is read from disk via the Electron bridge, wrapped into File
 * objects, and sent to the backend folder-upload endpoint. Failed batches
 * are collected and logged, not thrown — the sync is best-effort per batch.
 * Aborting via `signal` stops workers at the next batch boundary.
 *
 * Returns the resolved root folder id: the one passed in, or the id the
 * backend reported for the first batch that created the root folder.
 *
 * NOTE(review): when `rootFolderId` starts null, the first up-to-UPLOAD_CONCURRENCY
 * batches may all run before `resolvedRootFolderId` is set, each sending
 * root_folder_id: null — verify the backend dedupes root folder creation
 * (e.g. by folder_name + search_space_id), otherwise this races.
 */
async function uploadBatchesWithConcurrency(
batches: FolderFileEntry[][],
params: {
folderName: string;
searchSpaceId: number;
rootFolderId: number | null;
enableSummary: boolean;
processingMode?: "basic" | "premium";
signal?: AbortSignal;
onBatchComplete?: (filesInBatch: number) => void;
}
): Promise<number | null> {
const api = window.electronAPI;
if (!api) throw new Error("Electron API not available");
// Shared work-queue cursor; each worker atomically claims the next batch.
let batchIdx = 0;
let resolvedRootFolderId = params.rootFolderId;
const errors: string[] = [];
// One worker: loop claiming batches until the queue is drained or aborted.
async function processNext(): Promise<void> {
while (true) {
if (params.signal?.aborted) return;
const idx = batchIdx++;
if (idx >= batches.length) return;
const batch = batches[idx];
const fullPaths = batch.map((e) => e.fullPath);
try {
// Read file contents through the Electron main process.
const fileDataArr = await api.readLocalFiles(fullPaths);
const files: File[] = fileDataArr.map((fd) => {
const blob = new Blob([fd.data], { type: fd.mimeType || "application/octet-stream" });
return new File([blob], fd.name, { type: blob.type });
});
const result = await documentsApiService.folderUploadFiles(
files,
{
folder_name: params.folderName,
search_space_id: params.searchSpaceId,
relative_paths: batch.map((e) => e.relativePath),
root_folder_id: resolvedRootFolderId,
enable_summary: params.enableSummary,
processing_mode: params.processingMode,
},
params.signal
);
// First batch to create the root folder wins; later batches reuse its id.
if (result.root_folder_id && !resolvedRootFolderId) {
resolvedRootFolderId = result.root_folder_id;
}
params.onBatchComplete?.(batch.length);
} catch (err) {
// An abort mid-request is expected; don't record it as a failure.
if (params.signal?.aborted) return;
const msg = (err as Error)?.message || "Upload failed";
errors.push(`Batch ${idx}: ${msg}`);
}
}
}
// Never spawn more workers than there are batches.
const workers = Array.from({ length: Math.min(UPLOAD_CONCURRENCY, batches.length) }, () =>
processNext()
);
await Promise.all(workers);
// Deliberate best-effort: failures are surfaced in the console only.
if (errors.length > 0 && !params.signal?.aborted) {
console.error("Some batches failed:", errors);
}
return resolvedRootFolderId;
}
/**
 * Run a full upload-based folder scan: list files, mtime-check, upload
 * changed files in parallel batches, and finalize (delete orphans).
 *
 * Phases, each reported through `params.onProgress`:
 *   1. "listing"    — enumerate files via the Electron bridge.
 *   2. "checking"   — ask the backend which files are new/changed (mtime compare).
 *   3. "uploading"  — upload only the changed files, in concurrent batches.
 *   4. "finalizing" — backend reconciles against the full path list.
 *   5. "done".
 *
 * Throws a DOMException("AbortError") when `params.signal` aborts between
 * phases; an abort mid-request surfaces from the underlying call instead.
 *
 * Returns the root_folder_id to pass to addWatchedFolder.
 */
export async function uploadFolderScan(params: FolderSyncParams): Promise<number | null> {
const api = window.electronAPI;
if (!api) throw new Error("Electron API not available");
const {
folderPath,
folderName,
searchSpaceId,
excludePatterns,
fileExtensions,
enableSummary,
processingMode,
signal,
} = params;
let rootFolderId = params.rootFolderId ?? null;
params.onProgress?.({ phase: "listing", uploaded: 0, total: 0 });
if (signal?.aborted) throw new DOMException("Aborted", "AbortError");
const allFiles = await api.listFolderFiles({
path: folderPath,
name: folderName,
excludePatterns,
fileExtensions,
rootFolderId: rootFolderId ?? null,
searchSpaceId,
active: true,
});
if (signal?.aborted) throw new DOMException("Aborted", "AbortError");
params.onProgress?.({ phase: "checking", uploaded: 0, total: allFiles.length });
// Electron reports mtimes in milliseconds; the backend expects epoch seconds.
const mtimeCheckResult = await documentsApiService.folderMtimeCheck({
folder_name: folderName,
search_space_id: searchSpaceId,
files: allFiles.map((f) => ({ relative_path: f.relativePath, mtime: f.mtimeMs / 1000 })),
});
// Only re-upload the files the backend flagged as new or modified.
const filesToUpload = mtimeCheckResult.files_to_upload;
const uploadSet = new Set(filesToUpload);
const entriesToUpload = allFiles.filter((f) => uploadSet.has(f.relativePath));
if (signal?.aborted) throw new DOMException("Aborted", "AbortError");
if (entriesToUpload.length > 0) {
const batches = buildBatches(entriesToUpload);
let uploaded = 0;
params.onProgress?.({ phase: "uploading", uploaded: 0, total: entriesToUpload.length });
const uploadedRootId = await uploadBatchesWithConcurrency(batches, {
folderName,
searchSpaceId,
rootFolderId: rootFolderId ?? null,
enableSummary,
processingMode,
signal,
onBatchComplete: (count) => {
uploaded += count;
params.onProgress?.({ phase: "uploading", uploaded, total: entriesToUpload.length });
},
});
if (signal?.aborted) throw new DOMException("Aborted", "AbortError");
// The upload may have created the root folder on the backend; adopt its id.
if (uploadedRootId) {
rootFolderId = uploadedRootId;
}
}
if (signal?.aborted) throw new DOMException("Aborted", "AbortError");
params.onProgress?.({
phase: "finalizing",
uploaded: entriesToUpload.length,
total: entriesToUpload.length,
});
// Send the complete local path list so the backend can drop documents for
// files that no longer exist (the "delete orphans" step).
await documentsApiService.folderSyncFinalize({
folder_name: folderName,
search_space_id: searchSpaceId,
root_folder_id: rootFolderId ?? null,
all_relative_paths: allFiles.map((f) => f.relativePath),
});
params.onProgress?.({
phase: "done",
uploaded: entriesToUpload.length,
total: entriesToUpload.length,
});
// Seed the Electron mtime store so the reconciliation scan in
// startWatcher won't re-emit events for files we just indexed.
if (api.seedFolderMtimes) {
const mtimes: Record<string, number> = {};
for (const f of allFiles) {
mtimes[f.relativePath] = f.mtimeMs;
}
await api.seedFolderMtimes(folderPath, mtimes);
}
return rootFolderId;
}