mg集成omniparse

This commit is contained in:
liuminhui 2024-07-18 20:40:20 +08:00
parent 39eb534ca0
commit 22b9990ccf
14 changed files with 381 additions and 14 deletions

View file

@ -5,4 +5,8 @@ llm:
api_type: "openai" # or azure / ollama / groq etc.
model: "gpt-4-turbo" # or gpt-3.5-turbo
base_url: "https://api.openai.com/v1" # or forward url / other llm url
api_key: "YOUR_API_KEY"
api_key: "xxxx"
omniparse:
api_key: "your_api_key"
base_url: "http://192.168.50.126:8000"

Binary file not shown.

Binary file not shown.

View file

@ -0,0 +1,45 @@
import asyncio
from llama_parse import ResultType
from metagpt.config2 import config
from metagpt.logs import logger
from metagpt.rag.parser.omniparse.client import OmniParseClient
from metagpt.rag.parser.omniparse.parse import OmniParse
from metagpt.rag.schema import OmniParseOptions, OmniParseType
async def omniparse_client_example():
    """Call ``OmniParseClient`` directly: once with raw bytes, once with a file path."""
    omni_client = OmniParseClient(base_url=config.omniparse.base_url)

    # Bytes upload: a file name is passed alongside the content.
    with open("../data/rag/test01.docx", "rb") as docx_file:
        docx_bytes = docx_file.read()
    logger.info(await omni_client.parse_document(filelike=docx_bytes, bytes_filename="test_01.docx"))

    # Path upload: the client is handed the location of the PDF.
    logger.info(await omni_client.parse_pdf(filelike="../data/rag/test02.pdf"))
async def omniparse_example():
    """Use the ``OmniParse`` reader for a single PDF, then for a mixed list of files."""
    pdf_options = OmniParseOptions(parse_type=OmniParseType.PDF, result_type=ResultType.MD)
    reader = OmniParse(
        api_key=config.omniparse.api_key,
        base_url=config.omniparse.base_url,
        parse_options=pdf_options,
    )

    # Single file, parsed in PDF mode.
    single_result = await reader.aload_data(file_path="../data/rag/test02.pdf")
    logger.info(single_result)

    # Several files at once; switch the reader to generic document mode first.
    reader.parse_type = OmniParseType.DOCUMENT
    batch_result = await reader.aload_data(file_path=["../data/rag/test01.docx", "../data/rag/test02.pdf"])
    logger.info(batch_result)
async def main():
    """Run the client example and then the reader example, one after the other."""
    for example in (omniparse_client_example, omniparse_example):
        await example()


if __name__ == "__main__":
    asyncio.run(main())

View file

@ -2,7 +2,7 @@
import asyncio
from examples.rag_pipeline import DOC_PATH, QUESTION
from examples.rag.rag_pipeline import DOC_PATH, QUESTION
from metagpt.logs import logger
from metagpt.rag.engines import SimpleEngine
from metagpt.roles import Sales

View file

@ -12,7 +12,7 @@ from typing import Dict, Iterable, List, Literal, Optional
from pydantic import BaseModel, model_validator
from metagpt.configs.browser_config import BrowserConfig
from metagpt.configs.embedding_config import EmbeddingConfig
from metagpt.configs.embedding_config import EmbeddingConfig, OmniParseConfig
from metagpt.configs.llm_config import LLMConfig, LLMType
from metagpt.configs.mermaid_config import MermaidConfig
from metagpt.configs.redis_config import RedisConfig
@ -51,6 +51,9 @@ class Config(CLIParams, YamlModel):
# RAG Embedding
embedding: EmbeddingConfig = EmbeddingConfig()
# omniparse
omniparse: OmniParseConfig = OmniParseConfig()
# Global Proxy. Will be used if llm.proxy is not set
proxy: str = ""

View file

@ -52,3 +52,8 @@ class EmbeddingConfig(YamlModel):
if v == "":
return None
return v
class OmniParseConfig(YamlModel):
    """Settings for the ``omniparse`` section of the configuration."""

    # Key for the OmniParse service; empty by default.
    api_key: str = ""
    # Base URL of the OmniParse service; empty by default.
    base_url: str = ""

View file

@ -27,7 +27,9 @@ from llama_index.core.schema import (
QueryType,
TransformComponent,
)
from llama_parse import ResultType
from metagpt.config2 import config
from metagpt.rag.factories import (
get_index,
get_rag_embedding,
@ -36,6 +38,7 @@ from metagpt.rag.factories import (
get_retriever,
)
from metagpt.rag.interface import NoEmbedding, RAGObject
from metagpt.rag.parser.omniparse.parse import OmniParse
from metagpt.rag.retrievers.base import ModifiableRAGRetriever, PersistableRAGRetriever
from metagpt.rag.retrievers.hybrid_retriever import SimpleHybridRetriever
from metagpt.rag.schema import (
@ -43,7 +46,7 @@ from metagpt.rag.schema import (
BaseRankerConfig,
BaseRetrieverConfig,
BM25RetrieverConfig,
ObjectNode,
ObjectNode, OmniParseOptions, OmniParseType,
)
from metagpt.utils.common import import_class
@ -73,6 +76,18 @@ class SimpleEngine(RetrieverQueryEngine):
)
self._transformations = transformations or self._default_transformations()
@classmethod
def get_file_extractor(cls, file_type: str):
    """Build the ``file_extractor`` mapping that routes ``file_type`` to OmniParse.

    Args:
        file_type: File suffix used as the mapping key (e.g. ".pdf").

    Returns:
        ``{file_type: OmniParse(...)}`` when ``config.omniparse.base_url`` is set,
        otherwise ``None``.
    """
    omni_cfg = config.omniparse
    if omni_cfg.base_url:
        # The reader is always created in PDF mode with markdown output.
        pdf_options = OmniParseOptions(parse_type=OmniParseType.PDF, result_type=ResultType.MD)
        return {
            file_type: OmniParse(
                api_key=omni_cfg.api_key,
                base_url=omni_cfg.base_url,
                parse_options=pdf_options,
            )
        }
    return None
@classmethod
def from_docs(
cls,
@ -100,7 +115,10 @@ class SimpleEngine(RetrieverQueryEngine):
if not input_dir and not input_files:
raise ValueError("Must provide either `input_dir` or `input_files`.")
documents = SimpleDirectoryReader(input_dir=input_dir, input_files=input_files).load_data()
file_extractor = cls.get_file_extractor(file_type=".pdf")
documents = SimpleDirectoryReader(
input_dir=input_dir, input_files=input_files, file_extractor=file_extractor
).load_data()
cls._fix_document_metadata(documents)
transformations = transformations or cls._default_transformations()

View file

View file

@ -0,0 +1,2 @@
from .client import OmniParseClient
from .parse import OmniParse

View file

@ -0,0 +1,154 @@
import mimetypes
import os
import aiofiles
import httpx
from typing import Union
from metagpt.rag.schema import OmniParsedResult
class OmniParseClient:
    """Asynchronous client for an OmniParse server.

    OmniParse API docs: https://docs.cognitivelab.in/api

    Arguments named ``filelike`` may be a filesystem path (``str`` or
    ``os.PathLike``) or the raw file content as ``bytes``.
    """

    ALLOWED_DOCUMENT_EXTENSIONS = {".pdf", ".ppt", ".pptx", ".doc", ".docx"}
    ALLOWED_AUDIO_EXTENSIONS = {".mp3", ".wav", ".aac"}
    ALLOWED_VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov"}

    def __init__(self, api_key=None, base_url="http://localhost:8000", max_timeout=120):
        """
        Args:
            api_key: Optional key; when set it is sent as a ``Bearer`` token.
            base_url: Base URL of the OmniParse API.
            max_timeout: Request timeout in seconds.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.max_timeout = max_timeout

        self.parse_media_endpoint = "/parse_media"
        self.parse_website_endpoint = "/parse_website"
        self.parse_document_endpoint = "/parse_document"

    def _build_url(self, endpoint: str) -> str:
        """Join ``base_url`` and ``endpoint`` with exactly one slash between them."""
        # A configured base URL ending in "/" must not yield "//endpoint".
        base = str(self.base_url).rstrip("/")
        return "{}/{}".format(base, endpoint.lstrip("/"))

    async def __request_parse(self, endpoint: str, files: dict = None, json: dict = None) -> dict:
        """POST a parse request and return the decoded JSON response.

        Args:
            endpoint: API endpoint path.
            files: Multipart file payload, if any.
            json: JSON payload, if any.

        Returns:
            dict: The JSON body of the response.

        Raises:
            httpx.HTTPStatusError: Raised by ``raise_for_status`` on an error status.
        """
        url = self._build_url(endpoint)
        headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}
        async with httpx.AsyncClient() as client:
            response = await client.post(url, files=files, json=json, headers=headers, timeout=self.max_timeout)
            response.raise_for_status()
            return response.json()

    @staticmethod
    def verify_file_ext(filelike: Union[str, bytes, os.PathLike], allowed_file_extensions: set):
        """Check that a path has one of the allowed extensions.

        Empty values and raw bytes carry no file name and are accepted as-is.

        Raises:
            ValueError: If the (lower-cased) extension is not allowed.
        """
        if not filelike or isinstance(filelike, bytes):
            return
        if isinstance(filelike, os.PathLike):
            filelike = os.fsdecode(filelike)
        file_ext = os.path.splitext(filelike)[1].lower()
        if file_ext not in allowed_file_extensions:
            raise ValueError("File extension must be one of {}".format(allowed_file_extensions))

    async def get_file_info(
        self,
        filelike: Union[str, bytes, os.PathLike],
        bytes_filename: str = None,
        only_bytes=True,
    ) -> Union[bytes, tuple]:
        """Load the file content and, optionally, its upload metadata.

        Args:
            filelike: File path or raw file bytes.
            bytes_filename: File name to use for raw bytes; needed to guess the MIME type.
            only_bytes: Return just the content instead of the upload tuple.

        Notes:
            ``parse_document`` accepts several file kinds, so its upload carries
            the MIME type guessed from the file name.

        Returns:
            The content as ``bytes`` when ``only_bytes`` is true, otherwise a
            ``(filename, content, mime_type)`` tuple.

        Raises:
            ValueError: If ``filelike`` is neither a path nor bytes, or if the
                tuple form is requested for bytes without ``bytes_filename``.
        """
        # Normalise path objects so they take the same branch as str paths.
        if isinstance(filelike, os.PathLike):
            filelike = os.fsdecode(filelike)

        if isinstance(filelike, str):
            filename = os.path.basename(filelike)
            async with aiofiles.open(filelike, 'rb') as file:
                file_bytes = await file.read()

            if only_bytes:
                return file_bytes

            mime_type = mimetypes.guess_type(filelike)[0]
            return filename, file_bytes, mime_type
        elif isinstance(filelike, bytes):
            if only_bytes:
                return filelike
            if not bytes_filename:
                raise ValueError("bytes_filename must be set when passing bytes")

            mime_type = mimetypes.guess_type(bytes_filename)[0]
            return bytes_filename, filelike, mime_type
        else:
            raise ValueError("filelike must be a string (file path) or bytes.")

    async def parse_document(
        self, filelike: Union[str, bytes, os.PathLike], bytes_filename: str = None
    ) -> "OmniParsedResult":
        """Parse a document; supports ".pdf", ".ppt", ".pptx", ".doc", ".docx".

        Args:
            filelike: File path or raw file bytes.
            bytes_filename: File name for raw bytes, used to guess the MIME type.

        Raises:
            ValueError: On a disallowed extension or unusable input.

        Returns:
            OmniParsedResult
        """
        self.verify_file_ext(filelike, self.ALLOWED_DOCUMENT_EXTENSIONS)
        file_info = await self.get_file_info(filelike, bytes_filename, only_bytes=False)
        resp = await self.__request_parse(self.parse_document_endpoint, files={'file': file_info})
        return OmniParsedResult(**resp)

    async def parse_pdf(self, filelike: Union[str, bytes, os.PathLike]) -> "OmniParsedResult":
        """Parse a PDF through the dedicated ``/parse_document/pdf`` endpoint.

        Args:
            filelike: File path or raw file bytes.

        Raises:
            ValueError: If a path does not end in ".pdf" or the input is unusable.

        Returns:
            OmniParsedResult
        """
        self.verify_file_ext(filelike, {".pdf"})
        # Only the content is uploaded here; no file name or MIME type is attached.
        file_info = await self.get_file_info(filelike)
        endpoint = f"{self.parse_document_endpoint}/pdf"
        resp = await self.__request_parse(endpoint=endpoint, files={'file': file_info})
        return OmniParsedResult(**resp)

    async def parse_video(self, filelike: Union[str, bytes, os.PathLike], bytes_filename: str = None) -> dict:
        """Parse a video file (".mp4", ".mkv", ".avi", ".mov") and return the raw JSON."""
        self.verify_file_ext(filelike, self.ALLOWED_VIDEO_EXTENSIONS)
        file_info = await self.get_file_info(filelike, bytes_filename, only_bytes=False)
        return await self.__request_parse(f"{self.parse_media_endpoint}/video", files={'file': file_info})

    async def parse_audio(self, filelike: Union[str, bytes, os.PathLike], bytes_filename: str = None) -> dict:
        """Parse an audio file (".mp3", ".wav", ".aac") and return the raw JSON."""
        self.verify_file_ext(filelike, self.ALLOWED_AUDIO_EXTENSIONS)
        file_info = await self.get_file_info(filelike, bytes_filename, only_bytes=False)
        return await self.__request_parse(f"{self.parse_media_endpoint}/audio", files={'file': file_info})

    async def parse_website(self, url: str) -> dict:
        """Ask the server to parse the website at ``url`` and return the raw JSON."""
        return await self.__request_parse(self.parse_website_endpoint, json={'url': url})

View file

@ -0,0 +1,97 @@
import asyncio
from fileinput import FileInput
from typing import List, Union, Optional
from llama_index.core import Document
from llama_index.core.async_utils import run_jobs
from llama_index.core.readers.base import BaseReader
from llama_parse import ResultType
from metagpt.rag.parser.omniparse.client import OmniParseClient
from metagpt.rag.schema import OmniParseOptions, OmniParseType
from metagpt.logs import logger
class OmniParse(BaseReader):
    """llama-index reader that delegates file parsing to an OmniParse server.

    Inputs may be a path (``str`` or ``os.PathLike``), raw bytes, or a list of those.
    """

    # NOTE(review): the ``FileInput`` used in the annotations below resolves to the
    # standard library's ``fileinput.FileInput`` class, which does not describe
    # str/bytes/path inputs — presumably a str/bytes/path alias was intended; confirm.

    def __init__(
        self,
        api_key=None,
        base_url="http://localhost:8000",
        parse_options: OmniParseOptions = None
    ):
        """
        Args:
            api_key: Key forwarded to ``OmniParseClient``.
            base_url: Base URL of the OmniParse API.
            parse_options: Parse settings; defaults to ``OmniParseOptions()``.
        """
        self.parse_options = parse_options or OmniParseOptions()
        self.omniparse_client = OmniParseClient(api_key, base_url, max_timeout=self.parse_options.max_timeout)

    @property
    def parse_type(self):
        """Parse mode currently stored in ``parse_options``."""
        return self.parse_options.parse_type

    @property
    def result_type(self):
        """Result type currently stored in ``parse_options``."""
        return self.parse_options.result_type

    @parse_type.setter
    def parse_type(self, parse_type: Union[str, OmniParseType]):
        # Plain strings are converted to the enum before being stored.
        if isinstance(parse_type, str):
            parse_type = OmniParseType(parse_type)
        self.parse_options.parse_type = parse_type

    @result_type.setter
    def result_type(self, result_type: Union[str, ResultType]):
        # Plain strings are converted to the enum before being stored.
        if isinstance(result_type, str):
            result_type = ResultType(result_type)
        self.parse_options.result_type = result_type

    @staticmethod
    def _coerce_path(file_input):
        """Return ``os.PathLike`` input as ``str``; leave anything else untouched."""
        import os  # local import: only this helper needs it

        if isinstance(file_input, os.PathLike):
            return os.fsdecode(file_input)
        return file_input

    async def _aload_data(
        self,
        file_path: Union[str, bytes],
        extra_info: Optional[dict] = None,
    ) -> List[Document]:
        """Parse one input and wrap the result in a single ``Document``.

        Args:
            file_path: File path or raw file bytes.
            extra_info: Metadata for the document; its "filename" entry names
                raw bytes in document mode.

        Returns:
            A one-element list, or an empty list if parsing failed (the error is logged).
        """
        try:
            file_path = self._coerce_path(file_path)
            if self.parse_type == OmniParseType.PDF:
                # PDF mode uses the dedicated PDF endpoint.
                parsed_result = await self.omniparse_client.parse_pdf(file_path)
            else:
                # Raw bytes need a file name, taken from extra_info when given.
                filename = (extra_info or {}).get("filename")
                parsed_result = await self.omniparse_client.parse_document(file_path, bytes_filename=filename)

            # result_type is used as the attribute name on the parsed result.
            # NOTE(review): assumes every ResultType value names a field of the result — confirm.
            content = getattr(parsed_result, self.result_type)
            docs = [
                Document(
                    text=content,
                    # Copy so documents from one call do not share a metadata dict.
                    metadata=dict(extra_info) if extra_info else {},
                )
            ]
        except Exception as e:
            # Best effort: a failed file yields no documents instead of aborting the load.
            logger.error(f"OMNI Parse Error: {e}")
            docs = []

        return docs

    async def aload_data(
        self,
        file_path: Union[List[FileInput], FileInput],
        extra_info: Optional[dict] = None,
    ) -> List[Document]:
        """Parse one input, or several concurrently, into documents.

        Args:
            file_path: A path (``str`` or ``os.PathLike``), raw bytes, or a list of those.
            extra_info: Metadata attached to every produced document.

        Returns:
            The parsed documents; inputs that fail to parse contribute nothing.
        """
        if isinstance(file_path, list):
            # Fan out, bounded by the configured number of workers.
            parse_jobs = [self._aload_data(file_item, extra_info) for file_item in file_path]
            doc_ret_list = await run_jobs(jobs=parse_jobs, workers=self.parse_options.num_workers)
            return [doc for doc_batch in doc_ret_list for doc in doc_batch]

        file_path = self._coerce_path(file_path)
        if isinstance(file_path, (str, bytes)):
            return await self._aload_data(file_path, extra_info)

        logger.warning(f"OMNI Parse: unsupported input type {type(file_path).__name__}, skipped")
        return []

    def load_data(
        self,
        file_path: Union[List[FileInput], FileInput],
        extra_info: Optional[dict] = None,
    ) -> List[Document]:
        """Synchronous wrapper around ``aload_data``.

        Uses ``asyncio.run``, so it must not be called from a running event loop.
        """
        return asyncio.run(self.aload_data(file_path, extra_info))

View file

@ -1,13 +1,14 @@
"""RAG schemas."""
from enum import Enum
from pathlib import Path
from typing import Any, ClassVar, Literal, Optional, Union
from typing import Any, ClassVar, Literal, Optional, Union, List
from chromadb.api.types import CollectionMetadata
# from chromadb.api.types import CollectionMetadata
from llama_index.core.embeddings import BaseEmbedding
from llama_index.core.indices.base import BaseIndex
from llama_index.core.schema import TextNode
from llama_index.core.vector_stores.types import VectorStoreQueryMode
from llama_parse import ResultType
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, model_validator
from metagpt.config2 import config
@ -67,9 +68,9 @@ class ChromaRetrieverConfig(IndexRetrieverConfig):
persist_path: Union[str, Path] = Field(default="./chroma_db", description="The directory to save data.")
collection_name: str = Field(default="metagpt", description="The name of the collection.")
metadata: Optional[CollectionMetadata] = Field(
default=None, description="Optional metadata to associate with the collection"
)
# metadata: Optional[CollectionMetadata] = Field(
# default=None, description="Optional metadata to associate with the collection"
# )
class ElasticsearchStoreConfig(BaseModel):
@ -165,9 +166,9 @@ class ChromaIndexConfig(VectorIndexConfig):
"""Config for chroma-based index."""
collection_name: str = Field(default="metagpt", description="The name of the collection.")
metadata: Optional[CollectionMetadata] = Field(
default=None, description="Optional metadata to associate with the collection"
)
# metadata: Optional[CollectionMetadata] = Field(
# default=None, description="Optional metadata to associate with the collection"
# )
class BM25IndexConfig(BaseIndexConfig):
@ -214,3 +215,41 @@ class ObjectNode(TextNode):
)
return metadata.model_dump()
class OmniParseType(str, Enum):
    """Parse modes understood by the OmniParse integration."""

    # PDF-specific parsing.
    PDF = "PDF"
    # Generic document parsing.
    DOCUMENT = "DOCUMENT"
class OmniParseOptions(BaseModel):
    """Optional settings for OmniParse requests."""

    # Which representation of the parsed result to return; markdown by default.
    result_type: ResultType = Field(
        default=ResultType.MD,
        description="OmniParse解析返回的结果类型",
    )
    # Parse mode; generic document parsing by default.
    parse_type: OmniParseType = Field(
        default=OmniParseType.DOCUMENT,
        description="OmniParse解析类型默认文档类型",
    )
    # Maximum request timeout for the OmniParse service.
    max_timeout: Optional[int] = Field(
        default=120,
        description="OmniParse服务请求最大超时",
    )
    # Concurrent requests when a list of files is parsed (must satisfy 0 < n < 10).
    num_workers: int = Field(
        default=4,
        gt=0,
        lt=10,
        description="多文件列表时并发请求数量",
    )
class OminParseImage(BaseModel):
    """One image entry in an OmniParse result."""

    # Image payload carried as a string.
    image: str = Field(
        default="",
        description="image str bytes",
    )
    # Name of the image.
    image_name: str = Field(
        default="",
        description="image name",
    )
    # Additional information about the image.
    image_info: Optional[dict] = Field(
        default={},
        description="image info",
    )
class OmniParsedResult(BaseModel):
    """Parsed output of an OmniParse request.

    When no markdown is supplied, ``markdown`` falls back to ``text``.
    """

    markdown: str = Field(default="", description="markdown text")
    text: str = Field(default="", description="plain text")
    images: Optional[List[OminParseImage]] = Field(default=[], description="images")
    metadata: Optional[dict] = Field(default={}, description="metadata")

    @model_validator(mode="before")
    @classmethod
    def set_markdown(cls, values):
        """Fill a missing or empty ``markdown`` from ``text`` before field validation."""
        # A "before" validator receives the raw input, which need not be a dict.
        if not isinstance(values, dict):
            return values
        # Nothing to do when markdown is present, or when there is no text to copy.
        # Leaving the key untouched lets the field default apply instead of
        # injecting ``None`` into a ``str`` field.
        if values.get("markdown") or not values.get("text"):
            return values
        # Return a new mapping so the caller's dict is not modified.
        return {**values, "markdown": values["text"]}