integrate knowledge sync with oauth

This commit is contained in:
Ramnique Singh 2026-01-06 06:57:03 +05:30
parent dfe940d0ba
commit 41eef3c1b5
3 changed files with 324 additions and 216 deletions

View file

@ -1,97 +1,126 @@
import fs from 'fs';
import path from 'path';
import { google, calendar_v3 as cal, drive_v3 as drive } from 'googleapis';
import { authenticate } from '@google-cloud/local-auth';
import { OAuth2Client } from 'google-auth-library';
import { NodeHtmlMarkdown } from 'node-html-markdown'
import container from '../di/container.js';
import { IOAuthRepo } from '../auth/repo.js';
import { getProviderConfig } from '../auth/providers.js';
import { createOAuthService } from '../auth/oauth.js';
import { WorkDir } from '../config/config.js';
import { OAuthTokens } from 'packages/shared/dist/auth.js';
// Configuration
const CREDENTIALS_PATH = path.join(process.cwd(), 'credentials.json');
const TOKEN_PATH = path.join(process.cwd(), 'token_calendar_notes.json'); // Changed to force re-auth with new scopes
const SYNC_INTERVAL_MS = 60 * 1000;
const SCOPES = [
const SYNC_DIR = path.join(WorkDir, 'calendar_sync');
const SYNC_INTERVAL_MS = 60 * 1000; // Check every minute
const LOOKBACK_DAYS = 14;
const REQUIRED_SCOPES = [
'https://www.googleapis.com/auth/calendar.readonly',
'https://www.googleapis.com/auth/drive.readonly'
];
const PROVIDER_NAME = 'google';
const nhm = new NodeHtmlMarkdown();
// --- Auth Functions ---
async function loadSavedCredentialsIfExist(): Promise<OAuth2Client | null> {
try {
if (!fs.existsSync(TOKEN_PATH)) return null;
const tokenContent = fs.readFileSync(TOKEN_PATH, 'utf-8');
const tokenData = JSON.parse(tokenContent);
/**
* Get OAuth repository from DI container
*/
function getOAuthRepo(): IOAuthRepo {
return container.resolve<IOAuthRepo>('oauthRepo');
}
const credsContent = fs.readFileSync(CREDENTIALS_PATH, 'utf-8');
const keys = JSON.parse(credsContent);
const key = keys.installed || keys.web;
/**
* Check if all required scopes are present in the granted scopes
*/
function hasRequiredScopes(grantedScopes?: string[]): boolean {
if (!grantedScopes || grantedScopes.length === 0) {
return false;
}
// Check if all required scopes are present
return REQUIRED_SCOPES.every(scope => grantedScopes.includes(scope));
}
const client = new google.auth.OAuth2(
key.client_id,
key.client_secret,
key.redirect_uris ? key.redirect_uris[0] : 'http://localhost'
);
/**
* Convert OAuthTokens to OAuth2Client for use with googleapis
*/
async function createOAuth2Client(): Promise<OAuth2Client | null> {
const oauthRepo = getOAuthRepo();
const tokens = await oauthRepo.getTokens(PROVIDER_NAME);
client.setCredentials({
refresh_token: tokenData.refresh_token || tokenData.refreshToken,
access_token: tokenData.token || tokenData.access_token,
expiry_date: tokenData.expiry || tokenData.expiry_date,
scope: tokenData.scope
});
return client;
} catch (err) {
console.error("Error loading saved credentials:", err);
if (!tokens) {
return null;
}
}
async function saveCredentials(client: OAuth2Client) {
const content = fs.readFileSync(CREDENTIALS_PATH, 'utf-8');
const keys = JSON.parse(content);
const key = keys.installed || keys.web;
const payload = JSON.stringify({
type: 'authorized_user',
client_id: key.client_id,
client_secret: key.client_secret,
refresh_token: client.credentials.refresh_token,
access_token: client.credentials.access_token,
expiry_date: client.credentials.expiry_date,
}, null, 2);
fs.writeFileSync(TOKEN_PATH, payload);
}
// Check if token is expired
const now = Math.floor(Date.now() / 1000);
if (tokens.expires_at <= now) {
// Token expired, try to refresh
if (!tokens.refresh_token) {
console.log("Token expired and no refresh token available.");
return null;
}
async function authorize(): Promise<OAuth2Client> {
let client: OAuth2Client | null = await loadSavedCredentialsIfExist();
if (client && client.credentials && client.credentials.expiry_date && client.credentials.expiry_date > Date.now()) {
console.log("Using existing valid token.");
return client;
}
if (client && client.credentials && (!client.credentials.expiry_date || client.credentials.expiry_date <= Date.now()) && client.credentials.refresh_token) {
console.log("Refreshing expired token...");
try {
await client.refreshAccessToken();
await saveCredentials(client);
return client;
} catch (e) {
console.error("Failed to refresh token:", e);
if (fs.existsSync(TOKEN_PATH)) fs.unlinkSync(TOKEN_PATH);
const oauthService = createOAuthService(PROVIDER_NAME);
const existingScopes = tokens.scopes;
const refreshedTokens = await oauthService.refreshAccessToken(tokens.refresh_token, existingScopes);
await oauthRepo.saveTokens(PROVIDER_NAME, refreshedTokens);
// Use refreshed tokens
return createClientFromTokens(refreshedTokens);
} catch (error) {
console.error("Failed to refresh token:", error);
return null;
}
}
console.log("Performing new OAuth authentication...");
client = await authenticate({
scopes: SCOPES,
keyfilePath: CREDENTIALS_PATH,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
}) as any;
if (client && client.credentials) {
await saveCredentials(client);
return createClientFromTokens(tokens);
}
/**
* Create OAuth2Client from OAuthTokens
*/
function createClientFromTokens(tokens: OAuthTokens): OAuth2Client {
const providerConfig = getProviderConfig(PROVIDER_NAME);
// Create OAuth2Client directly (PKCE flow doesn't use client secret)
const client = new OAuth2Client(
providerConfig.clientId,
undefined, // client_secret not needed for PKCE
undefined // redirect_uri not needed for token usage
);
// Set credentials
client.setCredentials({
access_token: tokens.access_token,
refresh_token: tokens.refresh_token || undefined,
expiry_date: tokens.expires_at * 1000, // Convert from seconds to milliseconds
scope: tokens.scopes?.join(' ') || undefined,
});
return client;
}
/**
* Check if Google OAuth credentials are available with required scopes
*/
async function hasValidCredentials(): Promise<boolean> {
const oauthRepo = getOAuthRepo();
const isConnected = await oauthRepo.isConnected(PROVIDER_NAME);
if (!isConnected) {
return false;
}
return client!;
const tokens = await oauthRepo.getTokens(PROVIDER_NAME);
if (!tokens) {
return false;
}
// Check if all required scopes are present
return hasRequiredScopes(tokens.scopes);
}
// --- Helper Functions ---
@ -112,9 +141,9 @@ function cleanUpOldFiles(currentEventIds: Set<string>, syncDir: string) {
// We expect files like:
// {eventId}.json
// {eventId}_doc_{docId}.md
let eventId: string | null = null;
if (filename.endsWith('.json')) {
eventId = filename.replace('.json', '');
} else if (filename.endsWith('.md')) {
@ -144,7 +173,7 @@ async function saveEvent(event: cal.Schema$Event, syncDir: string): Promise<bool
if (!eventId) return false;
const filePath = path.join(syncDir, `${eventId}.json`);
try {
fs.writeFileSync(filePath, JSON.stringify(event, null, 2));
return true;
@ -165,20 +194,20 @@ async function processAttachments(drive: drive.Drive, event: cal.Schema$Event, s
for (const att of event.attachments) {
// We only care about Google Docs
if (att.mimeType === 'application/vnd.google-apps.document') {
const fileId = att.fileId;
const safeTitle = cleanFilename(att.title || 'Untitled');
// Unique filename linked to event
const filename = `${eventId}_doc_${safeTitle}.md`;
const filePath = path.join(syncDir, filename);
const fileId = att.fileId;
const safeTitle = cleanFilename(att.title || 'Untitled');
// Unique filename linked to event
const filename = `${eventId}_doc_${safeTitle}.md`;
const filePath = path.join(syncDir, filename);
// Simple cache check: if file exists, skip.
// Ideally we check modifiedTime, but that requires an extra API call per file.
// Given the loop interval, we can just check existence to save quota.
// If user updates notes, they might want them re-synced.
// For now, let's just check existence. To be smarter, we'd need a state file or check API.
if (fs.existsSync(filePath)) continue;
// Simple cache check: if file exists, skip.
// Ideally we check modifiedTime, but that requires an extra API call per file.
// Given the loop interval, we can just check existence to save quota.
// If user updates notes, they might want them re-synced.
// For now, let's just check existence. To be smarter, we'd need a state file or check API.
if (fs.existsSync(filePath)) continue;
try {
try {
const res = await drive.files.export({
fileId: fileId ?? '',
mimeType: 'text/html'
@ -199,9 +228,9 @@ async function processAttachments(drive: drive.Drive, event: cal.Schema$Event, s
fs.writeFileSync(filePath, frontmatter + md);
console.log(`Synced Note: ${att.title} for event ${eventTitle}`);
} catch (e) {
console.error(`Failed to download note ${att.title}:`, e);
}
} catch (e) {
console.error(`Failed to download note ${att.title}:`, e);
}
}
}
}
@ -249,39 +278,66 @@ async function syncCalendarWindow(auth: OAuth2Client, syncDir: string, lookbackD
} catch (error) {
console.error("An error occurred during calendar sync:", error);
}
}
async function main() {
console.log("Starting Google Calendar & Notes Sync (TS)...");
const syncDirArg = process.argv[2];
const lookbackDaysArg = process.argv[3];
const SYNC_DIR = syncDirArg || 'synced_calendar_events';
const LOOKBACK_DAYS = lookbackDaysArg ? parseInt(lookbackDaysArg, 10) : 14;
if (isNaN(LOOKBACK_DAYS) || LOOKBACK_DAYS <= 0) {
console.error("Error: Lookback days must be a positive number.");
process.exit(1);
}
if (!fs.existsSync(SYNC_DIR)) {
fs.mkdirSync(SYNC_DIR, { recursive: true });
}
try {
const auth = await authorize();
console.log("Authorization successful.");
while (true) {
await syncCalendarWindow(auth, SYNC_DIR, LOOKBACK_DAYS);
console.log(`Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`);
await new Promise(resolve => setTimeout(resolve, SYNC_INTERVAL_MS));
// If 401, clear tokens to force re-auth next run
const e = error as { response?: { status?: number } };
if (e.response?.status === 401) {
console.log("401 Unauthorized. Clearing tokens to force re-authentication.");
const oauthRepo = getOAuthRepo();
await oauthRepo.clearTokens(PROVIDER_NAME);
}
} catch (error) {
console.error("Fatal error in main loop:", error);
throw error; // Re-throw to be handled by performSync
}
}
main().catch(console.error);
async function performSync(syncDir: string, lookbackDays: number) {
try {
if (!fs.existsSync(SYNC_DIR)) {
fs.mkdirSync(SYNC_DIR, { recursive: true });
}
const auth = await createOAuth2Client();
if (!auth) {
console.log("No valid OAuth credentials available.");
return;
}
console.log("Authorization successful. Starting sync...");
await syncCalendarWindow(auth, syncDir, lookbackDays);
console.log("Sync completed.");
} catch (error) {
console.error("Error during sync:", error);
// If 401, clear tokens to force re-auth next run
const e = error as { response?: { status?: number } };
if (e.response?.status === 401) {
console.log("401 Unauthorized. Clearing tokens to force re-authentication.");
const oauthRepo = getOAuthRepo();
await oauthRepo.clearTokens(PROVIDER_NAME);
}
}
}
export async function init() {
console.log("Starting Google Calendar & Notes Sync (TS)...");
console.log(`Will check for credentials every ${SYNC_INTERVAL_MS / 1000} seconds.`);
while (true) {
try {
// Check if credentials are available with required scopes
const hasCredentials = await hasValidCredentials();
if (!hasCredentials) {
console.log("Google OAuth credentials not available or missing required Calendar/Drive scopes. Sleeping...");
} else {
// Perform one sync
await performSync(SYNC_DIR, LOOKBACK_DAYS);
}
} catch (error) {
console.error("Error in main loop:", error);
}
// Sleep for N minutes before next check
console.log(`Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`);
await new Promise(resolve => setTimeout(resolve, SYNC_INTERVAL_MS));
}
}

View file

@ -1,91 +1,120 @@
import fs from 'fs';
import path from 'path';
import { google, gmail_v1 as gmail } from 'googleapis';
import { authenticate } from '@google-cloud/local-auth';
import { NodeHtmlMarkdown } from 'node-html-markdown'
import { OAuth2Client } from 'google-auth-library';
import { WorkDir } from '../config/config.js';
import container from '../di/container.js';
import { IOAuthRepo } from '../auth/repo.js';
import { getProviderConfig } from '../auth/providers.js';
import { createOAuthService } from '../auth/oauth.js';
import { OAuthTokens } from 'packages/shared/dist/auth.js';
// Configuration
const DEFAULT_SYNC_DIR = 'synced_emails_ts';
const CREDENTIALS_PATH = path.join(process.cwd(), 'credentials.json');
const TOKEN_PATH = path.join(process.cwd(), 'token_api.json'); // Reuse Python's token
const SYNC_INTERVAL_MS = 60 * 1000;
const SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'];
const SYNC_DIR = path.join(WorkDir, 'gmail_sync');
const SYNC_INTERVAL_MS = 60 * 1000; // Check every minute
const REQUIRED_SCOPE = 'https://www.googleapis.com/auth/gmail.readonly';
const PROVIDER_NAME = 'google';
const nhm = new NodeHtmlMarkdown();
// --- Auth Functions ---
async function loadSavedCredentialsIfExist(): Promise<OAuth2Client | null> {
try {
const tokenContent = fs.readFileSync(TOKEN_PATH, 'utf-8');
const tokenData = JSON.parse(tokenContent);
/**
* Get OAuth repository from DI container
*/
function getOAuthRepo(): IOAuthRepo {
return container.resolve<IOAuthRepo>('oauthRepo');
}
const credsContent = fs.readFileSync(CREDENTIALS_PATH, 'utf-8');
const keys = JSON.parse(credsContent);
const key = keys.installed || keys.web;
/**
* Check if the required Gmail scope is present in the granted scopes
*/
function hasRequiredScope(grantedScopes?: string[]): boolean {
if (!grantedScopes || grantedScopes.length === 0) {
return false;
}
return grantedScopes.includes(REQUIRED_SCOPE);
}
// Manually construct credentials for google.auth.fromJSON
const credentials = {
type: 'authorized_user',
client_id: key.client_id,
client_secret: key.client_secret,
refresh_token: tokenData.refresh_token || tokenData.refreshToken, // Handle both cases
access_token: tokenData.token || tokenData.access_token, // Handle both cases
expiry_date: tokenData.expiry || tokenData.expiry_date
};
return google.auth.fromJSON(credentials) as OAuth2Client;
} catch (err) {
console.error("Error loading saved credentials:", err);
/**
* Convert OAuthTokens to OAuth2Client for use with googleapis
*/
async function createOAuth2Client(): Promise<OAuth2Client | null> {
const oauthRepo = getOAuthRepo();
const tokens = await oauthRepo.getTokens(PROVIDER_NAME);
if (!tokens) {
return null;
}
}
async function saveCredentials(client: OAuth2Client) {
const content = fs.readFileSync(CREDENTIALS_PATH, 'utf-8');
const keys = JSON.parse(content);
const key = keys.installed || keys.web;
const payload = JSON.stringify({
type: 'authorized_user',
client_id: key.client_id,
client_secret: key.client_secret,
refresh_token: client.credentials.refresh_token,
access_token: client.credentials.access_token,
expiry_date: client.credentials.expiry_date,
}, null, 2);
fs.writeFileSync(TOKEN_PATH, payload);
}
// Check if token is expired
const now = Math.floor(Date.now() / 1000);
if (tokens.expires_at <= now) {
// Token expired, try to refresh
if (!tokens.refresh_token) {
console.log("Token expired and no refresh token available.");
return null;
}
async function authorize(): Promise<OAuth2Client> {
let client = await loadSavedCredentialsIfExist();
if (client && client.credentials && client.credentials.expiry_date && client.credentials.expiry_date > Date.now()) {
console.log("Using existing valid token.");
return client;
}
if (client && client.credentials && (!client.credentials.expiry_date || client.credentials.expiry_date <= Date.now()) && client.credentials.refresh_token) {
console.log("Refreshing expired token...");
try {
await client.refreshAccessToken();
await saveCredentials(client); // Save refreshed token
return client;
} catch (e) {
console.error("Failed to refresh token:", e);
// Fall through to full re-auth if refresh fails
if (fs.existsSync(TOKEN_PATH)) fs.unlinkSync(TOKEN_PATH);
const oauthService = createOAuthService(PROVIDER_NAME);
const existingScopes = tokens.scopes;
const refreshedTokens = await oauthService.refreshAccessToken(tokens.refresh_token, existingScopes);
await oauthRepo.saveTokens(PROVIDER_NAME, refreshedTokens);
// Use refreshed tokens
return createClientFromTokens(refreshedTokens);
} catch (error) {
console.error("Failed to refresh token:", error);
return null;
}
}
console.log("Performing new OAuth authentication...");
client = await authenticate({
scopes: SCOPES,
keyfilePath: CREDENTIALS_PATH,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
}) as any;
if (client && client.credentials) {
await saveCredentials(client);
return createClientFromTokens(tokens);
}
/**
* Create OAuth2Client from OAuthTokens
*/
function createClientFromTokens(tokens: OAuthTokens): OAuth2Client {
const providerConfig = getProviderConfig(PROVIDER_NAME);
// Create OAuth2Client directly (PKCE flow doesn't use client secret)
const client = new OAuth2Client(
providerConfig.clientId,
undefined, // client_secret not needed for PKCE
undefined // redirect_uri not needed for token usage
);
// Set credentials
client.setCredentials({
access_token: tokens.access_token,
refresh_token: tokens.refresh_token || undefined,
expiry_date: tokens.expires_at * 1000, // Convert from seconds to milliseconds
});
return client;
}
/**
* Check if Google OAuth credentials are available with required scopes
*/
async function hasValidCredentials(): Promise<boolean> {
const oauthRepo = getOAuthRepo();
const isConnected = await oauthRepo.isConnected(PROVIDER_NAME);
if (!isConnected) {
return false;
}
return client!;
const tokens = await oauthRepo.getTokens(PROVIDER_NAME);
if (!tokens) {
return false;
}
// Check if required scope is present
return hasRequiredScope(tokens.scopes);
}
// --- Helper Functions ---
@ -318,28 +347,18 @@ async function partialSync(auth: OAuth2Client, startHistoryId: string, syncDir:
await fullSync(auth, syncDir, attachmentsDir, stateFile, lookbackDays);
} else {
console.error("Error during partial sync:", error);
// If 401, remove token to force re-auth next run
if (e.response?.status === 401 && fs.existsSync(TOKEN_PATH)) {
console.log("401 Unauthorized. Deleting token to force re-authentication.");
fs.unlinkSync(TOKEN_PATH);
// If 401, clear tokens to force re-auth next run
if (e.response?.status === 401) {
console.log("401 Unauthorized. Clearing tokens to force re-authentication.");
const oauthRepo = getOAuthRepo();
await oauthRepo.clearTokens(PROVIDER_NAME);
}
}
}
}
async function main() {
console.log("Starting Gmail Sync (TS)...");
const syncDirArg = process.argv[2];
const lookbackDaysArg = process.argv[3];
const SYNC_DIR = syncDirArg || DEFAULT_SYNC_DIR;
const LOOKBACK_DAYS = lookbackDaysArg ? parseInt(lookbackDaysArg, 10) : 7; // Default to 7 days
if (isNaN(LOOKBACK_DAYS) || LOOKBACK_DAYS <= 0) {
console.error("Error: Lookback days must be a positive number.");
process.exit(1);
}
async function performSync() {
const LOOKBACK_DAYS = 7; // Default to 7 days
const ATTACHMENTS_DIR = path.join(SYNC_DIR, 'attachments');
const STATE_FILE = path.join(SYNC_DIR, 'sync_state.json');
@ -348,25 +367,50 @@ async function main() {
if (!fs.existsSync(ATTACHMENTS_DIR)) fs.mkdirSync(ATTACHMENTS_DIR, { recursive: true });
try {
const auth = await authorize();
console.log("Authorization successful.");
while (true) {
const state = loadState(STATE_FILE);
if (!state.historyId) {
console.log("No history ID found, starting full sync...");
await fullSync(auth, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS);
} else {
console.log("History ID found, starting partial sync...");
await partialSync(auth, state.historyId, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS);
}
console.log(`Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`);
await new Promise(resolve => setTimeout(resolve, SYNC_INTERVAL_MS));
const auth = await createOAuth2Client();
if (!auth) {
console.log("No valid OAuth credentials available.");
return;
}
console.log("Authorization successful. Starting sync...");
const state = loadState(STATE_FILE);
if (!state.historyId) {
console.log("No history ID found, starting full sync...");
await fullSync(auth, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS);
} else {
console.log("History ID found, starting partial sync...");
await partialSync(auth, state.historyId, SYNC_DIR, ATTACHMENTS_DIR, STATE_FILE, LOOKBACK_DAYS);
}
console.log("Sync completed.");
} catch (error) {
console.error("Fatal error in main loop:", error);
console.error("Error during sync:", error);
}
}
main().catch(console.error);
export async function init() {
console.log("Starting Gmail Sync (TS)...");
console.log(`Will check for credentials every ${SYNC_INTERVAL_MS / 1000} seconds.`);
while (true) {
try {
// Check if credentials are available with required scopes
const hasCredentials = await hasValidCredentials();
if (!hasCredentials) {
console.log("Google OAuth credentials not available or missing required Gmail scope. Sleeping...");
} else {
// Perform one sync
await performSync();
}
} catch (error) {
console.error("Error in main loop:", error);
}
// Sleep for N minutes before next check
console.log(`Sleeping for ${SYNC_INTERVAL_MS / 1000} seconds...`);
await new Promise(resolve => setTimeout(resolve, SYNC_INTERVAL_MS));
}
}