Introduction

LLM API calls are expensive and slow. A single GPT-4 request can cost cents and take seconds; multiply that by thousands of users and costs spiral quickly. Caching is the most effective way to reduce both cost and latency. But LLM caching is different from traditional caching: exact string matches are rare, and semantically similar queries should return cached results.

This guide covers practical caching strategies: exact match caching for identical requests, semantic caching that finds similar queries using embeddings, prefix caching for shared context, and tiered caching that combines multiple approaches. Whether you're building a chatbot, search system, or API service, effective caching can reduce costs by 50-90% while dramatically improving response times.
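All of these strategies share the same core loop: derive a key from the request, check a store, and only call the model on a miss. Here is a minimal sketch of that pattern, using a plain dict as the store and a hypothetical call_llm coroutine standing in for your actual client:

import hashlib
import json

_store: dict[str, str] = {}  # stand-in for a real cache backend

async def cached_completion(prompt: str, model: str, call_llm) -> str:
    """Check the cache first; fall back to the model only on a miss."""
    key = hashlib.sha256(
        json.dumps({"prompt": prompt, "model": model}, sort_keys=True).encode()
    ).hexdigest()
    if key in _store:                          # hit: no API call, no cost
        return _store[key]
    response = await call_llm(prompt, model)   # miss: pay for the call once
    _store[key] = response
    return response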

Exact Match Caching
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
import hashlib
import json
@dataclass
class CacheEntry:
"""A cache entry."""
key: str
value: str
created_at: datetime
    expires_at: Optional[datetime] = None
hit_count: int = 0
metadata: dict = field(default_factory=dict)
@property
def is_expired(self) -> bool:
if self.expires_at is None:
return False
return datetime.utcnow() > self.expires_at
class CacheBackend(ABC):
"""Abstract cache backend."""
@abstractmethod
async def get(self, key: str) -> Optional[CacheEntry]:
"""Get entry from cache."""
pass
@abstractmethod
async def set(
self,
key: str,
value: str,
ttl: int = None,
metadata: dict = None
) -> None:
"""Set entry in cache."""
pass
@abstractmethod
async def delete(self, key: str) -> None:
"""Delete entry from cache."""
pass
@abstractmethod
async def clear(self) -> None:
"""Clear all entries."""
pass
class InMemoryCache(CacheBackend):
"""In-memory cache backend."""
def __init__(self, max_size: int = 10000):
self.max_size = max_size
self._cache: dict[str, CacheEntry] = {}
async def get(self, key: str) -> Optional[CacheEntry]:
"""Get entry from memory."""
entry = self._cache.get(key)
if entry is None:
return None
if entry.is_expired:
del self._cache[key]
return None
entry.hit_count += 1
return entry
async def set(
self,
key: str,
value: str,
ttl: int = None,
metadata: dict = None
) -> None:
"""Set entry in memory."""
# Evict if at capacity
if len(self._cache) >= self.max_size:
self._evict_lru()
expires_at = None
if ttl:
expires_at = datetime.utcnow() + timedelta(seconds=ttl)
self._cache[key] = CacheEntry(
key=key,
value=value,
created_at=datetime.utcnow(),
expires_at=expires_at,
metadata=metadata or {}
)
    def _evict_lru(self) -> None:
        """Evict the least-used entry (lowest hit count; an LFU-style approximation of LRU)."""
if not self._cache:
return
# Find entry with lowest hit count
lru_key = min(
self._cache.keys(),
key=lambda k: self._cache[k].hit_count
)
del self._cache[lru_key]
async def delete(self, key: str) -> None:
"""Delete entry from memory."""
if key in self._cache:
del self._cache[key]
async def clear(self) -> None:
"""Clear all entries."""
self._cache.clear()
class RedisCache(CacheBackend):
"""Redis cache backend."""
def __init__(
self,
redis_client: Any,
prefix: str = "llm_cache:"
):
self.redis = redis_client
self.prefix = prefix
def _make_key(self, key: str) -> str:
"""Create prefixed key."""
return f"{self.prefix}{key}"
async def get(self, key: str) -> Optional[CacheEntry]:
"""Get entry from Redis."""
redis_key = self._make_key(key)
data = await self.redis.get(redis_key)
if data is None:
return None
entry_data = json.loads(data)
        # Increment hit count; keepttl preserves the remaining expiry (Redis >= 6.0)
        entry_data["hit_count"] = entry_data.get("hit_count", 0) + 1
        await self.redis.set(redis_key, json.dumps(entry_data), keepttl=True)
return CacheEntry(
key=key,
value=entry_data["value"],
created_at=datetime.fromisoformat(entry_data["created_at"]),
expires_at=datetime.fromisoformat(entry_data["expires_at"]) if entry_data.get("expires_at") else None,
hit_count=entry_data["hit_count"],
metadata=entry_data.get("metadata", {})
)
async def set(
self,
key: str,
value: str,
ttl: int = None,
metadata: dict = None
) -> None:
"""Set entry in Redis."""
redis_key = self._make_key(key)
entry_data = {
"value": value,
"created_at": datetime.utcnow().isoformat(),
"expires_at": (datetime.utcnow() + timedelta(seconds=ttl)).isoformat() if ttl else None,
"hit_count": 0,
"metadata": metadata or {}
}
if ttl:
await self.redis.setex(redis_key, ttl, json.dumps(entry_data))
else:
await self.redis.set(redis_key, json.dumps(entry_data))
async def delete(self, key: str) -> None:
"""Delete entry from Redis."""
redis_key = self._make_key(key)
await self.redis.delete(redis_key)
async def clear(self) -> None:
"""Clear all entries with prefix."""
pattern = f"{self.prefix}*"
cursor = 0
while True:
cursor, keys = await self.redis.scan(cursor, match=pattern)
if keys:
await self.redis.delete(*keys)
if cursor == 0:
break
class ExactMatchCache:
"""Exact match LLM cache."""
def __init__(
self,
backend: CacheBackend,
default_ttl: int = 3600
):
self.backend = backend
self.default_ttl = default_ttl
self._hits = 0
self._misses = 0
def _create_key(
self,
prompt: str,
model: str,
temperature: float,
system_prompt: str = None
) -> str:
"""Create cache key from request parameters."""
key_data = {
"prompt": prompt,
"model": model,
"temperature": temperature,
"system_prompt": system_prompt
}
key_string = json.dumps(key_data, sort_keys=True)
return hashlib.sha256(key_string.encode()).hexdigest()
async def get(
self,
prompt: str,
model: str,
temperature: float = 0,
system_prompt: str = None
) -> Optional[str]:
"""Get cached response."""
key = self._create_key(prompt, model, temperature, system_prompt)
entry = await self.backend.get(key)
if entry:
self._hits += 1
return entry.value
self._misses += 1
return None
async def set(
self,
prompt: str,
model: str,
response: str,
temperature: float = 0,
system_prompt: str = None,
ttl: int = None
) -> None:
"""Cache response."""
key = self._create_key(prompt, model, temperature, system_prompt)
await self.backend.set(
key=key,
value=response,
ttl=ttl or self.default_ttl,
metadata={
"model": model,
"prompt_length": len(prompt),
"response_length": len(response)
}
)
@property
def hit_rate(self) -> float:
"""Get cache hit rate."""
total = self._hits + self._misses
return self._hits / total if total > 0 else 0.0
def get_stats(self) -> dict:
"""Get cache statistics."""
return {
"hits": self._hits,
"misses": self._misses,
"hit_rate": self.hit_rate
}
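A quick usage sketch of the pieces above; the generate coroutine is a placeholder for whatever actually calls your LLM provider:

import asyncio

async def demo_exact_cache():
    cache = ExactMatchCache(backend=InMemoryCache(max_size=1000), default_ttl=600)

    async def generate(prompt: str) -> str:
        return f"(model answer to: {prompt})"  # placeholder for a real LLM call

    prompt, model = "What is the capital of France?", "gpt-4o-mini"

    # First lookup misses; the identical request afterwards is served from cache
    response = await cache.get(prompt, model)
    if response is None:
        response = await generate(prompt)
        await cache.set(prompt, model, response)

    print(await cache.get(prompt, model))  # hit
    print(cache.get_stats())               # {'hits': 1, 'misses': 1, 'hit_rate': 0.5}

asyncio.run(demo_exact_cache())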
Semantic Caching
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional
import hashlib
import numpy as np
@dataclass
class SemanticCacheEntry:
"""A semantic cache entry."""
key: str
prompt: str
response: str
embedding: list[float]
created_at: datetime
hit_count: int = 0
    metadata: Optional[dict] = None
class SemanticCache:
"""Cache with semantic similarity matching."""
def __init__(
self,
embedding_model: Any,
similarity_threshold: float = 0.95,
max_entries: int = 10000,
default_ttl: int = 3600
):
self.embedding_model = embedding_model
self.similarity_threshold = similarity_threshold
self.max_entries = max_entries
self.default_ttl = default_ttl
self._entries: list[SemanticCacheEntry] = []
self._embeddings: np.ndarray = None
self._hits = 0
self._misses = 0
async def get(
self,
prompt: str,
model: str = None
) -> Optional[str]:
"""Get semantically similar cached response."""
if not self._entries:
self._misses += 1
return None
# Get query embedding
query_embedding = await self.embedding_model.embed([prompt])
query_embedding = np.array(query_embedding[0])
# Find most similar entry
similarities = self._compute_similarities(query_embedding)
best_idx = np.argmax(similarities)
best_similarity = similarities[best_idx]
if best_similarity >= self.similarity_threshold:
entry = self._entries[best_idx]
entry.hit_count += 1
self._hits += 1
return entry.response
self._misses += 1
return None
def _compute_similarities(self, query: np.ndarray) -> np.ndarray:
"""Compute cosine similarities."""
if self._embeddings is None:
return np.array([])
# Normalize
query_norm = query / np.linalg.norm(query)
# Compute dot products (cosine similarity for normalized vectors)
similarities = np.dot(self._embeddings, query_norm)
return similarities
async def set(
self,
prompt: str,
response: str,
model: str = None,
ttl: int = None
) -> None:
"""Cache response with embedding."""
# Get embedding
embedding = await self.embedding_model.embed([prompt])
embedding = np.array(embedding[0])
# Normalize
embedding = embedding / np.linalg.norm(embedding)
# Create entry
entry = SemanticCacheEntry(
key=hashlib.sha256(prompt.encode()).hexdigest()[:16],
prompt=prompt,
response=response,
embedding=embedding.tolist(),
created_at=datetime.utcnow(),
metadata={"model": model}
)
# Add to cache
self._entries.append(entry)
# Update embeddings matrix
if self._embeddings is None:
self._embeddings = embedding.reshape(1, -1)
else:
self._embeddings = np.vstack([self._embeddings, embedding])
# Evict if over capacity
if len(self._entries) > self.max_entries:
self._evict_lru()
    def _evict_lru(self) -> None:
        """Evict the least-used entries (lowest hit counts)."""
# Sort by hit count and remove lowest
sorted_indices = sorted(
range(len(self._entries)),
key=lambda i: self._entries[i].hit_count
)
# Remove bottom 10%
remove_count = max(1, len(self._entries) // 10)
indices_to_remove = set(sorted_indices[:remove_count])
self._entries = [
e for i, e in enumerate(self._entries)
if i not in indices_to_remove
]
# Rebuild embeddings matrix
if self._entries:
self._embeddings = np.array([e.embedding for e in self._entries])
else:
self._embeddings = None
@property
def hit_rate(self) -> float:
"""Get cache hit rate."""
total = self._hits + self._misses
return self._hits / total if total > 0 else 0.0
def get_stats(self) -> dict:
"""Get cache statistics."""
return {
"entries": len(self._entries),
"hits": self._hits,
"misses": self._misses,
"hit_rate": self.hit_rate,
"similarity_threshold": self.similarity_threshold
}
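SemanticCache only needs an object with an async embed(texts) method that returns one vector per text. The toy bag-of-words embedder below is just a stand-in to keep the example self-contained; a real embedding model separates meaning far better, which is why production thresholds sit much higher than the loose 0.6 used here:

import asyncio
import numpy as np

class ToyEmbedder:
    """Stand-in embedding model: hashed bag-of-words vectors (illustration only)."""
    async def embed(self, texts: list[str]) -> list[list[float]]:
        vectors = []
        for text in texts:
            vec = np.zeros(64)
            for token in text.lower().split():
                vec[hash(token) % 64] += 1.0
            vectors.append(vec.tolist())
        return vectors

async def demo_semantic_cache():
    cache = SemanticCache(embedding_model=ToyEmbedder(), similarity_threshold=0.6)
    await cache.set("What is the capital of France?", "Paris")

    print(await cache.get("What is the capital of France?"))  # identical text: similarity 1.0, hit
    print(await cache.get("capital of France, what is it?"))  # reworded: word overlap of roughly 0.67 clears 0.6
    print(cache.get_stats())

asyncio.run(demo_semantic_cache())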
class VectorDBSemanticCache:
"""Semantic cache using vector database."""
def __init__(
self,
embedding_model: Any,
vector_db: Any,
collection_name: str = "llm_cache",
similarity_threshold: float = 0.95
):
self.embedding_model = embedding_model
self.vector_db = vector_db
self.collection_name = collection_name
self.similarity_threshold = similarity_threshold
self._hits = 0
self._misses = 0
async def get(self, prompt: str) -> Optional[str]:
"""Get cached response from vector DB."""
# Get query embedding
embedding = await self.embedding_model.embed([prompt])
# Search vector DB
results = await self.vector_db.search(
collection=self.collection_name,
query_vector=embedding[0],
limit=1
)
if results and results[0].score >= self.similarity_threshold:
self._hits += 1
return results[0].metadata.get("response")
self._misses += 1
return None
async def set(
self,
prompt: str,
response: str,
model: str = None
) -> None:
"""Cache response in vector DB."""
embedding = await self.embedding_model.embed([prompt])
await self.vector_db.upsert(
collection=self.collection_name,
id=hashlib.sha256(prompt.encode()).hexdigest()[:16],
vector=embedding[0],
metadata={
"prompt": prompt,
"response": response,
"model": model,
"created_at": datetime.utcnow().isoformat()
}
)
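The vector_db client here is deliberately generic: the search/upsert calls and the score/metadata fields on results are assumptions about whatever store you plug in (Qdrant, pgvector, Pinecone, and others all offer equivalents), not any specific product's API. A tiny in-memory stand-in is enough to exercise the class, reusing ToyEmbedder from the previous example:

import asyncio
from types import SimpleNamespace
import numpy as np

class InMemoryVectorDB:
    """Minimal stand-in matching the search/upsert calls used above."""
    def __init__(self):
        self._rows: dict[str, dict] = {}

    async def upsert(self, collection: str, id: str, vector, metadata: dict) -> None:
        self._rows[id] = {"vector": np.asarray(vector, dtype=float), "metadata": metadata}

    async def search(self, collection: str, query_vector, limit: int = 1):
        query = np.asarray(query_vector, dtype=float)
        query = query / (np.linalg.norm(query) or 1.0)
        scored = []
        for row in self._rows.values():
            vec = row["vector"] / (np.linalg.norm(row["vector"]) or 1.0)
            scored.append(SimpleNamespace(score=float(vec @ query), metadata=row["metadata"]))
        return sorted(scored, key=lambda r: r.score, reverse=True)[:limit]

async def demo_vector_db_cache():
    cache = VectorDBSemanticCache(ToyEmbedder(), InMemoryVectorDB(), similarity_threshold=0.9)
    await cache.set("Summarize the refund policy", "Refunds are issued within 14 days.")
    print(await cache.get("Summarize the refund policy"))  # identical text scores ~1.0: hit

asyncio.run(demo_vector_db_cache())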
Prefix Caching
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional
import hashlib
@dataclass
class PrefixCacheEntry:
"""A prefix cache entry."""
prefix_hash: str
prefix: str
kv_cache: Any # Model-specific KV cache
created_at: datetime
hit_count: int = 0
class PrefixCache:
"""Cache for shared prompt prefixes."""
def __init__(
self,
max_entries: int = 100,
min_prefix_length: int = 100
):
self.max_entries = max_entries
self.min_prefix_length = min_prefix_length
self._entries: dict[str, PrefixCacheEntry] = {}
self._hits = 0
self._misses = 0
def _hash_prefix(self, prefix: str) -> str:
"""Hash prefix for lookup."""
return hashlib.sha256(prefix.encode()).hexdigest()
    def find_cached_prefix(self, prompt: str) -> Optional[tuple[str, Any]]:
        """Find the longest cached prefix that the prompt starts with."""
        if len(prompt) < self.min_prefix_length:
            self._misses += 1
            return None
        # Check cached prefixes directly (longest first) so hits do not depend
        # on prompt lengths aligning to any fixed step size
        for entry in sorted(
            self._entries.values(),
            key=lambda e: len(e.prefix),
            reverse=True
        ):
            if prompt.startswith(entry.prefix):
                entry.hit_count += 1
                self._hits += 1
                return entry.prefix, entry.kv_cache
        self._misses += 1
        return None
def cache_prefix(
self,
prefix: str,
kv_cache: Any
) -> None:
"""Cache a prefix with its KV cache."""
if len(prefix) < self.min_prefix_length:
return
prefix_hash = self._hash_prefix(prefix)
# Evict if at capacity
if len(self._entries) >= self.max_entries:
self._evict_lru()
self._entries[prefix_hash] = PrefixCacheEntry(
prefix_hash=prefix_hash,
prefix=prefix,
kv_cache=kv_cache,
created_at=datetime.utcnow()
)
    def _evict_lru(self) -> None:
        """Evict the least-used entry (lowest hit count)."""
if not self._entries:
return
lru_hash = min(
self._entries.keys(),
key=lambda h: self._entries[h].hit_count
)
del self._entries[lru_hash]
def get_stats(self) -> dict:
"""Get cache statistics."""
total = self._hits + self._misses
return {
"entries": len(self._entries),
"hits": self._hits,
"misses": self._misses,
"hit_rate": self._hits / total if total > 0 else 0.0
}
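PrefixCache stores whatever opaque KV-cache object your inference stack produces, so this pattern applies when you control the serving layer (self-hosted transformers, vLLM, and similar); hosted APIs manage prompt caching on their side. The dict below is just a placeholder KV state so the lookup logic can be shown:

shared_context = ("You are a support assistant for Acme Corp. "
                  "Follow the refund and escalation policies below. ") * 10

prefix_cache = PrefixCache(max_entries=50, min_prefix_length=100)

# After serving one request, cache the KV state computed for the shared context
prefix_cache.cache_prefix(shared_context, kv_cache={"placeholder": "kv-state"})

# A later prompt that starts with the same context can reuse that state and
# only needs the remaining suffix to be processed
prompt = shared_context + "Customer question: how do I reset my password?"
match = prefix_cache.find_cached_prefix(prompt)
if match:
    prefix, kv_state = match
    print(f"Reusing {len(prefix)} cached characters of context")
print(prefix_cache.get_stats())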
class SystemPromptCache:
"""Cache for system prompts (common prefix pattern)."""
def __init__(self, max_prompts: int = 50):
self.max_prompts = max_prompts
self._cache: dict[str, dict] = {}
def _hash_system_prompt(self, system_prompt: str) -> str:
"""Hash system prompt."""
return hashlib.sha256(system_prompt.encode()).hexdigest()[:16]
def get_or_create(
self,
system_prompt: str,
create_fn: callable
) -> Any:
"""Get cached system prompt context or create new."""
prompt_hash = self._hash_system_prompt(system_prompt)
if prompt_hash in self._cache:
self._cache[prompt_hash]["hits"] += 1
return self._cache[prompt_hash]["context"]
# Create new context
context = create_fn(system_prompt)
# Evict if at capacity
if len(self._cache) >= self.max_prompts:
self._evict_lru()
self._cache[prompt_hash] = {
"context": context,
"system_prompt": system_prompt,
"hits": 0,
"created_at": datetime.utcnow()
}
return context
def _evict_lru(self) -> None:
"""Evict least used entry."""
if not self._cache:
return
lru_hash = min(
self._cache.keys(),
key=lambda h: self._cache[h]["hits"]
)
del self._cache[lru_hash]
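SystemPromptCache is agnostic about what a "context" is; create_fn might pre-tokenize the system prompt, build a prompt template, or register the prompt with a provider-side prompt cache. A sketch with a trivial create_fn:

system_prompt = "You are a helpful assistant. Answer concisely and cite your sources."

sp_cache = SystemPromptCache(max_prompts=10)

def build_context(prompt: str) -> dict:
    # Placeholder: pretend this is an expensive preprocessing step
    return {"token_count": len(prompt.split()), "template": prompt + "\n\n{user_input}"}

first = sp_cache.get_or_create(system_prompt, build_context)   # created once
again = sp_cache.get_or_create(system_prompt, build_context)   # returned from cache
assert first is again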
Tiered Caching
from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum
class CacheTier(Enum):
"""Cache tier levels."""
L1_EXACT = "l1_exact"
L2_SEMANTIC = "l2_semantic"
L3_PREFIX = "l3_prefix"
MISS = "miss"
@dataclass
class TieredCacheResult:
"""Result from tiered cache lookup."""
hit: bool
tier: CacheTier
    response: Optional[str] = None
    similarity: Optional[float] = None
class TieredCache:
"""Multi-tier LLM cache."""
def __init__(
self,
exact_cache: ExactMatchCache,
semantic_cache: SemanticCache,
prefix_cache: PrefixCache = None
):
self.exact_cache = exact_cache
self.semantic_cache = semantic_cache
self.prefix_cache = prefix_cache
self._tier_hits = {tier: 0 for tier in CacheTier}
async def get(
self,
prompt: str,
model: str,
temperature: float = 0,
system_prompt: str = None
) -> TieredCacheResult:
"""Look up in cache tiers."""
# L1: Exact match (fastest)
exact_result = await self.exact_cache.get(
prompt=prompt,
model=model,
temperature=temperature,
system_prompt=system_prompt
)
if exact_result:
self._tier_hits[CacheTier.L1_EXACT] += 1
return TieredCacheResult(
hit=True,
tier=CacheTier.L1_EXACT,
response=exact_result,
similarity=1.0
)
# L2: Semantic match (for temperature=0 only)
if temperature == 0:
semantic_result = await self.semantic_cache.get(
prompt=prompt,
model=model
)
if semantic_result:
self._tier_hits[CacheTier.L2_SEMANTIC] += 1
return TieredCacheResult(
hit=True,
tier=CacheTier.L2_SEMANTIC,
response=semantic_result
)
# L3: Prefix match (if available)
if self.prefix_cache:
prefix_result = self.prefix_cache.find_cached_prefix(prompt)
if prefix_result:
self._tier_hits[CacheTier.L3_PREFIX] += 1
# Return prefix info for partial cache hit
return TieredCacheResult(
hit=True,
tier=CacheTier.L3_PREFIX,
response=None # Caller handles prefix continuation
)
self._tier_hits[CacheTier.MISS] += 1
return TieredCacheResult(hit=False, tier=CacheTier.MISS)
async def set(
self,
prompt: str,
response: str,
model: str,
temperature: float = 0,
system_prompt: str = None
) -> None:
"""Cache response in appropriate tiers."""
# Always cache in exact match
await self.exact_cache.set(
prompt=prompt,
model=model,
response=response,
temperature=temperature,
system_prompt=system_prompt
)
# Cache in semantic for deterministic responses
if temperature == 0:
await self.semantic_cache.set(
prompt=prompt,
response=response,
model=model
)
def get_stats(self) -> dict:
"""Get tiered cache statistics."""
total = sum(self._tier_hits.values())
return {
"tier_hits": {
tier.value: count
for tier, count in self._tier_hits.items()
},
"tier_rates": {
tier.value: count / total if total > 0 else 0
for tier, count in self._tier_hits.items()
},
"total_requests": total,
"overall_hit_rate": (total - self._tier_hits[CacheTier.MISS]) / total if total > 0 else 0,
"exact_cache": self.exact_cache.get_stats(),
"semantic_cache": self.semantic_cache.get_stats()
}
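Wiring the tiers together looks like this, again with ToyEmbedder standing in for a real embedding model:

import asyncio

async def demo_tiered_cache():
    tiered = TieredCache(
        exact_cache=ExactMatchCache(InMemoryCache()),
        semantic_cache=SemanticCache(ToyEmbedder(), similarity_threshold=0.9),
        prefix_cache=PrefixCache()
    )
    prompt, model = "Explain HTTP caching headers", "gpt-4o-mini"

    result = await tiered.get(prompt, model)
    if not result.hit:
        # Pretend this came back from the LLM, then populate the caches
        response = "Cache-Control, ETag and Last-Modified control how responses are reused."
        await tiered.set(prompt, response, model)

    result = await tiered.get(prompt, model)   # now an L1 exact hit
    print(result.tier, result.response)
    print(tiered.get_stats())

asyncio.run(demo_tiered_cache())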
class AdaptiveCache:
"""Cache that adapts thresholds based on performance."""
def __init__(
self,
embedding_model: Any,
initial_threshold: float = 0.95,
min_threshold: float = 0.90,
max_threshold: float = 0.99,
adaptation_rate: float = 0.01
):
self.embedding_model = embedding_model
self.threshold = initial_threshold
self.min_threshold = min_threshold
self.max_threshold = max_threshold
self.adaptation_rate = adaptation_rate
self.semantic_cache = SemanticCache(
embedding_model=embedding_model,
similarity_threshold=initial_threshold
)
self._feedback_buffer: list[bool] = []
self._buffer_size = 100
async def get(self, prompt: str) -> Optional[str]:
"""Get with adaptive threshold."""
# Update threshold in semantic cache
self.semantic_cache.similarity_threshold = self.threshold
return await self.semantic_cache.get(prompt)
async def set(self, prompt: str, response: str) -> None:
"""Cache response."""
await self.semantic_cache.set(prompt, response)
def record_feedback(self, was_correct: bool) -> None:
"""Record whether cached response was acceptable."""
self._feedback_buffer.append(was_correct)
if len(self._feedback_buffer) > self._buffer_size:
self._feedback_buffer.pop(0)
# Adapt threshold based on feedback
if len(self._feedback_buffer) >= 10:
accuracy = sum(self._feedback_buffer) / len(self._feedback_buffer)
if accuracy < 0.95:
# Too many bad cache hits, increase threshold
self.threshold = min(
self.max_threshold,
self.threshold + self.adaptation_rate
)
elif accuracy > 0.99:
# Very accurate, can lower threshold for more hits
self.threshold = max(
self.min_threshold,
self.threshold - self.adaptation_rate
)
def get_stats(self) -> dict:
"""Get adaptive cache statistics."""
return {
"current_threshold": self.threshold,
"recent_accuracy": sum(self._feedback_buffer) / len(self._feedback_buffer) if self._feedback_buffer else None,
"feedback_count": len(self._feedback_buffer),
**self.semantic_cache.get_stats()
}
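The feedback signal for AdaptiveCache can come from explicit user ratings, a downstream evaluator, or periodic spot checks; the loop below only shows the mechanics of how the threshold drifts:

import asyncio

async def demo_adaptive_cache():
    adaptive = AdaptiveCache(embedding_model=ToyEmbedder(), initial_threshold=0.95)
    await adaptive.set("How do I rotate an API key?", "Go to Settings > API keys > Rotate.")

    answer = await adaptive.get("How do I rotate an API key?")
    if answer is not None:
        # Pretend ten users confirmed the cached answer was acceptable
        for _ in range(10):
            adaptive.record_feedback(was_correct=True)

    print(adaptive.get_stats())  # threshold eases down once recent accuracy exceeds 0.99

asyncio.run(demo_adaptive_cache())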
Cached LLM Client
from dataclasses import dataclass
from typing import Any, Optional, AsyncIterator
@dataclass
class CachedResponse:
"""Response with cache metadata."""
content: str
cached: bool
    cache_tier: Optional[CacheTier] = None
latency_ms: float = 0
cost: float = 0
class CachedLLMClient:
"""LLM client with integrated caching."""
def __init__(
self,
client: Any,
cache: TieredCache,
enable_caching: bool = True
):
self.client = client
self.cache = cache
self.enable_caching = enable_caching
self._total_requests = 0
self._cached_requests = 0
self._total_cost = 0.0
self._saved_cost = 0.0
async def chat(
self,
messages: list[dict],
model: str = "gpt-4o-mini",
temperature: float = 0,
max_tokens: int = None,
use_cache: bool = True
) -> CachedResponse:
"""Chat with caching."""
import time
start_time = time.time()
self._total_requests += 1
        # Extract the system prompt and the latest user message.
        # Note: only the last user turn forms the cache key, so earlier
        # conversation history does not differentiate cache entries.
        system_prompt = None
        user_prompt = ""
        for msg in messages:
            if msg["role"] == "system":
                system_prompt = msg["content"]
            elif msg["role"] == "user":
                user_prompt = msg["content"]
# Check cache
if self.enable_caching and use_cache:
cache_result = await self.cache.get(
prompt=user_prompt,
model=model,
temperature=temperature,
system_prompt=system_prompt
)
if cache_result.hit and cache_result.response:
self._cached_requests += 1
# Estimate saved cost
estimated_cost = self._estimate_cost(
user_prompt, cache_result.response, model
)
self._saved_cost += estimated_cost
return CachedResponse(
content=cache_result.response,
cached=True,
cache_tier=cache_result.tier,
latency_ms=(time.time() - start_time) * 1000,
cost=0
)
# Call LLM
response = await self.client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens
)
content = response.choices[0].message.content
# Calculate cost
cost = self._calculate_cost(response.usage, model)
self._total_cost += cost
# Cache response
if self.enable_caching and use_cache and temperature == 0:
await self.cache.set(
prompt=user_prompt,
response=content,
model=model,
temperature=temperature,
system_prompt=system_prompt
)
return CachedResponse(
content=content,
cached=False,
latency_ms=(time.time() - start_time) * 1000,
cost=cost
)
def _estimate_cost(
self,
prompt: str,
response: str,
model: str
) -> float:
"""Estimate cost of request."""
# Rough token estimates
prompt_tokens = len(prompt) // 4
response_tokens = len(response) // 4
# Model pricing (per 1K tokens)
pricing = {
"gpt-4o": (0.0025, 0.01),
"gpt-4o-mini": (0.00015, 0.0006),
"gpt-4-turbo": (0.01, 0.03),
}
input_price, output_price = pricing.get(model, (0.001, 0.002))
return (prompt_tokens * input_price + response_tokens * output_price) / 1000
def _calculate_cost(self, usage: Any, model: str) -> float:
"""Calculate actual cost from usage."""
pricing = {
"gpt-4o": (0.0025, 0.01),
"gpt-4o-mini": (0.00015, 0.0006),
"gpt-4-turbo": (0.01, 0.03),
}
input_price, output_price = pricing.get(model, (0.001, 0.002))
return (
usage.prompt_tokens * input_price +
usage.completion_tokens * output_price
) / 1000
def get_stats(self) -> dict:
"""Get client statistics."""
return {
"total_requests": self._total_requests,
"cached_requests": self._cached_requests,
"cache_rate": self._cached_requests / self._total_requests if self._total_requests > 0 else 0,
"total_cost": self._total_cost,
"saved_cost": self._saved_cost,
"savings_rate": self._saved_cost / (self._total_cost + self._saved_cost) if (self._total_cost + self._saved_cost) > 0 else 0,
"cache_stats": self.cache.get_stats()
}
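Putting the cached client in front of a real provider might look like the following; this assumes the official openai package's AsyncOpenAI client and an OPENAI_API_KEY in the environment, and it reuses ToyEmbedder where a production system would use a real embedding model:

import asyncio
from openai import AsyncOpenAI  # assumes the official openai package is installed

async def main():
    tiered = TieredCache(
        exact_cache=ExactMatchCache(InMemoryCache()),
        semantic_cache=SemanticCache(ToyEmbedder())
    )
    llm = CachedLLMClient(client=AsyncOpenAI(), cache=tiered)

    messages = [
        {"role": "system", "content": "You are a terse assistant."},
        {"role": "user", "content": "What is idempotency in REST APIs?"},
    ]
    first = await llm.chat(messages, model="gpt-4o-mini")   # real API call
    second = await llm.chat(messages, model="gpt-4o-mini")  # served from the exact cache

    print(first.cached, second.cached, second.cache_tier)
    print(llm.get_stats())

asyncio.run(main())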
Production Caching Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
class CacheRequest(BaseModel):
prompt: str
model: str = "gpt-4o-mini"
temperature: float = 0
system_prompt: Optional[str] = None
class CacheSetRequest(BaseModel):
prompt: str
response: str
model: str = "gpt-4o-mini"
temperature: float = 0
system_prompt: Optional[str] = None
ttl: Optional[int] = 3600
class ChatRequest(BaseModel):
messages: list[dict]
model: str = "gpt-4o-mini"
temperature: float = 0
max_tokens: Optional[int] = None
use_cache: bool = True
# Initialize caching (would be configured properly)
# exact_cache = ExactMatchCache(InMemoryCache())
# semantic_cache = SemanticCache(embedding_model)
# tiered_cache = TieredCache(exact_cache, semantic_cache)
# cached_client = CachedLLMClient(openai_client, tiered_cache)
@app.post("/v1/cache/get")
async def cache_get(request: CacheRequest):
"""Get from cache."""
# result = await tiered_cache.get(
# prompt=request.prompt,
# model=request.model,
# temperature=request.temperature,
# system_prompt=request.system_prompt
# )
return {
"hit": False,
"tier": None,
"response": None
}
@app.post("/v1/cache/set")
async def cache_set(request: CacheSetRequest):
"""Set in cache."""
# await tiered_cache.set(
# prompt=request.prompt,
# response=request.response,
# model=request.model,
# temperature=request.temperature,
# system_prompt=request.system_prompt
# )
return {"status": "cached"}
@app.post("/v1/chat")
async def chat(request: ChatRequest):
"""Chat with caching."""
# response = await cached_client.chat(
# messages=request.messages,
# model=request.model,
# temperature=request.temperature,
# max_tokens=request.max_tokens,
# use_cache=request.use_cache
# )
return {
"content": "Response placeholder",
"cached": False,
"cache_tier": None,
"latency_ms": 500,
"cost": 0.001
}
@app.delete("/v1/cache")
async def clear_cache():
"""Clear all caches."""
# await exact_cache.backend.clear()
return {"status": "cleared"}
@app.get("/v1/cache/stats")
async def get_cache_stats():
"""Get cache statistics."""
return {
"exact_cache": {
"hits": 0,
"misses": 0,
"hit_rate": 0
},
"semantic_cache": {
"entries": 0,
"hits": 0,
"misses": 0,
"hit_rate": 0
},
"overall": {
"total_requests": 0,
"cached_requests": 0,
"cache_rate": 0,
"total_cost": 0,
"saved_cost": 0
}
}
@app.get("/health")
async def health():
return {"status": "healthy"}
References
- GPTCache: https://github.com/zilliztech/GPTCache
- LangChain Caching: https://python.langchain.com/docs/modules/model_io/llms/llm_caching
- Redis Semantic Cache: https://redis.io/docs/stack/search/reference/vectors/
- Anthropic Prompt Caching: https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching
Conclusion
Caching is the most effective way to reduce LLM costs and latency. Start with exact match caching: it's simple, fast, and catches identical requests. Add semantic caching for temperature=0 requests to catch semantically similar queries that should return the same response. Use prefix caching for applications with shared system prompts or context. Implement tiered caching that checks exact match first (fastest), then semantic (more flexible), falling back to the LLM only on complete misses.

Monitor your cache hit rates and adjust similarity thresholds based on feedback: too low and you'll serve incorrect cached responses, too high and you'll miss valid cache opportunities. Consider adaptive caching that automatically tunes thresholds based on user feedback.

The key insight is that LLM responses are often deterministic for the same input (at temperature=0), and many user queries are semantically similar even if not identical. A well-tuned caching system can reduce costs by 50-90% while dramatically improving response times, making the difference between an expensive, slow application and one that's both economical and responsive.