A um tempo fiz um post sobre como rodar o [DBT]((https://www.getdbt.com/) dentro do Airflow com o Astronomer Cosmos (caso queria conferir, clique aqui). Chegou a hora de atualizar esse post. Vou assumir que você já conhece os conceitos básicos de DBT e Airflow. O projeto está disponível no meu Github:
Setup#
Aqui vou assumir que já tenha o Docker, Python e Poetry (esse é opcional). Depois de clonar o projeto, basta subir o compose com o comando docker compose up -d --build.
Criando o database no Postgres#
Antes de começar é preciso criar um database chamado pokedex no container do postgres com o comando docker compose exec postgres createdb -U airflow pokedex. Dentro do compose o Airflow já faz conexão com esse database via variável de ambiente:
AIRFLOW_CONN_POSTGRES:"postgresql://airflow:airflow@postgres:5432/pokedex"
O projeto#
O Astronomer Cosmos permite integrar o DBT no Airflow, permitindo criar pipelines onde o Airflow orquestra a execução dos modelos DBT de forma automatizada e controlada, mantendo a separação de responsabilidades entre orquestração (Airflow) e transformação (DBT).
airflow/
├── dags/ # Arquivos do Airflow
│ ├── cosmos/ # projeto do DBT
│ │ ├── models/ # Modelos do DBT
│ │ ├── seeds/ # Arquivos CSV
│ │ ├── macros/ # Funções do DBT
│ │ └── tests/ # Testes do DBT
│ ├── modules/ # Modules para as DAGs
│ │ └── *.py # Arquivos
│ └── *.py # Arquivos de DAG
├── tests/ # Testes
├── docker-compose.yaml # Compose do Airflow
├── Dockerfile # Imagem Customizada do Airflow
└── pyproject.toml # Dependências do Python
DBT#
Dentro do diretório dags/cosmos esta todo o projeto do DBT, seguindo essa estrutura basica.
cosmos/
├── models/
│ ├── sources/ # Definições das fontes de dados
│ ├── trusted/ # Camada de dados limpos e padronizados
│ │ ├── dolar/ # Dados financeiros dólar/ibov
│ │ └── pokedex/ # Dados pokemon/moves/types
│ └── gold/ # Camada de agregação e análise
│ ├── dolar/ # Métricas financeiras (correlação, volatilidade)
│ └── pokedex/ # Análises pokemon/moves/types
├── seeds/ # Dados raw (CSV)
│ ├── pokemons/ # Dados base pokemon (moves, pokemons, moves_pokemons)
│ └── stocks/ # Dados financeiros (dolar_ibov, usd_brl)
├── macros/ # Funções reutilizáveis
└── tests/ # Testes
Basicamente esse projeto segue uma arquitetura medalhão simples:
- seeds - dados brutos (bronze)
- models/trusted - dados tratados (silver)
- models/gold - dados refinados (gold)
Modules#
No arquivo dbt_config existem 4 funções para configurar as DAGs com o operador do DBT.
get_execution_config()#
Retorna a configuração do caminho do executável dbt.
from modules.dbt_config import get_execution_config
# Obter configuração do executável dbt
execution_config = get_execution_config()
get_render_config(select)#
Define quais modelos dbt serão executados.
from modules.dbt_config import get_render_config
# Executar uma tag específica
render_config = get_render_config(select=["tag:dolar"])
get_profile_config(schema, target)#
Configura a conexão com o banco de dados para o dbt.
from modules.dbt_config import get_profile_config
# Configuração padrão (schema="public", target="dev")
profile_config = get_profile_config()
# Schema e ambiente personalizados
get_profile_config(schema="trusted")
get_project_config(env_vars, dbt_vars)#
Define o caminho do projeto dbt e suas variáveis.
from modules.dbt_config import get_project_config
# Configuração padrão (sem variáveis)
project_config = get_project_config()
# Com variáveis personalizadas
project_config = get_project_config(
env_vars={"DBT_ENV": "producao"},
dbt_vars={"data_inicio": "2023-01-01"}
)
DAG de ingestão dos dados#
A DAG dag_ingestion_data é responsável por fazer a ingestão das seeds para a camada raw
DAG de transformação#
Existem duas DAGs de transformação tanto da camada silver quanto da camada gold, uma para os dados de dolar e outra os dados de pokemons
Gerando as Docs do DBT e “hospedando” dentro do Airflow#
A DAG dag_docs_gerenator fica encarregada de gerar a docs do DBT (equivalente ao comando dbt docs generate) e o Cosmos permite “hospedar” essa documentação dentro do Airflow no menu Browse > dbt docs, aqui é preciso configurar de acordo (na documentação do cosmos tem alguns exemplos) mas como estou rodando o airflow local é so é necessario ter a seguinte variável de ambiente:
AIRFLOW__COSMOS__DBT_DOCS_DIR: "/opt/airflow/dags/cosmos/target"
