DataSkills Hub

Apache Kafka

Apache Kafka es una plataforma de event streaming distribuida para capturar, procesar y distribuir flujos de datos en tiempo real desde múltiples fuentes hacia NiFi, Spark, Trino y otras herramientas del stack.

#Getting Started

#Conectarse al cluster Kafka

Kafka está disponible en <tu-url> para clientes internos. Usar la utilidad kafka-console-producer.sh o kafka-console-consumer.sh.

Probar conexión con kafka-broker-api-versions.sh

kafka-broker-api-versions.sh \
  --bootstrap-server <tu-url>

#Parámetros de conexión esenciales

Parámetro Valor Descripción
bootstrap.servers <tu-url> Brokers del cluster
group.id mi-consumer-group ID del grupo consumidor
auto.offset.reset earliest o latest Posición inicial de lectura
key.serializer StringSerializer Serialización de clave
value.serializer StringSerializer Serialización de valor

#Gestión de Topics

#Listar topics existentes

kafka-topics.sh \
  --bootstrap-server <tu-url> \
  --list

#Crear un topic nuevo

kafka-topics.sh \
  --bootstrap-server <tu-url> \
  --create \
  --topic eventos-web \
  --partitions 3 \
  --replication-factor 2 \
  --config retention.ms=86400000

#Describir configuración de un topic

kafka-topics.sh \
  --bootstrap-server <tu-url> \
  --describe \
  --topic eventos-web

#Aumentar particiones de un topic

kafka-topics.sh \
  --bootstrap-server <tu-url> \
  --alter \
  --topic eventos-web \
  --partitions 5

#Eliminar un topic

kafka-topics.sh \
  --bootstrap-server <tu-url> \
  --delete \
  --topic eventos-web

#Productores y Consumidores

#Consumidor CLI básico

Consumir desde el inicio del topic

kafka-console-consumer.sh \
  --bootstrap-server <tu-url> \
  --topic eventos-web \
  --from-beginning

Consumidor en tiempo real (últimos mensajes)

kafka-console-consumer.sh \
  --bootstrap-server <tu-url> \
  --topic eventos-web

#Productor CLI básico

Producir mensajes interactivamente

kafka-console-producer.sh \
  --bootstrap-server <tu-url> \
  --topic eventos-web \
  --property "parse.key=true" \
  --property "key.separator=:"

Ejemplo de mensaje (en la terminal del productor):

usuario-123:{"evento":"login","timestamp":"2026-03-14T10:30:00Z","ip":"203.0.113.1"}
usuario-456:{"evento":"compra","timestamp":"2026-03-14T10:31:15Z","total":99.99}

#Productor con archivo

kafka-console-producer.sh \
  --bootstrap-server <tu-url> \
  --topic eventos-web < eventos.jsonl

#Gestión de Consumer Groups

#Listar consumer groups

kafka-consumer-groups.sh \
  --bootstrap-server <tu-url> \
  --list

#Describir un consumer group

kafka-consumer-groups.sh \
  --bootstrap-server <tu-url> \
  --group mi-app-group \
  --describe

Salida muestra: TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID | HOST | CLIENT-ID

#Resetear offset de un consumer group

Resetear al inicio (releer todo)

kafka-consumer-groups.sh \
  --bootstrap-server <tu-url> \
  --group mi-app-group \
  --topic eventos-web \
  --reset-offsets \
  --to-earliest \
  --execute

Resetear a una fecha específica

kafka-consumer-groups.sh \
  --bootstrap-server <tu-url> \
  --group mi-app-group \
  --topic eventos-web \
  --reset-offsets \
  --to-datetime 2026-03-14T00:00:00.000 \
  --execute

#Topics de Ejemplo

#Topics de ingesta en producción

Topic Descripción Particiones Retention
raw-eventos-web Eventos del sitio web (clics) 6 7 días
raw-api-calls Logs de API de aplicaciones 4 3 días
raw-mobile-app Eventos de aplicación móvil 8 14 días
raw-cdr-telefonico CDR de llamadas telefónicas 12 30 días

#Integración con el Stack

#Kafka → Spark Streaming

Leer desde Kafka en Spark

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder \
    .appName("KafkaReader") \
    .getOrCreate()

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "<tu-url>") \
    .option("subscribe", "raw-eventos-web") \
    .option("startingOffsets", "earliest") \
    .load()

# El valor viene como bytes, decodificar
df_parsed = df.selectExpr("CAST(value AS STRING)").alias("json")
df_parsed.writeStream.format("console").start().awaitTermination()

#Kafka → NiFi (ingesta)

En NiFi, usar procesador ConsumeKafka_2_6:

  • Broker list: <tu-url>
  • Topic name: (ej. raw-eventos-web)
  • Group ID: nifi-consumer-group

#Kafka → Trino (query en tiempo real)

En Trino, usar catálogo Kafka para consultar topics en vivo

SHOW CATALOGS;  -- Verificar que 'kafka' existe
SHOW TABLES FROM kafka.default;
SELECT * FROM kafka.default."raw-eventos-web" LIMIT 10;

#Monitoreo y Troubleshooting

#Ver offset lag de un consumer group

kafka-consumer-groups.sh \
  --bootstrap-server <tu-url> \
  --group mi-app-group \
  --describe

Si LAG es alto → consumidor retrasado. Revisar logs de la aplicación.

#Monitoreo de particiones y replicas

kafka-topics.sh \
  --bootstrap-server <tu-url> \
  --describe \
  --under-replicated-partitions

Si hay output → algunas réplicas están fuera de sincronía. Contactar a DevOps/Infra.

#Ver logs de un broker

En el servidor del broker, revisar logs de Kafka:

# Ubicación típica
tail -f /var/log/kafka/server.log

Buscar errores comunes: ERROR, FATAL, OutOfMemory

#Topic vacío o sin mensajes nuevos

# Verificar último offset
kafka-run-class.sh kafka.tools.JmxTool \
  --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec

#Mejores Prácticas

Práctica Descripción
Naming convention Topics en snake_case, prefijo raw- para ingesta
Particiones Mínimo 3, escalar según throughput esperado
Replication factor Siempre 2 o 3 en producción (tolerancia a fallos)
Retention policy Definir según SLA; S3 para archive de larga duración
Compresión Habilitar compression.type=snappy en producers
Monitoring Alertas en LAG > 10000 offset para consumer groups
Credenciales Usar SASL/SCRAM en producción (no plaintext)
Topics no críticos 1 partición + retention corta (3-7 días)

#Also see