Update QA

This commit is contained in:
Zhaoyang Yu 2024-09-10 21:51:38 +08:00
parent 1de0653d70
commit f691c5f439
2 changed files with 426 additions and 0 deletions

View file

@ -0,0 +1,297 @@
import json
import asyncio
import pandas as pd
import string
import re
from typing import List, Tuple, Callable, Dict, Any, Set, Union
import numpy as np
from scipy.optimize import linear_sum_assignment
from tqdm.asyncio import tqdm_asyncio
from examples.ags.benchmark.utils import generate_random_indices
global cost
cost = 0
def _remove_articles(text: str) -> str:
regex = re.compile(r"\b(a|an|the)\b", re.UNICODE)
return re.sub(regex, " ", text)
def _white_space_fix(text: str) -> str:
return " ".join(text.split())
EXCLUDE = set(string.punctuation)
def _is_number(text: str) -> bool:
try:
float(text)
return True
except ValueError:
return False
def _normalize_number(text: str) -> str:
if _is_number(text):
return str(float(text))
else:
return text
def _remove_punc(text: str) -> str:
if not _is_number(text):
return "".join(ch for ch in text if ch not in EXCLUDE)
else:
return text
def _lower(text: str) -> str:
return text.lower()
def _tokenize(text: str) -> List[str]:
return re.split(" |-", text)
def _normalize_answer(text: str) -> str:
"""Lower text and remove punctuation, articles and extra whitespace."""
parts = [
_white_space_fix(_remove_articles(_normalize_number(_remove_punc(_lower(token)))))
for token in _tokenize(text)
]
parts = [part for part in parts if part.strip()]
normalized = " ".join(parts).strip()
return normalized
def _answer_to_bags(
answer: Union[str, List[str], Tuple[str, ...]]
) -> Tuple[List[str], List[Set[str]]]:
if isinstance(answer, (list, tuple)):
raw_spans = answer
else:
raw_spans = [answer]
normalized_spans: List[str] = []
token_bags = []
for raw_span in raw_spans:
normalized_span = _normalize_answer(raw_span)
normalized_spans.append(normalized_span)
token_bags.append(set(normalized_span.split()))
return normalized_spans, token_bags
def _align_bags(predicted: List[Set[str]], gold: List[Set[str]]) -> List[float]:
"""
Takes gold and predicted answer sets and first finds the optimal 1-1 alignment
between them and gets maximum metric values over all the answers.
"""
scores = np.zeros([len(gold), len(predicted)])
for gold_index, gold_item in enumerate(gold):
for pred_index, pred_item in enumerate(predicted):
if _match_numbers_if_present(gold_item, pred_item):
scores[gold_index, pred_index] = _compute_f1(pred_item, gold_item)
row_ind, col_ind = linear_sum_assignment(-scores)
max_scores = np.zeros([max(len(gold), len(predicted))])
for row, column in zip(row_ind, col_ind):
max_scores[row] = max(max_scores[row], scores[row, column])
return max_scores
def _compute_f1(predicted_bag: Set[str], gold_bag: Set[str]) -> float:
intersection = len(gold_bag.intersection(predicted_bag))
if not predicted_bag:
precision = 1.0
else:
precision = intersection / float(len(predicted_bag))
if not gold_bag:
recall = 1.0
else:
recall = intersection / float(len(gold_bag))
f1 = (
(2 * precision * recall) / (precision + recall)
if not (precision == 0.0 and recall == 0.0)
else 0.0
)
return f1
def _match_numbers_if_present(gold_bag: Set[str], predicted_bag: Set[str]) -> bool:
gold_numbers = set()
predicted_numbers = set()
for word in gold_bag:
if _is_number(word):
gold_numbers.add(word)
for word in predicted_bag:
if _is_number(word):
predicted_numbers.add(word)
if (not gold_numbers) or gold_numbers.intersection(predicted_numbers):
return True
return False
def _compute_f1(predicted_bag: Set[str], gold_bag: Set[str]) -> float:
intersection = len(gold_bag.intersection(predicted_bag))
if not predicted_bag:
precision = 1.0
else:
precision = intersection / float(len(predicted_bag))
if not gold_bag:
recall = 1.0
else:
recall = intersection / float(len(gold_bag))
f1 = (
(2 * precision * recall) / (precision + recall)
if not (precision == 0.0 and recall == 0.0)
else 0.0
)
return f1
def _align_bags(predicted: List[Set[str]], gold: List[Set[str]]) -> List[float]:
"""
Takes gold and predicted answer sets and first finds the optimal 1-1 alignment
between them and gets maximum metric values over all the answers.
"""
scores = np.zeros([len(gold), len(predicted)])
for gold_index, gold_item in enumerate(gold):
for pred_index, pred_item in enumerate(predicted):
if _match_numbers_if_present(gold_item, pred_item):
scores[gold_index, pred_index] = _compute_f1(pred_item, gold_item)
row_ind, col_ind = linear_sum_assignment(-scores)
max_scores = np.zeros([max(len(gold), len(predicted))])
for row, column in zip(row_ind, col_ind):
max_scores[row] = max(max_scores[row], scores[row, column])
return max_scores
def get_metrics(
predicted: Union[str, List[str], Tuple[str, ...]], gold: Union[str, List[str], Tuple[str, ...]]
) -> Tuple[float, float]:
"""
Takes a predicted answer and a gold answer (that are both either a string or a list of
strings), and returns exact match and the DROP F1 metric for the prediction. If you are
writing a script for evaluating objects in memory (say, the output of predictions during
validation, or while training), this is the function you want to call, after using
:func:`answer_json_to_strings` when reading the gold answer from the released data file.
"""
predicted_bags = _answer_to_bags(predicted)
gold_bags = _answer_to_bags(gold)
if set(predicted_bags[0]) == set(gold_bags[0]) and len(predicted_bags[0]) == len(gold_bags[0]):
exact_match = 1.0
else:
exact_match = 0.0
f1_per_bag = _align_bags(predicted_bags[1], gold_bags[1])
f1 = np.mean(f1_per_bag)
f1 = round(f1, 2)
return exact_match, f1
def answer_json_to_strings(answer: Dict[str, Any]) -> Tuple[Tuple[str, ...], str]:
"""
Takes an answer JSON blob from the DROP data release and converts it into strings used for
evaluation.
"""
if "number" in answer and answer["number"]:
return tuple([str(answer["number"])]), "number"
elif "spans" in answer and answer["spans"]:
return tuple(answer["spans"]), "span" if len(answer["spans"]) == 1 else "spans"
elif "date" in answer:
return (
tuple(
[
"{0} {1} {2}".format(
answer["date"]["day"], answer["date"]["month"], answer["date"]["year"]
)
]
),
"date",
)
else:
raise ValueError(
f"Answer type not found, should be one of number, spans or date at: {json.dumps(answer)}"
)
def load_data(file_path: str, samples: int) -> List[Tuple[str, Dict[str, Any]]]:
with open(file_path, mode="r") as file:
data = json.load(file)
data = list(data.items())
random_indices = generate_random_indices(len(data), samples)
data = [data[i] for i in random_indices]
return data
async def evaluate_problem(question: str, passage: str, answers: List[Dict[str, Any]], graph: Callable) -> Tuple[str, str, float]:
max_retries = 5
retries = 0
while retries < max_retries:
try:
global cost
prediction, cost = await graph(question, passage)
max_score = 0.0
max_type = None
best_answer = None
for answer in answers:
golden_answer, golden_type = answer_json_to_strings(answer)
_, f1_score = get_metrics(prediction, golden_answer)
if golden_answer[0].strip() != "":
max_score = max(max_score, f1_score)
if max_score == f1_score:
max_type = golden_type
best_answer = golden_answer
break
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
best_answer = None
prediction = None
max_score = 0.0
break
return best_answer, prediction, max_score
async def evaluate_all_passages(annotations: List[Tuple[str, Dict[str, Any]]], graph: Callable, max_concurrent_tasks: int = 50) -> List[List[Any]]:
semaphore = asyncio.Semaphore(max_concurrent_tasks)
results = []
async def sem_evaluate(id: str, annotation: Dict[str, Any]):
async with semaphore:
passage = annotation["passage"]
for qa_pair in annotation["qa_pairs"]:
question = qa_pair["question"]
answers = [qa_pair["answer"]]
if "validated_answers" in qa_pair and qa_pair["validated_answers"]:
answers += qa_pair["validated_answers"]
best_answer, prediction, score = await evaluate_problem(question, passage, answers, graph)
results.append([id, question, prediction, best_answer, score])
tasks = [sem_evaluate(id, annotation) for id, annotation in annotations]
await tqdm_asyncio.gather(*tasks, desc="Evaluating DROP passages", total=len(annotations))
return results
def save_results_to_csv(results: List[List[Any]], path: str) -> float:
df = pd.DataFrame(results, columns=["id", "question", "prediction", "best_answer", "score"])
average_score = df["score"].mean()
output_file = f"{path}/{average_score:.5f}.csv"
df.to_csv(output_file, index=False)
print(f"Results saved to {output_file}")
return average_score
async def drop_evaluation(graph: Callable, file_path: str, samples: int, path: str) -> float:
data = load_data(file_path, samples)
results = await evaluate_all_passages(data, graph, max_concurrent_tasks=20)
average_score = save_results_to_csv(results, path=path)
print(f"Average score on DROP dataset: {average_score:.5f}")
global cost
print(f"Total cost: {cost}")
return average_score

View file

@ -0,0 +1,129 @@
import json
import asyncio
import aiofiles
import pandas as pd
import numpy as np
from typing import List, Tuple, Callable, Set
from collections import Counter
from tqdm.asyncio import tqdm_asyncio
from scipy.optimize import linear_sum_assignment
import string
import re
from examples.ags.benchmark.utils import generate_random_indices
global cost
cost = 0
def is_number(text: str) -> bool:
try:
float(text)
return True
except ValueError:
return False
def normalize_answer(s):
"""
Normalize answers for evaluation.
"""
def remove_articles(text):
return re.sub(r"\b(a|an|the)\b", " ", text)
def white_space_fix(text):
return " ".join(text.split())
def remove_punc(text):
exclude = set(string.punctuation)
return "".join(ch for ch in text if ch not in exclude)
def lower(text):
return text.lower()
return white_space_fix(remove_articles(remove_punc(lower(s))))
def f1_score(prediction, ground_truth):
"""
Compute the F1 score between prediction and ground truth answers.
"""
prediction_tokens = normalize_answer(prediction).split()
ground_truth_tokens = normalize_answer(ground_truth).split()
common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
num_same = sum(common.values())
if num_same == 0:
return 0
precision = 1.0 * num_same / len(prediction_tokens)
recall = 1.0 * num_same / len(ground_truth_tokens)
f1 = (2 * precision * recall) / (precision + recall)
return f1
async def load_data(file_path: str, samples=20, total_length=1000) -> List[dict]:
data = []
async with aiofiles.open(file_path, mode="r") as file:
async for line in file:
data.append(json.loads(line))
data = data[:total_length]
random_indices = generate_random_indices(len(data), samples)
data = [data[i] for i in random_indices]
return data
async def evaluate_problem(input: str, context_str: str, graph: Callable, expected_output: str):
max_retries = 5
retries = 0
while retries < max_retries:
try:
global cost
prediction, cost = await graph(input, context_str) if graph else "None"
score = f1_score(prediction, expected_output)
break
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
prediction = None
score = 0
break
return input, prediction, expected_output, score
async def evaluate_all_problems(data: List[dict], graph: Callable, max_concurrent_tasks: int = 50):
semaphore = asyncio.Semaphore(max_concurrent_tasks)
async def sem_evaluate(problem):
async with semaphore:
input_text = problem["question"]
expected_output = problem["answer"]
paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)]
context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs)
return await evaluate_problem(input_text, context_str, graph, expected_output)
tasks = [sem_evaluate(problem) for problem in data]
return await tqdm_asyncio.gather(*tasks, desc="Evaluating HotpotQA problems", total=len(data))
def save_results_to_csv(results: List[Tuple[str, str, str, float]], path: str) -> float:
df = pd.DataFrame(
results, columns=["question", "prediction", "expected_output", "score"]
)
average_score = df["score"].mean()
output_file = f"{path}/{average_score:.5f}.csv"
df.to_csv(output_file, index=False)
print(f"Results saved to {output_file}")
return average_score
async def hotpotqa_evaluation(graph: Callable, file_path: str, samples: int, path: str) -> float:
data = await load_data(file_path, samples)
results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20)
average_score = save_results_to_csv(results, path=path)
print(f"Average score on HotpotQA dataset: {average_score:.5f}")
global cost
print(f"Total cost: {cost}")
return average_score