mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-06-17 15:35:21 +02:00
已完成修改
This commit is contained in:
parent
228b4c2604
commit
10e207f5c2
6 changed files with 281 additions and 258 deletions
|
|
@ -1,41 +1,42 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Desc : integration reflect action
|
||||
# @Desc : Integration Reflect Action
|
||||
|
||||
import re
|
||||
from ..roles.st_role import STRole
|
||||
from ..actions.st_action import STAction
|
||||
from ..memory.agent_memory import BasicMemory
|
||||
from metagpt.logs import logger
|
||||
|
||||
# run_gpt_prompt_focal_pt方法
|
||||
# Run GPT Prompt Focal Point method
|
||||
class AgentFocusPt(STAction):
|
||||
|
||||
def __init__(self, name="AgentFocusPt", context: list[BasicMemory] = None, llm=None):
|
||||
super().__init__(name, context, llm)
|
||||
|
||||
def _func_validate(self, llm_resp: str, prompt: str) -> bool:
|
||||
try:
|
||||
try:
|
||||
self._func_cleanup(llm_resp, prompt)
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
return False
|
||||
|
||||
def _func_cleanup(self, llm_resp: str, prompt: str = "") -> str:
|
||||
llm_resp = "1) " + llm_resp.strip()
|
||||
ret = []
|
||||
for i in llm_resp.split("\n"):
|
||||
for i in llm_resp.split("\n"):
|
||||
ret += [i.split(") ")[-1]]
|
||||
return ret
|
||||
|
||||
def _func_fail_default_resp(self) -> str:
|
||||
pass
|
||||
|
||||
async def run(self, role: STRole, statements: str, n: int, test_input = None) -> str:
|
||||
def create_prompt_input(role: STRole, statements, n, test_input=None):
|
||||
async def run(self, role: STRole, statements: str, n: int, test_input=None) -> str:
|
||||
def create_prompt_input(role: STRole, statements, n, test_input=None):
|
||||
prompt_input = [statements, str(n)]
|
||||
return prompt_input
|
||||
|
||||
prompt_input = create_prompt_input(role, statements,n)
|
||||
|
||||
prompt_input = create_prompt_input(role, statements, n)
|
||||
prompt = self.generate_prompt_with_tmpl_filename(prompt_input,
|
||||
"generate_focal_pt_v1.txt")
|
||||
|
||||
|
|
@ -44,63 +45,65 @@ class AgentFocusPt(STAction):
|
|||
output = await self._run_v2(prompt,
|
||||
example_output,
|
||||
special_instruction)
|
||||
logger.info(f"Run action: {self.__class__.__name__} with result: {output}")
|
||||
return output
|
||||
|
||||
return output[0]
|
||||
|
||||
|
||||
# run_gpt_prompt_insight_and_guidance
|
||||
# Run GPT Prompt Insight and Guidance
|
||||
class AgentInsightAndGuidance(STAction):
|
||||
|
||||
def __init__(self, name="AgentInsightAndGuidance", context: list[BasicMemory] = None, llm=None):
|
||||
super().__init__(name, context, llm)
|
||||
|
||||
def _func_validate(self, llm_resp: str, prompt: str) -> bool:
|
||||
try:
|
||||
try:
|
||||
self._func_cleanup(llm_resp, prompt)
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
return False
|
||||
|
||||
def _func_cleanup(self, llm_resp: str, prompt: str = "") -> str:
|
||||
llm_resp = "1. " + llm_resp.strip()
|
||||
ret = dict()
|
||||
for i in llm_resp.split("\n"):
|
||||
for i in llm_resp.split("\n"):
|
||||
row = i.split(". ")[-1]
|
||||
thought = row.split("(because of ")[0].strip()
|
||||
evi_raw = row.split("(because of ")[1].split(")")[0].strip()
|
||||
evi_raw = re.findall(r'\d+', evi_raw)
|
||||
evi_raw = [int(i.strip()) for i in evi_raw]
|
||||
ret[thought] = evi_raw
|
||||
return ret
|
||||
return ret
|
||||
|
||||
def _func_fail_default_resp(self) -> str:
|
||||
pass
|
||||
|
||||
async def run(self, role: STRole, statements: str, n: int, test_input = None) -> str:
|
||||
def create_prompt_input(role: STRole, statements, n, test_input=None):
|
||||
async def run(self, role: STRole, statements: str, n: int, test_input=None) -> str:
|
||||
def create_prompt_input(role: STRole, statements, n, test_input=None):
|
||||
prompt_input = [statements, str(n)]
|
||||
return prompt_input
|
||||
|
||||
prompt_input = create_prompt_input(role, statements,n)
|
||||
|
||||
prompt_input = create_prompt_input(role, statements, n)
|
||||
prompt = self.generate_prompt_with_tmpl_filename(prompt_input,
|
||||
"insight_and_evidence_v1.txt")
|
||||
|
||||
output = await self._run_v1(prompt)
|
||||
logger.info(f"Run action: {self.__class__.__name__} with result: {output}")
|
||||
return output
|
||||
|
||||
return output[0]
|
||||
|
||||
# run_gpt_prompt_event_triple
|
||||
# Run GPT Prompt Event Triple
|
||||
class AgentEventTriple(STAction):
|
||||
def __init__(self, name="AgentEventTriple", context: list[BasicMemory] = None, llm=None):
|
||||
super().__init__(name, context, llm)
|
||||
|
||||
def _func_validate(self, llm_resp: str, prompt: str) -> bool:
|
||||
try:
|
||||
try:
|
||||
llm_resp = self._func_cleanup(llm_resp, prompt="")
|
||||
if len(llm_resp) != 2:
|
||||
if len(llm_resp) != 2:
|
||||
return False
|
||||
except: return False
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _func_cleanup(self, llm_resp: str, prompt: str = "") -> str:
|
||||
cr = llm_resp.strip()
|
||||
|
|
@ -109,35 +112,37 @@ class AgentEventTriple(STAction):
|
|||
|
||||
def _func_fail_default_resp(self) -> str:
|
||||
pass
|
||||
|
||||
async def run(self, statements: str, role: STRole, verbose = False) -> str:
|
||||
def create_prompt_input(statements, role):
|
||||
if "(" in statements:
|
||||
|
||||
async def run(self, statements: str, role: STRole, verbose=False) -> str:
|
||||
def create_prompt_input(statements, role):
|
||||
if "(" in statements:
|
||||
statements = statements.split("(")[-1].split(")")[0]
|
||||
prompt_input = [role._rc.scratch.name,
|
||||
statements,
|
||||
role._rc.scratch.name]
|
||||
prompt_input = [role._rc.scratch.name,
|
||||
statements,
|
||||
role._rc.scratch.name]
|
||||
return prompt_input
|
||||
|
||||
|
||||
prompt_input = create_prompt_input(statements, role)
|
||||
prompt = self.generate_prompt_with_tmpl_filename(prompt_input,
|
||||
"generate_event_triple_v1.txt")
|
||||
|
||||
output = await self._run_v1(prompt)
|
||||
logger.info(f"Run action: {self.__class__.__name__} with result: {output}")
|
||||
|
||||
return output[0]
|
||||
|
||||
# run_gpt_prompt_event_poignancy
|
||||
|
||||
# Run GPT Prompt Event Poignancy
|
||||
class AgentEventPoignancy(STAction):
|
||||
def __init__(self, name="AgentEventPoignancy", context: list[BasicMemory] = None, llm=None):
|
||||
super().__init__(name, context, llm)
|
||||
|
||||
def _func_validate(self, llm_resp: str, prompt: str) -> bool:
|
||||
try:
|
||||
try:
|
||||
self._func_cleanup(llm_resp, prompt)
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
return False
|
||||
|
||||
def _func_cleanup(self, llm_resp: str, prompt: str = "") -> str:
|
||||
llm_resp = int(llm_resp.strip())
|
||||
|
|
@ -146,37 +151,39 @@ class AgentEventPoignancy(STAction):
|
|||
def _func_fail_default_resp(self) -> str:
|
||||
pass
|
||||
|
||||
async def run(self, role: STRole, statements: str, test_input = None, verbose = False) -> str:
|
||||
def create_prompt_input(role: STRole, statements: str, test_input=None):
|
||||
async def run(self, role: STRole, statements: str, test_input=None, verbose=False) -> str:
|
||||
def create_prompt_input(role: STRole, statements: str, test_input=None):
|
||||
prompt_input = [role._rc.scratch.name,
|
||||
role._rc.scratch.get_str_iss(),
|
||||
role._rc.scratch.name,
|
||||
statements]
|
||||
return prompt_input
|
||||
|
||||
|
||||
prompt_input = create_prompt_input(role, statements)
|
||||
prompt = self.generate_prompt_with_tmpl_filename(prompt_input,
|
||||
"poignancy_event_v1.txt")
|
||||
|
||||
example_output = "5" ########
|
||||
example_output = "5" # ########
|
||||
special_instruction = "The output should ONLY contain ONE integer value on the scale of 1 to 10."
|
||||
output = await self._run_v2(prompt,
|
||||
example_output,
|
||||
special_instruction)
|
||||
logger.info(f"Run action: {self.__class__.__name__} with result: {output}")
|
||||
|
||||
return output[0]
|
||||
return output
|
||||
|
||||
# run_gpt_prompt_chat_poignancy
|
||||
|
||||
# Run GPT Prompt Chat Poignancy
|
||||
class AgentChatPoignancy(STAction):
|
||||
def __init__(self, name="AgentChatPoignancy", context: list[BasicMemory] = None, llm=None):
|
||||
super().__init__(name, context, llm)
|
||||
|
||||
def _func_validate(self, llm_resp: str, prompt: str) -> bool:
|
||||
try:
|
||||
try:
|
||||
self._func_cleanup(llm_resp, prompt)
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
return False
|
||||
|
||||
def _func_cleanup(self, llm_resp: str, prompt: str = "") -> str:
|
||||
llm_resp = int(llm_resp.strip())
|
||||
|
|
@ -185,37 +192,39 @@ class AgentChatPoignancy(STAction):
|
|||
def _func_fail_default_resp(self) -> str:
|
||||
pass
|
||||
|
||||
async def run(self, role: STRole, statements: str, test_input = None, verbose = False) -> str:
|
||||
def create_prompt_input(role: STRole, statements, test_input=None):
|
||||
async def run(self, role: STRole, statements: str, test_input=None, verbose=False) -> str:
|
||||
def create_prompt_input(role: STRole, statements, test_input=None):
|
||||
prompt_input = [role._rc.scratch.name,
|
||||
role._rc.scratch.get_str_iss(),
|
||||
role._rc.scratch.name,
|
||||
statements]
|
||||
return prompt_input
|
||||
|
||||
|
||||
prompt_input = create_prompt_input(role, statements)
|
||||
prompt = self.generate_prompt_with_tmpl_filename(prompt_input,
|
||||
"poignancy_chat_v1.txt")
|
||||
|
||||
example_output = "5" ########
|
||||
example_output = "5" # ########
|
||||
special_instruction = "The output should ONLY contain ONE integer value on the scale of 1 to 10."
|
||||
output = await self._run_v2(prompt,
|
||||
example_output,
|
||||
special_instruction)
|
||||
logger.info(f"Run action: {self.__class__.__name__} with result: {output}")
|
||||
|
||||
return output[0]
|
||||
return output
|
||||
|
||||
# run_gpt_prompt_planning_thought_on_convo
|
||||
|
||||
# Run GPT Prompt Planning Thought on Convo
|
||||
class AgentPlanThoughtOnConvo(STAction):
|
||||
def __init__(self, name="AgentPlanThoughtOnConvo", context: list[BasicMemory] = None, llm=None):
|
||||
super().__init__(name, context, llm)
|
||||
|
||||
def _func_validate(self, llm_resp: str, prompt: str) -> bool:
|
||||
try:
|
||||
try:
|
||||
self._func_cleanup(llm_resp, prompt)
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
return False
|
||||
|
||||
def _func_cleanup(self, llm_resp: str, prompt: str = "") -> str:
|
||||
return llm_resp.split('"')[0].strip()
|
||||
|
|
@ -223,33 +232,35 @@ class AgentPlanThoughtOnConvo(STAction):
|
|||
def _func_fail_default_resp(self) -> str:
|
||||
pass
|
||||
|
||||
async def run(self, role: STRole, statements: str, test_input = None, verbose = False) -> str:
|
||||
def create_prompt_input(role, statements, test_input=None):
|
||||
prompt_input = [statements,
|
||||
role._rc.scratch.name,
|
||||
role._rc.scratch.name,
|
||||
async def run(self, role: STRole, statements: str, test_input=None, verbose=False) -> str:
|
||||
def create_prompt_input(role, statements, test_input=None):
|
||||
prompt_input = [statements,
|
||||
role._rc.scratch.name,
|
||||
role._rc.scratch.name,
|
||||
role._rc.scratch.name]
|
||||
return prompt_input
|
||||
|
||||
|
||||
prompt_input = create_prompt_input(role, statements)
|
||||
prompt = self.generate_prompt_with_tmpl_filename(prompt_input,
|
||||
"planning_thought_on_convo_v1.txt")
|
||||
|
||||
output = await self._run_v1(prompt)
|
||||
logger.info(f"Run action: {self.__class__.__name__} with result: {output}")
|
||||
|
||||
return output[0]
|
||||
return output
|
||||
|
||||
# run_gpt_prompt_memo_on_convo
|
||||
|
||||
# Run GPT Prompt Memory on Convo
|
||||
class AgentMemoryOnConvo(STAction):
|
||||
def __init__(self, name="AgentMemoryOnConvo", context: list[BasicMemory] = None, llm=None):
|
||||
super().__init__(name, context, llm)
|
||||
|
||||
def _func_validate(self, llm_resp: str, prompt: str) -> bool:
|
||||
try:
|
||||
try:
|
||||
self._func_cleanup(llm_resp, prompt)
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
return False
|
||||
|
||||
def _func_cleanup(self, llm_resp: str, prompt: str = "") -> str:
|
||||
return llm_resp.split('"')[0].strip()
|
||||
|
|
@ -257,21 +268,22 @@ class AgentMemoryOnConvo(STAction):
|
|||
def _func_fail_default_resp(self) -> str:
|
||||
pass
|
||||
|
||||
async def run(self, role: STRole, statements: str, test_input = None, verbose = False) -> str:
|
||||
def create_prompt_input(role, statements, test_input=None):
|
||||
prompt_input = [statements,
|
||||
role._rc.scratch.name,
|
||||
role._rc.scratch.name,
|
||||
async def run(self, role: STRole, statements: str, test_input=None, verbose=False) -> str:
|
||||
def create_prompt_input(role, statements, test_input=None):
|
||||
prompt_input = [statements,
|
||||
role._rc.scratch.name,
|
||||
role._rc.scratch.name,
|
||||
role._rc.scratch.name]
|
||||
return prompt_input
|
||||
|
||||
|
||||
prompt_input = create_prompt_input(role, statements)
|
||||
prompt = self.generate_prompt_with_tmpl_filename(prompt_input,
|
||||
"memo_on_convo_v1.txt")
|
||||
example_output = 'Jane Doe was interesting to talk to.'
|
||||
example_output = 'Jane Doe was interesting to talk to.'
|
||||
special_instruction = 'The output should ONLY contain a string that summarizes anything interesting that the agent may have noticed'
|
||||
output = await self._run_v2(prompt,
|
||||
example_output,
|
||||
special_instruction)
|
||||
logger.info(f"Run action: {self.__class__.__name__} with result: {output}")
|
||||
|
||||
return output[0]
|
||||
return output
|
||||
|
|
|
|||
|
|
@ -7,12 +7,14 @@ Description: Compresses a simulation for replay demos.
|
|||
|
||||
import shutil
|
||||
import json
|
||||
from utils.tools import find_filenames, create_folder_if_not_there
|
||||
from utils.utils import find_filenames, create_folder_if_not_there
|
||||
from utils.const import STORAGE_PATH
|
||||
|
||||
|
||||
def compress(sim_code):
|
||||
# 构建文件路径
|
||||
sim_storage = f"../environment/frontend_server/storage/{sim_code}"
|
||||
compressed_storage = f"../environment/frontend_server/compressed_storage/{sim_code}"
|
||||
sim_storage = str(STORAGE_PATH.joinpath(f"{sim_code}"))
|
||||
compressed_storage = str(STORAGE_PATH.joinpath(f"compressed_storage/{sim_code}"))
|
||||
persona_folder = sim_storage + "/personas"
|
||||
move_folder = sim_storage + "/movement"
|
||||
meta_file = sim_storage + "/reverie/meta.json"
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
import datetime
|
||||
import json
|
||||
|
||||
from ..utils.tools import check_if_file_exists
|
||||
from ..utils.utils import check_if_file_exists
|
||||
|
||||
|
||||
class Scratch:
|
||||
|
|
|
|||
|
|
@ -1,217 +1,226 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Desc : reflect function
|
||||
import sys
|
||||
|
||||
# @Desc : Reflect function
|
||||
import datetime
|
||||
import random
|
||||
import sys
|
||||
|
||||
from numpy import dot
|
||||
from numpy.linalg import norm
|
||||
from ..roles.st_role import STRole
|
||||
from ..utils.utils import get_embedding
|
||||
from ..actions.run_reflect_action import AgentFocusPt,AgentInsightAndGuidance,AgentEventTriple,AgentEventPoignancy,AgentChatPoignancy,AgentPlanThoughtOnConvo,AgentMemoryOnConvo
|
||||
from metagpt.logs import logger
|
||||
from ..actions.run_reflect_action import (
|
||||
AgentFocusPt, AgentInsightAndGuidance, AgentEventTriple,
|
||||
AgentEventPoignancy, AgentChatPoignancy, AgentPlanThoughtOnConvo,
|
||||
AgentMemoryOnConvo
|
||||
)
|
||||
|
||||
def generate_focal_points(strole:STRole, n=3):
|
||||
|
||||
nodes = [[i.last_accessed, i]
|
||||
for i in strole._rc.memory.event_list + strole._rc.memory.thought_list
|
||||
if "idle" not in i.embedding_key]
|
||||
|
||||
def generate_focal_points(role: STRole, n=3):
|
||||
nodes = [
|
||||
[i.last_accessed, i] for i in
|
||||
role._rc.memory.event_list + role._rc.memory.thought_list
|
||||
if "idle" not in i.embedding_key
|
||||
]
|
||||
nodes = sorted(nodes, key=lambda x: x[0])
|
||||
nodes = [i for created, i in nodes]
|
||||
nodes = [i for _, i in nodes]
|
||||
|
||||
statements = ""
|
||||
for node in nodes[-1 * strole._rc.scratch.importance_ele_n:]:
|
||||
for node in nodes[-1 * role._rc.scratch.importance_ele_n:]:
|
||||
statements += node.embedding_key + "\n"
|
||||
run_focal_pt = AgentFocusPt()
|
||||
# Question 1
|
||||
return run_focal_pt.run(strole, statements, n)
|
||||
return run_focal_pt.run(role, statements, n)
|
||||
|
||||
|
||||
def generate_insights_and_evidence(strole, nodes, n=5):
|
||||
|
||||
def generate_insights_and_evidence(role, nodes, n=5):
|
||||
statements = ""
|
||||
for count, node in enumerate(nodes):
|
||||
for count, node in enumerate(nodes):
|
||||
statements += f'{str(count)}. {node.embedding_key}\n'
|
||||
run_insight_and_guidance = AgentInsightAndGuidance()
|
||||
ret = run_insight_and_guidance.run(strole, statements, n)
|
||||
ret = run_insight_and_guidance.run(role, statements, n)
|
||||
|
||||
print(ret)
|
||||
try:
|
||||
|
||||
for thought, evi_raw in ret.items():
|
||||
logger.info(ret)
|
||||
try:
|
||||
for thought, evi_raw in ret.items():
|
||||
evidence_node_id = [nodes[i].node_id for i in evi_raw]
|
||||
ret[thought] = evidence_node_id
|
||||
return ret
|
||||
except:
|
||||
return {"this is blank": "node_1"}
|
||||
except:
|
||||
return {"this is blank": "node_1"}
|
||||
|
||||
|
||||
def generate_action_event_triple(act_desp, strole):
|
||||
"""TODO
|
||||
def generate_action_event_triple(act_desp, role):
|
||||
"""TODO
|
||||
|
||||
INPUT:
|
||||
INPUT:
|
||||
act_desp: the description of the action (e.g., "sleeping")
|
||||
strole: The Persona class instance
|
||||
OUTPUT:
|
||||
role: The Persona class instance
|
||||
OUTPUT:
|
||||
a string of emoji that translates action description.
|
||||
EXAMPLE OUTPUT:
|
||||
EXAMPLE OUTPUT:
|
||||
"🧈🍞"
|
||||
"""
|
||||
run_event_triple = AgentEventTriple()
|
||||
return AgentEventTriple(act_desp, strole)
|
||||
return AgentEventTriple(act_desp, role)
|
||||
|
||||
|
||||
def generate_poig_score(strole:STRole, event_type, description):
|
||||
|
||||
if "is idle" in description:
|
||||
def generate_poig_score(role: STRole, event_type, description):
|
||||
if "is idle" in description:
|
||||
return 1
|
||||
|
||||
if event_type == "event" or event_type == "thought":
|
||||
if event_type == "event" or event_type == "thought":
|
||||
run_event_poignancy = AgentEventPoignancy()
|
||||
return run_event_poignancy.run(strole, description)[0]
|
||||
elif event_type == "chat":
|
||||
return run_event_poignancy.run(role, description)[0]
|
||||
elif event_type == "chat":
|
||||
run_chat_poignancy = AgentChatPoignancy()
|
||||
return run_chat_poignancy.run(strole,
|
||||
strole._rc.scratch.act_description)[0]
|
||||
return run_chat_poignancy.run(role,
|
||||
role._rc.scratch.act_description)[0]
|
||||
|
||||
|
||||
def generate_planning_thought_on_convo(strole, all_utt):
|
||||
def generate_planning_thought_on_convo(role, all_utt):
|
||||
run_planning_on_convo = AgentPlanThoughtOnConvo()
|
||||
return run_planning_on_convo.run(strole, all_utt)
|
||||
return run_planning_on_convo.run(role, all_utt)
|
||||
|
||||
|
||||
def generate_memo_on_convo(strole, all_utt):
|
||||
def generate_memo_on_convo(role, all_utt):
|
||||
run_memo_on_convo = AgentMemoryOnConvo()
|
||||
return run_memo_on_convo.run(strole, all_utt)
|
||||
|
||||
return run_memo_on_convo.run(role, all_utt)
|
||||
|
||||
|
||||
# Done
|
||||
def run_reflect(strole:STRole):
|
||||
def run_reflect(role: STRole):
|
||||
"""
|
||||
Run the actual reflection. We generate the focal points, retrieve any
|
||||
relevant nodes, and generate thoughts and insights.
|
||||
Run the actual reflection. We generate the focal points, retrieve any
|
||||
relevant nodes, and generate thoughts and insights.
|
||||
|
||||
INPUT:
|
||||
strole: Current Persona object
|
||||
Output:
|
||||
INPUT:
|
||||
role: Current Persona object
|
||||
Output:
|
||||
None
|
||||
"""
|
||||
# Reflection requires certain focal points. Generate that first.
|
||||
focal_points = generate_focal_points(strole, 3)
|
||||
# Retrieve the relevant Nodes object for each of the focal points.
|
||||
# <retrieved> has keys of focal points, and values of the associated Nodes.
|
||||
retrieved = strole.retrieve(focal_points)
|
||||
# Reflection requires certain focal points. Generate that first.
|
||||
focal_points = generate_focal_points(role, 3)
|
||||
# Retrieve the relevant Nodes object for each of the focal points.
|
||||
# <retrieved> has keys of focal points, and values of the associated Nodes.
|
||||
retrieved = role.retrieve(focal_points)
|
||||
|
||||
# For each of the focal points, generate thoughts and save it in the
|
||||
# agent's memory.
|
||||
for focal_pt, nodes in retrieved.items():
|
||||
# For each of the focal points, generate thoughts and save it in the
|
||||
# agent's memory.
|
||||
for focal_pt, nodes in retrieved.items():
|
||||
xx = [i.embedding_key for i in nodes]
|
||||
for xxx in xx: print(xxx)
|
||||
for xxx in xx: logger.info(xxx)
|
||||
|
||||
thoughts = generate_insights_and_evidence(strole, nodes, 5)
|
||||
thoughts = generate_insights_and_evidence(role, nodes, 5)
|
||||
# 生成的是字典类型
|
||||
for thought, evidence in thoughts.items():
|
||||
created = strole.scratch.curr_time
|
||||
expiration = strole.scratch.curr_time + datetime.timedelta(days=30)
|
||||
s, p, o = generate_action_event_triple(thought, strole)
|
||||
for thought, evidence in thoughts.items():
|
||||
created = role.scratch.curr_time
|
||||
expiration = created + datetime.timedelta(days=30)
|
||||
s, p, o = generate_action_event_triple(thought, role)
|
||||
keywords = set([s, p, o])
|
||||
thought_poignancy = generate_poig_score(strole, "thought", thought)
|
||||
thought_poignancy = generate_poig_score(role, "thought", thought)
|
||||
thought_embedding_pair = (thought, get_embedding(thought))
|
||||
|
||||
strole._rc.memory.add_thought(created, expiration, s, p, o,
|
||||
thought, keywords, thought_poignancy,
|
||||
thought_embedding_pair, evidence)
|
||||
role._rc.memory.add_thought(
|
||||
created, expiration, s, p, o, thought, keywords,
|
||||
thought_poignancy, thought_embedding_pair, evidence
|
||||
)
|
||||
|
||||
|
||||
# Done
|
||||
def reflection_trigger(strole: STRole):
|
||||
def reflection_trigger(role: STRole):
|
||||
"""
|
||||
Given the current strole, determine whether the strole should run a
|
||||
reflection.
|
||||
Given the current role, determine whether the role should run a
|
||||
reflection.
|
||||
|
||||
Our current implementation checks for whether the sum of the new importance
|
||||
measure has reached the set (hyper-parameter) threshold.
|
||||
|
||||
INPUT:
|
||||
strole: Current Persona object
|
||||
Output:
|
||||
True if we are running a new reflection.
|
||||
False otherwise.
|
||||
INPUT:
|
||||
role: Current Persona object
|
||||
Output:
|
||||
True if we are running a new reflection.
|
||||
False otherwise.
|
||||
"""
|
||||
print(strole._rc.scratch.name, "strole.scratch.importance_trigger_curr::", strole._rc.scratch.importance_trigger_curr)
|
||||
print(strole._rc.scratch.importance_trigger_max)
|
||||
logger.info(
|
||||
role._rc.scratch.name, "role.scratch.importance_trigger_curr::",
|
||||
role._rc.scratch.importance_trigger_curr
|
||||
)
|
||||
logger.info(role._rc.scratch.importance_trigger_max)
|
||||
|
||||
if (strole._rc.scratch.importance_trigger_curr <= 0 and
|
||||
[] != strole._rc.memory.seq_event + strole._rc.memory.seq_thought):
|
||||
return True
|
||||
if (role._rc.scratch.importance_trigger_curr <= 0 and
|
||||
[] != role._rc.memory.seq_event + role._rc.memory.seq_thought):
|
||||
return True
|
||||
return False
|
||||
|
||||
# Done
|
||||
def reset_reflection_counter(strole: STRole):
|
||||
"""
|
||||
We reset the counters used for the reflection trigger.
|
||||
|
||||
INPUT:
|
||||
strole: Current Persona object
|
||||
Output:
|
||||
# Done
|
||||
def reset_reflection_counter(role: STRole):
|
||||
"""
|
||||
We reset the counters used for the reflection trigger.
|
||||
|
||||
INPUT:
|
||||
role: Current Persona object
|
||||
Output:
|
||||
None
|
||||
"""
|
||||
strole_imt_max = strole._rc.scratch.importance_trigger_max
|
||||
strole._rc.scratch.importance_trigger_curr = strole_imt_max
|
||||
strole._rc.scratch.importance_ele_n = 0
|
||||
role_imt_max = role._rc.scratch.importance_trigger_max
|
||||
role._rc.scratch.importance_trigger_curr = role_imt_max
|
||||
role._rc.scratch.importance_ele_n = 0
|
||||
|
||||
|
||||
# Question 1 chat函数
|
||||
def reflect(strole: STRole):
|
||||
def reflect(role: STRole):
|
||||
"""
|
||||
The main reflection module for the strole. We first check if the trigger
|
||||
conditions are met, and if so, run the reflection and reset any of the
|
||||
relevant counters.
|
||||
The main reflection module for the role. We first check if the trigger
|
||||
conditions are met, and if so, run the reflection and reset any of the
|
||||
relevant counters.
|
||||
|
||||
INPUT:
|
||||
strole: Current Persona object
|
||||
Output:
|
||||
INPUT:
|
||||
role: Current Persona object
|
||||
Output:
|
||||
None
|
||||
"""
|
||||
if reflection_trigger(strole):
|
||||
run_reflect(strole)
|
||||
reset_reflection_counter(strole)
|
||||
if reflection_trigger(role):
|
||||
run_reflect(role)
|
||||
reset_reflection_counter(role)
|
||||
|
||||
if strole._rc.scratch.chatting_end_time:
|
||||
if strole._rc.scratch.curr_time + datetime.timedelta(0,10) == strole._rc.scratch.chatting_end_time:
|
||||
if role._rc.scratch.chatting_end_time:
|
||||
if role._rc.scratch.curr_time + datetime.timedelta(0, 10) == role._rc.scratch.chatting_end_time:
|
||||
all_utt = ""
|
||||
if strole._rc.scratch.chat:
|
||||
for row in strole._rc.scratch.chat:
|
||||
if role._rc.scratch.chat:
|
||||
for row in role._rc.scratch.chat:
|
||||
all_utt += f"{row[0]}: {row[1]}\n"
|
||||
|
||||
# Question memory添加对话函数
|
||||
evidence = [strole._rc.memory.get_last_chat(strole._rc.scratch.chatting_with).memory_id]
|
||||
evidence = [role._rc.memory.get_last_chat(role._rc.scratch.chatting_with).memory_id]
|
||||
|
||||
planning_thought = generate_planning_thought_on_convo(strole, all_utt)
|
||||
planning_thought = f"For {strole._rc.scratch.name}'s planning: {planning_thought}"
|
||||
planning_thought = generate_planning_thought_on_convo(role, all_utt)
|
||||
planning_thought = f"For {role._rc.scratch.name}'s planning: {planning_thought}"
|
||||
|
||||
created = strole._rc.scratch.curr_time
|
||||
expiration = strole._rc.scratch.curr_time + datetime.timedelta(days=30)
|
||||
s, p, o = generate_action_event_triple(planning_thought, strole)
|
||||
created = role._rc.scratch.curr_time
|
||||
expiration = created + datetime.timedelta(days=30)
|
||||
s, p, o = generate_action_event_triple(planning_thought, role)
|
||||
keywords = set([s, p, o])
|
||||
thought_poignancy = generate_poig_score(strole, "thought", planning_thought)
|
||||
thought_poignancy = generate_poig_score(role, "thought", planning_thought)
|
||||
thought_embedding_pair = (planning_thought, get_embedding(planning_thought))
|
||||
|
||||
strole._rc.memory.add_thought(created, expiration, s, p, o,
|
||||
planning_thought, keywords, thought_poignancy,
|
||||
thought_embedding_pair, evidence)
|
||||
role._rc.memory.add_thought(
|
||||
created, expiration, s, p, o, planning_thought, keywords,
|
||||
thought_poignancy, thought_embedding_pair, evidence
|
||||
)
|
||||
|
||||
memo_thought = generate_memo_on_convo(strole, all_utt)
|
||||
memo_thought = f"{strole._rc.scratch.name} {memo_thought}"
|
||||
memo_thought = generate_memo_on_convo(role, all_utt)
|
||||
memo_thought = f"{role._rc.scratch.name} {memo_thought}"
|
||||
|
||||
created = strole._rc.scratch.curr_time
|
||||
expiration = strole._rc.scratch.curr_time + datetime.timedelta(days=30)
|
||||
s, p, o = generate_action_event_triple(memo_thought, strole)
|
||||
created = role._rc.scratch.curr_time
|
||||
expiration = created + datetime.timedelta(days=30)
|
||||
s, p, o = generate_action_event_triple(memo_thought, role)
|
||||
keywords = set([s, p, o])
|
||||
thought_poignancy = generate_poig_score(strole, "thought", memo_thought)
|
||||
thought_poignancy = generate_poig_score(role, "thought", memo_thought)
|
||||
thought_embedding_pair = (memo_thought, get_embedding(memo_thought))
|
||||
|
||||
strole._rc.memory.add_thought(created, expiration, s, p, o,
|
||||
memo_thought, keywords, thought_poignancy,
|
||||
thought_embedding_pair, evidence)
|
||||
role._rc.memory.add_thought(
|
||||
created, expiration, s, p, o, memo_thought, keywords,
|
||||
thought_poignancy, thought_embedding_pair, evidence
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,60 +0,0 @@
|
|||
import os
|
||||
|
||||
def check_if_file_exists(curr_file):
|
||||
"""
|
||||
Checks if a file exists
|
||||
ARGS:
|
||||
curr_file: path to the current csv file.
|
||||
RETURNS:
|
||||
True if the file exists
|
||||
False if the file does not exist
|
||||
"""
|
||||
try:
|
||||
with open(curr_file) as f_analysis_file:
|
||||
pass
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
def create_folder_if_not_there(curr_path):
|
||||
"""
|
||||
Checks if a folder in the curr_path exists. If it does not exist, creates
|
||||
the folder.
|
||||
Note that if the curr_path designates a file location, it will operate on
|
||||
the folder that contains the file. But the function also works even if the
|
||||
path designates to just a folder.
|
||||
Args:
|
||||
curr_list: list to write. The list comes in the following form:
|
||||
[['key1', 'val1-1', 'val1-2'...],
|
||||
['key2', 'val2-1', 'val2-2'...],]
|
||||
outfile: name of the csv file to write
|
||||
RETURNS:
|
||||
True: if a new folder is created
|
||||
False: if a new folder is not created
|
||||
"""
|
||||
outfolder_name = curr_path.split("/")
|
||||
if len(outfolder_name) != 1:
|
||||
# This checks if the curr path is a file or a folder.
|
||||
if "." in outfolder_name[-1]:
|
||||
outfolder_name = outfolder_name[:-1]
|
||||
|
||||
outfolder_name = "/".join(outfolder_name)
|
||||
if not os.path.exists(outfolder_name):
|
||||
os.makedirs(outfolder_name)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def find_filenames(path_to_dir, suffix=".csv"):
|
||||
"""
|
||||
Given a directory, find all files that end with the provided suffix and
|
||||
return their paths.
|
||||
ARGS:
|
||||
path_to_dir: Path to the current directory
|
||||
suffix: The target suffix.
|
||||
RETURNS:
|
||||
A list of paths to all files in the directory.
|
||||
"""
|
||||
filenames = os.listdir(path_to_dir)
|
||||
return [path_to_dir + "/" + filename
|
||||
for filename in filenames if filename.endswith(suffix)]
|
||||
|
|
@ -7,6 +7,7 @@ import json
|
|||
import openai
|
||||
from pathlib import Path
|
||||
import csv
|
||||
import os
|
||||
from ..prompts.run_gpt_prompts import get_poignancy_action, get_poignancy_chat
|
||||
|
||||
|
||||
|
|
@ -92,3 +93,62 @@ def extract_first_json_dict(data_str: str) -> Union[None, dict]:
|
|||
except json.JSONDecodeError:
|
||||
# If parsing fails, return None
|
||||
return None
|
||||
|
||||
def check_if_file_exists(curr_file):
|
||||
"""
|
||||
Checks if a file exists
|
||||
ARGS:
|
||||
curr_file: path to the current csv file.
|
||||
RETURNS:
|
||||
True if the file exists
|
||||
False if the file does not exist
|
||||
"""
|
||||
try:
|
||||
with open(curr_file) as f_analysis_file:
|
||||
pass
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
def create_folder_if_not_there(curr_path):
|
||||
"""
|
||||
Checks if a folder in the curr_path exists. If it does not exist, creates
|
||||
the folder.
|
||||
Note that if the curr_path designates a file location, it will operate on
|
||||
the folder that contains the file. But the function also works even if the
|
||||
path designates to just a folder.
|
||||
Args:
|
||||
curr_list: list to write. The list comes in the following form:
|
||||
[['key1', 'val1-1', 'val1-2'...],
|
||||
['key2', 'val2-1', 'val2-2'...],]
|
||||
outfile: name of the csv file to write
|
||||
RETURNS:
|
||||
True: if a new folder is created
|
||||
False: if a new folder is not created
|
||||
"""
|
||||
outfolder_name = curr_path.split("/")
|
||||
if len(outfolder_name) != 1:
|
||||
# This checks if the curr path is a file or a folder.
|
||||
if "." in outfolder_name[-1]:
|
||||
outfolder_name = outfolder_name[:-1]
|
||||
|
||||
outfolder_name = "/".join(outfolder_name)
|
||||
if not os.path.exists(outfolder_name):
|
||||
os.makedirs(outfolder_name)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def find_filenames(path_to_dir, suffix=".csv"):
|
||||
"""
|
||||
Given a directory, find all files that end with the provided suffix and
|
||||
return their paths.
|
||||
ARGS:
|
||||
path_to_dir: Path to the current directory
|
||||
suffix: The target suffix.
|
||||
RETURNS:
|
||||
A list of paths to all files in the directory.
|
||||
"""
|
||||
filenames = os.listdir(path_to_dir)
|
||||
return [path_to_dir + "/" + filename
|
||||
for filename in filenames if filename.endswith(suffix)]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue