mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-05-01 03:46:23 +02:00
feat: merge send18:dev
This commit is contained in:
commit
7effe7f74c
92 changed files with 4830 additions and 302 deletions
|
|
@ -18,8 +18,10 @@ import os
|
|||
import platform
|
||||
import re
|
||||
from typing import List, Tuple, Union
|
||||
|
||||
from metagpt.const import MESSAGE_ROUTE_TO_ALL
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple
|
||||
import yaml
|
||||
from metagpt.logs import logger
|
||||
|
||||
|
||||
|
|
@ -184,7 +186,7 @@ class OutputParser:
|
|||
|
||||
if start_index != -1 and end_index != -1:
|
||||
# Extract the structure part
|
||||
structure_text = text[start_index : end_index + 1]
|
||||
structure_text = text[start_index: end_index + 1]
|
||||
|
||||
try:
|
||||
# Attempt to convert the text to a Python data type using ast.literal_eval
|
||||
|
|
|
|||
79
metagpt/utils/cost_manager.py
Normal file
79
metagpt/utils/cost_manager.py
Normal file
|
|
@ -0,0 +1,79 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
@Time : 2023/8/28
|
||||
@Author : mashenquan
|
||||
@File : openai.py
|
||||
@Desc : mashenquan, 2023/8/28. Separate the `CostManager` class to support user-level cost accounting.
|
||||
"""
|
||||
|
||||
from pydantic import BaseModel
|
||||
from metagpt.logs import logger
|
||||
from metagpt.utils.token_counter import TOKEN_COSTS
|
||||
from typing import NamedTuple
|
||||
|
||||
|
||||
class Costs(NamedTuple):
    """Immutable snapshot of accumulated API usage and spending."""

    total_prompt_tokens: int  # tokens consumed by prompts so far
    total_completion_tokens: int  # tokens consumed by completions so far
    total_cost: float  # accumulated dollar cost of API calls
    total_budget: float  # accumulated budget
|
||||
|
||||
|
||||
class CostManager(BaseModel):
    """Calculate the overhead of using the interface.

    Tracks prompt/completion token usage and the accumulated dollar cost of
    API calls against a configurable maximum budget.
    """

    total_prompt_tokens: int = 0  # prompt tokens consumed so far
    total_completion_tokens: int = 0  # completion tokens consumed so far
    total_budget: float = 0  # accumulated budget
    max_budget: float = 10.0  # spending cap, in dollars
    total_cost: float = 0  # accumulated dollar cost

    def update_cost(self, prompt_tokens, completion_tokens, model):
        """
        Update the total cost, prompt tokens, and completion tokens.

        Args:
            prompt_tokens (int): The number of tokens used in the prompt.
            completion_tokens (int): The number of tokens used in the completion.
            model (str): The model used for the API call.
        """
        # Guard first: an unknown model must not leave the manager
        # half-updated (counters bumped, then KeyError on the cost lookup).
        if model not in TOKEN_COSTS:
            logger.warning(f"Model {model} not found in TOKEN_COSTS; cost not updated.")
            return
        self.total_prompt_tokens += prompt_tokens
        self.total_completion_tokens += completion_tokens
        # Per the TOKEN_COSTS table, prices are quoted per 1000 tokens.
        cost = (
            prompt_tokens * TOKEN_COSTS[model]["prompt"]
            + completion_tokens * TOKEN_COSTS[model]["completion"]
        ) / 1000
        self.total_cost += cost
        logger.info(
            f"Total running cost: ${self.total_cost:.3f} | Max budget: ${self.max_budget:.3f} | "
            f"Current cost: ${cost:.3f}, prompt_tokens: {prompt_tokens}, completion_tokens: {completion_tokens}"
        )

    def get_total_prompt_tokens(self):
        """
        Get the total number of prompt tokens.

        Returns:
            int: The total number of prompt tokens.
        """
        return self.total_prompt_tokens

    def get_total_completion_tokens(self):
        """
        Get the total number of completion tokens.

        Returns:
            int: The total number of completion tokens.
        """
        return self.total_completion_tokens

    def get_total_cost(self):
        """
        Get the total cost of API calls.

        Returns:
            float: The total cost of API calls.
        """
        return self.total_cost

    def get_costs(self) -> Costs:
        """Get all costs as an immutable Costs snapshot."""
        return Costs(self.total_prompt_tokens, self.total_completion_tokens, self.total_cost, self.total_budget)
|
||||
|
|
@ -4,15 +4,25 @@
|
|||
@Time : 2023/7/4 10:53
|
||||
@Author : alexanderwu alitrack
|
||||
@File : mermaid.py
|
||||
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
|
||||
"""
|
||||
import asyncio
|
||||
<<<<<<< HEAD
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from metagpt.config import CONFIG
|
||||
from metagpt.const import METAGPT_ROOT
|
||||
=======
|
||||
from pathlib import Path
|
||||
|
||||
# from metagpt.utils.common import check_cmd_exists
|
||||
import aiofiles
|
||||
|
||||
from metagpt.config import CONFIG, Config
|
||||
from metagpt.const import PROJECT_ROOT
|
||||
>>>>>>> send18/dev
|
||||
from metagpt.logs import logger
|
||||
from metagpt.utils.common import check_cmd_exists
|
||||
|
||||
|
||||
async def mermaid_to_file(mermaid_code, output_file_without_suffix, width=2048, height=2048) -> int:
|
||||
|
|
@ -29,8 +39,11 @@ async def mermaid_to_file(mermaid_code, output_file_without_suffix, width=2048,
|
|||
if dir_name and not os.path.exists(dir_name):
|
||||
os.makedirs(dir_name)
|
||||
tmp = Path(f"{output_file_without_suffix}.mmd")
|
||||
tmp.write_text(mermaid_code, encoding="utf-8")
|
||||
async with aiofiles.open(tmp, "w", encoding="utf-8") as f:
|
||||
await f.write(mermaid_code)
|
||||
# tmp.write_text(mermaid_code, encoding="utf-8")
|
||||
|
||||
<<<<<<< HEAD
|
||||
engine = CONFIG.mermaid_engine.lower()
|
||||
if engine == "nodejs":
|
||||
if check_cmd_exists(CONFIG.mmdc) != 0:
|
||||
|
|
@ -87,60 +100,93 @@ async def mermaid_to_file(mermaid_code, output_file_without_suffix, width=2048,
|
|||
logger.warning(f"Unsupported mermaid engine: {engine}")
|
||||
return 0
|
||||
|
||||
=======
|
||||
# if check_cmd_exists("mmdc") != 0:
|
||||
# logger.warning("RUN `npm install -g @mermaid-js/mermaid-cli` to install mmdc")
|
||||
# return -1
|
||||
|
||||
MMC1 = """classDiagram
|
||||
class Main {
|
||||
-SearchEngine search_engine
|
||||
+main() str
|
||||
}
|
||||
class SearchEngine {
|
||||
-Index index
|
||||
-Ranking ranking
|
||||
-Summary summary
|
||||
+search(query: str) str
|
||||
}
|
||||
class Index {
|
||||
-KnowledgeBase knowledge_base
|
||||
+create_index(data: dict)
|
||||
+query_index(query: str) list
|
||||
}
|
||||
class Ranking {
|
||||
+rank_results(results: list) list
|
||||
}
|
||||
class Summary {
|
||||
+summarize_results(results: list) str
|
||||
}
|
||||
class KnowledgeBase {
|
||||
+update(data: dict)
|
||||
+fetch_data(query: str) dict
|
||||
}
|
||||
Main --> SearchEngine
|
||||
SearchEngine --> Index
|
||||
SearchEngine --> Ranking
|
||||
SearchEngine --> Summary
|
||||
Index --> KnowledgeBase"""
|
||||
# for suffix in ["pdf", "svg", "png"]:
|
||||
for suffix in ["png"]:
|
||||
output_file = f"{output_file_without_suffix}.{suffix}"
|
||||
# Call the `mmdc` command to convert the Mermaid code to a PNG
|
||||
logger.info(f"Generating {output_file}..")
|
||||
cmds = [CONFIG.mmdc, "-i", str(tmp), "-o", output_file, "-w", str(width), "-H", str(height)]
|
||||
|
||||
MMC2 = """sequenceDiagram
|
||||
participant M as Main
|
||||
participant SE as SearchEngine
|
||||
participant I as Index
|
||||
participant R as Ranking
|
||||
participant S as Summary
|
||||
participant KB as KnowledgeBase
|
||||
M->>SE: search(query)
|
||||
SE->>I: query_index(query)
|
||||
I->>KB: fetch_data(query)
|
||||
KB-->>I: return data
|
||||
I-->>SE: return results
|
||||
SE->>R: rank_results(results)
|
||||
R-->>SE: return ranked_results
|
||||
SE->>S: summarize_results(ranked_results)
|
||||
S-->>SE: return summary
|
||||
SE-->>M: return summary"""
|
||||
if CONFIG.puppeteer_config:
|
||||
cmds.extend(["-p", CONFIG.puppeteer_config])
|
||||
process = await asyncio.create_subprocess_exec(*cmds)
|
||||
await process.wait()
|
||||
return process.returncode
|
||||
>>>>>>> send18/dev
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
MMC1 = """classDiagram
|
||||
class Main {
|
||||
-SearchEngine search_engine
|
||||
+main() str
|
||||
}
|
||||
class SearchEngine {
|
||||
-Index index
|
||||
-Ranking ranking
|
||||
-Summary summary
|
||||
+search(query: str) str
|
||||
}
|
||||
class Index {
|
||||
-KnowledgeBase knowledge_base
|
||||
+create_index(data: dict)
|
||||
+query_index(query: str) list
|
||||
}
|
||||
class Ranking {
|
||||
+rank_results(results: list) list
|
||||
}
|
||||
class Summary {
|
||||
+summarize_results(results: list) str
|
||||
}
|
||||
class KnowledgeBase {
|
||||
+update(data: dict)
|
||||
+fetch_data(query: str) dict
|
||||
}
|
||||
Main --> SearchEngine
|
||||
SearchEngine --> Index
|
||||
SearchEngine --> Ranking
|
||||
SearchEngine --> Summary
|
||||
Index --> KnowledgeBase"""
|
||||
|
||||
MMC2 = """sequenceDiagram
|
||||
participant M as Main
|
||||
participant SE as SearchEngine
|
||||
participant I as Index
|
||||
participant R as Ranking
|
||||
participant S as Summary
|
||||
participant KB as KnowledgeBase
|
||||
M->>SE: search(query)
|
||||
SE->>I: query_index(query)
|
||||
I->>KB: fetch_data(query)
|
||||
KB-->>I: return data
|
||||
I-->>SE: return results
|
||||
SE->>R: rank_results(results)
|
||||
R-->>SE: return ranked_results
|
||||
SE->>S: summarize_results(ranked_results)
|
||||
S-->>SE: return summary
|
||||
SE-->>M: return summary"""
|
||||
|
||||
<<<<<<< HEAD
|
||||
if __name__ == "__main__":
|
||||
loop = asyncio.new_event_loop()
|
||||
result = loop.run_until_complete(mermaid_to_file(MMC1, METAGPT_ROOT / f"{CONFIG.mermaid_engine}/1"))
|
||||
result = loop.run_until_complete(mermaid_to_file(MMC2, METAGPT_ROOT / f"{CONFIG.mermaid_engine}/1"))
|
||||
loop.close()
|
||||
=======
|
||||
conf = Config()
|
||||
asyncio.run(
|
||||
mermaid_to_file(
|
||||
options=conf.runtime_options, mermaid_code=MMC1, output_file_without_suffix=PROJECT_ROOT / "tmp/1.png"
|
||||
)
|
||||
)
|
||||
asyncio.run(
|
||||
mermaid_to_file(
|
||||
options=conf.runtime_options, mermaid_code=MMC2, output_file_without_suffix=PROJECT_ROOT / "tmp/2.png"
|
||||
)
|
||||
)
|
||||
>>>>>>> send18/dev
|
||||
|
|
|
|||
219
metagpt/utils/redis.py
Normal file
219
metagpt/utils/redis.py
Normal file
|
|
@ -0,0 +1,219 @@
|
|||
# !/usr/bin/python3
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Author: Hui
|
||||
# @Desc: { redis client }
|
||||
# @Date: 2022/11/28 10:12
|
||||
import json
|
||||
import traceback
|
||||
from datetime import timedelta
|
||||
from enum import Enum
|
||||
from typing import Awaitable, Callable, Dict, Optional, Union
|
||||
|
||||
from redis import asyncio as aioredis
|
||||
|
||||
from metagpt.config import CONFIG
|
||||
from metagpt.logs import logger
|
||||
|
||||
|
||||
class RedisTypeEnum(Enum):
    """Redis data-structure kinds (informational labels)."""

    String = "String"
    List = "List"
    Hash = "Hash"
    Set = "Set"
    ZSet = "ZSet"
|
||||
|
||||
|
||||
def make_url(
    dialect: str,
    *,
    user: Optional[str] = None,
    password: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[Union[str, int]] = None,
    name: Optional[Union[str, int]] = None,
) -> str:
    """Assemble a connection URL: ``dialect://[user[:password]@][host][:port][/name]``."""
    pieces = [f"{dialect}://"]
    if user or password:
        credentials = user or ""
        if password:
            credentials += f":{password}"
        pieces.append(credentials + "@")

    # Every non-sqlite dialect falls back to loopback when no host is given.
    if not host and not dialect.startswith("sqlite"):
        host = "127.0.0.1"

    if host:
        pieces.append(f"{host}")
    if port:
        pieces.append(f":{port}")

    # `name` may legitimately be 0 (e.g. redis db 0), so compare against None.
    if name is not None:
        pieces.append(f"/{name}")
    return "".join(pieces)
|
||||
|
||||
|
||||
class RedisAsyncClient(aioredis.Redis):
    """Asynchronous Redis client.

    Example::

        rdb = RedisAsyncClient()
        print(rdb.url)

    Args:
        host: server address
        port: server port
        user: user name
        db: database index
        password: password
        decode_responses: values stored in Redis come back as utf-8 encoded
            bytes; enable this so responses are decoded back to str automatically
        health_check_interval: ping the connection periodically to avoid
            ConnectionErrors (104, Connection reset by peer)
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: str = None,
        decode_responses=True,
        health_check_interval=10,
        socket_connect_timeout=5,
        retry_on_timeout=True,
        socket_keepalive=True,
        **kwargs,
    ):
        # Forward everything to aioredis.Redis; extra kwargs pass through untouched.
        super().__init__(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=decode_responses,
            health_check_interval=health_check_interval,
            socket_connect_timeout=socket_connect_timeout,
            retry_on_timeout=retry_on_timeout,
            socket_keepalive=socket_keepalive,
            **kwargs,
        )
        # Human-readable connection URL, kept for logging/diagnostics.
        self.url = make_url("redis", host=host, port=port, name=db, password=password)
|
||||
|
||||
|
||||
class RedisCacheInfo(object):
    """Unified descriptor for one cache entry."""

    def __init__(self, key, timeout: Union[int, timedelta] = timedelta(seconds=60), data_type=RedisTypeEnum.String):
        """
        Initialise the cache descriptor.

        Args:
            key: the cache key
            timeout: expiry time, in seconds
            data_type: the Redis data structure used (informational only;
                omitting it has no effect on behaviour)
        """
        self.key = key
        self.timeout = timeout
        self.data_type = data_type

    def __str__(self):
        return "cache key {} timeout {}s".format(self.key, self.timeout)
|
||||
|
||||
|
||||
class RedisManager:
    # Shared async client; populated lazily by init_redis_conn().
    client: RedisAsyncClient = None

    @classmethod
    def init_redis_conn(cls, host, port, password, db):
        """Initialise the redis connection (no-op if already connected)."""
        if cls.client is None:
            cls.client = RedisAsyncClient(host=host, port=port, password=password, db=db)

    @classmethod
    async def set_with_cache_info(cls, redis_cache_info: RedisCacheInfo, value):
        """
        Set a Redis entry according to a RedisCacheInfo descriptor.

        :param redis_cache_info: RedisCacheInfo cache descriptor
        :param value: the value to cache
        :return:
        """
        await cls.client.setex(redis_cache_info.key, redis_cache_info.timeout, value)

    @classmethod
    async def get_with_cache_info(cls, redis_cache_info: RedisCacheInfo):
        """
        Read a Redis entry according to a RedisCacheInfo descriptor.

        :param redis_cache_info: RedisCacheInfo cache descriptor
        :return: the cached value, or None if absent
        """
        return await cls.client.get(redis_cache_info.key)

    @classmethod
    async def del_with_cache_info(cls, redis_cache_info: RedisCacheInfo):
        """
        Delete a Redis entry according to a RedisCacheInfo descriptor.

        :param redis_cache_info: RedisCacheInfo cache descriptor
        :return:
        """
        await cls.client.delete(redis_cache_info.key)

    @staticmethod
    async def get_or_set_cache(cache_info: RedisCacheInfo, fetch_data_func: Callable[[], Awaitable[dict]]) -> dict:
        """
        Return cached data; on a miss, fetch it via ``fetch_data_func`` and
        populate the cache. Only json-serialised string data is supported
        in the current version.
        """
        cached = await RedisManager.get_with_cache_info(cache_info)
        if cached:
            return json.loads(cached)

        data = await fetch_data_func()
        try:
            # Best-effort write-back: a non-serialisable payload is logged,
            # not raised, so the caller still gets the fresh data.
            await RedisManager.set_with_cache_info(cache_info, json.dumps(data))
        except Exception as e:
            logger.warning(f"数据 {data} 通过 json 进行序列化缓存失败:{e}")
        return data

    @classmethod
    def is_valid(cls):
        # Valid once the shared client has been created.
        return cls.client is not None
|
||||
|
||||
|
||||
class Redis:
    """Thin facade over RedisManager, configured from the global CONFIG."""

    def __init__(self, conf: Dict = None):
        try:
            # Connection parameters come from CONFIG; a failure is logged,
            # leaving the instance in a harmless invalid state.
            RedisManager.init_redis_conn(
                host=CONFIG.REDIS_HOST,
                port=int(CONFIG.REDIS_PORT),
                password=CONFIG.REDIS_PASSWORD,
                db=CONFIG.REDIS_DB,
            )
        except Exception as e:
            logger.warning(f"Redis initialization has failed:{e}")

    def is_valid(self):
        # True once the shared RedisManager client exists.
        return RedisManager.is_valid()

    async def get(self, key: str) -> str:
        """Read `key`; returns None when unconfigured, key is empty, or on error."""
        if not self.is_valid() or not key:
            return None
        try:
            return await RedisManager.get_with_cache_info(redis_cache_info=RedisCacheInfo(key=key))
        except Exception as e:
            logger.exception(f"{e}, stack:{traceback.format_exc()}")
            return None

    async def set(self, key: str, data: str, timeout_sec: int):
        """Write `data` under `key` with a TTL; errors are logged, not raised."""
        if not self.is_valid() or not key:
            return
        try:
            await RedisManager.set_with_cache_info(
                redis_cache_info=RedisCacheInfo(key=key, timeout=timeout_sec), value=data
            )
        except Exception as e:
            logger.exception(f"{e}, stack:{traceback.format_exc()}")
|
||||
154
metagpt/utils/s3.py
Normal file
154
metagpt/utils/s3.py
Normal file
|
|
@ -0,0 +1,154 @@
|
|||
import base64
|
||||
import os.path
|
||||
import traceback
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import aioboto3
|
||||
import aiofiles
|
||||
|
||||
from metagpt.config import CONFIG
|
||||
from metagpt.const import BASE64_FORMAT
|
||||
from metagpt.logs import logger
|
||||
|
||||
|
||||
class S3:
    """A class for interacting with Amazon S3 storage."""

    def __init__(self):
        self.session = aioboto3.Session()
        # Credentials and endpoint come from the global CONFIG object.
        self.auth_config = {
            "service_name": "s3",
            "aws_access_key_id": CONFIG.S3_ACCESS_KEY,
            "aws_secret_access_key": CONFIG.S3_SECRET_KEY,
            "endpoint_url": CONFIG.S3_ENDPOINT_URL,
        }

    async def upload_file(
        self,
        bucket: str,
        local_path: str,
        object_name: str,
    ) -> None:
        """Upload a file from the local path to the specified path of the storage bucket specified in s3.

        Args:
            bucket: The name of the S3 storage bucket.
            local_path: The local file path, including the file name.
            object_name: The complete path of the uploaded file to be stored in S3, including the file name.

        Raises:
            Exception: If an error occurs during the upload process, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                async with aiofiles.open(local_path, mode="rb") as reader:
                    body = await reader.read()
                    await client.put_object(Body=body, Bucket=bucket, Key=object_name)
                    logger.info(f"Successfully uploaded the file to path {object_name} in bucket {bucket} of s3.")
        except Exception as e:
            logger.error(f"Failed to upload the file to path {object_name} in bucket {bucket} of s3: {e}")
            raise e

    async def get_object_url(
        self,
        bucket: str,
        object_name: str,
    ) -> str:
        """Get the URL for a downloadable or preview file stored in the specified S3 bucket.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The complete path of the file stored in S3, including the file name.

        Returns:
            The URL for the downloadable or preview file.

        Raises:
            Exception: If an error occurs while retrieving the URL, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                file = await client.get_object(Bucket=bucket, Key=object_name)
                return str(file["Body"].url)
        except Exception as e:
            logger.error(f"Failed to get the url for a downloadable or preview file: {e}")
            raise e

    async def get_object(
        self,
        bucket: str,
        object_name: str,
    ) -> bytes:
        """Get the binary data of a file stored in the specified S3 bucket.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The complete path of the file stored in S3, including the file name.

        Returns:
            The binary data of the requested file.

        Raises:
            Exception: If an error occurs while retrieving the file data, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                s3_object = await client.get_object(Bucket=bucket, Key=object_name)
                return await s3_object["Body"].read()
        except Exception as e:
            logger.error(f"Failed to get the binary data of the file: {e}")
            raise e

    async def download_file(
        self, bucket: str, object_name: str, local_path: str, chunk_size: Optional[int] = 128 * 1024
    ) -> None:
        """Download an S3 object to a local file.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The complete path of the file stored in S3, including the file name.
            local_path: The local file path where the S3 object will be downloaded.
            chunk_size: The size of data chunks to read and write at a time. Default is 128 KB.

        Raises:
            Exception: If an error occurs during the download process, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                s3_object = await client.get_object(Bucket=bucket, Key=object_name)
                stream = s3_object["Body"]
                async with aiofiles.open(local_path, mode="wb") as writer:
                    # Stream in chunks to keep memory bounded for large objects.
                    while True:
                        file_data = await stream.read(chunk_size)
                        if not file_data:
                            break
                        await writer.write(file_data)
        except Exception as e:
            logger.error(f"Failed to download the file from S3: {e}")
            raise e

    async def cache(self, data: str, file_ext: str, format: str = "") -> str:
        """Save data to remote S3 and return url.

        Args:
            data: The payload; base64-encoded text when ``format == BASE64_FORMAT``,
                plain text otherwise.
            file_ext: File extension (including the dot) used to build the object name.
            format: Optional marker; ``BASE64_FORMAT`` means ``data`` is base64-encoded.

        Returns:
            The uploaded object's URL, or None if caching failed.
        """
        object_name = uuid.uuid4().hex + file_ext
        path = Path(__file__).parent
        pathname = path / object_name
        try:
            async with aiofiles.open(str(pathname), mode="wb") as file:
                if format == BASE64_FORMAT:
                    data = base64.b64decode(data)
                elif isinstance(data, str):
                    # Bug fix: the file is opened in binary mode, so a plain
                    # str payload must be encoded or `write` raises TypeError.
                    data = data.encode("utf-8")
                await file.write(data)

            bucket = CONFIG.S3_BUCKET
            object_pathname = CONFIG.S3_BUCKET or "system"
            object_pathname += f"/{object_name}"
            object_pathname = os.path.normpath(object_pathname)
            await self.upload_file(bucket=bucket, local_path=str(pathname), object_name=object_pathname)
            # The local temp file is only a staging area; remove it after upload.
            pathname.unlink(missing_ok=True)

            return await self.get_object_url(bucket=bucket, object_name=object_pathname)
        except Exception as e:
            logger.exception(f"{e}, stack:{traceback.format_exc()}")
            pathname.unlink(missing_ok=True)
            return None
|
||||
Loading…
Add table
Add a link
Reference in a new issue