This commit is contained in:
didi 2024-08-01 14:56:42 +08:00
parent 3fc3d217a8
commit bdfa6eb512
19 changed files with 3381 additions and 1409 deletions

52
10.txt Normal file
View file

@ -0,0 +1,52 @@
2024-07-01 15:30:33.806 | DEBUG | metagpt.provider.base_llm:aask:151 - [{'role': 'system', 'content': 'You are a helpful assistant.'}, {'role': 'user', 'content': '\n## context\n\nGenerate Code Solution for the following problem: \n\ndef is_palindrome(string: str) -> bool:\n """ Test if given string is a palindrome """\n return string == string[::-1]\n\n\ndef make_palindrome(string: str) -> str:\n """ Find the shortest palindrome that begins with a supplied string.\n Algorithm idea is simple:\n - Find the longest postfix of supplied string that is a palindrome.\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\n >>> make_palindrome(\'\')\n \'\'\n >>> make_palindrome(\'cat\')\n \'catac\'\n >>> make_palindrome(\'cata\')\n \'catac\'\n """\n\n\n\n-----\n\n## format example\n[CONTENT]\n{\n "solution": ""\n}\n[/CONTENT]\n\n## nodes: "<node>: <type> # <instruction>"\n- solution: <class \'str\'> # Your Code Solution for this problem\n\n\n## constraint\nLanguage: Please use the same language as Human INPUT.\nFormat: output wrapped inside [CONTENT][/CONTENT] like format example, nothing else.\n\n## action\nFollow instructions of nodes, generate output and make sure it follows the format example.\n'}]
2024-07-01 15:30:42.412 | INFO | metagpt.utils.cost_manager:update_cost:57 - Total running cost: $0.000 | Max budget: $10.000 | Current cost: $0.000, prompt_tokens: 318, completion_tokens: 175
2024-07-01 15:30:42.413 | DEBUG | metagpt.actions.action_node:_aask_v1:421 - llm raw output:
[CONTENT]
{
"solution": "def make_palindrome(string: str) -> str:\n \"\"\" Find the shortest palindrome that begins with a supplied string.\n Algorithm idea is simple:\n - Find the longest postfix of supplied string that is a palindrome.\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\n \"\"\"\n if not string:\n return ''\n\n for i in range(len(string)):\n if string[i:] == string[i:][::-1]:\n return string + string[:i][::-1]\n\n return string + string[:-1][::-1]\n"
}
[/CONTENT]
2024-07-01 15:30:42.418 | DEBUG | metagpt.actions.action_node:_aask_v1:431 - parsed_data:
{'solution': 'def make_palindrome(string: str) -> str:\n """ Find the shortest palindrome that begins with a supplied string.\n Algorithm idea is simple:\n - Find the longest postfix of supplied string that is a palindrome.\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\n """\n if not string:\n return \'\'\n\n for i in range(len(string)):\n if string[i:] == string[i:][::-1]:\n return string + string[:i][::-1]\n\n return string + string[:-1][::-1]\n'}
2024-07-01 15:30:42.419 | DEBUG | metagpt.provider.base_llm:aask:151 - [{'role': 'system', 'content': 'You are a helpful assistant.'}, {'role': 'user', 'content': '\n## context\n\nFor the question described as \n\ndef is_palindrome(string: str) -> bool:\n """ Test if given string is a palindrome """\n return string == string[::-1]\n\n\ndef make_palindrome(string: str) -> str:\n """ Find the shortest palindrome that begins with a supplied string.\n Algorithm idea is simple:\n - Find the longest postfix of supplied string that is a palindrome.\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\n >>> make_palindrome(\'\')\n \'\'\n >>> make_palindrome(\'cat\')\n \'catac\'\n >>> make_palindrome(\'cata\')\n \'catac\'\n """\n,\nplease review the following solution: {\'solution\': \'def make_palindrome(string: str) -> str:\\n """ Find the shortest palindrome that begins with a supplied string.\\n Algorithm idea is simple:\\n - Find the longest postfix of supplied string that is a palindrome.\\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\\n """\\n if not string:\\n return \\\'\\\'\\n\\n for i in range(len(string)):\\n if string[i:] == string[i:][::-1]:\\n return string + string[:i][::-1]\\n\\n return string + string[:-1][::-1]\\n\'}, and provide a review result in boolean format.\nIf you believe the solution is capable of resolving the issue, return True; otherwise, return False, and include your comments\n\n\n-----\n\n## format example\n[CONTENT]\n{\n "review_result": false,\n "feedback": ""\n}\n[/CONTENT]\n\n## nodes: "<node>: <type> # <instruction>"\n- review_result: <class \'bool\'> # The Review Result (Bool). If you think this solution looks good for you, return \'true\'; If not, return \'false\'\n- feedback: <class \'str\'> # Your FeedBack for this problem based on the criteria. If the review result is true, you can put it \'nothing here\'.\n\n\n## constraint\nLanguage: Please use the same language as Human INPUT.\nFormat: output wrapped inside [CONTENT][/CONTENT] like format example, nothing else.\n\n## action\nFollow instructions of nodes, generate output and make sure it follows the format example.\n'}]
2024-07-01 15:30:44.222 | INFO | metagpt.utils.cost_manager:update_cost:57 - Total running cost: $0.000 | Max budget: $10.000 | Current cost: $0.000, prompt_tokens: 585, completion_tokens: 29
2024-07-01 15:30:44.222 | DEBUG | metagpt.actions.action_node:_aask_v1:421 - llm raw output:
[CONTENT]
{
"review_result": true,
"feedback": "nothing here"
}
[/CONTENT]
2024-07-01 15:30:44.224 | DEBUG | metagpt.actions.action_node:_aask_v1:431 - parsed_data:
{'review_result': True, 'feedback': 'nothing here'}
2024-07-01 15:30:44.224 | DEBUG | metagpt.provider.base_llm:aask:151 - [{'role': 'system', 'content': 'You are a helpful assistant.'}, {'role': 'user', 'content': '\n## context\n\nGenerate Code Solution for the following problem: \n\ndef is_palindrome(string: str) -> bool:\n """ Test if given string is a palindrome """\n return string == string[::-1]\n\n\ndef make_palindrome(string: str) -> str:\n """ Find the shortest palindrome that begins with a supplied string.\n Algorithm idea is simple:\n - Find the longest postfix of supplied string that is a palindrome.\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\n >>> make_palindrome(\'\')\n \'\'\n >>> make_palindrome(\'cat\')\n \'catac\'\n >>> make_palindrome(\'cata\')\n \'catac\'\n """\n\n\n\n-----\n\n## format example\n[CONTENT]\n{\n "solution": ""\n}\n[/CONTENT]\n\n## nodes: "<node>: <type> # <instruction>"\n- solution: <class \'str\'> # Your Code Solution for this problem\n\n\n## constraint\nLanguage: Please use the same language as Human INPUT.\nFormat: output wrapped inside [CONTENT][/CONTENT] like format example, nothing else.\n\n## action\nFollow instructions of nodes, generate output and make sure it follows the format example.\n'}]
2024-07-01 15:30:53.135 | INFO | metagpt.utils.cost_manager:update_cost:57 - Total running cost: $0.000 | Max budget: $10.000 | Current cost: $0.000, prompt_tokens: 318, completion_tokens: 175
2024-07-01 15:30:53.136 | DEBUG | metagpt.actions.action_node:_aask_v1:421 - llm raw output:
[CONTENT]
{
"solution": "def make_palindrome(string: str) -> str:\n \"\"\" Find the shortest palindrome that begins with a supplied string.\n Algorithm idea is simple:\n - Find the longest postfix of supplied string that is a palindrome.\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\n \"\"\"\n if not string:\n return ''\n\n for i in range(len(string)):\n if string[i:] == string[i:][::-1]:\n return string + string[:i][::-1]\n\n return string + string[:-1][::-1]\n"
}
[/CONTENT]
2024-07-01 15:30:53.137 | DEBUG | metagpt.actions.action_node:_aask_v1:431 - parsed_data:
{'solution': 'def make_palindrome(string: str) -> str:\n """ Find the shortest palindrome that begins with a supplied string.\n Algorithm idea is simple:\n - Find the longest postfix of supplied string that is a palindrome.\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\n """\n if not string:\n return \'\'\n\n for i in range(len(string)):\n if string[i:] == string[i:][::-1]:\n return string + string[:i][::-1]\n\n return string + string[:-1][::-1]\n'}
2024-07-01 15:30:53.138 | DEBUG | metagpt.provider.base_llm:aask:151 - [{'role': 'system', 'content': 'You are a helpful assistant.'}, {'role': 'user', 'content': '\n## context\n\nFor the question described as \n\ndef is_palindrome(string: str) -> bool:\n """ Test if given string is a palindrome """\n return string == string[::-1]\n\n\ndef make_palindrome(string: str) -> str:\n """ Find the shortest palindrome that begins with a supplied string.\n Algorithm idea is simple:\n - Find the longest postfix of supplied string that is a palindrome.\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\n >>> make_palindrome(\'\')\n \'\'\n >>> make_palindrome(\'cat\')\n \'catac\'\n >>> make_palindrome(\'cata\')\n \'catac\'\n """\n,\nplease review the following solution: {\'solution\': \'def make_palindrome(string: str) -> str:\\n """ Find the shortest palindrome that begins with a supplied string.\\n Algorithm idea is simple:\\n - Find the longest postfix of supplied string that is a palindrome.\\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\\n """\\n if not string:\\n return \\\'\\\'\\n\\n for i in range(len(string)):\\n if string[i:] == string[i:][::-1]:\\n return string + string[:i][::-1]\\n\\n return string + string[:-1][::-1]\\n\'}, and provide a review result in boolean format.\nIf you believe the solution is capable of resolving the issue, return True; otherwise, return False, and include your comments\n\n\n-----\n\n## format example\n[CONTENT]\n{\n "review_result": false,\n "feedback": ""\n}\n[/CONTENT]\n\n## nodes: "<node>: <type> # <instruction>"\n- review_result: <class \'bool\'> # The Review Result (Bool). If you think this solution looks good for you, return \'true\'; If not, return \'false\'\n- feedback: <class \'str\'> # Your FeedBack for this problem based on the criteria. If the review result is true, you can put it \'nothing here\'.\n\n\n## constraint\nLanguage: Please use the same language as Human INPUT.\nFormat: output wrapped inside [CONTENT][/CONTENT] like format example, nothing else.\n\n## action\nFollow instructions of nodes, generate output and make sure it follows the format example.\n'}]
2024-07-01 15:30:55.232 | INFO | metagpt.utils.cost_manager:update_cost:57 - Total running cost: $0.000 | Max budget: $10.000 | Current cost: $0.000, prompt_tokens: 585, completion_tokens: 29
2024-07-01 15:30:55.233 | DEBUG | metagpt.actions.action_node:_aask_v1:421 - llm raw output:
[CONTENT]
{
"review_result": true,
"feedback": "nothing here"
}
[/CONTENT]
2024-07-01 15:30:55.234 | DEBUG | metagpt.actions.action_node:_aask_v1:431 - parsed_data:
{'review_result': True, 'feedback': 'nothing here'}
2024-07-01 15:30:55.234 | DEBUG | metagpt.provider.base_llm:aask:151 - [{'role': 'system', 'content': 'You are a helpful assistant.'}, {'role': 'user', 'content': '\n## context\n\nFor the question described as \n\ndef is_palindrome(string: str) -> bool:\n """ Test if given string is a palindrome """\n return string == string[::-1]\n\n\ndef make_palindrome(string: str) -> str:\n """ Find the shortest palindrome that begins with a supplied string.\n Algorithm idea is simple:\n - Find the longest postfix of supplied string that is a palindrome.\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\n >>> make_palindrome(\'\')\n \'\'\n >>> make_palindrome(\'cat\')\n \'catac\'\n >>> make_palindrome(\'cata\')\n \'catac\'\n """\n, Solutions: def make_palindrome(string: str) -> str:\n """ Find the shortest palindrome that begins with a supplied string.\n Algorithm idea is simple:\n - Find the longest postfix of supplied string that is a palindrome.\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\n """\n if not string:\n return \'\'\n\n for i in range(len(string)):\n if string[i:] == string[i:][::-1]:\n return string + string[:i][::-1]\n\n return string + string[:-1][::-1]\n\ndef make_palindrome(string: str) -> str:\n """ Find the shortest palindrome that begins with a supplied string.\n Algorithm idea is simple:\n - Find the longest postfix of supplied string that is a palindrome.\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\n """\n if not string:\n return \'\'\n\n for i in range(len(string)):\n if string[i:] == string[i:][::-1]:\n return string + string[:i][::-1]\n\n return string + string[:-1][::-1]\n\n\nPlease select the solution that appears most frequently from these options and provide the best solution based on that.\n\n\n-----\n\n## format example\n[CONTENT]\n{\n "solution": ""\n}\n[/CONTENT]\n\n## nodes: "<node>: <type> # <instruction>"\n- solution: <class \'str\'> # Final ensemble solution for this problem\n\n\n## constraint\nLanguage: Please use the same language as Human INPUT.\nFormat: output wrapped inside [CONTENT][/CONTENT] like format example, nothing else.\n\n## action\nFollow instructions of nodes, generate output and make sure it follows the format example.\n'}]
2024-07-01 15:31:03.826 | INFO | metagpt.utils.cost_manager:update_cost:57 - Total running cost: $0.001 | Max budget: $10.000 | Current cost: $0.000, prompt_tokens: 635, completion_tokens: 173
2024-07-01 15:31:03.827 | DEBUG | metagpt.actions.action_node:_aask_v1:421 - llm raw output:
[CONTENT]
{
"solution": "def make_palindrome(string: str) -> str:\n \"\"\" Find the shortest palindrome that begins with a supplied string.\n Algorithm idea is simple:\n - Find the longest postfix of supplied string that is a palindrome.\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\n \"\"\"\n if not string:\n return ''\n\n for i in range(len(string)):\n if string[i:] == string[i:][::-1]:\n return string + string[:i][::-1]\n\n return string + string[:-1][::-1]"
}
[/CONTENT]
2024-07-01 15:31:03.830 | DEBUG | metagpt.actions.action_node:_aask_v1:431 - parsed_data:
{'solution': 'def make_palindrome(string: str) -> str:\n """ Find the shortest palindrome that begins with a supplied string.\n Algorithm idea is simple:\n - Find the longest postfix of supplied string that is a palindrome.\n - Append to the end of the string reverse of a string prefix that comes before the palindromic suffix.\n """\n if not string:\n return \'\'\n\n for i in range(len(string)):\n if string[i:] == string[i:][::-1]:\n return string + string[:i][::-1]\n\n return string + string[:-1][::-1]'}

View file

@ -3,174 +3,132 @@
# @Author : didi
# @Desc : test on human eval graph
# 1. 出效果
# 2. 代码方面,格式问题,很多格式处理 ->增加效果
# 3. GSM8k ->
# 4. 我来写一个GSM8k最基础代码GSM8k实验代码需要你来改写
import os
import asyncio
import json
import os
import subprocess
import sys
import asyncio
from typing import Literal, Optional
import aiofiles
from metagpt.llm import LLM
from evalplus.data import get_human_eval_plus, write_jsonl
from examples.ags.w_action_node.utils import jsonl_ranker
from evalplus.data import get_human_eval_plus
from examples.ags.w_action_node.graph import HumanEvalGraph
from examples.ags.w_action_node.operator import GenerateCode, GenerateCodeBlock
from examples.ags.w_action_node.utils import sort_json_by_key
from metagpt.llm import LLM
from metagpt.logs import logger
from metagpt.utils.common import add_jsonl_file, read_json_file
from metagpt.utils.exceptions import handle_exception
generate_code = GenerateCode(llm=LLM())
generate_code_block = GenerateCodeBlock(llm=LLM())
solver = HumanEvalGraph(name="solver", llm=LLM(), criteria='correctness, efficiency, readability', vote_count=1)
solver = HumanEvalGraph(name="solver", llm=LLM(), criteria="correctness, efficiency, readability", vote_count=5)
async def sample_generate(id, result_path:str="samples.jsonl",mode:str="ags"):
ModeType = Literal["ags", "alpha_codium", "llm"]
async def llm_generate(id):
case = get_human_eval_plus()[f"{id}"]
solution_result = await generate_code_block(case["prompt"], case["entry_point"])
sample_dict = dict(task_id=case["task_id"], solution=solution_result["code_solution"])
return sample_dict
async def ags_generate(id, ensemble_count: int = 5):
case = get_human_eval_plus()[f"{id}"]
solution_result = await solver(case["prompt"], ensemble_count=ensemble_count)
sample_dict = dict(task_id=case["task_id"], solution=solution_result["final_solution"])
return sample_dict
async def alpha_codium_generate(id):
case = get_human_eval_plus()[f"{id}"]
solution_result = await solver.alpha_codium(case["task_id"], case["prompt"], ensemble_count=5)
sample_dict = dict(task_id=case["task_id"], solution=solution_result["final_solution"])
return sample_dict
async def route_generate(mode: ModeType, id: str):
if mode == "ags":
solution_result = await solver(case['prompt'],ensemble_count=5)
sample_dict = dict(task_id=case['task_id'], solution=solution_result['final_solution'])
elif mode == "alpha":
solution_result = await solver.alpha_codium(case['task_id'], case['prompt'], ensemble_count=5)
sample_dict = dict(task_id=case['task_id'], solution=solution_result['final_solution'])
sample_dict = await ags_generate(id)
elif mode == "alpha_codium":
sample_dict = await alpha_codium_generate(id)
elif mode == "llm":
solution_result = await generate_code_block(case['prompt'],case['entry_point'])
sample_dict = dict(task_id=case['task_id'], solution=solution_result['code_solution'])
print(sample_dict)
with open(result_path, mode='a') as f:
f.write(json.dumps(sample_dict) + '\n')
jsonl_ranker(result_path, result_path)
sample_dict = await llm_generate(id)
else:
raise ValueError(f"Invalid mode: {mode}")
return sample_dict
async def samples_generate(mode:str, result_path:str="samples.jsonl"):
cases = list(get_human_eval_plus().values())
async def sample_generate(id, result_path: str = "samples.jsonl", mode: ModeType = "ags"):
sample_dict = await route_generate(mode, id)
add_jsonl_file(result_path, [sample_dict])
sort_json_by_key(result_path, result_path)
async def samples_generate(mode: ModeType, result_path: str = "samples.jsonl"):
ids = list(get_human_eval_plus().keys())
file_lock = asyncio.Lock()
async def solve_and_write(case, mode):
try:
if mode == 'llm':
solution_result = await generate_code_block(problem_description=case['prompt'], function_name=case['entry_point'])
# solution_result = await generate_code(case['prompt'])
sample_dict = {
'task_id': case['task_id'],
'solution': solution_result['code_solution']
}
elif mode == "ags":
solution_result = await solver(case['prompt'], ensemble_count=5)
sample_dict = {
'task_id': case['task_id'],
'solution': solution_result['final_solution']
}
elif mode == "alpha":
solution_result = await solver.alpha_codium(case['task_id'], case['prompt'], ensemble_count=1)
sample_dict = {
'task_id': case['task_id'],
'solution': solution_result['final_solution']
}
# TODO 解决 final_solution 问题之后就可以开始正式测评了
async with file_lock:
async with aiofiles.open(result_path, mode='a') as f:
await f.write(json.dumps(sample_dict) + '\n')
return None
except Exception as e:
print(e)
return case['task_id']
@handle_exception(
exception_type=Exception,
exception_msg="Error in solve_and_write function",
default_return=lambda id, *args, **kwargs: id,
)
async def solve_and_write(id: str, mode: ModeType) -> Optional[str]:
sample_dict = await route_generate(mode, id)
async with file_lock:
async with aiofiles.open(result_path, mode="a") as f:
await f.write(json.dumps(sample_dict) + "\n")
return None
tasks = [solve_and_write(case, mode) for case in cases]
tasks = [solve_and_write(id, mode) for id in ids]
results = await asyncio.gather(*tasks)
failed_tasks = [task_id for task_id in results if task_id is not None]
if failed_tasks:
print(failed_tasks)
if mode == 'llm':
for task_id in failed_tasks:
case = get_human_eval_plus()[task_id]
for _ in range(3):
try:
solution_result = await generate_code_block(case['prompt'],function_name=case['entry_point'])
task_dict = {
'task_id': case['task_id'],
'solution': solution_result['code_solution']
}
with open(result_path, mode='a') as f:
f.write(json.dumps(task_dict) + '\n')
failed_tasks.remove(task_id)
break
except Exception as e:
print(f"{e} \n failure {task_id}")
elif mode == "ags" or mode == "alpha":
for task_id in failed_tasks:
try:
await sample_generate(task_id,result_path,mode)
except Exception as e:
print(f"failure {task_id}")
jsonl_ranker(result_path, result_path)
logger.info(failed_tasks)
for task_id in failed_tasks:
try:
await sample_generate(task_id, result_path, mode)
failed_tasks.remove(task_id)
except Exception:
logger.error(f"{task_id} fail")
sort_json_by_key(result_path, result_path)
if not failed_tasks:
# 自动 sanitize
# result_path = automatic_sanitize(result_path)
if automatic_evalplus(result_path):
eval_path = result_path[:-6]+"_eval_results.json"
eval_path = result_path[:-6] + "_eval_results.json"
unpassed_exapmle = extract_failure_tests(eval_path)
print(unpassed_exapmle)
logger.info(unpassed_exapmle)
else:
print(failed_tasks)
logger.info(failed_tasks)
async def samples_generate_ags():
sample_list = []
cases = list(get_human_eval_plus().values())
async def solve_with_id(case):
solution_result = await solver(case['prompt'], ensemble_count=5)
return case['task_id'], solution_result['final_solution']
tasks = [solve_with_id(case) for case in cases]
results = await asyncio.gather(*tasks)
for task_id, solution in results:
sample_dict = dict(task_id=task_id, solution=solution)
sample_list.append(sample_dict)
write_jsonl("samples.jsonl", sample_list)
async def samples_generate_llm():
sample_list = []
cases = list(get_human_eval_plus().values())
async def solve_with_id(case):
solution_result = await generate_code_block(case['prompt'])
# solution_result = await generate_code(case['prompt'])
return case['task_id'], solution_result['code_solution']
tasks = [solve_with_id(case) for case in cases]
results = await asyncio.gather(*tasks)
for task_id, solution in results:
sample_dict = dict(task_id=task_id, solution=solution)
sample_list.append(sample_dict)
write_jsonl("samples.jsonl", sample_list)
def automatic_sanitize(result_path: str = "samples.jsonl"):
@handle_exception(exception_type=subprocess.CalledProcessError, exception_msg="sanitize error", default_return=None)
def automatic_sanitize(result_path: str = "samples.jsonl") -> Optional[str]:
"""
在命令行中自动执行 evalplus.sanitize --samples result_path
返回result_path前缀加上"-sanitized.jsonl"
"""
command = ["evalplus.sanitize", "--samples", result_path]
try:
subprocess.run(command, check=True)
except subprocess.CalledProcessError as e:
print(f"执行命令时出错: {e}")
return None
# 构建sanitized文件路径
subprocess.run(command, check=True)
base_name = os.path.splitext(result_path)[0]
sanitized_path = f"{base_name}-sanitized.jsonl"
return sanitized_path
def automatic_evalplus(result_path:str ="samples.jsonl"):
@handle_exception(
exception_type=subprocess.CalledProcessError,
exception_msg="Error in automatic_evalplus function",
default_return=False,
)
def automatic_evalplus(result_path: str = "samples.jsonl") -> bool:
"""
在命令行中自动执行 evalplus.evaluate --dataset humaneval --samples samples.jsonl --parallel 2 --base-only
"""
@ -178,41 +136,30 @@ def automatic_evalplus(result_path:str ="samples.jsonl"):
sys.executable, # 使用当前 Python 解释器
"-m",
"evalplus.evaluate",
"--dataset", "humaneval",
"--samples", result_path,
"--parallel", "2",
"--base-only"
"--dataset",
"humaneval",
"--samples",
result_path,
"--parallel",
"2",
"--base-only",
]
try:
result = subprocess.run(command, check=True, capture_output=True, text=True)
print("输出:", result.stdout)
return True
except subprocess.CalledProcessError as e:
print("错误输出:", e.stderr)
return False
def extract_failure_tests(file_path:str = "samples_eval_results.json"):
with open(file_path, 'r') as f:
task_results = json.load(f)
result = subprocess.run(command, check=True, capture_output=True, text=True)
logger.info(f"ouptput: \n {result.stdout}")
return True
def extract_failure_tests(file_path: str = "samples_eval_results.json"):
task_results = read_json_file(file_path)
failed_tests = []
for task in task_results['eval'].values():
for task in task_results["eval"].values():
if task[0]["base_status"] == "fail":
failed_test = {
"task_id": task[0]["task_id"],
# "solution": task["solution"],
# "fail_tests": task["base_fail_tests"]
}
failed_tests.append(failed_test)
print(len(failed_tests))
logger.info(f"length of failed tests: {len(failed_tests)}")
return failed_tests
# asyncio.run(sample_generate('HumanEval/101'))
# asyncio.run(samples_generate(mode='ags'))
# jsonl_ranker("samples.jsonl", "samples.jsonl")
# {"task_id": "HumanEval/101", "solution": "def words_string(s):\n import re\n return re.split(r'[,\\s]\\s*', s)"}

View file

@ -1,239 +0,0 @@
# Import necessary libraries and modules
import gzip
import itertools
import json
import os
import subprocess
from typing import Dict, Iterable, List, Union
import numpy as np
import tqdm
from loguru import logger
# Define the root directory as the location of the script
ROOT = os.path.dirname(os.path.abspath(__file__))
# Define the input data file containing human evaluations
HUMAN_EVAL = r"HumanEval.jsonl.gz"
def read_problems(evalset_file: str = HUMAN_EVAL) -> Dict[str, Dict]:
"""
Reads a JSONL file containing problem evaluations and returns them as a dictionary.
Args:
evalset_file (str): Path to the JSONL file.
Returns:
Dict[str, Dict]: A dictionary where task IDs are keys and problem details are values.
"""
return {task["task_id"]: task for task in stream_jsonl(evalset_file)}
def stream_jsonl(filename: str) -> Iterable[Dict]:
"""
Parses a JSONL file and yields each line as a dictionary.
Args:
filename (str): Path to the JSONL file.
Yields:
Iterable[Dict]: A generator of dictionaries representing JSONL lines.
"""
if filename.endswith(".gz"):
with open(filename, "rb") as gzfp:
with gzip.open(gzfp, "rt") as fp:
for line in fp:
if any(not x.isspace() for x in line):
yield json.loads(line)
else:
with open(filename, "r") as fp:
for line in fp:
if any(not x.isspace() for x in line):
yield json.loads(line)
def _generate_examples(filepath, split, name="sanitized"):
if name == "full":
def _read_lines(fn, start, end):
data = []
with open(fn, encoding="utf-8") as f:
for line in f:
sample = json.loads(line)
if start <= sample["task_id"] <= end:
data.append(sample)
elif sample["task_id"] > end:
break
return data
if split == "test":
data = _read_lines(filepath, 11, 510)
elif split == "train":
data = _read_lines(filepath, 601, 974)
elif split == "validation":
data = _read_lines(filepath, 511, 600)
elif split == "prompt":
data = _read_lines(filepath, 1, 10)
elif name == "sanitized":
with open(filepath, encoding="utf-8") as f:
data = json.load(f)
if split == "test":
data = [sample for sample in data if 11 <= sample["task_id"] <= 510]
elif split == "train":
data = [sample for sample in data if 601 <= sample["task_id"] <= 974]
elif split == "validation":
data = [sample for sample in data if 511 <= sample["task_id"] <= 600]
elif split == "prompt":
data = [sample for sample in data if 1 <= sample["task_id"] <= 10]
id_ = 0
for sample in data:
yield id_, sample
id_ += 1
def write_jsonl(filename: str, data: Iterable[Dict], append: bool = False):
"""
Writes an iterable of dictionaries to a JSONL file.
Args:
filename (str): Path to the output JSONL file.
data (Iterable[Dict]): Data to write as JSONL.
append (bool): If True, appends to an existing file, else creates a new file.
"""
# Determine the file writing mode based on the 'append' flag
if append:
mode = "ab"
else:
mode = "wb"
filename = os.path.expanduser(filename)
# Handle .gz compression
if filename.endswith(".gz"):
with open(filename, mode) as fp:
with gzip.GzipFile(fileobj=fp, mode="wb") as gzfp:
for x in data:
gzfp.write((json.dumps(x) + "\n").encode("utf-8"))
else:
with open(filename, mode) as fp:
for x in data:
fp.write((json.dumps(x) + "\n").encode("utf-8"))
def execution(task_id, check_program):
"""
Executes a Python program and captures its output.
Args:
task_id: A unique identifier for the task.
check_program: The Python program to execute.
Returns:
bool: True if the execution was successful, False otherwise.
"""
process = subprocess.Popen(["python", "-c", f"{check_program}"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
# Wait for the process to complete, with a timeout
stdout, stderr = process.communicate(timeout=30)
if len(stderr) == 0:
# logger.info(f"{task_id}: passed")
passed = True
elif b"OK" in stderr:
# logger.info(f"{task_id}: passed, {stderr}")
passed = True
else:
logger.info(f"{task_id}: error: {stderr}")
passed = False
except subprocess.TimeoutExpired:
logger.info("The command did not complete within the given timeout.")
process.kill() # Kill the process if it times out
logger.info(f"{task_id}: error")
passed = False
return passed
def estimate_pass_at_k(
num_samples: Union[int, List[int], np.ndarray], num_correct: Union[List[int], np.ndarray], k: int
) -> np.ndarray:
"""
Estimates pass@k of each problem and returns them in an array.
Args:
num_samples: Number of total samples (can be an int, list, or NumPy array).
num_correct: Number of correct samples (list or NumPy array).
k (int): The 'k' value for pass@k.
Returns:
np.ndarray: An array of pass rates for each problem.
"""
# Define a pass rate estimator function
def estimator(n: int, c: int, k: int) -> float:
if n - c < k:
return 1.0
return 1.0 - np.prod(1.0 - k / np.arange(n - c + 1, n + 1))
# Determine the number of samples based on the input type
if isinstance(num_samples, int):
num_samples_it = itertools.repeat(num_samples, len(num_correct))
else:
assert len(num_samples) == len(num_correct)
num_samples_it = iter(num_samples)
# Calculate pass rates for each problem
return np.array([estimator(int(n), int(c), k) for n, c in zip(num_samples_it, num_correct)])
def evaluate(total: List, correct: List, ks: List = [1, 10]):
"""
Evaluates and logs pass rates at various 'k' values.
Args:
total (List): List of total samples.
correct (List): List of correct samples.
ks (List): List of 'k' values to evaluate.
Returns:
dict: A dictionary of pass rates at each 'k' value.
"""
total = np.array(total)
correct = np.array(correct)
# Calculate and log pass rates at each 'k' value
pass_at_k = {f"pass@{k}": estimate_pass_at_k(total, correct, k).mean() for k in ks if (total >= k).all()}
logger.info(pass_at_k)
return pass_at_k
if __name__ == "__main__":
logger.info("Reading samples...")
problems = read_problems(HUMAN_EVAL)
total, correct = [], []
passed = []
for sample in tqdm.tqdm(stream_jsonl("example_samples.jsonl")):
task_id = sample["task_id"]
completion = sample["completion"]
problem = problems[task_id]
# Construct a check program
check_program = completion + "\n" + problem["test"] + "\n" + f"check({problem['entry_point']})"
# Execute the check program and capture the result
passed_flg = execution(task_id, check_program)
if not passed_flg:
logger.debug("error")
else:
logger.debug("passed")
passed.append(len(passed))
total.append(len(passed))
correct.append(sum(passed))
# Evaluate pass rates at various 'k' values
evaluate(total, correct, ks=[1, 5, 10])

View file

@ -1,101 +0,0 @@
from typing import Any, Dict, List, Callable
from abc import ABC, abstractmethod
class LLM:
def ask(self, text: str) -> str:
# Implement LLM query logic here
pass
class Operator(ABC):
def __init__(self, llm: LLM):
self.llm = llm
@abstractmethod
def forward(self, *args: Any, **kwargs: Any) -> Any:
pass
def __call__(self, *args: Any, **kwargs: Any) -> Any:
return self.forward(*args, **kwargs)
class Generate(Operator):
def __init__(self, llm: LLM, prompt: str):
super().__init__(llm)
self.prompt = prompt
def forward(self, input_problem: str) -> str:
return self.llm.ask(f"{self.prompt}\n{input_problem}")
class Review(Operator):
def __init__(self, llm: LLM, criteria: List[str]):
super().__init__(llm)
self.criteria = criteria
def forward(self, solution: str) -> Dict[str, float]:
review_prompt = f"Review the following solution based on these criteria: {', '.join(self.criteria)}\n\nSolution: {solution}"
review_result = self.llm.ask(review_prompt)
# Parse the review_result to extract scores
return {criteria: float(review_result.split(criteria)[1].split()[0]) for criteria in self.criteria}
class Module:
def __init__(self, llm: LLM):
self.llm = llm
def forward(self, x: Any) -> Any:
raise NotImplementedError("Subclasses must implement forward method")
def __call__(self, x: Any) -> Any:
return self.forward(x)
class CodeGenerationModule(Module):
def __init__(self, llm: LLM):
super().__init__(llm)
self.generate = Generate(llm, "Generate a Python function for the following problem:")
self.review = Review(llm, ["correctness", "efficiency", "readability"])
def forward(self, problem: str) -> Dict[str, Any]:
solution = self.generate(problem)
review = self.review(solution)
return {"solution": solution, "review": review}
def optimize(module: Module, loss_fn: Callable[[Dict[str, Any]], float], iterations: int = 10):
for _ in range(iterations):
# This is a placeholder for the optimization logic
# In a real implementation, you would:
# 1. Run the module on some input
# 2. Compute the loss
# 3. Use the loss to improve the module (e.g., by adjusting prompts or using LLM feedback)
pass
# Usage
llm = LLM()
code_gen = CodeGenerationModule(llm)
# Solve a problem
result = code_gen("Write a function to calculate the factorial of a number")
print(result)
# Define a loss function
def loss_function(output: Dict[str, Any]) -> float:
# Implement your loss computation here
# For example, you might use the review scores
return 1.0 - output["review"].get("correctness", 0)
# Optimize the module
optimize(code_gen, loss_function, iterations=10)
# You can also create custom modules easily
class CustomModule(Module):
def __init__(self, llm: LLM):
super().__init__(llm)
self.op1 = Generate(llm, "Custom prompt 1")
self.op2 = Review(llm, ["custom_criteria"])
def forward(self, x: str) -> Dict[str, Any]:
intermediate = self.op1(x)
final = self.op2(intermediate)
return {"result": final}
custom_module = CustomModule(llm)
custom_result = custom_module("Custom input")
print(custom_result)

View file

@ -1,82 +0,0 @@
from metagpt import nn
import metagpt.functional as F
class Generate(nn.Module):
def __init__(self, model_name):
super(Generate, self).__init__()
self.model = nn.LLM(model_name)
def forward(self, prompt):
return self.model.generate(prompt)
class Review(nn.Module):
def __init__(self, criteria):
super(Review, self).__init__()
self.criteria = criteria
def forward(self, generated_code):
return F.analyze(generated_code, self.criteria)
class Revise(nn.Module):
def __init__(self, model_name):
super(Revise, self).__init__()
self.model = nn.LLM(model_name)
def forward(self, original_code, review_feedback):
prompt = f"Original code:\n{original_code}\n\nFeedback:\n{review_feedback}\n\nRevised code:"
return self.model.generate(prompt)
class Ensemble(nn.Module):
def __init__(self, strategy='majority_vote'):
super(Ensemble, self).__init__()
self.strategy = strategy
def forward(self, solutions):
return F.ensemble(solutions, strategy=self.strategy)
class LLMAgent(nn.Module):
def __init__(self, generate_model, review_criteria, revise_model):
super(LLMAgent, self).__init__()
self.generate = Generate(generate_model)
self.review = Review(review_criteria)
self.revise = Revise(revise_model)
self.ensemble = Ensemble()
def forward(self, problem_description, num_iterations=3):
solutions = []
for _ in range(num_iterations):
# 生成初始解决方案
initial_solution = self.generate(problem_description)
# 审查解决方案
review_feedback = self.review(initial_solution)
# 根据反馈修改解决方案
revised_solution = self.revise(initial_solution, review_feedback)
solutions.append(revised_solution)
# 整合多个解决方案
final_solution = self.ensemble(solutions)
return final_solution
# 示例使用
problem = """
Human: Write a function that takes a list of numbers and returns the sum of the numbers at even indices.
Function Signature:
def sum_even_indices(numbers: List[int]) -> int:
Example:
>>> sum_even_indices([1, 2, 3, 4, 5])
9 # 1 + 3 + 5 = 9
"""
agent = LLMAgent(
generate_model="gpt-3.5-turbo",
review_criteria=["correctness", "efficiency", "readability"],
revise_model="gpt-4"
)
solution = agent(problem)
print(solution)

View file

@ -1,37 +0,0 @@
# -*- coding: utf-8 -*-
# @Date : 6/26/2024 17:07 PM
# @Author : didi
# @Desc : graph demo of ags
from examples.ags.demo.operator import Generate, GenerateCode, Review, Revise, Ensemble, LLM
class Graph:
def __init__(self, name:str, llm:str) -> None:
self.name = name
self.model = llm # TODO 抽象一个逻辑用不同的model适配不同的算子
def __call__():
NotImplementedError("Subclasses must implement __call__ method")
class HumanEvalGraph(Graph):
def __init__(self, name:str, llm: str, criteria:str) -> None:
super().__init__(name, llm)
self.criteria = criteria # TODO 有位置参数的生成逻辑是基于算子的要求
self.generate_code = GenerateCode(llm=LLM(model=llm))
self.review = Review(llm=LLM(model=llm), criteria=criteria)
self.revise = Revise(llm=LLM(model=llm))
self.ensemble = Ensemble(llm=LLM(model=llm))
def __call__(self, problem):
# TODO 我先来实现一版不带Ensemble的版本
solution = self.generate_code(problem)
# review & revise loop
for _ in range(3):
review_feedback = self.review(problem, solution)
if review_feedback['result']:
break
solution = self.revise(solution, review_feedback['feedback'])
return solution

View file

@ -1,168 +0,0 @@
# 第一段代码是MedPrompt一种利用利用LLM产生多种答案然后进行洗牌投票来选出最优决策的方法
# 我需要你首先理解这个方法,然后将这个方法与我的代码结合起来
# 我的代码如下我们会接收到多个答案我需要你将这个答案利用MedPrompt的方法进行处理。
# 在我的代码中产生llm answer是用 await ActionNode.from_pydantic(ScEnsembleOp).fill(context=prompt, llm=self.llm) 实现的。
class ScEnsemble(Ensemble):
def __init__(self, name:str ="Ensembler", llm: LLM = LLM()):
super().__init__(name, llm)
async def __call__(self, solutions:List, problem_description):
solution_text = ""
for index, solution in enumerate(solutions):
solution_text += f"Solution{index}: {str(solution)}" + "\n"
prompt = ENSEMBLE_PROMPT.format(solutions=solution_text, problem_description=problem_description)
node = await ActionNode.from_pydantic(ScEnsembleOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
return response
class Medprompt(QASystem):
def __init__(
self,
agents: list,
num_reasoning_steps: int,
debate_prompts: dict,
verbose: bool = False,
name: Optional[str] = None,
mock: bool = False, # Unused
agent_prompts: Optional[dict] = None, # Unused
):
super().__init__(verbose=verbose)
assert len(agents) == 1
self._num_reasoning_steps = num_reasoning_steps
self._agent = agents[0]
self._agent_names = [type(agent).__name__ for agent in agents]
self.prompts = debate_prompts
"""
This is an implementation of the Medprompt system take
from https://arxiv.org/abs/2311.16452
The system is comprised of a single agent prompted to provide multiple
answers and explainations via temperature sampling and question shuffling.
The final answer is determined by taking the most frequent answer provided
by the agent during the aggregation.
IMPORTANT: The current implementation only contains the first three steps
of the Medprompt setup. Therefore additional improvements can be made
by including the kNN and Ensemble with choice shuffling as well.
"""
# Setup debate metrics
def metrics(
self, info: Dict[str, Any], format_solution_fn: Callable, solution: str
) -> Dict[str, Any]:
return construct_agent_metrics(
info=info,
format_solution_fn=format_solution_fn,
solution=solution,
verbose=self._verbose,
agents=["Agent_0"],
agent_names=self._agent_names,
num_rounds=self._num_reasoning_steps,
)
@staticmethod
def shuffle_answers(question: str) -> Tuple[str, Any]:
"""
Takes in a multiple choice question string and shuffles only the answer texts,
keeping the answer labels (A, B, C, etc.) intact.
Also returns a mapping of shuffled choices to original choices.
"""
# Find the start of the answer section (e.g., '\nA:')
answer_section_start = re.search(r"\n[A-Z]:", question).start() # type: ignore
# Split the question from the answers
main_question = question[:answer_section_start]
answers = question[answer_section_start + 1 :].split("\n")
# Filter out answers that are not in the correct format
# answers = [answer for answer in answers if ": " == answer[1:3]]
# Extract answer texts
answer_texts = [answer.split(": ", 1)[1] for answer in answers]
# assert len(answer_texts) > 0
# Shuffle the answer texts and create a mapping to original answers
shuffled_texts = answer_texts.copy()
random.shuffle(shuffled_texts)
answer_mapping = {
chr(65 + i): answers[answer_texts.index(text)][0]
for i, text in enumerate(shuffled_texts)
}
# Reassemble the shuffled answers with original labels
shuffled_answers = [
f"{chr(65 + i)}: {text}" for i, text in enumerate(shuffled_texts)
]
# Reassemble the question
shuffled_question = main_question + "\n" + "\n".join(shuffled_answers)
return shuffled_question, answer_mapping
def answer(
self,
question: str,
) -> Tuple[str, Any]:
agent_answers: Any = {"Agent_0": {}}
agent_info: Any = {"Agent_0": {}}
agent_responses: Any = {"Agent_0": {}}
if self._verbose:
print("#######################")
print("REASONING STEP")
print("#######################")
message_history: List[Dict[str, str]] = []
for i in range(self._num_reasoning_steps):
try:
# TODO: Provide the options to the system as well. This would
# make it much easier to shuffle the answers. Furthermore, remove
# all questions without options in load_datasets.py.
shuffled_question, answer_mapping = self.shuffle_answers(question)
except Exception as e:
shuffled_question = question
answer_mapping = {"A": "A", "B": "B", "C": "C", "D": "D", "E": "E"}
print("question: ", question)
print("Shuffling failed, using original question: ", e)
answer, info = self._agent.answer(
question=shuffled_question,
system_message=self.prompts["system"],
)
# Dummy data to check the suffler.
# answer = "A"
# info = {"prompt_tokens": 1234, "response_tokens": 1234,
# "response": "I don't know, A.",
# "cost": 0.0, "num_messages_removed": 0.0,
# "answer_duration": 1.0, "engine": "Diesel"}
# Map the answer back to the original answer
if answer in answer_mapping:
answer = answer_mapping[answer]
message_history.append(
{"agent_name": f"Reasoning_{i}", "content": info["response"]}
)
agent_answers["Agent_0"][f"Reasoning_{i}"] = answer
agent_responses["Agent_0"][f"Reasoning_{i}"] = info["response"]
agent_info["Agent_0"][f"Reasoning_{i}"] = info
final_answers = [
agent_answers["Agent_0"][f"Reasoning_{i}"]
for i in range(self._num_reasoning_steps)
]
answer, _ = most_frequent(final_answers)
return answer, {
"response": agent_responses,
"agent_answers": agent_answers,
"agent_info": agent_info,
}

View file

@ -1,109 +0,0 @@
# -*- coding: utf-8 -*-
# @Date : 6/26/2024 17:07 PM
# @Author : didi
# @Desc : operator demo of ags
import json
from openai import OpenAI
from examples.ags.demo.prompt import GENERATE_PROMPT, GENERATE_CODE_PROMPT, REVIEW_PROMPT, REVISE_PROMPT, ENSEMBLE_PROMPT
class LLM():
def __init__(self, model:str='gpt-4-turbo', timeout:int=60):
self.model = model
self.timeout = timeout
self.api_key = ''
self.base_url = ''
self.client = OpenAI(api_key=self.api_key, base_url=self.base_url)
self.system_prompt = None
def ask(self, text: str, json_mode: bool = False, temperature: float = 0.7, retries: int = 5):
response_type = "text" if not json_mode else "json_object"
messages = [{"role": "user", "content": text}] if self.system_prompt == None else [
{"role": "system", "content": self.system_prompt}, {"role": "user", "content": text}]
for i in range(retries):
try:
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=temperature,
response_format={"type": response_type}
)
if json_mode:
result = response.choices[0].message.content
result = json.loads(result)
else:
result = response.choices[0].message.content
print(result)
return result
except Exception as e:
print(f"{__name__} occurs: {e}")
class Operator:
def __init__(self, name, llm:LLM=None):
self.name = name
self.llm = llm
def __call__(self, *args, **kwargs):
raise NotImplementedError
class Generate(Operator):
"""
Generate code & Generate text 应该被分开
"""
def __init__(self, name:str ="Generator", llm: LLM = LLM()):
super().__init__(name, llm)
def __call__(self, problem_description):
prompt = GENERATE_PROMPT.format(problem_description=problem_description)
response = self.llm.ask(prompt, json_mode=True)
return {"solution": response.get("solution")}
class GenerateCode(Operator):
def __init__(self, name:str ="Coder", llm: LLM = LLM()):
super().__init__(name, llm)
def __call__(self, problem_description):
prompt = GENERATE_CODE_PROMPT.format(problem_description=problem_description)
response = self.llm.ask(prompt, json_mode=True)
return {"code": response.get("code")}
class Review(Operator):
def __init__(self, criteria, name:str ="Reviewer", llm: LLM = LLM()):
self.criteria = criteria
super().__init__(name, llm)
# TODO 有点搞笑我忘记加上criteria了
def __call__(self, problem_description, solution):
prompt = REVIEW_PROMPT.format(problem_description=problem_description, solution=solution)
response = self.llm.ask(prompt, json_mode=True)
if response.get("result") == True:
return {"result": True}
else:
return {"result":False, "feedback":response.get('feedback')}
class Revise(Operator):
def __init__(self, name:str ="Reviser", llm: LLM = LLM()):
super().__init__(name, llm)
def __call__(self, problem_description, solution, feedback):
prompt = REVISE_PROMPT.format(problem_description=problem_description, solution=solution, feedback=feedback)
response = self.llm.ask(prompt, json_mode=True)
return {"revised_solution": response.get("revised_solution")}
class Ensemble(Operator):
def __init__(self, name:str ="Ensembler", llm: LLM = LLM()):
super().__init__(name, llm)
def __call__(self, *args, problem_description):
solutions = ""
for solution in args:
solutions += solution + "\n"
prompt = ENSEMBLE_PROMPT.format(solutions=solutions, problem_description=problem_description)
response = self.llm.ask(prompt, json_mode=True)
return {"ensembled_solution": response.get("ensembled_solution")}

View file

@ -1,57 +0,0 @@
# -*- coding: utf-8 -*-
# @Date : 6/26/2024 17:07 PM
# @Author : didi
# @Desc : prompts of operators
GENERATE_PROMPT = """
Generate Solution for the following problem: {problem_description}
Please structure your response in JSON format as follows:
{{
"solution": "<your solution>"
}}
"""
GENERATE_CODE_PROMPT = """
Generate Code Solution for the following problem: {problem_description}
Please structure your response in JSON format as follows:
{{
"code": "<your code>"
}}
"""
REVIEW_PROMPT = """
For the question described as {problem_description},
please review the following solution: {solution}, and provide a review result in boolean format.
If you believe the solution is capable of resolving the issue, return True; otherwise, return False, and include your comments
Please structure your response in JSON format as follows:
{{
"result": <result>,
"comment": "<if result is ture, don't response this>"
}}
"""
REVISE_PROMPT = """
For the question described as {problem_description},
please evaluate and revise the solution provided: {solution}, taking into account the review comments: {comment}."
Then output the revised solution.
Please structure your response in JSON format as follows:
{{
"revised_solution": "<your revised solution>"
}}
"""
ENSEMBLE_PROMPT = """
For the question described as {problem_description},
please ensemble the following solutions: {solutions}, and provide an ensemble result.
Please structure your response in JSON format as follows:
{{
"ensembled_solution": "<your ensembled solution>"
}}
"""

View file

@ -3,26 +3,41 @@
# @Author : didi
# @Desc : graph & an instance - humanevalgraph
from metagpt.llm import LLM
from typing import List
from examples.ags.w_action_node.operator import Generate, GenerateCode, GenerateCodeBlock, Review, Revise, FuEnsemble, MdEnsemble, DbEnsemble, Rephrase, Test
from examples.ags.w_action_node.utils import extract_test_cases_from_jsonl
from evalplus.data import get_human_eval_plus
from examples.ags.w_action_node.operator import (
FuEnsemble,
Generate,
GenerateCode,
GenerateCodeBlock,
MdEnsemble,
Rephrase,
Review,
Revise,
Test,
)
from examples.ags.w_action_node.utils import extract_test_cases_from_jsonl
from metagpt.llm import LLM
class Graph:
def __init__(self, name:str, llm:LLM) -> None:
def __init__(self, name: str, llm: LLM) -> None:
self.name = name
self.model = llm
self.model = llm
def __call__():
NotImplementedError("Subclasses must implement __call__ method")
def optimize(dataset:List):
def optimize(dataset: List):
pass
class HumanEvalGraph(Graph):
def __init__(self, name:str, llm: LLM, criteria:str, vote_count:int =5) -> None:
def __init__(self, name: str, llm: LLM, criteria: str, vote_count: int = 5) -> None:
super().__init__(name, llm)
self.criteria = criteria # TODO 自动构建图时,图的初始参数与图所使用的算子要求的外部参数相匹配
self.criteria = criteria # TODO 自动构建图时,图的初始参数与图所使用的算子要求的外部参数相匹配
self.generate_code = GenerateCode(llm=llm)
self.generate_code_block = GenerateCodeBlock(llm=llm)
self.review = Review(llm=llm, criteria=criteria)
@ -32,82 +47,82 @@ class HumanEvalGraph(Graph):
self.fuensemble = FuEnsemble(llm=llm)
self.mdensemble = MdEnsemble(llm=llm, vote_count=vote_count)
async def __call__(self, problem:str, ensemble_count:int = 3):
async def __call__(self, problem: str, ensemble_count: int = 3):
solution_list = []
for _ in range(ensemble_count):
for retry_count in range(5):
try:
# solution = await self.generate_code(problem)
solution = await self.generate_code_block(problem)
solution = solution.get('code_solution')
solution_list.append(solution)
break
except Exception as e:
print(e)
solution = await self.generate_code_block(problem)
solution = solution.get("code_solution")
solution_list.append(solution)
solution = await self.mdensemble("code", solution_list, problem)
return solution
async def alpha_codium(self, problem_id:str, problem:str, ensemble_count:int = 3):
# async def __call__(self,problem_id, problem:str, ensemble_count:int = 3):
async def alpha_codium(self, problem_id: str, problem: str, ensemble_count: int = 3):
"""
Paper: Code Generation with AlphaCodium: From Prompt Engineering to Flow Engineering
Link: https://arxiv.org/abs/2404.14963
Flow: An incomplete version of alpha codium, implementing the basic process of rephrase -> code ensemble -> tes
"""
test_cases = extract_test_cases_from_jsonl(problem_id)
entry_point = get_human_eval_plus()[problem_id]['entry_point']
rephrase_problem = await self.rephrase(problem) # 在rephrase 中拼接原始的问题描述
entry_point = get_human_eval_plus()[problem_id]["entry_point"]
rephrase_problem = await self.rephrase(problem) # 在rephrase 中拼接原始的问题描述
solution_list = []
for _ in range(ensemble_count):
for retry_count in range(5):
try:
solution = await self.generate_code_block.rephrase_generate(problem, rephrase_problem, function_name=entry_point)
solution = solution.get('code_solution')
solution_list.append(solution)
break
except Exception as e:
print(e)
solution = await self.generate_code_block.rephrase_generate(
problem, rephrase_problem, function_name=entry_point
)
solution = solution.get("code_solution")
solution_list.append(solution)
solution = await self.mdensemble("code", solution_list, problem)
solution = await self.tester(problem_id, problem, rephrase_problem, solution, test_cases)
return solution
async def review_revise_ensemble(self, problem:str, ensemble_count:int = 2):
async def review_revise_ensemble(self, problem: str, ensemble_count: int = 2, revise_round: int = 3):
solution_list = []
for _ in range(ensemble_count):
solution = await self.single_solve(problem, 3)
solution = await self.single_solve(problem, revise_round)
solution_list.append(solution)
solution = await self.ensemble(solution_list, problem)
return solution
async def simple_ensemble(self, problem:str, ensemble_count:int = 3):
# async def __call__(self, problem:str, ensemble_count:int = 3):
async def simple_ensemble(self, problem: str, ensemble_count: int = 3):
solution_list = []
for _ in range(ensemble_count):
solution = await self.generate_code(problem)
# solution = await self.generate_code_block(problem)
solution = solution.get('code_solution')
solution = solution.get("code_solution")
solution_list.append(solution)
solution = await self.fuensemble(solution_list, problem)
return solution
async def single_solve(self, problem:str, max_loop:int):
async def single_solve(self, problem: str, max_loop: int):
solution = await self.generate_code(problem)
solution = solution.get('code_solution')
solution = solution.get("code_solution")
for _ in range(max_loop):
review_feedback = await self.review(problem, solution)
if review_feedback['review_result']:
if review_feedback["review_result"]:
break
solution = await self.revise(problem, solution, review_feedback['feedback'])
solution = solution.get('revised_solution')
solution = await self.revise(problem, solution, review_feedback["feedback"])
solution = solution.get("revised_solution")
return solution
class Gsm8kGraph(Graph):
def __init__(self, name:str, llm: LLM) -> None:
def __init__(self, name: str, llm: LLM) -> None:
super().__init__(name, llm)
self.generate = Generate(llm=llm)
self.rephrase = Rephrase(llm=llm)
async def __call__(self, problem:str):
async def __call__(self, problem: str):
solution = self.generate(problem)
return solution
class HotpotQAGraph(Graph):
def __init__(self, name: str, llm: LLM) -> None:
super().__init__(name, llm)
self.generate = Generate(llm=llm)
self.rephrase = Rephrase(llm=llm)
async def __call__(self, problem: str):
solution = self.generate(problem)
return solution
# async def __call__(self, problem:str):
# 这个地方没有修改对应的prompt可以对应着humaneval改一下
# problem = await self.rephrase(problem)
# solution = self.generate(problem)
# return solution

View file

@ -3,30 +3,60 @@
# @Author : didi
# @Desc : operator demo of ags
import ast
import random
import sys
import traceback
import random
from typing import List, Tuple, Any, Dict
from collections import Counter
from typing import Dict, List, Tuple
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM
from tenacity import retry, stop_after_attempt
from examples.ags.w_action_node.operator_an import GenerateOp, GenerateCodeOp, GenerateCodeBlockOp ,ReviewOp, ReviseOp, FuEnsembleOp, MdEnsembleOp, ReflectionTestOp, RephraseOp
from examples.ags.w_action_node.prompt import GENERATE_PROMPT, GENERATE_CODE_PROMPT, GENERATE_CODEBLOCK_PROMPT, REVIEW_PROMPT, REVISE_PROMPT, FU_ENSEMBLE_PROMPT, MD_ENSEMBLE_PROMPT, REFLECTION_ON_PUBILIC_TEST_PROMPT, REPHRASE_ON_PROBLEM_PROMPT, GENERATE_CODEBLOCK_REPHRASE_PROMPT
from examples.ags.w_action_node.prompt import DE_ENSEMBLE_CODE_FORMAT_PROMPT, DE_ENSEMBLE_TXT_FORMAT_PROMPT, DE_ENSEMBLE_ANGEL_PROMPT, DE_ENSEMBLE_DEVIL_PROMPT, DE_ENSEMBLE_JUDGE_UNIVERSAL_PROMPT, DE_ENSEMBLE_JUDGE_FINAL_PROMPT
from examples.ags.w_action_node.operator_an import (
FuEnsembleOp,
GenerateCodeBlockOp,
GenerateCodeOp,
GenerateOp,
MdEnsembleOp,
ReflectionTestOp,
RephraseOp,
ReviewOp,
ReviseOp,
)
from examples.ags.w_action_node.prompt import (
DE_ENSEMBLE_ANGEL_PROMPT,
DE_ENSEMBLE_CODE_FORMAT_PROMPT,
DE_ENSEMBLE_DEVIL_PROMPT,
DE_ENSEMBLE_JUDGE_FINAL_PROMPT,
DE_ENSEMBLE_JUDGE_UNIVERSAL_PROMPT,
DE_ENSEMBLE_TXT_FORMAT_PROMPT,
FU_ENSEMBLE_PROMPT,
GENERATE_CODE_PROMPT,
GENERATE_CODEBLOCK_PROMPT,
GENERATE_CODEBLOCK_REPHRASE_PROMPT,
GENERATE_PROMPT,
MD_ENSEMBLE_PROMPT,
REFLECTION_ON_PUBLIC_TEST_PROMPT,
REPHRASE_ON_PROBLEM_PROMPT,
REVIEW_PROMPT,
REVISE_PROMPT,
)
from examples.ags.w_action_node.utils import test_cases_2_test_functions
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM
from metagpt.logs import logger
class Operator:
def __init__(self, name, llm:LLM):
def __init__(self, name, llm: LLM):
self.name = name
self.llm = llm
def __call__(self, *args, **kwargs):
raise NotImplementedError
class Generate(Operator):
def __init__(self, name:str ="Generator", llm: LLM = LLM()):
def __init__(self, name: str = "Generate", llm: LLM = LLM()):
super().__init__(name, llm)
async def __call__(self, problem_description):
@ -34,10 +64,10 @@ class Generate(Operator):
node = await ActionNode.from_pydantic(GenerateOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
return response
class GenerateCode(Operator):
def __init__(self, name:str ="Coder", llm: LLM = LLM()):
class GenerateCode(Operator):
def __init__(self, name: str = "GenerateCode", llm: LLM = LLM()):
super().__init__(name, llm)
async def __call__(self, problem_description):
@ -45,39 +75,49 @@ class GenerateCode(Operator):
node = await ActionNode.from_pydantic(GenerateCodeOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
return response
class GenerateCodeBlock(Operator):
def __init__(self, name:str ="Coder", llm: LLM = LLM()):
class GenerateCodeBlock(Operator):
def __init__(self, name: str = "GenerateCodeBlock", llm: LLM = LLM()):
super().__init__(name, llm)
@retry(stop=stop_after_attempt(3))
async def __call__(self, problem_description, function_name):
prompt = GENERATE_CODEBLOCK_PROMPT.format(problem_description=problem_description)
node = await ActionNode.from_pydantic(GenerateCodeBlockOp).fill(context=prompt, llm=self.llm, mode='code_fill',function_name=function_name)
node = await ActionNode.from_pydantic(GenerateCodeBlockOp).fill(
context=prompt, llm=self.llm, mode="code_fill", function_name=function_name
)
response = node.instruct_content.model_dump()
return response
@retry(stop=stop_after_attempt(3))
async def rephrase_generate(self, problem_description, rephrase_problem, function_name):
prompt = GENERATE_CODEBLOCK_REPHRASE_PROMPT.format(problem_description=problem_description,rephrase_problem=rephrase_problem)
node = await ActionNode.from_pydantic(GenerateCodeBlockOp).fill(context=prompt, llm=self.llm, mode='code_fill', function_name=function_name)
prompt = GENERATE_CODEBLOCK_REPHRASE_PROMPT.format(
problem_description=problem_description, rephrase_problem=rephrase_problem
)
node = await ActionNode.from_pydantic(GenerateCodeBlockOp).fill(
context=prompt, llm=self.llm, mode="code_fill", function_name=function_name
)
response = node.instruct_content.model_dump()
return response
class Review(Operator):
def __init__(self, criteria, name:str ="Reviewer", llm: LLM = LLM()):
def __init__(self, criteria, name: str = "Review", llm: LLM = LLM()):
self.criteria = criteria
super().__init__(name, llm)
async def __call__(self, problem_description, solution):
prompt = REVIEW_PROMPT.format(problem_description=problem_description, solution=solution, criteria=self.criteria)
prompt = REVIEW_PROMPT.format(
problem_description=problem_description, solution=solution, criteria=self.criteria
)
node = await ActionNode.from_pydantic(ReviewOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
return response
class Revise(Operator):
def __init__(self, name:str ="Reviser", llm: LLM = LLM()):
class Revise(Operator):
def __init__(self, name: str = "Revise", llm: LLM = LLM()):
super().__init__(name, llm)
async def __call__(self, problem_description, solution, feedback):
@ -86,12 +126,16 @@ class Revise(Operator):
response = node.instruct_content.model_dump()
return response
class FuEnsemble(Operator):
def __init__(self, name:str ="FuseEnsembler", llm: LLM = LLM()):
class FuEnsemble(Operator):
"""
Function: Critically evaluating multiple solution candidates, synthesizing their strengths, and developing an enhanced, integrated solution.
"""
def __init__(self, name: str = "FuEnsemble", llm: LLM = LLM()):
super().__init__(name, llm)
async def __call__(self, solutions:List, problem_description):
async def __call__(self, solutions: List, problem_description):
solution_text = ""
for solution in solutions:
solution_text += str(solution) + "\n"
@ -99,16 +143,18 @@ class FuEnsemble(Operator):
node = await ActionNode.from_pydantic(FuEnsembleOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
return response
class MdEnsemble(Operator):
"""
MedPrompt
Paper: Can Generalist Foundation Models Outcompete Special-Purpose Tuning? Case Study in Medicine
Link: https://arxiv.org/abs/2311.16452
"""
def __init__(self, name:str ="MedEnsembler", llm: LLM = LLM(), vote_count:int=3):
def __init__(self, name: str = "MdEnsemble", llm: LLM = LLM(), vote_count: int = 3):
super().__init__(name, llm)
self.vote_count = vote_count
@staticmethod
def shuffle_answers(solutions: List[str]) -> Tuple[List[str], Dict[str, str]]:
shuffled_solutions = solutions.copy()
@ -116,12 +162,10 @@ class MdEnsemble(Operator):
answer_mapping = {chr(65 + i): solutions.index(solution) for i, solution in enumerate(shuffled_solutions)}
return shuffled_solutions, answer_mapping
async def __call__(self, solution_type:str ,solutions:List[str], problem_description:str):
print(solutions)
async def __call__(self, solution_type: str, solutions: List[str], problem_description: str):
all_responses = []
# 如果Solution方案是Code我们利用AST去重
# 当Ensmeble方案是Code类型时我们使用AST进行去重
if solution_type == "code":
original_length = len(solutions)
unique_structures = {}
updated_solutions = []
@ -129,72 +173,63 @@ class MdEnsemble(Operator):
try:
tree = ast.parse(solution)
structure_key = ast.dump(tree, annotate_fields=False, include_attributes=False)
if structure_key not in unique_structures:
unique_structures[structure_key] = solution
updated_solutions.append(solution)
except SyntaxError:
# If the solution has a syntax error, we'll skip it
print("here",solution)
continue
solutions = updated_solutions
updated_length = len(solutions)
# print(f"Original number of solutions: {original_length}")
# print(f"Updated number of solutions: {updated_length}")
if updated_length == 1:
return {"final_solution": solutions[0]}
for _ in range(self.vote_count):
shuffled_solutions, answer_mapping = self.shuffle_answers(solutions)
solution_text = ""
for index, solution in enumerate(shuffled_solutions):
solution_text += f"{chr(65 + index)}: \n{str(solution)}\n\n\n"
prompt = MD_ENSEMBLE_PROMPT.format(solutions=solution_text, problem_description=problem_description)
node = await ActionNode.from_pydantic(MdEnsembleOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
answer = response.get('solution_letter', '')
answer = response.get("solution_letter", "")
answer = answer.strip().upper()
if answer in answer_mapping:
original_index = answer_mapping[answer]
print(f"original index: {original_index}")
# print(f"original index: {original_index}")
all_responses.append(original_index)
most_frequent_index = Counter(all_responses).most_common(1)[0][0]
print(f"most frequent_index: {most_frequent_index}")
final_answer = solutions[most_frequent_index]
print(f"final answer: \n{final_answer}")
# final_answer, frequency = self.most_frequent(all_responses)
return {"final_solution": final_answer}
class ScEnsemble(Operator):
"""
self consistency ensemble
Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
Link: https://arxiv.org/abs/2203.11171
"""
# ScEnsemble 的构建相对好做一点 30分钟左右
pass
class DbEnsemble(Operator):
class MADEnsemble(Operator):
"""
(Should we be going MAD? A Look at Multi-Agent Debate Strategies for LLMs)
The system is a multi-round debate system where each agent is given the
question and responses generated by all agents. For each round, a judge
analyzes the responses provided determines whether to terminate the
debate or keep going. At the end of the debate the judge is also responsible
for determining the final answer.
Paper: Should we be going MAD? A Look at Multi-Agent Debate Strategies for LLMs
Link: https://arxiv.org/abs/2311.17371
"""
def __init__(self, name:str ="DebateEnsemble", llm: LLM = LLM()):
def __init__(self, name: str = "DebateEnsemble", llm: LLM = LLM()):
super().__init__(name, llm)
self.agents = ["angel","devil","judge"]
self.format_requirements = {
"txt":DE_ENSEMBLE_TXT_FORMAT_PROMPT,
"code":DE_ENSEMBLE_CODE_FORMAT_PROMPT
}
def get_system_prompt(self, name:str, mode:str='txt'):
self.agents = ["angel", "devil", "judge"]
self.format_requirements = {"txt": DE_ENSEMBLE_TXT_FORMAT_PROMPT, "code": DE_ENSEMBLE_CODE_FORMAT_PROMPT}
def get_system_prompt(self, name: str, mode: str = "txt"):
if name == "angel":
if mode == "code":
return DE_ENSEMBLE_ANGEL_PROMPT + "\n" + DE_ENSEMBLE_CODE_FORMAT_PROMPT
@ -205,10 +240,10 @@ class DbEnsemble(Operator):
return DE_ENSEMBLE_DEVIL_PROMPT + "\n" + DE_ENSEMBLE_TXT_FORMAT_PROMPT
elif name == "judge":
if mode == "final":
return DE_ENSEMBLE_JUDGE_FINAL_PROMPT
return DE_ENSEMBLE_JUDGE_FINAL_PROMPT
return DE_ENSEMBLE_JUDGE_UNIVERSAL_PROMPT
def construct_messages(self, message_history_with_name, name, mode:str="txt", phase:str="universal"):
def construct_messages(self, message_history_with_name, name, mode: str = "txt", phase: str = "universal"):
"""
基于name与mode来构建system message.
基于name来构建messages
@ -221,67 +256,63 @@ class DbEnsemble(Operator):
elif name == "judge":
messages = self._construct_judge(message_history_with_name, mode, messages)
return messages
def _construct_debate(self, message_history_with_name, name, messages):
user_message = ""
for message in message_history_with_name:
if message["name"] == "Judge":
continue
elif message["name"] == name:
if user_message:
messages.append({
"role": "user",
"name": "user",
"content": user_message.strip("\n"),
})
messages.append({
"role": "assistant",
"name": name,
"content": message["content"],
})
messages.append(
{
"role": "user",
"name": "user",
"content": user_message.strip("\n"),
}
)
messages.append(
{
"role": "assistant",
"name": name,
"content": message["content"],
}
)
user_message = ""
else:
user_message += message["content"]
if user_message:
messages.append({
"role": "user",
"name": "user",
"content": user_message.strip("\n"),
})
messages.append(
{
"role": "user",
"name": "user",
"content": user_message.strip("\n"),
}
)
return messages
def _construct_judge(self, message_history_with_name, mode, messages):
pass
async def debate_answer(self, message_history:List, role:str="angel"):
async def debate_answer(self, message_history: List, role: str = "angel"):
messages = self.construct_messages(message_history, role)
response = await self.llm.acompletion_text(messages=messages)
message_history.append({
"role":"user",
"name":role,
"content":response}
)
message_history.append({"role": "user", "name": role, "content": response})
return message_history, response
async def judge_answer(self, message_history:List, phase:str="universal"):
async def judge_answer(self, message_history: List, phase: str = "universal"):
messages = self.construct_messages(message_history, "judge", phase=phase)
response = await self.llm.acompletion_text(messages=messages)
message_history.append({
"role": "user",
"name": "judge",
"content": response}
)
message_history.append({"role": "user", "name": "judge", "content": response})
return message_history, response
async def __call__(self, origin_solution:str, problem_description:str, max_round:int = 3, mode:str='txt'):
async def __call__(self, origin_solution: str, problem_description: str, max_round: int = 3, mode: str = "txt"):
# 思路输入一个原始答案构建一个agent代表这个答案进行辩论另一个agentdevil使用debate llm的内容进行辩论法官在每一轮次做出决定是否终止到了maxround还没终止就由法官进行总结。
message_history_with_name = [
{"role":"user", "name":"angel", "content":origin_solution}
]
message_history_with_name = [{"role": "user", "name": "angel", "content": origin_solution}]
for index in range(max_round):
for agent in self.agents:
if agent == "angel":
@ -291,89 +322,108 @@ class DbEnsemble(Operator):
elif agent == "devil":
message_history_with_name, rsp = self.debate_answer(message_history_with_name, role="devil")
elif agent == "judge":
message_history_with_name, judge_result = self.judge_answer(message_history_with_name, phase="universal")
message_history_with_name, judge_result = self.judge_answer(
message_history_with_name, phase="universal"
)
if not judge_result["is_debating"]:
"""
这里需要在 self.judge_answer 中设置一个自动给出solution的地方
"""
return {"final_solution":judge_result["final_solution"]}
message_history_with_name.pop(-1)
message_history_with_name, judge_answer = self.judge_answer(message_history_with_name, phase="final")
return {"final_solution": judge_result["final_solution"]}
message_history_with_name.pop(-1)
message_history_with_name, judge_answer = self.judge_answer(message_history_with_name, phase="final")
return {"final_solution": judge_answer["debate_answer"]}
return {"final_solution":judge_answer["debate_answer"]}
class Rephrase(Operator):
"""
1. AlphaCodium
2. https://arxiv.org/abs/2404.14963
Paper: Code Generation with AlphaCodium: From Prompt Engineering to Flow Engineering
Link: https://arxiv.org/abs/2404.14963
Paper: Achieving >97% on GSM8K: Deeply Understanding the Problems Makes LLMs Better Solvers for Math Word Problems
Link: https://arxiv.org/abs/2404.14963
"""
def __init__(self, name:str ="Rephraser", llm: LLM = LLM()):
def __init__(self, name: str = "Rephrase", llm: LLM = LLM()):
super().__init__(name, llm)
async def __call__(self, problem_description:str)->str:
async def __call__(self, problem_description: str) -> str:
prompt = REPHRASE_ON_PROBLEM_PROMPT.format(problem_description=problem_description)
node = await ActionNode.from_pydantic(RephraseOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
return response["rephrased_problem"]
class Test(Operator):
def __init__(self, name:str ="Tester", llm: LLM = LLM()):
def __init__(self, name: str = "Test", llm: LLM = LLM()):
super().__init__(name, llm)
def exec_code(self, solution, test_cases, problem_id):
# TODO 未来还要做修改,最好能做到一个样例一测
# TODO
# 1. 获取更加详细的Test error信息
# 2. 更换Public Test数据集当前使用的数据存在Label Leak(使用的Reflexion的数据集) -> 这个问题使用LLM抽取解决直接生成为assert代码串
# 3. 实现单独测试每一个test case -> 1
solution = solution["final_solution"]
test_code = test_cases_2_test_functions(solution, test_cases)
print("test_code", test_code)
try:
exec(test_code, globals())
except AssertionError as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
tb_str = traceback.format_exception(exc_type, exc_value, exc_traceback)
with open("tester.txt", "a") as f:
f.write("test_error" +problem_id + "\n")
error_infomation = {"test_fail_case": {
"error_type": "AssertionError",
"error_message": str(e),
"traceback": tb_str
}}
print("error here", error_infomation)
f.write("test_error" + problem_id + "\n")
error_infomation = {
"test_fail_case": {"error_type": "AssertionError", "error_message": str(e), "traceback": tb_str}
}
logger.info(f"test error: {error_infomation}")
return error_infomation
except Exception as e:
with open("tester.txt", "a") as f:
f.write(problem_id + "\n")
return {"exec_fail_case":str(e)}
return {"exec_fail_case": str(e)}
return []
async def __call__(self, problem_id, problem, rephrase_problem, solution, test_cases):
result = self.exec_code(solution, test_cases, problem_id)
print("result here", result)
if result == []:
return solution
# 处理代码执行失败的代码
elif "exec_fail_case" in result:
result = result["exec_fail_case"]
prompt = REFLECTION_ON_PUBILIC_TEST_PROMPT.format(problem_description=problem, rephrase_problem=rephrase_problem, code_solution=solution, exec_pass=f"executed unsuccessfully, error: \n {result}", test_fail="executed unsucessfully")
prompt = REFLECTION_ON_PUBLIC_TEST_PROMPT.format(
problem_description=problem,
rephrase_problem=rephrase_problem,
code_solution=solution,
exec_pass=f"executed unsuccessfully, error: \n {result}",
test_fail="executed unsucessfully",
)
node = await ActionNode.from_pydantic(ReflectionTestOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
return {"final_solution":response["refined_solution"]}
return {"final_solution": response["refined_solution"]}
else:
result = result["test_fail_case"]
prompt = REFLECTION_ON_PUBILIC_TEST_PROMPT.format(problem_description=problem, rephrase_problem=rephrase_problem, code_solution=solution, exec_pass="executed successfully", test_fail=result)
prompt = REFLECTION_ON_PUBLIC_TEST_PROMPT.format(
problem_description=problem,
rephrase_problem=rephrase_problem,
code_solution=solution,
exec_pass="executed successfully",
test_fail=result,
)
node = await ActionNode.from_pydantic(ReflectionTestOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
return {"final_solution":response["refined_solution"]}
return {"final_solution": response["refined_solution"]}
class FindFact(Operator):
pass
def __init__(self, name: str = "FindFact", llm: LLM = LLM()):
super().__init__(name, llm)
class SelfAsk(Operator):
pass
def __init__(self, name: str = "SelfAsk", llm: LLM = LLM()):
super().__init__(name, llm)
class Verify(Operator):
"""
? 还没有想好
"""
pass
def __init__(self, name: str = "Verify", llm: LLM = LLM()):
super().__init__(name, llm)

View file

@ -5,26 +5,42 @@
from pydantic import BaseModel, Field
class GenerateOp(BaseModel):
solution: str = Field(default="", description="Your Solution for this problem")
class GenerateCodeOp(BaseModel):
code_solution: str = Field(default="", description="Complete and correct code here.")
class GenerateCodeBlockOp(BaseModel):
code_solution: str = Field(default="", description="Your complete code solution for this problem")
class ReviewOp(BaseModel):
review_result: bool = Field(default=False, description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'")
feedback: str = Field(default="", description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.")
review_result: bool = Field(
default=False,
description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'",
)
feedback: str = Field(
default="",
description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.",
)
class ReviseOp(BaseModel):
revised_solution: str = Field(default="", description="Based on the feedback, revised solution for this problem")
class FuEnsembleOp(BaseModel):
thought: str = Field(default="", description="Analyze the solutions and think how to combine the advantages of various solutions to form the best possible solution.")
thought: str = Field(
default="",
description="Analyze the solutions and think how to combine the advantages of various solutions to form the best possible solution.",
)
final_solution: str = Field(default="", description="Output the final solution after analysis and integration")
class MdEnsembleOp(BaseModel):
thought: str = Field(
default="""Example thought process:
@ -35,22 +51,30 @@ class MdEnsembleOp(BaseModel):
5. The use of 'isinstance' for type checking is a good practice.
6. The function handles decimal separators well by replacing ',' with '.'.
Overall, this solution effectively solves the problem of comparing two values, with good error handling and flexibility. It could be improved by specifying behavior for equal values, but it's a strong solution as is.""",
description="Step-by-step analysis of the solutions to determine the best one."
)
solution_letter: str = Field(
default="",
description="The letter of the chosen best solution (only one letter)."
description="Step-by-step analysis of the solutions to determine the best one.",
)
solution_letter: str = Field(default="", description="The letter of the chosen best solution (only one letter).")
class TestCaseExtractOp(BaseModel):
test_cases: list = Field(default=[('<function name>', [5, 8, 7, 1], 12), ('<function name>', [3, 3, 3, 3, 3], 9)],
description="Extracted test cases from the problem description")
test_cases: list = Field(
default=[
"assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True",
"assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.05) == False",
"",
],
description="Extracted test cases from the problem description",
)
class RephraseOp(BaseModel):
rephrased_problem: str = Field(default="", description="Rephrased problem description for this problem")
class ReflectionTestOp(BaseModel):
reflection: str = Field(default="", description="对关于代码执行错误或者测试用例失败step by step的思考")
refined_solution: str = Field(default="", description="对于代码执行错误或者测试用例失败的修正方案")
reflection: str = Field(
default="", description="Step-by-step reflection on code execution errors or test case failures"
)
refined_solution: str = Field(
default="", description="Corrective solution for code execution errors or test case failures"
)

View file

@ -7,45 +7,22 @@ GENERATE_PROMPT = """
Generate Solution for the following problem: {problem_description}
"""
# GENERATE_CODE_PROMPT = """
# Below is an instruction that describes a task, paired with an input that provides further context.
# Write a response that appropriately completes the request.
# ### Instruction:
# Write a program to perform the given task.
# Input:
# {problem_description}
# ### Response:
# """
GENERATE_CODE_PROMPT = """
You are an expert programmer tasked with solving a coding problem.
### Problem Description:
### Problem Description
{problem_description}
### Instructions:
### Instructions
The above is an incomplete Python code fragment. Return the complete and correct code with no additional text.
Please maintain the JSON format in your response.
### Your Response:
### Your Response
"""
# GENERATE_CODEBLOCK_PROMPT = """
# You are an expert programmer tasked with solving a coding problem.
# ### Problem Description:
# {problem_description}
# ### Instructions:
# The above is an incomplete Python code fragment. Return the complete and correct code with no additional text.
# """
GENERATE_CODEBLOCK_REPHRASE_PROMPT = """
Please provide a self-contained Python script that solves the following problem in a markdown code block:
### Problem Description:
### Problem Description
{problem_description}
### self reflection on the problem
@ -58,12 +35,7 @@ When creating your solution:
4. Avoid adding additional test cases beyond those provided in the problem description.
"""
# GENERATE_CODEBLOCK_PROMPT = """
# Please provide a self-contained Python script that solves the following problem in a markdown code block:
# {problem_description}
# """
GENERATE_CODEBLOCK_PROMPT ="""
GENERATE_CODEBLOCK_PROMPT = """
Please provide a self-contained Python script that solves the following problem in a markdown code block:
{problem_description}
@ -127,10 +99,10 @@ Please strictly output in JSON format, do not output irrelevant content. """
DE_ENSEMBLE_CODE_FORMAT_PROMPT = """
Now please output your answer in json format, with the format as follows:
{{
"reason":"<为什么要这样做>",
"code_solution":"<你觉得合适的solution用代码表示出来>"
}}
{
"reason":"<why do it this way>",
"code_solution":"<the solution you think is appropriate, expressed in code>"
}
Please strictly output in JSON format, do not output irrelevant content. """
DE_ENSEMBLE_ANGEL_PROMPT = """
@ -159,18 +131,6 @@ You, as the moderator, will evaluate both sides' answers and determine if there
Please strictly output in JSON format, do not output irrelevant content
"""
EXTRACT_CASE_PROMPT = """
You are given a coding problem, and you need to extract the test cases from the problem description.
{problem_description}
一个problem中会有多个测试用例每个测试用例包含三个部分
1. 函数名
2. 输入
3. 期望输出
每个测试用例包裹在一个三元组之中三元组之间用逗号分隔整体用列表包裹
由于结果需要被解析到JSON中True与False请表示为true, false;
"""
REPHRASE_ON_PROBLEM_PROMPT = """
You are given a code contest problem:
@ -183,26 +143,26 @@ Reflect on the problem, and describe it in your own words, in bullet points. Pay
"""
REFLECTION_ON_PUBILIC_TEST_PROMPT = """
REFLECTION_ON_PUBLIC_TEST_PROMPT = """
You are given a code contest problem, and a self-reflection on the problem:
### problem
{problem_description}
### self reflection on the problem
{rephrase_problem}
=======================
A Python code solution was generated for the problem:
### Code Solution
{code_solution}
=======================
This section of the code execution result is
### Execution Result
{exec_pass}
=======================
However, when running the following input example, the code solution above failed to produce the expected output:
#### Failed Test Case
{test_fail}
@ -210,4 +170,31 @@ However, when running the following input example, the code solution above faile
Your goal is to analyze the code solution and the error, and propose a fixed code which will produce the expected output for the provided test input.
The fixed code should keep the solution robust, and work for all other input examples as well.
Make sure the fixed code has a reasonable runtime - less than three seconds on a modern computer, given the problem constraints for large input.
"""
"""
EXTRACT_CASE_PROMPT = """
You are given a coding problem, and you need to extract the test cases from the problem description.
## Problem Description
{problem_description}
Your task is to extract test cases from the above description and convert them into Python assert statements (as strings). These statements should be returned in a list for testing purposes.
Example:
Input:
>>> has_close_elements([1.0, 2.0, 3.0], 0.5)
False
>>> has_close_elements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3)
True
Output:
[
"assert candidate([1.0, 2.0, 3.0], 0.5) == False",
"assert candidate([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3) == True"
]
Please ensure that:
1. Each test case is converted to a separate assert statement.
2. The function name in the original example (e.g., 'has_close_elements') is replaced with 'candidate'.
3. The assert statements are returned as strings in a list.
"""

View file

@ -3,67 +3,42 @@
# @Author : didi
# @Desc : utils for experiment
import ast
import json
import re
from typing import List, Dict, Any, Tuple
from metagpt.llm import LLM
from metagpt.actions.action_node import ActionNode
from typing import Any, List, Tuple
from examples.ags.w_action_node.operator_an import TestCaseExtractOp
from examples.ags.w_action_node.prompt import EXTRACT_CASE_PROMPT
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM
def extract_task_id(task_id: str) -> int:
"""Extract the numeric part of the task_id."""
match = re.search(r'/(\d+)', task_id)
match = re.search(r"/(\d+)", task_id)
return int(match.group(1)) if match else 0
def jsonl_ranker(input_file: str, output_file: str):
def sort_json_by_key(input_file: str, output_file: str, key: str = "task_id"):
"""
Read a JSONL file, sort the entries based on task_id, and write to a new JSONL file.
:param input_file: Path to the input JSONL file
:param output_file: Path to the output JSONL file
"""
# Read and parse the JSONL file
with open(input_file, 'r') as f:
with open(input_file, "r") as f:
data = [json.loads(line) for line in f]
# Sort the data based on the numeric part of task_id
sorted_data = sorted(data, key=lambda x: extract_task_id(x['task_id']))
sorted_data = sorted(data, key=lambda x: extract_task_id(x[key]))
# Write the sorted data to a new JSONL file
with open(output_file, 'w') as f:
with open(output_file, "w") as f:
for item in sorted_data:
f.write(json.dumps(item) + '\n')
f.write(json.dumps(item) + "\n")
# def extract_test_cases_from_jsonl(problem_id:str, file_path:str="public_test.jsonl"):
# # TODO 这个JSONL效率有点神经病
# if problem_id == "Humaneval/87":
# return [ ["get_row", [[[1, 2, 3, 4, 5, 6], [1, 2, 3, 4, 1, 6], [1, 2, 3, 4, 5, 1]], 1], [(0, 0), (1, 4), (1, 0), (2, 5), (2, 0)]], ["get_row", [[], 1], []], ["get_row", [[[], [1], [1, 2, 3]], 3], [(2, 2)]] ]
# elif problem_id == "Humaneval/95":
# return [ ["check_dict_case", [{"a": "apple", "b": "banana"}], True], ["check_dict_case", [{"a": "apple", "A": "banana", "B": "banana"}], False], ["check_dict_case", [{"a": "apple", "8": "banana", "a": "apple"}], False], ["check_dict_case", [{"Name": "John", "Age": "36", "City": "Houston"}], False], ["check_dict_case", [{"STATE": "NC", "ZIP": "12345"}], True] ]
# elif problem_id == "Humaneval/107":
# return [ ["even_odd_palindrome", [3], (1, 2)], ["even_odd_palindrome", [12], (4, 6)] ]
# elif problem_id == "Humaneval/112":
# return [ ["reverse_delete", ["abcde", "ae"], ("bcd", False)], ["reverse_delete", ["abcdef", "b"], ("acdef", False)], ["reverse_delete", ["abcdedcba", "ab"], ("cdedc", True)] ]
# elif problem_id == "Humaneval/127":
# return [ ["intersection", [(1, 2), (2, 3)], "NO"], ["intersection", [(-1, 1), (0, 4)], "NO"], ["intersection", [(-3, -1), (-5, 5)], "YES"] ]
# elif problem_id == "Humaneval/136":
# return [ ["largest_smallest_integers", [2, 4, 1, 3, 5, 7], (None, 1)], ["largest_smallest_integers", [], (None, None)], ["largest_smallest_integers", [0], (None, None)] ]
# elif problem_id == "Humaneval/148":
# return [ ["bf", ["Jupiter", "Neptune"], ("Saturn", "Uranus")], ["bf", ["Earth", "Mercury"], ("Venus",)], ["bf", ["Mercury", "Uranus"], ("Venus", "Earth", "Mars", "Jupiter", "Saturn")], ["bf", ["InvalidPlanet", "Neptune"], ()], ["bf", ["Jupiter", "InvalidPlanet"], ()], ["bf", ["Mercury", "Mercury"], ()] ]
# elif problem_id == "Humaneval/155":
# return [ ["even_odd_count", [-12], (1, 1)], ["even_odd_count", [123], (1, 2)] ]
# with open(file_path, 'r') as file:
# for line in file:
# data = json.loads(line)
# if problem_id in data:
# return data[problem_id]
# return None
import json
import ast
def parse_python_literal(s):
try:
@ -71,7 +46,8 @@ def parse_python_literal(s):
except (ValueError, SyntaxError):
return s
def extract_test_cases_from_jsonl(problem_id:str, file_path:str="public_test_reflexion.jsonl"):
def extract_test_cases_from_jsonl(problem_id: str, file_path: str = "public_test_reflexion.jsonl"):
# 保留原有的硬编码测试用例
hardcoded_cases = {
"HumanEval/32": "",
@ -84,7 +60,7 @@ def extract_test_cases_from_jsonl(problem_id:str, file_path:str="public_test_ref
return hardcoded_cases[problem_id]
# 如果没有硬编码的测试用例,从文件中读取
with open(file_path, 'r') as file:
with open(file_path, "r") as file:
for line in file:
data = json.loads(line)
if data.get("id") == problem_id:
@ -92,106 +68,63 @@ def extract_test_cases_from_jsonl(problem_id:str, file_path:str="public_test_ref
return None # 如果没有找到问题,返回 None
def extract_test_cases(docstring: str) -> List[Tuple[str, List[Any], Any]]:
# 使用正则表达式匹配测试用例,现在捕获函数名和任意输出
pattern = r'>>> (\w+)\((.*?)\)\n\s*(.*?)(?=\n|$)'
pattern = r">>> (\w+)\((.*?)\)\n\s*(.*?)(?=\n|$)"
matches = re.findall(pattern, docstring, re.DOTALL)
test_cases = []
for match in matches:
func_name, input_str, expected_output = match
# 处理输入
input_list = []
for item in input_str.split(','):
for item in input_str.split(","):
item = item.strip()
try:
# 尝试将输入转换为数值类型
if '.' in item:
if "." in item:
input_list.append(float(item))
else:
input_list.append(int(item))
except ValueError:
# 如果无法转换为数值,则保留为字符串
input_list.append(item.strip("'\""))
# 处理输出
try:
# 尝试将输出转换为数值或布尔值
if expected_output.lower() == 'true':
if expected_output.lower() == "true":
expected_output = True
elif expected_output.lower() == 'false':
elif expected_output.lower() == "false":
expected_output = False
elif '.' in expected_output:
elif "." in expected_output:
expected_output = float(expected_output)
else:
expected_output = int(expected_output)
except ValueError:
# 如果无法转换,则保留为字符串
expected_output = expected_output.strip("'\"")
test_cases.append([func_name, input_list, expected_output])
return test_cases
async def llm_extract_test_case(id, problem_description: str, file_path:str="public_test.jsonl"):
async def llm_extract_test_case(id, problem_description: str, file_path: str = "public_test.jsonl"):
prompt = EXTRACT_CASE_PROMPT.format(problem_description=problem_description)
node = await ActionNode.from_pydantic(TestCaseExtractOp).fill(context=prompt, llm=LLM())
result = node.instruct_content.model_dump()
with open(file_path,"a") as f:
f.write(json.dumps({id:result["test_cases"]}) + '\n')
return {id:result["test_cases"]}
with open(file_path, "a") as f:
f.write(json.dumps({id: result["test_cases"]}) + "\n")
return {id: result["test_cases"]}
import json
# def test_cases_2_test_functions(solution: str, test_case: List):
# print("test_case", test_case)
# function_name = test_case[0]
# def format_param(param):
# if isinstance(param, str):
# return repr(param)
# elif isinstance(param, (int, float, bool)):
# return str(param)
# elif isinstance(param, list):
# return '[' + ', '.join(format_param(item) for item in param) + ']'
# elif isinstance(param, tuple):
# return '(' + ', '.join(format_param(item) for item in param) + ')'
# elif isinstance(param, dict):
# return '{' + ', '.join(f'{format_param(k)}: {format_param(v)}' for k, v in param.items()) + '}'
# elif isinstance(param, type(None)):
# return 'None'
# else:
# raise ValueError(f"Unsupported parameter type: {type(param)}")
# parameters = ', '.join(format_param(item) for item in test_case[1])
# print(test_case[1], parameters)
# expected_output = format_param(test_case[2])
# print(type(test_case[2]), test_case[2], expected_output)
# tester_function = f"""
# {solution}
# def check(candidate):
# assert candidate({parameters}) == {expected_output}
# check({function_name})
# """
# print(f"""
# Generated test function:
# {tester_function}
# """)
# return tester_function
def test_cases_2_test_functions(solution: str, test_cases: str):
tester_function = f"""
{solution}
{test_cases}
"""
return tester_function
"""
return tester_function

View file

@ -1,21 +1,19 @@
import asyncio
import json
from metagpt.llm import LLM
from evalplus.data import get_human_eval_plus, write_jsonl
from examples.ags.benchmark.humaneval import sample_generate, samples_generate, extract_failure_tests, automatic_evalplus
from examples.ags.w_action_node.utils import jsonl_ranker, llm_extract_test_case
from examples.ags.w_action_node.graph import HumanEvalGraph
from examples.ags.w_action_node.utils import extract_test_cases_from_jsonl
# 132 141 136 80 73
# asyncio.run(sample_generate('HumanEval/140',result_path="llm_based_1000.jsonl",mode="llm"))
# asyncio.run(sample_generate('HumanEval/140',result_path="llm_based_1000.jsonl",mode="llm"))
# asyncio.run(sample_generate('HumanEval/140',result_path="llm_based_1000.jsonl",mode="llm"))
from examples.ags.benchmark.humaneval import sample_generate, samples_generate
asyncio.run(sample_generate("HumanEval/0", result_path="llm_based_1000.jsonl", mode="llm"))
asyncio.run(samples_generate(mode="alpha_codium", result_path="alpha_based_1000.jsonl"))
# asyncio.run(sample_generate('HumanEval/140',result_path="llm_based_1000.jsonl",mode="llm"))
# asyncio.run(sample_generate('HumanEval/140',result_path="llm_based_1000.jsonl",mode="llm"))
# asyncio.run(sample_generate('HumanEval/67',result_path="llm_based_1000.jsonl",mode="llm"))
# asyncio.run(sample_generate('HumanEval/108',result_path="llm_based_1000.jsonl",mode="llm"))
# asyncio.run(sample_generate('HumanEval/110',result_path="llm_based_1000.jsonl",mode="llm"))
asyncio.run(samples_generate(mode='alpha',result_path="alpha_based_104.jsonl"))
# jsonl_ranker("llm_based_137.jsonl", "llm_based_137.jsonl")
# asyncio.run(samples_generate(mode='alpha',result_path="alpha_based_108.jsonl"))
# sort_json_by_key("alpha_based_108.jsonl", "alpha_based_108.jsonl")
# 64 84 160 148 109
# result_path = "ags_based_6.jsonl"
# if automatic_evalplus(result_path):
# unpassed_exapmle = extract_failure_tests(result_path[:-6]+"_eval_results.json")
@ -27,9 +25,6 @@ asyncio.run(samples_generate(mode='alpha',result_path="alpha_based_104.jsonl"))
# for example in failure_list:
# asyncio.run(sample_generate(example))
# TODO 抽取Public Test没搞完先用几个测试跑一下流程
# from evalplus.data import get_human_eval_plus
# id_list = [87, 95, 107, 112, 127, 136, 148, 155]
# id_list = [155]
# cases_id = [f"HumanEval/{case_id}" for case_id in id_list]
@ -52,6 +47,6 @@ asyncio.run(samples_generate(mode='alpha',result_path="alpha_based_104.jsonl"))
# solver = HumanEvalGraph(name="solver", llm=LLM(), criteria='correctness, efficiency, readability', vote_count=1)
# result = asyncio.run(solver.alpha_codium(problem_id="HumanEval/140", problem=case_prompt, ensemble_count=1))
# 1. Public Test 数据集不对
# 1. Public Test 数据集不对
# 2. 修改两个Prompt的具体内容
# 3. 尝试增加Test错误之后的修改能力
# 3. 尝试增加Test错误之后的修改能力

File diff suppressed because it is too large Load diff

View file

@ -38,6 +38,7 @@ class ReviseMode(Enum):
TAG = "CONTENT"
MODE_CODE_FILL = "code_fill"
LANGUAGE_CONSTRAINT = "Language: Please use the same language as Human INPUT."
FORMAT_CONSTRAINT = f"Format: output wrapped inside [{TAG}][/{TAG}] like format example, nothing else."
@ -149,8 +150,6 @@ class ActionNode:
prevs: List["ActionNode"] # previous nodes
nexts: List["ActionNode"] # next nodes
MODE_CODE_FILL = "code_fill"
def __init__(
self,
key: str,
@ -474,53 +473,26 @@ class ActionNode:
"""
model_class = self.create_class()
fields = model_class.model_fields
# Assuming there's only one field in the model
if len(fields) == 1:
return next(iter(fields))
# If there are multiple fields, we might want to use self.key to find the right one
return self.key
async def code_fill(
self,
context,
function_name=None,
timeout=USE_CONFIG_TIMEOUT
):
async def code_fill(self, context, function_name=None, timeout=USE_CONFIG_TIMEOUT):
"""
fill CodeBlock Node
"""
def extract_code_from_response(response):
"""
Extracts code wrapped in triple backticks from the response,
removing any language specifier.
:param response: The full response from the LLM
:return: The extracted code, or None if no code is found
"""
code_pattern = r"```(?:\w+\n)?([\s\S]*?)```"
matches = re.findall(code_pattern, response)
if matches:
# The first group in the regex contains the code without the language specifier
code = matches[0].strip()
return code
return None
import re
field_name = self.get_field_name()
prompt = context
# print("generate prompt", "\n", prompt)
content = await self.llm.aask(prompt, timeout=timeout)
# print("generate content", "\n", content)
extracted_code = sanitize(code=content, entrypoint=function_name)
# extracted_code = extract_code_from_response(content)
result = {field_name: extracted_code}
# print("final_result", "\n", result)
return result
async def messages_fill(
self,
):
@ -540,7 +512,7 @@ class ActionNode:
images: Optional[Union[str, list[str]]] = None,
timeout=USE_CONFIG_TIMEOUT,
exclude=[],
function_name: str = None
function_name: str = None,
):
"""Fill the node(s) with mode.

View file

@ -4,28 +4,35 @@
@Time : 2024/7/24 16:37
@Author : didi
@File : code_node.py
@Acknowledgement https://github.com/evalplus/evalplus/blob/master/evalplus/sanitize.py
"""
import os
import ast
import pathlib
import traceback
from enum import Enum
from typing import Dict, Generator, List, Optional, Set, Tuple
import tree_sitter_python
from tqdm import tqdm
from tree_sitter import Language, Node, Parser
CLASS_TYPE = "class_definition"
FUNCTION_TYPE = "function_definition"
IMPORT_TYPE = ["import_statement", "import_from_statement"]
IDENTIFIER_TYPE = "identifier"
ATTRIBUTE_TYPE = "attribute"
RETURN_TYPE = "return_statement"
EXPRESSION_TYPE = "expression_statement"
ASSIGNMENT_TYPE = "assignment"
class NodeType(Enum):
CLASS = "class_definition"
FUNCTION = "function_definition"
IMPORT = ["import_statement", "import_from_statement"]
IDENTIFIER = "identifier"
ATTRIBUTE = "attribute"
RETURN = "return_statement"
EXPRESSION = "expression_statement"
ASSIGNMENT = "assignment"
def traverse_tree(node: Node) -> Generator[Node, None, None]:
"""
Traverse the tree structure starting from the given node.
:param node: The root node to start the traversal from.
:return: A generator object that yields nodes in the tree.
"""
cursor = node.walk()
depth = 0
@ -43,6 +50,7 @@ def traverse_tree(node: Node) -> Generator[Node, None, None]:
else:
depth -= 1
def syntax_check(code, verbose=False):
try:
ast.parse(code)
@ -52,6 +60,7 @@ def syntax_check(code, verbose=False):
traceback.print_exc()
return False
def code_extract(text: str) -> str:
lines = text.split("\n")
longest_line_pair = (0, 0)
@ -68,22 +77,25 @@ def code_extract(text: str) -> str:
return "\n".join(lines[longest_line_pair[0] : longest_line_pair[1] + 1])
def get_definition_name(node: Node) -> str:
for child in node.children:
if child.type == IDENTIFIER_TYPE:
if child.type == NodeType.IDENTIFIER.value:
return child.text.decode("utf8")
def has_return_statement(node: Node) -> bool:
traverse_nodes = traverse_tree(node)
for node in traverse_nodes:
if node.type == RETURN_TYPE:
if node.type == NodeType.RETURN.value:
return True
return False
def get_deps(nodes: List[Tuple[str, Node]]) -> Dict[str, Set[str]]:
def dfs_get_deps(node: Node, deps: Set[str]) -> None:
for child in node.children:
if child.type == IDENTIFIER_TYPE:
if child.type == NodeType.IDENTIFIER.value:
deps.add(child.text.decode("utf8"))
else:
dfs_get_deps(child, deps)
@ -104,12 +116,23 @@ def get_function_dependency(entrypoint: str, call_graph: Dict[str, str]) -> Set[
if current not in call_graph:
continue
for neighbour in call_graph[current]:
if not (neighbour in visited):
if neighbour not in visited:
visited.add(neighbour)
queue.append(neighbour)
return visited
def sanitize(code: str, entrypoint: Optional[str] = None) -> str:
"""
Sanitize and extract relevant parts of the given Python code.
This function parses the input code, extracts import statements, class and function definitions,
and variable assignments. If an entrypoint is provided, it only includes definitions that are
reachable from the entrypoint in the call graph.
:param code: The input Python code as a string.
:param entrypoint: Optional name of a function to use as the entrypoint for dependency analysis.
:return: A sanitized version of the input code, containing only relevant parts.
"""
code = code_extract(code)
code_bytes = bytes(code, "utf8")
parser = Parser(Language(tree_sitter_python.language()))
@ -123,30 +146,24 @@ def sanitize(code: str, entrypoint: Optional[str] = None) -> str:
definition_nodes = []
for child in root_node.children:
if child.type in IMPORT_TYPE:
if child.type in NodeType.IMPORT.value:
import_nodes.append(child)
elif child.type == CLASS_TYPE:
elif child.type == NodeType.CLASS.value:
name = get_definition_name(child)
if not (
name in class_names or name in variable_names or name in function_names
):
if not (name in class_names or name in variable_names or name in function_names):
definition_nodes.append((name, child))
class_names.add(name)
elif child.type == FUNCTION_TYPE:
elif child.type == NodeType.FUNCTION.value:
name = get_definition_name(child)
if not (
name in function_names or name in variable_names or name in class_names
) and has_return_statement(child):
if not (name in function_names or name in variable_names or name in class_names) and has_return_statement(
child
):
definition_nodes.append((name, child))
function_names.add(get_definition_name(child))
elif (
child.type == EXPRESSION_TYPE and child.children[0].type == ASSIGNMENT_TYPE
):
elif child.type == NodeType.EXPRESSION.value and child.children[0].type == NodeType.ASSIGNMENT.value:
subchild = child.children[0]
name = get_definition_name(subchild)
if not (
name in variable_names or name in function_names or name in class_names
):
if not (name in variable_names or name in function_names or name in class_names):
definition_nodes.append((name, subchild))
variable_names.add(name)
@ -161,7 +178,7 @@ def sanitize(code: str, entrypoint: Optional[str] = None) -> str:
for pair in definition_nodes:
name, node = pair
if entrypoint and not (name in reacheable):
if entrypoint and name not in reacheable:
continue
sanitized_output += code_bytes[node.start_byte : node.end_byte] + b"\n"
return sanitized_output[:-1].decode("utf8")

View file

@ -581,6 +581,31 @@ def write_json_file(json_file: str, data: list, encoding: str = None, indent: in
json.dump(data, fout, ensure_ascii=False, indent=indent, default=to_jsonable_python)
def read_jsonl_file(jsonl_file: str, encoding="utf-8") -> list[dict]:
if not Path(jsonl_file).exists():
raise FileNotFoundError(f"json_file: {jsonl_file} not exist, return []")
datas = []
with open(jsonl_file, "r", encoding=encoding) as fin:
try:
for line in fin:
data = json.loads(line)
datas.append(data)
except Exception:
raise ValueError(f"read jsonl file: {jsonl_file} failed")
return datas
def add_jsonl_file(jsonl_file: str, data: list[dict], encoding: str = None, indent: int = 4):
folder_path = Path(jsonl_file).parent
if not folder_path.exists():
folder_path.mkdir(parents=True, exist_ok=True)
with open(jsonl_file, "a", encoding=encoding) as fout:
for json_item in data:
json_str = json.dumps(json_item, indent=indent)
fout.write(json_str + "\n")
def read_csv_to_list(curr_file: str, header=False, strip_trail=True):
"""
Reads in a csv file to a list of list. If header is True, it returns a