This commit is contained in:
Cyber MacGeddon 2024-07-17 16:17:08 +01:00
parent 48386bc7f3
commit 25e3c8e97d
2 changed files with 127 additions and 305 deletions

View file

@ -4,57 +4,38 @@ Simple decoder, accepts vector+text chunks input, applies entity analysis to
get entity definitions which are output as graph edges.
"""
import pulsar
from pulsar.schema import JsonSchema
from langchain_community.document_loaders import PyPDFLoader
import tempfile
import base64
import os
import argparse
import rdflib
import json
import urllib.parse
import time
from ... schema import VectorsChunk, Triple, Source, Value
from ... log_level import LogLevel
from ... llm_client import LlmClient
from ... prompts import to_definitions
from ... rdf import TRUSTGRAPH_ENTITIES, DEFINITION
from ... base import ConsumerProducer
DEFINITION_VALUE = Value(value=DEFINITION, is_uri=True)
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'vectors-chunk-load'
default_output_queue = 'graph-load'
default_subscriber = 'kg-extract-definitions'
class Processor:
class Processor(ConsumerProducer):
def __init__(
self,
pulsar_host=default_pulsar_host,
pulsar_host=None,
input_queue=default_input_queue,
output_queue=default_output_queue,
subscriber=default_subscriber,
log_level=LogLevel.INFO,
):
self.client = None
self.client = pulsar.Client(
pulsar_host,
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
)
self.consumer = self.client.subscribe(
input_queue, subscriber,
schema=JsonSchema(VectorsChunk),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(Triple),
super(Processor, self).__init__(
pulsar_host=pulsar_host,
log_level=log_level,
input_queue=input_queue,
output_queue=output_queue,
subscriber=subscriber,
input_schema=VectorsChunk,
output_schema=Triple,
)
self.llm = LlmClient(pulsar_host=pulsar_host)
@ -81,117 +62,44 @@ class Processor:
t = Triple(s=s, p=p, o=o)
self.producer.send(t)
def run(self):
def handle(self, msg):
while True:
v = msg.value()
print(f"Indexing {v.source.id}...", flush=True)
msg = self.consumer.receive()
try:
v = msg.value()
print(f"Indexing {v.source.id}...", flush=True)
chunk = v.chunk.decode("utf-8")
g = rdflib.Graph()
try:
defs = self.get_definitions(chunk)
print(json.dumps(defs, indent=4), flush=True)
for defn in defs:
s = defn["entity"]
s_uri = self.to_uri(s)
o = defn["definition"]
s_value = Value(value=str(s_uri), is_uri=True)
o_value = Value(value=str(o), is_uri=False)
self.emit_edge(s_value, DEFINITION_VALUE, o_value)
except Exception as e:
print("Exception: ", e, flush=True)
print("Done.", flush=True)
# Acknowledge successful processing of the message
self.consumer.acknowledge(msg)
except Exception as e:
print("Exception: ", e, flush=True)
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
if self.client:
self.client.close()
def run():
parser = argparse.ArgumentParser(
prog='pdf-decoder',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
)
args = parser.parse_args()
while True:
chunk = v.chunk.decode("utf-8")
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
)
defs = self.get_definitions(chunk)
print(json.dumps(defs, indent=4), flush=True)
p.run()
for defn in defs:
s = defn["entity"]
s_uri = self.to_uri(s)
o = defn["definition"]
s_value = Value(value=str(s_uri), is_uri=True)
o_value = Value(value=str(o), is_uri=False)
self.emit_edge(s_value, DEFINITION_VALUE, o_value)
except Exception as e:
print("Exception: ", e, flush=True)
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
print("Done.", flush=True)
time.sleep(10)
@staticmethod
def add_args(parser):
    """Forward ``parser`` to ``ConsumerProducer.add_args`` with this module's defaults.

    The module-level default input queue, subscriber name and output queue
    are passed positionally, in that order, after ``parser``.

    NOTE(review): ``parser`` is presumably an ``argparse.ArgumentParser``;
    confirm against ``ConsumerProducer.add_args``.
    """
    queue_defaults = (
        default_input_queue,
        default_subscriber,
        default_output_queue,
    )
    ConsumerProducer.add_args(parser, *queue_defaults)
def run():
    """Call ``Processor.start`` for the "kg-extract-definitions" service.

    The module docstring (``__doc__``) is handed over as the second
    argument; presumably it is used as the program description -- confirm
    against ``Processor.start``.
    """
    service_name = "kg-extract-definitions"
    Processor.start(service_name, __doc__)

View file

@ -5,37 +5,27 @@ relationship analysis to get entity relationship edges which are output as
graph edges.
"""
import pulsar
from pulsar.schema import JsonSchema
from langchain_community.document_loaders import PyPDFLoader
import tempfile
import base64
import os
import argparse
import rdflib
import json
import urllib.parse
import time
from ... schema import VectorsChunk, Triple, VectorsAssociation, Source, Value
from ... log_level import LogLevel
from ... llm_client import LlmClient
from ... prompts import to_relationships
from ... rdf import RDF_LABEL, TRUSTGRAPH_ENTITIES
from ... base import ConsumerProducer
RDF_LABEL_VALUE = Value(value=RDF_LABEL, is_uri=True)
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'vectors-chunk-load'
default_output_queue = 'graph-load'
default_subscriber = 'kg-extract-relationships'
default_vector_queue='vectors-load'
class Processor:
class Processor(ConsumerProducer):
def __init__(
self,
pulsar_host=default_pulsar_host,
pulsar_host=None,
input_queue=default_input_queue,
vector_queue=default_vector_queue,
output_queue=default_output_queue,
@ -43,19 +33,14 @@ class Processor:
log_level=LogLevel.INFO,
):
self.client = pulsar.Client(
pulsar_host,
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
)
self.consumer = self.client.subscribe(
input_queue, subscriber,
schema=JsonSchema(VectorsChunk),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(Triple),
super(Processor, self).__init__(
pulsar_host=pulsar_host,
log_level=log_level,
input_queue=input_queue,
output_queue=output_queue,
subscriber=subscriber,
input_schema=VectorsChunk,
output_schema=Triple,
)
self.vec_prod = self.client.create_producer(
@ -92,162 +77,91 @@ class Processor:
r = VectorsAssociation(entity=ent, vectors=vec)
self.vec_prod.send(r)
def run(self):
def handle(self, msg):
while True:
v = msg.value()
print(f"Indexing {v.source.id}...", flush=True)
msg = self.consumer.receive()
chunk = v.chunk.decode("utf-8")
try:
v = msg.value()
print(f"Indexing {v.source.id}...", flush=True)
chunk = v.chunk.decode("utf-8")
g = rdflib.Graph()
try:
rels = self.get_relationships(chunk)
print(json.dumps(rels, indent=4), flush=True)
for rel in rels:
s = rel["subject"]
p = rel["predicate"]
o = rel["object"]
s_uri = self.to_uri(s)
s_value = Value(value=str(s_uri), is_uri=True)
p_uri = self.to_uri(p)
p_value = Value(value=str(p_uri), is_uri=True)
if rel["object-entity"]:
o_uri = self.to_uri(o)
o_value = Value(value=str(o_uri), is_uri=True)
else:
o_value = Value(value=str(o), is_uri=False)
self.emit_edge(
s_value,
p_value,
o_value
)
# Label for s
self.emit_edge(
s_value,
RDF_LABEL_VALUE,
Value(value=str(s), is_uri=False)
)
# Label for p
self.emit_edge(
p_value,
RDF_LABEL_VALUE,
Value(value=str(p), is_uri=False)
)
if rel["object-entity"]:
# Label for o
self.emit_edge(
o_value,
RDF_LABEL_VALUE,
Value(value=str(o), is_uri=False)
)
self.emit_vec(s_value, v.vectors)
self.emit_vec(p_value, v.vectors)
if rel["object-entity"]:
self.emit_vec(o_value, v.vectors)
except Exception as e:
print("Exception: ", e, flush=True)
print("Done.", flush=True)
# Acknowledge successful processing of the message
self.consumer.acknowledge(msg)
except Exception as e:
print("Exception: ", e, flush=True)
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
self.client.close()
def run():
parser = argparse.ArgumentParser(
prog='kg-extract-relationships',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
)
parser.add_argument(
'-c', '--vector-queue',
default=default_vector_queue,
help=f'Vector output queue (default: {default_vector_queue})'
)
args = parser.parse_args()
while True:
g = rdflib.Graph()
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
vector_queue=args.vector_queue,
subscriber=args.subscriber,
log_level=args.log_level,
)
rels = self.get_relationships(chunk)
print(json.dumps(rels, indent=4), flush=True)
p.run()
for rel in rels:
s = rel["subject"]
p = rel["predicate"]
o = rel["object"]
s_uri = self.to_uri(s)
s_value = Value(value=str(s_uri), is_uri=True)
p_uri = self.to_uri(p)
p_value = Value(value=str(p_uri), is_uri=True)
if rel["object-entity"]:
o_uri = self.to_uri(o)
o_value = Value(value=str(o_uri), is_uri=True)
else:
o_value = Value(value=str(o), is_uri=False)
self.emit_edge(
s_value,
p_value,
o_value
)
# Label for s
self.emit_edge(
s_value,
RDF_LABEL_VALUE,
Value(value=str(s), is_uri=False)
)
# Label for p
self.emit_edge(
p_value,
RDF_LABEL_VALUE,
Value(value=str(p), is_uri=False)
)
if rel["object-entity"]:
# Label for o
self.emit_edge(
o_value,
RDF_LABEL_VALUE,
Value(value=str(o), is_uri=False)
)
self.emit_vec(s_value, v.vectors)
self.emit_vec(p_value, v.vectors)
if rel["object-entity"]:
self.emit_vec(o_value, v.vectors)
except Exception as e:
print("Exception: ", e, flush=True)
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
print("Done.", flush=True)
time.sleep(10)
@staticmethod
def add_args(parser):
    """Add this processor's options to ``parser``.

    First forwards ``parser`` to ``ConsumerProducer.add_args`` together with
    the module-level default input queue, subscriber name and output queue
    (positionally, in that order), then adds the ``-c`` / ``--vector-queue``
    option, whose default is ``default_vector_queue``.
    """
    queue_defaults = (
        default_input_queue,
        default_subscriber,
        default_output_queue,
    )
    ConsumerProducer.add_args(parser, *queue_defaults)

    # Option added on top of whatever ConsumerProducer.add_args registers.
    vector_queue_option = {
        "default": default_vector_queue,
        "help": f'Vector output queue (default: {default_vector_queue})',
    }
    parser.add_argument('-c', '--vector-queue', **vector_queue_option)
def run():
    """Call ``Processor.start`` for the "kg-extract-relationships" service.

    The module docstring (``__doc__``) is handed over as the second
    argument; presumably it is used as the program description -- confirm
    against ``Processor.start``.
    """
    service_name = "kg-extract-relationships"
    Processor.start(service_name, __doc__)