diff --git a/examples/st_game/memory/associative_memory.py b/examples/st_game/memory/associative_memory.py index 6a40b3dda..c771906ec 100644 --- a/examples/st_game/memory/associative_memory.py +++ b/examples/st_game/memory/associative_memory.py @@ -7,12 +7,13 @@ from metagpt.schema import Message import json from datetime import datetime + class MemoryBasic(Message): - def __init__(self, memory_id:str, memory_count:int, type_count:int, memory_type:str, depth:int, content:int, - creaetd:datetime, expiration:datetime, - subject:str, predicate:str, object:str, - embedding_key:str, poignancy:int, keywords:list, filling:list): + def __init__(self, memory_id: str, memory_count: int, type_count: int, memory_type: str, depth: int, content: int, + creaetd: datetime, expiration: datetime, + subject: str, predicate: str, object: str, + embedding_key: str, poignancy: int, keywords: list, filling: list): """ MemoryBasic继承于MG的Message类,其中content属性替代description属性 Message类中对于Chat类型支持的非常好,对于Agent个体的Perceive,Reflection,Plan支持的并不多 @@ -29,29 +30,30 @@ class MemoryBasic(Message): self.memory_count: int = memory_count # 第几个记忆,实际数值与Memory相等,但是类型为整数 self.type_count: int = type_count # 第几种记忆,类型为整数(具体不太理解如何生成的) self.memory_type: str = memory_type # 记忆类型,使用Field,包含 event,thought,chat三种类型 - self.depth:str = depth # 记忆深度,类型为整数 + self.depth: str = depth # 记忆深度,类型为整数 self.created: datetime = creaetd # 创建时间 self.expiration: datetime = expiration # 记忆失效时间,默认为空() self.last_accessed: datetime = creaetd # 上一次调用的时间,初始化时候与self.created一致 self.subject: str = subject # 主语,str类型 - self.predicate:str = predicate # 谓语,str类型 - self.object:str = object # 宾语,str类型 + self.predicate: str = predicate # 谓语,str类型 + self.object: str = object # 宾语,str类型 self.embedding_key: str = embedding_key # 内容与self.content一致 - self.poignancy:int = poignancy # importance值,整数类型 - self.keywords:list = keywords # keywords,列表 - self.filling:list = filling # None或者列表 + self.poignancy: int = poignancy # importance值,整数类型 + self.keywords: list = keywords # keywords,列表 + self.filling: list = filling # None或者列表 + class AgentMemory(Memory): """ GA中主要存储三种JSON 1. embedding.json (Dict embedding_key:embedding) - 2. Node.json (Dict Node_id:Node) - 3. kw_strength.json + 2. Node.json (Dict Node_id:Node) + 3. kw_strength.json """ - def __init__(self, memory_saved:str): + def __init__(self, memory_saved: str): """ AgentMemory类继承自Memory类,重写storage替代GA中id_to_node,一方面存储所有信息,一方面作为JSON转化 index存储与不同Agent的chat信息 @@ -61,7 +63,7 @@ class AgentMemory(Memory): self.storage: list[MemoryBasic] = [] # 重写Stroage,存储MemoryBasic所有节点 self.event_list = [] # 存储event记忆 self.thought_list = [] # 存储thought记忆 - + self.event_keywords = dict() # 存储keywords self.thought_keywords = dict() self.chat_keywords = dict() diff --git a/examples/st_game/memory/retrieve.py b/examples/st_game/memory/retrieve.py index 79042df06..5ac4a9b29 100644 --- a/examples/st_game/memory/retrieve.py +++ b/examples/st_game/memory/retrieve.py @@ -1,118 +1,122 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# @Desc : Retrive函数实现 +# @Desc : Retrieve函数实现 +import datetime from numpy import dot from numpy.linalg import norm -from datetime import datetime -from associative_memory import AgentMemory,MemoryBasic +from associative_memory import AgentMemory, MemoryBasic from utils.utils import embedding_tools -def agent_retrive(agentmemory:AgentMemory, currtime:datetime, memory_forget:float, query:str, n:int= 30, topk:int=4) -> list[MemoryBasic]: + +def agent_retrieve(agent_memory: AgentMemory, curr_time: datetime.datetime, memory_forget: float, query: str, n: int = 30, topk: int = 4) -> list[MemoryBasic]: """ - retrive需要集合Role使用,原因在于Role才具有AgentMemory,scratch - 逻辑:Role调用该函数,self._rc.AgentMemory,self._rc.scratch.currtime,self._rc.scratch.memory_forget + Retrieve需要集合Role使用,原因在于Role才具有AgentMemory,scratch + 逻辑:Role调用该函数,self._rc.AgentMemory,self._rc.scratch.curr_time,self._rc.scratch.memory_forget 输入希望查询的内容与希望回顾的条数,返回TopK条高分记忆,即List[MemoryBasic] Score_lists示例 { - "memory":memories[i], MemoryBasic类 - "importance":memories[i].poignancy - "recency":衰减因子计算结果 - "relevance":搜索结果 + "memory": memories[i], MemoryBasic类 + "importance": memories[i].poignancy + "recency": 衰减因子计算结果 + "relevance": 搜索结果 } """ - memories = agentmemory.storage - sorted_memories = sorted(memories, key=lambda memory_node: memory_node.last_accessed_time,reverse=True) + memories = agent_memory.storage + sorted_memories = sorted(memories, key=lambda memory_node: memory_node.last_accessed_time, reverse=True) memories = sorted_memories[:n] if len(sorted_memories) >= n else sorted_memories - Score_list = [] - Score_list = extract_importance(memories, Score_list) - Score_list = extract_recency(currtime, memory_forget, Score_list) - Score_list = extract_relevance(query, Score_list) - Score_list = normalize_Socre_floats(Score_list, 0, 1) + score_list = [] + score_list = extract_importance(memories, score_list) + score_list = extract_recency(curr_time, memory_forget, score_list) + score_list = extract_relevance(query, score_list) + score_list = normalize_score_floats(score_list, 0, 1) + + total_dict = {} + gw = [1, 1, 1] # 三个因素的权重,重要性,近因性,相关性 + for i in range(len(score_list)): + total_score = (score_list[i]['importance'] * gw[0] + + score_list[i]['recency'] * gw[1] + + score_list[i]['relevance'] * gw[2] + ) + total_dict[score_list[i]['memory']] = total_score - total_dict = {} - gw = [1,1,1] # 三个因素的权重,重要性,近因性,相关性 - for i in range(len(Score_list)): - total_score = (Score_list[i]['importance']*gw[0] + - Score_list[i]['recency']*gw[1] + - Score_list[i]['relevance']*gw[2] - ) - total_dict[Score_list[i]['memory']] = total_score - result = top_highest_x_values(total_dict, topk) return result + def top_highest_x_values(d, x): """ 输入字典,Topx 返回以字典值排序,字典键组成的List[MemoryBasic] """ - top_v = [item[0] for item in sorted(d.items(),key=lambda item: item[1],reverse= True)[:x]] + top_v = [item[0] for item in sorted(d.items(), key=lambda item: item[1], reverse=True)[:x]] return top_v -def extract_importance(memories, Score_list): +def extract_importance(memories, score_list): """ 抽取重要性 """ for i in range(len(memories)): - Score = {"memory":memories[i], - "importance":memories[i].poignancy + score = {"memory": memories[i], + "importance": memories[i].poignancy } - Score_list.append(Score) - return Score_list + score_list.append(score) + return score_list -# 抽取相关性 -def extract_relevance(query, Score_list): + +def extract_relevance(query, score_list): """ 抽取相关性 """ query_embedding = embedding_tools(query) # 进行 - for i in range(len(Score_list)): - result = cos_sim(Score_list[i]["memory"].embedding_key, query_embedding) - Score_list[i]['relevance'] = result + for i in range(len(score_list)): + result = cos_sim(score_list[i]["memory"].embedding_key, query_embedding) + score_list[i]['relevance'] = result - return Score_list + return score_list -# 抽取近因性 -def extract_recency(currtime, memory_forget, Score_list): + +def extract_recency(curr_time, memory_forget, score_list): """ 抽取近因性,目前使用的现实世界过一天走一个衰减因子 """ - for i in range(len(Score_list)): - day_count = (currtime-Score_list[i]['memory'].created).days - Score_list[i]['recency'] = memory_forget**day_count - return Score_list + for i in range(len(score_list)): + day_count = (curr_time - score_list[i]['memory'].created).days + score_list[i]['recency'] = memory_forget**day_count + return score_list -def cos_sim(a, b): - """ - 计算余弦相似度 - """ - return dot(a, b)/(norm(a)*norm(b)) -def normalize_List_floats(Single_list, target_min, target_max): +def cos_sim(a, b): + """ + 计算余弦相似度 + """ + return dot(a, b) / (norm(a) * norm(b)) + + +def normalize_list_floats(single_list, target_min, target_max): """ 单个列表归一化 """ - min_val = min(Single_list) - max_val = max(Single_list) + min_val = min(single_list) + max_val = max(single_list) range_val = max_val - min_val - if range_val == 0: - for i in range(len(Single_list)): - Single_list[i] = (target_max - target_min)/2 - else: - for i in range(len(Single_list)): - Single_list[i] = ((Single_list[i] - min_val) * (target_max - target_min) - / range_val + target_min) - return Single_list + if range_val == 0: + for i in range(len(single_list)): + single_list[i] = (target_max - target_min) / 2 + else: + for i in range(len(single_list)): + single_list[i] = ((single_list[i] - min_val) * (target_max - target_min) + / range_val + target_min) + return single_list -def normalize_socre_floats(Score_list, target_min, target_max): +def normalize_score_floats(score_list, target_min, target_max): """ 整体归一化 """ @@ -120,19 +124,19 @@ def normalize_socre_floats(Score_list, target_min, target_max): relevance_list = [] recency_list = [] - for i in range(len(Score_list)): - importance_list.append(Score_list[i]['importance']) - relevance_list.append(Score_list[i]['relevance']) - recency_list.append(Score_list[i]['recency']) + for i in range(len(score_list)): + importance_list.append(score_list[i]['importance']) + relevance_list.append(score_list[i]['relevance']) + recency_list.append(score_list[i]['recency']) # 进行归一化操作 - importance_list = normalize_List_floats(importance_list,target_min, target_max) - relevance_list = normalize_List_floats(relevance_list,target_min, target_max) - recency_list =normalize_List_floats(recency_list,target_min, target_max) + importance_list = normalize_list_floats(importance_list, target_min, target_max) + relevance_list = normalize_list_floats(relevance_list, target_min, target_max) + recency_list = normalize_list_floats(recency_list, target_min, target_max) - for i in range(len(Score_list)): - Score_list[i]['importance'] = importance_list[i] - Score_list[i]['relevance'] = relevance_list[i] - Score_list[i]['recency'] = recency_list[i] - - return Score_list + for i in range(len(score_list)): + score_list[i]['importance'] = importance_list[i] + score_list[i]['relevance'] = relevance_list[i] + score_list[i]['recency'] = recency_list[i] + + return score_list diff --git a/examples/st_game/prompts/run_gpt_prompts.py b/examples/st_game/prompts/run_gpt_prompts.py index 86db1c7c2..16ccbc29c 100644 --- a/examples/st_game/prompts/run_gpt_prompts.py +++ b/examples/st_game/prompts/run_gpt_prompts.py @@ -1,18 +1,19 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# @Desc : 调用PromptTemplates中模板,实现 +# @Desc : 调用Prompts中模板,实现相关Action -from wrapper_prompt import special_response_generate,prompt_generate +from wrapper_prompt import special_response_generate, prompt_generate from memory.scratch import Scratch from memory.associative_memory import MemoryBasic import json -def get_poignancy_action(scratch:Scratch, content:MemoryBasic.content)->str: + +def get_poignancy_action(scratch: Scratch, content: MemoryBasic.content) -> str: """ 衡量事件心酸度 """ - def create_prompt_input(scratch, content): - prompt_input = [scratch.name, + def create_prompt_input(scratch, content): + prompt_input = [scratch.name, scratch.iss, scratch.name, content] @@ -20,14 +21,13 @@ def get_poignancy_action(scratch:Scratch, content:MemoryBasic.content)->str: # 1. Prompt构建 # 2. Instruction给出 - prompt_template = "poignancy_chat_v1.txt" ######## - prompt_input = create_prompt_input(scratch, content) ######## + prompt_template = "poignancy_chat_v1.txt" # 保留原来的注释 + prompt_input = create_prompt_input(scratch, content) # 保留原来的注释 prompt = prompt_generate(prompt_input, prompt_template) special_instruction = "The output should ONLY contain ONE integer value on the scale of 1 to 10." poignancy = special_response_generate(prompt, special_instruction) try: poi_dict = json.loads(poignancy) - return (poi_dict['poignancy']) - except: + return str(poi_dict['poignancy']) # 将返回值强制转换为字符串 + except json.JSONDecodeError as e: return poignancy - diff --git a/examples/st_game/prompts/wrapper_prompt.py b/examples/st_game/prompts/wrapper_prompt.py index b61e13520..0950f99d1 100644 --- a/examples/st_game/prompts/wrapper_prompt.py +++ b/examples/st_game/prompts/wrapper_prompt.py @@ -1,42 +1,50 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# @Desc : 基于Prmopt Templates 填充Prompt; 为Prompt包装与调用 +# @Desc : 基于Prompt Templates 填充Prompt; 为Prompt包装与调用 from metagpt import llm -def prompt_generate(curr_input:list, prompt_path:str): - """ - curr_input:输入一个按照PromptTemplate的要求的列表 - prompt_path:输入一个Promptpath - """ - if type(curr_input) == type("string"): - curr_input = [curr_input] - curr_input = [str(i) for i in curr_input] - f = open(prompt_path, "r") - prompt = f.read() - f.close() - for count, i in enumerate(curr_input): +def prompt_generate(curr_input: list, prompt_path: str): + """ + curr_input: 输入一个按照Prompt Template的要求的列表 + prompt_path: 输入一个Prompt path + """ + # 如果输入是字符串,将其转换为列表 + if isinstance(curr_input, str): + curr_input = [curr_input] + + # 将输入列表中的每个元素转换为字符串 + curr_input = [str(i) for i in curr_input] + + with open(prompt_path, "r") as f: + prompt = f.read() + + for count, i in enumerate(curr_input): prompt = prompt.replace(f"!!", i) - if "###" in prompt: + + if "###" in prompt: prompt = prompt.split("###")[1] + return prompt.strip() -def response_generate(prompt:str): + +def response_generate(prompt: str): """ - 待完善,我没有找到MG中可以设置Temprature以及Maxtoken的位置 + 待完善,我没有找到MG中可以设置Temperature以及Maxtoken的位置 """ return llm.ai_func(prompt) -def special_response_generate(prompt:str,special_instruction:str,example_output:str = None): + +def special_response_generate(prompt: str, special_instruction: str, example_output: str = None): """ 当对于Prompt生成有特殊要求时,调用该函数增加special_instruction或example_output """ prompt = '"""\n' + prompt + '\n"""\n' - prompt += f"Output the response to the prompt above in json. {special_instruction}\n" - if example_output: - prompt += "Example output json:\n" + prompt += f"Output the response to the prompt above in JSON. {special_instruction}\n" + + if example_output: + prompt += "Example output JSON:\n" prompt += '{"output": "' + str(example_output) + '"}' + return response_generate(prompt) - -