minions-ai-agents/tests/test_zabbix.py

127 lines
4.1 KiB
Python

"""
Tests for Zabbix Connector.
Tests the Zabbix API connector functionality.
"""
import pytest
from unittest.mock import Mock, patch, MagicMock
from src.clients.zabbix_connector import (
ZabbixConnector,
HostStatus,
Problem,
get_zabbix_connector
)
class TestZabbixConnector:
"""Tests for ZabbixConnector class."""
@pytest.fixture
def connector(self):
"""Create a connector with mock token."""
with patch.dict('os.environ', {'ZABBIX_API_TOKEN': 'test-token'}):
return ZabbixConnector(api_token="test-token")
def test_init_with_token(self):
"""Test initialization with API token."""
connector = ZabbixConnector(api_token="my-token")
assert connector._token == "my-token"
def test_init_without_token(self):
"""Test initialization without token (from config)."""
with patch('src.clients.zabbix_connector.Config') as mock_config:
mock_config.get_zabbix_config.return_value = Mock(
url="https://zabbix.test",
api_token=None,
verify_ssl=True,
timeout=30
)
connector = ZabbixConnector()
assert connector._token is None
def test_connect_without_token_fails(self, connector):
"""Test connection fails without token."""
connector._token = None
result = connector.connect()
assert result is False
@patch('src.clients.zabbix_connector.ZabbixAPI')
def test_connect_success(self, mock_api_class, connector):
"""Test successful connection."""
mock_api = MagicMock()
mock_api.api_version.return_value = "6.0.0"
mock_api_class.return_value = mock_api
result = connector.connect()
assert result is True
assert connector._api is not None
@patch('src.clients.zabbix_connector.ZabbixAPI')
def test_connect_failure(self, mock_api_class, connector):
"""Test connection failure handling."""
mock_api_class.side_effect = Exception("Connection refused")
result = connector.connect()
assert result is False
assert connector._api is None
def test_host_status_dataclass(self):
"""Test HostStatus dataclass."""
status = HostStatus(
host_id="12345",
hostname="srv-app01",
name="Application Server 01",
status="enabled",
availability="available",
groups=["Linux Servers", "Production"],
interfaces=[{"ip": "192.168.1.10"}]
)
assert status.host_id == "12345"
assert status.hostname == "srv-app01"
assert "Production" in status.groups
def test_problem_dataclass(self):
"""Test Problem dataclass."""
problem = Problem(
event_id="9999",
host_id="12345",
hostname="srv-db01",
severity=4, # High
name="Disk space is low",
acknowledged=False,
timestamp="1706800000",
duration_seconds=3600,
tags=[{"tag": "scope", "value": "storage"}]
)
assert problem.severity == 4
assert problem.acknowledged is False
assert problem.duration_seconds == 3600
def test_map_availability(self):
"""Test availability status mapping."""
assert ZabbixConnector._map_availability("0") == "unknown"
assert ZabbixConnector._map_availability("1") == "available"
assert ZabbixConnector._map_availability("2") == "unavailable"
assert ZabbixConnector._map_availability("99") == "unknown"
class TestZabbixConnectorSingleton:
"""Tests for singleton instance."""
def test_get_zabbix_connector_singleton(self):
"""Test singleton returns same instance."""
# Reset singleton
import src.clients.zabbix_connector as module
module._zabbix_connector = None
connector1 = get_zabbix_connector()
connector2 = get_zabbix_connector()
assert connector1 is connector2