Base classes (#2)

Simplify code using base classes
This commit is contained in:
cybermaggedon 2024-07-17 16:56:47 +01:00 committed by GitHub
parent d007a5c97c
commit a0def33dba
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 746 additions and 1423 deletions

View file

@ -0,0 +1,3 @@
from . processor import *

View file

@ -0,0 +1,266 @@
import os
import argparse
import pulsar
import time
from pulsar.schema import JsonSchema
from .. log_level import LogLevel
class BaseProcessor:
    """Base class for Pulsar-connected services.

    Owns the Pulsar client connection and provides the shared
    argument-parsing and start-up scaffolding.  Subclasses implement
    run() (usually via the Consumer / Producer intermediates).
    """

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')

    def __init__(
            self,
            pulsar_host=default_pulsar_host,
            log_level=LogLevel.INFO,
    ):
        # Set before connecting so __del__ is safe even if
        # pulsar.Client() raises.
        self.client = None

        # Callers (subclasses) pass None to mean "use the default".
        if pulsar_host is None:
            pulsar_host = BaseProcessor.default_pulsar_host

        self.pulsar_host = pulsar_host

        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

    def __del__(self):
        if self.client:
            self.client.close()

    @staticmethod
    def add_args(parser):
        """Add the command-line arguments common to all processors."""
        parser.add_argument(
            '-p', '--pulsar-host',
            default=BaseProcessor.default_pulsar_host,
            help=f'Pulsar host (default: {BaseProcessor.default_pulsar_host})',
        )
        parser.add_argument(
            '-l', '--log-level',
            type=LogLevel,
            default=LogLevel.INFO,
            choices=list(LogLevel),
            help='Log level (default: info)'
        )

    def run(self):
        # Abstract: concrete processors must override this.
        raise RuntimeError("Something should have implemented the run method")

    @classmethod
    def start(cls, prog, doc):
        """Parse arguments, construct the processor and run it.

        Retries forever on failure (e.g. Pulsar not yet reachable),
        pausing 10 seconds between attempts.
        """
        parser = argparse.ArgumentParser(
            prog=prog,
            description=doc
        )

        cls.add_args(parser)

        args = vars(parser.parse_args())

        # Loop so a transient failure doesn't kill the service; the
        # previous behaviour printed "Will retry..." but then exited.
        while True:
            try:
                p = cls(**args)
                p.run()
            except Exception as e:
                print("Exception:", e, flush=True)
                print("Will retry...", flush=True)
                time.sleep(10)
class Consumer(BaseProcessor):
    """Processor which consumes messages from one input queue.

    Subclasses implement handle(msg) to process each received message.
    """

    def __init__(
            self,
            pulsar_host=None,
            log_level=LogLevel.INFO,
            input_queue="input",
            subscriber="subscriber",
            input_schema=None,
    ):
        """Connect and subscribe.

        Raises:
            RuntimeError: if input_schema is not supplied.
        """
        super(Consumer, self).__init__(
            pulsar_host=pulsar_host,
            log_level=log_level,
        )

        if input_schema is None:
            raise RuntimeError("input_schema must be specified")

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(input_schema),
        )

    def run(self):
        """Receive messages forever.

        Acknowledges on successful handle(); negatively acknowledges
        on exception so the message is redelivered.
        """
        while True:
            msg = self.consumer.receive()
            try:
                self.handle(msg)

                # Acknowledge successful processing of the message
                self.consumer.acknowledge(msg)

            except Exception as e:
                print("Exception:", e, flush=True)

                # Message failed to be processed; request redelivery
                self.consumer.negative_acknowledge(msg)

    @staticmethod
    def add_args(parser, default_input_queue, default_subscriber):
        """Add consumer arguments on top of the base processor's."""
        BaseProcessor.add_args(parser)

        parser.add_argument(
            '-i', '--input-queue',
            default=default_input_queue,
            help=f'Input queue (default: {default_input_queue})'
        )

        parser.add_argument(
            '-s', '--subscriber',
            default=default_subscriber,
            help=f'Queue subscriber name (default: {default_subscriber})'
        )
class ConsumerProducer(BaseProcessor):
    """Processor which consumes from an input queue and produces to an
    output queue.

    Subclasses implement handle(msg) and typically call send() to emit
    results.
    """

    def __init__(
            self,
            pulsar_host=None,
            log_level=LogLevel.INFO,
            input_queue="input",
            output_queue="output",
            subscriber="subscriber",
            input_schema=None,
            output_schema=None,
    ):
        """Connect, subscribe and create the producer.

        Raises:
            RuntimeError: if input_schema or output_schema is not supplied.
        """
        super(ConsumerProducer, self).__init__(
            pulsar_host=pulsar_host,
            log_level=log_level,
        )

        if input_schema is None:
            raise RuntimeError("input_schema must be specified")

        if output_schema is None:
            raise RuntimeError("output_schema must be specified")

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(input_schema),
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(output_schema),
        )

    def run(self):
        """Receive messages forever.

        Acknowledges on successful handle(); negatively acknowledges
        on exception so the message is redelivered.
        """
        while True:
            msg = self.consumer.receive()
            try:
                self.handle(msg)

                # Acknowledge successful processing of the message
                self.consumer.acknowledge(msg)

            except Exception as e:
                print("Exception:", e, flush=True)

                # Message failed to be processed; request redelivery
                self.consumer.negative_acknowledge(msg)

    def send(self, msg, properties=None):
        """Send msg to the output queue with optional string properties."""
        # Avoid a mutable default argument; preserve the original
        # behaviour of sending an empty property map.
        if properties is None:
            properties = {}
        self.producer.send(msg, properties)

    @staticmethod
    def add_args(
            parser, default_input_queue, default_subscriber,
            default_output_queue,
    ):
        """Add consumer + producer arguments on top of the base's."""
        BaseProcessor.add_args(parser)

        parser.add_argument(
            '-i', '--input-queue',
            default=default_input_queue,
            help=f'Input queue (default: {default_input_queue})'
        )

        parser.add_argument(
            '-s', '--subscriber',
            default=default_subscriber,
            help=f'Queue subscriber name (default: {default_subscriber})'
        )

        parser.add_argument(
            '-o', '--output-queue',
            default=default_output_queue,
            help=f'Output queue (default: {default_output_queue})'
        )
class Producer(BaseProcessor):
    """Processor which only produces messages to an output queue."""

    def __init__(
            self,
            pulsar_host=None,
            log_level=LogLevel.INFO,
            output_queue="output",
            output_schema=None,
    ):
        """Connect and create the producer.

        Raises:
            RuntimeError: if output_schema is not supplied.
        """
        super(Producer, self).__init__(
            pulsar_host=pulsar_host,
            log_level=log_level,
        )

        if output_schema is None:
            raise RuntimeError("output_schema must be specified")

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(output_schema),
        )

    def send(self, msg, properties=None):
        """Send msg to the output queue with optional string properties."""
        # Avoid a mutable default argument; preserve the original
        # behaviour of sending an empty property map.
        if properties is None:
            properties = {}
        self.producer.send(msg, properties)

    @staticmethod
    def add_args(
            parser, default_input_queue, default_subscriber,
            default_output_queue,
    ):
        """Add producer arguments on top of the base processor's.

        NOTE(review): default_input_queue and default_subscriber are
        unused here; the signature is kept for caller compatibility.
        """
        BaseProcessor.add_args(parser)

        parser.add_argument(
            '-o', '--output-queue',
            default=default_output_queue,
            help=f'Output queue (default: {default_output_queue})'
        )

View file

@ -4,28 +4,22 @@ Simple decoder, accepts text documents on input, outputs chunks from the
as text as separate output objects. as text as separate output objects.
""" """
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_text_splitters import RecursiveCharacterTextSplitter
import time
from ... schema import TextDocument, Chunk, Source from ... schema import TextDocument, Chunk, Source
from ... log_level import LogLevel from ... log_level import LogLevel
from ... base import ConsumerProducer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'text-doc-load' default_input_queue = 'text-doc-load'
default_output_queue = 'chunk-load' default_output_queue = 'chunk-load'
default_subscriber = 'chunker-recursive' default_subscriber = 'chunker-recursive'
class Processor: class Processor(ConsumerProducer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
output_queue=default_output_queue, output_queue=default_output_queue,
subscriber=default_subscriber, subscriber=default_subscriber,
@ -34,21 +28,14 @@ class Processor:
chunk_overlap=100, chunk_overlap=100,
): ):
self.client = None super(Processor, self).__init__(
pulsar_host=pulsar_host,
self.client = pulsar.Client( log_level=log_level,
pulsar_host, input_queue=input_queue,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) output_queue=output_queue,
) subscriber=subscriber,
input_schema=TextDocument,
self.consumer = self.client.subscribe( output_schema=Chunk,
input_queue, subscriber,
schema=JsonSchema(TextDocument),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(Chunk),
) )
self.text_splitter = RecursiveCharacterTextSplitter( self.text_splitter = RecursiveCharacterTextSplitter(
@ -58,18 +45,7 @@ class Processor:
is_separator_regex=False, is_separator_regex=False,
) )
print("Chunker inited") def handle(self, msg):
def run(self):
print("Chunker running")
while True:
msg = self.consumer.receive()
print("Chunker message received")
try:
v = msg.value() v = msg.value()
print(f"Chunking {v.source.id}...", flush=True) print(f"Chunking {v.source.id}...", flush=True)
@ -91,61 +67,16 @@ class Processor:
chunk=chunk.page_content.encode("utf-8"), chunk=chunk.page_content.encode("utf-8"),
) )
self.producer.send(r) self.send(r)
# Acknowledge successful processing of the message
self.consumer.acknowledge(msg)
print("Done.", flush=True) print("Done.", flush=True)
except Exception as e: @staticmethod
print(e, flush=True) def add_args(parser):
# Message failed to be processed ConsumerProducer.add_args(
self.consumer.negative_acknowledge(msg) parser, default_input_queue, default_subscriber,
default_output_queue,
def __del__(self):
if self.client:
self.client.close()
def run():
parser = argparse.ArgumentParser(
prog='pdf-decoder',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
) )
parser.add_argument( parser.add_argument(
@ -162,30 +93,7 @@ def run():
help=f'Chunk overlap (default: 100)' help=f'Chunk overlap (default: 100)'
) )
args = parser.parse_args() def run():
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
chunk_size=args.chunk_size,
chunk_overlap=args.chunk_overlap,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)
Processor.start('chunker', __doc__)

View file

@ -4,71 +4,47 @@ Simple decoder, accepts PDF documents on input, outputs pages from the
PDF document as text as separate output objects. PDF document as text as separate output objects.
""" """
import pulsar
from pulsar.schema import JsonSchema
from langchain_community.document_loaders import PyPDFLoader
import tempfile import tempfile
import base64 import base64
import os from langchain_community.document_loaders import PyPDFLoader
import argparse
import time
from ... schema import Document, TextDocument, Source from ... schema import Document, TextDocument, Source
from ... log_level import LogLevel from ... log_level import LogLevel
from ... base import ConsumerProducer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'document-load' default_input_queue = 'document-load'
default_output_queue = 'text-doc-load' default_output_queue = 'text-doc-load'
default_subscriber = 'pdf-decoder' default_subscriber = 'pdf-decoder'
class Processor: class Processor(ConsumerProducer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
output_queue=default_output_queue, output_queue=default_output_queue,
subscriber=default_subscriber, subscriber=default_subscriber,
log_level=LogLevel.INFO, log_level=LogLevel.INFO,
): ):
self.client = None super(Processor, self).__init__(
pulsar_host=pulsar_host,
self.client = pulsar.Client( log_level=log_level,
pulsar_host, input_queue=input_queue,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) output_queue=output_queue,
) subscriber=subscriber,
input_schema=Document,
self.consumer = self.client.subscribe( output_schema=TextDocument,
input_queue, subscriber,
schema=JsonSchema(Document),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(TextDocument),
) )
print("PDF inited") print("PDF inited")
print("Pulsar", pulsar_host) def handle(self, msg):
print("Input", input_queue)
print("Output", output_queue)
print("Subscriber", subscriber)
def run(self):
print("PDF running")
while True:
msg = self.consumer.receive()
print("PDF message received") print("PDF message received")
try:
v = msg.value() v = msg.value()
print(f"Decoding {v.source.id}...", flush=True) print(f"Decoding {v.source.id}...", flush=True)
with tempfile.NamedTemporaryFile(delete_on_close=False) as fp: with tempfile.NamedTemporaryFile(delete_on_close=False) as fp:
@ -93,82 +69,19 @@ class Processor:
text=page.page_content.encode("utf-8"), text=page.page_content.encode("utf-8"),
) )
self.producer.send(r) self.send(r)
# Acknowledge successful processing of the message
self.consumer.acknowledge(msg)
print("Done.", flush=True) print("Done.", flush=True)
except Exception as e: @staticmethod
print(e, flush=True) def add_args(parser):
# Message failed to be processed ConsumerProducer.add_args(
self.consumer.negative_acknowledge(msg) parser, default_input_queue, default_subscriber,
default_output_queue,
def __del__(self): )
if self.client:
self.client.close()
def run(): def run():
parser = argparse.ArgumentParser( Processor.start("pdf-decoder", __doc__)
prog='pdf-decoder',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
)
args = parser.parse_args()
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)

View file

@ -4,29 +4,22 @@ Embeddings service, applies an embeddings model selected from HuggingFace.
Input is text, output is embeddings vector. Input is text, output is embeddings vector.
""" """
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
from langchain_huggingface import HuggingFaceEmbeddings from langchain_huggingface import HuggingFaceEmbeddings
import time
from ... schema import EmbeddingsRequest, EmbeddingsResponse from ... schema import EmbeddingsRequest, EmbeddingsResponse
from ... log_level import LogLevel from ... log_level import LogLevel
from ... base import ConsumerProducer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'embeddings' default_input_queue = 'embeddings'
default_output_queue = 'embeddings-response' default_output_queue = 'embeddings-response'
default_subscriber = 'embeddings-hf' default_subscriber = 'embeddings-hf'
default_model="all-MiniLM-L6-v2" default_model="all-MiniLM-L6-v2"
class Processor: class Processor(ConsumerProducer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
output_queue=default_output_queue, output_queue=default_output_queue,
subscriber=default_subscriber, subscriber=default_subscriber,
@ -34,37 +27,23 @@ class Processor:
model=default_model, model=default_model,
): ):
self.client = None super(Processor, self).__init__(
pulsar_host=pulsar_host,
self.client = pulsar.Client( log_level=log_level,
pulsar_host, input_queue=input_queue,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) output_queue=output_queue,
) subscriber=subscriber,
input_schema=EmbeddingsRequest,
self.consumer = self.client.subscribe( output_schema=EmbeddingsResponse,
input_queue, subscriber,
schema=JsonSchema(EmbeddingsRequest),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(EmbeddingsResponse),
) )
self.embeddings = HuggingFaceEmbeddings(model_name=model) self.embeddings = HuggingFaceEmbeddings(model_name=model)
def run(self): def handle(self, msg):
while True:
msg = self.consumer.receive()
try:
v = msg.value() v = msg.value()
# Sender-produced ID # Sender-produced ID
id = msg.properties()["id"] id = msg.properties()["id"]
print(f"Handling input {id}...", flush=True) print(f"Handling input {id}...", flush=True)
@ -78,58 +57,12 @@ class Processor:
print("Done.", flush=True) print("Done.", flush=True)
# Acknowledge successful processing of the message @staticmethod
self.consumer.acknowledge(msg) def add_args(parser):
except Exception as e: ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
print("Exception:", e, flush=True) default_output_queue,
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
if self.client:
self.client.close()
def run():
parser = argparse.ArgumentParser(
prog='llm-ollama-text',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
) )
parser.add_argument( parser.add_argument(
@ -138,28 +71,7 @@ def run():
help=f'LLM model (default: all-MiniLM-L6-v2)' help=f'LLM model (default: all-MiniLM-L6-v2)'
) )
args = parser.parse_args() def run():
Processor.start("embeddings-hf", __doc__)
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
model=args.model,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)

View file

@ -3,31 +3,23 @@
Embeddings service, applies an embeddings model selected from HuggingFace. Embeddings service, applies an embeddings model selected from HuggingFace.
Input is text, output is embeddings vector. Input is text, output is embeddings vector.
""" """
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
from langchain_community.embeddings import OllamaEmbeddings from langchain_community.embeddings import OllamaEmbeddings
import time
from ... schema import EmbeddingsRequest, EmbeddingsResponse from ... schema import EmbeddingsRequest, EmbeddingsResponse
from ... log_level import LogLevel from ... log_level import LogLevel
from ... base import ConsumerProducer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'embeddings' default_input_queue = 'embeddings'
default_output_queue = 'embeddings-response' default_output_queue = 'embeddings-response'
default_subscriber = 'embeddings-ollama' default_subscriber = 'embeddings-ollama'
default_model="mxbai-embed-large" default_model="mxbai-embed-large"
default_ollama = 'http://localhost:11434' default_ollama = 'http://localhost:11434'
class Processor: class Processor(ConsumerProducer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
output_queue=default_output_queue, output_queue=default_output_queue,
subscriber=default_subscriber, subscriber=default_subscriber,
@ -36,32 +28,19 @@ class Processor:
ollama=default_ollama, ollama=default_ollama,
): ):
self.client = None super(Processor, self).__init__(
pulsar_host=pulsar_host,
self.client = pulsar.Client( log_level=log_level,
pulsar_host, input_queue=input_queue,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) output_queue=output_queue,
) subscriber=subscriber,
input_schema=EmbeddingsRequest,
self.consumer = self.client.subscribe( output_schema=EmbeddingsResponse,
input_queue, subscriber,
schema=JsonSchema(EmbeddingsRequest),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(EmbeddingsResponse),
) )
self.embeddings = OllamaEmbeddings(base_url=ollama, model=model) self.embeddings = OllamaEmbeddings(base_url=ollama, model=model)
def run(self): def handle(self, msg):
while True:
msg = self.consumer.receive()
try:
v = msg.value() v = msg.value()
@ -81,58 +60,12 @@ class Processor:
print("Done.", flush=True) print("Done.", flush=True)
# Acknowledge successful processing of the message @staticmethod
self.consumer.acknowledge(msg) def add_args(parser):
except Exception as e: ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
print("Exception:", e, flush=True) default_output_queue,
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
if self.client:
self.client.close()
def run():
parser = argparse.ArgumentParser(
prog='embeddings-ollama',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
) )
parser.add_argument( parser.add_argument(
@ -147,29 +80,7 @@ def run():
help=f'ollama (default: {default_ollama})' help=f'ollama (default: {default_ollama})'
) )
args = parser.parse_args() def run():
Processor.start('embeddings-ollama', __doc__)
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
model=args.model,
ollama=args.ollama,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)

View file

@ -4,49 +4,34 @@ Vectorizer, calls the embeddings service to get embeddings for a chunk.
Input is text chunk, output is chunk and vectors. Input is text chunk, output is chunk and vectors.
""" """
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
import time
from ... schema import Chunk, VectorsChunk from ... schema import Chunk, VectorsChunk
from ... embeddings_client import EmbeddingsClient from ... embeddings_client import EmbeddingsClient
from ... log_level import LogLevel from ... log_level import LogLevel
from ... base import ConsumerProducer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'chunk-load' default_input_queue = 'chunk-load'
default_output_queue = 'vectors-chunk-load' default_output_queue = 'vectors-chunk-load'
default_subscriber = 'embeddings-vectorizer' default_subscriber = 'embeddings-vectorizer'
class Processor: class Processor(ConsumerProducer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
output_queue=default_output_queue, output_queue=default_output_queue,
subscriber=default_subscriber, subscriber=default_subscriber,
log_level=LogLevel.INFO, log_level=LogLevel.INFO,
): ):
self.client = None super(Processor, self).__init__(
pulsar_host=pulsar_host,
self.client = pulsar.Client( log_level=log_level,
pulsar_host, input_queue=input_queue,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) output_queue=output_queue,
) subscriber=subscriber,
input_schema=Chunk,
self.consumer = self.client.subscribe( output_schema=VectorsChunk,
input_queue, subscriber,
schema=JsonSchema(Chunk),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(VectorsChunk),
) )
self.embeddings = EmbeddingsClient(pulsar_host=pulsar_host) self.embeddings = EmbeddingsClient(pulsar_host=pulsar_host)
@ -56,13 +41,7 @@ class Processor:
r = VectorsChunk(source=source, chunk=chunk, vectors=vectors) r = VectorsChunk(source=source, chunk=chunk, vectors=vectors)
self.producer.send(r) self.producer.send(r)
def run(self): def handle(self, msg):
while True:
msg = self.consumer.receive()
try:
v = msg.value() v = msg.value()
print(f"Indexing {v.source.id}...", flush=True) print(f"Indexing {v.source.id}...", flush=True)
@ -84,80 +63,15 @@ class Processor:
print("Done.", flush=True) print("Done.", flush=True)
# Acknowledge successful processing of the message @staticmethod
self.consumer.acknowledge(msg) def add_args(parser):
except Exception as e: ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
print("Exception:", e, flush=True) default_output_queue,
)
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
if self.client:
self.client.close()
def run(): def run():
parser = argparse.ArgumentParser( Processor.start("embeddings-vectorize", __doc__)
prog='embeddings-vectorizer',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
)
args = parser.parse_args()
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)

View file

@ -4,8 +4,6 @@ Graph writer. Input is graph edge. Writes edges to Cassandra graph.
""" """
import pulsar import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64 import base64
import os import os
import argparse import argparse
@ -14,46 +12,36 @@ import time
from ... trustgraph import TrustGraph from ... trustgraph import TrustGraph
from ... schema import Triple from ... schema import Triple
from ... log_level import LogLevel from ... log_level import LogLevel
from ... base import Consumer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'graph-load' default_input_queue = 'graph-load'
default_subscriber = 'graph-write-cassandra' default_subscriber = 'graph-write-cassandra'
default_graph_host='localhost' default_graph_host='localhost'
class Processor: class Processor(Consumer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
subscriber=default_subscriber, subscriber=default_subscriber,
graph_host=default_graph_host, graph_host=default_graph_host,
log_level=LogLevel.INFO, log_level=LogLevel.INFO,
): ):
self.client = None super(Processor, self).__init__(
pulsar_host=pulsar_host,
self.client = pulsar.Client( log_level=log_level,
pulsar_host, input_queue=input_queue,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) subscriber=subscriber,
) input_schema=Triple,
self.consumer = self.client.subscribe(
input_queue, subscriber,
schema=JsonSchema(Triple),
) )
self.tg = TrustGraph([graph_host]) self.tg = TrustGraph([graph_host])
self.count = 0 self.count = 0
def run(self): def handle(self, msg):
while True:
msg = self.consumer.receive()
try:
v = msg.value() v = msg.value()
@ -68,81 +56,20 @@ class Processor:
if (self.count % 1000) == 0: if (self.count % 1000) == 0:
print(self.count, "...", flush=True) print(self.count, "...", flush=True)
# Acknowledge successful processing of the message @staticmethod
self.consumer.acknowledge(msg) def add_args(parser):
except Exception as e: Consumer.add_args(
parser, default_input_queue, default_subscriber,
print("Exception:", e, flush=True)
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
if self.client:
self.client.close()
def run():
parser = argparse.ArgumentParser(
prog='graph-write-cassandra',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
) )
parser.add_argument( parser.add_argument(
'-g', '--graph-host', '-g', '--graph-host',
default="localhost", default="localhost",
help=f'Output queue (default: localhost)' help=f'Graph host (default: localhost)'
) )
args = parser.parse_args() def run():
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
subscriber=args.subscriber,
log_level=args.log_level,
graph_host=args.graph_host,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)
Processor.start("graph-write-cassandra", __doc__)

View file

@ -4,57 +4,41 @@ Simple decoder, accepts vector+text chunks input, applies entity analysis to
get entity definitions which are output as graph edges. get entity definitions which are output as graph edges.
""" """
import pulsar
from pulsar.schema import JsonSchema
from langchain_community.document_loaders import PyPDFLoader
import tempfile
import base64
import os
import argparse
import rdflib
import json
import urllib.parse import urllib.parse
import time import json
from ... schema import VectorsChunk, Triple, Source, Value from ... schema import VectorsChunk, Triple, Source, Value
from ... log_level import LogLevel from ... log_level import LogLevel
from ... llm_client import LlmClient from ... llm_client import LlmClient
from ... prompts import to_definitions from ... prompts import to_definitions
from ... rdf import TRUSTGRAPH_ENTITIES, DEFINITION from ... rdf import TRUSTGRAPH_ENTITIES, DEFINITION
from ... base import ConsumerProducer
DEFINITION_VALUE = Value(value=DEFINITION, is_uri=True) DEFINITION_VALUE = Value(value=DEFINITION, is_uri=True)
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'vectors-chunk-load' default_input_queue = 'vectors-chunk-load'
default_output_queue = 'graph-load' default_output_queue = 'graph-load'
default_subscriber = 'kg-extract-definitions' default_subscriber = 'kg-extract-definitions'
class Processor: class Processor(ConsumerProducer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
output_queue=default_output_queue, output_queue=default_output_queue,
subscriber=default_subscriber, subscriber=default_subscriber,
log_level=LogLevel.INFO, log_level=LogLevel.INFO,
): ):
self.client = None super(Processor, self).__init__(
pulsar_host=pulsar_host,
self.client = pulsar.Client( log_level=log_level,
pulsar_host, input_queue=input_queue,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) output_queue=output_queue,
) subscriber=subscriber,
input_schema=VectorsChunk,
self.consumer = self.client.subscribe( output_schema=Triple,
input_queue, subscriber,
schema=JsonSchema(VectorsChunk),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(Triple),
) )
self.llm = LlmClient(pulsar_host=pulsar_host) self.llm = LlmClient(pulsar_host=pulsar_host)
@ -81,21 +65,13 @@ class Processor:
t = Triple(s=s, p=p, o=o) t = Triple(s=s, p=p, o=o)
self.producer.send(t) self.producer.send(t)
def run(self): def handle(self, msg):
while True:
msg = self.consumer.receive()
try:
v = msg.value() v = msg.value()
print(f"Indexing {v.source.id}...", flush=True) print(f"Indexing {v.source.id}...", flush=True)
chunk = v.chunk.decode("utf-8") chunk = v.chunk.decode("utf-8")
g = rdflib.Graph()
try: try:
defs = self.get_definitions(chunk) defs = self.get_definitions(chunk)
@ -118,80 +94,15 @@ class Processor:
print("Done.", flush=True) print("Done.", flush=True)
# Acknowledge successful processing of the message @staticmethod
self.consumer.acknowledge(msg) def add_args(parser):
except Exception as e: ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
print("Exception: ", e, flush=True) default_output_queue,
)
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
if self.client:
self.client.close()
def run(): def run():
parser = argparse.ArgumentParser( Processor.start("kg-extract-definitions", __doc__)
prog='pdf-decoder',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
)
args = parser.parse_args()
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)

View file

@ -5,37 +5,29 @@ relationship analysis to get entity relationship edges which are output as
graph edges. graph edges.
""" """
import pulsar
from pulsar.schema import JsonSchema
from langchain_community.document_loaders import PyPDFLoader
import tempfile
import base64
import os
import argparse
import rdflib
import json
import urllib.parse import urllib.parse
import time import json
from pulsar.schema import JsonSchema
from ... schema import VectorsChunk, Triple, VectorsAssociation, Source, Value from ... schema import VectorsChunk, Triple, VectorsAssociation, Source, Value
from ... log_level import LogLevel from ... log_level import LogLevel
from ... llm_client import LlmClient from ... llm_client import LlmClient
from ... prompts import to_relationships from ... prompts import to_relationships
from ... rdf import RDF_LABEL, TRUSTGRAPH_ENTITIES from ... rdf import RDF_LABEL, TRUSTGRAPH_ENTITIES
from ... base import ConsumerProducer
RDF_LABEL_VALUE = Value(value=RDF_LABEL, is_uri=True) RDF_LABEL_VALUE = Value(value=RDF_LABEL, is_uri=True)
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'vectors-chunk-load' default_input_queue = 'vectors-chunk-load'
default_output_queue = 'graph-load' default_output_queue = 'graph-load'
default_subscriber = 'kg-extract-relationships' default_subscriber = 'kg-extract-relationships'
default_vector_queue='vectors-load' default_vector_queue='vectors-load'
class Processor: class Processor(ConsumerProducer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
vector_queue=default_vector_queue, vector_queue=default_vector_queue,
output_queue=default_output_queue, output_queue=default_output_queue,
@ -43,19 +35,14 @@ class Processor:
log_level=LogLevel.INFO, log_level=LogLevel.INFO,
): ):
self.client = pulsar.Client( super(Processor, self).__init__(
pulsar_host, pulsar_host=pulsar_host,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) log_level=log_level,
) input_queue=input_queue,
output_queue=output_queue,
self.consumer = self.client.subscribe( subscriber=subscriber,
input_queue, subscriber, input_schema=VectorsChunk,
schema=JsonSchema(VectorsChunk), output_schema=Triple,
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(Triple),
) )
self.vec_prod = self.client.create_producer( self.vec_prod = self.client.create_producer(
@ -92,21 +79,13 @@ class Processor:
r = VectorsAssociation(entity=ent, vectors=vec) r = VectorsAssociation(entity=ent, vectors=vec)
self.vec_prod.send(r) self.vec_prod.send(r)
def run(self): def handle(self, msg):
while True:
msg = self.consumer.receive()
try:
v = msg.value() v = msg.value()
print(f"Indexing {v.source.id}...", flush=True) print(f"Indexing {v.source.id}...", flush=True)
chunk = v.chunk.decode("utf-8") chunk = v.chunk.decode("utf-8")
g = rdflib.Graph()
try: try:
rels = self.get_relationships(chunk) rels = self.get_relationships(chunk)
@ -168,56 +147,12 @@ class Processor:
print("Done.", flush=True) print("Done.", flush=True)
# Acknowledge successful processing of the message @staticmethod
self.consumer.acknowledge(msg) def add_args(parser):
except Exception as e: ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
print("Exception: ", e, flush=True) default_output_queue,
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
self.client.close()
def run():
parser = argparse.ArgumentParser(
prog='kg-extract-relationships',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
) )
parser.add_argument( parser.add_argument(
@ -226,28 +161,7 @@ def run():
help=f'Vector output queue (default: {default_vector_queue})' help=f'Vector output queue (default: {default_vector_queue})'
) )
args = parser.parse_args() def run():
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
vector_queue=args.vector_queue,
subscriber=args.subscriber,
log_level=args.log_level,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)
Processor.start("kg-extract-relationships", __doc__)

View file

@ -4,30 +4,23 @@ Simple LLM service, performs text prompt completion using an Ollama service.
Input is prompt, output is response. Input is prompt, output is response.
""" """
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
from langchain_community.llms import Ollama from langchain_community.llms import Ollama
import time
from ... schema import TextCompletionRequest, TextCompletionResponse from ... schema import TextCompletionRequest, TextCompletionResponse
from ... log_level import LogLevel from ... log_level import LogLevel
from ... base import ConsumerProducer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'llm-complete-text' default_input_queue = 'llm-complete-text'
default_output_queue = 'llm-complete-text-response' default_output_queue = 'llm-complete-text-response'
default_subscriber = 'llm-ollama-text' default_subscriber = 'llm-ollama-text'
default_model = 'gemma2' default_model = 'gemma2'
default_ollama = 'http://localhost:11434' default_ollama = 'http://localhost:11434'
class Processor: class Processor(ConsumerProducer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
output_queue=default_output_queue, output_queue=default_output_queue,
subscriber=default_subscriber, subscriber=default_subscriber,
@ -36,37 +29,23 @@ class Processor:
ollama=default_ollama, ollama=default_ollama,
): ):
self.client = None super(Processor, self).__init__(
pulsar_host=pulsar_host,
self.client = pulsar.Client( log_level=log_level,
pulsar_host, input_queue=input_queue,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) output_queue=output_queue,
) subscriber=subscriber,
input_schema=TextCompletionRequest,
self.consumer = self.client.subscribe( output_schema=TextCompletionResponse,
input_queue, subscriber,
schema=JsonSchema(TextCompletionRequest),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(TextCompletionResponse),
) )
self.llm = Ollama(base_url=ollama, model=model) self.llm = Ollama(base_url=ollama, model=model)
def run(self): def handle(self, msg):
while True:
msg = self.consumer.receive()
try:
v = msg.value() v = msg.value()
# Sender-produced ID # Sender-produced ID
id = msg.properties()["id"] id = msg.properties()["id"]
print(f"Handling prompt {id}...", flush=True) print(f"Handling prompt {id}...", flush=True)
@ -75,63 +54,19 @@ class Processor:
response = self.llm.invoke(prompt) response = self.llm.invoke(prompt)
print("Send response...", flush=True) print("Send response...", flush=True)
r = TextCompletionResponse(response=response) r = TextCompletionResponse(response=response)
self.producer.send(r, properties={"id": id})
self.send(r, properties={"id": id})
print("Done.", flush=True) print("Done.", flush=True)
# Acknowledge successful processing of the message @staticmethod
self.consumer.acknowledge(msg) def add_args(parser):
except Exception as e: ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
print("Exception:", e, flush=True) default_output_queue,
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
if self.client:
self.client.close()
def run():
parser = argparse.ArgumentParser(
prog='llm-ollama-text',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
) )
parser.add_argument( parser.add_argument(
@ -146,29 +81,8 @@ def run():
help=f'ollama (default: {default_ollama})' help=f'ollama (default: {default_ollama})'
) )
args = parser.parse_args() def run():
Processor.start("llm-ollama-text", __doc__)
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
model=args.model,
ollama=args.ollama,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)

View file

@ -4,30 +4,22 @@ Simple RAG service, performs query using graph RAG an LLM.
Input is query, output is response. Input is query, output is response.
""" """
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
import time
from ... schema import GraphRagQuery, GraphRagResponse from ... schema import GraphRagQuery, GraphRagResponse
from ... log_level import LogLevel from ... log_level import LogLevel
from ... graph_rag import GraphRag from ... graph_rag import GraphRag
from ... base import ConsumerProducer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'graph-rag-query' default_input_queue = 'graph-rag-query'
default_output_queue = 'graph-rag-response' default_output_queue = 'graph-rag-response'
default_subscriber = 'graph-rag' default_subscriber = 'graph-rag'
default_graph_hosts = [ 'localhost' ] default_graph_hosts = [ 'localhost' ]
default_vector_store = 'http://localhost:19530' default_vector_store = 'http://localhost:19530'
class Processor: class Processor(ConsumerProducer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
output_queue=default_output_queue, output_queue=default_output_queue,
subscriber=default_subscriber, subscriber=default_subscriber,
@ -39,21 +31,14 @@ class Processor:
max_sg_size=3000, max_sg_size=3000,
): ):
self.client = None super(Processor, self).__init__(
pulsar_host=pulsar_host,
self.client = pulsar.Client( log_level=log_level,
pulsar_host, input_queue=input_queue,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) output_queue=output_queue,
) subscriber=subscriber,
input_schema=GraphRagQuery,
self.consumer = self.client.subscribe( output_schema=GraphRagResponse,
input_queue, subscriber,
schema=JsonSchema(GraphRagQuery),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(GraphRagResponse),
) )
self.rag = GraphRag( self.rag = GraphRag(
@ -66,13 +51,7 @@ class Processor:
max_sg_size=max_sg_size, max_sg_size=max_sg_size,
) )
def run(self): def handle(self, msg):
while True:
msg = self.consumer.receive()
try:
v = msg.value() v = msg.value()
@ -90,58 +69,12 @@ class Processor:
print("Done.", flush=True) print("Done.", flush=True)
# Acknowledge successful processing of the message @staticmethod
self.consumer.acknowledge(msg) def add_args(parser):
except Exception as e: ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
print("Exception:", e, flush=True) default_output_queue,
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
if self.client:
self.client.close()
def run():
parser = argparse.ArgumentParser(
prog='graph-rag',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
) )
parser.add_argument( parser.add_argument(
@ -177,31 +110,8 @@ def run():
help=f'Max subgraph size (default: 3000)' help=f'Max subgraph size (default: 3000)'
) )
args = parser.parse_args() def run():
while True: Processor.start('graph-rag', __doc__)
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
graph_hosts=args.graph_hosts.split(","),
vector_store=args.vector_store,
entity_limit=args.entity_limit,
triple_limit=args.triple_limit,
max_sg_size=args.max_subgraph_size,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)

View file

@ -3,109 +3,48 @@
Accepts entity/vector pairs and writes them to a Milvus store. Accepts entity/vector pairs and writes them to a Milvus store.
""" """
import pulsar
from pulsar.schema import JsonSchema
from langchain_community.document_loaders import PyPDFLoader
import tempfile
import base64
import os
import argparse
import time
from ... schema import VectorsAssociation from ... schema import VectorsAssociation
from ... log_level import LogLevel from ... log_level import LogLevel
from ... triple_vectors import TripleVectors from ... triple_vectors import TripleVectors
from ... base import Consumer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'vectors-load' default_input_queue = 'vectors-load'
default_subscriber = 'vector-write-milvus' default_subscriber = 'vector-write-milvus'
default_store_uri = 'http://localhost:19530' default_store_uri = 'http://localhost:19530'
class Processor: class Processor(Consumer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
subscriber=default_subscriber, subscriber=default_subscriber,
store_uri=default_store_uri, store_uri=default_store_uri,
log_level=LogLevel.INFO, log_level=LogLevel.INFO,
): ):
self.client = None super(Processor, self).__init__(
pulsar_host=pulsar_host,
self.client = pulsar.Client( log_level=log_level,
pulsar_host, input_queue=input_queue,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) subscriber=subscriber,
) input_schema=VectorsAssociation,
self.consumer = self.client.subscribe(
input_queue, subscriber,
schema=JsonSchema(VectorsAssociation),
) )
self.vecstore = TripleVectors(store_uri) self.vecstore = TripleVectors(store_uri)
def run(self): def handle(self, msg):
while True:
msg = self.consumer.receive()
try:
v = msg.value() v = msg.value()
if v.entity.value != "": if v.entity.value != "":
for vec in v.vectors: for vec in v.vectors:
self.vecstore.insert(vec, v.entity.value) self.vecstore.insert(vec, v.entity.value)
@staticmethod
def add_args(parser):
# Acknowledge successful processing of the message Consumer.add_args(
self.consumer.acknowledge(msg) parser, default_input_queue, default_subscriber,
except Exception as e:
print("Exception:", e, flush=True)
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
if self.client:
self.client.close()
def run():
parser = argparse.ArgumentParser(
prog='pdf-decoder',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
) )
parser.add_argument( parser.add_argument(
@ -114,27 +53,8 @@ def run():
help=f'Milvus store URI (default: http://milvus:19530)' help=f'Milvus store URI (default: http://milvus:19530)'
) )
args = parser.parse_args() def run():
while True: Processor.start("vector-write-milvus", __doc__)
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
subscriber=args.subscriber,
store_uri=args.store_uri,
log_level=args.log_level,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)