- Refactor doc load and text load to use a new ServiceSender class,

similar to ServiceRequestor, but one-way.
- This means these two services are now available over websocket with
  document-load and text-load service IDs.
This commit is contained in:
Cyber MacGeddon 2024-12-28 19:50:50 +00:00
parent 444f2d97e2
commit e4fdde541e
7 changed files with 358 additions and 112 deletions

View file

@ -0,0 +1,42 @@
import base64
from .. schema import Document
from .. schema import document_ingest_queue
from . sender import ServiceSender
from . serialize import to_subgraph
class DocumentLoadSender(ServiceSender):
def __init__(self, pulsar_host):
super(DocumentLoadSender, self).__init__(
pulsar_host=pulsar_host,
request_queue=document_ingest_queue,
request_schema=Document,
)
def to_request(self, body):
if "metadata" in data:
metadata = to_subgraph(data["metadata"])
else:
metadata = []
# Doing a base64 decoe/encode here to make sure the
# content is valid base64
doc = base64.b64decode(data["data"])
print("Document received")
return Document(
metadata=Metadata(
id=data.get("id"),
metadata=metadata,
user=data.get("user", "trustgraph"),
collection=data.get("collection", "default"),
),
data=base64.b64encode(doc).decode("utf-8")
)

View file

@ -26,4 +26,3 @@ class EmbeddingsRequestor(ServiceRequestor):
def from_response(self, message):
return { "vectors": message.vectors }, True

View file

@ -0,0 +1,49 @@
# Like ServiceRequestor, but just fire-and-forget instead of request/response
import asyncio
from pulsar.schema import JsonSchema
import uuid
import logging
from . publisher import Publisher
logger = logging.getLogger("sender")
logger.setLevel(logging.INFO)
class ServiceSender:
def __init__(
self,
pulsar_host,
request_queue, request_schema,
):
self.pub = Publisher(
pulsar_host, request_queue,
schema=JsonSchema(request_schema)
)
async def start(self):
self.pub.start()
def to_request(self, request):
raise RuntimeError("Not defined")
async def process(self, request, responder=None):
try:
await asyncio.to_thread(
self.pub.send, None, self.to_request(request)
)
return {}
except Exception as e:
logging.error(f"Exception: {e}")
return { "error": str(e) }

View file

@ -24,9 +24,6 @@ from prometheus_client import start_http_server
from .. log_level import LogLevel
from .. schema import Metadata, Document, TextDocument
from .. schema import document_ingest_queue, text_ingest_queue
from . serialize import to_subgraph
from . running import Running
from . publisher import Publisher
@ -46,6 +43,8 @@ from . graph_embeddings_stream import GraphEmbeddingsStreamEndpoint
from . triples_load import TriplesLoadEndpoint
from . graph_embeddings_load import GraphEmbeddingsLoadEndpoint
from . mux import MuxEndpoint
from . document_load import DocumentLoadSender
from . text_load import TextLoadSender
from . endpoint import ServiceEndpoint
from . auth import Authenticator
@ -120,6 +119,12 @@ class Api:
pulsar_host=self.pulsar_host, timeout=self.timeout,
auth = self.auth,
),
"document-load": DocumentLoadSender(
pulsar_host=self.pulsar_host,
),
"text-load": TextLoadSender(
pulsar_host=self.pulsar_host,
),
}
self.endpoints = [
@ -164,6 +169,14 @@ class Api:
endpoint_path = "/api/v1/internet-search", auth=self.auth,
requestor = self.services["internet-search"],
),
ServiceEndpoint(
endpoint_path = "/api/v1/load/document", auth=self.auth,
requestor = self.services["document-load"],
),
ServiceEndpoint(
endpoint_path = "/api/v1/load/text", auth=self.auth,
requestor = self.services["text-load"],
),
TriplesStreamEndpoint(
pulsar_host=self.pulsar_host,
auth = self.auth,
@ -187,122 +200,14 @@ class Api:
),
]
self.document_out = Publisher(
self.pulsar_host, document_ingest_queue,
schema=JsonSchema(Document),
chunking_enabled=True,
)
self.text_out = Publisher(
self.pulsar_host, text_ingest_queue,
schema=JsonSchema(TextDocument),
chunking_enabled=True,
)
for ep in self.endpoints:
ep.add_routes(self.app)
self.app.add_routes([
web.post("/api/v1/load/document", self.load_document),
web.post("/api/v1/load/text", self.load_text),
])
async def load_document(self, request):
try:
data = await request.json()
if "metadata" in data:
metadata = to_subgraph(data["metadata"])
else:
metadata = []
# Doing a base64 decoe/encode here to make sure the
# content is valid base64
doc = base64.b64decode(data["data"])
resp = await asyncio.to_thread(
self.document_out.send,
None,
Document(
metadata=Metadata(
id=data.get("id"),
metadata=metadata,
user=data.get("user", "trustgraph"),
collection=data.get("collection", "default"),
),
data=base64.b64encode(doc).decode("utf-8")
)
)
print("Document loaded.")
return web.json_response(
{ }
)
except Exception as e:
logging.error(f"Exception: {e}")
return web.json_response(
{ "error": str(e) }
)
async def load_text(self, request):
try:
data = await request.json()
if "metadata" in data:
metadata = to_subgraph(data["metadata"])
else:
metadata = []
if "charset" in data:
charset = data["charset"]
else:
charset = "utf-8"
# Text is base64 encoded
text = base64.b64decode(data["text"]).decode(charset)
resp = await asyncio.to_thread(
self.text_out.send,
None,
TextDocument(
metadata=Metadata(
id=data.get("id"),
metadata=metadata,
user=data.get("user", "trustgraph"),
collection=data.get("collection", "default"),
),
text=text,
)
)
print("Text document loaded.")
return web.json_response(
{ }
)
except Exception as e:
logging.error(f"Exception: {e}")
return web.json_response(
{ "error": str(e) }
)
async def app_factory(self):
for ep in self.endpoints:
await ep.start()
self.document_out.start()
self.text_out.start()
return self.app
def run(self):

View file

@ -0,0 +1,45 @@
import base64
from .. schema import TextDocument, Metadata
from .. schema import text_ingest_queue
from . sender import ServiceSender
from . serialize import to_subgraph
class TextLoadSender(ServiceSender):
def __init__(self, pulsar_host):
super(TextLoadSender, self).__init__(
pulsar_host=pulsar_host,
request_queue=text_ingest_queue,
request_schema=TextDocument,
)
def to_request(self, body):
if "metadata" in body:
metadata = to_subgraph(body["metadata"])
else:
metadata = []
if "charset" in body:
charset = body["charset"]
else:
charset = "utf-8"
# Text is base64 encoded
text = base64.b64decode(body["text"]).decode(charset)
print("Text document received")
return TextDocument(
metadata=Metadata(
id=body.get("id"),
metabody=metadata,
user=body.get("user", "trustgraph"),
collection=body.get("collection", "default"),
),
text=text,
)