Kubeflow is the open-source machine learning platform on Kubernetes. It lets you train, deploy, and manage ML models at scale, using Kubeflow Pipelines to orchestrate workflows, KFServing to serve models in production, and integrations with JupyterHub, Feast, and object storage.
Kubeflow web UI:
<tu-url>
Authenticate with corporate SSO. Your user namespace is created automatically.
| Component | Description |
|---|---|
| Pipelines | ML workflow orchestration |
| Notebooks | Integrated JupyterHub for exploratory development |
| Models | Registry and versioning of trained models |
| KFServing | Model deployment for real-time inference |
| Training | Operators for distributed training |
| Monitoring | Metrics and health of models in production |
```bash
# SSH into a cluster node
ssh admin@<tu-url>

# List Kubeflow components
kubectl get pods -n kubeflow
kubectl get svc -n kubeflow | grep kubeflow-ui

# Check the status of your namespace
kubectl get pods -n <tu-namespace>
```
Install the SDK:

```bash
# The example below uses the KFP v2 @dsl.component decorator,
# which requires kfp 2.x
pip install 'kfp>=2'
```
Example: a simple classification pipeline

```python
from kfp import compiler, dsl

# Define the pipeline components (steps)
@dsl.component(packages_to_install=['pandas'])
def load_data(output_path: dsl.OutputPath(str)):
    import pandas as pd
    data = pd.read_csv('gs://ml-bucket/datos/train.csv')
    data.to_csv(output_path, index=False)

@dsl.component(packages_to_install=['pandas', 'scikit-learn'])
def train_model(data_path: dsl.InputPath(str), model_path: dsl.OutputPath(str)):
    import joblib
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    data = pd.read_csv(data_path)
    X, y = data.drop('target', axis=1), data['target']
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X, y)
    joblib.dump(model, model_path)

@dsl.component(packages_to_install=['pandas', 'scikit-learn'])
def evaluate_model(model_path: dsl.InputPath(str), data_path: dsl.InputPath(str)) -> str:
    import joblib
    import pandas as pd
    model = joblib.load(model_path)
    data = pd.read_csv(data_path)
    X, y = data.drop('target', axis=1), data['target']
    # NOTE: this scores on the training data; use a held-out split in practice
    accuracy = model.score(X, y)
    return f"Accuracy: {accuracy}"

# Define the pipeline (InputPath/OutputPath parameters are wired by KFP,
# so only upstream outputs are passed explicitly)
@dsl.pipeline(name='classification-pipeline')
def my_pipeline():
    data_task = load_data()
    train_task = train_model(data_path=data_task.outputs['output_path'])
    eval_task = evaluate_model(
        model_path=train_task.outputs['model_path'],
        data_path=data_task.outputs['output_path'],
    )

# Compile to YAML
compiler.Compiler().compile(my_pipeline, 'classification_pipeline.yaml')
```
Run it from the Kubeflow UI, or via the CLI:

```bash
kfp run submit \
  --experiment-name my-experiments \
  --run-name run-20260314 \
  classification_pipeline.yaml
```
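Runs can also be submitted programmatically with the KFP SDK client. A minimal sketch, assuming the cluster's Pipelines endpoint (the `host` value is a placeholder, and the `submit_run` helper is illustrative):

```python
# Sketch: submit a compiled pipeline package with the KFP SDK client.
# The kfp import is kept inside the helper so it is only needed at call time.
def submit_run(package_path: str, host: str, experiment: str, run_name: str):
    import kfp
    client = kfp.Client(host=host)
    return client.create_run_from_pipeline_package(
        package_path,
        arguments={},
        run_name=run_name,
        experiment_name=experiment,
    )
```

Usage would look like `submit_run('classification_pipeline.yaml', 'http://<tu-url>/pipeline', 'my-experiments', 'run-20260314')`.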
In the Kubeflow UI:

```
Name: my-notebook
Namespace: <tu-namespace>
Image: kubeflownotebookswg/jupyter-scipy:latest
CPU: 2
Memory: 4Gi
Storage: 20Gi (PVC)
```
```python
from feast import FeatureStore

# Connect to the Feast registry (repo_path points at the feature repo)
fs = FeatureStore(repo_path='gs://ml-bucket/feast')

# (Optional) inspect the feature view definition
user_features_view = fs.get_feature_view('user_features')

# Build the training set; entity_df must hold the entity keys and
# event timestamps for the rows you want features for
training_df = fs.get_historical_features(
    features=[
        'user_features:user_id',
        'user_features:total_spend',
        'user_features:days_since_signup'
    ],
    entity_df=entity_df
).to_df()

# Train the model
from sklearn.ensemble import GradientBoostingClassifier
X = training_df.drop(['target'], axis=1)
y = training_df['target']
model = GradientBoostingClassifier(n_estimators=200)
model.fit(X, y)

# Save to Ceph (joblib cannot write straight to an s3:// URI,
# so open the object through s3fs)
import joblib
import s3fs
with s3fs.S3FileSystem().open('s3://models-bucket/churn-model-v1.pkl', 'wb') as f:
    joblib.dump(model, f)
```
For long-running training, use a PyTorchJob or TFJob:

```yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: training-job
  namespace: ml
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
            - name: pytorch
              image: pytorch/pytorch:2.0-cuda11.8-cudnn8-runtime
              command: ['python', 'train.py']
              volumeMounts:
                - name: training-code
                  mountPath: /app
          volumes:
            - name: training-code
              emptyDir: {}
    Worker:
      replicas: 3
      template:
        spec:
          containers:
            - name: pytorch
              image: pytorch/pytorch:2.0-cuda11.8-cudnn8-runtime
              command: ['python', 'train.py']
```
Apply it:

```bash
kubectl apply -f training-job.yaml
kubectl logs -f pod/<master-pod-name> -n ml
```
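The training operator injects rendezvous settings into every replica as environment variables, and `train.py` can read them to initialize `torch.distributed`. A minimal sketch of that parsing (the variable names follow the training-operator convention; the torch call is left commented out since it needs the image's PyTorch):

```python
import os

def distributed_config(env=None):
    """Read the rendezvous settings the operator injects into each replica."""
    env = os.environ if env is None else env
    return {
        "master_addr": env.get("MASTER_ADDR", "localhost"),
        "master_port": int(env.get("MASTER_PORT", "23456")),
        "rank": int(env.get("RANK", "0")),
        "world_size": int(env.get("WORLD_SIZE", "1")),
    }

# Example: the values a worker replica of the job above might see
cfg = distributed_config({"MASTER_ADDR": "training-job-master-0",
                          "MASTER_PORT": "23456",
                          "RANK": "2", "WORLD_SIZE": "4"})
# torch.distributed.init_process_group("nccl", rank=cfg["rank"],
#                                      world_size=cfg["world_size"])
```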
Expose the model for real-time inference:

```yaml
apiVersion: serving.kubeflow.org/v1beta1
kind: InferenceService
metadata:
  name: churn-prediction-model
  namespace: ml
spec:
  predictor:
    minReplicas: 2
    maxReplicas: 10
    scaleTarget: 70  # autoscale at 70% utilization
    # Canary deployment: when updating storageUri to the v2 model
    # (s3://models-bucket/churn-model-v2.pkl), send 10% of traffic
    # to the new revision first
    canaryTrafficPercent: 10
    sklearn:
      storageUri: s3://models-bucket/churn-model-v1.pkl
      resources:
        requests:
          cpu: '1'
          memory: 2Gi
```
Apply and verify:

```bash
kubectl apply -f inference-service.yaml
kubectl get inferenceservices -n ml
```

The service exposes an endpoint:

```
<tu-url>/v1/models/churn-prediction-model:predict
```
Request:

```json
{
  "instances": [
    {
      "user_id": 12345,
      "total_spend": 5000.0,
      "days_since_signup": 180,
      "monthly_usage": 250
    }
  ]
}
```

Response:

```json
{
  "predictions": [
    {
      "churn_probability": 0.23
    }
  ]
}
```
From an application or service that calls the model:

```python
from feast import FeatureStore
import requests

# Connect to the feature store
fs = FeatureStore(repo_path='s3://ml-bucket/feast')

# Fetch online features (served from Valkey)
features = fs.get_online_features(
    features=['user_features:total_spend', 'user_features:churn_score'],
    entity_rows=[{'user_id': 12345}]
).to_dict()

# to_dict() maps each feature name to a list with one value per entity
# row; take the first row and build the payload for the model
feature_dict = {name: values[0] for name, values in features.items()}

# Invoke the model
response = requests.post(
    '<tu-url>/v1/models/churn-prediction-model:predict',
    json={'instances': [feature_dict]}
)
prediction = response.json()['predictions'][0]
print(f"Churn probability: {prediction['churn_probability']}")
```
Materialize features in batch:

```bash
# Feast CLI
feast -c s3://ml-bucket/feast materialize 2026-01-01 2026-03-14

# This loads features for the given time range from the offline store
# into the online store for serving
```
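The same materialization can be triggered from Python via the Feast SDK. A sketch, with the import kept inside the helper so feast is only needed at call time (`materialize_range` is an illustrative name):

```python
from datetime import datetime

def materialize_range(repo_path: str, start: datetime, end: datetime):
    """Load features for [start, end] from the offline store into the online store."""
    from feast import FeatureStore  # requires feast to be installed
    store = FeatureStore(repo_path=repo_path)
    store.materialize(start_date=start, end_date=end)

# Same window as the CLI example above
window = (datetime(2026, 1, 1), datetime(2026, 3, 14))
```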
Access Prometheus at <tu-url> and query the model metrics:

```
# Example query: average model latency
avg(rate(kfs_request_duration_seconds[5m]))

# Query: error rate per model
sum(rate(kfs_request_error_count[5m])) by (model_name)
```
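These queries can also be run against the Prometheus HTTP API (`/api/v1/query`). A sketch that only builds the request URL (the endpoint is a placeholder for this cluster):

```python
import urllib.parse

PROM_URL = "http://<tu-url>/api/v1/query"  # placeholder endpoint
query = "avg(rate(kfs_request_duration_seconds[5m]))"
url = PROM_URL + "?" + urllib.parse.urlencode({"query": query})
# GET this URL (e.g. with requests) and read the result from the JSON body
```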
Configure alerts on these metrics as needed. Alerts are sent to the Slack channel or email address configured in Kubeflow monitoring.
| Practice | Description |
|---|---|
| Model versioning | Save models as s3://models-bucket/<modelo>-v<N>.pkl |
| Reproducibility | Fix seeds (numpy, torch, tensorflow) in training |
| Metrics as artifacts | Store training metrics in pipeline artifacts |
| Separate namespaces | Each team/project in its own namespace |
| PVC for large data | Use a PersistentVolumeClaim if the dataset > 50GB |
| Resource limits | Always set requests/limits on Pods |
| Drift monitoring | Track drift between training and production distributions |
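The seed-fixing practice from the table above, as a small sketch (the numpy/torch lines are commented out so the helper stays dependency-free; `set_seeds` is an illustrative name):

```python
import os
import random

def set_seeds(seed: int = 42):
    """Seed every RNG the training run touches, for reproducibility."""
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    # import numpy as np; np.random.seed(seed)
    # import torch; torch.manual_seed(seed)

set_seeds(42)
a = random.random()
set_seeds(42)
b = random.random()  # identical to a: the run is reproducible
```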
| Problem | Cause | Solution |
|---|---|---|
| "Pod pending" | Insufficient resources | Reduce CPU/memory requests or add nodes |
| "Pipeline timeout" | Slow component | Increase the timeout, optimize the code |
| "Model not found in Ceph" | Wrong S3 path | Check the URI in storageUri |
| "Inference service erroring" | Framework mismatch | Verify the Docker image matches the model framework |