feat: +unit test

This commit is contained in:
莘权 马 2024-04-22 11:40:05 +08:00
parent e3dc289e1a
commit 9668a6918f
2 changed files with 105 additions and 14 deletions

View file

@ -14,9 +14,9 @@ class HashicorpResult(BaseModel):
class HashicorpAuth(BaseModel):
vault_addr: str
role_id: str
secret_id: str
access_token: Optional[str] = None
role_id: Optional[str] = None # At least one of `role_id` and `secret_id`, or `access_token` must be valid
secret_id: Optional[str] = None # At least one of `role_id` and `secret_id`, or `access_token` must be valid
access_token: Optional[str] = None # At least one of `role_id` and `secret_id`, or `access_token` must be valid
async def get_token(self) -> str:
if self.access_token:
@ -54,13 +54,16 @@ class HashicorpVaultSecrets(BaseModel):
user_name: constr(min_length=1)
uid: Optional[str] = None
def __init__(self, **data):
super().__init__(**data)
self.uid = self.format_user_name(self.user_name)
async def create_user(self) -> HashicorpAuth:
"""Admin tool, create user.
Returns:
The `HashicorpAuth` object containing `role_id` and `secret_id`.
"""
self.uid = self.format_user_name(self.user_name)
await self._create_kv_secret_engine()
await self._create_policy()
await self._create_approle()
@ -70,10 +73,18 @@ class HashicorpVaultSecrets(BaseModel):
@staticmethod
def format_user_name(user_name):
"""Formats a user name by hashing it with SHA256 and returning a shortened version.
Args:
user_name (str): The user name to be formatted.
Returns:
str: A formatted user name with the first 8 characters of the SHA256 hash prepended by 'u'.
"""
hashed = sha256(user_name.encode()).hexdigest()
return f"u{hashed[:8]}"
async def _create_kv_secret_engine(self):
async def _create_kv_secret_engine(self, exists_ok: bool = True):
url = f"{self.auth.vault_addr}/v1/sys/mounts/{self.uid}"
headers = {"X-Vault-Token": await self.auth.get_token()}
payload = {"type": "kv", "options": {"version": "2"}}
@ -83,8 +94,11 @@ class HashicorpVaultSecrets(BaseModel):
if response.status == 204:
return
data = await response.text()
result = HashicorpResult.model_validate_json(data)
raise ValueError(result.errors)
result = HashicorpResult.model_validate_json(data)
exists_errors = [f"path is already in use at {self.uid}/"]
if exists_ok and exists_errors == result.errors:
return
raise ValueError(result.errors)
async def _create_policy(self):
url = f"{self.auth.vault_addr}/v1/sys/policies/acl/{self.uid}-secret-policy"
@ -98,8 +112,8 @@ class HashicorpVaultSecrets(BaseModel):
if response.status == 204:
return
data = await response.text()
result = HashicorpResult.model_validate_json(data)
raise ValueError(result.errors)
result = HashicorpResult.model_validate_json(data)
raise ValueError(result.errors)
async def _create_approle(self):
url = f"{self.auth.vault_addr}/v1/auth/approle/role/role-{self.uid}"
@ -111,8 +125,8 @@ class HashicorpVaultSecrets(BaseModel):
if response.status == 204:
return
data = await response.text()
result = HashicorpResult.model_validate_json(data)
raise ValueError(result.errors)
result = HashicorpResult.model_validate_json(data)
raise ValueError(result.errors)
async def _get_role_id(self) -> str:
url = f"{self.auth.vault_addr}/v1/auth/approle/role/role-{self.uid}/role-id"
@ -152,7 +166,21 @@ class HashicorpVaultSecrets(BaseModel):
raise ValueError(v.errors)
return v.data.secret_id
async def upsert_kv(self, kvs: Dict[str, str], app_name: str = "") -> str:
async def upsert_kv(self, kvs: Dict[str, str], app_name: str = "") -> int:
"""Creates or updates a secret in HashiCorp Vault's Key-Value (KV) secret engine v2.
See More: https://developer.hashicorp.com/vault/api-docs/secret/kv/kv-v2#create-update-secret
Args:
kvs (Dict[str, str]): A dictionary containing key-value pairs representing the secret data
to be created or updated. The keys represent the paths of the secret to update, and the
values represent the corresponding secret values.
app_name (str, optional): The path to the KV mount containing the secret to read, such as secret.
This is specified as part of the URL.
Returns:
int: A int indicating the version of the kvs.
"""
app_name = app_name or "global"
url = f"{self.auth.vault_addr}/v1/{self.uid}/data/{app_name}"
headers = {"X-Vault-Token": await self.auth.get_token(), "Content-Type": "application/json"}
@ -162,7 +190,7 @@ class HashicorpVaultSecrets(BaseModel):
rsp = await response.text()
class Version(BaseModel):
version: str
version: int
class Data(HashicorpResult):
data: Optional[Version] = None
@ -173,6 +201,19 @@ class HashicorpVaultSecrets(BaseModel):
return v.data.version
async def get_kv(self, app_name: str = "") -> Optional[Dict[str, str]]:
"""Reads a secret version from HashiCorp Vault's Key-Value (KV) secret engine v2.
Retrieves the secret data from the specified KV mount path.
See more: https://developer.hashicorp.com/vault/api-docs/secret/kv/kv-v2#read-secret-version
Args:
app_name (str): The path to the KV mount containing the secret to read, such as secret.
This is specified as part of the URL.
Returns:
Optional[Dict[str, str]]: A dictionary containing the secret data if found.
Returns None if the secret is not found.
"""
app_name = app_name or "global"
url = f"{self.auth.vault_addr}/v1/{self.uid}/data/{app_name}"
headers = {"X-Vault-Token": await self.auth.get_token()}
@ -192,6 +233,17 @@ class HashicorpVaultSecrets(BaseModel):
return v.data.data
async def delete_kv(self, app_name: str = ""):
"""Deletes the latest version of a secret from HashiCorp Vault's Key-Value (KV) secret engine v2.
See More: https://developer.hashicorp.com/vault/api-docs/secret/kv/kv-v2#delete-latest-version-of-secret
Args:
app_name (str, optional): The path to the KV mount containing the secret to delete, such as secret.
This is specified as part of the URL.
Returns:
None
"""
app_name = app_name or "global"
url = f"{self.auth.vault_addr}/v1/{self.uid}/data/{app_name}"
headers = {"X-Vault-Token": await self.auth.get_token()}
@ -202,3 +254,5 @@ class HashicorpVaultSecrets(BaseModel):
rsp = await response.text()
v = HashicorpResult.model_validate_json(rsp)
raise ValueError(v.errors)
field_validator("vault_addr", mode="before")