diff --git a/Makefile b/Makefile index 2553abe0..4088caf4 100644 --- a/Makefile +++ b/Makefile @@ -65,10 +65,10 @@ some-containers: -t ${CONTAINER_BASE}/trustgraph-base:${VERSION} . ${DOCKER} build -f containers/Containerfile.flow \ -t ${CONTAINER_BASE}/trustgraph-flow:${VERSION} . - ${DOCKER} build -f containers/Containerfile.vertexai \ - -t ${CONTAINER_BASE}/trustgraph-vertexai:${VERSION} . - ${DOCKER} build -f containers/Containerfile.bedrock \ - -t ${CONTAINER_BASE}/trustgraph-bedrock:${VERSION} . +# ${DOCKER} build -f containers/Containerfile.vertexai \ +# -t ${CONTAINER_BASE}/trustgraph-vertexai:${VERSION} . +# ${DOCKER} build -f containers/Containerfile.bedrock \ +# -t ${CONTAINER_BASE}/trustgraph-bedrock:${VERSION} . basic-containers: update-package-versions ${DOCKER} build -f containers/Containerfile.base \ diff --git a/trustgraph-flow/trustgraph/config/service/config.py b/trustgraph-flow/trustgraph/config/service/config.py index 46ade4c3..00afae1f 100644 --- a/trustgraph-flow/trustgraph/config/service/config.py +++ b/trustgraph-flow/trustgraph/config/service/config.py @@ -2,56 +2,93 @@ from trustgraph.schema import ConfigResponse from trustgraph.schema import ConfigValue, Error -# This behaves just like a dict, should be easier to add persistent storage -# later -class ConfigurationItems(dict): - pass +from ... tables.config import ConfigTableStore -class Configuration(dict): +class ConfigurationClass: + + async def keys(self): + return await self.table_store.get_keys(self.type) + + async def values(self): + vals = await self.table_store.get_values(self.type) + return { + v[0]: v[1] + for v in vals + } + + async def get(self, key): + return await self.table_store.get_value(self.type, key) + + async def put(self, key, value): + return await self.table_store.put_config(self.type, key, value) + + async def delete(self, key): + return await self.table_store.delete_key(self.type, key) + + async def has(self, key): + val = await self.table_store.get_value(self.type, key) + return val is not None + +class Configuration: # FIXME: The state is held internally. This only works if there's # one config service. Should be more than one, and use a # back-end state store. - def __init__(self, push): + # FIXME: This has state now, but does it address all of the above? + # REVIEW: Above - # Version counter - self.version = 0 + # FIXME: Some version vs config race conditions + + def __init__(self, push, host, user, password, keyspace): # External function to respond to update self.push = push - def __getitem__(self, key): - if key not in self: - self[key] = ConfigurationItems() - return dict.__getitem__(self, key) + self.table_store = ConfigTableStore( + host, user, password, keyspace + ) + + async def inc_version(self): + await self.table_store.inc_version() + + async def get_version(self): + return await self.table_store.get_version() + + def get(self, type): + + c = ConfigurationClass() + c.table_store = self.table_store + c.type = type + + return c async def handle_get(self, v): - for k in v.keys: - if k.type not in self or k.key not in self[k.type]: - return ConfigResponse( - version = None, - values = None, - directory = None, - config = None, - error = Error( - type = "key-error", - message = f"Key error" - ) - ) + # for k in v.keys: + # if k.type not in self or k.key not in self[k.type]: + # return ConfigResponse( + # version = None, + # values = None, + # directory = None, + # config = None, + # error = Error( + # type = "key-error", + # message = f"Key error" + # ) + # ) values = [ ConfigValue( type = k.type, key = k.key, - value = self[k.type][k.key] + value = await self.table_store.get_value(k.type, k.key) ) for k in v.keys ] return ConfigResponse( - version = self.version, + version = await self.get_version(), values = values, directory = None, config = None, @@ -60,23 +97,23 @@ class Configuration(dict): async def handle_list(self, v): - if v.type not in self: + # if v.type not in self: - return ConfigResponse( - version = None, - values = None, - directory = None, - config = None, - error = Error( - type = "key-error", - message = "No such type", - ), - ) + # return ConfigResponse( + # version = None, + # values = None, + # directory = None, + # config = None, + # error = Error( + # type = "key-error", + # message = "No such type", + # ), + # ) return ConfigResponse( - version = self.version, + version = await self.get_version(), values = None, - directory = list(self[v.type].keys()), + directory = await self.table_store.get_keys(v.type), config = None, error = None, ) @@ -96,17 +133,17 @@ class Configuration(dict): ) ) - values = [ - ConfigValue( - type = v.type, - key = k, - value = self[v.type][k], - ) - for k in self[v.type] - ] + v = await self.table_store.get_values(v.type) + + values = map( + lambda x: ConfigValue( + type = v.type, key = x[0], value = x[1] + ), + v + ) return ConfigResponse( - version = self.version, + version = await self.get_version(), values = values, directory = None, config = None, @@ -115,23 +152,24 @@ class Configuration(dict): async def handle_delete(self, v): - for k in v.keys: - if k.type not in self or k.key not in self[k.type]: - return ConfigResponse( - version = None, - values = None, - directory = None, - config = None, - error = Error( - type = "key-error", - message = f"Key error" - ) - ) + # for k in v.keys: + # if k.type not in self or k.key not in self[k.type]: + # return ConfigResponse( + # version = None, + # values = None, + # directory = None, + # config = None, + # error = Error( + # type = "key-error", + # message = f"Key error" + # ) + # ) for k in v.keys: - del self[k.type][k.key] - self.version += 1 + await self.table_store.delete_key(k.type, k.key) + + await self.inc_version() await self.push() @@ -147,9 +185,10 @@ class Configuration(dict): async def handle_put(self, v): for k in v.values: - self[k.type][k.key] = k.value - self.version += 1 + await self.table_store.put_config(k.type, k.key, k.value) + + await self.inc_version() await self.push() @@ -161,14 +200,29 @@ class Configuration(dict): error = None, ) + async def get_config(self): + + table = await self.table_store.get_all() + + config = {} + + for row in table: + if row[0] not in config: + config[row[0]] = {} + config[row[0]][row[1]] = row[2] + + return config + async def handle_config(self, v): + config = await self.get_config() + return ConfigResponse( - version = self.version, + version = await self.get_version(), value = None, directory = None, values = None, - config = self, + config = config, error = None, ) diff --git a/trustgraph-flow/trustgraph/config/service/flow.py b/trustgraph-flow/trustgraph/config/service/flow.py index 4d351f59..83e6835e 100644 --- a/trustgraph-flow/trustgraph/config/service/flow.py +++ b/trustgraph-flow/trustgraph/config/service/flow.py @@ -9,7 +9,7 @@ class FlowConfig: async def handle_list_classes(self, msg): - names = list(self.config["flow-classes"].keys()) + names = list(await self.config.get("flow-classes").keys()) return FlowResponse( error = None, @@ -20,14 +20,18 @@ class FlowConfig: return FlowResponse( error = None, - class_definition = self.config["flow-classes"][msg.class_name], + class_definition = await self.config.get( + "flow-classes" + ).get(msg.class_name), ) async def handle_put_class(self, msg): - self.config["flow-classes"][msg.class_name] = msg.class_definition + await self.config.get("flow-classes").put( + msg.class_name, msg.class_definition + ) - self.config.version += 1 + await self.config.inc_version() await self.config.push() @@ -39,9 +43,9 @@ class FlowConfig: print(msg) - del self.config["flow-classes"][msg.class_name] + await self.config.get("flow-classes").delete(msg.class_name) - self.config.version += 1 + await self.config.inc_version() await self.config.push() @@ -51,7 +55,7 @@ class FlowConfig: async def handle_list_flows(self, msg): - names = list(self.config["flows"].keys()) + names = list(await self.config.get("flows").keys()) return FlowResponse( error = None, @@ -60,7 +64,7 @@ class FlowConfig: async def handle_get_flow(self, msg): - flow = self.config["flows"][msg.flow_id] + flow = await self.config.get("flows").get(msg.flow_id) return FlowResponse( error = None, @@ -75,13 +79,13 @@ class FlowConfig: if msg.flow_id is None: raise RuntimeError("No flow ID") - if msg.flow_id in self.config["flows"]: + if msg.flow_id in await self.config.get("flows").values(): raise RuntimeError("Flow already exists") if msg.description is None: raise RuntimeError("No description") - if msg.class_name not in self.config["flow-classes"]: + if msg.class_name not in await self.config.get("flow-classes").values(): raise RuntimeError("Class does not exist") def repl_template(tmp): @@ -91,7 +95,9 @@ class FlowConfig: "{id}", msg.flow_id ) - cls = json.loads(self.config["flow-classes"][msg.class_name]) + cls = json.loads( + await self.config.get("flow-classes").get(msg.class_name) + ) for kind in ("class", "flow"): @@ -106,15 +112,18 @@ class FlowConfig: for k2, v2 in v.items() } - if processor in self.config["flows-active"]: - target = json.loads(self.config["flows-active"][processor]) + flac = await self.config.get("flows-active").values() + if processor in flac: + target = json.loads(flac[processor]) else: target = {} if variant not in target: target[variant] = v - self.config["flows-active"][processor] = json.dumps(target) + await self.config.get("flows-active").put( + processor, json.dumps(target) + ) def repl_interface(i): if isinstance(i, str): @@ -133,13 +142,16 @@ class FlowConfig: else: interfaces = {} - self.config["flows"][msg.flow_id] = json.dumps({ - "description": msg.description, - "class-name": msg.class_name, - "interfaces": interfaces, - }) + await self.config.get("flows").put( + msg.flow_id, + json.dumps({ + "description": msg.description, + "class-name": msg.class_name, + "interfaces": interfaces, + }) + ) - self.config.version += 1 + await self.config.inc_version() await self.config.push() @@ -152,17 +164,17 @@ class FlowConfig: if msg.flow_id is None: raise RuntimeError("No flow ID") - if msg.flow_id not in self.config["flows"]: + if msg.flow_id not in await self.config.get("flows").keys(): raise RuntimeError("Flow ID invalid") - flow = json.loads(self.config["flows"][msg.flow_id]) + flow = json.loads(await self.config.get("flows").get(msg.flow_id)) if "class-name" not in flow: raise RuntimeError("Internal error: flow has no flow class") class_name = flow["class-name"] - cls = json.loads(self.config["flow-classes"][class_name]) + cls = json.loads(await self.config.get("flow-classes").get(class_name)) def repl_template(tmp): return tmp.replace( @@ -179,20 +191,24 @@ class FlowConfig: variant = repl_template(variant) - if processor in self.config["flows-active"]: - target = json.loads(self.config["flows-active"][processor]) + flac = await self.config.get("flows-active").values() + + if processor in flac: + target = json.loads(flac[processor]) else: target = {} if variant in target: del target[variant] - self.config["flows-active"][processor] = json.dumps(target) + await self.config.get("flows-active").put( + processor, json.dumps(target) + ) - if msg.flow_id in self.config["flows"]: - del self.config["flows"][msg.flow_id] + if msg.flow_id in await self.config.get("flows").values(): + await self.config.get("flows").delete(msg.flow_id) - self.config.version += 1 + await self.config.inc_version() await self.config.push() diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index 11cd156c..1ef81341 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -20,6 +20,9 @@ from . flow import FlowConfig from ... base import ProcessorMetrics, ConsumerMetrics, ProducerMetrics from ... base import Consumer, Producer +# FIXME: How to ensure this doesn't conflict with other usage? +keyspace = "config" + default_ident = "config-svc" default_config_request_queue = config_request_queue @@ -29,6 +32,8 @@ default_config_push_queue = config_push_queue default_flow_request_queue = flow_request_queue default_flow_response_queue = flow_response_queue +default_cassandra_host = "cassandra" + class Processor(AsyncProcessor): def __init__(self, **params): @@ -50,6 +55,10 @@ class Processor(AsyncProcessor): "flow_response_queue", default_flow_response_queue ) + cassandra_host = params.get("cassandra_host", default_cassandra_host) + cassandra_user = params.get("cassandra_user") + cassandra_password = params.get("cassandra_password") + id = params.get("id") flow_request_schema = FlowRequest @@ -62,6 +71,8 @@ class Processor(AsyncProcessor): "config_push_schema": ConfigPush.__name__, "flow_request_schema": FlowRequest.__name__, "flow_response_schema": FlowResponse.__name__, + "cassandra_host": cassandra_host, + "cassandra_user": cassandra_user, } ) @@ -125,7 +136,14 @@ class Processor(AsyncProcessor): metrics = flow_response_metrics, ) - self.config = Configuration(self.push) + self.config = Configuration( + host = cassandra_host.split(","), + user = cassandra_user, + password = cassandra_password, + keyspace = keyspace, + push = self.push + ) + self.flow = FlowConfig(self.config) print("Service initialised.") @@ -138,18 +156,23 @@ class Processor(AsyncProcessor): async def push(self): + config = await self.config.get_config() + version = await self.config.get_version() + resp = ConfigPush( - version = self.config.version, + version = version, value = None, directory = None, values = None, - config = self.config, + config = config, error = None, ) await self.config_push_producer.send(resp) - print("Pushed version ", self.config.version) + # Race condition, should make sure version & config sync + + print("Pushed version ", await self.config.get_version()) async def on_config_request(self, msg, consumer, flow): @@ -248,6 +271,24 @@ class Processor(AsyncProcessor): help=f'Flow response queue {default_flow_response_queue}', ) + parser.add_argument( + '--cassandra-host', + default="cassandra", + help=f'Graph host (default: cassandra)' + ) + + parser.add_argument( + '--cassandra-user', + default=None, + help=f'Cassandra user' + ) + + parser.add_argument( + '--cassandra-password', + default=None, + help=f'Cassandra password' + ) + def run(): Processor.launch(default_ident, __doc__) diff --git a/trustgraph-flow/trustgraph/tables/config.py b/trustgraph-flow/trustgraph/tables/config.py new file mode 100644 index 00000000..3b9c2eb9 --- /dev/null +++ b/trustgraph-flow/trustgraph/tables/config.py @@ -0,0 +1,302 @@ + +from .. schema import KnowledgeResponse, Triple, Triples, EntityEmbeddings +from .. schema import Metadata, Value, GraphEmbeddings + +from cassandra.cluster import Cluster +from cassandra.auth import PlainTextAuthProvider +from ssl import SSLContext, PROTOCOL_TLSv1_2 + +import uuid +import time +import asyncio + +class ConfigTableStore: + + def __init__( + self, + cassandra_host, cassandra_user, cassandra_password, keyspace, + ): + + self.keyspace = keyspace + + print("Connecting to Cassandra...", flush=True) + + if cassandra_user and cassandra_password: + ssl_context = SSLContext(PROTOCOL_TLSv1_2) + auth_provider = PlainTextAuthProvider( + username=cassandra_user, password=cassandra_password + ) + self.cluster = Cluster( + cassandra_host, + auth_provider=auth_provider, + ssl_context=ssl_context + ) + else: + self.cluster = Cluster(cassandra_host) + + self.cassandra = self.cluster.connect() + + print("Connected.", flush=True) + + self.ensure_cassandra_schema() + + self.prepare_statements() + + def ensure_cassandra_schema(self): + + print("Ensure Cassandra schema...", flush=True) + + print("Keyspace...", flush=True) + + # FIXME: Replication factor should be configurable + self.cassandra.execute(f""" + create keyspace if not exists {self.keyspace} + with replication = {{ + 'class' : 'SimpleStrategy', + 'replication_factor' : 1 + }}; + """); + + self.cassandra.set_keyspace(self.keyspace) + + print("config table...", flush=True) + + self.cassandra.execute(""" + CREATE TABLE IF NOT EXISTS config ( + class text, + key text, + value text, + PRIMARY KEY (class, key) + ); + """); + + print("version table...", flush=True) + + self.cassandra.execute(""" + CREATE TABLE IF NOT EXISTS version ( + id text, + version counter, + PRIMARY KEY (id) + ); + """); + + resp = self.cassandra.execute(""" + SELECT version FROM version + """) + + print("ensure version...", flush=True) + + self.cassandra.execute(""" + UPDATE version set version = version + 0 + WHERE id = 'version' + """) + + print("Cassandra schema OK.", flush=True) + + async def inc_version(self): + + self.cassandra.execute(""" + UPDATE version set version = version + 1 + WHERE id = 'version' + """) + + async def get_version(self): + + resp = self.cassandra.execute(""" + SELECT version FROM version + WHERE id = 'version' + """) + + row = resp.one() + + if row: return row[0] + + return None + + def prepare_statements(self): + + self.put_config_stmt = self.cassandra.prepare(""" + INSERT INTO config ( class, key, value ) + VALUES (?, ?, ?) + """) + + self.get_classes_stmt = self.cassandra.prepare(""" + SELECT DISTINCT class FROM config; + """) + + self.get_keys_stmt = self.cassandra.prepare(""" + SELECT key FROM config WHERE class = ?; + """) + + self.get_value_stmt = self.cassandra.prepare(""" + SELECT value FROM config WHERE class = ? AND key = ?; + """) + + self.delete_key_stmt = self.cassandra.prepare(""" + DELETE FROM config + WHERE class = ? AND key = ?; + """) + + self.get_all_stmt = self.cassandra.prepare(""" + SELECT class, key, value FROM config; + """) + + self.get_values_stmt = self.cassandra.prepare(""" + SELECT key, value FROM config WHERE class = ?; + """) + + async def put_config(self, cls, key, value): + + while True: + + try: + + resp = self.cassandra.execute( + self.put_config_stmt, + ( cls, key, value ) + ) + + break + + except Exception as e: + + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + async def get_value(self, cls, key): + + while True: + + try: + + resp = self.cassandra.execute( + self.get_value_stmt, + ( cls, key ) + ) + + break + + except Exception as e: + + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + for row in resp: + return row[0] + + return None + + async def get_values(self, cls): + + while True: + + try: + + resp = self.cassandra.execute( + self.get_values_stmt, + ( cls, ) + ) + + break + + except Exception as e: + + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + return [ + [row[0], row[1]] + for row in resp + ] + + async def get_classes(self): + + while True: + + try: + + resp = self.cassandra.execute( + self.get_classes_stmt, + () + ) + + break + + except Exception as e: + + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + return [ + row[0] for row in resp + ] + + async def get_all(self): + + while True: + + try: + + resp = self.cassandra.execute( + self.get_all_stmt, + () + ) + + break + + except Exception as e: + + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + return [ + (row[0], row[1], row[2]) + for row in resp + ] + + async def get_keys(self, cls): + + while True: + + try: + + resp = self.cassandra.execute( + self.get_keys_stmt, + ( cls, ) + ) + + break + + except Exception as e: + + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) + + return [ + row[0] for row in resp + ] + + async def delete_key(self, cls, key): + + while True: + + try: + + resp = self.cassandra.execute( + self.delete_key_stmt, + (cls, key) + ) + + break + + except Exception as e: + print("Exception:", type(e)) + print(f"{e}, retry...", flush=True) + await asyncio.sleep(1) +