diff --git a/examples/aflow/benchmark/benchmark.py b/examples/aflow/benchmark/benchmark.py new file mode 100644 index 000000000..4132a9b04 --- /dev/null +++ b/examples/aflow/benchmark/benchmark.py @@ -0,0 +1,101 @@ +import asyncio +import json +import os +from typing import List, Tuple, Callable, Any +from abc import ABC, abstractmethod +from datetime import datetime + +import aiofiles +import pandas as pd +from tqdm.asyncio import tqdm_asyncio + +class BaseBenchmark(ABC): + def __init__(self, name: str, file_path: str, log_path: str): + self.name = name + self.file_path = file_path + self.log_path = log_path + + async def load_data(self, specific_indices: List[int] = None) -> List[dict]: + data = [] + async with aiofiles.open(self.file_path, mode="r", encoding='utf-8') as file: + async for line in file: + data.append(json.loads(line)) + + if specific_indices is not None: + filtered_data = [data[i] for i in specific_indices if i < len(data)] + return filtered_data + + return data + + def save_results_to_csv(self, results: List[Tuple[Any, ...]], columns: List[str]): + df = pd.DataFrame(results, columns=columns) + avg_score = df["score"].mean() + t_cost = df["cost"].max() + a_cost = t_cost / len(df) if len(df) > 0 else 0 + + current_time = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{avg_score:.5f}_{current_time}.csv" + output_file = os.path.join(self.log_path, filename) + + df.to_csv(output_file, index=False) + print(f"Results saved to {output_file}") + + return avg_score, a_cost, t_cost + + def log_mismatch(self, problem: str, expected_output: Any, prediction: str, extracted_output: Any): + log_data = { + "question": problem, + "right_answer": expected_output, + "model_output": prediction, + "extracted_output": extracted_output + } + + log_file = os.path.join(self.log_path, 'log.json') + + if os.path.exists(log_file): + with open(log_file, 'r', encoding='utf-8') as f: + try: + data = json.load(f) + except json.JSONDecodeError: + data = [] + else: + data = [] + + data.append(log_data) + + with open(log_file, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=4, ensure_ascii=False) + + @abstractmethod + async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[Any, ...]: + pass + + @abstractmethod + def calculate_score(self, expected_output: Any, prediction: Any) -> Tuple[float, Any]: + pass + + @abstractmethod + def get_result_columns(self) -> List[str]: + pass + + async def evaluate_all_problems(self, data: List[dict], graph: Callable, max_concurrent_tasks: int = 50): + semaphore = asyncio.Semaphore(max_concurrent_tasks) + + async def sem_evaluate(problem): + async with semaphore: + return await self.evaluate_problem(problem, graph) + + tasks = [sem_evaluate(problem) for problem in data] + + return await tqdm_asyncio.gather(*tasks, desc=f"Evaluating {self.name} problems", total=len(data)) + + async def run_evaluation(self, graph: Callable, va_list: List[int], max_concurrent_tasks: int = 50): + data = await self.load_data(va_list) + results = await self.evaluate_all_problems(data, graph, max_concurrent_tasks) + columns = self.get_result_columns() + average_score, average_cost, total_cost = self.save_results_to_csv(results, columns) + print(f"Average score on {self.name} dataset: {average_score:.5f}") + print(f"Total Cost: {total_cost:.5f}") + return average_score, average_cost, total_cost + + diff --git a/examples/aflow/benchmark/drop.py b/examples/aflow/benchmark/drop.py index cf606abca..547356f0f 100644 --- a/examples/aflow/benchmark/drop.py +++ b/examples/aflow/benchmark/drop.py @@ -11,184 +11,81 @@ import aiofiles import pandas as pd from tqdm.asyncio import tqdm_asyncio +from examples.aflow.benchmark.benchmark import BaseBenchmark -def is_number(text: str) -> bool: - try: - float(text) - return True - except ValueError: - return False +class DROPBenchmark(BaseBenchmark): + def __init__(self, name: str, file_path: str, log_path: str): + super().__init__(name, file_path, log_path) -def normalize_answer(s: str) -> List[str]: - """ - Normalize answers for evaluation. - """ + def normalize_answer(self, s: str) -> List[str]: + """ + Normalize answers for evaluation. + """ - def remove_articles(text): - return re.sub(r"\b(a|an|the)\b", " ", text) + def remove_articles(text): + return re.sub(r"\b(a|an|the)\b", " ", text) - def white_space_fix(text): - return " ".join(text.split()) + def white_space_fix(text): + return " ".join(text.split()) - def remove_punc(text): - exclude = set(string.punctuation) - return "".join(ch for ch in text if ch not in exclude) + def remove_punc(text): + exclude = set(string.punctuation) + return "".join(ch for ch in text if ch not in exclude) - def lower(text): - return text.lower() + def lower(text): + return text.lower() - return white_space_fix(remove_articles(remove_punc(lower(s)))) + return white_space_fix(remove_articles(remove_punc(lower(s)))) -def calculate_score(ground_truth: str, prediction: str) -> Tuple[float, str]: - """ - Compute the F1 score between prediction and ground truth answers. - """ - prediction_tokens = normalize_answer(prediction).split() - ground_truth_tokens = normalize_answer(ground_truth).split() - common = Counter(prediction_tokens) & Counter(ground_truth_tokens) - num_same = sum(common.values()) - if num_same == 0: - return 0, prediction - precision = 1.0 * num_same / len(prediction_tokens) - recall = 1.0 * num_same / len(ground_truth_tokens) - f1 = (2 * precision * recall) / (precision + recall) - return f1, prediction + def calculate_score(self, ground_truth: str, prediction: str) -> Tuple[float, str]: + """ + Compute the F1 score between prediction and ground truth answers. + """ + prediction_tokens = self.normalize_answer(prediction).split() + ground_truth_tokens = self.normalize_answer(ground_truth).split() + common = Counter(prediction_tokens) & Counter(ground_truth_tokens) + num_same = sum(common.values()) + if num_same == 0: + return 0, prediction + precision = 1.0 * num_same / len(prediction_tokens) + recall = 1.0 * num_same / len(ground_truth_tokens) + f1 = (2 * precision * recall) / (precision + recall) + return f1, prediction -def ensure_log_file_exists(path: str): - log_file = os.path.join(path, 'log.json') - if not os.path.exists(log_file): - with open(log_file, 'w', encoding='utf-8') as f: - json.dump([], f, indent=4, ensure_ascii=False) + async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, float, float]: + input_text = problem["context"] + expected_output = problem["ref_text"] + answers = expected_output.split("|") + max_retries = 5 + retries = 0 -def log_mismatch(problem: str, expected_output, prediction: str, predicted_number, path): - log_data = { - "question": problem, - "right_answer": expected_output, - "model_output": prediction, - "extracted_output": predicted_number - } - - log_file = os.path.join(path, 'log.json') - - # Check if the log file already exists - if os.path.exists(log_file): - # If it exists, load the existing log data - with open(log_file, 'r', encoding='utf-8') as f: + while retries < max_retries: try: - data = json.load(f) - except json.JSONDecodeError: - data = [] - else: - # If it doesn't exist, create an empty list - data = [] + output, cost = await graph(input_text) + f1_scores = [] - # Add the new log data to the existing list - data.append(log_data) + for answer in answers: + if answer.strip() != "": + output_parts = output.split("|") + for output_part in output_parts: + f1_score, _ = self.calculate_score(answer, output_part) + f1_scores.append(f1_score) - # Write the updated list back to the log file - with open(log_file, 'w', encoding='utf-8') as f: - json.dump(data, f, indent=4, ensure_ascii=False) + uni_score = max(f1_scores) -async def load_data(file_path: str, specific_indices: List[int] = None) -> List[dict]: - data = [] - # Read the data from the file - async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file: - async for line in file: - data.append(json.loads(line)) + if uni_score < 0.3: + self.log_mismatch(input_text, expected_output, output, output) - # Then further filter based on a specific index list in randomly selected samples - if specific_indices is not None: - filtered_data = [data[i] for i in specific_indices if i < len(data)] - return filtered_data + return input_text, output, expected_output, uni_score, cost - return data + except Exception as e: + retries += 1 + print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") -def save_results_to_csv(results: List[Tuple[str, str, str, int]], path): - # Create a DataFrame from the results - df = pd.DataFrame(results, columns=["inputs", "prediction", "expected_output", "score", "cost"]) + if retries == max_retries: + print("Maximum retries reached. Skipping this sample.") + return input_text, str(e), expected_output, 0.0, 0.0 - # Calculate the average score and cost - avg_score = df["score"].mean() - t_cost = df["cost"].max() - a_cost = t_cost / len(df) if len(df) > 0 else 0 - - # Get the current time in the format YYYYMMDD_HHMMSS - current_time = datetime.now().strftime("%Y%m%d_%H%M%S") - - # Generate a filename with the average score and the current time, rounded to five decimal places - filename = f"{avg_score:.5f}_{current_time}.csv" - output_file = os.path.join(path, filename) - - # Save the DataFrame to a CSV file - df.to_csv(output_file, index=False) - print(f"Results saved to {output_file}") - - return avg_score, a_cost, t_cost - - -async def evaluate_problem(annotation: dict, graph: Callable, log_path) -> Tuple[str, str, float]: - expected_output = annotation["ref_text"] - inputs = annotation["context"] - answers = expected_output.split("|") - - max_retries = 5 - retries = 0 - - while retries < max_retries: - try: - output, cost = await graph(inputs) if graph else (None, None) - - f1_scores = [] - - # if "|" in the output, split it and calculate the score for each part - for answer in answers: - if answer.strip() != "": - output_parts = output.split("|") - for output_part in output_parts: - f1_score, _ = calculate_score(answer, output_part) - f1_scores.append(f1_score) - - uni_score = max(f1_scores) - - print("uni_score", uni_score) - - if uni_score < 0.3: - log_mismatch(inputs, expected_output, output, output, log_path) - else: - ensure_log_file_exists(log_path) - - break - - except Exception as e: - retries += 1 - print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") - - if retries == max_retries: - print("Maximum retries reached. Skipping this sample.") - output = e - uni_score = 0.0 - cost = None - break - - return inputs, output, expected_output, uni_score, cost - -async def evaluate_all_questions(data: List[Tuple[str, Dict[str, Any]]], graph: Callable, path, max_concurrent_tasks: int = 50) -> List[List[Any]]: - semaphore = asyncio.Semaphore(max_concurrent_tasks) - - async def sem_evaluate(problem): - async with semaphore: - return await evaluate_problem(problem, graph, path) - - tasks = [sem_evaluate(problem) for problem in data] - - return await tqdm_asyncio.gather(*tasks, desc="Evaluating DROP problems", total=len(data)) - - -async def optimize_drop_evaluation(graph: Callable, file_path: str, path: str, va_list: list): - data = await load_data(file_path, va_list) - results = await evaluate_all_questions(data, graph, path, max_concurrent_tasks=25) - average_score, average_cost, total_cost = save_results_to_csv(results, path=path) - print(f"Average score on DROP dataset: {average_score:.5f}") - print(f"Total Cost: {total_cost:.5f}") - return average_score, average_cost, total_cost + def get_result_columns(self) -> List[str]: + return ["inputs", "prediction", "expected_output", "score", "cost"] diff --git a/examples/aflow/benchmark/gsm8k.py b/examples/aflow/benchmark/gsm8k.py index 66cc8ed16..9f7813f86 100644 --- a/examples/aflow/benchmark/gsm8k.py +++ b/examples/aflow/benchmark/gsm8k.py @@ -15,164 +15,52 @@ import os import time from datetime import datetime +from examples.aflow.benchmark.benchmark import BaseBenchmark -def extract_number(text: str) -> Optional[float]: - """Clean text and extract a single number""" - matches = re.findall(r"[-+]?\d+(?:,\d{3})*(?:\.\d+)?|\d+\.\d+", str(text)) - if matches: - last_number = matches[-1].replace(",", "") - try: - return float(last_number) - except ValueError: - return None - else: - return None +class GSM8KBenchmark(BaseBenchmark): + def __init__(self, name: str, file_path: str, log_path: str): + super().__init__(name, file_path, log_path) - -def loose_match_score(expected_output: float, prediction: float, tolerance: float = 1e-6) -> int: - # 如果预测输出为空,返回不匹配 - if prediction is None: - return 0 - - a = expected_output - b = prediction - - # 比较两个提取出的数字,允许一定的容差 - if abs(a - b) <= tolerance: - return 1 # 数字相近,认为匹配成功 - else: - return 0 # 数字不匹配 - - -def log_mismatch(problem: str, expected_output: float, prediction: str, predicted_number: float, path): - log_data = { - "question": problem, - "right_answer": expected_output, - "model_output": prediction, - "extracted_output": predicted_number - } - - log_file = os.path.join(path, 'log.json') - - # 检查log文件是否已经存在 - if os.path.exists(log_file): - # 如果存在,加载现有的日志数据 - with open(log_file, 'r', encoding='utf-8') as f: + def extract_number(self, text: str) -> Optional[float]: + matches = re.findall(r"[-+]?\d+(?:,\d{3})*(?:\.\d+)?|\d+\.\d+", str(text)) + if matches: + last_number = matches[-1].replace(",", "") try: - data = json.load(f) - except json.JSONDecodeError: - data = [] - else: - # 如果不存在,创建一个新的日志列表 - data = [] + return float(last_number) + except ValueError: + return None + else: + return None - # 添加新的日志记录 - data.append(log_data) + def calculate_score(self, expected_output: float, prediction: float) -> Tuple[float, float]: + if prediction is None: + return 0.0, prediction + return 1.0 if abs(expected_output - prediction) <= 1e-6 else 0.0, prediction - # 将数据写回到log.json文件 - with open(log_file, 'w', encoding='utf-8') as f: - json.dump(data, f, indent=4, ensure_ascii=False) + async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, float, float, float]: + max_retries = 5 + retries = 0 + while retries < max_retries: + try: + prediction, cost = await graph(problem["question"]) + predicted_number = self.extract_number(prediction) + expected_output = self.extract_number(problem["answer"]) -async def load_data(file_path: str, specific_indices: List[int] = None) -> List[dict]: - data = [] - # 异步读取文件内容 - async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file: - async for line in file: - data.append(json.loads(line)) + score, _ = self.calculate_score(expected_output, predicted_number) - # 然后在随机选择的样本中基于特定索引列表进行进一步筛选 - if specific_indices is not None: - filtered_data = [data[i] for i in specific_indices if i < len(data)] - return filtered_data + if score == 0: + self.log_mismatch(problem["question"], expected_output, prediction, predicted_number) - return data + return problem["question"], prediction, expected_output, score, cost + except Exception as e: + retries += 1 + print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") -def save_results_to_csv(results: List[Tuple[str, str, str, int]], path): - # 创建 DataFrame - df = pd.DataFrame(results, columns=["question", "prediction", "expected_output", "score", "cost"]) + if retries == max_retries: + print("Maximum retries reached. Skipping this sample.") + return problem["question"], str(e), self.extract_number(problem["answer"]), 0.0, 0.0 - # 计算统计数据 - avg_score = df["score"].mean() - t_cost = df["cost"].max() - a_cost = t_cost / len(df) if len(df) > 0 else 0 - - # 获取当前时间,格式为 YYYYMMDD_HHMMSS - current_time = datetime.now().strftime("%Y%m%d_%H%M%S") - - # 生成文件名,包含平均分和当前时间,保留五位小数 - filename = f"{avg_score:.5f}_{current_time}.csv" - output_file = os.path.join(path, filename) - - # 保存到 CSV - df.to_csv(output_file, index=False) - print(f"Results saved to {output_file}") - - return avg_score, a_cost, t_cost - - -async def evaluate_problem(input: str, graph, expected_output: str, path) -> tuple[ - str, str , float, int, str]: - max_retries = 5 - retries = 0 - uni_score = 0 - - while retries < max_retries: - try: - prediction = await graph(input) if graph else "None" # 这是一个占位符,替换成实际的模型生成逻辑 - cost = prediction[1] - output = prediction[0] - if output is not None: - predicted_number = extract_number(output) - expected_output = extract_number(expected_output) - else: - predicted_number = None - - uni_score = loose_match_score(expected_output, predicted_number) - - if uni_score == 0: - log_mismatch(input, expected_output, output, predicted_number, path) - else: - pass - - break - - except Exception as e: - retries += 1 - print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") - time.sleep(5 * retries) - - if retries == max_retries: - print("Maximum retries reached. Skipping this sample.") - output = e - cost = None - uni_score = 0 - break - - return input, output, expected_output, uni_score, cost - - -async def evaluate_all_problems(data: List[dict], graph, path, max_concurrent_tasks: int = 100): - semaphore = asyncio.Semaphore(max_concurrent_tasks) - - async def sem_evaluate(problem): - async with semaphore: - input_text = problem["question"] - expected_output = problem["answer"] - return await evaluate_problem(input_text, graph, expected_output, path) - - tasks = [sem_evaluate(problem) for problem in data] - - # 使用tqdm.gather来显示进度条 - return await tqdm_asyncio.gather(*tasks, desc="Evaluating problems", total=len(data)) - -async def optimize_gsm8k_evaluation(graph: Callable, file_path: str, path: str, va_list: list) -> tuple[ - Any, Any, Any]: - """Optimize GSM8K evaluation main function""" - data = await load_data(file_path, va_list) - results = await evaluate_all_problems(data, graph, path, max_concurrent_tasks=30) - average_score, average_cost, total_cost = save_results_to_csv(results, path=path) - print(f"Average score: {average_score:.5f}") - print(f"Total Cost: {total_cost:.5f}") - return average_score, average_cost, total_cost + def get_result_columns(self) -> List[str]: + return ["question", "prediction", "expected_output", "score", "cost"] diff --git a/examples/aflow/benchmark/hotpotqa.py b/examples/aflow/benchmark/hotpotqa.py index 68dd2a89a..fa6371aff 100644 --- a/examples/aflow/benchmark/hotpotqa.py +++ b/examples/aflow/benchmark/hotpotqa.py @@ -2,211 +2,73 @@ import json import asyncio import aiofiles import pandas as pd -import numpy as np -from typing import List, Tuple, Callable, Set -from collections import Counter -from tqdm.asyncio import tqdm_asyncio +from typing import List, Tuple, Callable, Any import string import re import os -from datetime import datetime +from collections import Counter -def is_number(text: str) -> bool: - try: - float(text) - return True - except ValueError: - return False +from examples.aflow.benchmark.benchmark import BaseBenchmark -def normalize_answer(s): - """ - Normalize answers for evaluation. - """ +class HotpotQABenchmark(BaseBenchmark): + def __init__(self, name: str, file_path: str, log_path: str): + super().__init__(name, file_path, log_path) - def remove_articles(text): - return re.sub(r"\b(a|an|the)\b", " ", text) + def normalize_answer(self, s: str) -> str: + def remove_articles(text): + return re.sub(r"\b(a|an|the)\b", " ", text) - def white_space_fix(text): - return " ".join(text.split()) + def white_space_fix(text): + return " ".join(text.split()) - def remove_punc(text): - exclude = set(string.punctuation) - return "".join(ch for ch in text if ch not in exclude) + def remove_punc(text): + exclude = set(string.punctuation) + return "".join(ch for ch in text if ch not in exclude) - def lower(text): - return text.lower() + def lower(text): + return text.lower() - return white_space_fix(remove_articles(remove_punc(lower(s)))) + return white_space_fix(remove_articles(remove_punc(lower(s)))) -def calculate_score(ground_truth: str, prediction: str): - """ - Compute the F1 score between prediction and ground truth answers. - """ - prediction_tokens = normalize_answer(prediction).split() - ground_truth_tokens = normalize_answer(ground_truth).split() - common = Counter(prediction_tokens) & Counter(ground_truth_tokens) - num_same = sum(common.values()) - if num_same == 0: - return 0, prediction - precision = 1.0 * num_same / len(prediction_tokens) - recall = 1.0 * num_same / len(ground_truth_tokens) - f1 = (2 * precision * recall) / (precision + recall) - return f1, prediction + def calculate_score(self, ground_truth: str, prediction: str) -> Tuple[float, str]: + prediction_tokens = self.normalize_answer(prediction).split() + ground_truth_tokens = self.normalize_answer(ground_truth).split() + common = Counter(prediction_tokens) & Counter(ground_truth_tokens) + num_same = sum(common.values()) + if num_same == 0: + return 0, prediction + precision = 1.0 * num_same / len(prediction_tokens) + recall = 1.0 * num_same / len(ground_truth_tokens) + f1 = (2 * precision * recall) / (precision + recall) + return f1, prediction -def ensure_log_file_exists(path: str): - log_file = os.path.join(path, 'log.json') - if not os.path.exists(log_file): - with open(log_file, 'w', encoding='utf-8') as f: - json.dump([], f, indent=4, ensure_ascii=False) + async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, str, float, float]: + input_text = problem["question"] + expected_output = problem["answer"] + paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)] + context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs) + inputs = f"Context: {context_str}\n\nQuestion: {input_text}\n\nAnswer:" -def log_mismatch(problem: str, expected_output: float, prediction: str, predicted_number, path): - log_data = { - "question": problem, - "right_answer": expected_output, - "model_output": prediction, - "extracted_output": predicted_number - } + max_retries = 5 + retries = 0 - log_file = os.path.join(path, 'log.json') - - # 检查log文件是否已经存在 - if os.path.exists(log_file): - # 如果存在,加载现有的日志数据 - with open(log_file, 'r', encoding='utf-8') as f: + while retries < max_retries: try: - data = json.load(f) - except json.JSONDecodeError: - data = [] - else: - # 如果不存在,创建一个新的日志列表 - data = [] + output, cost = await graph(inputs) + score, _ = self.calculate_score(expected_output, output) - # 添加新的日志记录 - data.append(log_data) + if score < 0.3: + self.log_mismatch(input_text, expected_output, output, output) - # 将数据写回到log.json文件 - with open(log_file, 'w', encoding='utf-8') as f: - json.dump(data, f, indent=4, ensure_ascii=False) + return input_text, context_str, output, expected_output, score, cost -async def load_data(file_path: str, specific_indices: List[int] = None) -> List[dict]: - data = [] - # 异步读取文件内容 - async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file: - async for line in file: - data.append(json.loads(line)) + except Exception as e: + retries += 1 + print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") - # 然后在随机选择的样本中基于特定索引列表进行进一步筛选 - if specific_indices is not None: - filtered_data = [data[i] for i in specific_indices if i < len(data)] - return filtered_data + if retries == max_retries: + print("Maximum retries reached. Skipping this sample.") + return input_text, context_str, str(e), expected_output, 0.0, 0.0 - return data - -def save_results_to_csv(results: List[Tuple[str, str, str, str, float, float]], path): - # 创建 DataFrame - df = pd.DataFrame(results, columns=["question", "context", "prediction", "expected_output", "score", "cost"]) - - # 计算统计数据 - avg_score = df["score"].mean() - t_cost = df["cost"].max() - a_cost = t_cost / len(df) if len(df) > 0 else 0 - - # 获取当前时间,格式为 YYYYMMDD_HHMMSS - current_time = datetime.now().strftime("%Y%m%d_%H%M%S") - - # 生成文件名,包含平均分和当前时间,保留五位小数 - filename = f"{avg_score:.5f}_{current_time}.csv" - output_file = os.path.join(path, filename) - - # 保存到 CSV - df.to_csv(output_file, index=False) - print(f"Results saved to {output_file}") - - return avg_score, a_cost, t_cost - -async def evaluate_problem(problem: dict, graph: Callable, log_path: str): - input_text = problem["question"] - expected_output = problem["answer"] - paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)] - context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs) - - max_retries = 5 - retries = 0 - - while retries < max_retries: - try: - output, cost = await graph(input_text, context_str) if graph else "None" - uni_score, extracted_output = calculate_score(expected_output, output) - - if uni_score == 0: - log_mismatch(input_text, expected_output, output, extracted_output, log_path) - else: - ensure_log_file_exists(log_path) - - break - except Exception as e: - retries += 1 - print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") - - if retries == max_retries: - print("Maximum retries reached. Skipping this sample.") - output = e - cost = None - uni_score = 0 - break - - return input_text, context_str, output, expected_output, uni_score, cost - -async def evaluate_problem_optimize(problem: dict, graph: Callable, log_path: str): - input_text = problem["question"] - expected_output = problem["answer"] - paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)] - context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs) - inputs = f"Context: {context_str}\n\nQuestion: {input_text}\n\nAnswer:" - - max_retries = 5 - retries = 0 - - while retries < max_retries: - try: - output, cost = await graph(inputs) if graph else "None" - uni_score, extracted_output = calculate_score(expected_output, output) - - if uni_score < 0.3: - log_mismatch(input_text, expected_output, output, extracted_output, log_path) - else: - ensure_log_file_exists(log_path) - - break - except Exception as e: - retries += 1 - print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") - - if retries == max_retries: - print("Maximum retries reached. Skipping this sample.") - output = e - cost = None - uni_score = 0 - break - - return input_text, context_str, output, expected_output, uni_score, cost - - -async def evaluate_all_problems(data: List[dict], graph: Callable, path, max_concurrent_tasks: int = 50): - semaphore = asyncio.Semaphore(max_concurrent_tasks) - - async def sem_evaluate(problem): - async with semaphore: - return await evaluate_problem_optimize(problem, graph, path) - - tasks = [sem_evaluate(problem) for problem in data] - - return await tqdm_asyncio.gather(*tasks, desc="Evaluating HotpotQA problems", total=len(data)) - -async def optimize_hotpotqa_evaluation(graph: Callable, file_path: str, path: str, va_list: list): - data = await load_data(file_path, va_list) - results = await evaluate_all_problems(data, graph, path, max_concurrent_tasks=20) - average_score, average_cost, total_cost = save_results_to_csv(results, path=path) - print(f"Average score on HotpotQA dataset: {average_score:.5f}") - print(f"Total Cost: {total_cost:.5f}") - return average_score, average_cost, total_cost + def get_result_columns(self) -> List[str]: + return ["question", "context", "prediction", "expected_output", "score", "cost"] diff --git a/examples/aflow/benchmark/humaneval.py b/examples/aflow/benchmark/humaneval.py index eede6bacc..797fe765c 100644 --- a/examples/aflow/benchmark/humaneval.py +++ b/examples/aflow/benchmark/humaneval.py @@ -2,219 +2,139 @@ import os import time import json import asyncio -import aiofiles import threading from datetime import datetime from typing import List, Tuple, Callable, Dict, Any, Optional -import re import pandas as pd -from tqdm.asyncio import tqdm_asyncio -from examples.aflow.benchmark.utils import generate_random_indices -from examples.aflow.benchmark.utils import log_mismatch +from examples.aflow.benchmark.benchmark import BaseBenchmark from metagpt.actions.code_sanitize import sanitize +class HumanEvalBenchmark(BaseBenchmark): + def __init__(self, name: str, file_path: str, log_path: str): + super().__init__(name, file_path, log_path) -async def load_data(file_path: str, samples=1, test=False) -> List[dict]: - data = [] - async with aiofiles.open(file_path, mode="r") as file: - async for line in file: - data.append(json.loads(line)) - random_indices = generate_random_indices(len(data), samples, test) - data = [data[i] for i in random_indices] - return data + PASS = "PASS" + FAIL = "FAIL" + class TimeoutError(Exception): + pass -async def load_file_data(file_path: str, specific_indices: List[int] = None) -> List[dict]: - data = [] - # 异步读取文件内容 - async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file: - async for line in file: - data.append(json.loads(line)) + def run_with_timeout(self, func, args, timeout): + result = [] + stop_event = threading.Event() - # 然后在随机选择的样本中基于特定索引列表进行进一步筛选 - if specific_indices is not None: - filtered_data = [data[i] for i in specific_indices if i < len(data)] - return filtered_data + def target(): + try: + result.append(func(*args)) + except Exception as e: + result.append(e) + finally: + stop_event.set() - return data + thread = threading.Thread(target=target) + thread.start() + is_timeout = not stop_event.wait(timeout) -PASS = "PASS" -FAIL = "FAIL" + if is_timeout: + raise self.TimeoutError("Function execution timed out") -class TimeoutError(Exception): - pass + if not result: + return None + if isinstance(result[0], Exception): + raise result[0] + return result[0] -def run_with_timeout(func, args, timeout): - result = [] - stop_event = threading.Event() - - def target(): + def check_solution(self, solution, test, entry_point): + solution = sanitize(code=solution, entrypoint=entry_point) try: - result.append(func(*args)) - except Exception as e: - result.append(e) - finally: - stop_event.set() - - thread = threading.Thread(target=target) - thread.start() - is_timeout = not stop_event.wait(timeout) - - if is_timeout: - # 线程仍在运行,我们无法强制终止它,但至少可以标记超时 - raise TimeoutError("Function execution timed out") - - if not result: - return None - if isinstance(result[0], Exception): - raise result[0] - return result[0] - -def check_solution(solution, test, entry_point): - - solution = sanitize(code=solution, entrypoint=entry_point) - try: - # 定义一个包含所有必要模块的全局字典 - global_dict = { - 'math': __import__('math'), - 'hashlib': __import__('hashlib'), - 're': __import__('re'), - 'List': List, - 'Dict': Dict, - 'Tuple': Tuple, - 'Optional': Optional, - 'Any': Any - } - if entry_point == "decode_cyclic": - solution = "\n\ndef encode_cyclic(s: str):\n \"\"\"\n returns encoded string by cycling groups of three characters.\n \"\"\"\n # split string to groups. Each of length 3.\n groups = [s[(3 * i):min((3 * i + 3), len(s))] for i in range((len(s) + 2) // 3)]\n # cycle elements in each group. Unless group has fewer elements than 3.\n groups = [(group[1:] + group[0]) if len(group) == 3 else group for group in groups]\n return \"\".join(groups)" + "\n\n" + solution - elif entry_point == "decode_shift": - solution = "\n\ndef encode_shift(s: str):\n \"\"\"\n returns encoded string by shifting every character by 5 in the alphabet.\n \"\"\"\n return \"\".join([chr(((ord(ch) + 5 - ord(\"a\")) % 26) + ord(\"a\")) for ch in s])\n\n\n" + solution - elif entry_point == "find_zero": - solution = "\n\ndef poly(xs: list, x: float):\n return sum(coeff * (x ** i) for i, coeff in enumerate(xs))\n\n" + solution - # 执行解决方案 - exec(solution, global_dict) - - # 确保入口点函数已定义 - if entry_point not in global_dict: - raise ValueError(f"函数 {entry_point} 在解决方案中未定义。") - - # 执行测试用例 - exec(test, global_dict) - - # 获取检查函数 - check = global_dict["check"] - - # 运行检查函数,设置超时时间为120秒 - result = run_with_timeout(check, (global_dict[entry_point],), 15) - - if result is None: - result = (PASS, "解决方案通过了所有测试用例。") - - except TimeoutError: - result = (FAIL, "执行超时。请检查您的解决方案是否包含无限循环或过于耗时的操作。") - except Exception as e: - # 记录详细的错误信息 - error_message = f"错误: {str(e)}.\n 解决方案: {solution}.\n 测试: {test}" - result = (FAIL, error_message) - - # 将错误信息写入error.log文件 - with open('error.log', 'a', encoding='utf-8') as log_file: - log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n") - - return result - - -async def evaluate_problem(data: dict, graph: Callable, path) -> Tuple[str, str, str, int, str]: - max_retries = 5 - retries = 0 - - expected_output = "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"] - - while retries < max_retries: - try: - prediction = await asyncio.wait_for(graph(data["prompt"], data["entry_point"]), timeout=60) if graph else "None" - cost = prediction[1] - solution = prediction[0] - ret = check_solution(solution, data["test"], data["entry_point"]) - test_case_details = ret[1] - expected_output = test_case_details + "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"] - score = 1 if ret[0] == PASS else 0 - - if score == 0: - log_mismatch(data["prompt"], expected_output, solution, score, path) - break + global_dict = { + 'math': __import__('math'), + 'hashlib': __import__('hashlib'), + 're': __import__('re'), + 'List': List, + 'Dict': Dict, + 'Tuple': Tuple, + 'Optional': Optional, + 'Any': Any + } - except TimeoutError: - solution = None - ret = (FAIL, ["超时"]) - score = 0 - cost = 0 - break - + # Add handling for special cases + if entry_point == "decode_cyclic": + solution = "\n\ndef encode_cyclic(s: str):\n \"\"\"\n returns encoded string by cycling groups of three characters.\n \"\"\"\n # split string to groups. Each of length 3.\n groups = [s[(3 * i):min((3 * i + 3), len(s))] for i in range((len(s) + 2) // 3)]\n # cycle elements in each group. Unless group has fewer elements than 3.\n groups = [(group[1:] + group[0]) if len(group) == 3 else group for group in groups]\n return \"\".join(groups)" + "\n\n" + solution + elif entry_point == "decode_shift": + solution = "\n\ndef encode_shift(s: str):\n \"\"\"\n returns encoded string by shifting every character by 5 in the alphabet.\n \"\"\"\n return \"\".join([chr(((ord(ch) + 5 - ord(\"a\")) % 26) + ord(\"a\")) for ch in s])\n\n\n" + solution + elif entry_point == "find_zero": + solution = "\n\ndef poly(xs: list, x: float):\n return sum(coeff * (x ** i) for i, coeff in enumerate(xs))\n\n" + solution + + exec(solution, global_dict) + + if entry_point not in global_dict: + raise ValueError(f"Function {entry_point} is not defined in the solution.") + + exec(test, global_dict) + + check = global_dict["check"] + + result = self.run_with_timeout(check, (global_dict[entry_point],), 15) + + if result is None: + result = (self.PASS, "The solution passed all test cases.") + + except self.TimeoutError: + result = (self.FAIL, "Execution timed out. Please check if your solution contains infinite loops or overly time-consuming operations.") except Exception as e: - retries += 1 - print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") + error_message = f"Error: {str(e)}.\n Solution: {solution}.\n Test: {test}" + result = (self.FAIL, error_message) + + with open('error.log', 'a', encoding='utf-8') as log_file: + log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n") + + return result - if retries == max_retries: - print("Maximum retries reached. Skipping this sample.") - solution = None - ret = (FAIL, []) - score = 0 - cost = 0 + async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]: + max_retries = 5 + retries = 0 + + expected_output = "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"] + + while retries < max_retries: + try: + prediction, cost = await asyncio.wait_for(graph(data["prompt"], data["entry_point"]), timeout=60) + ret = self.check_solution(prediction, data["test"], data["entry_point"]) + test_case_details = ret[1] + expected_output = test_case_details + "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"] + score = 1.0 if ret[0] == self.PASS else 0.0 + + if score == 0: + self.log_mismatch(data["prompt"], expected_output, prediction, score) + break + + except asyncio.TimeoutError: + prediction = None + ret = (self.FAIL, ["Timeout"]) + score = 0.0 + cost = 0.0 break - return data["prompt"], solution, expected_output, score, cost # 修改返回值以包含cost + except Exception as e: + retries += 1 + print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") -async def evaluate_all_problems(data: List[dict], graph: Callable, path:str="", max_concurrent_tasks: int = 50) -> List[Tuple[str, str, str, int, str]]: - semaphore = asyncio.Semaphore(max_concurrent_tasks) + if retries == max_retries: + print("Maximum retries reached. Skipping this sample.") + prediction = None + ret = (self.FAIL, []) + score = 0.0 + cost = 0.0 + break - async def sem_evaluate(problem): - async with semaphore: - return await evaluate_problem(problem, graph, path) + return data["prompt"], prediction, expected_output, score, cost - tasks = [sem_evaluate(problem) for problem in data] + def calculate_score(self, expected_output: str, prediction: str) -> Tuple[float, str]: + # The scoring logic for HumanEval is already implemented in evaluate_problem, this is just to conform to the interface + return 0.0, prediction - return await tqdm_asyncio.gather(*tasks, desc="Evaluating HumanEval problems", total=len(data)) - -def save_results_to_csv(results: List[Tuple[str, str, str, int]], path): - # 创建 DataFrame - df = pd.DataFrame(results, columns=["question", "prediction", "expected_output", "score", "cost"]) - - # 计算统计数据 - avg_score = df["score"].mean() - t_cost = df["cost"].max() - a_cost = t_cost / len(df) if len(df) > 0 else 0 - - # 获取当前时间,格式为 YYYYMMDD_HHMMSS - current_time = datetime.now().strftime("%Y%m%d_%H%M%S") - - # 生成文件名,包含平均分和当前时间,保留五位小数 - filename = f"{avg_score:.5f}_{current_time}.csv" - output_file = os.path.join(path, filename) - - # 保存到 CSV - df.to_csv(output_file, index=False) - print(f"Results saved to {output_file}") - - return avg_score, a_cost, t_cost - -async def humaneval_evaluation(graph: Callable, file_path: str, samples: int, path: str, test=False) -> Tuple[float, float]: - data = await load_data(file_path, samples, test=test) - results = await evaluate_all_problems(data, graph, max_concurrent_tasks=5) - average_score, average_cost, total_cost = save_results_to_csv(results, path=path) - print(f"Average score on HumanEval dataset: {average_score:.5f}") - print(f"Total Cost: {total_cost:.5f}") - print(f"Average cost on HumanEval dataset: {average_cost:.5f}") - return average_score, total_cost # 修改返回值以包含total_cost - - -async def optimize_humaneval_evaluation(graph: Callable, file_path: str, path: str, va_list: List[int]) -> Tuple[float, float, float]: - data = await load_file_data(file_path, va_list) - results = await evaluate_all_problems(data, graph, path, max_concurrent_tasks=10) - average_score, average_cost, total_cost = save_results_to_csv(results, path=path) - print(f"Average score on HumanEval dataset: {average_score:.5f}") - print(f"Total Cost: {total_cost:.5f}") - print(f"Average cost on HumanEval dataset: {average_cost:.5f}") - return average_score, average_cost, total_cost + def get_result_columns(self) -> List[str]: + return ["inputs", "prediction", "expected_output", "score", "cost"] diff --git a/examples/aflow/scripts/evaluator.py b/examples/aflow/scripts/evaluator.py index 285627b75..1e746c172 100644 --- a/examples/aflow/scripts/evaluator.py +++ b/examples/aflow/scripts/evaluator.py @@ -3,15 +3,16 @@ # @Author : all # @Desc : Evaluation for different datasets -from typing import Literal, Tuple, Optional +from typing import Literal, Tuple, Optional, Dict import asyncio -from examples.aflow.benchmark.gsm8k import optimize_gsm8k_evaluation -from examples.aflow.benchmark.math import optimize_math_evaluation -from examples.aflow.benchmark.humaneval import optimize_humaneval_evaluation -from examples.aflow.benchmark.hotpotqa import optimize_hotpotqa_evaluation -from examples.aflow.benchmark.mbpp import optimize_mbpp_evaluation -from examples.aflow.benchmark.drop import optimize_drop_evaluation +from examples.aflow.benchmark.benchmark import BaseBenchmark +from examples.aflow.benchmark.gsm8k import GSM8KBenchmark +from examples.aflow.benchmark.math import MATHBenchmark +from examples.aflow.benchmark.humaneval import HumanEvalBenchmark +from examples.aflow.benchmark.hotpotqa import HotpotQABenchmark +from examples.aflow.benchmark.mbpp import MBPPBenchmark +from examples.aflow.benchmark.drop import DROPBenchmark # If you want to customize tasks, add task types here and provide evaluation functions, just like the ones given above DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"] @@ -23,51 +24,40 @@ class Evaluator: def __init__(self, eval_path: str): self.eval_path = eval_path - self.dataset_configs = { - "GSM8K": {"name": "GSM8K", "eval_func": optimize_gsm8k_evaluation}, - "MATH": {"name": "MATH", "eval_func": optimize_math_evaluation}, - "HumanEval": {"name": "HumanEval", "eval_func": optimize_humaneval_evaluation}, - "HotpotQA": {"name": "HotpotQA", "eval_func": optimize_hotpotqa_evaluation}, - "MBPP": {"name": "MBPP", "eval_func": optimize_mbpp_evaluation}, - "DROP": {"name": "DROP", "eval_func": optimize_drop_evaluation}, + self.dataset_configs: Dict[DatasetType, BaseBenchmark] = { + "GSM8K": GSM8KBenchmark, + "MATH": MATHBenchmark, + "HumanEval": HumanEvalBenchmark, + "HotpotQA": HotpotQABenchmark, + "MBPP": MBPPBenchmark, + "DROP": DROPBenchmark, } - def graph_evaluate(self, dataset: DatasetType, graph, params: dict, path, is_test=False): - """ - Evaluates on validation dataset. - """ - if dataset in self.dataset_configs: - return self._generic_eval(dataset, graph, params, path, is_test) - else: - return None + async def graph_evaluate(self, dataset: DatasetType, graph, params: dict, path: str, is_test: bool = False) -> Tuple[float, float, float]: + if dataset not in self.dataset_configs: + raise ValueError(f"Unsupported dataset: {dataset}") - async def _generic_eval(self, dataset: DatasetType, graph_class, params: dict, path: str, test: bool = False) -> Tuple[float, float, float]: - """ - Generic evaluation function for all datasets. - """ - async def load_graph(): - dataset_config = params["dataset"] - llm_config = params["llm_config"] - return graph_class(name=self.dataset_configs[dataset]["name"], llm_config=llm_config, dataset=dataset_config) + data_path = self._get_data_path(dataset, is_test) + benchmark_class = self.dataset_configs[dataset] + benchmark = benchmark_class(dataset, data_path, path) - data_path, va_list = self._get_data_path_and_va_list(dataset, test) - graph = await load_graph() - - eval_func = self.dataset_configs[dataset]["eval_func"] - avg_score, avg_cost, total_cost = await eval_func(graph, data_path, path, va_list) - - return avg_score, avg_cost, total_cost + # Use params to configure the graph and benchmark + configured_graph = await self._configure_graph(graph, params) - def _get_data_path_and_va_list(self, dataset: DatasetType, test: bool) -> Tuple[str, Optional[list]]: - """ - Get data path and validation list based on dataset and test flag. - """ + va_list = [1,2,3] # Use va_list from params, or use default value if not provided + return await benchmark.run_evaluation(configured_graph, va_list) + + async def _configure_graph(self, graph, params: dict): + # Here you can configure the graph based on params + # For example: set LLM configuration, dataset configuration, etc. + dataset_config = params.get("dataset", {}) + llm_config = params.get("llm_config", {}) + return graph(name=self.dataset_configs[dataset]["name"], llm_config=llm_config, dataset=dataset_config) + + def _get_data_path(self, dataset: DatasetType, test: bool) -> str: base_path = f"examples/aflow/data/{dataset.lower()}" - if test: - return f"{base_path}_test.jsonl", None - else: - return f"{base_path}_validate.jsonl", [1, 2, 3] # Replace with the actual filtered index list + return f"{base_path}_test.jsonl" if test else f"{base_path}_validate.jsonl" # Alias methods for backward compatibility for dataset in ["gsm8k", "math", "humaneval", "mbpp", "hotpotqa", "drop"]: - setattr(Evaluator, f"_{dataset}_eval", lambda self, *args, dataset=dataset.upper(), **kwargs: self._generic_eval(dataset, *args, **kwargs)) \ No newline at end of file + setattr(Evaluator, f"_{dataset}_eval", lambda self, *args, dataset=dataset.upper(), **kwargs: self.graph_evaluate(dataset, *args, **kwargs))