refactor: update Discord message indexing logic

- Enhanced the indexing process for Discord messages to treat each message as an individual document, improving metadata handling and content management.
- Replaced the announcement banner component and related state management with a more streamlined approach, removing unnecessary files and simplifying the dashboard layout.
- Updated logging messages for clarity and accuracy regarding processed messages.
This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-01-05 22:18:25 -08:00
parent afe63943f2
commit aac0432023
4 changed files with 129 additions and 237 deletions

View file

@ -11,17 +11,15 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.discord_connector import DiscordConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
build_document_metadata_string,
build_document_metadata_markdown,
check_document_by_unique_identifier,
get_connector_by_id,
get_current_timestamp,
@ -336,207 +334,155 @@ async def index_discord_messages(
documents_skipped += 1
continue
# Convert messages to markdown format
channel_content = (
f"# Discord Channel: {guild_name} / {channel_name}\n\n"
)
# Process each message as an individual document (like Slack)
for msg in formatted_messages:
user_name = msg.get("author_name", "Unknown User")
timestamp = msg.get("created_at", "Unknown Time")
text = msg.get("content", "")
channel_content += (
f"## {user_name} ({timestamp})\n\n{text}\n\n---\n\n"
msg_id = msg.get("id", "")
msg_user_name = msg.get("author_name", "Unknown User")
msg_timestamp = msg.get("created_at", "Unknown Time")
msg_text = msg.get("content", "")
# Format document metadata (similar to Slack)
metadata_sections = [
(
"METADATA",
[
f"GUILD_NAME: {guild_name}",
f"GUILD_ID: {guild_id}",
f"CHANNEL_NAME: {channel_name}",
f"CHANNEL_ID: {channel_id}",
f"MESSAGE_TIMESTAMP: {msg_timestamp}",
f"MESSAGE_USER_NAME: {msg_user_name}",
],
),
(
"CONTENT",
[
"FORMAT: markdown",
"TEXT_START",
msg_text,
"TEXT_END",
],
),
]
# Build the document string
combined_document_string = build_document_metadata_markdown(
metadata_sections
)
# Metadata sections
metadata_sections = [
(
"METADATA",
[
f"GUILD_NAME: {guild_name}",
f"GUILD_ID: {guild_id}",
f"CHANNEL_NAME: {channel_name}",
f"CHANNEL_ID: {channel_id}",
f"MESSAGE_COUNT: {len(formatted_messages)}",
],
),
(
"CONTENT",
[
"FORMAT: markdown",
"TEXT_START",
channel_content,
"TEXT_END",
],
),
]
# Generate unique identifier hash for this Discord message
unique_identifier = f"{channel_id}_{msg_id}"
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.DISCORD_CONNECTOR,
unique_identifier,
search_space_id,
)
combined_document_string = build_document_metadata_string(
metadata_sections
)
# Generate content hash
content_hash = generate_content_hash(
combined_document_string, search_space_id
)
# Generate unique identifier hash for this Discord channel
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.DISCORD_CONNECTOR, channel_id, search_space_id
)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Generate content hash
content_hash = generate_content_hash(
combined_document_string, search_space_id
)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Discord channel {guild_name}#{channel_name} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Discord channel {guild_name}#{channel_name}. Updating document."
)
# Get user's long context LLM
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if not user_llm:
logger.error(
f"No long context LLM configured for user {user_id}"
)
skipped_channels.append(
f"{guild_name}#{channel_name} (no LLM configured)"
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Discord message {msg_id} in {guild_name}#{channel_name} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Discord message {msg_id} in {guild_name}#{channel_name}. Updating document."
)
# Generate summary with metadata
document_metadata = {
"guild_name": guild_name,
"channel_name": channel_name,
"message_count": len(formatted_messages),
"document_type": "Discord Channel Messages",
"connector_type": "Discord",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
combined_document_string,
user_llm,
document_metadata,
)
# Update chunks and embedding
chunks = await create_document_chunks(
combined_document_string
)
doc_embedding = config.embedding_model_instance.embed(
combined_document_string
)
# Chunks from channel content
chunks = await create_document_chunks(channel_content)
# Update existing document
existing_document.content = combined_document_string
existing_document.content_hash = content_hash
existing_document.embedding = doc_embedding
existing_document.document_metadata = {
"guild_name": guild_name,
"guild_id": guild_id,
"channel_name": channel_name,
"channel_id": channel_id,
"message_id": msg_id,
"message_timestamp": msg_timestamp,
"message_user_name": msg_user_name,
"indexed_at": datetime.now(UTC).strftime(
"%Y-%m-%d %H:%M:%S"
),
}
# Update existing document
existing_document.title = (
f"Discord - {guild_name}#{channel_name}"
)
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
# Delete old chunks and add new ones
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
logger.info(
f"Successfully updated Discord message {msg_id}"
)
continue
# Document doesn't exist - create new one
# Process chunks
chunks = await create_document_chunks(combined_document_string)
doc_embedding = config.embedding_model_instance.embed(
combined_document_string
)
# Create and store new document
document = Document(
search_space_id=search_space_id,
title=f"Discord - {guild_name}#{channel_name}",
document_type=DocumentType.DISCORD_CONNECTOR,
document_metadata={
"guild_name": guild_name,
"guild_id": guild_id,
"channel_name": channel_name,
"channel_id": channel_id,
"message_count": len(formatted_messages),
"start_date": start_date_iso,
"end_date": end_date_iso,
"message_id": msg_id,
"message_timestamp": msg_timestamp,
"message_user_name": msg_user_name,
"indexed_at": datetime.now(UTC).strftime(
"%Y-%m-%d %H:%M:%S"
),
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
},
content=combined_document_string,
embedding=doc_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
updated_at=get_current_timestamp(),
)
documents_indexed += 1
session.add(document)
documents_indexed += 1
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(
f"Successfully updated Discord channel {guild_name}#{channel_name}"
f"Committing batch: {documents_indexed} Discord messages processed so far"
)
continue
await session.commit()
# Document doesn't exist - create new one
# Get user's long context LLM
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if not user_llm:
logger.error(
f"No long context LLM configured for user {user_id}"
)
skipped_channels.append(
f"{guild_name}#{channel_name} (no LLM configured)"
)
documents_skipped += 1
continue
# Generate summary with metadata
document_metadata = {
"guild_name": guild_name,
"channel_name": channel_name,
"message_count": len(formatted_messages),
"document_type": "Discord Channel Messages",
"connector_type": "Discord",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
combined_document_string, user_llm, document_metadata
)
# Chunks from channel content
chunks = await create_document_chunks(channel_content)
# Create and store new document
document = Document(
search_space_id=search_space_id,
title=f"Discord - {guild_name}#{channel_name}",
document_type=DocumentType.DISCORD_CONNECTOR,
document_metadata={
"guild_name": guild_name,
"guild_id": guild_id,
"channel_name": channel_name,
"channel_id": channel_id,
"message_count": len(formatted_messages),
"start_date": start_date_iso,
"end_date": end_date_iso,
"indexed_at": datetime.now(UTC).strftime(
"%Y-%m-%d %H:%M:%S"
),
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
)
session.add(document)
documents_indexed += 1
logger.info(
f"Successfully indexed new channel {guild_name}#{channel_name} with {len(formatted_messages)} messages"
f"Successfully indexed channel {guild_name}#{channel_name} with {len(formatted_messages)} messages"
)
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Discord channels processed so far"
)
await session.commit()
except Exception as e:
logger.error(
f"Error processing guild {guild_name}: {e!s}", exc_info=True
@ -553,7 +499,7 @@ async def index_discord_messages(
# Final commit for any remaining documents not yet committed in batches
logger.info(
f"Final commit: Total {documents_indexed} Discord channels processed"
f"Final commit: Total {documents_indexed} Discord messages processed"
)
await session.commit()
@ -561,18 +507,18 @@ async def index_discord_messages(
result_message = None
if skipped_channels:
result_message = (
f"Processed {documents_indexed} channels. Skipped {len(skipped_channels)} channels: "
f"Processed {documents_indexed} messages. Skipped {len(skipped_channels)} channels: "
+ ", ".join(skipped_channels)
)
else:
result_message = f"Processed {documents_indexed} channels."
result_message = f"Processed {documents_indexed} messages."
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Discord indexing for connector {connector_id}",
{
"channels_processed": documents_indexed,
"messages_processed": documents_indexed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_channels_count": len(skipped_channels),
@ -582,7 +528,7 @@ async def index_discord_messages(
)
logger.info(
f"Discord indexing completed: {documents_indexed} new channels, {documents_skipped} skipped"
f"Discord indexing completed: {documents_indexed} new messages, {documents_skipped} skipped"
)
return documents_indexed, result_message