Using persistent/non-persistent, tenants and namespaces.

This commit is contained in:
Cyber MacGeddon 2024-07-29 22:23:32 +01:00
parent 5d53de74cb
commit cd0671e777

View file

@ -3,6 +3,9 @@ from pulsar.schema import Record, Bytes, String, Boolean, Integer, Array, Double
from enum import Enum from enum import Enum
def topic(topic, kind='persistent', tenant='tg', namespace='flow'):
return f"{kind}://{tenant}/{namespace}/{topic}"
############################################################################ ############################################################################
class Value(Record): class Value(Record):
@ -22,7 +25,7 @@ class Document(Record):
source = Source() source = Source()
data = Bytes() data = Bytes()
document_ingest_queue = 'document-load' document_ingest_queue = topic('document-load')
############################################################################ ############################################################################
@ -32,7 +35,7 @@ class TextDocument(Record):
source = Source() source = Source()
text = Bytes() text = Bytes()
text_ingest_queue = 'text-document-load' text_ingest_queue = topic('text-document-load')
############################################################################ ############################################################################
@ -42,7 +45,7 @@ class Chunk(Record):
source = Source() source = Source()
chunk = Bytes() chunk = Bytes()
chunk_ingest_queue = 'chunk-load' chunk_ingest_queue = topic('chunk-load')
############################################################################ ############################################################################
@ -53,7 +56,7 @@ class ChunkEmbeddings(Record):
vectors = Array(Array(Double())) vectors = Array(Array(Double()))
chunk = Bytes() chunk = Bytes()
chunk_embeddings_ingest_queue = 'chunk-embeddings-load' chunk_embeddings_ingest_queue = topic('chunk-embeddings-load')
############################################################################ ############################################################################
@ -64,7 +67,7 @@ class GraphEmbeddings(Record):
vectors = Array(Array(Double())) vectors = Array(Array(Double()))
entity = Value() entity = Value()
graph_embeddings_store_queue = 'graph-embeddings-store' graph_embeddings_store_queue = topic('graph-embeddings-store')
############################################################################ ############################################################################
@ -76,11 +79,11 @@ class Triple(Record):
p = Value() p = Value()
o = Value() o = Value()
triples_store_queue = 'triples-store' triples_store_queue = topic('triples-store')
############################################################################ ############################################################################
# chunk_embeddings_store_queue = 'chunk-embeddings-store' # chunk_embeddings_store_queue = topic('chunk-embeddings-store')
############################################################################ ############################################################################
@ -92,8 +95,12 @@ class TextCompletionRequest(Record):
class TextCompletionResponse(Record): class TextCompletionResponse(Record):
response = String() response = String()
text_completion_request_queue = 'text-completion' text_completion_request_queue = topic(
text_completion_response_queue = 'text-completion-response' 'text-completion', kind='non-persistent', namespace='request'
)
text_completion_response_queue = topic(
'text-completion-response', kind='non-persistent', namespace='response',
)
############################################################################ ############################################################################
@ -105,8 +112,12 @@ class EmbeddingsRequest(Record):
class EmbeddingsResponse(Record): class EmbeddingsResponse(Record):
vectors = Array(Array(Double())) vectors = Array(Array(Double()))
embeddings_request_queue = 'embeddings' embeddings_request_queue = topic(
embeddings_response_queue = 'embeddings-response' 'embeddings', kind='non-persistent', namespace='request'
)
embeddings_response_queue = topic(
'embeddings-response', kind='non-persistent', namespace='response'
)
############################################################################ ############################################################################
@ -118,8 +129,12 @@ class GraphRagQuery(Record):
class GraphRagResponse(Record): class GraphRagResponse(Record):
response = String() response = String()
graph_rag_request_queue = 'graph-rag' graph_rag_request_queue = topic(
graph_rag_response_queue = 'graph-rag-response' 'graph-rag', kind='non-persistent', namespace='request'
)
graph_rag_response_queue = topic(
'graph-rag-response', kind='non-persistent', namespace='response'
)
############################################################################ ############################################################################