import asyncio
from pathlib import Path

from llama_parse import ResultType

from metagpt.config2 import config
from metagpt.logs import logger
from metagpt.rag.parser.omniparse.client import OmniParseClient
from metagpt.rag.parser.omniparse.parse import OmniParse
from metagpt.rag.schema import OmniParseOptions, OmniParseType

# Resolve sample files relative to this script so the example works from any
# working directory (the original "../data/rag/..." paths only worked when
# running from examples/rag/).
DATA_DIR = Path(__file__).resolve().parent.parent / "data" / "rag"
TEST_DOCX = DATA_DIR / "test01.docx"
TEST_PDF = DATA_DIR / "test02.pdf"


async def omniparse_client_example():
    """Demonstrate the low-level OmniParseClient API (raw bytes and file paths)."""
    client = OmniParseClient(base_url=config.omniparse.base_url)

    # Upload a document as raw bytes; a filename is required so the client can
    # derive the MIME type for the multipart upload.
    file_bytes = TEST_DOCX.read_bytes()
    parse_document_ret = await client.parse_document(filelike=file_bytes, bytes_filename="test_01.docx")
    logger.info(parse_document_ret)

    # Parse a PDF referenced by path.
    parse_pdf_ret = await client.parse_pdf(filelike=str(TEST_PDF))
    logger.info(parse_pdf_ret)


async def omniparse_example():
    """Demonstrate the high-level OmniParse reader (a llama-index BaseReader)."""
    parser = OmniParse(
        api_key=config.omniparse.api_key,
        base_url=config.omniparse.base_url,
        parse_options=OmniParseOptions(parse_type=OmniParseType.PDF, result_type=ResultType.MD),
    )
    ret = await parser.aload_data(file_path=str(TEST_PDF))
    logger.info(ret)

    # Switch to generic document parsing and load several files concurrently.
    file_paths = [str(TEST_DOCX), str(TEST_PDF)]
    parser.parse_type = OmniParseType.DOCUMENT
    ret = await parser.aload_data(file_path=file_paths)
    logger.info(ret)


async def main():
    await omniparse_client_example()
    await omniparse_example()


if __name__ == "__main__":
    asyncio.run(main())
Proxy. Will be used if llm.proxy is not set proxy: str = "" diff --git a/metagpt/configs/embedding_config.py b/metagpt/configs/embedding_config.py index f9b41b9dc..bc7411274 100644 --- a/metagpt/configs/embedding_config.py +++ b/metagpt/configs/embedding_config.py @@ -52,3 +52,8 @@ class EmbeddingConfig(YamlModel): if v == "": return None return v + + +class OmniParseConfig(YamlModel): + api_key: str = "" + base_url: str = "" diff --git a/metagpt/rag/engines/simple.py b/metagpt/rag/engines/simple.py index c237dcf69..3bb665d10 100644 --- a/metagpt/rag/engines/simple.py +++ b/metagpt/rag/engines/simple.py @@ -27,7 +27,9 @@ from llama_index.core.schema import ( QueryType, TransformComponent, ) +from llama_parse import ResultType +from metagpt.config2 import config from metagpt.rag.factories import ( get_index, get_rag_embedding, @@ -36,6 +38,7 @@ from metagpt.rag.factories import ( get_retriever, ) from metagpt.rag.interface import NoEmbedding, RAGObject +from metagpt.rag.parser.omniparse.parse import OmniParse from metagpt.rag.retrievers.base import ModifiableRAGRetriever, PersistableRAGRetriever from metagpt.rag.retrievers.hybrid_retriever import SimpleHybridRetriever from metagpt.rag.schema import ( @@ -43,7 +46,7 @@ from metagpt.rag.schema import ( BaseRankerConfig, BaseRetrieverConfig, BM25RetrieverConfig, - ObjectNode, + ObjectNode, OmniParseOptions, OmniParseType, ) from metagpt.utils.common import import_class @@ -73,6 +76,18 @@ class SimpleEngine(RetrieverQueryEngine): ) self._transformations = transformations or self._default_transformations() + @classmethod + def get_file_extractor(cls, file_type: str): + if not config.omniparse.base_url: + return + parser = OmniParse( + api_key=config.omniparse.api_key, + base_url=config.omniparse.base_url, + parse_options=OmniParseOptions(parse_type=OmniParseType.PDF, result_type=ResultType.MD) + ) + file_extractor = {file_type: parser} + return file_extractor + @classmethod def from_docs( cls, @@ -100,7 +115,10 @@ class 
class OmniParseClient:
    """Async HTTP client for an OmniParse server.

    OmniParse API docs: https://docs.cognitivelab.in/api
    """

    ALLOWED_DOCUMENT_EXTENSIONS = {".pdf", ".ppt", ".pptx", ".doc", ".docx"}
    ALLOWED_AUDIO_EXTENSIONS = {".mp3", ".wav", ".aac"}
    ALLOWED_VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov"}

    def __init__(self, api_key=None, base_url="http://localhost:8000", max_timeout=120):
        """
        Args:
            api_key: Optional bearer token; defaults to None, reserved for
                future authentication.
            base_url: Base URL of the OmniParse server.
            max_timeout: Maximum request timeout in seconds.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.max_timeout = max_timeout

        self.parse_media_endpoint = "/parse_media"
        self.parse_website_endpoint = "/parse_website"
        self.parse_document_endpoint = "/parse_document"

    async def __request_parse(self, endpoint: str, files: dict = None, json: dict = None) -> dict:
        """POST a parse request to the server and return the response JSON.

        Args:
            endpoint: API endpoint path (e.g. "/parse_document").
            files: Multipart file payload.
            json: JSON payload.

        Returns:
            Decoded JSON response body.

        Raises:
            httpx.HTTPStatusError: On a non-2xx response (via raise_for_status).
        """
        url = f"{self.base_url}{endpoint}"
        headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}
        async with httpx.AsyncClient() as client:
            response = await client.post(url, files=files, json=json, headers=headers, timeout=self.max_timeout)
            response.raise_for_status()
            return response.json()

    @staticmethod
    def verify_file_ext(filelike: Union[str, bytes], allowed_file_extensions: set):
        """Validate the file extension of a path-like input.

        Bytes input carries no extension, so it is skipped here; callers that
        accept bytes validate the accompanying filename instead.

        Raises:
            ValueError: If the extension is not in ``allowed_file_extensions``.
        """
        if not filelike or isinstance(filelike, bytes):
            return
        file_ext = os.path.splitext(filelike)[1].lower()
        if file_ext not in allowed_file_extensions:
            raise ValueError("File extension must be one of {}".format(allowed_file_extensions))

    async def get_file_info(
        self,
        filelike: Union[str, bytes],
        bytes_filename: str = None,
        only_bytes=True,
    ) -> Union[bytes, tuple]:
        """Load file bytes, optionally with multipart-upload metadata.

        Args:
            filelike: File path or raw file bytes.
            bytes_filename: Filename to associate with raw-bytes input; used to
                derive the MIME type for the upload.
            only_bytes: When True, return just the raw bytes.

        Notes:
            parse_document accepts several formats, so uploads must declare the
            file's MIME type; hence the (filename, bytes, mime_type) tuple.

        Returns:
            Raw bytes when ``only_bytes`` is True, otherwise a
            (filename, file_bytes, mime_type) tuple.

        Raises:
            ValueError: If ``filelike`` is neither a path string nor bytes, or
                bytes are given without ``bytes_filename`` when metadata is needed.
        """
        if isinstance(filelike, str):
            filename = os.path.basename(filelike)
            async with aiofiles.open(filelike, "rb") as file:
                file_bytes = await file.read()
            if only_bytes:
                return file_bytes
            mime_type = mimetypes.guess_type(filelike)[0]
            return filename, file_bytes, mime_type
        if isinstance(filelike, bytes):
            if only_bytes:
                return filelike
            if not bytes_filename:
                raise ValueError("bytes_filename must be set when passing bytes")
            mime_type = mimetypes.guess_type(bytes_filename)[0]
            return bytes_filename, filelike, mime_type
        raise ValueError("filelike must be a string (file path) or bytes.")

    async def parse_document(self, filelike: Union[str, bytes], bytes_filename: str = None) -> OmniParsedResult:
        """Parse a document (".pdf", ".ppt", ".pptx", ".doc", ".docx").

        Args:
            filelike: File path or raw file bytes.
            bytes_filename: Filename for bytes input; required so the MIME type
                can be derived for the upload.

        Raises:
            ValueError: On an unsupported extension or missing bytes_filename.

        Returns:
            OmniParsedResult
        """
        self.verify_file_ext(filelike, self.ALLOWED_DOCUMENT_EXTENSIONS)
        if isinstance(filelike, bytes) and bytes_filename:
            # The path-based check above skips bytes input; validate the declared
            # filename so unsupported types fail fast on the client side.
            self.verify_file_ext(bytes_filename, self.ALLOWED_DOCUMENT_EXTENSIONS)
        file_info = await self.get_file_info(filelike, bytes_filename, only_bytes=False)
        resp = await self.__request_parse(self.parse_document_endpoint, files={"file": file_info})
        return OmniParsedResult(**resp)

    async def parse_pdf(self, filelike: Union[str, bytes]) -> OmniParsedResult:
        """Parse a PDF document via the dedicated PDF endpoint.

        Args:
            filelike: File path or raw file bytes.

        Raises:
            ValueError: If a path input does not have a ".pdf" extension.

        Returns:
            OmniParsedResult
        """
        self.verify_file_ext(filelike, {".pdf"})
        file_info = await self.get_file_info(filelike)
        endpoint = f"{self.parse_document_endpoint}/pdf"
        resp = await self.__request_parse(endpoint=endpoint, files={"file": file_info})
        return OmniParsedResult(**resp)

    async def parse_video(self, filelike: Union[str, bytes], bytes_filename: str = None) -> dict:
        """Parse a video file; returns the server's raw JSON response."""
        self.verify_file_ext(filelike, self.ALLOWED_VIDEO_EXTENSIONS)
        file_info = await self.get_file_info(filelike, bytes_filename, only_bytes=False)
        return await self.__request_parse(f"{self.parse_media_endpoint}/video", files={"file": file_info})

    async def parse_audio(self, filelike: Union[str, bytes], bytes_filename: str = None) -> dict:
        """Parse an audio file; returns the server's raw JSON response."""
        self.verify_file_ext(filelike, self.ALLOWED_AUDIO_EXTENSIONS)
        file_info = await self.get_file_info(filelike, bytes_filename, only_bytes=False)
        return await self.__request_parse(f"{self.parse_media_endpoint}/audio", files={"file": file_info})

    async def parse_website(self, url: str) -> dict:
        """Parse a website by URL; returns the server's raw JSON response."""
        return await self.__request_parse(self.parse_website_endpoint, json={"url": url})
class OmniParse(BaseReader):
    """llama-index reader backed by an OmniParse server."""

    def __init__(
        self,
        api_key=None,
        base_url="http://localhost:8000",
        parse_options: OmniParseOptions = None,
    ):
        """
        Args:
            api_key: Optional OmniParse API key.
            base_url: Base URL of the OmniParse server.
            parse_options: Parse behavior (parse type, result format, timeout,
                concurrency); defaults are used when omitted.
        """
        # Initialize the BaseReader machinery before setting our own state.
        super().__init__()
        self.parse_options = parse_options or OmniParseOptions()
        self.omniparse_client = OmniParseClient(api_key, base_url, max_timeout=self.parse_options.max_timeout)

    @property
    def parse_type(self):
        """Current OmniParseType used when parsing."""
        return self.parse_options.parse_type

    @parse_type.setter
    def parse_type(self, parse_type: Union[str, OmniParseType]):
        # Accept plain strings for convenience; normalize to the enum.
        if isinstance(parse_type, str):
            parse_type = OmniParseType(parse_type)
        self.parse_options.parse_type = parse_type

    @property
    def result_type(self):
        """Result format extracted from parse results (markdown/text/...)."""
        return self.parse_options.result_type

    @result_type.setter
    def result_type(self, result_type: Union[str, ResultType]):
        # Accept plain strings for convenience; normalize to the enum.
        if isinstance(result_type, str):
            result_type = ResultType(result_type)
        self.parse_options.result_type = result_type

    async def _aload_data(
        self,
        file_path: Union[str, bytes],
        extra_info: Optional[dict] = None,
    ) -> List[Document]:
        """Parse a single file (path or raw bytes) into Documents.

        Errors are logged and swallowed so one bad file does not abort a batch;
        an empty list is returned in that case.
        """
        try:
            if self.parse_type == OmniParseType.PDF:
                # Only PDF parsing is supported via the dedicated endpoint for now.
                parsed_result = await self.omniparse_client.parse_pdf(file_path)
            else:
                # For raw-bytes input the caller supplies the filename via
                # extra_info so the client can derive the MIME type.
                extra_info = extra_info or {}
                filename = extra_info.get("filename")
                parsed_result = await self.omniparse_client.parse_document(file_path, bytes_filename=filename)

            # ResultType is a str enum, so getattr selects the matching
            # attribute ("markdown"/"text") on the parse result.
            content = getattr(parsed_result, self.result_type)
            docs = [
                Document(
                    text=content,
                    metadata=extra_info or {},
                )
            ]
        except Exception as e:
            logger.error(f"OMNI Parse Error: {e}")
            docs = []
        return docs

    async def aload_data(
        self,
        file_path: Union[List[FileInput], FileInput],
        extra_info: Optional[dict] = None,
    ) -> List[Document]:
        """Parse one file or a list of files.

        Args:
            file_path: A path/bytes, or a list of them (parsed concurrently).
            extra_info: Metadata attached to every produced Document.

        Raises:
            ValueError: If ``file_path`` is not str/bytes or a list of them.
                (The original silently returned [] here, masking caller bugs.)
        """
        if isinstance(file_path, (str, bytes)):
            # Single file.
            return await self._aload_data(file_path, extra_info)
        if isinstance(file_path, list):
            # Multiple files, parsed concurrently with bounded workers.
            parse_jobs = [self._aload_data(file_item, extra_info) for file_item in file_path]
            doc_ret_list = await run_jobs(jobs=parse_jobs, workers=self.parse_options.num_workers)
            return [doc for doc_list in doc_ret_list for doc in doc_list]
        raise ValueError("file_path must be str, bytes, or a list of them.")

    def load_data(
        self,
        file_path: Union[List[FileInput], FileInput],
        extra_info: Optional[dict] = None,
    ) -> List[Document]:
        """Synchronous wrapper around aload_data.

        NOTE(review): asyncio.run fails if called from inside a running event
        loop — use aload_data directly in async contexts.
        """
        return asyncio.run(self.aload_data(file_path, extra_info))
class OmniParseType(str, Enum):
    """Parse modes supported by the OmniParse server."""

    PDF = "PDF"
    DOCUMENT = "DOCUMENT"


class OmniParseOptions(BaseModel):
    """Tunable options for the OmniParse reader."""

    result_type: ResultType = Field(default=ResultType.MD, description="OmniParse解析返回的结果类型")
    parse_type: OmniParseType = Field(default=OmniParseType.DOCUMENT, description="OmniParse解析类型,默认文档类型")
    max_timeout: Optional[int] = Field(default=120, description="OmniParse服务请求最大超时")
    num_workers: int = Field(
        default=4,
        gt=0,
        lt=10,
        description="多文件列表时并发请求数量",
    )


class OminParseImage(BaseModel):
    # NOTE(review): class name looks like a typo for "OmniParseImage"; kept
    # as-is for backward compatibility with existing imports.
    image: str = Field(default="", description="image str bytes")
    image_name: str = Field(default="", description="image name")
    # default_factory avoids sharing one dict instance across models.
    image_info: Optional[dict] = Field(default_factory=dict, description="image info")


class OmniParsedResult(BaseModel):
    """Payload returned by the OmniParse document endpoints."""

    markdown: str = Field(default="", description="markdown text")
    text: str = Field(default="", description="plain text")
    # default_factory avoids sharing mutable defaults across instances.
    images: Optional[List[OminParseImage]] = Field(default_factory=list, description="images")
    metadata: Optional[dict] = Field(default_factory=dict, description="metadata")

    @model_validator(mode="before")
    def set_markdown(cls, values):
        """Default ``markdown`` to ``text`` when the server omits it.

        Falls back to "" so the str-typed ``markdown`` field never receives
        None (the original injected None when both fields were missing,
        which fails validation).
        """
        if isinstance(values, dict) and not values.get("markdown"):
            values["markdown"] = values.get("text") or ""
        return values