148 lines
8.1 KiB
Python
148 lines
8.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Importa as bibliotecas necessárias
|
|
import pandas as pd
|
|
import re
|
|
import os
|
|
import sys
|
|
|
|
print("Iniciando o script de normalização e enriquecimento de dados...")
|
|
|
|
# --- 1. CONFIGURAÇÃO DOS CAMINHOS ---
|
|
base_path = os.path.expanduser('~')
|
|
caminho_dados_originais = os.path.join(base_path, 'Documents', 'Relatorio de Inventario GRUPO PRALOG', 'dados.xlsx')
|
|
caminho_inventario = os.path.join(base_path, 'Documents', 'Relatorio de Inventario GRUPO PRALOG', 'Inventario de Equipamentos Grupo Pralog - via RH (1).xlsx')
|
|
caminho_saida = os.path.join(base_path, 'Desktop', 'dados-normalizados.xlsx')
|
|
|
|
# Configuração de Leitura do Excel de Inventário
|
|
linha_cabecalho = 0
|
|
|
|
# --- 2. FUNÇÕES DE NORMALIZAÇÃO E ATUALIZAÇÃO ---
|
|
def normalizar_email(email):
|
|
email_str = str(email).strip()
|
|
if '@' in email_str and email_str.lower() not in ['sem informação', 'n/a']:
|
|
return email_str.lower()
|
|
return email
|
|
|
|
def normalizar_telefone(telefone):
|
|
if pd.isna(telefone): return telefone
|
|
telefone_str = str(telefone)
|
|
numeros = re.sub(r'\D', '', telefone_str)
|
|
if len(numeros) in [12, 13] and numeros.startswith('55'):
|
|
return f"'+{numeros}"
|
|
elif len(numeros) in [10, 11]:
|
|
return f"'+55{numeros}"
|
|
return telefone
|
|
|
|
def normalizar_contrato(tipo_contrato):
|
|
if pd.isna(tipo_contrato): return tipo_contrato
|
|
tipo_str = str(tipo_contrato).strip().upper()
|
|
if tipo_str in ['MEI', 'PJ']: return 'Pessoa Juridica'
|
|
return str(tipo_contrato).strip()
|
|
|
|
# --- 3. EXECUÇÃO DO SCRIPT ---
|
|
try:
|
|
# Carrega as planilhas, garantindo que tudo seja lido como texto e valores vazios sejam strings vazias
|
|
print(f"Lendo o arquivo principal: {caminho_dados_originais}")
|
|
df_dados = pd.read_excel(caminho_dados_originais, dtype=str).fillna('')
|
|
|
|
print(f"Lendo o arquivo de inventário: {caminho_inventario} (usando a linha {linha_cabecalho + 1} como cabeçalho)")
|
|
df_inventario = pd.read_excel(caminho_inventario, header=linha_cabecalho, dtype=str).fillna('')
|
|
|
|
# Limpeza automática dos nomes das colunas de AMBOS os arquivos
|
|
print("\nLimpando nomes de colunas de ambos os arquivos...")
|
|
df_dados.columns = df_dados.columns.str.strip().str.replace(r'\s+', ' ', regex=True)
|
|
df_inventario.columns = df_inventario.columns.str.strip().str.replace(r'\s+', ' ', regex=True)
|
|
|
|
# Diagnóstico e Validação
|
|
colunas_necessarias_inventario = ['Colaborador Registrado', 'Tipo de Contrato', 'Unidade', 'Status']
|
|
colunas_faltando = [col for col in colunas_necessarias_inventario if col not in df_inventario.columns]
|
|
if colunas_faltando:
|
|
print(f"❌ ERRO CRÍTICO: As colunas a seguir não foram encontradas no inventário: {colunas_faltando}")
|
|
sys.exit()
|
|
|
|
# TAREFA A: Adicionar colaboradores ausentes
|
|
print("\nVerificando e adicionando colaboradores ausentes...")
|
|
df_inventario.dropna(subset=['Colaborador Registrado'], inplace=True)
|
|
nomes_dados = set(df_dados['NOME'].astype(str).str.strip().str.lower())
|
|
nomes_inventario = set(df_inventario['Colaborador Registrado'].astype(str).str.strip().str.lower())
|
|
nomes_a_adicionar_lower = nomes_inventario - nomes_dados
|
|
if nomes_a_adicionar_lower:
|
|
nomes_originais = df_inventario[df_inventario['Colaborador Registrado'].str.strip().str.lower().isin(nomes_a_adicionar_lower)]['Colaborador Registrado'].unique()
|
|
print(f"Encontrados {len(nomes_originais)} novos colaboradores para adicionar.")
|
|
novos_colaboradores_df = pd.DataFrame(nomes_originais, columns=['NOME'])
|
|
df_dados = pd.concat([df_dados, novos_colaboradores_df], ignore_index=True).fillna('')
|
|
else:
|
|
print("Nenhum novo colaborador a ser adicionado.")
|
|
|
|
# TAREFA B: Normalizações Padrão
|
|
print("\nExecutando normalizações padrão (email e telefone)...")
|
|
df_dados['Email Pessoal Sendo Usado'] = df_dados['Email Pessoal Sendo Usado'].apply(normalizar_email)
|
|
df_dados['Email Corporativo'] = df_dados['Email Corporativo'].apply(normalizar_email)
|
|
df_dados['Contato Colaborador'] = df_dados['Contato Colaborador'].apply(normalizar_telefone)
|
|
df_dados['Contato do RESPONSÁVEL'] = df_dados['Contato do RESPONSÁVEL'].apply(normalizar_telefone)
|
|
|
|
# TAREFA C: Lógica condicional para E-mails
|
|
print("Aplicando lógicas condicionais para preenchimento de e-mails...")
|
|
# Regra 1: Se 'Tem Email Corporativo?' for 'Não' E não houver e-mail pessoal, preenche 'Sem Informação'
|
|
cond_nao_tem_email = df_dados['Tem Email Corporativo?'].str.strip().str.lower() == 'não'
|
|
cond_email_pessoal_nao_existe = ~df_dados['Email Pessoal Sendo Usado'].str.contains('@', na=False)
|
|
mascara_regra1 = cond_nao_tem_email & cond_email_pessoal_nao_existe
|
|
df_dados.loc[mascara_regra1, 'Email Pessoal Sendo Usado'] = 'Sem Informação'
|
|
|
|
# Regra 2: Se 'Tem Email Corporativo?' for 'Sim', preenche 'Email Pessoal Sendo Usado' com 'N/A'
|
|
cond_tem_email_sim = df_dados['Tem Email Corporativo?'].str.strip().str.lower() == 'sim'
|
|
df_dados.loc[cond_tem_email_sim, 'Email Pessoal Sendo Usado'] = 'N/A'
|
|
|
|
# NOVO: Regra 3: Se 'Tem Email Corporativo?' for 'Não', preenche 'Email Corporativo' com 'N/A'
|
|
df_dados.loc[cond_nao_tem_email, 'Email Corporativo'] = 'N/A'
|
|
|
|
# TAREFA D: Atualizar 'Contratação' e 'BASE'
|
|
print("Atualizando dados de 'Contratação' e 'BASE' a partir do inventário...")
|
|
df_inventario_sem_duplicatas = df_inventario.drop_duplicates(subset=['Colaborador Registrado'], keep='first').copy()
|
|
|
|
df_inventario_sem_duplicatas.loc[:, 'Tipo de Contrato Normalizado'] = df_inventario_sem_duplicatas['Tipo de Contrato'].apply(normalizar_contrato)
|
|
mapa_contratacao = df_inventario_sem_duplicatas.set_index(df_inventario_sem_duplicatas['Colaborador Registrado'].str.strip().str.lower())['Tipo de Contrato Normalizado']
|
|
novos_valores_contratacao = df_dados['NOME'].str.strip().str.lower().map(mapa_contratacao)
|
|
df_dados['Contratação'] = novos_valores_contratacao.combine_first(df_dados['Contratação'])
|
|
|
|
mapa_base = df_inventario_sem_duplicatas.set_index(df_inventario_sem_duplicatas['Colaborador Registrado'].str.strip().str.lower())['Unidade']
|
|
df_dados['nova_base'] = df_dados['NOME'].str.strip().str.lower().map(mapa_base)
|
|
def mesclar_base(row):
|
|
base_atual = str(row['BASE']).strip()
|
|
base_nova = str(row['nova_base']).strip()
|
|
if not base_nova: return base_atual
|
|
if not base_atual: return base_nova
|
|
if base_atual.lower() == base_nova.lower(): return base_atual
|
|
return f"{base_atual} / {base_nova}"
|
|
df_dados['BASE'] = df_dados.apply(mesclar_base, axis=1)
|
|
df_dados.drop(columns=['nova_base'], inplace=True)
|
|
|
|
# TAREFA E: Atualizar status 'Usa Notebook da Empresa'
|
|
print("Atualizando status de 'Usa Notebook da Empresa'...")
|
|
df_em_uso = df_inventario[df_inventario['Status'].str.strip().str.lower() == 'em uso']
|
|
nomes_com_notebook_em_uso = set(df_em_uso['Colaborador Registrado'].str.strip().str.lower())
|
|
mascara_atualizacao_notebook = df_dados['NOME'].str.strip().str.lower().isin(nomes_com_notebook_em_uso)
|
|
df_dados.loc[mascara_atualizacao_notebook, 'Usa Notebook da Empresa'] = 'Sim'
|
|
|
|
# NOVO: TAREFA F: Preenchimento de valores padrão em colunas vazias
|
|
print("Preenchendo valores padrão para colunas vazias...")
|
|
colunas_para_preencher = {
|
|
'Usa Notebook da Empresa': 'Sem Informações',
|
|
'Realmente e um Subordinado?': 'Sem Informações'
|
|
}
|
|
for coluna, valor_padrao in colunas_para_preencher.items():
|
|
# A condição `== ''` funciona porque carregamos os dados com .fillna('')
|
|
df_dados.loc[df_dados[coluna] == '', coluna] = valor_padrao
|
|
|
|
# --- 4. SALVAR O RESULTADO ---
|
|
print(f"\nSalvando o arquivo final em: {caminho_saida}")
|
|
df_dados.to_excel(caminho_saida, index=False)
|
|
print("\n✅ Script concluído com sucesso!")
|
|
|
|
except FileNotFoundError as e:
|
|
print(f"❌ ERRO: Arquivo não encontrado! Verifique o caminho: {e.filename}")
|
|
except KeyError as e:
|
|
print(f"❌ ERRO DE CHAVE: A coluna {e} não foi encontrada. Verifique os nomes das colunas nos arquivos Excel.")
|
|
except Exception as e:
|
|
print(f"❌ Ocorreu um erro inesperado: {e}") |