Revert "feat(core): Add stream data return and reception"

This reverts commit 7706b88f03.
This commit is contained in:
leiwu30 2024-03-28 09:38:44 +08:00
parent 29fecffa3f
commit 923150b2f3
13 changed files with 3 additions and 211 deletions

View file

@ -1,108 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2024/3/27 9:44
@Author : leiwu30
@File : flask_web_api.py
@Description : Stream log information and communicate over the network via web api.
"""
import os
import json
import socket
import asyncio
import threading
from metagpt.utils.stream_pipe import StreamPipe
from metagpt.roles.tutorial_assistant import TutorialAssistant
from metagpt.const import METAGPT_ROOT
from flask import Flask, Response
from flask import request, jsonify, send_from_directory
app = Flask(__name__)
def write_tutorial(message):
async def main(idea, stream_pipe):
role = TutorialAssistant(stream_pipe=stream_pipe)
await role.run(idea)
def thread_run(idea: str, stream_pipe: StreamPipe = None):
"""
Convert asynchronous function to thread function
"""
asyncio.run(main(idea, stream_pipe))
stream_pipe = StreamPipe()
thread = threading.Thread(target=thread_run, args=(message["content"], stream_pipe,))
thread.start()
while not stream_pipe.finish:
stream_pipe.wait()
msg = stream_pipe.get_message()
yield stream_pipe.msg2stream(msg)
# 文件位置
md_file = stream_pipe.get_k_message("file_name")
yield stream_pipe.msg2stream(
f"\n\n[{os.path.basename(md_file)}](http://{server_address}:{server_port}/download/{md_file})")
@app.route('/v1/chat/completions', methods=['POST'])
def completions():
"""
data: {
"model": "write_tutorial",
"stream": true,
"messages": [
{
"role": "user",
"content": "Write a tutorial about MySQL"
}
]
}
"""
data = json.loads(request.data)
print(json.dumps(data, indent=4, ensure_ascii=False))
# Non-streaming interfaces are not supported yet
stream_type = True if "stream" in data.keys() and data["stream"] else False
if not stream_type:
return jsonify({"status": 200})
# Only accept the last user information
last_message = data["messages"][-1]
model = data["model"]
# write_tutorial
if model == "write_tutorial":
return Response(write_tutorial(last_message), mimetype="text/plain")
else:
return jsonify({"status": 200})
# return Response(event_stream(), mimetype="text/plain")
@app.route('/download/<path:filename>')
def download_file(filename):
return send_from_directory(METAGPT_ROOT, filename, as_attachment=True)
if __name__ == "__main__":
"""
curl https://$server_address:$server_port/v1/chat/completions -X POST -d '{
"model": "gpt-3.5-turbo",
"stream": true,
"messages": [
{
"role": "user",
"content": "Write a tutorial about MySQL"
}
]
}'
"""
server_port = 7860
server_address = socket.gethostbyname(socket.gethostname())
app.run(port=server_port, host=server_address)

View file

@ -23,7 +23,6 @@ from metagpt.schema import (
TestingContext,
)
from metagpt.utils.project_repo import ProjectRepo
from metagpt.utils.stream_pipe import StreamPipe
class Action(SerializationMixin, ContextMixin, BaseModel):
@ -36,7 +35,6 @@ class Action(SerializationMixin, ContextMixin, BaseModel):
prefix: str = "" # aask*时会加上prefix作为system_message
desc: str = "" # for skill manager
node: ActionNode = Field(default=None, exclude=True)
stream_pipe: Optional[StreamPipe] = None
@property
def repo(self) -> ProjectRepo:
@ -92,8 +90,6 @@ class Action(SerializationMixin, ContextMixin, BaseModel):
async def _aask(self, prompt: str, system_msgs: Optional[list[str]] = None) -> str:
"""Append default prefix"""
if self.stream_pipe and not self.llm.stream_pipe:
self.llm.stream_pipe = self.stream_pipe
return await self.llm.aask(prompt, system_msgs)
async def _run_action_node(self, *args, **kwargs):

View file

@ -62,9 +62,6 @@ class AnthropicLLM(BaseLLM):
elif event_type == "content_block_delta":
content = event.delta.text
log_llm_stream(content)
if self.stream_pipe:
self.stream_pipe.set_message(content)
collected_content.append(content)
elif event_type == "message_delta":
usage.output_tokens = event.usage.output_tokens # update final output_tokens

View file

@ -28,7 +28,6 @@ from metagpt.logs import logger
from metagpt.schema import Message
from metagpt.utils.common import log_and_reraise
from metagpt.utils.cost_manager import CostManager, Costs
from metagpt.utils.stream_pipe import StreamPipe
class BaseLLM(ABC):
@ -43,7 +42,6 @@ class BaseLLM(ABC):
cost_manager: Optional[CostManager] = None
model: Optional[str] = None # deprecated
pricing_plan: Optional[str] = None
stream_pipe: Optional[StreamPipe] = None
@abstractmethod
def __init__(self, config: LLMConfig):

View file

@ -221,8 +221,6 @@ class DashScopeLLM(BaseLLM):
content = chunk.output.choices[0]["message"]["content"]
usage = dict(chunk.usage) # each chunk has usage
log_llm_stream(content)
if self.stream_pipe:
self.stream_pipe.set_message(content)
collected_content.append(content)
log_llm_stream("\n")
self._update_costs(usage)

View file

@ -149,8 +149,6 @@ class GeminiLLM(BaseLLM):
logger.warning(f"messages: {messages}\nerrors: {e}\n{BlockedPromptException(str(chunk))}")
raise BlockedPromptException(str(chunk))
log_llm_stream(content)
if self.stream_pipe:
self.stream_pipe.set_message(content)
collected_content.append(content)
log_llm_stream("\n")

View file

@ -83,8 +83,6 @@ class OllamaLLM(BaseLLM):
content = self.get_choice_text(chunk)
collected_content.append(content)
log_llm_stream(content)
if self.stream_pipe:
self.stream_pipe.set_message(content)
else:
# stream finished
usage = self.get_usage(chunk)

View file

@ -87,9 +87,6 @@ class OpenAILLM(BaseLLM):
chunk.choices[0].finish_reason if chunk.choices and hasattr(chunk.choices[0], "finish_reason") else None
)
log_llm_stream(chunk_message)
if self.stream_pipe:
self.stream_pipe.set_message(chunk_message)
collected_messages.append(chunk_message)
if finish_reason:
if hasattr(chunk, "usage"):

View file

@ -124,8 +124,6 @@ class QianFanLLM(BaseLLM):
content = chunk.body.get("result", "")
usage = chunk.body.get("usage", {})
log_llm_stream(content)
if self.stream_pipe:
self.stream_pipe.set_message(content)
collected_content.append(content)
log_llm_stream("\n")

View file

@ -73,8 +73,6 @@ class ZhiPuAILLM(BaseLLM):
content = self.get_choice_delta_text(chunk)
collected_content.append(content)
log_llm_stream(content)
if self.stream_pipe:
self.stream_pipe.set_message(content)
log_llm_stream("\n")

View file

@ -39,8 +39,6 @@ from metagpt.strategy.planner import Planner
from metagpt.utils.common import any_to_name, any_to_str, role_raise_decorator
from metagpt.utils.project_repo import ProjectRepo
from metagpt.utils.repair_llm_raw_output import extract_state_value_from_output
from metagpt.utils.stream_pipe import StreamPipe
if TYPE_CHECKING:
from metagpt.environment import Environment # noqa: F401
@ -141,8 +139,6 @@ class Role(SerializationMixin, ContextMixin, BaseModel):
role_id: str = ""
states: list[str] = []
stream_pipe: Optional[StreamPipe] = None
# scenarios to set action system_prompt:
# 1. `__init__` while using Role(actions=[...])
# 2. add action to role while using `role.set_action(action)`

View file

@ -10,7 +10,7 @@ from datetime import datetime
from typing import Dict
from metagpt.actions.write_tutorial import WriteContent, WriteDirectory
from metagpt.const import TUTORIAL_PATH, METAGPT_ROOT
from metagpt.const import TUTORIAL_PATH
from metagpt.logs import logger
from metagpt.roles.role import Role, RoleReactMode
from metagpt.schema import Message
@ -40,7 +40,7 @@ class TutorialAssistant(Role):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.set_actions([WriteDirectory(language=self.language, stream_pipe=self.stream_pipe)])
self.set_actions([WriteDirectory(language=self.language)])
self._set_react_mode(react_mode=RoleReactMode.BY_ORDER.value)
async def _handle_directory(self, titles: Dict) -> Message:
@ -58,7 +58,7 @@ class TutorialAssistant(Role):
self.total_content += f"# {self.main_title}"
actions = list()
for first_dir in titles.get("directory"):
actions.append(WriteContent(language=self.language, directory=first_dir, stream_pipe=self.stream_pipe))
actions.append(WriteContent(language=self.language, directory=first_dir))
key = list(first_dir.keys())[0]
directory += f"- {key}\n"
for second_dir in first_dir[key]:
@ -91,8 +91,4 @@ class TutorialAssistant(Role):
root_path = TUTORIAL_PATH / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
await File.write(root_path, f"{self.main_title}.md", self.total_content.encode("utf-8"))
msg.content = str(root_path / f"{self.main_title}.md")
if self.stream_pipe:
self.stream_pipe.set_k_message("file_name", msg.content.replace(str(METAGPT_ROOT), ""))
self.stream_pipe.with_finish()
return msg

View file

@ -1,70 +0,0 @@
# -*- coding: utf-8 -*-
# @Time : 2024/3/27 10:00
# @Author : leiwu30
# @File : stream_pipe.py
# @Version : None
# @Description : None
import time
import json
from multiprocessing import Pipe
class StreamPipe:
parent_conn, child_conn = Pipe()
variable: list = {}
finish: bool = False
format_data = {
"id": "chatcmpl-96bVnBOOyPFZZxEoTIGbdpFcVEnur",
"object": "chat.completion.chunk",
"created": 1711361191,
"model": "gpt-3.5-turbo-0125",
"system_fingerprint": "fp_3bc1b5746c",
"choices": [
{
"index": 0,
"delta":
{
"role": "assistant",
"content": "content"
},
"logprobs": None,
"finish_reason": None
}
]
}
def set_message(self, msg):
self.parent_conn.send(msg)
def wait(self):
pass
def get_message(self):
if self.child_conn.poll(timeout=3):
return self.child_conn.recv()
else:
return None
def set_k_message(self, k, msg):
self.variable[k] = msg
def get_k_message(self, k):
return self.variable[k]
def msg2stream(self, msg):
self.format_data['created'] = int(time.time())
self.format_data['choices'][0]['delta']['content'] = msg
return f"data: {json.dumps(self.format_data, ensure_ascii=False)}\n".encode("utf-8")
def with_finish(self, timeout: int = 3):
"""
Args:
timeout: /s
"""
# Pipe is not empty waiting for pipe condition
# while self.child_conn.poll(timeout=timeout):
# time.sleep(0.5)
self.finish = True