diff --git a/pageindex/filesystem/agent.py b/pageindex/filesystem/agent.py index d3bba68..6691f01 100644 --- a/pageindex/filesystem/agent.py +++ b/pageindex/filesystem/agent.py @@ -56,7 +56,7 @@ format. If the caller needs stricter behavior, pass an explicit system_prompt. BASH_TOOL_DESCRIPTION = """ Run a command in the PageIndex FileSystem virtual shell. This is not a real operating-system shell. By default the tool is read-only: use ls, tree, find, -grep, cat, stat, head, tail, sed, and browse when listed in the workspace +grep, cat, stat, and browse when listed in the workspace context. grep -R is lexical evidence search; grep does not support regex alternation such as "a|b"; run multiple grep commands or use browse for relevance-ranked file discovery instead. Start broad workspace questions with diff --git a/pageindex/filesystem/commands.py b/pageindex/filesystem/commands.py index 4b9a598..16a7b22 100644 --- a/pageindex/filesystem/commands.py +++ b/pageindex/filesystem/commands.py @@ -26,11 +26,8 @@ class PIFSCommandExecutor: "browse", "cat", "stat", - "head", - "tail", - "sed", } - ALLOWED_PIPE_FILTERS = {"head", "tail", "grep", "sed"} + ALLOWED_PIPE_FILTERS = {"grep"} MAX_CHAINED_COMMANDS = 3 MAX_PIPE_COMMANDS = 3 MAX_LS_LIMIT = 100 @@ -163,14 +160,8 @@ class PIFSCommandExecutor: "does not support it; run multiple grep commands or browse " "with one phrase each." ) - if name == "head": - return self._pipe_head_tail(input_text, tokens[1:], from_tail=False) - if name == "tail": - return self._pipe_head_tail(input_text, tokens[1:], from_tail=True) if name == "grep": return self._pipe_grep(input_text, tokens[1:]) - if name == "sed": - return self._pipe_sed(input_text, tokens[1:]) raise PIFSCommandError(f"Unsupported pipe command: {name}") def _cmd_ls(self, args: list[str]) -> Any: @@ -599,43 +590,6 @@ class PIFSCommandExecutor: "data": [{"target": target, **self.filesystem._stat(target)} for target in targets], } - def _cmd_head(self, args: list[str]) -> Any: - count, target = self._parse_standalone_head_tail(args, default_count=10) - count = self._require_at_most(count, "head line count", self.MAX_TEXT_LINES) - opened = self.filesystem.cat_text_artifact(target, "all") - lines = opened.text.splitlines() - text = "\n".join(lines[:count]) - return {**self._jsonable(opened), "text": text, "end_line": min(count, len(lines))} - - def _cmd_tail(self, args: list[str]) -> Any: - count, target = self._parse_standalone_head_tail(args, default_count=10) - count = self._require_at_most(count, "tail line count", self.MAX_TEXT_LINES) - opened = self.filesystem.cat_text_artifact(target, "all") - lines = opened.text.splitlines() - selected = lines[-count:] if count else [] - start_line = max(1, len(lines) - len(selected) + 1) - return { - **self._jsonable(opened), - "text": "\n".join(selected), - "start_line": start_line, - "end_line": len(lines), - } - - def _cmd_sed(self, args: list[str]) -> Any: - if len(args) < 3 or args[0] != "-n": - raise PIFSCommandError("sed supports only: sed -n ',p' ") - match = re.fullmatch(r"(\d+),(\d+)p", args[1]) - if not match: - raise PIFSCommandError("sed supports only: sed -n ',p' ") - start, end = int(match.group(1)), int(match.group(2)) - if start < 1 or end < start: - raise PIFSCommandError("Invalid sed line range") - self._require_at_most(end - start + 1, "sed line count", self.MAX_TEXT_LINES) - return self.filesystem.cat_text_artifact( - args[2], - f"{start}-{end}", - ) - def _bounded_text_artifact(self, target: str, location: str) -> dict[str, Any]: if str(location).strip().lower() in {"all", "full", "*"}: start, end = 1, self.MAX_TEXT_LINES @@ -785,8 +739,6 @@ class PIFSCommandExecutor: return self._render_find(data) if command_name == "stat": return self._render_stat(data) - if command_name in {"head", "tail", "sed"}: - return str(data.get("text", "")) if isinstance(data, dict) else str(data) if isinstance(data, dict): return "\n".join(f"{key}: {value}" for key, value in data.items()) if isinstance(data, list): @@ -1597,20 +1549,6 @@ class PIFSCommandExecutor: parts.append(part) return parts - def _pipe_head_tail(self, input_text: str, args: list[str], *, from_tail: bool) -> str: - count = self._parse_head_tail_count(args) - count = self._require_at_most( - count, - "pipe head/tail line count", - self.MAX_TEXT_LINES, - ) - payload = self._try_json_loads(input_text) - if payload is not None: - return self._render_json_payload(self._slice_payload(payload, count, from_tail=from_tail)) - lines = input_text.splitlines() - selected = [] if count == 0 else lines[-count:] if from_tail else lines[:count] - return "\n".join(selected) - def _pipe_grep(self, input_text: str, args: list[str]) -> str: ignore_case = False invert = False @@ -1649,70 +1587,6 @@ class PIFSCommandExecutor: ] return "\n".join(filtered) - def _pipe_sed(self, input_text: str, args: list[str]) -> str: - if not args: - raise PIFSCommandError("pipe sed requires an expression") - if args[0] == "-n": - args = args[1:] - if len(args) != 1: - raise PIFSCommandError("pipe sed supports only -n ',p'") - match = re.fullmatch(r"(\d+)(?:,(\d+))?p", args[0]) - if not match: - raise PIFSCommandError("pipe sed supports only -n ',p'") - start = int(match.group(1)) - end = int(match.group(2) or match.group(1)) - if start < 1 or end < start: - raise PIFSCommandError("Invalid sed line range") - self._require_at_most(end - start + 1, "pipe sed line count", self.MAX_TEXT_LINES) - payload = self._try_json_loads(input_text) - if payload is not None: - return self._render_json_payload(self._slice_text_payload(payload, start, end)) - lines = input_text.splitlines() - return "\n".join(lines[start - 1 : end]) - - @staticmethod - def _parse_head_tail_count(args: list[str]) -> int: - count = 10 - i = 0 - while i < len(args): - arg = args[i] - if arg == "-n": - i += 1 - if i >= len(args): - raise PIFSCommandError("head/tail -n requires a count") - count = PIFSCommandExecutor._parse_non_negative_int(args[i], "head/tail count") - elif re.fullmatch(r"-\d+", arg): - count = PIFSCommandExecutor._parse_non_negative_int(arg[1:], "head/tail count") - elif arg.startswith("-"): - raise PIFSCommandError(f"Unsupported head/tail option: {arg}") - else: - count = PIFSCommandExecutor._parse_non_negative_int(arg, "head/tail count") - i += 1 - return count - - @staticmethod - def _parse_standalone_head_tail(args: list[str], *, default_count: int) -> tuple[int, str]: - count = default_count - target = "" - i = 0 - while i < len(args): - arg = args[i] - if arg == "-n": - i += 1 - if i >= len(args): - raise PIFSCommandError("head/tail -n requires a count") - count = PIFSCommandExecutor._parse_non_negative_int(args[i], "head/tail count") - elif re.fullmatch(r"-\d+", arg): - count = PIFSCommandExecutor._parse_non_negative_int(arg[1:], "head/tail count") - elif arg.startswith("-"): - raise PIFSCommandError(f"Unsupported head/tail option: {arg}") - else: - target = arg - i += 1 - if not target: - raise PIFSCommandError("head/tail requires a file target") - return count, target - @staticmethod def _parse_non_negative_int(value: str, label: str) -> int: try: @@ -1763,40 +1637,6 @@ class PIFSCommandExecutor: def _render_json_payload(payload: Any) -> str: return json.dumps(payload, ensure_ascii=False) - @classmethod - def _slice_payload(cls, payload: Any, count: int, *, from_tail: bool) -> Any: - if isinstance(payload, list): - return payload[-count:] if from_tail and count else payload[:count] - if not isinstance(payload, dict): - return payload - sliced = dict(payload) - if "data" in sliced: - sliced["data"] = cls._slice_data(sliced["data"], count, from_tail=from_tail) - else: - sliced = cls._slice_mapping_lists(sliced, count, from_tail=from_tail) - return sliced - - @classmethod - def _slice_data(cls, data: Any, count: int, *, from_tail: bool) -> Any: - if isinstance(data, list): - return data[-count:] if from_tail and count else data[:count] - if isinstance(data, dict): - if isinstance(data.get("text"), str): - copied = dict(data) - lines = copied["text"].splitlines() - copied["text"] = "\n".join(lines[-count:] if from_tail and count else lines[:count]) - return copied - return cls._slice_mapping_lists(data, count, from_tail=from_tail) - return data - - @classmethod - def _slice_mapping_lists(cls, data: dict[str, Any], count: int, *, from_tail: bool) -> dict[str, Any]: - copied = dict(data) - for key, value in copied.items(): - if isinstance(value, list): - copied[key] = value[-count:] if from_tail and count else value[:count] - return copied - @classmethod def _filter_payload( cls, @@ -1919,18 +1759,3 @@ class PIFSCommandExecutor: else: matched = pattern in text return not matched if invert else matched - - @classmethod - def _slice_text_payload(cls, payload: Any, start: int, end: int) -> Any: - if not isinstance(payload, dict): - return payload - sliced = dict(payload) - data = sliced.get("data") - if isinstance(data, dict) and isinstance(data.get("text"), str): - copied_data = dict(data) - lines = copied_data["text"].splitlines() - copied_data["text"] = "\n".join(lines[start - 1 : end]) - copied_data["start_line"] = start - copied_data["end_line"] = min(end, len(lines)) - sliced["data"] = copied_data - return sliced diff --git a/tests/test_pageindex_filesystem_scope.py b/tests/test_pageindex_filesystem_scope.py index 0ce9f39..b74cc79 100644 --- a/tests/test_pageindex_filesystem_scope.py +++ b/tests/test_pageindex_filesystem_scope.py @@ -182,6 +182,23 @@ def test_browse_is_agent_visible_semantic_command(tmp_path): assert executor.command_capabilities()["retrieval"]["semantic"]["commands"] == ["browse"] +def test_shell_text_window_commands_are_not_agent_visible(tmp_path): + from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem + from pageindex.filesystem.commands import PIFSCommandError + + filesystem = PageIndexFileSystem(workspace=tmp_path / "workspace") + executor = PIFSCommandExecutor(filesystem) + + assert not {"head", "tail", "sed"} & executor.allowed_commands() + assert not {"head", "tail", "sed"} & set( + executor.command_capabilities()["allowed_commands"] + ) + + for command in ("head /documents/a.txt", "tail /documents/a.txt", "sed -n 1,1p /documents/a.txt"): + with pytest.raises(PIFSCommandError, match="Unsupported command"): + executor.execute(command) + + def test_browse_requires_positional_query_and_rejects_removed_options(tmp_path): from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem from pageindex.filesystem.commands import PIFSCommandError diff --git a/tests/test_pageindex_structural_read.py b/tests/test_pageindex_structural_read.py index 63a08dd..2f27077 100644 --- a/tests/test_pageindex_structural_read.py +++ b/tests/test_pageindex_structural_read.py @@ -580,17 +580,6 @@ def test_cat_all_is_limited_to_text_files(): executor.execute("cat dsid_json_file --all") opened_json = filesystem.open("dsid_json_file") assert opened_json.text == '{"body":"json"}' - for command in ( - "head dsid_pdf_file", - "tail dsid_pdf_file", - "sed -n 1,1p dsid_pdf_file", - "head dsid_md_file", - "tail dsid_md_file", - "sed -n 1,1p dsid_md_file", - ): - with pytest.raises(PIFSCommandError, match="only supported for txt/text files"): - executor.execute(command) - def test_pageindex_structure_commands_are_limited_to_pdf_and_markdown(): from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem diff --git a/tests/test_pifs_find_maxdepth.py b/tests/test_pifs_find_maxdepth.py index c1afe91..8b93f70 100644 --- a/tests/test_pifs_find_maxdepth.py +++ b/tests/test_pifs_find_maxdepth.py @@ -108,9 +108,6 @@ def test_shell_limits_reject_context_expanding_counts(tmp_path): ("grep --limit 21 Root /documents", 20), ("ls /documents --limit 101", 100), ("tree /documents --limit 201", 200), - ("head -n 101 /documents/Root\\ document", 100), - ("tail -n 101 /documents/Root\\ document", 100), - ("sed -n 1,101p /documents/Root\\ document", 100), ): with pytest.raises(PIFSCommandError, match=f"at most {limit}"): executor.execute(command)