Feature/persist config (#370)

* Cassandra tables for config

* Config is backed by Cassandra
This commit is contained in:
cybermaggedon 2025-05-07 12:58:32 +01:00 committed by GitHub
parent f7123ac57f
commit 4461d7b289
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 516 additions and 103 deletions

View file

@ -65,10 +65,10 @@ some-containers:
-t ${CONTAINER_BASE}/trustgraph-base:${VERSION} . -t ${CONTAINER_BASE}/trustgraph-base:${VERSION} .
${DOCKER} build -f containers/Containerfile.flow \ ${DOCKER} build -f containers/Containerfile.flow \
-t ${CONTAINER_BASE}/trustgraph-flow:${VERSION} . -t ${CONTAINER_BASE}/trustgraph-flow:${VERSION} .
${DOCKER} build -f containers/Containerfile.vertexai \ # ${DOCKER} build -f containers/Containerfile.vertexai \
-t ${CONTAINER_BASE}/trustgraph-vertexai:${VERSION} . # -t ${CONTAINER_BASE}/trustgraph-vertexai:${VERSION} .
${DOCKER} build -f containers/Containerfile.bedrock \ # ${DOCKER} build -f containers/Containerfile.bedrock \
-t ${CONTAINER_BASE}/trustgraph-bedrock:${VERSION} . # -t ${CONTAINER_BASE}/trustgraph-bedrock:${VERSION} .
basic-containers: update-package-versions basic-containers: update-package-versions
${DOCKER} build -f containers/Containerfile.base \ ${DOCKER} build -f containers/Containerfile.base \

View file

@ -2,56 +2,93 @@
from trustgraph.schema import ConfigResponse from trustgraph.schema import ConfigResponse
from trustgraph.schema import ConfigValue, Error from trustgraph.schema import ConfigValue, Error
# This behaves just like a dict, should be easier to add persistent storage from ... tables.config import ConfigTableStore
# later
class ConfigurationItems(dict):
pass
class Configuration(dict): class ConfigurationClass:
async def keys(self):
return await self.table_store.get_keys(self.type)
async def values(self):
vals = await self.table_store.get_values(self.type)
return {
v[0]: v[1]
for v in vals
}
async def get(self, key):
return await self.table_store.get_value(self.type, key)
async def put(self, key, value):
return await self.table_store.put_config(self.type, key, value)
async def delete(self, key):
return await self.table_store.delete_key(self.type, key)
async def has(self, key):
val = await self.table_store.get_value(self.type, key)
return val is not None
class Configuration:
# FIXME: The state is held internally. This only works if there's # FIXME: The state is held internally. This only works if there's
# one config service. Should be more than one, and use a # one config service. Should be more than one, and use a
# back-end state store. # back-end state store.
def __init__(self, push): # FIXME: This has state now, but does it address all of the above?
# REVIEW: Above
# Version counter # FIXME: Some version vs config race conditions
self.version = 0
def __init__(self, push, host, user, password, keyspace):
# External function to respond to update # External function to respond to update
self.push = push self.push = push
def __getitem__(self, key): self.table_store = ConfigTableStore(
if key not in self: host, user, password, keyspace
self[key] = ConfigurationItems() )
return dict.__getitem__(self, key)
async def inc_version(self):
await self.table_store.inc_version()
async def get_version(self):
return await self.table_store.get_version()
def get(self, type):
c = ConfigurationClass()
c.table_store = self.table_store
c.type = type
return c
async def handle_get(self, v): async def handle_get(self, v):
for k in v.keys: # for k in v.keys:
if k.type not in self or k.key not in self[k.type]: # if k.type not in self or k.key not in self[k.type]:
return ConfigResponse( # return ConfigResponse(
version = None, # version = None,
values = None, # values = None,
directory = None, # directory = None,
config = None, # config = None,
error = Error( # error = Error(
type = "key-error", # type = "key-error",
message = f"Key error" # message = f"Key error"
) # )
) # )
values = [ values = [
ConfigValue( ConfigValue(
type = k.type, type = k.type,
key = k.key, key = k.key,
value = self[k.type][k.key] value = await self.table_store.get_value(k.type, k.key)
) )
for k in v.keys for k in v.keys
] ]
return ConfigResponse( return ConfigResponse(
version = self.version, version = await self.get_version(),
values = values, values = values,
directory = None, directory = None,
config = None, config = None,
@ -60,23 +97,23 @@ class Configuration(dict):
async def handle_list(self, v): async def handle_list(self, v):
if v.type not in self: # if v.type not in self:
return ConfigResponse( # return ConfigResponse(
version = None, # version = None,
values = None, # values = None,
directory = None, # directory = None,
config = None, # config = None,
error = Error( # error = Error(
type = "key-error", # type = "key-error",
message = "No such type", # message = "No such type",
), # ),
) # )
return ConfigResponse( return ConfigResponse(
version = self.version, version = await self.get_version(),
values = None, values = None,
directory = list(self[v.type].keys()), directory = await self.table_store.get_keys(v.type),
config = None, config = None,
error = None, error = None,
) )
@ -96,17 +133,17 @@ class Configuration(dict):
) )
) )
values = [ v = await self.table_store.get_values(v.type)
ConfigValue(
type = v.type, values = map(
key = k, lambda x: ConfigValue(
value = self[v.type][k], type = v.type, key = x[0], value = x[1]
) ),
for k in self[v.type] v
] )
return ConfigResponse( return ConfigResponse(
version = self.version, version = await self.get_version(),
values = values, values = values,
directory = None, directory = None,
config = None, config = None,
@ -115,23 +152,24 @@ class Configuration(dict):
async def handle_delete(self, v): async def handle_delete(self, v):
for k in v.keys: # for k in v.keys:
if k.type not in self or k.key not in self[k.type]: # if k.type not in self or k.key not in self[k.type]:
return ConfigResponse( # return ConfigResponse(
version = None, # version = None,
values = None, # values = None,
directory = None, # directory = None,
config = None, # config = None,
error = Error( # error = Error(
type = "key-error", # type = "key-error",
message = f"Key error" # message = f"Key error"
) # )
) # )
for k in v.keys: for k in v.keys:
del self[k.type][k.key]
self.version += 1 await self.table_store.delete_key(k.type, k.key)
await self.inc_version()
await self.push() await self.push()
@ -147,9 +185,10 @@ class Configuration(dict):
async def handle_put(self, v): async def handle_put(self, v):
for k in v.values: for k in v.values:
self[k.type][k.key] = k.value
self.version += 1 await self.table_store.put_config(k.type, k.key, k.value)
await self.inc_version()
await self.push() await self.push()
@ -161,14 +200,29 @@ class Configuration(dict):
error = None, error = None,
) )
async def get_config(self):
table = await self.table_store.get_all()
config = {}
for row in table:
if row[0] not in config:
config[row[0]] = {}
config[row[0]][row[1]] = row[2]
return config
async def handle_config(self, v): async def handle_config(self, v):
config = await self.get_config()
return ConfigResponse( return ConfigResponse(
version = self.version, version = await self.get_version(),
value = None, value = None,
directory = None, directory = None,
values = None, values = None,
config = self, config = config,
error = None, error = None,
) )

View file

@ -9,7 +9,7 @@ class FlowConfig:
async def handle_list_classes(self, msg): async def handle_list_classes(self, msg):
names = list(self.config["flow-classes"].keys()) names = list(await self.config.get("flow-classes").keys())
return FlowResponse( return FlowResponse(
error = None, error = None,
@ -20,14 +20,18 @@ class FlowConfig:
return FlowResponse( return FlowResponse(
error = None, error = None,
class_definition = self.config["flow-classes"][msg.class_name], class_definition = await self.config.get(
"flow-classes"
).get(msg.class_name),
) )
async def handle_put_class(self, msg): async def handle_put_class(self, msg):
self.config["flow-classes"][msg.class_name] = msg.class_definition await self.config.get("flow-classes").put(
msg.class_name, msg.class_definition
)
self.config.version += 1 await self.config.inc_version()
await self.config.push() await self.config.push()
@ -39,9 +43,9 @@ class FlowConfig:
print(msg) print(msg)
del self.config["flow-classes"][msg.class_name] await self.config.get("flow-classes").delete(msg.class_name)
self.config.version += 1 await self.config.inc_version()
await self.config.push() await self.config.push()
@ -51,7 +55,7 @@ class FlowConfig:
async def handle_list_flows(self, msg): async def handle_list_flows(self, msg):
names = list(self.config["flows"].keys()) names = list(await self.config.get("flows").keys())
return FlowResponse( return FlowResponse(
error = None, error = None,
@ -60,7 +64,7 @@ class FlowConfig:
async def handle_get_flow(self, msg): async def handle_get_flow(self, msg):
flow = self.config["flows"][msg.flow_id] flow = await self.config.get("flows").get(msg.flow_id)
return FlowResponse( return FlowResponse(
error = None, error = None,
@ -75,13 +79,13 @@ class FlowConfig:
if msg.flow_id is None: if msg.flow_id is None:
raise RuntimeError("No flow ID") raise RuntimeError("No flow ID")
if msg.flow_id in self.config["flows"]: if msg.flow_id in await self.config.get("flows").values():
raise RuntimeError("Flow already exists") raise RuntimeError("Flow already exists")
if msg.description is None: if msg.description is None:
raise RuntimeError("No description") raise RuntimeError("No description")
if msg.class_name not in self.config["flow-classes"]: if msg.class_name not in await self.config.get("flow-classes").values():
raise RuntimeError("Class does not exist") raise RuntimeError("Class does not exist")
def repl_template(tmp): def repl_template(tmp):
@ -91,7 +95,9 @@ class FlowConfig:
"{id}", msg.flow_id "{id}", msg.flow_id
) )
cls = json.loads(self.config["flow-classes"][msg.class_name]) cls = json.loads(
await self.config.get("flow-classes").get(msg.class_name)
)
for kind in ("class", "flow"): for kind in ("class", "flow"):
@ -106,15 +112,18 @@ class FlowConfig:
for k2, v2 in v.items() for k2, v2 in v.items()
} }
if processor in self.config["flows-active"]: flac = await self.config.get("flows-active").values()
target = json.loads(self.config["flows-active"][processor]) if processor in flac:
target = json.loads(flac[processor])
else: else:
target = {} target = {}
if variant not in target: if variant not in target:
target[variant] = v target[variant] = v
self.config["flows-active"][processor] = json.dumps(target) await self.config.get("flows-active").put(
processor, json.dumps(target)
)
def repl_interface(i): def repl_interface(i):
if isinstance(i, str): if isinstance(i, str):
@ -133,13 +142,16 @@ class FlowConfig:
else: else:
interfaces = {} interfaces = {}
self.config["flows"][msg.flow_id] = json.dumps({ await self.config.get("flows").put(
"description": msg.description, msg.flow_id,
"class-name": msg.class_name, json.dumps({
"interfaces": interfaces, "description": msg.description,
}) "class-name": msg.class_name,
"interfaces": interfaces,
})
)
self.config.version += 1 await self.config.inc_version()
await self.config.push() await self.config.push()
@ -152,17 +164,17 @@ class FlowConfig:
if msg.flow_id is None: if msg.flow_id is None:
raise RuntimeError("No flow ID") raise RuntimeError("No flow ID")
if msg.flow_id not in self.config["flows"]: if msg.flow_id not in await self.config.get("flows").keys():
raise RuntimeError("Flow ID invalid") raise RuntimeError("Flow ID invalid")
flow = json.loads(self.config["flows"][msg.flow_id]) flow = json.loads(await self.config.get("flows").get(msg.flow_id))
if "class-name" not in flow: if "class-name" not in flow:
raise RuntimeError("Internal error: flow has no flow class") raise RuntimeError("Internal error: flow has no flow class")
class_name = flow["class-name"] class_name = flow["class-name"]
cls = json.loads(self.config["flow-classes"][class_name]) cls = json.loads(await self.config.get("flow-classes").get(class_name))
def repl_template(tmp): def repl_template(tmp):
return tmp.replace( return tmp.replace(
@ -179,20 +191,24 @@ class FlowConfig:
variant = repl_template(variant) variant = repl_template(variant)
if processor in self.config["flows-active"]: flac = await self.config.get("flows-active").values()
target = json.loads(self.config["flows-active"][processor])
if processor in flac:
target = json.loads(flac[processor])
else: else:
target = {} target = {}
if variant in target: if variant in target:
del target[variant] del target[variant]
self.config["flows-active"][processor] = json.dumps(target) await self.config.get("flows-active").put(
processor, json.dumps(target)
)
if msg.flow_id in self.config["flows"]: if msg.flow_id in await self.config.get("flows").values():
del self.config["flows"][msg.flow_id] await self.config.get("flows").delete(msg.flow_id)
self.config.version += 1 await self.config.inc_version()
await self.config.push() await self.config.push()

View file

@ -20,6 +20,9 @@ from . flow import FlowConfig
from ... base import ProcessorMetrics, ConsumerMetrics, ProducerMetrics from ... base import ProcessorMetrics, ConsumerMetrics, ProducerMetrics
from ... base import Consumer, Producer from ... base import Consumer, Producer
# FIXME: How to ensure this doesn't conflict with other usage?
keyspace = "config"
default_ident = "config-svc" default_ident = "config-svc"
default_config_request_queue = config_request_queue default_config_request_queue = config_request_queue
@ -29,6 +32,8 @@ default_config_push_queue = config_push_queue
default_flow_request_queue = flow_request_queue default_flow_request_queue = flow_request_queue
default_flow_response_queue = flow_response_queue default_flow_response_queue = flow_response_queue
default_cassandra_host = "cassandra"
class Processor(AsyncProcessor): class Processor(AsyncProcessor):
def __init__(self, **params): def __init__(self, **params):
@ -50,6 +55,10 @@ class Processor(AsyncProcessor):
"flow_response_queue", default_flow_response_queue "flow_response_queue", default_flow_response_queue
) )
cassandra_host = params.get("cassandra_host", default_cassandra_host)
cassandra_user = params.get("cassandra_user")
cassandra_password = params.get("cassandra_password")
id = params.get("id") id = params.get("id")
flow_request_schema = FlowRequest flow_request_schema = FlowRequest
@ -62,6 +71,8 @@ class Processor(AsyncProcessor):
"config_push_schema": ConfigPush.__name__, "config_push_schema": ConfigPush.__name__,
"flow_request_schema": FlowRequest.__name__, "flow_request_schema": FlowRequest.__name__,
"flow_response_schema": FlowResponse.__name__, "flow_response_schema": FlowResponse.__name__,
"cassandra_host": cassandra_host,
"cassandra_user": cassandra_user,
} }
) )
@ -125,7 +136,14 @@ class Processor(AsyncProcessor):
metrics = flow_response_metrics, metrics = flow_response_metrics,
) )
self.config = Configuration(self.push) self.config = Configuration(
host = cassandra_host.split(","),
user = cassandra_user,
password = cassandra_password,
keyspace = keyspace,
push = self.push
)
self.flow = FlowConfig(self.config) self.flow = FlowConfig(self.config)
print("Service initialised.") print("Service initialised.")
@ -138,18 +156,23 @@ class Processor(AsyncProcessor):
async def push(self): async def push(self):
config = await self.config.get_config()
version = await self.config.get_version()
resp = ConfigPush( resp = ConfigPush(
version = self.config.version, version = version,
value = None, value = None,
directory = None, directory = None,
values = None, values = None,
config = self.config, config = config,
error = None, error = None,
) )
await self.config_push_producer.send(resp) await self.config_push_producer.send(resp)
print("Pushed version ", self.config.version) # Race condition, should make sure version & config sync
print("Pushed version ", await self.config.get_version())
async def on_config_request(self, msg, consumer, flow): async def on_config_request(self, msg, consumer, flow):
@ -248,6 +271,24 @@ class Processor(AsyncProcessor):
help=f'Flow response queue {default_flow_response_queue}', help=f'Flow response queue {default_flow_response_queue}',
) )
parser.add_argument(
'--cassandra-host',
default="cassandra",
help=f'Graph host (default: cassandra)'
)
parser.add_argument(
'--cassandra-user',
default=None,
help=f'Cassandra user'
)
parser.add_argument(
'--cassandra-password',
default=None,
help=f'Cassandra password'
)
def run(): def run():
Processor.launch(default_ident, __doc__) Processor.launch(default_ident, __doc__)

View file

@ -0,0 +1,302 @@
from .. schema import KnowledgeResponse, Triple, Triples, EntityEmbeddings
from .. schema import Metadata, Value, GraphEmbeddings
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from ssl import SSLContext, PROTOCOL_TLSv1_2
import uuid
import time
import asyncio
class ConfigTableStore:
def __init__(
self,
cassandra_host, cassandra_user, cassandra_password, keyspace,
):
self.keyspace = keyspace
print("Connecting to Cassandra...", flush=True)
if cassandra_user and cassandra_password:
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
auth_provider = PlainTextAuthProvider(
username=cassandra_user, password=cassandra_password
)
self.cluster = Cluster(
cassandra_host,
auth_provider=auth_provider,
ssl_context=ssl_context
)
else:
self.cluster = Cluster(cassandra_host)
self.cassandra = self.cluster.connect()
print("Connected.", flush=True)
self.ensure_cassandra_schema()
self.prepare_statements()
def ensure_cassandra_schema(self):
print("Ensure Cassandra schema...", flush=True)
print("Keyspace...", flush=True)
# FIXME: Replication factor should be configurable
self.cassandra.execute(f"""
create keyspace if not exists {self.keyspace}
with replication = {{
'class' : 'SimpleStrategy',
'replication_factor' : 1
}};
""");
self.cassandra.set_keyspace(self.keyspace)
print("config table...", flush=True)
self.cassandra.execute("""
CREATE TABLE IF NOT EXISTS config (
class text,
key text,
value text,
PRIMARY KEY (class, key)
);
""");
print("version table...", flush=True)
self.cassandra.execute("""
CREATE TABLE IF NOT EXISTS version (
id text,
version counter,
PRIMARY KEY (id)
);
""");
resp = self.cassandra.execute("""
SELECT version FROM version
""")
print("ensure version...", flush=True)
self.cassandra.execute("""
UPDATE version set version = version + 0
WHERE id = 'version'
""")
print("Cassandra schema OK.", flush=True)
async def inc_version(self):
self.cassandra.execute("""
UPDATE version set version = version + 1
WHERE id = 'version'
""")
async def get_version(self):
resp = self.cassandra.execute("""
SELECT version FROM version
WHERE id = 'version'
""")
row = resp.one()
if row: return row[0]
return None
def prepare_statements(self):
self.put_config_stmt = self.cassandra.prepare("""
INSERT INTO config ( class, key, value )
VALUES (?, ?, ?)
""")
self.get_classes_stmt = self.cassandra.prepare("""
SELECT DISTINCT class FROM config;
""")
self.get_keys_stmt = self.cassandra.prepare("""
SELECT key FROM config WHERE class = ?;
""")
self.get_value_stmt = self.cassandra.prepare("""
SELECT value FROM config WHERE class = ? AND key = ?;
""")
self.delete_key_stmt = self.cassandra.prepare("""
DELETE FROM config
WHERE class = ? AND key = ?;
""")
self.get_all_stmt = self.cassandra.prepare("""
SELECT class, key, value FROM config;
""")
self.get_values_stmt = self.cassandra.prepare("""
SELECT key, value FROM config WHERE class = ?;
""")
async def put_config(self, cls, key, value):
while True:
try:
resp = self.cassandra.execute(
self.put_config_stmt,
( cls, key, value )
)
break
except Exception as e:
print("Exception:", type(e))
print(f"{e}, retry...", flush=True)
await asyncio.sleep(1)
async def get_value(self, cls, key):
while True:
try:
resp = self.cassandra.execute(
self.get_value_stmt,
( cls, key )
)
break
except Exception as e:
print("Exception:", type(e))
print(f"{e}, retry...", flush=True)
await asyncio.sleep(1)
for row in resp:
return row[0]
return None
async def get_values(self, cls):
while True:
try:
resp = self.cassandra.execute(
self.get_values_stmt,
( cls, )
)
break
except Exception as e:
print("Exception:", type(e))
print(f"{e}, retry...", flush=True)
await asyncio.sleep(1)
return [
[row[0], row[1]]
for row in resp
]
async def get_classes(self):
while True:
try:
resp = self.cassandra.execute(
self.get_classes_stmt,
()
)
break
except Exception as e:
print("Exception:", type(e))
print(f"{e}, retry...", flush=True)
await asyncio.sleep(1)
return [
row[0] for row in resp
]
async def get_all(self):
while True:
try:
resp = self.cassandra.execute(
self.get_all_stmt,
()
)
break
except Exception as e:
print("Exception:", type(e))
print(f"{e}, retry...", flush=True)
await asyncio.sleep(1)
return [
(row[0], row[1], row[2])
for row in resp
]
async def get_keys(self, cls):
while True:
try:
resp = self.cassandra.execute(
self.get_keys_stmt,
( cls, )
)
break
except Exception as e:
print("Exception:", type(e))
print(f"{e}, retry...", flush=True)
await asyncio.sleep(1)
return [
row[0] for row in resp
]
async def delete_key(self, cls, key):
while True:
try:
resp = self.cassandra.execute(
self.delete_key_stmt,
(cls, key)
)
break
except Exception as e:
print("Exception:", type(e))
print(f"{e}, retry...", flush=True)
await asyncio.sleep(1)