From b5d0413459a5502e650eb149119ef614db0dc9ff Mon Sep 17 00:00:00 2001 From: API Test Bot Date: Sun, 1 Feb 2026 14:20:34 +0700 Subject: [PATCH] fix(rag): Add DEXSCREENER_CONNECTOR to searchable mapping - Added DEXSCREENER_CONNECTOR to _CONNECTOR_TYPE_TO_SEARCHABLE in chat_deepagent.py - This fixes LLM's inability to search DexScreener data despite it being indexed - Root cause: connector was enabled in DB but missing from mapping, causing it to be filtered out - Verified: LLM now successfully retrieves WETH price (~$2,442) with DexScreener citations Related files: - chat_deepagent.py: Added connector mapping - knowledge_base.py: Added debug logging for DexScreener search - connector_service.py: Fixed metadata field names (base_symbol, quote_symbol, dex) - 85_add_dexscreener_connector.py: Migration for connector type --- .../versions/85_add_dexscreener_connector.py | 5 +- .../app/agents/new_chat/chat_deepagent.py | 2 + .../agents/new_chat/tools/knowledge_base.py | 15 ++ .../app/connectors/dexscreener_connector.py | 12 +- .../routes/search_source_connectors_routes.py | 11 ++ .../app/services/connector_service.py | 129 ++++++++++++++++++ 6 files changed, 170 insertions(+), 4 deletions(-) diff --git a/surfsense_backend/alembic/versions/85_add_dexscreener_connector.py b/surfsense_backend/alembic/versions/85_add_dexscreener_connector.py index 0f61b4a19..d65a733a8 100644 --- a/surfsense_backend/alembic/versions/85_add_dexscreener_connector.py +++ b/surfsense_backend/alembic/versions/85_add_dexscreener_connector.py @@ -17,13 +17,16 @@ depends_on = None def upgrade(): - """Add DEXSCREENER_CONNECTOR to searchsourceconnectortype enum.""" + """Add DEXSCREENER_CONNECTOR to searchsourceconnectortype and documenttype enums.""" # Add new enum value using raw SQL # Note: ALTER TYPE ... ADD VALUE cannot be executed inside a transaction block # Alembic handles this automatically when using op.execute() op.execute( "ALTER TYPE searchsourceconnectortype ADD VALUE IF NOT EXISTS 'DEXSCREENER_CONNECTOR'" ) + op.execute( + "ALTER TYPE documenttype ADD VALUE IF NOT EXISTS 'DEXSCREENER_CONNECTOR'" + ) def downgrade(): diff --git a/surfsense_backend/app/agents/new_chat/chat_deepagent.py b/surfsense_backend/app/agents/new_chat/chat_deepagent.py index 9c383c308..6edea89f0 100644 --- a/surfsense_backend/app/agents/new_chat/chat_deepagent.py +++ b/surfsense_backend/app/agents/new_chat/chat_deepagent.py @@ -59,6 +59,8 @@ _CONNECTOR_TYPE_TO_SEARCHABLE: dict[str, str] = { "COMPOSIO_GOOGLE_DRIVE_CONNECTOR": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR", "COMPOSIO_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR", "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR", + # Cryptocurrency data + "DEXSCREENER_CONNECTOR": "DEXSCREENER_CONNECTOR", } # Document types that don't come from SearchSourceConnector but should always be searchable diff --git a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py index a11e4ac38..de7a96330 100644 --- a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py +++ b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py @@ -52,6 +52,7 @@ _ALL_CONNECTORS: list[str] = [ "CRAWLED_URL", "CIRCLEBACK", "OBSIDIAN_CONNECTOR", + "DEXSCREENER_CONNECTOR", # Composio connectors "COMPOSIO_GOOGLE_DRIVE_CONNECTOR", "COMPOSIO_GMAIL_CONNECTOR", @@ -89,6 +90,7 @@ CONNECTOR_DESCRIPTIONS: dict[str, str] = { "BOOKSTACK_CONNECTOR": "BookStack pages (personal documentation)", "CIRCLEBACK": "Circleback meeting notes, transcripts, and action items", "OBSIDIAN_CONNECTOR": "Obsidian vault notes and markdown files (personal notes)", + "DEXSCREENER_CONNECTOR": "DexScreener real-time cryptocurrency trading pair data and market information", # Composio connectors "COMPOSIO_GOOGLE_DRIVE_CONNECTOR": "Google Drive files via Composio (personal cloud storage)", "COMPOSIO_GMAIL_CONNECTOR": "Gmail emails via Composio (personal emails)", @@ -610,6 +612,19 @@ async def search_knowledge_base_async( ) all_documents.extend(chunks) + elif connector == "DEXSCREENER_CONNECTOR": + _, chunks = await connector_service.search_dexscreener( + user_query=query, + search_space_id=search_space_id, + top_k=top_k, + start_date=resolved_start_date, + end_date=resolved_end_date, + ) + print(f"[DEBUG] DexScreener search returned {len(chunks)} chunks") + if chunks: + print(f"[DEBUG] First chunk metadata: {chunks[0].get('document', {}).get('metadata', {})}") + all_documents.extend(chunks) + # ========================================================= # Composio Connectors # ========================================================= diff --git a/surfsense_backend/app/connectors/dexscreener_connector.py b/surfsense_backend/app/connectors/dexscreener_connector.py index 0e635bce8..3dedb1c4f 100644 --- a/surfsense_backend/app/connectors/dexscreener_connector.py +++ b/surfsense_backend/app/connectors/dexscreener_connector.py @@ -23,7 +23,7 @@ class DexScreenerConnector: Note: DexScreener API is public and doesn't require authentication. """ - self.base_url = "https://api.dexscreener.com/latest/dex" + self.base_url = "https://api.dexscreener.com" self.rate_limit_delay = 0.2 # 200ms delay between requests to respect rate limits async def make_request( @@ -106,13 +106,18 @@ class DexScreenerConnector: Tuple containing (list of pairs, error message or None) """ try: - endpoint = f"tokens/{chain_id}/{token_address}" + endpoint = f"token-pairs/v1/{chain_id}/{token_address}" response = await self.make_request(endpoint) if response is None: return [], f"Token not found: {chain_id}/{token_address}" - pairs = response.get("pairs", []) + # DexScreener API returns {"pairs": [...]} or {"pairs": null} + if isinstance(response, dict): + pairs = response.get("pairs", []) + else: + # Fallback if API returns list directly (shouldn't happen) + pairs = response if isinstance(response, list) else [] if not pairs: return [], f"No trading pairs found for {chain_id}/{token_address}" @@ -121,6 +126,7 @@ class DexScreenerConnector: except Exception as e: return [], f"Error fetching pairs for {chain_id}/{token_address}: {e!s}" + def format_pair_to_markdown( self, diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index a27c2125c..fd39b3c67 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -905,6 +905,17 @@ async def index_connector_content( ) response_message = "Luma indexing started in the background." + elif connector.connector_type == SearchSourceConnectorType.DEXSCREENER_CONNECTOR: + from app.tasks.celery_tasks.connector_tasks import index_dexscreener_pairs_task + + logger.info( + f"Triggering DexScreener indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + ) + index_dexscreener_pairs_task.delay( + connector_id, search_space_id, str(user.id), indexing_from, indexing_to + ) + response_message = "DexScreener indexing started in the background." + elif ( connector.connector_type == SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py index 4c5599815..d59972251 100644 --- a/surfsense_backend/app/services/connector_service.py +++ b/surfsense_backend/app/services/connector_service.py @@ -2872,6 +2872,135 @@ class ConnectorService: return result_object, obsidian_docs + async def search_dexscreener( + self, + user_query: str, + search_space_id: int, + top_k: int = 20, + start_date: datetime | None = None, + end_date: datetime | None = None, + ) -> tuple: + """ + Search for DexScreener cryptocurrency trading pair data and return both the source information and langchain documents. + + Uses combined chunk-level and document-level hybrid search with RRF fusion. + + Args: + user_query: The user's query + search_space_id: The search space ID to search in + top_k: Maximum number of results to return + start_date: Optional start date for filtering documents by updated_at + end_date: Optional end date for filtering documents by updated_at + + Returns: + tuple: (sources_info, langchain_documents) + """ + dexscreener_docs = await self._combined_rrf_search( + query_text=user_query, + search_space_id=search_space_id, + document_type="DEXSCREENER_CONNECTOR", + top_k=top_k, + start_date=start_date, + end_date=end_date, + ) + + # Early return if no results + if not dexscreener_docs: + return { + "id": 54, + "name": "DexScreener", + "type": "DEXSCREENER_CONNECTOR", + "sources": [], + }, [] + + def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + # Extract token and chain info from metadata + base_token = metadata.get("base_symbol", "") + quote_token = metadata.get("quote_symbol", "") + chain = metadata.get("chain_id", "") + dex = metadata.get("dex", "") + + if base_token and quote_token: + title = f"{base_token}/{quote_token}" + if chain: + title += f" on {chain.capitalize()}" + if dex: + title += f" ({dex})" + return title + + return doc_info.get("title", "DexScreener Trading Pair") + + def _url_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + # DexScreener URL format: https://dexscreener.com/{chain}/{pair_address} + chain = metadata.get("chain_id", "") + pair_address = metadata.get("pair_address", "") + if chain and pair_address: + return f"https://dexscreener.com/{chain}/{pair_address}" + return "" + + def _description_fn( + chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + # Build a rich description with price and volume info + description_parts = [] + + price_usd = metadata.get("price_usd") + if price_usd: + description_parts.append(f"Price: ${price_usd}") + + volume_24h = metadata.get("volume_24h") + if volume_24h: + description_parts.append(f"24h Volume: ${volume_24h}") + + price_change_24h = metadata.get("price_change_24h") + if price_change_24h is not None: + sign = "+" if price_change_24h > 0 else "" + description_parts.append(f"24h Change: {sign}{price_change_24h}%") + + liquidity_usd = metadata.get("liquidity_usd") + if liquidity_usd: + description_parts.append(f"Liquidity: ${liquidity_usd}") + + if description_parts: + return " | ".join(description_parts) + + # Fallback to chunk content preview + return self._chunk_preview(chunk.get("content", ""), limit=200) + + def _extra_fields_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> dict[str, Any]: + return { + "chain_id": metadata.get("chain_id", ""), + "dex": metadata.get("dex", ""), + "pair_address": metadata.get("pair_address", ""), + "base_symbol": metadata.get("base_symbol", ""), + "quote_symbol": metadata.get("quote_symbol", ""), + "price_usd": metadata.get("price_usd"), + "volume_24h": metadata.get("volume_24h"), + "price_change_24h": metadata.get("price_change_24h"), + "liquidity_usd": metadata.get("liquidity_usd"), + } + + sources_list = self._build_chunk_sources_from_documents( + dexscreener_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=_description_fn, + extra_fields_fn=_extra_fields_fn, + ) + + # Create result object + result_object = { + "id": 54, + "name": "DexScreener", + "type": "DEXSCREENER_CONNECTOR", + "sources": sources_list, + } + + return result_object, dexscreener_docs + + # ========================================================================= # Composio Connector Search Methods # =========================================================================