From eea94865ad65f7650b61264c8d7064d635f242d7 Mon Sep 17 00:00:00 2001 From: didi <84363704+didiforgithub@users.noreply.github.com> Date: Wed, 16 Oct 2024 12:06:34 +0800 Subject: [PATCH] Update Eval --- examples/aflow/benchmark/drop.py | 223 ++++++++++++++------------- examples/aflow/benchmark/hotpotqa.py | 199 +++++++++++++++--------- 2 files changed, 245 insertions(+), 177 deletions(-) diff --git a/examples/aflow/benchmark/drop.py b/examples/aflow/benchmark/drop.py index 98c836d80..aee8e21cc 100644 --- a/examples/aflow/benchmark/drop.py +++ b/examples/aflow/benchmark/drop.py @@ -1,18 +1,16 @@ +import os +import re import json import asyncio -import pandas as pd import string -import re -from typing import List, Tuple, Callable, Dict, Any, Set, Union from collections import Counter -import numpy as np -from scipy.optimize import linear_sum_assignment +from datetime import datetime +from typing import Any, Callable, Dict, List, Tuple + +import aiofiles +import pandas as pd from tqdm.asyncio import tqdm_asyncio -from examples.aflow.benchmark.utils import generate_random_indices - -global cost -cost = 0 def is_number(text: str) -> bool: try: @@ -41,7 +39,7 @@ def normalize_answer(s): return white_space_fix(remove_articles(remove_punc(lower(s)))) -def compute_f1_score(prediction, ground_truth): +def calculate_score(ground_truth: str, prediction: str): """ Compute the F1 score between prediction and ground truth answers. """ @@ -50,59 +48,120 @@ def compute_f1_score(prediction, ground_truth): common = Counter(prediction_tokens) & Counter(ground_truth_tokens) num_same = sum(common.values()) if num_same == 0: - return 0 + return 0, prediction precision = 1.0 * num_same / len(prediction_tokens) recall = 1.0 * num_same / len(ground_truth_tokens) f1 = (2 * precision * recall) / (precision + recall) - return f1 + return f1, prediction -# def fuzzy_match(s1: str, s2: str) -> bool: -# s1 = normalize(s1) -# s2 = normalize(s2) - -# if s1 == "" or s2 == "": -# return s1 == s2 - -# return s1 in s2 or s2 in s1 +def ensure_log_file_exists(path: str): + log_file = os.path.join(path, 'log.json') + if not os.path.exists(log_file): + with open(log_file, 'w', encoding='utf-8') as f: + json.dump([], f, indent=4, ensure_ascii=False) -# def drop_metric(sample: str, reference: list[str]) -> Tuple[float, float]: -# em_scores = [] -# f1_scores = [] -# for answer in reference: -# if answer.strip() != "": -# em, f1 = get_drop_metrics(sample, answer) -# em_scores.append(em) -# f1_scores.append(f1) -# return (max(em_scores), max(f1_scores)) +def log_mismatch(problem: str, expected_output, prediction: str, predicted_number, path): + log_data = { + "question": problem, + "right_answer": expected_output, + "model_output": prediction, + "extracted_output": predicted_number + } -async def evaluate_problem(inputs: str, answers: List[Dict[str, Any]], graph: Callable) -> Tuple[str, str, float]: + log_file = os.path.join(path, 'log.json') + + # 检查log文件是否已经存在 + if os.path.exists(log_file): + # 如果存在,加载现有的日志数据 + with open(log_file, 'r', encoding='utf-8') as f: + try: + data = json.load(f) + except json.JSONDecodeError: + data = [] + else: + # 如果不存在,创建一个新的日志列表 + data = [] + + # 添加新的日志记录 + data.append(log_data) + + # 将数据写回到log.json文件 + with open(log_file, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=4, ensure_ascii=False) + +async def load_data(file_path: str, specific_indices: List[int] = None) -> List[dict]: + data = [] + # 异步读取文件内容 + async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file: + async for line in file: + data.append(json.loads(line)) + + # 然后在随机选择的样本中基于特定索引列表进行进一步筛选 + if specific_indices is not None: + filtered_data = [data[i] for i in specific_indices if i < len(data)] + return filtered_data + + return data + +def save_results_to_csv(results: List[Tuple[str, str, str, int]], path): + # 创建 DataFrame + df = pd.DataFrame(results, columns=["inputs", "prediction", "expected_output", "score", "cost"]) + + # 计算统计数据 + avg_score = df["score"].mean() + t_cost = df["cost"].max() + a_cost = t_cost / len(df) if len(df) > 0 else 0 + + # 获取当前时间,格式为 YYYYMMDD_HHMMSS + current_time = datetime.now().strftime("%Y%m%d_%H%M%S") + + # 生成文件名,包含平均分和当前时间,保留五位小数 + filename = f"{avg_score:.5f}_{current_time}.csv" + output_file = os.path.join(path, filename) + + # 保存到 CSV + df.to_csv(output_file, index=False) + print(f"Results saved to {output_file}") + + return avg_score, a_cost, t_cost + + +async def evaluate_problem(annotation: dict, graph: Callable, log_path) -> Tuple[str, str, float]: + expected_output = annotation["ref_text"] + inputs = annotation["context"] + answers = expected_output.split("|") max_retries = 5 retries = 0 while retries < max_retries: try: - global cost - prediction, cost = await graph(inputs) + output, cost = await graph(inputs) if graph else "None" f1_scores = [] + # if '|' in the output, split it and calculate the score for each part + for answer in answers: if answer.strip() != "": - f1_score = compute_f1_score(prediction, answer) - f1_scores.append(f1_score) + if '|' in output: + output_parts = output.split('|') + for output_part in output_parts: + f1_score, _ = calculate_score(answer, output_part) + f1_scores.append(f1_score) + else: + f1_score, _ = calculate_score(answer, output) + f1_scores.append(f1_score) max_score = max(f1_scores) - # matches = [ - # fuzzy_match(prediction, answer) - # for answer in answers - # ] + uni_score = max_score - # score = True in matches - - score = max_score + if uni_score == 0: + log_mismatch(inputs, expected_output, output, output, log_path) + else: + ensure_log_file_exists(log_path) break @@ -112,81 +171,29 @@ async def evaluate_problem(inputs: str, answers: List[Dict[str, Any]], graph: Ca if retries == max_retries: print("Maximum retries reached. Skipping this sample.") - prediction = None - score = 0.0 + output = e + uni_score = 0.0 + cost = None break - return prediction, score + return inputs, output, expected_output, uni_score, cost -async def evaluate_all_questions(annotations: List[Tuple[str, Dict[str, Any]]], graph: Callable, max_concurrent_tasks: int = 50) -> List[List[Any]]: +async def evaluate_all_questions(data: List[Tuple[str, Dict[str, Any]]], graph: Callable, path, max_concurrent_tasks: int = 50) -> List[List[Any]]: semaphore = asyncio.Semaphore(max_concurrent_tasks) - results = [] - async def sem_evaluate(annotation: Dict[str, Any]): + async def sem_evaluate(problem): async with semaphore: - inputs = annotation["context"] - answers = annotation["targets"] - prediction, score = await evaluate_problem(inputs, answers, graph) - results.append([annotation["id"], prediction, answers, score]) + return await evaluate_problem(problem, graph, path) - tasks = [sem_evaluate(annotation) for annotation in annotations] - await tqdm_asyncio.gather(*tasks, desc="Evaluating DROP passages", total=len(annotations)) + tasks = [sem_evaluate(problem) for problem in data] - return results + return await tqdm_asyncio.gather(*tasks, desc="Evaluating DROP problems", total=len(data)) -def save_results_to_csv(results: List[List[Any]], path: str) -> float: - df = pd.DataFrame(results, columns=["id", "prediction", "answers", "score"]) - average_score = df["score"].mean() - output_file = f"{path}/{average_score:.5f}.csv" - df.to_csv(output_file, index=False) - print(f"Results saved to {output_file}") - - return average_score - -# -- From ADAS -- - -def load_drop(file_path, samples, test=False, total_length=1000): - import gzip - with gzip.open(file_path, "rb") as file: - data = [json.loads(line) for line in file] - - random_indices = generate_random_indices(len(data), total_length, False) - random_indices = random_indices[:samples] if not test else random_indices[samples:] - examples = [data[i] for i in random_indices] - - for example in examples: - example["targets"] = example["ref_text"].split("|") - - return examples - -async def drop_evaluation(graph: Callable, file_path: str, samples: int, path: str, test=False) -> float: - # data = load_data(file_path, samples, test=test) - data = load_drop(file_path, samples, test=test) - results = await evaluate_all_questions(data, graph, max_concurrent_tasks=30) - average_score = save_results_to_csv(results, path=path) +async def optimize_drop_evaluation(graph: Callable, file_path: str, path: str, va_list: list): + data = await load_data(file_path, va_list) + results = await evaluate_all_questions(data, graph, path, max_concurrent_tasks=25) + average_score, average_cost, total_cost = save_results_to_csv(results, path=path) print(f"Average score on DROP dataset: {average_score:.5f}") - global cost - print(f"Total cost: {cost: .5f}") - print(f"Cost per sample: {(cost / len(data)):.9f}") - return average_score, cost - -def load_drop_from_file(file_path): - import gzip - with gzip.open(file_path, "rb") as file: - data = [json.loads(line) for line in file] - - for example in data: - example["targets"] = example["ref_text"].split("|") - - return data - -async def optimize_drop_evaluation(graph: Callable, file_path: str, path: str) -> Tuple[float, float]: - data = await load_drop_from_file(file_path) - results = await evaluate_all_questions(data, graph, max_concurrent_tasks=50) - average_score = save_results_to_csv(results, path=path) - print(f"Average score on DROP dataset: {average_score:.5f}") - global cost - print(f"Total cost: {cost: .5f}") - print(f"Cost per sample: {(cost / len(data)):.9f}") - return average_score, cost \ No newline at end of file + print(f"Total Cost: {total_cost:.5f}") + return average_score, average_cost, total_cost diff --git a/examples/aflow/benchmark/hotpotqa.py b/examples/aflow/benchmark/hotpotqa.py index 5a08842a1..265e4d995 100644 --- a/examples/aflow/benchmark/hotpotqa.py +++ b/examples/aflow/benchmark/hotpotqa.py @@ -6,15 +6,10 @@ import numpy as np from typing import List, Tuple, Callable, Set from collections import Counter from tqdm.asyncio import tqdm_asyncio -from scipy.optimize import linear_sum_assignment import string import re - - -from examples.aflow.benchmark.utils import generate_random_indices - -global cost -cost = 0 +import os +from datetime import datetime def is_number(text: str) -> bool: try: @@ -43,7 +38,7 @@ def normalize_answer(s): return white_space_fix(remove_articles(remove_punc(lower(s)))) -def f1_score(prediction, ground_truth): +def calculate_score(ground_truth: str, prediction: str): """ Compute the F1 score between prediction and ground truth answers. """ @@ -52,36 +47,101 @@ def f1_score(prediction, ground_truth): common = Counter(prediction_tokens) & Counter(ground_truth_tokens) num_same = sum(common.values()) if num_same == 0: - return 0 + return 0, prediction precision = 1.0 * num_same / len(prediction_tokens) recall = 1.0 * num_same / len(ground_truth_tokens) f1 = (2 * precision * recall) / (precision + recall) - return f1 + return f1, prediction +def ensure_log_file_exists(path: str): + log_file = os.path.join(path, 'log.json') + if not os.path.exists(log_file): + with open(log_file, 'w', encoding='utf-8') as f: + json.dump([], f, indent=4, ensure_ascii=False) -async def load_data(file_path: str, samples=20, total_length=1250, test=False) -> List[dict]: +def log_mismatch(problem: str, expected_output: float, prediction: str, predicted_number, path): + log_data = { + "question": problem, + "right_answer": expected_output, + "model_output": prediction, + "extracted_output": predicted_number + } + + log_file = os.path.join(path, 'log.json') + + # 检查log文件是否已经存在 + if os.path.exists(log_file): + # 如果存在,加载现有的日志数据 + with open(log_file, 'r', encoding='utf-8') as f: + try: + data = json.load(f) + except json.JSONDecodeError: + data = [] + else: + # 如果不存在,创建一个新的日志列表 + data = [] + + # 添加新的日志记录 + data.append(log_data) + + # 将数据写回到log.json文件 + with open(log_file, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=4, ensure_ascii=False) + +async def load_data(file_path: str, specific_indices: List[int] = None) -> List[dict]: data = [] - async with aiofiles.open(file_path, mode="r") as file: + # 异步读取文件内容 + async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file: async for line in file: data.append(json.loads(line)) - random_indices = generate_random_indices(len(data), total_length, False) # get random indices of 1250 - random_indices = random_indices[:samples] if not test else random_indices[samples:] # get n_samples for validation or test - data = [data[i] for i in random_indices] + + # 然后在随机选择的样本中基于特定索引列表进行进一步筛选 + if specific_indices is not None: + filtered_data = [data[i] for i in specific_indices if i < len(data)] + return filtered_data + return data -async def evaluate_problem(input: str, context_str: str, graph: Callable, expected_output: str): +def save_results_to_csv(results: List[Tuple[str, str, str, str, float, float]], path): + # 创建 DataFrame + df = pd.DataFrame(results, columns=["question", "context", "prediction", "expected_output", "score", "cost"]) + + # 计算统计数据 + avg_score = df["score"].mean() + t_cost = df["cost"].max() + a_cost = t_cost / len(df) if len(df) > 0 else 0 + + # 获取当前时间,格式为 YYYYMMDD_HHMMSS + current_time = datetime.now().strftime("%Y%m%d_%H%M%S") + + # 生成文件名,包含平均分和当前时间,保留五位小数 + filename = f"{avg_score:.5f}_{current_time}.csv" + output_file = os.path.join(path, filename) + + # 保存到 CSV + df.to_csv(output_file, index=False) + print(f"Results saved to {output_file}") + + return avg_score, a_cost, t_cost + +async def evaluate_problem(problem: dict, graph: Callable, log_path: str): + input_text = problem["question"] + expected_output = problem["answer"] + paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)] + context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs) + max_retries = 5 retries = 0 - # global cost - # prediction, cost = await graph(input, context_str) if graph else "None" - # score = f1_score(prediction, expected_output) - while retries < max_retries: try: - global cost - prediction, cost = await graph(input, context_str) if graph else "None" - score = f1_score(prediction, expected_output) + output, cost = await graph(input_text, context_str) if graph else "None" + uni_score, extracted_output = calculate_score(expected_output, output) + + if uni_score == 0: + log_mismatch(input_text, expected_output, output, extracted_output, log_path) + else: + ensure_log_file_exists(log_path) break except Exception as e: @@ -90,62 +150,63 @@ async def evaluate_problem(input: str, context_str: str, graph: Callable, expect if retries == max_retries: print("Maximum retries reached. Skipping this sample.") - prediction = None - score = 0 + output = e + cost = None + uni_score = 0 break - return input, prediction, expected_output, score + return input_text, context_str, output, expected_output, uni_score, cost -async def evaluate_all_problems(data: List[dict], graph: Callable, max_concurrent_tasks: int = 50): +async def evaluate_problem_optimize(problem: dict, graph: Callable, log_path: str): + input_text = problem["question"] + expected_output = problem["answer"] + paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)] + context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs) + inputs = f"Context: {context_str}\n\nQuestion: {input_text}\n\nAnswer:" + + max_retries = 5 + retries = 0 + + while retries < max_retries: + try: + output, cost = await graph(inputs) if graph else "None" + uni_score, extracted_output = calculate_score(expected_output, output) + + if uni_score == 0: + log_mismatch(input_text, expected_output, output, extracted_output, log_path) + else: + ensure_log_file_exists(log_path) + + break + except Exception as e: + retries += 1 + print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") + + if retries == max_retries: + print("Maximum retries reached. Skipping this sample.") + output = e + cost = None + uni_score = 0 + break + + return input_text, context_str, output, expected_output, uni_score, cost + + +async def evaluate_all_problems(data: List[dict], graph: Callable, path, max_concurrent_tasks: int = 50): semaphore = asyncio.Semaphore(max_concurrent_tasks) async def sem_evaluate(problem): async with semaphore: - input_text = problem["question"] - expected_output = problem["answer"] - paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)] - context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs) - return await evaluate_problem(input_text, context_str, graph, expected_output) + return await evaluate_problem_optimize(problem, graph, path) tasks = [sem_evaluate(problem) for problem in data] return await tqdm_asyncio.gather(*tasks, desc="Evaluating HotpotQA problems", total=len(data)) -def save_results_to_csv(results: List[Tuple[str, str, str, float]], path: str) -> float: - df = pd.DataFrame( - results, columns=["question", "prediction", "expected_output", "score"] - ) - average_score = df["score"].mean() - - output_file = f"{path}/{average_score:.5f}.csv" - df.to_csv(output_file, index=False) - print(f"Results saved to {output_file}") - - return average_score - -async def hotpotqa_evaluation(graph: Callable, file_path: str, samples: int, path: str, test=False) -> float: - data = await load_data(file_path, samples, test=test) - results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20) - average_score = save_results_to_csv(results, path=path) +async def optimize_hotpotqa_evaluation(graph: Callable, file_path: str, path: str, va_list: list): + data = await load_data(file_path, va_list) + results = await evaluate_all_problems(data, graph, path, max_concurrent_tasks=20) + average_score, average_cost, total_cost = save_results_to_csv(results, path=path) print(f"Average score on HotpotQA dataset: {average_score:.5f}") - global cost - print(f"Total cost: {cost: .5f}") - print(f"Cost per sample: {(cost / len(data)):.9f}") - return average_score - -async def load_file_data(file_path: str) -> List[dict]: - data = [] - async with aiofiles.open(file_path, mode="r") as file: - async for line in file: - data.append(json.loads(line)) - return data - -async def optimize_hotpotqa_evaluation(graph: Callable, file_path: str, path: str) -> Tuple[float, float]: - data = await load_file_data(file_path) - results = await evaluate_all_problems(data, graph, max_concurrent_tasks=50) - average_score = save_results_to_csv(results, path=path) - print(f"Average score on HotpotQA dataset: {average_score:.5f}") - global cost - print(f"Total cost: {cost: .5f}") - print(f"Cost per sample: {(cost / len(data)):.9f}") - return average_score, cost \ No newline at end of file + print(f"Total Cost: {total_cost:.5f}") + return average_score, average_cost, total_cost