fix(filesystem): enforce pifs shell command limits

This commit is contained in:
BukeLy 2026-05-26 20:27:40 +08:00
parent cb9db0bab9
commit 9734bf6914
8 changed files with 780 additions and 29 deletions

View file

@ -43,6 +43,10 @@ If the user asks a workspace-related topic question without naming a specific
file, treat it as a retrieval task. Use available PIFS discovery commands to
look for relevant files and inspect evidence before answering. Ask the user to
clarify only after a reasonable search cannot identify relevant evidence.
Do not conclude that no relevant document exists from one failed grep. If grep
returns no matches for a workspace topic, verify with available semantic
candidate discovery such as search-summary, or inspect likely document
structure, before saying that the workspace lacks evidence.
Follow the task prompt for command policy, retrieval strategy, and answer
format. If the caller needs stricter behavior, pass an explicit system_prompt.
@ -53,10 +57,15 @@ Run a command in the PageIndex FileSystem virtual shell. This is not a real
operating-system shell. By default the tool is read-only: use ls, tree, find,
grep, cat, stat, head, tail, sed, and any dynamically available semantic search
commands described in the workspace context. grep -R is lexical evidence search;
grep does not support regex alternation such as "a|b"; run multiple grep
commands or use search-summary for semantic candidate discovery instead.
semantic search commands such as search-summary return candidate documents and
do not guarantee literal text matches. Use search-summary when the user asks for
summary search, semantic search, or vector search and the command is listed as
available. Errors are returned as text prefixed with ERROR. Do not call
available. Quote multi-word semantic queries, for example:
search-summary "Federal Reserve" /documents. Do not write
search-summary Federal Reserve /documents. Errors are returned as text prefixed
with ERROR. Do not call
commands that are not listed as available. When evidence is required, inspect it
with cat or grep before answering. Prefer shell-like target-first cat syntax
with stable targets: cat <path> --structure, cat <path> --page 31-59, and
@ -64,8 +73,14 @@ cat <path> --node 0009. You may also use file_ref or document_id when a path is
ambiguous. After structure identifies a relevant section node, prefer
cat <path> --node <node_id>; use cat <path> --page <range> when the user asks
for page-level evidence, no suitable node exists, or exact page text is needed.
cat <path> --structure is paginated; request more with --offset if needed. Page
reads are limited to three pages at once, node reads to at most five node ids,
and text cat --all returns only the first page of text lines.
For questions about metadata fields, available summaries, or whether metadata
was provided, inspect stat --schema and stat <target> before making claims.
Do not use stat as a general content/topic discovery step. For document Q&A,
prefer search-summary/find/grep for candidates, then cat --structure and
cat --node or cat --page for evidence.
"""
AGENT_TOOL_POLICY = """
@ -76,12 +91,19 @@ Tool policy:
- Folder paths such as /documents are positional command targets; never put folder paths in --where.
- Use --where only with metadata fields shown by stat --schema.
- grep -R performs lexical evidence search.
- grep does not support regex alternation such as "a|b"; run separate grep commands or use search-summary for semantic candidate discovery.
- Semantic search commands are candidate-discovery tools and do not guarantee literal text matches.
- If search-summary is available and the user asks for summary search, semantic search, vector search, or "用 summary 搜", use search-summary <query> <folder>; do not translate that request into find --where.
- A single failed grep is not enough evidence to say there is no relevant document. If grep returns no matches for a workspace-topic question, verify with search-summary or another available semantic/vector candidate command, or inspect likely document structure, before answering no-evidence.
- If search-summary is available and the user asks for summary search, semantic search, vector search, or "用 summary 搜", use search-summary "<query>" <folder>; quote multi-word queries, for example search-summary "Federal Reserve" /documents; do not translate that request into find --where.
- Tool errors are returned as ERROR text; recover by trying an available command.
- Use cat or grep to gather evidence before making source-backed claims.
- For broad topic, method, or "what solution" questions that are likely about the workspace, search for candidate documents before asking the user to choose a document.
- Use stat only for metadata/schema/status questions or to resolve ambiguous target identity. Do not run stat merely to understand what a document says.
- Prefer target-first cat syntax with stable targets: cat <path> --structure, cat <path> --page 31-59, cat <path> --node <node_id>.
- cat <target> --structure returns at most 25 nodes; use --offset and --limit for more structure pages.
- cat <target> --page accepts at most 3 pages at once. If a larger range is needed, first inspect cat <target> --structure and then read a smaller page range or node.
- cat <target> --node accepts at most 5 node ids at once. Prefer one relevant node when possible.
- cat <target> --all returns at most 100 text lines; use cat <target> --range <start>-<end> for the next page.
- After cat <target> --structure finds a relevant section/subsection with a node_id, prefer cat <target> --node <node_id> for content from that semantic unit.
- Use cat <target> --page <start>-<end> when the user explicitly asks for pages/page ranges, when no suitable node_id exists, or when you need exact page text to verify page-level evidence.
- Avoid fetching a broad page span after a matching node is available unless page-level citation or verification is required.

View file

@ -46,6 +46,20 @@ class PIFSCommandExecutor:
"search-relation": "_cmd_search_relation",
"semantic-grep": "_cmd_semantic_grep",
}
MAX_CHAINED_COMMANDS = 3
MAX_PIPE_COMMANDS = 3
MAX_LS_LIMIT = 100
MAX_TREE_LIMIT = 200
MAX_FIND_LIMIT = 50
MAX_GREP_LIMIT = 20
MAX_SEMANTIC_LIMIT = 20
MAX_TEXT_LINES = 100
MAX_PAGE_SPAN = 3
MAX_STRUCTURE_NODES = 25
MAX_NODE_IDS = 5
MAX_NODE_TEXT_LINES = 100
MAX_NODE_TEXT_CHARS = 12_000
MAX_STAT_FIELD_TARGETS = 20
MAX_TREE_DEPTH = 4
MAX_LS_RENDER_FILES = 25
MAX_STAT_METADATA_FIELDS = 8
@ -92,8 +106,11 @@ class PIFSCommandExecutor:
"- find --where: exact/canonical metadata DSL filtering using stat --schema fields only",
"- find <folder> -maxdepth N -type f|d: bounded folder traversal for find",
"- grep -R: recursive lexical/FTS search only; semantic vector prefilter is disabled",
"- cat <path|file_ref|document_id> --structure/--node/--page: cached PageIndex reads for PDF/Markdown files",
"- cat <path|file_ref|document_id> --all: full text artifact reads for txt/text files",
"- cat <path|file_ref|document_id> --structure: cached PageIndex node list, paginated at 25 nodes",
"- cat <path|file_ref|document_id> --page: cached PageIndex page reads, limited to 3 pages",
"- cat <path|file_ref|document_id> --node: cached PageIndex node reads, limited to 5 node ids",
"- cat <path|file_ref|document_id> --all: text artifact reads for txt/text files, paginated at 100 lines",
"- stat --field <metadata_field> <target...>: one metadata field across up to 20 documents",
]
if "entity" in semantic_channels:
lines.append("- find --name: entity semantic candidate discovery alias")
@ -123,6 +140,12 @@ class PIFSCommandExecutor:
if not command.strip():
raise PIFSCommandError("Empty command")
commands = self._split_chained_commands(command)
if len(commands) > self.MAX_CHAINED_COMMANDS:
raise PIFSCommandError(
f"Command chain supports at most {self.MAX_CHAINED_COMMANDS} commands. "
"Run fewer commands or narrow the request first; if you are unsure where "
"to inspect, use cat <target> --structure."
)
if len(commands) > 1:
return "\n".join(self._execute_pipeline(part) for part in commands)
return self._execute_pipeline(commands[0])
@ -133,6 +156,12 @@ class PIFSCommandExecutor:
def _execute_pipeline(self, command: str) -> str:
commands = self._split_piped_commands(command)
if len(commands) > self.MAX_PIPE_COMMANDS:
raise PIFSCommandError(
f"Pipeline supports at most {self.MAX_PIPE_COMMANDS} commands. "
"Use a smaller command and explicit limits; if you are unsure where "
"to inspect, use cat <target> --structure."
)
output = self._execute_single(commands[0])
for pipe_command in commands[1:]:
output = self._execute_pipe_filter(output, pipe_command)
@ -170,7 +199,13 @@ class PIFSCommandExecutor:
self._validate_tokens(tokens)
name = tokens[0]
if name not in self.ALLOWED_PIPE_FILTERS:
raise PIFSCommandError(f"Unsupported pipe command: {name}")
raise PIFSCommandError(
f"Unsupported pipe command: {name}. Supported pipes are: "
f"{', '.join(sorted(self.ALLOWED_PIPE_FILTERS))}. "
"If you meant regex alternation such as a|b, PIFS grep/search "
"does not support it; run multiple grep or search-summary "
"commands with one phrase each."
)
if name == "head":
return self._pipe_head_tail(input_text, tokens[1:], from_tail=False)
if name == "tail":
@ -183,7 +218,7 @@ class PIFSCommandExecutor:
def _cmd_ls(self, args: list[str]) -> Any:
recursive = False
limit = 100
limit = self.MAX_LS_LIMIT
path = "/"
i = 0
while i < len(args):
@ -192,7 +227,9 @@ class PIFSCommandExecutor:
recursive = True
elif arg == "--limit":
i += 1
limit = int(args[i])
limit = self._parse_bounded_int(
args[i], "ls --limit", max_value=self.MAX_LS_LIMIT
)
elif arg.startswith("-"):
raise PIFSCommandError(f"Unsupported ls option: {arg}")
else:
@ -202,17 +239,19 @@ class PIFSCommandExecutor:
def _cmd_tree(self, args: list[str]) -> Any:
path = "/"
limit = 1000
limit = self.MAX_TREE_LIMIT
depth = 2
i = 0
while i < len(args):
arg = args[i]
if arg == "--limit":
i += 1
limit = int(args[i])
limit = self._parse_bounded_int(
args[i], "tree --limit", max_value=self.MAX_TREE_LIMIT
)
elif arg in {"--depth", "-L"}:
i += 1
depth = int(args[i])
depth = self._parse_non_negative_int(args[i], "tree --depth")
elif arg.startswith("-"):
raise PIFSCommandError(f"Unsupported tree option: {arg}")
else:
@ -247,7 +286,9 @@ class PIFSCommandExecutor:
relation = args[i]
elif arg == "--limit":
i += 1
limit = int(args[i])
limit = self._parse_bounded_int(
args[i], "find --limit", max_value=self.MAX_FIND_LIMIT
)
elif arg == "-type":
i += 1
file_type = args[i]
@ -332,7 +373,9 @@ class PIFSCommandExecutor:
where = args[i]
elif arg == "--limit":
i += 1
limit = int(args[i])
limit = self._parse_bounded_int(
args[i], "grep --limit", max_value=self.MAX_GREP_LIMIT
)
elif arg.startswith("-"):
raise PIFSCommandError(f"Unsupported grep option: {arg}")
else:
@ -341,6 +384,7 @@ class PIFSCommandExecutor:
if not positionals:
raise PIFSCommandError("grep requires a query")
query = positionals[0]
self._reject_regex_alternation_query(query, "grep")
path = positionals[1] if len(positionals) > 1 else "/"
if self._is_folder(path):
normalized = self._normalize_folder_path(path)
@ -438,8 +482,10 @@ class PIFSCommandExecutor:
)
location = "all"
structural_mode: str | None = None
node_id: str | None = None
node_ids: list[str] = []
page_range: str | None = None
structure_offset = 0
structure_limit = self.MAX_STRUCTURE_NODES
i = 1
while i < len(args):
arg = args[i]
@ -452,12 +498,26 @@ class PIFSCommandExecutor:
location = "all"
elif arg == "--structure":
structural_mode = "structure"
elif arg == "--offset":
i += 1
if i >= len(args):
raise PIFSCommandError("cat --structure --offset requires a value")
structure_offset = self._parse_non_negative_int(args[i], "cat --structure --offset")
elif arg == "--limit":
i += 1
if i >= len(args):
raise PIFSCommandError("cat --structure --limit requires a value")
structure_limit = self._parse_bounded_int(
args[i],
"cat --structure --limit",
max_value=self.MAX_STRUCTURE_NODES,
)
elif arg == "--node":
i += 1
if i >= len(args):
raise PIFSCommandError("cat --node requires a node id")
structural_mode = "node"
node_id = args[i]
node_ids.extend(self._parse_node_ids(args[i]))
elif arg == "--page":
i += 1
if i >= len(args):
@ -473,27 +533,121 @@ class PIFSCommandExecutor:
)
i += 1
if structural_mode == "structure":
return self.filesystem.pageindex_structure(target)
if structure_limit < 1:
raise PIFSCommandError(
"cat --structure --limit must be at least 1 and at most "
f"{self.MAX_STRUCTURE_NODES}."
)
data = self.filesystem.pageindex_structure(
target,
offset=structure_offset,
limit=structure_limit,
)
self._attach_structure_next_command(data, target)
return data
if structural_mode == "node":
return self.filesystem.pageindex_node(target, str(node_id))
self._require_at_most(
len(node_ids),
"cat --node node count",
self.MAX_NODE_IDS,
)
if not node_ids:
raise PIFSCommandError("cat --node requires a node id")
node_results = [
self._bounded_node_result(
self.filesystem.pageindex_node(target, node_id),
target=target,
node_id=node_id,
)
for node_id in node_ids
]
if len(node_results) == 1:
return node_results[0]
return {
"mode": "nodes",
"target": target,
"available": all(result.get("available") is not False for result in node_results),
"node_ids": node_ids,
"nodes": node_results,
"text": "\n\n".join(
f"[node {result.get('node_id') or node_id}]\n{result.get('text', '')}"
for node_id, result in zip(node_ids, node_results)
),
}
if structural_mode == "page":
if not page_range or not re.fullmatch(r"\d+(?:-\d+)?", page_range):
raise PIFSCommandError(
"cat --page requires one page selector like 31 or 31-59. "
"Use: cat <path|file_ref|document_id> --page <page-or-range>"
)
return self.filesystem.pageindex_pages(target, page_range)
return self.filesystem.cat_text_artifact(target, location)
start, end = self._parse_numeric_range(page_range, "cat --page")
self._require_at_most(
end - start + 1,
"cat --page page count",
self.MAX_PAGE_SPAN,
)
data = self.filesystem.pageindex_pages(target, page_range)
self._attach_page_next_command(data, target, start=start, end=end)
return data
return self._bounded_text_artifact(target, location)
def _cmd_stat(self, args: list[str]) -> Any:
if args and args[0] == "--schema":
schema = False
field: str | None = None
targets: list[str] = []
i = 0
while i < len(args):
arg = args[i]
if arg == "--schema":
schema = True
elif arg == "--field":
i += 1
if i >= len(args):
raise PIFSCommandError("stat --field requires a metadata field name")
field = args[i]
elif arg.startswith("-"):
raise PIFSCommandError(f"Unsupported stat option: {arg}")
else:
targets.append(arg)
i += 1
if schema:
if field or targets:
raise PIFSCommandError("stat --schema cannot be combined with file targets or --field")
return self.filesystem._metadata_schema()
if not args:
if field:
if not targets:
raise PIFSCommandError("stat --field requires at least one file target")
self._require_at_most(
len(targets),
"stat --field target count",
self.MAX_STAT_FIELD_TARGETS,
)
self._validate_metadata_field_for_stat(field)
return {
"mode": "field_values",
"field": field,
"target_count": len(targets),
"max_targets": self.MAX_STAT_FIELD_TARGETS,
"data": [self._stat_field_row(field, target) for target in targets],
}
if not targets:
raise PIFSCommandError("stat requires a file target or --schema")
return {"target": args[0], **self.filesystem._stat(args[0])}
self._require_at_most(
len(targets),
"stat target count",
self.MAX_STAT_FIELD_TARGETS,
)
if len(targets) == 1:
return {"target": targets[0], **self.filesystem._stat(targets[0])}
return {
"mode": "files",
"target_count": len(targets),
"data": [{"target": target, **self.filesystem._stat(target)} for target in targets],
}
def _cmd_head(self, args: list[str]) -> Any:
count, target = self._parse_standalone_head_tail(args, default_count=10)
count = self._require_at_most(count, "head line count", self.MAX_TEXT_LINES)
opened = self.filesystem.cat_text_artifact(target, "all")
lines = opened.text.splitlines()
text = "\n".join(lines[:count])
@ -501,6 +655,7 @@ class PIFSCommandExecutor:
def _cmd_tail(self, args: list[str]) -> Any:
count, target = self._parse_standalone_head_tail(args, default_count=10)
count = self._require_at_most(count, "tail line count", self.MAX_TEXT_LINES)
opened = self.filesystem.cat_text_artifact(target, "all")
lines = opened.text.splitlines()
selected = lines[-count:] if count else []
@ -518,9 +673,13 @@ class PIFSCommandExecutor:
match = re.fullmatch(r"(\d+),(\d+)p", args[1])
if not match:
raise PIFSCommandError("sed supports only: sed -n '<start>,<end>p' <target>")
start, end = int(match.group(1)), int(match.group(2))
if start < 1 or end < start:
raise PIFSCommandError("Invalid sed line range")
self._require_at_most(end - start + 1, "sed line count", self.MAX_TEXT_LINES)
return self.filesystem.cat_text_artifact(
args[2],
f"{match.group(1)}-{match.group(2)}",
f"{start}-{end}",
)
def _cmd_search_summary(self, args: list[str]) -> Any:
@ -551,7 +710,9 @@ class PIFSCommandExecutor:
where = args[i]
elif arg == "--limit":
i += 1
limit = int(args[i])
limit = self._parse_bounded_int(
args[i], "semantic-grep --limit", max_value=self.MAX_SEMANTIC_LIMIT
)
elif arg.startswith("-"):
raise PIFSCommandError(f"Unsupported semantic-grep option: {arg}")
else:
@ -566,7 +727,9 @@ class PIFSCommandExecutor:
)
if not positionals:
raise PIFSCommandError("semantic-grep requires a query")
self._validate_search_positionals("semantic-grep", positionals)
query = positionals[0]
self._reject_regex_alternation_query(query, "semantic-grep")
path = positionals[1] if len(positionals) > 1 else "/"
if not self._is_folder(path):
raise PIFSCommandError("semantic-grep target must be a folder")
@ -594,7 +757,11 @@ class PIFSCommandExecutor:
where = args[i]
elif arg == "--limit":
i += 1
limit = int(args[i])
limit = self._parse_bounded_int(
args[i],
f"search-{channel} --limit",
max_value=self.MAX_SEMANTIC_LIMIT,
)
elif arg.startswith("-"):
raise PIFSCommandError(f"Unsupported search-{channel} option: {arg}")
else:
@ -602,7 +769,9 @@ class PIFSCommandExecutor:
i += 1
if not positionals:
raise PIFSCommandError(f"search-{channel} requires a query")
self._validate_search_positionals(f"search-{channel}", positionals)
query = positionals[0]
self._reject_regex_alternation_query(query, f"search-{channel}")
path = positionals[1] if len(positionals) > 1 else "/"
normalized = self._normalize_folder_path(path)
results = self.filesystem.search_semantic_channel(
@ -679,6 +848,214 @@ class PIFSCommandExecutor:
available = set(self.filesystem.semantic_retrieval_channels())
return tuple(channel for channel in SEMANTIC_GREP_CHANNELS if channel in available)
def _bounded_text_artifact(self, target: str, location: str) -> dict[str, Any]:
if str(location).strip().lower() in {"all", "full", "*"}:
start, end = 1, self.MAX_TEXT_LINES
else:
start, end = self._parse_numeric_range(location, "cat --range")
self._require_at_most(
end - start + 1,
"cat --range line count",
self.MAX_TEXT_LINES,
)
opened = self.filesystem.cat_text_artifact(target, f"{start}-{end}")
data = self._jsonable(opened)
total_lines = len(self.filesystem.store.read_text(opened.file_ref).splitlines())
has_more = int(data.get("end_line") or end) < total_lines
pagination = {
"offset_line": start,
"limit": self.MAX_TEXT_LINES,
"returned_lines": max(0, int(data.get("end_line") or end) - start + 1),
"total_lines": total_lines,
"has_more": has_more,
"next_range": None,
"next_command": None,
}
if has_more:
next_start = int(data.get("end_line") or end) + 1
next_end = min(total_lines, next_start + self.MAX_TEXT_LINES - 1)
next_range = f"{next_start}-{next_end}"
pagination["next_range"] = next_range
pagination["next_command"] = (
f"cat {shlex.quote(target)} --range {shlex.quote(next_range)}"
)
data["text"] = (
str(data.get("text") or "").rstrip()
+ "\n"
+ self._pagination_footer(
"cat --all",
f"showing lines {start}-{data.get('end_line')} of {total_lines}",
str(pagination["next_command"]),
)
).strip()
data["pagination"] = pagination
return data
def _bounded_node_result(
self,
data: dict[str, Any],
*,
target: str,
node_id: str,
) -> dict[str, Any]:
if not isinstance(data, dict) or data.get("available") is False:
return data
text = str(data.get("text") or "")
lines = text.splitlines()
truncated_by_lines = len(lines) > self.MAX_NODE_TEXT_LINES
truncated_by_chars = len(text) > self.MAX_NODE_TEXT_CHARS
if not truncated_by_lines and not truncated_by_chars:
data["node_pagination"] = {
"limit_nodes": self.MAX_NODE_IDS,
"text_truncated": False,
}
return data
selected = "\n".join(lines[: self.MAX_NODE_TEXT_LINES])
if len(selected) > self.MAX_NODE_TEXT_CHARS:
selected = selected[: self.MAX_NODE_TEXT_CHARS].rstrip()
data["text"] = (
selected.rstrip()
+ "\n"
+ self._pagination_footer(
"cat --node",
(
f"node text limited to {self.MAX_NODE_TEXT_LINES} lines/"
f"{self.MAX_NODE_TEXT_CHARS} chars"
),
f"cat {shlex.quote(target)} --structure",
)
).strip()
data["node_pagination"] = {
"limit_nodes": self.MAX_NODE_IDS,
"line_limit": self.MAX_NODE_TEXT_LINES,
"char_limit": self.MAX_NODE_TEXT_CHARS,
"original_lines": len(lines),
"original_chars": len(text),
"text_truncated": True,
"suggested_command": f"cat {shlex.quote(target)} --structure",
"node_id": node_id,
}
return data
def _attach_structure_next_command(self, data: dict[str, Any], target: str) -> None:
pagination = data.get("structure_pagination")
if not isinstance(pagination, dict):
return
if pagination.get("has_more") and pagination.get("next_offset") is not None:
next_command = (
f"cat {shlex.quote(target)} --structure "
f"--offset {pagination['next_offset']} --limit {pagination['limit']}"
)
pagination["next_command"] = next_command
else:
pagination["next_command"] = None
def _attach_page_next_command(
self,
data: dict[str, Any],
target: str,
*,
start: int,
end: int,
) -> None:
page_count = end - start + 1
next_command = None
if page_count == self.MAX_PAGE_SPAN:
next_start = end + 1
next_end = next_start + self.MAX_PAGE_SPAN - 1
next_command = f"cat {shlex.quote(target)} --page {next_start}-{next_end}"
data["page_pagination"] = {
"start": start,
"end": end,
"returned_pages": page_count,
"limit": self.MAX_PAGE_SPAN,
"next_command": next_command,
}
@staticmethod
def _pagination_footer(command: str, reason: str, next_command: str) -> str:
return (
f"# output limited by {command}: {reason}. "
f"Next: {next_command}. If unsure, use cat <target> --structure."
)
@staticmethod
def _parse_node_ids(value: str) -> list[str]:
return [part.strip() for part in value.split(",") if part.strip()]
@staticmethod
def _reject_regex_alternation_query(query: str, command_name: str) -> None:
if "|" not in str(query):
return
raise PIFSCommandError(
f"{command_name} does not support regex alternation '|'. "
"Run multiple grep commands or multiple search-summary commands "
"with one phrase each."
)
@staticmethod
def _validate_search_positionals(command_name: str, positionals: list[str]) -> None:
if len(positionals) > 2:
raise PIFSCommandError(
f"{command_name} accepts one query and an optional folder path. "
f"Quote multi-word queries, for example: {command_name} "
'"Federal Reserve" /documents'
)
if len(positionals) == 2 and not positionals[1].startswith("/"):
raise PIFSCommandError(
f"{command_name} target must be a PIFS folder path like /documents. "
f"If your query has spaces, quote it, for example: {command_name} "
'"Federal Reserve" /documents'
)
@staticmethod
def _parse_numeric_range(value: str, label: str) -> tuple[int, int]:
try:
if "-" in value:
left, right = value.split("-", 1)
start, end = int(left), int(right)
else:
start = end = int(value)
except ValueError as exc:
raise PIFSCommandError(f"{label} requires a numeric range") from exc
if start < 1 or end < start:
raise PIFSCommandError(f"Invalid {label} range: {value}")
return start, end
def _validate_metadata_field_for_stat(self, field: str) -> None:
schema = self.filesystem._metadata_schema()
fields = schema.get("fields", {})
if field not in fields:
available = ", ".join(sorted(fields)[:20]) or "(none)"
raise PIFSCommandError(
f"Unknown metadata field: {field}. Use stat --schema to inspect fields. "
f"Available fields include: {available}"
)
def _stat_field_row(self, field: str, target: str) -> dict[str, Any]:
info = self.filesystem._stat(target)
folder_paths = [
folder.get("path", "")
for folder in info.get("folders", [])
if folder.get("path")
]
row = dict(info)
row["target"] = target
row["folder_paths"] = folder_paths
metadata = info.get("metadata") or {}
raw_value = metadata.get(field)
value_text = "" if raw_value is None else str(raw_value)
row.update(
{
"field": field,
"present": field in metadata,
"value": raw_value if field in metadata else None,
"display_target": self._file_target_path(row),
}
)
return row
def _render(self, data: Any, *, json_output: bool, command_name: str) -> str:
jsonable = self._jsonable(data)
if json_output:
@ -714,7 +1091,14 @@ class PIFSCommandExecutor:
if data.get("available") is False:
return f"# {data.get('message', 'PageIndex structural content is unavailable')}"
if data.get("mode") == "structure":
return json.dumps(data.get("structure", {}), ensure_ascii=False, indent=2)
return json.dumps(
{
"structure": data.get("structure", []),
"pagination": data.get("structure_pagination", {}),
},
ensure_ascii=False,
indent=2,
)
return str(data.get("text", ""))
def _render_listing(self, data: Any) -> str:
@ -839,6 +1223,19 @@ class PIFSCommandExecutor:
for name, field in sorted(data["fields"].items()):
lines.append(f"{name}: {field.get('type', 'string')}")
return "\n".join(lines)
if data.get("mode") == "field_values":
field = data.get("field", "")
lines = []
for item in data.get("data", []):
lines.append(f"{item.get('display_target') or item.get('target')}:")
value = item.get("value")
if value is None:
lines.append(f"{field}: -")
else:
lines.append(f"{field}: {self._one_line_value(value)}")
return "\n\n".join(lines)
if data.get("mode") == "files":
return "\n\n".join(self._render_stat(item) for item in data.get("data", []))
lines = [
f"target: {data.get('target') or data.get('file_ref')}",
f"file_ref: {data.get('file_ref')}",
@ -1298,6 +1695,12 @@ class PIFSCommandExecutor:
return cls._compact_text(json.dumps(value, ensure_ascii=False, sort_keys=True), max_chars=120)
return cls._compact_text(str(value), max_chars=120)
@staticmethod
def _one_line_value(value: Any) -> str:
if isinstance(value, (dict, list)):
value = json.dumps(value, ensure_ascii=False, sort_keys=True)
return re.sub(r"\s+", " ", str(value or "")).strip()
@staticmethod
def _compact_text(text: str, *, max_chars: int) -> str:
collapsed = re.sub(r"\s+", " ", text or "").strip()
@ -1399,6 +1802,11 @@ class PIFSCommandExecutor:
def _pipe_head_tail(self, input_text: str, args: list[str], *, from_tail: bool) -> str:
count = self._parse_head_tail_count(args)
count = self._require_at_most(
count,
"pipe head/tail line count",
self.MAX_TEXT_LINES,
)
payload = self._try_json_loads(input_text)
if payload is not None:
return self._render_json_payload(self._slice_payload(payload, count, from_tail=from_tail))
@ -1425,6 +1833,7 @@ class PIFSCommandExecutor:
if len(patterns) != 1:
raise PIFSCommandError("pipe grep requires exactly one pattern")
pattern = patterns[0]
self._reject_regex_alternation_query(pattern, "pipe grep")
payload = self._try_json_loads(input_text)
if payload is not None:
return self._render_json_payload(
@ -1457,6 +1866,7 @@ class PIFSCommandExecutor:
end = int(match.group(2) or match.group(1))
if start < 1 or end < start:
raise PIFSCommandError("Invalid sed line range")
self._require_at_most(end - start + 1, "pipe sed line count", self.MAX_TEXT_LINES)
payload = self._try_json_loads(input_text)
if payload is not None:
return self._render_json_payload(self._slice_text_payload(payload, start, end))
@ -1516,6 +1926,21 @@ class PIFSCommandExecutor:
raise PIFSCommandError(f"{label} must be non-negative")
return parsed
@classmethod
def _parse_bounded_int(cls, value: str, label: str, *, max_value: int) -> int:
parsed = cls._parse_non_negative_int(value, label)
return cls._require_at_most(parsed, label, max_value)
@classmethod
def _require_at_most(cls, value: int, label: str, max_value: int) -> int:
if value > max_value:
raise PIFSCommandError(
f"{label} supports at most {max_value}; requested {value}. "
"Use a smaller value. If you are unsure where to inspect, "
"use cat <target> --structure first."
)
return value
@staticmethod
def _parse_find_maxdepth(value: str | None) -> int:
if value is None:

View file

@ -32,6 +32,7 @@ from .store import (
normalize_path,
)
from .structural_read import (
flatten_pageindex_structure_nodes,
first_node_location,
find_pageindex_node,
strip_pageindex_text_fields,
@ -621,7 +622,13 @@ class PageIndexFileSystem:
start, end = self._parse_line_range(location)
return self._open_lines(file_ref, start, end)
def pageindex_structure(self, target: str) -> dict[str, Any]:
def pageindex_structure(
self,
target: str,
*,
offset: int = 0,
limit: int = 25,
) -> dict[str, Any]:
file_ref = self._resolve_target(target)
entry = self.store.get_file(file_ref)
self._require_pageindex_document_file(entry, "cat --structure")
@ -642,6 +649,12 @@ class PageIndexFileSystem:
entry,
message=str(structure["error"]),
)
node_rows = flatten_pageindex_structure_nodes(structure)
offset = max(0, offset)
limit = max(0, limit)
window = node_rows[offset : offset + limit] if limit else []
next_offset = offset + len(window)
has_more = next_offset < len(node_rows)
return {
"mode": "structure",
"file_ref": file_ref,
@ -650,7 +663,15 @@ class PageIndexFileSystem:
"status": entry.pageindex_tree_status,
"available": True,
"pageindex_doc_id": doc_id,
"structure": strip_pageindex_text_fields(structure),
"structure": window,
"structure_pagination": {
"offset": offset,
"limit": limit,
"returned_nodes": len(window),
"total_nodes": len(node_rows),
"has_more": has_more,
"next_offset": next_offset if has_more else None,
},
}
def pageindex_node(self, target: str, node_id: str) -> dict[str, Any]:

View file

@ -16,6 +16,43 @@ def strip_pageindex_text_fields(value: Any) -> Any:
return value
def flatten_pageindex_structure_nodes(structure: Any) -> list[dict[str, Any]]:
    """Flatten a nested PageIndex structure into a pre-order list of node rows.

    Each row carries the node's own keys except ``text``, ``nodes`` and
    ``children`` (values passed through ``strip_pageindex_text_fields``), plus
    ``depth``, ``children_count`` and, when a parent id is known,
    ``parent_node_id``. Lists are walked in order; values that are neither
    lists nor dicts are ignored.
    """
    flattened: list[dict[str, Any]] = []
    # Explicit stack; entries are pushed reversed so they pop in document order.
    pending: list[tuple[Any, int, str | None]] = [(structure, 0, None)]
    while pending:
        current, level, parent_id = pending.pop()
        if isinstance(current, list):
            pending.extend((item, level, parent_id) for item in reversed(current))
            continue
        if not isinstance(current, dict):
            continue

        descendants: list[Any] = []
        for child_key in ("nodes", "children"):
            nested = current.get(child_key)
            if isinstance(nested, list):
                descendants += nested

        entry = {
            key: strip_pageindex_text_fields(item)
            for key, item in current.items()
            if key not in {"text", "nodes", "children"}
        }
        entry["depth"] = level
        entry["children_count"] = len(descendants)
        if parent_id:
            entry["parent_node_id"] = parent_id
        flattened.append(entry)

        # A node without its own id passes its parent's id down to its children.
        own_id = current.get("node_id")
        child_parent = parent_id if own_id is None else str(own_id)
        pending.extend(
            (child, level + 1, child_parent) for child in reversed(descendants)
        )
    return flattened
def find_pageindex_node(structure: Any, node_id: str) -> dict[str, Any] | None:
if isinstance(structure, dict):
if str(structure.get("node_id", "")) == str(node_id):

View file

@ -1,6 +1,8 @@
import json
from types import SimpleNamespace
import pytest
class SummaryBackend:
def __init__(self, document_id):
@ -45,6 +47,32 @@ def test_semantic_search_scope_keeps_ordinary_folders_out_of_source_type_filters
assert result["data"]["data"][0]["external_id"] == "dsid_report"
def test_semantic_search_rejects_unquoted_multi_word_query(tmp_path):
    """search-summary rejects unquoted multi-word queries and ``|`` alternation."""
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
    from pageindex.filesystem.commands import PIFSCommandError

    filesystem = PageIndexFileSystem(workspace=tmp_path / "workspace")
    filesystem.register_file(
        storage_uri="file:///tmp/report.pdf",
        source_path="examples/documents/report.pdf",
        folder_path="/documents",
        external_id="dsid_report",
        title="Annual report",
        content="Federal Reserve supervision and regulation annual report.",
    )
    filesystem.semantic_retrieval_backend = SummaryBackend("dsid_report")
    executor = PIFSCommandExecutor(filesystem, json_output=True)

    # (command, regex expected to match the error message)
    rejected_commands = (
        ("search-summary Federal Reserve /documents", "Quote multi-word queries"),
        ("search-summary Federal Reserve", "quote it"),
        (
            'search-summary "Federal|Reserve" /documents',
            "does not support regex alternation",
        ),
    )
    for command, expected_message in rejected_commands:
        with pytest.raises(PIFSCommandError, match=expected_message):
            executor.execute(command)
def test_semantic_search_scope_filters_explicit_source_type_facets():
from pageindex.filesystem import PageIndexFileSystem

View file

@ -341,8 +341,10 @@ def test_cat_structure_page_reuses_pageindex_client_cache_without_indexing(monke
assert structure["data"]["available"] is True
assert structure["data"]["pageindex_doc_id"] == "doc_cached_pdf"
assert structure["data"]["structure"][0]["title"] == "Introduction"
assert structure["data"]["structure"][1]["title"] == "Findings"
assert structure["data"]["structure_pagination"]["limit"] == 25
assert "text" not in structure["data"]["structure"][0]
assert "text" not in structure["data"]["structure"][0]["nodes"][0]
assert "text" not in structure["data"]["structure"][1]
assert pages["data"]["available"] is True
assert pages["data"]["text"] == "Page one text\n\nPage two text"
@ -401,6 +403,92 @@ def test_cat_node_reads_pageindex_client_structure_without_custom_pifs_artifact(
assert "text" not in node["data"]["node"]
def test_cat_structure_page_node_and_text_outputs_are_hard_limited():
    """``cat`` output is capped for structure, page, node and text modes.

    Builds a 30-node / 10-page cached PageIndex document plus a 105-line text
    document, then checks the pagination metadata returned at each limit and
    that requests exceeding a limit raise ``PIFSCommandError``.
    """
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
    from pageindex.filesystem.commands import PIFSCommandError

    with tempfile.TemporaryDirectory() as tmp:
        pdf_path = Path(tmp) / "report.pdf"
        pdf_path.write_bytes(b"%PDF-1.4\n% test fixture\n")
        fs = PageIndexFileSystem(workspace=Path(tmp) / "workspace")

        # Thirty flat sections: more than one structure page can return.
        sections = []
        for number in range(1, 31):
            sections.append(
                {
                    "title": f"Section {number}",
                    "node_id": f"{number:04d}",
                    "start_index": number,
                    "end_index": number,
                    "text": f"node {number} text",
                    "nodes": [],
                }
            )
        cached_doc = {
            "id": "doc_limited_pdf",
            "type": "pdf",
            "path": str(pdf_path.resolve()),
            "doc_name": "report.pdf",
            "doc_description": "",
            "page_count": 10,
            "structure": sections,
            "pages": [
                {"page": number, "content": f"Page {number} text"}
                for number in range(1, 11)
            ],
        }
        write_pageindex_client_doc(
            fs.pageindex_client_workspace,
            "doc_limited_pdf",
            cached_doc,
        )
        fs.register_file(
            storage_uri=pdf_path.as_uri(),
            source_path="docs/report.pdf",
            external_id="dsid_limited_pdf",
            title="Limited structural report",
            content="text artifact remains available for grep",
        )

        # 105 lines: five more than a single text read returns.
        long_text = "\n".join(f"line {number}" for number in range(1, 106))
        fs.register_file(
            storage_uri="file:///tmp/long.txt",
            source_path="docs/long.txt",
            external_id="dsid_long_text",
            title="Long text",
            content=long_text,
        )

        shell = PIFSCommandExecutor(fs, json_output=True)

        def run(command):
            """Execute *command* and return the ``data`` part of its JSON reply."""
            return json.loads(shell.execute(command))["data"]

        # Structure listing: first page of 25, remainder reachable via offset.
        first_page = run("cat dsid_limited_pdf --structure")
        assert len(first_page["structure"]) == 25
        assert first_page["structure_pagination"]["has_more"] is True
        assert first_page["structure_pagination"]["next_offset"] == 25
        second_page = run("cat dsid_limited_pdf --structure --offset 25")
        assert len(second_page["structure"]) == 5
        assert second_page["structure"][0]["node_id"] == "0026"

        # Page reads: three pages accepted, four rejected.
        page_reply = run("cat dsid_limited_pdf --page 1-3")
        assert page_reply["text"] == "Page 1 text\n\nPage 2 text\n\nPage 3 text"
        assert page_reply["page_pagination"]["limit"] == 3
        with pytest.raises(PIFSCommandError, match="at most 3"):
            shell.execute("cat dsid_limited_pdf --page 1-4")

        # Node reads: five ids accepted, six rejected.
        node_reply = run("cat dsid_limited_pdf --node 0001,0002,0003,0004,0005")
        assert node_reply["node_ids"] == ["0001", "0002", "0003", "0004", "0005"]
        with pytest.raises(PIFSCommandError, match="at most 5"):
            shell.execute("cat dsid_limited_pdf --node 0001,0002,0003,0004,0005,0006")

        # Plain text: --all stops after line 100 and reports the next range.
        text_reply = run("cat dsid_long_text --all")
        assert "line 100" in text_reply["text"]
        assert "line 101" not in text_reply["text"]
        assert text_reply["pagination"]["has_more"] is True
        assert text_reply["pagination"]["next_range"] == "101-105"
        with pytest.raises(PIFSCommandError, match="at most 100"):
            shell.execute("cat dsid_long_text --range 1-101")
def test_tree_folder_behavior_is_preserved():
from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem

View file

@ -207,10 +207,14 @@ class PIFSAgentStreamTest(unittest.TestCase):
self.assertIn("stat --schema and stat <target>", AGENT_TOOL_POLICY)
self.assertIn("do not infer metadata presence or absence", AGENT_TOOL_POLICY)
self.assertIn("questions about metadata fields", BASH_TOOL_DESCRIPTION)
self.assertIn("Use stat only for metadata/schema/status questions", AGENT_TOOL_POLICY)
self.assertIn("Do not run stat merely to understand what a document says", AGENT_TOOL_POLICY)
self.assertIn("Do not use stat as a general content/topic discovery step", BASH_TOOL_DESCRIPTION)
def test_prompt_routes_summary_search_to_search_summary(self):
    """Prompts must steer summary/semantic requests to ``search-summary``."""
    # NOTE(review): both the unquoted and the quoted "<query>" usage strings
    # are asserted against AGENT_TOOL_POLICY; the unquoted one looks
    # superseded by the quoted form - confirm against the policy text.
    expectations = (
        ("search-summary when the user asks for", BASH_TOOL_DESCRIPTION),
        ("use search-summary <query> <folder>", AGENT_TOOL_POLICY),
        ('use search-summary "<query>" <folder>', AGENT_TOOL_POLICY),
        ('search-summary "Federal Reserve" /documents', BASH_TOOL_DESCRIPTION),
        ("do not translate that request into find --where", AGENT_TOOL_POLICY),
    )
    for fragment, prompt_text in expectations:
        self.assertIn(fragment, prompt_text)
def test_system_prompt_sets_workspace_identity_and_scope(self):
@ -222,6 +226,8 @@ class PIFSAgentStreamTest(unittest.TestCase):
self.assertIn("workspace-related topic question", AGENT_SYSTEM_PROMPT)
self.assertIn("clarify only after a reasonable search", AGENT_SYSTEM_PROMPT)
self.assertIn("search for candidate documents before asking", AGENT_TOOL_POLICY)
self.assertIn("Do not conclude that no relevant document exists from one failed grep", AGENT_SYSTEM_PROMPT)
self.assertIn("A single failed grep is not enough evidence", AGENT_TOOL_POLICY)
def test_threaded_runtime_error_is_not_retried_on_fresh_loop(self):
session = object.__new__(PIFSAgentSession)

View file

@ -98,6 +98,37 @@ def test_stable_path_targets_work_without_session_refs(tmp_path):
assert "Root document fixture text" in text
def test_shell_limits_reject_context_expanding_counts(tmp_path):
    """Each shell command rejects a count one above its documented maximum."""
    from pageindex.filesystem.commands import PIFSCommandError

    shell = _register_find_fixture(tmp_path)

    # Command that exceeds the cap by one -> the cap quoted in the error.
    over_limit_commands = {
        "find /documents --limit 51": 50,
        "grep --limit 21 Root /documents": 20,
        "ls /documents --limit 101": 100,
        "tree /documents --limit 201": 200,
        "head -n 101 /documents/Root\\ document": 100,
        "tail -n 101 /documents/Root\\ document": 100,
        "sed -n 1,101p /documents/Root\\ document": 100,
    }
    for command, limit in over_limit_commands.items():
        with pytest.raises(PIFSCommandError, match=f"at most {limit}"):
            shell.execute(command)
def test_grep_rejects_regex_alternation_patterns(tmp_path):
    """grep refuses ``a|b`` patterns, both standalone and at the end of a pipe."""
    from pageindex.filesystem.commands import PIFSCommandError

    shell = _register_find_fixture(tmp_path)
    shell.json_output = False

    # (command, fragment expected in the error message), checked in order.
    rejected = [
        ('grep -R "Root|Child" /documents', "does not support regex alternation"),
        ('find /documents -type f | grep "Root|Child"', "multiple grep commands"),
    ]
    for command, expected_message in rejected:
        with pytest.raises(PIFSCommandError, match=expected_message):
            shell.execute(command)
def test_stat_shell_output_includes_unified_metadata_status(tmp_path):
from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
from pageindex.filesystem.metadata_generation import MetadataGenerationResult
@ -142,6 +173,99 @@ def test_stat_shell_output_includes_unified_metadata_status(tmp_path):
assert "metadata_status: generated" in stat
def test_stat_field_reads_one_metadata_field_across_multiple_targets(tmp_path):
    """``stat --field`` returns one untruncated field value per target.

    Two documents receive a long generated ``summary``. The test checks the
    text output, the JSON output, and that an unknown field name raises
    ``PIFSCommandError``.
    """
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
    from pageindex.filesystem.commands import PIFSCommandError
    from pageindex.filesystem.metadata_generation import MetadataGenerationResult

    class SummaryGenerator:
        """Stub generator: every requested field gets the same long summary."""

        def generate(self, document, *, fields):
            def render():
                # Title line followed by 80 repeated tokens.
                return f"Summary for {document.title}\n" + "full summary token " * 80

            return MetadataGenerationResult(
                values={name: render() for name in fields}
            )

    fs = PageIndexFileSystem(
        workspace=tmp_path / "workspace",
        metadata_generator=SummaryGenerator(),
    )
    # Only the summary field is enabled for generation.
    summary_only_policy = {
        "fields": {
            "summary": True,
            "doc_type": False,
            "domain": False,
            "topic": False,
        }
    }
    for number in (1, 2):
        source_file = tmp_path / f"source{number}.txt"
        source_file.write_text(f"fixture text {number}", encoding="utf-8")
        fs.register_file(
            storage_uri=source_file.as_uri(),
            source_path=f"docs/source{number}.txt",
            folder_path="/documents",
            external_id=f"doc_summary_{number}",
            title=f"Summary document {number}",
            content=source_file.read_text(encoding="utf-8"),
            metadata_policy=summary_only_policy,
        )

    stat_command = (
        "stat --field summary /documents/'Summary document 1' /documents/'Summary document 2'"
    )

    # Plain-text output: one labelled section per target, nothing truncated.
    text_shell = PIFSCommandExecutor(fs, json_output=False)
    rendered = text_shell.execute(stat_command)
    assert "/documents/Summary document 1:" in rendered
    assert "summary: Summary for Summary document 1" in rendered
    assert "full summary token" in rendered
    assert "[truncated]" not in rendered
    assert "/documents/Summary document 2:" in rendered
    assert "summary: Summary for Summary document 2" in rendered

    # JSON output: the full generated value is returned.
    json_shell = PIFSCommandExecutor(fs, json_output=True)
    payload = json.loads(json_shell.execute(stat_command))["data"]
    assert payload["mode"] == "field_values"
    assert payload["target_count"] == 2
    first_entry = payload["data"][0]
    assert first_entry["field"] == "summary"
    assert first_entry["value"].startswith("Summary for Summary document 1\n")
    assert first_entry["value"].count("full summary token") == 80

    with pytest.raises(PIFSCommandError, match="Unknown metadata field"):
        text_shell.execute("stat --field missing_field /documents/'Summary document 1'")
def test_stat_field_rejects_more_than_twenty_targets(tmp_path):
    """``stat --field`` with 21 targets raises an "at most 20" error."""
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
    from pageindex.filesystem.commands import PIFSCommandError

    fs = PageIndexFileSystem(workspace=tmp_path / "workspace")
    document_count = 21  # one more than the limit named in the error

    for number in range(document_count):
        source_file = tmp_path / f"source{number}.txt"
        source_file.write_text(f"fixture text {number}", encoding="utf-8")
        fs.register_file(
            storage_uri=source_file.as_uri(),
            source_path=f"docs/source{number}.txt",
            folder_path="/documents",
            external_id=f"doc_{number}",
            title=f"Document {number}",
            content=source_file.read_text(encoding="utf-8"),
            metadata={"department": "ops"},
        )

    # Titles contain a space, so each target is shell-quoted.
    quoted_targets = [
        f"/documents/'Document {number}'" for number in range(document_count)
    ]
    joined_targets = " ".join(quoted_targets)

    shell = PIFSCommandExecutor(fs, json_output=False)
    with pytest.raises(PIFSCommandError, match="at most 20"):
        shell.execute("stat --field department " + joined_targets)
def test_register_rejects_pifs_owned_metadata_fields(tmp_path):
from pageindex.filesystem import PageIndexFileSystem