refactor: timeout 0

This commit is contained in:
莘权 马 2024-03-21 13:21:24 +08:00
parent 7c8f57e46c
commit b42cf5cbd6
14 changed files with 74 additions and 49 deletions

View file

@ -4,7 +4,7 @@ llm:
api_key: "YOUR_API_KEY"
model: "gpt-4-turbo-preview" # or gpt-3.5-turbo-1106 / gpt-4-1106-preview
proxy: "YOUR_PROXY" # for LLM API requests
# timeout: 600 # Optional.
# timeout: 600 # Optional. If set to 0, default value is 300.
pricing_plan: "" # Optional. If invalid, it will be automatically filled in with the value of the `model`.
# Azure-exclusive pricing plan mappings
# - gpt-3.5-turbo 4k: "gpt-3.5-turbo-1106"

View file

@ -17,6 +17,7 @@ from pydantic import BaseModel, Field, create_model, model_validator
from tenacity import retry, stop_after_attempt, wait_random_exponential
from metagpt.actions.action_outcls_registry import register_action_outcls
from metagpt.const import USE_CONFIG_TIMEOUT
from metagpt.llm import BaseLLM
from metagpt.logs import logger
from metagpt.provider.postprocess.llm_output_postprocess import llm_output_postprocess
@ -416,7 +417,7 @@ class ActionNode:
images: Optional[Union[str, list[str]]] = None,
system_msgs: Optional[list[str]] = None,
schema="markdown", # compatible to original format
timeout=0,
timeout=USE_CONFIG_TIMEOUT,
) -> (str, BaseModel):
"""Use ActionOutput to wrap the output of aask"""
content = await self.llm.aask(prompt, system_msgs, images=images, timeout=timeout)
@ -448,7 +449,9 @@ class ActionNode:
def set_context(self, context):
self.set_recursive("context", context)
async def simple_fill(self, schema, mode, images: Optional[Union[str, list[str]]] = None, timeout=0, exclude=None):
async def simple_fill(
self, schema, mode, images: Optional[Union[str, list[str]]] = None, timeout=USE_CONFIG_TIMEOUT, exclude=None
):
prompt = self.compile(context=self.context, schema=schema, mode=mode, exclude=exclude)
if schema != "raw":
@ -473,7 +476,7 @@ class ActionNode:
mode="auto",
strgy="simple",
images: Optional[Union[str, list[str]]] = None,
timeout=0,
timeout=USE_CONFIG_TIMEOUT,
exclude=[],
):
"""Fill the node(s) with mode.

View file

@ -10,6 +10,7 @@ from typing import Optional
from pydantic import field_validator
from metagpt.const import LLM_API_TIMEOUT
from metagpt.utils.yaml_model import YamlModel
@ -88,3 +89,8 @@ class LLMConfig(YamlModel):
if v in ["", None, "YOUR_API_KEY"]:
raise ValueError("Please set your API key in config2.yaml")
return v
@field_validator("timeout")
@classmethod
def check_timeout(cls, v):
return v or LLM_API_TIMEOUT

View file

@ -123,7 +123,6 @@ BASE64_FORMAT = "base64"
# REDIS
REDIS_KEY = "REDIS_KEY"
LLM_API_TIMEOUT = 300
# Message id
IGNORED_MESSAGE_ID = "0"
@ -132,3 +131,7 @@ IGNORED_MESSAGE_ID = "0"
GENERALIZATION = "Generalize"
COMPOSITION = "Composite"
AGGREGATION = "Aggregate"
# Timeout
USE_CONFIG_TIMEOUT = 0 # Using llm.timeout configuration.
LLM_API_TIMEOUT = 300

View file

@ -5,6 +5,7 @@ from anthropic import AsyncAnthropic
from anthropic.types import Message, Usage
from metagpt.configs.llm_config import LLMConfig, LLMType
from metagpt.const import USE_CONFIG_TIMEOUT
from metagpt.logs import log_llm_stream
from metagpt.provider.base_llm import BaseLLM
from metagpt.provider.llm_provider_registry import register_provider
@ -41,15 +42,15 @@ class AnthropicLLM(BaseLLM):
def get_choice_text(self, resp: Message) -> str:
return resp.content[0].text
async def _achat_completion(self, messages: list[dict], timeout: int = 0) -> Message:
async def _achat_completion(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> Message:
resp: Message = await self.aclient.messages.create(**self._const_kwargs(messages))
self._update_costs(resp.usage, self.model)
return resp
async def acompletion(self, messages: list[dict], timeout: int = 0) -> Message:
async def acompletion(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> Message:
return await self._achat_completion(messages, timeout=self.get_timeout(timeout))
async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
async def _achat_completion_stream(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> str:
stream = await self.aclient.messages.create(**self._const_kwargs(messages, stream=True))
collected_content = []
usage = Usage(input_tokens=0, output_tokens=0)

View file

@ -23,7 +23,7 @@ from tenacity import (
)
from metagpt.configs.llm_config import LLMConfig
from metagpt.const import LLM_API_TIMEOUT
from metagpt.const import LLM_API_TIMEOUT, USE_CONFIG_TIMEOUT
from metagpt.logs import logger
from metagpt.schema import Message
from metagpt.utils.common import log_and_reraise
@ -109,7 +109,7 @@ class BaseLLM(ABC):
system_msgs: Optional[list[str]] = None,
format_msgs: Optional[list[dict[str, str]]] = None,
images: Optional[Union[str, list[str]]] = None,
timeout=0,
timeout=USE_CONFIG_TIMEOUT,
stream=True,
) -> str:
if system_msgs:
@ -131,7 +131,7 @@ class BaseLLM(ABC):
def _extract_assistant_rsp(self, context):
return "\n".join([i["content"] for i in context if i["role"] == "assistant"])
async def aask_batch(self, msgs: list, timeout=0) -> str:
async def aask_batch(self, msgs: list, timeout=USE_CONFIG_TIMEOUT) -> str:
"""Sequential questioning"""
context = []
for msg in msgs:
@ -141,15 +141,15 @@ class BaseLLM(ABC):
context.append(self._assistant_msg(rsp_text))
return self._extract_assistant_rsp(context)
async def aask_code(self, messages: Union[str, Message, list[dict]], timeout=0, **kwargs) -> dict:
async def aask_code(self, messages: Union[str, Message, list[dict]], timeout=USE_CONFIG_TIMEOUT, **kwargs) -> dict:
raise NotImplementedError
@abstractmethod
async def _achat_completion(self, messages: list[dict], timeout=0):
async def _achat_completion(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT):
"""_achat_completion implemented by inherited class"""
@abstractmethod
async def acompletion(self, messages: list[dict], timeout=0):
async def acompletion(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT):
"""Asynchronous version of completion
All GPTAPIs are required to provide the standard OpenAI completion interface
[
@ -160,7 +160,7 @@ class BaseLLM(ABC):
"""
@abstractmethod
async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
async def _achat_completion_stream(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> str:
"""_achat_completion_stream implemented by inherited class"""
@retry(
@ -170,7 +170,9 @@ class BaseLLM(ABC):
retry=retry_if_exception_type(ConnectionError),
retry_error_callback=log_and_reraise,
)
async def acompletion_text(self, messages: list[dict], stream: bool = False, timeout: int = 0) -> str:
async def acompletion_text(
self, messages: list[dict], stream: bool = False, timeout: int = USE_CONFIG_TIMEOUT
) -> str:
"""Asynchronous version of completion. Return str. Support stream-print"""
if stream:
return await self._achat_completion_stream(messages, timeout=self.get_timeout(timeout))

View file

@ -25,6 +25,7 @@ from dashscope.common.error import (
UnsupportedApiProtocol,
)
from metagpt.const import USE_CONFIG_TIMEOUT
from metagpt.logs import log_llm_stream
from metagpt.provider.base_llm import BaseLLM, LLMConfig
from metagpt.provider.llm_provider_registry import LLMType, register_provider
@ -202,16 +203,16 @@ class DashScopeLLM(BaseLLM):
self._update_costs(dict(resp.usage))
return resp.output
async def _achat_completion(self, messages: list[dict], timeout: int = 0) -> GenerationOutput:
async def _achat_completion(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> GenerationOutput:
resp: GenerationResponse = await self.aclient.acall(**self._const_kwargs(messages, stream=False))
self._check_response(resp)
self._update_costs(dict(resp.usage))
return resp.output
async def acompletion(self, messages: list[dict], timeout=0) -> GenerationOutput:
async def acompletion(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT) -> GenerationOutput:
return await self._achat_completion(messages, timeout=self.get_timeout(timeout))
async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
async def _achat_completion_stream(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> str:
resp = await self.aclient.acall(**self._const_kwargs(messages, stream=True))
collected_content = []
usage = {}

View file

@ -15,6 +15,7 @@ from google.generativeai.types.generation_types import (
)
from metagpt.configs.llm_config import LLMConfig, LLMType
from metagpt.const import USE_CONFIG_TIMEOUT
from metagpt.logs import log_llm_stream
from metagpt.provider.base_llm import BaseLLM
from metagpt.provider.llm_provider_registry import register_provider
@ -88,16 +89,18 @@ class GeminiLLM(BaseLLM):
self._update_costs(usage)
return resp
async def _achat_completion(self, messages: list[dict], timeout: int = 0) -> "AsyncGenerateContentResponse":
async def _achat_completion(
self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT
) -> "AsyncGenerateContentResponse":
resp: AsyncGenerateContentResponse = await self.llm.generate_content_async(**self._const_kwargs(messages))
usage = await self.aget_usage(messages, resp.text)
self._update_costs(usage)
return resp
async def acompletion(self, messages: list[dict], timeout=0) -> dict:
async def acompletion(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT) -> dict:
return await self._achat_completion(messages, timeout=self.get_timeout(timeout))
async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
async def _achat_completion_stream(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> str:
resp: AsyncGenerateContentResponse = await self.llm.generate_content_async(
**self._const_kwargs(messages, stream=True)
)

View file

@ -6,6 +6,7 @@ Author: garylin2099
from typing import Optional
from metagpt.configs.llm_config import LLMConfig
from metagpt.const import USE_CONFIG_TIMEOUT
from metagpt.logs import logger
from metagpt.provider.base_llm import BaseLLM
@ -18,7 +19,7 @@ class HumanProvider(BaseLLM):
def __init__(self, config: LLMConfig):
pass
def ask(self, msg: str, timeout=0) -> str:
def ask(self, msg: str, timeout=USE_CONFIG_TIMEOUT) -> str:
logger.info("It's your turn, please type in your response. You may also refer to the context below")
rsp = input(msg)
if rsp in ["exit", "quit"]:
@ -31,20 +32,20 @@ class HumanProvider(BaseLLM):
system_msgs: Optional[list[str]] = None,
format_msgs: Optional[list[dict[str, str]]] = None,
generator: bool = False,
timeout=0,
timeout=USE_CONFIG_TIMEOUT,
) -> str:
return self.ask(msg, timeout=self.get_timeout(timeout))
async def _achat_completion(self, messages: list[dict], timeout=0):
async def _achat_completion(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT):
pass
async def acompletion(self, messages: list[dict], timeout=0):
async def acompletion(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT):
"""dummy implementation of abstract method in base"""
return []
async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
async def _achat_completion_stream(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> str:
pass
async def acompletion_text(self, messages: list[dict], stream=False, timeout=0) -> str:
async def acompletion_text(self, messages: list[dict], stream=False, timeout=USE_CONFIG_TIMEOUT) -> str:
"""dummy implementation of abstract method in base"""
return ""

View file

@ -5,6 +5,7 @@
import json
from metagpt.configs.llm_config import LLMConfig, LLMType
from metagpt.const import USE_CONFIG_TIMEOUT
from metagpt.logs import log_llm_stream
from metagpt.provider.base_llm import BaseLLM
from metagpt.provider.general_api_requestor import GeneralAPIRequestor
@ -49,7 +50,7 @@ class OllamaLLM(BaseLLM):
chunk = chunk.decode(encoding)
return json.loads(chunk)
async def _achat_completion(self, messages: list[dict], timeout: int = 0) -> dict:
async def _achat_completion(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> dict:
resp, _, _ = await self.client.arequest(
method=self.http_method,
url=self.suffix_url,
@ -61,10 +62,10 @@ class OllamaLLM(BaseLLM):
self._update_costs(usage)
return resp
async def acompletion(self, messages: list[dict], timeout=0) -> dict:
async def acompletion(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT) -> dict:
return await self._achat_completion(messages, timeout=self.get_timeout(timeout))
async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
async def _achat_completion_stream(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> str:
stream_resp, _, _ = await self.client.arequest(
method=self.http_method,
url=self.suffix_url,

View file

@ -25,6 +25,7 @@ from tenacity import (
)
from metagpt.configs.llm_config import LLMConfig, LLMType
from metagpt.const import USE_CONFIG_TIMEOUT
from metagpt.logs import log_llm_stream, logger
from metagpt.provider.base_llm import BaseLLM
from metagpt.provider.constant import GENERAL_FUNCTION_SCHEMA
@ -79,7 +80,7 @@ class OpenAILLM(BaseLLM):
return params
async def _achat_completion_stream(self, messages: list[dict], timeout=0) -> str:
async def _achat_completion_stream(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT) -> str:
response: AsyncStream[ChatCompletionChunk] = await self.aclient.chat.completions.create(
**self._cons_kwargs(messages, timeout=self.get_timeout(timeout)), stream=True
)
@ -109,7 +110,7 @@ class OpenAILLM(BaseLLM):
self._update_costs(usage)
return full_reply_content
def _cons_kwargs(self, messages: list[dict], timeout=0, **extra_kwargs) -> dict:
def _cons_kwargs(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT, **extra_kwargs) -> dict:
kwargs = {
"messages": messages,
"max_tokens": self._get_max_tokens(messages),
@ -123,13 +124,13 @@ class OpenAILLM(BaseLLM):
kwargs.update(extra_kwargs)
return kwargs
async def _achat_completion(self, messages: list[dict], timeout=0) -> ChatCompletion:
async def _achat_completion(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT) -> ChatCompletion:
kwargs = self._cons_kwargs(messages, timeout=self.get_timeout(timeout))
rsp: ChatCompletion = await self.aclient.chat.completions.create(**kwargs)
self._update_costs(rsp.usage)
return rsp
async def acompletion(self, messages: list[dict], timeout=0) -> ChatCompletion:
async def acompletion(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT) -> ChatCompletion:
return await self._achat_completion(messages, timeout=self.get_timeout(timeout))
@retry(
@ -139,7 +140,7 @@ class OpenAILLM(BaseLLM):
retry=retry_if_exception_type(APIConnectionError),
retry_error_callback=log_and_reraise,
)
async def acompletion_text(self, messages: list[dict], stream=False, timeout=0) -> str:
async def acompletion_text(self, messages: list[dict], stream=False, timeout=USE_CONFIG_TIMEOUT) -> str:
"""when streaming, print each token in place."""
if stream:
return await self._achat_completion_stream(messages, timeout=timeout)
@ -148,7 +149,7 @@ class OpenAILLM(BaseLLM):
return self.get_choice_text(rsp)
async def _achat_completion_function(
self, messages: list[dict], timeout: int = 0, **chat_configs
self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT, **chat_configs
) -> ChatCompletion:
messages = process_message(messages)
kwargs = self._cons_kwargs(messages=messages, timeout=self.get_timeout(timeout), **chat_configs)
@ -156,7 +157,7 @@ class OpenAILLM(BaseLLM):
self._update_costs(rsp.usage)
return rsp
async def aask_code(self, messages: list[dict], timeout: int = 0, **kwargs) -> dict:
async def aask_code(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT, **kwargs) -> dict:
"""Use function of tools to ask a code.
Note: Keep kwargs consistent with https://platform.openai.com/docs/api-reference/chat/create

View file

@ -9,6 +9,7 @@ from qianfan import ChatCompletion
from qianfan.resources.typing import JsonBody
from metagpt.configs.llm_config import LLMConfig, LLMType
from metagpt.const import USE_CONFIG_TIMEOUT
from metagpt.logs import log_llm_stream
from metagpt.provider.base_llm import BaseLLM
from metagpt.provider.llm_provider_registry import register_provider
@ -107,15 +108,15 @@ class QianFanLLM(BaseLLM):
self._update_costs(resp.body.get("usage", {}))
return resp.body
async def _achat_completion(self, messages: list[dict], timeout: int = 0) -> JsonBody:
async def _achat_completion(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> JsonBody:
resp = await self.aclient.ado(**self._const_kwargs(messages=messages, stream=False))
self._update_costs(resp.body.get("usage", {}))
return resp.body
async def acompletion(self, messages: list[dict], timeout: int = 0) -> JsonBody:
async def acompletion(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> JsonBody:
return await self._achat_completion(messages, timeout=self.get_timeout(timeout))
async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
async def _achat_completion_stream(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> str:
resp = await self.aclient.ado(**self._const_kwargs(messages=messages, stream=True))
collected_content = []
usage = {}

View file

@ -17,6 +17,7 @@ from wsgiref.handlers import format_date_time
import websocket # 使用websocket_client
from metagpt.configs.llm_config import LLMConfig, LLMType
from metagpt.const import USE_CONFIG_TIMEOUT
from metagpt.logs import logger
from metagpt.provider.base_llm import BaseLLM
from metagpt.provider.llm_provider_registry import register_provider
@ -31,19 +32,19 @@ class SparkLLM(BaseLLM):
def get_choice_text(self, rsp: dict) -> str:
return rsp["payload"]["choices"]["text"][-1]["content"]
async def _achat_completion_stream(self, messages: list[dict], timeout: int = 0) -> str:
async def _achat_completion_stream(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> str:
pass
async def acompletion_text(self, messages: list[dict], stream=False, timeout: int = 0) -> str:
async def acompletion_text(self, messages: list[dict], stream=False, timeout: int = USE_CONFIG_TIMEOUT) -> str:
# 不支持
# logger.warning("当前方法无法支持异步运行。当你使用acompletion时并不能并行访问。")
w = GetMessageFromWeb(messages, self.config)
return w.run()
async def _achat_completion(self, messages: list[dict], timeout=0):
async def _achat_completion(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT):
pass
async def acompletion(self, messages: list[dict], timeout=0):
async def acompletion(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT):
# 不支持异步
w = GetMessageFromWeb(messages, self.config)
return w.run()

View file

@ -8,6 +8,7 @@ from typing import Optional
from zhipuai.types.chat.chat_completion import Completion
from metagpt.configs.llm_config import LLMConfig, LLMType
from metagpt.const import USE_CONFIG_TIMEOUT
from metagpt.logs import log_llm_stream
from metagpt.provider.base_llm import BaseLLM
from metagpt.provider.llm_provider_registry import register_provider
@ -45,22 +46,22 @@ class ZhiPuAILLM(BaseLLM):
kwargs = {"model": self.model, "messages": messages, "stream": stream, "temperature": 0.3}
return kwargs
def completion(self, messages: list[dict], timeout=0) -> dict:
def completion(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT) -> dict:
resp: Completion = self.llm.chat.completions.create(**self._const_kwargs(messages))
usage = resp.usage.model_dump()
self._update_costs(usage)
return resp.model_dump()
async def _achat_completion(self, messages: list[dict], timeout=0) -> dict:
async def _achat_completion(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT) -> dict:
resp = await self.llm.acreate(**self._const_kwargs(messages))
usage = resp.get("usage", {})
self._update_costs(usage)
return resp
async def acompletion(self, messages: list[dict], timeout=0) -> dict:
async def acompletion(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT) -> dict:
return await self._achat_completion(messages, timeout=self.get_timeout(timeout))
async def _achat_completion_stream(self, messages: list[dict], timeout=0) -> str:
async def _achat_completion_stream(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT) -> str:
response = await self.llm.acreate_stream(**self._const_kwargs(messages, stream=True))
collected_content = []
usage = {}