feat: merge mgx_ops

This commit is contained in:
莘权 马 2024-04-30 11:41:37 +08:00
commit 81540fe4d9
13 changed files with 719 additions and 55 deletions

View file

@ -0,0 +1,41 @@
CMD_PROMPT = """
# Data Structure
class Task(BaseModel):
task_id: str = ""
dependent_task_ids: list[str] = []
instruction: str = ""
task_type: str = ""
assignee: str = "David"
# Available Commands
{available_commands}
# Current Plan
{plan_status}
# Example
{example}
# Instructions
Based on the context, write a plan or modify an existing plan to achieve the goal. A plan consists of one to 3 tasks.
If plan is created, you should track the progress and update the plan accordingly, such as finish_current_task, append_task, reset_task, replace_task, etc.
Pay close attention to new user message, review the conversation history, use reply_to_human to respond to new user requirement.
Note:
1. If you keeping encountering errors, unexpected situation, or you are not sure of proceeding, use ask_human to ask for help.
2. Each time you finish a task, use reply_to_human to report your progress.
You may use any of the available commands to create a plan or update the plan. You may output mutiple commands, they will be executed sequentially.
If you finish current task, you will automatically take the next task in the existing plan, use finish_task, DON'T append a new task.
# Your commands in a json array, in the following output format, always output a json array, if there is nothing to do, use the pass command:
Some text indicating your thoughts, such as how you should update the plan status, respond to inquiry, or seek for help. Then a json array of commands.
```json
[
{{
"command_name": str,
"args": {{"arg_name": arg_value, ...}}
}},
...
]
```
"""

View file

@ -1,3 +1,9 @@
SYSTEM_PROMPT = """
You are a team leader, and you are responsible for drafting tasks and routing tasks to your team members.
When drafting and routing tasks, ALWAYS include necessary or important info inside the instruction, such as path, link, environment to team members, because you are their sole info source.
Each time you do something, reply to human letting them know what you did.
"""
CMD_PROMPT = """
# Data Structure
class Task(BaseModel):

View file

@ -1,7 +1,9 @@
INTERPRETER_SYSTEM_MSG = """
As a data scientist, you need to help user to achieve their goal step by step in a continuous Jupyter notebook.
Since it is a notebook environment, don't use asyncio.run. Instead, use await if you need to call an async function.
If you want to use shell command such as git clone, pip install packages, navigate folders, read file, etc., use Terminal tool if available before trying ! in notebook block.
If you want to use shell command such as git clone, pip install packages, navigate folders, read file, etc., use Terminal tool if available. DON'T use ! in notebook block.
Don't write all codes in one response, each time, just write code for one step or current task.
While some concise thoughts are helpful, code is absolutely required. Always output one and only one code block in your response.
"""
STRUCTUAL_PROMPT = """

View file

@ -0,0 +1,114 @@
from __future__ import annotations
import json
from typing import Literal
from pydantic import model_validator
from metagpt.actions import Action
from metagpt.actions.di.write_analysis_code import WriteAnalysisCode
from metagpt.logs import logger
from metagpt.prompts.di.data_analyst import CMD_PROMPT
from metagpt.roles.di.data_interpreter import DataInterpreter
from metagpt.schema import Message, TaskResult
from metagpt.strategy.planner import Planner
from metagpt.strategy.thinking_command import (
Command,
prepare_command_prompt,
run_commands,
)
from metagpt.tools.tool_recommend import BM25ToolRecommender
from metagpt.utils.common import CodeParser
class DataAnalyst(DataInterpreter):
name: str = "David"
profile: str = "DataAnalyst"
react_mode: Literal["react"] = "react"
max_react_loop: int = 20 # used for react mode
task_result: TaskResult = None
available_commands: list[Command] = [
Command.APPEND_TASK,
Command.RESET_TASK,
Command.REPLACE_TASK,
Command.FINISH_CURRENT_TASK,
# Command.PUBLISH_MESSAGE,
Command.ASK_HUMAN,
Command.REPLY_TO_HUMAN,
# Command.PASS,
]
commands: list[dict] = [] # issued commands to be executed
@model_validator(mode="after")
def set_plan_and_tool(self) -> "DataInterpreter":
# We force using this parameter for DataAnalyst
assert self.react_mode == "react"
assert self.auto_run
assert self.use_plan
# Roughly the same part as DataInterpreter.set_plan_and_tool
self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop, auto_run=self.auto_run)
if self.tools and not self.tool_recommender:
self.tool_recommender = BM25ToolRecommender(tools=self.tools)
self.set_actions([WriteAnalysisCode])
self._set_state(0)
# HACK: Init Planner, control it through dynamic thinking; Consider formalizing as a react mode
self.planner = Planner(goal=self.goal, working_memory=self.rc.working_memory, auto_run=True)
return self
async def _think(self) -> bool:
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
self._set_state(0)
if not self.planner.plan.goal:
self.user_requirement = self.get_memories()[-1].content
self.planner.plan.goal = self.user_requirement
else:
self.working_memory.add_batch(self.rc.news)
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])
for task in plan_status["tasks"]:
task.pop("code")
task.pop("result")
example = ""
prompt = CMD_PROMPT.format(
plan_status=plan_status,
example=example,
available_commands=prepare_command_prompt(self.available_commands),
)
context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")])
rsp = await self.llm.aask(context)
self.commands = json.loads(CodeParser.parse_code(block=None, text=rsp))
self.rc.memory.add(Message(content=rsp, role="assistant"))
await run_commands(self, self.commands)
return bool(self.rc.todo)
async def _act(self) -> Message:
"""Useful in 'react' mode. Return a Message conforming to Role._act interface."""
logger.info(f"ready to take on task {self.planner.plan.current_task}")
code, result, is_success = await self._write_and_exec_code()
self.planner.plan.current_task.is_success = (
is_success # mark is_success, determine is_finished later in thinking
)
self.task_result = TaskResult(code=code, result=result, is_success=is_success)
return Message(content="Task completed", role="assistant", sent_from=self._setting, cause_by=WriteAnalysisCode)
async def _react(self) -> Message:
actions_taken = 0
rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act
while actions_taken < self.rc.max_react_loop:
# NOTE: difference here, keep observing within react
await self._observe()
# think
has_todo = await self._think()
if not has_todo:
break
# act
logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")
rsp = await self._act()
actions_taken += 1
return rsp # return output from the last action

View file

@ -5,7 +5,7 @@ from typing import Literal
from pydantic import Field, model_validator
from metagpt.actions.di.ask_review import ReviewConst
# from metagpt.actions.di.ask_review import ReviewConst
from metagpt.actions.di.execute_nb_code import ExecuteNbCode
from metagpt.actions.di.write_analysis_code import CheckData, WriteAnalysisCode
from metagpt.logs import logger
@ -43,6 +43,7 @@ class DataInterpreter(Role):
tool_recommender: ToolRecommender = None
react_mode: Literal["plan_and_act", "react"] = "plan_and_act"
max_react_loop: int = 10 # used for react mode
user_requirement: str = ""
@model_validator(mode="after")
def set_plan_and_tool(self) -> "Interpreter":
@ -62,7 +63,7 @@ class DataInterpreter(Role):
async def _think(self) -> bool:
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
user_requirement = self.get_memories()[-1].content
self.user_requirement = self.get_memories()[-1].content
context = self.working_memory.get()
if not context:
@ -71,7 +72,7 @@ class DataInterpreter(Role):
self._set_state(0)
return True
prompt = REACT_THINK_PROMPT.format(user_requirement=user_requirement, context=context)
prompt = REACT_THINK_PROMPT.format(user_requirement=self.user_requirement, context=context)
rsp = await self.llm.aask(prompt)
rsp_dict = json.loads(CodeParser.parse_code(text=rsp))
self.working_memory.add(Message(content=rsp_dict["thoughts"], role="assistant"))
@ -83,7 +84,7 @@ class DataInterpreter(Role):
async def _act(self) -> Message:
"""Useful in 'react' mode. Return a Message conforming to Role._act interface."""
code, _, _ = await self._write_and_exec_code()
return Message(content=code, role="assistant", cause_by=WriteAnalysisCode)
return Message(content=code, role="assistant", sent_from=self._setting, cause_by=WriteAnalysisCode)
async def _plan_and_act(self) -> Message:
self._set_state(0)
@ -136,11 +137,11 @@ class DataInterpreter(Role):
### process execution result ###
counter += 1
if not success and counter >= max_retry:
logger.info("coding failed!")
review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER)
if ReviewConst.CHANGE_WORDS[0] in review:
counter = 0 # redo the task again with help of human suggestions
# if not success and counter >= max_retry:
# logger.info("coding failed!")
# review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER)
# if ReviewConst.CHANGE_WORDS[0] in review:
# counter = 0 # redo the task again with help of human suggestions
return code, result, success
@ -154,10 +155,8 @@ class DataInterpreter(Role):
logger.info(f"ready to {todo.name}")
use_reflection = counter > 0 and self.use_reflection # only use reflection after the first trial
user_requirement = self.get_memories()[-1].content
code = await todo.run(
user_requirement=user_requirement,
user_requirement=self.user_requirement,
plan_status=plan_status,
tool_info=tool_info,
working_memory=self.working_memory.get(),

View file

@ -5,13 +5,20 @@ import json
from pydantic import model_validator
from metagpt.actions.di.run_command import RunCommand
from metagpt.environment.mgx.mgx_env import MGXEnv
from metagpt.prompts.di.team_leader import CMD_PROMPT, FINISH_CURRENT_TASK_CMD
from metagpt.prompts.di.team_leader import (
CMD_PROMPT,
FINISH_CURRENT_TASK_CMD,
SYSTEM_PROMPT,
)
from metagpt.roles import Role
from metagpt.schema import Message, Task, TaskResult
from metagpt.schema import Message, TaskResult
from metagpt.strategy.experience_retriever import SimpleExpRetriever
from metagpt.strategy.planner import Planner
from metagpt.strategy.thinking_command import Command, prepare_command_prompt
from metagpt.strategy.thinking_command import (
Command,
prepare_command_prompt,
run_commands,
)
from metagpt.utils.common import CodeParser
@ -33,39 +40,12 @@ class TeamLeader(Role):
@model_validator(mode="after")
def set_plan(self) -> "TeamLeader":
self.rc.working_memory = (
self.rc.memory
) # TeamLeader does not need working memory, all messages should go into memory
self.planner = Planner(goal=self.goal, working_memory=self.rc.working_memory, auto_run=True)
return self
async def _run_env_command(self, cmd):
assert isinstance(self.rc.env, MGXEnv), "TeamLeader should only be used in an MGXEnv"
if cmd["command_name"] == Command.PUBLISH_MESSAGE.cmd_name:
self.publish_message(Message(**cmd["args"]))
elif cmd["command_name"] == Command.ASK_HUMAN.cmd_name:
await self.rc.env.ask_human(sent_from=self, **cmd["args"])
elif cmd["command_name"] == Command.REPLY_TO_HUMAN.cmd_name:
await self.rc.env.reply_to_human(sent_from=self, **cmd["args"])
def _run_internal_command(self, cmd):
if cmd["command_name"] == Command.APPEND_TASK.cmd_name:
self.planner.plan.append_task(Task(**cmd["args"]))
elif cmd["command_name"] == Command.RESET_TASK.cmd_name:
self.planner.plan.reset_task(**cmd["args"])
elif cmd["command_name"] == Command.REPLACE_TASK.cmd_name:
self.planner.plan.replace_task(Task(**cmd["args"]))
elif cmd["command_name"] == Command.FINISH_CURRENT_TASK.cmd_name:
self.planner.plan.current_task.update_task_result(task_result=self.task_result)
self.planner.plan.finish_current_task()
self.rc.working_memory.clear()
async def run_commands(self, cmds):
print(*cmds, sep="\n")
for cmd in cmds:
await self._run_env_command(cmd)
self._run_internal_command(cmd)
if self.planner.plan.is_plan_finished():
self._set_state(-1)
async def _think(self) -> bool:
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
@ -92,7 +72,7 @@ class TeamLeader(Role):
)
context = self.llm.format_msg(self.get_memories(k=10) + [Message(content=prompt, role="user")])
rsp = await self.llm.aask(context)
rsp = await self.llm.aask(context, system_msgs=[SYSTEM_PROMPT])
self.commands = json.loads(CodeParser.parse_code(text=rsp))
self.rc.memory.add(Message(content=rsp, role="assistant"))
@ -100,7 +80,7 @@ class TeamLeader(Role):
async def _act(self) -> Message:
"""Useful in 'react' mode. Return a Message conforming to Role._act interface."""
await self.run_commands(self.commands)
await run_commands(self, self.commands)
self.task_result = TaskResult(result="Success", is_success=True)
msg = Message(content="Commands executed", send_to="no one") # a dummy message to conform to the interface
self.rc.memory.add(msg)

View file

@ -424,11 +424,11 @@ class Role(SerializationMixin, ContextMixin, BaseModel):
news = self.rc.msg_buffer.pop_all()
# Store the read messages in your own memory to prevent duplicate processing.
old_messages = [] if not self.enable_memory else self.rc.memory.get()
self.rc.memory.add_batch(news)
# Filter out messages of interest.
# Filter in messages of interest.
self.rc.news = [
n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
]
self.rc.memory.add_batch(self.rc.news) # only save messages of interest into memory
self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None # record the latest observed msg
# Design Rules:

View file

@ -68,6 +68,12 @@ class SimpleExpRetriever(ExpRetriever):
"content": "User request to create a cli snake game. Please create a product requirement document (PRD) outlining the features, user interface, and user experience of the snake game.",
"send_to": "Alice"
}
},
{
"command_name": "reply_to_human",
"args": {
"content": "I have assigned the tasks to the team members. Alice will create the PRD, Bob will design the software architecture, Eve will break down the architecture into tasks, Alex will implement the core game logic, and Edward will write comprehensive tests. The team will work on the project accordingly",
}
}
]
```
@ -92,6 +98,12 @@ class SimpleExpRetriever(ExpRetriever):
"content": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.",
"send_to": "David"
}
},
{
"command_name": "reply_to_human",
"args": {
"content": "I have assigned the task to David. He will break down the task further by himself and starts solving it.",
}
}
]
```
@ -110,10 +122,16 @@ class SimpleExpRetriever(ExpRetriever):
"args": {}
},
{
"command_name": "publish_message",
"command_name": "publish_message",
"args": {
"content": "Please design the software architecture for the snake game based on the PRD created by Alice. The PRD is at 'docs/prd/20240424153821.json'. Include the choice of programming language, libraries, and data flow, etc.",
"send_to": "Bob"
}
},
{
"command_name": "reply_to_human",
"args": {
"content": "Please design the software architecture for the snake game based on the PRD created by Alice. The PRD is at 'docs/prd/20240424153821.json'. Include the choice of programming language, libraries, and data flow, etc.",
"send_to": "Bob"
"content": "Alice has completed the PRD. I have marked her task as finished and sent the PRD to Bob. Bob will work on the software architecture.",
}
}
]

View file

@ -2,6 +2,10 @@ from enum import Enum
from pydantic import BaseModel
from metagpt.environment.mgx.mgx_env import MGXEnv
from metagpt.roles import Role
from metagpt.schema import Message, Task
class CommandDef(BaseModel):
name: str
@ -66,3 +70,39 @@ def prepare_command_prompt(commands: list[Command]) -> str:
for i, command in enumerate(commands):
command_prompt += f"{i+1}. {command.value.signature}:\n{command.value.desc}\n\n"
return command_prompt
async def run_env_command(role: Role, cmd):
assert isinstance(role.rc.env, MGXEnv), "TeamLeader should only be used in an MGXEnv"
if cmd["command_name"] == Command.PUBLISH_MESSAGE.cmd_name:
role.publish_message(Message(**cmd["args"]))
if cmd["command_name"] == Command.ASK_HUMAN.cmd_name:
role.rc.working_memory.add(Message(content=cmd["args"]["question"], role="assistant"))
human_rsp = await role.rc.env.ask_human(sent_from=role, **cmd["args"])
role.rc.working_memory.add(Message(content=human_rsp, role="user"))
elif cmd["command_name"] == Command.REPLY_TO_HUMAN.cmd_name:
# TODO: consider if the message should go into memory
await role.rc.env.reply_to_human(sent_from=role, **cmd["args"])
def run_plan_command(role: Role, cmd):
if cmd["command_name"] == Command.APPEND_TASK.cmd_name:
role.planner.plan.append_task(Task(**cmd["args"]))
elif cmd["command_name"] == Command.RESET_TASK.cmd_name:
role.planner.plan.reset_task(**cmd["args"])
elif cmd["command_name"] == Command.REPLACE_TASK.cmd_name:
role.planner.plan.replace_task(Task(**cmd["args"]))
elif cmd["command_name"] == Command.FINISH_CURRENT_TASK.cmd_name:
role.planner.plan.current_task.update_task_result(task_result=role.task_result)
role.planner.plan.finish_current_task()
role.rc.working_memory.clear()
async def run_commands(role: Role, cmds):
print(*cmds, sep="\n")
for cmd in cmds:
await run_env_command(role, cmd)
run_plan_command(role, cmd)
if role.planner.plan.is_plan_finished():
role._set_state(-1)

View file

@ -3,6 +3,10 @@
from __future__ import annotations
from pathlib import Path
from typing import Optional
from github.Issue import Issue
from github.PullRequest import PullRequest
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.git_repository import GitRepository
@ -63,3 +67,147 @@ async def git_checkout(repo_dir: str | Path, commit_id: str):
if not repo.is_valid:
ValueError(f"Invalid git root: {repo_dir}")
await repo.checkout(commit_id)
@register_tool(tags=["git"])
async def create_pull_request(
access_token: str,
base: str,
head: str,
base_repo_name: str,
head_repo_name: Optional[str] = None,
title: Optional[str] = None,
body: Optional[str] = None,
) -> PullRequest:
"""
Creates a pull request in a Git repository.
Args:
access_token (str): The access token for authentication.
base (str): The name of the base branch of the pull request (e.g., 'main', 'master').
head (str): The name of the head branch of the pull request (e.g., 'feature-branch').
base_repo_name (str): The full repository name (user/repo) where the pull request will be created.
head_repo_name (Optional[str], optional): The full repository name (user/repo) where the pull request will merge from. Defaults to None.
title (Optional[str]): The title of the pull request.
body (Optional[str]): The body of the pull request.
Returns:
PullRequest: The created pull request object.
Raises:
ValueError: If `access_token` is invalid. Visit: "https://github.com/settings/tokens"
Any exceptions that might occur during the pull request creation process.
Note:
This function is intended to be used in an asynchronous context (with `await`).
Example:
>>> # Merge Request
>>> repo_name = "user/repo" # "user/repo" for example: "https://github.com/user/repo.git"
>>> base = "master" # branch that merge to
>>> head = "feature/new_feature" # branch that merge from
>>> title = "Implement new feature"
>>> body = "This pull request adds functionality X, Y, and Z."
>>> pull_request = await create_pull_request(
repo_name=repo_name,
base=base,
head=head,
title=title,
body=body,
access_token=get_env("git_access_token")
)
>>> print(pull_request)
PullRequest(title="Implement new feature", number=26)
>>> # Pull Request
>>> base_repo_name = "user1/repo1" # for example: "user1/repo1" from "https://github.com/user1/repo1.git"
>>> head_repo_name = "user2/repo2" # for example: "user2/repo2" from "https://github.com/user2/repo2.git"
>>> base = "master" # branch that merge to
>>> head = "feature/new_feature" # branch that merge from
>>> title = "Implement new feature"
>>> body = "This pull request adds functionality X, Y, and Z."
>>> pull_request = await create_pull_request(
base_repo_name=base_repo_name,
head_repo_name=head_repo_name,
base=base,
head=head,
title=title,
body=body,
access_token=get_env("git_access_token")
)
>>> print(pull_request)
PullRequest(title="Implement new feature", number=26)
"""
return await GitRepository.create_pull(
base_repo_name=base_repo_name,
head_repo_name=head_repo_name,
base=base,
head=head,
title=title,
body=body,
access_token=access_token,
)
@register_tool(tags=["git"])
async def create_issue(
access_token: str,
repo_name: str,
title: str,
body: Optional[str] = None,
assignee: Optional[str] = None,
labels: Optional[list[str]] = None,
) -> Issue:
"""
Creates an issue in the specified repository.
Args:
access_token (str): The access token for authentication.
Visit `https://github.com/settings/tokens` to obtain a personal access token.
For more authentication options, visit: `https://pygithub.readthedocs.io/en/latest/examples/Authentication.html`
repo_name (str): The full repository name (user/repo) where the issue will be created.
title (str): The title of the issue.
body (Optional[str], optional): The body of the issue. Defaults to None.
assignee (Optional[str], optional): The username of the assignee for the issue. Defaults to None.
labels (Optional[list[str]], optional): A list of label names to associate with the issue. Defaults to None.
Returns:
Issue: The created issue object.
Example:
>>> # Create an issue with title and body
>>> repo_name = "username/repository"
>>> title = "Bug Report"
>>> body = "I found a bug in the application."
>>> issue = await create_issue(
repo_name=repo_name,
title=title,
body=body,
access_token=get_env("git_access_token")
)
>>> print(issue)
Issue(title="Bug Report", number=26)
>>> # Create an issue with title, body, assignee, and labels
>>> repo_name = "username/repository"
>>> title = "Bug Report"
>>> body = "I found a bug in the application."
>>> assignee = "john_doe"
>>> labels = ["enhancement", "help wanted"]
>>> issue = await create_issue(
repo_name=repo_name,
title=title,
body=body,
assignee=assigee,
labels=labels,
access_token=get_env("git_access_token")
)
>>> print(issue)
Issue(title="Bug Report", number=26)
"""
return await GitRepository.create_issue(
repo_name=repo_name, title=title, body=body, assignee=assignee, labels=labels, access_token=access_token
)

View file

@ -8,14 +8,22 @@
"""
from __future__ import annotations
import re
import shutil
import uuid
from enum import Enum
from pathlib import Path
from typing import Dict, List
from typing import Dict, List, Optional
from git.repo import Repo
from git.repo.fun import is_git_dir
from github import Auth, Github
from github.GithubObject import NotSet
from github.Issue import Issue
from github.Label import Label
from github.Milestone import Milestone
from github.NamedUser import NamedUser
from github.PullRequest import PullRequest
from gitignore_parser import parse_gitignore
from tenacity import retry, stop_after_attempt, wait_random_exponential
@ -35,6 +43,12 @@ class ChangeType(Enum):
UNTRACTED = "U" # File is untracked (not added to version control)
class RateLimitError(Exception):
def __init__(self, message="Rate limit exceeded"):
self.message = message
super().__init__(self.message)
class GitRepository:
"""A class representing a Git repository.
@ -322,3 +336,156 @@ class GitRepository:
def log(self) -> str:
"""Return git log"""
return self._repository.git.log()
@staticmethod
async def create_pull(
base: str,
head: str,
base_repo_name: str,
head_repo_name: Optional[str] = None,
*,
title: Optional[str] = None,
body: Optional[str] = None,
maintainer_can_modify: Optional[bool] = None,
draft: Optional[bool] = None,
issue: Optional[Issue] = None,
access_token: Optional[str] = None,
auth: Optional[Auth] = None,
) -> PullRequest:
"""
Creates a pull request in the specified repository.
Args:
base (str): The name of the base branch.
head (str): The name of the head branch.
base_repo_name (str): The full repository name (user/repo) where the pull request will be created.
head_repo_name (Optional[str], optional): The full repository name (user/repo) where the pull request will merge from. Defaults to None.
title (Optional[str], optional): The title of the pull request. Defaults to None.
body (Optional[str], optional): The body of the pull request. Defaults to None.
maintainer_can_modify (Optional[bool], optional): Whether maintainers can modify the pull request. Defaults to None.
draft (Optional[bool], optional): Whether the pull request is a draft. Defaults to None.
issue (Optional[Issue], optional): The issue linked to the pull request. Defaults to None.
access_token (Optional[str], optional): The access token for authentication. Defaults to None. Visit `https://pygithub.readthedocs.io/en/latest/examples/Authentication.html`, `https://github.com/PyGithub/PyGithub/blob/main/doc/examples/Authentication.rst`.
auth (Optional[Auth], optional): The authentication method. Defaults to None. Visit `https://pygithub.readthedocs.io/en/latest/examples/Authentication.html`
Returns:
PullRequest: The created pull request object.
"""
title = title or NotSet
body = body or NotSet
maintainer_can_modify = maintainer_can_modify or NotSet
draft = draft or NotSet
issue = issue or NotSet
if not auth and not access_token:
raise ValueError('`access_token` is invalid. Visit: "https://github.com/settings/tokens"')
auth = auth or Auth.Token(access_token)
g = Github(auth=auth)
base_repo = g.get_repo(base_repo_name)
head_repo = g.get_repo(head_repo_name) if head_repo_name and head_repo_name != base_repo_name else None
x_ratelimit_remaining = base_repo.raw_headers.get("x-ratelimit-remaining")
if (
x_ratelimit_remaining
and bool(re.match(r"^-?\d+$", x_ratelimit_remaining))
and int(x_ratelimit_remaining) <= 0
):
raise RateLimitError()
if not head_repo:
pr = base_repo.create_pull(
base=base,
head=head,
title=title,
body=body,
maintainer_can_modify=maintainer_can_modify,
draft=draft,
issue=issue,
)
else:
head_branch = base_repo.get_branch(base)
base_branch = head_repo.get_branch(head)
pr = base_repo.create_pull(
base=base_branch.name,
head=head_branch.commit.sha,
title=title,
body=body,
maintainer_can_modify=maintainer_can_modify,
draft=draft,
issue=issue,
)
return pr
@staticmethod
async def create_issue(
repo_name: str,
title: str,
body: Optional[str] = None,
assignee: NamedUser | Optional[str] = None,
milestone: Optional[Milestone] = None,
labels: list[Label] | Optional[list[str]] = None,
assignees: Optional[list[str]] | list[NamedUser] = None,
access_token: Optional[str] = None,
auth: Optional[Auth] = None,
) -> Issue:
"""
Creates an issue in the specified repository.
Args:
repo_name (str): The full repository name (user/repo) where the issue will be created.
title (str): The title of the issue.
body (Optional[str], optional): The body of the issue. Defaults to None.
assignee (Union[NamedUser, str], optional): The assignee for the issue, either as a NamedUser object or their username. Defaults to None.
milestone (Optional[Milestone], optional): The milestone to associate with the issue. Defaults to None.
labels (Union[list[Label], list[str]], optional): The labels to associate with the issue, either as Label objects or their names. Defaults to None.
assignees (Union[list[str], list[NamedUser]], optional): The list of usernames or NamedUser objects to assign to the issue. Defaults to None.
access_token (Optional[str], optional): The access token for authentication. Defaults to None. Visit `https://pygithub.readthedocs.io/en/latest/examples/Authentication.html`, `https://github.com/PyGithub/PyGithub/blob/main/doc/examples/Authentication.rst`.
auth (Optional[Auth], optional): The authentication method. Defaults to None. Visit `https://pygithub.readthedocs.io/en/latest/examples/Authentication.html`
Returns:
Issue: The created issue object.
"""
body = body or NotSet
assignee = assignee or NotSet
milestone = milestone or NotSet
labels = labels or NotSet
assignees = assignees or NotSet
if not auth and not access_token:
raise ValueError('`access_token` is invalid. Visit: "https://github.com/settings/tokens"')
auth = auth or Auth.Token(access_token)
g = Github(auth=auth)
repo = g.get_repo(repo_name)
x_ratelimit_remaining = repo.raw_headers.get("x-ratelimit-remaining")
if (
x_ratelimit_remaining
and bool(re.match(r"^-?\d+$", x_ratelimit_remaining))
and int(x_ratelimit_remaining) <= 0
):
raise RateLimitError()
issue = repo.create_issue(
title=title,
body=body,
assignee=assignee,
milestone=milestone,
labels=labels,
assignees=assignees,
)
return issue
@staticmethod
async def get_repos(access_token: Optional[str] = None, auth: Optional[Auth] = None) -> List[str]:
"""
Fetches a list of public repositories belonging to the authenticated user.
Args:
access_token (Optional[str], optional): The access token for authentication. Defaults to None.
Visit `https://github.com/settings/tokens` for obtaining a personal access token.
auth (Optional[Auth], optional): The authentication method. Defaults to None.
Visit `https://pygithub.readthedocs.io/en/latest/examples/Authentication.html` for more information.
Returns:
List[str]: A list of full names of the public repositories belonging to the user.
"""
auth = auth or Auth.Token(access_token)
git = Github(auth=auth)
user = git.get_user()
v = user.get_repos(visibility="public")
return [i.full_name for i in v]