Prompt and agent manager, dynamically load configuration from config-svc

- prompt-template takes config from the config-svc, dynamically reloads
  as new config appears.
- agent-react takes config from config-svc, dynamically reloads
- Fixed lack of data in the config queue: it needed to take the Earliest,
  not the Latest, values.
- Changed text-completion and knowledge-query tool to both use 'query'
  as the argument.
- Prompt and agent no longer have command line args to supply
  configuration.
This commit is contained in:
cybermaggedon 2025-04-02 16:37:08 +01:00 committed by GitHub
parent b08c7f03a6
commit 298d09f388
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 141 additions and 246 deletions

View file

@ -85,19 +85,19 @@ class AgentManager:
return a
def react(self, question, history, think, observe):
async def react(self, question, history, think, observe):
act = self.reason(question, history)
logger.info(f"act: {act}")
if isinstance(act, Final):
think(act.thought)
await think(act.thought)
return act
else:
think(act.thought)
await think(act.thought)
if act.name in self.tools:
action = self.tools[act.name]
@ -110,7 +110,7 @@ class AgentManager:
logger.info(f"resp: {resp}")
observe(resp)
await observe(resp)
act.observation = resp

View file

@ -36,92 +36,12 @@ class Processor(ConsumerProducer):
def __init__(self, **params):
additional = params.get("context", None)
self.max_iterations = int(params.get("max_iterations", default_max_iterations))
self.max_iterations = int(
params.get("max_iterations", default_max_iterations)
)
tools = {}
# Parsing the prompt information to the prompt configuration
# structure
tool_type_arg = params.get("tool_type", [])
if tool_type_arg:
for t in tool_type_arg:
toks = t.split("=", 1)
if len(toks) < 2:
raise RuntimeError(
f"Tool-type string not well-formed: {t}"
)
ttoks = toks[1].split(":", 1)
if len(ttoks) < 1:
raise RuntimeError(
f"Tool-type string not well-formed: {t}"
)
if ttoks[0] == "knowledge-query":
impl = KnowledgeQueryImpl(self)
elif ttoks[0] == "text-completion":
impl = TextCompletionImpl(self)
else:
raise RuntimeError(
f"Tool-kind {ttoks[0]} not known"
)
if len(ttoks) == 1:
tools[toks[0]] = Tool(
name = toks[0],
description = "",
implementation = impl,
config = { "input": "query" },
arguments = {},
)
else:
tools[toks[0]] = Tool(
name = toks[0],
description = "",
implementation = impl,
config = { "input": ttoks[1] },
arguments = {},
)
# parsing the prompt information to the prompt configuration
# structure
tool_desc_arg = params.get("tool_description", [])
if tool_desc_arg:
for t in tool_desc_arg:
toks = t.split("=", 1)
if len(toks) < 2:
raise runtimeerror(
f"tool-type string not well-formed: {t}"
)
if toks[0] not in tools:
raise runtimeerror(f"description, tool {toks[0]} not known")
tools[toks[0]].description = toks[1]
# Parsing the prompt information to the prompt configuration
# structure
tool_arg_arg = params.get("tool_argument", [])
if tool_arg_arg:
for t in tool_arg_arg:
toks = t.split("=", 1)
if len(toks) < 2:
raise RuntimeError(
f"Tool-type string not well-formed: {t}"
)
ttoks = toks[1].split(":", 2)
if len(ttoks) != 3:
raise RuntimeError(
f"Tool argument string not well-formed: {t}"
)
if toks[0] not in tools:
raise RuntimeError(f"Description, tool {toks[0]} not known")
tools[toks[0]].arguments[ttoks[0]] = Argument(
name = ttoks[0],
type = ttoks[1],
description = ttoks[2]
)
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
@ -138,6 +58,8 @@ class Processor(ConsumerProducer):
"graph_rag_response_queue", gr_response_queue
)
self.config_key = params.get("config_type", "agent")
super(Processor, self).__init__(
**params | {
"input_queue": input_queue,
@ -176,20 +98,74 @@ class Processor(ConsumerProducer):
self.agent = AgentManager(
context=self,
tools=tools,
additional_context=additional
tools=[],
additional_context="",
)
# Extract a JSON object from LLM output, tolerating ``` / ```json fences.
def parse_json(self, text):
json_match = re.search(r'```(?:json)?(.*?)```', text, re.DOTALL)
if json_match:
json_str = json_match.group(1).strip()
else:
# If no delimiters, assume the entire output is JSON
json_str = text.strip()
# NOTE(review): the diff render interleaves the tail of parse_json with the
# newly added on_config below; the next two lines belong to different methods.
async def on_config(self, version, config):
return json.loads(json_str)
# Rebuild the AgentManager from a freshly pushed config-svc snapshot.
print("Loading configuration version", version)
if self.config_key not in config:
print(f"No key {self.config_key} in config", flush=True)
return
# Narrow to this service's section (key chosen by --config-type).
config = config[self.config_key]
try:
# This is some extra stuff to put in the prompt
additional = config.get("additional-context", None)
# "tool-index" is a JSON list of tool ids; each id has a "tool.<id>" entry.
ix = json.loads(config["tool-index"])
tools = {}
for k in ix:
pc = config[f"tool.{k}"]
data = json.loads(pc)
# Per-tool argument specs become Argument objects keyed by name.
arguments = {
v.get("name"): Argument(
name = v.get("name"),
type = v.get("type"),
description = v.get("description")
)
for v in data["arguments"]
}
# Map the declared tool type onto a concrete implementation.
impl_id = data.get("type")
if impl_id == "knowledge-query":
impl = KnowledgeQueryImpl(self)
elif impl_id == "text-completion":
impl = TextCompletionImpl(self)
else:
raise RuntimeError(
f"Tool-kind {impl_id} not known"
)
tools[data.get("name")] = Tool(
name = data.get("name"),
description = data.get("description"),
implementation = impl,
config=data.get("config", {}),
arguments = arguments,
)
# Swap in a new agent wholesale so a partial parse never half-applies.
self.agent = AgentManager(
context=self,
tools=tools,
additional_context=additional
)
print("Prompt configuration reloaded.", flush=True)
except Exception as e:
# Best-effort reload: keep the previous agent on any failure.
print("Exception:", e, flush=True)
print("Configuration reload failed", flush=True)
async def handle(self, msg):
@ -246,7 +222,7 @@ class Processor(ConsumerProducer):
await self.send(r, properties={"id": id})
act = self.agent.react(v.question, history, think, observe)
act = await self.agent.react(v.question, history, think, observe)
print(f"Action: {act}", flush=True)
@ -337,46 +313,18 @@ class Processor(ConsumerProducer):
help=f'Graph RAG response queue (default: {gr_response_queue})',
)
parser.add_argument(
'--tool-type', nargs='*',
help=f'''Specifies the type of an agent tool. Takes the form
<id>=<specifier>. <id> is the name of the tool. <specifier> is one of
knowledge-query, text-completion. Additional parameters are specified
for different tools which are tool-specific. e.g. knowledge-query:<arg>
which specifies the name of the arg whose content is fed into the knowledge
query as a question. text-completion:<arg> specifies the name of the arg
whose content is fed into the text-completion service as a prompt'''
)
parser.add_argument(
'--tool-description', nargs='*',
help=f'''Specifies the textual description of a tool. Takes
the form <id>=<description>. The description is important, it teaches the
LLM how to use the tool. It should describe what it does and how to
use the arguments. This is specified in natural language.'''
)
parser.add_argument(
'--tool-argument', nargs='*',
help=f'''Specifies argument usage for a tool. Takes
the form <id>=<arg>:<type>:<description>. The description is important,
it is read by the LLM and used to determine how to use the argument.
<id> can be specified multiple times to give a tool multiple arguments.
<type> is one of string, number. <description> is a natural language
description.'''
)
parser.add_argument(
'--context',
help=f'Optional, specifies additional context text for the LLM.'
)
parser.add_argument(
'--max-iterations',
default=default_max_iterations,
help=f'Maximum number of react iterations (default: {default_max_iterations})',
)
parser.add_argument(
'--config-type',
default="agent",
help=f'Configuration key for prompts (default: agent)',
)
def run():
Processor.launch(module, __doc__)

View file

@ -5,7 +5,7 @@ class KnowledgeQueryImpl:
# Tool implementation: forwards a question to the graph-RAG service
# held by the surrounding processor context.
def __init__(self, context):
self.context = context
# NOTE(review): the two return lines below are the old/new sides of the
# diff — the commit renames the tool argument between "query" and
# "question"; only one of these lines exists in the real file.
def invoke(self, **arguments):
return self.context.graph_rag.request(arguments.get("query"))
return self.context.graph_rag.request(arguments.get("question"))
# This tool implementation knows how to do text completion. This uses
# the prompt service, rather than talking to TextCompletion directly.

View file

@ -28,82 +28,6 @@ class Processor(ConsumerProducer):
def __init__(self, **params):
prompt_base = {}
# Parsing the prompt information to the prompt configuration
# structure
prompt_arg = params.get("prompt", [])
if prompt_arg:
for p in prompt_arg:
toks = p.split("=", 1)
if len(toks) < 2:
raise RuntimeError(f"Prompt string not well-formed: {p}")
prompt_base[toks[0]] = {
"template": toks[1]
}
prompt_response_type_arg = params.get("prompt_response_type", [])
if prompt_response_type_arg:
for p in prompt_response_type_arg:
toks = p.split("=", 1)
if len(toks) < 2:
raise RuntimeError(f"Response type not well-formed: {p}")
if toks[0] not in prompt_base:
raise RuntimeError(f"Response-type, {toks[0]} not known")
prompt_base[toks[0]]["response_type"] = toks[1]
prompt_schema_arg = params.get("prompt_schema", [])
if prompt_schema_arg:
for p in prompt_schema_arg:
toks = p.split("=", 1)
if len(toks) < 2:
raise RuntimeError(f"Schema arg not well-formed: {p}")
if toks[0] not in prompt_base:
raise RuntimeError(f"Schema, {toks[0]} not known")
try:
prompt_base[toks[0]]["schema"] = json.loads(toks[1])
except:
raise RuntimeError(f"Failed to parse JSON schema: {p}")
prompt_term_arg = params.get("prompt_term", [])
if prompt_term_arg:
for p in prompt_term_arg:
toks = p.split("=", 1)
if len(toks) < 2:
raise RuntimeError(f"Term arg not well-formed: {p}")
if toks[0] not in prompt_base:
raise RuntimeError(f"Term, {toks[0]} not known")
kvtoks = toks[1].split(":", 1)
if len(kvtoks) < 2:
raise RuntimeError(f"Term not well-formed: {toks[1]}")
k, v = kvtoks
if "terms" not in prompt_base[toks[0]]:
prompt_base[toks[0]]["terms"] = {}
prompt_base[toks[0]]["terms"][k] = v
global_terms = {}
global_term_arg = params.get("global_term", [])
if global_term_arg:
for t in global_term_arg:
toks = t.split("=", 1)
if len(toks) < 2:
raise RuntimeError(f"Global term arg not well-formed: {t}")
global_terms[toks[0]] = toks[1]
print(global_terms)
prompts = {
k: Prompt(**v)
for k, v in prompt_base.items()
}
prompt_configuration = PromptConfiguration(
system_template = params.get("system_prompt", ""),
global_terms = global_terms,
prompts = prompts
)
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
@ -113,12 +37,8 @@ class Processor(ConsumerProducer):
tc_response_queue = params.get(
"text_completion_response_queue", text_completion_response_queue
)
definition_template = params.get("definition_template")
relationship_template = params.get("relationship_template")
topic_template = params.get("topic_template")
rows_template = params.get("rows_template")
knowledge_query_template = params.get("knowledge_query_template")
document_query_template = params.get("document_query_template")
self.config_key = params.get("config_type", "prompt")
super(Processor, self).__init__(
**params | {
@ -151,11 +71,61 @@ class Processor(ConsumerProducer):
self.llm = Llm(self.llm)
# Null configuration, should reload quickly
self.manager = PromptManager(
llm = self.llm,
config = prompt_configuration,
config = PromptConfiguration("", {}, {})
)
async def on_config(self, version, config):
    """Rebuild the PromptManager from a freshly pushed config-svc snapshot.

    Looks up this service's section (``self.config_key``) in the supplied
    config dict, parses the system prompt and every template listed in the
    "template-index", and swaps in a new manager.  On any failure the
    previous manager is left untouched (best-effort reload).
    """
    print("Loading configuration version", version)

    # Nothing for us in this snapshot — keep the current configuration.
    if self.config_key not in config:
        print(f"No key {self.config_key} in config", flush=True)
        return

    section = config[self.config_key]

    try:
        system_template = json.loads(section["system"])
        template_ids = json.loads(section["template-index"])

        # One Prompt per id; each id has a "template.<id>" JSON entry.
        prompts = {}
        for template_id in template_ids:
            spec = json.loads(section[f"template.{template_id}"])
            prompts[template_id] = Prompt(
                template=spec.get("prompt"),
                response_type=spec.get("response-type", "text"),
                schema=spec.get("schema", None),
                terms={},
            )

        # Replace the manager wholesale so a partial parse never half-applies.
        new_configuration = PromptConfiguration(
            system_template,
            {},
            prompts,
        )
        self.manager = PromptManager(self.llm, new_configuration)

        print("Prompt configuration reloaded.", flush=True)

    except Exception as e:
        print("Exception:", e, flush=True)
        print("Configuration reload failed", flush=True)
async def handle(self, msg):
v = msg.value()
@ -263,33 +233,9 @@ class Processor(ConsumerProducer):
)
parser.add_argument(
'--prompt', nargs='*',
help=f'Prompt template form id=template',
)
parser.add_argument(
'--prompt-response-type', nargs='*',
help=f'Prompt response type, form id=json|text',
)
parser.add_argument(
'--prompt-term', nargs='*',
help=f'Prompt response type, form id=key:value',
)
parser.add_argument(
'--prompt-schema', nargs='*',
help=f'Prompt response schema, form id=schema',
)
parser.add_argument(
'--system-prompt',
help=f'System prompt template',
)
parser.add_argument(
'--global-term', nargs='+',
help=f'Global term, form key:value'
'--config-type',
default="prompt",
help=f'Configuration key for prompts (default: prompt)',
)
def run():