Fix/core save api (#172)

* Acknowledge messaages from Pulsar, doh!
* Change API to deliver a boolean e if value is an entity
* Change loaders to use new API
* Changes, entity-aware API is complete
This commit is contained in:
cybermaggedon 2024-11-26 16:46:38 +00:00 committed by GitHub
parent 340d7a224f
commit 887fafcf8c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 104 additions and 70 deletions

View file

@ -4,7 +4,7 @@ import json
import dataclasses import dataclasses
import base64 import base64
from trustgraph.knowledge import hash from trustgraph.knowledge import hash, Uri, Literal
class ProtocolException(Exception): class ProtocolException(Exception):
pass pass
@ -12,14 +12,6 @@ class ProtocolException(Exception):
class ApplicationException(Exception): class ApplicationException(Exception):
pass pass
class Uri(str):
def is_uri(self): return True
def is_literal(self): return False
class Literal(str):
def is_uri(self): return False
def is_literal(self): return True
@dataclasses.dataclass @dataclasses.dataclass
class Triple: class Triple:
s : str s : str
@ -213,9 +205,16 @@ class Api:
"limit": limit "limit": limit
} }
if s: input["s"] = s if not isinstance(s, Uri):
if p: input["p"] = p raise RuntimeError("s must be Uri")
if o: input["o"] = o if not isinstance(p, Uri):
raise RuntimeError("p must be Uri")
if not isinstance(o, Uri) and not isinstance(o, Literal):
raise RuntimeError("o must be Uri or Literal")
if s: input["s"] = { "v": str(s), "e": isinstance(s, Uri), }
if p: input["p"] = { "v": str(p), "e": isinstance(p, Uri), }
if o: input["o"] = { "v": str(o), "e": isinstance(o, Uri), }
url = f"{self.url}triples-query" url = f"{self.url}triples-query"
@ -273,9 +272,9 @@ class Api:
if metadata: if metadata:
metadata.emit( metadata.emit(
lambda t: triples.append({ lambda t: triples.append({
"s": t.s.value, "s": { "v": t["s"], "e": isinstance(t["s"], Uri) },
"p": t.p.value, "p": { "v": t["p"], "e": isinstance(t["p"], Uri) },
"o": t.o.value "o": { "v": t["o"], "e": isinstance(t["o"], Uri) }
}) })
) )
@ -312,9 +311,9 @@ class Api:
if metadata: if metadata:
metadata.emit( metadata.emit(
lambda t: triples.append({ lambda t: triples.append({
"s": t.s.value, "s": { "v": t["s"], "e": isinstance(t["s"], Uri) },
"p": t.p.value, "p": { "v": t["p"], "e": isinstance(t["p"], Uri) },
"o": t.o.value "o": { "v": t["o"], "e": isinstance(t["o"], Uri) }
}) })
) )

View file

@ -1,4 +1,5 @@
from . defs import *
from . identifier import * from . identifier import *
from . publication import * from . publication import *
from . document import * from . document import *

View file

@ -23,3 +23,11 @@ URL = 'https://schema.org/url'
IDENTIFIER = 'https://schema.org/identifier' IDENTIFIER = 'https://schema.org/identifier'
KEYWORD = 'https://schema.org/keywords' KEYWORD = 'https://schema.org/keywords'
class Uri(str):
def is_uri(self): return True
def is_literal(self): return False
class Literal(str):
def is_uri(self): return False
def is_literal(self): return True

View file

@ -1,6 +1,16 @@
from . defs import * from . defs import *
from .. schema import Triple, Value
def Value(value, is_uri):
if is_uri:
return Uri(value)
else:
return Literal(value)
def Triple(s, p, o):
return {
"s": s, "p": p, "o": o,
}
class DigitalDocument: class DigitalDocument:

View file

@ -1,6 +1,16 @@
from . defs import * from . defs import *
from .. schema import Triple, Value
def Value(value, is_uri):
if is_uri:
return Uri(value)
else:
return Literal(value)
def Triple(s, p, o):
return {
"s": s, "p": p, "o": o,
}
class Organization: class Organization:
def __init__(self, id, name=None, description=None): def __init__(self, id, name=None, description=None):

View file

@ -1,6 +1,16 @@
from . defs import * from . defs import *
from .. schema import Triple, Value
def Value(value, is_uri):
if is_uri:
return Uri(value)
else:
return Literal(value)
def Triple(s, p, o):
return {
"s": s, "p": p, "o": o,
}
class PublicationEvent: class PublicationEvent:
def __init__( def __init__(

View file

@ -14,7 +14,7 @@ import time
import uuid import uuid
from trustgraph.schema import Document, document_ingest_queue from trustgraph.schema import Document, document_ingest_queue
from trustgraph.schema import Metadata from trustgraph.schema import Metadata, Triple, Value
from trustgraph.log_level import LogLevel from trustgraph.log_level import LogLevel
from trustgraph.knowledge import hash, to_uri from trustgraph.knowledge import hash, to_uri
from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG
@ -79,7 +79,14 @@ class Loader:
r = Document( r = Document(
metadata=Metadata( metadata=Metadata(
id=id, id=id,
metadata=triples, metadata=[
Triple(
s=Value(value=t["s"]["v"], is_uri=t["s"]["e"]),
p=Value(value=t["p"]["v"], is_uri=t["p"]["e"]),
o=Value(value=t["o"]["v"], is_uri=t["o"]["e"])
)
for t in triples
],
user=self.user, user=self.user,
collection=self.collection, collection=self.collection,
), ),

View file

@ -13,7 +13,7 @@ import time
import uuid import uuid
from trustgraph.schema import TextDocument, text_ingest_queue from trustgraph.schema import TextDocument, text_ingest_queue
from trustgraph.schema import Metadata from trustgraph.schema import Metadata, Triple, Value
from trustgraph.log_level import LogLevel from trustgraph.log_level import LogLevel
from trustgraph.knowledge import hash, to_uri from trustgraph.knowledge import hash, to_uri
from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG
@ -78,7 +78,14 @@ class Loader:
r = TextDocument( r = TextDocument(
metadata=Metadata( metadata=Metadata(
id=id, id=id,
metadata=triples, metadata=[
Triple(
s=Value(value=t["s"]["v"], is_uri=t["s"]["e"]),
p=Value(value=t["p"]["v"], is_uri=t["p"]["e"]),
o=Value(value=t["o"]["v"], is_uri=t["o"]["e"])
)
for t in triples
],
user=self.user, user=self.user,
collection=self.collection, collection=self.collection,
), ),

View file

@ -73,10 +73,7 @@ default_timeout = 600
default_port = 8088 default_port = 8088
def to_value(x): def to_value(x):
if x.startswith("http:") or x.startswith("https:"): return Value(value=x["v"], is_uri=x["e"])
return Value(value=x, is_uri=True)
else:
return Value(value=x, is_uri=False)
def to_subgraph(x): def to_subgraph(x):
return [ return [
@ -156,6 +153,9 @@ class Subscriber:
while True: while True:
msg = await consumer.receive() msg = await consumer.receive()
# Acknowledge successful reception of the message
await consumer.acknowledge(msg)
try: try:
id = msg.properties()["id"] id = msg.properties()["id"]
except: except:
@ -192,43 +192,41 @@ class Subscriber:
if id in self.full: if id in self.full:
del self.full[id] del self.full[id]
def serialize_value(v):
return {
"v": v.value,
"e": v.is_uri,
}
def serialize_triple(t):
return {
"s": serialize_value(t.s),
"p": serialize_value(t.p),
"o": serialize_value(t.o)
}
def serialize_subgraph(sg):
return [
serialize_triple(t)
for t in sg
]
def serialize_triples(message): def serialize_triples(message):
return { return {
"metadata": { "metadata": {
"id": message.metadata.id, "id": message.metadata.id,
"metadata": [ "metadata": serialize_subgraph(message.metadata.metadata),
{
"s": t.s.value,
"p": t.p.value,
"o": t.o.value,
}
for t in message.metadata.metadata
],
"user": message.metadata.user, "user": message.metadata.user,
"collection": message.metadata.collection, "collection": message.metadata.collection,
}, },
"triples": [ "triples": serialize_subgraph(message.triples),
{
"s": t.s.value,
"p": t.p.value,
"o": t.o.value,
}
for t in message.triples
]
} }
def serialize_graph_embeddings(message): def serialize_graph_embeddings(message):
return { return {
"metadata": { "metadata": {
"id": message.metadata.id, "id": message.metadata.id,
"metadata": [ "metadata": serialize_subgraph(message.metadata.metadata),
{
"s": t.s.value,
"p": t.p.value,
"o": t.o.value,
}
for t in message.metadata.metadata
],
"user": message.metadata.user, "user": message.metadata.user,
"collection": message.metadata.collection, "collection": message.metadata.collection,
}, },
@ -560,23 +558,7 @@ class Api:
return web.json_response( return web.json_response(
{ {
"response": [ "response": serialize_subgraph(resp.triples),
{
"s": {
"v": t.s.value,
"e": t.s.is_uri,
},
"p": {
"v": t.p.value,
"e": t.p.is_uri,
},
"o": {
"v": t.o.value,
"e": t.o.is_uri,
}
}
for t in resp.triples
]
} }
) )