feat: Moved searchconnectors association from user to searchspace

- Need to move llm configs to searchspace
This commit is contained in:
DESKTOP-RTLN3BA\$punk 2025-10-08 21:13:01 -07:00
parent b3d8279931
commit aea09a5dad
36 changed files with 578 additions and 223 deletions

View file

@ -0,0 +1,159 @@
"""Associate SearchSourceConnector with SearchSpace instead of User
Revision ID: '23'
Revises: '22'
Create Date: 2025-01-10 12:00:00.000000
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "23"
down_revision: str | None = "22"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""
Add search_space_id to SearchSourceConnector and update unique constraint.
Changes:
1. Add search_space_id column (nullable initially)
2. Populate search_space_id with user's first search space
3. Make search_space_id NOT NULL
4. Add foreign key constraint
5. Drop old unique constraint (user_id, connector_type)
6. Add new unique constraint (search_space_id, user_id, connector_type)
"""
from sqlalchemy import inspect
conn = op.get_bind()
inspector = inspect(conn)
# Get existing columns
columns = [col["name"] for col in inspector.get_columns("search_source_connectors")]
# Step 1: Add search_space_id column as nullable first (if it doesn't exist)
if "search_space_id" not in columns:
op.add_column(
"search_source_connectors",
sa.Column("search_space_id", sa.Integer(), nullable=True),
)
# Step 2: Populate search_space_id with each user's first search space
# This ensures existing connectors are assigned to a valid search space
op.execute(
"""
UPDATE search_source_connectors ssc
SET search_space_id = (
SELECT id
FROM searchspaces ss
WHERE ss.user_id = ssc.user_id
ORDER BY ss.created_at ASC
LIMIT 1
)
WHERE search_space_id IS NULL
"""
)
# Step 3: Make search_space_id NOT NULL
op.alter_column(
"search_source_connectors",
"search_space_id",
nullable=False,
)
# Step 4: Add foreign key constraint (if it doesn't exist)
foreign_keys = [
fk["name"] for fk in inspector.get_foreign_keys("search_source_connectors")
]
if "fk_search_source_connectors_search_space_id" not in foreign_keys:
op.create_foreign_key(
"fk_search_source_connectors_search_space_id",
"search_source_connectors",
"searchspaces",
["search_space_id"],
["id"],
ondelete="CASCADE",
)
# Step 5: Drop the old unique constraint (user_id, connector_type) if it exists
unique_constraints = [
uc["name"]
for uc in inspector.get_unique_constraints("search_source_connectors")
]
if "uq_user_connector_type" in unique_constraints:
op.drop_constraint(
"uq_user_connector_type",
"search_source_connectors",
type_="unique",
)
# Step 6: Create new unique constraint (search_space_id, user_id, connector_type) if it doesn't exist
if "uq_searchspace_user_connector_type" not in unique_constraints:
op.create_unique_constraint(
"uq_searchspace_user_connector_type",
"search_source_connectors",
["search_space_id", "user_id", "connector_type"],
)
def downgrade() -> None:
"""
Revert SearchSourceConnector association back to User only.
WARNING: This downgrade may result in data loss if multiple connectors
of the same type exist for a user across different search spaces.
"""
from sqlalchemy import inspect
conn = op.get_bind()
inspector = inspect(conn)
# Get existing constraints and columns
unique_constraints = [
uc["name"]
for uc in inspector.get_unique_constraints("search_source_connectors")
]
foreign_keys = [
fk["name"] for fk in inspector.get_foreign_keys("search_source_connectors")
]
columns = [col["name"] for col in inspector.get_columns("search_source_connectors")]
# Step 1: Drop the new unique constraint if it exists
if "uq_searchspace_user_connector_type" in unique_constraints:
op.drop_constraint(
"uq_searchspace_user_connector_type",
"search_source_connectors",
type_="unique",
)
# Step 2: Recreate the old unique constraint (user_id, connector_type) if it doesn't exist
# NOTE: This will fail if there are duplicate (user_id, connector_type) combinations
# Manual cleanup may be required before downgrading
if "uq_user_connector_type" not in unique_constraints:
op.create_unique_constraint(
"uq_user_connector_type",
"search_source_connectors",
["user_id", "connector_type"],
)
# Step 3: Drop the foreign key constraint if it exists
if "fk_search_source_connectors_search_space_id" in foreign_keys:
op.drop_constraint(
"fk_search_source_connectors_search_space_id",
"search_source_connectors",
type_="foreignkey",
)
# Step 4: Drop the search_space_id column if it exists
if "search_space_id" in columns:
op.drop_column("search_source_connectors", "search_space_id")

View file

@ -0,0 +1,39 @@
"""Fix NULL chat types by setting them to QNA
Revision ID: 24
Revises: 23
Create Date: 2025-01-10 14:00:00.000000
"""
from collections.abc import Sequence
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "24"
down_revision: str | None = "23"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""
Fix any chats with NULL type values by setting them to QNA.
This handles edge cases from previous migrations where type values were not properly migrated.
"""
# Update any NULL type values to QNA (the default chat type)
op.execute(
"""
UPDATE chats
SET type = 'QNA'
WHERE type IS NULL
"""
)
def downgrade() -> None:
"""
No downgrade necessary - we can't restore NULL values as we don't know which ones were NULL.
"""
pass

View file

@ -1010,7 +1010,10 @@ async def fetch_relevant_documents(
source_object,
tavily_chunks,
) = await connector_service.search_tavily(
user_query=reformulated_query, user_id=user_id, top_k=top_k
user_query=reformulated_query,
user_id=user_id,
search_space_id=search_space_id,
top_k=top_k,
)
# Add to sources and raw documents
@ -1037,6 +1040,7 @@ async def fetch_relevant_documents(
) = await connector_service.search_linkup(
user_query=reformulated_query,
user_id=user_id,
search_space_id=search_space_id,
mode=linkup_mode,
)

View file

@ -31,15 +31,20 @@ class GoogleCalendarConnector:
credentials: Credentials,
session: AsyncSession,
user_id: str,
connector_id: int | None = None,
):
"""
Initialize the GoogleCalendarConnector class.
Args:
credentials: Google OAuth Credentials object
session: Database session for updating connector
user_id: User ID (kept for backward compatibility)
connector_id: Optional connector ID for direct updates
"""
self._credentials = credentials
self._session = session
self._user_id = user_id
self._connector_id = connector_id
self.service = None
async def _get_credentials(
@ -84,17 +89,25 @@ class GoogleCalendarConnector:
self._credentials.refresh(Request())
# Update the connector config in DB
if self._session:
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.user_id == self._user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
# Use connector_id if available, otherwise fall back to user_id query
if self._connector_id:
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == self._connector_id
)
)
else:
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.user_id == self._user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
)
)
)
connector = result.scalars().first()
if connector is None:
raise RuntimeError(
"GOOGLE_CALENDAR_CONNECTOR connector not found for current user; cannot persist refreshed token."
"GOOGLE_CALENDAR_CONNECTOR connector not found; cannot persist refreshed token."
)
connector.config = json.loads(self._credentials.to_json())
flag_modified(connector, "config")

View file

@ -30,15 +30,20 @@ class GoogleGmailConnector:
credentials: Credentials,
session: AsyncSession,
user_id: str,
connector_id: int | None = None,
):
"""
Initialize the GoogleGmailConnector class.
Args:
credentials: Google OAuth Credentials object
session: Database session for updating connector
user_id: User ID (kept for backward compatibility)
connector_id: Optional connector ID for direct updates
"""
self._credentials = credentials
self._session = session
self._user_id = user_id
self._connector_id = connector_id
self.service = None
async def _get_credentials(
@ -83,17 +88,25 @@ class GoogleGmailConnector:
self._credentials.refresh(Request())
# Update the connector config in DB
if self._session:
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.user_id == self._user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
# Use connector_id if available, otherwise fall back to user_id query
if self._connector_id:
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == self._connector_id
)
)
else:
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.user_id == self._user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
)
)
)
connector = result.scalars().first()
if connector is None:
raise RuntimeError(
"GMAIL connector not found for current user; cannot persist refreshed token."
"GMAIL connector not found; cannot persist refreshed token."
)
connector.config = json.loads(self._credentials.to_json())
flag_modified(connector, "config")

View file

@ -234,12 +234,23 @@ class SearchSpace(BaseModel, TimestampMixin):
order_by="Log.id",
cascade="all, delete-orphan",
)
search_source_connectors = relationship(
"SearchSourceConnector",
back_populates="search_space",
order_by="SearchSourceConnector.id",
cascade="all, delete-orphan",
)
class SearchSourceConnector(BaseModel, TimestampMixin):
__tablename__ = "search_source_connectors"
__table_args__ = (
UniqueConstraint("user_id", "connector_type", name="uq_user_connector_type"),
UniqueConstraint(
"search_space_id",
"user_id",
"connector_type",
name="uq_searchspace_user_connector_type",
),
)
name = Column(String(100), nullable=False, index=True)
@ -248,10 +259,16 @@ class SearchSourceConnector(BaseModel, TimestampMixin):
last_indexed_at = Column(TIMESTAMP(timezone=True), nullable=True)
config = Column(JSON, nullable=False)
search_space_id = Column(
Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=False
)
search_space = relationship(
"SearchSpace", back_populates="search_source_connectors"
)
user_id = Column(
UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), nullable=False
)
user = relationship("User", back_populates="search_source_connectors")
class LLMConfig(BaseModel, TimestampMixin):
@ -304,9 +321,6 @@ if config.AUTH_TYPE == "GOOGLE":
"OAuthAccount", lazy="joined"
)
search_spaces = relationship("SearchSpace", back_populates="user")
search_source_connectors = relationship(
"SearchSourceConnector", back_populates="user"
)
llm_configs = relationship(
"LLMConfig",
back_populates="user",
@ -338,9 +352,6 @@ else:
class User(SQLAlchemyBaseUserTableUUID, Base):
search_spaces = relationship("SearchSpace", back_populates="user")
search_source_connectors = relationship(
"SearchSourceConnector", back_populates="user"
)
llm_configs = relationship(
"LLMConfig",
back_populates="user",

View file

@ -217,9 +217,10 @@ async def airtable_callback(
scope=token_json.get("scope"),
)
# Check if connector already exists for this user
# Check if connector already exists for this search space and user
existing_connector_result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.AIRTABLE_CONNECTOR,
@ -232,7 +233,9 @@ async def airtable_callback(
existing_connector.config = credentials.to_dict()
existing_connector.name = "Airtable Connector"
existing_connector.is_indexable = True
logger.info(f"Updated existing Airtable connector for user {user_id}")
logger.info(
f"Updated existing Airtable connector for user {user_id} in space {space_id}"
)
else:
# Create new connector
new_connector = SearchSourceConnector(
@ -240,10 +243,13 @@ async def airtable_callback(
connector_type=SearchSourceConnectorType.AIRTABLE_CONNECTOR,
is_indexable=True,
config=credentials.to_dict(),
search_space_id=space_id,
user_id=user_id,
)
session.add(new_connector)
logger.info(f"Created new Airtable connector for user {user_id}")
logger.info(
f"Created new Airtable connector for user {user_id} in space {space_id}"
)
try:
await session.commit()

View file

@ -105,9 +105,10 @@ async def calendar_callback(
creds_dict = json.loads(creds.to_json())
try:
# Check if a connector with the same type already exists for this user
# Check if a connector with the same type already exists for this search space and user
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
@ -117,12 +118,13 @@ async def calendar_callback(
if existing_connector:
raise HTTPException(
status_code=409,
detail="A GOOGLE_CALENDAR_CONNECTOR connector already exists. Each user can have only one connector of each type.",
detail="A GOOGLE_CALENDAR_CONNECTOR connector already exists in this search space. Each search space can have only one connector of each type per user.",
)
db_connector = SearchSourceConnector(
name="Google Calendar Connector",
connector_type=SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
config=creds_dict,
search_space_id=space_id,
user_id=user_id,
is_indexable=True,
)

View file

@ -104,9 +104,10 @@ async def gmail_callback(
creds_dict = json.loads(creds.to_json())
try:
# Check if a connector with the same type already exists for this user
# Check if a connector with the same type already exists for this search space and user
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
@ -116,12 +117,13 @@ async def gmail_callback(
if existing_connector:
raise HTTPException(
status_code=409,
detail="A GOOGLE_GMAIL_CONNECTOR connector already exists. Each user can have only one connector of each type.",
detail="A GOOGLE_GMAIL_CONNECTOR connector already exists in this search space. Each search space can have only one connector of each type per user.",
)
db_connector = SearchSourceConnector(
name="Google Gmail Connector",
connector_type=SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
config=creds_dict,
search_space_id=space_id,
user_id=user_id,
is_indexable=True,
)

View file

@ -47,9 +47,10 @@ async def add_luma_connector(
HTTPException: If connector already exists or validation fails
"""
try:
# Check if a Luma connector already exists for this user
# Check if a Luma connector already exists for this search space and user
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == request.space_id,
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.LUMA_CONNECTOR,
@ -64,7 +65,9 @@ async def add_luma_connector(
await session.commit()
await session.refresh(existing_connector)
logger.info(f"Updated existing Luma connector for user {user.id}")
logger.info(
f"Updated existing Luma connector for user {user.id} in space {request.space_id}"
)
return {
"message": "Luma connector updated successfully",
@ -77,6 +80,7 @@ async def add_luma_connector(
name="Luma Event Connector",
connector_type=SearchSourceConnectorType.LUMA_CONNECTOR,
config={"api_key": request.api_key},
search_space_id=request.space_id,
user_id=user.id,
is_indexable=True,
)
@ -113,13 +117,15 @@ async def add_luma_connector(
@router.delete("/connectors/luma")
async def delete_luma_connector(
space_id: int,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
):
"""
Delete the Luma connector for the authenticated user.
Delete the Luma connector for the authenticated user in a specific search space.
Args:
space_id: Search space ID
user: Current authenticated user
session: Database session
@ -132,6 +138,7 @@ async def delete_luma_connector(
try:
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.LUMA_CONNECTOR,
@ -165,13 +172,15 @@ async def delete_luma_connector(
@router.get("/connectors/luma/test")
async def test_luma_connector(
space_id: int,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
):
"""
Test the Luma connector for the authenticated user.
Test the Luma connector for the authenticated user in a specific search space.
Args:
space_id: Search space ID
user: Current authenticated user
session: Database session
@ -182,9 +191,10 @@ async def test_luma_connector(
HTTPException: If connector doesn't exist or test fails
"""
try:
# Get the Luma connector for this user
# Get the Luma connector for this search space and user
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.LUMA_CONNECTOR,

View file

@ -1,13 +1,13 @@
"""
SearchSourceConnector routes for CRUD operations:
POST /search-source-connectors/ - Create a new connector
GET /search-source-connectors/ - List all connectors for the current user
GET /search-source-connectors/ - List all connectors for the current user (optionally filtered by search space)
GET /search-source-connectors/{connector_id} - Get a specific connector
PUT /search-source-connectors/{connector_id} - Update a specific connector
DELETE /search-source-connectors/{connector_id} - Delete a specific connector
POST /search-source-connectors/{connector_id}/index - Index content from a connector to a search space
Note: Each user can have only one connector of each type (SERPER_API, TAVILY_API, SLACK_CONNECTOR, NOTION_CONNECTOR, GITHUB_CONNECTOR, LINEAR_CONNECTOR, DISCORD_CONNECTOR, LUMA_CONNECTOR).
Note: Each search space can have only one connector of each type per user (based on search_space_id, user_id, and connector_type).
"""
import logging
@ -93,19 +93,26 @@ async def list_github_repositories(
@router.post("/search-source-connectors/", response_model=SearchSourceConnectorRead)
async def create_search_source_connector(
connector: SearchSourceConnectorCreate,
search_space_id: int = Query(
..., description="ID of the search space to associate the connector with"
),
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""
Create a new search source connector.
Each user can have only one connector of each type (SERPER_API, TAVILY_API, SLACK_CONNECTOR, etc.).
Each search space can have only one connector of each type per user (based on search_space_id, user_id, and connector_type).
The config must contain the appropriate keys for the connector type.
"""
try:
# Check if a connector with the same type already exists for this user
# Check if the search space belongs to the user
await check_ownership(session, SearchSpace, search_space_id, user)
# Check if a connector with the same type already exists for this search space and user
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type == connector.connector_type,
)
@ -114,9 +121,11 @@ async def create_search_source_connector(
if existing_connector:
raise HTTPException(
status_code=409,
detail=f"A connector with type {connector.connector_type} already exists. Each user can have only one connector of each type.",
detail=f"A connector with type {connector.connector_type} already exists in this search space. Each search space can have only one connector of each type per user.",
)
db_connector = SearchSourceConnector(**connector.model_dump(), user_id=user.id)
db_connector = SearchSourceConnector(
**connector.model_dump(), search_space_id=search_space_id, user_id=user.id
)
session.add(db_connector)
await session.commit()
await session.refresh(db_connector)
@ -128,7 +137,7 @@ async def create_search_source_connector(
await session.rollback()
raise HTTPException(
status_code=409,
detail=f"Integrity error: A connector with this type already exists. {e!s}",
detail=f"Integrity error: A connector with this type already exists in this search space. {e!s}",
) from e
except HTTPException:
await session.rollback()
@ -152,13 +161,19 @@ async def read_search_source_connectors(
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""List all search source connectors for the current user."""
"""List all search source connectors for the current user, optionally filtered by search space."""
try:
query = select(SearchSourceConnector).filter(
SearchSourceConnector.user_id == user.id
)
# No need to filter by search_space_id as connectors are user-owned, not search space specific
# Filter by search_space_id if provided
if search_space_id is not None:
# Verify the search space belongs to the user
await check_ownership(session, SearchSpace, search_space_id, user)
query = query.filter(
SearchSourceConnector.search_space_id == search_space_id
)
result = await session.execute(query.offset(skip).limit(limit))
return result.scalars().all()
@ -255,6 +270,8 @@ async def update_search_source_connector(
if key == "connector_type" and value != db_connector.connector_type:
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id
== db_connector.search_space_id,
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type == value,
SearchSourceConnector.id != connector_id,
@ -264,7 +281,7 @@ async def update_search_source_connector(
if existing_connector:
raise HTTPException(
status_code=409,
detail=f"A connector with type {value} already exists. Each user can have only one connector of each type.",
detail=f"A connector with type {value} already exists in this search space. Each search space can have only one connector of each type per user.",
)
setattr(db_connector, key, value)

View file

@ -224,6 +224,7 @@ class SearchSourceConnectorUpdate(BaseModel):
class SearchSourceConnectorRead(SearchSourceConnectorBase, IDModel, TimestampModel):
search_space_id: int
user_id: uuid.UUID
model_config = ConfigDict(from_attributes=True)

View file

@ -236,28 +236,37 @@ class ConnectorService:
return transformed_results
async def get_connector_by_type(
self, user_id: str, connector_type: SearchSourceConnectorType
self,
user_id: str,
connector_type: SearchSourceConnectorType,
search_space_id: int | None = None,
) -> SearchSourceConnector | None:
"""
Get a connector by type for a specific user
Get a connector by type for a specific user and optionally a search space
Args:
user_id: The user's ID
connector_type: The connector type to retrieve
search_space_id: Optional search space ID to filter by
Returns:
Optional[SearchSourceConnector]: The connector if found, None otherwise
"""
result = await self.session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type == connector_type,
)
query = select(SearchSourceConnector).filter(
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.connector_type == connector_type,
)
if search_space_id is not None:
query = query.filter(
SearchSourceConnector.search_space_id == search_space_id
)
result = await self.session.execute(query)
return result.scalars().first()
async def search_tavily(
self, user_query: str, user_id: str, top_k: int = 20
self, user_query: str, user_id: str, search_space_id: int, top_k: int = 20
) -> tuple:
"""
Search using Tavily API and return both the source information and documents
@ -265,6 +274,7 @@ class ConnectorService:
Args:
user_query: The user's query
user_id: The user's ID
search_space_id: The search space ID
top_k: Maximum number of results to return
Returns:
@ -272,7 +282,7 @@ class ConnectorService:
"""
# Get Tavily connector configuration
tavily_connector = await self.get_connector_by_type(
user_id, SearchSourceConnectorType.TAVILY_API
user_id, SearchSourceConnectorType.TAVILY_API, search_space_id
)
if not tavily_connector:
@ -1637,7 +1647,11 @@ class ConnectorService:
return result_object, clickup_chunks
async def search_linkup(
self, user_query: str, user_id: str, mode: str = "standard"
self,
user_query: str,
user_id: str,
search_space_id: int,
mode: str = "standard",
) -> tuple:
"""
Search using Linkup API and return both the source information and documents
@ -1645,6 +1659,7 @@ class ConnectorService:
Args:
user_query: The user's query
user_id: The user's ID
search_space_id: The search space ID
mode: Search depth mode, can be "standard" or "deep"
Returns:
@ -1652,7 +1667,7 @@ class ConnectorService:
"""
# Get Linkup connector configuration
linkup_connector = await self.get_connector_by_type(
user_id, SearchSourceConnectorType.LINKUP_API
user_id, SearchSourceConnectorType.LINKUP_API, search_space_id
)
if not linkup_connector:

View file

@ -113,7 +113,10 @@ async def index_google_calendar_events(
)
calendar_client = GoogleCalendarConnector(
credentials=credentials, session=session, user_id=user_id
credentials=credentials,
session=session,
user_id=user_id,
connector_id=connector_id,
)
# Calculate date range

View file

@ -127,7 +127,9 @@ async def index_google_gmail_messages(
)
# Initialize Google gmail connector
gmail_connector = GoogleGmailConnector(credentials, session, user_id)
gmail_connector = GoogleGmailConnector(
credentials, session, user_id, connector_id
)
# Fetch recent Google gmail messages
logger.info(f"Fetching recent emails for connector {connector_id}")