MLOps Best Practices: Building Production Machine Learning Pipelines That Scale

Here’s a stat that might sting: by some estimates, 87% of ML projects never make it to production. I’ve seen it happen. Teams spend months building a model in Jupyter, celebrate the accuracy metrics, then realize they have no idea how to actually deploy the thing.

This is where MLOps comes in. It’s not glamorous. It won’t get you Twitter followers. But it’s the difference between “we built a model” and “we have a working system that drives business value.”

Series Progress: Part 1: Foundations → Part 2: Types → Part 3: Frameworks → Part 4: MLOps (You are here) → Part 5: Enterprise Apps

Figure 1: MLOps architecture showing data management, model development, deployment, and monitoring

The MLOps Stack That Actually Works

| Component           | My Recommendation                              |
|---------------------|------------------------------------------------|
| Experiment Tracking | MLflow (self-hosted or Databricks)             |
| Data Versioning     | DVC or Delta Lake                              |
| Feature Store       | Feast (open source) or cloud-native            |
| Model Registry      | MLflow Model Registry                          |
| Serving             | FastAPI + Docker or cloud endpoints            |
| Orchestration       | Airflow or cloud-native (Step Functions, etc.) |

Experiment Tracking with MLflow

Every ML project needs experiment tracking. Without it, you’ll end up staring at 47 model files with names like model_final_v2_actually_final.pkl.

# experiment_tracking.py
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from sklearn.model_selection import train_test_split

# Point to your MLflow server
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("fraud-detection-v2")

def train_and_log(X, y, params):
    """Train model with full MLflow tracking."""
    
    with mlflow.start_run():
        # Log parameters
        mlflow.log_params(params)
        mlflow.log_param("dataset_size", len(X))
        
        # Split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        
        # Train
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)
        
        # Evaluate
        y_pred = model.predict(X_test)
        y_prob = model.predict_proba(X_test)[:, 1]
        
        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "f1": f1_score(y_test, y_pred),
            "auc_roc": roc_auc_score(y_test, y_prob)
        }
        mlflow.log_metrics(metrics)
        
        # Log model with signature
        from mlflow.models.signature import infer_signature
        signature = infer_signature(X_train, y_pred)
        
        mlflow.sklearn.log_model(
            model, "model",
            signature=signature,
            registered_model_name="fraud-detector"
        )
        
        print(f"Metrics: {metrics}")
        return mlflow.active_run().info.run_id
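
A hypothetical call might look like this, assuming a prepared feature matrix and label column (the CSV path, column name, and hyperparameters below are placeholders, not part of the original pipeline):

# run_experiment.py (illustrative usage of train_and_log above)
import pandas as pd
from experiment_tracking import train_and_log

df = pd.read_csv("data/transactions.csv")      # placeholder dataset
X = df.drop(columns=["is_fraud"]).values       # placeholder label column
y = df["is_fraud"].values

params = {"n_estimators": 200, "max_depth": 10, "class_weight": "balanced"}
run_id = train_and_log(X, y, params)
print(f"Logged run: {run_id}")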

Model Registry: The Source of Truth

# model_registry.py
from mlflow.tracking import MlflowClient
import pandas as pd

client = MlflowClient()

def promote_to_production(model_name, version, approved_by):
    """Promote a model version to production with audit trail."""
    
    # Archive current production model
    try:
        current = client.get_latest_versions(model_name, stages=["Production"])
        if current:
            client.transition_model_version_stage(
                name=model_name,
                version=current[0].version,
                stage="Archived"
            )
    except Exception as e:
        print(f"No current production model: {e}")
    
    # Promote new version
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production"
    )
    
    # Add governance tags
    client.set_model_version_tag(model_name, version, "approved_by", approved_by)
    client.set_model_version_tag(model_name, version, "promoted_at", 
                                  pd.Timestamp.now().isoformat())
    
    print(f"Version {version} promoted to Production")

def load_production_model(model_name):
    """Load the current production model."""
    import mlflow.pyfunc
    return mlflow.pyfunc.load_model(f"models:/{model_name}/Production")
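
Putting the two helpers together might look like this; the model name matches the registered_model_name used during training, while the version number and approver are made-up placeholders:

# promote_example.py (illustrative usage of the registry helpers above)
from model_registry import promote_to_production, load_production_model

# Promote a reviewed version (version and approver are placeholders)
promote_to_production("fraud-detector", version=3, approved_by="jane.doe")

# In the serving layer, always load whatever is currently in Production
model = load_production_model("fraud-detector")
# predictions = model.predict(feature_dataframe)  # pyfunc models accept a pandas DataFrame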

CI/CD for ML

# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    branches: [main]
    paths: ['src/**', 'data/**']

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run unit tests
        run: pytest tests/unit -v
      - name: Validate data schema
        run: python scripts/validate_data.py

  train:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Train model
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
        run: python src/train.py
      - name: Validate model performance
        run: |
          python scripts/validate_model.py \
            --min-auc 0.85 \
            --max-latency-ms 100

  deploy-staging:
    needs: train
    runs-on: ubuntu-latest
    environment: staging
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Deploy to staging
        run: python scripts/deploy.py --env staging
      - name: Integration tests
        run: pytest tests/integration -v

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - name: Deploy to production (canary)
        run: python scripts/deploy.py --env production --canary-percent 10
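
The workflow references a scripts/validate_model.py gate that isn’t shown. Here’s a minimal sketch of what it could look like, assuming the candidate model sits in the MLflow registry’s Staging stage and a held-out validation set is available locally (the model name, stage, data path, and label column are all assumptions):

# scripts/validate_model.py (sketch of the CI quality/latency gate)
import argparse
import sys
import time

import numpy as np
import pandas as pd
import mlflow.sklearn
from sklearn.metrics import roc_auc_score

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--min-auc", type=float, default=0.85)
    parser.add_argument("--max-latency-ms", type=float, default=100.0)
    args = parser.parse_args()

    # Assumption: the candidate model was transitioned to Staging after training
    model = mlflow.sklearn.load_model("models:/fraud-detector/Staging")

    # Assumption: a held-out validation set with an "is_fraud" label column
    val = pd.read_csv("data/validation.csv")
    X_val, y_val = val.drop(columns=["is_fraud"]), val["is_fraud"]

    # Quality gate
    probs = model.predict_proba(X_val)[:, 1]
    auc = roc_auc_score(y_val, probs)

    # Latency gate: median time for a single-row prediction
    timings = []
    for _ in range(50):
        row = X_val.sample(1)
        start = time.perf_counter()
        model.predict(row)
        timings.append((time.perf_counter() - start) * 1000)
    p50_ms = float(np.median(timings))

    print(f"AUC={auc:.3f}, p50 latency={p50_ms:.1f} ms")
    if auc < args.min_auc or p50_ms > args.max_latency_ms:
        sys.exit(1)  # non-zero exit fails the CI job

if __name__ == "__main__":
    main()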

Monitoring: Drift Detection

# drift_detection.py
import numpy as np

def calculate_psi(expected, actual, bins=10):
    """
    Population Stability Index - measures distribution shift.
    PSI < 0.1: No significant change
    PSI 0.1-0.2: Moderate change, investigate
    PSI > 0.2: Significant change, likely need to retrain
    """
    # Bin both datasets with edges taken from the reference (expected) data,
    # so the two histograms are directly comparable
    bin_edges = np.histogram_bin_edges(expected, bins=bins)
    expected_pct = np.histogram(expected, bins=bin_edges)[0] / len(expected)
    actual_pct = np.histogram(actual, bins=bin_edges)[0] / len(actual)
    
    # Avoid log(0)
    expected_pct = np.clip(expected_pct, 0.0001, None)
    actual_pct = np.clip(actual_pct, 0.0001, None)
    
    psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
    return psi

def check_drift(reference_data, production_data, feature_names, threshold=0.2):
    """Check all features for drift."""
    alerts = []
    
    for i, feature in enumerate(feature_names):
        psi = calculate_psi(reference_data[:, i], production_data[:, i])
        
        if psi > threshold:
            alerts.append({"feature": feature, "psi": psi, "status": "ALERT"})
            print(f"DRIFT ALERT: {feature} PSI={psi:.3f}")
        elif psi > threshold / 2:
            print(f"WARNING: {feature} PSI={psi:.3f}")
    
    return alerts

# Run daily/weekly
# alerts = check_drift(training_data, last_week_data, feature_names)
# if alerts: send_slack_alert(alerts)
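
To sanity-check the thresholds, here’s a purely synthetic example: one feature drifts, one doesn’t, and only the drifted one should trip the alert:

# Synthetic sanity check (illustrative only)
rng = np.random.default_rng(42)

reference = rng.normal(loc=0.0, scale=1.0, size=(10_000, 2))   # stands in for training data
production = np.column_stack([
    rng.normal(loc=0.8, scale=1.0, size=10_000),   # feature_a has shifted
    rng.normal(loc=0.0, scale=1.0, size=10_000),   # feature_b is stable
])

alerts = check_drift(reference, production, ["feature_a", "feature_b"])
# Expected: a DRIFT ALERT for feature_a (PSI well above 0.2) and nothing for feature_b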

Cloud Platform Quick Reference

| Component           | AWS                     | Azure              | GCP                      |
|---------------------|-------------------------|--------------------|--------------------------|
| Experiment Tracking | SageMaker Experiments   | Azure ML           | Vertex AI Experiments    |
| Model Registry      | SageMaker Registry      | Azure ML Models    | Vertex AI Model Registry |
| Pipelines           | SageMaker Pipelines     | Azure ML Pipelines | Vertex AI Pipelines      |
| Feature Store       | SageMaker Feature Store | Azure ML (preview) | Vertex AI Feature Store  |

Key Takeaways

  • Track everything: Experiments, data versions, model versions.
  • Automate the pipeline: Manual deployments don’t scale.
  • Monitor actively: Models degrade. Detect drift early.
  • Start simple: MLflow + GitHub Actions gets you far.

What’s your biggest MLOps headache? Find me on GitHub or drop a comment.

