chore: merge upstream with local feature additions

- Merged DexScreener connector, Composio connectors, and crypto real-time tools from upstream
- Kept local additions: Dropbox/OneDrive connectors, memory routes, model_list routes, RefreshToken model
- Resolved frontend conflicts: kept tool UIs from both sides
- Accepted upstream lock files (uv.lock, pnpm-lock.yaml)
Vonic 2026-04-13 23:31:52 +07:00
commit 6e86cd7e8a
803 changed files with 152168 additions and 14005 deletions

View file

@ -92,6 +92,8 @@ _CONNECTOR_TYPE_TO_SEARCHABLE: dict[str, str] = {
"COMPOSIO_GOOGLE_DRIVE_CONNECTOR": "GOOGLE_DRIVE_FILE",
"COMPOSIO_GMAIL_CONNECTOR": "GOOGLE_GMAIL_CONNECTOR",
"COMPOSIO_GOOGLE_CALENDAR_CONNECTOR": "GOOGLE_CALENDAR_CONNECTOR",
# Cryptocurrency data
"DEXSCREENER_CONNECTOR": "DEXSCREENER_CONNECTOR",
}
# Document types that don't come from SearchSourceConnector but should always be searchable

View file

@ -11,10 +11,19 @@ Available tools:
- generate_image: Generate images from text descriptions using AI models
- scrape_webpage: Extract content from webpages
- update_memory: Update the user's / team's memory document
- save_memory: Store facts/preferences about the user
- recall_memory: Retrieve relevant user memories
- get_live_token_price: Get real-time crypto price from DexScreener
- get_live_token_data: Get comprehensive real-time crypto market data
"""
# Registry exports
# Tool factory exports (for direct use)
from .crypto_realtime import (
create_get_live_token_data_tool,
create_get_live_token_price_tool,
)
from .display_image import create_display_image_tool
from .generate_image import create_generate_image_tool
from .knowledge_base import (
CONNECTOR_DESCRIPTIONS,
@ -46,6 +55,11 @@ __all__ = [
"create_generate_image_tool",
"create_generate_podcast_tool",
"create_generate_video_presentation_tool",
"create_get_live_token_data_tool",
"create_get_live_token_price_tool",
"create_link_preview_tool",
"create_recall_memory_tool",
"create_save_memory_tool",
"create_scrape_webpage_tool",
"create_search_surfsense_docs_tool",
"create_update_memory_tool",

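For orientation, a minimal sketch (not part of the commit; the tools package path is an assumption, not shown in this diff) confirming the two new factories are exported at package level alongside the existing tools:

# Hypothetical import check; the package path is assumed.
from app.agents.new_chat.tools import (
    create_get_live_token_data_tool,
    create_get_live_token_price_tool,
)

price_tool = create_get_live_token_price_tool()
data_tool = create_get_live_token_data_tool()
# @tool-decorated functions expose .name; expected: get_live_token_price, get_live_token_data
print(price_tool.name, data_tool.name)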
View file

@ -0,0 +1,322 @@
"""
Real-time cryptocurrency data tools for the SurfSense agent.
This module provides tools for fetching LIVE crypto data directly from DexScreener API.
These tools complement the RAG-based search_knowledge_base tool:
- RAG (search_knowledge_base): Historical context, trends, analysis from indexed data
- Real-time tools: Current prices, live market data
The AI agent decides which to use based on the query:
- "What's the current price of BULLA?" get_live_token_price (real-time)
- "How has BULLA performed this week?" search_knowledge_base (RAG)
- "Analyze BULLA for me" Both (RAG for context + real-time for current data)
"""
import hashlib
import logging
from typing import Any
from langchain_core.tools import tool
from app.connectors.dexscreener_connector import DexScreenerConnector
logger = logging.getLogger(__name__)
def generate_token_id(chain: str, address: str) -> str:
"""Generate a unique ID for a token query."""
hash_val = hashlib.md5(f"{chain}:{address}".encode()).hexdigest()[:12]
return f"token-{hash_val}"
def create_get_live_token_price_tool():
"""
Factory function to create the get_live_token_price tool.
This tool fetches REAL-TIME price data directly from DexScreener API.
Use this when users ask for current/live prices.
Returns:
A configured tool function for fetching live token prices.
"""
@tool
async def get_live_token_price(
chain: str,
token_address: str,
token_symbol: str | None = None,
) -> dict[str, Any]:
"""
Get the LIVE/CURRENT price of a cryptocurrency token from DexScreener.
Use this tool when the user asks for:
- Current price: "What's the price of BULLA right now?"
- Live data: "Show me live price for SOL"
- Real-time info: "What's WETH trading at?"
DO NOT use this for historical analysis - use search_knowledge_base instead.
Args:
chain: Blockchain network (e.g., 'solana', 'ethereum', 'base', 'bsc')
token_address: The token's contract address
token_symbol: Optional token symbol for display (e.g., 'BULLA', 'SOL')
Returns:
Dictionary with live price data including:
- price_usd: Current price in USD
- price_change_24h: 24-hour price change percentage
- price_change_1h: 1-hour price change percentage
- volume_24h: 24-hour trading volume
- liquidity_usd: Total liquidity in USD
- market_cap: Market capitalization
- dex: DEX where the best liquidity is found
- pair_url: Link to DexScreener chart
"""
token_id = generate_token_id(chain, token_address)
try:
# Initialize DexScreener connector
connector = DexScreenerConnector()
# Fetch live data from API
pairs, error = await connector.get_token_pairs(chain, token_address)
if error:
logger.warning(f"[get_live_token_price] Error: {error}")
return {
"id": token_id,
"kind": "live_token_price",
"chain": chain,
"token_address": token_address,
"token_symbol": token_symbol,
"error": error,
}
if not pairs:
return {
"id": token_id,
"kind": "live_token_price",
"chain": chain,
"token_address": token_address,
"token_symbol": token_symbol,
"error": f"No trading pairs found for {token_symbol or token_address} on {chain}",
}
# Get the best pair (highest liquidity)
best_pair = max(pairs, key=lambda p: float(p.get("liquidity", {}).get("usd", 0) or 0))
# Extract data from best pair
base_token = best_pair.get("baseToken", {})
price_change = best_pair.get("priceChange", {})
volume = best_pair.get("volume", {})
liquidity = best_pair.get("liquidity", {})
return {
"id": token_id,
"kind": "live_token_price",
"chain": chain,
"token_address": token_address,
"token_symbol": token_symbol or base_token.get("symbol", "Unknown"),
"token_name": base_token.get("name", "Unknown"),
"price_usd": best_pair.get("priceUsd", "N/A"),
"price_native": best_pair.get("priceNative", "N/A"),
"price_change_5m": price_change.get("m5", 0),
"price_change_1h": price_change.get("h1", 0),
"price_change_6h": price_change.get("h6", 0),
"price_change_24h": price_change.get("h24", 0),
"volume_24h": volume.get("h24", 0),
"volume_6h": volume.get("h6", 0),
"volume_1h": volume.get("h1", 0),
"liquidity_usd": liquidity.get("usd", 0),
"market_cap": best_pair.get("marketCap", 0),
"fdv": best_pair.get("fdv", 0),
"dex": best_pair.get("dexId", "Unknown"),
"pair_address": best_pair.get("pairAddress", ""),
"pair_url": best_pair.get("url", ""),
"total_pairs": len(pairs),
"data_source": "DexScreener API (Real-time)",
}
except Exception as e:
error_message = str(e)
logger.error(f"[get_live_token_price] Error fetching {chain}/{token_address}: {error_message}")
return {
"id": token_id,
"kind": "live_token_price",
"chain": chain,
"token_address": token_address,
"token_symbol": token_symbol,
"error": f"Failed to fetch live price: {error_message[:100]}",
}
return get_live_token_price
def create_get_live_token_data_tool():
"""
Factory function to create the get_live_token_data tool.
This tool fetches comprehensive REAL-TIME market data from DexScreener API.
Use this when users want detailed current market information.
Returns:
A configured tool function for fetching live token market data.
"""
@tool
async def get_live_token_data(
chain: str,
token_address: str,
token_symbol: str | None = None,
include_all_pairs: bool = False,
) -> dict[str, Any]:
"""
Get comprehensive LIVE market data for a cryptocurrency token.
Use this tool when the user asks for:
- Detailed market info: "Show me full market data for BULLA"
- Trading activity: "What's the trading volume for SOL?"
- Liquidity info: "How much liquidity does WETH have?"
- Transaction counts: "How many buys/sells for this token?"
This returns more detailed data than get_live_token_price.
For historical trends and analysis, use search_knowledge_base instead.
Args:
chain: Blockchain network (e.g., 'solana', 'ethereum', 'base', 'bsc')
token_address: The token's contract address
token_symbol: Optional token symbol for display
include_all_pairs: If True, include data from all trading pairs
Returns:
Dictionary with comprehensive market data including:
- All price data from get_live_token_price
- Transaction counts (buys/sells in 24h, 6h, 1h)
- All trading pairs (if include_all_pairs=True)
- Aggregated volume across all pairs
"""
token_id = generate_token_id(chain, token_address)
try:
# Initialize DexScreener connector
connector = DexScreenerConnector()
# Fetch live data from API
pairs, error = await connector.get_token_pairs(chain, token_address)
if error:
logger.warning(f"[get_live_token_data] Error: {error}")
return {
"id": token_id,
"kind": "live_token_data",
"chain": chain,
"token_address": token_address,
"token_symbol": token_symbol,
"error": error,
}
if not pairs:
return {
"id": token_id,
"kind": "live_token_data",
"chain": chain,
"token_address": token_address,
"token_symbol": token_symbol,
"error": f"No trading pairs found for {token_symbol or token_address} on {chain}",
}
# Get the best pair (highest liquidity)
best_pair = max(pairs, key=lambda p: float(p.get("liquidity", {}).get("usd", 0) or 0))
# Extract data from best pair
base_token = best_pair.get("baseToken", {})
price_change = best_pair.get("priceChange", {})
volume = best_pair.get("volume", {})
liquidity = best_pair.get("liquidity", {})
txns = best_pair.get("txns", {})
# Calculate aggregated stats across all pairs
total_volume_24h = sum(float(p.get("volume", {}).get("h24", 0) or 0) for p in pairs)
total_liquidity = sum(float(p.get("liquidity", {}).get("usd", 0) or 0) for p in pairs)
total_buys_24h = sum(p.get("txns", {}).get("h24", {}).get("buys", 0) or 0 for p in pairs)
total_sells_24h = sum(p.get("txns", {}).get("h24", {}).get("sells", 0) or 0 for p in pairs)
result = {
"id": token_id,
"kind": "live_token_data",
"chain": chain,
"token_address": token_address,
"token_symbol": token_symbol or base_token.get("symbol", "Unknown"),
"token_name": base_token.get("name", "Unknown"),
# Price data
"price_usd": best_pair.get("priceUsd", "N/A"),
"price_native": best_pair.get("priceNative", "N/A"),
"price_change_5m": price_change.get("m5", 0),
"price_change_1h": price_change.get("h1", 0),
"price_change_6h": price_change.get("h6", 0),
"price_change_24h": price_change.get("h24", 0),
# Volume data (best pair)
"volume_24h": volume.get("h24", 0),
"volume_6h": volume.get("h6", 0),
"volume_1h": volume.get("h1", 0),
"volume_5m": volume.get("m5", 0),
# Liquidity
"liquidity_usd": liquidity.get("usd", 0),
"liquidity_base": liquidity.get("base", 0),
"liquidity_quote": liquidity.get("quote", 0),
# Market metrics
"market_cap": best_pair.get("marketCap", 0),
"fdv": best_pair.get("fdv", 0),
# Transaction counts (best pair)
"txns_24h_buys": txns.get("h24", {}).get("buys", 0),
"txns_24h_sells": txns.get("h24", {}).get("sells", 0),
"txns_6h_buys": txns.get("h6", {}).get("buys", 0),
"txns_6h_sells": txns.get("h6", {}).get("sells", 0),
"txns_1h_buys": txns.get("h1", {}).get("buys", 0),
"txns_1h_sells": txns.get("h1", {}).get("sells", 0),
# Aggregated stats (all pairs)
"total_volume_24h_all_pairs": total_volume_24h,
"total_liquidity_all_pairs": total_liquidity,
"total_buys_24h_all_pairs": total_buys_24h,
"total_sells_24h_all_pairs": total_sells_24h,
# DEX info
"dex": best_pair.get("dexId", "Unknown"),
"pair_address": best_pair.get("pairAddress", ""),
"pair_url": best_pair.get("url", ""),
"pair_created_at": best_pair.get("pairCreatedAt"),
# Metadata
"total_pairs": len(pairs),
"data_source": "DexScreener API (Real-time)",
}
# Include all pairs if requested
if include_all_pairs and len(pairs) > 1:
result["all_pairs"] = [
{
"dex": p.get("dexId"),
"pair_address": p.get("pairAddress"),
"quote_symbol": p.get("quoteToken", {}).get("symbol"),
"price_usd": p.get("priceUsd"),
"liquidity_usd": p.get("liquidity", {}).get("usd", 0),
"volume_24h": p.get("volume", {}).get("h24", 0),
"url": p.get("url"),
}
for p in sorted(pairs, key=lambda x: float(x.get("liquidity", {}).get("usd", 0) or 0), reverse=True)[:10]
]
return result
except Exception as e:
error_message = str(e)
logger.error(f"[get_live_token_data] Error fetching {chain}/{token_address}: {error_message}")
return {
"id": token_id,
"kind": "live_token_data",
"chain": chain,
"token_address": token_address,
"token_symbol": token_symbol,
"error": f"Failed to fetch live data: {error_message[:100]}",
}
return get_live_token_data
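As a hedged illustration of how the factory-created tools behave outside the agent (a sketch, not code from this commit; the module path and the WETH address are assumptions for the example), a @tool-decorated async function can be called directly via .ainvoke:

import asyncio

# Assumed module path; adjust to wherever crypto_realtime.py lives in the repo.
from app.agents.new_chat.tools.crypto_realtime import create_get_live_token_price_tool


async def demo() -> None:
    tool = create_get_live_token_price_tool()
    # LangChain tools accept their arguments as a dict through ainvoke().
    result = await tool.ainvoke(
        {
            "chain": "ethereum",
            "token_address": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2",  # WETH (example)
            "token_symbol": "WETH",
        }
    )
    if "error" in result:
        print("lookup failed:", result["error"])
    else:
        print(result["price_usd"], result["price_change_24h"], result["dex"])


asyncio.run(demo())

The same pattern applies to create_get_live_token_data_tool, which additionally accepts include_all_pairs to return up to ten pairs sorted by liquidity.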

View file

@ -203,6 +203,11 @@ _ALL_CONNECTORS: list[str] = [
"OBSIDIAN_CONNECTOR",
"ONEDRIVE_FILE",
"DROPBOX_FILE",
"DEXSCREENER_CONNECTOR",
# Composio connectors
"COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"COMPOSIO_GMAIL_CONNECTOR",
"COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
]
# Human-readable descriptions for each connector type
@ -234,6 +239,11 @@ CONNECTOR_DESCRIPTIONS: dict[str, str] = {
"OBSIDIAN_CONNECTOR": "Obsidian vault notes and markdown files (personal notes)",
"ONEDRIVE_FILE": "Microsoft OneDrive files and documents (personal cloud storage)",
"DROPBOX_FILE": "Dropbox files and documents (cloud storage)",
"DEXSCREENER_CONNECTOR": "DexScreener real-time cryptocurrency trading pair data and market information",
# Composio connectors
"COMPOSIO_GOOGLE_DRIVE_CONNECTOR": "Google Drive files via Composio (personal cloud storage)",
"COMPOSIO_GMAIL_CONNECTOR": "Gmail emails via Composio (personal emails)",
"COMPOSIO_GOOGLE_CALENDAR_CONNECTOR": "Google Calendar events via Composio (personal calendar)",
}
@ -771,6 +781,270 @@ async def search_knowledge_base_raw_async(
for docs in connector_results:
all_documents.extend(docs)
elif connector == "TEAMS_CONNECTOR":
_, chunks = await connector_service.search_teams(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "NOTION_CONNECTOR":
_, chunks = await connector_service.search_notion(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "GITHUB_CONNECTOR":
_, chunks = await connector_service.search_github(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "LINEAR_CONNECTOR":
_, chunks = await connector_service.search_linear(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "TAVILY_API":
_, chunks = await connector_service.search_tavily(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
)
all_documents.extend(chunks)
elif connector == "SEARXNG_API":
_, chunks = await connector_service.search_searxng(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
)
all_documents.extend(chunks)
elif connector == "LINKUP_API":
# Keep behavior aligned with researcher: default "standard"
_, chunks = await connector_service.search_linkup(
user_query=query,
search_space_id=search_space_id,
mode="standard",
)
all_documents.extend(chunks)
elif connector == "BAIDU_SEARCH_API":
_, chunks = await connector_service.search_baidu(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
)
all_documents.extend(chunks)
elif connector == "DISCORD_CONNECTOR":
_, chunks = await connector_service.search_discord(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "JIRA_CONNECTOR":
_, chunks = await connector_service.search_jira(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "GOOGLE_CALENDAR_CONNECTOR":
_, chunks = await connector_service.search_google_calendar(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "AIRTABLE_CONNECTOR":
_, chunks = await connector_service.search_airtable(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "GOOGLE_GMAIL_CONNECTOR":
_, chunks = await connector_service.search_google_gmail(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "GOOGLE_DRIVE_FILE":
_, chunks = await connector_service.search_google_drive(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "CONFLUENCE_CONNECTOR":
_, chunks = await connector_service.search_confluence(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "CLICKUP_CONNECTOR":
_, chunks = await connector_service.search_clickup(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "LUMA_CONNECTOR":
_, chunks = await connector_service.search_luma(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "ELASTICSEARCH_CONNECTOR":
_, chunks = await connector_service.search_elasticsearch(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "NOTE":
_, chunks = await connector_service.search_notes(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "BOOKSTACK_CONNECTOR":
_, chunks = await connector_service.search_bookstack(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "CIRCLEBACK":
_, chunks = await connector_service.search_circleback(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "OBSIDIAN_CONNECTOR":
_, chunks = await connector_service.search_obsidian(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "DEXSCREENER_CONNECTOR":
_, chunks = await connector_service.search_dexscreener(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
print(f"[DEBUG] DexScreener search returned {len(chunks)} chunks")
if chunks:
print(f"[DEBUG] First chunk metadata: {chunks[0].get('document', {}).get('metadata', {})}")
all_documents.extend(chunks)
# =========================================================
# Composio Connectors
# =========================================================
elif connector == "COMPOSIO_GOOGLE_DRIVE_CONNECTOR":
_, chunks = await connector_service.search_composio_google_drive(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "COMPOSIO_GMAIL_CONNECTOR":
_, chunks = await connector_service.search_composio_gmail(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
elif connector == "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR":
_, chunks = await connector_service.search_composio_google_calendar(
user_query=query,
search_space_id=search_space_id,
top_k=top_k,
start_date=resolved_start_date,
end_date=resolved_end_date,
)
all_documents.extend(chunks)
except Exception as e:
print(f"Error searching connector {connector}: {e}")
continue
# Deduplicate by content hash
seen_doc_ids: set[Any] = set()
seen_content_hashes: set[int] = set()
deduplicated: list[dict[str, Any]] = []

View file

@ -50,6 +50,11 @@ from .confluence import (
create_delete_confluence_page_tool,
create_update_confluence_page_tool,
)
from .crypto_realtime import (
create_get_live_token_data_tool,
create_get_live_token_price_tool,
)
from .display_image import create_display_image_tool
from .dropbox import (
create_create_dropbox_file_tool,
create_delete_dropbox_file_tool,
@ -80,6 +85,8 @@ from .linear import (
create_delete_linear_issue_tool,
create_update_linear_issue_tool,
)
from .knowledge_base import create_search_knowledge_base_tool
from .link_preview import create_link_preview_tool
from .mcp_tool import load_mcp_tools
from .notion import (
create_create_notion_page_tool,
@ -522,6 +529,26 @@ BUILTIN_TOOLS: list[ToolDefinition] = [
),
requires=["db_session", "search_space_id", "user_id"],
),
# =========================================================================
# CRYPTO REAL-TIME TOOLS - Hybrid approach (RAG + Real-time)
# =========================================================================
# These tools fetch LIVE data directly from DexScreener API.
# Use alongside search_knowledge_base for comprehensive crypto analysis:
# - search_knowledge_base: Historical context, trends (from indexed data)
# - get_live_token_price: Current price (real-time API call)
# - get_live_token_data: Full market data (real-time API call)
ToolDefinition(
name="get_live_token_price",
description="Get LIVE/CURRENT cryptocurrency price from DexScreener API. Use for real-time price queries.",
factory=lambda deps: create_get_live_token_price_tool(),
requires=[],
),
ToolDefinition(
name="get_live_token_data",
description="Get comprehensive LIVE market data (price, volume, liquidity, transactions) from DexScreener API.",
factory=lambda deps: create_get_live_token_data_tool(),
requires=[],
),
]
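To make the registration pattern concrete, here is a hedged sketch (a hypothetical helper, not part of this commit) of how definitions whose requires list is satisfied could be instantiated from a dependency map, mirroring the factory=lambda deps: ... convention used above:

from typing import Any


def build_available_tools(definitions: list, deps: dict[str, Any]) -> list:
    """Instantiate every registered tool whose declared dependencies are present in deps."""
    tools = []
    for definition in definitions:
        if all(req in deps for req in definition.requires):
            tools.append(definition.factory(deps))
    return tools


# The two crypto tools declare requires=[], so they are built even with an empty deps map.
no_dep_tools = build_available_tools(BUILTIN_TOOLS, deps={})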

View file

@ -1,246 +1,34 @@
import asyncio
import gc
import logging
import time
from collections import defaultdict
from contextlib import asynccontextmanager
from threading import Lock
import redis
from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from limits.storage import MemoryStorage
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from slowapi.util import get_remote_address
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request as StarletteRequest
from starlette.responses import Response as StarletteResponse
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
from app.agents.new_chat.checkpointer import (
close_checkpointer,
setup_checkpointer_tables,
)
from app.config import (
config,
initialize_image_gen_router,
initialize_llm_router,
initialize_vision_llm_router,
)
from app.config import config, initialize_llm_router
from app.db import User, create_db_and_tables, get_async_session
from app.routes import router as crud_router
from app.routes.auth_routes import router as auth_router
from app.schemas import UserCreate, UserRead, UserUpdate
from app.tasks.surfsense_docs_indexer import seed_surfsense_docs
from app.users import SECRET, auth_backend, current_active_user, fastapi_users
from app.utils.perf import get_perf_logger, log_system_snapshot
rate_limit_logger = logging.getLogger("surfsense.rate_limit")
# ============================================================================
# Rate Limiting Configuration (SlowAPI + Redis)
# ============================================================================
# Uses the same Redis instance as Celery for zero additional infrastructure.
# Protects auth endpoints from brute force and user enumeration attacks.
# SlowAPI limiter — provides default rate limits (1024/min) for ALL routes
# via the ASGI middleware. This is the general safety net.
# in_memory_fallback ensures requests are still served (with per-worker
# in-memory limiting) when Redis is unreachable, instead of hanging.
limiter = Limiter(
key_func=get_remote_address,
storage_uri=config.REDIS_APP_URL,
default_limits=["1024/minute"],
in_memory_fallback_enabled=True,
in_memory_fallback=[MemoryStorage()],
)
def _rate_limit_exceeded_handler(request: Request, exc: RateLimitExceeded):
"""Custom 429 handler that returns JSON matching our frontend error format."""
retry_after = exc.detail.split("per")[-1].strip() if exc.detail else "60"
return JSONResponse(
status_code=429,
content={"detail": "RATE_LIMIT_EXCEEDED"},
headers={"Retry-After": retry_after},
)
# ============================================================================
# Auth-Specific Rate Limits (Redis-backed with in-memory fallback)
# ============================================================================
# Stricter per-IP limits on auth endpoints to prevent:
# - Brute force password attacks
# - User enumeration via REGISTER_USER_ALREADY_EXISTS
# - Email spam via forgot-password
#
# Primary: Redis INCR+EXPIRE (shared across all workers).
# Fallback: In-memory sliding window (per-worker) when Redis is unavailable.
# Same Redis instance as SlowAPI / Celery.
_rate_limit_redis: redis.Redis | None = None
# In-memory fallback rate limiter (per-worker, used only when Redis is down)
_memory_rate_limits: dict[str, list[float]] = defaultdict(list)
_memory_lock = Lock()
def _get_rate_limit_redis() -> redis.Redis:
"""Get or create Redis client for auth rate limiting."""
global _rate_limit_redis
if _rate_limit_redis is None:
_rate_limit_redis = redis.from_url(config.REDIS_APP_URL, decode_responses=True)
return _rate_limit_redis
def _check_rate_limit_memory(
client_ip: str, max_requests: int, window_seconds: int, scope: str
):
"""
In-memory fallback rate limiter using a sliding window.
Used only when Redis is unavailable. Per-worker only (not shared),
so effective limit = max_requests x num_workers.
"""
key = f"{scope}:{client_ip}"
now = time.monotonic()
with _memory_lock:
timestamps = [t for t in _memory_rate_limits[key] if now - t < window_seconds]
if not timestamps:
_memory_rate_limits.pop(key, None)
else:
_memory_rate_limits[key] = timestamps
if len(timestamps) >= max_requests:
rate_limit_logger.warning(
f"Rate limit exceeded (in-memory fallback) on {scope} for IP {client_ip} "
f"({len(timestamps)}/{max_requests} in {window_seconds}s)"
)
raise HTTPException(
status_code=429,
detail="RATE_LIMIT_EXCEEDED",
)
_memory_rate_limits[key] = [*timestamps, now]
def _check_rate_limit(
request: Request, max_requests: int, window_seconds: int, scope: str
):
"""
Check per-IP rate limit using Redis. Raises 429 if exceeded.
Uses atomic INCR + EXPIRE to avoid race conditions.
Falls back to in-memory sliding window if Redis is unavailable.
"""
client_ip = get_remote_address(request)
key = f"surfsense:auth_rate_limit:{scope}:{client_ip}"
try:
r = _get_rate_limit_redis()
# Atomic: increment first, then set TTL if this is a new key
pipe = r.pipeline()
pipe.incr(key)
pipe.expire(key, window_seconds)
result = pipe.execute()
except (redis.exceptions.RedisError, OSError) as exc:
# Redis unavailable — fall back to in-memory rate limiting
rate_limit_logger.warning(
f"Redis unavailable for rate limiting ({scope}), "
f"falling back to in-memory limiter for {client_ip}: {exc}"
)
_check_rate_limit_memory(client_ip, max_requests, window_seconds, scope)
return
current_count = result[0] # INCR returns the new value
if current_count > max_requests:
rate_limit_logger.warning(
f"Rate limit exceeded on {scope} for IP {client_ip} "
f"({current_count}/{max_requests} in {window_seconds}s)"
)
raise HTTPException(
status_code=429,
detail="RATE_LIMIT_EXCEEDED",
)
def rate_limit_login(request: Request):
"""5 login attempts per minute per IP."""
_check_rate_limit(request, max_requests=5, window_seconds=60, scope="login")
def rate_limit_register(request: Request):
"""3 registration attempts per minute per IP."""
_check_rate_limit(request, max_requests=3, window_seconds=60, scope="register")
def rate_limit_password_reset(request: Request):
"""2 password reset attempts per minute per IP."""
_check_rate_limit(
request, max_requests=2, window_seconds=60, scope="password_reset"
)
def _enable_slow_callback_logging(threshold_sec: float = 0.5) -> None:
"""Monkey-patch the event loop to warn whenever a callback blocks longer than *threshold_sec*.
This helps pinpoint synchronous code that freezes the entire FastAPI server.
Only active when the PERF_DEBUG env var is set (to avoid overhead in production).
"""
import os
if not os.environ.get("PERF_DEBUG"):
return
_slow_log = logging.getLogger("surfsense.perf.slow")
_slow_log.setLevel(logging.WARNING)
if not _slow_log.handlers:
_h = logging.StreamHandler()
_h.setFormatter(logging.Formatter("%(asctime)s [SLOW-CALLBACK] %(message)s"))
_slow_log.addHandler(_h)
_slow_log.propagate = False
loop = asyncio.get_running_loop()
loop.slow_callback_duration = threshold_sec # type: ignore[attr-defined]
loop.set_debug(True)
_slow_log.warning(
"Event-loop slow-callback detector ENABLED (threshold=%.1fs). "
"Set PERF_DEBUG='' to disable.",
threshold_sec,
)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Tune GC: lower gen-2 threshold so long-lived garbage is collected
# sooner (default 700/10/10 → 700/10/5). This reduces peak RSS
# with minimal CPU overhead.
gc.set_threshold(700, 10, 5)
_enable_slow_callback_logging(threshold_sec=0.5)
# Not needed if you set up a migration system like Alembic
await create_db_and_tables()
# Setup LangGraph checkpointer tables for conversation persistence
await setup_checkpointer_tables()
# Initialize LLM Router for Auto mode load balancing
initialize_llm_router()
initialize_image_gen_router()
initialize_vision_llm_router()
try:
await asyncio.wait_for(seed_surfsense_docs(), timeout=120)
except TimeoutError:
logging.getLogger(__name__).warning(
"Surfsense docs seeding timed out after 120s — skipping. "
"Docs will be indexed on the next restart."
)
log_system_snapshot("startup_complete")
# Seed Surfsense documentation
await seed_surfsense_docs()
yield
# Cleanup: close checkpointer connection on shutdown
await close_checkpointer()
@ -254,73 +42,6 @@ def registration_allowed():
app = FastAPI(lifespan=lifespan)
# Register rate limiter and custom 429 handler
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# ---------------------------------------------------------------------------
# Request-level performance middleware
# ---------------------------------------------------------------------------
# Logs wall-clock time, method, path, and status for every request so we can
# spot slow endpoints in production logs.
_PERF_SLOW_REQUEST_THRESHOLD = float(
__import__("os").environ.get("PERF_SLOW_REQUEST_MS", "2000")
)
class RequestPerfMiddleware(BaseHTTPMiddleware):
"""Middleware that logs per-request wall-clock time.
- ALL requests are logged at DEBUG level.
- Requests exceeding PERF_SLOW_REQUEST_MS (default 2000ms) are logged at
WARNING level with a system snapshot so we can correlate slow responses
with CPU/memory usage at that moment.
"""
async def dispatch(
self, request: StarletteRequest, call_next: RequestResponseEndpoint
) -> StarletteResponse:
perf = get_perf_logger()
t0 = time.perf_counter()
response = await call_next(request)
elapsed_ms = (time.perf_counter() - t0) * 1000
path = request.url.path
method = request.method
status = response.status_code
perf.debug(
"[request] %s %s -> %d in %.1fms",
method,
path,
status,
elapsed_ms,
)
if elapsed_ms > _PERF_SLOW_REQUEST_THRESHOLD:
perf.warning(
"[SLOW_REQUEST] %s %s -> %d in %.1fms (threshold=%.0fms)",
method,
path,
status,
elapsed_ms,
_PERF_SLOW_REQUEST_THRESHOLD,
)
log_system_snapshot("slow_request")
return response
app.add_middleware(RequestPerfMiddleware)
# Add SlowAPI middleware for automatic rate limiting
# Uses Starlette BaseHTTPMiddleware (not the raw ASGI variant) to avoid
# corrupting StreamingResponse — SlowAPIASGIMiddleware re-sends
# http.response.start on every body chunk, breaking SSE/streaming endpoints.
app.add_middleware(SlowAPIMiddleware)
# Add ProxyHeaders middleware FIRST to trust proxy headers (e.g., from Cloudflare)
# This ensures FastAPI uses HTTPS in redirects when behind a proxy
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*")
@ -346,42 +67,40 @@ if config.NEXT_FRONTEND_URL:
if www_url not in allowed_origins:
allowed_origins.append(www_url)
allowed_origins.extend(
[ # For local development and desktop app
"http://localhost:3000",
"http://127.0.0.1:3000",
]
)
# For local development, also allow common localhost origins
if not config.BACKEND_URL or (
config.NEXT_FRONTEND_URL and "localhost" in config.NEXT_FRONTEND_URL
):
allowed_origins.extend(
[
"http://localhost:3000",
"http://127.0.0.1:3000",
"http://localhost:3999",
"http://127.0.0.1:3999",
]
)
app.add_middleware(
CORSMiddleware,
allow_origins=allowed_origins,
allow_origin_regex=r"^https?://(localhost|127\.0\.0\.1)(:\d+)?$",
allow_credentials=True,
allow_methods=["*"], # Allows all methods
allow_headers=["*"], # Allows all headers
)
app.include_router(
fastapi_users.get_auth_router(auth_backend),
prefix="/auth/jwt",
tags=["auth"],
dependencies=[Depends(rate_limit_login)],
fastapi_users.get_auth_router(auth_backend), prefix="/auth/jwt", tags=["auth"]
)
app.include_router(
fastapi_users.get_register_router(UserRead, UserCreate),
prefix="/auth",
tags=["auth"],
dependencies=[
Depends(rate_limit_register),
Depends(registration_allowed), # blocks registration when disabled
],
dependencies=[Depends(registration_allowed)], # blocks registration when disabled
)
app.include_router(
fastapi_users.get_reset_password_router(),
prefix="/auth",
tags=["auth"],
dependencies=[Depends(rate_limit_password_reset)],
)
app.include_router(
fastapi_users.get_verify_router(UserRead),
@ -394,9 +113,6 @@ app.include_router(
tags=["users"],
)
# Include custom auth routes (refresh token, logout)
app.include_router(auth_router)
if config.AUTH_TYPE == "GOOGLE":
from fastapi.responses import RedirectResponse
@ -509,13 +225,6 @@ if config.AUTH_TYPE == "GOOGLE":
app.include_router(crud_router, prefix="/api/v1", tags=["crud"])
@app.get("/health", tags=["health"])
@limiter.exempt
async def health_check():
"""Lightweight liveness probe exempt from rate limiting."""
return {"status": "ok"}
@app.get("/verify-token")
async def authenticated_route(
user: User = Depends(current_active_user),

View file

@ -12,7 +12,7 @@ from sqlalchemy.future import select
from app.config import config
from app.connectors.airtable_connector import AirtableConnector
from app.db import SearchSourceConnector
from app.routes.airtable_add_connector_route import refresh_airtable_token
from app.utils.airtable_token_utils import refresh_airtable_token
from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase
from app.utils.oauth_security import TokenEncryption

View file

@ -14,10 +14,10 @@ from sqlalchemy.future import select
from app.config import config
from app.connectors.clickup_connector import ClickUpConnector
from app.db import SearchSourceConnector
from app.routes.clickup_add_connector_route import refresh_clickup_token
from app.schemas.clickup_auth_credentials import ClickUpAuthCredentialsBase
from app.utils.oauth_security import TokenEncryption
logger = logging.getLogger(__name__)
@ -184,6 +184,8 @@ class ClickUpHistoryConnector:
)
# Refresh token
# Lazy import to avoid circular dependency
from app.routes.clickup_add_connector_route import refresh_clickup_token
connector = await refresh_clickup_token(self._session, connector)
# Reload credentials after refresh

View file

@ -86,14 +86,6 @@ class ConfluenceHistoryConnector:
if is_oauth:
# OAuth 2.0 authentication
# Check if access_token exists before processing
raw_access_token = config_data.get("access_token")
if not raw_access_token:
raise ValueError(
"Confluence access token not found. "
"Please reconnect your Confluence account."
)
# Decrypt credentials if they are encrypted
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
@ -125,16 +117,6 @@ class ConfluenceHistoryConnector:
f"Failed to decrypt Confluence credentials: {e!s}"
) from e
# Final validation after decryption
final_token = config_data.get("access_token")
if not final_token or (
isinstance(final_token, str) and not final_token.strip()
):
raise ValueError(
"Confluence access token is invalid or empty. "
"Please reconnect your Confluence account."
)
try:
self._credentials = AtlassianAuthCredentialsBase.from_dict(
config_data
@ -189,11 +171,9 @@ class ConfluenceHistoryConnector:
f"Connector {self._connector_id} not found; cannot refresh token."
)
# Refresh token
# Lazy import to avoid circular dependency
from app.routes.confluence_add_connector_route import (
refresh_confluence_token,
)
from app.routes.confluence_add_connector_route import refresh_confluence_token
connector = await refresh_confluence_token(self._session, connector)
# Reload credentials after refresh
@ -344,61 +324,6 @@ class ConfluenceHistoryConnector:
logger.error(f"Confluence API request error: {e!s}", exc_info=True)
raise Exception(f"Confluence API request failed: {e!s}") from e
async def _make_api_request_with_method(
self,
endpoint: str,
method: str = "GET",
json_payload: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Make a request to the Confluence API with a specified HTTP method."""
if not self._use_oauth:
raise ValueError("Write operations require OAuth authentication")
token = await self._get_valid_token()
base_url = await self._get_base_url()
http_client = await self._get_client()
url = f"{base_url}/wiki/api/v2/{endpoint}"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
try:
method_upper = method.upper()
if method_upper == "POST":
response = await http_client.post(
url, headers=headers, json=json_payload, params=params
)
elif method_upper == "PUT":
response = await http_client.put(
url, headers=headers, json=json_payload, params=params
)
elif method_upper == "DELETE":
response = await http_client.delete(url, headers=headers, params=params)
else:
response = await http_client.get(url, headers=headers, params=params)
response.raise_for_status()
if response.status_code == 204 or not response.text:
return {"status": "success"}
return response.json()
except httpx.HTTPStatusError as e:
error_detail = {
"status_code": e.response.status_code,
"url": str(e.request.url),
"response_text": e.response.text,
}
logger.error(f"Confluence API HTTP error: {error_detail}")
raise Exception(
f"Confluence API request failed (HTTP {e.response.status_code}): {e.response.text}"
) from e
except httpx.RequestError as e:
logger.error(f"Confluence API request error: {e!s}", exc_info=True)
raise Exception(f"Confluence API request failed: {e!s}") from e
async def get_all_spaces(self) -> list[dict[str, Any]]:
"""
Fetch all spaces from Confluence.
@ -651,65 +576,6 @@ class ConfluenceHistoryConnector:
except Exception as e:
return [], f"Error fetching pages: {e!s}"
async def get_page(self, page_id: str) -> dict[str, Any]:
"""Fetch a single page by ID with body content."""
return await self._make_api_request(
f"pages/{page_id}", params={"body-format": "storage"}
)
async def create_page(
self,
space_id: str,
title: str,
body: str,
parent_page_id: str | None = None,
) -> dict[str, Any]:
"""Create a new Confluence page."""
payload: dict[str, Any] = {
"spaceId": space_id,
"title": title,
"body": {
"representation": "storage",
"value": body,
},
"status": "current",
}
if parent_page_id:
payload["parentId"] = parent_page_id
return await self._make_api_request_with_method(
"pages", method="POST", json_payload=payload
)
async def update_page(
self,
page_id: str,
title: str,
body: str,
version_number: int,
) -> dict[str, Any]:
"""Update an existing Confluence page (requires version number)."""
payload: dict[str, Any] = {
"id": page_id,
"title": title,
"body": {
"representation": "storage",
"value": body,
},
"version": {
"number": version_number,
},
"status": "current",
}
return await self._make_api_request_with_method(
f"pages/{page_id}", method="PUT", json_payload=payload
)
async def delete_page(self, page_id: str) -> dict[str, Any]:
"""Delete a Confluence page."""
return await self._make_api_request_with_method(
f"pages/{page_id}", method="DELETE"
)
async def close(self):
"""Close the HTTP client connection."""
if self._http_client:

View file

@ -0,0 +1,258 @@
"""
DexScreener Connector Module
A module for retrieving cryptocurrency trading pair data from DexScreener API.
Allows fetching pair information for tracked tokens across multiple blockchain networks.
"""
import asyncio
import logging
from typing import Any
import httpx
logger = logging.getLogger(__name__)
class DexScreenerConnector:
"""Class for retrieving trading pair data from DexScreener API."""
def __init__(self):
"""
Initialize the DexScreenerConnector class.
Note: DexScreener API is public and doesn't require authentication.
"""
self.base_url = "https://api.dexscreener.com"
self.rate_limit_delay = 0.2 # 200ms delay between requests to respect rate limits
async def make_request(
self,
endpoint: str,
max_retries: int = 3
) -> dict[str, Any] | None:
"""
Make an async request to the DexScreener API with retry logic.
Args:
endpoint: API endpoint path (without base URL)
max_retries: Maximum number of retry attempts for failed requests
Returns:
Response data from the API, or None if request fails
Raises:
Exception: If the API request fails after all retries
"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(url)
if response.status_code == 200:
# Add delay to respect rate limits
await self._rate_limit_delay()
return response.json()
elif response.status_code == 429:
# Rate limit exceeded - exponential backoff
wait_time = (2 ** attempt) * 1.0 # 1s, 2s, 4s
logger.warning(f"Rate limit exceeded. Waiting {wait_time}s before retry...")
await asyncio.sleep(wait_time)
continue
elif response.status_code == 404:
# Token/pair not found - return None instead of raising
logger.info(f"Token not found: {endpoint}")
return None
else:
raise Exception(
f"API request failed with status code {response.status_code}: {response.text}"
)
except httpx.TimeoutException:
if attempt < max_retries - 1:
logger.warning(f"Request timeout. Retrying... (attempt {attempt + 1}/{max_retries})")
continue
else:
raise Exception(f"Request timeout after {max_retries} attempts")
except httpx.RequestError as e:
if attempt < max_retries - 1:
logger.warning(f"Network error: {e}. Retrying... (attempt {attempt + 1}/{max_retries})")
continue
else:
raise Exception(f"Network error after {max_retries} attempts: {e}") from e
return None
async def _rate_limit_delay(self):
"""Add delay to respect API rate limits (300 req/min = ~200ms between requests)."""
await asyncio.sleep(self.rate_limit_delay)
async def get_token_pairs(
self,
chain_id: str,
token_address: str
) -> tuple[list[dict[str, Any]], str | None]:
"""
Fetch all trading pairs for a specific token on a blockchain.
Args:
chain_id: Blockchain identifier (e.g., 'ethereum', 'bsc', 'polygon')
token_address: Token contract address (0x format)
Returns:
Tuple containing (list of pairs, error message or None)
"""
try:
endpoint = f"token-pairs/v1/{chain_id}/{token_address}"
response = await self.make_request(endpoint)
if response is None:
return [], f"Token not found: {chain_id}/{token_address}"
# DexScreener API returns {"pairs": [...]} or {"pairs": null}
if isinstance(response, dict):
pairs = response.get("pairs", [])
else:
# Fallback if API returns list directly (shouldn't happen)
pairs = response if isinstance(response, list) else []
if not pairs:
return [], f"No trading pairs found for {chain_id}/{token_address}"
return pairs, None
except Exception as e:
return [], f"Error fetching pairs for {chain_id}/{token_address}: {e!s}"
def format_pair_to_markdown(
self,
pair: dict[str, Any],
token_name: str | None = None
) -> str:
"""
Convert a trading pair to markdown format.
Args:
pair: The pair object from DexScreener API
token_name: Optional custom name for the token
Returns:
Markdown string representation of the trading pair
"""
# Extract pair details
pair_address = pair.get("pairAddress", "Unknown")
chain_id = pair.get("chainId", "Unknown")
dex_id = pair.get("dexId", "Unknown")
url = pair.get("url", "")
# Extract token information
base_token = pair.get("baseToken", {})
quote_token = pair.get("quoteToken", {})
base_symbol = base_token.get("symbol", "Unknown")
base_name = token_name or base_token.get("name", "Unknown")
quote_symbol = quote_token.get("symbol", "Unknown")
# Extract price and volume data
price_native = pair.get("priceNative", "N/A")
price_usd = pair.get("priceUsd", "N/A")
# Extract liquidity data
liquidity = pair.get("liquidity", {})
liquidity_usd = liquidity.get("usd", 0)
# Extract volume data
volume = pair.get("volume", {})
volume_24h = volume.get("h24", 0)
volume_6h = volume.get("h6", 0)
volume_1h = volume.get("h1", 0)
# Extract price change data
price_change = pair.get("priceChange", {})
price_change_24h = price_change.get("h24", 0)
# Extract market cap and FDV
market_cap = pair.get("marketCap", 0)
fdv = pair.get("fdv", 0)
# Extract transaction counts
txns = pair.get("txns", {})
txns_24h = txns.get("h24", {})
buys_24h = txns_24h.get("buys", 0)
sells_24h = txns_24h.get("sells", 0)
# Build markdown content
markdown_content = f"# {base_symbol}/{quote_symbol} Trading Pair\n\n"
if token_name:
markdown_content += f"**Token:** {base_name} ({base_symbol})\n"
markdown_content += f"**Chain:** {chain_id}\n"
markdown_content += f"**DEX:** {dex_id}\n"
markdown_content += f"**Pair Address:** `{pair_address}`\n\n"
# Add price information
markdown_content += "## Price Information\n\n"
markdown_content += f"- **Price (USD):** ${price_usd}\n"
markdown_content += f"- **Price (Native):** {price_native} {quote_symbol}\n"
markdown_content += f"- **24h Change:** {price_change_24h:+.2f}%\n\n"
# Add liquidity information
markdown_content += "## Liquidity\n\n"
markdown_content += f"- **Total Liquidity:** ${liquidity_usd:,.2f}\n\n"
# Add volume information
markdown_content += "## Trading Volume\n\n"
markdown_content += f"- **24h Volume:** ${volume_24h:,.2f}\n"
markdown_content += f"- **6h Volume:** ${volume_6h:,.2f}\n"
markdown_content += f"- **1h Volume:** ${volume_1h:,.2f}\n\n"
# Add market metrics
markdown_content += "## Market Metrics\n\n"
markdown_content += f"- **Market Cap:** ${market_cap:,.2f}\n"
markdown_content += f"- **FDV (Fully Diluted Valuation):** ${fdv:,.2f}\n\n"
# Add transaction information
markdown_content += "## Transactions (24h)\n\n"
markdown_content += f"- **Buys:** {buys_24h}\n"
markdown_content += f"- **Sells:** {sells_24h}\n"
markdown_content += f"- **Total:** {buys_24h + sells_24h}\n\n"
# Add link to DexScreener
if url:
markdown_content += f"**View on DexScreener:** {url}\n\n"
return markdown_content
# Example usage (uncomment to use):
"""
import asyncio
async def main():
connector = DexScreenerConnector()
# Example: Fetch WETH pairs on Ethereum
chain = "ethereum"
address = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
pairs, error = await connector.get_token_pairs(chain, address)
if error:
print(f"Error: {error}")
else:
print(f"Found {len(pairs)} pairs for WETH")
# Format first pair to markdown
if pairs:
markdown = connector.format_pair_to_markdown(pairs[0], "Wrapped Ether")
print("\nSample Pair in Markdown:\n")
print(markdown)
if __name__ == "__main__":
asyncio.run(main())
"""

View file

@ -17,7 +17,6 @@ from sqlalchemy.future import select
from app.config import config
from app.db import SearchSourceConnector
from app.routes.discord_add_connector_route import refresh_discord_token
from app.schemas.discord_auth_credentials import DiscordAuthCredentialsBase
from app.utils.oauth_security import TokenEncryption
@ -177,6 +176,8 @@ class DiscordConnector(commands.Bot):
)
# Refresh token
# Lazy import to avoid circular dependency
from app.routes.discord_add_connector_route import refresh_discord_token
connector = await refresh_discord_token(self._session, connector)
# Reload credentials after refresh

View file

@ -85,14 +85,6 @@ class JiraHistoryConnector:
if is_oauth:
# OAuth 2.0 authentication
# Check if access_token exists before processing
raw_access_token = config_data.get("access_token")
if not raw_access_token:
raise ValueError(
"Jira access token not found. "
"Please reconnect your Jira account."
)
if not config.SECRET_KEY:
raise ValueError(
"SECRET_KEY not configured but tokens are marked as encrypted"
@ -126,16 +118,6 @@ class JiraHistoryConnector:
f"Failed to decrypt Jira credentials: {e!s}"
) from e
# Final validation after decryption
final_token = config_data.get("access_token")
if not final_token or (
isinstance(final_token, str) and not final_token.strip()
):
raise ValueError(
"Jira access token is invalid or empty. "
"Please reconnect your Jira account."
)
try:
self._credentials = AtlassianAuthCredentialsBase.from_dict(
config_data
@ -183,9 +165,9 @@ class JiraHistoryConnector:
f"Connector {self._connector_id} not found; cannot refresh token."
)
# Refresh token
# Lazy import to avoid circular dependency
from app.routes.jira_add_connector_route import refresh_jira_token
connector = await refresh_jira_token(self._session, connector)
# Reload credentials after refresh

View file

@ -1,12 +1,10 @@
import asyncio
import contextlib
import logging
from collections.abc import Awaitable, Callable
from typing import Any, TypeVar
from notion_client import AsyncClient
from notion_client.errors import APIResponseError
from notion_markdown import to_notion
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
@ -17,15 +15,6 @@ from app.utils.oauth_security import TokenEncryption
logger = logging.getLogger(__name__)
class NotionAPIError(Exception):
"""Raised when the Notion API returns a non-200 response.
The message is always user-presentable; callers should surface it directly
without any additional prefix or wrapping.
"""
# Type variable for generic return type
T = TypeVar("T")
@ -37,12 +26,6 @@ T = TypeVar("T")
MAX_RETRIES = 5
BASE_RETRY_DELAY = 1.0 # seconds
MAX_RETRY_DELAY = 60.0 # seconds (Notion's max request timeout)
MAX_RATE_LIMIT_WAIT_SECONDS = float(
getattr(config, "NOTION_MAX_RETRY_AFTER_SECONDS", 30.0)
)
MAX_TOTAL_RETRY_WAIT_SECONDS = float(
getattr(config, "NOTION_MAX_TOTAL_RETRY_WAIT_SECONDS", 120.0)
)
# Type alias for retry callback function
# Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) -> None
@ -229,8 +212,8 @@ class NotionHistoryConnector:
)
# Refresh token
# Lazy import to avoid circular dependency
from app.routes.notion_add_connector_route import refresh_notion_token
connector = await refresh_notion_token(self._session, connector)
# Reload credentials after refresh
@ -259,9 +242,8 @@ class NotionHistoryConnector:
logger.error(
f"Failed to refresh Notion token for connector {self._connector_id}: {e!s}"
)
raise NotionAPIError(
"Failed to refresh your Notion connection. "
"Please try again or reconnect your Notion account."
raise Exception(
f"Failed to refresh Notion OAuth credentials: {e!s}"
) from e
return self._credentials.access_token
@ -311,7 +293,6 @@ class NotionHistoryConnector:
"""
last_exception: APIResponseError | None = None
retry_delay = BASE_RETRY_DELAY
total_wait_time = 0.0
for attempt in range(MAX_RETRIES):
try:
@ -345,15 +326,6 @@ class NotionHistoryConnector:
wait_time = retry_delay
else:
wait_time = retry_delay
# Avoid very long worker sleeps from external Retry-After values.
if wait_time > MAX_RATE_LIMIT_WAIT_SECONDS:
logger.warning(
f"Notion Retry-After ({wait_time}s) exceeds cap "
f"({MAX_RATE_LIMIT_WAIT_SECONDS}s). Clamping wait time."
)
wait_time = MAX_RATE_LIMIT_WAIT_SECONDS
logger.warning(
f"Notion API rate limited (429). "
f"Waiting {wait_time}s. Attempt {attempt + 1}/{MAX_RETRIES}"
@ -377,14 +349,6 @@ class NotionHistoryConnector:
# Notify about retry via callback (for user notifications)
# Call before sleeping so user sees the message while we wait
if total_wait_time + wait_time > MAX_TOTAL_RETRY_WAIT_SECONDS:
logger.error(
"Notion API retry budget exceeded "
f"({total_wait_time + wait_time:.1f}s > "
f"{MAX_TOTAL_RETRY_WAIT_SECONDS:.1f}s). Failing fast."
)
raise
if on_retry:
try:
await on_retry(
@ -399,7 +363,6 @@ class NotionHistoryConnector:
# Wait before retrying
await asyncio.sleep(wait_time)
total_wait_time += wait_time
# Exponential backoff for next attempt
retry_delay = min(retry_delay * 2, MAX_RETRY_DELAY)
@ -452,16 +415,6 @@ class NotionHistoryConnector:
if page_title not in self._pages_with_skipped_content:
self._pages_with_skipped_content.append(page_title)
@staticmethod
def _api_error_message(error: APIResponseError) -> str:
"""Extract a stable, human-readable message from Notion API errors."""
body = getattr(error, "body", None)
if isinstance(body, dict):
return str(body.get("message", str(error)))
if body:
return str(body)
return str(error)
async def __aenter__(self):
"""Async context manager entry."""
return self
@ -800,282 +753,3 @@ class NotionHistoryConnector:
# Return empty string for unsupported block types
return ""
# =========================================================================
# WRITE OPERATIONS (create, update, delete pages)
# =========================================================================
async def _get_first_accessible_parent(self) -> str | None:
"""
Get the first accessible page ID that can be used as a parent.
Returns:
Page ID string, or None if no accessible pages found
"""
try:
notion = await self._get_client()
# Search for pages, get most recently edited first
response = await self._api_call_with_retry(
notion.search,
filter={"property": "object", "value": "page"},
sort={"direction": "descending", "timestamp": "last_edited_time"},
page_size=1, # We only need the first one
)
results = response.get("results", [])
if results:
return results[0]["id"]
return None
except Exception as e:
logger.error(f"Error finding accessible parent page: {e}")
return None
def _markdown_to_blocks(self, markdown: str) -> list[dict[str, Any]]:
"""Convert markdown content to Notion blocks using notion-markdown."""
return to_notion(markdown)
async def create_page(
self, title: str, content: str, parent_page_id: str | None = None
) -> dict[str, Any]:
"""
Create a new Notion page.
Args:
title: Page title
content: Page content (markdown format)
parent_page_id: Optional parent page ID (creates as subpage if provided)
Returns:
Dictionary with page details:
- page_id: Created page ID
- url: Page URL
- title: Page title
- status: "success" or "error"
- message: Success/error message
Raises:
APIResponseError: If Notion API returns an error
"""
try:
logger.info(
f"Creating Notion page: title='{title}', parent_page_id={parent_page_id}"
)
# Get Notion client
notion = await self._get_client()
# Convert markdown content to Notion blocks
children = self._markdown_to_blocks(content)
# Prepare parent - find first available page if not provided
if not parent_page_id:
logger.info(
"No parent_page_id provided, searching for first accessible page..."
)
parent_page_id = await self._get_first_accessible_parent()
if not parent_page_id:
logger.warning("No accessible parent pages found")
return {
"status": "error",
"message": "Could not find any accessible Notion pages to use as parent. "
"Please make sure your Notion integration has access to at least one page.",
}
logger.info(f"Using parent_page_id: {parent_page_id}")
parent = {"type": "page_id", "page_id": parent_page_id}
# Create the page with standard title property
properties = {
"title": {"title": [{"type": "text", "text": {"content": title}}]}
}
response = await self._api_call_with_retry(
notion.pages.create,
parent=parent,
properties=properties,
children=children[:100], # Notion API limit: 100 blocks per request
)
page_id = response["id"]
page_url = response["url"]
# If content has more than 100 blocks, append them
if len(children) > 100:
for i in range(100, len(children), 100):
batch = children[i : i + 100]
await self._api_call_with_retry(
notion.blocks.children.append, block_id=page_id, children=batch
)
return {
"status": "success",
"page_id": page_id,
"url": page_url,
"title": title,
"message": f"Created Notion page '{title}'",
}
except APIResponseError as e:
logger.error(f"Notion API error creating page: {e}")
error_msg = self._api_error_message(e)
return {
"status": "error",
"message": f"Failed to create Notion page: {error_msg}",
}
except Exception as e:
logger.error(f"Unexpected error creating Notion page: {e}")
return {
"status": "error",
"message": f"Failed to create Notion page: {e!s}",
}
async def update_page(
self, page_id: str, content: str | None = None
) -> dict[str, Any]:
"""
Update an existing Notion page by appending new content.
Note: Content is appended to the page, not replaced.
Args:
page_id: Page ID to update
content: New markdown content to append to the page (optional)
Returns:
Dictionary with update result
Raises:
APIResponseError: If Notion API returns an error
"""
try:
notion = await self._get_client()
appended_block_ids = []
if content:
# Convert new content to blocks
try:
children = self._markdown_to_blocks(content)
if not children:
logger.warning(
"No blocks generated from content, skipping append"
)
return {
"status": "error",
"message": "Content conversion failed: no valid blocks generated",
}
except Exception as e:
logger.error(f"Failed to convert markdown to blocks: {e}")
return {
"status": "error",
"message": f"Failed to parse content: {e!s}",
}
# Append new content blocks
try:
for i in range(0, len(children), 100):
batch = children[i : i + 100]
response = await self._api_call_with_retry(
notion.blocks.children.append,
block_id=page_id,
children=batch,
)
batch_block_ids = [
block["id"] for block in response.get("results", [])
]
appended_block_ids.extend(batch_block_ids)
logger.info(
f"Successfully appended {len(children)} new blocks to page {page_id}"
)
logger.debug(
f"Appended block IDs: {appended_block_ids[:5]}..."
if len(appended_block_ids) > 5
else f"Appended block IDs: {appended_block_ids}"
)
except Exception as e:
logger.error(f"Failed to append content blocks: {e}")
return {
"status": "error",
"message": f"Failed to append content: {e!s}",
}
# Get updated page info
response = await self._api_call_with_retry(
notion.pages.retrieve, page_id=page_id
)
page_url = response["url"]
page_title = response["properties"]["title"]["title"][0]["text"]["content"]
return {
"status": "success",
"page_id": page_id,
"url": page_url,
"title": page_title,
"appended_block_ids": appended_block_ids,
"message": f"Updated Notion page '{page_title}' (content appended)",
}
except APIResponseError as e:
logger.error(f"Notion API error updating page: {e}")
error_msg = self._api_error_message(e)
return {
"status": "error",
"message": f"Failed to update Notion page: {error_msg}",
}
except Exception as e:
logger.error(f"Unexpected error updating Notion page: {e}")
return {
"status": "error",
"message": f"Failed to update Notion page: {e!s}",
}
async def delete_page(self, page_id: str) -> dict[str, Any]:
"""
Delete (archive) a Notion page.
Note: Notion doesn't truly delete pages; it archives them.
Args:
page_id: Page ID to delete
Returns:
Dictionary with deletion result
Raises:
APIResponseError: If Notion API returns an error
"""
try:
notion = await self._get_client()
# Archive the page (Notion's way of "deleting")
response = await self._api_call_with_retry(
notion.pages.update, page_id=page_id, archived=True
)
page_title = "Unknown"
with contextlib.suppress(KeyError, IndexError):
page_title = response["properties"]["title"]["title"][0]["text"][
"content"
]
return {
"status": "success",
"page_id": page_id,
"message": f"Deleted Notion page '{page_title}'",
}
except APIResponseError as e:
logger.error(f"Notion API error deleting page: {e}")
error_msg = self._api_error_message(e)
return {
"status": "error",
"message": f"Failed to delete Notion page: {error_msg}",
}
except Exception as e:
logger.error(f"Unexpected error deleting Notion page: {e}")
return {
"status": "error",
"message": f"Failed to delete Notion page: {e!s}",
}
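
Both the create and update paths above slice the markdown-derived children into batches of 100 because the Notion API rejects larger `children` payloads. A minimal standalone sketch of that batching, with a hypothetical `append_children` callable standing in for `notion.blocks.children.append`:

```python
from collections.abc import Iterator
from typing import Any

NOTION_BLOCK_LIMIT = 100  # Notion caps children at 100 blocks per request


def batch_blocks(
    blocks: list[dict[str, Any]], size: int = NOTION_BLOCK_LIMIT
) -> Iterator[list[dict[str, Any]]]:
    """Yield the converted markdown blocks in API-sized slices."""
    for start in range(0, len(blocks), size):
        yield blocks[start : start + size]


async def append_all(append_children, page_id: str, blocks: list[dict[str, Any]]) -> None:
    # append_children is a hypothetical async callable mirroring
    # notion.blocks.children.append(block_id=..., children=...).
    for batch in batch_blocks(blocks):
        await append_children(block_id=page_id, children=batch)
```
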

View file

@@ -17,7 +17,6 @@ from sqlalchemy.future import select
from app.config import config
from app.db import SearchSourceConnector
from app.routes.slack_add_connector_route import refresh_slack_token
from app.schemas.slack_auth_credentials import SlackAuthCredentialsBase
from app.utils.oauth_security import TokenEncryption
@@ -155,6 +154,8 @@ class SlackHistory:
)
# Refresh token
# Lazy import to avoid circular dependency
from app.routes.slack_add_connector_route import refresh_slack_token
connector = await refresh_slack_token(self._session, connector)
# Reload credentials after refresh

View file

@@ -16,7 +16,6 @@ from sqlalchemy.future import select
from app.config import config
from app.db import SearchSourceConnector
from app.routes.teams_add_connector_route import refresh_teams_token
from app.schemas.teams_auth_credentials import TeamsAuthCredentialsBase
from app.utils.oauth_security import TokenEncryption
@@ -146,6 +145,8 @@ class TeamsConnector:
)
# Refresh token
# Lazy import to avoid circular dependency
from app.routes.teams_add_connector_route import refresh_teams_token
connector = await refresh_teams_token(self._session, connector)
# Reload credentials after refresh
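
Both the Slack and Teams hunks above move the `refresh_*_token` import from module scope into the call site, the standard way to break an import cycle between a connector module and the route module that imports it. A minimal sketch of the pattern (the wrapper function name is a placeholder; the module path is the one used in the Slack hunk):

```python
async def ensure_fresh_credentials(session, connector):
    # A module-level import of the route would make this module and the
    # route module require each other at import time. Importing inside the
    # function defers resolution until first call, after both modules have
    # finished loading.
    from app.routes.slack_add_connector_route import refresh_slack_token

    return await refresh_slack_token(session, connector)
```
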

View file

@@ -55,6 +55,7 @@ class DocumentType(StrEnum):
GOOGLE_DRIVE_FILE = "GOOGLE_DRIVE_FILE"
AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR"
LUMA_CONNECTOR = "LUMA_CONNECTOR"
DEXSCREENER_CONNECTOR = "DEXSCREENER_CONNECTOR"
ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR"
BOOKSTACK_CONNECTOR = "BOOKSTACK_CONNECTOR"
CIRCLEBACK = "CIRCLEBACK"
@@ -98,6 +99,7 @@ class SearchSourceConnectorType(StrEnum):
GOOGLE_DRIVE_CONNECTOR = "GOOGLE_DRIVE_CONNECTOR"
AIRTABLE_CONNECTOR = "AIRTABLE_CONNECTOR"
LUMA_CONNECTOR = "LUMA_CONNECTOR"
DEXSCREENER_CONNECTOR = "DEXSCREENER_CONNECTOR"
ELASTICSEARCH_CONNECTOR = "ELASTICSEARCH_CONNECTOR"
WEBCRAWLER_CONNECTOR = "WEBCRAWLER_CONNECTOR"
BOOKSTACK_CONNECTOR = "BOOKSTACK_CONNECTOR"
@@ -2120,6 +2122,7 @@ engine = create_async_engine(
pool_recycle=1800,
pool_pre_ping=True,
pool_timeout=30,
connect_args={"ssl": False}, # Disable SSL for local development
)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)

View file

@@ -30,6 +30,7 @@ from .jira_add_connector_route import router as jira_add_connector_router
from .linear_add_connector_route import router as linear_add_connector_router
from .logs_routes import router as logs_router
from .luma_add_connector_route import router as luma_add_connector_router
from .dexscreener_add_connector_route import router as dexscreener_add_connector_router
from .memory_routes import router as memory_router
from .model_list_routes import router as model_list_router
from .new_chat_routes import router as new_chat_router
@@ -80,6 +81,7 @@ router.include_router(google_drive_add_connector_router)
router.include_router(airtable_add_connector_router)
router.include_router(linear_add_connector_router)
router.include_router(luma_add_connector_router)
router.include_router(dexscreener_add_connector_router)
router.include_router(notion_add_connector_router)
router.include_router(slack_add_connector_router)
router.include_router(teams_add_connector_router)

View file

@@ -1,5 +1,7 @@
import base64
import hashlib
import logging
import secrets
from datetime import UTC, datetime, timedelta
from uuid import UUID
@@ -20,15 +22,12 @@ from app.db import (
)
from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase
from app.users import current_active_user
from app.utils.airtable_token_utils import refresh_airtable_token
from app.utils.connector_naming import (
check_duplicate_connector,
generate_unique_connector_name,
)
from app.utils.oauth_security import (
OAuthStateManager,
TokenEncryption,
generate_pkce_pair,
)
from app.utils.oauth_security import OAuthStateManager, TokenEncryption
logger = logging.getLogger(__name__)
@@ -77,6 +76,28 @@ def make_basic_auth_header(client_id: str, client_secret: str) -> str:
return f"Basic {b64}"
def generate_pkce_pair() -> tuple[str, str]:
"""
Generate PKCE code verifier and code challenge.
Returns:
Tuple of (code_verifier, code_challenge)
"""
# Generate code verifier (43-128 characters)
code_verifier = (
base64.urlsafe_b64encode(secrets.token_bytes(32)).decode("utf-8").rstrip("=")
)
# Generate code challenge (SHA256 hash of verifier, base64url encoded)
code_challenge = (
base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("utf-8")).digest())
.decode("utf-8")
.rstrip("=")
)
return code_verifier, code_challenge
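
The verifier/challenge pair produced above is used in the usual S256 PKCE handshake: the challenge goes out with the authorization request, the verifier is sent back during the token exchange. A quick sanity check of the relationship, assuming the `generate_pkce_pair` defined above is in scope:

```python
import base64
import hashlib

verifier, challenge = generate_pkce_pair()

# S256: the challenge is the unpadded base64url SHA-256 digest of the verifier.
expected = (
    base64.urlsafe_b64encode(hashlib.sha256(verifier.encode("utf-8")).digest())
    .decode("utf-8")
    .rstrip("=")
)
assert challenge == expected

# In the OAuth flow, `challenge` is sent as code_challenge (with
# code_challenge_method=S256) and `verifier` later as code_verifier
# in the token exchange.
```
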
@router.get("/auth/airtable/connector/add")
async def connect_airtable(space_id: int, user: User = Depends(current_active_user)):
"""
@@ -179,7 +200,7 @@ async def airtable_callback(
# Redirect to frontend with error parameter
if space_id:
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?error=airtable_oauth_denied"
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&error=airtable_oauth_denied"
)
else:
return RedirectResponse(
@@ -296,7 +317,7 @@ async def airtable_callback(
f"Duplicate Airtable connector detected for user {user_id} with email {user_email}"
)
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?error=duplicate_account&connector=airtable-connector"
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&error=duplicate_account&connector=airtable-connector"
)
# Generate a unique, user-friendly connector name
@@ -328,7 +349,7 @@ async def airtable_callback(
# Redirect to the frontend with success params for indexing config
# Using query params to auto-open the popup with config view on new-chat page
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?success=true&connector=airtable-connector&connectorId={new_connector.id}"
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector=airtable-connector&connectorId={new_connector.id}"
)
except ValidationError as e:
@@ -358,134 +379,3 @@ async def airtable_callback(
status_code=500, detail=f"Failed to complete Airtable OAuth: {e!s}"
) from e
async def refresh_airtable_token(
session: AsyncSession, connector: SearchSourceConnector
) -> SearchSourceConnector:
"""
Refresh the Airtable access token for a connector.
Args:
session: Database session
connector: Airtable connector to refresh
Returns:
Updated connector object
"""
try:
logger.info(f"Refreshing Airtable token for connector {connector.id}")
credentials = AirtableAuthCredentialsBase.from_dict(connector.config)
# Decrypt tokens if they are encrypted
token_encryption = get_token_encryption()
is_encrypted = connector.config.get("_token_encrypted", False)
refresh_token = credentials.refresh_token
if is_encrypted and refresh_token:
try:
refresh_token = token_encryption.decrypt_token(refresh_token)
except Exception as e:
logger.error(f"Failed to decrypt refresh token: {e!s}")
raise HTTPException(
status_code=500, detail="Failed to decrypt stored refresh token"
) from e
if not refresh_token:
raise HTTPException(
status_code=400,
detail="No refresh token available. Please re-authenticate.",
)
auth_header = make_basic_auth_header(
config.AIRTABLE_CLIENT_ID, config.AIRTABLE_CLIENT_SECRET
)
# Prepare token refresh data
refresh_data = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": config.AIRTABLE_CLIENT_ID,
"client_secret": config.AIRTABLE_CLIENT_SECRET,
}
async with httpx.AsyncClient() as client:
token_response = await client.post(
TOKEN_URL,
data=refresh_data,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_header,
},
timeout=30.0,
)
if token_response.status_code != 200:
error_detail = token_response.text
error_code = ""
try:
error_json = token_response.json()
error_detail = error_json.get("error_description", error_detail)
error_code = error_json.get("error", "")
except Exception:
pass
# Check if this is a token expiration/revocation error
error_lower = (error_detail + error_code).lower()
if (
"invalid_grant" in error_lower
or "expired" in error_lower
or "revoked" in error_lower
):
raise HTTPException(
status_code=401,
detail="Airtable authentication failed. Please re-authenticate.",
)
raise HTTPException(
status_code=400, detail=f"Token refresh failed: {error_detail}"
)
token_json = token_response.json()
# Calculate expiration time (UTC, tz-aware)
expires_at = None
if token_json.get("expires_in"):
now_utc = datetime.now(UTC)
expires_at = now_utc + timedelta(seconds=int(token_json["expires_in"]))
# Encrypt new tokens before storing
access_token = token_json.get("access_token")
new_refresh_token = token_json.get("refresh_token")
if not access_token:
raise HTTPException(
status_code=400, detail="No access token received from Airtable refresh"
)
# Update credentials object with encrypted tokens
credentials.access_token = token_encryption.encrypt_token(access_token)
if new_refresh_token:
credentials.refresh_token = token_encryption.encrypt_token(
new_refresh_token
)
credentials.expires_in = token_json.get("expires_in")
credentials.expires_at = expires_at
credentials.scope = token_json.get("scope")
# Update connector config with encrypted tokens
credentials_dict = credentials.to_dict()
credentials_dict["_token_encrypted"] = True
connector.config = credentials_dict
await session.commit()
await session.refresh(connector)
logger.info(
f"Successfully refreshed Airtable token for connector {connector.id}"
)
return connector
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to refresh Airtable token: {e!s}"
) from e

View file

@@ -0,0 +1,312 @@
import logging
import re
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field, field_validator
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.db import (
SearchSourceConnector,
SearchSourceConnectorType,
User,
get_async_session,
)
from app.users import current_active_user
logger = logging.getLogger(__name__)
router = APIRouter()
class TokenConfig(BaseModel):
"""Configuration for a single token to track."""
chain: str = Field(..., description="Blockchain network (e.g., ethereum, bsc, solana)", pattern=r"^[a-z0-9-]+$")
address: str = Field(..., description="Token contract address")
name: str | None = Field(None, description="Optional token name for display")
@field_validator("address")
@classmethod
def validate_address(cls, v: str) -> str:
"""Validate token address format (EVM or Solana)."""
# EVM address: 0x + 40 hex characters
if v.startswith("0x"):
if not re.match(r"^0x[a-fA-F0-9]{40}$", v):
raise ValueError("Invalid EVM address format. Must be 0x followed by 40 hex characters.")
return v
# Solana address: 32-44 base58 characters
if len(v) < 32 or len(v) > 44:
raise ValueError("Invalid Solana address format. Must be 32-44 characters.")
# Allow base58 chars only for Solana
if not re.match(r"^[1-9A-HJ-NP-Za-km-z]{32,44}$", v):
raise ValueError("Invalid Solana address format. Contains invalid characters.")
return v
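
A quick illustration of how this validator treats EVM-style versus malformed addresses (a sketch assuming `TokenConfig` from this module is importable):

```python
from pydantic import ValidationError

# Accepted: "0x" followed by exactly 40 hex characters.
ok = TokenConfig(chain="ethereum", address="0x" + "ab" * 20, name="Example Token")
print(ok.address)

# Rejected: too short for the EVM branch of the validator.
try:
    TokenConfig(chain="ethereum", address="0x1234")
except ValidationError as exc:
    print(exc.errors()[0]["msg"])
```
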
class AddDexScreenerConnectorRequest(BaseModel):
"""Request model for adding a DexScreener connector."""
tokens: list[TokenConfig] = Field(
..., description="List of tokens to track (max 50)", min_length=1, max_length=50
)
space_id: int = Field(..., description="Search space ID")
@router.post("/connectors/dexscreener/add")
async def add_dexscreener_connector(
request: AddDexScreenerConnectorRequest,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
):
"""
Add a new DexScreener connector for the authenticated user.
Args:
request: The request containing tokens configuration and space_id
user: Current authenticated user
session: Database session
Returns:
Success message and connector details
Raises:
HTTPException: If connector already exists or validation fails
"""
try:
# Check if a DexScreener connector already exists for this search space and user
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == request.space_id,
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.DEXSCREENER_CONNECTOR,
)
)
existing_connector = result.scalars().first()
# Convert tokens to dict format for storage
tokens_config = [token.model_dump() for token in request.tokens]
if existing_connector:
# Update existing connector with new tokens
existing_connector.config = {"tokens": tokens_config}
existing_connector.is_indexable = True
await session.commit()
await session.refresh(existing_connector)
logger.info(
f"Updated existing DexScreener connector for user {user.id} in space {request.space_id}"
)
return {
"message": "DexScreener connector updated successfully",
"connector_id": existing_connector.id,
"connector_type": "DEXSCREENER_CONNECTOR",
"tokens_count": len(tokens_config),
}
# Create new DexScreener connector
db_connector = SearchSourceConnector(
name="DexScreener Connector",
connector_type=SearchSourceConnectorType.DEXSCREENER_CONNECTOR,
config={"tokens": tokens_config},
search_space_id=request.space_id,
user_id=user.id,
is_indexable=True,
)
session.add(db_connector)
await session.commit()
await session.refresh(db_connector)
logger.info(
f"Successfully created DexScreener connector for user {user.id} with ID {db_connector.id}"
)
return {
"message": "DexScreener connector added successfully",
"connector_id": db_connector.id,
"connector_type": "DEXSCREENER_CONNECTOR",
"tokens_count": len(tokens_config),
}
except IntegrityError as e:
await session.rollback()
logger.error(f"Database integrity error: {e!s}")
raise HTTPException(
status_code=409,
detail="A DexScreener connector already exists for this user.",
) from e
except Exception as e:
await session.rollback()
logger.error(f"Unexpected error adding DexScreener connector: {e!s}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Failed to add DexScreener connector: {e!s}",
) from e
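
A hedged sketch of what a client call to this endpoint could look like; the base URL and auth header are deployment-specific placeholders, and only the request body shape comes from the models above:

```python
import httpx

payload = {
    "space_id": 1,
    "tokens": [
        {
            "chain": "solana",
            "address": "So11111111111111111111111111111111111111112",  # wrapped SOL mint
            "name": "Wrapped SOL",
        }
    ],
}

resp = httpx.post(
    "http://localhost:8000/connectors/dexscreener/add",
    json=payload,
    headers={"Authorization": "Bearer <access token>"},
    timeout=30.0,
)
print(resp.json())  # {"message": ..., "connector_id": ..., "tokens_count": 1}
```
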
@router.delete("/connectors/dexscreener")
async def delete_dexscreener_connector(
space_id: int,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
):
"""
Delete the DexScreener connector for the authenticated user in a specific search space.
Args:
space_id: Search space ID
user: Current authenticated user
session: Database session
Returns:
Success message
Raises:
HTTPException: If connector doesn't exist
"""
try:
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.DEXSCREENER_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
raise HTTPException(
status_code=404,
detail="DexScreener connector not found for this user.",
)
await session.delete(connector)
await session.commit()
logger.info(f"Successfully deleted DexScreener connector for user {user.id}")
return {"message": "DexScreener connector deleted successfully"}
except HTTPException:
raise
except Exception as e:
await session.rollback()
logger.error(f"Unexpected error deleting DexScreener connector: {e!s}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Failed to delete DexScreener connector: {e!s}",
) from e
@router.get("/connectors/dexscreener/test")
async def test_dexscreener_connector(
space_id: int,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
):
"""
Test the DexScreener connector for the authenticated user in a specific search space.
Args:
space_id: Search space ID
user: Current authenticated user
session: Database session
Returns:
Test results including token count and sample pair data
Raises:
HTTPException: If connector doesn't exist or test fails
"""
try:
# Get the DexScreener connector for this search space and user
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.DEXSCREENER_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
raise HTTPException(
status_code=404,
detail="DexScreener connector not found. Please add a connector first.",
)
# Import DexScreenerConnector
from app.connectors.dexscreener_connector import DexScreenerConnector
# Initialize the connector
tokens = connector.config.get("tokens", [])
if not tokens:
raise HTTPException(
status_code=400,
detail="Invalid connector configuration: No tokens configured.",
)
dexscreener = DexScreenerConnector()
# Test the connection by fetching pairs for the first token
first_token = tokens[0]
chain = first_token.get("chain")
address = first_token.get("address")
token_name = first_token.get("name", "Unknown")
if not chain or not address:
raise HTTPException(
status_code=400,
detail="Invalid token configuration: Missing chain or address.",
)
# Try to fetch pairs for the first token
pairs, error = await dexscreener.get_token_pairs(chain, address)
if error:
raise HTTPException(
status_code=400,
detail=f"Failed to connect to DexScreener: {error}",
)
# Get sample pair info if available
sample_pair = None
if pairs and len(pairs) > 0:
pair = pairs[0]
base_token = pair.get("baseToken", {})
quote_token = pair.get("quoteToken", {})
sample_pair = {
"pair_address": pair.get("pairAddress"),
"base_symbol": base_token.get("symbol", "Unknown"),
"quote_symbol": quote_token.get("symbol", "Unknown"),
"dex": pair.get("dexId", "Unknown"),
"price_usd": pair.get("priceUsd", "N/A"),
"liquidity_usd": pair.get("liquidity", {}).get("usd", 0),
}
return {
"message": "DexScreener connector is working correctly",
"tokens_configured": len(tokens),
"test_token": {
"name": token_name,
"chain": chain,
"address": address,
},
"pairs_found": len(pairs) if pairs else 0,
"sample_pair": sample_pair,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Unexpected error testing DexScreener connector: {e!s}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Failed to test DexScreener connector: {e!s}",
) from e
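
A matching call to the test endpoint (same placeholder base URL and auth as the add example above); note that `space_id` is a query parameter here rather than part of a JSON body:

```python
import httpx

resp = httpx.get(
    "http://localhost:8000/connectors/dexscreener/test",
    params={"space_id": 1},
    headers={"Authorization": "Bearer <access token>"},
    timeout=30.0,
)
data = resp.json()
print(data["pairs_found"], data["sample_pair"])
```
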

View file

@@ -1106,6 +1106,17 @@ async def index_connector_content(
)
response_message = "Luma indexing started in the background."
elif connector.connector_type == SearchSourceConnectorType.DEXSCREENER_CONNECTOR:
from app.tasks.celery_tasks.connector_tasks import index_dexscreener_pairs_task
logger.info(
f"Triggering DexScreener indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}"
)
index_dexscreener_pairs_task.delay(
connector_id, search_space_id, str(user.id), indexing_from, indexing_to
)
response_message = "DexScreener indexing started in the background."
elif (
connector.connector_type
== SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR

View file

@@ -2758,6 +2758,135 @@ class ConnectorService:
return result_object, obsidian_docs
async def search_dexscreener(
self,
user_query: str,
search_space_id: int,
top_k: int = 20,
start_date: datetime | None = None,
end_date: datetime | None = None,
) -> tuple:
"""
Search for DexScreener cryptocurrency trading pair data and return both the source information and langchain documents.
Uses combined chunk-level and document-level hybrid search with RRF fusion.
Args:
user_query: The user's query
search_space_id: The search space ID to search in
top_k: Maximum number of results to return
start_date: Optional start date for filtering documents by updated_at
end_date: Optional end date for filtering documents by updated_at
Returns:
tuple: (sources_info, langchain_documents)
"""
dexscreener_docs = await self._combined_rrf_search(
query_text=user_query,
search_space_id=search_space_id,
document_type="DEXSCREENER_CONNECTOR",
top_k=top_k,
start_date=start_date,
end_date=end_date,
)
# Early return if no results
if not dexscreener_docs:
return {
"id": 54,
"name": "DexScreener",
"type": "DEXSCREENER_CONNECTOR",
"sources": [],
}, []
def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str:
# Extract token and chain info from metadata
base_token = metadata.get("base_symbol", "")
quote_token = metadata.get("quote_symbol", "")
chain = metadata.get("chain_id", "")
dex = metadata.get("dex", "")
if base_token and quote_token:
title = f"{base_token}/{quote_token}"
if chain:
title += f" on {chain.capitalize()}"
if dex:
title += f" ({dex})"
return title
return doc_info.get("title", "DexScreener Trading Pair")
def _url_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str:
# DexScreener URL format: https://dexscreener.com/{chain}/{pair_address}
chain = metadata.get("chain_id", "")
pair_address = metadata.get("pair_address", "")
if chain and pair_address:
return f"https://dexscreener.com/{chain}/{pair_address}"
return ""
def _description_fn(
chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any]
) -> str:
# Build a rich description with price and volume info
description_parts = []
price_usd = metadata.get("price_usd")
if price_usd:
description_parts.append(f"Price: ${price_usd}")
volume_24h = metadata.get("volume_24h")
if volume_24h:
description_parts.append(f"24h Volume: ${volume_24h}")
price_change_24h = metadata.get("price_change_24h")
if price_change_24h is not None:
sign = "+" if price_change_24h > 0 else ""
description_parts.append(f"24h Change: {sign}{price_change_24h}%")
liquidity_usd = metadata.get("liquidity_usd")
if liquidity_usd:
description_parts.append(f"Liquidity: ${liquidity_usd}")
if description_parts:
return " | ".join(description_parts)
# Fallback to chunk content preview
return self._chunk_preview(chunk.get("content", ""), limit=200)
def _extra_fields_fn(
_chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any]
) -> dict[str, Any]:
return {
"chain_id": metadata.get("chain_id", ""),
"dex": metadata.get("dex", ""),
"pair_address": metadata.get("pair_address", ""),
"base_symbol": metadata.get("base_symbol", ""),
"quote_symbol": metadata.get("quote_symbol", ""),
"price_usd": metadata.get("price_usd"),
"volume_24h": metadata.get("volume_24h"),
"price_change_24h": metadata.get("price_change_24h"),
"liquidity_usd": metadata.get("liquidity_usd"),
}
sources_list = self._build_chunk_sources_from_documents(
dexscreener_docs,
title_fn=_title_fn,
url_fn=_url_fn,
description_fn=_description_fn,
extra_fields_fn=_extra_fields_fn,
)
# Create result object
result_object = {
"id": 54,
"name": "DexScreener",
"type": "DEXSCREENER_CONNECTOR",
"sources": sources_list,
}
return result_object, dexscreener_docs
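
A tiny sketch of how the metadata stored by the indexer maps to the title, URL, and description the helpers above produce (standalone, with the helper logic restated rather than imported; the metadata values are illustrative):

```python
metadata = {
    "base_symbol": "WETH",
    "quote_symbol": "USDC",
    "chain_id": "ethereum",
    "dex": "uniswap",
    "pair_address": "0xabc123",  # placeholder pair address
    "price_usd": "2500.12",
    "price_change_24h": -1.5,
}

title = (
    f"{metadata['base_symbol']}/{metadata['quote_symbol']}"
    f" on {metadata['chain_id'].capitalize()} ({metadata['dex']})"
)
url = f"https://dexscreener.com/{metadata['chain_id']}/{metadata['pair_address']}"
sign = "+" if metadata["price_change_24h"] > 0 else ""
description = f"Price: ${metadata['price_usd']} | 24h Change: {sign}{metadata['price_change_24h']}%"

print(title)        # WETH/USDC on Ethereum (uniswap)
print(url)          # https://dexscreener.com/ethereum/0xabc123
print(description)  # Price: $2500.12 | 24h Change: -1.5%
```
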
# =========================================================================
# Utility Methods for Connector Discovery
# =========================================================================

View file

@@ -968,3 +968,47 @@ async def _index_composio_connector(
await run_composio_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
@celery_app.task(name="index_dexscreener_pairs", bind=True)
def index_dexscreener_pairs_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Celery task to index DexScreener trading pairs."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_dexscreener_pairs(
connector_id, search_space_id, user_id, start_date, end_date
)
)
finally:
loop.close()
async def _index_dexscreener_pairs(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""Index DexScreener pairs with new session."""
from app.tasks.connector_indexers.dexscreener_indexer import (
index_dexscreener_pairs,
)
async with get_celery_session_maker()() as session:
await index_dexscreener_pairs(
session, connector_id, search_space_id, user_id, start_date, end_date
)

View file

@@ -57,6 +57,7 @@ async def _check_and_trigger_schedules():
index_clickup_tasks_task,
index_confluence_pages_task,
index_crawled_urls_task,
index_dexscreener_pairs_task,
index_discord_messages_task,
index_elasticsearch_documents_task,
index_github_repos_task,
@@ -84,6 +85,7 @@ async def _check_and_trigger_schedules():
SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR: index_google_gmail_messages_task,
SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task,
SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task,
SearchSourceConnectorType.DEXSCREENER_CONNECTOR: index_dexscreener_pairs_task,
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_crawled_urls_task,
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR: index_google_drive_files_task,

View file

@@ -42,9 +42,10 @@ from .jira_indexer import index_jira_issues
# Issue tracking and project management
from .linear_indexer import index_linear_issues
from .luma_indexer import index_luma_events
from .dexscreener_indexer import index_dexscreener_pairs
# Documentation and knowledge management
from .luma_indexer import index_luma_events
from .notion_indexer import index_notion_pages
from .obsidian_indexer import index_obsidian_vault
from .slack_indexer import index_slack_messages
@@ -74,4 +75,5 @@ __all__ = [ # noqa: RUF022
# Communication platforms
"index_slack_messages",
"index_google_gmail_messages",
"index_dexscreener_pairs",
]

View file

@@ -0,0 +1,417 @@
"""
DexScreener connector indexer.
"""
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.connectors.dexscreener_connector import DexScreenerConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
update_connector_last_indexed,
)
async def index_dexscreener_pairs(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None = None,
end_date: str | None = None,
update_last_indexed: bool = True,
) -> tuple[int, str | None]:
"""
Index DexScreener trading pairs.
Args:
session: Database session
connector_id: ID of the DexScreener connector
search_space_id: ID of the search space to store documents in
user_id: User ID
start_date: Not used for DexScreener (included for consistency with other indexers)
end_date: Not used for DexScreener (included for consistency with other indexers)
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
# Log task start
log_entry = await task_logger.log_task_start(
task_name="dexscreener_pairs_indexing",
source="connector_indexing_task",
message=f"Starting DexScreener pairs indexing for connector {connector_id}",
metadata={
"connector_id": connector_id,
"user_id": str(user_id),
},
)
try:
# Get the connector
await task_logger.log_task_progress(
log_entry,
f"Retrieving DexScreener connector {connector_id} from database",
{"stage": "connector_retrieval"},
)
# Get the connector from the database
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.DEXSCREENER_CONNECTOR
)
if not connector:
await task_logger.log_task_failure(
log_entry,
f"Connector with ID {connector_id} not found or is not a DexScreener connector",
"Connector not found",
{"error_type": "ConnectorNotFound"},
)
return (
0,
f"Connector with ID {connector_id} not found or is not a DexScreener connector",
)
# Get the tokens list from the connector config
tokens = connector.config.get("tokens", [])
if not tokens:
await task_logger.log_task_failure(
log_entry,
f"No tokens configured for connector {connector_id}",
"Missing token configuration",
{"error_type": "MissingConfiguration"},
)
return 0, "No tokens configured for connector"
logger.info(f"Starting DexScreener indexing for connector {connector_id} with {len(tokens)} tokens")
# Initialize DexScreener client
await task_logger.log_task_progress(
log_entry,
f"Initializing DexScreener client for connector {connector_id}",
{"stage": "client_initialization"},
)
dexscreener_client = DexScreenerConnector()
documents_indexed = 0
documents_skipped = 0
skipped_pairs = []
batch_size = 10 # Commit every 10 documents for performance
# Process each tracked token
for token_idx, token in enumerate(tokens):
try:
chain = token.get("chain")
address = token.get("address")
token_name = token.get("name", "")
if not chain or not address:
logger.warning(f"Skipping token with missing chain or address: {token}")
continue
await task_logger.log_task_progress(
log_entry,
f"Fetching pairs for {token_name or address} on {chain} ({token_idx + 1}/{len(tokens)})",
{
"stage": "fetching_pairs",
"token": token_name or address,
"chain": chain,
"progress": f"{token_idx + 1}/{len(tokens)}",
},
)
# Get trading pairs for this token
pairs, error = await dexscreener_client.get_token_pairs(chain, address)
if error:
logger.warning(f"Error fetching pairs for {chain}/{address}: {error}")
skipped_pairs.append(f"{token_name or address} ({error})")
continue
if not pairs:
logger.info(f"No pairs found for {chain}/{address}")
continue
logger.info(f"Retrieved {len(pairs)} pairs for {token_name or address} on {chain}")
# Process each pair
for pair in pairs:
try:
pair_address = pair.get("pairAddress")
if not pair_address:
logger.warning(f"Skipping pair with missing pairAddress")
documents_skipped += 1
continue
# Format pair to markdown
pair_markdown = dexscreener_client.format_pair_to_markdown(pair, token_name)
if not pair_markdown.strip():
logger.warning(f"Skipping pair with no content: {pair_address}")
documents_skipped += 1
continue
# Extract pair metadata
base_token = pair.get("baseToken", {})
quote_token = pair.get("quoteToken", {})
base_symbol = base_token.get("symbol", "Unknown")
quote_symbol = quote_token.get("symbol", "Unknown")
dex_id = pair.get("dexId", "Unknown")
price_usd = pair.get("priceUsd", "N/A")
liquidity_usd = pair.get("liquidity", {}).get("usd", 0)
volume_24h = pair.get("volume", {}).get("h24", 0)
price_change_24h = pair.get("priceChange", {}).get("h24", 0)
# Generate unique identifier hash for this pair
# Use chain + pair_address as unique identifier
unique_id = f"{chain}:{pair_address}"
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.DEXSCREENER_CONNECTOR, unique_id, search_space_id
)
# Generate content hash
content_hash = generate_content_hash(pair_markdown, search_space_id)
# Check if document with this unique identifier already exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for pair {base_symbol}/{quote_symbol} unchanged. Skipping."
)
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for pair {base_symbol}/{quote_symbol}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"pair_address": pair_address,
"chain_id": chain,
"dex": dex_id,
"base_symbol": base_symbol,
"quote_symbol": quote_symbol,
"price_usd": price_usd,
"liquidity_usd": liquidity_usd,
"volume_24h": volume_24h,
"price_change_24h": price_change_24h,
"document_type": "DexScreener Trading Pair",
"connector_type": "DexScreener",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
pair_markdown, user_llm, document_metadata
)
else:
summary_content = f"DexScreener Pair: {base_symbol}/{quote_symbol}\n\n"
summary_content += f"Chain: {chain}\n"
summary_content += f"DEX: {dex_id}\n"
summary_content += f"Pair Address: {pair_address}\n"
summary_content += f"Price (USD): ${price_usd}\n"
summary_content += f"Liquidity: ${liquidity_usd:,.2f}\n"
summary_content += f"24h Volume: ${volume_24h:,.2f}\n"
summary_content += f"24h Change: {price_change_24h:+.2f}%\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(pair_markdown)
# Update existing document
existing_document.title = f"DexScreener - {base_symbol}/{quote_symbol} on {chain}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"pair_address": pair_address,
"chain_id": chain,
"dex": dex_id,
"base_symbol": base_symbol,
"quote_symbol": quote_symbol,
"price_usd": price_usd,
"liquidity_usd": liquidity_usd,
"volume_24h": volume_24h,
"price_change_24h": price_change_24h,
"token_name": token_name,
"token_address": address,
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
logger.info(f"Updated document for pair {base_symbol}/{quote_symbol}")
else:
# New document - create it
logger.info(f"Creating new document for pair {base_symbol}/{quote_symbol}")
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"pair_address": pair_address,
"chain_id": chain,
"dex": dex_id,
"base_symbol": base_symbol,
"quote_symbol": quote_symbol,
"price_usd": price_usd,
"liquidity_usd": liquidity_usd,
"volume_24h": volume_24h,
"price_change_24h": price_change_24h,
"document_type": "DexScreener Trading Pair",
"connector_type": "DexScreener",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
pair_markdown, user_llm, document_metadata
)
else:
summary_content = f"DexScreener Pair: {base_symbol}/{quote_symbol}\n\n"
summary_content += f"Chain: {chain}\n"
summary_content += f"DEX: {dex_id}\n"
summary_content += f"Pair Address: {pair_address}\n"
summary_content += f"Price (USD): ${price_usd}\n"
summary_content += f"Liquidity: ${liquidity_usd:,.2f}\n"
summary_content += f"24h Volume: ${volume_24h:,.2f}\n"
summary_content += f"24h Change: {price_change_24h:+.2f}%\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(pair_markdown)
# Create new document
new_document = Document(
title=f"DexScreener - {base_symbol}/{quote_symbol} on {chain}",
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
document_type=DocumentType.DEXSCREENER_CONNECTOR,
document_metadata={
"pair_address": pair_address,
"chain_id": chain,
"dex": dex_id,
"base_symbol": base_symbol,
"quote_symbol": quote_symbol,
"price_usd": price_usd,
"liquidity_usd": liquidity_usd,
"volume_24h": volume_24h,
"price_change_24h": price_change_24h,
"token_name": token_name,
"token_address": address,
},
chunks=chunks,
search_space_id=search_space_id,
created_at=get_current_timestamp(),
updated_at=get_current_timestamp(),
)
session.add(new_document)
documents_indexed += 1
logger.info(f"Created new document for pair {base_symbol}/{quote_symbol}")
# Batch commit every N documents
if documents_indexed % batch_size == 0:
await session.commit()
logger.info(f"Committed batch of {batch_size} documents")
except Exception as e:
logger.error(f"Error processing pair {pair.get('pairAddress', 'unknown')}: {e!s}", exc_info=True)
documents_skipped += 1
continue
except Exception as e:
logger.error(f"Error processing token {token.get('name', token.get('address', 'unknown'))}: {e!s}", exc_info=True)
continue
# Final commit for any remaining documents
if documents_indexed % batch_size != 0:
await session.commit()
logger.info(f"Committed final batch of documents")
# Update last_indexed_at timestamp
if update_last_indexed:
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
# Log task completion
await task_logger.log_task_success(
log_entry,
f"Successfully indexed {documents_indexed} DexScreener pairs",
{
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"tokens_processed": len(tokens),
},
)
logger.info(
f"DexScreener indexing completed: {documents_indexed} documents indexed, {documents_skipped} skipped"
)
return documents_indexed, None
except SQLAlchemyError as e:
await session.rollback()
logger.error(f"Database error during DexScreener indexing: {e!s}", exc_info=True)
await task_logger.log_task_failure(
log_entry,
f"Database error: {e!s}",
"Database Error",
{"error_type": "DatabaseError"},
)
return 0, f"Database error: {e!s}"
except Exception as e:
await session.rollback()
logger.error(f"Unexpected error during DexScreener indexing: {e!s}", exc_info=True)
await task_logger.log_task_failure(
log_entry,
f"Unexpected error: {e!s}",
"Unexpected Error",
{"error_type": "UnexpectedError"},
)
return 0, f"Unexpected error: {e!s}"

View file

@@ -0,0 +1,157 @@
"""
Airtable token refresh utilities.
This module contains shared utilities for refreshing Airtable OAuth tokens.
Extracted from routes to avoid circular imports.
"""
import base64
import logging
from datetime import UTC, datetime, timedelta
import httpx
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.db import SearchSourceConnector
from app.schemas.airtable_auth_credentials import AirtableAuthCredentialsBase
from app.utils.oauth_security import TokenEncryption
logger = logging.getLogger(__name__)
# Airtable OAuth token endpoint
TOKEN_URL = "https://airtable.com/oauth2/v1/token"
def make_basic_auth_header(client_id: str, client_secret: str) -> str:
"""Create HTTP Basic authentication header."""
credentials = f"{client_id}:{client_secret}".encode()
b64 = base64.b64encode(credentials).decode("ascii")
return f"Basic {b64}"
def get_token_encryption() -> TokenEncryption:
"""Get or create token encryption instance."""
if not config.SECRET_KEY:
raise ValueError("SECRET_KEY must be set for token encryption")
return TokenEncryption(config.SECRET_KEY)
async def refresh_airtable_token(
session: AsyncSession, connector: SearchSourceConnector
) -> SearchSourceConnector:
"""
Refresh the Airtable access token for a connector.
Args:
session: Database session
connector: Airtable connector to refresh
Returns:
Updated connector object
"""
try:
logger.info(f"Refreshing Airtable token for connector {connector.id}")
credentials = AirtableAuthCredentialsBase.from_dict(connector.config)
# Decrypt tokens if they are encrypted
token_encryption = get_token_encryption()
is_encrypted = connector.config.get("_token_encrypted", False)
refresh_token = credentials.refresh_token
if is_encrypted and refresh_token:
try:
refresh_token = token_encryption.decrypt_token(refresh_token)
except Exception as e:
logger.error(f"Failed to decrypt refresh token: {e!s}")
raise HTTPException(
status_code=500, detail="Failed to decrypt stored refresh token"
) from e
if not refresh_token:
raise HTTPException(
status_code=400,
detail="No refresh token available. Please re-authenticate.",
)
auth_header = make_basic_auth_header(
config.AIRTABLE_CLIENT_ID, config.AIRTABLE_CLIENT_SECRET
)
# Prepare token refresh data
refresh_data = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": config.AIRTABLE_CLIENT_ID,
"client_secret": config.AIRTABLE_CLIENT_SECRET,
}
async with httpx.AsyncClient() as client:
token_response = await client.post(
TOKEN_URL,
data=refresh_data,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_header,
},
timeout=30.0,
)
if token_response.status_code != 200:
error_detail = token_response.text
try:
error_json = token_response.json()
error_detail = error_json.get("error_description", error_detail)
except Exception:
pass
raise HTTPException(
status_code=400, detail=f"Token refresh failed: {error_detail}"
)
token_json = token_response.json()
# Calculate expiration time (UTC, tz-aware)
expires_at = None
if token_json.get("expires_in"):
now_utc = datetime.now(UTC)
expires_at = now_utc + timedelta(seconds=int(token_json["expires_in"]))
# Encrypt new tokens before storing
access_token = token_json.get("access_token")
new_refresh_token = token_json.get("refresh_token")
if not access_token:
raise HTTPException(
status_code=400, detail="No access token received from Airtable refresh"
)
# Update credentials object with encrypted tokens
credentials.access_token = token_encryption.encrypt_token(access_token)
if new_refresh_token:
credentials.refresh_token = token_encryption.encrypt_token(
new_refresh_token
)
credentials.expires_in = token_json.get("expires_in")
credentials.expires_at = expires_at
credentials.scope = token_json.get("scope")
# Update connector config with encrypted tokens
credentials_dict = credentials.to_dict()
credentials_dict["_token_encrypted"] = True
connector.config = credentials_dict
await session.commit()
await session.refresh(connector)
logger.info(
f"Successfully refreshed Airtable token for connector {connector.id}"
)
return connector
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to refresh Airtable token: {e!s}"
) from e

View file

@@ -485,6 +485,56 @@ def validate_connector_config(
if not validators.url(url):
raise ValueError(f"Invalid URL format in INITIAL_URLS: {url}")
def validate_dexscreener_tokens() -> None:
"""Validate DexScreener tokens configuration."""
tokens = config.get("tokens")
if not isinstance(tokens, list) or not tokens:
raise ValueError("tokens must be a non-empty list")
# Valid blockchain names supported by DexScreener
valid_chains = [
"ethereum", "bsc", "polygon", "arbitrum", "optimism", "base",
"solana", "avalanche", "fantom", "cronos", "moonbeam", "moonriver",
"celo", "aurora", "harmony", "metis", "boba", "fuse", "okex",
"heco", "elastos", "telos", "iotex", "thundercore", "tomochain",
"velas", "wanchain", "kardia", "pulsechain", "dogechain", "evmos",
"kava", "step", "godwoken", "milkomeda", "dfk", "swimmer", "rei",
"vision", "smartbch", "redlight", "astar", "shiden", "clover",
"bitgert", "sx", "oasis", "energi", "tombchain", "canto", "kcc",
"ethw", "ethf", "core", "zksync", "polygonzkevm", "linea", "scroll",
"mantle", "manta", "blast", "mode", "xlayer", "merlin", "zkfair",
"opbnb", "taiko", "zeta", "sei", "berachain"
]
for i, token in enumerate(tokens):
if not isinstance(token, dict):
raise ValueError(f"tokens[{i}] must be a dictionary")
# Validate required fields
if "chain" not in token:
raise ValueError(f"tokens[{i}] must have 'chain' field")
if "address" not in token:
raise ValueError(f"tokens[{i}] must have 'address' field")
# Validate chain is valid
chain = token["chain"]
if not isinstance(chain, str) or chain.lower() not in valid_chains:
raise ValueError(
f"tokens[{i}].chain must be one of the supported blockchains. "
f"Got: {chain}. See DexScreener documentation for valid chains."
)
# Validate address format (basic check)
address = token["address"]
if not isinstance(address, str) or not address.strip():
raise ValueError(f"tokens[{i}].address cannot be empty")
# Optional: validate name field if present
if "name" in token:
name = token["name"]
if not isinstance(name, str):
raise ValueError(f"tokens[{i}].name must be a string if provided")
# Lookup table for connector validation rules
connector_rules = {
"SERPER_API": {"required": ["SERPER_API_KEY"], "validators": {}},
@@ -578,6 +628,12 @@ def validate_connector_config(
"INITIAL_URLS": lambda: validate_initial_urls(),
},
},
"DEXSCREENER_CONNECTOR": {
"required": ["tokens"],
"validators": {
"tokens": lambda: validate_dexscreener_tokens()
},
},
}
rules = connector_rules.get(connector_type_str)