Model Serving with Ray Serve
Scalable ML model serving using Ray Serve
Model Serving with Ray Serve
Scalable ML model serving using Ray Serve
Model Serving with Ray Serve. Overview: Scalable ML model serving using Ray Serve. This guide covers practical implementation for production ML systems. Why This Matters in MLOps: Modern ML systems require rigorous operations practices: - **Reliability** …
Model Serving with Ray Serve
Overview
Scalable ML model serving using Ray Serve. This guide covers practical implementation for production ML systems.
Why This Matters in MLOps
Modern ML systems require rigorous operations practices:
Setup
bash
# Install required tools.
# The "serve" extra pulls in the dependencies Ray Serve itself needs;
# a bare `ray` install only provides Ray Core.
pip install "ray[serve]" mlflow pandas numpy scikit-learn

# Or with Docker
docker pull python:3.11-slim
Core Implementation
python
import json
import logging
import os
import time
from datetime import datetime
from pathlib import Path

logger = logging.getLogger(__name__)
class ModelServingwithRayServe:
    """
    Model Serving with Ray Serve implementation.

    Handles: serving
    Tool: ray

    The class is a small runner skeleton: it stores a configuration
    dictionary, configures logging, and wraps ``_execute`` with timing and
    error reporting. Subclasses customise behaviour by overriding
    ``_execute``.
    """

    def __init__(self, config: "dict | None" = None):
        """Store the configuration and run one-time setup.

        Args:
            config: Settings for this runner. ``None`` or an empty dict
                falls back to ``_default_config()``.
        """
        self.config = config or self._default_config()
        self._setup()

    def _default_config(self) -> dict:
        """Return the settings used when no configuration is supplied.

        ``environment`` is read from the ``ENVIRONMENT`` variable and
        defaults to ``"development"``.
        """
        return {
            "tool": "ray",
            "environment": os.getenv("ENVIRONMENT", "development"),
            "log_level": "INFO",
        }

    def _setup(self):
        """Configure logging and log the active configuration.

        No Ray connection is opened here; only the logging level from
        ``config["log_level"]`` (default ``"INFO"``) is applied.
        """
        logging.basicConfig(level=self.config.get("log_level", "INFO"))
        logger.info(
            "Initialized Model Serving with Ray Serve with config: %s",
            self.config,
        )

    def run(self, **kwargs) -> dict:
        """Execute serving.

        Args:
            **kwargs: Forwarded unchanged to ``_execute``.

        Returns:
            On success: ``{"status": "success", "result": ...,
            "elapsed_seconds": float}``. On any ``Exception`` raised by
            ``_execute``: ``{"status": "failed", "error": str}``. Errors are
            reported through the return value rather than re-raised.
        """
        # A monotonic clock is immune to wall-clock adjustments and avoids
        # the deprecated naive-UTC datetime API.
        started = time.perf_counter()
        try:
            result = self._execute(**kwargs)
        except Exception as e:
            # Boundary handler: keep the traceback in the log and turn the
            # failure into a status dictionary for the caller.
            logger.exception("Model Serving with Ray Serve failed: %s", e)
            return {
                "status": "failed",
                "error": str(e),
            }

        elapsed = time.perf_counter() - started
        logger.info("Model Serving with Ray Serve completed in %.2fs", elapsed)
        return {
            "status": "success",
            "result": result,
            "elapsed_seconds": elapsed,
        }

    def _execute(self, **kwargs) -> dict:
        """Core serving logic. Override to customize."""
        return {"completed": True, "tool": "ray"}
# Configuration: the MLflow tracking URI comes from the environment and
# falls back to a local server.
config = {
    "tool": "ray",
    "tracking_uri": os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"),
    "artifact_root": "./artifacts",
}

# Initialize the runner, execute it once, and print the status dictionary.
processor = ModelServingwithRayServe(config)
result = processor.run()
print(json.dumps(result, indent=2))
Ray Integration
python
# Specific ray integration for serving
import subprocess


def setup_ray(config_dir: str = ".ray") -> dict:
    """Configure ray for serving.

    Writes a small project configuration to ``<config_dir>/config.json``,
    creating the directory when it is missing, and reports progress on
    stdout.

    Args:
        config_dir: Directory that receives ``config.json``. Defaults to
            ``.ray`` relative to the current working directory.

    Returns:
        The configuration dictionary that was written to disk.
    """
    # Initialize project
    print("Setting up ray for serving...")

    # Example configuration
    config = {
        "project": "my-ml-project",
        "tool": "ray",
        "specialty": "serving",
        "version": "1.0.0",
    }

    # Save configuration; the explicit encoding keeps the file identical
    # across platforms whose default encodings differ.
    target_dir = Path(config_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    with open(target_dir / "config.json", "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2)

    print("ray configured for serving")
    return config
# Run the setup when this snippet executes; this rebinds `config` to the
# dictionary that setup_ray() returns.
config = setup_ray()
Monitoring and Alerting
python
import time
from dataclasses import dataclass


@dataclass
class MetricSnapshot:
    """A single recorded metric observation."""

    # Seconds since the epoch; MLOpsMonitor.record fills this from time.time().
    timestamp: float
    # Name of the metric, e.g. "error_rate".
    metric_name: str
    # Observed value of the metric.
    value: float
    # Free-form labels attached to the observation.
    labels: dict
class MLOpsMonitor:
    """Monitor serving metrics.

    Keeps every recorded observation in ``metrics`` and logs a warning when
    a value is strictly greater than the threshold configured for that
    metric in ``thresholds``.
    """

    def __init__(self):
        """Start with no recorded metrics and the default alert thresholds."""
        self.metrics: list[MetricSnapshot] = []
        self.thresholds = {
            "error_rate": 0.05,
            "latency_p99_ms": 1000,
            "data_drift_score": 0.3,
        }

    def record(self, metric: str, value: float, labels: "dict | None" = None):
        """Store one observation and check it against its threshold.

        Args:
            metric: Name of the metric being recorded.
            value: Observed value.
            labels: Optional labels; ``None`` is stored as an empty dict.
        """
        snapshot = MetricSnapshot(
            timestamp=time.time(),
            metric_name=metric,
            value=value,
            labels=labels or {},
        )
        self.metrics.append(snapshot)
        self._check_threshold(metric, value)

    def _check_threshold(self, metric: str, value: float):
        """Log an alert when ``value`` exceeds the metric's threshold.

        Metrics without an entry in ``thresholds`` are never alerted on.
        """
        threshold = self.thresholds.get(metric)
        # Compare against None explicitly: a threshold of 0 is a valid
        # limit and must not be mistaken for "no threshold configured".
        if threshold is not None and value > threshold:
            logger.warning(
                "ALERT: %s=%.3f exceeds threshold %s", metric, value, threshold
            )
# Module-level monitor instance, created with the default thresholds that
# MLOpsMonitor.__init__ sets.
monitor = MLOpsMonitor()
CI/CD Integration
yaml
# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    # Path filters are glob patterns; `**` matches every file under the
    # directory, so any change below src/ or data/ triggers the workflow.
    paths: ['src/**', 'data/**']

jobs:
  train-and-evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run serving
        run: python -m src.model_serving_with_ray_serve
        env:
          # The tracking URI is injected from repository secrets.
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
      - name: Check model quality
        run: python -m src.validate_model
Best Practices
Resources
Related Tools