feat: add mcp server to Dograh OSS (#240)

* feat: add mcp server

* update mcp endpoint
This commit is contained in:
Abhishek 2026-04-16 13:03:29 +05:30 committed by GitHub
parent e31b38122e
commit 79bc91b1e0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 442 additions and 74 deletions

View file

@ -22,6 +22,9 @@ ENABLE_AWS_S3="false"
# MinIO Configuration if using containerised MinIO instead of
# AWS S3
MINIO_ENDPOINT=localhost:9000
# Full URL (with scheme) that browsers use to reach MinIO. Required.
# Remote deployments behind HTTPS: set to e.g. https://your-server.example.com
MINIO_PUBLIC_ENDPOINT=http://localhost:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
MINIO_BUCKET=voice-audio

View file

@ -65,6 +65,10 @@ ENV PYTHONUNBUFFERED=1
COPY ./api ./api
COPY ./scripts/start_services_dev.sh ./scripts/start_services_dev.sh
# Product documentation — read at runtime by the MCP docs tools
# (search_dograh_docs / fetch_dograh_doc) so agents can learn Dograh.
COPY ./docs ./docs
ENV PYTHONPATH=/app
# Disable file logging in Docker - logs go to stdout for docker logs

View file

@ -27,6 +27,7 @@ from fastapi.middleware.cors import CORSMiddleware
from loguru import logger
from api.constants import REDIS_URL
from api.mcp import mcp
from api.routes.main import router as main_router
from api.services.pipecat.tracing_config import (
handle_langfuse_sync,
@ -41,29 +42,32 @@ from api.tasks.arq import get_arq_redis
API_PREFIX = "/api/v1"
mcp_app = mcp.http_app(path="/", stateless_http=True)
@asynccontextmanager
async def lifespan(app: FastAPI):
# warmup arq pool
await get_arq_redis()
async with mcp_app.lifespan(app):
# warmup arq pool
await get_arq_redis()
# Pre-register all org-specific Langfuse exporters so they're ready
# before any pipeline runs, without per-call DB lookups.
await load_all_org_langfuse_credentials()
# Pre-register all org-specific Langfuse exporters so they're ready
# before any pipeline runs, without per-call DB lookups.
await load_all_org_langfuse_credentials()
# Start cross-worker sync manager so config changes propagate to all workers
sync_manager = WorkerSyncManager(REDIS_URL)
sync_manager.register(
WorkerSyncEventType.LANGFUSE_CREDENTIALS, handle_langfuse_sync
)
await sync_manager.start()
set_worker_sync_manager(sync_manager)
# Start cross-worker sync manager so config changes propagate to all workers
sync_manager = WorkerSyncManager(REDIS_URL)
sync_manager.register(
WorkerSyncEventType.LANGFUSE_CREDENTIALS, handle_langfuse_sync
)
await sync_manager.start()
set_worker_sync_manager(sync_manager)
yield # Run app
yield # Run app
# Shutdown sequence - this runs when FastAPI is shutting down
logger.info("Starting graceful shutdown...")
await sync_manager.stop()
# Shutdown sequence - this runs when FastAPI is shutting down
logger.info("Starting graceful shutdown...")
await sync_manager.stop()
app = FastAPI(
@ -95,3 +99,9 @@ api_router.include_router(main_router)
# main router with api prefix
app.include_router(api_router, prefix=API_PREFIX)
# Mount the MCP server — agents reach it at /api/v1/mcp over Streamable HTTP,
# authenticating with the same X-API-Key header used by the REST API.
# Mounted under /api/v1 so existing reverse-proxy rules (nginx etc.) route it
# without any extra configuration.
app.mount(f"{API_PREFIX}/mcp", mcp_app)

View file

@ -121,4 +121,7 @@ def setup_logging():
logging_logger.setLevel(logging.INFO)
logging_logger.propagate = False
# MCP SDK logs a line per request lifecycle event; child loggers inherit.
logging.getLogger("mcp").setLevel(logging.WARNING)
_logging_initialized = True

3
api/mcp/__init__.py Normal file
View file

@ -0,0 +1,3 @@
from api.mcp.server import mcp
__all__ = ["mcp"]

25
api/mcp/auth.py Normal file
View file

@ -0,0 +1,25 @@
from fastapi import HTTPException
from fastmcp.server.dependencies import get_http_headers
from api.db.models import UserModel
from api.services.auth.depends import _handle_api_key_auth
async def authenticate_mcp_request() -> UserModel:
"""Resolve the authenticated Dograh user for an MCP tool invocation.
Accepts either `X-API-Key: <key>` or `Authorization: Bearer <key>`,
reusing the API-key flow from `api.services.auth.depends`.
"""
headers = get_http_headers()
api_key = headers.get("x-api-key")
if not api_key:
auth = headers.get("authorization", "")
if auth.lower().startswith("bearer "):
api_key = auth.split(" ", 1)[1].strip()
if not api_key:
raise HTTPException(
status_code=401,
detail="Missing API key — send X-API-Key or Authorization: Bearer <key>",
)
return await _handle_api_key_auth(api_key)

6
api/mcp/server.py Normal file
View file

@ -0,0 +1,6 @@
from fastmcp import FastMCP
mcp = FastMCP("dograh")
from api.mcp.tools import docs as _docs # noqa: E402, F401
from api.mcp.tools import workflows as _workflows # noqa: E402, F401

View file

115
api/mcp/tools/docs.py Normal file
View file

@ -0,0 +1,115 @@
import re
from functools import lru_cache
from pathlib import Path
from fastapi import HTTPException
from rank_bm25 import BM25Okapi
from api.mcp.server import mcp
DOCS_ROOT = Path(__file__).resolve().parents[3] / "docs"
_TOKEN_RE = re.compile(r"[A-Za-z0-9_]+")
_FRONTMATTER_RE = re.compile(r"^---\n(.*?)\n---\n", re.DOTALL)
_TITLE_RE = re.compile(r"^title:\s*['\"]?(.+?)['\"]?\s*$", re.MULTILINE)
_H1_RE = re.compile(r"^#\s+(.+?)\s*$", re.MULTILINE)
def _tokenize(text: str) -> list[str]:
return [t.lower() for t in _TOKEN_RE.findall(text)]
def _extract_title(path: Path, body: str) -> str:
fm_match = _FRONTMATTER_RE.match(body)
if fm_match:
title_match = _TITLE_RE.search(fm_match.group(1))
if title_match:
return title_match.group(1).strip()
h1_match = _H1_RE.search(body)
if h1_match:
return h1_match.group(1).strip()
return path.stem.replace("-", " ").title()
def _strip_frontmatter(body: str) -> str:
return _FRONTMATTER_RE.sub("", body, count=1)
@lru_cache(maxsize=1)
def _load_index() -> tuple[list[dict], BM25Okapi]:
"""Read every docs/**/*.mdx file once and build a BM25 index.
Cached for the process lifetime docs rarely change between restarts.
"""
docs: list[dict] = []
corpus: list[list[str]] = []
for path in sorted(DOCS_ROOT.rglob("*.mdx")):
body = path.read_text(encoding="utf-8")
rel = path.relative_to(DOCS_ROOT).as_posix()
title = _extract_title(path, body)
content = _strip_frontmatter(body)
docs.append({"path": rel, "title": title, "content": content})
corpus.append(_tokenize(f"{title} {content}"))
return docs, BM25Okapi(corpus)
def _snippet(content: str, query_tokens: list[str], width: int = 240) -> str:
lowered = content.lower()
for tok in query_tokens:
idx = lowered.find(tok)
if idx >= 0:
start = max(0, idx - width // 2)
end = min(len(content), start + width)
return ("" if start > 0 else "") + content[start:end].strip() + (
"" if end < len(content) else ""
)
return content[:width].strip() + ("" if len(content) > width else "")
@mcp.tool
async def search_dograh_docs(query: str, limit: int = 5) -> list[dict]:
"""Search Dograh's product documentation.
Returns the top matches as {path, title, snippet}. Pass the returned
`path` to `fetch_dograh_doc` to read the full page. Use this first
when you need to learn how a Dograh feature works before building
against it.
"""
docs, bm25 = _load_index()
tokens = _tokenize(query)
if not tokens:
return []
scores = bm25.get_scores(tokens)
ranked = sorted(
zip(scores, docs), key=lambda pair: pair[0], reverse=True
)[:limit]
return [
{
"path": doc["path"],
"title": doc["title"],
"snippet": _snippet(doc["content"], tokens),
"score": round(float(score), 3),
}
for score, doc in ranked
if score > 0
]
@mcp.tool
async def fetch_dograh_doc(path: str) -> dict:
"""Fetch the full content of a Dograh docs page by its path
(e.g. `core-concepts/workflows.mdx`), as returned by `search_dograh_docs`.
"""
docs, _ = _load_index()
for doc in docs:
if doc["path"] == path:
return {
"path": doc["path"],
"title": doc["title"],
"content": doc["content"],
}
raise HTTPException(status_code=404, detail=f"Doc not found: {path}")

View file

@ -0,0 +1,47 @@
from fastapi import HTTPException
from api.db import db_client
from api.mcp.auth import authenticate_mcp_request
from api.mcp.server import mcp
@mcp.tool
async def list_workflows(status: str | None = None) -> list[dict]:
"""List agents (workflows) in the caller's organization.
Returns id, name, status, and created_at for each agent. Use
`get_workflow` to fetch a single agent's full definition. Pass
`status="active"` or `status="archived"` to filter.
"""
user = await authenticate_mcp_request()
workflows = await db_client.get_all_workflows_for_listing(
organization_id=user.selected_organization_id,
status=status,
)
return [
{
"id": w.id,
"name": w.name,
"status": w.status,
"created_at": w.created_at.isoformat() if w.created_at else None,
}
for w in workflows
]
@mcp.tool
async def get_workflow(workflow_id: int) -> dict:
"""Fetch a single agent by id, including its current published definition."""
user = await authenticate_mcp_request()
workflow = await db_client.get_workflow_by_id(workflow_id)
if not workflow or workflow.organization_id != user.selected_organization_id:
raise HTTPException(status_code=404, detail=f"Workflow {workflow_id} not found")
current = workflow.current_definition
return {
"id": workflow.id,
"name": workflow.name,
"status": workflow.status,
"definition": current.workflow_json if current else None,
"version_number": current.version_number if current else None,
}

View file

@ -1,5 +1,5 @@
langfuse==3.9.3
fastapi==0.116.2
fastapi==0.135.3
asyncpg==0.30.0
alembic==1.16.5
redis==5.3.1
@ -17,4 +17,6 @@ docling[rapidocr]==2.68.0
pgvector==0.4.2
bcrypt==5.0.0
email-validator==2.3.0
posthog==3.24.0
posthog==7.11.1
fastmcp==3.2.4
rank-bm25==0.2.2

View file

@ -12,14 +12,11 @@ from .base import BaseFileSystem
class MinioFileSystem(BaseFileSystem):
"""MinIO implementation of the filesystem interface for OSS users.
Handles both internal (container-to-container) and external (browser) access:
- endpoint: Used for API operations (uploads, downloads from code)
- public_endpoint: Used for generating browser-accessible presigned URLs
Auto-detection logic:
1. If MINIO_PUBLIC_ENDPOINT env var is set, use it (for production/custom domains)
2. If endpoint is "minio:9000" (Docker internal), auto-use "localhost:9000" for browser
3. Otherwise, endpoint works for both (e.g., "localhost:9000" in local non-Docker setup)
Two endpoints, two different purposes:
- endpoint (host:port) + secure (bool): used by the MinIO SDK for
container-to-container calls. The SDK requires these split.
- public_endpoint (full URL, e.g. "https://example.com"): used verbatim
when building URLs that browsers will fetch. Required.
"""
def __init__(
@ -31,9 +28,22 @@ class MinioFileSystem(BaseFileSystem):
secure: bool = False,
public_endpoint: Optional[str] = None,
):
if not public_endpoint:
raise ValueError(
"MinioFileSystem requires public_endpoint (set MINIO_PUBLIC_ENDPOINT). "
"Expected a full URL with scheme, e.g. 'http://localhost:9000' or 'https://example.com'."
)
if not (
public_endpoint.startswith("http://")
or public_endpoint.startswith("https://")
):
raise ValueError(
f"MINIO_PUBLIC_ENDPOINT must include a scheme (http:// or https://), got: {public_endpoint!r}"
)
self.bucket_name = bucket_name
self.endpoint = endpoint
self.public_endpoint = public_endpoint or endpoint
self.public_endpoint = public_endpoint.rstrip("/")
self.secure = secure
self.access_key = access_key
self.secret_key = secret_key
@ -115,13 +125,12 @@ class MinioFileSystem(BaseFileSystem):
use_internal_endpoint: bool = False,
) -> Optional[str]:
try:
# For MinIO in local development, return unsigned URLs
# This avoids signature mismatch issues when endpoint differs
# MinIO must be configured to allow anonymous read access
protocol = "https" if self.secure else "http"
endpoint = self.endpoint if use_internal_endpoint else self.public_endpoint
url = f"{protocol}://{endpoint}/{self.bucket_name}/{file_path}"
return url
if use_internal_endpoint:
protocol = "https" if self.secure else "http"
base = f"{protocol}://{self.endpoint}"
else:
base = self.public_endpoint
return f"{base}/{self.bucket_name}/{file_path}"
except Exception as e:
logger.error(f"Error generating MinIO URL: {e}")
return None
@ -162,9 +171,7 @@ class MinioFileSystem(BaseFileSystem):
The bucket policy allows anonymous s3:PutObject, so no signature is needed.
"""
try:
# Return unsigned URL for anonymous upload
protocol = "https" if self.secure else "http"
url = f"{protocol}://{self.public_endpoint}/{self.bucket_name}/{file_path}"
url = f"{self.public_endpoint}/{self.bucket_name}/{file_path}"
logger.debug(f"Generated unsigned upload URL: {url}")
return url
except Exception as e:

View file

@ -25,46 +25,24 @@ def get_storage_for_backend(backend: str) -> BaseFileSystem:
"""
# Code 2: MinIO implementation (local/OSS deployments)
if backend == StorageBackend.MINIO.value:
endpoint = MINIO_ENDPOINT
# Auto-detect public endpoint:
# - If MINIO_PUBLIC_ENDPOINT is set, use it (for custom domains/IPs)
# - If running in Docker and endpoint is "minio:9000", use "localhost:9000" for local dev
# - Otherwise, use the endpoint as-is (both containers and browser can reach it)
public_endpoint = MINIO_PUBLIC_ENDPOINT
if not public_endpoint:
# Auto-detect based on endpoint
if endpoint.startswith("minio:"):
# Docker internal endpoint detected, assume local development
public_endpoint = endpoint.replace("minio:", "localhost:")
logger.info(
f"Auto-detected local development: using {public_endpoint} for public access"
)
elif endpoint.startswith("host.docker.internal:"):
# Docker Desktop special DNS detected, use localhost for browser access
public_endpoint = endpoint.replace(
"host.docker.internal:", "localhost:"
)
logger.info(
f"Auto-detected Docker Desktop: using {public_endpoint} for public access"
)
else:
# Already using a public endpoint (localhost:9000 or domain:9000)
public_endpoint = endpoint
access_key = MINIO_ACCESS_KEY
secret_key = MINIO_SECRET_KEY
bucket = MINIO_BUCKET
secure = MINIO_SECURE
if not MINIO_PUBLIC_ENDPOINT:
raise ValueError(
"MINIO_PUBLIC_ENDPOINT is required for MinIO storage. "
"Set it to the full URL browsers use to reach MinIO, "
"e.g. 'http://localhost:9000' for local dev or "
"'https://your-server.example.com' for a remote deployment."
)
logger.info(
f"Initializing {backend} storage at {endpoint} (public: {public_endpoint}) with bucket '{bucket}'"
f"Initializing {backend} storage at {MINIO_ENDPOINT} "
f"(public: {MINIO_PUBLIC_ENDPOINT}) with bucket '{MINIO_BUCKET}'"
)
return MinioFileSystem(
endpoint=endpoint,
access_key=access_key,
secret_key=secret_key,
bucket_name=bucket,
secure=secure,
public_endpoint=public_endpoint,
endpoint=MINIO_ENDPOINT,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
bucket_name=MINIO_BUCKET,
secure=MINIO_SECURE,
public_endpoint=MINIO_PUBLIC_ENDPOINT,
)
# Code 1: AWS S3 implementation (cloud deployments)

View file

@ -98,6 +98,10 @@ services:
# MinIO
MINIO_ENDPOINT: "minio:9000"
# Full URL (with scheme) browsers use to reach MinIO. For remote
# deployments behind HTTPS, set MINIO_PUBLIC_ENDPOINT in .env to
# e.g. https://your-server.example.com (nginx proxies /voice-audio/).
MINIO_PUBLIC_ENDPOINT: "${MINIO_PUBLIC_ENDPOINT:-http://localhost:9000}"
MINIO_ACCESS_KEY: "minioadmin"
MINIO_SECRET_KEY: "minioadmin"
MINIO_BUCKET: "voice-audio"

View file

@ -278,6 +278,12 @@ if [[ -f ".env" ]]; then
sed -i.bak "/^# Backend URL for UI$/d" .env
# Update TURN_HOST to use domain
sed -i.bak "s|^TURN_HOST=.*|TURN_HOST=$DOMAIN_NAME|" .env
# Update MINIO_PUBLIC_ENDPOINT to use domain (browsers fetch /voice-audio/* here)
if grep -q "^MINIO_PUBLIC_ENDPOINT=" .env; then
sed -i.bak "s|^MINIO_PUBLIC_ENDPOINT=.*|MINIO_PUBLIC_ENDPOINT=https://$DOMAIN_NAME|" .env
else
echo "MINIO_PUBLIC_ENDPOINT=https://$DOMAIN_NAME" >> .env
fi
rm -f .env.bak
echo -e "${GREEN}✓ .env updated with domain name${NC}"
else

View file

@ -207,6 +207,9 @@ cat > .env << ENV_EOF
# Backend API endpoint (public URL the backend uses to build webhook/embed links)
BACKEND_API_ENDPOINT=https://$SERVER_IP
# Public URL browsers use to fetch objects from MinIO (proxied by nginx)
MINIO_PUBLIC_ENDPOINT=https://$SERVER_IP
# TURN Server Configuration (time-limited credentials via TURN REST API)
TURN_HOST=$SERVER_IP
TURN_SECRET=$TURN_SECRET

View file

@ -1,5 +1,6 @@
"use client";
import { MCPSection } from "@/components/MCPSection";
import { TelemetrySection } from "@/components/TelemetrySection";
import {
Card,
@ -20,6 +21,19 @@ export default function SettingsPage() {
</p>
</div>
<Card>
<CardHeader>
<CardTitle>MCP Server</CardTitle>
<CardDescription>
Let AI agents access your Dograh workspace and documentation via
the Model Context Protocol.
</CardDescription>
</CardHeader>
<CardContent>
<MCPSection />
</CardContent>
</Card>
<Card>
<CardHeader>
<CardTitle>Telemetry</CardTitle>

View file

@ -0,0 +1,138 @@
"use client";
import { Check, Copy } from "lucide-react";
import Link from "next/link";
import { useState } from "react";
import { Button } from "@/components/ui/button";
import { Label } from "@/components/ui/label";
export function MCPSection() {
const backendUrl =
process.env.NEXT_PUBLIC_BACKEND_URL ||
(typeof window !== "undefined" ? window.location.origin : "");
const endpoint = `${backendUrl}/api/v1/mcp/`;
const clientConfig = JSON.stringify(
{
mcpServers: {
dograh: {
url: endpoint,
headers: { "X-API-Key": "YOUR_API_KEY" },
},
},
},
null,
2,
);
const claudeCliCommand = `claude mcp add --transport http dograh ${endpoint} \\
--header "X-API-Key: YOUR_API_KEY"`;
const [endpointCopied, setEndpointCopied] = useState(false);
const [configCopied, setConfigCopied] = useState(false);
const [cliCopied, setCliCopied] = useState(false);
const handleCopy = async (
value: string,
setter: (v: boolean) => void,
) => {
await navigator.clipboard.writeText(value);
setter(true);
setTimeout(() => setter(false), 2000);
};
return (
<div className="grid gap-6">
<div className="grid gap-2">
<Label>MCP Endpoint</Label>
<p className="text-xs text-muted-foreground">
Connect an AI agent (Claude Desktop, Cursor, etc.) to this URL over
Streamable HTTP. Requires an API key in the X-API-Key header.{" "}
<Link
href="/api-keys"
target="_blank"
className="text-primary underline hover:no-underline"
>
Get your API key
</Link>
</p>
<div className="flex items-center gap-2">
<code className="text-xs break-all bg-muted px-2 py-1 rounded flex-1">
{endpoint}
</code>
<Button
variant="outline"
size="icon"
className="shrink-0"
onClick={() => handleCopy(endpoint, setEndpointCopied)}
>
{endpointCopied ? (
<Check className="h-4 w-4" />
) : (
<Copy className="h-4 w-4" />
)}
</Button>
</div>
</div>
<div className="grid gap-2">
<Label>Claude Code CLI</Label>
<p className="text-xs text-muted-foreground">
Run this in your terminal to register Dograh as an MCP server with
Claude Code.
</p>
<div className="relative">
<pre className="text-xs bg-muted px-3 py-2 pr-12 rounded overflow-x-auto whitespace-pre-wrap">
{claudeCliCommand}
</pre>
<Button
variant="outline"
size="icon"
className="absolute top-2 right-2"
onClick={() => handleCopy(claudeCliCommand, setCliCopied)}
>
{cliCopied ? (
<Check className="h-4 w-4" />
) : (
<Copy className="h-4 w-4" />
)}
</Button>
</div>
</div>
<div className="grid gap-2">
<Label>Client Configuration</Label>
<p className="text-xs text-muted-foreground">
Paste this into your MCP client&apos;s config file (e.g. Claude
Desktop&apos;s{" "}
<code className="rounded bg-muted px-1 py-0.5 font-mono text-[11px]">
claude_desktop_config.json
</code>
) and replace{" "}
<code className="rounded bg-muted px-1 py-0.5 font-mono text-[11px]">
YOUR_API_KEY
</code>
.
</p>
<div className="relative">
<pre className="text-xs bg-muted px-3 py-2 pr-12 rounded overflow-x-auto whitespace-pre-wrap">
{clientConfig}
</pre>
<Button
variant="outline"
size="icon"
className="absolute top-2 right-2"
onClick={() => handleCopy(clientConfig, setConfigCopied)}
>
{configCopied ? (
<Check className="h-4 w-4" />
) : (
<Copy className="h-4 w-4" />
)}
</Button>
</div>
</div>
</div>
);
}