From 74dc79a3ddd00f3440e49b634abe9791f85d4207 Mon Sep 17 00:00:00 2001 From: stellahsr Date: Tue, 12 Sep 2023 22:14:30 +0800 Subject: [PATCH] update code --- README.md | 24 +- metagpt/prompts/sd_design.py | 101 +++++++++ metagpt/roles/ui_designer.py | 205 ++++++++++++++++++ metagpt/utils/resp_parse.py | 76 +++++++ tests/metagpt/actions/test_sd_design.py | 54 +++++ .../metagpt/utils/test_flatten_json_object.py | 39 ++++ .../utils/test_flatten_json_structure_json.py | 84 +++++++ tests/metagpt/utils/test_try_parse_json.py | 58 +++++ 8 files changed, 629 insertions(+), 12 deletions(-) create mode 100644 metagpt/prompts/sd_design.py create mode 100644 metagpt/roles/ui_designer.py create mode 100644 metagpt/utils/resp_parse.py create mode 100644 tests/metagpt/actions/test_sd_design.py create mode 100644 tests/metagpt/utils/test_flatten_json_object.py create mode 100644 tests/metagpt/utils/test_flatten_json_structure_json.py create mode 100644 tests/metagpt/utils/test_try_parse_json.py diff --git a/README.md b/README.md index b4a272ef0..62f623df8 100644 --- a/README.md +++ b/README.md @@ -66,20 +66,20 @@ # Step 3: Clone the repository to your local machine, and install it. **Note:** - If already have Chrome, Chromium, or MS Edge installed, you can skip downloading Chromium by setting the environment variable - `PUPPETEER_SKIP_CHROMIUM_DOWNLOAD` to `true`. +`PUPPETEER_SKIP_CHROMIUM_DOWNLOAD` to `true`. - Some people are [having issues](https://github.com/mermaidjs/mermaid.cli/issues/15) installing this tool globally. Installing it locally is an alternative solution, - ```bash - npm install @mermaid-js/mermaid-cli - ``` + ```bash + npm install @mermaid-js/mermaid-cli + ``` - don't forget to the configuration for mmdc in config.yml - ```yml - PUPPETEER_CONFIG: "./config/puppeteer-config.json" - MMDC: "./node_modules/.bin/mmdc" - ``` + ```yml + PUPPETEER_CONFIG: "./config/puppeteer-config.json" + MMDC: "./node_modules/.bin/mmdc" + ``` - if `python setup.py install` fails with error `[Errno 13] Permission denied: '/usr/local/lib/python3.11/dist-packages/test-easy-install-13129.write-test'`, try instead running `python setup.py install --user` @@ -149,12 +149,12 @@ # Run the script # Do not hire an engineer to implement the project python startup.py "Write a cli snake game" --implement False # Hire an engineer and perform code reviews -python startup.py "Write a cli snake game" --code_review True +python startup.py "Write a cli snake game" --code_review True ``` After running the script, you can find your new project in the `workspace/` directory. -### Preference of Platform or Tool +### Preference of Platform or Tool You can tell which platform or tool you want to use when stating your requirements. @@ -211,7 +211,7 @@ ### Code walkthrough ## QuickStart -It is difficult to install and configure the local environment for some users. The following tutorials will allow you to quickly experience the charm of MetaGPT. +It is difficult to install and configure the local environment for some users. The following tutorials will allow you to quickly experience the charm of MetaGPT. - [MetaGPT quickstart](https://deepwisdom.feishu.cn/wiki/CyY9wdJc4iNqArku3Lncl4v8n2b) @@ -224,7 +224,7 @@ ## Citation ```bibtex @misc{hong2023metagpt, - title={MetaGPT: Meta Programming for Multi-Agent Collaborative Framework}, + title={MetaGPT: Meta Programming for Multi-Agent Collaborative Framework}, author={Sirui Hong and Xiawu Zheng and Jonathan Chen and Yuheng Cheng and Jinlin Wang and Ceyao Zhang and Zili Wang and Steven Ka Shing Yau and Zijuan Lin and Liyang Zhou and Chenyu Ran and Lingfeng Xiao and Chenglin Wu}, year={2023}, eprint={2308.00352}, diff --git a/metagpt/prompts/sd_design.py b/metagpt/prompts/sd_design.py new file mode 100644 index 000000000..8719f1a5a --- /dev/null +++ b/metagpt/prompts/sd_design.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@Time : 2023/8/18 09:51 +@Author : stellahong +@File : __init__.py +""" + +MODEL_SELECTION_PROMPT = """Please help me find a suitable model for painting in this scene. +Model list will be given in the format like: +''' +model_name: model desc, +''' + +you should select the model and tell me the model name. answer it in the form like Model: model_name || Domain:xxx + +### +Model List: +{model_info} + +My scene is: {query} +""" + +DOMAIN_JUDGEMENT_TEMPLATE = ''' +use model {model_name}, decide the domain, answer it in the form like Domain: xxx + +### +Model Information: +{model_info} + +''' + +MODEL_SELECTION_OUTPUT_MAPPING = { + "Model:": (str, ...), } + +SD_PROMPT_KW_OPTIMIZE_TEMPLATE = ''' +I want you to act as a prompt generator. Compose each answer as a visual sentence. Do not write explanations on replies. Format the answers as javascript json arrays with a single string per answer. Return exactly {answer_count} to my question. Answer the questions exactly, in the form like responses:xxx. Answer the following question: + +Find 3 keywords related to the prompt "{messages}" that are not found in the prompt. The keywords should be related to each other. Each keyword is a single word. + +''' + +SD_PROMPT_IMPROVE_OPTIMIZE_TEMPLATE = ''' +I want you to act as a prompt generator. Compose each answer as a visual sentence. Do not write explanations on replies. Format the answers as javascript json arrays with a single string per answer. Return exactly {answer_count} to my question. Answer the questions exactly, in the form like responses:xxx. Answer the following question: + +domain is {domain} + +if domain is anime or game like, Take the prompt "{messages}, Cute kawaii sticker , white background, vector, pastel colors" and improve it. + +if domain is realistic like, Take the prompt "{messages}" and improve it. + +''' +# Die-cut sticker, illustration minimalism, + +FORMAT_INSTRUCTIONS = """The problem is to make the user input a better text2image prompt, the input is {query}" + + Let's first understand the problem and devise a plan to solve the problem. + + Based on the text2image model selected {model_name} and domain {domain} + You have access to the following tools: + + {tool_names} + {tool_description} + + Use a json blob to specify a tool by providing an action key (tool name) and an Observation (tool description). + + Valid "action" values: {tool_names} + + Provide only ONE action per $JSON_BLOB, as shown: + + ``` + {{{{ + "action": $TOOL_NAME, + "Observation": $TOOL_DESCRIPTION + }}}} + ``` + + Follow this format: + + ## Think Chain + ``` + Question: input question to answer + Thought: select a better method for the input by go through these two tools and its observations respectively + Action1: + ``` + $JSON_BLOB + ``` + Action2: + ``` + $JSON_BLOB + ``` + + Thought:When evaluating a prompt's richness, I need to specify which tool to use and I can only select one tool . To finish this selection, in the form: + ## Final Action: + TOOL_NAME + + """ + +PROMPT_OUTPUT_MAPPING = { + "Final Action:": (str, ...), +} diff --git a/metagpt/roles/ui_designer.py b/metagpt/roles/ui_designer.py new file mode 100644 index 000000000..05b906c16 --- /dev/null +++ b/metagpt/roles/ui_designer.py @@ -0,0 +1,205 @@ +# -*- coding: utf-8 -*- +# @Date : 2023/8/16 13:58 +# @Author : stellahong (stellahong@fuzhi.ai) +# @Desc : +from functools import wraps +import json5 + +from metagpt.logs import logger +from metagpt.roles import Role +from metagpt.schema import Message + +from metagpt.actions.design import Tool, SDPromptExtend, SDPromptOptimize, SDPromptImprove +from metagpt.actions.ui_design import ModelSelection, SDGeneration + + +def retrieve(func): + @wraps(func) + def wrapper(*args, **kwargs): + content, keyword = func(*args, **kwargs) + info = content.replace(keyword, "") + return info + + return wrapper + + +class Designer(Role): + """Class representing the UI designer Role.""" + + def __init__( + self, + name="Catherine", + profile="UI Design", + goal="Generate UI icon", + constraints="Give clear icon description and generate images to finish the design", + actions=[ModelSelection, SDPromptExtend, SDGeneration]): + super().__init__(name, profile, goal, constraints) + + self._init_actions(actions) + + @property + def memory_model_name(self): + return "MODEL_NAME: " + + @property + def memory_user_input(self): + return "User Input: " + + @property + def memory_domain(self): + return "Domain: " + + def memory_property(self, memory_keyword: str, memory_content: str): + self._rc.memory.add(Message(f"{memory_keyword}{memory_content}", role=self.profile)) + + @retrieve + def get_important_memory(self, keyword: str): + query_memory = self._rc.memory.get_by_content(keyword)[0] + return query_memory.content, keyword + + async def _plan_and_select(self): + """ + 这里实现的是二选一的option,action在这里进行了选择 + 理论上应该可以实现4种选择 (&:表示串行顺序),目前只选择了前2种 + 1) action1 + 2) action2 + 3) action1 & action2 + 4) action2 & action1 + """ + msg = self._rc.memory.get(k=1)[0] + query = msg.content + logger.info(query) + if query == "PromptImprove": + self._actions.insert(self._rc.state + 1, SDPromptImprove()) + elif query == "PromptOptimize": + self._actions.insert(self._rc.state + 1, SDPromptOptimize()) + return self._rc.state + + async def _think(self) -> None: + logger.info(self._rc.state) + if self._rc.todo is None: + self._set_state(0) + return + + if self._rc.state == 1: + await self._plan_and_select() + self._set_state(self._rc.state + 1) + + elif self._rc.state + 1 < len(self._actions): + self._set_state(self._rc.state + 1) + else: + self._rc.todo = None + + async def handle_model_selection(self, query, **kwargs): + ms = ModelSelection() + model_name, domain = await ms.run(query) + logger.info(f"{model_name}, {domain}") + + self.memory_property(self.memory_user_input, query) + self.memory_property(self.memory_model_name, model_name) + self.memory_property(self.memory_domain, domain) + return f"{model_name}||{domain}" + + async def handle_sd_prompt_extend(self, *args, **kwargs): + tools = [ + Tool(name="PromptOptimize", + func=SDPromptOptimize().run, + description="Find 3 keywords related to the prompt that are not found in the prompt. The keywords should be related to each other. Each keyword is a single word. useful for when you need to add extra keywords for input prompt, specially for long enough input"), + + Tool(name="PromptImprove", + func=SDPromptImprove().run, + description="Take the prompt and improve it. useful for when you need to add improve and extend the prompt for input prompt, specially for short input"), + + ] + + query = self.get_important_memory(self.memory_user_input) + domain = self.get_important_memory(self.memory_domain) + sd_exd = SDPromptExtend(tools=tools) + resp = await sd_exd.run(query=query, domain=domain, answer_count=1) + return resp + + async def handle_sd_prompt_improve(self, *args, **kwargs): + query = self.get_important_memory(self.memory_user_input) + domain = self.get_important_memory(self.memory_domain) + sd_pi = SDPromptImprove() + resp = await sd_pi.run(query=query, domain=domain, answer_count=1) + return resp + + async def handle_sd_prompt_optimize(self, *args, **kwargs): + query = self.get_important_memory(self.memory_user_input) + domain = self.get_important_memory(self.memory_domain) + sd_op = SDPromptOptimize() + resp = await sd_op.run(query=query, domain=domain, answer_count=1) + return resp + + async def handle_sd_generation(self, *args, **kwargs): + msg = self._rc.memory.get_by_action(SDPromptImprove)[0] + image_name = self.get_important_memory(self.memory_user_input) + logger.info(type(msg.content)) + logger.info(msg.content) + resp = json5.loads(msg.content) + logger.info(resp) + model_name = self.get_important_memory(self.memory_model_name) + await SDGeneration().run(query=resp, model_name=model_name, **{"image_name":image_name}) + return resp + + async def _act(self) -> Message: + logger.info(f"{self._setting}: ready to {self._rc.todo}") + todo = self._rc.todo + msg = self._rc.memory.get(k=1)[0] + query = msg.content + logger.info(msg.cause_by) + logger.info(query) + logger.info(todo) + + handler_map = { + ModelSelection: self.handle_model_selection, + SDPromptExtend: self.handle_sd_prompt_extend, + + SDPromptImprove: self.handle_sd_prompt_improve, + SDPromptOptimize: self.handle_sd_prompt_optimize, + + SDGeneration: self.handle_sd_generation, + } + + handler = handler_map.get(type(todo)) + if handler: + resp = await handler(query) + if type(todo) in [SDPromptImprove, SDPromptOptimize]: + ret = Message(f"{resp}", role=self.profile, cause_by=SDPromptImprove) + else: + ret = Message(f"{resp}", role=self.profile, cause_by=type(todo)) + self._rc.memory.add(ret) + return ret + + raise ValueError(f"Unknown todo type: {type(todo)}") + + async def _react(self) -> Message: + while True: + await self._think() + if self._rc.todo is None: + break + + msg = await self._act() + return msg + + +if __name__ == "__main__": + import asyncio + import platform + test_queries = ["Flappy Bird", + "Clash of Clans", + "Subway Surfers", + "Pokémon Go", + "Super Mario", + "Tetris", + "Call of Duty" + ] + + for prompt in test_queries: + + designer = Designer() + if platform.system() == "Windows": + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + asyncio.run(designer.run(prompt)) + \ No newline at end of file diff --git a/metagpt/utils/resp_parse.py b/metagpt/utils/resp_parse.py new file mode 100644 index 000000000..ec7293bcd --- /dev/null +++ b/metagpt/utils/resp_parse.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# @Date : 2023/8/22 22:18 +# @Author : stellahong (stellahong@fuzhi.ai) +# @Desc : +import json5 +import re + + +def flatten_json_structure(json_array): + if (isinstance(json_array, list) and len(json_array) == 1 and not isinstance(json_array[0], str)): + return flatten_json_structure(json_array[0]) + + if (isinstance(json_array, dict) and len(json_array.values()) == 1 and not isinstance(list(json_array.values())[0], + str)): + return flatten_json_structure(list(json_array.values())[0]) + + flattened_json_array = [] + + if (isinstance(json_array, dict)): + json_array = json_array.values() + + for json_object in json_array: + flattened_dict = flatten_json_object(json_object) + flattened_values = ", ".join(str(v) for v in flattened_dict.values()) + flattened_json_array.append(flattened_values) + + return flattened_json_array + + +def flatten_json_object(obj, parent_key='', sep=', '): + if isinstance(obj, str): + return dict([("value", obj)]) + + if isinstance(obj, list): + return dict([("value", sep.join(str(v) for v in obj))]) + + items = [] + for key, value in obj.items(): + new_key = f"{parent_key}{sep}{key}" if parent_key else key + if isinstance(value, dict): + items.extend(flatten_json_object(value, new_key, sep=sep).items()) + elif isinstance(value, list): + items.append((new_key, sep.join(str(v) for v in value))) + else: + items.append((new_key, value)) + return dict(items) + + +def try_parse_json(input_text): + input_text.index + start_index_brackets = input_text.find('[') + end_index_brackets = input_text.rfind(']') + start_index_curly = input_text.find('{') + end_index_curly = input_text.rfind('}') + + start_index = start_index_brackets + end_index = end_index_brackets + + if (start_index_curly != -1 and (start_index_curly < start_index_brackets or start_index_brackets < 0)): + start_index = start_index_curly + end_index = end_index_curly + + if start_index >= 0 and end_index > 0: + json_string = input_text[start_index:end_index + 1] + json_string = re.sub(r'\}[\s]*\{', '}, {', json_string) + json_string = re.sub(r'\][\s]*\[', '], [', json_string) + json_string = re.sub(r'\"[\s]*\"', '", "', json_string) + + try: + json_object = json5.loads(json_string) + except ValueError: + json_object = json5.loads(f"[{json_string}]") + + return json_object + + raise Exception("No JSON object found in input text.") diff --git a/tests/metagpt/actions/test_sd_design.py b/tests/metagpt/actions/test_sd_design.py new file mode 100644 index 000000000..0e321b342 --- /dev/null +++ b/tests/metagpt/actions/test_sd_design.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# @Date : 2023/7/22 02:40 +# @Author : stellahong (stellahong@fuzhi.ai) +# + +import pytest +from typing import List +from metagpt.actions.design import SDPromptOptimize, SDPromptImprove +from metagpt.actions.ui_design import ModelSelection + + +@pytest.mark.asyncio +async def test_ui_model_selection(): + ms = ModelSelection() + model_name, domain = await ms.run("Pokémon Go") + assert model_name == "pixelmix_v10" + + +@pytest.mark.asyncio +async def test_ui_sd_generation(): + pass + + +@pytest.mark.asyncio +async def test_ui_sd_prompt_optimize(): + sd_po = SDPromptOptimize() + resp = await sd_po.run(query="Pokémon Go", domain="Anime", answer_count=1) + assert type(resp) == List + assert len(resp) == 1 + + +@pytest.mark.asyncio +async def test_ui_sd_optimize_answer_count(): + sd_po = SDPromptOptimize() + answer_count = 2 + resp = await sd_po.run(query="Pokémon Go", domain="Anime", answer_count=2) + assert type(resp) == List + assert len(resp) == answer_count + +@pytest.mark.asyncio +async def test_ui_sd_improve_answer_count(): + sd_pi = SDPromptImprove() + answer_count = 2 + resp = await sd_pi.run(query="Pokémon Go", domain="Anime", answer_count=2) + assert type(resp) == List + assert len(resp) == answer_count + + +@pytest.mark.asyncio +async def test_ui_sd_prompt_improve(): + sd_pi = SDPromptImprove() + resp = await sd_pi.run(query="Pokémon Go", domain="Anime", answer_count=1) + assert type(resp) == List + assert len(resp) == 1 diff --git a/tests/metagpt/utils/test_flatten_json_object.py b/tests/metagpt/utils/test_flatten_json_object.py new file mode 100644 index 000000000..c4324f694 --- /dev/null +++ b/tests/metagpt/utils/test_flatten_json_object.py @@ -0,0 +1,39 @@ +import unittest +import json5 + +def flatten_json_object(obj, parent_key='', sep=', '): + if isinstance(obj, str): + return dict([("value", obj)]) + + if isinstance(obj, list): + return dict([("value", sep.join(str(v) for v in obj))]) + + items = [] + for key, value in obj.items(): + new_key = f"{parent_key}{sep}{key}" if parent_key else key + if isinstance(value, dict): + items.extend(flatten_json_object(value, new_key, sep=sep).items()) + elif isinstance(value, list): + items.append((new_key, sep.join(str(v) for v in value))) + else: + items.append((new_key, value)) + return dict(items) + +class TestFlattenJsonObject(unittest.TestCase): + def test_flatten_json_object(self): + json_obj = json5.loads('{"a": 1, "b": {"c": 2, "d": {"e": 3, "f": 4}}, "g": [5, 6, 7]}') + expected_result = {'a': 1, 'b, c': 2, 'b, d, e': 3, 'b, d, f': 4, 'g': '5, 6, 7'} + self.assertEqual(flatten_json_object(json_obj), expected_result) + + def test_flatten_json_object_with_string(self): + json_obj = json5.loads('{"a": "hello"}') + expected_result = {'a': 'hello'} + self.assertEqual(flatten_json_object(json_obj), expected_result) + + def test_flatten_json_object_with_list(self): + json_obj = json5.loads('{"a": [1, 2, 3]}') + expected_result = {'a': '1, 2, 3'} + self.assertEqual(flatten_json_object(json_obj), expected_result) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/metagpt/utils/test_flatten_json_structure_json.py b/tests/metagpt/utils/test_flatten_json_structure_json.py new file mode 100644 index 000000000..ba260984b --- /dev/null +++ b/tests/metagpt/utils/test_flatten_json_structure_json.py @@ -0,0 +1,84 @@ +import unittest +import json5 +import re + + +def flatten_json_object(obj, parent_key='', sep=', '): + if isinstance(obj, str): + return dict([("value", obj)]) + + if isinstance(obj, list): + return dict([("value", sep.join(str(v) for v in obj))]) + + items = [] + for key, value in obj.items(): + new_key = f"{parent_key}{sep}{key}" if parent_key else key + if isinstance(value, dict): + items.extend(flatten_json_object(value, new_key, sep=sep).items()) + elif isinstance(value, list): + items.append((new_key, sep.join(str(v) for v in value))) + else: + items.append((new_key, value)) + return dict(items) + +def flatten_json_structure(json_array): + if (isinstance(json_array, list) and len(json_array) == 1 and not isinstance(json_array[0], str)): + return flatten_json_structure(json_array[0]) + + if (isinstance(json_array, dict) and len(json_array.values()) == 1 and not isinstance(list(json_array.values())[0], + str)): + return flatten_json_structure(list(json_array.values())[0]) + + flattened_json_array = [] + + if (isinstance(json_array, dict)): + json_array = json_array.values() + + for json_object in json_array: + flattened_dict = flatten_json_object(json_object) + flattened_values = ", ".join(str(v) for v in flattened_dict.values()) + flattened_json_array.append(flattened_values) + + return flattened_json_array + +class TestFlattenJson(unittest.TestCase): + def test_flatten_json_structure(self): + input_json = [ + { + "name": "John", + "age": 30, + "city": "New York" + }, + { + "name": "Jane", + "age": 25, + "city": "Chicago" + } + ] + expected_output = ["John, 30, New York", "Jane, 25, Chicago"] + self.assertEqual(flatten_json_structure(input_json), expected_output) + + def test_flatten_json_structure_with_nested_json(self): + input_json = [ + { + "name": "John", + "age": 30, + "address": { + "city": "New York", + "state": "NY" + } + }, + { + "name": "Jane", + "age": 25, + "address": { + "city": "Chicago", + "state": "IL" + } + } + ] + expected_output = ["John, 30, New York, NY", "Jane, 25, Chicago, IL"] + self.assertEqual(flatten_json_structure(input_json), expected_output) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/metagpt/utils/test_try_parse_json.py b/tests/metagpt/utils/test_try_parse_json.py new file mode 100644 index 000000000..b3b205557 --- /dev/null +++ b/tests/metagpt/utils/test_try_parse_json.py @@ -0,0 +1,58 @@ +import unittest +import json5 +import re + +def try_parse_json(input_text): + input_text.index + start_index_brackets = input_text.find('[') + end_index_brackets = input_text.rfind(']') + start_index_curly = input_text.find('{') + end_index_curly = input_text.rfind('}') + + start_index = start_index_brackets + end_index = end_index_brackets + + if (start_index_curly != -1 and (start_index_curly < start_index_brackets or start_index_brackets < 0)): + start_index = start_index_curly + end_index = end_index_curly + + if start_index >= 0 and end_index > 0: + json_string = input_text[start_index:end_index + 1] + json_string = re.sub(r'\}[\s]*\{', '}, {', json_string) + json_string = re.sub(r'\][\s]*\[', '], [', json_string) + json_string = re.sub(r'"[\s]*"', '", "', json_string) + + try: + json_object = json5.loads(json_string) + except ValueError: + json_object = json5.loads(f"[{json_string}]") + + return json_object + + raise Exception("No JSON object found in input text.") + + +class TestTryParseJson(unittest.TestCase): + def test_valid_json(self): + input_text = '{"name": "John", "age": 30, "city": "New York"}' + expected_output = {"name": "John", "age": 30, "city": "New York"} + self.assertEqual(try_parse_json(input_text), expected_output) + + def test_invalid_json(self): + input_text = 'This is not a JSON string' + with self.assertRaises(Exception) as context: + try_parse_json(input_text) + self.assertTrue('No JSON object found in input text.' in str(context.exception)) + + def test_empty_json(self): + input_text = '{}' + expected_output = {} + self.assertEqual(try_parse_json(input_text), expected_output) + + def test_nested_json(self): + input_text = '{"name": "John", "age": 30, "city": "New York", "friends": ["Mike", "Anna"]}' + expected_output = {"name": "John", "age": 30, "city": "New York", "friends": ["Mike", "Anna"]} + self.assertEqual(try_parse_json(input_text), expected_output) + +if __name__ == '__main__': + unittest.main()