mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-05-25 18:55:19 +02:00
fix gmail sync
This commit is contained in:
parent
25a1976394
commit
4db42d17cf
1 changed files with 109 additions and 10 deletions
|
|
@ -28,6 +28,7 @@ const CACHE_DIR = path.join(WorkDir, 'inbox_lists');
|
|||
const SYNC_INTERVAL_MS = 30 * 1000; // Check every 30 seconds
|
||||
const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly';
|
||||
const MAX_THREADS_IN_DIGEST = 10;
|
||||
const RECENT_BACKFILL_INTERVAL_MS = 15 * 60 * 1000;
|
||||
const nhm = new NodeHtmlMarkdown();
|
||||
|
||||
interface SnapshotCacheEntry {
|
||||
|
|
@ -713,7 +714,9 @@ async function processThread(auth: OAuth2Client, threadId: string, syncDir: stri
|
|||
|
||||
} catch (error) {
|
||||
console.error(`Error processing thread ${threadId}:`, error);
|
||||
return null;
|
||||
const status = getErrorStatus(error);
|
||||
if (status === 404) return null;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -757,20 +760,102 @@ async function pruneInboxCache(auth: OAuth2Client): Promise<void> {
|
|||
}
|
||||
}
|
||||
|
||||
function loadState(stateFile: string): { historyId?: string; last_sync?: string } {
|
||||
function loadState(stateFile: string): { historyId?: string; last_sync?: string; last_recent_backfill?: string } {
|
||||
if (fs.existsSync(stateFile)) {
|
||||
return JSON.parse(fs.readFileSync(stateFile, 'utf-8'));
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
function saveState(historyId: string, stateFile: string) {
|
||||
function saveState(historyId: string, stateFile: string, extra: { last_recent_backfill?: string } = {}) {
|
||||
const previous = loadState(stateFile);
|
||||
fs.writeFileSync(stateFile, JSON.stringify({
|
||||
historyId,
|
||||
last_sync: new Date().toISOString()
|
||||
last_sync: new Date().toISOString(),
|
||||
last_recent_backfill: extra.last_recent_backfill ?? previous.last_recent_backfill,
|
||||
...extra,
|
||||
}, null, 2));
|
||||
}
|
||||
|
||||
function getErrorStatus(error: unknown): number | undefined {
|
||||
const status = (error as { response?: { status?: number } }).response?.status;
|
||||
if (status) return status;
|
||||
const code = Number((error as { code?: number | string }).code);
|
||||
return Number.isFinite(code) ? code : undefined;
|
||||
}
|
||||
|
||||
function recentDateQuery(lookbackDays: number): string {
|
||||
const pastDate = new Date();
|
||||
pastDate.setDate(pastDate.getDate() - lookbackDays);
|
||||
return pastDate.toISOString().split('T')[0].replace(/-/g, '/');
|
||||
}
|
||||
|
||||
async function listRecentNonDeletedThreadIds(gmailClient: gmail.Gmail, lookbackDays: number): Promise<RecentThreadInfo[]> {
|
||||
const dateQuery = recentDateQuery(lookbackDays);
|
||||
const results: RecentThreadInfo[] = [];
|
||||
const seen = new Set<string>();
|
||||
let pageToken: string | undefined;
|
||||
|
||||
do {
|
||||
const res = await gmailClient.users.threads.list({
|
||||
userId: 'me',
|
||||
q: `after:${dateQuery} -in:spam -in:trash`,
|
||||
maxResults: 500,
|
||||
pageToken,
|
||||
});
|
||||
for (const thread of res.data.threads || []) {
|
||||
if (!thread.id || seen.has(thread.id)) continue;
|
||||
seen.add(thread.id);
|
||||
results.push({
|
||||
threadId: thread.id,
|
||||
historyId: thread.historyId || '',
|
||||
snippet: thread.snippet || undefined,
|
||||
});
|
||||
}
|
||||
pageToken = res.data.nextPageToken ?? undefined;
|
||||
} while (pageToken);
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
function shouldRunRecentBackfill(stateFile: string): boolean {
|
||||
const state = loadState(stateFile);
|
||||
if (!state.last_recent_backfill) return true;
|
||||
const lastRunMs = new Date(state.last_recent_backfill).getTime();
|
||||
if (!Number.isFinite(lastRunMs)) return true;
|
||||
return Date.now() - lastRunMs >= RECENT_BACKFILL_INTERVAL_MS;
|
||||
}
|
||||
|
||||
async function backfillMissingRecentThreads(
|
||||
auth: OAuth2Client,
|
||||
syncDir: string,
|
||||
attachmentsDir: string,
|
||||
stateFile: string,
|
||||
lookbackDays: number,
|
||||
): Promise<SyncedThread[]> {
|
||||
if (!shouldRunRecentBackfill(stateFile)) return [];
|
||||
|
||||
const gmailClient = google.gmail({ version: 'v1', auth });
|
||||
const recentThreads = await listRecentNonDeletedThreadIds(gmailClient, lookbackDays);
|
||||
const missingThreadIds = recentThreads
|
||||
.map((thread) => thread.threadId)
|
||||
.filter((threadId) => !fs.existsSync(path.join(syncDir, `${threadId}.md`)));
|
||||
|
||||
const synced: SyncedThread[] = [];
|
||||
for (const threadId of missingThreadIds) {
|
||||
const result = await processThread(auth, threadId, syncDir, attachmentsDir);
|
||||
if (result) synced.push(result);
|
||||
}
|
||||
|
||||
const profile = await gmailClient.users.getProfile({ userId: 'me' });
|
||||
saveState(profile.data.historyId!, stateFile, { last_recent_backfill: new Date().toISOString() });
|
||||
|
||||
if (missingThreadIds.length > 0) {
|
||||
console.log(`Recent Gmail backfill synced ${synced.length}/${missingThreadIds.length} missing thread(s).`);
|
||||
}
|
||||
return synced;
|
||||
}
|
||||
|
||||
async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: string, stateFile: string, lookbackDays: number) {
|
||||
const gmail = google.gmail({ version: 'v1', auth });
|
||||
|
||||
|
|
@ -814,6 +899,7 @@ async function fullSync(auth: OAuth2Client, syncDir: string, attachmentsDir: str
|
|||
const res = await gmail.users.threads.list({
|
||||
userId: 'me',
|
||||
q: `after:${dateQuery} -in:spam -in:trash`,
|
||||
maxResults: 500,
|
||||
pageToken
|
||||
});
|
||||
|
||||
|
|
@ -907,15 +993,24 @@ async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir:
|
|||
};
|
||||
|
||||
try {
|
||||
const res = await gmail.users.history.list({
|
||||
userId: 'me',
|
||||
startHistoryId,
|
||||
historyTypes: ['messageAdded']
|
||||
});
|
||||
const changes: gmail.Schema$History[] = [];
|
||||
let pageToken: string | undefined;
|
||||
do {
|
||||
const res = await gmail.users.history.list({
|
||||
userId: 'me',
|
||||
startHistoryId,
|
||||
historyTypes: ['messageAdded'],
|
||||
maxResults: 500,
|
||||
pageToken,
|
||||
});
|
||||
if (res.data.history) changes.push(...res.data.history);
|
||||
pageToken = res.data.nextPageToken ?? undefined;
|
||||
} while (pageToken);
|
||||
|
||||
const changes = res.data.history;
|
||||
if (!changes || changes.length === 0) {
|
||||
console.log("No new changes.");
|
||||
const backfilled = await backfillMissingRecentThreads(auth, syncDir, attachmentsDir, stateFile, lookbackDays);
|
||||
await publishGmailSyncEvent(backfilled);
|
||||
const profile = await gmail.users.getProfile({ userId: 'me' });
|
||||
saveState(profile.data.historyId!, stateFile);
|
||||
return;
|
||||
|
|
@ -937,6 +1032,8 @@ async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir:
|
|||
}
|
||||
|
||||
if (threadIds.size === 0) {
|
||||
const backfilled = await backfillMissingRecentThreads(auth, syncDir, attachmentsDir, stateFile, lookbackDays);
|
||||
await publishGmailSyncEvent(backfilled);
|
||||
const profile = await gmail.users.getProfile({ userId: 'me' });
|
||||
saveState(profile.data.historyId!, stateFile);
|
||||
return;
|
||||
|
|
@ -961,6 +1058,8 @@ async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir:
|
|||
const result = await processThread(auth, tid, syncDir, attachmentsDir);
|
||||
if (result) synced.push(result);
|
||||
}
|
||||
const backfilled = await backfillMissingRecentThreads(auth, syncDir, attachmentsDir, stateFile, lookbackDays);
|
||||
synced.push(...backfilled);
|
||||
|
||||
await publishGmailSyncEvent(synced);
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue