Merge remote-tracking branch 'origin/main' into pr-333-fix-webRTC-LAN

This commit is contained in:
Abhishek Kumar 2026-05-20 19:10:56 +05:30
commit 777b0596f7
8 changed files with 1248 additions and 28 deletions

View file

@ -7,6 +7,14 @@ handling, hard constraints). Design-level per-field guidance belongs in
each `PropertySpec.llm_hint`; it flows out through `get_node_type` and
doesn't need to be repeated here.
Tool names, parameters, and per-tool `error_code` values are NOT
authoritative here they reach the model dynamically via `tools/list`
from each tool's own signature and docstring. Reference tools by bare
name and describe orchestration; do not restate signatures (they drift)
or re-enumerate error codes (document those on the tool itself).
`test_mcp_instructions_drift.py` fails if this guide names a tool that
is not registered, or if a tool's error codes aren't in its docstring.
Extend based on real LLM failures every bullet below ideally maps to a
mistake the system has seen at least once.
"""
@ -16,18 +24,23 @@ You build and edit Dograh voice-AI workflows by emitting TypeScript that uses th
## Call order
### Reading documentation
1. `search_docs` use first for keyword or acronym lookup when the user is asking how Dograh works or how to configure something.
2. `read_doc` fetch the full page once one result looks likely. Prefer this over reasoning from search summaries alone.
3. `list_docs` use when the user wants to browse a topic area or when search terms are too vague. Call it with no arguments for the top-level sections; returned section paths feed back into `list_docs`, returned page paths feed into `read_doc`.
### Editing an existing workflow
1. `list_workflows` locate the target workflow.
2. `get_workflow_code(workflow_id)` fetch the current source.
3. (optional) `list_node_types` / `get_node_type(name)` consult before adding or editing a node type whose fields aren't already visible in the current code.
2. `get_workflow_code` fetch the current source for that workflow.
3. (optional) `list_node_types` / `get_node_type` consult before adding or editing a node type whose fields aren't already visible in the current code.
4. Mutate the code in place. Preserve existing nodes, edges, and variable names unless the task requires removing or renaming them.
5. `save_workflow(workflow_id, code)` persist as a new draft. The published version is untouched.
5. `save_workflow` persist as a new draft. The published version is untouched.
### Creating a new workflow
1. Create a simple 1-node workflow with only `startCall`. The user can iteratively add complexity by editing it.
2. `list_node_types` / `get_node_type(name)` consult to learn the fields available on the node types you intend to use.
2. `list_node_types` / `get_node_type` consult to learn the fields available on the node types you intend to use.
3. Author SDK TypeScript from scratch. The `new Workflow({ name: "..." })` call is required `name` becomes the workflow's display name.
4. `create_workflow(code)` persists a new workflow as version 1 (published). Returns the new `workflow_id`. For subsequent edits use `save_workflow(workflow_id, code)` (which writes a draft).
4. `create_workflow` persists a new workflow as version 1 (published). Returns the new `workflow_id`. For subsequent edits use `save_workflow` (which writes a draft).
## Allowed source shape
@ -68,14 +81,7 @@ Example:
## Iterating on errors
`save_workflow` and `create_workflow` return one of:
- `parse_error` Disallowed construct (see grammar above) or malformed TypeScript.
- `validation_error` Node data failed spec validation (unknown field, missing required, wrong type, bad `options` value).
- `graph_validation` Structural rule broken (missing startCall, unreachable node, edge to/from wrong node type).
- `missing_name` (`create_workflow` only) `new Workflow({ name })` is absent or empty.
- `bridge_error` Internal; retry once, then surface to the user.
Every error carries `line` and `column`. Fix at that location and resubmit the **complete source** this tool does not accept patches.
A failed `save_workflow` / `create_workflow` returns a result with `saved`/`created` set to false, a machine-readable `error_code`, and a human-readable `error` message carrying `line` and `column` when the problem is locatable in your source. The full set of `error_code` values and their meanings is documented on each tool (visible in its description). Read the `error` message, fix at the reported location, and resubmit the **complete source** these tools do not accept patches. If a failure looks internal or transient rather than a problem with your code, retry once before surfacing it to the user.
## Field conventions

View file

@ -1,4 +1,5 @@
from fastmcp import FastMCP
from mcp.types import ToolAnnotations
from api.mcp_server.instructions import DOGRAH_MCP_INSTRUCTIONS
from api.mcp_server.tools.catalog import (
@ -8,6 +9,7 @@ from api.mcp_server.tools.catalog import (
list_tools,
)
from api.mcp_server.tools.create_workflow import create_workflow
from api.mcp_server.tools.docs_search import list_docs, read_doc, search_docs
from api.mcp_server.tools.get_workflow_code import get_workflow_code
from api.mcp_server.tools.node_types import get_node_type, list_node_types
from api.mcp_server.tools.save_workflow import save_workflow
@ -29,3 +31,13 @@ for _tool in (
save_workflow,
):
mcp.tool(_tool)
_DOCS_TOOL_ANNOTATIONS = ToolAnnotations(
readOnlyHint=True,
idempotentHint=True,
destructiveHint=False,
openWorldHint=False,
)
for _tool in (list_docs, read_doc, search_docs):
mcp.tool(_tool, annotations=_DOCS_TOOL_ANNOTATIONS)

View file

@ -12,10 +12,10 @@ Execution flow mirrors `save_workflow`:
4. Persist via `db_client.create_workflow` workflow row + v1
published definition in a single transaction.
Error codes surfaced to the LLM match `save_workflow`. An additional
`missing_name` error is returned when the source omits
`new Workflow({ name: "..." })` the name is required and there is no
prior workflow to fall back to.
Each failure path returns an `error_code` via `_error_result`. Those
codes and their meanings are documented in the `create_workflow`
docstring (the description shipped to the LLM via `tools/list`); keep the
two in sync `test_mcp_instructions_drift.py` enforces it.
"""
from __future__ import annotations
@ -86,6 +86,22 @@ async def create_workflow(code: str) -> dict[str, Any]:
On success the new workflow is published as version 1. Use
`save_workflow(workflow_id, code)` for subsequent edits those go to
a draft.
On failure the result has `created: false`, a machine-readable
`error_code`, and a human-readable `error` (with file:line:column
where the problem is locatable). Resubmit the full corrected source
patches are not accepted. Possible `error_code` values:
- `parse_error` disallowed construct or malformed TypeScript.
- `validation_error` node data failed spec validation (unknown
field, missing required, wrong type, option out of range).
- `schema_validation` wire-format (DTO) rejection; rare.
- `graph_validation` structural rule broken (e.g. no start node,
unreachable node, edge to/from the wrong node type).
- `missing_name` `new Workflow({ name })` is absent or empty; the
name is required and there is no prior workflow to fall back to.
- `trigger_path_conflict` a trigger node's path is already used by
another workflow in this organization; rename it and resubmit.
- `bridge_error` internal/transient; retry once, then surface it.
"""
user = await authenticate_mcp_request()

View file

@ -0,0 +1,704 @@
"""MCP docs discovery tools over the Mintlify docs tree.
The docs surface is intentionally split into three steps:
- ``list_docs`` for lightweight navigation over the published hierarchy
- ``search_docs`` for keyword lookup across the visible docs catalog
- ``read_doc`` for the full content of one chosen page (or one section)
The runtime index is derived from ``docs/docs.json`` plus the referenced
``.mdx``/``.md`` files. That keeps navigation, ordering, and visibility in
sync with the published docs rather than indexing every file under ``docs/``.
"""
from __future__ import annotations
import json
import os
import re
from collections import Counter
from dataclasses import dataclass, replace
from functools import lru_cache
from pathlib import Path
from typing import Any
import yaml
from fastapi import HTTPException
from api.mcp_server.auth import authenticate_mcp_request
from api.mcp_server.tracing import traced_tool
DOCS_SEARCH_MAX_LIMIT = 25
DOCS_LIST_MAX_DEPTH = 3
_ROOT_SECTION_PATH = "__root__"
_TOKEN_RE = re.compile(r"[A-Za-z0-9_]+")
_FRONTMATTER_RE = re.compile(r"\A---\s*\n(.*?)\n---\s*\n?", re.DOTALL)
_HEADING_RE = re.compile(r"^(#{1,6})\s+(.*?)\s*$", re.MULTILINE)
_STOPWORDS = {
"a",
"an",
"and",
"are",
"at",
"be",
"by",
"can",
"do",
"for",
"from",
"how",
"i",
"if",
"in",
"is",
"it",
"me",
"my",
"of",
"on",
"or",
"the",
"to",
"what",
"when",
"where",
"with",
"you",
"your",
}
@dataclass(frozen=True)
class DocSection:
title: str
slug: str
level: int
content: str
@dataclass(frozen=True)
class DocPage:
path: str
file_path: str
title: str
description: str
llm_hint: str
aliases: tuple[str, ...]
breadcrumb: tuple[str, ...]
content: str
sections: tuple[DocSection, ...]
order: int
def breadcrumb_text(self) -> str:
return " > ".join(self.breadcrumb)
def routing_hint(self) -> str:
return self.llm_hint or self.description
def to_catalog_dict(self, section: DocSection | None = None) -> dict:
data = {
"kind": "page",
"path": self.path,
"title": self.title,
"breadcrumb": self.breadcrumb_text(),
"llm_hint": self.routing_hint(),
}
if section is not None:
data["section_title"] = section.title
data["section_slug"] = section.slug
return _compact_dict(data)
def to_read_dict(self, section: DocSection | None = None) -> dict:
active_section = section
content = self.content
if active_section is not None:
content = active_section.content
return _compact_dict(
{
"path": self.path,
"title": self.title,
"breadcrumb": self.breadcrumb_text(),
"llm_hint": self.routing_hint(),
"section_title": active_section.title if active_section else None,
"section_slug": active_section.slug if active_section else None,
"content": content,
"sections": [
{"title": sec.title, "slug": sec.slug}
for sec in self.sections
if sec.title and sec.slug
],
}
)
@dataclass(frozen=True)
class NavSection:
path: str
title: str
breadcrumb: tuple[str, ...]
children: tuple[tuple[str, str], ...]
descendant_page_count: int = 0
def breadcrumb_text(self) -> str:
return " > ".join(self.breadcrumb)
def to_mcp_dict(self) -> dict:
hint = None
if self.descendant_page_count:
hint = f"Browse {self.descendant_page_count} docs in this section."
return _compact_dict(
{
"kind": "section",
"path": self.path,
"title": self.title,
"breadcrumb": self.breadcrumb_text(),
"llm_hint": hint,
"has_children": bool(self.children),
"child_count": len(self.children),
"page_count": self.descendant_page_count,
}
)
@dataclass(frozen=True)
class DocsIndex:
pages_by_path: dict[str, DocPage]
sections_by_path: dict[str, NavSection]
def _compact_dict(data: dict[str, Any]) -> dict[str, Any]:
return {
key: value for key, value in data.items() if value not in (None, "", [], (), {})
}
def _slugify(value: str) -> str:
slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")
return slug or "section"
def _coerce_docs_root(candidate: Path) -> Path | None:
candidate = candidate.expanduser().resolve()
if (candidate / "docs.json").is_file():
return candidate
nested = candidate / "docs"
if (nested / "docs.json").is_file():
return nested
return None
def _resolve_docs_root() -> Path | None:
"""Return the path to the on-disk docs tree, or None if not found."""
override = os.environ.get("DOGRAH_DOCS_PATH")
if override:
resolved = _coerce_docs_root(Path(override))
if resolved is not None:
return resolved
docker_default = _coerce_docs_root(Path("/app/docs"))
if docker_default is not None:
return docker_default
for parent in Path(__file__).resolve().parents:
resolved = _coerce_docs_root(parent / "docs")
if resolved is not None:
return resolved
return None
def _split_frontmatter(contents: str) -> tuple[dict[str, Any], str]:
match = _FRONTMATTER_RE.match(contents)
if not match:
return {}, contents
try:
frontmatter = yaml.safe_load(match.group(1)) or {}
except yaml.YAMLError:
return {}, contents
if not isinstance(frontmatter, dict):
frontmatter = {}
return frontmatter, contents[match.end() :].lstrip("\n")
def _strip_frontmatter(contents: str) -> str:
"""Drop the YAML frontmatter block from a docs page body."""
return _split_frontmatter(contents)[1]
def _clean_heading_text(raw: str) -> str:
text = re.sub(r"\s*\{#.*\}\s*$", "", raw.strip())
return " ".join(text.split())
def _extract_page_title(contents: str, fallback: str) -> str:
"""Pull a human-readable title for a docs page."""
frontmatter, body = _split_frontmatter(contents)
title = frontmatter.get("title")
if isinstance(title, str) and title.strip():
return title.strip()
match = _HEADING_RE.search(body)
if match:
return _clean_heading_text(match.group(2))
return fallback
def _normalize_text(value: Any) -> str:
if isinstance(value, str):
return " ".join(value.strip().split())
return ""
def _normalize_aliases(value: Any) -> tuple[str, ...]:
if isinstance(value, str):
aliases = [value]
elif isinstance(value, list):
aliases = [item for item in value if isinstance(item, str)]
else:
aliases = []
return tuple(alias.strip() for alias in aliases if alias.strip())
def _extract_sections(body: str) -> tuple[DocSection, ...]:
matches = list(_HEADING_RE.finditer(body))
stripped_body = body.strip()
if not matches:
if not stripped_body:
return ()
return (
DocSection(
title="Overview",
slug="overview",
level=1,
content=stripped_body,
),
)
sections: list[DocSection] = []
preamble = body[: matches[0].start()].strip()
if preamble:
sections.append(
DocSection(
title="Overview",
slug="overview",
level=1,
content=preamble,
)
)
for index, match in enumerate(matches):
start = match.start()
end = matches[index + 1].start() if index + 1 < len(matches) else len(body)
title = _clean_heading_text(match.group(2))
sections.append(
DocSection(
title=title or "Section",
slug=_slugify(title or "section"),
level=len(match.group(1)),
content=body[start:end].strip(),
)
)
return tuple(sections)
def _tokenize_text(text: str) -> list[str]:
return [
token
for token in _TOKEN_RE.findall(text.lower())
if len(token) >= 2 and token not in _STOPWORDS
]
def _tokenize_query(query: str) -> list[str]:
"""Split a user query into lowercased keyword terms."""
seen: set[str] = set()
terms: list[str] = []
for token in _TOKEN_RE.findall(query.lower()):
if len(token) < 2 or token in _STOPWORDS or token in seen:
continue
seen.add(token)
terms.append(token)
return terms
def _resolve_doc_file(root: Path, route_path: str) -> Path | None:
candidates = (
root / f"{route_path}.mdx",
root / f"{route_path}.md",
root / route_path / "index.mdx",
root / route_path / "index.md",
)
for candidate in candidates:
if candidate.is_file():
return candidate
return None
def _build_doc_page(
root: Path,
route_path: str,
*,
breadcrumb: tuple[str, ...],
order: int,
) -> DocPage | None:
file_path = _resolve_doc_file(root, route_path)
if file_path is None:
return None
try:
contents = file_path.read_text(encoding="utf-8")
except (OSError, UnicodeDecodeError):
return None
frontmatter, body = _split_frontmatter(contents)
fallback = route_path.rsplit("/", 1)[-1].replace("-", " ").title()
title = _extract_page_title(contents, fallback=fallback)
description = _normalize_text(frontmatter.get("description"))
llm_hint = _normalize_text(frontmatter.get("llm_hint"))
aliases = _normalize_aliases(frontmatter.get("aliases"))
content = body.strip()
return DocPage(
path=route_path,
file_path=file_path.relative_to(root).as_posix(),
title=title,
description=description,
llm_hint=llm_hint,
aliases=aliases,
breadcrumb=breadcrumb,
content=content,
sections=_extract_sections(content),
order=order,
)
def _score_counter(counter: Counter[str], term: str, *, weight: int, cap: int) -> int:
return min(counter.get(term, 0), cap) * weight
def _normalized_phrase(text: str) -> str:
return " ".join(_tokenize_text(text))
def _score_section(section: DocSection, terms: list[str]) -> int:
title_counts = Counter(_tokenize_text(section.title))
body_counts = Counter(_tokenize_text(section.content))
score = 0
matched_terms = 0
for term in terms:
term_score = _score_counter(
title_counts, term, weight=7, cap=2
) + _score_counter(body_counts, term, weight=1, cap=4)
if term_score:
matched_terms += 1
score += term_score
score += matched_terms * 4
phrase = " ".join(terms)
if phrase and phrase in _normalized_phrase(section.content):
score += 6
return score
def _score_page(page: DocPage, terms: list[str]) -> tuple[int, DocSection | None]:
if not terms:
return 0, None
path_counts = Counter(_tokenize_text(page.path))
title_counts = Counter(_tokenize_text(page.title))
breadcrumb_counts = Counter(_tokenize_text(" ".join(page.breadcrumb)))
hint_counts = Counter(_tokenize_text(page.routing_hint()))
alias_counts = Counter(_tokenize_text(" ".join(page.aliases)))
score = 0
matched_terms = 0
for term in terms:
term_score = (
_score_counter(path_counts, term, weight=6, cap=3)
+ _score_counter(title_counts, term, weight=10, cap=2)
+ _score_counter(breadcrumb_counts, term, weight=4, cap=2)
+ _score_counter(hint_counts, term, weight=7, cap=3)
+ _score_counter(alias_counts, term, weight=7, cap=3)
)
if term_score:
matched_terms += 1
score += term_score
best_section = None
best_section_score = 0
for section in page.sections:
section_score = _score_section(section, terms)
if section_score > best_section_score:
best_section = section
best_section_score = section_score
if score == 0 and best_section_score == 0:
return 0, None
score += matched_terms * 8 + best_section_score
phrase = " ".join(terms)
if phrase:
if phrase in _normalized_phrase(page.title):
score += 12
elif phrase in _normalized_phrase(page.routing_hint()):
score += 8
elif phrase in _normalized_phrase(page.path):
score += 8
elif best_section is not None and phrase in _normalized_phrase(
best_section.content
):
score += 4
return score, best_section
def _set_descendant_counts(
sections_by_path: dict[str, NavSection],
section_path: str,
) -> int:
section = sections_by_path[section_path]
page_count = 0
for child_kind, child_path in section.children:
if child_kind == "page":
page_count += 1
else:
page_count += _set_descendant_counts(sections_by_path, child_path)
sections_by_path[section_path] = replace(section, descendant_page_count=page_count)
return page_count
@lru_cache(maxsize=1)
def _docs_index() -> DocsIndex:
root = _resolve_docs_root()
if root is None:
return DocsIndex(pages_by_path={}, sections_by_path={})
try:
docs_config = json.loads((root / "docs.json").read_text(encoding="utf-8"))
except (OSError, UnicodeDecodeError, json.JSONDecodeError):
return DocsIndex(pages_by_path={}, sections_by_path={})
pages_by_path: dict[str, DocPage] = {}
sections_by_path: dict[str, NavSection] = {}
page_order = 0
def ensure_unique_section_path(base_path: str) -> str:
if base_path not in sections_by_path:
return base_path
suffix = 2
while f"{base_path}-{suffix}" in sections_by_path:
suffix += 1
return f"{base_path}-{suffix}"
def walk_pages(
items: list[Any],
*,
section_path: str,
section_title: str,
ancestor_breadcrumb: tuple[str, ...],
) -> None:
nonlocal page_order
children: list[tuple[str, str]] = []
page_breadcrumb = ancestor_breadcrumb + (section_title,)
for item in items:
if isinstance(item, str):
route_path = item.strip("/")
if not route_path:
continue
if route_path not in pages_by_path:
page = _build_doc_page(
root,
route_path,
breadcrumb=page_breadcrumb,
order=page_order,
)
if page is not None:
pages_by_path[route_path] = page
page_order += 1
if route_path in pages_by_path:
children.append(("page", route_path))
continue
if not isinstance(item, dict):
continue
group_title = str(item.get("group", "")).strip()
nested_pages = item.get("pages")
if not group_title or not isinstance(nested_pages, list):
continue
child_path = ensure_unique_section_path(
f"{section_path}/{_slugify(group_title)}"
)
walk_pages(
nested_pages,
section_path=child_path,
section_title=group_title,
ancestor_breadcrumb=page_breadcrumb,
)
children.append(("section", child_path))
sections_by_path[section_path] = NavSection(
path=section_path,
title=section_title,
breadcrumb=ancestor_breadcrumb,
children=tuple(children),
)
root_children: list[tuple[str, str]] = []
tabs = docs_config.get("navigation", {}).get("tabs", [])
for tab in tabs:
if not isinstance(tab, dict):
continue
tab_title = str(tab.get("tab", "")).strip() or "Docs"
for group in tab.get("groups", []):
if not isinstance(group, dict):
continue
group_title = str(group.get("group", "")).strip()
group_pages = group.get("pages")
if not group_title or not isinstance(group_pages, list):
continue
top_level_path = ensure_unique_section_path(
f"{_slugify(tab_title)}/{_slugify(group_title)}"
)
walk_pages(
group_pages,
section_path=top_level_path,
section_title=group_title,
ancestor_breadcrumb=(tab_title,),
)
root_children.append(("section", top_level_path))
sections_by_path[_ROOT_SECTION_PATH] = NavSection(
path=_ROOT_SECTION_PATH,
title="Docs",
breadcrumb=(),
children=tuple(root_children),
)
_set_descendant_counts(sections_by_path, _ROOT_SECTION_PATH)
return DocsIndex(pages_by_path=pages_by_path, sections_by_path=sections_by_path)
def _get_page_or_404(path: str) -> DocPage:
page = _docs_index().pages_by_path.get(path.strip("/"))
if page is None:
raise HTTPException(status_code=404, detail=f"Unknown docs page: {path!r}")
return page
def _find_section(page: DocPage, section: str) -> DocSection | None:
target = section.strip().lower()
for candidate in page.sections:
if candidate.slug.lower() == target or candidate.title.lower() == target:
return candidate
return None
def _expand_nav_entries(
index: DocsIndex,
section_path: str,
depth: int,
) -> list[dict]:
section = index.sections_by_path[section_path]
results: list[dict] = []
for child_kind, child_path in section.children:
if child_kind == "section":
child_section = index.sections_by_path[child_path]
results.append(child_section.to_mcp_dict())
if depth > 1:
results.extend(_expand_nav_entries(index, child_path, depth - 1))
else:
results.append(index.pages_by_path[child_path].to_catalog_dict())
return results
@traced_tool
async def list_docs(path: str | None = None, depth: int = 1) -> list[dict]:
"""Browse the Dograh docs hierarchy before reading a page in full.
``path`` addresses navigation sections exposed by this tool. Page paths
returned by ``search_docs`` and ``read_doc`` are the published docs routes
instead, for example ``voice-agent/tools/mcp-tool``.
"""
await authenticate_mcp_request()
if depth < 1 or depth > DOCS_LIST_MAX_DEPTH:
raise ValueError(f"`depth` must be between 1 and {DOCS_LIST_MAX_DEPTH}.")
index = _docs_index()
if not index.sections_by_path:
return []
if path is None:
return _expand_nav_entries(index, _ROOT_SECTION_PATH, depth)
normalized = path.strip("/")
if normalized in index.sections_by_path:
return _expand_nav_entries(index, normalized, depth)
if normalized in index.pages_by_path:
return [index.pages_by_path[normalized].to_catalog_dict()]
raise HTTPException(status_code=404, detail=f"Unknown docs section: {path!r}")
@traced_tool
async def read_doc(path: str, section: str | None = None) -> dict:
"""Read one docs page after you have narrowed to a likely match."""
await authenticate_mcp_request()
if not isinstance(path, str) or not path.strip():
raise ValueError("`path` must be a non-empty string.")
page = _get_page_or_404(path)
active_section = None
if section is not None:
active_section = _find_section(page, section)
if active_section is None:
raise HTTPException(
status_code=404,
detail=f"Unknown section {section!r} for docs page {path!r}",
)
return page.to_read_dict(section=active_section)
@traced_tool
async def search_docs(query: str, limit: int = 5) -> list[dict]:
"""Search the Dograh documentation and return a lean ranked shortlist.
Use this first for keyword or acronym lookup. Once the right page looks
likely, call ``read_doc(path)`` instead of reasoning from summaries alone.
"""
await authenticate_mcp_request()
if not isinstance(query, str) or not query.strip():
raise ValueError("`query` must be a non-empty string.")
if limit < 1:
raise ValueError("`limit` must be at least 1.")
terms = _tokenize_query(query)
if not terms:
raise ValueError(
"`query` must contain at least one non-stopword alphanumeric term."
)
index = _docs_index()
if not index.pages_by_path:
return []
capped_limit = min(limit, DOCS_SEARCH_MAX_LIMIT)
ranked: list[tuple[int, int, DocPage, DocSection | None]] = []
for page in index.pages_by_path.values():
score, best_section = _score_page(page, terms)
if score <= 0:
continue
ranked.append((score, page.order, page, best_section))
ranked.sort(key=lambda item: (-item[0], item[1], item[2].path))
return [
page.to_catalog_dict(section=best_section)
for _, _, page, best_section in ranked[:capped_limit]
]

View file

@ -10,16 +10,12 @@ Execution flow:
4. Save as a new draft via `db_client.save_workflow_draft` the
published version stays intact, so edits are rollback-safe.
Error codes surfaced to the LLM:
parse_error TS parse failed or a disallowed construct was used
validation_error node data failed spec validation (unknown field,
missing required, wrong type, option out of range)
schema_validation ReactFlowDTO Pydantic rejection (rare; parser bug)
graph_validation semantic graph rule broken (e.g. no start node)
bridge_error Node subprocess failed before returning JSON
All LLM-facing errors include file:line:column where available so the
LLM can correct its code directly.
Each failure path returns an `error_code` via `_error_result`. Those
codes and their meanings are documented in the `save_workflow` docstring
(the description shipped to the LLM via `tools/list`); keep the two in
sync `test_mcp_instructions_drift.py` enforces it. All LLM-facing
errors include file:line:column where available so the LLM can correct
its code directly.
"""
from __future__ import annotations
@ -91,6 +87,18 @@ async def save_workflow(workflow_id: int, code: str) -> dict[str, Any]:
On success the draft version is saved; the published version is
untouched.
On failure the result has `saved: false`, a machine-readable
`error_code`, and a human-readable `error` (with file:line:column
where the problem is locatable). Resubmit the full corrected source
patches are not accepted. Possible `error_code` values:
- `parse_error` disallowed construct or malformed TypeScript.
- `validation_error` node data failed spec validation (unknown
field, missing required, wrong type, option out of range).
- `schema_validation` wire-format (DTO) rejection; rare.
- `graph_validation` structural rule broken (e.g. no start node,
unreachable node, edge to/from the wrong node type).
- `bridge_error` internal/transient; retry once, then surface it.
"""
user = await authenticate_mcp_request()

View file

@ -1,6 +1,6 @@
import re
from collections import Counter
from typing import Any, Dict, List, Set
from typing import Dict, List, Set
from api.services.workflow.dto import EdgeDataDTO, NodeType, ReactFlowDTO
from api.services.workflow.errors import ItemKind, WorkflowError

View file

@ -0,0 +1,359 @@
"""Unit tests for the MCP docs discovery tools."""
from __future__ import annotations
import os
from pathlib import Path
from unittest.mock import AsyncMock, patch
import pytest
from fastapi import HTTPException
from api.mcp_server.tools import docs_search as docs_search_module
from api.mcp_server.tools.docs_search import (
_docs_index,
_extract_page_title,
_resolve_docs_root,
_score_page,
_strip_frontmatter,
_tokenize_query,
list_docs,
read_doc,
search_docs,
)
def _clear_docs_caches() -> None:
docs_search_module._docs_index.cache_clear()
@pytest.fixture
def fake_docs_root(tmp_path: Path) -> Path:
docs_root = tmp_path / "docs"
docs_root.mkdir()
(docs_root / "getting-started").mkdir()
(docs_root / "getting-started" / "index.mdx").write_text(
"---\n"
'title: "Getting started"\n'
'description: "Start using Dograh."\n'
"---\n\n"
"# Getting started\n\n"
"Welcome to Dograh.\n",
encoding="utf-8",
)
(docs_root / "voice-agent").mkdir()
(docs_root / "voice-agent" / "introduction.mdx").write_text(
"---\n"
'title: "Voice Agent Builder"\n'
'description: "Build conversational workflows."\n'
"---\n\n"
"# Voice Agent Builder\n\n"
"Build workflows with nodes and tools.\n",
encoding="utf-8",
)
(docs_root / "voice-agent" / "tools").mkdir()
(docs_root / "voice-agent" / "tools" / "mcp-tool.mdx").write_text(
"---\n"
'title: "MCP Tool"\n'
'description: "Connect external MCP servers."\n'
'llm_hint: "Use for MCP server setup, remote tools, or model context protocol questions."\n'
"aliases:\n"
' - "model context protocol"\n'
"---\n\n"
"# MCP Tool\n\n"
"Connect an external MCP server to your voice agent.\n\n"
"## Authentication\n\n"
"Provide the MCP endpoint URL and headers.\n",
encoding="utf-8",
)
(docs_root / "deployment").mkdir()
(docs_root / "deployment" / "docker.mdx").write_text(
"---\n"
'title: "Docker"\n'
'description: "Deploy Dograh with Docker."\n'
'llm_hint: "Use for Docker deployment, local setup, remote setup, TURN server, coturn, or WebRTC connectivity questions."\n'
"aliases:\n"
' - "coturn"\n'
' - "turn server"\n'
"---\n\n"
"# Docker\n\n"
"Run Dograh with Docker.\n\n"
"## Troubleshooting WebRTC Connectivity\n\n"
"If audio fails or ICE fails, configure a TURN server. Coturn is the recommended choice.\n",
encoding="utf-8",
)
# Hidden/orphaned docs page: present on disk but not in docs.json, so it
# must not be indexed by the MCP tools.
(docs_root / "internal-only.mdx").write_text(
"---\n"
'title: "Internal TURN Notes"\n'
"---\n\n"
"# Internal TURN Notes\n\n"
"This page mentions zyxinternalturntoken but is not user-facing.\n",
encoding="utf-8",
)
(docs_root / "AGENTS.md").write_text("# Internal instructions\n", encoding="utf-8")
(docs_root / "docs.json").write_text(
"""{
"navigation": {
"tabs": [
{
"tab": "Guides",
"groups": [
{
"group": "Getting started",
"pages": [
"getting-started/index"
]
},
{
"group": "Voice Agent Builder",
"pages": [
"voice-agent/introduction",
{
"group": "Tools",
"pages": [
"voice-agent/tools/mcp-tool"
]
}
]
}
]
},
{
"tab": "Developer",
"groups": [
{
"group": "Deployment",
"pages": [
"deployment/docker"
]
}
]
}
]
}
}
""",
encoding="utf-8",
)
_clear_docs_caches()
with patch.dict(os.environ, {"DOGRAH_DOCS_PATH": str(docs_root)}):
yield docs_root
_clear_docs_caches()
@pytest.fixture
def authed_user():
class _FakeUser:
selected_organization_id = 1
id = 42
with patch(
"api.mcp_server.tools.docs_search.authenticate_mcp_request",
new=AsyncMock(return_value=_FakeUser()),
):
yield _FakeUser()
def test_tokenize_query_dedupes_and_drops_stopwords():
assert _tokenize_query("How do I configure a TURN server TURN?") == [
"configure",
"turn",
"server",
]
def test_tokenize_query_empty_input_returns_empty():
assert _tokenize_query("") == []
assert _tokenize_query("?? // !!") == []
def test_strip_frontmatter_removes_yaml_block():
body = '---\ntitle: "X"\n---\n\n# Heading\n'
assert _strip_frontmatter(body).startswith("# Heading")
def test_extract_page_title_prefers_frontmatter():
body = '---\ntitle: "Front Title"\n---\n\n# Heading Title\n'
assert _extract_page_title(body, fallback="x.mdx") == "Front Title"
def test_extract_page_title_falls_back_to_first_heading():
body = "# Heading Title\nbody\n"
assert _extract_page_title(body, fallback="x.mdx") == "Heading Title"
def test_score_page_uses_llm_hint_and_aliases():
page = docs_search_module.DocPage(
path="deployment/docker",
file_path="deployment/docker.mdx",
title="Docker",
description="Deploy Dograh with Docker.",
llm_hint="Use for TURN server and coturn setup.",
aliases=("coturn",),
breadcrumb=("Developer", "Deployment"),
content="Docker deployment.",
sections=(
docs_search_module.DocSection(
title="Troubleshooting WebRTC Connectivity",
slug="troubleshooting-webrtc-connectivity",
level=2,
content="Configure a TURN server with coturn.",
),
),
order=0,
)
score, section = _score_page(page, ["coturn"])
assert score > 0
assert section is not None
assert section.slug == "troubleshooting-webrtc-connectivity"
def test_resolve_docs_root_honors_env_override(tmp_path: Path):
docs = tmp_path / "custom_docs"
docs.mkdir()
(docs / "docs.json").write_text("{}", encoding="utf-8")
with patch.dict(os.environ, {"DOGRAH_DOCS_PATH": str(docs)}):
assert _resolve_docs_root() == docs.resolve()
@pytest.mark.asyncio
async def test_search_docs_ranks_turn_doc_and_uses_route_path(
fake_docs_root, authed_user
):
results = await search_docs("How do I configure coturn for WebRTC?")
assert results
assert results[0]["path"] == "deployment/docker"
assert results[0]["section_slug"] == "troubleshooting-webrtc-connectivity"
assert "TURN server" in results[0]["llm_hint"]
assert "snippet" not in results[0]
assert "score" not in results[0]
assert "url" not in results[0]
@pytest.mark.asyncio
async def test_search_docs_indexes_only_docs_json_pages(fake_docs_root, authed_user):
results = await search_docs("zyxinternalturntoken")
assert results == []
@pytest.mark.asyncio
async def test_search_docs_respects_limit(fake_docs_root, authed_user):
results = await search_docs("dograh", limit=1)
assert len(results) == 1
@pytest.mark.asyncio
async def test_search_docs_returns_empty_when_no_match(fake_docs_root, authed_user):
assert await search_docs("xyzzy unrelated zzz") == []
@pytest.mark.asyncio
async def test_search_docs_returns_empty_when_no_corpus(
tmp_path, authed_user, monkeypatch
):
nonexistent = tmp_path / "no-docs-here"
monkeypatch.setenv("DOGRAH_DOCS_PATH", str(nonexistent))
_clear_docs_caches()
with patch(
"api.mcp_server.tools.docs_search._resolve_docs_root", return_value=None
):
assert await search_docs("anything") == []
@pytest.mark.asyncio
async def test_search_docs_rejects_empty_query(fake_docs_root, authed_user):
with pytest.raises(ValueError, match="non-empty string"):
await search_docs("")
@pytest.mark.asyncio
async def test_search_docs_rejects_query_with_only_stopwords(
fake_docs_root, authed_user
):
with pytest.raises(ValueError, match="non-stopword"):
await search_docs("how do I")
@pytest.mark.asyncio
async def test_search_docs_rejects_zero_limit(fake_docs_root, authed_user):
with pytest.raises(ValueError, match="at least 1"):
await search_docs("Dograh", limit=0)
@pytest.mark.asyncio
async def test_list_docs_returns_top_level_sections(fake_docs_root, authed_user):
results = await list_docs()
assert results[0]["kind"] == "section"
assert results[0]["path"] == "guides/getting-started"
assert results[1]["path"] == "guides/voice-agent-builder"
@pytest.mark.asyncio
async def test_list_docs_depth_expands_children(fake_docs_root, authed_user):
results = await list_docs("guides/voice-agent-builder", depth=2)
paths = [item["path"] for item in results]
assert "voice-agent/introduction" in paths
assert "guides/voice-agent-builder/tools" in paths
assert "voice-agent/tools/mcp-tool" in paths
@pytest.mark.asyncio
async def test_list_docs_rejects_unknown_section(fake_docs_root, authed_user):
with pytest.raises(HTTPException, match="Unknown docs section"):
await list_docs("nope")
@pytest.mark.asyncio
async def test_read_doc_returns_full_page_and_sections(fake_docs_root, authed_user):
result = await read_doc("deployment/docker")
assert result["path"] == "deployment/docker"
assert result["title"] == "Docker"
assert "url" not in result
section_slugs = [section["slug"] for section in result["sections"]]
assert "docker" in section_slugs
assert "troubleshooting-webrtc-connectivity" in section_slugs
assert "Coturn" in result["content"] or "coturn" in result["content"].lower()
@pytest.mark.asyncio
async def test_read_doc_can_target_section(fake_docs_root, authed_user):
result = await read_doc(
"deployment/docker",
section="troubleshooting-webrtc-connectivity",
)
assert result["section_slug"] == "troubleshooting-webrtc-connectivity"
assert "ICE fails" in result["content"] or "TURN server" in result["content"]
assert "Run Dograh with Docker." not in result["content"]
@pytest.mark.asyncio
async def test_read_doc_rejects_unknown_page(fake_docs_root, authed_user):
with pytest.raises(HTTPException, match="Unknown docs page"):
await read_doc("missing/page")
@pytest.mark.asyncio
async def test_read_doc_rejects_unknown_section(fake_docs_root, authed_user):
with pytest.raises(HTTPException, match="Unknown section"):
await read_doc("deployment/docker", section="missing-section")
def test_docs_index_uses_docs_json_navigation(fake_docs_root):
index = _docs_index()
assert "internal-only" not in index.pages_by_path
assert "guides/voice-agent-builder/tools" in index.sections_by_path
assert index.pages_by_path["voice-agent/tools/mcp-tool"].breadcrumb == (
"Guides",
"Voice Agent Builder",
"Tools",
)

View file

@ -0,0 +1,115 @@
"""Drift guards between the static MCP guide and the live tool surface.
`api/mcp_server/instructions.py` is free text baked into the client
system prompt. It is *not* the authoritative description of the tools
names, signatures, and per-tool error codes reach the model dynamically
via `tools/list`, derived from each tool's own function signature and
docstring. These tests fail on the two classic drift modes:
1. The guide references a tool that is no longer registered (renamed or
removed) the model would be told to call something that 404s.
2. A tool returns an `error_code` that is absent from the description it
ships via `tools/list` the model can't learn to recover from it.
Keep the guide about orchestration (call order, hard constraints) and let
the tools describe themselves.
"""
from __future__ import annotations
import re
from pathlib import Path
import pytest
from api.mcp_server import instructions as instructions_module
from api.mcp_server.server import mcp
from api.mcp_server.tools import create_workflow as create_workflow_module
from api.mcp_server.tools import save_workflow as save_workflow_module
# Every registered MCP tool name starts with one of these verbs. A
# backticked snake_case token in the guide whose leading word is a verb is
# treated as a tool reference; field/reference names like `tool_refs`,
# `credential_ref`, or `pre_call_fetch` don't start with a verb and are
# correctly ignored. Extend this only when a new tool introduces a new
# leading verb (a missing verb under-checks, it never false-fails).
_TOOL_VERB_PREFIXES = frozenset(
{
"search",
"read",
"list",
"get",
"save",
"create",
"update",
"delete",
"add",
"remove",
"set",
}
)
# A backtick immediately followed by a snake_case identifier (>= 1
# underscore). Anchoring on the opening backtick captures the leading
# identifier of a code span whether it is bare (`read_doc`) or a call
# (`read_doc(path)`), while skipping DSL constructs like `wf.edge` or
# `new Workflow` whose first char after the backtick isn't `[a-z_]`.
_BACKTICKED_SNAKE_RE = re.compile(r"`([a-z][a-z0-9]*(?:_[a-z0-9]+)+)")
# Error codes are emitted as the first string arg to `_error_result(...)`.
_ERROR_RESULT_LITERAL_RE = re.compile(r'_error_result\(\s*"([a-z_]+)"')
# `parse_error` / `validation_error` are picked by a `code_key` ternary
# rather than passed as a literal to `_error_result`, so match them too.
_CODE_KEY_LITERAL_RE = re.compile(r'"(parse_error|validation_error)"')
def _referenced_tool_names(text: str) -> set[str]:
return {
token
for token in _BACKTICKED_SNAKE_RE.findall(text)
if token.split("_", 1)[0] in _TOOL_VERB_PREFIXES
}
def _returned_error_codes(module) -> set[str]:
source = Path(module.__file__).read_text(encoding="utf-8")
return set(_ERROR_RESULT_LITERAL_RE.findall(source)) | set(
_CODE_KEY_LITERAL_RE.findall(source)
)
@pytest.mark.asyncio
async def test_guide_only_references_registered_tools():
registered = {tool.name for tool in await mcp.list_tools()}
referenced = _referenced_tool_names(instructions_module.DOGRAH_MCP_INSTRUCTIONS)
assert referenced, "no tool references extracted — the regex likely broke"
unknown = sorted(referenced - registered)
assert not unknown, (
f"instructions.py references tools that are not registered: {unknown}. "
f"Rename/remove the reference or register the tool. "
f"Registered tools: {sorted(registered)}."
)
@pytest.mark.asyncio
@pytest.mark.parametrize(
"tool_name, module",
[
("save_workflow", save_workflow_module),
("create_workflow", create_workflow_module),
],
)
async def test_tool_documents_every_error_code_it_returns(tool_name, module):
descriptions = {
tool.name: tool.description or "" for tool in await mcp.list_tools()
}
description = descriptions[tool_name]
returned = _returned_error_codes(module)
assert returned, f"no error codes detected in {tool_name} source — regex broke"
undocumented = sorted(code for code in returned if code not in description)
assert not undocumented, (
f"{tool_name} returns error_code(s) {undocumented} absent from the description "
f"shipped via tools/list. Document them in the {tool_name} docstring."
)