trustgraph/trustgraph-base/trustgraph/clients/triples_query_client.py
cybermaggedon 617eb7efd5
Feature/pulsar api key support (#308)
* Add pulsar API token check

* Added missing api_key references

---------

Co-authored-by: Tyler O <4535788+toliver38@users.noreply.github.com>
2025-02-15 11:22:48 +00:00

68 lines
1.8 KiB
Python

#!/usr/bin/env python3
import _pulsar
from .. schema import TriplesQueryRequest, TriplesQueryResponse, Value
from .. schema import triples_request_queue
from .. schema import triples_response_queue
from . base import BaseClient
# Short module-level aliases for the Pulsar client's logger levels, taken
# from the low-level _pulsar extension module. ERROR is used as the default
# log_level of TriplesQueryClient.
ERROR = _pulsar.LoggerLevel.Error
WARN = _pulsar.LoggerLevel.Warn
INFO = _pulsar.LoggerLevel.Info
DEBUG = _pulsar.LoggerLevel.Debug
class TriplesQueryClient(BaseClient):
    """Client that issues triples queries through BaseClient.

    Requests use the TriplesQueryRequest schema and responses use the
    TriplesQueryResponse schema. All connection parameters are forwarded
    unchanged to BaseClient.
    """

    def __init__(
            self, log_level=ERROR,
            subscriber=None,
            input_queue=None,
            output_queue=None,
            pulsar_host="pulsar://pulsar:6650",
            pulsar_api_key=None,
    ):
        """Set up the client, filling in the default triples queues.

        Args:
            log_level: Pulsar logger level forwarded to BaseClient.
            subscriber: Forwarded to BaseClient as-is.
            input_queue: Queue name forwarded as ``input_queue``; defaults
                to ``triples_request_queue`` when None.
            output_queue: Queue name forwarded as ``output_queue``;
                defaults to ``triples_response_queue`` when None.
            pulsar_host: Pulsar service URL forwarded to BaseClient.
            pulsar_api_key: Optional API key forwarded to BaseClient.
        """
        # Identity test rather than `== None`: equality would dispatch to
        # whatever __eq__ the argument happens to define.
        if input_queue is None:
            input_queue = triples_request_queue

        if output_queue is None:
            output_queue = triples_response_queue

        super().__init__(
            log_level=log_level,
            subscriber=subscriber,
            input_queue=input_queue,
            output_queue=output_queue,
            pulsar_api_key=pulsar_api_key,
            pulsar_host=pulsar_host,
            input_schema=TriplesQueryRequest,
            output_schema=TriplesQueryResponse,
        )

    def create_value(self, ent):
        """Wrap a string term in a schema Value, or pass None through.

        Args:
            ent: The term as a string, or None.

        Returns:
            None when ``ent`` is None; otherwise a Value whose ``is_uri``
            flag is True exactly when ``ent`` starts with ``http://`` or
            ``https://``.
        """
        if ent is None:
            return None

        # str.startswith accepts a tuple of prefixes, so one call covers
        # both URI schemes.
        is_uri = ent.startswith(("http://", "https://"))
        return Value(value=ent, is_uri=is_uri)

    def request(
            self,
            s, p, o,
            user="trustgraph", collection="default",
            limit=10, timeout=120,
    ):
        """Run a triples query and return the ``triples`` of the response.

        Args:
            s: First term of the query (presumably the subject), as a
                string or None.
            p: Second term (presumably the predicate), as a string or None.
            o: Third term (presumably the object), as a string or None.
            user: Forwarded to ``self.call`` as ``user``.
            collection: Forwarded to ``self.call`` as ``collection``.
            limit: Forwarded to ``self.call`` as ``limit``.
            timeout: Forwarded to ``self.call`` as ``timeout``
                (presumably seconds -- confirm against BaseClient).

        Returns:
            The ``triples`` attribute of the object returned by
            ``self.call``.
        """
        # `call` is not defined in this class; it is expected to come from
        # BaseClient.
        response = self.call(
            s=self.create_value(s),
            p=self.create_value(p),
            o=self.create_value(o),
            user=user,
            collection=collection,
            limit=limit,
            timeout=timeout,
        )
        return response.triples