mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-26 21:39:43 +02:00
Added Luma connector
This commit is contained in:
parent
8bbc8dba4e
commit
9d2b808e66
27 changed files with 1757 additions and 4 deletions
|
|
@ -0,0 +1,60 @@
|
|||
"""Add Luma connector enums
|
||||
|
||||
Revision ID: 21
|
||||
Revises: 20
|
||||
Create Date: 2025-09-27 20:00:00.000000
|
||||
|
||||
"""
|
||||
|
||||
from collections.abc import Sequence
|
||||
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "21"
|
||||
down_revision: str | None = "20"
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Add the 'LUMA_CONNECTOR' label to the connector-related enum types.

    For each of ``searchsourceconnectortype`` and ``documenttype`` (in that
    order) a PL/pgSQL ``DO`` block looks the label up in ``pg_enum`` and only
    issues ``ALTER TYPE ... ADD VALUE`` when it is not there yet, so running
    the migration against a database that already has the label is harmless.
    """
    for enum_name in ("searchsourceconnectortype", "documenttype"):
        # The enum name is a fixed literal from the tuple above, never user input.
        op.execute(
            f"""
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1 FROM pg_type t
                JOIN pg_enum e ON t.oid = e.enumtypid
                WHERE t.typname = '{enum_name}' AND e.enumlabel = 'LUMA_CONNECTOR'
            ) THEN
                ALTER TYPE {enum_name} ADD VALUE 'LUMA_CONNECTOR';
            END IF;
        END
        $$;
        """
        )
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Do nothing: the 'LUMA_CONNECTOR' enum labels are left in place.

    The body is intentionally a no-op, so downgrading past this revision does
    NOT remove the labels added by ``upgrade``. PostgreSQL offers no
    ``ALTER TYPE ... DROP VALUE``; removing a label would require recreating
    the enum types, which this migration does not attempt.
    """
    pass
|
||||
|
|
@ -413,6 +413,41 @@ async def fetch_documents_by_ids(
|
|||
else:
|
||||
url = ""
|
||||
|
||||
elif doc_type == "LUMA_CONNECTOR":
|
||||
# Extract Luma-specific metadata
|
||||
event_id = metadata.get("event_id", "")
|
||||
event_name = metadata.get("event_name", "Untitled Event")
|
||||
event_url = metadata.get("event_url", "")
|
||||
start_time = metadata.get("start_time", "")
|
||||
location_name = metadata.get("location_name", "")
|
||||
meeting_url = metadata.get("meeting_url", "")
|
||||
|
||||
title = f"Luma: {event_name}"
|
||||
if start_time:
|
||||
# Format the start time for display
|
||||
try:
|
||||
if "T" in start_time:
|
||||
from datetime import datetime
|
||||
start_dt = datetime.fromisoformat(
|
||||
start_time.replace("Z", "+00:00")
|
||||
)
|
||||
formatted_time = start_dt.strftime("%Y-%m-%d %H:%M")
|
||||
title += f" ({formatted_time})"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
description = (
|
||||
doc.content[:100] + "..."
|
||||
if len(doc.content) > 100
|
||||
else doc.content
|
||||
)
|
||||
if location_name:
|
||||
description += f" | Venue: {location_name}"
|
||||
elif meeting_url:
|
||||
description += f" | Online Event"
|
||||
|
||||
url = event_url if event_url else ""
|
||||
|
||||
elif doc_type == "EXTENSION":
|
||||
# Extract Extension-specific metadata
|
||||
webpage_title = metadata.get("VisitedWebPageTitle", doc.title)
|
||||
|
|
@ -487,6 +522,7 @@ async def fetch_documents_by_ids(
|
|||
"CONFLUENCE_CONNECTOR": "Confluence (Selected)",
|
||||
"CLICKUP_CONNECTOR": "ClickUp (Selected)",
|
||||
"AIRTABLE_CONNECTOR": "Airtable (Selected)",
|
||||
"LUMA_CONNECTOR": "Luma Events (Selected)",
|
||||
}
|
||||
|
||||
source_object = {
|
||||
|
|
@ -1197,6 +1233,33 @@ async def fetch_relevant_documents(
|
|||
}
|
||||
)
|
||||
|
||||
elif connector == "LUMA_CONNECTOR":
|
||||
(
|
||||
source_object,
|
||||
luma_chunks,
|
||||
) = await connector_service.search_luma(
|
||||
user_query=reformulated_query,
|
||||
user_id=user_id,
|
||||
search_space_id=search_space_id,
|
||||
top_k=top_k,
|
||||
search_mode=search_mode,
|
||||
)
|
||||
|
||||
# Add to sources and raw documents
|
||||
if source_object:
|
||||
all_sources.append(source_object)
|
||||
all_raw_documents.extend(luma_chunks)
|
||||
|
||||
# Stream found document count
|
||||
if streaming_service and writer:
|
||||
writer(
|
||||
{
|
||||
"yield_value": streaming_service.format_terminal_info_delta(
|
||||
f"🎯 Found {len(luma_chunks)} Luma events related to your query"
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logging.error("Error in search_airtable: %s", traceback.format_exc())
|
||||
error_message = f"Error searching connector {connector}: {e!s}"
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ You are SurfSense, an advanced AI research assistant that provides detailed, wel
|
|||
- AIRTABLE_CONNECTOR: "Airtable records, tables, and database content" (personal data management and organization)
|
||||
- TAVILY_API: "Tavily search API results" (personalized search results)
|
||||
- LINKUP_API: "Linkup search API results" (personalized search results)
|
||||
- LUMA_CONNECTOR: "Luma events"
|
||||
</knowledge_sources>
|
||||
|
||||
<instructions>
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ You are SurfSense, an advanced AI research assistant that synthesizes informatio
|
|||
- AIRTABLE_CONNECTOR: "Airtable records, tables, and database content" (personal data management and organization)
|
||||
- TAVILY_API: "Tavily search API results" (personalized search results)
|
||||
- LINKUP_API: "Linkup search API results" (personalized search results)
|
||||
- LUMA_CONNECTOR: "Luma events"
|
||||
</knowledge_sources>
|
||||
<instructions>
|
||||
1. Review the chat history to understand the conversation context and any previous topics discussed.
|
||||
|
|
|
|||
|
|
@ -50,6 +50,7 @@ def get_connector_emoji(connector_name: str) -> str:
|
|||
"LINKUP_API": "🔗",
|
||||
"GOOGLE_CALENDAR_CONNECTOR": "📅",
|
||||
"AIRTABLE_CONNECTOR": "🗃️",
|
||||
"LUMA_CONNECTOR": "✨",
|
||||
}
|
||||
return connector_emojis.get(connector_name, "🔎")
|
||||
|
||||
|
|
@ -72,6 +73,7 @@ def get_connector_friendly_name(connector_name: str) -> str:
|
|||
"TAVILY_API": "Tavily Search",
|
||||
"LINKUP_API": "Linkup Search",
|
||||
"AIRTABLE_CONNECTOR": "Airtable",
|
||||
"LUMA_CONNECTOR": "Luma",
|
||||
}
|
||||
return connector_friendly_names.get(connector_name, connector_name)
|
||||
|
||||
|
|
|
|||
423
surfsense_backend/app/connectors/luma_connector.py
Normal file
423
surfsense_backend/app/connectors/luma_connector.py
Normal file
|
|
@ -0,0 +1,423 @@
|
|||
"""
|
||||
Luma Connector Module
|
||||
|
||||
A module for retrieving events and guest data from Luma Event Platform.
|
||||
Allows fetching event lists, event details, and guest information with date range filtering.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
class LumaConnector:
|
||||
"""Class for retrieving events and guest data from Luma Event Platform."""
|
||||
|
||||
def __init__(self, api_key: str | None = None):
|
||||
"""
|
||||
Initialize the LumaConnector class.
|
||||
|
||||
Args:
|
||||
api_key: Luma API key (optional, can be set later with set_api_key)
|
||||
"""
|
||||
self.api_key = api_key
|
||||
self.base_url = "https://public-api.luma.com/v1"
|
||||
|
||||
def set_api_key(self, api_key: str) -> None:
|
||||
"""
|
||||
Set the Luma API key.
|
||||
|
||||
Args:
|
||||
api_key: Luma API key
|
||||
"""
|
||||
self.api_key = api_key
|
||||
|
||||
def get_headers(self) -> dict[str, str]:
|
||||
"""
|
||||
Get headers for Luma API requests.
|
||||
|
||||
Returns:
|
||||
Dictionary of headers
|
||||
|
||||
Raises:
|
||||
ValueError: If no Luma API key has been set
|
||||
"""
|
||||
if not self.api_key:
|
||||
raise ValueError("Luma API key not initialized. Call set_api_key() first.")
|
||||
|
||||
return {
|
||||
"Content-Type": "application/json",
|
||||
"x-luma-api-key": self.api_key,
|
||||
}
|
||||
|
||||
def make_request(
|
||||
self, endpoint: str, params: dict[str, Any] | None = None
|
||||
) -> dict[str, Any]:
|
||||
"""
|
||||
Make a request to the Luma API.
|
||||
|
||||
Args:
|
||||
endpoint: API endpoint path (without base URL)
|
||||
params: Query parameters (optional)
|
||||
|
||||
Returns:
|
||||
Response data from the API
|
||||
|
||||
Raises:
|
||||
ValueError: If no Luma API key has been set
|
||||
Exception: If the API request fails
|
||||
"""
|
||||
if not self.api_key:
|
||||
raise ValueError("Luma API key not initialized. Call set_api_key() first.")
|
||||
|
||||
headers = self.get_headers()
|
||||
url = f"{self.base_url}/{endpoint.lstrip('/')}"
|
||||
|
||||
try:
|
||||
response = requests.get(url, headers=headers, params=params)
|
||||
|
||||
if response.status_code == 200:
|
||||
return response.json()
|
||||
elif response.status_code == 401:
|
||||
raise Exception("Unauthorized: Invalid Luma API key")
|
||||
elif response.status_code == 403:
|
||||
raise Exception("Forbidden: Access denied or Luma Plus subscription required")
|
||||
elif response.status_code == 429:
|
||||
raise Exception("Rate limit exceeded: Too many requests")
|
||||
else:
|
||||
raise Exception(
|
||||
f"API request failed with status code {response.status_code}: {response.text}"
|
||||
)
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise Exception(f"Network error: {e}") from e
|
||||
|
||||
def get_user_info(self) -> tuple[dict[str, Any] | None, str | None]:
|
||||
"""
|
||||
Get information about the authenticated user.
|
||||
|
||||
Returns:
|
||||
Tuple containing (user info dict, error message or None)
|
||||
"""
|
||||
try:
|
||||
user_info = self.make_request("user/get-self")
|
||||
return user_info, None
|
||||
except Exception as e:
|
||||
return None, f"Error fetching user info: {e!s}"
|
||||
|
||||
def get_all_events(self, limit: int = 100) -> tuple[list[dict[str, Any]], str | None]:
|
||||
"""
|
||||
Fetch all events for the authenticated user.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of events to fetch per request (default: 100)
|
||||
|
||||
Returns:
|
||||
Tuple containing (events list, error message or None)
|
||||
"""
|
||||
try:
|
||||
all_events = []
|
||||
cursor = None
|
||||
|
||||
while True:
|
||||
params = {"limit": limit}
|
||||
if cursor:
|
||||
params["cursor"] = cursor
|
||||
|
||||
response = self.make_request("calendar/list-events", params)
|
||||
|
||||
if "entries" not in response:
|
||||
break
|
||||
|
||||
events = response["entries"]
|
||||
all_events.extend(events)
|
||||
|
||||
# Check for pagination
|
||||
if "next_cursor" in response and response["next_cursor"]:
|
||||
cursor = response["next_cursor"]
|
||||
else:
|
||||
break
|
||||
|
||||
return all_events, None
|
||||
|
||||
except Exception as e:
|
||||
return [], f"Error fetching events: {e!s}"
|
||||
|
||||
def get_event_details(self, event_id: str) -> tuple[dict[str, Any] | None, str | None]:
|
||||
"""
|
||||
Fetch detailed information about a specific event.
|
||||
|
||||
Args:
|
||||
event_id: The ID of the event to fetch details for
|
||||
|
||||
Returns:
|
||||
Tuple containing (event details dict, error message or None)
|
||||
"""
|
||||
try:
|
||||
event_details = self.make_request(f"events/{event_id}")
|
||||
return event_details, None
|
||||
except Exception as e:
|
||||
return None, f"Error fetching event details for {event_id}: {e!s}"
|
||||
|
||||
def get_event_guests(self, event_id: str, limit: int = 100) -> tuple[list[dict[str, Any]], str | None]:
|
||||
"""
|
||||
Fetch guests for a specific event.
|
||||
|
||||
Args:
|
||||
event_id: The ID of the event to fetch guests for
|
||||
limit: Maximum number of guests to fetch per request (default: 100)
|
||||
|
||||
Returns:
|
||||
Tuple containing (guests list, error message or None)
|
||||
"""
|
||||
try:
|
||||
all_guests = []
|
||||
cursor = None
|
||||
|
||||
while True:
|
||||
params = {"limit": limit}
|
||||
if cursor:
|
||||
params["cursor"] = cursor
|
||||
|
||||
response = self.make_request(f"events/{event_id}/guests", params)
|
||||
|
||||
if "entries" not in response:
|
||||
break
|
||||
|
||||
guests = response["entries"]
|
||||
all_guests.extend(guests)
|
||||
|
||||
# Check for pagination
|
||||
if "next_cursor" in response and response["next_cursor"]:
|
||||
cursor = response["next_cursor"]
|
||||
else:
|
||||
break
|
||||
|
||||
return all_guests, None
|
||||
|
||||
except Exception as e:
|
||||
return [], f"Error fetching guests for event {event_id}: {e!s}"
|
||||
|
||||
def get_events_by_date_range(
|
||||
self, start_date: str, end_date: str, include_guests: bool = True
|
||||
) -> tuple[list[dict[str, Any]], str | None]:
|
||||
"""
|
||||
Fetch events within a date range.
|
||||
|
||||
Args:
|
||||
start_date: Start date in YYYY-MM-DD format
|
||||
end_date: End date in YYYY-MM-DD format (inclusive)
|
||||
include_guests: Whether to include guest information for each event
|
||||
|
||||
Returns:
|
||||
Tuple containing (events list, error message or None)
|
||||
"""
|
||||
try:
|
||||
# Convert date strings to ISO format for comparison
|
||||
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
|
||||
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
|
||||
|
||||
# Get all events first
|
||||
all_events, error = self.get_all_events()
|
||||
if error:
|
||||
return [], error
|
||||
|
||||
# Filter events by date range
|
||||
filtered_events = []
|
||||
for event in all_events:
|
||||
event_start_time = event.get("event", {}).get("start_at")
|
||||
if event_start_time:
|
||||
try:
|
||||
# Parse the event start time (assuming ISO format)
|
||||
event_dt = datetime.fromisoformat(event_start_time.replace("Z", "+00:00"))
|
||||
event_date = event_dt.date()
|
||||
|
||||
# Check if event falls within the date range
|
||||
if start_dt.date() <= event_date <= end_dt.date():
|
||||
# Add guest information if requested
|
||||
if include_guests:
|
||||
event_id = event.get("api_id")
|
||||
if event_id:
|
||||
guests, guest_error = self.get_event_guests(event_id)
|
||||
if not guest_error:
|
||||
event["guests"] = guests
|
||||
|
||||
filtered_events.append(event)
|
||||
except (ValueError, AttributeError):
|
||||
# Skip events with invalid dates
|
||||
continue
|
||||
|
||||
if not filtered_events:
|
||||
return [], "No events found in the specified date range."
|
||||
|
||||
return filtered_events, None
|
||||
|
||||
except ValueError as e:
|
||||
return [], f"Invalid date format: {e!s}. Please use YYYY-MM-DD."
|
||||
except Exception as e:
|
||||
return [], f"Error fetching events by date range: {e!s}"
|
||||
|
||||
def format_event_to_markdown(self, event: dict[str, Any]) -> str:
|
||||
"""
|
||||
Convert an event to markdown format.
|
||||
|
||||
Args:
|
||||
event: The event object from Luma API
|
||||
|
||||
Returns:
|
||||
Markdown string representation of the event
|
||||
"""
|
||||
# Extract event details
|
||||
event_data = event.get("event", {})
|
||||
|
||||
title = event_data.get("name", "Untitled Event")
|
||||
description = event_data.get("description", "")
|
||||
event_id = event.get("api_id", "")
|
||||
|
||||
# Extract timing information
|
||||
start_at = event_data.get("start_at", "")
|
||||
end_at = event_data.get("end_at", "")
|
||||
timezone = event_data.get("timezone", "")
|
||||
|
||||
# Format dates
|
||||
start_formatted = self.format_date(start_at) if start_at else "Unknown"
|
||||
end_formatted = self.format_date(end_at) if end_at else "Unknown"
|
||||
|
||||
# Extract location information
|
||||
geo_info = event_data.get("geo_info", {})
|
||||
location_name = geo_info.get("name", "")
|
||||
address = geo_info.get("address", "")
|
||||
|
||||
# Extract other details
|
||||
url = event_data.get("url", "")
|
||||
visibility = event_data.get("visibility", "")
|
||||
meeting_url = event_data.get("meeting_url", "")
|
||||
|
||||
# Build markdown content
|
||||
markdown_content = f"# {title}\n\n"
|
||||
|
||||
if event_id:
|
||||
markdown_content += f"**Event ID:** {event_id}\n"
|
||||
|
||||
# Add timing information
|
||||
markdown_content += f"**Start:** {start_formatted}\n"
|
||||
markdown_content += f"**End:** {end_formatted}\n"
|
||||
|
||||
if timezone:
|
||||
markdown_content += f"**Timezone:** {timezone}\n"
|
||||
|
||||
markdown_content += "\n"
|
||||
|
||||
# Add location information
|
||||
if location_name or address:
|
||||
markdown_content += "## Location\n\n"
|
||||
if location_name:
|
||||
markdown_content += f"**Venue:** {location_name}\n"
|
||||
if address:
|
||||
markdown_content += f"**Address:** {address}\n"
|
||||
markdown_content += "\n"
|
||||
|
||||
# Add online meeting info
|
||||
if meeting_url:
|
||||
markdown_content += f"**Meeting URL:** {meeting_url}\n\n"
|
||||
|
||||
# Add description if available
|
||||
if description:
|
||||
markdown_content += f"## Description\n\n{description}\n\n"
|
||||
|
||||
# Add event details
|
||||
markdown_content += "## Event Details\n\n"
|
||||
|
||||
if url:
|
||||
markdown_content += f"- **Event URL:** {url}\n"
|
||||
|
||||
if visibility:
|
||||
markdown_content += f"- **Visibility:** {visibility}\n"
|
||||
|
||||
# Add guest information if available
|
||||
if "guests" in event:
|
||||
guests = event["guests"]
|
||||
markdown_content += f"\n## Guests ({len(guests)})\n\n"
|
||||
|
||||
for guest in guests[:10]: # Show first 10 guests
|
||||
guest_data = guest.get("guest", {})
|
||||
name = guest_data.get("name", "Unknown")
|
||||
email = guest_data.get("email", "")
|
||||
status = guest.get("registration_status", "unknown")
|
||||
|
||||
markdown_content += f"- **{name}**"
|
||||
if email:
|
||||
markdown_content += f" ({email})"
|
||||
markdown_content += f" - Status: {status}\n"
|
||||
|
||||
if len(guests) > 10:
|
||||
markdown_content += f"- ... and {len(guests) - 10} more guests\n"
|
||||
|
||||
markdown_content += "\n"
|
||||
|
||||
return markdown_content
|
||||
|
||||
@staticmethod
|
||||
def format_date(iso_date: str) -> str:
|
||||
"""
|
||||
Format an ISO date string to a more readable format.
|
||||
|
||||
Args:
|
||||
iso_date: ISO format date string
|
||||
|
||||
Returns:
|
||||
Formatted date string
|
||||
"""
|
||||
if not iso_date or not isinstance(iso_date, str):
|
||||
return "Unknown date"
|
||||
|
||||
try:
|
||||
dt = datetime.fromisoformat(iso_date.replace("Z", "+00:00"))
|
||||
return dt.strftime("%Y-%m-%d %H:%M:%S %Z")
|
||||
except ValueError:
|
||||
return iso_date
|
||||
|
||||
|
||||
# Example usage (uncomment to use):
|
||||
"""
|
||||
if __name__ == "__main__":
|
||||
# Set your API key here
|
||||
api_key = "YOUR_LUMA_API_KEY"
|
||||
|
||||
luma = LumaConnector(api_key)
|
||||
|
||||
try:
|
||||
# Test authentication
|
||||
user_info, error = luma.get_user_info()
|
||||
if error:
|
||||
print(f"Authentication error: {error}")
|
||||
else:
|
||||
print(f"Authenticated as: {user_info.get('name', 'Unknown')}")
|
||||
|
||||
# Get all events
|
||||
events, error = luma.get_all_events()
|
||||
if error:
|
||||
print(f"Error fetching events: {error}")
|
||||
else:
|
||||
print(f"Retrieved {len(events)} events")
|
||||
|
||||
# Format and print the first event as markdown
|
||||
if events:
|
||||
event_md = luma.format_event_to_markdown(events[0])
|
||||
print("\nSample Event in Markdown:\n")
|
||||
print(event_md)
|
||||
|
||||
# Get events by date range
|
||||
start_date = "2023-01-01"
|
||||
end_date = "2023-01-31"
|
||||
date_events, error = luma.get_events_by_date_range(start_date, end_date)
|
||||
|
||||
if error:
|
||||
print(f"Error: {error}")
|
||||
else:
|
||||
print(f"\nRetrieved {len(date_events)} events from {start_date} to {end_date}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
"""
|
||||
|
|
@ -49,6 +49,7 @@ class DocumentType(str, Enum):
|
|||
GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR"
|
||||
GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR"
|
||||
AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR"
|
||||
LUMA_CONNECTOR = "LUMA_CONNECTOR"
|
||||
|
||||
|
||||
class SearchSourceConnectorType(str, Enum):
|
||||
|
|
@ -66,6 +67,7 @@ class SearchSourceConnectorType(str, Enum):
|
|||
GOOGLE_CALENDAR_CONNECTOR = "GOOGLE_CALENDAR_CONNECTOR"
|
||||
GOOGLE_GMAIL_CONNECTOR = "GOOGLE_GMAIL_CONNECTOR"
|
||||
AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR"
|
||||
LUMA_CONNECTOR = "LUMA_CONNECTOR"
|
||||
|
||||
|
||||
class ChatType(str, Enum):
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ from .airtable_add_connector_route import (
|
|||
router as airtable_add_connector_router,
|
||||
)
|
||||
from .chats_routes import router as chats_router
|
||||
from .luma_add_connector_route import router as luma_add_connector_router
|
||||
from .documents_routes import router as documents_router
|
||||
from .google_calendar_add_connector_route import (
|
||||
router as google_calendar_add_connector_router,
|
||||
|
|
@ -27,5 +28,6 @@ router.include_router(search_source_connectors_router)
|
|||
router.include_router(google_calendar_add_connector_router)
|
||||
router.include_router(google_gmail_add_connector_router)
|
||||
router.include_router(airtable_add_connector_router)
|
||||
router.include_router(luma_add_connector_router)
|
||||
router.include_router(llm_config_router)
|
||||
router.include_router(logs_router)
|
||||
|
|
|
|||
240
surfsense_backend/app/routes/luma_add_connector_route.py
Normal file
240
surfsense_backend/app/routes/luma_add_connector_route.py
Normal file
|
|
@ -0,0 +1,240 @@
|
|||
import logging
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from pydantic import BaseModel, Field
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from app.db import (
|
||||
SearchSourceConnector,
|
||||
SearchSourceConnectorType,
|
||||
User,
|
||||
get_async_session,
|
||||
)
|
||||
from app.users import current_active_user
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
class AddLumaConnectorRequest(BaseModel):
    """Request model for adding a Luma connector."""

    # Required. The add handler stores this value under the "api_key" key of
    # the connector's config.
    api_key: str = Field(..., description="Luma API key")
    # Required by the model.
    # NOTE(review): the add handler in this file never reads space_id;
    # confirm whether it is still needed.
    space_id: int = Field(..., description="Search space ID")
|
||||
|
||||
|
||||
@router.post("/connectors/luma/add")
async def add_luma_connector(
    request: AddLumaConnectorRequest,
    user: User = Depends(current_active_user),
    session: AsyncSession = Depends(get_async_session),
):
    """
    Create the authenticated user's Luma connector, or update it if present.

    When the user already has a Luma connector, its config is replaced with
    the submitted API key and it is marked indexable; otherwise a new
    connector row is inserted.

    Args:
        request: The request containing Luma API key and space_id
        user: Current authenticated user
        session: Database session

    Returns:
        Dict with a status message, the connector ID and the connector type

    Raises:
        HTTPException: 409 on a database integrity error, 500 on any other
            failure (the session is rolled back in both cases)
    """
    try:
        lookup = select(SearchSourceConnector).filter(
            SearchSourceConnector.user_id == user.id,
            SearchSourceConnector.connector_type == SearchSourceConnectorType.LUMA_CONNECTOR,
        )
        found = (await session.execute(lookup)).scalars().first()

        if not found:
            # No connector yet for this user: insert a fresh one.
            created = SearchSourceConnector(
                name="Luma Event Connector",
                connector_type=SearchSourceConnectorType.LUMA_CONNECTOR,
                config={"api_key": request.api_key},
                user_id=user.id,
                is_indexable=True,
            )
            session.add(created)
            await session.commit()
            await session.refresh(created)

            logger.info(
                f"Successfully created Luma connector for user {user.id} with ID {created.id}"
            )

            return {
                "message": "Luma connector added successfully",
                "connector_id": created.id,
                "connector_type": "LUMA_CONNECTOR",
            }

        # A connector already exists: overwrite its stored API key.
        found.config = {"api_key": request.api_key}
        found.is_indexable = True
        await session.commit()
        await session.refresh(found)

        logger.info(f"Updated existing Luma connector for user {user.id}")

        return {
            "message": "Luma connector updated successfully",
            "connector_id": found.id,
            "connector_type": "LUMA_CONNECTOR",
        }

    except IntegrityError as exc:
        await session.rollback()
        logger.error(f"Database integrity error: {exc!s}")
        raise HTTPException(
            status_code=409,
            detail="A Luma connector already exists for this user.",
        ) from exc
    except Exception as exc:
        await session.rollback()
        logger.error(f"Unexpected error adding Luma connector: {exc!s}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to add Luma connector: {exc!s}",
        ) from exc
|
||||
|
||||
|
||||
@router.delete("/connectors/luma")
async def delete_luma_connector(
    user: User = Depends(current_active_user),
    session: AsyncSession = Depends(get_async_session),
):
    """
    Remove the authenticated user's Luma connector.

    Args:
        user: Current authenticated user
        session: Database session

    Returns:
        Dict with a success message

    Raises:
        HTTPException: 404 if the user has no Luma connector, 500 on any
            other failure (after rolling the session back)
    """
    try:
        lookup = select(SearchSourceConnector).filter(
            SearchSourceConnector.user_id == user.id,
            SearchSourceConnector.connector_type == SearchSourceConnectorType.LUMA_CONNECTOR,
        )
        target = (await session.execute(lookup)).scalars().first()

        if target:
            await session.delete(target)
            await session.commit()

            logger.info(f"Successfully deleted Luma connector for user {user.id}")

            return {"message": "Luma connector deleted successfully"}

        raise HTTPException(
            status_code=404,
            detail="Luma connector not found for this user.",
        )

    except HTTPException:
        # Let the 404 above through untouched instead of turning it into a 500.
        raise
    except Exception as exc:
        await session.rollback()
        logger.error(f"Unexpected error deleting Luma connector: {exc!s}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to delete Luma connector: {exc!s}",
        ) from exc
|
||||
|
||||
|
||||
@router.get("/connectors/luma/test")
async def test_luma_connector(
    user: User = Depends(current_active_user),
    session: AsyncSession = Depends(get_async_session),
):
    """
    Test the Luma connector for the authenticated user.

    Loads the user's stored Luma connector, fetches the Luma user profile
    with its API key, then tries to list events. A failure to list events
    does not fail the test; it is reported in ``events_error``.

    Args:
        user: Current authenticated user
        session: Database session

    Returns:
        Test results including user info and event count

    Raises:
        HTTPException: 404 if no connector exists, 400 if the stored config
            has no API key or Luma rejects the request, 500 on any other error
    """
    try:
        # Get the Luma connector for this user
        result = await session.execute(
            select(SearchSourceConnector).filter(
                SearchSourceConnector.user_id == user.id,
                SearchSourceConnector.connector_type == SearchSourceConnectorType.LUMA_CONNECTOR,
            )
        )
        connector = result.scalars().first()

        if not connector:
            raise HTTPException(
                status_code=404,
                detail="Luma connector not found. Please add a connector first.",
            )

        # Function-scope imports, as in the original handler.
        import asyncio

        from app.connectors.luma_connector import LumaConnector

        api_key = connector.config.get("api_key")
        if not api_key:
            raise HTTPException(
                status_code=400,
                detail="Invalid connector configuration: API key missing.",
            )

        luma = LumaConnector(api_key=api_key)

        # LumaConnector performs blocking HTTP calls with `requests`; run them
        # in a worker thread so this coroutine does not stall the event loop.
        user_info, error = await asyncio.to_thread(luma.get_user_info)
        if error:
            raise HTTPException(
                status_code=400,
                detail=f"Failed to connect to Luma: {error}",
            )
        # Guard against an empty body so the .get() calls below cannot fail.
        user_info = user_info or {}

        # Try to fetch events
        events, events_error = await asyncio.to_thread(luma.get_all_events, limit=10)

        return {
            "message": "Luma connector is working correctly",
            "user_info": {
                "name": user_info.get("name", "Unknown"),
                "email": user_info.get("email", "Unknown"),
            },
            "event_count": len(events) if not events_error else 0,
            "events_error": events_error,
        }

    except HTTPException:
        # Preserve the deliberate 4xx responses raised above.
        raise
    except Exception as e:
        logger.error(f"Unexpected error testing Luma connector: {e!s}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to test Luma connector: {e!s}",
        ) from e
|
||||
|
|
@ -7,7 +7,7 @@ PUT /search-source-connectors/{connector_id} - Update a specific connector
|
|||
DELETE /search-source-connectors/{connector_id} - Delete a specific connector
|
||||
POST /search-source-connectors/{connector_id}/index - Index content from a connector to a search space
|
||||
|
||||
Note: Each user can have only one connector of each type (SERPER_API, TAVILY_API, SLACK_CONNECTOR, NOTION_CONNECTOR, GITHUB_CONNECTOR, LINEAR_CONNECTOR, DISCORD_CONNECTOR).
|
||||
Note: Each user can have only one connector of each type (SERPER_API, TAVILY_API, SLACK_CONNECTOR, NOTION_CONNECTOR, GITHUB_CONNECTOR, LINEAR_CONNECTOR, DISCORD_CONNECTOR, LUMA_CONNECTOR).
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
|
@ -47,6 +47,7 @@ from app.tasks.connector_indexers import (
|
|||
index_linear_issues,
|
||||
index_notion_pages,
|
||||
index_slack_messages,
|
||||
index_luma_events
|
||||
)
|
||||
from app.users import current_active_user
|
||||
from app.utils.check_ownership import check_ownership
|
||||
|
|
@ -344,6 +345,7 @@ async def index_connector_content(
|
|||
- LINEAR_CONNECTOR: Indexes issues and comments from Linear
|
||||
- JIRA_CONNECTOR: Indexes issues and comments from Jira
|
||||
- DISCORD_CONNECTOR: Indexes messages from all accessible Discord channels
|
||||
- LUMA_CONNECTOR: Indexes events from Luma
|
||||
|
||||
Args:
|
||||
connector_id: ID of the connector to use
|
||||
|
|
@ -555,6 +557,21 @@ async def index_connector_content(
|
|||
)
|
||||
response_message = "Discord indexing started in the background."
|
||||
|
||||
elif connector.connector_type == SearchSourceConnectorType.LUMA_CONNECTOR:
|
||||
# Run indexing in background
|
||||
logger.info(
|
||||
f"Triggering Luma indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
|
||||
)
|
||||
background_tasks.add_task(
|
||||
run_luma_indexing_with_new_session,
|
||||
connector_id,
|
||||
search_space_id,
|
||||
str(user.id),
|
||||
indexing_from,
|
||||
indexing_to,
|
||||
)
|
||||
response_message = "Luma indexing started in the background."
|
||||
|
||||
else:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
|
|
@ -1262,3 +1279,63 @@ async def run_google_gmail_indexing(
|
|||
exc_info=True,
|
||||
)
|
||||
# Optionally update status in DB to indicate failure
|
||||
|
||||
# Add new helper functions for luma indexing
|
||||
async def run_luma_indexing_with_new_session(
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str,
    end_date: str,
):
    """
    Run the Luma indexing task inside a session opened just for it.

    The session comes from ``async_session_maker`` and is closed when the
    task finishes, so the background task never shares a session with the
    request that scheduled it.
    """
    async with async_session_maker() as session:
        await run_luma_indexing(
            session=session,
            connector_id=connector_id,
            search_space_id=search_space_id,
            user_id=user_id,
            start_date=start_date,
            end_date=end_date,
        )
|
||||
|
||||
async def run_luma_indexing(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str,
    end_date: str,
):
    """
    Background task to run Luma indexing.

    Delegates to ``index_luma_events`` and updates the connector's
    ``last_indexed_at`` only when at least one document was processed.
    Exceptions are logged and swallowed so a failing background task never
    propagates out of the task runner.

    Args:
        session: Database session
        connector_id: ID of the Luma connector
        search_space_id: ID of the search space
        user_id: ID of the user
        start_date: Start date for indexing
        end_date: End date for indexing
    """
    try:
        # Index Luma events without updating last_indexed_at (we'll do it separately)
        documents_processed, error_or_warning = await index_luma_events(
            session=session,
            connector_id=connector_id,
            search_space_id=search_space_id,
            user_id=user_id,
            start_date=start_date,
            end_date=end_date,
            update_last_indexed=False,  # Don't update timestamp in the indexing function
        )

        if documents_processed > 0:
            # Only a run that actually produced documents moves the timestamp forward
            await update_connector_last_indexed(session, connector_id)
            logger.info(
                f"Luma indexing completed successfully: {documents_processed} documents processed"
            )
        elif error_or_warning:
            logger.error(
                f"Luma indexing failed or no documents processed: {error_or_warning}"
            )
        else:
            # (0, None) means the run succeeded but found nothing new, e.g. every
            # event was already indexed; that is not an error condition.
            logger.info("Luma indexing completed: no new documents to process")
    except Exception as e:
        # exc_info keeps the traceback, matching the other background indexing tasks
        logger.error(
            f"Error in background Luma indexing task: {e!s}",
            exc_info=True,
        )
|
||||
|
|
@ -196,6 +196,18 @@ class SearchSourceConnectorBase(BaseModel):
|
|||
if key not in config or config[key] in (None, ""):
|
||||
raise ValueError(f"{key} is required and cannot be empty")
|
||||
|
||||
elif connector_type == SearchSourceConnectorType.LUMA_CONNECTOR:
|
||||
# For LUMA_CONNECTOR, only allow LUMA_API_KEY
|
||||
allowed_keys = ["LUMA_API_KEY"]
|
||||
if set(config.keys()) != set(allowed_keys):
|
||||
raise ValueError(
|
||||
f"For LUMA_CONNECTOR connector type, config must only contain these keys: {allowed_keys}"
|
||||
)
|
||||
|
||||
# Ensure the api key is not empty
|
||||
if not config.get("LUMA_API_KEY"):
|
||||
raise ValueError("LUMA_API_KEY cannot be empty")
|
||||
|
||||
return config
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1852,3 +1852,163 @@ class ConnectorService:
|
|||
}
|
||||
|
||||
return result_object, discord_chunks
|
||||
|
||||
async def search_luma(
    self,
    user_query: str,
    user_id: str,
    search_space_id: int,
    top_k: int = 20,
    search_mode: SearchMode = SearchMode.CHUNKS,
) -> tuple:
    """
    Search for Luma events and return both the source information and the matched chunks

    Args:
        user_query: The user's query
        user_id: The user's ID
        search_space_id: The search space ID to search in
        top_k: Maximum number of results to return
        search_mode: Search mode (CHUNKS or DOCUMENTS)

    Returns:
        tuple: (sources_info, luma_chunks) where sources_info is a dict with
        "id", "name", "type" and "sources" keys. An unrecognised search mode
        yields the same empty result as a search with no hits.
    """
    from datetime import datetime

    def _format_timestamp(value) -> str:
        """Render an ISO-8601 timestamp as 'YYYY-MM-DD HH:MM'; fall back to the raw value."""
        if isinstance(value, str) and "T" in value:
            try:
                parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
            except ValueError:
                return value
            return parsed.strftime("%Y-%m-%d %H:%M")
        return str(value)

    # Bound up front so an unexpected search_mode cannot leave the name undefined
    luma_chunks = []
    if search_mode == SearchMode.CHUNKS:
        luma_chunks = await self.chunk_retriever.hybrid_search(
            query_text=user_query,
            top_k=top_k,
            user_id=user_id,
            search_space_id=search_space_id,
            document_type="LUMA_CONNECTOR",
        )
    elif search_mode == SearchMode.DOCUMENTS:
        luma_chunks = await self.document_retriever.hybrid_search(
            query_text=user_query,
            top_k=top_k,
            user_id=user_id,
            search_space_id=search_space_id,
            document_type="LUMA_CONNECTOR",
        )
        # Transform document retriever results to match expected format
        luma_chunks = self._transform_document_results(luma_chunks)

    # Early return if no results
    if not luma_chunks:
        return {
            "id": 33,
            "name": "Luma Events",
            "type": "LUMA_CONNECTOR",
            "sources": [],
        }, []

    # Process each chunk and create sources directly without deduplication
    sources_list = []
    async with self.counter_lock:
        for chunk in luma_chunks:
            # Extract document metadata
            document = chunk.get("document", {})
            metadata = document.get("metadata", {})

            # Extract Luma-specific metadata. The Luma indexer stores the times
            # as "start_at"/"end_at" and the address as "location", so those
            # keys are used as fallbacks for the names read here.
            event_id = metadata.get("event_id", "")
            event_name = metadata.get("event_name", "Untitled Event")
            event_url = metadata.get("event_url", "")
            start_time = metadata.get("start_time") or metadata.get("start_at", "")
            end_time = metadata.get("end_time") or metadata.get("end_at", "")
            location_name = metadata.get("location_name", "")
            location_address = metadata.get("location_address") or metadata.get(
                "location", ""
            )
            meeting_url = metadata.get("meeting_url", "")
            timezone = metadata.get("timezone", "")
            visibility = metadata.get("visibility", "")

            # Create a more descriptive title for Luma events
            title = f"Luma: {event_name}"
            if start_time:
                title += f" ({_format_timestamp(start_time)})"

            # Truncate the content preview; only mark it as truncated when
            # something was actually cut off.
            content = chunk.get("content", "") or ""
            description = content[:150]
            if len(content) > 150:
                description += "..."

            # Add event info to description
            info_parts = []
            if location_name:
                info_parts.append(f"Venue: {location_name}")
            elif location_address:
                info_parts.append(f"Location: {location_address}")

            if meeting_url:
                info_parts.append("Online Event")

            if end_time:
                info_parts.append(f"Ends: {_format_timestamp(end_time)}")

            if timezone:
                info_parts.append(f"TZ: {timezone}")

            if visibility:
                info_parts.append(f"Visibility: {visibility.title()}")

            if info_parts:
                joined_info = " | ".join(info_parts)
                description = (
                    f"{description} | {joined_info}" if description else joined_info
                )

            source = {
                "id": chunk.get("chunk_id", self.source_id_counter),
                "title": title,
                "description": description,
                # Use the Luma event URL if available
                "url": event_url or "",
                "event_id": event_id,
                "event_name": event_name,
                "start_time": start_time,
                "end_time": end_time,
                "location_name": location_name,
                "location_address": location_address,
                "meeting_url": meeting_url,
                "timezone": timezone,
                "visibility": visibility,
            }

            self.source_id_counter += 1
            sources_list.append(source)

    # Create result object
    result_object = {
        "id": 33,  # Assign a unique ID for the Luma connector
        "name": "Luma Events",
        "type": "LUMA_CONNECTOR",
        "sources": sources_list,
    }

    return result_object, luma_chunks
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ Available indexers:
|
|||
- ClickUp: Index tasks from ClickUp workspaces
|
||||
- Google Gmail: Index messages from Google Gmail
|
||||
- Google Calendar: Index events from Google Calendar
|
||||
- Luma: Index events from Luma
|
||||
"""
|
||||
|
||||
# Communication platforms
|
||||
|
|
@ -30,6 +31,7 @@ from .github_indexer import index_github_repos
|
|||
from .google_calendar_indexer import index_google_calendar_events
|
||||
from .google_gmail_indexer import index_google_gmail_messages
|
||||
from .jira_indexer import index_jira_issues
|
||||
from .luma_indexer import index_luma_events
|
||||
|
||||
# Issue tracking and project management
|
||||
from .linear_indexer import index_linear_issues
|
||||
|
|
@ -47,6 +49,7 @@ __all__ = [ # noqa: RUF022
|
|||
"index_github_repos",
|
||||
# Calendar and scheduling
|
||||
"index_google_calendar_events",
|
||||
"index_luma_events",
|
||||
"index_jira_issues",
|
||||
# Issue tracking and project management
|
||||
"index_linear_issues",
|
||||
|
|
|
|||
400
surfsense_backend/app/tasks/connector_indexers/luma_indexer.py
Normal file
400
surfsense_backend/app/tasks/connector_indexers/luma_indexer.py
Normal file
|
|
@ -0,0 +1,400 @@
|
|||
"""
|
||||
Luma connector indexer.
|
||||
"""
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.config import config
|
||||
from app.connectors.luma_connector import LumaConnector
|
||||
from app.db import Document, DocumentType, SearchSourceConnectorType
|
||||
from app.services.llm_service import get_user_long_context_llm
|
||||
from app.services.task_logging_service import TaskLoggingService
|
||||
from app.utils.document_converters import (
|
||||
create_document_chunks,
|
||||
generate_content_hash,
|
||||
generate_document_summary,
|
||||
)
|
||||
|
||||
from .base import (
|
||||
get_connector_by_id,
|
||||
logger,
|
||||
update_connector_last_indexed,
|
||||
)
|
||||
|
||||
|
||||
async def index_luma_events(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    start_date: str | None = None,
    end_date: str | None = None,
    update_last_indexed: bool = True,
) -> tuple[int, str | None]:
    """
    Index Luma events.

    Fetches events for the requested date range from Luma, skips events whose
    content hash is already stored, and adds one ``Document`` (summary,
    embedding and chunks) per new event. Every exit path records a success or
    failure entry through the task logger.

    Args:
        session: Database session
        connector_id: ID of the Luma connector
        search_space_id: ID of the search space to store documents in
        user_id: User ID
        start_date: Start date for indexing (YYYY-MM-DD format)
        end_date: End date for indexing (YYYY-MM-DD format)
        update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)

    Returns:
        Tuple containing (number of documents indexed, error message or None)
    """
    # Kept as a function-local import like the original, but resolved once
    # instead of on every loop iteration.
    from .base import check_duplicate_document_by_hash

    task_logger = TaskLoggingService(session, search_space_id)

    # Log task start
    log_entry = await task_logger.log_task_start(
        task_name="luma_events_indexing",
        source="connector_indexing_task",
        message=f"Starting Luma events indexing for connector {connector_id}",
        metadata={
            "connector_id": connector_id,
            "user_id": str(user_id),
            "start_date": start_date,
            "end_date": end_date,
        },
    )

    try:
        await task_logger.log_task_progress(
            log_entry,
            f"Retrieving Luma connector {connector_id} from database",
            {"stage": "connector_retrieval"},
        )

        # Get the connector from the database
        connector = await get_connector_by_id(
            session, connector_id, SearchSourceConnectorType.LUMA_CONNECTOR
        )

        if not connector:
            await task_logger.log_task_failure(
                log_entry,
                f"Connector with ID {connector_id} not found or is not a Luma connector",
                "Connector not found",
                {"error_type": "ConnectorNotFound"},
            )
            return (
                0,
                f"Connector with ID {connector_id} not found or is not a Luma connector",
            )

        # Get the Luma API key from the connector config
        api_key = connector.config.get("LUMA_API_KEY")

        if not api_key:
            await task_logger.log_task_failure(
                log_entry,
                f"Luma API key not found in connector config for connector {connector_id}",
                "Missing Luma API key",
                {"error_type": "MissingCredentials"},
            )
            return 0, "Luma API key not found in connector config"

        logger.info(f"Starting Luma indexing for connector {connector_id}")

        # Initialize Luma client
        await task_logger.log_task_progress(
            log_entry,
            f"Initializing Luma client for connector {connector_id}",
            {"stage": "client_initialization"},
        )

        luma_client = LumaConnector(api_key=api_key)

        start_date_str, end_date_str = _resolve_luma_date_range(
            connector, start_date, end_date
        )

        await task_logger.log_task_progress(
            log_entry,
            f"Fetching Luma events from {start_date_str} to {end_date_str}",
            {
                "stage": "fetching_events",
                "start_date": start_date_str,
                "end_date": end_date_str,
            },
        )

        # Get events within date range from Luma. Only the API call itself is
        # guarded here; database errors raised further down are handled by
        # the outer handlers, which roll the session back first.
        try:
            events, error = luma_client.get_events_by_date_range(
                start_date_str, end_date_str, include_guests=False
            )
        except Exception as e:
            logger.error(f"Error fetching Luma events: {e!s}", exc_info=True)
            # Close the task log entry; otherwise it would stay open forever.
            await task_logger.log_task_failure(
                log_entry,
                f"Error fetching Luma events for connector {connector_id}",
                str(e),
                {"error_type": type(e).__name__},
            )
            return 0, f"Error fetching Luma events: {e!s}"

        if error:
            # Don't treat "No events found" as an error that should stop indexing
            if "no events" in error.lower():
                logger.info(
                    "No events found is not a critical error, continuing with update"
                )
                if update_last_indexed:
                    await update_connector_last_indexed(
                        session, connector, update_last_indexed
                    )
                    await session.commit()
                    logger.info(
                        f"Updated last_indexed_at to {connector.last_indexed_at} despite no events found"
                    )

                await task_logger.log_task_success(
                    log_entry,
                    f"No Luma events found in date range {start_date_str} to {end_date_str}",
                    {"events_found": 0},
                )
                return 0, None

            logger.error(f"Failed to get Luma events: {error}")
            await task_logger.log_task_failure(
                log_entry,
                f"Failed to get Luma events: {error}",
                "API Error",
                {"error_type": "APIError"},
            )
            return 0, f"Failed to get Luma events: {error}"

        logger.info(f"Retrieved {len(events)} events from Luma API")

        # The user's LLM is the same for every event, so look it up once.
        user_llm = await get_user_long_context_llm(session, user_id)

        documents_indexed = 0
        documents_skipped = 0
        skipped_events = []

        for event in events:
            # Bound before the try block so the error handler can always name the event
            event_name = "Unknown"
            try:
                # Luma event structure fields - events have nested 'event' field
                event_data = event.get("event") or {}
                event_id = event.get("api_id") or event_data.get("id")
                event_name = event_data.get("name") or "No Title"
                event_url = event_data.get("url", "")

                if not event_id:
                    logger.warning(f"Skipping event with missing ID: {event_name}")
                    skipped_events.append(f"{event_name} (missing ID)")
                    documents_skipped += 1
                    continue

                # Format event to markdown using Luma connector's method
                event_markdown = luma_client.format_event_to_markdown(event)
                if not event_markdown.strip():
                    logger.warning(f"Skipping event with no content: {event_name}")
                    skipped_events.append(f"{event_name} (no content)")
                    documents_skipped += 1
                    continue

                # Extract Luma-specific fields from event_data
                start_at = event_data.get("start_at", "")
                end_at = event_data.get("end_at", "")
                timezone = event_data.get("timezone", "")

                # "or" guards against keys that are present but null in the payload
                geo_info = event_data.get("geo_info") or {}
                location = geo_info.get("address", "")
                city = geo_info.get("city", "")

                hosts = event_data.get("hosts") or []
                host_names = ", ".join(
                    host["name"] for host in hosts if host.get("name")
                )

                description = event_data.get("description", "")
                cover_url = event_data.get("cover_url", "")

                content_hash = generate_content_hash(event_markdown, search_space_id)

                # Duplicate check via simple query using helper in base
                existing_document_by_hash = await check_duplicate_document_by_hash(
                    session, content_hash
                )
                if existing_document_by_hash:
                    logger.info(
                        f"Document with content hash {content_hash} already exists for event {event_name}. Skipping processing."
                    )
                    documents_skipped += 1
                    continue

                # Fields shared by the summary metadata and the stored document metadata
                event_metadata = {
                    "event_id": event_id,
                    "event_name": event_name,
                    "event_url": event_url,
                    "start_at": start_at,
                    "end_at": end_at,
                    "timezone": timezone,
                    "location": location,
                    "city": city,
                    "hosts": host_names,
                }

                if user_llm:
                    # Generate summary with metadata
                    summary_metadata = {
                        **event_metadata,
                        "location": location or "No location",
                        "document_type": "Luma Event",
                        "connector_type": "Luma",
                    }
                    (
                        summary_content,
                        summary_embedding,
                    ) = await generate_document_summary(
                        event_markdown, user_llm, summary_metadata
                    )
                else:
                    # Fallback to simple summary if no LLM configured
                    summary_content = _build_fallback_luma_summary(
                        event_name=event_name,
                        event_url=event_url,
                        start_at=start_at,
                        end_at=end_at,
                        timezone=timezone,
                        location=location,
                        city=city,
                        host_names=host_names,
                        description=description,
                    )
                    summary_embedding = config.embedding_model_instance.embed(
                        summary_content
                    )

                chunks = await create_document_chunks(event_markdown)

                document = Document(
                    search_space_id=search_space_id,
                    title=f"Luma Event - {event_name}",
                    document_type=DocumentType.LUMA_CONNECTOR,
                    document_metadata={
                        **event_metadata,
                        "cover_url": cover_url,
                        "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    },
                    content=summary_content,
                    content_hash=content_hash,
                    embedding=summary_embedding,
                    chunks=chunks,
                )

                session.add(document)
                documents_indexed += 1
                logger.info(f"Successfully indexed new event {event_name}")

            except Exception as e:
                # Best effort: one bad event must not abort the whole run
                logger.error(
                    f"Error processing event {event_name}: {e!s}",
                    exc_info=True,
                )
                skipped_events.append(f"{event_name} (processing error)")
                documents_skipped += 1
                continue

        if documents_indexed > 0:
            await update_connector_last_indexed(session, connector, update_last_indexed)

        await session.commit()

        await task_logger.log_task_success(
            log_entry,
            f"Successfully completed Luma indexing for connector {connector_id}",
            {
                "events_processed": documents_indexed,
                "documents_indexed": documents_indexed,
                "documents_skipped": documents_skipped,
                "skipped_events_count": len(skipped_events),
            },
        )

        logger.info(
            f"Luma indexing completed: {documents_indexed} new events, {documents_skipped} skipped"
        )
        return documents_indexed, None

    except SQLAlchemyError as db_error:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Database error during Luma indexing for connector {connector_id}",
            str(db_error),
            {"error_type": "SQLAlchemyError"},
        )
        logger.error(f"Database error: {db_error!s}", exc_info=True)
        return 0, f"Database error: {db_error!s}"
    except Exception as e:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Failed to index Luma events for connector {connector_id}",
            str(e),
            {"error_type": type(e).__name__},
        )
        logger.error(f"Failed to index Luma events: {e!s}", exc_info=True)
        return 0, f"Failed to index Luma events: {e!s}"


def _resolve_luma_date_range(
    connector, start_date: str | None, end_date: str | None
) -> tuple[str, str]:
    """Return the (start, end) date strings to index, deriving any bound that is None."""
    if start_date is not None and end_date is not None:
        # Use provided dates
        return start_date, end_date

    # Fall back to calculating dates based on last_indexed_at
    calculated_end_date = datetime.now()
    thirty_days_ago = calculated_end_date - timedelta(days=30)

    if connector.last_indexed_at:
        # Drop tzinfo so the value is comparable with the naive "now" above
        last_indexed_naive = connector.last_indexed_at.replace(tzinfo=None)

        if last_indexed_naive > calculated_end_date:
            logger.warning(
                f"Last indexed date ({last_indexed_naive.strftime('%Y-%m-%d')}) is in the future. Using 30 days ago instead."
            )
            calculated_start_date = thirty_days_ago
        else:
            calculated_start_date = last_indexed_naive
            logger.info(
                f"Using last_indexed_at ({calculated_start_date.strftime('%Y-%m-%d')}) as start date"
            )
    else:
        calculated_start_date = thirty_days_ago
        logger.info(
            f"No last_indexed_at found, using {calculated_start_date.strftime('%Y-%m-%d')} (30 days ago) as start date"
        )

    # Use calculated dates if not provided
    return (
        start_date or calculated_start_date.strftime("%Y-%m-%d"),
        end_date or calculated_end_date.strftime("%Y-%m-%d"),
    )


def _build_fallback_luma_summary(
    *,
    event_name: str,
    event_url: str,
    start_at: str,
    end_at: str,
    timezone: str,
    location: str,
    city: str,
    host_names: str,
    description: str,
) -> str:
    """Build the plain-text event summary used when the user has no LLM configured."""
    lines = [f"Luma Event: {event_name}\n"]
    if event_url:
        lines.append(f"URL: {event_url}")
    lines.append(f"Start: {start_at}")
    lines.append(f"End: {end_at}")
    if timezone:
        lines.append(f"Timezone: {timezone}")
    if location:
        lines.append(f"Location: {location}")
    if city:
        lines.append(f"City: {city}")
    if host_names:
        lines.append(f"Hosts: {host_names}")
    if description:
        desc_preview = description[:300]
        if len(description) > 300:
            desc_preview += "..."
        lines.append(f"Description: {desc_preview}")
    # Every line, including the last, is newline-terminated
    return "\n".join(lines) + "\n"
|
||||
Loading…
Add table
Add a link
Reference in a new issue