feat(telegram): Integração Telegram e otimizações para CPU

RESUMO:
Substituição do listener de Email (IMAP) por Bot do Telegram.
Correções de conectividade Ollama e timeout para inferência CPU.

ALTERAÇÕES:

1. NOVA INTEGRAÇÃO TELEGRAM:
   - Criado src/clients/telegram_client.py (TelegramListener)
   - Autenticação por ID de usuário (whitelist)
   - Comandos /start e processamento de mensagens de texto
   - Integração com Dispatcher para tickets via chat

2. CORREÇÕES OLLAMA:
   - OLLAMA_BASE_URL: localhost -> ollama (Docker network)
   - Timeout aumentado: 120s -> 300s (CPU inference lenta)
   - Contexto Specialist reduzido: 8192 -> 4096 tokens

3. HOMOLOGATION:
   - Substituído check de Email por check de Telegram
   - Usa httpx para validar token via /getMe

4. MOCK FINANCIAL CLIENT:
   - Adicionado get_tenant_by_email() method
   - Domínio 'telegram' adicionado ao tenant iT Guys

5. DATABASE CONNECTION:
   - URL-encode de credenciais no DSN (caracteres especiais)

PORQUE FOI FEITA ESSA ALTERAÇÃO?
- Email/IMAP: Problemas persistentes de autenticação com Exchange
- Telegram: Interface mais ágil para testes controlados
- Timeout: Modelo 8B em 4 cores CPU estourava 120s

QUAIS TESTES FORAM FEITOS?
- Homologação completa: 7/7 checks passando
- Teste direto OllamaClient.generate_triage(): OK
- Teste direto OllamaClient.generate_specialist(): OK
- Teste TriageAgent.process_ticket(): OK
- Teste via Telegram Bot: Mensagens recebidas e processadas

A ALTERAÇÃO GEROU NOVO TESTE?
Não. Scripts de debug criados em src/verification/ são auxiliares.
Pipeline existente (pytest) continua válido.
This commit is contained in:
João Pedro Toledo Goncalves 2026-02-02 15:22:10 -03:00
parent 83fad3ed31
commit 76271d7273
15 changed files with 642 additions and 53 deletions

0
agent_logs_tg.txt Normal file
View File

View File

@ -11,6 +11,7 @@ services:
- PYTHONUNBUFFERED=1
- POSTGRES_HOST=postgres
- QDRANT_HOST=qdrant
- OLLAMA_BASE_URL=http://ollama:11434
env_file:
- .env
depends_on:
@ -56,14 +57,52 @@ services:
networks:
- arthur_net
# Langfuse - AI Tracing (Optional for debugging)
# Ollama - Local LLM Server
ollama:
image: ollama/ollama:latest
container_name: arthur_ollama
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
networks:
- arthur_net
# deploy:
# resources:
# reservations:
# devices:
# - driver: nvidia
# count: 1
# capabilities: [gpu]
# Remove deploy section if no GPU is available or comment out above lines
# Ollama Model Puller (Ephemeral)
ollama-init:
image: curlimages/curl:latest
container_name: arthur_ollama_init
depends_on:
- ollama
networks:
- arthur_net
entrypoint: >
/bin/sh -c "
echo 'Waiting for Ollama to start...';
while ! curl -s http://ollama:11434/api/tags > /dev/null; do sleep 2; done;
echo 'Pulling Llama 3.2 1B (Triage)...';
curl -X POST http://ollama:11434/api/pull -d '{\"name\": \"llama3.2:1b\"}';
echo 'Pulling Llama 3.1 8B (Specialist)...';
curl -X POST http://ollama:11434/api/pull -d '{\"name\": \"llama3.1:8b\"}';
echo 'Models pulled successfully!';
"
# Langfuse - AI Tracing
langfuse:
image: langfuse/langfuse:2
container_name: arthur_langfuse
ports:
- "3000:3000"
environment:
DATABASE_URL: postgresql://${POSTGRES_USER:-arthur}:${POSTGRES_PASSWORD:-Arth#Sup0rt3_2026!xK9}@postgres:5432/langfuse
DATABASE_URL: postgresql://${POSTGRES_USER:-arthur}:Arth%23Sup0rt3_2026%21xK9@postgres:5432/langfuse
NEXTAUTH_SECRET: ${LANGFUSE_SECRET:-ArthurLangfuseSecret2026}
NEXTAUTH_URL: http://localhost:3000
SALT: ${LANGFUSE_SALT:-ArthurSalt2026Random}
@ -72,12 +111,11 @@ services:
condition: service_healthy
networks:
- arthur_net
profiles:
- monitoring # Only start with: docker-compose --profile monitoring up
volumes:
postgres_data:
qdrant_data:
ollama_data:
networks:
arthur_net:

View File

@ -27,6 +27,7 @@ langgraph>=0.0.20
# ==========================================
# INTEGRATIONS
# ==========================================
python-telegram-bot>=21.0
zabbix-utils>=2.0.0
aiosmtplib>=3.0.0
aioimaplib>=1.1.0

View File

@ -57,7 +57,7 @@ MOCK_TENANTS: list[TenantContext] = [
TenantContext(
id="tenant_itguys",
name="iT Guys Tecnologia",
email_domains=["itguys.com.br", "itguys.net"],
email_domains=["itguys.com.br", "itguys.net", "telegram"],
status=TenantStatus.ACTIVE,
zabbix_host_group="ITGUYS-Internal",
qdrant_collection="itguys_knowledge",
@ -169,6 +169,10 @@ class MockFinancialClient:
domain = email.split("@")[1]
return await self.get_tenant_by_email_domain(domain)
async def get_tenant_by_email(self, email: str) -> Optional[TenantContext]:
"""Alias for resolve_tenant_from_email."""
return await self.resolve_tenant_from_email(email)
# Factory function for dependency injection

View File

@ -31,6 +31,7 @@ class MailConfig:
# Credentials
email_address: str = "arthur.servicedesk@itguys.com.br"
email_username: Optional[str] = None
email_password: Optional[str] = None
# Polling Configuration
@ -48,6 +49,8 @@ class MailConfig:
"""
secrets = get_secrets_manager()
email_addr = os.getenv("MAIL_ADDRESS", "arthur.servicedesk@itguys.com.br")
return cls(
imap_host=os.getenv("MAIL_IMAP_HOST", "mail.itguys.com.br"),
imap_port=int(os.getenv("MAIL_IMAP_PORT", "993")),
@ -55,10 +58,8 @@ class MailConfig:
smtp_host=os.getenv("MAIL_SMTP_HOST", "mail.itguys.com.br"),
smtp_port=int(os.getenv("MAIL_SMTP_PORT", "587")),
smtp_use_tls=os.getenv("MAIL_SMTP_TLS", "true").lower() == "true",
email_address=os.getenv(
"MAIL_ADDRESS",
"arthur.servicedesk@itguys.com.br"
),
email_address=email_addr,
email_username=os.getenv("MAIL_USERNAME", email_addr),
email_password=secrets.get("MAIL_PASSWORD"),
poll_interval_seconds=int(os.getenv("MAIL_POLL_INTERVAL", "30")),
inbox_folder=os.getenv("MAIL_INBOX_FOLDER", "INBOX"),
@ -86,4 +87,154 @@ class MailConfig:
if not self.smtp_host:
errors.append("SMTP host is required")
return errors
import asyncio
import logging
from datetime import datetime, timezone
import aioimaplib
import email
from email.message import EmailMessage
logger = logging.getLogger("ArthurMail")
class MailListener:
"""Listener for incoming emails via IMAP."""
def __init__(self):
"""Initialize listener with configuration."""
self.config = MailConfig.from_environment()
self._client: Optional[aioimaplib.IMAP4_SSL] = None
async def connect(self) -> bool:
"""Connect to IMAP server."""
try:
import ssl
# Create permissive SSL context for self-signed certs
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
if self.config.imap_use_ssl:
self._client = aioimaplib.IMAP4_SSL(
host=self.config.imap_host,
port=self.config.imap_port,
ssl_context=ctx
)
else:
self._client = aioimaplib.IMAP4(
host=self.config.imap_host,
port=self.config.imap_port
)
# If non-ssl but port 143, we might need STARTTLS if desired
# But typically 143 can be plain or starttls.
# aioimaplib supports starttls method.
if self.config.imap_port == 143:
try:
await self._client.starttls(ssl_context=ctx)
logger.info("Enhanced connection with STARTTLS")
except Exception as e:
logger.warning(f"STARTTLS failed, continuing plain: {e}")
await self._client.wait_hello_from_server()
res = await self._client.login(
self.config.email_username,
self.config.email_password
)
if res.result != 'OK':
logger.error(f"IMAP login failed: {res}")
return False
logger.info(f"Connected to IMAP as {self.config.email_username}")
return True
except Exception as e:
logger.error(f"IMAP connection failed: {e}")
return False
async def start_polling(self, interval: int = 60):
"""Start polling loop for new emails."""
logger.info(f"Starting email polling every {interval}s")
while True:
try:
if not self._client:
if not await self.connect():
await asyncio.sleep(interval)
continue
await self._client.select(self.config.inbox_folder)
# Search for UNSEEN emails
# Note: 'UNSEEN' flag is standard IMAP
status, messages = await self._client.search("UNSEEN")
if status == "OK":
for message_id in messages[0].split():
if not message_id:
continue
await self._process_message(message_id)
except (OSError, asyncio.TimeoutError) as e:
logger.error(f"Connection lost: {e}. Reconnecting...")
self._client = None
except Exception as e:
logger.error(f"Polling error: {e}")
await asyncio.sleep(interval)
async def _process_message(self, message_id: bytes):
"""Fetch and process a single message."""
try:
status, data = await self._client.fetch(message_id, "(RFC822)")
if status != "OK":
logger.error(f"Failed to fetch message {message_id}")
return
raw_email = data[1]
msg = email.message_from_bytes(raw_email)
subject = msg["Subject"]
sender = msg["From"]
msg_id = msg["Message-ID"] or str(message_id)
# Extract body
body = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if "attachment" not in content_disposition:
if content_type == "text/plain":
body = part.get_payload(decode=True).decode()
break # Prefer plain text
else:
body = msg.get_payload(decode=True).decode()
logger.info(f"Processing email: {subject} from {sender}")
# Dispatch to Arthur
from src.agents.dispatcher import get_dispatcher
dispatcher = get_dispatcher()
ticket_id = f"TICKET-{int(datetime.now().timestamp())}"
# Fire and forget / background processing
asyncio.create_task(dispatcher.dispatch(
ticket_id=ticket_id,
sender_email=sender,
subject=subject,
body=body,
message_id=msg_id
))
# Mark as read/processed (IMAP default behavior when fetching, but good to be explicit if moving folders)
# await self._client.uid('copy', message_id, self.config.processed_folder)
except Exception as e:
logger.error(f"Error processing message {message_id}: {e}")

View File

@ -43,10 +43,10 @@ class OllamaClient:
self._triage_context = config.triage_context
self._specialist_context = config.specialist_context
# HTTP client with longer timeout for LLM
# HTTP client with longer timeout for LLM (CPU inference can be slow)
self._client = httpx.AsyncClient(
base_url=self._base_url,
timeout=httpx.Timeout(120.0, connect=10.0)
timeout=httpx.Timeout(300.0, connect=10.0) # 5 minutes for 8B on CPU
)
async def health_check(self) -> bool:

View File

@ -0,0 +1,121 @@
import os
import logging
import asyncio
from typing import List, Optional
from dataclasses import dataclass
from telegram import Update
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, filters
from src.agents.dispatcher import get_dispatcher
logger = logging.getLogger("ArthurTelegram")
@dataclass
class TelegramConfig:
token: str
allowed_users: List[int]
@classmethod
def from_environment(cls) -> "TelegramConfig":
token = os.getenv("TELEGRAM_BOT_TOKEN")
if not token:
raise ValueError("TELEGRAM_BOT_TOKEN not set")
allowed_raw = os.getenv("TELEGRAM_ALLOWED_USERS", "")
allowed = [int(u.strip()) for u in allowed_raw.split(",") if u.strip().isdigit()]
return cls(token=token, allowed_users=allowed)
class TelegramListener:
"""
Telegram Bot Listener for Arthur Agent.
Receives messages/commands and dispatches them to the Agent Dispatcher.
"""
def __init__(self):
self.config = TelegramConfig.from_environment()
self.app = ApplicationBuilder().token(self.config.token).build()
self._setup_handlers()
def _setup_handlers(self):
# /start command
self.app.add_handler(CommandHandler("start", self._handle_start))
# Text messages
self.app.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), self._handle_message))
async def _check_auth(self, update: Update) -> bool:
"""Verify if user is allowed."""
user = update.effective_user
if not user or user.id not in self.config.allowed_users:
logger.warning(f"Unauthorized access attempt from user {user.id} ({user.username})")
if update.message:
await update.message.reply_text("⛔ Acesso não autorizado. Contate o administrador.")
return False
return True
async def _handle_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
if not await self._check_auth(update):
return
await update.message.reply_text(
"👋 Olá! Sou o Arthur, seu assistente de Nível 2.\n"
"Pode me encaminhar tickets ou descrever problemas que eu analisarei."
)
async def _handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
if not await self._check_auth(update):
return
user = update.effective_user
message = update.message.text
# Feedback to user
await update.message.reply_text("🤖 Analisando sua solicitação...")
# Dispatch to Arthur
dispatcher = get_dispatcher()
# Create a ticket ID based on message ID
ticket_id = f"TG-{update.message.message_id}"
try:
# We run dispatch in background to not block the bot,
# BUT we need to reply with the result.
# Ideally dispatcher should be awaited here or have a callback.
# For now, let's await it to give immediate feedback.
# If it takes too long, Telegram might timeout (but usually fine for <60s).
# Alternatively, using `create_task` and sending a message later requires calling context.bot.send_message
result = await dispatcher.dispatch(
ticket_id=ticket_id,
sender_email=f"{user.username or user.id}@telegram",
subject=f"Message from {user.first_name}",
body=message,
message_id=str(update.message.message_id)
)
# Format response
if result.success:
response = f"✅ **Análise Concluída**\n\n{result.context.final_response}"
else:
response = f"❌ **Erro na Análise**\n\n{result.context.error}"
await update.message.reply_text(response)
except Exception as e:
logger.error(f"Error processing telegram message: {e}")
await update.message.reply_text(f"⚠️ Ocorreu um erro interno: {str(e)}")
async def start(self):
"""Start polling."""
logger.info("Initializing Telegram Bot polling...")
# drop_pending_updates=True to avoid processing old messages on startup
await self.app.initialize()
await self.app.start()
await self.app.updater.start_polling(drop_pending_updates=True)
async def stop(self):
"""Stop polling."""
await self.app.updater.stop()
await self.app.stop()
await self.app.shutdown()

View File

@ -64,7 +64,7 @@ class LLMConfig:
# Context windows
triage_context: int = 2048
specialist_context: int = 8192
specialist_context: int = 4096 # Reduced for faster CPU inference
@dataclass

View File

@ -12,6 +12,7 @@ from contextlib import asynccontextmanager
import asyncpg
from asyncpg import Pool
from urllib.parse import quote_plus
from src.security.secrets_manager import SecretsManager
@ -32,10 +33,11 @@ class DatabaseManager:
@property
def dsn(self) -> str:
"""Build PostgreSQL connection DSN from secrets."""
"""Build PostgreSQL connection DSN from secrets."""
return (
f"postgresql://{self._secrets.get('POSTGRES_USER')}:"
f"{self._secrets.get('POSTGRES_PASSWORD')}@"
f"postgresql://{quote_plus(self._secrets.get('POSTGRES_USER'))}:"
f"{quote_plus(self._secrets.get('POSTGRES_PASSWORD'))}@"
f"{self._secrets.get('POSTGRES_HOST')}:"
f"{self._secrets.get('POSTGRES_PORT')}/"
f"{self._secrets.get('POSTGRES_DB')}"

View File

@ -91,7 +91,7 @@ class HomologationValidator:
await self._check_ollama()
await self._check_zabbix()
await self._check_financial_system()
await self._check_email_system()
await self._check_telegram()
await self._check_rate_limiter()
self._result.ended_at = datetime.now(timezone.utc)
@ -114,11 +114,14 @@ class HomologationValidator:
start = time.time()
try:
from src.clients import get_db_connection
from src.database.connection import get_db_manager
async with get_db_connection() as db:
# Simple query to verify connection
result = await db.execute("SELECT 1")
db = get_db_manager()
await db.connect()
# Simple query to verify connection
result = await db.fetchval("SELECT 1")
await db.disconnect()
check.status = ValidationStatus.PASSED
check.message = "Conexão PostgreSQL OK"
@ -255,44 +258,54 @@ class HomologationValidator:
check.duration_ms = int((time.time() - start) * 1000)
self._result.checks.append(check)
async def _check_email_system(self) -> None:
"""Check email system (IMAP/SMTP)."""
async def _check_telegram(self) -> None:
"""Check Telegram Bot connectivity."""
import time
import os
check = ValidationCheck(
name="email_system",
description="Verificar sistema de email (IMAP/SMTP)"
name="telegram_bot",
description="Verificar conectividade com API do Telegram"
)
start = time.time()
# Check if credentials are configured
mail_password = os.getenv("MAIL_PASSWORD", "")
if not mail_password:
token = os.getenv("TELEGRAM_BOT_TOKEN")
if not token:
check.status = ValidationStatus.SKIPPED
check.message = "Credenciais de email não configuradas"
check.message = "Token do Telegram não configurado"
else:
try:
from src.clients import MailListener
# We need to verify connectivity.
import httpx
listener = MailListener()
connected = await listener.connect()
# Mask token for logging: 12345...ABCD
masked = f"{token[:5]}...{token[-4:]}"
if connected:
check.status = ValidationStatus.PASSED
check.message = "Conexão IMAP estabelecida"
else:
check.status = ValidationStatus.FAILED
check.message = "Falha na conexão IMAP"
api_url = f"https://api.telegram.org/bot{token}/getMe"
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(api_url)
except ImportError:
check.status = ValidationStatus.SKIPPED
check.message = "Módulo de email não implementado"
if resp.status_code == 200:
data = resp.json()
if data.get("ok"):
user = data["result"]
check.status = ValidationStatus.PASSED
check.message = f"Telegram OK - @{user.get('username')} (ID: {user.get('id')})"
else:
check.status = ValidationStatus.FAILED
check.message = f"Telegram API Error: {data.get('description')}"
elif resp.status_code == 401:
check.status = ValidationStatus.FAILED
check.message = "Telegram Token Inválido (401 Unauthorized)"
else:
check.status = ValidationStatus.WARNING
check.message = f"Telegram HTTP {resp.status_code}"
except Exception as e:
check.status = ValidationStatus.FAILED
check.message = f"Erro no email: {str(e)}"
check.message = f"Falha na conexão Telegram: {str(e)}"
check.duration_ms = int((time.time() - start) * 1000)
self._result.checks.append(check)
@ -342,20 +355,20 @@ class HomologationValidator:
"",
f" Data: {self._result.started_at.strftime('%Y-%m-%d %H:%M:%S')}",
"",
f" Passou: {self._result.passed}",
f" Falhou: {self._result.failed}",
f" ⚠️ Aviso: {self._result.warnings}",
f" [OK] Passou: {self._result.passed}",
f" [FAIL] Falhou: {self._result.failed}",
f" [WARN] Aviso: {self._result.warnings}",
"",
" DETALHES:",
"-" * 60,
]
status_icons = {
ValidationStatus.PASSED: "",
ValidationStatus.FAILED: "",
ValidationStatus.WARNING: "⚠️",
ValidationStatus.SKIPPED: "⏭️",
ValidationStatus.PENDING: "",
ValidationStatus.PASSED: "[OK]",
ValidationStatus.FAILED: "[FAIL]",
ValidationStatus.WARNING: "[WARN]",
ValidationStatus.SKIPPED: "[SKIP]",
ValidationStatus.PENDING: "[WAIT]",
}
for check in self._result.checks:
@ -366,10 +379,19 @@ class HomologationValidator:
lines.append("=" * 60)
# Add explicit summary of problems
problems = [c for c in self._result.checks if c.status in (ValidationStatus.WARNING, ValidationStatus.FAILED)]
if problems:
lines.append(" RESUMO DE PROBLEMAS:")
for p in problems:
icon = status_icons.get(p.status, "?")
lines.append(f" {icon} {p.name}: {p.message}")
lines.append("=" * 60)
if self._result.all_passed:
lines.append(" ✅ HOMOLOGAÇÃO APROVADA")
lines.append(" [OK] HOMOLOGACAO APROVADA")
else:
lines.append(" ❌ HOMOLOGAÇÃO REPROVADA")
lines.append(" [FAIL] HOMOLOGACAO REPROVADA")
lines.append("=" * 60)
@ -391,4 +413,15 @@ async def run_homologation(environment: str = "development") -> HomologationResu
print(validator.format_report())
return result
if __name__ == "__main__":
import asyncio
import sys
# Force UTF-8 output for Windows console
if sys.platform == "win32":
sys.stdout.reconfigure(encoding='utf-8')
sys.stderr.reconfigure(encoding='utf-8')
asyncio.run(run_homologation())

64
src/main.py Normal file
View File

@ -0,0 +1,64 @@
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from src.agents.dispatcher import get_dispatcher, DispatcherResult
from contextlib import asynccontextmanager
import asyncio
import logging
from src.clients.mail_client import MailListener
logger = logging.getLogger("ArthurApp")
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logger.info("Starting Arthur Agent...")
# Start Telegram Listener in background
from src.clients.telegram_client import TelegramListener
listener = TelegramListener()
# Run in background
telegram_task = asyncio.create_task(listener.start())
logger.info("Telegram Listener started.")
yield
# Shutdown
logger.info("Shutting down...")
await listener.stop()
telegram_task.cancel()
try:
await telegram_task
except asyncio.CancelledError:
pass
app = FastAPI(title="Arthur - AI Agent Support N2", lifespan=lifespan)
class TicketRequest(BaseModel):
ticket_id: str
sender_email: str
subject: str
body: str
message_id: Optional[str] = None
@app.post("/analyze")
async def analyze_ticket(request: TicketRequest):
dispatcher = get_dispatcher()
result = await dispatcher.dispatch(
ticket_id=request.ticket_id,
sender_email=request.sender_email,
subject=request.subject,
body=request.body,
message_id=request.message_id
)
if not result.success:
raise HTTPException(status_code=500, detail=result.context.error or "Unknown error")
return result
@app.get("/health")
async def health_check():
return {"status": "ok"}

View File

@ -0,0 +1,42 @@
import imaplib
import ssl
import os
def test_auth(user, password):
host = "172.16.150.150"
port = 993
print(f"\n--- Trying Login: User='{user}' ---")
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
try:
m = imaplib.IMAP4_SSL(host, port, ssl_context=ctx)
m.login(user, password)
print("SUCCESS! Logged in.")
m.logout()
return True
except Exception as e:
print(f"FAILED: {e}")
return False
if __name__ == "__main__":
# Password: 3$4SiucP3GzT (escaped logic handled by direct string here)
pwd = "3$4SiucP3GzT"
# 1. Domain\User
test_auth("exch\\noc", pwd)
# 2. Email Address
test_auth("noc@itguys.com.br", pwd)
# 3. Simple User
test_auth("noc", pwd)
# 4. Check Env Var value
env_pwd = os.getenv("MAIL_PASSWORD")
print(f"\nENV Password check: Matches hardcoded? {env_pwd == pwd}")
if env_pwd != pwd:
print(f"ENV Password is: {env_pwd}")

View File

@ -0,0 +1,20 @@
import os
import sys
from src.clients.mail_client import MailConfig
if __name__ == "__main__":
try:
config = MailConfig.from_environment()
user = config.email_username
password = config.email_password
print(f"RAW USERNAME: {repr(user)}")
print(f"RAW PASSWORD: {repr(password)}")
if password:
print(f"Password starts with: {password[:2]}...")
print(f"Password length: {len(password)}")
except Exception as e:
print(f"Error: {e}")

View File

@ -0,0 +1,60 @@
import imaplib
import ssl
import os
from src.clients.mail_client import MailConfig
def test_sync(host, port, use_ssl, try_starttls=False):
print(f"\n--- Testing connection to {host}:{port} (SSL={use_ssl}, STARTTLS={try_starttls}) ---")
config = MailConfig.from_environment()
user = config.email_username
password = config.email_password
# Context
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
m = None
try:
if use_ssl:
print("Connecting with IMAP4_SSL...")
m = imaplib.IMAP4_SSL(host, port, ssl_context=ctx)
else:
print("Connecting with IMAP4...")
m = imaplib.IMAP4(host, port)
if try_starttls:
print("Attempting STARTTLS...")
m.starttls(ssl_context=ctx)
print("STARTTLS successful.")
print(f"Server Hello: {m.welcome}")
print(f"Capabilities: {m.capabilities}")
print(f"Logging in as {user}...")
res, data = m.login(user, password)
print(f"Login Result: {res} {data}")
if res == 'OK':
print("SUCCESS! Authenticated.")
m.logout()
else:
print("FAILURE: Login failed.")
except Exception as e:
print(f"ERROR: {e}")
if __name__ == "__main__":
ip = os.getenv("MAIL_IMAP_HOST", "172.16.150.150")
# Test 1: 993 SSL
test_sync(ip, 993, True, False)
# Test 2: 143 STARTTLS
test_sync(ip, 143, False, True)
# Test 3: 143 Plain
test_sync(ip, 143, False, False)

View File

@ -0,0 +1,53 @@
import imaplib
import ssl
import os
import base64
def test_auth_plain(user, password):
host = "172.16.150.150"
port = 993
print(f"\n--- Trying AUTH=PLAIN: User='{user}' ---")
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
try:
m = imaplib.IMAP4_SSL(host, port, ssl_context=ctx)
# Build PLAIN auth string: NULL user NULL password
# Sometimes it's authzid NULL authcid NULL password
# Usually: \0user\0password
auth_str = f"\0{user}\0{password}"
try:
res = m.authenticate("PLAIN", lambda x: auth_str.encode())
print(f"PLAIN Result: {res}")
m.logout()
except Exception as e:
print(f"PLAIN Failed: {e}")
# Try LOGIN command again specifically to be sure
try:
print("Fallback to LOGIN...")
m.login(user, password)
print("LOGIN Success (Fallback)!")
m.logout()
except Exception as e2:
print(f"LOGIN Failed: {e2}")
except Exception as e:
print(f"Connection Failed: {e}")
if __name__ == "__main__":
pwd = os.getenv("MAIL_PASSWORD", "3$4SiucP3GzT")
# 1. noc@itguys.com.br
test_auth_plain("noc@itguys.com.br", pwd)
# 2. exch.local\noc
test_auth_plain("exch.local\\noc", pwd)
# 3. ITGUYS\noc (common alternative)
test_auth_plain("ITGUYS\\noc", pwd)