diff --git a/examples/aflow/optimize.py b/examples/aflow/optimize.py index 2a0b6d5bb..503f84ec0 100644 --- a/examples/aflow/optimize.py +++ b/examples/aflow/optimize.py @@ -3,11 +3,10 @@ # @Author : didi # @Desc : Entrance of AFlow. -from metagpt.ext.aflow.scripts.optimizer import Optimizer -from metagpt.ext.aflow.scripts.optimizer import DatasetType, QuestionType, OptimizerType -from metagpt.ext.aflow.data.download_data import download + from metagpt.configs.models_config import ModelsConfig -from typing import Literal +from metagpt.ext.aflow.data.download_data import download +from metagpt.ext.aflow.scripts.optimizer import DatasetType, Optimizer, QuestionType # DatasetType, QuestionType, and OptimizerType definitions # DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"] @@ -32,30 +31,30 @@ claude_llm_config = ModelsConfig.default().get("claude-3-5-sonnet-20240620") # Config operators. operators = [ - "Custom", # It's basic unit of a fixed node. optimizer can modify its prompt to get vairous nodes. + "Custom", # It's basic unit of a fixed node. optimizer can modify its prompt to get vairous nodes. # "AnswerGenerate" # It's for qa # "CustomCodeGenerate", # It's for code - "ScEnsemble", # It's for code, math and qa + "ScEnsemble", # It's for code, math and qa # "Test", # It's for code - "Programmer", # It's for math + "Programmer", # It's for math ] # Create an optimizer instance optimizer = Optimizer( - dataset=dataset, # Config dataset - question_type=question_type, # Config Question Type - opt_llm_config=claude_llm_config, # Config Optimizer LLM - exec_llm_config=mini_llm_config, # Config Execution LLM - check_convergence=check_convergence, # Whether Early Stop - operators=operators, # Config Operators you want to use - optimized_path=optimized_path, # Config Optimized workflow's file path - sample=sample, # Only Top(sample) rounds will be selected. - initial_round=initial_round, # Optimize from initial round - max_rounds=max_rounds # The max iteration of AFLOW. + dataset=dataset, # Config dataset + question_type=question_type, # Config Question Type + opt_llm_config=claude_llm_config, # Config Optimizer LLM + exec_llm_config=mini_llm_config, # Config Execution LLM + check_convergence=check_convergence, # Whether Early Stop + operators=operators, # Config Operators you want to use + optimized_path=optimized_path, # Config Optimized workflow's file path + sample=sample, # Only Top(sample) rounds will be selected. + initial_round=initial_round, # Optimize from initial round + max_rounds=max_rounds, # The max iteration of AFLOW. ) if __name__ == "__main__": # Optimize workflow via setting the optimizer's mode to 'Graph' optimizer.optimize("Graph") # Test workflow via setting the optimizer's mode to 'Test' - # optimizer.optimize("Test") \ No newline at end of file + # optimizer.optimize("Test") diff --git a/metagpt/actions/action_node.py b/metagpt/actions/action_node.py index 0eb0e9069..1909d3835 100644 --- a/metagpt/actions/action_node.py +++ b/metagpt/actions/action_node.py @@ -39,11 +39,14 @@ class ReviseMode(Enum): TAG = "CONTENT" + + class FillMode(Enum): CODE_FILL = "code_fill" XML_FILL = "xml_fill" SINGLE_FILL = "single_fill" + LANGUAGE_CONSTRAINT = "Language: Please use the same language as Human INPUT." FORMAT_CONSTRAINT = f"Format: output wrapped inside [{TAG}][/{TAG}] like format example, nothing else." @@ -558,7 +561,7 @@ class ActionNode: if match: raw_value = match.group(1).strip() field_type = field_types.get(field_name) - + if field_type == str: extracted_data[field_name] = raw_value elif field_type == int: @@ -567,7 +570,7 @@ class ActionNode: except ValueError: extracted_data[field_name] = 0 # 或者其他默认值 elif field_type == bool: - extracted_data[field_name] = raw_value.lower() in ('true', 'yes', '1', 'on', 'True') + extracted_data[field_name] = raw_value.lower() in ("true", "yes", "1", "on", "True") elif field_type == list: try: extracted_data[field_name] = eval(raw_value) @@ -622,7 +625,6 @@ class ActionNode: if self.schema: schema = self.schema - if mode == FillMode.CODE_FILL.value: result = await self.code_fill(context, function_name, timeout) self.instruct_content = self.create_class()(**result) diff --git a/metagpt/ext/aflow/benchmark/benchmark.py b/metagpt/ext/aflow/benchmark/benchmark.py index 4132a9b04..1712e1eac 100644 --- a/metagpt/ext/aflow/benchmark/benchmark.py +++ b/metagpt/ext/aflow/benchmark/benchmark.py @@ -1,14 +1,15 @@ import asyncio import json import os -from typing import List, Tuple, Callable, Any from abc import ABC, abstractmethod from datetime import datetime +from typing import Any, Callable, List, Tuple import aiofiles import pandas as pd from tqdm.asyncio import tqdm_asyncio + class BaseBenchmark(ABC): def __init__(self, name: str, file_path: str, log_path: str): self.name = name @@ -17,7 +18,7 @@ class BaseBenchmark(ABC): async def load_data(self, specific_indices: List[int] = None) -> List[dict]: data = [] - async with aiofiles.open(self.file_path, mode="r", encoding='utf-8') as file: + async with aiofiles.open(self.file_path, mode="r", encoding="utf-8") as file: async for line in file: data.append(json.loads(line)) @@ -47,13 +48,13 @@ class BaseBenchmark(ABC): "question": problem, "right_answer": expected_output, "model_output": prediction, - "extracted_output": extracted_output + "extracted_output": extracted_output, } - log_file = os.path.join(self.log_path, 'log.json') + log_file = os.path.join(self.log_path, "log.json") if os.path.exists(log_file): - with open(log_file, 'r', encoding='utf-8') as f: + with open(log_file, "r", encoding="utf-8") as f: try: data = json.load(f) except json.JSONDecodeError: @@ -63,7 +64,7 @@ class BaseBenchmark(ABC): data.append(log_data) - with open(log_file, 'w', encoding='utf-8') as f: + with open(log_file, "w", encoding="utf-8") as f: json.dump(data, f, indent=4, ensure_ascii=False) @abstractmethod @@ -97,5 +98,3 @@ class BaseBenchmark(ABC): print(f"Average score on {self.name} dataset: {average_score:.5f}") print(f"Total Cost: {total_cost:.5f}") return average_score, average_cost, total_cost - - diff --git a/metagpt/ext/aflow/benchmark/drop.py b/metagpt/ext/aflow/benchmark/drop.py index 61e5bb616..7963dcd45 100644 --- a/metagpt/ext/aflow/benchmark/drop.py +++ b/metagpt/ext/aflow/benchmark/drop.py @@ -1,19 +1,13 @@ -import os import re -import json -import asyncio import string from collections import Counter -from datetime import datetime -from typing import Any, Callable, Dict, List, Tuple +from typing import Callable, List, Tuple -import aiofiles -import pandas as pd -from tqdm.asyncio import tqdm_asyncio -from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark + class DROPBenchmark(BaseBenchmark): def __init__(self, name: str, file_path: str, log_path: str): super().__init__(name, file_path, log_path) @@ -53,12 +47,7 @@ class DROPBenchmark(BaseBenchmark): f1 = (2 * precision * recall) / (precision + recall) return f1, prediction - @retry( - stop=stop_after_attempt(5), - wait=wait_fixed(1), - retry=retry_if_exception_type(Exception), - reraise=True - ) + @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True) async def _generate_output(self, graph, input_text): return await graph(input_text) diff --git a/metagpt/ext/aflow/benchmark/gsm8k.py b/metagpt/ext/aflow/benchmark/gsm8k.py index 1292dded1..86887db7f 100644 --- a/metagpt/ext/aflow/benchmark/gsm8k.py +++ b/metagpt/ext/aflow/benchmark/gsm8k.py @@ -3,22 +3,13 @@ # @Author : all # @Desc : test on gsm8k import re -import json -import asyncio -import aiofiles -import pandas as pd -from typing import Optional, List, Tuple, Callable, Any - -from pandas import Series -from tqdm.asyncio import tqdm_asyncio -import os -import time -from datetime import datetime -from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type +from typing import Callable, List, Optional, Tuple +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark + class GSM8KBenchmark(BaseBenchmark): def __init__(self, name: str, file_path: str, log_path: str): super().__init__(name, file_path, log_path) @@ -38,13 +29,8 @@ class GSM8KBenchmark(BaseBenchmark): if prediction is None: return 0.0, prediction return 1.0 if abs(expected_output - prediction) <= 1e-6 else 0.0, prediction - - @retry( - stop=stop_after_attempt(5), - wait=wait_fixed(1), - retry=retry_if_exception_type(Exception), - reraise=True - ) + + @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True) async def _generate_output(self, graph, input_text): return await graph(input_text) diff --git a/metagpt/ext/aflow/benchmark/hotpotqa.py b/metagpt/ext/aflow/benchmark/hotpotqa.py index 4b9e81d9f..3b485c022 100644 --- a/metagpt/ext/aflow/benchmark/hotpotqa.py +++ b/metagpt/ext/aflow/benchmark/hotpotqa.py @@ -1,16 +1,13 @@ -import json -import asyncio -import aiofiles -import pandas as pd -from typing import List, Tuple, Callable, Any -import string import re -import os +import string from collections import Counter -from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type +from typing import Callable, List, Tuple + +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark + class HotpotQABenchmark(BaseBenchmark): def __init__(self, name: str, file_path: str, log_path: str): super().__init__(name, file_path, log_path) @@ -43,12 +40,7 @@ class HotpotQABenchmark(BaseBenchmark): f1 = (2 * precision * recall) / (precision + recall) return f1, prediction - @retry( - stop=stop_after_attempt(5), - wait=wait_fixed(1), - retry=retry_if_exception_type(Exception), - reraise=True - ) + @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True) async def _generate_output(self, graph, input_text): return await graph(input_text) @@ -63,7 +55,9 @@ class HotpotQABenchmark(BaseBenchmark): output, cost = await self._generate_output(graph, inputs) score, extracted_output = self.calculate_score(expected_output, output) - if score < 0.3: # We set the threshold for collecting incorrect questions to 0.3, as F1 Score cannot be simply judged using 0-1 + if ( + score < 0.3 + ): # We set the threshold for collecting incorrect questions to 0.3, as F1 Score cannot be simply judged using 0-1 self.log_mismatch(input_text, expected_output, output, extracted_output) return input_text, context_str, output, expected_output, score, cost diff --git a/metagpt/ext/aflow/benchmark/humaneval.py b/metagpt/ext/aflow/benchmark/humaneval.py index 2fea25bbe..53bc7cfde 100644 --- a/metagpt/ext/aflow/benchmark/humaneval.py +++ b/metagpt/ext/aflow/benchmark/humaneval.py @@ -1,17 +1,13 @@ -import os -import time -import json import asyncio import threading -from datetime import datetime -from typing import List, Tuple, Callable, Dict, Any, Optional -from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type +import time +from typing import Any, Callable, Dict, List, Optional, Tuple +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed -import pandas as pd - -from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark from metagpt.actions.code_sanitize import sanitize +from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark + class HumanEvalBenchmark(BaseBenchmark): def __init__(self, name: str, file_path: str, log_path: str): @@ -52,72 +48,86 @@ class HumanEvalBenchmark(BaseBenchmark): solution = sanitize(code=solution, entrypoint=entry_point) try: global_dict = { - 'math': __import__('math'), - 'hashlib': __import__('hashlib'), - 're': __import__('re'), - 'List': List, - 'Dict': Dict, - 'Tuple': Tuple, - 'Optional': Optional, - 'Any': Any + "math": __import__("math"), + "hashlib": __import__("hashlib"), + "re": __import__("re"), + "List": List, + "Dict": Dict, + "Tuple": Tuple, + "Optional": Optional, + "Any": Any, } - + # Add handling for special cases if entry_point == "decode_cyclic": - solution = "\n\ndef encode_cyclic(s: str):\n \"\"\"\n returns encoded string by cycling groups of three characters.\n \"\"\"\n # split string to groups. Each of length 3.\n groups = [s[(3 * i):min((3 * i + 3), len(s))] for i in range((len(s) + 2) // 3)]\n # cycle elements in each group. Unless group has fewer elements than 3.\n groups = [(group[1:] + group[0]) if len(group) == 3 else group for group in groups]\n return \"\".join(groups)" + "\n\n" + solution + solution = ( + '\n\ndef encode_cyclic(s: str):\n """\n returns encoded string by cycling groups of three characters.\n """\n # split string to groups. Each of length 3.\n groups = [s[(3 * i):min((3 * i + 3), len(s))] for i in range((len(s) + 2) // 3)]\n # cycle elements in each group. Unless group has fewer elements than 3.\n groups = [(group[1:] + group[0]) if len(group) == 3 else group for group in groups]\n return "".join(groups)' + + "\n\n" + + solution + ) elif entry_point == "decode_shift": - solution = "\n\ndef encode_shift(s: str):\n \"\"\"\n returns encoded string by shifting every character by 5 in the alphabet.\n \"\"\"\n return \"\".join([chr(((ord(ch) + 5 - ord(\"a\")) % 26) + ord(\"a\")) for ch in s])\n\n\n" + solution + solution = ( + '\n\ndef encode_shift(s: str):\n """\n returns encoded string by shifting every character by 5 in the alphabet.\n """\n return "".join([chr(((ord(ch) + 5 - ord("a")) % 26) + ord("a")) for ch in s])\n\n\n' + + solution + ) elif entry_point == "find_zero": - solution = "\n\ndef poly(xs: list, x: float):\n return sum(coeff * (x ** i) for i, coeff in enumerate(xs))\n\n" + solution - + solution = ( + "\n\ndef poly(xs: list, x: float):\n return sum(coeff * (x ** i) for i, coeff in enumerate(xs))\n\n" + + solution + ) + exec(solution, global_dict) - + if entry_point not in global_dict: raise ValueError(f"Function {entry_point} is not defined in the solution.") - + exec(test, global_dict) - + check = global_dict["check"] - + result = self.run_with_timeout(check, (global_dict[entry_point],), 15) - + if result is None: result = (self.PASS, "The solution passed all test cases.") - + except self.TimeoutError: - result = (self.FAIL, "Execution timed out. Please check if your solution contains infinite loops or overly time-consuming operations.") + result = ( + self.FAIL, + "Execution timed out. Please check if your solution contains infinite loops or overly time-consuming operations.", + ) except Exception as e: error_message = f"Error: {str(e)}.\n Solution: {solution}.\n Test: {test}" result = (self.FAIL, error_message) - - with open('error.log', 'a', encoding='utf-8') as log_file: + + with open("error.log", "a", encoding="utf-8") as log_file: log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n") - + return result - @retry( - stop=stop_after_attempt(5), - wait=wait_fixed(1), - retry=retry_if_exception_type(Exception), - reraise=True - ) + @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True) async def _generate_output(self, graph, prompt, entry_point): # Generate output with a timeout of 60 seconds return await asyncio.wait_for(graph(prompt, entry_point), timeout=60) async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]: input_text = data["prompt"] - expected_output = "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"] + expected_output = ( + "\nCorrect Solution:\ndef " + + data["entry_point"] + + "(params you should put here):" + + "\n\n" + + data["canonical_solution"] + ) try: # Generate prediction using the graph function prediction, cost = await self._generate_output(graph, input_text, data["entry_point"]) - + # Check the solution ret = self.check_solution(prediction, data["test"], data["entry_point"]) test_case_details = ret[1] expected_output = test_case_details + expected_output - + # Calculate score based on the check result score = 1.0 if ret[0] == self.PASS else 0.0 diff --git a/metagpt/ext/aflow/benchmark/math.py b/metagpt/ext/aflow/benchmark/math.py index 6ea6189f3..475f9e7bd 100644 --- a/metagpt/ext/aflow/benchmark/math.py +++ b/metagpt/ext/aflow/benchmark/math.py @@ -1,16 +1,16 @@ import re +from math import isclose +from typing import Any, Callable, List, Tuple + import regex from sympy import N, simplify from sympy.parsing.latex import parse_latex from sympy.parsing.sympy_parser import parse_expr -from math import isclose -import multiprocessing -from typing import Any, Callable, Tuple, List -from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type - +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark + class MATHBenchmark(BaseBenchmark): def __init__(self, name: str, file_path: str, log_path: str): super().__init__(name, file_path, log_path) @@ -21,7 +21,7 @@ class MATHBenchmark(BaseBenchmark): if boxed_matches: return boxed_matches[-1].strip() - sentence_end_pattern = r'(? Tuple[str, str, str, int, float]: input_text = problem["problem"] expected_output = problem["solution"] @@ -125,6 +118,5 @@ class MATHBenchmark(BaseBenchmark): print(f"Maximum retries reached. Skipping this sample. Error: {e}") return input_text, str(e), expected_output, 0.0, 0.0 - def get_result_columns(self) -> List[str]: return ["question", "prediction", "expected_output", "score", "cost"] diff --git a/metagpt/ext/aflow/benchmark/mbpp.py b/metagpt/ext/aflow/benchmark/mbpp.py index d94c57cc5..67bd7f255 100644 --- a/metagpt/ext/aflow/benchmark/mbpp.py +++ b/metagpt/ext/aflow/benchmark/mbpp.py @@ -1,15 +1,13 @@ -import os -import json -import time -import asyncio import threading -from datetime import datetime -from typing import List, Tuple, Callable, Any, Optional, Dict -from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type +import time +from typing import Any, Callable, Dict, List, Optional, Tuple + +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed from metagpt.actions.code_sanitize import sanitize from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark + class MBPPBenchmark(BaseBenchmark): def __init__(self, name: str, file_path: str, log_path: str): super().__init__(name, file_path, log_path) @@ -49,47 +47,45 @@ class MBPPBenchmark(BaseBenchmark): solution = sanitize(code=solution, entrypoint=entry_point) try: global_dict = { - 'math': __import__('math'), - 'hashlib': __import__('hashlib'), - 're': __import__('re'), - 'List': List, - 'Dict': Dict, - 'Tuple': Tuple, - 'Optional': Optional, - 'Any': Any + "math": __import__("math"), + "hashlib": __import__("hashlib"), + "re": __import__("re"), + "List": List, + "Dict": Dict, + "Tuple": Tuple, + "Optional": Optional, + "Any": Any, } - + exec(solution, global_dict) - + if entry_point not in global_dict: raise ValueError(f"Function {entry_point} is not defined in the solution.") - + exec(test, global_dict) - + check = global_dict["check"] - + result = self.run_with_timeout(check, 15) - + if result is None: result = (self.PASS, "The solution passed all test cases.") - + except self.TimeoutError: - result = (self.FAIL, "Execution timed out. Please check if your solution contains infinite loops or overly time-consuming operations.") + result = ( + self.FAIL, + "Execution timed out. Please check if your solution contains infinite loops or overly time-consuming operations.", + ) except Exception as e: error_message = f"Error: {str(e)}.\n Solution: {solution}.\n Test: {test}" result = (self.FAIL, error_message) - - with open('error.log', 'a', encoding='utf-8') as log_file: + + with open("error.log", "a", encoding="utf-8") as log_file: log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n") - + return result - @retry( - stop=stop_after_attempt(5), - wait=wait_fixed(1), - retry=retry_if_exception_type(Exception), - reraise=True - ) + @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True) async def _generate_output(self, graph, prompt, entry_point): return await graph(prompt, entry_point) @@ -100,12 +96,12 @@ class MBPPBenchmark(BaseBenchmark): try: # Generate prediction using the graph function prediction, cost = await self._generate_output(graph, input_text, data["entry_point"]) - + # Check the solution ret = self.check_solution(prediction, data["test"], data["entry_point"]) test_case_details = ret[1] expected_output = test_case_details + "\nCorrect Solution:" + data["code"] - + # Calculate score based on the check result score = 1.0 if ret[0] == self.PASS else 0.0 diff --git a/metagpt/ext/aflow/benchmark/utils.py b/metagpt/ext/aflow/benchmark/utils.py index a9e0bb8a6..e620a52a3 100644 --- a/metagpt/ext/aflow/benchmark/utils.py +++ b/metagpt/ext/aflow/benchmark/utils.py @@ -1,7 +1,9 @@ -import os import json +import os + import numpy as np + def generate_random_indices(n, n_samples, test=False): """ 生成随机索引 @@ -18,36 +20,39 @@ def generate_random_indices(n, n_samples, test=False): else: return indices[:n_samples] + def split_data_set(file_path, samples, test=False): data = [] - with open(file_path, 'r') as file: + with open(file_path, "r") as file: for line in file: data.append(json.loads(line)) random_indices = generate_random_indices(len(data), samples, test) data = [data[i] for i in random_indices] return data + # save data into a jsonl file def save_data(data, file_path): - with open(file_path, 'w') as file: + with open(file_path, "w") as file: for d in data: - file.write(json.dumps(d) + '\n') + file.write(json.dumps(d) + "\n") + def log_mismatch(problem, expected_output, prediction, predicted_number, path): log_data = { "question": problem, "right_answer": expected_output, "model_output": prediction, - "extracted_output": predicted_number + "extracted_output": predicted_number, } - log_file = os.path.join(path, 'log.json') + log_file = os.path.join(path, "log.json") # 检查log文件是否已经存在 if os.path.exists(log_file): # 如果存在,加载现有的日志数据 - with open(log_file, 'r', encoding='utf-8') as f: + with open(log_file, "r", encoding="utf-8") as f: try: data = json.load(f) except json.JSONDecodeError: @@ -60,5 +65,5 @@ def log_mismatch(problem, expected_output, prediction, predicted_number, path): data.append(log_data) # 将数据写回到log.json文件 - with open(log_file, 'w', encoding='utf-8') as f: - json.dump(data, f, indent=4, ensure_ascii=False) \ No newline at end of file + with open(log_file, "w", encoding="utf-8") as f: + json.dump(data, f, indent=4, ensure_ascii=False) diff --git a/metagpt/ext/aflow/data/download_data.py b/metagpt/ext/aflow/data/download_data.py index f3727aea1..4e176cd56 100644 --- a/metagpt/ext/aflow/data/download_data.py +++ b/metagpt/ext/aflow/data/download_data.py @@ -4,64 +4,66 @@ # @Desc : Download and extract dataset files import os -import requests import tarfile +from typing import Dict + +import requests from tqdm import tqdm -from typing import List, Dict + def download_file(url: str, filename: str) -> None: """Download a file from the given URL and show progress.""" response = requests.get(url, stream=True) - total_size = int(response.headers.get('content-length', 0)) + total_size = int(response.headers.get("content-length", 0)) block_size = 1024 - progress_bar = tqdm(total=total_size, unit='iB', unit_scale=True) - - with open(filename, 'wb') as file: + progress_bar = tqdm(total=total_size, unit="iB", unit_scale=True) + + with open(filename, "wb") as file: for data in response.iter_content(block_size): size = file.write(data) progress_bar.update(size) progress_bar.close() + def extract_tar_gz(filename: str, extract_path: str) -> None: """Extract a tar.gz file to the specified path.""" - with tarfile.open(filename, 'r:gz') as tar: + with tarfile.open(filename, "r:gz") as tar: tar.extractall(path=extract_path) + def process_dataset(url: str, filename: str, extract_path: str) -> None: """Download, extract, and clean up a dataset.""" print(f"Downloading {filename}...") download_file(url, filename) - + print(f"Extracting {filename}...") extract_tar_gz(filename, extract_path) - + print(f"{filename} download and extraction completed.") - + os.remove(filename) print(f"Removed {filename}") + # Define the datasets to be downloaded # Users can modify this list to choose which datasets to download datasets_to_download: Dict[str, Dict[str, str]] = { "datasets": { "url": "https://drive.google.com/uc?export=download&id=1DNoegtZiUhWtvkd2xoIuElmIi4ah7k8e", "filename": "aflow_data.tar.gz", - "extract_path": "metagpt/ext/aflow/data" - }, - "results": { - "url": "", # 请填入正确的URL - "filename": "result.tar.gz", - "extract_path": "metagpt/ext/aflow/data/results" + "extract_path": "metagpt/ext/aflow/data", }, + "results": {"url": "", "filename": "result.tar.gz", "extract_path": "metagpt/ext/aflow/data/results"}, # 请填入正确的URL "initial_rounds": { "url": "https://drive.google.com/uc?export=download&id=1UBoW4WBWjX2gs4I_jq3ALdXeLdwDJMdP", "filename": "initial_rounds.tar.gz", - "extract_path": "metagpt/ext/aflow/scripts/optimized" - } + "extract_path": "metagpt/ext/aflow/scripts/optimized", + }, } + def download(datasets): """Main function to process all selected datasets.""" for dataset_name in datasets: dataset = datasets_to_download[dataset_name] - process_dataset(dataset['url'], dataset['filename'], dataset['extract_path']) \ No newline at end of file + process_dataset(dataset["url"], dataset["filename"], dataset["extract_path"]) diff --git a/metagpt/ext/aflow/scripts/evaluator.py b/metagpt/ext/aflow/scripts/evaluator.py index 873f4ad9b..ecc009a5c 100644 --- a/metagpt/ext/aflow/scripts/evaluator.py +++ b/metagpt/ext/aflow/scripts/evaluator.py @@ -3,20 +3,20 @@ # @Author : all # @Desc : Evaluation for different datasets -from typing import Literal, Tuple, Optional, Dict -import asyncio +from typing import Dict, Literal, Tuple from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark -from metagpt.ext.aflow.benchmark.gsm8k import GSM8KBenchmark -from metagpt.ext.aflow.benchmark.math import MATHBenchmark -from metagpt.ext.aflow.benchmark.humaneval import HumanEvalBenchmark -from metagpt.ext.aflow.benchmark.hotpotqa import HotpotQABenchmark -from metagpt.ext.aflow.benchmark.mbpp import MBPPBenchmark from metagpt.ext.aflow.benchmark.drop import DROPBenchmark +from metagpt.ext.aflow.benchmark.gsm8k import GSM8KBenchmark +from metagpt.ext.aflow.benchmark.hotpotqa import HotpotQABenchmark +from metagpt.ext.aflow.benchmark.humaneval import HumanEvalBenchmark +from metagpt.ext.aflow.benchmark.math import MATHBenchmark +from metagpt.ext.aflow.benchmark.mbpp import MBPPBenchmark # If you want to customize tasks, add task types here and provide evaluation functions, just like the ones given above DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"] + class Evaluator: """ Complete the evaluation for different datasets here @@ -33,7 +33,9 @@ class Evaluator: "DROP": DROPBenchmark, } - async def graph_evaluate(self, dataset: DatasetType, graph, params: dict, path: str, is_test: bool = False) -> Tuple[float, float, float]: + async def graph_evaluate( + self, dataset: DatasetType, graph, params: dict, path: str, is_test: bool = False + ) -> Tuple[float, float, float]: if dataset not in self.dataset_configs: raise ValueError(f"Unsupported dataset: {dataset}") @@ -44,7 +46,7 @@ class Evaluator: # Use params to configure the graph and benchmark configured_graph = await self._configure_graph(dataset, graph, params) - va_list = [1,2,3] # Use va_list from params, or use default value if not provided + va_list = [1, 2, 3] # Use va_list from params, or use default value if not provided return await benchmark.run_evaluation(configured_graph, va_list) async def _configure_graph(self, dataset, graph, params: dict): diff --git a/metagpt/ext/aflow/scripts/operator.py b/metagpt/ext/aflow/scripts/operator.py index f94a0db32..0d1354210 100644 --- a/metagpt/ext/aflow/scripts/operator.py +++ b/metagpt/ext/aflow/scripts/operator.py @@ -2,41 +2,42 @@ # @Date : 6/27/2024 17:36 PM # @Author : didi # @Desc : operator demo of aflow +import asyncio +import concurrent.futures import random import sys -import asyncio import traceback from collections import Counter from typing import Dict, List, Tuple -import concurrent.futures from tenacity import retry, stop_after_attempt, wait_fixed -from metagpt.ext.aflow.scripts.utils import extract_test_cases_from_jsonl +from metagpt.actions.action_node import ActionNode from metagpt.ext.aflow.scripts.operator_an import ( + AnswerGenerateOp, + CodeGenerateOp, FormatOp, GenerateOp, - CodeGenerateOp, - AnswerGenerateOp, - ScEnsembleOp, - ReflectionTestOp, MdEnsembleOp, + ReflectionTestOp, ReviewOp, ReviseOp, - + ScEnsembleOp, ) from metagpt.ext.aflow.scripts.prompts.prompt import ( - FORMAT_PROMPT, ANSWER_GENERATION_PROMPT, - SC_ENSEMBLE_PROMPT, + FORMAT_PROMPT, + MD_ENSEMBLE_PROMPT, PYTHON_CODE_VERIFIER_PROMPT, REFLECTION_ON_PUBLIC_TEST_PROMPT, - MD_ENSEMBLE_PROMPT, REVIEW_PROMPT, REVISE_PROMPT, + SC_ENSEMBLE_PROMPT, +) +from metagpt.ext.aflow.scripts.utils import ( + extract_test_cases_from_jsonl, + test_case_2_test_function, ) -from metagpt.ext.aflow.scripts.utils import test_case_2_test_function -from metagpt.actions.action_node import ActionNode from metagpt.llm import LLM from metagpt.logs import logger @@ -66,7 +67,8 @@ class Custom(Operator): prompt = instruction + input response = await self._fill_node(GenerateOp, prompt, mode="single_fill") return response - + + class AnswerGenerate(Operator): def __init__(self, llm: LLM, name: str = "AnswerGenerate"): super().__init__(llm, name) @@ -76,6 +78,7 @@ class AnswerGenerate(Operator): response = await self._fill_node(AnswerGenerateOp, prompt, mode="xml_fill") return response + class CustomCodeGenerate(Operator): def __init__(self, llm: LLM, name: str = "CustomCodeGenerate"): super().__init__(llm, name) @@ -112,15 +115,27 @@ class ScEnsemble(Operator): return {"response": solutions[answer_mapping[answer]]} + def run_code(code): try: # Create a new global namespace global_namespace = {} disallowed_imports = [ - "os", "sys", "subprocess", "multiprocessing", - "matplotlib", "seaborn", "plotly", "bokeh", "ggplot", - "pylab", "tkinter", "PyQt5", "wx", "pyglet" + "os", + "sys", + "subprocess", + "multiprocessing", + "matplotlib", + "seaborn", + "plotly", + "bokeh", + "ggplot", + "pylab", + "tkinter", + "PyQt5", + "wx", + "pyglet", ] # Check for prohibited imports @@ -132,8 +147,8 @@ def run_code(code): # Use exec to execute the code exec(code, global_namespace) # Assume the code defines a function named 'solve' - if 'solve' in global_namespace and callable(global_namespace['solve']): - result = global_namespace['solve']() + if "solve" in global_namespace and callable(global_namespace["solve"]): + result = global_namespace["solve"]() return "Success", str(result) else: return "Error", "Function 'solve' not found" @@ -141,7 +156,7 @@ def run_code(code): exc_type, exc_value, exc_traceback = sys.exc_info() tb_str = traceback.format_exception(exc_type, exc_value, exc_traceback) return "Error", f"Execution error: {str(e)}\n{''.join(tb_str)}" - + class Programmer(Operator): def __init__(self, llm: LLM, name: str = "Programmer"): @@ -170,11 +185,7 @@ class Programmer(Operator): """ Asynchronous method to generate code. """ - prompt = PYTHON_CODE_VERIFIER_PROMPT.format( - problem=problem, - analysis=analysis, - feedback=feedback - ) + prompt = PYTHON_CODE_VERIFIER_PROMPT.format(problem=problem, analysis=analysis, feedback=feedback) response = await self._fill_node(CodeGenerateOp, prompt, mode, function_name="solve") return response @@ -208,9 +219,8 @@ class Test(Operator): super().__init__(llm, name) def exec_code(self, solution, entry_point): - test_cases = extract_test_cases_from_jsonl(entry_point) - + fail_cases = [] for test_case in test_cases: test_code = test_case_2_test_function(solution, test_case, entry_point) @@ -239,9 +249,7 @@ class Test(Operator): else: return "no error" - async def __call__( - self, problem, solution, entry_point, test_loop: int = 3 - ): + async def __call__(self, problem, solution, entry_point, test_loop: int = 3): """ "Test": { "description": "Test the solution with test cases, if the solution is correct, return 'no error', if the solution is incorrect, return reflect on the soluion and the error information", @@ -271,13 +279,13 @@ class Test(Operator): ) response = await self._fill_node(ReflectionTestOp, prompt, mode="code_fill") solution = response["reflection_and_solution"] - + result = self.exec_code(solution, entry_point) if result == "no error": return {"result": True, "solution": solution} else: return {"result": False, "solution": solution} - + class Format(Operator): def __init__(self, llm: LLM, name: str = "Format"): @@ -286,7 +294,7 @@ class Format(Operator): async def __call__(self, problem, solution, mode: str = None): prompt = FORMAT_PROMPT.format(problem_description=problem, solution=solution) response = await self._fill_node(FormatOp, prompt, mode) - return response + return response class Review(Operator): @@ -298,6 +306,7 @@ class Review(Operator): response = await self._fill_node(ReviewOp, prompt, mode="xml_fill") return response + class Revise(Operator): def __init__(self, llm: LLM, name: str = "Revise"): super().__init__(llm, name) @@ -305,7 +314,7 @@ class Revise(Operator): async def __call__(self, problem, solution, feedback, mode: str = None): prompt = REVISE_PROMPT.format(problem=problem, solution=solution, feedback=feedback) response = await self._fill_node(ReviseOp, prompt, mode="xml_fill") - return response + return response class MdEnsemble(Operator): @@ -348,4 +357,4 @@ class MdEnsemble(Operator): most_frequent_index = Counter(all_responses).most_common(1)[0][0] final_answer = solutions[most_frequent_index] - return {"solution": final_answer} + return {"solution": final_answer} diff --git a/metagpt/ext/aflow/scripts/operator_an.py b/metagpt/ext/aflow/scripts/operator_an.py index 3898a19c5..d0201dea2 100644 --- a/metagpt/ext/aflow/scripts/operator_an.py +++ b/metagpt/ext/aflow/scripts/operator_an.py @@ -9,31 +9,46 @@ from pydantic import BaseModel, Field class GenerateOp(BaseModel): response: str = Field(default="", description="Your solution for this problem") + class CodeGenerateOp(BaseModel): code: str = Field(default="", description="Your complete code solution for this problem") + class AnswerGenerateOp(BaseModel): thought: str = Field(default="", description="The step by step thinking process") answer: str = Field(default="", description="The final answer to the question") + class FormatOp(BaseModel): solution: str = Field(default="", description="Your formatted answer for this problem") + class ScEnsembleOp(BaseModel): thought: str = Field(default="", description="The thought of the most consistent solution.") solution_letter: str = Field(default="", description="The letter of most consistent solution.") + class ReflectionTestOp(BaseModel): - reflection_and_solution: str = Field(default="", description="Corrective solution for code execution errors or test case failures") + reflection_and_solution: str = Field( + default="", description="Corrective solution for code execution errors or test case failures" + ) + class MdEnsembleOp(BaseModel): thought: str = Field(default="", description="Step-by-step analysis of the solutions to determine the best one.") solution_letter: str = Field(default="", description="The letter of the chosen best solution (only one letter).") + class ReviewOp(BaseModel): - review_result: bool = Field(default=False, description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'") - feedback: str = Field(default="",description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.") + review_result: bool = Field( + default=False, + description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'", + ) + feedback: str = Field( + default="", + description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.", + ) + class ReviseOp(BaseModel): solution: str = Field(default="", description="Based on the feedback, revised solution for this problem") - diff --git a/metagpt/ext/aflow/scripts/optimizer.py b/metagpt/ext/aflow/scripts/optimizer.py index f0bb260f3..63da6a56c 100644 --- a/metagpt/ext/aflow/scripts/optimizer.py +++ b/metagpt/ext/aflow/scripts/optimizer.py @@ -10,13 +10,13 @@ from typing import List, Literal from pydantic import BaseModel, Field from metagpt.actions.action_node import ActionNode -from metagpt.provider.llm_provider_registry import create_llm_instance -from metagpt.logs import logger -from metagpt.ext.aflow.scripts.optimizer_utils.graph_utils import GraphUtils -from metagpt.ext.aflow.scripts.optimizer_utils.data_utils import DataUtils -from metagpt.ext.aflow.scripts.optimizer_utils.experience_utils import ExperienceUtils -from metagpt.ext.aflow.scripts.optimizer_utils.evaluation_utils import EvaluationUtils from metagpt.ext.aflow.scripts.optimizer_utils.convergence_utils import ConvergenceUtils +from metagpt.ext.aflow.scripts.optimizer_utils.data_utils import DataUtils +from metagpt.ext.aflow.scripts.optimizer_utils.evaluation_utils import EvaluationUtils +from metagpt.ext.aflow.scripts.optimizer_utils.experience_utils import ExperienceUtils +from metagpt.ext.aflow.scripts.optimizer_utils.graph_utils import GraphUtils +from metagpt.logs import logger +from metagpt.provider.llm_provider_registry import create_llm_instance DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"] QuestionType = Literal["math", "code", "qa"] @@ -31,17 +31,17 @@ class GraphOptimize(BaseModel): class Optimizer: def __init__( - self, - dataset: DatasetType, - question_type: QuestionType, - opt_llm_config, - exec_llm_config, - operators: List, - sample: int, - check_convergence: bool = False, - optimized_path: str = None, - initial_round: int = 1, - max_rounds: int = 20 + self, + dataset: DatasetType, + question_type: QuestionType, + opt_llm_config, + exec_llm_config, + operators: List, + sample: int, + check_convergence: bool = False, + optimized_path: str = None, + initial_round: int = 1, + max_rounds: int = 20, ) -> None: self.optimize_llm_config = opt_llm_config self.optimize_llm = create_llm_instance(self.optimize_llm_config) @@ -68,8 +68,8 @@ class Optimizer: def optimize(self, mode: OptimizerType = "Graph"): if mode == "Test": - test_n = 3 # validation datasets's execution number - for i in range(test_n): + test_n = 3 # validation datasets's execution number + for i in range(test_n): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) score = loop.run_until_complete(self.test()) @@ -99,15 +99,16 @@ class Optimizer: if retry_count < max_retries: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - + self.round += 1 logger.info(f"Score for round {self.round}: {score}") converged, convergence_round, final_round = self.convergence_utils.check_convergence(top_k=3) if converged and self.check_convergence: - - logger.info(f"Convergence detected, occurred in round {convergence_round}, final round is {final_round}") + logger.info( + f"Convergence detected, occurred in round {convergence_round}, final round is {final_round}" + ) # Print average scores and standard deviations for each round self.convergence_utils.print_results() break @@ -115,7 +116,7 @@ class Optimizer: time.sleep(5) async def _optimize_graph(self): - validation_n = 2 # validation datasets's execution number + validation_n = 2 # validation datasets's execution number graph_path = f"{self.root_path}/workflows" data = self.data_utils.load_results(graph_path) @@ -152,8 +153,9 @@ class Optimizer: response = await self.graph_utils.get_graph_optimize_response(graph_optimize_node) # Check if the modification meets the conditions - check = self.experience_utils.check_modification(processed_experience, response["modification"], - sample["round"]) + check = self.experience_utils.check_modification( + processed_experience, response["modification"], sample["round"] + ) # If `check` is True, break the loop; otherwise, regenerate the graph if check: @@ -175,7 +177,7 @@ class Optimizer: return avg_score async def test(self): - rounds = [5] # You can choose the rounds you want to test here. + rounds = [5] # You can choose the rounds you want to test here. data = [] graph_path = f"{self.root_path}/workflows_test" @@ -187,11 +189,9 @@ class Optimizer: directory = self.graph_utils.create_round_directory(graph_path, round) self.graph = self.graph_utils.load_graph(round, graph_path) - score, avg_cost, total_cost = await self.evaluation_utils.evaluate_graph_test( - self, directory, is_test=True - ) + score, avg_cost, total_cost = await self.evaluation_utils.evaluate_graph_test(self, directory, is_test=True) new_data = self.data_utils.create_result_data(round, score, avg_cost, total_cost) data.append(new_data) - self.data_utils.save_results(json_file_path, data) \ No newline at end of file + self.data_utils.save_results(json_file_path, data) diff --git a/metagpt/ext/aflow/scripts/optimizer_utils/convergence_utils.py b/metagpt/ext/aflow/scripts/optimizer_utils/convergence_utils.py index 0f990356c..246a94798 100644 --- a/metagpt/ext/aflow/scripts/optimizer_utils/convergence_utils.py +++ b/metagpt/ext/aflow/scripts/optimizer_utils/convergence_utils.py @@ -1,13 +1,16 @@ # -*- coding: utf-8 -*- # @Date : 9/23/2024 10:00 AM # @Author : Issac -# @Desc : +# @Desc : -import numpy as np import json import os + +import numpy as np + from metagpt.logs import logger + class ConvergenceUtils: def __init__(self, root_path): self.root_path = root_path @@ -27,11 +30,11 @@ class ConvergenceUtils: # If file doesn't exist, create a new one with an empty list if not os.path.exists(result_file): - with open(result_file, 'w') as file: + with open(result_file, "w") as file: json.dump([], file) # Read file and return data - with open(result_file, 'r') as file: + with open(result_file, "r") as file: return json.load(file) def process_rounds(self): @@ -41,8 +44,8 @@ class ConvergenceUtils: self.data = self.load_data(root_path=self.root_path) rounds = {} for entry in self.data: - round_number = entry['round'] - score = entry['score'] + round_number = entry["round"] + score = entry["score"] if round_number not in rounds: rounds[round_number] = [] rounds[round_number].append(score) @@ -77,19 +80,23 @@ class ConvergenceUtils: sigma_Y_previous = None # Standard error of Y value from previous round for i in range(len(self.avg_scores)): # Dynamically select top_k from current round and all previous rounds - top_k_indices = np.argsort(self.avg_scores[:i + 1])[::-1][:top_k] # Select top k indices by descending average score + top_k_indices = np.argsort(self.avg_scores[: i + 1])[::-1][ + :top_k + ] # Select top k indices by descending average score top_k_scores = [self.avg_scores[j] for j in top_k_indices] # Get list of top k scores - top_k_stds = [self.stds[j] for j in top_k_indices] # Get list of standard deviations corresponding to top k scores + top_k_stds = [ + self.stds[j] for j in top_k_indices + ] # Get list of standard deviations corresponding to top k scores # Calculate mean of top k scores for current round, i.e., Y_current Y_current = np.mean(top_k_scores) # Calculate standard error of Y_current (sigma_Y_current), representing score dispersion - sigma_Y_current = np.sqrt(np.sum([s ** 2 for s in top_k_stds]) / (top_k ** 2)) + sigma_Y_current = np.sqrt(np.sum([s**2 for s in top_k_stds]) / (top_k**2)) # If not the first round, calculate change in Y (Delta_Y) and corresponding standard error if previous_Y is not None: # Calculate Y difference between current round and previous round Delta_Y = Y_current - previous_Y # Calculate standard error of Y difference (sigma_Delta_Y) - sigma_Delta_Y = np.sqrt(sigma_Y_current ** 2 + sigma_Y_previous ** 2) + sigma_Delta_Y = np.sqrt(sigma_Y_current**2 + sigma_Y_previous**2) # Check if Y change is within acceptable confidence interval, i.e., convergence condition if abs(Delta_Y) <= z * sigma_Delta_Y: convergence_count += 1 @@ -105,7 +112,6 @@ class ConvergenceUtils: # If convergence condition not met, return not converged return False, None, None - def print_results(self): """ Print average score and standard deviation for all rounds. @@ -114,8 +120,8 @@ class ConvergenceUtils: for i, (avg_score, std) in enumerate(zip(self.avg_scores, self.stds), 1): logger.info(f"Round {i}: Average Score = {avg_score:.4f}, Standard Deviation = {std:.4f}") -if __name__ == "__main__": +if __name__ == "__main__": # Use this class and specify top_k checker = ConvergenceUtils("path") # For example, set top_k=5 converged, convergence_round, final_round = checker.check_convergence() diff --git a/metagpt/ext/aflow/scripts/optimizer_utils/data_utils.py b/metagpt/ext/aflow/scripts/optimizer_utils/data_utils.py index 4732f7c92..af891e565 100644 --- a/metagpt/ext/aflow/scripts/optimizer_utils/data_utils.py +++ b/metagpt/ext/aflow/scripts/optimizer_utils/data_utils.py @@ -1,9 +1,11 @@ +import datetime import json import os import random -import datetime + import numpy as np import pandas as pd + from metagpt.logs import logger @@ -15,7 +17,7 @@ class DataUtils: def load_results(self, path: str) -> list: result_path = os.path.join(path, "results.json") if os.path.exists(result_path): - with open(result_path, 'r') as json_file: + with open(result_path, "r") as json_file: try: return json.load(json_file) except json.JSONDecodeError: @@ -94,7 +96,7 @@ class DataUtils: if not os.path.exists(log_dir): return "" # 如果文件不存在,返回空字符串 logger.info(log_dir) - with open(log_dir, 'r', encoding='utf-8') as f: + with open(log_dir, "r", encoding="utf-8") as f: data = json.load(f) if isinstance(data, dict): @@ -119,16 +121,10 @@ class DataUtils: def create_result_data(self, round: int, score: float, avg_cost: float, total_cost: float) -> dict: now = datetime.datetime.now() - return { - "round": round, - "score": score, - "avg_cost": avg_cost, - "total_cost": total_cost, - "time": now - } + return {"round": round, "score": score, "avg_cost": avg_cost, "total_cost": total_cost, "time": now} def save_results(self, json_file_path: str, data: list): - with open(json_file_path, 'w') as json_file: + with open(json_file_path, "w") as json_file: json.dump(data, json_file, default=str, indent=4) def _load_scores(self, path=None, mode="Graph"): @@ -140,17 +136,14 @@ class DataUtils: result_file = os.path.join(rounds_dir, "results.json") self.top_scores = [] - with open(result_file, 'r', encoding='utf-8') as file: + with open(result_file, "r", encoding="utf-8") as file: data = json.load(file) df = pd.DataFrame(data) - scores_per_round = df.groupby('round')['score'].mean().to_dict() + scores_per_round = df.groupby("round")["score"].mean().to_dict() for round_number, average_score in scores_per_round.items(): - self.top_scores.append({ - "round": round_number, - "score": average_score - }) + self.top_scores.append({"round": round_number, "score": average_score}) self.top_scores.sort(key=lambda x: x["score"], reverse=True) diff --git a/metagpt/ext/aflow/scripts/optimizer_utils/evaluation_utils.py b/metagpt/ext/aflow/scripts/optimizer_utils/evaluation_utils.py index 79d015666..77683017e 100644 --- a/metagpt/ext/aflow/scripts/optimizer_utils/evaluation_utils.py +++ b/metagpt/ext/aflow/scripts/optimizer_utils/evaluation_utils.py @@ -12,9 +12,11 @@ class EvaluationUtils: for i in range(validation_n): score, avg_cost, total_cost = await evaluator.graph_evaluate( - optimizer.dataset, optimizer.graph, + optimizer.dataset, + optimizer.graph, {"dataset": optimizer.dataset, "llm_config": optimizer.execute_llm_config}, - directory, is_test=False + directory, + is_test=False, ) new_data = optimizer.data_utils.create_result_data(optimizer.round, score, avg_cost, total_cost) @@ -31,9 +33,11 @@ class EvaluationUtils: for i in range(validation_n): score, avg_cost, total_cost = await evaluator.graph_evaluate( - optimizer.dataset, optimizer.graph, + optimizer.dataset, + optimizer.graph, {"dataset": optimizer.dataset, "llm_config": optimizer.execute_llm_config}, - directory, is_test=False + directory, + is_test=False, ) cur_round = optimizer.round + 1 if initial is False else optimizer.round @@ -51,7 +55,9 @@ class EvaluationUtils: async def evaluate_graph_test(self, optimizer, directory, is_test=True): evaluator = Evaluator(eval_path=directory) return await evaluator.graph_evaluate( - optimizer.dataset, optimizer.graph, + optimizer.dataset, + optimizer.graph, {"dataset": optimizer.dataset, "llm_config": optimizer.execute_llm_config}, - directory, is_test=is_test + directory, + is_test=is_test, ) diff --git a/metagpt/ext/aflow/scripts/optimizer_utils/experience_utils.py b/metagpt/ext/aflow/scripts/optimizer_utils/experience_utils.py index 74ca8d23f..cffd8b522 100644 --- a/metagpt/ext/aflow/scripts/optimizer_utils/experience_utils.py +++ b/metagpt/ext/aflow/scripts/optimizer_utils/experience_utils.py @@ -1,8 +1,10 @@ import json import os from collections import defaultdict + from metagpt.logs import logger + class ExperienceUtils: def __init__(self, root_path: str): self.root_path = root_path @@ -32,12 +34,12 @@ class ExperienceUtils: if data["succeed"]: experience_data[father_node]["success"][round_number] = { "modification": data["modification"], - "score": data["after"] + "score": data["after"], } else: experience_data[father_node]["failure"][round_number] = { "modification": data["modification"], - "score": data["after"] + "score": data["after"], } except Exception as e: logger.info(f"Error processing {round_dir}: {str(e)}") @@ -69,10 +71,10 @@ class ExperienceUtils: experience_data = processed_experience.get(sample_round) if experience_data: for key, value in experience_data["failure"].items(): - if value['modification'] == modification: + if value["modification"] == modification: return False for key, value in experience_data["success"].items(): - if value['modification'] == modification: + if value["modification"] == modification: return False return True else: diff --git a/metagpt/ext/aflow/scripts/optimizer_utils/graph_utils.py b/metagpt/ext/aflow/scripts/optimizer_utils/graph_utils.py index 0471ad306..a0ebe9b26 100644 --- a/metagpt/ext/aflow/scripts/optimizer_utils/graph_utils.py +++ b/metagpt/ext/aflow/scripts/optimizer_utils/graph_utils.py @@ -1,17 +1,17 @@ +import json import os import re -import json -from typing import List -import traceback import time -from metagpt.logs import logger +import traceback +from typing import List from metagpt.ext.aflow.scripts.prompts.optimize_prompt import ( WORKFLOW_CUSTOM_USE, WORKFLOW_INPUT, WORKFLOW_OPTIMIZE_PROMPT, - WORKFLOW_TEMPLATE + WORKFLOW_TEMPLATE, ) +from metagpt.logs import logger class GraphUtils: @@ -72,11 +72,24 @@ class GraphUtils: interface = matched_data["interface"] return f"{id}. {operator_name}: {desc}, with interface {interface})." - def create_graph_optimize_prompt(self, experience: str, score: float, graph: str, prompt: str, - operator_description: str, type: str, log_data: str) -> str: + def create_graph_optimize_prompt( + self, + experience: str, + score: float, + graph: str, + prompt: str, + operator_description: str, + type: str, + log_data: str, + ) -> str: graph_input = WORKFLOW_INPUT.format( - experience=experience, score=score, graph=graph, prompt=prompt, operator_description=operator_description, - type=type, log=log_data + experience=experience, + score=score, + graph=graph, + prompt=prompt, + operator_description=operator_description, + type=type, + log=log_data, ) graph_system = WORKFLOW_OPTIMIZE_PROMPT.format(type=type) return graph_input + WORKFLOW_CUSTOM_USE + graph_system diff --git a/metagpt/ext/aflow/scripts/prompts/prompt.py b/metagpt/ext/aflow/scripts/prompts/prompt.py index ddf1f9424..16bf78af8 100644 --- a/metagpt/ext/aflow/scripts/prompts/prompt.py +++ b/metagpt/ext/aflow/scripts/prompts/prompt.py @@ -86,4 +86,4 @@ solution: {solution} feedback: {feedback} Ensure the output code is self-contained, and without any additional text or test cases. -""" \ No newline at end of file +""" diff --git a/metagpt/ext/aflow/scripts/utils.py b/metagpt/ext/aflow/scripts/utils.py index d74bea1b5..f69eaf4ca 100644 --- a/metagpt/ext/aflow/scripts/utils.py +++ b/metagpt/ext/aflow/scripts/utils.py @@ -8,6 +8,7 @@ import json import re from typing import Any, List, Tuple + def extract_task_id(task_id: str) -> int: """Extract the numeric part of the task_id.""" match = re.search(r"/(\d+)", task_id) @@ -52,23 +53,21 @@ def parse_python_literal(s): return s -def extract_test_cases_from_jsonl( - entry_point: str, dataset: str = "HumanEval" -): +def extract_test_cases_from_jsonl(entry_point: str, dataset: str = "HumanEval"): if dataset == "HumanEval": file_path = "metagpt/ext/aflow/data/humaneval_public_test.jsonl" - # Retain the original hardcoded test cases + # Retain the original hardcoded test cases hardcoded_cases = { - "find_zero": "", - "decode_cyclic": "", - "decode_shift": "", - "by_length":"", - "add":"", - "triangle_area":"", - "correct_bracketing":"", - "solve":"", - "sum_squares":"", - "starts_one_ends":"" + "find_zero": "", + "decode_cyclic": "", + "decode_shift": "", + "by_length": "", + "add": "", + "triangle_area": "", + "correct_bracketing": "", + "solve": "", + "sum_squares": "", + "starts_one_ends": "", } elif dataset == "MBPP": file_path = "metagpt/ext/aflow/data/mbpp_public_test.jsonl" @@ -80,7 +79,7 @@ def extract_test_cases_from_jsonl( "swap_List": "", "square_Sum": "", "sort_sublists": "", - "unique_sublists": "" + "unique_sublists": "", } # Check if there are hardcoded test cases if entry_point in hardcoded_cases: @@ -93,7 +92,7 @@ def extract_test_cases_from_jsonl( if data.get("entry_point") == entry_point: return data.get("test") - return None + return None def extract_test_cases(docstring: str) -> List[Tuple[str, List[Any], Any]]: diff --git a/metagpt/ext/aflow/scripts/workflow.py b/metagpt/ext/aflow/scripts/workflow.py index 37133bcc2..d0f883071 100644 --- a/metagpt/ext/aflow/scripts/workflow.py +++ b/metagpt/ext/aflow/scripts/workflow.py @@ -5,12 +5,12 @@ from typing import Literal -from metagpt.ext.aflow.scripts.operator import Generate from metagpt.provider.llm_provider_registry import create_llm_instance from metagpt.utils.cost_manager import CostManager DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"] + class Workflow: def __init__( self, @@ -28,5 +28,3 @@ class Workflow: Implementation of the workflow """ raise NotImplementedError("This method should be implemented by the subclass") - - \ No newline at end of file