Standard core complete

This commit is contained in:
Cyber MacGeddon 2024-07-17 16:48:33 +01:00
parent 25e3c8e97d
commit d0bc32892a
4 changed files with 178 additions and 538 deletions

View file

@ -4,28 +4,19 @@ Simple decoder, accepts text documents on input, outputs chunks from the
as text as separate output objects. as text as separate output objects.
""" """
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
from langchain_text_splitters import RecursiveCharacterTextSplitter
import time
from ... schema import TextDocument, Chunk, Source from ... schema import TextDocument, Chunk, Source
from ... log_level import LogLevel from ... log_level import LogLevel
from ... base import ConsumerProducer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'text-doc-load' default_input_queue = 'text-doc-load'
default_output_queue = 'chunk-load' default_output_queue = 'chunk-load'
default_subscriber = 'chunker-recursive' default_subscriber = 'chunker-recursive'
class Processor: class Processor(ConsumerProducer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
output_queue=default_output_queue, output_queue=default_output_queue,
subscriber=default_subscriber, subscriber=default_subscriber,
@ -34,21 +25,14 @@ class Processor:
chunk_overlap=100, chunk_overlap=100,
): ):
self.client = None super(Processor, self).__init__(
pulsar_host=pulsar_host,
self.client = pulsar.Client( log_level=log_level,
pulsar_host, input_queue=input_queue,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) output_queue=output_queue,
) subscriber=subscriber,
input_schema=TextDocument,
self.consumer = self.client.subscribe( output_schema=Chunk,
input_queue, subscriber,
schema=JsonSchema(TextDocument),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(Chunk),
) )
self.text_splitter = RecursiveCharacterTextSplitter( self.text_splitter = RecursiveCharacterTextSplitter(
@ -58,134 +42,55 @@ class Processor:
is_separator_regex=False, is_separator_regex=False,
) )
print("Chunker inited") def handle(self, msg):
def run(self): v = msg.value()
print(f"Chunking {v.source.id}...", flush=True)
print("Chunker running") texts = self.text_splitter.create_documents(
[v.text.decode("utf-8")]
)
while True: for ix, chunk in enumerate(texts):
msg = self.consumer.receive() id = v.source.id + "-c" + str(ix)
print("Chunker message received")
try: r = Chunk(
source=Source(
source=v.source.source,
id=id,
title=v.source.title
),
chunk=chunk.page_content.encode("utf-8"),
)
v = msg.value() self.send(r)
print(f"Chunking {v.source.id}...", flush=True)
texts = self.text_splitter.create_documents( print("Done.", flush=True)
[v.text.decode("utf-8")]
)
for ix, chunk in enumerate(texts): @staticmethod
def add_args(parser):
id = v.source.id + "-c" + str(ix) ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
)
r = Chunk( parser.add_argument(
source=Source( '-z', '--chunk-size',
source=v.source.source, type=int,
id=id, default=2000,
title=v.source.title help=f'Chunk size (default: 2000)'
), )
chunk=chunk.page_content.encode("utf-8"),
)
self.producer.send(r) parser.add_argument(
'-v', '--chunk-overlap',
# Acknowledge successful processing of the message type=int,
self.consumer.acknowledge(msg) default=100,
help=f'Chunk overlap (default: 100)'
print("Done.", flush=True) )
except Exception as e:
print(e, flush=True)
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
if self.client:
self.client.close()
def run(): def run():
parser = argparse.ArgumentParser( Processor.start('chunker', __doc__)
prog='pdf-decoder',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
)
parser.add_argument(
'-z', '--chunk-size',
type=int,
default=2000,
help=f'Chunk size (default: 2000)'
)
parser.add_argument(
'-v', '--chunk-overlap',
type=int,
default=100,
help=f'Chunk overlap (default: 100)'
)
args = parser.parse_args()
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
chunk_size=args.chunk_size,
chunk_overlap=args.chunk_overlap,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)

View file

@ -4,171 +4,82 @@ Simple decoder, accepts PDF documents on input, outputs pages from the
PDF document as text as separate output objects. PDF document as text as separate output objects.
""" """
import pulsar
from pulsar.schema import JsonSchema
from langchain_community.document_loaders import PyPDFLoader from langchain_community.document_loaders import PyPDFLoader
import tempfile
import base64
import os
import argparse
import time
from ... schema import Document, TextDocument, Source from ... schema import Document, TextDocument, Source
from ... log_level import LogLevel from ... log_level import LogLevel
from ... base import ConsumerProducer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'document-load' default_input_queue = 'document-load'
default_output_queue = 'text-doc-load' default_output_queue = 'text-doc-load'
default_subscriber = 'pdf-decoder' default_subscriber = 'pdf-decoder'
class Processor: class Processor(ConsumerProducer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
output_queue=default_output_queue, output_queue=default_output_queue,
subscriber=default_subscriber, subscriber=default_subscriber,
log_level=LogLevel.INFO, log_level=LogLevel.INFO,
): ):
self.client = None super(Processor, self).__init__(
pulsar_host=pulsar_host,
self.client = pulsar.Client( log_level=log_level,
pulsar_host, input_queue=input_queue,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) output_queue=output_queue,
) subscriber=subscriber,
input_schema=Document,
self.consumer = self.client.subscribe( output_schema=TextDocument,
input_queue, subscriber,
schema=JsonSchema(Document),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(TextDocument),
) )
print("PDF inited") print("PDF inited")
print("Pulsar", pulsar_host) def handle(self, msg):
print("Input", input_queue)
print("Output", output_queue)
print("Subscriber", subscriber)
def run(self): print("PDF message received")
print("PDF running") v = msg.value()
while True: print(f"Decoding {v.source.id}...", flush=True)
msg = self.consumer.receive() with tempfile.NamedTemporaryFile(delete_on_close=False) as fp:
print("PDF message received") fp.write(base64.b64decode(v.data))
fp.close()
try: with open(fp.name, mode='rb') as f:
v = msg.value() loader = PyPDFLoader(fp.name)
print(f"Decoding {v.source.id}...", flush=True) pages = loader.load()
with tempfile.NamedTemporaryFile(delete_on_close=False) as fp: for ix, page in enumerate(pages):
fp.write(base64.b64decode(v.data)) id = v.source.id + "-p" + str(ix)
fp.close() r = TextDocument(
source=Source(
source=v.source.source,
title=v.source.title,
id=id,
),
text=page.page_content.encode("utf-8"),
)
with open(fp.name, mode='rb') as f: self.send(r)
loader = PyPDFLoader(fp.name) print("Done.", flush=True)
pages = loader.load()
for ix, page in enumerate(pages): @staticmethod
def add_args(parser):
id = v.source.id + "-p" + str(ix) ConsumerProducer.add_args(
r = TextDocument( parser, default_input_queue, default_subscriber,
source=Source( default_output_queue,
source=v.source.source, )
title=v.source.title,
id=id,
),
text=page.page_content.encode("utf-8"),
)
self.producer.send(r)
# Acknowledge successful processing of the message
self.consumer.acknowledge(msg)
print("Done.", flush=True)
except Exception as e:
print(e, flush=True)
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
if self.client:
self.client.close()
def run(): def run():
parser = argparse.ArgumentParser( Processor.start("pdf-decoder", __doc__)
prog='pdf-decoder',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
)
args = parser.parse_args()
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)

View file

@ -4,49 +4,34 @@ Vectorizer, calls the embeddings service to get embeddings for a chunk.
Input is text chunk, output is chunk and vectors. Input is text chunk, output is chunk and vectors.
""" """
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
import time
from ... schema import Chunk, VectorsChunk from ... schema import Chunk, VectorsChunk
from ... embeddings_client import EmbeddingsClient from ... embeddings_client import EmbeddingsClient
from ... log_level import LogLevel from ... log_level import LogLevel
from ... base import ConsumerProducer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'chunk-load' default_input_queue = 'chunk-load'
default_output_queue = 'vectors-chunk-load' default_output_queue = 'vectors-chunk-load'
default_subscriber = 'embeddings-vectorizer' default_subscriber = 'embeddings-vectorizer'
class Processor: class Processor(ConsumerProducer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
output_queue=default_output_queue, output_queue=default_output_queue,
subscriber=default_subscriber, subscriber=default_subscriber,
log_level=LogLevel.INFO, log_level=LogLevel.INFO,
): ):
self.client = None super(Processor, self).__init__(
pulsar_host=pulsar_host,
self.client = pulsar.Client( log_level=log_level,
pulsar_host, input_queue=input_queue,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) output_queue=output_queue,
) subscriber=subscriber,
input_schema=Chunk,
self.consumer = self.client.subscribe( output_schema=VectorsChunk,
input_queue, subscriber,
schema=JsonSchema(Chunk),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(VectorsChunk),
) )
self.embeddings = EmbeddingsClient(pulsar_host=pulsar_host) self.embeddings = EmbeddingsClient(pulsar_host=pulsar_host)
@ -56,108 +41,37 @@ class Processor:
r = VectorsChunk(source=source, chunk=chunk, vectors=vectors) r = VectorsChunk(source=source, chunk=chunk, vectors=vectors)
self.producer.send(r) self.producer.send(r)
def run(self): def handle(self, msg):
while True: v = msg.value()
print(f"Indexing {v.source.id}...", flush=True)
msg = self.consumer.receive() chunk = v.chunk.decode("utf-8")
try:
v = msg.value()
print(f"Indexing {v.source.id}...", flush=True)
chunk = v.chunk.decode("utf-8")
try:
vectors = self.embeddings.request(chunk)
self.emit(
source=v.source,
chunk=chunk.encode("utf-8"),
vectors=vectors
)
except Exception as e:
print("Exception:", e, flush=True)
print("Done.", flush=True)
# Acknowledge successful processing of the message
self.consumer.acknowledge(msg)
except Exception as e:
print("Exception:", e, flush=True)
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
if self.client:
self.client.close()
def run():
parser = argparse.ArgumentParser(
prog='embeddings-vectorizer',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
)
args = parser.parse_args()
while True:
try: try:
p = Processor( vectors = self.embeddings.request(chunk)
pulsar_host=args.pulsar_host,
input_queue=args.input_queue, self.emit(
output_queue=args.output_queue, source=v.source,
subscriber=args.subscriber, chunk=chunk.encode("utf-8"),
log_level=args.log_level, vectors=vectors
) )
p.run()
except Exception as e: except Exception as e:
print("Exception:", e, flush=True) print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10) print("Done.", flush=True)
@staticmethod
def add_args(parser):
ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
)
def run():
Processor.start("embeddings-vectorize", __doc__)

View file

@ -4,30 +4,22 @@ Simple RAG service, performs query using graph RAG an LLM.
Input is query, output is response. Input is query, output is response.
""" """
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
import time
from ... schema import GraphRagQuery, GraphRagResponse from ... schema import GraphRagQuery, GraphRagResponse
from ... log_level import LogLevel from ... log_level import LogLevel
from ... graph_rag import GraphRag from ... graph_rag import GraphRag
from ... base import ConsumerProducer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'graph-rag-query' default_input_queue = 'graph-rag-query'
default_output_queue = 'graph-rag-response' default_output_queue = 'graph-rag-response'
default_subscriber = 'graph-rag' default_subscriber = 'graph-rag'
default_graph_hosts = [ 'localhost' ] default_graph_hosts = [ 'localhost' ]
default_vector_store = 'http://localhost:19530' default_vector_store = 'http://localhost:19530'
class Processor: class Processor(ConsumerProducer):
def __init__( def __init__(
self, self,
pulsar_host=default_pulsar_host, pulsar_host=None,
input_queue=default_input_queue, input_queue=default_input_queue,
output_queue=default_output_queue, output_queue=default_output_queue,
subscriber=default_subscriber, subscriber=default_subscriber,
@ -39,21 +31,14 @@ class Processor:
max_sg_size=3000, max_sg_size=3000,
): ):
self.client = None super(Processor, self).__init__(
pulsar_host=pulsar_host,
self.client = pulsar.Client( log_level=log_level,
pulsar_host, input_queue=input_queue,
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) output_queue=output_queue,
) subscriber=subscriber,
input_schema=TextCompletionRequest,
self.consumer = self.client.subscribe( output_schema=TextCompletionResponse,
input_queue, subscriber,
schema=JsonSchema(GraphRagQuery),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(GraphRagResponse),
) )
self.rag = GraphRag( self.rag = GraphRag(
@ -66,142 +51,67 @@ class Processor:
max_sg_size=max_sg_size, max_sg_size=max_sg_size,
) )
def run(self): def handle(self, msg):
while True: v = msg.value()
msg = self.consumer.receive() # Sender-produced ID
try: id = msg.properties()["id"]
v = msg.value() print(f"Handling input {id}...", flush=True)
# Sender-produced ID response = self.rag.query(v.query)
id = msg.properties()["id"] print("Send response...", flush=True)
r = GraphRagResponse(response = response)
self.producer.send(r, properties={"id": id})
print(f"Handling input {id}...", flush=True) print("Done.", flush=True)
response = self.rag.query(v.query) @staticmethod
def add_args(parser):
print("Send response...", flush=True) ConsumerProducer.add_args(
r = GraphRagResponse(response = response) parser, default_input_queue, default_subscriber,
self.producer.send(r, properties={"id": id}) default_output_queue,
)
print("Done.", flush=True) parser.add_argument(
'-g', '--graph-hosts',
default='cassandra',
help=f'Graph hosts, comma separated (default: cassandra)'
)
# Acknowledge successful processing of the message parser.add_argument(
self.consumer.acknowledge(msg) '-v', '--vector-store',
default='http://milvus:19530',
help=f'Vector host (default: http://milvus:19530)'
)
except Exception as e: parser.add_argument(
'-e', '--entity-limit',
type=int,
default=50,
help=f'Entity vector fetch limit (default: 50)'
)
print("Exception:", e, flush=True) parser.add_argument(
'-t', '--triple-limit',
type=int,
default=30,
help=f'Triple query limit, per query (default: 30)'
)
# Message failed to be processed parser.add_argument(
self.consumer.negative_acknowledge(msg) '-u', '--max-subgraph-size',
type=int,
def __del__(self): default=3000,
help=f'Max subgraph size (default: 3000)'
if self.client: )
self.client.close()
def run(): def run():
parser = argparse.ArgumentParser( Processor.start('graph-rag', __doc__)
prog='graph-rag',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
)
parser.add_argument(
'-g', '--graph-hosts',
default='cassandra',
help=f'Graph hosts, comma separated (default: cassandra)'
)
parser.add_argument(
'-v', '--vector-store',
default='http://milvus:19530',
help=f'Vector host (default: http://milvus:19530)'
)
parser.add_argument(
'-e', '--entity-limit',
type=int,
default=50,
help=f'Entity vector fetch limit (default: 50)'
)
parser.add_argument(
'-t', '--triple-limit',
type=int,
default=30,
help=f'Triple query limit, per query (default: 30)'
)
parser.add_argument(
'-u', '--max-subgraph-size',
type=int,
default=3000,
help=f'Max subgraph size (default: 3000)'
)
args = parser.parse_args()
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
graph_hosts=args.graph_hosts.split(","),
vector_store=args.vector_store,
entity_limit=args.entity_limit,
triple_limit=args.triple_limit,
max_sg_size=args.max_subgraph_size,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)