Skip to main content

Criando DAGs dinamicamente no Airflow

·495 words·3 mins
Asafe Felipe
Author
Asafe Felipe

O Apache Airflow é uma plataforma open source para orquestrar workflows. Ele permite definir, agendar e monitorar pipelines de dados usando Python.

Principais pontos:

  • DAGs (Directed Acyclic Graphs): descrevem a ordem de execução das tarefas.
  • Operadores: definem o que cada tarefa faz (ex.: rodar um script, executar SQL, chamar API).
  • Scheduler: agenda e dispara as DAGs de acordo com a frequência configurada.
  • Web UI: interface para acompanhar execução, logs e status das DAGs.

É muito usado em engenharia de dados para automatizar ETLs, integrações e rotinas recorrentes.

Gerando DAGs Dinamicamente
#

O Airflow permite resolver isso de forma elegante: gerando DAGs dinamicamente em Python.
Em vez de criar manualmente dezenas de arquivos, você descreve a lógica uma vez e deixa que o código produza quantas DAGs forem necessárias.

Criando o Jinja Template
#

Vamos criar o arquivo process_file.jinja2

from datetime import datetime, timedelta
from airflow import DAG

{% for task in tasks %}
{% if task.type == 'bash' %}
from airflow.operators.bash import BashOperator
{% elif task.type == 'python' %}
from airflow.operators.python import PythonOperator
	{% if task.python_imports%}
{{task.python_imports}}
	{% endif %}
{% endif %}
{% endfor %}


DEFAULT_DAG_ARGS = {
	'owner': 'Asafe',
	'depends_on_past': False,
	'start_date': datetime(2025, 1, 1),
	'email_on_failure': False,
	'email_on_retry': False,
	'retries': 1,
	'retry_delay': timedelta(minutes=5),
	'catchup': False,
}


with DAG(
	dag_id="{{dag_id}}",
	description="{{description}}",
	{% if schedule_interval != "None" %}
	schedule_interval="{{schedule_interval}}",
	{% else %}
	schedule_interval={{schedule_interval}},
	{% endif %}
	default_args=DEFAULT_DAG_ARGS,
	tags={{tags}},
):
	{% for task in tasks %}
	{% if task.type == 'bash' %}
	{{task.task_id}} = BashOperator(
		task_id="{{task.task_id}}",
		bash_command="{{task.bash_command}}",
	)

	{% elif task.type == 'python' %}
	{{task.task_id}} = PythonOperator(
		task_id="{{task.task_id}}",
		python_callable={{task.python_callable}},
		{% if task.op_kwargs %}op_kwargs={{task.op_kwargs}},{% endif %}
	)
	{% endif %}
	
	{% endfor %}

	{% for dependency in dependencies %}
	# set dependency
	{{dependency.upstream}} >> {{dependency.downstream}}
	{% endfor %}

Criando o gerador de dags
#

Agora vamos criar o arquivo generate_dag.py

#!/usr/bin/env python
import yaml
import os

from jinja2 import Template, Environment, FileSystemLoader
from pathlib import Path
from datetime import datetime


file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template("multi_task_generator.jinja2")
file_dir = os.path.join(file_dir, "configs")

for filename in os.listdir(file_dir):
	if filename.endswith(".yaml") or filename.endswith(".yml"):
		print(f"Processando arquivo de configuração: {filename}")
		with open(os.path.join(file_dir, filename), 'r', encoding='utf-8') as f:
			if filename.endswith('.yaml') or filename.endswith('.yml'):
				config = yaml.safe_load(f)

		python_functions = []
		for task in config['tasks']:
			if task.get('type') == 'python' and 'python_function' in task:
			python_functions.append(task['python_function'])
			dag_code = template.render(
				dag_id=config['dag_id'],
				description=config.get('description'),
				schedule_interval=config.get('schedule_interval', None),
				tags=config.get('tags', ['dynamic']),
				tasks=config['tasks'],
				dependencies=config.get('dependencies', []),
				python_functions=python_functions,
			)

		dag_filename = f"{config['dag_id']}.py"
		dags_dir = Path(file_dir).parent.parent.parent / "dags"
		dag_filepath = dags_dir / dag_filename
		with open(dag_filepath, 'w', encoding='utf-8') as f:
			f.write(dag_code)
		print(f"DAG gerado com sucesso: {dag_filepath}")
	
	print(f"📁 Localização: {dag_filepath}")

Criando o YAML com as regras da DAG
#

Basta criar o arquivo .yaml dentro da pasta includes/dynamic_generator/configs

dag_id: "dag_z_simple_bash"
description: "DAG simples com uma tarefa bash gerada dinamicamente"
schedule_interval: "@daily"
imports: |
	from airflow.operators.bash import BashOperator
tags:
	- "dynamic"
	- "bash"
	- "simple"

tasks:
	- task_id: "hello_world"
	  type: "bash"
	  bash_command: "echo 'Hello World from Dynamic DAG!'"

Depois basta rodar o comando python includes/dynamic_generator/generate_dag.py e sera gerado um arquivo dentro da pasta de dags

References
#