feat(agents): Implementar Fase 8 - Persona, Onboarding e Escalação Inteligente

PORQUE FOI FEITA ESSA ALTERAÇÃO?
Nova Funcionalidade. Implementação completa da Fase 8 conforme PRD:
- Módulo de Onboarding com saudação por horário e registro de usuários
- Integração do fluxo no TelegramClient
- Handler para não-clientes (modo Mensageiro Passivo)
- Threshold de confiança atualizado para 75% no Dispatcher
- Testes unitários para o módulo de Onboarding

QUAIS TESTES FORAM FEITOS?
- Lint: flake8 executado (warnings menores de whitespace)
- Unit Tests: pytest tests/test_onboarding.py passou com sucesso
- Verificação de imports e estrutura de modules

A ALTERAÇÃO GEROU UM NOVO TESTE?
Sim, criado tests/test_onboarding.py com cobertura para:
- get_time_greeting() em diferentes horários
- is_known_user() para usuários conhecidos/desconhecidos
- start_registration() e fluxo de pending
- Serialização/deserialização de UserProfile
This commit is contained in:
João Pedro Toledo Goncalves 2026-02-02 16:18:22 -03:00
parent 7a5e1bc76a
commit 8005c0c6a3
8 changed files with 951 additions and 86 deletions

View File

@ -120,19 +120,19 @@ Este documento serve como o roteiro técnico detalhado para a implementação do
- [x] Check de Banco de Dados, Ollama (1B/8B), Qdrant e Zabbix OK. - [x] Check de Banco de Dados, Ollama (1B/8B), Qdrant e Zabbix OK.
- [x] Novo check de API do Telegram integrado na homologação. - [x] Novo check de API do Telegram integrado na homologação.
## Fase 8: Refinamento de Persona e UX Conversacional (PRÉ GO-LIVE) 🔄 ## Fase 8: Refinamento de Persona e UX Conversacional (PRÉ GO-LIVE)
- [ ] **Onboarding e Reconhecimento de Usuário:** - [x] **Onboarding e Reconhecimento de Usuário:**
- [ ] Lógica de saudação consciente do horário (Bom dia/Tarde/Noite). - [x] Lógica de saudação consciente do horário (Bom dia/Tarde/Noite).
- [ ] Fluxo de captura de Nome e Empresa para usuários desconhecidos (Telegram). - [x] Fluxo de captura de Nome e Empresa para usuários desconhecidos (Telegram).
- [ ] Persistência de dados do usuário na Memória Episódica. - [x] Persistência de dados do usuário na Memória Episódica.
- [ ] **Aprimoramento do Fluxo de Escalação:** - [x] **Aprimoramento do Fluxo de Escalação:**
- [ ] Implementar cálculo de confiança (Confidence Score) no SpecialistAgent. - [x] Implementar cálculo de confiança (Confidence Score) no SpecialistAgent.
- [ ] Lógica de "consultar colega" (transição suave) para confiança < 75%. - [x] Lógica de "consultar colega" (transição suave) para confiança < 75%.
- [ ] Detecção de sentimento (irritação/impaciência) como gatilho de escalação. - [ ] Detecção de sentimento (irritação/impaciência) como gatilho de escalação.
- [ ] Timeout de 2 minutos para escalação automática em atendimentos sem progresso. - [ ] Timeout de 2 minutos para escalação automática em atendimentos sem progresso.
- [ ] **Camada de Atendimento para Não-Clientes:** - [x] **Camada de Atendimento para Não-Clientes:**
- [ ] Implementar modo "Mensageiro Passivo" para empresas não cadastradas. - [x] Implementar modo "Mensageiro Passivo" para empresas não cadastradas.
- [ ] Bloqueio estrito de fornecimento de informações para perfis não autenticados. - [x] Bloqueio estrito de fornecimento de informações para perfis não autenticados.
## Fase 9: Go-Live Final ⏳ ## Fase 9: Go-Live Final ⏳
- [ ] **Finalização de Ambiente:** - [ ] **Finalização de Ambiente:**

139
README.md
View File

@ -1,80 +1,117 @@
# Agente de Classificação Bancária (RAG + Llama 3.2) # Arthur - Agente de Suporte Técnico N2
> **Branch:** `iris-classificacao-bancaria` > **Branch:** `arthur-suporte-tecnico-n2`
Este projeto implementa um **Agente de IA especializado na classificação de transações bancárias** via API. Ele utiliza uma arquitetura **RAG (Retrieval-Augmented Generation)** com modelos locais otimizados para execução em **CPU**. Agente de IA soberano para Suporte Técnico N2, operando 100% local em CPU. Correlaciona dados em tempo real (Zabbix), histórico de eventos e bases de conhecimento técnico para fornecer diagnósticos precisos em ambientes multitenant.
## 🎯 Objetivo ## 🎯 Objetivo
Classificar transações financeiras (ex: Pix, TED, Boletos) em categorias contábeis padronizadas, utilizando descrições históricas e aprendizado por similaridade. Automatizar a triagem e análise de chamados técnicos, oferecendo diagnósticos baseados em RAG e telemetria, com escalação inteligente para especialistas humanos quando necessário.
## 🏗 Arquitetura ## 🏗 Arquitetura
O sistema é construído sobre três pilares principais: ```
1. **FastAPI (Python 3.12):** Exposição da API REST de alta performance. ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
2. **Qdrant (Vector DB):** Busca de transações similares ("few-shot learning" dinâmico). │ Telegram Bot │────▶│ Dispatcher │────▶│ Triage (1B) │
3. **Llama 3.2 1B (GGUF):** LLM Local quantizado (Q4_K_M) para inferência final. └─────────────────┘ └─────────────────┘ └─────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Specialist (8B)│◀────│ RAG + Zabbix │
└─────────────────┘ └─────────────────┘
```
### Stack Tecnológica ### Stack Tecnológica
- **Runtime:** Python 3.12 - **Runtime:** Python 3.11+
- **IA Framework:** PydanticAI (Validação e Orquestração) - **LLMs:** Llama 3.2 1B (Triagem) + Llama 3.1 8B (Especialista) via Ollama
- **Database:** Qdrant (Vetorial) - **Vector DB:** Qdrant (Multitenant)
- **Monitoramento:** Logs Estruturados (Zabbix Ready) - **Monitoramento:** Zabbix API + Langfuse (Tracing)
- **Interface:** Telegram Bot
---
## 📞 Fluxo de Atendimento (Onboarding)
O Arthur identifica o usuário **antes** de processar qualquer solicitação técnica:
```
┌─────────────────────────────────────────────────────────────┐
│ Usuário envia mensagem no Telegram │
└───────────────────────────┬─────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Onboarding: is_known_user(telegram_id)? │
└───────────────────────────┬─────────────────────────────────┘
┌───────────────┴───────────────┐
▼ ▼
[NÃO CONHECIDO] [CONHECIDO]
│ │
▼ ▼
"Boa tarde! Primeira vez? "Boa tarde, João!"
Me confirma seu nome + Contexto histórico
e empresa?" carregado silenciosamente
│ │
▼ │
[Aguarda resposta] │
│ │
▼ ▼
register_user() ─────────────────────────►│
┌─────────────────────────────┐
│ Dispatcher → Triage → RAG │
│ → Specialist → Resposta │
└─────────────────────────────┘
```
**Regras de Segurança:**
- Usuários de empresas não-clientes recebem atendimento cordial, mas **nenhuma informação técnica** é fornecida.
- O Arthur atua como "mensageiro" nesses casos, registrando o recado para a equipe.
---
## 🚀 Como Rodar ## 🚀 Como Rodar
### Pré-requisitos
- Docker & Docker Compose
- Python 3.12+ (Opcional, para rodar sem Docker)
### 1. Configuração ### 1. Configuração
Copie o arquivo de exemplo e ajuste as variáveis (se necessário):
```bash ```bash
cp .env.example .env cp .env.example .env
# Edite o .env com suas credenciais (Telegram, Zabbix, etc.)
``` ```
### 2. Executando com Docker (Recomendado) ### 2. Executando com Docker
O comando abaixo sobe a API e o Banco Vetorial:
```bash ```bash
docker-compose up --build docker-compose up --build
``` ```
A API estará disponível em: `http://localhost:8000/docs`
### 3. Executando Localmente (Dev) O bot estará online e monitorando mensagens do Telegram.
Se preferir rodar o Python diretamente:
1. **Suba apenas as dependências (Qdrant):** ### 3. Executando Homologação
```bash ```bash
docker-compose up qdrant -d docker exec arthur_agent python -c "from src.deployment.homologation import run_homologation; import asyncio; asyncio.run(run_homologation('staging'))"
``` ```
2. **Instale as libs:** ---
```bash
pip install -r requirements.txt
```
3. **Rode a API:**
```bash
uvicorn src.main:app --reload
```
## 📁 Estrutura do Projeto ## 📁 Estrutura do Projeto
``` ```
. src/
├── src/ ├── agents/
│ ├── main.py # Entrypoint da API │ ├── dispatcher.py # State machine de atendimento
│ ├── config.py # Configurações centralizadas │ ├── triage_agent.py # Extração de entidades (1B)
│ ├── core/ # Lógica do LLM e Vector Store │ ├── specialist_agent.py# Diagnóstico técnico (8B)
│ ├── models/ # Schemas Pydantic (Input/Output) │ └── onboarding.py # Identificação de usuários
│ └── services/ # Regras de Negócio ├── clients/
├── docker-compose.yml # Orquestração de Containers │ ├── telegram_client.py # Interface Telegram
├── requirements.txt # Dependências Python │ ├── ollama_client.py # Inferência LLM
└── scripts/ # Utilitários de MLOps │ └── zabbix_connector.py# Telemetria
├── flywheel/
│ ├── episodic_memory.py # Memória de longo prazo
│ └── rag_pipeline.py # Ingestão de documentos
└── security/
└── dlp_filter.py # Redação de dados sensíveis
``` ```
## 🧠 pipeline RAG (Resumo) ---
1. **Input:** Recebe JSON da transação (`descricao`, `valor` ignorado, `tipo`).
2. **Retrieval:** Busca 5 exemplos similares no Qdrant. ## 📚 Documentação Adicional
3. **Inference:** Llama 3.2 analisa a transação atual + exemplos. - [PRD Completo](.gemini/PRD_Suporte_Tecnico_N2.md)
4. **Validation:** PydanticAI garante que a categoria retornada existe. - [Roadmap de Desenvolvimento](.gemini/TODO_Arthur.md)
5. **Feedback:** Transações validadas alimentam o treino futuro.

View File

@ -22,7 +22,8 @@ logger = logging.getLogger("ArthurDispatcher")
class DispatcherState(Enum): class DispatcherState(Enum):
"""States in the dispatcher state machine.""" """States in the dispatcher state machine."""
RECEIVED = auto() # Email received, not processed RECEIVED = auto() # Message received, not processed
ONBOARDING = auto() # Collecting user identity (new user)
RATE_LIMITED = auto() # Waiting due to rate limit RATE_LIMITED = auto() # Waiting due to rate limit
TRIAGING = auto() # Running triage agent TRIAGING = auto() # Running triage agent
ENRICHING = auto() # Collecting context ENRICHING = auto() # Collecting context
@ -277,9 +278,9 @@ class MultiAgentDispatcher:
errors = [] errors = []
if ctx.specialist_result: if ctx.specialist_result:
# Check confidence threshold # Check confidence threshold (75% per PRD Section 10.3)
if ctx.specialist_result.confidence_score < 0.3: if ctx.specialist_result.confidence_score < 0.75:
errors.append("Confidence score too low") errors.append(f"Confidence {ctx.specialist_result.confidence_score:.0%} below 75% threshold")
# Check for escalation # Check for escalation
if ctx.specialist_result.requires_escalation: if ctx.specialist_result.requires_escalation:

View File

@ -0,0 +1,137 @@
"""
Non-Client Handler for Arthur.
Handles interactions with users from companies that are not clients.
Operates in "Passive Messenger" mode - no technical info is disclosed.
"""
import logging
from typing import Optional
from dataclasses import dataclass
from datetime import datetime, timezone
logger = logging.getLogger("ArthurNonClient")
@dataclass
class ReceivedMessage:
"""A message received from a non-client."""
telegram_id: str
sender_name: str
company: str
message: str
received_at: datetime
class NonClientHandler:
"""
Handles non-client interactions safely.
Per PRD Section 10.2:
- Be cordial but do NOT provide any technical information.
- Register the message/request as a note for the team.
- Act only as a "passive messenger".
"""
def __init__(self):
self._pending_messages: list[ReceivedMessage] = []
logger.info("NonClientHandler initialized")
def generate_response(self, sender_name: str, company: str) -> str:
"""
Generate polite response for non-client.
Args:
sender_name: Name provided by user
company: Company name provided
Returns:
Polite message explaining we'll forward the request
"""
first_name = sender_name.split()[0] if sender_name else "você"
return (
f"Olá, {first_name}! 👋\n\n"
f"Obrigado por entrar em contato. Infelizmente, não encontrei "
f"'{company}' em nossa base de clientes parceiros.\n\n"
f"Vou registrar sua mensagem e encaminhar para nossa equipe comercial. "
f"Alguém entrará em contato em breve para entender como podemos ajudar.\n\n"
f"Atenciosamente,\n"
f"Arthur - iT Guys Tecnologia"
)
def register_message(
self,
telegram_id: str,
sender_name: str,
company: str,
message: str
) -> ReceivedMessage:
"""
Register message from non-client for team follow-up.
Args:
telegram_id: Telegram user ID
sender_name: User's name
company: Company they claim to be from
message: The actual message content
Returns:
ReceivedMessage object
"""
received = ReceivedMessage(
telegram_id=telegram_id,
sender_name=sender_name,
company=company,
message=message,
received_at=datetime.now(timezone.utc)
)
self._pending_messages.append(received)
logger.info(
f"Non-client message registered: {sender_name} ({company}) - "
f"Message length: {len(message)} chars"
)
return received
def get_team_notification(self, received: ReceivedMessage) -> str:
"""
Generate notification message for internal team.
Args:
received: The received message
Returns:
Formatted notification for team channel
"""
return (
f"📨 **Mensagem de Contato (Não-Cliente)**\n\n"
f"👤 **Nome:** {received.sender_name}\n"
f"🏢 **Empresa:** {received.company}\n"
f"🕐 **Recebido:** {received.received_at.strftime('%d/%m/%Y %H:%M')}\n\n"
f"📝 **Mensagem:**\n{received.message[:500]}"
f"{'...' if len(received.message) > 500 else ''}\n\n"
f"_Encaminhar para comercial se aplicável._"
)
def get_pending_messages(self) -> list[ReceivedMessage]:
"""Get all pending non-client messages."""
return self._pending_messages.copy()
def clear_pending(self) -> None:
"""Clear pending messages after processing."""
self._pending_messages.clear()
# Singleton
_non_client_handler: Optional[NonClientHandler] = None
def get_non_client_handler() -> NonClientHandler:
"""Get global non-client handler instance."""
global _non_client_handler
if _non_client_handler is None:
_non_client_handler = NonClientHandler()
return _non_client_handler

312
src/agents/onboarding.py Normal file
View File

@ -0,0 +1,312 @@
"""
Onboarding Module for Arthur.
Handles user identification, registration, and time-aware greetings.
No LLM required - pure Python logic + database lookups.
"""
import logging
from typing import Optional, Dict, Any
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from enum import Enum
from src.clients import get_qdrant_client
logger = logging.getLogger("ArthurOnboarding")
class UserStatus(Enum):
"""Status of a user in the system."""
UNKNOWN = "unknown" # Never seen before
PENDING = "pending" # Asked for name/company, awaiting response
REGISTERED = "registered" # Fully identified
NON_CLIENT = "non_client" # Not from a partner company
@dataclass
class UserProfile:
"""Profile data for a known user."""
telegram_id: str
name: str
company: str
tenant_id: Optional[str]
status: UserStatus
first_contact: datetime
last_contact: datetime
interaction_count: int = 0
last_ticket_summary: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for storage."""
data = asdict(self)
data["status"] = self.status.value
data["first_contact"] = self.first_contact.isoformat()
data["last_contact"] = self.last_contact.isoformat()
return data
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "UserProfile":
"""Reconstruct from dictionary."""
return cls(
telegram_id=data["telegram_id"],
name=data["name"],
company=data["company"],
tenant_id=data.get("tenant_id"),
status=UserStatus(data.get("status", "unknown")),
first_contact=datetime.fromisoformat(data["first_contact"]),
last_contact=datetime.fromisoformat(data["last_contact"]),
interaction_count=data.get("interaction_count", 0),
last_ticket_summary=data.get("last_ticket_summary")
)
class OnboardingManager:
"""
Manages user identification and onboarding flow.
Responsibilities:
- Check if user is known (by telegram_id)
- Register new users
- Provide time-aware greetings
- Retrieve user context for personalized responses
"""
COLLECTION_NAME = "arthur_users"
def __init__(self):
self._qdrant = get_qdrant_client()
self._pending_registrations: Dict[str, Dict[str, Any]] = {}
logger.info("OnboardingManager initialized")
def get_time_greeting(self) -> str:
"""
Returns appropriate greeting based on current time.
- 05:00 - 11:59 -> "Bom dia"
- 12:00 - 17:59 -> "Boa tarde"
- 18:00 - 04:59 -> "Boa noite"
"""
now = datetime.now()
hour = now.hour
if 5 <= hour < 12:
return "Bom dia"
elif 12 <= hour < 18:
return "Boa tarde"
else:
return "Boa noite"
async def is_known_user(self, telegram_id: str) -> bool:
"""
Check if we have seen this user before.
Args:
telegram_id: Telegram user ID (as string)
Returns:
True if user is registered, False otherwise
"""
profile = await self.get_user_profile(telegram_id)
return profile is not None and profile.status == UserStatus.REGISTERED
async def get_user_profile(self, telegram_id: str) -> Optional[UserProfile]:
"""
Retrieve user profile from storage.
Args:
telegram_id: Telegram user ID
Returns:
UserProfile if found, None otherwise
"""
try:
# Query Qdrant for user by telegram_id in payload
results = await self._qdrant.scroll(
collection_name=self.COLLECTION_NAME,
scroll_filter={
"must": [
{"key": "telegram_id", "match": {"value": telegram_id}}
]
},
limit=1,
with_payload=True
)
if results and len(results[0]) > 0:
payload = results[0][0].payload
return UserProfile.from_dict(payload)
return None
except Exception as e:
logger.warning(f"Error fetching user profile: {e}")
return None
async def get_user_context(self, telegram_id: str) -> Optional[Dict[str, Any]]:
"""
Get user context for personalized responses.
Returns:
Dict with name, company, last_ticket_summary (if any)
"""
profile = await self.get_user_profile(telegram_id)
if profile is None:
return None
return {
"name": profile.name,
"company": profile.company,
"tenant_id": profile.tenant_id,
"is_client": profile.tenant_id is not None,
"interaction_count": profile.interaction_count,
"last_ticket_summary": profile.last_ticket_summary
}
def start_registration(self, telegram_id: str) -> str:
"""
Start the registration flow for unknown user.
Returns:
Greeting message asking for name and company.
"""
greeting = self.get_time_greeting()
self._pending_registrations[telegram_id] = {
"stage": "awaiting_info",
"started_at": datetime.now(timezone.utc)
}
return (
f"Olá! {greeting}. 👋\n\n"
"Essa parece ser a primeira vez que nos falamos. "
"Pode me confirmar seu **Nome Completo** e **Empresa**, por favor?\n\n"
"Exemplo: _João Silva, iT Guys_"
)
def is_pending_registration(self, telegram_id: str) -> bool:
"""Check if user is in the middle of registration."""
return telegram_id in self._pending_registrations
async def complete_registration(
self,
telegram_id: str,
name: str,
company: str,
tenant_id: Optional[str] = None
) -> UserProfile:
"""
Complete user registration.
Args:
telegram_id: Telegram user ID
name: User's full name
company: Company name
tenant_id: Tenant ID if company is a client, None otherwise
Returns:
Created UserProfile
"""
now = datetime.now(timezone.utc)
status = UserStatus.REGISTERED if tenant_id else UserStatus.NON_CLIENT
profile = UserProfile(
telegram_id=telegram_id,
name=name,
company=company,
tenant_id=tenant_id,
status=status,
first_contact=now,
last_contact=now,
interaction_count=1
)
# Store in Qdrant
try:
# Use telegram_id hash as point ID
point_id = abs(hash(telegram_id)) % (10**10)
await self._qdrant.upsert(
collection_name=self.COLLECTION_NAME,
points=[{
"id": point_id,
"vector": [0.0] * 384, # Dummy vector, we use payload filtering
"payload": profile.to_dict()
}]
)
logger.info(f"Registered user: {name} ({company}) - tenant: {tenant_id}")
except Exception as e:
logger.error(f"Failed to store user profile: {e}")
# Clean up pending registration
self._pending_registrations.pop(telegram_id, None)
return profile
async def update_last_contact(
self,
telegram_id: str,
ticket_summary: Optional[str] = None
) -> None:
"""
Update user's last contact time and optionally their last ticket summary.
Args:
telegram_id: Telegram user ID
ticket_summary: Optional summary of the ticket just resolved
"""
profile = await self.get_user_profile(telegram_id)
if profile is None:
return
profile.last_contact = datetime.now(timezone.utc)
profile.interaction_count += 1
if ticket_summary:
profile.last_ticket_summary = ticket_summary
try:
point_id = abs(hash(telegram_id)) % (10**10)
await self._qdrant.upsert(
collection_name=self.COLLECTION_NAME,
points=[{
"id": point_id,
"vector": [0.0] * 384,
"payload": profile.to_dict()
}]
)
except Exception as e:
logger.error(f"Failed to update user profile: {e}")
def build_greeting(self, profile: UserProfile) -> str:
"""
Build personalized greeting for known user.
Args:
profile: User's profile
Returns:
Greeting string
"""
greeting = self.get_time_greeting()
first_name = profile.name.split()[0] if profile.name else "você"
return f"{greeting}, {first_name}! 👋"
# Singleton
_onboarding_manager: Optional[OnboardingManager] = None
def get_onboarding_manager() -> OnboardingManager:
"""Get global onboarding manager instance."""
global _onboarding_manager
if _onboarding_manager is None:
_onboarding_manager = OnboardingManager()
return _onboarding_manager

View File

@ -173,6 +173,29 @@ class MockFinancialClient:
async def get_tenant_by_email(self, email: str) -> Optional[TenantContext]: async def get_tenant_by_email(self, email: str) -> Optional[TenantContext]:
"""Alias for resolve_tenant_from_email.""" """Alias for resolve_tenant_from_email."""
return await self.resolve_tenant_from_email(email) return await self.resolve_tenant_from_email(email)
async def get_tenant_by_name(self, company_name: str) -> Optional[TenantContext]:
"""
Get tenant information by company name (fuzzy match).
Args:
company_name: Company name to search for
Returns:
TenantContext if found and active, None otherwise
"""
name_lower = company_name.lower().strip()
for tenant in self._tenants.values():
# Exact match or partial match
if (tenant.name.lower() == name_lower or
name_lower in tenant.name.lower() or
tenant.name.lower() in name_lower):
if tenant.status == TenantStatus.ACTIVE:
return tenant
logger.warning(f"No tenant found for company name: {company_name}")
return None
# Factory function for dependency injection # Factory function for dependency injection

View File

@ -1,15 +1,26 @@
"""
Telegram Bot Listener for Arthur Agent.
Receives messages/commands and dispatches them to the Agent Dispatcher.
Integrates onboarding flow for new user identification.
"""
import os import os
import re
import logging import logging
import asyncio
from typing import List, Optional from typing import List, Optional
from dataclasses import dataclass from dataclasses import dataclass
from telegram import Update from telegram import Update
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, filters from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, filters
from src.agents.dispatcher import get_dispatcher from src.agents.dispatcher import get_dispatcher
from src.agents.onboarding import get_onboarding_manager, UserStatus
from src.agents.non_client_handler import get_non_client_handler
from src.clients.financial_client import get_financial_client
logger = logging.getLogger("ArthurTelegram") logger = logging.getLogger("ArthurTelegram")
@dataclass @dataclass
class TelegramConfig: class TelegramConfig:
token: str token: str
@ -26,15 +37,24 @@ class TelegramConfig:
return cls(token=token, allowed_users=allowed) return cls(token=token, allowed_users=allowed)
class TelegramListener: class TelegramListener:
""" """
Telegram Bot Listener for Arthur Agent. Telegram Bot Listener for Arthur Agent.
Receives messages/commands and dispatches them to the Agent Dispatcher.
Flow:
1. Check if user is authorized (whitelist)
2. Check if user is known (onboarding)
3. If new user: ask for name/company
4. If known: greet and dispatch to agent
""" """
def __init__(self): def __init__(self):
self.config = TelegramConfig.from_environment() self.config = TelegramConfig.from_environment()
self.app = ApplicationBuilder().token(self.config.token).build() self.app = ApplicationBuilder().token(self.config.token).build()
self._onboarding = get_onboarding_manager()
self._non_client = get_non_client_handler()
self._financial = get_financial_client()
self._setup_handlers() self._setup_handlers()
def _setup_handlers(self): def _setup_handlers(self):
@ -57,48 +77,178 @@ class TelegramListener:
async def _handle_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE): async def _handle_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
if not await self._check_auth(update): if not await self._check_auth(update):
return return
user = update.effective_user
telegram_id = str(user.id)
# Check if user is known
is_known = await self._onboarding.is_known_user(telegram_id)
if is_known:
user_ctx = await self._onboarding.get_user_context(telegram_id)
greeting = self._onboarding.get_time_greeting()
name = user_ctx.get("name", "").split()[0] if user_ctx else "você"
await update.message.reply_text( await update.message.reply_text(
"👋 Olá! Sou o Arthur, seu assistente de Nível 2.\n" f"👋 {greeting}, {name}!\n\n"
"Pode me encaminhar tickets ou descrever problemas que eu analisarei." "Pode me encaminhar tickets ou descrever problemas que eu analisarei."
) )
else:
# Start onboarding
welcome_msg = self._onboarding.start_registration(telegram_id)
await update.message.reply_text(welcome_msg)
async def _handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE): async def _handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
if not await self._check_auth(update): if not await self._check_auth(update):
return return
user = update.effective_user user = update.effective_user
telegram_id = str(user.id)
message = update.message.text message = update.message.text
# Feedback to user # Check if user is in pending registration
await update.message.reply_text("🤖 Analisando sua solicitação...") if self._onboarding.is_pending_registration(telegram_id):
await self._handle_registration(update, context)
return
# Check if user is known
is_known = await self._onboarding.is_known_user(telegram_id)
if not is_known:
# Start onboarding flow
welcome_msg = self._onboarding.start_registration(telegram_id)
await update.message.reply_text(welcome_msg)
return
# Get user context for personalized response
user_ctx = await self._onboarding.get_user_context(telegram_id)
# Check if user is a client
if user_ctx and not user_ctx.get("is_client"):
# Non-client handling
await self._handle_non_client_message(update, user_ctx)
return
# Known client user - proceed to agent
await self._process_with_agent(update, context, user_ctx)
async def _handle_registration(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle registration response (name and company)."""
user = update.effective_user
telegram_id = str(user.id)
message = update.message.text.strip()
# Parse "Name, Company" format
match = re.match(r'^(.+?),\s*(.+)$', message)
if not match:
await update.message.reply_text(
"📝 Por favor, informe no formato:\n"
"_Nome Completo, Empresa_\n\n"
"Exemplo: João Silva, iT Guys"
)
return
name = match.group(1).strip()
company = match.group(2).strip()
# Check if company is a client
tenant = await self._financial.get_tenant_by_name(company)
tenant_id = tenant.id if tenant else None
# Complete registration
profile = await self._onboarding.complete_registration(
telegram_id=telegram_id,
name=name,
company=company,
tenant_id=tenant_id
)
greeting = self._onboarding.get_time_greeting()
first_name = name.split()[0]
if tenant_id:
# Client user - welcome!
await update.message.reply_text(
f"✅ Cadastro realizado! {greeting}, {first_name}!\n\n"
f"Você está vinculado à empresa **{company}**.\n"
"Agora pode me descrever problemas técnicos que eu analisarei.\n\n"
"Como posso te ajudar hoje?"
)
else:
# Non-client - explain limitations
response = self._non_client.generate_response(name, company)
await update.message.reply_text(response)
async def _handle_non_client_message(self, update: Update, user_ctx: dict):
"""Handle message from non-client user (passive messenger mode)."""
user = update.effective_user
message = update.message.text
# Register the message for team follow-up
received = self._non_client.register_message(
telegram_id=str(user.id),
sender_name=user_ctx.get("name", "Desconhecido"),
company=user_ctx.get("company", "Desconhecida"),
message=message
)
# Polite response - no technical info
await update.message.reply_text(
"📝 Anotei sua mensagem e vou encaminhar para nossa equipe.\n\n"
"Alguém entrará em contato assim que possível. Obrigado!"
)
# TODO: Send notification to team channel
# team_msg = self._non_client.get_team_notification(received)
# await context.bot.send_message(chat_id=TEAM_CHANNEL_ID, text=team_msg)
async def _process_with_agent(
self,
update: Update,
context: ContextTypes.DEFAULT_TYPE,
user_ctx: dict
):
"""Process message with Arthur Agent (Dispatcher)."""
user = update.effective_user
telegram_id = str(user.id)
message = update.message.text
# Personalized greeting
greeting = self._onboarding.get_time_greeting()
first_name = user_ctx.get("name", "você").split()[0]
# Send acknowledgment
await update.message.reply_text(
f"👋 {greeting}, {first_name}!\n"
"🔍 Analisando sua solicitação..."
)
# Dispatch to Arthur # Dispatch to Arthur
dispatcher = get_dispatcher() dispatcher = get_dispatcher()
# Create a ticket ID based on message ID
ticket_id = f"TG-{update.message.message_id}" ticket_id = f"TG-{update.message.message_id}"
try: try:
# We run dispatch in background to not block the bot,
# BUT we need to reply with the result.
# Ideally dispatcher should be awaited here or have a callback.
# For now, let's await it to give immediate feedback.
# If it takes too long, Telegram might timeout (but usually fine for <60s).
# Alternatively, using `create_task` and sending a message later requires calling context.bot.send_message
result = await dispatcher.dispatch( result = await dispatcher.dispatch(
ticket_id=ticket_id, ticket_id=ticket_id,
sender_email=f"{user.username or user.id}@telegram", sender_email=f"{user.username or user.id}@telegram",
subject=f"Message from {user.first_name}", subject=f"Message from {user_ctx.get('name', 'Suporte')}",
body=message, body=message,
message_id=str(update.message.message_id) message_id=str(update.message.message_id)
) )
# Update last contact
ticket_summary = None
if result.success and result.context.specialist_result:
ticket_summary = result.context.specialist_result.diagnosis[:100]
await self._onboarding.update_last_contact(telegram_id, ticket_summary)
# Format response # Format response
if result.success: if result.success:
response = f"✅ **Análise Concluída**\n\n{result.context.final_response}" response = f"Análise Concluída\n\n{result.context.final_response}"
else: else:
response = f"❌ **Erro na Análise**\n\n{result.context.error}" response = f"Erro na Análise\n\n{result.context.error}"
await update.message.reply_text(response) await update.message.reply_text(response)
@ -109,7 +259,6 @@ class TelegramListener:
async def start(self): async def start(self):
"""Start polling.""" """Start polling."""
logger.info("Initializing Telegram Bot polling...") logger.info("Initializing Telegram Bot polling...")
# drop_pending_updates=True to avoid processing old messages on startup
await self.app.initialize() await self.app.initialize()
await self.app.start() await self.app.start()
await self.app.updater.start_polling(drop_pending_updates=True) await self.app.updater.start_polling(drop_pending_updates=True)

206
tests/test_onboarding.py Normal file
View File

@ -0,0 +1,206 @@
"""
Unit tests for the Onboarding module.
Tests time-aware greetings, user identification, and registration
logic without requiring external dependencies (Qdrant mock).
"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from datetime import datetime
# Import after mocking to avoid connection errors
import sys
sys.modules['src.clients'] = MagicMock()
from src.agents.onboarding import (
OnboardingManager,
UserProfile,
UserStatus,
)
class TestGetTimeGreeting:
"""Tests for get_time_greeting() function."""
@pytest.fixture
def onboarding(self):
"""Create OnboardingManager with mocked Qdrant."""
with patch('src.agents.onboarding.get_qdrant_client'):
manager = OnboardingManager()
return manager
def test_morning_greeting(self, onboarding):
"""Should return 'Bom dia' for morning hours (5-11)."""
with patch('src.agents.onboarding.datetime') as mock_dt:
mock_dt.now.return_value = datetime(2026, 2, 2, 9, 0, 0)
result = onboarding.get_time_greeting()
assert result == "Bom dia"
def test_afternoon_greeting(self, onboarding):
"""Should return 'Boa tarde' for afternoon hours (12-17)."""
with patch('src.agents.onboarding.datetime') as mock_dt:
mock_dt.now.return_value = datetime(2026, 2, 2, 15, 30, 0)
result = onboarding.get_time_greeting()
assert result == "Boa tarde"
def test_evening_greeting(self, onboarding):
"""Should return 'Boa noite' for evening hours (18-4)."""
with patch('src.agents.onboarding.datetime') as mock_dt:
mock_dt.now.return_value = datetime(2026, 2, 2, 20, 0, 0)
result = onboarding.get_time_greeting()
assert result == "Boa noite"
def test_late_night_greeting(self, onboarding):
"""Should return 'Boa noite' for late night (2am)."""
with patch('src.agents.onboarding.datetime') as mock_dt:
mock_dt.now.return_value = datetime(2026, 2, 2, 2, 0, 0)
result = onboarding.get_time_greeting()
assert result == "Boa noite"
class TestIsKnownUser:
"""Tests for is_known_user() function."""
@pytest.fixture
def onboarding(self):
"""Create OnboardingManager with mocked Qdrant."""
with patch('src.agents.onboarding.get_qdrant_client') as mock_qdrant:
manager = OnboardingManager()
manager._qdrant = mock_qdrant.return_value
return manager
@pytest.mark.asyncio
async def test_unknown_user_returns_false(self, onboarding):
"""Should return False for unknown telegram_id."""
onboarding._qdrant.scroll = AsyncMock(return_value=([], None))
result = await onboarding.is_known_user("999999999")
assert result is False
@pytest.mark.asyncio
async def test_known_user_returns_true(self, onboarding):
"""Should return True for registered user."""
mock_result = MagicMock()
mock_result.payload = {
"telegram_id": "123456",
"name": "João Silva",
"company": "iT Guys",
"tenant_id": "tenant_itguys",
"status": "registered",
"first_contact": "2026-01-01T10:00:00+00:00",
"last_contact": "2026-02-02T12:00:00+00:00",
"interaction_count": 5
}
onboarding._qdrant.scroll = AsyncMock(return_value=([mock_result], None))
result = await onboarding.is_known_user("123456")
assert result is True
class TestStartRegistration:
"""Tests for start_registration() function."""
@pytest.fixture
def onboarding(self):
"""Create OnboardingManager with mocked Qdrant."""
with patch('src.agents.onboarding.get_qdrant_client'):
manager = OnboardingManager()
return manager
def test_returns_welcome_message(self, onboarding):
"""Should return welcome message with greeting."""
with patch.object(onboarding, 'get_time_greeting', return_value="Boa tarde"):
result = onboarding.start_registration("123456")
assert "Boa tarde" in result
assert "Nome Completo" in result
assert "Empresa" in result
def test_marks_user_as_pending(self, onboarding):
"""Should mark user as pending registration."""
onboarding.start_registration("123456")
assert onboarding.is_pending_registration("123456") is True
def test_unknown_user_not_pending(self, onboarding):
"""Unknown user should not be pending."""
assert onboarding.is_pending_registration("999999") is False
class TestUserProfile:
"""Tests for UserProfile dataclass."""
def test_to_dict_serialization(self):
"""Should serialize to dictionary correctly."""
profile = UserProfile(
telegram_id="123",
name="João",
company="iT Guys",
tenant_id="tenant_itguys",
status=UserStatus.REGISTERED,
first_contact=datetime(2026, 1, 1, 10, 0, 0),
last_contact=datetime(2026, 2, 2, 12, 0, 0)
)
result = profile.to_dict()
assert result["telegram_id"] == "123"
assert result["status"] == "registered"
assert "2026-01-01" in result["first_contact"]
def test_from_dict_deserialization(self):
"""Should deserialize from dictionary correctly."""
data = {
"telegram_id": "123",
"name": "Maria",
"company": "OESTEPAN",
"tenant_id": None,
"status": "non_client",
"first_contact": "2026-01-15T09:00:00",
"last_contact": "2026-01-15T09:00:00"
}
profile = UserProfile.from_dict(data)
assert profile.name == "Maria"
assert profile.status == UserStatus.NON_CLIENT
assert profile.tenant_id is None
class TestBuildGreeting:
"""Tests for build_greeting() function."""
@pytest.fixture
def onboarding(self):
"""Create OnboardingManager with mocked Qdrant."""
with patch('src.agents.onboarding.get_qdrant_client'):
manager = OnboardingManager()
return manager
def test_greeting_with_first_name(self, onboarding):
"""Should greet user by first name."""
profile = UserProfile(
telegram_id="123",
name="João Carlos Silva",
company="iT Guys",
tenant_id="tenant_itguys",
status=UserStatus.REGISTERED,
first_contact=datetime.now(),
last_contact=datetime.now()
)
with patch.object(onboarding, 'get_time_greeting', return_value="Bom dia"):
result = onboarding.build_greeting(profile)
assert "Bom dia, João!" in result