#!/usr/bin/env python3 import os import sys import argparse import subprocess import socket import shutil import platform import tarfile import time from datetime import datetime # Tenta importar syslog (Apenas Linux) try: import syslog SYSLOG_AVAILABLE = True except ImportError: SYSLOG_AVAILABLE = False # Tenta importar paramiko (Apenas para Cliente Windows) try: import paramiko PARAMIKO_AVAILABLE = True except ImportError: PARAMIKO_AVAILABLE = False # ============================================================================== # CONFIGURAÇÕES TÉCNICAS # ============================================================================== # Credenciais SERVER_HOST = "172.17.0.253" SERVER_USER = "itguys" PASSWORD = "vR7Ag$Pk" # Caminhos (Linux Server) NGINX_CONF_DIR = "/etc/nginx" FAIL2BAN_CONF_DIR = "/etc/fail2ban" TMP_SYNC_BASE = "/tmp/pathfinder_sync" LOG_DIR = "/var/log/nginx" # Endereço IP Público do Host de Produção (para validação DNS) HOST_PUBLIC_IP = "177.104.182.28" # ============================================================================== # UTILITÁRIOS DE SISTEMA E AUDITORIA (SERVER-SIDE) # ============================================================================== def log_syslog(task, function, details=""): """Registra a ação no Syslog para auditoria (Apenas Linux).""" if not SYSLOG_AVAILABLE: print(f"[*] [SYSLOG_MOCK] {task} | {function} | {details}") return try: hostname = socket.gethostname() remote_ip = os.environ.get('SSH_CLIENT', 'localhost').split()[0] identity = f"PathfinderAutomator[{task}]" msg = f"Task: {task} | Func: {function} | From: {remote_ip} | Host: {hostname} | Details: {details}" syslog.openlog(ident=identity, facility=syslog.LOG_AUTHPRIV) syslog.syslog(syslog.LOG_INFO, msg) print(f"[*] [SYSLOG] {msg}") except Exception as e: print(f"[!] Erro ao registrar no syslog: {e}") def run_sudo(cmd, input_data=None, capture=True): """Executa comando com sudo e retorna (rc, stdout, stderr).""" full_cmd = ['sudo', '-S'] + cmd p = subprocess.Popen(full_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE if capture else None, stderr=subprocess.PIPE if capture else None, text=True) stdin_input = (PASSWORD + '\n') if input_data: stdin_input += input_data out, err = p.communicate(input=stdin_input) return p.returncode, out, err def check_nginx(): """Valida a sintaxe do Nginx.""" rc, out, err = run_sudo(['nginx', '-t']) return rc == 0, err # ============================================================================== # LÓGICA DE BACKUP E ROLLBACK ATÔMICO (SERVER-SIDE) # ============================================================================== BACKUP_MAP = {} # Rastreia arquivos alterados para rollback def backup_file(target_path): """Cria um backup .pathfinder_bak antes de qualquer alteração.""" if not os.path.exists(target_path): return None bak_path = f"{target_path}.pathfinder_bak" log_syslog("BACKUP", "backup_file", f"Backup de {target_path} para {bak_path}") rc, _, _ = run_sudo(['cp', '-rp', target_path, bak_path]) if rc == 0: BACKUP_MAP[target_path] = bak_path return bak_path return None def rollback_all(): """Restaura todos os backups registrados caso ocorra um erro.""" log_syslog("ROLLBACK", "rollback_all", "Iniciando restauração de emergência.") for target, bak in BACKUP_MAP.items(): print(f"[!] Restaurando {target}...") run_sudo(['rm', '-rf', target]) run_sudo(['cp', '-rp', bak, target]) print("[*] Rollback concluído. Testando Nginx novamente...") check_nginx() # ============================================================================== # AUXILIARES DE REDE (DNS/IP/SSL) (SERVER-SIDE) # ============================================================================== def get_public_ip(): """Tenta descobrir o IP público deste host.""" global HOST_PUBLIC_IP if HOST_PUBLIC_IP: return HOST_PUBLIC_IP try: import urllib.request with urllib.request.urlopen('https://api.ipify.org', timeout=5) as response: HOST_PUBLIC_IP = response.read().decode('utf-8') return HOST_PUBLIC_IP except: return "127.0.0.1" def validate_dns(domain): """Verifica se o domínio aponta para este host.""" my_ip = get_public_ip() try: domain_ip = socket.gethostbyname(domain) if domain_ip == my_ip: return True, domain_ip return False, domain_ip except socket.gaierror: return False, "Não resolvido" def setup_ssl(domain): """Executa o Certbot e configura o reload-hook para syslog.""" log_syslog("SSL", "setup_ssl", f"Iniciando Certbot para {domain}") cert_cmd = [ 'certbot', 'certonly', '--webroot', '-w', '/var/lib/letsencrypt/', '-d', domain, '--non-interactive', '--agree-tos', '--email', 'admin@' + domain, '--deploy-hook', f'logger -t CertbotRenewal "SSL Renewed for {domain} - Restarting Nginx" && systemctl reload nginx' ] rc, out, err = run_sudo(cert_cmd) if rc == 0: log_syslog("SSL", "setup_ssl", f"Sucesso ao emitir certificado para {domain}") return True else: log_syslog("SSL", "setup_ssl", f"FALHA ao emitir certificado para {domain}: {err}") print(f"[!] Erro Certbot: {err}") return False # ============================================================================== # CLIENTE WINDOWS (REMOTE DEPLOY) # ============================================================================== def create_tarball(source_dirs, output_filename="deploy_package.tar.gz"): """Cria um tar.gz dos diretórios selecionados.""" print(f"[*] Criando pacote de deploy: {output_filename}") with tarfile.open(output_filename, "w:gz") as tar: for folder in source_dirs: if os.path.exists(folder): print(f" - Adicionando pasta raiz: {folder}") tar.add(folder, arcname=os.path.basename(folder), filter=lambda x: (print(f" + {x.name}") or x)) else: print(f"[!] AVISO: Pasta não encontrada: {folder}") def windows_deploy(args): """Orquestra o deploy remoto a partir do Windows.""" if not PARAMIKO_AVAILABLE: print("[!] Erro: Biblioteca 'paramiko' não instalada.") print(" Execute: pip install paramiko") sys.exit(1) print("="*60) print(f"[*] PATHFINDER REMOTE DEPLOYER - Target: {SERVER_HOST}") print("="*60) # 1. Definir caminhos locais relativos à raiz do projeto # Assume que o script está sendo rodado da raiz ou de producao/scripts base_dir = os.getcwd() if os.path.basename(base_dir) == "scripts": base_dir = os.path.dirname(os.path.dirname(base_dir)) # Sobe para raiz elif os.path.basename(base_dir) == "producao": base_dir = os.path.dirname(base_dir) # Sobe para raiz prod_dir = os.path.join(base_dir, "producao") nginx_dir = os.path.join(prod_dir, "nginx") scripts_dir = os.path.join(prod_dir, "scripts") if not os.path.exists(nginx_dir): print(f"[!] Erro: Não encontrei a pasta 'nginx' em {prod_dir}") sys.exit(1) # 2. Empacotar arquivos tar_name = "pathfinder_deploy.tar.gz" create_tarball([nginx_dir, scripts_dir], tar_name) # 3. Conexão SSH print(f"[*] Conectando em {SERVER_HOST} como {SERVER_USER}...") try: ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(SERVER_HOST, username=SERVER_USER, password=PASSWORD) # 4. Upload SFTP print(f"[*] Uploading {tar_name} para /tmp/...") sftp = ssh.open_sftp() sftp.put(tar_name, f"/tmp/{tar_name}") sftp.close() # 5. Execução Remota # Reconstrói os argumentos passados para o script local remote_args = " ".join(sys.argv[1:]) if not remote_args: remote_args = "--help" print(f"[*] Executando remotamente: deploy_pathfinder.py {remote_args}") print("-" * 60) # Sequência de comandos remotos (Redirecionando erro para arquivo) remote_cmd = ( f"bash -c 'cd /tmp && " f"tar -xzf {tar_name} && " f"sudo -S -p \"\" python3 scripts/deploy_pathfinder.py {remote_args} > /tmp/deployment_error.log 2>&1'" ) stdin, stdout, stderr = ssh.exec_command(remote_cmd, get_pty=False) # Envia senha para o sudo se solicitado stdin.write(PASSWORD + "\n") stdin.flush() # Stream de saída (stdout) while True: if stdout.channel.recv_ready(): output = stdout.channel.recv(1024).decode('utf-8', errors='replace') print(output, end="") if stdout.channel.exit_status_ready() and not stdout.channel.recv_ready(): break time.sleep(0.1) exit_status = stdout.channel.recv_exit_status() if exit_status != 0: print(f"\n[X] Falha no Deploy Remoto (Exit Code: {exit_status})") print("--- ERRO DETALHADO DO SERVIDOR ---") # Ler arquivo de log remoto _, err_stdout, _ = ssh.exec_command("cat /tmp/deployment_error.log") print(err_stdout.read().decode('utf-8')) print("----------------------------------") else: print(f"\n[+] Deploy Remoto Concluído com Sucesso!") ssh.close() # Limpeza local if os.path.exists(tar_name): os.remove(tar_name) except Exception as e: print(f"\n[!] Erro fatal na conexão SSH: {e}") sys.exit(1) # ============================================================================== # FUNCIONALIDADES DO SCRIPT (SERVER-SIDE LÓGICA) # ============================================================================== def sync_all(): """Sincronização completa (legado).""" log_syslog("SYNC", "sync_all", "Sincronização total de Nginx e Fail2Ban") backup_file(NGINX_CONF_DIR) # Ajuste: No modo remoto, os arquivos são descompactados em /tmp/nginx diretamente # Se rodar manual, assume TMP_SYNC_BASE antigo # Verifica onde estão os arquivos fontes (Prioridade para o pacote descompactado em /tmp) src_nginx = "/tmp/nginx" if not os.path.exists(src_nginx): src_nginx = os.path.join(TMP_SYNC_BASE, "nginx") if not os.path.exists(src_nginx): print(f"[!] Erro: Diretório fonte não encontrado em /tmp/nginx ou {TMP_SYNC_BASE}") return False print(f"[*] Sincronizando de: {src_nginx}") rc, out, err = run_sudo(['cp', '-rf', os.path.join(src_nginx, "."), NGINX_CONF_DIR]) if rc != 0: print(f"[!] Erro ao copiar arquivos: {err}") return False # Atualiza GeoIP se solicitado (ou sempre no sync --all) update_geoip_db() ok, err = check_nginx() if not ok: print(f"[!] Erro na configuração (Nginx -t falhou): {err}") rollback_all() return False run_sudo(['systemctl', 'reload', 'nginx']) print("[+] Sincronização total concluída com sucesso.") return True def update_geoip_db(): """Baixa os bancos de dados GeoIP mais recentes (Mirror GitHub).""" # URLs de Mirror Confiáveis (P3TERX ou similar) GEOIP_URLS = { "GeoLite2-Country.mmdb": "https://git.io/GeoLite2-Country.mmdb", "GeoLite2-City.mmdb": "https://git.io/GeoLite2-City.mmdb" } GEOIP_DIR = "/usr/share/GeoIP" print("[*] Verificando bancos de dados GeoIP2...") if not os.path.exists(GEOIP_DIR): run_sudo(['mkdir', '-p', GEOIP_DIR]) import urllib.request for db_name, url in GEOIP_URLS.items(): target = os.path.join(GEOIP_DIR, db_name) # Lógica simplificada: Baixar sempre no sync --all para garantir atualização # Para produção real, poderia checar headers ETag/Last-Modified, mas o mirror redireciona. print(f" - Baixando {db_name}...") try: # Baixa para /tmp primeiro tmp_target = os.path.join("/tmp", db_name) # Usar curl é mais robusto em ambientes restritos que o urllib do python # Segue redirecionamentos (-L) e falha em erro (-f) rc, out, err = run_sudo(['curl', '-L', '-f', '-o', tmp_target, url]) if rc == 0: run_sudo(['mv', '-f', tmp_target, target]) run_sudo(['chmod', '644', target]) print(f" [OK] Atualizado: {target}") else: print(f" [!] Falha no download de {db_name}: {err}") except Exception as e: print(f" [!] Erro ao atualizar GeoIP: {e}") def sync_item(relative_path): """Sincroniza um arquivo ou diretório.""" # Definição do source (Dual mode: /tmp/nginx direto ou TMP_SYNC_BASE) src_base_1 = "/tmp/nginx" src_base_2 = os.path.join(TMP_SYNC_BASE, "nginx") src = os.path.join(src_base_1, relative_path) if not os.path.exists(src): src = os.path.join(src_base_2, relative_path) dst = os.path.join(NGINX_CONF_DIR, relative_path) if not os.path.exists(src): print(f"[!] Item fonte não encontrado: {src}") return False log_syslog("SYNC_ITEM", "sync_item", f"Sincronizando {relative_path}") backup_file(dst) if os.path.isdir(src): run_sudo(['cp', '-rf', os.path.join(src, '.'), dst]) else: run_sudo(['cp', '-f', src, dst]) ok, err = check_nginx() if not ok: print(f"[!] Falha na validação. Revertendo...") rollback_all() return False run_sudo(['systemctl', 'reload', 'nginx']) print(f"[+] {relative_path} sincronizado e validado.") return True def site_deploy(domain): """Deploy completo de um novo site.""" # Busca fonte src_vhost = os.path.join("/tmp/nginx/conf.d", f"{domain}.conf") if not os.path.exists(src_vhost): src_vhost = os.path.join(TMP_SYNC_BASE, "nginx", "conf.d", f"{domain}.conf") dst_vhost = os.path.join(NGINX_CONF_DIR, "conf.d", f"{domain}.conf") if not os.path.exists(src_vhost): print(f"[!] Arquivo de VHost não encontrado: {src_vhost}") return False log_syslog("SITE_DEPLOY", "site_deploy", f"Iniciando deploy de {domain}") backup_file(dst_vhost) run_sudo(['cp', '-f', src_vhost, dst_vhost]) ok, err = check_nginx() if not ok: print(f"[!] Erro na config do VHost: {err}") rollback_all() return False run_sudo(['systemctl', 'reload', 'nginx']) dns_ok, domain_ip = validate_dns(domain) if not dns_ok: print(f"[!] AVISO: DNS de {domain} ({domain_ip}) incorreto.") print("[!] SSL Certbot será pulado.") return True setup_ssl(domain) return True def site_update(domain): """Atualiza o VHost e tenta renovar SSL se DNS ok.""" vhost_rel = f"conf.d/{domain}.conf" if sync_item(vhost_rel): dns_ok, _ = validate_dns(domain) if dns_ok: setup_ssl(domain) return True return False def site_remove(domain): """Remove site, SSL e Logs.""" log_syslog("SITE_REMOVE", "site_remove", f"Removendo site {domain}") vhost = os.path.join(NGINX_CONF_DIR, "conf.d", f"{domain}.conf") if os.path.exists(vhost): backup_file(vhost) run_sudo(['rm', '-f', vhost]) print(f"[*] Removendo certificados para {domain}...") run_sudo(['certbot', 'delete', '--cert-name', domain]) print(f"[*] Limpando logs de {domain}...") run_sudo(['bash', '-c', f"rm -f {LOG_DIR}/{domain}*"]) ok, _ = check_nginx() if ok: run_sudo(['systemctl', 'reload', 'nginx']) print(f"[+] Site {domain} removido com sucesso.") else: rollback_all() # ============================================================================== # CLI HANDLER # ============================================================================== def main(): # Detecta SO para decidir modo de operação if platform.system() == "Windows": # Modo Cliente (Windows) windows_deploy(sys.argv) return # Modo Servidor (Linux) parser = argparse.ArgumentParser(description="Pathfinder Automator V2 - Nginx/SSL Orchestration") subparsers = parser.add_subparsers(dest="command", help="Comando a executar") sync_parser = subparsers.add_parser("sync", help="Sincronização de arquivos") sync_parser.add_argument("--all", action="store_true", help="Sincronizar tudo") sync_parser.add_argument("--file", type=str, help="Sincronizar arquivo específico") site_parser = subparsers.add_parser("site", help="Gerenciamento de sites") site_parser.add_argument("--deploy", type=str, help="Novo deploy de site (Domínio)") site_parser.add_argument("--update", type=str, help="Atualizar site existente (Domínio)") site_parser.add_argument("--remove", type=str, help="Remover site completamente (Domínio)") geoip_parser = subparsers.add_parser("geoip", help="Gerenciamento de GeoIP") geoip_parser.add_argument("--update", action="store_true", help="Baixar/Atualizar bancos GeoIP") args = parser.parse_args() if args.command == "sync": if args.all: sync_all() elif args.file: sync_item(args.file) elif args.command == "site": if args.deploy: site_deploy(args.deploy) elif args.update: site_update(args.update) elif args.remove: site_remove(args.remove) elif args.command == "geoip": if args.update: update_geoip_db() if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n[!] Operação cancelada pelo usuário.") sys.exit(1)