From 3947920ee89c5a0b652f8e59f5cdd50416474cdf Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 23 Jul 2024 21:34:03 +0100 Subject: [PATCH] Refactor names (#4) - Downsize embeddings model to mini-lm in docker-compose files - Rename for structure - Default queues defined in schema file - Standardize naming: graph embeddings, chunk embeddings, triples --- Makefile | 2 +- docker-compose-azure.yaml | 36 +- docker-compose-claude.yaml | 36 +- docker-compose-ollama.yaml | 36 +- docker-compose-vertexai.yaml | 36 +- scripts/chunker-recursive | 2 +- scripts/ge-write-milvus | 6 + scripts/graph-rag | 2 +- scripts/graph-write-cassandra | 6 - scripts/llm-azure-text | 6 - scripts/llm-claude-text | 6 - scripts/llm-ollama-text | 6 - scripts/llm-vertexai-text | 6 - scripts/pdf-decoder | 2 +- scripts/text-completion-azure | 6 + scripts/text-completion-claude | 6 + scripts/text-completion-ollama | 6 + scripts/text-completion-vertexai | 6 + scripts/triples-write-cassandra | 6 + scripts/vector-write-milvus | 6 - setup.py | 14 +- trustgraph/base/__init__.py | 5 +- trustgraph/base/base_processor.py | 117 ++++++ trustgraph/base/consumer.py | 87 +++++ trustgraph/base/consumer_producer.py | 168 ++++++++ trustgraph/base/processor.py | 360 ------------------ trustgraph/base/producer.py | 55 +++ trustgraph/{chunker => chunking}/__init__.py | 0 .../recursive/__init__.py | 0 .../recursive/__main__.py | 0 .../recursive/chunker.py | 11 +- trustgraph/{decoder => decoding}/__init__.py | 0 .../{decoder => decoding}/pdf/__init__.py | 0 .../{decoder => decoding}/pdf/__main__.py | 0 .../{decoder => decoding}/pdf/pdf_decoder.py | 11 +- trustgraph/embeddings/hf/hf.py | 11 +- trustgraph/embeddings/ollama/processor.py | 11 +- trustgraph/embeddings/vectorize/vectorize.py | 17 +- trustgraph/embeddings_client.py | 7 +- trustgraph/graph_rag_client.py | 8 +- trustgraph/kg/extract_definitions/extract.py | 17 +- .../kg/extract_relationships/extract.py | 25 +- trustgraph/llm_client.py | 9 +- trustgraph/{graph 
=> model}/__init__.py | 0 .../text_completion}/__init__.py | 0 .../text_completion/azure}/__init__.py | 0 .../text_completion/azure}/__main__.py | 0 .../text_completion/azure}/llm.py | 18 +- .../text_completion/claude}/__init__.py | 0 .../text_completion/claude}/__main__.py | 0 .../text_completion/claude}/llm.py | 18 +- .../text_completion/ollama}/__init__.py | 0 .../text_completion/ollama}/__main__.py | 0 .../text_completion/ollama}/llm.py | 18 +- .../text_completion/vertexai}/__init__.py | 0 .../text_completion/vertexai}/__main__.py | 0 .../text_completion/vertexai}/llm.py | 18 +- trustgraph/{rag => retrieval}/__init__.py | 0 .../graph => retrieval/graph_rag}/__init__.py | 0 .../graph => retrieval/graph_rag}/__main__.py | 0 .../{rag/graph => retrieval/graph_rag}/rag.py | 11 +- trustgraph/schema.py | 72 +++- trustgraph/{vector => storage}/__init__.py | 0 .../storage/graph_embeddings/__init__.py | 0 .../graph_embeddings/milvus}/__init__.py | 0 .../graph_embeddings/milvus}/__main__.py | 0 .../graph_embeddings/milvus}/write.py | 20 +- trustgraph/storage/triples/__init__.py | 0 .../triples/cassandra}/__init__.py | 0 .../triples/cassandra}/__main__.py | 0 .../triples/cassandra}/write.py | 17 +- 71 files changed, 764 insertions(+), 585 deletions(-) create mode 100755 scripts/ge-write-milvus delete mode 100755 scripts/graph-write-cassandra delete mode 100755 scripts/llm-azure-text delete mode 100755 scripts/llm-claude-text delete mode 100755 scripts/llm-ollama-text delete mode 100755 scripts/llm-vertexai-text create mode 100755 scripts/text-completion-azure create mode 100755 scripts/text-completion-claude create mode 100755 scripts/text-completion-ollama create mode 100755 scripts/text-completion-vertexai create mode 100755 scripts/triples-write-cassandra delete mode 100755 scripts/vector-write-milvus create mode 100644 trustgraph/base/base_processor.py create mode 100644 trustgraph/base/consumer.py create mode 100644 trustgraph/base/consumer_producer.py delete mode 
100644 trustgraph/base/processor.py create mode 100644 trustgraph/base/producer.py rename trustgraph/{chunker => chunking}/__init__.py (100%) rename trustgraph/{chunker => chunking}/recursive/__init__.py (100%) rename trustgraph/{chunker => chunking}/recursive/__main__.py (100%) rename trustgraph/{chunker => chunking}/recursive/chunker.py (90%) rename trustgraph/{decoder => decoding}/__init__.py (100%) rename trustgraph/{decoder => decoding}/pdf/__init__.py (100%) rename trustgraph/{decoder => decoding}/pdf/__main__.py (100%) rename trustgraph/{decoder => decoding}/pdf/pdf_decoder.py (88%) rename trustgraph/{graph => model}/__init__.py (100%) rename trustgraph/{llm => model/text_completion}/__init__.py (100%) rename trustgraph/{llm/azure_text => model/text_completion/azure}/__init__.py (100%) rename trustgraph/{llm/azure_text => model/text_completion/azure}/__main__.py (100%) rename trustgraph/{llm/azure_text => model/text_completion/azure}/llm.py (85%) rename trustgraph/{llm/claude_text => model/text_completion/claude}/__init__.py (100%) rename trustgraph/{llm/claude_text => model/text_completion/claude}/__main__.py (100%) rename trustgraph/{llm/claude_text => model/text_completion/claude}/llm.py (84%) rename trustgraph/{llm/ollama_text => model/text_completion/ollama}/__init__.py (100%) rename trustgraph/{llm/ollama_text => model/text_completion/ollama}/__main__.py (100%) rename trustgraph/{llm/ollama_text => model/text_completion/ollama}/llm.py (83%) rename trustgraph/{llm/vertexai_text => model/text_completion/vertexai}/__init__.py (100%) rename trustgraph/{llm/vertexai_text => model/text_completion/vertexai}/__main__.py (100%) rename trustgraph/{llm/vertexai_text => model/text_completion/vertexai}/llm.py (90%) rename trustgraph/{rag => retrieval}/__init__.py (100%) rename trustgraph/{rag/graph => retrieval/graph_rag}/__init__.py (100%) rename trustgraph/{rag/graph => retrieval/graph_rag}/__main__.py (100%) rename trustgraph/{rag/graph => 
retrieval/graph_rag}/rag.py (92%) rename trustgraph/{vector => storage}/__init__.py (100%) create mode 100644 trustgraph/storage/graph_embeddings/__init__.py rename trustgraph/{graph/cassandra_write => storage/graph_embeddings/milvus}/__init__.py (100%) rename trustgraph/{graph/cassandra_write => storage/graph_embeddings/milvus}/__main__.py (100%) rename trustgraph/{vector/milvus_write => storage/graph_embeddings/milvus}/write.py (73%) create mode 100644 trustgraph/storage/triples/__init__.py rename trustgraph/{vector/milvus_write => storage/triples/cassandra}/__init__.py (100%) rename trustgraph/{vector/milvus_write => storage/triples/cassandra}/__main__.py (100%) rename trustgraph/{graph/cassandra_write => storage/triples/cassandra}/write.py (78%) diff --git a/Makefile b/Makefile index aced52c1..79b0d810 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # VERSION=$(shell git describe | sed 's/^v//') -VERSION=0.4.2 +VERSION=0.5.1 all: container diff --git a/docker-compose-azure.yaml b/docker-compose-azure.yaml index f48df808..774e2b15 100644 --- a/docker-compose-azure.yaml +++ b/docker-compose-azure.yaml @@ -119,7 +119,7 @@ services: restart: on-failure:100 pdf-decoder: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "pdf-decoder" - "-p" @@ -127,7 +127,7 @@ services: restart: on-failure:100 chunker: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "chunker-recursive" - "-p" @@ -135,7 +135,7 @@ services: restart: on-failure:100 vectorize: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "embeddings-vectorize" - "-p" @@ -143,17 +143,17 @@ services: restart: on-failure:100 embeddings: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "embeddings-hf" - "-p" - "pulsar://pulsar:6650" - - "-m" - - 
"mixedbread-ai/mxbai-embed-large-v1" +# - "-m" +# - "mixedbread-ai/mxbai-embed-large-v1" restart: on-failure:100 kg-extract-definitions: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "kg-extract-definitions" - "-p" @@ -161,37 +161,37 @@ services: restart: on-failure:100 kg-extract-relationships: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "kg-extract-relationships" - "-p" - "pulsar://pulsar:6650" restart: on-failure:100 - vector-write: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + store-graph-embeddings: + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - - "vector-write-milvus" + - "ge-write-milvus" - "-p" - "pulsar://pulsar:6650" - "-t" - "http://milvus:19530" restart: on-failure:100 - graph-write: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + store-triples: + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - - "graph-write-cassandra" + - "triples-write-cassandra" - "-p" - "pulsar://pulsar:6650" - "-g" - "cassandra" restart: on-failure:100 - llm: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + text-completion: + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - - "llm-azure-text" + - "text-completion-azure" - "-p" - "pulsar://pulsar:6650" - "-k" @@ -201,7 +201,7 @@ services: restart: on-failure:100 graph-rag: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "graph-rag" - "-p" diff --git a/docker-compose-claude.yaml b/docker-compose-claude.yaml index fc63ef14..2af933b9 100644 --- a/docker-compose-claude.yaml +++ b/docker-compose-claude.yaml @@ -119,7 +119,7 @@ services: restart: on-failure:100 pdf-decoder: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "pdf-decoder" - "-p" @@ -127,7 +127,7 @@ services: restart: on-failure:100 chunker: - 
image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "chunker-recursive" - "-p" @@ -135,7 +135,7 @@ services: restart: on-failure:100 vectorize: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "embeddings-vectorize" - "-p" @@ -143,17 +143,17 @@ services: restart: on-failure:100 embeddings: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "embeddings-hf" - "-p" - "pulsar://pulsar:6650" - - "-m" - - "mixedbread-ai/mxbai-embed-large-v1" +# - "-m" +# - "mixedbread-ai/mxbai-embed-large-v1" restart: on-failure:100 kg-extract-definitions: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "kg-extract-definitions" - "-p" @@ -161,37 +161,37 @@ services: restart: on-failure:100 kg-extract-relationships: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "kg-extract-relationships" - "-p" - "pulsar://pulsar:6650" restart: on-failure:100 - vector-write: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + store-graph-embeddings: + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - - "vector-write-milvus" + - "ge-write-milvus" - "-p" - "pulsar://pulsar:6650" - "-t" - "http://milvus:19530" restart: on-failure:100 - graph-write: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + store-triples: + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - - "graph-write-cassandra" + - "triples-write-cassandra" - "-p" - "pulsar://pulsar:6650" - "-g" - "cassandra" restart: on-failure:100 - llm: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + text-completion: + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - - "llm-claude-text" + - "text-completion-claude" - "-p" - "pulsar://pulsar:6650" - "-k" @@ -199,7 +199,7 @@ services: restart: 
on-failure:100 graph-rag: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "graph-rag" - "-p" diff --git a/docker-compose-ollama.yaml b/docker-compose-ollama.yaml index 2e66727b..18d81a1a 100644 --- a/docker-compose-ollama.yaml +++ b/docker-compose-ollama.yaml @@ -119,7 +119,7 @@ services: restart: on-failure:100 pdf-decoder: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "pdf-decoder" - "-p" @@ -127,7 +127,7 @@ services: restart: on-failure:100 chunker: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "chunker-recursive" - "-p" @@ -135,7 +135,7 @@ services: restart: on-failure:100 vectorize: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "embeddings-vectorize" - "-p" @@ -143,17 +143,17 @@ services: restart: on-failure:100 embeddings: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "embeddings-hf" - "-p" - "pulsar://pulsar:6650" - - "-m" - - "mixedbread-ai/mxbai-embed-large-v1" +# - "-m" +# - "mixedbread-ai/mxbai-embed-large-v1" restart: on-failure:100 kg-extract-definitions: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "kg-extract-definitions" - "-p" @@ -161,37 +161,37 @@ services: restart: on-failure:100 kg-extract-relationships: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "kg-extract-relationships" - "-p" - "pulsar://pulsar:6650" restart: on-failure:100 - vector-write: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + store-graph-embeddings: + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - - "vector-write-milvus" + - "ge-write-milvus" - "-p" - "pulsar://pulsar:6650" - "-t" - 
"http://milvus:19530" restart: on-failure:100 - graph-write: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + store-triples: + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - - "graph-write-cassandra" + - "triples-write-cassandra" - "-p" - "pulsar://pulsar:6650" - "-g" - "cassandra" restart: on-failure:100 - llm: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + text-completion: + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - - "llm-ollama-text" + - "text-completion-ollama" - "-p" - "pulsar://pulsar:6650" - "-r" @@ -199,7 +199,7 @@ services: restart: on-failure:100 graph-rag: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "graph-rag" - "-p" diff --git a/docker-compose-vertexai.yaml b/docker-compose-vertexai.yaml index 2a69ee69..24d0d3a4 100644 --- a/docker-compose-vertexai.yaml +++ b/docker-compose-vertexai.yaml @@ -119,7 +119,7 @@ services: restart: on-failure:100 pdf-decoder: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "pdf-decoder" - "-p" @@ -127,7 +127,7 @@ services: restart: on-failure:100 chunker: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "chunker-recursive" - "-p" @@ -135,7 +135,7 @@ services: restart: on-failure:100 vectorize: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "embeddings-vectorize" - "-p" @@ -143,17 +143,17 @@ services: restart: on-failure:100 embeddings: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "embeddings-hf" - "-p" - "pulsar://pulsar:6650" - - "-m" - - "mixedbread-ai/mxbai-embed-large-v1" +# - "-m" +# - "mixedbread-ai/mxbai-embed-large-v1" restart: on-failure:100 kg-extract-definitions: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: 
docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "kg-extract-definitions" - "-p" @@ -161,37 +161,37 @@ services: restart: on-failure:100 kg-extract-relationships: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "kg-extract-relationships" - "-p" - "pulsar://pulsar:6650" restart: on-failure:100 - vector-write: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + store-graph-embeddings: + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - - "vector-write-milvus" + - "ge-write-milvus" - "-p" - "pulsar://pulsar:6650" - "-t" - "http://milvus:19530" restart: on-failure:100 - graph-write: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + store-triples: + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - - "graph-write-cassandra" + - "triples-write-cassandra" - "-p" - "pulsar://pulsar:6650" - "-g" - "cassandra" restart: on-failure:100 - llm: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + text-completion: + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - - "llm-vertexai-text" + - "text-completion-vertexai" - "-p" - "pulsar://pulsar:6650" - "-k" @@ -203,7 +203,7 @@ services: restart: on-failure:100 graph-rag: - image: docker.io/trustgraph/trustgraph-flow:0.4.2 + image: docker.io/trustgraph/trustgraph-flow:0.5.1 command: - "graph-rag" - "-p" diff --git a/scripts/chunker-recursive b/scripts/chunker-recursive index 2356903d..041a72d4 100755 --- a/scripts/chunker-recursive +++ b/scripts/chunker-recursive @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -from trustgraph.chunker.recursive import run +from trustgraph.chunking.recursive import run run() diff --git a/scripts/ge-write-milvus b/scripts/ge-write-milvus new file mode 100755 index 00000000..0b18faf8 --- /dev/null +++ b/scripts/ge-write-milvus @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.storage.graph_embeddings.milvus import run + +run() + diff --git a/scripts/graph-rag b/scripts/graph-rag index 
a6dab1f3..6b18b689 100755 --- a/scripts/graph-rag +++ b/scripts/graph-rag @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -from trustgraph.rag.graph import run +from trustgraph.retrieval.graph_rag import run run() diff --git a/scripts/graph-write-cassandra b/scripts/graph-write-cassandra deleted file mode 100755 index 7fc3d0c8..00000000 --- a/scripts/graph-write-cassandra +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python3 - -from trustgraph.graph.cassandra_write import run - -run() - diff --git a/scripts/llm-azure-text b/scripts/llm-azure-text deleted file mode 100755 index cdaea4b8..00000000 --- a/scripts/llm-azure-text +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python3 - -from trustgraph.llm.azure_text import run - -run() - diff --git a/scripts/llm-claude-text b/scripts/llm-claude-text deleted file mode 100755 index 496d1440..00000000 --- a/scripts/llm-claude-text +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python3 - -from trustgraph.llm.claude_text import run - -run() - diff --git a/scripts/llm-ollama-text b/scripts/llm-ollama-text deleted file mode 100755 index cb7a4ebc..00000000 --- a/scripts/llm-ollama-text +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python3 - -from trustgraph.llm.ollama_text import run - -run() - diff --git a/scripts/llm-vertexai-text b/scripts/llm-vertexai-text deleted file mode 100755 index 4634015f..00000000 --- a/scripts/llm-vertexai-text +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python3 - -from trustgraph.llm.vertexai_text import run - -run() - diff --git a/scripts/pdf-decoder b/scripts/pdf-decoder index 82b89298..0de6a9be 100755 --- a/scripts/pdf-decoder +++ b/scripts/pdf-decoder @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -from trustgraph.decoder.pdf import run +from trustgraph.decoding.pdf import run run() diff --git a/scripts/text-completion-azure b/scripts/text-completion-azure new file mode 100755 index 00000000..965bf956 --- /dev/null +++ b/scripts/text-completion-azure @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from 
trustgraph.model.text_completion.azure import run + +run() + diff --git a/scripts/text-completion-claude b/scripts/text-completion-claude new file mode 100755 index 00000000..b9175375 --- /dev/null +++ b/scripts/text-completion-claude @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.model.text_completion.claude import run + +run() + diff --git a/scripts/text-completion-ollama b/scripts/text-completion-ollama new file mode 100755 index 00000000..9479750a --- /dev/null +++ b/scripts/text-completion-ollama @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.model.text_completion.ollama import run + +run() + diff --git a/scripts/text-completion-vertexai b/scripts/text-completion-vertexai new file mode 100755 index 00000000..56458d4a --- /dev/null +++ b/scripts/text-completion-vertexai @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.model.text_completion.vertexai import run + +run() + diff --git a/scripts/triples-write-cassandra b/scripts/triples-write-cassandra new file mode 100755 index 00000000..207c3222 --- /dev/null +++ b/scripts/triples-write-cassandra @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.storage.triples.cassandra import run + +run() + diff --git a/scripts/vector-write-milvus b/scripts/vector-write-milvus deleted file mode 100755 index 952e22cf..00000000 --- a/scripts/vector-write-milvus +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python3 - -from trustgraph.vector.milvus_write import run - -run() - diff --git a/setup.py b/setup.py index dac0ecfd..8cd2103a 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ import os with open("README.md", "r") as fh: long_description = fh.read() -version = "0.4.2" +version = "0.5.1" setuptools.setup( name="trustgraph", @@ -50,21 +50,21 @@ setuptools.setup( "scripts/embeddings-hf", "scripts/embeddings-ollama", "scripts/embeddings-vectorize", + "scripts/ge-write-milvus", "scripts/graph-rag", "scripts/graph-show", "scripts/graph-to-turtle", - "scripts/graph-write-cassandra", 
"scripts/init-pulsar-manager", "scripts/kg-extract-definitions", "scripts/kg-extract-relationships", - "scripts/llm-azure-text", - "scripts/llm-claude-text", - "scripts/llm-ollama-text", - "scripts/llm-vertexai-text", "scripts/loader", "scripts/pdf-decoder", "scripts/query", "scripts/run-processing", - "scripts/vector-write-milvus", + "scripts/text-completion-azure", + "scripts/text-completion-claude", + "scripts/text-completion-ollama", + "scripts/text-completion-vertexai", + "scripts/triples-write-cassandra", ] ) diff --git a/trustgraph/base/__init__.py b/trustgraph/base/__init__.py index 9d16af90..b9dba4fa 100644 --- a/trustgraph/base/__init__.py +++ b/trustgraph/base/__init__.py @@ -1,3 +1,6 @@ -from . processor import * +from . base_processor import BaseProcessor +from . consumer import Consumer +from . producer import Producer +from . consumer_producer import ConsumerProducer diff --git a/trustgraph/base/base_processor.py b/trustgraph/base/base_processor.py new file mode 100644 index 00000000..202b1932 --- /dev/null +++ b/trustgraph/base/base_processor.py @@ -0,0 +1,117 @@ + +import os +import argparse +import pulsar +import _pulsar +import time +from prometheus_client import start_http_server, Info + +from .. 
log_level import LogLevel + +class BaseProcessor: + + default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') + + def __init__(self, **params): + + self.client = None + + if not hasattr(__class__, "params_metric"): + __class__.params_metric = Info( + 'params', 'Parameters configuration' + ) + + # FIXME: Maybe outputs information it should not + __class__.params_metric.info({ + k: str(params[k]) + for k in params + }) + + pulsar_host = params.get("pulsar_host", self.default_pulsar_host) + log_level = params.get("log_level", LogLevel.INFO) + + self.pulsar_host = pulsar_host + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + + def __del__(self): + + if self.client: + self.client.close() + + @staticmethod + def add_args(parser): + + parser.add_argument( + '-p', '--pulsar-host', + default=__class__.default_pulsar_host, + help=f'Pulsar host (default: {__class__.default_pulsar_host})', + ) + + parser.add_argument( + '-l', '--log-level', + type=LogLevel, + default=LogLevel.INFO, + choices=list(LogLevel), + help=f'Log level (default: info)' + ) + + parser.add_argument( + '-M', '--metrics-enabled', + type=bool, + default=True, + help=f'Metrics enabled (default: true)', + ) + + parser.add_argument( + '-P', '--metrics-port', + type=int, + default=8000, + help=f'Metrics port (default: 8000)', + ) + + def run(self): + raise RuntimeError("Something should have implemented the run method") + + @classmethod + def start(cls, prog, doc): + + while True: + + parser = argparse.ArgumentParser( + prog=prog, + description=doc + ) + + cls.add_args(parser) + + args = parser.parse_args() + args = vars(args) + + if args["metrics_enabled"]: + start_http_server(args["metrics_port"]) + + try: + + p = cls(**args) + p.run() + + except KeyboardInterrupt: + print("Keyboard interrupt.") + return + + except _pulsar.Interrupted: + print("Pulsar Interrupted.") + return + + except Exception as e: + + print(type(e)) + 
print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) diff --git a/trustgraph/base/consumer.py b/trustgraph/base/consumer.py new file mode 100644 index 00000000..ec20349b --- /dev/null +++ b/trustgraph/base/consumer.py @@ -0,0 +1,87 @@ + +from pulsar.schema import JsonSchema +from prometheus_client import start_http_server, Histogram, Info, Counter + +from . base_processor import BaseProcessor + +class Consumer(BaseProcessor): + + def __init__(self, **params): + + super(Consumer, self).__init__(**params) + + input_queue = params.get("input_queue") + subscriber = params.get("subscriber") + input_schema = params.get("input_schema") + + if input_schema == None: + raise RuntimeError("input_schema must be specified") + + if not hasattr(__class__, "request_metric"): + __class__.request_metric = Histogram( + 'request_latency', 'Request latency (seconds)' + ) + + if not hasattr(__class__, "pubsub_metric"): + __class__.pubsub_metric = Info( + 'pubsub', 'Pub/sub configuration' + ) + + if not hasattr(__class__, "processing_metric"): + __class__.processing_metric = Counter( + 'processing_count', 'Processing count', ["status"] + ) + + __class__.pubsub_metric.info({ + "input_queue": input_queue, + "subscriber": subscriber, + "input_schema": input_schema.__name__, + }) + + self.consumer = self.client.subscribe( + input_queue, subscriber, + schema=JsonSchema(input_schema), + ) + + def run(self): + + while True: + + msg = self.consumer.receive() + + try: + + with __class__.request_metric.time(): + self.handle(msg) + + # Acknowledge successful processing of the message + self.consumer.acknowledge(msg) + + __class__.processing_metric.labels(status="success").inc() + + except Exception as e: + + print("Exception:", e, flush=True) + + # Message failed to be processed + self.consumer.negative_acknowledge(msg) + + __class__.processing_metric.labels(status="error").inc() + + @staticmethod + def add_args(parser, default_input_queue, default_subscriber): 
+ + BaseProcessor.add_args(parser) + + parser.add_argument( + '-i', '--input-queue', + default=default_input_queue, + help=f'Input queue (default: {default_input_queue})' + ) + + parser.add_argument( + '-s', '--subscriber', + default=default_subscriber, + help=f'Queue subscriber name (default: {default_subscriber})' + ) + diff --git a/trustgraph/base/consumer_producer.py b/trustgraph/base/consumer_producer.py new file mode 100644 index 00000000..06704d02 --- /dev/null +++ b/trustgraph/base/consumer_producer.py @@ -0,0 +1,168 @@ + +from pulsar.schema import JsonSchema +from prometheus_client import Histogram, Info, Counter + +from . base_processor import BaseProcessor + +# FIXME: Derive from consumer? And producer? + +class ConsumerProducer(BaseProcessor): + + def __init__(self, **params): + + input_queue = params.get("input_queue") + output_queue = params.get("output_queue") + subscriber = params.get("subscriber") + input_schema = params.get("input_schema") + output_schema = params.get("output_schema") + + if not hasattr(__class__, "request_metric"): + __class__.request_metric = Histogram( + 'request_latency', 'Request latency (seconds)' + ) + + if not hasattr(__class__, "output_metric"): + __class__.output_metric = Counter( + 'output_count', 'Output items created' + ) + + if not hasattr(__class__, "pubsub_metric"): + __class__.pubsub_metric = Info( + 'pubsub', 'Pub/sub configuration' + ) + + if not hasattr(__class__, "processing_metric"): + __class__.processing_metric = Counter( + 'processing_count', 'Processing count', ["status"] + ) + + __class__.pubsub_metric.info({ + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": input_schema.__name__, + "output_schema": output_schema.__name__, + }) + + super(ConsumerProducer, self).__init__(**params) + + if input_schema == None: + raise RuntimeError("input_schema must be specified") + + if output_schema == None: + raise RuntimeError("output_schema must be specified") 
+ + self.consumer = self.client.subscribe( + input_queue, subscriber, + schema=JsonSchema(input_schema), + ) + + self.producer = self.client.create_producer( + topic=output_queue, + schema=JsonSchema(output_schema), + ) + + def run(self): + + while True: + + msg = self.consumer.receive() + + try: + + with __class__.request_metric.time(): + resp = self.handle(msg) + + # Acknowledge successful processing of the message + self.consumer.acknowledge(msg) + + __class__.processing_metric.labels(status="success").inc() + + except Exception as e: + + print("Exception:", e, flush=True) + + # Message failed to be processed + self.consumer.negative_acknowledge(msg) + + __class__.processing_metric.labels(status="error").inc() + + def send(self, msg, properties={}): + self.producer.send(msg, properties) + __class__.output_metric.inc() + + @staticmethod + def add_args( + parser, default_input_queue, default_subscriber, + default_output_queue, + ): + + BaseProcessor.add_args(parser) + + parser.add_argument( + '-i', '--input-queue', + default=default_input_queue, + help=f'Input queue (default: {default_input_queue})' + ) + + parser.add_argument( + '-s', '--subscriber', + default=default_subscriber, + help=f'Queue subscriber name (default: {default_subscriber})' + ) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) + +class Producer(BaseProcessor): + + def __init__(self, **params): + + output_queue = params.get("output_queue") + output_schema = params.get("output_schema") + + if not hasattr(__class__, "output_metric"): + __class__.output_metric = Counter( + 'output_count', 'Output items created' + ) + + if not hasattr(__class__, "pubsub_metric"): + __class__.pubsub_metric = Info( + 'pubsub', 'Pub/sub configuration' + ) + + __class__.pubsub_metric.info({ + "output_queue": output_queue, + "output_schema": output_schema.__name__, + }) + + super(Producer, self).__init__(**params) + + if 
output_schema == None: + raise RuntimeError("output_schema must be specified") + + self.producer = self.client.create_producer( + topic=output_queue, + schema=JsonSchema(output_schema), + ) + + def send(self, msg, properties={}): + self.producer.send(msg, properties) + __class__.output_metric.inc() + + @staticmethod + def add_args( + parser, default_input_queue, default_subscriber, + default_output_queue, + ): + + BaseProcessor.add_args(parser) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) diff --git a/trustgraph/base/processor.py b/trustgraph/base/processor.py deleted file mode 100644 index ec017de2..00000000 --- a/trustgraph/base/processor.py +++ /dev/null @@ -1,360 +0,0 @@ - -import os -import argparse -import pulsar -import _pulsar -import time -from pulsar.schema import JsonSchema -from prometheus_client import start_http_server, Histogram, Info, Counter - -from .. log_level import LogLevel - -class BaseProcessor: - - default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') - - def __init__(self, **params): - - self.client = None - - if not hasattr(__class__, "params_metric"): - __class__.params_metric = Info( - 'params', 'Parameters configuration' - ) - - # FIXME: Maybe outputs information it should not - __class__.params_metric.info({ - k: str(params[k]) - for k in params - }) - - pulsar_host = params.get("pulsar_host", self.default_pulsar_host) - log_level = params.get("log_level", LogLevel.INFO) - - self.pulsar_host = pulsar_host - - self.client = pulsar.Client( - pulsar_host, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) - - def __del__(self): - - if self.client: - self.client.close() - - @staticmethod - def add_args(parser): - - parser.add_argument( - '-p', '--pulsar-host', - default=__class__.default_pulsar_host, - help=f'Pulsar host (default: {__class__.default_pulsar_host})', - ) - - parser.add_argument( - '-l', '--log-level', - 
type=LogLevel, - default=LogLevel.INFO, - choices=list(LogLevel), - help=f'Output queue (default: info)' - ) - - parser.add_argument( - '-M', '--metrics-enabled', - type=bool, - default=True, - help=f'Pulsar host (default: true)', - ) - - parser.add_argument( - '-P', '--metrics-port', - type=int, - default=8000, - help=f'Pulsar host (default: 8000)', - ) - - def run(self): - raise RuntimeError("Something should have implemented the run method") - - @classmethod - def start(cls, prog, doc): - - while True: - - parser = argparse.ArgumentParser( - prog=prog, - description=doc - ) - - cls.add_args(parser) - - args = parser.parse_args() - args = vars(args) - - if args["metrics_enabled"]: - start_http_server(args["metrics_port"]) - - try: - - p = cls(**args) - p.run() - - except KeyboardInterrupt: - print("Keyboard interrupt.") - return - - except _pulsar.Interrupted: - print("Pulsar Interrupted.") - return - - except Exception as e: - - print(type(e)) - - print("Exception:", e, flush=True) - print("Will retry...", flush=True) - - time.sleep(10) - -class Consumer(BaseProcessor): - - def __init__(self, **params): - - super(Consumer, self).__init__(**params) - - input_queue = params.get("input_queue") - subscriber = params.get("subscriber") - input_schema = params.get("input_schema") - - if input_schema == None: - raise RuntimeError("input_schema must be specified") - - if not hasattr(__class__, "request_metric"): - __class__.request_metric = Histogram( - 'request_latency', 'Request latency (seconds)' - ) - - if not hasattr(__class__, "pubsub_metric"): - __class__.pubsub_metric = Info( - 'pubsub', 'Pub/sub configuration' - ) - - if not hasattr(__class__, "processing_metric"): - __class__.processing_metric = Counter( - 'processing_count', 'Processing count', ["status"] - ) - - __class__.pubsub_metric.info({ - "input_queue": input_queue, - "subscriber": subscriber, - "input_schema": input_schema.__name__, - }) - - self.consumer = self.client.subscribe( - input_queue, 
subscriber, - schema=JsonSchema(input_schema), - ) - - def run(self): - - while True: - - msg = self.consumer.receive() - - try: - - with __class__.request_metric.time(): - self.handle(msg) - - # Acknowledge successful processing of the message - self.consumer.acknowledge(msg) - - __class__.processing_metric.labels(status="success").inc() - - except Exception as e: - - print("Exception:", e, flush=True) - - # Message failed to be processed - self.consumer.negative_acknowledge(msg) - - __class__.processing_metric.labels(status="error").inc() - - @staticmethod - def add_args(parser, default_input_queue, default_subscriber): - - BaseProcessor.add_args(parser) - - parser.add_argument( - '-i', '--input-queue', - default=default_input_queue, - help=f'Input queue (default: {default_input_queue})' - ) - - parser.add_argument( - '-s', '--subscriber', - default=default_subscriber, - help=f'Queue subscriber name (default: {default_subscriber})' - ) - -class ConsumerProducer(BaseProcessor): - - def __init__(self, **params): - - input_queue = params.get("input_queue") - output_queue = params.get("output_queue") - subscriber = params.get("subscriber") - input_schema = params.get("input_schema") - output_schema = params.get("output_schema") - - if not hasattr(__class__, "request_metric"): - __class__.request_metric = Histogram( - 'request_latency', 'Request latency (seconds)' - ) - - if not hasattr(__class__, "output_metric"): - __class__.output_metric = Counter( - 'output_count', 'Output items created' - ) - - if not hasattr(__class__, "pubsub_metric"): - __class__.pubsub_metric = Info( - 'pubsub', 'Pub/sub configuration' - ) - - if not hasattr(__class__, "processing_metric"): - __class__.processing_metric = Counter( - 'processing_count', 'Processing count', ["status"] - ) - - __class__.pubsub_metric.info({ - "input_queue": input_queue, - "output_queue": output_queue, - "subscriber": subscriber, - "input_schema": input_schema.__name__, - "output_schema": output_schema.__name__, 
- }) - - super(ConsumerProducer, self).__init__(**params) - - if input_schema == None: - raise RuntimeError("input_schema must be specified") - - if output_schema == None: - raise RuntimeError("output_schema must be specified") - - self.consumer = self.client.subscribe( - input_queue, subscriber, - schema=JsonSchema(input_schema), - ) - - self.producer = self.client.create_producer( - topic=output_queue, - schema=JsonSchema(output_schema), - ) - - def run(self): - - while True: - - msg = self.consumer.receive() - - try: - - with __class__.request_metric.time(): - resp = self.handle(msg) - - # Acknowledge successful processing of the message - self.consumer.acknowledge(msg) - - __class__.processing_metric.labels(status="success").inc() - - except Exception as e: - - print("Exception:", e, flush=True) - - # Message failed to be processed - self.consumer.negative_acknowledge(msg) - - __class__.processing_metric.labels(status="error").inc() - - def send(self, msg, properties={}): - self.producer.send(msg, properties) - __class__.output_metric.inc() - - @staticmethod - def add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, - ): - - BaseProcessor.add_args(parser) - - parser.add_argument( - '-i', '--input-queue', - default=default_input_queue, - help=f'Input queue (default: {default_input_queue})' - ) - - parser.add_argument( - '-s', '--subscriber', - default=default_subscriber, - help=f'Queue subscriber name (default: {default_subscriber})' - ) - - parser.add_argument( - '-o', '--output-queue', - default=default_output_queue, - help=f'Output queue (default: {default_output_queue})' - ) - -class Producer(BaseProcessor): - - def __init__(self, **params): - - output_queue = params.get("output_queue") - output_schema = params.get("output_schema") - - if not hasattr(__class__, "output_metric"): - __class__.output_metric = Counter( - 'output_count', 'Output items created' - ) - - if not hasattr(__class__, "pubsub_metric"): - 
__class__.pubsub_metric = Info( - 'pubsub', 'Pub/sub configuration' - ) - - __class__.pubsub_metric.info({ - "output_queue": output_queue, - "output_schema": output_schema.__name__, - }) - - super(Producer, self).__init__(**params) - - if output_schema == None: - raise RuntimeError("output_schema must be specified") - - self.producer = self.client.create_producer( - topic=output_queue, - schema=JsonSchema(output_schema), - ) - - def send(self, msg, properties={}): - self.producer.send(msg, properties) - __class__.output_metric.inc() - - @staticmethod - def add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, - ): - - BaseProcessor.add_args(parser) - - parser.add_argument( - '-o', '--output-queue', - default=default_output_queue, - help=f'Output queue (default: {default_output_queue})' - ) diff --git a/trustgraph/base/producer.py b/trustgraph/base/producer.py new file mode 100644 index 00000000..27d693ee --- /dev/null +++ b/trustgraph/base/producer.py @@ -0,0 +1,55 @@ + +from pulsar.schema import JsonSchema +from prometheus_client import Info, Counter + +from . 
base_processor import BaseProcessor + +class Producer(BaseProcessor): + + def __init__(self, **params): + + output_queue = params.get("output_queue") + output_schema = params.get("output_schema") + + if not hasattr(__class__, "output_metric"): + __class__.output_metric = Counter( + 'output_count', 'Output items created' + ) + + if not hasattr(__class__, "pubsub_metric"): + __class__.pubsub_metric = Info( + 'pubsub', 'Pub/sub configuration' + ) + + __class__.pubsub_metric.info({ + "output_queue": output_queue, + "output_schema": output_schema.__name__, + }) + + super(Producer, self).__init__(**params) + + if output_schema == None: + raise RuntimeError("output_schema must be specified") + + self.producer = self.client.create_producer( + topic=output_queue, + schema=JsonSchema(output_schema), + ) + + def send(self, msg, properties={}): + self.producer.send(msg, properties) + __class__.output_metric.inc() + + @staticmethod + def add_args( + parser, default_input_queue, default_subscriber, + default_output_queue, + ): + + BaseProcessor.add_args(parser) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) diff --git a/trustgraph/chunker/__init__.py b/trustgraph/chunking/__init__.py similarity index 100% rename from trustgraph/chunker/__init__.py rename to trustgraph/chunking/__init__.py diff --git a/trustgraph/chunker/recursive/__init__.py b/trustgraph/chunking/recursive/__init__.py similarity index 100% rename from trustgraph/chunker/recursive/__init__.py rename to trustgraph/chunking/recursive/__init__.py diff --git a/trustgraph/chunker/recursive/__main__.py b/trustgraph/chunking/recursive/__main__.py similarity index 100% rename from trustgraph/chunker/recursive/__main__.py rename to trustgraph/chunking/recursive/__main__.py diff --git a/trustgraph/chunker/recursive/chunker.py b/trustgraph/chunking/recursive/chunker.py similarity index 90% rename from 
trustgraph/chunker/recursive/chunker.py rename to trustgraph/chunking/recursive/chunker.py index 5418f951..7f7026f3 100755 --- a/trustgraph/chunker/recursive/chunker.py +++ b/trustgraph/chunking/recursive/chunker.py @@ -8,12 +8,15 @@ from langchain_text_splitters import RecursiveCharacterTextSplitter from ... schema import TextDocument, Chunk, Source +from ... schema import text_ingest_queue, chunk_ingest_queue from ... log_level import LogLevel from ... base import ConsumerProducer -default_input_queue = 'text-doc-load' -default_output_queue = 'chunk-load' -default_subscriber = 'chunker-recursive' +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = text_ingest_queue +default_output_queue = chunk_ingest_queue +default_subscriber = module class Processor(ConsumerProducer): @@ -92,5 +95,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start('chunker', __doc__) + Processor.start(module, __doc__) diff --git a/trustgraph/decoder/__init__.py b/trustgraph/decoding/__init__.py similarity index 100% rename from trustgraph/decoder/__init__.py rename to trustgraph/decoding/__init__.py diff --git a/trustgraph/decoder/pdf/__init__.py b/trustgraph/decoding/pdf/__init__.py similarity index 100% rename from trustgraph/decoder/pdf/__init__.py rename to trustgraph/decoding/pdf/__init__.py diff --git a/trustgraph/decoder/pdf/__main__.py b/trustgraph/decoding/pdf/__main__.py similarity index 100% rename from trustgraph/decoder/pdf/__main__.py rename to trustgraph/decoding/pdf/__main__.py diff --git a/trustgraph/decoder/pdf/pdf_decoder.py b/trustgraph/decoding/pdf/pdf_decoder.py similarity index 88% rename from trustgraph/decoder/pdf/pdf_decoder.py rename to trustgraph/decoding/pdf/pdf_decoder.py index 1d86c366..fffcaee0 100755 --- a/trustgraph/decoder/pdf/pdf_decoder.py +++ b/trustgraph/decoding/pdf/pdf_decoder.py @@ -9,12 +9,15 @@ import base64 from langchain_community.document_loaders import PyPDFLoader from ... 
schema import Document, TextDocument, Source +from ... schema import document_ingest_queue, text_ingest_queue from ... log_level import LogLevel from ... base import ConsumerProducer -default_input_queue = 'document-load' -default_output_queue = 'text-doc-load' -default_subscriber = 'pdf-decoder' +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = document_ingest_queue +default_output_queue = text_ingest_queue +default_subscriber = module class Processor(ConsumerProducer): @@ -80,5 +83,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start("pdf-decoder", __doc__) + Processor.start(module, __doc__) diff --git a/trustgraph/embeddings/hf/hf.py b/trustgraph/embeddings/hf/hf.py index e5a5728f..20e57d53 100755 --- a/trustgraph/embeddings/hf/hf.py +++ b/trustgraph/embeddings/hf/hf.py @@ -7,12 +7,15 @@ Input is text, output is embeddings vector. from langchain_huggingface import HuggingFaceEmbeddings from ... schema import EmbeddingsRequest, EmbeddingsResponse +from ... schema import embeddings_request_queue, embeddings_response_queue from ... log_level import LogLevel from ... base import ConsumerProducer -default_input_queue = 'embeddings' -default_output_queue = 'embeddings-response' -default_subscriber = 'embeddings-hf' +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = embeddings_request_queue +default_output_queue = embeddings_response_queue +default_subscriber = module default_model="all-MiniLM-L6-v2" class Processor(ConsumerProducer): @@ -70,5 +73,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start("embeddings-hf", __doc__) + Processor.start(module, __doc__) diff --git a/trustgraph/embeddings/ollama/processor.py b/trustgraph/embeddings/ollama/processor.py index 38c177b1..6682a79f 100755 --- a/trustgraph/embeddings/ollama/processor.py +++ b/trustgraph/embeddings/ollama/processor.py @@ -6,12 +6,15 @@ Input is text, output is embeddings vector. 
from langchain_community.embeddings import OllamaEmbeddings from ... schema import EmbeddingsRequest, EmbeddingsResponse +from ... schema import embeddings_request_queue, embeddings_response_queue from ... log_level import LogLevel from ... base import ConsumerProducer -default_input_queue = 'embeddings' -default_output_queue = 'embeddings-response' -default_subscriber = 'embeddings-ollama' +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = embeddings_request_queue +default_output_queue = embeddings_response_queue +default_subscriber = module default_model="mxbai-embed-large" default_ollama = 'http://localhost:11434' @@ -77,5 +80,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start('embeddings-ollama', __doc__) + Processor.start(module, __doc__) diff --git a/trustgraph/embeddings/vectorize/vectorize.py b/trustgraph/embeddings/vectorize/vectorize.py index 2a92827e..eaf9b05e 100755 --- a/trustgraph/embeddings/vectorize/vectorize.py +++ b/trustgraph/embeddings/vectorize/vectorize.py @@ -4,14 +4,17 @@ Vectorizer, calls the embeddings service to get embeddings for a chunk. Input is text chunk, output is chunk and vectors. """ -from ... schema import Chunk, VectorsChunk +from ... schema import Chunk, ChunkEmbeddings +from ... schema import chunk_ingest_queue, chunk_embeddings_ingest_queue from ... embeddings_client import EmbeddingsClient from ... log_level import LogLevel from ... 
base import ConsumerProducer -default_input_queue = 'chunk-load' -default_output_queue = 'vectors-chunk-load' -default_subscriber = 'embeddings-vectorizer' +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = chunk_ingest_queue +default_output_queue = chunk_embeddings_ingest_queue +default_subscriber = module class Processor(ConsumerProducer): @@ -27,7 +30,7 @@ class Processor(ConsumerProducer): "output_queue": output_queue, "subscriber": subscriber, "input_schema": Chunk, - "output_schema": VectorsChunk, + "output_schema": ChunkEmbeddings, } ) @@ -35,7 +38,7 @@ class Processor(ConsumerProducer): def emit(self, source, chunk, vectors): - r = VectorsChunk(source=source, chunk=chunk, vectors=vectors) + r = ChunkEmbeddings(source=source, chunk=chunk, vectors=vectors) self.producer.send(r) def handle(self, msg): @@ -70,5 +73,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start("embeddings-vectorize", __doc__) + Processor.start(module, __doc__) diff --git a/trustgraph/embeddings_client.py b/trustgraph/embeddings_client.py index bac41074..e09216d0 100644 --- a/trustgraph/embeddings_client.py +++ b/trustgraph/embeddings_client.py @@ -3,7 +3,8 @@ import pulsar import _pulsar from pulsar.schema import JsonSchema -from trustgraph.schema import EmbeddingsRequest, EmbeddingsResponse +from . schema import EmbeddingsRequest, EmbeddingsResponse +from . 
schema import embeddings_request_queue, embeddings_response_queue import hashlib import uuid @@ -31,13 +32,13 @@ class EmbeddingsClient: ) self.producer = self.client.create_producer( - topic='embeddings', + topic=embeddings_request_queue, schema=JsonSchema(EmbeddingsRequest), chunking_enabled=True, ) self.consumer = self.client.subscribe( - 'embeddings-response', client_id, + embeddings_response_queue, client_id, schema=JsonSchema(EmbeddingsResponse), ) diff --git a/trustgraph/graph_rag_client.py b/trustgraph/graph_rag_client.py index 6f48e772..c6634623 100644 --- a/trustgraph/graph_rag_client.py +++ b/trustgraph/graph_rag_client.py @@ -3,7 +3,9 @@ import pulsar import _pulsar from pulsar.schema import JsonSchema -from trustgraph.schema import GraphRagQuery, GraphRagResponse +from . schema import GraphRagQuery, GraphRagResponse +from . schema import graph_rag_request_queue, graph_rag_response_queue + import hashlib import uuid @@ -29,13 +31,13 @@ class GraphRagClient: ) self.producer = self.client.create_producer( - topic='graph-rag-query', + topic=graph_rag_request_queue, schema=JsonSchema(GraphRagQuery), chunking_enabled=True, ) self.consumer = self.client.subscribe( - 'graph-rag-response', client_id, + graph_rag_response_queue, client_id, schema=JsonSchema(GraphRagResponse), ) diff --git a/trustgraph/kg/extract_definitions/extract.py b/trustgraph/kg/extract_definitions/extract.py index 3f6bf7b6..22b428b9 100755 --- a/trustgraph/kg/extract_definitions/extract.py +++ b/trustgraph/kg/extract_definitions/extract.py @@ -1,13 +1,14 @@ """ -Simple decoder, accepts vector+text chunks input, applies entity analysis to +Simple decoder, accepts embeddings+text chunks input, applies entity analysis to get entity definitions which are output as graph edges. """ import urllib.parse import json -from ... schema import VectorsChunk, Triple, Source, Value +from ... schema import ChunkEmbeddings, Triple, Source, Value +from ... 
schema import chunk_embeddings_ingest_queue, triples_store_queue from ... log_level import LogLevel from ... llm_client import LlmClient from ... prompts import to_definitions @@ -16,9 +17,11 @@ from ... base import ConsumerProducer DEFINITION_VALUE = Value(value=DEFINITION, is_uri=True) -default_input_queue = 'vectors-chunk-load' -default_output_queue = 'graph-load' -default_subscriber = 'kg-extract-definitions' +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = chunk_embeddings_ingest_queue +default_output_queue = triples_store_queue +default_subscriber = module class Processor(ConsumerProducer): @@ -33,7 +36,7 @@ class Processor(ConsumerProducer): "input_queue": input_queue, "output_queue": output_queue, "subscriber": subscriber, - "input_schema": VectorsChunk, + "input_schema": ChunkEmbeddings, "output_schema": Triple, } ) @@ -101,5 +104,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start("kg-extract-definitions", __doc__) + Processor.start(module, __doc__) diff --git a/trustgraph/kg/extract_relationships/extract.py b/trustgraph/kg/extract_relationships/extract.py index bf3b94c7..45f63e8a 100755 --- a/trustgraph/kg/extract_relationships/extract.py +++ b/trustgraph/kg/extract_relationships/extract.py @@ -10,7 +10,8 @@ import json import os from pulsar.schema import JsonSchema -from ... schema import VectorsChunk, Triple, VectorsAssociation, Source, Value +from ... schema import ChunkEmbeddings, Triple, GraphEmbeddings, Source, Value +from ... schema import chunk_embeddings_ingest_queue, triples_store_queue, graph_embeddings_store_queue from ... log_level import LogLevel from ... llm_client import LlmClient from ... prompts import to_relationships @@ -19,10 +20,12 @@ from ... 
base import ConsumerProducer RDF_LABEL_VALUE = Value(value=RDF_LABEL, is_uri=True) -default_input_queue = 'vectors-chunk-load' -default_output_queue = 'graph-load' -default_subscriber = 'kg-extract-relationships' -default_vector_queue='vectors-load' +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = chunk_embeddings_ingest_queue +default_output_queue = triples_store_queue +default_vector_queue = graph_embeddings_store_queue +default_subscriber = module class Processor(ConsumerProducer): @@ -38,14 +41,14 @@ class Processor(ConsumerProducer): "input_queue": input_queue, "output_queue": output_queue, "subscriber": subscriber, - "input_schema": VectorsChunk, + "input_schema": ChunkEmbeddings, "output_schema": Triple, } ) self.vec_prod = self.client.create_producer( topic=vector_queue, - schema=JsonSchema(VectorsAssociation), + schema=JsonSchema(GraphEmbeddings), ) __class__.pubsub_metric.info({ @@ -53,9 +56,9 @@ class Processor(ConsumerProducer): "output_queue": output_queue, "vector_queue": vector_queue, "subscriber": subscriber, - "input_schema": VectorsChunk.__name__, + "input_schema": ChunkEmbeddings.__name__, "output_schema": Triple.__name__, - "vector_schema": VectorsAssociation.__name__, + "vector_schema": GraphEmbeddings.__name__, }) self.llm = LlmClient(pulsar_host=self.pulsar_host) @@ -84,7 +87,7 @@ class Processor(ConsumerProducer): def emit_vec(self, ent, vec): - r = VectorsAssociation(entity=ent, vectors=vec) + r = GraphEmbeddings(entity=ent, vectors=vec) self.vec_prod.send(r) def handle(self, msg): @@ -171,5 +174,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start("kg-extract-relationships", __doc__) + Processor.start(module, __doc__) diff --git a/trustgraph/llm_client.py b/trustgraph/llm_client.py index 4d392f0e..aa034b77 100644 --- a/trustgraph/llm_client.py +++ b/trustgraph/llm_client.py @@ -3,10 +3,13 @@ import pulsar import _pulsar from pulsar.schema import JsonSchema -from trustgraph.schema import 
TextCompletionRequest, TextCompletionResponse import hashlib import uuid +from . schema import TextCompletionRequest, TextCompletionResponse +from . schema import text_completion_request_queue +from . schema import text_completion_response_queue + # Ugly ERROR=_pulsar.LoggerLevel.Error WARN=_pulsar.LoggerLevel.Warn @@ -29,13 +32,13 @@ class LlmClient: ) self.producer = self.client.create_producer( - topic='llm-complete-text', + topic=text_completion_request_queue, schema=JsonSchema(TextCompletionRequest), chunking_enabled=True, ) self.consumer = self.client.subscribe( - 'llm-complete-text-response', client_id, + text_completion_response_queue, client_id, schema=JsonSchema(TextCompletionResponse), ) diff --git a/trustgraph/graph/__init__.py b/trustgraph/model/__init__.py similarity index 100% rename from trustgraph/graph/__init__.py rename to trustgraph/model/__init__.py diff --git a/trustgraph/llm/__init__.py b/trustgraph/model/text_completion/__init__.py similarity index 100% rename from trustgraph/llm/__init__.py rename to trustgraph/model/text_completion/__init__.py diff --git a/trustgraph/llm/azure_text/__init__.py b/trustgraph/model/text_completion/azure/__init__.py similarity index 100% rename from trustgraph/llm/azure_text/__init__.py rename to trustgraph/model/text_completion/azure/__init__.py diff --git a/trustgraph/llm/azure_text/__main__.py b/trustgraph/model/text_completion/azure/__main__.py similarity index 100% rename from trustgraph/llm/azure_text/__main__.py rename to trustgraph/model/text_completion/azure/__main__.py diff --git a/trustgraph/llm/azure_text/llm.py b/trustgraph/model/text_completion/azure/llm.py similarity index 85% rename from trustgraph/llm/azure_text/llm.py rename to trustgraph/model/text_completion/azure/llm.py index bccbf04f..e3e7a559 100755 --- a/trustgraph/llm/azure_text/llm.py +++ b/trustgraph/model/text_completion/azure/llm.py @@ -7,13 +7,17 @@ serverless endpoint service. Input is prompt, output is response. 
import requests import json -from ... schema import TextCompletionRequest, TextCompletionResponse -from ... log_level import LogLevel -from ... base import ConsumerProducer +from .... schema import TextCompletionRequest, TextCompletionResponse +from .... schema import text_completion_request_queue +from .... schema import text_completion_response_queue +from .... log_level import LogLevel +from .... base import ConsumerProducer -default_input_queue = 'llm-complete-text' -default_output_queue = 'llm-complete-text-response' -default_subscriber = 'llm-azure-text' +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = text_completion_request_queue +default_output_queue = text_completion_response_queue +default_subscriber = module class Processor(ConsumerProducer): @@ -121,4 +125,4 @@ class Processor(ConsumerProducer): def run(): - Processor.start("llm-azure-text", __doc__) + Processor.start(module, __doc__) diff --git a/trustgraph/llm/claude_text/__init__.py b/trustgraph/model/text_completion/claude/__init__.py similarity index 100% rename from trustgraph/llm/claude_text/__init__.py rename to trustgraph/model/text_completion/claude/__init__.py diff --git a/trustgraph/llm/claude_text/__main__.py b/trustgraph/model/text_completion/claude/__main__.py similarity index 100% rename from trustgraph/llm/claude_text/__main__.py rename to trustgraph/model/text_completion/claude/__main__.py diff --git a/trustgraph/llm/claude_text/llm.py b/trustgraph/model/text_completion/claude/llm.py similarity index 84% rename from trustgraph/llm/claude_text/llm.py rename to trustgraph/model/text_completion/claude/llm.py index 230d540c..f5665201 100755 --- a/trustgraph/llm/claude_text/llm.py +++ b/trustgraph/model/text_completion/claude/llm.py @@ -6,13 +6,17 @@ Input is prompt, output is response. import anthropic -from ... schema import TextCompletionRequest, TextCompletionResponse -from ... log_level import LogLevel -from ... base import ConsumerProducer +from .... 
schema import TextCompletionRequest, TextCompletionResponse +from .... schema import text_completion_request_queue +from .... schema import text_completion_response_queue +from .... log_level import LogLevel +from .... base import ConsumerProducer -default_input_queue = 'llm-complete-text' -default_output_queue = 'llm-complete-text-response' -default_subscriber = 'llm-claude-text' +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = text_completion_request_queue +default_output_queue = text_completion_response_queue +default_subscriber = module default_model = 'claude-3-5-sonnet-20240620' class Processor(ConsumerProducer): @@ -101,6 +105,6 @@ class Processor(ConsumerProducer): def run(): - Processor.start("llm-claude-text", __doc__) + Processor.start(module, __doc__) diff --git a/trustgraph/llm/ollama_text/__init__.py b/trustgraph/model/text_completion/ollama/__init__.py similarity index 100% rename from trustgraph/llm/ollama_text/__init__.py rename to trustgraph/model/text_completion/ollama/__init__.py diff --git a/trustgraph/llm/ollama_text/__main__.py b/trustgraph/model/text_completion/ollama/__main__.py similarity index 100% rename from trustgraph/llm/ollama_text/__main__.py rename to trustgraph/model/text_completion/ollama/__main__.py diff --git a/trustgraph/llm/ollama_text/llm.py b/trustgraph/model/text_completion/ollama/llm.py similarity index 83% rename from trustgraph/llm/ollama_text/llm.py rename to trustgraph/model/text_completion/ollama/llm.py index 3d256137..90cd9a53 100755 --- a/trustgraph/llm/ollama_text/llm.py +++ b/trustgraph/model/text_completion/ollama/llm.py @@ -7,13 +7,17 @@ Input is prompt, output is response. from langchain_community.llms import Ollama from prometheus_client import Histogram, Info, Counter -from ... schema import TextCompletionRequest, TextCompletionResponse -from ... log_level import LogLevel -from ... base import ConsumerProducer +from .... 
schema import TextCompletionRequest, TextCompletionResponse +from .... schema import text_completion_request_queue +from .... schema import text_completion_response_queue +from .... log_level import LogLevel +from .... base import ConsumerProducer -default_input_queue = 'llm-complete-text' -default_output_queue = 'llm-complete-text-response' -default_subscriber = 'llm-ollama-text' +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = text_completion_request_queue +default_output_queue = text_completion_response_queue +default_subscriber = module default_model = 'gemma2' default_ollama = 'http://localhost:11434' @@ -93,6 +97,6 @@ class Processor(ConsumerProducer): def run(): - Processor.start("llm-ollama-text", __doc__) + Processor.start(module, __doc__) diff --git a/trustgraph/llm/vertexai_text/__init__.py b/trustgraph/model/text_completion/vertexai/__init__.py similarity index 100% rename from trustgraph/llm/vertexai_text/__init__.py rename to trustgraph/model/text_completion/vertexai/__init__.py diff --git a/trustgraph/llm/vertexai_text/__main__.py b/trustgraph/model/text_completion/vertexai/__main__.py similarity index 100% rename from trustgraph/llm/vertexai_text/__main__.py rename to trustgraph/model/text_completion/vertexai/__main__.py diff --git a/trustgraph/llm/vertexai_text/llm.py b/trustgraph/model/text_completion/vertexai/llm.py similarity index 90% rename from trustgraph/llm/vertexai_text/llm.py rename to trustgraph/model/text_completion/vertexai/llm.py index 30896fb5..4861dd8e 100755 --- a/trustgraph/llm/vertexai_text/llm.py +++ b/trustgraph/model/text_completion/vertexai/llm.py @@ -21,13 +21,17 @@ from vertexai.preview.generative_models import ( Tool, ) -from ... schema import TextCompletionRequest, TextCompletionResponse -from ... log_level import LogLevel -from ... base import ConsumerProducer +from .... schema import TextCompletionRequest, TextCompletionResponse +from .... schema import text_completion_request_queue +from .... 
schema import text_completion_response_queue +from .... log_level import LogLevel +from .... base import ConsumerProducer -default_input_queue = 'llm-complete-text' -default_output_queue = 'llm-complete-text-response' -default_subscriber = 'llm-vertexai-text' +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = text_completion_request_queue +default_output_queue = text_completion_response_queue +default_subscriber = module class Processor(ConsumerProducer): @@ -169,5 +173,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start("llm-vertexai-text", __doc__) + Processor.start(module, __doc__) diff --git a/trustgraph/rag/__init__.py b/trustgraph/retrieval/__init__.py similarity index 100% rename from trustgraph/rag/__init__.py rename to trustgraph/retrieval/__init__.py diff --git a/trustgraph/rag/graph/__init__.py b/trustgraph/retrieval/graph_rag/__init__.py similarity index 100% rename from trustgraph/rag/graph/__init__.py rename to trustgraph/retrieval/graph_rag/__init__.py diff --git a/trustgraph/rag/graph/__main__.py b/trustgraph/retrieval/graph_rag/__main__.py similarity index 100% rename from trustgraph/rag/graph/__main__.py rename to trustgraph/retrieval/graph_rag/__main__.py diff --git a/trustgraph/rag/graph/rag.py b/trustgraph/retrieval/graph_rag/rag.py similarity index 92% rename from trustgraph/rag/graph/rag.py rename to trustgraph/retrieval/graph_rag/rag.py index 3c1d2b8a..993557a0 100755 --- a/trustgraph/rag/graph/rag.py +++ b/trustgraph/retrieval/graph_rag/rag.py @@ -5,13 +5,16 @@ Input is query, output is response. """ from ... schema import GraphRagQuery, GraphRagResponse +from ... schema import graph_rag_request_queue, graph_rag_response_queue from ... log_level import LogLevel from ... graph_rag import GraphRag from ... 
base import ConsumerProducer -default_input_queue = 'graph-rag-query' -default_output_queue = 'graph-rag-response' -default_subscriber = 'graph-rag' +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = graph_rag_request_queue +default_output_queue = graph_rag_response_queue +default_subscriber = module default_graph_hosts = 'localhost' default_vector_store = 'http://localhost:19530' @@ -112,5 +115,5 @@ class Processor(ConsumerProducer): def run(): - Processor.start('graph-rag', __doc__) + Processor.start(module, __doc__) diff --git a/trustgraph/schema.py b/trustgraph/schema.py index d8fd4af7..2b590e6c 100644 --- a/trustgraph/schema.py +++ b/trustgraph/schema.py @@ -3,11 +3,7 @@ from pulsar.schema import Record, Bytes, String, Boolean, Integer, Array, Double from enum import Enum -#class Command(Enum): -# reindex = 1 - -#class IndexCommand(Record): -# command = Command +############################################################################ class Value(Record): value = String() @@ -19,49 +15,111 @@ class Source(Record): id = String() title = String() +############################################################################ + +# PDF docs etc. 
class Document(Record): source = Source() data = Bytes() +document_ingest_queue = 'document-load' + +############################################################################ + +# Text documents / text from PDF + class TextDocument(Record): source = Source() text = Bytes() +text_ingest_queue = 'text-document-load' + +############################################################################ + +# Chunks of text + class Chunk(Record): source = Source() chunk = Bytes() -class VectorsChunk(Record): +chunk_ingest_queue = 'chunk-load' + +############################################################################ + +# Chunk embeddings are an embeddings associated with a text chunk + +class ChunkEmbeddings(Record): source = Source() vectors = Array(Array(Double())) chunk = Bytes() -class VectorsAssociation(Record): +chunk_embeddings_ingest_queue = 'chunk-embeddings-load' + +############################################################################ + +# Graph embeddings are embeddings associated with a graph entity + +class GraphEmbeddings(Record): source = Source() vectors = Array(Array(Double())) entity = Value() +graph_embeddings_store_queue = 'graph-embeddings-store' + +############################################################################ + +# Graph triples + class Triple(Record): source = Source() s = Value() p = Value() o = Value() +triples_store_queue = 'triples-store' + +############################################################################ + +# chunk_embeddings_store_queue = 'chunk-embeddings-store' + +############################################################################ + +# LLM text completion + class TextCompletionRequest(Record): prompt = String() class TextCompletionResponse(Record): response = String() +text_completion_request_queue = 'text-completion' +text_completion_response_queue = 'text-completion-response' + +############################################################################ + +# Embeddings + class 
EmbeddingsRequest(Record): text = String() class EmbeddingsResponse(Record): vectors = Array(Array(Double())) +embeddings_request_queue = 'embeddings' +embeddings_response_queue = 'embeddings-response' + +############################################################################ + +# Graph RAG text retrieval + class GraphRagQuery(Record): query = String() class GraphRagResponse(Record): response = String() +graph_rag_request_queue = 'graph-rag' +graph_rag_response_queue = 'graph-rag-response' + +############################################################################ + diff --git a/trustgraph/vector/__init__.py b/trustgraph/storage/__init__.py similarity index 100% rename from trustgraph/vector/__init__.py rename to trustgraph/storage/__init__.py diff --git a/trustgraph/storage/graph_embeddings/__init__.py b/trustgraph/storage/graph_embeddings/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph/graph/cassandra_write/__init__.py b/trustgraph/storage/graph_embeddings/milvus/__init__.py similarity index 100% rename from trustgraph/graph/cassandra_write/__init__.py rename to trustgraph/storage/graph_embeddings/milvus/__init__.py diff --git a/trustgraph/graph/cassandra_write/__main__.py b/trustgraph/storage/graph_embeddings/milvus/__main__.py similarity index 100% rename from trustgraph/graph/cassandra_write/__main__.py rename to trustgraph/storage/graph_embeddings/milvus/__main__.py diff --git a/trustgraph/vector/milvus_write/write.py b/trustgraph/storage/graph_embeddings/milvus/write.py similarity index 73% rename from trustgraph/vector/milvus_write/write.py rename to trustgraph/storage/graph_embeddings/milvus/write.py index 8ad56055..1a6883a1 100755 --- a/trustgraph/vector/milvus_write/write.py +++ b/trustgraph/storage/graph_embeddings/milvus/write.py @@ -3,13 +3,16 @@ Accepts entity/vector pairs and writes them to a Milvus store. """ -from ... schema import VectorsAssociation -from ... log_level import LogLevel -from ... 
triple_vectors import TripleVectors -from ... base import Consumer +from .... schema import GraphEmbeddings +from .... schema import graph_embeddings_store_queue +from .... log_level import LogLevel +from .... triple_vectors import TripleVectors +from .... base import Consumer -default_input_queue = 'vectors-load' -default_subscriber = 'vector-write-milvus' +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = graph_embeddings_store_queue +default_subscriber = module default_store_uri = 'http://localhost:19530' class Processor(Consumer): @@ -24,7 +27,7 @@ class Processor(Consumer): **params | { "input_queue": input_queue, "subscriber": subscriber, - "input_schema": VectorsAssociation, + "input_schema": GraphEmbeddings, "store_uri": store_uri, } ) @@ -54,6 +57,5 @@ class Processor(Consumer): def run(): - Processor.start("vector-write-milvus", __doc__) + Processor.start(module, __doc__) - diff --git a/trustgraph/storage/triples/__init__.py b/trustgraph/storage/triples/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph/vector/milvus_write/__init__.py b/trustgraph/storage/triples/cassandra/__init__.py similarity index 100% rename from trustgraph/vector/milvus_write/__init__.py rename to trustgraph/storage/triples/cassandra/__init__.py diff --git a/trustgraph/vector/milvus_write/__main__.py b/trustgraph/storage/triples/cassandra/__main__.py similarity index 100% rename from trustgraph/vector/milvus_write/__main__.py rename to trustgraph/storage/triples/cassandra/__main__.py diff --git a/trustgraph/graph/cassandra_write/write.py b/trustgraph/storage/triples/cassandra/write.py similarity index 78% rename from trustgraph/graph/cassandra_write/write.py rename to trustgraph/storage/triples/cassandra/write.py index fe5864a0..7cff6304 100755 --- a/trustgraph/graph/cassandra_write/write.py +++ b/trustgraph/storage/triples/cassandra/write.py @@ -9,13 +9,16 @@ import os import argparse import time -from ... 
trustgraph import TrustGraph -from ... schema import Triple -from ... log_level import LogLevel -from ... base import Consumer +from .... trustgraph import TrustGraph +from .... schema import Triple +from .... schema import triples_store_queue +from .... log_level import LogLevel +from .... base import Consumer -default_input_queue = 'graph-load' -default_subscriber = 'graph-write-cassandra' +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = triples_store_queue +default_subscriber = module default_graph_host='localhost' class Processor(Consumer): @@ -61,5 +64,5 @@ class Processor(Consumer): def run(): - Processor.start("graph-write-cassandra", __doc__) + Processor.start(module, __doc__)