diff --git a/api/services/pipecat/tracing_config.py b/api/services/pipecat/tracing_config.py index 7ae8a64..f4bc30b 100644 --- a/api/services/pipecat/tracing_config.py +++ b/api/services/pipecat/tracing_config.py @@ -1,6 +1,7 @@ import base64 import os +from loguru import logger from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from api.constants import ENABLE_TRACING @@ -23,7 +24,7 @@ def setup_pipeline_tracing(): langfuse_secret_key = os.environ.get("LANGFUSE_SECRET_KEY") if not all([langfuse_host, langfuse_public_key, langfuse_secret_key]): - print( + logger.warning( "Warning: ENABLE_TRACING is true but Langfuse credentials are not configured. Tracing disabled." ) return @@ -39,6 +40,3 @@ def setup_pipeline_tracing(): otlp_exporter = OTLPSpanExporter() setup_tracing(service_name="dograh-pipeline", exporter=otlp_exporter) - print("Langfuse tracing enabled") - else: - print("Tracing disabled (ENABLE_TRACING=false)") diff --git a/scripts/start_services.sh b/scripts/start_services.sh index 1ab984c..055fde4 100755 --- a/scripts/start_services.sh +++ b/scripts/start_services.sh @@ -15,7 +15,7 @@ BASE_LOG_DIR="$BASE_DIR/logs" # Base logs directory TIMESTAMP=$(date +"%Y%m%d_%H%M%S") LOG_DIR="$BASE_LOG_DIR/$TIMESTAMP" # Timestamped log directory LATEST_LINK="$BASE_LOG_DIR/latest" # Symlink to latest logs -VENV_PATH="$BASE_DIR/venv" +VENV_PATH="$(dirname "$BASE_DIR")/venv" ARQ_WORKERS=${ARQ_WORKERS:-1} diff --git a/ui/src/app/integrations/[id]/gmail/page.tsx b/ui/src/app/integrations/[id]/gmail/page.tsx new file mode 100644 index 0000000..e6c9459 --- /dev/null +++ b/ui/src/app/integrations/[id]/gmail/page.tsx @@ -0,0 +1,390 @@ +'use client'; + +import { useParams, useRouter } from 'next/navigation'; +import { useCallback, useEffect, useState } from 'react'; + +import { getIntegrationAccessTokenApiV1IntegrationIntegrationIdAccessTokenGet } from '@/client/sdk.gen'; +import { useAuth } from '@/lib/auth'; +import logger from '@/lib/logger'; + +interface Email { + id: string; + threadId: string; + subject: string; + from: string; + snippet: string; + date: string; +} + +interface EmailDetail { + id: string; + threadId: string; + subject: string; + from: string; + to: string; + date: string; + body: string; +} + +interface GmailHeader { + name: string; + value: string; +} + +interface GmailPayloadPart { + mimeType: string; + body: { + data?: string; + }; +} + +export default function GmailSearchPage() { + const params = useParams(); + const router = useRouter(); + const integrationId = parseInt(params.id as string); + const { getAccessToken, redirectToLogin } = useAuth(); + + const [accessToken, setAccessToken] = useState(null); + const [searchQuery, setSearchQuery] = useState(''); + const [emails, setEmails] = useState([]); + const [selectedEmail, setSelectedEmail] = useState(null); + const [replyText, setReplyText] = useState(''); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + const [sendingReply, setSendingReply] = useState(false); + + const fetchAccessToken = useCallback(async () => { + try { + const token = await getAccessToken(); + if (!token) { + redirectToLogin(); + return; + } + + const response = await getIntegrationAccessTokenApiV1IntegrationIntegrationIdAccessTokenGet({ + path: { integration_id: integrationId }, + headers: { Authorization: `Bearer ${token}` }, + }); + + if (response.data?.access_token) { + setAccessToken(response.data.access_token); + } else { + setError('Failed to get access token'); + } + } catch (err) { + logger.error('Error fetching access token:', err); + setError('Failed to fetch access token. Please try again.'); + } + }, [getAccessToken, redirectToLogin, integrationId]); + + useEffect(() => { + fetchAccessToken(); + }, [fetchAccessToken]); + + const searchEmails = async () => { + if (!accessToken || !searchQuery.trim()) return; + + setLoading(true); + setError(null); + setEmails([]); + setSelectedEmail(null); + + try { + const response = await fetch( + `https://gmail.googleapis.com/gmail/v1/users/me/messages?q=${encodeURIComponent(searchQuery)}&maxResults=20`, + { + headers: { + Authorization: `Bearer ${accessToken}`, + }, + } + ); + + if (!response.ok) { + throw new Error(`Gmail API error: ${response.statusText}`); + } + + const data = await response.json(); + + if (!data.messages || data.messages.length === 0) { + setEmails([]); + return; + } + + // Fetch details for each message + const emailPromises = data.messages.map(async (msg: { id: string }) => { + const msgResponse = await fetch( + `https://gmail.googleapis.com/gmail/v1/users/me/messages/${msg.id}?format=metadata&metadataHeaders=Subject&metadataHeaders=From&metadataHeaders=Date`, + { + headers: { + Authorization: `Bearer ${accessToken}`, + }, + } + ); + return msgResponse.json(); + }); + + const emailDetails = await Promise.all(emailPromises); + + const formattedEmails: Email[] = emailDetails.map((email) => { + const headers = email.payload.headers as GmailHeader[]; + const subject = headers.find((h) => h.name === 'Subject')?.value || 'No Subject'; + const from = headers.find((h) => h.name === 'From')?.value || 'Unknown'; + const date = headers.find((h) => h.name === 'Date')?.value || ''; + + return { + id: email.id, + threadId: email.threadId, + subject, + from, + snippet: email.snippet || '', + date, + }; + }); + + setEmails(formattedEmails); + } catch (err) { + logger.error('Error searching emails:', err); + setError('Failed to search emails. Please try again.'); + } finally { + setLoading(false); + } + }; + + const loadEmailDetail = async (emailId: string) => { + if (!accessToken) return; + + setLoading(true); + setError(null); + + try { + const response = await fetch( + `https://gmail.googleapis.com/gmail/v1/users/me/messages/${emailId}?format=full`, + { + headers: { + Authorization: `Bearer ${accessToken}`, + }, + } + ); + + if (!response.ok) { + throw new Error(`Gmail API error: ${response.statusText}`); + } + + const email = await response.json(); + const headers = email.payload.headers as GmailHeader[]; + + const subject = headers.find((h) => h.name === 'Subject')?.value || 'No Subject'; + const from = headers.find((h) => h.name === 'From')?.value || 'Unknown'; + const to = headers.find((h) => h.name === 'To')?.value || 'Unknown'; + const date = headers.find((h) => h.name === 'Date')?.value || ''; + + // Extract email body + let body = ''; + if (email.payload.body.data) { + body = atob(email.payload.body.data.replace(/-/g, '+').replace(/_/g, '/')); + } else if (email.payload.parts) { + const parts = email.payload.parts as GmailPayloadPart[]; + const textPart = parts.find((part) => part.mimeType === 'text/plain'); + if (textPart && textPart.body.data) { + body = atob(textPart.body.data.replace(/-/g, '+').replace(/_/g, '/')); + } + } + + setSelectedEmail({ + id: email.id, + threadId: email.threadId, + subject, + from, + to, + date, + body, + }); + } catch (err) { + logger.error('Error loading email detail:', err); + setError('Failed to load email details. Please try again.'); + } finally { + setLoading(false); + } + }; + + const sendReply = async () => { + if (!accessToken || !selectedEmail || !replyText.trim()) return; + + setSendingReply(true); + setError(null); + + try { + // Create the email message + const to = selectedEmail.from.match(/<(.+)>/)?.[1] || selectedEmail.from; + const subject = selectedEmail.subject.startsWith('Re:') + ? selectedEmail.subject + : `Re: ${selectedEmail.subject}`; + + const messageParts = [ + `To: ${to}`, + `Subject: ${subject}`, + `In-Reply-To: ${selectedEmail.id}`, + `References: ${selectedEmail.id}`, + '', + replyText, + ]; + + const message = messageParts.join('\n'); + const encodedMessage = btoa(message) + .replace(/\+/g, '-') + .replace(/\//g, '_') + .replace(/=+$/, ''); + + const response = await fetch( + `https://gmail.googleapis.com/gmail/v1/users/me/messages/send`, + { + method: 'POST', + headers: { + Authorization: `Bearer ${accessToken}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + raw: encodedMessage, + threadId: selectedEmail.threadId, + }), + } + ); + + if (!response.ok) { + throw new Error(`Gmail API error: ${response.statusText}`); + } + + alert('Reply sent successfully!'); + setReplyText(''); + } catch (err) { + logger.error('Error sending reply:', err); + setError('Failed to send reply. Please try again.'); + } finally { + setSendingReply(false); + } + }; + + return ( +
+
+ +

Gmail Search

+
+ + {/* Search Section */} +
+
+ setSearchQuery(e.target.value)} + onKeyPress={(e) => e.key === 'Enter' && searchEmails()} + placeholder="Search emails (e.g., from:user@example.com, subject:meeting)" + className="flex-1 px-4 py-2 border border-gray-300 rounded-md focus:outline-none focus:ring-2 focus:ring-blue-500" + disabled={!accessToken || loading} + /> + +
+
+ + {error && ( +
+ {error} +
+ )} + +
+ {/* Email List */} +
+
+

Search Results

+
+
+ {emails.length === 0 && !loading && ( +
+ {searchQuery ? 'No emails found' : 'Enter a search query to find emails'} +
+ )} + {emails.map((email) => ( +
loadEmailDetail(email.id)} + className={`p-4 cursor-pointer hover:bg-gray-50 ${ + selectedEmail?.id === email.id ? 'bg-blue-50' : '' + }`} + > +
{email.subject}
+
{email.from}
+
{email.snippet}
+
{email.date}
+
+ ))} +
+
+ + {/* Email Detail and Reply */} +
+
+

Email Details

+
+ {selectedEmail ? ( +
+
+
+ Subject: {selectedEmail.subject} +
+
+ From: {selectedEmail.from} +
+
+ To: {selectedEmail.to} +
+
+ Date: {selectedEmail.date} +
+
+ +
+
{selectedEmail.body}
+
+ +
+

Reply

+