import yaml
import sys
import os
import re
import argparse
import json
import urllib.request
import urllib.error

# Fix for Windows console UTF-8 output (emojis).
# Compare case-insensitively; the reported encoding may be 'UTF-8' or None.
if (sys.stdout.encoding or '').lower() != 'utf-8':
    try:
        sys.stdout.reconfigure(encoding='utf-8')
    except AttributeError:
        pass  # Python < 3.7 fallback

# ============================================================================
# VALIDATION FUNCTIONS - Arthur Gold Edition
# ============================================================================

def is_valid_uuidv4(uuid_str):
    """
    Validate if a string is a proper UUIDv4 format.

    UUIDv4 rules (32 hex chars, no dashes):
    - Position 13 (0-indexed 12) must be '4' (version)
    - Position 17 (0-indexed 16) must be '8', '9', 'a', or 'b' (variant)
    """
    if not isinstance(uuid_str, str):
        return False, "UUID is not a string"

    # Remove dashes if present and lowercase
    clean = uuid_str.replace('-', '').lower()

    if len(clean) != 32:
        return False, f"UUID has {len(clean)} chars (expected 32)"

    if not re.match(r'^[0-9a-f]{32}$', clean):
        return False, "UUID contains non-hex characters"

    # Check version (position 12, 0-indexed)
    if clean[12] != '4':
        return False, f"UUID version is '{clean[12]}' (expected '4' at position 13)"

    # Check variant (position 16, 0-indexed)
    if clean[16] not in '89ab':
        return False, f"UUID variant is '{clean[16]}' (expected '8/9/a/b' at position 17)"

    return True, "Valid UUIDv4"

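# Minimal illustrative sanity check for the validator above. It is never
# called by the script; the UUIDs are made-up examples, not from a template.
def _demo_is_valid_uuidv4():
    # Dashed form is accepted: dashes are stripped before checking
    ok, msg = is_valid_uuidv4("9f86d081-884c-4d63-a94f-beb8d0d3febc")
    assert ok, msg
    # Version nibble is '1' instead of '4', so this must fail
    ok, msg = is_valid_uuidv4("9f86d081884c1d63a94fbeb8d0d3febc")
    assert not ok and "version" in msg
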
def validate_uuids_format(content):
    """
    Recursively check all UUIDs in the template for valid UUIDv4 format.
    Returns list of errors.
    """
    errors = []

    def check_node(node, path="root"):
        if isinstance(node, dict):
            if 'uuid' in node:
                uuid = node['uuid']
                is_valid, msg = is_valid_uuidv4(uuid)
                if not is_valid:
                    errors.append(f"[INVALID UUID] {uuid} at {path}: {msg}")

            for k, v in node.items():
                check_node(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_node(item, f"{path}[{i}]")

    check_node(content)
    return errors

def collect_item_keys(content):
    """
    Collect all item and item_prototype keys from the template.
    Used for validating graph references.
    """
    keys = set()

    def extract(node, path="root"):
        if isinstance(node, dict):
            # Collect from items and item_prototypes
            if 'key' in node and ('type' in node or 'delay' in node or 'value_type' in node):
                keys.add(node['key'])

            for k, v in node.items():
                extract(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                extract(item, f"{path}[{i}]")

    extract(content)
    return keys

def collect_graph_names(content):
    """
    Collect all graph and graph_prototype names from the template.
    Used for validating dashboard references.
    """
    names = set()

    def extract(node, path="root", in_graphs=False):
        if isinstance(node, dict):
            # Treat dicts under a graphs section (or carrying graph_items) as graphs
            if 'name' in node and (in_graphs or 'graph_items' in node):
                names.add(node['name'])

            for k, v in node.items():
                is_graph_section = k in ('graphs', 'graph_prototypes')
                extract(v, f"{path}.{k}", in_graphs or is_graph_section)
        elif isinstance(node, list):
            for i, item in enumerate(node):
                extract(item, f"{path}[{i}]", in_graphs)

    extract(content)
    return names

def validate_graph_references(content, item_keys):
    """
    Check if all items referenced in graphs actually exist.
    Returns list of errors.
    """
    errors = []

    def check_graphs(node, path="root"):
        if isinstance(node, dict):
            # Check graph_items for item references
            if 'graph_items' in node:
                graph_name = node.get('name', 'Unknown')
                for graph_item in node['graph_items']:
                    if 'item' in graph_item and 'key' in graph_item['item']:
                        ref_key = graph_item['item']['key']
                        # Handle LLD macros - collapse parameters to a base pattern
                        base_key = re.sub(r'\[.*\]', '[*]', ref_key)

                        # Check if key exists (exact match or pattern match)
                        found = False
                        for existing_key in item_keys:
                            existing_base = re.sub(r'\[.*\]', '[*]', existing_key)
                            if existing_base == base_key or existing_key == ref_key:
                                found = True
                                break

                        if not found:
                            errors.append(f"[MISSING ITEM REF] Graph '{graph_name}' references non-existent item '{ref_key}'")

            for k, v in node.items():
                check_graphs(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_graphs(item, f"{path}[{i}]")

    check_graphs(content)
    return errors

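# Illustrative sketch of the '[*]' normalization used above: item parameters
# (including LLD macros) are collapsed so a graph reference and an item
# prototype can be compared by base key. The keys are hypothetical examples.
def _demo_base_key_normalization():
    ref_key = 'perf_counter_en["\\Processor(_Total)\\% Processor Time"]'
    proto_key = 'perf_counter_en["\\Processor({#CPU.NUMBER})\\% Processor Time"]'
    # Both collapse to the same base pattern, so the reference resolves
    assert re.sub(r'\[.*\]', '[*]', ref_key) == 'perf_counter_en[*]'
    assert re.sub(r'\[.*\]', '[*]', proto_key) == 'perf_counter_en[*]'
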
def validate_dashboard_references(content, graph_names):
    """
    Check if all graphs referenced in dashboards actually exist.
    Returns list of errors.
    """
    errors = []

    def check_dashboards(node, path="root"):
        if isinstance(node, dict):
            # Check for dashboard widget graph references
            if 'fields' in node and isinstance(node['fields'], list):
                widget_name = node.get('name', 'Unknown widget')
                for field in node['fields']:
                    if isinstance(field, dict):
                        if field.get('name') == 'graphid.0' and 'value' in field:
                            value = field['value']
                            if isinstance(value, dict) and 'name' in value:
                                ref_name = value['name']
                                if ref_name not in graph_names:
                                    errors.append(f"[MISSING GRAPH REF] Dashboard widget '{widget_name}' references non-existent graph '{ref_name}'")

            for k, v in node.items():
                check_dashboards(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_dashboards(item, f"{path}[{i}]")

    check_dashboards(content)
    return errors

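# Shape of the widget reference this check walks, as a trimmed, hypothetical
# example (real exports carry more fields). Not called by the script.
def _demo_dashboard_widget_shape():
    widget = {
        'name': 'CPU widget',
        'fields': [
            {'type': 'GRAPH', 'name': 'graphid.0',
             'value': {'name': 'CPU usage', 'host': 'My Template'}},
        ],
    }
    assert validate_dashboard_references(widget, {'CPU usage'}) == []
    errors = validate_dashboard_references(widget, set())
    assert errors and '[MISSING GRAPH REF]' in errors[0]
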
def validate_calculated_formulas(content):
    """
    Check for deprecated or invalid functions in Calculated Items (Zabbix 7.0 compatibility).
    e.g. 'stddev' is not valid; use 'stddevpop' or 'stddevsamp'.
    """
    errors = []

    def check_formulas(node, path="root"):
        if isinstance(node, dict):
            # Check if it's a calculated item
            if node.get('type') == 'CALCULATED' and 'params' in node:
                params = node['params']

                # 'stddev(' is deprecated/invalid in 6.0+
                if 'stddev(' in params:
                    errors.append(f"[INVALID FORMULA] 'stddev' function usage at {path}. Use 'stddevpop' (population) or 'stddevsamp' (sample) instead.")

            for k, v in node.items():
                check_formulas(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_formulas(item, f"{path}[{i}]")

    check_formulas(content)
    return errors

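# Tiny illustration of the formula check on a made-up calculated item dict:
def _demo_stddev_is_flagged():
    item = {'type': 'CALCULATED', 'params': 'stddev(//system.cpu.load,5m)'}
    errors = validate_calculated_formulas(item)
    assert errors and '[INVALID FORMULA]' in errors[0]
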
def validate_nested_structure(content):
    """
    Check for bad nesting, specifically Items nested inside Trigger lists.
    This happens when indentation is wrong in YAML.
    """
    errors = []

    def check_structure(node, path="root", parent_context=None):
        if isinstance(node, dict):
            # Identify current context based on keys
            is_trigger = 'expression' in node and 'priority' in node
            is_item = 'key' in node and 'type' in node

            if parent_context == 'triggers':
                # Inside a 'triggers' list we should ONLY see triggers.
                # A dict with 'key' and 'type' is an item (triggers have
                # neither), so this is a nesting/indentation error.
                if is_item and not is_trigger:
                    errors.append(f"[BAD NESTING] Found Item with key '{node.get('key')}' nested inside 'triggers' list at {path}. Check indentation.")

            # Recurse, passing context when entering a relevant list
            for k, v in node.items():
                next_context = None
                if k == 'triggers':
                    next_context = 'triggers'
                elif k == 'items':
                    next_context = 'items'

                check_structure(v, f"{path}.{k}", next_context)

        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_structure(item, f"{path}[{i}]", parent_context)

    check_structure(content)
    return errors

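# Illustration of the nesting check: a made-up fragment where an item dict
# (it has 'key' and 'type') slipped inside a 'triggers' list, as happens
# with one level of wrong YAML indentation. Not called by the script.
def _demo_bad_nesting():
    fragment = {
        'items': [{
            'name': 'CPU load',
            'key': 'system.cpu.load',
            'type': 'ZABBIX_ACTIVE',
            'triggers': [
                # An item, not a trigger -> gets flagged
                {'name': 'Stray item', 'key': 'system.cpu.util', 'type': 'ZABBIX_ACTIVE'},
            ],
        }],
    }
    errors = validate_nested_structure(fragment)
    assert errors and '[BAD NESTING]' in errors[0]
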
def validate_zabbix_7_compliance(content):
    """
    Check for Zabbix 7.0 specific issues and common YAML gotchas.
    - Boolean keys (no: 1 -> False: 1)
    - REGEXP vs MATCHES_REGEX
    - Forbidden tags (e.g. 'no' in http steps)
    - Hostname mismatches in triggers
    """
    errors = []
    warnings = []

    # Get the template name for the hostname mismatch check.
    # Templates live under the 'zabbix_export' root key.
    template_name = None
    if isinstance(content, dict) and 'zabbix_export' in content:
        export = content['zabbix_export']
        if 'templates' in export and isinstance(export['templates'], list):
            if len(export['templates']) > 0:
                template_name = export['templates'][0].get('name')

    def check_node(node, path="root"):
        if isinstance(node, dict):
            for k, v in node.items():
                # Check 1: Boolean keys. PyYAML (YAML 1.1) loads unquoted
                # 'no'/'on'/'off' keys as booleans, which Zabbix rejects.
                if isinstance(k, bool):
                    errors.append(f"[INVALID YAML KEY] Found boolean key '{k}' at {path}. Quote it! (e.g. 'no': 1)")

                # Check 2: Forbidden tags in httptest steps. A step dict's
                # path ends in steps[<index>], e.g. ...httptests[0].steps[0]
                if re.search(r'steps\[\d+\]$', path) and k == 'no':
                    errors.append(f"[FORBIDDEN TAG] Tag 'no' found at {path}. Zabbix 7.0 determines order by list position.")

                # Check 3: Deprecated operators
                if k == 'operator' and v == 'REGEXP':
                    errors.append(f"[DEPRECATED CONSTANT] 'REGEXP' at {path}. Use 'MATCHES_REGEX' for Zabbix 7.0+.")

                if k == 'expression' and isinstance(v, str):
                    # Check 4: Hostname mismatch in trigger expressions.
                    # Match the /Host Name/key reference syntax and flag literal
                    # hostnames that differ from the template name; macros like
                    # {HOST.HOST} and $-macros are skipped.
                    if template_name:
                        matches = re.findall(r'/([^/]+)/', v)
                        for match in matches:
                            if match != template_name and match != '{HOST.HOST}' and '$' not in match:
                                # Heuristic: flag strings that look like hostnames
                                if "Microsoft" in match or "Exchange" in match or "Windows" in match:
                                    warnings.append(f"[HOSTNAME MISMATCH] Trigger at {path} references '{match}' but template is '{template_name}'.")

                    # Check 5: Empty parameters in functions (e.g. forecast(...,,15m))
                    if ',,' in v:
                        warnings.append(f"[POTENTIAL SYNTAX ERROR] Found empty parameter ',,' in expression at {path}. Zabbix sometimes rejects this. Use explicit '0' or '0s'.")
                    if 'avg(' in v and re.search(r'avg\(.*,\d+[smhdw],\d+[smhdw]\)', v):
                        warnings.append(f"[POTENTIAL SYNTAX ERROR] Found 'avg' with comma-separated time shift (1h,1d) at {path}. Zabbix 7.0 uses colon syntax (1h:now-1d).")

                check_node(v, f"{path}.{k}")

        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_node(item, f"{path}[{i}]")

    check_node(content)
    return errors, warnings

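# Illustration of check 1 above: PyYAML follows YAML 1.1, so an unquoted
# 'no' key loads as the boolean False, which Zabbix then rejects. The quoted
# form survives as a string (and is itself flagged inside http steps).
def _demo_boolean_key_gotcha():
    loaded = yaml.safe_load("no: 1\n'no': 2\n")
    assert loaded == {False: 1, 'no': 2}
    errors, _ = validate_zabbix_7_compliance({'steps': [loaded]})
    assert any('[INVALID YAML KEY]' in e for e in errors)
    assert any('[FORBIDDEN TAG]' in e for e in errors)
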
def validate_reserved_keys(content):
    """
    Check for keys that are known to cause collisions or are reserved/monopolistic.
    e.g. service.discovery, service.info with generic params.
    """
    warnings = []

    def check_keys(node, path="root"):
        if isinstance(node, dict):
            if 'key' in node:
                key = node['key']
                # service.discovery without unique params generally collides
                if key == 'service.discovery':
                    warnings.append(f"[POTENTIAL COLLISION] Key 'service.discovery' found at {path}. This often conflicts with Windows OS templates. Consider 'wmi.getall'.")

                if str(key).startswith('service.info'):
                    warnings.append(f"[POTENTIAL COLLISION] Key '{key}' found at {path}. Ensure this does not conflict with standard Windows templates. Consider 'wmi.get'.")

            for k, v in node.items():
                check_keys(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_keys(item, f"{path}[{i}]")

    check_keys(content)
    return warnings

def validate_active_agent_compliance(content):
    """
    If template name suggests 'Active', ensure items have type: ZABBIX_ACTIVE.
    """
    errors = []
    template_name = ""

    # Extract template name (templates live under the 'zabbix_export' root key)
    if isinstance(content, dict) and 'zabbix_export' in content:
        export = content['zabbix_export']
        if 'templates' in export and isinstance(export['templates'], list) and len(export['templates']) > 0:
            template_name = export['templates'][0].get('name', '')

    # Check case-insensitive 'active' in template name
    if 'active' not in template_name.lower():
        return errors  # Not an active agent template candidate, skip check

    def check_items(node, path="root"):
        if isinstance(node, dict):
            # Check if it looks like an item/prototype (has 'key', 'name' and 'type')
            # to avoid false positives on non-item dicts
            if 'key' in node and 'name' in node and 'type' in node:
                item_type = node.get('type')
                key = node['key']

                # Types that are NOT agent checks (so don't need ZABBIX_ACTIVE).
                # ZABBIX_PASSIVE is omitted on purpose: that is exactly what we flag.
                allowed_non_agent_types = [
                    'ZABBIX_ACTIVE',
                    'SIMPLE', 'SNMP', 'INTERNAL', 'TRAP', 'EXTERNAL',
                    'DB_MONITOR', 'HTTP_AGENT', 'SSH', 'TELNET', 'CALCULATED',
                    'JMX', 'DEPENDENT', 'SCRIPT', 'BROWSER'
                ]

                # Flag anything not in the allow-list: explicit ZABBIX_PASSIVE,
                # a missing type (exports default to passive), or an invalid value
                if item_type not in allowed_non_agent_types:
                    errors.append(f"[ACTIVE AGENT MISMATCH] Item '{node['name']}' ({key}) at {path} is not ZABBIX_ACTIVE (found: '{item_type}'). Template '{template_name}' is marked Active.")

            for k, v in node.items():
                check_items(v, f"{path}.{k}")

        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_items(item, f"{path}[{i}]")

    check_items(content)
    return errors

def check_duplicate_yaml_keys(file_path):
    """
    Check for duplicate YAML keys at the same level (e.g., two 'macros:' sections).
    This is a Zabbix import killer - YAML parsers silently merge, but Zabbix rejects.
    Uses regex-based scanning since yaml.safe_load silently handles duplicates.
    Returns list of errors.

    Note: Only checks for duplicates at template level (indent 4) since nested
    keys like 'triggers:' can legitimately appear multiple times in different
    item contexts.
    """
    errors = []

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except Exception as e:
        errors.append(f"[FILE ERROR] Could not read file: {e}")
        return errors

    # Track template-level keys (indent 4): key_name -> list of line numbers
    template_level_keys = {}

    # Critical keys that should never be duplicated at template level
    critical_keys = {'macros', 'items', 'discovery_rules', 'dashboards',
                     'graphs', 'valuemaps', 'value_maps'}

    for line_num, line in enumerate(lines, 1):
        # Skip comments and empty lines
        stripped = line.lstrip()
        if not stripped or stripped.startswith('#'):
            continue

        # Calculate indentation (spaces before content)
        indent = len(line) - len(stripped)

        # Only check template-level keys (indent 4 for "    macros:")
        if indent != 4:
            continue

        # Match YAML key pattern: "key:" or "key: value"
        match = re.match(r'^\s*([a-zA-Z_][a-zA-Z0-9_]*):', line)
        if match:
            key_name = match.group(1)

            # Only track critical keys
            if key_name in critical_keys:
                template_level_keys.setdefault(key_name, []).append(line_num)

    # Report duplicates
    for key_name, line_numbers in template_level_keys.items():
        if len(line_numbers) > 1:
            lines_str = ', '.join(map(str, line_numbers))
            errors.append(f"[DUPLICATE KEY] '{key_name}:' appears {len(line_numbers)} times at template level (lines: {lines_str})")

    return errors

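# Why the raw-text pre-scan above is needed: PyYAML keeps only the LAST
# occurrence of a duplicated key, so the duplicate is invisible after
# parsing, while the Zabbix importer rejects the raw document.
def _demo_duplicate_keys_are_silent():
    doc = "macros:\n  - {macro: '{$A}'}\nmacros:\n  - {macro: '{$B}'}\n"
    # Only the second 'macros' block survives the parse
    assert yaml.safe_load(doc) == {'macros': [{'macro': '{$B}'}]}
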
def validate_yaml(file_path):
    print(f"Validating {file_path}...")
    print("=" * 60)

    # ========== 0. Check for duplicate YAML keys (pre-parse) ==========
    print("\n[0/9] Checking for duplicate YAML keys...")
    duplicate_key_errors = check_duplicate_yaml_keys(file_path)
    if duplicate_key_errors:
        print(f"   ❌ Found {len(duplicate_key_errors)} duplicate key issues")
        for err in duplicate_key_errors:
            print(f"   ❌ {err}")
        return False
    else:
        print("   ✅ No duplicate YAML keys detected")

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = yaml.safe_load(f)
    except Exception as e:
        print(f"[FATAL] Invalid YAML syntax: {e}")
        return False

    if not content:
        print("[FATAL] Empty file")
        return False

    all_errors = []
    warnings = []
    uuids = set()

    # ========== 1. UUID Format Validation (UUIDv4) ==========
    print("\n[1/9] Checking UUID format (UUIDv4 compliance)...")
    uuid_errors = validate_uuids_format(content)
    if uuid_errors:
        all_errors.extend(uuid_errors)
        print(f"   ❌ Found {len(uuid_errors)} invalid UUIDs")
    else:
        print("   ✅ All UUIDs are valid UUIDv4 format")

    # ========== 2. UUID Duplicates Check ==========
    print("\n[2/9] Checking for duplicate UUIDs...")

    def check_uuid(node, path="root"):
        if isinstance(node, dict):
            if 'uuid' in node:
                uuid = node['uuid']
                if uuid in uuids:
                    warnings.append(f"[DUPLICATE UUID] {uuid} found at {path}")
                else:
                    uuids.add(uuid)

            # Check for English descriptions (basic heuristic)
            if 'description' in node:
                desc = node['description']
                if isinstance(desc, str):
                    if re.search(r'\bThe\b|\bThis\b|\bValue\b', desc):
                        warnings.append(f"[POTENTIAL ENGLISH] at {path}: '{desc[:40]}...'")

            for k, v in node.items():
                check_uuid(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_uuid(item, f"{path}[{i}]")

    check_uuid(content)
    dup_warnings = [w for w in warnings if 'DUPLICATE' in w]
    if dup_warnings:
        print(f"   ⚠️ Found {len(dup_warnings)} duplicate UUIDs (warning only)")
    else:
        print("   ✅ No duplicate UUIDs")

    # ========== 3. Graph Item References ==========
    print("\n[3/9] Checking graph item references...")
    item_keys = collect_item_keys(content)
    graph_ref_errors = validate_graph_references(content, item_keys)
    if graph_ref_errors:
        all_errors.extend(graph_ref_errors)
        print(f"   ❌ Found {len(graph_ref_errors)} broken item references in graphs")
    else:
        print(f"   ✅ All graph item references are valid ({len(item_keys)} items found)")

    # ========== 4. Dashboard Graph References ==========
    print("\n[4/9] Checking dashboard graph references...")
    graph_names = collect_graph_names(content)
    dashboard_ref_errors = validate_dashboard_references(content, graph_names)
    if dashboard_ref_errors:
        all_errors.extend(dashboard_ref_errors)
        print(f"   ❌ Found {len(dashboard_ref_errors)} broken graph references in dashboards")
    else:
        print(f"   ✅ All dashboard graph references are valid ({len(graph_names)} graphs found)")

    # ========== 5. Calculated Formulas Check ==========
    print("\n[5/9] Checking calculated item formulas...")
    formula_errors = validate_calculated_formulas(content)
    if formula_errors:
        all_errors.extend(formula_errors)
        print(f"   ❌ Found {len(formula_errors)} invalid formulas")
    else:
        print("   ✅ Formula syntax looks compatible")

    # ========== 6. Nested Structure Check ==========
    print("\n[6/9] Checking for structure nesting errors...")
    nesting_errors = validate_nested_structure(content)
    if nesting_errors:
        all_errors.extend(nesting_errors)
        print(f"   ❌ Found {len(nesting_errors)} nesting errors (items inside triggers, etc)")
    else:
        print("   ✅ Structure nesting looks correct")

    # ========== 7. Zabbix 7.0 Compliance Check ==========
    print("\n[7/9] Checking for Zabbix 7.0 Compliance & Common Errors...")
    z7_errors, z7_warnings = validate_zabbix_7_compliance(content)

    if z7_errors:
        all_errors.extend(z7_errors)
        print(f"   ❌ Found {len(z7_errors)} compliance errors")
    else:
        print("   ✅ Zabbix 7.0 compliance looks good")

    if z7_warnings:
        warnings.extend(z7_warnings)

    # ========== 8. Reserved Keys/Collision Check ==========
    print("\n[8/9] Checking for Collision-Prone Keys...")
    collision_warnings = validate_reserved_keys(content)
    if collision_warnings:
        warnings.extend(collision_warnings)
        print(f"   ⚠️ Found {len(collision_warnings)} potential key collisions")
    else:
        print("   ✅ Key safety check passed")

    # ========== 9. Active Agent Compliance ==========
    print("\n[9/9] Checking Active Agent Compliance...")
    active_errors = validate_active_agent_compliance(content)
    if active_errors:
        all_errors.extend(active_errors)
        print(f"   ❌ Found {len(active_errors)} items missing ZABBIX_ACTIVE type")
    else:
        print("   ✅ Active Agent compliance check passed")

    # ========== Summary ==========
    print("\n" + "=" * 60)

    if warnings:
        eng_warnings = [w for w in warnings if 'ENGLISH' in w]
        if eng_warnings:
            print(f"\n[WARNINGS] {len(eng_warnings)} potential English descriptions found (Arthur Audit)")
            for w in eng_warnings[:5]:  # Show max 5
                print(f"  • {w}")
            if len(eng_warnings) > 5:
                print(f"  ... and {len(eng_warnings) - 5} more")

    if all_errors:
        print(f"\n[ERRORS FOUND] {len(all_errors)} critical issues:")
        for e in all_errors:
            print(f"  ❌ {e}")
        return False
    else:
        print("\n[SUCCESS] YAML Structure & UUIDs are VALID. ✅")
        return True

def collect_uuids_from_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = yaml.safe_load(f)
        if not content:
            return set()

        local_uuids = set()

        def extract(node, path="root"):
            if isinstance(node, dict):
                if 'uuid' in node:
                    # Ignore UUIDs in template_groups and host_groups (they SHOULD be shared)
                    if "template_groups" not in path and "host_groups" not in path:
                        local_uuids.add(node['uuid'])

                for k, v in node.items():
                    extract(v, f"{path}.{k}")

            elif isinstance(node, list):
                for i, item in enumerate(node):
                    extract(item, f"{path}[{i}]")

        extract(content)
        return local_uuids
    except Exception as e:
        print(f"[WARN] Could not parse {file_path} for collision check: {e}")
        return set()

def check_cross_template_collisions(target_file, search_dir):
    print(f"\n[INFO] Checking for cross-template UUID collisions in: {search_dir}")
    target_uuids = collect_uuids_from_file(target_file)
    if not target_uuids:
        return True  # Target file is empty or invalid, handled by main validation

    collisions = []

    for root, _, files in os.walk(search_dir):
        for file in files:
            if file.endswith('.yaml') or file.endswith('.xml'):
                full_path = os.path.join(root, file)
                if os.path.abspath(full_path) == os.path.abspath(target_file):
                    continue  # Skip self

                other_uuids = collect_uuids_from_file(full_path)
                intersection = target_uuids.intersection(other_uuids)

                if intersection:
                    for uuid in intersection:
                        collisions.append(f"[COLLISION] UUID {uuid} exists in both '{os.path.basename(target_file)}' and '{file}'")

    if collisions:
        print("\n[CROSS-TEMPLATE COLLISIONS DETECTED]:")
        for c in collisions:
            print(c)
        return False
    else:
        print("[SUCCESS] No cross-template UUID collisions found.")
        return True

def zabbix_import(file_path, url, token):
    print(f"\n[INFO] Attempting to import {os.path.basename(file_path)} to Zabbix at {url}...")

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            yaml_content = f.read()
    except Exception as e:
        print(f"[ERROR] Could not read file for import: {e}")
        return False

    # Construct the JSON-RPC request (Bearer-token auth requires Zabbix 6.4+)
    payload = {
        "jsonrpc": "2.0",
        "method": "configuration.import",
        "params": {
            "format": "yaml",
            "source": yaml_content,
            "rules": {
                "host_groups": {"createMissing": True, "updateExisting": True},
                "template_groups": {"createMissing": True, "updateExisting": True},
                "templates": {"createMissing": True, "updateExisting": True},
                "valueMaps": {"createMissing": True, "updateExisting": True},
                "templateDashboards": {"createMissing": True, "updateExisting": True},
                "templateLinkage": {
                    "createMissing": True,  # Usually we want to link if missing
                    "deleteMissing": False
                },
                "items": {"createMissing": True, "updateExisting": True, "deleteMissing": False},
                "discoveryRules": {"createMissing": True, "updateExisting": True, "deleteMissing": False},
                "triggers": {"createMissing": True, "updateExisting": True, "deleteMissing": False},
                "graphs": {"createMissing": True, "updateExisting": True, "deleteMissing": False},
                "httptests": {"createMissing": True, "updateExisting": True, "deleteMissing": False}
            }
        },
        "id": 1
    }

    # Prepare request
    api_url = url.rstrip('/') + "/api_jsonrpc.php"
    headers = {
        'Content-Type': 'application/json-rpc',
        'Authorization': f'Bearer {token}'
    }
    data = json.dumps(payload).encode('utf-8')

    try:
        req = urllib.request.Request(api_url, data=data, headers=headers, method='POST')
        with urllib.request.urlopen(req) as response:
            resp_body = response.read().decode('utf-8')
            json_resp = json.loads(resp_body)

            if 'error' in json_resp:
                error = json_resp['error']
                print(f"[IMPORT FAILED] API Error {error.get('code')}: {error.get('message')}")
                if 'data' in error:
                    print(f"Details: {error['data']}")
                return False
            elif 'result' in json_resp and json_resp['result'] is True:
                print("[SUCCESS] Template imported successfully!")
                return True
            else:
                # Unexpected response format, but likely success if no error
                print(f"[SUCCESS] Template imported (Response: {json_resp.get('result')})")
                return True

    except urllib.error.HTTPError as e:
        print(f"[IMPORT FAILED] HTTP Error: {e.code} - {e.reason}")
        return False
    except urllib.error.URLError as e:
        print(f"[IMPORT FAILED] Connection Error: {e.reason}")
        return False
    except Exception as e:
        print(f"[IMPORT FAILED] Unexpected error: {e}")
        return False

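# Example invocations (illustrative; the script name, URL and token are
# placeholders):
#   python validate_template.py template.yaml
#   python validate_template.py template.yaml --url https://zabbix.example.com --token <api-token> --import-template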
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Validate and optionally import Zabbix templates.")
    parser.add_argument("file", help="Path to the YAML template file")
    parser.add_argument("--url", help="Zabbix Server URL (e.g., https://zabbix.example.com)", default=None)
    parser.add_argument("--token", help="Zabbix API Token", default=None)
    parser.add_argument("--import-template", action="store_true", help="Attempt to import if validation passes")

    args = parser.parse_args()

    file_path = args.file

    # 1. Validate the file itself
    if not validate_yaml(file_path):
        sys.exit(1)

    # 2. Check for collisions in the same directory (Gold Edition Suite)
    directory = os.path.dirname(os.path.abspath(file_path))
    if not check_cross_template_collisions(file_path, directory):
        sys.exit(1)

    # 3. Import if requested
    if args.import_template:
        if not args.url or not args.token:
            print("\n[ERROR] To import, you must provide --url and --token.")
            sys.exit(1)

        if not zabbix_import(file_path, args.url, args.token):
            sys.exit(1)

    sys.exit(0)