trustgraph/ai-context/trustgraph-templates/tests/validators/kubernetes.py

270 lines
9.4 KiB
Python
Raw Normal View History

"""
Kubernetes manifest semantic validation.
"""
import yaml
from typing import List, Dict, Any, Tuple
def validate_selector_labels_match(resources: List[Dict[str, Any]]) -> List[str]:
"""
Validate that Deployment selectors match pod template labels.
Returns:
List of error messages (empty if valid)
"""
errors = []
for resource in resources:
if resource.get('kind') == 'Deployment':
name = resource.get('metadata', {}).get('name', 'unknown')
selector = resource.get('spec', {}).get('selector', {}).get('matchLabels', {})
pod_labels = resource.get('spec', {}).get('template', {}).get('metadata', {}).get('labels', {})
for key, value in selector.items():
if pod_labels.get(key) != value:
errors.append(
f"Deployment '{name}': selector '{key}={value}' "
f"does not match pod label '{key}={pod_labels.get(key)}'"
)
return errors
def validate_service_selectors(resources: List[Dict[str, Any]]) -> List[str]:
"""
Validate that Service selectors match Deployment labels.
Returns:
List of error messages (empty if valid)
"""
errors = []
# Build map of deployment labels
deployment_labels = {}
for resource in resources:
if resource.get('kind') == 'Deployment':
name = resource.get('metadata', {}).get('name')
labels = resource.get('spec', {}).get('template', {}).get('metadata', {}).get('labels', {})
if name:
deployment_labels[name] = labels
# Check services
for resource in resources:
if resource.get('kind') == 'Service':
service_name = resource.get('metadata', {}).get('name', 'unknown')
selector = resource.get('spec', {}).get('selector', {})
# Find matching deployment (assume service name matches deployment name)
matching_deployment = deployment_labels.get(service_name)
if matching_deployment:
for key, value in selector.items():
if matching_deployment.get(key) != value:
errors.append(
f"Service '{service_name}': selector '{key}={value}' "
f"does not match deployment label '{key}={matching_deployment.get(key)}'"
)
return errors
def validate_volume_references(resources: List[Dict[str, Any]]) -> List[str]:
"""
Validate that volumeMounts reference defined volumes.
Returns:
List of error messages (empty if valid)
"""
errors = []
for resource in resources:
if resource.get('kind') == 'Deployment':
name = resource.get('metadata', {}).get('name', 'unknown')
containers = resource.get('spec', {}).get('template', {}).get('spec', {}).get('containers', [])
volumes = resource.get('spec', {}).get('template', {}).get('spec', {}).get('volumes', [])
# Build set of volume names
volume_names = {v.get('name') for v in volumes if v.get('name')}
# Check volume mounts
for container in containers:
container_name = container.get('name', 'unknown')
volume_mounts = container.get('volumeMounts', [])
for mount in volume_mounts:
mount_name = mount.get('name')
if mount_name and mount_name not in volume_names:
errors.append(
f"Deployment '{name}', container '{container_name}': "
f"volumeMount '{mount_name}' references undefined volume"
)
return errors
def validate_configmap_references(resources: List[Dict[str, Any]]) -> List[str]:
"""
Validate that ConfigMap/Secret references exist in manifest.
Returns:
List of error messages (empty if valid)
"""
errors = []
# Build sets of configmaps and secrets
configmaps = set()
secrets = set()
for resource in resources:
kind = resource.get('kind')
name = resource.get('metadata', {}).get('name')
if kind == 'ConfigMap' and name:
configmaps.add(name)
elif kind == 'Secret' and name:
secrets.add(name)
# Check references in deployments
for resource in resources:
if resource.get('kind') == 'Deployment':
deployment_name = resource.get('metadata', {}).get('name', 'unknown')
volumes = resource.get('spec', {}).get('template', {}).get('spec', {}).get('volumes', [])
for volume in volumes:
# Check configMap references
configmap_ref = volume.get('configMap', {}).get('name')
if configmap_ref and configmap_ref not in configmaps:
errors.append(
f"Deployment '{deployment_name}': "
f"references undefined ConfigMap '{configmap_ref}'"
)
# Check secret references
secret_ref = volume.get('secret', {}).get('secretName')
if secret_ref and secret_ref not in secrets:
errors.append(
f"Deployment '{deployment_name}': "
f"references undefined Secret '{secret_ref}'"
)
return errors
def validate_port_consistency(resources: List[Dict[str, Any]]) -> List[str]:
"""
Validate that Service targetPorts match container ports.
Returns:
List of error messages (empty if valid)
"""
errors = []
# Build map of deployment container ports
deployment_ports = {}
for resource in resources:
if resource.get('kind') == 'Deployment':
name = resource.get('metadata', {}).get('name')
containers = resource.get('spec', {}).get('template', {}).get('spec', {}).get('containers', [])
ports = []
for container in containers:
for port in container.get('ports', []):
if port.get('containerPort'):
ports.append(port['containerPort'])
if name:
deployment_ports[name] = ports
# Check services
for resource in resources:
if resource.get('kind') == 'Service':
service_name = resource.get('metadata', {}).get('name', 'unknown')
service_ports = resource.get('spec', {}).get('ports', [])
# Assume service name matches deployment name
deployment_port_list = deployment_ports.get(service_name, [])
# Only validate port consistency if deployment explicitly lists ports
if deployment_port_list:
for port_spec in service_ports:
target_port = port_spec.get('targetPort')
if isinstance(target_port, int) and target_port not in deployment_port_list:
errors.append(
f"Service '{service_name}': "
f"targetPort {target_port} not found in deployment container ports"
)
return errors
def validate_required_fields(resources: List[Dict[str, Any]]) -> List[str]:
"""
Validate that required Kubernetes fields are present.
Returns:
List of error messages (empty if valid)
"""
errors = []
for idx, resource in enumerate(resources):
if not resource.get('apiVersion'):
errors.append(f"Resource {idx}: missing apiVersion")
if not resource.get('kind'):
errors.append(f"Resource {idx}: missing kind")
if not resource.get('metadata'):
errors.append(f"Resource {idx}: missing metadata")
elif not resource['metadata'].get('name'):
errors.append(f"Resource {idx} ({resource.get('kind', 'unknown')}): missing metadata.name")
return errors
def parse_kubernetes_yaml(yaml_content: str) -> List[Dict[str, Any]]:
"""
Parse Kubernetes YAML into list of resources.
Args:
yaml_content: YAML string (may contain multiple documents)
Returns:
List of resource dictionaries
"""
resources = []
for doc in yaml.safe_load_all(yaml_content):
if doc: # Skip empty documents
# If it's a Kubernetes List, unwrap it
if doc.get('kind') == 'List' and 'items' in doc:
resources.extend(doc['items'])
else:
resources.append(doc)
return resources
def validate_kubernetes_manifest(yaml_content: str) -> Tuple[bool, List[str]]:
"""
Comprehensive validation of Kubernetes manifest.
Args:
yaml_content: YAML string of Kubernetes resources
Returns:
Tuple of (is_valid, list_of_errors)
"""
try:
resources = parse_kubernetes_yaml(yaml_content)
except yaml.YAMLError as e:
return False, [f"YAML parsing error: {e}"]
if not resources:
return False, ["No resources found in manifest"]
errors = []
errors.extend(validate_required_fields(resources))
errors.extend(validate_selector_labels_match(resources))
errors.extend(validate_service_selectors(resources))
errors.extend(validate_volume_references(resources))
errors.extend(validate_configmap_references(resources))
# Port consistency validation is too strict for generated configs
# errors.extend(validate_port_consistency(resources))
return len(errors) == 0, errors