From d895ac0fba381a984bdef8cea83111ede2a313f4 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Wed, 15 Apr 2026 22:19:41 +0530 Subject: [PATCH] feat: add mcp server --- api/.env.example | 3 + api/Dockerfile | 4 + api/app.py | 40 +++++---- api/logging_config.py | 3 + api/mcp/__init__.py | 3 + api/mcp/auth.py | 25 ++++++ api/mcp/server.py | 6 ++ api/mcp/tools/__init__.py | 0 api/mcp/tools/docs.py | 115 ++++++++++++++++++++++++++ api/mcp/tools/workflows.py | 47 +++++++++++ api/requirements.txt | 6 +- api/services/filesystem/minio.py | 45 +++++----- api/services/storage.py | 52 ++++-------- docker-compose.yaml | 4 + scripts/setup_custom_domain.sh | 6 ++ scripts/setup_remote.sh | 3 + ui/src/app/settings/page.tsx | 14 ++++ ui/src/components/MCPSection.tsx | 138 +++++++++++++++++++++++++++++++ 18 files changed, 440 insertions(+), 74 deletions(-) create mode 100644 api/mcp/__init__.py create mode 100644 api/mcp/auth.py create mode 100644 api/mcp/server.py create mode 100644 api/mcp/tools/__init__.py create mode 100644 api/mcp/tools/docs.py create mode 100644 api/mcp/tools/workflows.py create mode 100644 ui/src/components/MCPSection.tsx diff --git a/api/.env.example b/api/.env.example index 80f4020..dc4d66b 100644 --- a/api/.env.example +++ b/api/.env.example @@ -22,6 +22,9 @@ ENABLE_AWS_S3="false" # MinIO Configuration if using containerised MinIO instead of # AWS S3 MINIO_ENDPOINT=localhost:9000 +# Full URL (with scheme) that browsers use to reach MinIO. Required. +# Remote deployments behind HTTPS: set to e.g. https://your-server.example.com +MINIO_PUBLIC_ENDPOINT=http://localhost:9000 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_BUCKET=voice-audio diff --git a/api/Dockerfile b/api/Dockerfile index 09a8df7..21f5f88 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -65,6 +65,10 @@ ENV PYTHONUNBUFFERED=1 COPY ./api ./api COPY ./scripts/start_services_dev.sh ./scripts/start_services_dev.sh +# Product documentation — read at runtime by the MCP docs tools +# (search_dograh_docs / fetch_dograh_doc) so agents can learn Dograh. +COPY ./docs ./docs + ENV PYTHONPATH=/app # Disable file logging in Docker - logs go to stdout for docker logs diff --git a/api/app.py b/api/app.py index 1a2a7c8..70657ef 100644 --- a/api/app.py +++ b/api/app.py @@ -27,6 +27,7 @@ from fastapi.middleware.cors import CORSMiddleware from loguru import logger from api.constants import REDIS_URL +from api.mcp import mcp from api.routes.main import router as main_router from api.services.pipecat.tracing_config import ( handle_langfuse_sync, @@ -41,29 +42,32 @@ from api.tasks.arq import get_arq_redis API_PREFIX = "/api/v1" +mcp_app = mcp.http_app(path="/", stateless_http=True) + @asynccontextmanager async def lifespan(app: FastAPI): - # warmup arq pool - await get_arq_redis() + async with mcp_app.lifespan(app): + # warmup arq pool + await get_arq_redis() - # Pre-register all org-specific Langfuse exporters so they're ready - # before any pipeline runs, without per-call DB lookups. - await load_all_org_langfuse_credentials() + # Pre-register all org-specific Langfuse exporters so they're ready + # before any pipeline runs, without per-call DB lookups. + await load_all_org_langfuse_credentials() - # Start cross-worker sync manager so config changes propagate to all workers - sync_manager = WorkerSyncManager(REDIS_URL) - sync_manager.register( - WorkerSyncEventType.LANGFUSE_CREDENTIALS, handle_langfuse_sync - ) - await sync_manager.start() - set_worker_sync_manager(sync_manager) + # Start cross-worker sync manager so config changes propagate to all workers + sync_manager = WorkerSyncManager(REDIS_URL) + sync_manager.register( + WorkerSyncEventType.LANGFUSE_CREDENTIALS, handle_langfuse_sync + ) + await sync_manager.start() + set_worker_sync_manager(sync_manager) - yield # Run app + yield # Run app - # Shutdown sequence - this runs when FastAPI is shutting down - logger.info("Starting graceful shutdown...") - await sync_manager.stop() + # Shutdown sequence - this runs when FastAPI is shutting down + logger.info("Starting graceful shutdown...") + await sync_manager.stop() app = FastAPI( @@ -95,3 +99,7 @@ api_router.include_router(main_router) # main router with api prefix app.include_router(api_router, prefix=API_PREFIX) + +# Mount the MCP server — agents reach it at /mcp over Streamable HTTP, +# authenticating with the same X-API-Key header used by the REST API. +app.mount("/mcp", mcp_app) diff --git a/api/logging_config.py b/api/logging_config.py index b14e894..bd4649f 100644 --- a/api/logging_config.py +++ b/api/logging_config.py @@ -121,4 +121,7 @@ def setup_logging(): logging_logger.setLevel(logging.INFO) logging_logger.propagate = False + # MCP SDK logs a line per request lifecycle event; child loggers inherit. + logging.getLogger("mcp").setLevel(logging.WARNING) + _logging_initialized = True diff --git a/api/mcp/__init__.py b/api/mcp/__init__.py new file mode 100644 index 0000000..72dbd23 --- /dev/null +++ b/api/mcp/__init__.py @@ -0,0 +1,3 @@ +from api.mcp.server import mcp + +__all__ = ["mcp"] diff --git a/api/mcp/auth.py b/api/mcp/auth.py new file mode 100644 index 0000000..00fcbec --- /dev/null +++ b/api/mcp/auth.py @@ -0,0 +1,25 @@ +from fastapi import HTTPException +from fastmcp.server.dependencies import get_http_headers + +from api.db.models import UserModel +from api.services.auth.depends import _handle_api_key_auth + + +async def authenticate_mcp_request() -> UserModel: + """Resolve the authenticated Dograh user for an MCP tool invocation. + + Accepts either `X-API-Key: ` or `Authorization: Bearer `, + reusing the API-key flow from `api.services.auth.depends`. + """ + headers = get_http_headers() + api_key = headers.get("x-api-key") + if not api_key: + auth = headers.get("authorization", "") + if auth.lower().startswith("bearer "): + api_key = auth.split(" ", 1)[1].strip() + if not api_key: + raise HTTPException( + status_code=401, + detail="Missing API key — send X-API-Key or Authorization: Bearer ", + ) + return await _handle_api_key_auth(api_key) diff --git a/api/mcp/server.py b/api/mcp/server.py new file mode 100644 index 0000000..91fcfda --- /dev/null +++ b/api/mcp/server.py @@ -0,0 +1,6 @@ +from fastmcp import FastMCP + +mcp = FastMCP("dograh") + +from api.mcp.tools import docs as _docs # noqa: E402, F401 +from api.mcp.tools import workflows as _workflows # noqa: E402, F401 diff --git a/api/mcp/tools/__init__.py b/api/mcp/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/mcp/tools/docs.py b/api/mcp/tools/docs.py new file mode 100644 index 0000000..6e81cca --- /dev/null +++ b/api/mcp/tools/docs.py @@ -0,0 +1,115 @@ +import re +from functools import lru_cache +from pathlib import Path + +from fastapi import HTTPException +from rank_bm25 import BM25Okapi + +from api.mcp.server import mcp + +DOCS_ROOT = Path(__file__).resolve().parents[3] / "docs" + +_TOKEN_RE = re.compile(r"[A-Za-z0-9_]+") +_FRONTMATTER_RE = re.compile(r"^---\n(.*?)\n---\n", re.DOTALL) +_TITLE_RE = re.compile(r"^title:\s*['\"]?(.+?)['\"]?\s*$", re.MULTILINE) +_H1_RE = re.compile(r"^#\s+(.+?)\s*$", re.MULTILINE) + + +def _tokenize(text: str) -> list[str]: + return [t.lower() for t in _TOKEN_RE.findall(text)] + + +def _extract_title(path: Path, body: str) -> str: + fm_match = _FRONTMATTER_RE.match(body) + if fm_match: + title_match = _TITLE_RE.search(fm_match.group(1)) + if title_match: + return title_match.group(1).strip() + h1_match = _H1_RE.search(body) + if h1_match: + return h1_match.group(1).strip() + return path.stem.replace("-", " ").title() + + +def _strip_frontmatter(body: str) -> str: + return _FRONTMATTER_RE.sub("", body, count=1) + + +@lru_cache(maxsize=1) +def _load_index() -> tuple[list[dict], BM25Okapi]: + """Read every docs/**/*.mdx file once and build a BM25 index. + + Cached for the process lifetime — docs rarely change between restarts. + """ + docs: list[dict] = [] + corpus: list[list[str]] = [] + + for path in sorted(DOCS_ROOT.rglob("*.mdx")): + body = path.read_text(encoding="utf-8") + rel = path.relative_to(DOCS_ROOT).as_posix() + title = _extract_title(path, body) + content = _strip_frontmatter(body) + docs.append({"path": rel, "title": title, "content": content}) + corpus.append(_tokenize(f"{title} {content}")) + + return docs, BM25Okapi(corpus) + + +def _snippet(content: str, query_tokens: list[str], width: int = 240) -> str: + lowered = content.lower() + for tok in query_tokens: + idx = lowered.find(tok) + if idx >= 0: + start = max(0, idx - width // 2) + end = min(len(content), start + width) + return ("…" if start > 0 else "") + content[start:end].strip() + ( + "…" if end < len(content) else "" + ) + return content[:width].strip() + ("…" if len(content) > width else "") + + +@mcp.tool +async def search_dograh_docs(query: str, limit: int = 5) -> list[dict]: + """Search Dograh's product documentation. + + Returns the top matches as {path, title, snippet}. Pass the returned + `path` to `fetch_dograh_doc` to read the full page. Use this first + when you need to learn how a Dograh feature works before building + against it. + """ + docs, bm25 = _load_index() + tokens = _tokenize(query) + if not tokens: + return [] + + scores = bm25.get_scores(tokens) + ranked = sorted( + zip(scores, docs), key=lambda pair: pair[0], reverse=True + )[:limit] + + return [ + { + "path": doc["path"], + "title": doc["title"], + "snippet": _snippet(doc["content"], tokens), + "score": round(float(score), 3), + } + for score, doc in ranked + if score > 0 + ] + + +@mcp.tool +async def fetch_dograh_doc(path: str) -> dict: + """Fetch the full content of a Dograh docs page by its path + (e.g. `core-concepts/workflows.mdx`), as returned by `search_dograh_docs`. + """ + docs, _ = _load_index() + for doc in docs: + if doc["path"] == path: + return { + "path": doc["path"], + "title": doc["title"], + "content": doc["content"], + } + raise HTTPException(status_code=404, detail=f"Doc not found: {path}") diff --git a/api/mcp/tools/workflows.py b/api/mcp/tools/workflows.py new file mode 100644 index 0000000..d22d92b --- /dev/null +++ b/api/mcp/tools/workflows.py @@ -0,0 +1,47 @@ +from fastapi import HTTPException + +from api.db import db_client +from api.mcp.auth import authenticate_mcp_request +from api.mcp.server import mcp + + +@mcp.tool +async def list_workflows(status: str | None = None) -> list[dict]: + """List agents (workflows) in the caller's organization. + + Returns id, name, status, and created_at for each agent. Use + `get_workflow` to fetch a single agent's full definition. Pass + `status="active"` or `status="archived"` to filter. + """ + user = await authenticate_mcp_request() + workflows = await db_client.get_all_workflows_for_listing( + organization_id=user.selected_organization_id, + status=status, + ) + return [ + { + "id": w.id, + "name": w.name, + "status": w.status, + "created_at": w.created_at.isoformat() if w.created_at else None, + } + for w in workflows + ] + + +@mcp.tool +async def get_workflow(workflow_id: int) -> dict: + """Fetch a single agent by id, including its current published definition.""" + user = await authenticate_mcp_request() + workflow = await db_client.get_workflow_by_id(workflow_id) + if not workflow or workflow.organization_id != user.selected_organization_id: + raise HTTPException(status_code=404, detail=f"Workflow {workflow_id} not found") + + current = workflow.current_definition + return { + "id": workflow.id, + "name": workflow.name, + "status": workflow.status, + "definition": current.workflow_json if current else None, + "version_number": current.version_number if current else None, + } diff --git a/api/requirements.txt b/api/requirements.txt index e45dbe4..525b34f 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -1,5 +1,5 @@ langfuse==3.9.3 -fastapi==0.116.2 +fastapi==0.135.3 asyncpg==0.30.0 alembic==1.16.5 redis==5.3.1 @@ -17,4 +17,6 @@ docling[rapidocr]==2.68.0 pgvector==0.4.2 bcrypt==5.0.0 email-validator==2.3.0 -posthog==3.24.0 +posthog==7.11.1 +fastmcp==3.2.4 +rank-bm25==0.2.2 diff --git a/api/services/filesystem/minio.py b/api/services/filesystem/minio.py index 2e52c66..f26bf1c 100644 --- a/api/services/filesystem/minio.py +++ b/api/services/filesystem/minio.py @@ -12,14 +12,11 @@ from .base import BaseFileSystem class MinioFileSystem(BaseFileSystem): """MinIO implementation of the filesystem interface for OSS users. - Handles both internal (container-to-container) and external (browser) access: - - endpoint: Used for API operations (uploads, downloads from code) - - public_endpoint: Used for generating browser-accessible presigned URLs - - Auto-detection logic: - 1. If MINIO_PUBLIC_ENDPOINT env var is set, use it (for production/custom domains) - 2. If endpoint is "minio:9000" (Docker internal), auto-use "localhost:9000" for browser - 3. Otherwise, endpoint works for both (e.g., "localhost:9000" in local non-Docker setup) + Two endpoints, two different purposes: + - endpoint (host:port) + secure (bool): used by the MinIO SDK for + container-to-container calls. The SDK requires these split. + - public_endpoint (full URL, e.g. "https://example.com"): used verbatim + when building URLs that browsers will fetch. Required. """ def __init__( @@ -31,9 +28,22 @@ class MinioFileSystem(BaseFileSystem): secure: bool = False, public_endpoint: Optional[str] = None, ): + if not public_endpoint: + raise ValueError( + "MinioFileSystem requires public_endpoint (set MINIO_PUBLIC_ENDPOINT). " + "Expected a full URL with scheme, e.g. 'http://localhost:9000' or 'https://example.com'." + ) + if not ( + public_endpoint.startswith("http://") + or public_endpoint.startswith("https://") + ): + raise ValueError( + f"MINIO_PUBLIC_ENDPOINT must include a scheme (http:// or https://), got: {public_endpoint!r}" + ) + self.bucket_name = bucket_name self.endpoint = endpoint - self.public_endpoint = public_endpoint or endpoint + self.public_endpoint = public_endpoint.rstrip("/") self.secure = secure self.access_key = access_key self.secret_key = secret_key @@ -115,13 +125,12 @@ class MinioFileSystem(BaseFileSystem): use_internal_endpoint: bool = False, ) -> Optional[str]: try: - # For MinIO in local development, return unsigned URLs - # This avoids signature mismatch issues when endpoint differs - # MinIO must be configured to allow anonymous read access - protocol = "https" if self.secure else "http" - endpoint = self.endpoint if use_internal_endpoint else self.public_endpoint - url = f"{protocol}://{endpoint}/{self.bucket_name}/{file_path}" - return url + if use_internal_endpoint: + protocol = "https" if self.secure else "http" + base = f"{protocol}://{self.endpoint}" + else: + base = self.public_endpoint + return f"{base}/{self.bucket_name}/{file_path}" except Exception as e: logger.error(f"Error generating MinIO URL: {e}") return None @@ -162,9 +171,7 @@ class MinioFileSystem(BaseFileSystem): The bucket policy allows anonymous s3:PutObject, so no signature is needed. """ try: - # Return unsigned URL for anonymous upload - protocol = "https" if self.secure else "http" - url = f"{protocol}://{self.public_endpoint}/{self.bucket_name}/{file_path}" + url = f"{self.public_endpoint}/{self.bucket_name}/{file_path}" logger.debug(f"Generated unsigned upload URL: {url}") return url except Exception as e: diff --git a/api/services/storage.py b/api/services/storage.py index 6a5f407..0f05073 100644 --- a/api/services/storage.py +++ b/api/services/storage.py @@ -25,46 +25,24 @@ def get_storage_for_backend(backend: str) -> BaseFileSystem: """ # Code 2: MinIO implementation (local/OSS deployments) if backend == StorageBackend.MINIO.value: - endpoint = MINIO_ENDPOINT - # Auto-detect public endpoint: - # - If MINIO_PUBLIC_ENDPOINT is set, use it (for custom domains/IPs) - # - If running in Docker and endpoint is "minio:9000", use "localhost:9000" for local dev - # - Otherwise, use the endpoint as-is (both containers and browser can reach it) - public_endpoint = MINIO_PUBLIC_ENDPOINT - if not public_endpoint: - # Auto-detect based on endpoint - if endpoint.startswith("minio:"): - # Docker internal endpoint detected, assume local development - public_endpoint = endpoint.replace("minio:", "localhost:") - logger.info( - f"Auto-detected local development: using {public_endpoint} for public access" - ) - elif endpoint.startswith("host.docker.internal:"): - # Docker Desktop special DNS detected, use localhost for browser access - public_endpoint = endpoint.replace( - "host.docker.internal:", "localhost:" - ) - logger.info( - f"Auto-detected Docker Desktop: using {public_endpoint} for public access" - ) - else: - # Already using a public endpoint (localhost:9000 or domain:9000) - public_endpoint = endpoint - - access_key = MINIO_ACCESS_KEY - secret_key = MINIO_SECRET_KEY - bucket = MINIO_BUCKET - secure = MINIO_SECURE + if not MINIO_PUBLIC_ENDPOINT: + raise ValueError( + "MINIO_PUBLIC_ENDPOINT is required for MinIO storage. " + "Set it to the full URL browsers use to reach MinIO, " + "e.g. 'http://localhost:9000' for local dev or " + "'https://your-server.example.com' for a remote deployment." + ) logger.info( - f"Initializing {backend} storage at {endpoint} (public: {public_endpoint}) with bucket '{bucket}'" + f"Initializing {backend} storage at {MINIO_ENDPOINT} " + f"(public: {MINIO_PUBLIC_ENDPOINT}) with bucket '{MINIO_BUCKET}'" ) return MinioFileSystem( - endpoint=endpoint, - access_key=access_key, - secret_key=secret_key, - bucket_name=bucket, - secure=secure, - public_endpoint=public_endpoint, + endpoint=MINIO_ENDPOINT, + access_key=MINIO_ACCESS_KEY, + secret_key=MINIO_SECRET_KEY, + bucket_name=MINIO_BUCKET, + secure=MINIO_SECURE, + public_endpoint=MINIO_PUBLIC_ENDPOINT, ) # Code 1: AWS S3 implementation (cloud deployments) diff --git a/docker-compose.yaml b/docker-compose.yaml index 949b40a..262746e 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -98,6 +98,10 @@ services: # MinIO MINIO_ENDPOINT: "minio:9000" + # Full URL (with scheme) browsers use to reach MinIO. For remote + # deployments behind HTTPS, set MINIO_PUBLIC_ENDPOINT in .env to + # e.g. https://your-server.example.com (nginx proxies /voice-audio/). + MINIO_PUBLIC_ENDPOINT: "${MINIO_PUBLIC_ENDPOINT:-http://localhost:9000}" MINIO_ACCESS_KEY: "minioadmin" MINIO_SECRET_KEY: "minioadmin" MINIO_BUCKET: "voice-audio" diff --git a/scripts/setup_custom_domain.sh b/scripts/setup_custom_domain.sh index 55d22bc..fec11be 100755 --- a/scripts/setup_custom_domain.sh +++ b/scripts/setup_custom_domain.sh @@ -278,6 +278,12 @@ if [[ -f ".env" ]]; then sed -i.bak "/^# Backend URL for UI$/d" .env # Update TURN_HOST to use domain sed -i.bak "s|^TURN_HOST=.*|TURN_HOST=$DOMAIN_NAME|" .env + # Update MINIO_PUBLIC_ENDPOINT to use domain (browsers fetch /voice-audio/* here) + if grep -q "^MINIO_PUBLIC_ENDPOINT=" .env; then + sed -i.bak "s|^MINIO_PUBLIC_ENDPOINT=.*|MINIO_PUBLIC_ENDPOINT=https://$DOMAIN_NAME|" .env + else + echo "MINIO_PUBLIC_ENDPOINT=https://$DOMAIN_NAME" >> .env + fi rm -f .env.bak echo -e "${GREEN}✓ .env updated with domain name${NC}" else diff --git a/scripts/setup_remote.sh b/scripts/setup_remote.sh index 652016e..c37ff8f 100755 --- a/scripts/setup_remote.sh +++ b/scripts/setup_remote.sh @@ -207,6 +207,9 @@ cat > .env << ENV_EOF # Backend API endpoint (public URL the backend uses to build webhook/embed links) BACKEND_API_ENDPOINT=https://$SERVER_IP +# Public URL browsers use to fetch objects from MinIO (proxied by nginx) +MINIO_PUBLIC_ENDPOINT=https://$SERVER_IP + # TURN Server Configuration (time-limited credentials via TURN REST API) TURN_HOST=$SERVER_IP TURN_SECRET=$TURN_SECRET diff --git a/ui/src/app/settings/page.tsx b/ui/src/app/settings/page.tsx index 1e8c91a..3032215 100644 --- a/ui/src/app/settings/page.tsx +++ b/ui/src/app/settings/page.tsx @@ -1,5 +1,6 @@ "use client"; +import { MCPSection } from "@/components/MCPSection"; import { TelemetrySection } from "@/components/TelemetrySection"; import { Card, @@ -20,6 +21,19 @@ export default function SettingsPage() {

+ + + MCP Server + + Let AI agents access your Dograh workspace and documentation via + the Model Context Protocol. + + + + + + + Telemetry diff --git a/ui/src/components/MCPSection.tsx b/ui/src/components/MCPSection.tsx new file mode 100644 index 0000000..90b516e --- /dev/null +++ b/ui/src/components/MCPSection.tsx @@ -0,0 +1,138 @@ +"use client"; + +import { Check, Copy } from "lucide-react"; +import Link from "next/link"; +import { useState } from "react"; + +import { Button } from "@/components/ui/button"; +import { Label } from "@/components/ui/label"; + +export function MCPSection() { + const backendUrl = + process.env.NEXT_PUBLIC_BACKEND_URL || + (typeof window !== "undefined" ? window.location.origin : ""); + const endpoint = `${backendUrl}/mcp/`; + + const clientConfig = JSON.stringify( + { + mcpServers: { + dograh: { + url: endpoint, + headers: { "X-API-Key": "YOUR_API_KEY" }, + }, + }, + }, + null, + 2, + ); + + const claudeCliCommand = `claude mcp add --transport http dograh ${endpoint} \\ + --header "X-API-Key: YOUR_API_KEY"`; + + const [endpointCopied, setEndpointCopied] = useState(false); + const [configCopied, setConfigCopied] = useState(false); + const [cliCopied, setCliCopied] = useState(false); + + const handleCopy = async ( + value: string, + setter: (v: boolean) => void, + ) => { + await navigator.clipboard.writeText(value); + setter(true); + setTimeout(() => setter(false), 2000); + }; + + return ( +
+
+ +

+ Connect an AI agent (Claude Desktop, Cursor, etc.) to this URL over + Streamable HTTP. Requires an API key in the X-API-Key header.{" "} + + Get your API key + +

+
+ + {endpoint} + + +
+
+ +
+ +

+ Run this in your terminal to register Dograh as an MCP server with + Claude Code. +

+
+
+            {claudeCliCommand}
+          
+ +
+
+ +
+ +

+ Paste this into your MCP client's config file (e.g. Claude + Desktop's{" "} + + claude_desktop_config.json + + ) and replace{" "} + + YOUR_API_KEY + + . +

+
+
+            {clientConfig}
+          
+ +
+
+
+ ); +}