DataSkills Hub

Feast

Feast es el Feature Store open-source para ML. Gestiona y sirve features de forma consistente entre entrenamiento e inferencia, materializando features offline en object storage y online en Valkey/Redis, con integración con Kubeflow Pipelines, Trino y JupyterHub.

#Getting Started

#Acceder a Feast

Interfaz web de Feast Registry

<tu-url>

Explorar feature views, entities y almacenamientos registrados.

#Arquitectura de Feature Store

Componente Tecnología Descripción
Registry Git + API Metadatos de features (YAML)
Offline Ceph (S3) Historical features para training
Online Valkey Real-time features para inference
Compute Spark/Trino Materialización y cálculo de features

#Verificar conexión

# SSH al nodo de Feast
ssh admin@<tu-url>

# Ver estado de repositorio
feast feature-store list

# Ver ultimo materialize job
feast feature-store materialize-status

#Conceptos clave

#Entity (Entidad)

Definición de lo que queremos darle features (ej: usuario, transacción):

# feature_repo/entities.yaml
entities:
  - name: user
    description: 'Usuario de la plataforma'
    valueType: INT64

  - name: transaction
    description: 'Transacción financiera'
    valueType: INT64

#Feature View

Colección de features relacionadas con una entity:

# feature_repo/feature_views.yaml
apiVersion: feast.dev/v1
kind: FeatureView
metadata:
  name: user_features
  entities:
    - user
  ttl: 86400s # 24 horas
sources:
  - name: user_source
    database: hive
    table: analytics.usuarios
    timestamp_field: fecha_actualizacion
features:
  - name: total_spend
    valueType: DOUBLE
  - name: days_since_signup
    valueType: INT32
  - name: monthly_churn_score
    valueType: DOUBLE

#Data Source

Conexión a la fuente donde viven los datos:

# feature_repo/data_sources.yaml
sources:
  - name: user_source
    type: trino # o spark, postgres, etc.
    config:
      host: <tu-url>
      port: <puerto>
      database: hive
      warehouse: analytics

#Definir Feature Store

#Estructura de repositorio

feast-repo/
├── feature_store.yaml       # Config principal
├── entities.yaml            # Definición de entidades
├── data_sources.yaml        # Fuentes de datos
├── feature_views.yaml       # Feature views
└── materialize_config.yaml  # Configuración de materialización

#Archivo feature_store.yaml

project: my_ml
registry: s3://ml-bucket/feast/registry.db
provider: local

offline_store:
  type: file
  data_source_dir: s3://models-bucket/feast_offline

online_store:
  type: redis
  connection_string: <tu-url>

spark_config:
  spark_master: <spark-master-url>
  spark_home: /opt/spark

#Feature View con transformación SQL

Para features calculadas más complejas:

apiVersion: feast.dev/v1
kind: FeatureView
metadata:
  name: user_lifetime_value
  entities:
    - user
sources:
  - name: user_transactions
    type: trino
    query: |
      SELECT
        user_id,
        SUM(amount) as total_revenue,
        COUNT(*) as transaction_count,
        MAX(transaction_date) as last_transaction,
        CURRENT_TIMESTAMP as feast_created_ts
      FROM hive.analytics.transactions
      WHERE transaction_date >= '2025-01-01'
      GROUP BY user_id
features:
  - name: total_revenue
    valueType: DOUBLE
  - name: transaction_count
    valueType: INT32
  - name: last_transaction_days_ago
    valueType: INT32

#Materializar Features

#Materialización offline → Ceph

Preparar datos históricos para training:

# Desde CLI
feast -c s3://ml-bucket/feast \
  materialize 2025-01-01 2026-03-14

# Esto guarda features en Ceph para todo el rango de fechas

Ubicación de archivos materializados:

s3://models-bucket/feast_offline/
  ├── user_features/
  │   ├── 2025-01-01/
  │   │   ├── data.parquet
  │   │   └── _metadata
  │   └── ...
  └── user_lifetime_value/
      └── ...

#Materialización online → Valkey

Sincronizar features más recientes a Valkey para inference:

# Materializar últimos N días a online store
feast -c s3://ml-bucket/feast materialize-online \
  --end-date 2026-03-14

Esto sincroniza features en Valkey para acceso ultra-rápido durante prediction.

#Programar materialización automática

Crear CronJob en Kubernetes:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: feast-daily-materialize
  namespace: ml
spec:
  schedule: '02 02 * * *' # 2:02 AM UTC cada día
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: feast-materialize
              image: feast-materialize:latest
              command:
                - /bin/bash
                - -c
                - |
                  feast -c s3://ml-bucket/feast materialize \
                    $(date -d yesterday +%Y-%m-%d) $(date +%Y-%m-%d)
          restartPolicy: OnFailure

#Recuperar Features para Training

#Get Historical Features (Offline)

Obtener features para entrenar modelos:

from feast import FeatureStore
import pandas as pd

# Conectar a Feast
fs = FeatureStore(repo_path='s3://ml-bucket/feast')

# Tabla de entidades con timestamps
entity_df = pd.DataFrame({
    'user_id': [123, 456, 789],
    'event_timestamp': pd.to_datetime(['2026-01-15', '2026-02-10', '2026-03-05'])
})

# Obtener features históricos
training_df = fs.get_historical_features(
    features=[
        'user_features:total_spend',
        'user_features:days_since_signup',
        'user_lifetime_value:total_revenue',
        'user_lifetime_value:transaction_count'
    ],
    entity_rows=entity_df
).to_df()

# Guardar para entrenar
training_df.to_parquet('gs://ml-bucket/training_data_v1.parquet')

#Feature View con múltiples entities

# Para features que cruzan entidades
feature_refs = [
    'user_features:total_spend',
    'user_features:days_since_signup',
    'transaction_features:avg_transaction_amount',  # features de otra entity
]

training_df = fs.get_historical_features(
    features=feature_refs,
    entity_rows=entity_df  # Debe tener user_id y transaction_id
).to_df()

#Recuperar Features para Inference

#Get Online Features (Realtime)

Obtener features más recientes para prediction:

from feast import FeatureStore

fs = FeatureStore(repo_path='s3://ml-bucket/feast')

# Entity para el cual queremos features
entity_dict = {
    'user_id': [12345, 67890]  # Puede ser batch
}

# Obtener features online (de Valkey)
online_features = fs.get_online_features(
    features=[
        'user_features:total_spend',
        'user_features:days_since_signup',
        'user_lifetime_value:transaction_count'
    ],
    entity_rows=[entity_dict]
)

# Convertir a formato para modelo
feature_vector = dict(zip(
    online_features.feature_names,
    online_features.to_dict()['results'][0].values()
))

print(feature_vector)
# Output: {'total_spend': 5000.0, 'days_since_signup': 180, 'transaction_count': 42}

#Integración con Kubeflow Pipelines

#Componente de Feast en Pipeline

from feast import FeatureStore
import kfp
from kfp import dsl

@dsl.component
def get_features_for_training() -> str:
    """Obtener features históricos de Feast para training"""
    from feast import FeatureStore
    import pandas as pd

    fs = FeatureStore(repo_path='s3://ml-bucket/feast')

    # Entidades para training
    entity_df = pd.DataFrame({
        'user_id': range(1, 10001),
        'event_timestamp': pd.date_range('2025-01-01', periods=10000, freq='1D')
    })

    # Obtener features
    training_df = fs.get_historical_features(
        features=[
            'user_features:total_spend',
            'user_features:days_since_signup',
            'user_lifetime_value:total_revenue'
        ],
        entity_rows=entity_df
    ).to_df()

    # Guardar
    output_path = 's3://ml-bucket/training_features.parquet'
    training_df.to_parquet(output_path)

    return output_path

@dsl.pipeline(name='ml-with-feast')
def ml_pipeline():
    features_task = get_features_for_training()
    # ... resto del pipeline de entrenamiento

#Monitoreo de Features

#Feature Stats y Data Quality

Detectar drift o cambios inesperados en features:

# Ver estadísticas de features materializados
feast -c s3://ml-bucket/feast feature-stats

# Ejemplo output:
# user_features:total_spend
#   min: 100.0, max: 999999.0, mean: 52000.0, std: 180000.0
#   updated: 2026-03-14 02:15:00

#Configurar alertas de drift

En Prometheus / Monitoring:

# Query: detectar si media de feature cambió > 10% vs promedio histórico
(avg(feature_total_spend) - avg(feature_total_spend offset 30d))
  / avg(feature_total_spend offset 30d) > 0.1

#Mejores Prácticas

Práctica Descripción
Versionado de features Cambios en feature view → versionar YAML en git
Documentation Cada feature view debe tener descripción clara
TTL apropiado Balancear entre frescura y costo de materialización
Namespacing en Valkey Prefijo de keys: <proyecto>_<modelo>_<feature>
Backups offline Mantener snapshots diarios en Ceph de estado Valkey
Monitoreo de fill rate Alertar si > 5% de features están NULL
Separación train/online Features entrenado con versión X, servir con X

#Troubleshooting

#Problemas comunes

Problema Causa Solución
"Feature not found" Feature view no materializado Ejecutar feast materialize
"Valkey connection timeout" Valkey caído o unreachable Verificar kubectl get pods -n ml
"Spark job failed" Datos malformados en fuente Revisar logs en Spark UI
"NULL values in feature" Entidad no existe en fuente Verificar entity_rows en query
"Materialize too slow" Dataset muy grande Particionar por fecha en query SQL