diff --git a/metagpt/actions/execute_code.py b/metagpt/actions/execute_code.py index 5b6cba57d..6591f479f 100644 --- a/metagpt/actions/execute_code.py +++ b/metagpt/actions/execute_code.py @@ -15,14 +15,13 @@ import nbformat from nbclient import NotebookClient from nbclient.exceptions import CellTimeoutError, DeadKernelError from nbformat import NotebookNode -from nbformat.v4 import new_code_cell, new_output, new_markdown_cell -from rich.console import Console -from rich.syntax import Syntax +from nbformat.v4 import new_code_cell, new_markdown_cell, new_output +from rich.box import MINIMAL +from rich.console import Console, Group +from rich.live import Live from rich.markdown import Markdown from rich.panel import Panel -from rich.box import MINIMAL -from rich.live import Live -from rich.console import Group +from rich.syntax import Syntax from metagpt.actions import Action from metagpt.logs import logger @@ -166,7 +165,7 @@ class ExecutePyCode(ExecuteCode, Action): # 如果在Python脚本中运行,__file__ 变量存在 return False - def _process_code(self, code: Union[str, Dict, Message], language: str = None) -> Tuple: + def _process_code(self, code: Union[str, Dict], language: str = None) -> Tuple: language = language or "python" if isinstance(code, str) and Path(code).suffix in (".py", ".txt"): code = Path(code).read_text(encoding="utf-8") @@ -174,20 +173,10 @@ class ExecutePyCode(ExecuteCode, Action): if isinstance(code, str): return code, language + if isinstance(code, dict): assert "code" in code - if "language" not in code: - code["language"] = "python" - code, language = code["code"], code["language"] - elif isinstance(code, Message): - if isinstance(code.content, dict) and "language" not in code.content: - code.content["language"] = "python" - code, language = code.content["code"], code.content["language"] - elif isinstance(code.content, str): - code, language = code.content, language - else: - raise ValueError(f"Not support code type {type(code).__name__}.") - + code = code["code"] return code, language async def run_cell(self, cell: NotebookNode, cell_index: int) -> Tuple[bool, str]: @@ -229,7 +218,7 @@ class ExecutePyCode(ExecuteCode, Action): # code success outputs = self.parse_outputs(self.nb.cells[-1].outputs) return truncate(remove_escape_and_color_codes(outputs), is_success=success) - elif language == 'markdown': + elif language == "markdown": # markdown self.add_markdown_cell(code) return code, True @@ -238,28 +227,28 @@ class ExecutePyCode(ExecuteCode, Action): def truncate(result: str, keep_len: int = 2000, is_success: bool = True): - desc = f"Executed code {'successfully' if is_success else 'failed, please reflect the cause of bug and then debug'}" + """对于超出keep_len个字符的result: 执行失败的代码, 展示result后keep_len个字符; 执行成功的代码, 展示result前keep_len个字符。""" + desc = f"Executed code {'successfully. ' if is_success else 'failed, please reflect the cause of bug and then debug. '}" + is_same_desc = False + if is_success: - desc += f"Truncated to show only {keep_len} characters\n" + desc += f"Truncated to show only first {keep_len} characters\n" else: - desc += "Show complete information for you." + desc += f"Truncated to show only last {keep_len} characters\n" if result.startswith(desc): result = result[len(desc) :] + is_same_desc = True + + if result.strip().startswith(" keep_len: - result = result[-keep_len:] if not is_success else result - if not result: - result = 'No output about your code. Only when importing packages it is normal case. Recap and go ahead.' - return result, False + result = result[-keep_len:] if not is_success else result[:keep_len] + return desc + result, is_success - if result.strip().startswith(" {'language': 'python', 'code': "print('Hello, World!')"} - assert "language" in rsp - assert "code" in rsp - assert len(rsp["code"]) > 0 - - -@pytest.mark.asyncio -async def test_aask_code_str(): - llm = OpenAILLM() - msg = "Write a python hello world code." - rsp = await llm.aask_code(msg) # -> {'language': 'python', 'code': "print('Hello, World!')"} - assert "language" in rsp - assert "code" in rsp - assert len(rsp["code"]) > 0 - - -@pytest.mark.asyncio -async def test_aask_code_Message(): - llm = OpenAILLM() - msg = UserMessage("Write a python hello world code.") - rsp = await llm.aask_code(msg) # -> {'language': 'python', 'code': "print('Hello, World!')"} - assert "language" in rsp - assert "code" in rsp - assert len(rsp["code"]) > 0 - - -def test_ask_code(): - llm = OpenAIGPTAPI() - msg = [{"role": "user", "content": "Write a python hello world code."}] - rsp = llm.ask_code(msg) # -> {'language': 'python', 'code': "print('Hello, World!')"} - assert "language" in rsp - assert "code" in rsp - assert len(rsp["code"]) > 0 - - -def test_ask_code_str(): - llm = OpenAIGPTAPI() - msg = "Write a python hello world code." - rsp = llm.ask_code(msg) # -> {'language': 'python', 'code': "print('Hello, World!')"} - assert "language" in rsp - assert "code" in rsp - assert len(rsp["code"]) > 0 - - -def test_ask_code_Message(): - llm = OpenAIGPTAPI() - msg = UserMessage("Write a python hello world code.") - rsp = llm.ask_code(msg) # -> {'language': 'python', 'code': "print('Hello, World!')"} - assert "language" in rsp - assert "code" in rsp - assert len(rsp["code"]) > 0 - - -def test_ask_code_list_Message(): - llm = OpenAIGPTAPI() - msg = [UserMessage("a=[1,2,5,10,-10]"), UserMessage("写出求a中最大值的代码python")] - rsp = llm.ask_code(msg) # -> {'language': 'python', 'code': 'max_value = max(a)\nmax_value'} - assert "language" in rsp - assert "code" in rsp - assert len(rsp["code"]) > 0 - - -def test_ask_code_list_str(): - llm = OpenAIGPTAPI() - msg = ["a=[1,2,5,10,-10]", "写出求a中最大值的代码python"] - rsp = llm.ask_code(msg) # -> {'language': 'python', 'code': 'max_value = max(a)\nmax_value'} - print(rsp) - assert "language" in rsp - assert "code" in rsp - assert len(rsp["code"]) > 0 - - -@pytest.mark.asyncio -async def test_ask_code_steps2(): - llm = OpenAIGPTAPI() - msg = ["step by setp 生成代码: Step 1. 先生成随机数组a, Step 2. 求a中最大值, Step 3. 绘制数据a的直方图"] - rsp = await llm.aask_code(msg) # -> {'language': 'python', 'code': 'max_value = max(a)\nmax_value'} - print(rsp) - assert "language" in rsp - assert "code" in rsp - assert len(rsp["code"]) > 0 - assert "Step 1" in rsp["code"] - assert "Step 2" in rsp["code"] - assert "Step 3" in rsp["code"] - - class TestOpenAI: @pytest.fixture def config(self): @@ -146,6 +63,38 @@ class TestOpenAI: openai_api_type="azure", ) + @pytest.fixture + def tool_calls_rsp(self): + function_rsps = [ + Function(arguments='{\n"language": "python",\n"code": "print(\'hello world\')"}', name="execute"), + Function(arguments='{\n"language": "python",\n"code": \'print("hello world")\'}', name="execute"), + Function(arguments='{\n"language": \'python\',\n"code": "print(\'hello world\')"}', name="execute"), + Function(arguments='{\n"language": "python",\n"code": "print(\'hello world\')"}', name="execute"), + Function(arguments='{\n"language": "python",\n"code": ```print("hello world")```}', name="execute"), + Function(arguments='{\n"language": "python",\n"code": """print("hello world")"""}', name="execute"), + Function(arguments='\nprint("hello world")\\n', name="execute"), + # only `{` in arguments + Function(arguments='{\n"language": "python",\n"code": "print(\'hello world\')"', name="execute"), + # no `{`, `}` in arguments + Function(arguments='\n"language": "python",\n"code": "print(\'hello world\')"', name="execute"), + ] + tool_calls = [ + ChatCompletionMessageToolCall(type="function", id=f"call_{i}", function=f) + for i, f in enumerate(function_rsps) + ] + messages = [ChatCompletionMessage(content=None, role="assistant", tool_calls=[t]) for t in tool_calls] + # 添加一个纯文本响应 + messages.append( + ChatCompletionMessage(content="Completed a python code for hello world!", role="assistant", tool_calls=None) + ) + choices = [ + Choice(finish_reason="tool_calls", logprobs=None, index=i, message=msg) for i, msg in enumerate(messages) + ] + return [ + ChatCompletion(id=str(i), choices=[c], created=i, model="gpt-4", object="chat.completion") + for i, c in enumerate(choices) + ] + def test_make_client_kwargs_without_proxy(self, config): instance = OpenAILLM() instance.config = config @@ -171,3 +120,17 @@ class TestOpenAI: instance.config = config_azure_proxy kwargs = instance._make_client_kwargs() assert "http_client" in kwargs + + def test_get_choice_function_arguments_for_aask_code(self, tool_calls_rsp): + instance = OpenAILLM() + for i, rsp in enumerate(tool_calls_rsp): + code = instance.get_choice_function_arguments(rsp) + logger.info(f"\ntest get function call arguments {i}: {code}") + assert "code" in code + assert "language" in code + assert "hello world" in code["code"] + + if "Completed a python code for hello world!" == code["code"]: + code["language"] == "markdown" + else: + code["language"] == "python"