knowledge service (#367)

* Write knowledge core elements to Cassandra

* Store service works, building management service

* kg-manager
This commit is contained in:
cybermaggedon 2025-05-06 23:44:10 +01:00 committed by GitHub
parent d0da122bed
commit 807c19fd22
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 1196 additions and 243 deletions

39
test-api/test-knowledge-fetch Executable file
View file

@ -0,0 +1,39 @@
#!/usr/bin/env python3
import requests
import json
import sys
import base64
import time
url = "http://localhost:8088/api/v1/"

############################################################################

# Manual smoke test: ask the knowledge service (through the HTTP API) to
# fetch one knowledge core and print what comes back.
#
# The request body is called `request_body` rather than `input` so that the
# builtin of that name is not shadowed.
request_body = {
    "operation": "fetch-kg-core",
    "id": "https://trustgraph.ai/doc/intelligence-and-state",
    "user": "trustgraph",
}

http_resp = requests.post(
    f"{url}knowledge",
    json=request_body,
)

# Print the raw body first so that a non-JSON reply is still visible if the
# decode below raises.
print(http_resp.text)

result = http_resp.json()

# Exit status 1 when the service reports an error, 0 otherwise.
if "error" in result:
    print(f"Error: {result['error']}")
    sys.exit(1)

print(result)

sys.exit(0)
############################################################################

50
test-api/test-knowledge-fetch2 Executable file
View file

@ -0,0 +1,50 @@
#!/usr/bin/env python3
import requests
import asyncio
import json
import sys
import base64
import time
from websockets.asyncio.client import connect
url = "ws://localhost:8088/api/v1/socket"

############################################################################


async def run():
    """
    Manual smoke test: send one fetch-kg-core request over the websocket
    API and print every message received until the stream ends.

    The loop stops on the first message carrying an "error" key, or on the
    first message whose "response" has a truthy "eos" value.  Messages
    without a "response" key are printed and otherwise ignored.
    """

    request = {
        "id": "aa11",
        "service": "knowledge",
        "request": {
            "operation": "fetch-kg-core",
            "user": "trustgraph",
            "id": "https://trustgraph.ai/doc/intelligence-and-state"
        }
    }

    async with connect(url) as ws:

        await ws.send(json.dumps(request))

        while True:

            message = json.loads(await ws.recv())
            print(message)

            if "error" in message:
                print(f"Error: {message['error']}")
                break

            if "response" not in message:
                continue

            response = message["response"]

            # End-of-stream marker from the service
            if "eos" in response and response["eos"]:
                break

############################################################################

asyncio.run(run())

38
test-api/test-knowledge-list Executable file
View file

@ -0,0 +1,38 @@
#!/usr/bin/env python3
import requests
import json
import sys
import base64
import time
url = "http://localhost:8088/api/v1/"

############################################################################

# Manual smoke test: ask the knowledge service (through the HTTP API) for
# the knowledge cores belonging to one user and print what comes back.
#
# The request body is called `request_body` rather than `input` so that the
# builtin of that name is not shadowed.
request_body = {
    "operation": "list-kg-cores",
    "user": "trustgraph",
}

http_resp = requests.post(
    f"{url}knowledge",
    json=request_body,
)

# Print the raw body first so that a non-JSON reply is still visible if the
# decode below raises.
print(http_resp.text)

result = http_resp.json()

# Exit status 1 when the service reports an error, 0 otherwise.
if "error" in result:
    print(f"Error: {result['error']}")
    sys.exit(1)

print(result)

sys.exit(0)
############################################################################

View file

@ -13,4 +13,5 @@ from . lookup import *
from . library import *
from . config import *
from . flows import *
from . knowledge import *

View file

@ -0,0 +1,49 @@
from pulsar.schema import Record, Bytes, String, Array, Long, Boolean
from . types import Triple
from . topic import topic
from . types import Error
from . metadata import Metadata
from . documents import Document, TextDocument
from . graph import Triples, GraphEmbeddings
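# fetch-kg-core
#   -> (user, id)
#   <- (triples)*            zero or more messages, eos false
#   <- (graph_embeddings)*   zero or more messages, eos false
#   <- (eos)                 final message, eos true
#   <- (error)
# delete-kg-core
#   -> (user, id)
#   <- ()
#   <- (error)
# list-kg-cores
#   -> (user)
#   <- (ids[])
#   <- (error)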
# Request message for the knowledge service.  One schema is shared by all
# operations; which fields are read depends on `operation`.
class KnowledgeRequest(Record):
# Operation name: fetch-kg-core, delete-kg-core, list-kg-cores
operation = String()
# User owning the core(s): read by fetch-kg-core, delete-kg-core and
# list-kg-cores
user = String()
# Knowledge core ID: read by fetch-kg-core and delete-kg-core
id = String()
# Response message from the knowledge service.  One schema is shared by all
# operations; a given message populates only the fields relevant to it.
class KnowledgeResponse(Record):
# Populated when the request failed
error = Error()
# Knowledge core IDs, populated in the reply to list-kg-cores
ids = Array(String())
eos = Boolean() # Indicates end of knowledge core stream
# A batch of triples, sent while streaming a fetch-kg-core reply
triples = Triples()
# A batch of graph embeddings, sent while streaming a fetch-kg-core reply
graph_embeddings = GraphEmbeddings()
# Default queue on which knowledge requests are sent
knowledge_request_queue = topic(
'knowledge', kind='non-persistent', namespace='request'
)
# Default queue on which knowledge responses are returned
knowledge_response_queue = topic(
'knowledge', kind='non-persistent', namespace='response',
)

View file

@ -51,17 +51,6 @@ from . documents import Document, TextDocument
# <- (processing_metadata[])
# <- (error)
# OLD:
# add(Metadata, Bytes) : error?
# copy(id, user, collection)
# move(id, user, collection)
# delete(id)
# get(id) : Bytes
# reindex(id)
# list(user, collection) : id[]
# info(id[]) : DocumentInfo[]
# search(<key,op,value>[]) : id[]
class DocumentMetadata(Record):
id = String()
time = Long()

View file

@ -0,0 +1,6 @@
#!/usr/bin/env python3
# Command-line launcher: runs the service exposed as trustgraph.cores.run.
# NOTE(review): presumably installed as scripts/kg-manager - confirm.
from trustgraph.cores import run
run()

View file

@ -0,0 +1,6 @@
#!/usr/bin/env python3
# Command-line launcher: runs the service exposed as
# trustgraph.storage.knowledge.run.
# NOTE(review): presumably installed as scripts/kg-store - confirm.
from trustgraph.storage.knowledge import run
run()

View file

@ -95,6 +95,8 @@ setuptools.setup(
"scripts/kg-extract-definitions",
"scripts/kg-extract-relationships",
"scripts/kg-extract-topics",
"scripts/kg-store",
"scripts/kg-manager",
"scripts/librarian",
"scripts/metering",
"scripts/object-extract-row",

View file

@ -0,0 +1,3 @@
from . service import run

View file

@ -0,0 +1,5 @@
# Allows the package to be started with `python -m <package>`; delegates to
# the service module's run().
from . service import run
if __name__ == '__main__':
run()

View file

@ -0,0 +1,104 @@
from .. schema import KnowledgeResponse, Error
from .. knowledge import hash
from .. exceptions import RequestError
from .. tables.knowledge import KnowledgeTableStore
import base64
import uuid
class KnowledgeManager:
    """
    Implements the knowledge-core operations (list, fetch, delete) on top of
    a KnowledgeTableStore.

    Every operation receives the decoded request and an async `respond`
    callable; results are delivered by awaiting `respond` with one
    KnowledgeResponse per message rather than by return value.
    """

    def __init__(
            self, cassandra_host, cassandra_user, cassandra_password,
            keyspace,
    ):
        """Create the table store used by all operations."""

        store = KnowledgeTableStore(
            cassandra_host, cassandra_user, cassandra_password, keyspace
        )

        self.table_store = store

    async def delete_kg_core(self, request, respond):
        """
        Delete the core identified by (request.user, request.id), then send
        a single response with no payload.
        """

        print("Deleting core...", flush=True)

        await self.table_store.delete_kg_core(request.user, request.id)

        ack = KnowledgeResponse(
            error = None,
            ids = None,
            eos = False,
            triples = None,
            graph_embeddings = None,
        )

        await respond(ack)

    async def fetch_kg_core(self, request, respond):
        """
        Stream the core identified by (request.user, request.id).

        Sends one response per stored triples record, then one per stored
        graph-embeddings record, and finally a response with eos set to
        True to mark the end of the stream.
        """

        print("Fetch core...", flush=True)

        # Callbacks handed to the table store: each wraps one stored record
        # in a response and forwards it to the requester.
        async def forward_triples(batch):
            msg = KnowledgeResponse(
                error = None,
                ids = None,
                eos = False,
                triples = batch,
                graph_embeddings = None,
            )
            await respond(msg)

        async def forward_graph_embeddings(batch):
            msg = KnowledgeResponse(
                error = None,
                ids = None,
                eos = False,
                triples = None,
                graph_embeddings = batch,
            )
            await respond(msg)

        # Triples first...
        await self.table_store.get_triples(
            request.user, request.id, forward_triples,
        )

        # ...then graph embeddings
        await self.table_store.get_graph_embeddings(
            request.user, request.id, forward_graph_embeddings,
        )

        print("Fetch complete", flush=True)

        # End-of-stream marker
        final = KnowledgeResponse(
            error = None,
            ids = None,
            eos = True,
            triples = None,
            graph_embeddings = None,
        )

        await respond(final)

    async def list_kg_cores(self, request, respond):
        """
        Send a single response carrying the core IDs the table store
        reports for request.user.
        """

        core_ids = await self.table_store.list_kg_cores(request.user)

        listing = KnowledgeResponse(
            error = None,
            ids = core_ids,
            eos = False,
            triples = None,
            graph_embeddings = None
        )

        await respond(listing)

View file

@ -0,0 +1,228 @@
"""
Knowledge core service, manages cores and exports them
"""
from functools import partial
import asyncio
import base64
import json
from .. base import AsyncProcessor, Consumer, Producer, Publisher, Subscriber
from .. base import ConsumerMetrics, ProducerMetrics
from .. schema import KnowledgeRequest, KnowledgeResponse, Error
from .. schema import knowledge_request_queue, knowledge_response_queue
from .. schema import Document, Metadata
from .. schema import TextDocument, Metadata
from .. exceptions import RequestError
from . knowledge import KnowledgeManager
# Identity passed to Processor.launch() by run()
default_ident = "knowledge"
# Queues used unless overridden through parameters
default_knowledge_request_queue = knowledge_request_queue
default_knowledge_response_queue = knowledge_response_queue
# Cassandra host used unless overridden through parameters
default_cassandra_host = "cassandra"
# FIXME: How to ensure this doesn't conflict with other usage?
# Cassandra keyspace handed to KnowledgeManager
keyspace = "knowledge"
class Processor(AsyncProcessor):
    """
    Knowledge core service.

    Consumes KnowledgeRequest messages, dispatches each to the matching
    KnowledgeManager operation, and publishes KnowledgeResponse messages
    tagged with the request's "id" property.
    """

    def __init__(self, **params):
        """
        Recognised parameters (all optional): id, knowledge_request_queue,
        knowledge_response_queue, cassandra_host (comma-separated list),
        cassandra_user, cassandra_password.
        """

        id = params.get("id")

        request_queue = params.get(
            "knowledge_request_queue", default_knowledge_request_queue
        )

        response_queue = params.get(
            "knowledge_response_queue", default_knowledge_response_queue
        )

        cassandra_host = params.get("cassandra_host", default_cassandra_host)
        cassandra_user = params.get("cassandra_user")
        cassandra_password = params.get("cassandra_password")

        # The resolved settings are passed up to the base class; the
        # password is not among them.
        super(Processor, self).__init__(
            **params | {
                "knowledge_request_queue": request_queue,
                "knowledge_response_queue": response_queue,
                "cassandra_host": cassandra_host,
                "cassandra_user": cassandra_user,
            }
        )

        knowledge_request_metrics = ConsumerMetrics(
            processor = self.id, flow = None, name = "knowledge-request"
        )

        knowledge_response_metrics = ProducerMetrics(
            processor = self.id, flow = None, name = "knowledge-response"
        )

        self.knowledge_request_consumer = Consumer(
            taskgroup = self.taskgroup,
            client = self.pulsar_client,
            flow = None,
            topic = request_queue,
            subscriber = id,
            schema = KnowledgeRequest,
            handler = self.on_knowledge_request,
            metrics = knowledge_request_metrics,
        )

        self.knowledge_response_producer = Producer(
            client = self.pulsar_client,
            topic = response_queue,
            schema = KnowledgeResponse,
            metrics = knowledge_response_metrics,
        )

        self.knowledge = KnowledgeManager(
            cassandra_host = cassandra_host.split(","),
            cassandra_user = cassandra_user,
            cassandra_password = cassandra_password,
            keyspace = keyspace,
        )

        self.register_config_handler(self.on_knowledge_config)

        # Flow definitions from the most recent configuration push
        self.flows = {}

        print("Initialised.", flush=True)

    async def start(self):
        """Start the base processor, then the request consumer and the
        response producer."""

        await super(Processor, self).start()
        await self.knowledge_request_consumer.start()
        await self.knowledge_response_producer.start()

    async def on_knowledge_config(self, config, version):
        """
        Configuration handler: when the configuration has a "flows"
        section, decode each JSON-encoded value and replace self.flows.
        """

        print("config version", version)

        if "flows" in config:

            self.flows = {
                k: json.loads(v)
                for k, v in config["flows"].items()
            }

        print(self.flows)

    async def process_request(self, v, id):
        """
        Dispatch request `v` to the KnowledgeManager operation named by
        v.operation.  Responses are published with the property "id" set to
        `id`.

        Raises RequestError when the operation is missing or unknown.
        """

        if v.operation is None:
            raise RequestError("Null operation")

        print("request", v.operation)

        impls = {
            "list-kg-cores": self.knowledge.list_kg_cores,
            "fetch-kg-core": self.knowledge.fetch_kg_core,
            "delete-kg-core": self.knowledge.delete_kg_core,
        }

        if v.operation not in impls:
            raise RequestError(f"Invalid operation: {v.operation}")

        # Handed to the operation, which may call it several times (a fetch
        # streams many responses for one request).
        async def respond(x):
            await self.knowledge_response_producer.send(
                x, properties={"id": id}
            )

        return await impls[v.operation](v, respond)

    async def _send_error(self, kind, exc, id):
        """Publish an error response of type `kind` for request `id`."""

        resp = KnowledgeResponse(
            error = Error(
                type = kind,
                message = str(exc),
            )
        )

        await self.knowledge_response_producer.send(
            resp, properties={"id": id}
        )

    async def on_knowledge_request(self, msg, consumer, flow):
        """
        Consumer handler for one request message.

        Failures are reported to the requester as an error response
        ("request-error" for RequestError, "unexpected-error" for anything
        else) instead of being raised.
        """

        v = msg.value()

        # Sender-produced ID
        id = msg.properties()["id"]

        print(f"Handling input {id}...", flush=True)

        try:

            # Successful responses are sent from inside process_request
            await self.process_request(v, id)

        except RequestError as e:
            await self._send_error("request-error", e, id)

        except Exception as e:
            await self._send_error("unexpected-error", e, id)

        print("Done.", flush=True)

    @staticmethod
    def add_args(parser):
        """Register the base processor's arguments plus this service's
        queue and Cassandra options."""

        AsyncProcessor.add_args(parser)

        parser.add_argument(
            '--knowledge-request-queue',
            default=default_knowledge_request_queue,
            help='Knowledge request queue '
            f'(default: {default_knowledge_request_queue})'
        )

        parser.add_argument(
            '--knowledge-response-queue',
            default=default_knowledge_response_queue,
            help='Knowledge response queue '
            f'(default: {default_knowledge_response_queue})',
        )

        parser.add_argument(
            '--cassandra-host',
            default=default_cassandra_host,
            help='Cassandra host(s), comma-separated '
            f'(default: {default_cassandra_host})'
        )

        parser.add_argument(
            '--cassandra-user',
            default=None,
            help='Cassandra user'
        )

        parser.add_argument(
            '--cassandra-password',
            default=None,
            help='Cassandra password'
        )
# Entry point: launches the Processor under the default identity, passing
# the module docstring through to launch().
def run():
Processor.launch(default_ident, __doc__)

View file

@ -0,0 +1,68 @@
import base64
from ... schema import KnowledgeRequest, KnowledgeResponse
from ... schema import knowledge_request_queue
from ... schema import knowledge_response_queue
from . requestor import ServiceRequestor
from . serialize import serialize_graph_embeddings
from . serialize import serialize_triples
from . serialize import to_document_metadata, to_processing_metadata
class KnowledgeRequestor(ServiceRequestor):
    """
    Gateway requestor for the knowledge service: turns an API request body
    into a KnowledgeRequest and turns each KnowledgeResponse back into a
    JSON-ready dict.
    """

    def __init__(self, pulsar_client, consumer, subscriber, timeout=120):
        """Bind the requestor to the knowledge request/response queues."""

        super().__init__(
            pulsar_client=pulsar_client,
            consumer_name=consumer,
            subscription=subscriber,
            request_queue=knowledge_request_queue,
            response_queue=knowledge_response_queue,
            request_schema=KnowledgeRequest,
            response_schema=KnowledgeResponse,
            timeout=timeout,
        )

    def to_request(self, body):
        """Build a KnowledgeRequest from `body`; absent keys become None."""

        return KnowledgeRequest(
            operation = body.get("operation"),
            user = body.get("user"),
            id = body.get("id"),
        )

    def from_response(self, message):
        """
        Convert one response message.

        Returns a (payload, final) pair; `final` is True when no further
        messages are expected for the request.  The checks run in this
        order: ids, triples, graph embeddings, end-of-stream, and finally
        the empty reply.
        """

        print("Processing message")

        # Reply to a list request
        if message.ids is not None:
            print("-> IDS")
            listing = {"ids": message.ids}
            return listing, True

        # Part of a fetch stream: triples
        if message.triples:
            print("-> triples")
            encoded = serialize_triples(message.triples)
            return {"triples": encoded}, False

        # Part of a fetch stream: graph embeddings
        if message.graph_embeddings:
            print("-> ge")
            encoded = serialize_graph_embeddings(message.graph_embeddings)
            return {"graph-embeddings": encoded}, False

        # End of a fetch stream
        if message.eos is True:
            print("-> eos")
            return {"eos": True}, True

        # Nothing populated: the reply to a successful delete
        return {}, True

View file

@ -5,6 +5,7 @@ import uuid
from . config import ConfigRequestor
from . flow import FlowRequestor
from . librarian import LibrarianRequestor
from . knowledge import KnowledgeRequestor
from . embeddings import EmbeddingsRequestor
from . agent import AgentRequestor
@ -44,6 +45,7 @@ global_dispatchers = {
"config": ConfigRequestor,
"flow": FlowRequestor,
"librarian": LibrarianRequestor,
"knowledge": KnowledgeRequestor,
}
sender_dispatchers = {

View file

@ -82,7 +82,7 @@ class ServiceRequestor:
resp, fin = self.from_response(resp)
print(resp, fin)
print(resp, fin, flush=True)
if responder:
await responder(resp, fin)

View file

@ -2,7 +2,7 @@
from .. schema import LibrarianRequest, LibrarianResponse, Error, Triple
from .. knowledge import hash
from .. exceptions import RequestError
from . table_store import TableStore
from .. tables.library import LibraryTableStore
from . blob_store import BlobStore
import base64
@ -21,7 +21,7 @@ class Librarian:
minio_host, minio_access_key, minio_secret_key, bucket_name
)
self.table_store = TableStore(
self.table_store = LibraryTableStore(
cassandra_host, cassandra_user, cassandra_password, keyspace
)

View file

@ -210,7 +210,7 @@ class Processor(AsyncProcessor):
if v.operation is None:
raise RequestError("Null operation")
print("requets", v.operation)
print("request", v.operation)
impls = {
"add-document": self.librarian.add_document,

View file

@ -0,0 +1,3 @@
from . store import run

View file

@ -0,0 +1,5 @@
# Allows the package to be started with `python -m <package>`; delegates to
# the store module's run().
from . store import run
if __name__ == '__main__':
run()

View file

@ -0,0 +1,78 @@
"""
Stores knowledge-cores in Cassandra
"""
import json
import urllib.parse
from ... schema import Triples, GraphEmbeddings
from ... base import FlowProcessor, ConsumerSpec
from ... tables.knowledge import KnowledgeTableStore
# Identity passed to Processor.launch() by run()
default_ident = "kg-store"
# Cassandra host used unless overridden through parameters
default_cassandra_host = "cassandra"
# Cassandra keyspace handed to KnowledgeTableStore
keyspace = "knowledge"
class Processor(FlowProcessor):
    """
    Flow processor which writes every Triples and GraphEmbeddings message
    it receives to the knowledge tables in Cassandra.
    """

    def __init__(self, **params):
        """
        Recognised parameters (all optional): id, cassandra_host
        (comma-separated list), cassandra_user, cassandra_password.
        """

        id = params.get("id")

        cassandra_host = params.get("cassandra_host", default_cassandra_host)
        cassandra_user = params.get("cassandra_user")
        cassandra_password = params.get("cassandra_password")

        # The resolved settings are passed up to the base class; the
        # password is not among them.
        super(Processor, self).__init__(
            **params | {
                "id": id,
                "cassandra_host": cassandra_host,
                "cassandra_user": cassandra_user,
            }
        )

        self.register_specification(
            ConsumerSpec(
                name = "triples-input",
                schema = Triples,
                handler = self.on_triples
            )
        )

        self.register_specification(
            ConsumerSpec(
                name = "graph-embeddings-input",
                schema = GraphEmbeddings,
                handler = self.on_graph_embeddings
            )
        )

        self.table_store = KnowledgeTableStore(
            cassandra_host = cassandra_host.split(","),
            cassandra_user = cassandra_user,
            cassandra_password = cassandra_password,
            keyspace = keyspace,
        )

    async def on_triples(self, msg, consumer, flow):
        """Store one Triples message."""

        v = msg.value()
        await self.table_store.add_triples(v)

    async def on_graph_embeddings(self, msg, consumer, flow):
        """Store one GraphEmbeddings message."""

        v = msg.value()
        await self.table_store.add_graph_embeddings(v)

    @staticmethod
    def add_args(parser):
        """
        Register the base flow-processor arguments plus the Cassandra
        connection options that __init__ reads.
        """

        FlowProcessor.add_args(parser)

        parser.add_argument(
            '--cassandra-host',
            default=default_cassandra_host,
            help='Cassandra host(s), comma-separated '
            f'(default: {default_cassandra_host})'
        )

        parser.add_argument(
            '--cassandra-user',
            default=None,
            help='Cassandra user'
        )

        parser.add_argument(
            '--cassandra-password',
            default=None,
            help='Cassandra password'
        )
# Entry point: launches the Processor under the default identity, passing
# the module docstring through to launch().
def run():
Processor.launch(default_ident, __doc__)

View file

@ -0,0 +1,504 @@
from .. schema import KnowledgeResponse, Triple, Triples, EntityEmbeddings
from .. schema import Metadata, Value, GraphEmbeddings
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from ssl import SSLContext, PROTOCOL_TLSv1_2
import uuid
import time
import asyncio
class KnowledgeTableStore:
    """
    Cassandra-backed storage for knowledge cores.

    A core is addressed by (user, document_id) and is made up of the rows
    stored under that key in the `triples`, `graph_embeddings` and
    `document_embeddings` tables.  Each stored message becomes one row with
    a random UUID as clustering key.
    """

    def __init__(
            self,
            cassandra_host, cassandra_user, cassandra_password, keyspace,
    ):
        """
        Connect to Cassandra, make sure the schema exists and prepare the
        statements.

        cassandra_host is handed to Cluster() as its contact points.  When
        both a user and a password are supplied, the connection uses
        plain-text authentication over TLS 1.2; otherwise neither is used.
        """

        self.keyspace = keyspace

        print("Connecting to Cassandra...", flush=True)

        if cassandra_user and cassandra_password:
            ssl_context = SSLContext(PROTOCOL_TLSv1_2)
            auth_provider = PlainTextAuthProvider(
                username=cassandra_user, password=cassandra_password
            )
            self.cluster = Cluster(
                cassandra_host,
                auth_provider=auth_provider,
                ssl_context=ssl_context
            )
        else:
            self.cluster = Cluster(cassandra_host)

        self.cassandra = self.cluster.connect()

        print("Connected.", flush=True)

        self.ensure_cassandra_schema()

        self.prepare_statements()

    def ensure_cassandra_schema(self):
        """Create the keyspace, tables and indexes if they do not exist,
        and select the keyspace on the session."""

        print("Ensure Cassandra schema...", flush=True)

        print("Keyspace...", flush=True)

        # FIXME: Replication factor should be configurable
        self.cassandra.execute(f"""
            create keyspace if not exists {self.keyspace}
                with replication = {{
                   'class' : 'SimpleStrategy',
                   'replication_factor' : 1
                }};
        """)

        self.cassandra.set_keyspace(self.keyspace)

        print("triples table...", flush=True)

        self.cassandra.execute("""
            CREATE TABLE IF NOT EXISTS triples (
                user text,
                document_id text,
                id uuid,
                time timestamp,
                metadata list<tuple<
                    text, boolean, text, boolean, text, boolean
                >>,
                triples list<tuple<
                    text, boolean, text, boolean, text, boolean
                >>,
                PRIMARY KEY ((user, document_id), id)
            );
        """)

        print("graph_embeddings table...", flush=True)

        self.cassandra.execute("""
            create table if not exists graph_embeddings (
                user text,
                document_id text,
                id uuid,
                time timestamp,
                metadata list<tuple<
                    text, boolean, text, boolean, text, boolean
                >>,
                entity_embeddings list<
                    tuple<
                        tuple<text, boolean>,
                        list<list<double>>
                    >
                >,
                PRIMARY KEY ((user, document_id), id)
            );
        """)

        self.cassandra.execute("""
            CREATE INDEX IF NOT EXISTS graph_embeddings_user ON
            graph_embeddings ( user );
        """)

        print("document_embeddings table...", flush=True)

        self.cassandra.execute("""
            create table if not exists document_embeddings (
                user text,
                document_id text,
                id uuid,
                time timestamp,
                metadata list<tuple<
                    text, boolean, text, boolean, text, boolean
                >>,
                chunks list<
                    tuple<
                        blob,
                        list<list<double>>
                    >
                >,
                PRIMARY KEY ((user, document_id), id)
            );
        """)

        self.cassandra.execute("""
            CREATE INDEX IF NOT EXISTS document_embeddings_user ON
            document_embeddings ( user );
        """)

        print("Cassandra schema OK.", flush=True)

    def prepare_statements(self):
        """Prepare every statement used by the other methods."""

        self.insert_triples_stmt = self.cassandra.prepare("""
            INSERT INTO triples
            (
                id, user, document_id,
                time, metadata, triples
            )
            VALUES (?, ?, ?, ?, ?, ?)
        """)

        self.insert_graph_embeddings_stmt = self.cassandra.prepare("""
            INSERT INTO graph_embeddings
            (
                id, user, document_id, time, metadata, entity_embeddings
            )
            VALUES (?, ?, ?, ?, ?, ?)
        """)

        self.insert_document_embeddings_stmt = self.cassandra.prepare("""
            INSERT INTO document_embeddings
            (
                id, user, document_id, time, metadata, chunks
            )
            VALUES (?, ?, ?, ?, ?, ?)
        """)

        # Core listing is driven by graph_embeddings only
        self.list_cores_stmt = self.cassandra.prepare("""
            SELECT DISTINCT user, document_id FROM graph_embeddings
            WHERE user = ?
        """)

        self.get_triples_stmt = self.cassandra.prepare("""
            SELECT id, time, metadata, triples
            FROM triples
            WHERE user = ? AND document_id = ?
        """)

        self.get_graph_embeddings_stmt = self.cassandra.prepare("""
            SELECT id, time, metadata, entity_embeddings
            FROM graph_embeddings
            WHERE user = ? AND document_id = ?
        """)

        self.get_document_embeddings_stmt = self.cassandra.prepare("""
            SELECT id, time, metadata, chunks
            FROM document_embeddings
            WHERE user = ? AND document_id = ?
        """)

        self.delete_triples_stmt = self.cassandra.prepare("""
            DELETE FROM triples
            WHERE user = ? AND document_id = ?
        """)

        self.delete_graph_embeddings_stmt = self.cassandra.prepare("""
            DELETE FROM graph_embeddings
            WHERE user = ? AND document_id = ?
        """)

    # ------------------------------------------------------------------
    # Private helpers

    @staticmethod
    def _to_rows(triples):
        """
        Flatten triple objects into the 6-tuples stored in Cassandra:
        (s.value, s.is_uri, p.value, p.is_uri, o.value, o.is_uri).
        None or an empty sequence gives [].
        """

        if not triples:
            return []

        return [
            (
                t.s.value, t.s.is_uri, t.p.value, t.p.is_uri,
                t.o.value, t.o.is_uri
            )
            for t in triples
        ]

    @staticmethod
    def _to_triples(rows):
        """
        Inverse of _to_rows: rebuild Triple objects from stored 6-tuples.
        A null or empty column gives [].
        """

        if not rows:
            return []

        return [
            Triple(
                s = Value(value = elt[0], is_uri = elt[1]),
                p = Value(value = elt[2], is_uri = elt[3]),
                o = Value(value = elt[4], is_uri = elt[5]),
            )
            for elt in rows
        ]

    def _to_metadata(self, user, document_id, rows):
        """Rebuild the Metadata object for a row read back from a table."""

        return Metadata(
            id = document_id,
            user = user,
            collection = "default",   # FIXME: What to put here?
            metadata = self._to_triples(rows),
        )

    async def _execute(self, stmt, params):
        """
        Execute a prepared statement and return its result, retrying once
        a second for as long as it raises.

        NOTE(review): the retry is unbounded and catches every exception,
        so a statement that can never succeed is retried forever.  Also,
        Session.execute() is the driver's synchronous call, so the event
        loop is held while it runs.  Both are unchanged from the original
        inline loops.
        """

        while True:

            try:
                return self.cassandra.execute(stmt, params)

            except Exception as e:
                print("Exception:", type(e))
                print(f"{e}, retry...", flush=True)
                await asyncio.sleep(1)

    async def _insert(self, stmt, m, payload):
        """
        Insert one row for message `m` with `payload` as the final column.

        The row id and timestamp are generated once, before any attempt,
        so a retried insert rewrites the same row instead of adding a
        second one.
        """

        when = int(time.time() * 1000)

        await self._execute(
            stmt,
            (
                uuid.uuid4(), m.metadata.user,
                m.metadata.id, when,
                self._to_rows(m.metadata.metadata), payload,
            )
        )

    # ------------------------------------------------------------------
    # Writes

    async def add_triples(self, m):
        """Store one Triples message as a row of the triples table."""

        await self._insert(
            self.insert_triples_stmt, m, self._to_rows(m.triples)
        )

    async def add_graph_embeddings(self, m):
        """Store one GraphEmbeddings message as a row of the
        graph_embeddings table."""

        entities = [
            (
                (v.entity.value, v.entity.is_uri),
                v.vectors
            )
            for v in m.entities
        ]

        await self._insert(self.insert_graph_embeddings_stmt, m, entities)

    async def add_document_embeddings(self, m):
        """Store one document-embeddings message as a row of the
        document_embeddings table."""

        chunks = [
            (
                v.chunk,
                v.vectors,
            )
            for v in m.chunks
        ]

        await self._insert(self.insert_document_embeddings_stmt, m, chunks)

    # ------------------------------------------------------------------
    # Core management

    async def list_kg_cores(self, user):
        """
        Return the document IDs of the cores stored for `user`.

        The listing is taken from the graph_embeddings table, so a core is
        only reported if it has at least one graph-embeddings row.
        """

        print("List kg cores...")

        resp = await self._execute(self.list_cores_stmt, (user,))

        # Columns are (user, document_id)
        lst = [
            row[1]
            for row in resp
        ]

        print("Done")

        return lst

    async def delete_kg_core(self, user, document_id):
        """
        Delete the triples and graph-embeddings rows of one core.  Rows in
        document_embeddings are not touched.
        """

        print("Delete kg cores...")

        await self._execute(self.delete_triples_stmt, (user, document_id))

        await self._execute(
            self.delete_graph_embeddings_stmt, (user, document_id)
        )

    async def get_triples(self, user, document_id, receiver):
        """
        Read back the triples rows of one core, awaiting `receiver` with
        one Triples message per stored row.
        """

        print("Get triples...")

        resp = await self._execute(
            self.get_triples_stmt, (user, document_id)
        )

        # Columns are (id, time, metadata, triples)
        for row in resp:

            await receiver(
                Triples(
                    metadata = self._to_metadata(user, document_id, row[2]),
                    triples = self._to_triples(row[3]),
                )
            )

        print("Done")

    async def get_graph_embeddings(self, user, document_id, receiver):
        """
        Read back the graph-embeddings rows of one core, awaiting
        `receiver` with one GraphEmbeddings message per stored row.
        """

        print("Get GE...")

        resp = await self._execute(
            self.get_graph_embeddings_stmt, (user, document_id)
        )

        # Columns are (id, time, metadata, entity_embeddings)
        for row in resp:

            entities = [
                EntityEmbeddings(
                    entity = Value(value = ent[0][0], is_uri = ent[0][1]),
                    vectors = ent[1]
                )
                for ent in (row[3] or [])
            ]

            await receiver(
                GraphEmbeddings(
                    metadata = self._to_metadata(user, document_id, row[2]),
                    entities = entities
                )
            )

        print("Done")

View file

@ -14,7 +14,7 @@ import uuid
import time
import asyncio
class TableStore:
class LibraryTableStore:
def __init__(
self,
@ -104,71 +104,6 @@ class TableStore:
);
""");
return
print("triples table...", flush=True)
self.cassandra.execute("""
CREATE TABLE IF NOT EXISTS triples (
user text,
collection text,
document_id text,
id uuid,
time timestamp,
metadata list<tuple<
text, boolean, text, boolean, text, boolean
>>,
triples list<tuple<
text, boolean, text, boolean, text, boolean
>>,
PRIMARY KEY (user, collection, document_id, id)
);
""");
print("graph_embeddings table...", flush=True)
self.cassandra.execute("""
create table if not exists graph_embeddings (
user text,
collection text,
document_id text,
id uuid,
time timestamp,
metadata list<tuple<
text, boolean, text, boolean, text, boolean
>>,
entity_embeddings list<
tuple<
tuple<text, boolean>,
list<list<double>>
>
>,
PRIMARY KEY (user, collection, document_id, id)
);
""");
print("document_embeddings table...", flush=True)
self.cassandra.execute("""
create table if not exists document_embeddings (
user text,
collection text,
document_id text,
id uuid,
time timestamp,
metadata list<tuple<
text, boolean, text, boolean, text, boolean
>>,
chunks list<
tuple<
blob,
list<list<double>>
>
>,
PRIMARY KEY (user, collection, document_id, id)
);
""");
print("Cassandra schema OK.", flush=True)
def prepare_statements(self):
@ -252,35 +187,6 @@ class TableStore:
WHERE user = ?
""")
return
self.insert_triples_stmt = self.cassandra.prepare("""
INSERT INTO triples
(
id, user, collection, document_id, time,
metadata, triples
)
VALUES (?, ?, ?, ?, ?, ?, ?)
""")
self.insert_graph_embeddings_stmt = self.cassandra.prepare("""
INSERT INTO graph_embeddings
(
id, user, collection, document_id, time,
metadata, entity_embeddings
)
VALUES (?, ?, ?, ?, ?, ?, ?)
""")
self.insert_document_embeddings_stmt = self.cassandra.prepare("""
INSERT INTO document_embeddings
(
id, user, collection, document_id, time,
metadata, chunks
)
VALUES (?, ?, ?, ?, ?, ?, ?)
""")
async def document_exists(self, user, id):
resp = self.cassandra.execute(
@ -391,50 +297,6 @@ class TableStore:
print("Delete complete", flush=True)
async def add_triples(self, m):
when = int(time.time() * 1000)
if m.metadata.metadata:
metadata = [
(
v.s.value, v.s.is_uri, v.p.value, v.p.is_uri,
v.o.value, v.o.is_uri
)
for v in m.metadata.metadata
]
else:
metadata = []
triples = [
(
v.s.value, v.s.is_uri, v.p.value, v.p.is_uri,
v.o.value, v.o.is_uri
)
for v in m.triples
]
while True:
try:
resp = self.cassandra.execute(
self.insert_triples_stmt,
(
uuid.uuid4(), m.metadata.user,
m.metadata.collection, m.metadata.id, when,
metadata, triples,
)
)
break
except Exception as e:
print("Exception:", type(e))
print(f"{e}, retry...", flush=True)
await asyncio.sleep(1)
async def list_documents(self, user):
print("List documents...")
@ -661,92 +523,3 @@ class TableStore:
return lst
async def add_graph_embeddings(self, m):
when = int(time.time() * 1000)
if m.metadata.metadata:
metadata = [
(
v.s.value, v.s.is_uri, v.p.value, v.p.is_uri,
v.o.value, v.o.is_uri
)
for v in m.metadata.metadata
]
else:
metadata = []
entities = [
(
(v.entity.value, v.entity.is_uri),
v.vectors
)
for v in m.entities
]
while True:
try:
resp = self.cassandra.execute(
self.insert_graph_embeddings_stmt,
(
uuid.uuid4(), m.metadata.user,
m.metadata.collection, m.metadata.id, when,
metadata, entities,
)
)
break
except Exception as e:
print("Exception:", type(e))
print(f"{e}, retry...", flush=True)
await asyncio.sleep(1)
async def add_document_embeddings(self, m):
when = int(time.time() * 1000)
if m.metadata.metadata:
metadata = [
(
v.s.value, v.s.is_uri, v.p.value, v.p.is_uri,
v.o.value, v.o.is_uri
)
for v in m.metadata.metadata
]
else:
metadata = []
chunks = [
(
v.chunk,
v.vectors,
)
for v in m.chunks
]
while True:
try:
resp = self.cassandra.execute(
self.insert_document_embeddings_stmt,
(
uuid.uuid4(), m.metadata.user,
m.metadata.collection, m.metadata.id, when,
metadata, chunks,
)
)
break
except Exception as e:
print("Exception:", type(e))
print(f"{e}, retry...", flush=True)
await asyncio.sleep(1)