diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml index 7a7024cf..f459dad9 100644 --- a/prometheus/prometheus.yml +++ b/prometheus/prometheus.yml @@ -27,6 +27,7 @@ scrape_configs: - 'vectorize:8000' - 'embeddings:8000' - 'kg-extract-definitions:8000' + - 'kg-extract-topics:8000' - 'kg-extract-relationships:8000' - 'store-graph-embeddings:8000' - 'store-triples:8000' diff --git a/scripts/kg-extract-topics b/scripts/kg-extract-topics new file mode 100755 index 00000000..e8ff2688 --- /dev/null +++ b/scripts/kg-extract-topics @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.extract.kg.topics import run + +run() + diff --git a/setup.py b/setup.py index 601524c9..94883544 100644 --- a/setup.py +++ b/setup.py @@ -75,6 +75,7 @@ setuptools.setup( "scripts/graph-to-turtle", "scripts/init-pulsar-manager", "scripts/kg-extract-definitions", + "scripts/kg-extract-topics", "scripts/kg-extract-relationships", "scripts/load-graph-embeddings", "scripts/load-pdf", diff --git a/trustgraph/clients/prompt_client.py b/trustgraph/clients/prompt_client.py index dd1f4e5d..f7f5a3ef 100644 --- a/trustgraph/clients/prompt_client.py +++ b/trustgraph/clients/prompt_client.py @@ -44,6 +44,13 @@ class PromptClient(BaseClient): kind="extract-definitions", chunk=chunk, timeout=timeout ).definitions + + def request_topics(self, chunk, timeout=300): + + return self.call( + kind="extract-topics", chunk=chunk, + timeout=timeout + ).topics def request_relationships(self, chunk, timeout=300): diff --git a/trustgraph/extract/kg/topics/__init__.py b/trustgraph/extract/kg/topics/__init__.py new file mode 100644 index 00000000..81287a3c --- /dev/null +++ b/trustgraph/extract/kg/topics/__init__.py @@ -0,0 +1,3 @@ + +from . 
extract import * + diff --git a/trustgraph/extract/kg/topics/__main__.py b/trustgraph/extract/kg/topics/__main__.py new file mode 100755 index 00000000..403fe672 --- /dev/null +++ b/trustgraph/extract/kg/topics/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . extract import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph/extract/kg/topics/extract.py b/trustgraph/extract/kg/topics/extract.py new file mode 100755 index 00000000..e2ebe5b0 --- /dev/null +++ b/trustgraph/extract/kg/topics/extract.py @@ -0,0 +1,134 @@ + +""" +Simple decoder, accepts embeddings+text chunks input, applies topic analysis to +get topic definitions which are output as graph edges. +""" + +import urllib.parse +import json + +from .... schema import ChunkEmbeddings, Triple, Source, Value +from .... schema import chunk_embeddings_ingest_queue, triples_store_queue +from .... schema import prompt_request_queue +from .... schema import prompt_response_queue +from .... log_level import LogLevel +from .... clients.prompt_client import PromptClient +from .... rdf import TRUSTGRAPH_ENTITIES, DEFINITION +from .... 
base import ConsumerProducer + +DEFINITION_VALUE = Value(value=DEFINITION, is_uri=True) + +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = chunk_embeddings_ingest_queue +default_output_queue = triples_store_queue +default_subscriber = module + +class Processor(ConsumerProducer): + + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) + pr_request_queue = params.get( + "prompt_request_queue", prompt_request_queue + ) + pr_response_queue = params.get( + "prompt_response_queue", prompt_response_queue + ) + + super(Processor, self).__init__( + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": ChunkEmbeddings, + "output_schema": Triple, + "prompt_request_queue": pr_request_queue, + "prompt_response_queue": pr_response_queue, + } + ) + + self.prompt = PromptClient( + pulsar_host=self.pulsar_host, + input_queue=pr_request_queue, + output_queue=pr_response_queue, + subscriber = module + "-prompt", + ) + + def to_uri(self, text): + + part = text.replace(" ", "-").lower().encode("utf-8") + quoted = urllib.parse.quote(part) + uri = TRUSTGRAPH_ENTITIES + quoted + + return uri + + def get_topics(self, chunk): + + return self.prompt.request_topics(chunk) + + def emit_edge(self, s, p, o): + + t = Triple(s=s, p=p, o=o) + self.producer.send(t) + + def handle(self, msg): + + v = msg.value() + print(f"Indexing {v.source.id}...", flush=True) + + chunk = v.chunk.decode("utf-8") + + try: + + defs = self.get_topics(chunk) + + for defn in defs: + + s = defn.name + o = defn.definition + + if s == "": continue + if o == "": continue + + if s is None: continue + if o is None: continue + + s_uri = self.to_uri(s) + + s_value = Value(value=str(s_uri), is_uri=True) + o_value = Value(value=str(o), is_uri=False) + + self.emit_edge(s_value, 
DEFINITION_VALUE, o_value) + + except Exception as e: + print("Exception: ", e, flush=True) + + print("Done.", flush=True) + + @staticmethod + def add_args(parser): + + ConsumerProducer.add_args( + parser, default_input_queue, default_subscriber, + default_output_queue, + ) + + parser.add_argument( + '--prompt-request-queue', + default=prompt_request_queue, + help=f'Prompt request queue (default: {prompt_request_queue})', + ) + + parser.add_argument( + '--prompt-response-queue', + default=prompt_response_queue, + help=f'Prompt response queue (default: {prompt_response_queue})', + ) + +def run(): + + Processor.start(module, __doc__) + diff --git a/trustgraph/model/prompt/generic/prompts.py b/trustgraph/model/prompt/generic/prompts.py index e9254dd9..c16afc89 100644 --- a/trustgraph/model/prompt/generic/prompts.py +++ b/trustgraph/model/prompt/generic/prompts.py @@ -1,50 +1,92 @@ def to_relationships(text): - prompt = f""" -Study the following text and derive entity relationships. For each -relationship, derive the subject, predicate and object of the relationship. -Output relationships in JSON format as an arary of objects with fields: -- subject: the subject of the relationship -- predicate: the predicate -- object: the object of the relationship -- object-entity: false if the object is a simple data type: name, value or date. true if it is an entity. - + prompt = f"""You are a helpful assistant that performs information extraction tasks for a provided text. - +Read the provided text. You will model the text as an information network for an RDF knowledge graph in JSON. + +Information Network Rules: +- An information network has subjects connected by predicates to objects. +- A subject is a named-entity or a conceptual topic. +- One subject can have many predicates and objects. +- An object is a property or attribute of a subject. +- A subject can be connected by a predicate to another subject. 
+ +Reading Instructions: +- Ignore document formatting in the provided text. +- Study the provided text carefully. + +Here is the text: {text} - - -You will respond only with raw JSON format data. Do not provide -explanations. Do not use special characters in the abstract text. The -abstract must be written as plain text. Do not add markdown formatting -or headers or prefixes. -""" +Response Instructions: +- Obey the information network rules. +- Do not return special characters. +- Respond only with well-formed JSON. +- The JSON response shall be an array of JSON objects with keys "subject", "predicate", "object", and "object-entity". +- The JSON response shall use the following structure: + +```json +[{{"subject": string, "predicate": string, "object": string, "object-entity": boolean}}] +``` + +- The key "object-entity" is TRUE only if the "object" is a subject. +- Do not write any additional text or explanations. +""" + + return prompt + +def to_topics(text): + + prompt = f"""You are a helpful assistant that performs information extraction tasks for a provided text.\nRead the provided text. You will identify topics and their definitions in JSON. + +Reading Instructions: +- Ignore document formatting in the provided text. +- Study the provided text carefully. + +Here is the text: +{text} + +Response Instructions: +- Do not respond with special characters. +- Return only topics that are concepts and unique to the provided text. +- Respond only with well-formed JSON. +- The JSON response shall be an array of objects with keys "topic" and "definition". +- The JSON response shall use the following structure: + +```json +[{{"topic": string, "definition": string}}] +``` + +- Do not write any additional text or explanations. +""" return prompt def to_definitions(text): - prompt = f""" -Study the following text and derive definitions for any discovered entities. -Do not provide definitions for entities whose definitions are incomplete -or unknown. 
-Output relationships in JSON format as an arary of objects with fields: -- entity: the name of the entity -- definition: English text which defines the entity - + prompt = f"""You are a helpful assistant that performs information extraction tasks for a provided text.\nRead the provided text. You will identify entities and their definitions in JSON. - +Reading Instructions: +- Ignore document formatting in the provided text. +- Study the provided text carefully. + +Here is the text: {text} - - -You will respond only with raw JSON format data. Do not provide -explanations. Do not use special characters in the abstract text. The -abstract will be written as plain text. Do not add markdown formatting -or headers or prefixes. Do not include null or unknown definitions. -""" +Response Instructions: +- Do not respond with special characters. +- Return only entities that are named-entities such as: people, organizations, physical objects, locations, animals, products, commodities, or substances. +- Respond only with well-formed JSON. +- The JSON response shall be an array of objects with keys "entity" and "definition". +- The JSON response shall use the following structure: + +```json +[{{"entity": string, "definition": string}}] +``` + +- Do not write any additional text or explanations. +""" return prompt diff --git a/trustgraph/model/prompt/generic/service.py b/trustgraph/model/prompt/generic/service.py index bc78f664..16986980 100755 --- a/trustgraph/model/prompt/generic/service.py +++ b/trustgraph/model/prompt/generic/service.py @@ -3,8 +3,10 @@ Language service abstracts prompt engineering from LLM. """ import json +import re from .... schema import Definition, Relationship, Triple +from .... schema import Topic from .... schema import PromptRequest, PromptResponse, Error from .... schema import TextCompletionRequest, TextCompletionResponse from .... schema import text_completion_request_queue @@ -13,7 +15,7 @@ from .... 
schema import prompt_request_queue, prompt_response_queue from .... base import ConsumerProducer from .... clients.llm_client import LlmClient -from . prompts import to_definitions, to_relationships +from . prompts import to_definitions, to_relationships, to_topics from . prompts import to_kg_query, to_document_query, to_rows module = ".".join(__name__.split(".")[1:-1]) @@ -56,12 +58,15 @@ class Processor(ConsumerProducer): ) def parse_json(self, text): - - # Hacky, workaround temperamental JSON markdown - text = text.replace("```json", "") - text = text.replace("```", "") + json_match = re.search(r'```(?:json)?(.*?)```', text, re.DOTALL) + + if json_match: + json_str = json_match.group(1).strip() + else: + # If no delimiters, assume the entire output is JSON + json_str = text.strip() - return json.loads(text) + return json.loads(json_str) def handle(self, msg): @@ -80,6 +85,11 @@ class Processor(ConsumerProducer): self.handle_extract_definitions(id, v) return + elif kind == "extract-topics": + + self.handle_extract_topics(id, v) + return + elif kind == "extract-relationships": self.handle_extract_relationships(id, v) @@ -164,6 +174,65 @@ class Processor(ConsumerProducer): self.producer.send(r, properties={"id": id}) + def handle_extract_topics(self, id, v): + + try: + + prompt = to_topics(v.chunk) + + ans = self.llm.request(prompt) + + # Silently ignore JSON parse error + try: + defs = self.parse_json(ans) + except: + print("JSON parse error, ignored", flush=True) + defs = [] + + output = [] + + for defn in defs: + + try: + e = defn["topic"] + d = defn["definition"] + + if e == "": continue + if e is None: continue + if d == "": continue + if d is None: continue + + output.append( + Topic( + name=e, definition=d + ) + ) + + except: + print("definition fields missing, ignored", flush=True) + + print("Send response...", flush=True) + r = PromptResponse(topics=output, error=None) + self.producer.send(r, properties={"id": id}) + + print("Done.", flush=True) + + except 
Exception as e: + + print(f"Exception: {e}") + + print("Send error response...", flush=True) + + r = PromptResponse( + error=Error( + type = "llm-error", + message = str(e), + ), + response=None, + ) + + self.producer.send(r, properties={"id": id}) + def handle_extract_relationships(self, id, v): try: diff --git a/trustgraph/model/prompt/template/prompts.py b/trustgraph/model/prompt/template/prompts.py index 8b25c621..e3148157 100644 --- a/trustgraph/model/prompt/template/prompts.py +++ b/trustgraph/model/prompt/template/prompts.py @@ -5,6 +5,9 @@ def to_relationships(template, text): def to_definitions(template, text): return template.format(text=text) +def to_topics(template, text): + return template.format(text=text) + def to_rows(template, schema, text): field_schema = [ diff --git a/trustgraph/model/prompt/template/service.py b/trustgraph/model/prompt/template/service.py index ce595720..14b65d5a 100755 --- a/trustgraph/model/prompt/template/service.py +++ b/trustgraph/model/prompt/template/service.py @@ -4,8 +4,10 @@ Language service abstracts prompt engineering from LLM. """ import json +import re from .... schema import Definition, Relationship, Triple +from .... schema import Topic from .... schema import PromptRequest, PromptResponse, Error from .... schema import TextCompletionRequest, TextCompletionResponse from .... schema import text_completion_request_queue @@ -15,7 +17,7 @@ from .... base import ConsumerProducer from .... clients.llm_client import LlmClient from . prompts import to_definitions, to_relationships, to_rows -from . prompts import to_kg_query, to_document_query +from . 
prompts import to_kg_query, to_document_query, to_topics module = ".".join(__name__.split(".")[1:-1]) @@ -38,6 +40,7 @@ class Processor(ConsumerProducer): ) definition_template = params.get("definition_template") relationship_template = params.get("relationship_template") + topic_template = params.get("topic_template") rows_template = params.get("rows_template") knowledge_query_template = params.get("knowledge_query_template") document_query_template = params.get("document_query_template") @@ -62,18 +65,22 @@ class Processor(ConsumerProducer): ) self.definition_template = definition_template + self.topic_template = topic_template self.relationship_template = relationship_template self.rows_template = rows_template self.knowledge_query_template = knowledge_query_template self.document_query_template = document_query_template def parse_json(self, text): - - # Hacky, workaround temperamental JSON markdown - text = text.replace("```json", "") - text = text.replace("```", "") + json_match = re.search(r'```(?:json)?(.*?)```', text, re.DOTALL) + + if json_match: + json_str = json_match.group(1).strip() + else: + # If no delimiters, assume the entire output is JSON + json_str = text.strip() - return json.loads(text) + return json.loads(json_str) def handle(self, msg): @@ -92,6 +99,11 @@ class Processor(ConsumerProducer): self.handle_extract_definitions(id, v) return + elif kind == "extract-topics": + + self.handle_extract_topics(id, v) + return + elif kind == "extract-relationships": self.handle_extract_relationships(id, v) @@ -176,6 +188,66 @@ class Processor(ConsumerProducer): self.producer.send(r, properties={"id": id}) + def handle_extract_topics(self, id, v): + + try: + + prompt = to_topics(self.topic_template, v.chunk) + + ans = self.llm.request(prompt) + + # Silently ignore JSON parse error + try: + defs = self.parse_json(ans) + except: + print("JSON parse error, ignored", flush=True) + defs = [] + + output = [] + + for defn in defs: + + try: + e = defn["topic"] + d 
= defn["definition"] + + if e == "": continue + if e is None: continue + if d == "": continue + if d is None: continue + + output.append( + Topic( + name=e, definition=d + ) + ) + + except: + print("definition fields missing, ignored", flush=True) + + print("Send response...", flush=True) + r = PromptResponse(topics=output, error=None) + self.producer.send(r, properties={"id": id}) + + print("Done.", flush=True) + + except Exception as e: + + print(f"Exception: {e}") + + print("Send error response...", flush=True) + + r = PromptResponse( + error=Error( + type = "llm-error", + message = str(e), + ), + response=None, + ) + + self.producer.send(r, properties={"id": id}) + + def handle_extract_relationships(self, id, v): try: @@ -415,6 +487,12 @@ class Processor(ConsumerProducer): help=f'Definition extraction template', ) + parser.add_argument( + '--topic-template', + required=True, + help=f'Topic extraction template', + ) + parser.add_argument( '--rows-template', required=True, diff --git a/trustgraph/schema/prompt.py b/trustgraph/schema/prompt.py index 69f81ff3..c7dbfd43 100644 --- a/trustgraph/schema/prompt.py +++ b/trustgraph/schema/prompt.py @@ -12,6 +12,10 @@ class Definition(Record): name = String() definition = String() +class Topic(Record): + name = String() + definition = String() + class Relationship(Record): s = String() p = String() @@ -46,6 +50,7 @@ class PromptResponse(Record): error = Error() answer = String() definitions = Array(Definition()) + topics = Array(Topic()) relationships = Array(Relationship()) rows = Array(Map(String()))