LLM Rate Limiting and Throttling: Building Resilient AI Applications

Introduction: LLM APIs have strict rate limits—requests per minute, tokens per minute, and concurrent request caps. Hit these limits and your application grinds to a halt with 429 errors. Worse, aggressive retry logic can trigger longer cooldowns. Proper rate limiting isn’t just about staying under limits; it’s about maximizing throughput while gracefully handling bursts, prioritizing important requests, and providing good user experience even under load. This guide covers implementing robust rate limiting for LLM applications: token bucket algorithms, sliding windows, request queuing, exponential backoff, and production patterns for high-throughput systems.
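
To see why this matters, here is the failure mode in miniature. The sketch below assumes the official openai Python client (v1+), which raises openai.RateLimitError on 429 responses; the tight retry loop is exactly the anti-pattern the rest of this guide replaces.

import time
from openai import OpenAI, RateLimitError

client = OpenAI()

def naive_completion(prompt: str) -> str:
    """Anti-pattern: hammer the API until the 429s stop."""
    while True:
        try:
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": prompt}]
            )
            return response.choices[0].message.content
        except RateLimitError:
            # Fixed one-second sleep, no backoff, no budget: under load every
            # worker retries in lockstep and the rate limiting only gets worse.
            time.sleep(1)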

Rate Limiting: Token Bucket, Sliding Window, and Request Queue

Token Bucket Rate Limiter

import time
import threading
from dataclasses import dataclass

@dataclass
class RateLimitConfig:
    requests_per_minute: int = 60
    tokens_per_minute: int = 90000
    max_concurrent: int = 10

class TokenBucketLimiter:
    """Token bucket rate limiter for LLM APIs."""
    
    def __init__(self, config: RateLimitConfig):
        self.config = config
        
        # Request bucket
        self.request_tokens = config.requests_per_minute
        self.request_capacity = config.requests_per_minute
        self.request_refill_rate = config.requests_per_minute / 60  # per second
        
        # Token bucket
        self.token_tokens = config.tokens_per_minute
        self.token_capacity = config.tokens_per_minute
        self.token_refill_rate = config.tokens_per_minute / 60  # per second
        
        # Concurrency
        self.concurrent = 0
        self.max_concurrent = config.max_concurrent
        
        self.last_refill = time.time()
        self.lock = threading.Lock()
    
    def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.time()
        elapsed = now - self.last_refill
        
        self.request_tokens = min(
            self.request_capacity,
            self.request_tokens + elapsed * self.request_refill_rate
        )
        
        self.token_tokens = min(
            self.token_capacity,
            self.token_tokens + elapsed * self.token_refill_rate
        )
        
        self.last_refill = now
    
    def acquire(self, estimated_tokens: int = 1000, timeout: float = 60) -> bool:
        """Try to acquire rate limit tokens."""
        deadline = time.time() + timeout
        
        while time.time() < deadline:
            with self.lock:
                self._refill()
                
                # Check all limits
                if (self.request_tokens >= 1 and 
                    self.token_tokens >= estimated_tokens and
                    self.concurrent < self.max_concurrent):
                    
                    self.request_tokens -= 1
                    self.token_tokens -= estimated_tokens
                    self.concurrent += 1
                    return True
            
            # Wait and retry
            time.sleep(0.1)
        
        return False
    
    def release(self, actual_tokens: int = 0):
        """Release the concurrency slot after a request completes.
        
        actual_tokens is accepted so callers can report real usage; the
        estimate deducted in acquire() is intentionally left in place,
        which keeps the limiter conservative when estimates run high.
        """
        with self.lock:
            self.concurrent -= 1
    
    def wait_time(self, estimated_tokens: int = 1000) -> float:
        """Estimate wait time until request can proceed."""
        with self.lock:
            self._refill()
            
            wait_for_request = max(0, (1 - self.request_tokens) / self.request_refill_rate)
            wait_for_tokens = max(0, (estimated_tokens - self.token_tokens) / self.token_refill_rate)
            
            return max(wait_for_request, wait_for_tokens)

# Usage
limiter = TokenBucketLimiter(RateLimitConfig(
    requests_per_minute=60,
    tokens_per_minute=90000,
    max_concurrent=5
))

def rate_limited_completion(prompt: str) -> str:
    """Make rate-limited LLM call."""
    from openai import OpenAI
    client = OpenAI()
    
    estimated_tokens = len(prompt) // 4 + 500
    
    if not limiter.acquire(estimated_tokens, timeout=30):
        raise Exception("Rate limit timeout")
    
    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        )
        
        actual_tokens = response.usage.total_tokens
        limiter.release(actual_tokens)
        
        return response.choices[0].message.content
    
    except Exception:
        limiter.release()
        raise
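
The wait_time helper is there so you can surface backpressure to callers instead of silently blocking them. A small sketch building on the limiter and rate_limited_completion defined above (the 5-second threshold is arbitrary):

def completion_or_eta(prompt: str) -> dict:
    """Return the completion, or an estimated wait if the limiter is saturated."""
    estimated_tokens = len(prompt) // 4 + 500
    eta = limiter.wait_time(estimated_tokens)
    
    if eta > 5:
        # Tell the caller to come back later rather than holding the connection
        return {"status": "busy", "retry_after_seconds": round(eta, 1)}
    
    return {"status": "ok", "result": rate_limited_completion(prompt)}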

Sliding Window Rate Limiter

from collections import deque
import time
import threading

class SlidingWindowLimiter:
    """Sliding window rate limiter with precise tracking."""
    
    def __init__(self, requests_per_minute: int, tokens_per_minute: int):
        self.rpm_limit = requests_per_minute
        self.tpm_limit = tokens_per_minute
        
        # Track timestamps and token counts
        self.request_times: deque = deque()
        self.token_usage: deque = deque()  # (timestamp, tokens)
        
        self.lock = threading.Lock()
    
    def _cleanup(self):
        """Remove entries older than 1 minute."""
        cutoff = time.time() - 60
        
        while self.request_times and self.request_times[0] < cutoff:
            self.request_times.popleft()
        
        while self.token_usage and self.token_usage[0][0] < cutoff:
            self.token_usage.popleft()
    
    def can_proceed(self, estimated_tokens: int) -> tuple[bool, float]:
        """Check if request can proceed. Returns (can_proceed, wait_time)."""
        with self.lock:
            self._cleanup()
            
            current_rpm = len(self.request_times)
            current_tpm = sum(tokens for _, tokens in self.token_usage)
            
            if current_rpm >= self.rpm_limit:
                # Wait until oldest request expires
                wait_time = self.request_times[0] + 60 - time.time()
                return False, max(0, wait_time)
            
            if current_tpm + estimated_tokens > self.tpm_limit:
                # A single request larger than the TPM limit can never fit
                if not self.token_usage:
                    return False, 60.0
                # Otherwise wait until the oldest token entry leaves the window
                wait_time = self.token_usage[0][0] + 60 - time.time()
                return False, max(0, wait_time)
            
            return True, 0
    
    def record(self, tokens_used: int):
        """Record a completed request."""
        with self.lock:
            now = time.time()
            self.request_times.append(now)
            self.token_usage.append((now, tokens_used))
    
    def get_usage(self) -> dict:
        """Get current usage stats."""
        with self.lock:
            self._cleanup()
            
            return {
                "requests_used": len(self.request_times),
                "requests_limit": self.rpm_limit,
                "tokens_used": sum(t for _, t in self.token_usage),
                "tokens_limit": self.tpm_limit
            }
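
The synchronous limiter deliberately separates the capacity check from the usage recording, so a typical call site looks like the sketch below; call_llm stands in for any client call that exposes usage.total_tokens.

window_limiter = SlidingWindowLimiter(requests_per_minute=60, tokens_per_minute=90000)

def sliding_window_completion(prompt: str, call_llm) -> str:
    """Wait for window capacity, call the model, then record real usage."""
    estimated_tokens = len(prompt) // 4 + 500
    
    while True:
        ok, wait = window_limiter.can_proceed(estimated_tokens)
        if ok:
            break
        time.sleep(wait + 0.05)  # small buffer past the window edge
    
    response = call_llm(prompt)
    window_limiter.record(response.usage.total_tokens)
    return response.choices[0].message.content

Because nothing is reserved between can_proceed and record, concurrent callers can briefly overshoot the window; the token bucket above or the async variant below is the safer choice under heavy parallelism.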

# Async-friendly sliding window
import asyncio

class AsyncSlidingWindowLimiter:
    """Async sliding window rate limiter."""
    
    def __init__(self, requests_per_minute: int, tokens_per_minute: int):
        self.rpm_limit = requests_per_minute
        self.tpm_limit = tokens_per_minute
        self.request_times: deque = deque()
        self.token_usage: deque = deque()
        self.lock = asyncio.Lock()
    
    async def acquire(self, estimated_tokens: int, timeout: float = 60):
        """Acquire rate limit slot, waiting if necessary."""
        deadline = time.time() + timeout
        
        while time.time() < deadline:
            async with self.lock:
                self._cleanup()
                
                current_rpm = len(self.request_times)
                current_tpm = sum(t for _, t in self.token_usage)
                
                if current_rpm < self.rpm_limit and current_tpm + estimated_tokens <= self.tpm_limit:
                    now = time.time()
                    self.request_times.append(now)
                    self.token_usage.append((now, estimated_tokens))
                    return True
            
            await asyncio.sleep(0.1)
        
        raise TimeoutError("Rate limit acquisition timeout")
    
    def _cleanup(self):
        cutoff = time.time() - 60
        while self.request_times and self.request_times[0] < cutoff:
            self.request_times.popleft()
        while self.token_usage and self.token_usage[0][0] < cutoff:
            self.token_usage.popleft()
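
Because acquire() records the estimate up front, the async limiter drops straight into concurrent workloads. A sketch assuming the AsyncOpenAI client from the openai package:

from openai import AsyncOpenAI

async_limiter = AsyncSlidingWindowLimiter(requests_per_minute=60, tokens_per_minute=90000)
async_client = AsyncOpenAI()

async def limited_completion(prompt: str) -> str:
    """Acquire a window slot, then call the model."""
    estimated_tokens = len(prompt) // 4 + 500
    await async_limiter.acquire(estimated_tokens, timeout=30)
    
    response = await async_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def run_batch(prompts: list[str]) -> list[str]:
    # gather fans the calls out; the limiter spaces them to stay in-window
    return await asyncio.gather(*(limited_completion(p) for p in prompts))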

Request Queue with Priority

import heapq
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass(order=True)
class PrioritizedRequest:
    priority: int
    timestamp: float = field(compare=False)
    request_id: str = field(compare=False)
    callback: Callable = field(compare=False)
    args: tuple = field(compare=False)
    kwargs: dict = field(compare=False)

class PriorityRequestQueue:
    """Priority queue for rate-limited requests."""
    
    def __init__(self, limiter: TokenBucketLimiter, workers: int = 3):
        self.limiter = limiter
        self.queue: list = []
        self.lock = threading.Lock()
        self.workers = workers
        self.running = False
        self.results: dict = {}
    
    def submit(
        self,
        callback: Callable,
        *args,
        priority: int = 5,  # 1 = highest, 10 = lowest
        **kwargs
    ) -> str:
        """Submit a request to the queue."""
        import uuid
        
        request_id = str(uuid.uuid4())
        
        request = PrioritizedRequest(
            priority=priority,
            timestamp=time.time(),
            request_id=request_id,
            callback=callback,
            args=args,
            kwargs=kwargs
        )
        
        with self.lock:
            heapq.heappush(self.queue, request)
        
        return request_id
    
    def start(self):
        """Start worker threads."""
        self.running = True
        
        for i in range(self.workers):
            thread = threading.Thread(target=self._worker, daemon=True)
            thread.start()
    
    def stop(self):
        """Stop workers."""
        self.running = False
    
    def _worker(self):
        """Worker thread that processes queue."""
        while self.running:
            request = None
            
            with self.lock:
                if self.queue:
                    request = heapq.heappop(self.queue)
            
            if request is None:
                time.sleep(0.1)
                continue
            
            # Pull the token estimate out of kwargs so it is not forwarded
            # to the callback, then wait for the rate limiter
            estimated_tokens = request.kwargs.pop("estimated_tokens", 1000)
            
            if self.limiter.acquire(estimated_tokens, timeout=60):
                try:
                    result = request.callback(*request.args, **request.kwargs)
                    self.results[request.request_id] = {"success": True, "result": result}
                except Exception as e:
                    self.results[request.request_id] = {"success": False, "error": str(e)}
                finally:
                    self.limiter.release()
            else:
                # Timeout - requeue with lower priority
                request.priority += 1
                with self.lock:
                    heapq.heappush(self.queue, request)
    
    def get_result(self, request_id: str, timeout: float = 60) -> Any:
        """Wait for and get result."""
        deadline = time.time() + timeout
        
        while time.time() < deadline:
            if request_id in self.results:
                return self.results.pop(request_id)
            time.sleep(0.1)
        
        raise TimeoutError("Result not available")

# Usage
limiter = TokenBucketLimiter(RateLimitConfig())
queue = PriorityRequestQueue(limiter)
queue.start()

# Submit requests with different priorities
high_priority_id = queue.submit(
    rate_limited_completion,
    "Important query",
    priority=1
)

low_priority_id = queue.submit(
    rate_limited_completion,
    "Background task",
    priority=10
)

# Get results
result = queue.get_result(high_priority_id)
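
Since the worker pops estimated_tokens out of kwargs before invoking the callback, callers that know their payload is large can pass a more realistic estimate than the 1000-token default; the numbers below are illustrative:

long_document = "(several pages of report text)"

summary_id = queue.submit(
    rate_limited_completion,
    f"Summarize: {long_document}",
    priority=3,
    estimated_tokens=len(long_document) // 4 + 1000
)

summary = queue.get_result(summary_id, timeout=120)
if not summary["success"]:
    print(f"Summarization failed: {summary['error']}")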

Exponential Backoff with Jitter

import random
from functools import wraps

def exponential_backoff(
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
):
    """Decorator for exponential backoff with jitter."""
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                
                except Exception as e:
                    last_exception = e
                    error_str = str(e).lower()
                    
                    # Check if retryable
                    retryable = any(msg in error_str for msg in [
                        "rate limit", "429", "too many requests",
                        "overloaded", "timeout", "503", "529"
                    ])
                    
                    if not retryable:
                        raise
                    
                    if attempt == max_retries - 1:
                        raise
                    
                    # Calculate delay
                    delay = min(
                        base_delay * (exponential_base ** attempt),
                        max_delay
                    )
                    
                    # Add jitter
                    if jitter:
                        delay = delay * (0.5 + random.random())
                    
                    print(f"Rate limited, retrying in {delay:.1f}s (attempt {attempt + 1}/{max_retries})")
                    time.sleep(delay)
            
            raise last_exception
        
        return wrapper
    return decorator

# Usage
@exponential_backoff(max_retries=5, base_delay=1.0)
def resilient_completion(prompt: str) -> str:
    """LLM call with automatic retry."""
    from openai import OpenAI
    client = OpenAI()
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    
    return response.choices[0].message.content

# Async version
async def async_exponential_backoff(
    func,
    *args,
    max_retries: int = 5,
    base_delay: float = 1.0,
    **kwargs
):
    """Async exponential backoff."""
    
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            
            if "rate limit" not in str(e).lower():
                raise
            
            delay = base_delay * (2 ** attempt) * (0.5 + random.random())
            await asyncio.sleep(delay)
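
Wiring the async helper to a real call looks like the sketch below, again assuming the AsyncOpenAI client; any awaitable works in place of _call.

from openai import AsyncOpenAI

backoff_client = AsyncOpenAI()

async def fetch_answer(prompt: str) -> str:
    """Retry the completion with exponential backoff on rate-limit errors."""
    async def _call():
        response = await backoff_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    
    # Only errors whose message mentions "rate limit" are retried,
    # mirroring the helper's simple check above
    return await async_exponential_backoff(_call, max_retries=5, base_delay=1.0)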

Production Rate Limiting

from dataclasses import dataclass
import redis

class RedisRateLimiter:
    """Distributed rate limiter using Redis."""
    
    def __init__(self, redis_client: redis.Redis, key_prefix: str = "ratelimit"):
        self.redis = redis_client
        self.prefix = key_prefix
    
    def check_rate_limit(
        self,
        identifier: str,  # user_id, api_key, etc.
        requests_per_minute: int,
        tokens_per_minute: int,
        estimated_tokens: int
    ) -> tuple[bool, dict]:
        """Check if request is within rate limits."""
        
        now = int(time.time())
        minute_key = f"{self.prefix}:{identifier}:{now // 60}"
        
        pipe = self.redis.pipeline()
        
        # Get current counts
        pipe.hget(minute_key, "requests")
        pipe.hget(minute_key, "tokens")
        
        results = pipe.execute()
        
        current_requests = int(results[0] or 0)
        current_tokens = int(results[1] or 0)
        
        # Check limits
        if current_requests >= requests_per_minute:
            return False, {
                "reason": "requests_per_minute",
                "current": current_requests,
                "limit": requests_per_minute,
                "retry_after": 60 - (now % 60)
            }
        
        if current_tokens + estimated_tokens > tokens_per_minute:
            return False, {
                "reason": "tokens_per_minute",
                "current": current_tokens,
                "limit": tokens_per_minute,
                "retry_after": 60 - (now % 60)
            }
        
        return True, {}
    
    def record_usage(self, identifier: str, tokens_used: int):
        """Record usage after successful request."""
        now = int(time.time())
        minute_key = f"{self.prefix}:{identifier}:{now // 60}"
        
        pipe = self.redis.pipeline()
        pipe.hincrby(minute_key, "requests", 1)
        pipe.hincrby(minute_key, "tokens", tokens_used)
        pipe.expire(minute_key, 120)  # Expire after 2 minutes
        pipe.execute()
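
One caveat: because check_rate_limit and record_usage are separate calls, two concurrent requests can both pass the check before either records. A sketch of an increment-first variant that narrows that window, at the cost of a brief over-count while a rejected request is rolled back:

class AtomicRedisRateLimiter(RedisRateLimiter):
    """Increment counters first, then reject if the new totals exceed limits."""
    
    def try_acquire(
        self,
        identifier: str,
        requests_per_minute: int,
        tokens_per_minute: int,
        estimated_tokens: int
    ) -> tuple[bool, dict]:
        now = int(time.time())
        minute_key = f"{self.prefix}:{identifier}:{now // 60}"
        
        pipe = self.redis.pipeline()
        pipe.hincrby(minute_key, "requests", 1)
        pipe.hincrby(minute_key, "tokens", estimated_tokens)
        pipe.expire(minute_key, 120)
        new_requests, new_tokens, _ = pipe.execute()
        
        if new_requests > requests_per_minute or new_tokens > tokens_per_minute:
            # Roll back this request's contribution so a rejection doesn't
            # permanently eat into the current window
            rollback = self.redis.pipeline()
            rollback.hincrby(minute_key, "requests", -1)
            rollback.hincrby(minute_key, "tokens", -estimated_tokens)
            rollback.execute()
            return False, {"retry_after": 60 - (now % 60)}
        
        return True, {}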

# FastAPI middleware for rate limiting
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()
redis_client = redis.Redis()
rate_limiter = RedisRateLimiter(redis_client)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Rate limiting middleware."""
    
    # Get identifier (API key, user ID, IP)
    identifier = request.headers.get("X-API-Key") or request.client.host
    
    # Estimate tokens from request body
    body = await request.body()
    estimated_tokens = len(body) // 4 + 500
    
    # Check rate limit
    allowed, info = rate_limiter.check_rate_limit(
        identifier=identifier,
        requests_per_minute=60,
        tokens_per_minute=90000,
        estimated_tokens=estimated_tokens
    )
    
    if not allowed:
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded", **info},
            headers={"Retry-After": str(info.get("retry_after", 60))}
        )
    
    response = await call_next(request)
    
    # Record usage (would need actual token count from response)
    rate_limiter.record_usage(identifier, estimated_tokens)
    
    return response
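
The middleware above applies one set of limits to every caller, but in practice different user classes get different quotas. A minimal tiered sketch on top of check_rate_limit; the plan names and numbers are illustrative:

TIER_LIMITS = {
    "free": {"requests_per_minute": 10, "tokens_per_minute": 20000},
    "pro": {"requests_per_minute": 60, "tokens_per_minute": 90000},
    "enterprise": {"requests_per_minute": 300, "tokens_per_minute": 450000},
}

def check_tiered_limit(identifier: str, plan: str, estimated_tokens: int) -> tuple[bool, dict]:
    """Apply per-plan quotas using the shared Redis limiter."""
    limits = TIER_LIMITS.get(plan, TIER_LIMITS["free"])
    return rate_limiter.check_rate_limit(
        identifier=identifier,
        estimated_tokens=estimated_tokens,
        **limits
    )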

Conclusion

Rate limiting is the unsung hero of production LLM applications. Without it, traffic spikes trigger cascading 429 errors, aggressive retries make things worse, and your application becomes unreliable. A well-designed rate limiter smooths traffic, prioritizes important requests, and gracefully degrades under load. Start with a simple token bucket for single-instance applications. Add sliding windows for more precise tracking. Implement request queues with priority for complex workloads. Use Redis for distributed rate limiting across multiple instances. Always implement exponential backoff with jitter for retries—it's the difference between recovering gracefully and hammering an already-stressed API. Monitor your rate limit usage, set alerts before hitting limits, and consider tiered limits for different user classes. The goal isn't just staying under limits—it's maximizing throughput while providing consistent, reliable service.


Discover more from C4: Container, Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.