Feast is the open-source Feature Store for ML. It manages and serves features consistently between training and inference, materializing features offline to object storage and online to Valkey/Redis, with integrations for Kubeflow Pipelines, Trino, and JupyterHub.
Feast Registry web interface
<your-url>
Browse registered feature views, entities, and stores.
| Component | Technology | Description |
|---|---|---|
| Registry | Git + API | Feature metadata (YAML) |
| Offline | Ceph (S3) | Historical features for training |
| Online | Valkey | Real-time features for inference |
| Compute | Spark/Trino | Feature materialization and computation |
# SSH to the Feast node
ssh admin@<your-url>
# List the feature views registered in the repo
feast feature-views list
# Dump registry metadata, including the latest materialized intervals
feast registry-dump
Definition of the things we attach features to (e.g., user, transaction):
# feature_repo/entities.yaml
entities:
  - name: user
    description: 'Platform user'
    valueType: INT64
  - name: transaction
    description: 'Financial transaction'
    valueType: INT64
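In current Feast releases, entities are usually declared through the Python SDK rather than raw YAML; a minimal equivalent sketch, where the join keys user_id and transaction_id are assumptions:

from feast import Entity

# Join keys are assumed; use the real ID columns from the source tables
user = Entity(name='user', join_keys=['user_id'], description='Platform user')
transaction = Entity(name='transaction', join_keys=['transaction_id'], description='Financial transaction')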
A collection of features tied to an entity:
# feature_repo/feature_views.yaml
apiVersion: feast.dev/v1
kind: FeatureView
metadata:
  name: user_features
entities:
  - user
ttl: 86400s # 24 hours
sources:
  - name: user_source
    database: hive
    table: analytics.usuarios
    timestamp_field: fecha_actualizacion
features:
  - name: total_spend
    valueType: DOUBLE
  - name: days_since_signup
    valueType: INT32
  - name: monthly_churn_score
    valueType: DOUBLE
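For reference, a hedged Python SDK sketch of the same view; it assumes the user entity and a user_source data source object are defined elsewhere in the repo:

from datetime import timedelta
from feast import FeatureView, Field
from feast.types import Float64, Int32

user_features = FeatureView(
    name='user_features',
    entities=[user],          # Entity object, as in the sketch above
    ttl=timedelta(hours=24),  # matches the 86400s TTL in the YAML
    schema=[
        Field(name='total_spend', dtype=Float64),
        Field(name='days_since_signup', dtype=Int32),
        Field(name='monthly_churn_score', dtype=Float64),
    ],
    source=user_source,       # data source object, assumed defined alongside
)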
Connection to the source where the data lives:
# feature_repo/data_sources.yaml
sources:
  - name: user_source
    type: trino # or spark, postgres, etc.
    config:
      host: <your-url>
      port: <port>
      database: hive
      warehouse: analytics
feast-repo/
├── feature_store.yaml # Main config
├── entities.yaml # Entity definitions
├── data_sources.yaml # Data sources
├── feature_views.yaml # Feature views
└── materialize_config.yaml # Materialization config
# feature_store.yaml
project: my_ml
registry: s3://ml-bucket/feast/registry.db
provider: local
offline_store:
  type: file
  data_source_dir: s3://models-bucket/feast_offline
online_store:
  type: redis
  connection_string: <your-url>
spark_config:
  spark_master: <spark-master-url>
  spark_home: /opt/spark
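A quick way to check that this config loads and the registry is reachable, as a minimal sketch (it assumes the repo is checked out locally at ./feast-repo):

from feast import FeatureStore

# Load feature_store.yaml and list what the registry knows about
fs = FeatureStore(repo_path='feast-repo')
for fv in fs.list_feature_views():
    print(fv.name, [f.name for f in fv.features])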
For more complex computed features:
apiVersion: feast.dev/v1
kind: FeatureView
metadata:
  name: user_lifetime_value
entities:
  - user
sources:
  - name: user_transactions
    type: trino
    query: |
      SELECT
        user_id,
        SUM(amount) as total_revenue,
        COUNT(*) as transaction_count,
        MAX(transaction_date) as last_transaction,
        CURRENT_TIMESTAMP as feast_created_ts
      FROM hive.analytics.transactions
      WHERE transaction_date >= '2025-01-01'
      GROUP BY user_id
features:
  - name: total_revenue
    valueType: DOUBLE
  - name: transaction_count
    valueType: INT32
  - name: last_transaction_days_ago
    valueType: INT32
Prepare historical data for training:
# From the CLI
feast -c s3://ml-bucket/feast \
  materialize 2025-01-01 2026-03-14
# This writes features to Ceph for the full date range
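The same step from Python, as a sketch under the same local-checkout assumption as above:

from datetime import datetime
from feast import FeatureStore

# Equivalent to the CLI call above: backfill the full date range
fs = FeatureStore(repo_path='feast-repo')
fs.materialize(
    start_date=datetime(2025, 1, 1),
    end_date=datetime(2026, 3, 14),
)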
Location of the materialized files:
s3://models-bucket/feast_offline/
├── user_features/
│ ├── 2025-01-01/
│ │ ├── data.parquet
│ │ └── _metadata
│ └── ...
└── user_lifetime_value/
└── ...
Sync the most recent features to Valkey for inference:
# Materialize everything since the last run, up to the end date, into the online store
feast -c s3://ml-bucket/feast materialize-incremental \
  2026-03-14
This syncs features into Valkey for low-latency access at prediction time.
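From Python, materialize_incremental picks up from the last materialized timestamp; a sketch under the same assumptions as the previous one:

from datetime import datetime
from feast import FeatureStore

# Materialize everything newer than the last run, up to end_date
fs = FeatureStore(repo_path='feast-repo')
fs.materialize_incremental(end_date=datetime(2026, 3, 14))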
Create a Kubernetes CronJob:
apiVersion: batch/v1
kind: CronJob
metadata:
  name: feast-daily-materialize
  namespace: ml
spec:
  schedule: '02 02 * * *' # 2:02 AM UTC every day
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: feast-materialize
              image: feast-materialize:latest
              command:
                - /bin/bash
                - -c
                - |
                  feast -c s3://ml-bucket/feast materialize \
                    $(date -d yesterday +%Y-%m-%d) $(date +%Y-%m-%d)
          restartPolicy: OnFailure
Fetch features to train models:
from feast import FeatureStore
import pandas as pd
# Connect to Feast
fs = FeatureStore(repo_path='s3://ml-bucket/feast')
# Entity table with timestamps
entity_df = pd.DataFrame({
    'user_id': [123, 456, 789],
    'event_timestamp': pd.to_datetime(['2026-01-15', '2026-02-10', '2026-03-05'])
})
# Fetch historical features
training_df = fs.get_historical_features(
    features=[
        'user_features:total_spend',
        'user_features:days_since_signup',
        'user_lifetime_value:total_revenue',
        'user_lifetime_value:transaction_count'
    ],
    entity_df=entity_df
).to_df()
# Save for training
training_df.to_parquet('s3://ml-bucket/training_data_v1.parquet')
# For features spanning multiple entities
feature_refs = [
    'user_features:total_spend',
    'user_features:days_since_signup',
    'transaction_features:avg_transaction_amount', # features from another entity
]
training_df = fs.get_historical_features(
    features=feature_refs,
    entity_df=entity_df # must include both user_id and transaction_id
).to_df()
Fetch the latest features for prediction:
from feast import FeatureStore
fs = FeatureStore(repo_path='s3://ml-bucket/feast')
# Entities we want features for (one dict per entity; batches are supported)
entity_rows = [
    {'user_id': 12345},
    {'user_id': 67890},
]
# Fetch online features (from Valkey)
online_features = fs.get_online_features(
    features=[
        'user_features:total_spend',
        'user_features:days_since_signup',
        'user_lifetime_value:transaction_count'
    ],
    entity_rows=entity_rows
)
# to_dict() maps each feature name to a list of values, one per entity row;
# build the feature vector for the first entity
result = online_features.to_dict()
feature_vector = {name: values[0] for name, values in result.items()}
print(feature_vector)
# Output: {'user_id': 12345, 'total_spend': 5000.0, 'days_since_signup': 180, 'transaction_count': 42}
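To guarantee training and serving use the exact same feature set (see the train/online separation practice below), Feast's FeatureService can bundle views under one versioned name; a sketch assuming the user_features and user_lifetime_value view objects from this repo:

from feast import FeatureService

# One versioned bundle used by both training and online retrieval
user_model_v1 = FeatureService(
    name='user_model_v1',
    features=[user_features, user_lifetime_value],
)

Both get_historical_features and get_online_features accept features=fs.get_feature_service('user_model_v1'), so the training and serving feature sets stay in lockstep.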
from kfp import dsl
@dsl.component
def get_features_for_training() -> str:
    """Fetch historical features from Feast for training"""
    # Imports live inside the component so they run in its container
    from feast import FeatureStore
    import pandas as pd
    fs = FeatureStore(repo_path='s3://ml-bucket/feast')
    # Entities for training
    entity_df = pd.DataFrame({
        'user_id': range(1, 10001),
        'event_timestamp': pd.date_range('2025-01-01', periods=10000, freq='1D')
    })
    # Fetch features
    training_df = fs.get_historical_features(
        features=[
            'user_features:total_spend',
            'user_features:days_since_signup',
            'user_lifetime_value:total_revenue'
        ],
        entity_df=entity_df
    ).to_df()
    # Save
    output_path = 's3://ml-bucket/training_features.parquet'
    training_df.to_parquet(output_path)
    return output_path

@dsl.pipeline(name='ml-with-feast')
def ml_pipeline():
    features_task = get_features_for_training()
    # ... rest of the training pipeline
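To compile the pipeline for submission to Kubeflow Pipelines, a minimal sketch with the KFP v2 compiler (the output filename is arbitrary):

from kfp import compiler

# Emits a pipeline spec that the KFP UI or client can run
compiler.Compiler().compile(ml_pipeline, 'ml_with_feast.yaml')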
Detect drift or unexpected changes in features:
# View statistics for materialized features
feast -c s3://ml-bucket/feast feature-stats
# Example output:
# user_features:total_spend
#   min: 100.0, max: 999999.0, mean: 52000.0, std: 180000.0
#   updated: 2026-03-14 02:15:00
In Prometheus / Monitoring:
# Query: detect whether a feature's mean shifted by > 10% vs 30 days ago
(avg(feature_total_spend) - avg(feature_total_spend offset 30d))
  / avg(feature_total_spend offset 30d) > 0.1
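The same check can also run offline against the materialized snapshots; a hypothetical sketch (the parquet paths and the 30-day baseline date are assumptions, and reading s3:// paths requires s3fs):

import pandas as pd

# Compare today's snapshot against a 30-day-old baseline (paths are assumed)
current = pd.read_parquet('s3://models-bucket/feast_offline/user_features/2026-03-14/data.parquet')
baseline = pd.read_parquet('s3://models-bucket/feast_offline/user_features/2026-02-12/data.parquet')
shift = abs(current['total_spend'].mean() - baseline['total_spend'].mean()) / baseline['total_spend'].mean()
if shift > 0.10:
    print(f'ALERT: total_spend mean shifted {shift:.1%} vs the 30-day baseline')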
| Practice | Description |
|---|---|
| Feature versioning | Changes to a feature view → version the YAML in git |
| Documentation | Every feature view must have a clear description |
| Appropriate TTL | Balance freshness against materialization cost |
| Namespacing in Valkey | Key prefix: <project>_<model>_<feature> |
| Offline backups | Keep daily snapshots of the Valkey state in Ceph |
| Fill-rate monitoring | Alert when > 5% of feature values are NULL (see the sketch after this table) |
| Train/online separation | A model trained with feature version X is served with version X |
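A hypothetical fill-rate check against a materialized snapshot (the path and column names are assumptions based on the examples above):

import pandas as pd

# Flag features whose NULL rate exceeds the 5% threshold from the table above
df = pd.read_parquet('s3://models-bucket/feast_offline/user_features/2026-03-14/data.parquet')
for column in ('total_spend', 'days_since_signup', 'monthly_churn_score'):
    null_rate = df[column].isna().mean()
    if null_rate > 0.05:
        print(f'ALERT: {column} is {null_rate:.1%} NULL')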
| Problem | Cause | Solution |
|---|---|---|
| "Feature not found" | Feature view not materialized | Run feast materialize |
| "Valkey connection timeout" | Valkey down or unreachable | Check kubectl get pods -n ml |
| "Spark job failed" | Malformed data at the source | Check the logs in the Spark UI |
| "NULL values in feature" | Entity does not exist in the source | Check entity_rows in the query |
| "Materialize too slow" | Dataset too large | Partition by date in the SQL query |