"""
Zabbix API Connector for Arthur Agent.

Provides read-only access to Zabbix monitoring data for
infrastructure diagnostics and root cause analysis.
"""

import logging
import time
from dataclasses import dataclass
from typing import Optional, Any

from zabbix_utils import ZabbixAPI

from src.config import Config

logger = logging.getLogger("ArthurZabbix")


@dataclass
class HostStatus:
    """Status information for a monitored host."""

    host_id: str
    hostname: str
    name: str
    status: str  # "enabled", "disabled"
    availability: str  # "available", "unavailable", "unknown"
    groups: list[str]
    interfaces: list[dict]
    last_problem: Optional[str] = None


@dataclass
class Problem:
    """Active problem/alert from Zabbix."""

    event_id: str
    host_id: str
    hostname: str
    severity: int  # 0-5 (Not classified to Disaster)
    name: str
    acknowledged: bool
    timestamp: str
    duration_seconds: int
    tags: list[dict]


class ZabbixConnector:
    """
    Read-only connector for Zabbix API.

    Provides methods for:
    - Getting host status and availability
    - Listing active problems/alerts
    - Root cause analysis (neighbor host correlation)

    Per PRD: Uses Read-Only API token for security.

    Every public query method connects lazily on first use and converts
    any exception into a logged error plus an empty result (``None`` or
    ``[]``), so callers never see an exception from this class.
    """

    def __init__(self, api_token: Optional[str] = None):
        """
        Initialize Zabbix connector.

        No network traffic happens here; the connection is opened by
        ``connect()`` or lazily by the first query.

        Args:
            api_token: Zabbix API token. If not provided, reads from config.
        """
        config = Config.get_zabbix_config()
        self._url = config.url
        self._token = api_token or config.api_token
        self._verify_ssl = config.verify_ssl
        self._timeout = config.timeout
        self._api: Optional[ZabbixAPI] = None

        if not self._token:
            logger.warning("Zabbix API token not configured")

    def connect(self) -> bool:
        """
        Establish connection to Zabbix API.

        Returns:
            True if connection successful, False otherwise.
        """
        if not self._token:
            logger.error("Cannot connect: API token not configured")
            return False

        try:
            self._api = ZabbixAPI(
                url=self._url,
                token=self._token,
                validate_certs=self._verify_ssl,
                timeout=self._timeout,
            )
            # Test connection by getting API version
            version = self._api.api_version()
            logger.info("Connected to Zabbix API v%s at %s", version, self._url)
            return True
        except Exception as e:
            logger.error("Failed to connect to Zabbix: %s", e)
            # Drop the half-initialised client so the next call retries.
            self._api = None
            return False

    def disconnect(self) -> None:
        """Close Zabbix API connection (no-op when not connected)."""
        if self._api:
            try:
                self._api.logout()
            except Exception as e:
                # Best effort: token-based auth doesn't need explicit
                # logout, so a failure here is only worth a debug line.
                logger.debug("Ignoring error during Zabbix logout: %s", e)
            self._api = None
            logger.info("Disconnected from Zabbix API")

    def _ensure_connected(self) -> bool:
        """Ensure API connection is established, connecting if needed."""
        if self._api is None:
            return self.connect()
        return True

    def get_host_status(self, hostname: str) -> Optional[HostStatus]:
        """
        Get status and availability of a host.

        When several hosts match, the first one returned by the API is
        used.

        Args:
            hostname: Host name or visible name to search

        Returns:
            HostStatus if found, None otherwise (also None on any error)
        """
        if not self._ensure_connected():
            return None

        try:
            hosts = self._api.host.get(
                search={"host": hostname, "name": hostname},
                # Zabbix ANDs search criteria by default; searchByAny makes
                # a match on either the technical OR the visible name
                # sufficient, which is what this method promises.
                searchByAny=True,
                searchWildcardsEnabled=True,
                selectGroups=["name"],
                selectInterfaces=["ip", "type", "available"],
                output=["hostid", "host", "name", "status", "available"],
            )

            if not hosts:
                logger.warning("Host not found: %s", hostname)
                return None

            host = hosts[0]

            # Get last problem for this host
            problems = self._api.problem.get(
                hostids=[host["hostid"]],
                recent=True,
                sortfield="eventid",
                sortorder="DESC",
                limit=1,
                output=["name"],
            )
            last_problem = problems[0]["name"] if problems else None

            return self._build_host_status(host, last_problem)

        except Exception as e:
            logger.error("Error getting host status: %s", e)
            return None

    def get_active_problems(
        self,
        host_id: Optional[str] = None,
        severity_min: int = 2,  # Warning and above
        limit: int = 20,
    ) -> list[Problem]:
        """
        Get active problems/alerts.

        Args:
            host_id: Filter by specific host (optional)
            severity_min: Minimum severity level (0-5)
            limit: Maximum number of problems to return

        Returns:
            List of active problems (empty on error or when nothing matches)
        """
        if not self._ensure_connected():
            return []

        try:
            # NOTE(review): confirm against the problem.get reference of the
            # deployed Zabbix version that "selectHosts" and sorting by
            # "severity" are accepted. If host data is not returned, the
            # loop below falls back to hostname "unknown".
            params = {
                "output": [
                    "eventid", "objectid", "severity", "name",
                    "acknowledged", "clock", "r_clock",
                ],
                "selectHosts": ["hostid", "host", "name"],
                "selectTags": "extend",
                "recent": True,
                "sortfield": ["severity", "eventid"],
                "sortorder": ["DESC", "DESC"],
                "limit": limit,
            }

            if host_id:
                params["hostids"] = [host_id]

            if severity_min > 0:
                severities = list(range(severity_min, 6))
                if not severities:
                    # Threshold above the highest severity (5): nothing can
                    # match, and an empty filter list must not be sent as
                    # it could be read as "no filter".
                    return []
                params["severities"] = severities

            problems = self._api.problem.get(**params)

            result = []
            for p in problems:
                # Extract host info from payload
                hosts = p.get("hosts", [])
                host_info = hosts[0] if hosts else {}
                result.append(
                    self._build_problem(p, host_info, p.get("tags", []))
                )
            return result

        except Exception as e:
            logger.error("Error getting active problems: %s", e)
            return []

    def get_neighbor_alerts(
        self,
        host_id: str,
        time_window_minutes: int = 30,
    ) -> list[Problem]:
        """
        Get alerts from hosts in the same group (neighbor correlation).

        Used for root cause analysis - if multiple hosts in same network
        segment have issues, it may indicate infrastructure problem.

        Args:
            host_id: Reference host ID
            time_window_minutes: Time window to search for correlated alerts

        Returns:
            List of problems from neighbor hosts (tags are not fetched and
            are always an empty list; empty result on error)
        """
        if not self._ensure_connected():
            return []

        try:
            # Get groups of the reference host
            hosts = self._api.host.get(
                hostids=[host_id],
                selectGroups=["groupid"],
                output=["hostid"],
            )
            if not hosts:
                return []

            group_ids = [g["groupid"] for g in hosts[0].get("groups", [])]
            if not group_ids:
                return []

            # Get all hosts in the same groups
            neighbor_hosts = self._api.host.get(
                groupids=group_ids,
                output=["hostid", "host"],
            )
            # The API returns ids as strings; normalise the caller's value so
            # the reference host is excluded even if an int was passed.
            reference_id = str(host_id)
            neighbor_ids = [
                h["hostid"] for h in neighbor_hosts
                if str(h["hostid"]) != reference_id
            ]
            if not neighbor_ids:
                return []

            # Get problems for neighbor hosts
            time_from = int(time.time()) - (time_window_minutes * 60)
            problems = self._api.problem.get(
                hostids=neighbor_ids,
                time_from=time_from,
                recent=True,
                sortfield="eventid",
                sortorder="DESC",
                output=[
                    "eventid", "objectid", "severity", "name",
                    "acknowledged", "clock",
                ],
            )

            # Resolve all trigger -> host mappings in one request instead of
            # one trigger.get call per problem.
            hosts_by_trigger = self._get_hosts_for_triggers(
                [p.get("objectid") for p in problems]
            )

            return [
                self._build_problem(
                    p,
                    hosts_by_trigger.get(str(p.get("objectid")), {}),
                    [],
                )
                for p in problems
            ]

        except Exception as e:
            logger.error("Error getting neighbor alerts: %s", e)
            return []

    def get_host_by_ip(self, ip_address: str) -> Optional[HostStatus]:
        """
        Find host by IP address.

        Args:
            ip_address: IP address to search

        Returns:
            HostStatus if found, None otherwise (also None on any error).
            ``last_problem`` is not looked up and stays None.
        """
        if not self._ensure_connected():
            return None

        try:
            hosts = self._api.host.get(
                filter={"ip": ip_address},
                selectGroups=["name"],
                selectInterfaces=["ip", "type", "available"],
                output=["hostid", "host", "name", "status", "available"],
            )
            if not hosts:
                return None

            return self._build_host_status(hosts[0])

        except Exception as e:
            logger.error("Error finding host by IP: %s", e)
            return None

    def _get_host_for_trigger(self, trigger_id: str) -> dict:
        """
        Get host information for a trigger.

        Returns:
            The first host attached to the trigger, or {} when the trigger
            has no hosts or the lookup fails.
        """
        return self._get_hosts_for_triggers([trigger_id]).get(
            str(trigger_id), {}
        )

    def _get_hosts_for_triggers(self, trigger_ids: list) -> dict[str, dict]:
        """
        Resolve several triggers to their first host in a single API call.

        Args:
            trigger_ids: Trigger IDs; falsy entries and duplicates are
                skipped.

        Returns:
            Mapping of trigger ID (as string) to the first host dict of
            that trigger. Triggers without hosts are omitted; a failed
            lookup yields an empty mapping.
        """
        # dict.fromkeys de-duplicates while keeping first-seen order.
        unique_ids = list(dict.fromkeys(tid for tid in trigger_ids if tid))
        if not unique_ids:
            return {}

        try:
            triggers = self._api.trigger.get(
                triggerids=unique_ids,
                selectHosts=["hostid", "host"],
                output=["triggerid"],
            )
        except Exception as e:
            # Best effort: callers fall back to hostname "unknown".
            logger.warning("Error resolving hosts for triggers: %s", e)
            return {}

        return {
            str(t["triggerid"]): t["hosts"][0]
            for t in triggers
            if t.get("hosts")
        }

    @classmethod
    def _build_host_status(
        cls, host: dict, last_problem: Optional[str] = None
    ) -> HostStatus:
        """Convert a host.get result row into a HostStatus."""
        # NOTE(review): availability is read from the host-level "available"
        # field only; if the deployed Zabbix version reports availability
        # per interface instead, this will always map to "unknown" - confirm.
        return HostStatus(
            host_id=host["hostid"],
            hostname=host["host"],
            name=host["name"],
            status="enabled" if host["status"] == "0" else "disabled",
            availability=cls._map_availability(host.get("available", "0")),
            groups=[g["name"] for g in host.get("groups", [])],
            interfaces=host.get("interfaces", []),
            last_problem=last_problem,
        )

    @classmethod
    def _build_problem(
        cls, problem: dict, host_info: dict, tags: list[dict]
    ) -> Problem:
        """Convert a problem.get result row plus its host into a Problem."""
        return Problem(
            event_id=problem["eventid"],
            host_id=host_info.get("hostid", ""),
            hostname=host_info.get("host", "unknown"),
            severity=int(problem["severity"]),
            name=problem["name"],
            acknowledged=problem["acknowledged"] == "1",
            timestamp=problem["clock"],
            duration_seconds=cls._calculate_duration(problem["clock"]),
            tags=tags,
        )

    @staticmethod
    def _map_availability(status: str) -> str:
        """Map Zabbix availability code to string ("unknown" by default)."""
        mapping = {
            "0": "unknown",
            "1": "available",
            "2": "unavailable",
        }
        return mapping.get(status, "unknown")

    @staticmethod
    def _calculate_duration(timestamp: str) -> int:
        """
        Calculate duration in seconds from timestamp.

        Returns 0 when the timestamp cannot be parsed as an integer.
        """
        try:
            return int(time.time()) - int(timestamp)
        except (ValueError, TypeError):
            return 0


# Singleton instance
_zabbix_connector: Optional[ZabbixConnector] = None


def get_zabbix_connector() -> ZabbixConnector:
    """Get global Zabbix connector instance, creating it on first use."""
    global _zabbix_connector
    if _zabbix_connector is None:
        _zabbix_connector = ZabbixConnector()
    return _zabbix_connector