Data Team MICApache Spark es un motor distribuido de procesamiento para ejecutar transformaciones ETL, análisis batch y streaming sobre datos en Ceph/S3, PostgreSQL e Iceberg, escribiendo resultados en Hive, Iceberg y sistemas externos.
Spark está disponible en <tu-url> con Hadoop y acceso a Ceph/S3. Conectar vía spark-submit o PySpark interactivo.
Iniciar sesión PySpark interactivo
pyspark \
--master <spark-master-url> \
--deploy-mode client \
--driver-memory 4g \
--executor-memory 4g \
--num-executors 4
Verificar versión y contexto
print(spark.version) # Versión de Spark
print(spark.sparkContext.appName) # Nombre de la app
sc.getConf().getAll() # Configuración actual
| Parámetro | Valor ejemplo | Descripción |
|---|---|---|
--master |
<spark-master-url> |
Master de Spark |
--deploy-mode |
client o cluster |
Modo de deployment |
--driver-memory |
4g, 8g |
Memoria del driver |
--executor-memory |
4g, 8g |
Memoria por executor |
--num-executors |
4, 8, 16 |
Número de ejecutores |
--packages |
org.apache.hadoop:hadoop-aws:3.3.0 |
Dependencias Maven |
# Leer tabla Parquet desde Ceph
df = spark.read \
.format("parquet") \
.load("s3a://data-analytics/raw/eventos-web/")
df.show(5)
df.printSchema()
# Tabla registrada en Hive
df = spark.sql("SELECT * FROM hive.analytics.ventas LIMIT 100")
# O usar read.table
df = spark.read.table("analytics.ventas")
df = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://<tu-url>/crm") \
.option("dbtable", "clientes") \
.option("user", "spark_user") \
.option("password", "****") \
.option("driver", "org.postgresql.Driver") \
.option("numPartitions", "4") \
.load()
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "<tu-url>") \
.option("subscribe", "raw-eventos-web") \
.option("startingOffsets", "earliest") \
.load()
# Parsear JSON del value
from pyspark.sql.functions import from_json, col, schema_of_json
schema = "evento STRING, timestamp STRING, usuario_id LONG"
df_parsed = df.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Registrar tabla temporal
df.createOrReplaceTempView("eventos_temp")
# Query SQL
result = spark.sql("""
SELECT
usuario_id,
COUNT(*) AS num_eventos,
MAX(timestamp) AS ultimo_evento
FROM eventos_temp
WHERE DATE(timestamp) = '2026-03-14'
GROUP BY usuario_id
ORDER BY num_eventos DESC
""")
result.show(10)
Filtrar, seleccionar, agregar
from pyspark.sql.functions import col, count, sum as spark_sum, date_trunc
# Filtro
df_filtered = df.filter(col("pais") == "CO")
# Select y rename
df_sel = df.select(
col("cliente_id").alias("id"),
col("monto").cast("double").alias("total")
)
# Aggregation
df_agg = df.groupBy("pais", "categoria") \
.agg(
count("*").alias("num_transacciones"),
spark_sum("monto").alias("total_monto")
)
# Window function (ranking)
from pyspark.sql.functions import row_number, dense_rank
from pyspark.sql.window import Window
window_spec = Window.partitionBy("pais").orderBy(col("monto").desc())
df_ranked = df.withColumn(
"ranking",
row_number().over(window_spec)
).filter(col("ranking") <= 10)
df_clientes = spark.read.table("crm.clientes")
df_ventas = spark.read.table("analytics.ventas")
df_joined = df_ventas.join(
df_clientes,
on=col("ventas.cliente_id") == col("clientes.id"),
how="inner"
).select(
"ventas.*",
col("clientes.nombre").alias("nombre_cliente"),
col("clientes.pais").alias("pais_cliente")
)
# Sobrescribir tabla existente
df.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "s3a://data-analytics/processed/mi_tabla/") \
.saveAsTable("analytics.mi_tabla")
# O en Iceberg (v2 con ACID)
df.write \
.format("iceberg") \
.mode("overwrite") \
.save("analytics.mi_tabla_iceberg")
df.write \
.format("parquet") \
.mode("overwrite") \
.partitionBy("fecha", "pais") \
.option("path", "s3a://data-analytics/processed/ventas/") \
.saveAsTable("analytics.ventas_particionado")
# APPEND: agregar filas nuevas (mismo esquema)
df_incremental.write \
.format("parquet") \
.mode("append") \
.option("path", "s3a://data-analytics/raw/eventos-diarios/") \
.saveAsTable("raw.eventos")
# OVERWRITE: reemplazar todo (cuidado con datos históricos)
df_completo.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "s3a://data-analytics/processed/snapshot/") \
.saveAsTable("analytics.snapshot")
df.write \
.format("jdbc") \
.mode("append") \
.option("url", "jdbc:postgresql://<tu-url>/reports") \
.option("dbtable", "resumen_diario") \
.option("user", "spark_user") \
.option("password", "****") \
.option("driver", "org.postgresql.Driver") \
.save()
from pyspark.sql.functions import broadcast
df_large = spark.read.table("analytics.ventas") # Millones de filas
df_small = spark.read.table("crm.territorios") # Algunos cientos
# Broadcast df_small (entra en memoria de cada executor)
df_result = df_large.join(
broadcast(df_small),
on="territorio_id"
)
# Crear tabla con bucketing (optimiza joins posteriores)
df.write \
.format("parquet") \
.mode("overwrite") \
.bucketBy(10, "cliente_id") \
.sortBy("cliente_id") \
.option("path", "s3a://data-analytics/processed/ventas_bucketed/") \
.saveAsTable("analytics.ventas_bucketed")
# ❌ MALO: procesa todo, filtra al final
df.groupBy("pais").agg(sum("monto")).filter(col("pais") == "CO")
# ✅ BUENO: filtra primero, menos datos en groupBy
df.filter(col("pais") == "CO").groupBy("pais").agg(sum("monto"))
# Dataframe que se usará múltiples veces
df_prep = spark.read.table("analytics.datos_limpios")
df_prep.persist(StorageLevel.MEMORY_AND_DISK)
# Usar df_prep varias veces
result1 = df_prep.groupBy("fecha").agg(count("*"))
result2 = df_prep.filter(col("valor") > 100)
df_prep.unpersist() # Liberar memoria cuando termines
# Ver plan de ejecución
df.explain()
# Plan completo con estadísticas
df.explain(extended=True)
# Verificar si está bien optimizado
df.explain(mode="cost")
# job_etl_ventas.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, sum as spark_sum
# Iniciar sesión
spark = SparkSession.builder \
.appName("ETL_Ventas_Diaria") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.getOrCreate()
# Leer
df_raw = spark.read.table("raw.eventos_web")
# Transformar
df_processed = df_raw \
.filter(col("fecha") == "2026-03-14") \
.groupBy("usuario_id", "canal") \
.agg(spark_sum("monto").alias("total"))
# Escribir
df_processed.write \
.format("iceberg") \
.mode("overwrite") \
.option("path", "s3a://data-analytics/processed/ventas_canal/") \
.saveAsTable("analytics.ventas_canal")
print(f"Procesadas {df_processed.count()} filas")
spark.stop()
spark-submit \
--master <spark-master-url> \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 8g \
--num-executors 8 \
--conf spark.sql.shuffle.partitions=200 \
job_etl_ventas.py
Ver ejecución en tiempo real
# Acceder al Spark UI
<tu-url>/
# Ver logs del driver
tail -f /var/log/spark/driver.log
# Ver logs de ejecutores
tail -f /var/log/spark/executor.log
| Problema | Causa | Solución |
|---|---|---|
OutOfMemory en driver |
Data aggregation grande | Aumentar --driver-memory |
OutOfMemory en executor |
Shuffle sin espacio | Aumentar --executor-memory |
| Slow job | Falta paralelismo | Aumentar --num-executors |
| Skewed partitions | Un executor procesa mucho más | Repartition antes de grupar |
| Query timeout | Datos muy grandes o filtro incorrecto | Filtrar antes de agg, usar sample |
| Práctica | Descripción |
|---|---|
| Path convention | s3a://data-analytics/<layer>/<tabla>/ |
| Particionado | Siempre por fecha si es data histórica |
| Formato almacenamiento | Parquet o Iceberg; nunca CSV en producción |
| Job naming | etl-<nombre>-<frecuencia> ej. etl-ventas-diaria |
| Logs y monitoreo | Loguear hitos: inicio, filas procesadas, fin |
| Errores | Capturar y notificar en alertas (Slack/email) |
| Idempotencia | Jobs deben poder re-ejecutarse sin duplicar |
| Performance tuning | Medir con explain, usar cache en reutilizaciones |