LLM Caching Strategies: Reducing Costs and Latency at Scale

Introduction: LLM API calls are expensive and slow. A single GPT-4 request can cost cents and take seconds—multiply that by thousands of users and costs spiral quickly. Caching is the most effective way to reduce both cost and latency. But LLM caching is different from traditional caching: exact string matches are rare, and semantically similar queries should return cached results. This guide covers practical caching strategies: exact match caching for identical requests, semantic caching that finds similar queries using embeddings, prefix caching for shared context, and tiered caching that combines multiple approaches. Whether you’re building a chatbot, search system, or API service, effective caching can reduce costs by 50-90% while dramatically improving response times.

LLM Caching: Exact Match, Semantic Cache, Prefix Cache

Exact Match Caching

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
import hashlib
import json

@dataclass
class CacheEntry:
    """A cache entry."""
    
    key: str
    value: str
    created_at: datetime
    expires_at: Optional[datetime] = None
    hit_count: int = 0
    metadata: dict = field(default_factory=dict)
    
    @property
    def is_expired(self) -> bool:
        if self.expires_at is None:
            return False
        return datetime.utcnow() > self.expires_at

class CacheBackend(ABC):
    """Abstract cache backend."""
    
    @abstractmethod
    async def get(self, key: str) -> Optional[CacheEntry]:
        """Get entry from cache."""
        pass
    
    @abstractmethod
    async def set(
        self,
        key: str,
        value: str,
        ttl: int = None,
        metadata: dict = None
    ) -> None:
        """Set entry in cache."""
        pass
    
    @abstractmethod
    async def delete(self, key: str) -> None:
        """Delete entry from cache."""
        pass
    
    @abstractmethod
    async def clear(self) -> None:
        """Clear all entries."""
        pass

class InMemoryCache(CacheBackend):
    """In-memory cache backend."""
    
    def __init__(self, max_size: int = 10000):
        self.max_size = max_size
        self._cache: dict[str, CacheEntry] = {}
    
    async def get(self, key: str) -> Optional[CacheEntry]:
        """Get entry from memory."""
        
        entry = self._cache.get(key)
        
        if entry is None:
            return None
        
        if entry.is_expired:
            del self._cache[key]
            return None
        
        entry.hit_count += 1
        return entry
    
    async def set(
        self,
        key: str,
        value: str,
        ttl: int = None,
        metadata: dict = None
    ) -> None:
        """Set entry in memory."""
        
        # Evict if at capacity
        if len(self._cache) >= self.max_size:
            self._evict_lru()
        
        expires_at = None
        if ttl:
            expires_at = datetime.utcnow() + timedelta(seconds=ttl)
        
        self._cache[key] = CacheEntry(
            key=key,
            value=value,
            created_at=datetime.utcnow(),
            expires_at=expires_at,
            metadata=metadata or {}
        )
    
    def _evict_lru(self) -> None:
        """Evict least recently used entry."""
        
        if not self._cache:
            return
        
        # Find entry with lowest hit count
        lru_key = min(
            self._cache.keys(),
            key=lambda k: self._cache[k].hit_count
        )
        del self._cache[lru_key]
    
    async def delete(self, key: str) -> None:
        """Delete entry from memory."""
        
        if key in self._cache:
            del self._cache[key]
    
    async def clear(self) -> None:
        """Clear all entries."""
        self._cache.clear()

class RedisCache(CacheBackend):
    """Redis cache backend."""
    
    def __init__(
        self,
        redis_client: Any,
        prefix: str = "llm_cache:"
    ):
        self.redis = redis_client
        self.prefix = prefix
    
    def _make_key(self, key: str) -> str:
        """Create prefixed key."""
        return f"{self.prefix}{key}"
    
    async def get(self, key: str) -> Optional[CacheEntry]:
        """Get entry from Redis."""
        
        redis_key = self._make_key(key)
        data = await self.redis.get(redis_key)
        
        if data is None:
            return None
        
        entry_data = json.loads(data)
        
        # Increment hit count without discarding any remaining TTL
        entry_data["hit_count"] = entry_data.get("hit_count", 0) + 1
        await self.redis.set(redis_key, json.dumps(entry_data), keepttl=True)
        
        return CacheEntry(
            key=key,
            value=entry_data["value"],
            created_at=datetime.fromisoformat(entry_data["created_at"]),
            expires_at=datetime.fromisoformat(entry_data["expires_at"]) if entry_data.get("expires_at") else None,
            hit_count=entry_data["hit_count"],
            metadata=entry_data.get("metadata", {})
        )
    
    async def set(
        self,
        key: str,
        value: str,
        ttl: int = None,
        metadata: dict = None
    ) -> None:
        """Set entry in Redis."""
        
        redis_key = self._make_key(key)
        
        entry_data = {
            "value": value,
            "created_at": datetime.utcnow().isoformat(),
            "expires_at": (datetime.utcnow() + timedelta(seconds=ttl)).isoformat() if ttl else None,
            "hit_count": 0,
            "metadata": metadata or {}
        }
        
        if ttl:
            await self.redis.setex(redis_key, ttl, json.dumps(entry_data))
        else:
            await self.redis.set(redis_key, json.dumps(entry_data))
    
    async def delete(self, key: str) -> None:
        """Delete entry from Redis."""
        
        redis_key = self._make_key(key)
        await self.redis.delete(redis_key)
    
    async def clear(self) -> None:
        """Clear all entries with prefix."""
        
        pattern = f"{self.prefix}*"
        cursor = 0
        
        while True:
            cursor, keys = await self.redis.scan(cursor, match=pattern)
            if keys:
                await self.redis.delete(*keys)
            if cursor == 0:
                break

class ExactMatchCache:
    """Exact match LLM cache."""
    
    def __init__(
        self,
        backend: CacheBackend,
        default_ttl: int = 3600
    ):
        self.backend = backend
        self.default_ttl = default_ttl
        
        self._hits = 0
        self._misses = 0
    
    def _create_key(
        self,
        prompt: str,
        model: str,
        temperature: float,
        system_prompt: str = None
    ) -> str:
        """Create cache key from request parameters."""
        
        key_data = {
            "prompt": prompt,
            "model": model,
            "temperature": temperature,
            "system_prompt": system_prompt
        }
        
        key_string = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_string.encode()).hexdigest()
    
    async def get(
        self,
        prompt: str,
        model: str,
        temperature: float = 0,
        system_prompt: str = None
    ) -> Optional[str]:
        """Get cached response."""
        
        key = self._create_key(prompt, model, temperature, system_prompt)
        entry = await self.backend.get(key)
        
        if entry:
            self._hits += 1
            return entry.value
        
        self._misses += 1
        return None
    
    async def set(
        self,
        prompt: str,
        model: str,
        response: str,
        temperature: float = 0,
        system_prompt: str = None,
        ttl: int = None
    ) -> None:
        """Cache response."""
        
        key = self._create_key(prompt, model, temperature, system_prompt)
        
        await self.backend.set(
            key=key,
            value=response,
            ttl=ttl or self.default_ttl,
            metadata={
                "model": model,
                "prompt_length": len(prompt),
                "response_length": len(response)
            }
        )
    
    @property
    def hit_rate(self) -> float:
        """Get cache hit rate."""
        
        total = self._hits + self._misses
        return self._hits / total if total > 0 else 0.0
    
    def get_stats(self) -> dict:
        """Get cache statistics."""
        
        return {
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate": self.hit_rate
        }
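
A minimal usage sketch for the exact-match layer. The call_llm coroutine below is a hypothetical stand-in for the real API call; only the cache classes defined above are assumed.

import asyncio

# Module-level cache so repeated requests share entries
exact_cache = ExactMatchCache(InMemoryCache(max_size=5000), default_ttl=3600)

async def call_llm(prompt: str, model: str) -> str:
    """Hypothetical stand-in for the real LLM call."""
    return f"echo: {prompt}"

async def cached_completion(prompt: str, model: str = "gpt-4o-mini") -> str:
    cached = await exact_cache.get(prompt, model, temperature=0)
    if cached is not None:
        return cached
    response = await call_llm(prompt, model)
    await exact_cache.set(prompt, model, response, temperature=0)
    return response

async def main() -> None:
    await cached_completion("What is LLM caching?")  # miss: calls the model
    await cached_completion("What is LLM caching?")  # identical request: cache hit
    print(exact_cache.get_stats())  # {'hits': 1, 'misses': 1, 'hit_rate': 0.5}

asyncio.run(main())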

Semantic Caching

from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional
import hashlib
import numpy as np

@dataclass
class SemanticCacheEntry:
    """A semantic cache entry."""
    
    key: str
    prompt: str
    response: str
    embedding: list[float]
    created_at: datetime
    hit_count: int = 0
    metadata: dict = None

class SemanticCache:
    """Cache with semantic similarity matching."""
    
    def __init__(
        self,
        embedding_model: Any,
        similarity_threshold: float = 0.95,
        max_entries: int = 10000,
        default_ttl: int = 3600
    ):
        self.embedding_model = embedding_model
        self.similarity_threshold = similarity_threshold
        self.max_entries = max_entries
        self.default_ttl = default_ttl
        
        self._entries: list[SemanticCacheEntry] = []
        self._embeddings: np.ndarray = None
        
        self._hits = 0
        self._misses = 0
    
    async def get(
        self,
        prompt: str,
        model: str = None
    ) -> Optional[str]:
        """Get semantically similar cached response."""
        
        if not self._entries:
            self._misses += 1
            return None
        
        # Get query embedding
        query_embedding = await self.embedding_model.embed([prompt])
        query_embedding = np.array(query_embedding[0])
        
        # Find most similar entry
        similarities = self._compute_similarities(query_embedding)
        best_idx = np.argmax(similarities)
        best_similarity = similarities[best_idx]
        
        if best_similarity >= self.similarity_threshold:
            entry = self._entries[best_idx]
            entry.hit_count += 1
            self._hits += 1
            return entry.response
        
        self._misses += 1
        return None
    
    def _compute_similarities(self, query: np.ndarray) -> np.ndarray:
        """Compute cosine similarities."""
        
        if self._embeddings is None:
            return np.array([])
        
        # Normalize
        query_norm = query / np.linalg.norm(query)
        
        # Compute dot products (cosine similarity for normalized vectors)
        similarities = np.dot(self._embeddings, query_norm)
        
        return similarities
    
    async def set(
        self,
        prompt: str,
        response: str,
        model: str = None,
        ttl: int = None
    ) -> None:
        """Cache response with embedding."""
        
        # Get embedding
        embedding = await self.embedding_model.embed([prompt])
        embedding = np.array(embedding[0])
        
        # Normalize
        embedding = embedding / np.linalg.norm(embedding)
        
        # Create entry
        entry = SemanticCacheEntry(
            key=hashlib.sha256(prompt.encode()).hexdigest()[:16],
            prompt=prompt,
            response=response,
            embedding=embedding.tolist(),
            created_at=datetime.utcnow(),
            metadata={"model": model}
        )
        
        # Add to cache
        self._entries.append(entry)
        
        # Update embeddings matrix
        if self._embeddings is None:
            self._embeddings = embedding.reshape(1, -1)
        else:
            self._embeddings = np.vstack([self._embeddings, embedding])
        
        # Evict if over capacity
        if len(self._entries) > self.max_entries:
            self._evict_lru()
    
    def _evict_lru(self) -> None:
        """Evict least recently used entries."""
        
        # Sort by hit count and remove lowest
        sorted_indices = sorted(
            range(len(self._entries)),
            key=lambda i: self._entries[i].hit_count
        )
        
        # Remove bottom 10%
        remove_count = max(1, len(self._entries) // 10)
        indices_to_remove = set(sorted_indices[:remove_count])
        
        self._entries = [
            e for i, e in enumerate(self._entries)
            if i not in indices_to_remove
        ]
        
        # Rebuild embeddings matrix
        if self._entries:
            self._embeddings = np.array([e.embedding for e in self._entries])
        else:
            self._embeddings = None
    
    @property
    def hit_rate(self) -> float:
        """Get cache hit rate."""
        
        total = self._hits + self._misses
        return self._hits / total if total > 0 else 0.0
    
    def get_stats(self) -> dict:
        """Get cache statistics."""
        
        return {
            "entries": len(self._entries),
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate": self.hit_rate,
            "similarity_threshold": self.similarity_threshold
        }

class VectorDBSemanticCache:
    """Semantic cache using vector database."""
    
    def __init__(
        self,
        embedding_model: Any,
        vector_db: Any,
        collection_name: str = "llm_cache",
        similarity_threshold: float = 0.95
    ):
        self.embedding_model = embedding_model
        self.vector_db = vector_db
        self.collection_name = collection_name
        self.similarity_threshold = similarity_threshold
        
        self._hits = 0
        self._misses = 0
    
    async def get(self, prompt: str) -> Optional[str]:
        """Get cached response from vector DB."""
        
        # Get query embedding
        embedding = await self.embedding_model.embed([prompt])
        
        # Search vector DB
        results = await self.vector_db.search(
            collection=self.collection_name,
            query_vector=embedding[0],
            limit=1
        )
        
        if results and results[0].score >= self.similarity_threshold:
            self._hits += 1
            return results[0].metadata.get("response")
        
        self._misses += 1
        return None
    
    async def set(
        self,
        prompt: str,
        response: str,
        model: str = None
    ) -> None:
        """Cache response in vector DB."""
        
        embedding = await self.embedding_model.embed([prompt])
        
        await self.vector_db.upsert(
            collection=self.collection_name,
            id=hashlib.sha256(prompt.encode()).hexdigest()[:16],
            vector=embedding[0],
            metadata={
                "prompt": prompt,
                "response": response,
                "model": model,
                "created_at": datetime.utcnow().isoformat()
            }
        )
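
A sketch of exercising the in-memory semantic cache, assuming a hypothetical embedding_model whose async embed(texts) method returns one vector per input string (the interface the classes above expect). Whether the paraphrase below actually hits depends on the embedding model and the threshold.

async def demo_semantic_cache(embedding_model: Any) -> None:
    cache = SemanticCache(embedding_model, similarity_threshold=0.95)

    await cache.set(
        "What is the capital of France?",
        "Paris is the capital of France."
    )

    # A close paraphrase can score above the threshold and reuse the stored answer
    answer = await cache.get("Tell me the capital city of France")
    if answer is None:
        # Fall through to the LLM on a miss, then cache the new response
        answer = "..."
    print(answer)
    print(cache.get_stats())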

Prefix Caching

from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional
import hashlib

@dataclass
class PrefixCacheEntry:
    """A prefix cache entry."""
    
    prefix_hash: str
    prefix: str
    kv_cache: Any  # Model-specific KV cache
    created_at: datetime
    hit_count: int = 0

class PrefixCache:
    """Cache for shared prompt prefixes."""
    
    def __init__(
        self,
        max_entries: int = 100,
        min_prefix_length: int = 100
    ):
        self.max_entries = max_entries
        self.min_prefix_length = min_prefix_length
        
        self._entries: dict[str, PrefixCacheEntry] = {}
        self._hits = 0
        self._misses = 0
    
    def _hash_prefix(self, prefix: str) -> str:
        """Hash prefix for lookup."""
        return hashlib.sha256(prefix.encode()).hexdigest()
    
    def find_cached_prefix(self, prompt: str) -> Optional[tuple[str, Any]]:
        """Find longest cached prefix for prompt."""
        
        if len(prompt) < self.min_prefix_length:
            self._misses += 1
            return None
        
        # Try progressively shorter prefixes in 100-character steps; a cached
        # prefix is only found if its length lands on one of these boundaries
        for length in range(len(prompt), self.min_prefix_length - 1, -100):
            prefix = prompt[:length]
            prefix_hash = self._hash_prefix(prefix)
            
            if prefix_hash in self._entries:
                entry = self._entries[prefix_hash]
                entry.hit_count += 1
                self._hits += 1
                return prefix, entry.kv_cache
        
        self._misses += 1
        return None
    
    def cache_prefix(
        self,
        prefix: str,
        kv_cache: Any
    ) -> None:
        """Cache a prefix with its KV cache."""
        
        if len(prefix) < self.min_prefix_length:
            return
        
        prefix_hash = self._hash_prefix(prefix)
        
        # Evict if at capacity
        if len(self._entries) >= self.max_entries:
            self._evict_lru()
        
        self._entries[prefix_hash] = PrefixCacheEntry(
            prefix_hash=prefix_hash,
            prefix=prefix,
            kv_cache=kv_cache,
            created_at=datetime.utcnow()
        )
    
    def _evict_lru(self) -> None:
        """Evict least recently used entry."""
        
        if not self._entries:
            return
        
        lru_hash = min(
            self._entries.keys(),
            key=lambda h: self._entries[h].hit_count
        )
        del self._entries[lru_hash]
    
    def get_stats(self) -> dict:
        """Get cache statistics."""
        
        total = self._hits + self._misses
        return {
            "entries": len(self._entries),
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate": self._hits / total if total > 0 else 0.0
        }

class SystemPromptCache:
    """Cache for system prompts (common prefix pattern)."""
    
    def __init__(self, max_prompts: int = 50):
        self.max_prompts = max_prompts
        self._cache: dict[str, dict] = {}
    
    def _hash_system_prompt(self, system_prompt: str) -> str:
        """Hash system prompt."""
        return hashlib.sha256(system_prompt.encode()).hexdigest()[:16]
    
    def get_or_create(
        self,
        system_prompt: str,
        create_fn: callable
    ) -> Any:
        """Get cached system prompt context or create new."""
        
        prompt_hash = self._hash_system_prompt(system_prompt)
        
        if prompt_hash in self._cache:
            self._cache[prompt_hash]["hits"] += 1
            return self._cache[prompt_hash]["context"]
        
        # Create new context
        context = create_fn(system_prompt)
        
        # Evict if at capacity
        if len(self._cache) >= self.max_prompts:
            self._evict_lru()
        
        self._cache[prompt_hash] = {
            "context": context,
            "system_prompt": system_prompt,
            "hits": 0,
            "created_at": datetime.utcnow()
        }
        
        return context
    
    def _evict_lru(self) -> None:
        """Evict least used entry."""
        
        if not self._cache:
            return
        
        lru_hash = min(
            self._cache.keys(),
            key=lambda h: self._cache[h]["hits"]
        )
        del self._cache[lru_hash]
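
Prefix caching only pays off when you control the inference stack, since the kv_cache object is whatever reusable key/value state your serving framework exposes. A rough sketch of the intended flow, where encode_prefix and continue_generation are hypothetical model-level helpers:

prefix_cache = PrefixCache(max_entries=100, min_prefix_length=100)

def generate_with_prefix_cache(prompt: str) -> str:
    hit = prefix_cache.find_cached_prefix(prompt)
    if hit is not None:
        prefix, kv_state = hit
        # Only the suffix after the cached prefix needs a fresh forward pass
        return continue_generation(kv_state, prompt[len(prefix):])

    # Miss: run the model over the prompt and keep its KV state for next time
    kv_state = encode_prefix(prompt)
    prefix_cache.cache_prefix(prompt, kv_state)
    return continue_generation(kv_state, "")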

Tiered Caching

from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum

class CacheTier(Enum):
    """Cache tier levels."""
    
    L1_EXACT = "l1_exact"
    L2_SEMANTIC = "l2_semantic"
    L3_PREFIX = "l3_prefix"
    MISS = "miss"

@dataclass
class TieredCacheResult:
    """Result from tiered cache lookup."""
    
    hit: bool
    tier: CacheTier
    response: str = None
    similarity: float = None

class TieredCache:
    """Multi-tier LLM cache."""
    
    def __init__(
        self,
        exact_cache: ExactMatchCache,
        semantic_cache: SemanticCache,
        prefix_cache: PrefixCache = None
    ):
        self.exact_cache = exact_cache
        self.semantic_cache = semantic_cache
        self.prefix_cache = prefix_cache
        
        self._tier_hits = {tier: 0 for tier in CacheTier}
    
    async def get(
        self,
        prompt: str,
        model: str,
        temperature: float = 0,
        system_prompt: str = None
    ) -> TieredCacheResult:
        """Look up in cache tiers."""
        
        # L1: Exact match (fastest)
        exact_result = await self.exact_cache.get(
            prompt=prompt,
            model=model,
            temperature=temperature,
            system_prompt=system_prompt
        )
        
        if exact_result:
            self._tier_hits[CacheTier.L1_EXACT] += 1
            return TieredCacheResult(
                hit=True,
                tier=CacheTier.L1_EXACT,
                response=exact_result,
                similarity=1.0
            )
        
        # L2: Semantic match (for temperature=0 only)
        if temperature == 0:
            semantic_result = await self.semantic_cache.get(
                prompt=prompt,
                model=model
            )
            
            if semantic_result:
                self._tier_hits[CacheTier.L2_SEMANTIC] += 1
                return TieredCacheResult(
                    hit=True,
                    tier=CacheTier.L2_SEMANTIC,
                    response=semantic_result
                )
        
        # L3: Prefix match (if available)
        if self.prefix_cache:
            prefix_result = self.prefix_cache.find_cached_prefix(prompt)
            if prefix_result:
                self._tier_hits[CacheTier.L3_PREFIX] += 1
                # Return prefix info for partial cache hit
                return TieredCacheResult(
                    hit=True,
                    tier=CacheTier.L3_PREFIX,
                    response=None  # Caller handles prefix continuation
                )
        
        self._tier_hits[CacheTier.MISS] += 1
        return TieredCacheResult(hit=False, tier=CacheTier.MISS)
    
    async def set(
        self,
        prompt: str,
        response: str,
        model: str,
        temperature: float = 0,
        system_prompt: str = None
    ) -> None:
        """Cache response in appropriate tiers."""
        
        # Always cache in exact match
        await self.exact_cache.set(
            prompt=prompt,
            model=model,
            response=response,
            temperature=temperature,
            system_prompt=system_prompt
        )
        
        # Cache in semantic for deterministic responses
        if temperature == 0:
            await self.semantic_cache.set(
                prompt=prompt,
                response=response,
                model=model
            )
    
    def get_stats(self) -> dict:
        """Get tiered cache statistics."""
        
        total = sum(self._tier_hits.values())
        
        return {
            "tier_hits": {
                tier.value: count
                for tier, count in self._tier_hits.items()
            },
            "tier_rates": {
                tier.value: count / total if total > 0 else 0
                for tier, count in self._tier_hits.items()
            },
            "total_requests": total,
            "overall_hit_rate": (total - self._tier_hits[CacheTier.MISS]) / total if total > 0 else 0,
            "exact_cache": self.exact_cache.get_stats(),
            "semantic_cache": self.semantic_cache.get_stats()
        }

class AdaptiveCache:
    """Cache that adapts thresholds based on performance."""
    
    def __init__(
        self,
        embedding_model: Any,
        initial_threshold: float = 0.95,
        min_threshold: float = 0.90,
        max_threshold: float = 0.99,
        adaptation_rate: float = 0.01
    ):
        self.embedding_model = embedding_model
        self.threshold = initial_threshold
        self.min_threshold = min_threshold
        self.max_threshold = max_threshold
        self.adaptation_rate = adaptation_rate
        
        self.semantic_cache = SemanticCache(
            embedding_model=embedding_model,
            similarity_threshold=initial_threshold
        )
        
        self._feedback_buffer: list[bool] = []
        self._buffer_size = 100
    
    async def get(self, prompt: str) -> Optional[str]:
        """Get with adaptive threshold."""
        
        # Update threshold in semantic cache
        self.semantic_cache.similarity_threshold = self.threshold
        
        return await self.semantic_cache.get(prompt)
    
    async def set(self, prompt: str, response: str) -> None:
        """Cache response."""
        await self.semantic_cache.set(prompt, response)
    
    def record_feedback(self, was_correct: bool) -> None:
        """Record whether cached response was acceptable."""
        
        self._feedback_buffer.append(was_correct)
        
        if len(self._feedback_buffer) > self._buffer_size:
            self._feedback_buffer.pop(0)
        
        # Adapt threshold based on feedback
        if len(self._feedback_buffer) >= 10:
            accuracy = sum(self._feedback_buffer) / len(self._feedback_buffer)
            
            if accuracy < 0.95:
                # Too many bad cache hits, increase threshold
                self.threshold = min(
                    self.max_threshold,
                    self.threshold + self.adaptation_rate
                )
            elif accuracy > 0.99:
                # Very accurate, can lower threshold for more hits
                self.threshold = max(
                    self.min_threshold,
                    self.threshold - self.adaptation_rate
                )
    
    def get_stats(self) -> dict:
        """Get adaptive cache statistics."""
        
        return {
            "current_threshold": self.threshold,
            "recent_accuracy": sum(self._feedback_buffer) / len(self._feedback_buffer) if self._feedback_buffer else None,
            "feedback_count": len(self._feedback_buffer),
            **self.semantic_cache.get_stats()
        }
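
A wiring sketch that combines the tiers defined above into one lookup path; embedding_model is again a hypothetical object with an async embed method, and call_llm stands in for the real API call:

def build_tiered_cache(embedding_model: Any) -> TieredCache:
    exact = ExactMatchCache(InMemoryCache(max_size=10000), default_ttl=3600)
    semantic = SemanticCache(embedding_model, similarity_threshold=0.95)
    return TieredCache(exact, semantic, prefix_cache=PrefixCache())

async def answer(cache: TieredCache, prompt: str, model: str = "gpt-4o-mini") -> str:
    result = await cache.get(prompt, model=model, temperature=0)
    if result.hit and result.response:
        return result.response  # served from L1 or L2

    response = await call_llm(prompt, model)  # hypothetical LLM call
    await cache.set(prompt, response, model=model, temperature=0)
    return response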

Cached LLM Client

from dataclasses import dataclass
from typing import Any, Optional
import time

@dataclass
class CachedResponse:
    """Response with cache metadata."""
    
    content: str
    cached: bool
    cache_tier: CacheTier = None
    latency_ms: float = 0
    cost: float = 0

class CachedLLMClient:
    """LLM client with integrated caching."""
    
    def __init__(
        self,
        client: Any,
        cache: TieredCache,
        enable_caching: bool = True
    ):
        self.client = client
        self.cache = cache
        self.enable_caching = enable_caching
        
        self._total_requests = 0
        self._cached_requests = 0
        self._total_cost = 0.0
        self._saved_cost = 0.0
    
    async def chat(
        self,
        messages: list[dict],
        model: str = "gpt-4o-mini",
        temperature: float = 0,
        max_tokens: int = None,
        use_cache: bool = True
    ) -> CachedResponse:
        """Chat with caching."""
        
        start_time = time.time()
        self._total_requests += 1
        
        # Use the last system and user messages as the cache key; earlier turns are ignored here
        system_prompt = None
        user_prompt = ""
        
        for msg in messages:
            if msg["role"] == "system":
                system_prompt = msg["content"]
            elif msg["role"] == "user":
                user_prompt = msg["content"]
        
        # Check cache
        if self.enable_caching and use_cache:
            cache_result = await self.cache.get(
                prompt=user_prompt,
                model=model,
                temperature=temperature,
                system_prompt=system_prompt
            )
            
            if cache_result.hit and cache_result.response:
                self._cached_requests += 1
                
                # Estimate saved cost
                estimated_cost = self._estimate_cost(
                    user_prompt, cache_result.response, model
                )
                self._saved_cost += estimated_cost
                
                return CachedResponse(
                    content=cache_result.response,
                    cached=True,
                    cache_tier=cache_result.tier,
                    latency_ms=(time.time() - start_time) * 1000,
                    cost=0
                )
        
        # Call LLM
        response = await self.client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens
        )
        
        content = response.choices[0].message.content
        
        # Calculate cost
        cost = self._calculate_cost(response.usage, model)
        self._total_cost += cost
        
        # Cache response
        if self.enable_caching and use_cache and temperature == 0:
            await self.cache.set(
                prompt=user_prompt,
                response=content,
                model=model,
                temperature=temperature,
                system_prompt=system_prompt
            )
        
        return CachedResponse(
            content=content,
            cached=False,
            latency_ms=(time.time() - start_time) * 1000,
            cost=cost
        )
    
    def _estimate_cost(
        self,
        prompt: str,
        response: str,
        model: str
    ) -> float:
        """Estimate cost of request."""
        
        # Rough token estimates
        prompt_tokens = len(prompt) // 4
        response_tokens = len(response) // 4
        
        # Model pricing (per 1K tokens)
        pricing = {
            "gpt-4o": (0.0025, 0.01),
            "gpt-4o-mini": (0.00015, 0.0006),
            "gpt-4-turbo": (0.01, 0.03),
        }
        
        input_price, output_price = pricing.get(model, (0.001, 0.002))
        
        return (prompt_tokens * input_price + response_tokens * output_price) / 1000
    
    def _calculate_cost(self, usage: Any, model: str) -> float:
        """Calculate actual cost from usage."""
        
        pricing = {
            "gpt-4o": (0.0025, 0.01),
            "gpt-4o-mini": (0.00015, 0.0006),
            "gpt-4-turbo": (0.01, 0.03),
        }
        
        input_price, output_price = pricing.get(model, (0.001, 0.002))
        
        return (
            usage.prompt_tokens * input_price +
            usage.completion_tokens * output_price
        ) / 1000
    
    def get_stats(self) -> dict:
        """Get client statistics."""
        
        return {
            "total_requests": self._total_requests,
            "cached_requests": self._cached_requests,
            "cache_rate": self._cached_requests / self._total_requests if self._total_requests > 0 else 0,
            "total_cost": self._total_cost,
            "saved_cost": self._saved_cost,
            "savings_rate": self._saved_cost / (self._total_cost + self._saved_cost) if (self._total_cost + self._saved_cost) > 0 else 0,
            "cache_stats": self.cache.get_stats()
        }
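
Putting the client to work looks roughly like this, assuming the official openai package's AsyncOpenAI client and a tiered_cache built as in the previous section:

from openai import AsyncOpenAI

async def demo_cached_client() -> None:
    # tiered_cache is assumed to be built as shown in the tiered caching section
    llm = CachedLLMClient(client=AsyncOpenAI(), cache=tiered_cache)

    result = await llm.chat(
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Summarize LLM caching in one sentence."},
        ],
        model="gpt-4o-mini",
        temperature=0,
    )
    print(result.cached, result.cache_tier, f"{result.latency_ms:.0f} ms", f"${result.cost:.5f}")
    print(llm.get_stats())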

Production Caching Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

class CacheRequest(BaseModel):
    prompt: str
    model: str = "gpt-4o-mini"
    temperature: float = 0
    system_prompt: Optional[str] = None

class CacheSetRequest(BaseModel):
    prompt: str
    response: str
    model: str = "gpt-4o-mini"
    temperature: float = 0
    system_prompt: Optional[str] = None
    ttl: Optional[int] = 3600

class ChatRequest(BaseModel):
    messages: list[dict]
    model: str = "gpt-4o-mini"
    temperature: float = 0
    max_tokens: Optional[int] = None
    use_cache: bool = True

# Initialize caching (would be configured properly)
# exact_cache = ExactMatchCache(InMemoryCache())
# semantic_cache = SemanticCache(embedding_model)
# tiered_cache = TieredCache(exact_cache, semantic_cache)
# cached_client = CachedLLMClient(openai_client, tiered_cache)

@app.post("/v1/cache/get")
async def cache_get(request: CacheRequest):
    """Get from cache."""
    
    # result = await tiered_cache.get(
    #     prompt=request.prompt,
    #     model=request.model,
    #     temperature=request.temperature,
    #     system_prompt=request.system_prompt
    # )
    
    return {
        "hit": False,
        "tier": None,
        "response": None
    }

@app.post("/v1/cache/set")
async def cache_set(request: CacheSetRequest):
    """Set in cache."""
    
    # await tiered_cache.set(
    #     prompt=request.prompt,
    #     response=request.response,
    #     model=request.model,
    #     temperature=request.temperature,
    #     system_prompt=request.system_prompt
    # )
    
    return {"status": "cached"}

@app.post("/v1/chat")
async def chat(request: ChatRequest):
    """Chat with caching."""
    
    # response = await cached_client.chat(
    #     messages=request.messages,
    #     model=request.model,
    #     temperature=request.temperature,
    #     max_tokens=request.max_tokens,
    #     use_cache=request.use_cache
    # )
    
    return {
        "content": "Response placeholder",
        "cached": False,
        "cache_tier": None,
        "latency_ms": 500,
        "cost": 0.001
    }

@app.delete("/v1/cache")
async def clear_cache():
    """Clear all caches."""
    
    # await exact_cache.backend.clear()
    
    return {"status": "cleared"}

@app.get("/v1/cache/stats")
async def get_cache_stats():
    """Get cache statistics."""
    
    return {
        "exact_cache": {
            "hits": 0,
            "misses": 0,
            "hit_rate": 0
        },
        "semantic_cache": {
            "entries": 0,
            "hits": 0,
            "misses": 0,
            "hit_rate": 0
        },
        "overall": {
            "total_requests": 0,
            "cached_requests": 0,
            "cache_rate": 0,
            "total_cost": 0,
            "saved_cost": 0
        }
    }

@app.get("/health")
async def health():
    return {"status": "healthy"}

Conclusion

Caching is the most effective way to reduce LLM costs and latency. Start with exact match caching—it’s simple, fast, and catches identical requests. Add semantic caching for temperature=0 requests to catch semantically similar queries that should return the same response. Use prefix caching for applications with shared system prompts or context. Implement tiered caching that checks exact match first (fastest), then semantic (more flexible), falling back to the LLM only on complete misses. Monitor your cache hit rates and adjust similarity thresholds based on feedback—too low and you’ll serve incorrect cached responses, too high and you’ll miss valid cache opportunities. Consider adaptive caching that automatically tunes thresholds based on user feedback. The key insight is that LLM responses are often deterministic for the same input (at temperature=0), and many user queries are semantically similar even if not identical. A well-tuned caching system can reduce costs by 50-90% while dramatically improving response times, making the difference between an expensive, slow application and one that’s both economical and responsive.

