mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-26 00:46:22 +02:00
* Plural/singular confusion in config key * Flow class vs flow blueprint nomenclature change * Update docs & CLI to reflect the above
492 lines
14 KiB
Python
492 lines
14 KiB
Python
"""
|
|
Verifies TrustGraph system health by running comprehensive checks.
|
|
|
|
This utility monitors system startup and health by checking:
|
|
1. Infrastructure (Pulsar, API Gateway)
|
|
2. Core services (processors, flows, prompts)
|
|
3. Data services (library)
|
|
4. UI (workbench)
|
|
|
|
Includes intelligent retry logic with configurable timeouts.
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import time
|
|
import requests
|
|
from typing import Tuple, Optional
|
|
|
|
# Import existing CLI functions to reuse logic
|
|
from trustgraph.api import Api
|
|
|
|
# Pulsar admin REST endpoint probed in phase 1.
default_pulsar_url = "http://localhost:8080"
# API gateway base URL; $TRUSTGRAPH_URL overrides the local default.
default_api_url = os.environ.get("TRUSTGRAPH_URL", "http://localhost:8088/")
# Workbench UI base URL probed in phase 4.
default_ui_url = "http://localhost:8888"
# Bearer token for the gateway, taken from $TRUSTGRAPH_TOKEN (None if unset).
default_token = os.environ.get("TRUSTGRAPH_TOKEN")
|
|
|
|
|
|
class HealthChecker:
    """Run named health checks with retries under one shared global deadline.

    The deadline is ``global_timeout`` seconds after this object is created
    and is shared by every check run through it. Pass/fail totals are
    accumulated in ``checks_passed`` and ``checks_failed``.
    """

    # Icon printed before a log message for each level; any other level
    # (including the default "info") gets a blank placeholder.
    _LEVEL_ICONS = {"success": "✓", "error": "✗", "progress": "⏳"}

    def __init__(
        self,
        global_timeout: int = 120,
        check_timeout: int = 10,
        retry_delay: int = 3,
        verbose: bool = False
    ):
        """
        Args:
            global_timeout: Seconds, counted from construction, that all
                checks together may spend retrying.
            check_timeout: Per-check timeout in seconds. Only stored here;
                run_check does not apply it (callers pass it to the
                individual check functions).
            retry_delay: Seconds to sleep between attempts of one check.
            verbose: When True, ``debug`` messages are printed.
        """
        self.global_timeout = global_timeout
        self.check_timeout = check_timeout
        self.retry_delay = retry_delay
        self.verbose = verbose
        self.start_time = time.time()
        self.checks_passed = 0
        self.checks_failed = 0

    def elapsed(self) -> str:
        """Return the time since construction formatted as MM:SS."""
        minutes, seconds = divmod(int(time.time() - self.start_time), 60)
        return f"{minutes:02d}:{seconds:02d}"

    def time_remaining(self) -> float:
        """Return seconds left before the global deadline (negative once passed)."""
        return self.global_timeout - (time.time() - self.start_time)

    def log(self, message: str, level: str = "info"):
        """Print *message* prefixed with the elapsed time and a level icon."""
        icon = self._LEVEL_ICONS.get(level, " ")
        print(f"[{self.elapsed()}] {icon} {message}", flush=True)

    def debug(self, message: str):
        """Print *message* with the elapsed time, but only in verbose mode."""
        if self.verbose:
            print(f"[{self.elapsed()}] {message}", flush=True)

    def run_check(
        self,
        name: str,
        check_func,
        *args,
        **kwargs
    ) -> bool:
        """
        Run a check, retrying until it succeeds or the global deadline passes.

        The check is always attempted at least once, even if earlier checks
        have already used up the global deadline. Otherwise a healthy
        service would be reported as failed without ever being probed.

        Args:
            name: Human-readable name of the check
            check_func: Function that returns (success: bool, message: str)
            *args, **kwargs: Arguments to pass to check_func

        Returns:
            True if check passed, False otherwise
        """
        attempt = 0

        while True:
            attempt += 1

            if attempt > 1:
                self.log(f"Checking {name}... (attempt {attempt})", "progress")
            else:
                self.log(f"Checking {name}...", "progress")

            try:
                success, message = check_func(*args, **kwargs)
            except Exception as e:
                # Any error, including a malformed return value, is treated
                # as "not ready yet" and retried.
                self.debug(f"{name} check failed with exception: {e}")
            else:
                if success:
                    self.log(f"{name}: {message}", "success")
                    self.checks_passed += 1
                    return True
                self.debug(f"{name} not ready: {message}")

            # Give up once the deadline has passed or there is not enough
            # time left to wait and try again.
            remaining = self.time_remaining()
            if remaining <= 0 or remaining < self.retry_delay:
                break

            # time.sleep() rejects negative values; treat them as "no delay".
            time.sleep(max(self.retry_delay, 0))

        plural = "" if attempt == 1 else "s"
        self.log(
            f"{name}: Failed (timeout after {attempt} attempt{plural})",
            "error",
        )
        self.checks_failed += 1
        return False
|
|
|
|
|
|
def check_pulsar(url: str, timeout: int) -> Tuple[bool, str]:
    """Probe the Pulsar admin REST API by listing its clusters.

    Returns an ``(ok, message)`` pair. HTTP and connection problems are
    reported through the message instead of being raised.
    """
    try:
        response = requests.get(f"{url}/admin/v2/clusters", timeout=timeout)
        if response.status_code != 200:
            return False, f"Pulsar returned status {response.status_code}"
        cluster_list = response.json()
        return True, f"Pulsar healthy ({len(cluster_list)} cluster(s))"
    except requests.exceptions.Timeout:
        return False, "Pulsar connection timeout"
    except requests.exceptions.ConnectionError:
        return False, "Cannot connect to Pulsar"
    except Exception as err:
        return False, f"Pulsar error: {err}"
|
|
|
|
|
|
def check_api_gateway(url: str, timeout: int, token: Optional[str] = None) -> Tuple[bool, str]:
    """Probe the API gateway's root URL.

    The gateway counts as up if the root returns 200 or 404: a 404 still
    shows that the gateway is accepting HTTP requests. When *token* is
    given, it is sent as a Bearer Authorization header.
    """
    try:
        auth_headers = {"Authorization": f"Bearer {token}"} if token else {}
        root_url = url if url.endswith('/') else url + '/'

        status = requests.get(root_url, headers=auth_headers, timeout=timeout).status_code
        if status in (200, 404):
            return True, "API Gateway is responding"
        return False, f"API Gateway returned status {status}"
    except requests.exceptions.Timeout:
        return False, "API Gateway connection timeout"
    except requests.exceptions.ConnectionError:
        return False, "Cannot connect to API Gateway"
    except Exception as err:
        return False, f"API Gateway error: {err}"
|
|
|
|
|
|
def check_processors(url: str, min_processors: int, timeout: int, token: Optional[str] = None) -> Tuple[bool, str]:
    """Check that enough processors are running, via the metrics endpoint.

    Queries ``<url>api/metrics/query?query=processor_info`` and counts the
    entries in the response's ``data.result`` list.

    Args:
        url: API gateway base URL (a trailing '/' is added if missing).
        min_processors: Minimum number of result entries required to pass.
        timeout: HTTP request timeout in seconds.
        token: Optional bearer token. It is sent as an Authorization header,
            the same way check_api_gateway does. Previously it was accepted
            but silently ignored, so auth-enabled gateways rejected the query.

    Returns:
        (ok, message). Errors are reported in the message, not raised.
    """
    try:
        headers = {}
        if token:
            headers["Authorization"] = f"Bearer {token}"

        # Construct metrics URL from API URL
        if not url.endswith('/'):
            url += '/'
        metrics_url = f"{url}api/metrics/query?query=processor_info"

        resp = requests.get(metrics_url, headers=headers, timeout=timeout)
        if resp.status_code != 200:
            return False, f"Metrics returned status {resp.status_code}"

        data = resp.json()
        processor_count = len(data.get("data", {}).get("result", []))

        if processor_count >= min_processors:
            return True, f"Found {processor_count} processors (≥ {min_processors})"
        return False, f"Only {processor_count} processors running (need {min_processors})"

    except Exception as e:
        return False, f"Processor check error: {e}"
|
|
|
|
|
|
def check_flow_blueprints(url: str, timeout: int, token: Optional[str] = None) -> Tuple[bool, str]:
    """Pass only if the flow API lists at least one flow blueprint.

    Returns an ``(ok, message)`` pair. Any error is folded into the message.
    """
    try:
        available = Api(url, token=token, timeout=timeout).flow().list_blueprints()
        if not available:
            return False, "No flow blueprints found"
        return True, f"Found {len(available)} flow blueprint(s)"
    except Exception as err:
        return False, f"Flow blueprints check error: {err}"
|
|
|
|
|
|
def check_flows(url: str, timeout: int, token: Optional[str] = None) -> Tuple[bool, str]:
    """Pass if the flow manager answers a list request, even with no flows.

    Returns an ``(ok, message)`` pair. Any error is folded into the message.
    """
    try:
        running = Api(url, token=token, timeout=timeout).flow().list()
        # An empty listing is still a valid answer from a live flow manager.
        return True, f"Flow manager responding ({len(running)} flow(s))"
    except Exception as err:
        return False, f"Flow manager check error: {err}"
|
|
|
|
|
|
def check_prompts(url: str, timeout: int, token: Optional[str] = None) -> Tuple[bool, str]:
    """Pass if the prompt ``template-index`` config entry is non-empty.

    Reads config key (type="prompt", key="template-index"), JSON-decodes its
    value and counts the entries. Returns an ``(ok, message)`` pair; any
    error is folded into the message.
    """
    try:
        cfg = Api(url, token=token, timeout=timeout).config()

        # Imported lazily, inside the try, so an import problem is reported
        # as a failed check instead of breaking the whole CLI.
        from trustgraph.api.types import ConfigKey
        import json

        index_key = ConfigKey(type="prompt", key="template-index")
        template_index = json.loads(cfg.get([index_key])[0].value)

        if not template_index:
            return False, "No prompts found"
        return True, f"Found {len(template_index)} prompt(s)"

    except Exception as err:
        return False, f"Prompts check error: {err}"
|
|
|
|
|
|
def check_library(url: str, timeout: int, token: Optional[str] = None) -> Tuple[bool, str]:
    """Pass if the library service returns a document list for user "trustgraph".

    Returns an ``(ok, message)`` pair. Any error is folded into the message.
    """
    try:
        library_service = Api(url, token=token, timeout=timeout).library()
        documents = library_service.get_documents(user="trustgraph")
        # An empty list still proves the service answered.
        return True, f"Library responding ({len(documents)} document(s))"
    except Exception as err:
        return False, f"Library check error: {err}"
|
|
|
|
|
|
def check_ui(url: str, timeout: int) -> Tuple[bool, str]:
    """Pass if the Workbench UI serves ``index.html`` with HTTP 200.

    Returns an ``(ok, message)`` pair. HTTP and connection problems are
    reported through the message instead of being raised.
    """
    try:
        base = url if url.endswith('/') else url + '/'
        status = requests.get(base + "index.html", timeout=timeout).status_code
        if status == 200:
            return True, "Workbench UI is responding"
        return False, f"UI returned status {status}"
    except requests.exceptions.Timeout:
        return False, "UI connection timeout"
    except requests.exceptions.ConnectionError:
        return False, "Cannot connect to UI"
    except Exception as err:
        return False, f"UI error: {err}"
|
|
|
|
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Return the command-line parser for tg-verify-system-status."""
    parser = argparse.ArgumentParser(
        prog='tg-verify-system-status',
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    parser.add_argument(
        '--global-timeout',
        type=int,
        default=120,
        help='Total timeout in seconds (default: 120)'
    )

    parser.add_argument(
        '--check-timeout',
        type=int,
        default=10,
        help='Per-check timeout in seconds (default: 10)'
    )

    parser.add_argument(
        '--retry-delay',
        type=int,
        default=3,
        help='Delay between retries in seconds (default: 3)'
    )

    parser.add_argument(
        '--min-processors',
        type=int,
        default=15,
        help='Minimum processors required (default: 15)'
    )

    parser.add_argument(
        '--pulsar-url',
        default=default_pulsar_url,
        help=f'Pulsar admin URL (default: {default_pulsar_url})'
    )

    parser.add_argument(
        '--api-url',
        default=default_api_url,
        help=f'API Gateway URL (default: {default_api_url})'
    )

    parser.add_argument(
        '--ui-url',
        default=default_ui_url,
        help=f'Workbench UI URL (default: {default_ui_url})'
    )

    parser.add_argument(
        '--skip-ui',
        action='store_true',
        help='Skip UI check (for headless deployments)'
    )

    parser.add_argument(
        '-t', '--token',
        default=default_token,
        help='Authentication token (default: $TRUSTGRAPH_TOKEN)'
    )

    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='Show detailed output'
    )

    return parser


def _print_banner(title: str) -> None:
    """Print *title* framed above and below by a 60-character '=' rule."""
    print("=" * 60)
    print(title)
    print("=" * 60)


def _print_phase_header(title: str) -> None:
    """Print a phase title followed by a 60-character '-' rule."""
    print(title)
    print("-" * 60)


def main():
    """Main entry point for the CLI.

    Runs the infrastructure, core-service, data-service and (unless
    --skip-ui) UI checks in order, then prints a summary. Exits with status
    0 if every check passed and 1 otherwise. Invalid timing arguments are
    rejected with a usage error (argparse exits with status 2).
    """
    parser = _build_parser()
    args = parser.parse_args()

    # Reject settings that would otherwise crash mid-run (time.sleep raises
    # on negative delays) or make every HTTP check fail for reasons that
    # have nothing to do with system health.
    if args.global_timeout < 0:
        parser.error("--global-timeout must not be negative")
    if args.check_timeout <= 0:
        parser.error("--check-timeout must be a positive number of seconds")
    if args.retry_delay < 0:
        parser.error("--retry-delay must not be negative")

    checker = HealthChecker(
        global_timeout=args.global_timeout,
        check_timeout=args.check_timeout,
        retry_delay=args.retry_delay,
        verbose=args.verbose
    )

    _print_banner("TrustGraph System Status Verification")
    print()

    # Phase 1: Infrastructure
    _print_phase_header("Phase 1: Infrastructure")

    if not checker.run_check(
        "Pulsar", check_pulsar, args.pulsar_url, args.check_timeout
    ):
        # Keep going: later checks still report their own status.
        print("\n⚠️ Pulsar is not responding - other checks may fail")
        print()

    checker.run_check(
        "API Gateway", check_api_gateway,
        args.api_url, args.check_timeout, args.token
    )
    print()

    # Phase 2: Core Services
    _print_phase_header("Phase 2: Core Services")

    checker.run_check(
        "Processors", check_processors,
        args.api_url, args.min_processors, args.check_timeout, args.token
    )

    # These checks all take (api_url, check_timeout, token).
    for name, check in (
        ("Flow Blueprints", check_flow_blueprints),
        ("Flows", check_flows),
        ("Prompts", check_prompts),
    ):
        checker.run_check(
            name, check, args.api_url, args.check_timeout, args.token
        )
    print()

    # Phase 3: Data Services
    _print_phase_header("Phase 3: Data Services")

    checker.run_check(
        "Library", check_library,
        args.api_url, args.check_timeout, args.token
    )
    print()

    # Phase 4: UI (optional)
    if not args.skip_ui:
        _print_phase_header("Phase 4: User Interface")

        checker.run_check(
            "Workbench UI", check_ui, args.ui_url, args.check_timeout
        )
        print()

    # Summary
    _print_banner("Summary")

    total_checks = checker.checks_passed + checker.checks_failed

    print(f"Checks passed: {checker.checks_passed}/{total_checks}")
    print(f"Checks failed: {checker.checks_failed}/{total_checks}")
    print(f"Total time: {checker.elapsed()}")

    if checker.checks_failed == 0:
        print("\n✓ System is healthy!")
        sys.exit(0)

    print(f"\n✗ System has {checker.checks_failed} failing check(s)")
    sys.exit(1)
|
|
|
|
|
|
# Run the CLI only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
|