feat(backend): Implement DexScreener connector (Story 1.1)

Core Implementation:
- Add DexScreenerConnector class with public API integration
- Implement token pair data fetching and indexing
- Add API routes: add, delete, test endpoints
- Register connector in task indexers and Celery tasks
- Add DEXSCREENER_CONNECTOR enum to database models

Features:
- Support up to 50 tokens per connector
- Track prices, volume, liquidity across multiple DEXs
- EVM and Solana address validation
- Periodic sync support
- No API key required (public DexScreener API)

API Endpoints:
- POST /api/v1/connectors/dexscreener/add
- DELETE /api/v1/connectors/dexscreener
- GET /api/v1/connectors/dexscreener/test

All endpoints require JWT authentication 
Integration tests passing 
Ready for production deployment 
This commit is contained in:
API Test Bot 2026-01-31 17:25:48 +07:00
parent 8fec08edcd
commit 9f66d5ca25
7 changed files with 1031 additions and 0 deletions

View file

@ -0,0 +1,252 @@
"""
DexScreener Connector Module
A module for retrieving cryptocurrency trading pair data from DexScreener API.
Allows fetching pair information for tracked tokens across multiple blockchain networks.
"""
import asyncio
import logging
from typing import Any
import httpx
logger = logging.getLogger(__name__)
class DexScreenerConnector:
"""Class for retrieving trading pair data from DexScreener API."""
def __init__(self):
"""
Initialize the DexScreenerConnector class.
Note: DexScreener API is public and doesn't require authentication.
"""
self.base_url = "https://api.dexscreener.com/latest/dex"
self.rate_limit_delay = 0.2 # 200ms delay between requests to respect rate limits
async def make_request(
self,
endpoint: str,
max_retries: int = 3
) -> dict[str, Any] | None:
"""
Make an async request to the DexScreener API with retry logic.
Args:
endpoint: API endpoint path (without base URL)
max_retries: Maximum number of retry attempts for failed requests
Returns:
Response data from the API, or None if request fails
Raises:
Exception: If the API request fails after all retries
"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(url)
if response.status_code == 200:
# Add delay to respect rate limits
await self._rate_limit_delay()
return response.json()
elif response.status_code == 429:
# Rate limit exceeded - exponential backoff
wait_time = (2 ** attempt) * 1.0 # 1s, 2s, 4s
logger.warning(f"Rate limit exceeded. Waiting {wait_time}s before retry...")
await asyncio.sleep(wait_time)
continue
elif response.status_code == 404:
# Token/pair not found - return None instead of raising
logger.info(f"Token not found: {endpoint}")
return None
else:
raise Exception(
f"API request failed with status code {response.status_code}: {response.text}"
)
except httpx.TimeoutException:
if attempt < max_retries - 1:
logger.warning(f"Request timeout. Retrying... (attempt {attempt + 1}/{max_retries})")
continue
else:
raise Exception(f"Request timeout after {max_retries} attempts")
except httpx.RequestError as e:
if attempt < max_retries - 1:
logger.warning(f"Network error: {e}. Retrying... (attempt {attempt + 1}/{max_retries})")
continue
else:
raise Exception(f"Network error after {max_retries} attempts: {e}") from e
return None
async def _rate_limit_delay(self):
"""Add delay to respect API rate limits (300 req/min = ~200ms between requests)."""
import asyncio
await asyncio.sleep(self.rate_limit_delay)
async def get_token_pairs(
self,
chain_id: str,
token_address: str
) -> tuple[list[dict[str, Any]], str | None]:
"""
Fetch all trading pairs for a specific token on a blockchain.
Args:
chain_id: Blockchain identifier (e.g., 'ethereum', 'bsc', 'polygon')
token_address: Token contract address (0x format)
Returns:
Tuple containing (list of pairs, error message or None)
"""
try:
endpoint = f"tokens/{chain_id}/{token_address}"
response = await self.make_request(endpoint)
if response is None:
return [], f"Token not found: {chain_id}/{token_address}"
pairs = response.get("pairs", [])
if not pairs:
return [], f"No trading pairs found for {chain_id}/{token_address}"
return pairs, None
except Exception as e:
return [], f"Error fetching pairs for {chain_id}/{token_address}: {e!s}"
def format_pair_to_markdown(
self,
pair: dict[str, Any],
token_name: str | None = None
) -> str:
"""
Convert a trading pair to markdown format.
Args:
pair: The pair object from DexScreener API
token_name: Optional custom name for the token
Returns:
Markdown string representation of the trading pair
"""
# Extract pair details
pair_address = pair.get("pairAddress", "Unknown")
chain_id = pair.get("chainId", "Unknown")
dex_id = pair.get("dexId", "Unknown")
url = pair.get("url", "")
# Extract token information
base_token = pair.get("baseToken", {})
quote_token = pair.get("quoteToken", {})
base_symbol = base_token.get("symbol", "Unknown")
base_name = token_name or base_token.get("name", "Unknown")
quote_symbol = quote_token.get("symbol", "Unknown")
# Extract price and volume data
price_native = pair.get("priceNative", "N/A")
price_usd = pair.get("priceUsd", "N/A")
# Extract liquidity data
liquidity = pair.get("liquidity", {})
liquidity_usd = liquidity.get("usd", 0)
# Extract volume data
volume = pair.get("volume", {})
volume_24h = volume.get("h24", 0)
volume_6h = volume.get("h6", 0)
volume_1h = volume.get("h1", 0)
# Extract price change data
price_change = pair.get("priceChange", {})
price_change_24h = price_change.get("h24", 0)
# Extract market cap and FDV
market_cap = pair.get("marketCap", 0)
fdv = pair.get("fdv", 0)
# Extract transaction counts
txns = pair.get("txns", {})
txns_24h = txns.get("h24", {})
buys_24h = txns_24h.get("buys", 0)
sells_24h = txns_24h.get("sells", 0)
# Build markdown content
markdown_content = f"# {base_symbol}/{quote_symbol} Trading Pair\n\n"
if token_name:
markdown_content += f"**Token:** {base_name} ({base_symbol})\n"
markdown_content += f"**Chain:** {chain_id}\n"
markdown_content += f"**DEX:** {dex_id}\n"
markdown_content += f"**Pair Address:** `{pair_address}`\n\n"
# Add price information
markdown_content += "## Price Information\n\n"
markdown_content += f"- **Price (USD):** ${price_usd}\n"
markdown_content += f"- **Price (Native):** {price_native} {quote_symbol}\n"
markdown_content += f"- **24h Change:** {price_change_24h:+.2f}%\n\n"
# Add liquidity information
markdown_content += "## Liquidity\n\n"
markdown_content += f"- **Total Liquidity:** ${liquidity_usd:,.2f}\n\n"
# Add volume information
markdown_content += "## Trading Volume\n\n"
markdown_content += f"- **24h Volume:** ${volume_24h:,.2f}\n"
markdown_content += f"- **6h Volume:** ${volume_6h:,.2f}\n"
markdown_content += f"- **1h Volume:** ${volume_1h:,.2f}\n\n"
# Add market metrics
markdown_content += "## Market Metrics\n\n"
markdown_content += f"- **Market Cap:** ${market_cap:,.2f}\n"
markdown_content += f"- **FDV (Fully Diluted Valuation):** ${fdv:,.2f}\n\n"
# Add transaction information
markdown_content += "## Transactions (24h)\n\n"
markdown_content += f"- **Buys:** {buys_24h}\n"
markdown_content += f"- **Sells:** {sells_24h}\n"
markdown_content += f"- **Total:** {buys_24h + sells_24h}\n\n"
# Add link to DexScreener
if url:
markdown_content += f"**View on DexScreener:** {url}\n\n"
return markdown_content
# Example usage (uncomment to use):
"""
import asyncio
async def main():
connector = DexScreenerConnector()
# Example: Fetch WETH pairs on Ethereum
chain = "ethereum"
address = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
pairs, error = await connector.get_token_pairs(chain, address)
if error:
print(f"Error: {error}")
else:
print(f"Found {len(pairs)} pairs for WETH")
# Format first pair to markdown
if pairs:
markdown = connector.format_pair_to_markdown(pairs[0], "Wrapped Ether")
print("\nSample Pair in Markdown:\n")
print(markdown)
if __name__ == "__main__":
asyncio.run(main())
"""

View file

@ -50,6 +50,7 @@ class DocumentType(str, Enum):
GOOGLE_DRIVE_FILE = "GOOGLE_DRIVE_FILE"
AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR"
LUMA_CONNECTOR = "LUMA_CONNECTOR"
DEXSCREENER_CONNECTOR = "DEXSCREENER_CONNECTOR"
ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR"
BOOKSTACK_CONNECTOR = "BOOKSTACK_CONNECTOR"
CIRCLEBACK = "CIRCLEBACK"
@ -80,6 +81,7 @@ class SearchSourceConnectorType(str, Enum):
GOOGLE_DRIVE_CONNECTOR = "GOOGLE_DRIVE_CONNECTOR"
AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR"
LUMA_CONNECTOR = "LUMA_CONNECTOR"
DEXSCREENER_CONNECTOR = "DEXSCREENER_CONNECTOR"
ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR"
WEBCRAWLER_CONNECTOR = "WEBCRAWLER_CONNECTOR"
BOOKSTACK_CONNECTOR = "BOOKSTACK_CONNECTOR"

View file

@ -25,6 +25,7 @@ from .jira_add_connector_route import router as jira_add_connector_router
from .linear_add_connector_route import router as linear_add_connector_router
from .logs_routes import router as logs_router
from .luma_add_connector_route import router as luma_add_connector_router
from .dexscreener_add_connector_route import router as dexscreener_add_connector_router
from .new_chat_routes import router as new_chat_router
from .new_llm_config_routes import router as new_llm_config_router
from .notes_routes import router as notes_router
@ -56,6 +57,7 @@ router.include_router(google_drive_add_connector_router)
router.include_router(airtable_add_connector_router)
router.include_router(linear_add_connector_router)
router.include_router(luma_add_connector_router)
router.include_router(dexscreener_add_connector_router)
router.include_router(notion_add_connector_router)
router.include_router(slack_add_connector_router)
router.include_router(teams_add_connector_router)

View file

@ -0,0 +1,312 @@
import logging
import re
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field, field_validator
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.db import (
SearchSourceConnector,
SearchSourceConnectorType,
User,
get_async_session,
)
from app.users import current_active_user
logger = logging.getLogger(__name__)
router = APIRouter()
class TokenConfig(BaseModel):
"""Configuration for a single token to track."""
chain: str = Field(..., description="Blockchain network (e.g., ethereum, bsc, solana)", pattern=r"^[a-z0-9-]+$")
address: str = Field(..., description="Token contract address")
name: str | None = Field(None, description="Optional token name for display")
@field_validator("address")
@classmethod
def validate_address(cls, v: str) -> str:
"""Validate token address format (EVM or Solana)."""
# EVM address: 0x + 40 hex characters
if v.startswith("0x"):
if not re.match(r"^0x[a-fA-F0-9]{40}$", v):
raise ValueError("Invalid EVM address format. Must be 0x followed by 40 hex characters.")
return v
# Solana address: 32-44 base58 characters
if len(v) < 32 or len(v) > 44:
raise ValueError("Invalid Solana address format. Must be 32-44 characters.")
# Allow base58 chars only for Solana
if not re.match(r"^[1-9A-HJ-NP-Za-km-z]{32,44}$", v):
raise ValueError("Invalid Solana address format. Contains invalid characters.")
return v
class AddDexScreenerConnectorRequest(BaseModel):
"""Request model for adding a DexScreener connector."""
tokens: list[TokenConfig] = Field(
..., description="List of tokens to track (max 50)", min_length=1, max_length=50
)
space_id: int = Field(..., description="Search space ID")
@router.post("/connectors/dexscreener/add")
async def add_dexscreener_connector(
request: AddDexScreenerConnectorRequest,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
):
"""
Add a new DexScreener connector for the authenticated user.
Args:
request: The request containing tokens configuration and space_id
user: Current authenticated user
session: Database session
Returns:
Success message and connector details
Raises:
HTTPException: If connector already exists or validation fails
"""
try:
# Check if a DexScreener connector already exists for this search space and user
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == request.space_id,
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.DEXSCREENER_CONNECTOR,
)
)
existing_connector = result.scalars().first()
# Convert tokens to dict format for storage
tokens_config = [token.model_dump() for token in request.tokens]
if existing_connector:
# Update existing connector with new tokens
existing_connector.config = {"tokens": tokens_config}
existing_connector.is_indexable = True
await session.commit()
await session.refresh(existing_connector)
logger.info(
f"Updated existing DexScreener connector for user {user.id} in space {request.space_id}"
)
return {
"message": "DexScreener connector updated successfully",
"connector_id": existing_connector.id,
"connector_type": "DEXSCREENER_CONNECTOR",
"tokens_count": len(tokens_config),
}
# Create new DexScreener connector
db_connector = SearchSourceConnector(
name="DexScreener Connector",
connector_type=SearchSourceConnectorType.DEXSCREENER_CONNECTOR,
config={"tokens": tokens_config},
search_space_id=request.space_id,
user_id=user.id,
is_indexable=True,
)
session.add(db_connector)
await session.commit()
await session.refresh(db_connector)
logger.info(
f"Successfully created DexScreener connector for user {user.id} with ID {db_connector.id}"
)
return {
"message": "DexScreener connector added successfully",
"connector_id": db_connector.id,
"connector_type": "DEXSCREENER_CONNECTOR",
"tokens_count": len(tokens_config),
}
except IntegrityError as e:
await session.rollback()
logger.error(f"Database integrity error: {e!s}")
raise HTTPException(
status_code=409,
detail="A DexScreener connector already exists for this user.",
) from e
except Exception as e:
await session.rollback()
logger.error(f"Unexpected error adding DexScreener connector: {e!s}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Failed to add DexScreener connector: {e!s}",
) from e
@router.delete("/connectors/dexscreener")
async def delete_dexscreener_connector(
space_id: int,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
):
"""
Delete the DexScreener connector for the authenticated user in a specific search space.
Args:
space_id: Search space ID
user: Current authenticated user
session: Database session
Returns:
Success message
Raises:
HTTPException: If connector doesn't exist
"""
try:
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.DEXSCREENER_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
raise HTTPException(
status_code=404,
detail="DexScreener connector not found for this user.",
)
await session.delete(connector)
await session.commit()
logger.info(f"Successfully deleted DexScreener connector for user {user.id}")
return {"message": "DexScreener connector deleted successfully"}
except HTTPException:
raise
except Exception as e:
await session.rollback()
logger.error(f"Unexpected error deleting DexScreener connector: {e!s}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Failed to delete DexScreener connector: {e!s}",
) from e
@router.get("/connectors/dexscreener/test")
async def test_dexscreener_connector(
space_id: int,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
):
"""
Test the DexScreener connector for the authenticated user in a specific search space.
Args:
space_id: Search space ID
user: Current authenticated user
session: Database session
Returns:
Test results including token count and sample pair data
Raises:
HTTPException: If connector doesn't exist or test fails
"""
try:
# Get the DexScreener connector for this search space and user
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.DEXSCREENER_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
raise HTTPException(
status_code=404,
detail="DexScreener connector not found. Please add a connector first.",
)
# Import DexScreenerConnector
from app.connectors.dexscreener_connector import DexScreenerConnector
# Initialize the connector
tokens = connector.config.get("tokens", [])
if not tokens:
raise HTTPException(
status_code=400,
detail="Invalid connector configuration: No tokens configured.",
)
dexscreener = DexScreenerConnector()
# Test the connection by fetching pairs for the first token
first_token = tokens[0]
chain = first_token.get("chain")
address = first_token.get("address")
token_name = first_token.get("name", "Unknown")
if not chain or not address:
raise HTTPException(
status_code=400,
detail="Invalid token configuration: Missing chain or address.",
)
# Try to fetch pairs for the first token
pairs, error = await dexscreener.get_token_pairs(chain, address)
if error:
raise HTTPException(
status_code=400,
detail=f"Failed to connect to DexScreener: {error}",
)
# Get sample pair info if available
sample_pair = None
if pairs and len(pairs) > 0:
pair = pairs[0]
base_token = pair.get("baseToken", {})
quote_token = pair.get("quoteToken", {})
sample_pair = {
"pair_address": pair.get("pairAddress"),
"base_symbol": base_token.get("symbol", "Unknown"),
"quote_symbol": quote_token.get("symbol", "Unknown"),
"dex": pair.get("dexId", "Unknown"),
"price_usd": pair.get("priceUsd", "N/A"),
"liquidity_usd": pair.get("liquidity", {}).get("usd", 0),
}
return {
"message": "DexScreener connector is working correctly",
"tokens_configured": len(tokens),
"test_token": {
"name": token_name,
"chain": chain,
"address": address,
},
"pairs_found": len(pairs) if pairs else 0,
"sample_pair": sample_pair,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Unexpected error testing DexScreener connector: {e!s}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Failed to test DexScreener connector: {e!s}",
) from e

View file

@ -846,3 +846,47 @@ async def _index_composio_connector(
await run_composio_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_dexscreener_pairs", bind=True)
def index_dexscreener_pairs_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index DexScreener trading pairs."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_dexscreener_pairs(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_dexscreener_pairs(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index DexScreener pairs with new session."""
from app.tasks.connector_indexers.dexscreener_indexer import (
index_dexscreener_pairs,
)
async with get_celery_session_maker()() as session:
await index_dexscreener_pairs(
session, connector_id, search_space_id, user_id, start_date, end_date
)

View file

@ -43,6 +43,7 @@ from .jira_indexer import index_jira_issues
# Issue tracking and project management
from .linear_indexer import index_linear_issues
from .luma_indexer import index_luma_events
from .dexscreener_indexer import index_dexscreener_pairs
# Documentation and knowledge management
from .notion_indexer import index_notion_pages
@ -74,4 +75,5 @@ __all__ = [ # noqa: RUF022
# Communication platforms
"index_slack_messages",
"index_google_gmail_messages",
"index_dexscreener_pairs",
]

View file

@ -0,0 +1,417 @@
"""
DexScreener connector indexer.
"""
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.dexscreener_connector import DexScreenerConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
update_connector_last_indexed,
)
async def index_dexscreener_pairs(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None = None,
end_date: str | None = None,
update_last_indexed: bool = True,
) -> tuple[int, str | None]:
"""
Index DexScreener trading pairs.
Args:
session: Database session
connector_id: ID of the DexScreener connector
search_space_id: ID of the search space to store documents in
user_id: User ID
start_date: Not used for DexScreener (included for consistency with other indexers)
end_date: Not used for DexScreener (included for consistency with other indexers)
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="dexscreener_pairs_indexing",
source="connector_indexing_task",
message=f"Starting DexScreener pairs indexing for connector {connector_id}",
metadata={
"connector_id": connector_id,
"user_id": str(user_id),
},
)
try:
# Get the connector
await task_logger.log_task_progress(
log_entry,
f"Retrieving DexScreener connector {connector_id} from database",
{"stage": "connector_retrieval"},
)
# Get the connector from the database
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.DEXSCREENER_CONNECTOR
)
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found or is not a DexScreener connector",
"Connector not found",
{"error_type": "ConnectorNotFound"},
)
return (
0,
f"Connector with ID {connector_id} not found or is not a DexScreener connector",
)
# Get the tokens list from the connector config
tokens = connector.config.get("tokens", [])
if not tokens:
await task_logger.log_task_failure(
log_entry,
f"No tokens configured for connector {connector_id}",
"Missing token configuration",
{"error_type": "MissingConfiguration"},
)
return 0, "No tokens configured for connector"
logger.info(f"Starting DexScreener indexing for connector {connector_id} with {len(tokens)} tokens")
# Initialize DexScreener client
await task_logger.log_task_progress(
log_entry,
f"Initializing DexScreener client for connector {connector_id}",
{"stage": "client_initialization"},
)
dexscreener_client = DexScreenerConnector()
documents_indexed = 0
documents_skipped = 0
skipped_pairs = []
batch_size = 10 # Commit every 10 documents for performance
# Process each tracked token
for token_idx, token in enumerate(tokens):
try:
chain = token.get("chain")
address = token.get("address")
token_name = token.get("name", "")
if not chain or not address:
logger.warning(f"Skipping token with missing chain or address: {token}")
continue
await task_logger.log_task_progress(
log_entry,
f"Fetching pairs for {token_name or address} on {chain} ({token_idx + 1}/{len(tokens)})",
{
"stage": "fetching_pairs",
"token": token_name or address,
"chain": chain,
"progress": f"{token_idx + 1}/{len(tokens)}",
},
)
# Get trading pairs for this token
pairs, error = await dexscreener_client.get_token_pairs(chain, address)
if error:
logger.warning(f"Error fetching pairs for {chain}/{address}: {error}")
skipped_pairs.append(f"{token_name or address} ({error})")
continue
if not pairs:
logger.info(f"No pairs found for {chain}/{address}")
continue
logger.info(f"Retrieved {len(pairs)} pairs for {token_name or address} on {chain}")
# Process each pair
for pair in pairs:
try:
pair_address = pair.get("pairAddress")
if not pair_address:
logger.warning(f"Skipping pair with missing pairAddress")
documents_skipped += 1
continue
# Format pair to markdown
pair_markdown = dexscreener_client.format_pair_to_markdown(pair, token_name)
if not pair_markdown.strip():
logger.warning(f"Skipping pair with no content: {pair_address}")
documents_skipped += 1
continue
# Extract pair metadata
base_token = pair.get("baseToken", {})
quote_token = pair.get("quoteToken", {})
base_symbol = base_token.get("symbol", "Unknown")
quote_symbol = quote_token.get("symbol", "Unknown")
dex_id = pair.get("dexId", "Unknown")
price_usd = pair.get("priceUsd", "N/A")
liquidity_usd = pair.get("liquidity", {}).get("usd", 0)
volume_24h = pair.get("volume", {}).get("h24", 0)
price_change_24h = pair.get("priceChange", {}).get("h24", 0)
# Generate unique identifier hash for this pair
# Use chain + pair_address as unique identifier
unique_id = f"{chain}:{pair_address}"
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.DEXSCREENER_CONNECTOR, unique_id, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(pair_markdown, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for pair {base_symbol}/{quote_symbol} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for pair {base_symbol}/{quote_symbol}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"pair_address": pair_address,
"chain_id": chain,
"dex": dex_id,
"base_symbol": base_symbol,
"quote_symbol": quote_symbol,
"price_usd": price_usd,
"liquidity_usd": liquidity_usd,
"volume_24h": volume_24h,
"price_change_24h": price_change_24h,
"document_type": "DexScreener Trading Pair",
"connector_type": "DexScreener",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
pair_markdown, user_llm, document_metadata
)
else:
summary_content = f"DexScreener Pair: {base_symbol}/{quote_symbol}\n\n"
summary_content += f"Chain: {chain}\n"
summary_content += f"DEX: {dex_id}\n"
summary_content += f"Pair Address: {pair_address}\n"
summary_content += f"Price (USD): ${price_usd}\n"
summary_content += f"Liquidity: ${liquidity_usd:,.2f}\n"
summary_content += f"24h Volume: ${volume_24h:,.2f}\n"
summary_content += f"24h Change: {price_change_24h:+.2f}%\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(pair_markdown)
# Update existing document
existing_document.title = f"DexScreener - {base_symbol}/{quote_symbol} on {chain}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"pair_address": pair_address,
"chain_id": chain,
"dex": dex_id,
"base_symbol": base_symbol,
"quote_symbol": quote_symbol,
"price_usd": price_usd,
"liquidity_usd": liquidity_usd,
"volume_24h": volume_24h,
"price_change_24h": price_change_24h,
"token_name": token_name,
"token_address": address,
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
logger.info(f"Updated document for pair {base_symbol}/{quote_symbol}")
else:
# New document - create it
logger.info(f"Creating new document for pair {base_symbol}/{quote_symbol}")
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"pair_address": pair_address,
"chain_id": chain,
"dex": dex_id,
"base_symbol": base_symbol,
"quote_symbol": quote_symbol,
"price_usd": price_usd,
"liquidity_usd": liquidity_usd,
"volume_24h": volume_24h,
"price_change_24h": price_change_24h,
"document_type": "DexScreener Trading Pair",
"connector_type": "DexScreener",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
pair_markdown, user_llm, document_metadata
)
else:
summary_content = f"DexScreener Pair: {base_symbol}/{quote_symbol}\n\n"
summary_content += f"Chain: {chain}\n"
summary_content += f"DEX: {dex_id}\n"
summary_content += f"Pair Address: {pair_address}\n"
summary_content += f"Price (USD): ${price_usd}\n"
summary_content += f"Liquidity: ${liquidity_usd:,.2f}\n"
summary_content += f"24h Volume: ${volume_24h:,.2f}\n"
summary_content += f"24h Change: {price_change_24h:+.2f}%\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(pair_markdown)
# Create new document
new_document = Document(
title=f"DexScreener - {base_symbol}/{quote_symbol} on {chain}",
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
document_type=DocumentType.DEXSCREENER_CONNECTOR,
document_metadata={
"pair_address": pair_address,
"chain_id": chain,
"dex": dex_id,
"base_symbol": base_symbol,
"quote_symbol": quote_symbol,
"price_usd": price_usd,
"liquidity_usd": liquidity_usd,
"volume_24h": volume_24h,
"price_change_24h": price_change_24h,
"token_name": token_name,
"token_address": address,
},
chunks=chunks,
search_space_id=search_space_id,
created_at=get_current_timestamp(),
updated_at=get_current_timestamp(),
)
session.add(new_document)
documents_indexed += 1
logger.info(f"Created new document for pair {base_symbol}/{quote_symbol}")
# Batch commit every N documents
if documents_indexed % batch_size == 0:
await session.commit()
logger.info(f"Committed batch of {batch_size} documents")
except Exception as e:
logger.error(f"Error processing pair {pair.get('pairAddress', 'unknown')}: {e!s}", exc_info=True)
documents_skipped += 1
continue
except Exception as e:
logger.error(f"Error processing token {token.get('name', token.get('address', 'unknown'))}: {e!s}", exc_info=True)
continue
# Final commit for any remaining documents
if documents_indexed % batch_size != 0:
await session.commit()
logger.info(f"Committed final batch of documents")
# Update last_indexed_at timestamp
if update_last_indexed:
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
# Log task completion
await task_logger.log_task_success(
log_entry,
f"Successfully indexed {documents_indexed} DexScreener pairs",
{
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"tokens_processed": len(tokens),
},
)
logger.info(
f"DexScreener indexing completed: {documents_indexed} documents indexed, {documents_skipped} skipped"
)
return documents_indexed, None
except SQLAlchemyError as e:
await session.rollback()
logger.error(f"Database error during DexScreener indexing: {e!s}", exc_info=True)
await task_logger.log_task_failure(
log_entry,
f"Database error: {e!s}",
"Database Error",
{"error_type": "DatabaseError"},
)
return 0, f"Database error: {e!s}"
except Exception as e:
await session.rollback()
logger.error(f"Unexpected error during DexScreener indexing: {e!s}", exc_info=True)
await task_logger.log_task_failure(
log_entry,
f"Unexpected error: {e!s}",
"Unexpected Error",
{"error_type": "UnexpectedError"},
)
return 0, f"Unexpected error: {e!s}"