Error Handling in LLM Applications: Retry, Fallback, and Circuit Breakers

Introduction: LLM APIs fail in ways traditional APIs don’t—rate limits, content filters, malformed outputs, timeouts on long generations, and model-specific quirks. Building resilient LLM applications requires comprehensive error handling: retry logic with exponential backoff, fallback strategies when primary models fail, circuit breakers to prevent cascade failures, and graceful degradation for user-facing applications. This guide covers practical patterns for handling every type of LLM failure you’ll encounter in production.

Error Handling
Error Handling: Retry Logic, Fallback Strategies, and Circuit Breakers

Retry Logic with Exponential Backoff

import time
import random
from functools import wraps
from typing import Callable, TypeVar, Type
from openai import OpenAI, RateLimitError, APITimeoutError, APIConnectionError

T = TypeVar("T")

class RetryConfig:
    """Settings bundle read by the retry helpers in this file.

    Attributes:
        max_retries: Re-attempts allowed after the initial call.
        base_delay: Delay before the first retry.
        max_delay: Ceiling applied to the exponentially grown delay.
        exponential_base: Growth factor raised to the attempt index.
        jitter: Whether each delay is randomised.
        retryable_exceptions: Exception types that trigger a retry.
    """

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True,
        retryable_exceptions: tuple = (RateLimitError, APITimeoutError, APIConnectionError)
    ):
        # What to retry on, and how many times.
        self.retryable_exceptions = retryable_exceptions
        self.max_retries = max_retries

        # How the wait between attempts grows and where it stops growing.
        self.base_delay, self.exponential_base = base_delay, exponential_base
        self.max_delay = max_delay

        # Randomisation switch for the computed wait.
        self.jitter = jitter

def retry_with_backoff(config: "RetryConfig | None" = None):
    """Build a decorator that retries a function with exponential backoff.

    The wrapped function is invoked up to ``max_retries + 1`` times and always
    at least once. Only exceptions listed in ``config.retryable_exceptions``
    lead to a retry; any other exception propagates immediately.

    The wait before retry ``n`` (0-based) is
    ``base_delay * exponential_base ** n``, capped at ``max_delay``. With
    jitter enabled the capped value is scaled by a random factor in
    ``[0.5, 1.5)`` and then capped again, so the sleep never exceeds
    ``max_delay``.

    Args:
        config: Retry settings. ``None`` selects a default ``RetryConfig()``.

    Returns:
        A decorator that wraps a callable with the retry loop.

    Raises:
        The wrapped callable's last retryable exception once the retry budget
        is exhausted (raised from the decorated function, not from here).
    """

    if config is None:
        config = RetryConfig()

    def decorator(func: "Callable[..., T]") -> "Callable[..., T]":
        @wraps(func)
        def wrapper(*args, **kwargs) -> "T":
            # A negative budget would otherwise skip the call entirely and
            # fall through to raising a non-exception.
            retries = max(0, config.max_retries)

            for attempt in range(retries + 1):
                try:
                    return func(*args, **kwargs)

                except config.retryable_exceptions as e:
                    if attempt >= retries:
                        # Budget spent: surface the original error and traceback.
                        raise

                    # Calculate delay with exponential backoff
                    delay = min(
                        config.base_delay * (config.exponential_base ** attempt),
                        config.max_delay
                    )

                    # Add jitter to prevent thundering herd, then re-apply the
                    # cap because the jitter factor can be greater than 1.
                    if config.jitter:
                        delay = min(
                            delay * (0.5 + random.random()),
                            config.max_delay
                        )

                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
                    time.sleep(delay)

            # Every loop iteration either returns or raises.
            raise RuntimeError("retry loop ended without a result")

        return wrapper
    return decorator

# Usage
client = OpenAI()

@retry_with_backoff(RetryConfig(max_retries=3, base_delay=1.0))
def get_completion(prompt: str) -> str:
    """Send ``prompt`` as one user message and return the reply text.

    Retrying is delegated to the ``retry_with_backoff`` decorator above.
    """
    user_message = {"role": "user", "content": prompt}
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[user_message],
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

# Async version
import asyncio

async def retry_async(
    func: Callable,
    *args,
    config: "RetryConfig | None" = None,
    **kwargs
):
    """Await ``func(*args, **kwargs)``, retrying with exponential backoff.

    ``func`` is awaited up to ``max_retries + 1`` times and always at least
    once. Only exceptions listed in ``config.retryable_exceptions`` lead to a
    retry; any other exception propagates immediately.

    The wait before retry ``n`` (0-based) is
    ``base_delay * exponential_base ** n``, capped at ``max_delay``. With
    jitter enabled the capped value is scaled by a random factor in
    ``[0.5, 1.5)`` and then capped again, so the sleep never exceeds
    ``max_delay``.

    Args:
        func: Callable returning an awaitable.
        *args: Positional arguments forwarded to ``func``.
        config: Retry settings. ``None`` selects a default ``RetryConfig()``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever the awaited ``func`` call returns.

    Raises:
        The last retryable exception once the retry budget is exhausted.
    """

    if config is None:
        config = RetryConfig()

    # A negative budget would otherwise skip the call entirely and fall
    # through to raising a non-exception.
    retries = max(0, config.max_retries)

    for attempt in range(retries + 1):
        try:
            return await func(*args, **kwargs)

        except config.retryable_exceptions:
            if attempt >= retries:
                # Budget spent: surface the original error and traceback.
                raise

            delay = min(
                config.base_delay * (config.exponential_base ** attempt),
                config.max_delay
            )

            # Re-apply the cap after jitter: the jitter factor can exceed 1.
            if config.jitter:
                delay = min(delay * (0.5 + random.random()), config.max_delay)

            await asyncio.sleep(delay)

    # Every loop iteration either returns or raises.
    raise RuntimeError("retry loop ended without a result")

Fallback Strategies

from dataclasses import dataclass
from typing import Optional, Any
from abc import ABC, abstractmethod

@dataclass
class ModelConfig:
    """One entry in a fallback chain: which model to call and in what order."""

    name: str  # model identifier passed as `model=` to the provider SDK
    provider: str  # FallbackChain recognises "openai" and "anthropic"
    priority: int  # sort key; FallbackChain tries lower values first
    timeout: float = 30.0  # passed as `timeout=` on the OpenAI request in FallbackChain

class FallbackChain:
    """Try a prioritised list of models until one returns a completion.

    Models are sorted by ascending ``priority`` at construction time and one
    SDK client is created per distinct provider ("openai" or "anthropic").
    """

    def __init__(self, models: "list[ModelConfig]"):
        """Store ``models`` in priority order and create provider clients.

        Args:
            models: Candidate models; lower ``priority`` is tried first.
        """
        self.models = sorted(models, key=lambda m: m.priority)
        self.clients = self._init_clients()

    def _init_clients(self) -> dict:
        """Create one client per provider that appears in ``self.models``.

        Returns:
            Mapping of provider name to SDK client. Providers other than
            "openai" and "anthropic" get no entry; calling such a model later
            fails with ``ValueError`` in ``_call_model``.
        """
        clients = {}

        for model in self.models:
            if model.provider in clients:
                continue

            if model.provider == "openai":
                clients["openai"] = OpenAI()
            elif model.provider == "anthropic":
                # Imported lazily so the anthropic package is only required
                # when an Anthropic model is actually configured.
                from anthropic import Anthropic
                clients["anthropic"] = Anthropic()

        return clients

    def _call_model(self, model: "ModelConfig", prompt: str) -> str:
        """Send ``prompt`` to ``model`` as a single user message.

        Args:
            model: The model to call; ``model.timeout`` is applied to the request.
            prompt: User message text.

        Returns:
            The text of the first choice / first content block.

        Raises:
            ValueError: If ``model.provider`` is not a known provider.
        """
        if model.provider == "openai":
            response = self.clients["openai"].chat.completions.create(
                model=model.name,
                messages=[{"role": "user", "content": prompt}],
                timeout=model.timeout
            )
            return response.choices[0].message.content

        elif model.provider == "anthropic":
            response = self.clients["anthropic"].messages.create(
                model=model.name,
                max_tokens=4096,
                messages=[{"role": "user", "content": prompt}],
                # Honour the configured timeout here too, as the OpenAI branch does.
                timeout=model.timeout
            )
            return response.content[0].text

        raise ValueError(f"Unknown provider: {model.provider}")

    def complete(self, prompt: str) -> "tuple[str, ModelConfig]":
        """Try models in priority order until one succeeds.

        Args:
            prompt: User message text.

        Returns:
            ``(text, model)`` for the first model that answered.

        Raises:
            RuntimeError: If every model failed. The message lists each
                model's error and the last failure is attached as
                ``__cause__``.
        """
        errors = []
        last_exception = None

        for model in self.models:
            try:
                return self._call_model(model, prompt), model
            except Exception as e:
                # Deliberately broad: any failure moves on to the next model.
                errors.append((model.name, str(e)))
                last_exception = e

        # All models failed
        error_summary = "\n".join(f"- {name}: {err}" for name, err in errors)
        raise RuntimeError(f"All models failed:\n{error_summary}") from last_exception

# Usage
# Three-step chain; FallbackChain sorts by `priority`, so gpt-4o is tried
# first and the Anthropic model last.
chain = FallbackChain([
    ModelConfig(name="gpt-4o", provider="openai", priority=1),
    ModelConfig(name="gpt-4o-mini", provider="openai", priority=2),
    ModelConfig(name="claude-3-5-sonnet-20241022", provider="anthropic", priority=3),
])

# complete() returns the text together with the ModelConfig that produced it.
result, used_model = chain.complete("Explain quantum computing")
# Print which model answered and the first 100 characters of its reply.
print(f"Response from {used_model.name}: {result[:100]}...")

# Fallback with different strategies
class FallbackStrategy(ABC):
    """Interface for producing a substitute answer when an LLM call fails."""

    @abstractmethod
    def get_fallback(self, prompt: str, error: Exception) -> str:
        """Return the substitute text for ``prompt`` given the ``error`` raised."""
        ...

class CachedFallback(FallbackStrategy):
    """Serve a previously stored reply for the exact same prompt, if any."""

    def __init__(self):
        # Maps prompt text to the response stored for it.
        self.cache: dict[str, str] = {}

    def cache_response(self, prompt: str, response: str):
        """Remember ``response`` under its exact ``prompt`` text."""
        self.cache[prompt] = response

    def get_fallback(self, prompt: str, error: Exception) -> str:
        """Return the stored reply for ``prompt``; re-raise ``error`` on a miss."""
        if prompt not in self.cache:
            raise error
        return self.cache[prompt]

class DefaultFallback(FallbackStrategy):
    """Answer every failure with one fixed message."""

    def __init__(self, default_message: str):
        # The text handed back for any prompt and any error.
        self.default_message = default_message

    def get_fallback(self, prompt: str, error: Exception) -> str:
        """Return the configured message; ``prompt`` and ``error`` are not used."""
        message = self.default_message
        return message

class GracefulDegradation(FallbackStrategy):
    """Delegate to a caller-supplied function that builds a reduced answer."""

    def __init__(self, degraded_fn: Callable[[str], str]):
        # Called with the prompt whenever a fallback is requested.
        self.degraded_fn = degraded_fn

    def get_fallback(self, prompt: str, error: Exception) -> str:
        """Return ``degraded_fn(prompt)``; ``error`` is not used."""
        degraded_reply = self.degraded_fn(prompt)
        return degraded_reply

Circuit Breaker Pattern

from enum import Enum
from datetime import datetime, timedelta
from threading import Lock

class CircuitState(Enum):
    """The three states a CircuitBreaker moves between."""

    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    """Circuit breaker for LLM API calls.

    State machine:

    * CLOSED: calls pass through. ``failure_threshold`` failures without an
      intervening success open the circuit.
    * OPEN: calls are rejected until ``recovery_timeout`` has elapsed since
      the last recorded failure, then the circuit becomes HALF_OPEN.
    * HALF_OPEN: at most ``half_open_max_calls`` probe calls are admitted.
      The first recorded success closes the circuit; any recorded failure
      re-opens it.

    State changes are guarded by ``self.lock``.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        half_open_max_calls: int = 3
    ):
        """Create a breaker in the CLOSED state.

        Args:
            failure_threshold: Failure count at which the circuit opens.
            recovery_timeout: Seconds to stay OPEN before probing again.
            half_open_max_calls: Probe calls admitted while HALF_OPEN.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = timedelta(seconds=recovery_timeout)
        self.half_open_max_calls = half_open_max_calls

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time: Optional[datetime] = None
        self.half_open_calls = 0
        self.lock = Lock()

    def _should_allow_request(self) -> bool:
        """Decide whether a call may proceed, advancing OPEN to HALF_OPEN if due.

        Returns:
            True if the caller may make the request, False if it must be
            rejected.
        """
        with self.lock:
            if self.state == CircuitState.CLOSED:
                return True

            if self.state == CircuitState.OPEN:
                opened_at = self.last_failure_time
                # With no recorded failure time there is nothing to wait for,
                # so treat the timeout as elapsed instead of crashing.
                still_cooling = (
                    opened_at is not None
                    and datetime.now() - opened_at <= self.recovery_timeout
                )
                if still_cooling:
                    return False

                self.state = CircuitState.HALF_OPEN
                # The request admitted right now is itself the first probe, so
                # count it; starting at 0 would admit one probe too many.
                self.half_open_calls = 1
                return True

            if self.state == CircuitState.HALF_OPEN:
                if self.half_open_calls < self.half_open_max_calls:
                    self.half_open_calls += 1
                    return True
                return False

        return False

    def record_success(self):
        """Record a successful call.

        Resets the failure count while CLOSED or HALF_OPEN, and closes a
        HALF_OPEN circuit. Has no effect while OPEN.
        """
        with self.lock:
            if self.state in (CircuitState.HALF_OPEN, CircuitState.CLOSED):
                self.state = CircuitState.CLOSED
                self.failure_count = 0

    def record_failure(self):
        """Record a failed call and open the circuit when warranted.

        A failure while HALF_OPEN re-opens immediately; otherwise the circuit
        opens once ``failure_count`` reaches ``failure_threshold``.
        """
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = datetime.now()

            if (
                self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold
            ):
                self.state = CircuitState.OPEN

    def call(self, func: "Callable[..., T]", *args, **kwargs) -> "T":
        """Execute ``func(*args, **kwargs)`` under circuit breaker protection.

        Args:
            func: The callable to protect.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitOpenError: If the breaker rejects the call.
            Exception: Anything ``func`` raises, after recording the failure.
        """
        if not self._should_allow_request():
            # total_seconds() covers the whole interval; .seconds is only the
            # sub-day whole-seconds component of the timedelta.
            wait = self.recovery_timeout.total_seconds()
            wait_text = f"{wait:.0f}" if wait.is_integer() else f"{wait}"
            raise CircuitOpenError(
                f"Circuit is {self.state.value}. "
                f"Retry after {wait_text}s"
            )

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.record_failure()
            raise
        else:
            self.record_success()
            return result

class CircuitOpenError(Exception):
    """Signals that the circuit breaker refused to run a call."""

# Usage
circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=30.0)

def get_completion_with_circuit(prompt: str) -> str:
    """Return the model's reply to ``prompt``, routed through ``circuit``.

    The API request is wrapped in a closure so the breaker can run it and
    record the outcome.
    """

    def _request():
        user_message = {"role": "user", "content": prompt}
        completion = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[user_message],
        )
        return completion.choices[0].message.content

    return circuit.call(_request)

# Per-model circuit breakers
class MultiModelCircuitBreaker:
    """Registry holding one CircuitBreaker per model name."""

    def __init__(self):
        # Breakers are created lazily by get_breaker().
        self.breakers: dict[str, CircuitBreaker] = {}

    def get_breaker(self, model: str) -> CircuitBreaker:
        """Return the breaker for ``model``, creating a default one on first use."""
        registry = self.breakers
        if model in registry:
            return registry[model]

        registry[model] = CircuitBreaker()
        return registry[model]

    def get_healthy_models(self) -> list[str]:
        """Return the names of registered models whose breaker is CLOSED."""
        closed = CircuitState.CLOSED
        return [
            name for name, model_breaker in self.breakers.items()
            if model_breaker.state == closed
        ]

Error Classification and Handling

from openai import (
    OpenAI,
    APIError,
    RateLimitError,
    APITimeoutError,
    APIConnectionError,
    BadRequestError,
    AuthenticationError,
    PermissionDeniedError,
    NotFoundError,
    UnprocessableEntityError,
    InternalServerError
)

class LLMErrorHandler:
    """Classify OpenAI SDK errors into handling strategies and count them."""

    def __init__(self):
        # Exception class name -> number of times classify_error() saw it.
        self.error_counts: dict[str, int] = {}

    def classify_error(self, error: Exception) -> dict:
        """Classify ``error`` and return a handling strategy.

        Every call also increments the counter for the error's class name.

        Args:
            error: The exception raised by the API call.

        Returns:
            A dict with the keys ``category``, ``retryable``, ``strategy`` and
            ``user_message``. Retryable categories also carry ``retry_after``.
        """
        error_type = type(error).__name__
        self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1

        # Rate limit errors
        if isinstance(error, RateLimitError):
            return {
                "category": "rate_limit",
                "retryable": True,
                "retry_after": self._extract_retry_after(error),
                "strategy": "exponential_backoff",
                "user_message": "Service is busy. Please try again in a moment."
            }

        # Timeout errors (checked before the more general connection error)
        if isinstance(error, APITimeoutError):
            return {
                "category": "timeout",
                "retryable": True,
                "retry_after": 1.0,
                "strategy": "immediate_retry",
                "user_message": "Request timed out. Retrying..."
            }

        # Connection errors
        if isinstance(error, APIConnectionError):
            return {
                "category": "connection",
                "retryable": True,
                "retry_after": 5.0,
                "strategy": "exponential_backoff",
                "user_message": "Connection issue. Please check your network."
            }

        # Bad request (usually prompt issues)
        if isinstance(error, BadRequestError):
            return {
                "category": "bad_request",
                "retryable": False,
                "strategy": "fix_request",
                "user_message": "Invalid request. Please modify your input."
            }

        # Content filter
        if isinstance(error, UnprocessableEntityError):
            return {
                "category": "content_filter",
                "retryable": False,
                "strategy": "modify_content",
                "user_message": "Content was filtered. Please rephrase your request."
            }

        # Authentication errors
        if isinstance(error, (AuthenticationError, PermissionDeniedError)):
            return {
                "category": "auth",
                "retryable": False,
                "strategy": "check_credentials",
                "user_message": "Authentication failed. Please contact support."
            }

        # Server errors
        if isinstance(error, InternalServerError):
            return {
                "category": "server_error",
                "retryable": True,
                "retry_after": 10.0,
                "strategy": "fallback_model",
                "user_message": "Service temporarily unavailable."
            }

        # Unknown errors
        return {
            "category": "unknown",
            "retryable": False,
            "strategy": "log_and_alert",
            "user_message": "An unexpected error occurred."
        }

    def _extract_retry_after(self, error: "RateLimitError") -> float:
        """Extract the wait time from a rate limit error's ``retry-after`` header.

        The header may hold either a number of seconds or an HTTP date. A
        date is converted to the number of seconds from now (never negative).

        Args:
            error: The rate limit error; its ``response.headers`` is read if
                present.

        Returns:
            Seconds to wait. Falls back to 60.0 when the header is missing,
            unparseable, negative or not finite.
        """
        from datetime import datetime, timezone
        from email.utils import parsedate_to_datetime

        default_delay = 60.0

        response = getattr(error, "response", None)
        headers = getattr(response, "headers", None)
        if headers is None:
            return default_delay

        raw = headers.get("retry-after")
        if not raw:
            return default_delay

        # Form 1: delay in seconds.
        try:
            seconds = float(raw)
        except (TypeError, ValueError):
            seconds = None

        if seconds is not None:
            # Rejects negatives, NaN and infinity, none of which can be slept on.
            if 0 <= seconds < float("inf"):
                return seconds
            return default_delay

        # Form 2: HTTP date.
        try:
            retry_at = parsedate_to_datetime(raw)
        except (TypeError, ValueError):
            return default_delay

        if retry_at.tzinfo is None:
            # A date parsed without an offset is taken to be UTC.
            retry_at = retry_at.replace(tzinfo=timezone.utc)

        remaining = (retry_at - datetime.now(timezone.utc)).total_seconds()
        return max(0.0, remaining)

    def get_stats(self) -> dict:
        """Return a copy of the per-error-class counters."""
        return dict(self.error_counts)

# Comprehensive error handling wrapper
class ResilientLLMClient:
    """LLM client combining retries, a circuit breaker and model fallback.

    The primary model is "gpt-4o-mini" called through ``self.client``. When
    the primary cannot be used, ``self.fallback_chain`` is tried instead.
    """

    def __init__(self):
        self.client = OpenAI()
        self.error_handler = LLMErrorHandler()
        self.circuit_breaker = CircuitBreaker()
        self.fallback_chain = FallbackChain([
            ModelConfig(name="gpt-4o-mini", provider="openai", priority=1),
            ModelConfig(name="gpt-4o", provider="openai", priority=2),
        ])

    def complete(
        self,
        prompt: str,
        max_retries: int = 3,
        use_fallback: bool = True
    ) -> str:
        """Get a completion with full error handling.

        Args:
            prompt: User message text.
            max_retries: Number of attempts against the primary model. Values
                below 1 are treated as 1 so the primary is tried at least once.
            use_fallback: Whether to use the fallback chain when the primary
                is unavailable or its attempts are exhausted.

        Returns:
            The completion text.

        Raises:
            CircuitOpenError: If the breaker rejects the call and fallback is
                disabled.
            Exception: A non-retryable primary error, the last primary error
                when fallback is disabled, or whatever the fallback chain
                raises when it fails too.
        """
        attempts = max(1, max_retries)
        last_error = None

        for attempt in range(attempts):
            # Ask the breaker instead of reading .state: that is what moves an
            # expired OPEN circuit to HALF_OPEN so the primary gets probed
            # again. This check sits outside the try block so that neither a
            # rejected call nor a fallback failure is recorded as a failure
            # of the primary model.
            if not self.circuit_breaker._should_allow_request():
                if use_fallback:
                    return self._try_fallback(prompt)
                raise CircuitOpenError("Primary model circuit is open")

            try:
                # Try primary model
                response = self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": prompt}]
                )
            except Exception as e:
                last_error = e
                self.circuit_breaker.record_failure()

                error_info = self.error_handler.classify_error(e)

                if not error_info["retryable"]:
                    if use_fallback and error_info["strategy"] == "fallback_model":
                        return self._try_fallback(prompt)
                    raise

                # Wait before the next attempt; after the final attempt there
                # is nothing left to wait for, so skip the sleep.
                if attempt + 1 < attempts:
                    retry_after = error_info.get("retry_after", 1.0)
                    time.sleep(retry_after * (2 ** attempt))
            else:
                self.circuit_breaker.record_success()
                return response.choices[0].message.content

        # All retries failed, try fallback
        if use_fallback:
            return self._try_fallback(prompt)

        raise last_error

    def _try_fallback(self, prompt: str) -> str:
        """Return the text from the first fallback model that answers."""
        result, _ = self.fallback_chain.complete(prompt)
        return result

Output Validation and Recovery

import json
from pydantic import BaseModel, ValidationError

class OutputValidator:
    """Validate and recover from malformed LLM outputs."""

    def __init__(self, client: "OpenAI"):
        # Only used by the LLM-assisted repair helpers.
        self.client = client

    def validate_json(
        self,
        output: str,
        schema: Optional[dict] = None,
        auto_fix: bool = True
    ) -> dict:
        """Parse ``output`` as JSON, optionally repairing it first.

        Repair is attempted in two stages: local extraction of a JSON object
        from surrounding text or code fences, then an LLM request if that
        yields nothing parseable.

        Args:
            output: Raw model output.
            schema: Optional dict whose ``"required"`` list names keys that
                must be present. It is applied to repaired output as well.
            auto_fix: Whether to attempt repair when parsing fails.

        Returns:
            The parsed JSON value.

        Raises:
            json.JSONDecodeError: If parsing fails and ``auto_fix`` is False,
                or the LLM repair still returns invalid JSON.
            ValueError: If a required field is missing.
        """
        try:
            data = json.loads(output)
        except json.JSONDecodeError as e:
            if not auto_fix:
                raise

            # Try to fix common issues locally before spending an API call.
            fixed = self._fix_json(output)

            if fixed is not None:
                data = json.loads(fixed)
            else:
                # Ask LLM to fix
                data = self._llm_fix_json(output, str(e))

        if schema:
            self._validate_schema(data, schema)

        return data

    def _fix_json(self, output: str) -> Optional[str]:
        """Try to extract a parseable JSON object from ``output``.

        Strips a markdown code fence if present, then takes the span from the
        first ``{`` to the last ``}``.

        Returns:
            The extracted text if it parses as JSON, otherwise None.
        """
        # Remove markdown code blocks, preferring a ```json fence.
        for fence in ("```json", "```"):
            start = output.find(fence)
            if start == -1:
                continue

            start += len(fence)
            end = output.find("```", start)
            # An unclosed fence runs to the end of the text.
            output = output[start:end if end != -1 else None].strip()
            break

        # Try to extract JSON object
        start = output.find("{")
        end = output.rfind("}") + 1

        if start == -1 or end <= start:
            return None

        candidate = output[start:end]

        # Only report success when the extraction really is valid JSON, so
        # the caller can still fall back to the LLM repair.
        try:
            json.loads(candidate)
        except json.JSONDecodeError:
            return None

        return candidate

    def _llm_fix_json(self, output: str, error: str) -> dict:
        """Ask the model to repair malformed JSON and parse its answer.

        Args:
            output: The malformed text.
            error: The parser's error message, included in the prompt.

        Returns:
            The parsed, corrected JSON.
        """
        fix_prompt = f"""
The following output should be valid JSON but has an error:

Output:
{output}

Error: {error}

Please return ONLY the corrected valid JSON, nothing else.
"""

        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": fix_prompt}],
            response_format={"type": "json_object"}
        )

        return json.loads(response.choices[0].message.content)

    def _validate_schema(self, data: dict, schema: dict):
        """Check that every key in ``schema["required"]`` is present in ``data``.

        Raises:
            ValueError: Naming the first missing field.
        """
        for field in schema.get("required", []):
            if field not in data:
                raise ValueError(f"Missing required field: {field}")

    def validate_pydantic(
        self,
        output: str,
        model: "type[BaseModel]",
        max_retries: int = 2
    ) -> "BaseModel":
        """Validate ``output`` against a Pydantic model, repairing via the LLM.

        Args:
            output: Raw model output expected to contain JSON.
            model: The Pydantic model class to validate against.
            max_retries: LLM repair rounds allowed after the first failed
                validation. Negative values are treated as 0.

        Returns:
            The validated model instance.

        Raises:
            ValidationError: If validation still fails after all repair rounds.
        """
        retries = max(0, max_retries)

        for attempt in range(retries + 1):
            try:
                data = self.validate_json(output)
                return model.model_validate(data)

            except ValidationError as e:
                if attempt >= retries:
                    raise

                # Ask LLM to fix validation errors
                output = self._fix_validation_errors(output, str(e), model)

        # Every loop iteration either returns or raises.
        raise RuntimeError("validation loop ended without a result")

    def _fix_validation_errors(
        self,
        output: str,
        errors: str,
        model: "type[BaseModel]"
    ) -> str:
        """Ask the model to rewrite ``output`` so it satisfies ``model``'s schema.

        Returns:
            The model's reply text (expected to be corrected JSON).
        """
        schema = model.model_json_schema()

        fix_prompt = f"""
The following JSON has validation errors:

JSON:
{output}

Errors:
{errors}

Expected schema:
{json.dumps(schema, indent=2)}

Return the corrected JSON that matches the schema.
"""

        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": fix_prompt}],
            response_format={"type": "json_object"}
        )

        return response.choices[0].message.content

Production Error Handling Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import logging

# Application object that the route decorators below register handlers on.
app = FastAPI()
# Configure logging once at import time, at INFO level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize resilient client
# A single module-level instance is shared by all handlers, so its circuit
# breaker state and error counters accumulate across requests.
llm_client = ResilientLLMClient()

# Request body for POST /complete.
class CompletionRequest(BaseModel):
    prompt: str  # forwarded as `prompt` to llm_client.complete
    # NOTE(review): caller-controlled and unbounded here — consider an upper
    # bound, since it drives how long the server retries and sleeps.
    max_retries: int = 3  # forwarded as `max_retries` to llm_client.complete
    use_fallback: bool = True  # forwarded as `use_fallback` to llm_client.complete

# Response body for POST /complete.
class CompletionResponse(BaseModel):
    content: str  # completion text returned by llm_client.complete
    # The three fields below are filled with fixed values by the /complete
    # handler, because llm_client.complete returns only the text.
    model_used: str
    retries: int
    fallback_used: bool

@app.post("/complete", response_model=CompletionResponse)
async def complete(request: CompletionRequest):
    """Completion endpoint with comprehensive error handling.

    Args:
        request: Prompt plus retry and fallback options.

    Returns:
        A ``CompletionResponse`` carrying the completion text.

    Raises:
        HTTPException: 503 when the circuit is open, 429 when rate limited,
            500 for any other failure.
    """
    import asyncio

    try:
        # llm_client.complete is synchronous: it performs network I/O and
        # calls time.sleep between retries. Running it in a worker thread
        # keeps the event loop free to serve other requests meanwhile.
        result = await asyncio.to_thread(
            llm_client.complete,
            prompt=request.prompt,
            max_retries=request.max_retries,
            use_fallback=request.use_fallback,
        )

        # NOTE: llm_client.complete returns only the text, so the metadata
        # below is fixed and does not reflect fallbacks or retries that
        # actually happened.
        return CompletionResponse(
            content=result,
            model_used="gpt-4o-mini",
            retries=0,
            fallback_used=False
        )

    except CircuitOpenError as e:
        logger.warning(f"Circuit open: {e}")
        raise HTTPException(
            status_code=503,
            detail="Service temporarily unavailable. Please try again later."
        ) from e

    except RateLimitError as e:
        logger.warning(f"Rate limited: {e}")
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded. Please slow down requests."
        ) from e

    except Exception as e:
        # logger.exception keeps the traceback, which is otherwise lost once
        # the error is replaced by a generic HTTP 500.
        logger.exception(f"Unexpected error: {e}")
        raise HTTPException(
            status_code=500,
            detail="An error occurred processing your request."
        ) from e

@app.get("/health")
async def health():
    """Report a fixed "healthy" status plus breaker state and error counters."""
    breaker_state = llm_client.circuit_breaker.state.value
    error_stats = llm_client.error_handler.get_stats()

    payload = {"status": "healthy"}
    payload["circuit_state"] = breaker_state
    payload["error_stats"] = error_stats
    return payload

References

Conclusion

Robust error handling is essential for production LLM applications. Implement retry logic with exponential backoff and jitter for transient failures. Use fallback chains to maintain availability when primary models fail. Circuit breakers prevent cascade failures and give failing services time to recover. Classify errors to apply appropriate strategies—some errors are retryable, others require fallback, and some need user intervention. Validate outputs and implement auto-fix mechanisms for malformed responses. Monitor error rates and circuit breaker states to detect issues early. The goal is graceful degradation: users should experience reduced functionality rather than complete failure when LLM services have issues.


Discover more from C4: Container, Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.