Merge branch 'feature/toollib/git' into feature/import_repo

This commit is contained in:
莘权 马 2024-03-26 23:08:16 +08:00
commit 426eb5e61f
10 changed files with 357 additions and 41 deletions

View file

@ -822,19 +822,60 @@ See FAQ 5.8
raise retry_state.outcome.exception()
def get_markdown_codeblock_type(filename: str) -> str:
async def get_mime_type(filename: str | Path, force_read: bool = False) -> str:
guess_mime_type, _ = mimetypes.guess_type(filename.name)
if not guess_mime_type:
ext_mappings = {".yml": "text/yaml", ".yaml": "text/yaml"}
guess_mime_type = ext_mappings.get(filename.suffix)
if not force_read and guess_mime_type:
return guess_mime_type
from metagpt.tools.libs.shell import shell_execute # avoid circular import
text_set = {
"application/json",
"application/vnd.chipnuts.karaoke-mmd",
"application/javascript",
"application/xml",
"application/x-sh",
"application/sql",
"text/yaml",
}
try:
stdout, _, _ = await shell_execute(f"file --mime-type {str(filename)}")
ix = stdout.rfind(" ")
mime_type = stdout[ix:].strip()
if mime_type == "text/plain" and guess_mime_type in text_set:
return guess_mime_type
return mime_type
except Exception as e:
logger.debug(f"file:{filename}, error:{e}")
return "unknown"
def get_markdown_codeblock_type(filename: str = None, mime_type: str = None) -> str:
"""Return the markdown code-block type corresponding to the file extension."""
mime_type, _ = mimetypes.guess_type(filename)
if not filename and not mime_type:
raise ValueError("Either filename or mime_type must be valid.")
if not mime_type:
mime_type, _ = mimetypes.guess_type(filename)
mappings = {
"text/x-shellscript": "bash",
"text/x-c++src": "cpp",
"text/css": "css",
"text/html": "html",
"text/x-java": "java",
"application/javascript": "javascript",
"application/json": "json",
"text/x-python": "python",
"text/x-ruby": "ruby",
"text/x-c": "cpp",
"text/yaml": "yaml",
"application/javascript": "javascript",
"application/json": "json",
"application/sql": "sql",
"application/vnd.chipnuts.karaoke-mmd": "mermaid",
"application/x-sh": "bash",
"application/xml": "xml",
}
return mappings.get(mime_type, "text")

View file

@ -9,6 +9,7 @@
from __future__ import annotations
import shutil
import uuid
from enum import Enum
from pathlib import Path
from typing import Dict, List
@ -16,8 +17,10 @@ from typing import Dict, List
from git.repo import Repo
from git.repo.fun import is_git_dir
from gitignore_parser import parse_gitignore
from tenacity import retry, stop_after_attempt, wait_random_exponential
from metagpt.logs import logger
from metagpt.tools.libs.shell import shell_execute
from metagpt.utils.dependency_file import DependencyFile
from metagpt.utils.file_repository import FileRepository
@ -283,3 +286,33 @@ class GitRepository:
continue
files.append(filename)
return files
@classmethod
@retry(wait=wait_random_exponential(min=1, max=15), stop=stop_after_attempt(3))
async def clone_from(cls, url: str | Path, output_dir: str | Path = None) -> "GitRepository":
from metagpt.context import Context
to_path = Path(output_dir or Path(__file__).parent / f"../../workspace/downloads/{uuid.uuid4().hex}").resolve()
to_path.mkdir(parents=True, exist_ok=True)
repo_dir = to_path / Path(url).stem
if repo_dir.exists():
shutil.rmtree(repo_dir, ignore_errors=True)
ctx = Context()
env = ctx.new_environ()
proxy = ["-c", f"http.proxy={ctx.config.proxy}"] if ctx.config.proxy else []
command = ["git", "clone"] + proxy + [str(url)]
logger.info(" ".join(command))
stdout, stderr, return_code = await shell_execute(command=command, cwd=str(to_path), env=env, timeout=600)
info = f"{stdout}\n{stderr}\nexit: {return_code}\n"
logger.info(info)
dir_name = Path(url).with_suffix("").name
to_path = to_path / dir_name
if not cls.is_git_dir(to_path):
raise ValueError(info)
logger.info(f"git clone to {to_path}")
return GitRepository(local_path=to_path, auto_init=False)
async def checkout(self, commit_id: str):
self._repository.git.checkout(commit_id)
logger.info(f"git checkout {commit_id}")

View file

@ -5,17 +5,24 @@ This file provides functionality to convert a local repository into a markdown r
"""
from __future__ import annotations
import mimetypes
import re
from pathlib import Path
from typing import Tuple
from gitignore_parser import parse_gitignore
from metagpt.logs import logger
from metagpt.utils.common import aread, awrite, get_markdown_codeblock_type, list_files
from metagpt.utils.common import (
aread,
awrite,
get_markdown_codeblock_type,
get_mime_type,
list_files,
)
from metagpt.utils.tree import tree
async def repo_to_markdown(repo_path: str | Path, output: str | Path = None, gitignore: str | Path = None) -> str:
async def repo_to_markdown(repo_path: str | Path, output: str | Path = None) -> str:
"""
Convert a local repository into a markdown representation.
@ -25,56 +32,108 @@ async def repo_to_markdown(repo_path: str | Path, output: str | Path = None, git
Args:
repo_path (str | Path): The path to the local repository.
output (str | Path, optional): The path to save the generated markdown file. Defaults to None.
gitignore (str | Path, optional): The path to the .gitignore file. Defaults to None.
Returns:
str: The markdown representation of the repository.
"""
repo_path = Path(repo_path)
gitignore = Path(gitignore or Path(__file__).parent / "../../.gitignore").resolve()
repo_path = Path(repo_path).resolve()
gitignore_file = repo_path / ".gitignore"
markdown = await _write_dir_tree(repo_path=repo_path, gitignore=gitignore)
markdown = await _write_dir_tree(repo_path=repo_path, gitignore=gitignore_file)
gitignore_rules = parse_gitignore(full_path=str(gitignore))
gitignore_rules = parse_gitignore(full_path=str(gitignore_file)) if gitignore_file.exists() else None
markdown += await _write_files(repo_path=repo_path, gitignore_rules=gitignore_rules)
if output:
await awrite(filename=str(output), data=markdown, encoding="utf-8")
output_file = Path(output).resolve()
output_file.parent.mkdir(parents=True, exist_ok=True)
await awrite(filename=str(output_file), data=markdown, encoding="utf-8")
logger.info(f"save: {output_file}")
return markdown
async def _write_dir_tree(repo_path: Path, gitignore: Path) -> str:
try:
content = tree(repo_path, gitignore, run_command=True)
content = await tree(repo_path, gitignore, run_command=True)
except Exception as e:
logger.info(f"{e}, using safe mode.")
content = tree(repo_path, gitignore, run_command=False)
content = await tree(repo_path, gitignore, run_command=False)
doc = f"## Directory Tree\n```text\n{content}\n```\n---\n\n"
return doc
async def _write_files(repo_path, gitignore_rules) -> str:
async def _write_files(repo_path, gitignore_rules=None) -> str:
filenames = list_files(repo_path)
markdown = ""
pattern = r"^\..*" # Hidden folders/files
for filename in filenames:
if gitignore_rules(str(filename)):
if gitignore_rules and gitignore_rules(str(filename)):
continue
ignore = False
for i in filename.parts:
if re.match(pattern, i):
ignore = True
break
if ignore:
continue
markdown += await _write_file(filename=filename, repo_path=repo_path)
return markdown
async def _write_file(filename: Path, repo_path: Path) -> str:
relative_path = filename.relative_to(repo_path)
markdown = f"## {relative_path}\n"
mime_type, _ = mimetypes.guess_type(filename.name)
if "text/" not in mime_type:
is_text, mime_type = await _is_text_file(filename)
if not is_text:
logger.info(f"Ignore content: {filename}")
markdown += "<binary file>\n---\n\n"
return ""
try:
relative_path = filename.relative_to(repo_path)
markdown = f"## {relative_path}\n"
content = await aread(filename, encoding="utf-8")
content = content.replace("```", "\\`\\`\\`").replace("---", "\\-\\-\\-")
code_block_type = get_markdown_codeblock_type(filename.name)
markdown += f"```{code_block_type}\n{content}\n```\n---\n\n"
return markdown
content = await aread(filename, encoding="utf-8")
content = content.replace("```", "\\`\\`\\`").replace("---", "\\-\\-\\-")
code_block_type = get_markdown_codeblock_type(filename.name)
markdown += f"```{code_block_type}\n{content}\n```\n---\n\n"
return markdown
except Exception as e:
logger.error(e)
return ""
async def _is_text_file(filename: Path) -> Tuple[bool, str]:
pass_set = {
"application/json",
"application/vnd.chipnuts.karaoke-mmd",
"application/javascript",
"application/xml",
"application/x-sh",
"application/sql",
}
denied_set = {
"application/zlib",
"application/octet-stream",
"image/svg+xml",
"application/pdf",
"application/msword",
"application/vnd.ms-excel",
"audio/x-wav",
"application/x-git",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/zip",
"image/jpeg",
"audio/mpeg",
"video/mp2t",
"inode/x-empty",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"image/png",
"image/vnd.microsoft.icon",
"video/mp4",
}
mime_type = await get_mime_type(filename, force_read=True)
v = "text/" in mime_type or mime_type in pass_set
if v:
return True, mime_type
if mime_type not in denied_set:
logger.info(mime_type)
return False, mime_type

View file

@ -27,14 +27,15 @@
"""
from __future__ import annotations
import subprocess
from pathlib import Path
from typing import Callable, Dict, List
from gitignore_parser import parse_gitignore
from metagpt.tools.libs.shell import shell_execute
def tree(root: str | Path, gitignore: str | Path = None, run_command: bool = False) -> str:
async def tree(root: str | Path, gitignore: str | Path = None, run_command: bool = False) -> str:
"""
Recursively traverses the directory structure and prints it out in a tree-like format.
@ -80,7 +81,7 @@ def tree(root: str | Path, gitignore: str | Path = None, run_command: bool = Fal
"""
root = Path(root).resolve()
if run_command:
return _execute_tree(root, gitignore)
return await _execute_tree(root, gitignore)
git_ignore_rules = parse_gitignore(gitignore) if gitignore else None
dir_ = {root.name: _list_children(root=root, git_ignore_rules=git_ignore_rules)}
@ -129,12 +130,7 @@ def _add_line(rows: List[str]) -> List[str]:
return rows
def _execute_tree(root: Path, gitignore: str | Path) -> str:
async def _execute_tree(root: Path, gitignore: str | Path) -> str:
args = ["--gitfile", str(gitignore)] if gitignore else []
try:
result = subprocess.run(["tree"] + args + [str(root)], capture_output=True, text=True, check=True)
if result.returncode != 0:
raise ValueError(f"tree exits with code {result.returncode}")
return result.stdout
except subprocess.CalledProcessError as e:
raise e
stdout, _, _ = await shell_execute(["tree"] + args + [str(root)])
return stdout