diff --git a/metagpt/actions/write_analysis_code.py b/metagpt/actions/write_analysis_code.py index 663f76b7b..c41e0fc5a 100644 --- a/metagpt/actions/write_analysis_code.py +++ b/metagpt/actions/write_analysis_code.py @@ -5,6 +5,10 @@ @File : write_code_v2.py """ from typing import Dict, List, Union, Tuple +from tenacity import retry, stop_after_attempt, wait_fixed +from pathlib import Path +import re +import json from metagpt.actions import Action from metagpt.llm import LLM @@ -86,7 +90,6 @@ class WriteCodeByGenerate(BaseWriteAnalysisCode): self, context: [List[Message]], plan: Plan = None, - code_steps: str = "", system_msg: str = None, **kwargs, ) -> str: @@ -206,25 +209,110 @@ class WriteCodeWithTools(BaseWriteAnalysisCode): return rsp["code"] +class MakeTools(WriteCodeByGenerate): + DEFAULT_SYSTEM_MSG = """Please Create a very General Function Code startswith `def` from any codes you got.\n + **Notice: + 1. Your code must contain a general function start with `def`. + 2. Refactor your code to get the most efficient implementation for large input data in the shortest amount of time. + 3. Use Google style for function annotations. + 4. Write example code after `if __name__ == '__main__':`by using old varibales in old code, + and make sure it could be execute in the user's machine. + 5. Do not have missing package references.** + """ + + def __init__(self, name: str = '', context: list[Message] = None, llm: LLM = None, workspace: str = None): + """ + :param str name: name, defaults to '' + :param list[Message] context: context, defaults to None + :param LLM llm: llm, defaults to None + :param str workspace: tools code saved file path dir, defaults to None + """ + super().__init__(name, context, llm) + self.workspace = workspace or str(Path(__file__).parents[1].joinpath("./tools/functions/libs/udf")) + self.file_suffix: str = '.py' + + def parse_function_name(self, function_code: str) -> str: + # 定义正则表达式模式 + pattern = r'\bdef\s+([a-zA-Z_]\w*)\s*\(' + # 在代码中搜索匹配的模式 + match = re.search(pattern, function_code) + # 如果找到匹配项,则返回匹配的函数名;否则返回None + if match: + return match.group(1) + else: + return None + + def save(self, tool_code: str) -> None: + func_name = self.parse_function_name(tool_code) + if func_name is None: + raise ValueError(f"No function name found in {tool_code}") + saved_path = Path(self.workspace).joinpath(func_name+self.file_suffix) + logger.info(f"Saved tool_code {func_name} in {str(saved_path)}.") + saved_path.write_text(tool_code, encoding='utf-8') + + @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) + async def run(self, code_message: List[Message | Dict], **kwargs) -> str: + msgs = self.process_msg(code_message) + logger.info(f"\n\nAsk to Make tools:\n{'-'*60}\n {msgs[-1]}") + tool_code = await self.llm.aask_code(msgs, **kwargs) + max_tries, current_try = 3, 1 + func_name = self.parse_function_name(tool_code['code']) + while current_try < max_tries and func_name is None: + logger.info(f"\n\nTools Respond\n{'-'*60}\n: {tool_code}") + logger.warning(f"No function name found in code, we will retry make tools. \n\n{tool_code['code']}\n") + msgs.append({'role': 'assistant', 'content': 'We need a general function in above code,but not found function.'}) + tool_code = await self.llm.aask_code(msgs, **kwargs) + current_try += 1 + func_name = self.parse_function_name(tool_code['code']) + if func_name is not None: + break + self.save(tool_code['code']) + return tool_code["code"] + + class WriteCodeWithUDFs(WriteCodeByGenerate): """Write code with user defined function.""" from metagpt.tools.functions.libs.udf import UDFS - DEFAULT_SYSTEM_MSG = f"""Please remember these functions, you will use these functions to write code:\n - {UDFS}, **Notice: 1. if no right udf for user requirement, please send `No udf found`** + UDFS_DEFAULT_SYSTEM_MSG = f"""Please remember these functions, you will use these functions to write code:\n + {UDFS}, **Notice: 1. if no udf meets user requirement, please send `No udf found`. 2.Only use function code provied to you. + 3. Dont generate code from scratch.** """ async def aask_code_and_text(self, context: List[Dict], **kwargs) -> Tuple[str]: rsp = await self.llm.acompletion(context, **kwargs) rsp_content = self.llm.get_choice_text(rsp) code = CodeParser.parse_code(None, rsp_content) - if code.startswith('No udf found') or rsp_content.startswith('No udf found'): + if 'No udf found' in code or 'No udf found' in rsp_content: rsp_content = 'No udf found' code = 'No udf found' return code, rsp_content - async def run(self, context: List[Message], plan: Plan = None, task_guide: str = "", **kwargs) -> str: - prompt = self.process_msg(context) - logger.info(prompt[-1]) - code, _ = await self.aask_code_and_text(prompt, **kwargs) + async def run(self, context: List[Message], plan: Plan = None, **kwargs) -> str: + from metagpt.tools.functions.libs.udf import UDFS + if len(UDFS) > 0: + # Write code from user defined function. + prompt = self.process_msg(context, self.UDFS_DEFAULT_SYSTEM_MSG) + logger.info(prompt[-1]) + try: + logger.info("Local user defined function as following:") + logger.info(json.dumps(UDFS, indent=4, ensure_ascii=False)) + except Exception: + from pprint import pprint + pprint(UDFS) + logger.info('Writing code from user defined function by LLM...') + code, _ = await self.aask_code_and_text(prompt, **kwargs) + logger.info(f"Writing code from user defined function: \n{'-'*50}\n {code}") + if code != 'No udf found': + return code + logger.warning("No udf found, we will write code from scratch by LLM.") + # Writing code from scratch. + logger.warning("Writing code from scratch by LLM.") + code = await super().run(context, plan, self.DEFAULT_SYSTEM_MSG, **kwargs) + logger.info(f"Code Writing code from scratch by LLM is :\n{'-'*60}\n {code}") + # Make tools for above code. + logger.info("Make tools for above code.") + make_tools = MakeTools() + tool_code = await make_tools.run(code) + make_tools.save(tool_code) return code