feat: rm vault

This commit is contained in:
莘权 马 2024-04-28 14:07:31 +08:00
parent c5945bf0a7
commit 997b626877
5 changed files with 41 additions and 325 deletions

View file

@ -16,14 +16,8 @@ from metagpt.tools.libs import (
browser,
deployer,
)
from metagpt.tools.libs.env import get_env, set_get_env_entry, default_get_env
from metagpt.tools.libs.env import get_env, set_get_env_entry, default_get_env, get_env_description
from metagpt.tools.libs.software_development import (
write_prd,
write_design,
write_project_plan,
write_codes,
run_qa_test,
fix_bug,
git_archive,
)
@ -34,18 +28,13 @@ _ = (
gpt_v_generator,
web_scraping,
email_login,
write_prd,
write_design,
write_project_plan,
write_codes,
run_qa_test,
fix_bug,
git_archive,
terminal,
file_manager,
browser,
deployer,
get_env,
get_env_description,
set_get_env_entry,
default_get_env,
) # Avoid pre-commit error

View file

@ -7,8 +7,7 @@
@Desc: Implement `get_env`. RFC 216 2.4.2.4.2.
"""
import os
from metagpt.context import Context
from typing import Dict
class EnvKeyNotFoundError(Exception):
@ -19,6 +18,9 @@ class EnvKeyNotFoundError(Exception):
async def default_get_env(key: str, app_name: str = None) -> str:
if key in os.environ:
return os.environ[key]
from metagpt.context import Context
context = Context()
val = context.kwargs.get(key, None)
if val is not None:
@ -27,7 +29,23 @@ async def default_get_env(key: str, app_name: str = None) -> str:
raise EnvKeyNotFoundError(f"EnvKeyNotFoundError: {key}, app_name:{app_name or ''}")
async def default_get_env_description() -> Dict[str, str]:
result = {}
for k in os.environ.keys():
call = f'await get_env(key="{k}", app_name="")'
result[call] = f"Return the value of environment variable `{k}`."
from metagpt.context import Context
context = Context()
for k in context.kwargs.__dict__.keys():
call = f'await get_env(key="{k}", app_name="")'
result[call] = f"Get the value of environment variable `{k}`."
return result
_get_env_entry = default_get_env
_get_env_description_entry = default_get_env_description
async def get_env(key: str, app_name: str = None) -> str:
@ -66,11 +84,26 @@ async def get_env(key: str, app_name: str = None) -> str:
return await default_get_env(key=key, app_name=app_name)
def set_get_env_entry(func):
"""Modify `get_env` entry.
async def get_env_description() -> Dict[str, str]:
global _get_env_description_entry
if _get_env_description_entry:
return await _get_env_description_entry()
return await default_get_env_description()
def set_get_env_entry(value, description):
"""Modify `get_env` entry and `get_description` entry.
Args:
func: New function entry.
value (function): New function entry.
description (str): Description of the function.
This function modifies the `get_env` entry by updating the function
to the provided `value` and its description to the provided `description`.
"""
global _get_env_entry
_get_env_entry = func
global _get_env_description_entry
_get_env_entry = value
_get_env_description_entry = description

View file

@ -1,258 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from hashlib import sha256
from typing import Dict, List, Optional
import aiohttp
from pydantic import BaseModel, constr, field_validator
class HashicorpResult(BaseModel):
errors: Optional[List[str]] = None
class HashicorpAuth(BaseModel):
vault_addr: str
role_id: Optional[str] = None # At least one of `role_id` and `secret_id`, or `access_token` must be valid
secret_id: Optional[str] = None # At least one of `role_id` and `secret_id`, or `access_token` must be valid
access_token: Optional[str] = None # At least one of `role_id` and `secret_id`, or `access_token` must be valid
async def get_token(self) -> str:
if self.access_token:
return self.access_token
url = f"{self.vault_addr}/v1/auth/approle/login"
payload = {"role_id": self.role_id, "secret_id": self.secret_id}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload) as response:
if response.status == 200:
data = await response.text()
class Auth(BaseModel):
client_token: str
class Data(HashicorpResult):
auth: Optional[Auth] = None
v = Data.model_validate_json(data)
if v.errors:
raise ValueError(v.errors)
self.access_token = v.auth.client_token
return self.access_token
@field_validator("vault_addr", mode="before")
@classmethod
def check_vault_addr(cls, vault_addr: str) -> str:
vault_addr = vault_addr or os.environ["VAULT_ADDR"]
return vault_addr.rstrip("/")
class HashicorpVaultSecrets(BaseModel):
auth: HashicorpAuth
user_name: constr(min_length=1)
uid: Optional[str] = None
def __init__(self, **data):
super().__init__(**data)
self.uid = self.format_user_name(self.user_name)
async def create_user(self) -> HashicorpAuth:
"""Admin tool, create user.
Returns:
The `HashicorpAuth` object containing `role_id` and `secret_id`.
"""
await self._create_kv_secret_engine()
await self._create_policy()
await self._create_approle()
role_id = await self._get_role_id()
secret_id = await self._get_secret_id()
return HashicorpAuth(vault_addr=self.auth.vault_addr, role_id=role_id, secret_id=secret_id)
@staticmethod
def format_user_name(user_name):
"""Formats a user name by hashing it with SHA256 and returning a shortened version.
Args:
user_name (str): The user name to be formatted.
Returns:
str: A formatted user name with the first 8 characters of the SHA256 hash prepended by 'u'.
"""
hashed = sha256(user_name.encode()).hexdigest()
return f"u{hashed[:8]}"
async def _create_kv_secret_engine(self, exists_ok: bool = True):
url = f"{self.auth.vault_addr}/v1/sys/mounts/{self.uid}"
headers = {"X-Vault-Token": await self.auth.get_token()}
payload = {"type": "kv", "options": {"version": "2"}}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload) as response:
if response.status == 204:
return
data = await response.text()
result = HashicorpResult.model_validate_json(data)
exists_errors = [f"path is already in use at {self.uid}/"]
if exists_ok and exists_errors == result.errors:
return
raise ValueError(result.errors)
async def _create_policy(self):
url = f"{self.auth.vault_addr}/v1/sys/policies/acl/{self.uid}-secret-policy"
headers = {"X-Vault-Token": await self.auth.get_token()}
path = f'path "{self.uid}/data/*"'
acl = '{ capabilities = ["create", "read", "update", "delete", "list"] }'
payload = {"policy": f"{path} {acl}"}
async with aiohttp.ClientSession() as session:
async with session.put(url, headers=headers, json=payload) as response:
if response.status == 204:
return
data = await response.text()
result = HashicorpResult.model_validate_json(data)
raise ValueError(result.errors)
async def _create_approle(self):
url = f"{self.auth.vault_addr}/v1/auth/approle/role/role-{self.uid}"
headers = {"X-Vault-Token": await self.auth.get_token()}
payload = {"policies": [f"{self.uid}-secret-policy"]}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload) as response:
if response.status == 204:
return
data = await response.text()
result = HashicorpResult.model_validate_json(data)
raise ValueError(result.errors)
async def _get_role_id(self) -> str:
url = f"{self.auth.vault_addr}/v1/auth/approle/role/role-{self.uid}/role-id"
headers = {"X-Vault-Token": await self.auth.get_token()}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
data = await response.text()
class RoleID(BaseModel):
role_id: str
class Data(HashicorpResult):
data: Optional[RoleID] = None
v = Data.model_validate_json(data)
if v.errors:
raise ValueError(v.errors)
return v.data.role_id
async def _get_secret_id(self) -> str:
url = f"{self.auth.vault_addr}/v1/auth/approle/role/role-{self.uid}/secret-id"
headers = {"X-Vault-Token": await self.auth.get_token()}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers) as response:
data = await response.text()
class SecretID(BaseModel):
secret_id: str
class Data(HashicorpResult):
data: Optional[SecretID] = None
v = Data.model_validate_json(data)
if v.errors:
raise ValueError(v.errors)
return v.data.secret_id
async def upsert_kv(self, kvs: Dict[str, str], app_name: str = "") -> int:
"""Creates or updates a secret in HashiCorp Vault's Key-Value (KV) secret engine v2.
See More: https://developer.hashicorp.com/vault/api-docs/secret/kv/kv-v2#create-update-secret
Args:
kvs (Dict[str, str]): A dictionary containing key-value pairs representing the secret data
to be created or updated. The keys represent the paths of the secret to update, and the
values represent the corresponding secret values.
app_name (str, optional): The path to the KV mount containing the secret to read, such as secret.
This is specified as part of the URL.
Returns:
int: A int indicating the version of the kvs.
"""
app_name = app_name or "global"
url = f"{self.auth.vault_addr}/v1/{self.uid}/data/{app_name}"
headers = {"X-Vault-Token": await self.auth.get_token(), "Content-Type": "application/json"}
data = {"data": kvs}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=data, headers=headers) as response:
rsp = await response.text()
class Version(BaseModel):
version: int
class Data(HashicorpResult):
data: Optional[Version] = None
v = Data.model_validate_json(rsp)
if v.errors:
raise ValueError(v.errors)
return v.data.version
async def get_kv(self, app_name: str = "") -> Optional[Dict[str, str]]:
"""Reads a secret version from HashiCorp Vault's Key-Value (KV) secret engine v2.
Retrieves the secret data from the specified KV mount path.
See more: https://developer.hashicorp.com/vault/api-docs/secret/kv/kv-v2#read-secret-version
Args:
app_name (str): The path to the KV mount containing the secret to read, such as secret.
This is specified as part of the URL.
Returns:
Optional[Dict[str, str]]: A dictionary containing the secret data if found.
Returns None if the secret is not found.
"""
app_name = app_name or "global"
url = f"{self.auth.vault_addr}/v1/{self.uid}/data/{app_name}"
headers = {"X-Vault-Token": await self.auth.get_token()}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
rsp = await response.text()
class KV(BaseModel):
data: Optional[Dict[str, str]] = None
class Data(HashicorpResult):
data: Optional[KV] = None
v = Data.model_validate_json(rsp)
if v.errors:
raise ValueError(v.errors)
return v.data.data
async def delete_kv(self, app_name: str = ""):
"""Deletes the latest version of a secret from HashiCorp Vault's Key-Value (KV) secret engine v2.
See More: https://developer.hashicorp.com/vault/api-docs/secret/kv/kv-v2#delete-latest-version-of-secret
Args:
app_name (str, optional): The path to the KV mount containing the secret to delete, such as secret.
This is specified as part of the URL.
Returns:
None
"""
app_name = app_name or "global"
url = f"{self.auth.vault_addr}/v1/{self.uid}/data/{app_name}"
headers = {"X-Vault-Token": await self.auth.get_token()}
async with aiohttp.ClientSession() as session:
async with session.delete(url, headers=headers) as response:
if response.status == 204:
return
rsp = await response.text()
v = HashicorpResult.model_validate_json(rsp)
raise ValueError(v.errors)
field_validator("vault_addr", mode="before")

View file

@ -1,11 +0,0 @@
storage "file" {
path = "vault-data"
}
listener "tcp" {
tls_disable = "true"
# tls_cert_file = "yourdomain.crt"
# tls_key_file = "yourdomain.key"
}
ui = true

View file

@ -1,37 +0,0 @@
#!/usr/bin/env python3
# _*_ coding: utf-8 _*_
import pytest
from metagpt.utils.hashicorp_vault import HashicorpAuth, HashicorpVaultSecrets
@pytest.mark.asyncio
@pytest.mark.parametrize(
("root_token", "vault_addr", "user_name", "kvs"),
[
(
"hvs.gMkrxXDhVNkeg4H50Vb9tUeg",
"http://127.0.0.1:8200",
"a@dafcSSD/a",
{"user": "a", "pwd": "a", "ip": "a", "port": "a"},
)
],
)
@pytest.mark.skip
async def test_vault_secret(root_token, vault_addr, user_name, kvs, context):
root_vault = HashicorpVaultSecrets(
auth=HashicorpAuth(vault_addr=vault_addr, access_token=root_token), user_name=user_name
)
user_auth = await root_vault.create_user()
user_vault = HashicorpVaultSecrets(
auth=user_auth,
user_name=user_name,
)
await user_vault.upsert_kv(kvs=kvs, app_name="redis-config")
kvs1 = await user_vault.get_kv(app_name="redis-config")
assert kvs1 == kvs
if __name__ == "__main__":
pytest.main([__file__, "-s"])