← Back to tutorials

Model Serving with Ray Serve

Scalable ML model serving using Ray Serve

Model Serving with Ray Serve

Overview

Scalable ML model serving using Ray Serve. This guide covers practical implementation for production ML systems.

Why This Matters in MLOps

Setup

bash

Install required tools

pip install ray mlflow pandas numpy scikit-learn

Or with Docker

docker pull python:3.11-slim

Core Implementation

python
import os
import json
import logging
from datetime import datetime
from pathlib import Path

logger = logging.getLogger(__name__)

class ModelServingwithRayServe: """ Model Serving with Ray Serve implementation. Handles: serving Tool: ray """ def __init__(self, config: dict = None): self.config = config or self._default_config() self._setup() def _default_config(self) -> dict: return { "tool": "ray", "environment": os.getenv("ENVIRONMENT", "development"), "log_level": "INFO", } def _setup(self): """Initialize ray connection and resources.""" logging.basicConfig(level=self.config.get("log_level", "INFO")) logger.info(f"Initialized Model Serving with Ray Serve with config: {self.config}") def run(self, **kwargs) -> dict: """Execute serving.""" start = datetime.utcnow() try: result = self._execute(**kwargs) elapsed = (datetime.utcnow() - start).total_seconds() logger.info(f"Model Serving with Ray Serve completed in {elapsed:.2f}s") return { "status": "success", "result": result, "elapsed_seconds": elapsed } except Exception as e: logger.error(f"Model Serving with Ray Serve failed: {e}") return { "status": "failed", "error": str(e) } def _execute(self, **kwargs) -> dict: """Core serving logic. Override to customize.""" return {"completed": True, "tool": "ray"}

Configuration

config = { "tool": "ray", "tracking_uri": os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"), "artifact_root": "./artifacts", }

Initialize

processor = ModelServingwithRayServe(config) result = processor.run() print(json.dumps(result, indent=2))

RAY Integration

python

Specific ray integration for serving

import subprocess

def setup_ray(): """Configure ray for serving.""" # Initialize project print(f"Setting up ray for serving...") # Example configuration config = { "project": "my-ml-project", "tool": "ray", "specialty": "serving", "version": "1.0.0" } # Save configuration Path(".ray").mkdir(exist_ok=True) with open(f".ray/config.json", "w") as f: json.dump(config, f, indent=2) print(f"ray configured for serving") return config

config = setup_ray()

Monitoring and Alerting

python
from dataclasses import dataclass
import time

class MLOpsMonitor: """Monitor serving metrics.""" def __init__(self): self.metrics: list[MetricSnapshot] = [] self.thresholds = { "error_rate": 0.05, "latency_p99_ms": 1000, "data_drift_score": 0.3 } def record(self, metric: str, value: float, labels: dict = None): snapshot = MetricSnapshot( timestamp=time.time(), metric_name=metric, value=value, labels=labels or {} ) self.metrics.append(snapshot) self._check_threshold(metric, value) def _check_threshold(self, metric: str, value: float): threshold = self.thresholds.get(metric) if threshold and value > threshold: logger.warning(f"ALERT: {metric}={value:.3f} exceeds threshold {threshold}")

monitor = MLOpsMonitor()

CI/CD Integration

yaml

.github/workflows/ml-pipeline.yml

name: ML Pipeline

on: push: paths: ['src/', 'data/']

jobs: train-and-evaluate: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v5 with: python-version: '3.11' - name: Install dependencies run: pip install -r requirements.txt - name: Run serving run: python -m src.model_serving_with_ray_serve env: MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }} - name: Check model quality run: python -m src.validate_model

Best Practices

Resources

Also available in 中文.