← Back to tutorials

Continuous Training Pipelines

Automated model retraining triggered by data or performance changes

Continuous Training Pipelines

Overview

Continuous training pipelines automatically retrain a model when its input data or its measured performance changes. This guide covers practical implementation for production ML systems.

Why This Matters in MLOps

Setup

bash

Install required tools

# Apache Airflow is published on PyPI as "apache-airflow"; the bare "airflow" name is not the Apache project.
pip install apache-airflow mlflow pandas numpy scikit-learn

Or with Docker

docker pull python:3.11-slim

Core Implementation

python
import json
import logging
import os
import time
from datetime import datetime
from pathlib import Path

logger = logging.getLogger(__name__)


class ContinuousTrainingPipelines:
    """Continuous Training Pipelines implementation.

    Handles: automation
    Tool: airflow

    ``run`` wraps ``_execute`` with timing, logging and error capture and
    always returns a status dict instead of raising. Subclasses customise
    the work by overriding ``_execute``.
    """

    def __init__(self, config: dict = None):
        """Store the configuration and configure logging.

        Args:
            config: Pipeline settings. When omitted (or empty) the values
                from ``_default_config`` are used instead.
        """
        self.config = config or self._default_config()
        self._setup()

    def _default_config(self) -> dict:
        """Return the settings used when no configuration is supplied."""
        return {
            "tool": "airflow",
            "environment": os.getenv("ENVIRONMENT", "development"),
            "log_level": "INFO",
        }

    def _setup(self):
        """Configure logging from ``log_level`` and log the active config.

        No airflow connection is opened here; only logging is initialised.
        """
        logging.basicConfig(level=self.config.get("log_level", "INFO"))
        logger.info(
            "Initialized Continuous Training Pipelines with config: %s",
            self.config,
        )

    def run(self, **kwargs) -> dict:
        """Execute automation.

        Args:
            **kwargs: Forwarded unchanged to ``_execute``.

        Returns:
            On success: ``{"status": "success", "result": ...,
            "elapsed_seconds": float}``. On failure:
            ``{"status": "failed", "error": str}``.
        """
        # A monotonic clock is immune to wall-clock adjustments, unlike
        # datetime.utcnow() (which is also deprecated since Python 3.12).
        started = time.perf_counter()
        try:
            result = self._execute(**kwargs)
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception("Continuous Training Pipelines failed: %s", e)
            return {
                "status": "failed",
                "error": str(e),
            }

        elapsed = time.perf_counter() - started
        logger.info("Continuous Training Pipelines completed in %.2fs", elapsed)
        return {
            "status": "success",
            "result": result,
            "elapsed_seconds": elapsed,
        }

    def _execute(self, **kwargs) -> dict:
        """Core automation logic. Override to customize."""
        return {"completed": True, "tool": "airflow"}

Configuration

# Pipeline settings: the MLflow endpoint is taken from the environment and
# falls back to a local tracking server when the variable is unset.
config = {
    "tool": "airflow",
    "tracking_uri": os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000"),
    "artifact_root": "./artifacts",
}

Initialize

# Build the pipeline from the explicit configuration, run it once, and
# pretty-print the status dict that run() returns.
processor = ContinuousTrainingPipelines(config)
result = processor.run()
print(json.dumps(result, indent=2))

AIRFLOW Integration

python

Specific airflow integration for automation

import subprocess

def setup_airflow(config_dir=".airflow"):
    """Configure airflow for automation.

    Writes an example project configuration to ``config.json`` inside
    ``config_dir`` and returns it.

    Args:
        config_dir: Directory that receives ``config.json``. Defaults to
            ``.airflow`` relative to the current working directory; it is
            created (including parents) when missing.

    Returns:
        The configuration dict that was written to disk.
    """
    # Initialize project
    print("Setting up airflow for automation...")

    # Example configuration
    config = {
        "project": "my-ml-project",
        "tool": "airflow",
        "specialty": "automation",
        "version": "1.0.0",
    }

    # Save configuration
    target_dir = Path(config_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    # Explicit encoding keeps the file identical across platforms.
    with open(target_dir / "config.json", "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2)

    print("airflow configured for automation")
    return config

# Create the example configuration (setup_airflow also saves it under
# .airflow/) and keep the returned dict.
config = setup_airflow()

Monitoring and Alerting

python
from dataclasses import dataclass
import time

@dataclass
class MetricSnapshot:
    """One recorded metric observation."""

    timestamp: float  # value of time.time() when the metric was recorded
    metric_name: str
    value: float
    labels: dict


class MLOpsMonitor:
    """Monitor automation metrics.

    Every recorded value is kept in ``metrics`` and compared against the
    per-metric limits in ``thresholds``; values above a limit are logged
    as warnings.
    """

    def __init__(self):
        """Start with no recorded metrics and the default alert thresholds."""
        self.metrics: list[MetricSnapshot] = []
        self.thresholds = {
            "error_rate": 0.05,
            "latency_p99_ms": 1000,
            "data_drift_score": 0.3,
        }

    def record(self, metric: str, value: float, labels: dict = None):
        """Store a metric observation and check it against its threshold.

        Args:
            metric: Metric name; also the key looked up in ``thresholds``.
            value: Observed value.
            labels: Optional labels stored with the snapshot; an empty dict
                is stored when omitted.
        """
        snapshot = MetricSnapshot(
            timestamp=time.time(),
            metric_name=metric,
            value=value,
            labels=labels or {},
        )
        self.metrics.append(snapshot)
        self._check_threshold(metric, value)

    def _check_threshold(self, metric: str, value: float):
        """Log a warning when ``value`` exceeds the metric's threshold.

        Metrics without a configured threshold are never alerted on.
        """
        threshold = self.thresholds.get(metric)
        # Compare against None so that a threshold of 0 is still enforced.
        if threshold is not None and value > threshold:
            logger.warning(
                "ALERT: %s=%.3f exceeds threshold %s", metric, value, threshold
            )

# Create a monitor that starts with the default alert thresholds.
monitor = MLOpsMonitor()

CI/CD Integration

yaml

.github/workflows/ml-pipeline.yml

name: ML Pipeline

# Run only when source code or data changes. The "/**" suffix is required:
# a bare "src/" does not match files inside the directory.
on:
  push:
    paths:
      - 'src/**'
      - 'data/**'

jobs:
  train-and-evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt

      # The tracking server URI is injected from repository secrets.
      - name: Run automation
        run: python -m src.continuous_training_pipelines
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}

      - name: Check model quality
        run: python -m src.validate_model

Best Practices

Resources

Also available in 中文.