Update Eval

This commit is contained in:
didi 2024-10-16 12:06:34 +08:00
parent bb229f2319
commit eea94865ad
2 changed files with 245 additions and 177 deletions

View file

@ -1,18 +1,16 @@
import os
import re
import json
import asyncio
import pandas as pd
import string
import re
from typing import List, Tuple, Callable, Dict, Any, Set, Union
from collections import Counter
import numpy as np
from scipy.optimize import linear_sum_assignment
from datetime import datetime
from typing import Any, Callable, Dict, List, Tuple
import aiofiles
import pandas as pd
from tqdm.asyncio import tqdm_asyncio
from examples.aflow.benchmark.utils import generate_random_indices
global cost
cost = 0
def is_number(text: str) -> bool:
try:
@ -41,7 +39,7 @@ def normalize_answer(s):
return white_space_fix(remove_articles(remove_punc(lower(s))))
def compute_f1_score(prediction, ground_truth):
def calculate_score(ground_truth: str, prediction: str):
"""
Compute the F1 score between prediction and ground truth answers.
"""
@ -50,59 +48,120 @@ def compute_f1_score(prediction, ground_truth):
common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
num_same = sum(common.values())
if num_same == 0:
return 0
return 0, prediction
precision = 1.0 * num_same / len(prediction_tokens)
recall = 1.0 * num_same / len(ground_truth_tokens)
f1 = (2 * precision * recall) / (precision + recall)
return f1
return f1, prediction
# def fuzzy_match(s1: str, s2: str) -> bool:
# s1 = normalize(s1)
# s2 = normalize(s2)
# if s1 == "" or s2 == "":
# return s1 == s2
# return s1 in s2 or s2 in s1
def ensure_log_file_exists(path: str):
    """Make sure ``log.json`` exists inside ``path``.

    A missing file is created holding an empty JSON list; an existing file
    is left untouched.

    Args:
        path: Directory that should contain ``log.json``. It is created,
            including missing parents, when it does not exist yet.
    """
    # An empty path means "current directory"; os.makedirs("") would raise.
    if path:
        os.makedirs(path, exist_ok=True)
    log_file = os.path.join(path, 'log.json')
    try:
        # Mode "x" creates the file only if it is absent, so an existing log
        # can never be truncated between an existence check and the open.
        with open(log_file, 'x', encoding='utf-8') as f:
            json.dump([], f, indent=4, ensure_ascii=False)
    except FileExistsError:
        # The log is already there; nothing to do.
        pass
# def drop_metric(sample: str, reference: list[str]) -> Tuple[float, float]:
# em_scores = []
# f1_scores = []
# for answer in reference:
# if answer.strip() != "":
# em, f1 = get_drop_metrics(sample, answer)
# em_scores.append(em)
# f1_scores.append(f1)
# return (max(em_scores), max(f1_scores))
def log_mismatch(problem: str, expected_output, prediction: str, predicted_number, path):
log_data = {
"question": problem,
"right_answer": expected_output,
"model_output": prediction,
"extracted_output": predicted_number
}
async def evaluate_problem(inputs: str, answers: List[Dict[str, Any]], graph: Callable) -> Tuple[str, str, float]:
log_file = os.path.join(path, 'log.json')
# 检查log文件是否已经存在
if os.path.exists(log_file):
# 如果存在,加载现有的日志数据
with open(log_file, 'r', encoding='utf-8') as f:
try:
data = json.load(f)
except json.JSONDecodeError:
data = []
else:
# 如果不存在,创建一个新的日志列表
data = []
# 添加新的日志记录
data.append(log_data)
# 将数据写回到log.json文件
with open(log_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
async def load_data(file_path: str, specific_indices: List[int] = None) -> List[dict]:
    """Load a JSON Lines file and optionally keep only selected rows.

    Args:
        file_path: Path of a UTF-8 file holding one JSON document per line.
        specific_indices: Row positions to keep, returned in the order given.
            ``None`` returns every row. Positions that fall outside the
            loaded data are silently dropped.

    Returns:
        The decoded records, filtered by ``specific_indices`` when provided.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    data = []
    # Read the file asynchronously, one line at a time.
    async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file:
        async for line in file:
            # A blank line is not a JSON document; skip it instead of
            # letting json.loads raise.
            if line.strip():
                data.append(json.loads(line))

    if specific_indices is None:
        return data

    total = len(data)
    # The lower bound keeps valid negative positions working while dropping
    # too-negative ones, which previously escaped the guard and raised
    # IndexError.
    return [data[i] for i in specific_indices if -total <= i < total]
def save_results_to_csv(results: List[Tuple[str, str, str, float, float]], path):
    """Write evaluation results to a timestamped CSV and return summary numbers.

    Args:
        results: One 5-item row per sample, in column order
            ``inputs, prediction, expected_output, score, cost``.
        path: Directory for the CSV. It is created when missing.

    Returns:
        A tuple ``(avg_score, a_cost, t_cost)``: the mean of the ``score``
        column, the total cost divided by the number of rows (0 for no
        rows), and the total cost, taken as the maximum of the ``cost``
        column. When no row carries a numeric cost the total is ``0.0``.
    """
    df = pd.DataFrame(results, columns=["inputs", "prediction", "expected_output", "score", "cost"])

    avg_score = df["score"].mean()

    # Samples that failed every retry carry cost=None, which can leave the
    # column non-numeric. Coerce on a copy so the CSV keeps the raw values.
    costs = pd.to_numeric(df["cost"], errors="coerce")
    t_cost = costs.max()
    if pd.isna(t_cost):
        t_cost = 0.0
    a_cost = t_cost / len(df) if len(df) > 0 else 0

    # File name: average score (five decimals) plus a YYYYMMDD_HHMMSS stamp.
    current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{avg_score:.5f}_{current_time}.csv"

    # An empty path means "current directory"; os.makedirs("") would raise.
    if path:
        os.makedirs(path, exist_ok=True)
    output_file = os.path.join(path, filename)

    df.to_csv(output_file, index=False)
    print(f"Results saved to {output_file}")

    return avg_score, a_cost, t_cost
async def evaluate_problem(annotation: dict, graph: Callable, log_path) -> Tuple[str, str, float]:
expected_output = annotation["ref_text"]
inputs = annotation["context"]
answers = expected_output.split("|")
max_retries = 5
retries = 0
while retries < max_retries:
try:
global cost
prediction, cost = await graph(inputs)
output, cost = await graph(inputs) if graph else "None"
f1_scores = []
# if '|' in the output, split it and calculate the score for each part
for answer in answers:
if answer.strip() != "":
f1_score = compute_f1_score(prediction, answer)
f1_scores.append(f1_score)
if '|' in output:
output_parts = output.split('|')
for output_part in output_parts:
f1_score, _ = calculate_score(answer, output_part)
f1_scores.append(f1_score)
else:
f1_score, _ = calculate_score(answer, output)
f1_scores.append(f1_score)
max_score = max(f1_scores)
# matches = [
# fuzzy_match(prediction, answer)
# for answer in answers
# ]
uni_score = max_score
# score = True in matches
score = max_score
if uni_score == 0:
log_mismatch(inputs, expected_output, output, output, log_path)
else:
ensure_log_file_exists(log_path)
break
@ -112,81 +171,29 @@ async def evaluate_problem(inputs: str, answers: List[Dict[str, Any]], graph: Ca
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
prediction = None
score = 0.0
output = e
uni_score = 0.0
cost = None
break
return prediction, score
return inputs, output, expected_output, uni_score, cost
async def evaluate_all_questions(annotations: List[Tuple[str, Dict[str, Any]]], graph: Callable, max_concurrent_tasks: int = 50) -> List[List[Any]]:
async def evaluate_all_questions(data: List[Tuple[str, Dict[str, Any]]], graph: Callable, path, max_concurrent_tasks: int = 50) -> List[List[Any]]:
semaphore = asyncio.Semaphore(max_concurrent_tasks)
results = []
async def sem_evaluate(annotation: Dict[str, Any]):
async def sem_evaluate(problem):
async with semaphore:
inputs = annotation["context"]
answers = annotation["targets"]
prediction, score = await evaluate_problem(inputs, answers, graph)
results.append([annotation["id"], prediction, answers, score])
return await evaluate_problem(problem, graph, path)
tasks = [sem_evaluate(annotation) for annotation in annotations]
await tqdm_asyncio.gather(*tasks, desc="Evaluating DROP passages", total=len(annotations))
tasks = [sem_evaluate(problem) for problem in data]
return results
return await tqdm_asyncio.gather(*tasks, desc="Evaluating DROP problems", total=len(data))
def save_results_to_csv(results: List[List[Any]], path: str) -> float:
df = pd.DataFrame(results, columns=["id", "prediction", "answers", "score"])
average_score = df["score"].mean()
output_file = f"{path}/{average_score:.5f}.csv"
df.to_csv(output_file, index=False)
print(f"Results saved to {output_file}")
return average_score
# -- From ADAS --
def load_drop(file_path, samples, test=False, total_length=1000):
import gzip
with gzip.open(file_path, "rb") as file:
data = [json.loads(line) for line in file]
random_indices = generate_random_indices(len(data), total_length, False)
random_indices = random_indices[:samples] if not test else random_indices[samples:]
examples = [data[i] for i in random_indices]
for example in examples:
example["targets"] = example["ref_text"].split("|")
return examples
async def drop_evaluation(graph: Callable, file_path: str, samples: int, path: str, test=False) -> float:
# data = load_data(file_path, samples, test=test)
data = load_drop(file_path, samples, test=test)
results = await evaluate_all_questions(data, graph, max_concurrent_tasks=30)
average_score = save_results_to_csv(results, path=path)
async def optimize_drop_evaluation(graph: Callable, file_path: str, path: str, va_list: list):
data = await load_data(file_path, va_list)
results = await evaluate_all_questions(data, graph, path, max_concurrent_tasks=25)
average_score, average_cost, total_cost = save_results_to_csv(results, path=path)
print(f"Average score on DROP dataset: {average_score:.5f}")
global cost
print(f"Total cost: {cost: .5f}")
print(f"Cost per sample: {(cost / len(data)):.9f}")
return average_score, cost
def load_drop_from_file(file_path):
import gzip
with gzip.open(file_path, "rb") as file:
data = [json.loads(line) for line in file]
for example in data:
example["targets"] = example["ref_text"].split("|")
return data
async def optimize_drop_evaluation(graph: Callable, file_path: str, path: str) -> Tuple[float, float]:
data = await load_drop_from_file(file_path)
results = await evaluate_all_questions(data, graph, max_concurrent_tasks=50)
average_score = save_results_to_csv(results, path=path)
print(f"Average score on DROP dataset: {average_score:.5f}")
global cost
print(f"Total cost: {cost: .5f}")
print(f"Cost per sample: {(cost / len(data)):.9f}")
return average_score, cost
print(f"Total Cost: {total_cost:.5f}")
return average_score, average_cost, total_cost

View file

@ -6,15 +6,10 @@ import numpy as np
from typing import List, Tuple, Callable, Set
from collections import Counter
from tqdm.asyncio import tqdm_asyncio
from scipy.optimize import linear_sum_assignment
import string
import re
from examples.aflow.benchmark.utils import generate_random_indices
global cost
cost = 0
import os
from datetime import datetime
def is_number(text: str) -> bool:
try:
@ -43,7 +38,7 @@ def normalize_answer(s):
return white_space_fix(remove_articles(remove_punc(lower(s))))
def f1_score(prediction, ground_truth):
def calculate_score(ground_truth: str, prediction: str):
"""
Compute the F1 score between prediction and ground truth answers.
"""
@ -52,36 +47,101 @@ def f1_score(prediction, ground_truth):
common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
num_same = sum(common.values())
if num_same == 0:
return 0
return 0, prediction
precision = 1.0 * num_same / len(prediction_tokens)
recall = 1.0 * num_same / len(ground_truth_tokens)
f1 = (2 * precision * recall) / (precision + recall)
return f1
return f1, prediction
def ensure_log_file_exists(path: str):
    """Make sure ``log.json`` exists inside ``path``.

    A missing file is created holding an empty JSON list; an existing file
    is left untouched.

    Args:
        path: Directory that should contain ``log.json``. It is created,
            including missing parents, when it does not exist yet.
    """
    # An empty path means "current directory"; os.makedirs("") would raise.
    if path:
        os.makedirs(path, exist_ok=True)
    log_file = os.path.join(path, 'log.json')
    try:
        # Mode "x" creates the file only if it is absent, so an existing log
        # can never be truncated between an existence check and the open.
        with open(log_file, 'x', encoding='utf-8') as f:
            json.dump([], f, indent=4, ensure_ascii=False)
    except FileExistsError:
        # The log is already there; nothing to do.
        pass
async def load_data(file_path: str, samples=20, total_length=1250, test=False) -> List[dict]:
def log_mismatch(problem: str, expected_output: float, prediction: str, predicted_number, path):
    """Append one mismatch record to ``log.json`` inside ``path``.

    The log is a JSON list. It is read in full, extended by one record and
    rewritten. A missing, unparsable or non-list log is replaced by a fresh
    list containing only the new record.

    Args:
        problem: Stored under ``"question"``.
        expected_output: Stored under ``"right_answer"``.
            NOTE(review): annotated as float, but callers in this file pass
            ``problem["answer"]`` unchanged -- confirm the intended type.
        prediction: Stored under ``"model_output"``.
        predicted_number: Stored under ``"extracted_output"``.
        path: Directory holding ``log.json``. It is created when missing.
    """
    log_data = {
        "question": problem,
        "right_answer": expected_output,
        "model_output": prediction,
        "extracted_output": predicted_number
    }

    # An empty path means "current directory"; os.makedirs("") would raise.
    if path:
        os.makedirs(path, exist_ok=True)
    log_file = os.path.join(path, 'log.json')

    # Load the existing records; a missing or corrupt file starts a new log.
    try:
        with open(log_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        data = []
    # Valid JSON that is not a list cannot be appended to; treat it the same
    # way as corrupt content.
    if not isinstance(data, list):
        data = []

    data.append(log_data)

    # Write the full list back to log.json.
    with open(log_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4, ensure_ascii=False)
async def load_data(file_path: str, specific_indices: List[int] = None) -> List[dict]:
data = []
async with aiofiles.open(file_path, mode="r") as file:
# 异步读取文件内容
async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file:
async for line in file:
data.append(json.loads(line))
random_indices = generate_random_indices(len(data), total_length, False) # get random indices of 1250
random_indices = random_indices[:samples] if not test else random_indices[samples:] # get n_samples for validation or test
data = [data[i] for i in random_indices]
# 然后在随机选择的样本中基于特定索引列表进行进一步筛选
if specific_indices is not None:
filtered_data = [data[i] for i in specific_indices if i < len(data)]
return filtered_data
return data
async def evaluate_problem(input: str, context_str: str, graph: Callable, expected_output: str):
def save_results_to_csv(results: List[Tuple[str, str, str, str, float, float]], path):
    """Write evaluation results to a timestamped CSV and return summary numbers.

    Args:
        results: One 6-item row per sample, in column order
            ``question, context, prediction, expected_output, score, cost``.
        path: Directory for the CSV. It is created when missing.

    Returns:
        A tuple ``(avg_score, a_cost, t_cost)``: the mean of the ``score``
        column, the total cost divided by the number of rows (0 for no
        rows), and the total cost, taken as the maximum of the ``cost``
        column. When no row carries a numeric cost the total is ``0.0``.
    """
    df = pd.DataFrame(results, columns=["question", "context", "prediction", "expected_output", "score", "cost"])

    avg_score = df["score"].mean()

    # Samples that failed every retry carry cost=None, which can leave the
    # column non-numeric. Coerce on a copy so the CSV keeps the raw values.
    costs = pd.to_numeric(df["cost"], errors="coerce")
    t_cost = costs.max()
    if pd.isna(t_cost):
        t_cost = 0.0
    a_cost = t_cost / len(df) if len(df) > 0 else 0

    # File name: average score (five decimals) plus a YYYYMMDD_HHMMSS stamp.
    current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{avg_score:.5f}_{current_time}.csv"

    # An empty path means "current directory"; os.makedirs("") would raise.
    if path:
        os.makedirs(path, exist_ok=True)
    output_file = os.path.join(path, filename)

    df.to_csv(output_file, index=False)
    print(f"Results saved to {output_file}")

    return avg_score, a_cost, t_cost
async def evaluate_problem(problem: dict, graph: Callable, log_path: str):
input_text = problem["question"]
expected_output = problem["answer"]
paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)]
context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs)
max_retries = 5
retries = 0
# global cost
# prediction, cost = await graph(input, context_str) if graph else "None"
# score = f1_score(prediction, expected_output)
while retries < max_retries:
try:
global cost
prediction, cost = await graph(input, context_str) if graph else "None"
score = f1_score(prediction, expected_output)
output, cost = await graph(input_text, context_str) if graph else "None"
uni_score, extracted_output = calculate_score(expected_output, output)
if uni_score == 0:
log_mismatch(input_text, expected_output, output, extracted_output, log_path)
else:
ensure_log_file_exists(log_path)
break
except Exception as e:
@ -90,62 +150,63 @@ async def evaluate_problem(input: str, context_str: str, graph: Callable, expect
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
prediction = None
score = 0
output = e
cost = None
uni_score = 0
break
return input, prediction, expected_output, score
return input_text, context_str, output, expected_output, uni_score, cost
async def evaluate_all_problems(data: List[dict], graph: Callable, max_concurrent_tasks: int = 50):
async def evaluate_problem_optimize(problem: dict, graph: Callable, log_path: str):
    """Run ``graph`` on one problem and score its answer, retrying on errors.

    The context paragraphs and the question are merged into one prompt that
    is passed to ``graph``. The output is scored with ``calculate_score``
    against ``problem["answer"]``. A zero score is recorded through
    ``log_mismatch``; any other score only ensures the log file exists.

    Args:
        problem: Record with ``"question"``, ``"answer"`` and ``"context"``
            keys. Each context item is indexed at ``[1]`` and kept only if
            that element is a list; presumably a list of sentence strings
            -- confirm against the dataset.
        graph: Awaitable callable taking the prompt and returning
            ``(output, cost)``. When falsy, the output is the string
            ``"None"`` and the cost is ``None``.
        log_path: Directory holding ``log.json``.

    Returns:
        ``(input_text, context_str, output, expected_output, uni_score, cost)``.
        If every attempt fails, ``output`` is the last exception, ``cost``
        is ``None`` and ``uni_score`` is 0.
    """
    input_text = problem["question"]
    expected_output = problem["answer"]
    paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)]
    context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs)
    inputs = f"Context: {context_str}\n\nQuestion: {input_text}\n\nAnswer:"

    max_retries = 5

    for attempt in range(1, max_retries + 1):
        try:
            if graph:
                output, cost = await graph(inputs)
            else:
                # Previously the placeholder string "None" was unpacked into
                # two names, raising ValueError and burning all retries.
                output, cost = "None", None

            uni_score, extracted_output = calculate_score(expected_output, output)

            if uni_score == 0:
                log_mismatch(input_text, expected_output, output, extracted_output, log_path)
            else:
                ensure_log_file_exists(log_path)

            break

        except Exception as e:
            print(f"Error generating prediction: {e}. Retrying... ({attempt}/{max_retries})")

            if attempt == max_retries:
                print("Maximum retries reached. Skipping this sample.")
                # Report the failure in the result row instead of raising.
                output = e
                cost = None
                uni_score = 0

    return input_text, context_str, output, expected_output, uni_score, cost
async def evaluate_all_problems(data: List[dict], graph: Callable, path, max_concurrent_tasks: int = 50):
semaphore = asyncio.Semaphore(max_concurrent_tasks)
async def sem_evaluate(problem):
async with semaphore:
input_text = problem["question"]
expected_output = problem["answer"]
paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)]
context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs)
return await evaluate_problem(input_text, context_str, graph, expected_output)
return await evaluate_problem_optimize(problem, graph, path)
tasks = [sem_evaluate(problem) for problem in data]
return await tqdm_asyncio.gather(*tasks, desc="Evaluating HotpotQA problems", total=len(data))
def save_results_to_csv(results: List[Tuple[str, str, str, float]], path: str) -> float:
df = pd.DataFrame(
results, columns=["question", "prediction", "expected_output", "score"]
)
average_score = df["score"].mean()
output_file = f"{path}/{average_score:.5f}.csv"
df.to_csv(output_file, index=False)
print(f"Results saved to {output_file}")
return average_score
async def hotpotqa_evaluation(graph: Callable, file_path: str, samples: int, path: str, test=False) -> float:
data = await load_data(file_path, samples, test=test)
results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20)
average_score = save_results_to_csv(results, path=path)
async def optimize_hotpotqa_evaluation(graph: Callable, file_path: str, path: str, va_list: list):
data = await load_data(file_path, va_list)
results = await evaluate_all_problems(data, graph, path, max_concurrent_tasks=20)
average_score, average_cost, total_cost = save_results_to_csv(results, path=path)
print(f"Average score on HotpotQA dataset: {average_score:.5f}")
global cost
print(f"Total cost: {cost: .5f}")
print(f"Cost per sample: {(cost / len(data)):.9f}")
return average_score
async def load_file_data(file_path: str) -> List[dict]:
data = []
async with aiofiles.open(file_path, mode="r") as file:
async for line in file:
data.append(json.loads(line))
return data
async def optimize_hotpotqa_evaluation(graph: Callable, file_path: str, path: str) -> Tuple[float, float]:
data = await load_file_data(file_path)
results = await evaluate_all_problems(data, graph, max_concurrent_tasks=50)
average_score = save_results_to_csv(results, path=path)
print(f"Average score on HotpotQA dataset: {average_score:.5f}")
global cost
print(f"Total cost: {cost: .5f}")
print(f"Cost per sample: {(cost / len(data)):.9f}")
return average_score, cost
print(f"Total Cost: {total_cost:.5f}")
return average_score, average_cost, total_cost