CLI tools

This commit is contained in:
Cyber MacGeddon 2026-04-24 11:08:00 +01:00
parent b5821bca6d
commit 3bdb677607
13 changed files with 676 additions and 0 deletions

View file

@ -41,6 +41,17 @@ tg-get-kg-core = "trustgraph.cli.get_kg_core:main"
tg-get-document-content = "trustgraph.cli.get_document_content:main"
tg-graph-to-turtle = "trustgraph.cli.graph_to_turtle:main"
tg-bootstrap-iam = "trustgraph.cli.bootstrap_iam:main"
tg-login = "trustgraph.cli.login:main"
tg-create-user = "trustgraph.cli.create_user:main"
tg-list-users = "trustgraph.cli.list_users:main"
tg-disable-user = "trustgraph.cli.disable_user:main"
tg-change-password = "trustgraph.cli.change_password:main"
tg-reset-password = "trustgraph.cli.reset_password:main"
tg-create-api-key = "trustgraph.cli.create_api_key:main"
tg-list-api-keys = "trustgraph.cli.list_api_keys:main"
tg-revoke-api-key = "trustgraph.cli.revoke_api_key:main"
tg-list-workspaces = "trustgraph.cli.list_workspaces:main"
tg-create-workspace = "trustgraph.cli.create_workspace:main"
tg-invoke-agent = "trustgraph.cli.invoke_agent:main"
tg-invoke-document-rag = "trustgraph.cli.invoke_document_rag:main"
tg-invoke-graph-rag = "trustgraph.cli.invoke_graph_rag:main"

View file

@ -0,0 +1,75 @@
"""
Shared helpers for IAM CLI tools.
All IAM operations go through the gateway's ``/api/v1/iam`` forwarder,
with the three public auth operations (``login``, ``bootstrap``,
``change-password``) served via ``/api/v1/auth/...`` instead. These
helpers encapsulate the HTTP plumbing so each CLI can stay focused
on its own argument parsing and output formatting.
"""
import json
import os
import sys
import requests
DEFAULT_URL = os.getenv("TRUSTGRAPH_URL", "http://localhost:8088/")
DEFAULT_TOKEN = os.getenv("TRUSTGRAPH_TOKEN", None)
def _fmt_error(resp_json):
err = resp_json.get("error", {})
if isinstance(err, dict):
t = err.get("type", "")
m = err.get("message", "")
return f"{t}: {m}" if t else m or "error"
return str(err)
def _post(url, path, token, body):
endpoint = url.rstrip("/") + path
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
resp = requests.post(
endpoint, headers=headers, data=json.dumps(body),
)
if resp.status_code != 200:
try:
payload = resp.json()
detail = _fmt_error(payload)
except Exception:
detail = resp.text
raise RuntimeError(f"HTTP {resp.status_code}: {detail}")
body = resp.json()
if "error" in body:
raise RuntimeError(_fmt_error(body))
return body
def call_iam(url, token, request):
"""Forward an IAM request through ``/api/v1/iam``. ``request`` is
the ``IamRequest`` dict shape."""
return _post(url, "/api/v1/iam", token, request)
def call_auth(url, path, token, body):
"""Hit one of the public auth endpoints
(``/api/v1/auth/login``, ``/api/v1/auth/change-password``, etc.).
``token`` is optional login and bootstrap don't need one."""
return _post(url, path, token, body)
def run_main(fn, parser):
"""Standard error-handling wrapper for CLI main() bodies."""
args = parser.parse_args()
try:
fn(args)
except Exception as e:
print("Exception:", e, file=sys.stderr, flush=True)
sys.exit(1)

View file

@ -0,0 +1,46 @@
"""
Change your own password. Requires the current password.
"""
import argparse
import getpass
from ._iam import DEFAULT_URL, DEFAULT_TOKEN, call_auth, run_main
def do_change_password(args):
current = args.current or getpass.getpass("Current password: ")
new = args.new or getpass.getpass("New password: ")
call_auth(
args.api_url, "/api/v1/auth/change-password", args.token,
{"current_password": current, "new_password": new},
)
print("Password changed.")
def main():
parser = argparse.ArgumentParser(
prog="tg-change-password", description=__doc__,
)
parser.add_argument(
"-u", "--api-url", default=DEFAULT_URL,
help=f"API URL (default: {DEFAULT_URL})",
)
parser.add_argument(
"-t", "--token", default=DEFAULT_TOKEN,
help="Auth token (default: $TRUSTGRAPH_TOKEN)",
)
parser.add_argument(
"--current", default=None,
help="Current password (prompted if omitted)",
)
parser.add_argument(
"--new", default=None,
help="New password (prompted if omitted)",
)
run_main(do_change_password, parser)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,64 @@
"""
Create an API key for a user. Prints the plaintext key to stdout
shown once only.
"""
import argparse
import sys
from ._iam import DEFAULT_URL, DEFAULT_TOKEN, call_iam, run_main
def do_create_api_key(args):
key = {
"user_id": args.user_id,
"name": args.name,
}
if args.expires:
key["expires"] = args.expires
resp = call_iam(args.api_url, args.token, {
"operation": "create-api-key",
"key": key,
})
plaintext = resp.get("api_key_plaintext", "")
rec = resp.get("api_key", {})
print(f"Key id: {rec.get('id', '')}", file=sys.stderr)
print(f"Name: {rec.get('name', '')}", file=sys.stderr)
print(f"Prefix: {rec.get('prefix', '')}", file=sys.stderr)
print(
"API key (shown once, capture now):", file=sys.stderr,
)
print(plaintext)
def main():
parser = argparse.ArgumentParser(
prog="tg-create-api-key", description=__doc__,
)
parser.add_argument(
"-u", "--api-url", default=DEFAULT_URL,
help=f"API URL (default: {DEFAULT_URL})",
)
parser.add_argument(
"-t", "--token", default=DEFAULT_TOKEN,
help="Auth token (default: $TRUSTGRAPH_TOKEN)",
)
parser.add_argument(
"--user-id", required=True,
help="Owner user id",
)
parser.add_argument(
"--name", required=True,
help="Operator-facing label (e.g. 'laptop', 'ci')",
)
parser.add_argument(
"--expires", default=None,
help="ISO-8601 expiry (optional; empty = no expiry)",
)
run_main(do_create_api_key, parser)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,78 @@
"""
Create a user in the caller's workspace. Prints the new user id.
"""
import argparse
import getpass
import sys
from ._iam import DEFAULT_URL, DEFAULT_TOKEN, call_iam, run_main
def do_create_user(args):
password = args.password
if not password:
password = getpass.getpass(
f"Password for new user {args.username}: "
)
user = {
"username": args.username,
"password": password,
"roles": args.roles,
}
if args.name:
user["name"] = args.name
if args.email:
user["email"] = args.email
if args.must_change_password:
user["must_change_password"] = True
req = {"operation": "create-user", "user": user}
resp = call_iam(args.api_url, args.token, req)
rec = resp.get("user", {})
print(f"User id: {rec.get('id', '')}", file=sys.stderr)
print(f"Username: {rec.get('username', '')}", file=sys.stderr)
print(f"Roles: {', '.join(rec.get('roles', []))}", file=sys.stderr)
print(rec.get("id", ""))
def main():
parser = argparse.ArgumentParser(
prog="tg-create-user", description=__doc__,
)
parser.add_argument(
"-u", "--api-url", default=DEFAULT_URL,
help=f"API URL (default: {DEFAULT_URL})",
)
parser.add_argument(
"-t", "--token", default=DEFAULT_TOKEN,
help="Auth token (default: $TRUSTGRAPH_TOKEN)",
)
parser.add_argument(
"--username", required=True, help="Username (unique in workspace)",
)
parser.add_argument(
"--password", default=None,
help="Password (prompted if omitted)",
)
parser.add_argument(
"--name", default=None, help="Display name",
)
parser.add_argument(
"--email", default=None, help="Email",
)
parser.add_argument(
"--roles", nargs="+", default=["reader"],
help="One or more role names (default: reader)",
)
parser.add_argument(
"--must-change-password", action="store_true",
help="Force password change on next login",
)
run_main(do_create_user, parser)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,46 @@
"""
Create a workspace (system-level; requires admin).
"""
import argparse
from ._iam import DEFAULT_URL, DEFAULT_TOKEN, call_iam, run_main
def do_create_workspace(args):
ws = {"id": args.workspace_id, "enabled": True}
if args.name:
ws["name"] = args.name
resp = call_iam(args.api_url, args.token, {
"operation": "create-workspace",
"workspace_record": ws,
})
rec = resp.get("workspace", {})
print(f"Workspace created: {rec.get('id', '')}")
def main():
parser = argparse.ArgumentParser(
prog="tg-create-workspace", description=__doc__,
)
parser.add_argument(
"-u", "--api-url", default=DEFAULT_URL,
help=f"API URL (default: {DEFAULT_URL})",
)
parser.add_argument(
"-t", "--token", default=DEFAULT_TOKEN,
help="Auth token (default: $TRUSTGRAPH_TOKEN)",
)
parser.add_argument(
"--workspace-id", required=True,
help="New workspace id (must not start with '_')",
)
parser.add_argument(
"--name", default=None, help="Display name",
)
run_main(do_create_workspace, parser)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,38 @@
"""
Disable a user. Soft-deletes (enabled=false) and revokes all their
API keys.
"""
import argparse
from ._iam import DEFAULT_URL, DEFAULT_TOKEN, call_iam, run_main
def do_disable_user(args):
call_iam(args.api_url, args.token, {
"operation": "disable-user",
"user_id": args.user_id,
})
print(f"Disabled user {args.user_id}")
def main():
parser = argparse.ArgumentParser(
prog="tg-disable-user", description=__doc__,
)
parser.add_argument(
"-u", "--api-url", default=DEFAULT_URL,
help=f"API URL (default: {DEFAULT_URL})",
)
parser.add_argument(
"-t", "--token", default=DEFAULT_TOKEN,
help="Auth token (default: $TRUSTGRAPH_TOKEN)",
)
parser.add_argument(
"--user-id", required=True, help="User id to disable",
)
run_main(do_disable_user, parser)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,62 @@
"""
List the API keys for a user.
"""
import argparse
import tabulate
from ._iam import DEFAULT_URL, DEFAULT_TOKEN, call_iam, run_main
def do_list_api_keys(args):
resp = call_iam(args.api_url, args.token, {
"operation": "list-api-keys",
"user_id": args.user_id,
})
keys = resp.get("api_keys", [])
if not keys:
print("No keys.")
return
rows = [
[
k.get("id", ""),
k.get("name", ""),
k.get("prefix", ""),
k.get("created", ""),
k.get("last_used", "") or "",
k.get("expires", "") or "never",
]
for k in keys
]
print(tabulate.tabulate(
rows,
headers=["id", "name", "prefix", "created", "last used", "expires"],
tablefmt="pretty",
stralign="left",
))
def main():
parser = argparse.ArgumentParser(
prog="tg-list-api-keys", description=__doc__,
)
parser.add_argument(
"-u", "--api-url", default=DEFAULT_URL,
help=f"API URL (default: {DEFAULT_URL})",
)
parser.add_argument(
"-t", "--token", default=DEFAULT_TOKEN,
help="Auth token (default: $TRUSTGRAPH_TOKEN)",
)
parser.add_argument(
"--user-id", required=True,
help="Owner user id",
)
run_main(do_list_api_keys, parser)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,57 @@
"""
List users in the caller's workspace.
"""
import argparse
import tabulate
from ._iam import DEFAULT_URL, DEFAULT_TOKEN, call_iam, run_main
def do_list_users(args):
resp = call_iam(
args.api_url, args.token, {"operation": "list-users"},
)
users = resp.get("users", [])
if not users:
print("No users.")
return
rows = [
[
u.get("id", ""),
u.get("username", ""),
u.get("name", ""),
", ".join(u.get("roles", [])),
"yes" if u.get("enabled") else "no",
"yes" if u.get("must_change_password") else "no",
]
for u in users
]
print(tabulate.tabulate(
rows,
headers=["id", "username", "name", "roles", "enabled", "change-pw"],
tablefmt="pretty",
stralign="left",
))
def main():
parser = argparse.ArgumentParser(
prog="tg-list-users", description=__doc__,
)
parser.add_argument(
"-u", "--api-url", default=DEFAULT_URL,
help=f"API URL (default: {DEFAULT_URL})",
)
parser.add_argument(
"-t", "--token", default=DEFAULT_TOKEN,
help="Auth token (default: $TRUSTGRAPH_TOKEN)",
)
run_main(do_list_users, parser)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,53 @@
"""
List workspaces (system-level; requires admin).
"""
import argparse
import tabulate
from ._iam import DEFAULT_URL, DEFAULT_TOKEN, call_iam, run_main
def do_list_workspaces(args):
resp = call_iam(
args.api_url, args.token, {"operation": "list-workspaces"},
)
workspaces = resp.get("workspaces", [])
if not workspaces:
print("No workspaces.")
return
rows = [
[
w.get("id", ""),
w.get("name", ""),
"yes" if w.get("enabled") else "no",
w.get("created", ""),
]
for w in workspaces
]
print(tabulate.tabulate(
rows,
headers=["id", "name", "enabled", "created"],
tablefmt="pretty",
stralign="left",
))
def main():
parser = argparse.ArgumentParser(
prog="tg-list-workspaces", description=__doc__,
)
parser.add_argument(
"-u", "--api-url", default=DEFAULT_URL,
help=f"API URL (default: {DEFAULT_URL})",
)
parser.add_argument(
"-t", "--token", default=DEFAULT_TOKEN,
help="Auth token (default: $TRUSTGRAPH_TOKEN)",
)
run_main(do_list_workspaces, parser)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,62 @@
"""
Log in with username / password. Prints the resulting JWT to
stdout so it can be captured for subsequent CLI use.
"""
import argparse
import getpass
import sys
from ._iam import DEFAULT_URL, call_auth, run_main
def do_login(args):
password = args.password
if not password:
password = getpass.getpass(f"Password for {args.username}: ")
body = {
"username": args.username,
"password": password,
}
if args.workspace:
body["workspace"] = args.workspace
resp = call_auth(args.api_url, "/api/v1/auth/login", None, body)
jwt = resp.get("jwt", "")
expires = resp.get("jwt_expires", "")
if expires:
print(f"JWT expires: {expires}", file=sys.stderr)
# Machine-readable on stdout.
print(jwt)
def main():
parser = argparse.ArgumentParser(
prog="tg-login", description=__doc__,
)
parser.add_argument(
"-u", "--api-url", default=DEFAULT_URL,
help=f"API URL (default: {DEFAULT_URL})",
)
parser.add_argument(
"--username", required=True, help="Username",
)
parser.add_argument(
"--password", default=None,
help="Password (prompted if omitted)",
)
parser.add_argument(
"-w", "--workspace", default=None,
help=(
"Optional workspace to log in against. Defaults to "
"the user's assigned workspace."
),
)
run_main(do_login, parser)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,47 @@
"""
Admin: reset another user's password. Prints a one-time temporary
password to stdout. The user is forced to change it on next login.
"""
import argparse
import sys
from ._iam import DEFAULT_URL, DEFAULT_TOKEN, call_iam, run_main
def do_reset_password(args):
resp = call_iam(args.api_url, args.token, {
"operation": "reset-password",
"user_id": args.user_id,
})
tmp = resp.get("temporary_password", "")
if not tmp:
raise RuntimeError(
"IAM returned no temporary password — unexpected"
)
print("Temporary password (shown once, capture now):", file=sys.stderr)
print(tmp)
def main():
parser = argparse.ArgumentParser(
prog="tg-reset-password", description=__doc__,
)
parser.add_argument(
"-u", "--api-url", default=DEFAULT_URL,
help=f"API URL (default: {DEFAULT_URL})",
)
parser.add_argument(
"-t", "--token", default=DEFAULT_TOKEN,
help="Auth token (default: $TRUSTGRAPH_TOKEN)",
)
parser.add_argument(
"--user-id", required=True,
help="Target user id",
)
run_main(do_reset_password, parser)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,37 @@
"""
Revoke an API key by id.
"""
import argparse
from ._iam import DEFAULT_URL, DEFAULT_TOKEN, call_iam, run_main
def do_revoke_api_key(args):
call_iam(args.api_url, args.token, {
"operation": "revoke-api-key",
"key_id": args.key_id,
})
print(f"Revoked key {args.key_id}")
def main():
parser = argparse.ArgumentParser(
prog="tg-revoke-api-key", description=__doc__,
)
parser.add_argument(
"-u", "--api-url", default=DEFAULT_URL,
help=f"API URL (default: {DEFAULT_URL})",
)
parser.add_argument(
"-t", "--token", default=DEFAULT_TOKEN,
help="Auth token (default: $TRUSTGRAPH_TOKEN)",
)
parser.add_argument(
"--key-id", required=True, help="Key id to revoke",
)
run_main(do_revoke_api_key, parser)
if __name__ == "__main__":
main()