DataSkills Hub

Apache Iceberg

Apache Iceberg es un formato de tabla abierto para almacenamiento distribuido. Proporciona capacidades avanzadas de versionado, time-travel, y schema evolution para datasets analíticos masivos almacenados en Ceph/S3. Las tablas Iceberg se consultan vía Trino y se escriben por Spark jobs.

#Getting Started

#Acceder a tablas Iceberg

Conectarse vía Trino al catálogo Iceberg

# Configurar conexión a Trino
$ trino --server <tu-url> \
        --user <tu_usuario> \
        --catalog iceberg

Verificar tablas disponibles

-- Ver todos los schemas en el catálogo Iceberg
SHOW SCHEMAS FROM iceberg;

-- Ver tablas en un schema específico
SHOW TABLES FROM iceberg.analytics;

-- Ver detalles de tabla (metadatos y particiones)
DESCRIBE iceberg.analytics.ventas_diarias;

#Conexión desde Spark

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IcebergApp") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hive") \
    .config("spark.sql.catalog.iceberg.uri", "<metastore-url>") \
    .getOrCreate()

# Consultar tabla Iceberg
df = spark.sql("SELECT * FROM iceberg.analytics.ventas_diarias LIMIT 10")

#Crear Tablas Iceberg

#Crear tabla desde SQL (Trino)

-- Tabla Iceberg particionada por año, mes, día
CREATE TABLE iceberg.analytics.clientes_snapshot (
    customer_id BIGINT,
    nombre VARCHAR,
    segmento VARCHAR,
    fecha_registro DATE,
    snapshot_date DATE
)
WITH (
    partitioned_by = ARRAY['snapshot_date'],
    format = 'PARQUET'
);

#Crear tabla desde datos existentes (CTAS)

-- Crear tabla Iceberg a partir de datos en Ceph
CREATE TABLE iceberg.analytics.transacciones_2026 AS
SELECT
    transaction_id,
    customer_id,
    monto,
    fecha,
    CAST(DATE_TRUNC('month', fecha) AS DATE) AS snapshot_date
FROM iceberg.raw.transacciones_raw
WHERE year(fecha) = 2026;

#Particionamiento efectivo

-- Tabla con particionamiento por múltiples columnas
CREATE TABLE iceberg.analytics.kpi_diario (
    region VARCHAR,
    producto VARCHAR,
    kpi_valor DOUBLE,
    fecha DATE
)
PARTITIONED BY (
    MONTH(fecha),      -- Partición mensual
    region             -- Partición por región
);

#Operaciones de Datos

#Insertar datos

-- Insertar registros nuevos
INSERT INTO iceberg.analytics.clientes_snapshot
SELECT customer_id, nombre, segmento, fecha_registro, CURRENT_DATE
FROM iceberg.staging.clientes_delta
WHERE updated_at > DATE_SUB(CURRENT_DATE, 1);

#Actualizar y eliminar (ACID)

-- Actualizar registros (soporte ACID nativo)
UPDATE iceberg.analytics.clientes_snapshot
SET segmento = 'VIP'
WHERE customer_id IN (1001, 1002, 1003);

-- Eliminar registros
DELETE FROM iceberg.analytics.clientes_snapshot
WHERE fecha_registro < DATE_SUB(CURRENT_DATE, 730);

#Overwrite completo

-- Reemplazar todos los datos de una partición
INSERT OVERWRITE iceberg.analytics.ventas_diarias
PARTITION (snapshot_date = DATE '2026-03-14')
SELECT customer_id, monto, fecha, DATE '2026-03-14' as snapshot_date
FROM iceberg.staging.ventas_nuevas;

#Time-Travel y Versionado

#Consultar snapshot histórico

-- Consultar tabla como estaba hace 1 día
SELECT * FROM iceberg.analytics.clientes_snapshot
FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP - INTERVAL '1' DAY;

-- Consultar por timestamp específico
SELECT * FROM iceberg.analytics.clientes_snapshot
FOR SYSTEM_TIME AS OF TIMESTAMP '2026-03-13 14:30:00 UTC';

-- Listar todos los snapshots disponibles
SELECT snapshot_id, committed_at, operation
FROM iceberg.analytics."clientes_snapshot$snapshots";

#Rollback a versión anterior

-- Revertir tabla a snapshot anterior (operación admin)
-- Usar Spark o herramientas nativas de Iceberg

#Schema Evolution

#Agregar columnas sin afectar datos existentes

-- Agregar columna nueva (nullable por defecto)
ALTER TABLE iceberg.analytics.clientes_snapshot
ADD COLUMN email VARCHAR;

-- Agregar columna con valor por defecto
ALTER TABLE iceberg.analytics.clientes_snapshot
ADD COLUMN fecha_creacion TIMESTAMP DEFAULT CURRENT_TIMESTAMP;

#Renombrar y reordenar columnas

-- Renombrar columna
ALTER TABLE iceberg.analytics.clientes_snapshot
RENAME COLUMN nombre TO customer_name;

-- Cambiar orden de columnas (se preservan datos)
ALTER TABLE iceberg.analytics.clientes_snapshot
REORDER COLUMNS (customer_id, customer_name, email, segmento);

#Cambiar tipos de datos

-- Expandir tipo (VARCHAR → TEXT, INT → BIGINT)
-- Iceberg permite ciertos cambios seguros
ALTER TABLE iceberg.analytics.clientes_snapshot
ALTER COLUMN monto SET DATA TYPE DECIMAL(18,2);

#Mantenimiento y Compactación

#Optimizar tablas (compactación)

-- Consolidar archivos pequeños (Iceberg nativo)
-- En Trino: usar OPTIMIZE
ALTER TABLE iceberg.analytics.ventas_diarias EXECUTE OPTIMIZE;

-- Especificar tamaño mínimo de archivos
ALTER TABLE iceberg.analytics.ventas_diarias EXECUTE OPTIMIZE
WHERE snapshot_date = CAST(CURRENT_DATE - INTERVAL '1' DAY AS DATE)
USING WRITE_DISTRIBUTION = (PARTITION_ID, BUCKET(10, customer_id));

#Gestión de archivos

-- Listar archivos físicos de una tabla
SELECT file_path, file_size_in_bytes, record_count
FROM iceberg.analytics."clientes_snapshot$files";

-- Estadísticas de particiones
SELECT partition, file_count, record_count, total_size_in_bytes
FROM iceberg.analytics."clientes_snapshot$partitions";

#Limpiar snapshots antiguos (garbage collection)

-- Remover snapshots más antiguos de 30 días
-- Requiere operación admin (CLI/Spark)
# spark-submit --class org.apache.iceberg.spark.procedures.RemoveExpiredSnapshotsProcedure ...

#Integración con el Stack

#Consultar desde Trino con catálogo Iceberg

-- Cross-catalog query (Iceberg + Hive)
SELECT
    i.customer_id,
    i.segmento,
    h.saldo_actual
FROM iceberg.analytics.clientes_snapshot i
JOIN hive.default.saldos h
    ON i.customer_id = h.customer_id;

#Escribir desde Spark (ETL jobs)

# Job Spark que escribe tablas Iceberg
spark.sql("""
    INSERT INTO iceberg.analytics.ventas_diarias
    SELECT
        transaction_id,
        customer_id,
        monto,
        fecha,
        CAST(fecha AS DATE) as snapshot_date
    FROM iceberg.staging.transacciones_raw
    WHERE processed_at > DATE_SUB(CURRENT_DATE, 1)
""")

#Mejores Prácticas

Práctica Descripción
Particionamiento Particionar por snapshot_date o fecha naturalizada. Evitar exceso de particiones (max 1000).
Schema versioning Usar schema evolution para agregar columnas. Nunca eliminar sin deprecación previa.
Naming Tablas en formato <area>_<entidad>_<tipo>, ej. comercial_clientes_snapshot, telco_cdr_incremental.
Compactación Ejecutar OPTIMIZE semanal en tablas high-write o trim snapshots cada 30 días.
Time-travel Documentar snapshots importantes para auditoría. Consultar histórico antes de eliminar datos.
Acceso Usar Trino para lectura, Spark para escritura y transformaciones pesadas.

#Also see