Data Team TigoApache Airflow es la plataforma de orquestación de workflows del Processing Layer de TIGO. Permite definir, programar y monitorear pipelines de datos como DAGs en Python.
Acceder a la UI web
# Abrir en el navegador
http://airflow.tigo.internal:8080
# Usuario y contraseña provistos por el equipo de plataforma
Acceder vía CLI (desde el servidor)
$ airflow version
$ airflow info
| Comando | Descripción |
|---|---|
airflow dags list |
Listar todos los DAGs |
airflow dags trigger <dag_id> |
Disparar un DAG manualmente |
airflow dags pause <dag_id> |
Pausar un DAG |
airflow dags unpause <dag_id> |
Reactivar un DAG pausado |
airflow dags state <dag_id> <date> |
Ver estado de un DAG run |
airflow tasks list <dag_id> |
Listar las tareas de un DAG |
airflow tasks run <dag_id> <task_id> <date> |
Ejecutar una tarea |
airflow tasks state <dag_id> <task_id> <date> |
Ver estado de una tarea |
Listar variables
$ airflow variables list
Leer una variable
$ airflow variables get <variable_name>
Crear/actualizar una variable
$ airflow variables set <variable_name> <value>
Listar conexiones configuradas
$ airflow connections list
Probar una conexión
$ airflow connections test <conn_id>
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'data-engineering',
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
'email': ['[email protected]'],
}
with DAG(
dag_id='mi_pipeline_tigo',
default_args=default_args,
description='Pipeline de ejemplo TIGO',
schedule_interval='0 6 * * *', # Diario a las 6am
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['tigo', 'data-engineering'],
) as dag:
def extraer_datos():
print("Extrayendo datos...")
def transformar_datos():
print("Transformando datos...")
tarea_extraer = PythonOperator(
task_id='extraer',
python_callable=extraer_datos,
)
tarea_transformar = PythonOperator(
task_id='transformar',
python_callable=transformar_datos,
)
tarea_extraer >> tarea_transformar
| Operador | Uso principal |
|---|---|
PythonOperator |
Ejecutar funciones Python |
BashOperator |
Ejecutar comandos shell |
PostgresOperator |
Queries sobre PostgreSQL / Trino |
S3KeySensor |
Esperar un archivo en S3/Ceph |
HttpSensor |
Esperar disponibilidad de un endpoint |
TriggerDagRunOperator |
Encadenar DAGs |
BranchPythonOperator |
Flujos condicionales |
DummyOperator |
Marcadores de inicio/fin |
| Expresión | Descripción |
|---|---|
@once |
Solo una vez |
@daily |
Una vez al día (medianoche) |
@hourly |
Cada hora |
@weekly |
Cada semana |
@monthly |
Cada mes |
0 6 * * * |
Diario a las 6am |
0 */4 * * * |
Cada 4 horas |
30 8 * * 1-5 |
Lun-Vie a las 8:30am |
Push de valor desde una tarea
def generar_datos(**context):
tabla = 'ventas_2026'
# Pushear un valor
context['ti'].xcom_push(key='nombre_tabla', value=tabla)
return tabla # return también hace push automático
Pull de valor en otra tarea
def procesar_datos(**context):
ti = context['ti']
tabla = ti.xcom_pull(
task_ids='generar',
key='nombre_tabla'
)
print(f"Procesando tabla: {tabla}")
Dependencia secuencial
tarea_a >> tarea_b >> tarea_c
Dependencias paralelas
tarea_inicio >> [tarea_b, tarea_c] >> tarea_fin
Usando set_upstream / set_downstream
tarea_b.set_upstream(tarea_a)
tarea_b.set_downstream(tarea_c)
from airflow.providers.postgres.operators.postgres import PostgresOperator
query_task = PostgresOperator(
task_id='ejecutar_query',
postgres_conn_id='trino_tigo', # conn_id definido en Airflow Connections
sql="""
SELECT country, SUM(revenue) as total
FROM analytics.ventas
WHERE fecha >= CURRENT_DATE - INTERVAL '7' DAY
GROUP BY country
""",
)
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# Esperar un archivo
esperar_archivo = S3KeySensor(
task_id='esperar_archivo_ceph',
bucket_name='tigo-data-raw',
bucket_key='etl/ventas/{{ ds }}/data.parquet',
aws_conn_id='ceph_tigo',
timeout=3600,
poke_interval=60,
)
from airflow.providers.http.operators.http import SimpleHttpOperator
trigger_nifi = SimpleHttpOperator(
task_id='trigger_nifi_flow',
http_conn_id='nifi_tigo',
endpoint='/nifi-api/processors/process-group/run',
method='POST',
headers={'Content-Type': 'application/json'},
)
# Listar runs de un DAG específico
$ airflow dags list-runs -d nombre_dag
# Ver logs de una tarea
$ airflow tasks logs <dag_id> <task_id> <execution_date>
| Estado | Descripción |
|---|---|
success |
Completado exitosamente |
failed |
Falló (puede tener retries configurados) |
running |
En ejecución actualmente |
queued |
En cola esperando worker |
skipped |
Omitido (branch conditions) |
up_for_retry |
Esperando reintento |
upstream_failed |
Tarea upstream falló |
Limpiar el estado de una tarea (para re-ejecutar)
$ airflow tasks clear <dag_id> \
--task-regex <task_id> \
--start-date 2026-03-01 \
--end-date 2026-03-11
Marcar una tarea como exitosa manualmente
$ airflow tasks mark-success <dag_id> <task_id> <execution_date>
Re-ejecutar un DAG run específico
$ airflow dags backfill <dag_id> \
--start-date 2026-03-01 \
--end-date 2026-03-07
# Nomenclatura estándar
dag_id = 'area_fuente_destino_frecuencia'
# Ejemplo: 'comercial_crm_dw_diario'
# Tags obligatorios en TIGO
tags = ['tigo', 'pais', 'area', 'frecuencia']
# Ejemplo: ['tigo', 'co', 'comercial', 'diario']
| Práctica | Descripción |
|---|---|
| Idempotencia | Las tareas deben poder re-ejecutarse sin efectos |
| Atomicidad | Cada tarea hace una sola cosa |
| Timeout por tarea | Siempre definir execution_timeout |
| SLAs | Definir alertas de SLA para pipelines críticos |
| Variables para configuración | No hardcodear hosts, credenciales, fechas |
| Pools para recursos compartidos | Limitar concurrencia con pool='nombre_pool' |
from airflow.models import Variable
# Leer variable de Airflow
env = Variable.get('tigo_environment', default_var='dev')
db_host = Variable.get('trino_host')
s3_bucket = Variable.get('ceph_bucket_raw')