This commit is contained in:
Cyber MacGeddon 2024-07-17 15:58:11 +01:00
parent 40b3553053
commit 39d066a07c
4 changed files with 58 additions and 211 deletions

View file

@ -196,7 +196,6 @@ class ConsumerProducer(BaseProcessor):
def send(self, msg, properties=None):
    """Print *msg* and forward it to the wrapped producer.

    Args:
        msg: The message object handed to ``self.producer.send``.
        properties: Optional mapping of message properties. When omitted,
            a fresh empty dict is used for this call.
    """
    # A ``{}`` default would be a single dict shared by every call; any
    # mutation downstream would leak into later sends.
    if properties is None:
        properties = {}
    print(msg)
    self.producer.send(msg, properties)
@staticmethod
@ -250,7 +249,6 @@ class Producer(BaseProcessor):
def send(self, msg, properties=None):
    """Print *msg* and forward it to the wrapped producer.

    Args:
        msg: The message object handed to ``self.producer.send``.
        properties: Optional mapping of message properties. When omitted,
            a fresh empty dict is used for this call.
    """
    # Avoid a mutable default: a shared ``{}`` would carry mutations made
    # by one call into every later call.
    if properties is None:
        properties = {}
    print(msg)
    self.producer.send(msg, properties)
@staticmethod

View file

@ -4,8 +4,6 @@ Graph writer. Input is graph edge. Writes edges to Cassandra graph.
"""
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
@ -14,135 +12,64 @@ import time
from ... trustgraph import TrustGraph
from ... schema import Triple
from ... log_level import LogLevel
from ... base import Consumer
# Pulsar URL: read from the PULSAR_HOST environment variable, falling back
# to 'pulsar://pulsar:6650' when the variable is not set.
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
# Default name of the queue the Processor reads from.
default_input_queue = 'graph-load'
# Default subscriber name used by the Processor.
default_subscriber = 'graph-write-cassandra'
# Default graph host used by Processor.__init__ when none is supplied.
default_graph_host='localhost'
class Processor(Consumer):
    """Graph writer: inserts each incoming Triple into a TrustGraph.

    The queue handling is delegated to the ``Consumer`` base class; this
    class only supplies the per-message work and the extra command-line
    option for the graph host.
    """

    def __init__(
            self,
            pulsar_host=None,
            input_queue=default_input_queue,
            subscriber=default_subscriber,
            graph_host=default_graph_host,
            log_level=LogLevel.INFO,
    ):
        """Set up the consumer and open the graph store.

        Args:
            pulsar_host: Passed through to ``Consumer``.
                NOTE(review): presumably ``None`` makes the base class pick
                its own default - confirm against ``Consumer``.
            input_queue: Queue to read triples from.
            subscriber: Subscriber name for the queue.
            graph_host: Host handed (as a one-element list) to TrustGraph.
            log_level: Passed through to ``Consumer``.
        """
        super(Processor, self).__init__(
            pulsar_host=pulsar_host,
            log_level=log_level,
            input_queue=input_queue,
            subscriber=subscriber,
            input_schema=Triple,
        )

        self.tg = TrustGraph([graph_host])

        # Running total of inserted triples; drives the progress output.
        self.count = 0

    def handle(self, msg):
        """Insert the triple carried by *msg* into the graph.

        NOTE(review): no acknowledge / negative-acknowledge happens here;
        presumably the ``Consumer`` base class does that around this call -
        confirm.
        """
        v = msg.value()

        self.tg.insert(
            v.s.value,
            v.p.value,
            v.o.value
        )

        self.count += 1

        # Progress marker every 1000 triples.
        if (self.count % 1000) == 0:
            print(self.count, "...", flush=True)

    @staticmethod
    def add_args(parser):
        """Register the base consumer options plus ``--graph-host``."""
        Consumer.add_args(
            parser, default_input_queue, default_subscriber,
        )

        parser.add_argument(
            '-g', '--graph-host',
            default=default_graph_host,
            help=f'Graph host (default: {default_graph_host})'
        )
def run():
    """Entry point for the graph-write-cassandra command.

    Hands the program name and the module docstring to ``Processor.start``.
    NOTE(review): ``start`` is not defined in the visible code; presumably
    it is inherited from the base class and takes over argument parsing
    and the run loop - confirm.
    """
    Processor.start("graph-write-cassandra", __doc__)

View file

@ -4,14 +4,7 @@ Simple LLM service, performs text prompt completion using an Ollama service.
Input is prompt, output is response.
"""
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
from langchain_community.llms import Ollama
import time
from ... schema import TextCompletionRequest, TextCompletionResponse
from ... log_level import LogLevel
@ -42,8 +35,8 @@ class Processor(ConsumerProducer):
input_queue=input_queue,
output_queue=output_queue,
subscriber=subscriber,
request_schema=TextCompletionRequest,
response_schema=TextCompletionResponse,
input_schema=TextCompletionRequest,
output_schema=TextCompletionResponse,
)
self.llm = Ollama(base_url=ollama, model=model)

View file

@ -15,126 +15,55 @@ import time
from ... schema import VectorsAssociation
from ... log_level import LogLevel
from ... triple_vectors import TripleVectors
from ... base import Consumer
# Pulsar URL: read from the PULSAR_HOST environment variable, falling back
# to 'pulsar://pulsar:6650' when the variable is not set.
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
# Default name of the queue the Processor reads from.
default_input_queue = 'vectors-load'
# Default subscriber name used by the Processor.
default_subscriber = 'vector-write-milvus'
# Default store URI used by Processor.__init__ when none is supplied.
# NOTE(review): the --store-uri command-line option defaults to
# "http://milvus:19530", which differs from this value - confirm which
# one is intended.
default_store_uri = 'http://localhost:19530'
class Processor(Consumer):
    """Vector writer: stores the vectors of each incoming
    VectorsAssociation in a TripleVectors store.

    The queue handling is delegated to the ``Consumer`` base class; this
    class only supplies the per-message work and the extra command-line
    option for the store URI.
    """

    def __init__(
            self,
            pulsar_host=None,
            input_queue=default_input_queue,
            subscriber=default_subscriber,
            store_uri=default_store_uri,
            log_level=LogLevel.INFO,
    ):
        """Set up the consumer and open the vector store.

        Args:
            pulsar_host: Passed through to ``Consumer``.
                NOTE(review): presumably ``None`` makes the base class pick
                its own default - confirm against ``Consumer``.
            input_queue: Queue to read vector associations from.
            subscriber: Subscriber name for the queue.
            store_uri: URI handed to TripleVectors.
            log_level: Passed through to ``Consumer``.
        """
        super(Processor, self).__init__(
            pulsar_host=pulsar_host,
            log_level=log_level,
            input_queue=input_queue,
            subscriber=subscriber,
            input_schema=VectorsAssociation,
        )

        self.vecstore = TripleVectors(store_uri)

    def handle(self, msg):
        """Insert every vector of *msg* under its entity value.

        Messages whose entity value is the empty string are ignored.
        NOTE(review): no acknowledge / negative-acknowledge happens here;
        presumably the ``Consumer`` base class does that around this call -
        confirm.
        """
        v = msg.value()

        if v.entity.value != "":
            for vec in v.vectors:
                self.vecstore.insert(vec, v.entity.value)

    @staticmethod
    def add_args(parser):
        """Register the base consumer options plus ``--store-uri``."""
        Consumer.add_args(
            parser, default_input_queue, default_subscriber,
        )

        # NOTE(review): this default differs from the module-level
        # default_store_uri ('http://localhost:19530'); kept as-is.
        parser.add_argument(
            '-t', '--store-uri',
            default="http://milvus:19530",
            help='Milvus store URI (default: http://milvus:19530)'
        )
def run():
    """Entry point for the vector-write-milvus command.

    Hands the program name and the module docstring to ``Processor.start``.
    NOTE(review): ``start`` is not defined in the visible code; presumably
    it is inherited from the base class and takes over argument parsing
    and the run loop - confirm.
    """
    Processor.start("vector-write-milvus", __doc__)