NgixProxy_Pathfinder/scripts/deploy_pathfinder.py

299 lines
10 KiB
Python

#!/usr/bin/env python3
import os
import sys
import argparse
import subprocess
import socket
import syslog
import shutil
from datetime import datetime
# ==============================================================================
# CONFIGURAÇÕES TÉCNICAS
# ==============================================================================
PASSWORD = "vR7Ag$Pk"
NGINX_CONF_DIR = "/etc/nginx"
NGINX_CONF_BACKUP = "/etc/nginx.bak"
FAIL2BAN_CONF_DIR = "/etc/fail2ban"
TMP_SYNC_BASE = "/tmp/pathfinder_sync"
LOG_DIR = "/var/log/nginx"
# Endereço IP Público do Host de Produção (para validação DNS)
HOST_PUBLIC_IP = ""
# ==============================================================================
# UTILITÁRIOS DE SISTEMA E AUDITORIA
# ==============================================================================
def log_syslog(task, function, details=""):
"""Registra a ação no Syslog para auditoria."""
try:
hostname = socket.gethostname()
remote_ip = os.environ.get('SSH_CLIENT', 'localhost').split()[0]
identity = f"PathfinderAutomator[{task}]"
msg = f"Task: {task} | Func: {function} | From: {remote_ip} | Host: {hostname} | Details: {details}"
syslog.openlog(ident=identity, facility=syslog.LOG_AUTHPRIV)
syslog.syslog(syslog.LOG_INFO, msg)
print(f"[*] [SYSLOG] {msg}")
except Exception as e:
print(f"[!] Erro ao registrar no syslog: {e}")
def run_sudo(cmd, input_data=None, capture=True):
"""Executa comando com sudo e retorna (rc, stdout, stderr)."""
full_cmd = ['sudo', '-S'] + cmd
p = subprocess.Popen(full_cmd, stdin=subprocess.PIPE,
stdout=subprocess.PIPE if capture else None,
stderr=subprocess.PIPE if capture else None,
text=True)
stdin_input = (PASSWORD + '\n')
if input_data:
stdin_input += input_data
out, err = p.communicate(input=stdin_input)
return p.returncode, out, err
def check_nginx():
"""Valida a sintaxe do Nginx."""
rc, out, err = run_sudo(['nginx', '-t'])
return rc == 0, err
# ==============================================================================
# LÓGICA DE BACKUP E ROLLBACK ATÔMICO
# ==============================================================================
BACKUP_MAP = {} # Rastreia arquivos alterados para rollback
def backup_file(target_path):
"""Cria um backup .pathfinder_bak antes de qualquer alteração."""
if not os.path.exists(target_path):
return None
bak_path = f"{target_path}.pathfinder_bak"
log_syslog("BACKUP", "backup_file", f"Backup de {target_path} para {bak_path}")
rc, _, _ = run_sudo(['cp', '-rp', target_path, bak_path])
if rc == 0:
BACKUP_MAP[target_path] = bak_path
return bak_path
return None
def rollback_all():
"""Restaura todos os backups registrados caso ocorra um erro."""
log_syslog("ROLLBACK", "rollback_all", "Iniciando restauração de emergência.")
for target, bak in BACKUP_MAP.items():
print(f"[!] Restaurando {target}...")
run_sudo(['rm', '-rf', target])
run_sudo(['cp', '-rp', bak, target])
print("[*] Rollback concluído. Testando Nginx novamente...")
check_nginx()
# ==============================================================================
# AUXILIARES DE REDE (DNS/IP/SSL)
# ==============================================================================
def get_public_ip():
"""Tenta descobrir o IP público deste host."""
global HOST_PUBLIC_IP
if HOST_PUBLIC_IP:
return HOST_PUBLIC_IP
try:
import urllib.request
with urllib.request.urlopen('https://api.ipify.org', timeout=5) as response:
HOST_PUBLIC_IP = response.read().decode('utf-8')
return HOST_PUBLIC_IP
except:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
HOST_PUBLIC_IP = s.getsockname()[0]
s.close()
return HOST_PUBLIC_IP
except:
return "127.0.0.1"
def validate_dns(domain):
"""Verifica se o domínio aponta para este host."""
my_ip = get_public_ip()
try:
domain_ip = socket.gethostbyname(domain)
if domain_ip == my_ip:
return True, domain_ip
return False, domain_ip
except socket.gaierror:
return False, "Não resolvido"
def setup_ssl(domain):
"""Executa o Certbot e configura o reload-hook para syslog."""
log_syslog("SSL", "setup_ssl", f"Iniciando Certbot para {domain}")
cert_cmd = [
'certbot', 'certonly', '--webroot',
'-w', '/var/lib/letsencrypt/',
'-d', domain, '--non-interactive', '--agree-tos',
'--email', 'admin@' + domain,
'--deploy-hook', f'logger -t CertbotRenewal "SSL Renewed for {domain} - Restarting Nginx" && systemctl reload nginx'
]
rc, out, err = run_sudo(cert_cmd)
if rc == 0:
log_syslog("SSL", "setup_ssl", f"Sucesso ao emitir certificado para {domain}")
return True
else:
log_syslog("SSL", "setup_ssl", f"FALHA ao emitir certificado para {domain}: {err}")
print(f"[!] Erro Certbot: {err}")
return False
# ==============================================================================
# FUNCIONALIDADES DO SCRIPT
# ==============================================================================
def sync_all():
"""Sincronização completa (legado)."""
log_syslog("SYNC", "sync_all", "Sincronização total de Nginx e Fail2Ban")
backup_file(NGINX_CONF_DIR)
backup_file(FAIL2BAN_CONF_DIR)
src_nginx = os.path.join(TMP_SYNC_BASE, "nginx", ".")
run_sudo(['cp', '-rf', src_nginx, NGINX_CONF_DIR])
ok, err = check_nginx()
if not ok:
print(f"[!] Erro na configuração: {err}")
rollback_all()
return False
run_sudo(['systemctl', 'reload', 'nginx'])
print("[+] Sincronização total concluída com sucesso.")
return True
def sync_item(relative_path):
"""Sincroniza um arquivo ou diretório (ex: snippets/ ou modsec/)."""
src = os.path.join(TMP_SYNC_BASE, "nginx", relative_path)
dst = os.path.join(NGINX_CONF_DIR, relative_path)
if not os.path.exists(src):
print(f"[!] Item fonte não encontrado: {src}")
return False
log_syslog("SYNC_ITEM", "sync_item", f"Sincronizando {relative_path}")
# Backup recursivo se for diretório ou arquivo
backup_file(dst)
# Usa -rf para suportar diretórios (como modsec/)
if os.path.isdir(src):
run_sudo(['cp', '-rf', os.path.join(src, '.'), dst])
else:
run_sudo(['cp', '-f', src, dst])
ok, err = check_nginx()
if not ok:
print(f"[!] Falha na validação após sincronizar {relative_path}. Revertendo...")
rollback_all()
return False
run_sudo(['systemctl', 'reload', 'nginx'])
print(f"[+] {relative_path} sincronizado e validado.")
return True
def site_deploy(domain):
"""Deploy completo de um novo site."""
src_vhost = os.path.join(TMP_SYNC_BASE, "nginx", "conf.d", f"{domain}.conf")
dst_vhost = os.path.join(NGINX_CONF_DIR, "conf.d", f"{domain}.conf")
if not os.path.exists(src_vhost):
print(f"[!] Arquivo de VHost não encontrado em: {src_vhost}")
return False
log_syslog("SITE_DEPLOY", "site_deploy", f"Iniciando deploy de {domain}")
backup_file(dst_vhost)
run_sudo(['cp', '-f', src_vhost, dst_vhost])
ok, err = check_nginx()
if not ok:
print(f"[!] Erro na config do VHost: {err}")
rollback_all()
return False
run_sudo(['systemctl', 'reload', 'nginx'])
dns_ok, domain_ip = validate_dns(domain)
if not dns_ok:
print(f"[!] AVISO: DNS de {domain} ({domain_ip}) não aponta para este host ({get_public_ip()}).")
print("[!] SSL Certbot será pulado. Rode 'site --update' após corrigir o DNS.")
return True
setup_ssl(domain)
return True
def site_update(domain):
"""Atualiza o VHost e tenta renovar SSL se DNS ok."""
vhost_rel = f"conf.d/{domain}.conf"
if sync_item(vhost_rel):
dns_ok, _ = validate_dns(domain)
if dns_ok:
setup_ssl(domain)
return True
return False
def site_remove(domain):
"""Remove site, SSL e Logs."""
log_syslog("SITE_REMOVE", "site_remove", f"Removendo site {domain}")
# 1. Nginx Config
vhost = os.path.join(NGINX_CONF_DIR, "conf.d", f"{domain}.conf")
if os.path.exists(vhost):
backup_file(vhost)
run_sudo(['rm', '-f', vhost])
# 2. SSL Certbot
print(f"[*] Removendo certificados para {domain}...")
run_sudo(['certbot', 'delete', '--cert-name', domain])
# 3. Logs (Atuais e GZ)
print(f"[*] Limpando logs de {domain}...")
run_sudo(['bash', '-c', f"rm -f {LOG_DIR}/{domain}*"])
ok, _ = check_nginx()
if ok:
run_sudo(['systemctl', 'reload', 'nginx'])
print(f"[+] Site {domain} removido com sucesso.")
else:
rollback_all()
# ==============================================================================
# CLI HANDLER
# ==============================================================================
def main():
parser = argparse.ArgumentParser(description="Pathfinder Automator V2 - Nginx/SSL Orchestration")
subparsers = parser.add_subparsers(dest="command", help="Comando a executar")
sync_parser = subparsers.add_parser("sync", help="Sincronização de arquivos")
sync_parser.add_argument("--all", action="store_true", help="Sincronizar tudo")
sync_parser.add_argument("--file", type=str, help="Sincronizar arquivo específico")
site_parser = subparsers.add_parser("site", help="Gerenciamento de sites")
site_parser.add_argument("--deploy", type=str, help="Novo deploy de site (Domínio)")
site_parser.add_argument("--update", type=str, help="Atualizar site existente (Domínio)")
site_parser.add_argument("--remove", type=str, help="Remover site completamente (Domínio)")
args = parser.parse_args()
if args.command == "sync":
if args.all:
sync_all()
elif args.file:
sync_item(args.file)
elif args.command == "site":
if args.deploy: site_deploy(args.deploy)
elif args.update: site_update(args.update)
elif args.remove: site_remove(args.remove)
if __name__ == "__main__":
if os.getuid() == 0:
print("[!] Não execute diretamente como root. Use um usuário com sudo.")
sys.exit(1)
main()