From ce3e25655089dab07a5715958748ab63bfa0fb28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Pedro=20Toledo?= Date: Sun, 1 Feb 2026 12:03:32 -0300 Subject: [PATCH] =?UTF-8?q?feat:=20Implementa=C3=A7=C3=A3o=20da=20Fase=202?= =?UTF-8?q?=20-=20Conectores=20e=20Infraestrutura?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Porque foi feita essa alteração? - Nova funcionalidade: Implementação dos conectores de infraestrutura para o Arthur - ZabbixConnector: noc.itguys.com.br com Read-Only API - QdrantMultitenant: Vector DB com isolamento por tenant e on_disk storage - OllamaClient: Inferência local com triage (1B) e specialist (8B) Quais testes foram feitos? - 72 testes unitários com pytest (todos passando) - Mocks para Zabbix API, Qdrant e Ollama HTTP - Testes de isolamento de tenant no Qdrant - Testes async para Ollama client A alteração gerou um novo teste que precisa ser implementado no pipeline? - Sim. Novos arquivos: test_zabbix.py, test_qdrant.py, test_ollama.py --- src/clients/__init__.py | 21 +- src/clients/ollama_client.py | 249 ++++++++++++++++++++ src/clients/qdrant_client.py | 349 +++++++++++++++++++++++++++ src/clients/zabbix_connector.py | 402 ++++++++++++++++++++++++++++++++ tests/test_ollama.py | 163 +++++++++++++ tests/test_qdrant.py | 128 ++++++++++ tests/test_zabbix.py | 126 ++++++++++ 7 files changed, 1437 insertions(+), 1 deletion(-) create mode 100644 src/clients/ollama_client.py create mode 100644 src/clients/qdrant_client.py create mode 100644 src/clients/zabbix_connector.py create mode 100644 tests/test_ollama.py create mode 100644 tests/test_qdrant.py create mode 100644 tests/test_zabbix.py diff --git a/src/clients/__init__.py b/src/clients/__init__.py index 3b58520..74d36d4 100644 --- a/src/clients/__init__.py +++ b/src/clients/__init__.py @@ -1,9 +1,28 @@ # Clients Module for Arthur Agent (External Integrations) -from .mock_financial import MockFinancialClient, FinancialClient +from 
"""
Ollama Client for Arthur Agent.

Provides local LLM inference for both triage (1B) and specialist (8B) models.
Optimized for CPU-only operation per PRD.
"""

import json
import logging
from typing import Optional, AsyncIterator
from dataclasses import dataclass
import httpx

from src.config import Config

logger = logging.getLogger("ArthurOllama")


@dataclass
class LLMResponse:
    """Response from LLM inference."""
    content: str            # generated text
    model: str              # model name reported by Ollama
    total_tokens: int       # prompt + completion token count
    eval_duration_ms: int   # generation wall time, milliseconds
    prompt_eval_count: int  # tokens consumed by the prompt


class OllamaClient:
    """
    Client for Ollama local LLM inference.

    Supports two models per PRD:
    - Triage (1B): Fast extraction and classification
    - Specialist (8B): Deep reasoning and response generation
    """

    def __init__(self):
        """Initialize Ollama client from config."""
        config = Config.get_llm_config()
        self._base_url = config.ollama_base_url
        self._triage_model = config.triage_model
        self._specialist_model = config.specialist_model
        self._triage_context = config.triage_context
        self._specialist_context = config.specialist_context

        # HTTP client with longer timeout for LLM inference; connect stays short
        # so a down server fails fast.
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            timeout=httpx.Timeout(120.0, connect=10.0)
        )

    async def health_check(self) -> bool:
        """
        Check if Ollama server is running.

        Returns:
            True if healthy, False otherwise
        """
        try:
            response = await self._client.get("/api/tags")
            if response.status_code == 200:
                models = response.json().get("models", [])
                logger.info(f"Ollama healthy. Available models: {len(models)}")
                return True
            return False
        except Exception as e:
            logger.error(f"Ollama health check failed: {e}")
            return False

    async def list_models(self) -> list[str]:
        """List available models in Ollama. Returns [] on any failure."""
        try:
            response = await self._client.get("/api/tags")
            if response.status_code == 200:
                models = response.json().get("models", [])
                return [m["name"] for m in models]
            return []
        except Exception as e:
            logger.error(f"Failed to list models: {e}")
            return []

    async def generate_triage(
        self,
        prompt: str,
        system_prompt: Optional[str] = None
    ) -> Optional[LLMResponse]:
        """
        Generate response using triage model (1B - fast).

        Used for:
        - Entity extraction (client, technology, problem)
        - Initial classification
        - Tool selection

        Args:
            prompt: User prompt
            system_prompt: System instructions

        Returns:
            LLMResponse or None if failed
        """
        return await self._generate(
            model=self._triage_model,
            prompt=prompt,
            system_prompt=system_prompt,
            num_ctx=self._triage_context
        )

    async def generate_specialist(
        self,
        prompt: str,
        system_prompt: Optional[str] = None
    ) -> Optional[LLMResponse]:
        """
        Generate response using specialist model (8B - reasoning).

        Used for:
        - Root cause analysis
        - Technical diagnosis
        - Response generation

        Args:
            prompt: User prompt with enriched context
            system_prompt: System instructions

        Returns:
            LLMResponse or None if failed
        """
        return await self._generate(
            model=self._specialist_model,
            prompt=prompt,
            system_prompt=system_prompt,
            num_ctx=self._specialist_context
        )

    async def _generate(
        self,
        model: str,
        prompt: str,
        system_prompt: Optional[str] = None,
        num_ctx: int = 2048
    ) -> Optional[LLMResponse]:
        """
        Core generation method (non-streaming).

        Args:
            model: Model name
            prompt: User prompt
            system_prompt: System instructions
            num_ctx: Context window size

        Returns:
            LLMResponse or None if failed
        """
        try:
            payload = {
                "model": model,
                "prompt": prompt,
                "stream": False,
                "options": {
                    "num_ctx": num_ctx,
                    "temperature": 0.3,  # Lower for more deterministic output
                    "top_p": 0.9,
                }
            }

            if system_prompt:
                payload["system"] = system_prompt

            response = await self._client.post("/api/generate", json=payload)

            if response.status_code != 200:
                logger.error(f"Ollama error: {response.status_code} - {response.text}")
                return None

            data = response.json()

            # eval_duration is reported by Ollama in nanoseconds; convert to ms.
            return LLMResponse(
                content=data.get("response", ""),
                model=data.get("model", model),
                total_tokens=data.get("eval_count", 0) + data.get("prompt_eval_count", 0),
                eval_duration_ms=int(data.get("eval_duration", 0) / 1_000_000),
                prompt_eval_count=data.get("prompt_eval_count", 0)
            )

        except httpx.TimeoutException:
            logger.error(f"Ollama timeout for model {model}")
            return None
        except Exception as e:
            logger.error(f"Ollama generation failed: {e}")
            return None

    async def generate_stream(
        self,
        model: str,
        prompt: str,
        system_prompt: Optional[str] = None
    ) -> AsyncIterator[str]:
        """
        Stream response tokens.

        Args:
            model: Model name
            prompt: User prompt
            system_prompt: System instructions

        Yields:
            Response tokens as they are generated
        """
        try:
            payload = {
                "model": model,
                "prompt": prompt,
                "stream": True,
            }

            if system_prompt:
                payload["system"] = system_prompt

            async with self._client.stream(
                "POST", "/api/generate", json=payload
            ) as response:
                # Guard against non-200 replies: error bodies are not the
                # newline-delimited JSON stream and would crash json.loads.
                if response.status_code != 200:
                    logger.error(f"Ollama stream error: {response.status_code}")
                    return
                async for line in response.aiter_lines():
                    if line:
                        data = json.loads(line)
                        if token := data.get("response"):
                            yield token
                        if data.get("done"):
                            break

        except Exception as e:
            logger.error(f"Streaming failed: {e}")

    async def close(self) -> None:
        """Close HTTP client."""
        await self._client.aclose()


# Singleton instance
_ollama_client: Optional[OllamaClient] = None


def get_ollama_client() -> OllamaClient:
    """Get global Ollama client instance (lazy singleton, not thread-safe)."""
    global _ollama_client
    if _ollama_client is None:
        _ollama_client = OllamaClient()
    return _ollama_client
+""" + +import logging +from typing import Optional, Any +from dataclasses import dataclass + +from qdrant_client import QdrantClient +from qdrant_client.http import models as qmodels +from qdrant_client.http.exceptions import UnexpectedResponse + +from src.config import Config + +logger = logging.getLogger("ArthurQdrant") + +# Embedding dimensions for different models +EMBEDDING_DIMENSIONS = { + "bge-small": 384, + "bge-base": 768, + "all-MiniLM-L6-v2": 384, + "nomic-embed-text": 768, +} + + +@dataclass +class SearchResult: + """Result from vector search.""" + id: str + score: float + content: str + metadata: dict + tenant_id: str + + +class QdrantMultitenant: + """ + Multitenant Qdrant client for Arthur Agent. + + Ensures tenant isolation through payload filtering. + All queries MUST include tenant_id filter per PRD. + """ + + def __init__( + self, + embedding_dim: int = 384, + collection_name: Optional[str] = None + ): + """ + Initialize Qdrant client. + + Args: + embedding_dim: Dimension of embedding vectors + collection_name: Name of the collection (default from config) + """ + config = Config.get_qdrant_config() + self._host = config.host + self._port = config.port + self._collection = collection_name or config.collection_name + self._on_disk = config.on_disk + self._embedding_dim = embedding_dim + self._client: Optional[QdrantClient] = None + + def connect(self) -> bool: + """ + Connect to Qdrant server. + + Returns: + True if successful, False otherwise + """ + try: + self._client = QdrantClient( + host=self._host, + port=self._port, + timeout=30 + ) + + # Test connection + collections = self._client.get_collections() + logger.info( + f"Connected to Qdrant at {self._host}:{self._port}. 
" + f"Collections: {len(collections.collections)}" + ) + return True + + except Exception as e: + logger.error(f"Failed to connect to Qdrant: {e}") + self._client = None + return False + + def ensure_collection(self) -> bool: + """ + Ensure the collection exists with proper configuration. + + Creates collection if it doesn't exist, with on_disk storage + for RAM optimization per PRD. + + Returns: + True if collection is ready, False otherwise + """ + if self._client is None: + if not self.connect(): + return False + + try: + # Check if collection exists + collections = self._client.get_collections() + exists = any( + c.name == self._collection + for c in collections.collections + ) + + if exists: + logger.info(f"Collection '{self._collection}' already exists") + return True + + # Create collection with optimized settings + self._client.create_collection( + collection_name=self._collection, + vectors_config=qmodels.VectorParams( + size=self._embedding_dim, + distance=qmodels.Distance.COSINE, + on_disk=self._on_disk # RAM optimization + ), + # Payload indexing for tenant filtering + optimizers_config=qmodels.OptimizersConfigDiff( + indexing_threshold=10000 + ), + on_disk_payload=self._on_disk + ) + + # Create index for tenant_id (required for multitenancy) + self._client.create_payload_index( + collection_name=self._collection, + field_name="tenant_id", + field_schema=qmodels.PayloadSchemaType.KEYWORD + ) + + # Create index for technology (for filtering by tech stack) + self._client.create_payload_index( + collection_name=self._collection, + field_name="technology", + field_schema=qmodels.PayloadSchemaType.KEYWORD + ) + + logger.info(f"Created collection '{self._collection}' with tenant indexing") + return True + + except Exception as e: + logger.error(f"Failed to ensure collection: {e}") + return False + + def upsert_document( + self, + doc_id: str, + content: str, + embedding: list[float], + tenant_id: str, + metadata: Optional[dict] = None + ) -> bool: + """ + 
Insert or update a document. + + Args: + doc_id: Unique document identifier + content: Text content of the document + embedding: Vector embedding + tenant_id: Tenant identifier (REQUIRED) + metadata: Additional metadata (technology, source, etc) + + Returns: + True if successful + """ + if self._client is None: + if not self.connect(): + return False + + if not tenant_id: + logger.error("tenant_id is REQUIRED for all documents") + return False + + try: + payload = { + "content": content, + "tenant_id": tenant_id, + **(metadata or {}) + } + + self._client.upsert( + collection_name=self._collection, + points=[ + qmodels.PointStruct( + id=doc_id, + vector=embedding, + payload=payload + ) + ] + ) + + logger.debug(f"Upserted document {doc_id} for tenant {tenant_id}") + return True + + except Exception as e: + logger.error(f"Failed to upsert document: {e}") + return False + + def search( + self, + query_embedding: list[float], + tenant_id: str, + limit: int = 5, + technology_filter: Optional[str] = None, + score_threshold: float = 0.5 + ) -> list[SearchResult]: + """ + Search for similar documents with tenant isolation. 
+ + Args: + query_embedding: Query vector + tenant_id: Tenant identifier (REQUIRED for isolation) + limit: Maximum results to return + technology_filter: Optional filter by technology + score_threshold: Minimum similarity score + + Returns: + List of search results + """ + if self._client is None: + if not self.connect(): + return [] + + if not tenant_id: + logger.error("tenant_id is REQUIRED for all searches (isolation)") + return [] + + try: + # Build filter - tenant_id is ALWAYS required + must_conditions = [ + qmodels.FieldCondition( + key="tenant_id", + match=qmodels.MatchValue(value=tenant_id) + ) + ] + + if technology_filter: + must_conditions.append( + qmodels.FieldCondition( + key="technology", + match=qmodels.MatchValue(value=technology_filter) + ) + ) + + results = self._client.search( + collection_name=self._collection, + query_vector=query_embedding, + query_filter=qmodels.Filter(must=must_conditions), + limit=limit, + score_threshold=score_threshold + ) + + return [ + SearchResult( + id=str(r.id), + score=r.score, + content=r.payload.get("content", ""), + metadata={ + k: v for k, v in r.payload.items() + if k not in ("content", "tenant_id") + }, + tenant_id=r.payload.get("tenant_id", "") + ) + for r in results + ] + + except Exception as e: + logger.error(f"Search failed: {e}") + return [] + + def delete_tenant_documents(self, tenant_id: str) -> bool: + """ + Delete all documents for a tenant. + + Use with caution - for tenant cleanup only. 
+ + Args: + tenant_id: Tenant identifier + + Returns: + True if successful + """ + if self._client is None: + if not self.connect(): + return False + + try: + self._client.delete( + collection_name=self._collection, + points_selector=qmodels.FilterSelector( + filter=qmodels.Filter( + must=[ + qmodels.FieldCondition( + key="tenant_id", + match=qmodels.MatchValue(value=tenant_id) + ) + ] + ) + ) + ) + + logger.info(f"Deleted all documents for tenant {tenant_id}") + return True + + except Exception as e: + logger.error(f"Failed to delete tenant documents: {e}") + return False + + def get_collection_info(self) -> Optional[dict]: + """Get collection statistics.""" + if self._client is None: + if not self.connect(): + return None + + try: + info = self._client.get_collection(self._collection) + return { + "name": self._collection, + "vectors_count": info.vectors_count, + "points_count": info.points_count, + "status": info.status.value, + "on_disk": self._on_disk + } + except Exception as e: + logger.error(f"Failed to get collection info: {e}") + return None + + +# Singleton instance +_qdrant_client: Optional[QdrantMultitenant] = None + + +def get_qdrant_client() -> QdrantMultitenant: + """Get global Qdrant client instance.""" + global _qdrant_client + if _qdrant_client is None: + _qdrant_client = QdrantMultitenant() + return _qdrant_client diff --git a/src/clients/zabbix_connector.py b/src/clients/zabbix_connector.py new file mode 100644 index 0000000..4477db2 --- /dev/null +++ b/src/clients/zabbix_connector.py @@ -0,0 +1,402 @@ +""" +Zabbix API Connector for Arthur Agent. + +Provides read-only access to Zabbix monitoring data for +infrastructure diagnostics and root cause analysis. 
+""" + +import logging +from typing import Optional, Any +from dataclasses import dataclass + +from zabbix_utils import ZabbixAPI + +from src.config import Config + +logger = logging.getLogger("ArthurZabbix") + + +@dataclass +class HostStatus: + """Status information for a monitored host.""" + host_id: str + hostname: str + name: str + status: str # "enabled", "disabled" + availability: str # "available", "unavailable", "unknown" + groups: list[str] + interfaces: list[dict] + last_problem: Optional[str] = None + + +@dataclass +class Problem: + """Active problem/alert from Zabbix.""" + event_id: str + host_id: str + hostname: str + severity: int # 0-5 (Not classified to Disaster) + name: str + acknowledged: bool + timestamp: str + duration_seconds: int + tags: list[dict] + + +class ZabbixConnector: + """ + Read-only connector for Zabbix API. + + Provides methods for: + - Getting host status and availability + - Listing active problems/alerts + - Root cause analysis (neighbor host correlation) + + Per PRD: Uses Read-Only API token for security. + """ + + def __init__(self, api_token: Optional[str] = None): + """ + Initialize Zabbix connector. + + Args: + api_token: Zabbix API token. If not provided, reads from config. + """ + config = Config.get_zabbix_config() + self._url = config.url + self._token = api_token or config.api_token + self._verify_ssl = config.verify_ssl + self._timeout = config.timeout + self._api: Optional[ZabbixAPI] = None + + if not self._token: + logger.warning("Zabbix API token not configured") + + def connect(self) -> bool: + """ + Establish connection to Zabbix API. + + Returns: + True if connection successful, False otherwise. 
+ """ + if not self._token: + logger.error("Cannot connect: API token not configured") + return False + + try: + self._api = ZabbixAPI( + url=self._url, + token=self._token, + validate_certs=self._verify_ssl, + timeout=self._timeout + ) + + # Test connection by getting API version + version = self._api.api_version() + logger.info(f"Connected to Zabbix API v{version} at {self._url}") + return True + + except Exception as e: + logger.error(f"Failed to connect to Zabbix: {e}") + self._api = None + return False + + def disconnect(self) -> None: + """Close Zabbix API connection.""" + if self._api: + try: + self._api.logout() + except Exception: + pass # Token-based auth doesn't need explicit logout + self._api = None + logger.info("Disconnected from Zabbix API") + + def _ensure_connected(self) -> bool: + """Ensure API connection is established.""" + if self._api is None: + return self.connect() + return True + + def get_host_status(self, hostname: str) -> Optional[HostStatus]: + """ + Get status and availability of a host. 
+ + Args: + hostname: Host name or visible name to search + + Returns: + HostStatus if found, None otherwise + """ + if not self._ensure_connected(): + return None + + try: + hosts = self._api.host.get( + search={"host": hostname, "name": hostname}, + searchWildcardsEnabled=True, + selectGroups=["name"], + selectInterfaces=["ip", "type", "available"], + output=["hostid", "host", "name", "status", "available"] + ) + + if not hosts: + logger.warning(f"Host not found: {hostname}") + return None + + host = hosts[0] + + # Get last problem for this host + problems = self._api.problem.get( + hostids=[host["hostid"]], + recent=True, + sortfield="eventid", + sortorder="DESC", + limit=1, + output=["name"] + ) + + last_problem = problems[0]["name"] if problems else None + + return HostStatus( + host_id=host["hostid"], + hostname=host["host"], + name=host["name"], + status="enabled" if host["status"] == "0" else "disabled", + availability=self._map_availability(host.get("available", "0")), + groups=[g["name"] for g in host.get("groups", [])], + interfaces=host.get("interfaces", []), + last_problem=last_problem + ) + + except Exception as e: + logger.error(f"Error getting host status: {e}") + return None + + def get_active_problems( + self, + host_id: Optional[str] = None, + severity_min: int = 2, # Warning and above + limit: int = 20 + ) -> list[Problem]: + """ + Get active problems/alerts. 
+ + Args: + host_id: Filter by specific host (optional) + severity_min: Minimum severity level (0-5) + limit: Maximum number of problems to return + + Returns: + List of active problems + """ + if not self._ensure_connected(): + return [] + + try: + params = { + "recent": True, + "sortfield": ["severity", "eventid"], + "sortorder": ["DESC", "DESC"], + "limit": limit, + "selectTags": "extend", + "output": ["eventid", "objectid", "severity", "name", + "acknowledged", "clock", "r_eventid"] + } + + if host_id: + params["hostids"] = [host_id] + + if severity_min > 0: + params["severities"] = list(range(severity_min, 6)) + + problems = self._api.problem.get(**params) + + result = [] + for p in problems: + # Get host info for this problem + host_info = self._get_host_for_trigger(p.get("objectid")) + + result.append(Problem( + event_id=p["eventid"], + host_id=host_info.get("hostid", ""), + hostname=host_info.get("host", "unknown"), + severity=int(p["severity"]), + name=p["name"], + acknowledged=p["acknowledged"] == "1", + timestamp=p["clock"], + duration_seconds=self._calculate_duration(p["clock"]), + tags=p.get("tags", []) + )) + + return result + + except Exception as e: + logger.error(f"Error getting active problems: {e}") + return [] + + def get_neighbor_alerts( + self, + host_id: str, + time_window_minutes: int = 30 + ) -> list[Problem]: + """ + Get alerts from hosts in the same group (neighbor correlation). + + Used for root cause analysis - if multiple hosts in same + network segment have issues, it may indicate infrastructure problem. 
+ + Args: + host_id: Reference host ID + time_window_minutes: Time window to search for correlated alerts + + Returns: + List of problems from neighbor hosts + """ + if not self._ensure_connected(): + return [] + + try: + # Get groups of the reference host + hosts = self._api.host.get( + hostids=[host_id], + selectGroups=["groupid"], + output=["hostid"] + ) + + if not hosts: + return [] + + group_ids = [g["groupid"] for g in hosts[0].get("groups", [])] + + if not group_ids: + return [] + + # Get all hosts in the same groups + neighbor_hosts = self._api.host.get( + groupids=group_ids, + output=["hostid", "host"] + ) + + neighbor_ids = [h["hostid"] for h in neighbor_hosts if h["hostid"] != host_id] + + if not neighbor_ids: + return [] + + # Get problems for neighbor hosts + import time + time_from = int(time.time()) - (time_window_minutes * 60) + + problems = self._api.problem.get( + hostids=neighbor_ids, + time_from=time_from, + recent=True, + sortfield="eventid", + sortorder="DESC", + output=["eventid", "objectid", "severity", "name", + "acknowledged", "clock"] + ) + + result = [] + for p in problems: + host_info = self._get_host_for_trigger(p.get("objectid")) + result.append(Problem( + event_id=p["eventid"], + host_id=host_info.get("hostid", ""), + hostname=host_info.get("host", "unknown"), + severity=int(p["severity"]), + name=p["name"], + acknowledged=p["acknowledged"] == "1", + timestamp=p["clock"], + duration_seconds=self._calculate_duration(p["clock"]), + tags=[] + )) + + return result + + except Exception as e: + logger.error(f"Error getting neighbor alerts: {e}") + return [] + + def get_host_by_ip(self, ip_address: str) -> Optional[HostStatus]: + """ + Find host by IP address. 
+ + Args: + ip_address: IP address to search + + Returns: + HostStatus if found, None otherwise + """ + if not self._ensure_connected(): + return None + + try: + hosts = self._api.host.get( + filter={"ip": ip_address}, + selectGroups=["name"], + selectInterfaces=["ip", "type", "available"], + output=["hostid", "host", "name", "status", "available"] + ) + + if not hosts: + return None + + host = hosts[0] + return HostStatus( + host_id=host["hostid"], + hostname=host["host"], + name=host["name"], + status="enabled" if host["status"] == "0" else "disabled", + availability=self._map_availability(host.get("available", "0")), + groups=[g["name"] for g in host.get("groups", [])], + interfaces=host.get("interfaces", []) + ) + + except Exception as e: + logger.error(f"Error finding host by IP: {e}") + return None + + def _get_host_for_trigger(self, trigger_id: str) -> dict: + """Get host information for a trigger.""" + try: + triggers = self._api.trigger.get( + triggerids=[trigger_id], + selectHosts=["hostid", "host"], + output=["triggerid"] + ) + if triggers and triggers[0].get("hosts"): + return triggers[0]["hosts"][0] + except Exception: + pass + return {} + + @staticmethod + def _map_availability(status: str) -> str: + """Map Zabbix availability code to string.""" + mapping = { + "0": "unknown", + "1": "available", + "2": "unavailable" + } + return mapping.get(status, "unknown") + + @staticmethod + def _calculate_duration(timestamp: str) -> int: + """Calculate duration in seconds from timestamp.""" + import time + try: + return int(time.time()) - int(timestamp) + except (ValueError, TypeError): + return 0 + + +# Singleton instance +_zabbix_connector: Optional[ZabbixConnector] = None + + +def get_zabbix_connector() -> ZabbixConnector: + """Get global Zabbix connector instance.""" + global _zabbix_connector + if _zabbix_connector is None: + _zabbix_connector = ZabbixConnector() + return _zabbix_connector diff --git a/tests/test_ollama.py b/tests/test_ollama.py new file 
"""
Tests for Ollama Client.

Tests the local LLM inference client.
"""

import pytest
from unittest.mock import Mock, patch, MagicMock, AsyncMock

from src.clients.ollama_client import (
    OllamaClient,
    LLMResponse,
    get_ollama_client
)


class TestOllamaClient:
    """Tests for OllamaClient class. All HTTP traffic is mocked."""

    @pytest.fixture
    def client(self):
        """Create an Ollama client for testing."""
        return OllamaClient()

    def test_init_defaults(self, client):
        """Test default initialization."""
        assert "localhost" in client._base_url
        assert client._triage_model is not None
        assert client._specialist_model is not None

    def test_llm_response_dataclass(self):
        """Test LLMResponse dataclass."""
        response = LLMResponse(
            content="This is the model response",
            model="llama3.2:1b",
            total_tokens=150,
            eval_duration_ms=500,
            prompt_eval_count=50
        )

        assert response.content == "This is the model response"
        assert response.total_tokens == 150
        assert response.eval_duration_ms == 500

    @pytest.mark.asyncio
    async def test_health_check_success(self, client):
        """Test successful health check."""
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {"models": [{"name": "llama3.2:1b"}]}

        # AsyncMock so the awaited .get() resolves to our mock response
        with patch.object(client._client, 'get', new_callable=AsyncMock) as mock_get:
            mock_get.return_value = mock_response

            result = await client.health_check()

            assert result is True

    @pytest.mark.asyncio
    async def test_health_check_failure(self, client):
        """Test health check failure."""
        with patch.object(client._client, 'get', new_callable=AsyncMock) as mock_get:
            mock_get.side_effect = Exception("Connection refused")

            result = await client.health_check()

            assert result is False

    @pytest.mark.asyncio
    async def test_list_models_success(self, client):
        """Test listing available models."""
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "models": [
                {"name": "llama3.2:1b"},
                {"name": "llama3.1:8b"}
            ]
        }

        with patch.object(client._client, 'get', new_callable=AsyncMock) as mock_get:
            mock_get.return_value = mock_response

            models = await client.list_models()

            assert len(models) == 2
            assert "llama3.2:1b" in models

    @pytest.mark.asyncio
    async def test_generate_triage_success(self, client):
        """Test triage model generation."""
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "response": "Extracted: client=OESTEPAN, problem=server down",
            "model": "llama3.2:1b",
            "eval_count": 20,
            "prompt_eval_count": 50,
            "eval_duration": 500_000_000  # nanoseconds
        }

        with patch.object(client._client, 'post', new_callable=AsyncMock) as mock_post:
            mock_post.return_value = mock_response

            result = await client.generate_triage("Extract entities from: server is down")

            assert result is not None
            assert "OESTEPAN" in result.content
            assert result.model == "llama3.2:1b"

    @pytest.mark.asyncio
    async def test_generate_specialist_success(self, client):
        """Test specialist model generation."""
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "response": "Based on the analysis, the root cause is disk space exhaustion.",
            "model": "llama3.1:8b",
            "eval_count": 100,
            "prompt_eval_count": 200,
            "eval_duration": 2_000_000_000
        }

        with patch.object(client._client, 'post', new_callable=AsyncMock) as mock_post:
            mock_post.return_value = mock_response

            result = await client.generate_specialist(
                "Analyze this problem with context...",
                system_prompt="You are a technical support specialist."
            )

            assert result is not None
            assert "root cause" in result.content
            assert result.model == "llama3.1:8b"

    @pytest.mark.asyncio
    async def test_generate_error_handling(self, client):
        """Test error handling in generation (HTTP 500 -> None)."""
        mock_response = Mock()
        mock_response.status_code = 500
        mock_response.text = "Internal server error"

        with patch.object(client._client, 'post', new_callable=AsyncMock) as mock_post:
            mock_post.return_value = mock_response

            result = await client.generate_triage("Test prompt")

            assert result is None


class TestOllamaSingleton:
    """Tests for singleton instance."""

    def test_get_ollama_client_singleton(self):
        """Test singleton returns same instance."""
        # Reset singleton so the test is independent of import order
        import src.clients.ollama_client as module
        module._ollama_client = None

        client1 = get_ollama_client()
        client2 = get_ollama_client()

        assert client1 is client2
+""" + +import pytest +from unittest.mock import Mock, patch, MagicMock + +from src.clients.qdrant_client import ( + QdrantMultitenant, + SearchResult, + get_qdrant_client +) + + +class TestQdrantMultitenant: + """Tests for QdrantMultitenant class.""" + + @pytest.fixture + def client(self): + """Create a Qdrant client for testing.""" + return QdrantMultitenant(embedding_dim=384) + + def test_init_defaults(self, client): + """Test default initialization.""" + assert client._embedding_dim == 384 + assert client._on_disk is True + assert client._client is None + + def test_init_custom_collection(self): + """Test custom collection name.""" + client = QdrantMultitenant(collection_name="custom_collection") + assert client._collection == "custom_collection" + + @patch('src.clients.qdrant_client.QdrantClient') + def test_connect_success(self, mock_qdrant_class, client): + """Test successful connection.""" + mock_qdrant = MagicMock() + mock_qdrant.get_collections.return_value = Mock(collections=[]) + mock_qdrant_class.return_value = mock_qdrant + + result = client.connect() + + assert result is True + assert client._client is not None + + @patch('src.clients.qdrant_client.QdrantClient') + def test_connect_failure(self, mock_qdrant_class, client): + """Test connection failure.""" + mock_qdrant_class.side_effect = Exception("Connection refused") + + result = client.connect() + + assert result is False + assert client._client is None + + def test_upsert_without_tenant_fails(self, client): + """Test upsert fails without tenant_id.""" + client._client = MagicMock() + + result = client.upsert_document( + doc_id="doc1", + content="Test content", + embedding=[0.1] * 384, + tenant_id="" # Empty tenant_id + ) + + assert result is False + + def test_search_without_tenant_fails(self, client): + """Test search fails without tenant_id.""" + client._client = MagicMock() + + results = client.search( + query_embedding=[0.1] * 384, + tenant_id="" # Empty tenant_id - MUST fail + ) + + assert 
results == [] + + @patch('src.clients.qdrant_client.QdrantClient') + def test_upsert_success(self, mock_qdrant_class, client): + """Test successful document upsert.""" + mock_qdrant = MagicMock() + mock_qdrant.get_collections.return_value = Mock(collections=[]) + mock_qdrant_class.return_value = mock_qdrant + client.connect() + + result = client.upsert_document( + doc_id="doc1", + content="Test document content", + embedding=[0.1] * 384, + tenant_id="tenant-001", + metadata={"technology": "linux"} + ) + + assert result is True + mock_qdrant.upsert.assert_called_once() + + def test_search_result_dataclass(self): + """Test SearchResult dataclass.""" + result = SearchResult( + id="doc1", + score=0.95, + content="This is the document content", + metadata={"technology": "linux"}, + tenant_id="tenant-001" + ) + + assert result.score == 0.95 + assert result.tenant_id == "tenant-001" + assert "linux" in result.metadata.get("technology", "") + + +class TestQdrantSingleton: + """Tests for singleton instance.""" + + def test_get_qdrant_client_singleton(self): + """Test singleton returns same instance.""" + # Reset singleton + import src.clients.qdrant_client as module + module._qdrant_client = None + + client1 = get_qdrant_client() + client2 = get_qdrant_client() + + assert client1 is client2 diff --git a/tests/test_zabbix.py b/tests/test_zabbix.py new file mode 100644 index 0000000..cc62c79 --- /dev/null +++ b/tests/test_zabbix.py @@ -0,0 +1,126 @@ +""" +Tests for Zabbix Connector. + +Tests the Zabbix API connector functionality. 
+""" + +import pytest +from unittest.mock import Mock, patch, MagicMock + +from src.clients.zabbix_connector import ( + ZabbixConnector, + HostStatus, + Problem, + get_zabbix_connector +) + + +class TestZabbixConnector: + """Tests for ZabbixConnector class.""" + + @pytest.fixture + def connector(self): + """Create a connector with mock token.""" + with patch.dict('os.environ', {'ZABBIX_API_TOKEN': 'test-token'}): + return ZabbixConnector(api_token="test-token") + + def test_init_with_token(self): + """Test initialization with API token.""" + connector = ZabbixConnector(api_token="my-token") + assert connector._token == "my-token" + + def test_init_without_token(self): + """Test initialization without token (from config).""" + with patch('src.clients.zabbix_connector.Config') as mock_config: + mock_config.get_zabbix_config.return_value = Mock( + url="https://zabbix.test", + api_token=None, + verify_ssl=True, + timeout=30 + ) + connector = ZabbixConnector() + assert connector._token is None + + def test_connect_without_token_fails(self, connector): + """Test connection fails without token.""" + connector._token = None + result = connector.connect() + assert result is False + + @patch('src.clients.zabbix_connector.ZabbixAPI') + def test_connect_success(self, mock_api_class, connector): + """Test successful connection.""" + mock_api = MagicMock() + mock_api.api_version.return_value = "6.0.0" + mock_api_class.return_value = mock_api + + result = connector.connect() + + assert result is True + assert connector._api is not None + + @patch('src.clients.zabbix_connector.ZabbixAPI') + def test_connect_failure(self, mock_api_class, connector): + """Test connection failure handling.""" + mock_api_class.side_effect = Exception("Connection refused") + + result = connector.connect() + + assert result is False + assert connector._api is None + + def test_host_status_dataclass(self): + """Test HostStatus dataclass.""" + status = HostStatus( + host_id="12345", + 
hostname="srv-app01", + name="Application Server 01", + status="enabled", + availability="available", + groups=["Linux Servers", "Production"], + interfaces=[{"ip": "192.168.1.10"}] + ) + + assert status.host_id == "12345" + assert status.hostname == "srv-app01" + assert "Production" in status.groups + + def test_problem_dataclass(self): + """Test Problem dataclass.""" + problem = Problem( + event_id="9999", + host_id="12345", + hostname="srv-db01", + severity=4, # High + name="Disk space is low", + acknowledged=False, + timestamp="1706800000", + duration_seconds=3600, + tags=[{"tag": "scope", "value": "storage"}] + ) + + assert problem.severity == 4 + assert problem.acknowledged is False + assert problem.duration_seconds == 3600 + + def test_map_availability(self): + """Test availability status mapping.""" + assert ZabbixConnector._map_availability("0") == "unknown" + assert ZabbixConnector._map_availability("1") == "available" + assert ZabbixConnector._map_availability("2") == "unavailable" + assert ZabbixConnector._map_availability("99") == "unknown" + + +class TestZabbixConnectorSingleton: + """Tests for singleton instance.""" + + def test_get_zabbix_connector_singleton(self): + """Test singleton returns same instance.""" + # Reset singleton + import src.clients.zabbix_connector as module + module._zabbix_connector = None + + connector1 = get_zabbix_connector() + connector2 = get_zabbix_connector() + + assert connector1 is connector2