Apache Airflow

Apache Airflow es la plataforma de orquestación de workflows del Processing Layer de TIGO. Permite definir, programar y monitorear pipelines de datos como DAGs en Python.

#Getting Started

#Conectarse a Airflow (TIGO)

Acceder a la UI web

# Abrir en el navegador
http://airflow.tigo.internal:8080
# Usuario y contraseña provistos por el equipo de plataforma

Acceder vía CLI (desde el servidor)

$ airflow version
$ airflow info

#Comandos esenciales CLI

Comando Descripción
airflow dags list Listar todos los DAGs
airflow dags trigger <dag_id> Disparar un DAG manualmente
airflow dags pause <dag_id> Pausar un DAG
airflow dags unpause <dag_id> Reactivar un DAG pausado
airflow dags state <dag_id> <date> Ver estado de un DAG run
airflow tasks list <dag_id> Listar las tareas de un DAG
airflow tasks run <dag_id> <task_id> <date> Ejecutar una tarea
airflow tasks state <dag_id> <task_id> <date> Ver estado de una tarea

#Variables y Conexiones

Listar variables

$ airflow variables list

Leer una variable

$ airflow variables get <variable_name>

Crear/actualizar una variable

$ airflow variables set <variable_name> <value>

Listar conexiones configuradas

$ airflow connections list

Probar una conexión

$ airflow connections test <conn_id>

#DAGs (Directed Acyclic Graphs)

#Estructura básica de un DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['[email protected]'],
}

with DAG(
    dag_id='mi_pipeline_tigo',
    default_args=default_args,
    description='Pipeline de ejemplo TIGO',
    schedule_interval='0 6 * * *',  # Diario a las 6am
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['tigo', 'data-engineering'],
) as dag:

    def extraer_datos():
        print("Extrayendo datos...")

    def transformar_datos():
        print("Transformando datos...")

    tarea_extraer = PythonOperator(
        task_id='extraer',
        python_callable=extraer_datos,
    )

    tarea_transformar = PythonOperator(
        task_id='transformar',
        python_callable=transformar_datos,
    )

    tarea_extraer >> tarea_transformar

#Operadores más usados en TIGO

Operador Uso principal
PythonOperator Ejecutar funciones Python
BashOperator Ejecutar comandos shell
PostgresOperator Queries sobre PostgreSQL / Trino
S3KeySensor Esperar un archivo en S3/Ceph
HttpSensor Esperar disponibilidad de un endpoint
TriggerDagRunOperator Encadenar DAGs
BranchPythonOperator Flujos condicionales
DummyOperator Marcadores de inicio/fin

#Schedule Intervals (Cron)

Expresión Descripción
@once Solo una vez
@daily Una vez al día (medianoche)
@hourly Cada hora
@weekly Cada semana
@monthly Cada mes
0 6 * * * Diario a las 6am
0 */4 * * * Cada 4 horas
30 8 * * 1-5 Lun-Vie a las 8:30am

#XCom y Dependencias

#XCom (Cross-Communication entre tareas)

Push de valor desde una tarea

def generar_datos(**context):
    tabla = 'ventas_2026'
    # Pushear un valor
    context['ti'].xcom_push(key='nombre_tabla', value=tabla)
    return tabla  # return también hace push automático

Pull de valor en otra tarea

def procesar_datos(**context):
    ti = context['ti']
    tabla = ti.xcom_pull(
        task_ids='generar',
        key='nombre_tabla'
    )
    print(f"Procesando tabla: {tabla}")

#Dependencias entre tareas

Dependencia secuencial

tarea_a >> tarea_b >> tarea_c

Dependencias paralelas

tarea_inicio >> [tarea_b, tarea_c] >> tarea_fin

Usando set_upstream / set_downstream

tarea_b.set_upstream(tarea_a)
tarea_b.set_downstream(tarea_c)

#Conexiones con Sistemas TIGO

#Conexión a PostgreSQL / Trino

from airflow.providers.postgres.operators.postgres import PostgresOperator

query_task = PostgresOperator(
    task_id='ejecutar_query',
    postgres_conn_id='trino_tigo',   # conn_id definido en Airflow Connections
    sql="""
        SELECT country, SUM(revenue) as total
        FROM analytics.ventas
        WHERE fecha >= CURRENT_DATE - INTERVAL '7' DAY
        GROUP BY country
    """,
)

#Conexión a Ceph/S3

from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Esperar un archivo
esperar_archivo = S3KeySensor(
    task_id='esperar_archivo_ceph',
    bucket_name='tigo-data-raw',
    bucket_key='etl/ventas/{{ ds }}/data.parquet',
    aws_conn_id='ceph_tigo',
    timeout=3600,
    poke_interval=60,
)

#Conexión a NiFi (vía REST API)

from airflow.providers.http.operators.http import SimpleHttpOperator

trigger_nifi = SimpleHttpOperator(
    task_id='trigger_nifi_flow',
    http_conn_id='nifi_tigo',
    endpoint='/nifi-api/processors/process-group/run',
    method='POST',
    headers={'Content-Type': 'application/json'},
)

#Monitoreo y Debugging

#Verificar estado de DAG Runs

# Listar runs de un DAG específico
$ airflow dags list-runs -d nombre_dag

# Ver logs de una tarea
$ airflow tasks logs <dag_id> <task_id> <execution_date>

#Estados de ejecución

Estado Descripción
success Completado exitosamente
failed Falló (puede tener retries configurados)
running En ejecución actualmente
queued En cola esperando worker
skipped Omitido (branch conditions)
up_for_retry Esperando reintento
upstream_failed Tarea upstream falló

#Comandos de limpieza y reprocess

Limpiar el estado de una tarea (para re-ejecutar)

$ airflow tasks clear <dag_id> \
    --task-regex <task_id> \
    --start-date 2026-03-01 \
    --end-date 2026-03-11

Marcar una tarea como exitosa manualmente

$ airflow tasks mark-success <dag_id> <task_id> <execution_date>

Re-ejecutar un DAG run específico

$ airflow dags backfill <dag_id> \
    --start-date 2026-03-01 \
    --end-date 2026-03-07

#Mejores Prácticas TIGO

#Estructura de DAGs recomendada

# Nomenclatura estándar
dag_id = 'area_fuente_destino_frecuencia'
# Ejemplo: 'comercial_crm_dw_diario'

# Tags obligatorios en TIGO
tags = ['tigo', 'pais', 'area', 'frecuencia']
# Ejemplo: ['tigo', 'co', 'comercial', 'diario']

#Patrones recomendados

Práctica Descripción
Idempotencia Las tareas deben poder re-ejecutarse sin efectos
Atomicidad Cada tarea hace una sola cosa
Timeout por tarea Siempre definir execution_timeout
SLAs Definir alertas de SLA para pipelines críticos
Variables para configuración No hardcodear hosts, credenciales, fechas
Pools para recursos compartidos Limitar concurrencia con pool='nombre_pool'

#Variables de entorno útiles

from airflow.models import Variable

# Leer variable de Airflow
env = Variable.get('tigo_environment', default_var='dev')
db_host = Variable.get('trino_host')
s3_bucket = Variable.get('ceph_bucket_raw')

#Also see