From cd0671e7772faa8c02cf094fe6eb698e0e8f0544 Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Mon, 29 Jul 2024 22:23:32 +0100 Subject: [PATCH] Using persistent/non-persistent, tenants and namespaces. --- trustgraph/schema.py | 41 ++++++++++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/trustgraph/schema.py b/trustgraph/schema.py index 2b590e6c..a84a5880 100644 --- a/trustgraph/schema.py +++ b/trustgraph/schema.py @@ -3,6 +3,9 @@ from pulsar.schema import Record, Bytes, String, Boolean, Integer, Array, Double from enum import Enum +def topic(topic, kind='persistent', tenant='tg', namespace='flow'): + return f"{kind}://{tenant}/{namespace}/{topic}" + ############################################################################ class Value(Record): @@ -22,7 +25,7 @@ class Document(Record): source = Source() data = Bytes() -document_ingest_queue = 'document-load' +document_ingest_queue = topic('document-load') ############################################################################ @@ -32,7 +35,7 @@ class TextDocument(Record): source = Source() text = Bytes() -text_ingest_queue = 'text-document-load' +text_ingest_queue = topic('text-document-load') ############################################################################ @@ -42,7 +45,7 @@ class Chunk(Record): source = Source() chunk = Bytes() -chunk_ingest_queue = 'chunk-load' +chunk_ingest_queue = topic('chunk-load') ############################################################################ @@ -53,7 +56,7 @@ class ChunkEmbeddings(Record): vectors = Array(Array(Double())) chunk = Bytes() -chunk_embeddings_ingest_queue = 'chunk-embeddings-load' +chunk_embeddings_ingest_queue = topic('chunk-embeddings-load') ############################################################################ @@ -64,7 +67,7 @@ class GraphEmbeddings(Record): vectors = Array(Array(Double())) entity = Value() -graph_embeddings_store_queue = 'graph-embeddings-store' +graph_embeddings_store_queue = topic('graph-embeddings-store') ############################################################################ @@ -76,11 +79,11 @@ class Triple(Record): p = Value() o = Value() -triples_store_queue = 'triples-store' +triples_store_queue = topic('triples-store') ############################################################################ -# chunk_embeddings_store_queue = 'chunk-embeddings-store' +# chunk_embeddings_store_queue = topic('chunk-embeddings-store') ############################################################################ @@ -92,8 +95,12 @@ class TextCompletionRequest(Record): class TextCompletionResponse(Record): response = String() -text_completion_request_queue = 'text-completion' -text_completion_response_queue = 'text-completion-response' +text_completion_request_queue = topic( + 'text-completion', kind='non-persistent', namespace='request' +) +text_completion_response_queue = topic( + 'text-completion-response', kind='non-persistent', namespace='response', +) ############################################################################ @@ -105,8 +112,12 @@ class EmbeddingsRequest(Record): class EmbeddingsResponse(Record): vectors = Array(Array(Double())) -embeddings_request_queue = 'embeddings' -embeddings_response_queue = 'embeddings-response' +embeddings_request_queue = topic( + 'embeddings', kind='non-persistent', namespace='request' +) +embeddings_response_queue = topic( + 'embeddings-response', kind='non-persistent', namespace='response' +) ############################################################################ @@ -118,8 +129,12 @@ class GraphRagQuery(Record): class GraphRagResponse(Record): response = String() -graph_rag_request_queue = 'graph-rag' -graph_rag_response_queue = 'graph-rag-response' +graph_rag_request_queue = topic( + 'graph-rag', kind='non-persistent', namespace='request' +) +graph_rag_response_queue = topic( + 'graph-rag-response', kind='non-persistent', namespace='response' +) ############################################################################