Increase storage test coverage (#435)

* Fixing storage and adding tests

* PR pipeline only runs quick tests
This commit is contained in:
cybermaggedon 2025-07-15 09:33:35 +01:00 committed by GitHub
parent 4daa54abaf
commit f37decea2b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 7606 additions and 754 deletions

View file

@ -5,94 +5,56 @@ of chunks
"""
from .... direct.milvus_doc_embeddings import DocVectors
from .... schema import DocumentEmbeddingsRequest, DocumentEmbeddingsResponse
from .... schema import DocumentEmbeddingsResponse
from .... schema import Error, Value
from .... schema import document_embeddings_request_queue
from .... schema import document_embeddings_response_queue
from .... base import ConsumerProducer
from .... base import DocumentEmbeddingsQueryService
module = "de-query"
default_input_queue = document_embeddings_request_queue
default_output_queue = document_embeddings_response_queue
default_subscriber = module
default_ident = "de-query"
default_store_uri = 'http://localhost:19530'
class Processor(ConsumerProducer):
class Processor(DocumentEmbeddingsQueryService):
def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
store_uri = params.get("store_uri", default_store_uri)
super(Processor, self).__init__(
**params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": DocumentEmbeddingsRequest,
"output_schema": DocumentEmbeddingsResponse,
"store_uri": store_uri,
}
)
self.vecstore = DocVectors(store_uri)
async def handle(self, msg):
async def query_document_embeddings(self, msg):
try:
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling input {id}...", flush=True)
# Handle zero limit case
if msg.limit <= 0:
return []
chunks = []
for vec in v.vectors:
for vec in msg.vectors:
resp = self.vecstore.search(vec, limit=v.limit)
resp = self.vecstore.search(vec, limit=msg.limit)
for r in resp:
chunk = r["entity"]["doc"]
chunk = chunk.encode("utf-8")
chunks.append(chunk)
print("Send response...", flush=True)
r = DocumentEmbeddingsResponse(documents=chunks, error=None)
await self.send(r, properties={"id": id})
print("Done.", flush=True)
return chunks
except Exception as e:
print(f"Exception: {e}")
print("Send error response...", flush=True)
r = DocumentEmbeddingsResponse(
error=Error(
type = "llm-error",
message = str(e),
),
documents=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
raise e
@staticmethod
def add_args(parser):
ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
)
DocumentEmbeddingsQueryService.add_args(parser)
parser.add_argument(
'-t', '--store-uri',
@ -102,5 +64,5 @@ class Processor(ConsumerProducer):
def run():
Processor.launch(module, __doc__)
Processor.launch(default_ident, __doc__)

View file

@ -10,30 +10,21 @@ from pinecone.grpc import PineconeGRPC, GRPCClientConfig
import uuid
import os
from .... schema import DocumentEmbeddingsRequest, DocumentEmbeddingsResponse
from .... schema import Error, Value
from .... schema import document_embeddings_request_queue
from .... schema import document_embeddings_response_queue
from .... base import ConsumerProducer
from .... base import DocumentEmbeddingsQueryService
module = "de-query"
default_input_queue = document_embeddings_request_queue
default_output_queue = document_embeddings_response_queue
default_subscriber = module
default_ident = "de-query"
default_api_key = os.getenv("PINECONE_API_KEY", "not-specified")
class Processor(ConsumerProducer):
class Processor(DocumentEmbeddingsQueryService):
def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
self.url = params.get("url", None)
self.api_key = params.get("api_key", default_api_key)
if self.api_key is None or self.api_key == "not-specified":
raise RuntimeError("Pinecone API key must be specified")
if self.url:
self.pinecone = PineconeGRPC(
@ -47,88 +38,53 @@ class Processor(ConsumerProducer):
super(Processor, self).__init__(
**params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": DocumentEmbeddingsRequest,
"output_schema": DocumentEmbeddingsResponse,
"url": self.url,
"api_key": self.api_key,
}
)
async def handle(self, msg):
async def query_document_embeddings(self, msg):
try:
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling input {id}...", flush=True)
# Handle zero limit case
if msg.limit <= 0:
return []
chunks = []
for vec in v.vectors:
for vec in msg.vectors:
dim = len(vec)
index_name = (
"d-" + v.user + "-" + str(dim)
"d-" + msg.user + "-" + msg.collection + "-" + str(dim)
)
index = self.pinecone.Index(index_name)
results = index.query(
namespace=v.collection,
vector=vec,
top_k=v.limit,
top_k=msg.limit,
include_values=False,
include_metadata=True
)
search_result = self.client.query_points(
collection_name=collection,
query=vec,
limit=v.limit,
with_payload=True,
).points
for r in results.matches:
doc = r.metadata["doc"]
chunks.add(doc)
chunks.append(doc)
print("Send response...", flush=True)
r = DocumentEmbeddingsResponse(documents=chunks, error=None)
await self.send(r, properties={"id": id})
print("Done.", flush=True)
return chunks
except Exception as e:
print(f"Exception: {e}")
print("Send error response...", flush=True)
r = DocumentEmbeddingsResponse(
error=Error(
type = "llm-error",
message = str(e),
),
documents=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
raise e
@staticmethod
def add_args(parser):
ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
)
DocumentEmbeddingsQueryService.add_args(parser)
parser.add_argument(
'-a', '--api-key',
@ -143,5 +99,5 @@ class Processor(ConsumerProducer):
def run():
Processor.launch(module, __doc__)
Processor.launch(default_ident, __doc__)

View file

@ -5,35 +5,21 @@ entities
"""
from .... direct.milvus_graph_embeddings import EntityVectors
from .... schema import GraphEmbeddingsRequest, GraphEmbeddingsResponse
from .... schema import GraphEmbeddingsResponse
from .... schema import Error, Value
from .... schema import graph_embeddings_request_queue
from .... schema import graph_embeddings_response_queue
from .... base import ConsumerProducer
from .... base import GraphEmbeddingsQueryService
module = "ge-query"
default_input_queue = graph_embeddings_request_queue
default_output_queue = graph_embeddings_response_queue
default_subscriber = module
default_ident = "ge-query"
default_store_uri = 'http://localhost:19530'
class Processor(ConsumerProducer):
class Processor(GraphEmbeddingsQueryService):
def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
store_uri = params.get("store_uri", default_store_uri)
super(Processor, self).__init__(
**params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": GraphEmbeddingsRequest,
"output_schema": GraphEmbeddingsResponse,
"store_uri": store_uri,
}
)
@ -46,29 +32,34 @@ class Processor(ConsumerProducer):
else:
return Value(value=ent, is_uri=False)
async def handle(self, msg):
async def query_graph_embeddings(self, msg):
try:
v = msg.value()
entity_set = set()
entities = []
# Sender-produced ID
id = msg.properties()["id"]
# Handle zero limit case
if msg.limit <= 0:
return []
print(f"Handling input {id}...", flush=True)
for vec in msg.vectors:
entities = set()
for vec in v.vectors:
resp = self.vecstore.search(vec, limit=v.limit)
resp = self.vecstore.search(vec, limit=msg.limit * 2)
for r in resp:
ent = r["entity"]["entity"]
entities.add(ent)
# De-dupe entities
if ent not in entity_set:
entity_set.add(ent)
entities.append(ent)
# Convert set to list
entities = list(entities)
# Keep adding entities until limit
if len(entity_set) >= msg.limit: break
# Keep adding entities until limit
if len(entity_set) >= msg.limit: break
ents2 = []
@ -78,36 +69,19 @@ class Processor(ConsumerProducer):
entities = ents2
print("Send response...", flush=True)
r = GraphEmbeddingsResponse(entities=entities, error=None)
await self.send(r, properties={"id": id})
return entities
print("Done.", flush=True)
except Exception as e:
print(f"Exception: {e}")
print("Send error response...", flush=True)
r = GraphEmbeddingsResponse(
error=Error(
type = "llm-error",
message = str(e),
),
entities=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
raise e
@staticmethod
def add_args(parser):
ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
)
GraphEmbeddingsQueryService.add_args(parser)
parser.add_argument(
'-t', '--store-uri',
@ -117,5 +91,5 @@ class Processor(ConsumerProducer):
def run():
Processor.launch(module, __doc__)
Processor.launch(default_ident, __doc__)

View file

@ -10,30 +10,23 @@ from pinecone.grpc import PineconeGRPC, GRPCClientConfig
import uuid
import os
from .... schema import GraphEmbeddingsRequest, GraphEmbeddingsResponse
from .... schema import GraphEmbeddingsResponse
from .... schema import Error, Value
from .... schema import graph_embeddings_request_queue
from .... schema import graph_embeddings_response_queue
from .... base import ConsumerProducer
from .... base import GraphEmbeddingsQueryService
module = "ge-query"
default_input_queue = graph_embeddings_request_queue
default_output_queue = graph_embeddings_response_queue
default_subscriber = module
default_ident = "ge-query"
default_api_key = os.getenv("PINECONE_API_KEY", "not-specified")
class Processor(ConsumerProducer):
class Processor(GraphEmbeddingsQueryService):
def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
self.url = params.get("url", None)
self.api_key = params.get("api_key", default_api_key)
if self.api_key is None or self.api_key == "not-specified":
raise RuntimeError("Pinecone API key must be specified")
if self.url:
self.pinecone = PineconeGRPC(
@ -47,12 +40,8 @@ class Processor(ConsumerProducer):
super(Processor, self).__init__(
**params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": GraphEmbeddingsRequest,
"output_schema": GraphEmbeddingsResponse,
"url": self.url,
"api_key": self.api_key,
}
)
@ -62,26 +51,23 @@ class Processor(ConsumerProducer):
else:
return Value(value=ent, is_uri=False)
async def handle(self, msg):
async def query_graph_embeddings(self, msg):
try:
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling input {id}...", flush=True)
# Handle zero limit case
if msg.limit <= 0:
return []
entity_set = set()
entities = []
for vec in v.vectors:
for vec in msg.vectors:
dim = len(vec)
index_name = (
"t-" + v.user + "-" + str(dim)
"t-" + msg.user + "-" + msg.collection + "-" + str(dim)
)
index = self.pinecone.Index(index_name)
@ -89,9 +75,8 @@ class Processor(ConsumerProducer):
# Heuristic hack, get (2*limit), so that we have more chance
# of getting (limit) entities
results = index.query(
namespace=v.collection,
vector=vec,
top_k=v.limit * 2,
top_k=msg.limit * 2,
include_values=False,
include_metadata=True
)
@ -106,10 +91,10 @@ class Processor(ConsumerProducer):
entities.append(ent)
# Keep adding entities until limit
if len(entity_set) >= v.limit: break
if len(entity_set) >= msg.limit: break
# Keep adding entities until limit
if len(entity_set) >= v.limit: break
if len(entity_set) >= msg.limit: break
ents2 = []
@ -118,37 +103,17 @@ class Processor(ConsumerProducer):
entities = ents2
print("Send response...", flush=True)
r = GraphEmbeddingsResponse(entities=entities, error=None)
await self.send(r, properties={"id": id})
print("Done.", flush=True)
return entities
except Exception as e:
print(f"Exception: {e}")
print("Send error response...", flush=True)
r = GraphEmbeddingsResponse(
error=Error(
type = "llm-error",
message = str(e),
),
entities=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
raise e
@staticmethod
def add_args(parser):
ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
)
GraphEmbeddingsQueryService.add_args(parser)
parser.add_argument(
'-a', '--api-key',
@ -163,5 +128,5 @@ class Processor(ConsumerProducer):
def run():
Processor.launch(module, __doc__)
Processor.launch(default_ident, __doc__)

View file

@ -9,37 +9,24 @@ from falkordb import FalkorDB
from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error
from .... schema import Value, Triple
from .... schema import triples_request_queue
from .... schema import triples_response_queue
from .... base import ConsumerProducer
from .... base import TriplesQueryService
module = "triples-query"
default_input_queue = triples_request_queue
default_output_queue = triples_response_queue
default_subscriber = module
default_ident = "triples-query"
default_graph_url = 'falkor://falkordb:6379'
default_database = 'falkordb'
class Processor(ConsumerProducer):
class Processor(TriplesQueryService):
def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
graph_url = params.get("graph_host", default_graph_url)
graph_url = params.get("graph_url", default_graph_url)
database = params.get("database", default_database)
super(Processor, self).__init__(
**params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TriplesQueryRequest,
"output_schema": TriplesQueryResponse,
"graph_url": graph_url,
"database": database,
}
)
@ -54,50 +41,45 @@ class Processor(ConsumerProducer):
else:
return Value(value=ent, is_uri=False)
async def handle(self, msg):
async def query_triples(self, query):
try:
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling input {id}...", flush=True)
triples = []
if v.s is not None:
if v.p is not None:
if v.o is not None:
if query.s is not None:
if query.p is not None:
if query.o is not None:
# SPO
records = self.io.query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal {value: $value}) "
"RETURN $src as src",
"RETURN $src as src "
"LIMIT " + str(query.limit),
params={
"src": v.s.value,
"rel": v.p.value,
"value": v.o.value,
"src": query.s.value,
"rel": query.p.value,
"value": query.o.value,
},
).result_set
for rec in records:
triples.append((v.s.value, v.p.value, v.o.value))
triples.append((query.s.value, query.p.value, query.o.value))
records = self.io.query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node {uri: $uri}) "
"RETURN $src as src",
"RETURN $src as src "
"LIMIT " + str(query.limit),
params={
"src": v.s.value,
"rel": v.p.value,
"uri": v.o.value,
"src": query.s.value,
"rel": query.p.value,
"uri": query.o.value,
},
).result_set
for rec in records:
triples.append((v.s.value, v.p.value, v.o.value))
triples.append((query.s.value, query.p.value, query.o.value))
else:
@ -105,116 +87,124 @@ class Processor(ConsumerProducer):
records = self.io.query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal) "
"RETURN dest.value as dest",
"RETURN dest.value as dest "
"LIMIT " + str(query.limit),
params={
"src": v.s.value,
"rel": v.p.value,
"src": query.s.value,
"rel": query.p.value,
},
).result_set
for rec in records:
triples.append((v.s.value, v.p.value, rec[0]))
triples.append((query.s.value, query.p.value, rec[0]))
records = self.io.query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node) "
"RETURN dest.uri as dest",
"RETURN dest.uri as dest "
"LIMIT " + str(query.limit),
params={
"src": v.s.value,
"rel": v.p.value,
"src": query.s.value,
"rel": query.p.value,
},
).result_set
for rec in records:
triples.append((v.s.value, v.p.value, rec[0]))
triples.append((query.s.value, query.p.value, rec[0]))
else:
if v.o is not None:
if query.o is not None:
# SO
records = self.io.query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal {value: $value}) "
"RETURN rel.uri as rel",
"RETURN rel.uri as rel "
"LIMIT " + str(query.limit),
params={
"src": v.s.value,
"value": v.o.value,
"src": query.s.value,
"value": query.o.value,
},
).result_set
for rec in records:
triples.append((v.s.value, rec[0], v.o.value))
triples.append((query.s.value, rec[0], query.o.value))
records = self.io.query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node {uri: $uri}) "
"RETURN rel.uri as rel",
"RETURN rel.uri as rel "
"LIMIT " + str(query.limit),
params={
"src": v.s.value,
"uri": v.o.value,
"src": query.s.value,
"uri": query.o.value,
},
).result_set
for rec in records:
triples.append((v.s.value, rec[0], v.o.value))
triples.append((query.s.value, rec[0], query.o.value))
else:
# s
records = self.io.query(
"match (src:node {uri: $src})-[rel:rel]->(dest:literal) "
"return rel.uri as rel, dest.value as dest",
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal) "
"RETURN rel.uri as rel, dest.value as dest "
"LIMIT " + str(query.limit),
params={
"src": v.s.value,
"src": query.s.value,
},
).result_set
for rec in records:
triples.append((v.s.value, rec[0], rec[1]))
triples.append((query.s.value, rec[0], rec[1]))
records = self.io.query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node) "
"RETURN rel.uri as rel, dest.uri as dest",
"RETURN rel.uri as rel, dest.uri as dest "
"LIMIT " + str(query.limit),
params={
"src": v.s.value,
"src": query.s.value,
},
).result_set
for rec in records:
triples.append((v.s.value, rec[0], rec[1]))
triples.append((query.s.value, rec[0], rec[1]))
else:
if v.p is not None:
if query.p is not None:
if v.o is not None:
if query.o is not None:
# PO
records = self.io.query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal {value: $value}) "
"RETURN src.uri as src",
"RETURN src.uri as src "
"LIMIT " + str(query.limit),
params={
"uri": v.p.value,
"value": v.o.value,
"uri": query.p.value,
"value": query.o.value,
},
).result_set
for rec in records:
triples.append((rec[0], v.p.value, v.o.value))
triples.append((rec[0], query.p.value, query.o.value))
records = self.io.query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node {uri: $uri}) "
"RETURN src.uri as src",
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node {uri: $dest}) "
"RETURN src.uri as src "
"LIMIT " + str(query.limit),
params={
"uri": v.p.value,
"dest": v.o.value,
"uri": query.p.value,
"dest": query.o.value,
},
).result_set
for rec in records:
triples.append((rec[0], v.p.value, v.o.value))
triples.append((rec[0], query.p.value, query.o.value))
else:
@ -222,53 +212,57 @@ class Processor(ConsumerProducer):
records = self.io.query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal) "
"RETURN src.uri as src, dest.value as dest",
"RETURN src.uri as src, dest.value as dest "
"LIMIT " + str(query.limit),
params={
"uri": v.p.value,
"uri": query.p.value,
},
).result_set
for rec in records:
triples.append((rec[0], v.p.value, rec[1]))
triples.append((rec[0], query.p.value, rec[1]))
records = self.io.query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node) "
"RETURN src.uri as src, dest.uri as dest",
"RETURN src.uri as src, dest.uri as dest "
"LIMIT " + str(query.limit),
params={
"uri": v.p.value,
"uri": query.p.value,
},
).result_set
for rec in records:
triples.append((rec[0], v.p.value, rec[1]))
triples.append((rec[0], query.p.value, rec[1]))
else:
if v.o is not None:
if query.o is not None:
# O
records = self.io.query(
"MATCH (src:Node)-[rel:Rel]->(dest:Literal {value: $value}) "
"RETURN src.uri as src, rel.uri as rel",
"RETURN src.uri as src, rel.uri as rel "
"LIMIT " + str(query.limit),
params={
"value": v.o.value,
"value": query.o.value,
},
).result_set
for rec in records:
triples.append((rec[0], rec[1], v.o.value))
triples.append((rec[0], rec[1], query.o.value))
records = self.io.query(
"MATCH (src:Node)-[rel:Rel]->(dest:Node {uri: $uri}) "
"RETURN src.uri as src, rel.uri as rel",
"RETURN src.uri as src, rel.uri as rel "
"LIMIT " + str(query.limit),
params={
"uri": v.o.value,
"uri": query.o.value,
},
).result_set
for rec in records:
triples.append((rec[0], rec[1], v.o.value))
triples.append((rec[0], rec[1], query.o.value))
else:
@ -276,7 +270,8 @@ class Processor(ConsumerProducer):
records = self.io.query(
"MATCH (src:Node)-[rel:Rel]->(dest:Literal) "
"RETURN src.uri as src, rel.uri as rel, dest.value as dest",
"RETURN src.uri as src, rel.uri as rel, dest.value as dest "
"LIMIT " + str(query.limit),
).result_set
for rec in records:
@ -284,7 +279,8 @@ class Processor(ConsumerProducer):
records = self.io.query(
"MATCH (src:Node)-[rel:Rel]->(dest:Node) "
"RETURN src.uri as src, rel.uri as rel, dest.uri as dest",
"RETURN src.uri as src, rel.uri as rel, dest.uri as dest "
"LIMIT " + str(query.limit),
).result_set
for rec in records:
@ -296,40 +292,20 @@ class Processor(ConsumerProducer):
p=self.create_value(t[1]),
o=self.create_value(t[2])
)
for t in triples
for t in triples[:query.limit]
]
print("Send response...", flush=True)
r = TriplesQueryResponse(triples=triples, error=None)
await self.send(r, properties={"id": id})
print("Done.", flush=True)
return triples
except Exception as e:
print(f"Exception: {e}")
print("Send error response...", flush=True)
r = TriplesQueryResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
raise e
@staticmethod
def add_args(parser):
ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
)
TriplesQueryService.add_args(parser)
parser.add_argument(
'-g', '--graph-url',
@ -345,5 +321,5 @@ class Processor(ConsumerProducer):
def run():
Processor.launch(module, __doc__)
Processor.launch(default_ident, __doc__)

View file

@ -9,28 +9,19 @@ from neo4j import GraphDatabase
from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error
from .... schema import Value, Triple
from .... schema import triples_request_queue
from .... schema import triples_response_queue
from .... base import ConsumerProducer
from .... base import TriplesQueryService
module = "triples-query"
default_input_queue = triples_request_queue
default_output_queue = triples_response_queue
default_subscriber = module
default_ident = "triples-query"
default_graph_host = 'bolt://memgraph:7687'
default_username = 'memgraph'
default_password = 'password'
default_database = 'memgraph'
class Processor(ConsumerProducer):
class Processor(TriplesQueryService):
def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
graph_host = params.get("graph_host", default_graph_host)
username = params.get("username", default_username)
password = params.get("password", default_password)
@ -38,12 +29,9 @@ class Processor(ConsumerProducer):
super(Processor, self).__init__(
**params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TriplesQueryRequest,
"output_schema": TriplesQueryResponse,
"graph_host": graph_host,
"username": username,
"database": database,
}
)
@ -58,46 +46,39 @@ class Processor(ConsumerProducer):
else:
return Value(value=ent, is_uri=False)
async def handle(self, msg):
async def query_triples(self, query):
try:
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling input {id}...", flush=True)
triples = []
if v.s is not None:
if v.p is not None:
if v.o is not None:
if query.s is not None:
if query.p is not None:
if query.o is not None:
# SPO
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal {value: $value}) "
"RETURN $src as src "
"LIMIT " + str(v.limit),
src=v.s.value, rel=v.p.value, value=v.o.value,
"LIMIT " + str(query.limit),
src=query.s.value, rel=query.p.value, value=query.o.value,
database_=self.db,
)
for rec in records:
triples.append((v.s.value, v.p.value, v.o.value))
triples.append((query.s.value, query.p.value, query.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node {uri: $uri}) "
"RETURN $src as src "
"LIMIT " + str(v.limit),
src=v.s.value, rel=v.p.value, uri=v.o.value,
"LIMIT " + str(query.limit),
src=query.s.value, rel=query.p.value, uri=query.o.value,
database_=self.db,
)
for rec in records:
triples.append((v.s.value, v.p.value, v.o.value))
triples.append((query.s.value, query.p.value, query.o.value))
else:
@ -106,56 +87,56 @@ class Processor(ConsumerProducer):
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal) "
"RETURN dest.value as dest "
"LIMIT " + str(v.limit),
src=v.s.value, rel=v.p.value,
"LIMIT " + str(query.limit),
src=query.s.value, rel=query.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, v.p.value, data["dest"]))
triples.append((query.s.value, query.p.value, data["dest"]))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node) "
"RETURN dest.uri as dest "
"LIMIT " + str(v.limit),
src=v.s.value, rel=v.p.value,
"LIMIT " + str(query.limit),
src=query.s.value, rel=query.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, v.p.value, data["dest"]))
triples.append((query.s.value, query.p.value, data["dest"]))
else:
if v.o is not None:
if query.o is not None:
# SO
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal {value: $value}) "
"RETURN rel.uri as rel "
"LIMIT " + str(v.limit),
src=v.s.value, value=v.o.value,
"LIMIT " + str(query.limit),
src=query.s.value, value=query.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], v.o.value))
triples.append((query.s.value, data["rel"], query.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node {uri: $uri}) "
"RETURN rel.uri as rel "
"LIMIT " + str(v.limit),
src=v.s.value, uri=v.o.value,
"LIMIT " + str(query.limit),
src=query.s.value, uri=query.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], v.o.value))
triples.append((query.s.value, data["rel"], query.o.value))
else:
@ -164,59 +145,59 @@ class Processor(ConsumerProducer):
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal) "
"RETURN rel.uri as rel, dest.value as dest "
"LIMIT " + str(v.limit),
src=v.s.value,
"LIMIT " + str(query.limit),
src=query.s.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], data["dest"]))
triples.append((query.s.value, data["rel"], data["dest"]))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node) "
"RETURN rel.uri as rel, dest.uri as dest "
"LIMIT " + str(v.limit),
src=v.s.value,
"LIMIT " + str(query.limit),
src=query.s.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], data["dest"]))
triples.append((query.s.value, data["rel"], data["dest"]))
else:
if v.p is not None:
if query.p is not None:
if v.o is not None:
if query.o is not None:
# PO
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal {value: $value}) "
"RETURN src.uri as src "
"LIMIT " + str(v.limit),
uri=v.p.value, value=v.o.value,
"LIMIT " + str(query.limit),
uri=query.p.value, value=query.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, v.o.value))
triples.append((data["src"], query.p.value, query.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node {uri: $uri}) "
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node {uri: $dest}) "
"RETURN src.uri as src "
"LIMIT " + str(v.limit),
uri=v.p.value, dest=v.o.value,
"LIMIT " + str(query.limit),
uri=query.p.value, dest=query.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, v.o.value))
triples.append((data["src"], query.p.value, query.o.value))
else:
@ -225,56 +206,56 @@ class Processor(ConsumerProducer):
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal) "
"RETURN src.uri as src, dest.value as dest "
"LIMIT " + str(v.limit),
uri=v.p.value,
"LIMIT " + str(query.limit),
uri=query.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, data["dest"]))
triples.append((data["src"], query.p.value, data["dest"]))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node) "
"RETURN src.uri as src, dest.uri as dest "
"LIMIT " + str(v.limit),
uri=v.p.value,
"LIMIT " + str(query.limit),
uri=query.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, data["dest"]))
triples.append((data["src"], query.p.value, data["dest"]))
else:
if v.o is not None:
if query.o is not None:
# O
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel]->(dest:Literal {value: $value}) "
"RETURN src.uri as src, rel.uri as rel "
"LIMIT " + str(v.limit),
value=v.o.value,
"LIMIT " + str(query.limit),
value=query.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], data["rel"], v.o.value))
triples.append((data["src"], data["rel"], query.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel]->(dest:Node {uri: $uri}) "
"RETURN src.uri as src, rel.uri as rel "
"LIMIT " + str(v.limit),
uri=v.o.value,
"LIMIT " + str(query.limit),
uri=query.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], data["rel"], v.o.value))
triples.append((data["src"], data["rel"], query.o.value))
else:
@ -283,7 +264,7 @@ class Processor(ConsumerProducer):
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel]->(dest:Literal) "
"RETURN src.uri as src, rel.uri as rel, dest.value as dest "
"LIMIT " + str(v.limit),
"LIMIT " + str(query.limit),
database_=self.db,
)
@ -294,7 +275,7 @@ class Processor(ConsumerProducer):
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel]->(dest:Node) "
"RETURN src.uri as src, rel.uri as rel, dest.uri as dest "
"LIMIT " + str(v.limit),
"LIMIT " + str(query.limit),
database_=self.db,
)
@ -308,40 +289,22 @@ class Processor(ConsumerProducer):
p=self.create_value(t[1]),
o=self.create_value(t[2])
)
for t in triples[:v.limit]
for t in triples[:query.limit]
]
print("Send response...", flush=True)
r = TriplesQueryResponse(triples=triples, error=None)
await self.send(r, properties={"id": id})
print("Done.", flush=True)
return triples
except Exception as e:
print(f"Exception: {e}")
print("Send error response...", flush=True)
r = TriplesQueryResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
print(f"Exception: {e}")
raise e
@staticmethod
def add_args(parser):
ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
)
TriplesQueryService.add_args(parser)
parser.add_argument(
'-g', '--graph-host',
@ -369,5 +332,5 @@ class Processor(ConsumerProducer):
def run():
Processor.launch(module, __doc__)
Processor.launch(default_ident, __doc__)

View file

@ -9,28 +9,19 @@ from neo4j import GraphDatabase
from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error
from .... schema import Value, Triple
from .... schema import triples_request_queue
from .... schema import triples_response_queue
from .... base import ConsumerProducer
from .... base import TriplesQueryService
module = "triples-query"
default_input_queue = triples_request_queue
default_output_queue = triples_response_queue
default_subscriber = module
default_ident = "triples-query"
default_graph_host = 'bolt://neo4j:7687'
default_username = 'neo4j'
default_password = 'password'
default_database = 'neo4j'
class Processor(ConsumerProducer):
class Processor(TriplesQueryService):
def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
graph_host = params.get("graph_host", default_graph_host)
username = params.get("username", default_username)
password = params.get("password", default_password)
@ -38,12 +29,9 @@ class Processor(ConsumerProducer):
super(Processor, self).__init__(
**params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TriplesQueryRequest,
"output_schema": TriplesQueryResponse,
"graph_host": graph_host,
"username": username,
"database": database,
}
)
@ -58,44 +46,37 @@ class Processor(ConsumerProducer):
else:
return Value(value=ent, is_uri=False)
async def handle(self, msg):
async def query_triples(self, query):
try:
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling input {id}...", flush=True)
triples = []
if v.s is not None:
if v.p is not None:
if v.o is not None:
if query.s is not None:
if query.p is not None:
if query.o is not None:
# SPO
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal {value: $value}) "
"RETURN $src as src",
src=v.s.value, rel=v.p.value, value=v.o.value,
src=query.s.value, rel=query.p.value, value=query.o.value,
database_=self.db,
)
for rec in records:
triples.append((v.s.value, v.p.value, v.o.value))
triples.append((query.s.value, query.p.value, query.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node {uri: $uri}) "
"RETURN $src as src",
src=v.s.value, rel=v.p.value, uri=v.o.value,
src=query.s.value, rel=query.p.value, uri=query.o.value,
database_=self.db,
)
for rec in records:
triples.append((v.s.value, v.p.value, v.o.value))
triples.append((query.s.value, query.p.value, query.o.value))
else:
@ -104,52 +85,52 @@ class Processor(ConsumerProducer):
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal) "
"RETURN dest.value as dest",
src=v.s.value, rel=v.p.value,
src=query.s.value, rel=query.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, v.p.value, data["dest"]))
triples.append((query.s.value, query.p.value, data["dest"]))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node) "
"RETURN dest.uri as dest",
src=v.s.value, rel=v.p.value,
src=query.s.value, rel=query.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, v.p.value, data["dest"]))
triples.append((query.s.value, query.p.value, data["dest"]))
else:
if v.o is not None:
if query.o is not None:
# SO
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal {value: $value}) "
"RETURN rel.uri as rel",
src=v.s.value, value=v.o.value,
src=query.s.value, value=query.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], v.o.value))
triples.append((query.s.value, data["rel"], query.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node {uri: $uri}) "
"RETURN rel.uri as rel",
src=v.s.value, uri=v.o.value,
src=query.s.value, uri=query.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], v.o.value))
triples.append((query.s.value, data["rel"], query.o.value))
else:
@ -158,55 +139,55 @@ class Processor(ConsumerProducer):
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal) "
"RETURN rel.uri as rel, dest.value as dest",
src=v.s.value,
src=query.s.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], data["dest"]))
triples.append((query.s.value, data["rel"], data["dest"]))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node) "
"RETURN rel.uri as rel, dest.uri as dest",
src=v.s.value,
src=query.s.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], data["dest"]))
triples.append((query.s.value, data["rel"], data["dest"]))
else:
if v.p is not None:
if query.p is not None:
if v.o is not None:
if query.o is not None:
# PO
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal {value: $value}) "
"RETURN src.uri as src",
uri=v.p.value, value=v.o.value,
uri=query.p.value, value=query.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, v.o.value))
triples.append((data["src"], query.p.value, query.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node {uri: $uri}) "
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node {uri: $dest}) "
"RETURN src.uri as src",
uri=v.p.value, dest=v.o.value,
uri=query.p.value, dest=query.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, v.o.value))
triples.append((data["src"], query.p.value, query.o.value))
else:
@ -215,52 +196,52 @@ class Processor(ConsumerProducer):
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal) "
"RETURN src.uri as src, dest.value as dest",
uri=v.p.value,
uri=query.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, data["dest"]))
triples.append((data["src"], query.p.value, data["dest"]))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node) "
"RETURN src.uri as src, dest.uri as dest",
uri=v.p.value,
uri=query.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, data["dest"]))
triples.append((data["src"], query.p.value, data["dest"]))
else:
if v.o is not None:
if query.o is not None:
# O
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel]->(dest:Literal {value: $value}) "
"RETURN src.uri as src, rel.uri as rel",
value=v.o.value,
value=query.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], data["rel"], v.o.value))
triples.append((data["src"], data["rel"], query.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel]->(dest:Node {uri: $uri}) "
"RETURN src.uri as src, rel.uri as rel",
uri=v.o.value,
uri=query.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], data["rel"], v.o.value))
triples.append((data["src"], data["rel"], query.o.value))
else:
@ -295,37 +276,17 @@ class Processor(ConsumerProducer):
for t in triples
]
print("Send response...", flush=True)
r = TriplesQueryResponse(triples=triples, error=None)
await self.send(r, properties={"id": id})
print("Done.", flush=True)
return triples
except Exception as e:
print(f"Exception: {e}")
print("Send error response...", flush=True)
r = TriplesQueryResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
raise e
@staticmethod
def add_args(parser):
ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
)
TriplesQueryService.add_args(parser)
parser.add_argument(
'-g', '--graph-host',
@ -353,5 +314,5 @@ class Processor(ConsumerProducer):
def run():
Processor.launch(module, __doc__)
Processor.launch(default_ident, __doc__)