Apache NiFi

Apache NiFi es la plataforma de ingesta de datos del Ingestion Layer de TIGO. Permite construir pipelines de datos a través de una interfaz visual drag-and-drop, soportando cientos de conectores hacia diversas fuentes y destinos.

#Getting Started

#Acceder a NiFi (TIGO)

Abrir la interfaz web

http://nifi.tigo.internal:8080/nifi/

Acceder a NiFi Registry (versionado de flows)

http://nifi-registry.tigo.internal:18080/nifi-registry/

#Atajos de teclado en la UI

Atajo Acción
Ctrl + A Seleccionar todos los componentes
Ctrl + C Copiar selección
Ctrl + V Pegar
Delete / Backspace Eliminar selección
Ctrl + Z Deshacer
Ctrl + Shift + Z Rehacer
Ctrl + R Refrescar (actualizar estadísticas UI)
Escape Cancelar selección / Cerrar diálogos
Ctrl + Click Añadir a selección
Shift + Drag Seleccionar área (multiselect)
G Group (agrupar selección en Process Group)

#Processors más usados en TIGO

#Processors de Ingesta y Extracción

Processor Uso en TIGO
GetFile / TailFile Leer archivos locales / logs en tiempo real
ListFile / FetchFile Listar y leer archivos de directorios
GetSFTP / FetchSFTP Recoger archivos de SFTP (fuentes externas)
QueryDatabaseTable Extracción incremental de bases de datos
ExecuteSQL Query SQL personalizado sobre cualquier BD
ConsumeKafka Consumir mensajes de tópicos Kafka
GetHTTP / InvokeHTTP Consumir APIs REST
ListS3 / FetchS3Object Listar y descargar objetos de Ceph/S3

#Processors de Transformación y Enriquecimiento

Processor Uso en TIGO
JoltTransformJSON Transformar estructura JSON con JOLT spec
ReplaceText Reemplazar/transformar contenido de FlowFiles
ConvertRecord Convertir entre formatos (CSV→Parquet, JSON→Avro)
SplitRecord Dividir un batch en registros individuales
MergeRecord Agrupar registros en batches para eficiencia
UpdateAttribute Agregar / modificar atributos del FlowFile
EvaluateJsonPath Extraer campos JSON a atributos
ExecuteGroovyScript Lógica personalizada en Groovy

#Processors de Destino (Output)

Processor Uso en TIGO
PutS3Object Guardar datos en Ceph/S3 (Data Lake)
PutDatabaseRecord Escribir en PostgreSQL / bases transaccionales
PublishKafkaRecord Publicar en tópicos Kafka
PutFile Escribir en sistema de archivos local
PutSFTP Enviar archivos a SFTP externo
InvokeHTTP Llamar APIs REST (POST/PUT de datos)

#Configuración de Processors

#QueryDatabaseTable (extracción incremental)

Configuración recomendada para TIGO

Database Connection Pooling Service: DBCP_PostgreSQL_TIGO
Database Type: PostgreSQL
Table Name: public.ventas
Columns to Return: id, fecha, monto, pais
Maximum-value Columns: fecha, id
Initial Max Value for 'fecha': 2024-01-01
Fetch Size: 10000
Max Rows Per Flow File: 50000
Output Batch Size: 10

#PutS3Object (escribir en Ceph)

Configuración para Data Lake TIGO

Bucket: tigo-data-raw
Object Key: etl/${area}/${fuente}/${now():format('yyyy/MM/dd')}/${filename}
AWS Credentials Provider Service: AWSCredentials_Ceph_TIGO
Endpoint Override URL: https://ceph-rgw.tigo.internal:7480
Region: us-east-1
Content Type: application/octet-stream
Server Side Encryption: None
Storage Class: STANDARD

#ConvertRecord (cambiar formato)

Configuración CSV → Parquet

Record Reader:    CSVReader     (con schema inferido o explícito)
Record Writer:    ParquetRecordSetWriter

Configuración JSON → Parquet

Record Reader:    JsonTreeReader
Record Writer:    ParquetRecordSetWriter

#FlowFiles y Expresiones

#Atributos estándar de FlowFiles

Atributo Descripción
uuid Identificador único del FlowFile
filename Nombre del archivo
path Ruta de origen
fileSize Tamaño en bytes
entryDate Fecha de entrada al flow
lastQueuedDate Última vez que entró a una queue
mime.type Tipo MIME del contenido

#NiFi Expression Language

Acceder a un atributo

${filename}
${area}

Manipulación de cadenas

${filename:toLower()}
${filename:toUpper()}
${filename:substring(0, 8)}
${filename:replace('.csv', '.parquet')}
${filename:startsWith('ventas_')}

Trabajar con fechas

${now():format('yyyy-MM-dd')}
${now():format('yyyy/MM/dd')}
${entryDate:format('HH:mm:ss')}
${literal(2026):minus(1):format('yyyy')}

Construir paths dinámicos para Ceph

etl/${area:toLower()}/${fuente:toLower()}/${now():format('yyyy/MM/dd')}/${filename}

Operaciones matemáticas

${fileSize:divide(1048576):toDecimal():format('0.00')} MB
${count:plus(1)}

Condiciones (para rutas de enrutamiento)

${mime.type:equals('application/json')}
${fileSize:gt(1000000)}
${filename:contains('error')}

#REST API de NiFi

#Endpoints principales

Método Endpoint Descripción
GET /nifi-api/flow/status Estado general del cluster
GET /nifi-api/process-groups/{id}/processors Listar processors de un grupo
PUT /nifi-api/processors/{id}/run-status Iniciar/detener un processor
GET /nifi-api/processors/{id} Ver configuración de un processor
PUT /nifi-api/processors/{id} Actualizar configuración
GET /nifi-api/process-groups/{id}/status Estado de un Process Group
PUT /nifi-api/flow/process-groups/{id} Iniciar/detener un Process Group

#Iniciar/Detener un Process Group via API

Iniciar un Process Group

$ curl -X PUT \
    -H "Content-Type: application/json" \
    -d '{"id":"<process-group-id>","state":"RUNNING"}' \
    http://nifi.tigo.internal:8080/nifi-api/flow/process-groups/<id>

Detener un Process Group

$ curl -X PUT \
    -H "Content-Type: application/json" \
    -d '{"id":"<process-group-id>","state":"STOPPED"}' \
    http://nifi.tigo.internal:8080/nifi-api/flow/process-groups/<id>

Consultar el estado de un processor

$ curl -s http://nifi.tigo.internal:8080/nifi-api/processors/<id> \
    | python3 -m json.tool | grep '"runStatus"'

#Monitoreo y Troubleshooting

#Indicadores de la UI

Color / Indicador Significado
▶ Verde Processor corriendo
■ Rojo Processor detenido
⚠ Amarillo Processor con errores o advertencias
🔵 Azul (queue) FlowFiles esperando en la cola
🔴 Rojo (queue) Back pressure: cola al límite de tamaño

#Ver y gestionar FlowFiles en cola

Ver datos en una connection (cola)

  1. Click derecho sobre la flecha (connection)
  2. List queue → para ver FlowFiles
  3. Click en un FlowFile → View → para inspeccionar contenido y atributos

Vaciar una cola (eliminar FlowFiles)

  1. Click derecho sobre la connection
  2. Empty queue → confirmar

#Revisar logs y errores

# En el servidor NiFi
$ tail -f /opt/nifi/logs/nifi-app.log
$ grep -i "ERROR\|WARN" /opt/nifi/logs/nifi-app.log | tail -50

# Ver logs de un processor específico
# En la UI: Click en processor → View Status History → Ver counters y errores

#Mejores prácticas TIGO

Práctica Descripción
Versionar flows en NiFi Registry Siempre guardar versión antes de cambios grandes
Usar Process Groups por área Organizar por dominio: Comercial, Red, Finanzas, etc
Configurar back pressure Evitar que se llenen las colas sin control
Usar Record-based processors Más eficiente que trabajar FlowFile por FlowFile
Externalizar configuraciones Variables y Parameter Contexts para configs
Monitorear queues en producción Alerta si una cola supera N FlowFiles

#Also see