← Back to tutorials

Kubeflow ML Pipelines

Orchestrating ML workflows on Kubernetes with Kubeflow

Kubeflow ML Pipelines

Overview

Orchestrating ML workflows on Kubernetes with Kubeflow. This guide covers practical implementation for production ML systems.

Why This Matters in MLOps

Setup

bash

Install required tools

pip install kubeflow mlflow pandas numpy scikit-learn

Or with Docker

docker pull gcr.io/kubeflow-images-public/notebook-servers/jupyter

Core Implementation

python
import os
import json
import logging
from datetime import datetime
from pathlib import Path

logger = logging.getLogger(__name__)

class KubeflowMLPipelines: """ Kubeflow ML Pipelines implementation. Handles: kubernetes Tool: kubeflow """ def __init__(self, config: dict = None): self.config = config or self._default_config() self._setup() def _default_config(self) -> dict: return { "tool": "kubeflow", "environment": os.getenv("ENVIRONMENT", "development"), "log_level": "INFO", } def _setup(self): """Initialize kubeflow connection and resources.""" logging.basicConfig(level=self.config.get("log_level", "INFO")) logger.info(f"Initialized Kubeflow ML Pipelines with config: {self.config}") def run(self, **kwargs) -> dict: """Execute kubernetes.""" start = datetime.utcnow() try: result = self._execute(**kwargs) elapsed = (datetime.utcnow() - start).total_seconds() logger.info(f"Kubeflow ML Pipelines completed in {elapsed:.2f}s") return { "status": "success", "result": result, "elapsed_seconds": elapsed } except Exception as e: logger.error(f"Kubeflow ML Pipelines failed: {e}") return { "status": "failed", "error": str(e) } def _execute(self, **kwargs) -> dict: """Core kubernetes logic. Override to customize.""" return {"completed": True, "tool": "kubeflow"}

Configuration

config = { "tool": "kubeflow", "tracking_uri": os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"), "artifact_root": "./artifacts", }

Initialize

processor = KubeflowMLPipelines(config) result = processor.run() print(json.dumps(result, indent=2))

KUBEFLOW Integration

python

Specific kubeflow integration for kubernetes

import subprocess

def setup_kubeflow(): """Configure kubeflow for kubernetes.""" # Initialize project print(f"Setting up kubeflow for kubernetes...") # Example configuration config = { "project": "my-ml-project", "tool": "kubeflow", "specialty": "kubernetes", "version": "1.0.0" } # Save configuration Path(".kubeflow").mkdir(exist_ok=True) with open(f".kubeflow/config.json", "w") as f: json.dump(config, f, indent=2) print(f"kubeflow configured for kubernetes") return config

config = setup_kubeflow()

Monitoring and Alerting

python
from dataclasses import dataclass
import time

class MLOpsMonitor: """Monitor kubernetes metrics.""" def __init__(self): self.metrics: list[MetricSnapshot] = [] self.thresholds = { "error_rate": 0.05, "latency_p99_ms": 1000, "data_drift_score": 0.3 } def record(self, metric: str, value: float, labels: dict = None): snapshot = MetricSnapshot( timestamp=time.time(), metric_name=metric, value=value, labels=labels or {} ) self.metrics.append(snapshot) self._check_threshold(metric, value) def _check_threshold(self, metric: str, value: float): threshold = self.thresholds.get(metric) if threshold and value > threshold: logger.warning(f"ALERT: {metric}={value:.3f} exceeds threshold {threshold}")

monitor = MLOpsMonitor()

CI/CD Integration

yaml

.github/workflows/ml-pipeline.yml

name: ML Pipeline

on: push: paths: ['src/', 'data/']

jobs: train-and-evaluate: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v5 with: python-version: '3.11' - name: Install dependencies run: pip install -r requirements.txt - name: Run kubernetes run: python -m src.kubeflow_ml_pipelines env: MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }} - name: Check model quality run: python -m src.validate_model

Best Practices

Resources

Also available in 中文.