From 7954e863cc2feca53243aef3874b9ee577ba43a8 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Wed, 23 Oct 2024 18:04:04 +0100 Subject: [PATCH] Feature: document metadata (#123) * Rework metadata structure in processing messages to be a subgraph * Add subgraph creation for tg-load-pdf and tg-load-text based on command-line passing of doc attributes * Document metadata is added to knowledge graph with subjectOf linkage to extracted entities --- .../trustgraph/knowledge/__init__.py | 6 + trustgraph-base/trustgraph/knowledge/defs.py | 25 ++++ .../trustgraph/knowledge/document.py | 119 ++++++++++++++++++ .../trustgraph/knowledge/identifier.py | 23 ++++ .../trustgraph/knowledge/organization.py | 40 ++++++ .../trustgraph/knowledge/publication.py | 70 +++++++++++ trustgraph-base/trustgraph/rdf.py | 1 + trustgraph-base/trustgraph/schema/__init__.py | 2 - trustgraph-base/trustgraph/schema/graph.py | 8 +- trustgraph-base/trustgraph/schema/metadata.py | 12 +- trustgraph-base/trustgraph/schema/types.py | 5 + trustgraph-cli/scripts/tg-load-pdf | 100 ++++++++++++++- trustgraph-cli/scripts/tg-load-text | 100 ++++++++++++++- .../trustgraph/chunking/recursive/chunker.py | 10 +- .../trustgraph/chunking/token/chunker.py | 10 +- .../trustgraph/decoding/pdf/pdf_decoder.py | 9 +- .../extract/kg/definitions/extract.py | 48 ++++++- .../extract/kg/relationships/extract.py | 93 +++++++++----- .../trustgraph/extract/kg/topics/extract.py | 9 +- .../storage/triples/cassandra/write.py | 17 +-- .../trustgraph/storage/triples/neo4j/write.py | 16 +-- 21 files changed, 625 insertions(+), 98 deletions(-) create mode 100644 trustgraph-base/trustgraph/knowledge/__init__.py create mode 100644 trustgraph-base/trustgraph/knowledge/defs.py create mode 100644 trustgraph-base/trustgraph/knowledge/document.py create mode 100644 trustgraph-base/trustgraph/knowledge/identifier.py create mode 100644 trustgraph-base/trustgraph/knowledge/organization.py create mode 100644 trustgraph-base/trustgraph/knowledge/publication.py diff --git a/trustgraph-base/trustgraph/knowledge/__init__.py b/trustgraph-base/trustgraph/knowledge/__init__.py new file mode 100644 index 00000000..0ab6b5db --- /dev/null +++ b/trustgraph-base/trustgraph/knowledge/__init__.py @@ -0,0 +1,6 @@ + +from . identifier import * +from . publication import * +from . document import * +from . organization import * + diff --git a/trustgraph-base/trustgraph/knowledge/defs.py b/trustgraph-base/trustgraph/knowledge/defs.py new file mode 100644 index 00000000..b95863c6 --- /dev/null +++ b/trustgraph-base/trustgraph/knowledge/defs.py @@ -0,0 +1,25 @@ + +IS_A = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type' +LABEL = 'http://www.w3.org/2000/01/rdf-schema#label' + +DIGITAL_DOCUMENT = 'https://schema.org/DigitalDocument' +PUBLICATION_EVENT = 'https://schema.org/PublicationEvent' +ORGANIZATION = 'https://schema.org/Organization' + +NAME = 'https://schema.org/name' +DESCRIPTION = 'https://schema.org/description' +COPYRIGHT_NOTICE = 'https://schema.org/copyrightNotice' +COPYRIGHT_HOLDER = 'https://schema.org/copyrightHolder' +COPYRIGHT_YEAR = 'https://schema.org/copyrightYear' +LICENSE = 'https://schema.org/license' +PUBLICATION = 'https://schema.org/publication' +START_DATE = 'https://schema.org/startDate' +END_DATE = 'https://schema.org/endDate' +PUBLISHED_BY = 'https://schema.org/publishedBy' +DATE_PUBLISHED = 'https://schema.org/datePublished' +PUBLICATION = 'https://schema.org/publication' +DATE_PUBLISHED = 'https://schema.org/datePublished' +URL = 'https://schema.org/url' +IDENTIFIER = 'https://schema.org/identifier' +KEYWORD = 'https://schema.org/keywords' + diff --git a/trustgraph-base/trustgraph/knowledge/document.py b/trustgraph-base/trustgraph/knowledge/document.py new file mode 100644 index 00000000..9c31d6fb --- /dev/null +++ b/trustgraph-base/trustgraph/knowledge/document.py @@ -0,0 +1,119 @@ + +from . defs import * +from .. schema import Triple, Value + +class DigitalDocument: + def __init__( + self, id, name=None, description=None, copyright_notice=None, + copyright_holder=None, copyright_year=None, license=None, + identifier=None, + publication=None, url=None, keywords=[] + ): + self.id = id + self.name = name + self.description = description + self.copyright_notice = copyright_notice + self.copyright_holder = copyright_holder + self.copyright_year = copyright_year + self.license = license + self.publication = publication + self.url = url + self.identifier = identifier + self.keywords = keywords + + def emit(self, emit): + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=IS_A, is_uri=True), + o=Value(value=DIGITAL_DOCUMENT, is_uri=True) + )) + + if self.name: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=LABEL, is_uri=True), + o=Value(value=self.name, is_uri=False) + )) + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=NAME, is_uri=True), + o=Value(value=self.name, is_uri=False) + )) + + if self.identifier: + + emit(Triple( + s=Value(value=id, is_uri=True), + p=Value(value=IDENTIFIER, is_uri=True), + o=Value(value=self.identifier, is_uri=False) + )) + + if self.description: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=DESCRIPTION, is_uri=True), + o=Value(value=self.description, is_uri=False) + )) + + if self.copyright_notice: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=COPYRIGHT_NOTICE, is_uri=True), + o=Value(value=self.copyright_notice, is_uri=False) + )) + + if self.copyright_holder: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=COPYRIGHT_HOLDER, is_uri=True), + o=Value(value=self.copyright_holder, is_uri=False) + )) + + if self.copyright_year: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=COPYRIGHT_YEAR, is_uri=True), + o=Value(value=self.copyright_year, is_uri=False) + )) + + if self.license: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=LICENSE, is_uri=True), + o=Value(value=self.license, is_uri=False) + )) + + if self.keywords: + for k in self.keywords: + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=KEYWORD, is_uri=True), + o=Value(value=k, is_uri=False) + )) + + if self.publication: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=PUBLICATION, is_uri=True), + o=Value(value=self.publication.id, is_uri=True) + )) + + self.publication.emit(emit) + + if self.url: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=URL, is_uri=True), + o=Value(value=self.url, is_uri=True) + )) + diff --git a/trustgraph-base/trustgraph/knowledge/identifier.py b/trustgraph-base/trustgraph/knowledge/identifier.py new file mode 100644 index 00000000..e0052fce --- /dev/null +++ b/trustgraph-base/trustgraph/knowledge/identifier.py @@ -0,0 +1,23 @@ + +import uuid +import hashlib + +def hash(data): + + if isinstance(data, str): + data = data.encode("utf-8") + + # Create a SHA256 hash from the data + id = hashlib.sha256(data).hexdigest() + + # Convert into a UUID, 64-byte hash becomes 32-byte UUID + id = str(uuid.UUID(id[::2])) + + return id + +def to_uri(pref, id): + return f"https://trustgraph.ai/{pref}/{id}" + +PREF_PUBEV = "pubev" +PREF_ORG = "org" +PREF_DOC = "doc" diff --git a/trustgraph-base/trustgraph/knowledge/organization.py b/trustgraph-base/trustgraph/knowledge/organization.py new file mode 100644 index 00000000..1129dd6c --- /dev/null +++ b/trustgraph-base/trustgraph/knowledge/organization.py @@ -0,0 +1,40 @@ + +from . defs import * +from .. schema import Triple, Value + +class Organization: + def __init__(self, id, name=None, description=None): + self.id = id + self.name = name + self.description = description + + def emit(self, emit): + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=IS_A, is_uri=True), + o=Value(value=ORGANIZATION, is_uri=True) + )) + + if self.name: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=LABEL, is_uri=True), + o=Value(value=self.name, is_uri=False) + )) + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=NAME, is_uri=True), + o=Value(value=self.name, is_uri=False) + )) + + if self.description: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=DESCRIPTION, is_uri=True), + o=Value(value=self.description, is_uri=False) + )) + diff --git a/trustgraph-base/trustgraph/knowledge/publication.py b/trustgraph-base/trustgraph/knowledge/publication.py new file mode 100644 index 00000000..16522282 --- /dev/null +++ b/trustgraph-base/trustgraph/knowledge/publication.py @@ -0,0 +1,70 @@ + +from . defs import * +from .. schema import Triple, Value + +class PublicationEvent: + def __init__( + self, id, organization=None, name=None, description=None, + start_date=None, end_date=None, + ): + self.id = id + self.organization = organization + self.name = name + self.description = description + self.start_date = start_date + self.end_date = end_date + + def emit(self, emit): + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=IS_A, is_uri=True), + o=Value(value=PUBLICATION_EVENT, is_uri=True))) + + if self.name: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=LABEL, is_uri=True), + o=Value(value=self.name, is_uri=False) + )) + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=NAME, is_uri=True), + o=Value(value=self.name, is_uri=False) + )) + + if self.description: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=DESCRIPTION, is_uri=True), + o=Value(value=self.description, is_uri=False) + )) + + if self.organization: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=PUBLISHED_BY, is_uri=True), + o=Value(value=self.organization.id, is_uri=True) + )) + + self.organization.emit(emit) + + if self.start_date: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=START_DATE, is_uri=True), + o=Value(value=self.start_date, is_uri=False) + )) + + if self.end_date: + + emit(Triple( + s=Value(value=self.id, is_uri=True), + p=Value(value=END_DATE, is_uri=True), + o=Value(value=self.end_date, is_uri=False))) + diff --git a/trustgraph-base/trustgraph/rdf.py b/trustgraph-base/trustgraph/rdf.py index b65d9c29..ef1da183 100644 --- a/trustgraph-base/trustgraph/rdf.py +++ b/trustgraph-base/trustgraph/rdf.py @@ -1,6 +1,7 @@ RDF_LABEL = "http://www.w3.org/2000/01/rdf-schema#label" DEFINITION = "http://www.w3.org/2004/02/skos/core#definition" +SUBJECT_OF = "https://schema.org/subjectOf" TRUSTGRAPH_ENTITIES = "http://trustgraph.ai/e/" diff --git a/trustgraph-base/trustgraph/schema/__init__.py b/trustgraph-base/trustgraph/schema/__init__.py index f3cc5b60..7f0334be 100644 --- a/trustgraph-base/trustgraph/schema/__init__.py +++ b/trustgraph-base/trustgraph/schema/__init__.py @@ -9,5 +9,3 @@ from . graph import * from . retrieval import * from . metadata import * - - diff --git a/trustgraph-base/trustgraph/schema/graph.py b/trustgraph-base/trustgraph/schema/graph.py index 107478a4..2d108a30 100644 --- a/trustgraph-base/trustgraph/schema/graph.py +++ b/trustgraph-base/trustgraph/schema/graph.py @@ -1,7 +1,7 @@ from pulsar.schema import Record, Bytes, String, Boolean, Integer, Array, Double -from . types import Error, Value +from . types import Error, Value, Triple from . topic import topic from . metadata import Metadata @@ -41,11 +41,9 @@ graph_embeddings_response_queue = topic( # Graph triples -class Triple(Record): +class Triples(Record): metadata = Metadata() - s = Value() - p = Value() - o = Value() + triples = Array(Triple()) triples_store_queue = topic('triples-store') diff --git a/trustgraph-base/trustgraph/schema/metadata.py b/trustgraph-base/trustgraph/schema/metadata.py index c7dbbae6..5922db26 100644 --- a/trustgraph-base/trustgraph/schema/metadata.py +++ b/trustgraph-base/trustgraph/schema/metadata.py @@ -1,10 +1,16 @@ -from pulsar.schema import Record, String +from pulsar.schema import Record, String, Array +from . types import Triple class Metadata(Record): - source = String() + + # Source identifier id = String() - title = String() + + # Subgraph + metadata = Array(Triple()) + + # Collection management user = String() collection = String() diff --git a/trustgraph-base/trustgraph/schema/types.py b/trustgraph-base/trustgraph/schema/types.py index 4cad70ac..b75a0884 100644 --- a/trustgraph-base/trustgraph/schema/types.py +++ b/trustgraph-base/trustgraph/schema/types.py @@ -10,6 +10,11 @@ class Value(Record): is_uri = Boolean() type = String() +class Triple(Record): + s = Value() + p = Value() + o = Value() + class Field(Record): name = String() # int, string, long, bool, float, double diff --git a/trustgraph-cli/scripts/tg-load-pdf b/trustgraph-cli/scripts/tg-load-pdf index 460a2f06..2ff718d4 100755 --- a/trustgraph-cli/scripts/tg-load-pdf +++ b/trustgraph-cli/scripts/tg-load-pdf @@ -6,14 +6,20 @@ Loads a PDF document into TrustGraph processing. import pulsar from pulsar.schema import JsonSchema -from trustgraph.schema import Document, document_ingest_queue, Metadata import base64 import hashlib import argparse import os import time +import uuid +from trustgraph.schema import Document, document_ingest_queue +from trustgraph.schema import Metadata from trustgraph.log_level import LogLevel +from trustgraph.knowledge import hash, to_uri +from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG +from trustgraph.knowledge import Organization, PublicationEvent +from trustgraph.knowledge import DigitalDocument default_user = 'trustgraph' default_collection = 'default' @@ -27,6 +33,7 @@ class Loader: user, collection, log_level, + metadata, ): self.client = pulsar.Client( @@ -42,6 +49,7 @@ class Loader: self.user = user self.collection = collection + self.metadata = metadata def load(self, files): @@ -55,13 +63,23 @@ class Loader: path = file data = open(path, "rb").read() - id = hashlib.sha256(path.encode("utf-8")).hexdigest()[0:8] + # Create a SHA256 hash from the data + id = hash(data) + + id = to_uri(PREF_DOC, id) + + triples = [] + + def emit(t): + triples.append(t) + + self.metadata.id = id + self.metadata.emit(emit) r = Document( metadata=Metadata( - source=path, - title=path, id=id, + metadata=triples, user=self.user, collection=self.collection, ), @@ -112,6 +130,54 @@ def main(): help=f'Collection ID (default: {default_collection})' ) + parser.add_argument( + '--name', help=f'Document name' + ) + + parser.add_argument( + '--description', help=f'Document description' + ) + + parser.add_argument( + '--copyright-notice', help=f'Copyright notice' + ) + + parser.add_argument( + '--copyright-holder', help=f'Copyright holder' + ) + + parser.add_argument( + '--copyright-year', help=f'Copyright year' + ) + + parser.add_argument( + '--license', help=f'Copyright license' + ) + + parser.add_argument( + '--publication-organization', help=f'Publication organization' + ) + + parser.add_argument( + '--publication-description', help=f'Publication description' + ) + + parser.add_argument( + '--publication-date', help=f'Publication date' + ) + + parser.add_argument( + '--url', help=f'Document URL' + ) + + parser.add_argument( + '--keyword', nargs='+', help=f'Keyword' + ) + + parser.add_argument( + '--identifier', '--id', help=f'Document ID' + ) + parser.add_argument( '-l', '--log-level', type=LogLevel, @@ -131,12 +197,38 @@ def main(): try: + document = DigitalDocument( + id, + name=args.name, + description=args.description, + copyright_notice=args.copyright_notice, + copyright_holder=args.copyright_holder, + copyright_year=args.copyright_year, + license=args.license, + url=args.url, + keywords=args.keyword, + ) + + if args.publication_organization: + org = Organization( + id=to_uri(PREF_ORG, hash(args.publication_organization)), + name=args.publication_organization, + ) + document.publication = PublicationEvent( + id = to_uri(PREF_PUBEV, str(uuid.uuid4())), + organization=org, + description=args.publication_description, + start_date=args.publication_date, + end_date=args.publication_date, + ) + p = Loader( pulsar_host=args.pulsar_host, output_queue=args.output_queue, user=args.user, collection=args.collection, log_level=args.log_level, + metadata=document, ) p.load(args.files) diff --git a/trustgraph-cli/scripts/tg-load-text b/trustgraph-cli/scripts/tg-load-text index e22af5b1..57e297b8 100755 --- a/trustgraph-cli/scripts/tg-load-text +++ b/trustgraph-cli/scripts/tg-load-text @@ -6,14 +6,20 @@ Loads a text document into TrustGraph processing. import pulsar from pulsar.schema import JsonSchema -from trustgraph.schema import TextDocument, text_ingest_queue, Metadata import base64 import hashlib import argparse import os import time +import uuid +from trustgraph.schema import TextDocument, text_ingest_queue +from trustgraph.schema import Metadata from trustgraph.log_level import LogLevel +from trustgraph.knowledge import hash, to_uri +from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG +from trustgraph.knowledge import Organization, PublicationEvent +from trustgraph.knowledge import DigitalDocument default_user = 'trustgraph' default_collection = 'default' @@ -27,6 +33,7 @@ class Loader: user, collection, log_level, + metadata, ): self.client = pulsar.Client( @@ -42,6 +49,7 @@ class Loader: self.user = user self.collection = collection + self.metadata = metadata def load(self, files): @@ -55,13 +63,23 @@ class Loader: path = file data = open(path, "rb").read() - id = hashlib.sha256(path.encode("utf-8")).hexdigest()[0:8] + # Create a SHA256 hash from the data + id = hash(data) + + id = to_uri(PREF_DOC, id) + + triples = [] + + def emit(t): + triples.append(t) + + self.metadata.id = id + self.metadata.emit(emit) r = TextDocument( metadata=Metadata( - source=path, - title=path, id=id, + metadata=triples, user=self.user, collection=self.collection, ), @@ -112,6 +130,54 @@ def main(): help=f'Collection ID (default: {default_collection})' ) + parser.add_argument( + '--name', help=f'Document name' + ) + + parser.add_argument( + '--description', help=f'Document description' + ) + + parser.add_argument( + '--copyright-notice', help=f'Copyright notice' + ) + + parser.add_argument( + '--copyright-holder', help=f'Copyright holder' + ) + + parser.add_argument( + '--copyright-year', help=f'Copyright year' + ) + + parser.add_argument( + '--license', help=f'Copyright license' + ) + + parser.add_argument( + '--publication-organization', help=f'Publication organization' + ) + + parser.add_argument( + '--publication-description', help=f'Publication description' + ) + + parser.add_argument( + '--publication-date', help=f'Publication date' + ) + + parser.add_argument( + '--url', help=f'Document URL' + ) + + parser.add_argument( + '--keyword', nargs='+', help=f'Keyword' + ) + + parser.add_argument( + '--identifier', '--id', help=f'Document ID' + ) + parser.add_argument( '-l', '--log-level', type=LogLevel, @@ -131,12 +197,38 @@ def main(): try: + document = DigitalDocument( + id, + name=args.name, + description=args.description, + copyright_notice=args.copyright_notice, + copyright_holder=args.copyright_holder, + copyright_year=args.copyright_year, + license=args.license, + url=args.url, + keywords=args.keyword, + ) + + if args.publication_organization: + org = Organization( + id=to_uri(PREF_ORG, hash(args.publication_organization)), + name=args.publication_organization, + ) + document.publication = PublicationEvent( + id = to_uri(PREF_PUBEV, str(uuid.uuid4())), + organization=org, + description=args.publication_description, + start_date=args.publication_date, + end_date=args.publication_date, + ) + p = Loader( pulsar_host=args.pulsar_host, output_queue=args.output_queue, user=args.user, collection=args.collection, log_level=args.log_level, + metadata=document, ) p.load(args.files) diff --git a/trustgraph-flow/trustgraph/chunking/recursive/chunker.py b/trustgraph-flow/trustgraph/chunking/recursive/chunker.py index bec854bb..694ced70 100755 --- a/trustgraph-flow/trustgraph/chunking/recursive/chunker.py +++ b/trustgraph-flow/trustgraph/chunking/recursive/chunker.py @@ -63,16 +63,8 @@ class Processor(ConsumerProducer): for ix, chunk in enumerate(texts): - id = v.metadata.id + "-c" + str(ix) - r = Chunk( - metadata=Metadata( - source=v.metadata.source, - id=id, - title=v.metadata.title, - user=v.metadata.user, - collection=v.metadata.collection, - ), + metadata=v.metadata, chunk=chunk.page_content.encode("utf-8"), ) diff --git a/trustgraph-flow/trustgraph/chunking/token/chunker.py b/trustgraph-flow/trustgraph/chunking/token/chunker.py index e7bc5667..dccd9c89 100755 --- a/trustgraph-flow/trustgraph/chunking/token/chunker.py +++ b/trustgraph-flow/trustgraph/chunking/token/chunker.py @@ -62,16 +62,8 @@ class Processor(ConsumerProducer): for ix, chunk in enumerate(texts): - id = v.metadata.id + "-c" + str(ix) - r = Chunk( - metadata=Metadata( - source=v.metadata.source, - id=id, - title=v.metadata.title, - user=v.metadata.user, - collection=v.metadata.collection, - ), + metadata=v.metadata, chunk=chunk.page_content.encode("utf-8"), ) diff --git a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py index 7e9aeafa..38ac9257 100755 --- a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py +++ b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py @@ -59,15 +59,8 @@ class Processor(ConsumerProducer): for ix, page in enumerate(pages): - id = v.metadata.id + "-p" + str(ix) r = TextDocument( - metadata=Metadata( - source=v.metadata.source, - title=v.metadata.title, - id=id, - user=v.metadata.user, - collection=v.metadata.collection, - ), + metadata=v.metadata, text=page.page_content.encode("utf-8"), ) diff --git a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py index d528b74d..eed34574 100755 --- a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py @@ -7,16 +7,18 @@ get entity definitions which are output as graph edges. import urllib.parse import json -from .... schema import ChunkEmbeddings, Triple, Metadata, Value +from .... schema import ChunkEmbeddings, Triple, Triples, Metadata, Value from .... schema import chunk_embeddings_ingest_queue, triples_store_queue from .... schema import prompt_request_queue from .... schema import prompt_response_queue from .... log_level import LogLevel from .... clients.prompt_client import PromptClient -from .... rdf import TRUSTGRAPH_ENTITIES, DEFINITION +from .... rdf import TRUSTGRAPH_ENTITIES, DEFINITION, RDF_LABEL, SUBJECT_OF from .... base import ConsumerProducer DEFINITION_VALUE = Value(value=DEFINITION, is_uri=True) +RDF_LABEL_VALUE = Value(value=RDF_LABEL, is_uri=True) +SUBJECT_OF_VALUE = Value(value=SUBJECT_OF, is_uri=True) module = ".".join(__name__.split(".")[1:-1]) @@ -44,7 +46,7 @@ class Processor(ConsumerProducer): "output_queue": output_queue, "subscriber": subscriber, "input_schema": ChunkEmbeddings, - "output_schema": Triple, + "output_schema": Triples, "prompt_request_queue": pr_request_queue, "prompt_response_queue": pr_response_queue, } @@ -69,9 +71,12 @@ class Processor(ConsumerProducer): return self.prompt.request_definitions(chunk) - def emit_edge(self, metadata, s, p, o): + def emit_edges(self, metadata, triples): - t = Triple(metadata=metadata, s=s, p=p, o=o) + t = Triples( + metadata=metadata, + triples=triples, + ) self.producer.send(t) def handle(self, msg): @@ -85,6 +90,13 @@ class Processor(ConsumerProducer): defs = self.get_definitions(chunk) + triples = [] + + # FIXME: Putting metadata into triples store is duplicated in + # relationships extractor too + for t in v.metadata.metadata: + triples.append(t) + for defn in defs: s = defn.name @@ -101,7 +113,31 @@ class Processor(ConsumerProducer): s_value = Value(value=str(s_uri), is_uri=True) o_value = Value(value=str(o), is_uri=False) - self.emit_edge(v.metadata, s_value, DEFINITION_VALUE, o_value) + triples.append(Triple( + s=s_value, + p=RDF_LABEL_VALUE, + o=Value(value=s, is_uri=False), + )) + + triples.append(Triple( + s=s_value, p=DEFINITION_VALUE, o=o_value + )) + + triples.append(Triple( + s=s_value, + p=SUBJECT_OF_VALUE, + o=Value(value=v.metadata.id, is_uri=True) + )) + + self.emit_edges( + Metadata( + id=v.metadata.id, + metadata=[], + user=v.metadata.user, + collection=v.metadata.collection, + ), + triples + ) except Exception as e: print("Exception: ", e, flush=True) diff --git a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py index cb80e47f..4daf6859 100755 --- a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py @@ -9,7 +9,7 @@ import urllib.parse import os from pulsar.schema import JsonSchema -from .... schema import ChunkEmbeddings, Triple, GraphEmbeddings +from .... schema import ChunkEmbeddings, Triple, Triples, GraphEmbeddings from .... schema import Metadata, Value from .... schema import chunk_embeddings_ingest_queue, triples_store_queue from .... schema import graph_embeddings_store_queue @@ -17,10 +17,11 @@ from .... schema import prompt_request_queue from .... schema import prompt_response_queue from .... log_level import LogLevel from .... clients.prompt_client import PromptClient -from .... rdf import RDF_LABEL, TRUSTGRAPH_ENTITIES +from .... rdf import RDF_LABEL, TRUSTGRAPH_ENTITIES, SUBJECT_OF from .... base import ConsumerProducer RDF_LABEL_VALUE = Value(value=RDF_LABEL, is_uri=True) +SUBJECT_OF_VALUE = Value(value=SUBJECT_OF, is_uri=True) module = ".".join(__name__.split(".")[1:-1]) @@ -50,7 +51,7 @@ class Processor(ConsumerProducer): "output_queue": output_queue, "subscriber": subscriber, "input_schema": ChunkEmbeddings, - "output_schema": Triple, + "output_schema": Triples, "prompt_request_queue": pr_request_queue, "prompt_response_queue": pr_response_queue, } @@ -69,7 +70,7 @@ class Processor(ConsumerProducer): "prompt_response_queue": pr_response_queue, "subscriber": subscriber, "input_schema": ChunkEmbeddings.__name__, - "output_schema": Triple.__name__, + "output_schema": Triples.__name__, "vector_schema": GraphEmbeddings.__name__, }) @@ -92,9 +93,12 @@ class Processor(ConsumerProducer): return self.prompt.request_relationships(chunk) - def emit_edge(self, metadata, s, p, o): + def emit_edges(self, metadata, triples): - t = Triple(metadata=metadata, s=s, p=p, o=o) + t = Triples( + metadata=metadata, + triples=triples, + ) self.producer.send(t) def emit_vec(self, metadata, ent, vec): @@ -113,6 +117,13 @@ class Processor(ConsumerProducer): rels = self.get_relationships(chunk) + triples = [] + + # FIXME: Putting metadata into triples store is duplicated in + # relationships extractor too + for t in v.metadata.metadata: + triples.append(t) + for rel in rels: s = rel.s @@ -139,43 +150,65 @@ class Processor(ConsumerProducer): else: o_value = Value(value=str(o), is_uri=False) - self.emit_edge( - v.metadata, - s_value, - p_value, - o_value - ) + triples.append(Triple( + s=s_value, + p=p_value, + o=o_value + )) # Label for s - self.emit_edge( - v.metadata, - s_value, - RDF_LABEL_VALUE, - Value(value=str(s), is_uri=False) - ) + triples.append(Triple( + s=s_value, + p=RDF_LABEL_VALUE, + o=Value(value=str(s), is_uri=False) + )) # Label for p - self.emit_edge( - v.metadata, - p_value, - RDF_LABEL_VALUE, - Value(value=str(p), is_uri=False) - ) + triples.append(Triple( + s=p_value, + p=RDF_LABEL_VALUE, + o=Value(value=str(p), is_uri=False) + )) if rel.o_entity: # Label for o - self.emit_edge( - v.metadata, - o_value, - RDF_LABEL_VALUE, - Value(value=str(o), is_uri=False) - ) + triples.append(Triple( + s=o_value, + p=RDF_LABEL_VALUE, + o=Value(value=str(o), is_uri=False) + )) + + # 'Subject of' for s + triples.append(Triple( + s=s_value, + p=SUBJECT_OF_VALUE, + o=Value(value=v.metadata.id, is_uri=True) + )) + + if rel.o_entity: + # 'Subject of' for o + triples.append(Triple( + s=o_value, + p=RDF_LABEL_VALUE, + o=Value(value=v.metadata.id, is_uri=True) + )) self.emit_vec(v.metadata, s_value, v.vectors) self.emit_vec(v.metadata, p_value, v.vectors) + if rel.o_entity: self.emit_vec(v.metadata, o_value, v.vectors) + self.emit_edges( + Metadata( + id=v.metadata.id, + metadata=[], + user=v.metadata.user, + collection=v.metadata.collection, + ), + triples + ) + except Exception as e: print("Exception: ", e, flush=True) diff --git a/trustgraph-flow/trustgraph/extract/kg/topics/extract.py b/trustgraph-flow/trustgraph/extract/kg/topics/extract.py index 81e52669..8dfc3e6e 100755 --- a/trustgraph-flow/trustgraph/extract/kg/topics/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/topics/extract.py @@ -7,7 +7,7 @@ get entity definitions which are output as graph edges. import urllib.parse import json -from .... schema import ChunkEmbeddings, Triple, Metadata, Value +from .... schema import ChunkEmbeddings, Triple, Triples, Metadata, Value from .... schema import chunk_embeddings_ingest_queue, triples_store_queue from .... schema import prompt_request_queue from .... schema import prompt_response_queue @@ -44,7 +44,7 @@ class Processor(ConsumerProducer): "output_queue": output_queue, "subscriber": subscriber, "input_schema": ChunkEmbeddings, - "output_schema": Triple, + "output_schema": Triples, "prompt_request_queue": pr_request_queue, "prompt_response_queue": pr_response_queue, } @@ -71,7 +71,10 @@ class Processor(ConsumerProducer): def emit_edge(self, metadata, s, p, o): - t = Triple(metadata=metadata, s=s, p=p, o=o) + t = Triples( + metadata=metadata, + triples=[Triple(s=s, p=p, o=o)], + ) self.producer.send(t) def handle(self, msg): diff --git a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py index e8074c80..6825d452 100755 --- a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py @@ -10,7 +10,7 @@ import argparse import time from .... direct.cassandra import TrustGraph -from .... schema import Triple +from .... schema import Triple, Triples from .... schema import triples_store_queue from .... log_level import LogLevel from .... base import Consumer @@ -33,7 +33,7 @@ class Processor(Consumer): **params | { "input_queue": input_queue, "subscriber": subscriber, - "input_schema": Triple, + "input_schema": Triples, "graph_host": graph_host, } ) @@ -62,12 +62,13 @@ class Processor(Consumer): raise e self.table = table - - self.tg.insert( - v.s.value, - v.p.value, - v.o.value - ) + + for t in v.triples: + self.tg.insert( + t.s.value, + t.p.value, + t.o.value + ) @staticmethod def add_args(parser): diff --git a/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py b/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py index d20ecdd2..c346e5c4 100755 --- a/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py @@ -116,14 +116,16 @@ class Processor(Consumer): v = msg.value() - self.create_node(v.s.value) + for t in v.triples: - if v.o.is_uri: - self.create_node(v.o.value) - self.relate_node(v.s.value, v.p.value, v.o.value) - else: - self.create_literal(v.o.value) - self.relate_literal(v.s.value, v.p.value, v.o.value) + self.create_node(t.s.value) + + if v.o.is_uri: + self.create_node(t.o.value) + self.relate_node(t.s.value, t.p.value, t.o.value) + else: + self.create_literal(t.o.value) + self.relate_literal(t.s.value, t.p.value, t.o.value) @staticmethod def add_args(parser):