Merge remote-tracking branch 'origin/main' into stoff81/chatgpt5.5

This commit is contained in:
Spherrrical 2026-06-05 11:31:10 -07:00
commit 1538349367
39 changed files with 325 additions and 635 deletions

View file

@ -1,3 +1,3 @@
"""Plano CLI - Intelligent Prompt Gateway."""
__version__ = "0.4.22"
__version__ = "0.4.23"

View file

@ -39,6 +39,42 @@ CHATGPT_API_BASE = "https://chatgpt.com/backend-api/codex"
CHATGPT_DEFAULT_ORIGINATOR = "codex_cli_rs"
CHATGPT_DEFAULT_USER_AGENT = "codex_cli_rs/0.0.0 (Unknown 0; unknown) unknown"
KIMI_CODE_API_HOST = "api.kimi.com"
KIMI_CODE_DEFAULT_USER_AGENT = "KimiCLI/1.3"
def normalize_kimi_code_base_url(base_url: str) -> str:
"""Ensure Kimi Code API base URLs include the /v1 suffix."""
parsed = urlparse(base_url)
if parsed.hostname != KIMI_CODE_API_HOST:
return base_url
path = parsed.path.rstrip("/")
if path.endswith("/coding"):
return f"{parsed.scheme}://{parsed.netloc}{path}/v1"
return base_url
def apply_kimi_code_provider_defaults(model_provider: dict) -> None:
"""Inject Kimi Code API defaults (User-Agent, normalized base URL)."""
base_url = model_provider.get("base_url")
if not base_url:
return
parsed = urlparse(base_url)
model_id = model_provider.get("model", "")
is_kimi_code = (
parsed.hostname == KIMI_CODE_API_HOST or model_id == "kimi-for-coding"
)
if not is_kimi_code:
return
normalized = normalize_kimi_code_base_url(base_url)
if normalized != base_url:
model_provider["base_url"] = normalized
headers = model_provider.setdefault("headers", {})
headers.setdefault("User-Agent", KIMI_CODE_DEFAULT_USER_AGENT)
SUPPORTED_PROVIDERS = (
SUPPORTED_PROVIDERS_WITHOUT_BASE_URL + SUPPORTED_PROVIDERS_WITH_BASE_URL
)
@ -463,6 +499,8 @@ def validate_and_render_schema():
headers.setdefault("session_id", str(uuid.uuid4()))
model_provider["headers"] = headers
apply_kimi_code_provider_defaults(model_provider)
updated_model_providers.append(model_provider)
if model_provider.get("base_url", None):

View file

@ -5,7 +5,7 @@ PLANO_COLOR = "#969FF4"
SERVICE_NAME_ARCHGW = "plano"
PLANO_DOCKER_NAME = "plano"
PLANO_DOCKER_IMAGE = os.getenv("PLANO_DOCKER_IMAGE", "katanemo/plano:0.4.22")
PLANO_DOCKER_IMAGE = os.getenv("PLANO_DOCKER_IMAGE", "katanemo/plano:0.4.23")
DEFAULT_OTEL_TRACING_GRPC_ENDPOINT = "http://localhost:4317"
# Native mode constants

View file

@ -7,7 +7,6 @@ import contextlib
import logging
import rich_click as click
import yaml
from planoai import targets
from planoai.defaults import (
DEFAULT_LLM_LISTENER_PORT,
detect_providers,
@ -624,28 +623,6 @@ def down(docker, verbose):
)
@click.command()
@click.option(
"--f",
"--file",
type=click.Path(exists=True),
required=True,
help="Path to the Python file",
)
def generate_prompt_targets(file):
"""Generats prompt_targets from python methods.
Note: This works for simple data types like ['int', 'float', 'bool', 'str', 'list', 'tuple', 'set', 'dict']:
If you have a complex pydantic data type, you will have to flatten those manually until we add support for it.
"""
print(f"Processing file: {file}")
if not file.endswith(".py"):
print("Error: Input file must be a .py file")
sys.exit(1)
targets.generate_prompt_targets(file)
@click.command()
@click.option(
"--debug",
@ -743,7 +720,6 @@ main.add_command(down)
main.add_command(build)
main.add_command(logs)
main.add_command(cli_agent)
main.add_command(generate_prompt_targets)
main.add_command(init_cmd, name="init")
main.add_command(trace_cmd, name="trace")
main.add_command(chatgpt_cmd, name="chatgpt")

View file

@ -63,9 +63,5 @@ def configure_rich_click(plano_color: str) -> None:
"name": "Observability",
"commands": ["trace", "obs"],
},
{
"name": "Utilities",
"commands": ["generate-prompt-targets"],
},
],
}

View file

@ -1,365 +0,0 @@
import ast
import sys
import yaml
from typing import Any
FLASK_ROUTE_DECORATORS = ["route", "get", "post", "put", "delete", "patch"]
FASTAPI_ROUTE_DECORATORS = ["get", "post", "put", "delete", "patch"]
def detect_framework(tree: Any) -> str:
"""Detect whether the file is using Flask or FastAPI based on imports."""
for node in ast.walk(tree):
if isinstance(node, ast.ImportFrom):
if node.module == "flask":
return "flask"
elif node.module == "fastapi":
return "fastapi"
return "unknown"
def get_route_decorators(node: Any, framework: str) -> list:
"""Extract route decorators based on the framework."""
decorators = []
for decorator in node.decorator_list:
if isinstance(decorator, ast.Call) and isinstance(
decorator.func, ast.Attribute
):
if framework == "flask" and decorator.func.attr in FLASK_ROUTE_DECORATORS:
decorators.append(decorator.func.attr)
elif (
framework == "fastapi"
and decorator.func.attr in FASTAPI_ROUTE_DECORATORS
):
decorators.append(decorator.func.attr)
return decorators
def get_route_path(node: Any, framework: str) -> str:
"""Extract route path based on the framework."""
for decorator in node.decorator_list:
if isinstance(decorator, ast.Call) and decorator.args:
return decorator.args[0].s # Assuming it's a string literal
def is_pydantic_model(annotation: ast.expr, tree: ast.AST) -> bool:
"""Check if a given type annotation is a Pydantic model."""
# We walk through the AST to find class definitions and check if they inherit from Pydantic's BaseModel
if isinstance(annotation, ast.Name):
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef) and node.name == annotation.id:
for base in node.bases:
if isinstance(base, ast.Name) and base.id == "BaseModel":
return True
return False
def get_pydantic_model_fields(model_name: str, tree: ast.AST) -> list:
"""Extract fields from a Pydantic model, handling list, tuple, set, dict types, and direct default values."""
fields = []
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef) and node.name == model_name:
for stmt in node.body:
if isinstance(stmt, ast.AnnAssign):
# Initialize the default field description
field_type = "Unknown: Please Fix This!"
description = "Field, description not present. Please fix."
default_value = None
required = True # Assume the field is required initially
# Check if the field uses Field() with required status and description
if (
stmt.value
and isinstance(stmt.value, ast.Call)
and isinstance(stmt.value.func, ast.Name)
and stmt.value.func.id == "Field"
):
# Extract the description argument inside the Field call
for keyword in stmt.value.keywords:
if keyword.arg == "description" and isinstance(
keyword.value, ast.Str
):
description = keyword.value.s
if keyword.arg == "default":
default_value = keyword.value
# If Ellipsis (...) is used, it means the field is required
if (
stmt.value.args
and isinstance(stmt.value.args[0], ast.Constant)
and stmt.value.args[0].value is Ellipsis
):
required = True
else:
required = False
# Handle direct default values (e.g., name: str = "John Doe")
elif stmt.value is not None:
if isinstance(stmt.value, ast.Constant):
# Set the default value from the assignment (e.g., name: str = "John Doe")
default_value = stmt.value.value
required = (
False # Not required since it has a default value
)
# Always extract the field type, even if there's a default value
if isinstance(stmt.annotation, ast.Subscript):
# Get the base type (list, tuple, set, dict)
base_type = (
stmt.annotation.value.id
if isinstance(stmt.annotation.value, ast.Name)
else "Unknown"
)
# Handle only list, tuple, set, dict and ignore the inner types
if base_type.lower() in ["list", "tuple", "set", "dict"]:
field_type = base_type.lower()
# Handle the ellipsis '...' for required fields if no Field() call
elif (
isinstance(stmt.value, ast.Constant)
and stmt.value.value is Ellipsis
):
required = True
# Handle simple types like str, int, etc.
if isinstance(stmt.annotation, ast.Name):
field_type = stmt.annotation.id
field_info = {
"name": stmt.target.id,
"type": field_type, # Always set the field type
"description": description,
"default": default_value, # Handle direct default values
"required": required,
}
fields.append(field_info)
return fields
def get_function_parameters(node: ast.FunctionDef, tree: ast.AST) -> list:
"""Extract the parameters and their types from the function definition."""
parameters = []
# Extract docstring to find descriptions
docstring = ast.get_docstring(node)
arg_descriptions = extract_arg_descriptions_from_docstring(docstring)
# Extract default values
defaults = [None] * (
len(node.args.args) - len(node.args.defaults)
) + node.args.defaults # Align defaults with args
for arg, default in zip(node.args.args, defaults):
if arg.arg != "self": # Skip 'self' or 'cls' in class methods
param_info = {
"name": arg.arg,
"description": arg_descriptions.get(arg.arg, "[ADD DESCRIPTION]"),
}
# Handle Pydantic model types
if hasattr(arg, "annotation") and is_pydantic_model(arg.annotation, tree):
# Extract and flatten Pydantic model fields
pydantic_fields = get_pydantic_model_fields(arg.annotation.id, tree)
parameters.extend(
pydantic_fields
) # Flatten the model fields into the parameters list
continue # Skip adding the current param_info for the model since we expand the fields
# Handle standard Python types (int, float, str, etc.)
elif hasattr(arg, "annotation") and isinstance(arg.annotation, ast.Name):
if arg.annotation.id in [
"int",
"float",
"bool",
"str",
"list",
"tuple",
"set",
"dict",
]:
param_info["type"] = arg.annotation.id
else:
param_info["type"] = "[UNKNOWN - PLEASE FIX]"
# Handle generic subscript types (e.g., Optional, List[Type], etc.)
elif hasattr(arg, "annotation") and isinstance(
arg.annotation, ast.Subscript
):
if isinstance(
arg.annotation.value, ast.Name
) and arg.annotation.value.id in ["list", "tuple", "set", "dict"]:
param_info["type"] = (
f"{arg.annotation.value.id}" # e.g., "List", "Tuple", etc.
)
else:
param_info["type"] = "[UNKNOWN - PLEASE FIX]"
# Default for unknown types
else:
param_info["type"] = (
"[UNKNOWN - PLEASE FIX]" # If unable to detect type
)
# Handle default values
if default is not None:
if isinstance(default, ast.Constant) or isinstance(
default, ast.NameConstant
):
param_info["default"] = (
default.value
) # Use the default value directly
else:
param_info["default"] = "[UNKNOWN DEFAULT]" # Unknown default type
param_info["required"] = False # Optional since it has a default value
else:
param_info["default"] = None
param_info["required"] = True # Required if no default value
parameters.append(param_info)
return parameters
def get_function_docstring(node: Any) -> str:
"""Extract the function's docstring description if present."""
# Check if the first node is a docstring
if isinstance(node.body[0], ast.Expr) and isinstance(node.body[0].value, ast.Str):
# Get the entire docstring
full_docstring = node.body[0].value.s.strip()
# Split the docstring by double newlines (to separate description from fields like Args)
description = full_docstring.split("\n\n")[0].strip()
return description
return "No description provided."
def extract_arg_descriptions_from_docstring(docstring: str) -> dict:
"""Extract descriptions for function parameters from the 'Args' section of the docstring."""
descriptions = {}
if not docstring:
return descriptions
in_args_section = False
current_param = None
for line in docstring.splitlines():
line = line.strip()
# Detect the start of the 'Args' section
if line.startswith("Args:"):
in_args_section = True
continue # Proceed to the next line after 'Args:'
# End of 'Args' section if no indentation and no colon
if in_args_section and not line.startswith(" ") and ":" not in line:
break # Stop processing if we reach a new section
# Process lines in the 'Args' section
if in_args_section:
if ":" in line:
# Extract parameter name and description
param_name, description = line.split(":", 1)
descriptions[param_name.strip()] = description.strip()
current_param = param_name.strip()
elif current_param and line.startswith(" "):
# Handle multiline descriptions (indented lines)
descriptions[current_param] += f" {line.strip()}"
return descriptions
def generate_prompt_targets(input_file_path: str) -> None:
"""Introspect routes and generate YAML for either Flask or FastAPI."""
with open(input_file_path, "r") as source:
tree = ast.parse(source.read())
# Detect the framework (Flask or FastAPI)
framework = detect_framework(tree)
if framework == "unknown":
print("Could not detect Flask or FastAPI in the file.")
return
# Extract routes
routes = []
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
route_decorators = get_route_decorators(node, framework)
if route_decorators:
route_path = get_route_path(node, framework)
function_params = get_function_parameters(
node, tree
) # Get parameters for the route
function_docstring = get_function_docstring(node) # Extract docstring
routes.append(
{
"name": node.name,
"path": route_path,
"methods": route_decorators,
"parameters": function_params, # Add parameters to the route
"description": function_docstring, # Add the docstring as the description
}
)
# Generate YAML structure
output_structure = {"prompt_targets": []}
for route in routes:
target = {
"name": route["name"],
"endpoint": [
{
"name": "app_server",
"path": route["path"],
}
],
"description": route["description"], # Use extracted docstring
"parameters": [
{
"name": param["name"],
"type": param["type"],
"description": f"{param['description']}",
**(
{"default": param["default"]}
if "default" in param and param["default"] is not None
else {}
), # Only add default if it's set
"required": param["required"],
}
for param in route["parameters"]
],
}
if route["name"] == "default":
# Special case for `information_extraction` based on your YAML format
target["type"] = "default"
target["auto-llm-dispatch-on-response"] = True
output_structure["prompt_targets"].append(target)
# Output as YAML
print(
yaml.dump(output_structure, sort_keys=False, default_flow_style=False, indent=3)
)
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python targets.py <input_file>")
sys.exit(1)
input_file = sys.argv[1]
# Automatically generate the output file name
if input_file.endswith(".py"):
output_file = input_file.replace(".py", "_prompt_targets.yml")
else:
print("Error: Input file must be a .py file")
sys.exit(1)
# Call the function with the input and generated output file names
generate_prompt_targets(input_file, output_file)
# Example usage:
# python targets.py api.yaml

View file

@ -1,6 +1,6 @@
[project]
name = "planoai"
version = "0.4.22"
version = "0.4.23"
description = "Python-based CLI tool to manage Plano."
authors = [{name = "Katanemo Labs, Inc."}]
readme = "README.md"

View file

@ -3,8 +3,10 @@ import pytest
import yaml
from unittest import mock
from planoai.config_generator import (
validate_and_render_schema,
apply_kimi_code_provider_defaults,
migrate_inline_routing_preferences,
normalize_kimi_code_base_url,
validate_and_render_schema,
)
@ -795,3 +797,29 @@ model_providers:
migrate_inline_routing_preferences(config_yaml)
assert config_yaml["version"] == "v0.5.0"
def test_normalize_kimi_code_base_url_appends_v1_suffix():
assert (
normalize_kimi_code_base_url("https://api.kimi.com/coding")
== "https://api.kimi.com/coding/v1"
)
assert (
normalize_kimi_code_base_url("https://api.kimi.com/coding/")
== "https://api.kimi.com/coding/v1"
)
assert (
normalize_kimi_code_base_url("https://api.kimi.com/coding/v1")
== "https://api.kimi.com/coding/v1"
)
def test_apply_kimi_code_provider_defaults_injects_user_agent():
provider = {
"model": "kimi-for-coding",
"base_url": "https://api.kimi.com/coding",
"access_key": "$MOONSHOTAI_API_KEY",
}
apply_kimi_code_provider_defaults(provider)
assert provider["base_url"] == "https://api.kimi.com/coding/v1"
assert provider["headers"]["User-Agent"] == "KimiCLI/1.3"

2
cli/uv.lock generated
View file

@ -337,7 +337,7 @@ wheels = [
[[package]]
name = "planoai"
version = "0.4.22"
version = "0.4.23"
source = { editable = "." }
dependencies = [
{ name = "click" },