feat: add created_by_id column to documents for ownership tracking and update related connectors

This commit is contained in:
Anish Sarkar 2026-02-02 12:32:24 +05:30
parent e7c17c327c
commit e0ade20e68
29 changed files with 214 additions and 1 deletions

View file

@ -0,0 +1,126 @@
"""Add created_by_id column to documents table for document ownership tracking
Revision ID: 86
Revises: 85
Create Date: 2026-02-02
Changes:
1. Add created_by_id column (UUID, nullable, foreign key to user.id)
2. Create index on created_by_id for performance
3. Backfill existing documents with search space owner's user_id
"""
from collections.abc import Sequence
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "86"
down_revision: str | None = "85"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Add created_by_id column to documents and backfill with search space owner."""
# 1. Add created_by_id column (nullable for backward compatibility)
op.execute(
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'documents' AND column_name = 'created_by_id'
) THEN
ALTER TABLE documents
ADD COLUMN created_by_id UUID;
END IF;
END$$;
"""
)
# 2. Create index on created_by_id for efficient queries
op.execute(
"""
CREATE INDEX IF NOT EXISTS ix_documents_created_by_id
ON documents (created_by_id);
"""
)
# 3. Add foreign key constraint with ON DELETE SET NULL
# First check if constraint already exists
op.execute(
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.table_constraints
WHERE constraint_name = 'fk_documents_created_by_id'
AND table_name = 'documents'
) THEN
ALTER TABLE documents
ADD CONSTRAINT fk_documents_created_by_id
FOREIGN KEY (created_by_id) REFERENCES "user"(id)
ON DELETE SET NULL;
END IF;
END$$;
"""
)
# 4. Backfill existing documents with search space owner's user_id
# This ensures all existing documents are associated with the search space owner
op.execute(
"""
UPDATE documents
SET created_by_id = searchspaces.user_id
FROM searchspaces
WHERE documents.search_space_id = searchspaces.id
AND documents.created_by_id IS NULL;
"""
)
def downgrade() -> None:
"""Remove created_by_id column from documents."""
# Drop foreign key constraint
op.execute(
"""
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.table_constraints
WHERE constraint_name = 'fk_documents_created_by_id'
AND table_name = 'documents'
) THEN
ALTER TABLE documents
DROP CONSTRAINT fk_documents_created_by_id;
END IF;
END$$;
"""
)
# Drop index
op.execute(
"""
DROP INDEX IF EXISTS ix_documents_created_by_id;
"""
)
# Drop column
op.execute(
"""
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'documents' AND column_name = 'created_by_id'
) THEN
ALTER TABLE documents
DROP COLUMN created_by_id;
END IF;
END$$;
"""
)

View file

@ -394,6 +394,7 @@ async def _process_gmail_message_batch(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)
documents_indexed += 1

View file

@ -442,6 +442,7 @@ async def index_composio_google_calendar(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)
documents_indexed += 1

View file

@ -1258,6 +1258,7 @@ async def _process_single_drive_file(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -749,7 +749,18 @@ class Document(BaseModel, TimestampMixin):
search_space_id = Column(
Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=False
)
# Track who created/uploaded this document
created_by_id = Column(
UUID(as_uuid=True),
ForeignKey("user.id", ondelete="SET NULL"),
nullable=True, # Nullable for backward compatibility with existing records
index=True,
)
# Relationships
search_space = relationship("SearchSpace", back_populates="documents")
created_by = relationship("User", back_populates="documents")
chunks = relationship(
"Chunk", back_populates="document", cascade="all, delete-orphan"
)
@ -1284,6 +1295,13 @@ if config.AUTH_TYPE == "GOOGLE":
passive_deletes=True,
)
# Documents created/uploaded by this user
documents = relationship(
"Document",
back_populates="created_by",
passive_deletes=True,
)
# User memories for personalized AI responses
memories = relationship(
"UserMemory",
@ -1342,6 +1360,13 @@ else:
passive_deletes=True,
)
# Documents created/uploaded by this user
documents = relationship(
"Document",
back_populates="created_by",
passive_deletes=True,
)
# User memories for personalized AI responses
memories = relationship(
"UserMemory",

View file

@ -76,6 +76,7 @@ async def create_note(
document_metadata={"NOTE": True},
embedding=None, # Will be generated on first reindex
updated_at=datetime.now(UTC),
created_by_id=user.id, # Track who created this note
)
session.add(document)
@ -93,6 +94,7 @@ async def create_note(
search_space_id=document.search_space_id,
created_at=document.created_at,
updated_at=document.updated_at,
created_by_id=document.created_by_id,
)

View file

@ -1,5 +1,6 @@
from datetime import datetime
from typing import TypeVar
from uuid import UUID
from pydantic import BaseModel, ConfigDict
@ -51,6 +52,7 @@ class DocumentRead(BaseModel):
created_at: datetime
updated_at: datetime | None
search_space_id: int
created_by_id: UUID | None = None # User who created/uploaded this document
model_config = ConfigDict(from_attributes=True)

View file

@ -417,6 +417,7 @@ async def index_airtable_records(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -396,6 +396,7 @@ async def index_bookstack_pages(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -395,6 +395,7 @@ async def index_clickup_tasks(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -402,6 +402,7 @@ async def index_confluence_pages(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -527,6 +527,7 @@ async def index_discord_messages(
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -292,6 +292,7 @@ async def index_elasticsearch_documents(
document_metadata=metadata,
search_space_id=search_space_id,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
# Create chunks and attach to document (persist via relationship)

View file

@ -426,6 +426,7 @@ async def _process_repository_digest(
search_space_id=search_space_id,
chunks=chunks_data,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -499,6 +499,7 @@ async def index_google_calendar_events(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -421,6 +421,7 @@ async def index_google_gmail_messages(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)
documents_indexed += 1

View file

@ -380,6 +380,7 @@ async def index_jira_issues(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -413,6 +413,7 @@ async def index_linear_issues(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -476,6 +476,7 @@ async def index_luma_events(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -470,6 +470,7 @@ async def index_notion_pages(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -500,6 +500,7 @@ async def index_obsidian_vault(
embedding=embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(new_document)

View file

@ -389,6 +389,7 @@ async def index_slack_messages(
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -430,6 +430,7 @@ async def index_teams_messages(
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -371,6 +371,7 @@ async def index_crawled_urls(
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -8,10 +8,17 @@ and stores it as searchable documents in the database.
import logging
from typing import Any
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentType
from app.db import (
Document,
DocumentType,
SearchSourceConnector,
SearchSourceConnectorType,
SearchSpace,
)
from app.services.llm_service import get_document_summary_llm
from app.utils.document_converters import (
create_document_chunks,
@ -125,6 +132,30 @@ async def add_circleback_meeting_document(
**metadata,
}
# Fetch the user who set up the Circleback connector (preferred)
# or fall back to search space owner if no connector found
created_by_user_id = None
# Try to find the Circleback connector for this search space
connector_result = await session.execute(
select(SearchSourceConnector.user_id).where(
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.CIRCLEBACK_CONNECTOR,
)
)
connector_user = connector_result.scalar_one_or_none()
if connector_user:
# Use the user who set up the Circleback connector
created_by_user_id = connector_user
else:
# Fallback: use search space owner if no connector found
search_space_result = await session.execute(
select(SearchSpace.user_id).where(SearchSpace.id == search_space_id)
)
created_by_user_id = search_space_result.scalar_one_or_none()
# Update or create document
if existing_document:
# Update existing document
@ -160,6 +191,7 @@ async def add_circleback_meeting_document(
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
created_by_id=created_by_user_id,
)
session.add(document)

View file

@ -185,6 +185,7 @@ async def add_extension_received_document(
unique_identifier_hash=unique_identifier_hash,
blocknote_document=blocknote_json,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -526,6 +526,7 @@ async def add_received_file_document_using_unstructured(
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)
@ -665,6 +666,7 @@ async def add_received_file_document_using_llamacloud(
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)
@ -829,6 +831,7 @@ async def add_received_file_document_using_docling(
blocknote_document=blocknote_json,
content_needs_reindexing=False,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -295,6 +295,7 @@ async def add_received_markdown_file_document(
unique_identifier_hash=primary_hash,
blocknote_document=blocknote_json,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)

View file

@ -357,6 +357,7 @@ async def add_youtube_video_document(
unique_identifier_hash=unique_identifier_hash,
blocknote_document=blocknote_json,
updated_at=get_current_timestamp(),
created_by_id=user_id,
)
session.add(document)