Add Neo4j support (#9)

- Add triples-write-neo4j and triples-query-neo4j to interact with neo4j
- Add docker-compose-openai-neo4j to demo Neo4j working
This commit is contained in:
cybermaggedon 2024-08-14 09:06:33 +01:00 committed by GitHub
parent 2f72fceaa2
commit d3e213f194
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
27 changed files with 1008 additions and 230 deletions

View file

@ -0,0 +1,3 @@
from . service import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . hf import run
if __name__ == '__main__':
run()

View file

@ -0,0 +1,328 @@
"""
Triples query service. Input is a (s, p, o) triple, some values may be
null. Output is a list of triples.
"""
from neo4j import GraphDatabase
from .... schema import TriplesQueryRequest, TriplesQueryResponse
from .... schema import Value, Triple
from .... schema import triples_request_queue
from .... schema import triples_response_queue
from .... base import ConsumerProducer
module = ".".join(__name__.split(".")[1:-1])
default_input_queue = triples_request_queue
default_output_queue = triples_response_queue
default_subscriber = module
default_graph_host = 'bolt://neo4j:7687'
default_username = 'neo4j'
default_password = 'password'
class Processor(ConsumerProducer):
def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
graph_host = params.get("graph_host", default_graph_host)
username = params.get("username", default_username)
password = params.get("passowrd", default_password)
super(Processor, self).__init__(
**params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TriplesQueryRequest,
"output_schema": TriplesQueryResponse,
"graph_host": graph_host,
}
)
self.db = "neo4j"
self.io = GraphDatabase.driver(graph_host, auth=(username, password))
def create_value(self, ent):
if ent.startswith("http://") or ent.startswith("https://"):
return Value(value=ent, is_uri=True)
else:
return Value(value=ent, is_uri=False)
def handle(self, msg):
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling input {id}...", flush=True)
triples = []
if v.s is not None:
if v.p is not None:
if v.o is not None:
# SPO
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal {value: $value}) "
"RETURN $src as src",
src=v.s.value, rel=v.p.value, value=v.o.value,
database_=self.db,
)
for rec in records:
triples.append((v.s.value, v.p.value, v.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node {uri: $uri}) "
"RETURN $src as src",
src=v.s.value, rel=v.p.value, uri=v.o.value,
database_=self.db,
)
for rec in records:
triples.append((v.s.value, v.p.value, v.o.value))
else:
# SP
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal) "
"RETURN dest.value as dest",
src=v.s.value, rel=v.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, v.p.value, data["dest"]))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node) "
"RETURN dest.uri as dest",
src=v.s.value, rel=v.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, v.p.value, data["dest"]))
else:
if v.o is not None:
# SO
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal {value: $value}) "
"RETURN rel.uri as rel",
src=v.s.value, value=v.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], v.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node {uri: $uri}) "
"RETURN rel.uri as rel",
src=v.s.value, uri=v.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], v.o.value))
else:
# S
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal) "
"RETURN rel.uri as rel, dest.value as dest",
src=v.s.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], data["dest"]))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node) "
"RETURN rel.uri as rel, dest.uri as dest",
src=v.s.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((v.s.value, data["rel"], data["dest"]))
else:
if v.p is not None:
if v.o is not None:
# PO
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal {value: $value}) "
"RETURN src.uri as src",
uri=v.p.value, value=v.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, v.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node {uri: $uri}) "
"RETURN src.uri as src",
uri=v.p.value, dest=v.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, v.o.value))
else:
# P
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal) "
"RETURN src.uri as src, dest.value as dest",
uri=v.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, data["dest"]))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node) "
"RETURN src.uri as src, dest.uri as dest",
uri=v.p.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], v.p.value, data["dest"]))
else:
if v.o is not None:
# O
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel]->(dest:Literal {value: $value}) "
"RETURN src.uri as src, rel.uri as rel",
value=v.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], data["rel"], v.o.value))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel]->(dest:Node {uri: $uri}) "
"RETURN src.uri as src, rel.uri as rel",
uri=v.o.value,
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], data["rel"], v.o.value))
else:
# *
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel]->(dest:Literal) "
"RETURN src.uri as src, rel.uri as rel, dest.value as dest",
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], data["rel"], data["dest"]))
records, summary, keys = self.io.execute_query(
"MATCH (src:Node)-[rel:Rel]->(dest:Node) "
"RETURN src.uri as src, rel.uri as rel, dest.uri as dest",
database_=self.db,
)
for rec in records:
data = rec.data()
triples.append((data["src"], data["rel"], data["dest"]))
triples = [
Triple(
s=self.create_value(t[0]),
p=self.create_value(t[1]),
o=self.create_value(t[2])
)
for t in triples
]
print("Send response...", flush=True)
r = TriplesQueryResponse(triples=triples)
self.producer.send(r, properties={"id": id})
print("Done.", flush=True)
@staticmethod
def add_args(parser):
ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
)
parser.add_argument(
'-g', '--graph-host',
default=default_graph_host,
help=f'Graph host (default: {default_graph_host})'
)
parser.add_argument(
'--username',
default=default_username,
help=f'Neo4j username (default: {default_username})'
)
parser.add_argument(
'--password',
default=default_password,
help=f'Neo4j password (default: {default_password})'
)
def run():
Processor.start(module, __doc__)

View file

@ -0,0 +1,3 @@
from . write import *

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python3
from . write import run
if __name__ == '__main__':
run()

View file

@ -0,0 +1,156 @@
"""
Graph writer. Input is graph edge. Writes edges to Cassandra graph.
"""
import pulsar
import base64
import os
import argparse
import time
from neo4j import GraphDatabase
from .... schema import Triple
from .... schema import triples_store_queue
from .... log_level import LogLevel
from .... base import Consumer
module = ".".join(__name__.split(".")[1:-1])
default_input_queue = triples_store_queue
default_subscriber = module
default_graph_host = 'bolt://neo4j:7687'
default_username = 'neo4j'
default_password = 'password'
class Processor(Consumer):
def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
subscriber = params.get("subscriber", default_subscriber)
graph_host = params.get("graph_host", default_graph_host)
username = params.get("username", default_username)
password = params.get("passowrd", default_password)
super(Processor, self).__init__(
**params | {
"input_queue": input_queue,
"subscriber": subscriber,
"input_schema": Triple,
"graph_host": graph_host,
}
)
self.db = "neo4j"
self.io = GraphDatabase.driver(graph_host, auth=(username, password))
def create_node(self, uri):
print("Create node", uri)
summary = self.io.execute_query(
"MERGE (n:Node {uri: $uri})",
uri=uri,
database_=self.db,
).summary
print("Created {nodes_created} nodes in {time} ms.".format(
nodes_created=summary.counters.nodes_created,
time=summary.result_available_after
))
def create_literal(self, value):
print("Create literal", value)
summary = self.io.execute_query(
"MERGE (n:Literal {value: $value})",
value=value,
database_=self.db,
).summary
print("Created {nodes_created} nodes in {time} ms.".format(
nodes_created=summary.counters.nodes_created,
time=summary.result_available_after
))
def relate_node(self, src, uri, dest):
print("Create node rel", src, uri, dest)
summary = self.io.execute_query(
"MATCH (src:Node {uri: $src}) "
"MATCH (dest:Node {uri: $dest}) "
"MERGE (src)-[:Rel {uri: $uri}]->(dest)",
src=src, dest=dest, uri=uri,
database_=self.db,
).summary
print("Created {nodes_created} nodes in {time} ms.".format(
nodes_created=summary.counters.nodes_created,
time=summary.result_available_after
))
def relate_literal(self, src, uri, dest):
print("Create literal rel", src, uri, dest)
summary = self.io.execute_query(
"MATCH (src:Node {uri: $src}) "
"MATCH (dest:Literal {value: $dest}) "
"MERGE (src)-[:Rel {uri: $uri}]->(dest)",
src=src, dest=dest, uri=uri,
database_=self.db,
).summary
print("Created {nodes_created} nodes in {time} ms.".format(
nodes_created=summary.counters.nodes_created,
time=summary.result_available_after
))
def handle(self, msg):
v = msg.value()
self.create_node(v.s.value)
if v.o.is_uri:
self.create_node(v.o.value)
self.relate_node(v.s.value, v.p.value, v.o.value)
else:
self.create_literal(v.o.value)
self.relate_literal(v.s.value, v.p.value, v.o.value)
@staticmethod
def add_args(parser):
Consumer.add_args(
parser, default_input_queue, default_subscriber,
)
parser.add_argument(
'-g', '--graph_host',
default=default_graph_host,
help=f'Graph host (default: {default_graph_host})'
)
parser.add_argument(
'--username',
default=default_username,
help=f'Neo4j username (default: {default_username})'
)
parser.add_argument(
'--password',
default=default_password,
help=f'Neo4j password (default: {default_password})'
)
def run():
Processor.start(module, __doc__)