Merge pull request #1428 from didiforgithub/action_graph

Update Action Graph Solver Version 0.1
This commit is contained in:
Alexander Wu 2024-08-01 15:40:01 +08:00 committed by GitHub
commit d867b60907
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 10194 additions and 0 deletions

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
# @Date :
# @Author : issac
# @Desc : test on gsm8k

View file

@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
# @Date :
# @Author : issac
# @Desc : test on hotpotqa

View file

@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
# @Date : 7/7/2024 17:07 PM
# @Author : didi
# @Desc : test on human eval graph
import asyncio
import json
import os
import subprocess
import sys
from typing import Literal, Optional
import aiofiles
from evalplus.data import get_human_eval_plus
from examples.ags.w_action_node.graph import HumanEvalGraph
from examples.ags.w_action_node.operator import GenerateCode, GenerateCodeBlock
from examples.ags.w_action_node.utils import sort_json_by_key
from metagpt.llm import LLM
from metagpt.logs import logger
from metagpt.utils.common import add_jsonl_file, read_json_file
from metagpt.utils.exceptions import handle_exception
# Module-level operator/solver singletons shared by every generation entry
# point below.
# NOTE(review): each LLM() call constructs a separate client — confirm that is
# intended rather than sharing one LLM instance across operators.
generate_code = GenerateCode(llm=LLM())
generate_code_block = GenerateCodeBlock(llm=LLM())
solver = HumanEvalGraph(name="solver", llm=LLM(), criteria="correctness, efficiency, readability", vote_count=5)
# The solving strategies accepted by route_generate / samples_generate.
ModeType = Literal["ags", "alpha_codium", "llm"]
async def llm_generate(id):
    """Solve one HumanEval+ problem with a single direct LLM call.

    Returns a dict with the problem's task_id and the generated code solution.
    """
    problem = get_human_eval_plus()[f"{id}"]
    generated = await generate_code_block(problem["prompt"], problem["entry_point"])
    return {"task_id": problem["task_id"], "solution": generated["code_solution"]}
async def ags_generate(id, ensemble_count: int = 5):
    """Solve one HumanEval+ problem with the full AGS solver graph.

    Returns a dict with the problem's task_id and the ensembled solution.
    """
    problem = get_human_eval_plus()[f"{id}"]
    solved = await solver(problem["prompt"], ensemble_count=ensemble_count)
    return {"task_id": problem["task_id"], "solution": solved["final_solution"]}
async def alpha_codium_generate(id):
    """Solve one HumanEval+ problem with the (partial) AlphaCodium flow.

    Returns a dict with the problem's task_id and the refined solution.
    """
    problem = get_human_eval_plus()[f"{id}"]
    solved = await solver.alpha_codium(problem["task_id"], problem["prompt"], ensemble_count=5)
    return {"task_id": problem["task_id"], "solution": solved["final_solution"]}
async def route_generate(mode: ModeType, id: str):
    """Dispatch one problem id to the solver selected by ``mode``.

    Raises ValueError for an unrecognized mode.
    """
    handlers = {
        "ags": ags_generate,
        "alpha_codium": alpha_codium_generate,
        "llm": llm_generate,
    }
    if mode not in handlers:
        raise ValueError(f"Invalid mode: {mode}")
    return await handlers[mode](id)
async def sample_generate(id, result_path: str = "samples.jsonl", mode: ModeType = "ags"):
    """Solve one problem, append it to ``result_path``, and keep the file sorted."""
    record = await route_generate(mode, id)
    add_jsonl_file(result_path, [record])
    sort_json_by_key(result_path, result_path)
async def samples_generate(mode: ModeType, result_path: str = "samples.jsonl"):
    """Solve every HumanEval+ problem concurrently and write results to ``result_path``.

    Tasks that raise are retried once sequentially. If everything eventually
    succeeds, the file is evaluated with evalplus and failing cases are logged.
    """
    ids = list(get_human_eval_plus().keys())
    file_lock = asyncio.Lock()

    async def solve_and_write(id: str, mode: ModeType) -> Optional[str]:
        # Returns the task id on failure so the caller can retry it; None on success.
        try:
            sample_dict = await route_generate(mode, id)
        except Exception:
            return id
        # Serialize appends so concurrent tasks do not interleave lines.
        async with file_lock:
            async with aiofiles.open(result_path, mode="a") as f:
                await f.write(json.dumps(sample_dict) + "\n")
        return None

    tasks = [solve_and_write(id, mode) for id in ids]
    results = await asyncio.gather(*tasks)
    failed_tasks = [task_id for task_id in results if task_id is not None]
    if failed_tasks:
        logger.info(failed_tasks)
        # BUGFIX: the original removed items from failed_tasks while iterating
        # over it, which skips every other failed id; collect survivors instead.
        still_failed = []
        for task_id in failed_tasks:
            try:
                await sample_generate(task_id, result_path, mode)
            except Exception:
                logger.error(f"{task_id} fail")
                still_failed.append(task_id)
        failed_tasks = still_failed
    sort_json_by_key(result_path, result_path)
    if not failed_tasks:
        if automatic_evalplus(result_path):
            eval_path = result_path[:-6] + "_eval_results.json"
            unpassed_examples = extract_failure_tests(eval_path)
            logger.info(unpassed_examples)
    else:
        logger.info(failed_tasks)
@handle_exception(exception_type=subprocess.CalledProcessError, exception_msg="sanitize error", default_return=None)
def automatic_sanitize(result_path: str = "samples.jsonl") -> Optional[str]:
    """Run ``evalplus.sanitize --samples <result_path>`` on the command line.

    Returns the sanitized file path (``<prefix>-sanitized.jsonl``), or None
    (via handle_exception) when the subprocess fails.
    """
    subprocess.run(["evalplus.sanitize", "--samples", result_path], check=True)
    prefix = os.path.splitext(result_path)[0]
    return f"{prefix}-sanitized.jsonl"
@handle_exception(
    exception_type=subprocess.CalledProcessError,
    exception_msg="Error in automatic_evalplus function",
    default_return=False,
)
def automatic_evalplus(result_path: str = "samples.jsonl") -> bool:
    """Run ``evalplus.evaluate --dataset humaneval --samples <result_path> --parallel 2 --base-only``.

    Returns True on success; False (via handle_exception) when the subprocess fails.
    """
    command = [
        sys.executable,  # use the current Python interpreter
        "-m",
        "evalplus.evaluate",
        "--dataset",
        "humaneval",
        "--samples",
        result_path,
        "--parallel",
        "2",
        "--base-only",
    ]
    result = subprocess.run(command, check=True, capture_output=True, text=True)
    # BUGFIX: corrected log-message typo ("ouptput" -> "output").
    logger.info(f"output: \n {result.stdout}")
    return True
def extract_failure_tests(file_path: str = "samples_eval_results.json"):
    """Collect the task_ids whose base tests failed in an evalplus results file."""
    eval_results = read_json_file(file_path)
    failed_tests = [
        {"task_id": entry[0]["task_id"]}
        for entry in eval_results["eval"].values()
        if entry[0]["base_status"] == "fail"
    ]
    logger.info(f"length of failed tests: {len(failed_tests)}")
    return failed_tests

View file

@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 22:07 PM
# @Author : didi
# @Desc : graph & an instance - humanevalgraph
from typing import List
from evalplus.data import get_human_eval_plus
from examples.ags.w_action_node.operator import (
FuEnsemble,
Generate,
GenerateCode,
GenerateCodeBlock,
MdEnsemble,
Rephrase,
Review,
Revise,
Test,
)
from examples.ags.w_action_node.utils import extract_test_cases_from_jsonl
from metagpt.llm import LLM
class Graph:
    """Base class for solver graphs: holds a name and the backing LLM."""

    def __init__(self, name: str, llm: LLM) -> None:
        self.name = name
        self.model = llm

    def __call__(self, *args, **kwargs):
        # BUGFIX: the original omitted ``self`` and built the exception
        # without raising it, so subclass-missing errors were silent.
        raise NotImplementedError("Subclasses must implement __call__ method")

    def optimize(self, dataset: List):
        # BUGFIX: the original omitted ``self``. Still a no-op hook for subclasses.
        pass
class HumanEvalGraph(Graph):
    """Solver graph for HumanEval: composes generate/review/revise/test/ensemble operators."""

    def __init__(self, name: str, llm: LLM, criteria: str, vote_count: int = 5) -> None:
        super().__init__(name, llm)
        self.criteria = criteria  # TODO when building graphs automatically, the graph's init parameters must match the external parameters required by its operators
        self.generate_code = GenerateCode(llm=llm)
        self.generate_code_block = GenerateCodeBlock(llm=llm)
        self.review = Review(llm=llm, criteria=criteria)
        self.revise = Revise(llm=llm)
        self.rephrase = Rephrase(llm=llm)
        self.tester = Test(llm=llm)
        self.fuensemble = FuEnsemble(llm=llm)
        self.mdensemble = MdEnsemble(llm=llm, vote_count=vote_count)

    async def __call__(self, problem: str, ensemble_count: int = 3):
        """Generate ``ensemble_count`` candidates and majority-vote on them."""
        solution_list = []
        for _ in range(ensemble_count):
            # NOTE(review): GenerateCodeBlock.__call__ also takes a
            # function_name argument — confirm this single-argument call works.
            solution = await self.generate_code_block(problem)
            solution = solution.get("code_solution")
            solution_list.append(solution)
        solution = await self.mdensemble("code", solution_list, problem)
        return solution

    async def alpha_codium(self, problem_id: str, problem: str, ensemble_count: int = 3):
        """
        Paper: Code Generation with AlphaCodium: From Prompt Engineering to Flow Engineering
        Link: https://arxiv.org/abs/2404.14963
        Flow: An incomplete version of alpha codium, implementing the basic process of rephrase -> code ensemble -> test
        """
        test_cases = extract_test_cases_from_jsonl(problem_id)
        entry_point = get_human_eval_plus()[problem_id]["entry_point"]
        rephrase_problem = await self.rephrase(problem)  # the rephrase step folds in the original problem description
        solution_list = []
        for _ in range(ensemble_count):
            solution = await self.generate_code_block.rephrase_generate(
                problem, rephrase_problem, function_name=entry_point
            )
            solution = solution.get("code_solution")
            solution_list.append(solution)
        solution = await self.mdensemble("code", solution_list, problem)
        solution = await self.tester(problem_id, problem, rephrase_problem, solution, test_cases)
        return solution

    async def review_revise_ensemble(self, problem: str, ensemble_count: int = 2, revise_round: int = 3):
        """Ensemble over solutions produced by the review/revise loop.

        NOTE(review): ``self.ensemble`` is not defined on this class — this
        likely should be ``self.fuensemble`` or ``self.mdensemble``; confirm.
        """
        solution_list = []
        for _ in range(ensemble_count):
            solution = await self.single_solve(problem, revise_round)
            solution_list.append(solution)
        solution = await self.ensemble(solution_list, problem)
        return solution

    async def simple_ensemble(self, problem: str, ensemble_count: int = 3):
        """Generate ``ensemble_count`` solutions and fuse them with FuEnsemble."""
        solution_list = []
        for _ in range(ensemble_count):
            solution = await self.generate_code(problem)
            # solution = await self.generate_code_block(problem)
            solution = solution.get("code_solution")
            solution_list.append(solution)
        solution = await self.fuensemble(solution_list, problem)
        return solution

    async def single_solve(self, problem: str, max_loop: int):
        """Generate one solution, then review/revise it up to ``max_loop`` times."""
        solution = await self.generate_code(problem)
        solution = solution.get("code_solution")
        for _ in range(max_loop):
            review_feedback = await self.review(problem, solution)
            # Stop as soon as the reviewer approves the solution.
            if review_feedback["review_result"]:
                break
            solution = await self.revise(problem, solution, review_feedback["feedback"])
            solution = solution.get("revised_solution")
        return solution
class Gsm8kGraph(Graph):
    """Minimal solver graph for GSM8K: a single generate step."""

    def __init__(self, name: str, llm: LLM) -> None:
        super().__init__(name, llm)
        self.generate = Generate(llm=llm)
        self.rephrase = Rephrase(llm=llm)

    async def __call__(self, problem: str):
        # BUGFIX: Generate.__call__ is a coroutine; the original returned the
        # un-awaited coroutine object instead of the solution dict.
        solution = await self.generate(problem)
        return solution
class HotpotQAGraph(Graph):
    """Minimal solver graph for HotpotQA: a single generate step."""

    def __init__(self, name: str, llm: LLM) -> None:
        super().__init__(name, llm)
        self.generate = Generate(llm=llm)
        self.rephrase = Rephrase(llm=llm)

    async def __call__(self, problem: str):
        # BUGFIX: Generate.__call__ is a coroutine; the original returned the
        # un-awaited coroutine object instead of the solution dict.
        solution = await self.generate(problem)
        return solution

View file

@ -0,0 +1,429 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 17:36 PM
# @Author : didi
# @Desc : operator demo of ags
import ast
import random
import sys
import traceback
from collections import Counter
from typing import Dict, List, Tuple
from tenacity import retry, stop_after_attempt
from examples.ags.w_action_node.operator_an import (
FuEnsembleOp,
GenerateCodeBlockOp,
GenerateCodeOp,
GenerateOp,
MdEnsembleOp,
ReflectionTestOp,
RephraseOp,
ReviewOp,
ReviseOp,
)
from examples.ags.w_action_node.prompt import (
DE_ENSEMBLE_ANGEL_PROMPT,
DE_ENSEMBLE_CODE_FORMAT_PROMPT,
DE_ENSEMBLE_DEVIL_PROMPT,
DE_ENSEMBLE_JUDGE_FINAL_PROMPT,
DE_ENSEMBLE_JUDGE_UNIVERSAL_PROMPT,
DE_ENSEMBLE_TXT_FORMAT_PROMPT,
FU_ENSEMBLE_PROMPT,
GENERATE_CODE_PROMPT,
GENERATE_CODEBLOCK_PROMPT,
GENERATE_CODEBLOCK_REPHRASE_PROMPT,
GENERATE_PROMPT,
MD_ENSEMBLE_PROMPT,
REFLECTION_ON_PUBLIC_TEST_PROMPT,
REPHRASE_ON_PROBLEM_PROMPT,
REVIEW_PROMPT,
REVISE_PROMPT,
)
from examples.ags.w_action_node.utils import test_cases_2_test_functions
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM
from metagpt.logs import logger
class Operator:
    """Base class for all operators: stores an identifying name and the LLM client."""

    def __init__(self, name, llm: LLM):
        self.name = name
        self.llm = llm

    def __call__(self, *args, **kwargs):
        # Subclasses must implement their own (usually async) __call__.
        raise NotImplementedError
class Generate(Operator):
    """Produce a free-form solution for a problem via a single LLM call."""

    def __init__(self, name: str = "Generate", llm: LLM = LLM()):
        super().__init__(name, llm)

    async def __call__(self, problem_description):
        """Fill the GenerateOp schema from the formatted prompt; return it as a dict."""
        context = GENERATE_PROMPT.format(problem_description=problem_description)
        filled_node = await ActionNode.from_pydantic(GenerateOp).fill(context=context, llm=self.llm)
        return filled_node.instruct_content.model_dump()
class GenerateCode(Operator):
    """Produce a complete code solution for a problem via a single LLM call."""

    def __init__(self, name: str = "GenerateCode", llm: LLM = LLM()):
        super().__init__(name, llm)

    async def __call__(self, problem_description):
        """Fill the GenerateCodeOp schema from the formatted prompt; return it as a dict."""
        context = GENERATE_CODE_PROMPT.format(problem_description=problem_description)
        filled_node = await ActionNode.from_pydantic(GenerateCodeOp).fill(context=context, llm=self.llm)
        return filled_node.instruct_content.model_dump()
class GenerateCodeBlock(Operator):
    """Generate a code solution in "code_fill" mode, targeting a named entry-point function.

    NOTE(review): some call sites (e.g. HumanEvalGraph.__call__) invoke this
    with only the problem description — confirm whether function_name should
    have a default.
    """

    def __init__(self, name: str = "GenerateCodeBlock", llm: LLM = LLM()):
        super().__init__(name, llm)

    @retry(stop=stop_after_attempt(3))
    async def __call__(self, problem_description, function_name):
        """Generate code for ``function_name`` directly from the problem description."""
        prompt = GENERATE_CODEBLOCK_PROMPT.format(problem_description=problem_description)
        node = await ActionNode.from_pydantic(GenerateCodeBlockOp).fill(
            context=prompt, llm=self.llm, mode="code_fill", function_name=function_name
        )
        response = node.instruct_content.model_dump()
        return response

    @retry(stop=stop_after_attempt(3))
    async def rephrase_generate(self, problem_description, rephrase_problem, function_name):
        """Generate code using both the original problem and its rephrased form."""
        prompt = GENERATE_CODEBLOCK_REPHRASE_PROMPT.format(
            problem_description=problem_description, rephrase_problem=rephrase_problem
        )
        node = await ActionNode.from_pydantic(GenerateCodeBlockOp).fill(
            context=prompt, llm=self.llm, mode="code_fill", function_name=function_name
        )
        response = node.instruct_content.model_dump()
        return response
class Review(Operator):
    """Judge a solution against fixed criteria, returning a verdict plus feedback."""

    def __init__(self, criteria, name: str = "Review", llm: LLM = LLM()):
        self.criteria = criteria
        super().__init__(name, llm)

    async def __call__(self, problem_description, solution):
        """Fill the ReviewOp schema (review_result + feedback); return it as a dict."""
        context = REVIEW_PROMPT.format(
            problem_description=problem_description, solution=solution, criteria=self.criteria
        )
        filled_node = await ActionNode.from_pydantic(ReviewOp).fill(context=context, llm=self.llm)
        return filled_node.instruct_content.model_dump()
class Revise(Operator):
    """Revise a solution according to reviewer feedback."""

    def __init__(self, name: str = "Revise", llm: LLM = LLM()):
        super().__init__(name, llm)

    async def __call__(self, problem_description, solution, feedback):
        """Fill the ReviseOp schema (revised_solution); return it as a dict."""
        context = REVISE_PROMPT.format(problem_description=problem_description, solution=solution, feedback=feedback)
        filled_node = await ActionNode.from_pydantic(ReviseOp).fill(context=context, llm=self.llm)
        return filled_node.instruct_content.model_dump()
class FuEnsemble(Operator):
    """
    Function: Critically evaluating multiple solution candidates, synthesizing their strengths, and developing an enhanced, integrated solution.
    """

    def __init__(self, name: str = "FuEnsemble", llm: LLM = LLM()):
        super().__init__(name, llm)

    async def __call__(self, solutions: List, problem_description):
        """Fuse all candidate solutions into one via the fusion prompt."""
        # Concatenate every candidate, one per line, for the prompt.
        solution_text = "".join(str(candidate) + "\n" for candidate in solutions)
        context = FU_ENSEMBLE_PROMPT.format(solutions=solution_text, problem_description=problem_description)
        filled_node = await ActionNode.from_pydantic(FuEnsembleOp).fill(context=context, llm=self.llm)
        return filled_node.instruct_content.model_dump()
class MdEnsemble(Operator):
    """
    Choose the best candidate solution via repeated shuffled LLM voting.

    Paper: Can Generalist Foundation Models Outcompete Special-Purpose Tuning? Case Study in Medicine
    Link: https://arxiv.org/abs/2311.16452
    """

    def __init__(self, name: str = "MdEnsemble", llm: LLM = LLM(), vote_count: int = 3):
        super().__init__(name, llm)
        self.vote_count = vote_count

    @staticmethod
    def shuffle_answers(solutions: List[str]) -> Tuple[List[str], Dict[str, str]]:
        """Shuffle candidates; map each shown letter (A, B, ...) to the original index."""
        shuffled_solutions = solutions.copy()
        random.shuffle(shuffled_solutions)
        answer_mapping = {chr(65 + i): solutions.index(solution) for i, solution in enumerate(shuffled_solutions)}
        return shuffled_solutions, answer_mapping

    async def __call__(self, solution_type: str, solutions: List[str], problem_description: str):
        all_responses = []
        # For code candidates, deduplicate by AST structure so votes are not
        # split among syntactically identical programs.
        if solution_type == "code":
            unique_structures = {}
            updated_solutions = []
            for solution in solutions:
                try:
                    tree = ast.parse(solution)
                    structure_key = ast.dump(tree, annotate_fields=False, include_attributes=False)
                    if structure_key not in unique_structures:
                        unique_structures[structure_key] = solution
                        updated_solutions.append(solution)
                except SyntaxError:
                    # Skip candidates that do not even parse.
                    continue
            solutions = updated_solutions
        # BUGFIX: the original raised IndexError when no candidate survived
        # deduplication, or when no vote matched a candidate letter.
        if not solutions:
            return {"final_solution": None}
        if len(solutions) == 1:
            # A single candidate needs no voting (the original only applied
            # this shortcut in the "code" branch).
            return {"final_solution": solutions[0]}
        for _ in range(self.vote_count):
            shuffled_solutions, answer_mapping = self.shuffle_answers(solutions)
            solution_text = ""
            for index, solution in enumerate(shuffled_solutions):
                solution_text += f"{chr(65 + index)}: \n{str(solution)}\n\n\n"
            prompt = MD_ENSEMBLE_PROMPT.format(solutions=solution_text, problem_description=problem_description)
            node = await ActionNode.from_pydantic(MdEnsembleOp).fill(context=prompt, llm=self.llm)
            response = node.instruct_content.model_dump()
            answer = response.get("solution_letter", "").strip().upper()
            if answer in answer_mapping:
                all_responses.append(answer_mapping[answer])
        if not all_responses:
            # No valid vote was cast; fall back to the first candidate.
            return {"final_solution": solutions[0]}
        most_frequent_index = Counter(all_responses).most_common(1)[0][0]
        return {"final_solution": solutions[most_frequent_index]}
class ScEnsemble(Operator):
    """
    Self-consistency ensemble (not yet implemented).

    Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
    Link: https://arxiv.org/abs/2203.11171
    """

    pass
class MADEnsemble(Operator):
    """
    Multi-agent debate ensemble.

    Paper: Should we be going MAD? A Look at Multi-Agent Debate Strategies for LLMs
    Link: https://arxiv.org/abs/2311.17371

    NOTE(review): this operator looks unfinished — ``_construct_judge`` is a
    stub, and ``__call__`` calls the async helpers without ``await`` and then
    indexes their (text) results as if they were dicts; see inline notes.
    """

    def __init__(self, name: str = "DebateEnsemble", llm: LLM = LLM()):
        super().__init__(name, llm)
        # Debate roles in speaking order within each round.
        self.agents = ["angel", "devil", "judge"]
        self.format_requirements = {"txt": DE_ENSEMBLE_TXT_FORMAT_PROMPT, "code": DE_ENSEMBLE_CODE_FORMAT_PROMPT}

    def get_system_prompt(self, name: str, mode: str = "txt"):
        """Return the system prompt for a role; angel/devil get a format suffix.

        Returns None for an unrecognized role name.
        """
        if name == "angel":
            if mode == "code":
                return DE_ENSEMBLE_ANGEL_PROMPT + "\n" + DE_ENSEMBLE_CODE_FORMAT_PROMPT
            return DE_ENSEMBLE_ANGEL_PROMPT + "\n" + DE_ENSEMBLE_TXT_FORMAT_PROMPT
        elif name == "devil":
            if mode == "code":
                return DE_ENSEMBLE_DEVIL_PROMPT + "\n" + DE_ENSEMBLE_CODE_FORMAT_PROMPT
            return DE_ENSEMBLE_DEVIL_PROMPT + "\n" + DE_ENSEMBLE_TXT_FORMAT_PROMPT
        elif name == "judge":
            if mode == "final":
                return DE_ENSEMBLE_JUDGE_FINAL_PROMPT
            return DE_ENSEMBLE_JUDGE_UNIVERSAL_PROMPT

    def construct_messages(self, message_history_with_name, name, mode: str = "txt", phase: str = "universal"):
        """
        Build the system message from ``name`` and ``mode``, then assemble the
        chat messages from ``name`` (debaters and judge are built differently).
        """
        messages = []
        messages.append({"role": "system", "content": self.get_system_prompt(name, mode)})
        if name in ["angel", "devil"]:
            messages = self._construct_debate(message_history_with_name, name, messages)
        elif name == "judge":
            # NOTE(review): _construct_judge is a stub, so this assigns None.
            messages = self._construct_judge(message_history_with_name, mode, messages)
        return messages

    def _construct_debate(self, message_history_with_name, name, messages):
        """Fold the shared history into alternating user/assistant turns for one debater."""
        user_message = ""
        for message in message_history_with_name:
            if message["name"] == "Judge":
                # Judge turns are hidden from the debaters.
                continue
            elif message["name"] == name:
                # Flush accumulated opponent text as a single user turn first.
                if user_message:
                    messages.append(
                        {
                            "role": "user",
                            "name": "user",
                            "content": user_message.strip("\n"),
                        }
                    )
                messages.append(
                    {
                        "role": "assistant",
                        "name": name,
                        "content": message["content"],
                    }
                )
                user_message = ""
            else:
                user_message += message["content"]
        if user_message:
            messages.append(
                {
                    "role": "user",
                    "name": "user",
                    "content": user_message.strip("\n"),
                }
            )
        return messages

    def _construct_judge(self, message_history_with_name, mode, messages):
        # TODO: unimplemented — construct_messages currently receives None back.
        pass

    async def debate_answer(self, message_history: List, role: str = "angel"):
        """Have ``role`` produce the next debate turn and append it to the history."""
        messages = self.construct_messages(message_history, role)
        response = await self.llm.acompletion_text(messages=messages)
        message_history.append({"role": "user", "name": role, "content": response})
        return message_history, response

    async def judge_answer(self, message_history: List, phase: str = "universal"):
        """Ask the judge for a verdict in the given phase and append it to the history."""
        messages = self.construct_messages(message_history, "judge", phase=phase)
        response = await self.llm.acompletion_text(messages=messages)
        message_history.append({"role": "user", "name": "judge", "content": response})
        return message_history, response

    async def __call__(self, origin_solution: str, problem_description: str, max_round: int = 3, mode: str = "txt"):
        # Idea: one agent (angel) defends the original answer while another (devil)
        # debates against it; each round the judge decides whether to stop, and if
        # max_round is reached without a verdict the judge produces a final summary.
        message_history_with_name = [{"role": "user", "name": "angel", "content": origin_solution}]
        for index in range(max_round):
            for agent in self.agents:
                if agent == "angel":
                    if index == 0:
                        pass
                    # NOTE(review): debate_answer/judge_answer are async — these
                    # calls are never awaited, so unpacking their coroutine
                    # return values will fail at runtime.
                    message_history_with_name, rsp = self.debate_answer(message_history_with_name, role="angel")
                elif agent == "devil":
                    message_history_with_name, rsp = self.debate_answer(message_history_with_name, role="devil")
                elif agent == "judge":
                    message_history_with_name, judge_result = self.judge_answer(
                        message_history_with_name, phase="universal"
                    )
                    # NOTE(review): judge_answer returns raw text — indexing it
                    # with "is_debating" assumes parsed JSON; needs parsing.
                    if not judge_result["is_debating"]:
                        """
                        这里需要在 self.judge_answer 中设置一个自动给出solution的地方
                        """
                        return {"final_solution": judge_result["final_solution"]}
                    # Drop the judge's intermediate verdict before the next round.
                    message_history_with_name.pop(-1)
        message_history_with_name, judge_answer = self.judge_answer(message_history_with_name, phase="final")
        return {"final_solution": judge_answer["debate_answer"]}
class Rephrase(Operator):
    """
    Restate a problem in the model's own words before solving it.

    Paper: Code Generation with AlphaCodium: From Prompt Engineering to Flow Engineering
    Link: https://arxiv.org/abs/2404.14963
    Paper: Achieving >97% on GSM8K: Deeply Understanding the Problems Makes LLMs Better Solvers for Math Word Problems
    Link: https://arxiv.org/abs/2404.14963

    NOTE(review): both papers list the same arXiv link — one is likely wrong; verify.
    """

    def __init__(self, name: str = "Rephrase", llm: LLM = LLM()):
        super().__init__(name, llm)

    async def __call__(self, problem_description: str) -> str:
        """Return the rephrased problem text produced by the LLM."""
        context = REPHRASE_ON_PROBLEM_PROMPT.format(problem_description=problem_description)
        filled_node = await ActionNode.from_pydantic(RephraseOp).fill(context=context, llm=self.llm)
        return filled_node.instruct_content.model_dump()["rephrased_problem"]
class Test(Operator):
    """Run a solution against public test cases and, on failure, ask the LLM to repair it."""

    def __init__(self, name: str = "Test", llm: LLM = LLM()):
        super().__init__(name, llm)

    def exec_code(self, solution, test_cases, problem_id):
        """Execute the generated test harness; return [] on success or an error dict on failure."""
        # TODO
        # 1. Capture more detailed test-error information
        # 2. Replace the public test data set: the current one (from Reflexion) leaks labels -> extract with an LLM instead, generating assert strings directly
        # 3. Run each test case individually -> 1
        solution = solution["final_solution"]
        test_code = test_cases_2_test_functions(solution, test_cases)
        try:
            # SECURITY: exec() runs LLM-generated code with full interpreter
            # privileges — only acceptable in a trusted/sandboxed eval setting.
            exec(test_code, globals())
        except AssertionError as e:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            tb_str = traceback.format_exception(exc_type, exc_value, exc_traceback)
            with open("tester.txt", "a") as f:
                f.write("test_error" + problem_id + "\n")
            error_infomation = {
                "test_fail_case": {"error_type": "AssertionError", "error_message": str(e), "traceback": tb_str}
            }
            logger.info(f"test error: {error_infomation}")
            return error_infomation
        except Exception as e:
            # Any non-assertion failure means the solution crashed before testing.
            with open("tester.txt", "a") as f:
                f.write(problem_id + "\n")
            return {"exec_fail_case": str(e)}
        return []

    async def __call__(self, problem_id, problem, rephrase_problem, solution, test_cases):
        """Return the solution unchanged if tests pass, else a reflected/refined solution."""
        result = self.exec_code(solution, test_cases, problem_id)
        if result == []:
            return solution
        elif "exec_fail_case" in result:
            # The code crashed before any assertion ran.
            result = result["exec_fail_case"]
            prompt = REFLECTION_ON_PUBLIC_TEST_PROMPT.format(
                problem_description=problem,
                rephrase_problem=rephrase_problem,
                code_solution=solution,
                exec_pass=f"executed unsuccessfully, error: \n {result}",
                test_fail="executed unsucessfully",
            )
            node = await ActionNode.from_pydantic(ReflectionTestOp).fill(context=prompt, llm=self.llm)
            response = node.instruct_content.model_dump()
            return {"final_solution": response["refined_solution"]}
        else:
            # An assertion failed: feed the failing case back for reflection.
            result = result["test_fail_case"]
            prompt = REFLECTION_ON_PUBLIC_TEST_PROMPT.format(
                problem_description=problem,
                rephrase_problem=rephrase_problem,
                code_solution=solution,
                exec_pass="executed successfully",
                test_fail=result,
            )
            node = await ActionNode.from_pydantic(ReflectionTestOp).fill(context=prompt, llm=self.llm)
            response = node.instruct_content.model_dump()
            return {"final_solution": response["refined_solution"]}
class FindFact(Operator):
    # Placeholder operator: fact finding is not implemented yet.
    def __init__(self, name: str = "FindFact", llm: LLM = LLM()):
        super().__init__(name, llm)
class SelfAsk(Operator):
    # Placeholder operator: self-ask decomposition is not implemented yet.
    def __init__(self, name: str = "SelfAsk", llm: LLM = LLM()):
        super().__init__(name, llm)
class Verify(Operator):
    # Placeholder operator: verification is not implemented yet.
    def __init__(self, name: str = "Verify", llm: LLM = LLM()):
        super().__init__(name, llm)

View file

@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 19:46 PM
# @Author : didi
# @Desc : action nodes for operator
from pydantic import BaseModel, Field
class GenerateOp(BaseModel):
    # Output schema for the Generate operator: one free-form solution string.
    solution: str = Field(default="", description="Your Solution for this problem")
class GenerateCodeOp(BaseModel):
    # Output schema for GenerateCode: the full code solution.
    code_solution: str = Field(default="", description="Complete and correct code here.")
class GenerateCodeBlockOp(BaseModel):
    # Output schema for GenerateCodeBlock: the full code solution.
    code_solution: str = Field(default="", description="Your complete code solution for this problem")
class ReviewOp(BaseModel):
    # Output schema for Review: a boolean verdict plus textual feedback.
    review_result: bool = Field(
        default=False,
        description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'",
    )
    feedback: str = Field(
        default="",
        description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.",
    )
class ReviseOp(BaseModel):
    # Output schema for Revise: the feedback-driven revised solution.
    revised_solution: str = Field(default="", description="Based on the feedback, revised solution for this problem")
class FuEnsembleOp(BaseModel):
    # Output schema for FuEnsemble: analysis followed by the fused solution.
    thought: str = Field(
        default="",
        description="Analyze the solutions and think how to combine the advantages of various solutions to form the best possible solution.",
    )
    final_solution: str = Field(default="", description="Output the final solution after analysis and integration")
class MdEnsembleOp(BaseModel):
    # Output schema for MdEnsemble voting: reasoning plus the chosen letter.
    # NOTE(review): the long default thought doubles as a few-shot example in
    # the schema description shown to the model.
    thought: str = Field(
        default="""Example thought process:
1. Examined the 'compare_one' function.
2. The function correctly handles both numeric and string inputs by converting strings to floats.
3. It properly compares two values and returns the larger one.
4. The function returns None if the values are equal, which might be useful in some contexts but could be improved by returning either value.
5. The use of 'isinstance' for type checking is a good practice.
6. The function handles decimal separators well by replacing ',' with '.'.
Overall, this solution effectively solves the problem of comparing two values, with good error handling and flexibility. It could be improved by specifying behavior for equal values, but it's a strong solution as is.""",
        description="Step-by-step analysis of the solutions to determine the best one.",
    )
    solution_letter: str = Field(default="", description="The letter of the chosen best solution (only one letter).")
class TestCaseExtractOp(BaseModel):
    # Output schema for test-case extraction: assert statements as strings.
    # NOTE(review): the default list ends with an empty string — confirm intentional.
    test_cases: list = Field(
        default=[
            "assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True",
            "assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.05) == False",
            "",
        ],
        description="Extracted test cases from the problem description",
    )
class RephraseOp(BaseModel):
    # Output schema for Rephrase: the restated problem text.
    rephrased_problem: str = Field(default="", description="Rephrased problem description for this problem")
class ReflectionTestOp(BaseModel):
    # Output schema for the Test operator's reflection step: diagnosis + fix.
    reflection: str = Field(
        default="", description="Step-by-step reflection on code execution errors or test case failures"
    )
    refined_solution: str = Field(
        default="", description="Corrective solution for code execution errors or test case failures"
    )

View file

@ -0,0 +1,200 @@
# -*- coding: utf-8 -*-
# @Date : 6/26/2024 17:07 PM
# @Author : didi
# @Desc : prompts of operators
# Prompts for the basic generate operators.
GENERATE_PROMPT = """
Generate Solution for the following problem: {problem_description}
"""
GENERATE_CODE_PROMPT = """
You are an expert programmer tasked with solving a coding problem.
### Problem Description
{problem_description}
### Instructions
The above is an incomplete Python code fragment. Return the complete and correct code with no additional text.
Please maintain the JSON format in your response.
### Your Response
"""
# Code-block generation preceded by a self-reflection (rephrase) step.
GENERATE_CODEBLOCK_REPHRASE_PROMPT = """
Please provide a self-contained Python script that solves the following problem in a markdown code block:
### Problem Description
{problem_description}
### self reflection on the problem
{rephrase_problem}
When creating your solution:
1. Consider all edge cases and boundary conditions.
2. Avoid oversimplification - address all aspects of the problem.
3. Ensure your logic covers all stated requirements.
4. Avoid adding additional test cases beyond those provided in the problem description.
"""
GENERATE_CODEBLOCK_PROMPT = """
Please provide a self-contained Python script that solves the following problem in a markdown code block:
{problem_description}
When creating your solution:
1. Consider all edge cases and boundary conditions.
2. Avoid oversimplification - address all aspects of the problem.
3. Ensure your logic covers all stated requirements.
4. Avoid adding additional test cases beyond those provided in the problem description.
"""
# Review / revise loop prompts.
REVIEW_PROMPT = """
For the question described as {problem_description},
please review the following solution: {solution}, and provide a review result in boolean format.
If you believe the solution is capable of resolving the issue, return True; otherwise, return False, and include your comments
"""
REVISE_PROMPT = """
For the question described as {problem_description},
please evaluate and revise the solution provided: {solution}, taking into account the review feedbacks: {feedback}."
Then output the revised solution.
"""
# Fusion ensemble: merge several candidate solutions into one.
FU_ENSEMBLE_PROMPT = """
### Given problem
{problem_description}
### We've got a list of solutions
<solutions>
{solutions}
</solutions>
### Instructions
Based on the given problem and solution candidates:
1. Analyze the pros and cons of each candidate solution
2. Consider how to integrate reasonable parts from different solutions
3. Formulate a more comprehensive and effective solution
"""
# Majority-vote ensemble: pick the single best lettered candidate.
MD_ENSEMBLE_PROMPT = """
You are given a coding problem:
{problem_description}
Here is a list of possible solutions to the problem:
{solutions}
Using the inputs above, your goal is to choose the best solution to the code contest problem.
Don't just pick the most efficient solution. The main consideration is that the solution can fully solve the problem in a correct and robust manner.
Provide your final decision by writing the chosen solution letter (e.g., B).
Please maintain the JSON format in your response.
"""
# Debate-ensemble (MAD) prompts: output formats, role prompts, judge prompts.
DE_ENSEMBLE_TXT_FORMAT_PROMPT = """
Now please output your answer in json format, with the format as follows:
{\"Reason\": \"\", \"debate_answer\": \"the capital letter corresponding to the answer\"}.
Please strictly output in JSON format, do not output irrelevant content. """
DE_ENSEMBLE_CODE_FORMAT_PROMPT = """
Now please output your answer in json format, with the format as follows:
{
"reason":"<why do it this way>",
"code_solution":"<the solution you think is appropriate, expressed in code>"
}
Please strictly output in JSON format, do not output irrelevant content. """
DE_ENSEMBLE_ANGEL_PROMPT = """
Do you agree with my perspective? Please provide your reasons and answer.
"""
DE_ENSEMBLE_DEVIL_PROMPT = """
You agree with my answer 90% of the time and have almost no reservations. Affirm your agreement, share any additional thoughts if you have them, and conclude with the capital letter corresponding to your answer at the end of your response.
"""
DE_ENSEMBLE_JUDGE_FINAL_PROMPT = """
You, as the moderator, will evaluate both sides' answers and determine your
preference for an answer candidate. Please summarize your reasons for supporting affirmative/negative side and
give the final answer that you think is correct to conclude the debate. Now please output your answer in json format, with the format as follows:
{\"Reason\": \"\", \"debate_answer\": \"the capital letter corresponding to the answer\"}.
Please strictly output in JSON format, do not output irrelevant content.
"""
DE_ENSEMBLE_JUDGE_UNIVERSAL_PROMPT = """
You, as the moderator, will evaluate both sides' answers and determine if there is a clear
preference for an answer candidate. If so, please summarize your reasons for supporting affirmative/negative side and
give the final answer that you think is correct, and the debate will conclude. If not, the debate will continue to
the next round. Now please output your answer in json format, with the format as follows:
{\"Whether there is a preference\": \"Yes or No\", \"Supported Side\": \"Affirmative or Negative\",
\"Reason\": \"\", \"debate_answer\": \"the capital letter corresponding to the answer\"}.
Please strictly output in JSON format, do not output irrelevant content
"""
# Rephrase-the-problem prompt (AlphaCodium-style self reflection).
# NOTE(review): "instrcutions" typo lives inside the runtime prompt string and
# is deliberately left untouched here.
REPHRASE_ON_PROBLEM_PROMPT = """
You are given a code contest problem:
### problem
{problem_description}
### instrcutions
Given the code contest problem, Your Goal is:
Reflect on the problem, and describe it in your own words, in bullet points. Pay attention to small details, nuances, notes and examples in the problem description.
"""
# Reflection prompt used by the Test operator after a failed public-test run.
REFLECTION_ON_PUBLIC_TEST_PROMPT = """
You are given a code contest problem, and a self-reflection on the problem:
### problem
{problem_description}
### self reflection on the problem
{rephrase_problem}
A Python code solution was generated for the problem:
### Code Solution
{code_solution}
This section of the code execution result is
### Execution Result
{exec_pass}
However, when running the following input example, the code solution above failed to produce the expected output:
#### Failed Test Case
{test_fail}
Your goal is to analyze the code solution and the error, and propose a fixed code which will produce the expected output for the provided test input.
The fixed code should keep the solution robust, and work for all other input examples as well.
Make sure the fixed code has a reasonable runtime - less than three seconds on a modern computer, given the problem constraints for large input.
"""
# Prompt for extracting assert-style test cases from a problem docstring.
EXTRACT_CASE_PROMPT = """
You are given a coding problem, and you need to extract the test cases from the problem description.
## Problem Description
{problem_description}
Your task is to extract test cases from the above description and convert them into Python assert statements (as strings). These statements should be returned in a list for testing purposes.
Example:
Input:
>>> has_close_elements([1.0, 2.0, 3.0], 0.5)
False
>>> has_close_elements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3)
True
Output:
[
"assert candidate([1.0, 2.0, 3.0], 0.5) == False",
"assert candidate([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3) == True"
]
Please ensure that:
1. Each test case is converted to a separate assert statement.
2. The function name in the original example (e.g., 'has_close_elements') is replaced with 'candidate'.
3. The assert statements are returned as strings in a list.
"""

View file

@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
# @Date : 7/2/2024 17:36 PM
# @Author : didi
# @Desc : utils for experiment
import ast
import json
import re
from typing import Any, List, Tuple
from examples.ags.w_action_node.operator_an import TestCaseExtractOp
from examples.ags.w_action_node.prompt import EXTRACT_CASE_PROMPT
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM
def extract_task_id(task_id: str) -> int:
    """Return the numeric suffix of a task id such as "HumanEval/42" (0 if absent)."""
    found = re.search(r"/(\d+)", task_id)
    if found is None:
        return 0
    return int(found.group(1))
def sort_json_by_key(input_file: str, output_file: str, key: str = "task_id"):
    """
    Sort the entries of a JSONL file by the numeric part of ``key`` and write
    the result to a new JSONL file.

    :param input_file: Path to the input JSONL file
    :param output_file: Path to the output JSONL file
    :param key: Field holding the task id (default ``"task_id"``)
    """
    # Parse every line up front, then sort in place by the numeric task id.
    with open(input_file, "r") as src:
        entries = [json.loads(raw_line) for raw_line in src]

    entries.sort(key=lambda entry: extract_task_id(entry[key]))

    # Re-serialize one JSON object per line.
    with open(output_file, "w") as dst:
        dst.writelines(json.dumps(entry) + "\n" for entry in entries)
def parse_python_literal(s):
    """Best-effort conversion of a string into the Python literal it spells.

    Returns the input unchanged when it is not a valid Python literal.
    """
    try:
        value = ast.literal_eval(s)
    except (SyntaxError, ValueError):
        return s
    return value
def extract_test_cases_from_jsonl(problem_id: str, file_path: str = "public_test_reflexion.jsonl"):
    """Look up the stored test cases for ``problem_id`` in a JSONL file.

    A few problems are special-cased to an empty test string; otherwise the
    file is scanned for a record whose ``id`` matches.  Returns None when the
    problem is not found.
    """
    # Problems whose extracted tests are intentionally left empty.
    special_cases = {
        "HumanEval/32": "",
        "HumanEval/38": "",
        "HumanEval/50": "",
    }
    if problem_id in special_cases:
        return special_cases[problem_id]

    # Fall back to scanning the JSONL file, one JSON record per line.
    with open(file_path, "r") as source:
        for raw_line in source:
            record = json.loads(raw_line)
            if record.get("id") == problem_id:
                return record.get("test")
    # Problem id not present in the file.
    return None
def extract_test_cases(docstring: str) -> List[Tuple[str, List[Any], Any]]:
    """Pull ``>>> func(args)`` / expected-output examples out of a docstring.

    Returns a list of ``[function_name, argument_list, expected_output]``
    triples, with numeric and boolean literals coerced to Python values where
    possible and anything else kept as an unquoted string.
    """
    # Capture function name, raw argument text, and the line after the call.
    example_pattern = r">>> (\w+)\((.*?)\)\n\s*(.*?)(?=\n|$)"
    cases = []
    for func_name, raw_args, raw_expected in re.findall(example_pattern, docstring, re.DOTALL):
        # Coerce each comma-separated argument to int/float when possible,
        # otherwise strip surrounding quotes and keep it as a string.
        args = []
        for token in raw_args.split(","):
            token = token.strip()
            try:
                args.append(float(token) if "." in token else int(token))
            except ValueError:
                args.append(token.strip("'\""))

        # Coerce the expected output to bool/float/int when possible.
        expected: Any = raw_expected
        try:
            if raw_expected.lower() == "true":
                expected = True
            elif raw_expected.lower() == "false":
                expected = False
            elif "." in raw_expected:
                expected = float(raw_expected)
            else:
                expected = int(raw_expected)
        except ValueError:
            expected = raw_expected.strip("'\"")

        cases.append([func_name, args, expected])
    return cases
async def llm_extract_test_case(id, problem_description: str, file_path: str = "public_test.jsonl"):
    """Ask the LLM to extract test cases for one problem and append them to a JSONL file.

    Returns a one-entry dict mapping the problem id to the extracted test cases.
    """
    context = EXTRACT_CASE_PROMPT.format(problem_description=problem_description)
    filled_node = await ActionNode.from_pydantic(TestCaseExtractOp).fill(context=context, llm=LLM())
    extracted = filled_node.instruct_content.model_dump()

    entry = {id: extracted["test_cases"]}
    # Append-mode persistence: repeated runs accumulate one JSON record per line.
    with open(file_path, "a") as sink:
        sink.write(json.dumps(entry) + "\n")
    return entry
def test_cases_2_test_functions(solution: str, test_cases: str):
tester_function = f"""
{solution}
{test_cases}
"""
return tester_function

View file

@ -17,6 +17,7 @@ from pydantic import BaseModel, Field, create_model, model_validator
from tenacity import retry, stop_after_attempt, wait_random_exponential
from metagpt.actions.action_outcls_registry import register_action_outcls
from metagpt.actions.code_sanitize import sanitize
from metagpt.const import USE_CONFIG_TIMEOUT
from metagpt.llm import BaseLLM
from metagpt.logs import logger
@ -37,10 +38,12 @@ class ReviseMode(Enum):
TAG = "CONTENT"
MODE_CODE_FILL = "code_fill"
LANGUAGE_CONSTRAINT = "Language: Please use the same language as Human INPUT."
FORMAT_CONSTRAINT = f"Format: output wrapped inside [{TAG}][/{TAG}] like format example, nothing else."
SIMPLE_TEMPLATE = """
## context
{context}
@ -464,6 +467,41 @@ class ActionNode:
return self
def get_field_name(self):
"""
Get the field name from the Pydantic model associated with this ActionNode.
"""
model_class = self.create_class()
fields = model_class.model_fields
# Assuming there's only one field in the model
if len(fields) == 1:
return next(iter(fields))
# If there are multiple fields, we might want to use self.key to find the right one
return self.key
async def code_fill(self, context, function_name=None, timeout=USE_CONFIG_TIMEOUT):
"""
fill CodeBlock Node
"""
field_name = self.get_field_name()
prompt = context
content = await self.llm.aask(prompt, timeout=timeout)
extracted_code = sanitize(code=content, entrypoint=function_name)
result = {field_name: extracted_code}
return result
    async def messages_fill(
        self,
    ):
        """
        TODO: placeholder — intended to mirror the fill logic above, but with
        the LLM invocation switched to a message-based calling style.
        Not implemented yet.
        """
        pass
async def fill(
self,
context,
@ -474,6 +512,7 @@ class ActionNode:
images: Optional[Union[str, list[str]]] = None,
timeout=USE_CONFIG_TIMEOUT,
exclude=[],
function_name: str = None,
):
"""Fill the node(s) with mode.
@ -500,6 +539,11 @@ class ActionNode:
if self.schema:
schema = self.schema
if mode == MODE_CODE_FILL:
result = await self.code_fill(context, function_name, timeout)
self.instruct_content = self.create_class()(**result)
return self
if strgy == "simple":
return await self.simple_fill(schema=schema, mode=mode, images=images, timeout=timeout, exclude=exclude)
elif strgy == "complex":

View file

@ -0,0 +1,184 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2024/7/24 16:37
@Author : didi
@File : code_node.py
@Acknowledgement https://github.com/evalplus/evalplus/blob/master/evalplus/sanitize.py
"""
import ast
import traceback
from enum import Enum
from typing import Dict, Generator, List, Optional, Set, Tuple
import tree_sitter_python
from tree_sitter import Language, Node, Parser
class NodeType(Enum):
    """Tree-sitter grammar node-type names used when scanning parsed Python."""

    CLASS = "class_definition"
    FUNCTION = "function_definition"
    # Both plain `import x` and `from x import y` statements count as imports.
    IMPORT = ["import_statement", "import_from_statement"]
    IDENTIFIER = "identifier"
    ATTRIBUTE = "attribute"
    RETURN = "return_statement"
    EXPRESSION = "expression_statement"
    ASSIGNMENT = "assignment"
def traverse_tree(node: Node) -> Generator[Node, None, None]:
    """
    Traverse the tree structure starting from the given node.
    :param node: The root node to start the traversal from.
    :return: A generator object that yields nodes in the tree.
    """
    cursor = node.walk()
    depth = 0
    visited_children = False
    while True:
        if not visited_children:
            # Yield each node the first time the cursor lands on it (pre-order).
            yield cursor.node
            if not cursor.goto_first_child():
                # Leaf node: mark it visited so the next pass moves sideways/up.
                # NOTE(review): depth is incremented at leaves rather than on
                # descent — looks unusual; confirm against upstream evalplus
                # sanitize.py. Termination does not depend on it because
                # goto_parent() failing at the root also breaks the loop.
                depth += 1
                visited_children = True
        elif cursor.goto_next_sibling():
            visited_children = False
        elif not cursor.goto_parent() or depth == 0:
            # Back at (or above) the starting node: traversal complete.
            break
        else:
            depth -= 1
def syntax_check(code, verbose=False):
    """Return True when ``code`` parses as valid Python, False otherwise.

    With ``verbose=True``, the parse failure traceback is printed.
    """
    try:
        ast.parse(code)
    except (SyntaxError, MemoryError):
        if verbose:
            traceback.print_exc()
        return False
    return True
def code_extract(text: str) -> str:
    """Return the largest contiguous multi-line slice of ``text`` that parses
    as valid Python.

    "Largest" counts non-blank lines; ties keep the earliest slice found.
    """
    lines = text.split("\n")
    best_span = (0, 0)
    best_score = 0
    # Quadratic scan over all (start, end) line pairs; fine for model output.
    for start in range(len(lines)):
        for end in range(start + 1, len(lines)):
            candidate = "\n".join(lines[start : end + 1])
            if not syntax_check(candidate):
                continue
            score = sum(1 for ln in lines[start : end + 1] if ln.strip())
            if score > best_score:
                best_score = score
                best_span = (start, end)
    return "\n".join(lines[best_span[0] : best_span[1] + 1])
def get_definition_name(node: Node) -> str:
    """Return the identifier text of a definition node (implicitly None if absent)."""
    for candidate in node.children:
        if candidate.type != NodeType.IDENTIFIER.value:
            continue
        return candidate.text.decode("utf8")
def has_return_statement(node: Node) -> bool:
    """True when any node in ``node``'s subtree is a return statement."""
    return any(sub.type == NodeType.RETURN.value for sub in traverse_tree(node))
def get_deps(nodes: List[Tuple[str, Node]]) -> Dict[str, Set[str]]:
    """Map each definition name to the set of identifiers its subtree mentions."""

    def collect_identifiers(current: Node, found: Set[str]) -> None:
        # Depth-first walk; identifiers are leaves, so recurse on anything else.
        for child in current.children:
            if child.type == NodeType.IDENTIFIER.value:
                found.add(child.text.decode("utf8"))
            else:
                collect_identifiers(child, found)

    dependencies: Dict[str, Set[str]] = {}
    for name, definition in nodes:
        identifiers: Set[str] = set()
        collect_identifiers(definition, identifiers)
        dependencies[name] = identifiers
    return dependencies
def get_function_dependency(entrypoint: str, call_graph: Dict[str, Set[str]]) -> Set[str]:
    """Return every definition name reachable from ``entrypoint`` in ``call_graph``.

    Breadth-first search over the name -> referenced-names mapping produced by
    ``get_deps``.  The entrypoint itself is always included; names missing
    from the graph (builtins, external references) are kept but not expanded.

    :param entrypoint: Name of the function to start from.
    :param call_graph: Mapping of definition name to the names it references.
        (Annotation fixed: values are iterated collections, not single strings.)
    :return: Set of all reachable names, including ``entrypoint``.
    """
    from collections import deque  # local import keeps the function self-contained

    queue = deque([entrypoint])
    visited = {entrypoint}
    while queue:
        current = queue.popleft()  # O(1), unlike list.pop(0)'s O(n)
        if current not in call_graph:
            # No outgoing edges recorded for this name.
            continue
        for neighbour in call_graph[current]:
            if neighbour not in visited:
                visited.add(neighbour)
                queue.append(neighbour)
    return visited
def sanitize(code: str, entrypoint: Optional[str] = None) -> str:
    """
    Sanitize and extract relevant parts of the given Python code.

    Parses the input, keeps import statements, class/function definitions and
    top-level variable assignments, and — when an entrypoint is given — drops
    every definition not reachable from it in the identifier-dependency graph.

    :param code: The input Python code as a string.
    :param entrypoint: Optional function name used as the root for dependency analysis.
    :return: A sanitized version of the input code, containing only relevant parts.
    """
    code = code_extract(code)
    code_bytes = bytes(code, "utf8")
    parser = Parser(Language(tree_sitter_python.language()))
    tree = parser.parse(code_bytes)

    class_names: Set[str] = set()
    function_names: Set[str] = set()
    variable_names: Set[str] = set()

    import_nodes = []
    definition_nodes = []

    for child in tree.root_node.children:
        if child.type in NodeType.IMPORT.value:
            import_nodes.append(child)
        elif child.type == NodeType.CLASS.value:
            name = get_definition_name(child)
            # First definition of a name wins; later duplicates are dropped.
            if not (name in class_names or name in variable_names or name in function_names):
                definition_nodes.append((name, child))
                class_names.add(name)
        elif child.type == NodeType.FUNCTION.value:
            name = get_definition_name(child)
            # Keep only functions that actually return something.
            if not (name in function_names or name in variable_names or name in class_names) and has_return_statement(
                child
            ):
                definition_nodes.append((name, child))
                function_names.add(name)  # fixed: reuse name instead of re-deriving it
        elif child.type == NodeType.EXPRESSION.value and child.children[0].type == NodeType.ASSIGNMENT.value:
            subchild = child.children[0]
            name = get_definition_name(subchild)
            if not (name in variable_names or name in function_names or name in class_names):
                definition_nodes.append((name, subchild))
                variable_names.add(name)

    if entrypoint:
        # Restrict output to definitions reachable from the entrypoint.
        name2deps = get_deps(definition_nodes)
        reachable = get_function_dependency(entrypoint, name2deps)

    sanitized_output = b""
    for node in import_nodes:
        sanitized_output += code_bytes[node.start_byte : node.end_byte] + b"\n"
    for name, node in definition_nodes:
        if entrypoint and name not in reachable:
            continue
        sanitized_output += code_bytes[node.start_byte : node.end_byte] + b"\n"
    # Drop the trailing newline appended after the last emitted node.
    return sanitized_output[:-1].decode("utf8")

View file

@ -581,6 +581,30 @@ def write_json_file(json_file: str, data: list, encoding: str = None, indent: in
json.dump(data, fout, ensure_ascii=False, indent=indent, default=to_jsonable_python)
def read_jsonl_file(jsonl_file: str, encoding="utf-8") -> list[dict]:
    """Read a JSON-Lines file into a list of dicts (one object per line).

    :param jsonl_file: Path to the JSONL file.
    :param encoding: Text encoding used to open the file.
    :return: Parsed objects, in file order.
    :raises FileNotFoundError: If the file does not exist.
    :raises ValueError: If any line is not valid JSON.
    """
    if not Path(jsonl_file).exists():
        # fixed: the old message claimed ", return []" although we raise here
        raise FileNotFoundError(f"jsonl_file: {jsonl_file} not exist")
    datas = []
    with open(jsonl_file, "r", encoding=encoding) as fin:
        # Keep the try tight around the parse so the error names the bad line
        # and chains the original JSON decoding failure.
        for line_no, line in enumerate(fin, start=1):
            try:
                datas.append(json.loads(line))
            except Exception as exc:
                raise ValueError(f"read jsonl file: {jsonl_file} failed at line {line_no}") from exc
    return datas
def add_jsonl_file(jsonl_file: str, data: list[dict], encoding: str = None):
    """Append each dict in ``data`` to a JSONL file, creating parent dirs as needed."""
    parent = Path(jsonl_file).parent
    if not parent.exists():
        parent.mkdir(parents=True, exist_ok=True)
    # Append mode: existing records are preserved, one JSON object per line.
    with open(jsonl_file, "a", encoding=encoding) as fout:
        fout.writelines(json.dumps(item) + "\n" for item in data)
def read_csv_to_list(curr_file: str, header=False, strip_trail=True):
"""
Reads in a csv file to a list of list. If header is True, it returns a

11
test.py Normal file
View file

@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 18:00 PM
# @Author : didi
# @Desc : test on humaneval graph
import asyncio

from examples.ags.benchmark.humaneval import sample_generate, samples_generate

# NOTE(review): "HumanEval/id" and "result_path" are placeholders — substitute
# a real problem id and output path before running this script.
# Generate a solution for a single HumanEval problem using the alpha_codium mode.
asyncio.run(sample_generate("HumanEval/id", result_path="result_path", mode="alpha_codium"))
# Generate solutions for the whole HumanEval suite using the alpha_codium mode.
asyncio.run(samples_generate(mode="alpha_codium", result_path="result_path"))