AI Data Pipelines: ETL and Preprocessing for ML Models
Build robust data pipelines that feed high-quality data to AI models
Why Data Quality Matters
"Garbage in, garbage out" is especially true for ML models: errors, gaps, and inconsistencies in the training data are learned by the model and resurface as poor predictions.
Pipeline Architecture
Raw Data → Ingestion → Validation → Cleaning → Feature Engineering → Training Data Store
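Each arrow in the diagram can be read as a function that takes a batch of records and returns a transformed batch. The following is a minimal stdlib-only sketch of that composition, assuming records are plain dicts. The stage functions (`ingest`, `validate`, `clean`, `add_features`), the `is_adult` feature, and the in-memory list standing in for the training data store are hypothetical placeholders, not part of any library:

```python
from functools import reduce
from typing import Callable, Dict, List

Record = Dict[str, object]
Stage = Callable[[List[Record]], List[Record]]


def ingest(records: List[Record]) -> List[Record]:
    # Hypothetical ingestion: copy records so later stages never mutate the source
    return [dict(r) for r in records]


def validate(records: List[Record]) -> List[Record]:
    # Reject the whole batch if any record lacks a label
    missing = [r for r in records if r.get("label") is None]
    if missing:
        raise ValueError(f"{len(missing)} record(s) missing 'label'")
    return records


def clean(records: List[Record]) -> List[Record]:
    # Drop records whose age is missing or outside a plausible range
    return [
        r for r in records
        if isinstance(r.get("age"), (int, float)) and 0 <= r["age"] <= 120
    ]


def add_features(records: List[Record]) -> List[Record]:
    # Derive a simple (hypothetical) feature from an existing field
    return [{**r, "is_adult": r["age"] >= 18} for r in records]


def run_pipeline(records: List[Record], stages: List[Stage]) -> List[Record]:
    # Apply the stages in order: stage_n(... stage_2(stage_1(records)))
    return reduce(lambda batch, stage: stage(batch), stages, records)


training_data_store: List[Record] = []  # stand-in for a real training data store
raw = [{"age": 34, "label": 1}, {"age": 150, "label": 0}, {"age": 12, "label": 0}]
training_data_store.extend(run_pipeline(raw, [ingest, validate, clean, add_features]))
print(training_data_store)
```

Keeping each stage a pure function of its input makes stages easy to test in isolation and to reorder or swap, which is the same property orchestration tools rely on when they wrap stages as tasks.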
Data Validation with Great Expectations
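```python
# Uses the Great Expectations 1.x API. `df` is assumed to be the pandas
# DataFrame of training data; "training_batch" is an illustrative name.
import great_expectations as gx

context = gx.get_context()
datasource = context.data_sources.add_pandas(name="my_datasource")
data_asset = datasource.add_dataframe_asset(name="training_data")
batch_definition = data_asset.add_batch_definition_whole_dataframe("training_batch")

# Define expectations
suite = context.suites.add(gx.ExpectationSuite(name="training_data_suite"))
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="label")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="age", min_value=0, max_value=120
    )
)

# Validate the in-memory DataFrame against the suite
batch = batch_definition.get_batch(batch_parameters={"dataframe": df})
result = batch.validate(suite)
print(result.success)
```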
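To make concrete what the two expectations check, here is a rough pandas-only equivalent. It is a sketch for illustration, not a replacement for Great Expectations (there is no suite storage, reporting, or checkpointing); the function name `check_training_data` and the sample DataFrame are hypothetical:

```python
import pandas as pd


def check_training_data(df: pd.DataFrame) -> dict:
    """Return a pass/fail flag per check, mirroring the two expectations."""
    results = {
        # Counterpart of ExpectColumnValuesToNotBeNull(column="label")
        "label_not_null": bool(df["label"].notna().all()),
        # Counterpart of ExpectColumnValuesToBeBetween(column="age", 0, 120).
        # Nulls are skipped here and bounds are inclusive.
        "age_between_0_and_120": bool(df["age"].dropna().between(0, 120).all()),
    }
    results["success"] = all(results.values())
    return results


df = pd.DataFrame({"label": [1, 0, None], "age": [25, 130, 40]})
print(check_training_data(df))
# {'label_not_null': False, 'age_between_0_and_120': False, 'success': False}
```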
Feature Engineering
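```python
import pandas as pd
from sklearn.preprocessing import OrdinalEncoder, StandardScaler


def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()  # don't mutate the caller's DataFrame

    # Handle missing values (assign back: chained inplace fillna is
    # unreliable under pandas Copy-on-Write)
    df['age'] = df['age'].fillna(df['age'].median())
    df['category'] = df['category'].fillna('unknown')

    # Encode categoricals (OrdinalEncoder is meant for feature columns;
    # LabelEncoder is intended for target labels)
    encoder = OrdinalEncoder()
    df['category_encoded'] = encoder.fit_transform(df[['category']]).ravel()

    # Create interaction features (+1 avoids dividing by zero when income is 0)
    df['age_income_ratio'] = df['age'] / (df['income'] + 1)

    # Normalize numerical features. The median, encoder and scaler are fitted
    # on whatever df is passed in, so fit them on training data only.
    scaler = StandardScaler()
    numerical_cols = ['age', 'income', 'score']
    df[numerical_cols] = scaler.fit_transform(df[numerical_cols])
    return df
```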
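Because `engineer_features` learns its imputation median and scaling statistics from whatever DataFrame it receives, calling it separately on training and serving data yields inconsistent features and leaks test-set statistics into training. A common remedy is to learn the statistics once from the training split and reuse them. Below is a minimal pandas-only sketch of that fit/transform split for the imputation and standardization steps; the `fit_stats`/`apply_stats` names and the sample data are hypothetical:

```python
import pandas as pd

NUMERICAL_COLS = ["age", "income", "score"]


def fit_stats(train: pd.DataFrame) -> dict:
    # Learn everything from the training split only, imputing before
    # computing mean/std (the same order as engineer_features)
    age_median = train["age"].median()
    imputed = train.assign(age=train["age"].fillna(age_median))
    std = imputed[NUMERICAL_COLS].std(ddof=0)  # population std, as StandardScaler uses
    return {
        "age_median": age_median,
        "mean": imputed[NUMERICAL_COLS].mean(),
        "std": std.replace(0, 1.0),  # leave zero-variance columns unscaled
    }


def apply_stats(df: pd.DataFrame, stats: dict) -> pd.DataFrame:
    out = df.copy()
    # Impute with the *training* median, not the median of this batch
    out["age"] = out["age"].fillna(stats["age_median"])
    # Standardize with training mean/std so train and serve features line up
    out[NUMERICAL_COLS] = (out[NUMERICAL_COLS] - stats["mean"]) / stats["std"]
    return out


train = pd.DataFrame({"age": [20.0, 30.0, None], "income": [1.0, 3.0, 5.0], "score": [0.0, 1.0, 2.0]})
serve = pd.DataFrame({"age": [None], "income": [3.0], "score": [1.0]})

stats = fit_stats(train)
print(apply_stats(serve, stats))
```

In practice the fitted statistics (or fitted scikit-learn transformers) would be persisted alongside the model so the serving path applies exactly the same transformation.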
Pipeline Orchestration with Prefect
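```python
import pandas as pd
from prefect import flow, task


@task(retries=3, retry_delay_seconds=60)
def extract_data(source_url: str) -> pd.DataFrame:
    # Retried up to 3 times, 60 s apart, to ride out transient I/O failures
    return pd.read_parquet(source_url)


@task
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
    # Fail the task if 5% or more of all cells are null. Raise rather than
    # assert, because asserts are stripped when Python runs with -O.
    null_fraction = df.isna().to_numpy().mean()
    if null_fraction >= 0.05:
        raise ValueError(f"Too many nulls: {null_fraction:.1%} of cells")
    return df


@task
def transform_features(df: pd.DataFrame) -> pd.DataFrame:
    # engineer_features is defined in the Feature Engineering section
    return engineer_features(df)


@flow(name="ML Training Pipeline")
def ml_pipeline(source_url: str) -> pd.DataFrame:
    raw_data = extract_data(source_url)
    validated = validate_data(raw_data)
    features = transform_features(validated)
    return features
```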
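The `retries=3, retry_delay_seconds=60` arguments on `extract_data` mean a failed run of that task is retried up to three more times, with a 60-second wait between attempts. The stdlib sketch below illustrates that behavior in isolation; it is not how Prefect implements retries internally, `with_retries` and `flaky_extract` are hypothetical names, and the delay is shortened so the example runs quickly:

```python
import functools
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(retries: int, delay_seconds: float) -> Callable[[Callable[..., T]], Callable[..., T]]:
    def decorator(fn: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs) -> T:
            for attempt in range(retries + 1):  # 1 initial try + `retries` retries
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # out of retries: surface the last error
                    time.sleep(delay_seconds)
        return wrapper
    return decorator


calls = {"n": 0}


@with_retries(retries=3, delay_seconds=0.01)
def flaky_extract() -> str:
    # Fails twice with a transient error, then succeeds
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "data"


print(flaky_extract(), "after", calls["n"], "attempts")  # data after 3 attempts
```

Retries suit extraction, where failures are often transient (network, rate limits); a validation failure like "Too many nulls" is deterministic, which is presumably why `validate_data` is left without retries.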
Monitoring Data Drift
Use tools such as Evidently or whylogs to detect when the distribution of production data shifts away from the distribution the model was trained on.
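As a rough illustration of what a drift check computes, here is a Population Stability Index (PSI) comparison between a training-time feature and its production counterpart, using NumPy only. PSI is one common drift metric, swapped in here for illustration; this is not Evidently's or whylogs' API. The 10 quantile bins and the 0.2 alert threshold are commonly cited rules of thumb, assumed rather than taken from either tool:

```python
import numpy as np


def psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10, eps: float = 1e-6) -> float:
    """Population Stability Index of `actual` (production) vs `expected` (training)."""
    # Interior cut points at the training data's quantiles -> `bins` roughly equal-mass bins
    cuts = np.quantile(expected, np.linspace(0, 1, bins + 1)[1:-1])
    # np.digitize maps each value to a bin index in 0..bins-1; values outside
    # the training range fall into the first or last bin
    exp_frac = np.bincount(np.digitize(expected, cuts), minlength=bins) / len(expected)
    act_frac = np.bincount(np.digitize(actual, cuts), minlength=bins) / len(actual)
    # Floor empty bins so the log term stays finite
    exp_frac = np.clip(exp_frac, eps, None)
    act_frac = np.clip(act_frac, eps, None)
    return float(np.sum((act_frac - exp_frac) * np.log(act_frac / exp_frac)))


rng = np.random.default_rng(0)
train_age = rng.normal(40, 10, 5_000)
prod_same = rng.normal(40, 10, 5_000)     # same distribution as training
prod_shifted = rng.normal(50, 10, 5_000)  # mean shifted by one standard deviation

for name, prod in [("same", prod_same), ("shifted", prod_shifted)]:
    score = psi(train_age, prod)
    print(f"{name}: PSI={score:.3f}, drift={'yes' if score > 0.2 else 'no'}")
```

Dedicated tools add what this sketch lacks: per-feature reports, statistical tests suited to categorical columns, and profiles that can be logged continuously from production.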