150 lines
5.5 KiB
Python
150 lines
5.5 KiB
Python
from flask import Blueprint, request, jsonify, current_app
|
|
import jwt
|
|
from datetime import datetime, timedelta
|
|
import re
|
|
import ldap3
|
|
from ldap3.core.exceptions import LDAPSocketOpenError, LDAPException
|
|
from functools import wraps
|
|
from flask_mysqldb import MySQL
|
|
import logging
|
|
|
|
# Configuração de logging para depuração
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
auth = Blueprint('auth', __name__)
|
|
mysql = MySQL()
|
|
|
|
|
|
# Decorador para validar o token JWT
|
|
def token_required(f):
|
|
@wraps(f)
|
|
def wrapper(*args, **kwargs):
|
|
token = request.headers.get('x-access-token')
|
|
if not token:
|
|
return jsonify({'msg': 'Token é necessário!'}), 401
|
|
|
|
try:
|
|
data = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])
|
|
except jwt.ExpiredSignatureError:
|
|
# Caso o token esteja expirado, gere um novo token
|
|
try:
|
|
# Obter o payload original para criar um novo token
|
|
payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"], options={"verify_exp": False})
|
|
|
|
# Gere um novo token com nova validade
|
|
new_token = jwt.encode({
|
|
'user_id': payload['user_id'],
|
|
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=1)
|
|
}, current_app.config['SECRET_KEY'], algorithm="HS256")
|
|
|
|
response = jsonify({'msg': 'Token expirado! Novo token gerado.', 'new_token': new_token})
|
|
response.status_code = 401
|
|
return response
|
|
except Exception as e:
|
|
return jsonify({'msg': 'Erro ao processar o token expirado.', 'error': str(e)}), 401
|
|
except jwt.InvalidTokenError:
|
|
return jsonify({'msg': 'Token inválido!'}), 401
|
|
|
|
return f(data, *args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
|
|
# Validação de SQL Injection
|
|
def is_valid_input(input_str):
|
|
sql_injection_pattern = r"(--|\b(SELECT|UNION|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b|;|'|\")"
|
|
return not re.search(sql_injection_pattern, input_str, re.IGNORECASE)
|
|
|
|
|
|
# Validação de e-mail
|
|
def is_valid_email(email):
|
|
return re.match(r"[^@]+@[^@]+\.[^@]+", email)
|
|
|
|
|
|
# Rota de login
|
|
@auth.route('/login', methods=['POST'])
|
|
def login():
|
|
data = request.get_json()
|
|
|
|
username_full = data.get('username')
|
|
password = data.get('password')
|
|
|
|
if not username_full or not password:
|
|
return jsonify({'msg': 'Usuário e senha são obrigatórios.'}), 400
|
|
|
|
if not is_valid_input(username_full) or not is_valid_input(password):
|
|
return jsonify({'msg': 'Solicitação rejeitada devido a padrões maliciosos.'}), 422
|
|
|
|
if not is_valid_email(username_full):
|
|
return jsonify({'msg': 'Formato de e-mail inválido'}), 400
|
|
|
|
# Separar nome de usuário e domínio
|
|
try:
|
|
username, domain = username_full.split('@')
|
|
except ValueError:
|
|
return jsonify({'msg': 'Formato de usuário inválido. Use "usuario@dominio.com".'}), 400
|
|
|
|
# Buscar informações de domínio no banco de dados
|
|
try:
|
|
with mysql.connection.cursor() as cur:
|
|
cur.execute("SELECT idempresa, ip_dominio FROM empresa WHERE dominio = %s", (domain,))
|
|
empresa_result = cur.fetchone()
|
|
|
|
if not empresa_result:
|
|
return jsonify({'msg': 'Domínio não encontrado no banco de dados'}), 404
|
|
|
|
id_empresa, ip_dominio = empresa_result
|
|
|
|
# Verificar se o usuário está associado à empresa no MySQL
|
|
with mysql.connection.cursor() as cur:
|
|
cur.execute("SELECT empresa_id, dominio FROM view_usuario_empresa WHERE usuario = %s AND dominio = %s", (username, domain))
|
|
result = cur.fetchone()
|
|
|
|
if not result:
|
|
return jsonify({'msg': 'Usuário não associado à empresa'}), 404
|
|
|
|
except Exception as e:
|
|
logging.error(f"Erro no banco de dados: {str(e)}")
|
|
return jsonify({'msg': 'Erro no banco de dados'}), 500
|
|
|
|
# Conexão LDAP com autenticação
|
|
ldap_server = f'ldap://{"itguys.com.br:389"}'
|
|
ldap_user = f'{domain}\\{username}'
|
|
|
|
try:
|
|
server = ldap3.Server(ldap_server, get_info=ldap3.ALL)
|
|
conn = ldap3.Connection(
|
|
server,
|
|
user=username_full,
|
|
password=password,
|
|
authentication=ldap3.SIMPLE
|
|
)
|
|
|
|
if conn.bind():
|
|
token = jwt.encode({
|
|
'user': username_full,
|
|
'domain': domain, # Incluindo o domínio no payload
|
|
'exp': datetime.utcnow() + timedelta(hours=1)
|
|
}, current_app.config['SECRET_KEY'], algorithm="HS256")
|
|
|
|
conn.unbind()
|
|
logging.info(f"Login bem-sucedido para usuário: {username_full}")
|
|
return jsonify({'msg': 'Login bem-sucedido', 'token': token, 'domain': domain}), 200
|
|
else:
|
|
conn.unbind()
|
|
logging.warning(f"Falha na autenticação LDAP para o usuário: {username_full}")
|
|
return jsonify({'msg': 'Falha na autenticação LDAP. Verifique credenciais.'}), 401
|
|
|
|
except LDAPSocketOpenError:
|
|
logging.error("Erro de conexão com o servidor LDAP.")
|
|
return jsonify({'msg': 'Não foi possível conectar ao servidor LDAP. Verifique o IP e a porta.'}), 503
|
|
except LDAPException as e:
|
|
logging.error(f"Erro LDAP: {str(e)}")
|
|
return jsonify({'msg': 'Erro durante a autenticação no servidor LDAP.'}), 500
|
|
except Exception as e:
|
|
logging.error(f"Erro inesperado: {str(e)}")
|
|
return jsonify({'msg': 'Erro inesperado durante a autenticação.'}), 500
|
|
|
|
|
|
|