Add first-class ChatGPT subscription provider support

This commit is contained in:
Spherrrical 2026-04-10 13:29:20 -07:00
parent 128059e7c1
commit bbe9946207
16 changed files with 637 additions and 7 deletions

289
cli/planoai/chatgpt_auth.py Normal file
View file

@ -0,0 +1,289 @@
"""
ChatGPT subscription OAuth device-flow authentication.
Implements the device code flow used by OpenAI Codex CLI to authenticate
with a ChatGPT Plus/Pro subscription. Tokens are stored locally in
~/.plano/chatgpt/auth.json and auto-refreshed when expired.
"""
import base64
import json
import os
import time
from typing import Any, Dict, Optional, Tuple
import requests
from planoai.consts import PLANO_HOME
# OAuth + API constants (derived from openai/codex)
CHATGPT_AUTH_BASE = "https://auth.openai.com"
CHATGPT_DEVICE_CODE_URL = f"{CHATGPT_AUTH_BASE}/api/accounts/deviceauth/usercode"
CHATGPT_DEVICE_TOKEN_URL = f"{CHATGPT_AUTH_BASE}/api/accounts/deviceauth/token"
CHATGPT_OAUTH_TOKEN_URL = f"{CHATGPT_AUTH_BASE}/oauth/token"
CHATGPT_DEVICE_VERIFY_URL = f"{CHATGPT_AUTH_BASE}/codex/device"
CHATGPT_API_BASE = "https://chatgpt.com/backend-api/codex"
CHATGPT_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
# Local storage
CHATGPT_AUTH_DIR = os.path.join(PLANO_HOME, "chatgpt")
CHATGPT_AUTH_FILE = os.path.join(CHATGPT_AUTH_DIR, "auth.json")
# Timeouts
TOKEN_EXPIRY_SKEW_SECONDS = 60
DEVICE_CODE_TIMEOUT_SECONDS = 15 * 60
DEVICE_CODE_POLL_SECONDS = 5
def _ensure_auth_dir():
os.makedirs(CHATGPT_AUTH_DIR, exist_ok=True)
def load_auth() -> Optional[Dict[str, Any]]:
"""Load auth data from disk."""
try:
with open(CHATGPT_AUTH_FILE, "r") as f:
return json.load(f)
except (IOError, json.JSONDecodeError):
return None
def save_auth(data: Dict[str, Any]):
"""Save auth data to disk."""
_ensure_auth_dir()
with open(CHATGPT_AUTH_FILE, "w") as f:
json.dump(data, f, indent=2)
def delete_auth():
"""Remove stored credentials."""
try:
os.remove(CHATGPT_AUTH_FILE)
except FileNotFoundError:
pass
def _decode_jwt_claims(token: str) -> Dict[str, Any]:
"""Decode JWT payload without verification."""
try:
parts = token.split(".")
if len(parts) < 2:
return {}
payload_b64 = parts[1]
payload_b64 += "=" * (-len(payload_b64) % 4)
return json.loads(base64.urlsafe_b64decode(payload_b64).decode("utf-8"))
except Exception:
return {}
def _get_expires_at(token: str) -> Optional[int]:
"""Extract expiration time from JWT."""
claims = _decode_jwt_claims(token)
exp = claims.get("exp")
return int(exp) if isinstance(exp, (int, float)) else None
def _extract_account_id(token: Optional[str]) -> Optional[str]:
"""Extract ChatGPT account ID from JWT claims."""
if not token:
return None
claims = _decode_jwt_claims(token)
auth_claims = claims.get("https://api.openai.com/auth")
if isinstance(auth_claims, dict):
account_id = auth_claims.get("chatgpt_account_id")
if isinstance(account_id, str) and account_id:
return account_id
return None
def _is_token_expired(auth_data: Dict[str, Any]) -> bool:
"""Check if the access token is expired."""
expires_at = auth_data.get("expires_at")
if expires_at is None:
access_token = auth_data.get("access_token")
if access_token:
expires_at = _get_expires_at(access_token)
if expires_at:
auth_data["expires_at"] = expires_at
save_auth(auth_data)
if expires_at is None:
return True
return time.time() >= float(expires_at) - TOKEN_EXPIRY_SKEW_SECONDS
def _refresh_tokens(refresh_token: str) -> Dict[str, str]:
"""Refresh the access token using the refresh token."""
resp = requests.post(
CHATGPT_OAUTH_TOKEN_URL,
json={
"client_id": CHATGPT_CLIENT_ID,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"scope": "openid profile email",
},
)
resp.raise_for_status()
data = resp.json()
access_token = data.get("access_token")
id_token = data.get("id_token")
if not access_token or not id_token:
raise RuntimeError(f"Refresh response missing fields: {data}")
return {
"access_token": access_token,
"refresh_token": data.get("refresh_token", refresh_token),
"id_token": id_token,
}
def _build_auth_record(tokens: Dict[str, str]) -> Dict[str, Any]:
"""Build the auth record to persist."""
access_token = tokens.get("access_token")
id_token = tokens.get("id_token")
expires_at = _get_expires_at(access_token) if access_token else None
account_id = _extract_account_id(id_token or access_token)
return {
"access_token": access_token,
"refresh_token": tokens.get("refresh_token"),
"id_token": id_token,
"expires_at": expires_at,
"account_id": account_id,
}
def request_device_code() -> Dict[str, str]:
"""Request a device code from OpenAI's device auth endpoint."""
resp = requests.post(
CHATGPT_DEVICE_CODE_URL,
json={"client_id": CHATGPT_CLIENT_ID},
)
resp.raise_for_status()
data = resp.json()
device_auth_id = data.get("device_auth_id")
user_code = data.get("user_code") or data.get("usercode")
interval = data.get("interval")
if not device_auth_id or not user_code:
raise RuntimeError(f"Device code response missing fields: {data}")
return {
"device_auth_id": device_auth_id,
"user_code": user_code,
"interval": str(interval or "5"),
}
def poll_for_authorization(device_code: Dict[str, str]) -> Dict[str, str]:
"""Poll until the user completes authorization. Returns code_data."""
interval = int(device_code.get("interval", "5"))
start_time = time.time()
while time.time() - start_time < DEVICE_CODE_TIMEOUT_SECONDS:
try:
resp = requests.post(
CHATGPT_DEVICE_TOKEN_URL,
json={
"device_auth_id": device_code["device_auth_id"],
"user_code": device_code["user_code"],
},
)
if resp.status_code == 200:
data = resp.json()
if all(
key in data
for key in ("authorization_code", "code_challenge", "code_verifier")
):
return data
if resp.status_code in (403, 404):
time.sleep(max(interval, DEVICE_CODE_POLL_SECONDS))
continue
resp.raise_for_status()
except requests.HTTPError as exc:
if exc.response is not None and exc.response.status_code in (403, 404):
time.sleep(max(interval, DEVICE_CODE_POLL_SECONDS))
continue
raise RuntimeError(f"Polling failed: {exc}") from exc
time.sleep(max(interval, DEVICE_CODE_POLL_SECONDS))
raise RuntimeError("Timed out waiting for device authorization")
def exchange_code_for_tokens(code_data: Dict[str, str]) -> Dict[str, str]:
"""Exchange the authorization code for access/refresh/id tokens."""
redirect_uri = f"{CHATGPT_AUTH_BASE}/deviceauth/callback"
body = (
"grant_type=authorization_code"
f"&code={code_data['authorization_code']}"
f"&redirect_uri={redirect_uri}"
f"&client_id={CHATGPT_CLIENT_ID}"
f"&code_verifier={code_data['code_verifier']}"
)
resp = requests.post(
CHATGPT_OAUTH_TOKEN_URL,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data=body,
)
resp.raise_for_status()
data = resp.json()
if not all(key in data for key in ("access_token", "refresh_token", "id_token")):
raise RuntimeError(f"Token exchange response missing fields: {data}")
return {
"access_token": data["access_token"],
"refresh_token": data["refresh_token"],
"id_token": data["id_token"],
}
def login() -> Dict[str, Any]:
"""Run the full device code login flow. Returns the auth record."""
device_code = request_device_code()
auth_record = _build_auth_record({})
auth_record["device_code_requested_at"] = time.time()
save_auth(auth_record)
print(
"\nSign in with your ChatGPT account:\n"
f" 1) Visit: {CHATGPT_DEVICE_VERIFY_URL}\n"
f" 2) Enter code: {device_code['user_code']}\n\n"
"Device codes are a common phishing target. Never share this code.\n",
flush=True,
)
code_data = poll_for_authorization(device_code)
tokens = exchange_code_for_tokens(code_data)
auth_record = _build_auth_record(tokens)
save_auth(auth_record)
return auth_record
def get_access_token() -> Tuple[str, Optional[str]]:
"""
Get a valid access token and account ID.
Refreshes automatically if expired. Raises if no auth data exists.
Returns (access_token, account_id).
"""
auth_data = load_auth()
if not auth_data:
raise RuntimeError(
"No ChatGPT credentials found. Run 'planoai chatgpt login' first."
)
access_token = auth_data.get("access_token")
if access_token and not _is_token_expired(auth_data):
return access_token, auth_data.get("account_id")
# Try refresh
refresh_token = auth_data.get("refresh_token")
if refresh_token:
tokens = _refresh_tokens(refresh_token)
auth_record = _build_auth_record(tokens)
save_auth(auth_record)
return auth_record["access_token"], auth_record.get("account_id")
raise RuntimeError(
"ChatGPT token expired and refresh failed. Run 'planoai chatgpt login' again."
)

View file

@ -0,0 +1,86 @@
"""
CLI commands for ChatGPT subscription management.
Usage:
planoai chatgpt login - Authenticate with ChatGPT via device code flow
planoai chatgpt status - Check authentication status
planoai chatgpt logout - Remove stored credentials
"""
import datetime
import click
from rich.console import Console
from planoai import chatgpt_auth
console = Console()
@click.group()
def chatgpt():
"""ChatGPT subscription management."""
pass
@chatgpt.command()
def login():
"""Authenticate with your ChatGPT subscription using device code flow."""
try:
auth_record = chatgpt_auth.login()
account_id = auth_record.get("account_id", "unknown")
console.print(
f"\n[green]Successfully authenticated with ChatGPT![/green]"
f"\nAccount ID: {account_id}"
f"\nCredentials saved to: {chatgpt_auth.CHATGPT_AUTH_FILE}"
)
except Exception as e:
console.print(f"\n[red]Authentication failed:[/red] {e}")
raise SystemExit(1)
@chatgpt.command()
def status():
"""Check ChatGPT authentication status."""
auth_data = chatgpt_auth.load_auth()
if not auth_data or not auth_data.get("access_token"):
console.print(
"[yellow]Not authenticated.[/yellow] Run 'planoai chatgpt login'."
)
return
account_id = auth_data.get("account_id", "unknown")
expires_at = auth_data.get("expires_at")
if expires_at:
expiry_time = datetime.datetime.fromtimestamp(
expires_at, tz=datetime.timezone.utc
)
now = datetime.datetime.now(tz=datetime.timezone.utc)
if expiry_time > now:
remaining = expiry_time - now
console.print(
f"[green]Authenticated[/green]"
f"\n Account ID: {account_id}"
f"\n Token expires: {expiry_time.strftime('%Y-%m-%d %H:%M:%S UTC')}"
f" ({remaining.seconds // 60}m remaining)"
)
else:
console.print(
f"[yellow]Token expired[/yellow]"
f"\n Account ID: {account_id}"
f"\n Expired at: {expiry_time.strftime('%Y-%m-%d %H:%M:%S UTC')}"
f"\n Will auto-refresh on next use, or run 'planoai chatgpt login'."
)
else:
console.print(
f"[green]Authenticated[/green] (no expiry info)"
f"\n Account ID: {account_id}"
)
@chatgpt.command()
def logout():
"""Remove stored ChatGPT credentials."""
chatgpt_auth.delete_auth()
console.print("[green]ChatGPT credentials removed.[/green]")

View file

@ -28,8 +28,13 @@ SUPPORTED_PROVIDERS_WITHOUT_BASE_URL = [
"xai",
"moonshotai",
"zhipu",
"chatgpt",
]
CHATGPT_API_BASE = "https://chatgpt.com/backend-api/codex"
CHATGPT_DEFAULT_ORIGINATOR = "codex_cli_rs"
CHATGPT_DEFAULT_USER_AGENT = "codex_cli_rs/0.0.0 (Unknown 0; unknown) unknown"
SUPPORTED_PROVIDERS = (
SUPPORTED_PROVIDERS_WITHOUT_BASE_URL + SUPPORTED_PROVIDERS_WITH_BASE_URL
)
@ -331,6 +336,25 @@ def validate_and_render_schema():
provider = model_provider["provider"]
model_provider["provider_interface"] = provider
del model_provider["provider"]
# Auto-wire ChatGPT provider: inject base_url, access_key, and extra headers
if provider == "chatgpt":
if not model_provider.get("base_url"):
model_provider["base_url"] = CHATGPT_API_BASE
if not model_provider.get("access_key"):
model_provider["access_key"] = "$CHATGPT_ACCESS_TOKEN"
import uuid
headers = model_provider.get("headers", {})
headers.setdefault(
"ChatGPT-Account-Id",
os.environ.get("CHATGPT_ACCOUNT_ID", ""),
)
headers.setdefault("originator", CHATGPT_DEFAULT_ORIGINATOR)
headers.setdefault("user-agent", CHATGPT_DEFAULT_USER_AGENT)
headers.setdefault("session_id", str(uuid.uuid4()))
model_provider["headers"] = headers
updated_model_providers.append(model_provider)
if model_provider.get("base_url", None):

View file

@ -31,6 +31,7 @@ from planoai.core import (
)
from planoai.init_cmd import init as init_cmd
from planoai.trace_cmd import trace as trace_cmd, start_trace_listener_background
from planoai.chatgpt_cmd import chatgpt as chatgpt_cmd
from planoai.consts import (
DEFAULT_OTEL_TRACING_GRPC_ENDPOINT,
DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT,
@ -118,6 +119,33 @@ def _temporary_cli_log_level(level: str | None):
set_log_level(current_level)
def _inject_chatgpt_tokens_if_needed(plano_config_file, env, console):
"""If config uses chatgpt providers, resolve tokens from ~/.plano/chatgpt/auth.json."""
import yaml
with open(plano_config_file, "r") as f:
config = yaml.safe_load(f)
providers = config.get("model_providers") or config.get("llm_providers") or []
has_chatgpt = any(str(p.get("model", "")).startswith("chatgpt/") for p in providers)
if not has_chatgpt:
return
try:
from planoai.chatgpt_auth import get_access_token
access_token, account_id = get_access_token()
env["CHATGPT_ACCESS_TOKEN"] = access_token
if account_id:
env["CHATGPT_ACCOUNT_ID"] = account_id
except Exception as e:
console.print(
f"\n[red]ChatGPT auth error:[/red] {e}\n"
f"[dim]Run 'planoai chatgpt login' to authenticate.[/dim]\n"
)
sys.exit(1)
def _print_missing_keys(console, missing_keys: list[str]) -> None:
console.print(f"\n[red]✗[/red] [red]Missing API keys![/red]\n")
for key in missing_keys:
@ -384,6 +412,9 @@ def up(file, path, foreground, with_tracing, tracing_port, docker, verbose):
env = os.environ.copy()
env.pop("PATH", None)
# Inject ChatGPT tokens from ~/.plano/chatgpt/auth.json if any provider needs them
_inject_chatgpt_tokens_if_needed(plano_config_file, env, console)
# Check access keys
access_keys = get_llm_provider_access_keys(plano_config_file=plano_config_file)
access_keys = set(access_keys)
@ -681,6 +712,7 @@ main.add_command(cli_agent)
main.add_command(generate_prompt_targets)
main.add_command(init_cmd, name="init")
main.add_command(trace_cmd, name="trace")
main.add_command(chatgpt_cmd, name="chatgpt")
if __name__ == "__main__":
main()