This commit is contained in:
didi 2024-10-21 12:46:17 +08:00
parent ade10684b7
commit efa00f8bbb
6 changed files with 393 additions and 735 deletions

View file

@ -0,0 +1,101 @@
import asyncio
import json
import os
from typing import List, Tuple, Callable, Any
from abc import ABC, abstractmethod
from datetime import datetime
import aiofiles
import pandas as pd
from tqdm.asyncio import tqdm_asyncio
class BaseBenchmark(ABC):
def __init__(self, name: str, file_path: str, log_path: str):
self.name = name
self.file_path = file_path
self.log_path = log_path
async def load_data(self, specific_indices: List[int] = None) -> List[dict]:
data = []
async with aiofiles.open(self.file_path, mode="r", encoding='utf-8') as file:
async for line in file:
data.append(json.loads(line))
if specific_indices is not None:
filtered_data = [data[i] for i in specific_indices if i < len(data)]
return filtered_data
return data
def save_results_to_csv(self, results: List[Tuple[Any, ...]], columns: List[str]):
df = pd.DataFrame(results, columns=columns)
avg_score = df["score"].mean()
t_cost = df["cost"].max()
a_cost = t_cost / len(df) if len(df) > 0 else 0
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{avg_score:.5f}_{current_time}.csv"
output_file = os.path.join(self.log_path, filename)
df.to_csv(output_file, index=False)
print(f"Results saved to {output_file}")
return avg_score, a_cost, t_cost
def log_mismatch(self, problem: str, expected_output: Any, prediction: str, extracted_output: Any):
log_data = {
"question": problem,
"right_answer": expected_output,
"model_output": prediction,
"extracted_output": extracted_output
}
log_file = os.path.join(self.log_path, 'log.json')
if os.path.exists(log_file):
with open(log_file, 'r', encoding='utf-8') as f:
try:
data = json.load(f)
except json.JSONDecodeError:
data = []
else:
data = []
data.append(log_data)
with open(log_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
@abstractmethod
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[Any, ...]:
pass
@abstractmethod
def calculate_score(self, expected_output: Any, prediction: Any) -> Tuple[float, Any]:
pass
@abstractmethod
def get_result_columns(self) -> List[str]:
pass
async def evaluate_all_problems(self, data: List[dict], graph: Callable, max_concurrent_tasks: int = 50):
semaphore = asyncio.Semaphore(max_concurrent_tasks)
async def sem_evaluate(problem):
async with semaphore:
return await self.evaluate_problem(problem, graph)
tasks = [sem_evaluate(problem) for problem in data]
return await tqdm_asyncio.gather(*tasks, desc=f"Evaluating {self.name} problems", total=len(data))
async def run_evaluation(self, graph: Callable, va_list: List[int], max_concurrent_tasks: int = 50):
data = await self.load_data(va_list)
results = await self.evaluate_all_problems(data, graph, max_concurrent_tasks)
columns = self.get_result_columns()
average_score, average_cost, total_cost = self.save_results_to_csv(results, columns)
print(f"Average score on {self.name} dataset: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
return average_score, average_cost, total_cost

View file

@ -11,184 +11,81 @@ import aiofiles
import pandas as pd
from tqdm.asyncio import tqdm_asyncio
from examples.aflow.benchmark.benchmark import BaseBenchmark
def is_number(text: str) -> bool:
try:
float(text)
return True
except ValueError:
return False
class DROPBenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
def normalize_answer(s: str) -> List[str]:
"""
Normalize answers for evaluation.
"""
def normalize_answer(self, s: str) -> List[str]:
"""
Normalize answers for evaluation.
"""
def remove_articles(text):
return re.sub(r"\b(a|an|the)\b", " ", text)
def remove_articles(text):
return re.sub(r"\b(a|an|the)\b", " ", text)
def white_space_fix(text):
return " ".join(text.split())
def white_space_fix(text):
return " ".join(text.split())
def remove_punc(text):
exclude = set(string.punctuation)
return "".join(ch for ch in text if ch not in exclude)
def remove_punc(text):
exclude = set(string.punctuation)
return "".join(ch for ch in text if ch not in exclude)
def lower(text):
return text.lower()
def lower(text):
return text.lower()
return white_space_fix(remove_articles(remove_punc(lower(s))))
return white_space_fix(remove_articles(remove_punc(lower(s))))
def calculate_score(ground_truth: str, prediction: str) -> Tuple[float, str]:
"""
Compute the F1 score between prediction and ground truth answers.
"""
prediction_tokens = normalize_answer(prediction).split()
ground_truth_tokens = normalize_answer(ground_truth).split()
common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
num_same = sum(common.values())
if num_same == 0:
return 0, prediction
precision = 1.0 * num_same / len(prediction_tokens)
recall = 1.0 * num_same / len(ground_truth_tokens)
f1 = (2 * precision * recall) / (precision + recall)
return f1, prediction
def calculate_score(self, ground_truth: str, prediction: str) -> Tuple[float, str]:
"""
Compute the F1 score between prediction and ground truth answers.
"""
prediction_tokens = self.normalize_answer(prediction).split()
ground_truth_tokens = self.normalize_answer(ground_truth).split()
common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
num_same = sum(common.values())
if num_same == 0:
return 0, prediction
precision = 1.0 * num_same / len(prediction_tokens)
recall = 1.0 * num_same / len(ground_truth_tokens)
f1 = (2 * precision * recall) / (precision + recall)
return f1, prediction
def ensure_log_file_exists(path: str):
log_file = os.path.join(path, 'log.json')
if not os.path.exists(log_file):
with open(log_file, 'w', encoding='utf-8') as f:
json.dump([], f, indent=4, ensure_ascii=False)
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
input_text = problem["context"]
expected_output = problem["ref_text"]
answers = expected_output.split("|")
max_retries = 5
retries = 0
def log_mismatch(problem: str, expected_output, prediction: str, predicted_number, path):
log_data = {
"question": problem,
"right_answer": expected_output,
"model_output": prediction,
"extracted_output": predicted_number
}
log_file = os.path.join(path, 'log.json')
# Check if the log file already exists
if os.path.exists(log_file):
# If it exists, load the existing log data
with open(log_file, 'r', encoding='utf-8') as f:
while retries < max_retries:
try:
data = json.load(f)
except json.JSONDecodeError:
data = []
else:
# If it doesn't exist, create an empty list
data = []
output, cost = await graph(input_text)
f1_scores = []
# Add the new log data to the existing list
data.append(log_data)
for answer in answers:
if answer.strip() != "":
output_parts = output.split("|")
for output_part in output_parts:
f1_score, _ = self.calculate_score(answer, output_part)
f1_scores.append(f1_score)
# Write the updated list back to the log file
with open(log_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
uni_score = max(f1_scores)
async def load_data(file_path: str, specific_indices: List[int] = None) -> List[dict]:
data = []
# Read the data from the file
async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file:
async for line in file:
data.append(json.loads(line))
if uni_score < 0.3:
self.log_mismatch(input_text, expected_output, output, output)
# Then further filter based on a specific index list in randomly selected samples
if specific_indices is not None:
filtered_data = [data[i] for i in specific_indices if i < len(data)]
return filtered_data
return input_text, output, expected_output, uni_score, cost
return data
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
def save_results_to_csv(results: List[Tuple[str, str, str, int]], path):
# Create a DataFrame from the results
df = pd.DataFrame(results, columns=["inputs", "prediction", "expected_output", "score", "cost"])
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
return input_text, str(e), expected_output, 0.0, 0.0
# Calculate the average score and cost
avg_score = df["score"].mean()
t_cost = df["cost"].max()
a_cost = t_cost / len(df) if len(df) > 0 else 0
# Get the current time in the format YYYYMMDD_HHMMSS
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
# Generate a filename with the average score and the current time, rounded to five decimal places
filename = f"{avg_score:.5f}_{current_time}.csv"
output_file = os.path.join(path, filename)
# Save the DataFrame to a CSV file
df.to_csv(output_file, index=False)
print(f"Results saved to {output_file}")
return avg_score, a_cost, t_cost
async def evaluate_problem(annotation: dict, graph: Callable, log_path) -> Tuple[str, str, float]:
expected_output = annotation["ref_text"]
inputs = annotation["context"]
answers = expected_output.split("|")
max_retries = 5
retries = 0
while retries < max_retries:
try:
output, cost = await graph(inputs) if graph else (None, None)
f1_scores = []
# if "|" in the output, split it and calculate the score for each part
for answer in answers:
if answer.strip() != "":
output_parts = output.split("|")
for output_part in output_parts:
f1_score, _ = calculate_score(answer, output_part)
f1_scores.append(f1_score)
uni_score = max(f1_scores)
print("uni_score", uni_score)
if uni_score < 0.3:
log_mismatch(inputs, expected_output, output, output, log_path)
else:
ensure_log_file_exists(log_path)
break
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
output = e
uni_score = 0.0
cost = None
break
return inputs, output, expected_output, uni_score, cost
async def evaluate_all_questions(data: List[Tuple[str, Dict[str, Any]]], graph: Callable, path, max_concurrent_tasks: int = 50) -> List[List[Any]]:
semaphore = asyncio.Semaphore(max_concurrent_tasks)
async def sem_evaluate(problem):
async with semaphore:
return await evaluate_problem(problem, graph, path)
tasks = [sem_evaluate(problem) for problem in data]
return await tqdm_asyncio.gather(*tasks, desc="Evaluating DROP problems", total=len(data))
async def optimize_drop_evaluation(graph: Callable, file_path: str, path: str, va_list: list):
data = await load_data(file_path, va_list)
results = await evaluate_all_questions(data, graph, path, max_concurrent_tasks=25)
average_score, average_cost, total_cost = save_results_to_csv(results, path=path)
print(f"Average score on DROP dataset: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
return average_score, average_cost, total_cost
def get_result_columns(self) -> List[str]:
return ["inputs", "prediction", "expected_output", "score", "cost"]

View file

@ -15,164 +15,52 @@ import os
import time
from datetime import datetime
from examples.aflow.benchmark.benchmark import BaseBenchmark
def extract_number(text: str) -> Optional[float]:
"""Clean text and extract a single number"""
matches = re.findall(r"[-+]?\d+(?:,\d{3})*(?:\.\d+)?|\d+\.\d+", str(text))
if matches:
last_number = matches[-1].replace(",", "")
try:
return float(last_number)
except ValueError:
return None
else:
return None
class GSM8KBenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
def loose_match_score(expected_output: float, prediction: float, tolerance: float = 1e-6) -> int:
# 如果预测输出为空,返回不匹配
if prediction is None:
return 0
a = expected_output
b = prediction
# 比较两个提取出的数字,允许一定的容差
if abs(a - b) <= tolerance:
return 1 # 数字相近,认为匹配成功
else:
return 0 # 数字不匹配
def log_mismatch(problem: str, expected_output: float, prediction: str, predicted_number: float, path):
log_data = {
"question": problem,
"right_answer": expected_output,
"model_output": prediction,
"extracted_output": predicted_number
}
log_file = os.path.join(path, 'log.json')
# 检查log文件是否已经存在
if os.path.exists(log_file):
# 如果存在,加载现有的日志数据
with open(log_file, 'r', encoding='utf-8') as f:
def extract_number(self, text: str) -> Optional[float]:
matches = re.findall(r"[-+]?\d+(?:,\d{3})*(?:\.\d+)?|\d+\.\d+", str(text))
if matches:
last_number = matches[-1].replace(",", "")
try:
data = json.load(f)
except json.JSONDecodeError:
data = []
else:
# 如果不存在,创建一个新的日志列表
data = []
return float(last_number)
except ValueError:
return None
else:
return None
# 添加新的日志记录
data.append(log_data)
def calculate_score(self, expected_output: float, prediction: float) -> Tuple[float, float]:
if prediction is None:
return 0.0, prediction
return 1.0 if abs(expected_output - prediction) <= 1e-6 else 0.0, prediction
# 将数据写回到log.json文件
with open(log_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, float, float, float]:
max_retries = 5
retries = 0
while retries < max_retries:
try:
prediction, cost = await graph(problem["question"])
predicted_number = self.extract_number(prediction)
expected_output = self.extract_number(problem["answer"])
async def load_data(file_path: str, specific_indices: List[int] = None) -> List[dict]:
data = []
# 异步读取文件内容
async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file:
async for line in file:
data.append(json.loads(line))
score, _ = self.calculate_score(expected_output, predicted_number)
# 然后在随机选择的样本中基于特定索引列表进行进一步筛选
if specific_indices is not None:
filtered_data = [data[i] for i in specific_indices if i < len(data)]
return filtered_data
if score == 0:
self.log_mismatch(problem["question"], expected_output, prediction, predicted_number)
return data
return problem["question"], prediction, expected_output, score, cost
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
def save_results_to_csv(results: List[Tuple[str, str, str, int]], path):
# 创建 DataFrame
df = pd.DataFrame(results, columns=["question", "prediction", "expected_output", "score", "cost"])
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
return problem["question"], str(e), self.extract_number(problem["answer"]), 0.0, 0.0
# 计算统计数据
avg_score = df["score"].mean()
t_cost = df["cost"].max()
a_cost = t_cost / len(df) if len(df) > 0 else 0
# 获取当前时间,格式为 YYYYMMDD_HHMMSS
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
# 生成文件名,包含平均分和当前时间,保留五位小数
filename = f"{avg_score:.5f}_{current_time}.csv"
output_file = os.path.join(path, filename)
# 保存到 CSV
df.to_csv(output_file, index=False)
print(f"Results saved to {output_file}")
return avg_score, a_cost, t_cost
async def evaluate_problem(input: str, graph, expected_output: str, path) -> tuple[
str, str , float, int, str]:
max_retries = 5
retries = 0
uni_score = 0
while retries < max_retries:
try:
prediction = await graph(input) if graph else "None" # 这是一个占位符,替换成实际的模型生成逻辑
cost = prediction[1]
output = prediction[0]
if output is not None:
predicted_number = extract_number(output)
expected_output = extract_number(expected_output)
else:
predicted_number = None
uni_score = loose_match_score(expected_output, predicted_number)
if uni_score == 0:
log_mismatch(input, expected_output, output, predicted_number, path)
else:
pass
break
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
time.sleep(5 * retries)
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
output = e
cost = None
uni_score = 0
break
return input, output, expected_output, uni_score, cost
async def evaluate_all_problems(data: List[dict], graph, path, max_concurrent_tasks: int = 100):
semaphore = asyncio.Semaphore(max_concurrent_tasks)
async def sem_evaluate(problem):
async with semaphore:
input_text = problem["question"]
expected_output = problem["answer"]
return await evaluate_problem(input_text, graph, expected_output, path)
tasks = [sem_evaluate(problem) for problem in data]
# 使用tqdm.gather来显示进度条
return await tqdm_asyncio.gather(*tasks, desc="Evaluating problems", total=len(data))
async def optimize_gsm8k_evaluation(graph: Callable, file_path: str, path: str, va_list: list) -> tuple[
Any, Any, Any]:
"""Optimize GSM8K evaluation main function"""
data = await load_data(file_path, va_list)
results = await evaluate_all_problems(data, graph, path, max_concurrent_tasks=30)
average_score, average_cost, total_cost = save_results_to_csv(results, path=path)
print(f"Average score: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
return average_score, average_cost, total_cost
def get_result_columns(self) -> List[str]:
return ["question", "prediction", "expected_output", "score", "cost"]

View file

@ -2,211 +2,73 @@ import json
import asyncio
import aiofiles
import pandas as pd
import numpy as np
from typing import List, Tuple, Callable, Set
from collections import Counter
from tqdm.asyncio import tqdm_asyncio
from typing import List, Tuple, Callable, Any
import string
import re
import os
from datetime import datetime
from collections import Counter
def is_number(text: str) -> bool:
try:
float(text)
return True
except ValueError:
return False
from examples.aflow.benchmark.benchmark import BaseBenchmark
def normalize_answer(s):
"""
Normalize answers for evaluation.
"""
class HotpotQABenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
def remove_articles(text):
return re.sub(r"\b(a|an|the)\b", " ", text)
def normalize_answer(self, s: str) -> str:
def remove_articles(text):
return re.sub(r"\b(a|an|the)\b", " ", text)
def white_space_fix(text):
return " ".join(text.split())
def white_space_fix(text):
return " ".join(text.split())
def remove_punc(text):
exclude = set(string.punctuation)
return "".join(ch for ch in text if ch not in exclude)
def remove_punc(text):
exclude = set(string.punctuation)
return "".join(ch for ch in text if ch not in exclude)
def lower(text):
return text.lower()
def lower(text):
return text.lower()
return white_space_fix(remove_articles(remove_punc(lower(s))))
return white_space_fix(remove_articles(remove_punc(lower(s))))
def calculate_score(ground_truth: str, prediction: str):
"""
Compute the F1 score between prediction and ground truth answers.
"""
prediction_tokens = normalize_answer(prediction).split()
ground_truth_tokens = normalize_answer(ground_truth).split()
common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
num_same = sum(common.values())
if num_same == 0:
return 0, prediction
precision = 1.0 * num_same / len(prediction_tokens)
recall = 1.0 * num_same / len(ground_truth_tokens)
f1 = (2 * precision * recall) / (precision + recall)
return f1, prediction
def calculate_score(self, ground_truth: str, prediction: str) -> Tuple[float, str]:
prediction_tokens = self.normalize_answer(prediction).split()
ground_truth_tokens = self.normalize_answer(ground_truth).split()
common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
num_same = sum(common.values())
if num_same == 0:
return 0, prediction
precision = 1.0 * num_same / len(prediction_tokens)
recall = 1.0 * num_same / len(ground_truth_tokens)
f1 = (2 * precision * recall) / (precision + recall)
return f1, prediction
def ensure_log_file_exists(path: str):
log_file = os.path.join(path, 'log.json')
if not os.path.exists(log_file):
with open(log_file, 'w', encoding='utf-8') as f:
json.dump([], f, indent=4, ensure_ascii=False)
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, str, float, float]:
input_text = problem["question"]
expected_output = problem["answer"]
paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)]
context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs)
inputs = f"Context: {context_str}\n\nQuestion: {input_text}\n\nAnswer:"
def log_mismatch(problem: str, expected_output: float, prediction: str, predicted_number, path):
log_data = {
"question": problem,
"right_answer": expected_output,
"model_output": prediction,
"extracted_output": predicted_number
}
max_retries = 5
retries = 0
log_file = os.path.join(path, 'log.json')
# 检查log文件是否已经存在
if os.path.exists(log_file):
# 如果存在,加载现有的日志数据
with open(log_file, 'r', encoding='utf-8') as f:
while retries < max_retries:
try:
data = json.load(f)
except json.JSONDecodeError:
data = []
else:
# 如果不存在,创建一个新的日志列表
data = []
output, cost = await graph(inputs)
score, _ = self.calculate_score(expected_output, output)
# 添加新的日志记录
data.append(log_data)
if score < 0.3:
self.log_mismatch(input_text, expected_output, output, output)
# 将数据写回到log.json文件
with open(log_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
return input_text, context_str, output, expected_output, score, cost
async def load_data(file_path: str, specific_indices: List[int] = None) -> List[dict]:
data = []
# 异步读取文件内容
async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file:
async for line in file:
data.append(json.loads(line))
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
# 然后在随机选择的样本中基于特定索引列表进行进一步筛选
if specific_indices is not None:
filtered_data = [data[i] for i in specific_indices if i < len(data)]
return filtered_data
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
return input_text, context_str, str(e), expected_output, 0.0, 0.0
return data
def save_results_to_csv(results: List[Tuple[str, str, str, str, float, float]], path):
# 创建 DataFrame
df = pd.DataFrame(results, columns=["question", "context", "prediction", "expected_output", "score", "cost"])
# 计算统计数据
avg_score = df["score"].mean()
t_cost = df["cost"].max()
a_cost = t_cost / len(df) if len(df) > 0 else 0
# 获取当前时间,格式为 YYYYMMDD_HHMMSS
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
# 生成文件名,包含平均分和当前时间,保留五位小数
filename = f"{avg_score:.5f}_{current_time}.csv"
output_file = os.path.join(path, filename)
# 保存到 CSV
df.to_csv(output_file, index=False)
print(f"Results saved to {output_file}")
return avg_score, a_cost, t_cost
async def evaluate_problem(problem: dict, graph: Callable, log_path: str):
input_text = problem["question"]
expected_output = problem["answer"]
paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)]
context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs)
max_retries = 5
retries = 0
while retries < max_retries:
try:
output, cost = await graph(input_text, context_str) if graph else "None"
uni_score, extracted_output = calculate_score(expected_output, output)
if uni_score == 0:
log_mismatch(input_text, expected_output, output, extracted_output, log_path)
else:
ensure_log_file_exists(log_path)
break
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
output = e
cost = None
uni_score = 0
break
return input_text, context_str, output, expected_output, uni_score, cost
async def evaluate_problem_optimize(problem: dict, graph: Callable, log_path: str):
input_text = problem["question"]
expected_output = problem["answer"]
paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)]
context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs)
inputs = f"Context: {context_str}\n\nQuestion: {input_text}\n\nAnswer:"
max_retries = 5
retries = 0
while retries < max_retries:
try:
output, cost = await graph(inputs) if graph else "None"
uni_score, extracted_output = calculate_score(expected_output, output)
if uni_score < 0.3:
log_mismatch(input_text, expected_output, output, extracted_output, log_path)
else:
ensure_log_file_exists(log_path)
break
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
output = e
cost = None
uni_score = 0
break
return input_text, context_str, output, expected_output, uni_score, cost
async def evaluate_all_problems(data: List[dict], graph: Callable, path, max_concurrent_tasks: int = 50):
semaphore = asyncio.Semaphore(max_concurrent_tasks)
async def sem_evaluate(problem):
async with semaphore:
return await evaluate_problem_optimize(problem, graph, path)
tasks = [sem_evaluate(problem) for problem in data]
return await tqdm_asyncio.gather(*tasks, desc="Evaluating HotpotQA problems", total=len(data))
async def optimize_hotpotqa_evaluation(graph: Callable, file_path: str, path: str, va_list: list):
data = await load_data(file_path, va_list)
results = await evaluate_all_problems(data, graph, path, max_concurrent_tasks=20)
average_score, average_cost, total_cost = save_results_to_csv(results, path=path)
print(f"Average score on HotpotQA dataset: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
return average_score, average_cost, total_cost
def get_result_columns(self) -> List[str]:
return ["question", "context", "prediction", "expected_output", "score", "cost"]

View file

@ -2,219 +2,139 @@ import os
import time
import json
import asyncio
import aiofiles
import threading
from datetime import datetime
from typing import List, Tuple, Callable, Dict, Any, Optional
import re
import pandas as pd
from tqdm.asyncio import tqdm_asyncio
from examples.aflow.benchmark.utils import generate_random_indices
from examples.aflow.benchmark.utils import log_mismatch
from examples.aflow.benchmark.benchmark import BaseBenchmark
from metagpt.actions.code_sanitize import sanitize
class HumanEvalBenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
async def load_data(file_path: str, samples=1, test=False) -> List[dict]:
data = []
async with aiofiles.open(file_path, mode="r") as file:
async for line in file:
data.append(json.loads(line))
random_indices = generate_random_indices(len(data), samples, test)
data = [data[i] for i in random_indices]
return data
PASS = "PASS"
FAIL = "FAIL"
class TimeoutError(Exception):
pass
async def load_file_data(file_path: str, specific_indices: List[int] = None) -> List[dict]:
data = []
# 异步读取文件内容
async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file:
async for line in file:
data.append(json.loads(line))
def run_with_timeout(self, func, args, timeout):
result = []
stop_event = threading.Event()
# 然后在随机选择的样本中基于特定索引列表进行进一步筛选
if specific_indices is not None:
filtered_data = [data[i] for i in specific_indices if i < len(data)]
return filtered_data
def target():
try:
result.append(func(*args))
except Exception as e:
result.append(e)
finally:
stop_event.set()
return data
thread = threading.Thread(target=target)
thread.start()
is_timeout = not stop_event.wait(timeout)
PASS = "PASS"
FAIL = "FAIL"
if is_timeout:
raise self.TimeoutError("Function execution timed out")
class TimeoutError(Exception):
pass
if not result:
return None
if isinstance(result[0], Exception):
raise result[0]
return result[0]
def run_with_timeout(func, args, timeout):
result = []
stop_event = threading.Event()
def target():
def check_solution(self, solution, test, entry_point):
solution = sanitize(code=solution, entrypoint=entry_point)
try:
result.append(func(*args))
except Exception as e:
result.append(e)
finally:
stop_event.set()
thread = threading.Thread(target=target)
thread.start()
is_timeout = not stop_event.wait(timeout)
if is_timeout:
# 线程仍在运行,我们无法强制终止它,但至少可以标记超时
raise TimeoutError("Function execution timed out")
if not result:
return None
if isinstance(result[0], Exception):
raise result[0]
return result[0]
def check_solution(solution, test, entry_point):
solution = sanitize(code=solution, entrypoint=entry_point)
try:
# 定义一个包含所有必要模块的全局字典
global_dict = {
'math': __import__('math'),
'hashlib': __import__('hashlib'),
're': __import__('re'),
'List': List,
'Dict': Dict,
'Tuple': Tuple,
'Optional': Optional,
'Any': Any
}
if entry_point == "decode_cyclic":
solution = "\n\ndef encode_cyclic(s: str):\n \"\"\"\n returns encoded string by cycling groups of three characters.\n \"\"\"\n # split string to groups. Each of length 3.\n groups = [s[(3 * i):min((3 * i + 3), len(s))] for i in range((len(s) + 2) // 3)]\n # cycle elements in each group. Unless group has fewer elements than 3.\n groups = [(group[1:] + group[0]) if len(group) == 3 else group for group in groups]\n return \"\".join(groups)" + "\n\n" + solution
elif entry_point == "decode_shift":
solution = "\n\ndef encode_shift(s: str):\n \"\"\"\n returns encoded string by shifting every character by 5 in the alphabet.\n \"\"\"\n return \"\".join([chr(((ord(ch) + 5 - ord(\"a\")) % 26) + ord(\"a\")) for ch in s])\n\n\n" + solution
elif entry_point == "find_zero":
solution = "\n\ndef poly(xs: list, x: float):\n return sum(coeff * (x ** i) for i, coeff in enumerate(xs))\n\n" + solution
# 执行解决方案
exec(solution, global_dict)
# 确保入口点函数已定义
if entry_point not in global_dict:
raise ValueError(f"函数 {entry_point} 在解决方案中未定义。")
# 执行测试用例
exec(test, global_dict)
# 获取检查函数
check = global_dict["check"]
# 运行检查函数设置超时时间为120秒
result = run_with_timeout(check, (global_dict[entry_point],), 15)
if result is None:
result = (PASS, "解决方案通过了所有测试用例。")
except TimeoutError:
result = (FAIL, "执行超时。请检查您的解决方案是否包含无限循环或过于耗时的操作。")
except Exception as e:
# 记录详细的错误信息
error_message = f"错误: {str(e)}.\n 解决方案: {solution}.\n 测试: {test}"
result = (FAIL, error_message)
# 将错误信息写入error.log文件
with open('error.log', 'a', encoding='utf-8') as log_file:
log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n")
return result
async def evaluate_problem(data: dict, graph: Callable, path) -> Tuple[str, str, str, int, str]:
max_retries = 5
retries = 0
expected_output = "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"]
while retries < max_retries:
try:
prediction = await asyncio.wait_for(graph(data["prompt"], data["entry_point"]), timeout=60) if graph else "None"
cost = prediction[1]
solution = prediction[0]
ret = check_solution(solution, data["test"], data["entry_point"])
test_case_details = ret[1]
expected_output = test_case_details + "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"]
score = 1 if ret[0] == PASS else 0
if score == 0:
log_mismatch(data["prompt"], expected_output, solution, score, path)
break
global_dict = {
'math': __import__('math'),
'hashlib': __import__('hashlib'),
're': __import__('re'),
'List': List,
'Dict': Dict,
'Tuple': Tuple,
'Optional': Optional,
'Any': Any
}
except TimeoutError:
solution = None
ret = (FAIL, ["超时"])
score = 0
cost = 0
break
# Add handling for special cases
if entry_point == "decode_cyclic":
solution = "\n\ndef encode_cyclic(s: str):\n \"\"\"\n returns encoded string by cycling groups of three characters.\n \"\"\"\n # split string to groups. Each of length 3.\n groups = [s[(3 * i):min((3 * i + 3), len(s))] for i in range((len(s) + 2) // 3)]\n # cycle elements in each group. Unless group has fewer elements than 3.\n groups = [(group[1:] + group[0]) if len(group) == 3 else group for group in groups]\n return \"\".join(groups)" + "\n\n" + solution
elif entry_point == "decode_shift":
solution = "\n\ndef encode_shift(s: str):\n \"\"\"\n returns encoded string by shifting every character by 5 in the alphabet.\n \"\"\"\n return \"\".join([chr(((ord(ch) + 5 - ord(\"a\")) % 26) + ord(\"a\")) for ch in s])\n\n\n" + solution
elif entry_point == "find_zero":
solution = "\n\ndef poly(xs: list, x: float):\n return sum(coeff * (x ** i) for i, coeff in enumerate(xs))\n\n" + solution
exec(solution, global_dict)
if entry_point not in global_dict:
raise ValueError(f"Function {entry_point} is not defined in the solution.")
exec(test, global_dict)
check = global_dict["check"]
result = self.run_with_timeout(check, (global_dict[entry_point],), 15)
if result is None:
result = (self.PASS, "The solution passed all test cases.")
except self.TimeoutError:
result = (self.FAIL, "Execution timed out. Please check if your solution contains infinite loops or overly time-consuming operations.")
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
error_message = f"Error: {str(e)}.\n Solution: {solution}.\n Test: {test}"
result = (self.FAIL, error_message)
with open('error.log', 'a', encoding='utf-8') as log_file:
log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n")
return result
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
solution = None
ret = (FAIL, [])
score = 0
cost = 0
async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
max_retries = 5
retries = 0
expected_output = "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"]
while retries < max_retries:
try:
prediction, cost = await asyncio.wait_for(graph(data["prompt"], data["entry_point"]), timeout=60)
ret = self.check_solution(prediction, data["test"], data["entry_point"])
test_case_details = ret[1]
expected_output = test_case_details + "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"]
score = 1.0 if ret[0] == self.PASS else 0.0
if score == 0:
self.log_mismatch(data["prompt"], expected_output, prediction, score)
break
except asyncio.TimeoutError:
prediction = None
ret = (self.FAIL, ["Timeout"])
score = 0.0
cost = 0.0
break
return data["prompt"], solution, expected_output, score, cost # 修改返回值以包含cost
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
async def evaluate_all_problems(data: List[dict], graph: Callable, path:str="", max_concurrent_tasks: int = 50) -> List[Tuple[str, str, str, int, str]]:
semaphore = asyncio.Semaphore(max_concurrent_tasks)
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
prediction = None
ret = (self.FAIL, [])
score = 0.0
cost = 0.0
break
async def sem_evaluate(problem):
async with semaphore:
return await evaluate_problem(problem, graph, path)
return data["prompt"], prediction, expected_output, score, cost
tasks = [sem_evaluate(problem) for problem in data]
def calculate_score(self, expected_output: str, prediction: str) -> Tuple[float, str]:
# The scoring logic for HumanEval is already implemented in evaluate_problem, this is just to conform to the interface
return 0.0, prediction
return await tqdm_asyncio.gather(*tasks, desc="Evaluating HumanEval problems", total=len(data))
def save_results_to_csv(results: List[Tuple[str, str, str, int]], path):
# 创建 DataFrame
df = pd.DataFrame(results, columns=["question", "prediction", "expected_output", "score", "cost"])
# 计算统计数据
avg_score = df["score"].mean()
t_cost = df["cost"].max()
a_cost = t_cost / len(df) if len(df) > 0 else 0
# 获取当前时间,格式为 YYYYMMDD_HHMMSS
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
# 生成文件名,包含平均分和当前时间,保留五位小数
filename = f"{avg_score:.5f}_{current_time}.csv"
output_file = os.path.join(path, filename)
# 保存到 CSV
df.to_csv(output_file, index=False)
print(f"Results saved to {output_file}")
return avg_score, a_cost, t_cost
async def humaneval_evaluation(graph: Callable, file_path: str, samples: int, path: str, test=False) -> Tuple[float, float]:
data = await load_data(file_path, samples, test=test)
results = await evaluate_all_problems(data, graph, max_concurrent_tasks=5)
average_score, average_cost, total_cost = save_results_to_csv(results, path=path)
print(f"Average score on HumanEval dataset: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
print(f"Average cost on HumanEval dataset: {average_cost:.5f}")
return average_score, total_cost # 修改返回值以包含total_cost
async def optimize_humaneval_evaluation(graph: Callable, file_path: str, path: str, va_list: List[int]) -> Tuple[float, float, float]:
data = await load_file_data(file_path, va_list)
results = await evaluate_all_problems(data, graph, path, max_concurrent_tasks=10)
average_score, average_cost, total_cost = save_results_to_csv(results, path=path)
print(f"Average score on HumanEval dataset: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
print(f"Average cost on HumanEval dataset: {average_cost:.5f}")
return average_score, average_cost, total_cost
def get_result_columns(self) -> List[str]:
return ["inputs", "prediction", "expected_output", "score", "cost"]

View file

@ -3,15 +3,16 @@
# @Author : all
# @Desc : Evaluation for different datasets
from typing import Literal, Tuple, Optional
from typing import Literal, Tuple, Optional, Dict
import asyncio
from examples.aflow.benchmark.gsm8k import optimize_gsm8k_evaluation
from examples.aflow.benchmark.math import optimize_math_evaluation
from examples.aflow.benchmark.humaneval import optimize_humaneval_evaluation
from examples.aflow.benchmark.hotpotqa import optimize_hotpotqa_evaluation
from examples.aflow.benchmark.mbpp import optimize_mbpp_evaluation
from examples.aflow.benchmark.drop import optimize_drop_evaluation
from examples.aflow.benchmark.benchmark import BaseBenchmark
from examples.aflow.benchmark.gsm8k import GSM8KBenchmark
from examples.aflow.benchmark.math import MATHBenchmark
from examples.aflow.benchmark.humaneval import HumanEvalBenchmark
from examples.aflow.benchmark.hotpotqa import HotpotQABenchmark
from examples.aflow.benchmark.mbpp import MBPPBenchmark
from examples.aflow.benchmark.drop import DROPBenchmark
# If you want to customize tasks, add task types here and provide evaluation functions, just like the ones given above
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
@ -23,51 +24,40 @@ class Evaluator:
def __init__(self, eval_path: str):
self.eval_path = eval_path
self.dataset_configs = {
"GSM8K": {"name": "GSM8K", "eval_func": optimize_gsm8k_evaluation},
"MATH": {"name": "MATH", "eval_func": optimize_math_evaluation},
"HumanEval": {"name": "HumanEval", "eval_func": optimize_humaneval_evaluation},
"HotpotQA": {"name": "HotpotQA", "eval_func": optimize_hotpotqa_evaluation},
"MBPP": {"name": "MBPP", "eval_func": optimize_mbpp_evaluation},
"DROP": {"name": "DROP", "eval_func": optimize_drop_evaluation},
self.dataset_configs: Dict[DatasetType, BaseBenchmark] = {
"GSM8K": GSM8KBenchmark,
"MATH": MATHBenchmark,
"HumanEval": HumanEvalBenchmark,
"HotpotQA": HotpotQABenchmark,
"MBPP": MBPPBenchmark,
"DROP": DROPBenchmark,
}
def graph_evaluate(self, dataset: DatasetType, graph, params: dict, path, is_test=False):
"""
Evaluates on validation dataset.
"""
if dataset in self.dataset_configs:
return self._generic_eval(dataset, graph, params, path, is_test)
else:
return None
async def graph_evaluate(self, dataset: DatasetType, graph, params: dict, path: str, is_test: bool = False) -> Tuple[float, float, float]:
if dataset not in self.dataset_configs:
raise ValueError(f"Unsupported dataset: {dataset}")
async def _generic_eval(self, dataset: DatasetType, graph_class, params: dict, path: str, test: bool = False) -> Tuple[float, float, float]:
"""
Generic evaluation function for all datasets.
"""
async def load_graph():
dataset_config = params["dataset"]
llm_config = params["llm_config"]
return graph_class(name=self.dataset_configs[dataset]["name"], llm_config=llm_config, dataset=dataset_config)
data_path = self._get_data_path(dataset, is_test)
benchmark_class = self.dataset_configs[dataset]
benchmark = benchmark_class(dataset, data_path, path)
data_path, va_list = self._get_data_path_and_va_list(dataset, test)
graph = await load_graph()
eval_func = self.dataset_configs[dataset]["eval_func"]
avg_score, avg_cost, total_cost = await eval_func(graph, data_path, path, va_list)
return avg_score, avg_cost, total_cost
# Use params to configure the graph and benchmark
configured_graph = await self._configure_graph(graph, params)
def _get_data_path_and_va_list(self, dataset: DatasetType, test: bool) -> Tuple[str, Optional[list]]:
"""
Get data path and validation list based on dataset and test flag.
"""
va_list = [1,2,3] # Use va_list from params, or use default value if not provided
return await benchmark.run_evaluation(configured_graph, va_list)
async def _configure_graph(self, graph, params: dict):
# Here you can configure the graph based on params
# For example: set LLM configuration, dataset configuration, etc.
dataset_config = params.get("dataset", {})
llm_config = params.get("llm_config", {})
return graph(name=self.dataset_configs[dataset]["name"], llm_config=llm_config, dataset=dataset_config)
def _get_data_path(self, dataset: DatasetType, test: bool) -> str:
base_path = f"examples/aflow/data/{dataset.lower()}"
if test:
return f"{base_path}_test.jsonl", None
else:
return f"{base_path}_validate.jsonl", [1, 2, 3] # Replace with the actual filtered index list
return f"{base_path}_test.jsonl" if test else f"{base_path}_validate.jsonl"
# Alias methods for backward compatibility
for dataset in ["gsm8k", "math", "humaneval", "mbpp", "hotpotqa", "drop"]:
setattr(Evaluator, f"_{dataset}_eval", lambda self, *args, dataset=dataset.upper(), **kwargs: self._generic_eval(dataset, *args, **kwargs))
setattr(Evaluator, f"_{dataset}_eval", lambda self, *args, dataset=dataset.upper(), **kwargs: self.graph_evaluate(dataset, *args, **kwargs))