diff --git a/templates/components/memgraph.jsonnet b/templates/components/memgraph.jsonnet index 5ec0a76e..609da3a2 100644 --- a/templates/components/memgraph.jsonnet +++ b/templates/components/memgraph.jsonnet @@ -16,7 +16,7 @@ memgraph + { engine.container("store-triples") .with_image(images.trustgraph) .with_command([ - "triples-write-neo4j", + "triples-write-memgraph", "-p", url.pulsar, "-g", @@ -50,7 +50,7 @@ memgraph + { engine.container("query-triples") .with_image(images.trustgraph) .with_command([ - "triples-query-neo4j", + "triples-query-memgraph", "-p", url.pulsar, "-g", diff --git a/templates/stores/memgraph.jsonnet b/templates/stores/memgraph.jsonnet index 8f8b6216..75faf5f0 100644 --- a/templates/stores/memgraph.jsonnet +++ b/templates/stores/memgraph.jsonnet @@ -10,6 +10,9 @@ local images = import "values/images.jsonnet"; local container = engine.container("memgraph") .with_image(images.memgraph_mage) + .with_environment({ + MEMGRAPH: "--storage-properties-on-edges=true --storage-enable-edges-metadata=true" + }) .with_limits("1.0", "1000M") .with_reservations("0.5", "1000M") .with_port(7474, 7474, "api") diff --git a/trustgraph-flow/scripts/triples-query-memgraph b/trustgraph-flow/scripts/triples-query-memgraph new file mode 100755 index 00000000..443929e4 --- /dev/null +++ b/trustgraph-flow/scripts/triples-query-memgraph @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.query.triples.memgraph import run + +run() + diff --git a/trustgraph-flow/scripts/triples-write-memgraph b/trustgraph-flow/scripts/triples-write-memgraph new file mode 100755 index 00000000..3d94a576 --- /dev/null +++ b/trustgraph-flow/scripts/triples-write-memgraph @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.storage.triples.memgraph import run + +run() + diff --git a/trustgraph-flow/setup.py b/trustgraph-flow/setup.py index e6c732a3..c53f96e7 100644 --- a/trustgraph-flow/setup.py +++ b/trustgraph-flow/setup.py @@ -103,8 +103,10 @@ setuptools.setup( 
"scripts/text-completion-openai", "scripts/triples-query-cassandra", "scripts/triples-query-neo4j", + "scripts/triples-query-memgraph", "scripts/triples-write-cassandra", "scripts/triples-write-neo4j", + "scripts/triples-write-memgraph", "scripts/wikipedia-lookup", ] ) diff --git a/trustgraph-flow/trustgraph/query/triples/memgraph/__init__.py b/trustgraph-flow/trustgraph/query/triples/memgraph/__init__.py new file mode 100644 index 00000000..ba844705 --- /dev/null +++ b/trustgraph-flow/trustgraph/query/triples/memgraph/__init__.py @@ -0,0 +1,3 @@ + +from . service import * + diff --git a/trustgraph-flow/trustgraph/query/triples/memgraph/__main__.py b/trustgraph-flow/trustgraph/query/triples/memgraph/__main__.py new file mode 100755 index 00000000..89684e3e --- /dev/null +++ b/trustgraph-flow/trustgraph/query/triples/memgraph/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . service import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph-flow/trustgraph/query/triples/memgraph/service.py b/trustgraph-flow/trustgraph/query/triples/memgraph/service.py new file mode 100755 index 00000000..5144f781 --- /dev/null +++ b/trustgraph-flow/trustgraph/query/triples/memgraph/service.py @@ -0,0 +1,357 @@ + +""" +Triples query service for memgraph. +Input is a (s, p, o) triple, some values may be null. Output is a list of +triples. +""" + +from neo4j import GraphDatabase + +from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error +from .... schema import Value, Triple +from .... schema import triples_request_queue +from .... schema import triples_response_queue +from .... 
module = ".".join(__name__.split(".")[1:-1])

default_input_queue = triples_request_queue
default_output_queue = triples_response_queue
default_subscriber = module

default_graph_host = 'bolt://memgraph:7687'
default_username = 'memgraph'
default_password = 'password'
default_database = 'memgraph'


class Processor(ConsumerProducer):
    """
    Triples query processor backed by Memgraph.

    Consumes TriplesQueryRequest messages, matches the requested (s, p, o)
    pattern against the graph through the neo4j Bolt driver, and publishes
    a TriplesQueryResponse carrying the matching triples.  Any of s, p, o
    may be None, in which case that position is left unconstrained.
    """

    # The queries address object nodes under two labels: `Literal` (keyed
    # by `value`) and `Node` (keyed by `uri`).  Every pattern is run
    # against both, in this order, so literal matches are reported first.
    _DEST_KINDS = (("Literal", "value"), ("Node", "uri"))

    def __init__(self, **params):
        """
        Set up queues/schemas on the base class and open the graph driver.

        Recognised params (all optional): input_queue, output_queue,
        subscriber, graph_host, username, password, database.  Missing
        values fall back to the module-level defaults.
        """

        input_queue = params.get("input_queue", default_input_queue)
        output_queue = params.get("output_queue", default_output_queue)
        subscriber = params.get("subscriber", default_subscriber)
        graph_host = params.get("graph_host", default_graph_host)
        username = params.get("username", default_username)
        password = params.get("password", default_password)
        database = params.get("database", default_database)

        super(Processor, self).__init__(
            **params | {
                "input_queue": input_queue,
                "output_queue": output_queue,
                "subscriber": subscriber,
                "input_schema": TriplesQueryRequest,
                "output_schema": TriplesQueryResponse,
                "graph_host": graph_host,
            }
        )

        self.db = database

        self.io = GraphDatabase.driver(graph_host, auth=(username, password))

    def create_value(self, ent):
        """Wrap a string in a Value, flagging http(s):// strings as URIs."""

        return Value(
            value=ent,
            is_uri=ent.startswith(("http://", "https://")),
        )

    def _match(self, s, p, o, dest_label, dest_prop):
        """
        Run one MATCH for the pattern against a single kind of object node.

        s, p, o are plain strings or None (None = unconstrained).
        dest_label / dest_prop select the object node label and the
        property holding its identity; they come from _DEST_KINDS, never
        from the request, so interpolating them into the query text is
        safe.  All request-supplied values travel as query parameters.

        Returns a list of (s, p, o) string tuples.
        """

        params = {}
        src_filter = rel_filter = dest_filter = ""

        if s is not None:
            src_filter = " {uri: $src}"
            params["src"] = s

        if p is not None:
            rel_filter = " {uri: $rel}"
            params["rel"] = p

        if o is not None:
            dest_filter = " {" + dest_prop + ": $dest}"
            params["dest"] = o

        query = (
            f"MATCH (src:Node{src_filter})"
            f"-[rel:Rel{rel_filter}]->"
            f"(dest:{dest_label}{dest_filter}) "
            f"RETURN src.uri AS src, rel.uri AS rel, "
            f"dest.{dest_prop} AS dest"
        )

        records, _summary, _keys = self.io.execute_query(
            query, database_=self.db, **params,
        )

        rows = (rec.data() for rec in records)

        return [(row["src"], row["rel"], row["dest"]) for row in rows]

    def _query_triples(self, s, p, o):
        """Return all (s, p, o) tuples matching the pattern, literals first."""

        found = []

        for dest_label, dest_prop in self._DEST_KINDS:
            found.extend(self._match(s, p, o, dest_label, dest_prop))

        return found

    def handle(self, msg):
        """
        Answer one query message.

        Sends either the matching triples or an Error on the output queue,
        tagged with the sender-produced "id" property, then acknowledges
        the input message.
        """

        # Sender-produced ID.  Read before the try block so the error path
        # below can always address its reply.
        request_id = msg.properties()["id"]

        try:

            v = msg.value()

            print(f"Handling input {request_id}...", flush=True)

            s = None if v.s is None else v.s.value
            p = None if v.p is None else v.p.value
            o = None if v.o is None else v.o.value

            triples = [
                Triple(
                    s=self.create_value(t[0]),
                    p=self.create_value(t[1]),
                    o=self.create_value(t[2])
                )
                for t in self._query_triples(s, p, o)
            ]

            print("Send response...", flush=True)
            r = TriplesQueryResponse(triples=triples, error=None)
            self.producer.send(r, properties={"id": request_id})

            print("Done.", flush=True)

        except Exception as e:

            print(f"Exception: {e}")

            print("Send error response...", flush=True)

            # NOTE(review): "llm-error" looks inherited from another
            # service; kept as-is because consumers may match on it.
            r = TriplesQueryResponse(
                error=Error(
                    type = "llm-error",
                    message = str(e),
                ),
                triples=None,
            )

            self.producer.send(r, properties={"id": request_id})

        self.consumer.acknowledge(msg)

    @staticmethod
    def add_args(parser):
        """Register base queue options plus the graph connection options."""

        ConsumerProducer.add_args(
            parser, default_input_queue, default_subscriber,
            default_output_queue,
        )

        parser.add_argument(
            '-g', '--graph-host',
            default=default_graph_host,
            help=f'Graph host (default: {default_graph_host})'
        )

        parser.add_argument(
            '--username',
            default=default_username,
            help=f'Memgraph username (default: {default_username})'
        )

        parser.add_argument(
            '--password',
            default=default_password,
            help=f'Memgraph password (default: {default_password})'
        )

        parser.add_argument(
            '--database',
            default=default_database,
            help=f'Memgraph database (default: {default_database})'
        )

+def run(): + + Processor.start(module, __doc__) + diff --git a/trustgraph-flow/trustgraph/query/triples/neo4j/service.py b/trustgraph-flow/trustgraph/query/triples/neo4j/service.py index 2caa0193..d60bc4f4 100755 --- a/trustgraph-flow/trustgraph/query/triples/neo4j/service.py +++ b/trustgraph-flow/trustgraph/query/triples/neo4j/service.py @@ -1,7 +1,8 @@ """ -Triples query service. Input is a (s, p, o) triple, some values may be -null. Output is a list of triples. +Triples query service for neo4j. +Input is a (s, p, o) triple, some values may be null. Output is a list of +triples. """ from neo4j import GraphDatabase diff --git a/trustgraph-flow/trustgraph/storage/triples/memgraph/__init__.py b/trustgraph-flow/trustgraph/storage/triples/memgraph/__init__.py new file mode 100644 index 00000000..d891d55f --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/triples/memgraph/__init__.py @@ -0,0 +1,3 @@ + +from . write import * + diff --git a/trustgraph-flow/trustgraph/storage/triples/memgraph/__main__.py b/trustgraph-flow/trustgraph/storage/triples/memgraph/__main__.py new file mode 100755 index 00000000..c05d8c6d --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/triples/memgraph/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . write import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py b/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py new file mode 100755 index 00000000..17e8c67e --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py @@ -0,0 +1,252 @@ + +""" +Graph writer. Input is graph edge. Writes edges to a Memgraph graph. +""" + +import pulsar +import base64 +import os +import argparse +import time + +from neo4j import GraphDatabase + +from .... schema import Triples +from .... schema import triples_store_queue +from .... log_level import LogLevel +from .... 
module = ".".join(__name__.split(".")[1:-1])

default_input_queue = triples_store_queue
default_subscriber = module

default_graph_host = 'bolt://memgraph:7687'
default_username = 'memgraph'
default_password = 'password'
default_database = 'memgraph'


class Processor(Consumer):
    """
    Triples writer backed by Memgraph.

    Consumes Triples messages and merges each triple into the graph
    through the neo4j Bolt driver: subjects and URI objects become `Node`
    nodes keyed by `uri`, non-URI objects become `Literal` nodes keyed by
    `value`, and predicates become `Rel` relationships keyed by `uri`.
    """

    # Label and label+property indexes used by the MERGE/MATCH lookups.
    _INDEX_STATEMENTS = (
        "CREATE INDEX ON :Node",
        "CREATE INDEX ON :Node(uri)",
        "CREATE INDEX ON :Literal",
        "CREATE INDEX ON :Literal(value)",
    )

    def __init__(self, **params):
        """
        Set up the queue/schema on the base class, open the graph driver
        and create the indexes.

        Recognised params (all optional): input_queue, subscriber,
        graph_host, username, password, database.  Missing values fall
        back to the module-level defaults.
        """

        input_queue = params.get("input_queue", default_input_queue)
        subscriber = params.get("subscriber", default_subscriber)
        graph_host = params.get("graph_host", default_graph_host)
        username = params.get("username", default_username)
        password = params.get("password", default_password)
        database = params.get("database", default_database)

        super(Processor, self).__init__(
            **params | {
                "input_queue": input_queue,
                "subscriber": subscriber,
                "input_schema": Triples,
                "graph_host": graph_host,
            }
        )

        self.db = database

        self.io = GraphDatabase.driver(graph_host, auth=(username, password))

        with self.io.session(database=self.db) as session:
            self.create_indexes(session)

    def create_indexes(self, session):
        """
        Create the node indexes, best-effort.

        Each statement is attempted independently; a failure is logged and
        ignored so that one failing statement does not block the others
        or prevent startup.
        """

        print("Create indexes...", flush=True)

        for statement in self._INDEX_STATEMENTS:
            try:
                session.run(statement)
            except Exception as e:
                print(e, flush=True)
                # Maybe index already exists
                print("Index create failure ignored", flush=True)

        print("Index creation done", flush=True)

    def create_node(self, uri):
        """Merge a `Node` with the given uri and log the outcome."""

        print("Create node", uri)

        summary = self.io.execute_query(
            "MERGE (n:Node {uri: $uri})",
            uri=uri,
            database_=self.db,
        ).summary

        print("Created {nodes_created} nodes in {time} ms.".format(
            nodes_created=summary.counters.nodes_created,
            time=summary.result_available_after
        ))

    def create_literal(self, value):
        """Merge a `Literal` with the given value and log the outcome."""

        print("Create literal", value)

        summary = self.io.execute_query(
            "MERGE (n:Literal {value: $value})",
            value=value,
            database_=self.db,
        ).summary

        print("Created {nodes_created} nodes in {time} ms.".format(
            nodes_created=summary.counters.nodes_created,
            time=summary.result_available_after
        ))

    def relate_node(self, src, uri, dest):
        """Merge a `Rel` (keyed by uri) between two existing `Node`s."""

        print("Create node rel", src, uri, dest)

        summary = self.io.execute_query(
            "MATCH (src:Node {uri: $src}) "
            "MATCH (dest:Node {uri: $dest}) "
            "MERGE (src)-[:Rel {uri: $uri}]->(dest)",
            src=src, dest=dest, uri=uri,
            database_=self.db,
        ).summary

        # This statement only ever creates relationships, so report that
        # counter rather than nodes_created.
        print("Created {rels_created} relationships in {time} ms.".format(
            rels_created=summary.counters.relationships_created,
            time=summary.result_available_after
        ))

    def relate_literal(self, src, uri, dest):
        """Merge a `Rel` (keyed by uri) from a `Node` to a `Literal`."""

        print("Create literal rel", src, uri, dest)

        summary = self.io.execute_query(
            "MATCH (src:Node {uri: $src}) "
            "MATCH (dest:Literal {value: $dest}) "
            "MERGE (src)-[:Rel {uri: $uri}]->(dest)",
            src=src, dest=dest, uri=uri,
            database_=self.db,
        ).summary

        # This statement only ever creates relationships, so report that
        # counter rather than nodes_created.
        print("Created {rels_created} relationships in {time} ms.".format(
            rels_created=summary.counters.relationships_created,
            time=summary.result_available_after
        ))

    def create_triple(self, tx, t):
        """
        Transaction function: merge one triple into the graph.

        tx is the transaction handed in by session.execute_write; t is a
        triple whose s/p/o carry `.value` and whose o carries `.is_uri`.
        """

        # Create new s node with given uri, if not exists
        tx.run(
            "MERGE (n:Node {uri: $uri})",
            uri=t.s.value
        )

        if t.o.is_uri:

            # Create new o node with given uri, if not exists
            tx.run(
                "MERGE (n:Node {uri: $uri})",
                uri=t.o.value
            )

            tx.run(
                "MATCH (src:Node {uri: $src}) "
                "MATCH (dest:Node {uri: $dest}) "
                "MERGE (src)-[:Rel {uri: $uri}]->(dest)",
                src=t.s.value, dest=t.o.value, uri=t.p.value,
            )

        else:

            # Create new o literal with given value, if not exists
            tx.run(
                "MERGE (n:Literal {value: $value})",
                value=t.o.value
            )

            tx.run(
                "MATCH (src:Node {uri: $src}) "
                "MATCH (dest:Literal {value: $dest}) "
                "MERGE (src)-[:Rel {uri: $uri}]->(dest)",
                src=t.s.value, dest=t.o.value, uri=t.p.value,
            )

    def handle(self, msg):
        """Write every triple carried by the message to the graph."""

        v = msg.value()

        # Nothing to write: avoid opening a session at all.
        if not v.triples:
            return

        # One session per message; each triple still gets its own managed
        # write transaction, as before.
        with self.io.session(database=self.db) as session:
            for t in v.triples:
                session.execute_write(self.create_triple, t)

    @staticmethod
    def add_args(parser):
        """Register base queue options plus the graph connection options."""

        Consumer.add_args(
            parser, default_input_queue, default_subscriber,
        )

        # '--graph-host' matches the query service's spelling;
        # '--graph_host' is kept so existing invocations keep working.
        # argparse derives dest 'graph_host' from the first long option.
        parser.add_argument(
            '-g', '--graph-host', '--graph_host',
            default=default_graph_host,
            help=f'Graph host (default: {default_graph_host})'
        )

        parser.add_argument(
            '--username',
            default=default_username,
            help=f'Memgraph username (default: {default_username})'
        )

        parser.add_argument(
            '--password',
            default=default_password,
            help=f'Memgraph password (default: {default_password})'
        )

        parser.add_argument(
            '--database',
            default=default_database,
            help=f'Memgraph database (default: {default_database})'
        )


def run():
    """Entry point: start the processor with this module's docstring."""

    Processor.start(module, __doc__)
