Data Team MICApache Iceberg es un formato de tabla abierto para almacenamiento distribuido. Proporciona capacidades avanzadas de versionado, time-travel, y schema evolution para datasets analíticos masivos almacenados en Ceph/S3. Las tablas Iceberg se consultan vía Trino y se escriben por Spark jobs.
Conectarse vía Trino al catálogo Iceberg
# Configurar conexión a Trino
$ trino --server <tu-url> \
--user <tu_usuario> \
--catalog iceberg
Verificar tablas disponibles
-- Ver todos los schemas en el catálogo Iceberg
SHOW SCHEMAS FROM iceberg;
-- Ver tablas en un schema específico
SHOW TABLES FROM iceberg.analytics;
-- Ver detalles de tabla (metadatos y particiones)
DESCRIBE iceberg.analytics.ventas_diarias;
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("IcebergApp") \
.config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.iceberg.type", "hive") \
.config("spark.sql.catalog.iceberg.uri", "<metastore-url>") \
.getOrCreate()
# Consultar tabla Iceberg
df = spark.sql("SELECT * FROM iceberg.analytics.ventas_diarias LIMIT 10")
-- Tabla Iceberg particionada por año, mes, día
CREATE TABLE iceberg.analytics.clientes_snapshot (
customer_id BIGINT,
nombre VARCHAR,
segmento VARCHAR,
fecha_registro DATE,
snapshot_date DATE
)
WITH (
partitioned_by = ARRAY['snapshot_date'],
format = 'PARQUET'
);
-- Crear tabla Iceberg a partir de datos en Ceph
CREATE TABLE iceberg.analytics.transacciones_2026 AS
SELECT
transaction_id,
customer_id,
monto,
fecha,
CAST(DATE_TRUNC('month', fecha) AS DATE) AS snapshot_date
FROM iceberg.raw.transacciones_raw
WHERE year(fecha) = 2026;
-- Tabla con particionamiento por múltiples columnas
CREATE TABLE iceberg.analytics.kpi_diario (
region VARCHAR,
producto VARCHAR,
kpi_valor DOUBLE,
fecha DATE
)
PARTITIONED BY (
MONTH(fecha), -- Partición mensual
region -- Partición por región
);
-- Insertar registros nuevos
INSERT INTO iceberg.analytics.clientes_snapshot
SELECT customer_id, nombre, segmento, fecha_registro, CURRENT_DATE
FROM iceberg.staging.clientes_delta
WHERE updated_at > DATE_SUB(CURRENT_DATE, 1);
-- Actualizar registros (soporte ACID nativo)
UPDATE iceberg.analytics.clientes_snapshot
SET segmento = 'VIP'
WHERE customer_id IN (1001, 1002, 1003);
-- Eliminar registros
DELETE FROM iceberg.analytics.clientes_snapshot
WHERE fecha_registro < DATE_SUB(CURRENT_DATE, 730);
-- Reemplazar todos los datos de una partición
INSERT OVERWRITE iceberg.analytics.ventas_diarias
PARTITION (snapshot_date = DATE '2026-03-14')
SELECT customer_id, monto, fecha, DATE '2026-03-14' as snapshot_date
FROM iceberg.staging.ventas_nuevas;
-- Consultar tabla como estaba hace 1 día
SELECT * FROM iceberg.analytics.clientes_snapshot
FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP - INTERVAL '1' DAY;
-- Consultar por timestamp específico
SELECT * FROM iceberg.analytics.clientes_snapshot
FOR SYSTEM_TIME AS OF TIMESTAMP '2026-03-13 14:30:00 UTC';
-- Listar todos los snapshots disponibles
SELECT snapshot_id, committed_at, operation
FROM iceberg.analytics."clientes_snapshot$snapshots";
-- Revertir tabla a snapshot anterior (operación admin)
-- Usar Spark o herramientas nativas de Iceberg
-- Agregar columna nueva (nullable por defecto)
ALTER TABLE iceberg.analytics.clientes_snapshot
ADD COLUMN email VARCHAR;
-- Agregar columna con valor por defecto
ALTER TABLE iceberg.analytics.clientes_snapshot
ADD COLUMN fecha_creacion TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
-- Renombrar columna
ALTER TABLE iceberg.analytics.clientes_snapshot
RENAME COLUMN nombre TO customer_name;
-- Cambiar orden de columnas (se preservan datos)
ALTER TABLE iceberg.analytics.clientes_snapshot
REORDER COLUMNS (customer_id, customer_name, email, segmento);
-- Expandir tipo (VARCHAR → TEXT, INT → BIGINT)
-- Iceberg permite ciertos cambios seguros
ALTER TABLE iceberg.analytics.clientes_snapshot
ALTER COLUMN monto SET DATA TYPE DECIMAL(18,2);
-- Consolidar archivos pequeños (Iceberg nativo)
-- En Trino: usar OPTIMIZE
ALTER TABLE iceberg.analytics.ventas_diarias EXECUTE OPTIMIZE;
-- Especificar tamaño mínimo de archivos
ALTER TABLE iceberg.analytics.ventas_diarias EXECUTE OPTIMIZE
WHERE snapshot_date = CAST(CURRENT_DATE - INTERVAL '1' DAY AS DATE)
USING WRITE_DISTRIBUTION = (PARTITION_ID, BUCKET(10, customer_id));
-- Listar archivos físicos de una tabla
SELECT file_path, file_size_in_bytes, record_count
FROM iceberg.analytics."clientes_snapshot$files";
-- Estadísticas de particiones
SELECT partition, file_count, record_count, total_size_in_bytes
FROM iceberg.analytics."clientes_snapshot$partitions";
-- Remover snapshots más antiguos de 30 días
-- Requiere operación admin (CLI/Spark)
# spark-submit --class org.apache.iceberg.spark.procedures.RemoveExpiredSnapshotsProcedure ...
-- Cross-catalog query (Iceberg + Hive)
SELECT
i.customer_id,
i.segmento,
h.saldo_actual
FROM iceberg.analytics.clientes_snapshot i
JOIN hive.default.saldos h
ON i.customer_id = h.customer_id;
# Job Spark que escribe tablas Iceberg
spark.sql("""
INSERT INTO iceberg.analytics.ventas_diarias
SELECT
transaction_id,
customer_id,
monto,
fecha,
CAST(fecha AS DATE) as snapshot_date
FROM iceberg.staging.transacciones_raw
WHERE processed_at > DATE_SUB(CURRENT_DATE, 1)
""")
| Práctica | Descripción |
|---|---|
| Particionamiento | Particionar por snapshot_date o fecha naturalizada. Evitar exceso de particiones (max 1000). |
| Schema versioning | Usar schema evolution para agregar columnas. Nunca eliminar sin deprecación previa. |
| Naming | Tablas en formato <area>_<entidad>_<tipo>, ej. comercial_clientes_snapshot, telco_cdr_incremental. |
| Compactación | Ejecutar OPTIMIZE semanal en tablas high-write o trim snapshots cada 30 días. |
| Time-travel | Documentar snapshots importantes para auditoría. Consultar histórico antes de eliminar datos. |
| Acceso | Usar Trino para lectura, Spark para escritura y transformaciones pesadas. |