testes/app/routes/extrato.py

125 lines
4.0 KiB
Python

import os
from time import time
from functools import wraps
from datetime import date
import requests
from dotenv import load_dotenv
# Carregar variáveis de ambiente do arquivo .env
load_dotenv()
# Definição das credenciais e URL base da API
CLIENT_ID = os.environ.get("CLIENT_ID")
CLIENT_SECRET = os.environ.get("CLIENT_SECRET")
CERT_PATH = os.environ.get("CERT_PATH")
CERT_KEY_PATH = os.environ.get("CERT_KEY_PATH")
ACCOUNT = os.environ.get("ACCOUNT", None)
API_BASE_URL = "https://cdpj.partners.bancointer.com.br"
# Cache para tokens de acesso
token_cache = {}
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
data = {
"domain": "example.com",
"scopes": ["boleto-cobranca.write", "boleto-cobranca.read"]
}
return f(data, *args, **kwargs)
return decorated
def obter_token(scopes=["boleto-cobranca.write", "boleto-cobranca.read"]):
"""Obtem um token de acesso para a API do Banco Inter."""
scopes_param = " ".join(scopes)
if token_cache.get(scopes_param):
cached_token = token_cache.get(scopes_param)
# token is still valid
if cached_token.get("expires_at") > int(time()):
return cached_token["token"]
try:
request_string = f"client_id={CLIENT_ID}&client_secret={CLIENT_SECRET}&scope={scopes_param}&grant_type=client_credentials"
print("request_string", request_string)
response = requests.post(url=f"{API_BASE_URL}/oauth/v2/token",
headers={"Content-Type": "application/x-www-form-urlencoded"},
cert=(CERT_PATH, CERT_KEY_PATH),
data=request_string)
# Verificar o status da resposta
if response.status_code != 200:
print(f"Erro na requisição: {response.status_code}")
print("Resposta bruta:", response.text) # Imprime a resposta bruta para debugar
data_json = response.json()
print("Resposta JSON:", data_json) # Imprimir o JSON da resposta
token = data_json.get("access_token")
# Cache do token
token_cache[scopes_param] = {
"token": token,
"expires_at": data_json.get("expires_in") + int(time()),
}
return token
except Exception as e:
print(f"Erro ao obter token de acesso: {e}")
return None
def get_balance():
"""Obtém o saldo da conta no Banco Inter."""
token = obter_token(["saldo.read"])
if not token:
print("Erro: não foi possível obter o token de acesso.")
return None
try:
response = requests.get(
f"{API_BASE_URL}/banking/v2/saldo",
headers={"Authorization": f"Bearer {token}"},
cert=(CERT_PATH, CERT_KEY_PATH)
)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
print(f"Erro ao obter saldo: {e}")
return None
def get_statements(start_date: str, end_date: str):
"""Obtém o extrato da conta no Banco Inter entre as datas informadas."""
token = obter_token(["extrato.read"])
if not token:
print("Erro: não foi possível obter o token de acesso.")
return None
try:
response = requests.get(
f"{API_BASE_URL}/banking/v2/extrato",
params={"dataInicio": start_date, "dataFim": end_date},
headers={"Authorization": f"Bearer {token}"},
cert=(CERT_PATH, CERT_KEY_PATH)
)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
print(f"Erro ao obter extrato: {e}")
return None
if __name__ == "__main__":
time_now = time()
yesterday = date.fromtimestamp(time_now - 60 * 60 * 24).strftime("%Y-%m-%d")
today = date.fromtimestamp(time_now).strftime("%Y-%m-%d")
print(f"Obtendo informações financeiras para o período: {yesterday} - {today}")
# Obter saldo
balance_data = get_balance()
print("Saldo:", balance_data)
# Obter extrato
statements_data = get_statements(yesterday, today)
print("Extrato:", statements_data)