fix(rag): Add DEXSCREENER_CONNECTOR to searchable mapping

- Added DEXSCREENER_CONNECTOR to _CONNECTOR_TYPE_TO_SEARCHABLE in chat_deepagent.py
- Fixes the LLM's inability to search DexScreener data despite it being indexed
- Root cause: the connector was enabled in the DB but missing from the mapping, so it was filtered out of the searchable set (illustrated in the sketch after the chat_deepagent.py hunk below)
- Verified: the LLM now successfully retrieves the WETH price (~$2,442) with DexScreener citations

Related files:
- chat_deepagent.py: added the connector mapping
- knowledge_base.py: added debug logging for the DexScreener search
- connector_service.py: fixed metadata field names (base_symbol, quote_symbol, dex)
- 85_add_dexscreener_connector.py: migration for the connector type
API Test Bot 2026-02-01 14:20:34 +07:00
parent d654556eb8
commit b5d0413459
6 changed files with 170 additions and 4 deletions


85_add_dexscreener_connector.py

@@ -17,13 +17,16 @@ depends_on = None


 def upgrade():
-    """Add DEXSCREENER_CONNECTOR to searchsourceconnectortype enum."""
+    """Add DEXSCREENER_CONNECTOR to searchsourceconnectortype and documenttype enums."""
     # Add new enum value using raw SQL
     # Note: ALTER TYPE ... ADD VALUE cannot be executed inside a transaction block
     # Alembic handles this automatically when using op.execute()
     op.execute(
         "ALTER TYPE searchsourceconnectortype ADD VALUE IF NOT EXISTS 'DEXSCREENER_CONNECTOR'"
     )
+    op.execute(
+        "ALTER TYPE documenttype ADD VALUE IF NOT EXISTS 'DEXSCREENER_CONNECTOR'"
+    )


 def downgrade():
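Side note on the downgrade path, truncated in the hunk above: PostgreSQL has no ALTER TYPE ... DROP VALUE, so a migration like this typically cannot undo the enum change. A minimal sketch of what the full migration plausibly looks like, with hypothetical revision identifiers and an assumed no-op downgrade:

from alembic import op

# Hypothetical revision identifiers -- not taken from the actual migration file.
revision = "85_add_dexscreener_connector"
down_revision = None


def upgrade():
    """Add DEXSCREENER_CONNECTOR to searchsourceconnectortype and documenttype enums."""
    # IF NOT EXISTS makes both statements safe to re-run.
    op.execute(
        "ALTER TYPE searchsourceconnectortype ADD VALUE IF NOT EXISTS 'DEXSCREENER_CONNECTOR'"
    )
    op.execute(
        "ALTER TYPE documenttype ADD VALUE IF NOT EXISTS 'DEXSCREENER_CONNECTOR'"
    )


def downgrade():
    # Assumed no-op: PostgreSQL cannot drop a value from an enum type without
    # rebuilding the type, so the new values are left in place.
    pass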


chat_deepagent.py

@@ -59,6 +59,8 @@ _CONNECTOR_TYPE_TO_SEARCHABLE: dict[str, str] = {
     "COMPOSIO_GOOGLE_DRIVE_CONNECTOR": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
     "COMPOSIO_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR",
     "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
+    # Cryptocurrency data
+    "DEXSCREENER_CONNECTOR": "DEXSCREENER_CONNECTOR",
 }

 # Document types that don't come from SearchSourceConnector but should always be searchable
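The root-cause sketch referenced in the commit message: connector types that are enabled in the DB but absent from _CONNECTOR_TYPE_TO_SEARCHABLE never survive the mapping step. The actual filtering code is not part of this diff, so the helper below is a hypothetical illustration of the pattern:

def _resolve_searchable_types(enabled_types: list[str]) -> list[str]:
    # Hypothetical helper; the name and structure are assumptions.
    searchable = []
    for connector_type in enabled_types:
        mapped = _CONNECTOR_TYPE_TO_SEARCHABLE.get(connector_type)
        if mapped is None:
            # Pre-fix behavior for DEXSCREENER_CONNECTOR: enabled and indexed,
            # but silently dropped here, so the LLM could never search it.
            continue
        searchable.append(mapped)
    return searchable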


knowledge_base.py

@@ -52,6 +52,7 @@ _ALL_CONNECTORS: list[str] = [
     "CRAWLED_URL",
     "CIRCLEBACK",
     "OBSIDIAN_CONNECTOR",
+    "DEXSCREENER_CONNECTOR",
     # Composio connectors
     "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
     "COMPOSIO_GMAIL_CONNECTOR",
@@ -89,6 +90,7 @@ CONNECTOR_DESCRIPTIONS: dict[str, str] = {
     "BOOKSTACK_CONNECTOR": "BookStack pages (personal documentation)",
     "CIRCLEBACK": "Circleback meeting notes, transcripts, and action items",
     "OBSIDIAN_CONNECTOR": "Obsidian vault notes and markdown files (personal notes)",
+    "DEXSCREENER_CONNECTOR": "DexScreener real-time cryptocurrency trading pair data and market information",
     # Composio connectors
     "COMPOSIO_GOOGLE_DRIVE_CONNECTOR": "Google Drive files via Composio (personal cloud storage)",
     "COMPOSIO_GMAIL_CONNECTOR": "Gmail emails via Composio (personal emails)",
@@ -610,6 +612,19 @@ async def search_knowledge_base_async(
             )
             all_documents.extend(chunks)
+
+        elif connector == "DEXSCREENER_CONNECTOR":
+            _, chunks = await connector_service.search_dexscreener(
+                user_query=query,
+                search_space_id=search_space_id,
+                top_k=top_k,
+                start_date=resolved_start_date,
+                end_date=resolved_end_date,
+            )
+            print(f"[DEBUG] DexScreener search returned {len(chunks)} chunks")
+            if chunks:
+                print(f"[DEBUG] First chunk metadata: {chunks[0].get('document', {}).get('metadata', {})}")
+            all_documents.extend(chunks)

         # =========================================================
         # Composio Connectors
         # =========================================================
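The two [DEBUG] print() calls in the new branch read as temporary instrumentation. If the logging is meant to stay, an equivalent using a module-level logger would look like the sketch below (the stdlib pattern is assumed; knowledge_base.py's actual logger setup is not shown in this diff):

import logging

logger = logging.getLogger(__name__)

# Drop-in replacements for the print() calls above:
logger.debug("DexScreener search returned %d chunks", len(chunks))
if chunks:
    logger.debug(
        "First chunk metadata: %s",
        chunks[0].get("document", {}).get("metadata", {}),
    )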


@@ -23,7 +23,7 @@ class DexScreenerConnector:

         Note: DexScreener API is public and doesn't require authentication.
         """
-        self.base_url = "https://api.dexscreener.com/latest/dex"
+        self.base_url = "https://api.dexscreener.com"
         self.rate_limit_delay = 0.2  # 200ms delay between requests to respect rate limits

     async def make_request(
@@ -106,13 +106,18 @@ class DexScreenerConnector:
             Tuple containing (list of pairs, error message or None)
         """
         try:
-            endpoint = f"tokens/{chain_id}/{token_address}"
+            endpoint = f"token-pairs/v1/{chain_id}/{token_address}"
             response = await self.make_request(endpoint)

             if response is None:
                 return [], f"Token not found: {chain_id}/{token_address}"

-            pairs = response.get("pairs", [])
+            # DexScreener API returns {"pairs": [...]} or {"pairs": null}
+            if isinstance(response, dict):
+                pairs = response.get("pairs", [])
+            else:
+                # Fallback if API returns list directly (shouldn't happen)
+                pairs = response if isinstance(response, list) else []

             if not pairs:
                 return [], f"No trading pairs found for {chain_id}/{token_address}"
@@ -122,6 +127,7 @@ class DexScreenerConnector:
         except Exception as e:
             return [], f"Error fetching pairs for {chain_id}/{token_address}: {e!s}"

+
     def format_pair_to_markdown(
         self,
         pair: dict[str, Any],
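The endpoint move (from tokens/{chain}/{address} under /latest/dex to token-pairs/v1/{chain}/{address} at the API root) can be sanity-checked in isolation. A standalone sketch using httpx, assuming the response shapes the patched code handles:

import asyncio

import httpx


async def fetch_token_pairs(chain_id: str, token_address: str) -> list[dict]:
    # Mirrors the connector's new URL construction.
    url = f"https://api.dexscreener.com/token-pairs/v1/{chain_id}/{token_address}"
    async with httpx.AsyncClient(timeout=10.0) as client:
        resp = await client.get(url)
        resp.raise_for_status()
        data = resp.json()
    # Same parsing as the patched code: a dict with "pairs", or a bare list.
    if isinstance(data, dict):
        return data.get("pairs") or []
    return data if isinstance(data, list) else []


# Example (token address elided -- substitute a real one):
# pairs = asyncio.run(fetch_token_pairs("ethereum", "0x..."))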


@@ -905,6 +905,17 @@ async def index_connector_content(
             )
             response_message = "Luma indexing started in the background."

+        elif connector.connector_type == SearchSourceConnectorType.DEXSCREENER_CONNECTOR:
+            from app.tasks.celery_tasks.connector_tasks import index_dexscreener_pairs_task
+
+            logger.info(
+                f"Triggering DexScreener indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
+            )
+            index_dexscreener_pairs_task.delay(
+                connector_id, search_space_id, str(user.id), indexing_from, indexing_to
+            )
+            response_message = "DexScreener indexing started in the background."
+
         elif (
             connector.connector_type
             == SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR
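index_dexscreener_pairs_task itself is outside this hunk; a hypothetical skeleton consistent with the .delay() call above (the decorator and import path are assumptions):

from app.celery_app import celery_app  # assumed import path


@celery_app.task(name="index_dexscreener_pairs")
def index_dexscreener_pairs_task(
    connector_id: int,
    search_space_id: int,
    user_id: str,
    indexing_from: str | None = None,
    indexing_to: str | None = None,
) -> None:
    """Skeleton only: the argument order mirrors the route's .delay() call;
    the body (fetch pairs, format to markdown, index documents) is assumed."""
    ...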


connector_service.py

@@ -2872,6 +2872,135 @@ class ConnectorService:

         return result_object, obsidian_docs

+    async def search_dexscreener(
+        self,
+        user_query: str,
+        search_space_id: int,
+        top_k: int = 20,
+        start_date: datetime | None = None,
+        end_date: datetime | None = None,
+    ) -> tuple:
+        """
+        Search for DexScreener cryptocurrency trading pair data and return both the source information and langchain documents.
+
+        Uses combined chunk-level and document-level hybrid search with RRF fusion.
+
+        Args:
+            user_query: The user's query
+            search_space_id: The search space ID to search in
+            top_k: Maximum number of results to return
+            start_date: Optional start date for filtering documents by updated_at
+            end_date: Optional end date for filtering documents by updated_at
+
+        Returns:
+            tuple: (sources_info, langchain_documents)
+        """
+        dexscreener_docs = await self._combined_rrf_search(
+            query_text=user_query,
+            search_space_id=search_space_id,
+            document_type="DEXSCREENER_CONNECTOR",
+            top_k=top_k,
+            start_date=start_date,
+            end_date=end_date,
+        )
+
+        # Early return if no results
+        if not dexscreener_docs:
+            return {
+                "id": 54,
+                "name": "DexScreener",
+                "type": "DEXSCREENER_CONNECTOR",
+                "sources": [],
+            }, []
+
+        def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str:
+            # Extract token and chain info from metadata
+            base_token = metadata.get("base_symbol", "")
+            quote_token = metadata.get("quote_symbol", "")
+            chain = metadata.get("chain_id", "")
+            dex = metadata.get("dex", "")
+            if base_token and quote_token:
+                title = f"{base_token}/{quote_token}"
+                if chain:
+                    title += f" on {chain.capitalize()}"
+                if dex:
+                    title += f" ({dex})"
+                return title
+            return doc_info.get("title", "DexScreener Trading Pair")
+
+        def _url_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str:
+            # DexScreener URL format: https://dexscreener.com/{chain}/{pair_address}
+            chain = metadata.get("chain_id", "")
+            pair_address = metadata.get("pair_address", "")
+            if chain and pair_address:
+                return f"https://dexscreener.com/{chain}/{pair_address}"
+            return ""
+
+        def _description_fn(
+            chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any]
+        ) -> str:
+            # Build a rich description with price and volume info
+            description_parts = []
+            price_usd = metadata.get("price_usd")
+            if price_usd:
+                description_parts.append(f"Price: ${price_usd}")
+            volume_24h = metadata.get("volume_24h")
+            if volume_24h:
+                description_parts.append(f"24h Volume: ${volume_24h}")
+            price_change_24h = metadata.get("price_change_24h")
+            if price_change_24h is not None:
+                sign = "+" if price_change_24h > 0 else ""
+                description_parts.append(f"24h Change: {sign}{price_change_24h}%")
+            liquidity_usd = metadata.get("liquidity_usd")
+            if liquidity_usd:
+                description_parts.append(f"Liquidity: ${liquidity_usd}")
+            if description_parts:
+                return " | ".join(description_parts)
+            # Fallback to chunk content preview
+            return self._chunk_preview(chunk.get("content", ""), limit=200)
+
+        def _extra_fields_fn(
+            _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any]
+        ) -> dict[str, Any]:
+            return {
+                "chain_id": metadata.get("chain_id", ""),
+                "dex": metadata.get("dex", ""),
+                "pair_address": metadata.get("pair_address", ""),
+                "base_symbol": metadata.get("base_symbol", ""),
+                "quote_symbol": metadata.get("quote_symbol", ""),
+                "price_usd": metadata.get("price_usd"),
+                "volume_24h": metadata.get("volume_24h"),
+                "price_change_24h": metadata.get("price_change_24h"),
+                "liquidity_usd": metadata.get("liquidity_usd"),
+            }
+
+        sources_list = self._build_chunk_sources_from_documents(
+            dexscreener_docs,
+            title_fn=_title_fn,
+            url_fn=_url_fn,
+            description_fn=_description_fn,
+            extra_fields_fn=_extra_fields_fn,
+        )
+
+        # Create result object
+        result_object = {
+            "id": 54,
+            "name": "DexScreener",
+            "type": "DEXSCREENER_CONNECTOR",
+            "sources": sources_list,
+        }
+
+        return result_object, dexscreener_docs
+
     # =========================================================================
     # Composio Connector Search Methods
     # =========================================================================
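A usage sketch for the new method, mirroring the call site added in knowledge_base.py (the ConnectorService constructor and session wiring are assumptions; only search_dexscreener's signature comes from this diff):

async def demo(session) -> None:
    # Hypothetical wiring of the service.
    service = ConnectorService(session)
    result_object, dexscreener_docs = await service.search_dexscreener(
        user_query="WETH price",
        search_space_id=1,
        top_k=20,
    )
    for source in result_object["sources"]:
        # Titles look like "WETH/USDC on Ethereum (uniswap)" per _title_fn above.
        print(source.get("title"), source.get("url"))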