Update baseline and benchmark; update evaluator

This commit is contained in:
didi 2024-09-22 15:46:50 +08:00
parent 63f3f884c9
commit 22e8f9d7fc
40 changed files with 2393 additions and 485 deletions

View file

@ -4,6 +4,7 @@ import pandas as pd
import string
import re
from typing import List, Tuple, Callable, Dict, Any, Set, Union
from collections import Counter
import numpy as np
from scipy.optimize import linear_sum_assignment
from tqdm.asyncio import tqdm_asyncio
@ -13,213 +14,69 @@ from examples.ags.benchmark.utils import generate_random_indices
global cost
cost = 0
def _remove_articles(text: str) -> str:
regex = re.compile(r"\b(a|an|the)\b", re.UNICODE)
return re.sub(regex, " ", text)
def _white_space_fix(text: str) -> str:
return " ".join(text.split())
EXCLUDE = set(string.punctuation)
def _is_number(text: str) -> bool:
def is_number(text: str) -> bool:
try:
float(text)
return True
except ValueError:
return False
def _normalize_number(text: str) -> str:
if _is_number(text):
return str(float(text))
else:
return text
def _remove_punc(text: str) -> str:
if not _is_number(text):
return "".join(ch for ch in text if ch not in EXCLUDE)
else:
return text
def _lower(text: str) -> str:
return text.lower()
def _tokenize(text: str) -> List[str]:
return re.split(" |-", text)
def _normalize_answer(text: str) -> str:
"""Lower text and remove punctuation, articles and extra whitespace."""
parts = [
_white_space_fix(_remove_articles(_normalize_number(_remove_punc(_lower(token)))))
for token in _tokenize(text)
]
parts = [part for part in parts if part.strip()]
normalized = " ".join(parts).strip()
return normalized
def _answer_to_bags(
answer: Union[str, List[str], Tuple[str, ...]]
) -> Tuple[List[str], List[Set[str]]]:
if isinstance(answer, (list, tuple)):
raw_spans = answer
else:
raw_spans = [answer]
normalized_spans: List[str] = []
token_bags = []
for raw_span in raw_spans:
normalized_span = _normalize_answer(raw_span)
normalized_spans.append(normalized_span)
token_bags.append(set(normalized_span.split()))
return normalized_spans, token_bags
def _align_bags(predicted: List[Set[str]], gold: List[Set[str]]) -> List[float]:
def normalize_answer(s):
"""
Takes gold and predicted answer sets and first finds the optimal 1-1 alignment
between them and gets maximum metric values over all the answers.
Normalize answers for evaluation.
"""
scores = np.zeros([len(gold), len(predicted)])
for gold_index, gold_item in enumerate(gold):
for pred_index, pred_item in enumerate(predicted):
if _match_numbers_if_present(gold_item, pred_item):
scores[gold_index, pred_index] = _compute_f1(pred_item, gold_item)
row_ind, col_ind = linear_sum_assignment(-scores)
max_scores = np.zeros([max(len(gold), len(predicted))])
for row, column in zip(row_ind, col_ind):
max_scores[row] = max(max_scores[row], scores[row, column])
return max_scores
def remove_articles(text):
return re.sub(r"\b(a|an|the)\b", " ", text)
def white_space_fix(text):
return " ".join(text.split())
def _compute_f1(predicted_bag: Set[str], gold_bag: Set[str]) -> float:
intersection = len(gold_bag.intersection(predicted_bag))
if not predicted_bag:
precision = 1.0
else:
precision = intersection / float(len(predicted_bag))
if not gold_bag:
recall = 1.0
else:
recall = intersection / float(len(gold_bag))
f1 = (
(2 * precision * recall) / (precision + recall)
if not (precision == 0.0 and recall == 0.0)
else 0.0
)
def remove_punc(text):
exclude = set(string.punctuation)
return "".join(ch for ch in text if ch not in exclude)
def lower(text):
return text.lower()
return white_space_fix(remove_articles(remove_punc(lower(s))))
def compute_f1_score(prediction, ground_truth):
    """
    Return the token-level F1 between a prediction and a reference answer.

    Both strings are run through ``normalize_answer`` and split on whitespace.
    Shared tokens are counted as a multiset intersection, so a repeated token
    only matches as often as it occurs on both sides. When nothing is shared
    the function returns ``0``.
    """
    predicted_counts = Counter(normalize_answer(prediction).split())
    reference_counts = Counter(normalize_answer(ground_truth).split())

    overlap = sum((predicted_counts & reference_counts).values())
    if not overlap:
        return 0

    # Summing a Counter's values gives back the original token count.
    precision = 1.0 * overlap / sum(predicted_counts.values())
    recall = 1.0 * overlap / sum(reference_counts.values())
    return (2 * precision * recall) / (precision + recall)
def _match_numbers_if_present(gold_bag: Set[str], predicted_bag: Set[str]) -> bool:
gold_numbers = set()
predicted_numbers = set()
for word in gold_bag:
if _is_number(word):
gold_numbers.add(word)
for word in predicted_bag:
if _is_number(word):
predicted_numbers.add(word)
if (not gold_numbers) or gold_numbers.intersection(predicted_numbers):
return True
return False
# def fuzzy_match(s1: str, s2: str) -> bool:
# s1 = normalize(s1)
# s2 = normalize(s2)
def _compute_f1(predicted_bag: Set[str], gold_bag: Set[str]) -> float:
intersection = len(gold_bag.intersection(predicted_bag))
if not predicted_bag:
precision = 1.0
else:
precision = intersection / float(len(predicted_bag))
if not gold_bag:
recall = 1.0
else:
recall = intersection / float(len(gold_bag))
f1 = (
(2 * precision * recall) / (precision + recall)
if not (precision == 0.0 and recall == 0.0)
else 0.0
)
return f1
# if s1 == "" or s2 == "":
# return s1 == s2
def _align_bags(predicted: List[Set[str]], gold: List[Set[str]]) -> List[float]:
"""
Takes gold and predicted answer sets and first finds the optimal 1-1 alignment
between them and gets maximum metric values over all the answers.
"""
scores = np.zeros([len(gold), len(predicted)])
for gold_index, gold_item in enumerate(gold):
for pred_index, pred_item in enumerate(predicted):
if _match_numbers_if_present(gold_item, pred_item):
scores[gold_index, pred_index] = _compute_f1(pred_item, gold_item)
row_ind, col_ind = linear_sum_assignment(-scores)
# return s1 in s2 or s2 in s1
max_scores = np.zeros([max(len(gold), len(predicted))])
for row, column in zip(row_ind, col_ind):
max_scores[row] = max(max_scores[row], scores[row, column])
return max_scores
def get_metrics(
predicted: Union[str, List[str], Tuple[str, ...]], gold: Union[str, List[str], Tuple[str, ...]]
) -> Tuple[float, float]:
"""
Takes a predicted answer and a gold answer (that are both either a string or a list of
strings), and returns exact match and the DROP F1 metric for the prediction. If you are
writing a script for evaluating objects in memory (say, the output of predictions during
validation, or while training), this is the function you want to call, after using
:func:`answer_json_to_strings` when reading the gold answer from the released data file.
"""
predicted_bags = _answer_to_bags(predicted)
gold_bags = _answer_to_bags(gold)
# def drop_metric(sample: str, reference: list[str]) -> Tuple[float, float]:
# em_scores = []
# f1_scores = []
# for answer in reference:
# if answer.strip() != "":
# em, f1 = get_drop_metrics(sample, answer)
# em_scores.append(em)
# f1_scores.append(f1)
# return (max(em_scores), max(f1_scores))
if set(predicted_bags[0]) == set(gold_bags[0]) and len(predicted_bags[0]) == len(gold_bags[0]):
exact_match = 1.0
else:
exact_match = 0.0
f1_per_bag = _align_bags(predicted_bags[1], gold_bags[1])
f1 = np.mean(f1_per_bag)
f1 = round(f1, 2)
return exact_match, f1
def answer_json_to_strings(answer: Dict[str, Any]) -> Tuple[Tuple[str, ...], str]:
"""
Takes an answer JSON blob from the DROP data release and converts it into strings used for
evaluation.
"""
if "number" in answer and answer["number"]:
return tuple([str(answer["number"])]), "number"
elif "spans" in answer and answer["spans"]:
return tuple(answer["spans"]), "span" if len(answer["spans"]) == 1 else "spans"
elif "date" in answer:
return (
tuple(
[
"{0} {1} {2}".format(
answer["date"]["day"], answer["date"]["month"], answer["date"]["year"]
)
]
),
"date",
)
else:
raise ValueError(
f"Answer type not found, should be one of number, spans or date at: {json.dumps(answer)}"
)
def load_data(file_path: str, samples: int, test=False) -> List[Tuple[str, Dict[str, Any]]]:
with open(file_path, mode="r") as file:
data = json.load(file)
data = list(data.items())
random_indices = generate_random_indices(len(data), samples, test)
data = [data[i] for i in random_indices]
return data
async def evaluate_problem(question: str, passage: str, answers: List[Dict[str, Any]], graph: Callable) -> Tuple[str, str, float]:
async def evaluate_problem(inputs: str, answers: List[Dict[str, Any]], graph: Callable) -> Tuple[str, str, float]:
max_retries = 5
retries = 0
@ -227,21 +84,26 @@ async def evaluate_problem(question: str, passage: str, answers: List[Dict[str,
while retries < max_retries:
try:
global cost
prediction, cost = await graph(question, passage)
prediction, cost = await graph(inputs)
max_score = 0.0
max_type = None
best_answer = None
f1_scores = []
for answer in answers:
golden_answer, golden_type = answer_json_to_strings(answer)
_, f1_score = get_metrics(prediction, golden_answer)
if golden_answer[0].strip() != "":
max_score = max(max_score, f1_score)
if max_score == f1_score:
max_type = golden_type
best_answer = golden_answer
if answer.strip() != "":
f1_score = compute_f1_score(prediction, answer)
f1_scores.append(f1_score)
max_score = max(f1_scores)
# matches = [
# fuzzy_match(prediction, answer)
# for answer in answers
# ]
# score = True in matches
score = max_score
break
except Exception as e:
@ -250,35 +112,30 @@ async def evaluate_problem(question: str, passage: str, answers: List[Dict[str,
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
best_answer = None
prediction = None
max_score = 0.0
score = 0.0
break
return best_answer, prediction, max_score
return prediction, score
async def evaluate_all_passages(annotations: List[Tuple[str, Dict[str, Any]]], graph: Callable, max_concurrent_tasks: int = 50) -> List[List[Any]]:
async def evaluate_all_questions(annotations: List[Tuple[str, Dict[str, Any]]], graph: Callable, max_concurrent_tasks: int = 50) -> List[List[Any]]:
semaphore = asyncio.Semaphore(max_concurrent_tasks)
results = []
async def sem_evaluate(id: str, annotation: Dict[str, Any]):
async def sem_evaluate(annotation: Dict[str, Any]):
async with semaphore:
passage = annotation["passage"]
for qa_pair in annotation["qa_pairs"]:
question = qa_pair["question"]
answers = [qa_pair["answer"]]
if "validated_answers" in qa_pair and qa_pair["validated_answers"]:
answers += qa_pair["validated_answers"]
best_answer, prediction, score = await evaluate_problem(question, passage, answers, graph)
results.append([id, question, prediction, best_answer, score])
inputs = annotation["context"]
answers = annotation["targets"]
prediction, score = await evaluate_problem(inputs, answers, graph)
results.append([annotation["id"], prediction, answers, score])
tasks = [sem_evaluate(id, annotation) for id, annotation in annotations]
tasks = [sem_evaluate(annotation) for annotation in annotations]
await tqdm_asyncio.gather(*tasks, desc="Evaluating DROP passages", total=len(annotations))
return results
def save_results_to_csv(results: List[List[Any]], path: str) -> float:
df = pd.DataFrame(results, columns=["id", "question", "prediction", "best_answer", "score"])
df = pd.DataFrame(results, columns=["id", "prediction", "answers", "score"])
average_score = df["score"].mean()
output_file = f"{path}/{average_score:.5f}.csv"
@ -287,11 +144,49 @@ def save_results_to_csv(results: List[List[Any]], path: str) -> float:
return average_score
# -- From ADAS --
def load_drop(file_path, samples, test=False, total_length=1000):
    """
    Load a slice of a gzip-compressed JSON Lines DROP file.

    Every line of the archive is parsed as JSON. ``generate_random_indices``
    is asked for ``total_length`` indices over the parsed records; the first
    ``samples`` of them are used when ``test`` is false, the remainder when it
    is true. Each selected record gets a ``"targets"`` list made by splitting
    its ``"ref_text"`` on ``"|"``.

    Returns the selected records as a list of dicts.
    """
    import gzip

    with gzip.open(file_path, "rb") as handle:
        records = list(map(json.loads, handle))

    pool = generate_random_indices(len(records), total_length, False)
    chosen = pool[samples:] if test else pool[:samples]

    picked = [records[position] for position in chosen]
    for record in picked:
        record["targets"] = record["ref_text"].split("|")
    return picked
async def drop_evaluation(graph: Callable, file_path: str, samples: int, path: str, test=False) -> float:
data = load_data(file_path, samples, test=test)
results = await evaluate_all_passages(data, graph, max_concurrent_tasks=20)
# data = load_data(file_path, samples, test=test)
data = load_drop(file_path, samples, test=test)
results = await evaluate_all_questions(data, graph, max_concurrent_tasks=30)
average_score = save_results_to_csv(results, path=path)
print(f"Average score on DROP dataset: {average_score:.5f}")
global cost
print(f"Total cost: {cost}")
return average_score
print(f"Total cost: {cost: .5f}")
print(f"Cost per sample: {(cost / len(data)):.9f}")
return average_score, cost
def load_drop_from_file(file_path):
    """
    Load every record of a gzip-compressed JSON Lines DROP file.

    Each line is parsed as JSON, then every record receives a ``"targets"``
    list obtained by splitting its ``"ref_text"`` value on ``"|"``.

    Returns the full list of records, in file order.
    """
    import gzip

    with gzip.open(file_path, "rb") as handle:
        records = list(map(json.loads, handle))

    for record in records:
        record["targets"] = record["ref_text"].split("|")
    return records
async def optimize_hotpotqa_evaluation(graph: Callable, file_path: str, path: str) -> Tuple[float, float]:
    """
    Evaluate ``graph`` on every record of a DROP file and report score and cost.

    NOTE(review): despite the name, this lives in the DROP benchmark and prints
    DROP results; the name is kept so existing callers keep working.

    Args:
        graph: Callable handed to ``evaluate_all_questions``.
        file_path: Gzip-compressed JSON Lines file read by ``load_drop_from_file``.
        path: Directory passed to ``save_results_to_csv``.

    Returns:
        ``(average_score, cost)`` where ``cost`` is the module-level total.
    """
    global cost

    # load_drop_from_file is a plain synchronous function that returns a list;
    # awaiting that list raises TypeError, so it is called directly.
    data = load_drop_from_file(file_path)
    results = await evaluate_all_questions(data, graph, max_concurrent_tasks=50)
    average_score = save_results_to_csv(results, path=path)
    print(f"Average score on DROP dataset: {average_score:.5f}")
    print(f"Total cost: {cost: .5f}")
    print(f"Cost per sample: {(cost / len(data)):.9f}")
    return average_score, cost

View file

@ -72,6 +72,8 @@ async def evaluate_problem(input: str, graph: Callable, expected_output: str) ->
score = loose_match_score(expected_output, output)
break
# TODO: add a logging entry point here
except Exception as e:
retries += 1
@ -108,3 +110,20 @@ async def gsm8k_evaluation(graph: Callable, file_path: str, samples: int, path:
print(f"Average score: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
return average_score, total_cost
async def load_file_data(file_path: str) -> List[dict]:
    """
    Read a JSON Lines file asynchronously.

    Returns a list with one parsed JSON value per line, in file order.
    """
    async with aiofiles.open(file_path, mode="r") as handle:
        return [json.loads(raw_line) async for raw_line in handle]
async def optimize_gsm8k_evaluation(graph: Callable, file_path: str, path: str) -> Tuple[float, float]:
    """
    Evaluate ``graph`` on every record of a GSM8K JSON Lines file.

    The whole file is loaded with ``load_file_data`` (no sampling), evaluated
    with up to 50 concurrent tasks, and written out by ``save_results_to_csv``.

    Returns:
        ``(average_score, total_cost)`` as reported by ``save_results_to_csv``.
    """
    problems = await load_file_data(file_path)
    outcomes = await evaluate_all_problems(problems, graph, max_concurrent_tasks=50)
    summary = save_results_to_csv(outcomes, path=path)
    average_score, total_cost = summary
    print(f"Average score: {average_score:.5f}")
    print(f"Total Cost: {total_cost:.5f}")
    return average_score, total_cost

View file

@ -59,25 +59,29 @@ def f1_score(prediction, ground_truth):
return f1
async def load_data(file_path: str, samples=20, total_length=1000, test=False) -> List[dict]:
async def load_data(file_path: str, samples=20, total_length=1250, test=False) -> List[dict]:
data = []
async with aiofiles.open(file_path, mode="r") as file:
async for line in file:
data.append(json.loads(line))
data = data[:total_length]
random_indices = generate_random_indices(len(data), samples, test)
random_indices = generate_random_indices(len(data), total_length, False) # get random indices of 1250
random_indices = random_indices[:samples] if not test else random_indices[samples:] # get n_samples for validation or test
data = [data[i] for i in random_indices]
return data
async def evaluate_problem(input: str, context_str: str, graph: Callable, expected_output: str):
max_retries = 5
retries = 0
# global cost
# prediction, cost = await graph(input, context_str) if graph else "None"
# score = f1_score(prediction, expected_output)
while retries < max_retries:
try:
global cost
prediction, cost = await graph(input, context_str) if graph else "None"
score = f1_score(prediction["solution"], expected_output)
score = f1_score(prediction, expected_output)
break
except Exception as e:
@ -125,5 +129,23 @@ async def hotpotqa_evaluation(graph: Callable, file_path: str, samples: int, pat
average_score = save_results_to_csv(results, path=path)
print(f"Average score on HotpotQA dataset: {average_score:.5f}")
global cost
print(f"Total cost: {cost}")
print(f"Total cost: {cost: .5f}")
print(f"Cost per sample: {(cost / len(data)):.9f}")
return average_score
async def load_file_data(file_path: str) -> List[dict]:
    """
    Load all records from a JSON Lines file without blocking the event loop.

    Each line is decoded with ``json.loads``; results keep the file's order.
    """
    async with aiofiles.open(file_path, mode="r") as handle:
        return [json.loads(raw_line) async for raw_line in handle]
async def optimize_hotpotqa_evaluation(graph: Callable, file_path: str, path: str) -> Tuple[float, float]:
    """
    Evaluate ``graph`` on every record of a HotpotQA JSON Lines file.

    Loads the full file, evaluates it with up to 50 concurrent tasks, saves the
    results through ``save_results_to_csv`` and prints the average score, the
    module-level total ``cost`` and the cost per sample.

    Returns:
        ``(average_score, cost)``.
    """
    global cost

    samples = await load_file_data(file_path)
    outcomes = await evaluate_all_problems(samples, graph, max_concurrent_tasks=50)
    average_score = save_results_to_csv(outcomes, path=path)
    print(f"Average score on HotpotQA dataset: {average_score:.5f}")
    print(f"Total cost: {cost: .5f}")
    # Computed only after the total has been printed, as before.
    print(f"Cost per sample: {(cost / len(samples)):.9f}")
    return average_score, cost

View file

@ -2,7 +2,7 @@ import json
import asyncio
import aiofiles
import pandas as pd
from typing import List, Tuple, Callable
from typing import List, Tuple, Callable, Dict, Any, Optional
from tqdm.asyncio import tqdm_asyncio
from examples.ags.benchmark.utils import generate_random_indices
@ -19,32 +19,81 @@ async def load_data(file_path: str, samples=1, test=False) -> List[dict]:
data = [data[i] for i in random_indices]
return data
async def check_solution(solution, test_cases, entry_point):
# Define a local dictionary to execute the solution
local_dict = {}
exec("from typing import List\n\n" + solution, {}, local_dict)
# async def check_solution(solution, test_cases, entry_point):
# # Define a local dictionary to execute the solution
# local_dict = {}
# exec("from typing import List, Tuple, Callable, Dict\n\n" + solution, {}, local_dict)
# Ensure the entry point function is defined
if entry_point not in local_dict:
raise ValueError(f"Function {entry_point} is not defined in the solution.")
# # Ensure the entry point function is defined
# if entry_point not in local_dict:
# raise ValueError(f"Function {entry_point} is not defined in the solution.")
details = [False for _ in range(len(test_cases))]
# details = [False for _ in range(len(test_cases))]
# Check each test case
for i, test in enumerate(test_cases):
# Replace 'candidate' with the actual function call
test_expr = test.replace("candidate", entry_point)
try:
# Evaluate the test case
if eval(test_expr, {}, local_dict):
details[i] = True
except Exception as e:
print(f"Error evaluating test case '{test}': {e}")
# # Check each test case
# for i, test in enumerate(test_cases):
# # Replace 'candidate' with the actual function call
# test_expr = test.replace("candidate", entry_point)
# try:
# # Evaluate the test case
# if eval(test_expr, {}, local_dict):
# details[i] = True
# except Exception as e:
# print(f"Error evaluating test case '{test}': {e}")
if all(details):
return PASS, details
# if all(details):
# return PASS, details
return FAIL, details
# return FAIL, details
async def check_solution(solution, test, entry_point):
    """
    Run a candidate solution against a HumanEval-style test and report the outcome.

    The solution source is executed in a fresh namespace, then the test source
    is executed in the same namespace and its ``check`` function is called with
    the entry-point function.

    Args:
        solution: Python source defining ``entry_point``.
        test: Python source defining ``check(candidate)``.
        entry_point: Name of the function under test.

    Returns:
        Whatever ``check`` returns if that is not ``None``; ``(PASS, message)``
        if ``check`` returns ``None``; ``(FAIL, error_message)`` if anything
        raised. Failures are also appended, with a timestamp, to ``error.log``.
    """
    # Imported here so the failure-logging path cannot itself raise NameError.
    # NOTE(review): the import header visible for this module does not list
    # ``time``; this local import is harmless if it is imported elsewhere.
    import time

    try:
        # Names made available to the executed solution and test code.
        global_dict = {
            'math': __import__('math'),
            'hashlib': __import__('hashlib'),
            're': __import__('re'),
            'List': List,
            'Dict': Dict,
            'Tuple': Tuple,
            'Optional': Optional,
            'Any': Any
        }
        # These entry points rely on a companion function that the solution is
        # not expected to define, so it is prepended to the solution source.
        if entry_point == "decode_cyclic":
            solution = "\n\ndef encode_cyclic(s: str):\n \"\"\"\n returns encoded string by cycling groups of three characters.\n \"\"\"\n # split string to groups. Each of length 3.\n groups = [s[(3 * i):min((3 * i + 3), len(s))] for i in range((len(s) + 2) // 3)]\n # cycle elements in each group. Unless group has fewer elements than 3.\n groups = [(group[1:] + group[0]) if len(group) == 3 else group for group in groups]\n return \"\".join(groups)" + "\n\n" + solution
        elif entry_point == "decode_shift":
            solution = "\n\ndef encode_shift(s: str):\n \"\"\"\n returns encoded string by shifting every character by 5 in the alphabet.\n \"\"\"\n return \"\".join([chr(((ord(ch) + 5 - ord(\"a\")) % 26) + ord(\"a\")) for ch in s])\n\n\n" + solution
        elif entry_point == "find_zero":
            solution = "\n\ndef poly(xs: list, x: float):\n return sum(coeff * (x ** i) for i, coeff in enumerate(xs))\n\n" + solution

        # NOTE(security): exec runs the supplied source in this process with no
        # sandbox or timeout; only use it on code you are prepared to execute.
        exec(solution, global_dict)

        # The solution must have defined the function under test.
        if entry_point not in global_dict:
            raise ValueError(f"函数 {entry_point} 在解决方案中未定义。")

        # Define ``check`` by executing the test source in the same namespace.
        exec(test, global_dict)
        check = global_dict["check"]

        result = check(global_dict[entry_point])

        # A ``check`` made only of asserts returns None when they all hold.
        if result is None:
            result = (PASS, "解决方案通过了所有测试用例。")

    except Exception as e:
        # Keep the full context (error, solution, test) for later inspection.
        error_message = f"错误: {str(e)}.\n 解决方案: {solution}.\n 测试: {test}"
        result = (FAIL, error_message)

        # Append the failure to error.log with a timestamp.
        with open('error.log', 'a', encoding='utf-8') as log_file:
            log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n")

    return result
async def evaluate_problem(data: dict, graph: Callable) -> Tuple[str, str, str, int, str]:
max_retries = 5
@ -52,10 +101,10 @@ async def evaluate_problem(data: dict, graph: Callable) -> Tuple[str, str, str,
while retries < max_retries:
try:
prediction = await graph(data["prompt"]) if graph else "None"
prediction = await graph(data["prompt"], data["entry_point"]) if graph else "None"
cost = prediction[1] # 添加这行来获取cost
solution = prediction[0] # 修改这行以获取实际的预测结果
ret = await check_solution(solution, data["test_cases"], data["entry_point"])
ret = await check_solution(solution, data["test"], data["entry_point"])
score = 1 if ret[0] == PASS else 0
break
@ -114,14 +163,34 @@ def save_results_to_jsonl(results: List[Tuple[str, str, str, int, str]], path: s
total_cost += float(result[4]) # 添加这行来累加cost
print(f"save to {full_path}")
avg_score /= len(results)
total_cost = results[-1][4] # 使用最后一个结果的cost作为总cost
# 从full_path中读取所有结果,选择得分最高的
with open(full_path, 'r') as f:
all_results = [json.loads(line) for line in f]
max_result = max(all_results, key=lambda x: x['cost'])
total_cost = max_result['cost']
return round(avg_score, 5), round(total_cost, 5) # 修改返回值以包含total_cost
async def humaneval_evaluation(graph: Callable, file_path: str, samples: int, path: str, test=False) -> Tuple[float, float]:
data = await load_data(file_path, samples, test=test)
results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20)
results = await evaluate_all_problems(data, graph, max_concurrent_tasks=50)
average_score, total_cost = save_results_to_jsonl(results, path=path)
print(f"Average score on HumanEval dataset: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
return average_score, total_cost # 修改返回值以包含total_cost
async def load_file_data(file_path: str) -> List[dict]:
    """
    Parse a JSON Lines file asynchronously.

    Returns the decoded value of every line, in the order they appear.
    """
    async with aiofiles.open(file_path, mode="r") as handle:
        return [json.loads(raw_line) async for raw_line in handle]
async def optimize_humaneval_evaluation(graph: Callable, file_path: str, path: str) -> Tuple[float, float]:
    """
    Evaluate ``graph`` on every record of a HumanEval JSON Lines file.

    Loads the full file, evaluates it with up to 50 concurrent tasks, and
    persists the results through ``save_results_to_jsonl``.

    Returns:
        ``(average_score, total_cost)`` as reported by ``save_results_to_jsonl``.
    """
    problems = await load_file_data(file_path)
    outcomes = await evaluate_all_problems(problems, graph, max_concurrent_tasks=50)
    summary = save_results_to_jsonl(outcomes, path=path)
    average_score, total_cost = summary
    print(f"Average score on HumanEval dataset: {average_score:.5f}")
    print(f"Total Cost: {total_cost:.5f}")
    return average_score, total_cost

View file

@ -21,7 +21,7 @@ def extract_answer(text: str) -> str:
return boxed_match.group(1)
# If no \boxed{...}, return the last sentence
sentences = text.split(".")
sentences = text.split(".") # TODO 使用jinyu修改
return sentences[-1].strip() if sentences else ""
def parse_digits(num):
@ -221,10 +221,11 @@ async def load_data(file_path: str, samples: int = 200, test=False) -> List[dict
data = [data[i] for i in random_indices]
return data
def save_results_to_csv(results: List[Tuple[str, str, str, int, str]], path: str) -> Tuple[float, float]:
df = pd.DataFrame(results, columns=["question", "prediction", "expected_output", "score", "cost"])
average_score = df["score"].mean()
total_cost = df["cost"].iloc[-1]
total_cost = df["cost"].max()
output_file = f"{path}/{average_score:.5f}.csv"
df.to_csv(output_file, index=False)
@ -277,3 +278,18 @@ async def math_evaluation(graph: Callable, file_path: str, samples: int, path: s
print(f"Average score on MATH dataset: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
return average_score, total_cost
async def load_file_data(file_path: str) -> List[dict]:
    """
    Read a JSON Lines file with ``aiofiles`` and decode each line.

    Returns a list of the decoded values, preserving file order.
    """
    async with aiofiles.open(file_path, mode="r") as handle:
        return [json.loads(raw_line) async for raw_line in handle]
async def optimize_math_evaluation(graph: Callable, file_path: str, path: str) -> Tuple[float, float]:
    """
    Evaluate ``graph`` on every record of a MATH JSON Lines file.

    Loads the full file, evaluates it with up to 50 concurrent tasks, and
    writes the results through ``save_results_to_csv``.

    Returns:
        ``(average_score, total_cost)`` as reported by ``save_results_to_csv``.
    """
    problems = await load_file_data(file_path)
    outcomes = await evaluate_all_problems(problems, graph, max_concurrent_tasks=50)
    summary = save_results_to_csv(outcomes, path=path)
    average_score, total_cost = summary
    print(f"Average score on MATH dataset: {average_score:.5f}")
    print(f"Total Cost: {total_cost:.5f}")
    return average_score, total_cost

View file

@ -1,8 +1,9 @@
import json
import time
import asyncio
import aiofiles
import pandas as pd
from typing import List, Tuple, Callable
from typing import List, Tuple, Callable, Any, Optional, Dict
from tqdm.asyncio import tqdm_asyncio
from examples.ags.benchmark.utils import generate_random_indices
@ -19,37 +20,52 @@ async def load_data(file_path: str, samples=1, test=False) -> List[dict]:
data = [data[i] for i in random_indices]
return data
async def check_solution(solution, test_cases, timeout=1):
# Define a local dictionary to execute the solution
local_dict = {}
exec(solution, {}, local_dict)
details = [False for _ in range(len(test_cases))]
async def evaluate_test(test):
# Delete 'assert' from test
test_expr = test.replace("assert ", "")
try:
# Evaluate the test case with timeout
await asyncio.wait_for(asyncio.to_thread(eval, test_expr, {}, local_dict), timeout)
return True
except asyncio.TimeoutError:
print(f"Test case '{test}' timed out.")
except Exception as e:
print(f"Error evaluating test case '{test}': {e}")
return False
# Check each test case
for i, test in enumerate(test_cases):
result = await evaluate_test(test)
details[i] = result
if not result:
return FAIL, details
if all(details):
return PASS, details
return FAIL, details
async def check_solution(solution, test, entry_point):
    """
    Run a candidate solution against an MBPP-style test and report the outcome.

    The solution source and then the test source are executed in one shared
    namespace; the test's ``check`` function is then called with no arguments.

    Returns:
        Whatever ``check`` returns if that is not ``None``; ``(PASS, message)``
        if ``check`` returns ``None``; ``(FAIL, error_message)`` if anything
        raised. Failures are also appended, with a timestamp, to
        ``error_mbpp.log``.
    """
    try:
        # Namespace shared by the solution and the test: a few stdlib modules
        # plus the typing names that solutions commonly reference.
        namespace = {module_name: __import__(module_name) for module_name in ('math', 'hashlib', 're')}
        namespace.update(List=List, Dict=Dict, Tuple=Tuple, Optional=Optional, Any=Any)

        exec(solution, namespace)

        # The solution must have defined the function under test.
        if entry_point not in namespace:
            raise ValueError(f"函数 {entry_point} 在解决方案中未定义。")

        # Executing the test source defines ``check`` in the same namespace.
        exec(test, namespace)
        outcome = namespace["check"]()

        # A ``check`` made only of asserts returns None when they all hold.
        return (PASS, "解决方案通过了所有测试用例。") if outcome is None else outcome

    except Exception as e:
        # Keep the full context (error, solution, test) for later inspection.
        error_message = f"错误: {str(e)}.\n 解决方案: {solution}.\n 测试: {test}"

        # Append the failure to the MBPP error log with a timestamp.
        with open('error_mbpp.log', 'a', encoding='utf-8') as log_file:
            log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n")

        return (FAIL, error_message)
async def evaluate_problem(data: dict, graph: Callable) -> Tuple[str, str, str, int, str]:
max_retries = 5
@ -57,10 +73,10 @@ async def evaluate_problem(data: dict, graph: Callable) -> Tuple[str, str, str,
while retries < max_retries:
try:
prediction = await graph(data["prompt"]) if graph else "None"
prediction = await graph(data["prompt"], data["entry_point"]) if graph else "None"
cost = prediction[1]
solution = prediction[0]
ret = await check_solution(solution, data["test_list"])
ret = await check_solution(solution, data["test"], data["entry_point"])
score = 1 if ret[0] == PASS else 0
break
@ -92,7 +108,7 @@ async def evaluate_all_problems(data: List[dict], graph: Callable, max_concurren
def save_results_to_csv(results: List[Tuple[str, str, str, int, str]], path: str) -> Tuple[float, float]:
df = pd.DataFrame(results, columns=["question", "prediction", "test_case_details", "score", "cost"])
average_score = df["score"].mean()
total_cost = df["cost"].iloc[-1]
total_cost = df["cost"].max()
output_file = f"{path}/{average_score:.5f}.csv"
df.to_csv(output_file, index=False)
@ -100,9 +116,25 @@ def save_results_to_csv(results: List[Tuple[str, str, str, int, str]], path: str
return average_score, total_cost
async def mbpp_evaluation(graph: Callable, file_path: str, samples: int, path: str, test=False) -> Tuple[float, float]:
data = await load_data(file_path, samples)
results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20)
average_score, total_cost = save_results_to_csv(results, path=path, test=test)
data = await load_data(file_path, samples, test)
results = await evaluate_all_problems(data, graph, max_concurrent_tasks=25)
average_score, total_cost = save_results_to_csv(results, path=path)
print(f"Average score on MBPP dataset: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
return average_score, total_cost
async def load_file_data(file_path: str) -> List[dict]:
    """
    Load a JSON Lines file asynchronously, one decoded value per line.

    Returns the decoded values as a list in file order.
    """
    async with aiofiles.open(file_path, mode="r") as handle:
        return [json.loads(raw_line) async for raw_line in handle]
async def optimize_mbpp_evaluation(graph: Callable, file_path: str, path: str) -> Tuple[float, float]:
    """
    Evaluate ``graph`` on every record of an MBPP JSON Lines file.

    Loads the full file, evaluates it with up to 50 concurrent tasks, and
    writes the results through ``save_results_to_csv``.

    Returns:
        ``(average_score, total_cost)`` as reported by ``save_results_to_csv``.
    """
    problems = await load_file_data(file_path)
    outcomes = await evaluate_all_problems(problems, graph, max_concurrent_tasks=50)
    summary = save_results_to_csv(outcomes, path=path)
    average_score, total_cost = summary
    print(f"Average score on MBPP dataset: {average_score:.5f}")
    print(f"Total Cost: {total_cost:.5f}")
    return average_score, total_cost

View file

@ -14,4 +14,6 @@ def generate_random_indices(n, n_samples, test=False):
if test:
return indices[n_samples:]
else:
return indices[:n_samples]
return indices[:n_samples]
# TODO(yzy): add a function for splitting the dataset

View file

@ -39,7 +39,7 @@ class CoTSolveGraph(SolveGraph):
async def __call__(self, problem):
solution = await self.cot_generate(problem, mode="context_fill")
return solution, self.llm.cost_manager.total_cost # {"solution": solution}
return solution, self.llm.cost_manager.total_cost
if __name__ == "__main__":
async def main():
@ -49,7 +49,7 @@ if __name__ == "__main__":
# llm_config = ModelsConfig.default().get("gpt-4o")
graph = CoTSolveGraph(name="CoT", llm_config=llm_config, dataset="Gsm8K")
file_path = "examples/ags/data/gsm8k.jsonl"
samples = 264 #264 # 1055 #314
samples = 10 #264 # 1055 #314
# samples = 100
path = "examples/ags/data/baselines/general/gsm8k/"
score, cost = await gsm8k_evaluation(graph, file_path, samples, path, test=False)

View file

@ -1,3 +1,7 @@
import sys
# Prepend the repository root; otherwise the modules under the root directory
# cannot be found. A raw string is used because "\H" and "\M" are invalid
# escape sequences in a normal literal and trigger a warning on recent Pythons.
# NOTE(review): this is a machine-specific absolute path — confirm before reuse.
sys.path = [r'H:\Hack\MetaGPT-MathAI'] + sys.path
# print(sys.path)
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.hotpotqa import hotpotqa_evaluation
@ -9,29 +13,31 @@ from pydantic import BaseModel, Field
from typing import Tuple
HOTPOTQA_PROMPT = """
问题: {question}
上下文:
{context}
请一步步思考,并在最后给出你的答案和支持性句子使用XML标签包裹内容
Think step by step and solve the problem.
1. In the "thought" field, explain your thinking process in detail.
2. In the "answer" field, provide the final answer concisely and clearly. The answer should be a direct response to the question, without including explanations or reasoning.
Question: {question}
The revelant context: {context}
"""
class GenerateOp(BaseModel):
answer: str = Field(default="", description="问题的答案")
thought: str = Field(default="", description="The step by step thinking process")
answer: str = Field(default="", description="The final answer to the question")
class CoTGenerate(Operator):
def __init__(self, llm: LLM, name: str = "Generate"):
super().__init__(name, llm)
async def __call__(self, question: str, context: str, mode: str = None) -> Tuple[str, str]:
thought = ""
prompt = HOTPOTQA_PROMPT.format(question=question, context=context)
fill_kwargs = {"context": prompt, "llm": self.llm}
if mode:
fill_kwargs["mode"] = mode
node = await ActionNode.from_pydantic(GenerateOp).fill(**fill_kwargs)
response = node.instruct_content.model_dump()
return response["answer"]
return response
class CoTSolveGraph(SolveGraph):
def __init__(self, name: str, llm_config, dataset: str):
@ -40,17 +46,21 @@ class CoTSolveGraph(SolveGraph):
async def __call__(self, question: str, context: str) -> Tuple[str, str]:
answer = await self.cot_generate(question, context, mode="context_fill")
return answer, self.llm.cost_manager.total_cost
return answer["answer"], self.llm.cost_manager.total_cost
if __name__ == "__main__":
async def main():
# llm_config = ModelsConfig.default().get("deepseek-chat")
llm_config = ModelsConfig.default().get("gpt-4o-mini")
# llm_config = ModelsConfig.default().get("gpt-35-turbo-1106")
graph = CoTSolveGraph(name="CoT", llm_config=llm_config, dataset="HotpotQA")
file_path = "examples/ags/data/hotpotqa.jsonl"
samples = 50 # TODO 选择前1000条跑实验
path = "examples/ags/data/baselines/general/hotpotqa"
score = await hotpotqa_evaluation(graph, file_path, samples, path)
file_path = "examples/ags/data/hotpotqa.jsonl" #相对路径有问题 等着再改
samples = 10 # 250 for validation, 1000 for test
path = "examples/ags/data/baselines/general/hotpotqa" #相对路径有问题 等着再改
score = await hotpotqa_evaluation(graph, file_path, samples, path, test=False)
return score
import asyncio

View file

@ -8,19 +8,21 @@ from metagpt.llm import LLM
from pydantic import BaseModel, Field
HUMANEVAL_PROMPT_GPT = """
{question}\nPlease reason step by step, and put your python function in the end.
{question}\nPlease provide a step-by-step explanation in text, followed by your Python function without any additional text or test cases.
"""
# TODO: code_fill is awkward to support here; a version adapted from the earlier code is needed before this runs end to end
class GenerateOp(BaseModel):
solution: str = Field(default="", description="问题的Python函数实现")
solution: str = Field(default="", description="Python Solution For This Question.")
class CoTGenerate(Operator):
def __init__(self, llm: LLM, name: str = "Generate"):
super().__init__(name, llm)
async def __call__(self, problem, mode: str = None):
async def __call__(self, problem, function_name, mode: str = None):
prompt = HUMANEVAL_PROMPT_GPT.format(question=problem)
fill_kwargs = {"context": prompt, "llm": self.llm}
fill_kwargs = {"context": prompt, "llm": self.llm, "function_name": function_name}
if mode:
fill_kwargs["mode"] = mode
node = await ActionNode.from_pydantic(GenerateOp).fill(**fill_kwargs)
@ -32,19 +34,20 @@ class CoTSolveGraph(SolveGraph):
super().__init__(name, llm_config, dataset)
self.cot_generate = CoTGenerate(self.llm)
async def __call__(self, problem):
solution = await self.cot_generate(problem, mode="code_fill")
return solution["solution"]
async def __call__(self, problem, function_name):
solution = await self.cot_generate(problem, function_name, mode="code_fill")
return solution["solution"], self.llm.cost_manager.total_cost
if __name__ == "__main__":
async def main():
# llm_config = ModelsConfig.default().get("gpt-4o-mini")
llm_config = ModelsConfig.default().get("gpt-35-turbo-1106")
# llm_config = ModelsConfig.default().get("gpt-35-turbo-1106")
llm_config = ModelsConfig.default().get("deepseek-chat")
graph = CoTSolveGraph(name="CoT", llm_config=llm_config, dataset="HumanEval")
file_path = "examples/ags/data/human-eval-new.jsonl"
samples = 131 # 33/131
file_path = "examples/ags/data/human-eval.jsonl"
samples = 33 # 33/131
path = "examples/ags/data/baselines/general/humaneval"
score = await humaneval_evaluation(graph, file_path, samples, path)
score = await humaneval_evaluation(graph, file_path, samples, path,test=True)
return score
import asyncio

View file

@ -8,12 +8,8 @@ from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import Dict, Any
MATH_PROMPT_GPT = """
{question}\nPlease reason step by step, and put your final answer in the end. Wrap content using xml tags.
"""
MATH_PROMPT_DS = """
{question}\nPlease reason step by step, and put your final answer within \\boxed{{}}.
GENERATE_COT_PROMPT = """
{question}\nPlease reason step by step. At the end, provide the final answer in the format "\\boxed{{<number>}}", where <number> is a math answer(an expression or number), without any additional information or explanation.
"""
class GenerateOp(BaseModel):
@ -24,7 +20,7 @@ class CoTGenerate(Operator):
super().__init__(name, llm)
async def __call__(self, problem, mode: str = None):
prompt = MATH_PROMPT_GPT.format(question=problem)
prompt = GENERATE_COT_PROMPT.format(question=problem)
fill_kwargs = {"context": prompt, "llm": self.llm}
if mode:
fill_kwargs["mode"] = mode
@ -47,27 +43,12 @@ if __name__ == "__main__":
llm_config = ModelsConfig.default().get("gpt-4o-mini")
# llm_config = ModelsConfig.default().get("gpt-35-turbo-1106")
graph = CoTSolveGraph(name="CoT", llm_config=llm_config, dataset="Gsm8K")
file_path = "examples/ags/data/math.jsonl"
samples = 100
# samples = 100
file_path = "examples/ags/data/math_test.jsonl"
# samples = None
samples = 0
path = "examples/ags/data/baselines/general/math"
score = await math_evaluation(graph, file_path, samples, path)
score = await math_evaluation(graph, file_path, samples, path,test=True)
return score
import asyncio
asyncio.run(main())
# self consistency operator; universal self consistency;
# IO指的没有任何Trick看LLM自身的一个效果。使用 model 发布者在对应的 dataset 使用的 prompt。
# deepseek-chat; gpt-4o-mini; gpt-35-turbo-1106
GENERATE_PROMPT = """
Generate Solution for the following problem: {problem_description}
"""
# med ensemble
asyncio.run(main())

View file

@ -8,20 +8,20 @@ from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import Tuple
MBPP_PROMPT = """
{question}\nPlease reason step by step, and put your python function in the end.
"""
MBPP_PROMPT_COT = """
{question}\nPlease provide a step-by-step explanation in text, followed by your Python function, ensure the output code is self-contained, meaning it should have the correct function name and return statement, without any additional text."""
class GenerateOp(BaseModel):
solution: str = Field(default="", description="问题的Python函数实现")
solution: str = Field(default="", description="Python Solution For This Question.")
class CoTGenerate(Operator):
def __init__(self, llm: LLM, name: str = "Generate"):
super().__init__(name, llm)
async def __call__(self, question: str, mode: str = None) -> Tuple[str, str]:
prompt = MBPP_PROMPT.format(question=question)
fill_kwargs = {"context": prompt, "llm": self.llm}
async def __call__(self, problem, function_name, mode: str = None):
prompt = MBPP_PROMPT_COT.format(question=problem)
fill_kwargs = {"context": prompt, "llm": self.llm, "function_name": function_name}
if mode:
fill_kwargs["mode"] = mode
node = await ActionNode.from_pydantic(GenerateOp).fill(**fill_kwargs)
@ -33,19 +33,19 @@ class CoTSolveGraph(SolveGraph):
super().__init__(name, llm_config, dataset)
self.cot_generate = CoTGenerate(self.llm)
async def __call__(self, question: str) -> Tuple[str, str]:
response = await self.cot_generate(question, mode="context_fill")
return response["solution"]
async def __call__(self, question: str, entry_point) -> Tuple[str, str]:
solution = await self.cot_generate(question, entry_point, mode="code_fill")
return solution["solution"], self.llm.cost_manager.total_cost
if __name__ == "__main__":
async def main():
llm_config = ModelsConfig.default().get("gpt-4o-mini")
# llm_config = ModelsConfig.default().get("gpt-35-turbo-1106")
graph = CoTSolveGraph(name="CoT", llm_config=llm_config, dataset="MBPP")
file_path = "examples/ags/data/mbpp-new.jsonl"
samples = 30
file_path = "examples/ags/data/mbpp-new-new.jsonl"
samples = 86
path = "examples/ags/data/baselines/general/mbpp"
score = await mbpp_evaluation(graph, file_path, samples, path)
score = await mbpp_evaluation(graph, file_path, samples, path, test=True)
return score
import asyncio

View file

@ -0,0 +1,62 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.gsm8k import gsm8k_evaluation
from examples.ags.scripts.operator_an import GenerateOp
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import Dict, Any
# Direct ("IO") prompt template for GSM8K. {question} is substituted via str.format;
# the instruction asks the model to finish with 'Answer is <number>'.
GSM8K_PROMPT_IO = """
{question}\nGenerate an answer to this question. At the end, provide the final answer in the format "Answer is <number>", where <number> is a single number.
"""
# Output schema handed to ActionNode.from_pydantic in Generate.__call__.
# NOTE: this definition shadows the GenerateOp imported from operator_an at the top of the file.
class GenerateOp(BaseModel):
    # Single text field, empty string when nothing was filled in.
    solution: str = Field(default="", description="solution for the problem")
class Generate(Operator):
    """Operator that formats the GSM8K IO prompt and fills ``GenerateOp`` from the LLM reply."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, problem, mode: str = None):
        """Return the filled ``GenerateOp`` fields as a dict.

        ``mode`` is forwarded to ``ActionNode.fill`` only when it is truthy.
        """
        prompt = GSM8K_PROMPT_IO.format(question=problem)
        extra = {"mode": mode} if mode else {}
        node = await ActionNode.from_pydantic(GenerateOp).fill(context=prompt, llm=self.llm, **extra)
        return node.instruct_content.model_dump()
class GenerateSolveGraph(SolveGraph):
    """Single-step graph: one ``Generate`` call per problem."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        self.generate = Generate(self.llm)

    async def __call__(self, problem):
        """Return ``(fields_dict, total_cost)`` where the cost is read from the LLM's cost manager."""
        fields = await self.generate(problem, mode="context_fill")
        total_cost = self.llm.cost_manager.total_cost
        return fields, total_cost
# Script entry point: runs only when the file is executed directly.
if __name__ == "__main__":
    async def main():
        # Build the IO graph with the selected model config and await gsm8k_evaluation;
        # its (score, cost) result is returned unchanged.
        # llm_config = ModelsConfig.default().get("deepseek-coder")
        llm_config = ModelsConfig.default().get("gpt-4o-mini")
        graph = GenerateSolveGraph(name="Generate", llm_config=llm_config, dataset="Gsm8K")
        file_path = "examples/ags/data/gsm8k.jsonl"
        samples = 1219
        path = "examples/ags/data/baselines/general"
        score, cost = await gsm8k_evaluation(graph, file_path, samples, path, test=True)
        return score, cost
    import asyncio
    asyncio.run(main())
# medprompt operator; universal self consistency;
# "IO" means no tricks at all: it measures the LLM's own raw performance, using the prompt the model's publisher used on the corresponding dataset.
# deepseek-chat; gpt-4o-mini; gpt-35-turbo-1106
# med ensemble

View file

@ -0,0 +1,67 @@
import sys

# Prepend the repository root so the root-level packages imported below can be found
# when this script is run directly.
# Raw string: the previous non-raw literal depended on "\H" and "\M" being
# unrecognised escape sequences, which newer Python versions warn about.
# NOTE(review): machine-specific absolute path — consider deriving it from __file__.
sys.path = [r'H:\Hack\MetaGPT-MathAI'] + sys.path
# print(sys.path)
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.hotpotqa import hotpotqa_evaluation
from examples.ags.scripts.operator_an import GenerateOp
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import Tuple
# Direct ("IO") prompt template for HotpotQA. {question} and {context} are substituted via
# str.format; the "thought" and "answer" fields it mentions match the GenerateOp schema below.
# NOTE(review): "revelant" is a typo for "relevant"; left as-is because the literal is sent to the model.
HOTPOTQA_PROMPT = """
Given a question and a context, please answer the question.
1. In the "thought" field, explain your thinking process.
2. In the "answer" field, provide the final answer concisely and clearly. The answer should be a direct response to the question, without including explanations or reasoning.
Question: {question}
The revelant context: {context}
"""
# Output schema handed to ActionNode.from_pydantic in IOGenerate.__call__.
# NOTE: this definition shadows the GenerateOp imported from operator_an at the top of the file.
class GenerateOp(BaseModel):
    # Free-text reasoning; empty string by default.
    thought: str = Field(default="", description="The step by step thinking process")
    # Final answer text; this is the field IOSolveGraph returns to the evaluator.
    answer: str = Field(default="", description="The final answer to the question")
class IOGenerate(Operator):
    """Operator that formats the HotpotQA IO prompt and fills ``GenerateOp`` from the LLM reply."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, question: str, context: str, mode: str = None) -> dict:
        """Ask the LLM to answer ``question`` given ``context``.

        Args:
            question: Question text substituted into the prompt.
            context: Context text substituted into the prompt.
            mode: Forwarded to ``ActionNode.fill`` only when truthy.

        Returns:
            The filled ``GenerateOp`` fields as a dict (keys ``"thought"`` and ``"answer"``).
        """
        prompt = HOTPOTQA_PROMPT.format(question=question, context=context)
        fill_kwargs = {"context": prompt, "llm": self.llm}
        if mode:
            fill_kwargs["mode"] = mode
        node = await ActionNode.from_pydantic(GenerateOp).fill(**fill_kwargs)
        return node.instruct_content.model_dump()
class IOSolveGraph(SolveGraph):
    """Single-step graph: one ``IOGenerate`` call per (question, context) pair."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        self.cot_generate = IOGenerate(self.llm)

    async def __call__(self, question: str, context: str) -> Tuple[str, str]:
        """Return the ``"answer"`` field of the reply together with the LLM's accumulated cost."""
        fields = await self.cot_generate(question, context, mode="context_fill")
        final_answer = fields["answer"]
        return final_answer, self.llm.cost_manager.total_cost
# Script entry point: runs only when the file is executed directly.
if __name__ == "__main__":
    async def main():
        # Build the IO graph with the selected model config and await hotpotqa_evaluation;
        # its result is returned unchanged.
        # llm_config = ModelsConfig.default().get("deepseek-chat")
        llm_config = ModelsConfig.default().get("gpt-4o-mini")
        # llm_config = ModelsConfig.default().get("gpt-35-turbo-1106")
        graph = IOSolveGraph(name="IO", llm_config=llm_config, dataset="HotpotQA")
        file_path = "examples/ags/data/hotpotqa.jsonl" # relative path is problematic; to be fixed later
        samples = 250 # 250 for validation, 1000 for test
        path = "examples/ags/data/baselines/general/hotpotqa" # relative path is problematic; to be fixed later
        score = await hotpotqa_evaluation(graph, file_path, samples, path, test=True)
        return score
    import asyncio
    asyncio.run(main())

View file

@ -0,0 +1,54 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.humaneval import humaneval_evaluation
from examples.ags.scripts.operator_an import GenerateOp
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
# Direct ("IO") prompt template for HumanEval. {question} is substituted via str.format.
HUMANEVAL_PROMPT_IO = """
{question}\nGenerate an answer to this question, without any additional test cases.
"""
# TODO: code_fill is awkward to support here; a version adapted from the earlier code is needed before this runs end to end
# Output schema handed to ActionNode.from_pydantic in Generate.__call__.
# NOTE: this definition shadows the GenerateOp imported from operator_an at the top of the file.
class GenerateOp(BaseModel):
    # Single text field holding the generated code; empty string by default.
    solution: str = Field(default="", description="Python Solution For This Question.")
class Generate(Operator):
    """Operator that formats the HumanEval IO prompt and fills ``GenerateOp`` from the LLM reply."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, problem, function_name, mode: str = None):
        """Return the filled ``GenerateOp`` fields as a dict.

        ``function_name`` is always forwarded to ``ActionNode.fill``; ``mode`` only when truthy.
        """
        prompt = HUMANEVAL_PROMPT_IO.format(question=problem)
        extra = {"mode": mode} if mode else {}
        node = await ActionNode.from_pydantic(GenerateOp).fill(
            context=prompt, llm=self.llm, function_name=function_name, **extra
        )
        return node.instruct_content.model_dump()
class IOSolveGraph(SolveGraph):
    """Single-step graph: one ``Generate`` call per HumanEval problem."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        self.cot_generate = Generate(self.llm)

    async def __call__(self, problem, function_name):
        """Return the ``"solution"`` field of the reply together with the LLM's accumulated cost."""
        fields = await self.cot_generate(problem, function_name, mode="code_fill")
        code = fields["solution"]
        return code, self.llm.cost_manager.total_cost
# Script entry point: runs only when the file is executed directly.
if __name__ == "__main__":
    async def main():
        # Build the IO graph with the selected model config and await humaneval_evaluation;
        # its result is returned unchanged.
        # llm_config = ModelsConfig.default().get("gpt-4o-mini")
        # llm_config = ModelsConfig.default().get("gpt-35-turbo-1106")
        llm_config = ModelsConfig.default().get("deepseek-chat")
        graph = IOSolveGraph(name="Io", llm_config=llm_config, dataset="HumanEval")
        file_path = "examples/ags/data/human-eval.jsonl"
        samples = 33 # 33/131
        path = "examples/ags/data/baselines/general/humaneval"
        score = await humaneval_evaluation(graph, file_path, samples, path,test=True)
        return score
    import asyncio
    asyncio.run(main())

View file

@ -0,0 +1,54 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.math import math_evaluation
from examples.ags.scripts.operator_an import GenerateOp
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import Dict, Any
# Direct ("IO") prompt template for MATH. {question} is substituted via str.format;
# the doubled braces render as a literal "\boxed{<number>}" in the formatted prompt.
GENERATE_IO_PROMPT = """
{question}\nPlease generate a solution for the problem. At the end, provide the final answer in the format "\\boxed{{<number>}}", where <number> is a math answer(an expression or number), without any additional information or explanation.
"""
# Output schema handed to ActionNode.from_pydantic in IOGenerate.__call__.
# NOTE: this definition shadows the GenerateOp imported from operator_an at the top of the file.
class GenerateOp(BaseModel):
    # Single text field, empty string when nothing was filled in.
    solution: str = Field(default="", description="solution for the problem")
class IOGenerate(Operator):
    """Operator that formats the MATH IO prompt and fills ``GenerateOp`` from the LLM reply."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, problem, mode: str = None):
        """Return the filled ``GenerateOp`` fields as a dict.

        ``mode`` is forwarded to ``ActionNode.fill`` only when it is truthy.
        """
        prompt = GENERATE_IO_PROMPT.format(question=problem)
        extra = {"mode": mode} if mode else {}
        node = await ActionNode.from_pydantic(GenerateOp).fill(context=prompt, llm=self.llm, **extra)
        return node.instruct_content.model_dump()
class IOSolveGraph(SolveGraph):
    """Single-step graph: one ``IOGenerate`` call per MATH problem."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        self.cot_generate = IOGenerate(self.llm)

    async def __call__(self, problem):
        """Return ``(fields_dict, total_cost)``; the whole reply dict is passed on, not just one field."""
        fields = await self.cot_generate(problem, mode="context_fill")
        total_cost = self.llm.cost_manager.total_cost
        return fields, total_cost
# Script entry point: runs only when the file is executed directly.
if __name__ == "__main__":
    async def main():
        # Build the IO graph with the selected model config and await math_evaluation;
        # its result is returned unchanged.
        # llm_config = ModelsConfig.default().get("deepseek-coder")
        llm_config = ModelsConfig.default().get("gpt-4o-mini")
        # llm_config = ModelsConfig.default().get("gpt-35-turbo-1106")
        # NOTE(review): name="CoT" and dataset="Gsm8K" look copied from another script even though
        # this is the IO baseline on the MATH data file — confirm they are intended.
        graph = IOSolveGraph(name="CoT", llm_config=llm_config, dataset="Gsm8K")
        file_path = "examples/ags/data/math_test.jsonl" #486
        # samples = None
        # NOTE(review): how math_evaluation interprets samples=0 is not visible here — confirm.
        samples = 0
        path = "examples/ags/data/baselines/general/math"
        score = await math_evaluation(graph, file_path, samples, path,test=True)
        return score
    import asyncio
    asyncio.run(main())

View file

@ -0,0 +1,55 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.mbpp import mbpp_evaluation
from examples.ags.scripts.operator_an import GenerateOp
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
# Direct ("IO") prompt template for MBPP. {question} is substituted via str.format.
MBPP_PROMPT_IO = """
{question}\nGenerate an answer to this question, ensure the output code is self-contained, meaning it should have the correct function name and return statement, but without any additional test cases.
"""
# Output schema handed to ActionNode.from_pydantic in Generate.__call__.
# NOTE: this definition shadows the GenerateOp imported from operator_an at the top of the file.
class GenerateOp(BaseModel):
    # Single text field holding the generated code; empty string by default.
    solution: str = Field(default="", description="Python Solution For This Question.")
class Generate(Operator):
    """Operator that formats the MBPP IO prompt and fills ``GenerateOp`` from the LLM reply."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, problem, function_name, mode: str = None):
        """Return the filled ``GenerateOp`` fields as a dict.

        ``function_name`` is always forwarded to ``ActionNode.fill``; ``mode`` only when truthy.
        """
        prompt = MBPP_PROMPT_IO.format(question=problem)
        extra = {"mode": mode} if mode else {}
        node = await ActionNode.from_pydantic(GenerateOp).fill(
            context=prompt, llm=self.llm, function_name=function_name, **extra
        )
        return node.instruct_content.model_dump()
class IOSolveGraph(SolveGraph):
    """Single-step graph: one ``Generate`` call per MBPP problem."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        self.cot_generate = Generate(self.llm)

    async def __call__(self, problem, function_name):
        """Return the ``"solution"`` field of the reply together with the LLM's accumulated cost."""
        fields = await self.cot_generate(problem, function_name, mode="code_fill")
        code = fields["solution"]
        return code, self.llm.cost_manager.total_cost
# Script entry point: runs only when the file is executed directly.
if __name__ == "__main__":
    async def main():
        # Build the IO graph with the selected model config and await mbpp_evaluation;
        # its result is returned unchanged.
        llm_config = ModelsConfig.default().get("gpt-4o-mini")
        # llm_config = ModelsConfig.default().get("deepseek-chat")
        # llm_config = ModelsConfig.default().get("gpt-35-turbo")
        graph = IOSolveGraph(name="Io", llm_config=llm_config, dataset="MBPP")
        # Manual smoke test of a single problem, kept for reference:
        # result = await graph("Write a function to round every number of a given list of numbers and print the total sum multiplied by the length of the list.\n\ndef round_and_sum(list1):", "round_and_sum")
        # print(result)
        file_path = "examples/ags/data/mbpp-new-new.jsonl"
        samples = 86 # 86/341
        path = "examples/ags/data/baselines/general/mbpp"
        score = await mbpp_evaluation(graph, file_path, samples, path, test=True)
        return score
    import asyncio
    asyncio.run(main())

View file

@ -63,7 +63,7 @@ class MdEnsemble(Operator):
Link: https://arxiv.org/abs/2311.16452
"""
def __init__(self, name: str = "MdEnsemble", llm: LLM = LLM(), vote_count: int = 3):
def __init__(self, llm: LLM, name: str = "MdEnsemble", vote_count: int = 3):
super().__init__(name, llm)
self.vote_count = vote_count
@ -107,11 +107,11 @@ class MedPromptGraph(SolveGraph):
def __init__(self, name: str, llm_config, dataset: str, vote_count: int = 3):
super().__init__(name, llm_config, dataset)
self.cot_generate = CoTGenerate(self.llm)
self.md_ensemble = MdEnsemble(self.llm, vote_count=vote_count)
self.md_ensemble = MdEnsemble(llm=self.llm, vote_count=vote_count)
async def __call__(self, problem):
solutions = []
for i in range(2):
for i in range(3):
solution = await self.cot_generate(problem, mode="context_fill")
solutions.append(solution["solution"])
solution = await self.md_ensemble(solutions, problem, mode="context_fill")
@ -119,10 +119,10 @@ class MedPromptGraph(SolveGraph):
if __name__ == "__main__":
async def main():
llm_config = ModelsConfig.default().get("deepseek-coder")
# llm_config = ModelsConfig.default().get("gpt-4o-mini")
# llm_config = ModelsConfig.default().get("deepseek-coder")
llm_config = ModelsConfig.default().get("gpt-4o-mini")
# llm_config = ModelsConfig.default().get("gpt-35-turbo-1106")
graph = MedPromptGraph(name="MedPrompt", llm_config=llm_config, dataset="Gsm8K", vote_count=2)
graph = MedPromptGraph(name="MedPrompt", llm_config=llm_config, dataset="Gsm8K", vote_count=5)
file_path = "examples/ags/data/gsm8k.jsonl"
samples = 264
path = "examples/ags/data/baselines/general"

View file

@ -0,0 +1,124 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.humaneval import humaneval_evaluation
from examples.ags.scripts.operator_an import GenerateOp
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import Dict, Any, List, Tuple
from collections import Counter
import random
# Chain-of-thought prompt template for HumanEval candidates. {question} is substituted via str.format.
HUMANEVAL_PROMPT_GPT = """
{question}\nPlease provide a step-by-step explanation in text, followed by your Python function without any additional text or test cases.
"""
# Prompt template for one voting round of MdEnsemble. {question} and {solutions} are substituted
# via str.format; the "thought" and "solution_letter" fields it mentions match MdEnsembleOp.
MD_ENSEMBLE_PROMPT = """
Given the question described as follows: {question}
Several solutions have been generated to address the given question. They are as follows:
{solutions}
Carefully evaluate these solutions and identify the solution that is more capable of solving the problem compared to other solutions, as this is crucial for problem-solving.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the solution. Do not include any additional text or explanation in the "solution_letter" field.
"""
# Output schema handed to ActionNode.from_pydantic in CoTGenerate.__call__.
# NOTE: this definition shadows the GenerateOp imported from operator_an at the top of the file.
class GenerateOp(BaseModel):
    # Single text field holding the generated code; empty string by default.
    solution: str = Field(default="", description="Python Solution For This Question.")
class CoTGenerate(Operator):
    """Operator that formats the HumanEval CoT prompt and fills ``GenerateOp`` from the LLM reply."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, problem, function_name, mode: str = None):
        """Return the filled ``GenerateOp`` fields as a dict.

        ``function_name`` is always forwarded to ``ActionNode.fill``; ``mode`` only when truthy.
        """
        prompt = HUMANEVAL_PROMPT_GPT.format(question=problem)
        extra = {"mode": mode} if mode else {}
        node = await ActionNode.from_pydantic(GenerateOp).fill(
            context=prompt, llm=self.llm, function_name=function_name, **extra
        )
        return node.instruct_content.model_dump()
# Output schema for one MdEnsemble voting round (passed to ActionNode.from_pydantic).
class MdEnsembleOp(BaseModel):
    # Free-text justification; MdEnsemble.__call__ does not read it.
    thought: str = Field(
        default="",
        description="Step-by-step analysis of the solutions to determine the best one.",
    )
    # Letter of the chosen candidate; MdEnsemble strips and upper-cases it before lookup.
    solution_letter: str = Field(default="", description="The letter of the chosen best solution (only one letter).")
class MdEnsemble(Operator):
    """
    Paper: Can Generalist Foundation Models Outcompete Special-Purpose Tuning? Case Study in Medicine
    Link: https://arxiv.org/abs/2311.16452

    Runs ``vote_count`` rounds; each round shows the candidates in a fresh random order,
    asks the LLM for the letter of the best one, and the most-voted candidate is returned.
    """

    def __init__(self, llm: LLM, name: str = "MdEnsemble", vote_count: int = 5):
        super().__init__(name, llm)
        self.vote_count = vote_count

    @staticmethod
    def shuffle_answers(solutions: List[str]) -> Tuple[List[str], Dict[str, int]]:
        """Return a shuffled copy of ``solutions`` and a letter -> original-index mapping.

        ``list.index`` returns the first match, so identical solution texts all map to
        the same original index and therefore pool their votes.
        """
        shuffled_solutions = solutions.copy()
        random.shuffle(shuffled_solutions)
        answer_mapping = {chr(65 + i): solutions.index(solution) for i, solution in enumerate(shuffled_solutions)}
        return shuffled_solutions, answer_mapping

    async def __call__(self, solutions: List[str], problem: str, mode: str = None):
        """Vote among ``solutions`` for ``problem`` and return ``{"solution": winner}``.

        Args:
            solutions: Candidate solution texts.
            problem: Problem statement inserted into the voting prompt.
            mode: Forwarded to ``ActionNode.fill`` only when truthy.

        Returns:
            A dict with the single key ``"solution"``. If no round yields a letter that
            maps to a candidate, the first candidate is returned.
        """
        print(f"solution count: {len(solutions)}")
        all_responses = []

        for _ in range(self.vote_count):
            shuffled_solutions, answer_mapping = self.shuffle_answers(solutions)
            solution_text = "".join(
                f"{chr(65 + index)}: \n{str(solution)}\n\n\n" for index, solution in enumerate(shuffled_solutions)
            )

            prompt = MD_ENSEMBLE_PROMPT.format(solutions=solution_text, question=problem)
            fill_kwargs = {"context": prompt, "llm": self.llm}
            if mode:
                fill_kwargs["mode"] = mode
            node = await ActionNode.from_pydantic(MdEnsembleOp).fill(**fill_kwargs)
            response = node.instruct_content.model_dump()

            answer = response.get("solution_letter", "A")
            answer = answer.strip().upper()

            # Replies that are not one of the offered letters are dropped, not counted.
            if answer in answer_mapping:
                all_responses.append(answer_mapping[answer])

        if all_responses:
            most_frequent_index = Counter(all_responses).most_common(1)[0][0]
        else:
            # Every round was unusable: most_common(1) would be empty and indexing it
            # raised IndexError, so fall back to the first candidate instead.
            most_frequent_index = 0
        final_answer = solutions[most_frequent_index]
        return {"solution": final_answer}
class MedPromptGraph(SolveGraph):
    """Two-stage graph: generate three CoT candidates, then pick one with ``MdEnsemble``."""

    def __init__(self, name: str, llm_config, dataset: str, vote_count: int = 5):
        super().__init__(name, llm_config, dataset)
        self.cot_generate = CoTGenerate(self.llm)
        self.md_ensemble = MdEnsemble(self.llm, vote_count=vote_count)

    async def __call__(self, problem, function_name):
        """Return the chosen solution text together with the LLM's accumulated cost."""
        # Candidates are produced one after another; each await finishes before the next starts.
        candidates = [
            (await self.cot_generate(problem, function_name, mode="code_fill"))["solution"]
            for _ in range(3)
        ]
        chosen = await self.md_ensemble(candidates, problem, mode="context_fill")
        return chosen["solution"], self.llm.cost_manager.total_cost
# Script entry point: runs only when the file is executed directly.
if __name__ == "__main__":
    async def main():
        # Build the MedPrompt graph (5 voting rounds) and await humaneval_evaluation;
        # its (score, cost) result is returned unchanged.
        llm_config = ModelsConfig.default().get("gpt-4o-mini")
        graph = MedPromptGraph(name="MedPrompt", llm_config=llm_config, dataset="HumanEval", vote_count=5)
        file_path = "examples/ags/data/human-eval.jsonl"
        samples = 33
        path = "examples/ags/data/baselines/general/humaneval"
        score, cost = await humaneval_evaluation(graph, file_path, samples, path, test=True)
        return score, cost
    import asyncio
    asyncio.run(main())

View file

@ -0,0 +1,129 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.math import math_evaluation
from examples.ags.scripts.operator_an import GenerateOp
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import Dict, Any, List, Tuple
from collections import Counter
import random
# Chain-of-thought prompt template for MATH candidates. {question} is substituted via str.format;
# the doubled braces render as a literal "\boxed{<number>}" in the formatted prompt.
GENERATE_COT_PROMPT = """
{question}\nPlease reason step by step. At the end, provide the final answer in the format "\\boxed{{<number>}}", where <number> is a math answer(an expression or number), without any additional information or explanation.
"""
# Output schema handed to ActionNode.from_pydantic in CoTGenerate.__call__.
# NOTE: this definition shadows the GenerateOp imported from operator_an at the top of the file.
class GenerateOp(BaseModel):
    # Single text field, empty string when nothing was filled in.
    solution: str = Field(default="", description="solution for the problem")
class CoTGenerate(Operator):
    """Operator that formats the MATH CoT prompt and fills ``GenerateOp`` from the LLM reply."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, problem, mode: str = None):
        """Return the filled ``GenerateOp`` fields as a dict.

        ``mode`` is forwarded to ``ActionNode.fill`` only when it is truthy.
        """
        prompt = GENERATE_COT_PROMPT.format(question=problem)
        extra = {"mode": mode} if mode else {}
        node = await ActionNode.from_pydantic(GenerateOp).fill(context=prompt, llm=self.llm, **extra)
        return node.instruct_content.model_dump()
# Prompt template for one voting round of MdEnsemble. {question} and {solutions} are
# substituted via str.format in MdEnsemble.__call__.
MD_ENSEMBLE_PROMPT = """
You are given a problem:
{question}
Here is a list of possible solutions to the problem:
{solutions}
Using the inputs above, your goal is to choose the best solution to the problem.
The main consideration is that the solution can fully solve the problem in a correct and robust manner.
Provide your final decision by writing the chosen solution letter.
Please follow the required format in your response.
"""
# Output schema for one MdEnsemble voting round (passed to ActionNode.from_pydantic).
class MdEnsembleOp(BaseModel):
    # Free-text justification; MdEnsemble.__call__ does not read it.
    thought: str = Field(
        default="",
        description="Step-by-step analysis of the solutions to determine the best one.",
    )
    # Letter of the chosen candidate; MdEnsemble strips and upper-cases it before lookup.
    solution_letter: str = Field(default="", description="The letter of the chosen best solution (only one letter).")
class MdEnsemble(Operator):
    """
    Paper: Can Generalist Foundation Models Outcompete Special-Purpose Tuning? Case Study in Medicine
    Link: https://arxiv.org/abs/2311.16452

    Runs ``vote_count`` rounds; each round shows the candidates in a fresh random order,
    asks the LLM for the letter of the best one, and the most-voted candidate is returned.
    """

    def __init__(self, name: str = "MdEnsemble", llm: LLM = None, vote_count: int = 3):
        # A default of ``LLM()`` in the signature is evaluated once, when the class body
        # runs, so it built a client at import time and shared it between instances.
        # Build it lazily instead; passing ``llm`` explicitly behaves as before.
        super().__init__(name, llm if llm is not None else LLM())
        self.vote_count = vote_count

    @staticmethod
    def shuffle_answers(solutions: List[str]) -> Tuple[List[str], Dict[str, int]]:
        """Return a shuffled copy of ``solutions`` and a letter -> original-index mapping.

        ``list.index`` returns the first match, so identical solution texts all map to
        the same original index and therefore pool their votes.
        """
        shuffled_solutions = solutions.copy()
        random.shuffle(shuffled_solutions)
        answer_mapping = {chr(65 + i): solutions.index(solution) for i, solution in enumerate(shuffled_solutions)}
        return shuffled_solutions, answer_mapping

    async def __call__(self, solutions: List[str], problem: str, mode: str = None):
        """Vote among ``solutions`` for ``problem`` and return ``{"solution": winner}``.

        Args:
            solutions: Candidate solution texts.
            problem: Problem statement inserted into the voting prompt.
            mode: Forwarded to ``ActionNode.fill`` only when truthy.

        Returns:
            A dict with the single key ``"solution"``. If no round yields a letter that
            maps to a candidate, the first candidate is returned.
        """
        print(f"solution count: {len(solutions)}")
        all_responses = []

        for _ in range(self.vote_count):
            shuffled_solutions, answer_mapping = self.shuffle_answers(solutions)
            solution_text = "".join(
                f"{chr(65 + index)}: \n{str(solution)}\n\n\n" for index, solution in enumerate(shuffled_solutions)
            )

            prompt = MD_ENSEMBLE_PROMPT.format(solutions=solution_text, question=problem)
            fill_kwargs = {"context": prompt, "llm": self.llm}
            if mode:
                fill_kwargs["mode"] = mode
            node = await ActionNode.from_pydantic(MdEnsembleOp).fill(**fill_kwargs)
            response = node.instruct_content.model_dump()

            answer = response.get("solution_letter", "")
            answer = answer.strip().upper()

            # Replies that are not one of the offered letters are dropped, not counted.
            if answer in answer_mapping:
                all_responses.append(answer_mapping[answer])

        if all_responses:
            most_frequent_index = Counter(all_responses).most_common(1)[0][0]
        else:
            # Every round was unusable: most_common(1) would be empty and indexing it
            # raised IndexError, so fall back to the first candidate instead.
            most_frequent_index = 0
        final_answer = solutions[most_frequent_index]
        return {"solution": final_answer}
class MedPromptGraph(SolveGraph):
    """Two-stage graph: generate two CoT candidates, then pick one with ``MdEnsemble``."""

    def __init__(self, name: str, llm_config, dataset: str, vote_count: int = 3):
        super().__init__(name, llm_config, dataset)
        self.cot_generate = CoTGenerate(llm=self.llm)
        self.md_ensemble = MdEnsemble(llm=self.llm, vote_count=vote_count)

    async def __call__(self, problem):
        """Return ``(ensemble_result_dict, total_cost)``; the dict itself is passed on, not its text."""
        # Candidates are produced one after another; each await finishes before the next starts.
        candidates = [
            (await self.cot_generate(problem, mode="context_fill"))["solution"]
            for _ in range(2)
        ]
        chosen = await self.md_ensemble(candidates, problem, mode="context_fill")
        return chosen, self.llm.cost_manager.total_cost
# Script entry point: runs only when the file is executed directly.
if __name__ == "__main__":
    async def main():
        # Build the MedPrompt graph (2 voting rounds) and await math_evaluation;
        # its result is returned unchanged.
        llm_config = ModelsConfig.default().get("gpt-4o-mini")
        # NOTE(review): dataset="Gsm8K" looks copied from the GSM8K script even though the
        # data file below is the MATH test set — confirm it is intended.
        graph = MedPromptGraph(name="MedPrompt", llm_config=llm_config, dataset="Gsm8K", vote_count=2)
        file_path = "examples/ags/data/math_test.jsonl"
        # samples = None
        # NOTE(review): how math_evaluation interprets samples=0 is not visible here — confirm.
        samples = 0
        path = "examples/ags/data/baselines/general/math"
        score = await math_evaluation(graph, file_path, samples, path,test=True)
        return score
    import asyncio
    asyncio.run(main())

View file

@ -0,0 +1,127 @@
from examples.ags.benchmark.mbpp import mbpp_evaluation
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.scripts.operator_an import GenerateOp
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import Dict, Any, List, Tuple
from collections import Counter
import random
# Chain-of-thought prompt template for MBPP candidates. {question} is substituted via str.format.
MBPP_PROMPT_COT = """
{question}\nPlease provide a step-by-step explanation in text, followed by your Python function, ensure the output code is self-contained, meaning it should have the correct function name and return statement, without any additional text."""
# Prompt template for one voting round of MdEnsemble. {question} and {solutions} are substituted
# via str.format; the "thought" and "solution_letter" fields it mentions match MdEnsembleOp.
MD_ENSEMBLE_PROMPT = """
Given the question described as follows: {question}
Several solutions have been generated to address the given question. They are as follows:
{solutions}
Carefully evaluate these solutions and identify the solution that is more capable of solving the problem compared to other solutions, as this is crucial for problem-solving.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the solution. Do not include any additional text or explanation in the "solution_letter" field.
"""
# Output schema handed to ActionNode.from_pydantic in CoTGenerate.__call__.
# NOTE: this definition shadows the GenerateOp imported from operator_an at the top of the file.
class GenerateOp(BaseModel):
    # Single text field holding the generated code; empty string by default.
    solution: str = Field(default="", description="Python Solution For This Question.")
class CoTGenerate(Operator):
    """Operator that formats the MBPP CoT prompt and fills ``GenerateOp`` from the LLM reply."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, problem, function_name, mode: str = None):
        """Return the filled ``GenerateOp`` fields as a dict.

        ``function_name`` is always forwarded to ``ActionNode.fill``; ``mode`` only when truthy.
        """
        prompt = MBPP_PROMPT_COT.format(question=problem)
        extra = {"mode": mode} if mode else {}
        node = await ActionNode.from_pydantic(GenerateOp).fill(
            context=prompt, llm=self.llm, function_name=function_name, **extra
        )
        return node.instruct_content.model_dump()
# Output schema for one MdEnsemble voting round (passed to ActionNode.from_pydantic).
class MdEnsembleOp(BaseModel):
    # Free-text justification; MdEnsemble.__call__ does not read it.
    thought: str = Field(
        default="",
        description="Step-by-step analysis of the solutions to determine the best one.",
    )
    # Letter of the chosen candidate; MdEnsemble strips and upper-cases it before lookup.
    solution_letter: str = Field(default="", description="The letter of the chosen best solution (only one letter).")
class MdEnsemble(Operator):
    """
    Paper: Can Generalist Foundation Models Outcompete Special-Purpose Tuning? Case Study in Medicine
    Link: https://arxiv.org/abs/2311.16452

    Runs ``vote_count`` rounds; each round shows the candidates in a fresh random order,
    asks the LLM for the letter of the best one, and the most-voted candidate is returned.
    """

    def __init__(self, llm: LLM, name: str = "MdEnsemble", vote_count: int = 5):
        super().__init__(name, llm)
        self.vote_count = vote_count

    @staticmethod
    def shuffle_answers(solutions: List[str]) -> Tuple[List[str], Dict[str, int]]:
        """Return a shuffled copy of ``solutions`` and a letter -> original-index mapping.

        ``list.index`` returns the first match, so identical solution texts all map to
        the same original index and therefore pool their votes.
        """
        shuffled_solutions = solutions.copy()
        random.shuffle(shuffled_solutions)
        answer_mapping = {chr(65 + i): solutions.index(solution) for i, solution in enumerate(shuffled_solutions)}
        return shuffled_solutions, answer_mapping

    async def __call__(self, solutions: List[str], problem: str, mode: str = None):
        """Vote among ``solutions`` for ``problem`` and return ``{"solution": winner}``.

        Args:
            solutions: Candidate solution texts.
            problem: Problem statement inserted into the voting prompt.
            mode: Forwarded to ``ActionNode.fill`` only when truthy.

        Returns:
            A dict with the single key ``"solution"``. If no round yields a letter that
            maps to a candidate, the first candidate is returned.
        """
        print(f"solution count: {len(solutions)}")
        all_responses = []

        for _ in range(self.vote_count):
            shuffled_solutions, answer_mapping = self.shuffle_answers(solutions)
            solution_text = "".join(
                f"{chr(65 + index)}: \n{str(solution)}\n\n\n" for index, solution in enumerate(shuffled_solutions)
            )

            prompt = MD_ENSEMBLE_PROMPT.format(solutions=solution_text, question=problem)
            fill_kwargs = {"context": prompt, "llm": self.llm}
            if mode:
                fill_kwargs["mode"] = mode
            node = await ActionNode.from_pydantic(MdEnsembleOp).fill(**fill_kwargs)
            response = node.instruct_content.model_dump()

            answer = response.get("solution_letter", "A")
            answer = answer.strip().upper()

            # Replies that are not one of the offered letters are dropped, not counted.
            if answer in answer_mapping:
                all_responses.append(answer_mapping[answer])

        if all_responses:
            most_frequent_index = Counter(all_responses).most_common(1)[0][0]
        else:
            # Every round was unusable: most_common(1) would be empty and indexing it
            # raised IndexError, so fall back to the first candidate instead.
            most_frequent_index = 0
        final_answer = solutions[most_frequent_index]
        return {"solution": final_answer}
class MedPromptGraph(SolveGraph):
    """Graph that generates three candidate solutions and picks one via ``MdEnsemble``."""

    def __init__(self, name: str, llm_config, dataset: str, vote_count: int = 5):
        super().__init__(name, llm_config, dataset)
        self.cot_generate = CoTGenerate(self.llm)
        self.md_ensemble = MdEnsemble(self.llm, vote_count=vote_count)

    async def __call__(self, problem, function_name):
        """Return ``(chosen solution, total LLM cost so far)`` for ``problem``."""
        candidates = []
        for _ in range(3):
            generated = await self.cot_generate(problem, function_name, mode="code_fill")
            candidates.append(generated["solution"])

        chosen = await self.md_ensemble(candidates, problem, mode="context_fill")
        total_cost = self.llm.cost_manager.total_cost
        return chosen["solution"], total_cost
if __name__ == "__main__":
    import asyncio

    async def main():
        """Run the MedPrompt baseline through ``mbpp_evaluation`` and return its result."""
        # Alternative model keys previously used here: "deepseek-chat", "gpt-35-turbo".
        model_config = ModelsConfig.default().get("gpt-4o-mini")
        med_prompt_graph = MedPromptGraph(name="MedPrompt", llm_config=model_config, dataset="MBPP")
        data_file = "examples/ags/data/mbpp-new-new.jsonl"
        sample_count = 86  # 86/341
        result_dir = "examples/ags/data/baselines/general/mbpp"
        return await mbpp_evaluation(med_prompt_graph, data_file, sample_count, result_dir, test=True)

    asyncio.run(main())

View file

@ -22,7 +22,7 @@ FINAL_DECISION_PROMPT = """
Considering all the thinking processes and answers:
{all_thinking}
{all_answers}
Please reason carefully and provide the final answer. To ensure accuracy, only provide the answer in the solution, without any steps.
Please reason carefully and provide the final answer. To ensure accuracy, At the end, provide the final answer in solution field with the format "Answer is <number>", where <number> is a single number, without any additional information or explanation.
"""
class DebateOp(BaseModel):
@ -73,10 +73,9 @@ class MultiPersonaGraph(SolveGraph):
self.debate_agents = [
DebateAgent(self.llm, f"Debate Agent {i}", role)
for i, role in enumerate([
'Math Competition Champion',
'Elementary School Math Teacher',
'Math Professor',
'Computer Scientist'
'Innovative Math Thinker - Math PhD',
'Critical Reasoning Expert - Math Professor',
'Computational Thinking Specialist - Math And Computer Science Researcher'
])
]
self.final_decision_agent = FinalDecisionAgent(self.llm)
@ -107,12 +106,12 @@ class MultiPersonaGraph(SolveGraph):
if __name__ == "__main__":
async def main():
llm_config = ModelsConfig.default().get("deepseek-coder")
llm_config = ModelsConfig.default().get("gpt-4o-mini")
graph = MultiPersonaGraph(name="multi-persona", llm_config=llm_config, dataset="Gsm8K")
file_path = "examples/ags/data/gsm8k.jsonl"
samples = 1
samples = 264
path = "examples/ags/data/baselines/general"
score, cost = await gsm8k_evaluation(graph, file_path, samples, path)
score, cost = await gsm8k_evaluation(graph, file_path, samples, path, test=True)
return score, cost
import asyncio

View file

@ -0,0 +1,133 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.hotpotqa import hotpotqa_evaluation
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import List
# First-round debate prompt; filled with `{question}` and `{relevant_context}`.
DEBATE_INITIAL_PROMPT = """
Given a question and context, please think step by step and then solve this task.
Question: {question}
Context: {relevant_context}
"""
# Later-round debate prompt; the other agents' thinking is appended after this text by the caller.
DEBATE_PROMPT = """
Given a question and context,
Question: {question}
Context: {relevant_context}
Considering the solutions provided by other agents as additional suggestions. Please think carefully and provide an updated answer.
"""
# Final-decision prompt; `{all_thinking}` and `{all_answers}` are newline-joined agent outputs.
FINAL_DECISION_PROMPT = """
Given a question and context,
Question: {question}
Context: {relevant_context}
Considering all the thinking processes and answers:
{all_thinking}
{all_answers}
Please reason carefully and provide the final answer. Give the final answer in solution field. You MUST Keep the answer very concise in a few words, without any additional information.
"""
class DebateOp(BaseModel):
    # Output schema for one debate turn: the agent's reasoning and its answer.
    thinking: str = Field(
        default="",
        description="thinking process",
    )
    answer: str = Field(
        default="",
        description="answer",
    )
class FinalDecisionOp(BaseModel):
    # Output schema for the final decision: closing reasoning and the final answer.
    thinking: str = Field(
        default="",
        description="final thinking process",
    )
    solution: str = Field(
        default="",
        description="final answer",
    )
class DebateAgent(Operator):
    """Debate participant that answers under a fixed role description."""

    def __init__(self, llm: LLM, name: str, role: str):
        super().__init__(name, llm)
        self.role = role

    async def __call__(self, question: str, relevant_context: str, context: List[str] = None, mode: str = None):
        """Return the filled ``DebateOp`` fields as a dict.

        When ``context`` is None the initial prompt is used; otherwise the
        debate prompt is used and the ``context`` lines are appended,
        newline-joined. ``mode`` is forwarded to ``fill`` only when truthy.
        """
        persona = f"You are a {self.role}. Based on your professional knowledge and thinking style,"
        if context is not None:
            debate_text = DEBATE_PROMPT.format(question=question, relevant_context=relevant_context)
            prompt = persona + debate_text + "\n".join(context)
        else:
            prompt = persona + DEBATE_INITIAL_PROMPT.format(question=question, relevant_context=relevant_context)

        fill_args = dict(context=prompt, llm=self.llm)
        if mode:
            fill_args["mode"] = mode
        filled = await ActionNode.from_pydantic(DebateOp).fill(**fill_args)
        return filled.instruct_content.model_dump()
class FinalDecisionAgent(Operator):
    """Operator that merges all agents' thinking and answers into one final answer."""

    def __init__(self, llm: LLM, name: str = "FinalDecision"):
        super().__init__(name, llm)

    async def __call__(self, question: str, relevant_context: str, all_thinking: List[str], all_answers: List[str], mode: str = None):
        """Return the filled ``FinalDecisionOp`` fields as a dict.

        ``all_thinking`` and ``all_answers`` are newline-joined into the prompt;
        ``mode`` is forwarded to ``fill`` only when truthy.
        """
        joined_thinking = "\n".join(all_thinking)
        joined_answers = "\n".join(all_answers)
        prompt = FINAL_DECISION_PROMPT.format(
            question=question,
            relevant_context=relevant_context,
            all_thinking=joined_thinking,
            all_answers=joined_answers,
        )
        fill_args = dict(context=prompt, llm=self.llm)
        if mode:
            fill_args["mode"] = mode
        filled = await ActionNode.from_pydantic(FinalDecisionOp).fill(**fill_args)
        return filled.instruct_content.model_dump()
class MultiPersonaGraph(SolveGraph):
    """Two-round debate between three role-playing agents, followed by a final decision."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        roles = [
            'Comprehensive Knowledge Maven - Information Scientist',
            'Analytical Insight Specialist - Cognitive Psychologist',
            'Fact Verification Expert - Data Analyst',
        ]
        self.debate_agents = [
            DebateAgent(self.llm, f"Debate Agent {i}", role) for i, role in enumerate(roles)
        ]
        self.final_decision_agent = FinalDecisionAgent(self.llm)

    async def __call__(self, question, relevant_context):
        """Return ``(final solution, total LLM cost so far)`` for ``question``."""
        max_round = 2
        thinking_rounds = []
        answer_rounds = []

        for round_no in range(max_round):
            round_thinking = []
            round_answers = []
            # Register the round's lists up front; they are filled in place below.
            thinking_rounds.append(round_thinking)
            answer_rounds.append(round_answers)

            for idx, agent in enumerate(self.debate_agents):
                if round_no == 0:
                    result = await agent(question, relevant_context, mode="context_fill")
                else:
                    previous = thinking_rounds[round_no - 1]
                    # The agent's own previous thinking first, then everyone else's.
                    context = [f"{agent.role}'s previous round thinking: {previous[idx]}"]
                    context += [
                        f"{other.role}'s thinking: {previous[j]}"
                        for j, other in enumerate(self.debate_agents)
                        if j != idx
                    ]
                    result = await agent(question, relevant_context, context, mode="context_fill")
                round_thinking.append(result["thinking"])
                round_answers.append(result["answer"])

        final_thinking = [
            f"{agent.role}'s final thinking: {thinking}"
            for agent, thinking in zip(self.debate_agents, thinking_rounds[-1])
        ]
        final_answers = [
            f"{agent.role}'s final answer: {answer}"
            for agent, answer in zip(self.debate_agents, answer_rounds[-1])
        ]
        final_result = await self.final_decision_agent(
            question,
            relevant_context,
            final_thinking,
            final_answers,
            mode="context_fill",
        )
        return final_result["solution"], self.llm.cost_manager.total_cost
if __name__ == "__main__":
    import asyncio

    async def main():
        """Run the multi-persona baseline through ``hotpotqa_evaluation`` and return its result."""
        model_config = ModelsConfig.default().get("gpt-4o-mini")
        persona_graph = MultiPersonaGraph(name="multi-persona", llm_config=model_config, dataset="HotpotQA")
        # The relative path is problematic; to be fixed later.
        data_file = "examples/ags/data/hotpotqa.jsonl"
        sample_count = 250  # 250 for validation, 1000 for test
        # The relative path is problematic; to be fixed later.
        result_dir = "examples/ags/data/baselines/general/hotpotqa"
        return await hotpotqa_evaluation(persona_graph, data_file, sample_count, result_dir, test=True)

    asyncio.run(main())

View file

@ -0,0 +1,117 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.humaneval import humaneval_evaluation
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import List
# First-round debate prompt; `{question}` is filled with the problem text.
DEBATE_INITIAL_PROMPT = """
{question}\nPlease provide a step-by-step explanation in text, followed by your Python function without any additional text or test cases.
"""
# Later-round debate prompt; the other agents' thinking is appended after this text by the caller.
DEBATE_PROMPT = """
{question}
Considering the solutions provided by other agents as additional suggestions. Please think carefully and provide an updated python function without any additional text or test cases.
"""
# Final-decision prompt; `{all_thinking}` and `{all_answers}` are newline-joined agent outputs.
FINAL_DECISION_PROMPT = """
{question}
Considering all the thinking processes and answers:
{all_thinking}
{all_answers}
Please reason carefully and provide the final answer. Make sure the code output is wrapped with ```python``` without any additional text or test cases.
"""
class DebateOp(BaseModel):
    # Output schema for one debate turn: the agent's reasoning and its answer.
    thinking: str = Field(
        default="",
        description="think",
    )
    answer: str = Field(
        default="",
        description="answer",
    )
class FinalDecisionOp(BaseModel):
    # Output schema for the final decision: only the final answer text.
    solution: str = Field(
        default="",
        description="final answer",
    )
class DebateAgent(Operator):
    """Debate participant that proposes code under a fixed role description."""

    def __init__(self, llm: LLM, name: str, role: str):
        super().__init__(name, llm)
        self.role = role

    async def __call__(self, problem: str, function_name: str, context: List[str] = None, mode: str = None):
        """Return the filled ``DebateOp`` fields as a dict.

        When ``context`` is None the initial prompt is used; otherwise the
        debate prompt is used and the ``context`` lines are appended,
        newline-joined. ``function_name`` is always forwarded to ``fill``;
        ``mode`` only when truthy.
        """
        persona = f"You are a {self.role}. Based on your professional knowledge and thinking style,"
        if context is not None:
            prompt = persona + DEBATE_PROMPT.format(question=problem) + "\n".join(context)
        else:
            prompt = persona + DEBATE_INITIAL_PROMPT.format(question=problem)

        fill_args = dict(context=prompt, llm=self.llm, function_name=function_name)
        if mode:
            fill_args["mode"] = mode
        filled = await ActionNode.from_pydantic(DebateOp).fill(**fill_args)
        return filled.instruct_content.model_dump()
class FinalDecisionAgent(Operator):
    """Operator that merges all agents' thinking and answers into one final solution."""

    def __init__(self, llm: LLM, name: str = "FinalDecision"):
        super().__init__(name, llm)

    async def __call__(self, problem: str, function_name, all_thinking: List[str], all_answers: List[str], mode: str = None):
        """Return the filled ``FinalDecisionOp`` fields as a dict.

        ``all_thinking`` and ``all_answers`` are newline-joined into the prompt.
        ``function_name`` is always forwarded to ``fill``; ``mode`` only when truthy.
        """
        joined_thinking = "\n".join(all_thinking)
        joined_answers = "\n".join(all_answers)
        prompt = FINAL_DECISION_PROMPT.format(
            question=problem,
            all_thinking=joined_thinking,
            all_answers=joined_answers,
        )
        fill_args = dict(context=prompt, llm=self.llm, function_name=function_name)
        if mode:
            fill_args["mode"] = mode
        filled = await ActionNode.from_pydantic(FinalDecisionOp).fill(**fill_args)
        return filled.instruct_content.model_dump()
class MultiPersonaGraph(SolveGraph):
    """Two-round debate between three role-playing agents, followed by a final decision."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        self.debate_agents = [
            DebateAgent(self.llm, f"Debate Agent {i}", role)
            for i, role in enumerate([
                'Innovative CS Thinker - ICPC Competitor',
                'Critical Reasoning Expert - Math Professor',
                'Computational Thinking Specialist - Computer Science Researcher'
            ])
        ]
        self.final_decision_agent = FinalDecisionAgent(self.llm)

    async def __call__(self, problem, function_name):
        """Return ``(final solution, total LLM cost so far)`` for ``problem``.

        Round 0 asks each agent independently. Round 1 shows each agent its own
        previous thinking plus every other agent's thinking. The last round's
        outputs are then passed to the final decision agent.
        """
        max_round = 2
        all_thinking = [[] for _ in range(max_round)]
        all_answers = [[] for _ in range(max_round)]

        for r in range(max_round):
            for i, agent in enumerate(self.debate_agents):
                if r == 0:
                    result = await agent(problem, function_name, mode="context_fill")
                else:
                    previous = all_thinking[r - 1]
                    context = [f"{agent.role}'s previous round thinking: {previous[i]}"] + [
                        f"{other.role}'s thinking: {previous[j]}"
                        for j, other in enumerate(self.debate_agents)
                        if j != i
                    ]
                    # function_name must be passed explicitly. Without it the context
                    # list binds to the agent's `function_name` parameter, `context`
                    # stays None, and the agent reuses the initial prompt.
                    result = await agent(problem, function_name, context, mode="context_fill")
                all_thinking[r].append(result["thinking"])
                all_answers[r].append(result["answer"])

        final_result = await self.final_decision_agent(
            problem,
            function_name,
            [f"{agent.role}'s final thinking: {thinking}" for agent, thinking in zip(self.debate_agents, all_thinking[-1])],
            [f"{agent.role}'s final answer: {answer}" for agent, answer in zip(self.debate_agents, all_answers[-1])],
            mode="code_fill"
        )
        return final_result['solution'], self.llm.cost_manager.total_cost
if __name__ == "__main__":
    import asyncio

    async def main():
        """Run the multi-persona baseline through ``humaneval_evaluation``; return ``(score, cost)``."""
        model_config = ModelsConfig.default().get("gpt-4o-mini")
        persona_graph = MultiPersonaGraph(name="multi-persona", llm_config=model_config, dataset="HumanEval")
        data_file = "examples/ags/data/human-eval.jsonl"
        sample_count = 33
        result_dir = "examples/ags/data/baselines/general/humaneval"
        score, cost = await humaneval_evaluation(persona_graph, data_file, sample_count, result_dir, test=True)
        return score, cost

    asyncio.run(main())

View file

@ -0,0 +1,122 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.math import math_evaluation
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import List
# First-round debate prompt; `{question}` is filled with the problem text.
# `{{` / `}}` are escaped braces so that `.format` emits a literal `\boxed{<number>}`.
DEBATE_INITIAL_PROMPT = """
{question}\nPlease reason step by step, the reason process can be put in the thinking field. At the end, provide the final answer in the answer field with the format "\\boxed{{<number>}}", where <number> is a math answer(an expression or number), without any additional information or explanation.
Make sure the output is wrapped with correct xml tags!
"""
# Later-round debate prompt; the other agents' thinking is appended after this text by the caller.
DEBATE_PROMPT = """
{question}
Considering the solutions provided by other agents as additional suggestions, the reason process can be put in the thinking field. Please think carefully and provide an updated answer in the answer field with the format "\\boxed{{<number>}}", where <number> is a math answer(an expression or number), without any additional information or explanation.
Make sure the output is wrapped with correct xml tags!
"""
# Final-decision prompt; `{all_thinking}` and `{all_answers}` are newline-joined agent outputs.
# The final-decision output schema in this file exposes `thinking` and `solution`
# fields, so the prompt must name the `solution` field, not a nonexistent `answer` field.
FINAL_DECISION_PROMPT = """
{question}
Considering all the thinking processes and answers:
{all_thinking}
{all_answers}
The thinking process can be put in the thinking field.
Please reason carefully and provide the final answer in the solution field with the format "\\boxed{{<number>}}", where <number> is a math answer(an expression or number), without any additional information or explanation.
Make sure the output is wrapped with correct xml tags!
"""
class DebateOp(BaseModel):
    # Output schema for one debate turn: the agent's reasoning and its answer.
    thinking: str = Field(
        default="",
        description="thinking process",
    )
    answer: str = Field(
        default="",
        description="answer",
    )
class FinalDecisionOp(BaseModel):
    # Output schema for the final decision: closing reasoning and the final answer.
    thinking: str = Field(
        default="",
        description="final thinking process",
    )
    solution: str = Field(
        default="",
        description="final answer",
    )
class DebateAgent(Operator):
    """Debate participant that answers under a fixed role description."""

    def __init__(self, llm: LLM, name: str, role: str):
        super().__init__(name, llm)
        self.role = role

    async def __call__(self, problem: str, context: List[str] = None, mode: str = None):
        """Return the filled ``DebateOp`` fields as a dict.

        When ``context`` is None the initial prompt is used; otherwise the
        debate prompt is used and the ``context`` lines are appended,
        newline-joined. ``mode`` is forwarded to ``fill`` only when truthy.
        """
        persona = f"You are a {self.role}. Based on your professional knowledge and thinking style,"
        if context is not None:
            prompt = persona + DEBATE_PROMPT.format(question=problem) + "\n".join(context)
        else:
            prompt = persona + DEBATE_INITIAL_PROMPT.format(question=problem)

        fill_args = dict(context=prompt, llm=self.llm)
        if mode:
            fill_args["mode"] = mode
        filled = await ActionNode.from_pydantic(DebateOp).fill(**fill_args)
        return filled.instruct_content.model_dump()
class FinalDecisionAgent(Operator):
    """Operator that merges all agents' thinking and answers into one final answer."""

    def __init__(self, llm: LLM, name: str = "FinalDecision"):
        super().__init__(name, llm)

    async def __call__(self, problem: str, all_thinking: List[str], all_answers: List[str], mode: str = None):
        """Return the filled ``FinalDecisionOp`` fields as a dict.

        ``all_thinking`` and ``all_answers`` are newline-joined into the prompt;
        ``mode`` is forwarded to ``fill`` only when truthy.
        """
        joined_thinking = "\n".join(all_thinking)
        joined_answers = "\n".join(all_answers)
        prompt = FINAL_DECISION_PROMPT.format(
            question=problem,
            all_thinking=joined_thinking,
            all_answers=joined_answers,
        )
        fill_args = dict(context=prompt, llm=self.llm)
        if mode:
            fill_args["mode"] = mode
        filled = await ActionNode.from_pydantic(FinalDecisionOp).fill(**fill_args)
        return filled.instruct_content.model_dump()
class MultiPersonaGraph(SolveGraph):
    """Two-round debate between three role-playing agents, followed by a final decision."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        roles = [
            'Innovative Math Thinker - Math PhD',
            'Critical Reasoning Expert - Math Professor',
            'Computational Thinking Specialist - Math And Computer Science Researcher',
        ]
        self.debate_agents = [
            DebateAgent(self.llm, f"Debate Agent {i}", role) for i, role in enumerate(roles)
        ]
        self.final_decision_agent = FinalDecisionAgent(self.llm)

    async def __call__(self, problem):
        """Return ``(final decision dict, total LLM cost so far)`` for ``problem``."""
        max_round = 2
        thinking_rounds = []
        answer_rounds = []

        for round_no in range(max_round):
            round_thinking = []
            round_answers = []
            # Register the round's lists up front; they are filled in place below.
            thinking_rounds.append(round_thinking)
            answer_rounds.append(round_answers)

            for idx, agent in enumerate(self.debate_agents):
                if round_no == 0:
                    result = await agent(problem, mode="context_fill")
                else:
                    previous = thinking_rounds[round_no - 1]
                    # The agent's own previous thinking first, then everyone else's.
                    context = [f"{agent.role}'s previous round thinking: {previous[idx]}"]
                    context += [
                        f"{other.role}'s thinking: {previous[j]}"
                        for j, other in enumerate(self.debate_agents)
                        if j != idx
                    ]
                    result = await agent(problem, context, mode="context_fill")
                round_thinking.append(result["thinking"])
                round_answers.append(result["answer"])

        final_thinking = [
            f"{agent.role}'s final thinking: {thinking}"
            for agent, thinking in zip(self.debate_agents, thinking_rounds[-1])
        ]
        final_answers = [
            f"{agent.role}'s final answer: {answer}"
            for agent, answer in zip(self.debate_agents, answer_rounds[-1])
        ]
        final_result = await self.final_decision_agent(
            problem,
            final_thinking,
            final_answers,
            mode="context_fill",
        )
        # NOTE(review): the whole dict is returned here rather than only its
        # "solution" entry -- confirm that math_evaluation expects a dict.
        return final_result, self.llm.cost_manager.total_cost
if __name__ == "__main__":
    import asyncio

    async def main():
        """Run the multi-persona baseline through ``math_evaluation`` and return its result."""
        model_config = ModelsConfig.default().get("gpt-4o-mini")
        persona_graph = MultiPersonaGraph(name="multi-persona", llm_config=model_config, dataset="MATH")
        data_file = "examples/ags/data/math_test.jsonl"
        sample_count = 0
        result_dir = "examples/ags/data/baselines/general/math"
        return await math_evaluation(persona_graph, data_file, sample_count, result_dir, test=True)

    asyncio.run(main())

View file

@ -0,0 +1,118 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.mbpp import mbpp_evaluation
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import List
# First-round debate prompt; `{question}` is filled with the problem text.
DEBATE_INITIAL_PROMPT = """
{question}\nPlease provide a step-by-step explanation in text, followed by your Python function, ensure the output code is self-contained, meaning it should have the correct function name and return statement, without any additional text."""
# Later-round debate prompt; the other agents' thinking is appended after this text by the caller.
DEBATE_PROMPT = """
{question}
Considering the solutions provided by other agents as additional suggestions. Please think carefully and provide an updated self-contained python function which meaning it should have the correct function name and return statement, but it shouldn't have any additional text or test cases.
"""
# Final-decision prompt; `{all_thinking}` and `{all_answers}` are newline-joined agent outputs.
FINAL_DECISION_PROMPT = """
{question}
Considering all the thinking processes and answers:
{all_thinking}
{all_answers}
Please reason carefully and provide the final answer. Make sure the output code is self-contained, meaning it should have the correct function name and return statement, without any additional text."""
class DebateOp(BaseModel):
    # Output schema for one debate turn: the agent's reasoning and its answer.
    thinking: str = Field(
        default="",
        description="think",
    )
    answer: str = Field(
        default="",
        description="answer",
    )
class FinalDecisionOp(BaseModel):
    # Output schema for the final decision: only the final answer text.
    solution: str = Field(
        default="",
        description="final answer",
    )
class DebateAgent(Operator):
    """Debate participant that proposes code under a fixed role description."""

    def __init__(self, llm: LLM, name: str, role: str):
        super().__init__(name, llm)
        self.role = role

    async def __call__(self, problem: str, function_name: str, context: List[str] = None, mode: str = None):
        """Return the filled ``DebateOp`` fields as a dict.

        When ``context`` is None the initial prompt is used; otherwise the
        debate prompt is used and the ``context`` lines are appended,
        newline-joined. ``function_name`` is always forwarded to ``fill``;
        ``mode`` only when truthy.
        """
        persona = f"You are a {self.role}. Based on your professional knowledge and thinking style,"
        if context is not None:
            prompt = persona + DEBATE_PROMPT.format(question=problem) + "\n".join(context)
        else:
            prompt = persona + DEBATE_INITIAL_PROMPT.format(question=problem)

        fill_args = dict(context=prompt, llm=self.llm, function_name=function_name)
        if mode:
            fill_args["mode"] = mode
        filled = await ActionNode.from_pydantic(DebateOp).fill(**fill_args)
        return filled.instruct_content.model_dump()
class FinalDecisionAgent(Operator):
    """Operator that merges all agents' thinking and answers into one final solution."""

    def __init__(self, llm: LLM, name: str = "FinalDecision"):
        super().__init__(name, llm)

    async def __call__(self, problem: str, function_name, all_thinking: List[str], all_answers: List[str], mode: str = None):
        """Return the filled ``FinalDecisionOp`` fields as a dict.

        ``all_thinking`` and ``all_answers`` are newline-joined into the prompt.
        ``function_name`` is always forwarded to ``fill``; ``mode`` only when truthy.
        """
        joined_thinking = "\n".join(all_thinking)
        joined_answers = "\n".join(all_answers)
        prompt = FINAL_DECISION_PROMPT.format(
            question=problem,
            all_thinking=joined_thinking,
            all_answers=joined_answers,
        )
        fill_args = dict(context=prompt, llm=self.llm, function_name=function_name)
        if mode:
            fill_args["mode"] = mode
        filled = await ActionNode.from_pydantic(FinalDecisionOp).fill(**fill_args)
        return filled.instruct_content.model_dump()
class MultiPersonaGraph(SolveGraph):
    """Two-round debate between three role-playing agents, followed by a final decision."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        self.debate_agents = [
            DebateAgent(self.llm, f"Debate Agent {i}", role)
            for i, role in enumerate([
                'Innovative CS Thinker - ICPC Competitor',
                'Critical Reasoning Expert - Math Professor',
                'Computational Thinking Specialist - Computer Science Researcher'
            ])
        ]
        self.final_decision_agent = FinalDecisionAgent(self.llm)

    async def __call__(self, problem, function_name):
        """Return ``(final solution, total LLM cost so far)`` for ``problem``.

        Round 0 asks each agent independently. Round 1 shows each agent its own
        previous thinking plus every other agent's thinking. The last round's
        outputs are then passed to the final decision agent.
        """
        max_round = 2
        all_thinking = [[] for _ in range(max_round)]
        all_answers = [[] for _ in range(max_round)]

        for r in range(max_round):
            for i, agent in enumerate(self.debate_agents):
                if r == 0:
                    result = await agent(problem, function_name, mode="context_fill")
                else:
                    previous = all_thinking[r - 1]
                    context = [f"{agent.role}'s previous round thinking: {previous[i]}"] + [
                        f"{other.role}'s thinking: {previous[j]}"
                        for j, other in enumerate(self.debate_agents)
                        if j != i
                    ]
                    # function_name must be passed explicitly. Without it the context
                    # list binds to the agent's `function_name` parameter, `context`
                    # stays None, and the agent reuses the initial prompt.
                    result = await agent(problem, function_name, context, mode="context_fill")
                all_thinking[r].append(result["thinking"])
                all_answers[r].append(result["answer"])

        final_result = await self.final_decision_agent(
            problem,
            function_name,
            [f"{agent.role}'s final thinking: {thinking}" for agent, thinking in zip(self.debate_agents, all_thinking[-1])],
            [f"{agent.role}'s final answer: {answer}" for agent, answer in zip(self.debate_agents, all_answers[-1])],
            mode="code_fill"
        )
        return final_result['solution'], self.llm.cost_manager.total_cost
if __name__ == "__main__":
    import asyncio

    async def main():
        """Run the multi-persona baseline through ``mbpp_evaluation``, print the per-item cost, return the score."""
        model_config = ModelsConfig.default().get("gpt-4o-mini")
        persona_graph = MultiPersonaGraph(name="multi-persona", llm_config=model_config, dataset="MBPP")
        data_file = "examples/ags/data/mbpp-new-new.jsonl"
        sample_count = 86  # 86/341
        result_dir = "examples/ags/data/baselines/general/mbpp"
        score, cost = await mbpp_evaluation(persona_graph, data_file, sample_count, result_dir, test=True)
        print(f"per cost:{cost/341}")
        return score

    asyncio.run(main())

View file

@ -12,7 +12,7 @@ from collections import Counter
import random
GSM8K_PROMPT_GPT = """
{question}\nPlease reason step by step, and to ensure accuracy, provide the correct answer in the final, without any additional text.
{question}\nPlease reason step by step. At the end, provide the final answer in the format "Answer is <number>", where <number> is a single number, without any additional information or explanation.
"""
GSM8K_PROMPT_DS = """
@ -36,15 +36,17 @@ class CoTGenerate(Operator):
return response
SC_ENSEMBLE_PROMPT = """
Given the question descripted as follows: {question}
some solutions are generated to solve the question as follows:
Given the question described as follows: {question}
Several solutions have been generated to address the given question. They are as follows:
{solutions}
Evaluate these solutions and select the most consistent solution based on majority consensus.
Give your answer with a single id of solution (without anything else).
Carefully evaluate these solutions and identify the answer that appears most frequently across them. This consistency in answers is crucial for determining the most reliable solution.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the most consistent solution. Do not include any additional text or explanation in the "solution_letter" field.
"""
class ScEnsembleOp(BaseModel):
thought: str = Field(default="", description="The thought of the most consistent solution.")
solution_letter: str = Field(default="", description="The letter of most consistent solution.")
@ -73,7 +75,7 @@ class ScEnsemble(Operator):
node = await ActionNode.from_pydantic(ScEnsembleOp).fill(**fill_kwargs)
response = node.instruct_content.model_dump()
answer = response.get("solution_letter", "")
answer = response.get("solution_letter", "A")
answer = answer.strip().upper()
return {"solution": solutions[answer_mapping[answer]]}
@ -87,7 +89,7 @@ class SelfConsistencyGraph(SolveGraph):
async def __call__(self, problem):
solutions = []
for i in range(2):
for i in range(5):
solution = await self.cot_generate(problem, mode="context_fill")
solutions.append(solution["solution"])
solution = await self.sc_ensemble(solutions, problem, mode="context_fill")
@ -100,9 +102,9 @@ if __name__ == "__main__":
# llm_config = ModelsConfig.default().get("gpt-35-turbo-1106")
graph = SelfConsistencyGraph(name="SelfConsistency", llm_config=llm_config, dataset="Gsm8K")
file_path = "examples/ags/data/gsm8k.jsonl"
samples = 1
samples = 264
path = "examples/ags/data/baselines/general"
score, cost = await gsm8k_evaluation(graph, file_path, samples, path, test=False)
score, cost = await gsm8k_evaluation(graph, file_path, samples, path, test=True)
return score, cost
import asyncio

View file

@ -0,0 +1,103 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.humaneval import humaneval_evaluation
from examples.ags.scripts.operator_an import GenerateOp
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import List
# Chain-of-thought generation prompt; `{question}` is filled with the problem text.
HUMANEVAL_PROMPT_GPT = """
{question}\nPlease provide a step-by-step explanation in text, followed by your Python function without any additional text or test cases.
"""
# Self-consistency selection prompt; `{question}` is the problem text and `{solutions}`
# is the lettered list of candidate solutions.
SC_ENSEMBLE_PROMPT = """
Given the question described as follows: {question}
Several solutions have been generated to address the given question. They are as follows:
{solutions}
Carefully evaluate these solutions and identify the answer that appears most frequently across them. This consistency in answers is crucial for determining the most reliable solution.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the most consistent solution. Do not include any additional text or explanation in the "solution_letter" field.
"""
class GenerateOp(BaseModel):
    # Output schema for one generated answer: a single free-text `solution` field.
    # NOTE(review): this definition shadows the GenerateOp imported from
    # examples.ags.scripts.operator_an at the top of this file.
    solution: str = Field(
        default="",
        description="Python Solution For This Question.",
    )
class CoTGenerate(Operator):
    """Operator that asks the LLM for a step-by-step HumanEval solution."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, problem, function_name, mode: str = None):
        """Fill a ``GenerateOp`` node for ``problem`` and return its fields as a dict.

        ``function_name`` is forwarded to ``ActionNode.fill``; ``mode`` is
        forwarded only when it is truthy.
        """
        fill_args = dict(
            context=HUMANEVAL_PROMPT_GPT.format(question=problem),
            llm=self.llm,
            function_name=function_name,
        )
        if mode:
            fill_args["mode"] = mode
        filled = await ActionNode.from_pydantic(GenerateOp).fill(**fill_args)
        return filled.instruct_content.model_dump()
class ScEnsembleOp(BaseModel):
    # Output schema for the selection step: free-text reasoning plus the chosen letter.
    thought: str = Field(
        default="",
        description="The thought of the most consistent solution.",
    )
    solution_letter: str = Field(
        default="",
        description="The letter of most consistent solution.",
    )
class ScEnsemble(Operator):
    """
    Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
    Link: https://arxiv.org/abs/2203.11171
    Paper: Universal Self-Consistency for Large Language Model Generation
    Link: https://arxiv.org/abs/2311.17311

    Presents the candidate solutions to the LLM under letters ``A``, ``B``, ...
    and returns the candidate whose letter the LLM selects.
    """

    def __init__(self, llm, name: str = "ScEnsemble"):
        super().__init__(name, llm)

    async def __call__(self, solutions: List[str], problem: str, mode: str = None):
        """Return ``{"solution": <selected candidate>}``.

        If the returned ``solution_letter`` is empty or is not one of the
        offered letters, the first candidate is returned instead of raising
        ``KeyError``.
        """
        answer_mapping = {chr(65 + index): index for index in range(len(solutions))}
        solution_text = "".join(
            f"{chr(65 + index)}: \n{str(solution)}\n\n\n"
            for index, solution in enumerate(solutions)
        )

        prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text, question=problem)
        fill_kwargs = {"context": prompt, "llm": self.llm}
        if mode:
            fill_kwargs["mode"] = mode
        node = await ActionNode.from_pydantic(ScEnsembleOp).fill(**fill_kwargs)
        response = node.instruct_content.model_dump()

        # The schema default for solution_letter is "", so a missing letter arrives
        # as an empty string; `or "A"` covers that as well as an absent key.
        answer = (response.get("solution_letter") or "A").strip().upper()
        # An unrecognised letter falls back to index 0 (candidate "A").
        chosen_index = answer_mapping.get(answer, 0)
        return {"solution": solutions[chosen_index]}
class SelfConsistencyGraph(SolveGraph):
    """Graph that generates five candidate solutions and picks one via ``ScEnsemble``."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        self.cot_generate = CoTGenerate(llm=self.llm)
        self.sc_ensemble = ScEnsemble(llm=self.llm)

    async def __call__(self, problem, function_name):
        """Return ``(chosen solution, total LLM cost so far)`` for ``problem``."""
        candidates = []
        for _ in range(5):
            generated = await self.cot_generate(problem, function_name, mode="code_fill")
            candidates.append(generated["solution"])

        chosen = await self.sc_ensemble(candidates, problem, mode="context_fill")
        total_cost = self.llm.cost_manager.total_cost
        return chosen["solution"], total_cost
if __name__ == "__main__":
    import asyncio

    async def main():
        """Run the self-consistency baseline through ``humaneval_evaluation`` and return its result."""
        # Alternative model keys previously used here: "gpt-4o-mini", "gpt-35-turbo-1106".
        model_config = ModelsConfig.default().get("deepseek-chat")
        sc_graph = SelfConsistencyGraph(name="SelfConsistency", llm_config=model_config, dataset="HumanEval")
        data_file = "examples/ags/data/human-eval.jsonl"
        sample_count = 33  # 33/131
        result_dir = "examples/ags/data/baselines/general/humaneval"
        return await humaneval_evaluation(sc_graph, data_file, sample_count, result_dir, test=True)

    asyncio.run(main())

View file

@ -0,0 +1,108 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.math import math_evaluation
from examples.ags.scripts.operator_an import GenerateOp
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import Dict, Any, List, Tuple
from collections import Counter
import random
# Chain-of-thought generation prompt; `{question}` is filled with the problem text.
# `{{` / `}}` are escaped braces so that `.format` emits a literal `\boxed{<number>}`.
GENERATE_COT_PROMPT = """
{question}\nPlease reason step by step. At the end, provide the final answer in the format "\\boxed{{<number>}}", where <number> is a math answer(an expression or number), without any additional information or explanation.
"""
class GenerateOp(BaseModel):
    # Output schema for one generated answer: a single free-text `solution` field.
    # NOTE(review): this definition shadows the GenerateOp imported from
    # examples.ags.scripts.operator_an at the top of this file.
    solution: str = Field(
        default="",
        description="solution for the problem",
    )
class CoTGenerate(Operator):
    """Operator that asks the LLM for a step-by-step solution to a math problem."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, problem, mode: str = None):
        """Fill a ``GenerateOp`` node for ``problem`` and return its fields as a dict.

        ``mode`` is forwarded to ``ActionNode.fill`` only when it is truthy.
        """
        fill_args = dict(
            context=GENERATE_COT_PROMPT.format(question=problem),
            llm=self.llm,
        )
        if mode:
            fill_args["mode"] = mode
        filled = await ActionNode.from_pydantic(GenerateOp).fill(**fill_args)
        return filled.instruct_content.model_dump()
# Prompt template for the self-consistency vote. It is formatted with `question`
# and with `solutions`, the lettered list of candidate answers.
SC_ENSEMBLE_PROMPT = """
Given the question described as follows: {question}
Several solutions have been generated to address the given question. They are as follows:
{solutions}
Carefully evaluate these solutions and identify the answer that appears most frequently across them. This consistency in answers is crucial for determining the most reliable solution.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the most consistent solution. Do not include any additional text or explanation in the "solution_letter" field.
"""
class ScEnsembleOp(BaseModel):
    # Structured-output schema for the vote: free-text reasoning plus the
    # letter of the chosen candidate.
    thought: str = Field(
        default="",
        description="The thought of the most consistent solution.",
    )
    solution_letter: str = Field(
        default="",
        description="The letter of most consistent solution.",
    )
class ScEnsemble(Operator):
    """
    Ask the LLM which of several candidate solutions is the most consistent one.

    Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
    Link: https://arxiv.org/abs/2203.11171
    Paper: Universal Self-Consistency for Large Language Model Generation
    Link: https://arxiv.org/abs/2311.17311
    """

    def __init__(self, name: str = "ScEnsemble", llm: LLM = None):
        # Create the fallback LLM lazily: an `LLM()` default in the signature is
        # evaluated once, when the class body runs (i.e. on import).
        super().__init__(name, llm if llm is not None else LLM())

    async def __call__(self, solutions: List[str], problem: str, mode: str = None):
        """Return ``{"solution": <chosen candidate>}`` for ``problem``.

        Candidates are labelled "A", "B", ... in input order. If the model's
        ``solution_letter`` is empty or is not one of the offered letters, the
        first candidate is returned.

        Raises:
            ValueError: if ``solutions`` is empty.
        """
        if not solutions:
            raise ValueError("ScEnsemble needs at least one candidate solution")

        # Map each letter label to the index of the candidate it names.
        answer_mapping = {chr(65 + index): index for index in range(len(solutions))}
        solution_text = "".join(
            f"{letter}: \n{str(solutions[index])}\n\n\n" for letter, index in answer_mapping.items()
        )

        prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text, question=problem)
        fill_kwargs = {"context": prompt, "llm": self.llm}
        if mode:
            fill_kwargs["mode"] = mode
        node = await ActionNode.from_pydantic(ScEnsembleOp).fill(**fill_kwargs)
        response = node.instruct_content.model_dump()

        # model_dump() always contains "solution_letter" (default ""), so a
        # `.get(..., "A")` default never applies; resolve unknown or empty
        # letters to candidate "A" explicitly instead of raising KeyError.
        answer = str(response.get("solution_letter") or "").strip().upper()
        return {"solution": solutions[answer_mapping.get(answer, 0)]}
class SelfConsistencyGraph(SolveGraph):
    """Baseline graph: sample five chain-of-thought answers, then let ScEnsemble pick one."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        self.cot_generate = CoTGenerate(llm=self.llm)
        self.sc_ensemble = ScEnsemble(llm=self.llm)

    async def __call__(self, problem):
        """Return ``(ensemble_result, total_cost)`` for ``problem``.

        ``ensemble_result`` is the dict returned by the ensemble operator; the
        cost is read from ``self.llm.cost_manager.total_cost``.
        """
        # The five samples are awaited one after another, not concurrently.
        candidates = [
            (await self.cot_generate(problem, mode="context_fill"))["solution"]
            for _ in range(5)
        ]
        chosen = await self.sc_ensemble(candidates, problem, mode="context_fill")
        return chosen, self.llm.cost_manager.total_cost
if __name__ == "__main__":
    import asyncio

    async def main():
        """Run the self-consistency baseline on the MATH test file and return the result."""
        # Model keys that were also tried here: "deepseek-coder", "gpt-35-turbo-1106".
        model_config = ModelsConfig.default().get("gpt-4o-mini")
        sc_graph = SelfConsistencyGraph(name="SelfConsistency", llm_config=model_config, dataset="Gsm8K")
        return await math_evaluation(
            sc_graph,
            "examples/ags/data/math_test.jsonl",
            0,  # samples; `None` was the commented-out alternative
            "examples/ags/data/baselines/general/math",
            test=True,
        )

    asyncio.run(main())

View file

@ -0,0 +1,102 @@
from examples.ags.benchmark.mbpp import mbpp_evaluation
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.scripts.operator_an import GenerateOp
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import List
# Generation prompt template; `{question}` is substituted with str.format.
MBPP_PROMPT_COT = """
{question}\nPlease provide a step-by-step explanation in text, followed by your Python function, ensure the output code is self-contained, meaning it should have the correct function name and return statement, without any additional text."""
# Prompt template for the self-consistency vote. It is formatted with `question`
# and with `solutions`, the lettered list of candidate answers.
SC_ENSEMBLE_PROMPT = """
Given the question described as follows: {question}
Several solutions have been generated to address the given question. They are as follows:
{solutions}
Carefully evaluate these solutions and identify the answer that appears most frequently across them. This consistency in answers is crucial for determining the most reliable solution.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the most consistent solution. Do not include any additional text or explanation in the "solution_letter" field.
"""
class GenerateOp(BaseModel):
    # Structured-output schema with a single `solution` field.
    # NOTE: this local class rebinds the `GenerateOp` name that the file also
    # imports from examples.ags.scripts.operator_an.
    solution: str = Field(
        default="",
        description="Python Solution For This Question.",
    )
class CoTGenerate(Operator):
    """Operator that requests an explained Python solution using MBPP_PROMPT_COT."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, problem, function_name, mode: str = None):
        """Fill a GenerateOp node for ``problem`` and return its fields as a dict.

        ``function_name`` is always passed through to ``ActionNode.fill``;
        ``mode`` is forwarded only when it is truthy.
        """
        request = {
            "context": MBPP_PROMPT_COT.format(question=problem),
            "llm": self.llm,
            "function_name": function_name,
        }
        if mode:
            request["mode"] = mode
        filled = await ActionNode.from_pydantic(GenerateOp).fill(**request)
        return filled.instruct_content.model_dump()
class ScEnsembleOp(BaseModel):
    # Structured-output schema for the vote: free-text reasoning plus the
    # letter of the chosen candidate.
    thought: str = Field(
        default="",
        description="The thought of the most consistent solution.",
    )
    solution_letter: str = Field(
        default="",
        description="The letter of most consistent solution.",
    )
class ScEnsemble(Operator):
    """
    Ask the LLM which of several candidate solutions is the most consistent one.

    Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
    Link: https://arxiv.org/abs/2203.11171
    Paper: Universal Self-Consistency for Large Language Model Generation
    Link: https://arxiv.org/abs/2311.17311
    """

    def __init__(self, llm, name: str = "ScEnsemble"):
        super().__init__(name, llm)

    async def __call__(self, solutions: List[str], problem: str, mode: str = None):
        """Return ``{"solution": <chosen candidate>}`` for ``problem``.

        Candidates are labelled "A", "B", ... in input order. If the model's
        ``solution_letter`` is empty or is not one of the offered letters, the
        first candidate is returned.

        Raises:
            ValueError: if ``solutions`` is empty.
        """
        if not solutions:
            raise ValueError("ScEnsemble needs at least one candidate solution")

        # Map each letter label to the index of the candidate it names.
        answer_mapping = {chr(65 + index): index for index in range(len(solutions))}
        solution_text = "".join(
            f"{letter}: \n{str(solutions[index])}\n\n\n" for letter, index in answer_mapping.items()
        )

        prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text, question=problem)
        fill_kwargs = {"context": prompt, "llm": self.llm}
        if mode:
            fill_kwargs["mode"] = mode
        node = await ActionNode.from_pydantic(ScEnsembleOp).fill(**fill_kwargs)
        response = node.instruct_content.model_dump()

        # model_dump() always contains "solution_letter" (default ""), so a
        # `.get(..., "A")` default never applies; resolve unknown or empty
        # letters to candidate "A" explicitly instead of raising KeyError.
        answer = str(response.get("solution_letter") or "").strip().upper()
        return {"solution": solutions[answer_mapping.get(answer, 0)]}
class SelfConsistencyGraph(SolveGraph):
    """Baseline graph: sample five code solutions, then let ScEnsemble pick one."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        self.cot_generate = CoTGenerate(llm=self.llm)
        self.sc_ensemble = ScEnsemble(llm=self.llm)

    async def __call__(self, problem, function_name):
        """Return ``(chosen_solution, total_cost)`` for ``problem``.

        The cost is read from ``self.llm.cost_manager.total_cost``.
        """
        # The five samples are awaited one after another, not concurrently.
        candidates = [
            (await self.cot_generate(problem, function_name, mode="code_fill"))["solution"]
            for _ in range(5)
        ]
        chosen = await self.sc_ensemble(candidates, problem, mode="context_fill")
        return chosen["solution"], self.llm.cost_manager.total_cost
if __name__ == "__main__":
    import asyncio

    async def main():
        """Run the self-consistency baseline on MBPP and return the evaluation result."""
        # Model keys that were also tried here: "deepseek-chat", "gpt-35-turbo".
        model_config = ModelsConfig.default().get("gpt-4o-mini")
        sc_graph = SelfConsistencyGraph(name="SelfConsistency", llm_config=model_config, dataset="MBPP")
        return await mbpp_evaluation(
            sc_graph,
            "examples/ags/data/mbpp-new-new.jsonl",
            86,  # 86/341
            "examples/ags/data/baselines/general/mbpp",
            test=True,
        )

    asyncio.run(main())

View file

@ -8,36 +8,38 @@ from pydantic import BaseModel, Field
from typing import Dict, Any
GSM8K_PROMPT_GPT = """
{question}\nPlease reason step by step, and put your final answer in the end. Wrap content using xml tags.
"""
GSM8K_PROMPT_DS = """
{question}\nPlease reason step by step, and put your final answer within \\boxed{{}}.
{question}\nPlease reason step by step. At the end, provide the final answer in the format "Answer is <number>", where <number> is a single number, without any additional information or explanation.
"""
REVIEW_PROMPT = """
For the question described as {question},
please review the following solution: {solution}, and criticize on where might be wrong. You should provide a review result in boolean format.
If you believe the solution is capable of resolving the issue, return True; otherwise, return False, and include your feedback.
Given a problem and a thoughtful solution, your task is to using critical thinking (questioning) to review the solution's correctness and provide a review result in boolean format.
problem: {problem}
solution: {solution}
If you are more than 95 percent confident that the final answer is incorrect, please return False and give a feedback for the error. Otherwise, please return True and give a explanation for the correctness.
"""
REVISE_PROMPT = """
For the question described as {question}, \nand an error solution: {solution}, \nwith the feedback: {feedback},
Given the previous solution and feedback, carefully refine the solution to solve the question and ensure it aligns with the original format.
Given a problem and a thoughtful solution which is just reviewed as incorrect, your task is to revise the solution to solve the question and ensure the final answer in the format "Answer is <number>", where <number> is a single number.
problem: {problem}
solution: {solution}
feedback: {feedback}
"""
class GenerateOp(BaseModel):
solution: str = Field(default="", description="solution for the problem")
class ReviewOp(BaseModel):
review_result: bool = Field(
default=False,
description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'",
)
feedback: str = Field(
default="",
description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.",
)
review_result: bool = Field(
default=False,
description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'",
)
class ReviseOp(BaseModel):
@ -48,7 +50,7 @@ class CoTGenerate(Operator):
def __init__(self, llm: LLM, name: str = "Generate"):
super().__init__(name, llm)
async def __call__(self, problem, mode: str = None):
async def __call__(self, problem, mode: str = "context_fill"):
prompt = GSM8K_PROMPT_GPT.format(question=problem)
fill_kwargs = {"context": prompt, "llm": self.llm}
if mode:
@ -61,8 +63,8 @@ class Review(Operator):
def __init__(self, llm: LLM, name: str = "Review"):
super().__init__(name, llm)
async def __call__(self, problem, solution, mode: str = None):
prompt = REVIEW_PROMPT.format(question=problem, solution=solution)
async def __call__(self, problem, solution, mode: str = "context_fill"):
prompt = REVIEW_PROMPT.format(problem=problem, solution=solution)
fill_kwargs = {"context": prompt, "llm": self.llm}
if mode:
fill_kwargs["mode"] = mode
@ -71,11 +73,11 @@ class Review(Operator):
return response
class Revise(Operator):
def __init__(self, name: str = "Revise", llm: LLM = LLM()):
def __init__(self, llm: LLM, name: str = "Revise"):
super().__init__(name, llm)
async def __call__(self, problem, solution, feedback, mode: str = None):
prompt = REVISE_PROMPT.format(question=problem, solution=solution, feedback=feedback)
async def __call__(self, problem, solution, feedback, mode: str = "context_fill"):
prompt = REVISE_PROMPT.format(problem=problem, solution=solution, feedback=feedback)
fill_kwargs = {"context": prompt, "llm": self.llm}
if mode:
fill_kwargs["mode"] = mode
@ -92,7 +94,7 @@ class SelfRefineGraph(SolveGraph):
async def __call__(self, problem):
solution = await self.cot_generate(problem, mode="context_fill")
for i in range(5):
for i in range(3):
review = await self.review(problem, solution)
if review["review_result"]:
break
@ -101,14 +103,12 @@ class SelfRefineGraph(SolveGraph):
if __name__ == "__main__":
async def main():
llm_config = ModelsConfig.default().get("deepseek-coder")
# llm_config = ModelsConfig.default().get("gpt-4o-mini")
# llm_config = ModelsConfig.default().get("gpt-35-turbo-1106")
llm_config = ModelsConfig.default().get("gpt-4o-mini")
graph = SelfRefineGraph(name="self-refine", llm_config=llm_config, dataset="Gsm8K")
file_path = "examples/ags/data/gsm8k.jsonl"
samples = 10
samples = 264
path = "examples/ags/data/baselines/general"
score, cost = await gsm8k_evaluation(graph, file_path, samples, path)
score, cost = await gsm8k_evaluation(graph, file_path, samples, path, test=True)
return score, cost
import asyncio

View file

@ -0,0 +1,119 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.humaneval import humaneval_evaluation
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import Dict, Any
# Generation prompt template; `{question}` is substituted with str.format.
HUMANEVAL_PROMPT_GPT = """
{question}\nPlease provide a step-by-step explanation in text, followed by your Python function without any additional text or test cases.
"""
# Review prompt template; formatted with `problem` and `solution`.
REVIEW_PROMPT = """
Given a problem and a thoughtful solution, your task is to using critical thinking (questioning) to review the solution's correctness and provide a review result in boolean format.
problem: {problem}
solution: {solution}
If you are more than 95 percent confident that the final answer is incorrect, please return False and give a feedback for the error. Otherwise, please return True and give a explanation for the correctness.
"""
# Revision prompt template; formatted with `problem`, `solution` and `feedback`.
REVISE_PROMPT = """
Given a problem and a thoughtful solution which is just reviewed as incorrect, your task is to revise the solution to solve the question and ensure the final code solution is wrapped with ```python```.
problem: {problem}
solution: {solution}
feedback: {feedback}
Ensure the output code is self-contained, and without any additional text or test cases.
"""
class GenerateOp(BaseModel):
    # Structured-output schema with a single free-text `solution` field.
    solution: str = Field(
        default="",
        description="solution for the problem",
    )
class ReviewOp(BaseModel):
    # Structured-output schema for a review: a boolean verdict plus feedback text.
    # NOTE(review): the MATH and MBPP self-refine baselines declare `feedback`
    # before `review_result`; confirm whether the order here is intentional.
    review_result: bool = Field(
        description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'",
        default=False,
    )
    feedback: str = Field(
        description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.",
        default="",
    )
class ReviseOp(BaseModel):
    # Structured-output schema holding the revised `solution` text.
    solution: str = Field(
        default="",
        description="Based on the feedback, revised solution for this problem",
    )
class CoTGenerate(Operator):
    """Operator that requests an explained Python solution using HUMANEVAL_PROMPT_GPT."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, problem, function_name, mode: str = None):
        """Fill a GenerateOp node for ``problem`` and return its fields as a dict.

        ``function_name`` is always passed through to ``ActionNode.fill``;
        ``mode`` is forwarded only when it is truthy.
        """
        request = {
            "context": HUMANEVAL_PROMPT_GPT.format(question=problem),
            "llm": self.llm,
            "function_name": function_name,
        }
        if mode:
            request["mode"] = mode
        filled = await ActionNode.from_pydantic(GenerateOp).fill(**request)
        return filled.instruct_content.model_dump()
class Review(Operator):
    """Operator that asks the LLM to judge a solution using REVIEW_PROMPT."""

    def __init__(self, llm: LLM, name: str = "Review"):
        super().__init__(name, llm)

    async def __call__(self, problem, solution, mode: str = None):
        """Fill a ReviewOp node and return its fields (``review_result``, ``feedback``) as a dict.

        ``mode`` is forwarded to ``ActionNode.fill`` only when it is truthy.
        """
        request = {
            "context": REVIEW_PROMPT.format(problem=problem, solution=solution),
            "llm": self.llm,
        }
        if mode:
            request["mode"] = mode
        filled = await ActionNode.from_pydantic(ReviewOp).fill(**request)
        return filled.instruct_content.model_dump()
class Revise(Operator):
    """Operator that asks the LLM to rewrite a solution using REVISE_PROMPT."""

    def __init__(self, llm: LLM, name: str = "Revise"):
        super().__init__(name, llm)

    async def __call__(self, problem, solution, feedback, mode: str = None):
        """Fill a ReviseOp node and return its fields as a dict.

        ``mode`` is forwarded to ``ActionNode.fill`` only when it is truthy.
        """
        request = {
            "context": REVISE_PROMPT.format(problem=problem, solution=solution, feedback=feedback),
            "llm": self.llm,
        }
        if mode:
            request["mode"] = mode
        filled = await ActionNode.from_pydantic(ReviseOp).fill(**request)
        return filled.instruct_content.model_dump()
class SelfRefineGraph(SolveGraph):
    """Baseline graph: generate a solution, then review and revise it for up to three rounds."""

    def __init__(self, name: str, llm_config, dataset: str):
        # Sets the temperature on the passed-in config object itself, so the
        # caller's object is modified as well.
        llm_config.temperature = 0.0
        super().__init__(name, llm_config, dataset)
        self.cot_generate = CoTGenerate(self.llm)
        self.review = Review(self.llm)
        self.revise = Revise(self.llm)

    async def __call__(self, problem, function_name):
        """Return ``(solution_text, total_cost)`` for ``problem``.

        The generator/reviser result dict (not just its text) is what gets
        passed to the review and revise operators.
        """
        draft = await self.cot_generate(problem, function_name, mode="code_fill")
        rounds_left = 3
        while rounds_left > 0:
            rounds_left -= 1
            verdict = await self.review(problem, draft, mode="context_fill")
            if verdict["review_result"]:
                # Reviewer accepted the current draft; stop refining.
                break
            draft = await self.revise(problem, draft, verdict["feedback"], mode="code_fill")
        return draft["solution"], self.llm.cost_manager.total_cost
if __name__ == "__main__":
    import asyncio

    async def main():
        """Run the self-refine baseline on HumanEval and return ``(score, cost)``."""
        model_config = ModelsConfig.default().get("gpt-4o-mini")
        refine_graph = SelfRefineGraph(name="self-refine", llm_config=model_config, dataset="HumanEval")
        score, cost = await humaneval_evaluation(
            refine_graph,
            "examples/ags/data/human-eval.jsonl",
            33,
            "examples/ags/data/baselines/general/humaneval",
            test=True,
        )
        return score, cost

    asyncio.run(main())

View file

@ -0,0 +1,119 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.math import math_evaluation
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import Dict, Any
# Chain-of-thought prompt template. `{question}` is substituted with str.format;
# the doubled braces survive formatting as a literal `\boxed{<number>}` placeholder.
GENERATE_COT_PROMPT = """
{question}\nPlease reason step by step. At the end, provide the final answer in the format "\\boxed{{<number>}}", where <number> is a math answer(an expression or number), without any additional information or explanation.
"""
# Review prompt template; formatted with `problem` and `solution`.
REVIEW_PROMPT = """
Given a problem and a thoughtful solution, your task is to using critical thinking (questioning) to review the solution's correctness and provide a review result in boolean format.
problem: {problem}
solution: {solution}
If you are more than 95 percent confident that the final answer is incorrect, please return False and give a feedback for the error. Otherwise, please return True and give a explanation for the correctness.
"""
# Revision prompt template; formatted with `problem`, `solution` and `feedback`.
REVISE_PROMPT = """
Given a problem and a thoughtful solution which is just reviewed as incorrect, your task is to revise the solution to solve the question and ensure the final answer in the format "\\boxed{{<number>}}", where <number> is a math answer(an expression or number), without any additional information or explanation.
problem: {problem}
solution: {solution}
feedback: {feedback}
"""
class GenerateOp(BaseModel):
    # Structured-output schema with a single free-text `solution` field.
    # (Declared once; an identical second declaration of this class was redundant.)
    solution: str = Field(default="", description="solution for the problem")


class ReviewOp(BaseModel):
    # Structured-output schema for a review; `feedback` is declared before the
    # boolean verdict.
    feedback: str = Field(
        default="",
        description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.",
    )
    review_result: bool = Field(
        default=False,
        description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'",
    )


class ReviseOp(BaseModel):
    # Structured-output schema holding the revised `solution` text.
    solution: str = Field(default="", description="Based on the feedback, revised solution for this problem")
class CoTGenerate(Operator):
    """Operator that requests a step-by-step solution using GENERATE_COT_PROMPT."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, problem, mode: str = None):
        """Fill a GenerateOp node for ``problem`` and return its fields as a dict.

        ``mode`` is forwarded to ``ActionNode.fill`` only when it is truthy.
        """
        request = {
            "context": GENERATE_COT_PROMPT.format(question=problem),
            "llm": self.llm,
        }
        if mode:
            request["mode"] = mode
        filled = await ActionNode.from_pydantic(GenerateOp).fill(**request)
        return filled.instruct_content.model_dump()
class Review(Operator):
    """Operator that asks the LLM to judge a solution using REVIEW_PROMPT."""

    def __init__(self, llm: LLM, name: str = "Review"):
        super().__init__(name, llm)

    async def __call__(self, problem, solution, mode: str = "context_fill"):
        """Fill a ReviewOp node and return its fields (``feedback``, ``review_result``) as a dict.

        ``mode`` defaults to "context_fill" and is forwarded to
        ``ActionNode.fill`` only when it is truthy.
        """
        request = {
            "context": REVIEW_PROMPT.format(problem=problem, solution=solution),
            "llm": self.llm,
        }
        if mode:
            request["mode"] = mode
        filled = await ActionNode.from_pydantic(ReviewOp).fill(**request)
        return filled.instruct_content.model_dump()
class Revise(Operator):
    """Operator that asks the LLM to rewrite a solution using REVISE_PROMPT."""

    def __init__(self, llm: LLM, name: str = "Revise"):
        super().__init__(name, llm)

    async def __call__(self, problem, solution, feedback, mode: str = "context_fill"):
        """Fill a ReviseOp node and return its fields as a dict.

        ``mode`` defaults to "context_fill" and is forwarded to
        ``ActionNode.fill`` only when it is truthy.
        """
        request = {
            "context": REVISE_PROMPT.format(problem=problem, solution=solution, feedback=feedback),
            "llm": self.llm,
        }
        if mode:
            request["mode"] = mode
        filled = await ActionNode.from_pydantic(ReviseOp).fill(**request)
        return filled.instruct_content.model_dump()
class SelfRefineGraph(SolveGraph):
    """Baseline graph: generate a solution, then review and revise it for up to three rounds."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        self.cot_generate = CoTGenerate(self.llm)
        self.review = Review(self.llm)
        self.revise = Revise(self.llm)

    async def __call__(self, problem):
        """Return ``(result_dict, total_cost)`` for ``problem``.

        The first element is the whole dict produced by the generator or the
        last revision, not only its ``solution`` text.
        """
        draft = await self.cot_generate(problem, mode="context_fill")
        rounds_left = 3
        while rounds_left > 0:
            rounds_left -= 1
            verdict = await self.review(problem, draft)
            if verdict["review_result"]:
                # Reviewer accepted the current draft; stop refining.
                break
            draft = await self.revise(problem, draft, verdict["feedback"])
        return draft, self.llm.cost_manager.total_cost
if __name__ == "__main__":
    import asyncio

    async def main():
        """Run the self-refine baseline on the MATH test file and return the result."""
        model_config = ModelsConfig.default().get("gpt-4o-mini")
        refine_graph = SelfRefineGraph(name="self-refine", llm_config=model_config, dataset="Gsm8K")
        return await math_evaluation(
            refine_graph,
            "examples/ags/data/math_test.jsonl",
            10,  # samples; `None` was the commented-out alternative
            "examples/ags/data/baselines/general/math",
            test=False,
        )

    asyncio.run(main())

View file

@ -0,0 +1,117 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.mbpp import mbpp_evaluation
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import Dict, Any
# Generation prompt template; `{question}` is substituted with str.format.
MBPP_PROMPT_COT = """
{question}\nPlease provide a step-by-step explanation in text, followed by your Python function, ensure the output code is self-contained, meaning it should have the correct function name and return statement, without any additional text."""
# Review prompt template; formatted with `problem` and `solution`.
REVIEW_PROMPT = """
Given a problem and a thoughtful solution, your task is to using critical thinking (questioning) to review the solution's correctness and provide a review result in boolean format.
problem: {problem}
solution: {solution}
If you are more than 95 percent confident that the final answer is incorrect, please return False and give a feedback for the error. Otherwise, please return True and give a explanation for the correctness.
"""
# Revision prompt template; formatted with `problem`, `solution` and `feedback`.
REVISE_PROMPT = """
Given a problem and a thoughtful solution which is just reviewed as incorrect, your task is to revise the solution to solve the question and ensure the final code solution is wrapped with ```python```.
problem: {problem}
solution: {solution}
feedback: {feedback}
Ensure the output code is self-contained, meaning it should have the correct function name and return statement, without any additional text.
"""
class GenerateOp(BaseModel):
    # Structured-output schema with a single free-text `solution` field.
    solution: str = Field(
        default="",
        description="solution for the problem",
    )
class ReviewOp(BaseModel):
    # Structured-output schema for a review; `feedback` is declared before the
    # boolean verdict.
    feedback: str = Field(
        description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.",
        default="",
    )
    review_result: bool = Field(
        description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'",
        default=False,
    )
class ReviseOp(BaseModel):
    # Structured-output schema holding the revised `solution` text.
    solution: str = Field(
        default="",
        description="Based on the feedback, revised solution for this problem",
    )
class CoTGenerate(Operator):
    """Operator that requests an explained Python solution using MBPP_PROMPT_COT."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, problem, function_name, mode: str = None):
        """Fill a GenerateOp node for ``problem`` and return its fields as a dict.

        ``function_name`` is always passed through to ``ActionNode.fill``;
        ``mode`` is forwarded only when it is truthy.
        """
        request = {
            "context": MBPP_PROMPT_COT.format(question=problem),
            "llm": self.llm,
            "function_name": function_name,
        }
        if mode:
            request["mode"] = mode
        filled = await ActionNode.from_pydantic(GenerateOp).fill(**request)
        return filled.instruct_content.model_dump()
class Review(Operator):
    """Operator that asks the LLM to judge a solution using REVIEW_PROMPT."""

    def __init__(self, llm: LLM, name: str = "Review"):
        super().__init__(name, llm)

    async def __call__(self, problem, solution, mode: str = None):
        """Fill a ReviewOp node and return its fields (``feedback``, ``review_result``) as a dict.

        ``mode`` is forwarded to ``ActionNode.fill`` only when it is truthy.
        """
        request = {
            "context": REVIEW_PROMPT.format(problem=problem, solution=solution),
            "llm": self.llm,
        }
        if mode:
            request["mode"] = mode
        filled = await ActionNode.from_pydantic(ReviewOp).fill(**request)
        return filled.instruct_content.model_dump()
class Revise(Operator):
    """Operator that asks the LLM to rewrite a solution using REVISE_PROMPT."""

    def __init__(self, llm: LLM, name: str = "Revise"):
        super().__init__(name, llm)

    async def __call__(self, problem, solution, feedback, mode: str = None):
        """Fill a ReviseOp node and return its fields as a dict.

        ``mode`` is forwarded to ``ActionNode.fill`` only when it is truthy.
        """
        request = {
            "context": REVISE_PROMPT.format(problem=problem, solution=solution, feedback=feedback),
            "llm": self.llm,
        }
        if mode:
            request["mode"] = mode
        filled = await ActionNode.from_pydantic(ReviseOp).fill(**request)
        return filled.instruct_content.model_dump()
class SelfRefineGraph(SolveGraph):
    """Baseline graph: generate a solution, then review and revise it for up to three rounds."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        self.cot_generate = CoTGenerate(self.llm)
        self.review = Review(self.llm)
        self.revise = Revise(self.llm)

    async def __call__(self, problem, function_name):
        """Return ``(solution_text, total_cost)`` for ``problem``.

        The generator/reviser result dict (not just its text) is what gets
        passed to the review and revise operators.
        """
        draft = await self.cot_generate(problem, function_name, mode="code_fill")
        rounds_left = 3
        while rounds_left > 0:
            rounds_left -= 1
            verdict = await self.review(problem, draft, mode="context_fill")
            if verdict["review_result"]:
                # Reviewer accepted the current draft; stop refining.
                break
            draft = await self.revise(problem, draft, verdict["feedback"], mode="code_fill")
        return draft["solution"], self.llm.cost_manager.total_cost
if __name__ == "__main__":
    import asyncio

    async def main():
        """Run the self-refine baseline on MBPP, print the per-item cost and return the score."""
        model_config = ModelsConfig.default().get("gpt-4o-mini")
        refine_graph = SelfRefineGraph(name="self-refine", llm_config=model_config, dataset="MBPP")
        score, cost = await mbpp_evaluation(
            refine_graph,
            "examples/ags/data/mbpp-new-new.jsonl",
            86,  # 86/341
            "examples/ags/data/baselines/general/mbpp",
            test=True,
        )
        # NOTE(review): the divisor is the hard-coded 341 while 86 samples are
        # requested; confirm which count the reported cost corresponds to.
        print(f"per cost:{cost/341}")
        return score

    asyncio.run(main())

View file

@ -20,13 +20,13 @@ from sympy.parsing.latex import parse_latex
from sympy.parsing.sympy_parser import parse_expr
from tqdm.asyncio import tqdm_asyncio
from examples.ags.benchmark.gsm8k import gsm8k_evaluation
from examples.ags.benchmark.gsm8k import optimize_gsm8k_evaluation
from examples.ags.benchmark.utils import generate_random_indices
from examples.ags.benchmark.math import math_evaluation
from examples.ags.benchmark.humaneval import humaneval_evaluation
from examples.ags.benchmark.mbpp import mbpp_evaluation
from examples.ags.benchmark.drop import drop_evaluation
from examples.ags.benchmark.hotpotqa import hotpotqa_evaluation
from examples.ags.benchmark.math import optimize_math_evaluation
from examples.ags.benchmark.humaneval import optimize_humaneval_evaluation
from examples.ags.benchmark.mbpp import optimize_mbpp_evaluation
from examples.ags.benchmark.drop import optimize_drop_evaluation
from examples.ags.benchmark.hotpotqa import optimize_hotpotqa_evaluation
DatasetType = Literal["HumanEval", "MBPP", "Gsm8K", "MATH", "HotpotQA", "DROP"]
@ -41,119 +41,158 @@ class Evaluator:
def validation_evaluate(self, dataset: DatasetType, graph, params: dict, path):
"""
Evaluates on validation dataset.
dataset: dataset type
graph: graph class
params: params for graph
path: path to save results
"""
if dataset == "Gsm8K":
return self._gsm8k_eval(graph, params, path)
return self._gsm8k_eval(graph, params, path, test=False)
elif dataset == "MATH":
return self._math_eval(graph, params, path)
return self._math_eval(graph, params, path, test=False)
elif dataset == "HumanEval":
return self._humaneval_eval(graph, params, path)
return self._humaneval_eval(graph, params, path, test=False)
elif dataset == "HotpotQA":
return self._hotpotqa_eval(graph, params, path)
return self._hotpotqa_eval(graph, params, path, test=False)
elif dataset == "MBPP":
return self._mbpp_eval(graph, params, path)
return self._mbpp_eval(graph, params, path, test=False)
elif dataset == "DROP":
return self._drop_eval(graph, params, path)
return self._drop_eval(graph, params, path, test=False)
def test_evaluate(self, dataset: DatasetType):
def test_evaluate(self, dataset: DatasetType, graph, params: dict, path):
"""
Evaluates on test dataset.
"""
if dataset == "Gsm8K":
return self._gsm8k_eval(graph, params, path, test=True)
elif dataset == "MATH":
return self._math_eval(graph, params, path, test=True)
elif dataset == "HumanEval":
return self._humaneval_eval(graph, params, path, test=True)
elif dataset == "HotpotQA":
return self._hotpotqa_eval(graph, params, path, test=True)
elif dataset == "MBPP":
return self._mbpp_eval(graph, params, path, test=True)
elif dataset == "DROP":
return self._drop_eval(graph, params, path, test=True)
pass
async def _gsm8k_eval(self, graph_class, params, path, samples: int = 50):
async def _gsm8k_eval(self, graph_class, params, path, test=False):
"""
Evaluate on GSM8K dataset.
评估GSM8K数据集
"""
async def load_graph():
dataset = params["dataset"]
llm_config = params["llm_config"]
return graph_class(name="Gsm8K", llm_config=llm_config, dataset=dataset)
if test:
data_path = "examples/ags/data/gsm8k_test.jsonl"
else:
data_path = "examples/ags/data/gsm8k_validate.jsonl"
graph = await load_graph()
file_path = "examples/ags/data/gsm8k.jsonl"
score = await gsm8k_evaluation(graph, file_path, samples, path)
score, cost = await optimize_gsm8k_evaluation(graph, data_path, path)
return score
return score, cost
async def _math_eval(self, graph_class, params, path, samples: int = 200):
async def _math_eval(self, graph_class, params, path, test=False):
"""
Evaluate on MATH dataset.
评估MATH数据集
"""
async def load_graph():
dataset = params["dataset"]
llm_config = params["llm_config"]
return graph_class(name="MATH", llm_config=llm_config, dataset=dataset)
if test:
data_path = "examples/ags/data/math_test.jsonl"
else:
data_path = "examples/ags/data/math_validate.jsonl"
graph = await load_graph()
file_path = "examples/ags/w_action_node/data/math.jsonl" # 替换为实际的 MATH.jsonl 路径
score = await math_evaluation(graph, file_path, samples, path)
score, cost = await optimize_math_evaluation(graph, data_path, path)
return score
return score, cost
async def _humaneval_eval(self, graph_class, params, path, samples: int = 1):
async def _humaneval_eval(self, graph_class, params, path, test=False):
"""
Evaluate on HumanEval dataset.
评估HumanEval数据集
"""
async def load_graph():
dataset = params["dataset"]
llm_config = params["llm_config"]
return graph_class(name="HumanEval", llm_config=llm_config, dataset=dataset)
if test:
data_path = "examples/ags/data/human-eval_test.jsonl"
else:
data_path = "examples/ags/data/human-eval_validate.jsonl"
graph = await load_graph()
file_path = "examples/ags/scripts/data/human-eval-new.jsonl"
score = await humaneval_evaluation(graph, file_path, samples, path)
score, cost = await optimize_humaneval_evaluation(graph, data_path, path)
return score
return score, cost
async def _hotpotqa_eval(self, graph_class, params, path, samples: int = 20):
async def _hotpotqa_eval(self, graph_class, params, path, test=False):
"""
Evaluate on HotpotQA dataset.
评估HotpotQA数据集
"""
async def load_graph():
dataset = params["dataset"]
llm_config = params["llm_config"]
return graph_class(name="HotpotQA", llm_config=llm_config, dataset=dataset)
if test:
data_path = "examples/ags/data/hotpotqa_test.jsonl"
else:
data_path = "examples/ags/data/hotpotqa_validate.jsonl"
graph = await load_graph()
file_path = "examples/ags/scripts/data/hotpotqa.jsonl"
score = await hotpotqa_evaluation(graph, file_path, samples, path)
score, cost = await optimize_hotpotqa_evaluation(graph, data_path, path)
return score
return score, cost
async def _mbpp_eval(self, graph_class, params, path, test=False):
    """
    Evaluate a workflow graph on the MBPP dataset.

    Args:
        graph_class: Graph class to instantiate; it is called with the
            ``name``, ``llm_config`` and ``dataset`` keyword arguments.
        params: Mapping that provides the ``"dataset"`` and ``"llm_config"`` entries.
        path: Forwarded unchanged to ``optimize_mbpp_evaluation``.
        test: When true, read the ``*_test.jsonl`` file; otherwise read the
            ``*_validate.jsonl`` file.

    Returns:
        The ``(score, cost)`` pair produced by ``optimize_mbpp_evaluation``.
    """
    if test:
        data_path = "examples/ags/data/mbpp_test.jsonl"
    else:
        data_path = "examples/ags/data/mbpp_validate.jsonl"

    dataset = params["dataset"]
    llm_config = params["llm_config"]
    graph = graph_class(name="MBPP", llm_config=llm_config, dataset=dataset)

    score, cost = await optimize_mbpp_evaluation(graph, data_path, path)
    return score, cost
async def _drop_eval(self, graph_class, params, path, test=False):
    """
    Evaluate a workflow graph on the DROP dataset.

    Args:
        graph_class: Graph class to instantiate; it is called with the
            ``name``, ``llm_config`` and ``dataset`` keyword arguments.
        params: Mapping that provides the ``"dataset"`` and ``"llm_config"`` entries.
        path: Forwarded unchanged to ``optimize_drop_evaluation``.
        test: When true, read the ``drop_test.json`` file; otherwise read the
            ``drop_validate.json`` file.

    Returns:
        The ``(score, cost)`` pair produced by ``optimize_drop_evaluation``.
    """
    if test:
        data_path = "examples/ags/data/drop_test.json"
    else:
        data_path = "examples/ags/data/drop_validate.json"

    dataset = params["dataset"]
    llm_config = params["llm_config"]
    graph = graph_class(name="DROP", llm_config=llm_config, dataset=dataset)

    score, cost = await optimize_drop_evaluation(graph, data_path, path)
    return score, cost

View file

@ -347,6 +347,7 @@ class ScEnsemble(Operator):
return {"solution": solutions[answer_mapping[answer]]} # {"final_solution": "xxx"}
class Rephrase(Operator):
"""
Paper: Code Generation with AlphaCodium: From Prompt Engineering to Flow Engineering
@ -403,15 +404,7 @@ class Test(Operator):
return "no error"
async def __call__(
self,
problem_id,
problem,
rephrase_problem,
solution,
test_cases,
entry_point,
test_loop: int = 3,
mode: str = None,
self, problem_id, problem, rephrase_problem, solution, test_cases, entry_point, test_loop: int = 3
):
solution = solution["final_solution"]
for _ in range(test_loop):
@ -427,10 +420,7 @@ class Test(Operator):
exec_pass=f"executed unsuccessfully, error: \n {result}",
test_fail="executed unsucessfully",
)
fill_kwargs = {"context": prompt, "llm": self.llm}
if mode:
fill_kwargs["mode"] = mode
node = await ActionNode.from_pydantic(ReflectionTestOp).fill(**fill_kwargs)
node = await ActionNode.from_pydantic(ReflectionTestOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
solution = response["refined_solution"]
else:
@ -441,10 +431,7 @@ class Test(Operator):
exec_pass="executed successfully",
test_fail=result,
)
fill_kwargs = {"context": prompt, "llm": self.llm}
if mode:
fill_kwargs["mode"] = mode
node = await ActionNode.from_pydantic(ReflectionTestOp).fill(**fill_kwargs)
node = await ActionNode.from_pydantic(ReflectionTestOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
solution = response["refined_solution"]
@ -455,22 +442,24 @@ class PythonInterpreterOp(Operator):
# Initialise the operator by forwarding `name` and `llm` to the parent constructor.
# NOTE(review): the `LLM()` default is evaluated once, when this `def` statement runs,
# so every instance created without an explicit `llm` receives that same object —
# confirm the sharing is intended.
def __init__(self, name: str = "PythonInterpreterOp", llm: LLM = LLM()):
super().__init__(name, llm)
async def exec_code(self, code, timeout=600):
    """
    Execute Python source in-process and return the result of its ``solve()`` function.

    Args:
        code: Python source text. It is expected to define a callable named
            ``solve`` that takes no arguments.
        timeout: Accepted for interface compatibility only. The code runs inside
            the current interpreter, so this value is not enforced here.

    Returns:
        A ``(status, output)`` tuple. ``("Success", str(result))`` when ``solve()``
        returns; ``("Error", message)`` when no ``solve`` name was defined or when
        executing the code raised (the message then includes the traceback).
    """
    # SECURITY: `exec` runs the supplied source with the full privileges of this
    # process. Only pass code from a trusted pipeline, or isolate the process
    # externally.
    namespace = {}  # a fresh globals dict for every run
    try:
        exec(code, namespace)
        if "solve" not in namespace:
            return "Error", "未找到'solve'函数"
        return "Success", str(namespace["solve"]())
    except (Exception, SystemExit) as e:
        # SystemExit derives from BaseException, not Exception. Without catching
        # it explicitly, executed code that calls sys.exit()/exit() would escape
        # this handler instead of being reported as a failed run.
        return "Error", f"执行错误: {str(e)}\n{traceback.format_exc()}"
def extract_code_block(self, code_block):
match = re.search(r"```python(.*?)```", code_block, re.DOTALL)
@ -491,9 +480,9 @@ class PythonInterpreterOp(Operator):
response = node.instruct_content.model_dump()
code = self.extract_code_block(response["code"])
status, output = await self.run_code(code)
status, output = await self.exec_code(code)
if status == "Success":
return {"code": code, "output": output}
return {"code": code, "output": "code execution error, no result!"}
return {"code": code, "output": "代码执行错误,无结果!"}