diff --git a/examples/aflow/benchmark/drop.py b/examples/aflow/benchmark/drop.py
index 547356f0f..e8ee124d7 100644
--- a/examples/aflow/benchmark/drop.py
+++ b/examples/aflow/benchmark/drop.py
@@ -10,6 +10,7 @@ from typing import Any, Callable, Dict, List, Tuple
 import aiofiles
 import pandas as pd
 from tqdm.asyncio import tqdm_asyncio
+from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
 
 from examples.aflow.benchmark.benchmark import BaseBenchmark
 
@@ -52,40 +53,41 @@ class DROPBenchmark(BaseBenchmark):
         f1 = (2 * precision * recall) / (precision + recall)
         return f1, prediction
 
+    @retry(
+        stop=stop_after_attempt(5),
+        wait=wait_fixed(1),
+        retry=retry_if_exception_type(Exception),
+        reraise=True
+    )
+    async def _generate_output(self, graph, input_text):
+        return await graph(input_text)
+
     async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
         input_text = problem["context"]
         expected_output = problem["ref_text"]
         answers = expected_output.split("|")
 
-        max_retries = 5
-        retries = 0
+        try:
+            output, cost = await self._generate_output(graph, input_text)
+            f1_scores = []
 
-        while retries < max_retries:
-            try:
-                output, cost = await graph(input_text)
-                f1_scores = []
+            for answer in answers:
+                if answer.strip() != "":
+                    output_parts = output.split("|")
+                    for output_part in output_parts:
+                        f1_score, _ = self.calculate_score(answer, output_part)
+                        f1_scores.append(f1_score)
 
-                for answer in answers:
-                    if answer.strip() != "":
-                        output_parts = output.split("|")
-                        for output_part in output_parts:
-                            f1_score, _ = self.calculate_score(answer, output_part)
-                            f1_scores.append(f1_score)
+            uni_score = max(f1_scores)
 
-                uni_score = max(f1_scores)
+            if uni_score < 0.3:
+                self.log_mismatch(input_text, expected_output, output, output)
 
-                if uni_score < 0.3:
-                    self.log_mismatch(input_text, expected_output, output, output)
+            return input_text, output, expected_output, uni_score, cost
 
-                return input_text, output, expected_output, uni_score, cost
-
-            except Exception as e:
-                retries += 1
-                print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
-
-                if retries == max_retries:
-                    print("Maximum retries reached. Skipping this sample.")
-                    return input_text, str(e), expected_output, 0.0, 0.0
+        except Exception as e:
+            print(f"Maximum retries reached. Skipping this sample. Error: {e}")
+            return input_text, str(e), expected_output, 0.0, 0.0
 
     def get_result_columns(self) -> List[str]:
         return ["inputs", "prediction", "expected_output", "score", "cost"]
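All six benchmark files in this patch replace the same hand-rolled `while retries < max_retries` loop with tenacity's `@retry` decorator on a small `_generate_output` helper: five attempts, a fixed one-second wait between them, retry on any `Exception`, and `reraise=True` so the last exception propagates to the single `except` block in `evaluate_problem`. A minimal, self-contained sketch of that pattern; the `flaky_graph` coroutine and its failure counter are illustrative stand-ins, not part of the patch:

import asyncio
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type

attempts = 0

@retry(
    stop=stop_after_attempt(5),                 # give up after the 5th call
    wait=wait_fixed(1),                         # sleep 1 second between calls
    retry=retry_if_exception_type(Exception),   # retry on any Exception
    reraise=True,                               # re-raise the last error instead of wrapping it in RetryError
)
async def flaky_graph(question: str):
    global attempts
    attempts += 1
    if attempts < 3:
        raise RuntimeError("transient failure")
    return f"answer to {question!r}", 0.001     # (output, cost), mirroring the graph callables

async def main():
    try:
        output, cost = await flaky_graph("What is 2 + 2?")
        print(output, cost)
    except Exception as e:                      # only reached once all 5 attempts have failed
        print(f"Maximum retries reached. Skipping this sample. Error: {e}")

asyncio.run(main())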
Error: {e}") + return input_text, str(e), expected_output, 0.0, 0.0 def get_result_columns(self) -> List[str]: return ["inputs", "prediction", "expected_output", "score", "cost"] diff --git a/examples/aflow/benchmark/gsm8k.py b/examples/aflow/benchmark/gsm8k.py index 1e38fc23d..f3b86644c 100644 --- a/examples/aflow/benchmark/gsm8k.py +++ b/examples/aflow/benchmark/gsm8k.py @@ -14,6 +14,8 @@ from tqdm.asyncio import tqdm_asyncio import os import time from datetime import datetime +from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type + from examples.aflow.benchmark.benchmark import BaseBenchmark @@ -36,31 +38,33 @@ class GSM8KBenchmark(BaseBenchmark): if prediction is None: return 0.0, prediction return 1.0 if abs(expected_output - prediction) <= 1e-6 else 0.0, prediction + + @retry( + stop=stop_after_attempt(5), + wait=wait_fixed(1), + retry=retry_if_exception_type(Exception), + reraise=True + ) + async def _generate_output(self, graph, input_text): + return await graph(input_text) async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, float, float, float]: - max_retries = 5 - retries = 0 - - while retries < max_retries: - try: - prediction, cost = await graph(problem["question"]) - predicted_number = self.extract_number(prediction) - expected_output = self.extract_number(problem["answer"]) + input_text = problem["question"] + expected_output = self.extract_number(problem["answer"]) - score, _ = self.calculate_score(expected_output, predicted_number) + try: + output, cost = await self._generate_output(graph, input_text) + predicted_number = self.extract_number(output) + score, extracted_output = self.calculate_score(expected_output, predicted_number) - if score == 0: - self.log_mismatch(problem["question"], expected_output, prediction, predicted_number) + if score == 0: + self.log_mismatch(input_text, expected_output, output, extracted_output) - return problem["question"], prediction, expected_output, score, cost + return input_text, output, expected_output, score, cost - except Exception as e: - retries += 1 - print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") - - if retries == max_retries: - print("Maximum retries reached. Skipping this sample.") - return problem["question"], str(e), self.extract_number(problem["answer"]), 0.0, 0.0 + except Exception as e: + print(f"Maximum retries reached. Skipping this sample. 
Error: {e}") + return input_text, str(e), expected_output, 0.0, 0.0 def get_result_columns(self) -> List[str]: return ["question", "prediction", "expected_output", "score", "cost"] diff --git a/examples/aflow/benchmark/hotpotqa.py b/examples/aflow/benchmark/hotpotqa.py index fa6371aff..2a715c0ca 100644 --- a/examples/aflow/benchmark/hotpotqa.py +++ b/examples/aflow/benchmark/hotpotqa.py @@ -7,6 +7,7 @@ import string import re import os from collections import Counter +from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type from examples.aflow.benchmark.benchmark import BaseBenchmark @@ -42,6 +43,15 @@ class HotpotQABenchmark(BaseBenchmark): f1 = (2 * precision * recall) / (precision + recall) return f1, prediction + @retry( + stop=stop_after_attempt(5), + wait=wait_fixed(1), + retry=retry_if_exception_type(Exception), + reraise=True + ) + async def _generate_output(self, graph, input_text): + return await graph(input_text) + async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, str, float, float]: input_text = problem["question"] expected_output = problem["answer"] @@ -49,26 +59,18 @@ class HotpotQABenchmark(BaseBenchmark): context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs) inputs = f"Context: {context_str}\n\nQuestion: {input_text}\n\nAnswer:" - max_retries = 5 - retries = 0 + try: + output, cost = await self._generate_output(graph, inputs) + score, extracted_output = self.calculate_score(expected_output, output) - while retries < max_retries: - try: - output, cost = await graph(inputs) - score, _ = self.calculate_score(expected_output, output) + if score < 0.3: # We set the threshold for collecting incorrect questions to 0.3, as F1 Score cannot be simply judged using 0-1 + self.log_mismatch(input_text, expected_output, output, extracted_output) - if score < 0.3: - self.log_mismatch(input_text, expected_output, output, output) + return input_text, context_str, output, expected_output, score, cost - return input_text, context_str, output, expected_output, score, cost - - except Exception as e: - retries += 1 - print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") - - if retries == max_retries: - print("Maximum retries reached. Skipping this sample.") - return input_text, context_str, str(e), expected_output, 0.0, 0.0 + except Exception as e: + print(f"Maximum retries reached. Skipping this sample. 
Error: {e}") + return input_text, context_str, str(e), expected_output, 0.0, 0.0 def get_result_columns(self) -> List[str]: return ["question", "context", "prediction", "expected_output", "score", "cost"] diff --git a/examples/aflow/benchmark/humaneval.py b/examples/aflow/benchmark/humaneval.py index 797fe765c..75252626e 100644 --- a/examples/aflow/benchmark/humaneval.py +++ b/examples/aflow/benchmark/humaneval.py @@ -5,6 +5,8 @@ import asyncio import threading from datetime import datetime from typing import List, Tuple, Callable, Dict, Any, Optional +from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type + import pandas as pd @@ -93,44 +95,45 @@ class HumanEvalBenchmark(BaseBenchmark): return result - async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]: - max_retries = 5 - retries = 0 + @retry( + stop=stop_after_attempt(5), + wait=wait_fixed(1), + retry=retry_if_exception_type(Exception), + reraise=True + ) + async def _generate_output(self, graph, prompt, entry_point): + # Generate output with a timeout of 60 seconds + return await asyncio.wait_for(graph(prompt, entry_point), timeout=60) + async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]: + input_text = data["prompt"] expected_output = "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"] - while retries < max_retries: - try: - prediction, cost = await asyncio.wait_for(graph(data["prompt"], data["entry_point"]), timeout=60) - ret = self.check_solution(prediction, data["test"], data["entry_point"]) - test_case_details = ret[1] - expected_output = test_case_details + "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"] - score = 1.0 if ret[0] == self.PASS else 0.0 + try: + # Generate prediction using the graph function + prediction, cost = await self._generate_output(graph, input_text, data["entry_point"]) + + # Check the solution + ret = self.check_solution(prediction, data["test"], data["entry_point"]) + test_case_details = ret[1] + expected_output = test_case_details + expected_output + + # Calculate score based on the check result + score = 1.0 if ret[0] == self.PASS else 0.0 - if score == 0: - self.log_mismatch(data["prompt"], expected_output, prediction, score) - break - - except asyncio.TimeoutError: - prediction = None - ret = (self.FAIL, ["Timeout"]) - score = 0.0 - cost = 0.0 - break + # Log mismatch if the score is 0 + if score == 0: + self.log_mismatch(input_text, expected_output, prediction, score) - except Exception as e: - retries += 1 - print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") + return input_text, prediction, expected_output, score, cost - if retries == max_retries: - print("Maximum retries reached. Skipping this sample.") - prediction = None - ret = (self.FAIL, []) - score = 0.0 - cost = 0.0 - break + except asyncio.TimeoutError: + print("Timeout error. Skipping this sample.") + return input_text, "Timeout", expected_output, 0.0, 0.0 - return data["prompt"], prediction, expected_output, score, cost + except Exception as e: + print(f"Maximum retries reached. Skipping this sample. 
Error: {e}") + return input_text, str(e), expected_output, 0.0, 0.0 def calculate_score(self, expected_output: str, prediction: str) -> Tuple[float, str]: # The scoring logic for HumanEval is already implemented in evaluate_problem, this is just to conform to the interface diff --git a/examples/aflow/benchmark/math.py b/examples/aflow/benchmark/math.py index d98859484..1b91c1bde 100644 --- a/examples/aflow/benchmark/math.py +++ b/examples/aflow/benchmark/math.py @@ -6,6 +6,8 @@ from sympy.parsing.sympy_parser import parse_expr from math import isclose import multiprocessing from typing import Any, Callable, Tuple, List +from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type + from examples.aflow.benchmark.benchmark import BaseBenchmark @@ -95,22 +97,34 @@ class MATHBenchmark(BaseBenchmark): pass return False + + @retry( + stop=stop_after_attempt(5), + wait=wait_fixed(1), + retry=retry_if_exception_type(Exception), + reraise=True + ) + async def _generate_output(self, graph, input_text): + return await graph(input_text) + + async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, int, float]: input_text = problem["problem"] expected_output = problem["solution"] - max_retries = 2 - retries = 0 - prediction = await graph(input_text) - cost = prediction[1] - output = prediction[0] + try: + output, cost = await self._generate_output(graph, input_text) + uni_score, extracted_output = self.calculate_score(expected_output, output) - uni_score, extracted_output = self.calculate_score(expected_output, output) + if uni_score == 0: + self.log_mismatch(input_text, expected_output, output, extracted_output) - if uni_score == 0: - self.log_mismatch(input_text, expected_output, output, extracted_output) + return input_text, output, expected_output, uni_score, cost + + except Exception as e: + print(f"Maximum retries reached. Skipping this sample. 
Error: {e}") + return input_text, str(e), expected_output, 0.0, 0.0 - return input_text, output, expected_output, uni_score, cost def get_result_columns(self) -> List[str]: return ["question", "prediction", "expected_output", "score", "cost"] diff --git a/examples/aflow/benchmark/mbpp.py b/examples/aflow/benchmark/mbpp.py index afa66c934..2edede302 100644 --- a/examples/aflow/benchmark/mbpp.py +++ b/examples/aflow/benchmark/mbpp.py @@ -5,6 +5,7 @@ import asyncio import threading from datetime import datetime from typing import List, Tuple, Callable, Any, Optional, Dict +from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type from metagpt.actions.code_sanitize import sanitize from examples.aflow.benchmark.benchmark import BaseBenchmark @@ -83,37 +84,40 @@ class MBPPBenchmark(BaseBenchmark): return result - async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]: - max_retries = 5 - retries = 0 + @retry( + stop=stop_after_attempt(5), + wait=wait_fixed(1), + retry=retry_if_exception_type(Exception), + reraise=True + ) + async def _generate_output(self, graph, prompt, entry_point): + return await graph(prompt, entry_point) + async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]: + input_text = data["prompt"] expected_output = "\nCorrect Solution:\ndef " + data["code"] - while retries < max_retries: - try: - prediction, cost = await graph(data["prompt"], data["entry_point"]) - ret = self.check_solution(prediction, data["test"], data["entry_point"]) - test_case_details = ret[1] - expected_output = test_case_details + "\nCorrect Solution:" + data["code"] - score = 1.0 if ret[0] == self.PASS else 0.0 + try: + # Generate prediction using the graph function + prediction, cost = await self._generate_output(graph, input_text, data["entry_point"]) + + # Check the solution + ret = self.check_solution(prediction, data["test"], data["entry_point"]) + test_case_details = ret[1] + expected_output = test_case_details + "\nCorrect Solution:" + data["code"] + + # Calculate score based on the check result + score = 1.0 if ret[0] == self.PASS else 0.0 - if score == 0: - self.log_mismatch(data["prompt"], expected_output, prediction, score) - break + # Log mismatch if the score is 0 + if score == 0: + self.log_mismatch(input_text, expected_output, prediction, score) - except Exception as e: - retries += 1 - print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})") + return input_text, prediction, expected_output, score, cost - if retries == max_retries: - print("Maximum retries reached. Skipping this sample.") - prediction = None - ret = (self.FAIL, []) - score = 0.0 - cost = 0.0 - break - - return data["prompt"], prediction, expected_output, score, cost + except Exception as e: + print(f"Maximum retries reached. Skipping this sample. 
Error: {e}") + return input_text, str(e), expected_output, 0.0, 0.0 def calculate_score(self, expected_output: str, prediction: str) -> Tuple[float, str]: # The scoring logic for MBPP is already implemented in evaluate_problem, this is just to conform to the interface diff --git a/metagpt/llm.py b/metagpt/llm.py index a918edd2a..465e419a1 100644 --- a/metagpt/llm.py +++ b/metagpt/llm.py @@ -10,12 +10,6 @@ from typing import Optional from metagpt.configs.llm_config import LLMConfig from metagpt.context import Context from metagpt.provider.base_llm import BaseLLM -from metagpt.utils.cost_manager import CostManager - -global cost_manager - -if not globals().get("cost_manager"): - cost_manager = CostManager() def LLM(llm_config: Optional[LLMConfig] = None, context: Context = None) -> BaseLLM: @@ -23,6 +17,4 @@ def LLM(llm_config: Optional[LLMConfig] = None, context: Context = None) -> Base ctx = context or Context() if llm_config is not None: return ctx.llm_with_cost_manager_from_llm_config(llm_config) - llm = ctx.llm() - llm.cost_manager = cost_manager - return llm + return ctx.llm()