* Basic metrics working
* Add consumer & producer metrics
* Grafana & Prometheus in docker compose
This commit is contained in:
cybermaggedon 2024-07-18 17:20:42 +01:00 committed by GitHub
parent 33b646eaec
commit 9ab7613e07
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 888 additions and 327 deletions

View file

@ -1,6 +1,6 @@
# VERSION=$(shell git describe | sed 's/^v//') # VERSION=$(shell git describe | sed 's/^v//')
VERSION=0.3.3 VERSION=0.4.1
all: container all: container

View file

@ -6,6 +6,8 @@ volumes:
etcd: etcd:
minio-data: minio-data:
milvus: milvus:
prometheus-data:
grafana-storage:
services: services:
@ -90,8 +92,34 @@ services:
- "milvus:/var/lib/milvus" - "milvus:/var/lib/milvus"
restart: on-failure:100 restart: on-failure:100
prometheus:
image: docker.io/prom/prometheus:v2.53.1
ports:
- "9090:9090"
volumes:
- "./prometheus:/etc/prometheus"
- "prometheus-data:/prometheus"
restart: on-failure:100
grafana:
image: docker.io/grafana/grafana:10.0.0
ports:
- "3000:3000"
volumes:
- "grafana-storage:/var/lib/grafana"
- "./grafana/dashboard.yml:/etc/grafana/provisioning/dashboards/dashboard.yml"
- "./grafana/datasource.yml:/etc/grafana/provisioning/datasources/datasource.yml"
- "./grafana/dashboard.json:/var/lib/grafana/dashboards/dashboard.json"
environment:
# GF_AUTH_ANONYMOUS_ORG_ROLE: Admin
# GF_AUTH_ANONYMOUS_ENABLED: true
# GF_ORG_ROLE: Admin
GF_ORG_NAME: trustgraph.ai
# GF_SERVER_ROOT_URL: https://example.com
restart: on-failure:100
pdf-decoder: pdf-decoder:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "pdf-decoder" - "pdf-decoder"
- "-p" - "-p"
@ -99,7 +127,7 @@ services:
restart: on-failure:100 restart: on-failure:100
chunker: chunker:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "chunker-recursive" - "chunker-recursive"
- "-p" - "-p"
@ -107,7 +135,7 @@ services:
restart: on-failure:100 restart: on-failure:100
vectorize: vectorize:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "embeddings-vectorize" - "embeddings-vectorize"
- "-p" - "-p"
@ -115,15 +143,17 @@ services:
restart: on-failure:100 restart: on-failure:100
embeddings: embeddings:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "embeddings-hf" - "embeddings-hf"
- "-p" - "-p"
- "pulsar://pulsar:6650" - "pulsar://pulsar:6650"
- "-m"
- "mixedbread-ai/mxbai-embed-large-v1"
restart: on-failure:100 restart: on-failure:100
kg-extract-definitions: kg-extract-definitions:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "kg-extract-definitions" - "kg-extract-definitions"
- "-p" - "-p"
@ -131,7 +161,7 @@ services:
restart: on-failure:100 restart: on-failure:100
kg-extract-relationships: kg-extract-relationships:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "kg-extract-relationships" - "kg-extract-relationships"
- "-p" - "-p"
@ -139,7 +169,7 @@ services:
restart: on-failure:100 restart: on-failure:100
vector-write: vector-write:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "vector-write-milvus" - "vector-write-milvus"
- "-p" - "-p"
@ -149,7 +179,7 @@ services:
restart: on-failure:100 restart: on-failure:100
graph-write: graph-write:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "graph-write-cassandra" - "graph-write-cassandra"
- "-p" - "-p"
@ -159,7 +189,7 @@ services:
restart: on-failure:100 restart: on-failure:100
llm: llm:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "llm-azure-text" - "llm-azure-text"
- "-p" - "-p"
@ -171,7 +201,7 @@ services:
restart: on-failure:100 restart: on-failure:100
graph-rag: graph-rag:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "graph-rag" - "graph-rag"
- "-p" - "-p"

View file

@ -6,6 +6,8 @@ volumes:
etcd: etcd:
minio-data: minio-data:
milvus: milvus:
prometheus-data:
grafana-storage:
services: services:
@ -90,8 +92,34 @@ services:
- "milvus:/var/lib/milvus" - "milvus:/var/lib/milvus"
restart: on-failure:100 restart: on-failure:100
prometheus:
image: docker.io/prom/prometheus:v2.53.1
ports:
- "9090:9090"
volumes:
- "./prometheus:/etc/prometheus"
- "prometheus-data:/prometheus"
restart: on-failure:100
grafana:
image: docker.io/grafana/grafana:10.0.0
ports:
- "3000:3000"
volumes:
- "grafana-storage:/var/lib/grafana"
- "./grafana/dashboard.yml:/etc/grafana/provisioning/dashboards/dashboard.yml"
- "./grafana/datasource.yml:/etc/grafana/provisioning/datasources/datasource.yml"
- "./grafana/dashboard.json:/var/lib/grafana/dashboards/dashboard.json"
environment:
# GF_AUTH_ANONYMOUS_ORG_ROLE: Admin
# GF_AUTH_ANONYMOUS_ENABLED: true
# GF_ORG_ROLE: Admin
GF_ORG_NAME: trustgraph.ai
# GF_SERVER_ROOT_URL: https://example.com
restart: on-failure:100
pdf-decoder: pdf-decoder:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "pdf-decoder" - "pdf-decoder"
- "-p" - "-p"
@ -99,7 +127,7 @@ services:
restart: on-failure:100 restart: on-failure:100
chunker: chunker:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "chunker-recursive" - "chunker-recursive"
- "-p" - "-p"
@ -107,7 +135,7 @@ services:
restart: on-failure:100 restart: on-failure:100
vectorize: vectorize:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "embeddings-vectorize" - "embeddings-vectorize"
- "-p" - "-p"
@ -115,15 +143,17 @@ services:
restart: on-failure:100 restart: on-failure:100
embeddings: embeddings:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "embeddings-hf" - "embeddings-hf"
- "-p" - "-p"
- "pulsar://pulsar:6650" - "pulsar://pulsar:6650"
- "-m"
- "mixedbread-ai/mxbai-embed-large-v1"
restart: on-failure:100 restart: on-failure:100
kg-extract-definitions: kg-extract-definitions:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "kg-extract-definitions" - "kg-extract-definitions"
- "-p" - "-p"
@ -131,7 +161,7 @@ services:
restart: on-failure:100 restart: on-failure:100
kg-extract-relationships: kg-extract-relationships:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "kg-extract-relationships" - "kg-extract-relationships"
- "-p" - "-p"
@ -139,7 +169,7 @@ services:
restart: on-failure:100 restart: on-failure:100
vector-write: vector-write:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "vector-write-milvus" - "vector-write-milvus"
- "-p" - "-p"
@ -149,7 +179,7 @@ services:
restart: on-failure:100 restart: on-failure:100
graph-write: graph-write:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "graph-write-cassandra" - "graph-write-cassandra"
- "-p" - "-p"
@ -159,7 +189,7 @@ services:
restart: on-failure:100 restart: on-failure:100
llm: llm:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "llm-claude-text" - "llm-claude-text"
- "-p" - "-p"
@ -169,7 +199,7 @@ services:
restart: on-failure:100 restart: on-failure:100
graph-rag: graph-rag:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "graph-rag" - "graph-rag"
- "-p" - "-p"

View file

@ -6,6 +6,8 @@ volumes:
etcd: etcd:
minio-data: minio-data:
milvus: milvus:
prometheus-data:
grafana-storage:
services: services:
@ -90,8 +92,34 @@ services:
- "milvus:/var/lib/milvus" - "milvus:/var/lib/milvus"
restart: on-failure:100 restart: on-failure:100
prometheus:
image: docker.io/prom/prometheus:v2.53.1
ports:
- "9090:9090"
volumes:
- "./prometheus:/etc/prometheus"
- "prometheus-data:/prometheus"
restart: on-failure:100
grafana:
image: docker.io/grafana/grafana:10.0.0
ports:
- "3000:3000"
volumes:
- "grafana-storage:/var/lib/grafana"
- "./grafana/dashboard.yml:/etc/grafana/provisioning/dashboards/dashboard.yml"
- "./grafana/datasource.yml:/etc/grafana/provisioning/datasources/datasource.yml"
- "./grafana/dashboard.json:/var/lib/grafana/dashboards/dashboard.json"
environment:
# GF_AUTH_ANONYMOUS_ORG_ROLE: Admin
# GF_AUTH_ANONYMOUS_ENABLED: true
# GF_ORG_ROLE: Admin
GF_ORG_NAME: trustgraph.ai
# GF_SERVER_ROOT_URL: https://example.com
restart: on-failure:100
pdf-decoder: pdf-decoder:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "pdf-decoder" - "pdf-decoder"
- "-p" - "-p"
@ -99,7 +127,7 @@ services:
restart: on-failure:100 restart: on-failure:100
chunker: chunker:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "chunker-recursive" - "chunker-recursive"
- "-p" - "-p"
@ -107,7 +135,7 @@ services:
restart: on-failure:100 restart: on-failure:100
vectorize: vectorize:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "embeddings-vectorize" - "embeddings-vectorize"
- "-p" - "-p"
@ -115,7 +143,7 @@ services:
restart: on-failure:100 restart: on-failure:100
embeddings: embeddings:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "embeddings-hf" - "embeddings-hf"
- "-p" - "-p"
@ -125,7 +153,7 @@ services:
restart: on-failure:100 restart: on-failure:100
kg-extract-definitions: kg-extract-definitions:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "kg-extract-definitions" - "kg-extract-definitions"
- "-p" - "-p"
@ -133,7 +161,7 @@ services:
restart: on-failure:100 restart: on-failure:100
kg-extract-relationships: kg-extract-relationships:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "kg-extract-relationships" - "kg-extract-relationships"
- "-p" - "-p"
@ -141,7 +169,7 @@ services:
restart: on-failure:100 restart: on-failure:100
vector-write: vector-write:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "vector-write-milvus" - "vector-write-milvus"
- "-p" - "-p"
@ -151,7 +179,7 @@ services:
restart: on-failure:100 restart: on-failure:100
graph-write: graph-write:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "graph-write-cassandra" - "graph-write-cassandra"
- "-p" - "-p"
@ -161,7 +189,7 @@ services:
restart: on-failure:100 restart: on-failure:100
llm: llm:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "llm-ollama-text" - "llm-ollama-text"
- "-p" - "-p"
@ -171,7 +199,7 @@ services:
restart: on-failure:100 restart: on-failure:100
graph-rag: graph-rag:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "graph-rag" - "graph-rag"
- "-p" - "-p"

View file

@ -6,6 +6,8 @@ volumes:
etcd: etcd:
minio-data: minio-data:
milvus: milvus:
prometheus-data:
grafana-storage:
services: services:
@ -90,8 +92,34 @@ services:
- "milvus:/var/lib/milvus" - "milvus:/var/lib/milvus"
restart: on-failure:100 restart: on-failure:100
prometheus:
image: docker.io/prom/prometheus:v2.53.1
ports:
- "9090:9090"
volumes:
- "./prometheus:/etc/prometheus"
- "prometheus-data:/prometheus"
restart: on-failure:100
grafana:
image: docker.io/grafana/grafana:10.0.0
ports:
- "3000:3000"
volumes:
- "grafana-storage:/var/lib/grafana"
- "./grafana/dashboard.yml:/etc/grafana/provisioning/dashboards/dashboard.yml"
- "./grafana/datasource.yml:/etc/grafana/provisioning/datasources/datasource.yml"
- "./grafana/dashboard.json:/var/lib/grafana/dashboards/dashboard.json"
environment:
# GF_AUTH_ANONYMOUS_ORG_ROLE: Admin
# GF_AUTH_ANONYMOUS_ENABLED: true
# GF_ORG_ROLE: Admin
GF_ORG_NAME: trustgraph.ai
# GF_SERVER_ROOT_URL: https://example.com
restart: on-failure:100
pdf-decoder: pdf-decoder:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "pdf-decoder" - "pdf-decoder"
- "-p" - "-p"
@ -99,7 +127,7 @@ services:
restart: on-failure:100 restart: on-failure:100
chunker: chunker:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "chunker-recursive" - "chunker-recursive"
- "-p" - "-p"
@ -107,7 +135,7 @@ services:
restart: on-failure:100 restart: on-failure:100
vectorize: vectorize:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "embeddings-vectorize" - "embeddings-vectorize"
- "-p" - "-p"
@ -115,15 +143,17 @@ services:
restart: on-failure:100 restart: on-failure:100
embeddings: embeddings:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "embeddings-hf" - "embeddings-hf"
- "-p" - "-p"
- "pulsar://pulsar:6650" - "pulsar://pulsar:6650"
- "-m"
- "mixedbread-ai/mxbai-embed-large-v1"
restart: on-failure:100 restart: on-failure:100
kg-extract-definitions: kg-extract-definitions:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "kg-extract-definitions" - "kg-extract-definitions"
- "-p" - "-p"
@ -131,7 +161,7 @@ services:
restart: on-failure:100 restart: on-failure:100
kg-extract-relationships: kg-extract-relationships:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "kg-extract-relationships" - "kg-extract-relationships"
- "-p" - "-p"
@ -139,7 +169,7 @@ services:
restart: on-failure:100 restart: on-failure:100
vector-write: vector-write:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "vector-write-milvus" - "vector-write-milvus"
- "-p" - "-p"
@ -149,7 +179,7 @@ services:
restart: on-failure:100 restart: on-failure:100
graph-write: graph-write:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "graph-write-cassandra" - "graph-write-cassandra"
- "-p" - "-p"
@ -159,7 +189,7 @@ services:
restart: on-failure:100 restart: on-failure:100
llm: llm:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "llm-vertexai-text" - "llm-vertexai-text"
- "-p" - "-p"
@ -173,7 +203,7 @@ services:
restart: on-failure:100 restart: on-failure:100
graph-rag: graph-rag:
image: docker.io/trustgraph/trustgraph-flow:0.3.3 image: docker.io/trustgraph/trustgraph-flow:0.4.1
command: command:
- "graph-rag" - "graph-rag"
- "-p" - "-p"

298
grafana/dashboard.json Normal file
View file

@ -0,0 +1,298 @@
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 1,
"links": [],
"liveNow": false,
"panels": [
{
"datasource": {
"type": "prometheus",
"uid": "f6b18033-5918-4e05-a1ca-4cb30343b129"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"custom": {
"fillOpacity": 80,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineWidth": 1
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 0
},
"id": 4,
"options": {
"bucketOffset": 0,
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "f6b18033-5918-4e05-a1ca-4cb30343b129"
},
"editorMode": "builder",
"expr": "avg(rate(request_latency_bucket{instance=\"llm:8000\"}[5m]))",
"instant": false,
"range": true,
"refId": "A"
}
],
"title": "LLM latency",
"type": "histogram"
},
{
"datasource": {
"type": "prometheus",
"uid": "f6b18033-5918-4e05-a1ca-4cb30343b129"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "continuous-RdYlGr"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 39,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "percent"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 0
},
"id": 2,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "f6b18033-5918-4e05-a1ca-4cb30343b129"
},
"editorMode": "builder",
"expr": "sum by(status) (rate(processing_count_total[5m]))",
"format": "time_series",
"instant": false,
"interval": "",
"range": true,
"refId": "A"
}
],
"title": "Error rate",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "f6b18033-5918-4e05-a1ca-4cb30343b129"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 11,
"w": 12,
"x": 0,
"y": 8
},
"id": 1,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "f6b18033-5918-4e05-a1ca-4cb30343b129"
},
"editorMode": "builder",
"expr": "rate(request_latency_count[1m])",
"instant": false,
"range": true,
"refId": "A"
}
],
"title": "Request count",
"type": "timeseries"
}
],
"refresh": "10s",
"schemaVersion": 38,
"style": "dark",
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-6h",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "Overview",
"uid": "b5c8abf8-fe79-496b-b028-10bde917d1f0",
"version": 7,
"weekStart": ""
}

17
grafana/dashboard.yml Normal file
View file

@ -0,0 +1,17 @@
# Grafana dashboard provisioning: load dashboards from files on disk.
apiVersion: 1

providers:

  # A single file-based provider for the TrustGraph dashboards.
  - name: 'trustgraph.ai'
    orgId: 1
    # Dashboards from this provider are placed in this Grafana folder.
    folder: 'TrustGraph'
    folderUid: 'b6c5be90-d432-4df8-aeab-737c7b151228'
    type: file
    disableDeletion: false
    updateIntervalSeconds: 30
    allowUiUpdates: true
    options:
      # Directory scanned for dashboard JSON files; the compose files mount
      # ./grafana/dashboard.json into this directory.
      path: /var/lib/grafana/dashboards
      foldersFromFilesStructure: false

21
grafana/datasource.yml Normal file
View file

@ -0,0 +1,21 @@
# Grafana datasource provisioning: registers the Prometheus service as the
# default datasource.
apiVersion: 1

prune: true

datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    orgId: 1
    # <string> Sets a custom UID to reference this
    # data source in other parts of the configuration.
    # If not specified, Grafana generates one.
    # The panels in grafana/dashboard.json refer to the datasource by this
    # UID, so the two must be kept in sync.
    uid: 'f6b18033-5918-4e05-a1ca-4cb30343b129'
    # The `prometheus` compose service, published on port 9090.
    url: http://prometheus:9090
    basicAuth: false
    withCredentials: false
    isDefault: true
    editable: true

35
prometheus/prometheus.yml Normal file
View file

@ -0,0 +1,35 @@
# Prometheus configuration for the TrustGraph deployment.
global:
  scrape_interval: 15s  # By default, scrape targets every 15 seconds.

  # Attach these labels to any time series or alerts when communicating with
  # external systems (federation, remote storage, Alertmanager).
  external_labels:
    monitor: 'trustgraph'

# A single scrape job covering the TrustGraph processor services.
# Each target is a compose service name on port 8000.
scrape_configs:

  # The job name is added as a label `job=<job_name>` to any timeseries
  # scraped from this config.
  - job_name: 'trustgraph'

    # Override the global default and scrape targets from this job every
    # 5 seconds.
    scrape_interval: 5s

    static_configs:
      - targets:
          - 'pdf-decoder:8000'
          - 'chunker:8000'
          - 'vectorize:8000'
          - 'embeddings:8000'
          - 'kg-extract-definitions:8000'
          - 'kg-extract-relationships:8000'
          - 'vector-write:8000'
          - 'graph-write:8000'
          - 'llm:8000'
          - 'graph-rag:8000'

View file

@ -4,7 +4,7 @@ import os
with open("README.md", "r") as fh: with open("README.md", "r") as fh:
long_description = fh.read() long_description = fh.read()
version = "0.3.3" version = "0.4.1"
setuptools.setup( setuptools.setup(
name="trustgraph", name="trustgraph",
@ -43,6 +43,7 @@ setuptools.setup(
"anthropic", "anthropic",
"google-cloud-aiplatform", "google-cloud-aiplatform",
"pyyaml", "pyyaml",
"prometheus-client",
], ],
scripts=[ scripts=[
"scripts/chunker-recursive", "scripts/chunker-recursive",

View file

@ -2,8 +2,10 @@
import os import os
import argparse import argparse
import pulsar import pulsar
import _pulsar
import time import time
from pulsar.schema import JsonSchema from pulsar.schema import JsonSchema
from prometheus_client import start_http_server, Histogram, Info, Counter
from .. log_level import LogLevel from .. log_level import LogLevel
@ -11,16 +13,23 @@ class BaseProcessor:
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
def __init__( def __init__(self, **params):
self,
pulsar_host=default_pulsar_host,
log_level=LogLevel.INFO,
):
self.client = None self.client = None
if pulsar_host == None: if not hasattr(__class__, "params_metric"):
pulsar_host = default_pulsar_host __class__.params_metric = Info(
'params', 'Parameters configuration'
)
# FIXME: Maybe outputs information it should not
__class__.params_metric.info({
k: str(params[k])
for k in params
})
pulsar_host = params.get("pulsar_host", self.default_pulsar_host)
log_level = params.get("log_level", LogLevel.INFO)
self.pulsar_host = pulsar_host self.pulsar_host = pulsar_host
@ -51,6 +60,20 @@ class BaseProcessor:
help=f'Output queue (default: info)' help=f'Output queue (default: info)'
) )
parser.add_argument(
'-M', '--metrics-enabled',
type=bool,
default=True,
help=f'Pulsar host (default: true)',
)
parser.add_argument(
'-P', '--metrics-port',
type=int,
default=8000,
help=f'Pulsar host (default: 8000)',
)
def run(self): def run(self):
raise RuntimeError("Something should have implemented the run method") raise RuntimeError("Something should have implemented the run method")
@ -69,13 +92,26 @@ class BaseProcessor:
args = parser.parse_args() args = parser.parse_args()
args = vars(args) args = vars(args)
if args["metrics_enabled"]:
start_http_server(args["metrics_port"])
try: try:
p = cls(**args) p = cls(**args)
p.run() p.run()
except KeyboardInterrupt:
print("Keyboard interrupt.")
return
except _pulsar.Interrupted:
print("Pulsar Interrupted.")
return
except Exception as e: except Exception as e:
print(type(e))
print("Exception:", e, flush=True) print("Exception:", e, flush=True)
print("Will retry...", flush=True) print("Will retry...", flush=True)
@ -83,23 +119,38 @@ class BaseProcessor:
class Consumer(BaseProcessor): class Consumer(BaseProcessor):
def __init__( def __init__(self, **params):
self,
pulsar_host=None,
log_level=LogLevel.INFO,
input_queue="input",
subscriber="subscriber",
input_schema=None,
):
super(Consumer, self).__init__( super(Consumer, self).__init__(**params)
pulsar_host=pulsar_host,
log_level=log_level, input_queue = params.get("input_queue")
) subscriber = params.get("subscriber")
input_schema = params.get("input_schema")
if input_schema == None: if input_schema == None:
raise RuntimeError("input_schema must be specified") raise RuntimeError("input_schema must be specified")
if not hasattr(__class__, "request_metric"):
__class__.request_metric = Histogram(
'request_latency', 'Request latency (seconds)'
)
if not hasattr(__class__, "pubsub_metric"):
__class__.pubsub_metric = Info(
'pubsub', 'Pub/sub configuration'
)
if not hasattr(__class__, "processing_metric"):
__class__.processing_metric = Counter(
'processing_count', 'Processing count', ["status"]
)
__class__.pubsub_metric.info({
"input_queue": input_queue,
"subscriber": subscriber,
"input_schema": input_schema.__name__,
})
self.consumer = self.client.subscribe( self.consumer = self.client.subscribe(
input_queue, subscriber, input_queue, subscriber,
schema=JsonSchema(input_schema), schema=JsonSchema(input_schema),
@ -113,11 +164,14 @@ class Consumer(BaseProcessor):
try: try:
self.handle(msg) with __class__.request_metric.time():
self.handle(msg)
# Acknowledge successful processing of the message # Acknowledge successful processing of the message
self.consumer.acknowledge(msg) self.consumer.acknowledge(msg)
__class__.processing_metric.labels(status="success").inc()
except Exception as e: except Exception as e:
print("Exception:", e, flush=True) print("Exception:", e, flush=True)
@ -125,6 +179,8 @@ class Consumer(BaseProcessor):
# Message failed to be processed # Message failed to be processed
self.consumer.negative_acknowledge(msg) self.consumer.negative_acknowledge(msg)
__class__.processing_metric.labels(status="error").inc()
@staticmethod @staticmethod
def add_args(parser, default_input_queue, default_subscriber): def add_args(parser, default_input_queue, default_subscriber):
@ -144,21 +200,43 @@ class Consumer(BaseProcessor):
class ConsumerProducer(BaseProcessor): class ConsumerProducer(BaseProcessor):
def __init__( def __init__(self, **params):
self,
pulsar_host=None,
log_level=LogLevel.INFO,
input_queue="input",
output_queue="output",
subscriber="subscriber",
input_schema=None,
output_schema=None,
):
super(ConsumerProducer, self).__init__( input_queue = params.get("input_queue")
pulsar_host=pulsar_host, output_queue = params.get("output_queue")
log_level=log_level, subscriber = params.get("subscriber")
) input_schema = params.get("input_schema")
output_schema = params.get("output_schema")
if not hasattr(__class__, "request_metric"):
__class__.request_metric = Histogram(
'request_latency', 'Request latency (seconds)'
)
if not hasattr(__class__, "output_metric"):
__class__.output_metric = Counter(
'output_count', 'Output items created'
)
if not hasattr(__class__, "pubsub_metric"):
__class__.pubsub_metric = Info(
'pubsub', 'Pub/sub configuration'
)
if not hasattr(__class__, "processing_metric"):
__class__.processing_metric = Counter(
'processing_count', 'Processing count', ["status"]
)
__class__.pubsub_metric.info({
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": input_schema.__name__,
"output_schema": output_schema.__name__,
})
super(ConsumerProducer, self).__init__(**params)
if input_schema == None: if input_schema == None:
raise RuntimeError("input_schema must be specified") raise RuntimeError("input_schema must be specified")
@ -184,11 +262,14 @@ class ConsumerProducer(BaseProcessor):
try: try:
resp = self.handle(msg) with __class__.request_metric.time():
resp = self.handle(msg)
# Acknowledge successful processing of the message # Acknowledge successful processing of the message
self.consumer.acknowledge(msg) self.consumer.acknowledge(msg)
__class__.processing_metric.labels(status="success").inc()
except Exception as e: except Exception as e:
print("Exception:", e, flush=True) print("Exception:", e, flush=True)
@ -196,9 +277,11 @@ class ConsumerProducer(BaseProcessor):
# Message failed to be processed # Message failed to be processed
self.consumer.negative_acknowledge(msg) self.consumer.negative_acknowledge(msg)
def send(self, msg, properties={}): __class__.processing_metric.labels(status="error").inc()
def send(self, msg, properties={}):
self.producer.send(msg, properties) self.producer.send(msg, properties)
__class__.output_metric.inc()
@staticmethod @staticmethod
def add_args( def add_args(
@ -228,18 +311,27 @@ class ConsumerProducer(BaseProcessor):
class Producer(BaseProcessor): class Producer(BaseProcessor):
def __init__( def __init__(self, **params):
self,
pulsar_host=None,
log_level=LogLevel.INFO,
output_queue="output",
output_schema=None,
):
super(Producer, self).__init__( output_queue = params.get("output_queue")
pulsar_host=pulsar_host, output_schema = params.get("output_schema")
log_level=log_level,
) if not hasattr(__class__, "output_metric"):
__class__.output_metric = Counter(
'output_count', 'Output items created'
)
if not hasattr(__class__, "pubsub_metric"):
__class__.pubsub_metric = Info(
'pubsub', 'Pub/sub configuration'
)
__class__.pubsub_metric.info({
"output_queue": output_queue,
"output_schema": output_schema.__name__,
})
super(Producer, self).__init__(**params)
if output_schema == None: if output_schema == None:
raise RuntimeError("output_schema must be specified") raise RuntimeError("output_schema must be specified")
@ -250,8 +342,8 @@ class Producer(BaseProcessor):
) )
def send(self, msg, properties={}): def send(self, msg, properties={}):
self.producer.send(msg, properties) self.producer.send(msg, properties)
__class__.output_metric.inc()
@staticmethod @staticmethod
def add_args( def add_args(

View file

@ -17,25 +17,22 @@ default_subscriber = 'chunker-recursive'
class Processor(ConsumerProducer): class Processor(ConsumerProducer):
def __init__( def __init__(self, **params):
self,
pulsar_host=None,
input_queue=default_input_queue,
output_queue=default_output_queue,
subscriber=default_subscriber,
log_level=LogLevel.INFO,
chunk_size=2000,
chunk_overlap=100,
):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
chunk_size = params.get("chunk_size", 2000)
chunk_overlap = params.get("chunk_overlap", 100)
super(Processor, self).__init__( super(Processor, self).__init__(
pulsar_host=pulsar_host, **params | {
log_level=log_level, "input_queue": input_queue,
input_queue=input_queue, "output_queue": output_queue,
output_queue=output_queue, "subscriber": subscriber,
subscriber=subscriber, "input_schema": TextDocument,
input_schema=TextDocument, "output_schema": Chunk,
output_schema=Chunk, }
) )
self.text_splitter = RecursiveCharacterTextSplitter( self.text_splitter = RecursiveCharacterTextSplitter(

View file

@ -18,23 +18,20 @@ default_subscriber = 'pdf-decoder'
class Processor(ConsumerProducer): class Processor(ConsumerProducer):
def __init__( def __init__(self, **params):
self,
pulsar_host=None, input_queue = params.get("input_queue", default_input_queue)
input_queue=default_input_queue, output_queue = params.get("output_queue", default_output_queue)
output_queue=default_output_queue, subscriber = params.get("subscriber", default_subscriber)
subscriber=default_subscriber,
log_level=LogLevel.INFO,
):
super(Processor, self).__init__( super(Processor, self).__init__(
pulsar_host=pulsar_host, **params | {
log_level=log_level, "input_queue": input_queue,
input_queue=input_queue, "output_queue": output_queue,
output_queue=output_queue, "subscriber": subscriber,
subscriber=subscriber, "input_schema": Document,
input_schema=Document, "output_schema": TextDocument,
output_schema=TextDocument, }
) )
print("PDF inited") print("PDF inited")

View file

@ -17,24 +17,21 @@ default_model="all-MiniLM-L6-v2"
class Processor(ConsumerProducer): class Processor(ConsumerProducer):
def __init__( def __init__(self, **params):
self,
pulsar_host=None, input_queue = params.get("input_queue", default_input_queue)
input_queue=default_input_queue, output_queue = params.get("output_queue", default_output_queue)
output_queue=default_output_queue, subscriber = params.get("subscriber", default_subscriber)
subscriber=default_subscriber, model = params.get("model", default_model)
log_level=LogLevel.INFO,
model=default_model,
):
super(Processor, self).__init__( super(Processor, self).__init__(
pulsar_host=pulsar_host, **params | {
log_level=log_level, "input_queue": input_queue,
input_queue=input_queue, "output_queue": output_queue,
output_queue=output_queue, "subscriber": subscriber,
subscriber=subscriber, "input_schema": EmbeddingsRequest,
input_schema=EmbeddingsRequest, "output_schema": EmbeddingsResponse,
output_schema=EmbeddingsResponse, }
) )
self.embeddings = HuggingFaceEmbeddings(model_name=model) self.embeddings = HuggingFaceEmbeddings(model_name=model)

View file

@ -17,25 +17,20 @@ default_ollama = 'http://localhost:11434'
class Processor(ConsumerProducer): class Processor(ConsumerProducer):
def __init__( def __init__(self, **params):
self,
pulsar_host=None, input_queue = params.get("input_queue", default_input_queue)
input_queue=default_input_queue, output_queue = params.get("output_queue", default_output_queue)
output_queue=default_output_queue, subscriber = params.get("subscriber", default_subscriber)
subscriber=default_subscriber,
log_level=LogLevel.INFO,
model=default_model,
ollama=default_ollama,
):
super(Processor, self).__init__( super(Processor, self).__init__(
pulsar_host=pulsar_host, **params | {
log_level=log_level, "input_queue": input_queue,
input_queue=input_queue, "output_queue": output_queue,
output_queue=output_queue, "subscriber": subscriber,
subscriber=subscriber, "input_schema": EmbeddingsRequest,
input_schema=EmbeddingsRequest, "output_schema": EmbeddingsResponse,
output_schema=EmbeddingsResponse, }
) )
self.embeddings = OllamaEmbeddings(base_url=ollama, model=model) self.embeddings = OllamaEmbeddings(base_url=ollama, model=model)

View file

@ -15,26 +15,23 @@ default_subscriber = 'embeddings-vectorizer'
class Processor(ConsumerProducer): class Processor(ConsumerProducer):
def __init__( def __init__(self, **params):
self,
pulsar_host=None, input_queue = params.get("input_queue", default_input_queue)
input_queue=default_input_queue, output_queue = params.get("output_queue", default_output_queue)
output_queue=default_output_queue, subscriber = params.get("subscriber", default_subscriber)
subscriber=default_subscriber,
log_level=LogLevel.INFO,
):
super(Processor, self).__init__( super(Processor, self).__init__(
pulsar_host=pulsar_host, **params | {
log_level=log_level, "input_queue": input_queue,
input_queue=input_queue, "output_queue": output_queue,
output_queue=output_queue, "subscriber": subscriber,
subscriber=subscriber, "input_schema": Chunk,
input_schema=Chunk, "output_schema": VectorsChunk,
output_schema=VectorsChunk, }
) )
self.embeddings = EmbeddingsClient(pulsar_host=pulsar_host) self.embeddings = EmbeddingsClient(pulsar_host=self.pulsar_host)
def emit(self, source, chunk, vectors): def emit(self, source, chunk, vectors):

View file

@ -20,27 +20,22 @@ default_graph_host='localhost'
class Processor(Consumer): class Processor(Consumer):
def __init__( def __init__(self, **params):
self,
pulsar_host=None, input_queue = params.get("input_queue", default_input_queue)
input_queue=default_input_queue, subscriber = params.get("subscriber", default_subscriber)
subscriber=default_subscriber, graph_host = params.get("graph_host", default_graph_host)
graph_host=default_graph_host,
log_level=LogLevel.INFO,
):
super(Processor, self).__init__( super(Processor, self).__init__(
pulsar_host=pulsar_host, **params | {
log_level=log_level, "input_queue": input_queue,
input_queue=input_queue, "subscriber": subscriber,
subscriber=subscriber, "input_schema": Triple,
input_schema=Triple, }
) )
self.tg = TrustGraph([graph_host]) self.tg = TrustGraph([graph_host])
self.count = 0
def handle(self, msg): def handle(self, msg):
v = msg.value() v = msg.value()
@ -51,11 +46,6 @@ class Processor(Consumer):
v.o.value v.o.value
) )
self.count += 1
if (self.count % 1000) == 0:
print(self.count, "...", flush=True)
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):

View file

@ -22,23 +22,20 @@ default_subscriber = 'kg-extract-definitions'
class Processor(ConsumerProducer): class Processor(ConsumerProducer):
def __init__( def __init__(self, **params):
self,
pulsar_host=None, input_queue = params.get("input_queue", default_input_queue)
input_queue=default_input_queue, output_queue = params.get("output_queue", default_output_queue)
output_queue=default_output_queue, subscriber = params.get("subscriber", default_subscriber)
subscriber=default_subscriber,
log_level=LogLevel.INFO,
):
super(Processor, self).__init__( super(Processor, self).__init__(
pulsar_host=pulsar_host, **params | {
log_level=log_level, "input_queue": input_queue,
input_queue=input_queue, "output_queue": output_queue,
output_queue=output_queue, "subscriber": subscriber,
subscriber=subscriber, "input_schema": VectorsChunk,
input_schema=VectorsChunk, "output_schema": Triple,
output_schema=Triple, }
) )
self.llm = LlmClient(pulsar_host=pulsar_host) self.llm = LlmClient(pulsar_host=pulsar_host)

View file

@ -7,6 +7,7 @@ graph edges.
import urllib.parse import urllib.parse
import json import json
import os
from pulsar.schema import JsonSchema from pulsar.schema import JsonSchema
from ... schema import VectorsChunk, Triple, VectorsAssociation, Source, Value from ... schema import VectorsChunk, Triple, VectorsAssociation, Source, Value
@ -25,24 +26,21 @@ default_vector_queue='vectors-load'
class Processor(ConsumerProducer): class Processor(ConsumerProducer):
def __init__( def __init__(self, **params):
self,
pulsar_host=None, input_queue = params.get("input_queue", default_input_queue)
input_queue=default_input_queue, output_queue = params.get("output_queue", default_output_queue)
vector_queue=default_vector_queue, vector_queue = params.get("vector_queue", default_vector_queue)
output_queue=default_output_queue, subscriber = params.get("subscriber", default_subscriber)
subscriber=default_subscriber,
log_level=LogLevel.INFO,
):
super(Processor, self).__init__( super(Processor, self).__init__(
pulsar_host=pulsar_host, **params | {
log_level=log_level, "input_queue": input_queue,
input_queue=input_queue, "output_queue": output_queue,
output_queue=output_queue, "subscriber": subscriber,
subscriber=subscriber, "input_schema": VectorsChunk,
input_schema=VectorsChunk, "output_schema": Triple,
output_schema=Triple, }
) )
self.vec_prod = self.client.create_producer( self.vec_prod = self.client.create_producer(
@ -50,7 +48,17 @@ class Processor(ConsumerProducer):
schema=JsonSchema(VectorsAssociation), schema=JsonSchema(VectorsAssociation),
) )
self.llm = LlmClient(pulsar_host=pulsar_host) __class__.pubsub_metric.info({
"input_queue": input_queue,
"output_queue": output_queue,
"vector_queue": vector_queue,
"subscriber": subscriber,
"input_schema": VectorsChunk.__name__,
"output_schema": Triple.__name__,
"vector_schema": VectorsAssociation.__name__,
})
self.llm = LlmClient(pulsar_host=self.pulsar_host)
def to_uri(self, text): def to_uri(self, text):

View file

@ -17,25 +17,22 @@ default_subscriber = 'llm-azure-text'
class Processor(ConsumerProducer): class Processor(ConsumerProducer):
def __init__( def __init__(self, **params):
self,
pulsar_host=None, input_queue = params.get("input_queue", default_input_queue)
input_queue=default_input_queue, output_queue = params.get("output_queue", default_output_queue)
output_queue=default_output_queue, subscriber = params.get("subscriber", default_subscriber)
subscriber=default_subscriber, endpoint = params.get("endpoint")
log_level=LogLevel.INFO, token = params.get("token")
endpoint=None,
token=None,
):
super(Processor, self).__init__( super(Processor, self).__init__(
pulsar_host=pulsar_host, **params | {
log_level=log_level, "input_queue": input_queue,
input_queue=input_queue, "output_queue": output_queue,
output_queue=output_queue, "subscriber": subscriber,
subscriber=subscriber, "input_schema": TextCompletionRequest,
input_schema=TextCompletionRequest, "output_schema": TextCompletionResponse,
output_schema=TextCompletionResponse, }
) )
self.endpoint = endpoint self.endpoint = endpoint

View file

@ -15,27 +15,25 @@ default_output_queue = 'llm-complete-text-response'
default_subscriber = 'llm-claude-text' default_subscriber = 'llm-claude-text'
default_model = 'claude-3-5-sonnet-20240620' default_model = 'claude-3-5-sonnet-20240620'
class Processor: class Processor(ConsumerProducer):
def __init__( def __init__(self, **params):
self,
pulsar_host=None, input_queue = params.get("input_queue", default_input_queue)
input_queue=default_input_queue, output_queue = params.get("output_queue", default_output_queue)
output_queue=default_output_queue, subscriber = params.get("subscriber", default_subscriber)
subscriber=default_subscriber, model = params.get("model", default_model)
log_level=LogLevel.INFO, api_key = params.get("api_key")
model=default_model,
api_key="",
):
super(Processor, self).__init__( super(Processor, self).__init__(
pulsar_host=pulsar_host, **params | {
log_level=log_level, "input_queue": input_queue,
input_queue=input_queue, "output_queue": output_queue,
output_queue=output_queue, "subscriber": subscriber,
subscriber=subscriber, "input_schema": TextCompletionRequest,
input_schema=TextCompletionRequest, "output_schema": TextCompletionResponse,
output_schema=TextCompletionResponse, "model": model,
}
) )
self.model = model self.model = model

View file

@ -5,6 +5,7 @@ Input is prompt, output is response.
""" """
from langchain_community.llms import Ollama from langchain_community.llms import Ollama
from prometheus_client import Histogram, Info, Counter
from ... schema import TextCompletionRequest, TextCompletionResponse from ... schema import TextCompletionRequest, TextCompletionResponse
from ... log_level import LogLevel from ... log_level import LogLevel
@ -18,27 +19,36 @@ default_ollama = 'http://localhost:11434'
class Processor(ConsumerProducer): class Processor(ConsumerProducer):
def __init__( def __init__(self, **params):
self,
pulsar_host=None, input_queue = params.get("input_queue", default_input_queue)
input_queue=default_input_queue, output_queue = params.get("output_queue", default_output_queue)
output_queue=default_output_queue, subscriber = params.get("subscriber", default_subscriber)
subscriber=default_subscriber, model = params.get("model", default_model)
log_level=LogLevel.INFO, ollama = params.get("ollama", default_ollama)
model=default_model,
ollama=default_ollama,
):
super(Processor, self).__init__( super(Processor, self).__init__(
pulsar_host=pulsar_host, **params | {
log_level=log_level, "input_queue": input_queue,
input_queue=input_queue, "output_queue": output_queue,
output_queue=output_queue, "subscriber": subscriber,
subscriber=subscriber, "model": model,
input_schema=TextCompletionRequest, "ollama": ollama,
output_schema=TextCompletionResponse, "input_schema": TextCompletionRequest,
"output_schema": TextCompletionResponse,
}
) )
if not hasattr(__class__, "model_metric"):
__class__.model_metric = Info(
'model', 'Model information'
)
__class__.model_metric.info({
"model": model,
"ollama": ollama,
})
self.llm = Ollama(base_url=ollama, model=model) self.llm = Ollama(base_url=ollama, model=model)
def handle(self, msg): def handle(self, msg):

View file

@ -31,26 +31,23 @@ default_subscriber = 'llm-vertexai-text'
class Processor(ConsumerProducer): class Processor(ConsumerProducer):
def __init__( def __init__(self, **params):
self,
pulsar_host=None, input_queue = params.get("input_queue", default_input_queue)
input_queue=default_input_queue, output_queue = params.get("output_queue", default_output_queue)
output_queue=default_output_queue, subscriber = params.get("subscriber", default_subscriber)
subscriber=default_subscriber, region = params.get("region", "us-west1")
log_level=LogLevel.INFO, model = params.get("model", "gemini-1.0-pro-001")
region="us-west1", private_key = params.get("private_key")
model="gemini-1.0-pro-001",
private_key=None,
):
super(Processor, self).__init__( super(Processor, self).__init__(
pulsar_host=pulsar_host, **params | {
log_level=log_level, "input_queue": input_queue,
input_queue=input_queue, "output_queue": output_queue,
output_queue=output_queue, "subscriber": subscriber,
subscriber=subscriber, "input_schema": TextCompletionRequest,
input_schema=TextCompletionRequest, "output_schema": TextCompletionResponse,
output_schema=TextCompletionResponse, }
) )
self.parameters = { self.parameters = {

View file

@ -17,32 +17,32 @@ default_vector_store = 'http://localhost:19530'
class Processor(ConsumerProducer): class Processor(ConsumerProducer):
def __init__( def __init__(self, **params):
self,
pulsar_host=None, input_queue = params.get("input_queue", default_input_queue)
input_queue=default_input_queue, output_queue = params.get("output_queue", default_output_queue)
output_queue=default_output_queue, subscriber = params.get("subscriber", default_subscriber)
subscriber=default_subscriber, graph_hosts = params.get("graph_hosts", default_graph_hosts)
log_level=LogLevel.INFO, vector_store = params.get("vector_store", default_vector_store)
graph_hosts=default_graph_hosts, entity_limit = params.get("entity_limit", 50)
vector_store=default_vector_store, triple_limit = params.get("triple_limit", 30)
entity_limit=50, max_subgraph_size = params.get("max_subgraph_size", 3000)
triple_limit=30,
max_subgraph_size=3000,
):
super(Processor, self).__init__( super(Processor, self).__init__(
pulsar_host=pulsar_host, **params | {
log_level=log_level, "input_queue": input_queue,
input_queue=input_queue, "output_queue": output_queue,
output_queue=output_queue, "subscriber": subscriber,
subscriber=subscriber, "input_schema": GraphRagQuery,
input_schema=GraphRagQuery, "output_schema": GraphRagResponse,
output_schema=GraphRagResponse, "entity_limit": entity_limit,
"triple_limit": triple_limit,
"max_subgraph_size": max_subgraph_size,
}
) )
self.rag = GraphRag( self.rag = GraphRag(
pulsar_host=pulsar_host, pulsar_host=self.pulsar_host,
graph_hosts=graph_hosts.split(","), graph_hosts=graph_hosts.split(","),
vector_store=vector_store, vector_store=vector_store,
verbose=True, verbose=True,

View file

@ -14,21 +14,19 @@ default_store_uri = 'http://localhost:19530'
class Processor(Consumer): class Processor(Consumer):
def __init__( def __init__(self, **params):
self,
pulsar_host=None, input_queue = params.get("input_queue", default_input_queue)
input_queue=default_input_queue, subscriber = params.get("subscriber", default_subscriber)
subscriber=default_subscriber, store_uri = params.get("store_uri", default_store_uri)
store_uri=default_store_uri,
log_level=LogLevel.INFO,
):
super(Processor, self).__init__( super(Processor, self).__init__(
pulsar_host=pulsar_host, **params | {
log_level=log_level, "input_queue": input_queue,
input_queue=input_queue, "subscriber": subscriber,
subscriber=subscriber, "input_schema": VectorsAssociation,
input_schema=VectorsAssociation, "store_uri": store_uri,
}
) )
self.vecstore = TripleVectors(store_uri) self.vecstore = TripleVectors(store_uri)
@ -40,6 +38,7 @@ class Processor(Consumer):
if v.entity.value != "": if v.entity.value != "":
for vec in v.vectors: for vec in v.vectors:
self.vecstore.insert(vec, v.entity.value) self.vecstore.insert(vec, v.entity.value)
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):