testes/app/routes/auth.py

157 lines
5.6 KiB
Python

from flask import Blueprint, request, jsonify, current_app
import jwt
from datetime import datetime, timedelta
import re
import ldap3
from ldap3.core.exceptions import LDAPSocketOpenError, LDAPException
from functools import wraps
from flask_mysqldb import MySQL
import logging
# Configuração de logging para depuração
logging.basicConfig(level=logging.DEBUG)
auth = Blueprint('auth', __name__)
mysql = MySQL()
# Decorador para validar o token JWT
def token_required(f):
@wraps(f)
def wrapper(*args, **kwargs):
token = request.headers.get('x-access-token')
if not token:
return jsonify({'msg': 'Token é necessário!'}), 401
try:
data = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])
except jwt.ExpiredSignatureError:
# Caso o token esteja expirado, gere um novo token
try:
# Obter o payload original para criar um novo token
payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"], options={"verify_exp": False})
# Gere um novo token com nova validade
new_token = jwt.encode({
'user_id': payload['user_id'],
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}, current_app.config['SECRET_KEY'], algorithm="HS256")
response = jsonify({'msg': 'Token expirado! Novo token gerado.', 'new_token': new_token})
response.status_code = 401
return response
except Exception as e:
return jsonify({'msg': 'Erro ao processar o token expirado.', 'error': str(e)}), 401
except jwt.InvalidTokenError:
return jsonify({'msg': 'Token inválido!'}), 401
return f(data, *args, **kwargs)
return wrapper
# Validação de SQL Injection
def is_valid_input(input_str):
sql_injection_pattern = r"(--|\b(SELECT|UNION|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b|;|'|\")"
return not re.search(sql_injection_pattern, input_str, re.IGNORECASE)
# Validação de e-mail
def is_valid_email(email):
return re.match(r"[^@]+@[^@]+\.[^@]+", email)
# Rota de login
@auth.route('/login', methods=['POST'])
def login():
data = request.get_json()
username_full = data.get('username')
password = data.get('password')
if not username_full or not password:
return jsonify({'msg': 'Usuário e senha são obrigatórios.'}), 400
if not is_valid_input(username_full) or not is_valid_input(password):
return jsonify({'msg': 'Solicitação rejeitada devido a padrões maliciosos.'}), 422
if not is_valid_email(username_full):
return jsonify({'msg': 'Formato de e-mail inválido'}), 400
# Separar nome de usuário e domínio
try:
username, domain = username_full.split('@')
print(f" Domínio extraído: {domain}") # <-- Adiciona este print
except ValueError:
return jsonify({'msg': 'Formato de usuário inválido. Use "usuario@dominio.com".'}), 400
# Buscar informações de domínio no banco de dados
try:
with mysql.connection.cursor() as cur:
cur.execute("SELECT idempresa, ip_dominio FROM empresa WHERE dominio = %s", (domain,))
empresa_result = cur.fetchone()
if not empresa_result:
return jsonify({'msg': 'Domínio não encontrado no banco de dados'}), 404
id_empresa, ip_dominio = empresa_result
# Verificar se o usuário está associado à empresa no MySQL
with mysql.connection.cursor() as cur:
cur.execute("SELECT empresa_id, dominio FROM view_usuario_empresa WHERE usuario = %s AND dominio = %s", (username, domain))
result = cur.fetchone()
if not result:
return jsonify({'msg': 'Usuário não associado à empresa'}), 404
except Exception as e:
logging.error(f"Erro no banco de dados: {str(e)}")
return jsonify({'msg': 'Erro no banco de dados'}), 500
# Conexão LDAP com autenticação
ldap_server = f'ldap://{"itguys.com.br:389"}' # Altere para 'ldaps://{ip_dominio}:636' se LDAPS for usado
ldap_user = f'{domain}\\{username}' # Formato DOMAIN\username
try:
server = ldap3.Server(ldap_server, get_info=ldap3.ALL)
conn = ldap3.Connection(
server,
user=username_full,
password=password,
authentication=ldap3.SIMPLE
)
if conn.bind():
token = jwt.encode({
'user': username_full,
'exp': datetime.utcnow() + timedelta(hours=1)
}, current_app.config['SECRET_KEY'], algorithm="HS256")
conn.unbind()
logging.info(f"Login bem-sucedido para usuário: {username_full}")
return jsonify({'msg': 'Login bem-sucedido', 'token': token}), 200
else:
conn.unbind()
logging.warning(f"Falha na autenticação LDAP para o usuário: {username_full}")
return jsonify({'msg': 'Falha na autenticação LDAP. Verifique credenciais.'}), 401
except LDAPSocketOpenError:
logging.error("Erro de conexão com o servidor LDAP.")
return jsonify({'msg': 'Não foi possível conectar ao servidor LDAP. Verifique o IP e a porta.'}), 503
except LDAPException as e:
logging.error(f"Erro LDAP: {str(e)}")
return jsonify({'msg': 'Erro durante a autenticação no servidor LDAP.'}), 500
except Exception as e:
logging.error(f"Erro inesperado: {str(e)}")
return jsonify({'msg': 'Erro inesperado durante a autenticação.'}), 500