diff --git a/trustgraph-flow/trustgraph/gateway/metrics.py b/trustgraph-flow/trustgraph/gateway/metrics.py new file mode 100644 index 00000000..33c1fe3a --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/metrics.py @@ -0,0 +1,73 @@ + +# +# This provides a Prometheus endpoint on the api-gateway. It proxies +# HTTP GET requests to Prometheus. +# + +import aiohttp +from aiohttp import web +import asyncio +from pulsar.schema import JsonSchema +import uuid +import logging + +logger = logging.getLogger("endpoint") +logger.setLevel(logging.INFO) + +class MetricsEndpoint: + + def __init__(self, prometheus_url, endpoint_path, auth): + + self.prometheus_url = prometheus_url + self.path = endpoint_path + self.auth = auth + self.operation = "service" + + async def start(self): + pass + + def add_routes(self, app): + + app.add_routes([ + web.get(self.path + "/{path:.*}", self.handle), + ]) + + async def handle(self, request): + + print(request.path, "...") + + try: + ht = request.headers["Authorization"] + tokens = ht.split(" ", 2) + if tokens[0] != "Bearer": + return web.HTTPUnauthorized() + token = tokens[1] + except: + token = "" + + if not self.auth.permitted(token, self.operation): + return web.HTTPUnauthorized() + + try: + + path = request.match_info["path"] + + async with aiohttp.ClientSession() as session: + + url = ( + self.prometheus_url + "/api/v1/" + path + "?" + + request.query_string + ) + + async with session.get(url) as resp: + return web.Response( + status=resp.status, + text=await resp.text() + ) + + except Exception as e: + + logging.error(f"Exception: {e}") + + raise web.HTTPInternalServerError() + diff --git a/trustgraph-flow/trustgraph/gateway/service.py b/trustgraph-flow/trustgraph/gateway/service.py index d6306ac6..b329660f 100755 --- a/trustgraph-flow/trustgraph/gateway/service.py +++ b/trustgraph-flow/trustgraph/gateway/service.py @@ -46,6 +46,7 @@ from . graph_embeddings_load import GraphEmbeddingsLoadEndpoint from . mux import MuxEndpoint from . document_load import DocumentLoadSender from . text_load import TextLoadSender +from . metrics import MetricsEndpoint from . endpoint import ServiceEndpoint from . auth import Authenticator @@ -54,6 +55,7 @@ logger = logging.getLogger("api") logger.setLevel(logging.INFO) default_pulsar_host = os.getenv("PULSAR_HOST", "pulsar://pulsar:6650") +default_prometheus_url = os.getenv("PROMETHEUS_URL", "http://prometheus:9090") default_timeout = 600 default_port = 8088 default_api_token = os.getenv("GATEWAY_SECRET", "") @@ -71,6 +73,13 @@ class Api: self.timeout = int(config.get("timeout", default_timeout)) self.pulsar_host = config.get("pulsar_host", default_pulsar_host) + self.prometheus_url = config.get( + "prometheus_url", default_prometheus_url, + ) + + if not self.prometheus_url.endswith("/"): + self.prometheus_url += "/" + api_token = config.get("api_token", default_api_token) # Token not set, or token equal empty string means no auth @@ -207,6 +216,11 @@ class Api: auth = self.auth, services = self.services, ), + MetricsEndpoint( + endpoint_path = "/api/v1/metrics", + prometheus_url = self.prometheus_url, + auth = self.auth, + ), ] for ep in self.endpoints: @@ -235,6 +249,12 @@ def run(): help=f'Pulsar host (default: {default_pulsar_host})', ) + parser.add_argument( + '-m', '--prometheus-url', + default=default_prometheus_url, + help=f'Prometheus URL (default: {default_prometheus_url})', + ) + parser.add_argument( '--port', type=int,