DataSkills Hub

Apache Spark

Apache Spark es un motor distribuido de procesamiento para ejecutar transformaciones ETL, análisis batch y streaming sobre datos en Ceph/S3, PostgreSQL e Iceberg, escribiendo resultados en Hive, Iceberg y sistemas externos.

#Getting Started

#Acceder a Spark

Spark está disponible en <tu-url> con Hadoop y acceso a Ceph/S3. Conectar vía spark-submit o PySpark interactivo.

Iniciar sesión PySpark interactivo

pyspark \
  --master <spark-master-url> \
  --deploy-mode client \
  --driver-memory 4g \
  --executor-memory 4g \
  --num-executors 4

Verificar versión y contexto

print(spark.version)           # Versión de Spark
print(spark.sparkContext.appName)  # Nombre de la app
sc.getConf().getAll()          # Configuración actual

#Configuración de SparkSession

Parámetro Valor ejemplo Descripción
--master <spark-master-url> Master de Spark
--deploy-mode client o cluster Modo de deployment
--driver-memory 4g, 8g Memoria del driver
--executor-memory 4g, 8g Memoria por executor
--num-executors 4, 8, 16 Número de ejecutores
--packages org.apache.hadoop:hadoop-aws:3.3.0 Dependencias Maven

#Lectura de Datos

#Leer desde Ceph/S3 (Parquet)

# Leer tabla Parquet desde Ceph
df = spark.read \
    .format("parquet") \
    .load("s3a://data-analytics/raw/eventos-web/")

df.show(5)
df.printSchema()

#Leer desde Hive Metastore

# Tabla registrada en Hive
df = spark.sql("SELECT * FROM hive.analytics.ventas LIMIT 100")

# O usar read.table
df = spark.read.table("analytics.ventas")

#Leer desde PostgreSQL

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://<tu-url>/crm") \
    .option("dbtable", "clientes") \
    .option("user", "spark_user") \
    .option("password", "****") \
    .option("driver", "org.postgresql.Driver") \
    .option("numPartitions", "4") \
    .load()

#Leer desde Kafka

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "<tu-url>") \
    .option("subscribe", "raw-eventos-web") \
    .option("startingOffsets", "earliest") \
    .load()

# Parsear JSON del value
from pyspark.sql.functions import from_json, col, schema_of_json

schema = "evento STRING, timestamp STRING, usuario_id LONG"
df_parsed = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

#Transformaciones SQL y DataFrames

#SQL queries

# Registrar tabla temporal
df.createOrReplaceTempView("eventos_temp")

# Query SQL
result = spark.sql("""
    SELECT
        usuario_id,
        COUNT(*) AS num_eventos,
        MAX(timestamp) AS ultimo_evento
    FROM eventos_temp
    WHERE DATE(timestamp) = '2026-03-14'
    GROUP BY usuario_id
    ORDER BY num_eventos DESC
""")

result.show(10)

#DataFrame transformations (PySpark)

Filtrar, seleccionar, agregar

from pyspark.sql.functions import col, count, sum as spark_sum, date_trunc

# Filtro
df_filtered = df.filter(col("pais") == "CO")

# Select y rename
df_sel = df.select(
    col("cliente_id").alias("id"),
    col("monto").cast("double").alias("total")
)

# Aggregation
df_agg = df.groupBy("pais", "categoria") \
    .agg(
        count("*").alias("num_transacciones"),
        spark_sum("monto").alias("total_monto")
    )

# Window function (ranking)
from pyspark.sql.functions import row_number, dense_rank
from pyspark.sql.window import Window

window_spec = Window.partitionBy("pais").orderBy(col("monto").desc())
df_ranked = df.withColumn(
    "ranking",
    row_number().over(window_spec)
).filter(col("ranking") <= 10)

#Join entre tablas

df_clientes = spark.read.table("crm.clientes")
df_ventas = spark.read.table("analytics.ventas")

df_joined = df_ventas.join(
    df_clientes,
    on=col("ventas.cliente_id") == col("clientes.id"),
    how="inner"
).select(
    "ventas.*",
    col("clientes.nombre").alias("nombre_cliente"),
    col("clientes.pais").alias("pais_cliente")
)

#Escritura de Datos

#Escribir en Hive/Iceberg (Parquet)

# Sobrescribir tabla existente
df.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("path", "s3a://data-analytics/processed/mi_tabla/") \
    .saveAsTable("analytics.mi_tabla")

# O en Iceberg (v2 con ACID)
df.write \
    .format("iceberg") \
    .mode("overwrite") \
    .save("analytics.mi_tabla_iceberg")

#Crear tabla con partición

df.write \
    .format("parquet") \
    .mode("overwrite") \
    .partitionBy("fecha", "pais") \
    .option("path", "s3a://data-analytics/processed/ventas/") \
    .saveAsTable("analytics.ventas_particionado")

#Append vs Overwrite

# APPEND: agregar filas nuevas (mismo esquema)
df_incremental.write \
    .format("parquet") \
    .mode("append") \
    .option("path", "s3a://data-analytics/raw/eventos-diarios/") \
    .saveAsTable("raw.eventos")

# OVERWRITE: reemplazar todo (cuidado con datos históricos)
df_completo.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("path", "s3a://data-analytics/processed/snapshot/") \
    .saveAsTable("analytics.snapshot")

#Escribir en PostgreSQL

df.write \
    .format("jdbc") \
    .mode("append") \
    .option("url", "jdbc:postgresql://<tu-url>/reports") \
    .option("dbtable", "resumen_diario") \
    .option("user", "spark_user") \
    .option("password", "****") \
    .option("driver", "org.postgresql.Driver") \
    .save()

#Optimización y Performance

#Usar broadcast para small tables

from pyspark.sql.functions import broadcast

df_large = spark.read.table("analytics.ventas")  # Millones de filas
df_small = spark.read.table("crm.territorios")   # Algunos cientos

# Broadcast df_small (entra en memoria de cada executor)
df_result = df_large.join(
    broadcast(df_small),
    on="territorio_id"
)

#Bucketing y particionado

# Crear tabla con bucketing (optimiza joins posteriores)
df.write \
    .format("parquet") \
    .mode("overwrite") \
    .bucketBy(10, "cliente_id") \
    .sortBy("cliente_id") \
    .option("path", "s3a://data-analytics/processed/ventas_bucketed/") \
    .saveAsTable("analytics.ventas_bucketed")

#Filtrar antes de operaciones costosas

# ❌ MALO: procesa todo, filtra al final
df.groupBy("pais").agg(sum("monto")).filter(col("pais") == "CO")

# ✅ BUENO: filtra primero, menos datos en groupBy
df.filter(col("pais") == "CO").groupBy("pais").agg(sum("monto"))

#Cache/Persist para reutilización

# Dataframe que se usará múltiples veces
df_prep = spark.read.table("analytics.datos_limpios")
df_prep.persist(StorageLevel.MEMORY_AND_DISK)

# Usar df_prep varias veces
result1 = df_prep.groupBy("fecha").agg(count("*"))
result2 = df_prep.filter(col("valor") > 100)

df_prep.unpersist()  # Liberar memoria cuando termines

#Explain y análisis de queries

# Ver plan de ejecución
df.explain()

# Plan completo con estadísticas
df.explain(extended=True)

# Verificar si está bien optimizado
df.explain(mode="cost")

#Spark Jobs

#Estructura típica de un job

# job_etl_ventas.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, sum as spark_sum

# Iniciar sesión
spark = SparkSession.builder \
    .appName("ETL_Ventas_Diaria") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()

# Leer
df_raw = spark.read.table("raw.eventos_web")

# Transformar
df_processed = df_raw \
    .filter(col("fecha") == "2026-03-14") \
    .groupBy("usuario_id", "canal") \
    .agg(spark_sum("monto").alias("total"))

# Escribir
df_processed.write \
    .format("iceberg") \
    .mode("overwrite") \
    .option("path", "s3a://data-analytics/processed/ventas_canal/") \
    .saveAsTable("analytics.ventas_canal")

print(f"Procesadas {df_processed.count()} filas")
spark.stop()

#Ejecutar job con spark-submit

spark-submit \
  --master <spark-master-url> \
  --deploy-mode cluster \
  --driver-memory 4g \
  --executor-memory 8g \
  --num-executors 8 \
  --conf spark.sql.shuffle.partitions=200 \
  job_etl_ventas.py

#Monitoreo y Troubleshooting

#Logs de Spark jobs

Ver ejecución en tiempo real

# Acceder al Spark UI
<tu-url>/

# Ver logs del driver
tail -f /var/log/spark/driver.log

# Ver logs de ejecutores
tail -f /var/log/spark/executor.log

#Problemas comunes

Problema Causa Solución
OutOfMemory en driver Data aggregation grande Aumentar --driver-memory
OutOfMemory en executor Shuffle sin espacio Aumentar --executor-memory
Slow job Falta paralelismo Aumentar --num-executors
Skewed partitions Un executor procesa mucho más Repartition antes de grupar
Query timeout Datos muy grandes o filtro incorrecto Filtrar antes de agg, usar sample

#Mejores Prácticas

Práctica Descripción
Path convention s3a://data-analytics/<layer>/<tabla>/
Particionado Siempre por fecha si es data histórica
Formato almacenamiento Parquet o Iceberg; nunca CSV en producción
Job naming etl-<nombre>-<frecuencia> ej. etl-ventas-diaria
Logs y monitoreo Loguear hitos: inicio, filas procesadas, fin
Errores Capturar y notificar en alertas (Slack/email)
Idempotencia Jobs deben poder re-ejecutarse sin duplicar
Performance tuning Medir con explain, usar cache en reutilizaciones

#Also see