diff --git a/trustgraph-base/trustgraph/base/base_processor.py b/trustgraph-base/trustgraph/base/base_processor.py index 35551b4f..05cdb940 100644 --- a/trustgraph-base/trustgraph/base/base_processor.py +++ b/trustgraph-base/trustgraph/base/base_processor.py @@ -67,6 +67,7 @@ class BaseProcessor: self.config_subscriber = self.client.subscribe( self.config_push_queue, config_subscriber_id, consumer_type=pulsar.ConsumerType.Shared, + initial_position=pulsar.InitialPosition.Earliest, schema=JsonSchema(ConfigPush), ) diff --git a/trustgraph-cli/scripts/tg-init-pulsar b/trustgraph-cli/scripts/tg-init-pulsar index c7d447bd..8487966a 100755 --- a/trustgraph-cli/scripts/tg-init-pulsar +++ b/trustgraph-cli/scripts/tg-init-pulsar @@ -132,7 +132,7 @@ def init(pulsar_admin_url, pulsar_host, pulsar_api_key, config, tenant): "retention_policies": { "retentionSizeInMB": -1, "retentionTimeInMinutes": 3, - "subscription_expiration_time_minutes": 30, + "subscriptionExpirationTimeMinutes": 30, } }) @@ -140,7 +140,7 @@ def init(pulsar_admin_url, pulsar_host, pulsar_api_key, config, tenant): "retention_policies": { "retentionSizeInMB": 10, "retentionTimeInMinutes": -1, - "subscription_expiration_time_minutes": 5, + "subscriptionExpirationTimeMinutes": 5, } }) diff --git a/trustgraph-flow/trustgraph/agent/react/agent_manager.py b/trustgraph-flow/trustgraph/agent/react/agent_manager.py index 5d071e30..a195bd80 100644 --- a/trustgraph-flow/trustgraph/agent/react/agent_manager.py +++ b/trustgraph-flow/trustgraph/agent/react/agent_manager.py @@ -85,19 +85,19 @@ class AgentManager: return a - def react(self, question, history, think, observe): + async def react(self, question, history, think, observe): act = self.reason(question, history) logger.info(f"act: {act}") if isinstance(act, Final): - think(act.thought) + await think(act.thought) return act else: - think(act.thought) + await think(act.thought) if act.name in self.tools: action = self.tools[act.name] @@ -110,7 +110,7 @@ class AgentManager: logger.info(f"resp: {resp}") - observe(resp) + await observe(resp) act.observation = resp diff --git a/trustgraph-flow/trustgraph/agent/react/service.py b/trustgraph-flow/trustgraph/agent/react/service.py index bc045b71..224efe3c 100755 --- a/trustgraph-flow/trustgraph/agent/react/service.py +++ b/trustgraph-flow/trustgraph/agent/react/service.py @@ -36,92 +36,12 @@ class Processor(ConsumerProducer): def __init__(self, **params): - additional = params.get("context", None) - - self.max_iterations = int(params.get("max_iterations", default_max_iterations)) + self.max_iterations = int( + params.get("max_iterations", default_max_iterations) + ) tools = {} - # Parsing the prompt information to the prompt configuration - # structure - tool_type_arg = params.get("tool_type", []) - if tool_type_arg: - for t in tool_type_arg: - toks = t.split("=", 1) - if len(toks) < 2: - raise RuntimeError( - f"Tool-type string not well-formed: {t}" - ) - ttoks = toks[1].split(":", 1) - if len(ttoks) < 1: - raise RuntimeError( - f"Tool-type string not well-formed: {t}" - ) - - if ttoks[0] == "knowledge-query": - impl = KnowledgeQueryImpl(self) - elif ttoks[0] == "text-completion": - impl = TextCompletionImpl(self) - else: - raise RuntimeError( - f"Tool-kind {ttoks[0]} not known" - ) - - if len(ttoks) == 1: - - tools[toks[0]] = Tool( - name = toks[0], - description = "", - implementation = impl, - config = { "input": "query" }, - arguments = {}, - ) - else: - tools[toks[0]] = Tool( - name = toks[0], - description = "", - implementation = impl, - config = { "input": ttoks[1] }, - arguments = {}, - ) - - # parsing the prompt information to the prompt configuration - # structure - tool_desc_arg = params.get("tool_description", []) - if tool_desc_arg: - for t in tool_desc_arg: - toks = t.split("=", 1) - if len(toks) < 2: - raise runtimeerror( - f"tool-type string not well-formed: {t}" - ) - if toks[0] not in tools: - raise runtimeerror(f"description, tool {toks[0]} not known") - tools[toks[0]].description = toks[1] - - # Parsing the prompt information to the prompt configuration - # structure - tool_arg_arg = params.get("tool_argument", []) - if tool_arg_arg: - for t in tool_arg_arg: - toks = t.split("=", 1) - if len(toks) < 2: - raise RuntimeError( - f"Tool-type string not well-formed: {t}" - ) - ttoks = toks[1].split(":", 2) - if len(ttoks) != 3: - raise RuntimeError( - f"Tool argument string not well-formed: {t}" - ) - if toks[0] not in tools: - raise RuntimeError(f"Description, tool {toks[0]} not known") - tools[toks[0]].arguments[ttoks[0]] = Argument( - name = ttoks[0], - type = ttoks[1], - description = ttoks[2] - ) - input_queue = params.get("input_queue", default_input_queue) output_queue = params.get("output_queue", default_output_queue) subscriber = params.get("subscriber", default_subscriber) @@ -138,6 +58,8 @@ class Processor(ConsumerProducer): "graph_rag_response_queue", gr_response_queue ) + self.config_key = params.get("config_type", "agent") + super(Processor, self).__init__( **params | { "input_queue": input_queue, @@ -176,20 +98,74 @@ class Processor(ConsumerProducer): self.agent = AgentManager( context=self, - tools=tools, - additional_context=additional + tools=[], + additional_context="", ) - def parse_json(self, text): - json_match = re.search(r'```(?:json)?(.*?)```', text, re.DOTALL) - - if json_match: - json_str = json_match.group(1).strip() - else: - # If no delimiters, assume the entire output is JSON - json_str = text.strip() + async def on_config(self, version, config): - return json.loads(json_str) + print("Loading configuration version", version) + + if self.config_key not in config: + print(f"No key {self.config_key} in config", flush=True) + return + + config = config[self.config_key] + + try: + + # This is some extra stuff to put in the prompt + additional = config.get("additional-context", None) + + ix = json.loads(config["tool-index"]) + + tools = {} + + for k in ix: + + pc = config[f"tool.{k}"] + data = json.loads(pc) + + arguments = { + v.get("name"): Argument( + name = v.get("name"), + type = v.get("type"), + description = v.get("description") + ) + for v in data["arguments"] + } + + impl_id = data.get("type") + + if impl_id == "knowledge-query": + impl = KnowledgeQueryImpl(self) + elif impl_id == "text-completion": + impl = TextCompletionImpl(self) + else: + raise RuntimeError( + f"Tool-kind {impl_id} not known" + ) + + tools[data.get("name")] = Tool( + name = data.get("name"), + description = data.get("description"), + implementation = impl, + config=data.get("config", {}), + arguments = arguments, + ) + + self.agent = AgentManager( + context=self, + tools=tools, + additional_context=additional + ) + + print("Prompt configuration reloaded.", flush=True) + + except Exception as e: + + print("Exception:", e, flush=True) + print("Configuration reload failed", flush=True) async def handle(self, msg): @@ -246,7 +222,7 @@ class Processor(ConsumerProducer): await self.send(r, properties={"id": id}) - act = self.agent.react(v.question, history, think, observe) + act = await self.agent.react(v.question, history, think, observe) print(f"Action: {act}", flush=True) @@ -337,46 +313,18 @@ class Processor(ConsumerProducer): help=f'Graph RAG response queue (default: {gr_response_queue})', ) - parser.add_argument( - '--tool-type', nargs='*', - help=f'''Specifies the type of an agent tool. Takes the form -=. is the name of the tool. is one of -knowledge-query, text-completion. Additional parameters are specified -for different tools which are tool-specific. e.g. knowledge-query: -which specifies the name of the arg whose content is fed into the knowledge -query as a question. text-completion: specifies the name of the arg -whose content is fed into the text-completion service as a prompt''' - ) - - parser.add_argument( - '--tool-description', nargs='*', - help=f'''Specifies the textual description of a tool. Takes -the form =. The description is important, it teaches the -LLM how to use the tool. It should describe what it does and how to -use the arguments. This is specified in natural language.''' - ) - - parser.add_argument( - '--tool-argument', nargs='*', - help=f'''Specifies argument usage for a tool. Takes -the form =::. The description is important, -it is read by the LLM and used to determine how to use the argument. - can be specified multiple times to give a tool multiple arguments. - is one of string, number. is a natural language -description.''' - ) - - parser.add_argument( - '--context', - help=f'Optional, specifies additional context text for the LLM.' - ) - parser.add_argument( '--max-iterations', default=default_max_iterations, help=f'Maximum number of react iterations (default: {default_max_iterations})', ) + parser.add_argument( + '--config-type', + default="agent", + help=f'Configuration key for prompts (default: agent)', + ) + def run(): Processor.launch(module, __doc__) diff --git a/trustgraph-flow/trustgraph/agent/react/tools.py b/trustgraph-flow/trustgraph/agent/react/tools.py index 941610be..023abc02 100644 --- a/trustgraph-flow/trustgraph/agent/react/tools.py +++ b/trustgraph-flow/trustgraph/agent/react/tools.py @@ -5,7 +5,7 @@ class KnowledgeQueryImpl: def __init__(self, context): self.context = context def invoke(self, **arguments): - return self.context.graph_rag.request(arguments.get("query")) + return self.context.graph_rag.request(arguments.get("question")) # This tool implementation knows how to do text completion. This uses # the prompt service, rather than talking to TextCompletion directly. diff --git a/trustgraph-flow/trustgraph/model/prompt/template/service.py b/trustgraph-flow/trustgraph/model/prompt/template/service.py index 58657d7d..a1267114 100755 --- a/trustgraph-flow/trustgraph/model/prompt/template/service.py +++ b/trustgraph-flow/trustgraph/model/prompt/template/service.py @@ -28,82 +28,6 @@ class Processor(ConsumerProducer): def __init__(self, **params): - prompt_base = {} - - # Parsing the prompt information to the prompt configuration - # structure - prompt_arg = params.get("prompt", []) - if prompt_arg: - for p in prompt_arg: - toks = p.split("=", 1) - if len(toks) < 2: - raise RuntimeError(f"Prompt string not well-formed: {p}") - prompt_base[toks[0]] = { - "template": toks[1] - } - - prompt_response_type_arg = params.get("prompt_response_type", []) - if prompt_response_type_arg: - for p in prompt_response_type_arg: - toks = p.split("=", 1) - if len(toks) < 2: - raise RuntimeError(f"Response type not well-formed: {p}") - if toks[0] not in prompt_base: - raise RuntimeError(f"Response-type, {toks[0]} not known") - prompt_base[toks[0]]["response_type"] = toks[1] - - prompt_schema_arg = params.get("prompt_schema", []) - if prompt_schema_arg: - for p in prompt_schema_arg: - toks = p.split("=", 1) - if len(toks) < 2: - raise RuntimeError(f"Schema arg not well-formed: {p}") - if toks[0] not in prompt_base: - raise RuntimeError(f"Schema, {toks[0]} not known") - try: - prompt_base[toks[0]]["schema"] = json.loads(toks[1]) - except: - raise RuntimeError(f"Failed to parse JSON schema: {p}") - - prompt_term_arg = params.get("prompt_term", []) - if prompt_term_arg: - for p in prompt_term_arg: - toks = p.split("=", 1) - if len(toks) < 2: - raise RuntimeError(f"Term arg not well-formed: {p}") - if toks[0] not in prompt_base: - raise RuntimeError(f"Term, {toks[0]} not known") - kvtoks = toks[1].split(":", 1) - if len(kvtoks) < 2: - raise RuntimeError(f"Term not well-formed: {toks[1]}") - k, v = kvtoks - if "terms" not in prompt_base[toks[0]]: - prompt_base[toks[0]]["terms"] = {} - prompt_base[toks[0]]["terms"][k] = v - - global_terms = {} - - global_term_arg = params.get("global_term", []) - if global_term_arg: - for t in global_term_arg: - toks = t.split("=", 1) - if len(toks) < 2: - raise RuntimeError(f"Global term arg not well-formed: {t}") - global_terms[toks[0]] = toks[1] - - print(global_terms) - - prompts = { - k: Prompt(**v) - for k, v in prompt_base.items() - } - - prompt_configuration = PromptConfiguration( - system_template = params.get("system_prompt", ""), - global_terms = global_terms, - prompts = prompts - ) - input_queue = params.get("input_queue", default_input_queue) output_queue = params.get("output_queue", default_output_queue) subscriber = params.get("subscriber", default_subscriber) @@ -113,12 +37,8 @@ class Processor(ConsumerProducer): tc_response_queue = params.get( "text_completion_response_queue", text_completion_response_queue ) - definition_template = params.get("definition_template") - relationship_template = params.get("relationship_template") - topic_template = params.get("topic_template") - rows_template = params.get("rows_template") - knowledge_query_template = params.get("knowledge_query_template") - document_query_template = params.get("document_query_template") + + self.config_key = params.get("config_type", "prompt") super(Processor, self).__init__( **params | { @@ -151,11 +71,61 @@ class Processor(ConsumerProducer): self.llm = Llm(self.llm) + # Null configuration, should reload quickly self.manager = PromptManager( llm = self.llm, - config = prompt_configuration, + config = PromptConfiguration("", {}, {}) ) + async def on_config(self, version, config): + + print("Loading configuration version", version) + + if self.config_key not in config: + print(f"No key {self.config_key} in config", flush=True) + return + + config = config[self.config_key] + + try: + + system = json.loads(config["system"]) + ix = json.loads(config["template-index"]) + + prompts = {} + + for k in ix: + + pc = config[f"template.{k}"] + data = json.loads(pc) + + prompt = data.get("prompt") + rtype = data.get("response-type", "text") + schema = data.get("schema", None) + + prompts[k] = Prompt( + template = prompt, + response_type = rtype, + schema = schema, + terms = {} + ) + + self.manager = PromptManager( + self.llm, + PromptConfiguration( + system, + {}, + prompts + ) + ) + + print("Prompt configuration reloaded.", flush=True) + + except Exception as e: + + print("Exception:", e, flush=True) + print("Configuration reload failed", flush=True) + async def handle(self, msg): v = msg.value() @@ -263,33 +233,9 @@ class Processor(ConsumerProducer): ) parser.add_argument( - '--prompt', nargs='*', - help=f'Prompt template form id=template', - ) - - parser.add_argument( - '--prompt-response-type', nargs='*', - help=f'Prompt response type, form id=json|text', - ) - - parser.add_argument( - '--prompt-term', nargs='*', - help=f'Prompt response type, form id=key:value', - ) - - parser.add_argument( - '--prompt-schema', nargs='*', - help=f'Prompt response schema, form id=schema', - ) - - parser.add_argument( - '--system-prompt', - help=f'System prompt template', - ) - - parser.add_argument( - '--global-term', nargs='+', - help=f'Global term, form key:value' + '--config-type', + default="prompt", + help=f'Configuration key for prompts (default: prompt)', ) def run():