Merge branch 'mgx_ops' into feat-exp-pool

This commit is contained in:
seehi 2024-06-25 16:55:58 +08:00
commit 5a5e75b76f
26 changed files with 618 additions and 308 deletions

View file

@ -275,7 +275,7 @@ class CodeParser:
def parse_code(cls, text: str, lang: str = "", block: Optional[str] = None) -> str:
if block:
text = cls.parse_block(block, text)
pattern = rf"```{lang}.*?\s+(.*?)```"
pattern = rf"```{lang}.*?\s+(.*?)\n```"
match = re.search(pattern, text, re.DOTALL)
if match:
code = match.group(1)

View file

@ -51,7 +51,6 @@ class ResourceReporter(BaseModel):
block: BlockType = Field(description="The type of block that is reporting the resource")
uuid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource")
is_chunk: bool = Field(False, description="Indicates whether the report is a chunk of a stream")
enable_llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting")
callback_url: str = Field(METAGPT_REPORTER_DEFAULT_URL, description="The URL to which the report should be sent")
_llm_task: Optional[asyncio.Task] = PrivateAttr(None)
@ -153,17 +152,14 @@ class ResourceReporter(BaseModel):
def __enter__(self):
"""Enter the synchronous streaming callback context."""
self.is_chunk = True
return self
def __exit__(self, *args, **kwargs):
"""Exit the synchronous streaming callback context."""
self.report(None, END_MARKER_NAME)
self.is_chunk = False
async def __aenter__(self):
"""Enter the asynchronous streaming callback context."""
self.is_chunk = True
if self.enable_llm_stream:
queue = create_llm_stream_queue()
self._llm_task = asyncio.create_task(self._llm_stream_report(queue))
@ -171,15 +167,18 @@ class ResourceReporter(BaseModel):
async def __aexit__(self, *args, **kwargs):
"""Exit the asynchronous streaming callback context."""
self.is_chunk = False
if self.enable_llm_stream:
self._llm_task.cancel()
await get_llm_stream_queue().put(None)
await self._llm_task
self._llm_task = None
await self.async_report(None, END_MARKER_NAME)
async def _llm_stream_report(self, queue: asyncio.Queue):
while self.is_chunk:
await self.async_report(await queue.get(), "content")
while True:
data = await queue.get()
if data is None:
return
await self.async_report(data, "content")
async def wait_llm_stream_report(self):
"""Wait for the LLM stream report to complete."""