Data Team TigoApache NiFi es la plataforma de ingesta de datos del Ingestion Layer de TIGO. Permite construir pipelines de datos a través de una interfaz visual drag-and-drop, soportando cientos de conectores hacia diversas fuentes y destinos.
Abrir la interfaz web
http://nifi.tigo.internal:8080/nifi/
Acceder a NiFi Registry (versionado de flows)
http://nifi-registry.tigo.internal:18080/nifi-registry/
| Atajo | Acción |
|---|---|
Ctrl + A |
Seleccionar todos los componentes |
Ctrl + C |
Copiar selección |
Ctrl + V |
Pegar |
Delete / Backspace |
Eliminar selección |
Ctrl + Z |
Deshacer |
Ctrl + Shift + Z |
Rehacer |
Ctrl + R |
Refrescar (actualizar estadísticas UI) |
Escape |
Cancelar selección / Cerrar diálogos |
Ctrl + Click |
Añadir a selección |
Shift + Drag |
Seleccionar área (multiselect) |
G |
Group (agrupar selección en Process Group) |
| Processor | Uso en TIGO |
|---|---|
GetFile / TailFile |
Leer archivos locales / logs en tiempo real |
ListFile / FetchFile |
Listar y leer archivos de directorios |
GetSFTP / FetchSFTP |
Recoger archivos de SFTP (fuentes externas) |
QueryDatabaseTable |
Extracción incremental de bases de datos |
ExecuteSQL |
Query SQL personalizado sobre cualquier BD |
ConsumeKafka |
Consumir mensajes de tópicos Kafka |
GetHTTP / InvokeHTTP |
Consumir APIs REST |
ListS3 / FetchS3Object |
Listar y descargar objetos de Ceph/S3 |
| Processor | Uso en TIGO |
|---|---|
JoltTransformJSON |
Transformar estructura JSON con JOLT spec |
ReplaceText |
Reemplazar/transformar contenido de FlowFiles |
ConvertRecord |
Convertir entre formatos (CSV→Parquet, JSON→Avro) |
SplitRecord |
Dividir un batch en registros individuales |
MergeRecord |
Agrupar registros en batches para eficiencia |
UpdateAttribute |
Agregar / modificar atributos del FlowFile |
EvaluateJsonPath |
Extraer campos JSON a atributos |
ExecuteGroovyScript |
Lógica personalizada en Groovy |
| Processor | Uso en TIGO |
|---|---|
PutS3Object |
Guardar datos en Ceph/S3 (Data Lake) |
PutDatabaseRecord |
Escribir en PostgreSQL / bases transaccionales |
PublishKafkaRecord |
Publicar en tópicos Kafka |
PutFile |
Escribir en sistema de archivos local |
PutSFTP |
Enviar archivos a SFTP externo |
InvokeHTTP |
Llamar APIs REST (POST/PUT de datos) |
Configuración recomendada para TIGO
Database Connection Pooling Service: DBCP_PostgreSQL_TIGO
Database Type: PostgreSQL
Table Name: public.ventas
Columns to Return: id, fecha, monto, pais
Maximum-value Columns: fecha, id
Initial Max Value for 'fecha': 2024-01-01
Fetch Size: 10000
Max Rows Per Flow File: 50000
Output Batch Size: 10
Configuración para Data Lake TIGO
Bucket: tigo-data-raw
Object Key: etl/${area}/${fuente}/${now():format('yyyy/MM/dd')}/${filename}
AWS Credentials Provider Service: AWSCredentials_Ceph_TIGO
Endpoint Override URL: https://ceph-rgw.tigo.internal:7480
Region: us-east-1
Content Type: application/octet-stream
Server Side Encryption: None
Storage Class: STANDARD
Configuración CSV → Parquet
Record Reader: CSVReader (con schema inferido o explícito)
Record Writer: ParquetRecordSetWriter
Configuración JSON → Parquet
Record Reader: JsonTreeReader
Record Writer: ParquetRecordSetWriter
| Atributo | Descripción |
|---|---|
uuid |
Identificador único del FlowFile |
filename |
Nombre del archivo |
path |
Ruta de origen |
fileSize |
Tamaño en bytes |
entryDate |
Fecha de entrada al flow |
lastQueuedDate |
Última vez que entró a una queue |
mime.type |
Tipo MIME del contenido |
Acceder a un atributo
${filename}
${area}
Manipulación de cadenas
${filename:toLower()}
${filename:toUpper()}
${filename:substring(0, 8)}
${filename:replace('.csv', '.parquet')}
${filename:startsWith('ventas_')}
Trabajar con fechas
${now():format('yyyy-MM-dd')}
${now():format('yyyy/MM/dd')}
${entryDate:format('HH:mm:ss')}
${literal(2026):minus(1):format('yyyy')}
Construir paths dinámicos para Ceph
etl/${area:toLower()}/${fuente:toLower()}/${now():format('yyyy/MM/dd')}/${filename}
Operaciones matemáticas
${fileSize:divide(1048576):toDecimal():format('0.00')} MB
${count:plus(1)}
Condiciones (para rutas de enrutamiento)
${mime.type:equals('application/json')}
${fileSize:gt(1000000)}
${filename:contains('error')}
| Método | Endpoint | Descripción |
|---|---|---|
GET |
/nifi-api/flow/status |
Estado general del cluster |
GET |
/nifi-api/process-groups/{id}/processors |
Listar processors de un grupo |
PUT |
/nifi-api/processors/{id}/run-status |
Iniciar/detener un processor |
GET |
/nifi-api/processors/{id} |
Ver configuración de un processor |
PUT |
/nifi-api/processors/{id} |
Actualizar configuración |
GET |
/nifi-api/process-groups/{id}/status |
Estado de un Process Group |
PUT |
/nifi-api/flow/process-groups/{id} |
Iniciar/detener un Process Group |
Iniciar un Process Group
$ curl -X PUT \
-H "Content-Type: application/json" \
-d '{"id":"<process-group-id>","state":"RUNNING"}' \
http://nifi.tigo.internal:8080/nifi-api/flow/process-groups/<id>
Detener un Process Group
$ curl -X PUT \
-H "Content-Type: application/json" \
-d '{"id":"<process-group-id>","state":"STOPPED"}' \
http://nifi.tigo.internal:8080/nifi-api/flow/process-groups/<id>
Consultar el estado de un processor
$ curl -s http://nifi.tigo.internal:8080/nifi-api/processors/<id> \
| python3 -m json.tool | grep '"runStatus"'
| Color / Indicador | Significado |
|---|---|
| ▶ Verde | Processor corriendo |
| ■ Rojo | Processor detenido |
| ⚠ Amarillo | Processor con errores o advertencias |
| 🔵 Azul (queue) | FlowFiles esperando en la cola |
| 🔴 Rojo (queue) | Back pressure: cola al límite de tamaño |
Ver datos en una connection (cola)
Vaciar una cola (eliminar FlowFiles)
# En el servidor NiFi
$ tail -f /opt/nifi/logs/nifi-app.log
$ grep -i "ERROR\|WARN" /opt/nifi/logs/nifi-app.log | tail -50
# Ver logs de un processor específico
# En la UI: Click en processor → View Status History → Ver counters y errores
| Práctica | Descripción |
|---|---|
| Versionar flows en NiFi Registry | Siempre guardar versión antes de cambios grandes |
| Usar Process Groups por área | Organizar por dominio: Comercial, Red, Finanzas, etc |
| Configurar back pressure | Evitar que se llenen las colas sin control |
| Usar Record-based processors | Más eficiente que trabajar FlowFile por FlowFile |
| Externalizar configuraciones | Variables y Parameter Contexts para configs |
| Monitorear queues en producción | Alerta si una cola supera N FlowFiles |