mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-06-10 23:35:14 +02:00
Merge pull request #229 from trustgraph-ai/release/v0.18
Release/v0.18 onto master
This commit is contained in:
commit
78822a6692
11 changed files with 505 additions and 118 deletions
125
templates/README.md
Normal file
125
templates/README.md
Normal file
|
|
@ -0,0 +1,125 @@
|
|||
|
||||
# TrustGraph template generation
|
||||
|
||||
There are two utilities here:
|
||||
|
||||
- `generate`: Generates a single Docker Compose launch configuration
|
||||
based on configuration you provide.
|
||||
- `generate-all`: Generates the release bundle for releases. You won't
|
||||
need to use this unless you are managing releases.
|
||||
|
||||
## `generate-all`
|
||||
|
||||
Previously, this generates a full set of all vector DB / triple store / LLM
|
||||
combinations, and put them in a single ZIP file. But this got out of
|
||||
hand, so at the time of writing, this generates a single configuraton
|
||||
using Qdrant vector DB, Ollama LLM support and Cassandra for a triple store.
|
||||
|
||||
The combinations are contained withing the code, it takes two arguments:
|
||||
- output ZIP file (is over-written)
|
||||
- TrustGraph version number
|
||||
|
||||
```
|
||||
templates/generate-all output.zip 0.18.11
|
||||
```
|
||||
|
||||
## `generate`
|
||||
|
||||
This utility takes a configuration file describing the components to bundle,
|
||||
and outputs a Docker Compose YAML file.
|
||||
|
||||
### Input configuration
|
||||
|
||||
The input configuration is a JSON file, an array of components to pull into
|
||||
the configuration. For each component, there is a name and a (possibly empty)
|
||||
object describing addtional parameters for that component.
|
||||
|
||||
Example:
|
||||
|
||||
```
|
||||
[
|
||||
{
|
||||
"name": "cassandra",
|
||||
"parameters": {}
|
||||
},
|
||||
{
|
||||
"name": "pulsar",
|
||||
"parameters": {}
|
||||
},
|
||||
{
|
||||
"name": "qdrant",
|
||||
"parameters": {}
|
||||
},
|
||||
{
|
||||
"name": "embeddings-hf",
|
||||
"parameters": {}
|
||||
},
|
||||
{
|
||||
"name": "graph-rag",
|
||||
"parameters": {}
|
||||
},
|
||||
{
|
||||
"name": "grafana",
|
||||
"parameters": {}
|
||||
},
|
||||
{
|
||||
"name": "trustgraph",
|
||||
"parameters": {}
|
||||
},
|
||||
{
|
||||
"name": "googleaistudio",
|
||||
"parameters": {
|
||||
"googleaistudio-temperature": 0.3,
|
||||
"googleaistudio-max-output-tokens": 2048,
|
||||
"googleaistudio-model": "gemini-1.5-pro-002"
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "prompt-template",
|
||||
"parameters": {}
|
||||
},
|
||||
{
|
||||
"name": "override-recursive-chunker",
|
||||
"parameters": {
|
||||
"chunk-size": 1000,
|
||||
"chunk-overlap": 50
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "workbench-ui",
|
||||
"parameters": {}
|
||||
},
|
||||
{
|
||||
"name": "agent-manager-react",
|
||||
"parameters": {}
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
If you want to make your own configuration you could try changing the
|
||||
configuration above:
|
||||
- Components which are essential: pulsar, trustgraph, graph-rag, grafana,
|
||||
agent-manager-react
|
||||
- You need a triple store, one of: cassandra, memgraph, falkordb, neo4j
|
||||
- You need a vector store, one of: qdrant, pinecone
|
||||
- You need an LLM, one of: azure, azure-openai, bedrock, claude, cohere,
|
||||
llamafile, ollama, openai, vertexai.
|
||||
- You need an embeddings implementation, one of: embeddings-hf,
|
||||
embeddings-ollama
|
||||
- Optionally add the Workbench tool: workbench-ui
|
||||
|
||||
Components have over-ridable parameters, look in the component definition
|
||||
in `templates/components/` to see what you can override.
|
||||
|
||||
### Invocation
|
||||
|
||||
Two parameters:
|
||||
- The output ZIP file
|
||||
- The version number
|
||||
|
||||
The configuration file described above is provided on standard input
|
||||
|
||||
```
|
||||
templates/generate out.zip 0.18.9 < config.json
|
||||
```
|
||||
|
||||
|
|
@ -81,7 +81,7 @@ def main():
|
|||
print("Usage:")
|
||||
print(" generate <outfile> <version> < input.json")
|
||||
print()
|
||||
raise RuntimeError("Arg error")
|
||||
sys.exit(1)
|
||||
|
||||
outfile = sys.argv[1]
|
||||
version = sys.argv[2]
|
||||
|
|
|
|||
|
|
@ -88,7 +88,7 @@ def full_config_object(
|
|||
|
||||
return config_object([
|
||||
graph_store, "pulsar", vector_store, embeddings,
|
||||
"graph-rag", "grafana", "trustgraph", llm
|
||||
"graph-rag", "grafana", "trustgraph", llm, "workbench-ui",
|
||||
])
|
||||
|
||||
def generate_config(
|
||||
|
|
@ -119,13 +119,19 @@ def generate_config(
|
|||
def generate_all(output, version):
|
||||
|
||||
for platform in [
|
||||
"docker-compose", "minikube-k8s", "gcp-k8s"
|
||||
"docker-compose",
|
||||
# "minikube-k8s", "gcp-k8s"
|
||||
]:
|
||||
for model in [
|
||||
"azure", "azure-openai", "bedrock", "claude", "cohere",
|
||||
"googleaistudio", "llamafile", "ollama", "openai", "vertexai",
|
||||
# "azure", "azure-openai", "bedrock", "claude", "cohere",
|
||||
# "googleaistudio", "llamafile",
|
||||
"ollama",
|
||||
# "openai", "vertexai",
|
||||
]:
|
||||
for graph in [ "cassandra", "neo4j", "falkordb" ]:
|
||||
for graph in [
|
||||
"cassandra",
|
||||
# "neo4j", "falkordb"
|
||||
]:
|
||||
|
||||
y = generate_config(
|
||||
llm=model, graph_store=graph, platform=platform,
|
||||
|
|
|
|||
|
|
@ -1,3 +1,15 @@
|
|||
|
||||
Note! this is a subset of possible configurations, to generate your own
|
||||
launch config use the config util...
|
||||
|
||||
- Production: https://config-ui.demo.trustgraph.ai
|
||||
- Early release: https://dev.config-ui.demo.trustgraph.ai
|
||||
|
||||
The config util auto-generates deployment instructions for your
|
||||
configuration, so that's the recommended way to deploy.
|
||||
|
||||
----------------------------------------------------------------------------
|
||||
|
||||
These are launch configurations for TrustGraph. See https://trustgraph.ai for
|
||||
the quickstart using docker compose.
|
||||
|
||||
|
|
|
|||
103
test-api/test-load-document
Executable file
103
test-api/test-load-document
Executable file
|
|
@ -0,0 +1,103 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import requests
|
||||
import json
|
||||
import sys
|
||||
import base64
|
||||
|
||||
url = "http://localhost:8088/api/v1/"
|
||||
|
||||
############################################################################
|
||||
|
||||
text = open("../sources/Challenger-Report-Vol1.pdf", "rb").read()
|
||||
|
||||
# Some random identifiers. The doc ID is important, as extracted knowledge
|
||||
# is linked back to this identifier
|
||||
org_id = "https://trustgraph.ai/org/1dd51ece-8bd3-48b8-98ce-1ac9164c5214"
|
||||
doc_id = "https://trustgraph.ai/doc/72ef3374-af7a-40c4-8c7b-45050aef5b90"
|
||||
pub_id = "https://trustgraph.ai/pubev/59012ae1-65d4-441f-8288-b6f3c6c15333"
|
||||
|
||||
# Organization metadata
|
||||
org_facts = [
|
||||
[org_id, "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
|
||||
"https://schema.org/Organization"],
|
||||
[org_id, "http://www.w3.org/2000/01/rdf-schema#label", "NASA"],
|
||||
[org_id, "https://schema.org/name", "NASA"]
|
||||
]
|
||||
|
||||
# Publication metadata. Note how it links to the Organization
|
||||
pub_facts = [
|
||||
[pub_id, "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
|
||||
"https://schema.org/PublicationEvent"],
|
||||
[pub_id, "https://schema.org/description", "Uploading to Github"],
|
||||
[pub_id, "https://schema.org/endDate", "1986-06-06"],
|
||||
[pub_id, "https://schema.org/publishedBy", org_id],
|
||||
[pub_id, "https://schema.org/startDate", "1986-06-06"]
|
||||
]
|
||||
|
||||
# Document metadata. Note how it links to the publication event
|
||||
doc_facts = [
|
||||
[doc_id, "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
|
||||
"https://schema.org/DigitalDocument"],
|
||||
[doc_id, "http://www.w3.org/2000/01/rdf-schema#label",
|
||||
"Challenger Report Volume 1"],
|
||||
[doc_id, "https://schema.org/copyrightHolder", "US Government"],
|
||||
[doc_id, "https://schema.org/copyrightNotice",
|
||||
"Work of the US Gov. Public Use Permitted"],
|
||||
[doc_id, "https://schema.org/copyrightYear", "1986"],
|
||||
[doc_id, "https://schema.org/description",
|
||||
"The findings of the Presidential Commission regarding the circumstances surrounding the Challenger accident are reported and recommendations for corrective action are outlined"
|
||||
],
|
||||
[doc_id, "https://schema.org/keywords", "nasa"],
|
||||
[doc_id, "https://schema.org/keywords", "challenger"],
|
||||
[doc_id, "https://schema.org/keywords", "space-shuttle"],
|
||||
[doc_id, "https://schema.org/keywords", "shuttle"],
|
||||
[doc_id, "https://schema.org/keywords", "orbiter"],
|
||||
[doc_id, "https://schema.org/name", "Challenger Report Volume 1"],
|
||||
[doc_id, "https://schema.org/publication", pub_id],
|
||||
[doc_id, "https://schema.org/url",
|
||||
"https://ntrs.nasa.gov/citations/19860015255"]
|
||||
]
|
||||
|
||||
def to_value(x):
|
||||
if x.startswith("https://"):
|
||||
return { "v": x, "e": True }
|
||||
if x.startswith("http://"):
|
||||
return { "v": x, "e": True }
|
||||
return { "v": x, "e": False }
|
||||
|
||||
# Convert the above metadata into the right form
|
||||
metadata = [
|
||||
{ "s": to_value(t[0]), "p": to_value(t[1]), "o": to_value(t[2]) }
|
||||
for t in org_facts + pub_facts + doc_facts
|
||||
]
|
||||
|
||||
input = {
|
||||
|
||||
# Document identifer. Knowledge derived by TrustGraph is linked to this
|
||||
# identifier, so the additional metadata specified above is linked to the
|
||||
# derived knowledge and users of the knowledge graph could see
|
||||
# information about the source of knowledge
|
||||
"id": doc_id,
|
||||
|
||||
# Additional metadata in the form of RDF triples
|
||||
"metadata": metadata,
|
||||
|
||||
# The PDF document, is presented as a base64 encoded document.
|
||||
"data": base64.b64encode(text).decode("utf-8")
|
||||
|
||||
}
|
||||
|
||||
resp = requests.post(
|
||||
f"{url}load/document",
|
||||
json=input,
|
||||
)
|
||||
|
||||
resp = resp.json()
|
||||
|
||||
if "error" in resp:
|
||||
print(f"Error: {resp['error']}")
|
||||
sys.exit(1)
|
||||
|
||||
print(resp)
|
||||
|
||||
100
test-api/test-load-text
Executable file
100
test-api/test-load-text
Executable file
|
|
@ -0,0 +1,100 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import requests
|
||||
import json
|
||||
import sys
|
||||
import base64
|
||||
|
||||
url = "http://localhost:8088/api/v1/"
|
||||
|
||||
############################################################################
|
||||
|
||||
text = open("docs/README.cats", "rb").read()
|
||||
|
||||
# Some random identifiers. The doc ID is important, as extracted knowledge
|
||||
# is linked back to this identifier
|
||||
org_id = "https://trustgraph.ai/org/3c35111a-f8ce-54b2-4dd6-c673f8bf0d09"
|
||||
doc_id = "https://trustgraph.ai/doc/4faa45c1-f91a-a96a-d44f-2e57b9813db8"
|
||||
pub_id = "https://trustgraph.ai/pubev/a847d950-a281-4099-aaab-c5e35333ff61"
|
||||
|
||||
# Organization metadata
|
||||
org_facts = [
|
||||
[org_id, "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
|
||||
"https://schema.org/Organization"],
|
||||
[org_id, "http://www.w3.org/2000/01/rdf-schema#label", "trustgraph.ai"],
|
||||
[org_id, "https://schema.org/name", "trustgraph.ai"]
|
||||
]
|
||||
|
||||
# Publication metadata. Note how it links to the Organization
|
||||
pub_facts = [
|
||||
[pub_id, "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
|
||||
"https://schema.org/PublicationEvent"],
|
||||
[pub_id, "https://schema.org/description", "Uploading to Github"],
|
||||
[pub_id, "https://schema.org/endDate", "2024-10-23"],
|
||||
[pub_id, "https://schema.org/publishedBy", org_id],
|
||||
[pub_id, "https://schema.org/startDate", "2024-10-23"]
|
||||
]
|
||||
|
||||
# Document metadata. Note how it links to the publication event
|
||||
doc_facts = [
|
||||
[doc_id, "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
|
||||
"https://schema.org/DigitalDocument"],
|
||||
[doc_id, "http://www.w3.org/2000/01/rdf-schema#label", "Mark's cats"],
|
||||
[doc_id, "https://schema.org/copyrightHolder", "trustgraph.ai"],
|
||||
[doc_id, "https://schema.org/copyrightNotice", "Public domain"],
|
||||
[doc_id, "https://schema.org/copyrightYear", "2024"],
|
||||
[doc_id, "https://schema.org/description",
|
||||
"This document describes Mark's cats"],
|
||||
[doc_id, "https://schema.org/keywords", "animals"],
|
||||
[doc_id, "https://schema.org/keywords", "cats"],
|
||||
[doc_id, "https://schema.org/keywords", "home-life"],
|
||||
[doc_id, "https://schema.org/name", "Mark's cats"],
|
||||
[doc_id, "https://schema.org/publication", pub_id],
|
||||
[doc_id, "https://schema.org/url", "https://example.com"]
|
||||
]
|
||||
|
||||
def to_value(x):
|
||||
if x.startswith("https://"):
|
||||
return { "v": x, "e": True }
|
||||
if x.startswith("http://"):
|
||||
return { "v": x, "e": True }
|
||||
return { "v": x, "e": False }
|
||||
|
||||
# Convert the above metadata into the right form
|
||||
metadata = [
|
||||
{ "s": to_value(t[0]), "p": to_value(t[1]), "o": to_value(t[2]) }
|
||||
for t in org_facts + pub_facts + doc_facts
|
||||
]
|
||||
|
||||
input = {
|
||||
|
||||
# Document identifer. Knowledge derived by TrustGraph is linked to this
|
||||
# identifier, so the additional metadata specified above is linked to the
|
||||
# derived knowledge and users of the knowledge graph could see
|
||||
# information about the source of knowledge
|
||||
"id": doc_id,
|
||||
|
||||
# Additional metadata in the form of RDF triples
|
||||
"metadata": metadata,
|
||||
|
||||
# Text character set. Default is UTF-8
|
||||
"charset": "utf-8",
|
||||
|
||||
# The PDF document, is presented as a base64 encoded document.
|
||||
"text": base64.b64encode(text).decode("utf-8")
|
||||
|
||||
}
|
||||
|
||||
resp = requests.post(
|
||||
f"{url}load/text",
|
||||
json=input,
|
||||
)
|
||||
|
||||
resp = resp.json()
|
||||
|
||||
if "error" in resp:
|
||||
print(f"Error: {resp['error']}")
|
||||
sys.exit(1)
|
||||
|
||||
print(resp)
|
||||
|
||||
42
trustgraph-flow/trustgraph/gateway/document_load.py
Normal file
42
trustgraph-flow/trustgraph/gateway/document_load.py
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
|
||||
import base64
|
||||
|
||||
from .. schema import Document
|
||||
from .. schema import document_ingest_queue
|
||||
|
||||
from . sender import ServiceSender
|
||||
from . serialize import to_subgraph
|
||||
|
||||
class DocumentLoadSender(ServiceSender):
|
||||
def __init__(self, pulsar_host):
|
||||
|
||||
super(DocumentLoadSender, self).__init__(
|
||||
pulsar_host=pulsar_host,
|
||||
request_queue=document_ingest_queue,
|
||||
request_schema=Document,
|
||||
)
|
||||
|
||||
def to_request(self, body):
|
||||
|
||||
if "metadata" in data:
|
||||
metadata = to_subgraph(data["metadata"])
|
||||
else:
|
||||
metadata = []
|
||||
|
||||
# Doing a base64 decoe/encode here to make sure the
|
||||
# content is valid base64
|
||||
doc = base64.b64decode(data["data"])
|
||||
|
||||
print("Document received")
|
||||
|
||||
return Document(
|
||||
metadata=Metadata(
|
||||
id=data.get("id"),
|
||||
metadata=metadata,
|
||||
user=data.get("user", "trustgraph"),
|
||||
collection=data.get("collection", "default"),
|
||||
),
|
||||
data=base64.b64encode(doc).decode("utf-8")
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -26,4 +26,3 @@ class EmbeddingsRequestor(ServiceRequestor):
|
|||
def from_response(self, message):
|
||||
return { "vectors": message.vectors }, True
|
||||
|
||||
|
||||
|
|
|
|||
50
trustgraph-flow/trustgraph/gateway/sender.py
Normal file
50
trustgraph-flow/trustgraph/gateway/sender.py
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
|
||||
# Like ServiceRequestor, but just fire-and-forget instead of request/response
|
||||
|
||||
import asyncio
|
||||
from pulsar.schema import JsonSchema
|
||||
import uuid
|
||||
import logging
|
||||
|
||||
from . publisher import Publisher
|
||||
|
||||
logger = logging.getLogger("sender")
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
class ServiceSender:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
pulsar_host,
|
||||
request_queue, request_schema,
|
||||
):
|
||||
|
||||
self.pub = Publisher(
|
||||
pulsar_host, request_queue,
|
||||
schema=JsonSchema(request_schema)
|
||||
)
|
||||
|
||||
async def start(self):
|
||||
|
||||
self.pub.start()
|
||||
|
||||
def to_request(self, request):
|
||||
raise RuntimeError("Not defined")
|
||||
|
||||
async def process(self, request, responder=None):
|
||||
|
||||
try:
|
||||
|
||||
await asyncio.to_thread(
|
||||
self.pub.send, None, self.to_request(request)
|
||||
)
|
||||
|
||||
if responder:
|
||||
await responder({}, True)
|
||||
|
||||
except Exception as e:
|
||||
|
||||
logging.error(f"Exception: {e}")
|
||||
|
||||
return { "error": str(e) }
|
||||
|
||||
|
|
@ -24,9 +24,6 @@ from prometheus_client import start_http_server
|
|||
|
||||
from .. log_level import LogLevel
|
||||
|
||||
from .. schema import Metadata, Document, TextDocument
|
||||
from .. schema import document_ingest_queue, text_ingest_queue
|
||||
|
||||
from . serialize import to_subgraph
|
||||
from . running import Running
|
||||
from . publisher import Publisher
|
||||
|
|
@ -46,6 +43,8 @@ from . graph_embeddings_stream import GraphEmbeddingsStreamEndpoint
|
|||
from . triples_load import TriplesLoadEndpoint
|
||||
from . graph_embeddings_load import GraphEmbeddingsLoadEndpoint
|
||||
from . mux import MuxEndpoint
|
||||
from . document_load import DocumentLoadSender
|
||||
from . text_load import TextLoadSender
|
||||
|
||||
from . endpoint import ServiceEndpoint
|
||||
from . auth import Authenticator
|
||||
|
|
@ -120,6 +119,12 @@ class Api:
|
|||
pulsar_host=self.pulsar_host, timeout=self.timeout,
|
||||
auth = self.auth,
|
||||
),
|
||||
"document-load": DocumentLoadSender(
|
||||
pulsar_host=self.pulsar_host,
|
||||
),
|
||||
"text-load": TextLoadSender(
|
||||
pulsar_host=self.pulsar_host,
|
||||
),
|
||||
}
|
||||
|
||||
self.endpoints = [
|
||||
|
|
@ -164,6 +169,14 @@ class Api:
|
|||
endpoint_path = "/api/v1/internet-search", auth=self.auth,
|
||||
requestor = self.services["internet-search"],
|
||||
),
|
||||
ServiceEndpoint(
|
||||
endpoint_path = "/api/v1/load/document", auth=self.auth,
|
||||
requestor = self.services["document-load"],
|
||||
),
|
||||
ServiceEndpoint(
|
||||
endpoint_path = "/api/v1/load/text", auth=self.auth,
|
||||
requestor = self.services["text-load"],
|
||||
),
|
||||
TriplesStreamEndpoint(
|
||||
pulsar_host=self.pulsar_host,
|
||||
auth = self.auth,
|
||||
|
|
@ -187,122 +200,14 @@ class Api:
|
|||
),
|
||||
]
|
||||
|
||||
self.document_out = Publisher(
|
||||
self.pulsar_host, document_ingest_queue,
|
||||
schema=JsonSchema(Document),
|
||||
chunking_enabled=True,
|
||||
)
|
||||
|
||||
self.text_out = Publisher(
|
||||
self.pulsar_host, text_ingest_queue,
|
||||
schema=JsonSchema(TextDocument),
|
||||
chunking_enabled=True,
|
||||
)
|
||||
|
||||
for ep in self.endpoints:
|
||||
ep.add_routes(self.app)
|
||||
|
||||
self.app.add_routes([
|
||||
web.post("/api/v1/load/document", self.load_document),
|
||||
web.post("/api/v1/load/text", self.load_text),
|
||||
])
|
||||
|
||||
async def load_document(self, request):
|
||||
|
||||
try:
|
||||
|
||||
data = await request.json()
|
||||
|
||||
if "metadata" in data:
|
||||
metadata = to_subgraph(data["metadata"])
|
||||
else:
|
||||
metadata = []
|
||||
|
||||
# Doing a base64 decoe/encode here to make sure the
|
||||
# content is valid base64
|
||||
doc = base64.b64decode(data["data"])
|
||||
|
||||
resp = await asyncio.to_thread(
|
||||
self.document_out.send,
|
||||
None,
|
||||
Document(
|
||||
metadata=Metadata(
|
||||
id=data.get("id"),
|
||||
metadata=metadata,
|
||||
user=data.get("user", "trustgraph"),
|
||||
collection=data.get("collection", "default"),
|
||||
),
|
||||
data=base64.b64encode(doc).decode("utf-8")
|
||||
)
|
||||
)
|
||||
|
||||
print("Document loaded.")
|
||||
|
||||
return web.json_response(
|
||||
{ }
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Exception: {e}")
|
||||
|
||||
return web.json_response(
|
||||
{ "error": str(e) }
|
||||
)
|
||||
|
||||
async def load_text(self, request):
|
||||
|
||||
try:
|
||||
|
||||
data = await request.json()
|
||||
|
||||
if "metadata" in data:
|
||||
metadata = to_subgraph(data["metadata"])
|
||||
else:
|
||||
metadata = []
|
||||
|
||||
if "charset" in data:
|
||||
charset = data["charset"]
|
||||
else:
|
||||
charset = "utf-8"
|
||||
|
||||
# Text is base64 encoded
|
||||
text = base64.b64decode(data["text"]).decode(charset)
|
||||
|
||||
resp = await asyncio.to_thread(
|
||||
self.text_out.send,
|
||||
None,
|
||||
TextDocument(
|
||||
metadata=Metadata(
|
||||
id=data.get("id"),
|
||||
metadata=metadata,
|
||||
user=data.get("user", "trustgraph"),
|
||||
collection=data.get("collection", "default"),
|
||||
),
|
||||
text=text,
|
||||
)
|
||||
)
|
||||
|
||||
print("Text document loaded.")
|
||||
|
||||
return web.json_response(
|
||||
{ }
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Exception: {e}")
|
||||
|
||||
return web.json_response(
|
||||
{ "error": str(e) }
|
||||
)
|
||||
|
||||
async def app_factory(self):
|
||||
|
||||
for ep in self.endpoints:
|
||||
await ep.start()
|
||||
|
||||
self.document_out.start()
|
||||
self.text_out.start()
|
||||
|
||||
return self.app
|
||||
|
||||
def run(self):
|
||||
|
|
|
|||
45
trustgraph-flow/trustgraph/gateway/text_load.py
Normal file
45
trustgraph-flow/trustgraph/gateway/text_load.py
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
|
||||
import base64
|
||||
|
||||
from .. schema import TextDocument, Metadata
|
||||
from .. schema import text_ingest_queue
|
||||
|
||||
from . sender import ServiceSender
|
||||
from . serialize import to_subgraph
|
||||
|
||||
class TextLoadSender(ServiceSender):
|
||||
def __init__(self, pulsar_host):
|
||||
|
||||
super(TextLoadSender, self).__init__(
|
||||
pulsar_host=pulsar_host,
|
||||
request_queue=text_ingest_queue,
|
||||
request_schema=TextDocument,
|
||||
)
|
||||
|
||||
def to_request(self, body):
|
||||
|
||||
if "metadata" in body:
|
||||
metadata = to_subgraph(body["metadata"])
|
||||
else:
|
||||
metadata = []
|
||||
|
||||
if "charset" in body:
|
||||
charset = body["charset"]
|
||||
else:
|
||||
charset = "utf-8"
|
||||
|
||||
# Text is base64 encoded
|
||||
text = base64.b64decode(body["text"]).decode(charset)
|
||||
|
||||
print("Text document received")
|
||||
|
||||
return TextDocument(
|
||||
metadata=Metadata(
|
||||
id=body.get("id"),
|
||||
metabody=metadata,
|
||||
user=body.get("user", "trustgraph"),
|
||||
collection=body.get("collection", "default"),
|
||||
),
|
||||
text=text,
|
||||
)
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue