testes/app/routes/auth.py

120 lines
4.3 KiB
Python

from flask import Blueprint, request, jsonify, current_app
import jwt
from datetime import datetime, timedelta
import re
import ldap3
from ldap3.core.exceptions import LDAPSocketOpenError, LDAPException
from functools import wraps
import logging
from flask_mysqldb import MySQL
# Configuração de logging para depuração
logging.basicConfig(level=logging.DEBUG)
auth = Blueprint('auth', __name__)
mysql = MySQL()
# Decorador para validar o token JWT
def token_required(f):
@wraps(f)
def wrapper(*args, **kwargs):
token = request.headers.get('x-access-token')
if not token:
return jsonify({'msg': 'Token é necessário!'}), 401
try:
data = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])
except jwt.ExpiredSignatureError:
return jsonify({'msg': 'Token expirado!'}), 401
except jwt.InvalidTokenError:
return jsonify({'msg': 'Token inválido!'}), 401
return f(data, *args, **kwargs)
return wrapper
# Validação de SQL Injection
def is_valid_input(input_str):
sql_injection_pattern = r"(--|\b(SELECT|UNION|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b|;|'|\")"
return not re.search(sql_injection_pattern, input_str, re.IGNORECASE)
# Validação de e-mail
def is_valid_email(email):
return re.match(r"[^@]+@[^@]+\.[^@]+", email)
# Rota de login
@auth.route('/login', methods=['POST'])
def login():
data = request.get_json()
username_full = data.get('username')
password = data.get('password')
if not username_full or not password:
return jsonify({'msg': 'Usuário e senha são obrigatórios.'}), 400
if not is_valid_input(username_full) or not is_valid_input(password):
return jsonify({'msg': 'Solicitação rejeitada devido a padrões maliciosos.'}), 422
if not is_valid_email(username_full):
return jsonify({'msg': 'Formato de e-mail inválido'}), 400
# Separar nome de usuário e domínio
try:
username, domain = username_full.split('@')
except ValueError:
return jsonify({'msg': 'Formato de usuário inválido. Use "usuario@dominio.com".'}), 400
# Verificar se o usuário está associado à empresa no MySQL
try:
with mysql.connection.cursor() as cur:
cur.execute("SELECT fk_id_empresa, dominio FROM usuarios WHERE usuario = %s AND dominio = %s", (username, domain))
result = cur.fetchone()
if not result:
return jsonify({'msg': 'Usuário não associado à empresa'}), 404
except Exception as e:
logging.error(f"Erro no banco de dados: {str(e)}")
return jsonify({'msg': 'Erro no banco de dados'}), 500
# Autenticação LDAP usando domínio extraído
ldap_server = f'ldap://{domain}:389'
ldap_user = f'{domain}\\{username}'
try:
server = ldap3.Server(ldap_server, get_info=ldap3.ALL)
conn = ldap3.Connection(
server,
user=username_full,
password=password,
authentication=ldap3.SIMPLE # Sempre utilizar o modo SIMPLE
)
if conn.bind():
token = jwt.encode({
'user': username_full,
'domain': domain, # Incluindo o domínio no payload
'exp': datetime.utcnow() + timedelta(hours=1)
}, current_app.config['SECRET_KEY'], algorithm="HS256")
conn.unbind()
logging.info(f"Login bem-sucedido para usuário: {username_full}")
return jsonify({'msg': 'Login bem-sucedido', 'token': token, 'domain': domain}), 200
else:
conn.unbind()
logging.warning(f"Falha na autenticação LDAP para o usuário: {username_full}")
return jsonify({'msg': 'Falha na autenticação LDAP. Verifique credenciais.'}), 401
except LDAPSocketOpenError:
logging.error("Erro de conexão com o servidor LDAP.")
return jsonify({'msg': 'Não foi possível conectar ao servidor LDAP. Verifique o IP e a porta.'}), 503
except LDAPException as e:
logging.error(f"Erro LDAP: {str(e)}")
return jsonify({'msg': 'Erro durante a autenticação no servidor LDAP.'}), 500
except Exception as e:
logging.error(f"Erro inesperado: {str(e)}")
return jsonify({'msg': 'Erro inesperado durante a autenticação.'}), 500