Merge pull request #530 from seehi/feature-openai-v1

Support new openai package

Commit ef69b84a1c by geekan, 2023-12-21 13:54:04 +08:00, committed by GitHub (GPG key ID: 4AEE18F83AFDEB23)
17 changed files with 935 additions and 107 deletions


@@ -5,10 +5,10 @@
# WORKSPACE_PATH: "Path for placing output files"
#### if OpenAI
## The official OPENAI_API_BASE is https://api.openai.com/v1
## If the official OPENAI_API_BASE is not available, we recommend using the [openai-forward](https://github.com/beidongjiedeguang/openai-forward).
## Or, you can configure OPENAI_PROXY to access official OPENAI_API_BASE.
OPENAI_API_BASE: "https://api.openai.com/v1"
## The official OPENAI_BASE_URL is https://api.openai.com/v1
## If the official OPENAI_BASE_URL is not available, we recommend using the [openai-forward](https://github.com/beidongjiedeguang/openai-forward).
## Or, you can configure OPENAI_PROXY to access the official OPENAI_BASE_URL.
OPENAI_BASE_URL: "https://api.openai.com/v1"
#OPENAI_PROXY: "http://127.0.0.1:8118"
#OPENAI_API_KEY: "YOUR_API_KEY" # set the value to sk-xxx if you host the openai interface for open llm model
OPENAI_API_MODEL: "gpt-4-1106-preview"
@@ -26,13 +26,11 @@ RPM: 10
#ANTHROPIC_API_KEY: "YOUR_API_KEY"
#### if AZURE, check https://github.com/openai/openai-cookbook/blob/main/examples/azure/chat.ipynb
#### You can use ENGINE or DEPLOYMENT mode
#OPENAI_API_TYPE: "azure"
#OPENAI_API_BASE: "YOUR_AZURE_ENDPOINT"
#OPENAI_BASE_URL: "YOUR_AZURE_ENDPOINT"
#OPENAI_API_KEY: "YOUR_AZURE_API_KEY"
#OPENAI_API_VERSION: "YOUR_AZURE_API_VERSION"
#DEPLOYMENT_NAME: "YOUR_DEPLOYMENT_NAME"
#DEPLOYMENT_ID: "YOUR_DEPLOYMENT_ID"
#### if zhipuai from `https://open.bigmodel.cn`. You can set here or export API_KEY="YOUR_API_KEY"
# ZHIPUAI_API_KEY: "YOUR_API_KEY"
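This hunk renames OPENAI_API_BASE to OPENAI_BASE_URL to match the v1 openai package. For configs still written against the old name, a small compatibility shim can resolve whichever is set; the helper name and fallback order here are ours, not part of the PR:

```python
import os

DEFAULT_BASE_URL = "https://api.openai.com/v1"

def resolve_base_url(env: dict = None) -> str:
    """Prefer the new OPENAI_BASE_URL; fall back to the legacy OPENAI_API_BASE."""
    env = os.environ if env is None else env
    return env.get("OPENAI_BASE_URL") or env.get("OPENAI_API_BASE") or DEFAULT_BASE_URL
```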


@@ -83,10 +83,10 @@
1. PRD stuck / unable to access/ connection interrupted
1. The official OPENAI_API_BASE address is `https://api.openai.com/v1`
1. If the official OPENAI_API_BASE address is inaccessible in your environment (this can be verified with curl), it's recommended to configure using the reverse proxy OPENAI_API_BASE provided by libraries such as openai-forward. For instance, `OPENAI_API_BASE: "https://api.openai-forward.com/v1"`
1. If the official OPENAI_API_BASE address is inaccessible in your environment (again, verifiable via curl), another option is to configure the OPENAI_PROXY parameter. This way, you can access the official OPENAI_API_BASE via a local proxy. If you don't need to access via a proxy, please do not enable this configuration; if accessing through a proxy is required, modify it to the correct proxy address. Note that when OPENAI_PROXY is enabled, don't set OPENAI_API_BASE.
1. Note: OpenAI's default API design ends with a v1. An example of the correct configuration is: `OPENAI_API_BASE: "https://api.openai.com/v1"`
1. The official OPENAI_BASE_URL address is `https://api.openai.com/v1`
1. If the official OPENAI_BASE_URL address is inaccessible in your environment (this can be verified with curl), it's recommended to configure using the reverse proxy OPENAI_BASE_URL provided by libraries such as openai-forward. For instance, `OPENAI_BASE_URL: "https://api.openai-forward.com/v1"`
1. If the official OPENAI_BASE_URL address is inaccessible in your environment (again, verifiable via curl), another option is to configure the OPENAI_PROXY parameter. This way, you can access the official OPENAI_BASE_URL via a local proxy. If you don't need to access via a proxy, please do not enable this configuration; if accessing through a proxy is required, modify it to the correct proxy address. Note that when OPENAI_PROXY is enabled, don't set OPENAI_BASE_URL.
1. Note: OpenAI's default API design ends with a v1. An example of the correct configuration is: `OPENAI_BASE_URL: "https://api.openai.com/v1"`
1. Absolutely! How can I assist you today?
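The FAQ's note that OpenAI-style base URLs end with a `v1` path can be checked mechanically; a minimal sketch (the helper name is ours, not from the PR):

```python
from urllib.parse import urlsplit

def looks_like_openai_base_url(url: str) -> bool:
    """True if the URL is http(s) and its path ends with /v1, per the FAQ note above."""
    parts = urlsplit(url)
    return parts.scheme in ("http", "https") and parts.path.rstrip("/").endswith("/v1")
```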


@@ -219,7 +219,7 @@ # Copy the configuration file and make the necessary modifications.
| Variable Name | config/key.yaml | env |
| ------------------------------------------ | ----------------------------------------- | ----------------------------------------------- |
| OPENAI_API_KEY # Replace with your own key | OPENAI_API_KEY: "sk-..." | export OPENAI_API_KEY="sk-..." |
| OPENAI_API_BASE # Optional | OPENAI_API_BASE: "https://<YOUR_SITE>/v1" | export OPENAI_API_BASE="https://<YOUR_SITE>/v1" |
| OPENAI_BASE_URL # Optional | OPENAI_BASE_URL: "https://<YOUR_SITE>/v1" | export OPENAI_BASE_URL="https://<YOUR_SITE>/v1" |
## Tutorial: Initiating a startup


@@ -13,7 +13,7 @@ # Copy the configuration file and make the necessary modifications.
| Variable Name | config/key.yaml | env |
| ------------------------------------------ | ----------------------------------------- | ----------------------------------------------- |
| OPENAI_API_KEY # Replace with your own key | OPENAI_API_KEY: "sk-..." | export OPENAI_API_KEY="sk-..." |
| OPENAI_API_BASE # Optional | OPENAI_API_BASE: "https://<YOUR_SITE>/v1" | export OPENAI_API_BASE="https://<YOUR_SITE>/v1" |
| OPENAI_BASE_URL # Optional | OPENAI_BASE_URL: "https://<YOUR_SITE>/v1" | export OPENAI_BASE_URL="https://<YOUR_SITE>/v1" |
### Initiating a startup


@@ -13,7 +13,7 @@ # Copy the configuration file and make the necessary modifications.
| Variable Name | config/key.yaml | env |
| ------------------------------------------ | ----------------------------------------- | ----------------------------------------------- |
| OPENAI_API_KEY # Replace with your own key | OPENAI_API_KEY: "sk-..." | export OPENAI_API_KEY="sk-..." |
| OPENAI_API_BASE # Optional | OPENAI_API_BASE: "https://<YOUR_SITE>/v1" | export OPENAI_API_BASE="https://<YOUR_SITE>/v1" |
| OPENAI_BASE_URL # Optional | OPENAI_BASE_URL: "https://<YOUR_SITE>/v1" | export OPENAI_BASE_URL="https://<YOUR_SITE>/v1" |
### Example: Starting a startup


@@ -98,15 +98,14 @@ class Config(metaclass=Singleton):
self.fireworks_api_key = self._get("FIREWORKS_API_KEY")
_ = self.get_default_llm_provider_enum()
self.openai_api_base = self._get("OPENAI_API_BASE")
self.openai_base_url = self._get("OPENAI_BASE_URL")
self.openai_proxy = self._get("OPENAI_PROXY") or self.global_proxy
self.openai_api_type = self._get("OPENAI_API_TYPE")
self.openai_api_version = self._get("OPENAI_API_VERSION")
self.openai_api_rpm = self._get("RPM", 3)
self.openai_api_model = self._get("OPENAI_API_MODEL", "gpt-4-1106-preview")
self.max_tokens_rsp = self._get("MAX_TOKENS", 2048)
self.deployment_name = self._get("DEPLOYMENT_NAME")
self.deployment_id = self._get("DEPLOYMENT_ID")
self.deployment_name = self._get("DEPLOYMENT_NAME", "gpt-4")
self.spark_appid = self._get("SPARK_APPID")
self.spark_api_secret = self._get("SPARK_API_SECRET")
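The lookups above all go through `Config._get(key, default)`; a toy stand-in of that pattern, assuming explicit-value-then-environment-then-default precedence (MetaGPT's actual resolution order may differ):

```python
import os

class MiniConfig:
    """Toy version of the Config lookups shown above."""

    def __init__(self, values: dict = None):
        self._values = dict(values or {})  # e.g. parsed from config/key.yaml

    def _get(self, key: str, default=None):
        # Explicit config value first, then the environment, then the default
        if key in self._values:
            return self._values[key]
        return os.environ.get(key, default)
```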


@@ -0,0 +1,718 @@
import asyncio
import json
import os
import platform
import re
import sys
import threading
import time
from contextlib import asynccontextmanager
from enum import Enum
from typing import (
AsyncGenerator,
AsyncIterator,
Callable,
Dict,
Iterator,
Optional,
Tuple,
Union,
overload,
)
from urllib.parse import urlencode, urlsplit, urlunsplit
import aiohttp
import requests
if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal
import logging
import openai
from openai import version
logger = logging.getLogger("openai")
TIMEOUT_SECS = 600
MAX_SESSION_LIFETIME_SECS = 180
MAX_CONNECTION_RETRIES = 2
# Has one attribute per thread, 'session'.
_thread_context = threading.local()
OPENAI_LOG = os.environ.get("OPENAI_LOG", "debug")  # default to "debug" when the env var is unset
class ApiType(Enum):
AZURE = 1
OPEN_AI = 2
AZURE_AD = 3
@staticmethod
def from_str(label):
if label.lower() == "azure":
return ApiType.AZURE
elif label.lower() in ("azure_ad", "azuread"):
return ApiType.AZURE_AD
elif label.lower() in ("open_ai", "openai"):
return ApiType.OPEN_AI
else:
raise openai.OpenAIError(
"The API type provided is invalid. Please select one of the supported API types: 'azure', 'azure_ad', 'open_ai'"
)
api_key_to_header = (
lambda api, key: {"Authorization": f"Bearer {key}"}
if api in (ApiType.OPEN_AI, ApiType.AZURE_AD)
else {"api-key": f"{key}"}
)
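The header dispatch above picks Bearer auth for OpenAI and Azure AD, and a plain `api-key` header for Azure. A self-contained restatement of that rule:

```python
from enum import Enum

class Api(Enum):
    AZURE = 1
    OPEN_AI = 2
    AZURE_AD = 3

def auth_header(api: Api, key: str) -> dict:
    # Bearer token for OpenAI and Azure AD; "api-key" header for Azure
    if api in (Api.OPEN_AI, Api.AZURE_AD):
        return {"Authorization": f"Bearer {key}"}
    return {"api-key": key}
```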
def _console_log_level():
if OPENAI_LOG in ["debug", "info"]:
return OPENAI_LOG
else:
return None
def log_debug(message, **params):
msg = logfmt(dict(message=message, **params))
if _console_log_level() == "debug":
print(msg, file=sys.stderr)
logger.debug(msg)
def log_info(message, **params):
msg = logfmt(dict(message=message, **params))
if _console_log_level() in ["debug", "info"]:
print(msg, file=sys.stderr)
logger.info(msg)
def log_warn(message, **params):
msg = logfmt(dict(message=message, **params))
print(msg, file=sys.stderr)
logger.warning(msg)
def logfmt(props):
def fmt(key, val):
# Handle case where val is a bytes or bytesarray
if hasattr(val, "decode"):
val = val.decode("utf-8")
# Check if val is already a string to avoid re-encoding into ascii.
if not isinstance(val, str):
val = str(val)
if re.search(r"\s", val):
val = repr(val)
# key should already be a string
if re.search(r"\s", key):
key = repr(key)
return "{key}={val}".format(key=key, val=val)
return " ".join([fmt(key, val) for key, val in sorted(props.items())])
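`logfmt` sorts keys and repr-quotes any value containing whitespace; a condensed copy of that behavior with a worked example:

```python
import re

def logfmt_demo(props: dict) -> str:
    # Same behavior as logfmt above: stringify, quote on whitespace, sort keys
    def fmt(key, val):
        if hasattr(val, "decode"):
            val = val.decode("utf-8")
        if not isinstance(val, str):
            val = str(val)
        if re.search(r"\s", val):
            val = repr(val)
        if re.search(r"\s", key):
            key = repr(key)
        return f"{key}={val}"
    return " ".join(fmt(k, v) for k, v in sorted(props.items()))

print(logfmt_demo({"message": "hello world", "code": 42}))  # code=42 message='hello world'
```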
class OpenAIResponse:
def __init__(self, data, headers):
self._headers = headers
self.data = data
@property
def request_id(self) -> Optional[str]:
return self._headers.get("request-id")
@property
def retry_after(self) -> Optional[int]:
try:
return int(self._headers.get("retry-after"))
except TypeError:
return None
@property
def operation_location(self) -> Optional[str]:
return self._headers.get("operation-location")
@property
def organization(self) -> Optional[str]:
return self._headers.get("OpenAI-Organization")
@property
def response_ms(self) -> Optional[int]:
h = self._headers.get("Openai-Processing-Ms")
return None if h is None else round(float(h))
def _build_api_url(url, query):
scheme, netloc, path, base_query, fragment = urlsplit(url)
if base_query:
query = "%s&%s" % (base_query, query)
return urlunsplit((scheme, netloc, path, query, fragment))
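`_build_api_url` appends the encoded params to whatever query string the URL already carries:

```python
from urllib.parse import urlsplit, urlunsplit

def build_api_url(url: str, query: str) -> str:
    # Same logic as _build_api_url above
    scheme, netloc, path, base_query, fragment = urlsplit(url)
    if base_query:
        query = f"{base_query}&{query}"
    return urlunsplit((scheme, netloc, path, query, fragment))

build_api_url("https://api.openai.com/v1/models?limit=5", "after=m1")
# -> "https://api.openai.com/v1/models?limit=5&after=m1"
```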
def _requests_proxies_arg(proxy) -> Optional[Dict[str, str]]:
"""Returns a value suitable for the 'proxies' argument to 'requests.request'."""
if proxy is None:
return None
elif isinstance(proxy, str):
return {"http": proxy, "https": proxy}
elif isinstance(proxy, dict):
return proxy.copy()
else:
raise ValueError(
"'openai.proxy' must be specified as either a string URL or a dict with string URL under the https and/or http keys."
)
def _aiohttp_proxies_arg(proxy) -> Optional[str]:
"""Returns a value suitable for the 'proxy' argument to 'aiohttp.ClientSession.request'."""
if proxy is None:
return None
elif isinstance(proxy, str):
return proxy
elif isinstance(proxy, dict):
return proxy["https"] if "https" in proxy else proxy["http"]
else:
raise ValueError(
"'openai.proxy' must be specified as either a string URL or a dict with string URL under the https and/or http keys."
)
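The two helpers above normalize one proxy setting into each HTTP client's shape: `requests` takes a scheme-keyed dict, `aiohttp` a single URL (https preferred). A condensed restatement:

```python
def requests_proxies(proxy):
    # str -> applied to both schemes; dict -> copied as-is
    if proxy is None:
        return None
    if isinstance(proxy, str):
        return {"http": proxy, "https": proxy}
    if isinstance(proxy, dict):
        return dict(proxy)
    raise ValueError("proxy must be a string URL or a dict of scheme -> URL")

def aiohttp_proxy(proxy):
    # aiohttp takes a single URL; prefer the https entry of a dict
    if proxy is None or isinstance(proxy, str):
        return proxy
    if isinstance(proxy, dict):
        return proxy.get("https") or proxy.get("http")
    raise ValueError("proxy must be a string URL or a dict of scheme -> URL")
```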
def _make_session() -> requests.Session:
s = requests.Session()
s.mount(
"https://",
requests.adapters.HTTPAdapter(max_retries=MAX_CONNECTION_RETRIES),
)
return s
def parse_stream_helper(line: bytes) -> Optional[str]:
if line:
if line.strip() == b"data: [DONE]":
# return here will cause GeneratorExit exception in urllib3
# and it will close http connection with TCP Reset
return None
if line.startswith(b"data: "):
line = line[len(b"data: ") :]
return line.decode("utf-8")
else:
return None
return None
def parse_stream(rbody: Iterator[bytes]) -> Iterator[str]:
for line in rbody:
_line = parse_stream_helper(line)
if _line is not None:
yield _line
async def parse_stream_async(rbody: aiohttp.StreamReader):
async for line in rbody:
_line = parse_stream_helper(line)
if _line is not None:
yield _line
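`parse_stream` strips the SSE `data: ` prefix and drops the `[DONE]` sentinel; here is the synchronous variant run against a canned byte stream:

```python
from typing import Iterator, Optional

def parse_line(line: bytes) -> Optional[str]:
    # Mirrors parse_stream_helper above
    if line and line.startswith(b"data: "):
        if line.strip() == b"data: [DONE]":
            return None  # end-of-stream sentinel, not a payload
        return line[len(b"data: "):].decode("utf-8")
    return None

def parse_sse(rbody) -> Iterator[str]:
    for line in rbody:
        out = parse_line(line)
        if out is not None:
            yield out

list(parse_sse([b'data: {"id": 1}', b"", b"data: [DONE]"]))
# -> ['{"id": 1}']
```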
class APIRequestor:
def __init__(
self,
key=None,
base_url=None,
api_type=None,
api_version=None,
organization=None,
):
self.base_url = base_url or openai.base_url
self.api_key = key or openai.api_key
self.api_type = ApiType.from_str(api_type) if api_type else ApiType.from_str("openai")
self.api_version = api_version or openai.api_version
self.organization = organization or openai.organization
def _check_polling_response(self, response: OpenAIResponse, predicate: Callable[[OpenAIResponse], bool]):
if not predicate(response):
return
error_data = response.data["error"]
message = error_data.get("message", "Operation failed")
code = error_data.get("code")
raise openai.APIError(message=message, body=dict(code=code))
def _poll(
self, method, url, until, failed, params=None, headers=None, interval=None, delay=None
) -> Tuple[OpenAIResponse, bool, str]:
if delay:
time.sleep(delay)
response, b, api_key = self.request(method, url, params, headers)
self._check_polling_response(response, failed)
start_time = time.time()
while not until(response):
if time.time() - start_time > TIMEOUT_SECS:
raise openai.APITimeoutError("Operation polling timed out.")
time.sleep(interval or response.retry_after or 10)
response, b, api_key = self.request(method, url, params, headers)
self._check_polling_response(response, failed)
response.data = response.data["result"]
return response, b, api_key
async def _apoll(
self, method, url, until, failed, params=None, headers=None, interval=None, delay=None
) -> Tuple[OpenAIResponse, bool, str]:
if delay:
await asyncio.sleep(delay)
response, b, api_key = await self.arequest(method, url, params, headers)
self._check_polling_response(response, failed)
start_time = time.time()
while not until(response):
if time.time() - start_time > TIMEOUT_SECS:
raise openai.APITimeoutError("Operation polling timed out.")
await asyncio.sleep(interval or response.retry_after or 10)
response, b, api_key = await self.arequest(method, url, params, headers)
self._check_polling_response(response, failed)
response.data = response.data["result"]
return response, b, api_key
@overload
def request(
self,
method,
url,
params,
headers,
files,
stream: Literal[True],
request_id: Optional[str] = ...,
request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
) -> Tuple[Iterator[OpenAIResponse], bool, str]:
pass
@overload
def request(
self,
method,
url,
params=...,
headers=...,
files=...,
*,
stream: Literal[True],
request_id: Optional[str] = ...,
request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
) -> Tuple[Iterator[OpenAIResponse], bool, str]:
pass
@overload
def request(
self,
method,
url,
params=...,
headers=...,
files=...,
stream: Literal[False] = ...,
request_id: Optional[str] = ...,
request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
) -> Tuple[OpenAIResponse, bool, str]:
pass
@overload
def request(
self,
method,
url,
params=...,
headers=...,
files=...,
stream: bool = ...,
request_id: Optional[str] = ...,
request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
) -> Tuple[Union[OpenAIResponse, Iterator[OpenAIResponse]], bool, str]:
pass
def request(
self,
method,
url,
params=None,
headers=None,
files=None,
stream: bool = False,
request_id: Optional[str] = None,
request_timeout: Optional[Union[float, Tuple[float, float]]] = None,
) -> Tuple[Union[OpenAIResponse, Iterator[OpenAIResponse]], bool, str]:
result = self.request_raw(
method.lower(),
url,
params=params,
supplied_headers=headers,
files=files,
stream=stream,
request_id=request_id,
request_timeout=request_timeout,
)
resp, got_stream = self._interpret_response(result, stream)
return resp, got_stream, self.api_key
@overload
async def arequest(
self,
method,
url,
params,
headers,
files,
stream: Literal[True],
request_id: Optional[str] = ...,
request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
) -> Tuple[AsyncGenerator[OpenAIResponse, None], bool, str]:
pass
@overload
async def arequest(
self,
method,
url,
params=...,
headers=...,
files=...,
*,
stream: Literal[True],
request_id: Optional[str] = ...,
request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
) -> Tuple[AsyncGenerator[OpenAIResponse, None], bool, str]:
pass
@overload
async def arequest(
self,
method,
url,
params=...,
headers=...,
files=...,
stream: Literal[False] = ...,
request_id: Optional[str] = ...,
request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
) -> Tuple[OpenAIResponse, bool, str]:
pass
@overload
async def arequest(
self,
method,
url,
params=...,
headers=...,
files=...,
stream: bool = ...,
request_id: Optional[str] = ...,
request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
) -> Tuple[Union[OpenAIResponse, AsyncGenerator[OpenAIResponse, None]], bool, str]:
pass
async def arequest(
self,
method,
url,
params=None,
headers=None,
files=None,
stream: bool = False,
request_id: Optional[str] = None,
request_timeout: Optional[Union[float, Tuple[float, float]]] = None,
) -> Tuple[Union[OpenAIResponse, AsyncGenerator[OpenAIResponse, None]], bool, str]:
ctx = aiohttp_session()
session = await ctx.__aenter__()
try:
result = await self.arequest_raw(
method.lower(),
url,
session,
params=params,
supplied_headers=headers,
files=files,
request_id=request_id,
request_timeout=request_timeout,
)
resp, got_stream = await self._interpret_async_response(result, stream)
except Exception:
await ctx.__aexit__(None, None, None)
raise
if got_stream:
async def wrap_resp():
assert isinstance(resp, AsyncGenerator)
try:
async for r in resp:
yield r
finally:
await ctx.__aexit__(None, None, None)
return wrap_resp(), got_stream, self.api_key
else:
await ctx.__aexit__(None, None, None)
return resp, got_stream, self.api_key
def handle_error_response(self, rbody, rcode, resp, rheaders, stream_error=False):
try:
error_data = resp["error"]
except (KeyError, TypeError):
raise openai.APIError(
"Invalid response object from API: %r (HTTP response code " "was %d)" % (rbody, rcode)
)
if "internal_message" in error_data:
error_data["message"] += "\n\n" + error_data["internal_message"]
log_info(
"OpenAI API error received",
error_code=error_data.get("code"),
error_type=error_data.get("type"),
error_message=error_data.get("message"),
error_param=error_data.get("param"),
stream_error=stream_error,
)
# Rate limits were previously coded as 400's with code 'rate_limit'
if rcode == 429:
return openai.RateLimitError(f"{error_data.get('message')} {rbody} {rcode} {resp} {rheaders}", body=rbody)
elif rcode in [400, 404, 415]:
return openai.BadRequestError(
message=f'{error_data.get("message")}, {error_data.get("param")}, {error_data.get("code")} {rbody} {rcode} {resp} {rheaders}',
body=rbody,
)
elif rcode == 401:
return openai.AuthenticationError(
f"{error_data.get('message')} {rbody} {rcode} {resp} {rheaders}", body=rbody
)
elif rcode == 403:
return openai.PermissionDeniedError(
f"{error_data.get('message')} {rbody} {rcode} {resp} {rheaders}", body=rbody
)
elif rcode == 409:
return openai.ConflictError(f"{error_data.get('message')} {rbody} {rcode} {resp} {rheaders}", body=rbody)
elif stream_error:
# TODO: we will soon attach status codes to stream errors
parts = [error_data.get("message"), "(Error occurred while streaming.)"]
message = " ".join([p for p in parts if p is not None])
return openai.APIError(f"{message} {rbody} {rcode} {resp} {rheaders}", body=rbody)
else:
return openai.APIError(
f"{error_data.get('message')} {rbody} {rcode} {resp} {rheaders}",
body=rbody,
)
def request_headers(self, method: str, extra, request_id: Optional[str]) -> Dict[str, str]:
user_agent = "OpenAI/v1 PythonBindings/%s" % (version.VERSION,)
uname_without_node = " ".join(v for k, v in platform.uname()._asdict().items() if k != "node")
ua = {
"bindings_version": version.VERSION,
"httplib": "requests",
"lang": "python",
"lang_version": platform.python_version(),
"platform": platform.platform(),
"publisher": "openai",
"uname": uname_without_node,
}
headers = {
"X-OpenAI-Client-User-Agent": json.dumps(ua),
"User-Agent": user_agent,
}
headers.update(api_key_to_header(self.api_type, self.api_key))
if self.organization:
headers["OpenAI-Organization"] = self.organization
if self.api_version is not None and self.api_type == ApiType.OPEN_AI:
headers["OpenAI-Version"] = self.api_version
if request_id is not None:
headers["X-Request-Id"] = request_id
headers.update(extra)
return headers
def _validate_headers(self, supplied_headers: Optional[Dict[str, str]]) -> Dict[str, str]:
headers: Dict[str, str] = {}
if supplied_headers is None:
return headers
if not isinstance(supplied_headers, dict):
raise TypeError("Headers must be a dictionary")
for k, v in supplied_headers.items():
if not isinstance(k, str):
raise TypeError("Header keys must be strings")
if not isinstance(v, str):
raise TypeError("Header values must be strings")
headers[k] = v
# NOTE: It is possible to do more validation of the headers, but a request could always
# be made to the API manually with invalid headers, so we need to handle them server side.
return headers
def _prepare_request_raw(
self,
url,
supplied_headers,
method,
params,
files,
request_id: Optional[str],
) -> Tuple[str, Dict[str, str], Optional[bytes]]:
abs_url = "%s%s" % (self.base_url, url)
headers = self._validate_headers(supplied_headers)
data = None
if method == "get" or method == "delete":
if params:
encoded_params = urlencode([(k, v) for k, v in params.items() if v is not None])
abs_url = _build_api_url(abs_url, encoded_params)
elif method in {"post", "put"}:
if params and files:
data = params
if params and not files:
data = json.dumps(params).encode()
headers["Content-Type"] = "application/json"
else:
raise openai.APIConnectionError(
"Unrecognized HTTP method %r. This may indicate a bug in the "
"OpenAI bindings. Please contact us through our help center at help.openai.com for "
"assistance." % (method,)
)
headers = self.request_headers(method, headers, request_id)
log_debug("Request to OpenAI API", method=method, path=abs_url)
log_debug("Post details", data=data, api_version=self.api_version)
return abs_url, headers, data
def request_raw(
self,
method,
url,
*,
params=None,
supplied_headers: Optional[Dict[str, str]] = None,
files=None,
stream: bool = False,
request_id: Optional[str] = None,
request_timeout: Optional[Union[float, Tuple[float, float]]] = None,
) -> requests.Response:
abs_url, headers, data = self._prepare_request_raw(url, supplied_headers, method, params, files, request_id)
if not hasattr(_thread_context, "session"):
_thread_context.session = _make_session()
_thread_context.session_create_time = time.time()
elif time.time() - getattr(_thread_context, "session_create_time", 0) >= MAX_SESSION_LIFETIME_SECS:
_thread_context.session.close()
_thread_context.session = _make_session()
_thread_context.session_create_time = time.time()
try:
result = _thread_context.session.request(
method,
abs_url,
headers=headers,
data=data,
files=files,
stream=stream,
timeout=request_timeout if request_timeout else TIMEOUT_SECS,
proxies=_thread_context.session.proxies,
)
except requests.exceptions.Timeout as e:
raise openai.APITimeoutError("Request timed out: {}".format(e)) from e
except requests.exceptions.RequestException as e:
raise openai.APIConnectionError("Error communicating with OpenAI: {}".format(e)) from e
log_debug(
"OpenAI API response",
path=abs_url,
response_code=result.status_code,
processing_ms=result.headers.get("OpenAI-Processing-Ms"),
request_id=result.headers.get("X-Request-Id"),
)
return result
async def arequest_raw(
self,
method,
url,
session,
*,
params=None,
supplied_headers: Optional[Dict[str, str]] = None,
files=None,
request_id: Optional[str] = None,
request_timeout: Optional[Union[float, Tuple[float, float]]] = None,
) -> aiohttp.ClientResponse:
abs_url, headers, data = self._prepare_request_raw(url, supplied_headers, method, params, files, request_id)
if isinstance(request_timeout, tuple):
timeout = aiohttp.ClientTimeout(
connect=request_timeout[0],
total=request_timeout[1],
)
else:
timeout = aiohttp.ClientTimeout(total=request_timeout if request_timeout else TIMEOUT_SECS)
if files:
# TODO: Use `aiohttp.MultipartWriter` to create the multipart form data here.
# For now we use the private `requests` method that is known to have worked so far.
data, content_type = requests.models.RequestEncodingMixin._encode_files(files, data) # type: ignore
headers["Content-Type"] = content_type
request_kwargs = {
"method": method,
"url": abs_url,
"headers": headers,
"data": data,
"timeout": timeout,
}
try:
result = await session.request(**request_kwargs)
log_info(
"OpenAI API response",
path=abs_url,
response_code=result.status,
processing_ms=result.headers.get("OpenAI-Processing-Ms"),
request_id=result.headers.get("X-Request-Id"),
)
return result
except (aiohttp.ServerTimeoutError, asyncio.TimeoutError) as e:
raise openai.APITimeoutError("Request timed out") from e
except aiohttp.ClientError as e:
raise openai.APIConnectionError("Error communicating with OpenAI") from e
def _interpret_response(
self, result: requests.Response, stream: bool
) -> Tuple[Union[OpenAIResponse, Iterator[OpenAIResponse]], bool]:
"""Returns the response(s) and a bool indicating whether it is a stream."""
async def _interpret_async_response(
self, result: aiohttp.ClientResponse, stream: bool
) -> Tuple[Union[OpenAIResponse, AsyncGenerator[OpenAIResponse, None]], bool]:
"""Returns the response(s) and a bool indicating whether it is a stream."""
def _interpret_response_line(self, rbody: str, rcode: int, rheaders, stream: bool) -> OpenAIResponse:
...
@asynccontextmanager
async def aiohttp_session() -> AsyncIterator[aiohttp.ClientSession]:
async with aiohttp.ClientSession() as session:
yield session


@@ -6,16 +6,16 @@ import asyncio
from typing import AsyncGenerator, Tuple, Union
import aiohttp
from openai.api_requestor import APIRequestor
from metagpt.logs import logger
from metagpt.provider.general_api_base import APIRequestor
class GeneralAPIRequestor(APIRequestor):
"""
usage
# full_url = "{api_base}{url}"
requester = GeneralAPIRequestor(api_base=api_base)
# full_url = "{base_url}{url}"
requester = GeneralAPIRequestor(base_url=base_url)
result, _, api_key = await requester.arequest(
method=method,
url=url,


@@ -5,11 +5,21 @@
@File : openai.py
"""
import asyncio
import json
import time
from typing import NamedTuple, Union
import openai
from openai.error import APIConnectionError
from openai import (
APIConnectionError,
AsyncAzureOpenAI,
AsyncOpenAI,
AsyncStream,
AzureOpenAI,
OpenAI,
)
from openai._base_client import AsyncHttpxClientWrapper, SyncHttpxClientWrapper
from openai.types import CompletionUsage
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from tenacity import (
after_log,
retry,
@@ -18,7 +28,7 @@ from tenacity import (
wait_random_exponential,
)
from metagpt.config import CONFIG, LLMProviderEnum
from metagpt.config import CONFIG, Config, LLMProviderEnum
from metagpt.logs import logger
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.provider.constant import GENERAL_FUNCTION_SCHEMA, GENERAL_TOOL_CHOICE
@@ -145,26 +155,61 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
"""
def __init__(self):
self.__init_openai(CONFIG)
self.llm = openai
self.model = CONFIG.openai_api_model
self.config: Config = CONFIG
self.__init_openai()
self.auto_max_tokens = False
self._cost_manager = CostManager()
RateLimiter.__init__(self, rpm=self.rpm)
def __init_openai(self, config):
openai.api_key = config.openai_api_key
if config.openai_api_base:
openai.api_base = config.openai_api_base
if config.openai_api_type:
openai.api_type = config.openai_api_type
openai.api_version = config.openai_api_version
if config.openai_proxy:
openai.proxy = config.openai_proxy
self.rpm = int(config.get("RPM", 10))
def __init_openai(self):
self.is_azure = self.config.openai_api_type == "azure"
self.model = self.config.deployment_name if self.is_azure else self.config.openai_api_model
self.rpm = int(self.config.get("RPM", 10))
self._make_client()
def _make_client(self):
kwargs, async_kwargs = self._make_client_kwargs()
if self.is_azure:
self.client = AzureOpenAI(**kwargs)
self.async_client = AsyncAzureOpenAI(**async_kwargs)
else:
self.client = OpenAI(**kwargs)
self.async_client = AsyncOpenAI(**async_kwargs)
def _make_client_kwargs(self) -> tuple[dict, dict]:
if self.is_azure:
kwargs = dict(
api_key=self.config.openai_api_key,
api_version=self.config.openai_api_version,
azure_endpoint=self.config.openai_base_url,
)
else:
kwargs = dict(api_key=self.config.openai_api_key, base_url=self.config.openai_base_url)
async_kwargs = kwargs.copy()
# to use proxy, openai v1 needs http_client
proxy_params = self._get_proxy_params()
if proxy_params:
kwargs["http_client"] = SyncHttpxClientWrapper(**proxy_params)
async_kwargs["http_client"] = AsyncHttpxClientWrapper(**proxy_params)
return kwargs, async_kwargs
def _get_proxy_params(self) -> dict:
params = {}
if self.config.openai_proxy:
params = {"proxies": self.config.openai_proxy}
if self.config.openai_base_url:
params["base_url"] = self.config.openai_base_url
return params
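`_make_client_kwargs` above builds one kwargs dict for the sync client and a copy for the async one, attaching an `http_client` only when a proxy is configured. A dependency-free sketch of that decision (the function and the dict stand-ins are ours; the real code wraps httpx clients via the openai package's `SyncHttpxClientWrapper` / `AsyncHttpxClientWrapper`):

```python
def make_client_kwargs(api_key: str, base_url: str, api_type: str = "openai",
                       api_version: str = None, proxy: str = None):
    if api_type == "azure":
        kwargs = {"api_key": api_key, "api_version": api_version, "azure_endpoint": base_url}
    else:
        kwargs = {"api_key": api_key, "base_url": base_url}
    async_kwargs = kwargs.copy()
    if proxy:
        # In the PR these values become wrapped httpx client instances
        proxy_params = {"proxies": proxy, "base_url": base_url}
        kwargs["http_client"] = proxy_params
        async_kwargs["http_client"] = proxy_params
    return kwargs, async_kwargs
```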
async def _achat_completion_stream(self, messages: list[dict]) -> str:
response = await openai.ChatCompletion.acreate(**self._cons_kwargs(messages), stream=True)
response: AsyncStream[ChatCompletionChunk] = await self.async_client.chat.completions.create(
**self._cons_kwargs(messages), stream=True
)
# create variables to collect the stream of chunks
collected_chunks = []
@@ -172,15 +217,14 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
# iterate through the stream of events
async for chunk in response:
collected_chunks.append(chunk) # save the event response
choices = chunk["choices"]
if len(choices) > 0:
chunk_message = chunk["choices"][0].get("delta", {}) # extract the message
if chunk.choices:
chunk_message = chunk.choices[0].delta # extract the message
collected_messages.append(chunk_message) # save the message
if "content" in chunk_message:
print(chunk_message["content"], end="")
if chunk_message.content:
print(chunk_message.content, end="")
print()
full_reply_content = "".join([m.get("content", "") for m in collected_messages])
full_reply_content = "".join([m.content for m in collected_messages if m.content])
usage = self._calc_usage(messages, full_reply_content)
self._update_costs(usage)
return full_reply_content
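In the v1 API, stream chunks are typed objects, so the loop above reads delta content with attribute access instead of dict lookups. The accumulation can be sketched with `SimpleNamespace` standing in for `ChatCompletionChunk`:

```python
from types import SimpleNamespace as NS

def join_stream_content(chunks) -> str:
    # Same accumulation as the loop above: keep non-empty delta.content, join at the end
    collected = []
    for chunk in chunks:
        if chunk.choices:
            delta = chunk.choices[0].delta
            if delta.content:
                collected.append(delta.content)
    return "".join(collected)

fake = [
    NS(choices=[NS(delta=NS(content="Hel"))]),
    NS(choices=[]),                            # e.g. a chunk with no choices
    NS(choices=[NS(delta=NS(content="lo"))]),
    NS(choices=[NS(delta=NS(content=None))]),  # final chunk often carries no content
]
join_stream_content(fake)  # -> "Hello"
```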
@@ -193,43 +237,27 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
"stop": None,
"temperature": 0.3,
"timeout": 3,
"model": self.model,
}
if configs:
kwargs.update(configs)
if CONFIG.openai_api_type == "azure":
if CONFIG.deployment_name and CONFIG.deployment_id:
raise ValueError("You can only use one of the `deployment_id` or `deployment_name` model")
elif not CONFIG.deployment_name and not CONFIG.deployment_id:
raise ValueError("You must specify `DEPLOYMENT_NAME` or `DEPLOYMENT_ID` parameter")
kwargs_mode = (
{"engine": CONFIG.deployment_name}
if CONFIG.deployment_name
else {"deployment_id": CONFIG.deployment_id}
)
else:
kwargs_mode = {"model": self.model}
kwargs.update(kwargs_mode)
return kwargs
async def _achat_completion(self, messages: list[dict]) -> dict:
rsp = await self.llm.ChatCompletion.acreate(**self._cons_kwargs(messages))
self._update_costs(rsp.get("usage"))
async def _achat_completion(self, messages: list[dict]) -> ChatCompletion:
rsp: ChatCompletion = await self.async_client.chat.completions.create(**self._cons_kwargs(messages))
self._update_costs(rsp.usage)
return rsp
def _chat_completion(self, messages: list[dict]) -> dict:
rsp = self.llm.ChatCompletion.create(**self._cons_kwargs(messages))
self._update_costs(rsp)
def _chat_completion(self, messages: list[dict]) -> ChatCompletion:
rsp: ChatCompletion = self.client.chat.completions.create(**self._cons_kwargs(messages))
self._update_costs(rsp.usage)
return rsp
def completion(self, messages: list[dict]) -> dict:
# if isinstance(messages[0], Message):
# messages = self.messages_to_dict(messages)
def completion(self, messages: list[dict]) -> ChatCompletion:
return self._chat_completion(messages)
async def acompletion(self, messages: list[dict]) -> dict:
# if isinstance(messages[0], Message):
# messages = self.messages_to_dict(messages)
async def acompletion(self, messages: list[dict]) -> ChatCompletion:
return await self._achat_completion(messages)
@retry(
@@ -259,14 +287,16 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
return self._cons_kwargs(messages, **kwargs)
def _chat_completion_function(self, messages: list[dict], **kwargs) -> dict:
rsp = self.llm.ChatCompletion.create(**self._func_configs(messages, **kwargs))
self._update_costs(rsp.get("usage"))
def _chat_completion_function(self, messages: list[dict], **kwargs) -> ChatCompletion:
rsp: ChatCompletion = self.client.chat.completions.create(**self._func_configs(messages, **kwargs))
self._update_costs(rsp.usage)
return rsp
async def _achat_completion_function(self, messages: list[dict], **chat_configs) -> dict:
rsp = await self.llm.ChatCompletion.acreate(**self._func_configs(messages, **chat_configs))
self._update_costs(rsp.get("usage"))
async def _achat_completion_function(self, messages: list[dict], **chat_configs) -> ChatCompletion:
rsp: ChatCompletion = await self.async_client.chat.completions.create(
**self._func_configs(messages, **chat_configs)
)
self._update_costs(rsp.usage)
return rsp
def _process_message(self, messages: Union[str, Message, list[dict], list[Message], list[str]]) -> list[dict]:
@ -321,22 +351,35 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
rsp = await self._achat_completion_function(messages, **kwargs)
return self.get_choice_function_arguments(rsp)
def _calc_usage(self, messages: list[dict], rsp: str) -> dict:
usage = {}
if CONFIG.calc_usage:
try:
prompt_tokens = count_message_tokens(messages, self.model)
completion_tokens = count_string_tokens(rsp, self.model)
usage["prompt_tokens"] = prompt_tokens
usage["completion_tokens"] = completion_tokens
return usage
except Exception as e:
logger.error(f"{self.model} usage calculation failed!", e)
return {}
else:
def get_choice_function_arguments(self, rsp: ChatCompletion) -> dict:
"""Required to provide the first function arguments of choice.
:return dict: return the first function arguments of choice, for example,
{'language': 'python', 'code': "print('Hello, World!')"}
"""
try:
return json.loads(rsp.choices[0].message.tool_calls[0].function.arguments)
except json.JSONDecodeError:
return {}
def get_choice_text(self, rsp: ChatCompletion) -> str:
"""Required to provide the first text of choice"""
return rsp.choices[0].message.content if rsp.choices else ""
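In the v1 SDK, function-calling results arrive under `choices[0].message.tool_calls` rather than the old `function_call` dict, which is the traversal `get_choice_function_arguments` performs above. The same parsing can be exercised offline against a stand-in response built from `SimpleNamespace` — a hypothetical shape mirroring the attribute paths of a real `ChatCompletion`, not the actual pydantic model:

```python
import json
from types import SimpleNamespace

def get_choice_function_arguments(rsp) -> dict:
    # Same traversal as the method above: first choice -> first tool call ->
    # JSON-decode the arguments string produced by the model.
    try:
        return json.loads(rsp.choices[0].message.tool_calls[0].function.arguments)
    except (json.JSONDecodeError, AttributeError, IndexError):
        return {}

# Stand-in response object; a real ChatCompletion exposes the same attribute paths.
rsp = SimpleNamespace(
    choices=[
        SimpleNamespace(
            message=SimpleNamespace(
                tool_calls=[
                    SimpleNamespace(
                        function=SimpleNamespace(
                            arguments='{"language": "python", "code": "print(\'Hello, World!\')"}'
                        )
                    )
                ]
            )
        )
    ]
)
args = get_choice_function_arguments(rsp)
```

A response with no choices (or no tool calls) simply yields `{}`, matching the defensive behavior callers of this helper expect.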
def _calc_usage(self, messages: list[dict], rsp: str) -> CompletionUsage:
usage = CompletionUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0)
if not CONFIG.calc_usage:
return usage
async def acompletion_batch(self, batch: list[list[dict]]) -> list[dict]:
try:
usage.prompt_tokens = count_message_tokens(messages, self.model)
usage.completion_tokens = count_string_tokens(rsp, self.model)
except Exception as e:
logger.error(f"usage calculation failed!: {e}")
return usage
async def acompletion_batch(self, batch: list[list[dict]]) -> list[ChatCompletion]:
"""Return full JSON"""
split_batches = self.split_batches(batch)
all_results = []
@ -362,12 +405,10 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
logger.info(f"Result of task {idx}: {result}")
return results
def _update_costs(self, usage: dict):
def _update_costs(self, usage: CompletionUsage):
if CONFIG.calc_usage and usage:
try:
prompt_tokens = int(usage["prompt_tokens"])
completion_tokens = int(usage["completion_tokens"])
self._cost_manager.update_cost(prompt_tokens, completion_tokens, self.model)
self._cost_manager.update_cost(usage.prompt_tokens, usage.completion_tokens, self.model)
except Exception as e:
logger.error("updating costs failed!", e)
@ -390,7 +431,7 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
logger.error(f"moderating failed:{e}")
def _moderation(self, content: Union[str, list[str]]):
rsp = self.llm.Moderation.create(input=content)
rsp = self.client.moderations.create(input=content)
return rsp
async def amoderation(self, content: Union[str, list[str]]):
@ -404,5 +445,5 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
logger.error(f"moderating failed:{e}")
async def _amoderation(self, content: Union[str, list[str]]):
rsp = await self.llm.Moderation.acreate(input=content)
rsp = await self.async_client.moderations.create(input=content)
return rsp

View file

@ -41,8 +41,8 @@ class ZhiPuModelAPI(ModelAPI):
# TODO to make the async request to be more generic for models in http mode.
assert method in ["post", "get"]
api_base, url = cls.split_zhipu_api_url(invoke_type, kwargs)
requester = GeneralAPIRequestor(api_base=api_base)
base_url, url = cls.split_zhipu_api_url(invoke_type, kwargs)
requester = GeneralAPIRequestor(base_url=base_url)
result, _, api_key = await requester.arequest(
method=method,
url=url,

View file

@ -70,7 +70,7 @@ class Researcher(Role):
return ret
def research_system_text(self, topic, current_task: Action) -> str:
""" BACKWARD compatible
"""BACKWARD compatible
This allows sub-class able to define its own system prompt based on topic.
return the previous implementation to have backward compatible
Args:

View file

@ -46,7 +46,6 @@ class OpenCodeInterpreter(object):
interpreter.auto_run = auto_run
interpreter.model = CONFIG.openai_api_model or "gpt-3.5-turbo"
interpreter.api_key = CONFIG.openai_api_key
# interpreter.api_base = CONFIG.openai_api_base
self.interpreter = interpreter
def chat(self, query: str, reset: bool = True):

View file

@ -106,8 +106,8 @@ def _gen_get_driver_func(browser_type, *args, executable_path=None):
options.add_argument("--headless")
options.add_argument("--enable-javascript")
if browser_type == "chrome":
options.add_argument("--disable-gpu") # This flag can help avoid renderer issue
options.add_argument("--disable-dev-shm-usage") # Overcome limited resource problems
options.add_argument("--disable-gpu") # This flag can help avoid renderer issue
options.add_argument("--disable-dev-shm-usage") # Overcome limited resource problems
options.add_argument("--no-sandbox")
for i in args:
options.add_argument(i)

View file

@ -21,14 +21,12 @@ def make_sk_kernel():
if CONFIG.openai_api_type == "azure":
kernel.add_chat_service(
"chat_completion",
AzureChatCompletion(CONFIG.deployment_name, CONFIG.openai_api_base, CONFIG.openai_api_key),
AzureChatCompletion(CONFIG.deployment_name, CONFIG.openai_base_url, CONFIG.openai_api_key),
)
else:
kernel.add_chat_service(
"chat_completion",
OpenAIChatCompletion(
CONFIG.openai_api_model, CONFIG.openai_api_key, org_id=None, endpoint=CONFIG.openai_api_base
),
OpenAIChatCompletion(CONFIG.openai_api_model, CONFIG.openai_api_key),
)
return kernel

View file

@ -16,6 +16,8 @@ TOKEN_COSTS = {
"gpt-3.5-turbo-0613": {"prompt": 0.0015, "completion": 0.002},
"gpt-3.5-turbo-16k": {"prompt": 0.003, "completion": 0.004},
"gpt-3.5-turbo-16k-0613": {"prompt": 0.003, "completion": 0.004},
"gpt-35-turbo": {"prompt": 0.0015, "completion": 0.002},
"gpt-35-turbo-16k": {"prompt": 0.003, "completion": 0.004},
"gpt-3.5-turbo-1106": {"prompt": 0.001, "completion": 0.002},
"gpt-4-0314": {"prompt": 0.03, "completion": 0.06},
"gpt-4": {"prompt": 0.03, "completion": 0.06},
@ -34,6 +36,8 @@ TOKEN_MAX = {
"gpt-3.5-turbo-0613": 4096,
"gpt-3.5-turbo-16k": 16384,
"gpt-3.5-turbo-16k-0613": 16384,
"gpt-35-turbo": 4096,
"gpt-35-turbo-16k": 16384,
"gpt-3.5-turbo-1106": 16384,
"gpt-4-0314": 8192,
"gpt-4": 8192,
@ -56,6 +60,8 @@ def count_message_tokens(messages, model="gpt-3.5-turbo-0613"):
if model in {
"gpt-3.5-turbo-0613",
"gpt-3.5-turbo-16k-0613",
"gpt-35-turbo",
"gpt-35-turbo-16k",
"gpt-3.5-turbo-16k",
"gpt-3.5-turbo-1106",
"gpt-4-0314",

View file

@ -15,7 +15,7 @@ langchain==0.0.231
loguru==0.6.0
meilisearch==0.21.0
numpy==1.24.3
openai>=0.28.1
openai==1.6.0
openpyxl
beautifulsoup4==4.12.2
pandas==2.0.3
@ -28,7 +28,7 @@ PyYAML==6.0.1
# sentence_transformers==2.2.2
setuptools==65.6.3
tenacity==8.2.2
tiktoken==0.4.0
tiktoken==0.5.2
tqdm==4.64.0
#unstructured[local-inference]
# playwright
@ -42,7 +42,7 @@ qdrant-client==1.4.0
pytest-mock==3.11.1
open-interpreter==0.1.7; python_version>"3.9"
ta==0.10.2
semantic-kernel==0.3.13.dev0
semantic-kernel==0.4.0.dev0
wrapt==1.15.0
websocket-client==0.58.0
aiofiles==23.2.1

View file

@ -1,3 +1,5 @@
from unittest.mock import Mock
import pytest
from metagpt.provider.openai_api import OpenAIGPTAPI
@ -78,3 +80,70 @@ def test_ask_code_list_str():
assert "language" in rsp
assert "code" in rsp
assert len(rsp["code"]) > 0
class TestOpenAI:
@pytest.fixture
def config(self):
return Mock(openai_api_key="test_key", openai_base_url="test_url", openai_proxy=None, openai_api_type="other")
@pytest.fixture
def config_azure(self):
return Mock(
openai_api_key="test_key",
openai_api_version="test_version",
openai_base_url="test_url",
openai_proxy=None,
openai_api_type="azure",
)
@pytest.fixture
def config_proxy(self):
return Mock(
openai_api_key="test_key",
openai_base_url="test_url",
openai_proxy="http://proxy.com",
openai_api_type="other",
)
@pytest.fixture
def config_azure_proxy(self):
return Mock(
openai_api_key="test_key",
openai_api_version="test_version",
openai_base_url="test_url",
openai_proxy="http://proxy.com",
openai_api_type="azure",
)
def test_make_client_kwargs_without_proxy(self, config):
instance = OpenAIGPTAPI()
instance.config = config
kwargs, async_kwargs = instance._make_client_kwargs()
assert kwargs == {"api_key": "test_key", "base_url": "test_url"}
assert async_kwargs == {"api_key": "test_key", "base_url": "test_url"}
assert "http_client" not in kwargs
assert "http_client" not in async_kwargs
def test_make_client_kwargs_without_proxy_azure(self, config_azure):
instance = OpenAIGPTAPI()
instance.config = config_azure
kwargs, async_kwargs = instance._make_client_kwargs()
assert kwargs == {"api_key": "test_key", "api_version": "test_version", "azure_endpoint": "test_url"}
assert async_kwargs == {"api_key": "test_key", "api_version": "test_version", "azure_endpoint": "test_url"}
assert "http_client" not in kwargs
assert "http_client" not in async_kwargs
def test_make_client_kwargs_with_proxy(self, config_proxy):
instance = OpenAIGPTAPI()
instance.config = config_proxy
kwargs, async_kwargs = instance._make_client_kwargs()
assert "http_client" in kwargs
assert "http_client" in async_kwargs
def test_make_client_kwargs_with_proxy_azure(self, config_azure_proxy):
instance = OpenAIGPTAPI()
instance.config = config_azure_proxy
kwargs, async_kwargs = instance._make_client_kwargs()
assert "http_client" in kwargs
assert "http_client" in async_kwargs