Skip to main content

Airflow + DBT (com Cosmos)

·672 words·4 mins
Asafe Felipe
Author
Asafe Felipe

A um tempo fiz um post sobre como rodar o [DBT]((https://www.getdbt.com/) dentro do Airflow com o Astronomer Cosmos (caso queria conferir, clique aqui). Chegou a hora de atualizar esse post. Vou assumir que você já conhece os conceitos básicos de DBT e Airflow. O projeto está disponível no meu Github:

Setup
#

Aqui vou assumir que já tenha o Docker, Python e Poetry (esse é opcional). Depois de clonar o projeto, basta subir o compose com o comando docker compose up -d --build.

Criando o database no Postgres
#

Antes de começar é preciso criar um database chamado pokedex no container do postgres com o comando docker compose exec postgres createdb -U airflow pokedex. Dentro do compose o Airflow já faz conexão com esse database via variável de ambiente:

AIRFLOW_CONN_POSTGRES:"postgresql://airflow:airflow@postgres:5432/pokedex"

O projeto
#

O Astronomer Cosmos permite integrar o DBT no Airflow, permitindo criar pipelines onde o Airflow orquestra a execução dos modelos DBT de forma automatizada e controlada, mantendo a separação de responsabilidades entre orquestração (Airflow) e transformação (DBT).

airflow/
├── dags/                # Arquivos do Airflow
│ ├── cosmos/            # projeto do DBT
│ │ ├── models/          # Modelos do DBT
│ │ ├── seeds/           # Arquivos CSV
│ │ ├── macros/          # Funções do DBT
│ │ └── tests/           # Testes do DBT
│ ├── modules/           # Modules para as DAGs
│ │ └── *.py             # Arquivos
│ └── *.py               # Arquivos de DAG
├── tests/               # Testes
├── docker-compose.yaml  # Compose do Airflow
├── Dockerfile           # Imagem Customizada do Airflow
└── pyproject.toml       # Dependências do Python

DBT
#

Dentro do diretório dags/cosmos esta todo o projeto do DBT, seguindo essa estrutura basica.

cosmos/
├── models/
│ ├── sources/   # Definições das fontes de dados
│ ├── trusted/   # Camada de dados limpos e padronizados
│ │ ├── dolar/   # Dados financeiros dólar/ibov
│ │ └── pokedex/ # Dados pokemon/moves/types
│ └── gold/      # Camada de agregação e análise
│ ├── dolar/     # Métricas financeiras (correlação, volatilidade)
│ └── pokedex/   # Análises pokemon/moves/types
├── seeds/       # Dados raw (CSV)
│ ├── pokemons/  # Dados base pokemon (moves, pokemons, moves_pokemons)
│ └── stocks/    # Dados financeiros (dolar_ibov, usd_brl)
├── macros/      # Funções reutilizáveis
└── tests/       # Testes

Basicamente esse projeto segue uma arquitetura medalhão simples:

  • seeds - dados brutos (bronze)
  • models/trusted - dados tratados (silver)
  • models/gold - dados refinados (gold)

Modules
#

No arquivo dbt_config existem 4 funções para configurar as DAGs com o operador do DBT.

get_execution_config()
#

Retorna a configuração do caminho do executável dbt.

from modules.dbt_config import get_execution_config
# Obter configuração do executável dbt
execution_config = get_execution_config()

get_render_config(select)
#

Define quais modelos dbt serão executados.

from modules.dbt_config import get_render_config
# Executar uma tag específica
render_config = get_render_config(select=["tag:dolar"])

get_profile_config(schema, target)
#

Configura a conexão com o banco de dados para o dbt.

from modules.dbt_config import get_profile_config
# Configuração padrão (schema="public", target="dev")
profile_config = get_profile_config()
# Schema e ambiente personalizados
get_profile_config(schema="trusted")

get_project_config(env_vars, dbt_vars)
#

Define o caminho do projeto dbt e suas variáveis.

from modules.dbt_config import get_project_config
# Configuração padrão (sem variáveis)
project_config = get_project_config()
# Com variáveis personalizadas
project_config = get_project_config(
	env_vars={"DBT_ENV": "producao"},
	dbt_vars={"data_inicio": "2023-01-01"}
)

DAG de ingestão dos dados
#

A DAG dag_ingestion_data é responsável por fazer a ingestão das seeds para a camada raw

DAG de transformação
#

Existem duas DAGs de transformação tanto da camada silver quanto da camada gold, uma para os dados de dolar e outra os dados de pokemons

Gerando as Docs do DBT e “hospedando” dentro do Airflow
#

A DAG dag_docs_gerenator fica encarregada de gerar a docs do DBT (equivalente ao comando dbt docs generate) e o Cosmos permite “hospedar” essa documentação dentro do Airflow no menu Browse > dbt docs, aqui é preciso configurar de acordo (na documentação do cosmos tem alguns exemplos) mas como estou rodando o airflow local é so é necessario ter a seguinte variável de ambiente:

AIRFLOW__COSMOS__DBT_DOCS_DIR: "/opt/airflow/dags/cosmos/target"
Não se esqueça de me seguir no LinkedIn

References
#