mirror of
https://github.com/VectifyAI/PageIndex.git
synced 2026-06-12 19:55:17 +02:00
Remove unused PIFS text window commands
This commit is contained in:
parent
d0c0c67a39
commit
b9e30952ad
5 changed files with 19 additions and 191 deletions
|
|
@@ -56,7 +56,7 @@ format. If the caller needs stricter behavior, pass an explicit system_prompt.
|
|||
BASH_TOOL_DESCRIPTION = """
|
||||
Run a command in the PageIndex FileSystem virtual shell. This is not a real
|
||||
operating-system shell. By default the tool is read-only: use ls, tree, find,
|
||||
grep, cat, stat, head, tail, sed, and browse when listed in the workspace
|
||||
grep, cat, stat, and browse when listed in the workspace
|
||||
context. grep -R is lexical evidence search; grep does not support regex
|
||||
alternation such as "a|b"; run multiple grep commands or use browse for
|
||||
relevance-ranked file discovery instead. Start broad workspace questions with
|
||||
|
|
|
|||
|
|
@@ -26,11 +26,8 @@ class PIFSCommandExecutor:
|
|||
"browse",
|
||||
"cat",
|
||||
"stat",
|
||||
"head",
|
||||
"tail",
|
||||
"sed",
|
||||
}
|
||||
ALLOWED_PIPE_FILTERS = {"head", "tail", "grep", "sed"}
|
||||
ALLOWED_PIPE_FILTERS = {"grep"}
|
||||
MAX_CHAINED_COMMANDS = 3
|
||||
MAX_PIPE_COMMANDS = 3
|
||||
MAX_LS_LIMIT = 100
|
||||
|
|
@@ -163,14 +160,8 @@ class PIFSCommandExecutor:
|
|||
"does not support it; run multiple grep commands or browse "
|
||||
"with one phrase each."
|
||||
)
|
||||
if name == "head":
|
||||
return self._pipe_head_tail(input_text, tokens[1:], from_tail=False)
|
||||
if name == "tail":
|
||||
return self._pipe_head_tail(input_text, tokens[1:], from_tail=True)
|
||||
if name == "grep":
|
||||
return self._pipe_grep(input_text, tokens[1:])
|
||||
if name == "sed":
|
||||
return self._pipe_sed(input_text, tokens[1:])
|
||||
raise PIFSCommandError(f"Unsupported pipe command: {name}")
|
||||
|
||||
def _cmd_ls(self, args: list[str]) -> Any:
|
||||
|
|
@@ -599,43 +590,6 @@ class PIFSCommandExecutor:
|
|||
"data": [{"target": target, **self.filesystem._stat(target)} for target in targets],
|
||||
}
|
||||
|
||||
def _cmd_head(self, args: list[str]) -> Any:
    """Return the leading lines of a text artifact.

    ``args`` is parsed by ``_parse_standalone_head_tail`` into a line count
    (10 when none is given) and a target. The count is checked through
    ``_require_at_most`` against ``MAX_TEXT_LINES``; the artifact is then read
    in full and only its first ``count`` lines are kept.

    Returns the JSON-able form of the opened artifact with ``text`` replaced
    by the selected lines and ``end_line`` set to the number of lines kept.
    """
    requested, target = self._parse_standalone_head_tail(args, default_count=10)
    limit = self._require_at_most(requested, "head line count", self.MAX_TEXT_LINES)
    artifact = self.filesystem.cat_text_artifact(target, "all")
    all_lines = artifact.text.splitlines()
    result = {**self._jsonable(artifact)}
    result["text"] = "\n".join(all_lines[:limit])
    # Fewer lines than requested may exist, so report what was actually kept.
    result["end_line"] = min(limit, len(all_lines))
    return result
|
||||
|
||||
def _cmd_tail(self, args: list[str]) -> Any:
    """Return the trailing lines of a text artifact.

    ``args`` is parsed by ``_parse_standalone_head_tail`` into a line count
    (10 when none is given) and a target. The count is checked through
    ``_require_at_most`` against ``MAX_TEXT_LINES``; the artifact is then read
    in full and only its last ``count`` lines are kept.

    Returns the JSON-able form of the opened artifact with ``text`` replaced
    by the selected lines, ``start_line`` set to the 1-based line number of
    the first kept line (never below 1) and ``end_line`` set to the total
    number of lines.
    """
    requested, target = self._parse_standalone_head_tail(args, default_count=10)
    limit = self._require_at_most(requested, "tail line count", self.MAX_TEXT_LINES)
    artifact = self.filesystem.cat_text_artifact(target, "all")
    all_lines = artifact.text.splitlines()
    total = len(all_lines)
    # A zero count must select nothing; ``all_lines[-0:]`` would be every line.
    tail_lines = all_lines[-limit:] if limit else []
    result = {**self._jsonable(artifact)}
    result["text"] = "\n".join(tail_lines)
    result["start_line"] = max(1, total - len(tail_lines) + 1)
    result["end_line"] = total
    return result
|
||||
|
||||
def _cmd_sed(self, args: list[str]) -> Any:
    """Print an inclusive line range of a text artifact.

    Only the form ``sed -n '<start>,<end>p' <target>`` is accepted: ``args``
    must hold at least three items, the first being ``-n`` and the second a
    ``<start>,<end>p`` expression. The range size is checked through
    ``_require_at_most`` against ``MAX_TEXT_LINES``.

    Returns whatever ``filesystem.cat_text_artifact`` yields for the target
    and the ``"<start>-<end>"`` location.

    Raises:
        PIFSCommandError: if the arguments do not match the supported form,
            or the range starts below 1 or ends before it starts.
    """
    usage = "sed supports only: sed -n '<start>,<end>p' <target>"
    well_formed = len(args) >= 3 and args[0] == "-n"
    parsed = re.fullmatch(r"(\d+),(\d+)p", args[1]) if well_formed else None
    if parsed is None:
        raise PIFSCommandError(usage)
    first, last = (int(group) for group in parsed.groups())
    if not 1 <= first <= last:
        raise PIFSCommandError("Invalid sed line range")
    span = last - first + 1
    self._require_at_most(span, "sed line count", self.MAX_TEXT_LINES)
    return self.filesystem.cat_text_artifact(args[2], f"{first}-{last}")
|
||||
|
||||
def _bounded_text_artifact(self, target: str, location: str) -> dict[str, Any]:
|
||||
if str(location).strip().lower() in {"all", "full", "*"}:
|
||||
start, end = 1, self.MAX_TEXT_LINES
|
||||
|
|
@@ -785,8 +739,6 @@ class PIFSCommandExecutor:
|
|||
return self._render_find(data)
|
||||
if command_name == "stat":
|
||||
return self._render_stat(data)
|
||||
if command_name in {"head", "tail", "sed"}:
|
||||
return str(data.get("text", "")) if isinstance(data, dict) else str(data)
|
||||
if isinstance(data, dict):
|
||||
return "\n".join(f"{key}: {value}" for key, value in data.items())
|
||||
if isinstance(data, list):
|
||||
|
|
@@ -1597,20 +1549,6 @@ class PIFSCommandExecutor:
|
|||
parts.append(part)
|
||||
return parts
|
||||
|
||||
def _pipe_head_tail(self, input_text: str, args: list[str], *, from_tail: bool) -> str:
    """Apply ``head``/``tail`` as a pipe filter to upstream output.

    The count comes from ``_parse_head_tail_count`` and is checked through
    ``_require_at_most`` against ``MAX_TEXT_LINES``. When ``_try_json_loads``
    yields a payload, the payload is trimmed with ``_slice_payload`` and
    re-rendered with ``_render_json_payload``; otherwise ``input_text`` is
    treated as plain lines.

    Args:
        input_text: Output of the previous pipeline stage.
        args: Filter arguments following the command name.
        from_tail: Keep the last lines when true, the first lines otherwise.
    """
    limit = self._require_at_most(
        self._parse_head_tail_count(args),
        "pipe head/tail line count",
        self.MAX_TEXT_LINES,
    )
    parsed = self._try_json_loads(input_text)
    if parsed is not None:
        trimmed = self._slice_payload(parsed, limit, from_tail=from_tail)
        return self._render_json_payload(trimmed)
    rows = input_text.splitlines()
    if limit == 0:
        # ``rows[-0:]`` would keep everything, so zero is handled explicitly.
        kept = []
    elif from_tail:
        kept = rows[-limit:]
    else:
        kept = rows[:limit]
    return "\n".join(kept)
|
||||
|
||||
def _pipe_grep(self, input_text: str, args: list[str]) -> str:
|
||||
ignore_case = False
|
||||
invert = False
|
||||
|
|
@@ -1649,70 +1587,6 @@ class PIFSCommandExecutor:
|
|||
]
|
||||
return "\n".join(filtered)
|
||||
|
||||
def _pipe_sed(self, input_text: str, args: list[str]) -> str:
    """Apply a ``sed`` line-range print as a pipe filter to upstream output.

    A leading ``-n`` is optional; exactly one expression of the form
    ``<start>,<end>p`` or ``<line>p`` must remain. The range size is checked
    through ``_require_at_most`` against ``MAX_TEXT_LINES``. When
    ``_try_json_loads`` yields a payload, it is trimmed with
    ``_slice_text_payload`` and re-rendered with ``_render_json_payload``;
    otherwise ``input_text`` is treated as plain lines.

    Raises:
        PIFSCommandError: if no expression is given, the expression is not
            supported, or the range starts below 1 or ends before it starts.
    """
    unsupported = "pipe sed supports only -n '<start>,<end>p'"
    if not args:
        raise PIFSCommandError("pipe sed requires an expression")
    expressions = args[1:] if args[0] == "-n" else args
    parsed = (
        re.fullmatch(r"(\d+)(?:,(\d+))?p", expressions[0])
        if len(expressions) == 1
        else None
    )
    if parsed is None:
        raise PIFSCommandError(unsupported)
    first_text, last_text = parsed.groups()
    first = int(first_text)
    # A single-line expression such as ``5p`` selects just that line.
    last = first if last_text is None else int(last_text)
    if not 1 <= first <= last:
        raise PIFSCommandError("Invalid sed line range")
    self._require_at_most(last - first + 1, "pipe sed line count", self.MAX_TEXT_LINES)
    decoded = self._try_json_loads(input_text)
    if decoded is not None:
        trimmed = self._slice_text_payload(decoded, first, last)
        return self._render_json_payload(trimmed)
    rows = input_text.splitlines()
    return "\n".join(rows[first - 1 : last])
|
||||
|
||||
@staticmethod
def _parse_head_tail_count(args: list[str]) -> int:
    """Extract the line count from pipe ``head``/``tail`` arguments.

    Recognised forms are ``-n <count>``, ``-<digits>`` and a bare token, each
    converted with ``_parse_non_negative_int``. When several are present the
    last one wins; with none the count is 10.

    Raises:
        PIFSCommandError: if ``-n`` is the final token or another
            dash-prefixed option is given.
    """
    label = "head/tail count"
    exhausted = object()
    count = 10
    tokens = iter(args)
    for token in tokens:
        if token == "-n":
            # The value is consumed here so the loop does not see it again.
            value = next(tokens, exhausted)
            if value is exhausted:
                raise PIFSCommandError("head/tail -n requires a count")
            count = PIFSCommandExecutor._parse_non_negative_int(value, label)
        elif re.fullmatch(r"-\d+", token):
            count = PIFSCommandExecutor._parse_non_negative_int(token[1:], label)
        elif token.startswith("-"):
            raise PIFSCommandError(f"Unsupported head/tail option: {token}")
        else:
            count = PIFSCommandExecutor._parse_non_negative_int(token, label)
    return count
|
||||
|
||||
@staticmethod
def _parse_standalone_head_tail(args: list[str], *, default_count: int) -> tuple[int, str]:
    """Split standalone ``head``/``tail`` arguments into a count and a target.

    ``-n <count>`` and ``-<digits>`` set the count (converted with
    ``_parse_non_negative_int``); any token not starting with ``-`` is taken
    as the target, the last such token winning.

    Args:
        args: Arguments following the command name.
        default_count: Count to use when no count option is present.

    Returns:
        ``(count, target)``.

    Raises:
        PIFSCommandError: if ``-n`` is the final token, another dash-prefixed
            option is given, or no non-empty target is supplied.
    """
    label = "head/tail count"
    exhausted = object()
    count, target = default_count, ""
    tokens = iter(args)
    for token in tokens:
        if token == "-n":
            # The value is consumed here so the loop does not see it again.
            value = next(tokens, exhausted)
            if value is exhausted:
                raise PIFSCommandError("head/tail -n requires a count")
            count = PIFSCommandExecutor._parse_non_negative_int(value, label)
        elif re.fullmatch(r"-\d+", token):
            count = PIFSCommandExecutor._parse_non_negative_int(token[1:], label)
        elif token.startswith("-"):
            raise PIFSCommandError(f"Unsupported head/tail option: {token}")
        else:
            target = token
    if target:
        return count, target
    raise PIFSCommandError("head/tail requires a file target")
|
||||
|
||||
@staticmethod
|
||||
def _parse_non_negative_int(value: str, label: str) -> int:
|
||||
try:
|
||||
|
|
@@ -1763,40 +1637,6 @@ class PIFSCommandExecutor:
|
|||
def _render_json_payload(payload: Any) -> str:
|
||||
return json.dumps(payload, ensure_ascii=False)
|
||||
|
||||
@classmethod
|
||||
def _slice_payload(cls, payload: Any, count: int, *, from_tail: bool) -> Any:
|
||||
if isinstance(payload, list):
|
||||
return payload[-count:] if from_tail and count else payload[:count]
|
||||
if not isinstance(payload, dict):
|
||||
return payload
|
||||
sliced = dict(payload)
|
||||
if "data" in sliced:
|
||||
sliced["data"] = cls._slice_data(sliced["data"], count, from_tail=from_tail)
|
||||
else:
|
||||
sliced = cls._slice_mapping_lists(sliced, count, from_tail=from_tail)
|
||||
return sliced
|
||||
|
||||
@classmethod
|
||||
def _slice_data(cls, data: Any, count: int, *, from_tail: bool) -> Any:
|
||||
if isinstance(data, list):
|
||||
return data[-count:] if from_tail and count else data[:count]
|
||||
if isinstance(data, dict):
|
||||
if isinstance(data.get("text"), str):
|
||||
copied = dict(data)
|
||||
lines = copied["text"].splitlines()
|
||||
copied["text"] = "\n".join(lines[-count:] if from_tail and count else lines[:count])
|
||||
return copied
|
||||
return cls._slice_mapping_lists(data, count, from_tail=from_tail)
|
||||
return data
|
||||
|
||||
@classmethod
|
||||
def _slice_mapping_lists(cls, data: dict[str, Any], count: int, *, from_tail: bool) -> dict[str, Any]:
|
||||
copied = dict(data)
|
||||
for key, value in copied.items():
|
||||
if isinstance(value, list):
|
||||
copied[key] = value[-count:] if from_tail and count else value[:count]
|
||||
return copied
|
||||
|
||||
@classmethod
|
||||
def _filter_payload(
|
||||
cls,
|
||||
|
|
@@ -1919,18 +1759,3 @@ class PIFSCommandExecutor:
|
|||
else:
|
||||
matched = pattern in text
|
||||
return not matched if invert else matched
|
||||
|
||||
@classmethod
|
||||
def _slice_text_payload(cls, payload: Any, start: int, end: int) -> Any:
|
||||
if not isinstance(payload, dict):
|
||||
return payload
|
||||
sliced = dict(payload)
|
||||
data = sliced.get("data")
|
||||
if isinstance(data, dict) and isinstance(data.get("text"), str):
|
||||
copied_data = dict(data)
|
||||
lines = copied_data["text"].splitlines()
|
||||
copied_data["text"] = "\n".join(lines[start - 1 : end])
|
||||
copied_data["start_line"] = start
|
||||
copied_data["end_line"] = min(end, len(lines))
|
||||
sliced["data"] = copied_data
|
||||
return sliced
|
||||
|
|
|
|||
|
|
@@ -182,6 +182,23 @@ def test_browse_is_agent_visible_semantic_command(tmp_path):
|
|||
assert executor.command_capabilities()["retrieval"]["semantic"]["commands"] == ["browse"]
|
||||
|
||||
|
||||
def test_shell_text_window_commands_are_not_agent_visible(tmp_path):
    """head, tail and sed are neither advertised nor executable."""
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
    from pageindex.filesystem.commands import PIFSCommandError

    executor = PIFSCommandExecutor(
        PageIndexFileSystem(workspace=tmp_path / "workspace")
    )
    removed = {"head", "tail", "sed"}

    # Neither the allow-list nor the advertised capabilities may mention them.
    assert not removed & executor.allowed_commands()
    advertised = set(executor.command_capabilities()["allowed_commands"])
    assert not removed & advertised

    # Invoking any of them must be rejected as an unknown command.
    invocations = (
        "head /documents/a.txt",
        "tail /documents/a.txt",
        "sed -n 1,1p /documents/a.txt",
    )
    for invocation in invocations:
        with pytest.raises(PIFSCommandError, match="Unsupported command"):
            executor.execute(invocation)
|
||||
|
||||
|
||||
def test_browse_requires_positional_query_and_rejects_removed_options(tmp_path):
|
||||
from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
|
||||
from pageindex.filesystem.commands import PIFSCommandError
|
||||
|
|
|
|||
|
|
@@ -580,17 +580,6 @@ def test_cat_all_is_limited_to_text_files():
|
|||
executor.execute("cat dsid_json_file --all")
|
||||
opened_json = filesystem.open("dsid_json_file")
|
||||
assert opened_json.text == '{"body":"json"}'
|
||||
for command in (
|
||||
"head dsid_pdf_file",
|
||||
"tail dsid_pdf_file",
|
||||
"sed -n 1,1p dsid_pdf_file",
|
||||
"head dsid_md_file",
|
||||
"tail dsid_md_file",
|
||||
"sed -n 1,1p dsid_md_file",
|
||||
):
|
||||
with pytest.raises(PIFSCommandError, match="only supported for txt/text files"):
|
||||
executor.execute(command)
|
||||
|
||||
|
||||
def test_pageindex_structure_commands_are_limited_to_pdf_and_markdown():
|
||||
from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
|
||||
|
|
|
|||
|
|
@@ -108,9 +108,6 @@ def test_shell_limits_reject_context_expanding_counts(tmp_path):
|
|||
("grep --limit 21 Root /documents", 20),
|
||||
("ls /documents --limit 101", 100),
|
||||
("tree /documents --limit 201", 200),
|
||||
("head -n 101 /documents/Root\\ document", 100),
|
||||
("tail -n 101 /documents/Root\\ document", 100),
|
||||
("sed -n 1,101p /documents/Root\\ document", 100),
|
||||
):
|
||||
with pytest.raises(PIFSCommandError, match=f"at most {limit}"):
|
||||
executor.execute(command)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue