fix(filesystem): avoid replaying failed agent runs

Only use the fresh event loop fallback for missing running-loop detection, so RuntimeError from a threaded agent run is not retried.
This commit is contained in:
Bukely_ 2026-05-26 20:27:58 +08:00 committed by BukeLy
parent ad45f96dfa
commit 2297453103
2 changed files with 35 additions and 2 deletions

View file

@ -585,11 +585,12 @@ class PIFSAgentSession:
try:
asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(asyncio.run, _run()).result()
except RuntimeError:
return asyncio.run(_run())
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(asyncio.run, _run()).result()
def _run_bash(self, command: str) -> str:
started = time.time()
ok = True

View file

@ -1,14 +1,18 @@
import io
import os
import threading
import unittest
from unittest.mock import patch
from types import SimpleNamespace
from pydantic import BaseModel, ConfigDict
from pageindex.filesystem import agent as agent_module
from pageindex.filesystem.agent import (
AGENT_TOOL_POLICY,
AGENT_SYSTEM_PROMPT,
BASH_TOOL_DESCRIPTION,
PIFSAgentSession,
PIFSAgentStreamObserver,
build_agent_model_settings,
normalize_agent_stream_mode,
@ -219,6 +223,34 @@ class PIFSAgentStreamTest(unittest.TestCase):
self.assertIn("clarify only after a reasonable search", AGENT_SYSTEM_PROMPT)
self.assertIn("search for candidate documents before asking", AGENT_TOOL_POLICY)
def test_threaded_runtime_error_is_not_retried_on_fresh_loop(self):
session = object.__new__(PIFSAgentSession)
session.executor = SimpleNamespace(query_context=None)
session.normalized_stream_mode = "off"
session.agent_log = []
session.max_seconds = None
session.max_turns = 1
session.session = None
session.agent = object()
main_thread = threading.get_ident()
run_threads = []
def fail_asyncio_run(coro):
coro.close()
run_threads.append(threading.get_ident())
raise RuntimeError("threaded agent failure")
with (
patch.object(agent_module.asyncio, "get_running_loop", return_value=object()),
patch.object(agent_module.asyncio, "run", side_effect=fail_asyncio_run),
):
with self.assertRaisesRegex(RuntimeError, "threaded agent failure"):
session.run("Question: inspect workspace")
self.assertEqual(len(run_threads), 1)
self.assertNotEqual(run_threads[0], main_thread)
if __name__ == "__main__":
unittest.main()