#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/6/1 12:41
@Author : alexanderwu
@File : logs.py
"""

from __future__ import annotations

import asyncio
import inspect
import sys
from contextvars import ContextVar
from datetime import datetime
from functools import partial
from typing import Any

from loguru import logger as _logger
from pydantic import BaseModel, Field

from metagpt.const import METAGPT_ROOT

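# Per-context handle for streamed LLM output: asyncio tasks run in copies of the
# current context, so a queue set via create_llm_stream_queue() inside one task is
# not visible to sibling tasks, keeping concurrent streams separate.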
LLM_STREAM_QUEUE: ContextVar[asyncio.Queue] = ContextVar("llm-stream")


class ToolLogItem(BaseModel):
    type_: str = Field(alias="type", default="str", description="Data type of the `value` field.")
    name: str
    value: Any


TOOL_LOG_END_MARKER = ToolLogItem(
    type="str", name="end_marker", value="\x18\x19\x1B\x18"
)  # a special log item that signals the end of a tool's stream log


def define_log_level(print_level="INFO", logfile_level="DEBUG", name: str = None):
    """Configure the shared logger: `print_level` for stderr output, `logfile_level` for the log file."""
    current_date = datetime.now()
    formatted_date = current_date.strftime("%Y%m%d")
    log_name = f"{name}_{formatted_date}" if name else formatted_date  # prefix the log file name with `name`, if provided

    _logger.remove()
    _logger.add(sys.stderr, level=print_level)
    _logger.add(METAGPT_ROOT / f"logs/{log_name}.txt", level=logfile_level)
    return _logger


logger = define_log_level()

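# Illustrative usage (not upstream code): an application can reconfigure the
# shared logger, e.g.
#
#     logger = define_log_level(print_level="DEBUG", name="myapp")
#
# which echoes DEBUG output to stderr and writes to logs/myapp_<YYYYMMDD>.txt
# under METAGPT_ROOT; "myapp" is a hypothetical prefix.

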
def log_llm_stream(msg):
    """Logs a message to the LLM stream.

    Args:
        msg: The message to be logged.

    Notes:
        If LLM_STREAM_QUEUE has not been set (e.g., `create_llm_stream_queue` has not
        been called in the current context), the message is not added to any queue and
        is only passed to `_llm_stream_log`.
    """
    queue = get_llm_stream_queue()
    if queue:
        queue.put_nowait(msg)
    _llm_stream_log(msg)


def log_tool_output(output: ToolLogItem | list[ToolLogItem], tool_name: str = ""):
    """Interface for logging tool output; how and where output is logged can be customized via `set_tool_output_logfunc`."""
    _tool_output_log(output=output, tool_name=tool_name)


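# Illustrative usage (not upstream code): a tool can emit items and then the end
# marker so that consumers know the stream is complete; "web_search" is a
# hypothetical tool name.
#
#     log_tool_output(ToolLogItem(type="str", name="result", value="..."), tool_name="web_search")
#     log_tool_output(TOOL_LOG_END_MARKER, tool_name="web_search")

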
async def log_tool_output_async(output: ToolLogItem | list[ToolLogItem], tool_name: str = ""):
    """Async interface for logging tool output; use it when the output contains async objects."""
    await _tool_output_log_async(output=output, tool_name=tool_name)


async def get_human_input(prompt: str = ""):
    """Interface for getting human input; the input source can be customized via `set_human_input_func`."""
    if inspect.iscoroutinefunction(_get_human_input):
        return await _get_human_input(prompt)
    else:
        return _get_human_input(prompt)


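# Illustrative sketch (not upstream code): human input can come from a coroutine,
# since get_human_input() awaits coroutine functions automatically. Register it
# with set_human_input_func(_example_async_input).
async def _example_async_input(prompt: str = "") -> str:
    # run the blocking built-in input() in a worker thread (Python 3.9+)
    return await asyncio.to_thread(input, prompt)

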
def set_llm_stream_logfunc(func):
    global _llm_stream_log
    _llm_stream_log = func


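# Illustrative usage (not upstream code): redirect streamed chunks to stderr
# instead of the default stdout printer defined near the bottom of this module.
#
#     set_llm_stream_logfunc(partial(print, end="", file=sys.stderr))

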
def set_tool_output_logfunc(func):
    global _tool_output_log
    _tool_output_log = func


async def set_tool_output_logfunc_async(func):
    # async version of `set_tool_output_logfunc`
    global _tool_output_log_async
    _tool_output_log_async = func


def set_human_input_func(func):
    global _get_human_input
    _get_human_input = func


_llm_stream_log = partial(print, end="")  # default: stream LLM chunks to stdout without extra newlines


_tool_output_log = (
    lambda *args, **kwargs: None
)  # a dummy function to avoid errors if set_tool_output_logfunc is not called


async def _tool_output_log_async(*args, **kwargs):
    # async dummy; does nothing unless replaced via set_tool_output_logfunc_async
    pass


def create_llm_stream_queue():
    """Creates a new LLM stream queue and sets it in the context variable.

    Returns:
        The newly created asyncio.Queue instance.
    """
    queue = asyncio.Queue()
    LLM_STREAM_QUEUE.set(queue)
    return queue


def get_llm_stream_queue():
    """Retrieves the current LLM stream queue from the context variable.

    Returns:
        The asyncio.Queue instance if set, otherwise None.
    """
    return LLM_STREAM_QUEUE.get(None)


_get_human_input = input  # get human input from console by default
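

# Illustrative sketch (not upstream code): a round trip through the per-context
# stream queue. The producer calls create_llm_stream_queue() so log_llm_stream()
# has a queue to feed; the None sentinel that ends the loop is an assumption of
# this example, not a protocol defined by this module. Run it with
# asyncio.run(_example_stream_roundtrip()).
async def _example_stream_roundtrip() -> str:
    queue = create_llm_stream_queue()
    for chunk in ("Hello, ", "world!"):
        log_llm_stream(chunk)  # echoed via _llm_stream_log and enqueued
    queue.put_nowait(None)  # sentinel: no more chunks
    chunks = []
    while (chunk := await queue.get()) is not None:
        chunks.append(chunk)
    return "".join(chunks)  # -> "Hello, world!"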