testes/python/server.py

695 lines
26 KiB
Python

from flask import Flask, request, jsonify, send_from_directory
from flask_mysqldb import MySQL
from MySQLdb.cursors import DictCursor
import ldap3
import re
import requests
import os
import json
from functools import wraps
from collections import OrderedDict
from dotenv import load_dotenv
import jwt # Importa a biblioteca JWT para gerar e validar tokens
from datetime import datetime, timedelta
from ldap3.core.exceptions import LDAPSocketOpenError, LDAPException # Importa as exceções específicas de ldap3
from flask_cors import CORS
from flask_wtf.csrf import CSRFProtect # Importa CSRFProtect para proteção CSRF
from flask_talisman import Talisman
import influxdb_client, time
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import ldap3
# Carregar variáveis de ambiente do arquivo .env
load_dotenv()
app = Flask(__name__)
# Configuração de cookies seguros
app.config['SESSION_COOKIE_SECURE'] = True # Cookies serão enviados apenas em conexões HTTPS
app.config['SESSION_COOKIE_HTTPONLY'] = True # Cookies não estarão acessíveis via JavaScript
Talisman(app)
# Configuração da chave secreta necessária para CSRF e JWT
app.config['SECRET_KEY'] = os.getenv('JWT_SECRET') or 'sua_chave_secreta' # Defina uma chave secreta segura
# Inicializa a proteção CSRF
csrf = CSRFProtect(app)
# Permitir apenas uma origem específica
CORS(app, resources={r"/*": {"origins": "https://dev.itguys.com.br"}})
# Configuração do MySQL usando variáveis de ambiente
app.config['MYSQL_HOST'] = os.getenv('MYSQL_HOST')
app.config['MYSQL_USER'] = os.getenv('MYSQL_USER')
app.config['MYSQL_PASSWORD'] = os.getenv('MYSQL_PASSWORD')
app.config['MYSQL_DB'] = os.getenv('MYSQL_DB')
#Configuração do SMTP usando variáveis de ambiente
app.config['SMTP_USERNAME'] = os.getenv('SMTP_USERNAME')
app.config['SMTP_HOST'] = os.getenv('SMTP_HOST')
app.config['SMTP_PASSWORD'] = os.getenv('SMTP_PASSWORD')
app.config['SMTP_PORT'] = os.getenv('SMTP_PORT')
app.config['SMTP_RECIPIENT'] = os.getenv('SMTP_RECIPIENT')
mysql = MySQL(app)
def token_required(f):
@wraps(f) # Adiciona wraps aqui para manter o nome da função original
def wrapper(*args, **kwargs):
token = request.headers.get('x-access-token')
if not token:
return jsonify({'msg': 'Token é necessário!'}), 401
try:
# Decodifica o token para validar
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
except jwt.ExpiredSignatureError:
# O token expirou
return jsonify({'msg': 'Token expirado! Por favor, faça login novamente.'}), 401
except jwt.InvalidTokenError:
# Token inválido por qualquer outro motivo
return jsonify({'msg': 'Token inválido!'}), 401
return f(*args, **kwargs)
return wrapper
def get_influxdb_config(domain, mysql):
# Consultar o banco de dados para obter o nome da empresa com base no domínio
cur = mysql.connection.cursor()
cur.execute("SELECT nome FROM empresa WHERE dominio = %s", (domain,))
empresa_result = cur.fetchone()
cur.close()
# Verifica se a empresa foi encontrada
if empresa_result is None:
raise ValueError('Empresa não encontrada no banco de dados')
# Obtém o nome da empresa (empresa_result é uma tupla)
nome_empresa = empresa_result[0]
# Definir o caminho do diretório onde os arquivos JSON estão armazenados
json_directory = './json/' # Substitua pelo caminho real do diretório
# Nome do arquivo JSON correspondente ao nome da empresa
json_filename = f"{nome_empresa}.json"
# Caminho completo para o arquivo JSON
json_filepath = os.path.join(json_directory, json_filename)
# Verificar se o arquivo JSON existe
if not os.path.isfile(json_filepath):
raise FileNotFoundError('Arquivo JSON não encontrado para a empresa fornecida')
# Carregar o conteúdo do arquivo JSON mantendo a ordem
with open(json_filepath, 'r') as json_file:
json_data = json.load(json_file, object_pairs_hook=OrderedDict)
return json_data # Retorna todo o JSON carregado
@app.route('/login', methods=['POST'])
@csrf.exempt # Endpoints de autenticação como este podem ser excluídos da proteção CSRF, remova se quiser aplicar.
def login():
data = request.get_json()
username_full = data.get('username') # Extrai o e-mail completo do usuário dos dados da requisição.
password = data.get('password') # Extrai a senha dos dados da requisição.
# Verificação básica para detecção de padrões de SQL Injection
sql_injection_pattern = r"(--|\b(SELECT|UNION|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b|;|'|\")"
# Checar se o username_full ou o password contêm padrões suspeitos
if re.search(sql_injection_pattern, username_full, re.IGNORECASE) or re.search(sql_injection_pattern, password, re.IGNORECASE):
return jsonify({'msg': 'A solicitação foi bem formada, mas não pôde ser atendida devido a erros semânticos.'}), 422 # Retorna o status 422
# Validação do formato do e-mail
if not re.match(r"[^@]+@[^@]+\.[^@]+", username_full):
return jsonify({'msg': 'Formato de e-mail inválido'}), 400
# Extrair o nome de usuário (parte antes do @)
username = username_full.split('@')[0]
# Separar o domínio
domain = username_full.split('@')[1]
# Verificar se o domínio existe na tabela empresa
cur = mysql.connection.cursor()
cur.execute("SELECT idempresa, ip_dominio FROM empresa WHERE dominio = %s", (domain,))
empresa_result = cur.fetchone()
if empresa_result is None: # Verifica se o domínio não existe na tabela empresa.
cur.close()
return jsonify({'msg': 'Domínio não encontrado no banco de dados'}), 404
id_empresa, ip_dominio = empresa_result # Armazena o ID da empresa e o IP do domínio encontrados no banco de dados.
# Verificar se o usuário está associado à empresa no MySQL usando a view
cur.execute("SELECT empresa_id, dominio_empresa FROM view_usuario_empresa WHERE usuario = %s AND dominio_empresa = %s", (username, domain))
result = cur.fetchone()
cur.close()
if result is None: # Verifica se o usuário não está associado à empresa.
return jsonify({'msg': 'Usuário não associado à empresa'}), 404
empresa_id, dominio_empresa = result
# Conectar ao servidor LDAP correspondente e autenticar o usuário
ldap_server = f'ldap://{ip_dominio}:389'
try:
# Configurar o servidor LDAP
server = ldap3.Server(ldap_server)
# Tentar criar a conexão
conn = ldap3.Connection(server, user=username_full, password=password)
# Tentar autenticar o usuário
if conn.bind():
# Gerar o token JWT após autenticação bem-sucedida
token = jwt.encode({
'user': username_full,
'exp': datetime.utcnow() + timedelta(hours=1) # Token expira em 1 hora
}, app.config['SECRET_KEY'], algorithm="HS256")
return jsonify({'msg': 'Login bem-sucedido', 'token': token}), 200
else:
return jsonify({'msg': 'Falha na autenticação LDAP'}), 401
except LDAPSocketOpenError:
# Erro específico para problemas de conexão com o servidor LDAP
return jsonify({'msg': 'Não foi possível conectar ao servidor LDAP. Verifique a conexão de rede e as configurações do servidor.'}), 503
except LDAPException as e:
# Captura qualquer outra exceção LDAP genérica
return jsonify({'Erro 500': f'Erro LDAP: {str(e)}'}), 500
@app.route('/mounting', methods=['GET'])
@token_required
def mounting():
# Decodifica o token para obter o e-mail do usuário
token = request.headers.get('x-access-token')
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
username_full = data['user']
# Extrair o domínio do email do usuário autenticado
domain = username_full.split('@')[1]
# Verifica se o usuário está autenticado
if not username_full:
return jsonify({'msg': 'Token inválido!'}), 401
# Consultar o banco de dados para obter o nome da empresa com base no domínio
cur = mysql.connection.cursor()
cur.execute("SELECT nome FROM empresa WHERE dominio = %s", (domain,))
empresa_result = cur.fetchone()
cur.close()
# Verifica se a empresa foi encontrada
if empresa_result is None:
return jsonify({'msg': 'Empresa não encontrada no banco de dados para o domínio fornecido'}), 404
# Obtém o nome da empresa
nome_empresa = empresa_result[0]
# Definir o caminho do diretório onde os arquivos JSON estão armazenados
json_directory = './json/' # Substitua pelo caminho real do diretório
# Nome do arquivo JSON correspondente ao nome da empresa
json_filename = f"{nome_empresa}.json"
# Caminho completo para o arquivo JSON
json_filepath = os.path.join(json_directory, json_filename)
# Verificar se o arquivo JSON existe
if not os.path.isfile(json_filepath):
return jsonify({'msg': 'Arquivo JSON não encontrado para a empresa fornecida'}), 404
# Carregar o conteúdo do arquivo JSON mantendo a ordem
with open(json_filepath, 'r') as json_file:
json_data = json.load(json_file, object_pairs_hook=OrderedDict)
# Extrair apenas o conteúdo de 'info_html'
info_html_content = json_data.get('info_html', {})
# Retornar a resposta JSON com apenas o conteúdo de 'info_html'
return jsonify(info_html_content)
@app.route('/api/options', methods=['GET'])
@token_required
@csrf.exempt
def get_options():
client = None
try:
# Decodifica o token para obter o e-mail do usuário
token = request.headers.get('x-access-token')
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
username_full = data['user']
# Extrair o domínio do email do usuário autenticado
domain = username_full.split('@')[1]
# Carregar as configurações do InfluxDB a partir do JSON
json_data = get_influxdb_config(domain, mysql)
# Extrair o token de 'config_influxdb'
config_influxdb = json_data.get('config_influxdb', [])
if not config_influxdb or not isinstance(config_influxdb, list):
return jsonify({'msg': 'config_influxdb não encontrado ou inválido'}), 404
# Acessa o primeiro objeto da lista e obtém os detalhes necessários
influxdb_token = config_influxdb[0].get('Token', '')
influxdb_org = config_influxdb[0].get('org', '')
influxdb_url = config_influxdb[0].get('url', '')
# Verifica se todos os dados necessários foram encontrados
if not influxdb_token:
return jsonify({'msg': 'Token não encontrado em config_influxdb'}), 404
if not influxdb_org:
return jsonify({'msg': 'Org não encontrado em config_influxdb'}), 404
if not influxdb_url:
return jsonify({'msg': 'URL não encontrado em config_influxdb'}), 404
# Inicializa o cliente InfluxDB
client = influxdb_client.InfluxDBClient(url=influxdb_url, token=influxdb_token, org=influxdb_org)
query_api = client.query_api()
# Consulta para listar os buckets disponíveis
buckets_query = '''
buckets()
'''
buckets_result = query_api.query(org=influxdb_org, query=buckets_query)
# Processa a lista de buckets
buckets = []
for table in buckets_result:
for record in table.records:
bucket_name = record.values.get('name') # Acessa diretamente o nome do bucket
if bucket_name:
buckets.append(bucket_name)
if len(buckets) == 0:
return jsonify({'msg': 'Nenhum bucket encontrado'}), 404
# Para cada bucket, listar os nodenames diretamente
nodenames = {}
for bucket in buckets:
nodenames_query = f"""
from(bucket: \"{bucket}\")
|> range(start: -1d)
|> keep(columns: [\"nodename\"])
|> distinct(column: \"nodename\")
"""
nodenames_result = query_api.query(org=influxdb_org, query=nodenames_query)
bucket_nodenames = []
for table in nodenames_result:
for record in table.records:
if record.get_value() is not None: # Ignorar valores nulos
bucket_nodenames.append(record.get_value())
nodenames[bucket] = bucket_nodenames
# Retorna a lista de buckets e nodenames como resposta JSON
return jsonify({
"buckets": buckets,
"nodenames": nodenames
})
except Exception as e:
return jsonify({'msg': 'Erro ao recuperar opções', 'error': str(e)}), 500
finally:
if client is not None:
client.close()
@app.route('/api/execute_all_queries', methods=['POST'])
@token_required
@csrf.exempt
def execute_all_queries():
client = None
try:
# Decodifica o token para obter o e-mail do usuário
token = request.headers.get('x-access-token')
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
username_full = data['user']
domain = username_full.split('@')[1]
# Carregar as configurações do InfluxDB e queries a partir do JSON
json_data = get_influxdb_config(domain, mysql)
# Extrair as informações da requisição
request_data = request.get_json()
bucket = request_data.get('bucket')
host = request_data.get('host')
start_time = request_data.get('startTime')
end_time = request_data.get('endTime')
window_period = request_data.get('windowPeriod')
# Inicializa o cliente InfluxDB
influxdb_token = json_data['config_influxdb'][0]['Token']
influxdb_org = json_data['config_influxdb'][0]['org']
influxdb_url = json_data['config_influxdb'][0]['url']
client = influxdb_client.InfluxDBClient(url=influxdb_url, token=influxdb_token, org=influxdb_org)
query_api = client.query_api()
# Lista para armazenar os resultados de cada query
all_results = []
# Itera sobre todas as queries no JSON
for query_data in json_data['queries']:
# Substitui as variáveis na query
query_template = query_data['query']
query = query_template.replace("${Bucket}", bucket).replace("${server}", host)
query = query.replace("v.timeRangeStart", start_time).replace("v.timeRangeStop", end_time)
query = query.replace("v.windowPeriod", window_period)
# Executa a query
result = query_api.query(org=influxdb_org, query=query)
# Processa os resultados da query
output = []
for table in result:
for record in table.records:
output.append({
"time": record.get_time(),
"value": record.get_value(),
"measurement": record.get_measurement(),
"tags": record.values # Inclui as tags relacionadas ao registro
})
# Adiciona os resultados processados à lista de todas as queries
all_results.append({
"uid": query_data['uid'],
"name": query_data['query_name'],
"result": output
})
# Retorna o resultado de todas as queries
return jsonify(all_results)
except Exception as e:
return jsonify({'msg': 'Erro ao executar as queries', 'error': str(e)}), 500
finally:
if client:
client.close()
@app.route('/api/execute_query/<uid>', methods=['POST'])
@token_required
@csrf.exempt
def execute_query(uid):
client = None
try:
# Decodifica o token para obter o e-mail do usuário
token = request.headers.get('x-access-token')
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
username_full = data['user']
domain = username_full.split('@')[1]
# Carregar as configurações de InfluxDB e queries a partir do JSON
json_data = get_influxdb_config(domain, mysql)
# Extrair as informações da requisição
request_data = request.get_json()
bucket = request_data.get('bucket')
host = request_data.get('host')
start_time = request_data.get('startTime')
end_time = request_data.get('endTime')
window_period = request_data.get('windowPeriod')
# Encontra a query específica com o UID fornecido
query_data = next((q for q in json_data['queries'] if q['uid'] == uid), None)
if not query_data:
return jsonify({'msg': 'Consulta não encontrada'}), 404
# Substitui as variáveis na query
query_template = query_data['query']
query = query_template.replace("${Bucket}", bucket).replace("${server}", host)
query = query.replace("v.timeRangeStart", start_time).replace("v.timeRangeStop", end_time)
query = query.replace("v.windowPeriod", window_period)
# Inicializa o cliente InfluxDB
influxdb_token = json_data['config_influxdb'][0]['Token']
influxdb_org = json_data['config_influxdb'][0]['org']
influxdb_url = json_data['config_influxdb'][0]['url']
client = influxdb_client.InfluxDBClient(url=influxdb_url, token=influxdb_token, org=influxdb_org)
query_api = client.query_api()
# Executa a query
result = query_api.query(org=influxdb_org, query=query)
# Processa os resultados da query
output = []
for table in result:
for record in table.records:
output.append({
"time": record.get_time(),
"value": record.get_value(),
"measurement": record.get_measurement(),
"tags": record.values # Inclui as tags relacionadas ao registro
})
# Retorna o resultado da consulta específica
return jsonify({
"uid": query_data['uid'],
"name": query_data['query_name'],
"result": output
})
except Exception as e:
return jsonify({'msg': 'Erro ao executar a query', 'error': str(e)}), 500
finally:
if client:
client.close()
# Função para buscar as configurações dos servidores Zabbix do banco de dados
# Ajustar o cursor para usar DictCursor
def get_zabbix_configs():
cursor = mysql.connection.cursor(DictCursor) # Usando DictCursor para retornar dicionários
query = "SELECT * FROM connections_zabbix"
cursor.execute(query)
configs = cursor.fetchall() # Agora retorna uma lista de dicionários
cursor.close()
return configs
# Função para autenticar no Zabbix via API JSON-RPC
def zabbix_login(url, username, password):
headers = {'Content-Type': 'application/json-rpc'}
payload = {
"jsonrpc": "2.0",
"method": "user.login",
"params": {
"user": username,
"password": password
},
"id": 1,
"auth": None
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
result = response.json()
if 'result' in result:
return result['result'] # Retorna o token de autenticação
else:
raise Exception(f"Erro na autenticação: {result}")
def get_zabbix_host_groups(auth_token, url):
headers = {'Content-Type': 'application/json-rpc'}
payload = {
"jsonrpc": "2.0",
"method": "hostgroup.get",
"params": {
"output": ["groupid", "name"], # Retorna o ID e o nome dos grupos
"real_hosts": True # Apenas grupos com hosts reais, excluindo templates
},
"auth": auth_token,
"id": 1
}
# Faz a requisição para a API do Zabbix
response = requests.post(url, headers=headers, data=json.dumps(payload))
# Captura e retorna o resultado da resposta
return response.json().get("result", [])
# Função para buscar os hosts de um grupo no Zabbix
def get_zabbix_hosts(auth_token, url, group_id):
headers = {'Content-Type': 'application/json-rpc'}
payload = {
"jsonrpc": "2.0",
"method": "host.get",
"params": {
"groupids": group_id,
"output": ["hostid", "name"] # Retorna o ID e o nome dos hosts
},
"auth": auth_token,
"id": 2
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
return response.json().get("result", [])
# Função para buscar os itens de um host no Zabbix
def get_zabbix_items(auth_token, url, host_id):
headers = {'Content-Type': 'application/json-rpc'}
payload = {
"jsonrpc": "2.0",
"method": "item.get",
"params": {
"hostids": host_id,
"output": ["itemid", "name", "key_"] # Retorna o ID, nome e chave dos itens
},
"auth": auth_token,
"id": 3
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
return response.json().get("result", [])
# Função para buscar o valor de um item específico no Zabbix
def get_item_data(auth_token, url, item_id):
headers = {'Content-Type': 'application/json-rpc'}
payload = {
"jsonrpc": "2.0",
"method": "history.get",
"params": {
"output": "extend",
"history": 0, # Tipo 0 para dados numéricos
"itemids": item_id,
"sortfield": "clock", # Ordena pelos mais recentes
"sortorder": "DESC",
"limit": 10 # Limite de 10 resultados
},
"auth": auth_token,
"id": 4
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
return response.json().get("result", [])
@app.route('/zabbix/servers', methods=['GET'])
@token_required
def get_zabbix_servers():
try:
# Busca as configurações do banco de dados
zabbix_configs = get_zabbix_configs() # Certifique-se de que essa função está retornando dados válidos
# Verifique se não há servidores Zabbix
if not zabbix_configs:
return jsonify({"message": "Nenhum servidor Zabbix encontrado"}), 404
# Formata os servidores para o frontend
servers = []
for config in zabbix_configs:
servers.append({
"name": config['name'], # Acessa a coluna 'name'
"type": config['type'], # Acessa a coluna 'type'
"url": config['url'] # Acessa a coluna 'url'
})
return jsonify(servers)
except Exception as e:
# Mostra o erro exato que está ocorrendo para facilitar o debug
return jsonify({"error": str(e)}), 500
@app.route('/zabbix/<server_name>/groups', methods=['GET'])
@token_required
def get_host_groups(server_name):
try:
# Busca as configurações do banco de dados para o servidor Zabbix com o nome "server_name"
zabbix_configs = get_zabbix_configs()
config = next((conf for conf in zabbix_configs if conf['name'] == server_name), None)
if config is None:
return jsonify({"error": "Servidor Zabbix não encontrado"}), 404
# Autentica no servidor Zabbix
auth_token = zabbix_login(config['url'], config['username'], config['password'])
# Busca os grupos de hosts reais, excluindo os templates
groups = get_zabbix_host_groups(auth_token, config['url'])
return jsonify(groups)
except Exception as e:
return jsonify({"error": str(e)}), 500
# Rota para listar os hosts de um grupo específico
@app.route('/zabbix/<server_name>/groups/<group_id>/hosts', methods=['GET'])
@token_required
def get_hosts_by_group(server_name, group_id):
try:
# Busca as configurações do servidor Zabbix no banco de dados
zabbix_configs = get_zabbix_configs()
config = next((conf for conf in zabbix_configs if conf['name'] == server_name), None)
if config is None:
return jsonify({"error": "Servidor Zabbix não encontrado"}), 404
# Autentica no servidor Zabbix
auth_token = zabbix_login(config['url'], config['username'], config['password'])
# Busca os hosts do grupo
hosts = get_zabbix_hosts(auth_token, config['url'], group_id)
return jsonify(hosts)
except Exception as e:
return jsonify({"error": str(e)}), 500
# Rota para listar os itens de um host específico
@app.route('/zabbix/<server_name>/hosts/<host_id>/items', methods=['GET'])
@token_required
def get_items_by_host(server_name, host_id):
try:
# Busca as configurações do servidor Zabbix no banco de dados
zabbix_configs = get_zabbix_configs()
config = next((conf for conf in zabbix_configs if conf['name'] == server_name), None)
if config is None:
return jsonify({"error": "Servidor Zabbix não encontrado"}), 404
# Autentica no servidor Zabbix
auth_token = zabbix_login(config['url'], config['username'], config['password'])
# Busca os itens do host
items = get_zabbix_items(auth_token, config['url'], host_id)
return jsonify(items)
except Exception as e:
return jsonify({"error": str(e)}), 500
# Rota para retornar os dados de um item específico
@app.route('/zabbix/<server_name>/items/<item_id>/data', methods=['GET'])
@token_required
def get_item_history(server_name, item_id):
try:
# Busca as configurações do servidor Zabbix no banco de dados
zabbix_configs = get_zabbix_configs()
config = next((conf for conf in zabbix_configs if conf['name'] == server_name), None)
if config is None:
return jsonify({"error": "Servidor Zabbix não encontrado"}), 404
# Autentica no servidor Zabbix
auth_token = zabbix_login(config['url'], config['username'], config['password'])
# Busca os dados do item
item_data = get_item_data(auth_token, config['url'], item_id)
return jsonify(item_data)
except Exception as e:
return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
app.run(host="0.0.0.0", port=5000, ssl_context=('./fullchain1.pem', './privkey1.pem'))