Feature/librarian (#310)

* Add fields to library schema

* Added list function, incomplete

* Librarian list operation
This commit is contained in:
cybermaggedon 2025-03-11 16:52:59 +00:00 committed by GitHub
parent 5575e885e5
commit f1559c5944
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 177 additions and 27 deletions

View file

@ -53,7 +53,10 @@ class LibrarianRequestor(ServiceRequestor):
response["document"] = serialize_document_package(message.document)
if message.info:
response["info"] = serialize_document_info(message.info)
response["info"] = [
serialize_document_info(v)
for v in message.info
]
return response, True

View file

@ -84,6 +84,9 @@ def serialize_document_package(message):
ret = {}
if message.id:
ret["id"] = message.id
if message.metadata:
ret["metadata"] = serialize_subgraph(message.metdata)
@ -108,8 +111,8 @@ def serialize_document_info(message):
ret = {}
if message.metadata:
ret["metadata"] = serialize_subgraph(message.metdata)
if message.id:
ret["id"] = message.id
if message.kind:
ret["kind"] = message.kind
@ -120,25 +123,45 @@ def serialize_document_info(message):
if message.collection:
ret["collection"] = message.collection
if message.title:
ret["title"] = message.title
if message.comments:
ret["comments"] = message.comments
if message.time:
ret["time"] = message.time
if message.metadata:
ret["metadata"] = serialize_subgraph(message.metadata)
return ret
def to_document_package(x):
return DocumentPackage(
metadata = to_subgraph(x["metadata"]),
document = x.get("document", None),
id = x.get("id", None),
kind = x.get("kind", None),
user = x.get("user", None),
collection = x.get("collection", None),
title = x.get("title", None),
comments = x.get("comments", None),
time = x.get("time", None),
document = x.get("document", None),
metadata = to_subgraph(x["metadata"]),
)
def to_document_info(x):
return DocumentInfo(
metadata = to_subgraph(x["metadata"]),
id = x.get("id", None),
kind = x.get("kind", None),
user = x.get("user", None),
collection = x.get("collection", None),
title = x.get("title", None),
comments = x.get("comments", None),
time = x.get("time", None),
metadata = to_subgraph(x["metadata"]),
)
def to_criteria(x):

View file

@ -1,4 +1,4 @@
from .. schema import LibrarianRequest, LibrarianResponse, Error
from .. schema import LibrarianRequest, LibrarianResponse, Error, Triple
from .. knowledge import hash
from .. exceptions import RequestError
from . table_store import TableStore
@ -26,7 +26,7 @@ class Librarian:
self.load_document = load_document
self.load_text = load_text
async def add(self, id, document):
async def add(self, document):
if document.kind not in (
"text/plain", "application/pdf"
@ -41,9 +41,9 @@ class Librarian:
self.table_store.add(object_id, document)
if document.kind == "application/pdf":
await self.load_document(id, document)
await self.load_document(document)
elif document.kind == "text/plain":
await self.load_text(id, document)
await self.load_text(document)
print("Add complete", flush=True)
@ -53,6 +53,20 @@ class Librarian:
info = None,
)
async def list(self, user, collection):
print("list")
info = self.table_store.list(user, collection)
print(">>", info)
return LibrarianResponse(
error = None,
document = None,
info = info,
)
def handle_triples(self, m):
self.table_store.add_triples(m)

View file

@ -223,11 +223,11 @@ class Processor(ConsumerProducer):
self.librarian.handle_document_embeddings(msg)
async def load_document(self, id, document):
async def load_document(self, document):
doc = Document(
metadata = Metadata(
id = id,
id = document.id,
metadata = document.metadata,
user = document.user,
collection = document.collection
@ -237,14 +237,14 @@ class Processor(ConsumerProducer):
self.document_load.send(None, doc)
async def load_text(self, id, document):
async def load_text(self, document):
text = base64.b64decode(document.document)
text = text.decode("utf-8")
doc = TextDocument(
metadata = Metadata(
id = id,
id = document.id,
metadata = document.metadata,
user = document.user,
collection = document.collection
@ -259,19 +259,33 @@ class Processor(ConsumerProducer):
if v.operation is None:
raise RequestError("Null operation")
print("op", v.operation)
if v.operation == "add":
if (
v.id and v.document and v.document.metadata and
v.document and v.document.id and v.document.metadata and
v.document.document and v.document.kind
):
return partial(
self.librarian.add,
id = v.id,
document = v.document,
)
else:
raise RequestError("Invalid call")
if v.operation == "list":
print("list", v)
print(v.user)
if v.user:
return partial(
self.librarian.list,
user = v.user,
collection = v.collection,
)
else:
print("BROK")
raise RequestError("Invalid call")
raise RequestError("Invalid operation: " + v.operation)
async def handle(self, msg):
@ -298,6 +312,7 @@ class Processor(ConsumerProducer):
try:
resp = await func()
print("->", resp)
except RequestError as e:
resp = LibrarianResponse(
error = Error(
@ -318,7 +333,7 @@ class Processor(ConsumerProducer):
await self.send(resp, properties={"id": id})
return
print("Send response...", flush=True)
print("Send response..!.", flush=True)
await self.send(resp, properties={"id": id})

View file

@ -1,4 +1,5 @@
from .. schema import LibrarianRequest, LibrarianResponse, Error
from .. schema import LibrarianRequest, LibrarianResponse
from .. schema import DocumentInfo, Error, Triple, Value
from .. knowledge import hash
from .. exceptions import RequestError
@ -61,7 +62,7 @@ class TableStore:
CREATE TABLE IF NOT EXISTS document (
user text,
collection text,
id uuid,
id text,
time timestamp,
title text,
comments text,
@ -157,6 +158,20 @@ class TableStore:
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""")
self.list_document_stmt = self.cassandra.prepare("""
SELECT
id, kind, user, collection, title, comments, time, metadata
FROM document
WHERE user = ?
""")
self.list_document_by_collection_stmt = self.cassandra.prepare("""
SELECT
id, kind, user, collection, title, comments, time, metadata
FROM document
WHERE user = ? AND collection = ?
""")
self.insert_triples_stmt = self.cassandra.prepare("""
INSERT INTO triples
(
@ -192,10 +207,9 @@ class TableStore:
raise RequestError("Invalid document kind: " + document.kind)
# Create random doc ID
doc_id = uuid.uuid4()
when = int(time.time() * 1000)
print("Adding", object_id, doc_id)
print("Adding", document.id, object_id)
metadata = [
(
@ -205,8 +219,6 @@ class TableStore:
for v in document.metadata
]
# FIXME: doc_id should be the user-supplied ID???
while True:
try:
@ -214,7 +226,7 @@ class TableStore:
resp = self.cassandra.execute(
self.insert_document_stmt,
(
doc_id, document.user, document.collection,
document.id, document.user, document.collection,
document.kind, object_id, when,
document.title, document.comments,
metadata
@ -275,6 +287,73 @@ class TableStore:
print(f"{e}, retry...", flush=True)
time.sleep(1)
def list(self, user, collection=None):
print("LIST")
while True:
print("TRY")
print(self.list_document_stmt)
try:
if collection:
resp = self.cassandra.execute(
self.list_document_by_collection_stmt,
(user, collection)
)
else:
resp = self.cassandra.execute(
self.list_document_stmt,
(user,)
)
break
print("OK")
except Exception as e:
print("Exception:", type(e))
print(f"{e}, retry...", flush=True)
time.sleep(1)
print("OK2")
info = [
DocumentInfo(
id = row[0],
kind = row[1],
user = row[2],
collection = row[3],
title = row[4],
comments = row[5],
time = int(1000 * row[6].timestamp()),
metadata = [
Triple(
s=Value(value=m[0], is_uri=m[1]),
p=Value(value=m[2], is_uri=m[3]),
o=Value(value=m[4], is_uri=m[5])
)
for m in row[7]
],
)
for row in resp
]
print("OK3")
print(info[0])
print(info[0].user)
print(info[0].time)
print(info[0].kind)
print(info[0].collection)
print(info[0].title)
print(info[0].comments)
print(info[0].metadata)
print(info[0].metadata)
return info
def add_graph_embeddings(self, m):
when = int(time.time() * 1000)