diff --git a/pageindex/filesystem/agent.py b/pageindex/filesystem/agent.py index 2fbe034..a1a3847 100644 --- a/pageindex/filesystem/agent.py +++ b/pageindex/filesystem/agent.py @@ -354,6 +354,12 @@ class PIFSAgentStreamObserver: self._start_section("tool_call", "[llm -> pifs command]") print(command, file=self.output, flush=True) + def emit_request_started(self) -> None: + if not (self.wants_model_stream or self.wants_tool_stream): + return + self._start_section("request_started", "[llm request started]") + print("waiting for first model token or PIFS tool call...", file=self.output, flush=True) + def emit_tool_result( self, *, @@ -408,43 +414,167 @@ def run_pifs_agent( tool_log: list[dict[str, Any]] | None = None, agent_log: list[dict[str, Any]] | None = None, ) -> str: - try: - from agents import Agent, OpenAIChatCompletionsModel, Runner, function_tool, set_tracing_disabled - from openai import AsyncOpenAI - except ModuleNotFoundError as exc: - if exc.name == "agents": - raise RuntimeError("openai-agents is required to run the PageIndex FileSystem agent") from exc - raise - - set_tracing_disabled(should_disable_pifs_agent_tracing()) - normalized_stream_mode = normalize_agent_stream_mode(stream_mode) - executor = PIFSCommandExecutor( - filesystem, - json_output=False, - query_context=extract_agent_question_text(question), - ) - observer = PIFSAgentStreamObserver(normalized_stream_mode, stream_log=agent_log) - instructions = build_pifs_agent_instructions( + session = PIFSAgentSession( filesystem, + model=model, root=root, system_prompt=system_prompt, - executor=executor, + max_turns=max_turns, + max_seconds=max_seconds, + verbose=verbose, + stream_mode=stream_mode, + reasoning_effort=reasoning_effort, + reasoning_summary=reasoning_summary, + output_type=output_type, + tool_log=tool_log, + agent_log=agent_log, + persist_conversation=False, ) + return session.run(question) - @function_tool(description_override=BASH_TOOL_DESCRIPTION.strip()) - def bash(command: str) -> str: - """Run an allowed PageIndex FileSystem virtual shell command.""" + +class PIFSAgentSession: + def __init__( + self, + filesystem: PageIndexFileSystem, + *, + model: str, + root: str = "/", + system_prompt: str | None = None, + max_turns: int = 20, + max_seconds: float | None = 60, + verbose: bool = False, + stream_mode: str = "off", + reasoning_effort: str | None = None, + reasoning_summary: str | None = None, + output_type: type[Any] | None = None, + tool_log: list[dict[str, Any]] | None = None, + agent_log: list[dict[str, Any]] | None = None, + persist_conversation: bool = True, + ) -> None: + self.filesystem = filesystem + self.max_turns = max_turns + self.max_seconds = max_seconds + self.verbose = verbose + self.tool_log = tool_log + self.agent_log = agent_log + self.normalized_stream_mode = normalize_agent_stream_mode(stream_mode) + self.observer: PIFSAgentStreamObserver | None = None + + try: + from agents import ( + Agent, + OpenAIChatCompletionsModel, + function_tool, + set_tracing_disabled, + ) + from agents.memory import SQLiteSession + from openai import AsyncOpenAI + except ModuleNotFoundError as exc: + if exc.name == "agents": + raise RuntimeError( + "openai-agents is required to run the PageIndex FileSystem agent" + ) from exc + raise + + set_tracing_disabled(should_disable_pifs_agent_tracing()) + self.executor = PIFSCommandExecutor(filesystem, json_output=False) + instructions = build_pifs_agent_instructions( + filesystem, + root=root, + system_prompt=system_prompt, + executor=self.executor, + ) + + @function_tool(description_override=BASH_TOOL_DESCRIPTION.strip()) + def bash(command: str) -> str: + """Run an allowed PageIndex FileSystem virtual shell command.""" + return self._run_bash(command) + + model_settings = build_agent_model_settings( + reasoning_effort=reasoning_effort, + reasoning_summary=reasoning_summary, + ) + base_url = os.environ.get("OPENAI_BASE_URL") + model_config = model + if should_use_openai_compatible_chat_model(base_url): + model_config = OpenAIChatCompletionsModel( + model=model, + openai_client=AsyncOpenAI( + api_key=os.environ.get("OPENAI_API_KEY"), + base_url=base_url, + ), + ) + + agent_kwargs: dict[str, Any] = { + "name": "PageIndexFileSystem", + "instructions": instructions, + "tools": [bash], + "model": model_config, + } + if model_settings is not None: + agent_kwargs["model_settings"] = model_settings + if output_type is not None: + agent_kwargs["output_type"] = output_type + self.agent = Agent(**agent_kwargs) + self.session = SQLiteSession("pifs-chat") if persist_conversation else None + + def run(self, question: str) -> str: + self.executor.query_context = extract_agent_question_text(question) + self.observer = PIFSAgentStreamObserver( + self.normalized_stream_mode, + stream_log=self.agent_log, + ) + self.observer.emit_request_started() + + async def _run_streamed() -> str: + from agents import Runner + + streamed_run = Runner.run_streamed( + self.agent, + question, + max_turns=self.max_turns, + session=self.session, + ) + final_output = "" + try: + async for event in streamed_run.stream_events(): + self.observer.handle_event(event) + final_output = serialize_agent_final_output(streamed_run.final_output) + return final_output + finally: + if not final_output and streamed_run.final_output: + final_output = serialize_agent_final_output(streamed_run.final_output) + self.observer.finish(final_output) + + async def _run() -> str: + if self.max_seconds is None or self.max_seconds <= 0: + return await _run_streamed() + try: + return await asyncio.wait_for(_run_streamed(), timeout=self.max_seconds) + except asyncio.TimeoutError as exc: + raise TimeoutError(f"MaxSecondsExceeded: exceeded {self.max_seconds:g}s") from exc + + try: + asyncio.get_running_loop() + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + return pool.submit(asyncio.run, _run()).result() + except RuntimeError: + return asyncio.run(_run()) + + def _run_bash(self, command: str) -> str: started = time.time() ok = True - observer.emit_tool_call(command, force=verbose) + assert self.observer is not None + self.observer.emit_tool_call(command, force=self.verbose) try: - output = executor.execute(command) + output = self.executor.execute(command) except PIFSCommandError as exc: ok = False output = f"ERROR: {exc}" seconds = time.time() - started - if tool_log is not None: - tool_log.append( + if self.tool_log is not None: + self.tool_log.append( { "command": command, "ok": ok, @@ -453,63 +583,13 @@ def run_pifs_agent( "preview": output[:500], } ) - observer.emit_tool_result(ok=ok, output=output, seconds=seconds, force=verbose) - return output - - model_settings = build_agent_model_settings( - reasoning_effort=reasoning_effort, - reasoning_summary=reasoning_summary, - ) - base_url = os.environ.get("OPENAI_BASE_URL") - model_config = model - if should_use_openai_compatible_chat_model(base_url): - model_config = OpenAIChatCompletionsModel( - model=model, - openai_client=AsyncOpenAI( - api_key=os.environ.get("OPENAI_API_KEY"), - base_url=base_url, - ), + self.observer.emit_tool_result( + ok=ok, + output=output, + seconds=seconds, + force=self.verbose, ) - - agent_kwargs: dict[str, Any] = { - "name": "PageIndexFileSystem", - "instructions": instructions, - "tools": [bash], - "model": model_config, - } - if model_settings is not None: - agent_kwargs["model_settings"] = model_settings - if output_type is not None: - agent_kwargs["output_type"] = output_type - agent = Agent(**agent_kwargs) - - async def _run_streamed() -> str: - streamed_run = Runner.run_streamed(agent, question, max_turns=max_turns) - final_output = "" - try: - async for event in streamed_run.stream_events(): - observer.handle_event(event) - final_output = serialize_agent_final_output(streamed_run.final_output) - return final_output - finally: - if not final_output and streamed_run.final_output: - final_output = serialize_agent_final_output(streamed_run.final_output) - observer.finish(final_output) - - async def _run() -> str: - if max_seconds is None or max_seconds <= 0: - return await _run_streamed() - try: - return await asyncio.wait_for(_run_streamed(), timeout=max_seconds) - except asyncio.TimeoutError as exc: - raise TimeoutError(f"MaxSecondsExceeded: exceeded {max_seconds:g}s") from exc - - try: - asyncio.get_running_loop() - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: - return pool.submit(asyncio.run, _run()).result() - except RuntimeError: - return asyncio.run(_run()) + return output def extract_agent_question_text(prompt: str) -> str: diff --git a/pageindex/filesystem/cli.py b/pageindex/filesystem/cli.py index ab749e2..aa443a5 100644 --- a/pageindex/filesystem/cli.py +++ b/pageindex/filesystem/cli.py @@ -9,7 +9,12 @@ import sys from pathlib import Path from typing import Iterator, TextIO -from .agent import REASONING_EFFORT_CHOICES, REASONING_SUMMARY_CHOICES, run_pifs_agent +from .agent import ( + PIFSAgentSession, + REASONING_EFFORT_CHOICES, + REASONING_SUMMARY_CHOICES, + run_pifs_agent, +) from .commands import PIFSCommandError, PIFSCommandExecutor from .core import PageIndexFileSystem @@ -200,6 +205,7 @@ def _run_chat(argv: list[str], *, workspace_default: str | None) -> int: default_stream_mode="all", ) filesystem = _filesystem_from_workspace(args.workspace) + session = PIFSAgentSession(filesystem, **_agent_kwargs(args)) while True: try: question = _sanitize_chat_question(input("pifs> ")) @@ -213,7 +219,7 @@ def _run_chat(argv: list[str], *, workspace_default: str | None) -> int: if question.lower() in EXIT_COMMANDS: break with _suppress_tty_input_echo(): - answer = run_pifs_agent(filesystem, question, **_agent_kwargs(args)) + answer = session.run(question) if args.stream_mode == "off": print(answer) return 0 diff --git a/tests/test_pifs_agent_stream.py b/tests/test_pifs_agent_stream.py index 1b7c9d1..0f56f36 100644 --- a/tests/test_pifs_agent_stream.py +++ b/tests/test_pifs_agent_stream.py @@ -92,6 +92,16 @@ class PIFSAgentStreamTest(unittest.TestCase): self.assertIn("more lines omitted from preview", printed) self.assertNotIn("line 49", printed) + def test_request_started_prints_immediate_stream_feedback(self): + output = io.StringIO() + observer = PIFSAgentStreamObserver("all", output=output) + + observer.emit_request_started() + + printed = output.getvalue() + self.assertIn("[llm request started]", printed) + self.assertIn("waiting for first model token or PIFS tool call", printed) + def test_raw_reasoning_is_not_logged_by_default_but_summary_is(self): output = io.StringIO() stream_log = [] diff --git a/tests/test_pifs_cli.py b/tests/test_pifs_cli.py index 85e2b6d..04717c4 100644 --- a/tests/test_pifs_cli.py +++ b/tests/test_pifs_cli.py @@ -127,26 +127,32 @@ def test_cli_chat_runs_one_question_and_exits(monkeypatch, capsys, tmp_path): workspace = tmp_path / "workspace" inputs = iter(["", "Summarize the workspace", "exit"]) - agent_calls = [] + session_instances = [] + session_questions = [] - def fake_run_pifs_agent(filesystem, question, **kwargs): - agent_calls.append((filesystem, question, kwargs)) - return f"answer:{question}" + class FakeSession: + def __init__(self, filesystem, **kwargs): + self.filesystem = filesystem + self.kwargs = kwargs + session_instances.append(self) + + def run(self, question): + session_questions.append((self, question)) + return f"answer:{question}" monkeypatch.setattr(cli, "PageIndexFileSystem", FakeFileSystem) - monkeypatch.setattr(cli, "run_pifs_agent", fake_run_pifs_agent) + monkeypatch.setattr(cli, "PIFSAgentSession", FakeSession) monkeypatch.setattr("builtins.input", lambda prompt="": next(inputs)) status = cli.main(["chat", "--workspace", str(workspace), "--model", "test-model"]) assert status == 0 assert capsys.readouterr().out == "" - assert len(agent_calls) == 1 - filesystem, question, kwargs = agent_calls[0] - assert filesystem.workspace == workspace - assert question == "Summarize the workspace" - assert kwargs["model"] == "test-model" - assert kwargs["stream_mode"] == "all" + assert len(session_instances) == 1 + assert session_instances[0].filesystem.workspace == workspace + assert session_questions == [(session_instances[0], "Summarize the workspace")] + assert session_instances[0].kwargs["model"] == "test-model" + assert session_instances[0].kwargs["stream_mode"] == "all" def test_cli_chat_sanitizes_control_input(monkeypatch, capsys, tmp_path): @@ -156,12 +162,16 @@ def test_cli_chat_sanitizes_control_input(monkeypatch, capsys, tmp_path): inputs = iter(["\x12", "he\x7fllo\x1b[A", "exit"]) agent_calls = [] - def fake_run_pifs_agent(filesystem, question, **kwargs): - agent_calls.append(question) - return f"answer:{question}" + class FakeSession: + def __init__(self, filesystem, **kwargs): + pass + + def run(self, question): + agent_calls.append(question) + return f"answer:{question}" monkeypatch.setattr(cli, "PageIndexFileSystem", FakeFileSystem) - monkeypatch.setattr(cli, "run_pifs_agent", fake_run_pifs_agent) + monkeypatch.setattr(cli, "PIFSAgentSession", FakeSession) monkeypatch.setattr("builtins.input", lambda prompt="": next(inputs)) status = cli.main(["chat", "--workspace", str(workspace), "--stream-mode", "off"]) @@ -205,14 +215,17 @@ def test_cli_chat_stream_mode_can_be_overridden(monkeypatch, tmp_path): workspace = tmp_path / "workspace" inputs = iter(["Summarize the workspace", "exit"]) - agent_calls = [] + session_kwargs = [] - def fake_run_pifs_agent(filesystem, question, **kwargs): - agent_calls.append((filesystem, question, kwargs)) - return f"answer:{question}" + class FakeSession: + def __init__(self, filesystem, **kwargs): + session_kwargs.append(kwargs) + + def run(self, question): + return f"answer:{question}" monkeypatch.setattr(cli, "PageIndexFileSystem", FakeFileSystem) - monkeypatch.setattr(cli, "run_pifs_agent", fake_run_pifs_agent) + monkeypatch.setattr(cli, "PIFSAgentSession", FakeSession) monkeypatch.setattr("builtins.input", lambda prompt="": next(inputs)) status = cli.main( @@ -226,4 +239,32 @@ def test_cli_chat_stream_mode_can_be_overridden(monkeypatch, tmp_path): ) assert status == 0 - assert agent_calls[0][2]["stream_mode"] == "tools" + assert session_kwargs[0]["stream_mode"] == "tools" + + +def test_cli_chat_reuses_one_agent_session_for_multiple_questions(monkeypatch, capsys, tmp_path): + from pageindex.filesystem import cli + + workspace = tmp_path / "workspace" + inputs = iter(["first", "second", "exit"]) + sessions = [] + + class FakeSession: + def __init__(self, filesystem, **kwargs): + self.questions = [] + sessions.append(self) + + def run(self, question): + self.questions.append(question) + return f"answer:{question}" + + monkeypatch.setattr(cli, "PageIndexFileSystem", FakeFileSystem) + monkeypatch.setattr(cli, "PIFSAgentSession", FakeSession) + monkeypatch.setattr("builtins.input", lambda prompt="": next(inputs)) + + status = cli.main(["chat", "--workspace", str(workspace), "--stream-mode", "off"]) + + assert status == 0 + assert len(sessions) == 1 + assert sessions[0].questions == ["first", "second"] + assert capsys.readouterr().out == "answer:first\nanswer:second\n"