# Source: templates-zabbix-itguys/validate_zabbix_template.py
# (repository listing metadata: 567 lines, 20 KiB, Python)

import yaml
import sys
import os
import re
import argparse
import json
import urllib.request
import urllib.error
# Fix for Windows console UTF-8 output: the status lines printed by this
# script contain emojis, so stdout is switched to UTF-8 when it is not already.
_stdout_is_utf8 = sys.stdout.encoding == 'utf-8'
if not _stdout_is_utf8:
    try:
        sys.stdout.reconfigure(encoding='utf-8')
    except AttributeError:
        # Python < 3.7 fallback: the stream has no reconfigure(); leave it as-is.
        pass
# ============================================================================
# VALIDATION FUNCTIONS - Arthur Gold Edition
# ============================================================================
def is_valid_uuidv4(uuid_str):
    """
    Decide whether ``uuid_str`` is shaped like a UUIDv4.

    Dashes are removed and the text is lower-cased before checking, so both
    the dashed and the compact 32-character spellings are accepted.

    Checks, in order (the first failure wins):
    - the value is a ``str``
    - exactly 32 characters remain once dashes are removed
    - every remaining character is a hex digit
    - the version nibble (0-indexed 12, i.e. position 13) is '4'
    - the variant nibble (0-indexed 16, i.e. position 17) is '8', '9', 'a' or 'b'

    Returns:
        An ``(is_valid, message)`` tuple. ``message`` describes the failed
        check, or is ``"Valid UUIDv4"`` on success.
    """
    if not isinstance(uuid_str, str):
        return False, "UUID is not a string"

    normalized = uuid_str.lower().replace('-', '')
    length = len(normalized)
    if length != 32:
        return False, f"UUID has {length} chars (expected 32)"
    if re.match(r'^[0-9a-f]{32}$', normalized) is None:
        return False, "UUID contains non-hex characters"

    version_char = normalized[12]
    if version_char != '4':
        return False, f"UUID version is '{version_char}' (expected '4' at position 13)"

    variant_char = normalized[16]
    if variant_char not in '89ab':
        return False, f"UUID variant is '{variant_char}' (expected '8/9/a/b' at position 17)"

    return True, "Valid UUIDv4"
def validate_uuids_format(content):
    """
    Walk the parsed template and validate every 'uuid' value as UUIDv4.

    Mappings and lists are traversed depth-first; each mapping that has a
    'uuid' key is checked with ``is_valid_uuidv4``.

    Returns:
        A list of "[INVALID UUID] ..." messages, one per bad value, each
        naming the offending value, its path from "root" and the reason.
    """
    def iter_uuid_entries(node, path):
        # Yield (uuid_value, path) for each mapping carrying a 'uuid' key,
        # visiting a mapping's own uuid before descending into its children.
        if isinstance(node, dict):
            if 'uuid' in node:
                yield node['uuid'], path
            for key, child in node.items():
                yield from iter_uuid_entries(child, f"{path}.{key}")
        elif isinstance(node, list):
            for index, child in enumerate(node):
                yield from iter_uuid_entries(child, f"{path}[{index}]")

    problems = []
    for value, location in iter_uuid_entries(content, "root"):
        ok, reason = is_valid_uuidv4(value)
        if not ok:
            problems.append(f"[INVALID UUID] {value} at {location}: {reason}")
    return problems
def collect_item_keys(content):
    """
    Gather the 'key' of every item-like mapping in the parsed template.

    A mapping counts as an item (or item prototype) when it has a 'key'
    entry together with at least one of 'type', 'delay' or 'value_type'.
    The result is used to validate graph references.

    Returns:
        A set of the collected key values.
    """
    item_markers = {'type', 'delay', 'value_type'}

    def iter_keys(node):
        if isinstance(node, list):
            for child in node:
                yield from iter_keys(child)
        elif isinstance(node, dict):
            # isdisjoint() is False as soon as one marker is a key of node.
            if 'key' in node and not item_markers.isdisjoint(node):
                yield node['key']
            for child in node.values():
                yield from iter_keys(child)

    return set(iter_keys(content))
def collect_graph_names(content):
    """
    Gather the names of graphs and graph prototypes in the parsed template.

    A mapping's 'name' is collected when the mapping has a 'graph_items'
    entry, or when it sits anywhere below a 'graphs' / 'graph_prototypes'
    key. The result is used to validate dashboard references.

    Returns:
        A set of the collected names.
    """
    graph_sections = ('graphs', 'graph_prototypes')
    found = set()

    def walk(node, inside_graphs):
        if isinstance(node, list):
            for child in node:
                walk(child, inside_graphs)
            return
        if not isinstance(node, dict):
            return

        if 'name' in node and (inside_graphs or 'graph_items' in node):
            found.add(node['name'])
        for key, child in node.items():
            # Once inside a graph section, everything below stays "inside".
            walk(child, inside_graphs or key in graph_sections)

    walk(content, False)
    return found
def validate_graph_references(content, item_keys):
    """
    Check that every item referenced from a graph exists in the template.

    Keys are compared after collapsing their parameter list to ``[*]``
    (``net.if.in[{#IFNAME}]`` and ``net.if.in[eth0]`` both become
    ``net.if.in[*]``), so a reference using LLD macros matches any known
    key with the same base name.

    Args:
        content: Parsed template (nested dicts/lists).
        item_keys: Iterable of known item keys, e.g. from
            ``collect_item_keys``.

    Returns:
        A list of "[MISSING ITEM REF] ..." messages, one per unresolved
        reference. Malformed graph entries (``graph_items`` that is not a
        list, entries that are not mappings, an ``item`` that is not a
        mapping) are skipped instead of raising.
    """
    def to_pattern(key):
        # Greedy on purpose: everything from the first '[' to the last ']'
        # is treated as the parameter list.
        return re.sub(r'\[.*\]', '[*]', key)

    # Normalise the known keys once instead of once per graph item.
    known_keys = list(item_keys)
    known_patterns = {to_pattern(k) for k in known_keys if isinstance(k, str)}
    errors = []

    def is_known(ref_key):
        if isinstance(ref_key, str):
            # Equal strings have equal patterns, so this also covers the
            # exact-match case.
            return to_pattern(ref_key) in known_patterns
        # Non-string keys cannot be normalised; fall back to plain equality.
        return any(k == ref_key for k in known_keys)

    def check_graphs(node):
        if isinstance(node, dict):
            graph_items = node.get('graph_items')
            if isinstance(graph_items, list):
                graph_name = node.get('name', 'Unknown')
                for graph_item in graph_items:
                    item_ref = graph_item.get('item') if isinstance(graph_item, dict) else None
                    if not isinstance(item_ref, dict) or 'key' not in item_ref:
                        continue
                    ref_key = item_ref['key']
                    if not is_known(ref_key):
                        errors.append(f"[MISSING ITEM REF] Graph '{graph_name}' references non-existent item '{ref_key}'")
            for child in node.values():
                check_graphs(child)
        elif isinstance(node, list):
            for child in node:
                check_graphs(child)

    check_graphs(content)
    return errors
def validate_dashboard_references(content, graph_names):
    """
    Check that every graph referenced from a dashboard widget exists.

    Any mapping with a list under 'fields' is treated as a widget. Within
    it, a field named 'graphid.0' whose 'value' is a mapping with a 'name'
    is a graph reference; that name must be in ``graph_names``.

    Returns:
        A list of "[MISSING GRAPH REF] ..." messages, one per unresolved
        reference.
    """
    missing = []

    def scan_fields(fields):
        for field in fields:
            if not isinstance(field, dict):
                continue
            if field.get('name') != 'graphid.0' or 'value' not in field:
                continue
            target = field['value']
            if not (isinstance(target, dict) and 'name' in target):
                continue
            graph = target['name']
            if graph not in graph_names:
                missing.append(f"[MISSING GRAPH REF] Dashboard widget references non-existent graph '{graph}'")

    def walk(node):
        if isinstance(node, dict):
            fields = node.get('fields')
            if isinstance(fields, list):
                scan_fields(fields)
            for child in node.values():
                walk(child)
        elif isinstance(node, list):
            for child in node:
                walk(child)

    walk(content)
    return missing
def check_duplicate_yaml_keys(file_path):
    """
    Detect critical YAML keys that are repeated at template level.

    A repeated section (e.g. two 'macros:' blocks) is a Zabbix import
    killer: YAML parsers silently merge the duplicates, but Zabbix rejects
    the file. Because ``yaml.safe_load`` hides the problem, the raw text is
    scanned line by line instead.

    Only lines with exactly 4 characters of leading whitespace are
    considered (template level); nested keys such as 'triggers:' can
    legitimately appear many times under different items. Comment lines and
    blank lines are ignored, and only the keys in ``critical_keys`` are
    tracked.

    Args:
        file_path: Path of the YAML file to scan (read as UTF-8).

    Returns:
        A list of error strings: one "[DUPLICATE KEY] ..." entry per
        repeated key, in order of first appearance, or a single
        "[FILE ERROR] ..." entry when the file cannot be read or decoded.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except (OSError, ValueError) as e:
        # OSError: missing/unreadable file. ValueError: undecodable bytes
        # (UnicodeDecodeError) or an invalid path such as an embedded NUL.
        return [f"[FILE ERROR] Could not read file: {e}"]

    # Critical keys that should never be duplicated at template level
    critical_keys = frozenset({'macros', 'items', 'discovery_rules', 'dashboards',
                               'graphs', 'valuemaps', 'value_maps'})
    # "key:" or "key: value" at the start of the already-stripped line;
    # compiled once rather than per line.
    key_pattern = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*):')

    # key name -> line numbers (1-based) where it appears at template level
    occurrences = {}
    for line_num, line in enumerate(lines, 1):
        stripped = line.lstrip()
        if not stripped or stripped.startswith('#'):
            continue
        # Only check template-level keys (indent 4, e.g. "    macros:")
        if len(line) - len(stripped) != 4:
            continue
        match = key_pattern.match(stripped)
        if match and match.group(1) in critical_keys:
            occurrences.setdefault(match.group(1), []).append(line_num)

    errors = []
    for key_name, line_numbers in occurrences.items():
        if len(line_numbers) > 1:
            lines_str = ', '.join(map(str, line_numbers))
            errors.append(f"[DUPLICATE KEY] '{key_name}:' appears {len(line_numbers)} times at template level (lines: {lines_str})")
    return errors
def validate_yaml(file_path):
    """
    Run every single-file check on a Zabbix YAML template and print a report.

    Steps, in order:
    0. Raw-text scan for duplicated template-level keys. Any hit aborts the
       validation immediately.
    1. UUIDv4 format check of every 'uuid' value (errors).
    2. Duplicate 'uuid' values, plus a heuristic scan of 'description'
       strings for the words "The", "This" or "Value" as a hint that the
       text is in English (both are warnings only).
    3. Graph items must reference existing item keys (errors).
    4. Dashboard widgets must reference existing graph names (errors).

    Args:
        file_path: Path of the YAML template to validate.

    Returns:
        True when no errors were found (warnings do not affect the result);
        False on any error, on a YAML parse failure or on an empty file.
    """
    print(f"Validating {file_path}...")
    print("=" * 60)

    # ========== 0. Check for duplicate YAML keys (pre-parse) ==========
    print("\n[0/5] Checking for duplicate YAML keys...")
    duplicate_key_errors = check_duplicate_yaml_keys(file_path)
    if duplicate_key_errors:
        print(f" ❌ Found {len(duplicate_key_errors)} duplicate key issues")
        for err in duplicate_key_errors:
            print(f"{err}")
        return False
    print(" ✅ No duplicate YAML keys detected")

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = yaml.safe_load(f)
    except Exception as e:
        # Deliberately broad: this is the CLI boundary and any failure to
        # load the document is reported the same way.
        print(f"[FATAL] Invalid YAML syntax: {e}")
        return False
    if not content:
        print("[FATAL] Empty file")
        return False

    all_errors = []
    # The two warning kinds live in separate lists. Classifying them later by
    # searching the message text would miscount, because the messages embed
    # user-controlled text (UUIDs, description excerpts).
    duplicate_uuid_warnings = []
    english_warnings = []
    seen_uuids = set()

    # ========== 1. UUID Format Validation (UUIDv4) ==========
    print("\n[1/4] Checking UUID format (UUIDv4 compliance)...")
    uuid_errors = validate_uuids_format(content)
    if uuid_errors:
        all_errors.extend(uuid_errors)
        print(f" ❌ Found {len(uuid_errors)} invalid UUIDs")
    else:
        print(" ✅ All UUIDs are valid UUIDv4 format")

    # ========== 2. UUID Duplicates Check ==========
    print("\n[2/4] Checking for duplicate UUIDs...")

    def check_uuid(node, path="root"):
        if isinstance(node, dict):
            if 'uuid' in node:
                uuid = node['uuid']
                try:
                    if uuid in seen_uuids:
                        duplicate_uuid_warnings.append(f"[DUPLICATE UUID] {uuid} found at {path}")
                    else:
                        seen_uuids.add(uuid)
                except TypeError:
                    # Unhashable value (list/mapping); step 1 has already
                    # reported it as "not a string".
                    pass
            # Check for English descriptions (Basic Heuristic)
            desc = node.get('description')
            if isinstance(desc, str) and re.search(r'\bThe\b|\bThis\b|\bValue\b', desc):
                english_warnings.append(f"[POTENTIAL ENGLISH] at {path}: '{desc[:40]}...'")
            for k, v in node.items():
                check_uuid(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_uuid(item, f"{path}[{i}]")

    check_uuid(content)
    if duplicate_uuid_warnings:
        print(f" ⚠️ Found {len(duplicate_uuid_warnings)} duplicate UUIDs (warning only)")
        # Show where the duplicates are; a bare count is not actionable.
        for w in duplicate_uuid_warnings[:5]:  # Show max 5
            print(f"{w}")
        if len(duplicate_uuid_warnings) > 5:
            print(f" ... and {len(duplicate_uuid_warnings) - 5} more")
    else:
        print(" ✅ No duplicate UUIDs")

    # ========== 3. Graph Item References ==========
    print("\n[3/4] Checking graph item references...")
    item_keys = collect_item_keys(content)
    graph_ref_errors = validate_graph_references(content, item_keys)
    if graph_ref_errors:
        all_errors.extend(graph_ref_errors)
        print(f" ❌ Found {len(graph_ref_errors)} broken item references in graphs")
    else:
        print(f" ✅ All graph item references are valid ({len(item_keys)} items found)")

    # ========== 4. Dashboard Graph References ==========
    print("\n[4/4] Checking dashboard graph references...")
    graph_names = collect_graph_names(content)
    dashboard_ref_errors = validate_dashboard_references(content, graph_names)
    if dashboard_ref_errors:
        all_errors.extend(dashboard_ref_errors)
        print(f" ❌ Found {len(dashboard_ref_errors)} broken graph references in dashboards")
    else:
        print(f" ✅ All dashboard graph references are valid ({len(graph_names)} graphs found)")

    # ========== Summary ==========
    print("\n" + "=" * 60)
    if english_warnings:
        print(f"\n[WARNINGS] {len(english_warnings)} potential English descriptions found (Arthur Audit)")
        for w in english_warnings[:5]:  # Show max 5
            print(f"{w}")
        if len(english_warnings) > 5:
            print(f" ... and {len(english_warnings) - 5} more")

    if all_errors:
        print(f"\n[ERRORS FOUND] {len(all_errors)} critical issues:")
        for e in all_errors:
            print(f"{e}")
        return False

    print("\n[SUCCESS] YAML Structure & UUIDs are VALID. ✅")
    return True
def collect_uuids_from_file(file_path):
    """
    Load a template file and return the set of UUIDs it defines.

    UUIDs whose path contains "template_groups" or "host_groups" are left
    out: groups SHOULD be shared between templates, so they must not count
    as collisions.

    Returns:
        A set of UUID values. The set is empty when the document is empty
        or when anything goes wrong while reading/parsing it (in which case
        a "[WARN] ..." line is printed).
    """
    shared_sections = ("template_groups", "host_groups")

    def gather(node, path, sink):
        if isinstance(node, list):
            for index, child in enumerate(node):
                gather(child, f"{path}[{index}]", sink)
        elif isinstance(node, dict):
            if 'uuid' in node and not any(section in path for section in shared_sections):
                sink.add(node['uuid'])
            for key, child in node.items():
                gather(child, f"{path}.{key}", sink)

    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            document = yaml.safe_load(handle)
        if not document:
            return set()
        found = set()
        gather(document, "root", found)
        return found
    except Exception as e:
        print(f"[WARN] Could not parse {file_path} for collision check: {e}")
        return set()
def check_cross_template_collisions(target_file, search_dir):
    """
    Look for UUIDs of ``target_file`` that also appear in other templates.

    ``search_dir`` is walked recursively; every '.yaml', '.yml' or '.xml'
    file other than the target itself is loaded with
    ``collect_uuids_from_file`` and intersected with the target's UUIDs.
    Directories, files and colliding UUIDs are processed in sorted order so
    the report is reproducible between runs.

    Args:
        target_file: Template being validated.
        search_dir: Directory tree to compare against.

    Returns:
        True when no collision was found (or the target yields no UUIDs,
        which the main validation is responsible for reporting); False
        otherwise, after printing one "[COLLISION] ..." line per hit.
    """
    print(f"\n[INFO] Checking for cross-template UUID collisions in: {search_dir}")
    target_uuids = collect_uuids_from_file(target_file)
    if not target_uuids:
        return True  # Target file is empty or invalid, handled by main validation

    # Loop invariants, computed once.
    target_abs = os.path.abspath(target_file)
    target_name = os.path.basename(target_file)
    # NOTE(review): '.xml' files are handed to the same YAML-based loader as
    # the others; confirm whether XML exports are really meant to be scanned.
    template_suffixes = ('.yaml', '.yml', '.xml')

    collisions = []
    for root, dirs, files in os.walk(search_dir):
        dirs.sort()  # in-place sort makes os.walk descend in a stable order
        for file_name in sorted(files):
            if not file_name.endswith(template_suffixes):
                continue
            full_path = os.path.join(root, file_name)
            if os.path.abspath(full_path) == target_abs:
                continue  # Skip self
            shared = target_uuids & collect_uuids_from_file(full_path)
            # key=str keeps the sort safe if a malformed file yields non-string UUIDs.
            for uuid in sorted(shared, key=str):
                collisions.append(f"[COLLISION] UUID {uuid} exists in both '{target_name}' and '{file_name}'")

    if collisions:
        print("\n[CROSS-TEMPLATE COLLISIONS DETECTED]:")
        for c in collisions:
            print(c)
        return False

    print("[SUCCESS] No cross-template UUID collisions found.")
    return True
def zabbix_import(file_path, url, token, timeout=30):
    """
    Import a YAML template into Zabbix through the JSON-RPC API.

    The file content is sent with ``configuration.import`` to
    ``<url>/api_jsonrpc.php``, authenticated with an
    ``Authorization: Bearer <token>`` header. The import rules create and
    update everything and never delete missing entities.

    Args:
        file_path: Path of the YAML template to upload (read as UTF-8).
        url: Base URL of the Zabbix frontend; a trailing '/' is tolerated.
        token: Zabbix API token.
        timeout: Seconds to wait on the HTTP request before giving up, so
            an unresponsive server cannot hang the script forever.

    Returns:
        True when the API response contains no 'error' member; False when
        the file cannot be read, the request fails, or the API reports an
        error. Progress and failure details are printed.
    """
    print(f"\n[INFO] Attempting to import {os.path.basename(file_path)} to Zabbix at {url}...")
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            yaml_content = f.read()
    except Exception as e:
        print(f"[ERROR] Could not read file for import: {e}")
        return False

    # Import rules for the JSON-RPC request (Zabbix 6.0/7.0).
    create_update = {"createMissing": True, "updateExisting": True}
    create_update_keep = {"createMissing": True, "updateExisting": True, "deleteMissing": False}
    rules = {
        name: dict(create_update)
        for name in ("host_groups", "template_groups", "templates",
                     "valueMaps", "templateDashboards")
    }
    # Usually we want to link if missing
    rules["templateLinkage"] = {"createMissing": True, "deleteMissing": False}
    rules.update(
        (name, dict(create_update_keep))
        for name in ("items", "discoveryRules", "triggers", "graphs", "httptests")
    )

    payload = {
        "jsonrpc": "2.0",
        "method": "configuration.import",
        "params": {
            "format": "yaml",
            "source": yaml_content,
            "rules": rules,
        },
        "id": 1,
    }

    # Prepare request
    api_url = url.rstrip('/') + "/api_jsonrpc.php"
    headers = {
        'Content-Type': 'application/json-rpc',
        'Authorization': f'Bearer {token}',
    }
    data = json.dumps(payload).encode('utf-8')

    try:
        req = urllib.request.Request(api_url, data=data, headers=headers, method='POST')
        with urllib.request.urlopen(req, timeout=timeout) as response:
            resp_body = response.read().decode('utf-8')
        json_resp = json.loads(resp_body)

        if 'error' in json_resp:
            error = json_resp['error']
            print(f"[IMPORT FAILED] API Error {error.get('code')}: {error.get('message')}")
            if 'data' in error:
                print(f"Details: {error['data']}")
            return False
        if 'result' in json_resp and json_resp['result'] is True:
            print("[SUCCESS] Template imported successfully!")
            return True
        # Unexpected success response format, but likely success if no error
        print(f"[SUCCESS] Template imported (Response: {json_resp.get('result')})")
        return True
    except urllib.error.HTTPError as e:
        # Must come before URLError: HTTPError is a subclass of it.
        print(f"[IMPORT FAILED] HTTP Error: {e.code} - {e.reason}")
        return False
    except urllib.error.URLError as e:
        print(f"[IMPORT FAILED] Connection Error: {e.reason}")
        return False
    except Exception as e:
        print(f"[IMPORT FAILED] Unexpected error: {e}")
        return False
def main():
    """
    Command-line entry point: validate a template and optionally import it.

    Runs, in order, the single-file validation, the cross-template UUID
    collision check over the template's own directory, and (only with
    --import-template) the import into Zabbix.

    Returns:
        The process exit code: 0 when every requested step succeeded,
        1 as soon as one of them fails.
    """
    parser = argparse.ArgumentParser(description="Validate and optionally import Zabbix templates.")
    parser.add_argument("file", help="Path to the YAML template file")
    parser.add_argument("--url", help="Zabbix Server URL (e.g., https://zabbix.example.com)", default=None)
    parser.add_argument("--token", help="Zabbix API Token", default=None)
    parser.add_argument("--import-template", action="store_true", help="Attempt to import if validation passes")
    args = parser.parse_args()
    template_path = args.file

    # 1. Validate the file itself
    if not validate_yaml(template_path):
        return 1

    # 2. Check for collisions in the same directory (Gold Edition Suite)
    template_dir = os.path.dirname(os.path.abspath(template_path))
    if not check_cross_template_collisions(template_path, template_dir):
        return 1

    # 3. Import if requested
    if not args.import_template:
        return 0
    if not args.url or not args.token:
        print("\n[ERROR] To import, you must provide --url and --token.")
        return 1
    if not zabbix_import(template_path, args.url, args.token):
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())