diff --git a/trustgraph/kg/extract_definitions/extract.py b/trustgraph/kg/extract_definitions/extract.py index 7ff569dc..c86e926a 100755 --- a/trustgraph/kg/extract_definitions/extract.py +++ b/trustgraph/kg/extract_definitions/extract.py @@ -4,57 +4,38 @@ Simple decoder, accepts vector+text chunks input, applies entity analysis to get entity definitions which are output as graph edges. """ -import pulsar -from pulsar.schema import JsonSchema -from langchain_community.document_loaders import PyPDFLoader -import tempfile -import base64 -import os -import argparse -import rdflib -import json -import urllib.parse -import time - from ... schema import VectorsChunk, Triple, Source, Value from ... log_level import LogLevel from ... llm_client import LlmClient from ... prompts import to_definitions from ... rdf import TRUSTGRAPH_ENTITIES, DEFINITION +from ... base import ConsumerProducer DEFINITION_VALUE = Value(value=DEFINITION, is_uri=True) -default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') default_input_queue = 'vectors-chunk-load' default_output_queue = 'graph-load' default_subscriber = 'kg-extract-definitions' -class Processor: +class Processor(ConsumerProducer): def __init__( self, - pulsar_host=default_pulsar_host, + pulsar_host=None, input_queue=default_input_queue, output_queue=default_output_queue, subscriber=default_subscriber, log_level=LogLevel.INFO, ): - self.client = None - - self.client = pulsar.Client( - pulsar_host, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) - - self.consumer = self.client.subscribe( - input_queue, subscriber, - schema=JsonSchema(VectorsChunk), - ) - - self.producer = self.client.create_producer( - topic=output_queue, - schema=JsonSchema(Triple), + super(Processor, self).__init__( + pulsar_host=pulsar_host, + log_level=log_level, + input_queue=input_queue, + output_queue=output_queue, + subscriber=subscriber, + input_schema=VectorsChunk, + output_schema=Triple, ) self.llm = LlmClient(pulsar_host=pulsar_host) @@ -81,117 +62,44 @@ class Processor: t = Triple(s=s, p=p, o=o) self.producer.send(t) - def run(self): + def handle(self, msg): - while True: + v = msg.value() + print(f"Indexing {v.source.id}...", flush=True) - msg = self.consumer.receive() - - try: - - v = msg.value() - print(f"Indexing {v.source.id}...", flush=True) - - chunk = v.chunk.decode("utf-8") - - g = rdflib.Graph() - - try: - - defs = self.get_definitions(chunk) - print(json.dumps(defs, indent=4), flush=True) - - for defn in defs: - - s = defn["entity"] - s_uri = self.to_uri(s) - - o = defn["definition"] - - s_value = Value(value=str(s_uri), is_uri=True) - o_value = Value(value=str(o), is_uri=False) - - self.emit_edge(s_value, DEFINITION_VALUE, o_value) - - except Exception as e: - print("Exception: ", e, flush=True) - - print("Done.", flush=True) - - # Acknowledge successful processing of the message - self.consumer.acknowledge(msg) - - except Exception as e: - - print("Exception: ", e, flush=True) - - # Message failed to be processed - self.consumer.negative_acknowledge(msg) - - def __del__(self): - - if self.client: - self.client.close() - -def run(): - - parser = argparse.ArgumentParser( - prog='pdf-decoder', - description=__doc__, - ) - - parser.add_argument( - '-p', '--pulsar-host', - default=default_pulsar_host, - help=f'Pulsar host (default: {default_pulsar_host})', - ) - - parser.add_argument( - '-i', '--input-queue', - default=default_input_queue, - help=f'Input queue (default: {default_input_queue})' - ) - - parser.add_argument( - '-s', '--subscriber', - default=default_subscriber, - help=f'Queue subscriber name (default: {default_subscriber})' - ) - - parser.add_argument( - '-o', '--output-queue', - default=default_output_queue, - help=f'Output queue (default: {default_output_queue})' - ) - - parser.add_argument( - '-l', '--log-level', - type=LogLevel, - default=LogLevel.INFO, - choices=list(LogLevel), - help=f'Output queue (default: info)' - ) - - args = parser.parse_args() - - while True: + chunk = v.chunk.decode("utf-8") try: - p = Processor( - pulsar_host=args.pulsar_host, - input_queue=args.input_queue, - output_queue=args.output_queue, - subscriber=args.subscriber, - log_level=args.log_level, - ) + defs = self.get_definitions(chunk) + print(json.dumps(defs, indent=4), flush=True) - p.run() + for defn in defs: + + s = defn["entity"] + s_uri = self.to_uri(s) + + o = defn["definition"] + + s_value = Value(value=str(s_uri), is_uri=True) + o_value = Value(value=str(o), is_uri=False) + + self.emit_edge(s_value, DEFINITION_VALUE, o_value) except Exception as e: + print("Exception: ", e, flush=True) - print("Exception:", e, flush=True) - print("Will retry...", flush=True) + print("Done.", flush=True) - time.sleep(10) + @staticmethod + def add_args(parser): + + ConsumerProducer.add_args( + parser, default_input_queue, default_subscriber, + default_output_queue, + ) + +def run(): + + Processor.start("kg-extract-definitions", __doc__) diff --git a/trustgraph/kg/extract_relationships/extract.py b/trustgraph/kg/extract_relationships/extract.py index 14f837df..e2aaeef1 100755 --- a/trustgraph/kg/extract_relationships/extract.py +++ b/trustgraph/kg/extract_relationships/extract.py @@ -5,37 +5,27 @@ relationship analysis to get entity relationship edges which are output as graph edges. """ -import pulsar from pulsar.schema import JsonSchema -from langchain_community.document_loaders import PyPDFLoader -import tempfile -import base64 -import os -import argparse -import rdflib -import json -import urllib.parse -import time from ... schema import VectorsChunk, Triple, VectorsAssociation, Source, Value from ... log_level import LogLevel from ... llm_client import LlmClient from ... prompts import to_relationships from ... rdf import RDF_LABEL, TRUSTGRAPH_ENTITIES +from ... base import ConsumerProducer RDF_LABEL_VALUE = Value(value=RDF_LABEL, is_uri=True) -default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') default_input_queue = 'vectors-chunk-load' default_output_queue = 'graph-load' default_subscriber = 'kg-extract-relationships' default_vector_queue='vectors-load' -class Processor: +class Processor(ConsumerProducer): def __init__( self, - pulsar_host=default_pulsar_host, + pulsar_host=None, input_queue=default_input_queue, vector_queue=default_vector_queue, output_queue=default_output_queue, @@ -43,19 +33,14 @@ class Processor: log_level=LogLevel.INFO, ): - self.client = pulsar.Client( - pulsar_host, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) - - self.consumer = self.client.subscribe( - input_queue, subscriber, - schema=JsonSchema(VectorsChunk), - ) - - self.producer = self.client.create_producer( - topic=output_queue, - schema=JsonSchema(Triple), + super(Processor, self).__init__( + pulsar_host=pulsar_host, + log_level=log_level, + input_queue=input_queue, + output_queue=output_queue, + subscriber=subscriber, + input_schema=VectorsChunk, + output_schema=Triple, ) self.vec_prod = self.client.create_producer( @@ -92,162 +77,91 @@ class Processor: r = VectorsAssociation(entity=ent, vectors=vec) self.vec_prod.send(r) - def run(self): + def handle(self, msg): - while True: + v = msg.value() + print(f"Indexing {v.source.id}...", flush=True) - msg = self.consumer.receive() + chunk = v.chunk.decode("utf-8") - try: - - v = msg.value() - print(f"Indexing {v.source.id}...", flush=True) - - chunk = v.chunk.decode("utf-8") - - g = rdflib.Graph() - - try: - - rels = self.get_relationships(chunk) - print(json.dumps(rels, indent=4), flush=True) - - for rel in rels: - - s = rel["subject"] - p = rel["predicate"] - o = rel["object"] - - s_uri = self.to_uri(s) - s_value = Value(value=str(s_uri), is_uri=True) - - p_uri = self.to_uri(p) - p_value = Value(value=str(p_uri), is_uri=True) - - if rel["object-entity"]: - o_uri = self.to_uri(o) - o_value = Value(value=str(o_uri), is_uri=True) - else: - o_value = Value(value=str(o), is_uri=False) - - self.emit_edge( - s_value, - p_value, - o_value - ) - - # Label for s - self.emit_edge( - s_value, - RDF_LABEL_VALUE, - Value(value=str(s), is_uri=False) - ) - - # Label for p - self.emit_edge( - p_value, - RDF_LABEL_VALUE, - Value(value=str(p), is_uri=False) - ) - - if rel["object-entity"]: - # Label for o - self.emit_edge( - o_value, - RDF_LABEL_VALUE, - Value(value=str(o), is_uri=False) - ) - - self.emit_vec(s_value, v.vectors) - self.emit_vec(p_value, v.vectors) - if rel["object-entity"]: - self.emit_vec(o_value, v.vectors) - - except Exception as e: - print("Exception: ", e, flush=True) - - print("Done.", flush=True) - - # Acknowledge successful processing of the message - self.consumer.acknowledge(msg) - - except Exception as e: - - print("Exception: ", e, flush=True) - - # Message failed to be processed - self.consumer.negative_acknowledge(msg) - - def __del__(self): - self.client.close() - -def run(): - - parser = argparse.ArgumentParser( - prog='kg-extract-relationships', - description=__doc__, - ) - - parser.add_argument( - '-p', '--pulsar-host', - default=default_pulsar_host, - help=f'Pulsar host (default: {default_pulsar_host})', - ) - - parser.add_argument( - '-i', '--input-queue', - default=default_input_queue, - help=f'Input queue (default: {default_input_queue})' - ) - - parser.add_argument( - '-s', '--subscriber', - default=default_subscriber, - help=f'Queue subscriber name (default: {default_subscriber})' - ) - - parser.add_argument( - '-o', '--output-queue', - default=default_output_queue, - help=f'Output queue (default: {default_output_queue})' - ) - - parser.add_argument( - '-l', '--log-level', - type=LogLevel, - default=LogLevel.INFO, - choices=list(LogLevel), - help=f'Output queue (default: info)' - ) - - parser.add_argument( - '-c', '--vector-queue', - default=default_vector_queue, - help=f'Vector output queue (default: {default_vector_queue})' - ) - - args = parser.parse_args() - - while True: + g = rdflib.Graph() try: - p = Processor( - pulsar_host=args.pulsar_host, - input_queue=args.input_queue, - output_queue=args.output_queue, - vector_queue=args.vector_queue, - subscriber=args.subscriber, - log_level=args.log_level, - ) + rels = self.get_relationships(chunk) + print(json.dumps(rels, indent=4), flush=True) - p.run() + for rel in rels: + + s = rel["subject"] + p = rel["predicate"] + o = rel["object"] + + s_uri = self.to_uri(s) + s_value = Value(value=str(s_uri), is_uri=True) + + p_uri = self.to_uri(p) + p_value = Value(value=str(p_uri), is_uri=True) + + if rel["object-entity"]: + o_uri = self.to_uri(o) + o_value = Value(value=str(o_uri), is_uri=True) + else: + o_value = Value(value=str(o), is_uri=False) + + self.emit_edge( + s_value, + p_value, + o_value + ) + + # Label for s + self.emit_edge( + s_value, + RDF_LABEL_VALUE, + Value(value=str(s), is_uri=False) + ) + + # Label for p + self.emit_edge( + p_value, + RDF_LABEL_VALUE, + Value(value=str(p), is_uri=False) + ) + + if rel["object-entity"]: + # Label for o + self.emit_edge( + o_value, + RDF_LABEL_VALUE, + Value(value=str(o), is_uri=False) + ) + + self.emit_vec(s_value, v.vectors) + self.emit_vec(p_value, v.vectors) + if rel["object-entity"]: + self.emit_vec(o_value, v.vectors) except Exception as e: + print("Exception: ", e, flush=True) - print("Exception:", e, flush=True) - print("Will retry...", flush=True) + print("Done.", flush=True) - time.sleep(10) + @staticmethod + def add_args(parser): + ConsumerProducer.add_args( + parser, default_input_queue, default_subscriber, + default_output_queue, + ) + + parser.add_argument( + '-c', '--vector-queue', + default=default_vector_queue, + help=f'Vector output queue (default: {default_vector_queue})' + ) + +def run(): + + Processor.start("kg-extract-relationships", __doc__)