DataSkills Hub

Kubeflow

Kubeflow es la plataforma open-source de Machine Learning sobre Kubernetes. Permite entrenar, desplegar y gestionar modelos de ML a escala usando Kubeflow Pipelines para orquestar workflows, KFServing para servir modelos en producción, e integración con JupyterHub, Feast y object storage.

#Getting Started

#Acceder a Kubeflow

Interfaz web de Kubeflow

<tu-url>

Autenticarse con SSO corporativo. El namespace de usuario se crea automáticamente.

#Componentes principales

Componente Descripción
Pipelines Orquestación de workflows de ML
Notebooks JupyterHub integrado para desarrollo exploratorio
Models Registro y versionado de modelos entrenados
KFServing Deployment de modelos para inference en tiempo real
Training Operadores para entrenamientos distribuidos
Monitoring Métricas y health de modelos en producción

#Verificar cluster (Health Check)

# SSH a nodo del cluster
ssh admin@<tu-url>

# Ver componentes de Kubeflow
kubectl get pods -n kubeflow
kubectl get svc -n kubeflow | grep kubeflow-ui

# Ver status de tu namespace
kubectl get pods -n <tu-namespace>

#Kubeflow Pipelines

#Crear pipeline básico con Python SDK

Instalar SDK

pip install kfp==1.8.0

Ejemplo: Pipeline de clasificación simple

import kfp
from kfp import dsl

# Definir componentes (steps) del pipeline
@dsl.component
def load_data(output_path: str):
    import pandas as pd
    data = pd.read_csv('gs://ml-bucket/datos/train.csv')
    data.to_csv(output_path, index=False)

@dsl.component
def train_model(data_path: str, model_path: str):
    from sklearn.ensemble import RandomForestClassifier
    import pandas as pd
    import joblib

    data = pd.read_csv(data_path)
    X, y = data.drop('target', axis=1), data['target']

    model = RandomForestClassifier(n_estimators=100)
    model.fit(X, y)
    joblib.dump(model, model_path)

@dsl.component
def evaluate_model(model_path: str) -> str:
    import joblib
    model = joblib.load(model_path)
    accuracy = model.score(X_test, y_test)
    return f"Accuracy: {accuracy}"

# Definir pipeline
@dsl.pipeline(name='classification-pipeline')
def my_pipeline():
    data_task = load_data(output_path='/tmp/data.csv')
    train_task = train_model(
        data_path=data_task.outputs['output_path'],
        model_path='/tmp/model.pkl'
    )
    eval_task = evaluate_model(
        model_path=train_task.outputs['model_path']
    )

#Compilar y ejecutar pipeline

# Compilar a YAML
kfp.compiler.Compiler().compile(
    my_pipeline,
    'classification_pipeline.yaml'
)

# Ejecutar en Kubeflow UI o vía CLI
kfp run submit \
  --experiment-name my-experiments \
  --run-name run-20260314 \
  classification_pipeline.yaml

#Acceder a resultados

En Kubeflow UI:

  1. Pipelines → seleccionar pipeline
  2. Ver DAG de ejecución con status de cada componente
  3. Logs, artefactos y métricas en Artifacts tab

#Jupyter Notebooks integrados

#Crear Notebook en JupyterHub

  1. En Kubeflow UI → Notebooks
  2. New Server
  3. Configurar:
Name:      my-notebook
Namespace: <tu-namespace>
Image:     kubeflownotebookswg/jupyter-scipy:latest
CPU:       2
Memory:    4Gi
Storage:   20Gi (PVC)
  1. Create → Esperar 2-3 min a que inicie el servidor
  2. Connect → Se abre Jupyter

#Notebook de entrenamiento con acceso a Feast

# Importar cliente de Feast
from feast import FeatureStore

# Conectar a Feast registry
fs = FeatureStore(repo_path='gs://ml-bucket/feast')

# Obtener features para training
feature_views = fs.get_feature_view('user_features')
training_df = fs.get_historical_features(
    features=[
        'user_features:user_id',
        'user_features:total_spend',
        'user_features:days_since_signup'
    ],
    entity_rows=entity_df
).to_df()

# Entrenar modelo
from sklearn.ensemble import GradientBoostingClassifier
X = training_df.drop(['target'], axis=1)
y = training_df['target']

model = GradientBoostingClassifier(n_estimators=200)
model.fit(X, y)

# Guardar en Ceph
import joblib
joblib.dump(model, 's3://models-bucket/churn-model-v1.pkl')

#Entrenamiento distribuido

#Training Job en Kubernetes

Para entrenamientos de larga duración, usar PyTorchJob o TFJob:

apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: training-job
  namespace: ml
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
            - name: pytorch
              image: pytorch/pytorch:2.0-cuda11.8-cudnn8-runtime
              command: ['python', 'train.py']
              volumeMounts:
                - name: training-code
                  mountPath: /app
          volumes:
            - name: training-code
              emptyDir: {}
    Worker:
      replicas: 3
      template:
        spec:
          containers:
            - name: pytorch
              image: pytorch/pytorch:2.0-cuda11.8-cudnn8-runtime
              command: ['python', 'train.py']

Aplicar:

kubectl apply -f training-job.yaml
kubectl logs -f pod/<master-pod-name> -n ml

#KFServing: Desplegar modelos en producción

#Crear InferenceService

Exponer modelo para inference en tiempo real:

apiVersion: serving.kubeflow.org/v1beta1
kind: InferenceService
metadata:
  name: churn-prediction-model
  namespace: ml
spec:
  predictor:
    sklearn:
      storageUri: s3://models-bucket/churn-model-v1.pkl
      resources:
        requests:
          cpu: '1'
          memory: '2Gi'
      minReplicas: 2
      maxReplicas: 10
      scaleTarget: 70 # scale a 70% CPU
  canary:
    trafficPercent: 10 # Canary deployment: 10% tráfico a v2
    sklearn:
      storageUri: s3://models-bucket/churn-model-v2.pkl

Aplicar y verificar:

kubectl apply -f inference-service.yaml
kubectl get inferenceservices -n ml

#Invocar modelo via API REST

El servicio expone un endpoint:

<tu-url>/v1/models/churn-prediction-model:predict

Request:

{
  "instances": [
    {
      "user_id": 12345,
      "total_spend": 5000.0,
      "days_since_signup": 180,
      "monthly_usage": 250
    }
  ]
}

Response:

{
  "predictions": [
    {
      "churn_probability": 0.23
    }
  ]
}

#Integración con Feast (Feature Store)

#Obtener features en predicción

Desde una aplicación o servicio que llama al modelo:

from feast import FeatureStore
import requests

# Conectar a Feature Store
fs = FeatureStore(repo_path='s3://ml-bucket/feast')

# Obtener features online (desde Valkey)
features = fs.get_online_features(
    features=['user_features:total_spend', 'user_features:churn_score'],
    entity_rows=[{'user_id': 12345}]
)

# Convertir a dict para enviar al modelo
feature_dict = dict(zip(
    features.feature_names,
    features.to_dict()['results'][0].values()
))

# Invocar modelo
response = requests.post(
    '<tu-url>/v1/models/churn-prediction-model:predict',
    json={'instances': [feature_dict]}
)
prediction = response.json()['predictions'][0]
print(f"Churn probability: {prediction['churn_probability']}")

#Materializar features offline

Preparar features para training en batch:

# CLI de Feast
feast -c s3://ml-bucket/feast materialize 2026-01-01 2026-03-14

# Esto llena Ceph con archivos Parquet para training

#Monitoreo de modelos

#Ver métricas de InferenceService

# Acceder a Prometheus
<tu-url>

# Query ejemplo: latencia promedio del modelo
avg(rate(kfs_request_duration_seconds[5m]))

# Query: tasa de errores
sum(rate(kfs_request_error_count[5m])) by (model_name)

#Alertas en Kubeflow

Configurar alertas si:

  • Latencia de predicción > 200ms
  • Tasa de error > 1%
  • Replica no está lista

Alertas se envían a Slack/email configurado en Kubeflow monitoring.


#Mejores Prácticas

Práctica Descripción
Versionado de modelos Guardar modelos en s3://models-bucket/<modelo>-v<N>.pkl
Reproducibilidad Fijar seeds (numpy, torch, tensorflow) en training
Métricas en artifacts Guardar métricas de entrenamiento en artefactos
Namespace separados Cada equipo/proyecto en namespace diferente
PVC para datos grandes Usar PersistentVolumeClaim si dataset > 50GB
Resource limits Siempre especificar requests/limits en Pods
Monitoreo de drift Trackear diferencia entre distrib. train vs. producción

#Troubleshooting

#Problemas comunes

Problema Causa Solución
"Pod pending" Insuficientes recursos Reducir CPU/memory o añadir nodos
"Pipeline timeout" Componente muy lenta Aumentar timeout, optimizar código
"Model not found in Ceph" Ruta de S3 incorrecta Verificar URI en storageUri
"Inference service erroring" Incompatibilidad de framework Verificar imagen Docker match modelo

#Also see