Extraction upgrade (#61)

* Added KG Topics
* Updated prompt-template
* Fixed prompt-generic
This commit is contained in:
Jack Colquitt 2024-09-15 14:47:57 -07:00 committed by GitHub
parent 13f6b5d87f
commit 728ff7542a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 401 additions and 45 deletions

View file

@ -27,6 +27,7 @@ scrape_configs:
- 'vectorize:8000'
- 'embeddings:8000'
- 'kg-extract-definitions:8000'
- 'kg-extract-topics:8000'
- 'kg-extract-relationships:8000'
- 'store-graph-embeddings:8000'
- 'store-triples:8000'

6
scripts/kg-extract-topics Executable file
View file

@ -0,0 +1,6 @@
#!/usr/bin/env python3

"""Console script: start the kg-extract-topics service."""

from trustgraph.extract.kg.topics import run

# Guard the entry point so importing this file has no side effects —
# consistent with the package's __main__ module, which uses the same guard.
if __name__ == '__main__':
    run()

View file

@ -75,6 +75,7 @@ setuptools.setup(
"scripts/graph-to-turtle",
"scripts/init-pulsar-manager",
"scripts/kg-extract-definitions",
"scripts/kg-extract-topics",
"scripts/kg-extract-relationships",
"scripts/load-graph-embeddings",
"scripts/load-pdf",

View file

@ -44,6 +44,13 @@ class PromptClient(BaseClient):
kind="extract-definitions", chunk=chunk,
timeout=timeout
).definitions
def request_topics(self, chunk, timeout=300):
    """Ask the prompt service to extract topics from *chunk*.

    Returns the list of topics from the service response; *timeout*
    is the request timeout in seconds.
    """
    response = self.call(
        kind="extract-topics",
        chunk=chunk,
        timeout=timeout,
    )
    return response.topics
def request_relationships(self, chunk, timeout=300):

View file

@ -0,0 +1,3 @@
from . extract import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3

# Package entry point: allows running the service as a module
# (python -m ...); delegates to the sibling extract module's run().
from . extract import run

if __name__ == '__main__':
    run()

View file

@ -0,0 +1,134 @@
"""
Simple decoder, accepts embeddings+text chunks input, applies entity analysis to
get entity definitions which are output as graph edges.
"""
import urllib.parse
import json
from .... schema import ChunkEmbeddings, Triple, Source, Value
from .... schema import chunk_embeddings_ingest_queue, triples_store_queue
from .... schema import prompt_request_queue
from .... schema import prompt_response_queue
from .... log_level import LogLevel
from .... clients.prompt_client import PromptClient
from .... rdf import TRUSTGRAPH_ENTITIES, DEFINITION
from .... base import ConsumerProducer
DEFINITION_VALUE = Value(value=DEFINITION, is_uri=True)
module = ".".join(__name__.split(".")[1:-1])
default_input_queue = chunk_embeddings_ingest_queue
default_output_queue = triples_store_queue
default_subscriber = module
class Processor(ConsumerProducer):
    """
    Consumer/producer service: reads ChunkEmbeddings messages, asks the
    prompt service to extract topics from each chunk, and emits one
    (topic-URI, DEFINITION, definition-text) Triple per extracted topic.
    """

    def __init__(self, **params):

        input_queue = params.get("input_queue", default_input_queue)
        output_queue = params.get("output_queue", default_output_queue)
        subscriber = params.get("subscriber", default_subscriber)
        pr_request_queue = params.get(
            "prompt_request_queue", prompt_request_queue
        )
        pr_response_queue = params.get(
            "prompt_response_queue", prompt_response_queue
        )

        super(Processor, self).__init__(
            **params | {
                "input_queue": input_queue,
                "output_queue": output_queue,
                "subscriber": subscriber,
                "input_schema": ChunkEmbeddings,
                "output_schema": Triple,
                "prompt_request_queue": pr_request_queue,
                "prompt_response_queue": pr_response_queue,
            }
        )

        # Dedicated prompt-service client; distinct subscriber name so it
        # does not share a subscription with this service's main consumer.
        self.prompt = PromptClient(
            pulsar_host=self.pulsar_host,
            input_queue=pr_request_queue,
            output_queue=pr_response_queue,
            subscriber=module + "-prompt",
        )

    def to_uri(self, text):
        """Map a topic name to a stable URI under TRUSTGRAPH_ENTITIES."""
        part = text.replace(" ", "-").lower().encode("utf-8")
        quoted = urllib.parse.quote(part)
        uri = TRUSTGRAPH_ENTITIES + quoted
        return uri

    def get_topics(self, chunk):
        """Ask the prompt service for the topics found in *chunk*."""
        return self.prompt.request_topics(chunk)

    def emit_edge(self, s, p, o):
        """Send a single Triple(s, p, o) to the output queue."""
        t = Triple(s=s, p=p, o=o)
        self.producer.send(t)

    def handle(self, msg):
        """Process one ChunkEmbeddings message: extract topics, emit edges."""

        v = msg.value()
        print(f"Indexing {v.source.id}...", flush=True)

        chunk = v.chunk.decode("utf-8")

        try:

            topics = self.get_topics(chunk)

            for topic in topics:

                s = topic.name
                o = topic.definition

                # Skip topics with a missing or empty name/definition.
                if s is None or s == "":
                    continue
                if o is None or o == "":
                    continue

                s_uri = self.to_uri(s)

                s_value = Value(value=str(s_uri), is_uri=True)
                o_value = Value(value=str(o), is_uri=False)

                self.emit_edge(s_value, DEFINITION_VALUE, o_value)

        except Exception as e:
            # Best-effort: log and continue so one bad chunk does not
            # stall the pipeline.
            print("Exception: ", e, flush=True)

        print("Done.", flush=True)

    @staticmethod
    def add_args(parser):
        """Register CLI arguments for this service."""

        ConsumerProducer.add_args(
            parser, default_input_queue, default_subscriber,
            default_output_queue,
        )

        parser.add_argument(
            '--prompt-request-queue',
            default=prompt_request_queue,
            help=f'Prompt request queue (default: {prompt_request_queue})',
        )

        # BUG FIX: the flag was '--prompt-completion-response-queue', which
        # argparse stores as 'prompt_completion_response_queue', while
        # __init__ reads params.get("prompt_response_queue") — so the flag
        # was silently ignored (compare '--prompt-request-queue', whose
        # derived key matches).  The canonical flag now matches the key;
        # the old spelling is kept as an alias for backward compatibility.
        parser.add_argument(
            '--prompt-response-queue',
            '--prompt-completion-response-queue',
            dest='prompt_response_queue',
            default=prompt_response_queue,
            help=f'Prompt response queue (default: {prompt_response_queue})',
        )
def run():
    # Console-script entry point: start the service, using the module
    # docstring as its description.
    Processor.start(module, __doc__)

View file

@ -1,50 +1,92 @@
def to_relationships(text):
prompt = f"""<instructions>
Study the following text and derive entity relationships. For each
relationship, derive the subject, predicate and object of the relationship.
Output relationships in JSON format as an arary of objects with fields:
- subject: the subject of the relationship
- predicate: the predicate
- object: the object of the relationship
- object-entity: false if the object is a simple data type: name, value or date. true if it is an entity.
</instructions>
prompt = f"""You are a helpful assistant that performs information extraction tasks for a provided text.
<text>
Read the provided text. You will model the text as an information network for a RDF knowledge graph in JSON.
Information Network Rules:
- An information network has subjects connected by predicates to objects.
- A subject is a named-entity or a conceptual topic.
- One subject can have many predicates and objects.
- An object is a property or attribute of a subject.
- A subject can be connected by a predicate to another subject.
Reading Instructions:
- Ignore document formatting in the provided text.
- Study the provided text carefully.
Here is the text:
{text}
</text>
<requirements>
You will respond only with raw JSON format data. Do not provide
explanations. Do not use special characters in the abstract text. The
abstract must be written as plain text. Do not add markdown formatting
or headers or prefixes.
</requirements>"""
Response Instructions:
- Obey the information network rules.
- Do not return special characters.
- Respond only with well-formed JSON.
- The JSON response shall be an array of JSON objects with keys "subject", "predicate", "object", and "object-entity".
- The JSON response shall use the following structure:
```json
[{{"subject": string, "predicate": string, "object": string, "object-entity": boolean}}]
```
- The key "object-entity" is TRUE only if the "object" is a subject.
- Do not write any additional text or explanations.
"""
return prompt
def to_topics(text):
    """Build the topic-extraction prompt for the given chunk of text."""

    # Plain template rendered with str.format(); the doubled braces in
    # the JSON example collapse to literal braces, exactly as they did
    # in an f-string.
    template = """You are a helpful assistant that performs information extraction tasks for a provided text.\nRead the provided text. You will identify topics and their definitions in JSON.
Reading Instructions:
- Ignore document formatting in the provided text.
- Study the provided text carefully.
Here is the text:
{text}
Response Instructions:
- Do not respond with special characters.
- Return only topics that are concepts and unique to the provided text.
- Respond only with well-formed JSON.
- The JSON response shall be an array of objects with keys "topic" and "definition".
- The JSON response shall use the following structure:
```json
[{{"topic": string, "definition": string}}]
```
- Do not write any additional text or explanations.
"""

    return template.format(text=text)
def to_definitions(text):
prompt = f"""<instructions>
Study the following text and derive definitions for any discovered entities.
Do not provide definitions for entities whose definitions are incomplete
or unknown.
Output relationships in JSON format as an arary of objects with fields:
- entity: the name of the entity
- definition: English text which defines the entity
</instructions>
prompt = f"""You are a helpful assistant that performs information extraction tasks for a provided text.\nRead the provided text. You will identify entities and their definitions in JSON.
<text>
Reading Instructions:
- Ignore document formatting in the provided text.
- Study the provided text carefully.
Here is the text:
{text}
</text>
<requirements>
You will respond only with raw JSON format data. Do not provide
explanations. Do not use special characters in the abstract text. The
abstract will be written as plain text. Do not add markdown formatting
or headers or prefixes. Do not include null or unknown definitions.
</requirements>"""
Response Instructions:
- Do not respond with special characters.
- Return only entities that are named-entities such as: people, organizations, physical objects, locations, animals, products, commodotities, or substances.
- Respond only with well-formed JSON.
- The JSON response shall be an array of objects with keys "entity" and "definition".
- The JSON response shall use the following structure:
```json
[{{"entity": string, "definition": string}}]
```
- Do not write any additional text or explanations.
"""
return prompt

View file

@ -3,8 +3,10 @@ Language service abstracts prompt engineering from LLM.
"""
import json
import re
from .... schema import Definition, Relationship, Triple
from .... schema import Topic
from .... schema import PromptRequest, PromptResponse, Error
from .... schema import TextCompletionRequest, TextCompletionResponse
from .... schema import text_completion_request_queue
@ -13,7 +15,7 @@ from .... schema import prompt_request_queue, prompt_response_queue
from .... base import ConsumerProducer
from .... clients.llm_client import LlmClient
from . prompts import to_definitions, to_relationships
from . prompts import to_definitions, to_relationships, to_topics
from . prompts import to_kg_query, to_document_query, to_rows
module = ".".join(__name__.split(".")[1:-1])
@ -56,12 +58,15 @@ class Processor(ConsumerProducer):
)
def parse_json(self, text):
# Hacky, workaround temperamental JSON markdown
text = text.replace("```json", "")
text = text.replace("```", "")
json_match = re.search(r'```(?:json)?(.*?)```', text, re.DOTALL)
if json_match:
json_str = json_match.group(1).strip()
else:
# If no delimiters, assume the entire output is JSON
json_str = text.strip()
return json.loads(text)
return json.loads(json_str)
def handle(self, msg):
@ -80,6 +85,11 @@ class Processor(ConsumerProducer):
self.handle_extract_definitions(id, v)
return
elif kind == "extract-topics":
self.handle_extract_topics(id, v)
return
elif kind == "extract-relationships":
self.handle_extract_relationships(id, v)
@ -164,6 +174,65 @@ class Processor(ConsumerProducer):
self.producer.send(r, properties={"id": id})
def handle_extract_topics(self, id, v):
    """
    Handle an 'extract-topics' prompt request: render the topics prompt,
    call the LLM, parse its JSON reply into Topic records, and send a
    PromptResponse (or an error response) tagged with the request id.
    """

    try:

        prompt = to_topics(v.chunk)

        ans = self.llm.request(prompt)

        # Tolerate malformed LLM output: treat unparseable JSON as an
        # empty topic list rather than failing the whole request.
        # (Was a bare `except:`, which also swallowed KeyboardInterrupt
        # and SystemExit.)
        try:
            defs = self.parse_json(ans)
        except Exception:
            print("JSON parse error, ignored", flush=True)
            defs = []

        output = []

        for defn in defs:

            try:

                e = defn["topic"]
                d = defn["definition"]

                # Skip entries with empty or missing values.
                if e == "": continue
                if e is None: continue
                if d == "": continue
                if d is None: continue

                output.append(
                    Topic(
                        name=e, definition=d
                    )
                )

            except (KeyError, TypeError):
                # Entry is not an object carrying both keys; skip it.
                # (Message previously said "definition fields" — a
                # copy-paste from the definitions handler.)
                print("topic fields missing, ignored", flush=True)

        print("Send response...", flush=True)

        r = PromptResponse(topics=output, error=None)

        self.producer.send(r, properties={"id": id})

        print("Done.", flush=True)

    except Exception as e:

        print(f"Exception: {e}")

        print("Send error response...", flush=True)

        r = PromptResponse(
            error=Error(
                type = "llm-error",
                message = str(e),
            ),
            response=None,
        )

        self.producer.send(r, properties={"id": id})
def handle_extract_relationships(self, id, v):
try:

View file

@ -5,6 +5,9 @@ def to_relationships(template, text):
def to_definitions(template, text):
return template.format(text=text)
def to_topics(template, text):
    """Render the topic-extraction prompt template with the chunk text."""
    rendered = template.format(text=text)
    return rendered
def to_rows(template, schema, text):
field_schema = [

View file

@ -4,8 +4,10 @@ Language service abstracts prompt engineering from LLM.
"""
import json
import re
from .... schema import Definition, Relationship, Triple
from .... schema import Topic
from .... schema import PromptRequest, PromptResponse, Error
from .... schema import TextCompletionRequest, TextCompletionResponse
from .... schema import text_completion_request_queue
@ -15,7 +17,7 @@ from .... base import ConsumerProducer
from .... clients.llm_client import LlmClient
from . prompts import to_definitions, to_relationships, to_rows
from . prompts import to_kg_query, to_document_query
from . prompts import to_kg_query, to_document_query, to_topics
module = ".".join(__name__.split(".")[1:-1])
@ -38,6 +40,7 @@ class Processor(ConsumerProducer):
)
definition_template = params.get("definition_template")
relationship_template = params.get("relationship_template")
topic_template = params.get("topic_template")
rows_template = params.get("rows_template")
knowledge_query_template = params.get("knowledge_query_template")
document_query_template = params.get("document_query_template")
@ -62,18 +65,22 @@ class Processor(ConsumerProducer):
)
self.definition_template = definition_template
self.topic_template = topic_template
self.relationship_template = relationship_template
self.rows_template = rows_template
self.knowledge_query_template = knowledge_query_template
self.document_query_template = document_query_template
def parse_json(self, text):
# Hacky, workaround temperamental JSON markdown
text = text.replace("```json", "")
text = text.replace("```", "")
json_match = re.search(r'```(?:json)?(.*?)```', text, re.DOTALL)
if json_match:
json_str = json_match.group(1).strip()
else:
# If no delimiters, assume the entire output is JSON
json_str = text.strip()
return json.loads(text)
return json.loads(json_str)
def handle(self, msg):
@ -92,6 +99,11 @@ class Processor(ConsumerProducer):
self.handle_extract_definitions(id, v)
return
elif kind == "extract-topics":
self.handle_extract_topics(id, v)
return
elif kind == "extract-relationships":
self.handle_extract_relationships(id, v)
@ -176,6 +188,66 @@ class Processor(ConsumerProducer):
self.producer.send(r, properties={"id": id})
def handle_extract_topics(self, id, v):
    """
    Handle an 'extract-topics' prompt request: render the configured
    topic template with the chunk, call the LLM, parse its JSON reply
    into Topic records, and send a PromptResponse (or an error
    response) tagged with the request id.
    """

    try:

        prompt = to_topics(self.topic_template, v.chunk)

        ans = self.llm.request(prompt)

        # Tolerate malformed LLM output: treat unparseable JSON as an
        # empty topic list rather than failing the whole request.
        # (Was a bare `except:`, which also swallowed KeyboardInterrupt
        # and SystemExit.)
        try:
            defs = self.parse_json(ans)
        except Exception:
            print("JSON parse error, ignored", flush=True)
            defs = []

        output = []

        for defn in defs:

            try:

                e = defn["topic"]
                d = defn["definition"]

                # Skip entries with empty or missing values.
                if e == "": continue
                if e is None: continue
                if d == "": continue
                if d is None: continue

                output.append(
                    Topic(
                        name=e, definition=d
                    )
                )

            except (KeyError, TypeError):
                # Entry is not an object carrying both keys; skip it.
                # (Message previously said "definition fields" — a
                # copy-paste from the definitions handler.)
                print("topic fields missing, ignored", flush=True)

        print("Send response...", flush=True)

        r = PromptResponse(topics=output, error=None)

        self.producer.send(r, properties={"id": id})

        print("Done.", flush=True)

    except Exception as e:

        print(f"Exception: {e}")

        print("Send error response...", flush=True)

        r = PromptResponse(
            error=Error(
                type = "llm-error",
                message = str(e),
            ),
            response=None,
        )

        self.producer.send(r, properties={"id": id})
def handle_extract_relationships(self, id, v):
try:
@ -415,6 +487,12 @@ class Processor(ConsumerProducer):
help=f'Definition extraction template',
)
parser.add_argument(
'--topic-template',
required=True,
help=f'Topic extraction template',
)
parser.add_argument(
'--rows-template',
required=True,

View file

@ -12,6 +12,10 @@ class Definition(Record):
name = String()
definition = String()
class Topic(Record):
    """Schema record for a single extracted topic."""
    name = String()        # topic label
    definition = String()  # natural-language definition of the topic
class Relationship(Record):
s = String()
p = String()
@ -46,6 +50,7 @@ class PromptResponse(Record):
error = Error()
answer = String()
definitions = Array(Definition())
topics = Array(Topic())
relationships = Array(Relationship())
rows = Array(Map(String()))