import yaml
import sys
import os
import re
import argparse
import json
import urllib.request
import urllib.error

# Fix for Windows console UTF-8 output (emojis).
# NOTE: encoding may be reported as 'UTF-8' (uppercase) or be None when stdout
# is replaced/piped, so compare case-insensitively and None-safely.
if (sys.stdout.encoding or '').lower() != 'utf-8':
    try:
        sys.stdout.reconfigure(encoding='utf-8')
    except AttributeError:
        pass  # Python < 3.7 fallback

# ============================================================================
# VALIDATION FUNCTIONS - Arthur Gold Edition
# ============================================================================


def is_valid_uuidv4(uuid_str):
    """
    Validate if a string is a proper UUIDv4 format.

    UUIDv4 rules (32 hex chars, no dashes):
    - Position 13 (0-indexed 12) must be '4' (version)
    - Position 17 (0-indexed 16) must be '8', '9', 'a', or 'b' (variant)

    Returns a (bool, str) tuple: validity flag plus a human-readable reason.
    """
    if not isinstance(uuid_str, str):
        return False, "UUID is not a string"
    # Remove dashes if present and lowercase
    clean = uuid_str.replace('-', '').lower()
    if len(clean) != 32:
        return False, f"UUID has {len(clean)} chars (expected 32)"
    if not re.match(r'^[0-9a-f]{32}$', clean):
        return False, "UUID contains non-hex characters"
    # Check version (position 12, 0-indexed)
    if clean[12] != '4':
        return False, f"UUID version is '{clean[12]}' (expected '4' at position 13)"
    # Check variant (position 16, 0-indexed)
    if clean[16] not in '89ab':
        return False, f"UUID variant is '{clean[16]}' (expected '8/9/a/b' at position 17)"
    return True, "Valid UUIDv4"


def validate_uuids_format(content):
    """
    Recursively check all UUIDs in the template for valid UUIDv4 format.
    Returns list of errors.
    """
    errors = []

    def check_node(node, path="root"):
        if isinstance(node, dict):
            if 'uuid' in node:
                uuid = node['uuid']
                is_valid, msg = is_valid_uuidv4(uuid)
                if not is_valid:
                    errors.append(f"[INVALID UUID] {uuid} at {path}: {msg}")
            for k, v in node.items():
                check_node(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_node(item, f"{path}[{i}]")

    check_node(content)
    return errors


def collect_item_keys(content):
    """
    Collect all item and item_prototype keys from the template.
    Used for validating graph references.
    """
    keys = set()

    def extract(node, path="root"):
        if isinstance(node, dict):
            # Collect from items and item_prototypes.
            # Heuristic: a dict with 'key' plus typical item fields is an item.
            if 'key' in node and ('type' in node or 'delay' in node or 'value_type' in node):
                keys.add(node['key'])
            for k, v in node.items():
                extract(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                extract(item, f"{path}[{i}]")

    extract(content)
    return keys


def collect_graph_names(content):
    """
    Collect all graph and graph_prototype names from the template.
    Used for validating dashboard references.
    """
    names = set()

    def extract(node, path="root", in_graphs=False):
        if isinstance(node, dict):
            # A named dict inside a graphs section (or one that carries
            # 'graph_items') is considered a graph definition.
            if 'name' in node and (in_graphs or 'graph_items' in node):
                names.add(node['name'])
            for k, v in node.items():
                is_graph_section = k in ('graphs', 'graph_prototypes')
                extract(v, f"{path}.{k}", in_graphs or is_graph_section)
        elif isinstance(node, list):
            for i, item in enumerate(node):
                extract(item, f"{path}[{i}]", in_graphs)

    extract(content)
    return names


def validate_graph_references(content, item_keys):
    """
    Check if all items referenced in graphs actually exist.
    Returns list of errors.
    """
    errors = []

    def check_graphs(node, path="root"):
        if isinstance(node, dict):
            # Check graph_items for item references
            if 'graph_items' in node:
                graph_name = node.get('name', 'Unknown')
                for i, graph_item in enumerate(node['graph_items']):
                    if 'item' in graph_item and 'key' in graph_item['item']:
                        ref_key = graph_item['item']['key']
                        # Handle LLD macros - extract base pattern
                        base_key = re.sub(r'\[.*\]', '[*]', ref_key)
                        # Check if key exists (exact match or pattern match)
                        found = False
                        for existing_key in item_keys:
                            existing_base = re.sub(r'\[.*\]', '[*]', existing_key)
                            if existing_base == base_key or existing_key == ref_key:
                                found = True
                                break
                        if not found:
                            errors.append(f"[MISSING ITEM REF] Graph '{graph_name}' references non-existent item '{ref_key}'")
            for k, v in node.items():
                check_graphs(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_graphs(item, f"{path}[{i}]")

    check_graphs(content)
    return errors


def validate_dashboard_references(content, graph_names):
    """
    Check if all graphs referenced in dashboards actually exist.
    Returns list of errors.
    """
    errors = []

    def check_dashboards(node, path="root"):
        if isinstance(node, dict):
            # Check for dashboard widget graph references
            if 'fields' in node and isinstance(node['fields'], list):
                widget_name = node.get('name', 'Unknown widget')
                for field in node['fields']:
                    if isinstance(field, dict):
                        if field.get('name') == 'graphid.0' and 'value' in field:
                            value = field['value']
                            if isinstance(value, dict) and 'name' in value:
                                ref_name = value['name']
                                if ref_name not in graph_names:
                                    errors.append(f"[MISSING GRAPH REF] Dashboard widget references non-existent graph '{ref_name}'")
            for k, v in node.items():
                check_dashboards(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_dashboards(item, f"{path}[{i}]")

    check_dashboards(content)
    return errors


def validate_calculated_formulas(content):
    """
    Check for deprecated or invalid functions in Calculated Items
    (Zabbix 7.0 compatibility).
    e.g. 'stddev' is not valid, should be 'stddevpop' or 'stddevsamp'.
    """
    errors = []

    def check_formulas(node, path="root"):
        if isinstance(node, dict):
            # Check if it's a calculated item
            if node.get('type') == 'CALCULATED' and 'params' in node:
                params = node['params']
                # Check for 'stddev(' -> Deprecated/Invalid in 6.0+
                if 'stddev(' in params:
                    errors.append(f"[INVALID FORMULA] 'stddev' function usage at {path}. Use 'stddevpop' (population) or 'stddevsamp' (sample) instead.")
            for k, v in node.items():
                check_formulas(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_formulas(item, f"{path}[{i}]")

    check_formulas(content)
    return errors


def validate_nested_structure(content):
    """
    Check for bad nesting, specifically Items nested inside Trigger lists.
    This happens when indentation is wrong in YAML.
    """
    errors = []

    def check_structure(node, path="root", parent_context=None):
        if isinstance(node, dict):
            # Identify current context based on keys
            is_trigger = 'expression' in node and 'priority' in node
            is_item = 'key' in node and 'type' in node
            if parent_context == 'triggers':
                # If we are inside a 'triggers' list, we should ONLY see triggers.
                # If we see an item (has 'key' and 'type'), it's a nesting error.
                if is_item and not is_trigger:
                    # Triggers don't have 'type' usually, but items do.
                    # Extra check: 'key' is definitely an item property, not trigger
                    if 'key' in node:
                        errors.append(f"[BAD NESTING] Found Item with key '{node.get('key')}' nested inside 'triggers' list at {path}. Check indentation.")
            # Recurse
            for k, v in node.items():
                # Pass context if we are entering a relevant list
                next_context = None
                if k == 'triggers':
                    next_context = 'triggers'
                elif k == 'items':
                    next_context = 'items'
                check_structure(v, f"{path}.{k}", next_context)
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_structure(item, f"{path}[{i}]", parent_context)

    check_structure(content)
    return errors


def validate_zabbix_7_compliance(content):
    """
    Check for Zabbix 7.0 specific issues and common YAML gotchas.
    - Boolean keys (no: 1 -> False: 1)
    - REGEXP vs MATCHES_REGEX
    - Forbidden tags (e.g. 'no' in http steps)
    - Hostname mismatches in triggers

    Returns (errors, warnings) lists.
    """
    errors = []
    warnings = []

    # 1. Get Template Name for Hostname Mismatch Check.
    # BUGFIX: 'templates' lives under content['zabbix_export'], not at the top
    # level — the old lookup never found it, so the check was always disabled.
    template_name = None
    if isinstance(content, dict) and 'zabbix_export' in content:
        export = content['zabbix_export']
        if isinstance(export, dict) and isinstance(export.get('templates'), list):
            if export['templates']:
                template_name = export['templates'][0].get('name')

    # BUGFIX: dicts inside an http-test 'steps' list have paths like
    # '...steps[0]', so endswith("steps") never matched. Match the index form.
    step_path_re = re.compile(r'steps\[\d+\]$')

    def check_node(node, path="root"):
        if isinstance(node, dict):
            # Iterate a snapshot of items so the checks below can never be
            # affected by dict mutation during iteration.
            for k, v in list(node.items()):
                # Check 1: Boolean Keys.
                # YAML 1.1 loads unquoted no/on/off as booleans, so an
                # unquoted 'no:' key arrives here as the bool False.
                if isinstance(k, bool):
                    errors.append(f"[INVALID YAML KEY] Found boolean key '{k}' at {path}. Quote it! (e.g. 'no': 1)")
                # Check 3: Forbidden tags in httptests (step order is implicit)
                if step_path_re.search(path) and k == 'no':
                    errors.append(f"[FORBIDDEN TAG] Tag 'no' found at {path}. Zabbix 7.0 determines order by list position.")
                # Check 2: Deprecated Operators
                if k == 'operator' and v == 'REGEXP':
                    errors.append(f"[DEPRECATED CONSTANT] 'REGEXP' at {path}. Use 'MATCHES_REGEX' for Zabbix 7.0+.")
                if k == 'expression' and isinstance(v, str):
                    # Check 4: Hostname Mismatch in Triggers (needs a known
                    # template name to compare against).
                    if template_name:
                        # Look for /Host Name/key references; skip macros and
                        # anything containing '$' (user macros).
                        matches = re.findall(r'\/([^\/]+)\/', v)
                        for match in matches:
                            if match != template_name and match != '{HOST.HOST}' and '$' not in match:
                                # Heuristic: flag hostname-looking literals only
                                if "Microsoft" in match or "Exchange" in match or "Windows" in match:
                                    warnings.append(f"[HOSTNAME MISMATCH] Trigger at {path} references '{match}' but template is '{template_name}'.")
                    # Check 5: Empty parameters in functions (e.g. forecast(...,,15m)).
                    # Runs regardless of template_name — it is unrelated to hostnames.
                    if ',,' in v:
                        warnings.append(f"[POTENTIAL SYNTAX ERROR] Found empty parameter ',,' in expression at {path}. Zabbix sometimes rejects this. Use explicit '0' or '0s'.")
                    if 'avg(' in v and re.search(r'avg\(.*,\d+[smhdw],\d+[smhdw]\)', v):
                        warnings.append(f"[POTENTIAL SYNTAX ERROR] Found 'avg' with comma separation (1h,1d) at {path}. Zabbix 7.0 uses colon (1h:now-1d).")
            for k, v in node.items():
                check_node(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_node(item, f"{path}[{i}]")

    check_node(content)
    return errors, warnings


def check_duplicate_yaml_keys(file_path):
    """
    Check for duplicate YAML keys at the same level (e.g., two 'macros:' sections).
    This is a Zabbix import killer - YAML parsers silently merge, but Zabbix rejects.
    Uses regex-based scanning since yaml.safe_load silently handles duplicates.
    Returns list of errors.

    Note: Only checks for duplicates at template-level (indent 4) since nested
    keys like 'triggers:' can legitimately appear multiple times in different
    item contexts.
    """
    errors = []
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except Exception as e:
        errors.append(f"[FILE ERROR] Could not read file: {e}")
        return errors

    # Track keys at template level (indent 4) only.
    # Maps key_name -> list of line numbers where it appears.
    template_level_keys = {}

    # Critical keys that should never be duplicated at template level
    critical_keys = {'macros', 'items', 'discovery_rules', 'dashboards',
                     'graphs', 'valuemaps', 'value_maps'}

    # Compile once, outside the loop (was re-imported/compiled per line before).
    key_re = re.compile(r'^(\s*)([a-zA-Z_][a-zA-Z0-9_]*):')

    for line_num, line in enumerate(lines, 1):
        # Skip comments and empty lines
        stripped = line.lstrip()
        if not stripped or stripped.startswith('#'):
            continue
        # Calculate indentation (spaces before content)
        indent = len(line) - len(line.lstrip())
        # Only check template-level keys (indent 4 for "    macros:")
        if indent != 4:
            continue
        # Match YAML key pattern: "key:" or "key: value"
        match = key_re.match(line)
        if match:
            key_name = match.group(2)
            # Only track critical keys
            if key_name in critical_keys:
                template_level_keys.setdefault(key_name, []).append(line_num)

    # Report duplicates
    for key_name, line_numbers in template_level_keys.items():
        if len(line_numbers) > 1:
            lines_str = ', '.join(map(str, line_numbers))
            errors.append(f"[DUPLICATE KEY] '{key_name}:' appears {len(line_numbers)} times at template level (lines: {lines_str})")

    return errors


def validate_yaml(file_path):
    """
    Run the full validation suite against one template file.
    Returns True when no critical errors were found, False otherwise.
    """
    print(f"Validating {file_path}...")
    print("=" * 60)

    # ========== 0. Check for duplicate YAML keys (pre-parse) ==========
    print("\n[0/7] Checking for duplicate YAML keys...")
    duplicate_key_errors = check_duplicate_yaml_keys(file_path)
    if duplicate_key_errors:
        print(f"   ❌ Found {len(duplicate_key_errors)} duplicate key issues")
        for err in duplicate_key_errors:
            print(f"   ❌ {err}")
        return False
    else:
        print("   ✅ No duplicate YAML keys detected")

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = yaml.safe_load(f)
    except Exception as e:
        print(f"[FATAL] Invalid YAML syntax: {e}")
        return False

    if not content:
        print("[FATAL] Empty file")
        return False

    all_errors = []
    warnings = []
    uuids = set()

    # ========== 1. UUID Format Validation (UUIDv4) ==========
    print("\n[1/7] Checking UUID format (UUIDv4 compliance)...")
    uuid_errors = validate_uuids_format(content)
    if uuid_errors:
        all_errors.extend(uuid_errors)
        print(f"   ❌ Found {len(uuid_errors)} invalid UUIDs")
    else:
        print("   ✅ All UUIDs are valid UUIDv4 format")

    # ========== 2. UUID Duplicates Check ==========
    print("\n[2/7] Checking for duplicate UUIDs...")

    def check_uuid(node, path="root"):
        if isinstance(node, dict):
            if 'uuid' in node:
                uuid = node['uuid']
                if uuid in uuids:
                    warnings.append(f"[DUPLICATE UUID] {uuid} found at {path}")
                else:
                    uuids.add(uuid)
            # Check for English descriptions (Basic Heuristic)
            if 'description' in node:
                desc = node['description']
                if isinstance(desc, str):
                    if re.search(r'\bThe\b|\bThis\b|\bValue\b', desc):
                        warnings.append(f"[POTENTIAL ENGLISH] at {path}: '{desc[:40]}...'")
            for k, v in node.items():
                check_uuid(v, f"{path}.{k}")
        elif isinstance(node, list):
            for i, item in enumerate(node):
                check_uuid(item, f"{path}[{i}]")

    check_uuid(content)
    dup_warnings = [w for w in warnings if 'DUPLICATE' in w]
    if dup_warnings:
        print(f"   ⚠️  Found {len(dup_warnings)} duplicate UUIDs (warning only)")
    else:
        print("   ✅ No duplicate UUIDs")

    # ========== 3. Graph Item References ==========
    print("\n[3/7] Checking graph item references...")
    item_keys = collect_item_keys(content)
    graph_ref_errors = validate_graph_references(content, item_keys)
    if graph_ref_errors:
        all_errors.extend(graph_ref_errors)
        print(f"   ❌ Found {len(graph_ref_errors)} broken item references in graphs")
    else:
        print(f"   ✅ All graph item references are valid ({len(item_keys)} items found)")

    # ========== 4. Dashboard Graph References ==========
    print("\n[4/7] Checking dashboard graph references...")
    graph_names = collect_graph_names(content)
    dashboard_ref_errors = validate_dashboard_references(content, graph_names)
    if dashboard_ref_errors:
        all_errors.extend(dashboard_ref_errors)
        print(f"   ❌ Found {len(dashboard_ref_errors)} broken graph references in dashboards")
    else:
        print(f"   ✅ All dashboard graph references are valid ({len(graph_names)} graphs found)")

    # ========== 5. Calculated Formulas Check ==========
    print("\n[5/7] Checking calculated item formulas...")
    formula_errors = validate_calculated_formulas(content)
    if formula_errors:
        all_errors.extend(formula_errors)
        print(f"   ❌ Found {len(formula_errors)} invalid formulas")
    else:
        print("   ✅ Formula syntax looks compatible")

    # ========== 6. Nested Structure Check ==========
    print("\n[6/7] Checking for structure nesting errors...")
    nesting_errors = validate_nested_structure(content)
    if nesting_errors:
        all_errors.extend(nesting_errors)
        print(f"   ❌ Found {len(nesting_errors)} nesting errors (items inside triggers, etc)")
    else:
        print("   ✅ Structure nesting looks correct")

    # ========== 7. Zabbix 7.0 Compliance Check ==========
    print("\n[7/7] Checking for Zabbix 7.0 Compliance & Common Errors...")
    z7_errors, z7_warnings = validate_zabbix_7_compliance(content)
    if z7_errors:
        all_errors.extend(z7_errors)
        print(f"   ❌ Found {len(z7_errors)} compliance errors")
    else:
        print("   ✅ Zabbix 7.0 compliance looks good")
    if z7_warnings:
        warnings.extend(z7_warnings)

    # ========== Summary ==========
    print("\n" + "=" * 60)
    if warnings:
        eng_warnings = [w for w in warnings if 'ENGLISH' in w]
        if eng_warnings:
            print(f"\n[WARNINGS] {len(eng_warnings)} potential English descriptions found (Arthur Audit)")
            for w in eng_warnings[:5]:  # Show max 5
                print(f"   • {w}")
            if len(eng_warnings) > 5:
                print(f"   ... and {len(eng_warnings) - 5} more")

    if all_errors:
        print(f"\n[ERRORS FOUND] {len(all_errors)} critical issues:")
        for e in all_errors:
            print(f"   ❌ {e}")
        return False
    else:
        print("\n[SUCCESS] YAML Structure & UUIDs are VALID. ✅")
        return True


def collect_uuids_from_file(file_path):
    """
    Parse a template file and return the set of UUIDs it defines.
    UUIDs under template_groups / host_groups are skipped because those
    are intentionally shared between templates.
    Returns an empty set on parse failure (with a warning printed).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = yaml.safe_load(f)
        if not content:
            return set()
        local_uuids = set()

        def extract(node, path="root"):
            if isinstance(node, dict):
                if 'uuid' in node:
                    # Ignore UUIDs in template_groups and host_groups (they SHOULD be shared)
                    if "template_groups" not in path and "host_groups" not in path:
                        local_uuids.add(node['uuid'])
                for k, v in node.items():
                    extract(v, f"{path}.{k}")
            elif isinstance(node, list):
                for i, item in enumerate(node):
                    extract(item, f"{path}[{i}]")

        extract(content)
        return local_uuids
    except Exception as e:
        print(f"[WARN] Could not parse {file_path} for collision check: {e}")
        return set()


def check_cross_template_collisions(target_file, search_dir):
    """
    Compare the target file's UUIDs against every other .yaml/.xml file in
    search_dir (recursively). Returns False if any UUID is shared.
    """
    print(f"\n[INFO] Checking for cross-template UUID collisions in: {search_dir}")
    target_uuids = collect_uuids_from_file(target_file)
    if not target_uuids:
        return True  # Target file is empty or invalid, handled by main validation

    collisions = []
    for root, _, files in os.walk(search_dir):
        for file in files:
            if file.endswith('.yaml') or file.endswith('.xml'):
                full_path = os.path.join(root, file)
                if os.path.abspath(full_path) == os.path.abspath(target_file):
                    continue  # Skip self
                other_uuids = collect_uuids_from_file(full_path)
                intersection = target_uuids.intersection(other_uuids)
                if intersection:
                    for uuid in intersection:
                        collisions.append(f"[COLLISION] UUID {uuid} exists in both '{os.path.basename(target_file)}' and '{file}'")

    if collisions:
        print("\n[CROSS-TEMPLATE COLLISIONS DETECTED]:")
        for c in collisions:
            print(c)
        return False
    else:
        print("[SUCCESS] No cross-template UUID collisions found.")
        return True


def zabbix_import(file_path, url, token):
    """
    Import the template into Zabbix via the JSON-RPC API
    (configuration.import, Bearer-token auth, Zabbix 6.0/7.0).
    Returns True on success, False on any failure.
    """
    print(f"\n[INFO] Attempting to import {os.path.basename(file_path)} to Zabbix at {url}...")
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            yaml_content = f.read()
    except Exception as e:
        print(f"[ERROR] Could not read file for import: {e}")
        return False

    # Construct the JSON-RPC request for Zabbix 6.0/7.0
    payload = {
        "jsonrpc": "2.0",
        "method": "configuration.import",
        "params": {
            "format": "yaml",
            "source": yaml_content,
            "rules": {
                "host_groups": {"createMissing": True, "updateExisting": True},
                "template_groups": {"createMissing": True, "updateExisting": True},
                "templates": {"createMissing": True, "updateExisting": True},
                "valueMaps": {"createMissing": True, "updateExisting": True},
                "templateDashboards": {"createMissing": True, "updateExisting": True},
                "templateLinkage": {
                    "createMissing": True,  # Usually we want to link if missing
                    "deleteMissing": False
                },
                "items": {"createMissing": True, "updateExisting": True, "deleteMissing": False},
                "discoveryRules": {"createMissing": True, "updateExisting": True, "deleteMissing": False},
                "triggers": {"createMissing": True, "updateExisting": True, "deleteMissing": False},
                "graphs": {"createMissing": True, "updateExisting": True, "deleteMissing": False},
                "httptests": {"createMissing": True, "updateExisting": True, "deleteMissing": False}
            }
        },
        "id": 1
    }

    # Prepare request
    api_url = url.rstrip('/') + "/api_jsonrpc.php"
    headers = {
        'Content-Type': 'application/json-rpc',
        'Authorization': f'Bearer {token}'
    }
    data = json.dumps(payload).encode('utf-8')

    try:
        req = urllib.request.Request(api_url, data=data, headers=headers, method='POST')
        with urllib.request.urlopen(req) as response:
            resp_body = response.read().decode('utf-8')
            json_resp = json.loads(resp_body)
            if 'error' in json_resp:
                error = json_resp['error']
                print(f"[IMPORT FAILED] API Error {error.get('code')}: {error.get('message')}")
                if 'data' in error:
                    print(f"Details: {error['data']}")
                return False
            elif 'result' in json_resp and json_resp['result'] is True:
                print(f"[SUCCESS] Template imported successfully!")
                return True
            else:
                # Unexpected success response format, but likely success if no error
                print(f"[SUCCESS] Template imported (Response: {json_resp.get('result')})")
                return True
    except urllib.error.HTTPError as e:
        print(f"[IMPORT FAILED] HTTP Error: {e.code} - {e.reason}")
        return False
    except urllib.error.URLError as e:
        print(f"[IMPORT FAILED] Connection Error: {e.reason}")
        return False
    except Exception as e:
        print(f"[IMPORT FAILED] Unexpected error: {e}")
        return False


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Validate and optionally import Zabbix templates.")
    parser.add_argument("file", help="Path to the YAML template file")
    parser.add_argument("--url", help="Zabbix Server URL (e.g., https://zabbix.example.com)", default=None)
    parser.add_argument("--token", help="Zabbix API Token", default=None)
    parser.add_argument("--import-template", action="store_true", help="Attempt to import if validation passes")
    args = parser.parse_args()

    file_path = args.file

    # 1. Validate the file itself
    if not validate_yaml(file_path):
        sys.exit(1)

    # 2. Check for collisions in the same directory (Gold Edition Suite)
    directory = os.path.dirname(os.path.abspath(file_path))
    if not check_cross_template_collisions(file_path, directory):
        sys.exit(1)

    # 3. Import if requested
    if args.import_template:
        if not args.url or not args.token:
            print("\n[ERROR] To import, you must provide --url and --token.")
            sys.exit(1)
        if not zabbix_import(file_path, args.url, args.token):
            sys.exit(1)

    sys.exit(0)