← Back to tutorials

Data Pipeline Observability

Monitoring and alerting for ML data pipeline health

Data Pipeline Observability

Overview

This guide covers monitoring and alerting for ML data pipeline health, with a practical implementation for production ML systems.

Why This Matters in MLOps

Setup

bash

# Install required tools.
# The OpenTelemetry Python packages are published as opentelemetry-api and
# opentelemetry-sdk; there is no usable distribution named plain "opentelemetry".
pip install opentelemetry-api opentelemetry-sdk mlflow pandas numpy scikit-learn

# Or with Docker
docker pull python:3.11-slim

Core Implementation

python
import json
import logging
import os
import time
from datetime import datetime
from pathlib import Path

logger = logging.getLogger(__name__)


class DataPipelineObservability:
    """Run a data-pipeline observability task and report its outcome.

    Handles: observability
    Tool: opentelemetry

    ``run`` wraps ``_execute`` with timing, logging and error capture;
    subclasses customise behaviour by overriding ``_execute``.
    """

    def __init__(self, config: dict = None):
        """Store the configuration and initialise logging.

        Args:
            config: Settings for the task. ``None`` or an empty dict falls
                back to ``_default_config()``.
        """
        self.config = config or self._default_config()
        self._setup()

    def _default_config(self) -> dict:
        """Return the built-in settings; ``ENVIRONMENT`` selects the environment."""
        return {
            "tool": "opentelemetry",
            "environment": os.getenv("ENVIRONMENT", "development"),
            "log_level": "INFO",
        }

    def _setup(self):
        """Configure logging at the configured level and log the active config."""
        logging.basicConfig(level=self.config.get("log_level", "INFO"))
        logger.info(
            "Initialized Data Pipeline Observability with config: %s", self.config
        )

    def run(self, **kwargs) -> dict:
        """Execute observability and report the outcome.

        Args:
            **kwargs: Forwarded unchanged to ``_execute``.

        Returns:
            On success: ``{"status": "success", "result": ..., "elapsed_seconds": float}``.
            On failure: ``{"status": "failed", "error": str}``; the exception
            is logged rather than propagated.
        """
        # perf_counter is monotonic, so the measured duration cannot be skewed
        # by wall-clock adjustments (and avoids the deprecated datetime.utcnow()).
        started = time.perf_counter()
        try:
            result = self._execute(**kwargs)
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception("Data Pipeline Observability failed: %s", e)
            return {
                "status": "failed",
                "error": str(e),
            }

        elapsed = time.perf_counter() - started
        logger.info("Data Pipeline Observability completed in %.2fs", elapsed)
        return {
            "status": "success",
            "result": result,
            "elapsed_seconds": elapsed,
        }

    def _execute(self, **kwargs) -> dict:
        """Core observability logic. Override to customize."""
        return {"completed": True, "tool": "opentelemetry"}

Configuration

# Runtime settings; the tracking URI is read from MLFLOW_TRACKING_URI when set,
# otherwise it points at a local server.
config = dict(
    tool="opentelemetry",
    tracking_uri=os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000"),
    artifact_root="./artifacts",
)

Initialize

# Build the pipeline from the settings in `config`, run it once, and
# pretty-print the resulting status dict as JSON.
processor = DataPipelineObservability(config=config)
result = processor.run()
print(json.dumps(result, indent=2))

OpenTelemetry Integration

python

Specific opentelemetry integration for observability

import subprocess

def setup_opentelemetry(config_dir: "str | Path" = ".opentelemetry") -> dict:
    """Configure opentelemetry for observability.

    Writes an example project configuration to ``<config_dir>/config.json``
    (creating the directory if needed) and prints progress messages.

    Args:
        config_dir: Directory that receives ``config.json``. Defaults to
            ``.opentelemetry`` relative to the current working directory.

    Returns:
        The configuration dict that was written to disk.
    """
    # Initialize project
    print("Setting up opentelemetry for observability...")

    # Example configuration
    config = {
        "project": "my-ml-project",
        "tool": "opentelemetry",
        "specialty": "observability",
        "version": "1.0.0",
    }

    # Save configuration
    target_dir = Path(config_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    # Explicit encoding keeps the file identical regardless of platform locale.
    with open(target_dir / "config.json", "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2)

    print("opentelemetry configured for observability")
    return config

# Write the example config file and keep the returned settings; this rebinds
# the module-level name `config`.
config = setup_opentelemetry()

Monitoring and Alerting

python
from dataclasses import dataclass
import time

@dataclass
class MetricSnapshot:
    """A single recorded metric value."""

    timestamp: float  # time.time() at the moment of recording
    metric_name: str
    value: float
    labels: dict


class MLOpsMonitor:
    """Monitor observability metrics.

    Keeps every recorded value in ``metrics`` and logs a warning when a
    value exceeds the threshold configured for its metric name.
    """

    def __init__(self):
        self.metrics: list[MetricSnapshot] = []
        # Upper bounds per metric name; metrics without an entry are never alerted on.
        self.thresholds = {
            "error_rate": 0.05,
            "latency_p99_ms": 1000,
            "data_drift_score": 0.3,
        }

    def record(self, metric: str, value: float, labels: dict = None):
        """Store a snapshot of ``metric`` and check it against its threshold.

        Args:
            metric: Metric name; also the key looked up in ``thresholds``.
            value: Observed value.
            labels: Optional labels stored with the snapshot; defaults to ``{}``.
        """
        snapshot = MetricSnapshot(
            timestamp=time.time(),
            metric_name=metric,
            value=value,
            labels=labels or {},
        )
        self.metrics.append(snapshot)
        self._check_threshold(metric, value)

    def _check_threshold(self, metric: str, value: float):
        """Log an alert when ``value`` is above the threshold for ``metric``."""
        threshold = self.thresholds.get(metric)
        # Compare against None explicitly so a threshold of 0 is still enforced.
        if threshold is not None and value > threshold:
            logger.warning(
                "ALERT: %s=%.3f exceeds threshold %s", metric, value, threshold
            )

# Module-level monitor instance; it starts with no recorded metrics and the
# built-in thresholds.
monitor = MLOpsMonitor()

CI/CD Integration

yaml

# .github/workflows/ml-pipeline.yml
# Runs the observability job and the model quality check on pushes that
# touch source or data files.

name: ML Pipeline

on:
  push:
    # Use ** globs so files at any depth under src/ and data/ trigger the run.
    paths: ['src/**', 'data/**']

jobs:
  train-and-evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run observability
        run: python -m src.data_pipeline_observability
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
      - name: Check model quality
        run: python -m src.validate_model

Best Practices

Resources

Also available in 中文.