Data Team MICApache Kafka es una plataforma de event streaming distribuida para capturar, procesar y distribuir flujos de datos en tiempo real desde múltiples fuentes hacia NiFi, Spark, Trino y otras herramientas del stack.
Kafka está disponible en <tu-url> para clientes internos. Usar la utilidad kafka-console-producer.sh o kafka-console-consumer.sh.
Probar conexión con kafka-broker-api-versions.sh
kafka-broker-api-versions.sh \
--bootstrap-server <tu-url>
| Parámetro | Valor | Descripción |
|---|---|---|
bootstrap.servers |
<tu-url> |
Brokers del cluster |
group.id |
mi-consumer-group |
ID del grupo consumidor |
auto.offset.reset |
earliest o latest |
Posición inicial de lectura |
key.serializer |
StringSerializer |
Serialización de clave |
value.serializer |
StringSerializer |
Serialización de valor |
kafka-topics.sh \
--bootstrap-server <tu-url> \
--list
kafka-topics.sh \
--bootstrap-server <tu-url> \
--create \
--topic eventos-web \
--partitions 3 \
--replication-factor 2 \
--config retention.ms=86400000
kafka-topics.sh \
--bootstrap-server <tu-url> \
--describe \
--topic eventos-web
kafka-topics.sh \
--bootstrap-server <tu-url> \
--alter \
--topic eventos-web \
--partitions 5
kafka-topics.sh \
--bootstrap-server <tu-url> \
--delete \
--topic eventos-web
Consumir desde el inicio del topic
kafka-console-consumer.sh \
--bootstrap-server <tu-url> \
--topic eventos-web \
--from-beginning
Consumidor en tiempo real (últimos mensajes)
kafka-console-consumer.sh \
--bootstrap-server <tu-url> \
--topic eventos-web
Producir mensajes interactivamente
kafka-console-producer.sh \
--bootstrap-server <tu-url> \
--topic eventos-web \
--property "parse.key=true" \
--property "key.separator=:"
Ejemplo de mensaje (en la terminal del productor):
usuario-123:{"evento":"login","timestamp":"2026-03-14T10:30:00Z","ip":"203.0.113.1"}
usuario-456:{"evento":"compra","timestamp":"2026-03-14T10:31:15Z","total":99.99}
kafka-console-producer.sh \
--bootstrap-server <tu-url> \
--topic eventos-web < eventos.jsonl
kafka-consumer-groups.sh \
--bootstrap-server <tu-url> \
--list
kafka-consumer-groups.sh \
--bootstrap-server <tu-url> \
--group mi-app-group \
--describe
Salida muestra: TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID | HOST | CLIENT-ID
Resetear al inicio (releer todo)
kafka-consumer-groups.sh \
--bootstrap-server <tu-url> \
--group mi-app-group \
--topic eventos-web \
--reset-offsets \
--to-earliest \
--execute
Resetear a una fecha específica
kafka-consumer-groups.sh \
--bootstrap-server <tu-url> \
--group mi-app-group \
--topic eventos-web \
--reset-offsets \
--to-datetime 2026-03-14T00:00:00.000 \
--execute
| Topic | Descripción | Particiones | Retention |
|---|---|---|---|
raw-eventos-web |
Eventos del sitio web (clics) | 6 | 7 días |
raw-api-calls |
Logs de API de aplicaciones | 4 | 3 días |
raw-mobile-app |
Eventos de aplicación móvil | 8 | 14 días |
raw-cdr-telefonico |
CDR de llamadas telefónicas | 12 | 30 días |
Leer desde Kafka en Spark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
spark = SparkSession.builder \
.appName("KafkaReader") \
.getOrCreate()
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "<tu-url>") \
.option("subscribe", "raw-eventos-web") \
.option("startingOffsets", "earliest") \
.load()
# El valor viene como bytes, decodificar
df_parsed = df.selectExpr("CAST(value AS STRING)").alias("json")
df_parsed.writeStream.format("console").start().awaitTermination()
En NiFi, usar procesador ConsumeKafka_2_6:
<tu-url>raw-eventos-web)nifi-consumer-groupEn Trino, usar catálogo Kafka para consultar topics en vivo
SHOW CATALOGS; -- Verificar que 'kafka' existe
SHOW TABLES FROM kafka.default;
SELECT * FROM kafka.default."raw-eventos-web" LIMIT 10;
kafka-consumer-groups.sh \
--bootstrap-server <tu-url> \
--group mi-app-group \
--describe
Si LAG es alto → consumidor retrasado. Revisar logs de la aplicación.
kafka-topics.sh \
--bootstrap-server <tu-url> \
--describe \
--under-replicated-partitions
Si hay output → algunas réplicas están fuera de sincronía. Contactar a DevOps/Infra.
En el servidor del broker, revisar logs de Kafka:
# Ubicación típica
tail -f /var/log/kafka/server.log
Buscar errores comunes: ERROR, FATAL, OutOfMemory
# Verificar último offset
kafka-run-class.sh kafka.tools.JmxTool \
--object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
| Práctica | Descripción |
|---|---|
| Naming convention | Topics en snake_case, prefijo raw- para ingesta |
| Particiones | Mínimo 3, escalar según throughput esperado |
| Replication factor | Siempre 2 o 3 en producción (tolerancia a fallos) |
| Retention policy | Definir según SLA; S3 para archive de larga duración |
| Compresión | Habilitar compression.type=snappy en producers |
| Monitoring | Alertas en LAG > 10000 offset para consumer groups |
| Credenciales | Usar SASL/SCRAM en producción (no plaintext) |
| Topics no críticos | 1 partición + retention corta (3-7 días) |