diff --git a/trustgraph-base/trustgraph/base/base_processor.py b/trustgraph-base/trustgraph/base/base_processor.py index a5a819c2..ebf2f46f 100644 --- a/trustgraph-base/trustgraph/base/base_processor.py +++ b/trustgraph-base/trustgraph/base/base_processor.py @@ -12,6 +12,7 @@ from .. log_level import LogLevel class BaseProcessor: default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') + default_pulsar_api_key = os.getenv("PULSAR_API_KEY", None) def __init__(self, **params): @@ -30,15 +31,23 @@ class BaseProcessor: pulsar_host = params.get("pulsar_host", self.default_pulsar_host) pulsar_listener = params.get("pulsar_listener", None) + pulsar_api_key = params.get("pulsar_api_key", None) log_level = params.get("log_level", LogLevel.INFO) self.pulsar_host = pulsar_host - - self.client = pulsar.Client( + if pulsar_api_key: + auth = pulsar.AuthenticationToken(pulsar_api_key) + self.client = pulsar.Client( + pulsar_host, + authentication=auth, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + else: + self.client = pulsar.Client( pulsar_host, listener_name=pulsar_listener, logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) + ) self.pulsar_listener = pulsar_listener @@ -56,6 +65,12 @@ class BaseProcessor: default=__class__.default_pulsar_host, help=f'Pulsar host (default: {__class__.default_pulsar_host})', ) + + parser.add_argument( + '--pulsar-api-key', + default=__class__.default_pulsar_api_key, + help=f'Pulsar API key', + ) parser.add_argument( '--pulsar-listener', diff --git a/trustgraph-base/trustgraph/base/publisher.py b/trustgraph-base/trustgraph/base/publisher.py index 37fae0ec..2da63331 100644 --- a/trustgraph-base/trustgraph/base/publisher.py +++ b/trustgraph-base/trustgraph/base/publisher.py @@ -31,8 +31,6 @@ class Publisher: while self.running: try: - - print(self.chunking_enabled) producer = self.client.create_producer( topic=self.topic, schema=self.schema, diff --git a/trustgraph-base/trustgraph/clients/agent_client.py b/trustgraph-base/trustgraph/clients/agent_client.py index 2ef69274..b31b4e36 100644 --- a/trustgraph-base/trustgraph/clients/agent_client.py +++ b/trustgraph-base/trustgraph/clients/agent_client.py @@ -20,6 +20,7 @@ class AgentClient(BaseClient): input_queue=None, output_queue=None, pulsar_host="pulsar://pulsar:6650", + pulsar_api_key=None, ): if input_queue is None: input_queue = agent_request_queue @@ -33,6 +34,7 @@ class AgentClient(BaseClient): pulsar_host=pulsar_host, input_schema=AgentRequest, output_schema=AgentResponse, + pulsar_api_key=pulsar_api_key ) def request( diff --git a/trustgraph-base/trustgraph/clients/base.py b/trustgraph-base/trustgraph/clients/base.py index 78116f41..ac809123 100644 --- a/trustgraph-base/trustgraph/clients/base.py +++ b/trustgraph-base/trustgraph/clients/base.py @@ -27,6 +27,7 @@ class BaseClient: input_schema=None, output_schema=None, pulsar_host="pulsar://pulsar:6650", + pulsar_api_key=None, ): if input_queue == None: raise RuntimeError("Need input_queue") @@ -37,10 +38,18 @@ class BaseClient: if subscriber == None: subscriber = str(uuid.uuid4()) - self.client = pulsar.Client( + if pulsar_api_key: + auth = pulsar.AuthenticationToken(pulsar_api_key) + self.client = pulsar.Client( pulsar_host, logger=pulsar.ConsoleLogger(log_level), - ) + authentication=auth, + ) + else: + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level) + ) self.producer = self.client.create_producer( topic=input_queue, diff --git a/trustgraph-base/trustgraph/clients/document_embeddings_client.py b/trustgraph-base/trustgraph/clients/document_embeddings_client.py index 5b6d324e..14547595 100644 --- a/trustgraph-base/trustgraph/clients/document_embeddings_client.py +++ b/trustgraph-base/trustgraph/clients/document_embeddings_client.py @@ -20,6 +20,7 @@ class DocumentEmbeddingsClient(BaseClient): input_queue=None, output_queue=None, pulsar_host="pulsar://pulsar:6650", + pulsar_api_key=None, ): if input_queue == None: @@ -34,6 +35,7 @@ class DocumentEmbeddingsClient(BaseClient): input_queue=input_queue, output_queue=output_queue, pulsar_host=pulsar_host, + pulsar_api_key=pulsar_api_key, input_schema=DocumentEmbeddingsRequest, output_schema=DocumentEmbeddingsResponse, ) diff --git a/trustgraph-base/trustgraph/clients/document_rag_client.py b/trustgraph-base/trustgraph/clients/document_rag_client.py index 103cbb69..6cbafa9b 100644 --- a/trustgraph-base/trustgraph/clients/document_rag_client.py +++ b/trustgraph-base/trustgraph/clients/document_rag_client.py @@ -20,6 +20,7 @@ class DocumentRagClient(BaseClient): input_queue=None, output_queue=None, pulsar_host="pulsar://pulsar:6650", + pulsar_api_key=None, ): if input_queue == None: @@ -34,6 +35,7 @@ class DocumentRagClient(BaseClient): input_queue=input_queue, output_queue=output_queue, pulsar_host=pulsar_host, + pulsar_api_key=pulsar_api_key, input_schema=DocumentRagQuery, output_schema=DocumentRagResponse, ) diff --git a/trustgraph-base/trustgraph/clients/embeddings_client.py b/trustgraph-base/trustgraph/clients/embeddings_client.py index 8d21bdec..811f6ed2 100644 --- a/trustgraph-base/trustgraph/clients/embeddings_client.py +++ b/trustgraph-base/trustgraph/clients/embeddings_client.py @@ -20,6 +20,7 @@ class EmbeddingsClient(BaseClient): output_queue=None, subscriber=None, pulsar_host="pulsar://pulsar:6650", + pulsar_api_key=None, ): if input_queue == None: @@ -34,6 +35,7 @@ class EmbeddingsClient(BaseClient): input_queue=input_queue, output_queue=output_queue, pulsar_host=pulsar_host, + pulsar_api_key=pulsar_api_key, input_schema=EmbeddingsRequest, output_schema=EmbeddingsResponse, ) diff --git a/trustgraph-base/trustgraph/clients/graph_embeddings_client.py b/trustgraph-base/trustgraph/clients/graph_embeddings_client.py index 401266bc..1a7a9512 100644 --- a/trustgraph-base/trustgraph/clients/graph_embeddings_client.py +++ b/trustgraph-base/trustgraph/clients/graph_embeddings_client.py @@ -20,6 +20,7 @@ class GraphEmbeddingsClient(BaseClient): input_queue=None, output_queue=None, pulsar_host="pulsar://pulsar:6650", + pulsar_api_key=None, ): if input_queue == None: @@ -34,6 +35,7 @@ class GraphEmbeddingsClient(BaseClient): input_queue=input_queue, output_queue=output_queue, pulsar_host=pulsar_host, + pulsar_api_key=pulsar_api_key, input_schema=GraphEmbeddingsRequest, output_schema=GraphEmbeddingsResponse, ) diff --git a/trustgraph-base/trustgraph/clients/graph_rag_client.py b/trustgraph-base/trustgraph/clients/graph_rag_client.py index 9f8eff62..77102e36 100644 --- a/trustgraph-base/trustgraph/clients/graph_rag_client.py +++ b/trustgraph-base/trustgraph/clients/graph_rag_client.py @@ -20,6 +20,7 @@ class GraphRagClient(BaseClient): input_queue=None, output_queue=None, pulsar_host="pulsar://pulsar:6650", + pulsar_api_key=None, ): if input_queue == None: @@ -34,6 +35,7 @@ class GraphRagClient(BaseClient): input_queue=input_queue, output_queue=output_queue, pulsar_host=pulsar_host, + pulsar_api_key=pulsar_api_key, input_schema=GraphRagQuery, output_schema=GraphRagResponse, ) diff --git a/trustgraph-base/trustgraph/clients/llm_client.py b/trustgraph-base/trustgraph/clients/llm_client.py index cfb0e606..a8894c8f 100644 --- a/trustgraph-base/trustgraph/clients/llm_client.py +++ b/trustgraph-base/trustgraph/clients/llm_client.py @@ -20,6 +20,7 @@ class LlmClient(BaseClient): input_queue=None, output_queue=None, pulsar_host="pulsar://pulsar:6650", + pulsar_api_key=None, ): if input_queue is None: input_queue = text_completion_request_queue @@ -31,6 +32,7 @@ class LlmClient(BaseClient): input_queue=input_queue, output_queue=output_queue, pulsar_host=pulsar_host, + pulsar_api_key=pulsar_api_key, input_schema=TextCompletionRequest, output_schema=TextCompletionResponse, ) diff --git a/trustgraph-base/trustgraph/clients/prompt_client.py b/trustgraph-base/trustgraph/clients/prompt_client.py index 4b026cf0..91707670 100644 --- a/trustgraph-base/trustgraph/clients/prompt_client.py +++ b/trustgraph-base/trustgraph/clients/prompt_client.py @@ -39,6 +39,7 @@ class PromptClient(BaseClient): input_queue=None, output_queue=None, pulsar_host="pulsar://pulsar:6650", + pulsar_api_key=None, ): if input_queue == None: @@ -53,6 +54,7 @@ class PromptClient(BaseClient): input_queue=input_queue, output_queue=output_queue, pulsar_host=pulsar_host, + pulsar_api_key=pulsar_api_key, input_schema=PromptRequest, output_schema=PromptResponse, ) diff --git a/trustgraph-base/trustgraph/clients/triples_query_client.py b/trustgraph-base/trustgraph/clients/triples_query_client.py index fc1e4b26..8ed2ebb7 100644 --- a/trustgraph-base/trustgraph/clients/triples_query_client.py +++ b/trustgraph-base/trustgraph/clients/triples_query_client.py @@ -21,6 +21,7 @@ class TriplesQueryClient(BaseClient): input_queue=None, output_queue=None, pulsar_host="pulsar://pulsar:6650", + pulsar_api_key=None, ): if input_queue == None: @@ -34,6 +35,7 @@ class TriplesQueryClient(BaseClient): subscriber=subscriber, input_queue=input_queue, output_queue=output_queue, + pulsar_api_key=pulsar_api_key, pulsar_host=pulsar_host, input_schema=TriplesQueryRequest, output_schema=TriplesQueryResponse, diff --git a/trustgraph-cli/scripts/tg-invoke-agent b/trustgraph-cli/scripts/tg-invoke-agent index bb87c2a6..5e213447 100755 --- a/trustgraph-cli/scripts/tg-invoke-agent +++ b/trustgraph-cli/scripts/tg-invoke-agent @@ -137,6 +137,12 @@ def main(): action="store_true", help=f'Output thinking/observations' ) + + # parser.add_argument( + # '--pulsar-api-key', + # default=default_pulsar_api_key, + # help=f'Pulsar API key', + # ) args = parser.parse_args() diff --git a/trustgraph-cli/scripts/tg-invoke-document-rag b/trustgraph-cli/scripts/tg-invoke-document-rag index 3e0a9422..a3fc1958 100755 --- a/trustgraph-cli/scripts/tg-invoke-document-rag +++ b/trustgraph-cli/scripts/tg-invoke-document-rag @@ -34,8 +34,14 @@ def main(): help=f'API URL (default: {default_url})', ) + # parser.add_argument( + # '--pulsar-api-key', + # default=default_pulsar_api_key, + # help=f'Pulsar API key', + # ) + parser.add_argument( - '-q', '--question', + '-q', '--query', required=True, help=f'Question to answer', ) diff --git a/trustgraph-cli/scripts/tg-invoke-llm b/trustgraph-cli/scripts/tg-invoke-llm index d1a49e87..eb469b6e 100755 --- a/trustgraph-cli/scripts/tg-invoke-llm +++ b/trustgraph-cli/scripts/tg-invoke-llm @@ -44,6 +44,12 @@ def main(): nargs=1, help='LLM prompt e.g. What is 2 + 2?', ) + + # parser.add_argument( + # '--pulsar-api-key', + # default=default_pulsar_api_key, + # help=f'Pulsar API key', + # ) args = parser.parse_args() diff --git a/trustgraph-cli/scripts/tg-invoke-prompt b/trustgraph-cli/scripts/tg-invoke-prompt index 7d54dab0..426fe1ee 100755 --- a/trustgraph-cli/scripts/tg-invoke-prompt +++ b/trustgraph-cli/scripts/tg-invoke-prompt @@ -54,6 +54,12 @@ def main(): help='''Prompt template terms of the form variable=value, can be specified multiple times''', ) + + # parser.add_argument( + # '--pulsar-api-key', + # default=default_pulsar_api_key, + # help=f'Pulsar API key', + # ) args = parser.parse_args() diff --git a/trustgraph-cli/scripts/tg-load-pdf b/trustgraph-cli/scripts/tg-load-pdf index f27aaf5a..3e960c67 100755 --- a/trustgraph-cli/scripts/tg-load-pdf +++ b/trustgraph-cli/scripts/tg-load-pdf @@ -30,6 +30,7 @@ class Loader: user, collection, metadata, + pulsar_api_key=None, ): self.api = Api(url) diff --git a/trustgraph-cli/scripts/tg-load-text b/trustgraph-cli/scripts/tg-load-text index 634ac3d7..0cc221a5 100755 --- a/trustgraph-cli/scripts/tg-load-text +++ b/trustgraph-cli/scripts/tg-load-text @@ -80,6 +80,12 @@ def main(): default=default_url, help=f'API URL (default: {default_url})', ) + + # parser.add_argument( + # '--pulsar-api-key', + # default=default_pulsar_api_key, + # help=f'Pulsar API key', + # ) parser.add_argument( '-U', '--user', diff --git a/trustgraph-cli/scripts/tg-load-turtle b/trustgraph-cli/scripts/tg-load-turtle index 7c258fcc..3417a87d 100755 --- a/trustgraph-cli/scripts/tg-load-turtle +++ b/trustgraph-cli/scripts/tg-load-turtle @@ -19,6 +19,8 @@ from trustgraph.log_level import LogLevel default_user = 'trustgraph' default_collection = 'default' default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650') +default_pulsar_api_key = os.getenv("PULSAR_API_KEY", None) + default_output_queue = triples_store_queue class Loader: @@ -31,12 +33,21 @@ class Loader: files, user, collection, + pulsar_api_key=None, ): - self.client = pulsar.Client( - pulsar_host, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) + if pulsar_api_key: + auth = pulsar.AuthenticationToken(pulsar_api_key) + self.client = pulsar.Client( + pulsar_host, + authentication=auth, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + else: + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) self.producer = self.client.create_producer( topic=output_queue, @@ -98,6 +109,12 @@ def main(): default=default_pulsar_host, help=f'Pulsar host (default: {default_pulsar_host})', ) + + parser.add_argument( + '--pulsar-api-key', + default=default_pulsar_api_key, + help=f'Pulsar API key', + ) parser.add_argument( '-o', '--output-queue', @@ -137,6 +154,7 @@ def main(): try: p = Loader( pulsar_host=args.pulsar_host, + pulsar_api_key=args.pulsar_api_key, output_queue=args.output_queue, log_level=args.log_level, files=args.files, diff --git a/trustgraph-flow/trustgraph/agent/react/service.py b/trustgraph-flow/trustgraph/agent/react/service.py index 684ebe5a..7544f008 100755 --- a/trustgraph-flow/trustgraph/agent/react/service.py +++ b/trustgraph-flow/trustgraph/agent/react/service.py @@ -156,14 +156,16 @@ class Processor(ConsumerProducer): subscriber=subscriber, input_queue=prompt_request_queue, output_queue=prompt_response_queue, - pulsar_host = self.pulsar_host + pulsar_host = self.pulsar_host, + pulsar_api_key=self.pulsar_api_key, ) self.graph_rag = GraphRagClient( subscriber=subscriber, input_queue=graph_rag_request_queue, output_queue=graph_rag_response_queue, - pulsar_host = self.pulsar_host + pulsar_host = self.pulsar_host, + pulsar_api_key=self.pulsar_api_key, ) # Need to be able to feed requests to myself diff --git a/trustgraph-flow/trustgraph/document_rag.py b/trustgraph-flow/trustgraph/document_rag.py index 86298783..f4676b15 100644 --- a/trustgraph-flow/trustgraph/document_rag.py +++ b/trustgraph-flow/trustgraph/document_rag.py @@ -59,6 +59,7 @@ class DocumentRag: def __init__( self, pulsar_host="pulsar://pulsar:6650", + pulsar_api_key=None, pr_request_queue=None, pr_response_queue=None, emb_request_queue=None, @@ -100,6 +101,7 @@ class DocumentRag: subscriber=module + "-de", input_queue=de_request_queue, output_queue=de_response_queue, + pulsar_api_key=pulsar_api_key, ) self.embeddings = EmbeddingsClient( @@ -107,6 +109,7 @@ class DocumentRag: input_queue=emb_request_queue, output_queue=emb_response_queue, subscriber=module + "-emb", + pulsar_api_key=pulsar_api_key, ) self.lang = PromptClient( @@ -114,6 +117,7 @@ class DocumentRag: input_queue=pr_request_queue, output_queue=pr_response_queue, subscriber=module + "-de-prompt", + pulsar_api_key=pulsar_api_key, ) if self.verbose: diff --git a/trustgraph-flow/trustgraph/embeddings/document_embeddings/embeddings.py b/trustgraph-flow/trustgraph/embeddings/document_embeddings/embeddings.py index 6a4a4a67..70f53e07 100755 --- a/trustgraph-flow/trustgraph/embeddings/document_embeddings/embeddings.py +++ b/trustgraph-flow/trustgraph/embeddings/document_embeddings/embeddings.py @@ -47,6 +47,7 @@ class Processor(ConsumerProducer): self.embeddings = EmbeddingsClient( pulsar_host=self.pulsar_host, + pulsar_api_key=self.pulsar_api_key, input_queue=emb_request_queue, output_queue=emb_response_queue, subscriber=module + "-emb", diff --git a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py index c88005b7..47c99802 100755 --- a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py @@ -79,6 +79,7 @@ class Processor(ConsumerProducer): self.prompt = PromptClient( pulsar_host=self.pulsar_host, + pulsar_api_key=self.pulsar_api_key, input_queue=pr_request_queue, output_queue=pr_response_queue, subscriber = module + "-prompt", diff --git a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py index 82973662..2f293527 100755 --- a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py @@ -54,6 +54,7 @@ class Processor(ConsumerProducer): self.prompt = PromptClient( pulsar_host=self.pulsar_host, + pulsar_api_key=self.pulsar_api_key, input_queue=pr_request_queue, output_queue=pr_response_queue, subscriber = module + "-prompt", diff --git a/trustgraph-flow/trustgraph/extract/kg/topics/extract.py b/trustgraph-flow/trustgraph/extract/kg/topics/extract.py index efac2c05..7424abe2 100755 --- a/trustgraph-flow/trustgraph/extract/kg/topics/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/topics/extract.py @@ -52,6 +52,7 @@ class Processor(ConsumerProducer): self.prompt = PromptClient( pulsar_host=self.pulsar_host, + pulsar_api_key=self.pulsar_api_key, input_queue=pr_request_queue, output_queue=pr_response_queue, subscriber = module + "-prompt", diff --git a/trustgraph-flow/trustgraph/extract/object/row/extract.py b/trustgraph-flow/trustgraph/extract/object/row/extract.py index 73aa5edc..db02a651 100755 --- a/trustgraph-flow/trustgraph/extract/object/row/extract.py +++ b/trustgraph-flow/trustgraph/extract/object/row/extract.py @@ -112,6 +112,7 @@ class Processor(ConsumerProducer): self.prompt = PromptClient( pulsar_host=self.pulsar_host, + pulsar_api_key=self.pulsar_api_key, input_queue=pr_request_queue, output_queue=pr_response_queue, subscriber = module + "-prompt", diff --git a/trustgraph-flow/trustgraph/gateway/agent.py b/trustgraph-flow/trustgraph/gateway/agent.py index 150b970e..f66e68b9 100644 --- a/trustgraph-flow/trustgraph/gateway/agent.py +++ b/trustgraph-flow/trustgraph/gateway/agent.py @@ -7,10 +7,18 @@ from . endpoint import ServiceEndpoint from . requestor import ServiceRequestor class AgentRequestor(ServiceRequestor): +<<<<<<< HEAD def __init__(self, pulsar_client, timeout, auth): super(AgentRequestor, self).__init__( pulsar_client=pulsar_client, +======= + def __init__(self, pulsar_host, timeout, auth, pulsar_api_key=None): + + super(AgentRequestor, self).__init__( + pulsar_host=pulsar_host, + pulsar_api_key=pulsar_api_key, +>>>>>>> a5d5b4c (Add pulsar API token check) request_queue=agent_request_queue, response_queue=agent_response_queue, request_schema=AgentRequest, diff --git a/trustgraph-flow/trustgraph/gateway/document_embeddings_stream.py b/trustgraph-flow/trustgraph/gateway/document_embeddings_stream.py index a32c3e68..790ea27e 100644 --- a/trustgraph-flow/trustgraph/gateway/document_embeddings_stream.py +++ b/trustgraph-flow/trustgraph/gateway/document_embeddings_stream.py @@ -27,7 +27,8 @@ class DocumentEmbeddingsStreamEndpoint(SocketEndpoint): self.subscriber = Subscriber( self.pulsar_client, document_embeddings_store_queue, "api-gateway", "api-gateway", - schema=JsonSchema(DocumentEmbeddings) + schema=JsonSchema(DocumentEmbeddings), + pulsar_api_key=self.pulsar_api_key ) async def listener(self, ws, running): diff --git a/trustgraph-flow/trustgraph/gateway/graph_embeddings_stream.py b/trustgraph-flow/trustgraph/gateway/graph_embeddings_stream.py index 385eb9f4..30204b6b 100644 --- a/trustgraph-flow/trustgraph/gateway/graph_embeddings_stream.py +++ b/trustgraph-flow/trustgraph/gateway/graph_embeddings_stream.py @@ -26,6 +26,7 @@ class GraphEmbeddingsStreamEndpoint(SocketEndpoint): self.subscriber = Subscriber( self.pulsar_client, graph_embeddings_store_queue, "api-gateway", "api-gateway", + pulsar_api_key=self.pulsar_api_key, schema=JsonSchema(GraphEmbeddings) ) diff --git a/trustgraph-flow/trustgraph/gateway/mux.py b/trustgraph-flow/trustgraph/gateway/mux.py index 23b693ab..b03d156a 100644 --- a/trustgraph-flow/trustgraph/gateway/mux.py +++ b/trustgraph-flow/trustgraph/gateway/mux.py @@ -21,6 +21,7 @@ class MuxEndpoint(SocketEndpoint): self, pulsar_client, auth, services, path="/api/v1/socket", + pulsar_api_key=None ): super(MuxEndpoint, self).__init__( diff --git a/trustgraph-flow/trustgraph/gateway/requestor.py b/trustgraph-flow/trustgraph/gateway/requestor.py index 68ab1b58..468159cd 100644 --- a/trustgraph-flow/trustgraph/gateway/requestor.py +++ b/trustgraph-flow/trustgraph/gateway/requestor.py @@ -19,6 +19,7 @@ class ServiceRequestor: response_queue, response_schema, subscription="api-gateway", consumer_name="api-gateway", timeout=600, + pulsar_api_key=None, ): self.pub = Publisher( @@ -29,6 +30,7 @@ class ServiceRequestor: self.sub = Subscriber( pulsar_client, response_queue, subscription, consumer_name, + pulsar_api_key, JsonSchema(response_schema) ) diff --git a/trustgraph-flow/trustgraph/gateway/sender.py b/trustgraph-flow/trustgraph/gateway/sender.py index 32c586b1..1e8f5253 100644 --- a/trustgraph-flow/trustgraph/gateway/sender.py +++ b/trustgraph-flow/trustgraph/gateway/sender.py @@ -17,6 +17,7 @@ class ServiceSender: self, pulsar_client, request_queue, request_schema, + pulsar_api_key=None, ): self.pub = Publisher( diff --git a/trustgraph-flow/trustgraph/gateway/service.py b/trustgraph-flow/trustgraph/gateway/service.py index dff55b0e..8c26d5f4 100755 --- a/trustgraph-flow/trustgraph/gateway/service.py +++ b/trustgraph-flow/trustgraph/gateway/service.py @@ -57,6 +57,7 @@ logger.setLevel(logging.INFO) default_pulsar_host = os.getenv("PULSAR_HOST", "pulsar://pulsar:6650") default_prometheus_url = os.getenv("PROMETHEUS_URL", "http://prometheus:9090") +default_pulsar_api_key = os.getenv("PULSAR_API_KEY", None) default_timeout = 600 default_port = 8088 default_api_token = os.getenv("GATEWAY_SECRET", "") @@ -73,11 +74,20 @@ class Api: self.port = int(config.get("port", default_port)) self.timeout = int(config.get("timeout", default_timeout)) self.pulsar_host = config.get("pulsar_host", default_pulsar_host) + self.pulsar_api_key = config.get( + "pulsar_api_key", default_pulsar_api_key + ) self.pulsar_listener = config.get("pulsar_listener", None) - self.pulsar_client = pulsar.Client( - self.pulsar_host, listener_name=self.pulsar_listener - ) + if self.pulsar_api_key: + self.pulsar_client = pulsar.Client( + self.pulsar_host, listener_name=self.pulsar_listener, + authentication=pulsar.AuthenticationToken(self.pulsar_api_key) + ) + else: + self.pulsar_client = pulsar.Client( + self.pulsar_host, listener_name=self.pulsar_listener, + ) self.prometheus_url = config.get( "prometheus_url", default_prometheus_url, @@ -224,6 +234,7 @@ class Api: TriplesLoadEndpoint( pulsar_client=self.pulsar_client, auth = self.auth, + pulsar_api_key=self.pulsar_api_key, ), GraphEmbeddingsLoadEndpoint( pulsar_client=self.pulsar_client, @@ -237,6 +248,7 @@ class Api: pulsar_client=self.pulsar_client, auth = self.auth, services = self.services, + pulsar_api_key=self.pulsar_api_key, ), MetricsEndpoint( endpoint_path = "/api/v1/metrics", @@ -270,6 +282,12 @@ def run(): default=default_pulsar_host, help=f'Pulsar host (default: {default_pulsar_host})', ) + + parser.add_argument( + '--pulsar-api-key', + default=default_pulsar_api_key, + help=f'Pulsar API key', + ) parser.add_argument( '--pulsar-listener', diff --git a/trustgraph-flow/trustgraph/gateway/triples_stream.py b/trustgraph-flow/trustgraph/gateway/triples_stream.py index a5d5ad0a..9a6a7434 100644 --- a/trustgraph-flow/trustgraph/gateway/triples_stream.py +++ b/trustgraph-flow/trustgraph/gateway/triples_stream.py @@ -24,6 +24,7 @@ class TriplesStreamEndpoint(SocketEndpoint): self.subscriber = Subscriber( self.pulsar_client, triples_store_queue, "api-gateway", "api-gateway", + pulsar_api_key=self.pulsar_api_key, schema=JsonSchema(Triples) ) diff --git a/trustgraph-flow/trustgraph/graph_rag.py b/trustgraph-flow/trustgraph/graph_rag.py index f69ebeb7..a40ba32c 100644 --- a/trustgraph-flow/trustgraph/graph_rag.py +++ b/trustgraph-flow/trustgraph/graph_rag.py @@ -161,6 +161,7 @@ class GraphRag: def __init__( self, pulsar_host="pulsar://pulsar:6650", + pulsar_api_key=None, pr_request_queue=None, pr_response_queue=None, emb_request_queue=None, @@ -207,6 +208,7 @@ class GraphRag: self.ge_client = GraphEmbeddingsClient( pulsar_host=pulsar_host, + pulsar_api_key=-pulsar_api_key, subscriber=module + "-ge", input_queue=ge_request_queue, output_queue=ge_response_queue, @@ -214,6 +216,7 @@ class GraphRag: self.triples_client = TriplesQueryClient( pulsar_host=pulsar_host, + pulsar_api_key=-pulsar_api_key, subscriber=module + "-tpl", input_queue=tpl_request_queue, output_queue=tpl_response_queue @@ -221,6 +224,7 @@ class GraphRag: self.embeddings = EmbeddingsClient( pulsar_host=pulsar_host, + pulsar_api_key=-pulsar_api_key, input_queue=emb_request_queue, output_queue=emb_response_queue, subscriber=module + "-emb", @@ -234,6 +238,7 @@ class GraphRag: self.prompt = PromptClient( pulsar_host=pulsar_host, + pulsar_api_key=-pulsar_api_key, input_queue=pr_request_queue, output_queue=pr_response_queue, subscriber=module + "-prompt", diff --git a/trustgraph-flow/trustgraph/model/prompt/generic/service.py b/trustgraph-flow/trustgraph/model/prompt/generic/service.py index d4c1846d..8d533fcf 100755 --- a/trustgraph-flow/trustgraph/model/prompt/generic/service.py +++ b/trustgraph-flow/trustgraph/model/prompt/generic/service.py @@ -63,7 +63,8 @@ class Processor(ConsumerProducer): subscriber=subscriber, input_queue=tc_request_queue, output_queue=tc_response_queue, - pulsar_host = self.pulsar_host + pulsar_host = self.pulsar_host, + pulsar_api_key=self.pulsar_api_key, ) def parse_json(self, text): diff --git a/trustgraph-flow/trustgraph/model/prompt/template/service.py b/trustgraph-flow/trustgraph/model/prompt/template/service.py index f1f2c0ed..58657d7d 100755 --- a/trustgraph-flow/trustgraph/model/prompt/template/service.py +++ b/trustgraph-flow/trustgraph/model/prompt/template/service.py @@ -136,7 +136,8 @@ class Processor(ConsumerProducer): subscriber=subscriber, input_queue=tc_request_queue, output_queue=tc_response_queue, - pulsar_host = self.pulsar_host + pulsar_host = self.pulsar_host, + pulsar_api_key=self.pulsar_api_key, ) # System prompt hack diff --git a/trustgraph-flow/trustgraph/processing/processing.py b/trustgraph-flow/trustgraph/processing/processing.py index 5e4c7c8a..5352776a 100644 --- a/trustgraph-flow/trustgraph/processing/processing.py +++ b/trustgraph-flow/trustgraph/processing/processing.py @@ -49,11 +49,12 @@ class Processing: pulsar_host, log_level, file, + pulsar_api_key=None, ): self.pulsar_host = pulsar_host self.log_level = log_level self.file = file - + self.pulsar_api_key = pulsar_api_key self.defs = load(open(file, "r"), Loader=Loader) def run(self): @@ -68,6 +69,7 @@ class Processing: params = { "pulsar_host": self.pulsar_host, + "pulsar_api_key": self.pulsar_api_key, "log_level": str(self.log_level), } @@ -125,12 +127,19 @@ def run(): ) default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') + default_pulsar_api_key = os.getenv("PULSAR_API_KEY", None) parser.add_argument( '-p', '--pulsar-host', default=default_pulsar_host, help=f'Pulsar host (default: {default_pulsar_host})', ) + + parser.add_argument( + '--pulsar-api-key', + default=default_pulsar_api_key, + help=f'Pulsar API key', + ) parser.add_argument( '-l', '--log-level', @@ -153,6 +162,7 @@ def run(): try: p = Processing( pulsar_host=args.pulsar_host, + pulsar_api_key=args.pulsar_api_key, file=args.file, log_level=args.log_level, ) diff --git a/trustgraph-flow/trustgraph/retrieval/document_rag/rag.py b/trustgraph-flow/trustgraph/retrieval/document_rag/rag.py index 29203b4c..23b46129 100755 --- a/trustgraph-flow/trustgraph/retrieval/document_rag/rag.py +++ b/trustgraph-flow/trustgraph/retrieval/document_rag/rag.py @@ -68,6 +68,7 @@ class Processor(ConsumerProducer): self.rag = DocumentRag( pulsar_host=self.pulsar_host, + pulsar_api_key=self.pulsar_api_key, pr_request_queue=pr_request_queue, pr_response_queue=pr_response_queue, emb_request_queue=emb_request_queue, diff --git a/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py b/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py index 4f7b373c..4095d0c3 100755 --- a/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py +++ b/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py @@ -82,6 +82,7 @@ class Processor(ConsumerProducer): self.rag = GraphRag( pulsar_host=self.pulsar_host, + pulsar_api_key=self.pulsar_api_key, pr_request_queue=pr_request_queue, pr_response_queue=pr_response_queue, emb_request_queue=emb_request_queue,