fix(filesystem): reuse agent session in pifs chat

This commit is contained in:
BukeLy 2026-05-26 16:04:46 +08:00
parent b2089712b3
commit cb1bfb00fa
4 changed files with 241 additions and 104 deletions

View file

@ -354,6 +354,12 @@ class PIFSAgentStreamObserver:
self._start_section("tool_call", "[llm -> pifs command]")
print(command, file=self.output, flush=True)
def emit_request_started(self) -> None:
if not (self.wants_model_stream or self.wants_tool_stream):
return
self._start_section("request_started", "[llm request started]")
print("waiting for first model token or PIFS tool call...", file=self.output, flush=True)
def emit_tool_result(
self,
*,
@ -408,43 +414,167 @@ def run_pifs_agent(
tool_log: list[dict[str, Any]] | None = None,
agent_log: list[dict[str, Any]] | None = None,
) -> str:
try:
from agents import Agent, OpenAIChatCompletionsModel, Runner, function_tool, set_tracing_disabled
from openai import AsyncOpenAI
except ModuleNotFoundError as exc:
if exc.name == "agents":
raise RuntimeError("openai-agents is required to run the PageIndex FileSystem agent") from exc
raise
set_tracing_disabled(should_disable_pifs_agent_tracing())
normalized_stream_mode = normalize_agent_stream_mode(stream_mode)
executor = PIFSCommandExecutor(
filesystem,
json_output=False,
query_context=extract_agent_question_text(question),
)
observer = PIFSAgentStreamObserver(normalized_stream_mode, stream_log=agent_log)
instructions = build_pifs_agent_instructions(
session = PIFSAgentSession(
filesystem,
model=model,
root=root,
system_prompt=system_prompt,
executor=executor,
max_turns=max_turns,
max_seconds=max_seconds,
verbose=verbose,
stream_mode=stream_mode,
reasoning_effort=reasoning_effort,
reasoning_summary=reasoning_summary,
output_type=output_type,
tool_log=tool_log,
agent_log=agent_log,
persist_conversation=False,
)
return session.run(question)
@function_tool(description_override=BASH_TOOL_DESCRIPTION.strip())
def bash(command: str) -> str:
"""Run an allowed PageIndex FileSystem virtual shell command."""
class PIFSAgentSession:
def __init__(
self,
filesystem: PageIndexFileSystem,
*,
model: str,
root: str = "/",
system_prompt: str | None = None,
max_turns: int = 20,
max_seconds: float | None = 60,
verbose: bool = False,
stream_mode: str = "off",
reasoning_effort: str | None = None,
reasoning_summary: str | None = None,
output_type: type[Any] | None = None,
tool_log: list[dict[str, Any]] | None = None,
agent_log: list[dict[str, Any]] | None = None,
persist_conversation: bool = True,
) -> None:
self.filesystem = filesystem
self.max_turns = max_turns
self.max_seconds = max_seconds
self.verbose = verbose
self.tool_log = tool_log
self.agent_log = agent_log
self.normalized_stream_mode = normalize_agent_stream_mode(stream_mode)
self.observer: PIFSAgentStreamObserver | None = None
try:
from agents import (
Agent,
OpenAIChatCompletionsModel,
function_tool,
set_tracing_disabled,
)
from agents.memory import SQLiteSession
from openai import AsyncOpenAI
except ModuleNotFoundError as exc:
if exc.name == "agents":
raise RuntimeError(
"openai-agents is required to run the PageIndex FileSystem agent"
) from exc
raise
set_tracing_disabled(should_disable_pifs_agent_tracing())
self.executor = PIFSCommandExecutor(filesystem, json_output=False)
instructions = build_pifs_agent_instructions(
filesystem,
root=root,
system_prompt=system_prompt,
executor=self.executor,
)
@function_tool(description_override=BASH_TOOL_DESCRIPTION.strip())
def bash(command: str) -> str:
"""Run an allowed PageIndex FileSystem virtual shell command."""
return self._run_bash(command)
model_settings = build_agent_model_settings(
reasoning_effort=reasoning_effort,
reasoning_summary=reasoning_summary,
)
base_url = os.environ.get("OPENAI_BASE_URL")
model_config = model
if should_use_openai_compatible_chat_model(base_url):
model_config = OpenAIChatCompletionsModel(
model=model,
openai_client=AsyncOpenAI(
api_key=os.environ.get("OPENAI_API_KEY"),
base_url=base_url,
),
)
agent_kwargs: dict[str, Any] = {
"name": "PageIndexFileSystem",
"instructions": instructions,
"tools": [bash],
"model": model_config,
}
if model_settings is not None:
agent_kwargs["model_settings"] = model_settings
if output_type is not None:
agent_kwargs["output_type"] = output_type
self.agent = Agent(**agent_kwargs)
self.session = SQLiteSession("pifs-chat") if persist_conversation else None
def run(self, question: str) -> str:
self.executor.query_context = extract_agent_question_text(question)
self.observer = PIFSAgentStreamObserver(
self.normalized_stream_mode,
stream_log=self.agent_log,
)
self.observer.emit_request_started()
async def _run_streamed() -> str:
from agents import Runner
streamed_run = Runner.run_streamed(
self.agent,
question,
max_turns=self.max_turns,
session=self.session,
)
final_output = ""
try:
async for event in streamed_run.stream_events():
self.observer.handle_event(event)
final_output = serialize_agent_final_output(streamed_run.final_output)
return final_output
finally:
if not final_output and streamed_run.final_output:
final_output = serialize_agent_final_output(streamed_run.final_output)
self.observer.finish(final_output)
async def _run() -> str:
if self.max_seconds is None or self.max_seconds <= 0:
return await _run_streamed()
try:
return await asyncio.wait_for(_run_streamed(), timeout=self.max_seconds)
except asyncio.TimeoutError as exc:
raise TimeoutError(f"MaxSecondsExceeded: exceeded {self.max_seconds:g}s") from exc
try:
asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(asyncio.run, _run()).result()
except RuntimeError:
return asyncio.run(_run())
def _run_bash(self, command: str) -> str:
started = time.time()
ok = True
observer.emit_tool_call(command, force=verbose)
assert self.observer is not None
self.observer.emit_tool_call(command, force=self.verbose)
try:
output = executor.execute(command)
output = self.executor.execute(command)
except PIFSCommandError as exc:
ok = False
output = f"ERROR: {exc}"
seconds = time.time() - started
if tool_log is not None:
tool_log.append(
if self.tool_log is not None:
self.tool_log.append(
{
"command": command,
"ok": ok,
@ -453,63 +583,13 @@ def run_pifs_agent(
"preview": output[:500],
}
)
observer.emit_tool_result(ok=ok, output=output, seconds=seconds, force=verbose)
return output
model_settings = build_agent_model_settings(
reasoning_effort=reasoning_effort,
reasoning_summary=reasoning_summary,
)
base_url = os.environ.get("OPENAI_BASE_URL")
model_config = model
if should_use_openai_compatible_chat_model(base_url):
model_config = OpenAIChatCompletionsModel(
model=model,
openai_client=AsyncOpenAI(
api_key=os.environ.get("OPENAI_API_KEY"),
base_url=base_url,
),
self.observer.emit_tool_result(
ok=ok,
output=output,
seconds=seconds,
force=self.verbose,
)
agent_kwargs: dict[str, Any] = {
"name": "PageIndexFileSystem",
"instructions": instructions,
"tools": [bash],
"model": model_config,
}
if model_settings is not None:
agent_kwargs["model_settings"] = model_settings
if output_type is not None:
agent_kwargs["output_type"] = output_type
agent = Agent(**agent_kwargs)
async def _run_streamed() -> str:
streamed_run = Runner.run_streamed(agent, question, max_turns=max_turns)
final_output = ""
try:
async for event in streamed_run.stream_events():
observer.handle_event(event)
final_output = serialize_agent_final_output(streamed_run.final_output)
return final_output
finally:
if not final_output and streamed_run.final_output:
final_output = serialize_agent_final_output(streamed_run.final_output)
observer.finish(final_output)
async def _run() -> str:
if max_seconds is None or max_seconds <= 0:
return await _run_streamed()
try:
return await asyncio.wait_for(_run_streamed(), timeout=max_seconds)
except asyncio.TimeoutError as exc:
raise TimeoutError(f"MaxSecondsExceeded: exceeded {max_seconds:g}s") from exc
try:
asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(asyncio.run, _run()).result()
except RuntimeError:
return asyncio.run(_run())
return output
def extract_agent_question_text(prompt: str) -> str:

View file

@ -9,7 +9,12 @@ import sys
from pathlib import Path
from typing import Iterator, TextIO
from .agent import REASONING_EFFORT_CHOICES, REASONING_SUMMARY_CHOICES, run_pifs_agent
from .agent import (
PIFSAgentSession,
REASONING_EFFORT_CHOICES,
REASONING_SUMMARY_CHOICES,
run_pifs_agent,
)
from .commands import PIFSCommandError, PIFSCommandExecutor
from .core import PageIndexFileSystem
@ -200,6 +205,7 @@ def _run_chat(argv: list[str], *, workspace_default: str | None) -> int:
default_stream_mode="all",
)
filesystem = _filesystem_from_workspace(args.workspace)
session = PIFSAgentSession(filesystem, **_agent_kwargs(args))
while True:
try:
question = _sanitize_chat_question(input("pifs> "))
@ -213,7 +219,7 @@ def _run_chat(argv: list[str], *, workspace_default: str | None) -> int:
if question.lower() in EXIT_COMMANDS:
break
with _suppress_tty_input_echo():
answer = run_pifs_agent(filesystem, question, **_agent_kwargs(args))
answer = session.run(question)
if args.stream_mode == "off":
print(answer)
return 0

View file

@ -92,6 +92,16 @@ class PIFSAgentStreamTest(unittest.TestCase):
self.assertIn("more lines omitted from preview", printed)
self.assertNotIn("line 49", printed)
def test_request_started_prints_immediate_stream_feedback(self):
output = io.StringIO()
observer = PIFSAgentStreamObserver("all", output=output)
observer.emit_request_started()
printed = output.getvalue()
self.assertIn("[llm request started]", printed)
self.assertIn("waiting for first model token or PIFS tool call", printed)
def test_raw_reasoning_is_not_logged_by_default_but_summary_is(self):
output = io.StringIO()
stream_log = []

View file

@ -127,26 +127,32 @@ def test_cli_chat_runs_one_question_and_exits(monkeypatch, capsys, tmp_path):
workspace = tmp_path / "workspace"
inputs = iter(["", "Summarize the workspace", "exit"])
agent_calls = []
session_instances = []
session_questions = []
def fake_run_pifs_agent(filesystem, question, **kwargs):
agent_calls.append((filesystem, question, kwargs))
return f"answer:{question}"
class FakeSession:
def __init__(self, filesystem, **kwargs):
self.filesystem = filesystem
self.kwargs = kwargs
session_instances.append(self)
def run(self, question):
session_questions.append((self, question))
return f"answer:{question}"
monkeypatch.setattr(cli, "PageIndexFileSystem", FakeFileSystem)
monkeypatch.setattr(cli, "run_pifs_agent", fake_run_pifs_agent)
monkeypatch.setattr(cli, "PIFSAgentSession", FakeSession)
monkeypatch.setattr("builtins.input", lambda prompt="": next(inputs))
status = cli.main(["chat", "--workspace", str(workspace), "--model", "test-model"])
assert status == 0
assert capsys.readouterr().out == ""
assert len(agent_calls) == 1
filesystem, question, kwargs = agent_calls[0]
assert filesystem.workspace == workspace
assert question == "Summarize the workspace"
assert kwargs["model"] == "test-model"
assert kwargs["stream_mode"] == "all"
assert len(session_instances) == 1
assert session_instances[0].filesystem.workspace == workspace
assert session_questions == [(session_instances[0], "Summarize the workspace")]
assert session_instances[0].kwargs["model"] == "test-model"
assert session_instances[0].kwargs["stream_mode"] == "all"
def test_cli_chat_sanitizes_control_input(monkeypatch, capsys, tmp_path):
@ -156,12 +162,16 @@ def test_cli_chat_sanitizes_control_input(monkeypatch, capsys, tmp_path):
inputs = iter(["\x12", "he\x7fllo\x1b[A", "exit"])
agent_calls = []
def fake_run_pifs_agent(filesystem, question, **kwargs):
agent_calls.append(question)
return f"answer:{question}"
class FakeSession:
def __init__(self, filesystem, **kwargs):
pass
def run(self, question):
agent_calls.append(question)
return f"answer:{question}"
monkeypatch.setattr(cli, "PageIndexFileSystem", FakeFileSystem)
monkeypatch.setattr(cli, "run_pifs_agent", fake_run_pifs_agent)
monkeypatch.setattr(cli, "PIFSAgentSession", FakeSession)
monkeypatch.setattr("builtins.input", lambda prompt="": next(inputs))
status = cli.main(["chat", "--workspace", str(workspace), "--stream-mode", "off"])
@ -205,14 +215,17 @@ def test_cli_chat_stream_mode_can_be_overridden(monkeypatch, tmp_path):
workspace = tmp_path / "workspace"
inputs = iter(["Summarize the workspace", "exit"])
agent_calls = []
session_kwargs = []
def fake_run_pifs_agent(filesystem, question, **kwargs):
agent_calls.append((filesystem, question, kwargs))
return f"answer:{question}"
class FakeSession:
def __init__(self, filesystem, **kwargs):
session_kwargs.append(kwargs)
def run(self, question):
return f"answer:{question}"
monkeypatch.setattr(cli, "PageIndexFileSystem", FakeFileSystem)
monkeypatch.setattr(cli, "run_pifs_agent", fake_run_pifs_agent)
monkeypatch.setattr(cli, "PIFSAgentSession", FakeSession)
monkeypatch.setattr("builtins.input", lambda prompt="": next(inputs))
status = cli.main(
@ -226,4 +239,32 @@ def test_cli_chat_stream_mode_can_be_overridden(monkeypatch, tmp_path):
)
assert status == 0
assert agent_calls[0][2]["stream_mode"] == "tools"
assert session_kwargs[0]["stream_mode"] == "tools"
def test_cli_chat_reuses_one_agent_session_for_multiple_questions(monkeypatch, capsys, tmp_path):
from pageindex.filesystem import cli
workspace = tmp_path / "workspace"
inputs = iter(["first", "second", "exit"])
sessions = []
class FakeSession:
def __init__(self, filesystem, **kwargs):
self.questions = []
sessions.append(self)
def run(self, question):
self.questions.append(question)
return f"answer:{question}"
monkeypatch.setattr(cli, "PageIndexFileSystem", FakeFileSystem)
monkeypatch.setattr(cli, "PIFSAgentSession", FakeSession)
monkeypatch.setattr("builtins.input", lambda prompt="": next(inputs))
status = cli.main(["chat", "--workspace", str(workspace), "--stream-mode", "off"])
assert status == 0
assert len(sessions) == 1
assert sessions[0].questions == ["first", "second"]
assert capsys.readouterr().out == "answer:first\nanswer:second\n"