diff --git a/pageindex/filesystem/agent.py b/pageindex/filesystem/agent.py index 4c26919..0d69c73 100644 --- a/pageindex/filesystem/agent.py +++ b/pageindex/filesystem/agent.py @@ -585,11 +585,12 @@ class PIFSAgentSession: try: asyncio.get_running_loop() - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: - return pool.submit(asyncio.run, _run()).result() except RuntimeError: return asyncio.run(_run()) + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + return pool.submit(asyncio.run, _run()).result() + def _run_bash(self, command: str) -> str: started = time.time() ok = True diff --git a/tests/test_pifs_agent_stream.py b/tests/test_pifs_agent_stream.py index 1b6a378..0994b94 100644 --- a/tests/test_pifs_agent_stream.py +++ b/tests/test_pifs_agent_stream.py @@ -1,14 +1,18 @@ import io import os +import threading import unittest +from unittest.mock import patch from types import SimpleNamespace from pydantic import BaseModel, ConfigDict +from pageindex.filesystem import agent as agent_module from pageindex.filesystem.agent import ( AGENT_TOOL_POLICY, AGENT_SYSTEM_PROMPT, BASH_TOOL_DESCRIPTION, + PIFSAgentSession, PIFSAgentStreamObserver, build_agent_model_settings, normalize_agent_stream_mode, @@ -219,6 +223,34 @@ class PIFSAgentStreamTest(unittest.TestCase): self.assertIn("clarify only after a reasonable search", AGENT_SYSTEM_PROMPT) self.assertIn("search for candidate documents before asking", AGENT_TOOL_POLICY) + def test_threaded_runtime_error_is_not_retried_on_fresh_loop(self): + session = object.__new__(PIFSAgentSession) + session.executor = SimpleNamespace(query_context=None) + session.normalized_stream_mode = "off" + session.agent_log = [] + session.max_seconds = None + session.max_turns = 1 + session.session = None + session.agent = object() + + main_thread = threading.get_ident() + run_threads = [] + + def fail_asyncio_run(coro): + coro.close() + run_threads.append(threading.get_ident()) + raise RuntimeError("threaded agent failure") + + with ( + patch.object(agent_module.asyncio, "get_running_loop", return_value=object()), + patch.object(agent_module.asyncio, "run", side_effect=fail_asyncio_run), + ): + with self.assertRaisesRegex(RuntimeError, "threaded agent failure"): + session.run("Question: inspect workspace") + + self.assertEqual(len(run_threads), 1) + self.assertNotEqual(run_threads[0], main_thread) + if __name__ == "__main__": unittest.main()