Feature/memgraph optim (#193)

* Separate memgraph query/write modules to optimise for memgraph
* Used 1GB memory for Memgraph
* Deployed specialised memgraph query/write processors, created memgraph indexes
* One triple is loaded as a single transaction
* Fixed index creation
This commit is contained in:
cybermaggedon 2024-12-06 00:12:49 +00:00 committed by GitHub
parent e3d06ab80b
commit bffaf62490
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 651 additions and 4 deletions

View file

@ -0,0 +1,3 @@
from . service import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . hf import run
if __name__ == '__main__':
run()

View file

@ -0,0 +1,357 @@
"""
Triples query service for memgraph.
Input is a (s, p, o) triple, some values may be null. Output is a list of
triples.
"""
from neo4j import GraphDatabase
from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error
from .... schema import Value, Triple
from .... schema import triples_request_queue
from .... schema import triples_response_queue
from .... base import ConsumerProducer
module = ".".join(__name__.split(".")[1:-1])
default_input_queue = triples_request_queue
default_output_queue = triples_response_queue
default_subscriber = module
default_graph_host = 'bolt://memgraph:7687'
default_username = 'memgraph'
default_password = 'password'
default_database = 'memgraph'
class Processor(ConsumerProducer):
def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
graph_host = params.get("graph_host", default_graph_host)
username = params.get("username", default_username)
password = params.get("password", default_password)
database = params.get("database", default_database)
super(Processor, self).__init__(
**params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TriplesQueryRequest,
"output_schema": TriplesQueryResponse,
"graph_host": graph_host,
}
)
self.db = database
self.io = GraphDatabase.driver(graph_host, auth=(username, password))
def create_value(self, ent):
if ent.startswith("http://") or ent.startswith("https://"):
return Value(value=ent, is_uri=True)
else:
return Value(value=ent, is_uri=False)
def handle(self, msg):
try:
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling input {id}...", flush=True)
triples = []
if v.s is not None:
if v.p is not None:
if v.o is not None:
# SPO
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal {value: $value}) "
"RETURN $src as src",
src=v.s.value, rel=v.p.value, value=v.o.value,
database_=self.db,
)
for rec in records:
triples.append((v.s.value, v.p.value, v.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node {uri: $uri}) "
"RETURN $src as src",
src=v.s.value, rel=v.p.value, uri=v.o.value,
database_=self.db,
)
for rec in records:
triples.append((v.s.value, v.p.value, v.o.value))
else:
# SP
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal) "
"RETURN dest.value as dest",
src=v.s.value, rel=v.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, v.p.value, data["dest"]))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node) "
"RETURN dest.uri as dest",
src=v.s.value, rel=v.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, v.p.value, data["dest"]))
else:
if v.o is not None:
# SO
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal {value: $value}) "
"RETURN rel.uri as rel",
src=v.s.value, value=v.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], v.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node {uri: $uri}) "
"RETURN rel.uri as rel",
src=v.s.value, uri=v.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], v.o.value))
else:
# S
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal) "
"RETURN rel.uri as rel, dest.value as dest",
src=v.s.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], data["dest"]))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node) "
"RETURN rel.uri as rel, dest.uri as dest",
src=v.s.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], data["dest"]))
else:
if v.p is not None:
if v.o is not None:
# PO
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal {value: $value}) "
"RETURN src.uri as src",
uri=v.p.value, value=v.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, v.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node {uri: $uri}) "
"RETURN src.uri as src",
uri=v.p.value, dest=v.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, v.o.value))
else:
# P
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal) "
"RETURN src.uri as src, dest.value as dest",
uri=v.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, data["dest"]))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node) "
"RETURN src.uri as src, dest.uri as dest",
uri=v.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, data["dest"]))
else:
if v.o is not None:
# O
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel]->(dest:Literal {value: $value}) "
"RETURN src.uri as src, rel.uri as rel",
value=v.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], data["rel"], v.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel]->(dest:Node {uri: $uri}) "
"RETURN src.uri as src, rel.uri as rel",
uri=v.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], data["rel"], v.o.value))
else:
# *
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel]->(dest:Literal) "
"RETURN src.uri as src, rel.uri as rel, dest.value as dest",
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], data["rel"], data["dest"]))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel]->(dest:Node) "
"RETURN src.uri as src, rel.uri as rel, dest.uri as dest",
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], data["rel"], data["dest"]))
triples = [
Triple(
s=self.create_value(t[0]),
p=self.create_value(t[1]),
o=self.create_value(t[2])
)
for t in triples
]
print("Send response...", flush=True)
r = TriplesQueryResponse(triples=triples, error=None)
self.producer.send(r, properties={"id": id})
print("Done.", flush=True)
except Exception as e:
print(f"Exception: {e}")
print("Send error response...", flush=True)
r = TriplesQueryResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
)
self.producer.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
@staticmethod
def add_args(parser):
ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
)
parser.add_argument(
'-g', '--graph-host',
default=default_graph_host,
help=f'Graph host (default: {default_graph_host})'
)
parser.add_argument(
'--username',
default=default_username,
help=f'Memgraph username (default: {default_username})'
)
parser.add_argument(
'--password',
default=default_password,
help=f'Memgraph password (default: {default_password})'
)
parser.add_argument(
'--database',
default=default_database,
help=f'Memgraph database (default: {default_database})'
)
def run():
Processor.start(module, __doc__)

View file

@ -1,7 +1,8 @@
"""
Triples query service. Input is a (s, p, o) triple, some values may be
null. Output is a list of triples.
Triples query service for neo4j.
Input is a (s, p, o) triple, some values may be null. Output is a list of
triples.
"""
from neo4j import GraphDatabase

View file

@ -0,0 +1,3 @@
from . write import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . write import run
if __name__ == '__main__':
run()

View file

@ -0,0 +1,252 @@
"""
Graph writer. Input is graph edge. Writes edges to Cassandra graph.
"""
import pulsar
import base64
import os
import argparse
import time
from neo4j import GraphDatabase
from .... schema import Triples
from .... schema import triples_store_queue
from .... log_level import LogLevel
from .... base import Consumer
module = ".".join(__name__.split(".")[1:-1])
default_input_queue = triples_store_queue
default_subscriber = module
default_graph_host = 'bolt://memgraph:7687'
default_username = 'memgraph'
default_password = 'password'
default_database = 'memgraph'
class Processor(Consumer):
def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
subscriber = params.get("subscriber", default_subscriber)
graph_host = params.get("graph_host", default_graph_host)
username = params.get("username", default_username)
password = params.get("password", default_password)
database = params.get("database", default_database)
super(Processor, self).__init__(
**params | {
"input_queue": input_queue,
"subscriber": subscriber,
"input_schema": Triples,
"graph_host": graph_host,
}
)
self.db = database
self.io = GraphDatabase.driver(graph_host, auth=(username, password))
with self.io.session(database=self.db) as session:
self.create_indexes(session)
def create_indexes(self, session):
print("Create indexes...", flush=True)
try:
session.run(
"CREATE INDEX ON :Node",
)
except Exception as e:
print(e, flush=True)
# Maybe index already exists
print("Index create failure ignored", flush=True)
try:
session.run(
"CREATE INDEX ON :Node(uri)"
)
except Exception as e:
print(e, flush=True)
# Maybe index already exists
print("Index create failure ignored", flush=True)
try:
session.run(
"CREATE INDEX ON :Literal",
)
except Exception as e:
print(e, flush=True)
# Maybe index already exists
print("Index create failure ignored", flush=True)
try:
session.run(
"CREATE INDEX ON :Literal(value)"
)
except Exception as e:
print(e, flush=True)
# Maybe index already exists
print("Index create failure ignored", flush=True)
print("Index creation done", flush=True)
def create_node(self, uri):
print("Create node", uri)
summary = self.io.execute_query(
"MERGE (n:Node {uri: $uri})",
uri=uri,
database_=self.db,
).summary
print("Created {nodes_created} nodes in {time} ms.".format(
nodes_created=summary.counters.nodes_created,
time=summary.result_available_after
))
def create_literal(self, value):
print("Create literal", value)
summary = self.io.execute_query(
"MERGE (n:Literal {value: $value})",
value=value,
database_=self.db,
).summary
print("Created {nodes_created} nodes in {time} ms.".format(
nodes_created=summary.counters.nodes_created,
time=summary.result_available_after
))
def relate_node(self, src, uri, dest):
print("Create node rel", src, uri, dest)
summary = self.io.execute_query(
"MATCH (src:Node {uri: $src}) "
"MATCH (dest:Node {uri: $dest}) "
"MERGE (src)-[:Rel {uri: $uri}]->(dest)",
src=src, dest=dest, uri=uri,
database_=self.db,
).summary
print("Created {nodes_created} nodes in {time} ms.".format(
nodes_created=summary.counters.nodes_created,
time=summary.result_available_after
))
def relate_literal(self, src, uri, dest):
print("Create literal rel", src, uri, dest)
summary = self.io.execute_query(
"MATCH (src:Node {uri: $src}) "
"MATCH (dest:Literal {value: $dest}) "
"MERGE (src)-[:Rel {uri: $uri}]->(dest)",
src=src, dest=dest, uri=uri,
database_=self.db,
).summary
print("Created {nodes_created} nodes in {time} ms.".format(
nodes_created=summary.counters.nodes_created,
time=summary.result_available_after
))
def create_triple(self, tx, t):
# Create new s node with given uri, if not exists
result = tx.run(
"MERGE (n:Node {uri: $uri})",
uri=t.s.value
)
if t.o.is_uri:
# Create new o node with given uri, if not exists
result = tx.run(
"MERGE (n:Node {uri: $uri})",
uri=t.o.value
)
result = tx.run(
"MATCH (src:Node {uri: $src}) "
"MATCH (dest:Node {uri: $dest}) "
"MERGE (src)-[:Rel {uri: $uri}]->(dest)",
src=t.s.value, dest=t.o.value, uri=t.p.value,
)
else:
# Create new o literal with given uri, if not exists
result = tx.run(
"MERGE (n:Literal {value: $value})",
value=t.o.value
)
result = tx.run(
"MATCH (src:Node {uri: $src}) "
"MATCH (dest:Literal {value: $dest}) "
"MERGE (src)-[:Rel {uri: $uri}]->(dest)",
src=t.s.value, dest=t.o.value, uri=t.p.value,
)
def handle(self, msg):
v = msg.value()
for t in v.triples:
# self.create_node(t.s.value)
# if t.o.is_uri:
# self.create_node(t.o.value)
# self.relate_node(t.s.value, t.p.value, t.o.value)
# else:
# self.create_literal(t.o.value)
# self.relate_literal(t.s.value, t.p.value, t.o.value)
with self.io.session(database=self.db) as session:
session.execute_write(self.create_triple, t)
@staticmethod
def add_args(parser):
Consumer.add_args(
parser, default_input_queue, default_subscriber,
)
parser.add_argument(
'-g', '--graph_host',
default=default_graph_host,
help=f'Graph host (default: {default_graph_host})'
)
parser.add_argument(
'--username',
default=default_username,
help=f'Memgraph username (default: {default_username})'
)
parser.add_argument(
'--password',
default=default_password,
help=f'Memgraph password (default: {default_password})'
)
parser.add_argument(
'--database',
default=default_database,
help=f'Memgraph database (default: {default_database})'
)
def run():
Processor.start(module, __doc__)