208 lines
6.9 KiB
Python
208 lines
6.9 KiB
Python
"""
Tests for the Self-Correction Layer.

Covers validation and security checks.
"""
|
|
|
|
import pytest
|
|
from unittest.mock import Mock
|
|
|
|
from src.agents.validators import (
|
|
SelfCorrectionLayer,
|
|
ValidationResult,
|
|
ValidationIssue,
|
|
ValidationSeverity,
|
|
get_validator
|
|
)
|
|
from src.agents.triage_agent import TriageResult, ExtractedEntities, Priority, Category
|
|
from src.agents.specialist_agent import SpecialistResponse, EnrichedContext
|
|
|
|
|
|
class TestValidationResult:
    """Behavioural checks for the ValidationResult container."""

    def test_initial_valid(self):
        """A result built with is_valid=True reports valid and has no issues."""
        outcome = ValidationResult(is_valid=True)

        assert outcome.is_valid is True
        assert outcome.issues == []

    def test_add_warning(self):
        """Adding a WARNING issue keeps the result valid and flags warnings."""
        outcome = ValidationResult(is_valid=True)
        warning = ValidationIssue(
            code="TEST_WARN",
            message="Test warning",
            severity=ValidationSeverity.WARNING,
        )

        outcome.add_issue(warning)

        assert outcome.is_valid is True
        assert outcome.has_warnings is True

    def test_add_error_invalidates(self):
        """Adding an ERROR issue flips the result to invalid and flags errors."""
        outcome = ValidationResult(is_valid=True)
        failure = ValidationIssue(
            code="TEST_ERR",
            message="Test error",
            severity=ValidationSeverity.ERROR,
        )

        outcome.add_issue(failure)

        assert outcome.is_valid is False
        assert outcome.has_errors is True
|
|
|
|
|
|
class TestSelfCorrectionLayer:
    """Checks for SelfCorrectionLayer: email, specialist, triage, sanitizing."""

    @pytest.fixture
    def validator(self):
        """Provide a newly constructed SelfCorrectionLayer to each test."""
        layer = SelfCorrectionLayer()
        return layer

    @staticmethod
    def _specialist(**overrides):
        """Build a SpecialistResponse for ticket TKT-001.

        The fields shared by every specialist test are filled in here;
        ``overrides`` supplies (or replaces) the per-test fields.
        """
        fields = {
            "success": True,
            "ticket_id": "TKT-001",
            "diagnosis": "Test",
            "recommended_actions": [],
            "context_used": EnrichedContext(),
            "model_reasoning": "",
        }
        fields.update(overrides)
        return SpecialistResponse(**fields)

    def test_validate_email_allowed_domain(self, validator):
        """An address on the itguys.com.br domain validates without errors."""
        outcome = validator.validate_email_domain("joao@itguys.com.br")

        assert outcome.is_valid is True
        assert not outcome.has_errors

    def test_validate_email_blocked_domain(self, validator):
        """An address on an unknown domain is rejected with the domain code."""
        outcome = validator.validate_email_domain("hacker@unknown.com")

        assert outcome.is_valid is False
        assert any(
            issue.code == "EMAIL_DOMAIN_NOT_ALLOWED" for issue in outcome.issues
        )

    def test_validate_email_invalid_format(self, validator):
        """A string that is not an email is rejected with the format code."""
        outcome = validator.validate_email_domain("not-an-email")

        assert outcome.is_valid is False
        assert any(
            issue.code == "EMAIL_INVALID_FORMAT" for issue in outcome.issues
        )

    def test_validate_specialist_low_confidence(self, validator):
        """A confidence score of 0.2 yields an invalid result with the low-confidence code."""
        response = self._specialist(
            response_to_client="Resposta de teste para o cliente.",
            confidence_score=0.2,  # below the threshold per the original test
        )

        outcome = validator.validate_specialist(response)

        assert outcome.is_valid is False
        assert any(
            issue.code == "SPEC_LOW_CONFIDENCE" for issue in outcome.issues
        )

    def test_validate_specialist_empty_response(self, validator):
        """An empty client response yields an invalid result with the empty-response code."""
        response = self._specialist(
            response_to_client="",  # deliberately empty
            confidence_score=0.8,
        )

        outcome = validator.validate_specialist(response)

        assert outcome.is_valid is False
        assert any(
            issue.code == "SPEC_EMPTY_RESPONSE" for issue in outcome.issues
        )

    def test_validate_blocked_action_rm_rf(self, validator):
        """An action containing ``rm -rf /`` is reported as a blocked, critical issue."""
        response = self._specialist(
            recommended_actions=["Execute rm -rf / to clean disk"],  # dangerous
            response_to_client="Resposta válida de teste para cliente.",
            confidence_score=0.9,
        )

        outcome = validator.validate_specialist(response)

        assert any(
            issue.code == "SPEC_BLOCKED_ACTION" for issue in outcome.issues
        )
        assert any(
            issue.severity == ValidationSeverity.CRITICAL
            for issue in outcome.issues
        )

    def test_validate_blocked_action_drop_database(self, validator):
        """An action containing ``DROP DATABASE`` is reported as a blocked action."""
        response = self._specialist(
            recommended_actions=["DROP DATABASE production"],
            response_to_client="Resposta válida de teste.",
            confidence_score=0.9,
        )

        outcome = validator.validate_specialist(response)

        assert any(
            issue.code == "SPEC_BLOCKED_ACTION" for issue in outcome.issues
        )

    def test_sanitize_response(self, validator):
        """Sanitizing strips ``rm -rf`` and inserts the removal placeholder."""
        unsafe_text = "Para resolver, execute: rm -rf / no servidor"

        cleaned = validator.sanitize_response(unsafe_text)

        assert "rm -rf" not in cleaned
        assert "[COMANDO REMOVIDO POR SEGURANÇA]" in cleaned

    def test_add_allowed_domain(self, validator):
        """A domain is rejected until add_allowed_domain() registers it."""
        address = "user@newclient.com"

        # Before registration the domain is refused.
        before = validator.validate_email_domain(address)
        assert before.is_valid is False

        # Register the domain on this validator instance.
        validator.add_allowed_domain("newclient.com")

        # The very same address is accepted afterwards.
        after = validator.validate_email_domain(address)
        assert after.is_valid is True

    def test_validate_triage_no_tenant(self, validator):
        """A triage result with ``tenant=None`` fails with the no-tenant code."""
        tenantless = TriageResult(
            success=False,
            ticket_id="TKT-001",
            tenant=None,
            entities=ExtractedEntities(),
            sanitized_content="",
            recommended_tools=[],
            reasoning="",
            error="No tenant",
        )

        outcome = validator.validate_triage(tenantless)

        assert outcome.is_valid is False
        assert any(
            issue.code == "TRIAGE_NO_TENANT" for issue in outcome.issues
        )
|
|
|
|
|
|
class TestValidatorSingleton:
    """Tests for the singleton behaviour of get_validator()."""

    def test_singleton(self):
        """Check that consecutive get_validator() calls return the same object.

        The module-level ``_validator`` attribute of ``src.agents.validators``
        (presumably the cached instance -- confirm against that module) is
        reset to ``None`` before the calls. Its previous value is restored
        afterwards so this test does not leak a replaced global into other
        tests running in the same session.
        """
        import src.agents.validators as module

        # Remember whatever was there so the global can be put back.
        previous = getattr(module, "_validator", None)
        module._validator = None
        try:
            v1 = get_validator()
            v2 = get_validator()

            assert v1 is v2
        finally:
            # Restore even when the assertion fails.
            module._validator = previous
|