O Apache Airflow é uma plataforma open source para orquestrar workflows. Ele permite definir, agendar e monitorar pipelines de dados usando Python.
Principais pontos:
- DAGs (Directed Acyclic Graphs): descrevem a ordem de execução das tarefas.
- Operadores: definem o que cada tarefa faz (ex.: rodar um script, executar SQL, chamar API).
- Scheduler: agenda e dispara as DAGs de acordo com a frequência configurada.
- Web UI: interface para acompanhar execução, logs e status das DAGs.
É muito usado em engenharia de dados para automatizar ETLs, integrações e rotinas recorrentes.
Gerando DAGs Dinamicamente#
O Airflow permite resolver isso de forma elegante: gerando DAGs dinamicamente em Python.
Em vez de criar manualmente dezenas de arquivos, você descreve a lógica uma vez e deixa que o código produza quantas DAGs forem necessárias.
Criando o Jinja Template#
Vamos criar o arquivo
process_file.jinja2
from datetime import datetime, timedelta
from airflow import DAG
{% for task in tasks %}
{% if task.type == 'bash' %}
from airflow.operators.bash import BashOperator
{% elif task.type == 'python' %}
from airflow.operators.python import PythonOperator
{% if task.python_imports%}
{{task.python_imports}}
{% endif %}
{% endif %}
{% endfor %}
DEFAULT_DAG_ARGS = {
'owner': 'Asafe',
'depends_on_past': False,
'start_date': datetime(2025, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'catchup': False,
}
with DAG(
dag_id="{{dag_id}}",
description="{{description}}",
{% if schedule_interval != "None" %}
schedule_interval="{{schedule_interval}}",
{% else %}
schedule_interval={{schedule_interval}},
{% endif %}
default_args=DEFAULT_DAG_ARGS,
tags={{tags}},
):
{% for task in tasks %}
{% if task.type == 'bash' %}
{{task.task_id}} = BashOperator(
task_id="{{task.task_id}}",
bash_command="{{task.bash_command}}",
)
{% elif task.type == 'python' %}
{{task.task_id}} = PythonOperator(
task_id="{{task.task_id}}",
python_callable={{task.python_callable}},
{% if task.op_kwargs %}op_kwargs={{task.op_kwargs}},{% endif %}
)
{% endif %}
{% endfor %}
{% for dependency in dependencies %}
# set dependency
{{dependency.upstream}} >> {{dependency.downstream}}
{% endfor %}
Criando o gerador de dags#
Agora vamos criar o arquivo
generate_dag.py
#!/usr/bin/env python
import yaml
import os
from jinja2 import Template, Environment, FileSystemLoader
from pathlib import Path
from datetime import datetime
file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template("multi_task_generator.jinja2")
file_dir = os.path.join(file_dir, "configs")
for filename in os.listdir(file_dir):
if filename.endswith(".yaml") or filename.endswith(".yml"):
print(f"Processando arquivo de configuração: {filename}")
with open(os.path.join(file_dir, filename), 'r', encoding='utf-8') as f:
if filename.endswith('.yaml') or filename.endswith('.yml'):
config = yaml.safe_load(f)
python_functions = []
for task in config['tasks']:
if task.get('type') == 'python' and 'python_function' in task:
python_functions.append(task['python_function'])
dag_code = template.render(
dag_id=config['dag_id'],
description=config.get('description'),
schedule_interval=config.get('schedule_interval', None),
tags=config.get('tags', ['dynamic']),
tasks=config['tasks'],
dependencies=config.get('dependencies', []),
python_functions=python_functions,
)
dag_filename = f"{config['dag_id']}.py"
dags_dir = Path(file_dir).parent.parent.parent / "dags"
dag_filepath = dags_dir / dag_filename
with open(dag_filepath, 'w', encoding='utf-8') as f:
f.write(dag_code)
print(f"DAG gerado com sucesso: {dag_filepath}")
print(f"📁 Localização: {dag_filepath}")
Criando o YAML com as regras da DAG#
Basta criar o arquivo .yaml
dentro da pasta includes/dynamic_generator/configs
dag_id: "dag_z_simple_bash"
description: "DAG simples com uma tarefa bash gerada dinamicamente"
schedule_interval: "@daily"
imports: |
from airflow.operators.bash import BashOperator
tags:
- "dynamic"
- "bash"
- "simple"
tasks:
- task_id: "hello_world"
type: "bash"
bash_command: "echo 'Hello World from Dynamic DAG!'"
Depois basta rodar o comando python includes/dynamic_generator/generate_dag.py
e sera gerado um arquivo dentro da pasta de dags