Merge pull request #1510 from didiforgithub/main

AFLOW
Alexander Wu 2024-10-29 20:33:17 +08:00 committed by GitHub
commit dbfd37bb4d
41 changed files with 3286 additions and 0 deletions

.gitignore vendored

@@ -188,3 +188,4 @@ cov.xml
*-structure.json
*.dot
.python-version
*.csv


@@ -185,4 +185,13 @@ ## Citation
archivePrefix={arXiv},
primaryClass={cs.AI}
}
@misc{zhang2024aflow,
title={AFlow: Automating Agentic Workflow Generation},
author={Jiayi Zhang and Jinyu Xiang and Zhaoyang Yu and Fengwei Teng and Xionghui Chen and Jiaqi Chen and Mingchen Zhuge and Xin Cheng and Sirui Hong and Jinlin Wang and Bingnan Zheng and Bang Liu and Yuyu Luo and Chenglin Wu},
year={2024},
eprint={2410.10762},
archivePrefix={arXiv},
primaryClass={cs.AI},
url={https://arxiv.org/abs/2410.10762},
}
```

Three binary image files added (302 KiB, 889 KiB, 542 KiB; not shown).

examples/aflow/README.md Normal file

@@ -0,0 +1,89 @@
# AFlow: Automating Agentic Workflow Generation
AFlow is a framework for automatically generating and optimizing Agentic Workflows. It uses Monte Carlo tree search in a code-represented workflow space to find effective workflows, replacing manual development with machine effort. Our approach shows potential to outperform handcrafted workflows on various tasks.
[Read our paper on arXiv](https://arxiv.org/abs/2410.10762)
<p align="center">
<a href=""><img src="../../docs/resources/aflow/AFLOW-performance.jpg" alt="Performance Of AFlow" title="Performance of AFlow<sub>1</sub>" width="80%"></a>
</p>
## Framework Components
- **Node**: Basic unit of LLM invocation. See `metagpt/actions/action_node.py` for a flexible interface to control LLM, temperature, format, and prompt.
- **Operator**: Predefined combinations of Nodes to enhance search efficiency. Encapsulates common operations like Generate, Format, Review, Revise, Ensemble, Test, and Programmer. See `metagpt/ext/aflow/operator.py` for details. You can customize your own Operator by referencing the implementations in this code.
- **Workflow**: A sequence of LLM-invoking nodes connected by edges. Can be represented as graphs, neural networks, or code to express various execution structures. See `metagpt/ext/aflow/workflow.py` for our implementation.
- **Optimizer**: Uses LLMs within a Monte Carlo Tree Search variant to explore and refine workflows. Iteratively selects, expands, evaluates, and updates workflows based on performance. See `metagpt/ext/aflow/scripts/optimizer.py` for details.
- **Evaluator**: Assesses workflow performance on given tasks. Provides feedback to guide the optimization process towards more effective workflows. See `metagpt/ext/aflow/scripts/evaluator.py` for details.
<p align="center">
<a href=""><img src="../../docs/resources/aflow/AFLOW-method.jpg" alt="Framework of AFlow" title="Framework of AFlow <sub>1</sub>" width="80%"></a>
</p>
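As a rough, self-contained sketch of how these components compose (the names `fake_llm`, `CustomOperator`, and `Workflow` here are illustrative stand-ins, not the repo's actual API):

```python
import asyncio

async def fake_llm(prompt: str) -> str:
    # Stand-in for a real LLM call so the sketch runs offline.
    return f"answer to: {prompt}"

class CustomOperator:
    """Hypothetical operator: one prompt template wrapped around one LLM call."""

    def __init__(self, llm, prompt_template: str):
        self.llm = llm
        self.prompt_template = prompt_template

    async def __call__(self, problem: str) -> str:
        return await self.llm(self.prompt_template.format(problem=problem))

class Workflow:
    """Hypothetical workflow: a fixed sequence of operators applied in order."""

    def __init__(self, operators):
        self.operators = operators

    async def __call__(self, problem: str) -> str:
        result = problem
        for op in self.operators:
            result = await op(result)
        return result

workflow = Workflow([CustomOperator(fake_llm, "Solve step by step: {problem}")])
print(asyncio.run(workflow("1 + 1 = ?")))  # → answer to: Solve step by step: 1 + 1 = ?
```

The optimizer's job is then to rewrite the code inside `Workflow.__call__` (and the prompts it uses), which is what "searching a code-represented workflow space" means in practice.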
## Datasets
### Experimental Datasets
We conducted experiments on six datasets (HumanEval, MBPP, GSM8K, MATH, HotpotQA, DROP) and provide their evaluation code. The data can be downloaded from this [datasets](https://drive.google.com/uc?export=download&id=1DNoegtZiUhWtvkd2xoIuElmIi4ah7k8e) link, or with `metagpt/ext/aflow/data/download_data.py`.
<p align="center">
<a href=""><img src="../../docs/resources/aflow/AFLOW-experiment.jpg" alt="Performance Of AFlow" title="Performance Of AFlow <sub>1</sub>" width="80%"></a>
</p>
### Custom Datasets
For custom tasks, you can reference the code in the `metagpt/ext/aflow/benchmark` folder. Inherit the `BaseBenchmark` class and implement `evaluate_problem`, `calculate_score`, and `get_result_columns` to add your custom dataset benchmark. Then, add your benchmark name in `metagpt/ext/aflow/scripts/evaluator.py` and `metagpt/ext/aflow/scripts/optimizer.py` to find effective workflows for your custom dataset.
## Quick Start
1. Configure optimization parameters:
- Use command line arguments or modify default parameters in `examples/aflow/optimize.py`:
```python
--dataset MATH # Dataset type (HumanEval/MBPP/GSM8K/MATH/HotpotQA/DROP)
--sample 4 # Sample count - number of workflows to be resampled
--question_type math # Question type (math/code/qa)
--optimized_path PATH # Optimized result save path
--initial_round 1 # Initial round
--max_rounds 20 # Max iteration rounds for AFLOW
--check_convergence # Whether to enable early stop
--validation_rounds 5 # Validation rounds for AFLOW
--if_first_optimize # Set True for first optimization, False afterwards
```
2. Configure LLM parameters in `config/config2.yaml` (see `examples/aflow/config2.example.yaml` for reference)
3. Set up operators in `optimize.py` and in `optimized_path/template/operator.py`, `optimized_path/template/operator.json`. You can reference our implementation to add operators for specific datasets
4. For first-time use, download datasets and initial rounds by setting `download(["datasets", "initial_rounds"])` in `examples/aflow/optimize.py`
5. (Optional) Add your custom dataset and corresponding evaluation function following the [Custom Datasets](#custom-datasets) section
6. (Optional) If you want to use a portion of the validation data, you can set `va_list` in `examples/aflow/evaluator.py`
7. Run the optimization:
```bash
# Using default parameters
python -m examples.aflow.optimize
# Or with custom parameters
python -m examples.aflow.optimize --dataset MATH --sample 4 --question_type math
```
## Reproduce the Results in the Paper
1. We provide the raw data obtained from our experiments in this [link](https://drive.google.com/uc?export=download&id=1Sr5wjgKf3bN8OC7G6cO3ynzJqD4w6_Dv), including the workflows and prompts generated in each iteration, as well as their trajectories on the validation dataset. We also provide the optimal workflow for each dataset and the corresponding data on the test dataset. You can download this data using `metagpt/ext/aflow/data/download_data.py`.
2. You can directly reproduce our experimental results by running the scripts in `examples/aflow/experiments`.
## Citation
If you use AFlow in your research, please cite our paper:
```
@misc{zhang2024aflow,
title={AFlow: Automating Agentic Workflow Generation},
author={Jiayi Zhang and Jinyu Xiang and Zhaoyang Yu and Fengwei Teng and Xionghui Chen and Jiaqi Chen and Mingchen Zhuge and Xin Cheng and Sirui Hong and Jinlin Wang and Bingnan Zheng and Bang Liu and Yuyu Luo and Chenglin Wu},
year={2024},
eprint={2410.10762},
archivePrefix={arXiv},
primaryClass={cs.AI},
url={https://arxiv.org/abs/2410.10762},
}
```


@@ -0,0 +1,12 @@
models:
"<model_name>": # e.g. "gpt-4-turbo" or "gpt-3.5-turbo"
api_type: "openai" # or azure / ollama / groq etc.
base_url: "<your base url>"
api_key: "<your api key>"
temperature: 0
"<model_name>":
api_type: "openai"
base_url: "<your base url>"
api_key: "<your api key>"
temperature: 0
CALC_USAGE: True


@@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
# @Date : 8/23/2024 20:00 PM
# @Author : didi
# @Desc : Entrance of AFlow.
import argparse
from metagpt.configs.models_config import ModelsConfig
from metagpt.ext.aflow.scripts.optimizer import Optimizer
def parse_args():
parser = argparse.ArgumentParser(description="AFlow Optimizer for DROP")
parser.add_argument("--dataset", type=str, default="DROP", help="Dataset type")
parser.add_argument("--sample", type=int, default=4, help="Sample count")
parser.add_argument("--question_type", type=str, default="qa", help="Question type")
parser.add_argument(
"--optimized_path", type=str, default="metagpt/ext/aflow/scripts/optimized", help="Optimized result save path"
)
parser.add_argument("--initial_round", type=int, default=1, help="Initial round")
parser.add_argument("--max_rounds", type=int, default=20, help="Max iteration rounds")
parser.add_argument("--check_convergence", type=bool, default=True, help="Whether to enable early stop")
parser.add_argument("--validation_rounds", type=int, default=5, help="Validation rounds")
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
mini_llm_config = ModelsConfig.default().get("gpt-4o-mini")
claude_llm_config = ModelsConfig.default().get("claude-3-5-sonnet-20240620")
operators = [
"Custom",
"AnswerGenerate",
"ScEnsemble",
]
optimizer = Optimizer(
dataset=args.dataset,
question_type=args.question_type,
opt_llm_config=claude_llm_config,
exec_llm_config=mini_llm_config,
check_convergence=args.check_convergence,
operators=operators,
optimized_path=args.optimized_path,
sample=args.sample,
initial_round=args.initial_round,
max_rounds=args.max_rounds,
validation_rounds=args.validation_rounds,
)
optimizer.optimize("Graph")


@@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
# @Date : 8/23/2024 20:00 PM
# @Author : didi
# @Desc : Entrance of AFlow.
import argparse
from metagpt.configs.models_config import ModelsConfig
from metagpt.ext.aflow.scripts.optimizer import Optimizer
def parse_args():
parser = argparse.ArgumentParser(description="AFlow Optimizer for GSM8K")
parser.add_argument("--dataset", type=str, default="GSM8K", help="Dataset type")
parser.add_argument("--sample", type=int, default=4, help="Sample count")
parser.add_argument("--question_type", type=str, default="math", help="Question type")
parser.add_argument(
"--optimized_path", type=str, default="metagpt/ext/aflow/scripts/optimized", help="Optimized result save path"
)
parser.add_argument("--initial_round", type=int, default=1, help="Initial round")
parser.add_argument("--max_rounds", type=int, default=20, help="Max iteration rounds")
parser.add_argument("--check_convergence", type=bool, default=True, help="Whether to enable early stop")
parser.add_argument("--validation_rounds", type=int, default=5, help="Validation rounds")
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
mini_llm_config = ModelsConfig.default().get("gpt-4o-mini")
claude_llm_config = ModelsConfig.default().get("claude-3-5-sonnet-20240620")
operators = [
"Custom",
"ScEnsemble",
"Programmer",
]
optimizer = Optimizer(
dataset=args.dataset,
question_type=args.question_type,
opt_llm_config=claude_llm_config,
exec_llm_config=mini_llm_config,
check_convergence=args.check_convergence,
operators=operators,
optimized_path=args.optimized_path,
sample=args.sample,
initial_round=args.initial_round,
max_rounds=args.max_rounds,
validation_rounds=args.validation_rounds,
)
optimizer.optimize("Graph")


@@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
# @Date : 8/23/2024 20:00 PM
# @Author : didi
# @Desc : Entrance of AFlow.
import argparse
from metagpt.configs.models_config import ModelsConfig
from metagpt.ext.aflow.scripts.optimizer import Optimizer
def parse_args():
parser = argparse.ArgumentParser(description="AFlow Optimizer for HotpotQA")
parser.add_argument("--dataset", type=str, default="HotpotQA", help="Dataset type")
parser.add_argument("--sample", type=int, default=4, help="Sample count")
parser.add_argument("--question_type", type=str, default="qa", help="Question type")
parser.add_argument(
"--optimized_path", type=str, default="metagpt/ext/aflow/scripts/optimized", help="Optimized result save path"
)
parser.add_argument("--initial_round", type=int, default=1, help="Initial round")
parser.add_argument("--max_rounds", type=int, default=20, help="Max iteration rounds")
parser.add_argument("--check_convergence", type=bool, default=True, help="Whether to enable early stop")
parser.add_argument("--validation_rounds", type=int, default=5, help="Validation rounds")
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
mini_llm_config = ModelsConfig.default().get("gpt-4o-mini")
claude_llm_config = ModelsConfig.default().get("claude-3-5-sonnet-20240620")
operators = [
"Custom",
"AnswerGenerate",
"ScEnsemble",
]
optimizer = Optimizer(
dataset=args.dataset,
question_type=args.question_type,
opt_llm_config=claude_llm_config,
exec_llm_config=mini_llm_config,
check_convergence=args.check_convergence,
operators=operators,
optimized_path=args.optimized_path,
sample=args.sample,
initial_round=args.initial_round,
max_rounds=args.max_rounds,
validation_rounds=args.validation_rounds,
)
optimizer.optimize("Graph")


@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
# @Date : 8/23/2024 20:00 PM
# @Author : didi
# @Desc : Entrance of AFlow.
import argparse
from metagpt.configs.models_config import ModelsConfig
from metagpt.ext.aflow.scripts.optimizer import Optimizer
def parse_args():
parser = argparse.ArgumentParser(description="AFlow Optimizer for HumanEval")
parser.add_argument("--dataset", type=str, default="HumanEval", help="Dataset type")
parser.add_argument("--sample", type=int, default=4, help="Sample count")
parser.add_argument("--question_type", type=str, default="code", help="Question type")
parser.add_argument(
"--optimized_path", type=str, default="metagpt/ext/aflow/scripts/optimized", help="Optimized result save path"
)
parser.add_argument("--initial_round", type=int, default=1, help="Initial round")
parser.add_argument("--max_rounds", type=int, default=20, help="Max iteration rounds")
parser.add_argument("--check_convergence", type=bool, default=True, help="Whether to enable early stop")
parser.add_argument("--validation_rounds", type=int, default=5, help="Validation rounds")
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
mini_llm_config = ModelsConfig.default().get("gpt-4o-mini")
claude_llm_config = ModelsConfig.default().get("claude-3-5-sonnet-20240620")
operators = [
"Custom",
"CustomCodeGenerate",
"ScEnsemble",
"Test",
]
optimizer = Optimizer(
dataset=args.dataset,
question_type=args.question_type,
opt_llm_config=claude_llm_config,
exec_llm_config=mini_llm_config,
check_convergence=args.check_convergence,
operators=operators,
optimized_path=args.optimized_path,
sample=args.sample,
initial_round=args.initial_round,
max_rounds=args.max_rounds,
validation_rounds=args.validation_rounds,
)
optimizer.optimize("Graph")


@@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
# @Date : 8/23/2024 20:00 PM
# @Author : didi
# @Desc : Entrance of AFlow.
import argparse
from metagpt.configs.models_config import ModelsConfig
from metagpt.ext.aflow.scripts.optimizer import Optimizer
def parse_args():
parser = argparse.ArgumentParser(description="AFlow Optimizer for MATH")
parser.add_argument("--dataset", type=str, default="MATH", help="Dataset type")
parser.add_argument("--sample", type=int, default=4, help="Sample count")
parser.add_argument("--question_type", type=str, default="math", help="Question type")
parser.add_argument(
"--optimized_path", type=str, default="metagpt/ext/aflow/scripts/optimized", help="Optimized result save path"
)
parser.add_argument("--initial_round", type=int, default=1, help="Initial round")
parser.add_argument("--max_rounds", type=int, default=20, help="Max iteration rounds")
parser.add_argument("--check_convergence", type=bool, default=True, help="Whether to enable early stop")
parser.add_argument("--validation_rounds", type=int, default=5, help="Validation rounds")
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
mini_llm_config = ModelsConfig.default().get("gpt-4o-mini")
claude_llm_config = ModelsConfig.default().get("claude-3-5-sonnet-20240620")
operators = [
"Custom",
"ScEnsemble",
"Programmer",
]
optimizer = Optimizer(
dataset=args.dataset,
question_type=args.question_type,
opt_llm_config=claude_llm_config,
exec_llm_config=mini_llm_config,
check_convergence=args.check_convergence,
operators=operators,
optimized_path=args.optimized_path,
sample=args.sample,
initial_round=args.initial_round,
max_rounds=args.max_rounds,
validation_rounds=args.validation_rounds,
)
optimizer.optimize("Graph")


@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
# @Date : 8/23/2024 20:00 PM
# @Author : didi
# @Desc : Entrance of AFlow.
import argparse
from metagpt.configs.models_config import ModelsConfig
from metagpt.ext.aflow.scripts.optimizer import Optimizer
def parse_args():
parser = argparse.ArgumentParser(description="AFlow Optimizer for MBPP")
parser.add_argument("--dataset", type=str, default="MBPP", help="Dataset type")
parser.add_argument("--sample", type=int, default=4, help="Sample count")
parser.add_argument("--question_type", type=str, default="code", help="Question type")
parser.add_argument(
"--optimized_path", type=str, default="metagpt/ext/aflow/scripts/optimized", help="Optimized result save path"
)
parser.add_argument("--initial_round", type=int, default=1, help="Initial round")
parser.add_argument("--max_rounds", type=int, default=20, help="Max iteration rounds")
parser.add_argument("--check_convergence", type=bool, default=True, help="Whether to enable early stop")
parser.add_argument("--validation_rounds", type=int, default=5, help="Validation rounds")
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
mini_llm_config = ModelsConfig.default().get("gpt-4o-mini")
claude_llm_config = ModelsConfig.default().get("claude-3-5-sonnet-20240620")
operators = [
"Custom",
"CustomCodeGenerate",
"ScEnsemble",
"Test",
]
optimizer = Optimizer(
dataset=args.dataset,
question_type=args.question_type,
opt_llm_config=claude_llm_config,
exec_llm_config=mini_llm_config,
check_convergence=args.check_convergence,
operators=operators,
optimized_path=args.optimized_path,
sample=args.sample,
initial_round=args.initial_round,
max_rounds=args.max_rounds,
validation_rounds=args.validation_rounds,
)
optimizer.optimize("Graph")


@@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# @Date : 8/23/2024 20:00 PM
# @Author : didi
# @Desc : Entrance of AFlow.
import argparse
from metagpt.configs.models_config import ModelsConfig
from metagpt.ext.aflow.data.download_data import download
from metagpt.ext.aflow.scripts.optimizer import Optimizer
def parse_args():
parser = argparse.ArgumentParser(description="AFlow Optimizer")
parser.add_argument(
"--dataset",
type=str,
default="MATH",
help="Dataset type, including HumanEval, MBPP, GSM8K, MATH, HotpotQA, DROP",
)
parser.add_argument("--sample", type=int, default=4, help="Sample count")
parser.add_argument("--question_type", type=str, default="math", help="Question type, including math, code, qa")
parser.add_argument(
"--optimized_path", type=str, default="metagpt/ext/aflow/scripts/optimized", help="Optimized result save path"
)
parser.add_argument("--initial_round", type=int, default=1, help="Initial round")
parser.add_argument("--max_rounds", type=int, default=20, help="Max iteration rounds")
parser.add_argument("--check_convergence", type=bool, default=True, help="Whether to enable early stop")
parser.add_argument("--validation_rounds", type=int, default=5, help="Validation rounds")
parser.add_argument("--if_first_optimize", type=bool, default=True, help="Whether this is first optimization")
return parser.parse_args()
# Configure LLM models; you can modify `config/config2.yaml` to add more LLMs.
mini_llm_config = ModelsConfig.default().get("gpt-4o-mini")
claude_llm_config = ModelsConfig.default().get("claude-3-5-sonnet-20240620")
# Config operators.
operators = [
"Custom", # Basic unit: a fixed node whose prompt the optimizer can modify to produce various nodes
# "AnswerGenerate", # For QA
# "CustomCodeGenerate", # For code
"ScEnsemble", # For code, math, and QA
# "Test", # For code
"Programmer", # For math
]
if __name__ == "__main__":
args = parse_args()
# Create an optimizer instance
optimizer = Optimizer(
dataset=args.dataset, # Config dataset
question_type=args.question_type, # Config Question Type
opt_llm_config=claude_llm_config, # Config Optimizer LLM
exec_llm_config=mini_llm_config, # Config Execution LLM
check_convergence=args.check_convergence, # Whether Early Stop
operators=operators, # Config Operators you want to use
optimized_path=args.optimized_path, # Config Optimized workflow's file path
sample=args.sample, # Only the top `sample` rounds will be selected
initial_round=args.initial_round, # Optimize from the initial round
max_rounds=args.max_rounds, # Max optimization iterations of AFlow
validation_rounds=args.validation_rounds, # Validation rounds of AFlow
)
# On first use, please download the datasets and initial rounds; if you want to look at the results, download the results as well.
download(["datasets", "initial_rounds"], if_first_download=args.if_first_optimize)
# Optimize workflow via setting the optimizer's mode to 'Graph'
optimizer.optimize("Graph")
# Test workflow via setting the optimizer's mode to 'Test'
# optimizer.optimize("Test")


@@ -9,6 +9,7 @@ NOTE: You should use typing.List instead of list to do type annotation. Because
we can use typing to extract the type of the node, but we cannot use built-in list to extract.
"""
import json
import re
import typing
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Type, Union
@@ -23,6 +24,7 @@ from metagpt.logs import logger
from metagpt.provider.postprocess.llm_output_postprocess import llm_output_postprocess
from metagpt.utils.common import OutputParser, general_after_log
from metagpt.utils.human_interaction import HumanInteraction
from metagpt.utils.sanitize import sanitize
class ReviewMode(Enum):
@@ -38,9 +40,17 @@ class ReviseMode(Enum):
TAG = "CONTENT"
class FillMode(Enum):
CODE_FILL = "code_fill"
XML_FILL = "xml_fill"
SINGLE_FILL = "single_fill"
LANGUAGE_CONSTRAINT = "Language: Please use the same language as Human INPUT."
FORMAT_CONSTRAINT = f"Format: output wrapped inside [{TAG}][/{TAG}] like format example, nothing else."
SIMPLE_TEMPLATE = """
## context
{context}
@@ -471,6 +481,116 @@ class ActionNode:
return self
def get_field_name(self):
"""
Get the field name from the Pydantic model associated with this ActionNode.
"""
model_class = self.create_class()
fields = model_class.model_fields
# Assuming there's only one field in the model
if len(fields) == 1:
return next(iter(fields))
# If there are multiple fields, we might want to use self.key to find the right one
return self.key
def get_field_names(self):
"""
Get the field names of the Pydantic model associated with this ActionNode.
"""
model_class = self.create_class()
return model_class.model_fields.keys()
def get_field_types(self):
"""
Get the field types of the Pydantic model associated with this ActionNode.
"""
model_class = self.create_class()
return {field_name: field.annotation for field_name, field in model_class.model_fields.items()}
def xml_compile(self, context):
"""
Compile the prompt to make it easier for the model to understand the format.
"""
field_names = self.get_field_names()
# Construct the example using the field names
examples = []
for field_name in field_names:
examples.append(f"<{field_name}>content</{field_name}>")
# Join all examples into a single string
example_str = "\n".join(examples)
# Add the example to the context
context += f"""
### Response format (must be strictly followed): All content must be enclosed in the given XML tags, ensuring each opening <tag> has a corresponding closing </tag>, with no incomplete or self-closing tags allowed.\n
{example_str}
"""
return context
async def code_fill(
self, context: str, function_name: Optional[str] = None, timeout: int = USE_CONFIG_TIMEOUT
) -> Dict[str, str]:
"""
Fill the field with a code block extracted from triple-backtick fences.
"""
field_name = self.get_field_name()
prompt = context
content = await self.llm.aask(prompt, timeout=timeout)
extracted_code = sanitize(code=content, entrypoint=function_name)
result = {field_name: extracted_code}
return result
async def single_fill(self, context: str) -> Dict[str, str]:
field_name = self.get_field_name()
prompt = context
content = await self.llm.aask(prompt)
result = {field_name: content}
return result
async def xml_fill(self, context: str) -> Dict[str, Any]:
"""
Fill context with XML tags and convert according to field types, including string, integer, boolean, list and dict types
"""
field_names = self.get_field_names()
field_types = self.get_field_types()
extracted_data: Dict[str, Any] = {}
content = await self.llm.aask(context)
for field_name in field_names:
pattern = rf"<{field_name}>(.*?)</{field_name}>"
match = re.search(pattern, content, re.DOTALL)
if match:
raw_value = match.group(1).strip()
field_type = field_types.get(field_name)
if field_type == str:
extracted_data[field_name] = raw_value
elif field_type == int:
try:
extracted_data[field_name] = int(raw_value)
except ValueError:
extracted_data[field_name] = 0 # or another default value
elif field_type == bool:
extracted_data[field_name] = raw_value.lower() in ("true", "yes", "1", "on")
elif field_type == list:
try:
extracted_data[field_name] = eval(raw_value)
if not isinstance(extracted_data[field_name], list):
raise ValueError
except Exception:
extracted_data[field_name] = [] # default to an empty list
elif field_type == dict:
try:
extracted_data[field_name] = eval(raw_value)
if not isinstance(extracted_data[field_name], dict):
raise ValueError
except Exception:
extracted_data[field_name] = {} # default to an empty dict
return extracted_data
async def fill(
self,
context,
@@ -481,6 +601,7 @@ class ActionNode:
images: Optional[Union[str, list[str]]] = None,
timeout=USE_CONFIG_TIMEOUT,
exclude=[],
function_name: Optional[str] = None,
):
"""Fill the node(s) with mode.
@@ -507,6 +628,22 @@
if self.schema:
schema = self.schema
if mode == FillMode.CODE_FILL.value:
result = await self.code_fill(context, function_name, timeout)
self.instruct_content = self.create_class()(**result)
return self
elif mode == FillMode.XML_FILL.value:
context = self.xml_compile(context=self.context)
result = await self.xml_fill(context)
self.instruct_content = self.create_class()(**result)
return self
elif mode == FillMode.SINGLE_FILL.value:
result = await self.single_fill(context)
self.instruct_content = self.create_class()(**result)
return self
if strgy == "simple":
return await self.simple_fill(schema=schema, mode=mode, images=images, timeout=timeout, exclude=exclude)
elif strgy == "complex":
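The tag-extraction step that `xml_fill` performs can be illustrated with a dependency-free sketch (type coercion omitted; `xml_extract` is an illustrative name, not part of the codebase):

```python
import re
from typing import Dict, Iterable

def xml_extract(content: str, field_names: Iterable[str]) -> Dict[str, str]:
    # Pull <tag>...</tag> spans out of raw model output, one per expected field.
    extracted = {}
    for field_name in field_names:
        match = re.search(rf"<{field_name}>(.*?)</{field_name}>", content, re.DOTALL)
        if match:
            extracted[field_name] = match.group(1).strip()
    return extracted

print(xml_extract("<thought>Add the numbers.</thought><answer>42</answer>", ["thought", "answer"]))
# → {'thought': 'Add the numbers.', 'answer': '42'}
```

Because the prompt compiled by `xml_compile` forbids self-closing or unclosed tags, a simple non-greedy regex per field is usually enough to recover the structured output.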


@@ -0,0 +1,29 @@
# Custom Evaluation Function via Benchmark Class
## How to Use
To create a benchmark for a new dataset, follow these steps:
1. Create a new Python file, e.g., `my_dataset_benchmark.py`
2. Import the base class:
```python
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
```
3. Create a new class that inherits from `BaseBenchmark`:
```python
class MyDatasetBenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
```
4. Implement the required abstract methods:
- `evaluate_problem`: Evaluate a single problem
- `calculate_score`: Calculate the score for a prediction
- `get_result_columns`: Define column names for the results CSV file
5. Override other methods as needed, such as `load_data` or `save_results_to_csv`
## Example
Refer to the `DROPBenchmark` class in the `drop.py` file for an example of how to implement a benchmark for a specific dataset.
By following these guidelines, you can easily create custom benchmark evaluations for new datasets.


@@ -0,0 +1,104 @@
import asyncio
import json
import os
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, List, Tuple
import aiofiles
import pandas as pd
from tqdm.asyncio import tqdm_asyncio
from metagpt.logs import logger
from metagpt.utils.common import write_json_file
class BaseBenchmark(ABC):
def __init__(self, name: str, file_path: str, log_path: str):
self.name = name
self.file_path = file_path
self.log_path = log_path
PASS = "PASS"
FAIL = "FAIL"
async def load_data(self, specific_indices: List[int] = None) -> List[dict]:
data = []
async with aiofiles.open(self.file_path, mode="r", encoding="utf-8") as file:
async for line in file:
data.append(json.loads(line))
if specific_indices is not None:
filtered_data = [data[i] for i in specific_indices if i < len(data)]
return filtered_data
return data
def save_results_to_csv(self, results: List[Tuple[Any, ...]], columns: List[str]):
df = pd.DataFrame(results, columns=columns)
avg_score = df["score"].mean()
t_cost = df["cost"].max()
a_cost = t_cost / len(df) if len(df) > 0 else 0
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{avg_score:.5f}_{current_time}.csv"
output_file = os.path.join(self.log_path, filename)
df.to_csv(output_file, index=False)
logger.info(f"Results saved to {output_file}")
return avg_score, a_cost, t_cost
def log_mismatch(
self,
problem: str,
expected_output: Any,
prediction: str,
extracted_output: Any,
extract_answer_code: str = "None",
):
log_data = {
"question": problem,
"right_answer": expected_output,
"model_output": prediction,
"extracted_output": extracted_output,
"extract_answer_code": extract_answer_code,
}
log_file = Path(self.log_path) / "log.json"
if log_file.exists():
with log_file.open("r", encoding="utf-8") as f:
try:
data = json.load(f)
except json.JSONDecodeError:
data = []
else:
data = []
data.append(log_data)
write_json_file(log_file, data, encoding="utf-8", indent=4)
@abstractmethod
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[Any, ...]:
pass
@abstractmethod
def calculate_score(self, expected_output: Any, prediction: Any) -> Tuple[float, Any]:
pass
@abstractmethod
def get_result_columns(self) -> List[str]:
pass
async def evaluate_all_problems(self, data: List[dict], graph: Callable, max_concurrent_tasks: int = 50):
semaphore = asyncio.Semaphore(max_concurrent_tasks)
async def sem_evaluate(problem):
async with semaphore:
return await self.evaluate_problem(problem, graph)
tasks = [sem_evaluate(problem) for problem in data]
return await tqdm_asyncio.gather(*tasks, desc=f"Evaluating {self.name} problems", total=len(data))
async def run_evaluation(self, graph: Callable, va_list: List[int], max_concurrent_tasks: int = 50):
data = await self.load_data(va_list)
results = await self.evaluate_all_problems(data, graph, max_concurrent_tasks)
columns = self.get_result_columns()
average_score, average_cost, total_cost = self.save_results_to_csv(results, columns)
logger.info(f"Average score on {self.name} dataset: {average_score:.5f}")
logger.info(f"Total Cost: {total_cost:.5f}")
return average_score, average_cost, total_cost
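The bounded-concurrency pattern used by `evaluate_all_problems` above (a semaphore acquired around each task before `gather`) can be sketched in isolation; `bounded_gather` and `square` are illustrative names:

```python
import asyncio

async def bounded_gather(coro_fn, items, max_concurrent: int):
    # Run coro_fn over items, allowing at most max_concurrent calls in flight.
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(item):
        async with semaphore:
            return await coro_fn(item)

    # gather preserves input order regardless of completion order.
    return await asyncio.gather(*(run_one(item) for item in items))

async def square(x):
    await asyncio.sleep(0)  # stand-in for an LLM call
    return x * x

print(asyncio.run(bounded_gather(square, [1, 2, 3, 4], max_concurrent=2)))  # → [1, 4, 9, 16]
```

Capping concurrency this way keeps evaluation fast while staying under provider rate limits.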


@@ -0,0 +1,83 @@
import re
import string
from collections import Counter
from typing import Callable, List, Tuple
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
from metagpt.logs import logger
class DROPBenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
def normalize_answer(self, s: str) -> str:
"""
Normalize answers for evaluation.
"""
def remove_articles(text):
return re.sub(r"\b(a|an|the)\b", " ", text)
def white_space_fix(text):
return " ".join(text.split())
def remove_punc(text):
exclude = set(string.punctuation)
return "".join(ch for ch in text if ch not in exclude)
def lower(text):
return text.lower()
return white_space_fix(remove_articles(remove_punc(lower(s))))
def calculate_score(self, ground_truth: str, prediction: str) -> Tuple[float, str]:
"""
Compute the F1 score between prediction and ground truth answers.
"""
prediction_tokens = self.normalize_answer(prediction).split()
ground_truth_tokens = self.normalize_answer(ground_truth).split()
common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
num_same = sum(common.values())
if num_same == 0:
return 0, prediction
precision = 1.0 * num_same / len(prediction_tokens)
recall = 1.0 * num_same / len(ground_truth_tokens)
f1 = (2 * precision * recall) / (precision + recall)
return f1, prediction
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True)
async def _generate_output(self, graph, input_text):
return await graph(input_text)
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
input_text = problem["context"]
expected_output = problem["ref_text"]
answers = expected_output.split("|")
try:
output, cost = await self._generate_output(graph, input_text)
f1_scores = []
for answer in answers:
if answer.strip() != "":
output_parts = output.split("|")
for output_part in output_parts:
f1_score, _ = self.calculate_score(answer, output_part)
f1_scores.append(f1_score)
uni_score = max(f1_scores)
if uni_score < 0.3:
self.log_mismatch(input_text, expected_output, output, output)
return input_text, output, expected_output, uni_score, cost
except Exception as e:
logger.info(f"Maximum retries reached. Skipping this sample. Error: {e}")
return input_text, str(e), expected_output, 0.0, 0.0
def get_result_columns(self) -> List[str]:
return ["inputs", "prediction", "expected_output", "score", "cost"]
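As a standalone sanity check of the scoring above, here is a minimal reimplementation of the token-level F1 (hypothetical names `normalize`/`token_f1`, mirroring the class methods; not part of this PR):

```python
import re
import string
from collections import Counter

def normalize(s: str) -> str:
    # lowercase, strip punctuation, drop articles, collapse whitespace
    # (same order as DROPBenchmark.normalize_answer)
    s = s.lower()
    s = "".join(ch for ch in s if ch not in set(string.punctuation))
    s = re.sub(r"\b(a|an|the)\b", " ", s)
    return " ".join(s.split())

def token_f1(ground_truth: str, prediction: str) -> float:
    gt = normalize(ground_truth).split()
    pred = normalize(prediction).split()
    common = Counter(pred) & Counter(gt)
    num_same = sum(common.values())
    if num_same == 0:
        return 0.0
    precision = num_same / len(pred)
    recall = num_same / len(gt)
    return 2 * precision * recall / (precision + recall)
```

Answers that differ only in articles, punctuation, or case score 1.0, which is why the benchmark normalizes before tokenizing.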

@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# @Date :
# @Author : all
# @Desc : test on gsm8k
import re
from typing import Callable, List, Optional, Tuple
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
from metagpt.logs import logger
class GSM8KBenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
def extract_number(self, text: str) -> Optional[float]:
matches = re.findall(r"[-+]?\d+(?:,\d{3})*(?:\.\d+)?|\d+\.\d+", str(text))
if matches:
last_number = matches[-1].replace(",", "")
try:
return float(last_number)
except ValueError:
return None
else:
return None
def calculate_score(self, expected_output: float, prediction: float) -> Tuple[float, float]:
if prediction is None:
return 0.0, prediction
return 1.0 if abs(expected_output - prediction) <= 1e-6 else 0.0, prediction
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True)
async def _generate_output(self, graph, input_text):
return await graph(input_text)
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, float, float, float]:
input_text = problem["question"]
expected_output = self.extract_number(problem["answer"])
try:
output, cost = await self._generate_output(graph, input_text)
predicted_number = self.extract_number(output)
score, extracted_output = self.calculate_score(expected_output, predicted_number)
if score == 0:
self.log_mismatch(input_text, expected_output, output, extracted_output)
return input_text, output, expected_output, score, cost
except Exception as e:
logger.info(f"Maximum retries reached. Skipping this sample. Error: {e}")
return input_text, str(e), expected_output, 0.0, 0.0
def get_result_columns(self) -> List[str]:
return ["question", "prediction", "expected_output", "score", "cost"]
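The last-number extraction can be exercised on its own; this sketch repeats the same regex outside the class (hypothetical name `extract_last_number`):

```python
import re
from typing import Optional

def extract_last_number(text: str) -> Optional[float]:
    # same pattern as GSM8KBenchmark.extract_number: take the last number
    # in the text, allowing thousands separators and decimal points
    matches = re.findall(r"[-+]?\d+(?:,\d{3})*(?:\.\d+)?|\d+\.\d+", str(text))
    if not matches:
        return None
    try:
        return float(matches[-1].replace(",", ""))
    except ValueError:
        return None
```

Taking the last match is a deliberate choice: GSM8K solutions typically state the final answer at the end of the text.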

@@ -0,0 +1,71 @@
import re
import string
from collections import Counter
from typing import Callable, List, Tuple
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
from metagpt.logs import logger
class HotpotQABenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
def normalize_answer(self, s: str) -> str:
def remove_articles(text):
return re.sub(r"\b(a|an|the)\b", " ", text)
def white_space_fix(text):
return " ".join(text.split())
def remove_punc(text):
exclude = set(string.punctuation)
return "".join(ch for ch in text if ch not in exclude)
def lower(text):
return text.lower()
return white_space_fix(remove_articles(remove_punc(lower(s))))
def calculate_score(self, ground_truth: str, prediction: str) -> Tuple[float, str]:
prediction_tokens = self.normalize_answer(prediction).split()
ground_truth_tokens = self.normalize_answer(ground_truth).split()
common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
num_same = sum(common.values())
if num_same == 0:
return 0, prediction
precision = 1.0 * num_same / len(prediction_tokens)
recall = 1.0 * num_same / len(ground_truth_tokens)
f1 = (2 * precision * recall) / (precision + recall)
return f1, prediction
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True)
async def _generate_output(self, graph, input_text):
return await graph(input_text)
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, str, float, float]:
input_text = problem["question"]
expected_output = problem["answer"]
paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)]
context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs)
inputs = f"Context: {context_str}\n\nQuestion: {input_text}\n\nAnswer:"
try:
output, cost = await self._generate_output(graph, inputs)
score, extracted_output = self.calculate_score(expected_output, output)
if (
score < 0.3
): # We set the threshold for collecting incorrect questions to 0.3, as F1 Score cannot be simply judged using 0-1
self.log_mismatch(input_text, expected_output, output, extracted_output)
return input_text, context_str, output, expected_output, score, cost
except Exception as e:
logger.info(f"Maximum retries reached. Skipping this sample. Error: {e}")
return input_text, context_str, str(e), expected_output, 0.0, 0.0
def get_result_columns(self) -> List[str]:
return ["question", "context", "prediction", "expected_output", "score", "cost"]
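The prompt assembly in `evaluate_problem` above keeps only the sentence lists from each `[title, sentences]` context pair; a minimal sketch (hypothetical `build_inputs`):

```python
def build_inputs(question: str, context: list) -> str:
    # HotpotQA context entries are [title, [sentence, ...]] pairs;
    # keep only the sentence lists and flatten them into one string
    paragraphs = [item[1] for item in context if isinstance(item[1], list)]
    context_str = "\n".join(" ".join(p) for p in paragraphs)
    return f"Context: {context_str}\n\nQuestion: {question}\n\nAnswer:"
```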

@@ -0,0 +1,151 @@
import asyncio
import threading
import time
from typing import Any, Callable, Dict, List, Optional, Tuple
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
from metagpt.logs import logger
from metagpt.utils.sanitize import sanitize
class HumanEvalBenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
class TimeoutError(Exception):
pass
def run_with_timeout(self, func, args, timeout):
result = []
stop_event = threading.Event()
def target():
try:
result.append(func(*args))
except Exception as e:
result.append(e)
finally:
stop_event.set()
thread = threading.Thread(target=target)
thread.start()
is_timeout = not stop_event.wait(timeout)
if is_timeout:
raise self.TimeoutError("Function execution timed out")
if not result:
return None
if isinstance(result[0], Exception):
raise result[0]
return result[0]
def check_solution(self, solution, test, entry_point):
solution = sanitize(code=solution, entrypoint=entry_point)
try:
global_dict = {
"math": __import__("math"),
"hashlib": __import__("hashlib"),
"re": __import__("re"),
"List": List,
"Dict": Dict,
"Tuple": Tuple,
"Optional": Optional,
"Any": Any,
}
# Add handling for special cases
if entry_point == "decode_cyclic":
solution = (
'\n\ndef encode_cyclic(s: str):\n """\n returns encoded string by cycling groups of three characters.\n """\n # split string to groups. Each of length 3.\n groups = [s[(3 * i):min((3 * i + 3), len(s))] for i in range((len(s) + 2) // 3)]\n # cycle elements in each group. Unless group has fewer elements than 3.\n groups = [(group[1:] + group[0]) if len(group) == 3 else group for group in groups]\n return "".join(groups)'
+ "\n\n"
+ solution
)
elif entry_point == "decode_shift":
solution = (
'\n\ndef encode_shift(s: str):\n """\n returns encoded string by shifting every character by 5 in the alphabet.\n """\n return "".join([chr(((ord(ch) + 5 - ord("a")) % 26) + ord("a")) for ch in s])\n\n\n'
+ solution
)
elif entry_point == "find_zero":
solution = (
"\n\ndef poly(xs: list, x: float):\n return sum(coeff * (x ** i) for i, coeff in enumerate(xs))\n\n"
+ solution
)
exec(solution, global_dict)
if entry_point not in global_dict:
raise ValueError(f"Function {entry_point} is not defined in the solution.")
exec(test, global_dict)
check = global_dict["check"]
result = self.run_with_timeout(check, (global_dict[entry_point],), 15)
if result is None:
result = (self.PASS, "The solution passed all test cases.")
except self.TimeoutError:
result = (
self.FAIL,
"Execution timed out. Please check if your solution contains infinite loops or overly time-consuming operations.",
)
except Exception as e:
error_message = f"Error: {str(e)}.\n Solution: {solution}.\n Test: {test}"
result = (self.FAIL, error_message)
with open("error.log", "a", encoding="utf-8") as log_file:
log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n")
return result
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True)
async def _generate_output(self, graph, prompt, entry_point):
# Generate output with a timeout of 60 seconds
return await asyncio.wait_for(graph(prompt, entry_point), timeout=60)
async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
input_text = data["prompt"]
expected_output = (
"\nCorrect Solution:\ndef "
+ data["entry_point"]
+ "(params you should put here):"
+ "\n\n"
+ data["canonical_solution"]
)
try:
# Generate prediction using the graph function
prediction, cost = await self._generate_output(graph, input_text, data["entry_point"])
# Check the solution
ret = self.check_solution(prediction, data["test"], data["entry_point"])
test_case_details = ret[1]
expected_output = test_case_details + expected_output
# Calculate score based on the check result
score = 1.0 if ret[0] == self.PASS else 0.0
# Log mismatch if the score is 0
if score == 0:
self.log_mismatch(input_text, expected_output, prediction, score)
return input_text, prediction, expected_output, score, cost
except asyncio.TimeoutError:
logger.info("Timeout error. Skipping this sample.")
return input_text, "Timeout", expected_output, 0.0, 0.0
except Exception as e:
logger.info(f"Maximum retries reached. Skipping this sample. Error: {e}")
return input_text, str(e), expected_output, 0.0, 0.0
def calculate_score(self, expected_output: str, prediction: str) -> Tuple[float, str]:
# The scoring logic for HumanEval is already implemented in evaluate_problem, this is just to conform to the interface
return 0.0, prediction
def get_result_columns(self) -> List[str]:
return ["inputs", "prediction", "expected_output", "score", "cost"]

@@ -0,0 +1,137 @@
import inspect
import re
from math import isclose
from typing import Any, Callable, List, Tuple
import regex
from sympy import N, simplify
from sympy.parsing.latex import parse_latex
from sympy.parsing.sympy_parser import parse_expr
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
from metagpt.logs import logger
class MATHBenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
def extract_model_answer(self, text: str) -> str:
pattern = r"\\boxed{((?:[^{}]|{[^{}]*})*)}"
boxed_matches = re.findall(pattern, text, re.DOTALL)
if boxed_matches:
return boxed_matches[-1].strip()
sentence_end_pattern = r"(?<!\d)[.!?]\s+"
sentences = re.split(sentence_end_pattern, text)
sentences = [s.strip() for s in sentences if s.strip()]
return sentences[-1] if sentences else ""
def calculate_score(self, expected_output: str, prediction: str) -> Tuple[int, str]:
expected_answer = self.extract_model_answer(expected_output)
predicted_answer = self.extract_model_answer(prediction)
if self.math_equal(predicted_answer, expected_answer):
return 1, predicted_answer
else:
return 0, predicted_answer
def math_equal(self, prediction: Any, reference: Any) -> bool:
if str(prediction) == str(reference):
return True
try:
if self.is_digit(prediction) and self.is_digit(reference):
prediction = self.parse_digits(prediction)
reference = self.parse_digits(reference)
return isclose(prediction, reference, abs_tol=1e-3)
except Exception:
pass
try:
return self.symbolic_equal(prediction, reference)
except Exception:
pass
return False
def is_digit(self, num):
return self.parse_digits(num) is not None
def parse_digits(self, num):
num = regex.sub(",", "", str(num))
try:
return float(num)
except Exception:
if num.endswith("%"):
num = num[:-1]
if num.endswith("\\"):
num = num[:-1]
try:
return float(num) / 100
except Exception:
pass
return None
def symbolic_equal(self, a, b):
def _parse(s):
for f in [parse_latex, parse_expr]:
try:
return f(s)
except Exception:
pass
return s
a = _parse(a)
b = _parse(b)
try:
if simplify(a - b) == 0:
return True
except Exception:
pass
try:
if isclose(N(a), N(b), abs_tol=1e-3):
return True
except Exception:
pass
return False
def get_function_code(self, func):
try:
source_code = inspect.getsource(func)
return source_code
except OSError:
return "no code"
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True)
async def _generate_output(self, graph, input_text):
return await graph(input_text)
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, int, float]:
input_text = problem["problem"]
expected_output = problem["solution"]
try:
output, cost = await self._generate_output(graph, input_text)
uni_score, extracted_output = self.calculate_score(expected_output, output)
if uni_score == 0:
self.log_mismatch(
input_text,
expected_output,
output,
extracted_output,
extract_answer_code=self.get_function_code(self.extract_model_answer),
)
return input_text, output, expected_output, uni_score, cost
except Exception as e:
logger.info(f"Maximum retries reached. Skipping this sample. Error: {e}")
return input_text, str(e), expected_output, 0.0, 0.0
def get_result_columns(self) -> List[str]:
return ["question", "prediction", "expected_output", "score", "cost"]
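The `\boxed{...}` extraction tolerates one level of nested braces; a standalone sketch of the same pattern (hypothetical name `extract_boxed`):

```python
import re

def extract_boxed(text: str) -> str:
    # same pattern as MATHBenchmark.extract_model_answer: take the last
    # \boxed{...} group, allowing one level of nested braces inside it
    pattern = r"\\boxed{((?:[^{}]|{[^{}]*})*)}"
    matches = re.findall(pattern, text, re.DOTALL)
    return matches[-1].strip() if matches else ""
```

One nesting level covers common LaTeX answers such as `\frac{1}{2}`; deeper nesting would need a recursive regex or a small parser.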

@@ -0,0 +1,121 @@
import threading
import time
from typing import Any, Callable, Dict, List, Optional, Tuple
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
from metagpt.logs import logger
from metagpt.utils.sanitize import sanitize
class MBPPBenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
class TimeoutError(Exception):
pass
def run_with_timeout(self, func, timeout):
result = []
stop_event = threading.Event()
def target():
try:
result.append(func())
except Exception as e:
result.append(e)
finally:
stop_event.set()
thread = threading.Thread(target=target)
thread.start()
is_timeout = not stop_event.wait(timeout)
if is_timeout:
raise self.TimeoutError("Function execution timed out")
if not result:
return None
if isinstance(result[0], Exception):
raise result[0]
return result[0]
def check_solution(self, solution, test, entry_point):
solution = sanitize(code=solution, entrypoint=entry_point)
try:
global_dict = {
"math": __import__("math"),
"hashlib": __import__("hashlib"),
"re": __import__("re"),
"List": List,
"Dict": Dict,
"Tuple": Tuple,
"Optional": Optional,
"Any": Any,
}
exec(solution, global_dict)
if entry_point not in global_dict:
raise ValueError(f"Function {entry_point} is not defined in the solution.")
exec(test, global_dict)
check = global_dict["check"]
result = self.run_with_timeout(check, 15)
if result is None:
result = (self.PASS, "The solution passed all test cases.")
except self.TimeoutError:
result = (
self.FAIL,
"Execution timed out. Please check if your solution contains infinite loops or overly time-consuming operations.",
)
except Exception as e:
error_message = f"Error: {str(e)}.\n Solution: {solution}.\n Test: {test}"
result = (self.FAIL, error_message)
with open("error.log", "a", encoding="utf-8") as log_file:
log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n")
return result
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True)
async def _generate_output(self, graph, prompt, entry_point):
return await graph(prompt, entry_point)
async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
input_text = data["prompt"]
expected_output = "\nCorrect Solution:\ndef " + data["code"]
try:
# Generate prediction using the graph function
prediction, cost = await self._generate_output(graph, input_text, data["entry_point"])
# Check the solution
ret = self.check_solution(prediction, data["test"], data["entry_point"])
test_case_details = ret[1]
expected_output = test_case_details + "\nCorrect Solution:" + data["code"]
# Calculate score based on the check result
score = 1.0 if ret[0] == self.PASS else 0.0
# Log mismatch if the score is 0
if score == 0:
self.log_mismatch(input_text, expected_output, prediction, score)
return input_text, prediction, expected_output, score, cost
except Exception as e:
logger.info(f"Maximum retries reached. Skipping this sample. Error: {e}")
return input_text, str(e), expected_output, 0.0, 0.0
def calculate_score(self, expected_output: str, prediction: str) -> Tuple[float, str]:
# The scoring logic for MBPP is already implemented in evaluate_problem, this is just to conform to the interface
return 0.0, prediction
def get_result_columns(self) -> List[str]:
return ["inputs", "prediction", "expected_output", "score", "cost"]

@@ -0,0 +1,67 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2024/7/24 16:37
@Author : didi
@File : utils.py
"""
import json
import os
import numpy as np
from metagpt.utils.common import read_json_file, write_json_file
def generate_random_indices(n, n_samples, test=False):
"""
Generate random indices
"""
def _set_seed(seed=42):
np.random.seed(seed)
_set_seed()
indices = np.arange(n)
np.random.shuffle(indices)
if test:
return indices[n_samples:]
else:
return indices[:n_samples]
def split_data_set(file_path, samples, test=False):
data = []
with open(file_path, "r") as file:
for line in file:
data.append(json.loads(line))
random_indices = generate_random_indices(len(data), samples, test)
data = [data[i] for i in random_indices]
return data
def log_mismatch(problem, expected_output, prediction, predicted_number, path):
log_data = {
"question": problem,
"right_answer": expected_output,
"model_output": prediction,
"extracted_output": predicted_number,
}
log_file = os.path.join(path, "log.json")
# Check if the log file already exists
if os.path.exists(log_file):
# If it exists, load the existing log data
data = read_json_file(log_file)
else:
# If it does not exist, create a new log list
data = []
# Add the new log entry
data.append(log_data)
# Write the data back to log.json file
write_json_file(log_file, data, encoding="utf-8", indent=4)
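Because `generate_random_indices` reseeds with a fixed value on every call, the validation slice and the test slice are reproducible and exactly complementary; a condensed sketch of the same split:

```python
import numpy as np

def generate_random_indices(n, n_samples, test=False):
    # fixed seed => the same permutation every call, so the first
    # n_samples indices (validation) and the rest (test) never overlap
    np.random.seed(42)
    indices = np.arange(n)
    np.random.shuffle(indices)
    return indices[n_samples:] if test else indices[:n_samples]
```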

@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
# @Date : 2024-10-20
# @Author : MoshiQAQ & didi
# @Desc : Download and extract dataset files
import os
import tarfile
from typing import Dict
import requests
from tqdm import tqdm
from metagpt.logs import logger
def download_file(url: str, filename: str) -> None:
"""Download a file from the given URL and show progress."""
response = requests.get(url, stream=True)
total_size = int(response.headers.get("content-length", 0))
block_size = 1024
progress_bar = tqdm(total=total_size, unit="iB", unit_scale=True)
with open(filename, "wb") as file:
for data in response.iter_content(block_size):
size = file.write(data)
progress_bar.update(size)
progress_bar.close()
def extract_tar_gz(filename: str, extract_path: str) -> None:
"""Extract a tar.gz file to the specified path."""
with tarfile.open(filename, "r:gz") as tar:
tar.extractall(path=extract_path)
def process_dataset(url: str, filename: str, extract_path: str) -> None:
"""Download, extract, and clean up a dataset."""
logger.info(f"Downloading {filename}...")
download_file(url, filename)
logger.info(f"Extracting {filename}...")
extract_tar_gz(filename, extract_path)
logger.info(f"{filename} download and extraction completed.")
os.remove(filename)
logger.info(f"Removed {filename}")
# Define the datasets to be downloaded
# Users can modify this list to choose which datasets to download
datasets_to_download: Dict[str, Dict[str, str]] = {
"datasets": {
"url": "https://drive.google.com/uc?export=download&id=1DNoegtZiUhWtvkd2xoIuElmIi4ah7k8e",
"filename": "aflow_data.tar.gz",
"extract_path": "metagpt/ext/aflow/data",
},
"results": {
"url": "https://drive.google.com/uc?export=download&id=1Sr5wjgKf3bN8OC7G6cO3ynzJqD4w6_Dv",
"filename": "result.tar.gz",
"extract_path": "metagpt/ext/aflow/data/results",
},
"initial_rounds": {
"url": "https://drive.google.com/uc?export=download&id=1UBoW4WBWjX2gs4I_jq3ALdXeLdwDJMdP",
"filename": "initial_rounds.tar.gz",
"extract_path": "metagpt/ext/aflow/scripts/optimized",
},
}
def download(required_datasets, if_first_download: bool = True):
"""Main function to process all selected datasets"""
if if_first_download:
for dataset_name in required_datasets:
dataset = datasets_to_download[dataset_name]
extract_path = dataset["extract_path"]
process_dataset(dataset["url"], dataset["filename"], extract_path)
else:
logger.info("Skip downloading datasets")

@@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
# @Date : 8/23/2024 10:00 AM
# @Author : all
# @Desc : Evaluation for different datasets
from typing import Dict, Literal, Tuple
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
from metagpt.ext.aflow.benchmark.drop import DROPBenchmark
from metagpt.ext.aflow.benchmark.gsm8k import GSM8KBenchmark
from metagpt.ext.aflow.benchmark.hotpotqa import HotpotQABenchmark
from metagpt.ext.aflow.benchmark.humaneval import HumanEvalBenchmark
from metagpt.ext.aflow.benchmark.math import MATHBenchmark
from metagpt.ext.aflow.benchmark.mbpp import MBPPBenchmark
# If you want to customize tasks, add task types here and provide evaluation functions, just like the ones given above
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
class Evaluator:
"""
Complete the evaluation for different datasets here
"""
def __init__(self, eval_path: str):
self.eval_path = eval_path
self.dataset_configs: Dict[DatasetType, BaseBenchmark] = {
"GSM8K": GSM8KBenchmark,
"MATH": MATHBenchmark,
"HumanEval": HumanEvalBenchmark,
"HotpotQA": HotpotQABenchmark,
"MBPP": MBPPBenchmark,
"DROP": DROPBenchmark,
}
async def graph_evaluate(
self, dataset: DatasetType, graph, params: dict, path: str, is_test: bool = False
) -> Tuple[float, float, float]:
if dataset not in self.dataset_configs:
raise ValueError(f"Unsupported dataset: {dataset}")
data_path = self._get_data_path(dataset, is_test)
benchmark_class = self.dataset_configs[dataset]
benchmark = benchmark_class(name=dataset, file_path=data_path, log_path=path)
# Use params to configure the graph and benchmark
configured_graph = await self._configure_graph(dataset, graph, params)
if is_test:
va_list = None # For test data, generally use None to test all
else:
va_list = None # Use None to test all Validation data, or set va_list (e.g., [1, 2, 3]) to use partial data
return await benchmark.run_evaluation(configured_graph, va_list)
async def _configure_graph(self, dataset, graph, params: dict):
# Here you can configure the graph based on params
# For example: set LLM configuration, dataset configuration, etc.
dataset_config = params.get("dataset", {})
llm_config = params.get("llm_config", {})
return graph(name=dataset, llm_config=llm_config, dataset=dataset_config)
def _get_data_path(self, dataset: DatasetType, test: bool) -> str:
base_path = f"metagpt/ext/aflow/data/{dataset.lower()}"
return f"{base_path}_test.jsonl" if test else f"{base_path}_validate.jsonl"

@@ -0,0 +1,360 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 17:36 PM
# @Author : didi
# @Desc : operator demo of aflow
import asyncio
import concurrent.futures
import random
import sys
import traceback
from collections import Counter
from typing import Dict, List, Tuple
from tenacity import retry, stop_after_attempt, wait_fixed
from metagpt.actions.action_node import ActionNode
from metagpt.ext.aflow.scripts.operator_an import (
AnswerGenerateOp,
CodeGenerateOp,
FormatOp,
GenerateOp,
MdEnsembleOp,
ReflectionTestOp,
ReviewOp,
ReviseOp,
ScEnsembleOp,
)
from metagpt.ext.aflow.scripts.prompts.prompt import (
ANSWER_GENERATION_PROMPT,
FORMAT_PROMPT,
MD_ENSEMBLE_PROMPT,
PYTHON_CODE_VERIFIER_PROMPT,
REFLECTION_ON_PUBLIC_TEST_PROMPT,
REVIEW_PROMPT,
REVISE_PROMPT,
SC_ENSEMBLE_PROMPT,
)
from metagpt.ext.aflow.scripts.utils import (
extract_test_cases_from_jsonl,
test_case_2_test_function,
)
from metagpt.llm import LLM
from metagpt.logs import logger
class Operator:
def __init__(self, llm: LLM, name: str):
self.name = name
self.llm = llm
def __call__(self, *args, **kwargs):
raise NotImplementedError
async def _fill_node(self, op_class, prompt, mode=None, **extra_kwargs):
fill_kwargs = {"context": prompt, "llm": self.llm}
if mode:
fill_kwargs["mode"] = mode
fill_kwargs.update(extra_kwargs)
node = await ActionNode.from_pydantic(op_class).fill(**fill_kwargs)
return node.instruct_content.model_dump()
class Custom(Operator):
def __init__(self, llm: LLM, name: str = "Custom"):
super().__init__(llm, name)
async def __call__(self, input, instruction):
prompt = instruction + input
response = await self._fill_node(GenerateOp, prompt, mode="single_fill")
return response
class AnswerGenerate(Operator):
def __init__(self, llm: LLM, name: str = "AnswerGenerate"):
super().__init__(llm, name)
async def __call__(self, input: str, mode: str = None) -> Tuple[str, str]:
prompt = ANSWER_GENERATION_PROMPT.format(input=input)
response = await self._fill_node(AnswerGenerateOp, prompt, mode="xml_fill")
return response
class CustomCodeGenerate(Operator):
def __init__(self, llm: LLM, name: str = "CustomCodeGenerate"):
super().__init__(llm, name)
async def __call__(self, problem, entry_point, instruction):
prompt = instruction + problem
response = await self._fill_node(GenerateOp, prompt, mode="code_fill", function_name=entry_point)
return response
class ScEnsemble(Operator):
"""
Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
Link: https://arxiv.org/abs/2203.11171
Paper: Universal Self-Consistency for Large Language Model Generation
Link: https://arxiv.org/abs/2311.17311
"""
def __init__(self, llm: LLM, name: str = "ScEnsemble"):
super().__init__(llm, name)
async def __call__(self, solutions: List[str], problem: str):
answer_mapping = {}
solution_text = ""
for index, solution in enumerate(solutions):
answer_mapping[chr(65 + index)] = index
solution_text += f"{chr(65 + index)}: \n{str(solution)}\n\n\n"
prompt = SC_ENSEMBLE_PROMPT.format(question=problem, solutions=solution_text)
response = await self._fill_node(ScEnsembleOp, prompt, mode="xml_fill")
answer = response.get("solution_letter", "")
answer = answer.strip().upper()
return {"response": solutions[answer_mapping[answer]]}
def run_code(code):
try:
# Create a new global namespace
global_namespace = {}
disallowed_imports = [
"os",
"sys",
"subprocess",
"multiprocessing",
"matplotlib",
"seaborn",
"plotly",
"bokeh",
"ggplot",
"pylab",
"tkinter",
"PyQt5",
"wx",
"pyglet",
]
# Check for prohibited imports
for lib in disallowed_imports:
if f"import {lib}" in code or f"from {lib}" in code:
logger.info(f"Detected prohibited import: {lib}")
return "Error", f"Prohibited import: {lib} and graphing functionalities"
# Use exec to execute the code
exec(code, global_namespace)
# Assume the code defines a function named 'solve'
if "solve" in global_namespace and callable(global_namespace["solve"]):
result = global_namespace["solve"]()
return "Success", str(result)
else:
return "Error", "Function 'solve' not found"
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
tb_str = traceback.format_exception(exc_type, exc_value, exc_traceback)
return "Error", f"Execution error: {str(e)}\n{''.join(tb_str)}"
class Programmer(Operator):
def __init__(self, llm: LLM, name: str = "Programmer"):
super().__init__(llm, name)
async def exec_code(self, code, timeout=30):
"""
Asynchronously execute code and return an error if timeout occurs.
"""
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
try:
# Submit run_code task to the process pool
future = loop.run_in_executor(executor, run_code, code)
# Wait for the task to complete or timeout
result = await asyncio.wait_for(future, timeout=timeout)
return result
except asyncio.TimeoutError:
# Timeout, attempt to shut down the process pool
executor.shutdown(wait=False, cancel_futures=True)
return "Error", "Code execution timed out"
except Exception as e:
return "Error", f"Unknown error: {str(e)}"
async def code_generate(self, problem, analysis, feedback, mode):
"""
Asynchronous method to generate code.
"""
prompt = PYTHON_CODE_VERIFIER_PROMPT.format(problem=problem, analysis=analysis, feedback=feedback)
response = await self._fill_node(CodeGenerateOp, prompt, mode, function_name="solve")
return response
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
async def __call__(self, problem: str, analysis: str = "None"):
"""
Call method, generate code and execute, retry up to 3 times.
"""
code = None
output = None
feedback = ""
for i in range(3):
code_response = await self.code_generate(problem, analysis, feedback, mode="code_fill")
code = code_response.get("code")
if not code:
return {"code": code, "output": "No code generated"}
status, output = await self.exec_code(code)
if status == "Success":
return {"code": code, "output": output}
else:
logger.info(f"Execution error on attempt {i + 1}, error message: {output}")
feedback = (
f"\nThe result of the error from the code you wrote in the previous round:\n"
f"Code: {code}\n\nStatus: {status}, {output}"
)
return {"code": code, "output": output}
class Test(Operator):
def __init__(self, llm: LLM, name: str = "Test"):
super().__init__(llm, name)
def exec_code(self, solution, entry_point):
test_cases = extract_test_cases_from_jsonl(entry_point)
fail_cases = []
for test_case in test_cases:
test_code = test_case_2_test_function(solution, test_case, entry_point)
try:
exec(test_code, globals())
except AssertionError as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
tb_str = traceback.format_exception(exc_type, exc_value, exc_traceback)
with open("tester.txt", "a") as f:
f.write("test_error of " + entry_point + "\n")
error_information = {
"test_fail_case": {
"test_case": test_case,
"error_type": "AssertionError",
"error_message": str(e),
"traceback": tb_str,
}
}
fail_cases.append(error_information)
except Exception as e:
with open("tester.txt", "a") as f:
f.write(entry_point + " " + str(e) + "\n")
return {"exec_fail_case": str(e)}
if fail_cases != []:
return fail_cases
else:
return "no error"
async def __call__(self, problem, solution, entry_point, test_loop: int = 3):
"""
"Test": {
"description": "Test the solution with test cases; if the solution is correct, return 'no error'; if it is incorrect, return a reflection on the solution and the error information",
"interface": "test(problem: str, solution: str, entry_point: str) -> str"
}
"""
for _ in range(test_loop):
result = self.exec_code(solution, entry_point)
if result == "no error":
return {"result": True, "solution": solution}
elif "exec_fail_case" in result:
result = result["exec_fail_case"]
prompt = REFLECTION_ON_PUBLIC_TEST_PROMPT.format(
problem=problem,
solution=solution,
exec_pass=f"executed unsuccessfully, error: \n {result}",
test_fail="executed unsuccessfully",
)
response = await self._fill_node(ReflectionTestOp, prompt, mode="code_fill")
solution = response["reflection_and_solution"]
else:
prompt = REFLECTION_ON_PUBLIC_TEST_PROMPT.format(
problem=problem,
solution=solution,
exec_pass="executed successfully",
test_fail=result,
)
response = await self._fill_node(ReflectionTestOp, prompt, mode="code_fill")
solution = response["reflection_and_solution"]
result = self.exec_code(solution, entry_point)
if result == "no error":
return {"result": True, "solution": solution}
else:
return {"result": False, "solution": solution}
class Format(Operator):
def __init__(self, llm: LLM, name: str = "Format"):
super().__init__(llm, name)
async def __call__(self, problem, solution, mode: str = None):
prompt = FORMAT_PROMPT.format(problem_description=problem, solution=solution)
response = await self._fill_node(FormatOp, prompt, mode)
return response
class Review(Operator):
def __init__(self, llm: LLM, name: str = "Review"):
super().__init__(llm, name)
async def __call__(self, problem, solution, mode: str = None):
prompt = REVIEW_PROMPT.format(problem=problem, solution=solution)
response = await self._fill_node(ReviewOp, prompt, mode="xml_fill")
return response
class Revise(Operator):
def __init__(self, llm: LLM, name: str = "Revise"):
super().__init__(llm, name)
async def __call__(self, problem, solution, feedback, mode: str = None):
prompt = REVISE_PROMPT.format(problem=problem, solution=solution, feedback=feedback)
response = await self._fill_node(ReviseOp, prompt, mode="xml_fill")
return response
class MdEnsemble(Operator):
"""
Paper: Can Generalist Foundation Models Outcompete Special-Purpose Tuning? Case Study in Medicine
Link: https://arxiv.org/abs/2311.16452
"""
def __init__(self, llm: LLM, name: str = "MdEnsemble", vote_count: int = 5):
super().__init__(llm, name)
self.vote_count = vote_count
@staticmethod
def shuffle_answers(solutions: List[str]) -> Tuple[List[str], Dict[str, str]]:
shuffled_solutions = solutions.copy()
random.shuffle(shuffled_solutions)
answer_mapping = {chr(65 + i): solutions.index(solution) for i, solution in enumerate(shuffled_solutions)}
return shuffled_solutions, answer_mapping
async def __call__(self, solutions: List[str], problem: str, mode: str = None):
logger.info(f"solution count: {len(solutions)}")
all_responses = []
for _ in range(self.vote_count):
shuffled_solutions, answer_mapping = self.shuffle_answers(solutions)
solution_text = ""
for index, solution in enumerate(shuffled_solutions):
solution_text += f"{chr(65 + index)}: \n{str(solution)}\n\n\n"
prompt = MD_ENSEMBLE_PROMPT.format(solutions=solution_text, question=problem)
response = await self._fill_node(MdEnsembleOp, prompt, mode="xml_fill")
answer = response.get("solution_letter", "A")
answer = answer.strip().upper()
if answer in answer_mapping:
original_index = answer_mapping[answer]
all_responses.append(original_index)
most_frequent_index = Counter(all_responses).most_common(1)[0][0]
final_answer = solutions[most_frequent_index]
return {"solution": final_answer}
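The MdEnsemble flow above can be sketched in a few lines: shuffle the candidate solutions (to remove positional bias), collect one letter-vote per round, map letters back to original indices, and take the majority. The `mock_judge` logic below (always "preferring" the longest candidate) is purely illustrative and stands in for the LLM call:

```python
# Minimal sketch of MdEnsemble-style majority voting with shuffled labels.
import random
from collections import Counter
from typing import Dict, List, Tuple


def shuffle_answers(solutions: List[str]) -> Tuple[List[str], Dict[str, int]]:
    shuffled = solutions.copy()
    random.shuffle(shuffled)
    # Letter -> index of that solution in the ORIGINAL list
    mapping = {chr(65 + i): solutions.index(s) for i, s in enumerate(shuffled)}
    return shuffled, mapping


def md_ensemble(solutions: List[str], vote_count: int = 5) -> str:
    votes = []
    for _ in range(vote_count):
        shuffled, mapping = shuffle_answers(solutions)
        # A real judge would be an LLM reading the shuffled solutions; here we
        # always "prefer" the longest one as a deterministic stand-in.
        best_letter = max(mapping, key=lambda letter: len(shuffled[ord(letter) - 65]))
        votes.append(mapping[best_letter])
    winner = Counter(votes).most_common(1)[0][0]
    return solutions[winner]


print(md_ensemble(["x = 1", "def solve():\n    return 42", "pass"]))
```

Shuffling before each vote means a positionally biased judge cannot consistently favor the same slot; the mapping back to original indices is what makes the votes comparable across rounds.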

View file

@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 19:46 PM
# @Author : didi
# @Desc : action nodes for operator
from pydantic import BaseModel, Field
class GenerateOp(BaseModel):
response: str = Field(default="", description="Your solution for this problem")
class CodeGenerateOp(BaseModel):
code: str = Field(default="", description="Your complete code solution for this problem")
class AnswerGenerateOp(BaseModel):
thought: str = Field(default="", description="The step by step thinking process")
answer: str = Field(default="", description="The final answer to the question")
class FormatOp(BaseModel):
solution: str = Field(default="", description="Your formatted answer for this problem")
class ScEnsembleOp(BaseModel):
thought: str = Field(default="", description="The thought of the most consistent solution.")
solution_letter: str = Field(default="", description="The letter of most consistent solution.")
class ReflectionTestOp(BaseModel):
reflection_and_solution: str = Field(
default="", description="Corrective solution for code execution errors or test case failures"
)
class MdEnsembleOp(BaseModel):
thought: str = Field(default="", description="Step-by-step analysis of the solutions to determine the best one.")
solution_letter: str = Field(default="", description="The letter of the chosen best solution (only one letter).")
class ReviewOp(BaseModel):
review_result: bool = Field(
default=False,
description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'",
)
feedback: str = Field(
default="",
description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.",
)
class ReviseOp(BaseModel):
solution: str = Field(default="", description="Based on the feedback, revised solution for this problem")

View file

@ -0,0 +1,199 @@
# -*- coding: utf-8 -*-
# @Date : 8/12/2024 22:00 PM
# @Author : issac
# @Desc : optimizer for graph
import asyncio
import time
from typing import List, Literal
from pydantic import BaseModel, Field
from metagpt.actions.action_node import ActionNode
from metagpt.ext.aflow.scripts.evaluator import DatasetType
from metagpt.ext.aflow.scripts.optimizer_utils.convergence_utils import ConvergenceUtils
from metagpt.ext.aflow.scripts.optimizer_utils.data_utils import DataUtils
from metagpt.ext.aflow.scripts.optimizer_utils.evaluation_utils import EvaluationUtils
from metagpt.ext.aflow.scripts.optimizer_utils.experience_utils import ExperienceUtils
from metagpt.ext.aflow.scripts.optimizer_utils.graph_utils import GraphUtils
from metagpt.logs import logger
from metagpt.provider.llm_provider_registry import create_llm_instance
QuestionType = Literal["math", "code", "qa"]
OptimizerType = Literal["Graph", "Test"]
class GraphOptimize(BaseModel):
modification: str = Field(default="", description="modification")
graph: str = Field(default="", description="graph")
prompt: str = Field(default="", description="prompt")
class Optimizer:
def __init__(
self,
dataset: DatasetType,
question_type: QuestionType,
opt_llm_config,
exec_llm_config,
operators: List,
sample: int,
check_convergence: bool = False,
optimized_path: str = None,
initial_round: int = 1,
max_rounds: int = 20,
validation_rounds: int = 5,
) -> None:
self.optimize_llm_config = opt_llm_config
self.optimize_llm = create_llm_instance(self.optimize_llm_config)
self.execute_llm_config = exec_llm_config
self.dataset = dataset
self.type = question_type
self.check_convergence = check_convergence
self.graph = None
self.operators = operators
self.root_path = f"{optimized_path}/{self.dataset}"
self.sample = sample
self.top_scores = []
self.round = initial_round
self.max_rounds = max_rounds
self.validation_rounds = validation_rounds
self.graph_utils = GraphUtils(self.root_path)
self.data_utils = DataUtils(self.root_path)
self.experience_utils = ExperienceUtils(self.root_path)
self.evaluation_utils = EvaluationUtils(self.root_path)
self.convergence_utils = ConvergenceUtils(self.root_path)
def optimize(self, mode: OptimizerType = "Graph"):
if mode == "Test":
test_n = 3  # number of evaluation runs on the validation set
for i in range(test_n):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
score = loop.run_until_complete(self.test())
return None
for opt_round in range(self.max_rounds):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
retry_count = 0
max_retries = 1
while retry_count < max_retries:
try:
score = loop.run_until_complete(self._optimize_graph())
break
except Exception as e:
retry_count += 1
logger.info(f"Error occurred: {e}. Retrying... (Attempt {retry_count}/{max_retries})")
if retry_count == max_retries:
logger.info("Max retries reached. Moving to next round.")
score = None
wait_time = 5 * retry_count
time.sleep(wait_time)
if retry_count < max_retries:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.round += 1
logger.info(f"Score for round {self.round}: {score}")
converged, convergence_round, final_round = self.convergence_utils.check_convergence(top_k=3)
if converged and self.check_convergence:
logger.info(
f"Convergence detected, occurred in round {convergence_round}, final round is {final_round}"
)
# Print average scores and standard deviations for each round
self.convergence_utils.print_results()
break
time.sleep(5)
async def _optimize_graph(self):
validation_n = self.validation_rounds  # number of evaluation runs on the validation set
graph_path = f"{self.root_path}/workflows"
data = self.data_utils.load_results(graph_path)
if self.round == 1:
directory = self.graph_utils.create_round_directory(graph_path, self.round)
# Load graph using graph_utils
self.graph = self.graph_utils.load_graph(self.round, graph_path)
avg_score = await self.evaluation_utils.evaluate_graph(self, directory, validation_n, data, initial=True)
# Create a loop until the generated graph meets the check conditions
while True:
directory = self.graph_utils.create_round_directory(graph_path, self.round + 1)
top_rounds = self.data_utils.get_top_rounds(self.sample)
sample = self.data_utils.select_round(top_rounds)
prompt, graph_load = self.graph_utils.read_graph_files(sample["round"], graph_path)
graph = self.graph_utils.extract_solve_graph(graph_load)
processed_experience = self.experience_utils.load_experience()
experience = self.experience_utils.format_experience(processed_experience, sample["round"])
operator_description = self.graph_utils.load_operators_description(self.operators)
log_data = self.data_utils.load_log(sample["round"])
graph_optimize_prompt = self.graph_utils.create_graph_optimize_prompt(
experience, sample["score"], graph[0], prompt, operator_description, self.type, log_data
)
graph_optimize_node = await ActionNode.from_pydantic(GraphOptimize).fill(
context=graph_optimize_prompt, mode="xml_fill", llm=self.optimize_llm
)
response = await self.graph_utils.get_graph_optimize_response(graph_optimize_node)
# Check if the modification meets the conditions
check = self.experience_utils.check_modification(
processed_experience, response["modification"], sample["round"]
)
# If `check` is True, break the loop; otherwise, regenerate the graph
if check:
break
# Save the graph and evaluate
self.graph_utils.write_graph_files(directory, response, self.round + 1, self.dataset)
experience = self.experience_utils.create_experience_data(sample, response["modification"])
self.graph = self.graph_utils.load_graph(self.round + 1, graph_path)
logger.info(directory)
avg_score = await self.evaluation_utils.evaluate_graph(self, directory, validation_n, data, initial=False)
self.experience_utils.update_experience(directory, experience, avg_score)
return avg_score
async def test(self):
rounds = [5] # You can choose the rounds you want to test here.
data = []
graph_path = f"{self.root_path}/workflows_test"
json_file_path = self.data_utils.get_results_file_path(graph_path)
data = self.data_utils.load_results(graph_path)
for round in rounds:
directory = self.graph_utils.create_round_directory(graph_path, round)
self.graph = self.graph_utils.load_graph(round, graph_path)
score, avg_cost, total_cost = await self.evaluation_utils.evaluate_graph_test(self, directory, is_test=True)
new_data = self.data_utils.create_result_data(round, score, avg_cost, total_cost)
data.append(new_data)
self.data_utils.save_results(json_file_path, data)

View file

@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-
# @Date : 9/23/2024 10:00 AM
# @Author : Issac
# @Desc :
import json
import os
import numpy as np
from metagpt.logs import logger
class ConvergenceUtils:
def __init__(self, root_path):
self.root_path = root_path
self.data = None
self.rounds = None
self.avg_scores, self.stds = None, None
def load_data(self, root_path):
"""
Read JSON file, create a new file if it doesn't exist, then return the data.
"""
rounds_dir = os.path.join(root_path, "workflows")
result_file = os.path.join(rounds_dir, "results.json")
# Ensure directory exists
os.makedirs(rounds_dir, exist_ok=True)
# If file doesn't exist, create a new one with an empty list
if not os.path.exists(result_file):
with open(result_file, "w") as file:
json.dump([], file)
# Read file and return data
with open(result_file, "r") as file:
return json.load(file)
def process_rounds(self):
"""
Organize data by round, return a dictionary of scores by round.
"""
self.data = self.load_data(root_path=self.root_path)
rounds = {}
for entry in self.data:
round_number = entry["round"]
score = entry["score"]
if round_number not in rounds:
rounds[round_number] = []
rounds[round_number].append(score)
return rounds
def calculate_avg_and_std(self):
"""
Calculate average score and standard deviation for each round, return two lists: average scores and standard deviations.
"""
self.rounds = self.process_rounds()
sorted_rounds = sorted(self.rounds.items(), key=lambda x: x[0])
avg_scores = []
stds = []
for round_number, scores in sorted_rounds:
avg_scores.append(np.mean(scores))
stds.append(np.std(scores))
return avg_scores, stds
def check_convergence(self, top_k=3, z=0, consecutive_rounds=5):
"""
Check for convergence. z is the z-score corresponding to the confidence level.
consecutive_rounds is the number of consecutive rounds that must meet the stop condition.
"""
# Calculate average score and standard deviation for each round
self.avg_scores, self.stds = self.calculate_avg_and_std()
# If total rounds are not enough to calculate top_k+1 rounds, return not converged
if len(self.avg_scores) < top_k + 1:
return False, None, None
convergence_count = 0 # Convergence counter
previous_y = None # Y value of the previous round (average of top_k scores)
sigma_y_previous = None # Standard error of Y value from previous round
for i in range(len(self.avg_scores)):
# Dynamically select top_k from current round and all previous rounds
top_k_indices = np.argsort(self.avg_scores[: i + 1])[::-1][
:top_k
] # Select top k indices by descending average score
top_k_scores = [self.avg_scores[j] for j in top_k_indices] # Get list of top k scores
top_k_stds = [
self.stds[j] for j in top_k_indices
] # Get list of standard deviations corresponding to top k scores
# Calculate mean of top k scores for current round, i.e., y_current
y_current = np.mean(top_k_scores)
# Calculate standard error of y_current (sigma_y_current), representing score dispersion
sigma_y_current = np.sqrt(np.sum([s**2 for s in top_k_stds]) / (top_k**2))
# If not the first round, calculate change in Y (Delta_Y) and corresponding standard error
if previous_y is not None:
# Calculate Y difference between current round and previous round
delta_y = y_current - previous_y
# Calculate standard error of Y difference (sigma_Delta_Y)
sigma_delta_y = np.sqrt(sigma_y_current**2 + sigma_y_previous**2)
# Check if Y change is within acceptable confidence interval, i.e., convergence condition
if abs(delta_y) <= z * sigma_delta_y:
convergence_count += 1
# If consecutive converged rounds reach set value, return convergence information
if convergence_count >= consecutive_rounds:
return True, i - consecutive_rounds + 1, i
else:
# If change is large, reset convergence counter
convergence_count = 0
# Update Y value and standard error for previous round
previous_y = y_current
sigma_y_previous = sigma_y_current
# If convergence condition not met, return not converged
return False, None, None
def print_results(self):
"""
Print average score and standard deviation for all rounds.
"""
self.avg_scores, self.stds = self.calculate_avg_and_std()
for i, (avg_score, std) in enumerate(zip(self.avg_scores, self.stds), 1):
logger.info(f"Round {i}: Average Score = {avg_score:.4f}, Standard Deviation = {std:.4f}")
if __name__ == "__main__":
# Example usage of this class; top_k can be tuned in check_convergence (e.g., top_k=5)
checker = ConvergenceUtils("path")
converged, convergence_round, final_round = checker.check_convergence()
if converged:
logger.info(f"Convergence detected, occurred at round {convergence_round}, final round is {final_round}")
else:
logger.info("No convergence detected within all rounds")
# Print average score and standard deviation for each round
checker.print_results()
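The statistical stop criterion above can be exercised standalone on synthetic data: track the mean of the top-k round scores (y) and its standard error, and declare convergence once |Δy| ≤ z·σ_Δy holds for several consecutive rounds. This is a simplified sketch (shorter `consecutive_rounds`, synthetic scores), not the class itself:

```python
# Standalone sketch of the z-score convergence test used by ConvergenceUtils.
import numpy as np


def check_convergence(avg_scores, stds, top_k=3, z=0.0, consecutive_rounds=2):
    count, prev_y, prev_sigma = 0, None, None
    for i in range(len(avg_scores)):
        # Top-k rounds among all rounds seen so far, by average score
        idx = np.argsort(avg_scores[: i + 1])[::-1][:top_k]
        y = np.mean([avg_scores[j] for j in idx])
        sigma = np.sqrt(sum(stds[j] ** 2 for j in idx) / top_k**2)
        if prev_y is not None:
            delta = y - prev_y
            if abs(delta) <= z * np.sqrt(sigma**2 + prev_sigma**2):
                count += 1
                if count >= consecutive_rounds:
                    return True, i - consecutive_rounds + 1, i
            else:
                count = 0  # large change resets the streak
        prev_y, prev_sigma = y, sigma
    return False, None, None


# Scores plateau from round 3 on, so the top-k mean eventually stops moving.
scores = [0.60, 0.70, 0.80, 0.80, 0.80, 0.80, 0.80]
stds = [0.01] * len(scores)
print(check_convergence(scores, stds))
```

With z = 0 the criterion degenerates to "the top-k mean is exactly unchanged"; a positive z widens the tolerance band in proportion to the combined standard error of the two means.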

View file

@ -0,0 +1,145 @@
import datetime
import json
import os
import random
import numpy as np
import pandas as pd
from metagpt.logs import logger
from metagpt.utils.common import read_json_file, write_json_file
class DataUtils:
def __init__(self, root_path: str):
self.root_path = root_path
self.top_scores = []
def load_results(self, path: str) -> list:
result_path = os.path.join(path, "results.json")
if os.path.exists(result_path):
return read_json_file(result_path, encoding="utf-8")
return []
def get_top_rounds(self, sample: int, path=None, mode="Graph"):
self._load_scores(path, mode)
unique_rounds = set()
unique_top_scores = []
first_round = next((item for item in self.top_scores if item["round"] == 1), None)
if first_round:
unique_top_scores.append(first_round)
unique_rounds.add(1)
for item in self.top_scores:
if item["round"] not in unique_rounds:
unique_top_scores.append(item)
unique_rounds.add(item["round"])
if len(unique_top_scores) >= sample:
break
return unique_top_scores
def select_round(self, items):
if not items:
raise ValueError("Item list is empty.")
sorted_items = sorted(items, key=lambda x: x["score"], reverse=True)
scores = [item["score"] * 100 for item in sorted_items]
probabilities = self._compute_probabilities(scores)
logger.info(f"\nMixed probability distribution: {probabilities}")
logger.info(f"\nSorted rounds: {sorted_items}")
selected_index = np.random.choice(len(sorted_items), p=probabilities)
logger.info(f"\nSelected index: {selected_index}, Selected item: {sorted_items[selected_index]}")
return sorted_items[selected_index]
def _compute_probabilities(self, scores, alpha=0.2, lambda_=0.3):
scores = np.array(scores, dtype=np.float64)
n = len(scores)
if n == 0:
raise ValueError("Score list is empty.")
uniform_prob = np.full(n, 1.0 / n, dtype=np.float64)
max_score = np.max(scores)
shifted_scores = scores - max_score
exp_weights = np.exp(alpha * shifted_scores)
sum_exp_weights = np.sum(exp_weights)
if sum_exp_weights == 0:
raise ValueError("Sum of exponential weights is 0, cannot normalize.")
score_prob = exp_weights / sum_exp_weights
mixed_prob = lambda_ * uniform_prob + (1 - lambda_) * score_prob
total_prob = np.sum(mixed_prob)
if not np.isclose(total_prob, 1.0):
mixed_prob = mixed_prob / total_prob
return mixed_prob
def load_log(self, cur_round, path=None, mode: str = "Graph"):
if mode == "Graph":
log_dir = os.path.join(self.root_path, "workflows", f"round_{cur_round}", "log.json")
else:
log_dir = path
# Check whether the log file exists
if not os.path.exists(log_dir):
return ""  # Return an empty string if the file does not exist
logger.info(log_dir)
data = read_json_file(log_dir, encoding="utf-8")
if isinstance(data, dict):
data = [data]
elif not isinstance(data, list):
data = list(data)
if not data:
return ""
sample_size = min(3, len(data))
random_samples = random.sample(data, sample_size)
log = ""
for sample in random_samples:
log += json.dumps(sample, indent=4, ensure_ascii=False) + "\n\n"
return log
def get_results_file_path(self, graph_path: str) -> str:
return os.path.join(graph_path, "results.json")
def create_result_data(self, round: int, score: float, avg_cost: float, total_cost: float) -> dict:
now = datetime.datetime.now()
return {"round": round, "score": score, "avg_cost": avg_cost, "total_cost": total_cost, "time": now}
def save_results(self, json_file_path: str, data: list):
write_json_file(json_file_path, data, encoding="utf-8", indent=4)
def _load_scores(self, path=None, mode="Graph"):
if mode == "Graph":
rounds_dir = os.path.join(self.root_path, "workflows")
else:
rounds_dir = path
result_file = os.path.join(rounds_dir, "results.json")
self.top_scores = []
data = read_json_file(result_file, encoding="utf-8")
df = pd.DataFrame(data)
scores_per_round = df.groupby("round")["score"].mean().to_dict()
for round_number, average_score in scores_per_round.items():
self.top_scores.append({"round": round_number, "score": average_score})
self.top_scores.sort(key=lambda x: x["score"], reverse=True)
return self.top_scores
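The sampling scheme in `_compute_probabilities` blends a softmax over (max-shifted) scores with a uniform distribution, so weak rounds keep a floor probability instead of being starved. A self-contained sketch with the same default `alpha`/`lambda_` values:

```python
# Sketch of the mixed uniform + softmax sampling used to pick a parent round.
import numpy as np


def compute_probabilities(scores, alpha=0.2, lambda_=0.3):
    scores = np.array(scores, dtype=np.float64)
    n = len(scores)
    uniform = np.full(n, 1.0 / n)
    exp_w = np.exp(alpha * (scores - scores.max()))  # shift for numerical stability
    softmax = exp_w / exp_w.sum()
    mixed = lambda_ * uniform + (1 - lambda_) * softmax
    return mixed / mixed.sum()


probs = compute_probabilities([90.0, 80.0, 50.0])
print(probs.round(3))  # highest score gets the largest share
```

`lambda_` controls exploration: at `lambda_=1` selection is uniform, at `lambda_=0` it is a pure softmax over scores scaled by `alpha`.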

View file

@ -0,0 +1,63 @@
from metagpt.ext.aflow.scripts.evaluator import Evaluator
class EvaluationUtils:
def __init__(self, root_path: str):
self.root_path = root_path
async def evaluate_initial_round(self, optimizer, graph_path, directory, validation_n, data):
# Use the optimizer's graph_utils to load the graph
optimizer.graph = optimizer.graph_utils.load_graph(optimizer.round, graph_path)
evaluator = Evaluator(eval_path=directory)
for i in range(validation_n):
score, avg_cost, total_cost = await evaluator.graph_evaluate(
optimizer.dataset,
optimizer.graph,
{"dataset": optimizer.dataset, "llm_config": optimizer.execute_llm_config},
directory,
is_test=False,
)
new_data = optimizer.data_utils.create_result_data(optimizer.round, score, avg_cost, total_cost)
data.append(new_data)
result_path = optimizer.data_utils.get_results_file_path(graph_path)
optimizer.data_utils.save_results(result_path, data)
return data
async def evaluate_graph(self, optimizer, directory, validation_n, data, initial=False):
evaluator = Evaluator(eval_path=directory)
sum_score = 0
for i in range(validation_n):
score, avg_cost, total_cost = await evaluator.graph_evaluate(
optimizer.dataset,
optimizer.graph,
{"dataset": optimizer.dataset, "llm_config": optimizer.execute_llm_config},
directory,
is_test=False,
)
cur_round = optimizer.round + 1 if initial is False else optimizer.round
new_data = optimizer.data_utils.create_result_data(cur_round, score, avg_cost, total_cost)
data.append(new_data)
result_path = optimizer.data_utils.get_results_file_path(f"{optimizer.root_path}/workflows")
optimizer.data_utils.save_results(result_path, data)
sum_score += score
return sum_score / validation_n
async def evaluate_graph_test(self, optimizer, directory, is_test=True):
evaluator = Evaluator(eval_path=directory)
return await evaluator.graph_evaluate(
optimizer.dataset,
optimizer.graph,
{"dataset": optimizer.dataset, "llm_config": optimizer.execute_llm_config},
directory,
is_test=is_test,
)

View file

@ -0,0 +1,96 @@
import json
import os
from collections import defaultdict
from metagpt.logs import logger
from metagpt.utils.common import read_json_file, write_json_file
class ExperienceUtils:
def __init__(self, root_path: str):
self.root_path = root_path
def load_experience(self, path=None, mode: str = "Graph"):
if mode == "Graph":
rounds_dir = os.path.join(self.root_path, "workflows")
else:
rounds_dir = path
experience_data = defaultdict(lambda: {"score": None, "success": {}, "failure": {}})
for round_dir in os.listdir(rounds_dir):
if os.path.isdir(os.path.join(rounds_dir, round_dir)) and round_dir.startswith("round_"):
round_path = os.path.join(rounds_dir, round_dir)
try:
round_number = int(round_dir.split("_")[1])
json_file_path = os.path.join(round_path, "experience.json")
if os.path.exists(json_file_path):
data = read_json_file(json_file_path, encoding="utf-8")
father_node = data["father node"]
if experience_data[father_node]["score"] is None:
experience_data[father_node]["score"] = data["before"]
if data["succeed"]:
experience_data[father_node]["success"][round_number] = {
"modification": data["modification"],
"score": data["after"],
}
else:
experience_data[father_node]["failure"][round_number] = {
"modification": data["modification"],
"score": data["after"],
}
except Exception as e:
logger.info(f"Error processing {round_dir}: {str(e)}")
experience_data = dict(experience_data)
output_path = os.path.join(rounds_dir, "processed_experience.json")
with open(output_path, "w", encoding="utf-8") as outfile:
json.dump(experience_data, outfile, indent=4, ensure_ascii=False)
logger.info(f"Processed experience data saved to {output_path}")
return experience_data
def format_experience(self, processed_experience, sample_round):
experience_data = processed_experience.get(sample_round)
if experience_data:
experience = f"Original Score: {experience_data['score']}\n"
experience += "These are some conclusions drawn from experience:\n\n"
for key, value in experience_data["failure"].items():
experience += f"-Absolutely prohibit {value['modification']} (Score: {value['score']})\n"
for key, value in experience_data["success"].items():
experience += f"-Absolutely prohibit {value['modification']} \n"
experience += "\n\nNote: Take into account past failures and avoid repeating the same mistakes, as these failures indicate that these approaches are ineffective. You must fundamentally change your way of thinking, rather than simply using more advanced Python syntax like for, if, else, etc., or modifying the prompt."
else:
experience = f"No experience data found for round {sample_round}."
return experience
def check_modification(self, processed_experience, modification, sample_round):
experience_data = processed_experience.get(sample_round)
if experience_data:
for key, value in experience_data["failure"].items():
if value["modification"] == modification:
return False
for key, value in experience_data["success"].items():
if value["modification"] == modification:
return False
return True
else:
return True  # Also return True when experience_data is empty
def create_experience_data(self, sample, modification):
return {
"father node": sample["round"],
"modification": modification,
"before": sample["score"],
"after": None,
"succeed": None,
}
def update_experience(self, directory, experience, avg_score):
experience["after"] = avg_score
experience["succeed"] = bool(avg_score > experience["before"])
write_json_file(os.path.join(directory, "experience.json"), experience, encoding="utf-8", indent=4)
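The experience record managed above has a two-phase lifecycle: it is created with `after`/`succeed` unset when a modification is proposed, then filled in once the new round has been scored. A minimal sketch using the same field names as the JSON written by `ExperienceUtils` (file I/O omitted):

```python
# Sketch of the experience record lifecycle: create, then score and update.
def create_experience_data(sample: dict, modification: str) -> dict:
    return {
        "father node": sample["round"],
        "modification": modification,
        "before": sample["score"],
        "after": None,     # filled in after evaluation
        "succeed": None,   # filled in after evaluation
    }


def update_experience(experience: dict, avg_score: float) -> dict:
    experience["after"] = avg_score
    experience["succeed"] = bool(avg_score > experience["before"])
    return experience


exp = create_experience_data({"round": 3, "score": 0.71}, "add a review step")
update_experience(exp, 0.76)
print(exp["succeed"])  # True: the modification improved the score
```

These records are what `load_experience` later aggregates into per-parent success/failure lists, which in turn feed the "avoid repeating failed modifications" section of the optimization prompt.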

View file

@ -0,0 +1,125 @@
import json
import os
import re
import time
import traceback
from typing import List
from metagpt.ext.aflow.scripts.prompts.optimize_prompt import (
WORKFLOW_CUSTOM_USE,
WORKFLOW_INPUT,
WORKFLOW_OPTIMIZE_PROMPT,
WORKFLOW_TEMPLATE,
)
from metagpt.logs import logger
class GraphUtils:
def __init__(self, root_path: str):
self.root_path = root_path
def create_round_directory(self, graph_path: str, round_number: int) -> str:
directory = os.path.join(graph_path, f"round_{round_number}")
os.makedirs(directory, exist_ok=True)
return directory
def load_graph(self, round_number: int, workflows_path: str):
workflows_path = workflows_path.replace("\\", ".").replace("/", ".")
graph_module_name = f"{workflows_path}.round_{round_number}.graph"
try:
graph_module = __import__(graph_module_name, fromlist=[""])
graph_class = getattr(graph_module, "Workflow")
return graph_class
except ImportError as e:
logger.info(f"Error loading graph for round {round_number}: {e}")
raise
def read_graph_files(self, round_number: int, workflows_path: str):
prompt_file_path = os.path.join(workflows_path, f"round_{round_number}", "prompt.py")
graph_file_path = os.path.join(workflows_path, f"round_{round_number}", "graph.py")
try:
with open(prompt_file_path, "r", encoding="utf-8") as file:
prompt_content = file.read()
with open(graph_file_path, "r", encoding="utf-8") as file:
graph_content = file.read()
except FileNotFoundError as e:
logger.info(f"Error: File not found for round {round_number}: {e}")
raise
except Exception as e:
logger.info(f"Error loading prompt for round {round_number}: {e}")
raise
return prompt_content, graph_content
def extract_solve_graph(self, graph_load: str) -> List[str]:
pattern = r"class Workflow:.+"
return re.findall(pattern, graph_load, re.DOTALL)
def load_operators_description(self, operators: List[str]) -> str:
path = f"{self.root_path}/workflows/template/operator.json"
operators_description = ""
for id, operator in enumerate(operators):
operator_description = self._load_operator_description(id + 1, operator, path)
operators_description += f"{operator_description}\n"
return operators_description
def _load_operator_description(self, id: int, operator_name: str, file_path: str) -> str:
with open(file_path, "r") as f:
operator_data = json.load(f)
matched_data = operator_data[operator_name]
desc = matched_data["description"]
interface = matched_data["interface"]
return f"{id}. {operator_name}: {desc}, with interface {interface}."
def create_graph_optimize_prompt(
self,
experience: str,
score: float,
graph: str,
prompt: str,
operator_description: str,
type: str,
log_data: str,
) -> str:
graph_input = WORKFLOW_INPUT.format(
experience=experience,
score=score,
graph=graph,
prompt=prompt,
operator_description=operator_description,
type=type,
log=log_data,
)
graph_system = WORKFLOW_OPTIMIZE_PROMPT.format(type=type)
return graph_input + WORKFLOW_CUSTOM_USE + graph_system
async def get_graph_optimize_response(self, graph_optimize_node):
max_retries = 5
retries = 0
while retries < max_retries:
try:
response = graph_optimize_node.instruct_content.model_dump()
return response
except Exception as e:
retries += 1
logger.info(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
if retries == max_retries:
logger.info("Maximum retries reached. Skipping this sample.")
break
traceback.print_exc()
time.sleep(5)
return None
def write_graph_files(self, directory: str, response: dict, round_number: int, dataset: str):
graph = WORKFLOW_TEMPLATE.format(graph=response["graph"], round=round_number, dataset=dataset)
with open(os.path.join(directory, "graph.py"), "w", encoding="utf-8") as file:
file.write(graph)
with open(os.path.join(directory, "prompt.py"), "w", encoding="utf-8") as file:
file.write(response["prompt"])
with open(os.path.join(directory, "__init__.py"), "w", encoding="utf-8") as file:
file.write("")

View file

@ -0,0 +1,59 @@
WORKFLOW_OPTIMIZE_PROMPT = """You are building a Graph and corresponding Prompt to jointly solve {type} problems.
Referring to the given graph and prompt, which forms a basic example of a {type} solution approach,
please reconstruct and optimize them. You can add, modify, or delete nodes, parameters, or prompts. Include your
single modification in XML tags in your reply. Ensure they are complete and correct to avoid runtime failures. When
optimizing, you can incorporate critical thinking methods like review, revise, ensemble (generating multiple answers through different/similar prompts, then voting/integrating/checking the majority to obtain a final answer), selfAsk, etc. Consider
Python's loops (for, while, list comprehensions), conditional statements (if-elif-else, ternary operators),
or machine learning techniques (e.g., linear regression, decision trees, neural networks, clustering). The graph
complexity should not exceed 10. Use logic and control flow (IF-ELSE, loops) for a richer graphical
representation. Ensure that all the prompts required by the current graph from prompt_custom are included. Exclude any other prompts.
Output the modified graph and all the necessary Prompts in prompt_custom (if needed).
The prompt you need to generate is only the one used in `prompt_custom.XXX` within Custom. Other methods already have built-in prompts and are prohibited from being generated. Only generate those needed for use in `prompt_custom`; please remove any unused prompts in prompt_custom.
The generated prompt must not contain any placeholders.
Considering information loss, complex graphs may yield better results, but insufficient information transmission between steps can lose the solution. It's crucial to include necessary context during the process."""
WORKFLOW_INPUT = """
Here is a graph and the corresponding prompt (prompt only related to the custom method) that performed excellently in a previous iteration (maximum score is 1). You must make further optimizations and improvements based on this graph. The modified graph must differ from the provided example, and the specific differences should be noted within the <modification>xxx</modification> section.\n
<sample>
<experience>{experience}</experience>
<modification>(e.g., add a review step / delete an operator / modify a prompt)</modification>
<score>{score}</score>
<graph>{graph}</graph>
<prompt>{prompt}</prompt>(only prompt_custom)
<operator_description>{operator_description}</operator_description>
</sample>
Below are the logs of some results with the aforementioned Graph that performed well but encountered errors, which can be used as references for optimization:
{log}
First, provide optimization ideas. **Only one detail point can be modified at a time**, and no more than 5 lines of code may be changed per modification; extensive modifications are strictly prohibited to maintain project focus!
When introducing new functionalities in the graph, please make sure to import the necessary libraries or modules yourself, except for operator, prompt_custom, create_llm_instance, and CostManage, which have already been automatically imported.
**Under no circumstances should Graph output None for any field.**
Use Custom methods to constrain your output format rather than relying on code to do so (outside the workflow, the system extracts answers according to fixed rules and scores them).
It is very important to format the graph's output answers; you can refer to the standard answer format in the log.
"""
WORKFLOW_CUSTOM_USE = """\nHere's an example of using the `custom` method in graph:
```
# You can write your own prompt in <prompt>prompt_custom</prompt> and then use it in the Custom method in the graph
response = await self.custom(input=problem, instruction=prompt_custom.XXX_PROMPT)
# You can also concatenate previously generated string results in the input to provide more comprehensive contextual information.
# response = await self.custom(input=problem+f"xxx:{xxx}, xxx:{xxx}", instruction=prompt_custom.XXX_PROMPT)
# The output from the Custom method can be placed anywhere you need it, as shown in the example below
solution = await self.generate(problem=f"question:{problem}, xxx:{response['response']}")
```
Note: In Custom, the instruction and input are directly concatenated (instruction + input), and placeholders are not supported. Please ensure you add comments and handle the concatenation externally.\n
**Introducing multiple operators at appropriate points can enhance performance. If you find that some provided operators are not yet used in the graph, try incorporating them.**
"""
WORKFLOW_TEMPLATE = """from typing import Literal
import metagpt.ext.aflow.scripts.optimized.{dataset}.workflows.template.operator as operator
import metagpt.ext.aflow.scripts.optimized.{dataset}.workflows.round_{round}.prompt as prompt_custom
from metagpt.provider.llm_provider_registry import create_llm_instance
from metagpt.utils.cost_manager import CostManager
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
{graph}
"""


@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
# @Date : 6/26/2024 17:07 PM
# @Author : didi
# @Desc : prompts of operators
ANSWER_GENERATION_PROMPT = """
Think step by step and solve the problem.
1. In the "thought" field, explain your thinking process in detail.
2. In the "answer" field, provide the final answer concisely and clearly. The answer should be a direct response to the question, without including explanations or reasoning.
Your task: {input}
"""
FORMAT_PROMPT = """
For the question described as {problem_description},
please extract a short and concise answer that contains only one word or a few words from the following solution: {solution}.
Make sure there are no additional comments or explanations in your response.
"""
SC_ENSEMBLE_PROMPT = """
Given the question described as follows: {question}
Several solutions have been generated to address the given question. They are as follows:
{solutions}
Carefully evaluate these solutions and identify the answer that appears most frequently across them. This consistency in answers is crucial for determining the most reliable solution.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the most consistent solution. Do not include any additional text or explanation in the "solution_letter" field.
"""
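The self-consistency selection that SC_ENSEMBLE_PROMPT delegates to the model can be sketched programmatically — a standalone illustration with made-up candidate answers, not part of the diff:

```python
from collections import Counter

# Toy candidate solutions keyed by letter ID, as in SC_ENSEMBLE_PROMPT.
solutions = {"A": "42", "B": "41", "C": "42"}

# Pick the answer that appears most frequently across the candidates.
most_common_answer, count = Counter(solutions.values()).most_common(1)[0]

# Map the winning answer back to the first letter ID that produced it.
solution_letter = next(k for k, v in solutions.items() if v == most_common_answer)
```

Here `most_common_answer` is "42" and `solution_letter` is "A"; the prompt asks the LLM to make this same frequency-based judgment when exact string matching is too brittle.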
PYTHON_CODE_VERIFIER_PROMPT = """
You are a professional Python programmer. Your task is to write complete, self-contained code based on a given mathematical problem and output the answer. The code should include all necessary imports and dependencies, and be ready to run without additional setup or environment configuration.
Problem description: {problem}
Other analysis: {analysis}
{feedback}
Your code should:
1. Implement the calculation steps described in the problem.
2. Define a function named `solve` that performs the calculation and returns the result. The `solve` function should not require any input parameters; instead, it should obtain all necessary inputs from within the function or from globally defined variables.
3. The `solve` function should return the final calculation result.
Please ensure your code is efficient, well-commented, and follows Python best practices. The output should be limited to basic data types such as strings, integers, and floats. It is prohibited to transmit images or other file formats. The code output is intended for a text-based language model.
"""
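A minimal example of the `solve` contract the prompt above asks for — no input parameters, inputs defined inside the function, and a basic data type returned (the problem itself is illustrative):

```python
def solve():
    # Illustrative problem: sum of the first 100 positive integers.
    n = 100
    return n * (n + 1) // 2  # returns a basic data type, as required

result = solve()
```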
REFLECTION_ON_PUBLIC_TEST_PROMPT = """
Given a code problem and a Python code solution that failed to pass the tests or to execute, you need to analyze the reason for the failure and propose a better code solution:
### problem
{problem}
### Code Solution
{solution}
### Execution Result
{exec_pass}
#### Failed Test Case
{test_fail}
Please provide a reflection on the failed test cases and code solution, followed by a better code solution without any additional text or test cases.
"""
MD_ENSEMBLE_PROMPT = """
Given the question described as follows: {question}
Several solutions have been generated to address the given question. They are as follows:
{solutions}
Carefully evaluate these solutions and identify the solution that is more capable of solving the problem compared to other solutions, as this is crucial for problem-solving.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the solution. Do not include any additional text or explanation in the "solution_letter" field.
"""
REVIEW_PROMPT = """
Given a problem and a thoughtful solution, your task is to use critical thinking (questioning) to review the solution's correctness and provide a review result in boolean format.
problem: {problem}
solution: {solution}
If you are more than 95 percent confident that the final answer is incorrect, please return False and give feedback on the error. Otherwise, please return True and give an explanation of the correctness.
"""
REVISE_PROMPT = """
Given a problem and a thoughtful solution that has just been reviewed as incorrect, your task is to revise the solution so that it solves the question, and to ensure the final code solution is wrapped in ```python```.
problem: {problem}
solution: {solution}
feedback: {feedback}
Ensure the output code is self-contained, without any additional text or test cases.
"""


@@ -0,0 +1,125 @@
"""
@Time : 2024/7/24 16:37
@Author : didi
@File : utils.py
"""
import json
import re
from enum import Enum
from typing import Any, List, Tuple
class CodeDataset(Enum):
HUMAN_EVAL = "HumanEval"
MBPP = "MBPP"
def extract_test_cases_from_jsonl(entry_point: str, dataset: CodeDataset = CodeDataset.HUMAN_EVAL):
if dataset == CodeDataset.HUMAN_EVAL:
file_path = "metagpt/ext/aflow/data/humaneval_public_test.jsonl"
# Retain the original hardcoded test cases
hardcoded_cases = {
"find_zero": "",
"decode_cyclic": "",
"decode_shift": "",
"by_length": "",
"add": "",
"triangle_area": "",
"correct_bracketing": "",
"solve": "",
"sum_squares": "",
"starts_one_ends": "",
}
elif dataset == CodeDataset.MBPP:
file_path = "metagpt/ext/aflow/data/mbpp_public_test.jsonl"
hardcoded_cases = {
"remove_odd": "",
"replace_spaces": "",
"snake_to_camel": "",
"Split": "",
"swap_List": "",
"square_Sum": "",
"sort_sublists": "",
"unique_sublists": "",
}
# Check if there are hardcoded test cases
if entry_point in hardcoded_cases:
return hardcoded_cases[entry_point]
# If there are no hardcoded test cases, read from the file
with open(file_path, "r") as file:
for line in file:
data = json.loads(line)
if data.get("entry_point") == entry_point:
return data.get("test")
return None
def extract_test_cases(docstring: str) -> List[Tuple[str, List[Any], Any]]:
# Use regular expressions to match test cases, now capturing function names and any output
pattern = r">>> (\w+)\((.*?)\)\n\s*(.*?)(?=\n|$)"
matches = re.findall(pattern, docstring, re.DOTALL)
test_cases = []
for match in matches:
func_name, input_str, expected_output = match
# Process input
input_list = []
for item in input_str.split(","):
item = item.strip()
try:
# Try to convert input to numeric type
if "." in item:
input_list.append(float(item))
else:
input_list.append(int(item))
except ValueError:
# If unable to convert to numeric, keep as string
input_list.append(item.strip("'\""))
# Process output
try:
# Try to convert output to numeric or boolean value
if expected_output.lower() == "true":
expected_output = True
elif expected_output.lower() == "false":
expected_output = False
elif "." in expected_output:
expected_output = float(expected_output)
else:
expected_output = int(expected_output)
except ValueError:
# If unable to convert, keep as string
expected_output = expected_output.strip("'\"")
test_cases.append([func_name, input_list, expected_output])
return test_cases
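The docstring pattern used by `extract_test_cases` can be exercised on its own — the regex is restated here so the snippet runs standalone, and the docstring is made up:

```python
import re

# Same pattern as in extract_test_cases: captures the function name, the raw
# argument string, and the expected output on the following line.
pattern = r">>> (\w+)\((.*?)\)\n\s*(.*?)(?=\n|$)"

docstring = """
    >>> add(1, 2)
    3
    >>> add(5, 7)
    12
"""

matches = re.findall(pattern, docstring, re.DOTALL)
```

Each match is a `(func_name, input_str, expected_output)` tuple of raw strings; the numeric/boolean coercion above is applied afterwards.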
def test_cases_2_test_functions(solution: str, test_cases: str):
tester_function = f"""
{solution}
{test_cases}
"""
return tester_function
def test_case_2_test_function(solution: str, test_case: str, entry_point: str):
tester_function = f"""
{solution}
def check(candidate):
{test_case}
def test_check():
check({entry_point})
test_check()
"""
return tester_function
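A standalone sketch of the harness that `test_case_2_test_function` assembles — the solution, test case, and entry point are made up, and note that the test-case line carries its own indentation, matching how the f-string above splices it in at top level:

```python
solution = "def add(a, b):\n    return a + b"
test_case = "    assert candidate(1, 2) == 3"
entry_point = "add"

# Mirrors the f-string assembly in test_case_2_test_function above.
harness = f"""
{solution}
def check(candidate):
{test_case}
def test_check():
    check({entry_point})
test_check()
"""

namespace = {}
exec(harness, namespace)  # raises AssertionError if the solution is wrong
```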


@@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 22:07 PM
# @Author : didi
# @Desc : Basic Graph Class
from metagpt.ext.aflow.scripts.evaluator import DatasetType
from metagpt.provider.llm_provider_registry import create_llm_instance
from metagpt.utils.cost_manager import CostManager
class Workflow:
def __init__(
self,
name: str,
llm_config,
dataset: DatasetType,
) -> None:
self.name = name
self.dataset = dataset
self.llm = create_llm_instance(llm_config)
self.llm.cost_manager = CostManager()
async def __call__(self, problem: str):
"""
Implementation of the workflow
"""
raise NotImplementedError("This method should be implemented by the subclass")
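A hypothetical minimal subclass, with the LLM stubbed out so the sketch runs standalone — in the real framework, `create_llm_instance(llm_config)` supplies the model and `CostManager` tracks spend:

```python
import asyncio

class DummyLLM:
    """Stand-in for the object returned by create_llm_instance (illustrative)."""
    async def aask(self, prompt: str) -> str:
        return f"answer to: {prompt}"

class SimpleWorkflow:
    """Follows the Workflow contract above: construct once, await per problem."""
    def __init__(self, name: str):
        self.name = name
        self.llm = DummyLLM()

    async def __call__(self, problem: str) -> str:
        return await self.llm.aask(problem)

result = asyncio.run(SimpleWorkflow("demo")("1 + 1 = ?"))
```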


@@ -581,6 +581,30 @@ def write_json_file(json_file: str, data: list, encoding: str = None, indent: in
json.dump(data, fout, ensure_ascii=False, indent=indent, default=to_jsonable_python)
def read_jsonl_file(jsonl_file: str, encoding="utf-8") -> list[dict]:
if not Path(jsonl_file).exists():
raise FileNotFoundError(f"jsonl_file: {jsonl_file} does not exist")
datas = []
with open(jsonl_file, "r", encoding=encoding) as fin:
try:
for line in fin:
data = json.loads(line)
datas.append(data)
except Exception:
raise ValueError(f"read jsonl file: {jsonl_file} failed")
return datas
def add_jsonl_file(jsonl_file: str, data: list[dict], encoding: str = None):
folder_path = Path(jsonl_file).parent
if not folder_path.exists():
folder_path.mkdir(parents=True, exist_ok=True)
with open(jsonl_file, "a", encoding=encoding) as fout:
for json_item in data:
fout.write(json.dumps(json_item) + "\n")
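A round-trip sketch of the two jsonl helpers above, reimplemented inline so it runs standalone — the path and records are illustrative:

```python
import json
import os
import tempfile

records = [{"id": 1, "answer": "yes"}, {"id": 2, "answer": "no"}]
path = os.path.join(tempfile.mkdtemp(), "demo.jsonl")

# Append, one JSON object per line, as in add_jsonl_file.
with open(path, "a", encoding="utf-8") as fout:
    for item in records:
        fout.write(json.dumps(item) + "\n")

# Read back line by line, as in read_jsonl_file.
with open(path, "r", encoding="utf-8") as fin:
    loaded = [json.loads(line) for line in fin]
```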
def read_csv_to_list(curr_file: str, header=False, strip_trail=True):
"""
Reads in a csv file to a list of list. If header is True, it returns a

metagpt/utils/sanitize.py (new file, 183 lines)

@@ -0,0 +1,183 @@
"""
@Time : 2024/7/24 16:37
@Author : didi
@File : sanitize.py
@Acknowledgement https://github.com/evalplus/evalplus/blob/master/evalplus/sanitize.py
"""
import ast
import traceback
from enum import Enum
from typing import Dict, Generator, List, Optional, Set, Tuple
import tree_sitter_python
from tree_sitter import Language, Node, Parser
class NodeType(Enum):
CLASS = "class_definition"
FUNCTION = "function_definition"
IMPORT = ["import_statement", "import_from_statement"]
IDENTIFIER = "identifier"
ATTRIBUTE = "attribute"
RETURN = "return_statement"
EXPRESSION = "expression_statement"
ASSIGNMENT = "assignment"
def traverse_tree(node: Node) -> Generator[Node, None, None]:
"""
Traverse the tree structure starting from the given node.
:param node: The root node to start the traversal from.
:return: A generator object that yields nodes in the tree.
"""
cursor = node.walk()
depth = 0
visited_children = False
while True:
if not visited_children:
yield cursor.node
if not cursor.goto_first_child():
depth += 1
visited_children = True
elif cursor.goto_next_sibling():
visited_children = False
elif not cursor.goto_parent() or depth == 0:
break
else:
depth -= 1
def syntax_check(code, verbose=False):
try:
ast.parse(code)
return True
except (SyntaxError, MemoryError):
if verbose:
traceback.print_exc()
return False
def code_extract(text: str) -> str:
lines = text.split("\n")
longest_line_pair = (0, 0)
longest_so_far = 0
for i in range(len(lines)):
for j in range(i + 1, len(lines)):
current_lines = "\n".join(lines[i : j + 1])
if syntax_check(current_lines):
current_length = sum(1 for line in lines[i : j + 1] if line.strip())
if current_length > longest_so_far:
longest_so_far = current_length
longest_line_pair = (i, j)
return "\n".join(lines[longest_line_pair[0] : longest_line_pair[1] + 1])
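The idea behind `code_extract` — keep the contiguous window of lines with the most non-blank lines that still parses — can be demonstrated standalone (the sample text is made up; note that, as above, single-line windows are never considered since `j` starts at `i + 1`):

```python
import ast

def longest_valid_block(text: str) -> str:
    # Scan all multi-line windows; keep the parseable one with the most
    # non-blank lines (same O(n^2) strategy as code_extract above).
    lines = text.split("\n")
    best, best_len = (0, 0), 0
    for i in range(len(lines)):
        for j in range(i + 1, len(lines)):
            chunk = "\n".join(lines[i : j + 1])
            try:
                ast.parse(chunk)
            except (SyntaxError, MemoryError):
                continue
            length = sum(1 for ln in lines[i : j + 1] if ln.strip())
            if length > best_len:
                best_len, best = length, (i, j)
    return "\n".join(lines[best[0] : best[1] + 1])

sample = "Here is the code:\nx = 1\ny = x + 1\nThanks!"
extracted = longest_valid_block(sample)
```

The surrounding chat text fails `ast.parse`, so only the two assignment lines survive.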
def get_definition_name(node: Node) -> str:
for child in node.children:
if child.type == NodeType.IDENTIFIER.value:
return child.text.decode("utf8")
def has_return_statement(node: Node) -> bool:
traverse_nodes = traverse_tree(node)
for node in traverse_nodes:
if node.type == NodeType.RETURN.value:
return True
return False
def get_deps(nodes: List[Tuple[str, Node]]) -> Dict[str, Set[str]]:
def dfs_get_deps(node: Node, deps: Set[str]) -> None:
for child in node.children:
if child.type == NodeType.IDENTIFIER.value:
deps.add(child.text.decode("utf8"))
else:
dfs_get_deps(child, deps)
name2deps = {}
for name, node in nodes:
deps = set()
dfs_get_deps(node, deps)
name2deps[name] = deps
return name2deps
def get_function_dependency(entrypoint: str, call_graph: Dict[str, str]) -> Set[str]:
queue = [entrypoint]
visited = {entrypoint}
while queue:
current = queue.pop(0)
if current not in call_graph:
continue
for neighbour in call_graph[current]:
if neighbour not in visited:
visited.add(neighbour)
queue.append(neighbour)
return visited
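The reachability pass above is a plain BFS over a name-to-callees map; a standalone sketch with a toy call graph:

```python
from collections import deque

# Toy call graph: function name -> names it calls (illustrative).
call_graph = {"main": ["helper", "parse"], "helper": ["parse"], "parse": []}

entrypoint = "main"
queue, visited = deque([entrypoint]), {entrypoint}
while queue:
    current = queue.popleft()
    for neighbour in call_graph.get(current, []):
        if neighbour not in visited:
            visited.add(neighbour)
            queue.append(neighbour)
```

`sanitize` then keeps only the definitions whose names land in this visited set.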
def sanitize(code: str, entrypoint: Optional[str] = None) -> str:
"""
Sanitize and extract relevant parts of the given Python code.
This function parses the input code, extracts import statements, class and function definitions,
and variable assignments. If an entrypoint is provided, it only includes definitions that are
reachable from the entrypoint in the call graph.
:param code: The input Python code as a string.
:param entrypoint: Optional name of a function to use as the entrypoint for dependency analysis.
:return: A sanitized version of the input code, containing only relevant parts.
"""
code = code_extract(code)
code_bytes = bytes(code, "utf8")
parser = Parser(Language(tree_sitter_python.language()))
tree = parser.parse(code_bytes)
class_names = set()
function_names = set()
variable_names = set()
root_node = tree.root_node
import_nodes = []
definition_nodes = []
for child in root_node.children:
if child.type in NodeType.IMPORT.value:
import_nodes.append(child)
elif child.type == NodeType.CLASS.value:
name = get_definition_name(child)
if not (name in class_names or name in variable_names or name in function_names):
definition_nodes.append((name, child))
class_names.add(name)
elif child.type == NodeType.FUNCTION.value:
name = get_definition_name(child)
if not (name in function_names or name in variable_names or name in class_names) and has_return_statement(
child
):
definition_nodes.append((name, child))
function_names.add(get_definition_name(child))
elif child.type == NodeType.EXPRESSION.value and child.children[0].type == NodeType.ASSIGNMENT.value:
subchild = child.children[0]
name = get_definition_name(subchild)
if not (name in variable_names or name in function_names or name in class_names):
definition_nodes.append((name, subchild))
variable_names.add(name)
if entrypoint:
name2deps = get_deps(definition_nodes)
reachable = get_function_dependency(entrypoint, name2deps)
sanitized_output = b""
for node in import_nodes:
sanitized_output += code_bytes[node.start_byte : node.end_byte] + b"\n"
for pair in definition_nodes:
name, node = pair
if entrypoint and name not in reachable:
continue
sanitized_output += code_bytes[node.start_byte : node.end_byte] + b"\n"
return sanitized_output[:-1].decode("utf8")