# process_log.py # Autor: Gemini (assistente de IA) # Data: 26 de setembro de 2025, 22:08 # Descrição: Script para processamento de logs NGINX. # Versão 14.1 (Correção Final): # - Removido filtro 'Tipo de Cliente'. Agora exibe os gráficos Web e App simultaneamente. # - Corrigido o cabeçalho do relatório para remover permanentemente o caminho do arquivo. # - Lógica de geração de gráficos mantida, apenas a exibição no HTML foi corrigida. import csv import pandas as pd import matplotlib.pyplot as plt import matplotlib.dates as mdates import seaborn as sns import io import base64 from datetime import datetime import json import re import numpy as np from scipy.interpolate import make_interp_spline # --- CONFIGURAÇÃO E MAPEAMENTOS --- DAY_MAP_PT = {'Monday': 'Segunda-feira', 'Tuesday': 'Terça-feira', 'Wednesday': 'Quarta-feira', 'Thursday': 'Quinta-feira', 'Friday': 'Sexta-feira', 'Saturday': 'Sábado', 'Sunday': 'Domingo'} ACTION_CATEGORIES = { 'Ativo': ['Edição de Documento', 'Sincronização (Upload/Download)', 'Upload (Web/Outros)', 'Download/Visualização (Web)', 'Criação de Pasta', 'Exclusão de Arquivo/Pasta', 'Mover/Renomear Arquivo'], 'Online': ['Navegação', 'Atividade em Segundo Plano (Web)'], 'Outros': ['Sincronização (Verificação)', 'Interação Web (Formulários, etc.)', 'Requisição Inválida'] } CATEGORY_COLORS = {"Ativo": "#3498db", "Online": "#2ecc71", "Outros": "#f1c40f"} DETAILED_LEGEND_HTML = f"""

Legenda das Categorias:

""" # --- FUNÇÕES DE ANÁLISE E INFERÊNCIA --- def get_action_details(req_method: str, uri: str, user_agent: str) -> tuple[str, str, str]: ua = user_agent.lower() action, client_type = 'Navegação', 'Web' if 'mirall' in ua or 'nextcloud-desktop' in ua: client_type = 'App'; action = 'Sincronização (Verificação)' if req_method in ['PUT', 'GET']: action = 'Sincronização (Upload/Download)' elif 'nextcloud-android' in ua or 'nextcloud-ios' in ua: client_type = 'App' elif req_method == 'POST' and '/we/' in uri: action = 'Edição de Documento' elif req_method == 'PUT': action = 'Upload (Web/Outros)' elif req_method == 'GET' and '/remote.php/dav/files/' in uri: action = 'Download/Visualização (Web)' elif req_method == 'DELETE': action = 'Exclusão de Arquivo/Pasta' elif req_method == 'MKCOL': action = 'Criação de Pasta' elif req_method == 'MOVE': action = 'Mover/Renomear Arquivo' elif req_method == 'GET' and '/ocs/v2.php' in uri: action = 'Atividade em Segundo Plano (Web)' elif req_method == 'POST': action = 'Interação Web (Formulários, etc.)' elif req_method not in ['GET', 'HEAD']: action = f'Outra ({req_method})' for category, actions in ACTION_CATEGORIES.items(): if action in actions: return action, category, client_type return action, 'Outros', client_type def parse_line(row: list) -> dict | None: try: if len(row) < 13: return None timestamp = datetime.fromisoformat(row[0][:-6]) user_agent, request = row[2], row[9] req_method = request.split(' ')[0] uri = request.split(' ')[1] if len(request.split(' ')) > 1 else '' action, category, client_type = get_action_details(req_method, uri, user_agent) return { 'timestamp': timestamp, 'username': (re.search(r'nc_username=([^;]+)', row[3]).group(1).strip() if 'nc_username' in row[3] else None), 'category': category, 'client_type': client_type } except (ValueError, TypeError, IndexError): return None def generate_activity_scatter_chart(df: pd.DataFrame) -> str: print("Gerando gráfico de dispersão...") if df.empty: return "" scatter_data = df.groupby([df['timestamp'].dt.floor('H'), 'category']).size().reset_index(name='counts') fig, ax = plt.subplots(figsize=(12, 6)) max_count = scatter_data['counts'].max() size_factor = 2000 / max_count if max_count > 0 else 1 for category, color in CATEGORY_COLORS.items(): subset = scatter_data[scatter_data['category'] == category] if not subset.empty: ax.scatter(subset['timestamp'], subset['category'], s=subset['counts'] * size_factor, c=color, label=category, alpha=0.6, edgecolors="w", linewidth=0.5) ax.set_title('Nuvens de Atividade por Categoria e Tempo', fontsize=16) ax.xaxis.set_major_formatter(mdates.DateFormatter('%d/%m %Hh')) ax.grid(axis='x', linestyle='--', alpha=0.5); ax.legend(title='Categorias', bbox_to_anchor=(1.05, 1), loc='upper left', markerscale=0.5) plt.tight_layout(); img_buffer = io.BytesIO() plt.savefig(img_buffer, format='png', dpi=100); plt.close(fig); img_buffer.seek(0) return base64.b64encode(img_buffer.getvalue()).decode('utf-8') def generate_visualizations(df: pd.DataFrame) -> tuple[str, dict, list]: print("Gerando visualizações de usuário...") if df.empty: return "", {}, [] df['day_dt'] = pd.to_datetime(df['timestamp'].dt.date); df['hour'] = df['timestamp'].dt.hour df['weekday'] = df['day_dt'].dt.day_name().map(DAY_MAP_PT); all_weekdays = list(DAY_MAP_PT.values()) overall_heatmap_b64, agg_heatmaps, daily_timelines = "", {}, [] heatmap_data = df.groupby(['weekday', 'hour']).size().unstack(fill_value=0).reindex(index=all_weekdays, columns=range(24), fill_value=0) fig, ax = plt.subplots(figsize=(12, 5)); sns.heatmap(heatmap_data, annot=False, cmap='viridis', ax=ax, cbar_kws={'label': 'Nº de Ações'}) ax.set_title('Padrão de Atividade Agregado (Todos os Colaboradores)', fontsize=12); plt.tight_layout(); img_buffer = io.BytesIO() plt.savefig(img_buffer, format='png', dpi=90); plt.close(fig); img_buffer.seek(0) overall_heatmap_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8') for user, user_df in df.groupby('username'): user_agg_data = user_df.groupby(['weekday', 'hour']).size().unstack(fill_value=0).reindex(index=all_weekdays, columns=range(24), fill_value=0) fig, ax = plt.subplots(figsize=(12, 5)); sns.heatmap(user_agg_data, annot=False, cmap='viridis', ax=ax, cbar_kws={'label': 'Nº de Ações'}) ax.set_title(f'Padrão de Atividade Agregado - {user}', fontsize=12); plt.tight_layout(); img_buffer_agg = io.BytesIO() plt.savefig(img_buffer_agg, format='png', dpi=90); plt.close(fig); img_buffer_agg.seek(0) agg_heatmaps[user] = base64.b64encode(img_buffer_agg.getvalue()).decode('utf-8') for (user, day), group in df.groupby(['username', df['day_dt'].dt.date]): for client_type in ['Web', 'App']: data = group[group['client_type'] == client_type].groupby(['hour', 'category']).size().unstack(fill_value=0).reindex(columns=ACTION_CATEGORIES.keys(), fill_value=0).reindex(index=range(24), fill_value=0) if not data.empty and data.sum().sum() > 0: fig, ax = plt.subplots(figsize=(10, 4)) for category in data.columns: x, y = data.index, data[category] if y.sum() > 0: x_smooth = np.linspace(x.min(), x.max(), 300); spl = make_interp_spline(x, y, k=3); y_smooth = spl(x_smooth) ax.plot(x_smooth, y_smooth, color=CATEGORY_COLORS.get(category), label=category) ax.plot(x, y, 'o', color=CATEGORY_COLORS.get(category), markersize=4) ax.set_title(f'Timeline de Atividade {client_type.upper()} - {user} em {day.strftime("%d/%m/%Y")}', fontsize=12) ax.set_xlabel('Hora do Dia'); ax.set_ylabel('Intensidade (Nº de Ações)'); ax.grid(axis='y', linestyle='--', alpha=0.7); ax.legend(title='Categorias', loc='upper left') ax.set_xticks(range(0, 25, 2)); plt.tight_layout(); img_buffer_daily = io.BytesIO(); plt.savefig(img_buffer_daily, format='png', dpi=100); plt.close(fig); img_buffer_daily.seek(0) daily_timelines.append({"user": user, "day": day.isoformat(), "client_type": client_type, "chart_b64": base64.b64encode(img_buffer_daily.getvalue()).decode('utf-8')}) return overall_heatmap_b64, agg_heatmaps, daily_timelines def main(log_file_path: str): print(f"Iniciando processamento do arquivo: {log_file_path}") all_log_entries = [] total_lines = 0 with open(log_file_path, 'r', encoding='utf-8', errors='ignore') as f: reader = csv.reader(f, delimiter=',', quotechar='"') try: next(reader); total_lines = 1 except StopIteration: pass for row in reader: total_lines += 1; parsed = parse_line(row) if parsed and parsed['username']: all_log_entries.append(parsed) df = pd.DataFrame(all_log_entries) parsed_lines = len(df); print(f"\n--- Processamento Concluído ---\nLinhas Lidas: {total_lines:,} | Requisições Válidas com Usuário: {parsed_lines:,}") scatter_chart_b64, overall_heatmap_b64, agg_heatmaps, daily_timelines = "", "", {}, [] if not df.empty: scatter_chart_b64 = generate_activity_scatter_chart(df) overall_heatmap_b64, agg_heatmaps, daily_timelines = generate_visualizations(df) generate_html_report(scatter_chart_b64, overall_heatmap_b64, agg_heatmaps, daily_timelines, total_lines, parsed_lines) def generate_html_report(scatter_chart_b64, overall_heatmap_b64, agg_heatmaps, daily_timelines, total_lines, parsed_lines): print("\nGerando relatório HTML final...") activity_map = {}; user_daily_html_parts = [] for item in daily_timelines: activity_map.setdefault(item['user'], []).append(item['day']) user_agg_html = "".join([f'' for user, b64 in agg_heatmaps.items()]) for item in daily_timelines: user_daily_html_parts.append(f'') user_daily_html = "".join(user_daily_html_parts) generation_time = datetime.now().strftime('%d de %B de %Y, %H:%M:%S') html_template = f""" Painel de Atividade

Painel de Atividade nos Sistemas

Gerado em: {generation_time}

Resumo de Atividade

Gráfico de Dispersão

Análise de Atividade Individual

Heatmap Geral
{user_agg_html}{user_daily_html}
{DETAILED_LEGEND_HTML}
""" with open('painel_de_atividade_final.html', 'w', encoding='utf-8') as f: f.write(html_template) print("\nRelatório final 'Painel de Atividade' salvo com sucesso!") if __name__ == "__main__": LOG_FILE = r'C:\Users\joao.goncalves\Documents\Relatorio FDP\nginx_logs_final.csv' main(LOG_FILE)