feat: Implementation of Phase 1 - Arthur Agent Data Architecture

Why was this change made?
- New feature: implementation of the data-architecture foundation for the Arthur Agent (N2 Technical Support)
- Modules created: models (TenantContext, AuditLog), database (PostgreSQL), security (SecretsManager, DLP), clients (MockFinancial)

What tests were performed?
- 45 unit tests with pytest
- Pydantic model tests (9 tests)
- DLP filter tests for CPF, CNPJ, and passwords (15 tests)
- SecretsManager tests for Docker Secrets (11 tests)
- MockFinancialClient tests for tenant resolution (10 tests)

Did the change introduce a new test that must be implemented in the pipeline?
- Yes. The tests in tests/ should be added to the CI/CD pipeline with pytest.
João Pedro Toledo Goncalves 2026-02-01 11:52:07 -03:00
parent 318b329e1e
commit 5373daf600
25 changed files with 2090 additions and 84 deletions


@@ -1,60 +1,79 @@
# ==========================================
# ARTHUR AGENT CONFIGURATION
# ==========================================

# Environment
ENVIRONMENT=development
LOG_LEVEL=INFO

# ==========================================
# DATABASE CONFIGURATION
# ==========================================

# PostgreSQL (Audit Logs)
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
POSTGRES_USER=arthur
POSTGRES_PASSWORD=Arth#Sup0rt3_2026!xK9
POSTGRES_DB=arthur_db
POSTGRES_MIN_POOL=2
POSTGRES_MAX_POOL=10

# Qdrant (Vector Database)
QDRANT_HOST=qdrant
QDRANT_PORT=6333
QDRANT_COLLECTION=arthur_knowledge
QDRANT_ON_DISK=true

# ==========================================
# LLM CONFIGURATION (Local Inference)
# ==========================================

# Ollama endpoint
OLLAMA_BASE_URL=http://localhost:11434

# Triage Model (1B - Fast extraction)
LLM_TRIAGE_MODEL=llama3.2:1b
LLM_TRIAGE_THREADS=4
LLM_TRIAGE_CONTEXT=2048

# Specialist Model (8B - Reasoning)
LLM_SPECIALIST_MODEL=llama3.1:8b
LLM_SPECIALIST_THREADS=12
LLM_SPECIALIST_CONTEXT=8192

# ==========================================
# ZABBIX INTEGRATION
# ==========================================
ZABBIX_API_URL=https://noc.itguys.com.br/api_jsonrpc.php
ZABBIX_API_TOKEN=
ZABBIX_VERIFY_SSL=true
ZABBIX_TIMEOUT=30

# ==========================================
# EMAIL CONFIGURATION
# ==========================================
MAIL_IMAP_HOST=mail.itguys.com.br
MAIL_IMAP_PORT=993
MAIL_SMTP_HOST=mail.itguys.com.br
MAIL_SMTP_PORT=587
MAIL_ADDRESS=arthur.servicedesk@itguys.com.br
MAIL_PASSWORD=
MAIL_POLL_INTERVAL=30

# ==========================================
# LANGFUSE (AI Monitoring - Optional)
# ==========================================
LANGFUSE_SECRET=ArthurLangfuseSecret2026
LANGFUSE_SALT=ArthurSalt2026Random

# ==========================================
# FINANCIAL SYSTEM (Mock for now)
# ==========================================
# When ready, uncomment and configure:
# FINANCIAL_API_URL=https://financeiro.itguys.com.br/api/v1
# FINANCIAL_API_KEY=


@@ -24,3 +24,19 @@ Whenever there are changes in the project, commit and push to the repos
- **Why was this change made?** (Bug fix/New feature/Performance improvement/Refactoring/Security implementation/Other)
- **What tests were performed?** (Describe the tests run before and after the change)
- **Did the change introduce a new test that must be implemented in the test pipeline?** (Yes/No, with justification)

### 5. Quality and Verification Standards
To ensure the robustness of the Agent's code, follow these practices:
- **Linting & Static Analysis:**
  - Python: use `flake8` to find syntax errors and style violations.
  - Code must be free of critical *warnings* before commit.
- **Formatting (Prettify):**
  - Python: use `Black` or `autopep8` to ensure consistent formatting.
  - Others (JSON/MD): use `Prettier` if available in the environment.
- **Unit Tests:**
  - Official framework: `pytest`.
  - **Rule:** every new *tool* or piece of complex business logic must have at least one happy-path unit test and one failure edge-case test.
  - Run `pytest` at the project root before considering the task "Done".
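The happy-path/edge-case rule above can be sketched as follows; `parse_ticket_subject` is a hypothetical tool for illustration, not part of the current codebase:

```python
from typing import Optional

# Hypothetical tool: extract the ticket ID from an email subject line.
def parse_ticket_subject(subject: str) -> Optional[str]:
    if "[TICKET-" not in subject:
        return None
    start = subject.index("[TICKET-") + len("[TICKET-")
    end = subject.index("]", start)
    return subject[start:end]

# Happy path: a well-formed subject yields the ID.
def test_parse_ticket_subject_happy_path():
    assert parse_ticket_subject("Re: [TICKET-1042] VPN down") == "1042"

# Edge case: a malformed subject returns None instead of raising.
def test_parse_ticket_subject_edge_case():
    assert parse_ticket_subject("VPN down") is None
```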


@@ -22,6 +22,15 @@ The Arthur Agent is a sovereign AI system designed for N2 Technical Support

### 3.2 Out of Scope
- **Base Model Training:** the project uses pre-trained models (Llama, Phi, Qwen) and applies fine-tuning via LoRA; it does not train from scratch.
- **Level 1 (Basic) Support:** the focus is on problems that require technical reasoning and access to monitoring.
- **Email Security (Spoofing):** validating SPF/DKIM/DMARC and guaranteeing the sender is not forged is the responsibility of the mail server (MTA), not the agent's code.

### 3.3 Detailed Security Requirements
- **Secrets Management:** *hardcoded* credentials are forbidden; **Docker Secrets** are mandatory.
- **Principle of Least Privilege:** API and database permissions must be strictly **read-only** for the Agent. Writing is allowed only on the reply channel (SMTP) and in the audit log.
- **Input Sanitization (Sandbox):** every attachment or piece of unstructured external data must be pre-processed in an isolated environment (sandbox) to remove scripts or malicious instructions before the models read it (mitigation of *Indirect Prompt Injection*).
- **Flow Control:** tools must have immutable output destinations, preventing the agent from exfiltrating data to unauthorized locations.
- **Data Redaction (Lightweight DLP):** implement a *high-performance regex* filter to mask credentials (passwords, keys) and PII before any processing or storage.
- **Application Rate Limiting:** define a logical per-tenant request limit to prevent CPU resource exhaustion (DoS protection).
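The lightweight DLP requirement above can be sketched with a few compiled Python regexes. The patterns and the mask tokens below are illustrative assumptions, not the project's actual filter:

```python
import re

# Illustrative patterns only; the real DLP filter's regexes may differ.
CPF_RE = re.compile(r"\b\d{3}\.\d{3}\.\d{3}-\d{2}\b")
CNPJ_RE = re.compile(r"\b\d{2}\.\d{3}\.\d{3}/\d{4}-\d{2}\b")
PASSWORD_RE = re.compile(r"(?i)(senha|password)\s*[:=]\s*\S+")

def redact(text: str) -> str:
    """Mask PII and credentials before any processing or storage."""
    text = CPF_RE.sub("[CPF-REDACTED]", text)
    text = CNPJ_RE.sub("[CNPJ-REDACTED]", text)
    text = PASSWORD_RE.sub(r"\1: [REDACTED]", text)
    return text

print(redact("CPF 123.456.789-00, senha: hunter2"))
# → CPF [CPF-REDACTED], senha: [REDACTED]
```

Compiling the patterns once at module load keeps the filter cheap enough to run on every inbound email and log line.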
## 4. Main Features

@@ -34,6 +43,7 @@ The Arthur Agent is a sovereign AI system designed for N2 Technical Support
| **FR05** | Comparative Historical Memory | Checks whether the current ticket is a recurrence or is related to recorded global problems. |
| **FR06** | Extensible Tool Library | Modular structure that allows new integrations (AD, Firewall, M365) to be added without retraining the models. |
| **FR07** | Grouping and Deduplication | Ability to correlate multiple Zabbix alerts into a single root-cause diagnosis, reducing noise and duplicate notifications. |
| **FR08** | Output Validation and Reflection | Self-critique mechanism (Reflection Loop) in which the agent technically validates its own suggestion (e.g. the host exists) before sending the reply. |

## 5. Non-Functional Requirements (NFR)
- **NFR01 - Sovereignty:** 100% local execution with no dependency on external cloud APIs.


@@ -9,6 +9,8 @@ This document serves as the detailed technical roadmap for implementing the
- Create a Pydantic schema for the `TenantContext` object.
- [ ] **Audit Schema Design:**
  - Create the database model (PostgreSQL) to record: `TicketID`, `Remetente`, `Contexto Coletado`, `Pensamento do Modelo`, `Resposta Enviada`, and `Status de Resolução`.
- [ ] **Secrets Mapping:**
  - List all required credentials (Zabbix, DB, email) and define variables for Docker Secrets.

## Phase 2: Infrastructure and Core Connectors (Muscles)
- [ ] **Local Inference Environment:**

@@ -23,10 +25,15 @@ This document serves as the detailed technical roadmap for implementing the
- [ ] **Zabbix API Connector:**
  - Implement a wrapper using `zabbix_utils`.
  - Create specific functions: `get_host_status`, `get_active_problems`, `get_neighbor_alerts` (root cause).
- [ ] **Infrastructure Security:**
  - Configure **Docker Secrets** support in the orchestrator.
  - Ensure the Zabbix API user has **read-only** permissions.
  - Implement a **Regex Redaction Filter** (DLP) in the input module to sanitize logs and attachments.

## Phase 3: Orchestration and Reasoning (Brain)
- [ ] **Multi-Agent Dispatcher Development:**
  - Create the orchestrator (LangGraph) that manages ticket state.
  - Include per-tenant `RateLimiter` logic at the Dispatcher entry (priority/drop queue).
- [ ] **Triage Agent Implementation (1B):**
  - Prompt engineering for entity extraction (Client, Technology, Problem).
  - Tool-selection logic (Single Dispatcher).

@@ -34,10 +41,15 @@ This document serves as the detailed technical roadmap for implementing the
  - Python code to compare the current host's alerts with subnet/neighborhood alerts in Zabbix.
- [ ] **Specialist Agent Implementation (8B):**
  - N2 resolution prompt: receives the "Enriched Context" (Zabbix + 24h history + RAG manuals) and generates the final technical answer.
- [ ] **Validation and Security Layer (Self-Correction):**
  - Implement strict Pydantic schemas for all tool outputs.
  - Create a "Critic/Reflection" node in LangGraph to validate that hosts/services exist before suggesting actions.
  - Add security assertions (defensive programming) to block actions on disallowed domains.

## Phase 4: Flywheel and Quality (Learning)
- [ ] **RAG Ingestion Pipeline:**
  - Create a script to process directories of technical Markdown/PDF files and index them in Qdrant with technology metadata.
  - **Security:** implement a sanitization step where attachments run in a sandbox and only plain text is extracted.
- [ ] **Closure Feedback Parser:**
  - Develop logic to read technicians' email replies and identify whether the case was "Resolved" or "Reopened".
- [ ] **Episodic Memory Module:**
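The per-tenant `RateLimiter` planned for the Dispatcher entry could be a simple fixed-window counter. The class name, window size, and API below are illustrative assumptions, not the project's implementation:

```python
import time
from collections import defaultdict
from typing import Optional

class TenantRateLimiter:
    """Fixed-window per-tenant request counter (illustrative sketch)."""

    def __init__(self, limit_per_hour: int):
        self.limit = limit_per_hour
        # tenant_id -> [window_start_timestamp, request_count]
        self.windows = defaultdict(lambda: [0.0, 0])

    def allow(self, tenant_id: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        window = self.windows[tenant_id]
        if now - window[0] >= 3600:   # start a new hourly window
            window[0], window[1] = now, 0
        if window[1] >= self.limit:   # over the tenant's limit: drop
            return False
        window[1] += 1
        return True

limiter = TenantRateLimiter(limit_per_hour=2)
print([limiter.allow("tenant_demo", now=0) for _ in range(3)])  # [True, True, False]
```

A priority queue could replace the hard drop for bursty but legitimate tenants; the fixed window just keeps CPU exhaustion bounded.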


@@ -1,43 +1,84 @@
services:
  # Arthur Agent API
  app:
    build: .
    container_name: arthur_agent
    ports:
      - "8000:8000"
    volumes:
      - .:/app
    environment:
      - PYTHONUNBUFFERED=1
      - POSTGRES_HOST=postgres
      - QDRANT_HOST=qdrant
    env_file:
      - .env
    depends_on:
      postgres:
        condition: service_healthy
      qdrant:
        condition: service_started
    networks:
      - arthur_net
    command: uvicorn src.main:app --host 0.0.0.0 --port 8000 --reload

  # PostgreSQL Database (Audit Logs)
  postgres:
    image: postgres:16-alpine
    container_name: arthur_postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: ${POSTGRES_USER:-arthur}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-Arth#Sup0rt3_2026!xK9}
      POSTGRES_DB: ${POSTGRES_DB:-arthur_db}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - arthur_net
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-arthur}"]
      interval: 5s
      timeout: 5s
      retries: 5

  # Vector Database (RAG)
  qdrant:
    image: qdrant/qdrant:latest
    container_name: arthur_qdrant
    ports:
      - "6333:6333"
      - "6334:6334"  # gRPC port
    volumes:
      - qdrant_data:/qdrant/storage
    environment:
      QDRANT__SERVICE__GRPC_PORT: 6334
    networks:
      - arthur_net

  # Langfuse - AI Tracing (Optional for debugging)
  langfuse:
    image: langfuse/langfuse:2
    container_name: arthur_langfuse
    ports:
      - "3000:3000"
    environment:
      DATABASE_URL: postgresql://${POSTGRES_USER:-arthur}:${POSTGRES_PASSWORD:-Arth#Sup0rt3_2026!xK9}@postgres:5432/langfuse
      NEXTAUTH_SECRET: ${LANGFUSE_SECRET:-ArthurLangfuseSecret2026}
      NEXTAUTH_URL: http://localhost:3000
      SALT: ${LANGFUSE_SALT:-ArthurSalt2026Random}
    depends_on:
      postgres:
        condition: service_healthy
    networks:
      - arthur_net
    profiles:
      - monitoring  # Only start with: docker-compose --profile monitoring up

volumes:
  postgres_data:
  qdrant_data:

networks:
  arthur_net:
    driver: bridge

pytest.ini Normal file

@@ -0,0 +1,7 @@
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
asyncio_mode = auto
addopts = -v --tb=short
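With `asyncio_mode = auto` (relying on `pytest-asyncio`, which is in the requirements), coroutine test functions are collected and awaited without an explicit `@pytest.mark.asyncio` marker. A minimal sketch, where the tested coroutine is a hypothetical stand-in rather than project code:

```python
# Illustrative async test module; `lookup` is a hypothetical stand-in.
async def lookup(domain: str):
    return {"itguys.com.br": "tenant_itguys"}.get(domain)

async def test_lookup_known_domain():
    assert await lookup("itguys.com.br") == "tenant_itguys"

async def test_lookup_unknown_domain_returns_none():
    assert await lookup("nope.example") is None
```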


@@ -1,10 +1,46 @@
# ==========================================
# CORE DEPENDENCIES
# ==========================================
python-dotenv>=1.0.0
pydantic>=2.5.0
pydantic-ai>=0.0.20

# ==========================================
# DATABASE & STORAGE
# ==========================================
asyncpg>=0.29.0
qdrant-client>=1.7.0

# ==========================================
# WEB FRAMEWORK
# ==========================================
fastapi>=0.109.0
uvicorn[standard]>=0.27.0

# ==========================================
# AI & LLM (Local Inference)
# ==========================================
langchain>=0.1.0
langchain-community>=0.0.20
langgraph>=0.0.20

# ==========================================
# INTEGRATIONS
# ==========================================
zabbix-utils>=2.0.0
aiosmtplib>=3.0.0
aioimaplib>=1.1.0

# ==========================================
# MONITORING & LOGGING
# ==========================================
logfire>=0.30.0

# ==========================================
# DEVELOPMENT & TESTING
# ==========================================
pytest>=7.4.0
pytest-asyncio>=0.23.0
pytest-cov>=4.1.0
black>=24.1.0
flake8>=7.0.0

src/clients/__init__.py Normal file

@@ -0,0 +1,9 @@
# Clients Module for Arthur Agent (External Integrations)
from .mock_financial import MockFinancialClient, FinancialClient
from .mail_client import MailConfig
__all__ = [
"MockFinancialClient",
"FinancialClient",
"MailConfig",
]

src/clients/mail_client.py Normal file

@@ -0,0 +1,89 @@
"""
Mail Client Configuration for Arthur Agent.
Handles IMAP/SMTP configuration for email-based support workflow.
"""
import os
from dataclasses import dataclass
from typing import Optional
from src.security.secrets_manager import get_secrets_manager
@dataclass
class MailConfig:
"""
Email configuration for Arthur Agent.
Uses arthur.servicedesk@itguys.com.br via mail.itguys.com.br
"""
# IMAP Configuration (Incoming)
imap_host: str = "mail.itguys.com.br"
imap_port: int = 993
imap_use_ssl: bool = True
# SMTP Configuration (Outgoing)
smtp_host: str = "mail.itguys.com.br"
smtp_port: int = 587
smtp_use_tls: bool = True
# Credentials
email_address: str = "arthur.servicedesk@itguys.com.br"
email_password: Optional[str] = None
# Polling Configuration
poll_interval_seconds: int = 30
inbox_folder: str = "INBOX"
processed_folder: str = "Processed"
@classmethod
def from_environment(cls) -> "MailConfig":
"""
Load mail configuration from environment/secrets.
Returns:
MailConfig instance with values from environment
"""
secrets = get_secrets_manager()
return cls(
imap_host=os.getenv("MAIL_IMAP_HOST", "mail.itguys.com.br"),
imap_port=int(os.getenv("MAIL_IMAP_PORT", "993")),
imap_use_ssl=os.getenv("MAIL_IMAP_SSL", "true").lower() == "true",
smtp_host=os.getenv("MAIL_SMTP_HOST", "mail.itguys.com.br"),
smtp_port=int(os.getenv("MAIL_SMTP_PORT", "587")),
smtp_use_tls=os.getenv("MAIL_SMTP_TLS", "true").lower() == "true",
email_address=os.getenv(
"MAIL_ADDRESS",
"arthur.servicedesk@itguys.com.br"
),
email_password=secrets.get("MAIL_PASSWORD"),
poll_interval_seconds=int(os.getenv("MAIL_POLL_INTERVAL", "30")),
inbox_folder=os.getenv("MAIL_INBOX_FOLDER", "INBOX"),
processed_folder=os.getenv("MAIL_PROCESSED_FOLDER", "Processed"),
)
def validate(self) -> list[str]:
"""
Validate the configuration.
Returns:
List of validation errors (empty if valid)
"""
errors = []
if not self.email_address:
errors.append("Email address is required")
if not self.email_password:
errors.append("Email password is required (set MAIL_PASSWORD secret)")
if not self.imap_host:
errors.append("IMAP host is required")
if not self.smtp_host:
errors.append("SMTP host is required")
return errors
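The error-collecting `validate()` above reports every missing field at once instead of failing on the first. It can be exercised with a simplified stand-in (`MailSettings` below is hypothetical, since the real `MailConfig` pulls its password from the secrets manager):

```python
from dataclasses import dataclass
from typing import Optional

# Simplified stand-in for MailConfig: same error-collecting validate()
# shape, without the secrets-manager dependency.
@dataclass
class MailSettings:
    email_address: str = ""
    email_password: Optional[str] = None

    def validate(self) -> list:
        errors = []
        if not self.email_address:
            errors.append("Email address is required")
        if not self.email_password:
            errors.append("Email password is required")
        return errors

# Fail fast at startup instead of on the first IMAP login attempt.
errors = MailSettings(email_address="arthur.servicedesk@itguys.com.br").validate()
print(errors)  # ['Email password is required']
```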

src/clients/mock_financial.py Normal file

@@ -0,0 +1,186 @@
"""
Mock Financial System Client for Arthur Agent.
Provides mock data for tenant resolution until the real
Financial System API is available.
"""
import logging
from typing import Optional, Protocol
from datetime import datetime
from src.models.tenant import TenantContext, TenantStatus
logger = logging.getLogger("ArthurFinancial")
class FinancialClient(Protocol):
"""Protocol defining the Financial System client interface."""
async def get_tenant_by_email_domain(
self, domain: str
) -> Optional[TenantContext]:
"""Get tenant information by email domain."""
...
async def get_tenant_by_id(self, tenant_id: str) -> Optional[TenantContext]:
"""Get tenant information by ID."""
...
async def list_active_tenants(self) -> list[TenantContext]:
"""List all active tenants."""
...
# Mock data for development
MOCK_TENANTS: list[TenantContext] = [
TenantContext(
id="tenant_oestepan",
name="OESTEPAN Ltda",
email_domains=["oestepan.com.br", "oestepan.net"],
status=TenantStatus.ACTIVE,
zabbix_host_group="OESTEPAN-Infrastructure",
qdrant_collection="oestepan_knowledge",
rate_limit_per_hour=25,
created_at=datetime(2025, 1, 15, 10, 0, 0)
),
TenantContext(
id="tenant_enseg",
name="ENSEG Corretora de Seguros",
email_domains=["enseg.com.br", "enseg.net.br"],
status=TenantStatus.ACTIVE,
zabbix_host_group="ENSEG-Infrastructure",
qdrant_collection="enseg_knowledge",
rate_limit_per_hour=20,
created_at=datetime(2025, 2, 20, 14, 30, 0)
),
TenantContext(
id="tenant_itguys",
name="iT Guys Tecnologia",
email_domains=["itguys.com.br", "itguys.net"],
status=TenantStatus.ACTIVE,
zabbix_host_group="ITGUYS-Internal",
qdrant_collection="itguys_knowledge",
rate_limit_per_hour=50, # Internal - higher limit
created_at=datetime(2024, 6, 1, 9, 0, 0)
),
TenantContext(
id="tenant_demo",
name="Cliente Demo (Inativo)",
email_domains=["demo-cliente.com.br"],
status=TenantStatus.INACTIVE,
zabbix_host_group=None,
qdrant_collection=None,
rate_limit_per_hour=5,
created_at=datetime(2025, 11, 1, 12, 0, 0)
),
]
class MockFinancialClient:
"""
Mock implementation of the Financial System client.
Returns pre-defined tenant data for development and testing.
Will be replaced with real API client when Financial System is ready.
"""
def __init__(self):
self._tenants = {t.id: t for t in MOCK_TENANTS}
self._domain_map: dict[str, str] = {}
# Build domain to tenant mapping
for tenant in MOCK_TENANTS:
for domain in tenant.email_domains:
self._domain_map[domain.lower()] = tenant.id
logger.info(
f"MockFinancialClient initialized with {len(self._tenants)} tenants"
)
async def get_tenant_by_email_domain(
self, domain: str
) -> Optional[TenantContext]:
"""
Get tenant information by email domain.
Args:
domain: Email domain (e.g., "oestepan.com.br")
Returns:
TenantContext if found and active, None otherwise
"""
domain_lower = domain.lower()
tenant_id = self._domain_map.get(domain_lower)
if not tenant_id:
logger.warning(f"No tenant found for domain: {domain}")
return None
tenant = self._tenants.get(tenant_id)
if tenant and tenant.status != TenantStatus.ACTIVE:
logger.warning(
f"Tenant {tenant_id} found but status is {tenant.status}"
)
return None
return tenant
async def get_tenant_by_id(self, tenant_id: str) -> Optional[TenantContext]:
"""
Get tenant information by ID.
Args:
tenant_id: Tenant identifier
Returns:
TenantContext if found, None otherwise
"""
return self._tenants.get(tenant_id)
async def list_active_tenants(self) -> list[TenantContext]:
"""
List all active tenants.
Returns:
List of active TenantContext objects
"""
return [
t for t in self._tenants.values()
if t.status == TenantStatus.ACTIVE
]
async def resolve_tenant_from_email(
self, email: str
) -> Optional[TenantContext]:
"""
Resolve tenant from a full email address.
Args:
email: Full email address (e.g., "joao@oestepan.com.br")
Returns:
TenantContext if found and active, None otherwise
"""
if "@" not in email:
logger.error(f"Invalid email format: {email}")
return None
domain = email.split("@")[1]
return await self.get_tenant_by_email_domain(domain)
# Factory function for dependency injection
def get_financial_client() -> FinancialClient:
"""
Get the financial system client.
Returns MockFinancialClient for now.
Will return real client when Financial System API is available.
"""
# TODO: Replace with real client when API is ready
# if os.getenv("FINANCIAL_API_URL"):
# return RealFinancialClient(os.getenv("FINANCIAL_API_URL"))
return MockFinancialClient()
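The email-to-tenant resolution above (split on `@`, lowercase the domain, look it up in the prebuilt map) can be exercised end to end. This sketch inlines a minimal domain map instead of the real `MOCK_TENANTS` objects:

```python
import asyncio

# Minimal stand-in for the domain -> tenant mapping built in __init__.
DOMAIN_MAP = {"oestepan.com.br": "tenant_oestepan", "itguys.com.br": "tenant_itguys"}

async def resolve_tenant_from_email(email: str):
    """Mirrors MockFinancialClient.resolve_tenant_from_email, domain lookup only."""
    if "@" not in email:
        return None  # invalid address
    domain = email.split("@")[1].lower()  # lookup is case-insensitive
    return DOMAIN_MAP.get(domain)

print(asyncio.run(resolve_tenant_from_email("joao@OESTEPAN.com.br")))  # tenant_oestepan
print(asyncio.run(resolve_tenant_from_email("not-an-email")))          # None
```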


@@ -1,37 +1,158 @@
"""
Central Configuration for Arthur Agent.
Manages all configuration from environment variables and Docker Secrets.
"""
import os
import logging
from dotenv import load_dotenv
from dataclasses import dataclass
from typing import Optional

# Load environment variables
load_dotenv()

# Setup Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger("ArthurConfig")


@dataclass
class QdrantConfig:
    """Qdrant vector database configuration."""
    host: str = "localhost"
    port: int = 6333
    collection_name: str = "arthur_knowledge"
    use_grpc: bool = False
    on_disk: bool = True  # Per PRD: optimize RAM usage


@dataclass
class PostgresConfig:
    """PostgreSQL database configuration."""
    host: str = "postgres"
    port: int = 5432
    database: str = "arthur_db"
    user: str = "arthur"
    password: Optional[str] = None
    min_pool_size: int = 2
    max_pool_size: int = 10


@dataclass
class LLMConfig:
    """Local LLM configuration for CPU inference."""
    # Triage Model (1B - Fast)
    triage_model: str = "llama3.2:1b"
    triage_threads: int = 4
    # Specialist Model (8B - Reasoning)
    specialist_model: str = "llama3.1:8b"
    specialist_threads: int = 12
    # Ollama endpoint
    ollama_base_url: str = "http://localhost:11434"
    # Context windows
    triage_context: int = 2048
    specialist_context: int = 8192


@dataclass
class ZabbixConfig:
    """Zabbix API configuration."""
    url: str = "https://noc.itguys.com.br/api_jsonrpc.php"
    api_token: Optional[str] = None
    verify_ssl: bool = True
    timeout: int = 30


@dataclass
class MailConfig:
    """Email configuration."""
    imap_host: str = "mail.itguys.com.br"
    imap_port: int = 993
    smtp_host: str = "mail.itguys.com.br"
    smtp_port: int = 587
    email_address: str = "arthur.servicedesk@itguys.com.br"
    password: Optional[str] = None


class Config:
    """
    Central Configuration for Arthur Agent.
    """

    @staticmethod
    def get_qdrant_config() -> QdrantConfig:
        """Returns Qdrant connection configuration."""
        return QdrantConfig(
            host=os.getenv("QDRANT_HOST", "qdrant"),
            port=int(os.getenv("QDRANT_PORT", "6333")),
            collection_name=os.getenv("QDRANT_COLLECTION", "arthur_knowledge"),
            use_grpc=os.getenv("QDRANT_USE_GRPC", "false").lower() == "true",
            on_disk=os.getenv("QDRANT_ON_DISK", "true").lower() == "true",
        )

    @staticmethod
    def get_postgres_config() -> PostgresConfig:
        """Returns PostgreSQL configuration."""
        return PostgresConfig(
            host=os.getenv("POSTGRES_HOST", "postgres"),
            port=int(os.getenv("POSTGRES_PORT", "5432")),
            database=os.getenv("POSTGRES_DB", "arthur_db"),
            user=os.getenv("POSTGRES_USER", "arthur"),
            password=os.getenv("POSTGRES_PASSWORD"),
            min_pool_size=int(os.getenv("POSTGRES_MIN_POOL", "2")),
            max_pool_size=int(os.getenv("POSTGRES_MAX_POOL", "10")),
        )

    @staticmethod
    def get_llm_config() -> LLMConfig:
        """Returns LLM Configuration for Local Inference."""
        return LLMConfig(
            triage_model=os.getenv("LLM_TRIAGE_MODEL", "llama3.2:1b"),
            triage_threads=int(os.getenv("LLM_TRIAGE_THREADS", "4")),
            specialist_model=os.getenv("LLM_SPECIALIST_MODEL", "llama3.1:8b"),
            specialist_threads=int(os.getenv("LLM_SPECIALIST_THREADS", "12")),
            ollama_base_url=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"),
            triage_context=int(os.getenv("LLM_TRIAGE_CONTEXT", "2048")),
            specialist_context=int(os.getenv("LLM_SPECIALIST_CONTEXT", "8192")),
        )

    @staticmethod
    def get_zabbix_config() -> ZabbixConfig:
        """Returns Zabbix API configuration."""
        return ZabbixConfig(
            url=os.getenv("ZABBIX_API_URL", "https://noc.itguys.com.br/api_jsonrpc.php"),
            api_token=os.getenv("ZABBIX_API_TOKEN"),
            verify_ssl=os.getenv("ZABBIX_VERIFY_SSL", "true").lower() == "true",
            timeout=int(os.getenv("ZABBIX_TIMEOUT", "30")),
        )

    @staticmethod
    def get_mail_config() -> MailConfig:
        """Returns email configuration."""
        return MailConfig(
            imap_host=os.getenv("MAIL_IMAP_HOST", "mail.itguys.com.br"),
            imap_port=int(os.getenv("MAIL_IMAP_PORT", "993")),
            smtp_host=os.getenv("MAIL_SMTP_HOST", "mail.itguys.com.br"),
            smtp_port=int(os.getenv("MAIL_SMTP_PORT", "587")),
            email_address=os.getenv("MAIL_ADDRESS", "arthur.servicedesk@itguys.com.br"),
            password=os.getenv("MAIL_PASSWORD"),
        )

    @staticmethod
    def is_production() -> bool:
        """Check if running in production mode."""
        return os.getenv("ENVIRONMENT", "development").lower() == "production"

    @staticmethod
    def get_log_level() -> int:
        """Get configured log level."""
        level = os.getenv("LOG_LEVEL", "INFO").upper()
        return getattr(logging, level, logging.INFO)
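The getter pattern used throughout `Config` (read an env var, apply a typed default, collect the values into a dataclass) can be reproduced in isolation. `ZabbixSettings` and `load_zabbix_settings` below are hypothetical stand-ins for illustration:

```python
import os
from dataclasses import dataclass

@dataclass
class ZabbixSettings:
    """Stand-in mirroring ZabbixConfig's env-driven construction."""
    verify_ssl: bool = True
    timeout: int = 30

def load_zabbix_settings() -> ZabbixSettings:
    # Same idiom as Config.get_zabbix_config: env var wins over the default,
    # with explicit string-to-bool and string-to-int conversion.
    return ZabbixSettings(
        verify_ssl=os.getenv("ZABBIX_VERIFY_SSL", "true").lower() == "true",
        timeout=int(os.getenv("ZABBIX_TIMEOUT", "30")),
    )

os.environ["ZABBIX_TIMEOUT"] = "60"  # environment overrides the default
print(load_zabbix_settings().timeout)  # 60
```

Keeping the conversion logic in one getter per subsystem means callers only ever see typed dataclasses, never raw strings.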

src/database/__init__.py Normal file

@@ -0,0 +1,7 @@
# Database Module for Arthur Agent
from .connection import DatabaseManager, get_db_manager
__all__ = [
"DatabaseManager",
"get_db_manager",
]

src/database/connection.py Normal file

@@ -0,0 +1,120 @@
"""
Database Connection Manager for Arthur Agent.
Handles PostgreSQL connections for audit logging with support
for Docker Secrets and connection pooling.
"""
import os
import logging
from typing import Optional
from contextlib import asynccontextmanager
import asyncpg
from asyncpg import Pool
from src.security.secrets_manager import SecretsManager
logger = logging.getLogger("ArthurDB")
class DatabaseManager:
"""
Manages PostgreSQL database connections.
Supports both Docker Secrets (production) and environment
variables (development) for credential management.
"""
def __init__(self):
self._pool: Optional[Pool] = None
self._secrets = SecretsManager()
@property
def dsn(self) -> str:
"""Build PostgreSQL connection DSN from secrets."""
return (
f"postgresql://{self._secrets.get('POSTGRES_USER')}:"
f"{self._secrets.get('POSTGRES_PASSWORD')}@"
f"{os.getenv('POSTGRES_HOST', 'postgres')}:"
f"{os.getenv('POSTGRES_PORT', '5432')}/"
f"{self._secrets.get('POSTGRES_DB')}"
)
async def connect(self, min_size: int = 2, max_size: int = 10) -> None:
"""
Initialize the connection pool.
Args:
min_size: Minimum number of connections in pool
max_size: Maximum number of connections in pool
"""
if self._pool is not None:
logger.warning("Connection pool already initialized")
return
try:
self._pool = await asyncpg.create_pool(
dsn=self.dsn,
min_size=min_size,
max_size=max_size,
command_timeout=60
)
logger.info("Database connection pool initialized successfully")
except Exception as e:
logger.error(f"Failed to connect to database: {e}")
raise
async def disconnect(self) -> None:
"""Close the connection pool."""
if self._pool:
await self._pool.close()
self._pool = None
logger.info("Database connection pool closed")
@asynccontextmanager
async def acquire(self):
"""
Acquire a connection from the pool.
Usage:
async with db.acquire() as conn:
await conn.execute(...)
"""
if self._pool is None:
raise RuntimeError("Database not connected. Call connect() first.")
async with self._pool.acquire() as connection:
yield connection
async def execute(self, query: str, *args) -> str:
"""Execute a query and return status."""
async with self.acquire() as conn:
return await conn.execute(query, *args)
async def fetch(self, query: str, *args) -> list:
"""Execute a query and return all rows."""
async with self.acquire() as conn:
return await conn.fetch(query, *args)
async def fetchrow(self, query: str, *args) -> Optional[asyncpg.Record]:
"""Execute a query and return a single row."""
async with self.acquire() as conn:
return await conn.fetchrow(query, *args)
async def fetchval(self, query: str, *args):
"""Execute a query and return a single value."""
async with self.acquire() as conn:
return await conn.fetchval(query, *args)
# Singleton instance
_db_manager: Optional[DatabaseManager] = None
def get_db_manager() -> DatabaseManager:
"""Get the global database manager instance."""
global _db_manager
if _db_manager is None:
_db_manager = DatabaseManager()
return _db_manager
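The connect-then-acquire flow above can be sketched without a live database: the stub classes below stand in for `asyncpg` and only illustrate the idempotent `connect()` and the `acquire()` context-manager pattern, not the real driver.

```python
import asyncio
from contextlib import asynccontextmanager

class StubPool:
    """Stands in for asyncpg.Pool; hands out one shared fake connection."""
    def __init__(self):
        self.conn = {"executed": []}

    @asynccontextmanager
    async def acquire(self):
        yield self.conn

class StubManager:
    """Mirrors the DatabaseManager shape with an in-memory pool."""
    def __init__(self):
        self._pool = None

    async def connect(self):
        if self._pool is not None:
            return  # idempotent, like the real connect()
        self._pool = StubPool()

    @asynccontextmanager
    async def acquire(self):
        if self._pool is None:
            raise RuntimeError("Database not connected. Call connect() first.")
        async with self._pool.acquire() as conn:
            yield conn

async def main():
    db = StubManager()
    await db.connect()
    async with db.acquire() as conn:
        conn["executed"].append("SELECT 1")
    return db._pool.conn["executed"]

result = asyncio.run(main())
print(result)  # ['SELECT 1']
```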

135
src/database/migrations.py Normal file
View File

@ -0,0 +1,135 @@
"""
Database Migrations for Arthur Agent.
Contains SQL scripts for creating and managing the audit schema.
"""
import logging
from typing import Optional
from .connection import DatabaseManager
logger = logging.getLogger("ArthurMigrations")
# SQL for creating the audit schema
MIGRATIONS = [
# Migration 001: Create tenants table
"""
CREATE TABLE IF NOT EXISTS tenants (
id VARCHAR(100) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email_domains TEXT[] NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'active',
zabbix_host_group VARCHAR(255),
qdrant_collection VARCHAR(255),
rate_limit_per_hour INTEGER DEFAULT 10,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
""",
# Migration 002: Create audit_logs table
"""
CREATE TABLE IF NOT EXISTS audit_logs (
id SERIAL PRIMARY KEY,
ticket_id VARCHAR(100) NOT NULL UNIQUE,
tenant_id VARCHAR(100) NOT NULL REFERENCES tenants(id),
sender_email VARCHAR(255) NOT NULL,
subject TEXT NOT NULL,
original_message TEXT NOT NULL,
context_collected JSONB DEFAULT '{}',
triage_model_output TEXT,
specialist_model_reasoning TEXT,
response_sent TEXT,
tools_called TEXT[] DEFAULT ARRAY[]::TEXT[],
resolution_status VARCHAR(20) NOT NULL DEFAULT 'pending',
processing_time_ms INTEGER,
error_message TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
resolved_at TIMESTAMP WITH TIME ZONE
);
""",
# Migration 003: Create indexes for faster queries

"""
CREATE INDEX IF NOT EXISTS idx_audit_logs_tenant_id
ON audit_logs(tenant_id);
""",
"""
CREATE INDEX IF NOT EXISTS idx_audit_logs_created_at
ON audit_logs(created_at);
""",
"""
CREATE INDEX IF NOT EXISTS idx_audit_logs_status
ON audit_logs(resolution_status);
""",
# Migration 004: Create tickets history view for 24h lookback
"""
CREATE OR REPLACE VIEW recent_tickets AS
SELECT
ticket_id,
tenant_id,
sender_email,
subject,
resolution_status,
created_at
FROM audit_logs
WHERE created_at > NOW() - INTERVAL '24 hours';
""",
# Migration 005: Create rate limiting table
"""
CREATE TABLE IF NOT EXISTS rate_limits (
tenant_id VARCHAR(100) NOT NULL REFERENCES tenants(id),
request_count INTEGER DEFAULT 0,
window_start TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
PRIMARY KEY (tenant_id)
);
""",
]
async def run_migrations(db: DatabaseManager) -> None:
"""
Run all pending database migrations.
Args:
db: DatabaseManager instance
"""
logger.info("Starting database migrations...")
for i, migration in enumerate(MIGRATIONS, 1):
try:
await db.execute(migration)
logger.info(f"Migration {i}/{len(MIGRATIONS)} completed")
except Exception as e:
logger.error(f"Migration {i} failed: {e}")
raise
logger.info("All migrations completed successfully")
async def reset_database(db: DatabaseManager) -> None:
"""
Drop all tables and recreate them.
WARNING: This will delete all data! Use only in development.
"""
logger.warning("Resetting database - ALL DATA WILL BE LOST!")
drop_statements = [
"DROP VIEW IF EXISTS recent_tickets CASCADE;",
"DROP TABLE IF EXISTS rate_limits CASCADE;",
"DROP TABLE IF EXISTS audit_logs CASCADE;",
"DROP TABLE IF EXISTS tenants CASCADE;",
]
for stmt in drop_statements:
await db.execute(stmt)
await run_migrations(db)
logger.info("Database reset completed")
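The fail-fast behaviour of `run_migrations()` — stop at the first failing statement so nothing later runs against a half-applied schema — can be sketched with a stub executor (`StubDB` and the SQL strings here are illustrations, not part of the codebase):

```python
import asyncio

# Illustrative migration list; the real MIGRATIONS holds DDL strings.
MIGRATIONS = ["CREATE TABLE a;", "CREATE TABLE b;", "BROKEN SQL"]

class StubDB:
    """Stands in for DatabaseManager; fails on one chosen statement."""
    def __init__(self, fail_on):
        self.applied = []
        self.fail_on = fail_on

    async def execute(self, sql):
        if sql == self.fail_on:
            raise RuntimeError(f"syntax error near: {sql!r}")
        self.applied.append(sql)

async def run_all(db):
    for migration in MIGRATIONS:
        # Fail fast: the raised exception stops the loop, so later
        # migrations never run against a half-applied schema.
        await db.execute(migration)

db = StubDB(fail_on="BROKEN SQL")
try:
    asyncio.run(run_all(db))
except RuntimeError:
    pass
print(db.applied)  # ['CREATE TABLE a;', 'CREATE TABLE b;']
```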

11
src/models/__init__.py Normal file
View File

@ -0,0 +1,11 @@
# Models Module for Arthur Agent
from .tenant import TenantContext, TenantStatus
from .audit import AuditLog, ResolutionStatus, TicketContext
__all__ = [
"TenantContext",
"TenantStatus",
"AuditLog",
"ResolutionStatus",
"TicketContext",
]

142
src/models/audit.py Normal file
View File

@ -0,0 +1,142 @@
"""
Audit Log Model for Arthur Agent.
Defines the schema for audit logging, ensuring all agent decisions
are traceable and auditable per PRD requirements.
"""
from datetime import datetime
from enum import Enum
from typing import Optional, Any
from pydantic import BaseModel, Field
class ResolutionStatus(str, Enum):
"""Status of a support ticket resolution."""
PENDING = "pending"
IN_PROGRESS = "in_progress"
RESOLVED = "resolved"
ESCALATED = "escalated"
REOPENED = "reopened"
FAILED = "failed"
class TicketContext(BaseModel):
"""
Context collected during ticket processing.
Contains all data gathered from various sources (Zabbix, RAG, history)
before the model generates a response.
"""
zabbix_data: Optional[dict[str, Any]] = Field(
default=None,
description="Data retrieved from Zabbix API (host status, alerts)"
)
rag_context: Optional[list[str]] = Field(
default=None,
description="Relevant documents retrieved from RAG"
)
historical_tickets: Optional[list[dict]] = Field(
default=None,
description="Similar tickets from last 24h"
)
neighbor_alerts: Optional[list[dict]] = Field(
default=None,
description="Alerts from neighboring hosts (root cause analysis)"
)
extracted_entities: Optional[dict[str, str]] = Field(
default=None,
description="Entities extracted by triage model (technology, problem)"
)
class AuditLog(BaseModel):
"""
Complete audit log for a support ticket interaction.
Every decision made by Arthur is logged here for traceability
and compliance. Stored in PostgreSQL.
"""
ticket_id: str = Field(
...,
description="Unique identifier for this support ticket"
)
tenant_id: str = Field(
...,
description="Tenant identifier for multitenant isolation"
)
sender_email: str = Field(
...,
description="Email address of the requester (sanitized)"
)
subject: str = Field(
...,
description="Email subject line"
)
original_message: str = Field(
...,
description="Original message body (sanitized by DLP filter)"
)
context_collected: TicketContext = Field(
default_factory=TicketContext,
description="All context gathered during processing"
)
triage_model_output: Optional[str] = Field(
default=None,
description="Raw output from triage model (1B)"
)
specialist_model_reasoning: Optional[str] = Field(
default=None,
description="Reasoning chain from specialist model (8B)"
)
response_sent: Optional[str] = Field(
default=None,
description="Final response sent to the user"
)
tools_called: list[str] = Field(
default_factory=list,
description="List of tools invoked during processing"
)
resolution_status: ResolutionStatus = Field(
default=ResolutionStatus.PENDING,
description="Current resolution status"
)
processing_time_ms: Optional[int] = Field(
default=None,
description="Total processing time in milliseconds"
)
error_message: Optional[str] = Field(
default=None,
description="Error message if processing failed"
)
created_at: datetime = Field(
# Note: datetime.utcnow is deprecated since Python 3.12; prefer datetime.now(timezone.utc)
default_factory=datetime.utcnow,
description="Timestamp when ticket was received"
)
updated_at: datetime = Field(
default_factory=datetime.utcnow,
description="Timestamp of last update"
)
resolved_at: Optional[datetime] = Field(
default=None,
description="Timestamp when ticket was resolved"
)
class Config:
"""Pydantic configuration."""
json_schema_extra = {
"example": {
"ticket_id": "TKT-2026-0201-001",
"tenant_id": "tenant_oestepan",
"sender_email": "joao@oestepan.com.br",
"subject": "Servidor não responde",
"original_message": "O servidor srv-app01 não está respondendo...",
"resolution_status": "resolved",
"processing_time_ms": 2500,
"created_at": "2026-02-01T10:30:00Z",
"resolved_at": "2026-02-01T10:35:00Z"
}
}
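Because `ResolutionStatus` mixes in `str`, members compare equal to their raw values and serialize as plain strings in JSON — which is why the audit record can be stored as-is. A stdlib-only sketch (re-declaring a trimmed copy of the enum):

```python
import json
from enum import Enum

# Trimmed copy of ResolutionStatus for illustration.
class ResolutionStatus(str, Enum):
    PENDING = "pending"
    RESOLVED = "resolved"

record = {"ticket_id": "TKT-001", "resolution_status": ResolutionStatus.RESOLVED}
payload = json.dumps(record)  # the str mixin makes the member JSON-serializable
print(payload)  # {"ticket_id": "TKT-001", "resolution_status": "resolved"}
```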

109
src/models/tenant.py Normal file
View File

@ -0,0 +1,109 @@
"""
Tenant Context Model for Arthur Agent.
Defines the schema for tenant (customer) identification and context
used for multitenant isolation in the support system.
"""
from datetime import datetime
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
class TenantStatus(str, Enum):
"""Status of a tenant in the system."""
ACTIVE = "active"
INACTIVE = "inactive"
SUSPENDED = "suspended"
class TenantContext(BaseModel):
"""
Schema for Tenant identification and context.
Used by the Tenant Resolver to identify which customer
is making a support request based on email domain.
"""
id: str = Field(
...,
description="Unique identifier for the tenant",
examples=["tenant_oestepan", "tenant_enseg"]
)
name: str = Field(
...,
description="Company/Customer name",
examples=["OESTEPAN", "ENSEG"]
)
email_domains: list[str] = Field(
...,
description="List of email domains associated with this tenant",
examples=[["oestepan.com.br"], ["enseg.com.br", "enseg.net"]]
)
status: TenantStatus = Field(
default=TenantStatus.ACTIVE,
description="Current status of the tenant"
)
zabbix_host_group: Optional[str] = Field(
default=None,
description="Zabbix host group name for this tenant's infrastructure"
)
qdrant_collection: Optional[str] = Field(
default=None,
description="Qdrant collection name for tenant-specific RAG"
)
rate_limit_per_hour: int = Field(
default=10,
description="Maximum requests per hour for this tenant (DoS protection)"
)
created_at: datetime = Field(
default_factory=datetime.utcnow,
description="Timestamp when tenant was created"
)
class Config:
"""Pydantic configuration."""
json_schema_extra = {
"example": {
"id": "tenant_oestepan",
"name": "OESTEPAN Ltda",
"email_domains": ["oestepan.com.br"],
"status": "active",
"zabbix_host_group": "OESTEPAN-Infrastructure",
"qdrant_collection": "oestepan_knowledge",
"rate_limit_per_hour": 20,
"created_at": "2026-01-15T10:00:00Z"
}
}
class TenantResolver:
"""
Resolves tenant context from email domain.
This is a placeholder that will connect to the Financial System API
when it becomes available. Currently uses mock data.
"""
@staticmethod
def resolve_from_email(email: str) -> Optional[TenantContext]:
"""
Resolve tenant from sender email address.
Args:
email: Sender email address
Returns:
TenantContext if found, None otherwise
"""
# Extract domain from email
if "@" not in email:
return None
domain = email.split("@")[1].lower()
# TODO: Replace with actual API call to Financial System
# For now, return None - mock data is in MockFinancialClient
return None
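A hypothetical domain-to-tenant lookup table sketches how `resolve_from_email` could work once backed by the Financial System API; the domains and IDs below mirror the mock data, not a real API.

```python
# Hypothetical lookup table; domains and tenant IDs mirror the mock data.
DOMAIN_MAP = {
    "oestepan.com.br": "tenant_oestepan",
    "enseg.com.br": "tenant_enseg",
    "enseg.net.br": "tenant_enseg",
}

def resolve_tenant_id(email: str):
    if "@" not in email:
        return None  # same invalid-email guard as resolve_from_email
    # Lowercasing the domain makes matching case-insensitive.
    domain = email.split("@")[1].lower()
    return DOMAIN_MAP.get(domain)

print(resolve_tenant_id("Joao@OESTEPAN.com.br"))  # tenant_oestepan
print(resolve_tenant_id("not-an-email"))          # None
```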

9
src/security/__init__.py Normal file
View File

@ -0,0 +1,9 @@
# Security Module for Arthur Agent
from .secrets_manager import SecretsManager
from .dlp_filter import DLPFilter, sanitize_text
__all__ = [
"SecretsManager",
"DLPFilter",
"sanitize_text",
]

184
src/security/dlp_filter.py Normal file
View File

@ -0,0 +1,184 @@
"""
Data Loss Prevention (DLP) Filter for Arthur Agent.
Implements regex-based redaction for sensitive data including
passwords, API keys, and personally identifiable information (PII).
This is a critical security layer per PRD requirements.
"""
import re
import logging
logger = logging.getLogger("ArthurDLP")
# Pre-compiled regex patterns for high performance
# Focused on sensitive data relevant to the local context (iT Guys)
PATTERNS: dict[str, re.Pattern] = {
# Passwords in various formats (password=, password:, password is:, etc)
"password": re.compile(
r'(?i)(password|senha|pwd|pass|secret)(?:\s+(?:is|e|é))?\s*[:=]\s*["\']?([^\s"\'\n]{3,})["\']?',
re.IGNORECASE
),
# API Keys and Tokens (generic - Zabbix, internal systems)
"api_key": re.compile(
r'(?i)(api[_-]?key|apikey|token|bearer|authorization)\s*[:=]\s*["\']?([a-zA-Z0-9_\-\.]{20,})["\']?',
re.IGNORECASE
),
# Generic secret patterns
"secret_value": re.compile(
r'(?i)(private[_-]?key|client[_-]?secret)\s*[:=]\s*["\']?([^\s"\'\n]{8,})["\']?',
re.IGNORECASE
),
# Brazilian CPF (critical for LGPD compliance)
"cpf": re.compile(
r'\b\d{3}\.?\d{3}\.?\d{3}[-]?\d{2}\b'
),
# Brazilian CNPJ
"cnpj": re.compile(
r'\b\d{2}\.?\d{3}\.?\d{3}/?\d{4}[-]?\d{2}\b'
),
# Credit card numbers (in case a customer sends one by mistake)
"credit_card": re.compile(
r'\b(?:\d{4}[-\s]?){3}\d{4}\b'
),
# Email addresses (partial redaction - hide user, keep domain)
"email_partial": re.compile(
r'\b([a-zA-Z0-9._%+-]+)@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b'
),
# SSH/RSA Private Keys
"private_key": re.compile(
r'-----BEGIN\s+(?:RSA\s+)?PRIVATE\s+KEY-----[\s\S]*?-----END\s+(?:RSA\s+)?PRIVATE\s+KEY-----',
re.MULTILINE
),
}
# Redaction mask
REDACTED = "[REDACTED]"
REDACTED_EMAIL = "[EMAIL_REDACTED]"
REDACTED_CPF = "[CPF_REDACTED]"
REDACTED_CNPJ = "[CNPJ_REDACTED]"
REDACTED_CARD = "[CARD_REDACTED]"
REDACTED_KEY = "[KEY_REDACTED]"
class DLPFilter:
"""
Data Loss Prevention filter for sanitizing sensitive data.
Uses high-performance regex patterns to identify and redact
sensitive information before processing or storage.
"""
def __init__(self, custom_patterns: dict[str, re.Pattern] | None = None):
"""
Initialize the DLP filter.
Args:
custom_patterns: Additional regex patterns to use
"""
self.patterns = PATTERNS.copy()
if custom_patterns:
self.patterns.update(custom_patterns)
self._stats = {
"total_processed": 0,
"total_redacted": 0,
"by_type": {key: 0 for key in self.patterns}
}
def sanitize(self, text: str, log_stats: bool = False) -> str:
"""
Sanitize text by redacting sensitive information.
Args:
text: Input text to sanitize
log_stats: Log statistics about redactions
Returns:
Sanitized text with sensitive data redacted
"""
if not text:
return text
self._stats["total_processed"] += 1
result = text
redaction_count = 0
# Apply each pattern
for pattern_name, pattern in self.patterns.items():
matches = pattern.findall(result)
if matches:
redaction_count += len(matches)
self._stats["by_type"][pattern_name] += len(matches)
# Apply appropriate redaction
if pattern_name == "cpf":
result = pattern.sub(REDACTED_CPF, result)
elif pattern_name == "cnpj":
result = pattern.sub(REDACTED_CNPJ, result)
elif pattern_name == "credit_card":
result = pattern.sub(REDACTED_CARD, result)
elif pattern_name == "email_partial":
# Partial email redaction: keep domain
result = pattern.sub(r'[USER]@\2', result)
elif pattern_name == "private_key":
result = pattern.sub(REDACTED_KEY, result)
elif pattern_name in ("password", "api_key", "secret_value"):
# For key=value patterns, keep the key name
result = pattern.sub(r'\1: ' + REDACTED, result)
else:
result = pattern.sub(REDACTED, result)
if redaction_count > 0:
self._stats["total_redacted"] += redaction_count
if log_stats:
logger.info(
f"Redacted {redaction_count} sensitive items from text"
)
return result
def get_stats(self) -> dict:
"""Get redaction statistics."""
return self._stats.copy()
def reset_stats(self) -> None:
"""Reset statistics counters."""
self._stats = {
"total_processed": 0,
"total_redacted": 0,
"by_type": {key: 0 for key in self.patterns}
}
# Singleton instance
_dlp_filter: DLPFilter | None = None
def get_dlp_filter() -> DLPFilter:
"""Get the global DLP filter instance."""
global _dlp_filter
if _dlp_filter is None:
_dlp_filter = DLPFilter()
return _dlp_filter
def sanitize_text(text: str) -> str:
"""
Convenience function to sanitize text.
Args:
text: Text to sanitize
Returns:
Sanitized text
"""
return get_dlp_filter().sanitize(text)
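Two of the `PATTERNS` entries, copied here in simplified form, show the substitution behaviour (key=value patterns keep the key name; CPF gets a dedicated mask); this is an illustrative sketch, not the full filter:

```python
import re

# Simplified copies of the "password" and "cpf" entries in PATTERNS.
PASSWORD = re.compile(
    r'(?i)(password|senha|pwd|pass|secret)(?:\s+(?:is|e|é))?\s*[:=]\s*["\']?([^\s"\'\n]{3,})["\']?'
)
CPF = re.compile(r'\b\d{3}\.?\d{3}\.?\d{3}[-]?\d{2}\b')

def sanitize(text: str) -> str:
    # key=value patterns keep the key name, as in DLPFilter.sanitize
    text = PASSWORD.sub(r'\1: [REDACTED]', text)
    text = CPF.sub('[CPF_REDACTED]', text)
    return text

msg = "senha: admin123 e CPF 123.456.789-00"
out = sanitize(msg)
print(out)  # senha: [REDACTED] e CPF [CPF_REDACTED]
```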

148
src/security/secrets_manager.py Normal file
View File

@ -0,0 +1,148 @@
"""
Secrets Manager for Arthur Agent.
Handles secure credential management with support for Docker Secrets
(production) and environment variables (development).
"""
import os
import logging
from pathlib import Path
from typing import Optional
logger = logging.getLogger("ArthurSecrets")
# Default path for Docker Secrets
DOCKER_SECRETS_PATH = Path("/run/secrets")
class SecretsManager:
"""
Manages secure access to credentials.
Priority order:
1. Docker Secrets (production)
2. Environment Variables (development)
3. Default values (only for non-sensitive config)
"""
def __init__(self, secrets_path: Optional[Path] = None):
"""
Initialize the secrets manager.
Args:
secrets_path: Custom path to Docker secrets directory
"""
self._secrets_path = secrets_path or DOCKER_SECRETS_PATH
self._cache: dict[str, str] = {}
self._loaded = False
def _load_docker_secrets(self) -> None:
"""Load all secrets from Docker Secrets directory."""
if not self._secrets_path.exists():
logger.debug(f"Docker secrets path not found: {self._secrets_path}")
return
for secret_file in self._secrets_path.iterdir():
if secret_file.is_file():
try:
self._cache[secret_file.name.upper()] = (
secret_file.read_text().strip()
)
logger.debug(f"Loaded secret: {secret_file.name}")
except Exception as e:
logger.warning(f"Failed to load secret {secret_file.name}: {e}")
self._loaded = True
def get(
self,
key: str,
default: Optional[str] = None,
required: bool = False
) -> Optional[str]:
"""
Get a secret value.
Args:
key: Secret name (case-insensitive for Docker Secrets)
default: Default value if secret not found
required: If True, raises ValueError when secret is missing
Returns:
Secret value or default
Raises:
ValueError: If required secret is not found
"""
# Lazy load Docker Secrets
if not self._loaded:
self._load_docker_secrets()
key_upper = key.upper()
# Priority 1: Docker Secrets (cached)
if key_upper in self._cache:
return self._cache[key_upper]
# Priority 2: Environment Variables
env_value = os.getenv(key) or os.getenv(key_upper)
if env_value:
return env_value
# Not found
if required:
raise ValueError(
f"Required secret '{key}' not found in Docker Secrets "
f"or environment variables"
)
return default
def validate_required_secrets(self, secrets: list[str]) -> list[str]:
"""
Validate that all required secrets are available.
Args:
secrets: List of required secret names
Returns:
List of missing secrets (empty if all found)
"""
missing = []
for secret in secrets:
try:
self.get(secret, required=True)
except ValueError:
missing.append(secret)
return missing
@staticmethod
def get_required_secrets() -> list[str]:
"""
Get list of all secrets required by Arthur.
Returns:
List of secret names
"""
return [
"POSTGRES_USER",
"POSTGRES_PASSWORD",
"POSTGRES_DB",
"MAIL_PASSWORD",
# Zabbix API key is optional until configured
# "ZABBIX_API_KEY",
]
# Singleton instance
_secrets_manager: Optional[SecretsManager] = None
def get_secrets_manager() -> SecretsManager:
"""Get the global secrets manager instance."""
global _secrets_manager
if _secrets_manager is None:
_secrets_manager = SecretsManager()
return _secrets_manager
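The three-level lookup priority (Docker Secrets file, then environment variable, then default) can be sketched with a temporary directory standing in for `/run/secrets`; `get_secret` below is a simplified stand-in for `SecretsManager.get`, not the real implementation:

```python
import os
import tempfile
from pathlib import Path

def get_secret(secrets_dir: Path, key: str, default=None):
    # Priority 1: a file in the secrets directory (name match is
    # case-insensitive, mirroring the filename.upper() cache).
    for f in secrets_dir.iterdir():
        if f.is_file() and f.name.upper() == key.upper():
            return f.read_text().strip()
    # Priority 2: environment variable.
    env = os.getenv(key) or os.getenv(key.upper())
    if env:
        return env
    # Priority 3: caller-supplied default.
    return default

with tempfile.TemporaryDirectory() as d:
    secrets = Path(d)
    (secrets / "postgres_password").write_text("file_pw\n")
    os.environ["MAIL_PASSWORD"] = "env_pw"
    a = get_secret(secrets, "POSTGRES_PASSWORD")
    b = get_secret(secrets, "MAIL_PASSWORD")
    c = get_secret(secrets, "MISSING", default="fallback")

print(a, b, c)  # file_pw env_pw fallback
```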

1
tests/__init__.py Normal file
View File

@ -0,0 +1 @@
# Tests for Arthur Agent

154
tests/test_dlp.py Normal file
View File

@ -0,0 +1,154 @@
"""
Tests for DLP Filter (Data Loss Prevention).
Validates regex patterns for sensitive data redaction.
"""
import pytest
from src.security.dlp_filter import DLPFilter, sanitize_text
class TestDLPFilter:
"""Tests for DLP Filter functionality."""
@pytest.fixture
def dlp(self):
"""Create a fresh DLP filter for each test."""
return DLPFilter()
def test_password_redaction_simple(self, dlp):
"""Test simple password redaction."""
text = "My password is: secret123"
result = dlp.sanitize(text)
# The sensitive value should be removed
assert "secret123" not in result
# Some form of redaction should be present
assert "REDACTED" in result or "password" in result.lower()
def test_password_redaction_various_formats(self, dlp):
"""Test password redaction in various formats."""
cases = [
("password=mypass123", "mypass123"),
("senha: minhasenha456", "minhasenha456"),
("pwd: abc123def", "abc123def"),
('secret="topsecret789"', "topsecret789"),
]
for text, sensitive_value in cases:
result = dlp.sanitize(text)
# Original sensitive value should be gone
assert sensitive_value not in result, f"Value '{sensitive_value}' still in result"
def test_api_key_redaction(self, dlp):
"""Test API key redaction."""
text = "api_key=sk-proj-1234567890abcdefghij"
result = dlp.sanitize(text)
assert "sk-proj-1234567890abcdefghij" not in result
assert "[REDACTED]" in result
def test_cpf_redaction(self, dlp):
"""Test Brazilian CPF redaction."""
cases = [
"CPF: 123.456.789-00",
"cpf=12345678900",
"O CPF 123.456.789-00 está cadastrado",
]
for text in cases:
result = dlp.sanitize(text)
assert "[CPF_REDACTED]" in result
def test_cnpj_redaction(self, dlp):
"""Test Brazilian CNPJ redaction."""
cases = [
"CNPJ: 12.345.678/0001-90",
"cnpj=12345678000190",
]
for text in cases:
result = dlp.sanitize(text)
assert "[CNPJ_REDACTED]" in result
def test_credit_card_redaction(self, dlp):
"""Test credit card number redaction."""
cases = [
"Card: 4111-1111-1111-1111",
"Number: 4111 1111 1111 1111",
"Cartão: 4111111111111111",
]
for text in cases:
result = dlp.sanitize(text)
assert "[CARD_REDACTED]" in result
def test_email_partial_redaction(self, dlp):
"""Test partial email redaction (keep domain)."""
text = "Contact joao.silva@empresa.com.br for help"
result = dlp.sanitize(text)
assert "joao.silva" not in result
assert "empresa.com.br" in result # Domain kept
assert "[USER]@empresa.com.br" in result
def test_private_key_redaction(self, dlp):
"""Test SSH private key redaction."""
text = """
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA0Z...
-----END RSA PRIVATE KEY-----
"""
result = dlp.sanitize(text)
assert "MIIEpAIBAAKCAQEA0Z" not in result
assert "[KEY_REDACTED]" in result
def test_normal_text_unchanged(self, dlp):
"""Test that normal text is not modified."""
text = "Hello, this is a normal support message about server performance."
result = dlp.sanitize(text)
assert result == text
def test_mixed_content(self, dlp):
"""Test text with both sensitive and normal content."""
text = """
Olá, preciso de ajuda com o servidor srv-app01.
senha: admin123
O CPF do responsável é 123.456.789-00
Por favor, me ajudem!
"""
result = dlp.sanitize(text)
# Normal content preserved
assert "srv-app01" in result
assert "Por favor" in result
# Sensitive content redacted
assert "admin123" not in result
assert "[CPF_REDACTED]" in result
def test_statistics_tracking(self, dlp):
"""Test that statistics are tracked correctly."""
dlp.sanitize("password=test123")
dlp.sanitize("CPF: 123.456.789-00")
dlp.sanitize("Normal text")
stats = dlp.get_stats()
assert stats["total_processed"] == 3
assert stats["total_redacted"] >= 2
def test_convenience_function(self):
"""Test the sanitize_text convenience function."""
result = sanitize_text("password=secret")
assert "secret" not in result
assert "[REDACTED]" in result
def test_empty_input(self, dlp):
"""Test handling of empty input."""
assert dlp.sanitize("") == ""
assert dlp.sanitize(None) is None

View File

@ -0,0 +1,128 @@
"""
Tests for Mock Financial Client.
Validates tenant resolution from email domains.
"""
import pytest
from src.clients.mock_financial import MockFinancialClient, get_financial_client
from src.models.tenant import TenantStatus
class TestMockFinancialClient:
"""Tests for MockFinancialClient."""
@pytest.fixture
def client(self):
"""Create a mock financial client."""
return MockFinancialClient()
@pytest.mark.asyncio
async def test_resolve_known_tenant_oestepan(self, client):
"""Test resolving OESTEPAN tenant from email."""
tenant = await client.resolve_tenant_from_email("joao@oestepan.com.br")
assert tenant is not None
assert tenant.id == "tenant_oestepan"
assert tenant.name == "OESTEPAN Ltda"
assert tenant.status == TenantStatus.ACTIVE
@pytest.mark.asyncio
async def test_resolve_known_tenant_enseg(self, client):
"""Test resolving ENSEG tenant from email."""
tenant = await client.resolve_tenant_from_email("admin@enseg.com.br")
assert tenant is not None
assert tenant.id == "tenant_enseg"
assert tenant.name == "ENSEG Corretora de Seguros"
@pytest.mark.asyncio
async def test_resolve_known_tenant_itguys(self, client):
"""Test resolving internal iT Guys tenant."""
tenant = await client.resolve_tenant_from_email("arthur@itguys.com.br")
assert tenant is not None
assert tenant.id == "tenant_itguys"
assert tenant.rate_limit_per_hour == 50 # Higher for internal
@pytest.mark.asyncio
async def test_resolve_unknown_domain(self, client):
"""Test that unknown domains return None."""
tenant = await client.resolve_tenant_from_email("user@unknown-company.com")
assert tenant is None
@pytest.mark.asyncio
async def test_resolve_inactive_tenant(self, client):
"""Test that inactive tenants are not resolved."""
tenant = await client.resolve_tenant_from_email("user@demo-cliente.com.br")
assert tenant is None # Inactive tenant should not be returned
@pytest.mark.asyncio
async def test_resolve_invalid_email(self, client):
"""Test handling of invalid email format."""
tenant = await client.resolve_tenant_from_email("not-an-email")
assert tenant is None
@pytest.mark.asyncio
async def test_get_tenant_by_id(self, client):
"""Test getting tenant by ID."""
tenant = await client.get_tenant_by_id("tenant_oestepan")
assert tenant is not None
assert tenant.id == "tenant_oestepan"
@pytest.mark.asyncio
async def test_get_nonexistent_tenant_by_id(self, client):
"""Test getting nonexistent tenant by ID."""
tenant = await client.get_tenant_by_id("tenant_does_not_exist")
assert tenant is None
@pytest.mark.asyncio
async def test_list_active_tenants(self, client):
"""Test listing all active tenants."""
tenants = await client.list_active_tenants()
assert len(tenants) >= 3 # At least oestepan, enseg, itguys
# All returned tenants should be active
for tenant in tenants:
assert tenant.status == TenantStatus.ACTIVE
# Demo tenant (inactive) should not be in list
tenant_ids = [t.id for t in tenants]
assert "tenant_demo" not in tenant_ids
@pytest.mark.asyncio
async def test_domain_case_insensitive(self, client):
"""Test that domain matching is case-insensitive."""
tenant1 = await client.get_tenant_by_email_domain("OESTEPAN.COM.BR")
tenant2 = await client.get_tenant_by_email_domain("oestepan.com.br")
assert tenant1 is not None
assert tenant2 is not None
assert tenant1.id == tenant2.id
@pytest.mark.asyncio
async def test_multiple_domains_same_tenant(self, client):
"""Test tenant with multiple email domains."""
tenant1 = await client.get_tenant_by_email_domain("enseg.com.br")
tenant2 = await client.get_tenant_by_email_domain("enseg.net.br")
assert tenant1 is not None
assert tenant2 is not None
assert tenant1.id == tenant2.id == "tenant_enseg"
class TestFinancialClientFactory:
"""Tests for the financial client factory function."""
def test_get_financial_client_returns_mock(self):
"""Test that factory returns mock client."""
client = get_financial_client()
assert isinstance(client, MockFinancialClient)

171
tests/test_models.py Normal file
View File

@ -0,0 +1,171 @@
"""
Tests for Pydantic Models (TenantContext, AuditLog).
Validates schema correctness and edge cases.
"""
import pytest
from datetime import datetime
from src.models.tenant import TenantContext, TenantStatus
from src.models.audit import AuditLog, ResolutionStatus, TicketContext
class TestTenantContext:
"""Tests for TenantContext model."""
def test_create_valid_tenant(self):
"""Test creating a valid tenant context."""
tenant = TenantContext(
id="tenant_test",
name="Test Company",
email_domains=["test.com.br", "test.net"],
status=TenantStatus.ACTIVE
)
assert tenant.id == "tenant_test"
assert tenant.name == "Test Company"
assert len(tenant.email_domains) == 2
assert tenant.status == TenantStatus.ACTIVE
assert tenant.rate_limit_per_hour == 10 # default
def test_tenant_with_all_fields(self):
"""Test tenant with all optional fields populated."""
tenant = TenantContext(
id="tenant_full",
name="Full Company",
email_domains=["full.com.br"],
status=TenantStatus.ACTIVE,
zabbix_host_group="FULL-Infrastructure",
qdrant_collection="full_knowledge",
rate_limit_per_hour=50,
created_at=datetime(2025, 1, 1, 10, 0, 0)
)
assert tenant.zabbix_host_group == "FULL-Infrastructure"
assert tenant.qdrant_collection == "full_knowledge"
assert tenant.rate_limit_per_hour == 50
def test_tenant_status_enum(self):
"""Test different tenant status values."""
active = TenantContext(
id="t1", name="Active", email_domains=["a.com"],
status=TenantStatus.ACTIVE
)
inactive = TenantContext(
id="t2", name="Inactive", email_domains=["i.com"],
status=TenantStatus.INACTIVE
)
suspended = TenantContext(
id="t3", name="Suspended", email_domains=["s.com"],
status=TenantStatus.SUSPENDED
)
assert active.status == TenantStatus.ACTIVE
assert inactive.status == TenantStatus.INACTIVE
assert suspended.status == TenantStatus.SUSPENDED
def test_tenant_serialization(self):
"""Test JSON serialization of tenant."""
tenant = TenantContext(
id="tenant_json",
name="JSON Test",
email_domains=["json.com"]
)
json_data = tenant.model_dump_json()
assert "tenant_json" in json_data
assert "JSON Test" in json_data
class TestAuditLog:
"""Tests for AuditLog model."""
def test_create_minimal_audit_log(self):
"""Test creating audit log with required fields only."""
log = AuditLog(
ticket_id="TKT-001",
tenant_id="tenant_test",
sender_email="user@test.com",
subject="Test Subject",
original_message="Test message content"
)
assert log.ticket_id == "TKT-001"
assert log.tenant_id == "tenant_test"
assert log.resolution_status == ResolutionStatus.PENDING
assert log.tools_called == []
assert log.created_at is not None
def test_create_full_audit_log(self):
"""Test creating audit log with all fields."""
context = TicketContext(
zabbix_data={"host": "srv-app01", "status": "down"},
rag_context=["Manual de troubleshooting"],
historical_tickets=[{"id": "TKT-000", "status": "resolved"}],
extracted_entities={"technology": "Windows Server", "problem": "Offline"}
)
log = AuditLog(
ticket_id="TKT-002",
tenant_id="tenant_full",
sender_email="admin@full.com",
subject="Server Down",
original_message="O servidor está offline desde às 10h",
context_collected=context,
triage_model_output="Classificado como: Infraestrutura",
specialist_model_reasoning="Análise indica falha de rede...",
response_sent="Prezado, identificamos o problema...",
tools_called=["zabbix_get_host", "rag_search"],
resolution_status=ResolutionStatus.RESOLVED,
processing_time_ms=2500
)
assert log.context_collected.zabbix_data["status"] == "down"
assert "zabbix_get_host" in log.tools_called
assert log.processing_time_ms == 2500
def test_resolution_status_transitions(self):
"""Test different resolution statuses."""
statuses = [
ResolutionStatus.PENDING,
ResolutionStatus.IN_PROGRESS,
ResolutionStatus.RESOLVED,
ResolutionStatus.ESCALATED,
ResolutionStatus.REOPENED,
ResolutionStatus.FAILED
]
for status in statuses:
log = AuditLog(
ticket_id=f"TKT-{status.value}",
tenant_id="tenant_test",
sender_email="test@test.com",
subject="Status Test",
original_message="Test",
resolution_status=status
)
assert log.resolution_status == status
class TestTicketContext:
"""Tests for TicketContext model."""
def test_empty_context(self):
"""Test creating empty ticket context."""
context = TicketContext()
assert context.zabbix_data is None
assert context.rag_context is None
assert context.historical_tickets is None
def test_partial_context(self):
"""Test creating context with some fields."""
context = TicketContext(
zabbix_data={"host": "srv-01"},
extracted_entities={"technology": "Linux"}
)
assert context.zabbix_data["host"] == "srv-01"
assert context.rag_context is None
assert context.extracted_entities["technology"] == "Linux"
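The model definitions themselves (`src/models`) are not part of this hunk. A minimal sketch consistent with what the tests above assert — optional context fields, an empty `tools_called` default, a `PENDING` default status, and an auto-populated `created_at` — could look like the following. The real project uses Pydantic; this sketch uses stdlib dataclasses only to stay dependency-free, and field names are taken directly from the tests:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional


class ResolutionStatus(str, Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    RESOLVED = "resolved"
    ESCALATED = "escalated"
    REOPENED = "reopened"
    FAILED = "failed"


@dataclass
class TicketContext:
    # All context sources are optional; an empty context is valid.
    zabbix_data: Optional[dict[str, Any]] = None
    rag_context: Optional[list[str]] = None
    historical_tickets: Optional[list[dict[str, Any]]] = None
    extracted_entities: Optional[dict[str, str]] = None


@dataclass
class AuditLog:
    # Required identification fields come first.
    ticket_id: str
    tenant_id: str
    sender_email: str
    subject: str
    original_message: str
    # Optional pipeline outputs, filled in as processing advances.
    context_collected: Optional[TicketContext] = None
    triage_model_output: Optional[str] = None
    specialist_model_reasoning: Optional[str] = None
    response_sent: Optional[str] = None
    tools_called: list[str] = field(default_factory=list)
    resolution_status: ResolutionStatus = ResolutionStatus.PENDING
    processing_time_ms: Optional[int] = None
    created_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
```

With Pydantic, the same shape would be a `BaseModel` with `Field(default_factory=...)` for the mutable defaults, which also adds the validation the test suite exercises.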
tests/test_secrets.py (new file, 141 lines)
"""
Tests for SecretsManager.
Validates Docker Secrets and environment variable handling.
"""
import os
import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock
from src.security.secrets_manager import SecretsManager
class TestSecretsManager:
"""Tests for SecretsManager functionality."""
@pytest.fixture
def temp_secrets_dir(self, tmp_path):
"""Create a temporary secrets directory with test files."""
secrets_dir = tmp_path / "secrets"
secrets_dir.mkdir()
# Create some test secret files
(secrets_dir / "POSTGRES_PASSWORD").write_text("test_password_123")
(secrets_dir / "api_key").write_text("sk-test-key-abc")
return secrets_dir
def test_load_docker_secrets(self, temp_secrets_dir):
"""Test loading secrets from Docker Secrets directory."""
manager = SecretsManager(secrets_path=temp_secrets_dir)
password = manager.get("POSTGRES_PASSWORD")
assert password == "test_password_123"
def test_secret_case_insensitive(self, temp_secrets_dir):
"""Test that secret names are case-insensitive."""
manager = SecretsManager(secrets_path=temp_secrets_dir)
# File is 'api_key' but accessed as 'API_KEY'
key = manager.get("API_KEY")
assert key == "sk-test-key-abc"
def test_fallback_to_environment(self):
"""Test fallback to environment variables."""
manager = SecretsManager(secrets_path=Path("/nonexistent"))
with patch.dict(os.environ, {"TEST_SECRET": "env_value"}):
value = manager.get("TEST_SECRET")
assert value == "env_value"
def test_return_default_when_not_found(self):
"""Test returning default value when secret not found."""
manager = SecretsManager(secrets_path=Path("/nonexistent"))
value = manager.get("NONEXISTENT_SECRET", default="default_value")
assert value == "default_value"
def test_required_secret_raises_error(self):
"""Test that required secrets raise ValueError when missing."""
manager = SecretsManager(secrets_path=Path("/nonexistent"))
with pytest.raises(ValueError) as exc_info:
manager.get("MISSING_SECRET", required=True)
assert "MISSING_SECRET" in str(exc_info.value)
def test_validate_required_secrets(self, temp_secrets_dir):
"""Test validating multiple required secrets."""
manager = SecretsManager(secrets_path=temp_secrets_dir)
# POSTGRES_PASSWORD exists, MISSING_ONE doesn't
missing = manager.validate_required_secrets([
"POSTGRES_PASSWORD",
"MISSING_ONE",
"MISSING_TWO"
])
assert "POSTGRES_PASSWORD" not in missing
assert "MISSING_ONE" in missing
assert "MISSING_TWO" in missing
def test_get_required_secrets_list(self):
"""Test getting the list of required secrets."""
required = SecretsManager.get_required_secrets()
assert "POSTGRES_USER" in required
assert "POSTGRES_PASSWORD" in required
assert "POSTGRES_DB" in required
assert "MAIL_PASSWORD" in required
def test_docker_secrets_priority_over_env(self, temp_secrets_dir):
"""Test that Docker Secrets take priority over environment."""
manager = SecretsManager(secrets_path=temp_secrets_dir)
# Set both Docker Secret and env var
with patch.dict(os.environ, {"POSTGRES_PASSWORD": "env_password"}):
password = manager.get("POSTGRES_PASSWORD")
# Should use Docker Secret, not env var
assert password == "test_password_123"
def test_caching(self, temp_secrets_dir):
"""Test that secrets are cached after first load."""
manager = SecretsManager(secrets_path=temp_secrets_dir)
# First access loads secrets
manager.get("POSTGRES_PASSWORD")
assert manager._loaded is True
# Modify the file (would not be picked up due to caching)
(temp_secrets_dir / "POSTGRES_PASSWORD").write_text("new_password")
# Should still return cached value
password = manager.get("POSTGRES_PASSWORD")
assert password == "test_password_123"
def test_empty_secrets_directory(self, tmp_path):
"""Test handling of empty secrets directory."""
empty_dir = tmp_path / "empty_secrets"
empty_dir.mkdir()
manager = SecretsManager(secrets_path=empty_dir)
# Should not raise, just return None/default
value = manager.get("ANY_SECRET", default="fallback")
assert value == "fallback"
def test_strip_whitespace_from_secrets(self, temp_secrets_dir):
"""Test that whitespace is stripped from secret values."""
(temp_secrets_dir / "WHITESPACE_SECRET").write_text(" value_with_spaces \n")
manager = SecretsManager(secrets_path=temp_secrets_dir)
value = manager.get("WHITESPACE_SECRET")
assert value == "value_with_spaces"
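The `src.security.secrets_manager` implementation is not included in this hunk. A minimal sketch consistent with the behaviors the tests assert — Docker Secrets taking priority over environment variables, case-insensitive names, one-time loading with a `_loaded` flag, whitespace stripping, `default`/`required` handling, and the four required secret names — could look like this (the `REQUIRED` list is taken from `test_get_required_secrets_list`; everything else is inferred from the tests):

```python
import os
from pathlib import Path
from typing import Optional


class SecretsManager:
    # Names asserted by test_get_required_secrets_list.
    REQUIRED = ["POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DB", "MAIL_PASSWORD"]

    def __init__(self, secrets_path: Path = Path("/run/secrets")):
        self.secrets_path = Path(secrets_path)
        self._cache: dict[str, str] = {}
        self._loaded = False

    def _load(self) -> None:
        # Read every file in the secrets directory exactly once.
        # Keys are upper-cased so lookups are case-insensitive;
        # values are stripped and cached for the manager's lifetime.
        if self._loaded:
            return
        if self.secrets_path.is_dir():
            for entry in self.secrets_path.iterdir():
                if entry.is_file():
                    self._cache[entry.name.upper()] = entry.read_text().strip()
        self._loaded = True

    def get(
        self, name: str, default: Optional[str] = None, required: bool = False
    ) -> Optional[str]:
        self._load()
        # Docker Secrets take priority; fall back to the environment.
        value = self._cache.get(name.upper())
        if value is None:
            value = os.environ.get(name)
        if value is None:
            if required:
                raise ValueError(f"Required secret not found: {name}")
            return default
        return value

    def validate_required_secrets(self, names: list[str]) -> list[str]:
        # Return the subset of names that resolve to nothing anywhere.
        return [n for n in names if self.get(n) is None]

    @classmethod
    def get_required_secrets(cls) -> list[str]:
        return list(cls.REQUIRED)
```

Because `_load` runs only once, edits to a secret file after first access are deliberately invisible, which is exactly what `test_caching` verifies.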