diff --git a/Makefile b/Makefile index ef2ee557..8fb5e284 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # VERSION=$(shell git describe | sed 's/^v//') -VERSION=0.3.3 +VERSION=0.4.1 all: container diff --git a/docker-compose-azure.yaml b/docker-compose-azure.yaml index f970add6..d129f384 100644 --- a/docker-compose-azure.yaml +++ b/docker-compose-azure.yaml @@ -6,6 +6,8 @@ volumes: etcd: minio-data: milvus: + prometheus-data: + grafana-storage: services: @@ -90,8 +92,34 @@ services: - "milvus:/var/lib/milvus" restart: on-failure:100 + prometheus: + image: docker.io/prom/prometheus:v2.53.1 + ports: + - "9090:9090" + volumes: + - "./prometheus:/etc/prometheus" + - "prometheus-data:/prometheus" + restart: on-failure:100 + + grafana: + image: docker.io/grafana/grafana:10.0.0 + ports: + - "3000:3000" + volumes: + - "grafana-storage:/var/lib/grafana" + - "./grafana/dashboard.yml:/etc/grafana/provisioning/dashboards/dashboard.yml" + - "./grafana/datasource.yml:/etc/grafana/provisioning/datasources/datasource.yml" + - "./grafana/dashboard.json:/var/lib/grafana/dashboards/dashboard.json" + environment: +# GF_AUTH_ANONYMOUS_ORG_ROLE: Admin +# GF_AUTH_ANONYMOUS_ENABLED: true +# GF_ORG_ROLE: Admin + GF_ORG_NAME: trustgraph.ai +# GF_SERVER_ROOT_URL: https://example.com + restart: on-failure:100 + pdf-decoder: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "pdf-decoder" - "-p" @@ -99,7 +127,7 @@ services: restart: on-failure:100 chunker: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "chunker-recursive" - "-p" @@ -107,7 +135,7 @@ services: restart: on-failure:100 vectorize: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "embeddings-vectorize" - "-p" @@ -115,15 +143,17 @@ services: restart: on-failure:100 embeddings: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + 
image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "embeddings-hf" - "-p" - "pulsar://pulsar:6650" + - "-m" + - "mixedbread-ai/mxbai-embed-large-v1" restart: on-failure:100 kg-extract-definitions: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "kg-extract-definitions" - "-p" @@ -131,7 +161,7 @@ services: restart: on-failure:100 kg-extract-relationships: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "kg-extract-relationships" - "-p" @@ -139,7 +169,7 @@ services: restart: on-failure:100 vector-write: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "vector-write-milvus" - "-p" @@ -149,7 +179,7 @@ services: restart: on-failure:100 graph-write: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "graph-write-cassandra" - "-p" @@ -159,7 +189,7 @@ services: restart: on-failure:100 llm: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "llm-azure-text" - "-p" @@ -171,7 +201,7 @@ services: restart: on-failure:100 graph-rag: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "graph-rag" - "-p" diff --git a/docker-compose-claude.yaml b/docker-compose-claude.yaml index bbceceac..40a3809a 100644 --- a/docker-compose-claude.yaml +++ b/docker-compose-claude.yaml @@ -6,6 +6,8 @@ volumes: etcd: minio-data: milvus: + prometheus-data: + grafana-storage: services: @@ -90,8 +92,34 @@ services: - "milvus:/var/lib/milvus" restart: on-failure:100 + prometheus: + image: docker.io/prom/prometheus:v2.53.1 + ports: + - "9090:9090" + volumes: + - "./prometheus:/etc/prometheus" + - "prometheus-data:/prometheus" + restart: on-failure:100 + + grafana: + image: docker.io/grafana/grafana:10.0.0 + 
ports: + - "3000:3000" + volumes: + - "grafana-storage:/var/lib/grafana" + - "./grafana/dashboard.yml:/etc/grafana/provisioning/dashboards/dashboard.yml" + - "./grafana/datasource.yml:/etc/grafana/provisioning/datasources/datasource.yml" + - "./grafana/dashboard.json:/var/lib/grafana/dashboards/dashboard.json" + environment: +# GF_AUTH_ANONYMOUS_ORG_ROLE: Admin +# GF_AUTH_ANONYMOUS_ENABLED: true +# GF_ORG_ROLE: Admin + GF_ORG_NAME: trustgraph.ai +# GF_SERVER_ROOT_URL: https://example.com + restart: on-failure:100 + pdf-decoder: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "pdf-decoder" - "-p" @@ -99,7 +127,7 @@ services: restart: on-failure:100 chunker: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "chunker-recursive" - "-p" @@ -107,7 +135,7 @@ services: restart: on-failure:100 vectorize: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "embeddings-vectorize" - "-p" @@ -115,15 +143,17 @@ services: restart: on-failure:100 embeddings: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "embeddings-hf" - "-p" - "pulsar://pulsar:6650" + - "-m" + - "mixedbread-ai/mxbai-embed-large-v1" restart: on-failure:100 kg-extract-definitions: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "kg-extract-definitions" - "-p" @@ -131,7 +161,7 @@ services: restart: on-failure:100 kg-extract-relationships: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "kg-extract-relationships" - "-p" @@ -139,7 +169,7 @@ services: restart: on-failure:100 vector-write: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "vector-write-milvus" - 
"-p" @@ -149,7 +179,7 @@ services: restart: on-failure:100 graph-write: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "graph-write-cassandra" - "-p" @@ -159,7 +189,7 @@ services: restart: on-failure:100 llm: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "llm-claude-text" - "-p" @@ -169,7 +199,7 @@ services: restart: on-failure:100 graph-rag: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "graph-rag" - "-p" diff --git a/docker-compose-ollama.yaml b/docker-compose-ollama.yaml index dd618cd2..6188e0df 100644 --- a/docker-compose-ollama.yaml +++ b/docker-compose-ollama.yaml @@ -6,6 +6,8 @@ volumes: etcd: minio-data: milvus: + prometheus-data: + grafana-storage: services: @@ -90,8 +92,34 @@ services: - "milvus:/var/lib/milvus" restart: on-failure:100 + prometheus: + image: docker.io/prom/prometheus:v2.53.1 + ports: + - "9090:9090" + volumes: + - "./prometheus:/etc/prometheus" + - "prometheus-data:/prometheus" + restart: on-failure:100 + + grafana: + image: docker.io/grafana/grafana:10.0.0 + ports: + - "3000:3000" + volumes: + - "grafana-storage:/var/lib/grafana" + - "./grafana/dashboard.yml:/etc/grafana/provisioning/dashboards/dashboard.yml" + - "./grafana/datasource.yml:/etc/grafana/provisioning/datasources/datasource.yml" + - "./grafana/dashboard.json:/var/lib/grafana/dashboards/dashboard.json" + environment: +# GF_AUTH_ANONYMOUS_ORG_ROLE: Admin +# GF_AUTH_ANONYMOUS_ENABLED: true +# GF_ORG_ROLE: Admin + GF_ORG_NAME: trustgraph.ai +# GF_SERVER_ROOT_URL: https://example.com + restart: on-failure:100 + pdf-decoder: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "pdf-decoder" - "-p" @@ -99,7 +127,7 @@ services: restart: on-failure:100 chunker: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: 
docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "chunker-recursive" - "-p" @@ -107,7 +135,7 @@ services: restart: on-failure:100 vectorize: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "embeddings-vectorize" - "-p" @@ -115,7 +143,7 @@ services: restart: on-failure:100 embeddings: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "embeddings-hf" - "-p" @@ -125,7 +153,7 @@ services: restart: on-failure:100 kg-extract-definitions: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "kg-extract-definitions" - "-p" @@ -133,7 +161,7 @@ services: restart: on-failure:100 kg-extract-relationships: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "kg-extract-relationships" - "-p" @@ -141,7 +169,7 @@ services: restart: on-failure:100 vector-write: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "vector-write-milvus" - "-p" @@ -151,7 +179,7 @@ services: restart: on-failure:100 graph-write: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "graph-write-cassandra" - "-p" @@ -161,7 +189,7 @@ services: restart: on-failure:100 llm: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "llm-ollama-text" - "-p" @@ -171,7 +199,7 @@ services: restart: on-failure:100 graph-rag: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "graph-rag" - "-p" diff --git a/docker-compose-vertexai.yaml b/docker-compose-vertexai.yaml index b3a90f7a..0043efe0 100644 --- a/docker-compose-vertexai.yaml +++ b/docker-compose-vertexai.yaml @@ -6,6 +6,8 @@ volumes: etcd: minio-data: milvus: + 
prometheus-data: + grafana-storage: services: @@ -90,8 +92,34 @@ services: - "milvus:/var/lib/milvus" restart: on-failure:100 + prometheus: + image: docker.io/prom/prometheus:v2.53.1 + ports: + - "9090:9090" + volumes: + - "./prometheus:/etc/prometheus" + - "prometheus-data:/prometheus" + restart: on-failure:100 + + grafana: + image: docker.io/grafana/grafana:10.0.0 + ports: + - "3000:3000" + volumes: + - "grafana-storage:/var/lib/grafana" + - "./grafana/dashboard.yml:/etc/grafana/provisioning/dashboards/dashboard.yml" + - "./grafana/datasource.yml:/etc/grafana/provisioning/datasources/datasource.yml" + - "./grafana/dashboard.json:/var/lib/grafana/dashboards/dashboard.json" + environment: +# GF_AUTH_ANONYMOUS_ORG_ROLE: Admin +# GF_AUTH_ANONYMOUS_ENABLED: true +# GF_ORG_ROLE: Admin + GF_ORG_NAME: trustgraph.ai +# GF_SERVER_ROOT_URL: https://example.com + restart: on-failure:100 + pdf-decoder: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "pdf-decoder" - "-p" @@ -99,7 +127,7 @@ services: restart: on-failure:100 chunker: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "chunker-recursive" - "-p" @@ -107,7 +135,7 @@ services: restart: on-failure:100 vectorize: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "embeddings-vectorize" - "-p" @@ -115,15 +143,17 @@ services: restart: on-failure:100 embeddings: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "embeddings-hf" - "-p" - "pulsar://pulsar:6650" + - "-m" + - "mixedbread-ai/mxbai-embed-large-v1" restart: on-failure:100 kg-extract-definitions: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "kg-extract-definitions" - "-p" @@ -131,7 +161,7 @@ services: restart: on-failure:100 
kg-extract-relationships: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "kg-extract-relationships" - "-p" @@ -139,7 +169,7 @@ services: restart: on-failure:100 vector-write: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "vector-write-milvus" - "-p" @@ -149,7 +179,7 @@ services: restart: on-failure:100 graph-write: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "graph-write-cassandra" - "-p" @@ -159,7 +189,7 @@ services: restart: on-failure:100 llm: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "llm-vertexai-text" - "-p" @@ -173,7 +203,7 @@ services: restart: on-failure:100 graph-rag: - image: docker.io/trustgraph/trustgraph-flow:0.3.3 + image: docker.io/trustgraph/trustgraph-flow:0.4.1 command: - "graph-rag" - "-p" diff --git a/grafana/dashboard.json b/grafana/dashboard.json new file mode 100644 index 00000000..8ed62fe6 --- /dev/null +++ b/grafana/dashboard.json @@ -0,0 +1,298 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 1, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "f6b18033-5918-4e05-a1ca-4cb30343b129" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineWidth": 1 + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + 
"value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 4, + "options": { + "bucketOffset": 0, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "f6b18033-5918-4e05-a1ca-4cb30343b129" + }, + "editorMode": "builder", + "expr": "avg(rate(request_latency_bucket{instance=\"llm:8000\"}[5m]))", + "instant": false, + "range": true, + "refId": "A" + } + ], + "title": "LLM latency", + "type": "histogram" + }, + { + "datasource": { + "type": "prometheus", + "uid": "f6b18033-5918-4e05-a1ca-4cb30343b129" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "continuous-RdYlGr" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 39, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "percent" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "f6b18033-5918-4e05-a1ca-4cb30343b129" + }, + "editorMode": "builder", + "expr": "sum by(status) 
(rate(processing_count_total[5m]))", + "format": "time_series", + "instant": false, + "interval": "", + "range": true, + "refId": "A" + } + ], + "title": "Error rate", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "f6b18033-5918-4e05-a1ca-4cb30343b129" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 11, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "f6b18033-5918-4e05-a1ca-4cb30343b129" + }, + "editorMode": "builder", + "expr": "rate(request_latency_count[1m])", + "instant": false, + "range": true, + "refId": "A" + } + ], + "title": "Request count", + "type": "timeseries" + } + ], + "refresh": "10s", + "schemaVersion": 38, + "style": "dark", + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-6h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Overview", + "uid": "b5c8abf8-fe79-496b-b028-10bde917d1f0", + "version": 7, + "weekStart": "" +} diff --git 
a/grafana/dashboard.yml b/grafana/dashboard.yml new file mode 100644 index 00000000..9b9e7450 --- /dev/null +++ b/grafana/dashboard.yml @@ -0,0 +1,17 @@ + +apiVersion: 1 + +providers: + + - name: 'trustgraph.ai' + orgId: 1 + folder: 'TrustGraph' + folderUid: 'b6c5be90-d432-4df8-aeab-737c7b151228' + type: file + disableDeletion: false + updateIntervalSeconds: 30 + allowUiUpdates: true + options: + path: /var/lib/grafana/dashboards + foldersFromFilesStructure: false + diff --git a/grafana/datasource.yml b/grafana/datasource.yml new file mode 100644 index 00000000..3afdb9b7 --- /dev/null +++ b/grafana/datasource.yml @@ -0,0 +1,21 @@ +apiVersion: 1 + +prune: true + +datasources: + - name: Prometheus + type: prometheus + access: proxy + orgId: 1 + # Sets a custom UID to reference this + # data source in other parts of the configuration. + # If not specified, Grafana generates one. + uid: 'f6b18033-5918-4e05-a1ca-4cb30343b129' + + url: http://prometheus:9090 + + basicAuth: false + withCredentials: false + isDefault: true + editable: true + diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml new file mode 100644 index 00000000..5d1e4b56 --- /dev/null +++ b/prometheus/prometheus.yml @@ -0,0 +1,35 @@ +global: + + scrape_interval: 15s # By default, scrape targets every 15 seconds. + + # Attach these labels to any time series or alerts when communicating with + # external systems (federation, remote storage, Alertmanager). + external_labels: + monitor: 'trustgraph' + +# A scrape configuration for the TrustGraph processing services listed +# under `targets` below; each one serves its metrics on port 8000. +scrape_configs: + + # The job name is added as a label `job=<job_name>` to any timeseries + # scraped from this config. + + - job_name: 'trustgraph' + + # Override the global default and scrape targets from this job every + # 5 seconds.
+ scrape_interval: 5s + + static_configs: + - targets: + - 'pdf-decoder:8000' + - 'chunker:8000' + - 'vectorize:8000' + - 'embeddings:8000' + - 'kg-extract-definitions:8000' + - 'kg-extract-relationships:8000' + - 'vector-write:8000' + - 'graph-write:8000' + - 'llm:8000' + - 'graph-rag:8000' + diff --git a/setup.py b/setup.py index 6d109949..b1c83fc4 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ import os with open("README.md", "r") as fh: long_description = fh.read() -version = "0.3.3" +version = "0.4.1" setuptools.setup( name="trustgraph", @@ -43,6 +43,7 @@ setuptools.setup( "anthropic", "google-cloud-aiplatform", "pyyaml", + "prometheus-client", ], scripts=[ "scripts/chunker-recursive", diff --git a/trustgraph/base/processor.py b/trustgraph/base/processor.py index 4f035966..ec017de2 100644 --- a/trustgraph/base/processor.py +++ b/trustgraph/base/processor.py @@ -2,8 +2,10 @@ import os import argparse import pulsar +import _pulsar import time from pulsar.schema import JsonSchema +from prometheus_client import start_http_server, Histogram, Info, Counter from .. 
log_level import LogLevel @@ -11,16 +13,23 @@ class BaseProcessor: default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - def __init__( - self, - pulsar_host=default_pulsar_host, - log_level=LogLevel.INFO, - ): + def __init__(self, **params): self.client = None - if pulsar_host == None: - pulsar_host = default_pulsar_host + if not hasattr(__class__, "params_metric"): + __class__.params_metric = Info( + 'params', 'Parameters configuration' + ) + + # FIXME: Maybe outputs information it should not + __class__.params_metric.info({ + k: str(params[k]) + for k in params + }) + + pulsar_host = params.get("pulsar_host", self.default_pulsar_host) + log_level = params.get("log_level", LogLevel.INFO) self.pulsar_host = pulsar_host @@ -51,6 +60,20 @@ class BaseProcessor: help=f'Output queue (default: info)' ) + parser.add_argument( + '-M', '--metrics-enabled', + type=bool, + default=True, + help=f'Pulsar host (default: true)', + ) + + parser.add_argument( + '-P', '--metrics-port', + type=int, + default=8000, + help=f'Pulsar host (default: 8000)', + ) + def run(self): raise RuntimeError("Something should have implemented the run method") @@ -69,13 +92,26 @@ class BaseProcessor: args = parser.parse_args() args = vars(args) + if args["metrics_enabled"]: + start_http_server(args["metrics_port"]) + try: p = cls(**args) p.run() + except KeyboardInterrupt: + print("Keyboard interrupt.") + return + + except _pulsar.Interrupted: + print("Pulsar Interrupted.") + return + except Exception as e: + print(type(e)) + print("Exception:", e, flush=True) print("Will retry...", flush=True) @@ -83,23 +119,38 @@ class BaseProcessor: class Consumer(BaseProcessor): - def __init__( - self, - pulsar_host=None, - log_level=LogLevel.INFO, - input_queue="input", - subscriber="subscriber", - input_schema=None, - ): + def __init__(self, **params): - super(Consumer, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - ) + super(Consumer, self).__init__(**params) + + 
input_queue = params.get("input_queue") + subscriber = params.get("subscriber") + input_schema = params.get("input_schema") if input_schema == None: raise RuntimeError("input_schema must be specified") + if not hasattr(__class__, "request_metric"): + __class__.request_metric = Histogram( + 'request_latency', 'Request latency (seconds)' + ) + + if not hasattr(__class__, "pubsub_metric"): + __class__.pubsub_metric = Info( + 'pubsub', 'Pub/sub configuration' + ) + + if not hasattr(__class__, "processing_metric"): + __class__.processing_metric = Counter( + 'processing_count', 'Processing count', ["status"] + ) + + __class__.pubsub_metric.info({ + "input_queue": input_queue, + "subscriber": subscriber, + "input_schema": input_schema.__name__, + }) + self.consumer = self.client.subscribe( input_queue, subscriber, schema=JsonSchema(input_schema), @@ -113,11 +164,14 @@ class Consumer(BaseProcessor): try: - self.handle(msg) + with __class__.request_metric.time(): + self.handle(msg) # Acknowledge successful processing of the message self.consumer.acknowledge(msg) + __class__.processing_metric.labels(status="success").inc() + except Exception as e: print("Exception:", e, flush=True) @@ -125,6 +179,8 @@ class Consumer(BaseProcessor): # Message failed to be processed self.consumer.negative_acknowledge(msg) + __class__.processing_metric.labels(status="error").inc() + @staticmethod def add_args(parser, default_input_queue, default_subscriber): @@ -144,21 +200,43 @@ class Consumer(BaseProcessor): class ConsumerProducer(BaseProcessor): - def __init__( - self, - pulsar_host=None, - log_level=LogLevel.INFO, - input_queue="input", - output_queue="output", - subscriber="subscriber", - input_schema=None, - output_schema=None, - ): + def __init__(self, **params): - super(ConsumerProducer, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - ) + input_queue = params.get("input_queue") + output_queue = params.get("output_queue") + subscriber = params.get("subscriber") + 
input_schema = params.get("input_schema") + output_schema = params.get("output_schema") + + if not hasattr(__class__, "request_metric"): + __class__.request_metric = Histogram( + 'request_latency', 'Request latency (seconds)' + ) + + if not hasattr(__class__, "output_metric"): + __class__.output_metric = Counter( + 'output_count', 'Output items created' + ) + + if not hasattr(__class__, "pubsub_metric"): + __class__.pubsub_metric = Info( + 'pubsub', 'Pub/sub configuration' + ) + + if not hasattr(__class__, "processing_metric"): + __class__.processing_metric = Counter( + 'processing_count', 'Processing count', ["status"] + ) + + __class__.pubsub_metric.info({ + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": input_schema.__name__, + "output_schema": output_schema.__name__, + }) + + super(ConsumerProducer, self).__init__(**params) if input_schema == None: raise RuntimeError("input_schema must be specified") @@ -184,11 +262,14 @@ class ConsumerProducer(BaseProcessor): try: - resp = self.handle(msg) + with __class__.request_metric.time(): + resp = self.handle(msg) # Acknowledge successful processing of the message self.consumer.acknowledge(msg) + __class__.processing_metric.labels(status="success").inc() + except Exception as e: print("Exception:", e, flush=True) @@ -196,9 +277,11 @@ class ConsumerProducer(BaseProcessor): # Message failed to be processed self.consumer.negative_acknowledge(msg) - def send(self, msg, properties={}): + __class__.processing_metric.labels(status="error").inc() + def send(self, msg, properties={}): self.producer.send(msg, properties) + __class__.output_metric.inc() @staticmethod def add_args( @@ -228,18 +311,27 @@ class ConsumerProducer(BaseProcessor): class Producer(BaseProcessor): - def __init__( - self, - pulsar_host=None, - log_level=LogLevel.INFO, - output_queue="output", - output_schema=None, - ): + def __init__(self, **params): - super(Producer, self).__init__( - 
pulsar_host=pulsar_host, - log_level=log_level, - ) + output_queue = params.get("output_queue") + output_schema = params.get("output_schema") + + if not hasattr(__class__, "output_metric"): + __class__.output_metric = Counter( + 'output_count', 'Output items created' + ) + + if not hasattr(__class__, "pubsub_metric"): + __class__.pubsub_metric = Info( + 'pubsub', 'Pub/sub configuration' + ) + + __class__.pubsub_metric.info({ + "output_queue": output_queue, + "output_schema": output_schema.__name__, + }) + + super(Producer, self).__init__(**params) if output_schema == None: raise RuntimeError("output_schema must be specified") @@ -250,8 +342,8 @@ class Producer(BaseProcessor): ) def send(self, msg, properties={}): - self.producer.send(msg, properties) + __class__.output_metric.inc() @staticmethod def add_args( diff --git a/trustgraph/chunker/recursive/chunker.py b/trustgraph/chunker/recursive/chunker.py index b116eca5..5418f951 100755 --- a/trustgraph/chunker/recursive/chunker.py +++ b/trustgraph/chunker/recursive/chunker.py @@ -17,25 +17,22 @@ default_subscriber = 'chunker-recursive' class Processor(ConsumerProducer): - def __init__( - self, - pulsar_host=None, - input_queue=default_input_queue, - output_queue=default_output_queue, - subscriber=default_subscriber, - log_level=LogLevel.INFO, - chunk_size=2000, - chunk_overlap=100, - ): + def __init__(self, **params): + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) + chunk_size = params.get("chunk_size", 2000) + chunk_overlap = params.get("chunk_overlap", 100) + super(Processor, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - input_queue=input_queue, - output_queue=output_queue, - subscriber=subscriber, - input_schema=TextDocument, - output_schema=Chunk, + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + 
"input_schema": TextDocument, + "output_schema": Chunk, + } ) self.text_splitter = RecursiveCharacterTextSplitter( diff --git a/trustgraph/decoder/pdf/pdf_decoder.py b/trustgraph/decoder/pdf/pdf_decoder.py index e87f2905..1d86c366 100755 --- a/trustgraph/decoder/pdf/pdf_decoder.py +++ b/trustgraph/decoder/pdf/pdf_decoder.py @@ -18,23 +18,20 @@ default_subscriber = 'pdf-decoder' class Processor(ConsumerProducer): - def __init__( - self, - pulsar_host=None, - input_queue=default_input_queue, - output_queue=default_output_queue, - subscriber=default_subscriber, - log_level=LogLevel.INFO, - ): + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) super(Processor, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - input_queue=input_queue, - output_queue=output_queue, - subscriber=subscriber, - input_schema=Document, - output_schema=TextDocument, + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": Document, + "output_schema": TextDocument, + } ) print("PDF inited") diff --git a/trustgraph/embeddings/hf/hf.py b/trustgraph/embeddings/hf/hf.py index 82273310..e5a5728f 100755 --- a/trustgraph/embeddings/hf/hf.py +++ b/trustgraph/embeddings/hf/hf.py @@ -17,24 +17,21 @@ default_model="all-MiniLM-L6-v2" class Processor(ConsumerProducer): - def __init__( - self, - pulsar_host=None, - input_queue=default_input_queue, - output_queue=default_output_queue, - subscriber=default_subscriber, - log_level=LogLevel.INFO, - model=default_model, - ): + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) + model = params.get("model", default_model) super(Processor, 
self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - input_queue=input_queue, - output_queue=output_queue, - subscriber=subscriber, - input_schema=EmbeddingsRequest, - output_schema=EmbeddingsResponse, + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": EmbeddingsRequest, + "output_schema": EmbeddingsResponse, + } ) self.embeddings = HuggingFaceEmbeddings(model_name=model) diff --git a/trustgraph/embeddings/ollama/processor.py b/trustgraph/embeddings/ollama/processor.py index 5c36f86a..38c177b1 100755 --- a/trustgraph/embeddings/ollama/processor.py +++ b/trustgraph/embeddings/ollama/processor.py @@ -17,25 +17,20 @@ default_ollama = 'http://localhost:11434' class Processor(ConsumerProducer): - def __init__( - self, - pulsar_host=None, - input_queue=default_input_queue, - output_queue=default_output_queue, - subscriber=default_subscriber, - log_level=LogLevel.INFO, - model=default_model, - ollama=default_ollama, - ): + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) super(Processor, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - input_queue=input_queue, - output_queue=output_queue, - subscriber=subscriber, - input_schema=EmbeddingsRequest, - output_schema=EmbeddingsResponse, + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": EmbeddingsRequest, + "output_schema": EmbeddingsResponse, + } ) self.embeddings = OllamaEmbeddings(base_url=ollama, model=model) diff --git a/trustgraph/embeddings/vectorize/vectorize.py b/trustgraph/embeddings/vectorize/vectorize.py index e779c6ed..2a92827e 100755 --- a/trustgraph/embeddings/vectorize/vectorize.py +++ b/trustgraph/embeddings/vectorize/vectorize.py @@ -15,26 +15,23 @@ 
default_subscriber = 'embeddings-vectorizer' class Processor(ConsumerProducer): - def __init__( - self, - pulsar_host=None, - input_queue=default_input_queue, - output_queue=default_output_queue, - subscriber=default_subscriber, - log_level=LogLevel.INFO, - ): + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) super(Processor, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - input_queue=input_queue, - output_queue=output_queue, - subscriber=subscriber, - input_schema=Chunk, - output_schema=VectorsChunk, + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": Chunk, + "output_schema": VectorsChunk, + } ) - self.embeddings = EmbeddingsClient(pulsar_host=pulsar_host) + self.embeddings = EmbeddingsClient(pulsar_host=self.pulsar_host) def emit(self, source, chunk, vectors): diff --git a/trustgraph/graph/cassandra_write/write.py b/trustgraph/graph/cassandra_write/write.py index df317c2a..fe5864a0 100755 --- a/trustgraph/graph/cassandra_write/write.py +++ b/trustgraph/graph/cassandra_write/write.py @@ -20,27 +20,22 @@ default_graph_host='localhost' class Processor(Consumer): - def __init__( - self, - pulsar_host=None, - input_queue=default_input_queue, - subscriber=default_subscriber, - graph_host=default_graph_host, - log_level=LogLevel.INFO, - ): + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + subscriber = params.get("subscriber", default_subscriber) + graph_host = params.get("graph_host", default_graph_host) super(Processor, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - input_queue=input_queue, - subscriber=subscriber, - input_schema=Triple, + **params | { + "input_queue": input_queue, + "subscriber": subscriber, + "input_schema": Triple, + } ) 
self.tg = TrustGraph([graph_host]) - self.count = 0 - def handle(self, msg): v = msg.value() @@ -51,11 +46,6 @@ class Processor(Consumer): v.o.value ) - self.count += 1 - - if (self.count % 1000) == 0: - print(self.count, "...", flush=True) - @staticmethod def add_args(parser): diff --git a/trustgraph/kg/extract_definitions/extract.py b/trustgraph/kg/extract_definitions/extract.py index 42cbacc9..9d08470d 100755 --- a/trustgraph/kg/extract_definitions/extract.py +++ b/trustgraph/kg/extract_definitions/extract.py @@ -22,23 +22,20 @@ default_subscriber = 'kg-extract-definitions' class Processor(ConsumerProducer): - def __init__( - self, - pulsar_host=None, - input_queue=default_input_queue, - output_queue=default_output_queue, - subscriber=default_subscriber, - log_level=LogLevel.INFO, - ): + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) super(Processor, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - input_queue=input_queue, - output_queue=output_queue, - subscriber=subscriber, - input_schema=VectorsChunk, - output_schema=Triple, + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": VectorsChunk, + "output_schema": Triple, + } ) - self.llm = LlmClient(pulsar_host=pulsar_host) + self.llm = LlmClient(pulsar_host=self.pulsar_host) diff --git a/trustgraph/kg/extract_relationships/extract.py b/trustgraph/kg/extract_relationships/extract.py index adc0b71a..bf3b94c7 100755 --- a/trustgraph/kg/extract_relationships/extract.py +++ b/trustgraph/kg/extract_relationships/extract.py @@ -7,6 +7,7 @@ graph edges. import urllib.parse import json +import os from pulsar.schema import JsonSchema from ...
schema import VectorsChunk, Triple, VectorsAssociation, Source, Value @@ -25,24 +26,21 @@ default_vector_queue='vectors-load' class Processor(ConsumerProducer): - def __init__( - self, - pulsar_host=None, - input_queue=default_input_queue, - vector_queue=default_vector_queue, - output_queue=default_output_queue, - subscriber=default_subscriber, - log_level=LogLevel.INFO, - ): + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + vector_queue = params.get("vector_queue", default_vector_queue) + subscriber = params.get("subscriber", default_subscriber) super(Processor, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - input_queue=input_queue, - output_queue=output_queue, - subscriber=subscriber, - input_schema=VectorsChunk, - output_schema=Triple, + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": VectorsChunk, + "output_schema": Triple, + } ) self.vec_prod = self.client.create_producer( @@ -50,7 +48,17 @@ class Processor(ConsumerProducer): schema=JsonSchema(VectorsAssociation), ) - self.llm = LlmClient(pulsar_host=pulsar_host) + __class__.pubsub_metric.info({ + "input_queue": input_queue, + "output_queue": output_queue, + "vector_queue": vector_queue, + "subscriber": subscriber, + "input_schema": VectorsChunk.__name__, + "output_schema": Triple.__name__, + "vector_schema": VectorsAssociation.__name__, + }) + + self.llm = LlmClient(pulsar_host=self.pulsar_host) def to_uri(self, text): diff --git a/trustgraph/llm/azure_text/llm.py b/trustgraph/llm/azure_text/llm.py index 12a7931e..bccbf04f 100755 --- a/trustgraph/llm/azure_text/llm.py +++ b/trustgraph/llm/azure_text/llm.py @@ -17,25 +17,22 @@ default_subscriber = 'llm-azure-text' class Processor(ConsumerProducer): - def __init__( - self, - pulsar_host=None, - input_queue=default_input_queue, - 
output_queue=default_output_queue, - subscriber=default_subscriber, - log_level=LogLevel.INFO, - endpoint=None, - token=None, - ): + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) + endpoint = params.get("endpoint") + token = params.get("token") super(Processor, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - input_queue=input_queue, - output_queue=output_queue, - subscriber=subscriber, - input_schema=TextCompletionRequest, - output_schema=TextCompletionResponse, + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": TextCompletionRequest, + "output_schema": TextCompletionResponse, + } ) self.endpoint = endpoint diff --git a/trustgraph/llm/claude_text/llm.py b/trustgraph/llm/claude_text/llm.py index af516b71..230d540c 100755 --- a/trustgraph/llm/claude_text/llm.py +++ b/trustgraph/llm/claude_text/llm.py @@ -15,27 +15,25 @@ default_output_queue = 'llm-complete-text-response' default_subscriber = 'llm-claude-text' default_model = 'claude-3-5-sonnet-20240620' -class Processor: +class Processor(ConsumerProducer): - def __init__( - self, - pulsar_host=None, - input_queue=default_input_queue, - output_queue=default_output_queue, - subscriber=default_subscriber, - log_level=LogLevel.INFO, - model=default_model, - api_key="", - ): + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) + model = params.get("model", default_model) + api_key = params.get("api_key") super(Processor, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - input_queue=input_queue, - output_queue=output_queue, - subscriber=subscriber, - 
input_schema=TextCompletionRequest, - output_schema=TextCompletionResponse, + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": TextCompletionRequest, + "output_schema": TextCompletionResponse, + "model": model, + } ) self.model = model diff --git a/trustgraph/llm/ollama_text/llm.py b/trustgraph/llm/ollama_text/llm.py index b522e225..3d256137 100755 --- a/trustgraph/llm/ollama_text/llm.py +++ b/trustgraph/llm/ollama_text/llm.py @@ -5,6 +5,7 @@ Input is prompt, output is response. """ from langchain_community.llms import Ollama +from prometheus_client import Histogram, Info, Counter from ... schema import TextCompletionRequest, TextCompletionResponse from ... log_level import LogLevel @@ -18,27 +19,36 @@ default_ollama = 'http://localhost:11434' class Processor(ConsumerProducer): - def __init__( - self, - pulsar_host=None, - input_queue=default_input_queue, - output_queue=default_output_queue, - subscriber=default_subscriber, - log_level=LogLevel.INFO, - model=default_model, - ollama=default_ollama, - ): + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) + model = params.get("model", default_model) + ollama = params.get("ollama", default_ollama) super(Processor, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - input_queue=input_queue, - output_queue=output_queue, - subscriber=subscriber, - input_schema=TextCompletionRequest, - output_schema=TextCompletionResponse, + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "model": model, + "ollama": ollama, + "input_schema": TextCompletionRequest, + "output_schema": TextCompletionResponse, + } ) + if not hasattr(__class__, "model_metric"): + __class__.model_metric = Info( + 'model', 'Model information' + ) + + 
__class__.model_metric.info({ + "model": model, + "ollama": ollama, + }) + self.llm = Ollama(base_url=ollama, model=model) def handle(self, msg): diff --git a/trustgraph/llm/vertexai_text/llm.py b/trustgraph/llm/vertexai_text/llm.py index c6c6c81d..30896fb5 100755 --- a/trustgraph/llm/vertexai_text/llm.py +++ b/trustgraph/llm/vertexai_text/llm.py @@ -31,26 +31,23 @@ default_subscriber = 'llm-vertexai-text' class Processor(ConsumerProducer): - def __init__( - self, - pulsar_host=None, - input_queue=default_input_queue, - output_queue=default_output_queue, - subscriber=default_subscriber, - log_level=LogLevel.INFO, - region="us-west1", - model="gemini-1.0-pro-001", - private_key=None, - ): + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) + region = params.get("region", "us-west1") + model = params.get("model", "gemini-1.0-pro-001") + private_key = params.get("private_key") super(Processor, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - input_queue=input_queue, - output_queue=output_queue, - subscriber=subscriber, - input_schema=TextCompletionRequest, - output_schema=TextCompletionResponse, + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": TextCompletionRequest, + "output_schema": TextCompletionResponse, + } ) self.parameters = { diff --git a/trustgraph/rag/graph/rag.py b/trustgraph/rag/graph/rag.py index 3ea322c1..3c1d2b8a 100755 --- a/trustgraph/rag/graph/rag.py +++ b/trustgraph/rag/graph/rag.py @@ -17,32 +17,32 @@ default_vector_store = 'http://localhost:19530' class Processor(ConsumerProducer): - def __init__( - self, - pulsar_host=None, - input_queue=default_input_queue, - output_queue=default_output_queue, - subscriber=default_subscriber, - log_level=LogLevel.INFO, - graph_hosts=default_graph_hosts, 
- vector_store=default_vector_store, - entity_limit=50, - triple_limit=30, - max_subgraph_size=3000, - ): + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) + graph_hosts = params.get("graph_hosts", default_graph_hosts) + vector_store = params.get("vector_store", default_vector_store) + entity_limit = params.get("entity_limit", 50) + triple_limit = params.get("triple_limit", 30) + max_subgraph_size = params.get("max_subgraph_size", 3000) super(Processor, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - input_queue=input_queue, - output_queue=output_queue, - subscriber=subscriber, - input_schema=GraphRagQuery, - output_schema=GraphRagResponse, + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": GraphRagQuery, + "output_schema": GraphRagResponse, + "entity_limit": entity_limit, + "triple_limit": triple_limit, + "max_subgraph_size": max_subgraph_size, + } ) self.rag = GraphRag( - pulsar_host=pulsar_host, + pulsar_host=self.pulsar_host, graph_hosts=graph_hosts.split(","), vector_store=vector_store, verbose=True, diff --git a/trustgraph/vector/milvus_write/write.py b/trustgraph/vector/milvus_write/write.py index cf2a5f76..8ad56055 100755 --- a/trustgraph/vector/milvus_write/write.py +++ b/trustgraph/vector/milvus_write/write.py @@ -14,21 +14,19 @@ default_store_uri = 'http://localhost:19530' class Processor(Consumer): - def __init__( - self, - pulsar_host=None, - input_queue=default_input_queue, - subscriber=default_subscriber, - store_uri=default_store_uri, - log_level=LogLevel.INFO, - ): + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + subscriber = params.get("subscriber", default_subscriber) + store_uri = params.get("store_uri", default_store_uri) 
super(Processor, self).__init__( - pulsar_host=pulsar_host, - log_level=log_level, - input_queue=input_queue, - subscriber=subscriber, - input_schema=VectorsAssociation, + **params | { + "input_queue": input_queue, + "subscriber": subscriber, + "input_schema": VectorsAssociation, + "store_uri": store_uri, + } ) self.vecstore = TripleVectors(store_uri) @@ -40,6 +38,7 @@ class Processor(Consumer): if v.entity.value != "": for vec in v.vectors: self.vecstore.insert(vec, v.entity.value) + @staticmethod def add_args(parser):