From 7ffe68b499eb985ba29fbcb702e5e729cfb06d74 Mon Sep 17 00:00:00 2001 From: didi <84363704+didiforgithub@users.noreply.github.com> Date: Mon, 9 Sep 2024 18:19:22 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E4=BA=86Evaluator?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/ags/benchmark/drop.py | 186 ++++++ examples/ags/benchmark/hotpotqa.py | 292 ++++----- examples/ags/benchmark/humaneval.py | 233 +++---- examples/ags/benchmark/math.py | 277 ++++++++ examples/ags/benchmark/mbpp.py | 105 +++ examples/ags/scripts/evaluator.py | 965 ++-------------------------- 6 files changed, 846 insertions(+), 1212 deletions(-) create mode 100644 examples/ags/benchmark/drop.py create mode 100644 examples/ags/benchmark/math.py create mode 100644 examples/ags/benchmark/mbpp.py diff --git a/examples/ags/benchmark/drop.py b/examples/ags/benchmark/drop.py new file mode 100644 index 000000000..42851ec83 --- /dev/null +++ b/examples/ags/benchmark/drop.py @@ -0,0 +1,186 @@ +import json +import asyncio +import pandas as pd +from typing import List, Tuple, Callable, Dict, Any, Set +import numpy as np +from scipy.optimize import linear_sum_assignment +from tqdm.asyncio import tqdm_asyncio + +def is_number(text: str) -> bool: + try: + float(text) + return True + except ValueError: + return False + +def normalize_answer(text): + import re + import string + + def remove_articles(text): + return re.sub(r"\b(a|an|the)\b", " ", text) + + def white_space_fix(text): + return " ".join(text.split()) + + def remove_punc(text): + exclude = set(string.punctuation) + return "".join(ch for ch in text if ch not in exclude) + + def lower(text): + return text.lower() + + def tokenize(text): + return re.split(" |-", text) + + def normalize_number(text: str) -> str: + if is_number(text): + return str(float(text)) + else: + return text + + parts = [ + white_space_fix(remove_articles(normalize_number(remove_punc(lower(token))))) + for token in tokenize(text) + ] + parts = [part for part in parts if part.strip()] + normalized = " ".join(parts).strip() + return normalized + +def answer_to_bags(answer: str) -> Set[str]: + raw_spans = [answer] + + normalized_spans = [] + token_bags = [] + for raw_span in raw_spans: + normalized_span = normalize_answer(raw_span) + normalized_spans.append(normalized_span) + token_bags.append(set(normalized_span.split())) + return normalized_spans, token_bags + +def _align_bags(predicted: List[Set[str]], gold: List[Set[str]]) -> List[float]: + """ + Takes gold and predicted answer sets and first finds the optimal 1-1 alignment + between them and gets maximum metric values over all the answers. + """ + scores = np.zeros([len(gold), len(predicted)]) + for gold_index, gold_item in enumerate(gold): + for pred_index, pred_item in enumerate(predicted): + if match_numbers_if_present(gold_item, pred_item): + scores[gold_index, pred_index] = f1_score(pred_item, gold_item) + row_ind, col_ind = linear_sum_assignment(-scores) + + max_scores = np.zeros([max(len(gold), len(predicted))]) + for row, column in zip(row_ind, col_ind): + max_scores[row] = max(max_scores[row], scores[row, column]) + return max_scores + +def match_numbers_if_present(gold_bag: Set[str], predicted_bag: Set[str]) -> bool: + gold_numbers = set() + predicted_numbers = set() + for word in gold_bag: + if is_number(word): + gold_numbers.add(word) + for word in predicted_bag: + if is_number(word): + predicted_numbers.add(word) + if (not gold_numbers) or gold_numbers.intersection(predicted_numbers): + return True + return False + +def f1_score(predicted_bag: Set[str], gold_bag: Set[str]) -> float: + intersection = len(gold_bag.intersection(predicted_bag)) + if not predicted_bag: + precision = 1.0 + else: + precision = intersection / float(len(predicted_bag)) + if not gold_bag: + recall = 1.0 + else: + recall = intersection / float(len(gold_bag)) + f1 = (2 * precision * recall) / (precision + recall) if not (precision == 0.0 and recall == 0.0) else 0.0 + return f1 + +def load_data(file_path: str) -> List[Tuple[str, Dict[str, Any]]]: + with open(file_path, mode="r") as file: + data = json.load(file) + return list(data.items()) + +async def evaluate_problem(question: str, passage: str, answers: List[Dict[str, Any]], graph: Callable) -> Tuple[str, str, float]: + def answer_json_to_strings(answer: Dict[str, Any]) -> Tuple[Tuple[str, ...], str]: + if "number" in answer and answer["number"]: + return tuple([str(answer["number"])]), "number" + elif "spans" in answer and answer["spans"]: + return tuple(answer["spans"]), "span" if len(answer["spans"]) == 1 else "spans" + elif "date" in answer: + return ( + tuple( + [ + "{0} {1} {2}".format( + answer["date"]["day"], answer["date"]["month"], answer["date"]["year"] + ) + ] + ), + "date", + ) + else: + raise ValueError(f"Answer type not found, should be one of number, spans or date at: {json.dumps(answer)}") + + prediction = await graph(question, passage) + + def get_f1_score(prediction: str, golden_answer: str) -> float: + predicted_bags = answer_to_bags(prediction) + gold_bags = answer_to_bags(golden_answer) + + f1_per_bag = _align_bags(predicted_bags[1], gold_bags[1]) + score = np.mean(f1_per_bag) + return score + + max_score = 0.0 + best_answer = None + for answer in answers: + golden_answer, _ = answer_json_to_strings(answer) + golden_answer = golden_answer[0] + score = get_f1_score(prediction, golden_answer) + if score > max_score: + max_score = score + best_answer = golden_answer + + return best_answer, prediction, max_score + +async def evaluate_all_passages(annotations: List[Tuple[str, Dict[str, Any]]], graph: Callable, max_concurrent_tasks: int = 50) -> List[List[Any]]: + semaphore = asyncio.Semaphore(max_concurrent_tasks) + results = [] + + async def sem_evaluate(id: str, annotation: Dict[str, Any]): + async with semaphore: + passage = annotation["passage"] + for qa_pair in annotation["qa_pairs"]: + question = qa_pair["question"] + answers = [qa_pair["answer"]] + if "validated_answers" in qa_pair and qa_pair["validated_answers"]: + answers.extend(qa_pair["validated_answers"]) + best_answer, prediction, score = await evaluate_problem(question, passage, answers, graph) + results.append([id, question, prediction, best_answer, score]) + + tasks = [sem_evaluate(id, annotation) for id, annotation in annotations] + await tqdm_asyncio.gather(*tasks, desc="Evaluating DROP passages", total=len(annotations)) + + return results + +def save_results_to_csv(results: List[List[Any]], path: str) -> float: + df = pd.DataFrame(results, columns=["id", "question", "prediction", "best_answer", "score"]) + average_score = df["score"].mean() + + output_file = f"{path}/{average_score:.5f}.csv" + df.to_csv(output_file, index=False) + print(f"Results saved to {output_file}") + + return average_score + +async def drop_evaluation(graph: Callable, file_path: str, path: str) -> float: + data = load_data(file_path) + results = await evaluate_all_passages(data, graph, max_concurrent_tasks=20) + average_score = save_results_to_csv(results, path=path) + print(f"Average score on DROP dataset: {average_score:.5f}") + return average_score diff --git a/examples/ags/benchmark/hotpotqa.py b/examples/ags/benchmark/hotpotqa.py index faad880d3..2e9be830d 100644 --- a/examples/ags/benchmark/hotpotqa.py +++ b/examples/ags/benchmark/hotpotqa.py @@ -1,123 +1,25 @@ -import asyncio import json -import os -import re -import string -from typing import Literal, Optional - +import asyncio import aiofiles +import pandas as pd +import numpy as np +from typing import List, Tuple, Callable, Set +from tqdm.asyncio import tqdm_asyncio +from scipy.optimize import linear_sum_assignment -from examples.ags.scripts.graph import HotpotQAGraph -from examples.ags.scripts.operator import Format, GenerateOnContext -from examples.ags.scripts.utils import get_hotpotqa -from metagpt.llm import LLM -from metagpt.logs import logger +from examples.ags.benchmark.utils import generate_random_indices -HOTPOTQA_PATH = "hotpotqa_1000.jsonl" +def is_number(text: str) -> bool: + try: + float(text) + return True + except ValueError: + return False +def normalize_answer(text): + import re + import string -def sort_json_by_key(input_path, output_path): - with open(input_path) as f: - data = [json.loads(line) for line in f] - data.sort(key=lambda x: x["task_id"]) - with open(output_path, "w") as f: - for line in data: - f.write(json.dumps(line) + "\n") - - -extract_supporting_sentences = GenerateOnContext( - llm=LLM(), requirement="supporting sentences to get the final answers (split by newline)" -) -generate_on_context = GenerateOnContext(llm=LLM(), requirement="a concise answer without additional context") -format = Format(llm=LLM()) -solver = HotpotQAGraph( - name="solver", - llm=LLM(), - criteria="correctness, only concise answer, without additional context", - HOTPOTQA_PATH=HOTPOTQA_PATH, -) - -ModeType = Literal["ags", "alpha_codium", "llm"] - - -async def llm_generate(id): - dp = get_hotpotqa(HOTPOTQA_PATH)[id] - paragraphs = [item[1] for item in dp["context"] if isinstance(item[1], list)] - context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs) - - supporting_sentences = await extract_supporting_sentences(dp["question"], context_str) - supporting_sentences_str = "\n".join(supporting_sentences.get("solution")) - - answer_result = await generate_on_context(dp["question"], supporting_sentences_str) - answer_result = answer_result.get("solution") - - answer_formated = await format(dp["question"], answer_result) - sample_dict = dict( - task_id=id, - answer=answer_formated.get("solution"), - supporting_sentences=supporting_sentences.get("solution").split("\n"), - ) - return sample_dict - - -async def route_generate(mode: ModeType, id): - if mode == "ags": - sample_dict = await solver(id) - elif mode == "llm": - sample_dict = await llm_generate(id) - else: - raise ValueError(f"Invalid mode: {mode}") - - return sample_dict - - -async def sample_generate(id, result_path: str = "samples.jsonl", mode: ModeType = "llm"): - sample_dict = await route_generate(mode, id) - async with aiofiles.open(result_path, mode="a") as f: - await f.write(json.dumps(sample_dict) + "\n") - # sort_json_by_key(result_path, result_path) - - -async def samples_generate( - mode: ModeType, data_path: str = HOTPOTQA_PATH, result_path: str = "samples.jsonl", max_concurrency: int = 50 -): - ids = list(get_hotpotqa(HOTPOTQA_PATH).keys()) - - file_lock = asyncio.Lock() - semaphore = asyncio.Semaphore(max_concurrency) - - async def answer_and_write(mode: ModeType, id) -> Optional[str]: - async with semaphore: - try: - sample_dict = await route_generate(mode, id) - except Exception: - return id - async with file_lock: - async with aiofiles.open(result_path, mode="a") as f: - await f.write(json.dumps(sample_dict) + "\n") - return None - - tasks = [answer_and_write(mode, id) for id in ids] - results = await asyncio.gather(*tasks) - failed_ids = [id for id in results if id is not None] - - if failed_ids: - logger.info(failed_ids) - for id in failed_ids: - try: - await sample_generate(id, result_path, mode) - failed_ids.remove(id) - except Exception: - logger.error(f"Failed to generate sample for id: {id}") - - sort_json_by_key(result_path, result_path) - - if not failed_ids: - eval_path = result_path[:-6] + "_eval.json" - logger.info(eval(result_path, data_path, eval_path)) - - -def normalize_answer(s): def remove_articles(text): return re.sub(r"\b(a|an|the)\b", " ", text) @@ -131,43 +33,143 @@ def normalize_answer(s): def lower(text): return text.lower() - return white_space_fix(remove_articles(remove_punc(lower(s)))) + def tokenize(text): + return re.split(" |-", text) + def normalize_number(text: str) -> str: + if is_number(text): + return str(float(text)) + else: + return text -def exact_match_score(prediction, ground_truth): - return normalize_answer(prediction) == normalize_answer(ground_truth) + parts = [ + white_space_fix(remove_articles(normalize_number(remove_punc(lower(token))))) + for token in tokenize(text) + ] + parts = [part for part in parts if part.strip()] + normalized = " ".join(parts).strip() + return normalized +def answer_to_bags(answer: str) -> Set[str]: + raw_spans = [answer] -def eval(prediction_file, gold_file, eval_file): - # if existing eval file - if os.path.exists(eval_file): - # read the result - with open(eval_file) as f: - eval_results = [json.loads(line) for line in f] - em = sum([result["em"] for result in eval_results]) - logger.info(f"EM: {em/len(eval_results)}") - return + normalized_spans = [] + token_bags = [] + for raw_span in raw_spans: + normalized_span = normalize_answer(raw_span) + normalized_spans.append(normalized_span) + token_bags.append(set(normalized_span.split())) + return normalized_spans, token_bags - sort_json_by_key(prediction_file, prediction_file) - with open(prediction_file) as f: - predictions = [json.loads(line) for line in f] +def _align_bags(predicted: List[Set[str]], gold: List[Set[str]]) -> List[float]: + """ + Takes gold and predicted answer sets and first finds the optimal 1-1 alignment + between them and gets maximum metric values over all the answers. + """ + scores = np.zeros([len(gold), len(predicted)]) + for gold_index, gold_item in enumerate(gold): + for pred_index, pred_item in enumerate(predicted): + if match_numbers_if_present(gold_item, pred_item): + scores[gold_index, pred_index] = f1_score(pred_item, gold_item) + row_ind, col_ind = linear_sum_assignment(-scores) - with open(gold_file) as f: - golds = [json.loads(line) for line in f] + max_scores = np.zeros([max(len(gold), len(predicted))]) + for row, column in zip(row_ind, col_ind): + max_scores[row] = max(max_scores[row], scores[row, column]) + return max_scores - eval_results = [] - em = 0 - for prediction, gold in zip(predictions, golds): - if prediction["task_id"] != gold["_id"]: - raise ValueError(f"Task ID {gold['_id']} do not match") - result = exact_match_score(prediction["answer"], gold["answer"]) - em += result - eval_results.append( - {"task_id": prediction["task_id"], "solution": prediction["answer"], "answer": gold["answer"], "em": result} - ) +def match_numbers_if_present(gold_bag: Set[str], predicted_bag: Set[str]) -> bool: + gold_numbers = set() + predicted_numbers = set() + for word in gold_bag: + if is_number(word): + gold_numbers.add(word) + for word in predicted_bag: + if is_number(word): + predicted_numbers.add(word) + if (not gold_numbers) or gold_numbers.intersection(predicted_numbers): + return True + return False - with open(eval_file, "w") as f: - for line in eval_results: - f.write(json.dumps(line) + "\n") +def f1_score(predicted_bag: Set[str], gold_bag: Set[str]) -> float: + intersection = len(gold_bag.intersection(predicted_bag)) + if not predicted_bag: + precision = 1.0 + else: + precision = intersection / float(len(predicted_bag)) + if not gold_bag: + recall = 1.0 + else: + recall = intersection / float(len(gold_bag)) + f1 = (2 * precision * recall) / (precision + recall) if not (precision == 0.0 and recall == 0.0) else 0.0 + return f1 - logger.info(f"EM: {em/len(predictions)}") +async def load_data(file_path: str, samples=20) -> List[dict]: + data = [] + async with aiofiles.open(file_path, mode="r") as file: + async for line in file: + data.append(json.loads(line)) + random_indices = generate_random_indices(len(data), samples) + data = [data[i] for i in random_indices] + return data + +async def evaluate_problem(input: str, context_str: str, graph: Callable, expected_output: str): + max_retries = 5 + retries = 0 + + while retries < max_retries: + try: + prediction, supporting_sentences = await graph(input, context_str) if graph else "None" + predicted_bags = answer_to_bags(prediction) + gold_bags = answer_to_bags(expected_output) + + f1_per_bag = _align_bags(predicted_bags[1], gold_bags[1]) + score = np.mean(f1_per_bag) + + break + except Exception as e: + retries += 1 + print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") + + if retries == max_retries: + print("Maximum retries reached. Skipping this sample.") + prediction = None + supporting_sentences = None + score = 0 + break + + return input, prediction, expected_output, supporting_sentences, score + +async def evaluate_all_problems(data: List[dict], graph: Callable, max_concurrent_tasks: int = 50): + semaphore = asyncio.Semaphore(max_concurrent_tasks) + + async def sem_evaluate(problem): + async with semaphore: + input_text = problem["question"] + expected_output = problem["answer"] + paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)] + context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs) + return await evaluate_problem(input_text, context_str, graph, expected_output) + + tasks = [sem_evaluate(problem) for problem in data] + + return await tqdm_asyncio.gather(*tasks, desc="Evaluating HotpotQA problems", total=len(data)) + +def save_results_to_csv(results: List[Tuple[str, str, str, str, float]], path: str) -> float: + df = pd.DataFrame( + results, columns=["question", "prediction", "expected_output", "supporting_sentences", "score"] + ) + average_score = df["score"].mean() + + output_file = f"{path}/{average_score:.5f}.csv" + df.to_csv(output_file, index=False) + print(f"Results saved to {output_file}") + + return average_score + +async def hotpotqa_evaluation(graph: Callable, file_path: str, samples: int, path: str) -> float: + data = await load_data(file_path, samples) + results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20) + average_score = save_results_to_csv(results, path=path) + print(f"Average score on HotpotQA dataset: {average_score:.5f}") + return average_score diff --git a/examples/ags/benchmark/humaneval.py b/examples/ags/benchmark/humaneval.py index 3f3b2867e..4d9ab0480 100644 --- a/examples/ags/benchmark/humaneval.py +++ b/examples/ags/benchmark/humaneval.py @@ -1,171 +1,112 @@ -# -*- coding: utf-8 -*- -# @Date : 7/7/2024 17:07 PM -# @Author : didi -# @Desc : test on human eval graph - -import asyncio import json -import os -import subprocess -import sys -from typing import Literal, Optional - +import asyncio import aiofiles -from evalplus.data import get_human_eval_plus +import pandas as pd +from typing import List, Tuple, Callable +from tqdm.asyncio import tqdm_asyncio -from examples.ags.scripts.graph import HumanEvalGraph -from examples.ags.scripts.operator import GenerateCodeBlock -from examples.ags.scripts.utils import sort_json_by_key -from metagpt.llm import LLM -from metagpt.logs import logger -from metagpt.utils.common import add_jsonl_file, read_json_file -from metagpt.utils.exceptions import handle_exception +from examples.ags.benchmark.utils import generate_random_indices -generate_code_block = GenerateCodeBlock(llm=LLM()) -solver = HumanEvalGraph(name="solver", llm=LLM(), criteria="correctness, efficiency, readability", vote_count=1) +PASS = "pass" +FAIL = "fail" -ModeType = Literal["ags", "alpha_codium", "llm"] +async def load_data(file_path: str, samples=1) -> List[dict]: + data = [] + async with aiofiles.open(file_path, mode="r") as file: + async for line in file: + data.append(json.loads(line)) + random_indices = generate_random_indices(len(data), samples) + data = [data[i] for i in random_indices] + return data +async def check_solution(solution, test_cases, entry_point): + # Define a local dictionary to execute the solution + local_dict = {} + exec("from typing import List\n\n" + solution, {}, local_dict) -async def llm_generate(id): - case = get_human_eval_plus()[f"{id}"] - solution_result = await generate_code_block(case["prompt"], case["entry_point"]) - sample_dict = dict(task_id=case["task_id"], solution=solution_result["code_solution"]) - return sample_dict + # Ensure the entry point function is defined + if entry_point not in local_dict: + raise ValueError(f"Function {entry_point} is not defined in the solution.") + details = [False for _ in range(len(test_cases))] -async def ags_generate(id, ensemble_count: int = 5): - case = get_human_eval_plus()[f"{id}"] - solution_result = await solver(case["prompt"], case["entry_point"], ensemble_count=ensemble_count) - sample_dict = dict(task_id=case["task_id"], solution=solution_result["final_solution"]) - return sample_dict + # Check each test case + for i, test in enumerate(test_cases): + # Replace 'candidate' with the actual function call + test_expr = test.replace("candidate", entry_point) + try: + # Evaluate the test case + if eval(test_expr, {}, local_dict): + details[i] = True + except Exception as e: + print(f"Error evaluating test case '{test}': {e}") + if all(details): + return PASS, details -async def alpha_codium_generate(id, ensemble_count: int = 1): - case = get_human_eval_plus()[f"{id}"] - solution_result = await solver.alpha_codium(case["task_id"], case["prompt"], ensemble_count=ensemble_count) - sample_dict = dict(task_id=case["task_id"], solution=solution_result["final_solution"]) - return sample_dict + return FAIL, details +async def evaluate_problem(data: dict, graph: Callable) -> Tuple[str, str, str, int]: + max_retries = 5 + retries = 0 -async def route_generate(mode: ModeType, id: str): - token_usage = 0 - money_usage = 0 - if mode == "ags": - sample_dict = await ags_generate(id) - elif mode == "alpha_codium": - sample_dict = await alpha_codium_generate(id, 5) - elif mode == "llm": - sample_dict = await llm_generate(id) - else: - raise ValueError(f"Invalid mode: {mode}") - return sample_dict, token_usage, money_usage + while retries < max_retries: + try: + solution = await graph(data["prompt"]) if graph else "None" + ret = await check_solution(solution, data["test_cases"], data["entry_point"]) + score = 1 if ret[0] == PASS else 0 + break -async def sample_generate(id, result_path: str = "samples.jsonl", mode: ModeType = "ags"): - sample_dict, token_usage, money_usage = await route_generate(mode, id) - add_jsonl_file(result_path, [sample_dict]) - sort_json_by_key(result_path, result_path) + except Exception as e: + retries += 1 + print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") + if retries == max_retries: + print("Maximum retries reached. Skipping this sample.") + solution = None + ret = (FAIL, []) + score = 0 + break -async def samples_generate(mode: ModeType, result_path: str = "samples.jsonl", max_concurrency: int = 50): - ids = list(get_human_eval_plus().keys()) - file_lock = asyncio.Lock() - semaphore = asyncio.Semaphore(max_concurrency) + return data["prompt"], solution, ret[1], score - async def solve_and_write(id: str, mode: ModeType) -> Optional[str]: +async def evaluate_all_problems(data: List[dict], graph: Callable, max_concurrent_tasks: int = 50) -> List[Tuple[str, str, str, int]]: + semaphore = asyncio.Semaphore(max_concurrent_tasks) + + async def sem_evaluate(problem): async with semaphore: - try: - sample_dict, token_usage, money_usage = await route_generate(mode, id) - except Exception: - return id - async with file_lock: - async with aiofiles.open(result_path, mode="a") as f: - await f.write(json.dumps(sample_dict) + "\n") - return None + return await evaluate_problem(problem, graph) - tasks = [solve_and_write(id, mode) for id in ids] - results = await asyncio.gather(*tasks) - failed_tasks = [task_id for task_id in results if task_id is not None] + tasks = [sem_evaluate(problem) for problem in data] - if failed_tasks: - logger.info(failed_tasks) + return await tqdm_asyncio.gather(*tasks, desc="Evaluating HumanEval problems", total=len(data)) - async def retry_failed_task(task_id): - try: - await sample_generate(task_id, result_path, mode) - return None - except Exception: - logger.error(f"{task_id} fail") - return task_id +def save_results_to_jsonl(results: List[Tuple[str, str, str, int]], path: str) -> float: + avg_score = 0 - retry_results = await asyncio.gather(*[retry_failed_task(task_id) for task_id in failed_tasks]) - failed_tasks = [task_id for task_id in retry_results if task_id is not None] + with open(path, "w") as f: + for result in results: + f.write( + json.dumps( + { + "question": result[0], + "prediction": result[1], + "test_case_details": result[2], + "score": result[3], + } + ) + + "\n" + ) + avg_score += result[3] + print(f"Results saved to {path}") + avg_score /= len(results) - sort_json_by_key(result_path, result_path) + return avg_score - if not failed_tasks: - if automatic_evalplus(result_path): - eval_path = result_path[:-6] + "_eval_results.json" - unpassed_exapmle = extract_failure_tests(eval_path) - logger.info(unpassed_exapmle) - else: - logger.info(failed_tasks) - - -@handle_exception(exception_type=subprocess.CalledProcessError, exception_msg="sanitize error", default_return=None) -def automatic_sanitize(result_path: str = "samples.jsonl") -> Optional[str]: - """ - 在命令行中自动执行 evalplus.sanitize --samples result_path - 返回result_path前缀加上"-sanitized.jsonl" - """ - command = ["evalplus.sanitize", "--samples", result_path] - - subprocess.run(command, check=True) - - base_name = os.path.splitext(result_path)[0] - sanitized_path = f"{base_name}-sanitized.jsonl" - - return sanitized_path - - -@handle_exception( - exception_type=subprocess.CalledProcessError, - exception_msg="Error in automatic_evalplus function", - default_return=False, -) -def automatic_evalplus(result_path: str = "samples.jsonl") -> bool: - """ - 在命令行中自动执行 evalplus.evaluate --dataset humaneval --samples samples.jsonl --parallel 2 --base-only - """ - command = [ - sys.executable, # 使用当前 Python 解释器 - "-m", - "evalplus.evaluate", - "--dataset", - "humaneval", - "--samples", - result_path, - "--parallel", - "2", - "--base-only", - ] - - result = subprocess.run(command, check=True, capture_output=True, text=True) - logger.info(f"ouptput: \n {result.stdout}") - return True - - -def extract_failure_tests(file_path: str = "samples_eval_results.json"): - task_results = read_json_file(file_path) - - failed_tests = [] - for task in task_results["eval"].values(): - if task[0]["base_status"] == "fail": - failed_test = { - "task_id": task[0]["task_id"], - } - failed_tests.append(failed_test) - logger.info(f"length of failed tests: {len(failed_tests)}") - - return failed_tests +async def humaneval_evaluation(graph: Callable, file_path: str, samples: int, path: str) -> float: + data = await load_data(file_path, samples) + results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20) + average_score = save_results_to_jsonl(results, path=path) + print(f"Average score on HumanEval dataset: {average_score:.5f}") + return average_score diff --git a/examples/ags/benchmark/math.py b/examples/ags/benchmark/math.py new file mode 100644 index 000000000..329237a51 --- /dev/null +++ b/examples/ags/benchmark/math.py @@ -0,0 +1,277 @@ +import re +import regex +from sympy import N, simplify +from sympy.parsing.latex import parse_latex +from sympy.parsing.sympy_parser import parse_expr +from math import isclose +import multiprocessing +import json +import asyncio +import aiofiles +import pandas as pd +from typing import Optional, List, Tuple, Callable, Union +from tqdm.asyncio import tqdm_asyncio + +from examples.ags.benchmark.utils import generate_random_indices + +def extract_answer(text: str) -> str: + # Look for the answer within \boxed{...} + boxed_match = re.search(r"\\boxed{(.*?)}", text) + if boxed_match: + return boxed_match.group(1) + + # If no \boxed{...}, return the last sentence + sentences = text.split(".") + return sentences[-1].strip() if sentences else "" + +def parse_digits(num): + # format: 234.23 || 23% + num = regex.sub(",", "", str(num)) + try: + return float(num) + except: + if num.endswith("%"): + num = num[:-1] + if num.endswith("\\"): + num = num[:-1] + try: + return float(num) / 100 + except: + pass + return None + +def is_digit(num): + # paired with parse_digits + return parse_digits(num) is not None + +def symbolic_equal(a, b): + def _parse(s): + for f in [parse_latex, parse_expr]: + try: + return f(s) + except: + pass + return s + + a = _parse(a) + b = _parse(b) + + try: + if simplify(a - b) == 0: + return True + except: + pass + + try: + if isclose(N(a), N(b), abs_tol=1e-3): + return True + except: + pass + return False + +def call_with_timeout(func, *args, timeout=5, **kwargs): + output_queue = multiprocessing.Queue() + process_args = args + (output_queue,) + process = multiprocessing.Process(target=func, args=process_args, kwargs=kwargs) + process.start() + process.join(timeout) + + if process.is_alive(): + process.terminate() + process.join() + return False + + return output_queue.get() + +def math_equal( + prediction: Union[bool, float, str], + reference: Union[float, str], + include_percentage: bool = True, + is_close: bool = True, + timeout: bool = False, +) -> bool: + """ + Exact match of math if and only if: + 1. numerical equal: both can convert to float and are equal + 2. symbolic equal: both can convert to sympy expression and are equal + """ + if str(prediction) == str(reference): + return True + + try: # 1. numerical equal + if is_digit(prediction) and is_digit(reference): + prediction = parse_digits(prediction) + reference = parse_digits(reference) + # number questions + if include_percentage: + gt_result = [reference / 100, reference, reference * 100] + else: + gt_result = [reference] + for item in gt_result: + try: + if is_close: + if isclose(item, prediction, abs_tol=1e-3): + return True + else: + if item == prediction: + return True + except Exception: + continue + return False + except: + pass + + if not prediction and prediction not in [0, False]: + return False + + # 2. symbolic equal + reference = str(reference).strip() + prediction = str(prediction).strip() + + if ( + regex.match(r"(\(|\[).+(\)|\])", prediction) is not None + and regex.match(r"(\(|\[).+(\)|\])", reference) is not None + ): + pred_parts = prediction[1:-1].split(",") + ref_parts = reference[1:-1].split(",") + if len(pred_parts) == len(ref_parts): + if all( + [ + math_equal(pred_parts[i], ref_parts[i], include_percentage, is_close) + for i in range(len(pred_parts)) + ] + ): + return True + + if ( + (prediction.startswith("\\begin{pmatrix}") or prediction.startswith("\\begin{bmatrix}")) + and (prediction.endswith("\\end{pmatrix}") or prediction.endswith("\\end{bmatrix}")) + and (reference.startswith("\\begin{pmatrix}") or reference.startswith("\\begin{bmatrix}")) + and (reference.endswith("\\end{pmatrix}") or reference.endswith("\\end{bmatrix}")) + ): + pred_lines = [ + line.strip() + for line in prediction[len("\\begin{pmatrix}") : -len("\\end{pmatrix}")].split("\\\\") + if line.strip() + ] + ref_lines = [ + line.strip() + for line in reference[len("\\begin{pmatrix}") : -len("\\end{pmatrix}")].split("\\\\") + if line.strip() + ] + matched = True + if len(pred_lines) == len(ref_lines): + for pred_line, ref_line in zip(pred_lines, ref_lines): + pred_parts = pred_line.split("&") + ref_parts = ref_line.split("&") + if len(pred_parts) == len(ref_parts): + if not all( + [ + math_equal(pred_parts[i], ref_parts[i], include_percentage, is_close) + for i in range(len(pred_parts)) + ] + ): + matched = False + break + else: + matched = False + if not matched: + break + else: + matched = False + if matched: + return True + + if prediction.count("=") == 1 and reference.count("=") == 1: + pred = prediction.split("=") + pred = f"{pred[0].strip()} - ({pred[1].strip()})" + ref = reference.split("=") + ref = f"{ref[0].strip()} - ({ref[1].strip()})" + if symbolic_equal(pred, ref) or symbolic_equal(f"-({pred})", ref): + return True + elif prediction.count("=") == 1 and len(prediction.split("=")[0].strip()) <= 2 and "=" not in reference: + if math_equal(prediction.split("=")[1], reference, include_percentage, is_close): + return True + elif reference.count("=") == 1 and len(reference.split("=")[0].strip()) <= 2 and "=" not in prediction: + if math_equal(prediction, reference.split("=")[1], include_percentage, is_close): + return True + + # symbolic equal with sympy + if timeout: + if call_with_timeout(symbolic_equal, prediction, reference): + return True + else: + if symbolic_equal(prediction, reference): + return True + + return False + +def calculate_score(expected_output: str, prediction: str) -> int: + expected_answer = extract_answer(expected_output) + predicted_answer = extract_answer(prediction) + + return 1 if math_equal(predicted_answer, expected_answer) else 0 + +async def load_data(file_path: str, samples: int = 200) -> List[dict]: + data = [] + async with aiofiles.open(file_path, mode="r") as file: + async for line in file: + data.append(json.loads(line)) + random_indices = generate_random_indices(len(data), samples) + data = [data[i] for i in random_indices] + return data + +def save_results_to_csv(results: List[Tuple[str, str, str, int, str]], path: str) -> float: + df = pd.DataFrame(results, columns=["question", "prediction", "expected_output", "score", "cost"]) + average_score = df["score"].mean() + + output_file = f"{path}/{average_score:.5f}.csv" + df.to_csv(output_file, index=False) + print(f"Results saved to {output_file}") + return average_score + +async def evaluate_problem(problem: dict, graph: Callable) -> Tuple[str, str, str, int, str]: + input_text = problem["problem"] + expected_output = problem["solution"] + max_retries = 5 + retries = 0 + + while retries < max_retries: + try: + prediction = await graph(input_text) + cost = prediction[1] + output = prediction[0]["solution"] + + score = calculate_score(expected_output, output) + break + + except Exception as e: + retries += 1 + print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") + + if retries == max_retries: + print("Maximum retries reached. Skipping this sample.") + output = None + cost = None + score = 0 + break + + return input_text, output, expected_output, score, cost + +async def evaluate_all_problems(data: List[dict], graph: Callable, max_concurrent_tasks: int = 20) -> List[Tuple[str, str, str, int, str]]: + semaphore = asyncio.Semaphore(max_concurrent_tasks) + + async def sem_evaluate(problem): + async with semaphore: + return await evaluate_problem(problem, graph) + + tasks = [sem_evaluate(problem) for problem in data] + + return await tqdm_asyncio.gather(*tasks, desc="Evaluating MATH problems", total=len(data)) + +async def math_evaluation(graph: Callable, file_path: str, samples: int, path: str) -> float: + data = await load_data(file_path, samples) + results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20) + average_score = save_results_to_csv(results, path=path) + print(f"Average score on MATH dataset: {average_score:.5f}") + return average_score diff --git a/examples/ags/benchmark/mbpp.py b/examples/ags/benchmark/mbpp.py new file mode 100644 index 000000000..14075874a --- /dev/null +++ b/examples/ags/benchmark/mbpp.py @@ -0,0 +1,105 @@ +import json +import asyncio +import aiofiles +import pandas as pd +from typing import List, Tuple, Callable +from tqdm.asyncio import tqdm_asyncio + +from examples.ags.benchmark.utils import generate_random_indices + +PASS = "pass" +FAIL = "fail" + +async def load_data(file_path: str, samples=1) -> List[dict]: + data = [] + async with aiofiles.open(file_path, mode="r") as file: + async for line in file: + data.append(json.loads(line)) + random_indices = generate_random_indices(len(data), samples) + data = [data[i] for i in random_indices] + return data + +async def check_solution(solution, test_cases, timeout=1): + # Define a local dictionary to execute the solution + local_dict = {} + exec(solution, {}, local_dict) + + details = [False for _ in range(len(test_cases))] + + async def evaluate_test(test): + # Delete 'assert' from test + test_expr = test.replace("assert ", "") + try: + # Evaluate the test case with timeout + await asyncio.wait_for(asyncio.to_thread(eval, test_expr, {}, local_dict), timeout) + return True + except asyncio.TimeoutError: + print(f"Test case '{test}' timed out.") + except Exception as e: + print(f"Error evaluating test case '{test}': {e}") + return False + + # Check each test case + for i, test in enumerate(test_cases): + result = await evaluate_test(test) + details[i] = result + if not result: + return FAIL, details + + if all(details): + return PASS, details + + return FAIL, details + +async def evaluate_problem(data: dict, graph: Callable) -> Tuple[str, str, str, int]: + max_retries = 5 + retries = 0 + + while retries < max_retries: + try: + solution = await graph(data["prompt"]) if graph else "None" + ret = await check_solution(solution, data["test_list"]) + + score = 1 if ret[0] == PASS else 0 + break + + except Exception as e: + retries += 1 + print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") + + if retries == max_retries: + print("Maximum retries reached. Skipping this sample.") + solution = None + ret = (FAIL, []) + score = 0 + break + + return data["prompt"], solution, ret[1], score + +async def evaluate_all_problems(data: List[dict], graph: Callable, max_concurrent_tasks: int = 50) -> List[Tuple[str, str, str, int]]: + semaphore = asyncio.Semaphore(max_concurrent_tasks) + + async def sem_evaluate(problem): + async with semaphore: + return await evaluate_problem(problem, graph) + + tasks = [sem_evaluate(problem) for problem in data] + + return await tqdm_asyncio.gather(*tasks, desc="Evaluating MBPP problems", total=len(data)) + +def save_results_to_csv(results: List[Tuple[str, str, str, int]], path: str) -> float: + df = pd.DataFrame(results, columns=["question", "prediction", "test_case_details", "score"]) + average_score = df["score"].mean() + + output_file = f"{path}/{average_score:.5f}.csv" + df.to_csv(output_file, index=False) + print(f"Results saved to {output_file}") + + return average_score + +async def mbpp_evaluation(graph: Callable, file_path: str, samples: int, path: str) -> float: + data = await load_data(file_path, samples) + results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20) + average_score = save_results_to_csv(results, path=path) + print(f"Average score on MBPP dataset: {average_score:.5f}") + return average_score diff --git a/examples/ags/scripts/evaluator.py b/examples/ags/scripts/evaluator.py index b6b01875f..4d17afb6b 100644 --- a/examples/ags/scripts/evaluator.py +++ b/examples/ags/scripts/evaluator.py @@ -22,6 +22,11 @@ from tqdm.asyncio import tqdm_asyncio from examples.ags.benchmark.gsm8k import gsm8k_evaluation from examples.ags.benchmark.utils import generate_random_indices +from examples.ags.benchmark.math import math_evaluation +from examples.ags.benchmark.humaneval import humaneval_evaluation +from examples.ags.benchmark.mbpp import mbpp_evaluation +from examples.ags.benchmark.drop import drop_evaluation +from examples.ags.benchmark.hotpotqa import hotpotqa_evaluation DatasetType = Literal["HumanEval", "MBPP", "Gsm8K", "MATH", "HotpotQA", "DROP"] @@ -41,15 +46,15 @@ class Evaluator: if dataset == "Gsm8K": return self._gsm8k_eval(graph, params, path) elif dataset == "MATH": - self._math_eval(graph, params, path) + return self._math_eval(graph, params, path) elif dataset == "HumanEval": - return self._humaneval_eval(graph, params) + return self._humaneval_eval(graph, params, path) elif dataset == "HotpotQA": - return self._hotpotqa_eval(graph, params) + return self._hotpotqa_eval(graph, params, path) elif dataset == "MBPP": - return self._mbpp_eval(graph, params) + return self._mbpp_eval(graph, params, path) elif dataset == "DROP": - return self._drop_eval(graph, params) + return self._drop_eval(graph, params, path) def test_evaluate(self, dataset: DatasetType): """ @@ -77,960 +82,78 @@ class Evaluator: """ Evaluate on MATH dataset. """ - async def load_graph(): dataset = params["dataset"] llm_config = params["llm_config"] + return graph_class(name="MATH", llm_config=llm_config, dataset=dataset) - graph = graph_class(name="MATH", llm_config=llm_config, dataset=dataset) - return graph - - def extract_answer(text: str) -> str: - # Look for the answer within \boxed{...} - boxed_match = re.search(r"\\boxed{(.*?)}", text) - if boxed_match: - return boxed_match.group(1) - - # If no \boxed{...}, return the last sentence - sentences = text.split(".") - return sentences[-1].strip() if sentences else "" - - def parse_digits(num): - # format: 234.23 || 23% - num = regex.sub(",", "", str(num)) - try: - return float(num) - except: - if num.endswith("%"): - num = num[:-1] - if num.endswith("\\"): - num = num[:-1] - try: - return float(num) / 100 - except: - pass - return None - - def is_digit(num): - # paired with parse_digits - return parse_digits(num) is not None - - def symbolic_equal(a, b): - def _parse(s): - for f in [parse_latex, parse_expr]: - try: - return f(s) - except: - pass - return s - - a = _parse(a) - b = _parse(b) - - try: - if simplify(a - b) == 0: - return True - except: - pass - - try: - if isclose(N(a), N(b), abs_tol=1e-3): - return True - except: - pass - return False - - def call_with_timeout(func, *args, timeout=5, **kwargs): - output_queue = multiprocessing.Queue() - process_args = args + (output_queue,) - process = multiprocessing.Process(target=func, args=process_args, kwargs=kwargs) - process.start() - process.join(timeout) - - if process.is_alive(): - process.terminate() - process.join() - return False - - return output_queue.get() - - def math_equal( - prediction: Union[bool, float, str], - reference: Union[float, str], - include_percentage: bool = True, - is_close: bool = True, - timeout: bool = False, - ) -> bool: - """ - Exact match of math if and only if: - 1. numerical equal: both can convert to float and are equal - 2. symbolic equal: both can convert to sympy expression and are equal - """ - if str(prediction) == str(reference): - return True - - try: # 1. numerical equal - if is_digit(prediction) and is_digit(reference): - prediction = parse_digits(prediction) - reference = parse_digits(reference) - # number questions - if include_percentage: - gt_result = [reference / 100, reference, reference * 100] - else: - gt_result = [reference] - for item in gt_result: - try: - if is_close: - if isclose(item, prediction, abs_tol=1e-3): - return True - else: - if item == prediction: - return True - except Exception: - continue - return False - except: - pass - - if not prediction and prediction not in [0, False]: - return False - - # 2. symbolic equal - reference = str(reference).strip() - prediction = str(prediction).strip() - - if ( - regex.match(r"(\(|\[).+(\)|\])", prediction) is not None - and regex.match(r"(\(|\[).+(\)|\])", reference) is not None - ): - pred_parts = prediction[1:-1].split(",") - ref_parts = reference[1:-1].split(",") - if len(pred_parts) == len(ref_parts): - if all( - [ - math_equal(pred_parts[i], ref_parts[i], include_percentage, is_close) - for i in range(len(pred_parts)) - ] - ): - return True - - if ( - (prediction.startswith("\\begin{pmatrix}") or prediction.startswith("\\begin{bmatrix}")) - and (prediction.endswith("\\end{pmatrix}") or prediction.endswith("\\end{bmatrix}")) - and (reference.startswith("\\begin{pmatrix}") or reference.startswith("\\begin{bmatrix}")) - and (reference.endswith("\\end{pmatrix}") or reference.endswith("\\end{bmatrix}")) - ): - pred_lines = [ - line.strip() - for line in prediction[len("\\begin{pmatrix}") : -len("\\end{pmatrix}")].split("\\\\") - if line.strip() - ] - ref_lines = [ - line.strip() - for line in reference[len("\\begin{pmatrix}") : -len("\\end{pmatrix}")].split("\\\\") - if line.strip() - ] - matched = True - if len(pred_lines) == len(ref_lines): - for pred_line, ref_line in zip(pred_lines, ref_lines): - pred_parts = pred_line.split("&") - ref_parts = ref_line.split("&") - if len(pred_parts) == len(ref_parts): - if not all( - [ - math_equal(pred_parts[i], ref_parts[i], include_percentage, is_close) - for i in range(len(pred_parts)) - ] - ): - matched = False - break - else: - matched = False - if not matched: - break - else: - matched = False - if matched: - return True - - if prediction.count("=") == 1 and reference.count("=") == 1: - pred = prediction.split("=") - pred = f"{pred[0].strip()} - ({pred[1].strip()})" - ref = reference.split("=") - ref = f"{ref[0].strip()} - ({ref[1].strip()})" - if symbolic_equal(pred, ref) or symbolic_equal(f"-({pred})", ref): - return True - elif prediction.count("=") == 1 and len(prediction.split("=")[0].strip()) <= 2 and "=" not in reference: - if math_equal(prediction.split("=")[1], reference, include_percentage, is_close): - return True - elif reference.count("=") == 1 and len(reference.split("=")[0].strip()) <= 2 and "=" not in prediction: - if math_equal(prediction, reference.split("=")[1], include_percentage, is_close): - return True - - # symbolic equal with sympy - if timeout: - if call_with_timeout(symbolic_equal, prediction, reference): - return True - else: - if symbolic_equal(prediction, reference): - return True - - return False - - def calculate_score(expected_output: str, prediction: str) -> int: - expected_answer = extract_answer(expected_output) - predicted_answer = extract_answer(prediction) - - return 1 if math_equal(predicted_answer, expected_answer) else 0 - - async def _evaluate_problem(problem: dict, graph) -> Tuple[str, str, str, int, str]: - input_text = problem["problem"] - expected_output = problem["solution"] - max_retries = 5 - retries = 0 - - while retries < max_retries: - try: - prediction = await graph(input_text) if graph else "None" - cost = prediction[1] - output = prediction[0]["solution"] - - score = calculate_score(expected_output, output) - break - - except Exception as e: - retries += 1 - print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") - - if retries == max_retries: - print("Maximum retries reached. Skipping this sample.") - output = None - cost = None - score = 0 - break - - return input_text, output, expected_output, score, cost - - async def load_data(file_path: str) -> List[dict]: - data = [] - async with aiofiles.open(file_path, mode="r") as file: - async for line in file: - data.append(json.loads(line)) - return data[:samples] - - async def evaluate_all_problems(data: List[dict], graph, max_concurrent_tasks: int = 300): - semaphore = asyncio.Semaphore(max_concurrent_tasks) - - async def sem_evaluate(problem): - async with semaphore: - return await _evaluate_problem(problem, graph) - - tasks = [sem_evaluate(problem) for problem in data] - - return await tqdm_asyncio.gather(*tasks, desc="Evaluating MATH problems", total=len(data)) - - def save_results_to_csv(results: List[Tuple[str, str, str, int, str]], path): - df = pd.DataFrame(results, columns=["question", "prediction", "expected_output", "score", "cost"]) - average_score = df["score"].mean() - - output_file = f"{path}/{average_score:.5f}.csv" - df.to_csv(output_file, index=False) - print(f"Results saved to {output_file}") - - return average_score - - async def math_evaluation(): - file_path = "examples/ags/w_action_node/data/math.jsonl" # Replace with the actual path to MATH.jsonl - data = await load_data(file_path) - - graph = await load_graph() - - results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20) - - average_score = save_results_to_csv(results, path=path) - - print(f"Average score on MATH dataset: {average_score:.5f}") - return average_score - - score = await math_evaluation() - + graph = await load_graph() + file_path = "examples/ags/w_action_node/data/math.jsonl" # 替换为实际的 MATH.jsonl 路径 + + score = await math_evaluation(graph, file_path, samples, path) + return score - async def _humaneval_eval(self, graph_class, params, test=False): + async def _humaneval_eval(self, graph_class, params, path, samples: int = 1): """ Evaluate on HumanEval dataset. """ - PASS = "pass" - FAIL = "fail" - async def load_graph(): dataset = params["dataset"] llm_config = params["llm_config"] + return graph_class(name="HumanEval", llm_config=llm_config, dataset=dataset) - graph = graph_class(name="HumanEval", llm_config=llm_config, dataset=dataset) - return graph - - async def load_data(file_path: str, samples=1) -> List[dict]: - data = [] - async with aiofiles.open(file_path, mode="r") as file: - async for line in file: - data.append(json.loads(line)) - random_indices = generate_random_indices(len(data), samples) - data = [data[i] for i in random_indices] - return data - - async def check_solution(solution, test_cases, entry_point): - # Define a local dictionary to execute the solution - local_dict = {} - exec("from typing import List\n\n" + solution, {}, local_dict) - - # Ensure the entry point function is defined - if entry_point not in local_dict: - raise ValueError(f"Function {entry_point} is not defined in the solution.") - - details = [False for _ in range(len(test_cases))] - - # Check each test case - for i, test in enumerate(test_cases): - # Replace 'candidate' with the actual function call - test_expr = test.replace("candidate", entry_point) - try: - # Evaluate the test case - if eval(test_expr, {}, local_dict): - details[i] = True - except Exception as e: - print(f"Error evaluating test case '{test}': {e}") - - if all(details): - return PASS, details - - return FAIL, details - - async def _evaluate_problem(data, graph) -> Tuple[str, str, str, int]: - max_retries = 5 - retries = 0 - - while retries < max_retries: - try: - solution = await graph(data["prompt"]) if graph else "None" - ret = await check_solution(solution, data["test_cases"], data["entry_point"]) - - score = 1 if ret[0] == PASS else 0 - break - - except Exception as e: - retries += 1 - print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") - - if retries == max_retries: - print("Maximum retries reached. Skipping this sample.") - solution = None - ret = (FAIL, []) - score = 0 - break - - return data["prompt"], solution, ret[1], score - - async def evaluate_all_problems(data: List[dict], graph, max_concurrent_tasks: int = 50): - semaphore = asyncio.Semaphore(max_concurrent_tasks) - - async def sem_evaluate(problem): - async with semaphore: - return await _evaluate_problem(problem, graph) - - tasks = [sem_evaluate(problem) for problem in data] - - return await tqdm_asyncio.gather(*tasks, desc="Evaluating problems", total=len(data)) - - def save_results_to_jsonl(results: List[Tuple[str, str, str, int]], path): - avg_score = 0 - - with open(path, "w") as f: - for result in results: - f.write( - json.dumps( - { - "question": result[0], - "prediction": result[1], - "test_case_details": result[2], - "score": result[3], - } - ) - + "\n" - ) - avg_score += result[3] - print(f"Results saved to {path}") - avg_score /= len(results) - - return avg_score - - async def humaneval(): - file_path = "examples/ags/scripts/data/human-eval-new.jsonl" - data = await load_data(file_path) - - graph = await load_graph() - - results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20) - - # 保存结果到JSONL文件并获取平均分 - average_score = save_results_to_jsonl(results, path=self.eval_path) - - print(f"Average score: {average_score:.5f}") - return average_score - - score = await humaneval() - + graph = await load_graph() + file_path = "examples/ags/scripts/data/human-eval-new.jsonl" + + score = await humaneval_evaluation(graph, file_path, samples, path) + return score - async def _hotpotqa_eval(self, graph_class, params, test=False): + async def _hotpotqa_eval(self, graph_class, params, path, samples: int = 20): """ Evaluate on HotpotQA dataset. """ - - def is_number(text: str) -> bool: - try: - float(text) - return True - except ValueError: - return False - - def normalize_answer(text): - import re - import string - - def remove_articles(text): - return re.sub(r"\b(a|an|the)\b", " ", text) - - def white_space_fix(text): - return " ".join(text.split()) - - def remove_punc(text): - exclude = set(string.punctuation) - return "".join(ch for ch in text if ch not in exclude) - - def lower(text): - return text.lower() - - def tokenize(text): - return re.split(" |-", text) - - def normalize_number(text: str) -> str: - if is_number(text): - return str(float(text)) - else: - return text - - parts = [ - white_space_fix(remove_articles(normalize_number(remove_punc(lower(token))))) - for token in tokenize(text) - ] - parts = [part for part in parts if part.strip()] - normalized = " ".join(parts).strip() - return normalized - - def answer_to_bags(answer: str) -> Set[str]: - raw_spans = [answer] - - normalized_spans = [] - token_bags = [] - for raw_span in raw_spans: - normalized_span = normalize_answer(raw_span) - normalized_spans.append(normalized_span) - token_bags.append(set(normalized_span.split())) - return normalized_spans, token_bags - - def _align_bags(predicted: List[Set[str]], gold: List[Set[str]]) -> List[float]: - """ - Takes gold and predicted answer sets and first finds the optimal 1-1 alignment - between them and gets maximum metric values over all the answers. - """ - scores = np.zeros([len(gold), len(predicted)]) - for gold_index, gold_item in enumerate(gold): - for pred_index, pred_item in enumerate(predicted): - if match_numbers_if_present(gold_item, pred_item): - scores[gold_index, pred_index] = f1_score(pred_item, gold_item) - row_ind, col_ind = linear_sum_assignment(-scores) - - max_scores = np.zeros([max(len(gold), len(predicted))]) - for row, column in zip(row_ind, col_ind): - max_scores[row] = max(max_scores[row], scores[row, column]) - return max_scores - - def match_numbers_if_present(gold_bag: Set[str], predicted_bag: Set[str]) -> bool: - gold_numbers = set() - predicted_numbers = set() - for word in gold_bag: - if is_number(word): - gold_numbers.add(word) - for word in predicted_bag: - if is_number(word): - predicted_numbers.add(word) - if (not gold_numbers) or gold_numbers.intersection(predicted_numbers): - return True - return False - - def f1_score(predicted_bag: Set[str], gold_bag: Set[str]) -> float: - intersection = len(gold_bag.intersection(predicted_bag)) - if not predicted_bag: - precision = 1.0 - else: - precision = intersection / float(len(predicted_bag)) - if not gold_bag: - recall = 1.0 - else: - recall = intersection / float(len(gold_bag)) - f1 = (2 * precision * recall) / (precision + recall) if not (precision == 0.0 and recall == 0.0) else 0.0 - return f1 - async def load_graph(): dataset = params["dataset"] llm_config = params["llm_config"] + return graph_class(name="HotpotQA", llm_config=llm_config, dataset=dataset) - graph = graph_class(name="HotpotQA", llm_config=llm_config, dataset=dataset) - return graph - - async def load_data(file_path: str, samples=20) -> List[dict]: - data = [] - async with aiofiles.open(file_path, mode="r") as file: - async for line in file: - data.append(json.loads(line)) - random_indices = generate_random_indices(len(data), samples) - data = [data[i] for i in random_indices] - return data - - async def _evaluate_problem(input: str, context_str: str, graph, expected_output: str): - max_retries = 5 - retries = 0 - - while retries < max_retries: - try: - # TODO Hotpotqa Graph 需要修改输入和输出 - prediction, supporting_sentences = await graph(input, context_str) if graph else "None" - predicted_bags = answer_to_bags(prediction) - gold_bags = answer_to_bags(expected_output) - - if set(predicted_bags[0]) == set(gold_bags[0]): - pass - else: - pass - - f1_per_bag = _align_bags(predicted_bags[1], gold_bags[1]) - score = np.mean(f1_per_bag) - # f1 = round(f1, 2) - - break - except Exception as e: - retries += 1 - print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") - - if retries == max_retries: - print("Maximum retries reached. Skipping this sample.") - prediction = None - supporting_sentences = None - score = 0 - break - - return input, prediction, expected_output, supporting_sentences, score - - async def evaluate_all_problems(data: List[dict], graph, max_concurrent_tasks: int = 50): - semaphore = asyncio.Semaphore(max_concurrent_tasks) - - async def sem_evaluate(problem): - async with semaphore: - input_text = problem["question"] - expected_output = problem["answer"] - paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)] - context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs) - return await _evaluate_problem(input_text, context_str, graph, expected_output) - - tasks = [sem_evaluate(problem) for problem in data] - - return await tqdm_asyncio.gather(*tasks, desc="Evaluating problems", total=len(data)) - - def save_results_to_csv(results: List[Tuple[str, str, str, str, int]], path): - df = pd.DataFrame( - results, columns=["question", "prediction", "expected_output", "supporting_sentences", "score"] - ) - average_score = df["score"].mean() - - # 生成文件名,保留五位小数 - output_file = f"{path}/{average_score:.5f}.csv" - df.to_csv(output_file, index=False) - print(f"Results saved to {output_file}") - - return average_score - - async def hotpotqa(): - file_path = "examples/ags/scripts/data/hotpotqa.jsonl" # 替换为您的JSONL文件路径 - data = await load_data(file_path) - - graph = await load_graph() - - results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20) - - # 保存结果到JSONL文件并获取平均分 - average_score = save_results_to_csv(results, path=self.eval_path) - - print(f"Average score: {average_score:.5f}") - return average_score - - score = await hotpotqa() - + graph = await load_graph() + file_path = "examples/ags/scripts/data/hotpotqa.jsonl" + + score = await hotpotqa_evaluation(graph, file_path, samples, path) + return score - async def _mbpp_eval(self, graph_class, params, test=False): + async def _mbpp_eval(self, graph_class, params, path, samples: int = 1): """ Evaluate on MBPP dataset. """ - - PASS = "pass" - FAIL = "fail" - - async def load_data(file_path: str, samples=1) -> List[dict]: - data = [] - async with aiofiles.open(file_path, mode="r") as file: - async for line in file: - data.append(json.loads(line)) - random_indices = generate_random_indices(len(data), samples) - data = [data[i] for i in random_indices] - return data - async def load_graph(): dataset = params["dataset"] llm_config = params["llm_config"] + return graph_class(name="MBPP", llm_config=llm_config, dataset=dataset) - graph = graph_class(name="MBPP", llm_config=llm_config, dataset=dataset) - return graph - - async def check_solution(solution, test_cases, timeout=1): - # Define a local dictionary to execute the solution - local_dict = {} - exec(solution, {}, local_dict) - - details = [False for _ in range(len(test_cases))] - - async def evaluate_test(test): - # Delete 'assert' from test - test_expr = test.replace("assert ", "") - try: - # Evaluate the test case with timeout - await asyncio.wait_for(asyncio.to_thread(eval, test_expr, {}, local_dict), timeout) - return True - except asyncio.TimeoutError: - print(f"Test case '{test}' timed out.") - except Exception as e: - print(f"Error evaluating test case '{test}': {e}") - return False - - # Check each test case - for i, test in enumerate(test_cases): - result = await evaluate_test(test) - details[i] = result - if not result: - return FAIL, details - - if all(details): - return PASS, details - - return FAIL, details - - async def _evaluate_problem(data, graph) -> Tuple[str, str, str, int]: - max_retries = 5 - retries = 0 - - while retries < max_retries: - try: - solution = await graph(data["prompt"]) if graph else "None" - ret = await check_solution(solution, data["test_list"]) - - score = 1 if ret[0] == PASS else 0 - break - - except Exception as e: - retries += 1 - print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") - - if retries == max_retries: - print("Maximum retries reached. Skipping this sample.") - solution = None - ret = (FAIL, []) - score = 0 - break - - return data["prompt"], solution, ret[1], score - - async def evaluate_all_problems(data: List[dict], graph, max_concurrent_tasks: int = 50): - semaphore = asyncio.Semaphore(max_concurrent_tasks) - - async def sem_evaluate(problem): - async with semaphore: - return await _evaluate_problem(problem, graph) - - tasks = [sem_evaluate(problem) for problem in data] - - return await tqdm_asyncio.gather(*tasks, desc="Evaluating problems", total=len(data)) - - def save_results_to_csv(results: List[Tuple[str, str, str, str, int]], path): - df = pd.DataFrame(results, columns=["question", "prediction", "test_case_details", "score"]) - average_score = df["score"].mean() - - # 生成文件名,保留五位小数 - output_file = f"{path}/{average_score:.5f}.csv" - df.to_csv(output_file, index=False) - print(f"Results saved to {output_file}") - - return average_score - - async def mbpp(): - file_path = "examples/ags/scripts/data/mbpp-new.jsonl" - data = await load_data(file_path) - - graph = await load_graph() - - results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20) - - # 保存结果到JSONL文件并获取平均分 - average_score = save_results_to_csv(results, path=self.eval_path) - - print(f"Average score: {average_score:.5f}") - return average_score - - score = await mbpp() - + graph = await load_graph() + file_path = "examples/ags/scripts/data/mbpp-new.jsonl" + + score = await mbpp_evaluation(graph, file_path, samples, path) + return score - async def _drop_eval(self, graph_class, params, test=False): + async def _drop_eval(self, graph_class, params, path): """ Evaluate on DROP dataset. """ - - def is_number(text: str) -> bool: - try: - float(text) - return True - except ValueError: - return False - - def normalize_answer(text): - import re - import string - - def remove_articles(text): - return re.sub(r"\b(a|an|the)\b", " ", text) - - def white_space_fix(text): - return " ".join(text.split()) - - def remove_punc(text): - exclude = set(string.punctuation) - return "".join(ch for ch in text if ch not in exclude) - - def lower(text): - return text.lower() - - def tokenize(text): - return re.split(" |-", text) - - def normalize_number(text: str) -> str: - if is_number(text): - return str(float(text)) - else: - return text - - parts = [ - white_space_fix(remove_articles(normalize_number(remove_punc(lower(token))))) - for token in tokenize(text) - ] - parts = [part for part in parts if part.strip()] - normalized = " ".join(parts).strip() - return normalized - - # def exact_match_score(prediction, ground_truth): - # return int(normalize_answer(prediction) == normalize_answer(ground_truth)) - - def answer_to_bags(answer: str) -> Set[str]: - raw_spans = [answer] - - normalized_spans = [] - token_bags = [] - for raw_span in raw_spans: - normalized_span = normalize_answer(raw_span) - normalized_spans.append(normalized_span) - token_bags.append(set(normalized_span.split())) - return normalized_spans, token_bags - - def _align_bags(predicted: List[Set[str]], gold: List[Set[str]]) -> List[float]: - """ - Takes gold and predicted answer sets and first finds the optimal 1-1 alignment - between them and gets maximum metric values over all the answers. - """ - scores = np.zeros([len(gold), len(predicted)]) - for gold_index, gold_item in enumerate(gold): - for pred_index, pred_item in enumerate(predicted): - if match_numbers_if_present(gold_item, pred_item): - scores[gold_index, pred_index] = f1_score(pred_item, gold_item) - row_ind, col_ind = linear_sum_assignment(-scores) - - max_scores = np.zeros([max(len(gold), len(predicted))]) - for row, column in zip(row_ind, col_ind): - max_scores[row] = max(max_scores[row], scores[row, column]) - return max_scores - - def match_numbers_if_present(gold_bag: Set[str], predicted_bag: Set[str]) -> bool: - gold_numbers = set() - predicted_numbers = set() - for word in gold_bag: - if is_number(word): - gold_numbers.add(word) - for word in predicted_bag: - if is_number(word): - predicted_numbers.add(word) - if (not gold_numbers) or gold_numbers.intersection(predicted_numbers): - return True - return False - - def f1_score(predicted_bag: Set[str], gold_bag: Set[str]) -> float: - intersection = len(gold_bag.intersection(predicted_bag)) - if not predicted_bag: - precision = 1.0 - else: - precision = intersection / float(len(predicted_bag)) - if not gold_bag: - recall = 1.0 - else: - recall = intersection / float(len(gold_bag)) - f1 = (2 * precision * recall) / (precision + recall) if not (precision == 0.0 and recall == 0.0) else 0.0 - return f1 - async def load_graph(): dataset = params["dataset"] llm_config = params["llm_config"] + return graph_class(name="DROP", llm_config=llm_config, dataset=dataset) - graph = graph_class(name="HotpotQA", llm_config=llm_config, dataset=dataset) - return graph - - def load_data(file_path: str, samples=1) -> List[dict]: - with open(file_path, mode="r") as file: - data = json.load(file) - data = list(data.items()) - random_indices = generate_random_indices(len(data), samples) - data = [data[i] for i in random_indices] - return data - - async def _evaluate_problem(question, passage, answers, graph): - max_retries = 5 - retries = 0 - - def answer_json_to_strings(answer: Dict[str, Any]) -> Tuple[Tuple[str, ...], str]: - """ - Takes an answer JSON blob from the DROP data release and converts it into strings used for - evaluation. - """ - if "number" in answer and answer["number"]: - return tuple([str(answer["number"])]), "number" - elif "spans" in answer and answer["spans"]: - return tuple(answer["spans"]), "span" if len(answer["spans"]) == 1 else "spans" - elif "date" in answer: - return ( - tuple( - [ - "{0} {1} {2}".format( - answer["date"]["day"], answer["date"]["month"], answer["date"]["year"] - ) - ] - ), - "date", - ) - else: - raise ValueError( - f"Answer type not found, should be one of number, spans or date at: {json.dumps(answer)}" - ) - - prediction = await graph(question, passage) if graph else "None" - while retries < max_retries: - try: - - def get_f1_score(prediction: str, golden_answer: str) -> float: - predicted_bags = answer_to_bags(prediction) - gold_bags = answer_to_bags(golden_answer) - - if set(predicted_bags[0]) == set(gold_bags[0]): - pass - else: - pass - - f1_per_bag = _align_bags(predicted_bags[1], gold_bags[1]) - score = np.mean(f1_per_bag) - return score - - max_score = 0.0 - best_answer = None - for answer in answers: - golden_answer, _ = answer_json_to_strings(answer) - golden_answer = golden_answer[0] - score = get_f1_score(prediction, golden_answer) - if score > max_score: - max_score = score - best_answer = golden_answer - - break - except Exception as e: - retries += 1 - print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") - - if retries == max_retries: - print("Maximum retries reached. Skipping this sample.") - - max_score = 0 - break - - return best_answer, prediction, max_score - - async def evaluate_all_passages(annotations, graph, max_concurrent_tasks: int = 50): - semaphore = asyncio.Semaphore(max_concurrent_tasks) - results = [] - - async def sem_evaluate(id, annotation): - async with semaphore: - passage = annotation["passage"] - for qa_pair in annotation["qa_pairs"]: - question = qa_pair["question"] - answers = [qa_pair["answer"]] - if "validated_answers" in qa_pair and qa_pair["validated_answers"]: - answers.extend(qa_pair["validated_answers"]) - best_answer, prediction, score = await _evaluate_problem(question, passage, answers, graph) - results.append([id, question, prediction, best_answer, score]) - - tasks = [sem_evaluate(id, annotation) for id, annotation in annotations] - await tqdm_asyncio.gather(*tasks, desc="Evaluating passages", total=len(annotations)) - - return results - - def save_results_to_csv(results: List[Tuple[str, str, str, str, int]], path): - df = pd.DataFrame(results, columns=["id", "question", "prediction", "best_answer", "score"]) - average_score = df["score"].mean() - - # 生成文件名,保留五位小数 - output_file = f"{path}/{average_score:.5f}.csv" - df.to_csv(output_file, index=False) - print(f"Results saved to {output_file}") - - return average_score - - async def drop(): - file_path = "examples/ags/scripts/data/drop_dataset_dev.json" # 替换为您的JSONL文件路径 - data = load_data(file_path) - - graph = await load_graph() - - results = await evaluate_all_passages(data, graph, max_concurrent_tasks=20) - - # 保存结果到JSONL文件并获取平均分 - average_score = save_results_to_csv(results, path=self.eval_path) - - print(f"Average score: {average_score:.5f}") - return average_score - - score = await drop() - + graph = await load_graph() + file_path = "examples/ags/scripts/data/drop_dataset_dev.json" + + score = await drop_evaluation(graph, file_path, path) + return score