feat: implement pending file event handling using durable queue with acknowledgment support in folder synchronization

This commit is contained in:
Anish Sarkar 2026-04-03 00:40:49 +05:30
parent b46c5532b3
commit e0b35cfbab
6 changed files with 175 additions and 35 deletions

View file

@ -17,6 +17,8 @@ export const IPC_CHANNELS = {
FOLDER_SYNC_PAUSE: 'folder-sync:pause',
FOLDER_SYNC_RESUME: 'folder-sync:resume',
FOLDER_SYNC_RENDERER_READY: 'folder-sync:renderer-ready',
FOLDER_SYNC_GET_PENDING_EVENTS: 'folder-sync:get-pending-events',
FOLDER_SYNC_ACK_EVENTS: 'folder-sync:ack-events',
BROWSE_FILE_OR_FOLDER: 'browse:file-or-folder',
READ_LOCAL_FILES: 'browse:read-local-files',
} as const;

View file

@ -6,6 +6,8 @@ import {
removeWatchedFolder,
getWatchedFolders,
getWatcherStatus,
getPendingFileEvents,
acknowledgeFileEvents,
pauseWatcher,
resumeWatcher,
markRendererReady,
@ -52,6 +54,14 @@ export function registerIpcHandlers(): void {
markRendererReady();
});
ipcMain.handle(IPC_CHANNELS.FOLDER_SYNC_GET_PENDING_EVENTS, () =>
getPendingFileEvents()
);
ipcMain.handle(IPC_CHANNELS.FOLDER_SYNC_ACK_EVENTS, (_event, eventIds: string[]) =>
acknowledgeFileEvents(eventIds)
);
ipcMain.handle(IPC_CHANNELS.BROWSE_FILE_OR_FOLDER, () => browseFileOrFolder());
ipcMain.handle(IPC_CHANNELS.READ_LOCAL_FILES, (_event, paths: string[]) =>

View file

@ -1,5 +1,6 @@
import { BrowserWindow, dialog } from 'electron';
import chokidar, { type FSWatcher } from 'chokidar';
import { randomUUID } from 'crypto';
import * as path from 'path';
import * as fs from 'fs';
import { IPC_CHANNELS } from '../ipc/channels';
@ -20,12 +21,27 @@ interface WatcherEntry {
}
type MtimeMap = Record<string, number>;
type FolderSyncAction = 'add' | 'change' | 'unlink';
export interface FolderSyncFileChangedEvent {
id: string;
rootFolderId: number | null;
searchSpaceId: number;
folderPath: string;
folderName: string;
relativePath: string;
fullPath: string;
action: FolderSyncAction;
timestamp: number;
}
const STORE_KEY = 'watchedFolders';
const OUTBOX_STORE_KEY = 'events';
const MTIME_TOLERANCE_S = 1.0;
let store: any = null;
let mtimeStore: any = null;
let outboxStore: any = null;
let watchers: Map<string, WatcherEntry> = new Map();
/**
@ -35,22 +51,11 @@ let watchers: Map<string, WatcherEntry> = new Map();
const mtimeMaps: Map<string, MtimeMap> = new Map();
let rendererReady = false;
const pendingEvents: any[] = [];
const outboxEvents: Map<string, FolderSyncFileChangedEvent> = new Map();
let outboxLoaded = false;
export function markRendererReady() {
rendererReady = true;
for (const event of pendingEvents) {
sendToRenderer(IPC_CHANNELS.FOLDER_SYNC_FILE_CHANGED, event);
}
pendingEvents.length = 0;
}
function sendFileChangedEvent(data: any) {
if (rendererReady) {
sendToRenderer(IPC_CHANNELS.FOLDER_SYNC_FILE_CHANGED, data);
} else {
pendingEvents.push(data);
}
}
async function getStore() {
@ -77,6 +82,57 @@ async function getMtimeStore() {
return mtimeStore;
}
async function getOutboxStore() {
if (!outboxStore) {
const { default: Store } = await import('electron-store');
outboxStore = new Store({
name: 'folder-sync-outbox',
defaults: {
[OUTBOX_STORE_KEY]: [] as FolderSyncFileChangedEvent[],
},
});
}
return outboxStore;
}
function makeEventKey(event: Pick<FolderSyncFileChangedEvent, 'folderPath' | 'relativePath'>): string {
return `${event.folderPath}:${event.relativePath}`;
}
function persistOutbox() {
getOutboxStore().then((s) => {
s.set(OUTBOX_STORE_KEY, Array.from(outboxEvents.values()));
});
}
async function loadOutbox() {
if (outboxLoaded) return;
const s = await getOutboxStore();
const stored: FolderSyncFileChangedEvent[] = s.get(OUTBOX_STORE_KEY, []);
outboxEvents.clear();
for (const event of stored) {
if (!event?.id || !event.folderPath || !event.relativePath) continue;
outboxEvents.set(makeEventKey(event), event);
}
outboxLoaded = true;
}
function sendFileChangedEvent(
data: Omit<FolderSyncFileChangedEvent, 'id'>
) {
const event: FolderSyncFileChangedEvent = {
id: randomUUID(),
...data,
};
outboxEvents.set(makeEventKey(event), event);
persistOutbox();
if (rendererReady) {
sendToRenderer(IPC_CHANNELS.FOLDER_SYNC_FILE_CHANGED, event);
}
}
function loadMtimeMap(folderPath: string): MtimeMap {
return mtimeMaps.get(folderPath) ?? {};
}
@ -235,7 +291,7 @@ async function startWatcher(config: WatchedFolderConfig) {
});
});
const handleFileEvent = (filePath: string, action: string) => {
const handleFileEvent = (filePath: string, action: FolderSyncAction) => {
if (!ready) return;
const relativePath = path.relative(config.path, filePath);
@ -357,6 +413,32 @@ export async function getWatcherStatus(): Promise<
}));
}
export async function getPendingFileEvents(): Promise<FolderSyncFileChangedEvent[]> {
await loadOutbox();
return Array.from(outboxEvents.values()).sort((a, b) => a.timestamp - b.timestamp);
}
export async function acknowledgeFileEvents(eventIds: string[]): Promise<{ acknowledged: number }> {
if (!eventIds || eventIds.length === 0) return { acknowledged: 0 };
await loadOutbox();
const ackSet = new Set(eventIds);
let acknowledged = 0;
for (const [key, event] of outboxEvents.entries()) {
if (ackSet.has(event.id)) {
outboxEvents.delete(key);
acknowledged += 1;
}
}
if (acknowledged > 0) {
persistOutbox();
}
return { acknowledged };
}
export async function pauseWatcher(): Promise<void> {
for (const [, entry] of watchers) {
if (entry.watcher) {
@ -375,6 +457,7 @@ export async function resumeWatcher(): Promise<void> {
}
export async function registerFolderWatcher(): Promise<void> {
await loadOutbox();
const s = await getStore();
const folders: WatchedFolderConfig[] = s.get(STORE_KEY, []);

View file

@ -45,6 +45,8 @@ contextBridge.exposeInMainWorld('electronAPI', {
pauseWatcher: () => ipcRenderer.invoke(IPC_CHANNELS.FOLDER_SYNC_PAUSE),
resumeWatcher: () => ipcRenderer.invoke(IPC_CHANNELS.FOLDER_SYNC_RESUME),
signalRendererReady: () => ipcRenderer.invoke(IPC_CHANNELS.FOLDER_SYNC_RENDERER_READY),
getPendingFileEvents: () => ipcRenderer.invoke(IPC_CHANNELS.FOLDER_SYNC_GET_PENDING_EVENTS),
acknowledgeFileEvents: (eventIds: string[]) => ipcRenderer.invoke(IPC_CHANNELS.FOLDER_SYNC_ACK_EVENTS, eventIds),
// Unified browse (files + folders)
browseFileOrFolder: () => ipcRenderer.invoke(IPC_CHANNELS.BROWSE_FILE_OR_FOLDER),

View file

@ -4,6 +4,7 @@ import { useEffect, useRef } from "react";
import { documentsApiService } from "@/lib/apis/documents-api.service";
interface FileChangedEvent {
id: string;
rootFolderId: number | null;
searchSpaceId: number;
folderPath: string;
@ -15,25 +16,35 @@ interface FileChangedEvent {
}
const DEBOUNCE_MS = 2000;
interface QueueItem {
event: FileChangedEvent;
ackIds: string[];
}
export function useFolderSync() {
const queueRef = useRef<FileChangedEvent[]>([]);
const queueRef = useRef<QueueItem[]>([]);
const processingRef = useRef(false);
const debounceTimers = useRef<Map<string, ReturnType<typeof setTimeout>>>(new Map());
const pendingByKey = useRef<Map<string, QueueItem>>(new Map());
const isMountedRef = useRef(false);
async function processQueue() {
if (processingRef.current) return;
processingRef.current = true;
while (queueRef.current.length > 0) {
const event = queueRef.current.shift()!;
const item = queueRef.current.shift()!;
try {
await documentsApiService.folderIndexFile(event.searchSpaceId, {
folder_path: event.folderPath,
folder_name: event.folderName,
search_space_id: event.searchSpaceId,
target_file_path: event.fullPath,
root_folder_id: event.rootFolderId,
await documentsApiService.folderIndexFile(item.event.searchSpaceId, {
folder_path: item.event.folderPath,
folder_name: item.event.folderName,
search_space_id: item.event.searchSpaceId,
target_file_path: item.event.fullPath,
root_folder_id: item.event.rootFolderId,
});
const api = typeof window !== "undefined" ? window.electronAPI : null;
if (api?.acknowledgeFileEvents && item.ackIds.length > 0) {
await api.acknowledgeFileEvents(item.ackIds);
}
} catch (err) {
console.error("[FolderSync] Failed to trigger re-index:", err);
}
@ -41,34 +52,63 @@ export function useFolderSync() {
processingRef.current = false;
}
function enqueueWithDebounce(event: FileChangedEvent) {
const key = `${event.folderPath}:${event.relativePath}`;
const existing = pendingByKey.current.get(key);
const ackSet = new Set(existing?.ackIds ?? []);
ackSet.add(event.id);
pendingByKey.current.set(key, {
event,
ackIds: Array.from(ackSet),
});
const existingTimeout = debounceTimers.current.get(key);
if (existingTimeout) clearTimeout(existingTimeout);
const timeout = setTimeout(() => {
debounceTimers.current.delete(key);
const pending = pendingByKey.current.get(key);
if (!pending) return;
pendingByKey.current.delete(key);
queueRef.current.push(pending);
processQueue();
}, DEBOUNCE_MS);
debounceTimers.current.set(key, timeout);
}
useEffect(() => {
isMountedRef.current = true;
const api = typeof window !== "undefined" ? window.electronAPI : null;
if (!api?.onFileChanged) return;
if (!api?.onFileChanged) {
return () => {
isMountedRef.current = false;
};
}
// Signal to main process that the renderer is ready to receive events
api.signalRendererReady?.();
// Drain durable outbox first so events survive renderer startup gaps and restarts
void api.getPendingFileEvents?.().then((pendingEvents) => {
if (!isMountedRef.current || !pendingEvents?.length) return;
for (const event of pendingEvents) {
enqueueWithDebounce(event);
}
});
const cleanup = api.onFileChanged((event: FileChangedEvent) => {
const key = `${event.folderPath}:${event.fullPath}`;
const existing = debounceTimers.current.get(key);
if (existing) clearTimeout(existing);
const timeout = setTimeout(() => {
debounceTimers.current.delete(key);
queueRef.current.push(event);
processQueue();
}, DEBOUNCE_MS);
debounceTimers.current.set(key, timeout);
enqueueWithDebounce(event);
});
return () => {
isMountedRef.current = false;
cleanup();
for (const timeout of debounceTimers.current.values()) {
clearTimeout(timeout);
}
debounceTimers.current.clear();
pendingByKey.current.clear();
};
}, []);
}

View file

@ -11,6 +11,7 @@ interface WatchedFolderConfig {
}
interface FolderSyncFileChangedEvent {
id: string;
rootFolderId: number | null;
searchSpaceId: number;
folderPath: string;
@ -63,6 +64,8 @@ interface ElectronAPI {
pauseWatcher: () => Promise<void>;
resumeWatcher: () => Promise<void>;
signalRendererReady: () => Promise<void>;
getPendingFileEvents: () => Promise<FolderSyncFileChangedEvent[]>;
acknowledgeFileEvents: (eventIds: string[]) => Promise<{ acknowledged: number }>;
// Unified browse
browseFileOrFolder: () => Promise<BrowseResult | null>;
readLocalFiles: (paths: string[]) => Promise<LocalFileData[]>;