#!/usr/bin/env python3
# Zabbix template validator & importer — "Arthur Gold Edition".
import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.request

import yaml
# Fix for Windows console UTF-8 output (emojis).
# sys.stdout.encoding may be None (replaced/redirected streams) and its case
# is not guaranteed ('UTF-8' vs 'utf-8'), so normalise before comparing.
if (sys.stdout.encoding or '').lower() != 'utf-8':
    try:
        sys.stdout.reconfigure(encoding='utf-8')
    except AttributeError:
        pass  # Python < 3.7: TextIOWrapper.reconfigure() not available
# ============================================================================
# VALIDATION FUNCTIONS - Arthur Gold Edition
# ============================================================================
def is_valid_uuidv4(uuid_str):
    """
    Check whether *uuid_str* is a well-formed UUIDv4.

    After stripping dashes and lowercasing, a UUIDv4 is exactly 32 hex
    characters where index 12 is '4' (the version nibble) and index 16 is
    one of '8', '9', 'a', 'b' (the variant nibble).

    Returns:
        A (bool, message) tuple: (True, "Valid UUIDv4") on success,
        otherwise (False, <reason>).
    """
    if not isinstance(uuid_str, str):
        return False, "UUID is not a string"

    # Normalise: dashes are optional, hex case is irrelevant.
    normalized = uuid_str.replace('-', '').lower()

    if len(normalized) != 32:
        return False, f"UUID has {len(normalized)} chars (expected 32)"

    if re.fullmatch(r'[0-9a-f]{32}', normalized) is None:
        return False, "UUID contains non-hex characters"

    version, variant = normalized[12], normalized[16]

    if version != '4':
        return False, f"UUID version is '{version}' (expected '4' at position 13)"

    if variant not in '89ab':
        return False, f"UUID variant is '{variant}' (expected '8/9/a/b' at position 17)"

    return True, "Valid UUIDv4"
def validate_uuids_format(content):
    """
    Recursively validate every 'uuid' field in the parsed template.

    Walks dicts (in insertion order) and lists, checking each 'uuid'
    value with is_valid_uuidv4 and recording a descriptive error for
    every failure.

    Returns:
        A list of error strings; empty when all UUIDs are valid.
    """
    errors = []

    def walk(node, path):
        if isinstance(node, dict):
            if 'uuid' in node:
                candidate = node['uuid']
                ok, reason = is_valid_uuidv4(candidate)
                if not ok:
                    errors.append(f"[INVALID UUID] {candidate} at {path}: {reason}")
            for key, value in node.items():
                walk(value, f"{path}.{key}")
        elif isinstance(node, list):
            for idx, element in enumerate(node):
                walk(element, f"{path}[{idx}]")

    walk(content, "root")
    return errors
def collect_item_keys(content):
    """
    Gather every item / item_prototype key defined in the template.

    A dict is treated as an item definition when it carries a 'key' plus
    at least one of 'type', 'delay' or 'value_type' (distinguishing real
    items from other structures that happen to have a 'key' field).

    Returns:
        A set of item key strings, used to validate graph references.
    """
    keys = set()

    def walk(node):
        if isinstance(node, dict):
            looks_like_item = 'key' in node and (
                'type' in node or 'delay' in node or 'value_type' in node
            )
            if looks_like_item:
                keys.add(node['key'])
            for value in node.values():
                walk(value)
        elif isinstance(node, list):
            for element in node:
                walk(element)

    walk(content)
    return keys
def collect_graph_names(content):
    """
    Gather every graph / graph_prototype name defined in the template.

    A dict contributes its 'name' when it either carries 'graph_items'
    directly, or is nested (at any depth) under a 'graphs' or
    'graph_prototypes' key.

    Returns:
        A set of graph name strings, used to validate dashboard references.
    """
    names = set()

    def walk(node, inside_graphs):
        if isinstance(node, dict):
            if 'name' in node and (inside_graphs or 'graph_items' in node):
                names.add(node['name'])
            for key, value in node.items():
                # Once inside a graphs section, every descendant stays "inside".
                walk(value, inside_graphs or key in ('graphs', 'graph_prototypes'))
        elif isinstance(node, list):
            for element in node:
                walk(element, inside_graphs)

    walk(content, False)
    return names
def validate_graph_references(content, item_keys):
    """
    Verify that every item referenced from a graph's 'graph_items' exists.

    Bracketed LLD parameters are collapsed ('[...]' -> '[*]') on both
    sides of the comparison, so a prototype reference matches any
    concrete parameterisation of the same base key.

    Args:
        content: Parsed template structure.
        item_keys: Set of known item keys (from collect_item_keys).

    Returns:
        A list of error strings; empty when all references resolve.
    """
    errors = []

    def key_exists(ref_key):
        # Accept an exact match, or a match after wildcarding parameters.
        wildcard = re.sub(r'\[.*\]', '[*]', ref_key)
        for known in item_keys:
            if known == ref_key or re.sub(r'\[.*\]', '[*]', known) == wildcard:
                return True
        return False

    def walk(node):
        if isinstance(node, dict):
            if 'graph_items' in node:
                graph_name = node.get('name', 'Unknown')
                for graph_item in node['graph_items']:
                    if 'item' in graph_item and 'key' in graph_item['item']:
                        ref_key = graph_item['item']['key']
                        if not key_exists(ref_key):
                            errors.append(
                                f"[MISSING ITEM REF] Graph '{graph_name}' "
                                f"references non-existent item '{ref_key}'"
                            )
            for value in node.values():
                walk(value)
        elif isinstance(node, list):
            for element in node:
                walk(element)

    walk(content)
    return errors
def validate_dashboard_references(content, graph_names):
    """
    Verify that every graph referenced from a dashboard widget exists.

    Widget graph references live in 'fields' entries whose name is
    'graphid.<index>'. A widget can reference several graphs, so every
    index is checked (the previous implementation only inspected
    'graphid.0' and silently skipped the rest). The widget's name is
    included in the error message to make the report actionable.

    Args:
        content: Parsed template structure.
        graph_names: Set of known graph names (from collect_graph_names).

    Returns:
        A list of error strings; empty when all references resolve.
    """
    errors = []
    # Matches 'graphid.0', 'graphid.1', ... but not unrelated field names.
    graphid_field = re.compile(r'^graphid\.\d+$')

    def check_dashboards(node, path="root"):
        if isinstance(node, dict):
            # Check for dashboard widget graph references
            if 'fields' in node and isinstance(node['fields'], list):
                widget_name = node.get('name', 'Unknown widget')
                for field in node['fields']:
                    if not isinstance(field, dict):
                        continue
                    if graphid_field.match(str(field.get('name', ''))) and 'value' in field:
                        value = field['value']
                        if isinstance(value, dict) and 'name' in value:
                            ref_name = value['name']
                            if ref_name not in graph_names:
                                errors.append(
                                    f"[MISSING GRAPH REF] Dashboard widget '{widget_name}' "
                                    f"references non-existent graph '{ref_name}'"
                                )

            for k, v in node.items():
                check_dashboards(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_dashboards(item, f"{path}[{i}]")

    check_dashboards(content)
    return errors
def validate_yaml(file_path):
    """
    Run the full validation suite on a single Zabbix template YAML file.

    Checks performed:
      1. Every 'uuid' value is well-formed UUIDv4.
      2. No duplicate UUIDs within the file (warning only); also flags
         descriptions that look like English text (simple heuristic).
      3. Every item referenced by a graph exists.
      4. Every graph referenced by a dashboard widget exists.

    Prints a human-readable report to stdout and returns True when no
    critical error was found (warnings alone do not fail validation).
    """
    print(f"Validating {file_path}...")
    print("=" * 60)

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = yaml.safe_load(f)
    except Exception as e:
        # Covers both I/O errors and YAML parse errors.
        print(f"[FATAL] Invalid YAML syntax: {e}")
        return False

    if not content:
        print("[FATAL] Empty file")
        return False

    all_errors = []   # Critical issues: any entry makes validation fail.
    warnings = []     # Non-fatal findings: duplicates, English descriptions.
    uuids = set()     # UUIDs seen so far, for duplicate detection.

    # ========== 1. UUID Format Validation (UUIDv4) ==========
    print("\n[1/4] Checking UUID format (UUIDv4 compliance)...")
    uuid_errors = validate_uuids_format(content)
    if uuid_errors:
        all_errors.extend(uuid_errors)
        print(f" ❌ Found {len(uuid_errors)} invalid UUIDs")
    else:
        print(" ✅ All UUIDs are valid UUIDv4 format")

    # ========== 2. UUID Duplicates Check ==========
    print("\n[2/4] Checking for duplicate UUIDs...")

    def check_uuid(node, path="root"):
        # Walks the tree, appending duplicate-UUID and English-description
        # warnings to the enclosing `warnings` list.
        if isinstance(node, dict):
            if 'uuid' in node:
                uuid = node['uuid']
                if uuid in uuids:
                    warnings.append(f"[DUPLICATE UUID] {uuid} found at {path}")
                else:
                    uuids.add(uuid)

            # Check for English descriptions (Basic Heuristic)
            if 'description' in node:
                desc = node['description']
                if isinstance(desc, str):
                    if re.search(r'\bThe\b|\bThis\b|\bValue\b', desc):
                        warnings.append(f"[POTENTIAL ENGLISH] at {path}: '{desc[:40]}...'")

            for k, v in node.items():
                check_uuid(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_uuid(item, f"{path}[{i}]")

    check_uuid(content)
    dup_warnings = [w for w in warnings if 'DUPLICATE' in w]
    if dup_warnings:
        print(f" ⚠️ Found {len(dup_warnings)} duplicate UUIDs (warning only)")
    else:
        print(" ✅ No duplicate UUIDs")

    # ========== 3. Graph Item References ==========
    print("\n[3/4] Checking graph item references...")
    item_keys = collect_item_keys(content)
    graph_ref_errors = validate_graph_references(content, item_keys)
    if graph_ref_errors:
        all_errors.extend(graph_ref_errors)
        print(f" ❌ Found {len(graph_ref_errors)} broken item references in graphs")
    else:
        print(f" ✅ All graph item references are valid ({len(item_keys)} items found)")

    # ========== 4. Dashboard Graph References ==========
    print("\n[4/4] Checking dashboard graph references...")
    graph_names = collect_graph_names(content)
    dashboard_ref_errors = validate_dashboard_references(content, graph_names)
    if dashboard_ref_errors:
        all_errors.extend(dashboard_ref_errors)
        print(f" ❌ Found {len(dashboard_ref_errors)} broken graph references in dashboards")
    else:
        print(f" ✅ All dashboard graph references are valid ({len(graph_names)} graphs found)")

    # ========== Summary ==========
    print("\n" + "=" * 60)

    if warnings:
        eng_warnings = [w for w in warnings if 'ENGLISH' in w]
        if eng_warnings:
            print(f"\n[WARNINGS] {len(eng_warnings)} potential English descriptions found (Arthur Audit)")
            for w in eng_warnings[:5]:  # Show max 5
                print(f" • {w}")
            if len(eng_warnings) > 5:
                print(f" ... and {len(eng_warnings) - 5} more")

    if all_errors:
        print(f"\n[ERRORS FOUND] {len(all_errors)} critical issues:")
        for e in all_errors:
            print(f" ❌ {e}")
        return False
    else:
        print("\n[SUCCESS] YAML Structure & UUIDs are VALID. ✅")
        return True
def collect_uuids_from_file(file_path):
    """
    Parse *file_path* as YAML and return the set of UUIDs it defines.

    UUIDs found under 'template_groups' / 'host_groups' paths are skipped
    on purpose: group UUIDs are expected to be shared across templates.
    Any read/parse failure is reported as a warning and yields an empty
    set, so a broken sibling file never aborts a collision scan.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            parsed = yaml.safe_load(handle)
        if not parsed:
            return set()

        found = set()

        def walk(node, path):
            if isinstance(node, dict):
                if 'uuid' in node:
                    # Ignore UUIDs in template_groups and host_groups
                    # (they SHOULD be shared).
                    if "template_groups" not in path and "host_groups" not in path:
                        found.add(node['uuid'])
                for key, value in node.items():
                    walk(value, f"{path}.{key}")
            elif isinstance(node, list):
                for idx, element in enumerate(node):
                    walk(element, f"{path}[{idx}]")

        walk(parsed, "root")
        return found
    except Exception as e:
        print(f"[WARN] Could not parse {file_path} for collision check: {e}")
        return set()
def check_cross_template_collisions(target_file, search_dir):
    """
    Scan *search_dir* recursively for other template files (.yaml/.xml)
    whose UUIDs overlap with those of *target_file*.

    Group UUIDs are already excluded by collect_uuids_from_file, so every
    reported overlap is a genuine collision. Prints a report and returns
    True when no collision is found.
    """
    print(f"\n[INFO] Checking for cross-template UUID collisions in: {search_dir}")
    target_uuids = collect_uuids_from_file(target_file)
    if not target_uuids:
        # Nothing to compare: an empty/unparseable target is already
        # handled by the main validation pass.
        return True

    target_abs = os.path.abspath(target_file)
    target_name = os.path.basename(target_file)
    collisions = []

    for root, _, files in os.walk(search_dir):
        for candidate in files:
            if not (candidate.endswith('.yaml') or candidate.endswith('.xml')):
                continue
            candidate_path = os.path.join(root, candidate)
            if os.path.abspath(candidate_path) == target_abs:
                continue  # Never compare the target against itself.

            shared = target_uuids.intersection(collect_uuids_from_file(candidate_path))
            collisions.extend(
                f"[COLLISION] UUID {uuid} exists in both '{target_name}' and '{candidate}'"
                for uuid in shared
            )

    if collisions:
        print("\n[CROSS-TEMPLATE COLLISIONS DETECTED]:")
        for line in collisions:
            print(line)
        return False
    print("[SUCCESS] No cross-template UUID collisions found.")
    return True
def zabbix_import(file_path, url, token):
    """
    Import a template file into a Zabbix server via the JSON-RPC API.

    Sends a 'configuration.import' request (Zabbix 6.0/7.0 API, bearer
    token auth) with create/update rules enabled and every deleteMissing
    flag disabled, so an import never removes existing entities.

    Args:
        file_path: Path to the YAML template to import.
        url: Base URL of the Zabbix frontend (without /api_jsonrpc.php).
        token: Zabbix API token, sent as a Bearer credential.

    Returns:
        True on successful import, False on any read/connection/API error.
    """
    print(f"\n[INFO] Attempting to import {os.path.basename(file_path)} to Zabbix at {url}...")

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            yaml_content = f.read()
    except Exception as e:
        print(f"[ERROR] Could not read file for import: {e}")
        return False

    # Construct the JSON-RPC request for Zabbix 6.0/7.0
    payload = {
        "jsonrpc": "2.0",
        "method": "configuration.import",
        "params": {
            "format": "yaml",
            "source": yaml_content,
            "rules": {
                "host_groups": {
                    "createMissing": True,
                    "updateExisting": True
                },
                "template_groups": {
                    "createMissing": True,
                    "updateExisting": True
                },
                "templates": {
                    "createMissing": True,
                    "updateExisting": True
                },
                "valueMaps": {
                    "createMissing": True,
                    "updateExisting": True
                },
                "templateDashboards": {
                    "createMissing": True,
                    "updateExisting": True
                },
                "templateLinkage": {
                    "createMissing": True,  # Usually we want to link if missing
                    "deleteMissing": False
                },
                "items": {
                    "createMissing": True,
                    "updateExisting": True,
                    "deleteMissing": False
                },
                "discoveryRules": {
                    "createMissing": True,
                    "updateExisting": True,
                    "deleteMissing": False
                },
                "triggers": {
                    "createMissing": True,
                    "updateExisting": True,
                    "deleteMissing": False
                },
                "graphs": {
                    "createMissing": True,
                    "updateExisting": True,
                    "deleteMissing": False
                },
                "httptests": {
                    "createMissing": True,
                    "updateExisting": True,
                    "deleteMissing": False
                }
            }
        },
        "id": 1
    }

    # Prepare request
    api_url = url.rstrip('/') + "/api_jsonrpc.php"
    headers = {
        'Content-Type': 'application/json-rpc',
        'Authorization': f'Bearer {token}'
    }
    data = json.dumps(payload).encode('utf-8')

    try:
        req = urllib.request.Request(api_url, data=data, headers=headers, method='POST')
        with urllib.request.urlopen(req) as response:
            resp_body = response.read().decode('utf-8')
            json_resp = json.loads(resp_body)

            # JSON-RPC errors arrive with HTTP 200, so check the body first.
            if 'error' in json_resp:
                error = json_resp['error']
                print(f"[IMPORT FAILED] API Error {error.get('code')}: {error.get('message')}")
                if 'data' in error:
                    print(f"Details: {error['data']}")
                return False
            elif 'result' in json_resp and json_resp['result'] is True:
                print(f"[SUCCESS] Template imported successfully!")
                return True
            else:
                # Unexpected success response format, but likely success if no error
                print(f"[SUCCESS] Template imported (Response: {json_resp.get('result')})")
                return True

    except urllib.error.HTTPError as e:
        print(f"[IMPORT FAILED] HTTP Error: {e.code} - {e.reason}")
        return False
    except urllib.error.URLError as e:
        print(f"[IMPORT FAILED] Connection Error: {e.reason}")
        return False
    except Exception as e:
        print(f"[IMPORT FAILED] Unexpected error: {e}")
        return False
if __name__ == "__main__":
    # CLI entry point: validate a template, scan its directory for UUID
    # collisions, then optionally push it to a Zabbix server.
    parser = argparse.ArgumentParser(description="Validate and optionally import Zabbix templates.")
    parser.add_argument("file", help="Path to the YAML template file")
    parser.add_argument("--url", help="Zabbix Server URL (e.g., https://zabbix.example.com)", default=None)
    parser.add_argument("--token", help="Zabbix API Token", default=None)
    parser.add_argument("--import-template", action="store_true", help="Attempt to import if validation passes")

    args = parser.parse_args()

    file_path = args.file

    # 1. Validate the file itself
    if not validate_yaml(file_path):
        sys.exit(1)

    # 2. Check for collisions in the same directory (Gold Edition Suite)
    directory = os.path.dirname(os.path.abspath(file_path))
    if not check_cross_template_collisions(file_path, directory):
        sys.exit(1)

    # 3. Import if requested
    if args.import_template:
        if not args.url or not args.token:
            print("\n[ERROR] To import, you must provide --url and --token.")
            sys.exit(1)

        if not zabbix_import(file_path, args.url, args.token):
            sys.exit(1)

    sys.exit(0)