diff --git a/surfsense_backend/app/connectors/dexscreener_connector.py b/surfsense_backend/app/connectors/dexscreener_connector.py new file mode 100644 index 000000000..0e635bce8 --- /dev/null +++ b/surfsense_backend/app/connectors/dexscreener_connector.py @@ -0,0 +1,251 @@ +""" +DexScreener Connector Module + +A module for retrieving cryptocurrency trading pair data from the DexScreener API. +Allows fetching pair information for tracked tokens across multiple blockchain networks. +""" + +import asyncio +import logging +from typing import Any + +import httpx + +logger = logging.getLogger(__name__) + + +class DexScreenerConnector: + """Class for retrieving trading pair data from the DexScreener API.""" + + def __init__(self): + """ + Initialize the DexScreenerConnector class. + + Note: The DexScreener API is public and doesn't require authentication. + """ + self.base_url = "https://api.dexscreener.com/latest/dex" + self.rate_limit_delay = 0.2 # 200ms delay between requests to respect rate limits + + async def make_request( + self, + endpoint: str, + max_retries: int = 3 + ) -> dict[str, Any] | None: + """ + Make an async request to the DexScreener API with retry logic. + + Args: + endpoint: API endpoint path (without base URL) + max_retries: Maximum number of retry attempts for failed requests + + Returns: + Response data from the API, or None if the resource is not found or retries are exhausted + + Raises: + Exception: If the API request fails after all retries + """ + url = f"{self.base_url}/{endpoint.lstrip('/')}" + + for attempt in range(max_retries): + try: + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.get(url) + + if response.status_code == 200: + # Add delay to respect rate limits + await self._rate_limit_delay() + return response.json() + elif response.status_code == 429: + # Rate limit exceeded - exponential backoff + wait_time = (2 ** attempt) * 1.0 # 1s, 2s, 4s + logger.warning(f"Rate limit exceeded. Waiting {wait_time}s before retry...") + await asyncio.sleep(wait_time) + continue + elif response.status_code == 404: + # Token/pair not found - return None instead of raising + logger.info(f"Token not found: {endpoint}") + return None + else: + raise Exception( + f"API request failed with status code {response.status_code}: {response.text}" + ) + + except httpx.TimeoutException: + if attempt < max_retries - 1: + logger.warning(f"Request timeout. Retrying... (attempt {attempt + 1}/{max_retries})") + continue + else: + raise Exception(f"Request timeout after {max_retries} attempts") + except httpx.RequestError as e: + if attempt < max_retries - 1: + logger.warning(f"Network error: {e}. Retrying... (attempt {attempt + 1}/{max_retries})") + continue + else: + raise Exception(f"Network error after {max_retries} attempts: {e}") from e + + return None + + async def _rate_limit_delay(self): + """Add delay to respect API rate limits (300 req/min = ~200ms between requests).""" + await asyncio.sleep(self.rate_limit_delay) + + async def get_token_pairs( + self, + chain_id: str, + token_address: str + ) -> tuple[list[dict[str, Any]], str | None]: + """ + Fetch all trading pairs for a specific token on a blockchain.
+ + Args: + chain_id: Blockchain identifier (e.g., 'ethereum', 'bsc', 'polygon') + token_address: Token contract address (EVM 0x or Solana base58 format) + + Returns: + Tuple containing (list of pairs, error message or None) + """ + try: + endpoint = f"tokens/{chain_id}/{token_address}" + response = await self.make_request(endpoint) + + if response is None: + return [], f"Token not found: {chain_id}/{token_address}" + + pairs = response.get("pairs", []) + + if not pairs: + return [], f"No trading pairs found for {chain_id}/{token_address}" + + return pairs, None + + except Exception as e: + return [], f"Error fetching pairs for {chain_id}/{token_address}: {e!s}" + + def format_pair_to_markdown( + self, + pair: dict[str, Any], + token_name: str | None = None + ) -> str: + """ + Convert a trading pair to markdown format. + + Args: + pair: The pair object from the DexScreener API + token_name: Optional custom name for the token + + Returns: + Markdown string representation of the trading pair + """ + # Extract pair details + pair_address = pair.get("pairAddress", "Unknown") + chain_id = pair.get("chainId", "Unknown") + dex_id = pair.get("dexId", "Unknown") + url = pair.get("url", "") + + # Extract token information + base_token = pair.get("baseToken", {}) + quote_token = pair.get("quoteToken", {}) + + base_symbol = base_token.get("symbol", "Unknown") + base_name = token_name or base_token.get("name", "Unknown") + quote_symbol = quote_token.get("symbol", "Unknown") + + # Extract price and volume data + price_native = pair.get("priceNative", "N/A") + price_usd = pair.get("priceUsd", "N/A") + + # Extract liquidity data + liquidity = pair.get("liquidity", {}) + liquidity_usd = liquidity.get("usd", 0) + + # Extract volume data + volume = pair.get("volume", {}) + volume_24h = volume.get("h24", 0) + volume_6h = volume.get("h6", 0) + volume_1h = volume.get("h1", 0) + + # Extract price change data + price_change = pair.get("priceChange", {}) + price_change_24h = price_change.get("h24", 0) + + # Extract market cap and FDV + market_cap = pair.get("marketCap", 0) + fdv = pair.get("fdv", 0) + + # Extract transaction counts + txns = pair.get("txns", {}) + txns_24h = txns.get("h24", {}) + buys_24h = txns_24h.get("buys", 0) + sells_24h = txns_24h.get("sells", 0) + + # Build markdown content + markdown_content = f"# {base_symbol}/{quote_symbol} Trading Pair\n\n" + + if token_name: + markdown_content += f"**Token:** {base_name} ({base_symbol})\n" + + markdown_content += f"**Chain:** {chain_id}\n" + markdown_content += f"**DEX:** {dex_id}\n" + markdown_content += f"**Pair Address:** `{pair_address}`\n\n" + + # Add price information + markdown_content += "## Price Information\n\n" + markdown_content += f"- **Price (USD):** ${price_usd}\n" + markdown_content += f"- **Price (Native):** {price_native} {quote_symbol}\n" + markdown_content += f"- **24h Change:** {price_change_24h:+.2f}%\n\n" + + # Add liquidity information + markdown_content += "## Liquidity\n\n" + markdown_content += f"- **Total Liquidity:** ${liquidity_usd:,.2f}\n\n" + + # Add volume information + markdown_content += "## Trading Volume\n\n" + markdown_content += f"- **24h Volume:** ${volume_24h:,.2f}\n" + markdown_content += f"- **6h Volume:** ${volume_6h:,.2f}\n" + markdown_content += f"- **1h Volume:** ${volume_1h:,.2f}\n\n" + + # Add market metrics + markdown_content += "## Market Metrics\n\n" + markdown_content += f"- **Market Cap:** ${market_cap:,.2f}\n" + markdown_content += f"- **FDV (Fully Diluted Valuation):** ${fdv:,.2f}\n\n" + + # Add transaction information +
markdown_content += "## Transactions (24h)\n\n" + markdown_content += f"- **Buys:** {buys_24h}\n" + markdown_content += f"- **Sells:** {sells_24h}\n" + markdown_content += f"- **Total:** {buys_24h + sells_24h}\n\n" + + # Add link to DexScreener + if url: + markdown_content += f"**View on DexScreener:** {url}\n\n" + + return markdown_content + + +# Example usage (uncomment to use): +""" +import asyncio + +async def main(): + connector = DexScreenerConnector() + + # Example: Fetch WETH pairs on Ethereum + chain = "ethereum" + address = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" + + pairs, error = await connector.get_token_pairs(chain, address) + + if error: + print(f"Error: {error}") + else: + print(f"Found {len(pairs)} pairs for WETH") + + # Format first pair to markdown + if pairs: + markdown = connector.format_pair_to_markdown(pairs[0], "Wrapped Ether") + print("\nSample Pair in Markdown:\n") + print(markdown) + +if __name__ == "__main__": + asyncio.run(main()) +""" diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index 360e0e975..02e89bca9 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -50,6 +50,7 @@ class DocumentType(str, Enum): GOOGLE_DRIVE_FILE = "GOOGLE_DRIVE_FILE" AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR" LUMA_CONNECTOR = "LUMA_CONNECTOR" + DEXSCREENER_CONNECTOR = "DEXSCREENER_CONNECTOR" ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR" BOOKSTACK_CONNECTOR = "BOOKSTACK_CONNECTOR" CIRCLEBACK = "CIRCLEBACK" @@ -80,6 +81,7 @@ class SearchSourceConnectorType(str, Enum): GOOGLE_DRIVE_CONNECTOR = "GOOGLE_DRIVE_CONNECTOR" AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR" LUMA_CONNECTOR = "LUMA_CONNECTOR" + DEXSCREENER_CONNECTOR = "DEXSCREENER_CONNECTOR" ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR" WEBCRAWLER_CONNECTOR = "WEBCRAWLER_CONNECTOR" BOOKSTACK_CONNECTOR = "BOOKSTACK_CONNECTOR" diff --git a/surfsense_backend/app/routes/__init__.py b/surfsense_backend/app/routes/__init__.py index 746c18c6d..03d26d20d 100644 --- a/surfsense_backend/app/routes/__init__.py +++ b/surfsense_backend/app/routes/__init__.py @@ -25,6 +25,7 @@ from .jira_add_connector_route import router as jira_add_connector_router from .linear_add_connector_route import router as linear_add_connector_router from .logs_routes import router as logs_router from .luma_add_connector_route import router as luma_add_connector_router +from .dexscreener_add_connector_route import router as dexscreener_add_connector_router from .new_chat_routes import router as new_chat_router from .new_llm_config_routes import router as new_llm_config_router from .notes_routes import router as notes_router @@ -56,6 +57,7 @@ router.include_router(google_drive_add_connector_router) router.include_router(airtable_add_connector_router) router.include_router(linear_add_connector_router) router.include_router(luma_add_connector_router) +router.include_router(dexscreener_add_connector_router) router.include_router(notion_add_connector_router) router.include_router(slack_add_connector_router) router.include_router(teams_add_connector_router) diff --git a/surfsense_backend/app/routes/dexscreener_add_connector_route.py b/surfsense_backend/app/routes/dexscreener_add_connector_route.py new file mode 100644 index 000000000..cf4477fc5 --- /dev/null +++ b/surfsense_backend/app/routes/dexscreener_add_connector_route.py @@ -0,0 +1,312 @@ +import logging +import re + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel, Field, field_validator +from sqlalchemy.exc import 
IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + +from app.db import ( + SearchSourceConnector, + SearchSourceConnectorType, + User, + get_async_session, +) +from app.users import current_active_user + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +class TokenConfig(BaseModel): + """Configuration for a single token to track.""" + + chain: str = Field(..., description="Blockchain network (e.g., ethereum, bsc, solana)", pattern=r"^[a-z0-9-]+$") + address: str = Field(..., description="Token contract address") + name: str | None = Field(None, description="Optional token name for display") + + @field_validator("address") + @classmethod + def validate_address(cls, v: str) -> str: + """Validate token address format (EVM or Solana).""" + # EVM address: 0x + 40 hex characters + if v.startswith("0x"): + if not re.match(r"^0x[a-fA-F0-9]{40}$", v): + raise ValueError("Invalid EVM address format. Must be 0x followed by 40 hex characters.") + return v + # Solana address: 32-44 base58 characters + if len(v) < 32 or len(v) > 44: + raise ValueError("Invalid Solana address format. Must be 32-44 characters.") + # Allow base58 chars only for Solana + if not re.match(r"^[1-9A-HJ-NP-Za-km-z]{32,44}$", v): + raise ValueError("Invalid Solana address format. Contains invalid characters.") + return v + + +class AddDexScreenerConnectorRequest(BaseModel): + """Request model for adding a DexScreener connector.""" + + tokens: list[TokenConfig] = Field( + ..., description="List of tokens to track (max 50)", min_length=1, max_length=50 + ) + space_id: int = Field(..., description="Search space ID") + + +@router.post("/connectors/dexscreener/add") +async def add_dexscreener_connector( + request: AddDexScreenerConnectorRequest, + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +): + """ + Add a new DexScreener connector for the authenticated user. 
+ + Args: + request: The request containing tokens configuration and space_id + user: Current authenticated user + session: Database session + + Returns: + Success message and connector details + + Raises: + HTTPException: If connector already exists or validation fails + """ + try: + # Check if a DexScreener connector already exists for this search space and user + result = await session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == request.space_id, + SearchSourceConnector.user_id == user.id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.DEXSCREENER_CONNECTOR, + ) + ) + existing_connector = result.scalars().first() + + # Convert tokens to dict format for storage + tokens_config = [token.model_dump() for token in request.tokens] + + if existing_connector: + # Update existing connector with new tokens + existing_connector.config = {"tokens": tokens_config} + existing_connector.is_indexable = True + await session.commit() + await session.refresh(existing_connector) + + logger.info( + f"Updated existing DexScreener connector for user {user.id} in space {request.space_id}" + ) + + return { + "message": "DexScreener connector updated successfully", + "connector_id": existing_connector.id, + "connector_type": "DEXSCREENER_CONNECTOR", + "tokens_count": len(tokens_config), + } + + # Create new DexScreener connector + db_connector = SearchSourceConnector( + name="DexScreener Connector", + connector_type=SearchSourceConnectorType.DEXSCREENER_CONNECTOR, + config={"tokens": tokens_config}, + search_space_id=request.space_id, + user_id=user.id, + is_indexable=True, + ) + + session.add(db_connector) + await session.commit() + await session.refresh(db_connector) + + logger.info( + f"Successfully created DexScreener connector for user {user.id} with ID {db_connector.id}" + ) + + return { + "message": "DexScreener connector added successfully", + "connector_id": db_connector.id, + "connector_type": "DEXSCREENER_CONNECTOR", + "tokens_count": len(tokens_config), + } + + except IntegrityError as e: + await session.rollback() + logger.error(f"Database integrity error: {e!s}") + raise HTTPException( + status_code=409, + detail="A DexScreener connector already exists for this user.", + ) from e + except Exception as e: + await session.rollback() + logger.error(f"Unexpected error adding DexScreener connector: {e!s}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to add DexScreener connector: {e!s}", + ) from e + + +@router.delete("/connectors/dexscreener") +async def delete_dexscreener_connector( + space_id: int, + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +): + """ + Delete the DexScreener connector for the authenticated user in a specific search space. 
+ + Args: + space_id: Search space ID + user: Current authenticated user + session: Database session + + Returns: + Success message + + Raises: + HTTPException: If connector doesn't exist + """ + try: + result = await session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == space_id, + SearchSourceConnector.user_id == user.id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.DEXSCREENER_CONNECTOR, + ) + ) + connector = result.scalars().first() + + if not connector: + raise HTTPException( + status_code=404, + detail="DexScreener connector not found for this user.", + ) + + await session.delete(connector) + await session.commit() + + logger.info(f"Successfully deleted DexScreener connector for user {user.id}") + + return {"message": "DexScreener connector deleted successfully"} + + except HTTPException: + raise + except Exception as e: + await session.rollback() + logger.error(f"Unexpected error deleting DexScreener connector: {e!s}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to delete DexScreener connector: {e!s}", + ) from e + + +@router.get("/connectors/dexscreener/test") +async def test_dexscreener_connector( + space_id: int, + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +): + """ + Test the DexScreener connector for the authenticated user in a specific search space. + + Args: + space_id: Search space ID + user: Current authenticated user + session: Database session + + Returns: + Test results including token count and sample pair data + + Raises: + HTTPException: If connector doesn't exist or test fails + """ + try: + # Get the DexScreener connector for this search space and user + result = await session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.search_space_id == space_id, + SearchSourceConnector.user_id == user.id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.DEXSCREENER_CONNECTOR, + ) + ) + connector = result.scalars().first() + + if not connector: + raise HTTPException( + status_code=404, + detail="DexScreener connector not found. 
Please add a connector first.", + ) + + # Import DexScreenerConnector + from app.connectors.dexscreener_connector import DexScreenerConnector + + # Initialize the connector + tokens = connector.config.get("tokens", []) + if not tokens: + raise HTTPException( + status_code=400, + detail="Invalid connector configuration: No tokens configured.", + ) + + dexscreener = DexScreenerConnector() + + # Test the connection by fetching pairs for the first token + first_token = tokens[0] + chain = first_token.get("chain") + address = first_token.get("address") + token_name = first_token.get("name", "Unknown") + + if not chain or not address: + raise HTTPException( + status_code=400, + detail="Invalid token configuration: Missing chain or address.", + ) + + # Try to fetch pairs for the first token + pairs, error = await dexscreener.get_token_pairs(chain, address) + + if error: + raise HTTPException( + status_code=400, + detail=f"Failed to connect to DexScreener: {error}", + ) + + # Get sample pair info if available + sample_pair = None + if pairs and len(pairs) > 0: + pair = pairs[0] + base_token = pair.get("baseToken", {}) + quote_token = pair.get("quoteToken", {}) + sample_pair = { + "pair_address": pair.get("pairAddress"), + "base_symbol": base_token.get("symbol", "Unknown"), + "quote_symbol": quote_token.get("symbol", "Unknown"), + "dex": pair.get("dexId", "Unknown"), + "price_usd": pair.get("priceUsd", "N/A"), + "liquidity_usd": pair.get("liquidity", {}).get("usd", 0), + } + + return { + "message": "DexScreener connector is working correctly", + "tokens_configured": len(tokens), + "test_token": { + "name": token_name, + "chain": chain, + "address": address, + }, + "pairs_found": len(pairs) if pairs else 0, + "sample_pair": sample_pair, + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Unexpected error testing DexScreener connector: {e!s}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to test DexScreener connector: {e!s}", + ) from e diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py index d0710d246..3c649f18d 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py @@ -846,3 +846,47 @@ async def _index_composio_connector( await run_composio_indexing( session, connector_id, search_space_id, user_id, start_date, end_date ) + + +@celery_app.task(name="index_dexscreener_pairs", bind=True) +def index_dexscreener_pairs_task( + self, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """Celery task to index DexScreener trading pairs.""" + import asyncio + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + loop.run_until_complete( + _index_dexscreener_pairs( + connector_id, search_space_id, user_id, start_date, end_date + ) + ) + finally: + loop.close() + + +async def _index_dexscreener_pairs( + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str, + end_date: str, +): + """Index DexScreener pairs with new session.""" + from app.tasks.connector_indexers.dexscreener_indexer import ( + index_dexscreener_pairs, + ) + + async with get_celery_session_maker()() as session: + await index_dexscreener_pairs( + session, connector_id, search_space_id, user_id, start_date, end_date + ) + diff --git a/surfsense_backend/app/tasks/connector_indexers/__init__.py 
b/surfsense_backend/app/tasks/connector_indexers/__init__.py index 9a1d17fd5..9a5fa4b69 100644 --- a/surfsense_backend/app/tasks/connector_indexers/__init__.py +++ b/surfsense_backend/app/tasks/connector_indexers/__init__.py @@ -43,6 +43,7 @@ from .jira_indexer import index_jira_issues # Issue tracking and project management from .linear_indexer import index_linear_issues from .luma_indexer import index_luma_events +from .dexscreener_indexer import index_dexscreener_pairs # Documentation and knowledge management from .notion_indexer import index_notion_pages @@ -74,4 +75,5 @@ __all__ = [ # noqa: RUF022 # Communication platforms "index_slack_messages", "index_google_gmail_messages", + "index_dexscreener_pairs", ] diff --git a/surfsense_backend/app/tasks/connector_indexers/dexscreener_indexer.py b/surfsense_backend/app/tasks/connector_indexers/dexscreener_indexer.py new file mode 100644 index 000000000..5fa2a82ed --- /dev/null +++ b/surfsense_backend/app/tasks/connector_indexers/dexscreener_indexer.py @@ -0,0 +1,417 @@ +""" +DexScreener connector indexer. +""" + +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import config +from app.connectors.dexscreener_connector import DexScreenerConnector +from app.db import Document, DocumentType, SearchSourceConnectorType +from app.services.llm_service import get_user_long_context_llm +from app.services.task_logging_service import TaskLoggingService +from app.utils.document_converters import ( + create_document_chunks, + generate_content_hash, + generate_document_summary, + generate_unique_identifier_hash, +) + +from .base import ( + check_document_by_unique_identifier, + check_duplicate_document_by_hash, + get_connector_by_id, + get_current_timestamp, + logger, + update_connector_last_indexed, +) + + +async def index_dexscreener_pairs( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str | None = None, + end_date: str | None = None, + update_last_indexed: bool = True, +) -> tuple[int, str | None]: + """ + Index DexScreener trading pairs. 
+ + Args: + session: Database session + connector_id: ID of the DexScreener connector + search_space_id: ID of the search space to store documents in + user_id: User ID + start_date: Not used for DexScreener (included for consistency with other indexers) + end_date: Not used for DexScreener (included for consistency with other indexers) + update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + + Returns: + Tuple containing (number of documents indexed, error message or None) + """ + task_logger = TaskLoggingService(session, search_space_id) + + # Log task start + log_entry = await task_logger.log_task_start( + task_name="dexscreener_pairs_indexing", + source="connector_indexing_task", + message=f"Starting DexScreener pairs indexing for connector {connector_id}", + metadata={ + "connector_id": connector_id, + "user_id": str(user_id), + }, + ) + + try: + # Get the connector + await task_logger.log_task_progress( + log_entry, + f"Retrieving DexScreener connector {connector_id} from database", + {"stage": "connector_retrieval"}, + ) + + # Get the connector from the database + connector = await get_connector_by_id( + session, connector_id, SearchSourceConnectorType.DEXSCREENER_CONNECTOR + ) + + if not connector: + await task_logger.log_task_failure( + log_entry, + f"Connector with ID {connector_id} not found or is not a DexScreener connector", + "Connector not found", + {"error_type": "ConnectorNotFound"}, + ) + return ( + 0, + f"Connector with ID {connector_id} not found or is not a DexScreener connector", + ) + + # Get the tokens list from the connector config + tokens = connector.config.get("tokens", []) + + if not tokens: + await task_logger.log_task_failure( + log_entry, + f"No tokens configured for connector {connector_id}", + "Missing token configuration", + {"error_type": "MissingConfiguration"}, + ) + return 0, "No tokens configured for connector" + + logger.info(f"Starting DexScreener indexing for connector {connector_id} with {len(tokens)} tokens") + + # Initialize DexScreener client + await task_logger.log_task_progress( + log_entry, + f"Initializing DexScreener client for connector {connector_id}", + {"stage": "client_initialization"}, + ) + + dexscreener_client = DexScreenerConnector() + + documents_indexed = 0 + documents_skipped = 0 + skipped_pairs = [] + batch_size = 10 # Commit every 10 documents for performance + + # Process each tracked token + for token_idx, token in enumerate(tokens): + try: + chain = token.get("chain") + address = token.get("address") + token_name = token.get("name", "") + + if not chain or not address: + logger.warning(f"Skipping token with missing chain or address: {token}") + continue + + await task_logger.log_task_progress( + log_entry, + f"Fetching pairs for {token_name or address} on {chain} ({token_idx + 1}/{len(tokens)})", + { + "stage": "fetching_pairs", + "token": token_name or address, + "chain": chain, + "progress": f"{token_idx + 1}/{len(tokens)}", + }, + ) + + # Get trading pairs for this token + pairs, error = await dexscreener_client.get_token_pairs(chain, address) + + if error: + logger.warning(f"Error fetching pairs for {chain}/{address}: {error}") + skipped_pairs.append(f"{token_name or address} ({error})") + continue + + if not pairs: + logger.info(f"No pairs found for {chain}/{address}") + continue + + logger.info(f"Retrieved {len(pairs)} pairs for {token_name or address} on {chain}") + + # Process each pair + for pair in pairs: + try: + pair_address = pair.get("pairAddress") + + if not pair_address: + 
logger.warning(f"Skipping pair with missing pairAddress") + documents_skipped += 1 + continue + + # Format pair to markdown + pair_markdown = dexscreener_client.format_pair_to_markdown(pair, token_name) + + if not pair_markdown.strip(): + logger.warning(f"Skipping pair with no content: {pair_address}") + documents_skipped += 1 + continue + + # Extract pair metadata + base_token = pair.get("baseToken", {}) + quote_token = pair.get("quoteToken", {}) + base_symbol = base_token.get("symbol", "Unknown") + quote_symbol = quote_token.get("symbol", "Unknown") + dex_id = pair.get("dexId", "Unknown") + price_usd = pair.get("priceUsd", "N/A") + liquidity_usd = pair.get("liquidity", {}).get("usd", 0) + volume_24h = pair.get("volume", {}).get("h24", 0) + price_change_24h = pair.get("priceChange", {}).get("h24", 0) + + # Generate unique identifier hash for this pair + # Use chain + pair_address as unique identifier + unique_id = f"{chain}:{pair_address}" + unique_identifier_hash = generate_unique_identifier_hash( + DocumentType.DEXSCREENER_CONNECTOR, unique_id, search_space_id + ) + + # Generate content hash + content_hash = generate_content_hash(pair_markdown, search_space_id) + + # Check if document with this unique identifier already exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + if existing_document: + # Document exists - check if content has changed + if existing_document.content_hash == content_hash: + logger.info( + f"Document for pair {base_symbol}/{quote_symbol} unchanged. Skipping." + ) + documents_skipped += 1 + continue + else: + # Content has changed - update the existing document + logger.info( + f"Content changed for pair {base_symbol}/{quote_symbol}. Updating document." + ) + + # Generate summary with metadata + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "pair_address": pair_address, + "chain_id": chain, + "dex": dex_id, + "base_symbol": base_symbol, + "quote_symbol": quote_symbol, + "price_usd": price_usd, + "liquidity_usd": liquidity_usd, + "volume_24h": volume_24h, + "price_change_24h": price_change_24h, + "document_type": "DexScreener Trading Pair", + "connector_type": "DexScreener", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + pair_markdown, user_llm, document_metadata + ) + else: + summary_content = f"DexScreener Pair: {base_symbol}/{quote_symbol}\n\n" + summary_content += f"Chain: {chain}\n" + summary_content += f"DEX: {dex_id}\n" + summary_content += f"Pair Address: {pair_address}\n" + summary_content += f"Price (USD): ${price_usd}\n" + summary_content += f"Liquidity: ${liquidity_usd:,.2f}\n" + summary_content += f"24h Volume: ${volume_24h:,.2f}\n" + summary_content += f"24h Change: {price_change_24h:+.2f}%\n" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + # Process chunks + chunks = await create_document_chunks(pair_markdown) + + # Update existing document + existing_document.title = f"DexScreener - {base_symbol}/{quote_symbol} on {chain}" + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "pair_address": pair_address, + "chain_id": chain, + "dex": dex_id, + "base_symbol": base_symbol, + "quote_symbol": quote_symbol, + "price_usd": price_usd, + "liquidity_usd": liquidity_usd, + "volume_24h": volume_24h, + 
"price_change_24h": price_change_24h, + "token_name": token_name, + "token_address": address, + } + existing_document.chunks = chunks + existing_document.updated_at = get_current_timestamp() + + documents_indexed += 1 + logger.info(f"Updated document for pair {base_symbol}/{quote_symbol}") + + else: + # New document - create it + logger.info(f"Creating new document for pair {base_symbol}/{quote_symbol}") + + # Generate summary with metadata + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "pair_address": pair_address, + "chain_id": chain, + "dex": dex_id, + "base_symbol": base_symbol, + "quote_symbol": quote_symbol, + "price_usd": price_usd, + "liquidity_usd": liquidity_usd, + "volume_24h": volume_24h, + "price_change_24h": price_change_24h, + "document_type": "DexScreener Trading Pair", + "connector_type": "DexScreener", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + pair_markdown, user_llm, document_metadata + ) + else: + summary_content = f"DexScreener Pair: {base_symbol}/{quote_symbol}\n\n" + summary_content += f"Chain: {chain}\n" + summary_content += f"DEX: {dex_id}\n" + summary_content += f"Pair Address: {pair_address}\n" + summary_content += f"Price (USD): ${price_usd}\n" + summary_content += f"Liquidity: ${liquidity_usd:,.2f}\n" + summary_content += f"24h Volume: ${volume_24h:,.2f}\n" + summary_content += f"24h Change: {price_change_24h:+.2f}%\n" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + # Process chunks + chunks = await create_document_chunks(pair_markdown) + + # Create new document + new_document = Document( + title=f"DexScreener - {base_symbol}/{quote_symbol} on {chain}", + content=summary_content, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + embedding=summary_embedding, + document_type=DocumentType.DEXSCREENER_CONNECTOR, + document_metadata={ + "pair_address": pair_address, + "chain_id": chain, + "dex": dex_id, + "base_symbol": base_symbol, + "quote_symbol": quote_symbol, + "price_usd": price_usd, + "liquidity_usd": liquidity_usd, + "volume_24h": volume_24h, + "price_change_24h": price_change_24h, + "token_name": token_name, + "token_address": address, + }, + chunks=chunks, + search_space_id=search_space_id, + created_at=get_current_timestamp(), + updated_at=get_current_timestamp(), + ) + + session.add(new_document) + documents_indexed += 1 + logger.info(f"Created new document for pair {base_symbol}/{quote_symbol}") + + # Batch commit every N documents + if documents_indexed % batch_size == 0: + await session.commit() + logger.info(f"Committed batch of {batch_size} documents") + + except Exception as e: + logger.error(f"Error processing pair {pair.get('pairAddress', 'unknown')}: {e!s}", exc_info=True) + documents_skipped += 1 + continue + + except Exception as e: + logger.error(f"Error processing token {token.get('name', token.get('address', 'unknown'))}: {e!s}", exc_info=True) + continue + + # Final commit for any remaining documents + if documents_indexed % batch_size != 0: + await session.commit() + logger.info(f"Committed final batch of documents") + + # Update last_indexed_at timestamp + if update_last_indexed: + await update_connector_last_indexed(session, connector, update_last_indexed) + await session.commit() + logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") + + # Log task completion + await task_logger.log_task_success( + log_entry, + 
f"Successfully indexed {documents_indexed} DexScreener pairs", + { + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + "tokens_processed": len(tokens), + }, + ) + + logger.info( + f"DexScreener indexing completed: {documents_indexed} documents indexed, {documents_skipped} skipped" + ) + + return documents_indexed, None + + except SQLAlchemyError as e: + await session.rollback() + logger.error(f"Database error during DexScreener indexing: {e!s}", exc_info=True) + await task_logger.log_task_failure( + log_entry, + f"Database error: {e!s}", + "Database Error", + {"error_type": "DatabaseError"}, + ) + return 0, f"Database error: {e!s}" + + except Exception as e: + await session.rollback() + logger.error(f"Unexpected error during DexScreener indexing: {e!s}", exc_info=True) + await task_logger.log_task_failure( + log_entry, + f"Unexpected error: {e!s}", + "Unexpected Error", + {"error_type": "UnexpectedError"}, + ) + return 0, f"Unexpected error: {e!s}"