# -*- coding: utf-8 -*- import os import json import re import ipaddress from datetime import datetime, timedelta, date import pandas as pd import plotly.express as px import plotly.graph_objects as go from pyzabbix import ZabbixAPI import time import locale import argparse # --- FUNÇÕES AUXILIARES --- def conectar_zabbix(server_url, api_token): """Conecta-se à API do Zabbix.""" try: zapi = ZabbixAPI(server_url) zapi.login(api_token=api_token) print(f"Conectado com sucesso à API do Zabbix na versão: {zapi.api_version()}") return zapi except Exception as e: print(f"Erro ao conectar ao Zabbix: {e}") return None def buscar_dados_zabbix(zapi, host_name, item_name, target_date): """Busca o histórico do item para uma data específica.""" try: host = zapi.host.get(filter={"host": host_name}, output=["hostid"]) if not host: print(f"Erro: Host '{host_name}' não encontrado.") return None hostid = host[0]['hostid'] item = zapi.item.get(filter={"name": item_name, "hostid": hostid}, output=["itemid", "value_type"]) if not item: print(f"Erro: Item '{item_name}' não encontrado no host '{host_name}'.") return None itemid = item[0]['itemid'] value_type = int(item[0]['value_type']) all_history = [] print(f"Iniciando busca de histórico para o dia {target_date.strftime('%Y-%m-%d')}...") for hora in range(24): inicio_hora = datetime.combine(target_date, datetime.min.time()) + timedelta(hours=hora) fim_hora = inicio_hora + timedelta(hours=1) - timedelta(seconds=1) time_from, time_till = int(inicio_hora.timestamp()), int(fim_hora.timestamp()) history_chunk = zapi.history.get( itemids=[itemid], time_from=time_from, time_till=time_till, history=value_type, output='extend', sortfield='clock', sortorder='ASC' ) if history_chunk: all_history.extend(history_chunk) time.sleep(0.1) print(f"Busca finalizada. {len(all_history)} registros de log encontrados.") return all_history except Exception as e: print(f"Erro ao buscar dados do Zabbix: {e}") return None def parsear_logs(history_data): """Analisa os dados brutos de log e extrai informações detalhadas.""" parsed_data = [] if not history_data: return pd.DataFrame() print("Analisando logs para extrair insights...") for record in history_data: log_content = record['value'].strip() last_brace = log_content.rfind('}') if last_brace != -1: log_content = log_content[:last_brace+1] match = re.search(r'\{.*\}', log_content) if not match: continue try: log_json = json.loads(match.group(0)) user = log_json.get('remote_user') or (re.search(r'nc_username=([^;]+)', log_json.get('http_cookie', '')).group(1) if 'nc_username=' in log_json.get('http_cookie', '') else None) if user and re.match(r'^[a-zA-Z0-9._-]+$', user): parsed_data.append({ 'remote_user': re.sub(r'@.*$', '', user.lower()), 'request_time': float(log_json.get('request_time', 0)), 'status': int(log_json.get('status', 0)), 'request_uri': log_json.get('request_uri', '/') }) except (json.JSONDecodeError, AttributeError, ValueError, TypeError): continue if not parsed_data: return pd.DataFrame() return pd.DataFrame(parsed_data) def gerar_relatorio_html(df, dia_relatorio_ymd): """Gera o relatório HTML final com os insights de performance e acesso.""" if df.empty: return f"
Nenhuma atividade válida foi encontrada.
" dia_obj = datetime.strptime(dia_relatorio_ymd, '%Y-%m-%d') dia_relatorio_display = dia_obj.strftime('%d de %B de %Y').capitalize() # --- Cálculos dos KPIs --- office_pattern = r'^/(?:m|x|we|o|p|wv|op|wd|rtc|rtc2|layouts|view)/' is_office_request = df['request_uri'].str.contains(office_pattern, na=False, regex=True) df_office = df[is_office_request] df_cloud = df[~is_office_request] avg_total = df['request_time'].mean() avg_office = df_office['request_time'].mean() if not df_office.empty else 0 avg_cloud = df_cloud['request_time'].mean() if not df_cloud.empty else 0 # Tiers de Velocidade Granulares req_imediata = df[df['request_time'] < 0.05].shape[0] req_rapida = df[(df['request_time'] >= 0.05) & (df['request_time'] < 0.2)].shape[0] req_aceitavel = df[(df['request_time'] >= 0.2) & (df['request_time'] < 0.5)].shape[0] req_lenta = df[(df['request_time'] >= 0.5) & (df['request_time'] < 2.0)].shape[0] req_muito_lenta = df[df['request_time'] >= 2.0].shape[0] # Análise de Erros 4xx df_4xx = df[df['status'].between(400, 499)] total_4xx = len(df_4xx) total_requests = len(df) erros_404 = len(df_4xx[df_4xx['status'] == 404]) erros_403 = len(df_4xx[df_4xx['status'] == 403]) outros_4xx = total_4xx - erros_404 - erros_403 top_404_uris = df_4xx[df_4xx['status'] == 404]['request_uri'].value_counts().head(5).reset_index() top_404_uris.columns = ['request_uri', 'count'] top_403_uris = df_4xx[df_4xx['status'] == 403]['request_uri'].value_counts().head(5).reset_index() top_403_uris.columns = ['request_uri', 'count'] top_outros_4xx_uris = df_4xx[~df_4xx['status'].isin([403, 404])]['request_uri'].value_counts().head(5).reset_index() top_outros_4xx_uris.columns = ['request_uri', 'count'] # --- Gráficos --- # Gráfico de Velocidade (Pizza com 5 tiers) speed_data = pd.DataFrame({ 'Categoria': ['Imediata (<50ms)', 'Rápida (50-200ms)', 'Aceitável (200-500ms)', 'Lenta (0.5-2s)', 'Muito Lenta (>2s)'], 'Count': [req_imediata, req_rapida, req_aceitavel, req_lenta, req_muito_lenta] }) fig_speed = px.pie(speed_data, values='Count', names='Categoria', title='Classificação de Velocidade', color_discrete_map={ 'Imediata (<50ms)': '#10B981', 'Rápida (50-200ms)': '#84cc16', 'Aceitável (200-500ms)': '#F59E0B', 'Lenta (0.5-2s)': '#f97316', 'Muito Lenta (>2s)': '#EF4444' }, category_orders={'Categoria': ['Imediata (<50ms)', 'Rápida (50-200ms)', 'Aceitável (200-500ms)', 'Lenta (0.5-2s)', 'Muito Lenta (>2s)']} ) fig_speed.update_traces(textposition='inside', textinfo='percent', sort=False, hovertemplate='%{label}{dia_relatorio_display}
Média Total
{avg_total:.3f}s
Média Cloud (Genérico)
{avg_cloud:.3f}s
Média Office Online
{avg_office:.3f}s
Não Encontrado (404)
{erros_404}
Acesso Bloqueado (403)
{erros_403}
Outros Erros 4xx
{outros_4xx}
| URI | Count |
|---|
| URI | Count |
|---|
| URI | Count |
|---|