diff --git a/examples/ags/benchmark/gsm8k.py b/examples/ags/benchmark/gsm8k.py index 8ec24ca2d..ebde56484 100644 --- a/examples/ags/benchmark/gsm8k.py +++ b/examples/ags/benchmark/gsm8k.py @@ -1,70 +1,4 @@ # -*- coding: utf-8 -*- -# @Date : +# @Date : # @Author : issac # @Desc : test on gsm8k - -import json -import re -import os - -# 读取原始数据集 -def read_jsonl(path: str): - with open(path) as fh: - return [json.loads(line) for line in fh.readlines() if line] - -# 和图/和基础模型直接交互得到答案 -def LLM(question): - answer = "" - # 这里就是输入问题question返回答案answer - # answer = 根据question生成的回答 - return answer - -def gsm_extract_answer(completion): - ANS_RE = re.compile(r"#### (\-?[0-9\.\,]+)") - INVALID_ANS = "[invalid]" - - match = ANS_RE.search(completion) - if match: - match_str = match.group(1).strip() - match_str = match_str.replace(",", "") - return match_str - else: - return INVALID_ANS - - -def gsm_is_correct(data): - INVALID_ANS = "[invalid]" - - gt_answer = gsm_extract_answer(data["answer"]) - assert gt_answer != INVALID_ANS - return gsm_extract_answer(data["answer_llm"]) == gt_answer - - -# 提取数据集并得到测试答案 -def get_examples(split): - path = os.path.join("", f"{split}.jsonl") - output_path = "gsm8k_generate.jsonl" - examples = read_jsonl(path) - - processed_examples = [] # 用于存储处理后的样本 - - for ex in examples: - answer_llm = LLM(ex['question']) - ex['answer_llm'] = answer_llm - ex['is_correct'] = gsm_is_correct(ex) - # 将处理后的样本添加到列表中 - processed_examples.append(ex) - - # 将处理后的样本写入到新的 JSONL 文件 - with open(output_path, 'w', encoding='utf-8') as f: - for example in processed_examples: - # 将字典转换为 JSON 格式的字符串,并写入新行 - json_line = json.dumps(example) + '\n' - f.write(json_line) - - print(f"{len(examples)} {split} examples") - return examples - -if __name__ == "__main__": - example = get_examples("gsm") - print(example[:5]) \ No newline at end of file diff --git a/examples/ags/benchmark/hotpotQA.py b/examples/ags/benchmark/hotpotQA.py index 260e4e11b..7e4923491 100644 --- a/examples/ags/benchmark/hotpotQA.py +++ b/examples/ags/benchmark/hotpotQA.py @@ -1,155 +1,4 @@ # -*- coding: utf-8 -*- -# @Date : +# @Date : # @Author : issac # @Desc : test on hotpotqa - -import sys -import json -import re -import string -from collections import Counter -import pickle - -def normalize_answer(s): - - def remove_articles(text): - return re.sub(r'\b(a|an|the)\b', ' ', text) - - def white_space_fix(text): - return ' '.join(text.split()) - - def remove_punc(text): - exclude = set(string.punctuation) - return ''.join(ch for ch in text if ch not in exclude) - - def lower(text): - return text.lower() - - return white_space_fix(remove_articles(remove_punc(lower(s)))) - - -def f1_score(prediction, ground_truth): - normalized_prediction = normalize_answer(prediction) - normalized_ground_truth = normalize_answer(ground_truth) - - ZERO_METRIC = (0, 0, 0) - - if normalized_prediction in ['yes', 'no', 'noanswer'] and normalized_prediction != normalized_ground_truth: - return ZERO_METRIC - if normalized_ground_truth in ['yes', 'no', 'noanswer'] and normalized_prediction != normalized_ground_truth: - return ZERO_METRIC - - prediction_tokens = normalized_prediction.split() - ground_truth_tokens = normalized_ground_truth.split() - common = Counter(prediction_tokens) & Counter(ground_truth_tokens) - num_same = sum(common.values()) - if num_same == 0: - return ZERO_METRIC - precision = 1.0 * num_same / len(prediction_tokens) - recall = 1.0 * num_same / len(ground_truth_tokens) - f1 = (2 * precision * recall) / (precision + recall) - return f1, precision, recall - - -def exact_match_score(prediction, ground_truth): - return (normalize_answer(prediction) == normalize_answer(ground_truth)) - -def update_answer(metrics, prediction, gold): - em = exact_match_score(prediction, gold) - f1, prec, recall = f1_score(prediction, gold) - metrics['em'] += float(em) - metrics['f1'] += f1 - metrics['prec'] += prec - metrics['recall'] += recall - return em, prec, recall - -def update_sp(metrics, prediction, gold): - cur_sp_pred = set(map(tuple, prediction)) - gold_sp_pred = set(map(tuple, gold)) - tp, fp, fn = 0, 0, 0 - for e in cur_sp_pred: - if e in gold_sp_pred: - tp += 1 - else: - fp += 1 - for e in gold_sp_pred: - if e not in cur_sp_pred: - fn += 1 - prec = 1.0 * tp / (tp + fp) if tp + fp > 0 else 0.0 - recall = 1.0 * tp / (tp + fn) if tp + fn > 0 else 0.0 - f1 = 2 * prec * recall / (prec + recall) if prec + recall > 0 else 0.0 - em = 1.0 if fp + fn == 0 else 0.0 - metrics['sp_em'] += em - metrics['sp_f1'] += f1 - metrics['sp_prec'] += prec - metrics['sp_recall'] += recall - return em, prec, recall - -def LLM(question): - answer = "" - # 这里就是输入问题question返回答案answer - # answer = 根据question生成的回答 - return answer - -def eval(prediction_file, gold_file): - with open(prediction_file) as f: - prediction = json.load(f) - with open(gold_file) as f: - gold = json.load(f) - - metrics = {'em': 0, 'f1': 0, 'prec': 0, 'recall': 0, - 'sp_em': 0, 'sp_f1': 0, 'sp_prec': 0, 'sp_recall': 0, - 'joint_em': 0, 'joint_f1': 0, 'joint_prec': 0, 'joint_recall': 0} - for dp in gold: - cur_id = dp['_id'] - can_eval_joint = True - if cur_id not in prediction['answer']: - print('missing answer {}'.format(cur_id)) - can_eval_joint = False - else: - em, prec, recall = update_answer( - metrics, prediction['answer'][cur_id], dp['answer']) - - N = len(gold) - for k in metrics.keys(): - metrics[k] /= N - - print(metrics) - -def LLM(question): - answer = question - # 这里就是输入问题question返回答案answer - # answer = 根据question生成的回答 - return answer - -def answer(prediction_file, gold_file): - with open(gold_file) as f: - gold = json.load(f) - - # 初始化预测字典,包含 answer 和 sp 两个键,初始为空字典 - prediction = {'answer': {}} - - for dp in gold: - cur_id = dp['_id'] - paragraphs = [item[1] for item in dp['context'] if isinstance(item[1], list)] # 确保 item[1] 是列表 - # 将所有文本段落连接成一个字符串 - context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs) - question = dp['question'] - - # 构建输入字符串 - input_llm = f"question:{question}\n\ncontext:{context_str}" - - # 假设 LLM 是一个函数,返回模型的预测答案 - response = LLM(input_llm) - - # 将预测答案存储在字典中,键为 cur_id - prediction['answer'][cur_id] = response - - # 将预测结果写入文件 - with open(prediction_file, 'w') as f: - json.dump(prediction, f) - - -if __name__ == '__main__': - answer('hotpot_pre.json', 'your path here') - eval('hotpot_pre.json', 'your path here') diff --git a/examples/ags/benchmark/humaneval.py b/examples/ags/benchmark/humaneval.py index fab02ef7f..02f221753 100644 --- a/examples/ags/benchmark/humaneval.py +++ b/examples/ags/benchmark/humaneval.py @@ -3,134 +3,132 @@ # @Author : didi # @Desc : test on human eval graph -import os +import asyncio import json +import os import subprocess import sys -import asyncio +from typing import Literal, Optional + import aiofiles -from metagpt.llm import LLM from evalplus.data import get_human_eval_plus -from examples.ags.w_action_node.utils import jsonl_ranker + from examples.ags.w_action_node.graph import HumanEvalGraph from examples.ags.w_action_node.operator import GenerateCode, GenerateCodeBlock +from examples.ags.w_action_node.utils import sort_json_by_task_id +from metagpt.llm import LLM +from metagpt.logs import logger +from metagpt.utils.common import add_jsonl_file, read_json_file +from metagpt.utils.exceptions import handle_exception generate_code = GenerateCode(llm=LLM()) generate_code_block = GenerateCodeBlock(llm=LLM()) -solver = HumanEvalGraph(name="solver", llm=LLM(), criteria='correctness, efficiency, readability', vote_count=5) +solver = HumanEvalGraph(name="solver", llm=LLM(), criteria="correctness, efficiency, readability", vote_count=5) -async def sample_generate(id, result_path:str="samples.jsonl",mode:str="ags"): +ModeType = Literal["ags", "alpha_codium", "llm"] + + +async def llm_generate(id): case = get_human_eval_plus()[f"{id}"] + solution_result = await generate_code_block(case["prompt"], case["entry_point"]) + sample_dict = dict(task_id=case["task_id"], solution=solution_result["code_solution"]) + return sample_dict + + +async def ags_generate(id, ensemble_count: int = 5): + case = get_human_eval_plus()[f"{id}"] + solution_result = await solver(case["prompt"], ensemble_count=ensemble_count) + sample_dict = dict(task_id=case["task_id"], solution=solution_result["final_solution"]) + return sample_dict + + +async def alpha_codium_generate(id): + case = get_human_eval_plus()[f"{id}"] + solution_result = await solver.alpha_codium(case["task_id"], case["prompt"], ensemble_count=5) + sample_dict = dict(task_id=case["task_id"], solution=solution_result["final_solution"]) + return sample_dict + + +async def route_generate(mode: ModeType, id: str): if mode == "ags": - solution_result = await solver(case['prompt'],ensemble_count=5) - sample_dict = dict(task_id=case['task_id'], solution=solution_result['final_solution']) - elif mode == "alpha": - solution_result = await solver.alpha_codium(case['task_id'], case['prompt'], ensemble_count=5) - sample_dict = dict(task_id=case['task_id'], solution=solution_result['final_solution']) + sample_dict = await ags_generate(id) + elif mode == "alpha_codium": + sample_dict = await alpha_codium_generate(id) elif mode == "llm": - solution_result = await generate_code_block(case['prompt'],case['entry_point']) - sample_dict = dict(task_id=case['task_id'], solution=solution_result['code_solution']) - print(sample_dict) - with open(result_path, mode='a') as f: - f.write(json.dumps(sample_dict) + '\n') - jsonl_ranker(result_path, result_path) + sample_dict = await llm_generate(id) + else: + raise ValueError(f"Invalid mode: {mode}") + return sample_dict -async def samples_generate(mode:str, result_path:str="samples.jsonl"): - cases = list(get_human_eval_plus().values()) + +async def sample_generate(id, result_path: str = "samples.jsonl", mode: ModeType = "ags"): + sample_dict = await route_generate(mode, id) + add_jsonl_file(result_path, [sample_dict]) + sort_json_by_task_id(result_path, result_path) + + +async def samples_generate(mode: ModeType, result_path: str = "samples.jsonl"): + ids = list(get_human_eval_plus().keys()) file_lock = asyncio.Lock() - - async def solve_and_write(case, mode): - try: - if mode == 'llm': - solution_result = await generate_code_block(problem_description=case['prompt'], function_name=case['entry_point']) - # solution_result = await generate_code(case['prompt']) - sample_dict = { - 'task_id': case['task_id'], - 'solution': solution_result['code_solution'] - } - elif mode == "ags": - solution_result = await solver(case['prompt'], ensemble_count=5) - sample_dict = { - 'task_id': case['task_id'], - 'solution': solution_result['final_solution'] - } - elif mode == "alpha": - solution_result = await solver.alpha_codium(case['task_id'], case['prompt'], ensemble_count=5) - sample_dict = { - 'task_id': case['task_id'], - 'solution': solution_result['final_solution'] - } - # TODO 解决 final_solution 问题之后就可以开始正式测评了 - async with file_lock: - async with aiofiles.open(result_path, mode='a') as f: - await f.write(json.dumps(sample_dict) + '\n') - return None - except Exception as e: - print(e) - return case['task_id'] + @handle_exception( + exception_type=Exception, + exception_msg="Error in solve_and_write function", + default_return=lambda id, *args, **kwargs: id, + ) + async def solve_and_write(id: str, mode: ModeType) -> Optional[str]: + sample_dict = await route_generate(mode, id) + async with file_lock: + async with aiofiles.open(result_path, mode="a") as f: + await f.write(json.dumps(sample_dict) + "\n") + return None - tasks = [solve_and_write(case, mode) for case in cases] + tasks = [solve_and_write(id, mode) for id in ids] results = await asyncio.gather(*tasks) failed_tasks = [task_id for task_id in results if task_id is not None] if failed_tasks: - print(failed_tasks) - if mode == 'llm': - for task_id in failed_tasks: - case = get_human_eval_plus()[task_id] - for _ in range(3): - try: - solution_result = await generate_code_block(case['prompt'],function_name=case['entry_point']) - task_dict = { - 'task_id': case['task_id'], - 'solution': solution_result['code_solution'] - } - with open(result_path, mode='a') as f: - f.write(json.dumps(task_dict) + '\n') - failed_tasks.remove(task_id) - break - except Exception as e: - print(f"{e} \n failure {task_id}") - elif mode == "ags" or mode == "alpha": - for task_id in failed_tasks: - try: - await sample_generate(task_id,result_path,mode) - except Exception as e: - print(f"failure {task_id}") - - jsonl_ranker(result_path, result_path) - - if not failed_tasks: - # 自动 sanitize - # result_path = automatic_sanitize(result_path) - if automatic_evalplus(result_path): - eval_path = result_path[:-6]+"_eval_results.json" - unpassed_exapmle = extract_failure_tests(eval_path) - print(unpassed_exapmle) - else: - print(failed_tasks) + logger.info(failed_tasks) + for task_id in failed_tasks: + try: + await sample_generate(task_id, result_path, mode) + failed_tasks.remove(task_id) + except Exception: + logger.error(f"{task_id} fail") -def automatic_sanitize(result_path: str = "samples.jsonl"): + sort_json_by_task_id(result_path, result_path) + + if not failed_tasks: + if automatic_evalplus(result_path): + eval_path = result_path[:-6] + "_eval_results.json" + unpassed_exapmle = extract_failure_tests(eval_path) + logger.info(unpassed_exapmle) + else: + logger.info(failed_tasks) + + +@handle_exception(exception_type=subprocess.CalledProcessError, exception_msg="sanitize error", default_return=None) +def automatic_sanitize(result_path: str = "samples.jsonl") -> Optional[str]: """ 在命令行中自动执行 evalplus.sanitize --samples result_path 返回result_path前缀加上"-sanitized.jsonl" """ command = ["evalplus.sanitize", "--samples", result_path] - - try: - subprocess.run(command, check=True) - except subprocess.CalledProcessError as e: - print(f"执行命令时出错: {e}") - return None - - # 构建sanitized文件路径 + + subprocess.run(command, check=True) + base_name = os.path.splitext(result_path)[0] sanitized_path = f"{base_name}-sanitized.jsonl" - + return sanitized_path -def automatic_evalplus(result_path:str ="samples.jsonl"): + +@handle_exception( + exception_type=subprocess.CalledProcessError, + exception_msg="Error in automatic_evalplus function", + default_return=False, +) +def automatic_evalplus(result_path: str = "samples.jsonl") -> bool: """ 在命令行中自动执行 evalplus.evaluate --dataset humaneval --samples samples.jsonl --parallel 2 --base-only """ @@ -138,34 +136,30 @@ def automatic_evalplus(result_path:str ="samples.jsonl"): sys.executable, # 使用当前 Python 解释器 "-m", "evalplus.evaluate", - "--dataset", "humaneval", - "--samples", result_path, - "--parallel", "2", - "--base-only" + "--dataset", + "humaneval", + "--samples", + result_path, + "--parallel", + "2", + "--base-only", ] - - try: - result = subprocess.run(command, check=True, capture_output=True, text=True) - print("输出:", result.stdout) - return True - except subprocess.CalledProcessError as e: - print("错误输出:", e.stderr) - return False - -def extract_failure_tests(file_path:str = "samples_eval_results.json"): - with open(file_path, 'r') as f: - task_results = json.load(f) + + result = subprocess.run(command, check=True, capture_output=True, text=True) + logger.info(f"ouptput: \n {result.stdout}") + return True + + +def extract_failure_tests(file_path: str = "samples_eval_results.json"): + task_results = read_json_file(file_path) failed_tests = [] - - for task in task_results['eval'].values(): + for task in task_results["eval"].values(): if task[0]["base_status"] == "fail": failed_test = { "task_id": task[0]["task_id"], - # "solution": task["solution"], - # "fail_tests": task["base_fail_tests"] } failed_tests.append(failed_test) - print(len(failed_tests)) - + logger.info(f"length of failed tests: {len(failed_tests)}") + return failed_tests diff --git a/examples/ags/w_action_node/graph.py b/examples/ags/w_action_node/graph.py index 448753b42..c0557a1dd 100644 --- a/examples/ags/w_action_node/graph.py +++ b/examples/ags/w_action_node/graph.py @@ -3,26 +3,41 @@ # @Author : didi # @Desc : graph & an instance - humanevalgraph -from metagpt.llm import LLM from typing import List -from examples.ags.w_action_node.operator import Generate, GenerateCode, GenerateCodeBlock, Review, Revise, FuEnsemble, MdEnsemble, DbEnsemble, Rephrase, Test -from examples.ags.w_action_node.utils import extract_test_cases_from_jsonl + from evalplus.data import get_human_eval_plus + +from examples.ags.w_action_node.operator import ( + FuEnsemble, + Generate, + GenerateCode, + GenerateCodeBlock, + MdEnsemble, + Rephrase, + Review, + Revise, + Test, +) +from examples.ags.w_action_node.utils import extract_test_cases_from_jsonl +from metagpt.llm import LLM + + class Graph: - def __init__(self, name:str, llm:LLM) -> None: + def __init__(self, name: str, llm: LLM) -> None: self.name = name - self.model = llm + self.model = llm def __call__(): NotImplementedError("Subclasses must implement __call__ method") - def optimize(dataset:List): + def optimize(dataset: List): pass + class HumanEvalGraph(Graph): - def __init__(self, name:str, llm: LLM, criteria:str, vote_count:int =5) -> None: + def __init__(self, name: str, llm: LLM, criteria: str, vote_count: int = 5) -> None: super().__init__(name, llm) - self.criteria = criteria # TODO 自动构建图时,图的初始参数与图所使用的算子要求的外部参数相匹配 + self.criteria = criteria # TODO 自动构建图时,图的初始参数与图所使用的算子要求的外部参数相匹配 self.generate_code = GenerateCode(llm=llm) self.generate_code_block = GenerateCodeBlock(llm=llm) self.review = Review(llm=llm, criteria=criteria) @@ -32,84 +47,82 @@ class HumanEvalGraph(Graph): self.fuensemble = FuEnsemble(llm=llm) self.mdensemble = MdEnsemble(llm=llm, vote_count=vote_count) - async def __call__(self, problem:str, ensemble_count:int = 3): + async def __call__(self, problem: str, ensemble_count: int = 3): solution_list = [] for _ in range(ensemble_count): - for retry_count in range(5): - try: - # solution = await self.generate_code(problem) - solution = await self.generate_code_block(problem) - solution = solution.get('code_solution') - solution_list.append(solution) - break - except Exception as e: - print(e) + solution = await self.generate_code_block(problem) + solution = solution.get("code_solution") + solution_list.append(solution) solution = await self.mdensemble("code", solution_list, problem) return solution - - async def alpha_codium(self, problem_id:str, problem:str, ensemble_count:int = 3): + + async def alpha_codium(self, problem_id: str, problem: str, ensemble_count: int = 3): + """ + Paper: Code Generation with AlphaCodium: From Prompt Engineering to Flow Engineering + Link: https://arxiv.org/abs/2404.14963 + Flow: An incomplete version of alpha codium, implementing the basic process of rephrase -> code ensemble -> tes + """ test_cases = extract_test_cases_from_jsonl(problem_id) - entry_point = get_human_eval_plus()[problem_id]['entry_point'] - rephrase_problem = await self.rephrase(problem) # 在rephrase 中拼接原始的问题描述 + entry_point = get_human_eval_plus()[problem_id]["entry_point"] + rephrase_problem = await self.rephrase(problem) # 在rephrase 中拼接原始的问题描述 solution_list = [] for _ in range(ensemble_count): - for retry_count in range(5): - try: - solution = await self.generate_code_block.rephrase_generate(problem, rephrase_problem, function_name=entry_point) - solution = solution.get('code_solution') - solution_list.append(solution) - break - except Exception as e: - print(e) + solution = await self.generate_code_block.rephrase_generate( + problem, rephrase_problem, function_name=entry_point + ) + solution = solution.get("code_solution") + solution_list.append(solution) solution = await self.mdensemble("code", solution_list, problem) solution = await self.tester(problem_id, problem, rephrase_problem, solution, test_cases) return solution - async def review_revise_ensemble(self, problem:str, ensemble_count:int = 2): + async def review_revise_ensemble(self, problem: str, ensemble_count: int = 2, revise_round: int = 3): solution_list = [] for _ in range(ensemble_count): - solution = await self.single_solve(problem, 3) + solution = await self.single_solve(problem, revise_round) solution_list.append(solution) solution = await self.ensemble(solution_list, problem) return solution - async def simple_ensemble(self, problem:str, ensemble_count:int = 3): + async def simple_ensemble(self, problem: str, ensemble_count: int = 3): solution_list = [] for _ in range(ensemble_count): solution = await self.generate_code(problem) # solution = await self.generate_code_block(problem) - solution = solution.get('code_solution') + solution = solution.get("code_solution") solution_list.append(solution) solution = await self.fuensemble(solution_list, problem) return solution - - async def single_solve(self, problem:str, max_loop:int): + + async def single_solve(self, problem: str, max_loop: int): solution = await self.generate_code(problem) - solution = solution.get('code_solution') + solution = solution.get("code_solution") for _ in range(max_loop): review_feedback = await self.review(problem, solution) - if review_feedback['review_result']: + if review_feedback["review_result"]: break - solution = await self.revise(problem, solution, review_feedback['feedback']) - solution = solution.get('revised_solution') + solution = await self.revise(problem, solution, review_feedback["feedback"]) + solution = solution.get("revised_solution") return solution - + + class Gsm8kGraph(Graph): - def __init__(self, name:str, llm: LLM) -> None: + def __init__(self, name: str, llm: LLM) -> None: super().__init__(name, llm) self.generate = Generate(llm=llm) self.rephrase = Rephrase(llm=llm) - - async def __call__(self, problem:str): + + async def __call__(self, problem: str): solution = self.generate(problem) return solution - + + class HotpotQAGraph(Graph): - def __init__(self, name:str, llm: LLM) -> None: + def __init__(self, name: str, llm: LLM) -> None: super().__init__(name, llm) self.generate = Generate(llm=llm) self.rephrase = Rephrase(llm=llm) - - async def __call__(self, problem:str): + + async def __call__(self, problem: str): solution = self.generate(problem) - return solution \ No newline at end of file + return solution diff --git a/examples/ags/w_action_node/operator.py b/examples/ags/w_action_node/operator.py index 759de9756..72c0b30fc 100644 --- a/examples/ags/w_action_node/operator.py +++ b/examples/ags/w_action_node/operator.py @@ -3,30 +3,60 @@ # @Author : didi # @Desc : operator demo of ags import ast +import random import sys import traceback -import random -from typing import List, Tuple, Any, Dict from collections import Counter +from typing import Dict, List, Tuple -from metagpt.actions.action_node import ActionNode -from metagpt.llm import LLM +from tenacity import retry, stop_after_attempt -from examples.ags.w_action_node.operator_an import GenerateOp, GenerateCodeOp, GenerateCodeBlockOp ,ReviewOp, ReviseOp, FuEnsembleOp, MdEnsembleOp, ReflectionTestOp, RephraseOp -from examples.ags.w_action_node.prompt import GENERATE_PROMPT, GENERATE_CODE_PROMPT, GENERATE_CODEBLOCK_PROMPT, REVIEW_PROMPT, REVISE_PROMPT, FU_ENSEMBLE_PROMPT, MD_ENSEMBLE_PROMPT, REFLECTION_ON_PUBILIC_TEST_PROMPT, REPHRASE_ON_PROBLEM_PROMPT, GENERATE_CODEBLOCK_REPHRASE_PROMPT -from examples.ags.w_action_node.prompt import DE_ENSEMBLE_CODE_FORMAT_PROMPT, DE_ENSEMBLE_TXT_FORMAT_PROMPT, DE_ENSEMBLE_ANGEL_PROMPT, DE_ENSEMBLE_DEVIL_PROMPT, DE_ENSEMBLE_JUDGE_UNIVERSAL_PROMPT, DE_ENSEMBLE_JUDGE_FINAL_PROMPT +from examples.ags.w_action_node.operator_an import ( + FuEnsembleOp, + GenerateCodeBlockOp, + GenerateCodeOp, + GenerateOp, + MdEnsembleOp, + ReflectionTestOp, + RephraseOp, + ReviewOp, + ReviseOp, +) +from examples.ags.w_action_node.prompt import ( + DE_ENSEMBLE_ANGEL_PROMPT, + DE_ENSEMBLE_CODE_FORMAT_PROMPT, + DE_ENSEMBLE_DEVIL_PROMPT, + DE_ENSEMBLE_JUDGE_FINAL_PROMPT, + DE_ENSEMBLE_JUDGE_UNIVERSAL_PROMPT, + DE_ENSEMBLE_TXT_FORMAT_PROMPT, + FU_ENSEMBLE_PROMPT, + GENERATE_CODE_PROMPT, + GENERATE_CODEBLOCK_PROMPT, + GENERATE_CODEBLOCK_REPHRASE_PROMPT, + GENERATE_PROMPT, + MD_ENSEMBLE_PROMPT, + REFLECTION_ON_PUBLIC_TEST_PROMPT, + REPHRASE_ON_PROBLEM_PROMPT, + REVIEW_PROMPT, + REVISE_PROMPT, +) from examples.ags.w_action_node.utils import test_cases_2_test_functions +from metagpt.actions.action_node import ActionNode +from metagpt.llm import LLM +from metagpt.logs import logger + class Operator: - def __init__(self, name, llm:LLM): + def __init__(self, name, llm: LLM): self.name = name self.llm = llm def __call__(self, *args, **kwargs): raise NotImplementedError + class Generate(Operator): - def __init__(self, name:str ="Generator", llm: LLM = LLM()): + def __init__(self, name: str = "Generate", llm: LLM = LLM()): super().__init__(name, llm) async def __call__(self, problem_description): @@ -34,10 +64,10 @@ class Generate(Operator): node = await ActionNode.from_pydantic(GenerateOp).fill(context=prompt, llm=self.llm) response = node.instruct_content.model_dump() return response - -class GenerateCode(Operator): - def __init__(self, name:str ="Coder", llm: LLM = LLM()): + +class GenerateCode(Operator): + def __init__(self, name: str = "GenerateCode", llm: LLM = LLM()): super().__init__(name, llm) async def __call__(self, problem_description): @@ -45,39 +75,49 @@ class GenerateCode(Operator): node = await ActionNode.from_pydantic(GenerateCodeOp).fill(context=prompt, llm=self.llm) response = node.instruct_content.model_dump() return response - -class GenerateCodeBlock(Operator): - def __init__(self, name:str ="Coder", llm: LLM = LLM()): + +class GenerateCodeBlock(Operator): + def __init__(self, name: str = "GenerateCodeBlock", llm: LLM = LLM()): super().__init__(name, llm) + @retry(stop=stop_after_attempt(3)) async def __call__(self, problem_description, function_name): prompt = GENERATE_CODEBLOCK_PROMPT.format(problem_description=problem_description) - node = await ActionNode.from_pydantic(GenerateCodeBlockOp).fill(context=prompt, llm=self.llm, mode='code_fill',function_name=function_name) + node = await ActionNode.from_pydantic(GenerateCodeBlockOp).fill( + context=prompt, llm=self.llm, mode="code_fill", function_name=function_name + ) response = node.instruct_content.model_dump() return response + @retry(stop=stop_after_attempt(3)) async def rephrase_generate(self, problem_description, rephrase_problem, function_name): - prompt = GENERATE_CODEBLOCK_REPHRASE_PROMPT.format(problem_description=problem_description,rephrase_problem=rephrase_problem) - node = await ActionNode.from_pydantic(GenerateCodeBlockOp).fill(context=prompt, llm=self.llm, mode='code_fill', function_name=function_name) + prompt = GENERATE_CODEBLOCK_REPHRASE_PROMPT.format( + problem_description=problem_description, rephrase_problem=rephrase_problem + ) + node = await ActionNode.from_pydantic(GenerateCodeBlockOp).fill( + context=prompt, llm=self.llm, mode="code_fill", function_name=function_name + ) response = node.instruct_content.model_dump() return response - + + class Review(Operator): - - def __init__(self, criteria, name:str ="Reviewer", llm: LLM = LLM()): + def __init__(self, criteria, name: str = "Review", llm: LLM = LLM()): self.criteria = criteria super().__init__(name, llm) async def __call__(self, problem_description, solution): - prompt = REVIEW_PROMPT.format(problem_description=problem_description, solution=solution, criteria=self.criteria) + prompt = REVIEW_PROMPT.format( + problem_description=problem_description, solution=solution, criteria=self.criteria + ) node = await ActionNode.from_pydantic(ReviewOp).fill(context=prompt, llm=self.llm) response = node.instruct_content.model_dump() return response -class Revise(Operator): - def __init__(self, name:str ="Reviser", llm: LLM = LLM()): +class Revise(Operator): + def __init__(self, name: str = "Revise", llm: LLM = LLM()): super().__init__(name, llm) async def __call__(self, problem_description, solution, feedback): @@ -86,12 +126,16 @@ class Revise(Operator): response = node.instruct_content.model_dump() return response -class FuEnsemble(Operator): - def __init__(self, name:str ="FuseEnsembler", llm: LLM = LLM()): +class FuEnsemble(Operator): + """ + Function: Critically evaluating multiple solution candidates, synthesizing their strengths, and developing an enhanced, integrated solution. + """ + + def __init__(self, name: str = "FuEnsemble", llm: LLM = LLM()): super().__init__(name, llm) - async def __call__(self, solutions:List, problem_description): + async def __call__(self, solutions: List, problem_description): solution_text = "" for solution in solutions: solution_text += str(solution) + "\n" @@ -99,13 +143,18 @@ class FuEnsemble(Operator): node = await ActionNode.from_pydantic(FuEnsembleOp).fill(context=prompt, llm=self.llm) response = node.instruct_content.model_dump() return response - -class MdEnsemble(Operator): - def __init__(self, name:str ="MedEnsembler", llm: LLM = LLM(), vote_count:int=3): + +class MdEnsemble(Operator): + """ + Paper: Can Generalist Foundation Models Outcompete Special-Purpose Tuning? Case Study in Medicine + Link: https://arxiv.org/abs/2311.16452 + """ + + def __init__(self, name: str = "MdEnsemble", llm: LLM = LLM(), vote_count: int = 3): super().__init__(name, llm) self.vote_count = vote_count - + @staticmethod def shuffle_answers(solutions: List[str]) -> Tuple[List[str], Dict[str, str]]: shuffled_solutions = solutions.copy() @@ -113,8 +162,7 @@ class MdEnsemble(Operator): answer_mapping = {chr(65 + i): solutions.index(solution) for i, solution in enumerate(shuffled_solutions)} return shuffled_solutions, answer_mapping - async def __call__(self, solution_type:str ,solutions:List[str], problem_description:str): - print(solutions) + async def __call__(self, solution_type: str, solutions: List[str], problem_description: str): all_responses = [] # 当Ensmeble方案是Code类型时,我们使用AST进行去重 if solution_type == "code": @@ -125,7 +173,7 @@ class MdEnsemble(Operator): try: tree = ast.parse(solution) structure_key = ast.dump(tree, annotate_fields=False, include_attributes=False) - + if structure_key not in unique_structures: unique_structures[structure_key] = solution updated_solutions.append(solution) @@ -136,54 +184,52 @@ class MdEnsemble(Operator): updated_length = len(solutions) if updated_length == 1: return {"final_solution": solutions[0]} - + for _ in range(self.vote_count): shuffled_solutions, answer_mapping = self.shuffle_answers(solutions) - + solution_text = "" for index, solution in enumerate(shuffled_solutions): solution_text += f"{chr(65 + index)}: \n{str(solution)}\n\n\n" - + prompt = MD_ENSEMBLE_PROMPT.format(solutions=solution_text, problem_description=problem_description) node = await ActionNode.from_pydantic(MdEnsembleOp).fill(context=prompt, llm=self.llm) response = node.instruct_content.model_dump() - - answer = response.get('solution_letter', '') + + answer = response.get("solution_letter", "") answer = answer.strip().upper() - + if answer in answer_mapping: original_index = answer_mapping[answer] # print(f"original index: {original_index}") all_responses.append(original_index) - + most_frequent_index = Counter(all_responses).most_common(1)[0][0] final_answer = solutions[most_frequent_index] return {"final_solution": final_answer} + class ScEnsemble(Operator): """ - self consistency ensemble + Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models + Link: https://arxiv.org/abs/2203.11171 """ + pass -class DbEnsemble(Operator): + +class MADEnsemble(Operator): """ - (Should we be going MAD? A Look at Multi-Agent Debate Strategies for LLMs) - The system is a multi-round debate system where each agent is given the - question and responses generated by all agents. For each round, a judge - analyzes the responses provided determines whether to terminate the - debate or keep going. At the end of the debate the judge is also responsible - for determining the final answer. + Paper: Should we be going MAD? A Look at Multi-Agent Debate Strategies for LLMs + Link: https://arxiv.org/abs/2311.17371 """ - def __init__(self, name:str ="DebateEnsemble", llm: LLM = LLM()): + + def __init__(self, name: str = "DebateEnsemble", llm: LLM = LLM()): super().__init__(name, llm) - self.agents = ["angel","devil","judge"] - self.format_requirements = { - "txt":DE_ENSEMBLE_TXT_FORMAT_PROMPT, - "code":DE_ENSEMBLE_CODE_FORMAT_PROMPT - } - - def get_system_prompt(self, name:str, mode:str='txt'): + self.agents = ["angel", "devil", "judge"] + self.format_requirements = {"txt": DE_ENSEMBLE_TXT_FORMAT_PROMPT, "code": DE_ENSEMBLE_CODE_FORMAT_PROMPT} + + def get_system_prompt(self, name: str, mode: str = "txt"): if name == "angel": if mode == "code": return DE_ENSEMBLE_ANGEL_PROMPT + "\n" + DE_ENSEMBLE_CODE_FORMAT_PROMPT @@ -194,10 +240,10 @@ class DbEnsemble(Operator): return DE_ENSEMBLE_DEVIL_PROMPT + "\n" + DE_ENSEMBLE_TXT_FORMAT_PROMPT elif name == "judge": if mode == "final": - return DE_ENSEMBLE_JUDGE_FINAL_PROMPT + return DE_ENSEMBLE_JUDGE_FINAL_PROMPT return DE_ENSEMBLE_JUDGE_UNIVERSAL_PROMPT - - def construct_messages(self, message_history_with_name, name, mode:str="txt", phase:str="universal"): + + def construct_messages(self, message_history_with_name, name, mode: str = "txt", phase: str = "universal"): """ 基于name与mode来构建system message. 基于name来构建messages @@ -210,67 +256,63 @@ class DbEnsemble(Operator): elif name == "judge": messages = self._construct_judge(message_history_with_name, mode, messages) return messages - + def _construct_debate(self, message_history_with_name, name, messages): user_message = "" - + for message in message_history_with_name: if message["name"] == "Judge": continue elif message["name"] == name: if user_message: - messages.append({ - "role": "user", - "name": "user", - "content": user_message.strip("\n"), - }) - messages.append({ - "role": "assistant", - "name": name, - "content": message["content"], - }) + messages.append( + { + "role": "user", + "name": "user", + "content": user_message.strip("\n"), + } + ) + messages.append( + { + "role": "assistant", + "name": name, + "content": message["content"], + } + ) user_message = "" else: user_message += message["content"] - + if user_message: - messages.append({ - "role": "user", - "name": "user", - "content": user_message.strip("\n"), - }) - + messages.append( + { + "role": "user", + "name": "user", + "content": user_message.strip("\n"), + } + ) + return messages def _construct_judge(self, message_history_with_name, mode, messages): pass - async def debate_answer(self, message_history:List, role:str="angel"): + async def debate_answer(self, message_history: List, role: str = "angel"): messages = self.construct_messages(message_history, role) response = await self.llm.acompletion_text(messages=messages) - message_history.append({ - "role":"user", - "name":role, - "content":response} - ) + message_history.append({"role": "user", "name": role, "content": response}) return message_history, response - async def judge_answer(self, message_history:List, phase:str="universal"): + async def judge_answer(self, message_history: List, phase: str = "universal"): messages = self.construct_messages(message_history, "judge", phase=phase) response = await self.llm.acompletion_text(messages=messages) - message_history.append({ - "role": "user", - "name": "judge", - "content": response} - ) + message_history.append({"role": "user", "name": "judge", "content": response}) return message_history, response - async def __call__(self, origin_solution:str, problem_description:str, max_round:int = 3, mode:str='txt'): + async def __call__(self, origin_solution: str, problem_description: str, max_round: int = 3, mode: str = "txt"): # 思路,输入一个原始答案,构建一个agent代表这个答案进行辩论;另一个agent(devil)使用debate llm的内容进行辩论;法官在每一轮次做出决定是否终止,到了maxround还没终止就由法官进行总结。 - message_history_with_name = [ - {"role":"user", "name":"angel", "content":origin_solution} - ] - + message_history_with_name = [{"role": "user", "name": "angel", "content": origin_solution}] + for index in range(max_round): for agent in self.agents: if agent == "angel": @@ -280,91 +322,108 @@ class DbEnsemble(Operator): elif agent == "devil": message_history_with_name, rsp = self.debate_answer(message_history_with_name, role="devil") elif agent == "judge": - message_history_with_name, judge_result = self.judge_answer(message_history_with_name, phase="universal") + message_history_with_name, judge_result = self.judge_answer( + message_history_with_name, phase="universal" + ) if not judge_result["is_debating"]: """ 这里需要在 self.judge_answer 中设置一个自动给出solution的地方 """ - return {"final_solution":judge_result["final_solution"]} - - message_history_with_name.pop(-1) - message_history_with_name, judge_answer = self.judge_answer(message_history_with_name, phase="final") + return {"final_solution": judge_result["final_solution"]} + + message_history_with_name.pop(-1) + message_history_with_name, judge_answer = self.judge_answer(message_history_with_name, phase="final") + + return {"final_solution": judge_answer["debate_answer"]} - return {"final_solution":judge_answer["debate_answer"]} class Rephrase(Operator): """ - 1. AlphaCodium - 2. https://arxiv.org/abs/2404.14963 + Paper: Code Generation with AlphaCodium: From Prompt Engineering to Flow Engineering + Link: https://arxiv.org/abs/2404.14963 + Paper: Achieving >97% on GSM8K: Deeply Understanding the Problems Makes LLMs Better Solvers for Math Word Problems + Link: https://arxiv.org/abs/2404.14963 """ - def __init__(self, name:str ="Rephraser", llm: LLM = LLM()): + + def __init__(self, name: str = "Rephrase", llm: LLM = LLM()): super().__init__(name, llm) - async def __call__(self, problem_description:str)->str: + async def __call__(self, problem_description: str) -> str: prompt = REPHRASE_ON_PROBLEM_PROMPT.format(problem_description=problem_description) node = await ActionNode.from_pydantic(RephraseOp).fill(context=prompt, llm=self.llm) response = node.instruct_content.model_dump() return response["rephrased_problem"] - + + class Test(Operator): - def __init__(self, name:str ="Tester", llm: LLM = LLM()): + def __init__(self, name: str = "Test", llm: LLM = LLM()): super().__init__(name, llm) - + def exec_code(self, solution, test_cases, problem_id): - # TODO + # TODO # 1. 获取更加详细的Test error信息 - # 2. 更换Public Test数据集,当前使用的数据存在Label Leak(使用的Reflexion的数据集) - # 3. 实现单独测试每一个test case -> 1 + # 2. 更换Public Test数据集,当前使用的数据存在Label Leak(使用的Reflexion的数据集) -> 这个问题使用LLM抽取解决,直接生成为assert代码串 + # 3. 实现单独测试每一个test case -> 1 solution = solution["final_solution"] test_code = test_cases_2_test_functions(solution, test_cases) - print("test_code", test_code) try: exec(test_code, globals()) except AssertionError as e: exc_type, exc_value, exc_traceback = sys.exc_info() tb_str = traceback.format_exception(exc_type, exc_value, exc_traceback) with open("tester.txt", "a") as f: - f.write("test_error" +problem_id + "\n") - error_infomation = {"test_fail_case": { - "error_type": "AssertionError", - "error_message": str(e), - "traceback": tb_str - }} - print("error here", error_infomation) + f.write("test_error" + problem_id + "\n") + error_infomation = { + "test_fail_case": {"error_type": "AssertionError", "error_message": str(e), "traceback": tb_str} + } + logger.info(f"test error: {error_infomation}") return error_infomation except Exception as e: with open("tester.txt", "a") as f: f.write(problem_id + "\n") - return {"exec_fail_case":str(e)} + return {"exec_fail_case": str(e)} return [] async def __call__(self, problem_id, problem, rephrase_problem, solution, test_cases): result = self.exec_code(solution, test_cases, problem_id) - print("result here", result) if result == []: return solution elif "exec_fail_case" in result: result = result["exec_fail_case"] - prompt = REFLECTION_ON_PUBILIC_TEST_PROMPT.format(problem_description=problem, rephrase_problem=rephrase_problem, code_solution=solution, exec_pass=f"executed unsuccessfully, error: \n {result}", test_fail="executed unsucessfully") + prompt = REFLECTION_ON_PUBLIC_TEST_PROMPT.format( + problem_description=problem, + rephrase_problem=rephrase_problem, + code_solution=solution, + exec_pass=f"executed unsuccessfully, error: \n {result}", + test_fail="executed unsucessfully", + ) node = await ActionNode.from_pydantic(ReflectionTestOp).fill(context=prompt, llm=self.llm) response = node.instruct_content.model_dump() - return {"final_solution":response["refined_solution"]} + return {"final_solution": response["refined_solution"]} else: result = result["test_fail_case"] - prompt = REFLECTION_ON_PUBILIC_TEST_PROMPT.format(problem_description=problem, rephrase_problem=rephrase_problem, code_solution=solution, exec_pass="executed successfully", test_fail=result) + prompt = REFLECTION_ON_PUBLIC_TEST_PROMPT.format( + problem_description=problem, + rephrase_problem=rephrase_problem, + code_solution=solution, + exec_pass="executed successfully", + test_fail=result, + ) node = await ActionNode.from_pydantic(ReflectionTestOp).fill(context=prompt, llm=self.llm) response = node.instruct_content.model_dump() - return {"final_solution":response["refined_solution"]} - + return {"final_solution": response["refined_solution"]} + + class FindFact(Operator): - pass + def __init__(self, name: str = "FindFact", llm: LLM = LLM()): + super().__init__(name, llm) + class SelfAsk(Operator): - pass + def __init__(self, name: str = "SelfAsk", llm: LLM = LLM()): + super().__init__(name, llm) + class Verify(Operator): - """ - ? 还没有想好 - """ - pass - + def __init__(self, name: str = "Verify", llm: LLM = LLM()): + super().__init__(name, llm) diff --git a/examples/ags/w_action_node/operator_an.py b/examples/ags/w_action_node/operator_an.py index 2cad6b9fc..9008742fa 100644 --- a/examples/ags/w_action_node/operator_an.py +++ b/examples/ags/w_action_node/operator_an.py @@ -5,26 +5,42 @@ from pydantic import BaseModel, Field + class GenerateOp(BaseModel): solution: str = Field(default="", description="Your Solution for this problem") + class GenerateCodeOp(BaseModel): code_solution: str = Field(default="", description="Complete and correct code here.") + class GenerateCodeBlockOp(BaseModel): code_solution: str = Field(default="", description="Your complete code solution for this problem") + class ReviewOp(BaseModel): - review_result: bool = Field(default=False, description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'") - feedback: str = Field(default="", description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.") + review_result: bool = Field( + default=False, + description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'", + ) + feedback: str = Field( + default="", + description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.", + ) + class ReviseOp(BaseModel): revised_solution: str = Field(default="", description="Based on the feedback, revised solution for this problem") + class FuEnsembleOp(BaseModel): - thought: str = Field(default="", description="Analyze the solutions and think how to combine the advantages of various solutions to form the best possible solution.") + thought: str = Field( + default="", + description="Analyze the solutions and think how to combine the advantages of various solutions to form the best possible solution.", + ) final_solution: str = Field(default="", description="Output the final solution after analysis and integration") + class MdEnsembleOp(BaseModel): thought: str = Field( default="""Example thought process: @@ -35,22 +51,30 @@ class MdEnsembleOp(BaseModel): 5. The use of 'isinstance' for type checking is a good practice. 6. The function handles decimal separators well by replacing ',' with '.'. Overall, this solution effectively solves the problem of comparing two values, with good error handling and flexibility. It could be improved by specifying behavior for equal values, but it's a strong solution as is.""", - description="Step-by-step analysis of the solutions to determine the best one." - ) - solution_letter: str = Field( - default="", - description="The letter of the chosen best solution (only one letter)." + description="Step-by-step analysis of the solutions to determine the best one.", ) + solution_letter: str = Field(default="", description="The letter of the chosen best solution (only one letter).") + class TestCaseExtractOp(BaseModel): - test_cases: list = Field(default=[('', [5, 8, 7, 1], 12), ('', [3, 3, 3, 3, 3], 9)], - description="Extracted test cases from the problem description") - + test_cases: list = Field( + default=[ + "assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True", + "assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.05) == False", + "", + ], + description="Extracted test cases from the problem description", + ) + + class RephraseOp(BaseModel): rephrased_problem: str = Field(default="", description="Rephrased problem description for this problem") + class ReflectionTestOp(BaseModel): - reflection: str = Field(default="", description="对关于代码执行错误或者测试用例失败step by step的思考") - refined_solution: str = Field(default="", description="对于代码执行错误或者测试用例失败的修正方案") - - \ No newline at end of file + reflection: str = Field( + default="", description="Step-by-step reflection on code execution errors or test case failures" + ) + refined_solution: str = Field( + default="", description="Corrective solution for code execution errors or test case failures" + ) diff --git a/examples/ags/w_action_node/prompt.py b/examples/ags/w_action_node/prompt.py index 266e5b722..f7c68e3ed 100644 --- a/examples/ags/w_action_node/prompt.py +++ b/examples/ags/w_action_node/prompt.py @@ -10,19 +10,19 @@ Generate Solution for the following problem: {problem_description} GENERATE_CODE_PROMPT = """ You are an expert programmer tasked with solving a coding problem. -### Problem Description: +### Problem Description {problem_description} -### Instructions: +### Instructions The above is an incomplete Python code fragment. Return the complete and correct code with no additional text. Please maintain the JSON format in your response. -### Your Response: +### Your Response """ GENERATE_CODEBLOCK_REPHRASE_PROMPT = """ Please provide a self-contained Python script that solves the following problem in a markdown code block: -### Problem Description: +### Problem Description {problem_description} ### self reflection on the problem @@ -35,7 +35,7 @@ When creating your solution: 4. Avoid adding additional test cases beyond those provided in the problem description. """ -GENERATE_CODEBLOCK_PROMPT =""" +GENERATE_CODEBLOCK_PROMPT = """ Please provide a self-contained Python script that solves the following problem in a markdown code block: {problem_description} @@ -99,10 +99,10 @@ Please strictly output in JSON format, do not output irrelevant content. """ DE_ENSEMBLE_CODE_FORMAT_PROMPT = """ Now please output your answer in json format, with the format as follows: -{{ - "reason":"<为什么要这样做>", - "code_solution":"<你觉得合适的solution,用代码表示出来>" -}} +{ + "reason":"", + "code_solution":"" +} Please strictly output in JSON format, do not output irrelevant content. """ DE_ENSEMBLE_ANGEL_PROMPT = """ @@ -131,18 +131,6 @@ You, as the moderator, will evaluate both sides' answers and determine if there Please strictly output in JSON format, do not output irrelevant content """ -EXTRACT_CASE_PROMPT = """ -You are given a coding problem, and you need to extract the test cases from the problem description. -{problem_description} - -一个problem中会有多个测试用例,每个测试用例包含三个部分: -1. 函数名 -2. 输入 -3. 期望输出 -每个测试用例包裹在一个三元组之中,三元组之间用逗号分隔,整体用列表包裹。 -由于结果需要被解析到JSON中,True与False请表示为true, false; -""" - REPHRASE_ON_PROBLEM_PROMPT = """ You are given a code contest problem: @@ -155,26 +143,26 @@ Reflect on the problem, and describe it in your own words, in bullet points. Pay """ -REFLECTION_ON_PUBILIC_TEST_PROMPT = """ - +REFLECTION_ON_PUBLIC_TEST_PROMPT = """ You are given a code contest problem, and a self-reflection on the problem: ### problem {problem_description} + ### self reflection on the problem {rephrase_problem} -======================= + A Python code solution was generated for the problem: ### Code Solution {code_solution} -======================= + This section of the code execution result is ### Execution Result {exec_pass} -======================= + However, when running the following input example, the code solution above failed to produce the expected output: #### Failed Test Case {test_fail} @@ -182,4 +170,31 @@ However, when running the following input example, the code solution above faile Your goal is to analyze the code solution and the error, and propose a fixed code which will produce the expected output for the provided test input. The fixed code should keep the solution robust, and work for all other input examples as well. Make sure the fixed code has a reasonable runtime - less than three seconds on a modern computer, given the problem constraints for large input. -""" \ No newline at end of file +""" + +EXTRACT_CASE_PROMPT = """ +You are given a coding problem, and you need to extract the test cases from the problem description. + +## Problem Description +{problem_description} + +Your task is to extract test cases from the above description and convert them into Python assert statements (as strings). These statements should be returned in a list for testing purposes. + +Example: +Input: +>>> has_close_elements([1.0, 2.0, 3.0], 0.5) +False +>>> has_close_elements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3) +True + +Output: +[ + "assert candidate([1.0, 2.0, 3.0], 0.5) == False", + "assert candidate([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3) == True" +] + +Please ensure that: +1. Each test case is converted to a separate assert statement. +2. The function name in the original example (e.g., 'has_close_elements') is replaced with 'candidate'. +3. The assert statements are returned as strings in a list. +""" diff --git a/examples/ags/w_action_node/utils.py b/examples/ags/w_action_node/utils.py index de475cbac..9cb811976 100644 --- a/examples/ags/w_action_node/utils.py +++ b/examples/ags/w_action_node/utils.py @@ -3,38 +3,42 @@ # @Author : didi # @Desc : utils for experiment +import ast import json import re -import ast -from typing import List, Dict, Any, Tuple -from metagpt.llm import LLM -from metagpt.actions.action_node import ActionNode +from typing import Any, List, Tuple + from examples.ags.w_action_node.operator_an import TestCaseExtractOp from examples.ags.w_action_node.prompt import EXTRACT_CASE_PROMPT +from metagpt.actions.action_node import ActionNode +from metagpt.llm import LLM + def extract_task_id(task_id: str) -> int: """Extract the numeric part of the task_id.""" - match = re.search(r'/(\d+)', task_id) + match = re.search(r"/(\d+)", task_id) return int(match.group(1)) if match else 0 -def jsonl_ranker(input_file: str, output_file: str): + +def sort_json_by_task_id(input_file: str, output_file: str): """ Read a JSONL file, sort the entries based on task_id, and write to a new JSONL file. - + :param input_file: Path to the input JSONL file :param output_file: Path to the output JSONL file """ # Read and parse the JSONL file - with open(input_file, 'r') as f: + with open(input_file, "r") as f: data = [json.loads(line) for line in f] - + # Sort the data based on the numeric part of task_id - sorted_data = sorted(data, key=lambda x: extract_task_id(x['task_id'])) - + sorted_data = sorted(data, key=lambda x: extract_task_id(x["task_id"])) + # Write the sorted data to a new JSONL file - with open(output_file, 'w') as f: + with open(output_file, "w") as f: for item in sorted_data: - f.write(json.dumps(item) + '\n') + f.write(json.dumps(item) + "\n") + def parse_python_literal(s): try: @@ -42,7 +46,8 @@ def parse_python_literal(s): except (ValueError, SyntaxError): return s -def extract_test_cases_from_jsonl(problem_id:str, file_path:str="public_test_reflexion.jsonl"): + +def extract_test_cases_from_jsonl(problem_id: str, file_path: str = "public_test_reflexion.jsonl"): # 保留原有的硬编码测试用例 hardcoded_cases = { "HumanEval/32": "", @@ -55,7 +60,7 @@ def extract_test_cases_from_jsonl(problem_id:str, file_path:str="public_test_ref return hardcoded_cases[problem_id] # 如果没有硬编码的测试用例,从文件中读取 - with open(file_path, 'r') as file: + with open(file_path, "r") as file: for line in file: data = json.loads(line) if data.get("id") == problem_id: @@ -63,60 +68,63 @@ def extract_test_cases_from_jsonl(problem_id:str, file_path:str="public_test_ref return None # 如果没有找到问题,返回 None + def extract_test_cases(docstring: str) -> List[Tuple[str, List[Any], Any]]: # 使用正则表达式匹配测试用例,现在捕获函数名和任意输出 - pattern = r'>>> (\w+)\((.*?)\)\n\s*(.*?)(?=\n|$)' + pattern = r">>> (\w+)\((.*?)\)\n\s*(.*?)(?=\n|$)" matches = re.findall(pattern, docstring, re.DOTALL) - + test_cases = [] for match in matches: func_name, input_str, expected_output = match - + # 处理输入 input_list = [] - for item in input_str.split(','): + for item in input_str.split(","): item = item.strip() try: # 尝试将输入转换为数值类型 - if '.' in item: + if "." in item: input_list.append(float(item)) else: input_list.append(int(item)) except ValueError: # 如果无法转换为数值,则保留为字符串 input_list.append(item.strip("'\"")) - + # 处理输出 try: # 尝试将输出转换为数值或布尔值 - if expected_output.lower() == 'true': + if expected_output.lower() == "true": expected_output = True - elif expected_output.lower() == 'false': + elif expected_output.lower() == "false": expected_output = False - elif '.' in expected_output: + elif "." in expected_output: expected_output = float(expected_output) else: expected_output = int(expected_output) except ValueError: # 如果无法转换,则保留为字符串 expected_output = expected_output.strip("'\"") - + test_cases.append([func_name, input_list, expected_output]) - + return test_cases -async def llm_extract_test_case(id, problem_description: str, file_path:str="public_test.jsonl"): + +async def llm_extract_test_case(id, problem_description: str, file_path: str = "public_test.jsonl"): prompt = EXTRACT_CASE_PROMPT.format(problem_description=problem_description) node = await ActionNode.from_pydantic(TestCaseExtractOp).fill(context=prompt, llm=LLM()) result = node.instruct_content.model_dump() - with open(file_path,"a") as f: - f.write(json.dumps({id:result["test_cases"]}) + '\n') - return {id:result["test_cases"]} + with open(file_path, "a") as f: + f.write(json.dumps({id: result["test_cases"]}) + "\n") + return {id: result["test_cases"]} + def test_cases_2_test_functions(solution: str, test_cases: str): tester_function = f""" {solution} {test_cases} -""" - return tester_function \ No newline at end of file +""" + return tester_function diff --git a/metagpt/actions/action_node.py b/metagpt/actions/action_node.py index 20a73a433..059118637 100644 --- a/metagpt/actions/action_node.py +++ b/metagpt/actions/action_node.py @@ -474,53 +474,26 @@ class ActionNode: """ model_class = self.create_class() fields = model_class.model_fields - + # Assuming there's only one field in the model if len(fields) == 1: return next(iter(fields)) - + # If there are multiple fields, we might want to use self.key to find the right one return self.key - - async def code_fill( - self, - context, - function_name=None, - timeout=USE_CONFIG_TIMEOUT - ): + + async def code_fill(self, context, function_name=None, timeout=USE_CONFIG_TIMEOUT): """ fill CodeBlock Node """ - def extract_code_from_response(response): - """ - Extracts code wrapped in triple backticks from the response, - removing any language specifier. - - :param response: The full response from the LLM - :return: The extracted code, or None if no code is found - """ - code_pattern = r"```(?:\w+\n)?([\s\S]*?)```" - matches = re.findall(code_pattern, response) - - if matches: - # The first group in the regex contains the code without the language specifier - code = matches[0].strip() - return code - return None - - import re field_name = self.get_field_name() prompt = context - # print("generate prompt", "\n", prompt) content = await self.llm.aask(prompt, timeout=timeout) - # print("generate content", "\n", content) extracted_code = sanitize(code=content, entrypoint=function_name) - # extracted_code = extract_code_from_response(content) result = {field_name: extracted_code} - # print("final_result", "\n", result) return result - + async def messages_fill( self, ): @@ -540,7 +513,7 @@ class ActionNode: images: Optional[Union[str, list[str]]] = None, timeout=USE_CONFIG_TIMEOUT, exclude=[], - function_name: str = None + function_name: str = None, ): """Fill the node(s) with mode. diff --git a/metagpt/actions/code_sanitize.py b/metagpt/actions/code_sanitize.py index 958c712df..56422589c 100644 --- a/metagpt/actions/code_sanitize.py +++ b/metagpt/actions/code_sanitize.py @@ -4,28 +4,35 @@ @Time : 2024/7/24 16:37 @Author : didi @File : code_node.py +@Acknowledgement https://github.com/evalplus/evalplus/blob/master/evalplus/sanitize.py """ -import os import ast -import pathlib import traceback - +from enum import Enum from typing import Dict, Generator, List, Optional, Set, Tuple import tree_sitter_python -from tqdm import tqdm from tree_sitter import Language, Node, Parser -CLASS_TYPE = "class_definition" -FUNCTION_TYPE = "function_definition" -IMPORT_TYPE = ["import_statement", "import_from_statement"] -IDENTIFIER_TYPE = "identifier" -ATTRIBUTE_TYPE = "attribute" -RETURN_TYPE = "return_statement" -EXPRESSION_TYPE = "expression_statement" -ASSIGNMENT_TYPE = "assignment" + +class NodeType(Enum): + CLASS = "class_definition" + FUNCTION = "function_definition" + IMPORT = ["import_statement", "import_from_statement"] + IDENTIFIER = "identifier" + ATTRIBUTE = "attribute" + RETURN = "return_statement" + EXPRESSION = "expression_statement" + ASSIGNMENT = "assignment" + def traverse_tree(node: Node) -> Generator[Node, None, None]: + """ + Traverse the tree structure starting from the given node. + + :param node: The root node to start the traversal from. + :return: A generator object that yields nodes in the tree. + """ cursor = node.walk() depth = 0 @@ -43,6 +50,7 @@ def traverse_tree(node: Node) -> Generator[Node, None, None]: else: depth -= 1 + def syntax_check(code, verbose=False): try: ast.parse(code) @@ -52,6 +60,7 @@ def syntax_check(code, verbose=False): traceback.print_exc() return False + def code_extract(text: str) -> str: lines = text.split("\n") longest_line_pair = (0, 0) @@ -68,22 +77,25 @@ def code_extract(text: str) -> str: return "\n".join(lines[longest_line_pair[0] : longest_line_pair[1] + 1]) + def get_definition_name(node: Node) -> str: for child in node.children: - if child.type == IDENTIFIER_TYPE: + if child.type == NodeType.IDENTIFIER.value: return child.text.decode("utf8") - + + def has_return_statement(node: Node) -> bool: traverse_nodes = traverse_tree(node) for node in traverse_nodes: - if node.type == RETURN_TYPE: + if node.type == NodeType.RETURN.value: return True return False + def get_deps(nodes: List[Tuple[str, Node]]) -> Dict[str, Set[str]]: def dfs_get_deps(node: Node, deps: Set[str]) -> None: for child in node.children: - if child.type == IDENTIFIER_TYPE: + if child.type == NodeType.IDENTIFIER.value: deps.add(child.text.decode("utf8")) else: dfs_get_deps(child, deps) @@ -104,12 +116,23 @@ def get_function_dependency(entrypoint: str, call_graph: Dict[str, str]) -> Set[ if current not in call_graph: continue for neighbour in call_graph[current]: - if not (neighbour in visited): + if neighbour not in visited: visited.add(neighbour) queue.append(neighbour) return visited + def sanitize(code: str, entrypoint: Optional[str] = None) -> str: + """ + Sanitize and extract relevant parts of the given Python code. + This function parses the input code, extracts import statements, class and function definitions, + and variable assignments. If an entrypoint is provided, it only includes definitions that are + reachable from the entrypoint in the call graph. + + :param code: The input Python code as a string. + :param entrypoint: Optional name of a function to use as the entrypoint for dependency analysis. + :return: A sanitized version of the input code, containing only relevant parts. + """ code = code_extract(code) code_bytes = bytes(code, "utf8") parser = Parser(Language(tree_sitter_python.language())) @@ -123,30 +146,24 @@ def sanitize(code: str, entrypoint: Optional[str] = None) -> str: definition_nodes = [] for child in root_node.children: - if child.type in IMPORT_TYPE: + if child.type in NodeType.IMPORT.value: import_nodes.append(child) - elif child.type == CLASS_TYPE: + elif child.type == NodeType.CLASS.value: name = get_definition_name(child) - if not ( - name in class_names or name in variable_names or name in function_names - ): + if not (name in class_names or name in variable_names or name in function_names): definition_nodes.append((name, child)) class_names.add(name) - elif child.type == FUNCTION_TYPE: + elif child.type == NodeType.FUNCTION.value: name = get_definition_name(child) - if not ( - name in function_names or name in variable_names or name in class_names - ) and has_return_statement(child): + if not (name in function_names or name in variable_names or name in class_names) and has_return_statement( + child + ): definition_nodes.append((name, child)) function_names.add(get_definition_name(child)) - elif ( - child.type == EXPRESSION_TYPE and child.children[0].type == ASSIGNMENT_TYPE - ): + elif child.type == NodeType.EXPRESSION.value and child.children[0].type == NodeType.ASSIGNMENT.value: subchild = child.children[0] name = get_definition_name(subchild) - if not ( - name in variable_names or name in function_names or name in class_names - ): + if not (name in variable_names or name in function_names or name in class_names): definition_nodes.append((name, subchild)) variable_names.add(name) @@ -161,7 +178,7 @@ def sanitize(code: str, entrypoint: Optional[str] = None) -> str: for pair in definition_nodes: name, node = pair - if entrypoint and not (name in reacheable): + if entrypoint and name not in reacheable: continue sanitized_output += code_bytes[node.start_byte : node.end_byte] + b"\n" return sanitized_output[:-1].decode("utf8") diff --git a/test.py b/test.py index 804b92fd6..3161a7368 100644 --- a/test.py +++ b/test.py @@ -4,7 +4,8 @@ # @Desc : test on humaneval graph import asyncio + from examples.ags.benchmark.humaneval import sample_generate, samples_generate -asyncio.run(sample_generate('HumanEval/id',result_path="result_path",mode="alpha")) -asyncio.run(samples_generate(mode='alpha',result_path="result_path")) +asyncio.run(sample_generate("HumanEval/id", result_path="result_path", mode="alpha")) +asyncio.run(samples_generate(mode="alpha_codium", result_path="result_path"))