Trustgraph initial code drop

This commit is contained in:
Cyber MacGeddon 2024-07-10 23:20:06 +01:00
parent c5f4604a7b
commit 9b5cbbf9ca
94 changed files with 5399 additions and 0 deletions

0
trustgraph/__init__.py Normal file
View file

View file

View file

@ -0,0 +1,3 @@
from . chunker import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . chunker import run
if __name__ == "__main__":
    # Executed directly rather than imported: start the chunker service.
    run()

View file

@ -0,0 +1,164 @@
"""
Simple chunker, accepts text documents on input, outputs chunks of the
text as separate output objects.
"""
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
from langchain_text_splitters import RecursiveCharacterTextSplitter
import time
from ... schema import TextDocument, Chunk, Source
from ... log_level import LogLevel
class Processor:
    """Chunking service: splits text documents into chunks.

    Consumes ``TextDocument`` messages from a Pulsar topic, splits the text
    with a ``RecursiveCharacterTextSplitter`` and publishes one ``Chunk``
    message per piece.
    """

    def __init__(
            self,
            pulsar_host,
            input_queue,
            output_queue,
            subscriber,
            log_level,
            chunk_size=1000,
            chunk_overlap=20,
    ):
        """Connect to Pulsar and set up the text splitter.

        Args:
            pulsar_host: Pulsar service URL.
            input_queue: Topic to consume ``TextDocument`` messages from.
            output_queue: Topic to publish ``Chunk`` messages to.
            subscriber: Subscription name used on the input topic.
            log_level: Object whose ``to_pulsar()`` result is passed to
                ``pulsar.ConsoleLogger``.
            chunk_size: ``chunk_size`` handed to the splitter; lengths are
                measured with ``len``.
            chunk_overlap: ``chunk_overlap`` handed to the splitter.
        """
        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(TextDocument),
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(Chunk),
        )

        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            is_separator_regex=False,
        )

    def run(self):
        """Process input messages forever.

        A message is acknowledged once all of its chunks have been sent.
        If anything raises, the error is printed and the message is
        negatively acknowledged.
        """
        while True:

            msg = self.consumer.receive()

            try:

                v = msg.value()
                print(f"Chunking {v.source.id}...", flush=True)

                texts = self.text_splitter.create_documents(
                    [v.text.decode("utf-8")]
                )

                for ix, chunk in enumerate(texts):

                    # Chunk IDs are the document ID plus "-c<index>"
                    chunk_id = v.source.id + "-c" + str(ix)

                    r = Chunk(
                        source=Source(
                            source=v.source.source,
                            id=chunk_id,
                            title=v.source.title
                        ),
                        chunk=chunk.page_content.encode("utf-8"),
                    )

                    self.producer.send(r)

                # Acknowledge successful processing of the message
                self.consumer.acknowledge(msg)

                print("Done.", flush=True)

            except Exception as e:

                print(e, flush=True)

                # Message failed to be processed
                self.consumer.negative_acknowledge(msg)

    def __del__(self):
        """Close the Pulsar client, if one was created."""
        # __init__ may have raised before self.client was assigned
        client = getattr(self, "client", None)
        if client is not None:
            client.close()
def run():
    """Command-line entry point for the chunker service.

    Parses the command-line options, then builds a ``Processor`` and runs
    it. Any exception is printed and the whole setup is retried after a
    10 second pause, so this function never returns.
    """
    parser = argparse.ArgumentParser(
        prog='chunker-recursive',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'text-doc-load'
    default_output_queue = 'chunk-load'
    default_subscriber = 'chunker-recursive'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        help='Log level (default: info)'
    )

    args = parser.parse_args()

    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                output_queue=args.output_queue,
                subscriber=args.subscriber,
                log_level=args.log_level,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        # Back off before reconnecting
        time.sleep(10)

View file

View file

@ -0,0 +1,3 @@
from . pdf_decoder import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . pdf_decoder import run
if __name__ == "__main__":
    # Executed directly rather than imported: start the PDF decoder.
    run()

View file

@ -0,0 +1,159 @@
"""
Simple decoder, accepts PDF documents on input, outputs pages from the
PDF document as text as separate output objects.
"""
import pulsar
from pulsar.schema import JsonSchema
from langchain_community.document_loaders import PyPDFLoader
import tempfile
import base64
import os
import argparse
import time
from ... schema import Document, TextDocument, Source
from ... log_level import LogLevel
class Processor:
    """PDF decoding service: turns PDF documents into per-page text.

    Consumes ``Document`` messages carrying base64-encoded PDF data, loads
    the PDF with ``PyPDFLoader`` and publishes one ``TextDocument`` message
    per page.
    """

    def __init__(
            self,
            pulsar_host,
            input_queue,
            output_queue,
            subscriber,
            log_level,
    ):
        """Connect to Pulsar and create the consumer and producer.

        Args:
            pulsar_host: Pulsar service URL.
            input_queue: Topic to consume ``Document`` messages from.
            output_queue: Topic to publish ``TextDocument`` messages to.
            subscriber: Subscription name used on the input topic.
            log_level: Object whose ``to_pulsar()`` result is passed to
                ``pulsar.ConsoleLogger``.
        """
        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(Document),
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(TextDocument),
        )

    def run(self):
        """Process input messages forever.

        A message is acknowledged once all of its pages have been sent.
        If anything raises, the error is printed and the message is
        negatively acknowledged.
        """
        while True:

            msg = self.consumer.receive()

            try:

                v = msg.value()
                print(f"Decoding {v.source.id}...", flush=True)

                # delete_on_close=False keeps the file on disk after
                # close() so the loader can read it by name; it is removed
                # when the with-block exits.
                with tempfile.NamedTemporaryFile(delete_on_close=False) as fp:

                    fp.write(base64.b64decode(v.data))
                    fp.close()

                    # PyPDFLoader is given the path, so no separate file
                    # handle is needed here.
                    pages = PyPDFLoader(fp.name).load()

                    for ix, page in enumerate(pages):

                        # Page IDs are the document ID plus "-p<index>"
                        page_id = v.source.id + "-p" + str(ix)

                        r = TextDocument(
                            source=Source(
                                source=v.source.source,
                                title=v.source.title,
                                id=page_id,
                            ),
                            text=page.page_content.encode("utf-8"),
                        )

                        self.producer.send(r)

                # Acknowledge successful processing of the message
                self.consumer.acknowledge(msg)

                print("Done.", flush=True)

            except Exception as e:

                print(e, flush=True)

                # Message failed to be processed
                self.consumer.negative_acknowledge(msg)

    def __del__(self):
        """Close the Pulsar client, if one was created."""
        # __init__ may have raised before self.client was assigned
        client = getattr(self, "client", None)
        if client is not None:
            client.close()
def run():
    """Command-line entry point for the PDF decoder service.

    Parses the command-line options, then builds a ``Processor`` and runs
    it. Any exception is printed and the whole setup is retried after a
    10 second pause, so this function never returns.
    """
    parser = argparse.ArgumentParser(
        prog='pdf-decoder',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'document-load'
    default_output_queue = 'text-doc-load'
    default_subscriber = 'pdf-decoder'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        help='Log level (default: info)'
    )

    args = parser.parse_args()

    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                output_queue=args.output_queue,
                subscriber=args.subscriber,
                log_level=args.log_level,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        # Back off before reconnecting
        time.sleep(10)

102
trustgraph/edge_map.py Normal file
View file

@ -0,0 +1,102 @@
from pymilvus import MilvusClient, CollectionSchema, FieldSchema, DataType
class VectorStore:
    """Milvus-backed store associating embedding vectors with entities.

    All data lives in a single collection named ``edges`` with an
    auto-generated integer primary key, a float vector field and a
    VARCHAR ``entity`` field.
    """

    def __init__(self, uri="http://localhost:19530", dimension=384):
        """Connect to Milvus and create the collection if it is missing.

        Args:
            uri: Milvus server URI.
            dimension: Dimension of the stored vectors, used when the
                collection has to be created.
        """
        self.client = MilvusClient(uri=uri)

        self.collection = "edges"
        self.dimension = dimension

        if not self.client.has_collection(collection_name=self.collection):
            self.init_collection()

    def init_collection(self):
        """Create the ``edges`` collection and its vector index."""
        pkey_field = FieldSchema(
            name="id",
            dtype=DataType.INT64,
            is_primary=True,
            auto_id=True,
        )

        vec_field = FieldSchema(
            name="vector",
            dtype=DataType.FLOAT_VECTOR,
            dim=self.dimension,
        )

        entity_field = FieldSchema(
            name="entity",
            dtype=DataType.VARCHAR,
            max_length=65535,
        )

        schema = CollectionSchema(
            fields=[pkey_field, vec_field, entity_field],
            description="Edge map schema",
        )

        # NOTE(review): "IP" here differs from the "COSINE" metric used by
        # the index and by search() -- confirm which one is intended.
        self.client.create_collection(
            collection_name=self.collection,
            schema=schema,
            metric_type="IP",
        )

        index_params = MilvusClient.prepare_index_params()

        index_params.add_index(
            field_name="vector",
            metric_type="COSINE",
            index_type="FLAT",  # IVF_FLAT?!
            index_name="vector_index",
            params={"nlist": 128}
        )

        self.client.create_index(
            collection_name=self.collection,
            index_params=index_params
        )

    def insert(self, embeds, entity):
        """Store one vector together with the entity it belongs to.

        Args:
            embeds: The embedding vector.
            entity: Entity value stored in the ``entity`` field.
        """
        data = [
            {
                "vector": embeds,
                "entity": entity,
            }
        ]

        self.client.insert(collection_name=self.collection, data=data)

    def search(self, embeds, fields=None, limit=10):
        """Search the collection for vectors similar to ``embeds``.

        Args:
            embeds: Query vector.
            fields: Output fields to return with each hit; defaults to
                ``["entity"]``.
            limit: Maximum number of hits.

        Returns:
            The hits for the single query vector, i.e. element 0 of what
            the Milvus client's ``search`` returns.
        """
        if fields is None:
            fields = ["entity"]

        # NOTE(review): with COSINE these bounds look like they exclude
        # hits scoring above 0.8 -- confirm this is intended.
        search_params = {
            "metric_type": "COSINE",
            "params": {
                "radius": 0.1,
                "range_filter": 0.8
            }
        }

        self.client.load_collection(
            collection_name=self.collection,
        )

        try:
            res = self.client.search(
                collection_name=self.collection,
                data=[embeds],
                limit=limit,
                output_fields=fields,
                search_params=search_params,
            )[0]
        finally:
            # Release the collection even when the search fails
            self.client.release_collection(
                collection_name=self.collection,
            )

        return res

View file

View file

@ -0,0 +1,3 @@
from . hf import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . hf import run
if __name__ == "__main__":
    # Executed directly rather than imported: start the embeddings service.
    run()

161
trustgraph/embeddings/hf/hf.py Executable file
View file

@ -0,0 +1,161 @@
"""
Embeddings service, computes text embeddings using a HuggingFace model.
Input is text, output is embedding vectors.
"""
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
from langchain_huggingface import HuggingFaceEmbeddings
import time
from ... schema import EmbeddingsRequest, EmbeddingsResponse
from ... log_level import LogLevel
class Processor:
    """Embeddings service built on ``HuggingFaceEmbeddings``.

    Reads ``EmbeddingsRequest`` messages, embeds their text and answers
    with an ``EmbeddingsResponse`` that carries the request's ``id``
    message property so the sender can match it up.
    """

    def __init__(
            self,
            pulsar_host,
            input_queue,
            output_queue,
            subscriber,
            log_level,
            model,
    ):
        """Connect to Pulsar and load the embeddings model.

        Args:
            pulsar_host: Pulsar service URL.
            input_queue: Topic to consume requests from.
            output_queue: Topic to publish responses to.
            subscriber: Subscription name used on the input topic.
            log_level: Object whose ``to_pulsar()`` result is passed to
                ``pulsar.ConsoleLogger``.
            model: Model name given to ``HuggingFaceEmbeddings``.
        """
        pulsar_logger = pulsar.ConsoleLogger(log_level.to_pulsar())
        self.client = pulsar.Client(pulsar_host, logger=pulsar_logger)

        request_schema = JsonSchema(EmbeddingsRequest)
        self.consumer = self.client.subscribe(
            input_queue, subscriber, schema=request_schema,
        )

        response_schema = JsonSchema(EmbeddingsResponse)
        self.producer = self.client.create_producer(
            topic=output_queue, schema=response_schema,
        )

        self.embeddings = HuggingFaceEmbeddings(model_name=model)

    def run(self):
        """Serve embedding requests forever.

        Failures are printed and the offending message is negatively
        acknowledged.
        """
        while True:

            msg = self.consumer.receive()

            try:
                request = msg.value()

                # The sender tags each request with its own ID
                request_id = msg.properties()["id"]
                print(f"Handling input {request_id}...", flush=True)

                vectors = self.embeddings.embed_documents([request.text])

                print("Send response...", flush=True)
                self.producer.send(
                    EmbeddingsResponse(vectors=vectors),
                    properties={"id": request_id},
                )
                print("Done.", flush=True)

                # Message handled, tell the broker
                self.consumer.acknowledge(msg)

            except Exception as e:
                print("Exception:", e, flush=True)

                # Processing failed, hand the message back
                self.consumer.negative_acknowledge(msg)

    def __del__(self):
        """Announce shutdown and close the Pulsar client."""
        print("Closing", flush=True)
        self.client.close()
def run():
    """Command-line entry point for the HuggingFace embeddings service.

    Parses the command-line options, then builds a ``Processor`` and runs
    it. Any exception is printed and the whole setup is retried after a
    10 second pause, so this function never returns.
    """
    parser = argparse.ArgumentParser(
        prog='embeddings-hf',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'embeddings'
    default_output_queue = 'embeddings-response'
    default_subscriber = 'embeddings-hf'
    default_model = "all-MiniLM-L6-v2"

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        help='Log level (default: info)'
    )

    parser.add_argument(
        '-m', '--model',
        default=default_model,
        help=f'Embeddings model (default: {default_model})'
    )

    args = parser.parse_args()

    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                output_queue=args.output_queue,
                subscriber=args.subscriber,
                log_level=args.log_level,
                model=args.model,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        # Back off before reconnecting
        time.sleep(10)

View file

@ -0,0 +1,3 @@
from . vectorize import *

View file

@ -0,0 +1,6 @@
from . vectorize import run
if __name__ == "__main__":
    # Executed directly rather than imported: start the vectorizer.
    run()

View file

@ -0,0 +1,167 @@
"""
Vectorizer, applies an embedding algorithm to a chunk. Input is a chunk,
output is chunk and vectors.
"""
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
import time
from ... schema import Chunk, VectorsChunk
from ... embeddings_client import EmbeddingsClient
from ... log_level import LogLevel
class Processor:
    """Vectorizer service: attaches embedding vectors to chunks.

    Reads ``Chunk`` messages, asks the embeddings service for vectors for
    the chunk text and publishes a ``VectorsChunk`` holding both.
    """

    def __init__(
            self,
            pulsar_host,
            input_queue,
            output_queue,
            subscriber,
            log_level,
            model,
    ):
        """Connect to Pulsar and create the embeddings client.

        Args:
            pulsar_host: Pulsar service URL, also used by the embeddings
                client.
            input_queue: Topic to consume ``Chunk`` messages from.
            output_queue: Topic to publish ``VectorsChunk`` messages to.
            subscriber: Subscription name used on the input topic.
            log_level: Object whose ``to_pulsar()`` result is passed to
                ``pulsar.ConsoleLogger``.
            model: Accepted but not used by this class.
        """
        pulsar_logger = pulsar.ConsoleLogger(log_level.to_pulsar())
        self.client = pulsar.Client(pulsar_host, logger=pulsar_logger)

        self.consumer = self.client.subscribe(
            input_queue, subscriber, schema=JsonSchema(Chunk),
        )

        self.producer = self.client.create_producer(
            topic=output_queue, schema=JsonSchema(VectorsChunk),
        )

        self.embeddings = EmbeddingsClient(pulsar_host=pulsar_host)

    def emit(self, source, chunk, vectors):
        """Publish one ``VectorsChunk`` built from the given parts."""
        self.producer.send(
            VectorsChunk(source=source, chunk=chunk, vectors=vectors)
        )

    def run(self):
        """Process chunks forever.

        An error while embedding or emitting is printed and the message
        is still acknowledged. Any other error leads to a negative
        acknowledgement.
        """
        while True:

            msg = self.consumer.receive()

            try:
                incoming = msg.value()
                print(f"Indexing {incoming.source.id}...", flush=True)

                text = incoming.chunk.decode("utf-8")

                try:
                    vecs = self.embeddings.request(text)
                    self.emit(
                        source=incoming.source,
                        chunk=text.encode("utf-8"),
                        vectors=vecs,
                    )
                except Exception as e:
                    # Best effort: report and carry on to the ack below
                    print("Exception:", e, flush=True)

                print("Done.", flush=True)

                # Message handled, tell the broker
                self.consumer.acknowledge(msg)

            except Exception as e:
                print("Exception:", e, flush=True)

                # Processing failed, hand the message back
                self.consumer.negative_acknowledge(msg)

    def __del__(self):
        """Close the Pulsar client."""
        self.client.close()
def run():
    """Command-line entry point for the vectorizer service.

    Parses the command-line options, then builds a ``Processor`` and runs
    it. Any exception is printed and the whole setup is retried after a
    10 second pause, so this function never returns.
    """
    parser = argparse.ArgumentParser(
        prog='embeddings-vectorizer',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'chunk-load'
    default_output_queue = 'vectors-chunk-load'
    default_subscriber = 'embeddings-vectorizer'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        help='Log level (default: info)'
    )

    parser.add_argument(
        '-m', '--model',
        default="all-MiniLM-L6-v2",
        help='LLM model (default: all-MiniLM-L6-v2)'
    )

    args = parser.parse_args()

    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                output_queue=args.output_queue,
                subscriber=args.subscriber,
                log_level=args.log_level,
                model=args.model,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        # Back off before reconnecting
        time.sleep(10)

View file

@ -0,0 +1,70 @@
#!/usr/bin/env python3
import pulsar
import _pulsar
from pulsar.schema import JsonSchema
from trustgraph.schema import EmbeddingsRequest, EmbeddingsResponse
import hashlib
import uuid
# Ugly
ERROR = _pulsar.LoggerLevel.Error
WARN = _pulsar.LoggerLevel.Warn
INFO = _pulsar.LoggerLevel.Info
DEBUG = _pulsar.LoggerLevel.Debug


class EmbeddingsClient:
    """Request/response client for the embeddings service over Pulsar.

    Requests are sent on the ``embeddings`` topic and answers are read
    from ``embeddings-response``. A request and its answer are matched
    through the ``id`` message property.
    """

    def __init__(
            self, log_level=ERROR, client_id=None,
            pulsar_host="pulsar://pulsar:6650",
    ):
        """Connect to Pulsar and create the producer and consumer.

        Args:
            log_level: Pulsar logger level for the console logger.
            client_id: Subscription name for the response topic; a random
                UUID is used when omitted.
            pulsar_host: Pulsar service URL.
        """
        if client_id is None:
            client_id = str(uuid.uuid4())

        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level),
        )

        self.producer = self.client.create_producer(
            topic='embeddings',
            schema=JsonSchema(EmbeddingsRequest),
            chunking_enabled=True,
        )

        self.consumer = self.client.subscribe(
            'embeddings-response', client_id,
            schema=JsonSchema(EmbeddingsResponse),
        )

    def request(self, text, timeout=500):
        """Send ``text`` for embedding and wait for the matching answer.

        Args:
            text: Text to embed.
            timeout: Seconds to wait in each ``receive`` call. The wait
                starts over whenever an unrelated response arrives.

        Returns:
            The ``vectors`` field of the matching response.
        """
        request_id = str(uuid.uuid4())

        r = EmbeddingsRequest(
            text=text
        )
        self.producer.send(r, properties={"id": request_id})

        while True:

            msg = self.consumer.receive(timeout_millis=timeout * 1000)

            mid = msg.properties()["id"]

            if mid == request_id:
                resp = msg.value().vectors
                self.consumer.acknowledge(msg)
                return resp

            # Ignore messages with wrong ID
            self.consumer.acknowledge(msg)

    def __del__(self):
        """Close the producer, consumer and client that were created."""
        # __init__ may have failed partway through, so only close the
        # attributes that exist; this keeps the client from leaking when
        # the producer or consumer could not be created.
        for name in ("producer", "consumer", "client"):
            obj = getattr(self, name, None)
            if obj is not None:
                obj.close()

View file

View file

@ -0,0 +1,3 @@
from . write import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . write import run
if __name__ == "__main__":
    # Executed directly rather than imported: start the graph writer.
    run()

View file

@ -0,0 +1,144 @@
"""
Graph writer, accepts triples on input and inserts them into the
TrustGraph graph store. Nothing is output.
"""
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
import time
from ... trustgraph import TrustGraph
from ... schema import Triple
from ... log_level import LogLevel
class Processor:
    """Graph writer service: stores incoming triples in the graph.

    Reads ``Triple`` messages from Pulsar and inserts subject, predicate
    and object values through a ``TrustGraph`` instance.
    """

    def __init__(
            self,
            pulsar_host,
            input_queue,
            subscriber,
            log_level,
            graph_host,
    ):
        """Connect to Pulsar and to the graph store.

        Args:
            pulsar_host: Pulsar service URL.
            input_queue: Topic to consume ``Triple`` messages from.
            subscriber: Subscription name used on the input topic.
            log_level: Object whose ``to_pulsar()`` result is passed to
                ``pulsar.ConsoleLogger``.
            graph_host: Host passed to ``TrustGraph`` as a one-element
                list.
        """
        pulsar_logger = pulsar.ConsoleLogger(log_level.to_pulsar())
        self.client = pulsar.Client(pulsar_host, logger=pulsar_logger)

        self.consumer = self.client.subscribe(
            input_queue, subscriber, schema=JsonSchema(Triple),
        )

        self.tg = TrustGraph([graph_host])

        # Number of triples written so far, used for progress output
        self.count = 0

    def run(self):
        """Write triples forever, printing a progress line every 1000.

        Failures are printed and the offending message is negatively
        acknowledged.
        """
        while True:

            msg = self.consumer.receive()

            try:
                triple = msg.value()

                self.tg.insert(
                    triple.s.value, triple.p.value, triple.o.value
                )

                self.count += 1

                if not self.count % 1000:
                    print(self.count, "...", flush=True)

                # Message handled, tell the broker
                self.consumer.acknowledge(msg)

            except Exception as e:
                print("Exception:", e, flush=True)

                # Processing failed, hand the message back
                self.consumer.negative_acknowledge(msg)

    def __del__(self):
        """Close the Pulsar client."""
        self.client.close()
def run():
    """Command-line entry point for the graph writer service.

    Parses the command-line options, then builds a ``Processor`` and runs
    it. Any exception is printed and the whole setup is retried after a
    10 second pause, so this function never returns.
    """
    parser = argparse.ArgumentParser(
        prog='graph-write-cassandra',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'graph-load'
    default_subscriber = 'graph-write-cassandra'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        help='Log level (default: info)'
    )

    parser.add_argument(
        '-g', '--graph-host',
        default="localhost",
        help='Graph host (default: localhost)'
    )

    args = parser.parse_args()

    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                subscriber=args.subscriber,
                log_level=args.log_level,
                graph_host=args.graph_host,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        # Back off before reconnecting
        time.sleep(10)

227
trustgraph/graph_rag.py Normal file
View file

@ -0,0 +1,227 @@
from trustgraph.trustgraph import TrustGraph
from trustgraph.edge_map import VectorStore
from trustgraph.trustgraph import TrustGraph
from trustgraph.llm_client import LlmClient
from trustgraph.embeddings_client import EmbeddingsClient
LABEL = "http://www.w3.org/2000/01/rdf-schema#label"
DEFINITION = "http://www.w3.org/2004/02/skos/core#definition"


class GraphRag:
    """Graph-based retrieval-augmented generation.

    A query is embedded, the vector store is searched for matching
    entities, the graph edges touching those entities are collected, and
    the resulting subgraph is placed in an LLM prompt with the question.
    """

    def __init__(
            self,
            graph_hosts=None,
            pulsar_host="pulsar://pulsar:6650",
            vector_store="http://milvus:19530",
            verbose=False
    ):
        """Create the graph, embeddings, vector store and LLM clients.

        Args:
            graph_hosts: List of hosts for ``TrustGraph``; defaults to
                ``["cassandra"]``.
            pulsar_host: Pulsar URL for the embeddings and LLM clients.
            vector_store: URI passed to ``VectorStore``.
            verbose: Print progress messages when true.
        """
        self.verbose = verbose

        if graph_hosts is None:
            graph_hosts = ["cassandra"]

        if self.verbose:
            print("Initialising...", flush=True)

        self.graph = TrustGraph(graph_hosts)

        self.embeddings = EmbeddingsClient(pulsar_host=pulsar_host)

        self.vecstore = VectorStore(vector_store)

        # Maximum hits per vector search
        self.entity_limit = 50
        # Maximum rows per graph query
        self.query_limit = 30
        # Maximum number of edges kept in a subgraph
        self.max_sg_size = 3000

        # Entity -> label lookups already resolved by maybe_label()
        self.label_cache = {}

        self.llm = LlmClient(pulsar_host=pulsar_host)

        if self.verbose:
            print("Initialised", flush=True)

    def get_vector(self, query):
        """Return what the embeddings service produces for ``query``."""
        if self.verbose:
            print("Compute embeddings...", flush=True)

        qembeds = self.embeddings.request(query)

        if self.verbose:
            print("Done.", flush=True)

        return qembeds

    def get_entities(self, query):
        """Return the entities whose vectors match ``query``.

        Hits are de-duplicated per query vector; the same entity can
        still appear more than once if several vectors match it.
        """
        everything = []

        vectors = self.get_vector(query)

        if self.verbose:
            print("Get entities...", flush=True)

        for vector in vectors:

            res = self.vecstore.search(
                vector,
                limit=self.entity_limit
            )

            entities = {
                item["entity"]["entity"]
                for item in res
            }

            everything.extend(entities)

        if self.verbose:
            print("Entities:", flush=True)
            for ent in everything:
                print(" ", ent, flush=True)

        return everything

    def maybe_label(self, e):
        """Return the label of ``e`` from the graph, or ``e`` itself.

        Results, including misses, are kept in ``self.label_cache``.
        """
        if e in self.label_cache:
            return self.label_cache[e]

        res = list(self.graph.get_sp(e, LABEL))

        if not res:
            self.label_cache[e] = e
            return e

        # First column of the first row is used as the label
        self.label_cache[e] = res[0][0]
        return self.label_cache[e]

    def get_nodes(self, query):
        """Return the labels of the entities matching ``query``."""
        ents = self.get_entities(query)

        if self.verbose:
            print("Get labels...", flush=True)

        nodes = [
            self.maybe_label(e)
            for e in ents
        ]

        if self.verbose:
            print("Nodes:", flush=True)
            for node in nodes:
                print(" ", node, flush=True)

        return nodes

    def get_subgraph(self, query):
        """Return up to ``max_sg_size`` edges touching matched entities.

        Each matched entity is looked up as subject, as predicate and as
        object. Edges are ``(s, p, o)`` tuples, collected in a set so
        duplicates are dropped.
        """
        entities = self.get_entities(query)

        subgraph = set()

        if self.verbose:
            print("Get subgraph...", flush=True)

        for e in entities:

            res = self.graph.get_s(e, limit=self.query_limit)
            for p, o in res:
                subgraph.add((e, p, o))

            res = self.graph.get_p(e, limit=self.query_limit)
            for s, o in res:
                subgraph.add((s, e, o))

            res = self.graph.get_o(e, limit=self.query_limit)
            for s, p in res:
                subgraph.add((s, p, e))

        subgraph = list(subgraph)

        subgraph = subgraph[0:self.max_sg_size]

        if self.verbose:
            print("Subgraph:", flush=True)
            for edge in subgraph:
                print(" ", str(edge), flush=True)

        if self.verbose:
            print("Done.", flush=True)

        return subgraph

    def get_labelgraph(self, query):
        """Return the subgraph with every term replaced by its label.

        Edges whose predicate is the label property are left out.
        """
        subgraph = self.get_subgraph(query)

        sg2 = []

        for edge in subgraph:

            if edge[1] == LABEL:
                continue

            s = self.maybe_label(edge[0])
            p = self.maybe_label(edge[1])
            o = self.maybe_label(edge[2])

            sg2.append((s, p, o))

        return sg2

    def get_cypher(self, query):
        """Render the labelled subgraph as ``(s)-[p]->(o)`` lines."""
        sg = self.get_labelgraph(query)

        sg2 = [f"({s})-[{p}]->({o})" for s, p, o in sg]

        kg = "\n".join(sg2)
        # Backslashes are replaced with dashes in the rendered text
        kg = kg.replace("\\", "-")

        return kg

    def get_graph_prompt(self, query):
        """Build the LLM prompt holding the knowledge graph and question."""
        kg = self.get_cypher(query)

        prompt=f"""<instructions>Study the knowledge graph provided, and use
the information to answer the question. The question should be answered
in plain English only.
</instructions>
<knowledge-graph>
{kg}
</knowledge-graph>
<question>
{query}
</question>
"""

        return prompt

    def query(self, query):
        """Answer ``query`` with the LLM, using the graph as context."""
        if self.verbose:
            print("Construct prompt...", flush=True)

        prompt = self.get_graph_prompt(query)

        if self.verbose:
            print("Invoke LLM...", flush=True)

        resp = self.llm.request(prompt)

        if self.verbose:
            print("Done", flush=True)

        return resp

View file

@ -0,0 +1,68 @@
#!/usr/bin/env python3
import pulsar
import _pulsar
from pulsar.schema import JsonSchema
from trustgraph.schema import GraphRagQuery, GraphRagResponse
import hashlib
import uuid
# Ugly
ERROR = _pulsar.LoggerLevel.Error
WARN = _pulsar.LoggerLevel.Warn
INFO = _pulsar.LoggerLevel.Info
DEBUG = _pulsar.LoggerLevel.Debug


class GraphRagClient:
    """Request/response client for the graph RAG service over Pulsar.

    Queries are sent on the ``graph-rag-query`` topic and answers are
    read from ``graph-rag-response``. A query and its answer are matched
    through the ``id`` message property.
    """

    def __init__(
            self, log_level=ERROR, client_id=None,
            pulsar_host="pulsar://pulsar:6650",
    ):
        """Connect to Pulsar and create the producer and consumer.

        Args:
            log_level: Pulsar logger level for the console logger.
            client_id: Subscription name for the response topic; a random
                UUID is used when omitted.
            pulsar_host: Pulsar service URL.
        """
        if client_id is None:
            client_id = str(uuid.uuid4())

        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level),
        )

        self.producer = self.client.create_producer(
            topic='graph-rag-query',
            schema=JsonSchema(GraphRagQuery),
            chunking_enabled=True,
        )

        self.consumer = self.client.subscribe(
            'graph-rag-response', client_id,
            schema=JsonSchema(GraphRagResponse),
        )

    def request(self, query, timeout=500):
        """Send ``query`` and wait for the matching answer.

        Args:
            query: Query text.
            timeout: Seconds to wait in each ``receive`` call. The wait
                starts over whenever an unrelated response arrives.

        Returns:
            The ``response`` field of the matching reply.
        """
        request_id = str(uuid.uuid4())

        r = GraphRagQuery(
            query=query
        )
        self.producer.send(r, properties={"id": request_id})

        while True:

            msg = self.consumer.receive(timeout_millis=timeout * 1000)

            mid = msg.properties()["id"]

            if mid == request_id:
                resp = msg.value().response
                self.consumer.acknowledge(msg)
                return resp

            # Ignore messages with wrong ID
            self.consumer.acknowledge(msg)

    def __del__(self):
        """Close the Pulsar client, if one was created."""
        # __init__ may have raised before self.client was assigned
        client = getattr(self, "client", None)
        if client is not None:
            client.close()

View file

View file

@ -0,0 +1,3 @@
from . extract import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . extract import run
if __name__ == "__main__":
    # Executed directly rather than imported: start the extractor.
    run()

View file

@ -0,0 +1,193 @@
"""
Simple extractor, accepts vectorized chunks on input, uses an LLM prompt to
find entity definitions in the chunk text and outputs them as graph triples.
"""
import pulsar
from pulsar.schema import JsonSchema
from langchain_community.document_loaders import PyPDFLoader
import tempfile
import base64
import os
import argparse
import rdflib
import json
import urllib.parse
import time
from ... schema import VectorsChunk, Triple, Source, Value
from ... log_level import LogLevel
from ... llm_client import LlmClient
from ... prompts import to_definitions
from ... rdf import TRUSTGRAPH_ENTITIES, DEFINITION
DEFINITION_VALUE = Value(value=DEFINITION, is_uri=True)


class Processor:
    """Definition extractor: turns chunk text into definition triples.

    Reads ``VectorsChunk`` messages, asks the LLM for the definitions in
    the chunk text and publishes one ``Triple`` per definition, linking
    the entity URI to its definition text.
    """

    def __init__(
            self,
            pulsar_host,
            input_queue,
            output_queue,
            subscriber,
            log_level,
    ):
        """Connect to Pulsar and create the LLM client.

        Args:
            pulsar_host: Pulsar service URL, also used by the LLM client.
            input_queue: Topic to consume ``VectorsChunk`` messages from.
            output_queue: Topic to publish ``Triple`` messages to.
            subscriber: Subscription name used on the input topic.
            log_level: Object whose ``to_pulsar()`` result is passed to
                ``pulsar.ConsoleLogger``.
        """
        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(VectorsChunk),
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(Triple),
        )

        self.llm = LlmClient(pulsar_host=pulsar_host)

    def to_uri(self, text):
        """Build an entity URI from free text.

        Spaces become dashes, the text is lower-cased and URL-quoted, and
        the result is appended to ``TRUSTGRAPH_ENTITIES``.
        """
        part = text.replace(" ", "-").lower().encode("utf-8")

        quoted = urllib.parse.quote(part)

        uri = TRUSTGRAPH_ENTITIES + quoted

        return uri

    def get_definitions(self, chunk):
        """Ask the LLM for definitions in ``chunk``; parse the JSON reply."""
        prompt = to_definitions(chunk)
        resp = self.llm.request(prompt)

        defs = json.loads(resp)

        return defs

    def emit_edge(self, s, p, o):
        """Publish a single ``Triple``."""
        t = Triple(s=s, p=p, o=o)
        self.producer.send(t)

    def run(self):
        """Process chunks forever.

        An error while extracting or emitting definitions is printed and
        the message is still acknowledged. Any other error leads to a
        negative acknowledgement.
        """
        while True:

            msg = self.consumer.receive()

            try:

                v = msg.value()
                print(f"Indexing {v.source.id}...", flush=True)

                chunk = v.chunk.decode("utf-8")

                try:

                    defs = self.get_definitions(chunk)
                    print(json.dumps(defs, indent=4), flush=True)

                    for defn in defs:

                        s = defn["entity"]
                        s_uri = self.to_uri(s)

                        o = defn["definition"]

                        s_value = Value(value=str(s_uri), is_uri=True)
                        o_value = Value(value=str(o), is_uri=False)

                        self.emit_edge(s_value, DEFINITION_VALUE, o_value)

                except Exception as e:
                    # Best effort: report and carry on to the ack below
                    print("Exception: ", e, flush=True)

                print("Done.", flush=True)

                # Acknowledge successful processing of the message
                self.consumer.acknowledge(msg)

            except Exception as e:

                print("Exception: ", e, flush=True)

                # Message failed to be processed
                self.consumer.negative_acknowledge(msg)

    def __del__(self):
        """Close the Pulsar client, if one was created."""
        # __init__ may have raised before self.client was assigned
        client = getattr(self, "client", None)
        if client is not None:
            client.close()
def run():
    """Command-line entry point for the definition extractor service.

    Parses the command-line options, then builds a ``Processor`` and runs
    it. Any exception is printed and the whole setup is retried after a
    10 second pause, so this function never returns.
    """
    parser = argparse.ArgumentParser(
        prog='kg-extract-definitions',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'vectors-chunk-load'
    default_output_queue = 'graph-load'
    default_subscriber = 'kg-extract-definitions'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        help='Log level (default: info)'
    )

    args = parser.parse_args()

    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                output_queue=args.output_queue,
                subscriber=args.subscriber,
                log_level=args.log_level,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        # Back off before reconnecting
        time.sleep(10)

View file

@ -0,0 +1,3 @@
from . extract import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . extract import run
if __name__ == "__main__":
    # Executed directly rather than imported: start the extractor.
    run()

View file

@ -0,0 +1,252 @@
"""
Simple extractor, accepts vectorized chunks on input, uses an LLM prompt to
find relationships in the chunk text and outputs them as graph triples,
plus entity/vector associations.
"""
import pulsar
from pulsar.schema import JsonSchema
from langchain_community.document_loaders import PyPDFLoader
import tempfile
import base64
import os
import argparse
import rdflib
import json
import urllib.parse
import time
from ... schema import VectorsChunk, Triple, VectorsAssociation, Source, Value
from ... log_level import LogLevel
from ... llm_client import LlmClient
from ... prompts import to_relationships
from ... rdf import RDF_LABEL, TRUSTGRAPH_ENTITIES
RDF_LABEL_VALUE = Value(value=RDF_LABEL, is_uri=True)


class Processor:
    """Relationship extractor: turns chunk text into graph edges.

    Reads ``VectorsChunk`` messages, asks the LLM for the relationships
    in the chunk text and publishes ``Triple`` messages for each
    relationship and its labels. It also publishes ``VectorsAssociation``
    messages tying the chunk's vectors to each entity.
    """

    def __init__(
            self,
            pulsar_host,
            input_queue,
            output_queue,
            vec_queue,
            subscriber,
            log_level,
    ):
        """Connect to Pulsar and create the LLM client.

        Args:
            pulsar_host: Pulsar service URL, also used by the LLM client.
            input_queue: Topic to consume ``VectorsChunk`` messages from.
            output_queue: Topic to publish ``Triple`` messages to.
            vec_queue: Topic to publish ``VectorsAssociation`` messages to.
            subscriber: Subscription name used on the input topic.
            log_level: Object whose ``to_pulsar()`` result is passed to
                ``pulsar.ConsoleLogger``.
        """
        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(VectorsChunk),
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(Triple),
        )

        self.vec_prod = self.client.create_producer(
            topic=vec_queue,
            schema=JsonSchema(VectorsAssociation),
        )

        self.llm = LlmClient(pulsar_host=pulsar_host)

    def to_uri(self, text):
        """Build an entity URI from free text.

        Spaces become dashes, the text is lower-cased and URL-quoted, and
        the result is appended to ``TRUSTGRAPH_ENTITIES``.
        """
        part = text.replace(" ", "-").lower().encode("utf-8")

        quoted = urllib.parse.quote(part)

        uri = TRUSTGRAPH_ENTITIES + quoted

        return uri

    def get_relationships(self, chunk):
        """Ask the LLM for relationships in ``chunk``; parse the JSON."""
        prompt = to_relationships(chunk)
        resp = self.llm.request(prompt)

        rels = json.loads(resp)

        return rels

    def emit_edge(self, s, p, o):
        """Publish a single ``Triple``."""
        t = Triple(s=s, p=p, o=o)
        self.producer.send(t)

    def emit_vec(self, ent, vec):
        """Publish a ``VectorsAssociation`` linking ``ent`` to ``vec``."""
        r = VectorsAssociation(entity=ent, vectors=vec)
        self.vec_prod.send(r)

    def run(self):
        """Process chunks forever.

        An error while extracting or emitting relationships is printed
        and the message is still acknowledged. Any other error leads to a
        negative acknowledgement.
        """
        while True:

            msg = self.consumer.receive()

            try:

                v = msg.value()
                print(f"Indexing {v.source.id}...", flush=True)

                chunk = v.chunk.decode("utf-8")

                try:

                    rels = self.get_relationships(chunk)
                    print(json.dumps(rels, indent=4), flush=True)

                    for rel in rels:

                        s = rel["subject"]
                        p = rel["predicate"]
                        o = rel["object"]

                        s_uri = self.to_uri(s)
                        s_value = Value(value=str(s_uri), is_uri=True)

                        p_uri = self.to_uri(p)
                        p_value = Value(value=str(p_uri), is_uri=True)

                        # The object is either another entity (URI) or a
                        # plain literal
                        o_is_entity = rel["object-entity"]

                        if o_is_entity:
                            o_uri = self.to_uri(o)
                            o_value = Value(value=str(o_uri), is_uri=True)
                        else:
                            o_value = Value(value=str(o), is_uri=False)

                        self.emit_edge(
                            s_value,
                            p_value,
                            o_value
                        )

                        # Label for s
                        self.emit_edge(
                            s_value,
                            RDF_LABEL_VALUE,
                            Value(value=str(s), is_uri=False)
                        )

                        # Label for p
                        self.emit_edge(
                            p_value,
                            RDF_LABEL_VALUE,
                            Value(value=str(p), is_uri=False)
                        )

                        if o_is_entity:
                            # Label for o
                            self.emit_edge(
                                o_value,
                                RDF_LABEL_VALUE,
                                Value(value=str(o), is_uri=False)
                            )

                        # Associate the chunk's vectors with each entity
                        self.emit_vec(s_value, v.vectors)
                        self.emit_vec(p_value, v.vectors)
                        if o_is_entity:
                            self.emit_vec(o_value, v.vectors)

                except Exception as e:
                    # Best effort: report and carry on to the ack below
                    print("Exception: ", e, flush=True)

                print("Done.", flush=True)

                # Acknowledge successful processing of the message
                self.consumer.acknowledge(msg)

            except Exception as e:

                print("Exception: ", e, flush=True)

                # Message failed to be processed
                self.consumer.negative_acknowledge(msg)

    def __del__(self):
        """Close the Pulsar client, if one was created."""
        # __init__ may have raised before self.client was assigned
        client = getattr(self, "client", None)
        if client is not None:
            client.close()
def run():
    """Command-line entry point for the relationship extractor service.

    Parses the command-line options, then builds a ``Processor`` and runs
    it. Any exception is printed and the whole setup is retried after a
    10 second pause, so this function never returns.
    """
    parser = argparse.ArgumentParser(
        prog='kg-extract-relationships',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'vectors-chunk-load'
    default_output_queue = 'graph-load'
    default_subscriber = 'kg-extract-relationships'
    default_vector_queue = 'vectors-load'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        help='Log level (default: info)'
    )

    parser.add_argument(
        '-c', '--vector-queue',
        default=default_vector_queue,
        help=f'Vector output queue (default: {default_vector_queue})'
    )

    args = parser.parse_args()

    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                output_queue=args.output_queue,
                vec_queue=args.vector_queue,
                subscriber=args.subscriber,
                log_level=args.log_level,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        # Back off before reconnecting
        time.sleep(10)

View file

View file

@ -0,0 +1,3 @@
from . llm import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . llm import run
# Start the service when this file is executed as a script.
if __name__ == "__main__":
    run()

213
trustgraph/llm/azure_text/llm.py Executable file
View file

@ -0,0 +1,213 @@
"""
Simple LLM service, performs text prompt completion using an Azure-hosted
model endpoint.
Input is prompt, output is response.
"""
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
from langchain_community.llms import Ollama
import requests
import time
import json
from ... schema import TextCompletionRequest, TextCompletionResponse
from ... log_level import LogLevel
class Processor:
    """Pulsar worker that answers text-completion requests over HTTP.

    Each request read from the input queue is wrapped in a chat-style JSON
    body, POSTed to ``endpoint`` with a bearer token, and the text of the
    first choice is published on the output queue under the request's "id"
    property.
    """

    def __init__(
            self,
            pulsar_host,
            input_queue,
            output_queue,
            subscriber,
            log_level,
            endpoint,
            token,
    ):
        """Connect to Pulsar and remember the endpoint URL and token."""

        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(TextCompletionRequest),
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(TextCompletionResponse),
        )

        self.endpoint = endpoint
        self.token = token

    def build_prompt(self, system, content):
        """Return the JSON request body for a system + user message pair."""

        data = {
            "messages": [
                {
                    "role": "system", "content": system
                },
                {
                    "role": "user", "content": content
                }
            ],
            "max_tokens": 4192,
            "temperature": 0.2,
            "top_p": 1
        }

        return json.dumps(data)

    def call_llm(self, body):
        """POST ``body`` to the endpoint and return the first choice's text.

        Raises the ``requests`` HTTP error for a non-2xx response.
        """

        # The token is the primary/secondary key, AMLToken, or
        # Microsoft Entra ID token for the endpoint
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self.token}'
        }

        resp = requests.post(self.endpoint, data=body, headers=headers)

        # Report a rejected request by its HTTP status instead of failing
        # later with a KeyError on 'choices'.
        resp.raise_for_status()

        result = resp.json()

        return result['choices'][0]['message']['content']

    def run(self):
        """Serve requests forever, one message at a time."""

        while True:

            msg = self.consumer.receive()

            try:

                v = msg.value()

                # Sender-produced ID
                msg_id = msg.properties()["id"]

                print(f"Handling prompt {msg_id}...", flush=True)

                prompt = self.build_prompt(
                    "You are a helpful chatbot",
                    v.prompt
                )

                response = self.call_llm(prompt)

                print("Send response...", flush=True)
                r = TextCompletionResponse(response=response)
                self.producer.send(r, properties={"id": msg_id})

                print("Done.", flush=True)

                # Acknowledge successful processing of the message
                self.consumer.acknowledge(msg)

            except Exception as e:

                print("Exception:", e, flush=True)

                # Message failed to be processed
                self.consumer.negative_acknowledge(msg)

    def __del__(self):
        """Close the Pulsar client, if one was created."""

        # __init__ may have failed before self.client was assigned.
        client = getattr(self, "client", None)
        if client is not None:
            client.close()
def run():
    """Command-line entry point for the Azure text-completion service.

    Parses the command-line options, then repeatedly builds a ``Processor``
    and runs it.  Any exception escaping the processor is printed and a new
    processor is started after a ten second pause.
    """

    parser = argparse.ArgumentParser(
        # Previously 'llm-ollama-text', copied from the Ollama service.
        prog='llm-azure-text',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'llm-complete-text'
    default_output_queue = 'llm-complete-text-response'
    # NOTE(review): this is the same default subscriber name the Ollama
    # service uses; left unchanged so existing subscriptions keep working.
    default_subscriber = 'llm-ollama-text'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        # The help text previously read "Output queue".
        help='Log level (default: info)'
    )

    parser.add_argument(
        '-e', '--endpoint',
        help='LLM model endpoint'
    )

    parser.add_argument(
        '-k', '--token',
        help='LLM model token'
    )

    args = parser.parse_args()

    # Restart loop: Processor.run() only comes back by raising.
    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                output_queue=args.output_queue,
                subscriber=args.subscriber,
                log_level=args.log_level,
                endpoint=args.endpoint,
                token=args.token,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        time.sleep(10)

View file

@ -0,0 +1,3 @@
from . llm import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . llm import run
# Start the service when this file is executed as a script.
if __name__ == "__main__":
    run()

190
trustgraph/llm/claude_text/llm.py Executable file
View file

@ -0,0 +1,190 @@
"""
Simple LLM service, performs text prompt completion using Claude.
Input is prompt, output is response.
"""
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
import anthropic
import time
from ... schema import TextCompletionRequest, TextCompletionResponse
from ... log_level import LogLevel
class Processor:
    """Pulsar worker that answers text-completion requests using Claude.

    Each request read from the input queue is sent to the Anthropic
    messages API; the text of the first content item is published on the
    output queue under the request's "id" property.
    """

    def __init__(
            self,
            pulsar_host,
            input_queue,
            output_queue,
            subscriber,
            log_level,
            model,
            api_key,
    ):
        """Connect to Pulsar and create the Anthropic client."""

        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(TextCompletionRequest),
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(TextCompletionResponse),
        )

        self.model = model

        self.claude = anthropic.Anthropic(api_key=api_key)

        print("Initialised", flush=True)

    def run(self):
        """Serve requests forever, one message at a time."""

        while True:

            msg = self.consumer.receive()

            try:

                v = msg.value()

                # Sender-produced ID
                msg_id = msg.properties()["id"]

                print(f"Handling prompt {msg_id}...", flush=True)

                prompt = v.prompt

                response = self.claude.messages.create(
                    model=self.model,
                    max_tokens=1000,
                    temperature=0.1,
                    system="You are a helpful chatbot.",
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {
                                    "type": "text",
                                    "text": prompt
                                }
                            ]
                        }
                    ]
                )

                resp = response.content[0].text

                print(resp, flush=True)

                print("Send response...", flush=True)
                r = TextCompletionResponse(response=resp)
                self.producer.send(r, properties={"id": msg_id})

                print("Done.", flush=True)

                # Acknowledge successful processing of the message
                self.consumer.acknowledge(msg)

            except Exception as e:

                print("Exception:", e, flush=True)

                # Message failed to be processed
                self.consumer.negative_acknowledge(msg)

    def __del__(self):
        """Close the Pulsar client, if one was created."""

        # __init__ may have failed before self.client was assigned.
        client = getattr(self, "client", None)
        if client is not None:
            client.close()
def run():
    """Command-line entry point for the Claude text-completion service.

    Parses the command-line options, then repeatedly builds a ``Processor``
    and runs it.  Any exception escaping the processor is printed and a new
    processor is started after a ten second pause.
    """

    parser = argparse.ArgumentParser(
        # Previously 'llm-ollama-text', copied from the Ollama service.
        prog='llm-claude-text',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'llm-complete-text'
    default_output_queue = 'llm-complete-text-response'
    default_subscriber = 'llm-claude-text'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        # The help text previously read "Output queue".
        help='Log level (default: info)'
    )

    parser.add_argument(
        '-m', '--model',
        default="claude-3-5-sonnet-20240620",
        help='LLM model (default: claude-3-5-sonnet-20240620)'
    )

    parser.add_argument(
        '-k', '--api-key',
        help='Claude API key'
    )

    args = parser.parse_args()

    # Restart loop: Processor.run() only comes back by raising.
    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                output_queue=args.output_queue,
                subscriber=args.subscriber,
                log_level=args.log_level,
                model=args.model,
                api_key=args.api_key,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        time.sleep(10)

View file

@ -0,0 +1,3 @@
from . llm import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . llm import run
# Start the service when this file is executed as a script.
if __name__ == "__main__":
    run()

169
trustgraph/llm/ollama_text/llm.py Executable file
View file

@ -0,0 +1,169 @@
"""
Simple LLM service, performs text prompt completion using an Ollama service.
Input is prompt, output is response.
"""
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
from langchain_community.llms import Ollama
import time
from ... schema import TextCompletionRequest, TextCompletionResponse
from ... log_level import LogLevel
class Processor:
    """Pulsar worker that answers text-completion requests via Ollama.

    Prompts arrive on the input queue, are passed to the langchain
    ``Ollama`` wrapper, and the completion is published on the output queue
    under the request's "id" property.
    """

    def __init__(
            self, pulsar_host, input_queue, output_queue, subscriber,
            log_level, model, ollama,
    ):
        """Connect to Pulsar and create the Ollama LLM wrapper."""
        pulsar_logger = pulsar.ConsoleLogger(log_level.to_pulsar())
        self.client = pulsar.Client(pulsar_host, logger=pulsar_logger)

        request_schema = JsonSchema(TextCompletionRequest)
        self.consumer = self.client.subscribe(
            input_queue, subscriber, schema=request_schema,
        )

        response_schema = JsonSchema(TextCompletionResponse)
        self.producer = self.client.create_producer(
            topic=output_queue, schema=response_schema,
        )

        self.llm = Ollama(base_url=ollama, model=model)

    def run(self):
        """Serve requests forever, one message at a time."""
        while True:
            incoming = self.consumer.receive()

            try:
                request = incoming.value()

                # The sender chooses the ID; it is echoed on the response.
                request_id = incoming.properties()["id"]
                print(f"Handling prompt {request_id}...", flush=True)

                completion = self.llm.invoke(request.prompt)

                print("Send response...", flush=True)
                self.producer.send(
                    TextCompletionResponse(response=completion),
                    properties={"id": request_id},
                )
                print("Done.", flush=True)

                # Processed successfully
                self.consumer.acknowledge(incoming)

            except Exception as e:
                print("Exception:", e, flush=True)

                # Processing failed
                self.consumer.negative_acknowledge(incoming)

    def __del__(self):
        """Announce and close the Pulsar client."""
        print("Closing")
        self.client.close()
def run():
    """Command-line entry point for the Ollama text-completion service.

    Parses the command-line options, then repeatedly builds a ``Processor``
    and runs it.  Any exception escaping the processor is printed and a new
    processor is started after a ten second pause.
    """

    parser = argparse.ArgumentParser(
        prog='llm-ollama-text',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'llm-complete-text'
    default_output_queue = 'llm-complete-text-response'
    default_subscriber = 'llm-ollama-text'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        # The help text previously read "Output queue".
        help='Log level (default: info)'
    )

    parser.add_argument(
        '-m', '--model',
        default="gemma2",
        help='LLM model (default: gemma2)'
    )

    parser.add_argument(
        '-r', '--ollama',
        default="http://localhost:11434",
        help='ollama (default: http://localhost:11434)'
    )

    args = parser.parse_args()

    # Restart loop: Processor.run() only comes back by raising.
    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                output_queue=args.output_queue,
                subscriber=args.subscriber,
                log_level=args.log_level,
                model=args.model,
                ollama=args.ollama,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        time.sleep(10)

View file

@ -0,0 +1,3 @@
from . llm import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . llm import run
# Start the service when this file is executed as a script.
if __name__ == "__main__":
    run()

View file

@ -0,0 +1,254 @@
"""
Simple LLM service, performs text prompt completion using Google Cloud
VertexAI.
Input is prompt, output is response.
"""
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
import vertexai
import time
from google.oauth2 import service_account
import google
from vertexai.preview.generative_models import (
Content,
FunctionDeclaration,
GenerativeModel,
GenerationConfig,
HarmCategory,
HarmBlockThreshold,
Part,
Tool,
)
from ... schema import TextCompletionRequest, TextCompletionResponse
from ... log_level import LogLevel
class Processor:
    """Pulsar worker that answers text-completion requests via VertexAI.

    Prompts arrive on the input queue, are passed to a VertexAI
    ``GenerativeModel``, and the reply text (with Markdown code fences
    removed) is published on the output queue under the request's "id"
    property.
    """

    def __init__(
            self, pulsar_host, input_queue, output_queue, subscriber,
            log_level, credentials, region, model,
    ):
        """Connect to Pulsar, initialise VertexAI and load the model."""
        pulsar_logger = pulsar.ConsoleLogger(log_level.to_pulsar())
        self.client = pulsar.Client(pulsar_host, logger=pulsar_logger)

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(TextCompletionRequest),
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(TextCompletionResponse),
        )

        # NOTE(review): nothing in this class reads self.parameters, and
        # its values differ from generation_config below.
        self.parameters = dict(
            temperature=0.2,
            top_p=1.0,
            top_k=32,
            candidate_count=1,
            max_output_tokens=8192,
        )

        self.generation_config = GenerationConfig(
            temperature=0.2,
            top_p=1.0,
            top_k=10,
            candidate_count=1,
            max_output_tokens=8191,
        )

        # Block none doesn't seem to work
        threshold = HarmBlockThreshold.BLOCK_ONLY_HIGH
        # threshold = HarmBlockThreshold.BLOCK_NONE

        harm_categories = (
            HarmCategory.HARM_CATEGORY_HARASSMENT,
            HarmCategory.HARM_CATEGORY_HATE_SPEECH,
            HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
            HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
        )
        self.safety_settings = {
            category: threshold for category in harm_categories
        }

        print("Initialise VertexAI...", flush=True)

        # Credentials and project are passed only when a key was supplied.
        init_kwargs = {"location": region}
        if credentials:
            init_kwargs["credentials"] = credentials
            init_kwargs["project"] = credentials.project_id
        vertexai.init(**init_kwargs)

        print(f"Initialise model {model}", flush=True)
        self.llm = GenerativeModel(model)

        print("Initialisation complete", flush=True)

    def run(self):
        """Serve requests forever, one message at a time."""
        while True:
            incoming = self.consumer.receive()

            try:
                request = incoming.value()

                # The sender chooses the ID; it is echoed on the response.
                request_id = incoming.properties()["id"]
                print(f"Handling prompt {request_id}...", flush=True)

                reply = self.llm.generate_content(
                    request.prompt,
                    generation_config=self.generation_config,
                    safety_settings=self.safety_settings,
                )

                # Strip Markdown code fences from the reply text.
                text = reply.text.replace("```json", "").replace("```", "")

                print("Send response...", flush=True)
                self.producer.send(
                    TextCompletionResponse(response=text),
                    properties={"id": request_id},
                )
                print("Done.", flush=True)

                # Processed successfully
                self.consumer.acknowledge(incoming)

            except google.api_core.exceptions.ResourceExhausted:
                # Pause before handing the message back for redelivery.
                print("429, resource busy, sleeping", flush=True)
                time.sleep(15)
                self.consumer.negative_acknowledge(incoming)

            except Exception as e:
                print("Exception:", e, flush=True)

                # Processing failed
                self.consumer.negative_acknowledge(incoming)

    def __del__(self):
        """Close the Pulsar client."""
        self.client.close()
def run():
    """Command-line entry point for the VertexAI text-completion service.

    Parses the command-line options, loads service-account credentials when
    a private key file is given, then repeatedly builds a ``Processor`` and
    runs it.  Any exception escaping the processor is printed and a new
    processor is started after a ten second pause.
    """

    parser = argparse.ArgumentParser(
        # Previously 'llm-ollama-text', copied from the Ollama service.
        prog='llm-vertexai-text',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'llm-complete-text'
    default_output_queue = 'llm-complete-text-response'
    default_subscriber = 'llm-vertexai-text'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        # The help text previously read "Output queue".
        help='Log level (default: info)'
    )

    parser.add_argument(
        '-m', '--model',
        default="gemini-1.0-pro-001",
        help='LLM model (default: gemini-1.0-pro-001)'
    )
    # Also: text-bison-32k

    parser.add_argument(
        '-k', '--private-key',
        help='Google Cloud private JSON file'
    )

    parser.add_argument(
        '-r', '--region',
        default='us-west1',
        help='Google Cloud region (default: us-west1)',
    )

    args = parser.parse_args()

    # Without a key file, credentials stay None and the processor calls
    # vertexai.init() with the region only.
    if args.private_key:
        credentials = service_account.Credentials.from_service_account_file(
            args.private_key
        )
    else:
        credentials = None

    # Restart loop: Processor.run() only comes back by raising.
    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                output_queue=args.output_queue,
                subscriber=args.subscriber,
                log_level=args.log_level,
                credentials=credentials,
                region=args.region,
                model=args.model,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        time.sleep(10)

71
trustgraph/llm_client.py Normal file
View file

@ -0,0 +1,71 @@
#!/usr/bin/env python3
import pulsar
import _pulsar
from pulsar.schema import JsonSchema
from trustgraph.schema import TextCompletionRequest, TextCompletionResponse
import hashlib
import uuid
# Ugly: short aliases for the Pulsar logger levels, usable as the
# ``log_level`` argument of LlmClient.
ERROR = _pulsar.LoggerLevel.Error
WARN = _pulsar.LoggerLevel.Warn
INFO = _pulsar.LoggerLevel.Info
DEBUG = _pulsar.LoggerLevel.Debug


class LlmClient:
    """Request/response client for the text-completion service.

    Publishes prompts on the 'llm-complete-text' topic and waits on
    'llm-complete-text-response' for the reply carrying the same "id"
    message property.
    """

    def __init__(
            self, log_level=ERROR, client_id=None,
            pulsar_host="pulsar://pulsar:6650",
    ):
        """Connect to Pulsar and open the request producer and response
        consumer.

        Args:
            log_level: Pulsar logger level for the console logger.
            client_id: subscription name for the response topic; a random
                UUID is used when omitted.
            pulsar_host: Pulsar service URL.
        """

        if client_id is None:
            client_id = str(uuid.uuid4())

        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level),
        )

        self.producer = self.client.create_producer(
            topic='llm-complete-text',
            schema=JsonSchema(TextCompletionRequest),
            chunking_enabled=True,
        )

        self.consumer = self.client.subscribe(
            'llm-complete-text-response', client_id,
            schema=JsonSchema(TextCompletionResponse),
        )

    def request(self, prompt, timeout=500):
        """Send ``prompt`` and return the matching response text.

        Args:
            prompt: prompt text to complete.
            timeout: seconds to wait on each receive call.  The wait starts
                again after every non-matching message, so the total can
                exceed this value.

        Whatever ``consumer.receive`` raises on timeout propagates to the
        caller.
        """

        request_id = str(uuid.uuid4())

        r = TextCompletionRequest(
            prompt=prompt
        )
        self.producer.send(r, properties={"id": request_id})

        while True:

            msg = self.consumer.receive(timeout_millis=timeout * 1000)

            if msg.properties()["id"] == request_id:
                resp = msg.value().response
                self.consumer.acknowledge(msg)
                return resp

            # Ignore messages with wrong ID
            self.consumer.acknowledge(msg)

    def __del__(self):
        """Close the producer, consumer and client that were created."""

        # __init__ may have failed part-way, so any of these can be missing.
        for attr in ("producer", "consumer", "client"):
            resource = getattr(self, attr, None)
            if resource is not None:
                resource.close()

20
trustgraph/log_level.py Normal file
View file

@ -0,0 +1,20 @@
from enum import Enum
import _pulsar
class LogLevel(Enum):
    """Log levels accepted on the command line, with their Pulsar mapping."""

    DEBUG = 'debug'
    INFO = 'info'
    WARN = 'warn'
    ERROR = 'error'

    def __str__(self):
        """Return the lower-case level name."""
        return self.value

    def to_pulsar(self):
        """Return the ``_pulsar.LoggerLevel`` matching this level."""
        pulsar_levels = {
            LogLevel.DEBUG: _pulsar.LoggerLevel.Debug,
            LogLevel.INFO: _pulsar.LoggerLevel.Info,
            LogLevel.WARN: _pulsar.LoggerLevel.Warn,
            LogLevel.ERROR: _pulsar.LoggerLevel.Error,
        }
        if self in pulsar_levels:
            return pulsar_levels[self]
        raise RuntimeError("Log level mismatch")

138
trustgraph/prompts.py Normal file
View file

@ -0,0 +1,138 @@
def turtle_extract(text):
    """Return the LLM prompt asking for Turtle RDF extracted from ``text``."""
    return f"""<instructions>
Study the following text and extract knowledge as
information in Turtle RDF format.
When declaring any new URIs, use <https://trustgraph.ai/e#> prefix,
and declare appropriate namespace tags.
</instructions>
<text>
{text}
</text>
<requirements>
Do not use placeholders for information you do not know.
You will respond only with raw Turtle RDF data. Do not provide
explanations. Do not use special characters in the abstract text. The
abstract must be written as plain text. Do not add markdown formatting.
</requirements>"""
def scholar(text):
    """Return the LLM prompt asking for a scholarly abstract of ``text``.

    The prompt asks for a JSON object with title, abstract, keywords and
    people, and embeds a sample object.
    """

    # Sample JSON object shown in the <example> section of the prompt.
    example_json = """{
"title": "Article title here",
"abstract": "Abstract text here",
"keywords": ["keyword1", "keyword2", "keyword3"],
"people": ["person1", "person2", "person3"]
}"""

    return f"""Your task is to read the provided text and write a scholarly abstract to fully explain all of the concepts described in the provided text. The abstract must include all conceptual details.
<text>
{text}
</text>
<instructions>
- Structure: For the provided text, write a title, abstract, keywords,
and people for the concepts found in the provided text. Ignore
document formatting in the provided text such as table of contents,
headers, footers, section metadata, and URLs.
- Focus on Concepts The abstract must focus on concepts found in the
provided text. The abstract must be factually accurate. Do not
write any concepts not found in the provided text. Do not
speculate. Do not omit any conceptual details.
- Completeness: The abstract must capture all topics the reader will
need to understand the concepts found in the provided text. Describe
all terms, definitions, entities, people, events, concepts,
conceptual relationships, and any other topics necessary for the
reader to understand the concepts of the provided text.
- Format: Respond in the form of a valid JSON object.
</instructions>
<example>
{example_json}
</example>
<requirements>
You will respond only with the JSON object. Do not provide
explanations. Do not use special characters in the abstract text. The
abstract must be written as plain text.
</requirements>"""
def to_json_ld(text):
    """Return the LLM prompt asking for the facts in ``text`` as JSON-LD."""
    return f"""<instructions>
Study the following text and output any facts you discover in
well-structured JSON-LD format.
Use any schema you understand from schema.org to describe the facts.
</instructions>
<text>
{text}
</text>
<requirements>
You will respond only with raw JSON-LD data in JSON format. Do not provide
explanations. Do not use special characters in the abstract text. The
abstract must be written as plain text. Do not add markdown formatting
or headers or prefixes. Do not use information which is not present in
the input text.
</requirements>"""
def to_relationships(text):
    """Return the LLM prompt asking for relationships found in ``text``.

    The prompt asks for a JSON array of objects with the fields subject,
    predicate, object and object-entity.
    """

    # Fixed the typo "arary" in the instructions sent to the model.
    prompt = f"""<instructions>
Study the following text and derive entity relationships. For each
relationship, derive the subject, predicate and object of the relationship.
Output relationships in JSON format as an array of objects with fields:
- subject: the subject of the relationship
- predicate: the predicate
- object: the object of the relationship
- object-entity: false if the object is a simple data type: name, value or date. true if it is an entity.
</instructions>
<text>
{text}
</text>
<requirements>
You will respond only with raw JSON format data. Do not provide
explanations. Do not use special characters in the abstract text. The
abstract must be written as plain text. Do not add markdown formatting
or headers or prefixes.
</requirements>"""

    return prompt
def to_definitions(text):
    """Return the LLM prompt asking for entity definitions found in ``text``.

    The prompt asks for a JSON array of objects with the fields entity and
    definition.
    """

    # Fixed the typo "arary" and the copy-pasted "Output relationships":
    # this prompt asks for definitions.
    prompt = f"""<instructions>
Study the following text and derive definitions for any discovered entities.
Do not provide definitions for entities whose definitions are incomplete
or unknown.
Output definitions in JSON format as an array of objects with fields:
- entity: the name of the entity
- definition: English text which defines the entity
</instructions>
<text>
{text}
</text>
<requirements>
You will respond only with raw JSON format data. Do not provide
explanations. Do not use special characters in the abstract text. The
abstract will be written as plain text. Do not add markdown formatting
or headers or prefixes. Do not include null or unknown definitions.
</requirements>"""

    return prompt

View file

View file

@ -0,0 +1,3 @@
from . rag import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . rag import run
# Start the service when this file is executed as a script.
if __name__ == "__main__":
    run()

172
trustgraph/rag/graph/rag.py Executable file
View file

@ -0,0 +1,172 @@
"""
Simple RAG service, performs a query using graph RAG and an LLM.
Input is query, output is response.
"""
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
import time
from ... schema import GraphRagQuery, GraphRagResponse
from ... log_level import LogLevel
from ... graph_rag import GraphRag
class Processor:
    """Pulsar worker that answers graph-RAG queries.

    Queries arrive on the input queue, are answered by ``GraphRag.query``,
    and the answer is published on the output queue under the request's
    "id" property.
    """

    def __init__(
            self, pulsar_host, input_queue, output_queue, subscriber,
            log_level, graph_hosts, vector_store,
    ):
        """Connect to Pulsar and build the GraphRag engine."""
        pulsar_logger = pulsar.ConsoleLogger(log_level.to_pulsar())
        self.client = pulsar.Client(pulsar_host, logger=pulsar_logger)

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(GraphRagQuery),
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(GraphRagResponse),
        )

        self.rag = GraphRag(
            pulsar_host=pulsar_host,
            graph_hosts=graph_hosts,
            vector_store=vector_store,
            verbose=True,
        )

    def run(self):
        """Serve queries forever, one message at a time."""
        while True:
            incoming = self.consumer.receive()

            try:
                query = incoming.value()

                # The sender chooses the ID; it is echoed on the response.
                request_id = incoming.properties()["id"]
                print(f"Handling input {request_id}...", flush=True)

                answer = self.rag.query(query.query)

                print("Send response...", flush=True)
                self.producer.send(
                    GraphRagResponse(response=answer),
                    properties={"id": request_id},
                )
                print("Done.", flush=True)

                # Processed successfully
                self.consumer.acknowledge(incoming)

            except Exception as e:
                print("Exception:", e, flush=True)

                # Processing failed
                self.consumer.negative_acknowledge(incoming)

    def __del__(self):
        """Announce and close the Pulsar client."""
        print("Closing", flush=True)
        self.client.close()
def run():
    """Command-line entry point for the graph-RAG query service.

    Parses the command-line options, then repeatedly builds a ``Processor``
    and runs it.  Any exception escaping the processor is printed and a new
    processor is started after a ten second pause.
    """

    parser = argparse.ArgumentParser(
        # Previously 'llm-ollama-text', copied from the Ollama service.
        prog='graph-rag',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'graph-rag-query'
    default_output_queue = 'graph-rag-response'
    default_subscriber = 'graph-rag'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        # The help text previously read "Output queue".
        help='Log level (default: info)'
    )

    parser.add_argument(
        '-g', '--graph-hosts',
        default='cassandra',
        help='Graph hosts, comma separated (default: cassandra)'
    )

    parser.add_argument(
        '-v', '--vector-store',
        default='http://milvus:19530',
        help='Vector host (default: http://milvus:19530)'
    )

    args = parser.parse_args()

    # Restart loop: Processor.run() only comes back by raising.
    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                output_queue=args.output_queue,
                subscriber=args.subscriber,
                log_level=args.log_level,
                # The option is one comma-separated string.
                graph_hosts=args.graph_hosts.split(","),
                vector_store=args.vector_store,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        time.sleep(10)

6
trustgraph/rdf.py Normal file
View file

@ -0,0 +1,6 @@
# IRI of the "label" term in the RDF Schema (rdf-schema#) namespace.
RDF_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"
# IRI of the "definition" term in the SKOS core namespace.
DEFINITION = "http://www.w3.org/2004/02/skos/core#definition"
# Base IRI for TrustGraph entities.
# NOTE(review): presumably prefixed to entity names by callers; confirm.
TRUSTGRAPH_ENTITIES = "http://trustgraph.ai/e/"

67
trustgraph/schema.py Normal file
View file

@ -0,0 +1,67 @@
from pulsar.schema import Record, Bytes, String, Boolean, Integer, Array, Double
from enum import Enum
#class Command(Enum):
# reindex = 1
#class IndexCommand(Record):
# command = Command
class Value(Record):
    # A string value, a flag saying whether it is a URI, and a type string.
    value = String()
    is_uri = Boolean()
    type = String()
class Source(Record):
    # Provenance carried with each record: source, id and title strings.
    source = String()
    id = String()
    title = String()
class Document(Record):
    # A document as raw bytes, with its Source.
    source = Source()
    data = Bytes()
class TextDocument(Record):
    # A document's text as bytes, with its Source.
    source = Source()
    text = Bytes()
class Chunk(Record):
    # One chunk of text as bytes, with its Source.
    source = Source()
    chunk = Bytes()
class VectorsChunk(Record):
    # A chunk as bytes together with a list of vectors (lists of doubles).
    source = Source()
    vectors = Array(Array(Double()))
    chunk = Bytes()
class VectorsAssociation(Record):
    # An entity Value together with a list of vectors (lists of doubles).
    source = Source()
    vectors = Array(Array(Double()))
    entity = Value()
class Triple(Record):
    # One triple: three Values named s, p and o, with its Source.
    source = Source()
    s = Value()
    p = Value()
    o = Value()
class TextCompletionRequest(Record):
    # Request to a text-completion service: the prompt string.
    prompt = String()
class TextCompletionResponse(Record):
    # Reply from a text-completion service: the response string.
    response = String()
class EmbeddingsRequest(Record):
    # Embeddings request: the text string.
    text = String()
class EmbeddingsResponse(Record):
    # Embeddings reply: a list of vectors (lists of doubles).
    vectors = Array(Array(Double()))
class GraphRagQuery(Record):
    # Graph-RAG request: the query string.
    query = String()
class GraphRagResponse(Record):
    # Graph-RAG reply: the response string.
    response = String()

108
trustgraph/trustgraph.py Normal file
View file

@ -0,0 +1,108 @@
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
class TrustGraph:
    """Triple store kept in the Cassandra keyspace ``trustgraph``.

    Triples live in a single ``triples`` table with text columns s, p and
    o, primary key (s, p), and secondary indexes on p and o.

    NOTE(review): o is not part of the primary key, so inserting a second
    object for the same (s, p) pair replaces the first.
    """

    def __init__(self, hosts=None):
        """Connect to the cluster (default: localhost) and create the
        keyspace, table and indexes if they are missing."""

        if hosts is None:
            hosts = ["localhost"]

        self.cluster = Cluster(hosts)
        self.session = self.cluster.connect()

        self.init()

    def clear(self):
        """Drop the whole keyspace and create it again, empty."""

        self.session.execute("""
            drop keyspace if exists trustgraph;
        """)

        self.init()

    def init(self):
        """Create the keyspace, ``triples`` table and indexes if missing."""

        self.session.execute("""
            create keyspace if not exists trustgraph
                with replication = {
                   'class' : 'SimpleStrategy',
                   'replication_factor' : 1
                };
        """)

        self.session.set_keyspace('trustgraph')

        self.session.execute("""
            create table if not exists triples (
                s text,
                p text,
                o text,
                PRIMARY KEY (s, p)
            );
        """)

        self.session.execute("""
            create index if not exists triples_p
                ON triples (p);
        """)

        self.session.execute("""
            create index if not exists triples_o
                ON triples (o);
        """)

    def insert(self, s, p, o):
        """Insert (or overwrite) the triple (s, p, o)."""

        self.session.execute(
            "insert into triples (s, p, o) values (%s, %s, %s)",
            (s, p, o)
        )

    # In the query methods below, ``limit`` is coerced with int() before
    # being placed in the statement text, so only a number can reach the
    # query.

    def get_all(self, limit=50):
        """Return up to ``limit`` rows of (s, p, o)."""

        return self.session.execute(
            f"select s, p, o from triples limit {int(limit)}"
        )

    def get_s(self, s, limit=10):
        """Return up to ``limit`` rows of (p, o) for subject ``s``."""

        # ``limit`` was previously accepted but never applied.
        return self.session.execute(
            f"select p, o from triples where s = %s limit {int(limit)}",
            (s,)
        )

    def get_p(self, p, limit=10):
        """Return up to ``limit`` rows of (s, o) for predicate ``p``."""

        return self.session.execute(
            f"select s, o from triples where p = %s limit {int(limit)}",
            (p,)
        )

    def get_o(self, o, limit=10):
        """Return up to ``limit`` rows of (s, p) for object ``o``."""

        return self.session.execute(
            f"select s, p from triples where o = %s limit {int(limit)}",
            (o,)
        )

    def get_sp(self, s, p, limit=10):
        """Return up to ``limit`` rows of (o) for subject ``s``, predicate
        ``p``."""

        return self.session.execute(
            f"select o from triples where s = %s and p = %s "
            f"limit {int(limit)}",
            (s, p)
        )

    def get_po(self, p, o, limit=10):
        """Return up to ``limit`` rows of (s) for predicate ``p``, object
        ``o``."""

        # CQL requires LIMIT before ALLOW FILTERING; the clauses were
        # previously the other way round.
        return self.session.execute(
            f"select s from triples where p = %s and o = %s "
            f"limit {int(limit)} allow filtering",
            (p, o)
        )

    def get_os(self, o, s, limit=10):
        """Return up to ``limit`` rows of (s) for object ``o``, subject
        ``s``."""

        # NOTE(review): this selects s, which the caller already has; the
        # sibling methods suggest p was intended.  Left unchanged so the
        # result columns stay as callers see them today.
        return self.session.execute(
            f"select s from triples where o = %s and s = %s "
            f"limit {int(limit)}",
            (o, s)
        )

    def get_spo(self, s, p, o, limit=10):
        """Return up to ``limit`` rows (column x = s) matching the exact
        triple."""

        return self.session.execute(
            f"select s as x from triples where s = %s and p = %s "
            f"and o = %s limit {int(limit)}",
            (s, p, o)
        )

View file

View file

@ -0,0 +1,3 @@
from . write import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . write import run
# Start the service when this file is executed as a script.
if __name__ == "__main__":
    run()

View file

@ -0,0 +1,136 @@
"""
Vector writer, accepts entity/vector associations on input and writes each
vector to a Milvus vector store under the entity's value.
"""
import pulsar
from pulsar.schema import JsonSchema
from langchain_community.document_loaders import PyPDFLoader
import tempfile
import base64
import os
import argparse
import time
from ... schema import VectorsAssociation
from ... log_level import LogLevel
from ... edge_map import VectorStore
class Processor:
    """Pulsar worker that writes entity vectors to a vector store.

    Each ``VectorsAssociation`` read from the input queue has all its
    vectors inserted into the store under the entity's value; records whose
    entity value is the empty string are acknowledged without writing.
    """

    def __init__(
            self, pulsar_host, input_queue, subscriber, store_uri, log_level,
    ):
        """Connect to Pulsar and open the vector store at ``store_uri``."""
        pulsar_logger = pulsar.ConsoleLogger(log_level.to_pulsar())
        self.client = pulsar.Client(pulsar_host, logger=pulsar_logger)

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(VectorsAssociation),
        )

        self.vecstore = VectorStore(store_uri)

    def run(self):
        """Consume associations forever, one message at a time."""
        while True:
            incoming = self.consumer.receive()

            try:
                record = incoming.value()

                # Records with an empty entity name are skipped.
                if record.entity.value != "":
                    for vector in record.vectors:
                        self.vecstore.insert(vector, record.entity.value)

                # Processed successfully
                self.consumer.acknowledge(incoming)

            except Exception as e:
                print("Exception:", e, flush=True)

                # Processing failed
                self.consumer.negative_acknowledge(incoming)

    def __del__(self):
        """Close the Pulsar client."""
        self.client.close()
def run():
    """Command-line entry point for the Milvus vector-writer service.

    Parses the command-line options, then repeatedly builds a ``Processor``
    and runs it.  Any exception escaping the processor is printed and a new
    processor is started after a ten second pause.
    """

    parser = argparse.ArgumentParser(
        # Previously 'pdf-decoder', copied from the PDF decoder service.
        prog='vector-write-milvus',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'vectors-load'
    default_subscriber = 'vector-write-milvus'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        # The help text previously read "Output queue".
        help='Log level (default: info)'
    )

    parser.add_argument(
        '-t', '--store-uri',
        default="http://localhost:19530",
        help='Milvus store URI (default: http://localhost:19530)'
    )

    args = parser.parse_args()

    # Restart loop: Processor.run() only comes back by raising.
    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                subscriber=args.subscriber,
                store_uri=args.store_uri,
                log_level=args.log_level,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

        time.sleep(10)