Feature/doc metadata labels (#130)

* Add schema load util

* Added a sample schema turtle file, which will be useful for future testing
and tutorials.

* Fixed graph label metadata confusion, which was creating incorrect subjectOf
edges.
This commit is contained in:
cybermaggedon 2024-10-29 21:18:02 +00:00 committed by GitHub
parent dedb66379d
commit 24d099793d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 196 additions and 3 deletions

32
schema.ttl Normal file
View file

@ -0,0 +1,32 @@
@prefix ns1: <http://trustgraph.ai/e/> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix schema: <https://schema.org/> .
@prefix skos: <http://www.w3.org/2004/02/skos/core#> .
schema:subjectOf rdfs:label "subject of" .
skos:definition rdfs:label "definition" .
rdf:type rdfs:label "type" .
schema:DigitalDocument rdfs:label "digital document" .
schema:Organization rdfs:label "organization" .
schema:PublicationEvent rdfs:label "publication event" .
schema:copyrightNotice rdfs:label "copyright notice" .
schema:copyrightHolder rdfs:label "copyright holder" .
schema:copyrightYear rdfs:label "copyright year" .
schema:license rdfs:label "license" .
schema:publication rdfs:label "publication" .
schema:startDate rdfs:label "start date" .
schema:endDate rdfs:label "end date" .
schema:publishedBy rdfs:label "published by" .
schema:datePublished rdfs:label "date published" .
schema:publication rdfs:label "publication" .
schema:datePublished rdfs:label "date published" .
schema:url rdfs:label "url" .
schema:identifier rdfs:label "identifier" .
schema:keywords rdfs:label "keyword" .
skos:definition rdfs:label "definition" .

View file

@ -3,12 +3,14 @@ from . defs import *
from .. schema import Triple, Value
class DigitalDocument:
def __init__(
self, id, name=None, description=None, copyright_notice=None,
copyright_holder=None, copyright_year=None, license=None,
identifier=None,
publication=None, url=None, keywords=[]
):
self.id = id
self.name = name
self.description = description
@ -116,4 +118,3 @@ class DigitalDocument:
p=Value(value=URL, is_uri=True),
o=Value(value=self.url, is_uri=True)
))

View file

@ -67,4 +67,3 @@ class PublicationEvent:
s=Value(value=self.id, is_uri=True),
p=Value(value=END_DATE, is_uri=True),
o=Value(value=self.end_date, is_uri=False)))

View file

@ -0,0 +1,160 @@
#!/usr/bin/env python3
"""
Loads triples from Turtle files into TrustGraph processing.
"""
import pulsar
from pulsar.schema import JsonSchema
from trustgraph.schema import Triples, Triple, Value, Metadata
from trustgraph.schema import triples_store_queue
import argparse
import os
import time
import pyarrow as pa
import rdflib
from trustgraph.log_level import LogLevel
default_user = 'trustgraph'
default_collection = 'default'
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650')
default_output_queue = triples_store_queue
class Loader:
    """
    Parses one or more Turtle files and publishes every triple to a
    Pulsar topic as a TrustGraph Triples message.
    """

    def __init__(
        self,
        pulsar_host,
        output_queue,
        log_level,
        files,
        user,
        collection,
    ):
        # Pulsar client logging routed through the console logger at
        # the requested verbosity.
        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

        # Chunking enabled so messages larger than the broker's limit
        # are still deliverable.
        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(Triples),
            chunking_enabled=True,
        )

        self.files = files
        self.user = user
        self.collection = collection

    def run(self):
        """
        Load every configured file.  Deliberately best-effort: a
        failure is printed and processing stops, but no exception
        propagates to the caller.
        """
        try:
            for file in self.files:
                self.load_file(file)
        except Exception as e:
            print(e, flush=True)

    def load_file(self, file):
        """
        Parse a single Turtle file and send each of its triples as an
        individual Triples message tagged with this loader's user and
        collection.
        """
        g = rdflib.Graph()
        g.parse(file, format="turtle")

        for s_term, p_term, o_term in g:

            s = Value(value=str(s_term), is_uri=True)
            p = Value(value=str(p_term), is_uri=True)

            # Only URIRef objects are URIs; literals (and blank nodes)
            # are sent as plain values.
            o = Value(
                value=str(o_term),
                is_uri=isinstance(o_term, rdflib.term.URIRef),
            )

            r = Triples(
                metadata=Metadata(
                    id=None,
                    metadata=[],
                    user=self.user,
                    collection=self.collection,
                ),
                triples=[ Triple(s=s, p=p, o=o) ]
            )

            self.producer.send(r)

    def __del__(self):
        # __init__ may have raised before self.client was assigned
        # (e.g. broker unreachable); guard so interpreter shutdown
        # doesn't emit a spurious AttributeError.
        client = getattr(self, "client", None)
        if client is not None:
            client.close()
def main():
    """
    Parse command-line options, then load the given Turtle files,
    retrying every 10 seconds until the load succeeds (e.g. while the
    Pulsar broker is still starting up).
    """

    parser = argparse.ArgumentParser(
        prog='loader',
        description=__doc__,
    )

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-u', '--user',
        default=default_user,
        help=f'User ID (default: {default_user})'
    )

    parser.add_argument(
        '-c', '--collection',
        default=default_collection,
        help=f'Collection ID (default: {default_collection})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.ERROR,
        choices=list(LogLevel),
        # Fixed help text: previously said 'Output queue' (copy-paste
        # error) and claimed a default of 'info' when the default is
        # actually ERROR.
        help='Log level (default: error)'
    )

    parser.add_argument(
        'files', nargs='+',
        help='Turtle files to load'
    )

    args = parser.parse_args()

    # Retry loop: connection/load failures are reported and retried
    # after a delay rather than aborting the process.
    while True:

        try:

            p = Loader(
                pulsar_host=args.pulsar_host,
                output_queue=args.output_queue,
                log_level=args.log_level,
                files=args.files,
                user=args.user,
                collection=args.collection,
            )

            p.run()

            print("File loaded.")
            break

        except Exception as e:
            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)
            time.sleep(10)

main()

View file

@ -46,6 +46,7 @@ setuptools.setup(
"scripts/tg-init-pulsar-manager",
"scripts/tg-load-pdf",
"scripts/tg-load-text",
"scripts/tg-load-turtle",
"scripts/tg-query-document-rag",
"scripts/tg-query-graph-rag",
"scripts/tg-init-pulsar",

View file

@ -189,7 +189,7 @@ class Processor(ConsumerProducer):
# 'Subject of' for o
triples.append(Triple(
s=o_value,
p=RDF_LABEL_VALUE,
p=SUBJECT_OF_VALUE,
o=Value(value=v.metadata.id, is_uri=True)
))